This article walks through the steps for implementing ALS-based recommendation with Spark, followed by a complete code example.
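The Spark ALS (alternating least squares) algorithm is used for personalized recommendation. It needs a dataset shaped like a table of user ratings for items. The implementation consists of the following steps:
1. Define the input data.
2. Convert the input data into the rating format, e.g. case class Rating(user: Int, movie: Int, rating: Float).
3. Configure the ALS model and train it on the data.
4. Compute the recommendations and store them so that business systems can use them directly.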
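Steps 1 and 2 can be illustrated without Spark. The sketch below is only an illustration under stated assumptions: the Click case class, the ClicksToRatings object and the sample ids are hypothetical and not part of the original code. It counts clicks per (user, resource) pair and uses the count as the rating, which is the same idea the full listing implements with SQL.

```scala
// Hypothetical click event: one entry per click, as in a raw click log.
final case class Click(userId: String, resId: String)

// Same shape as the Rating case class named in step 2.
final case class Rating(user: Int, movie: Int, rating: Float)

object ClicksToRatings {

  /** Assigns each distinct id a dense Int index, in order of first appearance. */
  def indexIds(ids: Seq[String]): Map[String, Int] =
    ids.distinct.zipWithIndex.toMap

  /** Counts clicks per (user, resource) pair and uses the count as the rating. */
  def toRatings(clicks: Seq[Click]): Seq[Rating] = {
    val userIndex = indexIds(clicks.map(_.userId))
    val resIndex = indexIds(clicks.map(_.resId))
    clicks
      .groupBy(c => (c.userId, c.resId))
      .map { case ((u, r), group) => Rating(userIndex(u), resIndex(r), group.size.toFloat) }
      .toSeq
      .sortBy(r => (r.user, r.movie))
  }
}
```

For example, two clicks by "alice" on "r1", one by "alice" on "r2" and one by "bob" on "r1" yield three ratings, with the alice/r1 pair rated 2.0.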
Here is the complete code:
package recommend

import java.util.Properties

import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.feature.{IndexToString, StringIndexer}
import org.apache.spark.ml.recommendation.ALS
import org.apache.spark.sql.{Row, SaveMode, SparkSession}

/**
 * Personalized recommendation with ALS.
 * The number of times a user clicked a resource is used as the rating.
 */
object Recommend {

  case class Rating(user: Int, movie: Int, rating: Float)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("Java Spark MySQL Recommend")
      .master("local")
      .config("es.nodes", "127.0.0.1")
      .config("es.port", "9200")
      .config("es.mapping.date.rich", "false") // do not parse date types
      .getOrCreate()
    trainModel(spark)
    spark.close()
  }

  def trainModel(spark: SparkSession): Unit = {
    import spark.implicits._

    val MAX = 3         // maximum number of recommendations per user
    val rank = 10       // number of latent factors (default 10)
    val iterations = 10 // number of iterations (default 10)

    val url = "jdbc:mysql://127.0.0.1:3306/test?useUnicode=true&characterEncoding=utf8"
    val table = "clicks"
    val user = "root"
    val pass = "123456"
    val props = new Properties()
    props.setProperty("user", user)     // database user name
    props.setProperty("password", pass) // database password

    // Step 1: load the click log and count clicks per (user, resource) pair
    val clicks = spark.read.jdbc(url, table, props).repartition(4)
    clicks.createOrReplaceGlobalTempView("clicks")
    val agg = spark.sql(
      "SELECT userId, resId, COUNT(id) AS clicks FROM global_temp.clicks GROUP BY userId, resId")

    // ALS needs integer ids, so map userId and resId to numeric indices.
    // The fitted models are kept so the same labels can be used to map back later.
    val userIndexerModel = new StringIndexer()
      .setInputCol("userId")
      .setOutputCol("userIndex")
      .fit(agg)
    val indexed1 = userIndexerModel.transform(agg)
    val resIndexerModel = new StringIndexer()
      .setInputCol("resId")
      .setOutputCol("resIndex")
      .fit(indexed1)
    val indexed2 = resIndexerModel.transform(indexed1)
    indexed2.show()

    // Step 2: convert to the rating format
    val ratings = indexed2.map(x => Rating(
      x.getAs[Double]("userIndex").toInt,
      x.getAs[Double]("resIndex").toInt,
      x.getAs[Long]("clicks").toFloat))
    ratings.show()

    val Array(training, test) = ratings.randomSplit(Array(0.9, 0.1))
    println("training:")
    training.show()
    println("test:")
    test.show()

    // Step 3: configure and train the model.
    // setImplicitPrefs(false) treats the click counts as explicit ratings;
    // set it to true to treat them as implicit feedback instead.
    val als = new ALS()
      .setRank(rank)
      .setMaxIter(iterations)
      .setRegParam(0.01)
      .setImplicitPrefs(false)
      .setUserCol("user")
      .setItemCol("movie")
      .setRatingCol("rating")
    // Train on the training split only, so the test split stays unseen
    val model = als.fit(training)

    // Evaluate the model by computing the RMSE on the test data.
    // The cold start strategy is set to "drop" to avoid NaN evaluation metrics.
    model.setColdStartStrategy("drop")
    val predictions = model.transform(test)
    val evaluator = new RegressionEvaluator()
      .setMetricName("rmse")
      .setLabelCol("rating")
      .setPredictionCol("prediction")
    println("RMSE on test data: " + evaluator.evaluate(predictions))

    // Step 4: compute the top MAX recommendations for every user
    val r2 = model.recommendForAllUsers(MAX)
    println(r2.schema)

    // Flatten (user, [(movie, rating), ...]) into one Rating per recommendation
    val result = r2.rdd.flatMap { row =>
      val userId = row.getInt(0)
      val arrayPredict: Seq[Row] = row.getSeq(1)
      arrayPredict.map(rowPredict => Rating(userId, rowPredict.getInt(0), rowPredict.getFloat(1)))
    }
    println("Recommendation RDD flattened")
    result.toDF().show()

    // Map the resource index back to the original resId
    val resInt2Index = new IndexToString()
      .setInputCol("movie")
      .setOutputCol("resId")
      .setLabels(resIndexerModel.labels)
    // Map the user index back to the original userId
    val userInt2Index = new IndexToString()
      .setInputCol("user")
      .setOutputCol("userId")
      .setLabels(userIndexerModel.labels)
    val rc = userInt2Index.transform(resInt2Index.transform(result.toDF()))
    rc.show()

    // Store the recommendations in MySQL for the business systems
    rc.withColumnRenamed("rating", "score")
      .select("userId", "resId", "score")
      .write.mode(SaveMode.Overwrite)
      .format("jdbc")
      .option("url", url)
      .option("dbtable", "recommends")
      .option("user", user)
      .option("password", pass)
      .option("batchsize", "5000")
      .option("truncate", "true")
      .save()
    println("finished!!!")
  }
}
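The listing relies on a round trip between the original ids and the integer indices that ALS works with: StringIndexer maps ids to indices before training, and IndexToString maps them back using the same labels. The following is a minimal plain-Scala sketch of that idea, not Spark's implementation. The IdIndexing object and its method names are hypothetical. Ordering labels by descending frequency mirrors StringIndexer's default; the alphabetical tie-breaking is an assumption and may not match every Spark version.

```scala
object IdIndexing {

  /** Distinct labels ordered by descending frequency; ties broken alphabetically (assumption). */
  def fitLabels(ids: Seq[String]): Array[String] =
    ids.groupBy(identity).toSeq
      .map { case (id, occurrences) => (id, occurrences.size) }
      .sortBy { case (id, count) => (-count, id) }
      .map(_._1)
      .toArray

  /** Forward mapping (the StringIndexer direction): label -> index. */
  def toIndex(labels: Array[String]): Map[String, Int] =
    labels.zipWithIndex.toMap

  /** Reverse mapping (the IndexToString direction): index -> label. */
  def toLabel(labels: Array[String], index: Int): String =
    labels(index)
}
```

The point of the sketch is that the reverse mapping is only correct when it uses the labels produced by the same fit that generated the indices, which is why the labels should come from the fitted indexer models rather than from an unrelated fit.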
There is another way to write a DataFrame to MySQL, which is to issue the inserts directly over JDBC:
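// Write the recommendations to MySQL partition by partition.
// ConnectionPool is a connection-pool helper that is not defined in this article;
// it is assumed to hand out java.sql.Connection instances.
// Note: r2 still holds the StringIndexer indices, so map them back to the
// original ids (as in the full listing) if the table should store real ids.
r2.rdd.foreachPartition { p =>
  val conn = ConnectionPool.getConnection
  // A parameterized statement avoids building SQL by string concatenation
  val stmt = conn.prepareStatement("insert into recommends(userId,resId,score) values (?,?,?)")
  try {
    p.foreach { row =>
      val userId = row.getInt(0)
      val arrayPredict: Seq[Row] = row.getSeq(1)
      arrayPredict.foreach { rowPredict =>
        println(rowPredict(0) + "@" + rowPredict(1))
        stmt.setInt(1, userId)
        stmt.setInt(2, rowPredict.getInt(0))
        stmt.setFloat(3, rowPredict.getFloat(1))
        stmt.executeUpdate()
      }
    }
  } finally {
    // Always release the statement and return the connection to the pool
    stmt.close()
    ConnectionPool.returnConnection(conn)
  }
}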
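The direct-write snippet above sends one statement per recommendation. When a partition holds many rows, grouping the inserts into JDBC batches usually reduces round trips. The sketch below shows one possible shape for this, using only the standard JDBC calls prepareStatement, addBatch and executeBatch. The Rec case class, the BatchWriter object and the batchSize parameter are hypothetical names introduced for illustration; the table and column names are taken from the snippet above.

```scala
import java.sql.Connection

// Hypothetical flattened recommendation row.
final case class Rec(userId: Int, resId: Int, score: Float)

object BatchWriter {

  val InsertSql = "insert into recommends(userId,resId,score) values (?,?,?)"

  /** Writes the rows in batches of at most batchSize and returns how many rows were sent. */
  def writeBatched(conn: Connection, recs: Iterator[Rec], batchSize: Int): Int = {
    val stmt = conn.prepareStatement(InsertSql)
    try {
      var written = 0
      recs.grouped(batchSize).foreach { batch =>
        batch.foreach { r =>
          stmt.setInt(1, r.userId)
          stmt.setInt(2, r.resId)
          stmt.setFloat(3, r.score)
          stmt.addBatch() // queue the row instead of executing it immediately
        }
        stmt.executeBatch() // send the queued rows together
        written += batch.size
      }
      written
    } finally {
      stmt.close()
    }
  }
}
```

Inside foreachPartition, such a helper would be called once per partition with the connection obtained from the pool. Whether batching actually helps depends on the JDBC driver and its settings, so it is worth measuring.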
That covers the steps for implementing ALS with Spark. How well it works for a particular use case still needs to be verified in practice.