[TOC]
The test code is as follows:
package cn.xpleaf.bigdata.spark.scala.sql.p1

import java.util.Properties

import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{SQLContext, SaveMode}

/**
  * Hands-on examples of loading and saving data with Spark SQL
  */
object _03SparkSQLLoadAndSaveOps {
    def main(args: Array[String]): Unit = {
        Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
        val conf = new SparkConf().setMaster("local[2]").setAppName(_01SparkSQLOps.getClass.getSimpleName)
        val sc = new SparkContext(conf)
        val sqlContext = new SQLContext(sc)
//        readOps(sqlContext)
        writeOps(sqlContext)
        sc.stop()
    }

    /**
      * When writing results to a directory, watch for this exception:
      *     org.apache.spark.sql.AnalysisException: path file:/D:/data/spark/sql/people-1.json already exists
      * To keep using the same directory, set an explicit SaveMode:
      *     ErrorIfExists
      *         the default: throws if the directory already exists
      *     Append
      *         appends to the existing directory
      *     Ignore
      *         skips the write entirely if the directory exists
      *     Overwrite
      *         replaces the directory contents
      */
    def writeOps(sqlContext: SQLContext): Unit = {
        val df = sqlContext.read.json("D:/data/spark/sql/people.json")
        df.registerTempTable("people")
        val retDF = sqlContext.sql("select * from people where age > 20")
//        retDF.show()
        // save the result to a JSON directory
//        retDF.coalesce(1).write.mode(SaveMode.Overwrite).json("D:/data/spark/sql/people-1.json")
        // save the result to a relational database
        val url = "jdbc:mysql://localhost:3306/test"
        val table = "people1" // a new table with this name will be created
        val properties = new Properties()
        properties.put("user", "root")
        properties.put("password", "root")
        retDF.coalesce(1).write.jdbc(url, table, properties)
    }

    /*
        When reading data with Spark SQL, note:
            java.lang.RuntimeException: file:/D:/data/spark/sql/people.json is not a Parquet file
        read.load assumes the Parquet format (parquet.apache.org) by default.
        To load any other file format, specify it explicitly with .format("json").
     */
    def readOps(sqlContext: SQLContext): Unit = {
//        val df = sqlContext.read.load("D:/data/spark/sql/users.parquet")
//        val df = sqlContext.read.format("json").load("D:/data/spark/sql/people.json")
//        val df = sqlContext.read.json("D:/data/spark/sql/people.json")
        val url = "jdbc:mysql://localhost:3306/test"
        val table = "people"
        val properties = new Properties()
        properties.put("user", "root")
        properties.put("password", "root")
        val df = sqlContext.read.jdbc(url, table, properties)
        df.show()
    }
}
When the read operation is executed, the output is:
+---+----+---+------+
| id|name|age|height|
+---+----+---+------+
| 1| 小甜甜| 18| 168.0|
| 2| 小丹丹| 19| 167.0|
| 3| 大神| 25| 181.0|
| 4| 團(tuán)長(zhǎng)| 38| 158.0|
| 5| 記者| 22| 169.0|
+---+----+---+------+
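As an aside, for larger tables the JDBC reader also offers a partitioned overload that splits the read across a numeric column. A hedged sketch, where the id column and its bounds are illustrative:

import java.util.Properties
import org.apache.spark.sql.SQLContext

// Sketch: partitioned JDBC read; Spark issues one query per partition over the id range.
def readJdbcPartitioned(sqlContext: SQLContext): Unit = {
    val properties = new Properties()
    properties.put("user", "root")
    properties.put("password", "root")
    val df = sqlContext.read.jdbc(
        "jdbc:mysql://localhost:3306/test", "people",
        "id",       // partition column (assumed numeric)
        1L, 5L,     // lower/upper bound of id values used to split the range
        2,          // number of partitions
        properties)
    df.show()
}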
When the write operation is executed:
1. If saving to a JSON file: note the various save modes, and that the result is saved as a directory in an HDFS-compatible layout.
2. If saving via JDBC: a table with the DataFrame's columns is created in the database; note that the table must not already exist.
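A hedged sketch of the save modes listed earlier, run against one illustrative output directory (assumed not to exist beforehand):

import org.apache.spark.sql.{SQLContext, SaveMode}

// Sketch: the four SaveMode options against the same output directory.
def saveModeDemo(sqlContext: SQLContext): Unit = {
    val df = sqlContext.read.json("D:/data/spark/sql/people.json")
    val out = "D:/data/spark/sql/people-out.json"
    df.write.mode(SaveMode.ErrorIfExists).json(out) // default: AnalysisException if the directory exists
    df.write.mode(SaveMode.Ignore).json(out)        // directory now exists, so this write is silently skipped
    df.write.mode(SaveMode.Append).json(out)        // adds new part files alongside the existing ones
    df.write.mode(SaveMode.Overwrite).json(out)     // replaces the directory contents
}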
Hive needs to be started before performing the operations below.
The test code is as follows:
package cn.xpleaf.bigdata.spark.scala.sql.p2

import cn.xpleaf.bigdata.spark.scala.sql.p1._01SparkSQLOps
import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

/**
  * Use a HiveContext to operate on table data in Hive
  * Data sources:
  * teacher_info.txt
  *     name(String) height(double)
  *     zhangsan,175
  *     lisi,180
  *     wangwu,175
  *     zhaoliu,195
  *     zhouqi,165
  *     weiba,185
  *
  *     create table teacher_info(
  *     name string,
  *     height double
  *     ) row format delimited
  *     fields terminated by ',';
  *
  * teacher_basic.txt
  *     name(String) age(int) married(boolean) children(int)
  *     zhangsan,23,false,0
  *     lisi,24,false,0
  *     wangwu,25,false,0
  *     zhaoliu,26,true,1
  *     zhouqi,27,true,2
  *     weiba,28,true,3
  *
  *     create table teacher_basic(
  *     name string,
  *     age int,
  *     married boolean,
  *     children int
  *     ) row format delimited
  *     fields terminated by ',';
  *
  * Requirements:
  * 1. Create the corresponding tables in Hive through Spark SQL and load the data into them
  * 2. Run a Spark SQL job that joins teacher_info with teacher_basic and stores the result in a table named teacher
  *
  * To run Hive operations on the cluster, the following configuration is needed:
  * 1. Copy hive-site.xml into spark/conf, and copy the mysql connector jar into spark/lib
  * 2. Add one line to $SPARK_HOME/conf/spark-env.sh:
  *    export SPARK_CLASSPATH=$SPARK_CLASSPATH:$SPARK_HOME/lib/mysql-connector-java-5.1.39.jar
  */
object _01HiveContextOps {
    def main(args: Array[String]): Unit = {
        Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
        val conf = new SparkConf()
//            .setMaster("local[2]")
            .setAppName(_01SparkSQLOps.getClass.getSimpleName)
        val sc = new SparkContext(conf)
        val hiveContext = new HiveContext(sc)
        // create the teacher_info table
        hiveContext.sql("CREATE TABLE teacher_info(" +
            "name string, " +
            "height double) " +
            "ROW FORMAT DELIMITED " +
            "FIELDS TERMINATED BY ','")
        // create the teacher_basic table
        hiveContext.sql("CREATE TABLE teacher_basic(" +
            "name string, " +
            "age int, " +
            "married boolean, " +
            "children int) " +
            "ROW FORMAT DELIMITED " +
            "FIELDS TERMINATED BY ','")
        // load the data into the tables
        hiveContext.sql("LOAD DATA LOCAL INPATH '/home/uplooking/data/hive/sql/teacher_info.txt' INTO TABLE teacher_info")
        hiveContext.sql("LOAD DATA LOCAL INPATH '/home/uplooking/data/hive/sql/teacher_basic.txt' INTO TABLE teacher_basic")
        // step two: join the two tables
        val joinDF = hiveContext.sql("SELECT " +
            "b.name, " +
            "b.age, " +
            "if(b.married, 'married', 'unmarried') as married, " +
            "b.children, " +
            "i.height " +
            "FROM teacher_info i " +
            "INNER JOIN teacher_basic b ON i.name = b.name")
        joinDF.collect().foreach(println)
        joinDF.write.saveAsTable("teacher")
        sc.stop()
    }
}
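One hedged caveat: re-running this job fails, because saveAsTable refuses to write to an existing teacher table (the same "must not already exist" rule as the JDBC write above). An explicit SaveMode makes the job idempotent; a sketch only:

import org.apache.spark.sql.{DataFrame, SaveMode}

// Sketch: overwrite the result table on re-runs instead of failing with "table already exists".
def saveTeacher(joinDF: DataFrame): Unit = {
    joinDF.write.mode(SaveMode.Overwrite).saveAsTable("teacher")
}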
After packaging, upload the jar to the cluster, then configure Spark as follows.
To run Hive operations on the cluster, the following configuration is needed:
1. Copy hive-site.xml into spark/conf, and copy the mysql connector jar into spark/lib
2. Add one line to $SPARK_HOME/conf/spark-env.sh:
export SPARK_CLASSPATH=$SPARK_CLASSPATH:$SPARK_HOME/lib/mysql-connector-java-5.1.39.jar
The spark-submit script used is as follows:
[uplooking@uplooking01 spark]$ cat spark-submit-standalone.sh
#export HADOOP_CONF_DIR=/home/uplooking/app/hadoop/etc/hadoop
/home/uplooking/app/spark/bin/spark-submit \
--class $2 \
--master spark://uplooking02:7077 \
--executor-memory 1G \
--num-executors 1 \
$1
Run the following command:
./spark-submit-standalone.sh spark-hive.jar cn.xpleaf.bigdata.spark.scala.sql.p2._01HiveContextOps
The expected output appears in the job's console output; it can also be verified directly in Hive:
hive> show tables;
OK
hpeople
people
t1
teacher
teacher_basic
teacher_info
Time taken: 0.03 seconds, Fetched: 6 row(s)
hive> select * from teacher;
OK
zhangsan 23 unmarried 0 175.0
lisi 24 unmarried 0 180.0
wangwu 25 unmarried 0 175.0
zhaoliu 26 married 1 195.0
zhouqi 27 married 2 165.0
weiba 28 married 3 185.0
Time taken: 0.369 seconds, Fetched: 6 row(s)
Make sure the Elasticsearch environment is already set up.
The test code is as follows:
package cn.xpleaf.bigdata.spark.scala.sql.p2

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark.sql._
import org.elasticsearch.spark._

/**
  * Integration of Spark and Elasticsearch
  * Requires the Spark/ES maven dependency:
  *     elasticsearch-hadoop
  *     2.3.0
  * Loads account.json into the ES index/type spark/account
  * See the official docs: https://www.elastic.co/guide/en/elasticsearch/hadoop/2.3/spark.html
  */
object _02SparkElasticSearchOps {
    def main(args: Array[String]): Unit = {
        Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
        val conf = new SparkConf()
            .setAppName(_02SparkElasticSearchOps.getClass().getSimpleName)
            .setMaster("local[2]")
        /**
          * Configuration for the Spark/ES integration
          */
        conf.set("es.index.auto.create", "true")
        conf.set("es.nodes", "uplooking01")
        conf.set("es.port", "9200")
        val sc = new SparkContext(conf)
        val sqlContext = new SQLContext(sc)
//        write2ES(sqlContext)
        readFromES(sc)
        sc.stop()
    }

    /**
      * Read data from ES
      * (using the SparkContext)
      */
    def readFromES(sc: SparkContext): Unit = {
        val resources = "spark/account" // index/type
        val jsonRDD = sc.esJsonRDD(resources)
        jsonRDD.foreach(println)
    }

    /**
      * Write data into ES
      * (using the SQLContext)
      */
    def write2ES(sqlContext: SQLContext): Unit = {
        val jsonDF = sqlContext.read.json("D:/data/spark/sql/account.json")
        val resources = "spark/account" // index/type
        jsonDF.saveToEs(resources)
    }
}
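As a hedged alternative described in the elasticsearch-hadoop 2.3 docs, the connector also registers a DataFrame data source, so the same index/type can be read back as a DataFrame instead of a JSON RDD:

import org.apache.spark.sql.SQLContext

// Sketch: reading spark/account back through the connector's DataFrame data source.
def readFromESAsDF(sqlContext: SQLContext): Unit = {
    val esDF = sqlContext.read.format("org.elasticsearch.spark.sql").load("spark/account")
    esDF.printSchema()
    esDF.show()
}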
Spark SQL's built-in functions can be used for data analysis. Unlike the rest of the Spark SQL API, a built-in function applied to a DataFrame returns a Column object, and a DataFrame is by design "A distributed collection of data organized into named columns." This lays a solid foundation for complex analysis and is very convenient: built-in functions can be invoked at any point inside DataFrame operations to implement the business logic at hand, which greatly cuts unnecessary plumbing (the code is essentially a direct mapping of the actual model) and lets us focus on the analysis itself, a real productivity gain for engineers. Starting with Spark 1.5.x a large number of built-in functions are provided, including max, mean, min, sum, avg, explode, size, sort_array, day, to_date, abs, acos, asin, and atan.
Broadly, the built-in functions fall into seven basic categories:
1. Aggregate functions, e.g. countDistinct, sumDistinct;
2. Collection functions, e.g. sort_array, explode;
3. Date and time functions, e.g. hour, quarter, next_day;
4. Math functions, e.g. asin, atan, sqrt, tan, round;
5. Window functions, e.g. rowNumber;
6. String functions, e.g. concat, format_number, regexp_extract;
7. Other functions, e.g. isNaN, sha, randn, callUDF.
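A hedged sketch of the Column-returning style described above, reusing the people.json path from the earlier examples:

import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions._

// Sketch: built-in functions return Column objects that compose directly inside DataFrame operations.
def builtinFunctionDemo(sqlContext: SQLContext): Unit = {
    val df = sqlContext.read.json("D:/data/spark/sql/people.json")
    // aggregate functions
    df.agg(max("age"), min("age"), avg("age"), countDistinct("age")).show()
    // math function applied column-wise
    df.select(col("name"), round(col("height"), 1).as("height_rounded")).show()
}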
The following concepts come from Hive, but Spark SQL clearly has the same ideas.
UDF
    User Defined Function
    One row in, one row out
    a ---> A
    strlen("adbad") = 5
UDAF
    User Defined Aggregation Function
    Many rows in, one row out
    sum(a, b, c, d) ----> one aggregated result
Table function
    UDTF: User Defined Table Function
    One row in, many rows out
    "hello you"
    "hello me"  ----> transform with split(" ") ----> ["hello", "you"] ...
    then explode each array into one row per element:
    "hello"
    "you"
    ----> a row/column transformation
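A hedged sketch mapping these three concepts onto Spark SQL; the UDF name is illustrative, and a full custom UDAF (extending UserDefinedAggregateFunction) is omitted for brevity:

import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
import org.apache.spark.sql.{Row, SQLContext}

// Sketch: the three concepts expressed in Spark SQL terms.
def udfFamilyDemo(sqlContext: SQLContext): Unit = {
    val rowRDD = sqlContext.sparkContext.parallelize(List("hello you", "hello me")).map(Row(_))
    val schema = StructType(List(StructField("line", DataTypes.StringType, false)))
    sqlContext.createDataFrame(rowRDD, schema).registerTempTable("lines")
    // UDF: one row in, one row out
    sqlContext.udf.register("strlen", (s: String) => s.length)
    sqlContext.sql("select line, strlen(line) as len from lines").show()
    // UDAF: many rows in, one row out (built-in sum/count/avg, or a custom class
    // extending org.apache.spark.sql.expressions.UserDefinedAggregateFunction)
    sqlContext.sql("select count(1) from lines").show()
    // UDTF-style: one row in, many rows out, expressed with explode
    sqlContext.sql("select explode(split(line, ' ')) as word from lines").show()
}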
A basic example follows:
package cn.xpleaf.bigdata.spark.scala.sql.p2

import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

/**
  * Spark SQL built-in function operations
  */
object _03SparkSQLFunctionOps {
    def main(args: Array[String]): Unit = {
        Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
        val conf = new SparkConf()
            .setAppName(_03SparkSQLFunctionOps.getClass().getSimpleName)
            .setMaster("local[2]")
        val sc = new SparkContext(conf)
        val sqlContext = new SQLContext(sc)
        val pdf = sqlContext.read.json("D:/data/spark/sql/people.json")
        pdf.show()
        pdf.registerTempTable("people")
        // count the number of people
        sqlContext.sql("select count(1) from people").show()
        // per-age statistics: max, min, average, and count for each age group
        sqlContext.sql("select age, " +
            "max(age) as max_age, " +
            "min(age) as min_age, " +
            "avg(age) as avg_age, " +
            "count(age) as count " +
            "from people group by age order by age desc").show()
        sc.stop()
    }
}
The output is:
+---+------+-------+
|age|height| name|
+---+------+-------+
| 10| 168.8|Michael|
| 30| 168.8| Andy|
| 19| 169.8| Justin|
| 32| 188.8| Jack|
| 10| 158.8| John|
| 19| 179.8| Domu|
| 13| 179.8| 袁帥|
| 30| 175.8| 殷杰|
| 19| 179.9| 孫瑞|
+---+------+-------+
18/05/09 17:53:23 INFO FileInputFormat: Total input paths to process : 1
+---+
|_c0|
+---+
| 9|
+---+
18/05/09 17:53:24 INFO FileInputFormat: Total input paths to process : 1
+---+-------+-------+-------+-----+
|age|max_age|min_age|avg_age|count|
+---+-------+-------+-------+-----+
| 32| 32| 32| 32.0| 1|
| 30| 30| 30| 30.0| 2|
| 19| 19| 19| 19.0| 3|
| 13| 13| 13| 13.0| 1|
| 10| 10| 10| 10.0| 2|
+---+-------+-------+-------+-----+
1. Starting with Spark 1.5.x, window functions are available in Spark SQL and DataFrames; the classic example is row_number(), which lets us express group-wise top-N logic directly.
2. As a case study, let's take a top-N using Spark's window functions. You may recall that we computed a top-N much earlier in the course, and it was quite cumbersome then; with Spark SQL it becomes very convenient. A minimal sketch follows.
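Since the test programs below focus on UDFs and word count rather than windowing itself, here is a minimal hedged sketch of the group-wise top-N pattern. The scores table and its columns are illustrative, and in the 1.5.x era window functions require a HiveContext:

import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

object _WindowTopNSketch {
    def main(args: Array[String]): Unit = {
        Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
        val conf = new SparkConf().setMaster("local[2]").setAppName("WindowTopNSketch")
        val sc = new SparkContext(conf)
        val hiveContext = new HiveContext(sc)
        import hiveContext.implicits._
        // illustrative data: (class, student, score)
        sc.parallelize(Seq(
            ("class1", "zhangsan", 90), ("class1", "lisi", 85), ("class1", "wangwu", 95),
            ("class2", "zhaoliu", 70), ("class2", "zhouqi", 88), ("class2", "weiba", 60)
        )).toDF("clazz", "name", "score").registerTempTable("scores")
        // top 2 scores per class via row_number() over a partition
        hiveContext.sql(
            "select clazz, name, score from (" +
            "select clazz, name, score, " +
            "row_number() over (partition by clazz order by score desc) as rn " +
            "from scores) t " +
            "where t.rn <= 2").show()
        sc.stop()
    }
}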
The next test program demonstrates UDF usage:
package cn.xpleaf.bigdata.spark.scala.sql.p2

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Spark SQL user-defined function (UDF) operations
  */
object _04SparkSQLFunctionOps {
    def main(args: Array[String]): Unit = {
        Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
        val conf = new SparkConf()
            .setAppName(_04SparkSQLFunctionOps.getClass().getSimpleName)
            .setMaster("local[2]")
        val sc = new SparkContext(conf)
        val sqlContext = new SQLContext(sc)
        /**
          * UDF operations as in Hive (Spark SQL works the same way here, since both are interactive query engines):
          * 1. Write an ordinary function
          * 2. Register it (with the SQLContext)
          * 3. Use it directly in SQL
          *
          * Example: a UDF that returns the length of a string
          */
        // 1. Write an ordinary function
        def strLen(str: String): Int = str.length
        // 2. Register it (with the SQLContext)
        sqlContext.udf.register[Int, String]("myStrLen", strLen)
        val list = List("Hello you", "Hello he", "Hello me")
        // convert the RDD to a DataFrame
        val rowRDD = sqlContext.sparkContext.parallelize(list).flatMap(_.split(" ")).map(word => {
            Row(word)
        })
        val scheme = StructType(List(
            StructField("word", DataTypes.StringType, false)
        ))
        val df = sqlContext.createDataFrame(rowRDD, scheme)
        df.registerTempTable("test")
        // 3. Use it directly in SQL
        sqlContext.sql("select word, myStrLen(word) from test").show()
        sc.stop()
    }
}
The output is:
+-----+---+
| word|_c1|
+-----+---+
|Hello| 5|
| you| 3|
|Hello| 5|
| he| 2|
|Hello| 5|
| me| 2|
+-----+---+
The test code is as follows:
package cn.xpleaf.bigdata.spark.scala.sql.p2

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{Row, SQLContext}

/**
  * Two important techniques here:
  * 1. Word count implemented with Spark SQL
  * 2. Using explode to expand lines into rows of words
  *    (for the window function itself, see the row_number sketch above)
  */
object _05SparkSQLFunctionOps2 {
    def main(args: Array[String]): Unit = {
        Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
        val conf = new SparkConf()
            .setAppName(_05SparkSQLFunctionOps2.getClass().getSimpleName)
            .setMaster("local[2]")
        val sc = new SparkContext(conf)
        val sqlContext = new SQLContext(sc)
        val list = List("Hello you", "Hello he", "Hello me")
        // convert the RDD to a DataFrame
        val rowRDD = sqlContext.sparkContext.parallelize(list).map(line => {
            Row(line)
        })
        val scheme = StructType(List(
            StructField("line", DataTypes.StringType, false)
        ))
        val df = sqlContext.createDataFrame(rowRDD, scheme)
        df.registerTempTable("test")
        df.show()
        // run the word count
        val sql = "select t.word, count(1) as count " +
            "from " +
            "(select " +
            "explode(split(line, ' ')) as word " +
            "from test) as t " +
            "group by t.word order by count desc"
        sqlContext.sql(sql).show()
        sc.stop()
    }
}
The output is:
+---------+
| line|
+---------+
|Hello you|
| Hello he|
| Hello me|
+---------+
+-----+-----+
| word|count|
+-----+-----+
|Hello| 3|
| me| 1|
| he| 1|
| you| 1|
+-----+-----+
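For comparison, a hedged sketch of the same word count written with the DataFrame API's Column-returning functions instead of a SQL string:

import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.functions.{col, explode, split}

// Sketch: word count via DataFrame operations; explode and split return Column objects.
def wordCountDF(sqlContext: SQLContext): Unit = {
    val rowRDD = sqlContext.sparkContext.parallelize(List("Hello you", "Hello he", "Hello me")).map(Row(_))
    val scheme = StructType(List(StructField("line", DataTypes.StringType, false)))
    val df = sqlContext.createDataFrame(rowRDD, scheme)
    df.select(explode(split(col("line"), " ")).as("word"))
      .groupBy("word")
      .count()                    // adds a "count" column
      .orderBy(col("count").desc)
      .show()
}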