真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

Spark結(jié)構(gòu)化流處理機(jī)制之容錯(cuò)機(jī)制的示例分析

這篇文章給大家分享的是有關(guān)Spark結(jié)構(gòu)化流處理機(jī)制之容錯(cuò)機(jī)制的示例分析的內(nèi)容。小編覺得挺實(shí)用的,因此分享給大家做個(gè)參考,一起跟隨小編過來看看吧。

公司主營業(yè)務(wù):網(wǎng)站設(shè)計(jì)、成都網(wǎng)站建設(shè)、移動(dòng)網(wǎng)站開發(fā)等業(yè)務(wù)。幫助企業(yè)客戶真正實(shí)現(xiàn)互聯(lián)網(wǎng)宣傳,提高企業(yè)的競爭能力。創(chuàng)新互聯(lián)公司是一支青春激揚(yáng)、勤奮敬業(yè)、活力青春激揚(yáng)、勤奮敬業(yè)、活力澎湃、和諧高效的團(tuán)隊(duì)。公司秉承以“開放、自由、嚴(yán)謹(jǐn)、自律”為核心的企業(yè)文化,感謝他們對(duì)我們的高要求,感謝他們從不同領(lǐng)域給我們帶來的挑戰(zhàn),讓我們激情的團(tuán)隊(duì)有機(jī)會(huì)用頭腦與智慧不斷的給客戶帶來驚喜。創(chuàng)新互聯(lián)公司推出寧城免費(fèi)做網(wǎng)站回饋大家。

容錯(cuò)機(jī)制

端到端的有且僅有一次保證,是結(jié)構(gòu)化流設(shè)計(jì)的關(guān)鍵目標(biāo)之一.

結(jié)構(gòu)化流設(shè)計(jì)了 Structured Streaming sources,sinks等等,來跟蹤確切的處理進(jìn)度,并讓其重啟或重運(yùn)行來處理任何故障

streaming source是類似kafka的偏移量(offsets)來跟蹤流的讀取位置.執(zhí)行引擎使用檢查點(diǎn)(checkpoint)和預(yù)寫日志(write ahead logs)來記錄每個(gè)執(zhí)行其的偏移范圍值

streaming sinks 是設(shè)計(jì)用來保證處理的冪等性

這樣,依靠可回放的數(shù)據(jù)源(streaming source)和處理冪等(streaming sinks),結(jié)構(gòu)流來做到任何故障下的端到端的有且僅有一次保證

val lines = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()

// Split the lines into words
val words = lines.as[String].flatMap(_.split(" "))

// Generate running word count
val wordCounts = words.groupBy("value").count()

其中,spark是SparkSession,lines是DataFrame,DataFrame就是Dataset[Row]。

DataSet

看看Dataset的觸發(fā)因子的代碼實(shí)現(xiàn),比如foreach操作:

def foreach(f: T => Unit): Unit = withNewRDDExecutionId {

    rdd.foreach(f)

  }



 private def withNewRDDExecutionId[U](body: => U): U = {

    SQLExecution.withNewExecutionId(sparkSession, rddQueryExecution) {

      rddQueryExecution.executedPlan.foreach { plan =>

        plan.resetMetrics()

      }

      body

    }

  }

接著看:

 def withNewExecutionId[T](

      sparkSession: SparkSession,

      queryExecution: QueryExecution,

      name: Option[String] = None)(body: => T): T = {

    val sc = sparkSession.sparkContext

    val oldExecutionId = sc.getLocalProperty(EXECUTION_ID_KEY)

    val executionId = SQLExecution.nextExecutionId

    sc.setLocalProperty(EXECUTION_ID_KEY, executionId.toString)

    executionIdToQueryExecution.put(executionId, queryExecution)

    try {     

      withSQLConfPropagated(sparkSession) {       

        try {         

          body

        } catch {         

        } finally {         

        }

      }

    } finally {

      executionIdToQueryExecution.remove(executionId)

      sc.setLocalProperty(EXECUTION_ID_KEY, oldExecutionId)

    }

  }

