真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

SparkStreaming+SparkSQL如何實(shí)現(xiàn)配置化ETL

本篇文章給大家分享的是有關(guān)Spark Streaming + Spark SQL如何實(shí)現(xiàn)配置化ETL,小編覺得挺實(shí)用的,因此分享給大家學(xué)習(xí),希望大家閱讀完這篇文章后可以有所收獲,話不多說,跟著小編一起來看看吧。

按需求定制制作可以根據(jù)自己的需求進(jìn)行定制,成都網(wǎng)站設(shè)計(jì)、網(wǎng)站制作構(gòu)思過程中功能建設(shè)理應(yīng)排到主要部位公司成都網(wǎng)站設(shè)計(jì)、網(wǎng)站制作的運(yùn)用實(shí)際效果公司網(wǎng)站制作網(wǎng)站建立與制做的實(shí)際意義

傳統(tǒng)的Spark Streaming程序需要:

  • 構(gòu)建StreamingContext

  • 設(shè)置checkpoint

  • 鏈接數(shù)據(jù)源

  • 各種transform

  • foreachRDD 輸出

通常而言,你可能會(huì)因?yàn)橐咄晟厦娴牧鞒潭鴺?gòu)建了一個(gè)很大的程序,比如一個(gè)main方法里上百行代碼,雖然在開發(fā)小功能上足夠便利,但是復(fù)用度更方面是不夠的,而且不利于協(xié)作,所以需要一個(gè)更高層的開發(fā)包提供支持。

如何開發(fā)一個(gè)Spark Streaming程序

我只要在配置文件添加如下一個(gè)job配置,就可以作為標(biāo)準(zhǔn)的的Spark Streaming 程序提交運(yùn)行:

{

  "test": {
    "desc": "測試",
    "strategy": "streaming.core.strategy.SparkStreamingStrategy",
    "algorithm": [],
    "ref": [],
    "compositor": [
      {
        "name": "streaming.core.compositor.kafka.MockKafkaStreamingCompositor",
        "params": [
          {
            "metadata.broker.list":"xxx",
            "auto.offset.reset":"largest",
            "topics":"xxx"
          }
        ]      },
      {
        "name": "streaming.core.compositor.spark.JSONTableCompositor",
        "params": [{"tableName":"test"}
        ]      },
      {
        "name": "streaming.core.compositor.spark.SQLCompositor",
        "params": [{"sql":"select a from test"}
        ]      },
      {
        "name": "streaming.core.compositor.RDDPrintOutputCompositor",
        "params": [
          {
          }
        ]      }
    ],
    "configParams": {
    }  }}

上面的配置相當(dāng)于完成了如下的一個(gè)流程:

  1. 從Kafka消費(fèi)數(shù)據(jù)

  2. 將Kafka數(shù)據(jù)轉(zhuǎn)化為表

  3. 通過SQL進(jìn)行處理

  4. 打印輸出

是不是很簡單,而且還可以支持熱加載,動(dòng)態(tài)添加job等

特性

該實(shí)現(xiàn)的特性有:

  1. 配置化

  2. 支持多Job配置

  3. 支持各種數(shù)據(jù)源模塊

  4. 支持通過SQL完成數(shù)據(jù)處理

  5. 支持多種輸出模塊

未來可擴(kuò)展的支持包含:

  1. 動(dòng)態(tài)添加或者刪除job更新,而不用重啟Spark Streaming

  2. 支持Storm等其他流式引擎

  3. 更好的多job互操作

配置格式說明

該實(shí)現(xiàn)完全基于ServiceframeworkDispatcher 完成,核心功能大概只花了三個(gè)小時(shí)。

這里我們先理出幾個(gè)概念:

  1. Spark Streaming 定義為一個(gè)App

  2. 每個(gè)Action定義為一個(gè)Job.一個(gè)App可以包含多個(gè)Job

配置文件結(jié)構(gòu)設(shè)計(jì)如下:

{  "job1": {    "desc": "測試",    "strategy": "streaming.core.strategy.SparkStreamingStrategy",    "algorithm": [],    "ref": [],    "compositor": [
      {        "name": "streaming.core.compositor.kafka.MockKafkaStreamingCompositor",        "params": [
          {            "metadata.broker.list":"xxx",            "auto.offset.reset":"largest",            "topics":"xxx"
          }
        ]
      } ,  
    ],    "configParams": {
    }
  },  "job2":{
   ........
 } 
}

