1. schedule

如果要实现一个小的定时任务脚本,可以采用schedule这个轻量级定时任务调度库。

import schedule
import time

def job(name):
    print(name,'do something...')

# 每十分钟执行任务
schedule.every(10).minutes.do(job, name)
# 每小时执行任务
schedule.every().hour.do(job, name)
# 每天定点执行任务
schedule.every().day.at('10:30').do(job, name)
# 每周一执行任务
schedule.every().monday.do(job, name)
# 每周一定点任务
schedule.every().monday.at('10:30').do(job, name)

while True:
    # 保持任务运行
    schedule.run_pending()
    time.sleep(1)

值得注意的是,如果是多个任务运行,实际上他们是串行执行的。如果上面的任务耗时,会影响下面任务的运行。

对于这种情况,可以使用多线程/多进程来解决。

import datetime
import schedule
import threading
import time


def job1():
    print('this is job1')
    time.sleep(2)
    print('job1:',datetime.datetime.now())

def job2():
    print('this is job2')
    time.sleep(2)
    print('job2:',datetime.datetime.now())

def task1():
    threading.Thread(target=job1).start()

def task2():
    threading.Thread(target=job2).start()

def run():
    schedule.every(10).seconds.do(task1)
    schedule.every(10).seconds.do(task2)

    while True:
        schedule.run_pending()
        time.sleep(1)

schedule的使用比较简单,就是一个死循环执行任务,因此定时任务job不应该是死循环类型的,这个任务线程需要有一个执行完毕的出口,否则会导致无限循环问题。另外一点是,定时任务的执行时间如果比schedule的间隔时间长,同样会造成线程堆积问题,引发异常。

2. celery

celery是一个强大的分布式任务队列,相比schedule更加完备而强大,同时也更加“重”。它可以让任务的执行完全脱离主程序,甚至是分配到其他主机上运行。

通常使用celery来实现异步任务和定时任务。其结构组成如下:

可以看到,celery主要包含以下几个模块:

  • 任务模块

包含异步任务与定时任务。异步任务通常在业务逻辑中被触发,并被发往任务队列;定时任务由celery beat进程周期性地将任务发往任务队列。

  • 消息中间件broker

broker,即为任务调度队列,接收任务生产者发来的任务消息,将任务存入队列。celery本身不提供任务队列,推荐使用RabbitMQ和Redis。

  • 任务执行单元worker

worker实时监控消息队列,获取调度的任务,并执行。

  • 任务结果存储backend

backend用于存储任务的执行结果,通消息中间件一样,存储可使用RabbitMQ,Redis,MongoDB等。

异步任务

使用celery实现异步任务主要包括三个步骤:

  • 创建一个celery实例
  • 启动celery worker
  • 程序调用异步任务

以下做具体介绍:

  1. 1.创建celery实例

这里使用Redis作为broker和backend。

创建文件task.py

import time
from celery import Celery

# 指定消息中间件用redis
broker = 'redis://127.0.0.1:6379'
# 指定存储用redis
backend = 'redis://127.0.0.1:6379/0'
# 创建一个celery实例app,名称为my_task
app = Celery('my_task', broker=broker, backend=backend)

# 创建一个celery任务add,被@app.task装饰后,成为可被调度的任务
@app.task
def add(x, y):
    time.sleep(5) # 模拟耗时操作
    return x+y
  1. 2.启动celery worker

在当前目录下,使用如下方式启动celery worker

celery worker -A task --loglevel=info

其中:

  • 参数-A指定了celery实例的位置,这里是task.py中,celery会自动在该文件中寻找celery实例对象,当然也可以直接指定为-A task.app;
  • 参数--loglevel指定了日志级别,默认为warning,也可以使用-l info来表示;

  1. 3.调用任务

现在可以使用delay()或者apply_async()方法来调用任务。

在当前目录下,打开Python控制台,输入如下:

我们从task.py中导入了add任务对象,然后使用delay()方法发送任务到broker,worker进程监测到该任务后就执行,

这时发现报错,原因是在Windows系统下使用celery4版本,解决方法是安装一个eventlet包,然后启动worker时加一个参数

celery worker -A task -l info -P eventlet

然后就可以正常使用了。

另外如果想获取执行后的结果,可以这样做:

上面是在交互环境中调用任务,实际上通常在程序用调用,建立client.py如下:

from task import add
import time

print('开始时间', time.ctime())
add.delay(2,5)
print('完成时间', time.ctime())

然后执行文件,结果如下:

可以看出,虽然任务函数需要等待5秒才返回结果,但是由于是一个异步任务,不会阻塞当前主程序,所以立刻执行了打印完成的语句。

相比直接把broker和backend配置写入程序代码中,更好的方式是增加一个配置文件,通常命名为`celeryconfig.py`。

__init__.py代码如下:

from celery import Celery

# 创建celery实例
app = Celery('demo')

# 通过celery实例加载配置模块
app.config_from_object('celery_app.celeryconfig')

celeryconfig.py代码如下:

# 指定broker
BROKER_URL = 'redis://127.0.0.1:6379'
# 指定backend
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/0'

# 指定时区
CELERY_TIMEZONE = 'Asia/Shanghai'

# 指定导入的任务模块
CELERY_IMPORTS = (
    'celery_app.task1',
    'celery_app.task2'
)

task1.py代码如下:

import time
from celery_app import app

@app.task
def add(x,y):
    time.sleep(2)
    return x+y

task2.py代码如下:

import time
from celery_app import app

@app.task
def multiply(x,y):
    time.sleep(2)
    return x*y

client.py代码如下:

import time
from celery_app import task1
from celery_app import task2


print('开始', time.ctime())
task1.add.delay(2,3) # delay是apply_async的快捷方式
task2.multiply.apply_async(args=[2,3])

print('完成', time.ctime())

现在可以启动worker进程

然后执行python命令运行client.py文件。

在worker窗口,我们可以看到任务的执行

定时任务

celery beat进程通过读取配置文件的内容,周期性地将定时任务发往任务队列。

除了celerconfig.py内容增加了定时调度内容,其他模块和异步任务相同。

celerconfig.py代码如下:

from datetime import timedelta
from celery.schedules import crontab

# 指定broker
BROKER_URL = 'redis://127.0.0.1:6379'
# 指定backend
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/0'

# 指定时区
CELERY_TIMEZONE = 'Asia/Shanghai'

# 指定导入的任务模块
CELERY_IMPORTS = (
    'celery_app.task1',
    'celery_app.task2'
)

# 定时调度schedules
CELERYBEAT_SCHEDULE={
    'add-every-30-seconds':{
        'task':'celery_app.task1.add',
        'schedule':timedelta(seconds=30),  # 每30秒执行一次
        'args':(2,3)                        # 任务函数参数
    },

    'multiply-every-30-seconds':{
        'task':'celery_app.task2.multiply',
        'schedule':crontab(hour=14,minute=30), # 每天下午2点30分执行一次
        'args':(2,3)                            # 任务函数参数
    }

}

现在,启动worker进程,然后启动beat进程,定时任务将被发送到broker

之后在worker窗口,可以看到task1每30秒执行一次,task2则定点执行一次。

为了简化,也可以将启动worker进程和beat进程放在一条命令中:

celery -B -A celery_app worker -l info -P eventlet

文章参考schedules和celery

更多推荐

任务调度schedule和celery