這篇文章給大家分享的是有關(guān)spring與disruptor集成的示例分析的內(nèi)容。小編覺得挺實用的,因此分享給大家做個參考,一起跟隨小編過來看看吧。
成都創(chuàng)新互聯(lián)公司專注于企業(yè)成都營銷網(wǎng)站建設、網(wǎng)站重做改版、徐聞網(wǎng)站定制設計、自適應品牌網(wǎng)站建設、HTML5建站、商城網(wǎng)站制作、集團公司官網(wǎng)建設、外貿(mào)網(wǎng)站建設、高端網(wǎng)站制作、響應式網(wǎng)頁設計等建站業(yè)務,價格優(yōu)惠性價比高,為徐聞等各大城市提供網(wǎng)站開發(fā)制作服務。
disruptor不過多介紹了,描述下當前的業(yè)務場景,兩個應用A,B,應用 A 向應用 B 傳遞數(shù)據(jù) . 數(shù)據(jù)傳送比較快,如果用http直接push數(shù)據(jù)然后入庫,效率不高.有可能導致A應用比較大的壓力. 使用mq 太重量級,所以選擇了disruptor. 也可以使用Reactor
BaseQueueHelper.java
/** * lmax.disruptor 高效隊列處理模板. 支持初始隊列,即在init()前進行發(fā)布。 * * 調(diào)用init()時才真正啟動線程開始處理 系統(tǒng)退出自動清理資源. * * @author xielongwang * @create 2018-01-18 下午3:49 * @email xielong.wang@nvr-china.com * @description */ public abstract class BaseQueueHelper, H extends WorkHandler > { /** * 記錄所有的隊列,系統(tǒng)退出時統(tǒng)一清理資源 */ private static List queueHelperList = new ArrayList (); /** * Disruptor 對象 */ private Disruptor disruptor; /** * RingBuffer */ private RingBuffer ringBuffer; /** * initQueue */ private List initQueue = new ArrayList (); /** * 隊列大小 * * @return 隊列長度,必須是2的冪 */ protected abstract int getQueueSize(); /** * 事件工廠 * * @return EventFactory */ protected abstract EventFactory eventFactory(); /** * 事件消費者 * * @return WorkHandler[] */ protected abstract WorkHandler[] getHandler(); /** * 初始化 */ public void init() { ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("DisruptorThreadPool").build(); disruptor = new Disruptor (eventFactory(), getQueueSize(), namedThreadFactory, ProducerType.SINGLE, getStrategy()); disruptor.setDefaultExceptionHandler(new MyHandlerException()); disruptor.handleEventsWithWorkerPool(getHandler()); ringBuffer = disruptor.start(); //初始化數(shù)據(jù)發(fā)布 for (D data : initQueue) { ringBuffer.publishEvent(new EventTranslatorOneArg () { @Override public void translateTo(E event, long sequence, D data) { event.setValue(data); } }, data); } //加入資源清理鉤子 synchronized (queueHelperList) { if (queueHelperList.isEmpty()) { Runtime.getRuntime().addShutdownHook(new Thread() { @Override public void run() { for (BaseQueueHelper baseQueueHelper : queueHelperList) { baseQueueHelper.shutdown(); } } }); } queueHelperList.add(this); } } /** * 如果要改變線程執(zhí)行優(yōu)先級,override此策略. YieldingWaitStrategy會提高響應并在閑時占用70%以上CPU, * 慎用SleepingWaitStrategy會降低響應更減少CPU占用,用于日志等場景. * * @return WaitStrategy */ protected abstract WaitStrategy getStrategy(); /** * 插入隊列消息,支持在對象init前插入隊列,則在隊列建立時立即發(fā)布到隊列處理. */ public synchronized void publishEvent(D data) { if (ringBuffer == null) { initQueue.add(data); return; } ringBuffer.publishEvent(new EventTranslatorOneArg () { @Override public void translateTo(E event, long sequence, D data) { event.setValue(data); } }, data); } /** * 關(guān)閉隊列 */ public void shutdown() { disruptor.shutdown(); } }
EventFactory.java
/** * @author xielongwang * @create 2018-01-18 下午6:24 * @email xielong.wang@nvr-china.com * @description */ public class EventFactory implements com.lmax.disruptor.EventFactory{ @Override public SeriesDataEvent newInstance() { return new SeriesDataEvent(); } }
MyHandlerException.java
public class MyHandlerException implements ExceptionHandler { private Logger logger = LoggerFactory.getLogger(MyHandlerException.class); /* * (non-Javadoc) 運行過程中發(fā)生時的異常 * * @see * com.lmax.disruptor.ExceptionHandler#handleEventException(java.lang.Throwable * , long, java.lang.Object) */ @Override public void handleEventException(Throwable ex, long sequence, Object event) { ex.printStackTrace(); logger.error("process data error sequence ==[{}] event==[{}] ,ex ==[{}]", sequence, event.toString(), ex.getMessage()); } /* * (non-Javadoc) 啟動時的異常 * * @see * com.lmax.disruptor.ExceptionHandler#handleOnStartException(java.lang. * Throwable) */ @Override public void handleOnStartException(Throwable ex) { logger.error("start disruptor error ==[{}]!", ex.getMessage()); } /* * (non-Javadoc) 關(guān)閉時的異常 * * @see * com.lmax.disruptor.ExceptionHandler#handleOnShutdownException(java.lang * .Throwable) */ @Override public void handleOnShutdownException(Throwable ex) { logger.error("shutdown disruptor error ==[{}]!", ex.getMessage()); } }
SeriesData.java (代表應用A發(fā)送給應用B的消息)
public class SeriesData { private String deviceInfoStr; public SeriesData() { } public SeriesData(String deviceInfoStr) { this.deviceInfoStr = deviceInfoStr; } public String getDeviceInfoStr() { return deviceInfoStr; } public void setDeviceInfoStr(String deviceInfoStr) { this.deviceInfoStr = deviceInfoStr; } @Override public String toString() { return "SeriesData{" + "deviceInfoStr='" + deviceInfoStr + '\'' + '}'; } }
SeriesDataEvent.java
public class SeriesDataEvent extends ValueWrapper{ }
SeriesDataEventHandler.java
public class SeriesDataEventHandler implements WorkHandler{ private Logger logger = LoggerFactory.getLogger(SeriesDataEventHandler.class); @Autowired private DeviceInfoService deviceInfoService; @Override public void onEvent(SeriesDataEvent event) { if (event.getValue() == null || StringUtils.isEmpty(event.getValue().getDeviceInfoStr())) { logger.warn("receiver series data is empty!"); } //業(yè)務處理 deviceInfoService.processData(event.getValue().getDeviceInfoStr()); } }
SeriesDataEventQueueHelper.java
@Component public class SeriesDataEventQueueHelper extends BaseQueueHelperimplements InitializingBean { private static final int QUEUE_SIZE = 1024; @Autowired private List seriesDataEventHandler; @Override protected int getQueueSize() { return QUEUE_SIZE; } @Override protected com.lmax.disruptor.EventFactory eventFactory() { return new EventFactory(); } @Override protected WorkHandler[] getHandler() { int size = seriesDataEventHandler.size(); SeriesDataEventHandler[] paramEventHandlers = (SeriesDataEventHandler[]) seriesDataEventHandler.toArray(new SeriesDataEventHandler[size]); return paramEventHandlers; } @Override protected WaitStrategy getStrategy() { return new BlockingWaitStrategy(); //return new YieldingWaitStrategy(); } @Override public void afterPropertiesSet() throws Exception { this.init(); } }
ValueWrapper.java
public abstract class ValueWrapper{ private T value; public ValueWrapper() {} public ValueWrapper(T value) { this.value = value; } public T getValue() { return value; } public void setValue(T value) { this.value = value; } }
DisruptorConfig.java
@Configuration @ComponentScan(value = {"com.portal.disruptor"}) //多實例幾個消費者 public class DisruptorConfig { /** * smsParamEventHandler1 * * @return SeriesDataEventHandler */ @Bean public SeriesDataEventHandler smsParamEventHandler1() { return new SeriesDataEventHandler(); } /** * smsParamEventHandler2 * * @return SeriesDataEventHandler */ @Bean public SeriesDataEventHandler smsParamEventHandler2() { return new SeriesDataEventHandler(); } /** * smsParamEventHandler3 * * @return SeriesDataEventHandler */ @Bean public SeriesDataEventHandler smsParamEventHandler3() { return new SeriesDataEventHandler(); } /** * smsParamEventHandler4 * * @return SeriesDataEventHandler */ @Bean public SeriesDataEventHandler smsParamEventHandler4() { return new SeriesDataEventHandler(); } /** * smsParamEventHandler5 * * @return SeriesDataEventHandler */ @Bean public SeriesDataEventHandler smsParamEventHandler5() { return new SeriesDataEventHandler(); } }
測試
//注入SeriesDataEventQueueHelper消息生產(chǎn)者 @Autowired private SeriesDataEventQueueHelper seriesDataEventQueueHelper; @RequestMapping(value = "/data", method = RequestMethod.POST, produces = MediaType.APPLICATION_JSON_VALUE) public DataResponseVoreceiverDeviceData(@RequestBody String deviceData) { long startTime1 = System.currentTimeMillis(); if (StringUtils.isEmpty(deviceData)) { logger.info("receiver data is empty !"); return new DataResponseVo (400, "failed"); } seriesDataEventQueueHelper.publishEvent(new SeriesData(deviceData)); long startTime2 = System.currentTimeMillis(); logger.info("receiver data ==[{}] millisecond ==[{}]", deviceData, startTime2 - startTime1); return new DataResponseVo (200, "success"); }
應用A通過/data 接口把數(shù)據(jù)發(fā)送到應用B ,然后通過seriesDataEventQueueHelper 把消息發(fā)給disruptor隊列,消費者去消費,整個過程對不會堵塞應用A. 可接受消息丟失, 可以通過擴展SeriesDataEventQueueHelper來達到對disruptor隊列的監(jiān)控
感謝各位的閱讀!關(guān)于“spring與disruptor集成的示例分析”這篇文章就分享到這里了,希望以上內(nèi)容可以對大家有一定的幫助,讓大家可以學到更多知識,如果覺得文章不錯,可以把它分享出去讓更多的人看到吧!