时间:2023-03-15来源:系统城装机大师作者:佚名
定时任务触发方式有几种类型,日常的工作中,研发同学运用比较多的就是cron方式
查了一下APScheduler框架内支持多种定时任务方式
首先先安装apscheduler模块
1 | $ pip install apscheduler |
代码如下:(在方法内注释了各种时间参数的定义与范围)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 |
from apscheduler.schedulers.blocking import BlockingScheduler class Timing: def __init__( self , start_date, end_date, hour = None ): self .start_date = start_date self .end_date = end_date self .hour = hour def cron( self , job, * value_list): """cron格式 在特定时间周期性地触发""" # year (int 或 str) – 年,4位数字 # month (int 或 str) – 月 (范围1-12) # day (int 或 str) – 日 (范围1-31) # week (int 或 str) – 周 (范围1-53) # day_of_week (int 或 str) – 周内第几天或者星期几 (范围0-6 或者 mon,tue,wed,thu,fri,sat,sun) # hour (int 或 str) – 时 (范围0-23) # minute (int 或 str) – 分 (范围0-59) # second (int 或 str) – 秒 (范围0-59) # start_date (datetime 或 str) – 最早开始日期(包含) # end_date (datetime 或 str) – 分 最晚结束时间(包含) # timezone (datetime.tzinfo 或str) – 指定时区 scheduler = BlockingScheduler() scheduler.add_job(job, 'cron' , start_date = self .start_date, end_date = self .end_date, hour = self .hour, args = [ * value_list]) scheduler.start() def interval( self , job, * value_list): """interval格式 周期触发任务""" # weeks (int) - 间隔几周 # days (int) - 间隔几天 # hours (int) - 间隔几小时 # minutes (int) - 间隔几分钟 # seconds (int) - 间隔多少秒 # start_date (datetime 或 str) - 开始日期 # end_date (datetime 或 str) - 结束日期 # timezone (datetime.tzinfo 或str) - 时区 scheduler = BlockingScheduler() # 在 2019-08-29 22:15:00至2019-08-29 22:17:00期间,每隔1分30秒 运行一次 job 方法 scheduler.add_job(job, 'interval' , minutes = 1 , seconds = 30 , start_date = self .start_date, end_date = self .end_date, args = [ * value_list]) scheduler.start() @ staticmethod def date(job, * value_list): """date格式 特定时间点触发""" # run_date (datetime 或 str) - 作业的运行日期或时间 # timezone (datetime.tzinfo 或 str) - 指定时区 scheduler = BlockingScheduler() # 在 2019-8-30 01:00:01 运行一次 job 方法 scheduler.add_job(job, 'date' , run_date = '2019-8-30 01:00:00' , args = [ * value_list]) scheduler.start() |
封装的方法不是很通用,后面会优化一下代码,但最起码现在是能用的,哈哈哈哈哈哈
思考了一下思路,巡检触发任务,然后触发钉钉,所以定时任务应该是在最上层
之前分享的钉钉封装的代码内底部继续完善一下
1 2 3 4 5 6 7 8 |
if __name__ = = '__main__' : file_list = [ "test_shiyan.py" , "MeetSpringFestival.py" ] # run_py(file_list) case_list = [ "test_case_01" , "test_case_02" ] # run_case(test_sample, case_list) dingDing_list = [ 2 , case_list, test_sample] # run_dingDing(*dingDing_list) Timing( '2022-02-15 00:00:00' , '2022-02-16 00:00:00' , '0-23' ).cron(run_dingDing, * dingDing_list) |
把run_dingDing()的函数我们放在已经封装好的Timing().cron(run_dingDing,*dingDing_list)内,那么run_dingDing()内的参数我们通过元组的方式传入
就是我们上面写的这里能看到
1 2 3 4 |
def cron( self , job, * value_list): """cron格式 在特定时间周期性地触发""" scheduler.add_job(job, 'cron' , start_date = self .start_date, end_date = self .end_date, hour = self .hour, args = [ * value_list]) |
时间范围的填写我放在了Timing()初始化内,看着舒服一点
在运行Timing().cron()后就可以触发定时了,但是必须要开着电脑才可以,等后面开始研究平台,存储在服务器内就美吱吱了~
apscheduler 运行过程中出现类似如下报错:
Run time of job "9668_hack (trigger: interval[1:00:00], next run at: 2018-10-29 22:00:00 CST)" was missed by 0:01:47.387821Run time of job "9668_index (trigger: interval[0:30:00], next run at: 2018-10-29 21:30:00 CST)" was missed by 0:01:47.392574Run time of job "9669_deep (trigger: interval[1:00:00], next run at: 2018-10-29 22:00:00 CST)" was missed by 0:01:47.397622Run time of job "9669_hack (trigger: interval[1:00:00], next run at: 2018-10-29 22:00:00 CST)" was missed by 0:01:47.402938Run time of job "9669_index (trigger: interval[0:30:00], next run at: 2018-10-29 21:30:00 CST)" was missed by 0:01:47.407996
针对该问题百度是基本上指不上了,google到了关键配置,但仍然出现该报错,于是继续找资料,刨根问底这到是什么鬼问题导致的。
google 到的是github上的一个issue:https://github.com/agronholm/apscheduler/issues/146
里面说到了一个参数:misfire_grace_time,但是这个参数到底是干嘛用的,在其他地方找到了解释,其中涉及到几个其他参数,但是结合自己的理解综合总结一下
coalesce
:当由于某种原因导致某个job积攒了好几次没有实际运行(比如说系统挂了5分钟后恢复,有一个任务是每分钟跑一次的,按道理说这5分钟内本来是“计划”运行5次的,但实际没有执行),如果coalesce为True,下次这个job被submit给executor时,只会执行1次,也就是最后这次,如果为False,那么会执行5次(不一定,因为还有其他条件,看后面misfire_grace_time的解释)max_instance
:就是说同一个job同一时间最多有几个实例再跑,比如一个耗时10分钟的job,被指定每分钟运行1次,如果我们max_instance值为5,那么在第6~10分钟上,新的运行实例不会被执行,因为已经有5个实例在跑了misfire_grace_time
:设想和上述coalesce类似的场景,如果一个job本来14:00有一次执行,但是由于某种原因没有被调度上,现在14:01了,这个14:00的运行实例被提交时,会检查它预订运行的时间和当下时间的差值(这里是1分钟),大于我们设置的30秒限制,那么这个运行实例不会被执行。示例:
15分钟一次的的任务,misfire_grace_time 设置100秒,在0:06分的时候提示:
Run time of job "9392_index (trigger: interval[0:15:00], next run at: 2018-10-27 00:15:00 CST)" was missed by 0:06:03.931026
解释:
于是我修改了配置如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
class Config( object ): SCHEDULER_JOBSTORES = { 'default' : RedisJobStore(db = 3 ,host = '0.0.0.0' , port = 6378 ,password = '******' ), } SCHEDULER_EXECUTORS = { 'default' : { 'type' : 'processpool' , 'max_workers' : 50 } #用进程池提升任务处理效率 } SCHEDULER_JOB_DEFAULTS = { 'coalesce' : True , #积攒的任务只跑一次 'max_instances' : 1000 , #支持1000个实例并发 'misfire_grace_time' : 600 #600秒的任务超时容错 } SCHEDULER_API_ENABLED = True |
我本以为这样应该就没什么问题了,配置看似完美,但是现实是残忍的,盯着apscheduler日志看了一会,熟悉的“was missed by”又出现了,这时候就需要怀疑这个配置到底有没有生效了,然后发现果然没有生效,从/scheduler/jobs中可以看到任务:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 |
{ "id" : "9586_site_status" , "name" : "9586_site_status" , "func" : "monitor_scheduler:monitor_site_status" , "args" : [ 9586 , "http://sl.jxcn.cn/" , 1000 , 100 , 200 , "", 0 , 2 ], "kwargs" : {}, "trigger" : "interval" , "start_date" : "2018-09-14T00:00:00+08:00" , "end_date" : "2018-12-31T00:00:00+08:00" , "minutes" : 15 , "misfire_grace_time" : 10 , "max_instances" : 3000 , "next_run_time" : "2018-10-24T18:00:00+08:00" } |
可以看到任务中默认就有misfire_grace_time配置,没有改为600,折腾一会发现修改配置,重启与修改任务都不会生效,只能修改配置后删除任务重新添加(才能把这个默认配置用上),或者修改任务的时候把这个值改掉
1 | scheduler.modify_job(func = func, id = id , args = args, trigger = trigger, minutes = minutes,start_date = start_date,end_date = end_date,misfire_grace_time = 600 ) |
然后就可以了?图样图森破,missed 依然存在。
其实从后来的报错可以发现这个容错时间是用上的,因为从执行时间加上600秒后才出现的报错。
那么还是回到这个超时根本问题上,即使容错时间足够长,没有这个报错了,但是一个任务执行时间过长仍然是个根本问题,所以终极思路还在于如何优化executor的执行时间上。
当然这里根据不同的任务处理方式是不一样的,在于各自的代码了,比如更改链接方式、代码是否有冗余请求,是否可以改为异步执行,等等。
而我自己的任务解决方式为:由接口请求改为python模块直接传参,redis链接改为内网,极大提升执行效率,所以也就控制了执行超时问题。
以上为个人经验,希望能给大家一个参考。
2023-03-17
python flask项目打包成docker镜像发布的过程2023-03-17
python调试模块ipdb详解2023-03-17
python使用openai生成图像的超详细教程