真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網站制作重慶分公司

Spark通訊錄相似度計算怎么實現(xiàn)

本篇內容介紹了“Spark通訊錄相似度計算怎么實現(xiàn)”的有關知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領大家學習一下如何處理這些情況吧!希望大家仔細閱讀,能夠學有所成!

成都創(chuàng)新互聯(lián)公司堅持“要么做到,要么別承諾”的工作理念,服務領域包括:成都網站制作、成都網站設計、外貿營銷網站建設、企業(yè)官網、英文網站、手機端網站、網站推廣等服務,滿足客戶于互聯(lián)網時代的魯甸網站設計、移動媒體設計的需求,幫助企業(yè)找到有效的互聯(lián)網解決方案。努力成為您成熟可靠的網絡建設合作伙伴!

需求:

Hive表中存有UserPhone跟LinkPhone 兩個字段。 通過SparkSQL計算出UserPhone之間通訊錄相似度>=80%的記錄數據。

相似度 = A跟B的交集/A的通訊錄大小。

pom文件

注意依賴之間的適配性,選擇合適的版本。同時一般可能會吧Hive中conf/hive-site.xml配置文件拷貝一份到 IDEA目錄

Spark通訊錄相似度計算怎么實現(xiàn)



    4.0.0

    com.sowhat.demo
    PhoneBookSimilaryCal
    1.0-SNAPSHOT





















    
        2.11.8
        2.11.8
        2.2.0
        2.7.2
        1.0
    

    
        
            org.scala-lang
            scala-library
            ${scala.version}
            
        

        
            org.apache.spark
            spark-core_2.11
            ${spark.version}
            
        
        
            org.apache.spark
            spark-sql_2.11
            ${spark.version}
            
        

    

    
        PhoneBookSimilaryCal
        

            
                net.alchim31.maven
                scala-maven-plugin
                
                3.2.2
                
                    
                        
                            compile
                            testCompile
                        
                    
                
            


            
                org.apache.maven.plugins
                maven-assembly-plugin
                
                    
                        
                            com.sowhat.PhoneBookSimilaryCal
                        
                    
                    
                        jar-with-dependencies
                    
                

                
                    
                        make-assembly
                        
                        package
                        
                        
                            single
                            
                        
                    
                

            
        
    


spark代碼:

package com.sowhat

/**
  * @author sowhat
  * @create 2020-07-02 16:30
  */

import java.security.MessageDigest
import java.text.SimpleDateFormat
import java.util.Calendar

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.storage.StorageLevel
import org.slf4j.{Logger, LoggerFactory}


object PhoneBookSimilaryCal {
  def MD5(input: String): String = {
    var md5: MessageDigest = null
    try {
      md5 = MessageDigest.getInstance("MD5")
    } catch {
      case e: Exception => {
        e.printStackTrace
        println(e.getMessage)
      }
    }
    val byteArray: Array[Byte] = input.getBytes
    val md5Bytes: Array[Byte] = md5.digest(byteArray)
    var hexValue: String = ""
    for (i <- 0 to md5Bytes.length - 1) {
      val str: Int = (md5Bytes(i).toInt) & 0xff
      if (str < 16) {
        hexValue = hexValue + "0"
      }
      hexValue = hexValue + Integer.toHexString(str)
    }
    return hexValue.toString
  }

  def Yesterday = {
    val dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
    val cal: Calendar = Calendar.getInstance()
    cal.add(Calendar.DATE, -1)
    dateFormat.format(cal.getTime)
  }

  def OneYearBefore = {
    val dateFormat: SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd")
    var cal: Calendar = Calendar.getInstance()
    cal.add(Calendar.YEAR, -1)
    dateFormat.format(cal.getTime())
  }

  def SixMonthBefore = {
    val dateFormat: SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd")
    var cal: Calendar = Calendar.getInstance()
    cal.add(Calendar.MONTH, -6)
    dateFormat.format(cal.getTime)
  }

  def ThreeMonthBefore = {
    val dateFormat: SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd")
    var cal: Calendar = Calendar.getInstance()
    cal.add(Calendar.MONTH, -3)
    dateFormat.format(cal.getTime)

  }

  def OneMonthBefore = {
    val dateFormat: SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd")
    var cal: Calendar = Calendar.getInstance()
    cal.add(Calendar.MONTH, -1)
    dateFormat.format(cal.getTime)
  }

  private val logger: Logger = LoggerFactory.getLogger(PhoneBookSimilaryCal.getClass)

