真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

DataPipeline丨瓜子二手車基于Kafka的結(jié)構(gòu)化數(shù)據(jù)流-創(chuàng)新互聯(lián)

一、為什么選擇Kafka

為溧水等地區(qū)用戶提供了全套網(wǎng)頁設(shè)計(jì)制作服務(wù),及溧水網(wǎng)站建設(shè)行業(yè)解決方案。主營業(yè)務(wù)為網(wǎng)站制作、成都網(wǎng)站制作、溧水網(wǎng)站設(shè)計(jì),以傳統(tǒng)方式定制建設(shè)網(wǎng)站,并提供域名空間備案等一條龍服務(wù),秉承以專業(yè)、用心的態(tài)度為用戶提供真誠的服務(wù)。我們深信只要達(dá)到每一位用戶的要求,就會(huì)得到認(rèn)可,從而選擇與我們長期合作。這樣,我們也可以走得更遠(yuǎn)!

為什么選Kafka?鑒于龐大的數(shù)據(jù)量,需要將其做成分布式,這時(shí)需要將Q里面的數(shù)據(jù)分到許多機(jī)器上進(jìn)行存儲(chǔ),除此之外還有分布式的計(jì)算需求。同時(shí)需要支持多語言,如Java、GO、php等,另外還有高可用的需求。

DataPipeline丨瓜子二手車基于Kafka的結(jié)構(gòu)化數(shù)據(jù)流

二、Kafka集群

Realtime的Kafka集群通過Mirror Maker將數(shù)據(jù)全部同步到Analysis的Kafka集群。

Realtime的Kafka集群主要負(fù)責(zé)在線實(shí)時(shí)讀寫,Analysis負(fù)責(zé)很多工作,諸如數(shù)據(jù)的導(dǎo)入導(dǎo)出,數(shù)據(jù)的多次流出給集群和網(wǎng)絡(luò)硬盤帶來了較大壓力。為保證線上的穩(wěn)定性,要保證兩邊是隔開的。另外關(guān)于Topic目前有五萬多,每秒可能會(huì)有100多萬的數(shù)據(jù)流入流出。

DataPipeline丨瓜子二手車基于Kafka的結(jié)構(gòu)化數(shù)據(jù)流

三、Kafka的用戶使用問題

1.參數(shù)配置問題, Kafka有很多參數(shù)需要配置,常用的集群配置,延遲,重要性等,需要封裝。

  1. 開發(fā)測(cè)試不方便,使用者通常會(huì)有這樣的需求:我的數(shù)據(jù)寫進(jìn)去沒,消費(fèi)沒,他寫的數(shù)據(jù)長什么樣子,結(jié)構(gòu)化的數(shù)據(jù)還需自己寫代碼來解析,等等。這些問題沒有工具和平臺(tái)來解決,會(huì)大大降低開發(fā)效率。

  2. Topic申請(qǐng)不方便,topic是不能開放自己創(chuàng)建的,我們?cè)跍y(cè)試環(huán)境開放過Topic,發(fā)現(xiàn)一周內(nèi)漲到了好幾萬,而且參數(shù)千奇百怪,有全用默認(rèn)參數(shù)的,有根據(jù)文檔,時(shí)間先來10個(gè)9的,也有partition直接來100的。工單方式對(duì)管理者很不友好,需要登上服務(wù)器敲命令,效率低下,且容易出錯(cuò)。

  3. 結(jié)構(gòu)化數(shù)據(jù)查詢不方便,瓜子的結(jié)構(gòu)化使用的是AVRO, 序列化之后的數(shù)據(jù)很難查看原始數(shù)據(jù)。

  4. 消費(fèi)異常定位不便,比如消費(fèi)的數(shù)據(jù)或者位置不對(duì),如果想要回滾重新消費(fèi)或跳過臟數(shù)據(jù)就面臨各種問題。從哪個(gè)offset開始重新消費(fèi)呢?或者跳到之后的哪個(gè)offset呢?另外就是滾動(dòng)重啟了一個(gè)服務(wù),結(jié)果發(fā)現(xiàn)消費(fèi)的數(shù)據(jù)少了一批,很有可能是某一個(gè)隱藏的consumer同時(shí)在用這個(gè)consumer group,但是找了一圈沒找到哪個(gè)服務(wù)還沒關(guān)掉。

  5. 不知道下游,如果寫了生產(chǎn)者生產(chǎn)的Topic數(shù)據(jù),卻不知道有哪些consumer,如果要對(duì)Topic信息發(fā)生改變時(shí),不知該通知誰,這是很復(fù)雜的事情。要么先上,下游出問題了自己叫,要么躊躇不前,先收集下游,當(dāng)然實(shí)際情況一般是前者,經(jīng)常雞飛狗跳。

  6. 運(yùn)維復(fù)雜,日常運(yùn)維包括topic partition增加,幫助定位臟數(shù)據(jù)(因?yàn)樗麄儾恢烙信K數(shù)據(jù)),幫助排除配置問題等等。

