這篇文章給大家分享的是有關(guān)Flink中Connectors如何連接RabbitMq的內(nèi)容。小編覺(jué)得挺實(shí)用的,因此分享給大家做個(gè)參考,一起跟隨小編過(guò)來(lái)看看吧。
創(chuàng)新互聯(lián)服務(wù)緊隨時(shí)代發(fā)展步伐,進(jìn)行技術(shù)革新和技術(shù)進(jìn)步,經(jīng)過(guò)十多年的發(fā)展和積累,已經(jīng)匯集了一批資深網(wǎng)站策劃師、設(shè)計(jì)師、專(zhuān)業(yè)的網(wǎng)站實(shí)施團(tuán)隊(duì)以及高素質(zhì)售后服務(wù)人員,并且完全形成了一套成熟的業(yè)務(wù)流程,能夠完全依照客戶(hù)要求對(duì)網(wǎng)站進(jìn)行成都網(wǎng)站制作、網(wǎng)站建設(shè)、外貿(mào)網(wǎng)站建設(shè)、建設(shè)、維護(hù)、更新和改版,實(shí)現(xiàn)客戶(hù)網(wǎng)站對(duì)外宣傳展示的首要目的,并為客戶(hù)企業(yè)品牌互聯(lián)網(wǎng)化提供全面的解決方案。
通過(guò)使用Flink DataStream Connectors 數(shù)據(jù)流連接器連接到RabbitMq消息隊(duì)列中間件,并提供數(shù)據(jù)流輸入與輸出操作;
示例環(huán)境
java.version: 1.8.xflink.version: 1.11.1rabbitMq:3.5.7
示例數(shù)據(jù)源 (項(xiàng)目碼云下載)
Flink 系例 之 搭建開(kāi)發(fā)環(huán)境與數(shù)據(jù)
示例模塊 (pom.xml)
Flink 系例 之 DataStream Connectors 與 示例模塊
數(shù)據(jù)流輸入
DataStreamSource.java
package com.flink.examples.rabbitmq; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.rabbitmq.RMQSource; import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig; /** * @Description 從MQ中獲取數(shù)據(jù)并輸出到DataStream流中 */ public class DataStreamSource { /** * 官方文檔:https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/connectors/rabbitmq.html */ public static void main(String[] args) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder() .setHost("127.0.0.1") .setPort(5672) .setUserName("admin") .setPassword("admin") .setVirtualHost("datastream") .build(); final DataStreamstream = env .addSource(new RMQSource ( connectionConfig, "test", true, new SimpleStringSchema())) .setParallelism(1); stream.print(); env.execute("flink rabbitMq source"); } }
數(shù)據(jù)流輸出
DataStreamSink.java
package com.flink.examples.rabbitmq; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.rabbitmq.RMQSink; import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig; /** * @Description 將DataStream流中的數(shù)據(jù)輸出到rabbitMq隊(duì)列中 */ public class DataStreamSink { /** * 官方文檔:https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/connectors/rabbitmq.html */ public static void main(String[] args) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder() .setHost("127.0.0.1") .setPort(5672) .setUserName("admin") .setPassword("admin") .setVirtualHost("datastream") .build(); String [] words = new String[]{"props","student","build","name","execute"}; final DataStreamstream = env.fromElements(words); stream.addSink(new RMQSink (connectionConfig,"test",new SimpleStringSchema())); env.execute("flink rabbitMq sink"); } }
數(shù)據(jù)展示
感謝各位的閱讀!關(guān)于“Flink中Connectors如何連接RabbitMq”這篇文章就分享到這里了,希望以上內(nèi)容可以對(duì)大家有一定的幫助,讓大家可以學(xué)到更多知識(shí),如果覺(jué)得文章不錯(cuò),可以把它分享出去讓更多的人看到吧!