真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

基于redis分布式鎖實現(xiàn)“秒殺”

最近在項目中遇到了類似“秒殺”的業(yè)務場景,在本篇博客中,我將用一個非常簡單的demo,闡述實現(xiàn)所謂“秒殺”的基本思路。

溆浦網(wǎng)站制作公司哪家好,找創(chuàng)新互聯(lián)建站!從網(wǎng)頁設計、網(wǎng)站建設、微信開發(fā)、APP開發(fā)、響應式網(wǎng)站開發(fā)等網(wǎng)站項目制作,到程序開發(fā),運營維護。創(chuàng)新互聯(lián)建站公司2013年成立到現(xiàn)在10年的時間,我們擁有了豐富的建站經(jīng)驗和運維經(jīng)驗,來保證我們的工作的順利進行。專注于網(wǎng)站建設就選創(chuàng)新互聯(lián)建站。

業(yè)務場景
所謂秒殺,從業(yè)務角度看,是短時間內(nèi)多個用戶“爭搶”資源,這里的資源在大部分秒殺場景里是商品;將業(yè)務抽象,技術(shù)角度看,秒殺就是多個線程對資源進行操作,所以實現(xiàn)秒殺,就必須控制線程對資源的爭搶,既要保證高效并發(fā),也要保證操作的正確。

一些可能的實現(xiàn)
剛才提到過,實現(xiàn)秒殺的關(guān)鍵點是控制線程對資源的爭搶,根據(jù)基本的線程知識,可以不加思索的想到下面的一些方法:
1、秒殺在技術(shù)層面的抽象應該就是一個方法,在這個方法里可能的操作是將商品庫存-1,將商品加入用戶的購物車等等,在不考慮緩存的情況下應該是要操作數(shù)據(jù)庫的。那么最簡單直接的實現(xiàn)就是在這個方法上加上synchronized關(guān)鍵字,通俗的講就是鎖住整個方法;
2、鎖住整個方法這個策略簡單方便,但是似乎有點粗暴??梢陨晕?yōu)化一下,只鎖住秒殺的代碼塊,比如寫數(shù)據(jù)庫的部分;
3、既然有并發(fā)問題,那我就讓他“不并發(fā)”,將所有的線程用一個隊列管理起來,使之變成串行操作,自然不會有并發(fā)問題。

上面所述的方法都是有效的,但是都不好。為什么?第一和第二種方法本質(zhì)上是“加鎖”,但是鎖粒度依然比較高。什么意思?試想一下,如果兩個線程同時執(zhí)行秒殺方法,這兩個線程操作的是不同的商品,從業(yè)務上講應該是可以同時進行的,但是如果采用第一二種方法,這兩個線程也會去爭搶同一個鎖,這其實是不必要的。第三種方法也沒有解決上面說的問題。

那么如何將鎖控制在更細的粒度上呢?可以考慮為每個商品設置一個互斥鎖,以和商品ID相關(guān)的字符串為唯一標識,這樣就可以做到只有爭搶同一件商品的線程互斥,不會導致所有的線程互斥。分布式鎖恰好可以幫助我們解決這個問題。

何為分布式鎖
分布式鎖是控制分布式系統(tǒng)之間同步訪問共享資源的一種方式。在分布式系統(tǒng)中,常常需要協(xié)調(diào)他們的動作。如果不同的系統(tǒng)或是同一個系統(tǒng)的不同主機之間共享了一個或一組資源,那么訪問這些資源的時候,往往需要互斥來防止彼此干擾來保證一致性,在這種情況下,便需要使用到分布式鎖。

我們來假設一個最簡單的秒殺場景:數(shù)據(jù)庫里有一張表,column分別是商品ID,和商品ID對應的庫存量,秒殺成功就將此商品庫存量-1?,F(xiàn)在假設有1000個線程來秒殺兩件商品,500個線程秒殺第一個商品,500個線程秒殺第二個商品。我們來根據(jù)這個簡單的業(yè)務場景來解釋一下分布式鎖。
通常具有秒殺場景的業(yè)務系統(tǒng)都比較復雜,承載的業(yè)務量非常巨大,并發(fā)量也很高。這樣的系統(tǒng)往往采用分布式的架構(gòu)來均衡負載。那么這1000個并發(fā)就會是從不同的地方過來,商品庫存就是共享的資源,也是這1000個并發(fā)爭搶的資源,這個時候我們需要將并發(fā)互斥管理起來。這就是分布式鎖的應用。
而key-value存儲系統(tǒng),如redis,因為其一些特性,是實現(xiàn)分布式鎖的重要工具。

