教你怎么用Python监控愉客行车程

一、愉客行车程监控并通知

大概思路:用户填写指定信息在config.json文件中,通过定时访问网页,获取指定信息,从而达到对指定车程的监控

1.分析网页

在这里插入图片描述

按下F12,打开开发者工具,再刷新一下网页

找到我们需要的信息

在这里插入图片描述

然后再分析一下它的请求方式

在这里插入图片描述

很直观的就看到了几条主要的信息

第一条和第三条是null不重要
第二条是起始站
第四条是终点站
第五条是个数字,经过反复尝试,发现是固定参数
第六条乍一看应该是时间戳,经过验证,的确是车票指定日期零点的时间戳

2.请求头伪装、带参访问指定网页,获取信息:

def get_html(startStation, endStation, timeStamp):    # 模拟请求    headers = {        'Accept': 'application/json, text/javascript, */*; q=0.01',        'Accept-Encoding': 'gzip, deflate, br',        'Accept-Language': 'zh-CN,zh;q=0.9,en-US;q=0.8,en;q=0.7,zh-HK;q=0.6',        'Connection': 'keep-alive',        'Content-Length': '124',        'Content-Type': 'application/json; charset=UTF-8',        'sec-ch-ua': '" Not A;Brand";v="99", "Chromium";v="90", "Google Chrome";v="90"',        'sec-ch-ua-mobile': '?0',        'Sec-Fetch-Dest': 'empty',        'Sec-Fetch-Mode': 'cors',        'Sec-Fetch-Site': 'cross-site',        'Host': 'busserver.cqyukexing.com',        'Origin': 'https://www.96096kp.com',        'Referer': 'https://www.96096kp.com/',        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/90.0.4430.72 Safari/537.36',    }    data = {        'departureName': startStation,        'destinationId': 'null',        'destinationName': endStation,        'opSource': '7',        # 指定日期时间戳        'queryDate': timeStamp,    }    data = json.dumps(data)    url = 'https://busserver.cqyukexing.com/busticket/schedule_list_310?channel=7'    response = requests.post(url, headers=headers, data=data, timeout=5)    if response.status_code == 200:        html = response.text        # print(html)        return html

3.将返回的数据解析

因为请求获得的数据是json格式的,所以用jsonpath做数据解析

def parse_html(html):    # 解析获取的数据    items = []    html = json.loads(html)    for i in range(len(jsonpath.jsonpath(html, '$..scheduleInfo'))):        item = {}        timeStamp = jsonpath.jsonpath(html, '$..scheduleInfo..departureTime')[i]        item["发车日期"] = time.strftime("%Y-%m-%d", time.localtime(timeStamp))        # 检测是否过期        out_data(item["发车日期"])        item["发车时间"] = jsonpath.jsonpath(html, '$..scheduleInfo..departureTimeDesc')[i]        item["起始站"] = jsonpath.jsonpath(html, '$..departureStation..name')[i]        # item["地址"] = jsonpath.jsonpath(html, '$..departureStation..addr')[i]        item["终点站"] = jsonpath.jsonpath(html, '$..destinationStation..name')[i]        item["余票"] = jsonpath.jsonpath(html, '$..scheduleInfo..remainSeatCnt')[i]        item["票价"] = jsonpath.jsonpath(html, '$..scheduleInfo..fullTicketPrice')[i]        item["车型"] = jsonpath.jsonpath(html, '$..scheduleInfo..busType')[i]        item["车牌号"] = jsonpath.jsonpath(html, '$..scheduleInfo..scheduleCode')[i]        item["路线"] = jsonpath.jsonpath(html, '$..scheduleInfo..lineName')[i][3:]        item["状态"] = '/033[32m' if item["余票"] > 0 else '/033[31m'        # item["途径"] = jsonpath.jsonpath(html, '$..scheduleInfo..stopStation')[i]        items.append(item)    return items

4.筛选出有票的车次

这里是将已经获取过的车次保存到文件中,一旦检测到新的车次,就准备通知,如果检测到没有新车次,不做通知

def watch_ticks(bus_list):    # 检查目前还有票的车次    format_info(bus_list)    has_ticks = []    filename = 'tick_log of ' + bus_list[0]["起始站"] + '-' + bus_list[0]["终点站"] + '.txt'    # 如果log文件不存在,则新建一个空的文件    if not os.path.exists('./logs/' + filename):        f = open('./logs/' + filename, 'w')        f.close()    with open('./logs/' + filename, 'r+', encoding='utf-8') as file:        alreald_send = file.read()    for bus in bus_list:        if bus["余票"] != 0 and bus["发车时间"] not in alreald_send or not len(alreald_send):            has_ticks.append(bus)            with open('./logs/tick_log of ' + bus["起始站"] + '-' + bus["终点站"] + '.txt', 'a+', encoding='utf-8') as file:                file.write(bus["发车时间"] + '/n')    # print(has_ticks)    return has_ticks

