12 數(shù)據(jù)格式
10年積累的成都網(wǎng)站設(shè)計(jì)、網(wǎng)站建設(shè)經(jīng)驗(yàn),可以快速應(yīng)對(duì)客戶對(duì)網(wǎng)站的新想法和需求。提供各種問題對(duì)應(yīng)的解決方案。讓選擇我們的客戶得到更好、更有力的網(wǎng)絡(luò)服務(wù)。我雖然不認(rèn)識(shí)你,你也不認(rèn)識(shí)我。但先網(wǎng)站制作后付款的網(wǎng)站建設(shè)流程,更有沾化免費(fèi)網(wǎng)站建設(shè)讓你可以放心的選擇與我們合作。
[[u'3', u'5'], [u'4', u'6'], [u'4', u'5'], [u'4', u'2']] 拆分或截取的原始數(shù)據(jù), 可以通過 map 中的 x[0], x[1] 來獲取對(duì)應(yīng)列的數(shù)據(jù)
可以通過 map 來轉(zhuǎn)換為key-value 數(shù)據(jù)格式 例如: df3 = df2.map(lambda x: (x[0], x[1]))
key-value 數(shù)據(jù)格式
[(u'3', u'5'), (u'4', u'6'), (u'4', u'5'), (u'4', u'2')] 中每一個(gè)() 表示一組數(shù)據(jù), 第一個(gè)表示key 第二個(gè)表示value
3)PipelinedRDD 類型表示 key-value形式數(shù)據(jù)
13 RDD類型轉(zhuǎn)換
userRdd = sc.textFile("D:\data\people.json")
userRdd = userRdd.map(lambda x: x.split(" "))
userRows = userRdd.map(lambda p:
Row(
userName = p[0],
userAge = int(p[1]),
userAdd = p[2],
userSalary = int(p[3])
)
)
print(userRows.take(4))
結(jié)果: [Row(userAdd='shanghai', userAge=20, userName='zhangsan', userSalary=13), Row(userAdd='beijin', userAge=30, userName='lisi', userSalary=15)]
2) 創(chuàng)建 DataFrame
userDF = sqlContext.createDataFrame(userRows)
if name== 'main':
spark = SparkSession.builder.config(conf = SparkConf()).getOrCreate()
sc = spark.sparkContext
rd = sc.textFile("D:\data\people.txt")
rd2 = rd.map(lambda x:x.split(","))
people = rd2.map(lambda p: Row(name=p[0], age=int(p[1])))
peopleDF = spark.createDataFrame(people)
peopleDF.createOrReplaceTempView("people")
teenagers = spark.sql("SELECT name,age FROM people where name='Andy'")
teenagers.show(5)
teenNames = teenagers.rdd.map(lambda p: 100 + p.age).collect()
for name in teenNames:
print(name)
15 dateFrame,sql,json使用詳細(xì)示例
#
#
#
#
"""
A simple example demonstrating basic Spark SQL features.
Run with:
./bin/spark-submit examples/src/main/python/sql/basic.py
"""
from futureimport print_function
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql.types import *
def basic_df_example(spark):
# spark is an existing SparkSession
df = spark.read.json("/data/people.json")
# Displays the content of the DataFrame to stdout
df.show()
# +----+-------+
# | age| name|
# +----+-------+
# |null|Michael|
# | 30| Andy|
# | 19| Justin|
# +----+-------+
# $example off:create_df$
# $example on:untyped_ops$
# spark, df are from the previous example
# Print the schema in a tree format
df.printSchema()
# root
# |-- age: long (nullable = true)
# |-- name: string (nullable = true)
# Select only the "name" column
df.select("name").show()
# +-------+
# | name|
# +-------+
# |Michael|
# | Andy|
# | Justin|
# +-------+
# Select everybody, but increment the age by 1
df.select(df['name'], df['age'] + 1).show()
# +-------+---------+
# | name|(age + 1)|
# +-------+---------+
# |Michael| null|
# | Andy| 31|
# | Justin| 20|
# +-------+---------+
# Select people older than 21
df.filter(df['age'] > 21).show()
# +---+----+
# |age|name|
# +---+----+
# | 30|Andy|
# +---+----+
# Count people by age
df.groupBy("age").count().show()
# +----+-----+
# | age|count|
# +----+-----+
# | 19| 1|
# |null| 1|
# | 30| 1|
# +----+-----+
# $example off:untyped_ops$
# $example on:run_sql$
# Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("people")
sqlDF = spark.sql("SELECT * FROM people")
sqlDF.show()
# +----+-------+
# | age| name|
# +----+-------+
# |null|Michael|
# | 30| Andy|
# | 19| Justin|
# +----+-------+
# $example off:run_sql$
# $example on:global_temp_view$
# Register the DataFrame as a global temporary view
df.createGlobalTempView("people")
# Global temporary view is tied to a system preserved database `global_temp`
spark.sql("SELECT * FROM global_temp.people").show()
# +----+-------+
# | age| name|
# +----+-------+
# |null|Michael|
# | 30| Andy|
# | 19| Justin|
# +----+-------+
# Global temporary view is cross-session
spark.newSession().sql("SELECT * FROM global_temp.people").show()
# +----+-------+
# | age| name|
# +----+-------+
# |null|Michael|
# | 30| Andy|
# | 19| Justin|
# +----+-------+
# $example off:global_temp_view$
def schema_inference_example(spark):
sc = spark.sparkContext
# Load a text file and convert each line to a Row.
lines = sc.textFile("examples/src/main/resources/people.txt")
parts = lines.map(lambda l: l.split(","))
people = parts.map(lambda p: Row(name=p[0], age=int(p[1])))
# Infer the schema, and register the DataFrame as a table.
schemaPeople = spark.createDataFrame(people)
schemaPeople.createOrReplaceTempView("people")
# SQL can be run over DataFrames that have been registered as a table.
teenagers = spark.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
# The results of SQL queries are Dataframe objects.
# rdd returns the content as an :class:`pyspark.RDD` of :class:`Row`.
teenNames = teenagers.rdd.map(lambda p: "Name: " + p.name).collect()
for name in teenNames:
print(name)
# Name: Justin
# $example off:schema_inferring$
def programmatic_schema_example(spark):
sc = spark.sparkContext
# Load a text file and convert each line to a Row.
lines = sc.textFile("examples/src/main/resources/people.txt")
parts = lines.map(lambda l: l.split(","))
# Each line is converted to a tuple.
people = parts.map(lambda p: (p[0], p[1].strip()))
# The schema is encoded in a string.
schemaString = "name age"
fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()]
schema = StructType(fields)
# Apply the schema to the RDD.
schemaPeople = spark.createDataFrame(people, schema)
# Creates a temporary view using the DataFrame
schemaPeople.createOrReplaceTempView("people")
# SQL can be run over DataFrames that have been registered as a table.
results = spark.sql("SELECT name FROM people")
results.show()
# +-------+
# | name|
# +-------+
# |Michael|
# | Andy|
# | Justin|
# +-------+
# $example off:programmatic_schema$
if name== "main":
spark = SparkSession \
.builder \
.appName("Python Spark SQL basic example") \
.config("spark.some.config.option", "some-value") \
.getOrCreate()
# $example off:init_session$
basic_df_example(spark)
# schema_inference_example(spark)
# programmatic_schema_example(spark)
spark.stop()