真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

如何使用Python?Celery動(dòng)態(tài)添加定時(shí)任務(wù)

本篇內(nèi)容介紹了“如何使用Python Celery動(dòng)態(tài)添加定時(shí)任務(wù)”的有關(guān)知識(shí),在實(shí)際案例的操作過(guò)程中,不少人都會(huì)遇到這樣的困境,接下來(lái)就讓小編帶領(lǐng)大家學(xué)習(xí)一下如何處理這些情況吧!希望大家仔細(xì)閱讀,能夠?qū)W有所成!

讓客戶滿意是我們工作的目標(biāo),不斷超越客戶的期望值來(lái)自于我們對(duì)這個(gè)行業(yè)的熱愛(ài)。我們立志把好的技術(shù)通過(guò)有效、簡(jiǎn)單的方式提供給客戶,將通過(guò)不懈努力成為客戶在信息化領(lǐng)域值得信任、有價(jià)值的長(zhǎng)期合作伙伴,公司提供的服務(wù)項(xiàng)目有:主機(jī)域名、網(wǎng)頁(yè)空間、營(yíng)銷軟件、網(wǎng)站建設(shè)、資源網(wǎng)站維護(hù)、網(wǎng)站推廣。

    一、背景

    實(shí)際工作中會(huì)有一些耗時(shí)的異步任務(wù)需要使用定時(shí)調(diào)度,比如發(fā)送郵件,拉取數(shù)據(jù),執(zhí)行定時(shí)腳本

    通過(guò)celery 實(shí)現(xiàn)調(diào)度主要思想是 通過(guò)引入中間人redis,啟動(dòng) worker 進(jìn)行任務(wù)執(zhí)行 ,celery-beat進(jìn)行定時(shí)任務(wù)數(shù)據(jù)存儲(chǔ)

    二、Celery動(dòng)態(tài)添加定時(shí)任務(wù)的官方文檔

    celery文檔:https://docs.celeryproject.org/en/latest/userguide/periodic-tasks.html#beat-custom-schedulers

    celery 自定義調(diào)度類說(shuō)明:

    自定義調(diào)度器類可以在命令行中指定(--scheduler參數(shù))

    django-celery-beat文檔 : https://pypi.org/project/django-celery-beat/

    關(guān)于django-celery-beat 插件的說(shuō)明:

    此擴(kuò)展使您能夠?qū)⒍ㄆ谌蝿?wù)計(jì)劃存儲(chǔ)在數(shù)據(jù)庫(kù)中,可以從 Django 管理界面管理周期性任務(wù),您可以在其中創(chuàng)建、編輯和刪除周期性任務(wù)以及它們應(yīng)該運(yùn)行的頻率

    三、celery簡(jiǎn)單實(shí)用

    3.1 基礎(chǔ)環(huán)境配置

    1. 安裝最新版本的Django

    pip3 install django #當(dāng)前我安裝的版本是 3.0.6

    2. 創(chuàng)建項(xiàng)目

    django-admin startproject typeidea
    django-admin startapp blog

    3.安裝 celery

    pip3 install django-celery
    pip3 install -U Celery 
    pip3 install "celery[librabbitmq,redis,auth,msgpack]" 
    pip3 install django-celery-beat # 用于動(dòng)態(tài)添加定時(shí)任務(wù)
    pip3 install django-celery-results
    pip3 install redis
    3.2 測(cè)試使用Celery應(yīng)用

    1. 創(chuàng)建blog目錄、新建task.py

    首先在Django項(xiàng)目中創(chuàng)建一個(gè)blog文件夾,并且在blog文件夾下創(chuàng)建tasks.py模塊, 如下:

    如何使用Python?Celery動(dòng)態(tài)添加定時(shí)任務(wù)

    tasks.py代碼如下:

    #!/usr/bin/env python
    # -*- coding: UTF-8 -*-
     
    """
    #File: tasks.py
    #Time: 2022/3/30 2:26 下午
    #Author: julius
    """
    from celery import Celery
     
    # 使用redis做為broker
    app = Celery('blog.tasks2',broker='redis://127.0.0.1:6379/0')
     
    # 創(chuàng)建任務(wù)函數(shù)
    @app.task
    def my_task():
        print('任務(wù)正在執(zhí)行...')

    Celery第一個(gè)參數(shù)是給其設(shè)定一個(gè)名字, 第二參數(shù)我們?cè)O(shè)定一個(gè)中間人broker, 在這里我們使用Redis作為中間人。my_task函數(shù)是我們編寫的一個(gè)任務(wù)函數(shù), 通過(guò)加上裝飾器app.task, 將其注冊(cè)到broker的隊(duì)列中。

    2. 啟動(dòng)redis、創(chuàng)建worker

    現(xiàn)在我們?cè)趧?chuàng)建一個(gè)worker, 等待處理隊(duì)列中的任務(wù)。

    進(jìn)入項(xiàng)目的根目錄,執(zhí)行命令: celery -A celery_tasks.tasks worker -l info

    如何使用Python?Celery動(dòng)態(tài)添加定時(shí)任務(wù)

    3. 調(diào)用任務(wù)

    下面來(lái)測(cè)試一下功能,創(chuàng)建一個(gè)任務(wù),加入任務(wù)隊(duì)列中,提供worker執(zhí)行。

    進(jìn)入python終端, 執(zhí)行如下代碼:

    $ python manage.py shell
    >>> from blog.tasks import my_task
    >>> my_task.delay()
    

    調(diào)用一個(gè)任務(wù)函數(shù),將會(huì)返回一個(gè)AsyncResult對(duì)象,這個(gè)對(duì)象可以用來(lái)檢查任務(wù)的狀態(tài)或者獲得任務(wù)的返回值。

    4. 查看結(jié)果

    在worker的終端查看任務(wù)執(zhí)行情況,可以看到已經(jīng)收到83484dfe-f729-417b-8e51-6c7ae32a1377 任務(wù),并打印了任務(wù)執(zhí)行信息

    如何使用Python?Celery動(dòng)態(tài)添加定時(shí)任務(wù)

    5. 存儲(chǔ)并查看任務(wù)執(zhí)行狀態(tài)

    把任務(wù)執(zhí)行結(jié)果賦值給ret,然后調(diào)用result() 會(huì)產(chǎn)生 DisabledBackend 報(bào)錯(cuò),可見(jiàn)沒(méi)有配置后端存儲(chǔ)的時(shí)候并不能保存任務(wù)執(zhí)行的狀態(tài)信息,下一節(jié)我們會(huì)講到如何配置backend保存任務(wù)執(zhí)行結(jié)果

    $ python manage.py shell
    >>> from blog.tasks import my_task
    >>> ret=my_task.delay()
    >>> ret.result()

    如何使用Python?Celery動(dòng)態(tài)添加定時(shí)任務(wù)

    四、配置backend存儲(chǔ)任務(wù)執(zhí)行結(jié)果

    如果我們想跟蹤任務(wù)的狀態(tài),Celery需要將結(jié)果保存到某個(gè)地方。有幾種保存的方案可選:SQLAlchemy、Django ORM、Memcached、 Redis、RPC (RabbitMQ/AMQP)。

    1. 添加backend參數(shù)

    在本例中我們使用Redis作為存儲(chǔ)結(jié)果的方案,通過(guò)Celery的backend參數(shù)來(lái)設(shè)定任務(wù)結(jié)果存儲(chǔ)地址。我們將tasks模塊修改如下:

    from celery import Celery
     
    # 使用redis作為broker以及backend
    app = Celery('celery_tasks.tasks',
                 broker='redis://127.0.0.1:6379/8',
                 backend='redis://127.0.0.1:6379/9')
     
    # 創(chuàng)建任務(wù)函數(shù)
    @app.task
    def my_task(a, b):
        print("任務(wù)函數(shù)正在執(zhí)行....")
        return a + b

    給Celery增加了backend參數(shù),指定redis作為結(jié)果存儲(chǔ),并將任務(wù)函數(shù)修改為兩個(gè)參數(shù),并且有返回值。

    2. 調(diào)用任務(wù)/查看任務(wù)執(zhí)行結(jié)果

    下面再來(lái)執(zhí)行調(diào)用一下這個(gè)任務(wù)看看。

    $ python manage.py shell
    >>> from blog.tasks import my_task
    >>> res=my_task.delay(10,40)
    >>> res.result
    50
    >>> res.failed()
    False

    再來(lái)看看worker的執(zhí)行情況,如下:

    如何使用Python?Celery動(dòng)態(tài)添加定時(shí)任務(wù)

    可以看到celery任務(wù)已經(jīng)執(zhí)行成功了。

    但是這只是一個(gè)開(kāi)始,下一步要看看如何添加定時(shí)的任務(wù)。

    四、優(yōu)化Celery目錄結(jié)構(gòu)

    上面直接將Celery的應(yīng)用創(chuàng)建、配置、tasks任務(wù)全部寫在了一個(gè)文件,這樣在后面項(xiàng)目越來(lái)越大,也是不方便的。下面來(lái)拆分一下,并且添加一些常用的參數(shù)。

    基本結(jié)構(gòu)如下

    如何使用Python?Celery動(dòng)態(tài)添加定時(shí)任務(wù)

    $ vim typeidea/celery.py (Celery應(yīng)用文件)

    #!/usr/bin/env python
    # -*- coding: UTF-8 -*-
     
    """
    #File: celery.py
    #Time: 2022/3/30 12:25 下午
    #Author: julius
    """
    import os
    from celery import Celery
    from blog import celeryconfig
    project_name='typeidea'
    # set the default django setting module for the 'celery' program
    os.environ.setdefault('DJANGO_SETTINGS_MODULE','typeidea.settings')
    app = Celery(project_name)
     
    app.config_from_object('django.conf:settings')
     
    app.autodiscover_tasks()

    vim blog/celeryconfig.py (配置Celery的參數(shù)文件)

    #!/usr/bin/env python
    # -*- coding: UTF-8 -*-
     
    """
    #File: celeryconfig.py
    #Time: 2022/3/30 2:54 下午
    #Author: julius
    """
    
    # 設(shè)置結(jié)果存儲(chǔ)
    from typeidea import settings
    import os
     
    os.environ.setdefault("DJANGO_SETTINGS_MODULE", "typeidea.settings")
    CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/0'
    # 設(shè)置代理人broker
    BROKER_URL = 'redis://127.0.0.1:6379/1'
    # celery 的啟動(dòng)工作數(shù)量設(shè)置
    CELERY_WORKER_CONCURRENCY = 20
    # 任務(wù)預(yù)取功能,就是每個(gè)工作的進(jìn)程/線程在獲取任務(wù)的時(shí)候,會(huì)盡量多拿 n 個(gè),以保證獲取的通訊成本可以壓縮。
    CELERYD_PREFETCH_MULTIPLIER = 20
    # 非常重要,有些情況下可以防止死鎖
    CELERYD_FORCE_EXECV = True
    # celery 的 worker 執(zhí)行多少個(gè)任務(wù)后進(jìn)行重啟操作
    CELERY_WORKER_MAX_TASKS_PER_CHILD = 100
    # 禁用所有速度限制,如果網(wǎng)絡(luò)資源有限,不建議開(kāi)足馬力。
    CELERY_DISABLE_RATE_LIMITS = True
     
    CELERY_ENABLE_UTC = False
    CELERY_TIMEZONE = settings.TIME_ZONE
    DJANGO_CELERY_BEAT_TZ_AWARE = False
    CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'

    vim blog/tasks.py (tasks 任務(wù)文件)

    import time
    from blog.celery import app
     
    # 創(chuàng)建任務(wù)函數(shù)
    @app.task
    def my_task(a, b, c):
        print('任務(wù)正在執(zhí)行...')
        print('任務(wù)1函數(shù)休眠10s')
        time.sleep(10)
        return a + b + c

    五、開(kāi)始使用django-celery-beat調(diào)度器

    使用 django-celery-beat 動(dòng)態(tài)添加定時(shí)任務(wù)  celery 4.x 版本在 django 框架中是使用 django-celery-beat 進(jìn)行動(dòng)態(tài)添加定時(shí)任務(wù)的。前面雖然已經(jīng)安裝了這個(gè)庫(kù),但是還要再說(shuō)明一下。

    1. 安裝 django-celery-beat

    pip3 install django-celery-beat

    2.在項(xiàng)目的 settings 文件配置 django-celery-beat

    INSTALLED_APPS = [
        'blog',
        'django_celery_beat',
        ...
    ]
     
    # Django設(shè)置時(shí)區(qū)
    LANGUAGE_CODE = 'zh-hans'  # 使用中國(guó)語(yǔ)言
    TIME_ZONE = 'Asia/Shanghai'  # 設(shè)置Django使用中國(guó)上海時(shí)間
    # 如果USE_TZ設(shè)置為True時(shí),Django會(huì)使用系統(tǒng)默認(rèn)設(shè)置的時(shí)區(qū),此時(shí)的TIME_ZONE不管有沒(méi)有設(shè)置都不起作用
    # 如果USE_TZ 設(shè)置為False,TIME_ZONE = 'Asia/Shanghai', 則使用上海的UTC時(shí)間。
    USE_TZ = False

    3. 創(chuàng)建 django-celery-beat 相關(guān)表

    執(zhí)行Django數(shù)據(jù)庫(kù)遷移: python manage.py migrate

    如何使用Python?Celery動(dòng)態(tài)添加定時(shí)任務(wù)

    4. 配置Celery使用 django-celery-beat

    配置 celery.py

    import os
     
    from celery import Celery
     
    from blog import celeryconfig
     
    # 為celery 設(shè)置環(huán)境變量
    os.environ.setdefault("DJANGO_SETTINGS_MODULE","typeidea.settings")
    # 創(chuàng)建celery app
    app = Celery('blog')
    # 從單獨(dú)的配置模塊中加載配置
    app.config_from_object(celeryconfig)
     
    # 設(shè)置app自動(dòng)加載任務(wù)
    app.autodiscover_tasks([
        'blog',
    ])

    配置 celeryconfig.py

    # 設(shè)置結(jié)果存儲(chǔ)
    from typeidea import settings
    import os
     
    os.environ.setdefault("DJANGO_SETTINGS_MODULE", "typeidea.settings")
    CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/0'
    # 設(shè)置代理人broker
    BROKER_URL = 'redis://127.0.0.1:6379/1'
    # celery 的啟動(dòng)工作數(shù)量設(shè)置
    CELERY_WORKER_CONCURRENCY = 20
    # 任務(wù)預(yù)取功能,就是每個(gè)工作的進(jìn)程/線程在獲取任務(wù)的時(shí)候,會(huì)盡量多拿 n 個(gè),以保證獲取的通訊成本可以壓縮。
    CELERYD_PREFETCH_MULTIPLIER = 20
    # 非常重要,有些情況下可以防止死鎖
    CELERYD_FORCE_EXECV = True
    # celery 的 worker 執(zhí)行多少個(gè)任務(wù)后進(jìn)行重啟操作
    CELERY_WORKER_MAX_TASKS_PER_CHILD = 100
    # 禁用所有速度限制,如果網(wǎng)絡(luò)資源有限,不建議開(kāi)足馬力。
    CELERY_DISABLE_RATE_LIMITS = True
     
    CELERY_ENABLE_UTC = False
    CELERY_TIMEZONE = settings.TIME_ZONE
    DJANGO_CELERY_BEAT_TZ_AWARE = False
    CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'

    編寫任務(wù) tasks.py

    import time
    from celery import Celery
    from blog.celery import app
     
    # 使用redis做為broker
    # app = Celery('blog.tasks2',broker='redis://127.0.0.1:6379/0',backend='redis://127.0.0.1:6379/1')
     
    # 創(chuàng)建任務(wù)函數(shù)
    @app.task
    def my_task(a, b, c):
        print('任務(wù)正在執(zhí)行...')
        print('任務(wù)1函數(shù)休眠10s')
        time.sleep(10)
        return a + b + c
     
    @app.task
    def my_task2():
        print("任務(wù)2函數(shù)正在執(zhí)行....")
        print('任務(wù)2函數(shù)休眠10s')
        time.sleep(10)

    5. 啟動(dòng)定時(shí)任務(wù)work

    啟動(dòng)定時(shí)任務(wù)首先需要有一個(gè)work執(zhí)行異步任務(wù),然后再啟動(dòng)一個(gè)定時(shí)器觸發(fā)任務(wù)。

    啟動(dòng)任務(wù) work

    $ celery -A blog worker -l info

    如何使用Python?Celery動(dòng)態(tài)添加定時(shí)任務(wù)

    啟動(dòng)定時(shí)器觸發(fā) beat

    celery -A blog beat -l info --scheduler django_celery_beat.schedulers:DatabaseScheduler

    如何使用Python?Celery動(dòng)態(tài)添加定時(shí)任務(wù)

    六、具體操作演練

    6.1 創(chuàng)建基于間隔時(shí)間的周期性任務(wù)

    1. 初始化周期間隔對(duì)象interval 對(duì)象

    >>> from django_celery_beat.models import PeriodicTask, IntervalSchedule
    >>> schedule, created = IntervalSchedule.objects.get_or_create( 
    ...       every=10, 
    ...       period=IntervalSchedule.SECONDS, 
    ...  )
    >>> IntervalSchedule.objects.all()
    ]>

    2.創(chuàng)建一個(gè)無(wú)參數(shù)的周期性間隔任務(wù)

    >>>PeriodicTask.objects.create(interval=schedule,name='my_task2',task='blog.tasks.my_task2',)
    

    beat 調(diào)度服務(wù)日志顯示如下:

    如何使用Python?Celery動(dòng)態(tài)添加定時(shí)任務(wù)

    worker 服務(wù)日志顯示如下:

    如何使用Python?Celery動(dòng)態(tài)添加定時(shí)任務(wù)

    3.創(chuàng)建一個(gè)帶參數(shù)的周期性間隔任務(wù)

    >>> PeriodicTask.objects.create(interval=schedule,name='my_task',task='blog.tasks.my_task',args=json.dumps([10,20,30]))
    

    beat 調(diào)度服務(wù)日志結(jié)果:

    如何使用Python?Celery動(dòng)態(tài)添加定時(shí)任務(wù)

    worker 服務(wù)日志結(jié)果:

    如何使用Python?Celery動(dòng)態(tài)添加定時(shí)任務(wù)

    4.如何高并發(fā)執(zhí)行任務(wù)

    需要并行執(zhí)行任務(wù)的時(shí)候,就需要設(shè)置多個(gè)worker來(lái)執(zhí)行任務(wù)。

    6.2 創(chuàng)建一個(gè)不帶參數(shù)的周期性間隔任務(wù)

    1.初始化 crontab 的調(diào)度對(duì)象

    >>> import pytz
    >>> schedule, _ = CrontabSchedule.objects.get_or_create(
    ... minute='*',
    ... hour='*',
    ... day_of_week='*',
    ... day_of_month='*',
    ... timezone=pytz.timezone('Asia/Shanghai')
    ... )

    2. 創(chuàng)建不帶參數(shù)的定時(shí)任務(wù)

    PeriodicTask.objects.create(crontab=schedule,name='my_task2_crontab',task='blog.tasks.my_task2',)

    beat 調(diào)度服務(wù)執(zhí)行結(jié)果

    如何使用Python?Celery動(dòng)態(tài)添加定時(shí)任務(wù)

    worker 執(zhí)行服務(wù)結(jié)果

    如何使用Python?Celery動(dòng)態(tài)添加定時(shí)任務(wù)

    6.3 周期性任務(wù)的查詢、刪除操作

    1. 周期性任務(wù)的查詢

    >>> PeriodicTask.objects.all()
    ]>
    >>> PeriodicTask.objects.get(name='my_task2_crontab')
    
    >>> for task in PeriodicTask.objects.all():
    ...     print(task.id)
    ... 
    1
    13
    >>> PeriodicTask.objects.get(id=13)
    
    >>> PeriodicTask.objects.get(name='my_task2_crontab')
    

    控制臺(tái)實(shí)際操作記錄

    如何使用Python?Celery動(dòng)態(tài)添加定時(shí)任務(wù)

    2.周期性任務(wù)的暫停/啟動(dòng)

    2.1 設(shè)置my_taks2_crontab 暫停任務(wù)

    >>> my_task2_crontab = PeriodicTask.objects.get(id=13)
    >>> my_task2_crontab.enabled
    True
    >>> my_task2_crontab.enabled=False
    >>> my_task2_crontab.save()

    查看worker輸出:

    如何使用Python?Celery動(dòng)態(tài)添加定時(shí)任務(wù)

    可以看到worker從19:31以后已經(jīng)沒(méi)有輸出了,說(shuō)明已經(jīng)成功吧my_task2_crontab 任務(wù)暫停

    2.2 設(shè)置my_task2_crontab 開(kāi)啟任務(wù)

    把任務(wù)的 enabled 為 True 即可:

    >>> my_task2_crontab.enabled
    False
    >>> my_task2_crontab.enabled=True
    >>> my_task2_crontab.save()

    查看worker輸出:

    如何使用Python?Celery動(dòng)態(tài)添加定時(shí)任務(wù)

    可以看到worker從19:36開(kāi)始有輸出,說(shuō)明已把my_task2_crontab 任務(wù)重新啟動(dòng)

    3. 周期性任務(wù)的刪除

    獲取到指定的任務(wù)后調(diào)用delete(),再次查詢指定任務(wù)會(huì)發(fā)現(xiàn)已經(jīng)不存在了

    PeriodicTask.objects.get(name='my_task2_crontab').delete()
    >>> PeriodicTask.objects.get(name='my_task2_crontab')
    Traceback (most recent call last):
      File "", line 1, in 
      File "/Users/julius/PycharmProjects/typeidea/.venv/lib/python3.9/site-packages/django/db/models/manager.py", line 85, in manager_method
        return getattr(self.get_queryset(), name)(*args, **kwargs)
      File "/Users/julius/PycharmProjects/typeidea/.venv/lib/python3.9/site-packages/django/db/models/query.py", line 435, in get
        raise self.model.DoesNotExist(
    django_celery_beat.models.PeriodicTask.DoesNotExist: PeriodicTask matching query does not exist.

    “如何使用Python Celery動(dòng)態(tài)添加定時(shí)任務(wù)”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關(guān)的知識(shí)可以關(guān)注創(chuàng)新互聯(lián)網(wǎng)站,小編將為大家輸出更多高質(zhì)量的實(shí)用文章!


    名稱欄目:如何使用Python?Celery動(dòng)態(tài)添加定時(shí)任務(wù)
    新聞來(lái)源:http://weahome.cn/article/jjjcjh.html

    其他資訊

    在線咨詢

    微信咨詢

    電話咨詢

    028-86922220(工作日)

    18980820575(7×24)

    提交需求

    返回頂部