文章目录

  • 定时任务库对比
  • 简介
  • 安装
  • 初试
  • 不适用场景
  • 运行间隔
  • 装饰器
  • 传递参数
  • 取消任务
  • 运行一次任务
  • 检索所有任务
  • 取消所有任务
  • 根据标签检索任务
  • 根据标签取消任务
  • 随机间隔运行
  • 运行任务到某时间
  • 同一任务运行间隔
  • 马上运行所有任务
  • 后台运行
  • 并行运行
  • 遇到的坑
  • 参考文献

定时任务库对比

推荐阅读 Python timing task - schedule vs. Celery vs. APScheduler

大小优点缺点适用场景
Schedule轻量级易用无配置不能动态添加任务或持久化任务简单任务
Celery重量级①任务队列
②分布式
①不能动态添加定时任务到系统中,如Flask(Django可以)
②设置起来较累赘
任务队列
APScheduler相对重量级①灵活,可动态增删定时任务并持久化
②支持多种存储后端
③集成框架多,用户广
重量级,学习成本大通用
Rocketry轻量级易用功能强尚未成熟,文档不清晰通用




简介

Schedule 是一款轻量级定时任务库,易用无配置。




本文所有代码




安装

pip install schedule




初试

import time
import schedule


def job():
    print('working...')


schedule.every(10).minutes.do(job)  # 每10分钟
schedule.every().hour.do(job)  # 每小时
schedule.every().day.at('10:30').do(job)  # 每天10:30
schedule.every().monday.do(job)  # 每周一
schedule.every().wednesday.at('13:15').do(job)  # 每周三13:15
schedule.every().minute.at(':17').do(job)  # 每分钟的17秒

while True:
    schedule.run_pending()
    time.sleep(1)




不适用场景

  • 持久化任务
  • 精确时间
  • 并发运行
  • 时区工作日假期的本地化
  • 不考虑任务运行所需的时间

为保证稳定运行计划,需要将长时间运行的任务从主线程分离开,参考示例实现。




运行间隔

import schedule
import time


def job():
    print('working...')


# 每隔3秒/分/小时/天/周运行
schedule.every(3).seconds.do(job)
schedule.every(3).minutes.do(job)
schedule.every(3).hours.do(job)
schedule.every(3).days.do(job)
schedule.every(3).weeks.do(job)

# 每分钟的第23秒运行
schedule.every().minute.at(':23').do(job)

# 每小时的第42分钟运行
schedule.every().hour.at(':42').do(job)

# 每4小时的20分30秒运行,如果现在是02:00,那么初次运行时间是06:20:30
schedule.every(5).hours.at('20:30').do(job)

# 每天固定时间运行
schedule.every().day.at('10:30').do(job)
schedule.every().day.at('10:30:42').do(job)

# 每周固定时间运行
schedule.every().monday.do(job)
schedule.every().wednesday.at('13:15').do(job)

while True:
    schedule.run_pending()
    time.sleep(1)




装饰器

通过 @repeat() 装饰静态方法

import time
from schedule import every, repeat, run_pending


@repeat(every().second)
def job():
    print('working...')


while True:
    run_pending()
    time.sleep(1)




传递参数

import schedule


def greet(name):
    print('Hello', name)


schedule.every(2).seconds.do(greet, name='Alice')
schedule.every(4).seconds.do(greet, name='Bob')

while True:
    schedule.run_pending()

装饰器(待测试)

from schedule import every, repeat


@repeat(every().second, 'World')
@repeat(every().day, 'Mars')
def hello(planet):
    print('Hello', planet)


while True:
    schedule.run_pending()




取消任务

调用 schedule.cancel_job(job)

import schedule

i = 0


def some_task():
    global i
    i += 1
    print(i)
    if i == 10:
        schedule.cancel_job(job)
        print('cancel job')
        exit(0)


job = schedule.every().second.do(some_task)
while True:
    schedule.run_pending()

效果




运行一次任务

任务返回 schedule.CancelJob

import time
import schedule


def job_that_executes_once():
    print('Hello')
    return schedule.CancelJob


schedule.every().minute.at(':17').do(job_that_executes_once)

while True:
    schedule.run_pending()
    time.sleep(1)




检索所有任务

import schedule


def hello():
    print('Hello world')


schedule.every().second.do(hello)

all_jobs = schedule.get_jobs()
print(all_jobs)
# [Every 1 second do hello() (last run: [never], next run: 2021-04-15 22:45:44)]




取消所有任务

调用 schedule.clear()

import schedule

i = 0


def hello():
    print('Hello world')


def some_task():
    global i
    i += 1
    print(i)
    if i == 5:
        schedule.clear()
        print('clear all jobs')
        exit(0)


schedule.every().second.do(hello)
schedule.every().second.do(some_task)
while True:
    schedule.run_pending()