執(zhí)行的真正代碼就是 queryExecution: QueryExecution。 

@transient private lazy val rddQueryExecution: QueryExecution = {

    val deserialized = CatalystSerde.deserialize[T](logicalPlan)

    sparkSession.sessionState.executePlan(deserialized)

  }

看到了看到了,是sessionState.executePlan執(zhí)行l(wèi)ogicalPlan而得到了QueryExecution

這里的sessionState.executePlan其實(shí)就是創(chuàng)建了一個(gè)QueryExecution對(duì)象。然后執(zhí)行QueryExecution的executedPlan方法得到SparkPlan這個(gè)物理計(jì)劃。怎么生成的呢?

lazy val sparkPlan: SparkPlan = tracker.measurePhase(QueryPlanningTracker.PLANNING) {

    SparkSession.setActiveSession(sparkSession)   

    planner.plan(ReturnAnswer(optimizedPlan.clone())).next()

  }

通過planner.plan方法生成。

planner是SparkPlanner。在BaseSessionStateBuilder類中定義。

protected def planner: SparkPlanner = {

    new SparkPlanner(session.sparkContext, conf, experimentalMethods) {

      override def extraPlanningStrategies: Seq[Strategy] =

        super.extraPlanningStrategies ++ customPlanningStrategies

    }

  }

SparkPlanner類

SparkPlanner對(duì)LogicalPlan執(zhí)行各種策略,返回對(duì)應(yīng)的SparkPlan。比如對(duì)于流應(yīng)用來說,有這樣的策略:DataSourceV2Strategy。

典型的幾個(gè)邏輯計(jì)劃到物理計(jì)劃的映射關(guān)系如下:

StreamingDataSourceV2Relation-》ContinuousScanExec

StreamingDataSourceV2Relation-》MicroBatchScanExec

前一種對(duì)應(yīng)與Offset沒有endOffset的情況,后一種對(duì)應(yīng)于有endOffset的情況。前一種是沒有結(jié)束的連續(xù)流,后一種是有區(qū)間的微批處理流。

前一種的時(shí)延可以達(dá)到1ms,后一種的時(shí)延只能達(dá)到100ms。

【代碼】:

case r: StreamingDataSourceV2Relation if r.startOffset.isDefined && r.endOffset.isDefined =>

      val microBatchStream = r.stream.asInstanceOf[MicroBatchStream]

      val scanExec = MicroBatchScanExec(

        r.output, r.scan, microBatchStream, r.startOffset.get, r.endOffset.get)

      val withProjection = if (scanExec.supportsColumnar) {

        scanExec

      } else {

        // Add a Project here to make sure we produce unsafe rows.

        ProjectExec(r.output, scanExec)

      }

      withProjection :: Nil

    case r: StreamingDataSourceV2Relation if r.startOffset.isDefined && r.endOffset.isEmpty =>

      val continuousStream = r.stream.asInstanceOf[ContinuousStream]

      val scanExec = ContinuousScanExec(r.output, r.scan, continuousStream, r.startOffset.get)

      val withProjection = if (scanExec.supportsColumnar) {

        scanExec

      } else {

        // Add a Project here to make sure we produce unsafe rows.

        ProjectExec(r.output, scanExec)

      }

      withProjection :: Nil

感謝各位的閱讀!關(guān)于“Spark結(jié)構(gòu)化流處理機(jī)制之容錯(cuò)機(jī)制的示例分析”這篇文章就分享到這里了,希望以上內(nèi)容可以對(duì)大家有一定的幫助,讓大家可以學(xué)到更多知識(shí),如果覺得文章不錯(cuò),可以把它分享出去讓更多的人看到吧!


名稱欄目:Spark結(jié)構(gòu)化流處理機(jī)制之容錯(cuò)機(jī)制的示例分析
轉(zhuǎn)載來于:http://weahome.cn/article/poighd.html

其他資訊

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部