真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

spark2.x由淺入深深到底系列六之RDDjavaapi詳解二

在學(xué)習(xí)Spark前,建議先正確理解spark,可以參考:正確理解spark

創(chuàng)新互聯(lián)建站專(zhuān)注于企業(yè)營(yíng)銷(xiāo)型網(wǎng)站、網(wǎng)站重做改版、容城網(wǎng)站定制設(shè)計(jì)、自適應(yīng)品牌網(wǎng)站建設(shè)、H5高端網(wǎng)站建設(shè)、商城網(wǎng)站建設(shè)、集團(tuán)公司官網(wǎng)建設(shè)、外貿(mào)網(wǎng)站建設(shè)、高端網(wǎng)站制作、響應(yīng)式網(wǎng)頁(yè)設(shè)計(jì)等建站業(yè)務(wù),價(jià)格優(yōu)惠性價(jià)比高,為容城等各大城市提供網(wǎng)站開(kāi)發(fā)制作服務(wù)。

本篇對(duì)JavaRDD基本的action api進(jìn)行了詳細(xì)的描述

先定義兩個(gè)Comparator實(shí)現(xiàn),一個(gè)是實(shí)現(xiàn)升序,一個(gè)是實(shí)現(xiàn)降序

//升序排序比較器
private static class AscComparator implements Comparator, Serializable {

    @Override
    public int compare(java.lang.Integer o1, java.lang.Integer o2) {
        return o1 - o2;
    }

}
//降序排序比較器
private static class DescComparator implements Comparator, Serializable {

    @Override
    public int compare(java.lang.Integer o1, java.lang.Integer o2) {
        return o2 - o1;
    }
}

再定義一個(gè)RDD:

JavaRDD listRDD = sc.parallelize(Arrays.asList(1, 2, 4, 3, 3, 6), 2);

一、collect、take、top、first

//結(jié)果: [1, 2, 4, 3, 3, 6] 將RDD的所有數(shù)據(jù)收集到driver端來(lái),用于小數(shù)據(jù)或者實(shí)驗(yàn),
// 對(duì)大數(shù)據(jù)量的RDD進(jìn)行collect會(huì)出現(xiàn)driver端內(nèi)存溢出
System.out.println("collect = " + listRDD.collect());
//結(jié)果:[1, 2]  將RDD前面兩個(gè)元素收集到j(luò)ava端
//take的原理大致為:先看看RDD第一個(gè)分區(qū)的元素夠不夠我們想take的數(shù)量
//不夠的話再根據(jù)剩余的需要take的數(shù)據(jù)量來(lái)估算需要掃描多少個(gè)分區(qū)的數(shù)據(jù),直到take到了我們想要的數(shù)據(jù)個(gè)數(shù)為止
System.out.println("take(2) = " + listRDD.take(2));
//結(jié)果:[6, 4]  取RDD升序的最大的兩個(gè)元素
System.out.println("top(2) = " + listRDD.top(2));
//結(jié)果:[1, 2] 取RDD降序的最大的兩個(gè)元素(即取RDD最小的兩個(gè)元素)
System.out.println("DescComparator top(2) = " + listRDD.top(2, new DescComparator()));
//結(jié)果:1  其底層實(shí)現(xiàn)就是take(1)
System.out.println("first = " + listRDD.first());

二、min、max

//結(jié)果:1。 按照升序取最小值,就是RDD的最小值
System.out.println("min = " + listRDD.min(new AscComparator()));
//結(jié)果:6   按照降序取最小值,就是RDD的最大值
System.out.println("min = " + listRDD.min(new DescComparator()));
//結(jié)果:6   按照升序取最大值,就是RDD的最大值
System.out.println("max = " + listRDD.max(new AscComparator()));
//結(jié)果:1   按照降序取最大值,就是RDD的最小值
System.out.println("max = " + listRDD.max(new DescComparator()));

