小編給大家分享一下python連接kafka的方法,希望大家閱讀完這篇文章后大所收獲,下面讓我們一起去探討方法吧!
創(chuàng)新互聯(lián)建站服務(wù)項(xiàng)目包括深圳網(wǎng)站建設(shè)、深圳網(wǎng)站制作、深圳網(wǎng)頁制作以及深圳網(wǎng)絡(luò)營銷策劃等。多年來,我們專注于互聯(lián)網(wǎng)行業(yè),利用自身積累的技術(shù)優(yōu)勢、行業(yè)經(jīng)驗(yàn)、深度合作伙伴關(guān)系等,向廣大中小型企業(yè)、政府機(jī)構(gòu)等提供互聯(lián)網(wǎng)行業(yè)的解決方案,深圳網(wǎng)站推廣取得了明顯的社會效益與經(jīng)濟(jì)效益。目前,我們服務(wù)的客戶以成都為中心已經(jīng)輻射到深圳省份的部分城市,未來相信會繼續(xù)擴(kuò)大服務(wù)區(qū)域并繼續(xù)獲得客戶的支持與信任!
1、kafka-python安裝:
# PyPI安裝 pip install kafka-python # conda安裝 conda install -c conda-forge kafka-python # anaconda自帶pip安裝 /root/anaconda3/bin/pip install kafka-python
2、kafka-python生產(chǎn)者
producer.py
#!/usr/bin/env python # -*- coding: utf-8 -*- import datetime import json import time import uuid from kafka import KafkaProducer from kafka.errors import KafkaError producer = KafkaProducer(bootstrap_servers='100.69.222.221:9092,100.69.222.222:9092,100.69.222.223:9092') topic = 'test_20181105' def test(): print('begin') try: n = 0 while True: dic = {} dic['id'] = n n = n + 1 dic['myuuid'] = str(uuid.uuid4().hex) dic['time'] = datetime.datetime.now().strftime("%Y%m%d %H:%M:%S") producer.send(topic, json.dumps(dic).encode()) print("send:" + json.dumps(dic)) time.sleep(0.5) except KafkaError as e: print(e) finally: producer.close() print('done') if __name__ == '__main__': test()
服務(wù)器集群中配置好Kafka, 修改上面程序中的ip地址和端口號, 執(zhí)行python腳本就可以成功將消息發(fā)送到 topic: test_20181105
send:{"id": 1411, "myuuid": "a25a3d0361f94d3b8fffd5967ab5df01", "time": "20181105 16:11:14"} send:{"id": 1412, "myuuid": "784efd5389564194941240dca66233b6", "time": "20181105 16:11:14"} send:{"id": 1413, "myuuid": "6a211195319e447aa559614662f70590", "time": "20181105 16:11:15"} send:{"id": 1414, "myuuid": "2cc45bd82baf4a1cb41ea4786e50a0df", "time": "20181105 16:11:15"} send:{"id": 1415, "myuuid": "b7dfed4919c74164b83cf3ec28e257b6", "time": "20181105 16:11:16"} send:{"id": 1416, "myuuid": "9218eceb17834c228f5ab01ca7595272", "time": "20181105 16:11:16"} send:{"id": 1417, "myuuid": "c2751c54c390453f9eedd417fb1e5a31", "time": "20181105 16:11:17"} send:{"id": 1418, "myuuid": "9bbc4ef2cfbb42148332eb979b1142cb", "time": "20181105 16:11:17"} send:{"id": 1419, "myuuid": "f4998a862494445c976137793b55ed73", "time": "20181105 16:11:18"}
3、kafka-python消費(fèi)者
consumer.py
#!/bin/env python from kafka import KafkaConsumer # connect to Kafka server and pass the topic we want to consume consumer = KafkaConsumer('test_20181105',group_id = 'test_group2', bootstrap_servers='100.69.222.221:9092,100.69.222.222:9092,100.69.222.223:9092') try: for msg in consumer: print(msg) # print("%s:%d:%d: key=%s value=%s" % (msg.topic, msg.partition, msg.offset, msg.key, msg.value)) except KeyboardInterrupt as e: print(e)
同樣修改上面的Ip地址和端口號,就可以接收 topic: test_20181105上的消息:
ConsumerRecord(topic='test_20181105', partition=1, offset=951, timestamp=1541405600340, timestamp_type=0, key=None, value=b'{"id": 1663, "myuuid": "0f744021b2d9468886908ee6685a0fdb", "time": "20181105 16:13:20"}', checksum=1357895145, serialized_key_size=-1, serialized_value_size=87) ConsumerRecord(topic='test_20181105', partition=0, offset=935, timestamp=1541405600841, timestamp_type=0, key=None, value=b'{"id": 1664, "myuuid": "9379f68f656644bdb2d30911f06240e4", "time": "20181105 16:13:20"}', checksum=-715594646, serialized_key_size=-1, serialized_value_size=87) ConsumerRecord(topic='test_20181105', partition=1, offset=952, timestamp=1541405601341, timestamp_type=0, key=None, value=b'{"id": 1665, "myuuid": "f4a5fa5b32cd4b7991612b626bea4b0e", "time": "20181105 16:13:21"}', checksum=-2068072013, serialized_key_size=-1, serialized_value_size=87)
可以通過設(shè)置不同的group_id 來實(shí)現(xiàn)消息隊(duì)列或消息訂閱:
如果所有的consumer都具有相同的group,這種情況和queue模式很像;消息將會在consumers之間 看完了這篇文章,相信你對python連接kafka的方法有了一定的了解,想了解更多相關(guān)知識,歡迎關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道,感謝各位的閱讀!
網(wǎng)頁名稱:python連接kafka的方法
鏈接URL:http://weahome.cn/article/pojcss.html