真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

WebFlux定點推送以及全推送靈活websocket運用是什么

本篇文章為大家展示了WebFlux定點推送以及全推送靈活websocket運用是什么,內(nèi)容簡明扼要并且容易理解,絕對能使你眼前一亮,通過這篇文章的詳細介紹希望你能有所收獲。

成都創(chuàng)新互聯(lián)是專業(yè)的廣宗網(wǎng)站建設公司,廣宗接單;提供網(wǎng)站制作、網(wǎng)站設計,網(wǎng)頁設計,網(wǎng)站設計,建網(wǎng)站,PHP網(wǎng)站建設等專業(yè)做網(wǎng)站服務;采用PHP框架,可快速的進行廣宗網(wǎng)站開發(fā)網(wǎng)頁制作和功能擴展;專業(yè)做搜索引擎喜愛的網(wǎng)站,專業(yè)的做網(wǎng)站團隊,希望更多企業(yè)前來合作!

前言

        WebFlux 本身提供了對 WebSocket 協(xié)議的支持,處理 WebSocket 請求需要對應的 handler 實現(xiàn) WebSocketHandler 接口,每一個 WebSocket 都有一個關聯(lián)的 WebSocketSession,包含了建立請求時的握手信息 HandshakeInfo,以及其它相關的信息。可以通過 session 的 receive() 方法來接收客戶端的數(shù)據(jù),通過 session 的 send() 方法向客戶端發(fā)送數(shù)據(jù)。

示例

下面是一個簡單的 WebSocketHandler 示例:

@Component
public class EchoHandler implements WebSocketHandler {
    public Mono handle(WebSocketSession session) {
        return session.send(
                session.receive().map(
                        msg -> session.textMessage("ECHO -> " + msg.getPayloadAsText())));
    }
}

        有了 handler 之后,還需要讓 WebFlux 知道哪些請求需要交給這個 handler 進行處理,因此要創(chuàng)建相應的 HandlerMapping。

        在處理 HTTP 請求時,我們經(jīng)常使用 WebFlux 中最簡單的 handler 定義方式,即通過注解 @RequestMapping 將某個方法定義為處理特定路徑請求的 handler。 但是這個注解是用于處理 HTTP 請求的,對于 WebSocket 請求而言,收到請求后還需要協(xié)議升級的過程,之后才是 handler 的執(zhí)行,所以我們不能直接通過該注解定義請求映射,不過可以使用 SimpleUrlHandlerMapping 來添加映射。

@Configuration
public class WebSocketConfiguration {
    @Bean
    public HandlerMapping webSocketMapping(EchoHandler echoHandler) {
        final Map map = new HashMap<>(1);
        map.put("/echo", echoHandler);

        final SimpleUrlHandlerMapping mapping = new SimpleUrlHandlerMapping();
        mapping.setOrder(Ordered.HIGHEST_PRECEDENCE);
        mapping.setUrlMap(map);
        return mapping;
    }

    @Bean
    public WebSocketHandlerAdapter handlerAdapter() {
        return new WebSocketHandlerAdapter();
    }
}

這樣就能夠將發(fā)往 /echo 的 WebSocket 請求交給 EchoHandler 處理。

我們還要為 WebSocket 類型的 handler 創(chuàng)建對應的 WebSocketHandlerAdapter,以便讓 DispatcherHandler 能夠調(diào)用我們的 WebSocketHandler。

完成這三個步驟后,當一個 WebSocket 請求到達 WebFlux 時,首先由 DispatcherHandler 進行處理,它會根據(jù)已有的 HandlerMapping 找到這個 WebSocket 請求對應的 handler,接著發(fā)現(xiàn)該 handler 實現(xiàn)了 WebSocketHandler 接口,于是會通過 WebSocketHandlerAdapter 來完成該 handler 的調(diào)用。

疑惑

        從上面的例子不難看出,沒接收一個請求后,就得在里面里面返回消息,后面就不能再給他發(fā)消息了。其次是我每次新添加或者刪除一個消息的處理類Handler,就得每次去修改配置文件中的SimpleUrlHandlerMapping的UrlMap的內(nèi)容,感覺不是很友好。于是針對這2點進行修改和調(diào)整如下:

 1. 用自定義注解注冊 Handler

我們能否像注冊 HTTP 請求的 Handler 那樣,也通過類似 RequestMapping 的注解來注冊 Handler 呢?

雖然官方?jīng)]有相關實現(xiàn),但我們可以自己實現(xiàn)一個類似的注解,不妨叫作 WebSocketMapping

