==> 什么是 Spark SQL?
創(chuàng)新互聯(lián)公司專注于企業(yè)營銷型網(wǎng)站建設(shè)、網(wǎng)站重做改版、射洪網(wǎng)站定制設(shè)計(jì)、自適應(yīng)品牌網(wǎng)站建設(shè)、H5場景定制、成都商城網(wǎng)站開發(fā)、集團(tuán)公司官網(wǎng)建設(shè)、成都外貿(mào)網(wǎng)站建設(shè)公司、高端網(wǎng)站制作、響應(yīng)式網(wǎng)頁設(shè)計(jì)等建站業(yè)務(wù),價(jià)格優(yōu)惠性價(jià)比高,為射洪等各大城市提供網(wǎng)站開發(fā)制作服務(wù)。
---> Spark SQL 是 Spark 用來處理結(jié)構(gòu)化數(shù)據(jù)的一個(gè)模塊
---> 作用:提供一個(gè)編程抽象(DataFrame) 并且作為分布式 SQL 查詢引擎
---> 運(yùn)行原理:將 Spark SQL 轉(zhuǎn)化為 RDD, 然后提交到集群執(zhí)行
---> 特點(diǎn):
---- 容易整合
---- 統(tǒng)一的數(shù)據(jù)訪問方式
---- 兼容 Hive
---- 標(biāo)準(zhǔn)的數(shù)據(jù)連接
==> SparkSession
---> 特點(diǎn):(2.0引用 SparkSession)
---- 為用戶提供一個(gè)統(tǒng)一的切入點(diǎn)使用Spark 各項(xiàng)功能
---- 允許用戶通過它調(diào)用 DataFrame 和 Dataset 相關(guān) API 來編寫程序
---- 減少了用戶需要了解的一些概念,可以很容易的與 Spark 進(jìn)行交互
---- 與 Spark 交互之時(shí)不需要顯示的創(chuàng)建 SparkConf, SparkContext 以及 SQlContext,這些對象已經(jīng)封閉在 SparkSession 中
==> DataFrames 組織成命名列的數(shù)據(jù)集,等同于數(shù)據(jù)庫中的表
---> 與 RDD 相比較:
---- RDD 是分布式的 Java 對象 的集合
---- DataFrame 是分布式 Row 對象的集合
---> 創(chuàng)建 DataFrames
---- 通過 case class 創(chuàng)建 DataFrames
// 定義 case class (相當(dāng)于表的結(jié)構(gòu)) case class Emp(Empno:Int, ename:String, job:String, mgr:String, hiredate:String, sal:Int, comm:String, deptno:Int) // 將 HDFS 上的數(shù)據(jù)讀入 RDD, 并將 RDD 與 case class 關(guān)聯(lián) val lines = sc.textFile("hdfs://bigdata0:9000/input/emp.csv").map(_.split(",")) val emp = lines.map(x=> Emp(x(0).toInt, x(1), x(2), x(3), x(4), x(5).toInt, x(6), x(7).toInt)) ` // 將RDD 轉(zhuǎn)換成 DataFrames val empDF = emp.toDF // 通過 DataFrames 查詢數(shù)據(jù) empDF.show
---- 通過 SparkSession 創(chuàng)建 DataFrames
// 創(chuàng)建 StructType 來定義結(jié)構(gòu),注意,需要先導(dǎo)入模塊 import org.apache.spark.sql.types._ val myschema = StructType(List( StructField("empno", DataTypes.IntegerType), StructField("ename", DataTypes.StringType), StructField("job", DataTypes.StringType), StructField("mgr", DataTypes.StringType), StructField("hiredate", DataTypes.StringType), StructField("sal", DataTypes.IntegerType), StructField("comm", DataTypes.StringType), StructField("deptno", DataTypes.IntegerType) )) // 讀入數(shù)據(jù)且切分?jǐn)?shù)據(jù) val empcsvRDD = sc.textFile("hdfs://bigdata0:9000/input/emp.csv").map(_.split(",")) // 將 RDD 數(shù)據(jù)映射成 Row,需要 import org.apache.spark.sql.Row import org.apache.spark.sql.Row val rowRDD = empcsvRDD.map(line=> Row(line(0).toInt, line(1), line(2), line(3),line(4), line(5).toInt, line(6), line(7).toInt) // 創(chuàng)建 DataFrames val df = spark.createDataFrame(rowRDD, myschema) // 查看表 df.show
---- 使用 Json 文件來創(chuàng)建 DataFrame
val df = spark.read.json("Json 文件") // 查看數(shù)據(jù) df.show
---> DataFrame 操作 DataFrame 操作也稱為無類型的 Dataset操作
---- 查詢所有員工姓名
df.select("ename").show
---- 查詢所有員工姓名和薪水,并給薪水加 100 元
df.select($"ename", $"sal", $"sal"+ 100).show
---- 查詢工資大于2000的員工
df.select($"sal" > 2000).show
---- 求每個(gè)部門員工數(shù)
df.groupBy($"deptno").count.show
---- 在 DataFrame 中使用 SQL 語句 注: 需要首先將 DataFrame 注冊成表(視圖)
df.createOrReplaceTempView("emp") // 執(zhí)行查詢 spark.sql("select * from emp").show
---> 臨時(shí)視圖(2種):
---- 只在當(dāng)前會(huì)話中有效 df.createOrReplaceTempView("emp1")
---- 在全局有效 df.createGlobalTempView("emp2")
==> Datasets
---> 數(shù)據(jù)的分布式集合
--->特點(diǎn):
---- Spark1.6中添加的新接口,是DataFrame之上更高一級的抽象
---- 提供了 RDD的優(yōu)點(diǎn)(強(qiáng)類型化,使用 lambda函數(shù)的能力)
---- Spark SQL 優(yōu)化后的執(zhí)行引擎
---- 可以從 JVM 對象構(gòu)造,然后使用函數(shù)轉(zhuǎn)換(map, flatMap, filter等)去操作
---- 支持 Scala 和 Java,不支持 Python
---> 創(chuàng)建 DataSet
---- 使用序列
// 定義 case class case class MyData(a:String, b:String) // 生成序列并創(chuàng)建 DataSet val ds = Seq(MyData(1, "Tom"), MyData(2, "Mary")).toDS // 查看結(jié)果 ds.show
---- 使用 Json 數(shù)據(jù)
// 定義 case class case class Person(name:String, gender:String) //通過 Json 數(shù)據(jù)生成 DataFrame val df = spark.read.json(sc.parallelize("""{"gender":"Male", "name": "Tom"}""" ::Nil)) // 將 DataFrame 轉(zhuǎn)成 DataSet df.as[Person].show df.as[Person].collect
---- 通過使用 DHFS 執(zhí)行 WordCount 程序
// 讀取 HDFS 數(shù)據(jù),并創(chuàng)建 DataSet val linesDS = spark.read.text("hdfs://bigdata0:9000/input/data.txt").as[String] // 對DataSet 進(jìn)行操作:分詞后, 查詢長度大于3 的單詞 val words = linesDS.flatMap(_.split(" ")).filter(_.length > 3) // 查看結(jié)果 words.show words.collect // 執(zhí)行wordcount 程序 val result = linesDS.flatMap(_.split(" ").map((_.1)).groupByKey(x=> x._1).count) result.show // 排序 result.orderBy($"value").show
==> Datasets 操作
---> joinWith 和 join 的區(qū)別是連接后的新 Dataset 的 schema 會(huì)不一樣
// 使用 emp.json 生成 DataFrame val empDF = spark.read.json("/root/resources/emp.json") // 查詢工資大于 3000 的員工 empDF.where($"sal" > 3000).show // 創(chuàng)建 case class case class Emp(empno:Lone, ename:String, job:String, hiredate:String, mgr:String, sal:Long, comm:String, deptno:Long) // 生成 DataSets,并查詢數(shù)據(jù) val empDS = empDF.as[Emp] // 查詢工資大于 3000 的員工 empDS.filter(_.sal > 3000).show // 查看 10 號(hào)部門的員工 empDS.filter(_.deptno == 10) // 多表查詢 // 1.創(chuàng)建部門表 val deptRDD = sc.textFile("/test/dept.csv").map(_.split(",")) case class Dept(deptno:Int, dname:String, loc:String) val deptDS = deptRDD.map(x=>Dept(x(0).toInt, x(1), x(2))).toDS // 2.創(chuàng)建員工表 case class Emp(empno:Int, ename:String, job:String, mgr:String, hiredate:String, sal:Int, comm:String, deptno:Int) val empRDD = sc.textFile("/test/emp.csv").map(_.split(",")) val empDS = empRDD.map(x=> Emp(x(0).toInt, x(1), x(2), x(3), x(4), x(5).toInt, x(6), x(7).toInt)) // 3.執(zhí)行多表查詢: 等值鏈接 val result = deptDF.join(empDS, "deptno") // 另一種寫法: 注意有三個(gè)等號(hào) val result = deptDS.joinWith(empDS, deptDS("deptno") === empDS("deptno")) // 查看執(zhí)行計(jì)劃 result.explain