真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

java實(shí)現(xiàn)memcache服務(wù)器的示例代碼

什么是Memcache?

創(chuàng)新互聯(lián)是一家專業(yè)提供朝陽(yáng)企業(yè)網(wǎng)站建設(shè),專注與做網(wǎng)站、網(wǎng)站設(shè)計(jì)、H5開發(fā)、小程序制作等業(yè)務(wù)。10年已為朝陽(yáng)眾多企業(yè)、政府機(jī)構(gòu)等服務(wù)。創(chuàng)新互聯(lián)專業(yè)網(wǎng)站制作公司優(yōu)惠進(jìn)行中。

Memcache集群環(huán)境下緩存解決方案

Memcache是一個(gè)高性能的分布式的內(nèi)存對(duì)象緩存系統(tǒng),通過在內(nèi)存里維護(hù)一個(gè)統(tǒng)一的巨大的hash表,它能夠用來存儲(chǔ)各種格式的數(shù)據(jù),包括圖像、視頻、文件以及數(shù)據(jù)庫(kù)檢索的結(jié)果等。簡(jiǎn)單的說就是將數(shù)據(jù)調(diào)用到內(nèi)存中,然后從內(nèi)存中讀取,從而大大提高讀取速度。

 Memcache是danga的一個(gè)項(xiàng)目,最早是LiveJournal 服務(wù)的,最初為了加速 LiveJournal 訪問速度而開發(fā)的,后來被很多大型的網(wǎng)站采用。

Memcached是以守護(hù)程序方式運(yùn)行于一個(gè)或多個(gè)服務(wù)器中,隨時(shí)會(huì)接收客戶端的連接和操作

為什么會(huì)有Memcache和memcached兩種名稱?

其實(shí)Memcache是這個(gè)項(xiàng)目的名稱,而memcached是它服務(wù)器端的主程序文件名,知道我的意思了吧。一個(gè)是項(xiàng)目名稱,一個(gè)是主程序文件名,在網(wǎng)上看到了很多人不明白,于是混用了。 

Memcached是高性能的,分布式的內(nèi)存對(duì)象緩存系統(tǒng),用于在動(dòng)態(tài)應(yīng)用中減少數(shù)據(jù)庫(kù)負(fù)載,提升訪問速度。Memcached由Danga Interactive開發(fā),用于提升LiveJournal.com訪問速度的。LJ每秒動(dòng)態(tài)頁(yè)面訪問量幾千次,用戶700萬。Memcached將數(shù)據(jù)庫(kù)負(fù)載大幅度降低,更好的分配資源,更快速訪問。

這篇文章將會(huì)涉及以下內(nèi)容:

  1. Java Socket多線程服務(wù)器
  2. Java IO
  3. Concurrency
  4. Memcache特性和協(xié)議

Memcache

Memcache is an in-memory key-value store for small chunks of arbitrary data (strings, objects) from results of databasecalls, API calls, or page rendering.

即內(nèi)存緩存數(shù)據(jù)庫(kù),是一個(gè)鍵值對(duì)數(shù)據(jù)庫(kù)。該數(shù)據(jù)庫(kù)的存在是為了將從其他服務(wù)中獲取的數(shù)據(jù)暫存在內(nèi)存中,在重復(fù)訪問時(shí)可以直接從命中的緩存中返回。既加快了訪問速率,也減少了其他服務(wù)的負(fù)載。這里將實(shí)現(xiàn)一個(gè)單服務(wù)器版本的Memcache,并且支持多個(gè)客戶端的同時(shí)連接。

客戶端將與服務(wù)器建立telnet連接,然后按照Memcache協(xié)議與服務(wù)器緩存進(jìn)行交互。這里實(shí)現(xiàn)的指令為get,set和del。先來看一下各個(gè)指令的格式

set

set屬于存儲(chǔ)指令,存儲(chǔ)指令的特點(diǎn)時(shí),第一行輸入基本信息,第二行輸入其對(duì)應(yīng)的value值。

set [noreply]\r\n
\r\n

