Before studying any specific Spark topic, make sure you understand Spark itself correctly; see: Understanding Spark Correctly
This article walks through the Java API for Spark key-value RDDs (JavaPairRDD).
I. Creating key-value RDDs
1. sparkContext.parallelizePairs
    // sc is an existing JavaSparkContext; Tuple2 is scala.Tuple2
    JavaPairRDD<String, Integer> javaPairRDD =
            sc.parallelizePairs(Arrays.asList(new Tuple2<>("test", 3), new Tuple2<>("kkk", 3)));
    // Result: [(test,3), (kkk,3)]
    System.out.println("javaPairRDD = " + javaPairRDD.collect());
2. Using keyBy
    public class User implements Serializable {
        private String userId;
        private Integer amount;

        public User(String userId, Integer amount) {
            this.userId = userId;
            this.amount = amount;
        }

        // Getter used by the keyBy function below
        public String getUserId() {
            return userId;
        }

        @Override
        public String toString() {
            return "User{" + "userId='" + userId + '\'' + ", amount=" + amount + '}';
        }
    }

    JavaRDD<User> userJavaRDD = sc.parallelize(Arrays.asList(new User("u1", 20)));
    // keyBy pairs each element with the key computed from it
    JavaPairRDD<String, User> userJavaPairRDD = userJavaRDD.keyBy(new Function<User, String>() {
        @Override
        public String call(User user) throws Exception {
            return user.getUserId();
        }
    });
    // Result: [(u1,User{userId='u1', amount=20})]
    System.out.println("userJavaPairRDD = " + userJavaPairRDD.collect());
3. Using zip
    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 1, 2, 3, 5, 8, 13));
    // Zipping two RDDs is another way to create a key-value RDD
    JavaPairRDD<Integer, Integer> zipPairRDD = rdd.zip(rdd);
    // Result: [(1,1), (1,1), (2,2), (3,3), (5,5), (8,8), (13,13)]
    System.out.println("zipPairRDD = " + zipPairRDD.collect());
4. Using groupBy
    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 1, 2, 3, 5, 8, 13));
    Function<Integer, Boolean> isEven = new Function<Integer, Boolean>() {
        @Override
        public Boolean call(Integer x) throws Exception {
            return x % 2 == 0;
        }
    };
    // Group the numbers into even and odd, producing a key-value RDD
    JavaPairRDD<Boolean, Iterable<Integer>> oddsAndEvens = rdd.groupBy(isEven);
    // Result: [(false,[1, 1, 3, 5, 13]), (true,[2, 8])]
    System.out.println("oddsAndEvens = " + oddsAndEvens.collect());
    // Result: 1
    System.out.println("oddsAndEvens.partitions.size = " + oddsAndEvens.partitions().size());

    // Same grouping, this time with an explicit number of partitions
    oddsAndEvens = rdd.groupBy(isEven, 2);
    // Result: [(false,[1, 1, 3, 5, 13]), (true,[2, 8])]
    System.out.println("oddsAndEvens = " + oddsAndEvens.collect());
    // Result: 2
    System.out.println("oddsAndEvens.partitions.size = " + oddsAndEvens.partitions().size());
II. combineByKey
    // Four records spread over 2 partitions
    JavaPairRDD<String, Integer> javaPairRDD = sc.parallelizePairs(Arrays.asList(
            new Tuple2<>("coffee", 1), new Tuple2<>("coffee", 2),
            new Tuple2<>("panda", 3), new Tuple2<>("coffee", 9)), 2);

    // Applied to a value when its key is seen for the first time within a partition
    Function<Integer, Tuple2<Integer, Integer>> createCombiner =
            new Function<Integer, Tuple2<Integer, Integer>>() {
        @Override
        public Tuple2<Integer, Integer> call(Integer value) throws Exception {
            return new Tuple2<>(value, 1);
        }
    };
    // Applied to a value when its key already has a combiner (created by createCombiner) in this partition
    Function2<Tuple2<Integer, Integer>, Integer, Tuple2<Integer, Integer>> mergeValue =
            new Function2<Tuple2<Integer, Integer>, Integer, Tuple2<Integer, Integer>>() {
        @Override
        public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> acc, Integer value) throws Exception {
            return new Tuple2<>(acc._1() + value, acc._2() + 1);
        }
    };
    // Applied when the combiners of different partitions have to be merged
    Function2<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> mergeCombiners =
            new Function2<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>() {
        @Override
        public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> acc1, Tuple2<Integer, Integer> acc2) throws Exception {
            return new Tuple2<>(acc1._1() + acc2._1(), acc1._2() + acc2._2());
        }
    };
    // Each key ends up with a (sum, count) pair
    JavaPairRDD<String, Tuple2<Integer, Integer>> combineByKeyRDD =
            javaPairRDD.combineByKey(createCombiner, mergeValue, mergeCombiners);
    // Result: [(coffee,(12,3)), (panda,(3,1))]
    System.out.println("combineByKeyRDD = " + combineByKeyRDD.collect());
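The data flow of combineByKey is as follows: within each partition, createCombiner is applied the first time a key is seen and mergeValue is applied to every later value of that key; mergeCombiners then merges the per-partition results of each key.
For a detailed explanation of how combineByKey works, see: Spark Core RDD API Principles Explained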
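The roles of the three functions passed to combineByKey can be traced without a cluster. The following is a minimal plain-Java sketch, not Spark code: the class CombineByKeySketch, the Pair helper, and the explicit two-partition layout are assumptions made for illustration, and an int[]{sum, count} array stands in for Tuple2<Integer, Integer>.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class CombineByKeySketch {

    // Hypothetical helper: one (key, value) record
    static final class Pair {
        final String key;
        final int value;
        Pair(String key, int value) { this.key = key; this.value = value; }
    }

    // The combiner is int[]{sum, count}
    static int[] createCombiner(int value) { return new int[] {value, 1}; }
    static int[] mergeValue(int[] acc, int value) { return new int[] {acc[0] + value, acc[1] + 1}; }
    static int[] mergeCombiners(int[] a, int[] b) { return new int[] {a[0] + b[0], a[1] + b[1]}; }

    // Step 1: combine the values of each key inside a single partition
    static Map<String, int[]> combinePartition(List<Pair> partition) {
        Map<String, int[]> combiners = new LinkedHashMap<>();
        for (Pair p : partition) {
            int[] acc = combiners.get(p.key);
            // First occurrence of the key: createCombiner; otherwise: mergeValue
            combiners.put(p.key, acc == null ? createCombiner(p.value) : mergeValue(acc, p.value));
        }
        return combiners;
    }

    // Step 2: merge the per-partition combiners of each key
    static Map<String, int[]> combineByKey(List<List<Pair>> partitions) {
        Map<String, int[]> result = new LinkedHashMap<>();
        for (List<Pair> partition : partitions) {
            for (Map.Entry<String, int[]> e : combinePartition(partition).entrySet()) {
                result.merge(e.getKey(), e.getValue(), CombineByKeySketch::mergeCombiners);
            }
        }
        return result;
    }

    // Same records as the Spark example, split into two assumed partitions
    static Map<String, int[]> demo() {
        List<List<Pair>> partitions = Arrays.asList(
                Arrays.asList(new Pair("coffee", 1), new Pair("coffee", 2)),
                Arrays.asList(new Pair("panda", 3), new Pair("coffee", 9)));
        return combineByKey(partitions);
    }

    public static void main(String[] args) {
        for (Map.Entry<String, int[]> e : demo().entrySet()) {
            // prints "coffee -> (12,3)" and then "panda -> (3,1)"
            System.out.println(e.getKey() + " -> (" + e.getValue()[0] + "," + e.getValue()[1] + ")");
        }
    }
}
```

In this layout the key coffee appears in both partitions, so its final (12,3) comes from one mergeCombiners call on (3,2) and (9,1), while panda never reaches mergeCombiners at all.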
III. aggregateByKey
    // Reuses javaPairRDD, mergeValue and mergeCombiners from the combineByKey example
    JavaPairRDD<String, Tuple2<Integer, Integer>> aggregateByKeyRDD =
            javaPairRDD.aggregateByKey(new Tuple2<>(0, 0), mergeValue, mergeCombiners);
    // Result: [(coffee,(12,3)), (panda,(3,1))]
    System.out.println("aggregateByKeyRDD = " + aggregateByKeyRDD.collect());

    // aggregateByKey is implemented on top of combineByKey;
    // the aggregateByKey call above is equivalent to the combineByKey call below
    Function<Integer, Tuple2<Integer, Integer>> createCombinerAggregateByKey =
            new Function<Integer, Tuple2<Integer, Integer>>() {
        @Override
        public Tuple2<Integer, Integer> call(Integer value) throws Exception {
            // Merge the first value of a key into the zero value (0, 0)
            return mergeValue.call(new Tuple2<>(0, 0), value);
        }
    };
    // Result: [(coffee,(12,3)), (panda,(3,1))]
    System.out.println(javaPairRDD.combineByKey(createCombinerAggregateByKey, mergeValue, mergeCombiners).collect());
IV. reduceByKey
    JavaPairRDD<String, Integer> reduceByKeyRDD = javaPairRDD.reduceByKey(new Function2<Integer, Integer, Integer>() {
        @Override
        public Integer call(Integer value1, Integer value2) throws Exception {
            return value1 + value2;
        }
    });
    // Result: [(coffee,12), (panda,3)]
    System.out.println("reduceByKeyRDD = " + reduceByKeyRDD.collect());

    // reduceByKey is also implemented on top of combineByKey;
    // the reduceByKey call above is equivalent to the combineByKey call below
    Function<Integer, Integer> createCombinerReduce = new Function<Integer, Integer>() {
        @Override
        public Integer call(Integer integer) throws Exception {
            // The first value of a key is used as the combiner unchanged
            return integer;
        }
    };
    Function2<Integer, Integer, Integer> mergeValueReduce = new Function2<Integer, Integer, Integer>() {
        @Override
        public Integer call(Integer integer, Integer integer2) throws Exception {
            return integer + integer2;
        }
    };
    // The same function serves as both mergeValue and mergeCombiners
    // Result: [(coffee,12), (panda,3)]
    System.out.println(javaPairRDD.combineByKey(createCombinerReduce, mergeValueReduce, mergeValueReduce).collect());
V. foldByKey
    JavaPairRDD<String, Integer> foldByKeyRDD = javaPairRDD.foldByKey(0, new Function2<Integer, Integer, Integer>() {
        @Override
        public Integer call(Integer integer, Integer integer2) throws Exception {
            return integer + integer2;
        }
    });
    // Result: [(coffee,12), (panda,3)]
    System.out.println("foldByKeyRDD = " + foldByKeyRDD.collect());

    // foldByKey is also implemented on top of combineByKey;
    // the foldByKey call above is equivalent to the combineByKey call below
    Function2<Integer, Integer, Integer> mergeValueFold = new Function2<Integer, Integer, Integer>() {
        @Override
        public Integer call(Integer integer, Integer integer2) throws Exception {
            return integer + integer2;
        }
    };
    Function<Integer, Integer> createCombinerFold = new Function<Integer, Integer>() {
        @Override
        public Integer call(Integer integer) throws Exception {
            // Fold the first value of a key into the zero value 0
            return mergeValueFold.call(0, integer);
        }
    };
    // Result: [(coffee,12), (panda,3)]
    System.out.println(javaPairRDD.combineByKey(createCombinerFold, mergeValueFold, mergeValueFold).collect());
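One consequence of the combineByKey equivalence is worth spelling out: because the zero value enters through createCombiner, and createCombiner runs for the first value of a key in each partition, the zero value can be folded in more than once per key. The plain-Java sketch below simulates that behavior; it is not Spark code, and the class FoldByKeySketch, the kv helper, and the explicit two-partition layout are assumptions made for illustration.

```java
import java.util.AbstractMap;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

public class FoldByKeySketch {

    // Hypothetical helper: builds one (key, value) record
    static Map.Entry<String, Integer> kv(String key, int value) {
        return new AbstractMap.SimpleImmutableEntry<>(key, value);
    }

    // Simulates foldByKey expressed as combineByKey over explicit partitions
    static Map<String, Integer> foldByKey(List<List<Map.Entry<String, Integer>>> partitions,
                                          int zero, BinaryOperator<Integer> func) {
        Map<String, Integer> result = new LinkedHashMap<>();
        for (List<Map.Entry<String, Integer>> partition : partitions) {
            Map<String, Integer> local = new LinkedHashMap<>();
            for (Map.Entry<String, Integer> e : partition) {
                Integer acc = local.get(e.getKey());
                // First value of a key in this partition: func(zero, value); later values: func(acc, value)
                local.put(e.getKey(), func.apply(acc == null ? zero : acc, e.getValue()));
            }
            // Merging across partitions uses the same function
            for (Map.Entry<String, Integer> e : local.entrySet()) {
                result.merge(e.getKey(), e.getValue(), func);
            }
        }
        return result;
    }

    // Same records as the Spark example, split into two assumed partitions
    static List<List<Map.Entry<String, Integer>>> samplePartitions() {
        return Arrays.asList(
                Arrays.asList(kv("coffee", 1), kv("coffee", 2)),
                Arrays.asList(kv("panda", 3), kv("coffee", 9)));
    }

    public static void main(String[] args) {
        // Neutral zero value: prints {coffee=12, panda=3}
        System.out.println(foldByKey(samplePartitions(), 0, Integer::sum));
        // Non-neutral zero value: prints {coffee=32, panda=13}
        System.out.println(foldByKey(samplePartitions(), 10, Integer::sum));
    }
}
```

With a zero value of 10, coffee picks it up in both partitions (12 + 10 + 10) and panda in one (3 + 10), so the result depends on how the data is partitioned. A zero value that is neutral for the function, such as 0 for addition, avoids this.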
VI. groupByKey
    JavaPairRDD<String, Iterable<Integer>> groupByKeyRDD = javaPairRDD.groupByKey();
    // Result: [(coffee,[1, 2, 9]), (panda,[3])]
    System.out.println("groupByKeyRDD = " + groupByKeyRDD.collect());

    // groupByKey is also implemented on top of combineByKey;
    // the groupByKey call above is equivalent to the combineByKey call below
    Function<Integer, List<Integer>> createCombinerGroup = new Function<Integer, List<Integer>>() {
        @Override
        public List<Integer> call(Integer integer) throws Exception {
            // Start a new list with the first value of the key
            List<Integer> list = new ArrayList<>();
            list.add(integer);
            return list;
        }
    };
    Function2<List<Integer>, Integer, List<Integer>> mergeValueGroup =
            new Function2<List<Integer>, Integer, List<Integer>>() {
        @Override
        public List<Integer> call(List<Integer> integers, Integer integer) throws Exception {
            // Append a later value of the key to the partition-local list
            integers.add(integer);
            return integers;
        }
    };
    Function2<List<Integer>, List<Integer>, List<Integer>> mergeCombinersGroup =
            new Function2<List<Integer>, List<Integer>, List<Integer>>() {
        @Override
        public List<Integer> call(List<Integer> integers, List<Integer> integers2) throws Exception {
            // Concatenate the lists built in different partitions
            integers.addAll(integers2);
            return integers;
        }
    };
    // Result: [(coffee,[1, 2, 9]), (panda,[3])]
    System.out.println(javaPairRDD.combineByKey(createCombinerGroup, mergeValueGroup, mergeCombinersGroup).collect());
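The principles behind these APIs are hard to convey fully in a written document. For a deeper and more thorough understanding of how they work, see: Spark Core RDD API Principles Explained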