物聯(lián)網(wǎng)中最常用的編程語言,即Java,C,C ++,Python,JavaScript和Go。
網(wǎng)站的建設(shè)創(chuàng)新互聯(lián)建站專注網(wǎng)站定制,經(jīng)驗(yàn)豐富,不做模板,主營(yíng)網(wǎng)站定制開發(fā).小程序定制開發(fā),H5頁(yè)面制作!給你煥然一新的設(shè)計(jì)體驗(yàn)!已為公路鉆孔機(jī)等企業(yè)提供專業(yè)服務(wù)。
Java:物聯(lián)網(wǎng)技術(shù)最流行的編程語言
Java有多個(gè)應(yīng)用領(lǐng)域,從后端編程到Android的移動(dòng)應(yīng)用。根據(jù) Eclipse基金會(huì)執(zhí)行的2017年物聯(lián)網(wǎng)開發(fā)者調(diào)查,Java首次提供了用于物聯(lián)網(wǎng)開發(fā)的編程語言列表,專門用于網(wǎng)關(guān)和云。
使用Java進(jìn)行物聯(lián)網(wǎng)開發(fā)的一個(gè)主要好處是便攜性。Java沒有任何硬件限制,這意味著您可以在計(jì)算機(jī)上編寫和調(diào)試Java代碼,并將其部署到幾乎任何運(yùn)行Java虛擬機(jī)的設(shè)備上。出于這個(gè)原因,許多公司選擇聘請(qǐng)Java開發(fā)人員進(jìn)行物聯(lián)網(wǎng)項(xiàng)目。
C:嵌入式設(shè)備的關(guān)鍵編程語言
C編程語言接下來成為物聯(lián)網(wǎng)IoT堆棧最喜歡的語言。然而,根據(jù)Eclipse基金會(huì)的說法,它被認(rèn)為是受限設(shè)備開發(fā)的領(lǐng)先技術(shù)。
該編程語言提供對(duì)低級(jí)硬件API的直接訪問。由于其與機(jī)器語言的相似性,C非??焖偾异`活,使其成為處理能力有限的物聯(lián)網(wǎng)系統(tǒng)的完美選擇。
C ++:Linux的第一語言
與其前身C一樣,C ++已廣泛用于嵌入式系統(tǒng)開發(fā)。但是,C ++的主要優(yōu)勢(shì)在于處理能力,在任務(wù)更加復(fù)雜時(shí)使其成為C的有用替代方案。
C ++最適合編寫硬件特定的代碼。它可與Linux,第一大物聯(lián)網(wǎng)技術(shù)操作系統(tǒng)配合使用。但是,與Java相比,它具有有限的可移植性。
Python:面向數(shù)據(jù)的物聯(lián)網(wǎng)系統(tǒng)的解決方案
作為最受歡迎的網(wǎng)絡(luò)編程語言之一,以及科學(xué)計(jì)算的前沿技術(shù),Python在物聯(lián)網(wǎng)開發(fā)中也獲得了巨大的推動(dòng)力。 對(duì)于數(shù)據(jù)密集型應(yīng)用程序,Python是一個(gè)不錯(cuò)的選擇,特別是在管理和組織復(fù)雜數(shù)據(jù)時(shí)。
JavaScript:事件驅(qū)動(dòng)物聯(lián)網(wǎng)應(yīng)用的最佳解決方案
根據(jù)年度StackOverflow開發(fā)者調(diào)查顯示,JavaScript是過去五年來最流行的編程語言之一,是現(xiàn)代Web開發(fā)中的核心技術(shù)。
在許多其他應(yīng)用領(lǐng)域中,JavaScript是物聯(lián)網(wǎng)編程語言中最常用的構(gòu)建事件驅(qū)動(dòng)系統(tǒng)。它可以管理連接設(shè)備的大型網(wǎng)絡(luò),并且在需要處理多個(gè)任務(wù)而無需等待其他任務(wù)完成時(shí)可以勝任。JavaScript對(duì)IoT的主要優(yōu)勢(shì)之一是非常節(jié)約資源。
Go:堅(jiān)固的技術(shù)堆棧為復(fù)雜的物聯(lián)網(wǎng)網(wǎng)絡(luò)提供動(dòng)力
Go是一款開源編程語言,由Google創(chuàng)建。盡管它不能像語言那樣擁有同樣廣泛的用途,但我們之前專注于這一點(diǎn),它是在您的物聯(lián)網(wǎng)系統(tǒng)內(nèi)建立通信層的強(qiáng)大技術(shù)。
Go語言關(guān)于物聯(lián)網(wǎng)的主要優(yōu)勢(shì)是并發(fā)性和同時(shí)運(yùn)行多個(gè)進(jìn)程(數(shù)據(jù)輸入和輸出)的能力。這使得構(gòu)建由多個(gè)傳感器和設(shè)備組成的復(fù)雜IoT網(wǎng)絡(luò)變得更加容易。
一、Kafka簡(jiǎn)述
1. 為什么需要用到消息隊(duì)列
異步:對(duì)比以前的串行同步方式來說,可以在同一時(shí)間做更多的事情,提高效率;
解耦:在耦合太高的場(chǎng)景,多個(gè)任務(wù)要對(duì)同一個(gè)數(shù)據(jù)進(jìn)行操作消費(fèi)的時(shí)候,會(huì)導(dǎo)致一個(gè)任務(wù)的處理因?yàn)榱硪粋€(gè)任務(wù)對(duì)數(shù)據(jù)的操作變得及其復(fù)雜。
緩沖:當(dāng)遇到突發(fā)大流量的時(shí)候,消息隊(duì)列可以先把所有消息有序保存起來,避免直接作用于系統(tǒng)主體,系統(tǒng)主題始終以一個(gè)平穩(wěn)的速率去消費(fèi)這些消息。
2.為什么選擇kafka呢?
這沒有絕對(duì)的好壞,看個(gè)人需求來選擇,我這里就抄了一段他人總結(jié)的的優(yōu)缺點(diǎn),可見原文
kafka的優(yōu)點(diǎn):
1.支持多個(gè)生產(chǎn)者和消費(fèi)者2.支持broker的橫向拓展3.副本集機(jī)制,實(shí)現(xiàn)數(shù)據(jù)冗余,保證數(shù)據(jù)不丟失4.通過topic將數(shù)據(jù)進(jìn)行分類5.通過分批發(fā)送壓縮數(shù)據(jù)的方式,減少數(shù)據(jù)傳輸開銷,提高吞高量6.支持多種模式的消息7.基于磁盤實(shí)現(xiàn)數(shù)據(jù)的持久化8.高性能的處理信息,在大數(shù)據(jù)的情況下,可以保證亞秒級(jí)的消息延遲9.一個(gè)消費(fèi)者可以支持多種topic的消息10.對(duì)CPU和內(nèi)存的消耗比較小11.對(duì)網(wǎng)絡(luò)開銷也比較小12.支持跨數(shù)據(jù)中心的數(shù)據(jù)復(fù)制13.支持鏡像集群
kafka的缺點(diǎn):
1.由于是批量發(fā)送,所以數(shù)據(jù)達(dá)不到真正的實(shí)時(shí)2.對(duì)于mqtt協(xié)議不支持3.不支持物聯(lián)網(wǎng)傳感數(shù)據(jù)直接接入4.只能支持統(tǒng)一分區(qū)內(nèi)消息有序,無法實(shí)現(xiàn)全局消息有序5.監(jiān)控不完善,需要安裝插件6.需要配合zookeeper進(jìn)行元數(shù)據(jù)管理7.會(huì)丟失數(shù)據(jù),并且不支持事務(wù)8.可能會(huì)重復(fù)消費(fèi)數(shù)據(jù),消息會(huì)亂序,可用保證一個(gè)固定的partition內(nèi)部的消息是有序的,但是一個(gè)topic有多個(gè)partition的話,就不能保證有序了,需要zookeeper的支持,topic一般需要人工創(chuàng)建,部署和維護(hù)一般都比mq高
3. Golang 操作kafka
3.1. kafka的環(huán)境
網(wǎng)上有很多搭建kafka環(huán)境教程,這里就不再搭建,就展示一下kafka的環(huán)境,在kubernetes上進(jìn)行的搭建,有需要的私我,可以發(fā)yaml文件
3.2. 第三方庫(kù)
github.com/Shopify/sarama // kafka主要的庫(kù)*github.com/bsm/sarama-cluster // kafka消費(fèi)組
3.3. 消費(fèi)者
單個(gè)消費(fèi)者
funcconsumer(){varwg sync.WaitGroup? consumer, err := sarama.NewConsumer([]string{"172.20.3.13:30901"},nil)iferr !=nil{? ? ? fmt.Println("Failed to start consumer: %s", err)return}? partitionList, err := consumer.Partitions("test0")//獲得該topic所有的分區(qū)iferr !=nil{? ? ? fmt.Println("Failed to get the list of partition:, ", err)return}forpartition :=rangepartitionList {? ? ? pc, err := consumer.ConsumePartition("test0",int32(partition), sarama.OffsetNewest)iferr !=nil{? ? ? ? fmt.Println("Failed to start consumer for partition %d: %s\n", partition, err)return}? ? ? wg.Add(1)gofunc(sarama.PartitionConsumer){//為每個(gè)分區(qū)開一個(gè)go協(xié)程去取值formsg :=rangepc.Messages() {//阻塞直到有值發(fā)送過來,然后再繼續(xù)等待fmt.Printf("Partition:%d, Offset:%d, key:%s, value:%s\n", msg.Partition, msg.Offset,string(msg.Key),string(msg.Value))? ? ? ? }deferpc.AsyncClose()? ? ? ? wg.Done()? ? ? }(pc)? }? wg.Wait()}funcmain(){? consumer()}
消費(fèi)組
funcconsumerCluster(){? groupID :="group-1"config := cluster.NewConfig()? config.Group.Return.Notifications =trueconfig.Consumer.Offsets.CommitInterval =1* time.Second? config.Consumer.Offsets.Initial = sarama.OffsetNewest//初始從最新的offset開始c, err := cluster.NewConsumer(strings.Split("172.20.3.13:30901",","),groupID, strings.Split("test0",","), config)iferr !=nil{? ? ? glog.Errorf("Failed open consumer: %v", err)return}deferc.Close()gofunc(c *cluster.Consumer){? ? ? errors := c.Errors()? ? ? noti := c.Notifications()for{select{caseerr := -errors:? ? ? ? ? ? glog.Errorln(err)case-noti:? ? ? ? }? ? ? }? }(c)formsg :=rangec.Messages() {? ? ? fmt.Printf("Partition:%d, Offset:%d, key:%s, value:%s\n", msg.Partition, msg.Offset,string(msg.Key),string(msg.Value))? ? ? c.MarkOffset(msg,"")//MarkOffset 并不是實(shí)時(shí)寫入kafka,有可能在程序crash時(shí)丟掉未提交的offset}}funcmain(){goconsumerCluster()}
3.4. 生產(chǎn)者
同步生產(chǎn)者
packagemainimport("fmt""github.com/Shopify/sarama")funcmain(){? config := sarama.NewConfig()? config.Producer.RequiredAcks = sarama.WaitForAll//賦值為-1:這意味著producer在follower副本確認(rèn)接收到數(shù)據(jù)后才算一次發(fā)送完成。config.Producer.Partitioner = sarama.NewRandomPartitioner//寫到隨機(jī)分區(qū)中,默認(rèn)設(shè)置8個(gè)分區(qū)config.Producer.Return.Successes =truemsg := sarama.ProducerMessage{}? msg.Topic =`test0`msg.Value = sarama.StringEncoder("Hello World!")? client, err := sarama.NewSyncProducer([]string{"172.20.3.13:30901"}, config)iferr !=nil{? ? ? fmt.Println("producer close err, ", err)return}deferclient.Close()? pid, offset, err := client.SendMessage(msg)iferr !=nil{? ? ? fmt.Println("send message failed, ", err)return}? fmt.Printf("分區(qū)ID:%v, offset:%v \n", pid, offset)}
異步生產(chǎn)者
funcasyncProducer(){? config := sarama.NewConfig()? config.Producer.Return.Successes =true//必須有這個(gè)選項(xiàng)config.Producer.Timeout =5* time.Second? p, err := sarama.NewAsyncProducer(strings.Split("172.20.3.13:30901",","), config)deferp.Close()iferr !=nil{return}//這個(gè)部分一定要寫,不然通道會(huì)被堵塞gofunc(p sarama.AsyncProducer){? ? ? errors := p.Errors()? ? ? success := p.Successes()for{select{caseerr := -errors:iferr !=nil{? ? ? ? ? ? ? glog.Errorln(err)? ? ? ? ? ? }case-success:? ? ? ? }? ? ? }? }(p)for{? ? ? v :="async: "+ strconv.Itoa(rand.New(rand.NewSource(time.Now().UnixNano())).Intn(10000))? ? ? fmt.Fprintln(os.Stdout, v)? ? ? msg := sarama.ProducerMessage{? ? ? ? Topic: topics,? ? ? ? Value: sarama.ByteEncoder(v),? ? ? }? ? ? p.Input() - msg? ? ? time.Sleep(time.Second *1)? }}funcmain(){goasyncProducer()select{? ? ? }}
3.5. 結(jié)果展示-
同步生產(chǎn)打?。?/p>
分區(qū)ID:0,offset:90
消費(fèi)打?。?/p>
Partition:0,Offset:90,key:,value:Hello World!
異步生產(chǎn)打印:
async:7272async:7616async:998
消費(fèi)打?。?/p>
Partition:0,Offset:91,key:,value:async:7272Partition:0,Offset:92,key:,value:async:7616Partition:0,Offset:93,key:,value:async:998
近期正在探索前端、后端、系統(tǒng)端各類常用組件與工具,對(duì)其一些常見的組件進(jìn)行再次整理一下,形成標(biāo)準(zhǔn)化組件專題,后續(xù)該專題將包含各類語言中的一些常用組件。歡迎大家進(jìn)行持續(xù)關(guān)注。
本節(jié)我們分享的是基于Golang實(shí)現(xiàn)的高性能和彈性的流處理器 benthos ,它能夠以各種代理模式連接各種 源 和 接收器,并對(duì)有效負(fù)載執(zhí)行 水合、濃縮、轉(zhuǎn)換和過濾 。
它帶有 強(qiáng)大的映射語言 ,易于部署和監(jiān)控,并且可以作為靜態(tài)二進(jìn)制文件、docker 映像或 無服務(wù)器函數(shù) 放入您的管道,使其成為云原生。
Benthos 是完全聲明性的,流管道在單個(gè)配置文件中定義,允許您指定連接器和處理階段列表:
Apache Pulsar, AWS (DynamoDB, Kinesis, S3, SQS, SNS), Azure (Blob storage, Queue storage, Table storage), Cassandra, Elasticsearch, File, GCP (Pub/Sub, Cloud storage), HDFS, HTTP (server and client, including websockets), Kafka, Memcached, MQTT, Nanomsg, NATS, NATS JetStream, NATS Streaming, NSQ, AMQP 0.91 (RabbitMQ), AMQP 1, Redis (streams, list, pubsub, hashes), MongoDB, SQL (MySQL, PostgreSQL, Clickhouse, MSSQL), Stdin/Stdout, TCP UDP, sockets and ZMQ4.
1、docker安裝
具體使用方式可以參見該 文檔
有關(guān)如何配置更高級(jí)的流處理概念(例如流連接、擴(kuò)充工作流等)的指導(dǎo),請(qǐng)查看 說明書部分。
有關(guān)在 Go 中構(gòu)建您自己的自定義插件的指導(dǎo),請(qǐng)查看 公共 API。