好程序員大數(shù)據(jù)學(xué)習(xí)路線分享Actor學(xué)習(xí)筆記,在scala中她能實現(xiàn)很強(qiáng)大的功能,他是基于并發(fā)機(jī)制的一個事件模型
創(chuàng)新互聯(lián)建站是一家集網(wǎng)站建設(shè),佳木斯企業(yè)網(wǎng)站建設(shè),佳木斯品牌網(wǎng)站建設(shè),網(wǎng)站定制,佳木斯網(wǎng)站建設(shè)報價,網(wǎng)絡(luò)營銷,網(wǎng)絡(luò)優(yōu)化,佳木斯網(wǎng)站推廣為一體的創(chuàng)新建站企業(yè),幫助傳統(tǒng)企業(yè)提升企業(yè)形象加強(qiáng)企業(yè)競爭力。可充分滿足這一群體相比中小企業(yè)更為豐富、高端、多元的互聯(lián)網(wǎng)需求。同時我們時刻保持專業(yè)、時尚、前沿,時刻以成就客戶成長自我,堅持不斷學(xué)習(xí)、思考、沉淀、凈化自己,讓我們?yōu)楦嗟钠髽I(yè)打造出實用型網(wǎng)站。
我們現(xiàn)在學(xué)的scala2.10.x版本就是之前的Actor
?
同步:在主程序上排隊執(zhí)行的任務(wù),只有前一個任務(wù)執(zhí)行完畢后,才能執(zhí)行下一個任務(wù)
異步:指不進(jìn)入主程序,而進(jìn)入"任務(wù)對列"的任務(wù),只有等主程序任務(wù)執(zhí)行完畢,"任務(wù)對列"開始請求主程序,請求任務(wù)執(zhí)行,該任務(wù)會進(jìn)入主程序
?
java
共享變量 -- 加鎖
會出現(xiàn)鎖死問題
scala
Actor不共享數(shù)據(jù)
沒有鎖的概念
Actor通信之間需要message(通信)
Aactor執(zhí)行順序
1.首先調(diào)用start()方法啟動Actor
2.調(diào)用start()方法后act()方法會被執(zhí)行
3.Actor之間進(jìn)行發(fā)送消息
Actor發(fā)送消息的三種方式
! -> 發(fā)送異步消息,沒有返回值
!? -> 發(fā)送同步消息,有返回值,會有線程等待
!! -> 發(fā)送異步消息,有返回值,返回值類型Future[Any](用來獲取異步操作結(jié)果)
?
Actor并行執(zhí)行
//注意,這兩個actor會并行執(zhí)行,當(dāng)其中一個for循環(huán)結(jié)束后,actor結(jié)束 object ActorDemo01 { ??def main(args: Array[String]): Unit = { ????MyActor1.start() ????MyActor2.start() ??} } ? object MyActor1 extends Actor{ ??override def act(): Unit = { ????for (i <- 1 to 10){ ??????println(s"actor => $i") ??????Thread.sleep(2000) ????} ??} ? ??object MyActor2 extends Actor{ ????override def act(): Unit = { ??????for (i <- 1 to 5){ ????????println(s"actor2 => $i") ????????Thread.sleep(2000) ??????} ????} ??} } |
用Actor不斷接受消息
執(zhí)行第一種方式,異步
object ActorDemo02 { ??def main(args: Array[String]): Unit = { ????val actor: MyActor = new MyActor ????actor.start() ? ????//并行執(zhí)行 ????actor ! "start" ?// !->異步 ????actor ! "stop" ????println("發(fā)送完成") ? ??} } ? class MyActor extends Actor{ ??override def act(): Unit = { ????while (true){ ??//死循環(huán) ??????receive { ??//接收 ????????case "start" => { ??????????println("starting") ??????????Thread.sleep(1000) ??????????println("started") ????????} ????????case "stop" => { ??????????println("stopping") ??????????Thread.sleep(1000) ??????????println("stopped") ????????} ??????} ????} ??} } |
第二種方式:利用react來代替receive,也就是說react線程可復(fù)用,比receive更高效
object ActorDemo03 { ??def main(args: Array[String]): Unit = { ????val actor: MyActor3 = new MyActor3 ????actor.start() ????actor ! "start" ????actor ! "stop" ????println("成功了") ??} } ? class MyActor3 extends Actor{ ??override def act(): Unit = { ????loop { ??????react{ ????????case "start" =>{ ??????????println("starting") ??????????Thread.sleep(1000) ??????????println("sarted") ????????} ????????case "stop" =>{ ??????????println("stoppting") ??????????Thread.sleep(1000) ??????????println("stopped") ????????} ??????} ????} ??} } |
結(jié)合樣例類練習(xí)Actor發(fā)送消息
//創(chuàng)建樣例類 case class AsyncMsg(id: Int, msg: String) case class SyncMsg(id: Int, msg: String) case class ReplyMsg(id: Int, msg: String) ? object ActorDemo01 extends Actor { ??override def act(): Unit = { ????while (true) { ??????receive { ????????case "start" => println("starting...") ????????case AsyncMsg(id, msg) => ????????{ ??????????println(s"id:$id,msg:$msg") ??????????sender ! ReplyMsg(1,"sucess") ?//接收到消息后返回響應(yīng)消息 ????????} ????????case SyncMsg(id,msg) => { ??????????println(s"id:$id,msg:$msg") ??????????sender ! ReplyMsg(2,"sucess") ????????} ??????} ????} ??} } ? object ActorTest{ ??def main(args: Array[String]): Unit = { ? ????val actor: Actor = ActorDemo01.start() ? // ???//異步發(fā)送消息,沒有返回值 // ???actor ! AsyncMsg(3,"heihei") // ???println("異步消息發(fā)送完成,沒有返回值") ? // ???//同步發(fā)送消息,有返回值 // ???val text: Any = actor !? SyncMsg(4,"OK") // ???println(text) // ???println("同步消息發(fā)送成功") ? ????//異步發(fā)送消息,有返回值,返回類型為Future[Any] ????val reply: Future[Any] = actor !! SyncMsg(5,"OK is 不存在的") ????Thread.sleep(2000) ????if (reply.isSet){ ??????val applyMsg: Any = reply.apply() ??????println(applyMsg) ????}else{ ??????println("Nothing") ????} ??} } |
Actor并行化的wordcount
class Task extends Actor { ? ??override def act(): Unit = { ????loop { ??????react { ????????case SubmitTask(fileName) => { ??????????val contents = Source.fromFile(new File(fileName)).mkString ??????????val arr = contents.split("\r\n") ??????????val result = arr.flatMap(_.split(" ")).map((_, 1)).groupBy(_._1).mapValues(_.length) ??????????//val result = arr.flatMap(_.split(" ")).map((_, 1)).groupBy(_._1).mapValues(_.foldLeft(0)(_ + _._2)) ??????????sender ! ResultTask(result) ????????} ????????case StopTask => { ??????????exit() ????????} ??????} ????} ??} } ? object WorkCount { ??def main(args: Array[String]) { ????val files = Array("c://words.txt", "c://words.log") ? ????val replaySet = new mutable.HashSet[Future[Any]] ????val resultList = new mutable.ListBuffer[ResultTask] ? ????for(f <- files) { ??????val t = new Task ??????val replay = t.start() !! SubmitTask(f) ??????replaySet += replay ????} ? ????while(replaySet.size > 0){ ??????val toCumpute = replaySet.filter(_.isSet) ??????for(r <- toCumpute){ ????????val result = r.apply() ????????resultList += result.asInstanceOf[ResultTask] ????????replaySet.remove(r) ??????} ??????Thread.sleep(100) ????} ????val finalResult = resultList.map(_.result).flatten.groupBy(_._1).mapValues(x => x.foldLeft(0)(_ + _._2)) ????println(finalResult) ??} } ? case class SubmitTask(fileName: String) case object StopTask case class ResultTask(result: Map[String, Int]) |
?