真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

spark2.x由淺入深深到底系列六之RDDjavaapi詳解三

學(xué)習任何spark知識點之前請先正確理解spark,可以參考:正確理解spark

創(chuàng)新互聯(lián)建站是一家專注于成都做網(wǎng)站、網(wǎng)站建設(shè)與策劃設(shè)計,開福網(wǎng)站建設(shè)哪家好?創(chuàng)新互聯(lián)建站做網(wǎng)站,專注于網(wǎng)站建設(shè)10多年,網(wǎng)設(shè)計領(lǐng)域的專業(yè)建站公司;建站業(yè)務(wù)涵蓋:開福等地區(qū)。開福做網(wǎng)站價格咨詢:18982081108

本文詳細介紹了spark key-value類型的rdd java api

一、key-value類型的RDD的創(chuàng)建方式

1、sparkContext.parallelizePairs

JavaPairRDD javaPairRDD =
        sc.parallelizePairs(Arrays.asList(new Tuple2("test", 3), new Tuple2("kkk", 3)));
//結(jié)果:[(test,3), (kkk,3)]
System.out.println("javaPairRDD = " + javaPairRDD.collect());

2、keyBy的方式

public class User implements Serializable {
    private String userId;

    private Integer amount;

    public User(String userId, Integer amount) {
        this.userId = userId;
        this.amount = amount;
    }

    @Override
    public String toString() {
        return "User{" +
                "userId='" + userId + '\'' +
                ", amount=" + amount +
                '}';
    }
}

JavaRDD userJavaRDD = sc.parallelize(Arrays.asList(new User("u1", 20)));
JavaPairRDD userJavaPairRDD = userJavaRDD.keyBy(new Function() {
    @Override
    public String call(User user) throws Exception {
        return user.getUserId();
    }
});
//結(jié)果:[(u1,User{userId='u1', amount=20})]
System.out.println("userJavaPairRDD = " + userJavaPairRDD.collect());

3、zip的方式

JavaRDD rdd = sc.parallelize(Arrays.asList(1, 1, 2, 3, 5, 8, 13));
//兩個rdd zip也是創(chuàng)建key-value類型RDD的一種方式
JavaPairRDD zipPairRDD = rdd.zip(rdd);
//結(jié)果:[(1,1), (1,1), (2,2), (3,3), (5,5), (8,8), (13,13)]
System.out.println("zipPairRDD = " + zipPairRDD.collect());

4、groupBy的方式

JavaRDD rdd = sc.parallelize(Arrays.asList(1, 1, 2, 3, 5, 8, 13));
Function isEven = new Function() {
    @Override
    public Boolean call(Integer x) throws Exception {
        return x % 2 == 0;
    }
};
//將偶數(shù)和奇數(shù)分組,生成key-value類型的RDD
JavaPairRDD> oddsAndEvens = rdd.groupBy(isEven);
//結(jié)果:[(false,[1, 1, 3, 5, 13]), (true,[2, 8])]
System.out.println("oddsAndEvens = " + oddsAndEvens.collect());
//結(jié)果:1
System.out.println("oddsAndEvens.partitions.size = " + oddsAndEvens.partitions().size());

oddsAndEvens = rdd.groupBy(isEven, 2);
//結(jié)果:[(false,[1, 1, 3, 5, 13]), (true,[2, 8])]
System.out.println("oddsAndEvens = " + oddsAndEvens.collect());
//結(jié)果:2
System.out.println("oddsAndEvens.partitions.size = " + oddsAndEvens.partitions().size());

二、combineByKey

JavaPairRDD javaPairRDD =
        sc.parallelizePairs(Arrays.asList(new Tuple2("coffee", 1), new Tuple2("coffee", 2),
                new Tuple2("panda", 3), new Tuple2("coffee", 9)), 2);

//當在一個分區(qū)中遇到新的key的時候,對這個key對應(yīng)的value應(yīng)用這個函數(shù)
Function> createCombiner = new Function>() {
    @Override
    public Tuple2 call(Integer value) throws Exception {
        return new Tuple2<>(value, 1);
    }
};
//當在一個分區(qū)中遇到已經(jīng)應(yīng)用過上面createCombiner函數(shù)的key的時候,對這個key對應(yīng)的value應(yīng)用這個函數(shù)
Function2, Integer, Tuple2> mergeValue =
        new Function2, Integer, Tuple2>() {
            @Override
            public Tuple2 call(Tuple2 acc, Integer value) throws Exception {
                return new Tuple2<>(acc._1() + value, acc._2() + 1);
            }
        };
//當需要對不同分區(qū)的數(shù)據(jù)進行聚合的時候應(yīng)用這個函數(shù)
Function2, Tuple2, Tuple2> mergeCombiners =
        new Function2, Tuple2, Tuple2>() {
            @Override
            public Tuple2 call(Tuple2 acc1, Tuple2 acc2) throws Exception {
                return new Tuple2<>(acc1._1() + acc2._1(), acc1._2() + acc2._2());
            }
        };

