真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

spark2.4.3中sparkSQL用戶自定義函數(shù)該怎么理解

這期內(nèi)容當(dāng)中小編將會(huì)給大家?guī)?lái)有關(guān)spark2.4.3中sparkSQL用戶自定義函數(shù)該怎么理解,文章內(nèi)容豐富且以專業(yè)的角度為大家分析和敘述,閱讀完這篇文章希望大家可以有所收獲。

創(chuàng)新互聯(lián)網(wǎng)站建設(shè)提供從項(xiàng)目策劃、軟件開(kāi)發(fā),軟件安全維護(hù)、網(wǎng)站優(yōu)化(SEO)、網(wǎng)站分析、效果評(píng)估等整套的建站服務(wù),主營(yíng)業(yè)務(wù)為成都做網(wǎng)站、網(wǎng)站制作,成都App定制開(kāi)發(fā)以傳統(tǒng)方式定制建設(shè)網(wǎng)站,并提供域名空間備案等一條龍服務(wù),秉承以專業(yè)、用心的態(tài)度為用戶提供真誠(chéng)的服務(wù)。創(chuàng)新互聯(lián)深信只要達(dá)到每一位用戶的要求,就會(huì)得到認(rèn)可,從而選擇與我們長(zhǎng)期合作。這樣,我們也可以走得更遠(yuǎn)!

1、簡(jiǎn)介

從Spark2.0以上的版本開(kāi)始,spark是使用全新的SparkSession接口代替Spark1.6中的SQLcontext和HiveContext

來(lái)實(shí)現(xiàn)對(duì)數(shù)據(jù)的加載、轉(zhuǎn)換、處理等工作,并且實(shí)現(xiàn)了SQLcontext和HiveContext的所有功能。

我們?cè)谛掳姹局胁⒉恍枰澳敲捶爆嵉膭?chuàng)建很多對(duì)象,只需要?jiǎng)?chuàng)建一個(gè)SparkSession對(duì)象即可。

SparkSession支持從不同的數(shù)據(jù)源加載數(shù)據(jù),并把數(shù)據(jù)轉(zhuǎn)換成DataFrame,并支持把DataFrame轉(zhuǎn)換成SQLContext自身中的表。

然后使用SQL語(yǔ)句來(lái)操作數(shù)據(jù),也提供了HiveQL以及其他依賴于Hive的功能支持。

創(chuàng)建SparkSession

SparkSession 是 Spark SQL 的入口。

使用 Dataset 或者 Datafram 編寫 Spark SQL 應(yīng)用的時(shí)候,第一個(gè)要?jiǎng)?chuàng)建的對(duì)象就是 SparkSession。

Builder 是 SparkSession 的構(gòu)造器。 通過(guò) Builder, 可以添加各種配置。

Builder 的方法如下:

MethodDescription
getOrCreate獲取或者新建一個(gè) sparkSession
enableHiveSupport增加支持 hive Support
appName設(shè)置 application 的名字
config設(shè)置各種配置

2、sparkSQL基本使用方法

使用的spark版本2.4.3

spark 1.x中的SQLContext在新版本中已經(jīng)被廢棄,改為SparkSession.builder

spark2.4.3中sparkSQL用戶自定義函數(shù)該怎么理解

可以寫成

val conf = new SparkConf().setAppName("helloworld").setMaster("local[*]")
val spark1=SparkSession.builder().config(conf).getOrCreate()

或(sparksession構(gòu)造器私有化在builder中)

val spark = SparkSession.builder
      .appName("my spark application")
      .master("local[2]")
      .getOrCreate()

例:

import org.apache.spark.sql.SparkSession
object HelloWorld {
  def main(args: Array[String]): Unit = {
   /* val conf = new SparkConf().setAppName("helloworld").setMaster("local[*]")
    val spark1=SparkSession.builder().config(conf).getOrCreate()*/
    val spark = SparkSession.builder
      .appName("my spark application")
      .master("local[2]")
      .getOrCreate()
    //讀取數(shù)據(jù)
    val df = spark.read.json("/usr/local/opt/spark-2.4.3/examples/src/main/resources/people.json")
    //展示所有數(shù)據(jù)
    df.show()
  //DSL
    df.select("name").show()
    //SQL
    df.createTempView("people")
    spark.sql("select * from people where age=30").show()
    //關(guān)閉
    spark.close()
  }
}

