真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

響應(yīng)式編程簡介之如何掌握Reactor

這篇文章主要介紹“響應(yīng)式編程簡介之如何掌握Reactor”,在日常操作中,相信很多人在響應(yīng)式編程簡介之如何掌握Reactor問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”響應(yīng)式編程簡介之如何掌握Reactor”的疑惑有所幫助!接下來,請跟著小編一起來學習吧!

成都創(chuàng)新互聯(lián)專注為客戶提供全方位的互聯(lián)網(wǎng)綜合服務(wù),包含不限于成都網(wǎng)站制作、成都網(wǎng)站設(shè)計、新平網(wǎng)絡(luò)推廣、微信小程序開發(fā)、新平網(wǎng)絡(luò)營銷、新平企業(yè)策劃、新平品牌公關(guān)、搜索引擎seo、人物專訪、企業(yè)宣傳片、企業(yè)代運營等,從售前售中售后,我們都將竭誠為您服務(wù),您的肯定,是我們最大的嘉獎;成都創(chuàng)新互聯(lián)為所有大學生創(chuàng)業(yè)者提供新平建站搭建服務(wù),24小時服務(wù)熱線:13518219792,官方網(wǎng)址:www.cdcxhl.com

Reactor簡介

Reactor是基于JVM的非阻塞API,他直接跟JDK8中的API相結(jié)合,比如:CompletableFuture,Stream和Duration等。

它提供了兩個非常有用的異步序列API:Flux和Mono,并且實現(xiàn)了Reactive Streams的標準。

并且還可以和reactor-netty相結(jié)合,作為一些異步框架的底層服務(wù),比如我們非常熟悉的Spring MVC 5中引入的WebFlux。

我們知道WebFlux的底層使用的是reactor-netty,而reactor-netty又引用了Reactor。所以,如果你在POM中引入了webFlux依賴:


    org.springframework.boot
    spring-boot-starter-webflux

那么項目將會自動引入Reactor。

如果你用的不是Spring webflux,沒關(guān)系,你可以直接添加下面的依賴來使用Reactor:


    io.projectreactor
    reactor-core

reactive programming的發(fā)展史

最最開始的時候微軟為.NET平臺創(chuàng)建了Reactive Extensions (Rx) library。接著RxJava實現(xiàn)了JVM平臺的Reactive。

然后Reactive Streams標準出現(xiàn)了,它定義了Java平臺必須滿足的的一些規(guī)范。并且已經(jīng)集成到JDK9中的java.util.concurrent類中。

在Flow中定義了實現(xiàn)Reactive Streams的四個非常重要的組件,分別是Publisher,Subscriber,Subscription和Processor。

Iterable-Iterator 和Publisher-Subscriber的區(qū)別

一般來說reactive在面向?qū)ο蟮木幊陶Z言中是以觀察者模式的擴展來使用的。

我們來具體看一下這個觀察者模式的實現(xiàn),以Publisher和Subscriber為例:

   public static interface Publisher {
        public void subscribe(Subscriber subscriber);
    }
    public static interface Subscriber {

        public void onSubscribe(Subscription subscription);

        public void onNext(T item);

        public void onError(Throwable throwable);

        public void onComplete();
    }

上面定義了兩個接口,Publisher和Subscriber,Publisher的作用就是subscribe到subscriber。

而subscriber定義了4個on方法,用來觸發(fā)特定的事件。

那么Publisher中的subscribe是怎么觸發(fā)Subscriber的onSubscribe事件呢?

很簡單,我們看一個具體的實現(xiàn):

    public void subscribe(Flow.Subscriber subscriber) {
        Subscription sub;
        if (throwable != null) {
            assert iterable == null : "non-null iterable: " + iterable;
            sub = new Subscription(subscriber, null, throwable);
        } else {
            assert throwable == null : "non-null exception: " + throwable;
            sub = new Subscription(subscriber, iterable.iterator(), null);
        }
        subscriber.onSubscribe(sub);

        if (throwable != null) {
            sub.pullScheduler.runOrSchedule();
        }
    }

上面的例子是PullPublisher的subscribe實現(xiàn)。我們可以看到,在這個subscribe中觸發(fā)了subscriber.onSubscribe方法。而這就是觀察者模式的秘密。

或者說,當Publisher調(diào)用subscribe的時候,是主動push subscriber的onSubscribe方法。

熟悉Iterable-Iterator模式的朋友應(yīng)該都知道,Iterator模式,其實是一個主動的pull模式,因為需要不斷的去調(diào)用next()方法。所以它的控制權(quán)是在調(diào)用方。

為什么要使用異步reactive

在現(xiàn)代應(yīng)用程序中,隨著用戶量的增多,程序員需要考慮怎么才能提升系統(tǒng)的處理能力。

傳統(tǒng)的block IO的方式,因為需要占用大量的資源,所以是不適合這樣的場景的。我們需要的是NO-block IO。

JDK中提供了兩種異步編程的模型:

第一種是Callbacks,異步方法可以通過傳入一個Callback參數(shù)的形式來在Callback中執(zhí)行異步任務(wù)。比較典型的像是java Swing中的EventListener。

第二中就是使用Future了。我們使用Callable來提交一個任務(wù),然后通過Future來拿到它的運行結(jié)果。

這兩種異步編程會有什么問題呢?

callback的問題就在于回調(diào)地獄。熟悉JS的朋友應(yīng)該很理解這個回調(diào)地獄的概念。

簡單點講,回調(diào)地獄就是在callback中又使用了callback,從而造成了這種callback的層級調(diào)用關(guān)系。