JavaPairRDD> combineByKeyRDD =
        javaPairRDD.combineByKey(createCombiner, mergeValue, mergeCombiners);
//結(jié)果:[(coffee,(12,3)), (panda,(3,1))]
System.out.println("combineByKeyRDD = " + combineByKeyRDD.collect());

combineByKey的數(shù)據(jù)流如下:

spark2.x由淺入深深到底系列六之RDD java api詳解三

對于combineByKey的原理講解詳細見: spark core RDD api原理詳解

三、aggregateByKey

JavaPairRDD> aggregateByKeyRDD =
        javaPairRDD.aggregateByKey(new Tuple2<>(0, 0), mergeValue, mergeCombiners);
//結(jié)果:[(coffee,(12,3)), (panda,(3,1))]
System.out.println("aggregateByKeyRDD = " + aggregateByKeyRDD.collect());
//aggregateByKey是由combineByKey實現(xiàn)的,上面的aggregateByKey就是等于下面的combineByKeyRDD
Function> createCombinerAggregateByKey =
        new Function>() {
            @Override
            public Tuple2 call(Integer value) throws Exception {
                return mergeValue.call(new Tuple2<>(0, 0), value);
            }
        };
//結(jié)果是: [(coffee,(12,3)), (panda,(3,1))]
System.out.println(javaPairRDD.combineByKey(createCombinerAggregateByKey, mergeValue, mergeCombiners).collect());

四、reduceByKey

JavaPairRDD reduceByKeyRDD = javaPairRDD.reduceByKey(new Function2() {
    @Override
    public Integer call(Integer value1, Integer value2) throws Exception {
        return value1 + value2;
    }
});
//結(jié)果:[(coffee,12), (panda,3)]
System.out.println("reduceByKeyRDD = " + reduceByKeyRDD.collect());
//reduceByKey底層也是combineByKey實現(xiàn)的,上面的reduceByKey等于下面的combineByKey
Function createCombinerReduce = new Function() {
    @Override
    public Integer call(Integer integer) throws Exception {
        return integer;
    }
};
Function2 mergeValueReduce =
        new Function2() {
            @Override
            public Integer call(Integer integer, Integer integer2) throws Exception {
                return integer + integer2;
            }
        };
//結(jié)果:[(coffee,12), (panda,3)]
System.out.println(javaPairRDD.combineByKey(createCombinerReduce, mergeValueReduce, mergeValueReduce).collect());

五、foldByKey

JavaPairRDD foldByKeyRDD = javaPairRDD.foldByKey(0, new Function2() {
    @Override
    public Integer call(Integer integer, Integer integer2) throws Exception {
        return integer + integer2;
    }
});
//結(jié)果:[(coffee,12), (panda,3)]
System.out.println("foldByKeyRDD = " + foldByKeyRDD.collect());
//foldByKey底層也是combineByKey實現(xiàn)的,上面的foldByKey等于下面的combineByKey
Function2 mergeValueFold =
        new Function2() {
            @Override
            public Integer call(Integer integer, Integer integer2) throws Exception {
                return integer + integer2;
            }
        };

Function createCombinerFold = new Function() {
    @Override
    public Integer call(Integer integer) throws Exception {
        return mergeValueFold.call(0, integer);
    }
};
//結(jié)果:[(coffee,12), (panda,3)]
System.out.println(javaPairRDD.combineByKey(createCombinerFold, mergeValueFold, mergeValueFold).collect());

六、groupByKey

JavaPairRDD> groupByKeyRDD = javaPairRDD.groupByKey();
//結(jié)果:[(coffee,[1, 2, 9]), (panda,[3])]
System.out.println("groupByKeyRDD = " + groupByKeyRDD.collect());
//groupByKey底層也是combineByKey實現(xiàn)的,上面的groupByKey等于下面的combineByKey
Function> createCombinerGroup = new Function>() {
    @Override
    public List call(Integer integer) throws Exception {
        List list = new ArrayList<>();
        list.add(integer);
        return list;
    }
};
Function2, Integer, List> mergeValueGroup = new Function2, Integer, List>() {
    @Override
    public List call(List integers, Integer integer) throws Exception {
        integers.add(integer);
        return integers;
    }
};
Function2, List, List> mergeCombinersGroup =
        new Function2, List, List>() {
            @Override
            public List call(List integers, List integers2) throws Exception {
                integers.addAll(integers2);
                return integers;
            }
        };
//結(jié)果:[(coffee,[1, 2, 9]), (panda,[3])]
System.out.println(javaPairRDD.combineByKey(createCombinerGroup, mergeValueGroup, mergeCombinersGroup).collect());

對于api原理性的東西很難用文檔說明清楚,如果想更深入,更透徹的理解api的原理,可以參考: spark core RDD api原理詳解


文章題目:spark2.x由淺入深深到底系列六之RDDjavaapi詳解三
地址分享:http://weahome.cn/article/jsegjo.html

其他資訊

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部