  def main(args: Array[String]): Unit = {
    System.setProperty("HADOOP_USER_NAME", "yjy_research") // sparkSQL用到Hadoop的東西,所以權限用戶要注意哦
    val spark: SparkSession = SparkSession.builder().appName("phoneBookSimilaryCal")
      .config("spark.sql.shuffle.partitions", "1000")
      .config("spark.default.parallelism", "3000")
      .config("spark.driver.maxResultSize", "40g")
      //.conf("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .config("spark.shuffle.io.maxRetries", "20")
      .config("spark.shuffle.io.retryWait", "10s")
      .config("spark.storage.memoryFraction", "0.5")
      .config("spark.shuffle.memoryFraction", "0.5")
      .config("executor-cores", "5")
      .config("spark.executor.instances", "10")
      .config("spark.executor.cores.config", "3000")
      .config("spark.executor.instances", "20")
      .config("spark.executor.memory", "40g")
      .config("spark.driver.memory", "40g")
      .config("spark.sql.warehouse.dir", "/user/hive/warehouse")
      .enableHiveSupport().getOrCreate() // 開啟Hive table

    spark.sql("use dm_kg")
    val sqlText: String = "select user_phone,phone from user_phone_with_phone_message where user_phone not in( '59400a197e9bf5fbb2fbee0456b66cd6','f7e82e195810a01688db2eeecb8e56c9') and  etl_date>'" + SixMonthBefore + "'"
    println(sqlText)
    val df: DataFrame = spark.sql(sqlText)
    val rdd: RDD[Row] = df.rdd


    def getUserPhoneAndPhone(iter: Iterator[Row]) = {
      var res: List[(String, String)] = List[(String, String)]()
      while (iter.hasNext) {
        val row: Row = iter.next()
        res = res.::(row.getString(0), row.getString(1))
      }
      res.iterator
    }

    val userPhone_Phone: RDD[(String, String)] = rdd.mapPartitions(getUserPhoneAndPhone)
    userPhone_Phone.persist(StorageLevel.MEMORY_AND_DISK_SER)

    val userPhone_num: RDD[(String, Long)] = userPhone_Phone.map(x => (x._1, 1L)).reduceByKey(_ + _, 3000)

    def dealUserPhoneNum(iter: Iterator[(String, Long)]) = {
      var res: List[(String, String)] = List[(String, String)]()
      while (iter.hasNext) {
        val row: (String, Long) = iter.next()
        res.::=(row._1, row._1.concat("_").concat(row._2.toString))
      }
      res.iterator
    }

    val userPhone_userPhoneNum: RDD[(String, String)] = userPhone_num.mapPartitions(dealUserPhoneNum)
    val userPhone_Phone_userPhoneNum: RDD[(String, (String, String))] = userPhone_Phone.join(userPhone_userPhoneNum, 3000)
    val userPhone_Phone_userPhoneNum_filter: RDD[(String, (String, String))] = userPhone_Phone_userPhoneNum.filter(x => x._2._2.split("_")(1).toLong != 1)

    def getSecondTuple(iter: Iterator[(String, (String, String))]) = {
      var res = List[(String, String)]()
      while (iter.hasNext) {
        val tuple: (String, (String, String)) = iter.next()
        res.::=(tuple._2)
      }
      res.iterator
    }

    val phone_userPhoneNum: RDD[(String, String)] = userPhone_Phone_userPhoneNum_filter.mapPartitions(getSecondTuple)
    val phone_userPhoneListWithSize: RDD[(String, (List[String], Int))] = phone_userPhoneNum.combineByKey(
      (x: String) => (List(x), 1),
      (old: (List[String], Int), x: String) => (x :: old._1, old._2 + 1),
      (par1: (List[String], Int), par2: (List[String], Int)) => (par1._1 ::: par2._1, par1._2 + par2._2)
    ) // 結果  (聯(lián)系電話,(對應用戶電話List,List大小))
    val userPhoneList: RDD[List[String]] = phone_userPhoneListWithSize.filter(x => (x._2._2 < 1500 && x._2._2 > 1)).map(_._2._1)
    // 通訊錄大小 (1,1500) 篩查出來
    val userPhone_userPhone: RDD[List[String]] = userPhoneList.flatMap(_.sorted.combinations(2))
    // https://blog.csdn.net/aomao4913/article/details/101274895
    val userPhone_userPhone_Num: RDD[((String, String), Int)] = userPhone_userPhone.map(x => ((x(0), x(1)), 1)).reduceByKey(_ + _, 3000)
    // 獲得 (UserPhone1,UserPhone2),LinkNum

    def dealData(iter: Iterator[((String, String), Int)]) = {
      var res = List[(String, String, Int)]()
      while (iter.hasNext) {
        val row: ((String, String), Int) = iter.next()
        val line = row._1.toString.split(",") // (userPhone_num,userPhone_num)
        res.::=(line(0).replace("(", ""), line(1).replace(")", ""), row._2)
      }
      res.iterator
    }

    val userPhone_num_with_userPhone_num_with_commonNum: RDD[(String, String, Int)] = userPhone_userPhone_Num.mapPartitions(dealData)

    def FirstToSecond(iter: Iterator[(String, String, Int)]) = {
      var res = List[(String, String, Long, Int)]()
      while (iter.hasNext) {
        val cur: (String, String, Int) = iter.next
        val itemList1: Array[String] = cur._1.toString.split("_")
        val itemList2: Array[String] = cur._2.toString.split("_")
        res.::=(itemList1(0), itemList2(0), itemList1(1).toLong, cur._3)
      }
      res.iterator
    } // userPhone1,userPhone2,userPhone1BookNum,CommonNum

    def SecondToFirst(iter: Iterator[(String, String, Int)]) = {
      var res = List[(String, String, Long, Int)]()
      while (iter.hasNext) {
        val cur: (String, String, Int) = iter.next
        val itemList1: Array[String] = cur._1.toString.split("_")
        val itemList2: Array[String] = cur._2.toString.split("_")
        res.::=(itemList2(0), itemList1(0), itemList2(1).toLong, cur._3)
      }
      res.iterator
    } // userPhone2,userPhone1,userPhone2BookNum,CommonNum
    val userPhone1_userPhone2_userPhone1BookNum_CommonNum_1: RDD[(String, String, Long, Int)] = userPhone_num_with_userPhone_num_with_commonNum.mapPartitions(FirstToSecond).filter(_._3 > 1)
    val userPhone2_userPhone1_userPhone2BookNum_CommonNum_2: RDD[(String, String, Long, Int)] = userPhone_num_with_userPhone_num_with_commonNum.mapPartitions(SecondToFirst).filter(_._3 > 1)
    val userPhone1_userPhone2_userPhone1BookNum_CommonNum: RDD[(String, String, Long, Int)] = userPhone2_userPhone1_userPhone2BookNum_CommonNum_2.union(userPhone1_userPhone2_userPhone1BookNum_CommonNum_1)

    def finalDeal(iter: Iterator[(String, String, Long, Int)]) = {
      var res = List[(String, Long, String, String, Long, String)]()
      while (iter.hasNext) {
        val cur: (String, String, Long, Int) = iter.next()
        res.::=(cur._1.toString, cur._4 * 100 / cur._3, cur._2.toString, "Similar_phoneBook", cur._3, Yesterday)
      }
      res.iterator
    } // user_phone1,percent,user_phone2,label,userPhone1BookNum,CalDate
    val userPhone1_percent_userPhone2_Label_UserPhone1BookNum_CalDate: RDD[(String, Long, String, String, Long, String)] = userPhone1_userPhone2_userPhone1BookNum_CommonNum.mapPartitions(finalDeal)
    val userPhone1_percent_userPhone2_Label_UserPhone1BookNum_CalDate_Filter: RDD[(String, Long, String, String, Long, String)] = userPhone1_percent_userPhone2_Label_UserPhone1BookNum_CalDate.filter(_._2 >= 80)
    import spark.implicits._
    val finalResult: DataFrame = userPhone1_percent_userPhone2_Label_UserPhone1BookNum_CalDate_Filter.toDF()
    printf("·:" + userPhone1_percent_userPhone2_Label_UserPhone1BookNum_CalDate_Filter.collect().length)
    spark.sql("drop table if exists sowhat_similar_phonebook_result")
    spark.sql("CREATE TABLE IF NOT EXISTS sowhat_similar_phonebook_result" +
      "(startId string comment '起始節(jié)點ID'," +
      "similar_percent string comment '相似度'," +
      "endId string comment '終止節(jié)點ID'," +
      "type string  comment '邊的類型'," +
      "telbook_num long comment '通訊錄個數'," +
      "etl_date Date  comment 'etl日期') " +
      "row format delimited fields terminated by ',' ")
    logger.info("created table similar_phonebook_result")

    finalResult.createOrReplaceTempView("resultMessage")
    spark.sql("insert into sowhat_similar_phonebook_result select * from resultMessage")
    spark.sql("select count(1) from  sowhat_similar_phonebook_result").show()
    spark.stop()

  }
}

spark集群啟動腳本命令: 

time sshpass -p passpwrd ssh user@ip " nohup  
spark-submit --name "sowhatJob" --master yarn --deploy-mode client \
--conf spark.cleaner.periodicGC.interval=120 --conf spark.executor.memory=20g \
--conf spark.num.executors=20 --conf spark.driver.memory=20g --conf spark.sql.shuffle.partitions=1500 \
--conf spark.network.timeout=100000000 --queue root.kg \ (Hadoop集群中YARN隊列)
--class com.sowhat.PhoneBookSimilaryCal PhoneBookSimilaryCal1.jar  "

“Spark通訊錄相似度計算怎么實現(xiàn)”的內容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關的知識可以關注創(chuàng)新互聯(lián)網站,小編將為大家輸出更多高質量的實用文章!


網頁標題:Spark通訊錄相似度計算怎么實現(xiàn)
網站地址:http://weahome.cn/article/gjsdho.html

其他資訊

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部