真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

sparkRDD算子的創(chuàng)建和使用-創(chuàng)新互聯(lián)

spark是大數(shù)據(jù)領(lǐng)域近幾年比較火的編程開發(fā)語言。有眾多的好處,比如速度快,基于內(nèi)存式計算框架。

創(chuàng)新互聯(lián)建站服務(wù)項目包括樂至網(wǎng)站建設(shè)、樂至網(wǎng)站制作、樂至網(wǎng)頁制作以及樂至網(wǎng)絡(luò)營銷策劃等。多年來,我們專注于互聯(lián)網(wǎng)行業(yè),利用自身積累的技術(shù)優(yōu)勢、行業(yè)經(jīng)驗、深度合作伙伴關(guān)系等,向廣大中小型企業(yè)、政府機構(gòu)等提供互聯(lián)網(wǎng)行業(yè)的解決方案,樂至網(wǎng)站推廣取得了明顯的社會效益與經(jīng)濟效益。目前,我們服務(wù)的客戶以成都為中心已經(jīng)輻射到樂至省份的部分城市,未來相信會繼續(xù)擴大服務(wù)區(qū)域并繼續(xù)獲得客戶的支持與信任!

不多說直接講 spark的RDD 算子的使用。

如果有spark環(huán)境搭建等問題,請自行查找資料。本文不做講述。

spark rdd的創(chuàng)建有兩種方式:

1>從集合創(chuàng)建。也就是從父rdd繼承過來

2>從外部創(chuàng)建。

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;

import com.google.common.base.Optional;

import scala.Tuple2;

public class Demo01 {

	public static void main(String[] args) {
		
		SparkConf conf = new SparkConf().setAppName("Demo01").setMaster("local");
		JavaSparkContext jsc = new JavaSparkContext(conf);
		
		//map(jsc);
		//filter(jsc);
	    // flatMap(jsc);
		//groupByKey(jsc);
		//reduceByKey(jsc);
		//sortByKey(jsc);
		//join(jsc);
		leftOutJoin(jsc);
		jsc.stop();
	}

	//每一條元素 都乘以2,并且打印
	private static void map(JavaSparkContext jsc) {
		
		//數(shù)據(jù)源
		List lst = Arrays.asList(1,2,3,4,5,6,7,8);
		
		JavaRDD numRDD = jsc.parallelize(lst);
		
		JavaRDD resultRDD = numRDD.map(new Function() {
			private static final long serialVersionUID = 1L;

			@Override
			public Integer call(Integer num) throws Exception {
				
				return num * 2;
			}
		});
		
		resultRDD.foreach(new VoidFunction() {
			
			private static final long serialVersionUID = 1L;

			@Override
			public void call(Integer num) throws Exception {		
				System.out.println(num);
			}
		});
		 
	}
	
	// 把集合中的偶數(shù)過濾出來
	private static void filter(JavaSparkContext jsc) {
	
		//數(shù)據(jù)源
		List lst = Arrays.asList(1,2,3,4,5,6,7,8);
		
		JavaRDD numRDD = jsc.parallelize(lst);
		
		System.out.println(numRDD.filter(new Function() {
			private static final long serialVersionUID = 1L;

			@Override
			public Boolean call(Integer num) throws Exception {
				
				return num % 2 ==0;
			}
		}).collect());
	}

	//將一行行數(shù)據(jù)的單詞拆分為一個個單詞
	private static void flatMap(JavaSparkContext jsc) {
		
		List lst = Arrays.asList("hi tim ","hello girl","hello spark");
		
		JavaRDD lines = jsc.parallelize(lst);
		
		JavaRDD resultRDD = lines.flatMap(new FlatMapFunction() {

			private static final long serialVersionUID = 1L;

			@Override
			public Iterable call(String line) throws Exception {
			
				return Arrays.asList(line.split(" "));
			}
		});
		
		System.out.println(resultRDD.collect());
	}