DataPipeline丨瓜子二手車基于Kafka的結(jié)構(gòu)化數(shù)據(jù)流

四、解決方案:Kafka platform

為解決上述問題,瓜子上線了Kafka platform,主要面向用戶和管理兩方面的功能。

面向用戶包括:查看數(shù)據(jù),了解消費(fèi)情況,方便地添加監(jiān)控報(bào)警,以及如果出現(xiàn)問題后,快速查錯(cuò)的工具。

管理方面包括: 權(quán)限管理, 申請(qǐng)審批,還有一些常用操作。比如,seek offset, 或是刪掉一個(gè)Topic,對(duì)partitions進(jìn)行擴(kuò)容等。

DataPipeline丨瓜子二手車基于Kafka的結(jié)構(gòu)化數(shù)據(jù)流

  1. 數(shù)據(jù)查詢

可以通過offset查詢對(duì)應(yīng)offset的數(shù)據(jù),也可以通過進(jìn)入Kafka的大致時(shí)間,查詢那段內(nèi)的數(shù)據(jù),可以看到每條信息的partition,offset,入Kafka的時(shí)間,AVRO的版本信息等。

DataPipeline丨瓜子二手車基于Kafka的結(jié)構(gòu)化數(shù)據(jù)流

  1. 消費(fèi)查詢

通過下圖顯示的界面可以查看一條消息,了解哪些consumer group已經(jīng)消費(fèi)了,哪些沒有消費(fèi)。

DataPipeline丨瓜子二手車基于Kafka的結(jié)構(gòu)化數(shù)據(jù)流

同時(shí)可以查看它現(xiàn)在正在被哪個(gè)IP進(jìn)行消費(fèi),這時(shí)我們可以方便地定位到有個(gè)consumer沒有關(guān)閉,它是在哪臺(tái)機(jī)器上,這些來自于我們的實(shí)踐經(jīng)驗(yàn)。還可以看到每個(gè)consumer group的消費(fèi)延遲情況,精確到條數(shù),partition的延遲。也可以看到partition消息總數(shù),可以排查一些消息不均的問題。

DataPipeline丨瓜子二手車基于Kafka的結(jié)構(gòu)化數(shù)據(jù)流

下圖為監(jiān)控報(bào)警,可以了解Topic的流入、流出數(shù)據(jù),每秒寫入多少條消息,多大的size,每秒流出的情況。

DataPipeline丨瓜子二手車基于Kafka的結(jié)構(gòu)化數(shù)據(jù)流

報(bào)警是對(duì)Topic建一些流量報(bào)警,或是一些延遲報(bào)警,建好之后只需要訂閱一下即可,非常方便。

DataPipeline丨瓜子二手車基于Kafka的結(jié)構(gòu)化數(shù)據(jù)流

五、瓜子結(jié)構(gòu)化數(shù)據(jù)流

目前有許多使用場(chǎng)景,比如前端埋點(diǎn),tracking日志,Mysql數(shù)據(jù)同步,操作日志,一些諸如服務(wù)之間的交換,基于SQL的streaming,APM的數(shù)據(jù),還有基于SQL的ETL等,都可以通過結(jié)構(gòu)化將其快速同步到大數(shù)據(jù)中做后續(xù)分析。

DataPipeline丨瓜子二手車基于Kafka的結(jié)構(gòu)化數(shù)據(jù)流

