什么是Memcache?
創(chuàng)新互聯(lián)是一家專業(yè)提供朝陽(yáng)企業(yè)網(wǎng)站建設(shè),專注與做網(wǎng)站、網(wǎng)站設(shè)計(jì)、H5開發(fā)、小程序制作等業(yè)務(wù)。10年已為朝陽(yáng)眾多企業(yè)、政府機(jī)構(gòu)等服務(wù)。創(chuàng)新互聯(lián)專業(yè)網(wǎng)站制作公司優(yōu)惠進(jìn)行中。
Memcache集群環(huán)境下緩存解決方案
Memcache是一個(gè)高性能的分布式的內(nèi)存對(duì)象緩存系統(tǒng),通過在內(nèi)存里維護(hù)一個(gè)統(tǒng)一的巨大的hash表,它能夠用來存儲(chǔ)各種格式的數(shù)據(jù),包括圖像、視頻、文件以及數(shù)據(jù)庫(kù)檢索的結(jié)果等。簡(jiǎn)單的說就是將數(shù)據(jù)調(diào)用到內(nèi)存中,然后從內(nèi)存中讀取,從而大大提高讀取速度。
Memcache是danga的一個(gè)項(xiàng)目,最早是LiveJournal 服務(wù)的,最初為了加速 LiveJournal 訪問速度而開發(fā)的,后來被很多大型的網(wǎng)站采用。
Memcached是以守護(hù)程序方式運(yùn)行于一個(gè)或多個(gè)服務(wù)器中,隨時(shí)會(huì)接收客戶端的連接和操作
為什么會(huì)有Memcache和memcached兩種名稱?
其實(shí)Memcache是這個(gè)項(xiàng)目的名稱,而memcached是它服務(wù)器端的主程序文件名,知道我的意思了吧。一個(gè)是項(xiàng)目名稱,一個(gè)是主程序文件名,在網(wǎng)上看到了很多人不明白,于是混用了。
Memcached是高性能的,分布式的內(nèi)存對(duì)象緩存系統(tǒng),用于在動(dòng)態(tài)應(yīng)用中減少數(shù)據(jù)庫(kù)負(fù)載,提升訪問速度。Memcached由Danga Interactive開發(fā),用于提升LiveJournal.com訪問速度的。LJ每秒動(dòng)態(tài)頁(yè)面訪問量幾千次,用戶700萬。Memcached將數(shù)據(jù)庫(kù)負(fù)載大幅度降低,更好的分配資源,更快速訪問。
這篇文章將會(huì)涉及以下內(nèi)容:
Memcache
Memcache is an in-memory key-value store for small chunks of arbitrary data (strings, objects) from results of databasecalls, API calls, or page rendering.
即內(nèi)存緩存數(shù)據(jù)庫(kù),是一個(gè)鍵值對(duì)數(shù)據(jù)庫(kù)。該數(shù)據(jù)庫(kù)的存在是為了將從其他服務(wù)中獲取的數(shù)據(jù)暫存在內(nèi)存中,在重復(fù)訪問時(shí)可以直接從命中的緩存中返回。既加快了訪問速率,也減少了其他服務(wù)的負(fù)載。這里將實(shí)現(xiàn)一個(gè)單服務(wù)器版本的Memcache,并且支持多個(gè)客戶端的同時(shí)連接。
客戶端將與服務(wù)器建立telnet連接,然后按照Memcache協(xié)議與服務(wù)器緩存進(jìn)行交互。這里實(shí)現(xiàn)的指令為get,set和del。先來看一下各個(gè)指令的格式
set
set屬于存儲(chǔ)指令,存儲(chǔ)指令的特點(diǎn)時(shí),第一行輸入基本信息,第二行輸入其對(duì)應(yīng)的value值。
set
[noreply]\r\n \r\n
如果存儲(chǔ)成功,將會(huì)返回STORED,如果指令中包含noreply屬性,則服務(wù)器將不會(huì)返回信息。
該指令中每個(gè)域的內(nèi)容如下:
如果指令不符合標(biāo)準(zhǔn),服務(wù)器將會(huì)返回ERROR。
get
get屬于獲取指令,該指令特點(diǎn)如下:
get
*\r\n
它支持傳入多個(gè)key的值,如果緩存命中了一個(gè)或者多個(gè)key,則會(huì)返回相應(yīng)的數(shù)據(jù),并以END作為結(jié)尾。如果沒有命中,則返回的消息中不包含該key對(duì)應(yīng)的值。格式如下:
VALUE\r\n \r\n VALUE \r\n \r\n END del
刪除指令,該指令格式如下:
del[noreply]\r\n
如果刪除成功,則返回DELETED\r\n,否則返回NOT_FOUND。如果有noreply參數(shù),則服務(wù)器不會(huì)返回響應(yīng)。
JAVA SOCKET
JAVA SOCKET需要了解的只是包括TCP協(xié)議,套接字,以及IO流。這里就不詳細(xì)贅述,可以參考我的這系列文章,也建議去閱讀JAVA Network Programming。一書。
代碼實(shí)現(xiàn)
這里貼圖功能出了點(diǎn)問題,可以去文末我的項(xiàng)目地址查看類圖。
這里采用了指令模式和工廠模式實(shí)現(xiàn)指令的創(chuàng)建和執(zhí)行的解耦。指令工廠將會(huì)接收commandLine并且返回一個(gè)Command實(shí)例。每一個(gè)Command都擁有execute方法用來執(zhí)行各自獨(dú)特的操作。這里只貼上del指令的特殊實(shí)現(xiàn)。
/** * 各種指令 * 目前支持get,set,delete * * 以及自定義的 * error,end */ public interface Command { /** * 執(zhí)行指令 * @param reader * @param writer */ void execute(Reader reader, Writer writer); /** * 獲取指令的類型 * @return */ CommandType getType(); }
/** * 指令工廠 單一實(shí)例 */ public class CommandFactory { private static CommandFactory commandFactory; private static Cache- memcache; private CommandFactory(){} public static CommandFactory getInstance(Cache
- cache) { if (commandFactory == null) { commandFactory = new CommandFactory(); memcache = cache; } return commandFactory; } /** * 根據(jù)指令的類型獲取Command * @param commandLine * @return */ public Command getCommand(String commandLine){ if (commandLine.matches("^set .*$")){ return new SetCommand(commandLine, memcache); }else if (commandLine.matches("^get .*$")){ return new GetCommand(commandLine, memcache); }else if (commandLine.matches("^del .*$")){ return new DeleteCommand(commandLine, memcache); }else if (commandLine.matches("^end$")){ return new EndCommand(commandLine); }else{ return new ErrorCommand(commandLine, ErrorCommand.ErrorType.ERROR); } } }
/** * 刪除緩存指令 */ public class DeleteCommand implements Command{ private final String command; private final Cache- cache; private String key; private boolean noReply; public DeleteCommand(final String command, final Cache
- cache){ this.command = command; this.cache = cache; initCommand(); } private void initCommand(){ if (this.command.contains("noreply")){ noReply = true; } String[] info = command.split(" "); key = info[1]; } @Override public void execute(Reader reader, Writer writer) { BufferedWriter bfw = (BufferedWriter) writer; Item item = cache.delete(key); if (!noReply){ try { if (item == null){ bfw.write("NOT_FOUND\r\n"); }else { bfw.write("DELETED\r\n"); } bfw.flush(); } catch (IOException e) { try { bfw.write("ERROR\r\n"); bfw.flush(); } catch (IOException e1) { e1.printStackTrace(); } e.printStackTrace(); } } } @Override public CommandType getType() { return CommandType.SEARCH; } }
然后是實(shí)現(xiàn)內(nèi)存服務(wù)器,為了支持先進(jìn)先出功能,這里使用了LinkedTreeMap作為底層實(shí)現(xiàn),并且重寫了removeOldest方法。同時(shí)還使用CacheManager的后臺(tái)線程及時(shí)清除過期的緩存條目。
public class Memcache implements Cache- { private Logger logger = Logger.getLogger(Memcache.class.getName()); //利用LinkedHashMap實(shí)現(xiàn)LRU private static LinkedHashMap
cache; private final int maxSize; //負(fù)載因子 private final float DEFAULT_LOAD_FACTOR = 0.75f; public Memcache(final int maxSize){ this.maxSize = maxSize; //確保cache不會(huì)在達(dá)到maxSize之后自動(dòng)擴(kuò)容 int capacity = (int) Math.ceil(maxSize / DEFAULT_LOAD_FACTOR) + 1; this.cache = new LinkedHashMap (capacity, DEFAULT_LOAD_FACTOR, true){ @Override protected boolean removeEldestEntry(Map.Entry eldest) { if (size() > maxSize){ logger.info("緩存數(shù)量已經(jīng)達(dá)到上限,會(huì)刪除最近最少使用的條目"); } return size() > maxSize; } }; //實(shí)現(xiàn)同步訪問 Collections.synchronizedMap(cache); } public synchronized boolean isFull(){ return cache.size() >= maxSize; } @Override public Item get(String key) { Item item = cache.get(key); if (item == null){ logger.info("緩存中key:" + key + "不存在"); return null; }else if(item!=null && item.isExpired()){ //如果緩存過期則刪除并返回null logger.info("從緩存中讀取key:" + key + " value:" + item.getValue() + "已經(jīng)失效"); cache.remove(key); return null; } logger.info("從緩存中讀取key:" + key + " value:" + item.getValue() + " 剩余有效時(shí)間" + item.remainTime()); return item; } @Override public void set(String key, Item value) { logger.info("向緩存中寫入key:" + key + " value:" + value); cache.put(key, value); } @Override public Item delete(String key) { logger.info("從緩存中刪除key:" + key); return cache.remove(key); } @Override public int size(){ return cache.size(); } @Override public int capacity() { return maxSize; } @Override public Iterator > iterator() { return cache.entrySet().iterator(); } }
/** * 緩存管理器 * 后臺(tái)線程 * 將cache中過期的緩存刪除 */ public class CacheManager implements Runnable { private Logger logger = Logger.getLogger(CacheManager.class.getName()); //緩存 public Cache- cache; public CacheManager(Cache
- cache){ this.cache = cache; } @Override public void run() { while (true){ Iterator
> itemIterator = cache.iterator(); while (itemIterator.hasNext()){ Map.Entry entry = itemIterator.next(); Item item = entry.getValue(); if(item.isExpired()){ logger.info("key:" + entry.getKey() + " value" + item.getValue() + " 已經(jīng)過期,從數(shù)據(jù)庫(kù)中刪除"); itemIterator.remove(); } } try { //每隔5秒鐘再運(yùn)行該后臺(tái)程序 TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { e.printStackTrace(); } } } }
最后是實(shí)現(xiàn)一個(gè)多線程的Socket服務(wù)器,這里就是將ServerSocket綁定到一個(gè)接口,并且將accept到的Socket交給額外的線程處理。
/** * 服務(wù)器 */ public class IOServer implements Server { private boolean stop; //端口號(hào) private final int port; //服務(wù)器線程 private ServerSocket serverSocket; private final Logger logger = Logger.getLogger(IOServer.class.getName()); //線程池,線程容量為maxConnection private final ExecutorService executorService; private final Cache- cache; public IOServer(int port, int maxConnection, Cache
- cache){ if (maxConnection<=0) throw new IllegalArgumentException("支持的最大連接數(shù)量必須為正整數(shù)"); this.port = port; executorService = Executors.newFixedThreadPool(maxConnection); this.cache = cache; } @Override public void start() { try { serverSocket = new ServerSocket(port); logger.info("服務(wù)器在端口"+port+"上啟動(dòng)"); while (true){ try { Socket socket = serverSocket.accept(); logger.info("收到"+socket.getLocalAddress()+"的連接"); executorService.submit(new SocketHandler(socket, cache)); } catch (IOException e) { e.printStackTrace(); } } } catch (IOException e) { logger.log(Level.WARNING, "服務(wù)器即將關(guān)閉..."); e.printStackTrace(); } finally { executorService.shutdown(); shutDown(); } } /** * 服務(wù)器是否仍在運(yùn)行 * @return */ public boolean isRunning() { return !serverSocket.isClosed(); } /** * 停止服務(wù)器 */ public void shutDown(){ try { if (serverSocket!=null){ serverSocket.close(); } } catch (IOException e) { e.printStackTrace(); } } }
/** * 處理各個(gè)客戶端的連接 * 在獲得end指令后關(guān)閉連接s */ public class SocketHandler implements Runnable{ private static Logger logger = Logger.getLogger(SocketHandler.class.getName()); private final Socket socket; private final Cache- cache; private boolean finish; public SocketHandler(Socket s, Cache
- cache){ this.socket = s; this.cache = cache; } @Override public void run() { try { //獲取socket輸入流 final BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream())); //獲取socket輸出流 final BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream())); CommandFactory commandFactory = CommandFactory.getInstance(cache); while (!finish){ final String commandLine = reader.readLine(); logger.info("ip:" + socket.getLocalAddress() + " 指令:" + commandLine); if (commandLine == null || commandLine.trim().isEmpty()) { continue; } //使用指令工廠獲取指令實(shí)例 final Command command = commandFactory.getCommand(commandLine); command.execute(reader, writer); if (command.getType() == CommandType.END){ logger.info("請(qǐng)求關(guān)閉連接"); finish = true; } } } catch (IOException e) { e.printStackTrace(); logger.info("關(guān)閉來自" + socket.getLocalAddress() + "的連接"); } finally { try { if (socket != null){ socket.close(); } } catch (IOException e) { e.printStackTrace(); } } } }
項(xiàng)目地址請(qǐng)戳這里,如果覺得還不錯(cuò)的話,希望能給個(gè)星哈><
參考資料
memcached官網(wǎng)
memcache協(xié)議
以上就是本文的全部?jī)?nèi)容,希望對(duì)大家的學(xué)習(xí)有所幫助,也希望大家多多支持創(chuàng)新互聯(lián)。