本篇內容主要講解“如何實現基于Jedis+ZK的分布式序列號生成器”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實用性強。下面就讓小編來帶大家學習“如何實現基于Jedis+ZK的分布式序列號生成器”吧!
創新互聯公司堅持“要么做到,要么別承諾”的工作理念,服務領域包括:網站設計制作、成都網站制作、企業官網、英文網站、手機端網站、網站推廣等服務,滿足客戶于互聯網時代的大竹網站設計、移動媒體設計的需求,幫助企業找到有效的互聯網解決方案。努力成為您成熟可靠的網絡建設合作伙伴!
部分源碼參考Jedis實現分布式鎖博客:
package com.xxx.arch.seq.utlis;

import com.xxx.arch.seq.client.redis.RedisSEQ;
import lombok.extern.slf4j.Slf4j;

/**
 * Client-side facade of arch-seq for obtaining unique sequence codes.
 *
 * <p>Every method delegates to {@link RedisSEQ}. A code produced on the regular path is
 * the date ({@code yyMMdd}, 6 digits) followed by a 10-digit distributed sequence number,
 * 16 digits in total.
 *
 * @author jdkleo
 */
@Slf4j
public class SEQUtil {

    /**
     * Returns one code for the default sequence key.
     *
     * @return the generated code
     */
    public static long getSEQ() {
        return RedisSEQ.getSEQ();
    }

    /**
     * Returns {@code total} consecutive codes for the default sequence key.
     *
     * @param total how many consecutive codes are requested; must be in {@code [1, Integer.MAX_VALUE]}
     * @return the codes in ascending order, the last element being the value handed out by {@link RedisSEQ}
     * @throws IllegalArgumentException if {@code total} is out of range
     */
    public static long[] getSEQ(long total) {
        // Validate first so that an invalid request never consumes a sequence number.
        int count = checkTotal(total);
        long value = RedisSEQ.getSEQ(total);
        return getValueArray(value, count);
    }

    /**
     * Returns one code for the given sequence key.
     *
     * @param seqName name of the sequence
     * @return the generated code
     */
    public static long getSEQ(String seqName) {
        return RedisSEQ.getSEQ(seqName, 1);
    }

    /**
     * Returns {@code total} consecutive codes for the given sequence key.
     *
     * @param seqName name of the sequence
     * @param total   how many consecutive codes are requested; must be in {@code [1, Integer.MAX_VALUE]}
     * @return the codes in ascending order, the last element being the value handed out by {@link RedisSEQ}
     * @throws IllegalArgumentException if {@code total} is out of range
     */
    public static long[] getSEQ(String seqName, long total) {
        // Validate first so that an invalid request never consumes a sequence number.
        int count = checkTotal(total);
        long value = RedisSEQ.getSEQ(seqName, total);
        return getValueArray(value, count);
    }

    /**
     * Ensures the requested batch size can be represented as a non-empty array.
     *
     * @param total requested batch size
     * @return {@code total} narrowed to {@code int}
     * @throws IllegalArgumentException if {@code total} is smaller than 1 or larger than {@code Integer.MAX_VALUE}
     */
    private static int checkTotal(long total) {
        if (total < 1 || total > Integer.MAX_VALUE) {
            throw new IllegalArgumentException(
                    "total must be between 1 and " + Integer.MAX_VALUE + ", but was " + total);
        }
        return (int) total;
    }

    /**
     * Expands the highest value of a reserved range into the full ascending range.
     *
     * @param value highest value of the range
     * @param total number of values in the range (at least 1)
     * @return {@code total} consecutive values ending with {@code value}
     */
    private static long[] getValueArray(long value, int total) {
        long[] ret = new long[total];
        long first = value - total + 1;
        for (int i = 0; i < total; i++) {
            ret[i] = first + i;
        }
        return ret;
    }
}
package com.xxx.arch.seq.client.redis;

import com.xxx.arch.seq.client.tool.StreamCloseAble;
import lombok.extern.slf4j.Slf4j;