具體的實現(xiàn)
先來看看一些redis的基本命令:
SETNX key value
如果key不存在,就設置key對應字符串value。在這種情況下,該命令和SET一樣。當key已經(jīng)存在時,就不做任何操作。SETNX是”SET if Not eXists”。
expire KEY seconds
設置key的過期時間。如果key已過期,將會被自動刪除。
del KEY
刪除key
由于筆者的實現(xiàn)只用到這三個命令,就只介紹這三個命令,更多的命令以及redis的特性和使用,可以參考redis官網(wǎng)。

需要考慮的問題
1、用什么操作redis?幸虧redis已經(jīng)提供了jedis客戶端用于java應用程序,直接調(diào)用jedis API即可。
2、怎么實現(xiàn)加鎖?“鎖”其實是一個抽象的概念,將這個抽象概念變?yōu)榫唧w的東西,就是一個存儲在redis里的key-value對,key是于商品ID相關(guān)的字符串來唯一標識,value其實并不重要,因為只要這個唯一的key-value存在,就表示這個商品已經(jīng)上鎖。
3、如何釋放鎖?既然key-value對存在就表示上鎖,那么釋放鎖就自然是在redis里刪除key-value對。
4、阻塞還是非阻塞?筆者采用了阻塞式的實現(xiàn),若線程發(fā)現(xiàn)已經(jīng)上鎖,會在特定時間內(nèi)輪詢鎖。
5、如何處理異常情況?比如一個線程把一個商品上了鎖,但是由于各種原因,沒有完成操作(在上面的業(yè)務場景里就是沒有將庫存-1寫入數(shù)據(jù)庫),自然沒有釋放鎖,這個情況筆者加入了鎖超時機制,利用redis的expire命令為key設置超時時長,過了超時時間redis就會將這個key自動刪除,即強制釋放鎖(可以認為超時釋放鎖是一個異步操作,由redis完成,應用程序只需要根據(jù)系統(tǒng)特點設置超時時間即可)。

talk is cheap,show me the code
在代碼實現(xiàn)層面,注解有并發(fā)的方法和參數(shù),通過動態(tài)代理獲取注解的方法和參數(shù),在代理中加鎖,執(zhí)行完被代理的方法后釋放鎖。

幾個注解定義:
cachelock是方法級的注解,用于注解會產(chǎn)生并發(fā)問題的方法:

@Target(ElementType.METHOD)@Retention(RetentionPolicy.RUNTIME)
br/>@Retention(RetentionPolicy.RUNTIME)
public @interface CacheLock {
String lockedPrefix() default "";//redis 鎖key的前綴
long timeOut() default 2000;//輪詢鎖的時間
int expireTime() default 1000;//key在redis里存在的時間,1000S
}
lockedObject是參數(shù)級的注解,用于注解商品ID等基本類型的參數(shù):

@Target(ElementType.PARAMETER)@Retention(RetentionPolicy.RUNTIME)
br/>@Retention(RetentionPolicy.RUNTIME)
public @interface LockedObject {
//不需要值
}
LockedComplexObject也是參數(shù)級的注解,用于注解自定義類型的參數(shù):

@Target(ElementType.PARAMETER)@Retention(RetentionPolicy.RUNTIME)
br/>@Retention(RetentionPolicy.RUNTIME)
public @interface LockedComplexObject {
String field() default "";//含有成員變量的復雜對象中需要加鎖的成員變量,如一個商品對象的商品ID

}
CacheLockInterceptor實現(xiàn)InvocationHandler接口,在invoke方法中獲取注解的方法和參數(shù),在執(zhí)行注解的方法前加鎖,執(zhí)行被注解的方法后釋放鎖:

