這期內(nèi)容當(dāng)中小編將會(huì)給大家?guī)?lái)有關(guān)Flink中怎么自定義redis的Sink函數(shù),文章內(nèi)容豐富且以專業(yè)的角度為大家分析和敘述,閱讀完這篇文章希望大家可以有所收獲。
創(chuàng)新互聯(lián)公司專注于企業(yè)全網(wǎng)整合營(yíng)銷推廣、網(wǎng)站重做改版、肥城網(wǎng)站定制設(shè)計(jì)、自適應(yīng)品牌網(wǎng)站建設(shè)、H5網(wǎng)站設(shè)計(jì)、商城建設(shè)、集團(tuán)公司官網(wǎng)建設(shè)、外貿(mào)營(yíng)銷網(wǎng)站建設(shè)、高端網(wǎng)站制作、響應(yīng)式網(wǎng)頁(yè)設(shè)計(jì)等建站業(yè)務(wù),價(jià)格優(yōu)惠性價(jià)比高,為肥城等各大城市提供網(wǎng)站開(kāi)發(fā)制作服務(wù)。
1.添加redis對(duì)應(yīng)pom依賴
org.apache.bahir flink-connector-redis_2.11 1.0
2.主函數(shù)代碼:
package com.hadoop.ljs.flink110.redis;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
import scala.Tuple2;
/**
* @author: Created By lujisen
* @company ChinaUnicom Software JiNan
* @date: 2020-05-02 10:30
* @version: v1.0
* @description: com.hadoop.ljs.flink110.redis
*/
public class RedisSinkMain {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment senv =StreamExecutionEnvironment.getExecutionEnvironment();
DataStream
source = senv.socketTextStream("localhost", 9000); DataStream
filter = source.filter(new FilterFunction () { @Override
public boolean filter(String value) throws Exception {
if (null == value || value.split(",").length != 2) {
return false;
}
return true;
}
});
DataStream
> keyValue = filter.map(new MapFunction >() { @Override
public Tuple2
map(String value) throws Exception {
String[] split = value.split(",");
return new Tuple2<>(split[0], split[1]);
}
});
//創(chuàng)建redis的配置 單機(jī)redis用FlinkJedisPoolConfig,集群redis需要用FlinkJedisClusterConfig
FlinkJedisPoolConfig redisConf = new FlinkJedisPoolConfig.Builder().setHost("worker2.hadoop.ljs").setPort(6379).setPassword("123456a?").build();
keyValue.addSink(new RedisSink
>(redisConf, new RedisMapper >() { @Override
public RedisCommandDescription getCommandDescription() {
return new RedisCommandDescription(RedisCommand.HSET,"table1");
}
@Override
public String getKeyFromData(Tuple2
data) { return data._1;
}
@Override
public String getValueFromData(Tuple2
data) { return data._2;
}
}));
/*啟動(dòng)執(zhí)行*/
senv.execute();
}
}
3.函數(shù)測(cè)試
1).window端scoket發(fā)送數(shù)據(jù)
2.redis結(jié)果驗(yàn)證
上述就是小編為大家分享的Flink中怎么自定義Redis的Sink函數(shù)了,如果剛好有類似的疑惑,不妨參照上述分析進(jìn)行理解。如果想知道更多相關(guān)知識(shí),歡迎關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道。