真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

spark之master與worker通信模型講解

通信模型架構(gòu)圖

讓客戶滿意是我們工作的目標(biāo),不斷超越客戶的期望值來自于我們對(duì)這個(gè)行業(yè)的熱愛。我們立志把好的技術(shù)通過有效、簡單的方式提供給客戶,將通過不懈努力成為客戶在信息化領(lǐng)域值得信任、有價(jià)值的長期合作伙伴,公司提供的服務(wù)項(xiàng)目有:域名注冊(cè)、網(wǎng)站空間、營銷軟件、網(wǎng)站建設(shè)、和順網(wǎng)站維護(hù)、網(wǎng)站推廣。

spark之master與worker通信模型講解

master 端代碼
import akka.actor.{Actor, ActorSystem, Props}
import com.typesafe.config.ConfigFactory
// 需要導(dǎo)入這2個(gè)包 封裝一些屬性。
class MasterActor extends Actor {

  //在開始之前調(diào)用一次
  override def preStart(): Unit = {

  }

  //用于接收消息
  override def receive: Receive = {
    case "started" => {
      println("Master has been started!")
      //進(jìn)入這個(gè)分支,說明這個(gè)Master線程已經(jīng)啟動(dòng)完成
    }
    case "connecting" => {
      println("Master has been get connect from Worker!")
      println("a Worker Node has been register!")
      //返回消息給Worker
      sender() ! "connected"
      Thread.sleep(1000)
    }

    case "stoped" => {

    }
  }

}

object Demo01MasterActor {

  def main(args: Array[String]) {

    //設(shè)置MasterIP和端口
    val masterHost = "localhost"
    val masterPort = "1234"

    //端口和IP封裝到akka架構(gòu),獲取一個(gè)屬性配置文件
    val conStr =
      s"""
         |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
         |akka.remote.netty.tcp.hostname = "$masterHost"
         |akka.remote.netty.tcp.port = "$masterPort"
  """.stripMargin

    val config = ConfigFactory.parseString(conStr)
    val masterActorSystem = ActorSystem("MasterActorSystem", config)
    val masterActor = masterActorSystem.actorOf(Props[MasterActor], "MasterActor")
    masterActor ! "started"
    masterActorSystem.awaitTermination();

  }

}

worker端代碼
import akka.actor.{Actor, ActorSelection, ActorSystem, Props}
import com.typesafe.config.ConfigFactory

class WorkerActor extends Actor {
  var masterURL: ActorSelection = null

  //啟動(dòng)Actor之前執(zhí)行,做初始化工作
  override def preStart(): Unit = {
    //配置訪問Master的URL
    //MasterIP:localhost
    //MasterPort:8888(根據(jù)Master配置)
    //Master的 ActorSystem對(duì)象:MasterActorSystem、MasterActor
    masterURL = context.actorSelection("akka.tcp://MasterActorSystem@localhost:8888/user/MasterActor")
  }

  override def receive: Receive = {
    case "started" => {
      println("Worker has been started!")
      //進(jìn)入這個(gè)分支,說明這個(gè)Worker線程已經(jīng)啟動(dòng)完成
      //可以去向Master注冊(cè)

      //請(qǐng)求和Master建立連接
      masterURL ! "connecting"
    }
    case "connected" => {
      println("Worker 收到來自Master確認(rèn)信息!")
    }
    case "stoped" => {

    }
  }


}

object Demo01WorkerActor {

  def main(args: Array[String]) {
    //初始化MastereIP和端口、WorkerIP和端口

    //    val masterHost = args(0)
    //    val masterPort = args(1)
    //    val workerHost = args(2)
    //    val workePort = args(3)

    val masterHost = "localhost"
    val masterPort = "8888"

    val workerHost = "localhost"
    val workePort = "8889"


    //端口和IP封裝到akka架構(gòu),獲取一個(gè)屬性配置文件
    val conStr =
      s"""
         |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
         |akka.remote.netty.tcp.hostname = "$workerHost"
         |akka.remote.netty.tcp.port = "$workePort"
  """.stripMargin

    val config = ConfigFactory.parseString(conStr)
    val workerActorSystem = ActorSystem("WorkerActorSystem", config)
    val workerActor = workerActorSystem.actorOf(Props[WorkerActor], "WorkerActor")
    workerActor ! "started"
    workerActorSystem.awaitTermination();


  }

}

新聞標(biāo)題:spark之master與worker通信模型講解
鏈接URL:http://weahome.cn/article/jpgeed.html

其他資訊

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部