如果存儲(chǔ)成功,將會(huì)返回STORED,如果指令中包含noreply屬性,則服務(wù)器將不會(huì)返回信息。

該指令中每個(gè)域的內(nèi)容如下:

  1. key: 鍵
  2. flags: 16位無符號(hào)整數(shù),會(huì)在get時(shí)隨鍵值對(duì)返回
  3. exptime: 過期時(shí)間,以秒為單位
  4. bytes:即將發(fā)送的value的長(zhǎng)度
  5. noreply:是否需要服務(wù)器響應(yīng),為可選屬性

如果指令不符合標(biāo)準(zhǔn),服務(wù)器將會(huì)返回ERROR。

get

get屬于獲取指令,該指令特點(diǎn)如下:

get *\r\n

它支持傳入多個(gè)key的值,如果緩存命中了一個(gè)或者多個(gè)key,則會(huì)返回相應(yīng)的數(shù)據(jù),并以END作為結(jié)尾。如果沒有命中,則返回的消息中不包含該key對(duì)應(yīng)的值。格式如下:

VALUE   \r\n
\r\n
VALUE   \r\n
\r\n
END
del

刪除指令,該指令格式如下:

del  [noreply]\r\n

如果刪除成功,則返回DELETED\r\n,否則返回NOT_FOUND。如果有noreply參數(shù),則服務(wù)器不會(huì)返回響應(yīng)。

JAVA SOCKET

JAVA SOCKET需要了解的只是包括TCP協(xié)議,套接字,以及IO流。這里就不詳細(xì)贅述,可以參考我的這系列文章,也建議去閱讀JAVA Network Programming。一書。

代碼實(shí)現(xiàn)

這里貼圖功能出了點(diǎn)問題,可以去文末我的項(xiàng)目地址查看類圖。

這里采用了指令模式和工廠模式實(shí)現(xiàn)指令的創(chuàng)建和執(zhí)行的解耦。指令工廠將會(huì)接收commandLine并且返回一個(gè)Command實(shí)例。每一個(gè)Command都擁有execute方法用來執(zhí)行各自獨(dú)特的操作。這里只貼上del指令的特殊實(shí)現(xiàn)。

 /**
 * 各種指令
 * 目前支持get,set,delete
 *
 * 以及自定義的
 * error,end
 */
public interface Command {

  /**
   * 執(zhí)行指令
   * @param reader
   * @param writer
   */
  void execute(Reader reader, Writer writer);

  /**
   * 獲取指令的類型
   * @return
   */
  CommandType getType();
}
/**
 * 指令工廠 單一實(shí)例
 */
public class CommandFactory {

  private static CommandFactory commandFactory;
  private static Cache memcache;
  private CommandFactory(){}

  public static CommandFactory getInstance(Cache cache) {
    if (commandFactory == null) {
      commandFactory = new CommandFactory();
      memcache = cache;
    }
    return commandFactory;
  }

  /**
   * 根據(jù)指令的類型獲取Command
   * @param commandLine
   * @return
   */
  public Command getCommand(String commandLine){
    if (commandLine.matches("^set .*$")){
      return new SetCommand(commandLine, memcache);
    }else if (commandLine.matches("^get .*$")){
      return new GetCommand(commandLine, memcache);
    }else if (commandLine.matches("^del .*$")){
      return new DeleteCommand(commandLine, memcache);
    }else if (commandLine.matches("^end$")){
      return new EndCommand(commandLine);
    }else{
      return new ErrorCommand(commandLine, ErrorCommand.ErrorType.ERROR);
    }
  }
}

/**
 * 刪除緩存指令
 */
public class DeleteCommand implements Command{

  private final String command;
  private final Cache cache;

  private String key;
  private boolean noReply;
  public DeleteCommand(final String command, final Cache cache){
    this.command = command;
    this.cache = cache;
    initCommand();
  }

  private void initCommand(){
    if (this.command.contains("noreply")){
      noReply = true;
    }
    String[] info = command.split(" ");
    key = info[1];
  }

