今天就跟大家聊聊有關(guān)Java中的Kafka 怎么利用API進行調(diào)用,可能很多人都不太了解,為了讓大家更加了解,小編給大家總結(jié)了以下內(nèi)容,希望大家根據(jù)這篇文章可以有所收獲。
創(chuàng)新互聯(lián)專業(yè)為企業(yè)提供河曲網(wǎng)站建設(shè)、河曲做網(wǎng)站、河曲網(wǎng)站設(shè)計、河曲網(wǎng)站制作等企業(yè)網(wǎng)站建設(shè)、網(wǎng)頁設(shè)計與制作、河曲企業(yè)網(wǎng)站模板建站服務(wù),10多年河曲做網(wǎng)站經(jīng)驗,不只是建網(wǎng)站,更提供有價值的思路和整體網(wǎng)絡(luò)服務(wù)。
1.客戶端創(chuàng)建對應(yīng)協(xié)議的請求
2.客戶端發(fā)送請求給對應(yīng)的broker
3.broker處理請求,并發(fā)送response給客戶端
雖然Kafka提供的大量的腳本工具用于各種功能的實現(xiàn),但很多時候我們還是希望可以把某些功能以編程的方式嵌入到另一個系統(tǒng)中。這時使用Java API的方式就顯得異常地靈活了。本文我將嘗試給出Java API底層框架的一個范例,同時也會針對“創(chuàng)建topic”和“查看位移”這兩個主要功能給出對應(yīng)的例子。 需要提前說明的是,本文給出的范例并沒有考慮Kafka集群開啟安全的情況。另外Kafka的KIP4應(yīng)該一直在優(yōu)化命令行工具以及各種管理操作,有興趣的讀者可以關(guān)注這個KIP。
本文中用到的API依賴于kafka-clients,所以如果你使用Maven構(gòu)建的話,請加上:
org.apache.kafka kafka-clients 0.10.2.0
如果是gradle,請加上:
compile group: 'org.apache.kafka', name: 'kafka-clients', version: '0.10.2.0'
底層框架
/** * 發(fā)送請求主方法 * @param host 目標broker的主機名 * @param port 目標broker的端口 * @param request 請求對象 * @param apiKey 請求類型 * @return 序列化后的response * @throws IOException */ public ByteBuffer send(String host, int port, AbstractRequest request, ApiKeys apiKey) throws IOException { Socket socket = connect(host, port); try { return send(request, apiKey, socket); } finally { socket.close(); } } /** * 發(fā)送序列化請求并等待response返回 * @param socket 連向目標broker的socket * @param request 序列化后的請求 * @return 序列化后的response * @throws IOException */ private byte[] issueRequestAndWaitForResponse(Socket socket, byte[] request) throws IOException { sendRequest(socket, request); return getResponse(socket); } /** * 發(fā)送序列化請求給socket * @param socket 連向目標broker的socket * @param request 序列化后的請求 * @throws IOException */ private void sendRequest(Socket socket, byte[] request) throws IOException { DataOutputStream dos = new DataOutputStream(socket.getOutputStream()); dos.writeInt(request.length); dos.write(request); dos.flush(); } /** * 從給定socket處獲取response * @param socket 連向目標broker的socket * @return 獲取到的序列化后的response * @throws IOException */ private byte[] getResponse(Socket socket) throws IOException { DataInputStream dis = null; try { dis = new DataInputStream(socket.getInputStream()); byte[] response = new byte[dis.readInt()]; dis.readFully(response); return response; } finally { if (dis != null) { dis.close(); } } } /** * 創(chuàng)建Socket連接 * @param hostName 目標broker主機名 * @param port 目標broker服務(wù)端口, 比如9092 * @return 創(chuàng)建的Socket連接 * @throws IOException */ private Socket connect(String hostName, int port) throws IOException { return new Socket(hostName, port); } /** * 向給定socket發(fā)送請求 * @param request 請求對象 * @param apiKey 請求類型, 即屬于哪種請求 * @param socket 連向目標broker的socket * @return 序列化后的response * @throws IOException */ private ByteBuffer send(AbstractRequest request, ApiKeys apiKey, Socket socket) throws IOException { RequestHeader header = new RequestHeader(apiKey.id, request.version(), "client-id", 0); ByteBuffer buffer = ByteBuffer.allocate(header.sizeOf() + request.sizeOf()); header.writeTo(buffer); request.writeTo(buffer); byte[] serializedRequest = buffer.array(); byte[] response = issueRequestAndWaitForResponse(socket, serializedRequest); ByteBuffer responseBuffer = ByteBuffer.wrap(response); ResponseHeader.parse(responseBuffer); return responseBuffer; }
有了這些方法的鋪墊,我們就可以創(chuàng)建具體的請求了。
創(chuàng)建topic
/** * 創(chuàng)建topic * 由于只是樣例代碼,有些東西就硬編碼寫到程序里面了(比如主機名和端口),各位看官自行修改即可 * @param topicName topic名 * @param partitions 分區(qū)數(shù) * @param replicationFactor 副本數(shù) * @throws IOException */ public void createTopics(String topicName, int partitions, short replicationFactor) throws IOException { Maptopics = new HashMap<>(); // 插入多個元素便可同時創(chuàng)建多個topic topics.put(topicName, new CreateTopicsRequest.TopicDetails(partitions, replicationFactor)); int creationTimeoutMs = 60000; CreateTopicsRequest request = new CreateTopicsRequest.Builder(topics, creationTimeoutMs).build(); ByteBuffer response = send("localhost", 9092, request, ApiKeys.CREATE_TOPICS); CreateTopicsResponse.parse(response, request.version()); }
查看位移
/** * 獲取某個consumer group下的某個topic分區(qū)的位移 * @param groupID group id * @param topic topic名 * @param parititon 分區(qū)號 * @throws IOException */ public void getOffsetForPartition(String groupID, String topic, int parititon) throws IOException { TopicPartition tp = new TopicPartition(topic, parititon); OffsetFetchRequest request = new OffsetFetchRequest.Builder(groupID, singletonList(tp)) .setVersion((short)2).build(); ByteBuffer response = send("localhost", 9092, request, ApiKeys.OFFSET_FETCH); OffsetFetchResponse resp = OffsetFetchResponse.parse(response, request.version()); OffsetFetchResponse.PartitionData partitionData = resp.responseData().get(tp); System.out.println(partitionData.offset); }
/** * 獲取某個consumer group下所有topic分區(qū)的位移信息 * @param groupID group id * @return (topic分區(qū) --> 分區(qū)信息)的map * @throws IOException */ public MapgetAllOffsetsForGroup(String groupID) throws IOException { OffsetFetchRequest request = new OffsetFetchRequest.Builder(groupID, null).setVersion((short)2).build(); ByteBuffer response = send("localhost", 9092, request, ApiKeys.OFFSET_FETCH); OffsetFetchResponse resp = OffsetFetchResponse.parse(response, request.version()); return resp.responseData(); }
okay, 上面就是“創(chuàng)建topic”和“查看位移”的樣例代碼,各位看官可以參考著這兩個例子構(gòu)建其他類型的請求。
看完上述內(nèi)容,你們對Java中的Kafka 怎么利用API進行調(diào)用有進一步的了解嗎?如果還想了解更多知識或者相關(guān)內(nèi)容,請關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道,感謝大家的支持。