本篇文章給大家分享的是有關(guān)Flink1.8中如何進(jìn)行流處理SocketWordCount,小編覺得挺實(shí)用的,因此分享給大家學(xué)習(xí),希望大家閱讀完這篇文章后可以有所收獲,話不多說,跟著小編一起來看看吧。
成都創(chuàng)新互聯(lián)是一家專注于成都做網(wǎng)站、成都網(wǎng)站建設(shè)與策劃設(shè)計(jì),莆田網(wǎng)站建設(shè)哪家好?成都創(chuàng)新互聯(lián)做網(wǎng)站,專注于網(wǎng)站建設(shè)10余年,網(wǎng)設(shè)計(jì)領(lǐng)域的專業(yè)建站公司;建站業(yè)務(wù)涵蓋:莆田等地區(qū)。莆田做網(wǎng)站價(jià)格咨詢:13518219792
概述:
這里主要演示flink源碼實(shí)例中“WordCount”程序的流窗口版本。
此程序連接到socket服務(wù)器并從socket讀取字符串。最簡(jiǎn)單的嘗試方法是打開一個(gè)文本服務(wù)器(在端口9999),使用netcat工具
我這里也貼一下:
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.hadoop.ljs.flink.streaming;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
/**
* Implements a streaming windowed version of the "WordCount" program.
*
*
This program connects to a server socket and reads strings from the socket.
* The easiest way to try this out is to open a text server (at port 12345)
* using the netcat tool via
*
* nc -l 12345 on Linux or nc -l -p 12345 on Windows
*
* and run this example with the hostname and the port as arguments.
*/
@SuppressWarnings("serial")
public class SocketWordCount {
public static void main(String[] args) throws Exception {
// the host and the port to connect to
final String hostname;
final int port;
try {
final ParameterTool params = ParameterTool.fromArgs(args);
hostname = params.has("hostname") ? params.get("hostname") : "localhost";
port = params.getInt("port");
/*hostname = "10.124.165.98";
port = 9999;*/
} catch (Exception e) {
System.err.println("No port specified. Please run 'SocketWordCount " +
"--hostname --port ', where hostname (localhost by default) " +
"and port is the address of the text server");
System.err.println("To start a simple text server, run 'netcat -l ' and " +
"type the input text into the command line");
return;
}
// get the execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// get input data by connecting to the socket
DataStream text = env.socketTextStream(hostname, port, "\n");
// parse the data, group it, window it, and aggregate the counts
DataStream windowCounts = text
.flatMap(new FlatMapFunction() {
@Override
public void flatMap(String value, Collector out) {
for (String word : value.split("\\s")) {
out.collect(new WordWithCount(word, 1L));
}
}
})
.keyBy("word")
.timeWindow(Time.seconds(5))
.reduce(new ReduceFunction() {
@Override
public WordWithCount reduce(WordWithCount a, WordWithCount b) {
return new WordWithCount(a.word, a.count + b.count);
}
});
// print the results with a single thread, rather than in parallel
windowCounts.print().setParallelism(1);
env.execute("Socket Window WordCount");
}
// ------------------------------------------------------------------------
/**
* Data type for words with count.
*/
public static class WordWithCount {
public String word;
public long count;
public WordWithCount() {}
public WordWithCount(String word, long count) {
this.word = word;
this.count = count;
}
@Override
public String toString() {
return word + " : " + count;
}
}
}
通過maven package打出jar包:flink191-1.0-SNAPSHOT-jar-with-dependencies
直接提交到flink在yarn中已啟動(dòng)的一個(gè)session中,從flink界面上傳jar:
上傳后,選中jar前面的復(fù)選框,可直接填寫相關(guān)參數(shù):
參數(shù)格式:--參數(shù)名 參數(shù)值 --參數(shù)名2 參數(shù)值2
參數(shù)獲取是通過上面代碼第49行的工具類獲?。ü潭ǜ袷剑?/p>
ParameterTool params = ParameterTool.fromArgs(args);
最后點(diǎn)擊“Submit”按鈕,提交任務(wù)運(yùn)行即可。
界面也可查看日志和輸出:
以上就是Flink1.8中如何進(jìn)行流處理SocketWordCount,小編相信有部分知識(shí)點(diǎn)可能是我們?nèi)粘9ぷ鲿?huì)見到或用到的。希望你能通過這篇文章學(xué)到更多知識(shí)。更多詳情敬請(qǐng)關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道。