真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

sparlsql有哪些

這篇文章給大家分享的是有關(guān)sparl sql有哪些的內(nèi)容。小編覺得挺實用的,因此分享給大家做個參考,一起跟隨小編過來看看吧。

為永豐等地區(qū)用戶提供了全套網(wǎng)頁設(shè)計制作服務(wù),及永豐網(wǎng)站建設(shè)行業(yè)解決方案。主營業(yè)務(wù)為網(wǎng)站設(shè)計、成都網(wǎng)站制作、永豐網(wǎng)站設(shè)計,以傳統(tǒng)方式定制建設(shè)網(wǎng)站,并提供域名空間備案等一條龍服務(wù),秉承以專業(yè)、用心的態(tài)度為用戶提供真誠的服務(wù)。我們深信只要達到每一位用戶的要求,就會得到認可,從而選擇與我們長期合作。這樣,我們也可以走得更遠!

1、讀取json格式的文件創(chuàng)建DataFrame

java (spark1.6)

public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.setMaster("local").setAppName("javaSpark01");
        SparkContext sc = new SparkContext(conf);

        SQLContext sqlContext = new SQLContext(sc);
//        Dataset df = sqlContext.read().format("json").load("G:/idea/scala/spark02/json");
        Dataset df2 = sqlContext.read().json("G:/idea/scala/spark02/json");
        df2.show();

        //樹形的形式顯示schema信息
        df2.printSchema();

        //注冊臨時表 將DataFrame注冊成臨時的一張表,這張表臨時注冊到內(nèi)存中,是邏輯上的表,不會霧化到磁盤
        df2.registerTempTable("baidukt_table");
        Dataset sql = sqlContext.sql("select * from baidukt_table");
        sql.show();
        Dataset sql1 = sqlContext.sql("select age,count(1) from baidukt_table group by age");
        sql1.show();
 }

scala(spark 1.6)

def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setMaster("local").setAppName("Spark08 1.6")
     val sc = new SparkContext(conf)
    val sqlContext: SQLContext = new SQLContext(sc)

    val df = sqlContext.read.format("json").load("G:/idea/scala/spark02/json")
//    val df1 = sqlContext.read.json("G:/idea/scala/spark02/json")
    //顯示前50行數(shù)據(jù)
    df.show(50)
    //樹形的形式顯示schema信息
    df.printSchema()
    //注冊臨時表
    df.registerTempTable("baidukt_com_table")
    val result = sqlContext.sql("select age,count(1) from baidukt_com_table group by age")
    result.show()
    val result1 = sqlContext.sql("select * from baidukt_com_table")
    result1.show()
    sc.stop()
  }

java (spark 2.0++)

public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local").setAppName("Spark 2.0 ++");
        SparkSession spark = SparkSession.builder().config(conf).getOrCreate();
        Dataset df = spark.read().json("G:/idea/scala/spark02/json");
//        Dataset df1 = spark.read().format("json").load("G:/idea/scala/spark02/json");
        df.show();
        df.printSchema();

        df.createOrReplaceGlobalTempView("baidu_com_spark2");
        Dataset resut = spark.sql("select * from baidu_com_spark2");
        resut.show();
        spark.stop();
    }

scala(spark 2.0++)

  def main(args: Array[String]): Unit = {
    //用戶的當(dāng)前工作目錄
//    val  location = System.setProperties("user.dir","spark_2.0"
    val conf = new SparkConf().setAppName("Spark08 2.0++").setMaster("local[3]")
    val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()

    //數(shù)據(jù)導(dǎo)入方式
    val df: DataFrame = spark.read.json("G:/idea/scala/spark02/json")
//    val df1: DataFrame = spark.read.format("json").load("G:/idea/scala/spark02/json")
    //查看表
    df.show()
    //查看表
    df.printSchema()

    //直接使用spark SQL進行查詢
    //先注冊為臨時表
    //createOrReplaceTempView:創(chuàng)建臨時視圖,此視圖的生命周期與用于創(chuàng)建此數(shù)據(jù)集的[SparkSession]相關(guān)聯(lián)。
    //createGlobalTempView:創(chuàng)建全局臨時視圖,此時圖的生命周期與Spark Application綁定。
    df.createOrReplaceTempView("people")
    val result: DataFrame = spark.sql("select * from people")
    result.show()
    spark.stop()
  }

