真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

Python的周期任務(wù)調(diào)度工具是什么

本篇內(nèi)容主要講解“Python的周期任務(wù)調(diào)度工具是什么”,感興趣的朋友不妨來(lái)看看。本文介紹的方法操作簡(jiǎn)單快捷,實(shí)用性強(qiáng)。下面就讓小編來(lái)帶大家學(xué)習(xí)“Python的周期任務(wù)調(diào)度工具是什么”吧!

創(chuàng)新互聯(lián)公司主營(yíng)東港網(wǎng)站建設(shè)的網(wǎng)絡(luò)公司,主營(yíng)網(wǎng)站建設(shè)方案,重慶APP開(kāi)發(fā),東港h5微信小程序搭建,東港網(wǎng)站營(yíng)銷(xiāo)推廣歡迎東港等地區(qū)企業(yè)咨詢(xún)

如果你想周期性地執(zhí)行某個(gè) Python 腳本,最出名的選擇應(yīng)該是 Crontab 腳本,但是 Crontab 具有以下缺點(diǎn):

  •  1.不方便執(zhí)行秒級(jí)任務(wù)。

  •   2.當(dāng)需要執(zhí)行的定時(shí)任務(wù)有上百個(gè)的時(shí)候,Crontab 的管理就會(huì)特別不方便。

還有一個(gè)選擇是 Celery,但是 Celery 的配置比較麻煩,如果你只是需要一個(gè)輕量級(jí)的調(diào)度工具,Celery 不會(huì)是一個(gè)好選擇。

在你想要使用一個(gè)輕量級(jí)的任務(wù)調(diào)度工具,而且希望它盡量簡(jiǎn)單、容易使用、不需要外部依賴(lài),最好能夠容納 Crontab 的所有基本功能,那么 Schedule 模塊是你的不二之選。

使用它來(lái)調(diào)度任務(wù)可能只需要幾行代碼,感受一下:

# Python 實(shí)用寶典  import schedule  import time  def job():      print("I'm working...")  schedule.every(10).minutes.do(job)  while True:      schedule.run_pending()      time.sleep(1)

上面的代碼表示每10分鐘執(zhí)行一次 job 函數(shù),非常簡(jiǎn)單方便。你只需要引入 schedule 模塊,通過(guò)調(diào)用 scedule.every(時(shí)間數(shù)).時(shí)間類(lèi)型.do(job)  發(fā)布周期任務(wù)。

發(fā)布后的周期任務(wù)需要用 run_pending 函數(shù)來(lái)檢測(cè)是否執(zhí)行,因此需要一個(gè) While 循環(huán)不斷地輪詢(xún)這個(gè)函數(shù)。

下面具體講講Schedule模塊的安裝和初級(jí)、進(jìn)階使用方法。

1.準(zhǔn)備

開(kāi)始之前,你要確保Python和pip已經(jīng)成功安裝在電腦上,如果沒(méi)有,可以訪(fǎng)問(wèn)這篇文章:超詳細(xì)Python安裝指南 進(jìn)行安裝。

(可選1) 如果你用Python的目的是數(shù)據(jù)分析,可以直接安裝Anaconda:Python數(shù)據(jù)分析與挖掘好幫手—Anaconda,它內(nèi)置了Python和pip.

(可選2) 此外,推薦大家用VSCode編輯器,它有許多的優(yōu)點(diǎn):Python 編程的最好搭檔—VSCode 詳細(xì)指南。

請(qǐng)選擇以下任一種方式輸入命令安裝依賴(lài):

1. Windows 環(huán)境 打開(kāi) Cmd (開(kāi)始-運(yùn)行-CMD)。

2. MacOS 環(huán)境 打開(kāi) Terminal (command+空格輸入Terminal)。

3. 如果你用的是 VSCode編輯器 或 Pycharm,可以直接使用界面下方的Terminal.

pip install schedule

2.基本使用

最基本的使用在文首已經(jīng)提到過(guò),下面給大家展示更多的調(diào)度任務(wù)例子:

# Python 實(shí)用寶典  import schedule  import time  def job():      print("I'm working...")  # 每十分鐘執(zhí)行任務(wù)  schedule.every(10).minutes.do(job)  # 每個(gè)小時(shí)執(zhí)行任務(wù)  schedule.every().hour.do(job)  # 每天的10:30執(zhí)行任務(wù)  schedule.every().day.at("10:30").do(job)  # 每個(gè)月執(zhí)行任務(wù)  schedule.every().monday.do(job)  # 每個(gè)星期三的13:15分執(zhí)行任務(wù)  schedule.every().wednesday.at("13:15").do(job)  # 每分鐘的第17秒執(zhí)行任務(wù)  schedule.every().minute.at(":17").do(job)  while True:      schedule.run_pending()      time.sleep(1)

