参考

https://schedule.readthedocs.io/en/stable/

安装

pip install schedule

基本使用

基本的时间调度,调用语句已经说明地比较清晰:

import schedule
import time


def job():
    """
    运行的任务
    :return:
    """
    print("I'm working...")


# 每 10 分钟运行一次
schedule.every(10).minutes.do(job)
# 每小时运行一次
schedule.every().hour.do(job)
# 每天的 10 点半运行一次
schedule.every().day.at("10:30").do(job)
# 每个月运行一次
schedule.every().monday.do(job)
# 每周三的 13:15 运行一次
schedule.every().wednesday.at("13:15").do(job)
# 每分钟的第 17 s 运行
schedule.every().minute.at(":17").do(job)

while True:
    schedule.run_pending()
    time.sleep(1)

接下来对使用中常见的问题进行说明:

并行执行任务

while True 所在是主线程,当 sleep 的时候将会切换执行 job
举例:当一个任务耗时 10 s 执行完毕,但主程序每次切换的间隔是 3 s, 当新一轮的任务来临时,上一轮的任务还没有执行完毕。

import schedule
import time


def job():
    print(f"======job start {time.time()}======")
    time.sleep(10)
    print(f"======jod done {time.time()}======")


for i in range(2):
    schedule.every(2).seconds.do(job)


while True:
    schedule.run_pending()
    print(f"======main start {time.time()}======")
    print(len(schedule.jobs))
    time.sleep(3)
    print(f"======main done {time.time()}======")

执行的结果: 从时间戳可见,整个过程是单线程的。

======main start 1563267266.730888======
2
======main done 1563267269.7347798======
======job start 1563267269.734943======
======jod done 1563267279.736053======
======job start 1563267279.736196======
======jod done 1563267289.740654======
======main start 1563267289.740792======
2
======main done 1563267292.745822======
======job start 1563267292.745949======
======jod done 1563267302.748458======
======job start 1563267302.748595======
======jod done 1563267312.752794======
======main start 1563267312.752919======
2
======main done 1563267315.757164======
...

我们想要并行执行任务,可以考虑使用多线程:

import threading
import schedule
import time


def run_threaded(job_func):
    job_thread = threading.Thread(target=job_func)
    job_thread.start()


def job():
    print(f"======job start {time.time()}======")
    time.sleep(10)
    print(f"======jod done {time.time()}======")


for i in range(2):
    schedule.every(2).seconds.do(run_threaded, job)


while True:
    schedule.run_pending()
    print(f"======main start {time.time()}======")
    print(len(schedule.jobs))
    time.sleep(3)
    print(f"======main done {time.time()}======")

运行结果如下:

======main start 1563267930.512416======
2
======main done 1563267933.515164======
======job start 1563267933.5159109======
======job start 1563267933.516366======
======main start 1563267933.516513======
2
======main done 1563267936.518573======
======job start 1563267936.5190458======
======job start 1563267936.519522============main start 1563267936.5196202======
2

======main done 1563267939.5213609======
======job start 1563267939.5218391======
======job start 1563267939.522301============main start 1563267939.522398======
2

======main done 1563267942.52272======
======job start 1563267942.523227======
======job start 1563267942.52368======
======main start 1563267942.523786======
2
======jod done 1563267943.518081======
======jod done 1563267943.5181959======
======main done 1563267945.526912======
...

如果我们想要严格控制线程数量,可以使用共享队列以及一个或者多个工作线程:

from queue import Queue
import time
import threading
import schedule


def job():
    print(f"======job start {time.time()}======")
    time.sleep(10)
    print(f"======jod done {time.time()}======")


def worker_main():
    while 1:
        job_func = jobqueue.get()
        job_func()
        jobqueue.task_done()


jobqueue = Queue()

schedule.every(10).seconds.do(jobqueue.put, job)
schedule.every(10).seconds.do(jobqueue.put, job)
schedule.every(10).seconds.do(jobqueue.put, job)
schedule.every(10).seconds.do(jobqueue.put, job)
schedule.every(10).seconds.do(jobqueue.put, job)

# 严格控制在一个线程中去完成
worker_thread = threading.Thread(target=worker_main)
worker_thread.start()

while 1:
    schedule.run_pending()
    time.sleep(1)

捕获任务中的异常

schedule 并不会捕获任务中出现的异常,( Therefore any exceptions thrown during job execution will bubble up and interrupt schedule’s run_xyz function.) 因此任何在任务的执行过程中任何的异常都会冒泡中断 schedule 的 run_xyz 函数。
举例子:

import functools

import schedule

def bad_task():
    return 1 / 0


schedule.every(5).seconds.do(bad_task)

while True:
    schedule.run_pending()

如果我们想要捕获异常自己做处理,可以使用装饰器的形式:

import functools

import schedule


