這篇文章將為大家詳細講解有關怎么理解spark的自定義分區和排序及spark與jdbc,文章內容質量較高,因此小編分享給大家做個參考,希望大家閱讀完這篇文章后對相關知識有一定的了解。
讓客戶滿意是我們工作的目標,不斷超越客戶的期望值來自于我們對這個行業的熱愛。我們立志把好的技術通過有效、簡單的方式提供給客戶,將通過不懈努力成為客戶在信息化領域值得信任、有價值的長期合作伙伴,公司提供的服務項目有:國際域名空間、網絡空間、營銷軟件、網站建設、崇川網站維護、網站推廣。
// Custom partitioning
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.Partitioner

/** Demo: repartitions a small pair RDD with [[CustomPartitioner]] and saves every partition as text. */
object PrimitivePartitionTest {

  /**
   * Entry point.
   *
   * @param args optional; `args(0)` overrides the output directory (defaults to `c:\partitioner`).
   */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf
    conf.setMaster("local[2]").setAppName("Partitioner")
    val context = new SparkContext(conf)
    try {
      val rdd = context.parallelize(List(("hgs", 2), ("wd", 44), ("cm", 99), ("zz", 100), ("xzhh", 67)), 2)
      // Instantiate the custom partitioner and repartition the pair RDD by key with it.
      val partitioner = new CustomPartitioner(2)
      val rdd1 = rdd.partitionBy(partitioner)
      rdd1.saveAsTextFile(args.headOption.getOrElse("c:\\partitioner"))
    } finally {
      // Stop the context even if the job fails so local resources are released.
      context.stop()
    }
  }
}

/**
 * Custom partitioner extending Spark's [[Partitioner]]: keys whose string form has at most two
 * characters go to partition 0, all other keys go to partition 1.
 *
 * @param partitions number of partitions; must be positive. With a single partition every key
 *                   maps to 0, so the returned index always lies in `[0, numPartitions)`.
 */
class CustomPartitioner(val partitions: Int) extends Partitioner {
  require(partitions > 0, s"Number of partitions ($partitions) must be positive.")

  def numPartitions: Int = this.partitions

  def getPartition(key: Any): Int = {
    // A null key has no toString; treat it as a "short" key instead of throwing an NPE.
    val bucket = if (key == null || key.toString.length <= 2) 0 else 1
    // Clamp so the index never exceeds numPartitions - 1 (relevant when partitions == 1).
    math.min(bucket, numPartitions - 1)
  }

  // Spark compares partitioners to decide whether a shuffle can be skipped, so two
  // CustomPartitioners with the same partition count must compare equal.
  override def equals(other: Any): Boolean = other match {
    case p: CustomPartitioner => p.numPartitions == numPartitions
    case _                    => false
  }

  override def hashCode(): Int = numPartitions
}
// Custom sorting
package hgs.spark.othertest

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import scala.math.Ordered

/**
 * Sort key, approach 1: extend [[Ordered]] so instances compare by `age`.
 * Serializable because Spark ships sort keys to executors.
 */
class Student(val name: String, var age: Int) extends Ordered[Student] with Serializable {
  // Integer.compare avoids the overflow that `this.age - that.age` has for extreme values.
  def compare(that: Student): Int = Integer.compare(this.age, that.age)
}

/** Sort key for approach 2: a plain class whose ordering is supplied implicitly by [[MyPredef]]. */
class Boy(val name: String, var age: Int) extends Serializable

/** Approach 2: an implicit `Ordering[Boy]` that callers bring into scope with `import MyPredef._`. */
object MyPredef {
  // The explicit type is required for implicits in Scala 3 and recommended in Scala 2.
  // Ordering.by delegates to Ordering.Int, so there is no subtraction overflow.
  implicit val toOrderBoy: Ordering[Boy] = Ordering.by[Boy, Int](_.age)
}

// Bring the implicit Ordering[Boy] into scope for sortBy below.
import MyPredef._

/** Demo: sorts (name, age) pairs by age in descending order using the implicit Boy ordering. */
object CutstomOrder {

  /**
   * Entry point.
   *
   * @param args optional; `args(0)` overrides the output directory (defaults to `d:\ordered`).
   */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setMaster("local[2]").setAppName("CutstomOrder")
    val context = new SparkContext(conf)
    try {
      val rdd = context.parallelize(List(("hgs", 2), ("wd", 44), ("cm", 99), ("zz", 100), ("xzhh", 67)), 2)
      // ascending = false sorts in descending order; numPartitions = 1 produces a single output part.
      //val rddSorted = rdd.sortBy(f => new Student(f._1, f._2), ascending = false, numPartitions = 1)
      val rddSorted = rdd.sortBy(f => new Boy(f._1, f._2), ascending = false, numPartitions = 1)
      rddSorted.saveAsTextFile(args.headOption.getOrElse("d:\\ordered"))
    } finally {
      // Stop the context even if the job fails so local resources are released.
      context.stop()
    }
  }
}
// JDBC
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd.JdbcRDD
import java.sql.Connection
import java.sql.DriverManager
import java.sql.ResultSet
import scala.collection.mutable.ListBuffer

