教你怎么用Python监控愉客行车程
一、愉客行车程监控并通知
大概思路:用户填写指定信息在config.json文件中,通过定时访问网页,获取指定信息,从而达到对指定车程的监控
1.分析网页
按下F12,打开开发者工具,再刷新一下网页
找到我们需要的信息
然后再分析一下它的请求方式
很直观的就看到了几条主要的信息
第一条和第三条是null不重要
第二条是起始站
第四条是终点站
第五条是个数字,经过反复尝试,发现是固定参数
第六条乍一看应该是时间戳,经过验证,的确是车票指定日期零点的时间戳
2.请求头伪装、带参访问指定网页,获取信息:
def get_html(startStation, endStation, timeStamp): # 模拟请求 headers = { 'Accept': 'application/json, text/javascript, */*; q=0.01', 'Accept-Encoding': 'gzip, deflate, br', 'Accept-Language': 'zh-CN,zh;q=0.9,en-US;q=0.8,en;q=0.7,zh-HK;q=0.6', 'Connection': 'keep-alive', 'Content-Length': '124', 'Content-Type': 'application/json; charset=UTF-8', 'sec-ch-ua': '" Not A;Brand";v="99", "Chromium";v="90", "Google Chrome";v="90"', 'sec-ch-ua-mobile': '?0', 'Sec-Fetch-Dest': 'empty', 'Sec-Fetch-Mode': 'cors', 'Sec-Fetch-Site': 'cross-site', 'Host': 'busserver.cqyukexing.com', 'Origin': 'https://www.96096kp.com', 'Referer': 'https://www.96096kp.com/', 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/90.0.4430.72 Safari/537.36', } data = { 'departureName': startStation, 'destinationId': 'null', 'destinationName': endStation, 'opSource': '7', # 指定日期时间戳 'queryDate': timeStamp, } data = json.dumps(data) url = 'https://busserver.cqyukexing.com/busticket/schedule_list_310?channel=7' response = requests.post(url, headers=headers, data=data, timeout=5) if response.status_code == 200: html = response.text # print(html) return html
3.将返回的数据解析
因为请求获得的数据是json格式的,所以用jsonpath做数据解析
def parse_html(html): # 解析获取的数据 items = [] html = json.loads(html) for i in range(len(jsonpath.jsonpath(html, '$..scheduleInfo'))): item = {} timeStamp = jsonpath.jsonpath(html, '$..scheduleInfo..departureTime')[i] item["发车日期"] = time.strftime("%Y-%m-%d", time.localtime(timeStamp)) # 检测是否过期 out_data(item["发车日期"]) item["发车时间"] = jsonpath.jsonpath(html, '$..scheduleInfo..departureTimeDesc')[i] item["起始站"] = jsonpath.jsonpath(html, '$..departureStation..name')[i] # item["地址"] = jsonpath.jsonpath(html, '$..departureStation..addr')[i] item["终点站"] = jsonpath.jsonpath(html, '$..destinationStation..name')[i] item["余票"] = jsonpath.jsonpath(html, '$..scheduleInfo..remainSeatCnt')[i] item["票价"] = jsonpath.jsonpath(html, '$..scheduleInfo..fullTicketPrice')[i] item["车型"] = jsonpath.jsonpath(html, '$..scheduleInfo..busType')[i] item["车牌号"] = jsonpath.jsonpath(html, '$..scheduleInfo..scheduleCode')[i] item["路线"] = jsonpath.jsonpath(html, '$..scheduleInfo..lineName')[i][3:] item["状态"] = '/033[32m' if item["余票"] > 0 else '/033[31m' # item["途径"] = jsonpath.jsonpath(html, '$..scheduleInfo..stopStation')[i] items.append(item) return items
4.筛选出有票的车次
这里是将已经获取过的车次保存到文件中,一旦检测到新的车次,就准备通知,如果检测到没有新车次,不做通知
def watch_ticks(bus_list): # 检查目前还有票的车次 format_info(bus_list) has_ticks = [] filename = 'tick_log of ' + bus_list[0]["起始站"] + '-' + bus_list[0]["终点站"] + '.txt' # 如果log文件不存在,则新建一个空的文件 if not os.path.exists('./logs/' + filename): f = open('./logs/' + filename, 'w') f.close() with open('./logs/' + filename, 'r+', encoding='utf-8') as file: alreald_send = file.read() for bus in bus_list: if bus["余票"] != 0 and bus["发车时间"] not in alreald_send or not len(alreald_send): has_ticks.append(bus) with open('./logs/tick_log of ' + bus["起始站"] + '-' + bus["终点站"] + '.txt', 'a+', encoding='utf-8') as file: file.write(bus["发车时间"] + '/n') # print(has_ticks) return has_ticks
5.格式化终端输出信息
输出车程信息,这里改了终端车次显示的颜色,有票的是绿色、没票的是红色,很快就能识别出自己想要的
def format_info(bus_list): print(bus_list[0]["发车日期"] + '/t' + bus_list[0]["起始站"] + '-' + bus_list[0]["终点站"]) print('-' * 120) # print("/t发车时间" # "/t/t/t起始站" # "/t/t/t终点站" # "/t/t余票" # "/t/t票价" # "/t/t路线" # "/t/t车型" # "/t/t车牌号") for bus in bus_list: print(bus["状态"] + "/t" + bus["发车时间"], "/t/t" + bus["起始站"], "/t/t" + bus["终点站"], "/t/t" + str(bus["余票"]), "/t/t/t" + str(bus["票价"]), "/t/t" + bus["路线"], "/t/t" + bus["车型"], "/t/t" + bus["车牌号"] + '/033[0m') print('-' * 120)
6.设定邮件通知
这里代码是以前的,我直接拿来改了一下
def send_email(sendUser, mail_user, mail_pass, receivers, start, end, tick_date, message): """发送邮件""" # 第三方 SMTP 服务 mail_host = 'smtp.qq.com' # 设置服务器 sender = mail_user # 创建一个带附件的案例 mail = MIMEMultipart() mail['From'] = Header(sendUser, 'utf-8') mail['To'] = ";".join(receivers) subject = '愉客行有新的票务情况:' + tick_date + '-' + start + '-' + end # 邮件标题 mail['Subject'] = Header(subject, 'utf-8') # 邮件正文内容 mail.attach(MIMEText(message, 'plain', 'utf-8')) try: smtpObj = smtplib.SMTP() smtpObj.connect(mail_host, 25) # 25为端口号 smtpObj.login(mail_user, mail_pass) smtpObj.sendmail(sender, receivers, mail.as_string()) print(receivers + "/t发送成功") # 邮件发送成功 except Exception as e: pass finally: smtpObj.quit()
7.设定主函数
这里把用户输入的信息转换一下,将日期转为时间戳,并且可支持多车程的监控,配置文件应一一对应。
将获取到的车程信息保存
如果有变化,立刻发送邮件通知
设定了定时执行,这里是每隔30分钟执行一次
def main(): global timer_times timer_times = timer_times + 1 for i in range(len(startStation)): html = get_html(startStation[i], endStation[i], timeStamp[i]) bus_list = parse_html(html) # pprint.pprint(bus_list) has_ticks = watch_ticks(bus_list) json.dump(bus_list, open('./data/bus_list of ' + startStation[i] + '-' + endStation[i] + '.json', 'a+', encoding='utf-8'), ensure_ascii=False) if len(has_ticks): json.dump(has_ticks, open('./data/has_ticks of ' + startStation[i] + '-' + endStation[i] + '.json', 'w+', encoding='utf-8'), ensure_ascii=False) message = '/n'.join([str(tick).replace(',', '/n') for tick in has_ticks]) send_email(sendUser[i], mail_user[i], mail_pass[i], receivers[i], startStation[i], endStation[i], ticksDate[i], message) # 定时延迟 now = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime()) log_message = ("/n定时任务已触发至:第%s轮/n当前时间:%s/n" % (timer_times, now)) with open("./logs/log.txt", 'a+', encoding="utf-8") as file: file.write(log_message) print(log_message) time.sleep(1800) timer = threading.Timer(1800, main()) timer.start()
8.程序入口
获取config.json文件的信息,执行main函数,开始定时任务
if __name__ == '__main__': with open('config.json', 'r', encoding='utf-8') as file: config = json.load(file) startStation = config["起始站"] endStation = config["终点站"] ticksDate = config["车票日期"] timeArray = [time.strptime(tick_date + ' 00:00:00', "%Y-%m-%d %H:%M:%S") for tick_date in config["车票日期"]] timeStamp = [int(time.mktime(times)) for times in timeArray] sendUser = config["发送人"] mail_user = config["用户名"] mail_pass = config["第三方客户端授权码"] receivers = config["接收方"] # 定时延迟 timer_times = 0 timer = threading.Timer(1800, main()) timer.start()
本来是想挂到服务器上,就做了一个检测日期的函数,如果车程日期在当前日期之前,就直接退出程序,最后还是在本地上运行的,就没用的上
def out_data(date): # 检查车票跟踪是否过时 # 是否过期一天 tomorrow = datetime.date.today() - datetime.timedelta(days=1) if date == tomorrow: print("车票跟踪已过时!") os.exit(0)
9.结果图
二、目录结构
三、完整代码
import datetimeimport osimport smtplibimport threadingimport timefrom email.header import Headerfrom email.mime.multipart import MIMEMultipartfrom email.mime.text import MIMETextimport requestsimport jsonimport jsonpathdef get_html(startStation, endStation, timeStamp): # 模拟请求 headers = { 'Accept': 'application/json, text/javascript, */*; q=0.01', 'Accept-Encoding': 'gzip, deflate, br', 'Accept-Language': 'zh-CN,zh;q=0.9,en-US;q=0.8,en;q=0.7,zh-HK;q=0.6', 'Connection': 'keep-alive', 'Content-Length': '124', 'Content-Type': 'application/json; charset=UTF-8', 'sec-ch-ua': '" Not A;Brand";v="99", "Chromium";v="90", "Google Chrome";v="90"', 'sec-ch-ua-mobile': '?0', 'Sec-Fetch-Dest': 'empty', 'Sec-Fetch-Mode': 'cors', 'Sec-Fetch-Site': 'cross-site', 'Host': 'busserver.cqyukexing.com', 'Origin': 'https://www.96096kp.com', 'Referer': 'https://www.96096kp.com/', 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/90.0.4430.72 Safari/537.36', } data = { 'departureName': startStation, 'destinationId': 'null', 'destinationName': endStation, 'opSource': '7', # 指定日期时间戳 'queryDate': timeStamp, } data = json.dumps(data) url = 'https://busserver.cqyukexing.com/busticket/schedule_list_310?channel=7' response = requests.post(url, headers=headers, data=data, timeout=5) if response.status_code == 200: html = response.text # print(html) return htmldef parse_html(html): # 解析获取的数据 items = [] html = json.loads(html) for i in range(len(jsonpath.jsonpath(html, '$..scheduleInfo'))): item = {} timeStamp = jsonpath.jsonpath(html, '$..scheduleInfo..departureTime')[i] item["发车日期"] = time.strftime("%Y-%m-%d", time.localtime(timeStamp)) # 检测是否过期 out_data(item["发车日期"]) item["发车时间"] = jsonpath.jsonpath(html, '$..scheduleInfo..departureTimeDesc')[i] item["起始站"] = jsonpath.jsonpath(html, '$..departureStation..name')[i] # item["地址"] = jsonpath.jsonpath(html, '$..departureStation..addr')[i] item["终点站"] = jsonpath.jsonpath(html, '$..destinationStation..name')[i] item["余票"] = jsonpath.jsonpath(html, '$..scheduleInfo..remainSeatCnt')[i] item["票价"] = jsonpath.jsonpath(html, '$..scheduleInfo..fullTicketPrice')[i] item["车型"] = jsonpath.jsonpath(html, '$..scheduleInfo..busType')[i] item["车牌号"] = jsonpath.jsonpath(html, '$..scheduleInfo..scheduleCode')[i] item["路线"] = jsonpath.jsonpath(html, '$..scheduleInfo..lineName')[i][3:] item["状态"] = '/033[32m' if item["余票"] > 0 else '/033[31m' # item["途径"] = jsonpath.jsonpath(html, '$..scheduleInfo..stopStation')[i] items.append(item) return itemsdef watch_ticks(bus_list): # 检查目前还有票的车次 format_info(bus_list) has_ticks = [] filename = 'tick_log of ' + bus_list[0]["起始站"] + '-' + bus_list[0]["终点站"] + '.txt' # 如果log文件不存在,则新建一个空的文件 if not os.path.exists('./logs/' + filename): f = open('./logs/' + filename, 'w') f.close() with open('./logs/' + filename, 'r+', encoding='utf-8') as file: alreald_send = file.read() for bus in bus_list: if bus["余票"] != 0 and bus["发车时间"] not in alreald_send or not len(alreald_send): has_ticks.append(bus) with open('./logs/tick_log of ' + bus["起始站"] + '-' + bus["终点站"] + '.txt', 'a+', encoding='utf-8') as file: file.write(bus["发车时间"] + '/n') # print(has_ticks) return has_ticksdef out_data(date): # 检查车票跟踪是否过时 # 是否过期一天 tomorrow = datetime.date.today() - datetime.timedelta(days=1) if date == tomorrow: print("车票跟踪已过时!") os.exit(0)def format_info(bus_list): print(bus_list[0]["发车日期"] + '/t' + bus_list[0]["起始站"] + '-' + bus_list[0]["终点站"]) print('-' * 120) # print("/t发车时间" # "/t/t/t起始站" # "/t/t/t终点站" # "/t/t余票" # "/t/t票价" # "/t/t路线" # "/t/t车型" # "/t/t车牌号") for bus in bus_list: print(bus["状态"] + "/t" + bus["发车时间"], "/t/t" + bus["起始站"], "/t/t" + bus["终点站"], "/t/t" + str(bus["余票"]), "/t/t/t" + str(bus["票价"]), "/t/t" + bus["路线"], "/t/t" + bus["车型"], "/t/t" + bus["车牌号"] + '/033[0m') print('-' * 120)def send_email(sendUser, mail_user, mail_pass, receivers, start, end, tick_date, message): """发送邮件""" # 第三方 SMTP 服务 mail_host = 'smtp.qq.com' # 设置服务器 sender = mail_user # 创建一个带附件的案例 mail = MIMEMultipart() mail['From'] = Header(sendUser, 'utf-8') mail['To'] = ";".join(receivers) subject = '愉客行有新的票务情况:' + tick_date + '-' + start + '-' + end # 邮件标题 mail['Subject'] = Header(subject, 'utf-8') # 邮件正文内容 mail.attach(MIMEText(message, 'plain', 'utf-8')) try: smtpObj = smtplib.SMTP() smtpObj.connect(mail_host, 25) # 25为端口号 smtpObj.login(mail_user, mail_pass) smtpObj.sendmail(sender, receivers, mail.as_string()) print(receivers + "/t发送成功") # 邮件发送成功 except Exception as e: pass finally: smtpObj.quit()def main(): global timer_times timer_times = timer_times + 1 for i in range(len(startStation)): html = get_html(startStation[i], endStation[i], timeStamp[i]) bus_list = parse_html(html) # pprint.pprint(bus_list) has_ticks = watch_ticks(bus_list) json.dump(bus_list, open('./data/bus_list of ' + startStation[i] + '-' + endStation[i] + '.json', 'a+', encoding='utf-8'), ensure_ascii=False) if len(has_ticks): json.dump(has_ticks, open('./data/has_ticks of ' + startStation[i] + '-' + endStation[i] + '.json', 'w+', encoding='utf-8'), ensure_ascii=False) message = '/n'.join([str(tick).replace(',', '/n') for tick in has_ticks]) send_email(sendUser[i], mail_user[i], mail_pass[i], receivers[i], startStation[i], endStation[i], ticksDate[i], message) # 定时延迟 now = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime()) log_message = ("/n定时任务已触发至:第%s轮/n当前时间:%s/n" % (timer_times, now)) with open("./logs/log.txt", 'a+', encoding="utf-8") as file: file.write(log_message) print(log_message) time.sleep(1800) timer = threading.Timer(1800, main()) timer.start()if __name__ == '__main__': with open('config.json', 'r', encoding='utf-8') as file: config = json.load(file) startStation = config["起始站"] endStation = config["终点站"] ticksDate = config["车票日期"] timeArray = [time.strptime(tick_date + ' 00:00:00', "%Y-%m-%d %H:%M:%S") for tick_date in config["车票日期"]] timeStamp = [int(time.mktime(times)) for times in timeArray] sendUser = config["发送人"] mail_user = config["用户名"] mail_pass = config["第三方客户端授权码"] receivers = config["接收方"] # 定时延迟 timer_times = 0 timer = threading.Timer(1800, main()) timer.start()
四、config.json文件
{ "车票日期": [ "2021-4-30", "2021-5-5" ], "起始站": [ "万州", "彭水县" ], "终点站": [ "涪陵", "万州" ], "发送人": [ "愉客行", "愉客行" ], "用户名": [ "1*******27@qq.com", "1*******27@qq.com" ], "第三方客户端授权码": [ "oxms********iicj", "oxms********iicj" ], "接收方": [ "265******8@qq.com", "265******8@qq.com" ]}
到此这篇关于教你怎么用python监控愉客行车程的文章就介绍到这了,更多相关python监控愉客行车程内容请搜索 以前的文章或继续浏览下面的相关文章希望大家以后多多支持 !
最后更新于 2021-11-23 09:11:27 并被添加「」标签,已有 位童鞋阅读过。
本站使用「署名 4.0 国际」创作共享协议,可自由转载、引用,但需署名作者且注明文章出处
相关文章
- Python OpenCV快速入门教程
- 在u盘安装Mac OS最简单的方法无需复杂的替换/恢复
- 【燕之坊心意黄小米475g】赤峰特产新鲜小米粥黄金苗五谷杂粮粗粮
- 青蛙王子宝宝霜滋润保湿补水擦脸婴儿润肤乳身体乳儿童润肤面霜
- 简约风卡包防盗刷精致小巧大容量多卡位驾驶证男女日系防磁卡套