def catch_exceptions(cancel_on_failure=False):
    """
    捕获异常处理程序
    :param cancel_on_failure: 决定在异常出现时是否对程序进行中断
    :return:
    """
    def catch_exceptions_decorator(job_func):
        @functools.wraps(job_func)
        def wrapper(*args, **kwargs):
            try:
                return job_func(*args, **kwargs)
            except:
                import traceback
                print(traceback.format_exc())
                if cancel_on_failure:
                    return schedule.CancelJob
        return wrapper
    return catch_exceptions_decorator


@catch_exceptions(cancel_on_failure=True)
def bad_task():
    return 1 / 0


schedule.every(5).seconds.do(bad_task)

while True:
    schedule.run_pending()

这时进程没有被终止,但是任务已经取消了:

/Users/furuiyang/PyEnv/demo/bin/python3.7 /Users/furuiyang/mygit/pydailynotes/python-schedule/demo005.py
Traceback (most recent call last):
  File "/Users/furuiyang/mygit/pydailynotes/python-schedule/demo005.py", line 16, in wrapper
    return job_func(*args, **kwargs)
  File "/Users/furuiyang/mygit/pydailynotes/python-schedule/demo005.py", line 28, in bad_task
    return 1 / 0
ZeroDivisionError: division by zero

执行一个只运行一次的任务

举例: 该任务在运行一次之后被取消, 具体我们可以打印 schedule 中的 jobs 得知。

import time

import schedule


def job_that_executes_once():
    print("""
    Beautiful is better than ugly.
    Explicit is better than implicit.
    Simple is better than complex.
    Complex is better than complicated.
    Flat is better than nested.
    Sparse is better than dense.
    Readability counts.
    Special cases aren't special enough to break the rules.
    Although practicality beats purity.
    Errors should never pass silently.
    Unless explicitly silenced.
    In the face of ambiguity, refuse the temptation to guess.
    There should be one-- and preferably only one --obvious way to do it.
    Although that way may not be obvious at first unless you're Dutch.
    Now is better than never.
    Although never is often better than *right* now.
    If the implementation is hard to explain, it's a bad idea.
    If the implementation is easy to explain, it may be a good idea.
    Namespaces are one honking great idea -- let's do more of those!
    """)
    # Do some work ...
    return schedule.CancelJob


schedule.every(5).seconds.do(job_that_executes_once)


while True:
    schedule.run_pending()
    print(schedule.jobs)
    time.sleep(3)

一次性取消多个任务

后面这几个部分其实没有用到,但是为了完整性还是写在这里,方便之后用到的时候整理思路。

import random
import time

import schedule


def greet(name):
    print('Hello {}'.format(name))


schedule.every(4).seconds.do(greet, 'Andrea').tag('daily-tasks', 'friend')
schedule.every(4).seconds.do(greet, 'John').tag('hourly-tasks', 'friend')
schedule.every(3).seconds.do(greet, 'Monica').tag('hourly-tasks', 'customer')
schedule.every(2).seconds.do(greet, 'Derek').tag('daily-tasks', 'guest')


while True:
    schedule.run_pending()
    n = random.randint(1, 5)
    print(n)
    if n == 3:
        schedule.clear('daily-tasks')
    time.sleep(10)

运行结果:
可以看到,在取消了每个被标记的任务组之后, 这一组任务都不再执行。

/Users/furuiyang/PyEnv/demo/bin/python3.7 /Users/furuiyang/mygit/pydailynotes/python-schedule/demo007.py
4
Hello Derek
Hello Monica
Hello Andrea
Hello John
3
Hello Monica
Hello John
1
Hello Monica
Hello John
5
Hello Monica
Hello John
2
Hello Monica
Hello John

为任务添加通用日志

最简单的方法是实现一个装饰器:

import functools
import time

import schedule


# This decorator can be applied to
def with_logging(func):
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        print('LOG: Running job "%s"' % func.__name__)
        result = func(*args, **kwargs)
        print('LOG: Job "%s" completed' % func.__name__)
        return result
    return wrapper


@with_logging
def job():
    print('Hello, World.')


schedule.every(3).seconds.do(job)


while 1:
    schedule.run_pending()
    time.sleep(1)

怎样随机时间执行任务

import functools
import time

import schedule


def with_logging(func):
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        t1 = time.time()  # 本次执行的时间点 
        print('LOG: Running job "%s"' % func.__name__)
        result = func(*args, **kwargs)
        print('LOG: Job "%s" completed' % func.__name__)
        print(time.time() - t1)  # 可以看做是上一次执行的时间 

        return result
    return wrapper


@with_logging
def my_job():
    # This job will execute every 5 to 10 seconds.
    print('Foo')


schedule.every(5).to(10).seconds.do(my_job)


while True:
    schedule.run_pending()
    time.sleep(3)

向任务函数中传参

import time

import schedule


def greet(name):
    print('Hello', name)


schedule.every(2).seconds.do(greet, name='Alice')
schedule.every(4).seconds.do(greet, name='Bob')

while True:
    schedule.run_pending()
    time.sleep(3)

更多推荐

Python定时任务库schedule的使用