import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.sql.{Column, DataFrame, SQLContext, SparkSession}

object SparkSqlTest {
  def main(args: Array[String]): Unit = {
    // Silence the noisy logs
    Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN)
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    Logger.getLogger("org.project-spark").setLevel(Level.WARN)
    // Build the programming entry point
    val conf: SparkConf = new SparkConf()
    conf.setAppName("SparkSqlTest")
      .setMaster("local[2]")
    val spark: SparkSession = SparkSession.builder().config(conf)
      .getOrCreate()
    /**
      * Note that since Spark 2.0, the constructors used in
      *   val sqlContext = new SQLContext(sparkContext)
      *   val hiveContext = new HiveContext(sparkContext)
      * are no longer available to user code (deprecated/privatized), so the
      * only way to obtain them here is through a SparkSession object.
      */
    // Create the SQLContext object
    val sqlContext: SQLContext = spark.sqlContext
    // Load data as a DataFrame; here we load JSON data
    // Data format: {"name": "...", "age": 18}
    val perDF: DataFrame = sqlContext.read.json("hdfs://zzy/data/person.json")
    // Inspect the schema of the two-dimensional table
    perDF.printSchema()
    // Inspect the data; show() displays 20 rows by default
    perDF.show()
    // More complex queries
    perDF.select("name").show()                                       // project a specific column
    perDF.select(new Column("name"), new Column("age").>(18)).show()  // project name plus a boolean (age > 18) column
    perDF.select("name", "age").where(new Column("age").>(18)).show() // filter rows by a condition
    perDF.select("age").groupBy("age").avg("age").show()              // aggregation
  }
}
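For reference, read.json expects one JSON object per line (JSON Lines). The person.json file above might look like this (sample records, assumed here purely for illustration):

{"name": "zhangsan", "age": 18}
{"name": "lisi", "age": 25}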
If the introductory example above is not entirely clear, the following walks through it step by step:
There are two ways to convert an RDD (or an external collection) to a DataFrame/Dataset:
- by reflection: Spark infers the schema from the element type of the RDD or external collection
- by dynamic programming: the schema is built programmatically and applied to the external collection or RDD
Note: with the reflection approach, a DataFrame is paired with a Java bean, while a Dataset is paired with a case class; a sketch of the bean shape follows below.
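As a minimal sketch of that pairing (StudentBean is a hypothetical name; the fields mirror the Student sample below), a JavaBean-style class exposes getters and setters so that the bean-reflection overload of createDataFrame can discover the columns:

import scala.beans.BeanProperty

// @BeanProperty generates getName/setName etc., which JavaBean introspection expects
class StudentBean(@BeanProperty var name: String,
                  @BeanProperty var birthday: String,
                  @BeanProperty var province: String) extends Serializable {
  def this() = this(null, null, null) // beans conventionally need a no-arg constructor
}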
數(shù)據(jù)準(zhǔn)備:
case class Student(name: String, birthday: String, province: String)

val stuList = List(
  Student("委xx", "1998-11-11", "山西"),
  Student("吳xx", "1999-06-08", "河南"),
  Student("戚xx", "2000-03-08", "山東"),
  Student("王xx", "1997-07-09", "安徽"),
  Student("薛xx", "2002-08-09", "遼寧")
)
list --> DataFrame (reflection):
import java.util
import scala.collection.JavaConversions

// Silence the noisy logs
Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN)
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
Logger.getLogger("org.project-spark").setLevel(Level.WARN)
// Build the programming entry point
val conf: SparkConf = new SparkConf()
conf.setAppName("SparkSqlTest")
  .setMaster("local[2]")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .registerKryoClasses(Array(classOf[Student]))
val spark: SparkSession = SparkSession.builder().config(conf)
  .getOrCreate()
// Create the SQLContext object
val sqlContext: SQLContext = spark.sqlContext
/**
  * list ---> DataFrame
  * First convert the Scala collection to a Java collection
  */
val javaList: util.List[Student] = JavaConversions.seqAsJavaList(stuList)
val stuDF: DataFrame = sqlContext.createDataFrame(javaList, classOf[Student])
val count = stuDF.count()
println(count)
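Since Student is a Scala case class rather than a Java bean, the bean-reflection overload above may not discover any columns (count still works). The idiomatic Spark 2.x alternative is the implicit toDF conversion; a minimal sketch, assuming the same spark session and stuList (stuDF2 is an illustrative name):

import spark.implicits._

// The schema is derived from the Student case class via an implicit Encoder
val stuDF2: DataFrame = stuList.toDF()
stuDF2.printSchema() // name, birthday, province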
RDD --> DataFrame (reflection):
// Silence the noisy logs
Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN)
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
Logger.getLogger("org.project-spark").setLevel(Level.WARN)
// Build the programming entry point
val conf: SparkConf = new SparkConf()
conf.setAppName("SparkSqlTest")
  .setMaster("local[2]")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .registerKryoClasses(Array(classOf[Student]))
val spark: SparkSession = SparkSession.builder().config(conf)
  .getOrCreate()
// Create the SQLContext object
val sqlContext: SQLContext = spark.sqlContext
// Create the SparkContext
val sc: SparkContext = spark.sparkContext
/**
  * RDD ---> DataFrame
  */
val stuRDD: RDD[Student] = sc.makeRDD(stuList)
val stuDF: DataFrame = sqlContext.createDataFrame(stuRDD, classOf[Student])
val count = stuDF.count()
println(count)
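As in the list case, the case-class-friendly route in Spark 2.x is the implicit toDF on the RDD itself; a minimal sketch, assuming the spark session and stuRDD above (stuDF2 is an illustrative name):

import spark.implicits._

// Encoder-based conversion: no Java bean required
val stuDF2: DataFrame = stuRDD.toDF()
stuDF2.show()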
list --> DataSet (reflection):
// Silence the noisy logs
Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN)
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
Logger.getLogger("org.project-spark").setLevel(Level.WARN)
// Build the programming entry point
val conf: SparkConf = new SparkConf()
conf.setAppName("SparkSqlTest")
  .setMaster("local[2]")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .registerKryoClasses(Array(classOf[Student]))
val spark: SparkSession = SparkSession.builder().config(conf)
  .getOrCreate()
// Create the SQLContext object
val sqlContext: SQLContext = spark.sqlContext
// Create the SparkContext
val sc: SparkContext = spark.sparkContext
/**
  * list ---> DataSet
  */
// Creating a Dataset requires the implicit conversions below
import spark.implicits._
val stuDS: Dataset[Student] = sqlContext.createDataset(stuList)
stuDS.createTempView("student")
// Run a full SQL query; with the reflection approach this works only for a Dataset, not a DataFrame
val sql =
  """
    |select * from student
  """.stripMargin
spark.sql(sql).show()
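Because the Dataset keeps the Student element type, typed transformations are available alongside SQL. A small sketch (the filter value comes from the sample data; the lambdas are illustrative):

// Typed API: the lambdas receive a Student and are checked at compile time
stuDS.filter(stu => stu.province == "山西").show()
stuDS.map(stu => stu.name).show()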
RDD --> DataSet (reflection):
// Silence the noisy logs
Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN)
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
Logger.getLogger("org.project-spark").setLevel(Level.WARN)
// Build the programming entry point
val conf: SparkConf = new SparkConf()
conf.setAppName("SparkSqlTest")
  .setMaster("local[2]")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .registerKryoClasses(Array(classOf[Student]))
val spark: SparkSession = SparkSession.builder().config(conf)
  .getOrCreate()
// Create the SQLContext object
val sqlContext: SQLContext = spark.sqlContext
// Create the SparkContext
val sc: SparkContext = spark.sparkContext
/**
  * RDD ---> DataSet
  */
// Creating a Dataset requires the implicit conversions below
import spark.implicits._
val stuRDD: RDD[Student] = sc.makeRDD(stuList)
val stuDS: Dataset[Student] = sqlContext.createDataset(stuRDD)
stuDS.createTempView("student")
// Run a full SQL query; with the reflection approach this works only for a Dataset, not a DataFrame
val sql =
  """
    |select * from student
  """.stripMargin
spark.sql(sql).show()
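With the same implicits in scope, toDS on the RDD is a shorter equivalent of createDataset; a minimal sketch (stuDS2 is an illustrative name):

// Equivalent one-liner using the implicit conversion
val stuDS2: Dataset[Student] = stuRDD.toDS()
stuDS2.show()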
list --> DataFrame (dynamic programming):
import scala.collection.JavaConversions

// Silence the noisy logs
Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN)
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
Logger.getLogger("org.project-spark").setLevel(Level.WARN)
// Build the programming entry point
val conf: SparkConf = new SparkConf()
conf.setAppName("SparkSqlTest")
  .setMaster("local[2]")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .registerKryoClasses(Array(classOf[Student]))
val spark: SparkSession = SparkSession.builder().config(conf)
  .getOrCreate()
// Create the SQLContext object
val sqlContext: SQLContext = spark.sqlContext
// Create the SparkContext
val sc: SparkContext = spark.sparkContext
// list -> DataFrame
// 1. Convert every element of the list to a Row
val rowList: List[Row] = stuList.map(item => {
  Row(item.name, item.birthday, item.province)
})
// 2. Build the schema (metadata)
val schema = StructType(List(
  StructField("name", DataTypes.StringType),
  StructField("birthday", DataTypes.StringType),
  StructField("province", DataTypes.StringType)
))
// Convert the Scala collection to a Java collection
val javaList = JavaConversions.seqAsJavaList(rowList)
val stuDF = spark.createDataFrame(javaList, schema)
stuDF.createTempView("student")
// Run a full SQL query; with the dynamic-programming approach both Dataset and DataFrame work
val sql =
  """
    |select * from student
  """.stripMargin
spark.sql(sql).show()
RDD --> DataFrame (dynamic programming):
// Silence the noisy logs
Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN)
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
Logger.getLogger("org.project-spark").setLevel(Level.WARN)
// Build the programming entry point
val conf: SparkConf = new SparkConf()
conf.setAppName("SparkSqlTest")
  .setMaster("local[2]")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .registerKryoClasses(Array(classOf[Student]))
val spark: SparkSession = SparkSession.builder().config(conf)
  .getOrCreate()
// Create the SQLContext object
val sqlContext: SQLContext = spark.sqlContext
// Create the SparkContext
val sc: SparkContext = spark.sparkContext
// RDD -> DataFrame
// 1. Convert every element of the RDD to a Row
val rowRDD: RDD[Row] = sc.makeRDD(stuList).map(item => {
  Row(item.name, item.birthday, item.province)
})
// 2. Build the schema (metadata)
val schema = StructType(List(
  StructField("name", DataTypes.StringType),
  StructField("birthday", DataTypes.StringType),
  StructField("province", DataTypes.StringType)
))
val stuDF = spark.createDataFrame(rowRDD, schema)
stuDF.createTempView("student")
// Run a full SQL query; with the dynamic-programming approach both Dataset and DataFrame work
val sql =
  """
    |select * from student
  """.stripMargin
spark.sql(sql).show()
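As an aside, on Spark 2.3+ the same schema can be declared from a DDL string instead of hand-built StructFields; a minimal sketch (assuming that Spark version is available):

import org.apache.spark.sql.types.StructType

// Equivalent schema declared as a DDL string
val schemaFromDDL: StructType = StructType.fromDDL("name STRING, birthday STRING, province STRING")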
Since building a Dataset dynamically is exactly the same as building a DataFrame, it is not demonstrated again here.
Loading data:
import java.util.Properties

// Silence the noisy logs
Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN)
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
Logger.getLogger("org.project-spark").setLevel(Level.WARN)
// Build the programming entry point
val conf: SparkConf = new SparkConf()
conf.setAppName("SparkSqlTest")
  .setMaster("local[2]")
val spark: SparkSession = SparkSession.builder().config(conf)
  .getOrCreate()
// Create the SQLContext object
val sqlContext: SQLContext = spark.sqlContext
// Create the SparkContext
val sc: SparkContext = spark.sparkContext
// Load a parquet file (early versions used sqlContext.load(...), which was removed in Spark 2.x)
sqlContext.read.parquet("hdfs://zzy/hello.parquet")
// Load JSON data
sqlContext.read.json("hdfs://zzy/hello.json")
// Load a plain text file
sqlContext.read.text("hdfs://zzy/hello.txt")
// Load a CSV file
sqlContext.read.csv("hdfs://zzy/hello.csv")
// Read data over JDBC
val url = "jdbc:mysql://localhost:3306/hello"
val properties = new Properties()
properties.setProperty("user", "root")
properties.setProperty("password", "123456")
val tableName = "book"
sqlContext.read.jdbc(url, tableName, properties)
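CSV sources usually need a couple of reader options; a minimal sketch using the standard DataFrameReader options (the path reuses the sample one above):

// Treat the first line as a header and let Spark infer column types
val csvDF: DataFrame = sqlContext.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("hdfs://zzy/hello.csv")
csvDF.printSchema()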
Saving data:
import java.util.Properties

// Silence the noisy logs
Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN)
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
Logger.getLogger("org.project-spark").setLevel(Level.WARN)
// Build the programming entry point
val conf: SparkConf = new SparkConf()
conf.setAppName("SparkSqlTest")
  .setMaster("local[2]")
val spark: SparkSession = SparkSession.builder().config(conf)
  .getOrCreate()
// Create the SQLContext object
val sqlContext: SQLContext = spark.sqlContext
// Create the SparkContext
val sc: SparkContext = spark.sparkContext
val testDF: DataFrame = sqlContext.read.text("hdfs://zzy/hello.txt")
// Write to a plain file
testDF.write.format("json")      // output format
  .mode(SaveMode.Append)         // save mode
  .save("hdfs://zzy/hello.json") // output location
// Write to a database
val url = "jdbc:mysql://localhost:3306/hello"
val table_name = "book"
val props = new Properties()
props.put("user", "root")
props.put("password", "123456")
testDF.write.mode(SaveMode.Append).jdbc(url, table_name, props)
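The same writer API covers the other built-in formats as well; for example, writing the DataFrame back out as parquet (a sketch; the output path is illustrative):

// Parquet is Spark's default format; Overwrite replaces any existing output
testDF.write.mode(SaveMode.Overwrite).parquet("hdfs://zzy/out/hello_parquet")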