  @Override
  public void execute(Reader reader, Writer writer) {
    BufferedWriter bfw = (BufferedWriter) writer;
    Item item = cache.delete(key);
    if (!noReply){
      try {
        if (item == null){
          bfw.write("NOT_FOUND\r\n");
        }else {
          bfw.write("DELETED\r\n");
        }
        bfw.flush();
      } catch (IOException e) {
        try {
          bfw.write("ERROR\r\n");
          bfw.flush();
        } catch (IOException e1) {
          e1.printStackTrace();
        }
        e.printStackTrace();
      }
    }


  }

  @Override
  public CommandType getType() {
    return CommandType.SEARCH;
  }
}

然后是實(shí)現(xiàn)內(nèi)存服務(wù)器,為了支持先進(jìn)先出功能,這里使用了LinkedTreeMap作為底層實(shí)現(xiàn),并且重寫了removeOldest方法。同時(shí)還使用CacheManager的后臺(tái)線程及時(shí)清除過期的緩存條目。

public class Memcache implements Cache{
  private Logger logger = Logger.getLogger(Memcache.class.getName());
  //利用LinkedHashMap實(shí)現(xiàn)LRU
  private static LinkedHashMap cache;
  private final int maxSize;
  //負(fù)載因子
  private final float DEFAULT_LOAD_FACTOR = 0.75f;
  public Memcache(final int maxSize){
    this.maxSize = maxSize;
    //確保cache不會(huì)在達(dá)到maxSize之后自動(dòng)擴(kuò)容
    int capacity = (int) Math.ceil(maxSize / DEFAULT_LOAD_FACTOR) + 1;

    this.cache = new LinkedHashMap(capacity, DEFAULT_LOAD_FACTOR, true){
      @Override
      protected boolean removeEldestEntry(Map.Entry eldest) {
        if (size() > maxSize){
          logger.info("緩存數(shù)量已經(jīng)達(dá)到上限,會(huì)刪除最近最少使用的條目");
        }
        return size() > maxSize;
      }
    };

    //實(shí)現(xiàn)同步訪問
    Collections.synchronizedMap(cache);
  }

  public synchronized boolean isFull(){
    return cache.size() >= maxSize;
  }

  @Override
  public Item get(String key) {
    Item item = cache.get(key);

    if (item == null){
      logger.info("緩存中key:" + key + "不存在");
      return null;
    }else if(item!=null && item.isExpired()){ //如果緩存過期則刪除并返回null
      logger.info("從緩存中讀取key:" + key + " value:" + item.getValue() + "已經(jīng)失效");
      cache.remove(key);
      return null;
    }

    logger.info("從緩存中讀取key:" + key + " value:" + item.getValue() + " 剩余有效時(shí)間" + item.remainTime());
    return item;
  }

  @Override
  public void set(String key, Item value) {
    logger.info("向緩存中寫入key:" + key + " value:" + value);
    cache.put(key, value);
  }

  @Override
  public Item delete(String key) {
    logger.info("從緩存中刪除key:" + key);
    return cache.remove(key);
  }

  @Override
  public int size(){
    return cache.size();
  }

  @Override
  public int capacity() {
    return maxSize;
  }

  @Override
  public Iterator> iterator() {
    return cache.entrySet().iterator();
  }
}
/**
 * 緩存管理器
 * 后臺(tái)線程
 * 將cache中過期的緩存刪除
 */
public class CacheManager implements Runnable {

  private Logger logger = Logger.getLogger(CacheManager.class.getName());

  //緩存
  public Cache cache;

  public CacheManager(Cache cache){
    this.cache = cache;
  }


  @Override
  public void run() {
    while (true){
      Iterator> itemIterator = cache.iterator();
      while (itemIterator.hasNext()){
        Map.Entry entry = itemIterator.next();
        Item item = entry.getValue();
        if(item.isExpired()){
          logger.info("key:" + entry.getKey() + " value" + item.getValue() + " 已經(jīng)過期,從數(shù)據(jù)庫(kù)中刪除");
          itemIterator.remove();
        }
      }

      try {
        //每隔5秒鐘再運(yùn)行該后臺(tái)程序
        TimeUnit.SECONDS.sleep(5);
      } catch (InterruptedException e) {
        e.printStackTrace();
      }

    }
  }
}