	// 根據(jù)班級進行分組
	private static void groupByKey(JavaSparkContext jsc) {
		// int ,Integer 
		// scala 里面的類型,沒有像Java這樣分為基本類型和包裝類,因為scala是一種更加強的面向?qū)ο笳Z言,
		//一切皆對象,里面的類型,也有對應(yīng)的方法可以調(diào)用,隱式轉(zhuǎn)換
		// 模擬數(shù)據(jù)
		@SuppressWarnings("unchecked")
		List> lst = Arrays.asList(
				new Tuple2("class01", 100),
				new Tuple2("class02",101),
				new Tuple2("class01",199),
				new Tuple2("class02",121),
				new Tuple2("class02",120));
		
		JavaPairRDD cla***DD = jsc.parallelizePairs(lst);
		JavaPairRDD> groupedRDD = cla***DD.groupByKey();
		
		groupedRDD.foreach(new VoidFunction>>() {
			private static final long serialVersionUID = 1L;
			@Override
			public void call(Tuple2> tuple)
					throws Exception {
				
				String classKey = tuple._1;
				Iterator values = tuple._2.iterator();
				while (values.hasNext()) {
					
					Integer value = values.next();
					
					System.out.println("key:" + classKey + "\t" + "value:" + value);
				}
			}
		});
	}
	
	
	private static void reduceByKey(JavaSparkContext jsc) {
		
		@SuppressWarnings("unchecked")
		List> lst = Arrays.asList(
				new Tuple2("class01", 100),
				new Tuple2("class02",101),
				new Tuple2("class01",199),
				new Tuple2("class02",121),
				new Tuple2("class02",120));
		
		JavaPairRDD cla***DD = jsc.parallelizePairs(lst);
		
		JavaPairRDD resultRDD = cla***DD.reduceByKey(new Function2() {
			private static final long serialVersionUID = 1L;

			@Override
			public Integer call(Integer v1, Integer v2) throws Exception {
				
				return v1 + v2;
			}
		});
		
		resultRDD.foreach(new VoidFunction>() {
			private static final long serialVersionUID = 1L;

			@Override
			public void call(Tuple2 tuple) throws Exception {
				System.out.println("key:" + tuple._1 + "\t" + "value:" + tuple._2);
				
			}
		});
	}
	// 把學(xué)生的成績前3名取出來,并打印
	// 1.先排序sortByKey,然后take(3),再foreach
	private static void sortByKey(JavaSparkContext jsc) {
		
		@SuppressWarnings("unchecked")
		List> lst = Arrays.asList(
				new Tuple2("tom", 60),
				new Tuple2("kate",80),
				new Tuple2("kobe",100),
				new Tuple2("馬蓉",4),
				new Tuple2("宋哲",2),
				new Tuple2("白百合",3),
				new Tuple2("隔壁老王",1));
		
		JavaPairRDD cla***DD = jsc.parallelizePairs(lst);
		
		JavaPairRDD pairRDD = cla***DD.mapToPair(new PairFunction,Integer , String>() {
			
			private static final long serialVersionUID = 1L;

			@Override
			public Tuple2 call(Tuple2 tuple)
					throws Exception {
				
				return new Tuple2(tuple._2, tuple._1);
			}
		});
		//do no 
		JavaPairRDD sortedRDD = pairRDD.sortByKey();
		JavaPairRDD sortedRDD01 = sortedRDD.mapToPair(new PairFunction, String, Integer>() {

			private static final long serialVersionUID = 1L;

			@Override
			public Tuple2 call(Tuple2 tuple)
					throws Exception {
				
				return new Tuple2(tuple._2, tuple._1);
			}
		} );
		// take 也是一個action操作
		List> result = sortedRDD01.take(3);
		System.out.println(result);
	}
	
	
	private static void join(JavaSparkContext jsc) {
		
		// 模擬數(shù)據(jù)
		@SuppressWarnings("unchecked")
		List> names =Arrays.asList(
				new Tuple2(1,"jack"),
				new Tuple2(2,"rose"),
				new Tuple2(3,"tom"),
				new Tuple2(4,"趙麗穎"));
		
		JavaPairRDD num2NamesRDD = jsc.parallelizePairs(names);
	
		List> scores = Arrays.asList(
				new Tuple2(1,60),
				new Tuple2(4,100),
				new Tuple2(2,30));	
		
		JavaPairRDD num2scoresRDD = jsc.parallelizePairs(scores);
		
		JavaPairRDD> joinedRDD = num2scoresRDD.join(num2NamesRDD);
		
		//姓名成績排序,取前2名
		JavaPairRDD score2NameRDD = joinedRDD.mapToPair(new PairFunction>,Integer, String>() {
			private static final long serialVersionUID = 1L;

			@Override
			public Tuple2 call(
					Tuple2> tuple)
					throws Exception {
				Integer score = tuple._2._1;
				String name = tuple._2._2;
				return new Tuple2(score,name);
			}
		});
		// sortByKey之后,你可以執(zhí)行一個maptoPair的操作,轉(zhuǎn)換為
		System.out.println(score2NameRDD.sortByKey(false).take(2));
	}
	
