SparkSQL Notes (Part 3): Load/Save Functionality and Spark SQL Functions

[TOC]

Load and Save Functionality

Data loading (JSON files, JDBC) and saving (JSON, JDBC)

The test code is as follows:

package cn.xpleaf.bigdata.spark.scala.sql.p1

import java.util.Properties

import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{SQLContext, SaveMode}

/**
  * Hands-on examples of loading and saving data with SparkSQL
  */
object _03SparkSQLLoadAndSaveOps {
    def main(args: Array[String]): Unit = {
        Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
        val conf = new SparkConf().setMaster("local[2]").setAppName(_01SparkSQLOps.getClass.getSimpleName)

        val sc = new SparkContext(conf)
        val sqlContext = new SQLContext(sc)

//        readOps(sqlContext)
        writeOps(sqlContext)
        sc.stop()
    }

    /**
      * When writing results to a directory, watch out for this exception:
      *     org.apache.spark.sql.AnalysisException: path file:/D:/data/spark/sql/people-1.json already exists
      * If you still want to write into that directory, you need to set an explicit SaveMode:
      * ErrorIfExists
      *     the default; throws an exception if the directory already exists
      * Append
      *     appends to the existing data
      * Ignore
      *     ignores the write (it is simply not executed)
      * Overwrite
      *     overwrites the existing data
      */
    def writeOps(sqlContext:SQLContext): Unit = {
        val df = sqlContext.read.json("D:/data/spark/sql/people.json")
        df.registerTempTable("people")
        val retDF = sqlContext.sql("select * from people where age > 20")
//        retDF.show()
        // Persist the result to the file system
        //retDF.coalesce(1).write.mode(SaveMode.Overwrite).json("D:/data/spark/sql/people-1.json")
        // Persist the result to a database
        val url = "jdbc:mysql://localhost:3306/test"
        val table = "people1"   // a new table will be created
        val properties = new Properties()
        properties.put("user", "root")
        properties.put("password", "root")
        retDF.coalesce(1).write.jdbc(url, table, properties)
    }

    /*
        // Reading data with SparkSQL
        // java.lang.RuntimeException: file:/D:/data/spark/sql/people.json is not a Parquet file
        The default file format for sparkSQL's read.load is parquet (parquet.apache.org).
        How do we load other file formats?
            Specify the format explicitly with .format("json")
     */
    def readOps(sqlContext:SQLContext): Unit = {
        //        val df = sqlContext.read.load("D:/data/spark/sql/users.parquet")
        //        val df = sqlContext.read.format("json").load("D:/data/spark/sql/people.json")
        //        val df = sqlContext.read.json("D:/data/spark/sql/people.json")
        val url = "jdbc:mysql://localhost:3306/test"
        val table = "people"
        val properties = new Properties()
        properties.put("user", "root")
        properties.put("password", "root")
        val df = sqlContext.read.jdbc(url, table, properties)

        df.show()
    }
}

When the read operation is executed, the output is as follows:

+---+----+---+------+
| id|name|age|height|
+---+----+---+------+
|  1| 小甜甜| 18| 168.0|
|  2| 小丹丹| 19| 167.0|
|  3|  大神| 25| 181.0|
|  4|  團(tuán)長(zhǎng)| 38| 158.0|
|  5|  記者| 22| 169.0|
+---+----+---+------+

When the write operation is executed:

1. Saving to a JSON file
Note the various write modes; also note that what gets saved is a directory, laid out in an HDFS-compatible format.

2. Saving via JDBC
A table with the DataFrame's columns is created in the database; note that the table must not already exist.
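
A minimal sketch of the generic write path with an explicit format and SaveMode, reusing the illustrative output path from the code above:

import org.apache.spark.sql.{DataFrame, SaveMode}

def saveAsJson(retDF: DataFrame): Unit = {
    retDF.coalesce(1)
        .write
        .format("json")             // without .format(), parquet is used by default
        .mode(SaveMode.Overwrite)   // ErrorIfExists (default) / Append / Ignore / Overwrite
        .save("D:/data/spark/sql/people-1.json")
}

With SaveMode.Overwrite the existing directory is replaced instead of triggering the AnalysisException mentioned in the comments above.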

Integration of Spark SQL and Hive

Hive must be started before performing the operations below.

Writing the code

The test code is as follows:

package cn.xpleaf.bigdata.spark.scala.sql.p2

import cn.xpleaf.bigdata.spark.scala.sql.p1._01SparkSQLOps
import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

/**
  * Operate on Hive table data by creating a HiveContext
  * Data sources:
  * teacher_info.txt
  *     name(String)    height(double)
  *     zhangsan,175
  *     lisi,180
  *     wangwu,175
  *     zhaoliu,195
  *     zhouqi,165
  *     weiba,185
  *
  *     create table teacher_info(
  *     name string,
  *     height double
  *     ) row format delimited
  *     fields terminated by ',';
  *
  * teacher_basic.txt
  *     name(String)    age(int)    married(boolean)    children(int)
  *     zhangsan,23,false,0
  *     lisi,24,false,0
  *     wangwu,25,false,0
  *     zhaoliu,26,true,1
  *     zhouqi,27,true,2
  *     weiba,28,true,3
  *
  *     create table teacher_basic(
  *     name string,
  *     age int,
  *     married boolean,
  *     children int
  *     ) row format delimited
  *     fields terminated by ',';
  *
  * Requirements:
  * 1. Use SparkSQL to create the corresponding tables in Hive and load the data into them.
  * 2. Run a SparkSQL job that joins teacher_info and teacher_basic and stores the result in a table named teacher.
  *
  * When running Hive operations on the cluster, the following configuration is required:
  *     1. Copy hive-site.xml into spark/conf and the mysql connector jar into spark/lib.
  *     2. Add the following line to $SPARK_HOME/conf/spark-env.sh:
  *        export SPARK_CLASSPATH=$SPARK_CLASSPATH:$SPARK_HOME/lib/mysql-connector-java-5.1.39.jar
  */
object _01HiveContextOps {
    def main(args: Array[String]): Unit = {
        Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
        val conf = new SparkConf()
//            .setMaster("local[2]")
            .setAppName(_01SparkSQLOps.getClass.getSimpleName)

        val sc = new SparkContext(conf)
        val hiveContext = new HiveContext(sc)

        // Create the teacher_info table
        hiveContext.sql("CREATE TABLE teacher_info(" +
            "name string, " +
            "height double) " +
            "ROW FORMAT DELIMITED " +
            "FIELDS TERMINATED BY ','")

        hiveContext.sql("CREATE TABLE teacher_basic(" +
            "name string, " +
            "age int, " +
            " married boolean, " +
            "children int) " +
            "ROW FORMAT DELIMITED " +
            "FIELDS TERMINATED BY ','")

        // Load data into the tables
        hiveContext.sql("LOAD DATA LOCAL INPATH '/home/uplooking/data/hive/sql/teacher_info.txt' INTO TABLE teacher_info")
        hiveContext.sql("LOAD DATA LOCAL INPATH '/home/uplooking/data/hive/sql/teacher_basic.txt' INTO TABLE teacher_basic")

        // Step 2: compute the join of the two tables
        val joinDF = hiveContext.sql("SELECT " +
            "b.name, " +
            "b.age, " +
            "if(b.married, '已婚', '未婚') as married, " +
            "b.children, " +
            "i.height " +
            "FROM teacher_info i " +
            "INNER JOIN teacher_basic b ON i.name = b.name")

        joinDF.collect().foreach(println)

        joinDF.write.saveAsTable("teacher")

        sc.stop()
    }
}
Packaging, Uploading and Configuration

After packaging, upload the jar to the cluster, then configure Spark as follows:

When running Hive operations on the cluster, the following configuration is required:
    1. Copy hive-site.xml into spark/conf and the mysql connector jar into spark/lib.
    2. Add the following line to $SPARK_HOME/conf/spark-env.sh:
       export SPARK_CLASSPATH=$SPARK_CLASSPATH:$SPARK_HOME/lib/mysql-connector-java-5.1.39.jar

Submitting the Spark Job

The submit script used is as follows:

[uplooking@uplooking01 spark]$ cat spark-submit-standalone.sh 
#export HADOOP_CONF_DIR=/home/uplooking/app/hadoop/etc/hadoop

/home/uplooking/app/spark/bin/spark-submit \
--class $2 \
--master spark://uplooking02:7077 \
--executor-memory 1G \
--num-executors 1 \
$1 \

Execute the following command:

./spark-submit-standalone.sh spark-hive.jar cn.xpleaf.bigdata.spark.scala.sql.p2._01HiveContextOps

Verification

The expected result can be seen in the job output, or it can be verified directly in Hive:

hive> show tables;
OK
hpeople
people
t1
teacher
teacher_basic
teacher_info
Time taken: 0.03 seconds, Fetched: 6 row(s)
hive> select * from teacher;
OK
zhangsan        23      未婚    0       175.0
lisi    24      未婚    0       180.0
wangwu  25      未婚    0       175.0
zhaoliu 26      已婚    1       195.0
zhouqi  27      已婚    2       165.0
weiba   28      已婚    3       185.0
Time taken: 0.369 seconds, Fetched: 6 row(s)

Integration of Spark and ES

Make sure the Elasticsearch environment is already set up.

The test code is as follows:

package cn.xpleaf.bigdata.spark.scala.sql.p2

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark.sql._
import org.elasticsearch.spark._

/**
  * Integration of Spark and ES
  *     Add the Spark and ES maven dependency:
  *     elasticsearch-hadoop
  *     2.3.0
  *     Load account.json into the ES index/type spark/account
  *     See the official documentation: https://www.elastic.co/guide/en/elasticsearch/hadoop/2.3/spark.html
  */
object _02SparkElasticSearchOps {
    def main(args: Array[String]): Unit = {
        Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
        val conf = new SparkConf()
                        .setAppName(_02SparkElasticSearchOps.getClass().getSimpleName)
                        .setMaster("local[2]")
        /**
          * Configuration for the Spark-ES integration
          */
        conf.set("es.index.auto.create", "true")
        conf.set("es.nodes", "uplooking01")
        conf.set("es.port", "9200")
        val sc = new SparkContext(conf)
        val sqlContext = new SQLContext(sc)

//        write2ES(sqlContext)

        readFromES(sc)

        sc.stop()
    }

    /**
      * Read data from ES
      * (operates via the SparkContext)
      */
    def readFromES(sc:SparkContext): Unit = {
        val resources = "spark/account"  // index/type
        val jsonRDD = sc.esJsonRDD(resources)
        jsonRDD.foreach(println)
    }

    /**
      * Write data to ES
      * (operates via the SQLContext)
      */
    def write2ES(sqlContext:SQLContext): Unit = {
        val jsonDF = sqlContext.read.json("D:/data/spark/sql/account.json")
        val resources = "spark/account"  // index/type
        jsonDF.saveToEs(resources)
    }
}
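
Besides esJsonRDD, elasticsearch-hadoop also exposes a Spark SQL data source, so the same index can be read back as a DataFrame. A minimal sketch (the data source name org.elasticsearch.spark.sql is the one documented for es-hadoop 2.x; the index/type spark/account is the same as above), which could be added as another method of the object above:

    /**
      * Read data from ES as a DataFrame via the es-hadoop Spark SQL data source
      */
    def readFromESAsDF(sqlContext: SQLContext): Unit = {
        val esDF = sqlContext.read
            .format("org.elasticsearch.spark.sql")  // es-hadoop data source
            .load("spark/account")                  // index/type
        esDF.printSchema()
        esDF.show()
    }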

Spark SQL Functions

Overview (built-in functions in Spark 1.5.x ~ 1.6.x)

Spark SQL's built-in functions can be used to analyse data directly. Unlike the rest of the Spark SQL API, a built-in function applied to a DataFrame returns a Column object, and a DataFrame is by definition "a distributed collection of data organized into named columns". This lays a solid foundation for complex analysis and makes it very convenient: built-in functions can be called at any point inside DataFrame operations to perform whatever processing the business requires, which removes a lot of unnecessary plumbing and lets us focus on the analysis itself, a real productivity gain for engineers. Starting from Spark 1.5.x a large number of built-in functions are provided, such as max, mean, min, sum, avg, explode, size, sort_array, day, to_date, abs, acos, asin and atan.
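
As a brief sketch of that Column-returning style (reusing the illustrative people.json path and schema from the examples in this post), the aggregate built-ins can be applied directly through the DataFrame API:

import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions._  // max, min, avg, count, desc, ... all return Column

def ageStats(sqlContext: SQLContext): Unit = {
    val pdf = sqlContext.read.json("D:/data/spark/sql/people.json")
    pdf.groupBy("age")
       .agg(max("age").as("max_age"),
            min("age").as("min_age"),
            avg("age").as("avg_age"),
            count("age").as("count"))
       .orderBy(desc("age"))
       .show()
}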

Broadly speaking, the built-in functions fall into the following basic categories:

1. Aggregate functions, e.g. countDistinct, sumDistinct;
2. Collection functions, e.g. sort_array, explode;
3. Date/time functions, e.g. hour, quarter, next_day;
4. Mathematical functions, e.g. asin, atan, sqrt, tan, round;
5. Window functions, e.g. rowNumber;
6. String functions, e.g. concat, format_number, regexp_extract;
7. Other functions, e.g. isNaN, sha, randn, callUDF.

The following terminology comes from Hive, but the same concepts clearly exist in Spark SQL as well (a UDAF sketch follows this list):
UDF
    User Defined Function
    one row in, one row out
    a ---> A
    strlen("adbad") = 5
UDAF
    User Defined Aggregation Function
    many rows in, one row out
    sum(a, b, c, d) ----> one aggregated result
Table function
    UDTF: User Defined Table Function
    many rows in, many rows out
    "hello you"
    "hello me"  ----> transformation -----> split(" ") ----> Array[]
    ["hello", "you"] --->
        "hello"
        "you"
    ----> row/column conversion
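
A UDF example appears in its own section further below. For UDAFs, Spark 1.5+ offers org.apache.spark.sql.expressions.UserDefinedAggregateFunction; the following is a minimal sketch (the class name MyAverage, its registration name and the sample query are illustrative, not from the original examples):

import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

class MyAverage extends UserDefinedAggregateFunction {
    // One double input column
    override def inputSchema: StructType = StructType(StructField("value", DoubleType) :: Nil)
    // The buffer keeps a running sum and count
    override def bufferSchema: StructType = StructType(
        StructField("sum", DoubleType) :: StructField("count", LongType) :: Nil)
    override def dataType: DataType = DoubleType
    override def deterministic: Boolean = true
    override def initialize(buffer: MutableAggregationBuffer): Unit = {
        buffer(0) = 0.0
        buffer(1) = 0L
    }
    override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
        if (!input.isNullAt(0)) {
            buffer(0) = buffer.getDouble(0) + input.getDouble(0)
            buffer(1) = buffer.getLong(1) + 1
        }
    }
    override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
        buffer1(0) = buffer1.getDouble(0) + buffer2.getDouble(0)
        buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1)
    }
    override def evaluate(buffer: Row): Any =
        if (buffer.getLong(1) == 0) 0.0 else buffer.getDouble(0) / buffer.getLong(1)
}

// Register and use it like any built-in aggregate, e.g.:
// sqlContext.udf.register("myAvg", new MyAverage)
// sqlContext.sql("select myAvg(height) from teacher_info").show()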

A basic example using the built-in aggregate functions in SQL is as follows:

package cn.xpleaf.bigdata.spark.scala.sql.p2

import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

/**
  * SparkSQL built-in function operations
  */
object _03SparkSQLFunctionOps {
    def main(args: Array[String]): Unit = {
        Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
        val conf = new SparkConf()
            .setAppName(_03SparkSQLFunctionOps.getClass().getSimpleName)
            .setMaster("local[2]")
        val sc = new SparkContext(conf)
        val sqlContext = new SQLContext(sc)

        val pdf = sqlContext.read.json("D:/data/spark/sql/people.json")
        pdf.show()

        pdf.registerTempTable("people")

        // Count the number of people
        sqlContext.sql("select count(1) from people").show()
        // Aggregate statistics (max, min, avg, count) grouped by age
        sqlContext.sql("select age, " +
            "max(age) as max_age, " +
            "min(age) as min_age, " +
            "avg(age) as avg_age, " +
            "count(age) as count " +
            "from people group by age order by age desc").show()

        sc.stop()
    }
}

The output is as follows:

+---+------+-------+
|age|height|   name|
+---+------+-------+
| 10| 168.8|Michael|
| 30| 168.8|   Andy|
| 19| 169.8| Justin|
| 32| 188.8|   Jack|
| 10| 158.8|   John|
| 19| 179.8|   Domu|
| 13| 179.8|     袁帥|
| 30| 175.8|     殷杰|
| 19| 179.9|     孫瑞|
+---+------+-------+

18/05/09 17:53:23 INFO FileInputFormat: Total input paths to process : 1
+---+
|_c0|
+---+
|  9|
+---+

18/05/09 17:53:24 INFO FileInputFormat: Total input paths to process : 1
+---+-------+-------+-------+-----+
|age|max_age|min_age|avg_age|count|
+---+-------+-------+-------+-----+
| 32|     32|     32|   32.0|    1|
| 30|     30|     30|   30.0|    2|
| 19|     19|     19|   19.0|    3|
| 13|     13|     13|   13.0|    1|
| 10|     10|     10|   10.0|    2|
+---+-------+-------+-------+-----+

Spark SQL Window Functions

1. Since Spark 1.5.x, window functions have been available in Spark SQL and DataFrames; the classic example is row_number(), which lets us implement group-wise top-N logic.

2. As an exercise, compute a top-N using Spark's window functions. You may remember that we computed top-N much earlier on, and it was quite cumbersome then; with Spark SQL it becomes very easy, as the sketch below shows.
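
A minimal sketch of the group top-N pattern (in Spark 1.5/1.6 window functions require a HiveContext; the score table and its columns subject/name/score are hypothetical):

import org.apache.spark.sql.hive.HiveContext

def topNPerGroup(hiveContext: HiveContext): Unit = {
    // Top 3 scores per subject using row_number()
    hiveContext.sql(
        "SELECT subject, name, score FROM (" +
        "  SELECT subject, name, score, " +
        "         row_number() OVER (PARTITION BY subject ORDER BY score DESC) AS rn " +
        "  FROM score) tmp " +
        "WHERE rn <= 3").show()
}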

Spark SQL UDF Operations

The test code is as follows:

package cn.xpleaf.bigdata.spark.scala.sql.p2

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}

/**
  * SparkSQL user-defined function (UDF) operations
  */
object _04SparkSQLFunctionOps {
    def main(args: Array[String]): Unit = {
        Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
        val conf = new SparkConf()
            .setAppName(_04SparkSQLFunctionOps.getClass().getSimpleName)
            .setMaster("local[2]")
        val sc = new SparkContext(conf)
        val sqlContext = new SQLContext(sc)

        /**
          * User-defined function (UDF) operations, analogous to UDFs in Hive (SparkSQL is treated by analogy with Hive here, since both offer interactive SQL-style queries)
          * 1. Create an ordinary function
          * 2. Register it (with the SQLContext)
          * 3. Use it directly
          *
          * Example: create a UDF that returns the length of a string
          */

        // 1. Create an ordinary function
        def strLen(str:String):Int = str.length

        // 2. Register it (with the SQLContext)
        sqlContext.udf.register[Int, String]("myStrLen", strLen)

        val list = List("Hello you", "Hello he", "Hello me")

        // Convert the RDD to a DataFrame
        val rowRDD = sqlContext.sparkContext.parallelize(list).flatMap(_.split(" ")).map(word => {
            Row(word)
        })
        val scheme = StructType(List(
            StructField("word", DataTypes.StringType, false)
        ))
        val df = sqlContext.createDataFrame(rowRDD, scheme)

        df.registerTempTable("test")

        // 3. Use it directly in SQL
        sqlContext.sql("select word, myStrLen(word) from test").show()

        sc.stop()
    }

}

The output is as follows:

+-----+---+
| word|_c1|
+-----+---+
|Hello|  5|
|  you|  3|
|Hello|  5|
|   he|  2|
|Hello|  5|
|   me|  2|
+-----+---+

Spark SQL Word Count

The test code is as follows:

package cn.xpleaf.bigdata.spark.scala.sql.p2

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{Row, SQLContext}

/**
  * Both of the following are important:
  * 1. Implementing a word count with SparkSQL
  * 2. Using window functions
  */
object _05SparkSQLFunctionOps2 {
    def main(args: Array[String]): Unit = {
        Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
        val conf = new SparkConf()
            .setAppName(_05SparkSQLFunctionOps2.getClass().getSimpleName)
            .setMaster("local[2]")
        val sc = new SparkContext(conf)
        val sqlContext = new SQLContext(sc)

        val list = List("Hello you", "Hello he", "Hello me")

        // Convert the RDD to a DataFrame
        val rowRDD = sqlContext.sparkContext.parallelize(list).map(line => {
            Row(line)
        })
        val scheme = StructType(List(
            StructField("line", DataTypes.StringType, false)
        ))

        val df = sqlContext.createDataFrame(rowRDD, scheme)

        df.registerTempTable("test")

        df.show()

        // Run the word count
        val sql =  "select t.word, count(1) as count " +
                    "from " +
                        "(select " +
                            "explode(split(line, ' ')) as word " +
                        "from test) as t " +
                    "group by t.word order by count desc"
        sqlContext.sql(sql).show()

        sc.stop()
    }
}

The output is as follows:

+---------+
|     line|
+---------+
|Hello you|
| Hello he|
| Hello me|
+---------+

+-----+-----+
| word|count|
+-----+-----+
|Hello|    3|
|   me|    1|
|   he|    1|
|  you|    1|
+-----+-----+
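
The same word count can also be expressed with the DataFrame API, using the built-in explode and split functions instead of an SQL string. A minimal sketch, where df is the single-column "line" DataFrame built in the example above:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._

def wordCountDF(df: DataFrame): Unit = {
    df.select(explode(split(col("line"), " ")).as("word"))
      .groupBy("word")
      .count()                 // adds a "count" column
      .orderBy(desc("count"))
      .show()
}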