可以看到,從月到秒的配置,上面的例子都覆蓋到了。不過(guò)如果你想只運(yùn)行一次任務(wù)的話(huà),可以這么配:

# Python 實(shí)用寶典  import schedule  import time  def job_that_executes_once():      # 此處編寫(xiě)的任務(wù)只會(huì)執(zhí)行一次...      return schedule.CancelJob  schedule.every().day.at('22:30').do(job_that_executes_once)  while True:      schedule.run_pending()      time.sleep(1)

參數(shù)傳遞

如果你有參數(shù)需要傳遞給作業(yè)去執(zhí)行,你只需要這么做:

# Python 實(shí)用寶典  import schedule  def greet(name):      print('Hello', name)  # do() 將額外的參數(shù)傳遞給job函數(shù)  schedule.every(2).seconds.do(greet, name='Alice')  schedule.every(4).seconds.do(greet, name='Bob')

獲取目前所有的作業(yè)

如果你想獲取目前所有的作業(yè):

# Python 實(shí)用寶典  import schedule  def hello():      print('Hello world')  schedule.every().second.do(hello)  all_jobs = schedule.get_jobs()

取消所有作業(yè)

如果某些機(jī)制觸發(fā)了,你需要立即清除當(dāng)前程序的所有作業(yè):

# Python 實(shí)用寶典  import schedule  def greet(name):      print('Hello {}'.format(name))  schedule.every().second.do(greet)  schedule.clear()

標(biāo)簽功能

在設(shè)置作業(yè)的時(shí)候,為了后續(xù)方便管理作業(yè),你可以給作業(yè)打個(gè)標(biāo)簽,這樣你可以通過(guò)標(biāo)簽過(guò)濾獲取作業(yè)或取消作業(yè)。

# Python 實(shí)用寶典  import schedule  def greet(name):      print('Hello {}'.format(name))  # .tag 打標(biāo)簽  schedule.every().day.do(greet, 'Andrea').tag('daily-tasks', 'friend')  schedule.every().hour.do(greet, 'John').tag('hourly-tasks', 'friend')  schedule.every().hour.do(greet, 'Monica').tag('hourly-tasks', 'customer')  schedule.every().day.do(greet, 'Derek').tag('daily-tasks', 'guest')   # get_jobs(標(biāo)簽):可以獲取所有該標(biāo)簽的任務(wù)  friends = schedule.get_jobs('friend')  # 取消所有 daily-tasks 標(biāo)簽的任務(wù)  schedule.clear('daily-tasks')

設(shè)定作業(yè)截止時(shí)間

如果你需要讓某個(gè)作業(yè)到某個(gè)時(shí)間截止,你可以通過(guò)這個(gè)方法:

# Python 實(shí)用寶典  import schedule  from datetime import datetime, timedelta, time  def job():      print('Boo')  # 每個(gè)小時(shí)運(yùn)行作業(yè),18:30后停止  schedule.every(1).hours.until("18:30").do(job)  # 每個(gè)小時(shí)運(yùn)行作業(yè),2030-01-01 18:33 today  schedule.every(1).hours.until("2030-01-01 18:33").do(job)  # 每個(gè)小時(shí)運(yùn)行作業(yè),8個(gè)小時(shí)后停止  schedule.every(1).hours.until(timedelta(hours=8)).do(job)  # 每個(gè)小時(shí)運(yùn)行作業(yè),11:32:42后停止  schedule.every(1).hours.until(time(11, 33, 42)).do(job)  # 每個(gè)小時(shí)運(yùn)行作業(yè),2020-5-17 11:36:20后停止  schedule.every(1).hours.until(datetime(2020, 5, 17, 11, 36, 20)).do(job)

截止日期之后,該作業(yè)將無(wú)法運(yùn)行。

立即運(yùn)行所有作業(yè),而不管其安排如何

如果某個(gè)機(jī)制觸發(fā)了,你需要立即運(yùn)行所有作業(yè),可以調(diào)用 schedule.run_all() :