我們是通過confluent提供的一整套解決方案來實(shí)現(xiàn)的。其中最主要的兩個(gè)組件是:Schema Registry和Kafka Connect。Schema Registry用于存儲(chǔ)schema信息,Kafka connect用于數(shù)據(jù)轉(zhuǎn)移。

DataPipeline丨瓜子二手車基于Kafka的結(jié)構(gòu)化數(shù)據(jù)流

目前,瓜子除日志部分外,90%以上為結(jié)構(gòu)化。為什么選擇Avro?因?yàn)锳vro速度快,并且跨語言支持,所有的Schema AVSC都是用JSON做的,對(duì)JSON支持的特別好,如果可能沒人想為一個(gè)schema定義再學(xué)一門語言吧。而且通過JSON無需code generation。

但僅有avro還不夠,我們?cè)谑褂弥袝?huì)面臨更多的問題,比如:

  • 統(tǒng)一的schema中心,這與配置中心的必要性是一樣的道理,沒有統(tǒng)一的地方,口口相傳,配置亂飛不是我們想看到的。

  • 多版本的需求,schema是肯定會(huì)有更新需求的,也肯定有回滾需求,也會(huì)有兼容需求,所以多版本是需要滿足的。

  • 版本兼容性檢查,設(shè)想一下上游改了一個(gè)schema的列名,下游寫到hive的時(shí)候就蒙了,歷史數(shù)據(jù)咋辦啊,現(xiàn)在這列數(shù)據(jù)又怎么處理。所以得有版本兼容,而且最好滿足下游所有組件的需求。

  • schema得有注釋,給人展示的schema最好能有給人讀的注釋,很多人昨天定義的enum,今天就忘了,這個(gè)事情很常見。

DataPipeline丨瓜子二手車基于Kafka的結(jié)構(gòu)化數(shù)據(jù)流

為解決這些問題,我們引入了confluence的Schema Registry。Confluence的Schema registry,通過RESTful接口,提供了類似配置中心的能力,還有完善的UI,支持版本兼容性檢查,支持多版本等,完全滿足了我們的需求。而且自帶HA,通過Kafka存儲(chǔ)配置信息,保證一致性。

DataPipeline丨瓜子二手車基于Kafka的結(jié)構(gòu)化數(shù)據(jù)流

五、瓜子的實(shí)踐

當(dāng)然,僅有這些還不夠,我們?cè)趯?shí)踐中遇到了很多問題,比如schema注冊(cè)不可能完全開放,歷史告訴我們完全的自由意味著混亂。為在實(shí)踐中利用好avro,我們前后改了兩個(gè)方案,來保證schema可控。

  1. 最初的方案

為實(shí)現(xiàn)統(tǒng)一管控,所有schema會(huì)通過git來管理,如果需要使用可以fork該git。如果要做一個(gè)上線,更新或添加一個(gè)schema,可以通過提merge request提交給管理員。管理員檢查沒有問題后直接通過gitlab-ci自動(dòng)注冊(cè),管理員只需完成確認(rèn)的操作。

但這樣會(huì)出現(xiàn)一些問題,首先是上線流程太長,要上線或更新一個(gè)schema時(shí),需要提交merge request,要等管理員收到郵件后才可查看,屆時(shí)如果管理員發(fā)現(xiàn)schema寫的不對(duì),還需重新再走一次流程,中間可能花一天時(shí)間。且回滾復(fù)雜,沒有權(quán)限管理。而且很多人會(huì)犯同樣的錯(cuò)誤,客服表示相當(dāng)?shù)睦速M(fèi)時(shí)間。

DataPipeline丨瓜子二手車基于Kafka的結(jié)構(gòu)化數(shù)據(jù)流

六、平臺(tái)化解決方案

通過平臺(tái)化解決方案,我們提供了一個(gè)類似于git的頁面,可在上面直接提交schema,在下面直接點(diǎn)擊校驗(yàn),在評(píng)估新上線的schema是否有問題后,等待后臺(tái)審批即可。其中可以加諸如權(quán)限管理等一些功能。

DataPipeline丨瓜子二手車基于Kafka的結(jié)構(gòu)化數(shù)據(jù)流

七、為什么用到Kafka connect

Kafka connect專注copy數(shù)據(jù),把一個(gè)數(shù)據(jù)從data source轉(zhuǎn)到Kafka,再從Kafka轉(zhuǎn)到其它地方。它支持批和流,同時(shí)支持實(shí)時(shí)和批處理,比如5min同步一次。

