本篇內(nèi)容介紹了“flink將數(shù)據(jù)錄入數(shù)據(jù)庫”的有關(guān)知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領(lǐng)大家學習一下如何處理這些情況吧!希望大家仔細閱讀,能夠?qū)W有所成!
創(chuàng)新互聯(lián)主營濟陽網(wǎng)站建設(shè)的網(wǎng)絡(luò)公司,主營網(wǎng)站建設(shè)方案,APP應(yīng)用開發(fā),濟陽h5成都小程序開發(fā)搭建,濟陽網(wǎng)站營銷推廣歡迎濟陽等地區(qū)企業(yè)咨詢//主類 package flink.streaming import java.util.Properties import org.apache.flink.streaming.api.scala._ import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer import org.apache.flink.api.common.serialization.SimpleStringSchema import org.apache.flink.streaming.api.windowing.time.Time import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.functions.sink.RichSinkFunction import org.apache.flink.streaming.api.CheckpointingMode object StreamingTest { def main(args: Array[String]): Unit = { val kafkaProps = new Properties() //kafka的一些屬性 kafkaProps.setProperty("bootstrap.servers", "bigdata01:9092") //所在的消費組 kafkaProps.setProperty("group.id", "group2") //獲取當前的執(zhí)行環(huán)境 val evn = StreamExecutionEnvironment.getExecutionEnvironment //evn.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) //kafka的consumer,test1是要消費的topic val kafkaSource = new FlinkKafkaConsumer[String]("test1",new SimpleStringSchema,kafkaProps) //kafkaSource.assignTimestampsAndWatermarks(assigner) //設(shè)置從最新的offset開始消費 //kafkaSource.setStartFromGroupOffsets() kafkaSource.setStartFromLatest() //自動提交offset kafkaSource.setCommitOffsetsOnCheckpoints(true) //flink的checkpoint的時間間隔 //evn.enableCheckpointing(2000) //添加consumer val stream = evn.addSource(kafkaSource) evn.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE) //stream.setParallelism(3) val text = stream.flatMap{ _.toLowerCase().split(" ")filter { _.nonEmpty} } .map{(_,1)} .keyBy(0) .timeWindow(Time.seconds(5)) .sum(1) .map(x=>{(x._1,(new Integer(x._2)))}) //text.print() //啟動執(zhí)行 text.addSink(new Ssinks()) evn.execute("kafkawd") } }
//自定義sink package flink.streaming import java.sql.Connection import java.sql.PreparedStatement import java.sql.DriverManager import org.apache.flink.streaming.api.functions.sink.RichSinkFunction import org.apache.flink.configuration.Configuration class Ssinks extends RichSinkFunction[(String,Integer)]{ var conn:Connection=_; var pres:PreparedStatement = _; var username = "root"; var password = "123456"; var dburl = "jdbc:mysql://192.168.6.132:3306/hgs?useUnicode=true&characterEncoding=utf-8&useSSL=false"; var sql = "insert into words(word,count) values(?,?)"; override def invoke(value:(String, Integer) ) { pres.setString(1, value._1); pres.setInt(2,value._2); pres.executeUpdate(); System.out.println("values :" +value._1+"--"+value._2); } override def open( parameters:Configuration) { Class.forName("com.mysql.jdbc.Driver"); conn = DriverManager.getConnection(dburl, username, password); pres = conn.prepareStatement(sql); super.close() } override def close() { pres.close(); conn.close(); } }
“flink將數(shù)據(jù)錄入數(shù)據(jù)庫”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關(guān)的知識可以關(guān)注創(chuàng)新互聯(lián)-成都網(wǎng)站建設(shè)公司網(wǎng)站,小編將為大家輸出更多高質(zhì)量的實用文章!