背景

最近有个需求,需要实现一个定时或定期任务的功能,需要实现每月、每日、每时、一次性等需求,必须是轻量级不依赖其它额外组件,并能支持动态添加任务。

定时任务库对比

根据上面需求,从社区中找到了几个 Python 好用的任务调度库。有以下几个库:

  • schedule:Python job scheduling for humans. 轻量级,无需配置的作业调度库

  • python-crontab: 针对系统 Cron 操作 crontab 文件的作业调度库

  • Apscheduler:一个高级的 Python 任务调度库

  • Celery: 是一个简单,灵活,可靠的分布式系统,用于处理大量消息,同时为操作提供维护此类系统所需的工具, 也可用于任务调度

优缺点对比:

  • schedule 优点是简单、轻量级、无需配置、语法简单,缺点是阻塞式调用、无法动态添加或删除任务

  • Python-crontab 优点是针对于系统 crontab 操作,支持定时、定期任务,能够动态添加任务,不能实现一次性任务需求

  • Apscheduler 优点支持定时、定期、一次性任务,支持任务持久化及动态添加、支持配置各种持久化存储源(如 redis、MongoDB),支持接入到各种异步框架(如 gevent、asyncio、tornado)

  • Celery 支持配置定期任务、支持 crontab 模式配置,不支持一次性定时任务

schedule

Python 任务调度库,和 requests 库一样 for humans. 这个库也是最轻量级的一个任务调度库,schedule 允许用户使用简单、人性化的语法以预定的时间间隔定期运行Python函数(或其它可调用函数)。

直接使用 pip install schedule 进行安装使用,下面来看看官网给的示例:

	import schedule
	import time

	# 定义你要周期运行的函数
	def job():
		print("I'm working...")
	
	schedule.every(10).minutes.do(job) 				# 每隔 10 分钟运行一次 job 函数
	schedule.every().hour.do(job) 					# 每隔 1 小时运行一次 job 函数
	schedule.every().day.at("10:30").do(job) 		# 每天在 10:30 时间点运行 job 函数
	schedule.every().monday.do(job) 				# 每周一 运行一次 job 函数
	schedule.every().wednesday.at("13:15").do(job) 	# 每周三 13:15 时间点运行 job 函数
	schedule.every().minute.at(":17").do(job) 		# 每分钟的 17 秒时间点运行 job 函数
	
	while True:
		schedule.run_pending() 						# 运行所有可以运行的任务
		time.sleep(1)