另外,它支持多個(gè)系統(tǒng)之間互相copy,數(shù)據(jù)源可能是Mysql、SQL Server 也可能是Oracle 。sink可以是Hbase、Hive等。它自己定義了一套plugin接口,可以自己寫很多數(shù)據(jù)源和不支持的sink。

并且他自己做到了分布式并行,支持完善的HA和load balance,還提供方便的RESTful 接口。

DataPipeline丨瓜子二手車基于Kafka的結(jié)構(gòu)化數(shù)據(jù)流

在沒有Kafka connect之前,運(yùn)維ETL非常麻煩。拿canal來說,canal有server和client,都需手動(dòng)部署,如果你有100臺(tái)canal節(jié)點(diǎn)1000個(gè)數(shù)據(jù)庫,想想看吧,管理員如何知道哪臺(tái)機(jī)器上跑了哪些庫表,新增的任務(wù)又放在哪臺(tái)機(jī)來運(yùn)行。

此外,如果Mysql修改了一個(gè)字段,還需要讓程序員去機(jī)器上看一下那張表是如何修改的,相應(yīng)的所有下游都需相應(yīng)的完成表結(jié)構(gòu)修改之后, 才能跑起來,響應(yīng)速度非常慢。

目前Kafka connect已經(jīng)解決了這些問題。其具備一個(gè)非常重要的特性,如果上游數(shù)據(jù)根據(jù)AVRO兼容性進(jìn)行的修改,connect會(huì)在下游同樣做一些兼容性的修改,自動(dòng)更改下游表結(jié)構(gòu),減輕了運(yùn)維負(fù)擔(dān)。

DataPipeline丨瓜子二手車基于Kafka的結(jié)構(gòu)化數(shù)據(jù)流

我們來看看Kafka connect 的架構(gòu),Kafka connect會(huì)把所有信息存到Kafka 中,其中config topic存元數(shù)據(jù),Stutas topic指當(dāng)前哪些節(jié)點(diǎn)正在跑什么樣的job,offset topic指當(dāng)前比如某個(gè)Topic的某個(gè)partitions到底消費(fèi)到哪條數(shù)據(jù)。

WorKer都是無狀態(tài)的,在上面可以跑許多task,同樣一個(gè)task1,可能對(duì)應(yīng)5個(gè)partitions,如果只給它三個(gè)并發(fā),它會(huì)分布在三臺(tái)機(jī)器上。如果一臺(tái)機(jī)器掛了,這些job都會(huì)分配到另外兩臺(tái)機(jī)器,而且是實(shí)時(shí)同步的。

DataPipeline丨瓜子二手車基于Kafka的結(jié)構(gòu)化數(shù)據(jù)流

八、瓜子Plugins

瓜子對(duì)Kafka connect的很多plugins做了修改。

  1. Maxwell

其中我們把canal通過maxwell替換,并且把maxwell做成了Kafka connect的plugin。

原生的Maxwell不支持AVRO,瓜子通過debezium思想對(duì)Maxwell進(jìn)行了修改,使其支持avro格式,并用Mysql管理meta,并且支持Mysql的數(shù)據(jù)庫切換。

  1. HDFS

我們采用的是confluence公司的hdfs插件,但是其本身存在很多問題,比如寫入hive的時(shí)候會(huì)把當(dāng)做partition的列也寫到主表數(shù)據(jù)中,雖然不影響hive的使用,但是影響presto讀取hive,這里我們改了源碼,去掉了主表中的這些列。

Hdfs在插件重啟時(shí)會(huì)去hdfs中讀取所有文件來確定從哪個(gè)offset繼續(xù),這里會(huì)有兩個(gè)問題:耗時(shí)太長,切換集群時(shí)offset無法接續(xù),我們也對(duì)他做了修改。

plugin寫入hive時(shí)支持用Kafka的timestamp做分區(qū),也支持用數(shù)據(jù)內(nèi)的某些列做分區(qū),但是不支持兩者同時(shí)用,我們也修改了一下。

  1. HBase

