這篇文章主要介紹了springboot websocket集群連接時(shí)候傳遞參數(shù)的示例分析,具有一定借鑒價(jià)值,感興趣的朋友可以參考下,希望大家閱讀完這篇文章之后大有收獲,下面讓小編帶著大家一起了解一下。
創(chuàng)新互聯(lián)專(zhuān)注于開(kāi)封網(wǎng)站建設(shè)服務(wù)及定制,我們擁有豐富的企業(yè)做網(wǎng)站經(jīng)驗(yàn)。 熱誠(chéng)為您提供開(kāi)封營(yíng)銷(xiāo)型網(wǎng)站建設(shè),開(kāi)封網(wǎng)站制作、開(kāi)封網(wǎng)頁(yè)設(shè)計(jì)、開(kāi)封網(wǎng)站官網(wǎng)定制、微信小程序開(kāi)發(fā)服務(wù),打造開(kāi)封網(wǎng)絡(luò)公司原創(chuàng)品牌,更為您提供開(kāi)封網(wǎng)站排名全網(wǎng)營(yíng)銷(xiāo)落地服務(wù)。
最近在公司項(xiàng)目中接到個(gè)需求。就是后臺(tái)跟前端瀏覽器要保持長(zhǎng)連接,后臺(tái)主動(dòng)往前臺(tái)推數(shù)據(jù)。
網(wǎng)上查了下,websocket stomp協(xié)議處理這個(gè)很簡(jiǎn)單。尤其是跟springboot 集成。
但是由于開(kāi)始是單機(jī)玩的,很順利。
但是后面部署到生產(chǎn)搞集群的話,就會(huì)出問(wèn)題了。
假如集群兩個(gè)節(jié)點(diǎn),瀏覽器A與節(jié)點(diǎn)A建立連接,A節(jié)點(diǎn)發(fā)的消息瀏覽器A節(jié)點(diǎn)肯定能收到。但是B節(jié)點(diǎn)由于沒(méi)有跟瀏覽器A建立連接。B節(jié)點(diǎn)發(fā)的消息瀏覽器就收不到了。
網(wǎng)上也查了好多,但是沒(méi)有一個(gè)說(shuō)的很清楚的,也很多都是理論層面的。
還有很多思路都是通過(guò)session獲取信息的。但是這都不是我需要的。我需要的是從前臺(tái)傳遞參數(shù),連接的時(shí)候每個(gè)節(jié)點(diǎn)保存下。然后通過(guò)SimpleUserRegistry.getUser獲取。
代碼:
var WEB_SOCKET = { topic : "", url : "", stompClient : null, connect : function(url, topic, callback,userid) { this.url = url; this.topic = topic; var socket = new SockJS(url); //連接SockJS的endpoint名稱(chēng)為"endpointOyzc" WEB_SOCKET.stompClient = Stomp.over(socket);//使用STMOP子協(xié)議的WebSocket客戶端 WEB_SOCKET.stompClient.connect({userid:userid},function(frame){//連接WebSocket服務(wù)端 // console.log('Connected:' + frame); //通過(guò)stompClient.subscribe訂閱/topic/getResponse 目標(biāo)(destination)發(fā)送的消息 WEB_SOCKET.stompClient.subscribe(topic, callback); }); } };
這是響應(yīng)的前端代碼。只需要引入兩個(gè)js。調(diào)用new SockJS(url) 就代表跟服務(wù)器建立連接了。
@Configuration //注解開(kāi)啟使用STOMP協(xié)議來(lái)傳輸基于代理(message broker)的消息,這時(shí)控制器支持使用@MessageMapping,就像使用@RequestMapping一樣 @EnableWebSocketMessageBroker public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer { @Autowired private GetHeaderParamInterceptor getHeaderParamInterceptor; @Override //注冊(cè)STOMP協(xié)議的節(jié)點(diǎn)(endpoint),并映射指定的url public void registerStompEndpoints(StompEndpointRegistry registry) { //注冊(cè)一個(gè)STOMP的endpoint,并指定使用SockJS協(xié)議 registry.addEndpoint("/endpointOyzc") .setAllowedOrigins("*") .withSockJS(); /* registry.addEndpoint("/endpointOyzc") .setAllowedOrigins("*") .setHandshakeHandler(xlHandshakeHandler) .withSockJS();*/ } @Override //配置消息代理(Message Broker) public void configureMessageBroker(MessageBrokerRegistry registry) { //點(diǎn)對(duì)點(diǎn)應(yīng)配置一個(gè)/user消息代理,廣播式應(yīng)配置一個(gè)/topic消息代理 registry.enableSimpleBroker("/topic", "/user"); // 全局使用的消息前綴(客戶端訂閱路徑上會(huì)體現(xiàn)出來(lái)) //registry.setApplicationDestinationPrefixes("/app"); //點(diǎn)對(duì)點(diǎn)使用的訂閱前綴(客戶端訂閱路徑上會(huì)體現(xiàn)出來(lái)),不設(shè)置的話,默認(rèn)也是/user/ registry.setUserDestinationPrefix("/user"); } /** * 采用自定義攔截器,獲取connect時(shí)候傳遞的參數(shù) * * @param registration */ @Override public void configureClientInboundChannel(ChannelRegistration registration) { registration.interceptors(getHeaderParamInterceptor); } }
注:上面的endpointOyzc就是前端的url。后面注冊(cè)端點(diǎn),前臺(tái)鏈接。
然后注意下configureClientInboundChannel這個(gè)方法,這個(gè)方法里面注入攔截器就是為了鏈接時(shí)候接收參數(shù)的。
/** * @author : hao * @description : websocket建立鏈接的時(shí)候獲取headeri里認(rèn)證的參數(shù)攔截器。 * @time : 2019/7/3 20:42 */ @Component public class GetHeaderParamInterceptor extends ChannelInterceptorAdapter { @Override public Message> preSend(Message> message, MessageChannel channel) { StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class); if (StompCommand.CONNECT.equals(accessor.getCommand())) { Object raw = message.getHeaders().get(SimpMessageHeaderAccessor.NATIVE_HEADERS); if (raw instanceof Map) { Object name = ((Map) raw).get("userid"); if (name instanceof LinkedList) { // 設(shè)置當(dāng)前訪問(wèn)的認(rèn)證用戶 accessor.setUser(new JqxxPrincipal(((LinkedList) name).get(0).toString())); } } } return message; } } /** * @author : hao * @description : 自定義的java.security.Principal * @time : 2019/7/3 20:42 */ public class JqxxPrincipal implements Principal { private String loginName; public JqxxPrincipal(String loginName) { this.loginName = loginName; } @Override public String getName() { return loginName; } }
這樣就存入的前臺(tái)傳的參數(shù)。
后臺(tái)發(fā)消息的時(shí)候怎么發(fā)呢?
/** * @author : hao * @description : websocket發(fā)送代理,負(fù)責(zé)發(fā)送消息 * @time : 2019/7/4 11:01 */ @Component @Slf4j public class WebsocketSendProxy{ @Autowired private SimpMessagingTemplate template; @Autowired private SimpUserRegistry userRegistry; @Resource(name = "redisServiceImpl") private RedisService redisService; @Value("spring.redis.message.topic-name") private String topicName; public void sendMsg(RedisWebsocketMsg redisWebsocketMsg) { SimpUser simpUser = userRegistry.getUser(redisWebsocketMsg.getReceiver()); log.info("發(fā)送消息前獲取接收方為{},根據(jù)Registry獲取本節(jié)點(diǎn)上這個(gè)用戶{}", redisWebsocketMsg.getReceiver(), simpUser); if (simpUser != null && StringUtils.isNotBlank(simpUser.getName())) { //2. 獲取WebSocket客戶端的訂閱地址 WebSocketChannelEnum channelEnum = WebSocketChannelEnum.fromCode(redisWebsocketMsg.getChannelCode()); if (channelEnum != null) { //3. 給WebSocket客戶端發(fā)送消息 template.convertAndSendToUser(redisWebsocketMsg.getReceiver(), channelEnum.getSubscribeUrl(), redisWebsocketMsg.getContent()); } } else { //給其他訂閱了主題的節(jié)點(diǎn)發(fā)消息,因?yàn)楸竟?jié)點(diǎn)沒(méi)有 redisService.convertAndSend(topicName, redisWebsocketMsg); } } }
可以發(fā)現(xiàn)上面代碼利用了redis監(jiān)聽(tīng)模型,也就是redis模型的消息隊(duì)列
/** * @author : hao * @description : redis消息監(jiān)聽(tīng)實(shí)現(xiàn)類(lèi),接收處理類(lèi) * @time : 2019/7/3 14:00 */ @Component @Slf4j public class MessageReceiver { @Autowired private SimpMessagingTemplate messagingTemplate; @Autowired private SimpUserRegistry userRegistry; /** * 處理WebSocket消息 */ public void receiveMessage(RedisWebsocketMsg redisWebsocketMsg) { log.info(MessageFormat.format("Received Message: {0}", redisWebsocketMsg)); //1. 取出用戶名并判斷是否連接到當(dāng)前應(yīng)用節(jié)點(diǎn)的WebSocket SimpUser simpUser = userRegistry.getUser(redisWebsocketMsg.getReceiver()); if (simpUser != null && StringUtils.isNotBlank(simpUser.getName())) { //2. 獲取WebSocket客戶端的訂閱地址 WebSocketChannelEnum channelEnum = WebSocketChannelEnum.fromCode(redisWebsocketMsg.getChannelCode()); if (channelEnum != null) { //3. 給WebSocket客戶端發(fā)送消息 messagingTemplate.convertAndSendToUser(redisWebsocketMsg.getReceiver(), channelEnum.getSubscribeUrl(), redisWebsocketMsg.getContent()); } } } }
redis消息模型只貼部分代碼就好了
/** * 消息監(jiān)聽(tīng)器 */ @Bean MessageListenerAdapter messageListenerAdapter(MessageReceiver messageReceiver, Jackson2JsonRedisSerializer
上面的思路大體如下:客戶端簡(jiǎn)歷鏈接時(shí)候,傳過(guò)來(lái)userid保存起來(lái)。發(fā)消息的時(shí)候 通過(guò)userRegistry獲取,能獲取到就證明是跟本節(jié)點(diǎn)建立的鏈接,直接用本節(jié)點(diǎn)發(fā)消息就好了。
如果不是就利用redis消息隊(duì)列,把消息推出去。每個(gè)節(jié)點(diǎn)去判斷獲取看下是不是本節(jié)點(diǎn)的userid。這樣就實(shí)現(xiàn)了集群的部署。
感謝你能夠認(rèn)真閱讀完這篇文章,希望小編分享的“springboot websocket集群連接時(shí)候傳遞參數(shù)的示例分析”這篇文章對(duì)大家有幫助,同時(shí)也希望大家多多支持創(chuàng)新互聯(lián),關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道,更多相關(guān)知識(shí)等著你來(lái)學(xué)習(xí)!