celery,rabbitmq,django,db,計劃任務(wù),后臺管理 ?;镜拿詈痛a。
成都創(chuàng)新互聯(lián)專注為客戶提供全方位的互聯(lián)網(wǎng)綜合服務(wù),包含不限于成都網(wǎng)站制作、網(wǎng)站設(shè)計、海湖新網(wǎng)絡(luò)推廣、微信小程序開發(fā)、海湖新網(wǎng)絡(luò)營銷、海湖新企業(yè)策劃、海湖新品牌公關(guān)、搜索引擎seo、人物專訪、企業(yè)宣傳片、企業(yè)代運營等,從售前售中售后,我們都將竭誠為您服務(wù),您的肯定,是我們大的嘉獎;成都創(chuàng)新互聯(lián)為所有大學(xué)生創(chuàng)業(yè)者提供海湖新建站搭建服務(wù),24小時服務(wù)熱線:13518219792,官方網(wǎng)址:www.cdcxhl.com0. 安裝
0.1 install
1. celery 基礎(chǔ)命令
1.1. 啟動 celery
1.2. 后臺進(jìn)程啟動celery worker
1.3. 重啟 celery worker 后臺進(jìn)程
1.4. 停止 celery worker 后臺進(jìn)程
1.5. 等待 停止 celery worker 后臺進(jìn)程
1.6. 指定 pidfile & logfile
1.7. 啟動 多個 worker 并且指定不同參數(shù)
1.8. 手動殺死所有worker進(jìn)程
1.9. 較完整的celery的啟動命令
2. 管理相關(guān)命令
2.1. 啟動 flower
2.2. 需要 目錄
2.3. 使用 librabbitmq
0.1 install
sudoapt-getinstallbuild-essentialpython-dev sudopipinstallcelery sudopipinstalllibrabbitmq sudopipinstallflower sudoapt-getinstallrabbitmq-server sudorabbitmq-pluginsenablerabbitmq_management sudoservicerabbitmq-serverrestart sudorabbitmq-pluginsdisablerabbitmq_management
訪問 http://host:15672 即可進(jìn)入管理界面。
默認(rèn)用戶名,密碼都是 guest
1.1. 啟動 celery
celery-Ataskworker-linfo
啟動時指定要使用的 queue
celery-Ataskworker-linfo-Qbrand_queue,new_queue-E
1.2. 后臺進(jìn)程啟動celery worker
celerymultistartw1-Atask-linfo-Qbrand_queue,new_queue-E
可通過命令查看后臺啟動的進(jìn)程:
ps-aux|grepcelery [celeryd:w1@x:MainProcess]-active-(worker-E-Atask-linfo-Qbrand_queue,new_queue--logfile=w1.log--pidfile=w1.pid--hostname=w1@x)
可以看到默認(rèn)添加了幾個參數(shù):
--logfile=w1.log默認(rèn)在當(dāng)前文件夾新建w1.log文件 --pidfile=w1.pid默認(rèn)在當(dāng)前文件夾新建w1.pid文件 --hostname=w1@x默認(rèn)實例名woker_name/機器名
1.3. 重啟 celery worker 后臺進(jìn)程
celerymultirestartw1-Atask-linfo-Qbrand_queue,new_queue-E
1.4. 停止 celery worker 后臺進(jìn)程
celerymultistopw1-Atask-linfo-Qbrand_queue,new_queue-E
stop 命令是異步的,worker 會立即停止,即使當(dāng)時仍然有任務(wù)在執(zhí)行,
并且不會寫停止worker相關(guān)的日志
1.5. 等待 停止 celery worker 后臺進(jìn)程
celerymultistopwaitw1-Atask-linfo-Qbrand_queue,new_queue-E
這個停止命令,會等待正在運行的所有任務(wù)都完成再停止。
1.6. 指定 pidfile & logfile
celerymultistartw1-Atask-linfo-Qbrand_queue,new_queue-E--pidfile=/var/www/api/space/run/%n.pid--logfile=/var/www/api/space/logs/%n%I.log
1.7. 啟動 多個 worker 并且指定不同參數(shù)
celerymultistart10-Atask-linfo-E-Q:1-3images,video-Q:4,5data-Qdefault-L:4,5debug
啟動了10個worker:
worker 1,2,3 使用了隊列 images, video
worker 4,5 使用了隊列 data
worker 其他 使用了隊列 default
-L 是什么參數(shù)?
1.8. 手動殺死所有worker進(jìn)程
psauxww|grep\'celeryworker\'|awk\'{print$2}\'|xargskill-9
1.9. 較完整的celery的啟動命令
celerymultistartw1-Atask-linfo-Qbrand_queue,new_queue,time_queue,cron_queue-E-B-s/var/www/api/space/run/celerybeat-schedule--pidfile=/var/www/api/space/run/%n.pid--logfile=/var/www/api/space/logs/%n%I.log
2.1 啟動 flower
celeryflower--broker=amqp://tiger:tiger@192.168.0.6:5672/vr_tiger--address=192.168.0.4--port=5555--broker_api=http://tiger:tiger@192.168.0.6:15672/api/--basic_auth=tiger:tiger rabbitmq主機地址:192.168.0.6 本機地址:192.168.0.4 本地監(jiān)聽端口:5555
2.2 需要目錄
run/ log/
2.3 使用 librabbitmq
Ifyou’reusingRabbitMQ(AMQP)asthebrokerthenyoucaninstallthelibrabbitmqmoduletouseanoptimizedclientwritteninC: $pipinstalllibrabbitmq
注意
1. 后臺運行 celery 的是否,worker 信息不會保存。所以,每次對 worker 操作時都需要加上相同的參數(shù)。特別是 pidfile 和 logfile 需要相同。
2. 我沒有使用 celery 提供的任務(wù)結(jié)果存儲。我在業(yè)務(wù)中自己處理 過程及結(jié)果。
代碼示例
配置文件:
#-*-coding=utf-8-*- #FileName:task_conf.py from__future__importabsolute_import fromceleryimportCelery fromdatetimeimporttimedelta fromcelery.schedulesimportcrontab \'\'\' task_random:是任務(wù)的名稱 broker:通過amqp://用戶名:密碼@ip/虛擬主機連接amqp include:任務(wù)程序 \'\'\' #消息隊列配置 mq_host=\'192.168.0.6\' mq_name=\'tiger\' mq_pass=\'tiger\' mq_vr=\'vr_tiger\' broker=\'amqp://%s:%s@%s/%s\'%(mq_name,mq_pass,mq_host,mq_vr) #初始化app app=Celery(\'name_wash\',broker=broker,include=[\'task\']) #指定任務(wù)存儲隊列 app.conf.update( CELERY_ROUTES={ \'task.exe_task\':{\'queue\':\'brand_queue\'}, \'task.task_sms_send\':{\'queue\':\'new_queue\'}, \'task.task_sec\':{\'queue\':\'time_queue\'}, \'task.task_cron\':{\'queue\':\'cron_queue\'} }, CELERYBEAT_SCHEDULE={ \'exe-every-10-seconds\':{ \'task\':\'task.task_sec\', \'schedule\':timedelta(seconds=30), \'args\':[1], }, \'add-every-monday-morning\':{ \'task\':\'task.task_cron\', \'schedule\':crontab(hour=15,minute=47,day_of_week=5), \'args\':(15232897835,), }, }, #CELERY_TASK_SERIALIZER=\'json\', #CELERY_ACCEPT_CONTENT=[\'json\'],#Ignoreothercontent #CELERY_RESULT_SERIALIZER=\'json\', CELERY_EVENT_QUEUE_TTL=5, CELERY_TIMEZONE=\'Asia/Shanghai\', CELERY_ENABLE_UTC=True, CELERY_DISABLE_RATE_LIMITS=True, CELERY_IGNORE_RESULT=True ) if__name__==\'__main__\': app.start()
任務(wù)文件:
#-*-coding=utf-8-*- #FileName:task.py \'\'\' task \'\'\' from__future__importabsolute_import importtime importtraceback fromjob.task_confimportapp @app.task(ignore_result=True) defexe_task(task_id,number): \'\'\'根據(jù)參數(shù)執(zhí)行任務(wù)\'\'\' try: print\'exetask:\',task_id time.sleep(number) except: traceback.print_exc() return(task_id,number,-1) return\'true:)\' @app.task deftask_sms_send(mobile,content): \'\'\'任務(wù)-發(fā)送短信\'\'\' try: print\'sendsms:mobile->%s,content->%s\'%(mobile,content) except: traceback.print_exc() return\'Fail:(\' return\'Success:)\' @app.task deftask_sec(mobile): \'\'\'測試任務(wù)時間定制\'\'\' try: print\'sendsms:mobile->%s.\'%mobile except: traceback.print_exc() return\'F\' return\'S\' @app.task deftask_cron(mobile): \'\'\'測試任務(wù)時間定制Cron\'\'\' try: print\'sendsms:mobile->%s.\'%mobile except: traceback.print_exc() return\'F-cron\' return\'S-cron\' defmain(): res=exe_task(2,2) print\'res:\',res if__name__==\'__main__\': main()
添加任務(wù)代碼:
#-*-coding=utf-8-*- #FileName:add_task.py importtime importtraceback importrandom fromtaskimportexe_task,task_sms_send defaction(): tries=0 while1: try: tries+=1 iftries>=20: break task_id=tries number=random.randint(1,5) exe_task.apply_async(args=[task_id,number],queue=\'brand_queue\') print\'addedonetask\' time.sleep(1) except: traceback.print_exc() pass print\'addtaskdone\' defadd_task_by_django(task_id,number): \'\'\'測試從django添加任務(wù)\'\'\' exe_task.apply_async(args=[task_id,number],queue=\'brand_queue\') defadd_task_sms(mobile,content): \'\'\'添加發(fā)送短信任務(wù)\'\'\' #列表參數(shù)或者字典參數(shù) #task.apply_async(args=[arg1,arg2],kwargs={\'kwarg1\':\'x\',\'kwarg2\':\'y\'}) task_sms_send.apply_async(args=[mobile,content],queue=\'new_queue\') print\'taskadded:sms\' defmain(): action() if__name__==\'__main__\': main()