This article looks at how to read and write data with Spark SQL. It is a practical topic, so it is shared here in the hope that you will get something out of it. Without further ado, let's take a look.
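Spark SQL supports many structured data sources and makes it easy to read Row objects from them. These sources include Parquet, JSON, Hive tables, and relational databases.
When a query uses only some of the fields, Spark SQL can scan just those fields instead of reading all of the data indiscriminately, as the hadoopFile method does.
Parquet is a popular columnar storage format that efficiently stores records with nested fields. Parquet files automatically preserve the types of the original data. When Parquet files are written, all columns are automatically converted to be nullable.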
scala
// Encoders for most common types are automatically provided by importing spark.implicits._
import spark.implicits._

val peopleDF = spark.read.json("examples/src/main/resources/people.json")

// DataFrames can be saved as Parquet files, maintaining the schema information
peopleDF.write.parquet("people.parquet")

// Read in the parquet file created above
// Parquet files are self-describing so the schema is preserved
// The result of loading a Parquet file is also a DataFrame
val parquetFileDF = spark.read.parquet("people.parquet")

// Parquet files can also be used to create a temporary view and then used in SQL statements
parquetFileDF.createOrReplaceTempView("parquetFile")
val namesDF = spark.sql("SELECT name FROM parquetFile WHERE age BETWEEN 13 AND 19")
namesDF.map(attributes => "Name: " + attributes(0)).show()
// +------------+
// |       value|
// +------------+
// |Name: Justin|
// +------------+
java
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

Dataset<Row> peopleDF = spark.read().json("examples/src/main/resources/people.json");

// DataFrames can be saved as Parquet files, maintaining the schema information
peopleDF.write().parquet("people.parquet");

// Read in the Parquet file created above.
// Parquet files are self-describing so the schema is preserved
// The result of loading a parquet file is also a DataFrame
Dataset<Row> parquetFileDF = spark.read().parquet("people.parquet");

// Parquet files can also be used to create a temporary view and then used in SQL statements
parquetFileDF.createOrReplaceTempView("parquetFile");
Dataset<Row> namesDF = spark.sql("SELECT name FROM parquetFile WHERE age BETWEEN 13 AND 19");
Dataset<String> namesDS = namesDF.map(
    (MapFunction<Row, String>) row -> "Name: " + row.getString(0),
    Encoders.STRING());
namesDS.show();
// +------------+
// |       value|
// +------------+
// |Name: Justin|
// +------------+
python
peopleDF = spark.read.json("examples/src/main/resources/people.json")

# DataFrames can be saved as Parquet files, maintaining the schema information.
peopleDF.write.parquet("people.parquet")

# Read in the Parquet file created above.
# Parquet files are self-describing so the schema is preserved.
# The result of loading a parquet file is also a DataFrame.
parquetFile = spark.read.parquet("people.parquet")

# Parquet files can also be used to create a temporary view and then used in SQL statements.
parquetFile.createOrReplaceTempView("parquetFile")
teenagers = spark.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19")
teenagers.show()
# +------+
# |  name|
# +------+
# |Justin|
# +------+
sql
CREATE TEMPORARY VIEW parquetTable
USING org.apache.spark.sql.parquet
OPTIONS (
  path "examples/src/main/resources/people.parquet"
)

SELECT * FROM parquetTable
Spark SQL can automatically infer the schema of a JSON dataset and load it as a Dataset whose items are Row objects.
By default, the JSON file that Spark SQL reads is not a typical JSON file: each line must contain a separate, self-contained, valid JSON object. For a regular multi-line JSON file, set the multiLine option to true.
scala
// Primitive types (Int, String, etc) and Product types (case classes) encoders are
// supported by importing this when creating a Dataset.
import spark.implicits._

// A JSON dataset is pointed to by path.
// The path can be either a single text file or a directory storing text files
val path = "examples/src/main/resources/people.json"
val peopleDF = spark.read.json(path)

// The inferred schema can be visualized using the printSchema() method
peopleDF.printSchema()
// root
//  |-- age: long (nullable = true)
//  |-- name: string (nullable = true)

// Creates a temporary view using the DataFrame
peopleDF.createOrReplaceTempView("people")

// SQL statements can be run by using the sql methods provided by spark
val teenagerNamesDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19")
teenagerNamesDF.show()
// +------+
// |  name|
// +------+
// |Justin|
// +------+

// Alternatively, a DataFrame can be created for a JSON dataset represented by
// a Dataset[String] storing one JSON object per string
val otherPeopleDataset = spark.createDataset(
  """{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil)
val otherPeople = spark.read.json(otherPeopleDataset)
otherPeople.show()
// +---------------+----+
// |        address|name|
// +---------------+----+
// |[Columbus,Ohio]| Yin|
// +---------------+----+
java
import java.util.Arrays;
import java.util.List;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;

// A JSON dataset is pointed to by path.
// The path can be either a single text file or a directory storing text files
Dataset<Row> people = spark.read().json("examples/src/main/resources/people.json");

// The inferred schema can be visualized using the printSchema() method
people.printSchema();
// root
//  |-- age: long (nullable = true)
//  |-- name: string (nullable = true)

// Creates a temporary view using the DataFrame
people.createOrReplaceTempView("people");

// SQL statements can be run by using the sql methods provided by spark
Dataset<Row> namesDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19");
namesDF.show();
// +------+
// |  name|
// +------+
// |Justin|
// +------+

// Alternatively, a DataFrame can be created for a JSON dataset represented by
// a Dataset<String> storing one JSON object per string.
List<String> jsonData = Arrays.asList(
    "{\"name\":\"Yin\",\"address\":{\"city\":\"Columbus\",\"state\":\"Ohio\"}}");
Dataset<String> anotherPeopleDataset = spark.createDataset(jsonData, Encoders.STRING());
Dataset<Row> anotherPeople = spark.read().json(anotherPeopleDataset);
anotherPeople.show();
// +---------------+----+
// |        address|name|
// +---------------+----+
// |[Columbus,Ohio]| Yin|
// +---------------+----+
python
# spark is from the previous example.
sc = spark.sparkContext

# A JSON dataset is pointed to by path.
# The path can be either a single text file or a directory storing text files
path = "examples/src/main/resources/people.json"
peopleDF = spark.read.json(path)

# The inferred schema can be visualized using the printSchema() method
peopleDF.printSchema()
# root
#  |-- age: long (nullable = true)
#  |-- name: string (nullable = true)

# Creates a temporary view using the DataFrame
peopleDF.createOrReplaceTempView("people")

# SQL statements can be run by using the sql methods provided by spark
teenagerNamesDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19")
teenagerNamesDF.show()
# +------+
# |  name|
# +------+
# |Justin|
# +------+

# Alternatively, a DataFrame can be created for a JSON dataset represented by
# an RDD[String] storing one JSON object per string
jsonStrings = ['{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}']
otherPeopleRDD = sc.parallelize(jsonStrings)
otherPeople = spark.read.json(otherPeopleRDD)
otherPeople.show()
# +---------------+----+
# |        address|name|
# +---------------+----+
# |[Columbus,Ohio]| Yin|
# +---------------+----+
sql
CREATE TEMPORARY VIEW jsonTable
USING org.apache.spark.sql.json
OPTIONS (
  path "examples/src/main/resources/people.json"
)

SELECT * FROM jsonTable
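Since the default JSON reader expects one self-contained object per line, an alternative to the multiLine option is to convert a regular JSON file into line-delimited form before loading it. The following is a minimal sketch that uses only Python's standard library and assumes the input file holds a single top-level JSON array. The function name `to_json_lines`, the file names, and the sample records are hypothetical and are not part of Spark.

```python
import json
import tempfile
from pathlib import Path


def to_json_lines(src: Path, dst: Path) -> int:
    """Rewrite a JSON file holding one top-level array as one object per line.

    Returns the number of records written. (Hypothetical helper, not a Spark API.)
    """
    records = json.loads(src.read_text(encoding="utf-8"))
    if not isinstance(records, list):
        raise ValueError("expected a top-level JSON array")
    with dst.open("w", encoding="utf-8") as out:
        for record in records:
            # json.dumps without indent emits no newlines, so every record
            # occupies exactly one line of the output file.
            out.write(json.dumps(record, ensure_ascii=False) + "\n")
    return len(records)


# Demonstration with made-up sample data written to a temporary directory.
with tempfile.TemporaryDirectory() as tmp:
    src = Path(tmp) / "people_pretty.json"   # regular, indented JSON
    dst = Path(tmp) / "people.jsonl"         # line-delimited result
    sample = [{"name": "Michael"}, {"name": "Andy", "age": 30}, {"name": "Justin", "age": 19}]
    src.write_text(json.dumps(sample, indent=2), encoding="utf-8")

    count = to_json_lines(src, dst)
    lines = dst.read_text(encoding="utf-8").splitlines()

print(count)     # 3
print(lines[2])  # {"name": "Justin", "age": 19}
```

The converted file can then be loaded with the plain spark.read.json(path) call used in the examples above. For large files, or when the layout cannot be changed, setting the multiLine option on the reader is likely the simpler choice.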
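Spark SQL supports any storage format (SerDe) that Hive supports, including text files, RCFiles, ORC, Parquet, Avro, and Protocol Buffers.
If a Hive environment is already configured, place hive-site.xml, core-site.xml (for security configuration), and hdfs-site.xml (for HDFS configuration) in the conf directory. Without a Hive environment, Spark SQL automatically creates metastore_db in the current directory, along with the warehouse directory set by the spark.sql.warehouse.dir option, which defaults to spark-warehouse. In addition, the user who runs the Spark application must be granted write permission.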
scala
import java.io.File

import org.apache.spark.sql.Row
import org.apache.spark.sql.SparkSession

case class Record(key: Int, value: String)

// warehouseLocation points to the default location for managed databases and tables
val warehouseLocation = new File("spark-warehouse").getAbsolutePath

val spark = SparkSession
  .builder()
  .appName("Spark Hive Example")
  .config("spark.sql.warehouse.dir", warehouseLocation)
  .enableHiveSupport()
  .getOrCreate()

import spark.implicits._
import spark.sql

sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive")
sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")

// Queries are expressed in HiveQL
sql("SELECT * FROM src").show()
// +---+-------+
// |key|  value|
// +---+-------+
// |238|val_238|
// | 86| val_86|
// |311|val_311|
// ...

// Aggregation queries are also supported.
sql("SELECT COUNT(*) FROM src").show()
// +--------+
// |count(1)|
// +--------+
// |    500 |
// +--------+

// The results of SQL queries are themselves DataFrames and support all normal functions.
val sqlDF = sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key")

// The items in DataFrames are of type Row, which allows you to access each column by ordinal.
val stringsDS = sqlDF.map {
  case Row(key: Int, value: String) => s"Key: $key, Value: $value"
}
stringsDS.show()
// +--------------------+
// |               value|
// +--------------------+
// |Key: 0, Value: val_0|
// |Key: 0, Value: val_0|
// |Key: 0, Value: val_0|
// ...

// You can also use DataFrames to create temporary views within a SparkSession.
val recordsDF = spark.createDataFrame((1 to 100).map(i => Record(i, s"val_$i")))
recordsDF.createOrReplaceTempView("records")

// Queries can then join DataFrame data with data stored in Hive.
sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show()
// +---+------+---+------+
// |key| value|key| value|
// +---+------+---+------+
// |  2| val_2|  2| val_2|
// |  4| val_4|  4| val_4|
// |  5| val_5|  5| val_5|
// ...
java
import java.io.File;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public static class Record implements Serializable {
  private int key;
  private String value;

  public int getKey() {
    return key;
  }

  public void setKey(int key) {
    this.key = key;
  }

  public String getValue() {
    return value;
  }

  public void setValue(String value) {
    this.value = value;
  }
}

// warehouseLocation points to the default location for managed databases and tables
String warehouseLocation = new File("spark-warehouse").getAbsolutePath();
SparkSession spark = SparkSession
  .builder()
  .appName("Java Spark Hive Example")
  .config("spark.sql.warehouse.dir", warehouseLocation)
  .enableHiveSupport()
  .getOrCreate();

spark.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive");
spark.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src");

// Queries are expressed in HiveQL
spark.sql("SELECT * FROM src").show();
// +---+-------+
// |key|  value|
// +---+-------+
// |238|val_238|
// | 86| val_86|
// |311|val_311|
// ...

// Aggregation queries are also supported.
spark.sql("SELECT COUNT(*) FROM src").show();
// +--------+
// |count(1)|
// +--------+
// |    500 |
// +--------+

// The results of SQL queries are themselves DataFrames and support all normal functions.
Dataset<Row> sqlDF = spark.sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key");

// The items in DataFrames are of type Row, which lets you access each column by ordinal.
Dataset<String> stringsDS = sqlDF.map(
    (MapFunction<Row, String>) row -> "Key: " + row.get(0) + ", Value: " + row.get(1),
    Encoders.STRING());
stringsDS.show();
// +--------------------+
// |               value|
// +--------------------+
// |Key: 0, Value: val_0|
// |Key: 0, Value: val_0|
// |Key: 0, Value: val_0|
// ...

// You can also use DataFrames to create temporary views within a SparkSession.
List<Record> records = new ArrayList<>();
for (int key = 1; key < 100; key++) {
  Record record = new Record();
  record.setKey(key);
  record.setValue("val_" + key);
  records.add(record);
}
Dataset<Row> recordsDF = spark.createDataFrame(records, Record.class);
recordsDF.createOrReplaceTempView("records");

// Queries can then join DataFrames data with data stored in Hive.
spark.sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show();
// +---+------+---+------+
// |key| value|key| value|
// +---+------+---+------+
// |  2| val_2|  2| val_2|
// |  2| val_2|  2| val_2|
// |  4| val_4|  4| val_4|
// ...
python
from os.path import expanduser, join, abspath

from pyspark.sql import SparkSession
from pyspark.sql import Row

# warehouse_location points to the default location for managed databases and tables
warehouse_location = abspath('spark-warehouse')

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL Hive integration example") \
    .config("spark.sql.warehouse.dir", warehouse_location) \
    .enableHiveSupport() \
    .getOrCreate()

# spark is an existing SparkSession
spark.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive")
spark.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")

# Queries are expressed in HiveQL
spark.sql("SELECT * FROM src").show()
# +---+-------+
# |key|  value|
# +---+-------+
# |238|val_238|
# | 86| val_86|
# |311|val_311|
# ...

# Aggregation queries are also supported.
spark.sql("SELECT COUNT(*) FROM src").show()
# +--------+
# |count(1)|
# +--------+
# |    500 |
# +--------+

# The results of SQL queries are themselves DataFrames and support all normal functions.
sqlDF = spark.sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key")

# The items in DataFrames are of type Row, which allows you to access each column by ordinal.
stringsDS = sqlDF.rdd.map(lambda row: "Key: %d, Value: %s" % (row.key, row.value))
for record in stringsDS.collect():
    print(record)
# Key: 0, Value: val_0
# Key: 0, Value: val_0
# Key: 0, Value: val_0
# ...

# You can also use DataFrames to create temporary views within a SparkSession.
Record = Row("key", "value")
recordsDF = spark.createDataFrame([Record(i, "val_" + str(i)) for i in range(1, 101)])
recordsDF.createOrReplaceTempView("records")

# Queries can then join DataFrame data with data stored in Hive.
spark.sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show()
# +---+------+---+------+
# |key| value|key| value|
# +---+------+---+------+
# |  2| val_2|  2| val_2|
# |  4| val_4|  4| val_4|
# |  5| val_5|  5| val_5|
# ...
Spark SQL can read and write data in relational databases over a JDBC connection. This approach is preferable to using JdbcRDD from Spark Core, because the resulting DataFrame is easy to process.
scala
import java.util.Properties

// Note: JDBC loading and saving can be achieved via either the load/save or jdbc methods
// Loading data from a JDBC source
val jdbcDF = spark.read
  .format("jdbc")
  .option("url", "jdbc:postgresql:dbserver")
  .option("dbtable", "schema.tablename")
  .option("user", "username")
  .option("password", "password")
  .load()

val connectionProperties = new Properties()
connectionProperties.put("user", "username")
connectionProperties.put("password", "password")
val jdbcDF2 = spark.read
  .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)

// Saving data to a JDBC source
jdbcDF.write
  .format("jdbc")
  .option("url", "jdbc:postgresql:dbserver")
  .option("dbtable", "schema.tablename")
  .option("user", "username")
  .option("password", "password")
  .save()

jdbcDF2.write
  .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)

// Specifying create table column data types on write
jdbcDF.write
  .option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)")
  .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)
java
import java.util.Properties;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Note: JDBC loading and saving can be achieved via either the load/save or jdbc methods
// Loading data from a JDBC source
Dataset<Row> jdbcDF = spark.read()
  .format("jdbc")
  .option("url", "jdbc:postgresql:dbserver")
  .option("dbtable", "schema.tablename")
  .option("user", "username")
  .option("password", "password")
  .load();

Properties connectionProperties = new Properties();
connectionProperties.put("user", "username");
connectionProperties.put("password", "password");
Dataset<Row> jdbcDF2 = spark.read()
  .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties);

// Saving data to a JDBC source
jdbcDF.write()
  .format("jdbc")
  .option("url", "jdbc:postgresql:dbserver")
  .option("dbtable", "schema.tablename")
  .option("user", "username")
  .option("password", "password")
  .save();

jdbcDF2.write()
  .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties);

// Specifying create table column data types on write
jdbcDF.write()
  .option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)")
  .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties);
python
# Note: JDBC loading and saving can be achieved via either the load/save or jdbc methods
# Loading data from a JDBC source
jdbcDF = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:postgresql:dbserver") \
    .option("dbtable", "schema.tablename") \
    .option("user", "username") \
    .option("password", "password") \
    .load()

jdbcDF2 = spark.read \
    .jdbc("jdbc:postgresql:dbserver", "schema.tablename",
          properties={"user": "username", "password": "password"})

# Saving data to a JDBC source
jdbcDF.write \
    .format("jdbc") \
    .option("url", "jdbc:postgresql:dbserver") \
    .option("dbtable", "schema.tablename") \
    .option("user", "username") \
    .option("password", "password") \
    .save()

jdbcDF2.write \
    .jdbc("jdbc:postgresql:dbserver", "schema.tablename",
          properties={"user": "username", "password": "password"})

# Specifying create table column data types on write
jdbcDF.write \
    .option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)") \
    .jdbc("jdbc:postgresql:dbserver", "schema.tablename",
          properties={"user": "username", "password": "password"})
sql
CREATE TEMPORARY VIEW jdbcTable
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:postgresql:dbserver",
  dbtable "schema.tablename",
  user 'username',
  password 'password'
)

INSERT INTO TABLE jdbcTable
SELECT * FROM resultTable
That covers how to read and write data with Spark SQL. Some of these points are likely to come up in day-to-day work, and hopefully this article has helped you learn something new.