真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

Scala筆記整理(九):Actor和AKKA

[TOC]

金寨網(wǎng)站建設(shè)公司成都創(chuàng)新互聯(lián)公司,金寨網(wǎng)站設(shè)計(jì)制作,有大型網(wǎng)站制作公司豐富經(jīng)驗(yàn)。已為金寨上千提供企業(yè)網(wǎng)站建設(shè)服務(wù)。企業(yè)網(wǎng)站搭建\外貿(mào)營銷網(wǎng)站建設(shè)要多少錢,請找那個(gè)售后服務(wù)好的金寨做網(wǎng)站的公司定做!


概述

? Scala的Actor有點(diǎn)類似于Java中的多線程編程。但是不同的是,Scala的Actor提供的模型與多線程有所不同。Scala的Actor盡可能地避免鎖和共享狀態(tài),從而避免多線程并發(fā)時(shí)出現(xiàn)資源爭用的情況,進(jìn)而提升多線程編程的性能。

Spark中使用的分布式多線程框架,是Akka,是Scala的一種多線程的類庫。Akka也實(shí)現(xiàn)了類似Scala Actor的模型,其核心概念同樣也是Actor。Scala Actor模型已經(jīng)在2.1.0的時(shí)候還在用,但是在2.1.1的時(shí)候已經(jīng)被遺棄了,Spark開始轉(zhuǎn)換用AKKA來替代Scala Actor,但是Scala Actor概念和原理都還是相同的。所以學(xué)習(xí)Scala Actor對我們學(xué)習(xí)AKKA,Spark還是有所幫助的

之所以學(xué)習(xí)Scala Actor,AKKA是因?yàn)樵趯W(xué)習(xí)Spark源碼的時(shí),我們能看懂Spark的源碼,因?yàn)樵诘讓酉鬟f機(jī)制上大量使用AKKA的傳送機(jī)制。

scala actor

在使用前,需要先引入maven依賴:



    org.scala-lang
    scala-actors
    2.10.5

actor單向通信

測試代碼如下:

package cn.xpleaf.bigdata.p5.myactor
import scala.actors.Actor
/**
  * 學(xué)習(xí)scala actor的基本操作
  * 和java中的Runnable Thread幾乎一致
  *
  * 第一步:編寫一個(gè)類,擴(kuò)展特質(zhì)trait Actor(scala 的actor)
  * 第二步:復(fù)寫其中的act方法
  * 第三步:創(chuàng)建該actor的對象,調(diào)用該對象的start()方法,啟動(dòng)該線程
  * 第四步:通過scala的操作符"!",發(fā)送消息
  * 第五步:結(jié)束的話,調(diào)用close即可
  *
  * 模擬單向打招呼
  */
object ActorOps {
    def main(args: Array[String]): Unit = {
        val mFActor = new MyFirstActor()
        mFActor.start()
        // 發(fā)送消息
        mFActor ! "小美,睡了嗎?"
        mFActor ! "我去洗澡了~"
        mFActor ! "呵呵"
    }
}

class MyFirstActor extends Actor {
    override def act(): Unit = {
        while(true) {
            receive {
                case str: String => println(str)
            }
        }
    }
}

輸出結(jié)果如下:

小美,睡了嗎?
我去洗澡了~
呵呵

使用樣例類(case class)進(jìn)行actor消息傳遞

測試代碼如下:

package cn.xpleaf.bigdata.p5.myactor

import scala.actors.Actor

/**
  *
  */
object GreetingActor {
    def main(args: Array[String]): Unit = {
        val ga = new GreetingActor
        ga.start()

        ga ! Greeting("小美")
        ga ! WorkContent("裝系統(tǒng)")
    }
}

case class Greeting(name:String)
case class WorkContent(content:String)

class GreetingActor extends Actor {
    override def act(): Unit = {
        while(true) {
            receive {
                case Greeting(name) => println(s"Hello, $name")
                case WorkContent(content) => println(s"Let's talk about sth. with $content")
            }
        }
    }
}

輸出結(jié)果如下:

Hello, 小美
Let's talk about sth. with 裝系統(tǒng)