/** Demo: reads (name, age) rows from MySQL into Spark with [[JdbcRDD]] and prints them. */
object DataFromJdbcToSpark {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setMaster("local[2]").setAppName("DataFromJdbcToSpark")
    val context = new SparkContext(conf)
    try {
      // JdbcRDD fills the two '?' placeholders with the lower and upper id bound of each partition.
      val sql = "select name,age from test where id>=? and id <=?"
      // Ids 1..8 are split across 2 partitions. The last argument maps each ResultSet row to a record.
      // It runs on the executors, and its return values become the RDD's elements. It therefore must
      // not append to a driver-side collection (that collection would never receive the rows);
      // read the data from the RDD (e.g. collect()) instead.
      val jdbcRDD = new JdbcRDD(context, () => getConnection(), sql, 1, 8, 2, (rs: ResultSet) => {
        (rs.getString(1), rs.getInt(2))
      })
      println(jdbcRDD.collect().toBuffer)
    } finally {
      context.stop()
    }
  }

  /** Opens a MySQL connection; used as JdbcRDD's connection factory (JdbcRDD closes it). */
  def getConnection(): Connection = {
    // Driver class names are case-sensitive: "com.MySQL.jdbc.Driver" does not exist.
    Class.forName("com.mysql.jdbc.Driver")
    // NOTE(review): credentials are hard-coded; move them to configuration before real use.
    DriverManager.getConnection("jdbc:mysql://192.168.6.133:3306/hgs", "root", "123456")
  }
}
//----------------------------------------------------------------------
package hgs.spark.othertest

import java.sql.Connection
import java.sql.DriverManager
import org.apache.commons.dbutils.QueryRunner
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

/** Demo: counts words with Spark and writes the counts into the MySQL `words` table. */
object DataFromSparktoJdbc {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf
    conf.setMaster("local[2]").setAppName("DataFromSparktoJdbc")
    val context = new SparkContext(conf)
    try {
      val addressrdd = context.textFile("d:\\words")
      // Cached because the counts are consumed by two actions: the JDBC insert and saveAsTextFile.
      val words = addressrdd.flatMap(_.split(" ")).map(x => (x, 1)).reduceByKey(_ + _).cache()
      words.foreachPartition(iter => {
        // One connection per partition rather than one per record.
        val qr = new QueryRunner()
        val conn = getConnection()
        try {
          println(conn)
          val sql = "insert into words values(?,?)"
          // Could be switched to a batch insert (QueryRunner.batch) for better throughput.
          iter.foreach { case (word, count) =>
            // Appending conn.toString() makes it visible in the table that different partitions
            // used different connections; insert `word` alone to store the plain word.
            qr.update(conn, sql, word + conn.toString(), Integer.valueOf(count))
          }
        } finally {
          // Close the connection even if an insert fails.
          conn.close()
        }
      })
      words.saveAsTextFile("d:\\wordresult")
    } finally {
      context.stop()
    }
  }

  /** Opens a new MySQL connection; the caller is responsible for closing it. */
  def getConnection(): Connection = {
    Class.forName("com.mysql.jdbc.Driver")
    // NOTE(review): credentials are hard-coded; move them to configuration before real use.
    DriverManager.getConnection("jdbc:mysql://192.168.6.133:3306/hgs", "root", "123456")
  }
}
// Broadcast variables
package hgs.spark.othertest

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

/**
 * Demo: broadcasts a small key -> address lookup table built from one file, then resolves
 * every line of a second file against it, printing "UnKnown" for keys without an entry.
 */
object BroadCastTest {

  /**
   * Entry point.
   *
   * @param args optional; `args(0)` overrides the address file (default `d:\address`) and
   *             `args(1)` the name file (default `d:\name`).
   */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setMaster("local[2]").setAppName("BroadCastTest")
    val context = new SparkContext(conf)
    try {
      val addressPath = args.lift(0).getOrElse("d:\\address")
      val namePath = args.lift(1).getOrElse("d:\\name")
      // Lines are split on ','; the first field is the key and the second the value.
      // Lines with fewer than two fields (e.g. blank lines) are skipped instead of failing
      // the job with an ArrayIndexOutOfBoundsException.
      val addressMap = context.textFile(addressPath)
        .map(_.split(","))
        .filter(_.length >= 2)
        .map(cs => (cs(0), cs(1)))
        .collect()
        .toMap
      // A broadcast variable is a read-only value cached on each executor instead of being shipped
      // with every task, which reduces data transfer. Any serializable value can be broadcast.
      val addressBroadcast = context.broadcast(addressMap)
      val result = context.textFile(namePath).map(name => {
        // Look each line up in the broadcast map.
        addressBroadcast.value.getOrElse(name, "UnKnown")
      })
      println(result.collect().toBuffer)
    } finally {
      // Stop the context even if the job fails so local resources are released.
      context.stop()
    }
  }
}
其他一些知識點
1. spark 廣播變量：sc.broadcast(value)（broadcast 是 SparkContext 的方法；要廣播的是 driver 端的數據，例如 collect() 之後得到的 Map，RDD 本身不能直接被廣播）。廣播變量的用處是將數據匯聚後傳輸到各個 executor 上並緩存，這樣在做數據處理的時候減少了數據的傳輸。
2. wordcount 程序：context.textFile(args(0),1).flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)。這段 wordcount 程序代碼到 reduceByKey 為止共會產生 5 個 RDD：sc.textFile() 會產生兩個 RDD（HadoopRDD -> MapPartitionsRDD），flatMap() 會產生 MapPartitionsRDD，map 會產生 MapPartitionsRDD，reduceByKey 產生 ShuffledRDD；最後調用 saveAsTextFile 時，其內部還會再產生一個 MapPartitionsRDD。
3. 緩存數據到內存：rdd.cache；清理緩存：rdd.unpersist(true)；rdd.persist 可以指定存儲級別，cache 方法調用的就是 persist 方法（使用默認的 MEMORY_ONLY 級別）。
4. spark 遠程 debug：需要在 SparkConf 上設置 conf.setMaster("spark://xx.xx.xx.xx:7077").setJars(Seq("d:/jars/xx.jar"))。
關于怎么理解spark的自定義分區和排序及spark與jdbc就分享到這里了,希望以上內容可以對大家有一定的幫助,可以學到更多知識。如果覺得文章不錯,可以把它分享出去讓更多的人看到。