2、通過json格式的RDD創(chuàng)建DataFrame

java(spark 1.6)

public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.setMaster("local").setAppName("jsonRDD");
        JavaSparkContext sc = new JavaSparkContext(conf);
        SQLContext sqlContext = new SQLContext(sc);
        JavaRDD data = sc.parallelize(Arrays.asList(
                "{\"name\":\"zhangsan\",\"score\":\"100\"}",
                "{\"name\":\"lisi\",\"score\":\"200\"}",
                "{\"name\":\"wangwu\",\"score\":\"300\"}"
        ));
        Dataset df = sqlContext.read().json(data);
        df.show();
        df.printSchema();

        df.createOrReplaceTempView("baidu_com_spark2");
        Dataset resut = sqlContext.sql("select * from baidu_com_spark2");
        resut.show();
        sc.stop();
    }

scala(spark 1.6)

def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("spark10")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    val data: RDD[String] = sc.parallelize(Array(
      "{\"name\":\"zhangsan\",\"age\":18}",
      "{\"name\":\"lisi\",\"age\":19}",
      "{\"name\":\"wangwu\",\"age\":20}"
    ))
    val df = sqlContext.read.json(data)
    df.show()
    df.printSchema()
    df.createOrReplaceTempView("baidukt_com_spark1.6")
    val result = sqlContext.sql("select * from baidukt_com_spark1.6")
    result.show()
    result.printSchema()
    sc.stop()
  }

3、非json格式的RDD創(chuàng)建DataFrame

3.1 通過反射的方式將非json格式的RDD轉(zhuǎn)換成DataFrame(不推薦,所以不復(fù)制代碼過來了)

3.2、態(tài)創(chuàng)建Schema將非json格式的RDD轉(zhuǎn)換成DataFrame

4、讀取parquet文件創(chuàng)建DataFrame(多次io 不推薦)

5、讀取JDBC中的數(shù)據(jù)創(chuàng)建DataFrame(MySQL為例)

java(spark 1.6)

   public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.setMaster("local").setAppName("mysql");
        JavaSparkContext sc = new JavaSparkContext(conf);
        SQLContext sqlContext = new SQLContext(sc);
        /**
         * 第一種方式讀取MySql數(shù)據(jù)庫表,加載為DataFrame
         */
        Map options = new HashMap();
        options.put("url", "jdbc:mysql://localhost:3306/spark");//連接地址和數(shù)據(jù)庫名稱
        options.put("driver", "com.mysql.jdbc.Driver");//驅(qū)動
        options.put("user", "root");//用戶名
        options.put("password", "admin");//密碼
        options.put("dbtable", "person");//表
        Dataset person = sqlContext.read().format("jdbc").options(options).load();
        person.show();
        //注冊臨時表
        person.registerTempTable("person");
        /**
         * 第二種方式讀取MySql數(shù)據(jù)表加載為DataFrame
         */
        DataFrameReader reader = sqlContext.read().format("jdbc");
        reader.option("url", "jdbc:mysql://localhost:3306/spark");
        reader.option("driver", "com.mysql.jdbc.Driver");
        reader.option("user", "root");
        reader.option("password", "admin");
        reader.option("dbtable", "score");
        Dataset score = reader.load();
        score.show();
        score.registerTempTable("score");

        Dataset result = sqlContext.sql("select person.id,person.name,score.score from person,score where person.name = score.name");
        result.show();
        /**
         * 將DataFrame結(jié)果保存到Mysql中
         */
        Properties properties = new Properties();
        properties.setProperty("user", "root");
        properties.setProperty("password", "admin");
        result.write().mode(SaveMode.Overwrite).jdbc("jdbc:mysql://localhost:3306/spark", "result", properties);
        sc.stop();
    }

