真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

Kafka消息序列化和反序列化(上)

Kafka Producer在發(fā)送消息時(shí)必須配置的參數(shù)為:bootstrap.servers、key.serializer、value.serializer。序列化操作是在攔截器(Interceptor)執(zhí)行之后并且在分配分區(qū)(partitions)之前執(zhí)行的。

創(chuàng)新互聯(lián)建站是專業(yè)的雙陽網(wǎng)站建設(shè)公司,雙陽接單;提供成都做網(wǎng)站、網(wǎng)站制作,網(wǎng)頁設(shè)計(jì),網(wǎng)站設(shè)計(jì),建網(wǎng)站,PHP網(wǎng)站建設(shè)等專業(yè)做網(wǎng)站服務(wù);采用PHP框架,可快速的進(jìn)行雙陽網(wǎng)站開發(fā)網(wǎng)頁制作和功能擴(kuò)展;專業(yè)做搜索引擎喜愛的網(wǎng)站,專業(yè)的做網(wǎng)站團(tuán)隊(duì),希望更多企業(yè)前來合作!

首先我們通過一段示例代碼來看下普通情況下Kafka Producer如何編寫:

public class ProducerJavaDemo {
    public static final String brokerList = "192.168.0.2:9092,192.168.0.3:9092,192.168.0.4:9092";
    public static final String topic = "hidden-topic";

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("client.id", "hidden-producer-client-id-1");
        properties.put("bootstrap.servers", brokerList);

        Producer producer = new KafkaProducer(properties);

        while (true) {
            String message = "kafka_message-" + new Date().getTime() + "-edited by hidden.zhu";
            ProducerRecord producerRecord = new ProducerRecord(topic,message);
            try {
                Future future =  producer.send(producerRecord, new Callback() {
                    public void onCompletion(RecordMetadata metadata, Exception exception) {
                        System.out.print(metadata.offset()+"    ");
                        System.out.print(metadata.topic()+"    ");
                        System.out.println(metadata.partition());
                    }
                });
            } catch (Exception e) {
                e.printStackTrace();
            }
            try {
                TimeUnit.MILLISECONDS.sleep(10);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

這里采用的客戶端不是0.8.x.x時(shí)代的Scala版本,而是Java編寫的新Kafka Producer, 相應(yīng)的Maven依賴如下:


    org.apache.kafka
    kafka-clients
    1.0.0

上面的程序中使用的是Kafka客戶端自帶的org.apache.kafka.common.serialization.StringSerializer,除了用于String類型的序列化器之外還有:ByteArray、ByteBuffer、Bytes、Double、Integer、Long這幾種類型,它們都實(shí)現(xiàn)了org.apache.kafka.common.serialization.Serializer接口,此接口有三種方法:

public void configure(Map configs, boolean isKey):用來配置當(dāng)前類。
public byte[] serialize(String topic, T data):用來執(zhí)行序列化。
public void close():用來關(guān)閉當(dāng)前序列化器。一般情況下這個(gè)方法都是個(gè)空方法,如果實(shí)現(xiàn)了此方法,必須確保此方法的冪等性,因?yàn)檫@個(gè)方法很可能會被KafkaProducer調(diào)用多次。
下面我們來看看Kafka中org.apache.kafka.common.serialization.StringSerializer的具體實(shí)現(xiàn),源碼如下:

public class StringSerializer implements Serializer {
    private String encoding = "UTF8";

    @Override
    public void configure(Map configs, boolean isKey) {
        String propertyName = isKey ? "key.serializer.encoding" : "value.serializer.encoding";
        Object encodingValue = configs.get(propertyName);
        if (encodingValue == null)
            encodingValue = configs.get("serializer.encoding");
        if (encodingValue != null && encodingValue instanceof String)
            encoding = (String) encodingValue;
    }

    @Override
    public byte[] serialize(String topic, String data) {
        try {
            if (data == null)
                return null;
            else
                return data.getBytes(encoding);
        } catch (UnsupportedEncodingException e) {
            throw new SerializationException("Error when serializing string to byte[] due to unsupported encoding " + encoding);
        }
    }

    @Override
    public void close() {
        // nothing to do
    }
}

首先看下StringSerializer中的configure(Map)

public class Company {
    private String name;
    private String address;
    //省略Getter, Setter, Constructor & toString方法
}

接下去我們來實(shí)現(xiàn)Company類型的Serializer,即下面代碼示例中的DemoSerializer。

package com.hidden.client;

public class DemoSerializer implements Serializer {
    public void configure(Map configs, boolean isKey) {}
    public byte[] serialize(String topic, Company data) {
        if (data == null) {
            return null;
        }
        byte[] name, address;
        try {
            if (data.getName() != null) {
                name = data.getName().getBytes("UTF-8");
            } else {
                name = new byte[0];
            }
            if (data.getAddress() != null) {
                address = data.getAddress().getBytes("UTF-8");
            } else {
                address = new byte[0];
            }
            ByteBuffer buffer = ByteBuffer.allocate(4+4+name.length + address.length);
            buffer.putInt(name.length);
            buffer.put(name);
            buffer.putInt(address.length);
            buffer.put(address);
            return buffer.array();
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
        return new byte[0];
    }
    public void close() {}
}

使用時(shí)只需要在Kafka Producer的config中修改value.serializer屬性即可,示例如下:

properties.put("value.serializer", "com.hidden.client.DemoSerializer");
//記得也要將相應(yīng)的String類型改為Company類型,如:
//Producer producer = new KafkaProducer(properties);
//Company company = new Company();
//company.setName("hidden.cooperation-" + new Date().getTime());
//company.setAddress("Shanghai, China");
//ProducerRecord producerRecord = new ProducerRecord(topic,company);1234567

本文的重點(diǎn)是你有沒有收獲與成長,其余的都不重要,希望讀者們能謹(jǐn)記這一點(diǎn)。同時(shí)我經(jīng)過多年的收藏目前也算收集到了一套完整的學(xué)習(xí)資料,包括但不限于:分布式架構(gòu)、高可擴(kuò)展、高性能、高并發(fā)、Jvm性能調(diào)優(yōu)、Spring,MyBatis,Nginx源碼分析,redis,ActiveMQ、、Mycat、Netty、Kafka、MySQL、Zookeeper、Tomcat、Docker、Dubbo、Nginx等多個(gè)知識點(diǎn)高級進(jìn)階干貨,希望對想成為架構(gòu)師的朋友有一定的參考和幫助

需要更詳細(xì)思維導(dǎo)圖和以下資料的可以加一下技術(shù)交流分享群:“708 701 457”免費(fèi)獲取

Kafka 消息序列化和反序列化(上)
Kafka 消息序列化和反序列化(上)
Kafka 消息序列化和反序列化(上)
Kafka 消息序列化和反序列化(上)


分享名稱:Kafka消息序列化和反序列化(上)
本文地址:http://weahome.cn/article/iepiij.html

其他資訊

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部