目录
前言
schedule库的安装
schedule库的使用
1.简单使用
2.定时/周期执行配置
3.标签与任务的取消
总结
前言
如果你想要在某个系统上,定时/周期性执行python脚本的话,可以考虑在python脚本中使用轻量级的 schedule 库。这个库入手简单方便。
schedule库的安装
因为 schedule 库属于第三方模块,在使用之前需要提前安装,打开终端:
安装命令: ` pip install schedule `
schedule库的使用
1.简单使用
示例1 使用 schedule 的常见步骤
# 第一步 引入依赖
import schedule
# 第二步 创建需要调度的任务 常常是封装成一个函数/一个python文件
def task():
# 具体逻辑
print("我在天上飘")
# 第三步 配置schedule执行的时机
schedule.every(2).seconds.do(task) # 每2秒执行 task 任务
# 第四步 维持schedule存活状态 - 常常与逻辑死循环结合
while True:
schedule.run_pending() # 保持schedule一直在运行
示例2 当某一个任务并不会被多个逻辑征用的时候,可以使用装饰器
# 与上文相同的作用
from schedule import repeat, every, run_pending
@repeat(every(2).seconds)
def task():
print("我在天上飘")
while True:
run_pending()
示例3 每天上午9:00-10:30,下午13:00-15:00这两个时间段,每5秒发送一次请求
import csv
import re
import requests
import schedule
import time
url = ''
header = {}
is_running = False # 控制是否发送请求
def get_data():
# 发送请求,获取数据的逻辑
response = requests.get(url=url, headers=header)
# 数据处理逻辑 (此处省略)
def task1():
"""
控制开始发送请求
"""
global is_running
is_running = True
while is_running:
get_data()
time.sleep(5)
def task2():
"""
控制结束发送请求
"""
global is_running
is_running = False
if __name__ == '__main__':
schedule.every().day.at("09:00").do(task1)
schedule.every().day.at("10:30").do(task2)
schedule.every().day.at("13:00").do(task1)
schedule.every().day.at("15:00").do(task2)
while True:
schedule.run_pending()
time.sleep(1)
示例4 向任务传参
import schedule
def task(message):
print(message)
schedule.every(2).seconds.do(task, "我在天上飘")
while True:
schedule.run_pending()
time.sleep(1)
2.定时/周期执行配置
示例如下 - 可以参考模仿着写
import schedule
import time
def task():
print("我在天上飘")
schedule.every().seconds.do(task) # 每秒运行 1 次
schedule.every(2).seconds.do(task) # 每2秒运行 1 次
schedule.every().minutes.do(task) # 每分钟运行 1 次
schedule.every().hours.do(task) # 每小时运行 1 次
schedule.every().days.do(task) # 每天运行 1 次
schedule.every().weeks.do(task) # 每周运行 1 次
schedule.every().monday.do(task) # 每周运行 1 次
schedule.every().minute.at(":30").do(task) # 每分钟的第30秒运行 1 次
schedule.every().day.at("08:00").do(task) # 每天上午8点运行 1 次
schedule.every().monday.at("23:30").do(task) # 每周一晚上11:30运行 1 次
schedule.every().tuesday.at("23:30").do(task) # 每周二晚上11:30运行 1 次
schedule.every().wednesday.at("23:30").do(task) # 每周三晚上11:30运行 1 次
schedule.every().thursday.at("23:30").do(task) # 每周四晚上11:30运行 1 次
schedule.every().friday.at("23:30").do(task) # 每周五晚上11:30运行 1 次
schedule.every().saturday.at("23:30").do(task) # 每周六晚上11:30运行 1 次
schedule.every().sunday.at("23:30").do(task) # 每周日晚上11:30运行 1 次
while True:
schedule.run_pending()
time.sleep(1)
3.标签与任务的取消
在每个 do(func) 后面都可以用 tag 标签来标记,标签可以不唯一
import schedule
import time
def task(message):
print('{},真滴吗?'.format(message))
# 为每个调度任务配上两个标签,标签个数不做限制
schedule.every().seconds.do(task, "我在天上飘").tag("second-task", "first-one")
schedule.every().minutes.do(task, "我在地上跑").tag("second-task", "secondary-one")
while True:
schedule.run_pending()
time.sleep(1)
取消任务 或者 取消定时任务
schedule.clear('secondary-one') # 取消所有带有 'secondary-one' 标签的任务
schedule.clear() # 取消所有任务
总结
schedule 是一个很小的轻量级的库,可以解决一部分具有定时调度的问题。
本人技术有限,若有错误,欢迎指正。
更多推荐
schedule - 轻量级任务调度器/定时器
发布评论