真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

利用flink統(tǒng)計(jì)消息回復(fù)情況

其中用到了滑動(dòng)窗口函數(shù)大小30秒,間隔15秒,且大于窗口10秒的數(shù)據(jù),被丟棄。(實(shí)際業(yè)務(wù)這三個(gè)值 應(yīng)為是 10 分鐘,1分鐘,5分鐘)。代碼先記錄一下

成都創(chuàng)新互聯(lián)一直通過網(wǎng)站建設(shè)和網(wǎng)站營銷幫助企業(yè)獲得更多客戶資源。 以"深度挖掘,量身打造,注重實(shí)效"的一站式服務(wù),以成都網(wǎng)站建設(shè)、網(wǎng)站制作、移動(dòng)互聯(lián)產(chǎn)品、營銷型網(wǎng)站建設(shè)服務(wù)為核心業(yè)務(wù)。十年網(wǎng)站制作的經(jīng)驗(yàn),使用新網(wǎng)站建設(shè)技術(shù),全新開發(fā)出的標(biāo)準(zhǔn)網(wǎng)站,不但價(jià)格便宜而且實(shí)用、靈活,特別適合中小公司網(wǎng)站制作。網(wǎng)站管理系統(tǒng)簡單易用,維護(hù)方便,您可以完全操作網(wǎng)站資料,是中小公司快速網(wǎng)站建設(shè)的選擇。

public static void main(String[] arg) throws Exception {

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().enableSysoutLogging();//開啟Sysout打日志
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); //設(shè)置窗口的時(shí)間單位為process time

        Properties props = new Properties();
        props.put("bootstrap.servers", "kafkaip:9092");
        props.put("group.id", "metric-group4");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");  //key 反序列化
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("auto.offset.reset", "earliest"); //value 反序列化

        DataStreamSource dataStreamSource = env.addSource(new FlinkKafkaConsumer011<>(
                "im-message-topic3",  //kafka topic
                new SimpleStringSchema(),  // String 序列化
                props)).setParallelism(1);

        DataStream bean3DataStream = dataStreamSource.map(new MapFunction() {         
            @Override
            public Message map(String value) throws Exception {
                 logger.info("receive msg:"+value); 
                 JSONObject jsonObject =JSONObject.parseObject(value);
                 Message s= new Message(
                         jsonObject.getString("sessionId"),
                         jsonObject.getString("fromUid"), 
                         jsonObject.getString("toUid"),
                         jsonObject.getString("chatType"),

                         jsonObject.getString("type"),
                         jsonObject.getString("msgId"),
                         jsonObject.getString("msg"),
                         jsonObject.getLong("timestampSend")

                         );
                 return s;
            }
        });

        //設(shè)置水印,并過濾數(shù)據(jù)
        DataStream bean3DataStreamWithAssignTime = 
                bean3DataStream.assignTimestampsAndWatermarks(new TruckTimestamp()).timeWindowAll(Time.seconds(30),Time.seconds(15)).apply(new AllWindowFunction() {  
                    @Override
                    public void apply(TimeWindow window, Iterable values, Collector out)
                            throws Exception {
                        for (Message t: values) {
                            logger.info("window start time:"+new Date(window.getStart()).toString());
                            logger.info("real time:"+new Date(t.getTimestampSend()).toString());
                            if(t.getTimestampSend() appendStream =tableEnv.toAppendStream(tb3, Row.class);
//        appendStream.addSink(new Sink());

    //對(duì)過濾后的數(shù)據(jù),使用正則匹配數(shù)據(jù)
        Table tb2 = tableEnv.sqlQuery(
                "SELECT " +
                        " * " +
                        "FROM myTable" +
                        " " +
                        "MATCH_RECOGNIZE ( " +
                        "PARTITION BY sessionId " +
                        "ORDER BY rowtime " +
                        "MEASURES " +
                        "e2.timestampSend as answerTime, "+
                        "LAST(e1.timestampSend) as customer_event_time, " +
                        "e2.fromUid as empUid, " +
                        "e1.timestampSend as askTime," +                      
                        "1 as total_talk " +          
                        "ONE ROW PER MATCH " +
                        "AFTER MATCH SKIP TO LAST e2 " +
                        "PATTERN (e1+ e2+?) " +
                        "DEFINE " +
                        "e1 as e1.type = 'yonghu', " +
                        "e2 as e2.type = 'guanjia' " +
                        ")"+
                        ""
                );

           DataStream appendStream2 =tableEnv.toAppendStream(tb2, Row.class);
           appendStream2.addSink(new Sink2());

           env.execute("msg v5");   

    }

    public static class TruckTimestamp extends AscendingTimestampExtractor {
        private static final long serialVersionUID = 1L;

        @Override
        public long extractAscendingTimestamp(Message element) {
            return element.getTimestampSend();
        }
    }

     public static class Sink implements SinkFunction {
            /**
         * 
         */
        private static final long serialVersionUID = 1L;

            @Override
            public void invoke(Row value) throws Exception {
                System.out.println(new Date().toString()+"orinal time:"+value.toString());
            }
        }

     public static class Sink2 implements SinkFunction {
            /**
         * 
         */
        private static final long serialVersionUID = 1L;

            @Override
            public void invoke(Row value) throws Exception {
                System.out.println(new Date().toString()+"new time:"+value.toString());
            }
        }

本文題目:利用flink統(tǒng)計(jì)消息回復(fù)情況
本文地址:http://weahome.cn/article/igsdsj.html

其他資訊

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部