真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

Flink中Connectors如何連接RabbitMq

這篇文章給大家分享的是有關(guān)Flink中Connectors如何連接RabbitMq的內(nèi)容。小編覺(jué)得挺實(shí)用的,因此分享給大家做個(gè)參考,一起跟隨小編過(guò)來(lái)看看吧。

創(chuàng)新互聯(lián)服務(wù)緊隨時(shí)代發(fā)展步伐,進(jìn)行技術(shù)革新和技術(shù)進(jìn)步,經(jīng)過(guò)十多年的發(fā)展和積累,已經(jīng)匯集了一批資深網(wǎng)站策劃師、設(shè)計(jì)師、專(zhuān)業(yè)的網(wǎng)站實(shí)施團(tuán)隊(duì)以及高素質(zhì)售后服務(wù)人員,并且完全形成了一套成熟的業(yè)務(wù)流程,能夠完全依照客戶(hù)要求對(duì)網(wǎng)站進(jìn)行成都網(wǎng)站制作、網(wǎng)站建設(shè)、外貿(mào)網(wǎng)站建設(shè)、建設(shè)、維護(hù)、更新和改版,實(shí)現(xiàn)客戶(hù)網(wǎng)站對(duì)外宣傳展示的首要目的,并為客戶(hù)企業(yè)品牌互聯(lián)網(wǎng)化提供全面的解決方案。

通過(guò)使用Flink DataStream Connectors 數(shù)據(jù)流連接器連接到RabbitMq消息隊(duì)列中間件,并提供數(shù)據(jù)流輸入與輸出操作;

示例環(huán)境

java.version: 1.8.xflink.version: 1.11.1rabbitMq:3.5.7

示例數(shù)據(jù)源 (項(xiàng)目碼云下載)

Flink 系例 之 搭建開(kāi)發(fā)環(huán)境與數(shù)據(jù)

示例模塊 (pom.xml)

Flink 系例 之 DataStream Connectors 與 示例模塊

數(shù)據(jù)流輸入

DataStreamSource.java

package com.flink.examples.rabbitmq;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.rabbitmq.RMQSource;
import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;

/**
 * @Description 從MQ中獲取數(shù)據(jù)并輸出到DataStream流中
 */
public class DataStreamSource {

    /**
     * 官方文檔:https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/connectors/rabbitmq.html
     */

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
                .setHost("127.0.0.1")
                .setPort(5672)
                .setUserName("admin")
                .setPassword("admin")
                .setVirtualHost("datastream")
                .build();

        final DataStream stream = env
                .addSource(new RMQSource( connectionConfig, "test", true, new SimpleStringSchema()))
                .setParallelism(1);

        stream.print();
        env.execute("flink rabbitMq source");
    }
}

數(shù)據(jù)流輸出

DataStreamSink.java

package com.flink.examples.rabbitmq;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.rabbitmq.RMQSink;
import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;

/**
 * @Description 將DataStream流中的數(shù)據(jù)輸出到rabbitMq隊(duì)列中
 */
public class DataStreamSink {

    /**
     * 官方文檔:https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/connectors/rabbitmq.html
     */

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
                .setHost("127.0.0.1")
                .setPort(5672)
                .setUserName("admin")
                .setPassword("admin")
                .setVirtualHost("datastream")
                .build();

        String [] words = new String[]{"props","student","build","name","execute"};
        final DataStream stream = env.fromElements(words);
        stream.addSink(new RMQSink(connectionConfig,"test",new SimpleStringSchema()));
        env.execute("flink rabbitMq sink");
    }
}

數(shù)據(jù)展示

Flink中Connectors如何連接RabbitMq

感謝各位的閱讀!關(guān)于“Flink中Connectors如何連接RabbitMq”這篇文章就分享到這里了,希望以上內(nèi)容可以對(duì)大家有一定的幫助,讓大家可以學(xué)到更多知識(shí),如果覺(jué)得文章不錯(cuò),可以把它分享出去讓更多的人看到吧!


文章題目:Flink中Connectors如何連接RabbitMq
瀏覽地址:http://weahome.cn/article/jogdpe.html

其他資訊

在線(xiàn)咨詢(xún)

微信咨詢(xún)

電話(huà)咨詢(xún)

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部