当前位置:主页 > 软件编程 > Python代码 >

Python定时任务工具之APScheduler使用方式

时间:2022-09-04 10:24:38 | 栏目:Python代码 | 点击:

APScheduler (advanceded python scheduler)是一款Python开发的定时任务工具。

文档地址 apscheduler.readthedocs.io/en/latest/u…

特点:

1 安装

pip install apscheduler

2 组成

3 使用方式

from apscheduler.schedulers.background import BlockingScheduler
#
 创建定时任务的调度器对象
scheduler = BlockingScheduler()
# 创建执行器
executors = {
 'default': ThreadPoolExecutor(20),
}
# 定义定时任务
def my_job(param1, param2): # 参数通过add_job()args传递传递过来
 print(param1) # 100
 print(param2) # python
# 向调度器中添加定时任务
scheduler.add_job(my_job, 'date', args=[100, 'python'], executors=executors)
# 启动定时任务调度器工作
scheduler.start()


4 调度器 Scheduler

负责管理定时任务

BlockingScheduler : 作为独立进程时使用

from apscheduler.schedulers.blocking import BlockingScheduler
scheduler = BlockingScheduler()
scheduler.start() # 此处程序会发生阻塞
BackgroundScheduler : 在框架程序(如Django、Flask)中使用.
from apscheduler.schedulers.background import BackgroundScheduler
scheduler = BackgroundScheduler()
scheduler.start() # 此处程序不会发生阻塞

4 执行器 executors

在定时任务该执行时,以进程或线程方式执行任务

ThreadPoolExecutor
from apscheduler.executors.pool import ThreadPoolExecutor
ThreadPoolExecutor(max_workers) 

使用方法

from apscheduler.executors.pool import ThreadPoolExecutor
executors = {
  'default': ThreadPoolExecutor(20) # 最多20个线程同时执行
 }
 scheduler = BackgroundScheduler(executors=executors)
ProcessPoolExecutor

from apscheduler.executors.pool import ProcessPoolExecutor
ProcessPoolExecutor(max_workers)

使用方法

from apscheduler.executors.pool import ProcessPoolExecutor
executors = {
  'default': ProcessPoolExecutor(5) # 最多5个进程同时执行
 }
scheduler = BackgroundScheduler(executors=executors)

5 触发器 Trigger

指定定时任务执行的时机。

1) date 在特定的时间日期执行

from datetime import date
# 在2019年11月6日00:00:00执行
sched.add_job(my_job, 'date', run_date=date(2019, 11, 6))
# 在2019年11月6日16:30:05
sched.add_job(my_job, 'date', run_date=datetime(2009, 11, 6, 16, 30, 5))
sched.add_job(my_job, 'date', run_date='2009-11-06 16:30:05')
# 立即执行
sched.add_job(my_job, 'date') 
sched.start()

2) interval 经过指定的时间间隔执行

weeks (int) ?C number of weeks to wait
days (int) ?C number of days to wait
hours (int) ?C number of hours to wait
minutes (int) ?C number of minutes to wait
seconds (int) ?C number of seconds to wait
start_date (datetime|str) ?C starting point for the interval calculation
end_date (datetime|str) ?C latest possible date/time to trigger on
timezone (datetime.tzinfo|str) ?C time zone to use for the date/time calculations
from datetime import datetime
# 每两小时执行一次
sched.add_job(job_function, 'interval', hours=2)
# 在2012年10月10日09:30:00 到2014年6月15日11:00:00的时间内,每两小时执行一次
sched.add_job(job_function, 'interval', hours=2, start_date='2012-10-10 09:30:00', end_date='2014-06-15 11:00:00')

3) cron 按指定的周期执行

year (int|str) ?C 4-digit year

month (int|str) ?C month (1-12)

day (int|str) ?C day of the (1-31)

week (int|str) ?C ISO week (1-53)

day_of_week (int|str) ?C number or name of weekday (0-6 or mon,tue,wed,thu,fri,sat,sun)

hour (int|str) ?C hour (0-23)

minute (int|str) ?C minute (0-59)

second (int|str) ?C second (0-59)

start_date (datetime|str) ?C earliest possible date/time to trigger on (inclusive)

end_date (datetime|str) ?C latest possible date/time to trigger on (inclusive)

timezone (datetime.tzinfo|str) ?C time zone to use for the date/time calculations (defaults to scheduler timezone)

# 在6、7、8、11、12月的第三个周五的00:00, 01:00, 02:00和03:00 执行
sched.add_job(job_function, 'cron', month='6-8,11-12', day='3rd fri', hour='0-3')