輸出結(jié)果 1:

 //展示所有數(shù)據(jù)
    df.show()

spark2.4.3中sparkSQL用戶自定義函數(shù)該怎么理解

輸出結(jié)果 2:

//DSL
    df.select("name").show()

spark2.4.3中sparkSQL用戶自定義函數(shù)該怎么理解

輸出結(jié)果 3:

//SQL
    df.createTempView("people")
    spark.sql("select * from people where age=30").show()

spark2.4.3中sparkSQL用戶自定義函數(shù)該怎么理解

3、通過(guò)udf自定義用戶函數(shù)addName (實(shí)現(xiàn)將字段x前拼接上name:x)

scala> spark.read.json("./examples/src/main/resources/people.json")
res32: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
scala> res32.createOrReplaceTempView("people")
scala> spark.sql("select * from people")
res38: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
scala> spark.sql("select * from people").show
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+
scala> spark.udf.register("addName",(x:String)=> "name:"+x)
res40: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(,StringType,Some(List(StringType)))
scala> spark.sql("select addName(name) as name from people").show
+------------+
|        name|
+------------+
|name:Michael|
|   name:Andy|
| name:Justin|
+------------+

4、通過(guò)udaf自定義用戶函數(shù)

package com.ny.service
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._
import org.apache.spark.sql.{Row, SparkSession}
class CustomerAvg extends UserDefinedAggregateFunction {
  //輸入的類型
  override def inputSchema: StructType = StructType(StructField("salary", LongType) :: Nil)
  //緩存數(shù)據(jù)的類型
  override def bufferSchema: StructType = {
    StructType(StructField("sum", LongType) :: StructField("count", LongType) :: Nil)
  }
  //返回值類型
  override def dataType: DataType = LongType
  //冪等性
  override def deterministic: Boolean = true
  //初始化
  override def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = 0L
    buffer(1) = 0L
  }
//更新 分區(qū)內(nèi)操作
  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    buffer(0)=buffer.getLong(0) +input.getLong(0)
    buffer(1)=buffer.getLong(1)+1L
  }
//合并 分區(qū)與分區(qū)之間操作
  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1(0)=buffer1.getLong(0)+buffer2.getLong(0)
    buffer1(1)=buffer1.getLong(1)+buffer2.getLong(1)
  }
  //最終執(zhí)行的方法
  override def evaluate(buffer: Row): Any = {
    buffer.getLong(0)/buffer.getLong(1)
  }
}
object CustomerAvg{
  def main(args: Array[String]): Unit = {
     val spark= SparkSession.builder()
       .appName("MyAvg")
       .master("local[2]")
       .getOrCreate()
    spark.udf.register("MyAvg",new CustomerAvg)
//讀數(shù)據(jù)
    val frame = spark.read.json("/usr/local/opt/spark-2.4.3/examples/src/main/resources/peopleCP.json")
   frame.createTempView("peopleCP")
    spark.sql("select MyAvg(age) avg_age from peopleCP").show()
    spark.stop()
  }
}
nancylulululu:resources nancy$ vi peopleCP.json 
{"name":"Michael","age":11}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}

返回結(jié)果

spark2.4.3中sparkSQL用戶自定義函數(shù)該怎么理解

上述就是小編為大家分享的spark2.4.3中sparkSQL用戶自定義函數(shù)該怎么理解了,如果剛好有類似的疑惑,不妨參照上述分析進(jìn)行理解。如果想知道更多相關(guān)知識(shí),歡迎關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道。


網(wǎng)頁(yè)標(biāo)題:spark2.4.3中sparkSQL用戶自定義函數(shù)該怎么理解
網(wǎng)站路徑:http://weahome.cn/article/gepcho.html

其他資訊

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部