min和max的底層是用reduce api來(lái)實(shí)現(xiàn)的,下面是偽代碼

min()  == reduce((x, y) => if (x <= y) x else y)
max()  == redcue((x, y) => if (x >= y) x else y)

對(duì)于reduce api我們見(jiàn)下面的講解

三、takeOrdered

//結(jié)果:[1, 2] 返回該RDD最小的兩個(gè)元素
System.out.println("takeOrdered(2) = " + listRDD.takeOrdered(2));
//結(jié)果:[1, 2] 返回RDD按照升序的前面兩個(gè)元素,即返回該RDD最小的兩個(gè)元素
System.out.println("takeOrdered(2)  = " + listRDD.takeOrdered(2, new AscComparator()));
//結(jié)果:[6, 4] 返回RDD按照降序的前面兩個(gè)元素,即返回該RDD最大的兩個(gè)元素
System.out.println("takeOrdered(2)  = " + listRDD.takeOrdered(2, new DescComparator()));

四、foreach和foreachPartition

foreach是對(duì)RDD每一個(gè)元素應(yīng)用自定義的函數(shù),而foreachPartition是對(duì)RDD的每一個(gè)partition應(yīng)用自定義的函數(shù),使用時(shí)需要注意下面的建議

先定義一個(gè)比較耗時(shí)的操作:

public static Integer getInitNumber(String source) {
    System.out.println("get init number from " + source + ", may be take much time........");
    try {
        TimeUnit.SECONDS.sleep(1);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return 1;
}
listRDD.foreach(new VoidFunction() {
    @Override
    public void call(Integer element) throws Exception {
        //這個(gè)性能太差,遍歷每一個(gè)元素的時(shí)候都需要調(diào)用比較耗時(shí)的getInitNumber
        //建議采用foreachPartition來(lái)代替foreach操作
        Integer initNumber = getInitNumber("foreach");
        System.out.println((element + initNumber) + "=========");
    }
});

listRDD.foreachPartition(new VoidFunction>() {
    @Override
    public void call(Iterator integerIterator) throws Exception {
        //和foreach api的功能是一樣,只不過(guò)一個(gè)是將函數(shù)應(yīng)用到每一條記錄,這個(gè)是將函數(shù)應(yīng)用到每一個(gè)partition
        //如果有一個(gè)比較耗時(shí)的操作,只需要每一分區(qū)執(zhí)行一次這個(gè)操作就行,則用這個(gè)函數(shù)
        //這個(gè)耗時(shí)的操作可以是連接數(shù)據(jù)庫(kù)等操作,不需要計(jì)算每一條時(shí)候去連接數(shù)據(jù)庫(kù),一個(gè)分區(qū)只需連接一次就行
        Integer initNumber = getInitNumber("foreach");
        while (integerIterator.hasNext()) {
            System.out.println((integerIterator.next() + initNumber) + "=========");
        }
    }
});

五、reduce 和 treeReduce

Integer reduceResult = listRDD.reduce(new Function2() {
    @Override
    public Integer call(Integer ele1, Integer ele2) throws Exception {
        return ele1 + ele2;
    }
});
//結(jié)果:19
System.out.println("reduceResult = " + reduceResult);

Integer treeReduceResult = listRDD.treeReduce(new Function2() {
    @Override
    public Integer call(Integer integer, Integer integer2) throws Exception {
        return integer + integer2;
    }
}, 3); //這個(gè)3表示做3次聚合才計(jì)算出結(jié)果
//結(jié)果:19
System.out.println("treeReduceResult = " + treeReduceResult);

它們倆的結(jié)果是一樣的,但是執(zhí)行流程不一樣,如下流程:

spark2.x由淺入深深到底系列六之RDD java api詳解二

如果分區(qū)數(shù)太多的話,使用treeReduce做多次聚合,可以提高性能,如下:

spark2.x由淺入深深到底系列六之RDD java api詳解二

六、fold

fold其實(shí)和reduce的功能類(lèi)似,只不過(guò)fold多了一個(gè)初始值而已

//和reduce的功能類(lèi)似,只不過(guò)是在計(jì)算每一個(gè)分區(qū)的時(shí)候需要加上初始值0,最后再將每一個(gè)分區(qū)計(jì)算出來(lái)的值相加再加上這個(gè)初始值
Integer foldResult = listRDD.fold(0, new Function2() {
    @Override
    public Integer call(Integer integer, Integer integer2) throws Exception {
        return integer + integer2;
    }
});
//結(jié)果:19
System.out.println("foldResult = " + foldResult);

七、aggregate 和 treeAggregate

//先初始化一個(gè)我們想要的返回的數(shù)據(jù)類(lèi)型的初始值
//然后在每一個(gè)分區(qū)對(duì)每一個(gè)元素應(yīng)用函數(shù)一(acc, value) => (acc._1 + value, acc._2 + 1)進(jìn)行聚合
//最后將每一個(gè)分區(qū)生成的數(shù)據(jù)應(yīng)用函數(shù)(acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2)進(jìn)行聚合
Tuple2 aggregateResult = listRDD.aggregate(new Tuple2(0, 0),
        new Function2, Integer, Tuple2>() {
            @Override
            public Tuple2 call(Tuple2 acc, Integer integer) throws Exception {
                return new Tuple2<>(acc._1 + integer, acc._2 + 1);
            }
        }, new Function2, Tuple2, Tuple2>() {
            @Override
            public Tuple2 call(Tuple2 acc1, Tuple2 acc2) throws Exception {
                return new Tuple2<>(acc1._1 + acc2._1, acc1._2 + acc2._2);
            }
        });
//結(jié)果:(19,6)
System.out.println("aggregateResult = " + aggregateResult);

Tuple2 treeAggregateResult = listRDD.treeAggregate(new Tuple2(0, 0),
        new Function2, Integer, Tuple2>() {
            @Override
            public Tuple2 call(Tuple2 acc, Integer integer) throws Exception {
                return new Tuple2<>(acc._1 + integer, acc._2 + 1);
            }
        }, new Function2, Tuple2, Tuple2>() {
            @Override
            public Tuple2 call(Tuple2 acc1, Tuple2 acc2) throws Exception {
                return new Tuple2<>(acc1._1 + acc2._1, acc1._2 + acc2._2);
            }
        }, 2);
//結(jié)果:(19,6)
System.out.println("treeAggregateResult = " + treeAggregateResult);

兩者的結(jié)果是一致的,只不過(guò)執(zhí)行流程不一樣,如下是aggregate的執(zhí)行流程:

spark2.x由淺入深深到底系列六之RDD java api詳解二

如果RDD的分區(qū)數(shù)非常多的話,建議使用treeAggregate,如下是treeAggregate的執(zhí)行流程:

spark2.x由淺入深深到底系列六之RDD java api詳解二

aggregate和treeAggregate的比較:

1: aggregate在combine上的操作,時(shí)間復(fù)雜度為O(n). treeAggregate的時(shí)間復(fù)雜度為O(lgn)。

n表示分區(qū)數(shù)

2: aggregate把數(shù)據(jù)全部拿到driver端,存在內(nèi)存溢出的風(fēng)險(xiǎn)。treeAggregate則不會(huì)。

3:aggregate 比 treeAggregate在最后結(jié)果的reduce操作時(shí),多使用了一次初始值

對(duì)于以上api的原理層面的講解,可以參考spark core RDD api原理詳解,因?yàn)橛梦淖种v清楚原理性的東西是一件比較困難的事情,看了后記得也不深入


新聞名稱:spark2.x由淺入深深到底系列六之RDDjavaapi詳解二
轉(zhuǎn)載來(lái)于:http://weahome.cn/article/pgjhee.html

其他資訊

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部