	// 學(xué)生成績改良版
	private static void leftOutJoin(JavaSparkContext jsc) {
		// 模擬數(shù)據(jù)
				@SuppressWarnings("unchecked")
				List> names =Arrays.asList(
						new Tuple2(1,"jack"),
						new Tuple2(2,"rose"),
						new Tuple2(3,"tom"),
						new Tuple2(4,"趙麗穎"));
				
				JavaPairRDD num2NamesRDD = jsc.parallelizePairs(names);
			
				List> scores = Arrays.asList(
						new Tuple2(1,60),
						new Tuple2(4,100),
						new Tuple2(2,30));	
				
				JavaPairRDD num2scoresRDD = jsc.parallelizePairs(scores);
		
				// num2scoresRDD num2NamesRDD
				//JavaPairRDD>> joinedRDD = num2NamesRDD.leftOuterJoin(num2scoresRDD);
				// 注意join,誰join誰,沒區(qū)別,但是leftoutjoin 是有順序的
				JavaPairRDD>> joinedRDD = num2NamesRDD.leftOuterJoin(num2scoresRDD);
				
				JavaPairRDD pairRDD = joinedRDD.mapToPair(new PairFunction>>, Integer, String>() {
					private static final long serialVersionUID = 1L;

					@Override
					public Tuple2 call(
							Tuple2>> tuple)
							throws Exception {
						
						String name = tuple._2._1;
						Optional scoreOptional = tuple._2._2;
						Integer score = null;
				         if(scoreOptional.isPresent()){
				        	score= scoreOptional.get();	 
				         }else {
				        	 score = 0;
				         }
						
						return new Tuple2(score, name);
					}
				});
				
				JavaPairRDD sortedRDD = pairRDD.sortByKey(false);
				
				sortedRDD.foreach(new VoidFunction>() {
					private static final long serialVersionUID = 1L;

					@Override
					public void call(Tuple2 tuple)
							throws Exception {
					
						if(tuple._1 == 0){
							System.out.println("name:" + tuple._2 + "\t" + "要努力了,你的成績0分" );
						}else{
							System.out.println("姓名:" + tuple._2 + "\t" + "分?jǐn)?shù):" + tuple._1);
						}
					}
				});
				
	}
}

如有疑問可跟帖討論。歡迎拍磚

另外有需要云服務(wù)器可以了解下創(chuàng)新互聯(lián)scvps.cn,海內(nèi)外云服務(wù)器15元起步,三天無理由+7*72小時售后在線,公司持有idc許可證,提供“云服務(wù)器、裸金屬服務(wù)器、高防服務(wù)器、香港服務(wù)器、美國服務(wù)器、虛擬主機、免備案服務(wù)器”等云主機租用服務(wù)以及企業(yè)上云的綜合解決方案,具有“安全穩(wěn)定、簡單易用、服務(wù)可用性高、性價比高”等特點與優(yōu)勢,專為企業(yè)上云打造定制,能夠滿足用戶豐富、多元化的應(yīng)用場景需求。


網(wǎng)站名稱:sparkRDD算子的創(chuàng)建和使用-創(chuàng)新互聯(lián)
網(wǎng)址分享:http://weahome.cn/article/dcsjcj.html

其他資訊

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部