2021-07-10 12:21:25 Cyberbolt
APScheduler 可以为 Python 开发定时任务 或 周期循环任务,有利于系统脚本开发。
首选安装方法是使用pip:
pip install apscheduler
同步任务示例
例 1 .函数 1 秒执行一次
import datetime
from apscheduler.schedulers.blocking import BlockingScheduler
#待执行函数
def task():
print(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
sched = BlockingScheduler(timezone="Asia/Shanghai") #初始化时指定时区
sched.add_job(task, 'interval', seconds=1) #1秒钟执行一次
def main():
sched.start() #前台挂起任务
if __name__ == '__main__':
main()
interval 还有以下参数可以自定义:
weeks (int) – number of weeks to wait
days (int) – number of days to wait
hours (int) – number of hours to wait
minutes (int) – number of minutes to wait
seconds (int) – number of seconds to wait
start_date (datetime|str) – starting point for the interval calculation
end_date (datetime|str) – latest possible date/time to trigger on
timezone (datetime.tzinfo|str) – time zone to use for the date/time calculations
jitter (int|None) – delay the job execution by jitter
seconds at most
例 2.函数每周五23:00执行一次
import datetime
from apscheduler.schedulers.blocking import BlockingScheduler
#待执行函数
def task():
print(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
sched = BlockingScheduler(timezone="Asia/Shanghai") #初始化时指定时区
sched.add_job(task, 'cron', day_of_week=4, hour=23, minute=0, second=0) #每周五23:00执行一次
def main():
sched.start() #前台挂起任务
if __name__ == '__main__':
main()
cron 还有以下参数可以自定义:
year (int|str) – 4-digit year
month (int|str) – month (1-12)
day (int|str) – day of month (1-31)
week (int|str) – ISO week (1-53)
day_of_week (int|str) – number or name of weekday (0-6 or mon,tue,wed,thu,fri,sat,sun)
hour (int|str) – hour (0-23)
minute (int|str) – minute (0-59)
second (int|str) – second (0-59)
start_date (datetime|str) – earliest possible date/time to trigger on (inclusive)
end_date (datetime|str) – latest possible date/time to trigger on (inclusive)
timezone (datetime.tzinfo|str) – time zone to use for the date/time calculations (defaults to scheduler timezone)
jitter (int|None) – delay the job execution by jitter
seconds at most
例 3.异步任务示例,函数 1 秒执行一次
如果您不希望执行任务造成阻塞,可以使用非阻塞的异步任务。(此处选择 BackgroundScheduler)
其他参数不变,仅仅在导包和初始化时改变。
import datetime
from apscheduler.schedulers.background import BackgroundScheduler
#待执行函数
def task():
print(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
sched = BackgroundScheduler(timezone="Asia/Shanghai") #初始化时指定时区
sched.add_job(task, 'interval', seconds=1) #1秒钟执行一次
def main():
sched.start() #后台挂起任务,此处仅为示例,实际使用时需要主进程也保持运行
if __name__ == '__main__':
main()
除了 BackgroundScheduler,其他选项请参考 APScheduler 官方文档,此处简单罗列:
BlockingScheduler
:当调度程序是您的进程中唯一运行的东西时使用
BackgroundScheduler
:当您不使用以下任何框架时使用,并且希望调度程序在您的应用程序的后台运行
AsyncIOScheduler
:如果您的应用程序使用 asyncio 模块,请使用
GeventScheduler
: 如果您的应用程序使用 gevent,请使用
TornadoScheduler
:如果您正在构建 Tornado 应用程序,请使用
TwistedScheduler
:如果您正在构建 Twisted 应用程序,请使用
QtScheduler
: 如果您正在构建 Qt 应用程序,请使用