actor相互通信

測試代碼如下:

package cn.xpleaf.bigdata.p5.myactor

import scala.actors.Actor

/**
  * actor之線程間,互相通信
  *
  * studentActor
  *     向老師問了一個(gè)問題
  *
  * teacherActor
  *     向?qū)W生做回應(yīng)
  *
  * 通信的協(xié)議:
  * 請求,使用Request(內(nèi)容)來表示
  * 響應(yīng),使用Response(內(nèi)容)來表示
  */
object _03CommunicationActorOps {
    def main(args: Array[String]): Unit = {
        val teacherActor = new TeacherActor()
        teacherActor.start()
        val studentActor = new StudentActor(teacherActor)
        studentActor.start()

        studentActor ! Request("老李啊,scala學(xué)習(xí)為什么這么難啊")
    }
}

case class Request(req:String)
case class Response(resp:String)

class StudentActor(teacherActor: TeacherActor) extends Actor {
    override def act(): Unit = {
        while(true) {
            receive {
                case Request(req) => {
                    // 向老師請求相關(guān)的問題
                    println("學(xué)生向老師說:" + req)
                    teacherActor ! Request(req)
                }
                case Response(resp) => {
                    println(resp)
                    println("高!")
                }
            }
        }
    }
}

class TeacherActor() extends Actor {
    override def act(): Unit = {
        while (true) {
            receive {
                case Request(req) => {  // 接收到學(xué)生的請求
                    sender ! Response("這個(gè)問題,需要如此搞定~")
                }
            }
        }
    }
}

輸出結(jié)果如下:

學(xué)生向老師說:老李啊,scala學(xué)習(xí)為什么這么難啊
這個(gè)問題,需要如此搞定~
高!

消息的同步和Future

1、Scala在默認(rèn)情況下,消息都是以異步進(jìn)行發(fā)送的;但是如果發(fā)送的消息是同步的,即對方接受后,一定要給自己返回結(jié)果,那么可以使用!?的方式發(fā)送消息。即:

val response= activeActor !? activeMessage

2、如果要異步發(fā)送一個(gè)消息,但是在后續(xù)要獲得消息的返回值,那么可以使用Future。即!!語法,如下:

val futureResponse = activeActor !! activeMessage
val activeReply = future()

AKKA actor

首先需要添加akka的maven依賴:



    com.typesafe.akka
    akka-actor_2.10
    2.3.16


    com.typesafe.akka
    akka-remote_2.10
    2.3.16


    com.typesafe.akka
    akka-slf4j_2.10
    2.3.16

AKKA消息傳遞——本地

原理如下:

Scala筆記整理(九):Actor和AKKA

_01StudentActorOps
package cn.xpleaf.bigdata.p5.myakka.p1

import akka.actor.{Actor, ActorSystem, Props}
import cn.xpleaf.bigdata.p5.myakka.MessageProtocol.{QuoteRequest, QuoteResponse}

import scala.util.Random

/**
  * 基于AKKA Actor的單向通信案例
  * 學(xué)生向老師發(fā)送請求
  */
object _01StudentActorOps {
    def main(args: Array[String]): Unit = {
        // 第一步:構(gòu)建Actor操作系統(tǒng)
        val actorSystem = ActorSystem("StudentActorSystem")
        // 第二步:actorSystem創(chuàng)建TeacherActor的代理對象ActorRef
        val teacherActorRef = actorSystem.actorOf(Props[TeacherActor])
        // 第三步:發(fā)送消息
        teacherActorRef ! QuoteRequest()

        Thread.sleep(2000)
        // 第四步:關(guān)閉
        actorSystem.shutdown()
    }
}

class TeacherActor extends Actor {
    val quotes = List(
        "Moderation is for cowards",
        "Anything worth doing is worth overdoing",
        "The trouble is you think you have time",
        "You never gonna know if you never even try")

    override def receive = {
        case QuoteRequest() => {
            val random = new Random()

            val randomIndex = random.nextInt(quotes.size)
            val randomQuote = quotes(randomIndex)

            val response = QuoteResponse(randomQuote)
            println(response)
        }
    }
}
MessageProtocol