而Future主要是對一個異步執(zhí)行的結(jié)果進行獲取,它的 get()實際上是一個block操作。并且不支持異常處理,也不支持延遲計算。

當有多個Future的組合應(yīng)該怎么處理呢?JDK8 實際上引入了一個CompletableFuture類,這個類是Future也是一個CompletionStage,CompletableFuture支持then的級聯(lián)操作。不過CompletableFuture提供的方法不是那么的豐富,可能滿足不了我的需求。

于是我們的Reactor來了。

Flux

Reactor提供了兩個非常有用的操作,他們是 Flux 和 Mono。 其中Flux 代表的是 0 to N 個響應(yīng)式序列,而Mono代表的是0或者1個響應(yīng)式序列。

我們看一個Flux是怎么transfer items的:

響應(yīng)式編程簡介之如何掌握Reactor

先看下Flux的定義:

public abstract class Flux implements Publisher

可以看到Flux其實就是一個Publisher,用來產(chǎn)生異步序列。

Flux提供了非常多的有用的方法,來處理這些序列,并且提供了completion和error的信號通知。

相應(yīng)的會去調(diào)用Subscriber的onNext, onComplete, 和 onError 方法。

Mono

我們看下Mono是怎么transfer items的:

響應(yīng)式編程簡介之如何掌握Reactor

看下Mono的定義:

public abstract class Mono implements Publisher

Mono和Flux一樣,也是一個Publisher,用來產(chǎn)生異步序列。

Mono因為只有0或者1個序列,所以只會觸發(fā)Subscriber的onComplete和onError方法,沒有onNext。

另一方面,Mono其實可以看做Flux的子集,只包含F(xiàn)lux的部分功能。

Mono和Flux是可以互相轉(zhuǎn)換的,比如Mono#concatWith(Publisher)返回一個Flux,而 Mono#then(Mono)返回一個Mono.

Flux和Mono的基本操作

我們看下Flux創(chuàng)建的例子:

Flux seq1 = Flux.just("foo", "bar", "foobar");
List iterable = Arrays.asList("foo", "bar", "foobar");
Flux seq2 = Flux.fromIterable(iterable);
Flux numbersFromFiveToSeven = Flux.range(5, 3);

可以看到Flux提供了很多種創(chuàng)建的方式,我們可以自由選擇。

再看看Flux的subscribe方法:

Disposable subscribe(); 

Disposable subscribe(Consumer consumer); 

Disposable subscribe(Consumer consumer,
          Consumer errorConsumer); 

Disposable subscribe(Consumer consumer,
          Consumer errorConsumer,
          Runnable completeConsumer); 

Disposable subscribe(Consumer consumer,
          Consumer errorConsumer,
          Runnable completeConsumer,
          Consumer subscriptionConsumer);

subscribe可以一個參數(shù)都沒有,也可以多達4個參數(shù)。

看下沒有參數(shù)的情況:

Flux numbersFromFiveToSeven = Flux.range(5, 3);

numbersFromFiveToSeven.subscribe();

注意,沒有參數(shù)并不表示Flux的對象不被消費,只是不可見而已。

看下帶參數(shù)的情況:consumer用來處理on each事件,errorConsumer用來處理on error事件,completeConsumer用來處理on complete事件,subscriptionConsumer用來處理on subscribe事件。

前面的3個參數(shù)很好理解,我們來舉個例子:

Flux ints3 = Flux.range(1, 4);
        ints3.subscribe(System.out::println,
                error -> System.err.println("Error " + error),
                () -> System.out.println("Done"),
                sub -> sub.request(2));

我們構(gòu)建了從1到4的四個整數(shù)的Flux,on each就是打印出來,如果中間有錯誤的話,就輸出Error,全部完成就輸出Done。

那么最后一個subscriptionConsumer是做什么用的呢?

subscriptionConsumer accept的是一個Subscription對象,我們看下Subscription的定義:

public interface Subscription {

    public void request(long n);
    public void cancel();
}

Subscription 定義了兩個方法,用來做初始化用的,我們可以調(diào)用request(n)來決定這次subscribe獲取元素的最大數(shù)目。

比如上面我們的例子中,雖然構(gòu)建了4個整數(shù),但是最終輸出的只有2個。

上面所有的subscribe方法,都會返回一個Disposable對象,我們可以通過Disposable對象的dispose()方法,來取消這個subscribe。

Disposable只定義了兩個方法:

public interface Disposable {

	void dispose();

	default boolean isDisposed() {
		return false;
	}

dispose的原理是向Flux 或者 Mono發(fā)出一個停止產(chǎn)生新對象的信號,但是并不能保證對象產(chǎn)生馬上停止。

有了Disposable,當然要介紹它的工具類Disposables。

Disposables.swap() 可以創(chuàng)建一個Disposable,用來替換或者取消一個現(xiàn)有的Disposable。

Disposables.composite(…)可以將多個Disposable合并起來,在后面統(tǒng)一做處理。

到此,關(guān)于“響應(yīng)式編程簡介之如何掌握Reactor”的學習就結(jié)束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學習,快去試試吧!若想繼續(xù)學習更多相關(guān)知識,請繼續(xù)關(guān)注創(chuàng)新互聯(lián)網(wǎng)站,小編會繼續(xù)努力為大家?guī)砀鄬嵱玫奈恼拢?/p>
名稱欄目:響應(yīng)式編程簡介之如何掌握Reactor
當前URL:http://weahome.cn/article/ghgjdd.html

其他資訊

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部