5.格式化终端输出信息

输出车程信息,这里改了终端车次显示的颜色,有票的是绿色、没票的是红色,很快就能识别出自己想要的

def format_info(bus_list):    print(bus_list[0]["发车日期"] + '/t' + bus_list[0]["起始站"] + '-' + bus_list[0]["终点站"])    print('-' * 120)    # print("/t发车时间"    #       "/t/t/t起始站"    #       "/t/t/t终点站"    #       "/t/t余票"    #       "/t/t票价"    #       "/t/t路线"    #       "/t/t车型"    #       "/t/t车牌号")    for bus in bus_list:        print(bus["状态"] + "/t" + bus["发车时间"],              "/t/t" + bus["起始站"],              "/t/t" + bus["终点站"],              "/t/t" + str(bus["余票"]),              "/t/t/t" + str(bus["票价"]),              "/t/t" + bus["路线"],              "/t/t" + bus["车型"],              "/t/t" + bus["车牌号"] + '/033[0m')    print('-' * 120)

6.设定邮件通知

这里代码是以前的,我直接拿来改了一下

def send_email(sendUser, mail_user, mail_pass, receivers, start, end, tick_date, message):    """发送邮件"""    # 第三方 SMTP 服务    mail_host = 'smtp.qq.com'  # 设置服务器    sender = mail_user    # 创建一个带附件的案例    mail = MIMEMultipart()    mail['From'] = Header(sendUser, 'utf-8')    mail['To'] = ";".join(receivers)    subject = '愉客行有新的票务情况:' + tick_date + '-' + start + '-' + end  # 邮件标题    mail['Subject'] = Header(subject, 'utf-8')    # 邮件正文内容    mail.attach(MIMEText(message, 'plain', 'utf-8'))    try:        smtpObj = smtplib.SMTP()        smtpObj.connect(mail_host, 25)  # 25为端口号        smtpObj.login(mail_user, mail_pass)        smtpObj.sendmail(sender, receivers, mail.as_string())        print(receivers + "/t发送成功")  # 邮件发送成功    except Exception as e:        pass    finally:        smtpObj.quit()

7.设定主函数

这里把用户输入的信息转换一下,将日期转为时间戳,并且可支持多车程的监控,配置文件应一一对应。
将获取到的车程信息保存
如果有变化,立刻发送邮件通知
设定了定时执行,这里是每隔30分钟执行一次

def main():    global timer_times    timer_times = timer_times + 1    for i in range(len(startStation)):        html = get_html(startStation[i], endStation[i], timeStamp[i])        bus_list = parse_html(html)        # pprint.pprint(bus_list)        has_ticks = watch_ticks(bus_list)        json.dump(bus_list,                  open('./data/bus_list of ' + startStation[i] + '-' + endStation[i] + '.json', 'a+', encoding='utf-8'),                  ensure_ascii=False)        if len(has_ticks):            json.dump(has_ticks, open('./data/has_ticks of ' + startStation[i] + '-' + endStation[i] + '.json', 'w+',                                      encoding='utf-8'), ensure_ascii=False)            message = '/n'.join([str(tick).replace(',', '/n') for tick in has_ticks])            send_email(sendUser[i], mail_user[i], mail_pass[i], receivers[i], startStation[i], endStation[i],                       ticksDate[i], message)    # 定时延迟    now = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())    log_message = ("/n定时任务已触发至:第%s轮/n当前时间:%s/n" % (timer_times, now))    with open("./logs/log.txt", 'a+', encoding="utf-8") as file:        file.write(log_message)    print(log_message)    time.sleep(1800)    timer = threading.Timer(1800, main())    timer.start()

8.程序入口

获取config.json文件的信息,执行main函数,开始定时任务

if __name__ == '__main__':    with open('config.json', 'r', encoding='utf-8') as file:        config = json.load(file)    startStation = config["起始站"]    endStation = config["终点站"]    ticksDate = config["车票日期"]    timeArray = [time.strptime(tick_date + ' 00:00:00', "%Y-%m-%d %H:%M:%S") for tick_date in config["车票日期"]]    timeStamp = [int(time.mktime(times)) for times in timeArray]    sendUser = config["发送人"]    mail_user = config["用户名"]    mail_pass = config["第三方客户端授权码"]    receivers = config["接收方"]    # 定时延迟    timer_times = 0    timer = threading.Timer(1800, main())    timer.start()