一個(gè)完整的App 對(duì)應(yīng)一個(gè)配置文件。每個(gè)頂層配置選項(xiàng),如job1,job2分別對(duì)應(yīng)一個(gè)工作流。他們最終都會(huì)運(yùn)行在一個(gè)App上(Spark Streaming實(shí)例上)。

  • strategy 用來定義如何組織 compositor,algorithm, ref 的調(diào)用關(guān)系

  • algorithm作為數(shù)據(jù)來源

  • compositor 數(shù)據(jù)處理鏈路模塊。大部分情況我們都是針對(duì)該接口進(jìn)行開發(fā)

  • ref 是對(duì)其他job的引用。通過配合合適的strategy,我們將多個(gè)job組織成一個(gè)新的job

  • 每個(gè)組件( compositor,algorithm, strategy) 都支持參數(shù)配置

上面主要是解析了配置文件的形態(tài),并且ServiceframeworkDispatcher 已經(jīng)給出了一套接口規(guī)范,只要照著實(shí)現(xiàn)就行。

模塊實(shí)現(xiàn)

那對(duì)應(yīng)的模塊是如何實(shí)現(xiàn)的?本質(zhì)是將上面的配置文件,通過已經(jīng)實(shí)現(xiàn)的模塊,轉(zhuǎn)化為Spark Streaming程序。

以SQLCompositor 的具體實(shí)現(xiàn)為例:

class SQLCompositor[T] extends Compositor[T] {  private var _configParams: util.List[util.Map[Any, Any]] = _  val logger = Logger.getLogger(classOf[SQLCompositor[T]].getName)//策略引擎ServiceFrameStrategy 會(huì)調(diào)用該方法將配置傳入進(jìn)來
  override def initialize(typeFilters: util.List[String], configParams: util.List[util.Map[Any, Any]]): Unit = {    this._configParams = configParams
  }// 獲取配置的sql語句
  def sql = {
    _configParams(0).get("sql").toString
  }  def outputTable = {
    _configParams(0).get("outputTable").toString
  }//執(zhí)行的主方法,大體是從上一個(gè)模塊獲取SQLContext(已經(jīng)注冊(cè)了對(duì)應(yīng)的table),//然后根據(jù)該模塊的配置,設(shè)置查詢語句,最后得到一個(gè)新的dataFrame.// middleResult里的T其實(shí)是DStream,我們會(huì)傳遞到下一個(gè)模塊,Output模塊//params參數(shù)則是方便各個(gè)模塊共享信息,這里我們將對(duì)應(yīng)處理好的函數(shù)傳遞給下一個(gè)模塊
  override def result(alg: util.List[Processor[T]], ref: util.List[Strategy[T]], middleResult: util.List[T], params: util.Map[Any, Any]): util.List[T] = {    var dataFrame: DataFrame = null
    val func = params.get("table").asInstanceOf[(RDD[String]) => SQLContext]
    params.put("sql",(rdd:RDD[String])=>{      val sqlContext = func(rdd)
      dataFrame = sqlContext.sql(sql)
      dataFrame
    })
    middleResult
  }
}

上面的代碼就完成了一個(gè)SQL模塊。那如果我們要完成一個(gè)自定義的.map函數(shù)呢?可類似下面的實(shí)現(xiàn):

abstract class MapCompositor[T,U] extends Compositor[T]{  private var _configParams: util.List[util.Map[Any, Any]] = _  val logger = Logger.getLogger(classOf[SQLCompositor[T]].getName)  override def initialize(typeFilters: util.List[String], configParams: util.List[util.Map[Any, Any]]): Unit = {    this._configParams = configParams
  }  override def result(alg: util.List[Processor[T]], ref: util.List[Strategy[T]], middleResult: util.List[T], params: util.Map[Any, Any]): util.List[T] = {    val dstream = middleResult(0).asInstanceOf[DStream[String]]    val newDstream = dstream.map(f=>parseLog(f))    List(newDstream.asInstanceOf[T])
  }  def parseLog(line:String): U}class YourCompositor[T,U] extends MapCompositor[T,U]{ override def parseLog(line:String):U={
     ....your logical
  }
}

同理你可以實(shí)現(xiàn)filter,repartition等其他函數(shù)。

該方式提供了一套更為高層的API抽象,用戶只要關(guān)注具體實(shí)現(xiàn)而無需關(guān)注Spark的使用。同時(shí)也提供了一套配置化系統(tǒng),方便構(gòu)建數(shù)據(jù)處理流程,并且復(fù)用原有的模塊,支持使用SQL進(jìn)行數(shù)據(jù)處理。

以上就是Spark Streaming + Spark SQL如何實(shí)現(xiàn)配置化ETL,小編相信有部分知識(shí)點(diǎn)可能是我們?nèi)粘9ぷ鲿?huì)見到或用到的。希望你能通過這篇文章學(xué)到更多知識(shí)。更多詳情敬請(qǐng)關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道。


網(wǎng)站名稱:SparkStreaming+SparkSQL如何實(shí)現(xiàn)配置化ETL
當(dāng)前地址:http://weahome.cn/article/giihog.html

其他資訊

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部