public class CacheLockInterceptor implements InvocationHandler{
public static int ERROR_COUNT = 0;
private Object proxied;

public CacheLockInterceptor(Object proxied) {
    this.proxied = proxied;
}

@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {

    CacheLock cacheLock = method.getAnnotation(CacheLock.class);
    //沒有cacheLock注解,pass
    if(null == cacheLock){
        System.out.println("no cacheLock annotation");          
        return method.invoke(proxied, args);
    }
    //獲得方法中參數(shù)的注解
    Annotation[][] annotations = method.getParameterAnnotations();
    //根據(jù)獲取到的參數(shù)注解和參數(shù)列表獲得加鎖的參數(shù)
    Object lockedObject = getLockedObject(annotations,args);
    String objectValue = lockedObject.toString();
    //新建一個鎖
    RedisLock lock = new RedisLock(cacheLock.lockedPrefix(), objectValue);
    //加鎖
    boolean result = lock.lock(cacheLock.timeOut(), cacheLock.expireTime());
    if(!result){//取鎖失敗
        ERROR_COUNT += 1;
        throw new CacheLockException("get lock fail");

    }
    try{
        //加鎖成功,執(zhí)行方法
        return method.invoke(proxied, args);
    }finally{
        lock.unlock();//釋放鎖
    }

}
/**
 * 
 * @param annotations
 * @param args
 * @return
 * @throws CacheLockException
 */
private Object getLockedObject(Annotation[][] annotations,Object[] args) throws CacheLockException{
    if(null == args || args.length == 0){
        throw new CacheLockException("方法參數(shù)為空,沒有被鎖定的對象");
    }

    if(null == annotations || annotations.length == 0){
        throw new CacheLockException("沒有被注解的參數(shù)");
    }
    //不支持多個參數(shù)加鎖,只支持第一個注解為lockedObject或者lockedComplexObject的參數(shù)
    int index = -1;//標記參數(shù)的位置指針
    for(int i = 0;i < annotations.length;i++){
        for(int j = 0;j < annotations[i].length;j++){
            if(annotations[i][j] instanceof LockedComplexObject){//注解為LockedComplexObject
                index = i;
                try {
                    return args[i].getClass().getField(((LockedComplexObject)annotations[i][j]).field());
                } catch (NoSuchFieldException | SecurityException e) {
                    throw new CacheLockException("注解對象中沒有該屬性" + ((LockedComplexObject)annotations[i][j]).field());
                }
            }

            if(annotations[i][j] instanceof LockedObject){
                index = i;
                break;
            }
        }
        //找到第一個后直接break,不支持多參數(shù)加鎖
        if(index != -1){
            break;
        }
    }

    if(index == -1){
        throw new CacheLockException("請指定被鎖定參數(shù)");
    }

    return args[index];
}

}
最關(guān)鍵的RedisLock類中的lock方法和unlock方法:

/**

  • 加鎖
  • 使用方式為:
  • lock();
  • try{
  • executeMethod();
  • }finally{
  • unlock();
  • }
  • @param timeout timeout的時間范圍內(nèi)輪詢鎖
  • @param expire 設置鎖超時時間
  • @return 成功 or 失敗
    /
    public boolean lock(long timeout,int expire){
    long nanoTime = System.nanoTime();
    timeout
    = MILLI_NANO_TIME;
    try {
    //在timeout的時間范圍內(nèi)不斷輪詢鎖
    while (System.nanoTime() - nanoTime < timeout) {
    //鎖不存在的話,設置鎖并設置鎖過期時間,即加鎖
    if (this.redisClient.setnx(this.key, LOCKED) == 1) {
    this.redisClient.expire(key, expire);//設置鎖過期時間是為了在沒有釋放
    //鎖的情況下鎖過期后消失,不會造成永久阻塞
    this.lock = true;
    return this.lock;
    }
    System.out.println("出現(xiàn)鎖等待");
    //短暫休眠,避免可能的活鎖
    Thread.sleep(3, RANDOM.nextInt(30));
    }
    } catch (Exception e) {
    throw new RuntimeException("locking error",e);
    }
    return false;
    }

    public void unlock() {
    try {
    if(this.lock){
    redisClient.delKey(key);//直接刪除
    }
    } catch (Throwable e) {

    }

    }
    上述的代碼是框架性的代碼,現(xiàn)在來講解如何使用上面的簡單框架來寫一個秒殺函數(shù)。
    先定義一個接口,接口里定義了一個秒殺方法:

public interface SeckillInterface {/**
*現(xiàn)在暫時只支持在接口方法上注解
*/
//cacheLock注解可能產(chǎn)生并發(fā)的方法
@CacheLock(lockedPrefix="TEST_PREFIX")
br/>/**
*現(xiàn)在暫時只支持在接口方法上注解
*/
//cacheLock注解可能產(chǎn)生并發(fā)的方法
@CacheLock(lockedPrefix="TEST_PREFIX")
}
上述SeckillInterface接口的實現(xiàn)類,即秒殺的具體實現(xiàn):