效果




根据标签检索任务

调用 schedule.get_jobs(tag)

import schedule


def greet(name):
    print('Hello {}'.format(name))


schedule.every().day.do(greet, 'Andrea').tag('daily-tasks', 'friend')
schedule.every().hour.do(greet, 'John').tag('hourly-tasks', 'friend')
schedule.every().hour.do(greet, 'Monica').tag('hourly-tasks', 'customer')
schedule.every().day.do(greet, 'Derek').tag('daily-tasks', 'guest')

friends = schedule.get_jobs('friend')
print(friends)
# [Every 1 day do greet('Andrea') (last run: [never], next run: 2021-04-16 22:46:51), Every 1 hour do greet('John') (last run: [never], next run: 2021-04-15 23:46:51)]




根据标签取消任务

调用 schedule.clear(tag)

import schedule


def greet(name):
    print('Hello {}'.format(name))
    if name == 'Cancel':
        schedule.clear('second-tasks')
        print('cancel second-tasks')


schedule.every().second.do(greet, 'Andrea').tag('second-tasks', 'friend')
schedule.every().second.do(greet, 'John').tag('second-tasks', 'friend')
schedule.every().hour.do(greet, 'Monica').tag('hourly-tasks', 'customer')
schedule.every(5).seconds.do(greet, 'Cancel').tag('daily-tasks', 'guest')

while True:
    schedule.run_pending()

效果




随机间隔运行

import schedule
from datetime import datetime

last = datetime.now()


def job():
    global last
    now = datetime.now()
    print(now - last)
    last = now


schedule.every(1).to(5).seconds.do(job)  # 每隔1-5秒运行

while True:
    schedule.run_pending()

效果




运行任务到某时间

import schedule
from datetime import datetime, timedelta, time


def job():
    print('working...')


schedule.every().second.until('23:59').do(job)  # 今天23:59停止
schedule.every().second.until('2030-01-01 18:30').do(job)  # 2030-01-01 18:30停止

schedule.every().second.until(timedelta(hours=8)).do(job)  # 8小时后停止
schedule.every().second.until(time(23, 59, 59)).do(job)  # 今天23:59:59停止
schedule.every().second.until(datetime(2030, 1, 1, 18, 30, 0)).do(job)  # 2030-01-01 18:30停止

while True:
    schedule.run_pending()




同一任务运行间隔

调用 schedule.idle_seconds()

import time
import schedule


def job():
    print('working...')


schedule.every(5).seconds.do(job)

while True:
    n = schedule.idle_seconds()
    print(n)
    if n is None:  # 没其他任务
        break
    elif n > 0:
        time.sleep(n)  # 精确时间
    schedule.run_pending()




马上运行所有任务

不管调度,调用 schedule.run_all() 运行所有任务

import schedule


def job():
    print('working...')


def job1():
    print('Hello...')


schedule.every().monday.at('12:40').do(job)
schedule.every().tuesday.at('16:40').do(job1)

schedule.run_all()
schedule.run_all(delay_seconds=3)  # 任务间延迟3秒

效果




后台运行

import time
import schedule
import threading


def run_continuously(interval=1):
    """隔一段时间运行"""
    cease_continuous_run = threading.Event()

    class ScheduleThread(threading.Thread):
        @classmethod
        def run(cls):
            while not cease_continuous_run.is_set():
                schedule.run_pending()
                time.sleep(interval)

    continuous_thread = ScheduleThread()
    continuous_thread.start()
    return cease_continuous_run


def background_job():
    print('Hello from the background thread')


# 启动后台线程
schedule.every().second.do(background_job)
stop_run_continuously = run_continuously()

# 做其他事
for i in range(10):
    print(i)
    time.sleep(1)

# 停止后台线程
stop_run_continuously.set()

效果




并行运行

使用 Python 内置队列实现

import time
import queue
import schedule
import threading


def job():
    print('working...')


def worker_main():
    while 1:
        job_func = jobqueue.get()
        job_func()
        jobqueue.task_done()


jobqueue = queue.Queue()

schedule.every(3).seconds.do(jobqueue.put, job)
schedule.every(3).seconds.do(jobqueue.put, job)
schedule.every(3).seconds.do(jobqueue.put, job)
schedule.every(3).seconds.do(jobqueue.put, job)
schedule.every(3).seconds.do(jobqueue.put, job)

worker_thread = threading.Thread(target=worker_main)
worker_thread.start()

while 1:
    schedule.run_pending()
    time.sleep(1)

效果




遇到的坑

  1. Schedule ImportError: cannot import name ‘repeat’

Win10下安装最新版本为 schedule==1.0.0,有些功能无法使用




参考文献

  1. Schedule GitHub
  2. Schedule Documentation

更多推荐

Python轻量级定时任务库Schedule