真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

go語言異步,go語言異步模型

golang語言:for循環(huán)里面包含一個函數(shù)體的執(zhí)行循序

go func是golang的協(xié)程,就像多線程,異步執(zhí)行,所以,代碼段1執(zhí)行完3遍后,可能3次協(xié)成剛執(zhí)行完。在代碼段1中如果sleep一下應該就能給協(xié)程時間執(zhí)行了。

成都一家集口碑和實力的網(wǎng)站建設(shè)服務(wù)商,擁有專業(yè)的企業(yè)建站團隊和靠譜的建站技術(shù),10多年企業(yè)及個人網(wǎng)站建設(shè)經(jīng)驗 ,為成都數(shù)千家客戶提供網(wǎng)頁設(shè)計制作,網(wǎng)站開發(fā),企業(yè)網(wǎng)站制作建設(shè)等服務(wù),包括成都營銷型網(wǎng)站建設(shè),品牌網(wǎng)站建設(shè),同時也為不同行業(yè)的客戶提供網(wǎng)站制作、成都做網(wǎng)站的服務(wù),包括成都電商型網(wǎng)站制作建設(shè),裝修行業(yè)網(wǎng)站制作建設(shè),傳統(tǒng)機械行業(yè)網(wǎng)站建設(shè),傳統(tǒng)農(nóng)業(yè)行業(yè)網(wǎng)站制作建設(shè)。在成都做網(wǎng)站,選網(wǎng)站制作建設(shè)服務(wù)商就選成都創(chuàng)新互聯(lián)。

go有沒有開源的類似java的mina或者netty的socket框架

go語言應該沒有,java netty這種高性能異步IO模型的框架,建議你還是用java語言開發(fā)吧

Golang kafka簡述和操作(sarama同步異步和消費組)

一、Kafka簡述

1. 為什么需要用到消息隊列

異步:對比以前的串行同步方式來說,可以在同一時間做更多的事情,提高效率;

解耦:在耦合太高的場景,多個任務(wù)要對同一個數(shù)據(jù)進行操作消費的時候,會導致一個任務(wù)的處理因為另一個任務(wù)對數(shù)據(jù)的操作變得及其復雜。

緩沖:當遇到突發(fā)大流量的時候,消息隊列可以先把所有消息有序保存起來,避免直接作用于系統(tǒng)主體,系統(tǒng)主題始終以一個平穩(wěn)的速率去消費這些消息。

2.為什么選擇kafka呢?

這沒有絕對的好壞,看個人需求來選擇,我這里就抄了一段他人總結(jié)的的優(yōu)缺點,可見原文

kafka的優(yōu)點:

1.支持多個生產(chǎn)者和消費者2.支持broker的橫向拓展3.副本集機制,實現(xiàn)數(shù)據(jù)冗余,保證數(shù)據(jù)不丟失4.通過topic將數(shù)據(jù)進行分類5.通過分批發(fā)送壓縮數(shù)據(jù)的方式,減少數(shù)據(jù)傳輸開銷,提高吞高量6.支持多種模式的消息7.基于磁盤實現(xiàn)數(shù)據(jù)的持久化8.高性能的處理信息,在大數(shù)據(jù)的情況下,可以保證亞秒級的消息延遲9.一個消費者可以支持多種topic的消息10.對CPU和內(nèi)存的消耗比較小11.對網(wǎng)絡(luò)開銷也比較小12.支持跨數(shù)據(jù)中心的數(shù)據(jù)復制13.支持鏡像集群

kafka的缺點:

1.由于是批量發(fā)送,所以數(shù)據(jù)達不到真正的實時2.對于mqtt協(xié)議不支持3.不支持物聯(lián)網(wǎng)傳感數(shù)據(jù)直接接入4.只能支持統(tǒng)一分區(qū)內(nèi)消息有序,無法實現(xiàn)全局消息有序5.監(jiān)控不完善,需要安裝插件6.需要配合zookeeper進行元數(shù)據(jù)管理7.會丟失數(shù)據(jù),并且不支持事務(wù)8.可能會重復消費數(shù)據(jù),消息會亂序,可用保證一個固定的partition內(nèi)部的消息是有序的,但是一個topic有多個partition的話,就不能保證有序了,需要zookeeper的支持,topic一般需要人工創(chuàng)建,部署和維護一般都比mq高

3. Golang 操作kafka

3.1. kafka的環(huán)境

網(wǎng)上有很多搭建kafka環(huán)境教程,這里就不再搭建,就展示一下kafka的環(huán)境,在kubernetes上進行的搭建,有需要的私我,可以發(fā)yaml文件

3.2. 第三方庫

github.com/Shopify/sarama // kafka主要的庫*github.com/bsm/sarama-cluster // kafka消費組

3.3. 消費者

單個消費者