# 在2014年5月30日前的周一到周五的5:30执行
sched.add_job(job_function, 'cron', day_of_week='mon-fri', hour=5, minute=30, end_date='2014-05-30')


6.任务存储

MemoryJobStore 默认内存存储
MongoDBJobStore 任务保存到MongoDB
from apscheduler.jobstores.mongodb import MongoDB
JobStoreMongoDBJobStore()复制代码
RedisJobStore 任务保存到redis
from apscheduler.jobstores.redis import RedisJobStore
RedisJobStore()

7 配置方法

方法1

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.executors.pool import ThreadPoolExecutor

executors = {
 'default': ThreadPoolExecutor(20),
}
conf = { # redis配置
 "host":127.0.0.1,
 "port":6379,
 "db":15, # 连接15号数据库
 "max_connections":10 # redis最大支持300个连接数
}
scheduler = BackgroundScheduler(executors=executors)
scheduler.add_jobstore(jobstore='redis', **conf) # 添加任务持久化存储方式,如果未安装redis可省略此步骤

方法2

from pytz import utc

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ProcessPoolExecutor
executors = {
 'default': {'type': 'threadpool', 'max_workers': 20},
 'processpool': ProcessPoolExecutor(max_workers=5)
}
scheduler = BackgroundScheduler()
# .. 此处可以编写其他代码
# 使用configure方法进行配置
scheduler.configure(executors=executors)

8 启动

scheduler.start()

对于BlockingScheduler ,程序会阻塞在这,防止退出,作为独立进程时使用。(可以用来生成静态页面)

对于BackgroundScheduler,可以在应用程序中使用。不再以单独的进程使用。(如30分钟内取消订单)

9 扩展

任务管理

方式1

job = scheduler.add_job(myfunc, 'interval', minutes=2) # 添加任务
job.remove() # 删除任务
job.pause() # 暂定任务
job.resume() # 恢复任务

方式2

scheduler.add_job(myfunc, 'interval', minutes=2, id='my_job_id') # 添加任务 
scheduler.remove_job('my_job_id') # 删除任务
scheduler.pause_job('my_job_id') # 暂定任务
scheduler.resume_job('my_job_id') # 恢复任务

调整任务调度周期

job.modify(max_instances=6, name='Alternate name')
scheduler.reschedule_job('my_job_id', trigger='cron', minute='*/5')复制代码
停止APScheduler运行
scheduler.shutdown()

10 综合使用

这里提供30分钟取消订单支付的思路,可以使用Flask或者Django程序都能实现,这里是在django应用中动态的添加一个定时任务,调度器需要使用BackgroundScheduler。下面先定义执行订单取消的任务。

from apscheduler.executors.pool import ThreadPoolExecutor
from datetime import datetime, timedelta
from apscheduler.schedulers.blocking import BackgroundScheduler
from goods.models import SKU
from orders.models import OrderGoods
def cancel_order_job(order_id, sku_id, stock, sales):
 # 将订单商品和订单信息筛选出来
 order_goods = OrderGoods.objects.filter( order_id=order_id, sku_id=sku_id)
 order_goods.delete() # 删除订单
 try:
  sku = SKU.objects.get(id=sku_id)
  sku.stock += stock # 订单删掉后商品表里的库存恢复
  sku.sales -= sales # 商品表里销量还原
  sku.save()
 except Exception as e:
  print(e)


具体操作哪些表要根据自身表的设计来定,大致是上面的思路。然后在生成订单的视图中同时生成取消订单的任务。然后将取消订单cancel_order_job()需要的参数传递过去,注意要判定当前订单的状态为未支付状态。

from datetime import datetime, timedelta

class OrderCommitView(View):

 def post(self, request):
 # ... 此处省略生成订单相关逻辑
   if status == OrderInfo.STATUS.UNPADED: # 待支付状态
  
    executors = {
    'default': ThreadPoolExecutor(10) 
    }
    now = datetime.now()
    delay = now + timedelta(minutes=30) # 从当前下订单延时30分钟后
    scheduler = BackgroundScheduler(executors=executors)
    # 添加定时任务
    scheduler.add_job(cancel_order_job, 'date', run_date=delay,
    args=[order_id, sku.id, sku.stock, sku.sales])
    scheduler.start()
    # ....省略其他业务及返回


注意: 如果需要周期性的执行一个定时任务,如果用到了django中模型类或者Flask的配置信息等相关信息,需要将框架的配置信息导入。

总结

您可能感兴趣的文章:

相关文章