真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

第19課:Spark高級排序徹底解密

本節(jié)課內(nèi)容:

創(chuàng)新互聯(lián)建站是一家集網(wǎng)站建設(shè),青岡企業(yè)網(wǎng)站建設(shè),青岡品牌網(wǎng)站建設(shè),網(wǎng)站定制,青岡網(wǎng)站建設(shè)報價,網(wǎng)絡(luò)營銷,網(wǎng)絡(luò)優(yōu)化,青岡網(wǎng)站推廣為一體的創(chuàng)新建站企業(yè),幫助傳統(tǒng)企業(yè)提升企業(yè)形象加強(qiáng)企業(yè)競爭力??沙浞譂M足這一群體相比中小企業(yè)更為豐富、高端、多元的互聯(lián)網(wǎng)需求。同時我們時刻保持專業(yè)、時尚、前沿,時刻以成就客戶成長自我,堅持不斷學(xué)習(xí)、思考、沉淀、凈化自己,讓我們?yōu)楦嗟钠髽I(yè)打造出實(shí)用型網(wǎng)站。

    1、基礎(chǔ)排序算法實(shí)戰(zhàn)

    2、二次排序算法實(shí)戰(zhàn)

    3、更高級別排序算法

    4、排序算法內(nèi)幕解密


排序在Spark運(yùn)用程序中使用的比較多,且維度也不一樣,如二次排序,三次排序等,在機(jī)器學(xué)習(xí)算法中經(jīng)常碰到,所以非常重要,必須掌握!

所謂二次排序,就是根據(jù)兩列值進(jìn)行排序,如下測試數(shù)據(jù):

2 3

4 1

3 2

4 3

8 7

2 1

經(jīng)過二次排序后的結(jié)果(升序):

2 1

2 3

3 2

4 1

4 3

8 7

在編寫二次排序代碼前,先簡單的寫下單個key排序的代碼:

val conf = new SparkConf().setAppName("SortByKey").setMaster("local")

val sc = new SparkContext(conf)

val lines = sc.textFile("C:\\User\\Test.txt")

valwords = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)

val wordcount = words.map(word=>(word._2,word._1)).sortByKey(false).map(word=>(word._2,word._1))

wordcount.collect().foreach(println)

以上就是簡單的wordcount程序,程序中使用了sortByKey排序

下面我們通過代碼實(shí)現(xiàn)二次排序算法

首先我們先通過Java代碼實(shí)現(xiàn)上面測試數(shù)據(jù)進(jìn)行二次排序

排序最主要的就是Key的準(zhǔn)備,我們先用Java編寫二次排序的key,參考代碼如下:

import java.io.Serializable;

import scala.math.Ordered;

public class SecondarySortKey implements Ordered, Serializable {

    private int first;

    private int second;

    @Override

    public int hashCode() {

        final int prime = 31;

        int result = 1;

        result = prime * result + first;

        result = prime * result + second;

        return result;

    }

    @Override

    public boolean equals(Object obj) {

        if (this == obj)

            return true;

        if (obj == null)

            return false;

        if (getClass() != obj.getClass())

            return false;

        SecondarySortKey other = (SecondarySortKey) obj;

        if (first != other.first)

            return false;

        if (second != other.second)

            return false;

        return true;

    }

    public int getFirst() {

        return first;

    }

    public void setFirst(int first) {

        this.first = first;

    }

    public int getSecond() {

        return second;

    }

    public void setSecond(int second) {

        this.second = second;

    }

    public SecondarySortKey(int first, int second) {

        this.first = first;

        this.second = second;

    }

    public boolean $greater(SecondarySortKey other) {

        if (this.first > other.getFirst()) {

            return true;

        } else if (this.first == other.getFirst() && this.second > other.getSecond()) {

            return true;

        }

        return false;

    }

    public boolean $greater$eq(SecondarySortKey other) {

        if (this.$greater(other)) {

            return true;

        } else if (this.first == other.getFirst() && this.second == other.getSecond()) {

            return true;

        }

        return false;

    }

    public boolean $less(SecondarySortKey other) {

        if (this.first < other.getFirst()) {

            return true;

        } else if (this.first == other.getFirst() && this.second < other.getSecond()) {

            return true;

        }

        return false;

    }

    public boolean $less$eq(SecondarySortKey other) {

        if (this.$less(other)) {

            return true;

        } else if (this.first == other.getFirst() && this.second < other.getSecond()) {

            return true;

        }

        return false;

    }

    public int compare(SecondarySortKey other) {

        if (this.first - other.getFirst() != 0) {

            return this.first - other.getFirst();

        } else {

            return this.second - other.getSecond();

        }

    }