funcconsumer(){varwg sync.WaitGroup? consumer, err := sarama.NewConsumer([]string{"172.20.3.13:30901"},nil)iferr !=nil{? ? ? fmt.Println("Failed to start consumer: %s", err)return}? partitionList, err := consumer.Partitions("test0")//獲得該topic所有的分區(qū)iferr !=nil{? ? ? fmt.Println("Failed to get the list of partition:, ", err)return}forpartition :=rangepartitionList {? ? ? pc, err := consumer.ConsumePartition("test0",int32(partition), sarama.OffsetNewest)iferr !=nil{? ? ? ? fmt.Println("Failed to start consumer for partition %d: %s\n", partition, err)return}? ? ? wg.Add(1)gofunc(sarama.PartitionConsumer){//為每個分區(qū)開一個go協(xié)程去取值formsg :=rangepc.Messages() {//阻塞直到有值發(fā)送過來,然后再繼續(xù)等待fmt.Printf("Partition:%d, Offset:%d, key:%s, value:%s\n", msg.Partition, msg.Offset,string(msg.Key),string(msg.Value))? ? ? ? }deferpc.AsyncClose()? ? ? ? wg.Done()? ? ? }(pc)? }? wg.Wait()}funcmain(){? consumer()}

消費組

funcconsumerCluster(){? groupID :="group-1"config := cluster.NewConfig()? config.Group.Return.Notifications =trueconfig.Consumer.Offsets.CommitInterval =1* time.Second? config.Consumer.Offsets.Initial = sarama.OffsetNewest//初始從最新的offset開始c, err := cluster.NewConsumer(strings.Split("172.20.3.13:30901",","),groupID, strings.Split("test0",","), config)iferr !=nil{? ? ? glog.Errorf("Failed open consumer: %v", err)return}deferc.Close()gofunc(c *cluster.Consumer){? ? ? errors := c.Errors()? ? ? noti := c.Notifications()for{select{caseerr := -errors:? ? ? ? ? ? glog.Errorln(err)case-noti:? ? ? ? }? ? ? }? }(c)formsg :=rangec.Messages() {? ? ? fmt.Printf("Partition:%d, Offset:%d, key:%s, value:%s\n", msg.Partition, msg.Offset,string(msg.Key),string(msg.Value))? ? ? c.MarkOffset(msg,"")//MarkOffset 并不是實時寫入kafka,有可能在程序crash時丟掉未提交的offset}}funcmain(){goconsumerCluster()}

3.4. 生產(chǎn)者

同步生產(chǎn)者

packagemainimport("fmt""github.com/Shopify/sarama")funcmain(){? config := sarama.NewConfig()? config.Producer.RequiredAcks = sarama.WaitForAll//賦值為-1:這意味著producer在follower副本確認接收到數(shù)據(jù)后才算一次發(fā)送完成。config.Producer.Partitioner = sarama.NewRandomPartitioner//寫到隨機分區(qū)中,默認設(shè)置8個分區(qū)config.Producer.Return.Successes =truemsg := sarama.ProducerMessage{}? msg.Topic =`test0`msg.Value = sarama.StringEncoder("Hello World!")? client, err := sarama.NewSyncProducer([]string{"172.20.3.13:30901"}, config)iferr !=nil{? ? ? fmt.Println("producer close err, ", err)return}deferclient.Close()? pid, offset, err := client.SendMessage(msg)iferr !=nil{? ? ? fmt.Println("send message failed, ", err)return}? fmt.Printf("分區(qū)ID:%v, offset:%v \n", pid, offset)}

異步生產(chǎn)者

funcasyncProducer(){? config := sarama.NewConfig()? config.Producer.Return.Successes =true//必須有這個選項config.Producer.Timeout =5* time.Second? p, err := sarama.NewAsyncProducer(strings.Split("172.20.3.13:30901",","), config)deferp.Close()iferr !=nil{return}//這個部分一定要寫,不然通道會被堵塞gofunc(p sarama.AsyncProducer){? ? ? errors := p.Errors()? ? ? success := p.Successes()for{select{caseerr := -errors:iferr !=nil{? ? ? ? ? ? ? glog.Errorln(err)? ? ? ? ? ? }case-success:? ? ? ? }? ? ? }? }(p)for{? ? ? v :="async: "+ strconv.Itoa(rand.New(rand.NewSource(time.Now().UnixNano())).Intn(10000))? ? ? fmt.Fprintln(os.Stdout, v)? ? ? msg := sarama.ProducerMessage{? ? ? ? Topic: topics,? ? ? ? Value: sarama.ByteEncoder(v),? ? ? }? ? ? p.Input() - msg? ? ? time.Sleep(time.Second *1)? }}funcmain(){goasyncProducer()select{? ? ? }}

3.5. 結(jié)果展示-

同步生產(chǎn)打?。?/p>

分區(qū)ID:0,offset:90

消費打?。?/p>

Partition:0,Offset:90,key:,value:Hello World!

異步生產(chǎn)打?。?/p>

async:7272async:7616async:998

消費打印:

Partition:0,Offset:91,key:,value:async:7272Partition:0,Offset:92,key:,value:async:7616Partition:0,Offset:93,key:,value:async:998


分享題目:go語言異步,go語言異步模型
轉(zhuǎn)載來于:http://weahome.cn/article/hescpo.html

其他資訊

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部