本来是想挂到服务器上,就做了一个检测日期的函数,如果车程日期在当前日期之前,就直接退出程序,最后还是在本地上运行的,就没用的上

def out_data(date):    # 检查车票跟踪是否过时    # 是否过期一天    tomorrow = datetime.date.today() - datetime.timedelta(days=1)    if date == tomorrow:        print("车票跟踪已过时!")        os.exit(0)

9.结果图

在这里插入图片描述

二、目录结构

在这里插入图片描述

三、完整代码

import datetimeimport osimport smtplibimport threadingimport timefrom email.header import Headerfrom email.mime.multipart import MIMEMultipartfrom email.mime.text import MIMETextimport requestsimport jsonimport jsonpathdef get_html(startStation, endStation, timeStamp):    # 模拟请求    headers = {        'Accept': 'application/json, text/javascript, */*; q=0.01',        'Accept-Encoding': 'gzip, deflate, br',        'Accept-Language': 'zh-CN,zh;q=0.9,en-US;q=0.8,en;q=0.7,zh-HK;q=0.6',        'Connection': 'keep-alive',        'Content-Length': '124',        'Content-Type': 'application/json; charset=UTF-8',        'sec-ch-ua': '" Not A;Brand";v="99", "Chromium";v="90", "Google Chrome";v="90"',        'sec-ch-ua-mobile': '?0',        'Sec-Fetch-Dest': 'empty',        'Sec-Fetch-Mode': 'cors',        'Sec-Fetch-Site': 'cross-site',        'Host': 'busserver.cqyukexing.com',        'Origin': 'https://www.96096kp.com',        'Referer': 'https://www.96096kp.com/',        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/90.0.4430.72 Safari/537.36',    }    data = {        'departureName': startStation,        'destinationId': 'null',        'destinationName': endStation,        'opSource': '7',        # 指定日期时间戳        'queryDate': timeStamp,    }    data = json.dumps(data)    url = 'https://busserver.cqyukexing.com/busticket/schedule_list_310?channel=7'    response = requests.post(url, headers=headers, data=data, timeout=5)    if response.status_code == 200:        html = response.text        # print(html)        return htmldef parse_html(html):    # 解析获取的数据    items = []    html = json.loads(html)    for i in range(len(jsonpath.jsonpath(html, '$..scheduleInfo'))):        item = {}        timeStamp = jsonpath.jsonpath(html, '$..scheduleInfo..departureTime')[i]        item["发车日期"] = time.strftime("%Y-%m-%d", time.localtime(timeStamp))        # 检测是否过期        out_data(item["发车日期"])        item["发车时间"] = jsonpath.jsonpath(html, '$..scheduleInfo..departureTimeDesc')[i]        item["起始站"] = jsonpath.jsonpath(html, '$..departureStation..name')[i]        # item["地址"] = jsonpath.jsonpath(html, '$..departureStation..addr')[i]        item["终点站"] = jsonpath.jsonpath(html, '$..destinationStation..name')[i]        item["余票"] = jsonpath.jsonpath(html, '$..scheduleInfo..remainSeatCnt')[i]        item["票价"] = jsonpath.jsonpath(html, '$..scheduleInfo..fullTicketPrice')[i]        item["车型"] = jsonpath.jsonpath(html, '$..scheduleInfo..busType')[i]        item["车牌号"] = jsonpath.jsonpath(html, '$..scheduleInfo..scheduleCode')[i]        item["路线"] = jsonpath.jsonpath(html, '$..scheduleInfo..lineName')[i][3:]        item["状态"] = '/033[32m' if item["余票"] > 0 else '/033[31m'        # item["途径"] = jsonpath.jsonpath(html, '$..scheduleInfo..stopStation')[i]        items.append(item)    return itemsdef watch_ticks(bus_list):    # 检查目前还有票的车次    format_info(bus_list)    has_ticks = []    filename = 'tick_log of ' + bus_list[0]["起始站"] + '-' + bus_list[0]["终点站"] + '.txt'    # 如果log文件不存在,则新建一个空的文件    if not os.path.exists('./logs/' + filename):        f = open('./logs/' + filename, 'w')        f.close()    with open('./logs/' + filename, 'r+', encoding='utf-8') as file:        alreald_send = file.read()    for bus in bus_list:        if bus["余票"] != 0 and bus["发车时间"] not in alreald_send or not len(alreald_send):            has_ticks.append(bus)            with open('./logs/tick_log of ' + bus["起始站"] + '-' + bus["终点站"] + '.txt', 'a+', encoding='utf-8') as file:                file.write(bus["发车时间"] + '/n')    # print(has_ticks)    return has_ticksdef out_data(date):    # 检查车票跟踪是否过时    # 是否过期一天    tomorrow = datetime.date.today() - datetime.timedelta(days=1)    if date == tomorrow:        print("车票跟踪已过时!")        os.exit(0)def format_info(bus_list):    print(bus_list[0]["发车日期"] + '/t' + bus_list[0]["起始站"] + '-' + bus_list[0]["终点站"])    print('-' * 120)    # print("/t发车时间"    #       "/t/t/t起始站"    #       "/t/t/t终点站"    #       "/t/t余票"    #       "/t/t票价"    #       "/t/t路线"    #       "/t/t车型"    #       "/t/t车牌号")    for bus in bus_list:        print(bus["状态"] + "/t" + bus["发车时间"],              "/t/t" + bus["起始站"],              "/t/t" + bus["终点站"],              "/t/t" + str(bus["余票"]),              "/t/t/t" + str(bus["票价"]),              "/t/t" + bus["路线"],              "/t/t" + bus["车型"],              "/t/t" + bus["车牌号"] + '/033[0m')    print('-' * 120)def send_email(sendUser, mail_user, mail_pass, receivers, start, end, tick_date, message):    """发送邮件"""    # 第三方 SMTP 服务    mail_host = 'smtp.qq.com'  # 设置服务器    sender = mail_user    # 创建一个带附件的案例    mail = MIMEMultipart()    mail['From'] = Header(sendUser, 'utf-8')    mail['To'] = ";".join(receivers)    subject = '愉客行有新的票务情况:' + tick_date + '-' + start + '-' + end  # 邮件标题    mail['Subject'] = Header(subject, 'utf-8')    # 邮件正文内容    mail.attach(MIMEText(message, 'plain', 'utf-8'))    try:        smtpObj = smtplib.SMTP()        smtpObj.connect(mail_host, 25)  # 25为端口号        smtpObj.login(mail_user, mail_pass)        smtpObj.sendmail(sender, receivers, mail.as_string())        print(receivers + "/t发送成功")  # 邮件发送成功    except Exception as e:        pass    finally:        smtpObj.quit()def main():    global timer_times    timer_times = timer_times + 1    for i in range(len(startStation)):        html = get_html(startStation[i], endStation[i], timeStamp[i])        bus_list = parse_html(html)        # pprint.pprint(bus_list)        has_ticks = watch_ticks(bus_list)        json.dump(bus_list,                  open('./data/bus_list of ' + startStation[i] + '-' + endStation[i] + '.json', 'a+', encoding='utf-8'),                  ensure_ascii=False)        if len(has_ticks):            json.dump(has_ticks, open('./data/has_ticks of ' + startStation[i] + '-' + endStation[i] + '.json', 'w+',                                      encoding='utf-8'), ensure_ascii=False)            message = '/n'.join([str(tick).replace(',', '/n') for tick in has_ticks])            send_email(sendUser[i], mail_user[i], mail_pass[i], receivers[i], startStation[i], endStation[i],                       ticksDate[i], message)    # 定时延迟    now = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())    log_message = ("/n定时任务已触发至:第%s轮/n当前时间:%s/n" % (timer_times, now))    with open("./logs/log.txt", 'a+', encoding="utf-8") as file:        file.write(log_message)    print(log_message)    time.sleep(1800)    timer = threading.Timer(1800, main())    timer.start()if __name__ == '__main__':    with open('config.json', 'r', encoding='utf-8') as file:        config = json.load(file)    startStation = config["起始站"]    endStation = config["终点站"]    ticksDate = config["车票日期"]    timeArray = [time.strptime(tick_date + ' 00:00:00', "%Y-%m-%d %H:%M:%S") for tick_date in config["车票日期"]]    timeStamp = [int(time.mktime(times)) for times in timeArray]    sendUser = config["发送人"]    mail_user = config["用户名"]    mail_pass = config["第三方客户端授权码"]    receivers = config["接收方"]    # 定时延迟    timer_times = 0    timer = threading.Timer(1800, main())    timer.start()

四、config.json文件

{  "车票日期": [    "2021-4-30",    "2021-5-5"  ],  "起始站": [    "万州",    "彭水县"  ],  "终点站": [    "涪陵",    "万州"  ],  "发送人": [    "愉客行",    "愉客行"  ],  "用户名": [    "1*******27@qq.com",    "1*******27@qq.com"  ],  "第三方客户端授权码": [    "oxms********iicj",    "oxms********iicj"  ],  "接收方": [    "265******8@qq.com",    "265******8@qq.com"  ]}

到此这篇关于教你怎么用python监控愉客行车程的文章就介绍到这了,更多相关python监控愉客行车程内容请搜索 以前的文章或继续浏览下面的相关文章希望大家以后多多支持 !

相关文章

发表新评论