真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

flinkstreamingsql怎么使用

本篇內(nèi)容主要講解“flink streaming sql怎么使用”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實(shí)用性強(qiáng)。下面就讓小編來帶大家學(xué)習(xí)“flink streaming sql怎么使用”吧!

我們提供的服務(wù)有:網(wǎng)站設(shè)計(jì)、成都網(wǎng)站設(shè)計(jì)、微信公眾號(hào)開發(fā)、網(wǎng)站優(yōu)化、網(wǎng)站認(rèn)證、阿爾山ssl等。為成百上千家企事業(yè)單位解決了網(wǎng)站和推廣的問題。提供周到的售前咨詢和貼心的售后服務(wù),是有科學(xué)管理、有技術(shù)的阿爾山網(wǎng)站制作公司

背景

SQL,Structured Query Language:結(jié)構(gòu)化查詢語言,作為一個(gè)通用、流行的查詢語言,不僅僅是在傳統(tǒng)的數(shù)據(jù)庫,在大數(shù)據(jù)領(lǐng)域也變得越來越流行,hive、spark、kafka、flink等大數(shù)據(jù)組件都支持sql的查詢,使用sql可以讓一些不懂這些組件原理的人,輕松的來操作,大大的降低了使用的門檻,今天我們先來簡單的講講在flink的流處理中如何使用sql. 

實(shí)例講解 

構(gòu)造StreamTableEnvironment對象

在flink的流處理中,要使用sql,需要首先構(gòu)造一個(gè)StreamTableEnvironment對象,方法比較簡單。

sql中用到的catalog、table、function等都需要注冊到StreamTableEnvironment才能使用。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
   

注冊table

接下來要將相應(yīng)的表的信息注冊到StreamTableEnvironment對象中,有以下幾種方式可以選擇.
以下的代碼是基于flink 1.10.0版本進(jìn)行講解的,各個(gè)版本略有不同。 

使用Tuple


  //使用flink的二元組,這個(gè)時(shí)候需要自定義字段名稱
  Tuple2 tuple2 = Tuple2.of("jack", 10);
  //構(gòu)造一個(gè)Tuple的DataStream
  DataStream> tupleStream = env.fromElements(tuple2);
//  注冊到StreamTableEnvironment,并且指定對應(yīng)的字段名
  tableEnv.createTemporaryView("usersTuple", tupleStream, "name,age");
  //執(zhí)行一個(gè)sql 查詢. 然后返回一個(gè)table對象
  Table table = tableEnv.sqlQuery("select name,age from usersTuple");
//  將table對象轉(zhuǎn)成flink的DataStream,以便后續(xù)操作,我們這里將其輸出
  tableEnv.toAppendStream(table, Row.class).print();

使用Row

flink中提供的元組Tuple是有限制的,最多到Tuple25,所以如果我們有更多的字段,可以選擇使用flink中的Row對象.


 //使用Row
  Row row = new Row(2);
  row.setField(0, "zhangsan");
  row.setField(1, 20);
  DataStream rowDataStream = env.fromElements(row);
  tableEnv.createTemporaryView("usersRow", rowDataStream, "name,age");
  Table tableRow = tableEnv.sqlQuery("select name,age from usersRow");
  tableEnv.toAppendStream(tableRow, Row.class).print();
   

使用java的Pojo類

首先定一個(gè)pojo類


 public static class User{
  private String name;
  private int age;

  public String getName(){
   return name;
  }

  public void setName(String name){
   this.name = name;
  }

  public int getAge(){
   return age;
  }

  public void setAge(int age){
   this.age = age;
  }
 }
 

定義這個(gè)pojo類是要符合flink的序列化規(guī)則,是有一定要求的,具體的可以參考【1】:

  1. 該類是public類型并且沒有非靜態(tài)內(nèi)部類
  2. 該類擁有公有的無參構(gòu)造器
  3. 類(以及所有超類)中的所有非靜態(tài)、非 transient 字段都是公有的(非 final 的);或者遵循 Java bean 規(guī)則,字段是private的,但是具有public類型的 getter 和 setter 方法

User user = new User();
  user.setName("Tom");
  user.setAge(20);
  DataStream userDataStream = env.fromElements(user);
  tableEnv.createTemporaryView("usersPojo", userDataStream);
  Table tablePojo = tableEnv.sqlQuery("select name,age from usersPojo");
  tableEnv.toAppendStream(tablePojo, Row.class).print();

 

如果使用的是java pojo類型的DataStream,就不用聲明字段名稱了,flink會(huì)自動(dòng)解析pojo類中的字段名稱和類型來作為table的字段和類型。 

使用外部存儲(chǔ)


  //連接外部系統(tǒng),比如文件,kafka等
  Schema schema = new Schema()
    .field("name", DataTypes.STRING())
    .field("age", DataTypes.INT());
  tableEnv.connect(new FileSystem().path("...."))
          .withFormat(new Csv())
          .withSchema(schema)
          .createTemporaryTable("usersFile");
  Table tableFile = tableEnv.sqlQuery("select name,age from usersFile");
  tableEnv.toAppendStream(tableFile, Row.class).print();
 

使用外部存儲(chǔ)的時(shí)候需要指定以下對象:

  1. tableEnv.connect(ConnectorDescriptor ...) 指定連接符,目前flink支持Elasticsearch、hbase、kafka、filesystem這幾類
  2. withFormat(FormatDescriptor format) 這個(gè)就是指定我們從上述數(shù)據(jù)源讀取的數(shù)據(jù)的格式,比如json、csv、parquet等等
  3. .withSchema(Schema schema) 給我們的table定義一個(gè)schema,也就是字段的名稱和類型,用于sql查詢
  4. .createTemporaryTable("usersFile") 給表起一個(gè)名字,并且注冊到StreamTableEnvironment中

到此,相信大家對“flink streaming sql怎么使用”有了更深的了解,不妨來實(shí)際操作一番吧!這里是創(chuàng)新互聯(lián)網(wǎng)站,更多相關(guān)內(nèi)容可以進(jìn)入相關(guān)頻道進(jìn)行查詢,關(guān)注我們,繼續(xù)學(xué)習(xí)!


當(dāng)前名稱:flinkstreamingsql怎么使用
當(dāng)前地址:http://weahome.cn/article/iedjho.html

其他資訊

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部