scala (spark 1.6)

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setMaster("local").setAppName("mysql")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    /**
      * 第一種方式讀取Mysql數(shù)據(jù)庫表創(chuàng)建DF
      */
    val options = new mutable.HashMap[String,String]();
    options.put("url", "jdbc:mysql://localhost:3306/spark")
    options.put("driver","com.mysql.jdbc.Driver")
    options.put("user","root")
    options.put("password", "admin")
    options.put("dbtable","person")
    val person = sqlContext.read.format("jdbc").options(options).load()
    person.show()
    person.registerTempTable("person")
    /**
      * 第二種方式讀取Mysql數(shù)據(jù)庫表創(chuàng)建DF
      */
    val reader = sqlContext.read.format("jdbc")
    reader.option("url", "jdbc:mysql://localhost:3306/spark")
    reader.option("driver","com.mysql.jdbc.Driver")
    reader.option("user","root")
    reader.option("password","admin")
    reader.option("dbtable", "score")
    val score = reader.load()
    score.show()
    score.registerTempTable("score")
    val result = sqlContext.sql("select person.id,person.name,score.score from person,score where person.name = score.name")
    result.show()
    /**
      * 將數(shù)據(jù)寫入到Mysql表中
      */
    val properties = new Properties()
    properties.setProperty("user", "root")
    properties.setProperty("password", "admin")
    result.write.mode(SaveMode.Append).jdbc("jdbc:mysql://localhost:3306/spark", "result", properties)
    sc.stop()
  }

6、讀取Hive中的數(shù)據(jù)加載成DataFrame

HiveContext是SQLContext的子類,連接Hive建議使用HiveContext。

java(spark 1.6)

public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.setAppName("hive");
        JavaSparkContext sc = new JavaSparkContext(conf);
        //HiveContext是SQLContext的子類。
        HiveContext hiveContext = new HiveContext(sc);
        hiveContext.sql("USE spark");
        hiveContext.sql("DROP TABLE IF EXISTS student_infos");
        //在hive中創(chuàng)建student_infos表
        hiveContext.sql("CREATE TABLE IF NOT EXISTS student_infos (name STRING,age INT) row format delimited fields terminated by '\t' ");
        hiveContext.sql("load data local inpath '/root/test/student_infos' into table student_infos");

        hiveContext.sql("DROP TABLE IF EXISTS student_scores");
        hiveContext.sql("CREATE TABLE IF NOT EXISTS student_scores (name STRING, score INT) row format delimited fields terminated by '\t'");
        hiveContext.sql("LOAD DATA LOCAL INPATH '/root/test/student_scores INTO TABLE student_scores");
        /**
         * 查詢表生成DataFrame
         */
        Dataset goodStudentsDF = hiveContext.sql("SELECT si.name, si.age, ss.score FROM student_infos si JOIN student_scores ss ON si.name=ss.name WHERE ss.score>=80");
        hiveContext.sql("DROP TABLE IF EXISTS good_student_infos");
        goodStudentsDF.registerTempTable("goodstudent");
        Dataset result = hiveContext.sql("select * from goodstudent");
        result.show();
        result.printSchema();

        /**
         * 將結(jié)果保存到hive表 good_student_infos
         */
        goodStudentsDF.write().mode(SaveMode.Overwrite).saveAsTable("good_student_infos");

        Row[] goodStudentRows = hiveContext.table("good_student_infos").collect();
        for(Row goodStudentRow : goodStudentRows) {
            System.out.println(goodStudentRow);
        }
        sc.stop();
    }