schedule 常见问题

  1. 如何并行执行任务
    schedule 是阻塞式的,默认情况下, schedule 按顺序执行所有的作业,不能达到并行执行任务。如下所示:

    	import arrow
    	import schedule
    	
    	def job1():
    		print("job1 start time: %s" % arrow.get().format())
    		time.sleep(2)
    		print("job1 end time: %s" % arrow.get().format())
    	
    	def job2():
    		print("job2 start time: %s" % arrow.get().format())
    		time.sleep(5)
    		print("job2 end time: %s" % arrow.get().format())
    	
    	def job3():
    		print("job3 start time: %s" % arrow.get().format())
    		time.sleep(10)
    		print("job3 end time: %s" % arrow.get().format())
    	
    	if __name__ == '__main__':
    		schedule.every(10).seconds.do(job1)
    		schedule.every(30).seconds.do(job2)
    		schedule.every(5).to(10).seconds.do(job3)
    	
    		while True:
    			schedule.run_pending()
    

    返回部分结果如下所示,几个任务并不是并行开始的,是安装时间顺序先后开始的:

    	job3 start time: 2019-06-01 09:27:54+00:00
    	job3 end time: 2019-06-01 09:28:04+00:00
    	job1 start time: 2019-06-01 09:28:04+00:00
    	job1 end time: 2019-06-01 09:28:06+00:00
    	job3 start time: 2019-06-01 09:28:13+00:00
    	job3 end time: 2019-06-01 09:28:23+00:00
    	job2 start time: 2019-06-01 09:28:23+00:00
    	job2 end time: 2019-06-01 09:28:28+00:00
    	job1 start time: 2019-06-01 09:28:28+00:00
    	job1 end time: 2019-06-01 09:28:30+00:00
    	job3 start time: 2019-06-01 09:28:30+00:00
    	job3 end time: 2019-06-01 09:28:40+00:00
    	job1 start time: 2019-06-01 09:28:40+00:00
    	job1 end time: 2019-06-01 09:28:42+00:00
    

    如果需要实现并行,那么使用多线程方式运行任务,官方给出的并行方案如下:

    	import threading
    	import time
    	import schedule
    	
    	def job():
    		print("I'm running on thread %s" % threading.current_thread())
    	
    	def run_threaded(job_func):
    		job_thread = threading.Thread(target=job_func)
    		job_thread.start()
    	
    	schedule.every(10).seconds.do(run_threaded, job)
    	schedule.every(10).seconds.do(run_threaded, job)
    	schedule.every(10).seconds.do(run_threaded, job)
    	schedule.every(10).seconds.do(run_threaded, job)
    	schedule.every(10).seconds.do(run_threaded, job)
    	
    	while 1:
    		schedule.run_pending()
    		time.sleep(1)
    
    
    	# 可以通过 run_daemon_thread 起一个守护线程方式来达到动态添加任务的功能,每个任务最终通过新开线程方式执行
    	import threading
    	
    	def ensure_schedule():
    		schedule.every(5).seconds.do(do_some)
    	def ensure_schedule_2():
    		schedule.every(10).seconds.do(print_some)
    	
    	def run_daemon_thread(target, *args, **kwargs):
    		job_thread = threading.Thread(target=target, args=args, kwargs=kwargs)
    		job_thread.setDaemon(True)
    		job_thread.start()
    	
    	def __start_schedule_deamon():
    		def schedule_run():
    			while True:
    				schedule.run_pending()
    				time.sleep(1)
    	
    		t = threading.Thread(target=schedule_run)
    		t.setDaemon(True)
    		t.start()
    		
    	def init_schedule_job():
    		run_daemon_thread(ensure_schedule)
    		run_daemon_thread(ensure_schedule_2)
    		
    	init_schedule_job()
    	__start_schedule_deamon()
    
  2. 如何在不阻塞主线程的情况下连续运行调度程序?
    官方推荐了这个方式,在单独的线程中运行调度程序,如下,在单独的线程中运行 run_pending 调度程序。通过 threading 库的 Event 来实现

     def run_continuously(self, interval=1):
            """Continuously run, while executing pending jobs at each elapsed
            time interval.
            @return cease_continuous_run: threading.Event which can be set to
            cease continuous run.
            Please note that it is *intended behavior that run_continuously()
            does not run missed jobs*. For example, if you've registered a job
            that should run every minute and you set a continuous run interval
            of one hour then your job won't be run 60 times at each interval but
            only once.
            """
            cease_continuous_run = threading.Event()
    
            class ScheduleThread(threading.Thread):
                @classmethod
                def run(cls):
                    while not cease_continuous_run.is_set():
                        self.run_pending()
                        time.sleep(interval)
    
            continuous_thread = ScheduleThread()
            continuous_thread.start()
            return cease_continuous_run
    
  3. 是否支持时区
    官方不计划支持时区,可使用:

    讨论:https://github/dbader/schedule/pull/16
    时区解决:https://github/imiric/schedule/tree/feat/timezone
    
  4. 如果我的任务抛出异常怎么办?
    schedule 不捕获作业执行期间发生的异常,因此在任务执行期间的任何异常都会冒泡并中断调度的 run_xyz(如 run_pending ) 函数, 也就是 run_pending 中断退出,导致其它任务无法执行

    	import functools
    	
    	def catch_exceptions(cancel_on_failure=False):
    		def catch_exceptions_decorator(job_func):
    			@functools.wraps(job_func)
    			def wrapper(*args, **kwargs):
    				try:
    					return job_func(*args, **kwargs)
    				except:
    					import traceback
    					print(traceback.format_exc())
    					if cancel_on_failure:
    						return schedule.CancelJob
    			return wrapper
    		return catch_exceptions_decorator
    	
    	@catch_exceptions(cancel_on_failure=True)
    	def bad_task():
    		return 1 / 0
    	
    	schedule.every(5).minutes.do(bad_task)
    

    另外一种解决方案:
    https://gist.github/mplewis/8483f1c24f2d6259aef6

  5. 如何设置只跑一次的任务?

    	def job_that_executes_once():
    		# Do some work ...
    		return schedule.CancelJob
    	
    	schedule.every().day.at('22:30').do(job_that_executes_once)
    
  6. 如何一次取消多个任务?

    # 通过 tag 函数给它们添加唯一标识符进行分组,取消时通过标识符进行取消相应组的任务
    def greet(name):
    	print('Hello {}'.format(name))
    
    schedule.every().day.do(greet, 'Andrea').tag('daily-tasks', 'friend')
    schedule.every().hour.do(greet, 'John').tag('hourly-tasks', 'friend')
    schedule.every().hour.do(greet, 'Monica').tag('hourly-tasks', 'customer')
    schedule.every().day.do(greet, 'Derek').tag('daily-tasks', 'guest')
    
    schedule.clear('daily-tasks')
    
  7. 如何传递参数给任务函数

    	def greet(name):
    		print('Hello', name)
    	
    	schedule.every(2).seconds.do(greet, name='Alice')
    	schedule.every(4).seconds.do(greet, name='Bob')
    

