真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

Structured中怎么利用Streaming實(shí)現(xiàn)超低延遲

這期內(nèi)容當(dāng)中小編將會(huì)給大家?guī)碛嘘P(guān)Structured中怎么利用Streaming實(shí)現(xiàn)超低延遲,文章內(nèi)容豐富且以專業(yè)的角度為大家分析和敘述,閱讀完這篇文章希望大家可以有所收獲。

站在用戶的角度思考問題,與客戶深入溝通,找到大荔網(wǎng)站設(shè)計(jì)與大荔網(wǎng)站推廣的解決方案,憑借多年的經(jīng)驗(yàn),讓設(shè)計(jì)與互聯(lián)網(wǎng)技術(shù)結(jié)合,創(chuàng)造個(gè)性化、用戶體驗(yàn)好的作品,建站類型包括:成都做網(wǎng)站、成都網(wǎng)站建設(shè)、企業(yè)官網(wǎng)、英文網(wǎng)站、手機(jī)端網(wǎng)站、網(wǎng)站推廣、申請域名虛擬主機(jī)、企業(yè)郵箱。業(yè)務(wù)覆蓋大荔地區(qū)。

要在連續(xù)處理模式下運(yùn)行支持的查詢,您只需指定一個(gè)連續(xù)觸發(fā)器,并將所需的checkpoint間隔作為參數(shù)。 例如浪尖的demo如下:

object ContinuousProcessing {
 def main(args: Array[String]): Unit = {
   val sparkConf = new SparkConf().setAppName(this.getClass.getName).setMaster("yarn-client")
     .set("yarn.resourcemanager.hostname", "mt-mdh.local")
     .set("spark.executor.instances","2")
     .set("spark.default.parallelism","4")
     .set("spark.sql.shuffle.partitions","4")
     .setJars(List("/Users/meitu/Desktop/sparkjar/bigdata.jar"
       ,"/opt/jars/spark-streaming-kafka-0-10_2.11-2.3.1.jar"
       ,"/opt/jars/kafka-clients-0.10.2.2.jar"
       ,"/opt/jars/kafka_2.11-0.10.2.2.jar"
       ,"/opt/jars/spark-sql-kafka-0-10_2.11-2.0.2.jar"))


   val spark = SparkSession
     .builder
     .appName("StructuredKafkaWordCount")
     .config(sparkConf)
     .getOrCreate()

   spark
     .readStream
     .format("kafka")
     .option("kafka.bootstrap.servers", "mt-mdh.local:9093")
     .option("subscribe", "StructuredSource")
     .load()
     .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
     .writeStream
     .format("kafka")
     .option("kafka.bootstrap.servers", "mt-mdh.local:9093")
     .option("topic", "StructuredSink")
     .option("checkpointLocation","/sql/checkpoint")
     .trigger(Trigger.Continuous("1 second"))  // only change in query
     .start()
     .awaitTermination()
 }

}

checkpoint 間隔為1秒意味著連續(xù)處理引擎將每秒記錄查詢的進(jìn)度。 生成的checkpoint采用與微批處理引擎兼容的格式,因此可以使用任何觸發(fā)器重新啟動(dòng)任何查詢。 例如,假如查詢支持微批處理和連續(xù)處理,那么實(shí)際上也可以用連續(xù)處理觸發(fā)器去啟動(dòng)微批處理觸發(fā)器,反之亦然。 

請注意,無論何時(shí)切換到連續(xù)模式,都將獲得至少一次的容錯(cuò)保證。

支持的查詢

從Spark 2.3開始,連續(xù)處理模式僅支持以下類型的查詢。

  • Operations:在連續(xù)模式下僅支持dataset/dataframe的類似于map的操作,即支持projection(select,map,flatMap,mapPartitions等)和selection(where,filter等)。

  • 除了聚合函數(shù)(因?yàn)樯胁恢С志酆希?,current_timestamp()和current_date()(使用時(shí)間的確定性計(jì)算具有挑戰(zhàn)性)之外,支持所有SQL函數(shù)。

Sources 

  • Kafka Source:支持所有操作。

  • Rate source:適合測試。只有連續(xù)模式支持的選項(xiàng)是numPartitions和rowsPerSecond。

Sinks

  • Kafka sink:支持所有選項(xiàng)。

  • Memory sink:適合調(diào)試。

  • Console sink:適合調(diào)試。支持所有操作。請注意,控制臺(tái)將打印你在連續(xù)觸發(fā)器中指定的每個(gè)checkpoint間隔。

更詳細(xì)的關(guān)于sink和source信息,請參閱輸入源和輸出接收器部分的官網(wǎng)。雖然控制臺(tái)接收器非常適合測試,但是使用Kafka作為源和接收器可以最好地觀察到端到端的低延遲處理。

注意事項(xiàng)

  • 連續(xù)處理引擎啟動(dòng)多個(gè)長時(shí)間運(yùn)行的任務(wù),這些任務(wù)不斷從源中讀取數(shù)據(jù),處理數(shù)據(jù)并連續(xù)寫入接收器。 查詢所需的任務(wù)數(shù)取決于查詢可以并行從源讀取的分區(qū)數(shù)。 因此,在開始連續(xù)處理查詢之前,必須確保群集中有足夠的核心并行執(zhí)行所有任務(wù)。 例如,如果您正在讀取具有10個(gè)分區(qū)的Kafka主題,則群集必須至少具有10個(gè)核心才能使查詢正常執(zhí)行。

  • 停止連續(xù)處理流可能會(huì)產(chǎn)生虛假的任務(wù)終止警告。 這些可以安全地忽略。

  • 目前沒有自動(dòng)重試失敗的任務(wù)。 任何失敗都將導(dǎo)致查詢停止,并且需要從檢查點(diǎn)手動(dòng)重新啟動(dòng)。

上述就是小編為大家分享的Structured中怎么利用Streaming實(shí)現(xiàn)超低延遲了,如果剛好有類似的疑惑,不妨參照上述分析進(jìn)行理解。如果想知道更多相關(guān)知識,歡迎關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道。


本文題目:Structured中怎么利用Streaming實(shí)現(xiàn)超低延遲
鏈接URL:http://weahome.cn/article/gscded.html

其他資訊

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部