您好,欢迎来到三六零分类信息网!老站,搜索引擎当天收录,欢迎发信息

详细解析Python实现定时任务之apscheduler

2024/3/2 1:28:48发布16次查看
本篇文章给大家带来了关于python的相关知识,其中主要介绍了关于实现定时任务的相关问题,可以使用第三方包来管理定时任务,相对来说apscheduler使用起来更简单,下面一起来看一下使用的方法,希望对大家有帮助。
【相关推荐:python3视频教程 】
初识apscheduler来个简单的例子看看apscheduler是如何使用的。
#encoding:utf-8from apscheduler.schedulers.blocking import blockingschedulerimport datetimedef sch_test(): now = datetime.datetime.now().strftime('%y-%m-%d %h:%m:%s') print('时间:{}, 测试apscheduler'.format(now))task = blockingscheduler()task.add_job(func=sch_test, trigger='cron', second='*/10')task.start()
上述例子很简单,我们首先要定义一个apscheduler的对象,然后add_job添加任务,最后start开启任务就行了。
例子是每隔10秒运行一次sch_test任务,运行结果如下:
时间:2022-10-08 15:16:30, 测试apscheduler时间:2022-10-08 15:16:40, 测试apscheduler时间:2022-10-08 15:16:50, 测试apscheduler时间:2022-10-08 15:17:00, 测试apscheduler
如果我们要在执行任务函数时携带参数,只要在add_job函数中添加args就行,比如task.add_job(func=sch_test, args=('a'), trigger='cron', second='*/10')。
apscheduler有哪些模块上面例子中我们初步了解到如何使用apschedulerl了,接下来需要知道apscheduler的设计框架。apscheduler有四个主要模块,分别是:触发器triggers、任务存储器job_stores、执行器executors、调度器schedulers。
1. 触发器triggers:
触发器指的是任务指定的触发方式,例子中我们用的是“cron”方式。我们可以选择cron、date、interval中的一个。
cron表示的是定时任务,类似linux crontab,在指定的时间触发。
可用参数如下:
除此之外,我们还可用表达式类型去设置cron。比如常用的有:
使用方法示例,在每天7点20分执行一次:
task.add_job(func=sch_test, args=('定时任务',), trigger='cron',
hour='7', minute='20')
date表示具体到某个时间的一次性任务;
使用方法示例:
# 使用run_date指定运行时间task.add_job(func='sch_test', trigger='date', run_date=datetime.datetime(2022 ,10 , 8, 16, 1, 30))# 或者用next_run_timetask.add_job(func=sch_test,trigger='date', next_run_time=datetime.datetime.now() + datetime.timedelta(seconds=3))
interval表示的是循环任务,指定一个间隔时间,每过间隔时间执行一次。
interval可设置如下的参数:
使用方法示例,每隔3秒执行一次sch_test任务:
task.add_job(func=sch_test, args=('循环任务',), trigger='interval', seconds=3)。
来个例子把3种触发器都使用一遍:
# encoding:utf-8from apscheduler.schedulers.blocking import blockingschedulerimport datetimedef sch_test(job_type): now = datetime.datetime.now().strftime('%y-%m-%d %h:%m:%s') print('时间:{}, {}测试apscheduler'.format(now, job_type))task = blockingscheduler()task.add_job(func=sch_test, args=('一次性任务',),trigger='date', next_run_time=datetime.datetime.now() + datetime.timedelta(seconds=3))task.add_job(func=sch_test, args=('定时任务',), trigger='cron', second='*/5')task.add_job(func=sch_test, args=('循环任务',), trigger='interval', seconds=3)task.start()
打印部分结果:
时间:2022-10-08 15:45:49, 一次性任务测试apscheduler时间:2022-10-08 15:45:49, 循环任务测试apscheduler时间:2022-10-08 15:45:50, 定时任务测试apscheduler时间:2022-10-08 15:45:52, 循环任务测试apscheduler时间:2022-10-08 15:45:55, 定时任务测试apscheduler时间:2022-10-08 15:45:55, 循环任务测试apscheduler时间:2022-10-08 15:45:58, 循环任务测试apscheduler
通过代码示例和结果展示,我们可清晰的知道不同触发器的使用区别。
2. 任务存储器job_stores
顾名思义,任务存储器是存储任务的地方,默认都是存储在内存中。我们也可自定义存储方式,比如将任务存到mysql中。这里有以下几种选择:
通常默认存储在内存即可,但若程序故障重启的话,会重新拉取任务运行了,如果你对任务的执行要求高,那么可以选择其他的存储器。
使用sqlalchemyjobstore存储器示例:
from apscheduler.schedulers.blocking import blockingschedulerdef sch_test(job_type): now = datetime.datetime.now().strftime('%y-%m-%d %h:%m:%s') print('时间:{}, {}测试apscheduler'.format(now, job_type))sched = blockingscheduler()# 使用mysql存储任务sql_url = 'mysql+pymysql://root:root@localhost:3306/db_name?charset=utf8'sched.add_jobstore('sqlalchemy',url=sql_url)# 添加任务sched.add_job(func=sch_test, args=('定时任务',), trigger='cron', second='*/5')sched.start()
3. 执行器executors
执行器的功能就是将任务放到线程池或进程池中运行。有以下几种选择:
默认是threadpoolexecutor, 常用的也就是第线程和进程池执行器。如果应用是cpu密集型操作,可用processpoolexecutor来执行。
4. 调度器schedulers
调度器属于apscheduler的核心,它扮演着统筹整个apscheduler系统的角色,存储器、执行器、触发器在它的调度下正常运行。调度器有以下几个:
不是特定场景下,我们最常用的是blockingscheduler调度器。
异常监听
定时任务在运行时,若出现错误,需要设置监听机制,我们通常结合logging模块记录错误信息。
使用示例:
from apscheduler.schedulers.blocking import blockingschedulerimport datetimefrom apscheduler.events import event_job_executed , event_job_errorimport logging# logging日志配置打印格式及保存位置logging.basicconfig(level=logging.info, format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s', datefmt='%y-%m-%d %h:%m:%s', filename='sche.log', filemode='a')def log_listen(event):if event.exception :print ( '任务出错,报错信息:{}'.format(event.exception))else:print ( '任务正常运行...' )def sch_test(job_type): now = datetime.datetime.now().strftime('%y-%m-%d %h:%m:%s') print('时间:{}, {}测试apscheduler'.format(now, job_type)) print(1/0)sched = blockingscheduler()# 使用mysql存储任务sql_url = 'mysql+pymysql://root:root@localhost:3306/db?charset=utf8'sched.add_jobstore('sqlalchemy',url=sql_url)# 添加任务sched.add_job(func=sch_test, args=('定时任务',), trigger='cron', second='*/5')# 配置任务执行完成及错误时的监听sched.add_listener(log_listen, event_job_executed | event_job_error)# 配置日志监听sched._logger = loggingsched.start()
apscheduler的封装使用
上面介绍了apscheduler框架的主要模块,我们基本能掌握怎样使用apscheduler了。下面就来封装一下apscheduler吧,以后要用直接在这份代码上修改就行了。
from apscheduler.schedulers.blocking import blockingschedulerfrom apscheduler.executors.pool import threadpoolexecutor, processpoolexecutorfrom apscheduler.events import event_job_executed , event_job_errorimport loggingimport logging.handlersimport osimport datetimeclass loggerutils(): def init_logger(self, logger_name): # 日志格式 formatter = logging.formatter('%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s') log_obj = logging.getlogger(logger_name) log_obj.setlevel(logging.info) # 设置log存储位置 path = '/data/logs/' filename = '{}{}.log'.format(path, logger_name) if not os.path.exists(path): os.makedirs(path) # 设置日志按照时间分割 timehandler = logging.handlers.timedrotatingfilehandler( filename, when='d', # 按照什么维度切割, s:秒,m:分,h:小时,d:天,w:周 interval=1, # 多少天切割一次 backupcount=10 # 保留几天 ) timehandler.setlevel(logging.info) timehandler.setformatter(formatter) log_obj.addhandler(timehandler) return log_objclass scheduler(loggerutils): def __init__(self): # 执行器设置 executors = { 'default': threadpoolexecutor(10), # 设置一个名为“default”的threadpoolexecutor,其worker值为10 'processpool': processpoolexecutor(5) # 设置一个名为“processpool”的processpoolexecutor,其worker值为5 } self.scheduler = blockingscheduler(timezone="asia/shanghai", executors=executors) # 存储器设置 # 这里使用sqlalchemy存储器,将任务存储在mysql sql_url = 'mysql+pymysql://root:root@localhost:3306/db?charset=utf8' self.scheduler.add_jobstore('sqlalchemy',url=sql_url) def log_listen(event): if event.exception: # 日志记录 self.scheduler._logger.error(event.traceback) # 配置任务执行完成及错误时的监听 self.scheduler.add_listener(log_listen, event_job_executed | event_job_error) # 配置日志监听 self.scheduler._logger = self.init_logger('sche_test') def add_job(self, *args, **kwargs): """添加任务""" self.scheduler.add_job(*args, **kwargs) def start(self): """开启任务""" self.scheduler.start()# 测试任务def sch_test(job_type): now = datetime.datetime.now().strftime('%y-%m-%d %h:%m:%s') print('时间:{}, {}测试apscheduler'.format(now, job_type)) print(1/0)# 添加任务,开启任务sched = scheduler()# 添加任务sched.add_job(func=sch_test, args=('定时任务',), trigger='cron', second='*/5')# 开启任务sched.start()
【相关推荐:python3视频教程 】
以上就是详细解析python实现定时任务之apscheduler的详细内容。
该用户其它信息

VIP推荐

免费发布信息,免费发布B2B信息网站平台 - 三六零分类信息网 沪ICP备09012988号-2
企业名录 Product