    public int compareTo(SecondarySortKey other) {

        if (this.first - other.getFirst() != 0) {

            return this.first - other.getFirst();

        } else {

            return this.second - other.getSecond();

        }

    }

根據(jù)上面生成的排序key編寫對測試數(shù)據(jù)的二次排序

import org.apache.spark.SparkConf;

import org.apache.spark.api.java.JavaPairRDD;

import org.apache.spark.api.java.JavaRDD;

import org.apache.spark.api.java.JavaSparkContext;

import org.apache.spark.api.java.function.Function;

import org.apache.spark.api.java.function.PairFunction;

import org.apache.spark.api.java.function.VoidFunction;

import scala.Tuple2;

/**

 * DT_Spark大數(shù)據(jù)夢工廠

 * 二次排序,具體的實(shí)現(xiàn)步驟: 

 * 第一步:按照Ordered和Serializable接口實(shí)現(xiàn)自定義排序的key 

 * 第二步:將要進(jìn)行二次排序的文件加載進(jìn)來生成類型的RDD 

 * 第三步:使用sortByKey基于自定義的Key進(jìn)行二次排序 

 * 第四步:去除掉排序的Key,只保留排序的結(jié)果

 */

public class SecondarySortKeyApp {

    public static void main(String[] args) {

        SparkConf conf = new SparkConf().setAppName("SecondarySortKeyApp").setMaster("local");

        JavaSparkContext sc = new JavaSparkContext(conf);

        JavaRDD lines = sc.textFile("C:\\Users\\Test.txt");

        

        //將自定義的key添加進(jìn)來

        JavaPairRDD pairs = lines

                .mapToPair(new PairFunction() {

                    private static final long serialVersionUID = 1L;

                    public Tuple2 call(String line) throws Exception {

                        String[] splited = line.split(" ");

                        SecondarySortKey key = new SecondarySortKey(Integer.valueOf(splited[0]),

                                Integer.valueOf(splited[1]));

                        return new Tuple2(key, line);

                    }

                });

        

        //根據(jù)我們自定義的key進(jìn)行升序排序

        JavaPairRDD sorted = pairs.sortByKey(); //sortByKey(false) 降序

        // 過濾掉排序后的自定義的Key,保留排序的結(jié)果

        JavaRDD secondarySort = sorted.map(new Function, String>() {

            public String call(Tuple2 sortedContent) throws Exception {

                return sortedContent._2;

            }

        });

        secondarySort.foreach(new VoidFunction() {

            public void call(String sorted) throws Exception {

                System.out.println(sorted);

            }

        });

    }

運(yùn)行結(jié)果:

2 1

2 3

3 2

4 1

4 3

8 7

下面我通過Scala方式實(shí)現(xiàn)上述二次排序,scala代碼非常簡潔

先創(chuàng)建我們自定義排序key

/**

*DT_Spark大數(shù)據(jù)夢工廠
* 自定義二次排序的key
*/
class SecondarySortKey(val first: Int, val second: Int) extends Ordered[SecondarySortKey] with Serializable {
def compare(other: SecondarySortKey): Int = {
if (this.first - other.first != 0) {
  this.first - other.first
}
else {
  this.second - other.second
}
}

根據(jù)自定義排序Key實(shí)現(xiàn)二次排序

import org.apache.spark.{SparkConf, SparkContext}

/**
* 二次排序,具體的實(shí)現(xiàn)步驟:
* 第一步:按照Ordered和Serializable接口實(shí)現(xiàn)自定義排序的key
* 第二步:將要進(jìn)行的二次排序的文件加載進(jìn)來生成類型的RDD
* 第三步:使用sortByKey基于自定義的Key進(jìn)行二次排序 第
* 四步:去除掉排序的Key,只保留排序的結(jié)果
*/

object

SecondarySortKeyApp {

def main(args: Array[String]) {
val conf = new SparkConf().setAppName("SecondarySortKeyApp").setMaster("local");
val sc = new SparkContext(conf)
val lines = sc.textFile("C:\\Users\\Test.txt")//添加key,組合成(key,value)格式
val pairWithSortKey = lines.map(line => (new SecondarySortKey(line.split(" ")(0).toInt,line.split(" ")(1).toInt),line))
//格式:Tuple2(key,value)
val sorted = pairWithSortKey.sortByKey()

//過濾掉key,只保留value
val sortedResult = sorted.map(sort => sort._2)

//顯示結(jié)果
sortedResult.collect().foreach(println)

}

}

運(yùn)行結(jié)果:

2 1

2 3

3 2

4 1

4 3

8 7

從上面的代碼可以看出,通過scala代碼實(shí)現(xiàn)二次排序確實(shí)非常簡潔,這也是scala的強(qiáng)大之處所在。

更高級別排序算法和內(nèi)幕解密在后續(xù)課程中在分享。

備注:

資料來源于:DT_大數(shù)據(jù)夢工廠

更多私密內(nèi)容,請關(guān)注微信公眾號:DT_Spark

如果您對大數(shù)據(jù)Spark感興趣,可以免費(fèi)聽由王家林老師每天晚上20:00開設(shè)的Spark永久免費(fèi)公開課,地址YY房間號:68917580


網(wǎng)頁名稱:第19課:Spark高級排序徹底解密
本文地址:http://weahome.cn/article/iehhjh.html

其他資訊

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部