這篇文章主要介紹“向kafka集群發(fā)布記錄的kafka客戶端怎么實現(xiàn)”,在日常操作中,相信很多人在向kafka集群發(fā)布記錄的kafka客戶端怎么實現(xiàn)問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”向kafka集群發(fā)布記錄的kafka客戶端怎么實現(xiàn)”的疑惑有所幫助!接下來,請跟著小編一起來學習吧!
10年積累的網(wǎng)站制作、成都網(wǎng)站設計經(jīng)驗,可以快速應對客戶對網(wǎng)站的新想法和需求。提供各種問題對應的解決方案。讓選擇我們的客戶得到更好、更有力的網(wǎng)絡服務。我雖然不認識你,你也不認識我。但先網(wǎng)站制作后付款的網(wǎng)站建設流程,更有昭陽免費網(wǎng)站建設讓你可以放心的選擇與我們合作。
生產(chǎn)者是線程安全的,而且,多線程共享同一個producer實例通常比多個producer實例更快。
這里是一個簡單的例子,使用producer發(fā)送字符串數(shù)據(jù),包含key和value。
Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("acks", "all"); props.put("retries", 0); props.put("batch.size", 16384); props.put("linger.ms", 1); props.put("buffer.memory", 33554432); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); Producerproducer = new KafkaProducer<>(props); for(int i = 0; i < 100; i++) producer.send(new ProducerRecord ("my-topic", Integer.toString(i), Integer.toString(i))); producer.close();
一個producer由幾部分組成:1、一個buff poll,保存尚未發(fā)送的數(shù)據(jù);2、一個后臺運行的I/O線程,負責執(zhí)行數(shù)據(jù)發(fā)送。producer使用完畢后,務必執(zhí)行close操作,否則將會造成資源的泄漏。
send()方法是異步的。當調(diào)用它時,它將記錄添加到緩沖區(qū)中,并立即返回。這使得producer能夠批量的執(zhí)行數(shù)據(jù)的生產(chǎn)。
acks有3個可能的值,0:客戶端不必等待任何的server響應;1:leader of partition將會在把數(shù)據(jù)寫入自己的log之后,響應客戶端,而不必等待其他的follower完成同步的操作;all:leader和follower全部完成log寫入操作。服務器才會響應客戶端。相比之下,all最慢但是可靠性更好。
如果請求失敗,生產(chǎn)者可以自動重試,但是我們已經(jīng)設置retries = 0,那么重試將不會發(fā)生。如果我們開啟了重試,可能會出現(xiàn)重復記錄的問題。
producer保持每個partition的未發(fā)送數(shù)據(jù)的緩沖區(qū)。這些緩沖的大小由batch.size配置指定。如果增大這個配置,可以一次性執(zhí)行更大的批量操作,但需要更多的內(nèi)存(因為我們通常會有一個緩沖區(qū)為每個partition)。
默認情況下,緩沖區(qū)可以立即發(fā)送,即使在緩沖區(qū)中有額外的未使用的空間。但是如果你想減少請求的數(shù)量,可以設置linger.ms > 0。producer會等待一段時間(單位是毫秒)之后在進行發(fā)送,以期獲得更大的批量操作。例如,在上面的代碼片段,設置linger.ms = 1, 可能會有100條記錄被批量發(fā)送。但是,如果在1毫秒的時間內(nèi),沒有跟多的數(shù)據(jù)到達緩沖區(qū),那么這1毫秒的等待僅僅是增加了延遲,而沒有達到任何正面的效果。需要注意的是,如果在短時間內(nèi),大量的數(shù)據(jù)到達緩沖區(qū),即使 linger.ms = 0 ,仍然會發(fā)生批量操作。
buffer.memory控制提供給producer的緩沖內(nèi)存總量,如果該緩沖區(qū)的寫入速率長時間大于輸出速率,那么這個緩沖區(qū)將耗盡。當緩沖區(qū)耗盡后,額外的發(fā)送調(diào)用將被阻塞。阻塞一段時間之后(max.block.ms
),將會拋出一個TimeoutException。
key.serializer
和 value.serializer負責把record當中key和value
分別轉(zhuǎn)換為byte數(shù)組,kafka提供了一組簡單的序列化class。
到此,關于“向kafka集群發(fā)布記錄的kafka客戶端怎么實現(xiàn)”的學習就結(jié)束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學習,快去試試吧!若想繼續(xù)學習更多相關知識,請繼續(xù)關注創(chuàng)新互聯(lián)網(wǎng)站,小編會繼續(xù)努力為大家?guī)砀鄬嵱玫奈恼拢?/p>
網(wǎng)站標題:向kafka集群發(fā)布記錄的kafka客戶端怎么實現(xiàn)
鏈接URL:http://weahome.cn/article/ieiosg.html