public class SecKillImpl implements SeckillInterface{
static Map inventory ;
static{
inventory = new HashMap<>();
inventory.put(10000001L, 10000l);
inventory.put(10000002L, 10000l);
}

@Override
public void secKill(String arg1, Long arg2) {
    //最簡單的秒殺,這里僅作為demo示例
    reduceInventory(arg2);
}
//模擬秒殺操作,姑且認為一個秒殺就是將庫存減一,實際情景要復雜的多
public Long reduceInventory(Long commodityId){
    inventory.put(commodityId,inventory.get(commodityId) - 1);
    return inventory.get(commodityId);
}

}
模擬秒殺場景,1000個線程來爭搶兩個商品:

@Test
public void testSecKill(){
int threadCount = 1000;
int splitPoint = 500;
CountDownLatch endCount = new CountDownLatch(threadCount);
CountDownLatch beginCount = new CountDownLatch(1);
SecKillImpl testClass = new SecKillImpl();

    Thread[] threads = new Thread[threadCount];
    //起500個線程,秒殺第一個商品
    for(int i= 0;i < splitPoint;i++){
        threads[i] = new Thread(new  Runnable() {
            public void run() {
                try {
                    //等待在一個信號量上,掛起
                    beginCount.await();
                    //用動態(tài)代理的方式調(diào)用secKill方法
                    SeckillInterface proxy = (SeckillInterface) Proxy.newProxyInstance(SeckillInterface.class.getClassLoader(), 
                        new Class[]{SeckillInterface.class}, new CacheLockInterceptor(testClass));
                    proxy.secKill("test", commidityId1);
                    endCount.countDown();
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
        });
        threads[i].start();

    }
    //再起500個線程,秒殺第二件商品
    for(int i= splitPoint;i < threadCount;i++){
        threads[i] = new Thread(new  Runnable() {
            public void run() {
                try {
                    //等待在一個信號量上,掛起
                    beginCount.await();
                    //用動態(tài)代理的方式調(diào)用secKill方法
                    SeckillInterface proxy = (SeckillInterface) Proxy.newProxyInstance(SeckillInterface.class.getClassLoader(), 
                        new Class[]{SeckillInterface.class}, new CacheLockInterceptor(testClass));
                    proxy.secKill("test", commidityId2);
                    //testClass.testFunc("test", 10000001L);
                    endCount.countDown();
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
        });
        threads[i].start();

    }

    long startTime = System.currentTimeMillis();
    //主線程釋放開始信號量,并等待結(jié)束信號量,這樣做保證1000個線程做到完全同時執(zhí)行,保證測試的正確性
    beginCount.countDown();

    try {
        //主線程等待結(jié)束信號量
        endCount.await();
        //觀察秒殺結(jié)果是否正確
        System.out.println(SecKillImpl.inventory.get(commidityId1));
        System.out.println(SecKillImpl.inventory.get(commidityId2));
        System.out.println("error count" + CacheLockInterceptor.ERROR_COUNT);
        System.out.println("total cost " + (System.currentTimeMillis() - startTime));
    } catch (InterruptedException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }
}

在正確的預想下,應該每個商品的庫存都減少了500,在多次試驗后,實際情況符合預想。如果不采用鎖機制,會出現(xiàn)庫存減少499,498的情況。
這里采用了動態(tài)代理的方法,利用注解和反射機制得到分布式鎖ID,進行加鎖和釋放鎖操作。當然也可以直接在方法進行這些操作,采用動態(tài)代理也是為了能夠?qū)㈡i操作代碼集中在代理中,便于維護。
通常秒殺場景發(fā)生在web項目中,可以考慮利用spring的AOP特性將鎖操作代碼置于切面中,當然AOP本質(zhì)上也是動態(tài)代理。

小結(jié)
這篇文章從業(yè)務場景出發(fā),從抽象到實現(xiàn)闡述了如何利用redis實現(xiàn)分布式鎖,完成簡單的秒殺功能,也記錄了筆者思考的過程,希望能給閱讀到本篇文章的人一些啟發(fā)。


標題名稱:基于redis分布式鎖實現(xiàn)“秒殺”
本文路徑:http://weahome.cn/article/popgpo.html

其他資訊

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部