后面akka通信的幾個(gè)測試程序都會(huì)使用到這個(gè)object,只在這里給出,后面不再給出。

package cn.xpleaf.bigdata.p5.myakka

/**
  * akka actor通信協(xié)議
  */
object MessageProtocol {

    case class QuoteRequest()

    case class QuoteResponse(resp: String)

    case class InitSign()

}

object Start extends Serializable

object Stop extends Serializable

trait Message {
    val id: String
}

case class Shutdown(waitSecs: Int) extends Serializable

case class Heartbeat(id: String, magic: Int) extends Message with Serializable

case class Header(id: String, len: Int, encrypted: Boolean) extends Message with Serializable

case class Packet(id: String, seq: Long, content: String) extends Message with Serializable
測試

輸出結(jié)果如下:

QuoteResponse(Anything worth doing is worth overdoing)

AKKA請求與響應(yīng)——本地

原理如下:

Scala筆記整理(九):Actor和AKKA

TeacherActor
package cn.xpleaf.bigdata.p5.myakka.p2

import akka.actor.Actor
import cn.xpleaf.bigdata.p5.myakka.MessageProtocol.{QuoteRequest, QuoteResponse}

import scala.util.Random

/**
  * Teacher Actor
  */
class TeacherActor extends Actor {
    val quotes = List(
        "Moderation is for cowards",
        "Anything worth doing is worth overdoing",
        "The trouble is you think you have time",
        "You never gonna know if you never even try")

    override def receive = {
        case QuoteRequest() => {
            val random = new Random()

            val randomIndex = random.nextInt(quotes.size)
            val randomQuote = quotes(randomIndex)

            val response = QuoteResponse(randomQuote)
//            println(response)
            sender ! response
        }
    }
}
StudentActor
package cn.xpleaf.bigdata.p5.myakka.p2

import akka.actor.{Actor, ActorLogging, ActorRef}
import cn.xpleaf.bigdata.p5.myakka.MessageProtocol.{InitSign, QuoteRequest, QuoteResponse}

/**
  * Student Actor
  * 當(dāng)學(xué)生接收到InitSign信號(hào)之后,便向老師發(fā)送一條Request請求的消息
  */
class StudentActor(teacherActorRef:ActorRef) extends Actor with ActorLogging {
    override def receive = {
        case InitSign => {
            teacherActorRef ! QuoteRequest()
//            println("student send request")
        }
        case QuoteResponse(resp) => {
            log.info(s"$resp")
        }
    }
}
DriverApp
package cn.xpleaf.bigdata.p5.myakka.p2

import akka.actor.{ActorSystem, Props}
import cn.xpleaf.bigdata.p5.myakka.MessageProtocol.InitSign

object DriverApp {
    def main(args: Array[String]): Unit = {
        val actorSystem = ActorSystem("teacherStudentSystem")
        // 老師的代理對象
        val teacherActorRef = actorSystem.actorOf(Props[TeacherActor], "teacherActor")
        // 學(xué)生的代理對象
        val studentActorRef = actorSystem.actorOf(Props[StudentActor](new StudentActor(teacherActorRef)), "studentActor")

        studentActorRef ! InitSign

        Thread.sleep(2000)

        actorSystem.shutdown()
    }
}
測試

輸出結(jié)果如下:

[INFO] [04/24/2018 10:02:19.932] [teacherStudentSystem-akka.actor.default-dispatcher-2] [akka://teacherStudentSystem/user/studentActor] Anything worth doing is worth overdoing

AKKA請求與響應(yīng)——遠(yuǎn)程

application.conf
MyRemoteServerSideActor {
  akka {
    actor {
      provider = "akka.remote.RemoteActorRefProvider"
    }
    remote {
      enabled-transports = ["akka.remote.netty.tcp"]
      netty.tcp {
        hostname = "127.0.0.1"
        port = 2552
      }
    }
  }
}
MyRemoteClientSideActor {
  akka {
    actor {
      provider = "akka.remote.RemoteActorRefProvider"
    }
  }
}
RemoteActor
package cn.xpleaf.bigdata.p5.myakka.p3

import akka.actor.{Actor, ActorLogging}
import cn.xpleaf.bigdata.p5.myakka.{Header, Shutdown, Start, Stop}

class RemoteActor extends Actor with ActorLogging {
    def receive = {
        case Start => { // 處理Start消息
            log.info("Remote Server Start ==>RECV Start event : " + Start)
        }
        case Stop => { // 處理Stop消息
            log.info("Remote Server Stop ==>RECV Stop event: " + Stop)
        }
        case Shutdown(waitSecs) => { // 處理Shutdown消息
            log.info("Remote Server Shutdown ==>Wait to shutdown: waitSecs=" + waitSecs)
            Thread.sleep(waitSecs)
            log.info("Remote Server Shutdown ==>Shutdown this system.")
            context.system.shutdown // 停止當(dāng)前ActorSystem系統(tǒng)
        }
        case Header(id, len, encrypted) => log.info("Remote Server => RECV header: " + (id, len, encrypted)) // 處理Header消息
        case _ =>
    }
}
AkkaServerApplication
package cn.xpleaf.bigdata.p5.myakka.p3

import akka.actor.{ActorSystem, Props}
import com.typesafe.config.ConfigFactory

object AkkaServerApplication extends App {
    // 創(chuàng)建名稱為remote-system的ActorSystem:從配置文件application.conf中獲取該Actor的配置內(nèi)容
    val system = ActorSystem("remote-system",
        ConfigFactory.load().getConfig("MyRemoteServerSideActor"))

    val log = system.log
    log.info("===>Remote server actor started: " + system)
    // 創(chuàng)建一個(gè)名稱為remoteActor的Actor,返回一個(gè)ActorRef,這里我們不需要使用這個(gè)返回值
    system.actorOf(Props[RemoteActor], "remoteActor")

}
ClientActor
package cn.xpleaf.bigdata.p5.myakka.p3

import akka.actor.SupervisorStrategy.Stop
import akka.actor.{Actor, ActorLogging}
import cn.xpleaf.bigdata.p5.myakka.{Header, Start}

class ClientActor extends Actor with ActorLogging {

    // akka.://@:/
    val path = "akka.tcp://remote-system@127.0.0.1:2552/user/remoteActor" // 遠(yuǎn)程Actor的路徑,通過該路徑能夠獲取到遠(yuǎn)程Actor的一個(gè)引用
    val remoteServerRef = context.actorSelection(path) // 獲取到遠(yuǎn)程Actor的一個(gè)引用,通過該引用可以向遠(yuǎn)程Actor發(fā)送消息
    @volatile var connected = false
    @volatile var stop = false

    def receive = {
        case Start => { // 發(fā)送Start消息表示要與遠(yuǎn)程Actor進(jìn)行后續(xù)業(yè)務(wù)邏輯處理的通信,可以指示遠(yuǎn)程Actor初始化一些滿足業(yè)務(wù)處理的操作或數(shù)據(jù)
            send(Start)
            if (!connected) {
                connected = true
                log.info("ClientActor==> Actor connected: " + this)
            }
        }
        case Stop => {
            send(Stop)
            stop = true
            connected = false
            log.info("ClientActor=> Stopped")
        }
        case header: Header => {
            log.info("ClientActor=> Header")
            send(header)
        }
        case (seq, result) => log.info("RESULT: seq=" + seq + ", result=" + result) // 用于接收遠(yuǎn)程Actor處理一個(gè)Packet消息的結(jié)果
        case m => log.info("Unknown message: " + m)
    }

    private def send(cmd: Serializable): Unit = {
        log.info("Send command to server: " + cmd)
        try {
            remoteServerRef ! cmd // 發(fā)送一個(gè)消息到遠(yuǎn)程Actor,消息必須是可序列化的,因?yàn)橄ο笠?jīng)過網(wǎng)絡(luò)傳輸
        } catch {
            case e: Exception => {
                connected = false
                log.info("Try to connect by sending Start command...")
                send(Start)
            }
        }
    }

}
AkkaClientApplication
package cn.xpleaf.bigdata.p5.myakka.p3

import akka.actor.{ActorSystem, Props}
import cn.xpleaf.bigdata.p5.myakka.{Header, Start}
import com.typesafe.config.ConfigFactory

object AkkaClientApplication extends App {
    // 通過配置文件application.conf配置創(chuàng)建ActorSystem系統(tǒng)
    val system = ActorSystem("client-system",
        ConfigFactory.load().getConfig("MyRemoteClientSideActor"))
    val log = system.log
    val clientActor = system.actorOf(Props[ClientActor], "clientActor") // 獲取到ClientActor的一個(gè)引用
    clientActor ! Start // 發(fā)送一個(gè)Start消息,第一次與遠(yuǎn)程Actor握手(通過本地ClientActor進(jìn)行轉(zhuǎn)發(fā))
    Thread.sleep(2000)
    clientActor ! Header("What's your name: Can you tell me ", 20, encrypted = false) // 發(fā)送一個(gè)Header消息到遠(yuǎn)程Actor(通過本地ClientActor進(jìn)行轉(zhuǎn)發(fā))
    Thread.sleep(2000)

}
測試

服務(wù)端輸出結(jié)果如下:

[INFO] [04/24/2018 09:39:49.271] [main] [Remoting] Starting remoting
[INFO] [04/24/2018 09:39:49.508] [main] [Remoting] Remoting started; listening on addresses :[akka.tcp://remote-system@127.0.0.1:2552]
[INFO] [04/24/2018 09:39:49.509] [main] [Remoting] Remoting now listens on addresses: [akka.tcp://remote-system@127.0.0.1:2552]
[INFO] [04/24/2018 09:39:49.517] [main] [ActorSystem(remote-system)] ===>Remote server actor started: akka://remote-system
[INFO] [04/24/2018 09:46:01.872] [remote-system-akka.actor.default-dispatcher-3] [akka.tcp://remote-system@127.0.0.1:2552/user/remoteActor] Remote Server Start ==>RECV Start event : cn.xpleaf.bigdata.p5.myakka.Start$@325737b3
[INFO] [04/24/2018 09:46:03.501] [remote-system-akka.actor.default-dispatcher-3] [akka.tcp://remote-system@127.0.0.1:2552/user/remoteActor] Remote Server => RECV header: (What's your name: Can you tell me ,20,false)

客戶端輸出結(jié)果如下:

[INFO] [04/24/2018 09:46:01.274] [main] [Remoting] Starting remoting
[INFO] [04/24/2018 09:46:01.479] [main] [Remoting] Remoting started; listening on addresses :[akka.tcp://client-system@192.168.43.132:2552]
[INFO] [04/24/2018 09:46:01.480] [main] [Remoting] Remoting now listens on addresses: [akka.tcp://client-system@192.168.43.132:2552]
[INFO] [04/24/2018 09:46:01.493] [client-system-akka.actor.default-dispatcher-4] [akka.tcp://client-system@192.168.43.132:2552/user/clientActor] Send command to server: cn.xpleaf.bigdata.p5.myakka.Start$@4f00805d
[INFO] [04/24/2018 09:46:01.496] [client-system-akka.actor.default-dispatcher-4] [akka.tcp://client-system@192.168.43.132:2552/user/clientActor] ClientActor==> Actor connected: cn.xpleaf.bigdata.p5.myakka.p3.ClientActor@5a85b576
[INFO] [04/24/2018 09:46:03.490] [client-system-akka.actor.default-dispatcher-2] [akka.tcp://client-system@192.168.43.132:2552/user/clientActor] ClientActor=> Header
[INFO] [04/24/2018 09:46:03.491] [client-system-akka.actor.default-dispatcher-2] [akka.tcp://client-system@192.168.43.132:2552/user/clientActor] Send command to server: Header(What's your name: Can you tell me ,20,false)

當(dāng)前標(biāo)題:Scala筆記整理(九):Actor和AKKA
文章URL:http://weahome.cn/article/gssgsh.html

其他資訊

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部