scala(spark 1.6)

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setAppName("HiveSource")
    val sc = new SparkContext(conf)
    /**
      * HiveContext是SQLContext的子類。
      */
    val hiveContext = new HiveContext(sc)
    hiveContext.sql("use spark")
    hiveContext.sql("drop table if exists student_infos")
    hiveContext.sql("create table if not exists student_infos (name string,age int) row format  delimited fields terminated by '\t'")
    hiveContext.sql("load data local inpath '/root/test/student_infos' into table student_infos")

    hiveContext.sql("drop table if exists student_scores")
    hiveContext.sql("create table if not exists student_scores (name string,score int) row format delimited fields terminated by '\t'")
    hiveContext.sql("load data local inpath '/root/test/student_scores' into table student_scores")

    val df = hiveContext.sql("select si.name,si.age,ss.score from student_infos si,student_scores ss where si.name = ss.name")
    hiveContext.sql("drop table if exists good_student_infos")
    /**
      * 將結(jié)果寫入到hive表中
      */
    df.write.mode(SaveMode.Overwrite).saveAsTable("good_student_infos")
    sc.stop()
  }

7、自定義udf

scala(spark 1.6)

 def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("spark13")
    val spark = SparkSession.builder().config(conf).getOrCreate()
    //rdd轉(zhuǎn)df
    val rdd: RDD[String] = spark.sparkContext.parallelize(Array("zhangsan","wangwu","masi"))
    val rowRDD: RDD[Row] = rdd.map(RowFactory.create(_))
    val schema = DataTypes.createStructType(Array(StructField("name",StringType,true)))
    val df: DataFrame = spark.sqlContext.createDataFrame(rowRDD,schema)

    df.show(50)
    
    df.printSchema()

    df.createOrReplaceTempView("test")
    //自定義udf函數(shù) 函數(shù)名為StrLen,參數(shù)為String、Int String有問題,網(wǎng)上說需要java.lang.String類型 
//    spark.sqlContext.udf.register("StrLen",(s:String,i:Int)=>{s.length+i})
//    spark.sqlContext.udf.register("StrLen",(i:Int)=>{i})
//    spark.sql("select name ,StrLen(name,10) as length from test").show(20)
    spark.sql("select name ,StrLen(10) as length from test").show(20)
  }

java (spark 1.6)

  public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.setMaster("local");
        conf.setAppName("udf");
        JavaSparkContext sc = new JavaSparkContext(conf);
        SQLContext sqlContext = new SQLContext(sc);
        JavaRDD parallelize = sc.parallelize(Arrays.asList("zhansan","lisi","wangwu"));
        JavaRDD rowRDD = parallelize.map(new Function() {

            private static final long serialVersionUID = 1L;

            @Override
            public Row call(String s) throws Exception {
                return RowFactory.create(s);
            }
        });

        List fields = new ArrayList();
        fields.add(DataTypes.createStructField("name", DataTypes.StringType,true));

        StructType schema = DataTypes.createStructType(fields);
        Dataset df = sqlContext.createDataFrame(rowRDD, schema);
        df.registerTempTable("user");

        /**
         * 根據(jù)UDF函數(shù)參數(shù)的個數(shù)來決定是實現(xiàn)哪一個UDF  UDF1,UDF2。。。。UDF1xxx
         */
        sqlContext.udf().register("StrLen", new UDF1() {

            private static final long serialVersionUID = 1L;

            @Override
            public Integer call(String t1) throws Exception {
                return t1.length();
            }
        }, DataTypes.IntegerType);
        sqlContext.sql("select name ,StrLen(name) as length from user").show();
        sc.stop();
    }

感謝各位的閱讀!關(guān)于“sparl sql有哪些”這篇文章就分享到這里了,希望以上內(nèi)容可以對大家有一定的幫助,讓大家可以學(xué)到更多知識,如果覺得文章不錯,可以把它分享出去讓更多的人看到吧!


網(wǎng)頁標(biāo)題:sparlsql有哪些
轉(zhuǎn)載來源:http://weahome.cn/article/jhpjoc.html

其他資訊

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部