# Python 實(shí)用寶典  import schedule  def job_1():      print('Foo')  def job_2():      print('Bar')  schedule.every().monday.at("12:40").do(job_1) schedule.every().tuesday.at("16:40").do(job_2)  schedule.run_all()  # 立即運(yùn)行所有作業(yè),每次作業(yè)間隔10秒  schedule.run_all(delay_seconds=10)

3.高級(jí)使用

裝飾器安排作業(yè)

如果你覺(jué)得設(shè)定作業(yè)這種形式太啰嗦了,也可以使用裝飾器模式:

# Python 實(shí)用寶典  from schedule import every, repeat, run_pending  import time  # 此裝飾器效果等同于 schedule.every(10).minutes.do(job)  @repeat(every(10).minutes) def job():      print("I am a scheduled job")  while True:      run_pending()      time.sleep(1)

并行執(zhí)行

默認(rèn)情況下,Schedule 按順序執(zhí)行所有作業(yè)。其背后的原因是,很難找到讓每個(gè)人都高興的并行執(zhí)行模型。

不過(guò)你可以通過(guò)多線(xiàn)程的形式來(lái)運(yùn)行每個(gè)作業(yè)以解決此限制:

# Python 實(shí)用寶典  import threading  import time  import schedule  def job1():      print("I'm running on thread %s" % threading.current_thread())  def job2():     print("I'm running on thread %s" % threading.current_thread())  def job3():      print("I'm running on thread %s" % threading.current_thread())  def run_threaded(job_func):      job_thread = threading.Thread(target=job_func)     job_thread.start()  schedule.every(10).seconds.do(run_threaded, job1)  schedule.every(10).seconds.do(run_threaded, job2)  schedule.every(10).seconds.do(run_threaded, job3)  while True:      schedule.run_pending()      time.sleep(1)

日志記錄

Schedule 模塊同時(shí)也支持 logging 日志記錄,這么使用:

# Python 實(shí)用寶典  import schedule  import logging  logging.basicConfig()  schedule_logger = logging.getLogger('schedule')  # 日志級(jí)別為DEBUG  schedule_logger.setLevel(level=logging.DEBUG)  def job():      print("Hello, Logs")  schedule.every().second.do(job)  schedule.run_all()  schedule.clear()

效果如下:

DEBUG:schedule:Running *all* 1 jobs with 0s delay in between  DEBUG:schedule:Running job Job(interval=1, unit=seconds, do=job, args=(), kwargs={})  Hello, Logs  DEBUG:schedule:Deleting *all* jobs

異常處理

Schedule 不會(huì)自動(dòng)捕捉異常,它遇到異常會(huì)直接拋出,這會(huì)導(dǎo)致一個(gè)嚴(yán)重的問(wèn)題:后續(xù)所有的作業(yè)都會(huì)被中斷執(zhí)行,因此我們需要捕捉到這些異常。

你可以手動(dòng)捕捉,但是某些你預(yù)料不到的情況需要程序進(jìn)行自動(dòng)捕獲,加一個(gè)裝飾器就能做到了:

# Python 實(shí)用寶典  import functools  def catch_exceptions(cancel_on_failure=False):      def catch_exceptions_decorator(job_func):          @functools.wraps(job_func)          def wrapper(*args, **kwargs):              try:                  return job_func(*args, **kwargs)              except:                  import traceback                  print(traceback.format_exc())                  if cancel_on_failure:                      return schedule.CancelJob          return wrapper      return catch_exceptions_decorator  @catch_exceptions(cancel_on_failure=True)  def bad_task():      return 1 / 0  schedule.every(5).minutes.do(bad_task)

這樣,bad_task 在執(zhí)行時(shí)遇到的任何錯(cuò)誤,都會(huì)被 catch_exceptions  捕獲,這點(diǎn)在保證調(diào)度任務(wù)正常運(yùn)轉(zhuǎn)的時(shí)候非常關(guān)鍵。

到此,相信大家對(duì)“Python的周期任務(wù)調(diào)度工具是什么”有了更深的了解,不妨來(lái)實(shí)際操作一番吧!這里是創(chuàng)新互聯(lián)網(wǎng)站,更多相關(guān)內(nèi)容可以進(jìn)入相關(guān)頻道進(jìn)行查詢(xún),關(guān)注我們,繼續(xù)學(xué)習(xí)!


分享題目:Python的周期任務(wù)調(diào)度工具是什么
URL分享:http://weahome.cn/article/iggssi.html

其他資訊

在線(xiàn)咨詢(xún)

微信咨詢(xún)

電話(huà)咨詢(xún)

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部