真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

flink的時間及時區(qū)問題怎樣解決

flink的時間及時區(qū)問題怎樣解決,相信很多沒有經(jīng)驗的人對此束手無策,為此本文總結(jié)了問題出現(xiàn)的原因和解決方法,通過這篇文章希望你能解決這個問題。

武清網(wǎng)站制作公司哪家好,找創(chuàng)新互聯(lián)!從網(wǎng)頁設(shè)計、網(wǎng)站建設(shè)、微信開發(fā)、APP開發(fā)、自適應(yīng)網(wǎng)站建設(shè)等網(wǎng)站項目制作,到程序開發(fā),運營維護。創(chuàng)新互聯(lián)2013年開創(chuàng)至今到現(xiàn)在10年的時間,我們擁有了豐富的建站經(jīng)驗和運維經(jīng)驗,來保證我們的工作的順利進行。專注于網(wǎng)站建設(shè)就選創(chuàng)新互聯(lián)。

1.時間紀元

所謂的”時間紀元”就是1970年1月1日0時0分0秒,指的是開始的時間。比如Java類代碼:

Date date = new Date(0);

System.out.println(date);

打印出來的結(jié)果:

Thu Jan 01 08:00:00 CST 1970

也是1970年1月1日,實際上時分秒是0點0分0秒,這里打印出來的時間是8點而非0點,原因是存在系統(tǒng)時間和本地時間的問題,其實系統(tǒng)時間依然是0點,只不過我們的電腦時區(qū)設(shè)置為東8區(qū),故打印的結(jié)果是8點。

只需要將時區(qū)設(shè)置為GMT+0,即可打印出0點0分0秒

System.setProperty("user.timezone","GMT+0");

實際上時區(qū)問題都是在此時間紀元基礎(chǔ)上加/減一定的offset。

2.Flink時間

說java紀元跟本文將的flink時間問題有啥關(guān)系呢?

Flink在使用時間的這個概念的時候就是基于時間紀元這個概念的。比如首先,我們的時區(qū)是東八區(qū),在我們的視野中UTC-0時間應(yīng)該加8小時的offset,才是我們看到的時間,所以在使用flink的窗口的時候往往比我們當前的時間少8小時。

還有flink的窗口對其,也是基于紀元時間的。比如下面的有三個窗口函數(shù)的例子

1).5min滾動窗口

14:16:391啟動的窗口,滾動窗口時間是5min,會發(fā)現(xiàn)并不是等待五分鐘之后才有結(jié)果輸出,而是到了14:20:00.0的時候就直接輸出結(jié)果了。

flink的時間及時區(qū)問題怎樣解決

2).30min滾動窗口

14:27:11啟動的滾動窗口,是在14:30:00的時候就直接輸出了,而不是等待半小時。

flink的時間及時區(qū)問題怎樣解決

3).1hour滾動窗口

15:54:48啟動的一小時的滾動窗口,輸出時間是16點整。

flink的時間及時區(qū)問題怎樣解決

時間上差了八小時,但是對齊是基于時間紀元的整數(shù)單位。

3.解決差八小時問題

實際在使用的時候flink輸出的時差很令人反感,但是沒辦法flink目前不支持配置時區(qū),但是blink支持,等待著合并吧。

其實,時區(qū)問題解決方案比較多吧,要想不傷筋動骨,主要介紹以下三種:

  1. flink端不做處理。也即是在讀取數(shù)據(jù)的時候加上8小時的offset。

  2. 使用udf等算子給時間戳加上8小時的offset。

  3. sink內(nèi)部做處理。

1).Udf實現(xiàn)

sink端處理

import org.apache.flink.table.functions.ScalarFunction;
import java.sql.Timestamp;import java.text.ParseException;import java.text.SimpleDateFormat;import java.util.Date;import java.util.TimeZone;
public class UTC2Local extends ScalarFunction {    public Timestamp eval(Timestamp s) {        long timestamp = s.getTime() + 28800000;        return new Timestamp(timestamp);    }
}

注冊udf

  tEnv.registerFunction("utc2local",new UTC2Local());

使用udf

  Table table1 = tEnv.sqlQuery("select count(number),utc2local(TUMBLE_END(proctime, INTERVAL '1' HOUR)) from res group by TUMBLE(proctime, INTERVAL '1' HOUR)");

2). sink內(nèi)部支持

sink端的實現(xiàn)也比較簡單,主要是判斷輸出字段類型,然后加上8小時offset即可??梢詤⒖糱link的printtablesink的實現(xiàn)。

  override def invoke(in: JTuple2[JBool, Row]): Unit = {    val sb = new StringBuilder    val row = in.f1    for (i <- 0 to row.getArity - 1) {      if (i > 0) sb.append(",")      val f = row.getField(i)      if (f.isInstanceOf[Date]) {        sb.append(DateTimeFunctions.dateFormat(f.asInstanceOf[JDate].getTime, "yyyy-MM-dd", tz))      } else if (f.isInstanceOf[Time]) {        sb.append(DateTimeFunctions.dateFormat(f.asInstanceOf[JDate].getTime, "HH:mm:ss", tz))      } else if (f.isInstanceOf[Timestamp]) {        sb.append(DateTimeFunctions.dateFormat(f.asInstanceOf[JDate].getTime,          "yyyy-MM-dd HH:mm:ss.SSS", tz))      } else {        sb.append(StringUtils.arrayAwareToString(f))      }    }
   if (in.f0) {      System.out.println(prefix + "(+)" + sb.toString())    } else {      System.out.println(prefix + "(-)" + sb.toString())    }  }

看完上述內(nèi)容,你們掌握flink的時間及時區(qū)問題怎樣解決的方法了嗎?如果還想學(xué)到更多技能或想了解更多相關(guān)內(nèi)容,歡迎關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道,感謝各位的閱讀!


文章標題:flink的時間及時區(qū)問題怎樣解決
當前URL:http://weahome.cn/article/pephsc.html

其他資訊

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部