真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

怎樣使用ApacheFlink中的TableSQLAPIx

本篇文章為大家展示了怎樣使用Apache Flink中的Table SQL APIx,內(nèi)容簡(jiǎn)明扼要并且容易理解,絕對(duì)能使你眼前一亮,通過(guò)這篇文章的詳細(xì)介紹希望你能有所收獲。

網(wǎng)站建設(shè)哪家好,找創(chuàng)新互聯(lián)公司!專注于網(wǎng)頁(yè)設(shè)計(jì)、網(wǎng)站建設(shè)、微信開發(fā)、小程序定制開發(fā)、集團(tuán)企業(yè)網(wǎng)站建設(shè)等服務(wù)項(xiàng)目。為回饋新老客戶創(chuàng)新互聯(lián)還提供了微山免費(fèi)建站歡迎大家使用!

什么是Flink關(guān)系型API?

雖然Flink已經(jīng)支持了DataSet和DataStream API,但是有沒(méi)有一種更好的方式去編程,而不用關(guān)心具體的API實(shí)現(xiàn)?不需要去了解Java和Scala的具體實(shí)現(xiàn)。

Flink provides three layered APIs. Each API offers a different trade-off between conciseness and expressiveness and targets different use cases.

Flink提供了三層API,每一層API提供了一個(gè)在簡(jiǎn)潔性和表達(dá)力之間的權(quán)衡 。

怎樣使用Apache Flink中的Table SQL APIx

最低層是一個(gè)有狀態(tài)的事件驅(qū)動(dòng)。在這一層進(jìn)行開發(fā)是非常麻煩的。

雖然很多功能基于DataSet和DataStreamAPI是可以完成的,需要熟悉這兩套API,而且必須要熟悉Java和Scala,這是有一定的難度的。一個(gè)框架如果在使用的過(guò)程中沒(méi)法使用SQL來(lái)處理,那么這個(gè)框架就有很大的限制。雖然對(duì)于開發(fā)人員無(wú)所謂,但是對(duì)于用戶來(lái)說(shuō)卻不顯示。因此SQL是非常面向大眾語(yǔ)言。

好比MapReduce使用Hive SQL,Spark使用Spark SQL,F(xiàn)link使用Flink SQL。

雖然Flink支持批處理/流處理,那么如何做到API層面的統(tǒng)一?

這樣Table和SQL應(yīng)運(yùn)而生。

這其實(shí)就是一個(gè)關(guān)系型API,操作起來(lái)如同操作MySQL一樣簡(jiǎn)單。

Apache Flink features two relational APIs - the Table API and SQL - for unified stream and batch processing. The Table API is a language-integrated query API for Scala and Java that allows the composition of queries from relational operators such as selection, filter, and join in a very intuitive way. 

Apache Flink通過(guò)使用Table API和SQL 兩大特性,來(lái)統(tǒng)一批處理和流處理。 Table API是一個(gè)查詢API,集成了Scala和Java語(yǔ)言,并且允許使用select filter join等操作。

使用Table SQL API需要額外依賴

java:

        
            org.apache.flink
            flink-streaming-scala_2.11
            ${flink.version}
        
        
            org.apache.flink
            flink-table-planner_2.11
            ${flink.version}
        
        
            org.apache.flink
            flink-table-api-java-bridge_2.11
            ${flink.version}
        

scala:

        
            org.apache.flink
            flink-table-planner_2.11
            ${flink.version}
        
        
            org.apache.flink
            flink-table-api-scala-bridge_2.11
            ${flink.version}
        

使用Table SQL API編程

首先導(dǎo)入上面的依賴,然后讀取sales.csv文件,文件內(nèi)容如下:

transactionId,customerId,itemId,amountPaid
111,1,1,100.0
112,2,2,505.0
113,1,3,510.0
114,2,4,600.0
115,3,2,500.0
116,4,2,500.0
117,1,2,500.0
118,1,2,500.0
119,1,3,500.0
120,1,2,500.0
121,2,4,500.0
122,1,2,500.0
123,1,4,500.0
124,1,2,500.0

Scala

object TableSQLAPI {

  def main(args: Array[String]): Unit = {
    val bEnv = ExecutionEnvironment.getExecutionEnvironment
    val bTableEnv = BatchTableEnvironment.create(bEnv)
    val filePath="E:/test/sales.csv"
    // 已經(jīng)拿到DataSet
    val csv = bEnv.readCsvFile[SalesLog](filePath,ignoreFirstLine = true)
    // DataSet => Table
  }

  case class SalesLog(transactionId:String,customerId:String,itemId:String,amountPaid:Double
                     )
}

首先拿到DataSet,接下來(lái)將DataSet轉(zhuǎn)為Table,然后就可以執(zhí)行SQL了

    // DataSet => Table
    val salesTable = bTableEnv.fromDataSet(csv)
    // 注冊(cè)成Table  Table => table
    bTableEnv.registerTable("sales", salesTable)
    // sql
    val resultTable = bTableEnv.sqlQuery("select customerId, sum(amountPaid) money from sales group by customerId")
    bTableEnv.toDataSet[Row](resultTable).print()

輸出結(jié)果如下:

4,500.0
3,500.0
1,4110.0
2,1605.0

這種方式只需要使用SQL就可以實(shí)現(xiàn)之前寫mapreduce的功能。大大方便了開發(fā)過(guò)程。

Java

package com.vincent.course06;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.types.Row;

public class JavaTableSQLAPI {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment bEnv = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment bTableEnv = BatchTableEnvironment.create(bEnv);
        DataSource salesDataSource = bEnv.readCsvFile("E:/test/sales.csv").ignoreFirstLine().
                pojoType(Sales.class, "transactionId", "customerId", "itemId", "amountPaid");
        Table sales = bTableEnv.fromDataSet(salesDataSource);
        bTableEnv.registerTable("sales", sales);
        Table resultTable = bTableEnv.sqlQuery("select customerId, sum(amountPaid) money from sales group by customerId");
        DataSet rowDataSet = bTableEnv.toDataSet(resultTable, Row.class);
        rowDataSet.print();
    }

    public static class Sales {
        public String transactionId;
        public String customerId;
        public String itemId;
        public Double amountPaid;

        @Override
        public String toString() {
            return "Sales{" +
                    "transactionId='" + transactionId + '\'' +
                    ", customerId='" + customerId + '\'' +
                    ", itemId='" + itemId + '\'' +
                    ", amountPaid=" + amountPaid +
                    '}';
        }
    }
}

上述內(nèi)容就是怎樣使用Apache Flink中的Table SQL APIx,你們學(xué)到知識(shí)或技能了嗎?如果還想學(xué)到更多技能或者豐富自己的知識(shí)儲(chǔ)備,歡迎關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道。


新聞名稱:怎樣使用ApacheFlink中的TableSQLAPIx
文章來(lái)源:http://weahome.cn/article/jgpchh.html

其他資訊

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部