我們都知道,RocketMQ在代碼級(jí)別對(duì)連接服務(wù)器進(jìn)行了限制,基本上可以理解為一個(gè)JVM進(jìn)程中只能連接一個(gè)NameServer,但實(shí)際應(yīng)用場(chǎng)景中,我們可能會(huì)在架構(gòu)設(shè)計(jì)層面上對(duì)RocketMQ進(jìn)行了職能上的劃分,規(guī)定了A服務(wù)處理A類消息,而B(niǎo)服務(wù)處理B類消息,這時(shí)我們應(yīng)該如何解決這個(gè)問(wèn)題呢?
十年的吐魯番網(wǎng)站建設(shè)經(jīng)驗(yàn),針對(duì)設(shè)計(jì)、前端、開(kāi)發(fā)、售后、文案、推廣等六對(duì)一服務(wù),響應(yīng)快,48小時(shí)及時(shí)工作處理。全網(wǎng)整合營(yíng)銷推廣的優(yōu)勢(shì)是能夠根據(jù)用戶設(shè)備顯示端的尺寸不同,自動(dòng)調(diào)整吐魯番建站的顯示方式,使網(wǎng)站能夠適用不同顯示終端,在瀏覽器中調(diào)整網(wǎng)站的寬度,無(wú)論在任何一種瀏覽器上瀏覽網(wǎng)站,都能展現(xiàn)優(yōu)雅布局與設(shè)計(jì),從而大程度地提升瀏覽體驗(yàn)。創(chuàng)新互聯(lián)公司從事“吐魯番網(wǎng)站設(shè)計(jì)”,“吐魯番網(wǎng)站推廣”以來(lái),每個(gè)客戶項(xiàng)目都認(rèn)真落實(shí)執(zhí)行。
我們從代碼層級(jí)來(lái)分析到底為什么會(huì)產(chǎn)生“一個(gè)JVM實(shí)例只能連接一個(gè)NameServer”。
RocketMQ Client有一個(gè)核心類MQClientManager
,在我們需要使用MQ Client實(shí)例的時(shí)候,實(shí)際上都是通過(guò)它的getAndCreateMQClientInstance
方法進(jìn)行創(chuàng)建的;名稱比較拗口,同時(shí)是Get和Create,這不太符合我們所說(shuō)的設(shè)計(jì)單一性原則,但這不是我們討論的重點(diǎn),我們看一看這個(gè)方法的實(shí)現(xiàn)
public MQClientInstance getAndCreateMQClientInstance(ClientConfig clientConfig, RPCHook rpcHook) {
String clientId = clientConfig.buildMQClientId();
MQClientInstance instance = (MQClientInstance)this.factoryTable.get(clientId);
if (null == instance) {
instance = new MQClientInstance(clientConfig.cloneClientConfig(), this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);
MQClientInstance prev = (MQClientInstance)this.factoryTable.putIfAbsent(clientId, instance);
if (prev != null) {
instance = prev;
log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId);
} else {
log.info("Created new MQClientInstance for clientId:[{}]", clientId);
}
}
return instance;
}
代碼不復(fù)雜,我們可以看到它利用客戶的配置信息生成一個(gè)固定的clientId,以此去緩存factoryTable中查找,不存在才會(huì)創(chuàng)建全新一個(gè)實(shí)例。
那么,可以理解一個(gè)clientID僅能存在一個(gè)連接實(shí)例了,可這個(gè)clientId是怎么產(chǎn)生的呢?繼續(xù)跟蹤看看這段代碼
public String buildMQClientId() {
StringBuilder sb = new StringBuilder();
sb.append(this.getClientIP());
sb.append("@");
sb.append(this.getInstanceName());
if (!UtilAll.isBlank(this.unitName)) {
sb.append("@");
sb.append(this.unitName);
}
return sb.toString();
}
代碼層面上對(duì)clientId進(jìn)行了約定,格式為“ClientIp@InstanceName”格式,當(dāng)unitName不為空的時(shí)候還會(huì)在后面加上“@unitName”。
從代碼分析上我們可以知道,為了創(chuàng)建多實(shí)例,我們可以
instanceName從哪來(lái)的?
instanceName = System.getProperty("rocketmq.client.name", "DEFAULT");
從系統(tǒng)屬性中讀取出來(lái)的,也就是一般在JVM啟動(dòng)時(shí)設(shè)定的。。。
可以變嗎?當(dāng)然,你可以通過(guò)代碼去做到,但這么做的話,你會(huì)失去讓人理解你代碼的能力的,哈哈
這就是為什么多少RocketMQ Client都只能連接一個(gè)服務(wù)器的原因了,它根本不考慮服務(wù)器是誰(shuí),僅關(guān)心自己,自私的家伙!
除此之外還有其它解決方案嗎?我仔細(xì)從網(wǎng)絡(luò)上翻了一輪,沒(méi)看到什么好方法,是大家都沒(méi)這個(gè)場(chǎng)景還是有其它好辦法解決了呢?歡迎大家討論~
在上一篇博文來(lái)自平行世界的救贖里面,我做了個(gè)工具sandbox,我提供的方法3就是依托于這個(gè)工具。
sandbox通過(guò)代碼隔離的方式,將另一份類定義放入沙箱中運(yùn)行,從而實(shí)現(xiàn)多個(gè)實(shí)例完全隔離的效果。MQClientManager
通過(guò)緩存方式,以clientId作為key值存儲(chǔ)到自身實(shí)例當(dāng)中,為了實(shí)現(xiàn)多個(gè)Client,那么前兩種方法的邏輯是修改clientId實(shí)現(xiàn)多個(gè)實(shí)例,而方法3的邏輯則是“既然你的緩存已經(jīng)有這個(gè)key,我就換個(gè)緩存”,本質(zhì)就是“你這個(gè)鍋不裝我,我就換個(gè)鍋”。
這里我使用一個(gè)springboot項(xiàng)目作為演示案例。
通過(guò)springboot的Configuration
將多個(gè)RocketMQ Client進(jìn)行注冊(cè),再定義一個(gè)Controller接收不同請(qǐng)求去發(fā)送MQ消息,最后加上啟動(dòng)類。
我們先從pom文件中引入包(我沒(méi)有推上maven倉(cāng)庫(kù),各位可以從github/gitee上下載),代碼如下
4.0.0
me.van
rocket-mq-multi-client-test
1.0-SNAPSHOT
jar
測(cè)試多個(gè)rocketmq client共存
org.springframework.boot
spring-boot-starter-parent
2.1.6.RELEASE
UTF-8
me.van.App
1.8
1.14.8
org.springframework.boot
spring-boot-starter-web
org.projectlombok
lombok
${lombok.version}
provided
org.apache.rocketmq
rocketmq-client
4.4.0
me.van
sandbox
1.0.0
org.springframework.boot
spring-boot-maven-plugin
此處引入了apache的rocketmq-client組件作為mq客戶端,也就是存在前面所說(shuō)的問(wèn)題的組件。
package me.van;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class App {
public static void main(String[] args) {
SpringApplication.run(App.class, args);
}
}
非常的簡(jiǎn)單,沒(méi)什么好介紹的。
package me.van;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MQProducer;
import org.springframework.beans.factory.annotation.Autowire;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class AppConfig {
@Bean(autowire = Autowire.BY_NAME, value = "producer")
MQProducer producer() throws MQClientException {
DefaultMQProducer producer = new DefaultMQProducer();
initProducer(producer, "a.io:9876;b.io:9876");
return producer;
}
@Bean(autowire = Autowire.BY_NAME, value = "producer_sandbox1")
MQProducer producerSandbox1() throws MQClientException, SandboxCannotCreateObjectException {
DefaultMQProducer producer = createProducerInSandbox();
initProducer(producer, "x.io:9876;y.io:9876");
return producer;
}
@Bean(autowire = Autowire.BY_NAME, value = "producer_sandbox2")
MQProducer producerSandbox2() throws MQClientException, SandboxCannotCreateObjectException {
DefaultMQProducer producer = createProducerInSandbox();
initProducer(producer, "1.io:9876;2.io:9876");
return producer;
}
private DefaultMQProducer createProducerInSandbox() throws SandboxCannotCreateObjectException {
Sandbox sandbox = new Sandbox("org.apache.rocketmq.client");
return sandbox.createObject(DefaultMQProducer.class);
}
private void initProducer(DefaultMQProducer producer, String namesrvAddr) throws MQClientException {
producer.setNamesrvAddr(namesrvAddr);
producer.setProducerGroup("test-group");
producer.setRetryAnotherBrokerWhenNotStoreOK(true);
producer.start();
}
}
這里可以看到,producer
對(duì)象是直接new 出來(lái)的DefaultMQProducer
,而producer_sandbox1
和producer_sandbox2
是通過(guò)不同的沙箱創(chuàng)建出來(lái)的;三個(gè)client分別連接到不同的NameServer中,同時(shí)其它屬性保持一致。
package me.van;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.MQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class TestController {
@Autowired
MQProducer producer;
@Autowired
MQProducer producer_sandbox1;
@Autowired
MQProducer producer_sandbox2;
@GetMapping("/")
public String hello(){
return "hello world";
}
@GetMapping("/send")
public String send(String msg){
if(null == msg) return "msg is null";
String returnMsg = "";
Message message = new Message("topic-test-multi-mq-client", msg.getBytes());
try {
producer.send(message);
returnMsg += "原生producer發(fā)送完成
";
producer_sandbox1.send(message);
returnMsg += "第一個(gè)沙箱內(nèi)producer發(fā)送完成
";
producer_sandbox2.send(message);
returnMsg += "第二個(gè)沙箱內(nèi)producer發(fā)送完成
";
} catch (MQClientException | InterruptedException | RemotingException | MQBrokerException e) {
returnMsg += "發(fā)送過(guò)程出現(xiàn)異常:" + e.getMessage();
}
return returnMsg;
}
}
通過(guò)send
方法同時(shí)向三個(gè)producer
發(fā)送消息。
運(yùn)行App
,等幾秒鐘啟動(dòng)完畢,訪問(wèn)http://localhost:8080/send,返回
msg is null
訪問(wèn),http://localhost:8080/send?msg=test
github: https://github.com/vancoo/multi-mq-demo
gitee: https://gitee.com/vancoo/multi-mq-demo
參考文檔: 來(lái)自平行世界的救贖