APScheduler 库

APScheduler(Advanced Python Scheduler)是基于Quartz的一个Python定时任务框架,实现了Quartz的所有功能, 是一个轻量级但功能强大的进程内任务调度程序。它有以下三个特点:

  • 类似于 Liunx Cron 的调度程序(可选的开始/结束时间)

  • 基于时间间隔的执行调度(周期性调度,可选的开始/结束时间)

  • 一次性执行任务(在设定的日期/时间运行一次任务)

可以按照个人喜好来混合和匹配调度系统和存储作业的后端存储,支持以下几种后台作业存储:

  • Memory

  • SQLAlchemy (任何 SQLAlchemy 支持的关系型数据库)

  • MongoDB

  • Redis

  • ZooKeeper

  • RethinkDB

APScheduler 集成了以下几个 Python 框架:

  • asyncio

  • gevent

  • Tornado

  • Twisted

  • Qt

总结以上,APScheduler 支持基于日期、固定时间、crontab 形式三种形式的任务调度,可以灵活接入各种类型的后台作业存储来持久化作业,同时提供了多种调度器(后面提及),集成多种 Python 框架,可以根据实际情况灵活组合后台存储以及调度器来使用。

APScheduler 的架构及工作原理

  1. APScheduler 基本概念
    APScheduler 由四个组件构成:

    • triggers 触发器
      触发器包含调度逻辑。每个作业(job)都有自己的触发器,用于确定下一个作业何时运行。除了最初的配置,触发器是完全无状态的

    • job stores 作业存储
      job stores 是存放作业的地方,默认保存在内存中。作业数据序列化后保存至持久性数据库,从持久性数据库加载回来时会反序列化。作业存储(job stores)不将作业数据保存在内存中(默认存储除外),相反,内存只是充当后端存储在保存、加载、更新、查找作业时的中间人角色。作业存储不能在调度器(schedulers) 之间共享

    • executors 执行器
      执行器处理作业的运行。它们通常通过将作业中的指定可调用部分提交给线程或进程池来实现这一点。 当作业完成后,执行器通知调度器,然后调度器发出一个适当的事件

    • schedulers 调度器
      调度器是将其余部分绑定在一起的工具。通常只有一个调度器(scheduler)在应用程序中运行。应用程序开发者通常不直接处理作业存储(job stores)、执行器(executors)或者触发器(triggers)。相反,调度器提供了适当的接口来处理它们。配置作业存储(job stores)和执行器(executors)是通过调度器(scheduler)来完成的,就像添加、修改和删除 job(作业)一样

  2. APScheduler 架构图

更多推荐

Python 定时任务(schedule, Apscheduler, celery, python-crontab)