最后是實(shí)現(xiàn)一個(gè)多線程的Socket服務(wù)器,這里就是將ServerSocket綁定到一個(gè)接口,并且將accept到的Socket交給額外的線程處理。

/**
 * 服務(wù)器
 */
public class IOServer implements Server {
  private boolean stop;
  //端口號(hào)
  private final int port;
  //服務(wù)器線程
  private ServerSocket serverSocket;
  private final Logger logger = Logger.getLogger(IOServer.class.getName());
  //線程池,線程容量為maxConnection
  private final ExecutorService executorService;
  private final Cache cache;
  public IOServer(int port, int maxConnection, Cache cache){
    if (maxConnection<=0) throw new IllegalArgumentException("支持的最大連接數(shù)量必須為正整數(shù)");
    this.port = port;
    executorService = Executors.newFixedThreadPool(maxConnection);
    this.cache = cache;
  }

  @Override
  public void start() {
    try {
      serverSocket = new ServerSocket(port);
      logger.info("服務(wù)器在端口"+port+"上啟動(dòng)");
      while (true){
        try {
          Socket socket = serverSocket.accept();
          logger.info("收到"+socket.getLocalAddress()+"的連接");
          executorService.submit(new SocketHandler(socket, cache));
        } catch (IOException e) {
          e.printStackTrace();
        }
      }
    } catch (IOException e) {
      logger.log(Level.WARNING, "服務(wù)器即將關(guān)閉...");
      e.printStackTrace();
    } finally {
      executorService.shutdown();
      shutDown();
    }


  }

  /**
   * 服務(wù)器是否仍在運(yùn)行
   * @return
   */
  public boolean isRunning() {
    return !serverSocket.isClosed();
  }
  /**
   * 停止服務(wù)器
   */
  public void shutDown(){
    try {
      if (serverSocket!=null){
        serverSocket.close();
      }
    } catch (IOException e) {
      e.printStackTrace();
    }
  }
}
/**
 * 處理各個(gè)客戶端的連接
 * 在獲得end指令后關(guān)閉連接s
 */
public class SocketHandler implements Runnable{

  private static Logger logger = Logger.getLogger(SocketHandler.class.getName());

  private final Socket socket;

  private final Cache cache;

  private boolean finish;


  public SocketHandler(Socket s, Cache cache){
    this.socket = s;
    this.cache = cache;
  }

  @Override
  public void run() {
    try {
      //獲取socket輸入流
      final BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
      //獲取socket輸出流
      final BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream()));

      CommandFactory commandFactory = CommandFactory.getInstance(cache);

      while (!finish){
        final String commandLine = reader.readLine();
        logger.info("ip:" + socket.getLocalAddress() + " 指令:" + commandLine);

        if (commandLine == null || commandLine.trim().isEmpty()) {
          continue;
        }

        //使用指令工廠獲取指令實(shí)例
        final Command command = commandFactory.getCommand(commandLine);
        command.execute(reader, writer);

        if (command.getType() == CommandType.END){
          logger.info("請(qǐng)求關(guān)閉連接");
          finish = true;
        }
      }
    } catch (IOException e) {
      e.printStackTrace();
      logger.info("關(guān)閉來自" + socket.getLocalAddress() + "的連接");
    } finally {
      try {
        if (socket != null){
          socket.close();
        }
      } catch (IOException e) {
        e.printStackTrace();
      }
    }
  }
}

項(xiàng)目地址請(qǐng)戳這里,如果覺得還不錯(cuò)的話,希望能給個(gè)星哈><

參考資料

memcached官網(wǎng)
memcache協(xié)議

以上就是本文的全部?jī)?nèi)容,希望對(duì)大家的學(xué)習(xí)有所幫助,也希望大家多多支持創(chuàng)新互聯(lián)。


網(wǎng)頁(yè)題目:java實(shí)現(xiàn)memcache服務(wù)器的示例代碼
網(wǎng)站路徑:http://weahome.cn/article/pesedg.html

其他資訊

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部