@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
public @interface WebSocketMapping {
    String value() default "";
}

@Retention(RetentionPolicy.RUNTIME) 表明該注解工作在運行期間,@Target(ElementType.TYPE) 表明該注解作用在類上。

我們先看下該注解最終的使用方式。下面是一個 TimeHandler 的示例,它會每秒鐘會向客戶端發(fā)送一次時間。我們通過注解 @WebSocketMapping("/time") 完成了 TimeHandler 的注冊,告訴 WebFlux 當有 WebSocket 請求發(fā)往 /echo 路徑時,就交給 EchoHandler 處理:

@Component
@WebSocketMapping("/echo")
public class EchoHandler implements WebSocketHandler {
    @Override
    public Mono handle(final WebSocketSession session) {
        return session.send(
                session.receive()
                        .map(msg -> session.textMessage(
                                "服務端返回:小明, -> " + msg.getPayloadAsText())));
    }
}

是不是和 RequestMapping 一樣方便?

到目前為止,這個注解還沒有實際的功能,還不能自動注冊 handler?;仡櫸覀兩厦孀月酚傻姆绞?,我們創(chuàng)建了一個 SimpleUrlHandlerMapping,并手動添加了 EchoHandler 的映射規(guī)則,然后將其作為 HandlerMapping 的 Bean 返回。

現(xiàn)在我們要創(chuàng)建一個專門的 HandlerMapping 類來處理 WebSocketMapping 注解,自動完成 handler 的注冊:

public class WebSocketMappingHandlerMapping extends SimpleUrlHandlerMapping{
	
	private Map handlerMap = new LinkedHashMap<>();
	/**
     * Register WebSocket handlers annotated by @WebSocketMapping
     * @throws BeansException
     */
    @Override
    public void initApplicationContext() throws BeansException {
        Map beanMap = obtainApplicationContext()
                .getBeansWithAnnotation(WebSocketMapping.class);
        beanMap.values().forEach(bean -> {
            if (!(bean instanceof WebSocketHandler)) {
                throw new RuntimeException(
                        String.format("Controller [%s] doesn't implement WebSocketHandler interface.",
                                bean.getClass().getName()));
            }
            WebSocketMapping annotation = AnnotationUtils.getAnnotation(
                    bean.getClass(), WebSocketMapping.class);
            //webSocketMapping 映射到管理中
            handlerMap.put(Objects.requireNonNull(annotation).value(),(WebSocketHandler) bean);
        });
        super.setOrder(Ordered.HIGHEST_PRECEDENCE);
        super.setUrlMap(handlerMap);
        super.initApplicationContext();
    }
}

我們的 WebSocketMappingHandlerMapping 類,實際上就是 SimpleUrlHandlerMapping,只不過增加了一些初始化的操作。

initApplicationContext() 方法是 Spring 中 ApplicationObjectSupport 類的方法,用于自定義類的初始化行為,在我們的 WebSocketMappingHandlerMapping 中,初始化工作主要是收集使用了 @WebSocketMapping 注解并且實現(xiàn)來 WebSocketHandler 接口的 Component,然后將它們注冊到內(nèi)部的 SimpleUrlHandlerMapping 中。之后的路由工作都是由父類 SimpleUrlHandlerMapping 已實現(xiàn)的功能來完成。

現(xiàn)在,我們只需要返回 WebSocketMappingHandlerMapping 的 Bean,就能自動處理 @WebSocketMapping 注解了:

@Configuration
public class WebSocketConfiguration {

	@Bean
	public HandlerMapping webSocketMapping() {
		return new WebSocketMappingHandlerMapping();
	}

	@Bean
	public WebSocketHandlerAdapter handlerAdapter() {
		return new WebSocketHandlerAdapter();
	}
}

2. WebSocket 請求處理過程剖析

我們來看下基于 Reactor Netty 的 WebFlux 具體是如何處理 WebSocket 請求的。

前面說過,WebSocket 請求進入 WebFlux 后,首先會從 HandlerMapping 中找到對應的 WebSocketHandler,再由 WebSocketHandlerAdapter 進行實際的調(diào)用。這就不再多做闡述,有興趣的朋友可以去看看WebSocketHandler,WebSocketHandlerAdapter。

3. 分離數(shù)據(jù)的接收與發(fā)送操作

我們知道 HTTP 協(xié)議是半雙工通信,雖然客戶端和服務器都能給對方發(fā)數(shù)據(jù),但是同一時間內(nèi)只會由一方向另一方發(fā)送數(shù)據(jù),并且在順序上是客戶端先發(fā)送請求,然后才由服務器返回響應數(shù)據(jù)。所以服務器處理 HTTP 的邏輯很簡單,就是每接收到一個客戶端請求,就返回一個響應。

