本篇內(nèi)容介紹了“zookeeper分布式鎖實(shí)現(xiàn)的方法是什么”的有關(guān)知識,在實(shí)際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領(lǐng)大家學(xué)習(xí)一下如何處理這些情況吧!希望大家仔細(xì)閱讀,能夠?qū)W有所成!
在太湖等地區(qū),都構(gòu)建了全面的區(qū)域性戰(zhàn)略布局,加強(qiáng)發(fā)展的系統(tǒng)性、市場前瞻性、產(chǎn)品創(chuàng)新能力,以專注、極致的服務(wù)理念,為客戶提供成都網(wǎng)站設(shè)計(jì)、網(wǎng)站制作 網(wǎng)站設(shè)計(jì)制作定制網(wǎng)站,公司網(wǎng)站建設(shè),企業(yè)網(wǎng)站建設(shè),成都品牌網(wǎng)站建設(shè),網(wǎng)絡(luò)營銷推廣,外貿(mào)營銷網(wǎng)站建設(shè),太湖網(wǎng)站建設(shè)費(fèi)用合理。
一。為何使用分布式鎖?
當(dāng)應(yīng)用服務(wù)器數(shù)量超過1臺,對相同數(shù)據(jù)的訪問可能造成訪問沖突(特別是寫沖突)。單純使用關(guān)系數(shù)據(jù)庫比如MySQL的應(yīng)用可以借助于事務(wù)來實(shí)現(xiàn)鎖,也可以使用版本號等實(shí)現(xiàn)樂觀鎖,最大的缺陷就是可用性降低(性能差)。對于GLEASY這種滿足大規(guī)模并發(fā)訪問請求的應(yīng)用來說,使用數(shù)據(jù)庫事務(wù)來實(shí)現(xiàn)數(shù)據(jù)庫就有些捉襟見肘了。另外對于一些不依賴數(shù)據(jù)庫的應(yīng)用,比如分布式文件系統(tǒng),為了保證同一文件在大量讀寫操作情況下的正確性,必須引入分布式鎖來約束對同一文件的并發(fā)操作。
二。對分布式鎖的要求
1.高性能(分布式鎖不能成為系統(tǒng)的性能瓶頸)
2.避免死鎖(拿到鎖的結(jié)點(diǎn)掛掉不會導(dǎo)致其它結(jié)點(diǎn)永遠(yuǎn)無法繼續(xù))
3.支持鎖重入
三。方案1,基于zookeeper的分布式鎖
/** * DistributedLockUtil.java * 分布式鎖工廠類,所有分布式請求都由該工廠類負(fù)責(zé) **/ public class DistributedLockUtil { private static Object schemeLock = new Object(); private static Object mutexLock = new Object(); private static MapmutexLockMap = new ConcurrentHashMap(); private String schema; private Map cache = new ConcurrentHashMap (); private static Map instances = new ConcurrentHashMap(); public static DistributedLockUtil getInstance(String schema){ DistributedLockUtil u = instances.get(schema); if(u==null){ synchronized(schemeLock){ u = instances.get(schema); if(u == null){ u = new DistributedLockUtil(schema); instances.put(schema, u); } } } return u; } private DistributedLockUtil(String schema){ this.schema = schema; } private Object getMutex(String key){ Object mx = mutexLockMap.get(key); if(mx == null){ synchronized(mutexLock){ mx = mutexLockMap.get(key); if(mx==null){ mx = new Object(); mutexLockMap.put(key,mx); } } } return mx; } private DistributedReentrantLock getLock(String key){ DistributedReentrantLock lock = cache.get(key); if(lock == null){ synchronized(getMutex(key)){ lock = cache.get(key); if(lock == null){ lock = new DistributedReentrantLock(key,schema); cache.put(key, lock); } } } return lock; } public void reset(){ for(String s : cache.keySet()){ getLock(s).unlock(); } } /** * 嘗試加鎖 * 如果當(dāng)前線程已經(jīng)擁有該鎖的話,直接返回false,表示不用再次加鎖,此時(shí)不應(yīng)該再調(diào)用unlock進(jìn)行解鎖 * * @param key * @return * @throws InterruptedException * @throws KeeperException */ public LockStat lock(String key) throws InterruptedException, KeeperException{ if(getLock(key).isOwner()){ return LockStat.NONEED; } getLock(key).lock(); return LockStat.SUCCESS; } public void clearLock(String key) throws InterruptedException, KeeperException{ synchronized(getMutex(key)){ DistributedReentrantLock l = cache.get(key); l.clear(); cache.remove(key); } } public void unlock(String key,LockStat stat) throws InterruptedException, KeeperException{ unlock(key,stat,false); } public void unlock(String key,LockStat stat,boolean keepalive) throws InterruptedException, KeeperException{ if(stat == null) return; if(LockStat.SUCCESS.equals(stat)){ DistributedReentrantLock lock = getLock(key); boolean hasWaiter = lock.unlock(); if(!hasWaiter && !keepalive){ synchronized(getMutex(key)){ lock.clear(); cache.remove(key); } } } } public static enum LockStat{ NONEED, SUCCESS } }
/** *DistributedReentrantLock.java *本地線程之間鎖爭用,先使用虛擬機(jī)內(nèi)部鎖機(jī)制,減少結(jié)點(diǎn)間通信開銷 */ public class DistributedReentrantLock { private static final Logger logger = Logger.getLogger(DistributedReentrantLock.class); private ReentrantLock reentrantLock = new ReentrantLock(); private WriteLock writeLock; private long timeout = 3*60*1000; private final Object mutex = new Object(); private String dir; private String schema; private final ExitListener exitListener = new ExitListener(){ @Override public void execute() { initWriteLock(); } }; private synchronized void initWriteLock(){ logger.debug("初始化writeLock"); writeLock = new WriteLock(dir,new LockListener(){ @Override public void lockAcquired() { synchronized(mutex){ mutex.notify(); } } @Override public void lockReleased() { } },schema); if(writeLock != null && writeLock.zk != null){ writeLock.zk.addExitListener(exitListener); } synchronized(mutex){ mutex.notify(); } } public DistributedReentrantLock(String dir,String schema) { this.dir = dir; this.schema = schema; initWriteLock(); } public void lock(long timeout) throws InterruptedException, KeeperException { reentrantLock.lock();//多線程競爭時(shí),先拿到第一層鎖 try{ boolean res = writeLock.trylock(); if(!res){ synchronized(mutex){ mutex.wait(timeout); } if(writeLock == null || !writeLock.isOwner()){ throw new InterruptedException("鎖超時(shí)"); } } }catch(InterruptedException e){ reentrantLock.unlock(); throw e; }catch(KeeperException e){ reentrantLock.unlock(); throw e; } } public void lock() throws InterruptedException, KeeperException { lock(timeout); } public void destroy() throws KeeperException { writeLock.unlock(); } public boolean unlock(){ if(!isOwner()) return false; try{ writeLock.unlock(); reentrantLock.unlock();//多線程競爭時(shí),釋放最外層鎖 }catch(RuntimeException e){ reentrantLock.unlock();//多線程競爭時(shí),釋放最外層鎖 throw e; } return reentrantLock.hasQueuedThreads(); } public boolean isOwner() { return reentrantLock.isHeldByCurrentThread() && writeLock.isOwner(); } public void clear() { writeLock.clear(); } }
/** *WriteLock.java *基于zk的鎖實(shí)現(xiàn) *一個(gè)最簡單的場景如下: *1.結(jié)點(diǎn)A請求加鎖,在特定路徑下注冊自己(會話自增結(jié)點(diǎn)),得到一個(gè)ID號1 *2.結(jié)點(diǎn)B請求加鎖,在特定路徑下注冊自己(會話自增結(jié)點(diǎn)),得到一個(gè)ID號2 *3.結(jié)點(diǎn)A獲取所有結(jié)點(diǎn)ID,判斷出來自己是最小結(jié)點(diǎn)號,于是獲得鎖 *4.結(jié)點(diǎn)B獲取所有結(jié)點(diǎn)ID,判斷出來自己不是最小結(jié)點(diǎn),于是監(jiān)聽小于自己的最大結(jié)點(diǎn)(結(jié)點(diǎn)A)變更事件 *5.結(jié)點(diǎn)A拿到鎖,處理業(yè)務(wù),處理完,釋放鎖(刪除自己) *6.結(jié)點(diǎn)B收到結(jié)點(diǎn)A變更事件,判斷出來自己已經(jīng)是最小結(jié)點(diǎn)號,于是獲得鎖。 */ public class WriteLock extends ZkPrimative { private static final Logger LOG = Logger.getLogger(WriteLock.class); private final String dir; private String id; private LockNode idName; private String ownerId; private String lastChildId; private byte[] data = {0x12, 0x34}; private LockListener callback; public WriteLock(String dir,String schema) { super(schema,true); this.dir = dir; } public WriteLock(String dir,LockListener callback,String schema) { this(dir,schema); this.callback = callback; } public LockListener getLockListener() { return this.callback; } public void setLockListener(LockListener callback) { this.callback = callback; } public synchronized void unlock() throws RuntimeException { if(zk == null || zk.isClosed()){ return; } if (id != null) { try { zk.delete(id, -1); } catch (InterruptedException e) { LOG.warn("Caught: " + e, e); //set that we have been interrupted. Thread.currentThread().interrupt(); } catch (KeeperException.NoNodeException e) { // do nothing } catch (KeeperException e) { LOG.warn("Caught: " + e, e); throw (RuntimeException) new RuntimeException(e.getMessage()). initCause(e); }finally { if (callback != null) { callback.lockReleased(); } id = null; } } } private class LockWatcher implements Watcher { public void process(WatchedEvent event) { LOG.debug("Watcher fired on path: " + event.getPath() + " state: " + event.getState() + " type " + event.getType()); try { trylock(); } catch (Exception e) { LOG.warn("Failed to acquire lock: " + e, e); } } } private void findPrefixInChildren(String prefix, ZooKeeper zookeeper, String dir) throws KeeperException, InterruptedException { Listnames = zookeeper.getChildren(dir, false); for (String name : names) { if (name.startsWith(prefix)) { id = dir + "/" + name; if (LOG.isDebugEnabled()) { LOG.debug("Found id created last time: " + id); } break; } } if (id == null) { id = zookeeper.create(dir + "/" + prefix, data, acl, EPHEMERAL_SEQUENTIAL); if (LOG.isDebugEnabled()) { LOG.debug("Created id: " + id); } } } public void clear() { if(zk == null || zk.isClosed()){ return; } try { zk.delete(dir, -1); } catch (Exception e) { LOG.error("clear error: " + e,e); } } public synchronized boolean trylock() throws KeeperException, InterruptedException { if(zk == null){ LOG.info("zk 是空"); return false; } if (zk.isClosed()) { LOG.info("zk 已經(jīng)關(guān)閉"); return false; } ensurePathExists(dir); LOG.debug("id:"+id); do { if (id == null) { long sessionId = zk.getSessionId(); String prefix = "x-" + sessionId + "-"; idName = new LockNode(id); LOG.debug("idName:"+idName); } if (id != null) { List names = zk.getChildren(dir, false); if (names.isEmpty()) { LOG.warn("No children in: " + dir + " when we've just " + "created one! Lets recreate it..."); id = null; } else { SortedSet sortedNames = new TreeSet (); for (String name : names) { sortedNames.add(new LockNode(dir + "/" + name)); } ownerId = sortedNames.first().getName(); LOG.debug("all:"+sortedNames); SortedSet lessThanMe = sortedNames.headSet(idName); LOG.debug("less than me:"+lessThanMe); if (!lessThanMe.isEmpty()) { LockNode lastChildName = lessThanMe.last(); lastChildId = lastChildName.getName(); if (LOG.isDebugEnabled()) { LOG.debug("watching less than me node: " + lastChildId); } Stat stat = zk.exists(lastChildId, new LockWatcher()); if (stat != null) { return Boolean.FALSE; } else { LOG.warn("Could not find the" + " stats for less than me: " + lastChildName.getName()); } } else { if (isOwner()) { if (callback != null) { callback.lockAcquired(); } return Boolean.TRUE; } } } } } while (id == null); return Boolean.FALSE; } public String getDir() { return dir; } public boolean isOwner() { return id != null && ownerId != null && id.equals(ownerId); } public String getId() { return this.id; } }
使用本方案實(shí)現(xiàn)的分布式鎖,可以很好地解決鎖重入的問題,而且使用會話結(jié)點(diǎn)來避免死鎖;性能方面,根據(jù)筆者自測結(jié)果,加鎖解鎖各一次算是一個(gè)操作,本方案實(shí)現(xiàn)的分布式鎖,TPS大概為2000-3000,性能比較一般
“zookeeper分布式鎖實(shí)現(xiàn)的方法是什么”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關(guān)的知識可以關(guān)注創(chuàng)新互聯(lián)網(wǎng)站,小編將為大家輸出更多高質(zhì)量的實(shí)用文章!