這篇文章主要講解了“Python定時(shí)庫APScheduler的原理及用法”,文中的講解內(nèi)容簡(jiǎn)單清晰,易于學(xué)習(xí)與理解,下面請(qǐng)大家跟著小編的思路慢慢深入,一起來研究和學(xué)習(xí)“Python定時(shí)庫APScheduler的原理及用法”吧!
在沙坡頭等地區(qū),都構(gòu)建了全面的區(qū)域性戰(zhàn)略布局,加強(qiáng)發(fā)展的系統(tǒng)性、市場(chǎng)前瞻性、產(chǎn)品創(chuàng)新能力,以專注、極致的服務(wù)理念,為客戶提供網(wǎng)站設(shè)計(jì)、做網(wǎng)站 網(wǎng)站設(shè)計(jì)制作按需求定制制作,公司網(wǎng)站建設(shè),企業(yè)網(wǎng)站建設(shè),品牌網(wǎng)站制作,成都全網(wǎng)營銷推廣,外貿(mào)網(wǎng)站建設(shè),沙坡頭網(wǎng)站建設(shè)費(fèi)用合理。APScheduler簡(jiǎn)介
APscheduler全稱Advanced Python Scheduler
作用為在指定的時(shí)間規(guī)則執(zhí)行指定的作業(yè)。
指定時(shí)間規(guī)則的方式可以是間隔多久執(zhí)行,可以是指定日期時(shí)間的執(zhí)行,也可以類似Linux系統(tǒng)中Crontab中的方式執(zhí)行任務(wù)。
指定的任務(wù)就是一個(gè)Python函數(shù)。
APScheduler組件
APScheduler版本 3.6.3
APScheduler中幾個(gè)重要的概念
Job 作業(yè)
作用
Job作為APScheduler最小執(zhí)行單位。 創(chuàng)建Job時(shí)指定執(zhí)行的函數(shù),函數(shù)中所需參數(shù),Job執(zhí)行時(shí)的一些設(shè)置信息。
構(gòu)建說明
id:指定作業(yè)的唯一ID name:指定作業(yè)的名字 trigger:apscheduler定義的觸發(fā)器,用于確定Job的執(zhí)行時(shí)間,根據(jù)設(shè)置的trigger規(guī)則,計(jì)算得到下次執(zhí)行此job的 時(shí)間, 滿足時(shí)將會(huì)執(zhí)行 executor:apscheduler定義的執(zhí)行器,job創(chuàng)建時(shí)設(shè)置執(zhí)行器的名字,根據(jù)字符串你名字到scheduler獲取到執(zhí)行此 job的 執(zhí)行器,執(zhí)行job指定的函數(shù) max_instances:執(zhí)行此job的大實(shí)例數(shù),executor執(zhí)行job時(shí),根據(jù)job的id來計(jì)算執(zhí)行次數(shù),根據(jù)設(shè)置的大實(shí)例數(shù) 來確定是否可執(zhí)行 next_run_time:Job下次的執(zhí)行時(shí)間,創(chuàng)建Job時(shí)可以指定一個(gè)時(shí)間[datetime],不指定的話則默認(rèn)根據(jù)trigger獲取觸 發(fā)時(shí)間 misfire_grace_time:Job的延遲執(zhí)行時(shí)間,例如Job的計(jì)劃執(zhí)行時(shí)間是21:00:00,但因服務(wù)重啟或其他原因?qū)е?21:00:31才執(zhí)行,如果設(shè)置此key為40,則該job會(huì)繼續(xù)執(zhí)行,否則將會(huì)丟棄此job coalesce:Job是否合并執(zhí)行,是一個(gè)bool值。例如scheduler停止20s后重啟啟動(dòng),而job的觸發(fā)器設(shè)置為5s執(zhí)行 一次,因此此job錯(cuò)過了4個(gè)執(zhí)行時(shí)間,如果設(shè)置為是,則會(huì)合并到一次執(zhí)行,否則會(huì)逐個(gè)執(zhí)行 func:Job執(zhí)行的函數(shù) args:Job執(zhí)行函數(shù)需要的位置參數(shù) kwargs:Job執(zhí)行函數(shù)需要的關(guān)鍵字參數(shù)
Trigger 觸發(fā)器
Trigger綁定到Job,在scheduler調(diào)度篩選Job時(shí),根據(jù)觸發(fā)器的規(guī)則計(jì)算出Job的觸發(fā)時(shí)間,然后與當(dāng)前時(shí)間比較 確定此Job是否會(huì)被執(zhí)行,總之就是根據(jù)trigger規(guī)則計(jì)算出下一個(gè)執(zhí)行時(shí)間。 Trigger有多種種類,指定時(shí)間的DateTrigger,指定間隔時(shí)間的IntervalTrigger,像Linux的crontab 一樣的CronTrigger
目前APScheduler支持觸發(fā)器:
DateTrigger IntervalTrigger CronTrigger
Executor 執(zhí)行器
Executor在scheduler中初始化,另外也可通過scheduler的add_executor動(dòng)態(tài)添加Executor。 每個(gè)executor都會(huì)綁定一個(gè)alias,這個(gè)作為唯一標(biāo)識(shí)綁定到Job,在實(shí)際執(zhí)行時(shí)會(huì)根據(jù)Job綁定的executor 找到實(shí)際的執(zhí)行器對(duì)象,然后根據(jù)執(zhí)行器對(duì)象執(zhí)行Job Executor的種類會(huì)根據(jù)不同的調(diào)度來選擇,如果選擇AsyncIO作為調(diào)度的庫,那么選擇AsyncIOExecutor,如果 選擇tornado作為調(diào)度的庫,選擇TornadoExecutor,如果選擇啟動(dòng)進(jìn)程作為調(diào)度, 選擇ThreadPoolExecutor或者ProcessPoolExecutor都可以 Executor的選擇需要根據(jù)實(shí)際的scheduler來選擇不同的執(zhí)行器
目前APScheduler支持的Executor:
AsyncIOExecutor GeventExecutor ThreadPoolExecutor ProcessPoolExecutor TornadoExecutor TwistedExecutor
Jobstore 作業(yè)存儲(chǔ)
Jobstore在scheduler中初始化,另外也可通過scheduler的add_jobstore動(dòng)態(tài)添加Jobstore。每個(gè)jobstore都會(huì) 綁定一個(gè)alias,scheduler在Add Job時(shí),根據(jù)指定的jobstore在scheduler中找到相應(yīng)的jobstore, 并將job添加到j(luò)obstore中。 Jobstore主要是通過pickle庫的loads和dumps【實(shí)現(xiàn)核心是通過python的__getstate__和__setstate__重寫實(shí)現(xiàn)】, 每次變更時(shí)將Job動(dòng)態(tài)保存到存儲(chǔ)中,使用時(shí)再動(dòng)態(tài)的加載出來,作為存儲(chǔ)的可以是redis,也可以是數(shù)據(jù)庫【通過 sqlarchemy這個(gè)庫集成多種數(shù)據(jù)庫】,也可以是mongodb等
目前APScheduler支持的Jobstore:
MemoryJobStore MongoDBJobStore RedisJobStore RethinkDBJobStore SQLAlchemyJobStore ZooKeeperJobStore
Event 事件
Event是APScheduler在進(jìn)行某些操作時(shí)觸發(fā)相應(yīng)的事件,用戶可以自定義一些函數(shù)來監(jiān)聽這些事件, 當(dāng)觸發(fā)某些Event時(shí),做一些具體的操作 常見的比如。Job執(zhí)行異常事件 EVENT_JOB_ERROR。Job執(zhí)行時(shí)間錯(cuò)過事件 EVENT_JOB_MISSED。
目前APScheduler定義的Event
EVENT_SCHEDULER_STARTED EVENT_SCHEDULER_START EVENT_SCHEDULER_SHUTDOWN EVENT_SCHEDULER_PAUSED EVENT_SCHEDULER_RESUMED EVENT_EXECUTOR_ADDED EVENT_EXECUTOR_REMOVED EVENT_JOBSTORE_ADDED EVENT_JOBSTORE_REMOVED EVENT_ALL_JOBS_REMOVED EVENT_JOB_ADDED EVENT_JOB_REMOVED EVENT_JOB_MODIFIED EVENT_JOB_EXECUTED EVENT_JOB_ERROR EVENT_JOB_MISSED EVENT_JOB_SUBMITTED EVENT_JOB_MAX_INSTANCES
Listener 監(jiān)聽事件
Listener表示用戶自定義監(jiān)聽的一些Event,當(dāng)Job觸發(fā)了EVENT_JOB_MISSED事件時(shí)可以根據(jù)需求做一些其他處理。
Scheduler 調(diào)度器
Scheduler是APScheduler的核心,所有相關(guān)組件通過其定義。scheduler啟動(dòng)之后,將開始按照配置的任務(wù)進(jìn)行調(diào)度。 除了依據(jù)所有定義Job的trigger生成的將要調(diào)度時(shí)間喚醒調(diào)度之外。當(dāng)發(fā)生Job信息變更時(shí)也會(huì)觸發(fā)調(diào)度。 scheduler可根據(jù)自身的需求選擇不同的組件,如果是使用AsyncIO則選擇AsyncIOScheduler,使用tornado則選擇 TornadoScheduler。
目前APScheduler支持的Scheduler:
AsyncIOScheduler BackgroundScheduler BlockingScheduler GeventScheduler QtScheduler TornadoScheduler TwistedScheduler
這里前面一期的Python學(xué)習(xí)教程里有提到過!
Scheduler工作流程圖
這里重點(diǎn)挑選兩個(gè)重要的流程畫一個(gè)簡(jiǎn)陋的流程圖,來看一下scheduler的工作原理。其一個(gè)是添加add job,另一是scheduler每次喚醒調(diào)度時(shí)的執(zhí)行過程
Scheduler添加job流程
Scheduler調(diào)度流程
APScheduler使用示例
AsyncIO調(diào)度示例
import asyncio import datetime from apscheduler.events import EVENT_JOB_EXECUTED from apscheduler.executors.asyncio import AsyncIOExecutor from apscheduler.jobstores.redis import RedisJobStore # 需要安裝redis from apscheduler.schedulers.asyncio import AsyncIOScheduler from apscheduler.triggers.interval import IntervalTrigger from apscheduler.triggers.cron import CronTrigger # 定義jobstore 使用redis 存儲(chǔ)job信息 default_redis_jobstore = RedisJobStore( db=2, jobs_key="apschedulers.default_jobs", run_times_key="apschedulers.default_run_times", host="127.0.0.1", port=6379, password="test" ) # 定義executor 使用asyncio是的調(diào)度執(zhí)行規(guī)則 first_executor = AsyncIOExecutor() # 初始化scheduler時(shí),可以直接指定jobstore和executor init_scheduler_options = { "jobstores": { # first 為 jobstore的名字,在創(chuàng)建Job時(shí)直接直接此名字即可 "default": default_redis_jobstore }, "executors": { # first 為 executor 的名字,在創(chuàng)建Job時(shí)直接直接此名字,執(zhí)行時(shí)則會(huì)使用此executor執(zhí)行 "first": first_executor }, # 創(chuàng)建job時(shí)的默認(rèn)參數(shù) "job_defaults": { 'coalesce': False, # 是否合并執(zhí)行 'max_instances': 1 # 大實(shí)例數(shù) } } # 創(chuàng)建scheduler scheduler = AsyncIOScheduler(**init_scheduler_options) # 啟動(dòng)調(diào)度 scheduler.start() second_redis_jobstore = RedisJobStore( db=2, jobs_key="apschedulers.second_jobs", run_times_key="apschedulers.second_run_times", host="127.0.0.1", port=6379, password="test" ) scheduler.add_jobstore(second_redis_jobstore, 'second') # 定義executor 使用asyncio是的調(diào)度執(zhí)行規(guī)則 second_executor = AsyncIOExecutor() scheduler.add_executor(second_executor, "second") # *********** 關(guān)于 APScheduler中有關(guān)Event相關(guān)使用示例 ************* # 定義函數(shù)監(jiān)聽事件 def job_execute(event): """ 監(jiān)聽事件處理 :param event: :return: """ print( "job執(zhí)行job:\ncode => {}\njob.id => {}\njobstore=>{}".format( event.code, event.job_id, event.jobstore )) # 給EVENT_JOB_EXECUTED[執(zhí)行完成job事件]添加回調(diào),這里就是每次Job執(zhí)行完成了我們就輸出一些信息 scheduler.add_listener(job_execute, EVENT_JOB_EXECUTED) # *********** 關(guān)于 APScheduler中有關(guān)Job使用示例 ************* # 使用的是asyncio,所以job執(zhí)行的函數(shù)可以是一個(gè)協(xié)程,也可以是一個(gè)普通函數(shù),AsyncIOExecutor會(huì)根據(jù)配置的函數(shù)來進(jìn)行調(diào)度, # 如果是協(xié)程則會(huì)直接丟入到loop中,如果是普通函數(shù)則會(huì)啟用線程處理 # 我們定義兩個(gè)函數(shù)來看看執(zhí)行的結(jié)果 def interval_func(message): print("現(xiàn)在時(shí)間: {}".format(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))) print("我是普通函數(shù)") print(message) async def async_func(message): print("現(xiàn)在時(shí)間: {}".format(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))) print("我是協(xié)程") print(message) # 將上述的兩個(gè)函數(shù)按照不同的方式創(chuàng)造觸發(fā)器來執(zhí)行 # *********** 關(guān)于 APScheduler中有關(guān)Trigger使用示例 ************* # 使用Trigger有兩種方式,一種是用類創(chuàng)建使用,另一個(gè)是使用字符串的方式 # 使用字符串指定別名, scheduler初始化時(shí)已將其定義的trigger加載,所以指定字符串可以直接使用 if scheduler.get_job("interval_func_test", "default"): # 存在的話,先刪除 scheduler.remove_job("interval_func_test", "default") # 立馬開始 2分鐘后結(jié)束, 每10s執(zhí)行一次 存儲(chǔ)到first jobstore second執(zhí)行 scheduler.add_job(interval_func, "interval", args=["我是10s執(zhí)行一次,存放在jobstore default, executor default"], seconds=10, id="interval_func_test", jobstore="default", executor="default", start_date=datetime.datetime.now(), end_date=datetime.datetime.now() + datetime.timedelta(seconds=240)) # 先創(chuàng)建tigger trigger = IntervalTrigger(seconds=5) if scheduler.get_job("interval_func_test_2", "second"): # 存在的話,先刪除 scheduler.remove_job("interval_func_test_2", "second") # 每隔5s執(zhí)行一次 scheduler.add_job(async_func, trigger, args=["我是每隔5s執(zhí)行一次,存放在jobstore second, executor = second"], id="interval_func_test_2", jobstore="second", executor="second") # 使用協(xié)程的函數(shù)執(zhí)行,且使用cron的方式配置觸發(fā)器 if scheduler.get_job("cron_func_test", "default"): # 存在的話,先刪除 scheduler.remove_job("cron_func_test", "default") # 立馬開始 每10s執(zhí)行一次 scheduler.add_job(async_func, "cron", args=["我是 每分鐘 30s 時(shí)執(zhí)行一次,存放在jobstore default, executor default"], second='30', id="cron_func_test", jobstore="default", executor="default") # 先創(chuàng)建tigger trigger = CronTrigger(second='20,40') if scheduler.get_job("cron_func_test_2", "second"): # 存在的話,先刪除 scheduler.remove_job("cron_func_test_2", "second") # 每隔5s執(zhí)行一次 scheduler.add_job(async_func, trigger, args=["我是每分鐘 20s 40s時(shí)各執(zhí)行一次,存放在jobstore second, executor = second"], id="cron_func_test_2", jobstore="second", executor="second") # 使用創(chuàng)建trigger對(duì)象直接創(chuàng)建 print("啟動(dòng): {}".format(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))) asyncio.get_event_loop().run_forever()
輸出結(jié)果部分截取
啟動(dòng)之后,每隔5s運(yùn)行一次的JOB
啟動(dòng): 2019-12-05 14:13:11 【這部分是定義的協(xié)程函數(shù)輸出的內(nèi)容】 現(xiàn)在時(shí)間: 2019-12-05 14:13:16 我是協(xié)程 我是每隔5s執(zhí)行一次,存放在jobstore second, executor = second 【這部分是監(jiān)聽job執(zhí)行完成之后的回調(diào)輸出】 job執(zhí)行job: code => 4096 job.id => interval_func_test_2 jobstore=>second
在20s和40s時(shí)各執(zhí)行一次的Job
現(xiàn)在時(shí)間: 2019-12-05 14:13:20 我是協(xié)程 我是每分鐘 20s 40s時(shí)各執(zhí)行一次,存放在jobstore second, executor = second job執(zhí)行job: code => 4096 job.id => cron_func_test_2 jobstore=>second
每隔10s執(zhí)行一次的job
現(xiàn)在時(shí)間: 2019-12-05 14:13:21 我是普通函數(shù) 我是10s執(zhí)行一次,存放在jobstore default, executor default 現(xiàn)在時(shí)間: 2019-12-05 14:13:21 我是協(xié)程 我是每隔5s執(zhí)行一次,存放在jobstore second, executor = second job執(zhí)行job: code => 4096 job.id => interval_func_test jobstore=>default job執(zhí)行job: code => 4096 job.id => interval_func_test_2 jobstore=>second
每隔5s執(zhí)行一次的Job
現(xiàn)在時(shí)間: 2019-12-05 14:13:26 我是協(xié)程 我是每隔5s執(zhí)行一次,存放在jobstore second, executor = second job執(zhí)行job: code => 4096 job.id => interval_func_test_2 jobstore=>second
每分鐘30s時(shí)執(zhí)行一次
現(xiàn)在時(shí)間: 2019-12-05 14:13:30 我是協(xié)程 我是 每分鐘 30s 時(shí)執(zhí)行一次,存放在jobstore default, executor default job執(zhí)行job: code => 4096 job.id => cron_func_test jobstore=>default
感謝各位的閱讀,以上就是“Python定時(shí)庫APScheduler的原理及用法”的內(nèi)容了,經(jīng)過本文的學(xué)習(xí)后,相信大家對(duì)Python定時(shí)庫APScheduler的原理及用法這一問題有了更深刻的體會(huì),具體使用情況還需要大家實(shí)踐驗(yàn)證。這里是創(chuàng)新互聯(lián),小編將為大家推送更多相關(guān)知識(shí)點(diǎn)的文章,歡迎關(guān)注!