在這里,將從Actor之間發(fā)送消息的角度來(lái)介紹所有關(guān)于消息傳遞的概念。
● Ask:向Actor發(fā)送一條消息,返回一個(gè)Future。當(dāng)Actor返回響應(yīng)時(shí),會(huì)完成Future。不會(huì)向消息發(fā)送者的郵箱返回任何消息。
● Tell。向Actor發(fā)送一條消息。所有發(fā)送至sender()的響應(yīng)都會(huì)返回給發(fā)送消息的Actor?!?Forward:將接收到的消息再發(fā)送給另一個(gè)Actor。所有發(fā)送至sender()的響應(yīng)都會(huì)返回給原始消息的發(fā)送者。
● Pipe:用于將Future的結(jié)果返回給sender()或另一個(gè)Actor。如果正在使用Ask或是處理一個(gè)Future,那么使用Pipe可以正確地返回Future的結(jié)果。
可變消息定義
// Java
public class Message {public StringBuffer mutableBuffer;
public Message(StringBuffer: mutableBuffer) {this.mutableBuffer = mutableBuffer;
}
}
// scala
class Message(var mutableBuffer: StringBuffer = new StringBuffer);
Message message = new Message(new StringBuffer("original"));
message.mutableBuffer = new StringBuffer("new");
val message = new Message(new StringBuffer("original"))
message.mutableBuffer = new StringBuffer("new")
這個(gè)例子新建了一條消息,然后修改mutableBuffer引用,使之指向另一個(gè)新建的StringBuffer,消息創(chuàng)建時(shí)傳入的是StringBuffer (“original”),后來(lái)被修改成了StringBuffer(“new”)。這就是通過(guò)修改引用來(lái)修改消息的方法。
消息不可變
public class Message {// final
public final StringBuffer mutableBuffer;
Message(StringBuffer mutableBuffer) {this.mutableBuffer = mutableBuffer;
}
}
public class ImmutableMessage{public final String immutableType;
public ImmutableMessage(String immutableType) {this.immutableType = immutableType;
}
}
class ImmutableMessage(immutableType: String)
總結(jié)
理解不可變性不僅僅對(duì)于Akka消息是至關(guān)重要的,對(duì)于如何進(jìn)行安全的通用并發(fā)編程也是必不可少的,因此,無(wú)論什么時(shí)候,只要需要在線程之間共享數(shù)據(jù),就應(yīng)該首先考慮將數(shù)據(jù)定義為不可變。
1.2 ASK消息模式![epub_22651331_27.jpg](https://img-blog.csdnimg.cn/img_convert/2a7e44802270d65d74f4b6588fff8522.jpeg
要求
Ask模式要求定義一個(gè)超時(shí)參數(shù),如果對(duì)方?jīng)]有在超時(shí)參數(shù)限定的時(shí)間內(nèi)返回這個(gè)ask的響應(yīng),那么Future就會(huì)返回失敗。ask/?方法要求提供的超時(shí)參數(shù)可以是長(zhǎng)整型的毫秒數(shù),也可以是akka.util.Timeout,這種類型提供了更豐富的時(shí)間表達(dá)方式。
![epub_22651331_29.jpg](https://img-blog.csdnimg.cn/img_convert/50dcf9f3caa3978e58df33c9ff7b4690.jpeg#averageHue=#f0f0f0&clientId=uac482df2-4a9a-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=278&id=ue3790d29&margin=[object Object]&name=epub_22651331_29.jpg&originHeight=347&originWidth=1198&originalType=binary&ratio=1&rotation=0&showTitle=false&size=44177&status=done&style=none&taskId=uc3ef683b-00ce-4632-86a0-8a7a71984e3&title=&width=958.4)
static import akka.pattern.Patterns.ask;
Timeout timeout = new akka.util.Timeout(
1,
java.util.concurrent.TimeUnit.SECONDS
);
Future future = ask(actor, message, timeout);
import scala.concurrent.duration._
import akka.pattern.ask
// 設(shè)置超時(shí)時(shí)間
implicit val timeout = akka.util.Timeout(1 second)
val future = actorRef ? "message"
案例:請(qǐng)參考前面的案例3
缺點(diǎn)
Ask模式看上去很簡(jiǎn)單,不過(guò)它是有隱藏的額外性能開(kāi)銷的,首先,ask會(huì)導(dǎo)致Akka在/temp路徑下新建一個(gè)臨時(shí)Actor。
這個(gè)臨時(shí)Actor會(huì)等待從接收ask消息的Actor返回的響應(yīng),其次,F(xiàn)uture也有額外的性能開(kāi)銷。
Ask會(huì)創(chuàng)建Future,由臨時(shí)Actor負(fù)責(zé)完成,這個(gè)開(kāi)銷并不大,但是如果需要非常高頻地執(zhí)行ask操作,那么還是要將這一開(kāi)銷考慮在內(nèi)的。
Ask很簡(jiǎn)單,不過(guò)考慮到性能,使用tell是更高效的解決方案。
//Java
actor.forward(result, getContext());
//Scala
actor forward message
1.4 Pipe消息模式很多情況下,需要將Actor中的某個(gè)Future返回給請(qǐng)求發(fā)送者。上文介紹過(guò)sender()是一個(gè)方法,所以要在Future的回調(diào)函數(shù)中訪問(wèn)sender(),我們必須存儲(chǔ)一個(gè)指向sender()的引用:
//Java
final ActorRef senderRef = sender();
future.map(x ->{senderRef.tell(x, ActorRef.noSender())});
//Scala
val senderRef = sender();
future.map(x =>senderRef ! x);(原文為future.map(x =>senderRef ! ActorRef.noSender),有誤。)
//Java
pipe(future, system.dispatcher()).to(sender());
//Scala
future pipeTo sender()
pipe(future) to sender()
pipe接受Future的結(jié)果作為參數(shù),然后將其傳遞給所提供的Actor引用。在上面的例子中,因?yàn)閟ender()執(zhí)行在當(dāng)前線程上,所以我們可以直接調(diào)用sender(),而不用干一些奇怪的事情(比如把sender()引用存儲(chǔ)在一個(gè)變量中),執(zhí)行結(jié)果一切正確。這就好多了!
你是否還在尋找穩(wěn)定的海外服務(wù)器提供商?創(chuàng)新互聯(lián)www.cdcxhl.cn海外機(jī)房具備T級(jí)流量清洗系統(tǒng)配攻擊溯源,準(zhǔn)確流量調(diào)度確保服務(wù)器高可用性,企業(yè)級(jí)服務(wù)器適合批量采購(gòu),新人活動(dòng)首月15元起,快前往官網(wǎng)查看詳情吧