郵箱Mailbox
成都創(chuàng)新互聯(lián)公司服務(wù)項目包括臨沭網(wǎng)站建設(shè)、臨沭網(wǎng)站制作、臨沭網(wǎng)頁制作以及臨沭網(wǎng)絡(luò)營銷策劃等。多年來,我們專注于互聯(lián)網(wǎng)行業(yè),利用自身積累的技術(shù)優(yōu)勢、行業(yè)經(jīng)驗、深度合作伙伴關(guān)系等,向廣大中小型企業(yè)、政府機構(gòu)等提供互聯(lián)網(wǎng)行業(yè)的解決方案,臨沭網(wǎng)站推廣取得了明顯的社會效益與經(jīng)濟效益。目前,我們服務(wù)的客戶以成都為中心已經(jīng)輻射到臨沭省份的部分城市,未來相信會繼續(xù)擴大服務(wù)區(qū)域并繼續(xù)獲得客戶的支持與信任!Akka Mailbox持有發(fā)給actor的消息。通常,每一個Actor都與自己的郵箱,但是當(dāng)使用BalancingPool時,所有的路由都共享一個郵箱實例。
郵箱選擇
Actor需要的消息隊列類型
特定類型的actor可以用特定類型的消息隊列,只要這個actor實現(xiàn)了參數(shù)化的接口RequiresMessageQueue。這里是一個例子:
import akka.dispatch.BoundedMessageQueueSemantics;import akka.dispatch.RequiresMessageQueue;public class MyBoundedUntypedActor extends MyUntypedActorimplements RequiresMessageQueueRequiresMessageQueue接口的類型參數(shù)需要在配置中映射到一個郵箱,就像這樣:
bounded-mailbox {mailbox-type = "akka.dispatch.BoundedMailbox"mailbox-capacity = 1000mailbox-push-timeout-time = 10s}akka.actor.mailbox.requirements {"akka.dispatch.BoundedMessageQueueSemantics" = bounded-mailbox}現(xiàn)在每次你創(chuàng)建一個類型為MyBoundedUntypedActor的actor,它都將會嘗試獲取一個有界郵箱。如果actor在部署時配置了不同的郵箱,可能是直接配置的,也可能是通過帶有特定郵箱類型的分發(fā)器,那么就會覆寫這個映射。
注意
為Actor創(chuàng)建的郵箱中的隊列類型用接口中要求的類型進行檢查,如果隊列沒有實現(xiàn)要求的類型,那么actor創(chuàng)建就會失敗。
Dispatcher需要的消息隊列類型
分發(fā)器也需要一個郵箱類型,用于運行中的actor。一個例子就是BalancingDispatcher,它需要一個并發(fā)的、線程安全的消息隊列。這樣的需求可以在分發(fā)器配置中進行規(guī)劃,就像這樣:
my-dispatcher {mailbox-requirement = org.example.MyInterface}給定的需求命名了一個類或者接口,必須保證這個類或者接口是消息隊列實現(xiàn)的超類型。萬一沖突了,例如如果actor需要一個郵箱類型,但是它不滿足這個需求,那么actor創(chuàng)建就會失敗。
如何選擇郵箱類型
當(dāng)創(chuàng)建actor時,ActorRefProvider首先確定分發(fā)器,分發(fā)起會執(zhí)行actor。然后按照如下順序確定郵箱類型:
如果actor的部署配置部分包含一個mailbox關(guān)鍵字,那么這個mailbox關(guān)鍵字就指定了要使用的郵箱類型;如果actor的Props包含mailbox選擇—即調(diào)用了withMailbox方法—那么這個方法指定要使用的郵箱類型;如果分發(fā)器的配置部分包含一個mailbox-type關(guān)鍵字,那么這部分也將被用于配置郵箱類型;如果actor需要上面描述的郵箱類型,那么這個需求的映射將被用于確定郵箱類型;如果失敗了,那么分發(fā)器的需求-如果存在-將被會嘗試;如果分發(fā)器需要后面描述的郵箱類型,那么這個需求的映射將被用于確定郵箱類型;將使用默認的郵箱akka.actor.default-mailbox。默認郵箱
當(dāng)按照上述描述的依然沒有指定郵箱。那么就會使用默認的郵箱。默認郵箱his一個無界郵箱,是由java.util.concurrent.ConcurrentLinkedQueue支持的。
SingleConsumerOnlyUnboundedMailbox是更高效的郵箱,它可被用于默認郵箱,但是它不能被用于BalancingDispatcher。
將SingleConsumerOnlyUnboundedMailbox配置為默認郵箱:
akka.actor.default-mailbox {mailbox-type = "akka.dispatch.SingleConsumerOnlyUnboundedMailbox"}那些配置會傳給Mailbox類型
每一個郵箱類型都繼承自MailboxType,它的構(gòu)造函數(shù)有兩個參數(shù):ActorSystem.Settings對象和Config對象。后面這個是通過actor系統(tǒng)的配置獲取,用郵箱類型的配置路徑覆蓋它的id關(guān)鍵字,并添加一個默認郵箱配置的回調(diào)。
內(nèi)置的Mailbox實現(xiàn)
Akka自帶了很多郵箱實現(xiàn):
UnboundedMailbox(默認) 默認郵箱由java.util.concurrent.ConcurrentLinkedQueue支持阻塞: No有界: No配置名: "unbounded"或"akka.dispatch.UnboundedMailbox" SingleConsumerOnlyUnboundedMailbox這個隊列可能會或可能不會比默認郵箱更快,取決于你的使用場景—請確保進行過適當(dāng)?shù)幕鶞蕼y試!
由多生產(chǎn)者-單消費者隊列支持,不能用于BalancingDispatcher阻塞: No有界: No配置名:"akka.dispatch.SingleConsumerOnlyUnboundedMailbox" NonBlockingBoundedMailbox由非常高效的多生產(chǎn)者-單消費者隊列支持阻塞: No (將溢出消息丟棄到死信)有界: Yes配置名:"akka.dispatch.NonBlockingBoundedMailbox" UnboundedControlAwareMailbox優(yōu)先派送akka.dispatch.ControlMessage消息由兩個java.util.concurrent.ConcurrentLinkedQueue支持阻塞: No有界: No配置名:"akka.dispatch.UnboundedControlAwareMailbox" UnboundedPriorityMailbox由java.util.concurrent.PriorityBlockingQueue支持相同優(yōu)先級的消息的派送順序未定義-與UnboundedStablePriorityMailbox相反阻塞: No有界: No配置名:"akka.dispatch.UnboundedPriorityMailbox" UnboundedStablePriorityMailbox由包裝到akka.util.PriorityQueueStabilizer的java.util.concurrent.PriorityBlockingQueue支持相同優(yōu)先級的消息保證按照FIFO順序派送- contrast with the UnboundedPriorityMailbox阻塞: No有界: No配置名:"akka.dispatch.UnboundedStablePriorityMailbox"其它的有界郵箱實現(xiàn)如果達到容量,并且配置了non-zero mailbox-push-timeout-time,會阻塞發(fā)送者。
注意
下面的郵箱只應(yīng)該用于mailbox-push-timeout-time為0的情況。
BoundedMailbox由java.util.concurrent.LinkedBlockingQueue支持阻塞:如果使用non-zero mailbox-push-timeout-time為Yes,否則為No有界: Yes配置名:"bounded"或"akka.dispatch.BoundedMailbox" BoundedPriorityMailbox由包裝到akka.util.BoundedBlockingQueue中java.util.PriorityQueue支持相同優(yōu)先級的消息的派送順序未定義-與BoundedStablePriorityMailbox相反阻塞:如果使用了non-zero mailbox-push-timeout-time則為Yes,否則為No有界: Yes配置名:"akka.dispatch.BoundedPriorityMailbox" BoundedStablePriorityMailbox由包裝在akka.util.PriorityQueueStabilizer和akka.util.BoundedBlockingQueue中的java.util.PriorityQueue支持相同優(yōu)先級的消息的派送順序為FIFO-與BoundedPriorityMailbox相反阻塞: Yes如果使用了non-zero mailbox-push-timeout-time則為Yes,否則為No有界: Yes配置名:"akka.dispatch.BoundedStablePriorityMailbox" BoundedControlAwareMailbox優(yōu)先派送akka.dispatch.ControlMessage消息由兩個java.util.concurrent.ConcurrentLinkedQueue支持,如果達到容量,則阻塞排隊阻塞: Yes如果使用了non-zero mailbox-push-timeout-time則為Yes,否則為No有界: Yes配置名:"akka.dispatch.BoundedControlAwareMailbox"如果創(chuàng)建PriorityMailbox:
importcom.typesafe.config.Config;
importakka.actor.ActorSystem;
importakka.actor.PoisonPill;
importakka.dispatch.PriorityGenerator;
importakka.dispatch.UnboundedPriorityMailbox;
publicclassMyPrioMailboxextendsUnboundedPriorityMailbox {
// 用于反射實例化
publicMyPrioMailbox(ActorSystem.Settingssettings, Configconfig) {
//創(chuàng)建一個新的PriorityGenerator,低優(yōu)先級意味著更重要
super(newPriorityGenerator() {
@Override
publicintgen(Objectmessage) {
if(message.equals("highpriority"))
return0;// 如果可能的話,高優(yōu)先級的消息應(yīng)該被優(yōu)先處理
elseif(message.equals("lowpriority"))
return2;// 如果低優(yōu)先級的消息應(yīng)該被最后處理
elseif(message.equals(PoisonPill.getInstance()))
return3;// 當(dāng)沒有剩余時,則為處理PoisonPill
else
return1;// 默認位于高優(yōu)先級和低優(yōu)先級
}
});
}
}
然后把它添加到配置中:
prio-dispatcher {
mailbox-type ="docs.dispatcher.DispatcherDocSpec$MyPrioMailbox"
//Other dispatcher configuration goes here
}
下面是使用這個郵箱的例子:
importcom.typesafe.config.Config;
importcom.typesafe.config.ConfigFactory;
importakka.actor.ActorRef;
importakka.actor.ActorSystem;
importakka.actor.PoisonPill;
importakka.actor.Props;
importakka.actor.UntypedActor;
importakka.event.Logging;
importakka.event.LoggingAdapter;
publicclassDemoextendsUntypedActor {
LoggingAdapterlog= Logging.getLogger(getContext().system(),this);
{
for(Objectmsg:newObject[] {"lowpriority","lowpriority","highpriority","pigdog","pigdog2","pigdog3",
"highpriority", PoisonPill.getInstance() }) {
getSelf().tell(msg, getSelf());
}
}
publicvoidonReceive(Objectmessage) {
log.info(message.toString());
}
publicstaticvoidmain(String[]args) {
Configconfig= ConfigFactory.parseString("akka.loglevel = DEBUG n"+"akka.actor.debug.lifecycle = on");
// We create a new Actor that just prints out what it processes
ActorSystemsystem= ActorSystem.create("mailbox");
ActorRefmyActor=system.actorOf(Props.create(Demo.class).withDispatcher("prio-dispatcher"),"demo");
system.terminate();
}
}
運行輸出:
[INFO] [12/24/2016 23:38:22.364] [mailbox-prio-dispatcher-5] [akka://mailbox/user/demo] highpriority
[INFO] [12/24/2016 23:38:22.364] [mailbox-prio-dispatcher-5] [akka://mailbox/user/demo] highpriority
[INFO] [12/24/2016 23:38:22.364] [mailbox-prio-dispatcher-5] [akka://mailbox/user/demo] pigdog
[INFO] [12/24/2016 23:38:22.364] [mailbox-prio-dispatcher-5] [akka://mailbox/user/demo] pigdog2
[INFO] [12/24/2016 23:38:22.364] [mailbox-prio-dispatcher-5] [akka://mailbox/user/demo] pigdog3
[INFO] [12/24/2016 23:38:22.365] [mailbox-prio-dispatcher-5] [akka://mailbox/user/demo] lowpriority
[INFO] [12/24/2016 23:38:22.365] [mailbox-prio-dispatcher-5] [akka://mailbox/user/demo] lowpriority
也可以直接配置郵箱類型,就像這樣:
prio-mailbox {
mailbox-type = "docs.dispatcher.DispatcherDocSpec$MyPrioMailbox"
//Other mailbox configuration goes here
}
akka.actor.deployment {
/priomailboxactor {
mailbox = prio-mailbox
}
}
然后就可以使用來自部署的郵箱類型:
ActorRef myActor =
system.actorOf(Props.create(MyUntypedActor.class),
"priomailboxactor");
或這樣:
ActorRef myActor =
system.actorOf(Props.create(MyUntypedActor.class)
.withMailbox("prio-mailbox"));
創(chuàng)建自己的郵箱類型一個值得上千次吹噓的例子:
importakka.actor.ActorRef;
importakka.actor.ActorSystem;
importakka.dispatch.Envelope;
importakka.dispatch.MailboxType;
importakka.dispatch.MessageQueue;
importakka.dispatch.ProducesMessageQueue;
importcom.typesafe.config.Config;
importjava.util.concurrent.ConcurrentLinkedQueue;
importjava.util.Queue;
importscala.Option;
publicclassMyUnboundedJMailboximplementsMailboxType, ProducesMessageQueue
// This is the MessageQueue implementation
publicstaticclassMyMessageQueueimplementsMessageQueue, MyUnboundedJMessageQueueSemantics {
privatefinalQueue
// these must be implemented; queue used as example
publicvoidenqueue(ActorRefreceiver, Envelopehandle) {
queue.offer(handle);
}
publicEnvelope dequeue() {
returnqueue.poll();
}
publicintnumberOfMessages() {
returnqueue.size();
}
publicbooleanhasMessages() {
return!queue.isEmpty();
}
publicvoidcleanUp(ActorRefowner, MessageQueuedeadLetters) {
for(Envelopehandle:queue) {
deadLetters.enqueue(owner,handle);
}
}
}
// This constructor signature must exist, it will be called by Akka
publicMyUnboundedJMailbox(ActorSystem.Settingssettings, Configconfig) {
// put your initialization code here
}
// The create method is called to create the MessageQueue
publicMessageQueue create(Option
returnnewMyMessageQueue();
}
}
//Marker interface used for mailbox requirements mapping
publicinterfaceMyUnboundedJMessageQueueSemantics {
}
然后只需要將分發(fā)器配置或mailbox配置的"mailbox-type"的值指定為你的MailboxType的全限定名。
注意
確保包含一個參數(shù)為akka.actor.ActorSystem.Settings和com.typesafe.config.ConfigMake的構(gòu)造函數(shù),因為這個構(gòu)造函數(shù)會被反射調(diào)用,以便構(gòu)建你的郵箱類型。傳入的第二參數(shù)config是來自于配置中描述使用了這個郵箱類型的分發(fā)器和郵箱設(shè)置的那部分。對于每一個分發(fā)器和郵箱,郵箱類型只實例化一次。
你也可以使用郵箱類型作為分發(fā)器的必要條件,就像這樣:
custom-dispatcher {
mailbox-requirement =
"docs.dispatcher.MyUnboundedJMessageQueueSemantics"
}
akka.actor.mailbox.requirements {
"docs.dispatcher.MyUnboundedJMessageQueueSemantics" =
custom-dispatcher-mailbox
}
custom-dispatcher-mailbox {
mailbox-type = "docs.dispatcher.MyUnboundedJMailbox"
}
或者在你的actor上定義必備條件,就像這樣:
importakka.actor.UntypedActor;
importakka.dispatch.RequiresMessageQueue;
publicclassMySpecialActorextendsUntypedActorimplementsRequiresMessageQueue
@Override
publicvoidonReceive(Objectarg0)throwsException {
//TODOAuto-generated method stub
}
}
system.actorOf的特殊語義為了讓system.actorOf既同步又是非阻塞的,同時保持返回類型ActorRef(和返回的引用功能齊全的語義),這種情況下需要特殊處理。在這些場景背后,構(gòu)造了虛擬的actor引用,這些引用被發(fā)送到系統(tǒng)的守護actor,守護actor實際創(chuàng)建actor和它的上下文,并將它們放入引用內(nèi)部。Until that has happened,發(fā)送到ActorRef的消息將被本地排隊,一旦交換實際的填寫,它們將被轉(zhuǎn)移到真正的郵箱。因此,
finalProps props = ...
//這個actor使用MyCustomMailbox,假設(shè)它是單例
system.actorOf(props.withDispatcher("myCustomMailbox").tell("bang", sender);
assert(MyCustomMailbox.getInstance().getLastEnqueued().equals("bang"));
可能會失?。荒銓⒉坏貌辉试S一些時間來匆忙地傳入檢查并重試。TestKit.awaitCond。