這樣進行Spark的解析,相信很多沒有經(jīng)驗的人對此束手無策,為此本文總結(jié)了問題出現(xiàn)的原因和解決方法,通過這篇文章希望你能解決這個問題。
成都創(chuàng)新互聯(lián)公司專注為客戶提供全方位的互聯(lián)網(wǎng)綜合服務(wù),包含不限于做網(wǎng)站、成都網(wǎng)站設(shè)計、華池網(wǎng)絡(luò)推廣、成都小程序開發(fā)、華池網(wǎng)絡(luò)營銷、華池企業(yè)策劃、華池品牌公關(guān)、搜索引擎seo、人物專訪、企業(yè)宣傳片、企業(yè)代運營等,從售前售中售后,我們都將竭誠為您服務(wù),您的肯定,是我們最大的嘉獎;成都創(chuàng)新互聯(lián)公司為所有大學(xué)生創(chuàng)業(yè)者提供華池建站搭建服務(wù),24小時服務(wù)熱線:18982081108,官方網(wǎng)址:www.cdcxhl.com
Spark場景
Spark是基于內(nèi)存的迭代計算框架,適用于需要多次操作特定數(shù)據(jù)集的應(yīng)用場合。需要反復(fù)操作的次數(shù)越多, 所需讀取的數(shù)據(jù)量越大,受益越大,數(shù)據(jù)量小但是計算密集度較大的場合,受益就相對較小 由于RDD的特性,Spark不適用那種異步細粒度更新狀態(tài)的應(yīng)用,例如web服務(wù)的存儲或者是增量的web 爬蟲和索引。就是對于那種增量修改的應(yīng)用模型不適合 數(shù)據(jù)量不是特別大,但是要求實時統(tǒng)計分析需求
Spark Master模式(Url)
1、local:這種方式是在本地啟動一個線程來運行作業(yè); 2、local[N]:也是本地模式,但是啟動了N個線程; 3、local[*]:還是本地模式,但是用了系統(tǒng)中所有的核; 4、local[N,M]:這里有兩個參數(shù),第一個代表的是用到的核個數(shù);第二個參數(shù)代表的是容許該作業(yè)失敗M次; 5、local-cluster[N, cores, memory] :本地偽集群模式; 6、spark:// :這是用到了 Spark 的Standalone模; 7、(mesos|zk):// :這是Mesos模式; 8、yarn\yarn-cluster\yarn-client :這是YARN模式。前面兩種代表的是集群模式;后面代表的是客戶端模式; 9、simr:// :simr其實是Spark In MapReduce的縮寫
Spark deploy模式
1、Local模式 local模式出了偽集群模式(local-cluster),所有的local都是用到了LocalBackend和TaskSchedulerImpl類。LocalBackend接收來自TaskSchedulerImpl的receiveOffers()調(diào)用,并根據(jù)運行Application傳進來的CPU核生成WorkerOffer,并調(diào)用scheduler.resourceOffers(offers)生成Task,最后通過 executor.launchTask來執(zhí)行這些Task。 2、Standalone Standalone模式使用SparkDeploySchedulerBackend和TaskSchedulerImpl,SparkDeploySchedulerBackend是繼承自CoarseGrainedSchedulerBackend類,并重寫了其中的一些方法。 CoarseGrainedSchedulerBackend是一個粗粒度的資源調(diào)度類,在Spark job運行的整個期間,它會保存所有的Executor,在task運行完的時候,并不釋放該Executor,也不向Scheduler申請一個新的Executor。Executor的啟動方式有很多中,需要根據(jù)Application提交的Master URL進行判斷。在CoarseGrainedSchedulerBackend中封裝了一個DriverActor類,它接受Executor注冊(RegisterExecutor)、狀態(tài)更新(StatusUpdate)、響應(yīng)Scheduler的ReviveOffers請求、殺死Task等等。 在本模式中將會啟動一個或者多個CoarseGrainedExecutorBackend。具體是通過AppClient類向Master請求注冊Application。當(dāng)注冊成功之后,Master會向Client進行反饋,并調(diào)用schedule啟動Driver和CoarseGrainedExecutorBackend,啟動的Executor會向DriverActor進行注冊。然后CoarseGrainedExecutorBackend通過aunchTask方法啟動已經(jīng)提交的Task。 3、yarn-cluster yarn-cluster集群模式涉及到的類有YarnClusterScheduler和YarnClusterSchedulerBackend。YarnClusterSchedulerBackend同樣是繼承自CoarseGrainedSchedulerBackend。而YarnClusterScheduler繼承自TaskSchedulerImpl,它只是簡單地對TaskSchedulerImpl進行封裝,并重寫了getRackForHost和postStartHook方法。 Client類通過YarnClient在Hadoop集群上啟動一個Container,并在其中運行ApplicationMaster,并通過Yarn提供的接口在集群中啟動多個Container用于運行CoarseGrainedExecutorBackend,并向CoarseGrainedSchedulerBackend中的DriverActor進行注冊。 4、yarn-client yarn-cluster集群模式涉及到的類有YarnClientClusterScheduler和YarnClientSchedulerBackend。YarnClientClusterScheduler繼承自TaskSchedulerImpl,并對其中的getRackForHost方法進行了重寫。Yarn-client模式下,會在集群外面啟動一個ExecutorLauncher來作為driver,并想集群申請Container,來啟動CoarseGrainedExecutorBackend,并向CoarseGrainedSchedulerBackend中的DriverActor進行注冊。 5、Mesos Mesos模式調(diào)度方式有兩種:粗粒度和細粒度。粗粒度涉及到的類有CoarseMesosSchedulerBackend和TaskSchedulerImpl類;而細粒度涉及到的類有MesosSchedulerBackend和TaskSchedulerImpl類。CoarseMesosSchedulerBackend和 MesosSchedulerBackend都繼承了MScheduler(其實是Mesos的Scheduler),便于注冊到Mesos資源調(diào)度的框架中。選擇哪種模式可以通過spark.mesos.coarse參數(shù)配置。默認(rèn)的是MesosSchedulerBackend。
上面涉及到Spark的許多部署模式,究竟哪種模式好這個很難說,需要根據(jù)需求,如果只是測試Spark Application,可以選擇local模式。而如果數(shù)據(jù)量不是很多,Standalone 是個不錯的選擇。當(dāng)你需要統(tǒng)一管理集群資源(Hadoop、Spark等)那么可以選擇Yarn,但是這樣維護成本就會變高。yarn-cluster和yarn-client模式內(nèi)部實現(xiàn)還是有很大的區(qū)別。如果需要用于生產(chǎn)環(huán)境,那么請選擇yarn-cluster;而如果僅僅是Debug程序,可以選擇yarn-client。
Spark Jar/File Url格式
file:/ 文件絕對路徑,并且file:/URI是通過驅(qū)動器的HTTP文件服務(wù)器來下載的,每個執(zhí)行器都從驅(qū)動器的HTTP server拉取這些文件。 hdfs:/http:/https:/ftp: Spark將會從指定的URI位置下載所需的文件和jar包。 local:/ 指定在每個工作節(jié)點上都能訪問到的本地或共享文件。這意味著,不會占用網(wǎng)絡(luò)IO,特別是對一些大文件或jar包,最好使用這種方式,當(dāng)需要把文件推送到每個工作節(jié)點上可以通過NFS和GlusterFS共享文件。
Spark執(zhí)行模型
Dependency Dependency代表了RDD之間的依賴關(guān)系,即血緣 NarrowDependency代表窄依賴,即父RDD的分區(qū),最多被子RDD的一個分區(qū)使用。所以支持并行計算。 OneToOneDependency表示父RDD和子RDD的分區(qū)依賴是一對一的 RangeDependency表示在一個range范圍內(nèi),依賴關(guān)系是一對一的,所以初始化的時候會有一個范圍,范圍外的partitionId,傳進去之后返回的是Nil Shuffle代表寬依賴針對的RDD是KV形式的,需要一個partitioner指定分區(qū)方式,需要一個序列化工具類 Partition Partition具體表示RDD每個數(shù)據(jù)分區(qū)。 Partitioner Partitioner決定KV形式的RDD如何根據(jù)key進行partition 默認(rèn)Partitioner Partitioner的伴生對象提供defaultPartitioner方法,邏輯為: 傳入的RDD(至少兩個)中,遍歷(順序是partition數(shù)目從大到?。㏑DD,如果已經(jīng)有Partitioner了,就使用。如果RDD們都沒有Partitioner,則使用默認(rèn)的HashPartitioner。而HashPartitioner的初始化partition數(shù)目,取決于是否設(shè)置了Spark.default.parallelism,如果沒有的話就取RDD中partition數(shù)目最大的值 HashPartitioner基于Java的Object.hashCode。會有個問題是Java的Array有自己的hashCode,不基于Array里的內(nèi)容,所以RDD[Array[_]]或RDD[(Array[_], _)]使用HashPartitioner會有問題。 RangePartitioner處理的KV RDD要求Key是可排序的,即滿足Scala的Ordered[K]類型 Persist/Unpersist 默認(rèn)cache()過程是將RDD persist在內(nèi)存里,persist()操作可以為RDD重新指定StorageLevel(useDisk, useMemory, useOffHeap, deserialized, replication), persist并不是action,并不會觸發(fā)任何計算 Checkpoint RDD Actions api里提供了checkpoint()方法,會把本RDD save到SparkContext CheckpointDir 目錄下。建議該RDD已經(jīng)persist在內(nèi)存中,否則需要recomputation。 Transformations RDD transformation 見下 Actions RDD action 見下 Job->Stage->Task->Transformations/Action 一個Spark的Job分為多個stage,最后一個stage會包括一個或多個ResultTask,前面的stages會包括一個或多個ShuffleMapTasks。 ResultTask執(zhí)行并將結(jié)果返回給driver application。 ShuffleMapTask將task的output根據(jù)task的partition分離到多個buckets里。一個ShuffleMapTask對應(yīng)一個ShuffleDependency的partition,而總partition數(shù)同并行度、reduce數(shù)目是一致的 DAGScheduler 面向stage的調(diào)度層,為job生成以stage組成的DAG,以stage為單位,提交TaskSet給TaskScheduler執(zhí)行。 每一個Stage內(nèi),都是獨立的tasks,他們共同執(zhí)行同一個compute function,享有相同的shuffledependencies。DAG在切分stage的時候是依照出現(xiàn)shuffle為界限的。 DAGSchedulerEvent TaskScheduler TaskScheduler接收task、接收分到的資源和executor、維護信息、與backend打交道、分配任務(wù) SchedulableBuilder FIFO和Fair兩種實現(xiàn), addTaskSetManager會把TaskSetManager加到pool里。FIFO的話只有一個pool。Fair有多個pool,Pool也分FIFO和Fair兩種模式 TaskSet,即Stage 封裝一個stage的所有的tasks, 以提交給TaskScheduler ResultTask 對應(yīng)于Result Stage直接產(chǎn)生結(jié)果 ShuffleMapTask 對應(yīng)于ShuffleMap Stage, 產(chǎn)生的結(jié)果作為其他stage的輸入 TaskSetManager 負(fù)責(zé)這批Tasks的啟動,失敗重試,感知本地化等事情。每次reourseOffer方法會尋找合適(符合條件execId, host, locality)的Task并啟動它 TaskResultGetter 維護一個線程池,用來反序列化和從遠端獲取task結(jié)果 BlockManagerMaster/BlockManagerWorker TaskResult里包含BolckId, BlockManagerMaster通過這個blockId的獲取bolck的locations,BlockManagerWorker通過這些locations來獲得(反序列化)block的數(shù)據(jù)
Spark RDD
RDD是Spark中的抽象數(shù)據(jù)結(jié)構(gòu)類型,任何數(shù)據(jù)在Spark中都被表示為RDD。從編程的角度來看,RDD可以簡單看成是一個數(shù)組。 和普通數(shù)組的區(qū)別是,RDD中的數(shù)據(jù)是分區(qū)存儲的,這樣不同分區(qū)的數(shù)據(jù)就可以分布在不同的機器上,同時可以被并行處理。 Spark應(yīng)用程序所做把需要處理的數(shù)據(jù)轉(zhuǎn)換為RDD,然后對RDD進行一系列的變換和操作從而得到結(jié)果 一個RDD對象,包含如下5個核心屬性。 一個分區(qū)列表,每個分區(qū)里是RDD的部分?jǐn)?shù)據(jù)(或稱數(shù)據(jù)塊)。 一個依賴列表,存儲依賴的其他RDD。 一個名為compute的計算函數(shù)(由子類實現(xiàn)),用于計算RDD各分區(qū)的值。 一個分區(qū)器(可選),用于鍵/值類型的RDD,比如某個RDD是按散列來分區(qū)。 一個計算各分區(qū)時優(yōu)先的位置列表(可選),比如從HDFS上的文件生成RDD時,RDD分區(qū)的位置優(yōu)先選擇數(shù)據(jù)所在的節(jié)點,這樣可以避免數(shù)據(jù)移動帶來的開銷。
Work with RDD
object array -> object list -> object rdd object array -> object list -> Row list -> Row rdd + StructType schema -> object df object arrays -> object lists -> object rdds -> object rdd queue -> object dstream
RDD Transformer
map(func):對調(diào)用map的RDD數(shù)據(jù)集中的每個element都使用func,然后返回一個新的RDD,這個返回的數(shù)據(jù)集是分布式的數(shù)據(jù)集 keyBy(f: T => K) filter(func): 對調(diào)用filter的RDD數(shù)據(jù)集中的每個元素都使用func,然后返回一個包含使func為true的元素構(gòu)成的RDD flatMap(func):和map差不多,但是flatMap生成的是多個結(jié)果 mapPartitions(func):和map很像,但是map是每個element,而mapPartitions是每個partition mapPartitionsWithSplit(func):和mapPartitions很像,但是func作用的是其中一個split上,所以func中應(yīng)該有index sample(withReplacement,faction,seed):抽樣 union(otherDataset):并集, 返回一個新的dataset,包含源dataset和給定dataset的元素的集合 intersection(otherDataset):交集 subtract(otherDataset):差集 distinct([numTasks]):返回一個新的dataset,這個dataset含有的是源dataset中的distinct的element groupByKey(numTasks):返回(K,Seq[V]),也就是hadoop中reduce函數(shù)接受的key-valuelist reduceByKey(func,[numTasks]):就是用一個給定的reducefunc再作用在groupByKey產(chǎn)生的(K,Seq[V]),比如求和,求平均數(shù) sortByKey([ascending],[numTasks]):按照key來進行排序,是升序還是降序,ascending是boolean類型 join(otherDataset,[numTasks]):當(dāng)有兩個KV的dataset(K,V)和(K,W),返回的是(K,(V,W))的dataset,numTasks為并發(fā)的任務(wù)數(shù) cogroup(otherDataset,[numTasks]):當(dāng)有兩個KV的dataset(K,V)和(K,W),返回的是(K,Seq[V],Seq[W])的dataset,numTasks為并發(fā)的任務(wù)數(shù) cartesian(otherDataset):笛卡爾積就是m*n pipe(command: String) 把RDD數(shù)據(jù)通過ProcessBuilder創(chuàng)建額外的進程輸出走 zip(RDD[U]): RDD[(T, U)] 兩個RDD分區(qū)數(shù)目一致,且每個分區(qū)數(shù)據(jù)條數(shù)一致
RDD Action
reduce(func):說白了就是聚集,但是傳入的函數(shù)是兩個參數(shù)輸入返回一個值,這個函數(shù)必須是滿足交換律和結(jié)合律的 fold(zeroValue: T)(op: (T, T) => T) 特殊的reduce,帶初始值,函數(shù)式語義的fold aggregate(zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U):帶初始值、reduce聚合、merge聚合三個完整條件的聚合方法。rdd的做法是把函數(shù)傳入分區(qū)里去做計算,最后匯總各分區(qū)的結(jié)果再一次combOp計算 subtract(RDD[T]):rdd實現(xiàn)為map(x => (x, null)).subtractByKey(other.map((_, null)), p2).keys 與求交類似 collect():一般在filter或者足夠小的結(jié)果的時候,再用collect封裝返回一個數(shù)組 count():返回的是dataset中的element的個數(shù) first():返回的是dataset中的第一個元素 top(n)(ordering):每個分區(qū)內(nèi)傳入top的處理函數(shù),得到分區(qū)的堆,使用rdd.reduce(),把每個分區(qū)的堆合起來,排序,取前n個 take(n):返回前n個elements,這個士driverprogram返回的 takeSample(withReplacement,num,seed):抽樣返回一個dataset中的num個元素,隨機種子seed saveAsTextFile(path):把dataset寫到一個textfile中,或者hdfs,或者hdfs支持的文件系統(tǒng)中,spark把每條記錄都轉(zhuǎn)換為一行記錄,然后寫到file中 saveAsSequenceFile(path):只能用在key-value對上,然后生成SequenceFile寫到本地或者hadoop文件系統(tǒng) countByKey():返回的是key對應(yīng)的個數(shù)的一個map,作用于一個RDD countByValue(): Map[T, Long] rdd實現(xiàn)為map(value => (value, null)).countByKey():本質(zhì)上是一次簡單的combineByKey,返回Map,會全load進driver的內(nèi)存里,需要數(shù)據(jù)集規(guī)模較小 foreach(func):對dataset中的每個元素都使用func max()/min() 特殊的reduce,傳入max/min比較函數(shù) PairRDDFunctions ..... DoubleRDDFunctions sum() rdd實現(xiàn)是reduce(_ + _) stats() rdd實現(xiàn)是mapPartitions(nums => Iterator(StatCounter(nums))).reduce((a, b) => a.merge(b)) StatCounter在一次遍歷里統(tǒng)計出中位數(shù)、方差、count三個值,merge()是他內(nèi)部的方法 mean() rdd實現(xiàn)是stats().mean variance()/sampleVariance() rdd實現(xiàn)是stats().variance stdev()/sampleStdev() rdd實現(xiàn)是stats().stdev 求標(biāo)準(zhǔn)差 meanApprox()/sumApprox() 調(diào)用runApproximateJob histogram() 比較復(fù)雜的計算,rdd實現(xiàn)是先mapPartitions再reduce,包含幾次遞歸
Spark Core
提供了有向無環(huán)圖(DAG)的分布式并行計算框架,并提供Cache機制來支持多次迭代計算或者數(shù)據(jù)共享,大大減少 迭代計算之間讀取數(shù)據(jù)局的開銷,這對于需要進行多次迭代的數(shù)據(jù)挖掘和分析性能有很大提升 在Spark中引入了RDD (Resilient Distributed Dataset) 的抽象,它是分布在一組節(jié)點中的只讀對象集合, 這些集合是彈性的,如果數(shù)據(jù)集一部分丟失,則可以根據(jù)“血統(tǒng)”對它們進行重建,保證了數(shù)據(jù)的高容錯性; 移動計算而非移動數(shù)據(jù),RDD Partition可以就近讀取分布式文件系統(tǒng)中的數(shù)據(jù)塊到各個節(jié)點內(nèi)存中進行計算 使用多線程池模型來減少task啟動開稍 采用容錯的、高可伸縮性的akka作為通訊框架
Spark SQL
引入了新的RDD類型SchemaRDD,可以象傳統(tǒng)數(shù)據(jù)庫定義表一樣來定義SchemaRDD,SchemaRDD由定義了列數(shù)據(jù)類型的行對象構(gòu)成。SchemaRDD可以從RDD轉(zhuǎn)換過來,也可以從Parquet文件讀入,也可以使用HiveQL從Hive中獲取。 內(nèi)嵌了Catalyst查詢優(yōu)化框架,在把SQL解析成邏輯執(zhí)行計劃之后,利用Catalyst包里的一些類和接口,執(zhí)行了一些簡單的執(zhí)行計劃優(yōu)化,最后變成RDD的計算 在應(yīng)用程序中可以混合使用不同來源的數(shù)據(jù),如可以將來自HiveQL的數(shù)據(jù)和來自SQL的數(shù)據(jù)進行Join操作。 內(nèi)存列存儲(In-Memory Columnar Storage),sparkSQL的表數(shù)據(jù)在內(nèi)存中存儲不是采用原生態(tài)的JVM對象存儲方式,而是采用內(nèi)存列存儲; 字節(jié)碼生成技術(shù)(Bytecode Generation),Spark1.1.0在Catalyst模塊的expressions增加了codegen模塊,使用動態(tài)字節(jié)碼生成技術(shù),對匹配的表達式采用特定的代碼動態(tài)編譯。另外對SQL表達式都作了CG優(yōu)化, CG優(yōu)化的實現(xiàn)主要還是依靠Scala2.10的運行時放射機制(runtime reflection); Scala代碼優(yōu)化 SparkSQL在使用Scala編寫代碼的時候,盡量避免低效的、容易GC的代碼;盡管增加了編寫代碼的難度,但對于用戶來說接口統(tǒng)一。
Spark MLlib
MLBase 是Spark生態(tài)圈的一部分專注于機器學(xué)習(xí),讓機器學(xué)習(xí)的門檻更低,讓一些可能并不了解機器學(xué)習(xí)的用戶也能方便地使用MLbase。MLBase分為四部分:MLlib、MLI、ML Optimizer和MLRuntime。 ML Optimizer 會選擇它認(rèn)為最適合的已經(jīng)在內(nèi)部實現(xiàn)好了的機器學(xué)習(xí)算法和相關(guān)參數(shù),來處理用戶輸入的數(shù)據(jù),并返回模型或別的幫助分析的結(jié)果; MLI 是一個進行特征抽取和高級ML編程抽象的算法實現(xiàn)的API或平臺; MLlib 是Spark實現(xiàn)一些常見的機器學(xué)習(xí)算法和實用程序,包括分類、回歸、聚類、協(xié)同過濾、降維以及底層優(yōu)化,該算法可以進行可擴充; MLRuntime 基于Spark計算框架,將Spark的分布式計算應(yīng)用到機器學(xué)習(xí)領(lǐng)域。
Spark GraphX
GraphX是Spark中用于圖(e.g., Web-Graphs and Social Networks)和圖并行計算(e.g., PageRank and Collaborative Filtering)的API,可以認(rèn)為是GraphLab(C++)和Pregel(C++)在Spark(Scala)上的重寫及優(yōu)化,跟其他分布式圖計算框架相比,GraphX最大的貢獻是,在Spark之上提供一棧式數(shù)據(jù)解決方案,可以方便且高效地完成圖計算的一整套流水作業(yè)。GraphX最先是伯克利AMPLAB的一個分布式圖計算框架項目,后來整合到Spark中成為一個核心組件。 GraphX的核心抽象是Resilient Distributed Property Graph,一種點和邊都帶屬性的有向多重圖。它擴展了Spark RDD的抽象,有Table和Graph兩種視圖,而只需要一份物理存儲。兩種視圖都有自己獨有的操作符,從而獲得了靈活操作和執(zhí)行效率。如同Spark,GraphX的代碼非常簡潔。GraphX的核心代碼只有3千多行,而在此之上實現(xiàn)的Pregel模型,只要短短的20多行。GraphX的代碼結(jié)構(gòu)整體下圖所示,其中大部分的實現(xiàn),都是圍繞Partition的優(yōu)化進行的。這在某種程度上說明了點分割的存儲和相應(yīng)的計算優(yōu)化的確是圖計算框架的重點和難點。 GraphX的底層設(shè)計有以下幾個關(guān)鍵點。 1.對Graph視圖的所有操作,最終都會轉(zhuǎn)換成其關(guān)聯(lián)的Table視圖的RDD操作來完成。這樣對一個圖的計算,最終在邏輯上,等價于一系列RDD的轉(zhuǎn)換過程。因此,Graph最終具備了RDD的3個關(guān)鍵特性:Immutable、Distributed和Fault-Tolerant。其中最關(guān)鍵的是Immutable(不變性)。邏輯上,所有圖的轉(zhuǎn)換和操作都產(chǎn)生了一個新圖;物理上,GraphX會有一定程度的不變頂點和邊的復(fù)用優(yōu)化,對用戶透明。 2.兩種視圖底層共用的物理數(shù)據(jù),由RDD[VertexPartition]和RDD[EdgePartition]這兩個RDD組成。點和邊實際都不是以表Collection[tuple]的形式存儲的,而是由VertexPartition/EdgePartition在內(nèi)部存儲一個帶索引結(jié)構(gòu)的分片數(shù)據(jù)塊,以加速不同視圖下的遍歷速度。不變的索引結(jié)構(gòu)在RDD轉(zhuǎn)換過程中是共用的,降低了計算和存儲開銷。 3.圖的分布式存儲采用點分割模式,而且使用partitionBy方法,由用戶指定不同的劃分策略(PartitionStrategy)。劃分策略會將邊分配到各個EdgePartition,頂點Master分配到各個VertexPartition,EdgePartition也會緩存本地邊關(guān)聯(lián)點的Ghost副本。劃分策略的不同會影響到所需要緩存的Ghost副本數(shù)量,以及每個EdgePartition分配的邊的均衡程度,需要根據(jù)圖的結(jié)構(gòu)特征選取最佳策略。目前有EdgePartition2d、EdgePartition1d、RandomVertexCut和CanonicalRandomVertexCut這四種策略。在淘寶大部分場景下,EdgePartition2d效果最好。
Spark Streaming
SparkStreaming是一個對實時數(shù)據(jù)流進行高通量、容錯處理的流式處理系統(tǒng),可以對多種數(shù)據(jù)源(如Kdfka、Flume、Twitter、Zero和TCP 套接字)進行類似Map、Reduce和Join等復(fù)雜操作,并將結(jié)果保存到外部文件系統(tǒng)、數(shù)據(jù)庫或應(yīng)用到實時儀表盤。 計算流程:Spark Streaming是將流式計算分解成一系列短小的批處理作業(yè)。這里的批處理引擎是Spark Core,也就是把Spark Streaming的輸入數(shù)據(jù)按照batch size(如1秒)分成一段一段的數(shù)據(jù)(Discretized Stream),每一段數(shù)據(jù)都轉(zhuǎn)換成Spark中的RDD(Resilient Distributed Dataset),然后將Spark Streaming中對DStream的Transformation操作變?yōu)獒槍park中對RDD的Transformation操作,將RDD經(jīng)過操作變成中間結(jié)果保存在內(nèi)存中。整個流式計算根據(jù)業(yè)務(wù)的需求可以對中間的結(jié)果進行疊加或者存儲到外部設(shè)備。下圖顯示了Spark Streaming的整個流程。 容錯性:對于流式計算來說,容錯性至關(guān)重要。首先我們要明確一下Spark中RDD的容錯機制。每一個RDD都是一個不可變的分布式可重算的數(shù)據(jù)集,其記錄著確定性的操作繼承關(guān)系(lineage),所以只要輸入數(shù)據(jù)是可容錯的,那么任意一個RDD的分區(qū)(Partition)出錯或不可用,都是可以利用原始輸入數(shù)據(jù)通過轉(zhuǎn)換操作而重新算出的。 實時性:對于實時性的討論,會牽涉到流式處理框架的應(yīng)用場景。Spark Streaming將流式計算分解成多個Spark Job,對于每一段數(shù)據(jù)的處理都會經(jīng)過Spark DAG圖分解以及Spark的任務(wù)集的調(diào)度過程。對于目前版本的Spark Streaming而言,其最小的Batch Size的選取在0.5~2秒鐘之間(Storm目前最小的延遲是100ms左右),所以Spark Streaming能夠滿足除對實時性要求非常高(如高頻實時交易)之外的所有流式準(zhǔn)實時計算場景。 擴展性與吞吐量:Spark目前在EC2上已能夠線性擴展到100個節(jié)點(每個節(jié)點4Core),可以以數(shù)秒的延遲處理6GB/s的數(shù)據(jù)量(60M records/s),其吞吐量也比流行的Storm高2~5倍,Berkeley利用WordCount和Grep兩個用例所做的測試,在Grep這個測試中,Spark Streaming中的每個節(jié)點的吞吐量是670k records/s,而Storm是115k records/s。
SparkR
SparkR是AMPLab發(fā)布的一個R開發(fā)包,使得R擺脫單機運行的命運,可以作為Spark的job運行在集群上,極大得擴展了R的數(shù)據(jù)處理能力。 SparkR的幾個特性: 提供了Spark中彈性分布式數(shù)據(jù)集(RDD)的API,用戶可以在集群上通過R shell交互性的運行Spark job。 支持序化閉包功能,可以將用戶定義函數(shù)中所引用到的變量自動序化發(fā)送到集群中其他的機器上。 SparkR還可以很容易地調(diào)用R開發(fā)包,只需要在集群上執(zhí)行操作前用includePackage讀取R開發(fā)包就可以了,當(dāng)然集群上要安裝R開發(fā)包。
SparkPython
Spark Python
pom.xml
org.scala-lang scala-library 2.11.8 org.apache.spark spark-mllib_2.11 2.0.0 com.typesafe.play play-json_2.11 2.3.9 net.alchim31.maven scala-maven-plugin 3.1.3 scala-compile-first process-resources add-source compile scala-test-compile process-test-resources testCompile maven_central http://central.maven.org/maven2/ sonatype-nexus-snapshots https://oss.sonatype.org/content/repositories/snapshots typesafe http://repo.typesafe.com/typesafe/releases/ maven_central http://central.maven.org/maven2/ sonatype-nexus-snapshots https://oss.sonatype.org/content/repositories/releases/
Tests.scala
def listRdd(){ var sc = new SparkContext("local[1]", "spdb") var sqlContext = new SQLContext(sc) var listStr1 = """zm,zn,zq""" var list = listStr1.split(",").toList var rdd = sc.parallelize(list, 2) var max = rdd.max() println(max) }
看完上述內(nèi)容,你們掌握這樣進行Spark的解析的方法了嗎?如果還想學(xué)到更多技能或想了解更多相關(guān)內(nèi)容,歡迎關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道,感謝各位的閱讀!