而 WebSocket 是全雙工通信,客戶端和服務器可以隨時向另一方發(fā)送數(shù)據(jù),所以不再是"發(fā)送請求、返回響應"的通信方式了。我們上面的 EchoHandler 示例用的仍舊是這一方式,即收到數(shù)據(jù)后再針對性地返回一條數(shù)據(jù),我們下面就來看看如何充分利用 WebSocket 的雙向通信。

WebSocket 的處理,主要是通過 session 完成對兩個數(shù)據(jù)流的操作,一個是客戶端發(fā)給服務器的數(shù)據(jù)流,一個是服務器發(fā)給客戶端的數(shù)據(jù)流:

WebSocketSession 方法描述
Flux receive()接收來自客戶端的數(shù)據(jù)流,當連接關閉時數(shù)據(jù)流結束。
Mono send(Publisher)向客戶端發(fā)送數(shù)據(jù)流,當數(shù)據(jù)流結束時,往客戶端的寫操作也會隨之結束,此時返回的 Mono 會發(fā)出一個完成信號。

在 WebSocketHandler 中,最后應該將兩個數(shù)據(jù)流的處理結果整合成一個信號流,并返回一個 Mono 用于表明處理是否結束。

我們分別為兩個流定義處理的邏輯:

  • 對于輸出流:服務器每秒向客戶端發(fā)送一個數(shù)字;

  • 對于輸入流:每當收到客戶端消息時,就打印到標準輸出

Mono input = session.receive()
                   .map(WebSocketMessage::getPayloadAsText)
                   .map(msg -> id + ": " + msg)
				   .doOnNext(System.out::println).then();

Mono output = session.send(Flux.create(sink -> 
                    senderMap.put(id, new WebSocketSender(session, sink))));

 這兩個處理邏輯互相獨立,它們之間沒有先后關系,操作執(zhí)行完之后都是返回一個 Mono,但是如何將這兩個操作的結果整合成一個信號流返回給 WebFlux 呢?我們可以使用 WebFlux 中的 Mono.zip() 方法:

@Component
@WebSocketMapping("/echo")
public class EchoHandler implements WebSocketHandler {

	@Autowired
	private ConcurrentHashMap senderMap;

	@Override
	public Mono handle(WebSocketSession session) {

		Mono input = session.receive()
                .map(WebSocketMessage::getPayloadAsText).map(msg -> id + ": " + msg)
				.doOnNext(System.out::println).then();

		Mono output = session.send(Flux.create(sink -> 
                senderMap.put(id, new WebSocketSender(session, sink))));
		/**
		 * Mono.zip() 會將多個 Mono 合并為一個新的 Mono,
         * 任何一個 Mono 產(chǎn)生 error 或 complete 都會導致合并后的 Mono
		 * 也隨之產(chǎn)生 error 或 complete,此時其它的 Mono 則會被執(zhí)行取消操作。
		 */
		return Mono.zip(input, output).then();
	}
}

4. 從 Handler 外部發(fā)送數(shù)據(jù)

這里所說的從外部發(fā)送數(shù)據(jù),指的是需要在 WebSocketHandler 的代碼范圍之外,在其它地方通過代碼調(diào)用的方式向 WebSocket 連接發(fā)送數(shù)據(jù)。

思路:在定義 session 的 send() 操作時,通過編程的方式創(chuàng)建 Flux,即使用 Flux.create() 方法創(chuàng)建,將發(fā)布 Flux 數(shù)據(jù)的 FluxSink 暴露出來,并進行保存,然后在需要發(fā)送數(shù)據(jù)的地方,調(diào)用 FluxSink 的 next(T data) 方法,向 Flux 的訂閱者發(fā)布數(shù)據(jù)。

create 方法是以編程方式創(chuàng)建 Flux 的高級形式,它允許每次產(chǎn)生多個數(shù)據(jù),并且可以由多個線程產(chǎn)生。

create 方法將內(nèi)部的 FluxSink 暴露出來,F(xiàn)luxSink 提供了 next、error、complete 方法。通過 create 方法,可以將響應式堆棧中的 API 與其它 API 進行連接。

考慮這么一個場景:服務器與客戶端 A 建立 WebSocket 連接后,允許客戶端 B 通過 HTTP 向客戶端 A 發(fā)送數(shù)據(jù)。

不考慮安全性、魯棒性等問題,我們給出一個簡單的示例。

