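Shark is one of the components of the Spark ecosystem from the Berkeley lab. It modified three modules of the Hive driver (memory management, physical planning, and execution) so that Hive could run on the Spark engine, which sped up SQL queries by 10 to 100 times.
Shark depended too heavily on Hive (for example, it used Hive's syntax parser and query optimizer). This held back Spark's stated "One Stack to Rule Them All" direction and the integration between Spark components, so the SparkSQL project was proposed.
SparkSQL discarded the original Shark code but kept some of Shark's strengths, such as in-memory columnar storage and Hive compatibility, and was rewritten from scratch. Freed from its dependence on Hive, SparkSQL improved considerably in data compatibility, performance optimization, and component extensibility.
Beyond optimizations such as in-memory columnar storage and byte-code generation, SparkSQL also introduces a cost model that evaluates queries dynamically and selects the best physical plan.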
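To make the idea of in-memory columnar storage more concrete, here is a minimal plain-Python sketch. It does not use Spark and does not reproduce Spark's actual storage format; the lists simply stand in for column buffers, and `to_columns` is a hypothetical helper introduced for this illustration. It shows why a query that needs only one column can skip the others when the data is laid out by column.

```python
# Toy illustration only: plain Python lists stand in for in-memory
# column buffers. None of these names come from Spark itself.

rows = [
    {"name": "Michael", "age": None},
    {"name": "Andy", "age": 30},
    {"name": "Justin", "age": 19},
]


def to_columns(records):
    """Pivot a list of row dicts into one list per column."""
    columns = {}
    for record in records:
        for key, value in record.items():
            columns.setdefault(key, []).append(value)
    return columns


columns = to_columns(rows)

# Row layout: every record is visited in full, even though only "age" is needed.
ages_from_rows = [record["age"] for record in rows if record["age"] is not None]

# Columnar layout: the query reads a single column and ignores the rest.
ages_from_columns = [age for age in columns["age"] if age is not None]

average_age = sum(ages_from_columns) / len(ages_from_columns)
print(average_age)
```

Both layouts give the same answer; the difference is how much data has to be touched to get it, which matters for wide tables and aggregate queries.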
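In 2014 Shark development stopped and the team moved all of its resources to the SparkSQL project. That ended Shark's own development, but two lines grew out of it: SparkSQL and Hive on Spark.
SparkSQL continues to evolve as a member of the Spark ecosystem. It is no longer constrained by Hive and only stays compatible with it. Hive on Spark, by contrast, is a Hive roadmap item that makes Spark one of Hive's underlying engines, so Hive is no longer tied to a single engine and can run on MapReduce, Tez, or Spark.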
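Spark SQL is a module for structured data processing. It attaches structural information to the data being processed, and you can interact with it through SQL statements or the Dataset API.
Spark SQL can use SQL to read and write data stored in Hive. SQL can also be run from within a programming language, in which case the result comes back as a Dataset/DataFrame.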
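A Dataset is a distributed collection of data that combines the strengths of RDDs with the Spark SQL execution engine. A Dataset can be constructed from JVM objects and then processed with transformation operators. The Dataset API is available in Scala and Java. Python and R have no Dataset API, but because both languages are dynamic, many of its benefits are already available there.
A DataFrame is a Dataset organized into named columns, the equivalent of a table in an RDBMS. A DataFrame can be constructed from many sources, such as structured data files, Hive tables, external databases, or RDDs. The DataFrame API is available in Scala, Java, Python, and R.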
import org.apache.spark.sql.SparkSession
// Build the SparkSession
val spark = SparkSession
  .builder()
  .appName("Spark SQL basic example")
  .config("spark.some.config.option", "some-value")
  .getOrCreate()
// Create a DataFrame
val df = spark.read.json("examples/src/main/resources/people.json")
// Displays the content of the DataFrame to stdout
df.show()
// +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// +----+-------+
// DataFrame operations
// This import is needed to use the $-notation
import spark.implicits._
// Print the schema in a tree format
df.printSchema()
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)
// Select only the "name" column
df.select("name").show()
// +-------+
// | name|
// +-------+
// |Michael|
// | Andy|
// | Justin|
// +-------+
// Select everybody, but increment the age by 1
df.select($"name", $"age" + 1).show()
// +-------+---------+
// | name|(age + 1)|
// +-------+---------+
// |Michael| null|
// | Andy| 31|
// | Justin| 20|
// +-------+---------+
// Select people older than 21
df.filter($"age" > 21).show()
// +---+----+
// |age|name|
// +---+----+
// | 30|Andy|
// +---+----+
// Count people by age
df.groupBy("age").count().show()
// +----+-----+
// | age|count|
// +----+-----+
// | 19| 1|
// |null| 1|
// | 30| 1|
// +----+-----+
import org.apache.spark.sql.SparkSession;
SparkSession spark = SparkSession
  .builder()
  .appName("Java Spark SQL basic example")
  .config("spark.some.config.option", "some-value")
  .getOrCreate();
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
Dataset<Row> df = spark.read().json("examples/src/main/resources/people.json");
// Displays the content of the DataFrame to stdout
df.show();
// +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// +----+-------+
// col("...") is preferable to df.col("...")
import static org.apache.spark.sql.functions.col;
// Print the schema in a tree format
df.printSchema();
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)
// Select only the "name" column
df.select("name").show();
// +-------+
// | name|
// +-------+
// |Michael|
// | Andy|
// | Justin|
// +-------+
// Select everybody, but increment the age by 1
df.select(col("name"), col("age").plus(1)).show();
// +-------+---------+
// | name|(age + 1)|
// +-------+---------+
// |Michael| null|
// | Andy| 31|
// | Justin| 20|
// +-------+---------+
// Select people older than 21
df.filter(col("age").gt(21)).show();
// +---+----+
// |age|name|
// +---+----+
// | 30|Andy|
// +---+----+
// Count people by age
df.groupBy("age").count().show();
// +----+-----+
// | age|count|
// +----+-----+
// | 19| 1|
// |null| 1|
// | 30| 1|
// +----+-----+
from pyspark.sql import SparkSession
# Build the SparkSession
spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()
# Create a DataFrame
# spark is an existing SparkSession
df = spark.read.json("examples/src/main/resources/people.json")
# Displays the content of the DataFrame to stdout
df.show()
# +----+-------+
# | age| name|
# +----+-------+
# |null|Michael|
# | 30| Andy|
# | 19| Justin|
# +----+-------+
# DataFrame operations
# spark, df are from the previous example
# Print the schema in a tree format
df.printSchema()
# root
# |-- age: long (nullable = true)
# |-- name: string (nullable = true)
# Select only the "name" column
df.select("name").show()
# +-------+
# | name|
# +-------+
# |Michael|
# | Andy|
# | Justin|
# +-------+
# Select everybody, but increment the age by 1
df.select(df['name'], df['age'] + 1).show()
# +-------+---------+
# | name|(age + 1)|
# +-------+---------+
# |Michael| null|
# | Andy| 31|
# | Justin| 20|
# +-------+---------+
# Select people older than 21
df.filter(df['age'] > 21).show()
# +---+----+
# |age|name|
# +---+----+
# | 30|Andy|
# +---+----+
# Count people by age
df.groupBy("age").count().show()
# +----+-----+
# | age|count|
# +----+-----+
# | 19| 1|
# |null| 1|
# | 30| 1|
# +----+-----+
// Note: Case classes in Scala 2.10 can support only up to 22 fields. To work around this limit,
// you can use custom classes that implement the Product interface
case class Person(name: String, age: Long)
// Encoders are created for case classes
val caseClassDS = Seq(Person("Andy", 32)).toDS()
caseClassDS.show()
// +----+---+
// |name|age|
// +----+---+
// |Andy| 32|
// +----+---+
// Encoders for most common types are automatically provided by importing spark.implicits._
val primitiveDS = Seq(1, 2, 3).toDS()
primitiveDS.map(_ + 1).collect() // Returns: Array(2, 3, 4)
// DataFrames can be converted to a Dataset by providing a class. Mapping will be done by name
val path = "examples/src/main/resources/people.json"
val peopleDS = spark.read.json(path).as[Person]
peopleDS.show()
// +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// +----+-------+
import java.util.Arrays;
import java.util.Collections;
import java.io.Serializable;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
public static class Person implements Serializable {
  private String name;
  private int age;

  public String getName() {
    return name;
  }

  public void setName(String name) {
    this.name = name;
  }

  public int getAge() {
    return age;
  }

  public void setAge(int age) {
    this.age = age;
  }
}
// Create an instance of a Bean class
Person person = new Person();
person.setName("Andy");
person.setAge(32);
// Encoders are created for Java beans
Encoder<Person> personEncoder = Encoders.bean(Person.class);
Dataset<Person> javaBeanDS = spark.createDataset(
  Collections.singletonList(person),
  personEncoder
);
javaBeanDS.show();
// +---+----+
// |age|name|
// +---+----+
// | 32|Andy|
// +---+----+
// Encoders for most common types are provided in class Encoders
Encoder<Integer> integerEncoder = Encoders.INT();
Dataset<Integer> primitiveDS = spark.createDataset(Arrays.asList(1, 2, 3), integerEncoder);
Dataset<Integer> transformedDS = primitiveDS.map(
    (MapFunction<Integer, Integer>) value -> value + 1,
    integerEncoder);
transformedDS.collect(); // Returns [2, 3, 4]
// DataFrames can be converted to a Dataset by providing a class. Mapping based on name
String path = "examples/src/main/resources/people.json";
Dataset<Person> peopleDS = spark.read().json(path).as(personEncoder);
peopleDS.show();
// +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// +----+-------+
// Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("people")
val sqlDF = spark.sql("SELECT * FROM people")
sqlDF.show()
// +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// +----+-------+
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
// Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("people");
Dataset<Row> sqlDF = spark.sql("SELECT * FROM people");
sqlDF.show();
// +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// +----+-------+
# Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("people")
sqlDF = spark.sql("SELECT * FROM people")
sqlDF.show()
# +----+-------+
# | age| name|
# +----+-------+
# |null|Michael|
# | 30| Andy|
# | 19| Justin|
# +----+-------+