這篇文章主要介紹“go語言任務(wù)隊列machinery的用法”,在日常操作中,相信很多人在go語言任務(wù)隊列machinery的用法問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”go語言任務(wù)隊列machinery的用法”的疑惑有所幫助!接下來,請跟著小編一起來學習吧!
麻陽網(wǎng)站建設(shè)公司創(chuàng)新互聯(lián)公司,麻陽網(wǎng)站設(shè)計制作,有大型網(wǎng)站制作公司豐富經(jīng)驗。已為麻陽1000+提供企業(yè)網(wǎng)站建設(shè)服務(wù)。企業(yè)網(wǎng)站搭建\外貿(mào)營銷網(wǎng)站建設(shè)要多少錢,請找那個售后服務(wù)好的麻陽做網(wǎng)站的公司定做!
go實現(xiàn)的基于消息中間件的異步任務(wù)隊列, 下面是學習筆記
步驟1: 創(chuàng)建server,配置參數(shù)、注冊task。(此處server只是個配置作用, 并不是單獨的server進程)
步驟2: 啟動worker
步驟3: 發(fā)送task
與celery的用法是完全一致的
func startServer() (*machinery.Server, error) { cnf := &config.Config{ Broker: "amqp://guest:guest@localhost:5672/", DefaultQueue: "machinery_tasks", ResultBackend: "amqp://guest:guest@localhost:5672/", ResultsExpireIn: 3600, //任務(wù)有效期 AMQP: &config.AMQPConfig{ Exchange: "machinery_exchange", ExchangeType: "direct", BindingKey: "machinery_task", PrefetchCount: 3, //限定消費能力 }, } // Create server instance broker := amqpbroker.New(cnf) backend := amqpbackend.New(cnf) lock := eagerlock.New() //任務(wù)鎖 server := machinery.NewServer(cnf, broker, backend, lock) // Register tasks tasks := map[string]interface{}{ "add": exampletasks.Add, "multiply": exampletasks.Multiply, "sum_ints": exampletasks.SumInts, "sum_floats": exampletasks.SumFloats, "concat": exampletasks.Concat, "split": exampletasks.Split, "panic_task": exampletasks.PanicTask, "long_running_task": exampletasks.LongRunningTask, } return server, server.RegisterTasks(tasks) }
創(chuàng)建worker, 之后就可以啟動了
func worker() error { //消費者的標記 consumerTag := "machinery_worker" server, err := startServer() if err != nil { return err } //第二個參數(shù)并發(fā)數(shù), 0表示不限制 worker := server.NewWorker(consumerTag, 0) //鉤子函數(shù) errorhandler := func(err error) {} pretaskhandler := func(signature *tasks.Signature) {} posttaskhandler := func(signature *tasks.Signature) {} worker.SetPostTaskHandler(posttaskhandler) worker.SetErrorHandler(errorhandler) worker.SetPreTaskHandler(pretaskhandler) return worker.Launch() }
啟動結(jié)果
INFO: 2021/05/01 08:28:27 worker.go:58 Launching a worker with the following settings: INFO: 2021/05/01 08:28:27 worker.go:59 - Broker: amqp://192.168.120.101:5672 INFO: 2021/05/01 08:28:27 worker.go:61 - DefaultQueue: machinery_tasks INFO: 2021/05/01 08:28:27 worker.go:65 - ResultBackend: amqp://192.168.120.101:5672 INFO: 2021/05/01 08:28:27 worker.go:67 - AMQP: machinery_exchange INFO: 2021/05/01 08:28:27 worker.go:68 - Exchange: machinery_exchange INFO: 2021/05/01 08:28:27 worker.go:69 - ExchangeType: direct INFO: 2021/05/01 08:28:27 worker.go:70 - BindingKey: machinery_task INFO: 2021/05/01 08:28:27 worker.go:71 - PrefetchCount: 0 INFO: 2021/05/01 08:28:27 amqp.go:96 [*] Waiting for messages. To exit press CTRL+C
server, _ := startServer() signature := &tasks.Signature{ Name: "add", Args: []tasks.Arg{ { Type: "int64", Value: 1, }, { Type: "int64", Value: 1, }, }, } asyncResult, _ := server.SendTask(signature) fmt.Println(asyncResult.Get(time.Millisecond * 5)) //等待間隔,理論上是越小越好 //asyncResult.GetWithTimeout(time.Second*120, time.Millisecond * 5) //第一個參數(shù)才是timeout
以上就是machinery的基本用法,與celery基本一樣, 更詳細內(nèi)容參考官方文檔
到此,關(guān)于“go語言任務(wù)隊列machinery的用法”的學習就結(jié)束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學習,快去試試吧!若想繼續(xù)學習更多相關(guān)知識,請繼續(xù)關(guān)注創(chuàng)新互聯(lián)網(wǎng)站,小編會繼續(xù)努力為大家?guī)砀鄬嵱玫奈恼拢?/p>
網(wǎng)站欄目:go語言任務(wù)隊列machinery的用法
標題路徑:http://weahome.cn/article/joopjj.html