Kafka所支持的分區(qū)策略:
1- 粘性分區(qū)策略(2.4版本下, 支持輪詢策略) ?Java客戶端支持, 但是Python客戶端不支持
2- hash取模的策略
3- 指定分區(qū)策略
4- 隨機分區(qū)策略: Python客戶端是支持的 Java不支持
5- 自定義分區(qū)策略?
1- 指定分區(qū)策略
public ProducerRecord(String topic, Integer partition, K key, V value) {
? this(topic, partition, null, key, value, null);
}
在生產(chǎn)端, 構(gòu)建數(shù)據(jù)承載對象的時候, 采用此構(gòu)造, 即可采用指定分區(qū)的策略
分區(qū)的編號從0開始的
注意: 指定分區(qū)與defaultPartitioner類沒有任何的關(guān)系
2- Hash 取模策略
2.1 創(chuàng)建數(shù)據(jù)承載對象的時候, 必須使用僅傳遞 k v的構(gòu)造方法, 即可使用Hash方式
public ProducerRecord(String topic, K key, V value) {
? this(topic, null, null, key, value, null);
}
注意: 當執(zhí)行Hash取模分區(qū)策略, 其底層是通過一個默認的分區(qū)類來完成Hash取模計算: DefaultPartitioner(默認分區(qū)類)
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
? if (keyBytes == null) {
? return stickyPartitionCache.partition(topic, cluster);
? }?
? // 當有key的時候, 采用Hash取模的方式
? List
? int numPartitions = partitions.size();
? // hash the keyBytes to choose a partition
? return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
? }
?
說明: 在使用此種分發(fā)策略的時候, Key值一定是可變的, 千萬不要固定不變
3- 粘性分區(qū)策略
3.1 創(chuàng)建數(shù)據(jù)承載對象的時候,只需要傳遞value即可, 此時采用的粘性的分區(qū)策略
public ProducerRecord(String topic, V value) {
? this(topic, null, null, null, value, null);
}
注意: 當執(zhí)行Hash取模分區(qū)策略, 其底層是通過一個默認的分區(qū)類來完成Hash取模計算: DefaultPartitioner(默認分區(qū)類)
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
? if (keyBytes == null) {
? ?// 當key為null的時候, 采用的分發(fā)的方式為stickyPartition(粘性分區(qū))
? return stickyPartitionCache.partition(topic, cluster);
? }?
? // 當有key的時候, 采用Hash取模的方式
? List
? int numPartitions = partitions.size();
? // hash the keyBytes to choose a partition
? return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
? }
4- 自定義分區(qū)策略
如何自定義分區(qū)呢? 抄 ?抄DefaultPartitioner
1- 創(chuàng)建一個類, 實現(xiàn) Partitioner 接口
2- 重寫接口中方法: partition() ?和 close方法, 主要是重寫partition()?
?partition方法的參數(shù)列表說明:
??? ?String topic: 指定要寫入到那個topic上
??? ?Object key : 表示 key值?
??? ?byte[] keyBytes: ?表示 key的字節(jié)數(shù)組
??? ?Object value: 表示value的值
??? ?byte[] valueBytes: 表示value的字節(jié)數(shù)組?
??? ?Cluster cluster : 集群對象 可以幫助獲取某個topic有多少個分片
3) 將自定義的分區(qū)類, 配置到生產(chǎn)者的配置信息中:?
?key: partitioner.class
?value值:?
??? ?默認值:org.apache.kafka.clients.producer.internals.DefaultPartitioner
??? ?修改為我們自己寫的類即可
?
?將其放置到生產(chǎn)者的properties中
# 博學谷IT 技術(shù)支持
你是否還在尋找穩(wěn)定的海外服務器提供商?創(chuàng)新互聯(lián)www.cdcxhl.cn海外機房具備T級流量清洗系統(tǒng)配攻擊溯源,準確流量調(diào)度確保服務器高可用性,企業(yè)級服務器適合批量采購,新人活動首月15元起,快前往官網(wǎng)查看詳情吧