文章目录
- 定时任务库对比
- 简介
- 安装
- 初试
- 不适用场景
- 运行间隔
- 装饰器
- 传递参数
- 取消任务
- 运行一次任务
- 检索所有任务
- 取消所有任务
- 根据标签检索任务
- 根据标签取消任务
- 随机间隔运行
- 运行任务到某时间
- 同一任务运行间隔
- 马上运行所有任务
- 后台运行
- 并行运行
- 遇到的坑
- 参考文献
定时任务库对比
推荐阅读 Python timing task - schedule vs. Celery vs. APScheduler
库 | 大小 | 优点 | 缺点 | 适用场景 |
---|---|---|---|---|
Schedule | 轻量级 | 易用无配置 | 不能动态添加任务或持久化任务 | 简单任务 |
Celery | 重量级 | ①任务队列 ②分布式 | ①不能动态添加定时任务到系统中,如Flask(Django可以) ②设置起来较累赘 | 任务队列 |
APScheduler | 相对重量级 | ①灵活,可动态增删定时任务并持久化 ②支持多种存储后端 ③集成框架多,用户广 | 重量级,学习成本大 | 通用 |
Rocketry | 轻量级 | 易用功能强 | 尚未成熟,文档不清晰 | 通用 |
简介
Schedule
是一款轻量级定时任务库,易用无配置。
本文所有代码
安装
pip install schedule
初试
import time
import schedule
def job():
print('working...')
schedule.every(10).minutes.do(job) # 每10分钟
schedule.every().hour.do(job) # 每小时
schedule.every().day.at('10:30').do(job) # 每天10:30
schedule.every().monday.do(job) # 每周一
schedule.every().wednesday.at('13:15').do(job) # 每周三13:15
schedule.every().minute.at(':17').do(job) # 每分钟的17秒
while True:
schedule.run_pending()
time.sleep(1)
不适用场景
- 持久化任务
- 精确时间
- 并发运行
- 时区工作日假期的本地化
- 不考虑任务运行所需的时间
为保证稳定运行计划,需要将长时间运行的任务从主线程分离开,参考示例实现。
运行间隔
import schedule
import time
def job():
print('working...')
# 每隔3秒/分/小时/天/周运行
schedule.every(3).seconds.do(job)
schedule.every(3).minutes.do(job)
schedule.every(3).hours.do(job)
schedule.every(3).days.do(job)
schedule.every(3).weeks.do(job)
# 每分钟的第23秒运行
schedule.every().minute.at(':23').do(job)
# 每小时的第42分钟运行
schedule.every().hour.at(':42').do(job)
# 每4小时的20分30秒运行,如果现在是02:00,那么初次运行时间是06:20:30
schedule.every(5).hours.at('20:30').do(job)
# 每天固定时间运行
schedule.every().day.at('10:30').do(job)
schedule.every().day.at('10:30:42').do(job)
# 每周固定时间运行
schedule.every().monday.do(job)
schedule.every().wednesday.at('13:15').do(job)
while True:
schedule.run_pending()
time.sleep(1)
装饰器
通过 @repeat()
装饰静态方法
import time
from schedule import every, repeat, run_pending
@repeat(every().second)
def job():
print('working...')
while True:
run_pending()
time.sleep(1)
传递参数
import schedule
def greet(name):
print('Hello', name)
schedule.every(2).seconds.do(greet, name='Alice')
schedule.every(4).seconds.do(greet, name='Bob')
while True:
schedule.run_pending()
装饰器(待测试)
from schedule import every, repeat
@repeat(every().second, 'World')
@repeat(every().day, 'Mars')
def hello(planet):
print('Hello', planet)
while True:
schedule.run_pending()
取消任务
调用 schedule.cancel_job(job)
import schedule
i = 0
def some_task():
global i
i += 1
print(i)
if i == 10:
schedule.cancel_job(job)
print('cancel job')
exit(0)
job = schedule.every().second.do(some_task)
while True:
schedule.run_pending()
效果
运行一次任务
任务返回 schedule.CancelJob
import time
import schedule
def job_that_executes_once():
print('Hello')
return schedule.CancelJob
schedule.every().minute.at(':17').do(job_that_executes_once)
while True:
schedule.run_pending()
time.sleep(1)
检索所有任务
import schedule
def hello():
print('Hello world')
schedule.every().second.do(hello)
all_jobs = schedule.get_jobs()
print(all_jobs)
# [Every 1 second do hello() (last run: [never], next run: 2021-04-15 22:45:44)]
取消所有任务
调用 schedule.clear()
import schedule
i = 0
def hello():
print('Hello world')
def some_task():
global i
i += 1
print(i)
if i == 5:
schedule.clear()
print('clear all jobs')
exit(0)
schedule.every().second.do(hello)
schedule.every().second.do(some_task)
while True:
schedule.run_pending()
效果
根据标签检索任务
调用 schedule.get_jobs(tag)
import schedule
def greet(name):
print('Hello {}'.format(name))
schedule.every().day.do(greet, 'Andrea').tag('daily-tasks', 'friend')
schedule.every().hour.do(greet, 'John').tag('hourly-tasks', 'friend')
schedule.every().hour.do(greet, 'Monica').tag('hourly-tasks', 'customer')
schedule.every().day.do(greet, 'Derek').tag('daily-tasks', 'guest')
friends = schedule.get_jobs('friend')
print(friends)
# [Every 1 day do greet('Andrea') (last run: [never], next run: 2021-04-16 22:46:51), Every 1 hour do greet('John') (last run: [never], next run: 2021-04-15 23:46:51)]
根据标签取消任务
调用 schedule.clear(tag)
import schedule
def greet(name):
print('Hello {}'.format(name))
if name == 'Cancel':
schedule.clear('second-tasks')
print('cancel second-tasks')
schedule.every().second.do(greet, 'Andrea').tag('second-tasks', 'friend')
schedule.every().second.do(greet, 'John').tag('second-tasks', 'friend')
schedule.every().hour.do(greet, 'Monica').tag('hourly-tasks', 'customer')
schedule.every(5).seconds.do(greet, 'Cancel').tag('daily-tasks', 'guest')
while True:
schedule.run_pending()
效果
随机间隔运行
import schedule
from datetime import datetime
last = datetime.now()
def job():
global last
now = datetime.now()
print(now - last)
last = now
schedule.every(1).to(5).seconds.do(job) # 每隔1-5秒运行
while True:
schedule.run_pending()
效果
运行任务到某时间
import schedule
from datetime import datetime, timedelta, time
def job():
print('working...')
schedule.every().second.until('23:59').do(job) # 今天23:59停止
schedule.every().second.until('2030-01-01 18:30').do(job) # 2030-01-01 18:30停止
schedule.every().second.until(timedelta(hours=8)).do(job) # 8小时后停止
schedule.every().second.until(time(23, 59, 59)).do(job) # 今天23:59:59停止
schedule.every().second.until(datetime(2030, 1, 1, 18, 30, 0)).do(job) # 2030-01-01 18:30停止
while True:
schedule.run_pending()
同一任务运行间隔
调用 schedule.idle_seconds()
import time
import schedule
def job():
print('working...')
schedule.every(5).seconds.do(job)
while True:
n = schedule.idle_seconds()
print(n)
if n is None: # 没其他任务
break
elif n > 0:
time.sleep(n) # 精确时间
schedule.run_pending()
马上运行所有任务
不管调度,调用 schedule.run_all()
运行所有任务
import schedule
def job():
print('working...')
def job1():
print('Hello...')
schedule.every().monday.at('12:40').do(job)
schedule.every().tuesday.at('16:40').do(job1)
schedule.run_all()
schedule.run_all(delay_seconds=3) # 任务间延迟3秒
效果
后台运行
import time
import schedule
import threading
def run_continuously(interval=1):
"""隔一段时间运行"""
cease_continuous_run = threading.Event()
class ScheduleThread(threading.Thread):
@classmethod
def run(cls):
while not cease_continuous_run.is_set():
schedule.run_pending()
time.sleep(interval)
continuous_thread = ScheduleThread()
continuous_thread.start()
return cease_continuous_run
def background_job():
print('Hello from the background thread')
# 启动后台线程
schedule.every().second.do(background_job)
stop_run_continuously = run_continuously()
# 做其他事
for i in range(10):
print(i)
time.sleep(1)
# 停止后台线程
stop_run_continuously.set()
效果
并行运行
使用 Python 内置队列实现
import time
import queue
import schedule
import threading
def job():
print('working...')
def worker_main():
while 1:
job_func = jobqueue.get()
job_func()
jobqueue.task_done()
jobqueue = queue.Queue()
schedule.every(3).seconds.do(jobqueue.put, job)
schedule.every(3).seconds.do(jobqueue.put, job)
schedule.every(3).seconds.do(jobqueue.put, job)
schedule.every(3).seconds.do(jobqueue.put, job)
schedule.every(3).seconds.do(jobqueue.put, job)
worker_thread = threading.Thread(target=worker_main)
worker_thread.start()
while 1:
schedule.run_pending()
time.sleep(1)
效果
遇到的坑
- Schedule ImportError: cannot import name ‘repeat’
Win10下安装最新版本为 schedule==1.0.0,有些功能无法使用
参考文献
- Schedule GitHub
- Schedule Documentation
更多推荐
Python轻量级定时任务库Schedule
发布评论