Spark 計算框架為了能夠進行高并發(fā)和高吞吐的數(shù)據(jù)處理,封裝了三大數(shù)據(jù)結構,用于處理不同的應用場景。三大數(shù)據(jù)結構分別是:
1)RDD:彈性分布式數(shù)據(jù)集
2)累加器:分布式共享只寫變量
3)廣播變量:分布式共享只讀變量
接下來讓我們看看這三大數(shù)據(jù)結構是如何數(shù)據(jù)處理中使用的
RDD(Resilient Distributed Dataset)叫做彈性分布式數(shù)據(jù)集,是 Spark 中最基本的數(shù)據(jù)處理模型
。代碼中是一個抽象類,它代表一個彈性的,不可變,可分區(qū),里面的元素可并行計算的集合。
彈性:
存儲的彈性:內存與磁盤的自動切換
容錯的彈性:數(shù)據(jù)丟失可以自動恢復
計算的彈性:計算出錯重試機制
分片的機制:可根據(jù)需要重新分片
分布式:數(shù)據(jù)存儲在大數(shù)據(jù)集群不同的節(jié)點上
數(shù)據(jù)集:RDD 封裝了計算邏輯,并不保存數(shù)據(jù)
數(shù)據(jù)抽象:RDD 是一個抽象類,需要子類具體實現(xiàn)
不可變:RDD 封裝了計算邏輯,是不可以改變的,想要改變,只能產(chǎn)生新的RDD,在新的RDD里面封裝邏輯計算
可分區(qū),并行計算
首先分為兩部分,我們把Excuter當成服務器,把Driver當成客戶端。然后用客戶端去連接服務器,然后客戶端發(fā)送數(shù)據(jù)給服務器。
Excuter (服務器):
第一步設置服務器的端口號,ServerScket(9998)
方法,里面的參數(shù)是端口號,這可以隨便寫。然后第二步等待客戶端發(fā)送數(shù)據(jù)過來accept()
方法。然后第三步使用getInputStream
輸入流接收客戶端發(fā)送過來的數(shù)據(jù),使用輸入流的read()
方法,這個就是從客戶端拿到的數(shù)據(jù),然后把這個數(shù)據(jù)給輸出。最后把輸出流,數(shù)據(jù)等待,還有服務器依次都給關閉。
package com.atguigu.bigdata.spark.core.wc.test2
import java.io.InputStream
import java.net.{ServerSocket, Socket}
//這個是做計算準備的,主要是邏輯代碼部分
//這個相當于是服務器,然后Driver相當于是客戶端,客戶端連接服務器就可以直接使用了
class Excuter {}
object Excuter{def main(args: Array[String]): Unit = {//啟動服務器,接收數(shù)據(jù) 這個端口號是隨便寫的
val server = new ServerSocket(9998) //這個是網(wǎng)絡編程的
println("服務器啟動,等待接收數(shù)據(jù)")
//等待客戶端的鏈接
val client: Socket = server.accept() //等待客戶端發(fā)送過來的數(shù)據(jù),accept()方法
val in: InputStream = client.getInputStream //輸入流接收數(shù)據(jù)
val i = in.read() //這個就是拿到的值
println("接收到客戶端發(fā)送的數(shù)據(jù):" + i) //把客戶端拿到的數(shù)據(jù)給輸出
in.close() //把輸入流給關閉掉
client.close()
server.close() //把服務器給關閉掉
}
}
Driver (客戶端):
首先客戶端連接服務器的端口號Socket("localhost",9998)
方法,第一個參數(shù)是連接方式,這里是本地連接,第二個參數(shù)是服務器的端口號。然后第二步就向服務器發(fā)送數(shù)據(jù),getOutputStream
方法輸出流,然后使用輸出流的write()
方法寫出數(shù)據(jù)。然后使用輸出流的flush()
方法,flush方法的作用是,刷新此輸出流并強制寫出所有緩沖的輸出字節(jié)。然后用完之后就把輸出流和客戶端給關閉了。
package com.atguigu.bigdata.spark.core.wc.test2
import java.io.OutputStream
import java.net.Socket
//這個是用來執(zhí)行程序的
class Driver {}
object Driver{def main(args: Array[String]): Unit = {//連接服務器 本地連接,然后第二個參數(shù)是服務器定義的端口號
val client = new Socket("localhost",9998) //這個相當于是是客戶端,連接服務器
val out: OutputStream = client.getOutputStream //向服務器發(fā)東西,用getOutputStream()
out.write(2)
out.flush()
out.close() //用完了吧這個輸出流給關掉
client.close() //然后把這個客戶端也關掉
}
}
(2) 客戶端向服務器發(fā)送計算任務Excuter 類里面是服務器,Driver是客戶端,Task 里面是準備數(shù)據(jù)和邏輯操作的,那個Driver 里面創(chuàng)建一個Task 對象然后把Task 用ObjectOutputstream
輸出流把對象給輸出到Excuter接收,接收也是使用ObjectIntputstream
對象輸入流進行接收,因為輸出的是一個操作邏輯,用字節(jié)流接收肯定不對,所有要用對象。然后Excuter 拿到Task之后,就可以直接使用里面的函數(shù)了。Task里面要混入Serializable
特質,因為在網(wǎng)絡中肯定是無法直接傳送一個對象過去的,所以要進行序列化。
7
Excuter 代碼:
package com.atguigu.bigdata.spark.core.wc.test2
import java.io.{InputStream, ObjectInputStream}
import java.net.{ServerSocket, Socket}
//這個是做計算準備的,主要是邏輯代碼部分
//這個相當于是服務器,然后Driver相當于是客戶端,客戶端連接服務器就可以直接使用了
class Excuter {}
object Excuter{//要混入序列化的特征,不然不能那個傳一個對象過去
def main(args: Array[String]): Unit = {//啟動服務器,接收數(shù)據(jù) 這個端口號是隨便寫的
val server = new ServerSocket(9998) //這個是網(wǎng)絡編程的
println("服務器啟動,等待接收數(shù)據(jù)")
//等待客戶端的鏈接
val client: Socket = server.accept() //等待客戶端發(fā)送過來的數(shù)據(jù),accept()方法
val in: InputStream = client.getInputStream //輸入流接收數(shù)據(jù)
val objin: ObjectInputStream = new ObjectInputStream(in) //輸出流失obj那么接收也應該是obj
val task: Task = objin.readObject().asInstanceOf[Task] //這個就是拿到的值 ,但是這里不應該是AnyRef,所以要進行轉換
val ints = task.compute() //上面已經(jīng)拿到了傳過來的操作了,所以可以直接使用里面定義的函數(shù)了
println("計算節(jié)點的計算結果為:" + ints) //把客戶端拿到的數(shù)據(jù)給輸出
objin.close() //把輸入流給關閉掉
client.close()
server.close() //把服務器給關閉掉
}
}
Driver 代碼:
package com.atguigu.bigdata.spark.core.wc.test2
import java.io.{ObjectOutputStream, OutputStream}
import java.net.Socket
//這個是用來執(zhí)行程序的
class Driver {}
object Driver {def main(args: Array[String]): Unit = {//連接服務器 本地連接,然后第二個參數(shù)是服務器定義的端口號
val client = new Socket("localhost",9998) //這個相當于是是客戶端,連接服務器
val out: OutputStream = client.getOutputStream //向服務器發(fā)東西,用getOutputStream()
val objout = new ObjectOutputStream(out) //定義這個Object的輸出,因為上面那個是輸出字節(jié)的不能傳輸對象
val task:Task = new Task() //然后創(chuàng)建一個task
objout.writeObject(task) //把task 傳入給objout 對象輸出流
objout.flush()
objout.close() //用完了吧這個輸出流給關掉
client.close() //然后把這個客戶端也關掉
println("客戶端發(fā)送數(shù)據(jù)完畢")
}
}
Task 代碼:
package com.atguigu.bigdata.spark.core.wc.test2
class Task extends Serializable {//要混入序列化的特征,不然不能那個傳一個對象過去
val datas = List(1,2,3,4) //這個是數(shù)據(jù)
val logic = (num:Int) =>{num * 2} //匿名函數(shù) 這個是邏輯
//計算
def compute() = {datas.map(logic) //莫logic 上面定義的邏輯操作傳入進去
}
}
3、RDD 創(chuàng)建在 Spark 中創(chuàng)建 RDD 的創(chuàng)建方式可以分為四種: 一般就是用前兩種就行了,一般前兩種用的比較多。
(1) 從集合(內存)中創(chuàng)建從集合中創(chuàng)建RDD,Spark主要提供了兩個方法:parallelize
和makeRDD
parallelize 是并行的意思,makeRDD 的底層則完全就是調用了parallelize方法,因為這個單詞字面意思不大好理解,所以都用makeRDD就行了。
注意:local[*]
里面加上*
的意思是可以模擬多核多線程,要是不加的話那么就是模擬單線程,從內存中創(chuàng)建makeRDD()
方法要傳一個集合進去
package com.atguigu.bigdata.spark.core.wc.create_RDD
import org.apache.spark.api.java.JavaSparkContext.fromSparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
//在內存(集合)中創(chuàng)建RDD
class Spark01_RDD_Memory {}
object Spark01_RDD_Memory{def main(args: Array[String]): Unit = {//TODO 準備環(huán)境
//這個 local[*] 里面加上*的意思是,可以模擬多核多線程,不加的話就是模擬的單線程
val conf = new SparkConf().setMaster("local[*]").setAppName("create_RDD")
val context = new SparkContext(conf)
//TODO 創(chuàng)建RDD
//從內存中創(chuàng)建RDD,將內存中集合的數(shù)據(jù)作為處理的數(shù)據(jù)
val seq: Seq[Int] = Seq(1, 2, 3, 4)
//parallelize 并行
//val sc: RDD[Int] = context.parallelize(seq) //這里面?zhèn)魅氲膮?shù)是一個集合,當做數(shù)據(jù)源,
val sc: RDD[Int] = context.makeRDD(seq) //makeRDD方法和parallelize方法是一樣的
sc.collect().foreach(println) //只有觸發(fā)collect方法,才會執(zhí)行我們的應用程序
//TODO 關閉環(huán)境
context.stop()
}
}
(2) 從外部存儲(文件)創(chuàng)建RDD由外部存儲系統(tǒng)的數(shù)據(jù)集創(chuàng)建RDD 包括:本地的文件系統(tǒng),所有Hadoop支持的數(shù)據(jù)集,比如HDFS,HBase 等。
注意:這個文件的路徑,可以是項目目錄下,可以洗本地環(huán)境目錄下,或者說hdfs 的路徑下都是可以的。在文件中創(chuàng)建RDD,就要用textFile()
方法將文件的路徑給導入進去?;蛘咦x取數(shù)據(jù)的時候用wholeTextFiles()
方法可以看到里面的數(shù)據(jù)來源,具體是來自于哪一份文件。textFile
:以行為單位來讀取數(shù)據(jù),讀取的數(shù)據(jù)都是字符串wholeTextFIles
:以文件為單位讀取數(shù)據(jù),讀取的結果表示為元組,第一個元素表示文件路徑,第二個元素表示文件內容
package com.atguigu.bigdata.spark.core.wc.create_RDD
import org.apache.spark.{SparkConf, SparkContext}
//從文件中創(chuàng)建RDD
class Spark02_RDD_File {}
object Spark02_RDD_File{def main(args: Array[String]): Unit = {//TODO 準備環(huán)境
val conf = new SparkConf().setMaster("local[*]").setAppName("create_RDD_File")
val context = new SparkContext(conf)
//TODO 創(chuàng)建RDD
//從文件中創(chuàng)建RDD,將文件中的數(shù)據(jù)作為處理的數(shù)據(jù)源
//path路徑默認以當前環(huán)境的根路徑為基準,可以寫絕對路徑,也可以寫相對路徑,
//還可以hdfs路徑也是可以的,例如:hdfs://master:9080/test.txt
val file = context.textFile("datas/*")
file.collect().foreach(println)
//TODO 關閉環(huán)境
context.stop()
}
}
(3) 從其他RDD創(chuàng)建主要是通過一個RDD運算完后,再產(chǎn)生新的RDD。
(4) 直接創(chuàng)建 RDD (new)使用new的方式直接構造 RDD,一般由 Spark 框架自身使用。
4、RDD 并行度與分區(qū)默認情況下,Spark 可以將一個作業(yè)切分多個任務后,發(fā)送給Executor 節(jié)點并行計算,而能夠并行計算的任務數(shù)量我們稱之為并行度。這個數(shù)量可以在構建RDD時指定。記住,這里的并行執(zhí)行的任務數(shù)量,并不是指的切分任務的數(shù)量,不要混淆了。
(1) makeRDD() 基于內存創(chuàng)建的RDD的分區(qū)注意:makeRDD()
方法,第二個參數(shù)是個隱式參數(shù),是分區(qū)的數(shù)量,如果不傳的話那么默認分區(qū)跟本地環(huán)境的核有關。比如我的電腦是4核,那么分區(qū)就是分為四個,并行計算。saveAsTextFile()
方法 將處理的數(shù)據(jù)保存成分區(qū)文件,里面的參數(shù)是要創(chuàng)建的文件名。然后輸出之后會自動生成一個這個名字的目錄,下面的文件是分區(qū)文件。
package com.atguigu.bigdata.spark.core.wc.create_RDD
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
//RDD 并行度
class Spark01_RDD_Memory_Par {}
object Spark01_RDD_Memory_Par{def main(args: Array[String]): Unit = {//TODO 準備環(huán)境
//這個 local[*] 里面加上*的意思是,可以模擬多核多線程,不加的話就是模擬的單線程
val conf = new SparkConf().setMaster("local[*]").setAppName("create_RDD")
val context = new SparkContext(conf)
//TODO 創(chuàng)建RDD
//RDD的并行度 & 分區(qū)
//makeRDD 方法可以傳入第二個參數(shù),第二個參數(shù)是分區(qū)的數(shù)量
//第二個參數(shù)是可以不傳的,因為是隱式參數(shù),如果不傳默認分區(qū)就是按照內核數(shù)量決定的,我的內核是4個,所以分區(qū)是4
val rdd:RDD[Int] = context.makeRDD(List(1, 2, 3,4,5),3) //里面的第一個參數(shù)是一個集合,第二個參數(shù)是分區(qū)的數(shù)量,分為幾個區(qū)
//saveAsTextFile方法 將處理的數(shù)據(jù)保存成分區(qū)文件
rdd.saveAsTextFile("output")//saveAsTextFile方法
//TODO 關閉環(huán)境
context.stop()
}
}
(2) 基于文件創(chuàng)建的RDD 的分區(qū)它分區(qū)分配數(shù)據(jù)的方式和Hadoop的分區(qū)的方式是一樣的。和上面的基于內存的分配數(shù)據(jù)的方式不一樣。
package com.atguigu.bigdata.spark.core.wc.create_RDD
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
class Spark02_RDD_File_Par {}
object Spark02_RDD_File_Par{def main(args: Array[String]): Unit = {//TODO 準備環(huán)境
val conf = new SparkConf().setMaster("local[*]").setAppName("create_RDD_File2")
val context = new SparkContext(conf)
//TODO 創(chuàng)建RDD
//textFile 可以將文件作為數(shù)據(jù)處理的數(shù)據(jù)源,默認也可以設定分區(qū)
// minPartitions:最小分區(qū)數(shù)量
//默認分區(qū)是兩個,如果不想使用默認的分區(qū)數(shù)量那么,可以通過第二個參數(shù)指定分區(qū)數(shù)
val rdd: RDD[String] = context.textFile("datas/one.txt",3)
rdd.saveAsTextFile("output")
//TODO 關閉環(huán)境
context.stop()
}
}
(3) 數(shù)據(jù)分區(qū)的規(guī)則首先看字節(jié),可以看到這個文件一共是14個字節(jié),加上回車符
然后我們分兩個區(qū),14 / 2 = 7,一個區(qū)是7個字節(jié),再用 14 / 7 = 2 可以看到剛好是2沒有余數(shù),所以沒有問題剛剛好。首先是要計算行偏移量,計算出第一行的行偏移量是多少,計算出第二行是多少,然后計算行偏移量的范圍就可以算出每個分區(qū)得到的數(shù)據(jù)是什么了。
查看結果
你是否還在尋找穩(wěn)定的海外服務器提供商?創(chuàng)新互聯(lián)www.cdcxhl.cn海外機房具備T級流量清洗系統(tǒng)配攻擊溯源,準確流量調度確保服務器高可用性,企業(yè)級服務器適合批量采購,新人活動首月15元起,快前往官網(wǎng)查看詳情吧