本篇文章給大家分享的是有關(guān)如何用Guava Retrying,小編覺得挺實(shí)用的,因此分享給大家學(xué)習(xí),希望大家閱讀完這篇文章后可以有所收獲,話不多說,跟著小編一起來看看吧。
網(wǎng)站建設(shè)哪家好,找創(chuàng)新互聯(lián)!專注于網(wǎng)頁(yè)設(shè)計(jì)、網(wǎng)站建設(shè)、微信開發(fā)、重慶小程序開發(fā)公司、集團(tuán)企業(yè)網(wǎng)站建設(shè)等服務(wù)項(xiàng)目。為回饋新老客戶創(chuàng)新互聯(lián)還提供了新平免費(fèi)建站歡迎大家使用!
重試的使用場(chǎng)景 在很多業(yè)務(wù)場(chǎng)景中,為了排除系統(tǒng)中的各種不穩(wěn)定因素,以及邏輯上的錯(cuò)誤,并最大概率保證獲得預(yù)期的結(jié)果,重試機(jī)制都是必不可少的。 尤其是調(diào)用遠(yuǎn)程服務(wù),在高并發(fā)場(chǎng)景下,很可能因?yàn)?a title="服務(wù)器" target="_blank" >服務(wù)器響應(yīng)延遲或者網(wǎng)絡(luò)原因,造成我們得不到想要的結(jié)果,或者根本得不到響應(yīng)。這個(gè)時(shí)候,一個(gè)優(yōu)雅的重試調(diào)用機(jī)制,可以讓我們更大概率保證得到預(yù)期的響應(yīng)。 sequenceDiagram Client->>Server:{"msg":"hello,server"} Note right of Server: busying ...... Client->>Server:{"msg":"hello,server"}
Server-->>Client:{"Exception":"500"} Note right of Server: busying ...... loop ServerBlock Server -->> Server: Too busy to deal with so many requests... end Client->>Server:{"msg":"hello,server"} activate Server Server-->>Client:{"msg":"hello,client"} deactivate Server
通常情況下,我們會(huì)通過定時(shí)任務(wù)進(jìn)行重試。例如某次操作失敗,則記錄下來,當(dāng)定時(shí)任務(wù)再次啟動(dòng),則將數(shù)據(jù)放到定時(shí)任務(wù)的方法中,重新跑一遍。最終直至得到想要的結(jié)果為止。 無論是基于定時(shí)任務(wù)的重試機(jī)制,還是我們自己寫的簡(jiǎn)單的重試器,缺點(diǎn)都是重試的機(jī)制太單一,而且實(shí)現(xiàn)起來不優(yōu)雅。 如何優(yōu)雅地設(shè)計(jì)重試實(shí)現(xiàn) 一個(gè)完備的重試實(shí)現(xiàn),要很好地解決如下問題:
什么條件下重試 什么條件下停止 如何停止重試 停止重試等待多久 如何等待 請(qǐng)求時(shí)間限制 如何結(jié)束 如何監(jiān)聽整個(gè)重試過程
并且,為了更好地封裝性,重試的實(shí)現(xiàn)一般分為兩步:
使用工廠模式構(gòu)造重試器 執(zhí)行重試方法并得到結(jié)果
一個(gè)完整的重試流程可以簡(jiǎn)單示意為: graph LR A((Start)) -->|build| B(Retryer) B --> C{need call?} C -->|continue| D[call] D --> Z[call count++] Z --> C C -->|finished| E[result] E --> F((success)) E --> G((failed ))
guava-retrying基礎(chǔ)用法 guava-retrying是基于谷歌的核心類庫(kù)guava的重試機(jī)制實(shí)現(xiàn),可以說是一個(gè)重試?yán)鳌?下面就快速看一下它的用法。 1.Maven配置
需要注意的是,此版本依賴的是27.0.1版本的guava。如果你項(xiàng)目中的guava低幾個(gè)版本沒問題,但是低太多就不兼容了。這個(gè)時(shí)候你需要升級(jí)你項(xiàng)目的guava版本,或者直接去掉你自己的guava依賴,使用guava-retrying傳遞過來的guava依賴。 2.實(shí)現(xiàn)Callable Callable
Callable的call方法中是你自己實(shí)際的業(yè)務(wù)調(diào)用。
通過RetryerBuilder構(gòu)造Retryer
Retryer
使用重試器執(zhí)行你的業(yè)務(wù)
retryer.call(callable);
下面是完整的參考實(shí)現(xiàn)。 public Boolean test() throws Exception { //定義重試機(jī)制 Retryer
//等待策略:每次請(qǐng)求間隔1s .withWaitStrategy(WaitStrategies.fixedWait(1, TimeUnit.SECONDS)) //停止策略 : 嘗試請(qǐng)求6次 .withStopStrategy(StopStrategies.stopAfterAttempt(6)) //時(shí)間限制 : 某次請(qǐng)求不得超過2s , 類似: TimeLimiter timeLimiter = new SimpleTimeLimiter(); .withAttemptTimeLimiter(AttemptTimeLimiters.fixedTimeLimit(2, TimeUnit.SECONDS)) .build(); //定義請(qǐng)求實(shí)現(xiàn) Callablecallable = new Callable () { int times = 1; @Override public Boolean call() throws Exception { log.info("call times={}", times); times++; if (times == 2) { throw new NullPointerException(); } else if (times == 3) { throw new Exception(); } else if (times == 4) { throw new RuntimeException(); } else if (times == 5) { return false; } else { return true; } } }; //利用重試器調(diào)用請(qǐng)求
return retryer.call(callable); }
guava-retrying實(shí)現(xiàn)原理 guava-retrying的核心是Attempt類、Retryer類以及一些Strategy(策略)相關(guān)的類。
Attempt
Attempt既是一次重試請(qǐng)求(call),也是請(qǐng)求的結(jié)果,并記錄了當(dāng)前請(qǐng)求的次數(shù)、是否包含異常和請(qǐng)求的返回值。 /**
An attempt of a call, which resulted either in a result returned by the call,
or in a Throwable thrown by the call.
@param
@author JB */ public interface Attempt
Retryer
Retryer通過RetryerBuilder這個(gè)工廠類進(jìn)行構(gòu)造。RetryerBuilder負(fù)責(zé)將定義的重試策略賦值到Retryer對(duì)象中。 在Retryer執(zhí)行call方法的時(shí)候,會(huì)將這些重試策略一一使用。 下面就看一下Retryer的call方法的具體實(shí)現(xiàn)。 /** * Executes the given callable. If the rejection predicate * accepts the attempt, the stop strategy is used to decide if a new attempt * must be made. Then the wait strategy is used to decide how much time to sleep * and a new attempt is made. * * @param callable the callable task to be executed * @return the computed result of the given callable * @throws ExecutionException if the given callable throws an exception, and the * rejection predicate considers the attempt as successful. The original exception * is wrapped into an ExecutionException. * @throws RetryException if all the attempts failed before the stop strategy decided * to abort, or the thread was interrupted. Note that if the thread is interrupted, * this exception is thrown and the thread's interrupt status is set. */ public V call(Callable
//說明:遍歷自定義的監(jiān)聽器 for (RetryListener listener : listeners) { listener.onRetry(attempt); } //說明:判斷是否滿足重試條件,來決定是否繼續(xù)等待并進(jìn)行重試 if (!rejectionPredicate.apply(attempt)) { return attempt.get(); } //說明:此時(shí)滿足停止策略,因?yàn)檫€沒有得到想要的結(jié)果,因此拋出異常 if (stopStrategy.shouldStop(attempt)) { throw new RetryException(attemptNumber, attempt); } else { //說明:執(zhí)行默認(rèn)的停止策略——線程休眠 long sleepTime = waitStrategy.computeSleepTime(attempt); try { //說明:也可以執(zhí)行定義的停止策略 blockStrategy.block(sleepTime); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new RetryException(attemptNumber, attempt); } } }
}
Retryer執(zhí)行過程如下。 graph TB
sq[Retryer] --> ci((call)) subgraph Retrying rb>RetryerBuilder]-- build retryer
with strategies --> ro di{Retryer:
using callable whith
strategies execute call...} -.-> ro(
.retryIf...
.withWaitStrategy
.withStopStrategy
.withAttemptTimeLimiter
.withBlockStrategy
.withRetryListene) di==>ro2(Attempt: get the result) end classDef green fill:#9f6,stroke:#333,stroke-width:2px; classDef orange fill:#f96,stroke:#333,stroke-width:4px; class sq,e green class di orange
guava-retrying高級(jí)用法 基于guava-retrying的實(shí)現(xiàn)原理,我們可以根據(jù)實(shí)際業(yè)務(wù)來確定自己的重試策略。 下面以數(shù)據(jù)同步這種常規(guī)系統(tǒng)業(yè)務(wù)為例,自定義重試策略。 如下實(shí)現(xiàn)基于Spring Boot 2.1.2.RELEASE版本。 并使用Lombok簡(jiǎn)化Bean。
業(yè)務(wù)描述 當(dāng)商品創(chuàng)建以后,需要另外設(shè)置商品的價(jià)格。由于兩個(gè)操作是有兩個(gè)人進(jìn)行的,因此會(huì)出現(xiàn)如下問題,即商品沒有創(chuàng)建,但是價(jià)格數(shù)據(jù)卻已經(jīng)建好了。遇到這種情況,價(jià)格數(shù)據(jù)需要等待商品正常創(chuàng)建以后,繼續(xù)完成同步。 我們通過一個(gè)http請(qǐng)求進(jìn)行商品的創(chuàng)建,同時(shí)通過一個(gè)定時(shí)器來修改商品的價(jià)格。 當(dāng)商品不存在,或者商品的數(shù)量小于1的時(shí)候,商品的價(jià)格不能設(shè)置。需要等商品成功創(chuàng)建且數(shù)量大于0的時(shí)候,才能將商品的價(jià)格設(shè)置成功。 實(shí)現(xiàn)過程
自定義重試阻塞策略
默認(rèn)的阻塞策略是線程休眠,這里使用自旋鎖實(shí)現(xiàn),不阻塞線程。 package net.ijiangtao.tech.framework.spring.ispringboot.demo.retryer.guava.strategy;
import com.github.rholder.retry.BlockStrategy; import lombok.NoArgsConstructor; import lombok.extern.slf4j.Slf4j;
import java.time.Duration; import java.time.LocalDateTime;
/**
自旋鎖的實(shí)現(xiàn), 不響應(yīng)線程中斷 */ @Slf4j @NoArgsConstructor public class SpinBlockStrategy implements BlockStrategy {
@Override public void block(long sleepTime) throws InterruptedException {
LocalDateTime startTime = LocalDateTime.now(); long start = System.currentTimeMillis(); long end = start; log.info("[SpinBlockStrategy]...begin wait."); while (end - start <= sleepTime) { end = System.currentTimeMillis(); } //使用Java8新增的Duration計(jì)算時(shí)間間隔 Duration duration = Duration.between(startTime, LocalDateTime.now()); log.info("[SpinBlockStrategy]...end wait.duration={}", duration.toMillis());
} }
自定義重試監(jiān)聽器
RetryListener可以監(jiān)控多次重試過程,并可以使用attempt做一些額外的事情。 package net.ijiangtao.tech.framework.spring.ispringboot.demo.retryer.guava.listener;
import com.github.rholder.retry.Attempt; import com.github.rholder.retry.RetryListener; import lombok.extern.slf4j.Slf4j;
@Slf4j public class RetryLogListener implements RetryListener {
@Override publicvoid onRetry(Attempt attempt) { // 第幾次重試,(注意:第一次重試其實(shí)是第一次調(diào)用) log.info("retry time : [{}]", attempt.getAttemptNumber()); // 距離第一次重試的延遲 log.info("retry delay : [{}]", attempt.getDelaySinceFirstAttempt()); // 重試結(jié)果: 是異常終止, 還是正常返回 log.info("hasException={}", attempt.hasException()); log.info("hasResult={}", attempt.hasResult()); // 是什么原因?qū)е庐惓? if (attempt.hasException()) { log.info("causeBy={}" , attempt.getExceptionCause().toString()); } else { // 正常返回時(shí)的結(jié)果 log.info("result={}" , attempt.getResult()); } log.info("log listen over."); }
}
自定義Exception
有些異常需要重試,有些不需要。 package net.ijiangtao.tech.framework.spring.ispringboot.demo.retryer.guava.exception;
/**
當(dāng)拋出這個(gè)異常的時(shí)候,表示需要重試 */ public class NeedRetryException extends Exception {
public NeedRetryException(String message) { super("NeedRetryException can retry."+message); }
}
實(shí)現(xiàn)具體重試業(yè)務(wù)與Callable接口
使用call方法調(diào)用自己的業(yè)務(wù)。 package net.ijiangtao.tech.framework.spring.ispringboot.demo.retryer.guava.model;
import lombok.AllArgsConstructor; import lombok.Data;
import java.math.BigDecimal;
/**
商品model */ @Data @AllArgsConstructor public class Product {
private Long id;
private String name;
private Integer count;
private BigDecimal price;
}
package net.ijiangtao.tech.framework.spring.ispringboot.demo.retryer.guava.repository;
import net.ijiangtao.tech.framework.spring.ispringboot.demo.retryer.guava.model.Product; import org.springframework.stereotype.Repository;
import java.math.BigDecimal; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLong;
/**
商品DAO */ @Repository public class ProductRepository {
private static ConcurrentHashMap
private static AtomicLong ids=new AtomicLong(0);
public List
public Product findById(Long id){ return products.get(id); }
public Product updatePrice(Long id, BigDecimal price){ Product p=products.get(id); if (null==p){ return p; } p.setPrice(price); return p; }
public Product addProduct(Product product){ Long id=ids.addAndGet(1); product.setId(id); products.put(id,product); return product; }
}
package net.ijiangtao.tech.framework.spring.ispringboot.demo.retryer.guava.service;
import lombok.extern.slf4j.Slf4j; import net.ijiangtao.tech.framework.spring.ispringboot.demo.retryer.guava.exception.NeedRetryException; import net.ijiangtao.tech.framework.spring.ispringboot.demo.retryer.guava.model.Product; import net.ijiangtao.tech.framework.spring.ispringboot.demo.retryer.guava.repository.ProductRepository; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component;
import java.math.BigDecimal; import java.util.HashMap; import java.util.Map; import java.util.concurrent.Callable;
/**
業(yè)務(wù)方法實(shí)現(xiàn) */ @Component @Slf4j public class ProductInformationHander implements Callable
@Autowired private ProductRepository pRepo;
private static Map
static { prices.put(1L, new BigDecimal(100)); prices.put(2L, new BigDecimal(200)); prices.put(3L, new BigDecimal(300)); prices.put(4L, new BigDecimal(400)); prices.put(8L, new BigDecimal(800)); prices.put(9L, new BigDecimal(900)); }
@Override public Boolean call() throws Exception {
log.info("sync price begin,prices size={}", prices.size()); for (Long id : prices.keySet()) { Product product = pRepo.findById(id); if (null == product) { throw new NeedRetryException("can not find product by id=" + id); } if (null == product.getCount() || product.getCount() < 1) { throw new NeedRetryException("product count is less than 1, id=" + id); } Product updatedP = pRepo.updatePrice(id, prices.get(id)); if (null == updatedP) { return false; } prices.remove(id); } log.info("sync price over,prices size={}", prices.size()); return true;
}
}
構(gòu)造重試器Retryer
將上面的實(shí)現(xiàn)作為參數(shù),構(gòu)造Retryer。 package net.ijiangtao.tech.framework.spring.ispringboot.demo.retryer.guava.service;
import com.github.rholder.retry.*; import net.ijiangtao.tech.framework.spring.ispringboot.demo.retryer.guava.exception.NeedRetryException; import net.ijiangtao.tech.framework.spring.ispringboot.demo.retryer.guava.listener.RetryLogListener; import net.ijiangtao.tech.framework.spring.ispringboot.demo.retryer.guava.strategy.SpinBlockStrategy; import org.springframework.stereotype.Component;
import java.util.concurrent.TimeUnit;
/**
構(gòu)造重試器 */ @Component public class ProductRetryerBuilder {
public Retryer build() { //定義重試機(jī)制 Retryer
//retryIf 重試條件 //.retryIfException() //.retryIfRuntimeException() //.retryIfExceptionOfType(Exception.class) //.retryIfException(Predicates.equalTo(new Exception())) //.retryIfResult(Predicates.equalTo(false)) .retryIfExceptionOfType(NeedRetryException.class) //等待策略:每次請(qǐng)求間隔1s .withWaitStrategy(WaitStrategies.fixedWait(1, TimeUnit.SECONDS)) //停止策略 : 嘗試請(qǐng)求3次 .withStopStrategy(StopStrategies.stopAfterAttempt(3)) //時(shí)間限制 : 某次請(qǐng)求不得超過2s , 類似: TimeLimiter timeLimiter = new SimpleTimeLimiter(); .withAttemptTimeLimiter(AttemptTimeLimiters.fixedTimeLimit(2, TimeUnit.SECONDS)) //默認(rèn)的阻塞策略:線程睡眠 //.withBlockStrategy(BlockStrategies.threadSleepStrategy()) //自定義阻塞策略:自旋鎖 .withBlockStrategy(new SpinBlockStrategy()) //自定義重試監(jiān)聽器 .withRetryListener(new RetryLogListener()) .build(); return retryer;
} }
與定時(shí)任務(wù)結(jié)合執(zhí)行Retryer
定時(shí)任務(wù)只需要跑一次,但是實(shí)際上實(shí)現(xiàn)了所有的重試策略。這樣大大簡(jiǎn)化了定時(shí)器的設(shè)計(jì)。 首先使用@EnableScheduling聲明項(xiàng)目支持定時(shí)器注解。 @SpringBootApplication @EnableScheduling public class DemoRetryerApplication { public static void main(String[] args) { SpringApplication.run(DemoRetryerApplication.class, args); } }
package net.ijiangtao.tech.framework.spring.ispringboot.demo.retryer.guava.task;
import com.github.rholder.retry.Retryer; import net.ijiangtao.tech.framework.spring.ispringboot.demo.retryer.guava.service.ProductInformationHander; import net.ijiangtao.tech.framework.spring.ispringboot.demo.retryer.guava.service.ProductRetryerBuilder; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component;
/**
商品信息定時(shí)器 */ @Component public class ProductScheduledTasks {
@Autowired private ProductRetryerBuilder builder;
@Autowired private ProductInformationHander hander;
/**
同步商品價(jià)格定時(shí)任務(wù)
@Scheduled(fixedDelay = 30000) :上一次執(zhí)行完畢時(shí)間點(diǎn)之后30秒再執(zhí)行 _/ @Scheduled(fixedDelay = 30_1000) public void syncPrice() throws Exception{ Retryer retryer=builder.build(); retryer.call(hander); }
}
執(zhí)行結(jié)果:由于并沒有商品,因此重試以后,拋出異常。 2019-二月-28 14:37:52.667 INFO [scheduling-1] n.i.t.f.s.i.d.r.g.l.RetryLogListener - log listen over. 2019-二月-28 14:37:52.672 ERROR [scheduling-1] o.s.s.s.TaskUtils$LoggingErrorHandler - Unexpected error occurred in scheduled task. com.github.rholder.retry.RetryException: Retrying failed to complete successfully after 3 attempts. at com.github.rholder.retry.Retryer.call(Retryer.java:174)
你也可以增加一些商品數(shù)據(jù),看一下重試成功的效果。 完整示例代碼在這里。 使用中遇到的問題 Guava版本沖突 由于項(xiàng)目中依賴的guava版本過低,啟動(dòng)項(xiàng)目時(shí)出現(xiàn)了如下異常。 java.lang.NoSuchMethodError: com.google.common.util.concurrent.MoreExecutors.sameThreadExecutor()Lcom/google/common/util/concurrent/ListeningExecutorService; at org.apache.curator.framework.listen.ListenerContainer.addListener(ListenerContainer.java:41) at com.bzn.curator.ZkOperator.getZkClient(ZkOperator.java:207) at com.bzn.curator.ZkOperator.checkExists(ZkOperator.java:346) at com.bzn.curator.watcher.AbstractWatcher.initListen(AbstractWatcher.java:87) at com.bzn.web.listener.NebulaSystemInitListener.initZkWatcher(NebulaSystemInitListener.java:84) at com.bzn.web.listener.NebulaSystemInitListener.contextInitialized(NebulaSystemInitListener.java:33) at org.apache.catalina.core.StandardContext.listenerStart(StandardContext.java:4939) at org.apache.catalina.core.StandardContext.startInternal(StandardContext.java:5434) at org.apache.catalina.util.LifecycleBase.start(LifecycleBase.java:150) at org.apache.catalina.core.ContainerBase$StartChild.call(ContainerBase.java:1559) at org.apache.catalina.core.ContainerBase$StartChild.call(ContainerBase.java:1549) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748)
因此,要排除項(xiàng)目中低版本的guava依賴。
同時(shí),由于Guava在新版本中移除了sameThreadExecutor方法,但目前項(xiàng)目中的ZK需要此方法,因此需要手動(dòng)設(shè)置合適的guava版本。 果然,在19.0版本中MoreExecutors的此方法依然存在,只是標(biāo)注為過期了。 @Deprecated @GwtIncompatible("TODO") public static ListeningExecutorService sameThreadExecutor() { return new DirectExecutorService(); }
聲明依賴的guava版本改為19.0即可。
動(dòng)態(tài)調(diào)節(jié)重試策略 在實(shí)際使用過程中,有時(shí)經(jīng)常需要調(diào)整重試的次數(shù)、等待的時(shí)間等重試策略,因此,將重試策略的配置參數(shù)化保存,可以動(dòng)態(tài)調(diào)節(jié)。 例如在秒殺、雙十一購(gòu)物節(jié)等時(shí)期增加等待的時(shí)間與重試次數(shù),以保證錯(cuò)峰請(qǐng)求。在平時(shí),可以適當(dāng)減少等待時(shí)間和重試次數(shù)。 對(duì)于系統(tǒng)關(guān)鍵性業(yè)務(wù),如果多次重試步成功,可以通過RetryListener進(jìn)行監(jiān)控與報(bào)警。 關(guān)于 『動(dòng)態(tài)調(diào)節(jié)重試策略 』下面提供一個(gè)參考實(shí)現(xiàn)。 import com.github.rholder.retry.Attempt; import com.github.rholder.retry.WaitStrategy;
/**
自定義等待策略:根據(jù)重試次數(shù)動(dòng)態(tài)調(diào)節(jié)等待時(shí)間,第一次請(qǐng)求間隔1s,第二次間隔10s,第三次及以后都是20s。
在創(chuàng)建Retryer的時(shí)候通過withWaitStrategy將該等待策略生效即可。
RetryerBuilder.
.withWaitStrategy(new AlipayWaitStrategy())
類似的效果也可以通過自定義 BlockStrategy 來實(shí)現(xiàn),你可以寫一下試試。
*/ public class AlipayWaitStrategy implements WaitStrategy {
@Override public long computeSleepTime(Attempt failedAttempt) { long number = failedAttempt.getAttemptNumber(); if (number==1){ return 1*1000; } if (number==2){ return 10*1000; } return 20*1000; }
}
以上就是如何用Guava Retrying,小編相信有部分知識(shí)點(diǎn)可能是我們?nèi)粘9ぷ鲿?huì)見到或用到的。希望你能通過這篇文章學(xué)到更多知識(shí)。更多詳情敬請(qǐng)關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道。