首先是 WebSocketHandler 的實現(xiàn),客戶端發(fā)送 WebSocket 建立請求時,需要在 query 參數(shù)中為當前連接指定一個 id,服務器會以該 id 為鍵,以對應的 WebSocketSender 為值存放到 senderMap 中:

@Component
@WebSocketMapping("/echo")
public class EchoHandler implements WebSocketHandler {

	@Autowired
	private ConcurrentHashMap senderMap;

	@Override
	public Mono handle(WebSocketSession session) {
		// TODO Auto-generated method stub
		HandshakeInfo handshakeInfo = session.getHandshakeInfo();
		Map queryMap = getQueryMap(handshakeInfo.getUri().getQuery());
		String id = queryMap.getOrDefault("id", "defaultId");
		Mono input = session.receive().map(WebSocketMessage::getPayloadAsText).map(msg -> id + ": " + msg)
				.doOnNext(System.out::println).then();

		Mono output = session.send(Flux.create(sink -> senderMap.put(id, new WebSocketSender(session, sink))));
		/**
		 * Mono.zip() 會將多個 Mono 合并為一個新的 Mono,任何一個 Mono 產(chǎn)生 error 或 complete 都會導致合并后的 Mono
		 * 也隨之產(chǎn)生 error 或 complete,此時其它的 Mono 則會被執(zhí)行取消操作。
		 */
		return Mono.zip(input, output).then();
	}

	//用于獲取url參數(shù)
	 private Map getQueryMap(String queryStr) {
        Map queryMap = new HashMap<>();
        if (!StringUtils.isEmpty(queryStr)) {
            String[] queryParam = queryStr.split("&");
            Arrays.stream(queryParam).forEach(s -> {
                String[] kv = s.split("=", 2);
                String value = kv.length == 2 ? kv[1] : "";
                queryMap.put(kv[0], value);
            });
        }
        return queryMap;
    }
}

其中,senderMap 是我們自己定義的 Bean,在配置文件中定義:

@Configuration
public class WebSocketConfiguration {

	@Bean
	public HandlerMapping webSocketMapping() {
		return new WebSocketMappingHandlerMapping();
	}

	@Bean
	public ConcurrentHashMap senderMap() {
		return new ConcurrentHashMap();
	}

	@Bean
	public WebSocketHandlerAdapter handlerAdapter() {
		return new WebSocketHandlerAdapter();
	}
}

WebSocketSender 是我們自己創(chuàng)建的類,目的是保存 WebSocket 連接的 session 以及對應的 FluxSink,以便在 WebSocketHandler 代碼范圍外發(fā)送數(shù)據(jù):

public class WebSocketSender {
	private WebSocketSession session;
    private FluxSink sink;

    public WebSocketSender(WebSocketSession session, FluxSink sink) {
        this.session = session;
        this.sink = sink;
    }

    public void sendData(String data) {
        sink.next(session.textMessage(data));
    }
}

接著我們來實現(xiàn) HTTP Controller,用戶在發(fā)起 HTTP 請求時,通過 query 參數(shù)指定要通信的 WebSocket 連接 id,以及要發(fā)送的數(shù)據(jù),然后從 senderMap 中取出對應的 WebSocketSender,調(diào)用其 send() 方法向客戶端發(fā)送數(shù)據(jù):

@RestController
@RequestMapping("/msg")
public class MsgController {

	@Autowired
	private ConcurrentHashMap senderMap;

	@RequestMapping("/send")
	public String sendMessage(@RequestParam String id, @RequestParam String data) {
		WebSocketSender sender = senderMap.get(id);
		if (sender != null) {
			sender.sendData(data);
			return String.format("Message '%s' sent to connection: %s.", data, id);
		} else {
			return String.format("Connection of id '%s' doesn't exist", id);
		}
	}
}

5. 測試

我這就不再寫頁面了,直接就用https://www.websocket.org/echo.html進行測試了,結果如下:

WebFlux定點推送以及全推送靈活websocket運用是什么

這樣就算完成了定點推送了,全推送,和部分推送就不再寫了,只要從ConcurrentHashMap中取出來去發(fā)送就是了。

上述內(nèi)容就是WebFlux定點推送以及全推送靈活websocket運用是什么,你們學到知識或技能了嗎?如果還想學到更多技能或者豐富自己的知識儲備,歡迎關注創(chuàng)新互聯(lián)行業(yè)資訊頻道。


當前標題:WebFlux定點推送以及全推送靈活websocket運用是什么
網(wǎng)站URL:http://weahome.cn/article/pcejdg.html

其他資訊

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部