Hbase的plugin只支持最原始的導(dǎo)出,我們會(huì)有些特殊的需求,比如對(duì)rowkey自定義一下,通常mysql主鍵是自增ID,hbase不推薦用自增ID做rowkey,我們會(huì)有reverse的需求,還有多列聯(lián)合做rowkey的需求等,這個(gè)我們也改了源碼,支持通過配置自定義rowkey生成。

原始plugin不支持kerberos,而我們online hbase是帶權(quán)限的,所以也改了一下

還有一些小功能,比如把所有類型都先轉(zhuǎn)成string再存,支持delete,支持json等。

  1. KUDU

我們對(duì)kudu的使用很多,kudu開源的plugin有些bug,我們發(fā)現(xiàn)后也fix了一下。

Kudu的數(shù)據(jù)來源都是mysql,但是經(jīng)常會(huì)有mysql刷庫的情況,這時(shí)量就會(huì)很大,kudu sink會(huì)有較大的延時(shí),我們改了一下plugin,添加了自適應(yīng)流量控制,自動(dòng)擴(kuò)充成多線程處理,也會(huì)在流量小時(shí),自動(dòng)縮容。

九、瓜子數(shù)據(jù)庫的Data Pipeline

瓜子的數(shù)據(jù)倉庫完全是基于Kafka、AVRO的結(jié)構(gòu)化數(shù)據(jù)來做的。數(shù)據(jù)源非常多,需要將多個(gè)業(yè)務(wù)線的幾千張表同步到數(shù)倉,對(duì)外提供服務(wù)。

整個(gè)數(shù)據(jù)倉庫采用Lambda架構(gòu),分為T+1的存量處理和T+0.1的增量處理兩個(gè)流程。

先說T+1的存量處理部分,目前瓜子將所有mysql表通過Maxwell插件放到Kafka中,再通過Kafka connect寫到Hbase里,Hbase每天晚上做一次snapshot(快照),寫到hive中,然后經(jīng)過多輪ETL:DWB-->DWD-->DW-->DM,最后將DM層數(shù)據(jù)導(dǎo)入Kudu中,對(duì)外提供BI分析服務(wù),當(dāng)然離線olap分析還是通過presto直接訪問Hive查詢。

DataPipeline丨瓜子二手車基于Kafka的結(jié)構(gòu)化數(shù)據(jù)流

再說T+0.1的增量流程,同T+1一樣,數(shù)據(jù)通過maxwell進(jìn)入Kafka,這部分流程共用,然后增量數(shù)據(jù)會(huì)實(shí)時(shí)通過kudu的插件寫入kudu中,再通過impala做ETL,生成數(shù)據(jù)對(duì)外提供T+0.1的查詢,對(duì)外提供的查詢是通過另一套impala來做的。 未來我們還會(huì)考慮通過Flink直接讀取Kafka中數(shù)據(jù)來做實(shí)時(shí)ETL,提高實(shí)時(shí)性。

DataPipeline丨瓜子二手車基于Kafka的結(jié)構(gòu)化數(shù)據(jù)流

這是我們數(shù)倉架構(gòu)的整體架構(gòu)圖

DataPipeline丨瓜子二手車基于Kafka的結(jié)構(gòu)化數(shù)據(jù)流

另外有需要云服務(wù)器可以了解下創(chuàng)新互聯(lián)scvps.cn,海內(nèi)外云服務(wù)器15元起步,三天無理由+7*72小時(shí)售后在線,公司持有idc許可證,提供“云服務(wù)器、裸金屬服務(wù)器、高防服務(wù)器、香港服務(wù)器、美國服務(wù)器、虛擬主機(jī)、免備案服務(wù)器”等云主機(jī)租用服務(wù)以及企業(yè)上云的綜合解決方案,具有“安全穩(wěn)定、簡(jiǎn)單易用、服務(wù)可用性高、性價(jià)比高”等特點(diǎn)與優(yōu)勢(shì),專為企業(yè)上云打造定制,能夠滿足用戶豐富、多元化的應(yīng)用場(chǎng)景需求。


文章題目:DataPipeline丨瓜子二手車基于Kafka的結(jié)構(gòu)化數(shù)據(jù)流-創(chuàng)新互聯(lián)
當(dāng)前鏈接:http://weahome.cn/article/cojcij.html

其他資訊

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部