import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Redis-backed (ordered) sequence generator.
 *
 * <p>A sequence value is {@code yyMMdd} followed by a zero-padded 10-digit counter that is
 * incremented in Redis under the key {@code <seqName>_<yyMMdd>}. Every new counter value is
 * handed to {@link RedisSEQTimer#push(String, Long)}. When Redis cannot be used the
 * generator falls back to a locally generated number (see {@link #xUUID()}).
 *
 * @author zhangyang
 * @createDate 2019-01-22
 * @since 2.x
 */
@Slf4j
public class RedisSEQ extends StreamCloseAble {
    // Redis key holding the initialization state of the sequence data
    private static final String _DEFAULT_SEQ_INIT_KEY = "ARCH_SEQ_REDIS_SEQ_INIT";
    // Possible values of the initialization state key
    private static final String _DEFAULT_SEQ_INIT_PENDING = "pending";
    private static final String _DEFAULT_SEQ_INIT_READY = "ready";
    // Locally cached "Redis is initialized" flag; once true it is never re-read from Redis
    private static volatile boolean _DEFAULT_SEQ_INIT_STATUS;
    // Name of the default sequence
    private static final String _DEFAULT_SEQ_NAME = "ARCH_SEQ_REDIS_SEQ";
    // Counter used by the local fallback generator
    private final static AtomicInteger _LOCAL_INCR = new AtomicInteger(0);
    // How many times a failed Redis increment is attempted before falling back to a local ID
    private static final int _MAX_ATTEMPTS = 3;
    // Shared source of the random back-off between attempts
    private static final Random _RANDOM = new Random();

    static {
        JedisConfig.JedisConn jedisConn = null;
        try {
            jedisConn = JedisConfig.getInstance().getConn();
            // Redis was restarted or this is the very first start: whoever creates the state key
            // owns the initialization and marks it "pending" while restoring the keys.
            if (jedisConn.setnx(_DEFAULT_SEQ_INIT_KEY, _DEFAULT_SEQ_INIT_PENDING) == 1) {
                try {
                    RedisSEQTimer.getInstance().removeNotUsedKeys();
                    // Restore the Redis counters from the values stored in ZK
                    RedisSEQTimer.getInstance().initRedisKeys();
                    // Initialization finished: mark as "ready"
                    jedisConn.set(_DEFAULT_SEQ_INIT_KEY, _DEFAULT_SEQ_INIT_READY);
                } catch (Exception e) {
                    log.error(e.getMessage(), e);
                    // Could be a ZK problem as well as a Redis problem; release the state key
                    // so that a later start can try again.
                    log.error("Initialization of arch.seq REDIS data exceptions, may be ZK-related problems, may also be REDIS problems, please check redis key:{}", _DEFAULT_SEQ_INIT_KEY);
                    jedisConn.del(_DEFAULT_SEQ_INIT_KEY);
                }
            }
            // Otherwise somebody else owns the initialization: nothing to do here.
        } catch (Exception e) {
            log.error(e.getMessage(), e);
            log.error("Initialization of arch.seq REDIS data exceptions, may be arch.seq's configuration is not ready");
        } finally {
            close(jedisConn);
        }
    }

    /**
     * Returns the next value of the default sequence.
     *
     * @return the generated value
     */
    public static Long getSEQ() {
        return getSEQ(_DEFAULT_SEQ_NAME, 1);
    }

    /**
     * Reserves {@code total} consecutive values of the default sequence.
     *
     * @param total number of values to reserve
     * @return the highest reserved value
     */
    public static Long getSEQ(long total) {
        return getSEQ(_DEFAULT_SEQ_NAME, total);
    }

    /**
     * Reserves {@code total} consecutive values of the named sequence.
     *
     * <p>Any failure (Redis unreachable, not initialized, retries exhausted, ...) is logged
     * and answered with a locally generated number instead of an exception.
     *
     * @param seqName name of the sequence
     * @param total   number of values to reserve; values below 2 reserve a single value
     * @return the highest reserved value, or a locally generated number on failure
     */
    public static Long getSEQ(String seqName, long total) {
        Long result = null;
        try {
            result = nextFromRedis(seqName, total);
        } catch (Throwable e) {
            if (e instanceof InterruptedException) {
                // Keep the interruption visible to the caller instead of silently dropping it.
                Thread.currentThread().interrupt();
            }
            log.error(e.getMessage(), e);
            // Redis failed: answer with a local ID so that the caller is not blocked.
            log.error("An exception occurred while acquiring self-incremental sequence number '{}', "
                    + "sequence number distributed lock '{}',The system returns the locally generated self-incremental "
                    + "sequence number, which does not affect the use of the system, but the administrator should check "
                    + "it as soon as possible.", seqName, seqName + "_LOCK");
            result = xUUID();
        } finally {
            if (log.isDebugEnabled()) {
                log.debug(seqName + ":" + result + ", trace:\n" + getStackTrace());
            }
        }
        return result;
    }

    /**
     * Increments the Redis counter, retrying a bounded number of times.
     *
     * <p>Each attempt uses its own connection and releases it before backing off, so a
     * failing Redis can neither exhaust the connection pool nor the call stack.
     *
     * @param seqName name of the sequence
     * @param total   number of values to reserve
     * @return the highest reserved value
     * @throws InterruptedException if the thread is interrupted while waiting
     * @throws RuntimeException     if Redis is not initialized or every attempt failed
     */
    private static Long nextFromRedis(String seqName, long total) throws InterruptedException {
        Exception lastFailure = null;
        for (int attempt = 1; attempt <= _MAX_ATTEMPTS; attempt++) {
            JedisConfig.JedisConn jedisConn = null;
            try {
                jedisConn = JedisConfig.getInstance().getConn();
                if (!tryInitReady(jedisConn)) {
                    // Not retried: the caller falls back to a local ID right away.
                    throw new RuntimeException("arch.seq By REDIS version cannot be initialized properly. Please check the REDIS service.");
                }
                try {
                    String day = RedisSEQTimer.getInstance().getDayFormat();
                    String incrVal = String.format("%010d", getIncrVal(jedisConn, day, seqName, total));
                    return Long.parseLong(day + incrVal);
                } catch (Exception e) {
                    lastFailure = e;
                    log.warn("try lock failed,the arch.seq tool will be retry after sleep some times.", e);
                }
            } finally {
                if (jedisConn != null) {
                    jedisConn.close();
                }
            }
            if (attempt < _MAX_ATTEMPTS) {
                Thread.sleep(randTime());
            }
        }
        throw new IllegalStateException("arch.seq could not obtain sequence '" + seqName
                + "' from REDIS after " + _MAX_ATTEMPTS + " attempts", lastFailure);
    }

    /**
     * Renders the current thread's stack trace, one element per line.
     *
     * @return the formatted stack trace
     */
    private static String getStackTrace() {
        StringBuilder result = new StringBuilder();
        for (StackTraceElement element : Thread.currentThread().getStackTrace()) {
            result.append("\t").append(element).append("\n");
        }
        return result.toString();
    }

    /**
     * Returns a random back-off in milliseconds.
     *
     * @return a value in {@code [50, 100)}
     */
    private static long randTime() {
        return _RANDOM.nextInt(50) + 50;
    }

    /**
     * Checks up to three times, 100 ms apart, whether Redis has been initialized.
     *
     * @param jedisConn connection used for the check
     * @return {@code true} as soon as one check succeeds
     * @throws InterruptedException if interrupted between checks
     */
    private static boolean tryInitReady(JedisConfig.JedisConn jedisConn) throws InterruptedException {
        final int maxChecks = 3;
        for (int check = 1; check <= maxChecks; check++) {
            if (getSEQInitReady(jedisConn)) {
                return true;
            }
            if (check < maxChecks) {
                // No point in waiting once the last check has failed.
                Thread.sleep(100);
            }
        }
        return false;
    }

    /**
     * Reads the initialization state, caching a positive answer locally.
     *
     * @param jedisConn connection used to read the state key
     * @return {@code true} when the state key holds the "ready" marker
     */
    private static boolean getSEQInitReady(JedisConfig.JedisConn jedisConn) {
        if (!_DEFAULT_SEQ_INIT_STATUS) {
            synchronized (RedisSEQ.class) {
                if (!_DEFAULT_SEQ_INIT_STATUS) {
                    _DEFAULT_SEQ_INIT_STATUS = _DEFAULT_SEQ_INIT_READY.equals(jedisConn.get(_DEFAULT_SEQ_INIT_KEY));
                }
            }
        }
        return _DEFAULT_SEQ_INIT_STATUS;
    }

    /**
     * Increments the Redis counter of the given day and registers the new value for the ZK backup.
     *
     * @param jedisConn connection to use
     * @param day       day part ({@code yyMMdd}) of the key
     * @param seqName   name of the sequence
     * @param total     increment; values below 2 increment by one
     * @return the counter value after the increment
     * @throws RuntimeException if the counter no longer fits into 10 digits
     */
    private static Long getIncrVal(JedisConfig.JedisConn jedisConn, String day, String seqName, long total) {
        String key = seqName + "_" + day;
        Long incrVal = total > 1 ? jedisConn.incr(key, total) : jedisConn.incr(key);
        if (incrVal > 9999999999L) {
            throw new RuntimeException("Exceed the maximum value,sequence:" + incrVal);
        }
        // Queue the value so that the timer can write it to ZK.
        RedisSEQTimer.getInstance().push(key, incrVal);
        return incrVal;
    }

    /**
     * Generates an ID locally: {@link System#nanoTime()} followed by one rotating digit.
     *
     * @return the locally generated ID
     */
    private static Long xUUID() {
        // floorMod keeps the digit in 0..9 even after the counter wraps to negative values;
        // a plain % would then yield e.g. "-3" and break Long.parseLong.
        int rand = Math.floorMod(_LOCAL_INCR.incrementAndGet(), 10);
        String result = System.nanoTime() + "" + rand;
        return Long.parseLong(result);
    }
}
package com.xxx.arch.seq.client.redis;

import com.xxx.arch.seq.client.tool.StreamCloseAble;
import com.xxx.arch.seq.client.tool.ZkClient;
import com.xxx.arch.seq.client.zk.ZkClientUtil;
import org.apache.commons.lang3.time.DateUtils;

import java.text.SimpleDateFormat;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Singleton that mirrors the latest Redis counter values to ZooKeeper and cleans up old keys.
 *
 * <p>Two daemon timer tasks are scheduled on construction: one writes the pushed counter
 * values to ZK every minute, the other removes keys older than yesterday once a day.
 */
public class RedisSEQTimer extends StreamCloseAble {
    public static final String DAY_FORMAT_PATTERN = "yyMMdd";
    public static volatile RedisSEQTimer redisSEQTimer;
    // Latest counter value per key (KEY_yyMMdd) waiting to be written to ZK
    private final ConcurrentHashMap<String, Long> REDIS_INCR_MAP = new ConcurrentHashMap<>();
    private final ZkClient _ZK_CLIENT = ZkClientUtil.getZkClient();
    private final String _DEFAULT_ZK_NAMESPACE = "/ARCH_SEQ_REDIS";
    // Safety margin added to the ZK value when a Redis counter is restored
    private final long _REDIS_MAXVALUE_INIT = 10_000L;
    private final Timer _TIMER = new Timer(true);
    // True while a clean-up run is in progress
    private volatile boolean _CLEAN_STATUS;
    // Redis lock key guarding the clean-up
    private static final String _REMOVE_KEY = "ARCH_SEQ_REMOVE_KEY";

    private RedisSEQTimer() {
        super();
        // Start the periodic ZK backup, once per minute
        _TIMER.schedule(new TimerTask() {
            @Override
            public void run() {
                checkAndConfigure();
            }
        }, new Date(), 1 * 60 * 1000);
        // Clean up outdated data once a day
        _TIMER.schedule(new TimerTask() {
            @Override
            public void run() {
                removeNotUsedKeys();
            }
        }, getFirstTime(), 24 * 60 * 60 * 1000);
    }

    /**
     * Returns the lazily created singleton.
     *
     * @return the shared instance
     */
    public static RedisSEQTimer getInstance() {
        if (redisSEQTimer == null) {
            synchronized (RedisSEQTimer.class) {
                if (redisSEQTimer == null) {
                    redisSEQTimer = new RedisSEQTimer();
                }
            }
        }
        return redisSEQTimer;
    }

    /**
     * Writes the latest value of every key of today to its ZK node and forgets keys of other days.
     */
    private synchronized void checkAndConfigure() {
        if (_CLEAN_STATUS || REDIS_INCR_MAP.isEmpty()) {
            return;
        }
        String endDay = "_" + getDayFormat();
        List<String> notTodayKeys = new ArrayList<>();
        for (Map.Entry<String, Long> entry : REDIS_INCR_MAP.entrySet()) {
            // Keys of other days are not written; remember them for removal and move on.
            // (Leaving the method here would skip every remaining key of today.)
            if (!entry.getKey().endsWith(endDay)) {
                notTodayKeys.add(entry.getKey());
                continue;
            }
            // Write the latest value to the ZK node; node format: /KEY_yyMMdd
            String zkNode = _DEFAULT_ZK_NAMESPACE + "/" + entry.getKey();
            if (_ZK_CLIENT.exists(zkNode)) {
                _ZK_CLIENT.writeData(zkNode, entry.getValue());
            } else {
                try {
                    _ZK_CLIENT.createPersistent(zkNode, entry.getValue());
                } catch (RuntimeException ignored) {
                    // Deliberately not logged: the write is attempted again on the next run.
                }
            }
        }
        for (String key : notTodayKeys) {
            REDIS_INCR_MAP.remove(key);
        }
    }

    /**
     * Deletes keys that are no longer used, both in Redis and in ZK.
     *
     * <p>Keys of today and yesterday are kept: clocks of different machines may differ, and
     * a machine that is slightly behind may still need yesterday's sequence right after
     * midnight; removing it would produce duplicate IDs.
     */
    public synchronized void removeNotUsedKeys() {
        if (!_ZK_CLIENT.exists(_DEFAULT_ZK_NAMESPACE)) {
            return;
        }
        _CLEAN_STATUS = true;
        JedisConfig.JedisConn jedisConn = null;
        String requestId = UUID.randomUUID().toString();
        boolean tryLock = false;
        try {
            List<String> list = _ZK_CLIENT.getChildren(_DEFAULT_ZK_NAMESPACE);
            Date now = new Date();
            Date yesterday = DateUtils.addDays(now, -1);
            List<String> keepDays = Arrays.asList(getDayFormat(now), getDayFormat(yesterday));
            if (list != null && !list.isEmpty()) {
                jedisConn = JedisConfig.getInstance().getConn();
                if (tryLock = jedisConn.tryLock(_REMOVE_KEY, requestId, 2000)) {
                    int dayLength = DAY_FORMAT_PATTERN.length();
                    for (String node : list) {
                        if (node.length() < dayLength) {
                            // Not a KEY_yyMMdd node; leave it alone instead of failing the whole run.
                            continue;
                        }
                        String dayPart = node.substring(node.length() - dayLength);
                        if (!keepDays.contains(dayPart)) {
                            REDIS_INCR_MAP.remove(node);
                            jedisConn.del(node);
                            removeZkNode(node);
                        }
                    }
                }
            }
        } finally {
            _CLEAN_STATUS = false;
            if (jedisConn != null) {
                if (tryLock) {
                    jedisConn.unLock(_REMOVE_KEY, requestId);
                }
                jedisConn.close();
            }
        }
    }

    /**
     * Removes a child node of the namespace from ZK if it exists.
     *
     * @param node name of the child node
     */
    private void removeZkNode(String node) {
        String path = _DEFAULT_ZK_NAMESPACE + "/" + node;
        if (_ZK_CLIENT.exists(path)) {
            try {
                _ZK_CLIENT.delete(path);
            } catch (Exception ignored) {
                // Best effort: a node that could not be deleted is picked up by the next clean-up.
            }
        }
    }

    /**
     * Computes the first execution time of the daily clean-up.
     *
     * @return a point in time within the first six minutes after the coming midnight
     */
    private Date getFirstTime() {
        Calendar calendar = Calendar.getInstance();
        // Hour 24 is rolled over by the lenient calendar to 00:00 of the next day.
        calendar.set(Calendar.HOUR_OF_DAY, 24);
        calendar.set(Calendar.MINUTE, getRandNum(6, 0));  // random minute 0-5
        calendar.set(Calendar.SECOND, getRandNum(60, 0)); // random second 0-59
        return calendar.getTime();
    }

    /**
     * Returns a random integer of a range.
     *
     * @param exclude size of the range (upper bound relative to {@code from}, exclusive)
     * @param from    lower bound, inclusive
     * @return a random value in {@code [from, from + exclude)}
     */
    private int getRandNum(int exclude, int from) {
        return new Random().nextInt(exclude) + from;
    }

    /**
     * Registers the latest value of a key so that the next backup run writes it to ZK.
     *
     * @param key business key in the form {@code key_yyMMdd}
     * @param val latest counter value
     */
    public synchronized void push(String key, Long val) {
        REDIS_INCR_MAP.put(key, val);
    }

    /**
     * Formats the current date with {@link #DAY_FORMAT_PATTERN}.
     *
     * @return today as {@code yyMMdd}
     */
    public String getDayFormat() {
        return getDayFormat(new Date());
    }

    /**
     * Formats a date with {@link #DAY_FORMAT_PATTERN}.
     *
     * @param date date to format
     * @return the date as {@code yyMMdd}
     */
    public String getDayFormat(Date date) {
        return new SimpleDateFormat(DAY_FORMAT_PATTERN).format(date);
    }

    /**
     * Restores the Redis counters from the values stored in ZK.
     *
     * <p>Every restored counter is set to the ZK value plus a fixed margin. A key is only
     * written when its Redis lock could be obtained; otherwise it is skipped.
     */
    public void initRedisKeys() {
        if (!_ZK_CLIENT.exists(_DEFAULT_ZK_NAMESPACE)) {
            return;
        }
        List<String> list = _ZK_CLIENT.getChildren(_DEFAULT_ZK_NAMESPACE);
        if (list == null || list.isEmpty()) {
            return;
        }
        for (String key : list) {
            Long zkVal = _ZK_CLIENT.readData(_DEFAULT_ZK_NAMESPACE + "/" + key);
            if (zkVal == null) {
                continue;
            }
            String requestId = UUID.randomUUID().toString();
            boolean tryLock = false;
            // Scoped per key so that a failure to connect never touches the previous,
            // already closed connection again.
            JedisConfig.JedisConn jedisConn = null;
            try {
                jedisConn = JedisConfig.getInstance().getConn();
                // Only update when the lock was obtained; otherwise give up on this key.
                if (tryLock = jedisConn.tryLock(key, requestId, 2000)) {
                    jedisConn.set(key, String.valueOf(zkVal + _REDIS_MAXVALUE_INIT));
                }
            } finally {
                if (jedisConn != null) {
                    if (tryLock) {
                        jedisConn.unLock(key, requestId);
                    }
                    jedisConn.close();
                }
            }
        }
    }
}
package com.xxx.arch.seq.client.tool;

import lombok.extern.slf4j.Slf4j;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;

import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;

/**
 * Thin wrapper around a Curator client that stores {@code Long} values as decimal text.
 *
 * <p>No method lets a ZooKeeper exception escape: failures are logged and answered with
 * a neutral result ({@code false}, {@code null}, an empty list, or nothing).
 */
@Slf4j
public class ZkClient {
    private CuratorFramework client;

    /**
     * Builds and starts the underlying Curator client.
     *
     * @param serverList          ZooKeeper connect string
     * @param connectionTimeoutMs connection timeout in milliseconds
     * @param sessionTimeout      session timeout in milliseconds
     */
    public ZkClient(String serverList, int connectionTimeoutMs, int sessionTimeout) {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        client = CuratorFrameworkFactory.builder()
                .connectString(serverList)
                .connectionTimeoutMs(connectionTimeoutMs)
                .sessionTimeoutMs(sessionTimeout)
                .retryPolicy(retryPolicy)
                .build();
        client.start();
    }

    /**
     * Tells whether a node exists.
     *
     * @param path node path
     * @return {@code true} if the node exists; {@code false} if it does not or the check failed
     */
    public boolean exists(String path) {
        try {
            return client.checkExists().forPath(path) != null;
        } catch (Exception e) {
            // Logged like every other failure of this class, so that "ZK unreachable" can be
            // told apart from "node missing".
            log.error(e.getMessage(), e);
            return false;
        }
    }

    /**
     * Overwrites the data of an existing node.
     *
     * @param path  node path
     * @param value value to store
     */
    public void writeData(String path, Long value) {
        try {
            client.setData().forPath(path, toBytes(value));
        } catch (Exception e) {
            log.error(e.getMessage(), e);
        }
    }

    /**
     * Creates a node holding the given value, creating missing parent nodes as well.
     *
     * @param zkNode node path
     * @param value  value to store
     */
    public void createPersistent(String zkNode, Long value) {
        try {
            // Without creatingParentsIfNeeded() the call fails while the parent node is missing.
            client.create().creatingParentsIfNeeded().forPath(zkNode, toBytes(value));
        } catch (Exception e) {
            log.error(e.getMessage(), e);
        }
    }

    /**
     * Lists the children of a node.
     *
     * @param path node path
     * @return the child names, or an empty list if the lookup failed
     */
    public List<String> getChildren(String path) {
        try {
            return client.getChildren().forPath(path);
        } catch (Exception e) {
            log.error(e.getMessage(), e);
        }
        return Collections.emptyList();
    }

    /**
     * Reads the value stored in a node.
     *
     * @param path node path
     * @return the stored value, or {@code null} if it could not be read or parsed
     */
    public Long readData(String path) {
        try {
            byte[] data = client.getData().forPath(path);
            return Long.parseLong(new String(data, StandardCharsets.UTF_8));
        } catch (Exception e) {
            log.error(e.getMessage(), e);
        }
        return null;
    }

    /**
     * Deletes a node.
     *
     * @param path node path
     */
    public void delete(String path) {
        try {
            client.delete().forPath(path);
        } catch (Exception e) {
            log.error(e.getMessage(), e);
        }
    }

    /**
     * Encodes a value as decimal text with a fixed charset, independent of the platform default.
     *
     * @param value value to encode
     * @return the encoded bytes
     */
    private static byte[] toBytes(Long value) {
        return value.toString().getBytes(StandardCharsets.UTF_8);
    }
}
package com.xxx.arch.seq.client.zk;

import com.xxx.arch.seq.client.tool.ZkClient;
import com.xxx.arch.seq.constant.Constants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Holder of the shared {@link ZkClient}, created on first use from
 * {@code Constants.ARCH_SEQ_ZOOKEEPER_CONNECT_STRING}.
 */
public class ZkClientUtil {
    private static final Logger logger = LoggerFactory.getLogger(ZkClientUtil.class);
    private static volatile ZkClient zkClient = null;

    /**
     * Returns the shared client, creating it on the first call.
     *
     * @return the shared client, or {@code null} when it could not be created
     */
    public static ZkClient getZkClient() {
        if (zkClient != null) {
            return zkClient;
        }
        synchronized (ZkClientUtil.class) {
            if (zkClient == null) {
                initZkClient();
            }
        }
        return zkClient;
    }

    /**
     * Creates the client from the configured connect string.
     *
     * <p>A missing connect string or a failing constructor is logged; the client then stays unset.
     */
    private static void initZkClient() {
        try {
            final String servers = Constants.ARCH_SEQ_ZOOKEEPER_CONNECT_STRING;
            if (logger.isInfoEnabled()) {
                logger.info("zk cluster[" + servers + "]");
            }
            final boolean configured = servers != null && !servers.trim().isEmpty();
            if (!configured) {
                throw new RuntimeException("no \"arch.seq.zk-cluster.serverList\" config.used");
            }
            zkClient = new ZkClient(servers, 15000, 60000);
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        }
    }
}
package com.xxx.arch.seq.client.tool;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Closeable;
import java.io.IOException;

/**
 * Base class offering a helper that closes resources without propagating {@link IOException}.
 *
 * Created by zhangyang on 2016/5/31.
 */
public class StreamCloseAble {
    private static Logger logger = LoggerFactory.getLogger(StreamCloseAble.class);

    /**
     * Closes every given resource in order.
     *
     * <p>{@code null} entries are skipped; an {@link IOException} raised by one resource is
     * logged and the remaining resources are still closed.
     *
     * @param closeAbles resources to close; may be {@code null} or empty
     */
    public static void close(Closeable... closeAbles) {
        if (closeAbles == null) {
            return;
        }
        for (int i = 0; i < closeAbles.length; i++) {
            Closeable resource = closeAbles[i];
            if (resource == null) {
                continue;
            }
            try {
                resource.close();
            } catch (IOException e) {
                logger.error(e.getMessage(), e);
            }
        }
    }
}
到此,相信大家對“如何實現基于Jedis+ZK的分布式序列號生成器”有了更深的了解,不妨來實際操作一番吧!這里是創新互聯網站,更多相關內容可以進入相關頻道進行查詢,關注我們,繼續學習!