目录

前言

schedule库的安装

schedule库的使用

1.简单使用

2.定时/周期执行配置

3.标签与任务的取消

总结


前言

  如果你想要在某个系统上,定时/周期性执行python脚本的话,可以考虑在python脚本中使用轻量级的 schedule 库。这个库入手简单方便。

schedule库的安装

        因为 schedule 库属于第三方模块,在使用之前需要提前安装,打开终端:

        安装命令:  `  pip install schedule  `

schedule库的使用

1.简单使用

        示例1  使用 schedule 的常见步骤

# 第一步 引入依赖
import schedule

# 第二步 创建需要调度的任务 常常是封装成一个函数/一个python文件
def task():
    # 具体逻辑
    print("我在天上飘")

# 第三步 配置schedule执行的时机
schedule.every(2).seconds.do(task)  # 每2秒执行 task 任务

# 第四步 维持schedule存活状态 - 常常与逻辑死循环结合
while True:
    schedule.run_pending()  # 保持schedule一直在运行

         示例2  当某一个任务并不会被多个逻辑征用的时候,可以使用装饰器

# 与上文相同的作用
from schedule import repeat, every, run_pending

@repeat(every(2).seconds)
def task():
    print("我在天上飘")

while True:
    run_pending()

         示例3  每天上午9:00-10:30,下午13:00-15:00这两个时间段,每5秒发送一次请求

import csv
import re
import requests
import schedule
import time

url = ''
header = {}
is_running = False  # 控制是否发送请求


def get_data():
    # 发送请求,获取数据的逻辑
    response = requests.get(url=url, headers=header)
    # 数据处理逻辑 (此处省略)


def task1():
    """
    控制开始发送请求
    """
    global is_running
    is_running = True
    while is_running:
        get_data()
        time.sleep(5)


def task2():
    """
    控制结束发送请求
    """
    global is_running
    is_running = False


if __name__ == '__main__':
    schedule.every().day.at("09:00").do(task1)
    schedule.every().day.at("10:30").do(task2)
    schedule.every().day.at("13:00").do(task1)
    schedule.every().day.at("15:00").do(task2)
    while True:
        schedule.run_pending()
        time.sleep(1)

         示例4  向任务传参

import schedule

def task(message):
    print(message)

schedule.every(2).seconds.do(task, "我在天上飘")

while True:
    schedule.run_pending()
    time.sleep(1)

2.定时/周期执行配置

        示例如下 - 可以参考模仿着写

import schedule
import time


def task():
    print("我在天上飘")


schedule.every().seconds.do(task)  # 每秒运行 1 次
schedule.every(2).seconds.do(task)  # 每2秒运行 1 次
schedule.every().minutes.do(task)  # 每分钟运行 1 次
schedule.every().hours.do(task)  # 每小时运行 1 次
schedule.every().days.do(task)  # 每天运行 1 次
schedule.every().weeks.do(task)  # 每周运行 1 次
schedule.every().monday.do(task)  # 每周运行 1 次
schedule.every().minute.at(":30").do(task)  # 每分钟的第30秒运行 1 次
schedule.every().day.at("08:00").do(task)  # 每天上午8点运行 1 次
schedule.every().monday.at("23:30").do(task)  # 每周一晚上11:30运行 1 次
schedule.every().tuesday.at("23:30").do(task)  # 每周二晚上11:30运行 1 次
schedule.every().wednesday.at("23:30").do(task)  # 每周三晚上11:30运行 1 次
schedule.every().thursday.at("23:30").do(task)  # 每周四晚上11:30运行 1 次
schedule.every().friday.at("23:30").do(task)  # 每周五晚上11:30运行 1 次
schedule.every().saturday.at("23:30").do(task)  # 每周六晚上11:30运行 1 次
schedule.every().sunday.at("23:30").do(task)  # 每周日晚上11:30运行 1 次


while True:
    schedule.run_pending()
    time.sleep(1)

3.标签与任务的取消

        在每个 do(func) 后面都可以用 tag 标签来标记,标签可以不唯一

import schedule
import time

def task(message):
    print('{},真滴吗?'.format(message))

# 为每个调度任务配上两个标签,标签个数不做限制
schedule.every().seconds.do(task, "我在天上飘").tag("second-task", "first-one")
schedule.every().minutes.do(task, "我在地上跑").tag("second-task", "secondary-one")

while True:
    schedule.run_pending()
    time.sleep(1)

        取消任务 或者 取消定时任务

schedule.clear('secondary-one')  # 取消所有带有 'secondary-one' 标签的任务
schedule.clear()  # 取消所有任务

总结

        schedule 是一个很小的轻量级的库,可以解决一部分具有定时调度的问题。


本人技术有限,若有错误,欢迎指正。

更多推荐

schedule - 轻量级任务调度器/定时器