最近在寫(xiě)一個(gè)FTP上傳工具,用到了Apache的FTPClient,為了提高上傳效率,我采用了多線(xiàn)程的方式,但是每個(gè)線(xiàn)程頻繁的創(chuàng)建和銷(xiāo)毀FTPClient對(duì)象勢(shì)必會(huì)造成不必要的開(kāi)銷(xiāo),因此,此處最好使用一個(gè)FTPClient連接池。仔細(xì)翻了一下Apache的api,發(fā)現(xiàn)它并沒(méi)有一個(gè)FTPClientPool的實(shí)現(xiàn),所以,不得不自己寫(xiě)一個(gè)FTPClientPool。下面就大體介紹一下開(kāi)發(fā)連接池的整個(gè)過(guò)程,供大家參考。
創(chuàng)新互聯(lián)是一家專(zhuān)業(yè)提供千山企業(yè)網(wǎng)站建設(shè),專(zhuān)注與成都做網(wǎng)站、成都網(wǎng)站建設(shè)、H5高端網(wǎng)站建設(shè)、小程序制作等業(yè)務(wù)。10年已為千山眾多企業(yè)、政府機(jī)構(gòu)等服務(wù)。創(chuàng)新互聯(lián)專(zhuān)業(yè)的建站公司優(yōu)惠進(jìn)行中。
關(guān)于對(duì)象池
有些對(duì)象的創(chuàng)建開(kāi)銷(xiāo)是比較大的,比如數(shù)據(jù)庫(kù)連接等。為了減少頻繁創(chuàng)建、銷(xiāo)毀對(duì)象帶來(lái)的性能消耗,我們可以利用對(duì)象池的技術(shù)來(lái)實(shí)現(xiàn)對(duì)象的復(fù)用。對(duì)象池提供了一種機(jī)制,它可以管理對(duì)象池中對(duì)象的生命周期,提供了獲取和釋放對(duì)象的方法,可以讓客戶(hù)端很方便的使用對(duì)象池中的對(duì)象。
如果我們要自己實(shí)現(xiàn)一個(gè)對(duì)象池,一般需要完成如下功能:
1. 如果池中有可用的對(duì)象,對(duì)象池應(yīng)當(dāng)能返回給客戶(hù)端
2. 客戶(hù)端把對(duì)象放回池里后,可以對(duì)這些對(duì)象進(jìn)行重用
3. 對(duì)象池能夠創(chuàng)建新的對(duì)象來(lái)滿(mǎn)足客戶(hù)端不斷增長(zhǎng)的需求
4. 需要有一個(gè)正確關(guān)閉池的機(jī)制來(lái)結(jié)束對(duì)象的生命周期
Apache的對(duì)象池工具包
為了方便我們開(kāi)發(fā)自己的對(duì)象池,Apache 提供的common-pool工具包,里面包含了開(kāi)發(fā)通用對(duì)象池的一些接口和實(shí)現(xiàn)類(lèi)。其中最基本的兩個(gè)接口是ObjectPool 和PoolableObjectFactory。
ObjectPool接口中有幾個(gè)最基本的方法:
1. addObject() : 添加對(duì)象到池
2. borrowObject():客戶(hù)端從池中借出一個(gè)對(duì)象
3. returnObject():客戶(hù)端歸還一個(gè)對(duì)象到池中
4. close():關(guān)閉對(duì)象池,清理內(nèi)存釋放資源等
5. setFactory(ObjectFactory factory):需要一個(gè)工廠(chǎng)來(lái)制造池中的對(duì)象
PoolableObjectFactory接口中幾個(gè)最基本的方法:
1. makeObject():制造一個(gè)對(duì)象
2. destoryObject():銷(xiāo)毀一個(gè)對(duì)象
3. validateObject():驗(yàn)證一個(gè)對(duì)象是否還可用
通過(guò)以上兩個(gè)接口我們就可以自己實(shí)現(xiàn)一個(gè)對(duì)象池了。
實(shí)例:開(kāi)發(fā)一個(gè)FTPClient對(duì)象池
最近在開(kāi)發(fā)一個(gè)項(xiàng)目,需要把hdfs中的文件上傳到一組ftp服務(wù)器,為了提高上傳效率,自然考慮到使用多線(xiàn)程的方式進(jìn)行上傳。我上傳ftp用的工具是Apache common-net包中的FTPClient,但Apache并沒(méi)有提供FTPClientPool,于是為了減少FTPClient的創(chuàng)建銷(xiāo)毀次數(shù),我們就自己開(kāi)發(fā)一個(gè)FTPClientPool來(lái)復(fù)用FTPClient連接。
通過(guò)上面的介紹,我們可以利用Apache提供的common-pool包來(lái)協(xié)助我們開(kāi)發(fā)連接池。而開(kāi)發(fā)一個(gè)簡(jiǎn)單的對(duì)象池,僅需要實(shí)現(xiàn)common-pool 包中的ObjectPool和PoolableObjectFactory兩個(gè)接口即可。下面就看一下我寫(xiě)的實(shí)現(xiàn):
寫(xiě)一個(gè)ObjectPool接口的實(shí)現(xiàn)FTPClientPool
import java.io.IOException; import java.util.NoSuchElementException; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; import org.apache.commons.net.ftp.FTPClient; import org.apache.commons.pool.ObjectPool; import org.apache.commons.pool.PoolableObjectFactory; /** * 實(shí)現(xiàn)了一個(gè)FTPClient連接池 * @author heaven */ public class FTPClientPool implements ObjectPool{ private static final int DEFAULT_POOL_SIZE = 10; private final BlockingQueue pool; private final FtpClientFactory factory; /** * 初始化連接池,需要注入一個(gè)工廠(chǎng)來(lái)提供FTPClient實(shí)例 * @param factory * @throws Exception */ public FTPClientPool(FtpClientFactory factory) throws Exception{ this(DEFAULT_POOL_SIZE, factory); } /** * * @param maxPoolSize * @param factory * @throws Exception */ public FTPClientPool(int poolSize, FtpClientFactory factory) throws Exception { this.factory = factory; pool = new ArrayBlockingQueue (poolSize*2); initPool(poolSize); } /** * 初始化連接池,需要注入一個(gè)工廠(chǎng)來(lái)提供FTPClient實(shí)例 * @param maxPoolSize * @throws Exception */ private void initPool(int maxPoolSize) throws Exception { for(int i=0;i factory) throws IllegalStateException, UnsupportedOperationException { } }
再寫(xiě)一個(gè)PoolableObjectFactory接口的實(shí)現(xiàn)FTPClientFactory
import java.io.IOException; import org.apache.commons.net.ftp.FTPClient; import org.apache.commons.net.ftp.FTPReply; import org.apache.commons.pool.PoolableObjectFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.hdfstoftp.util.FTPClientException; /** * FTPClient工廠(chǎng)類(lèi),通過(guò)FTPClient工廠(chǎng)提供FTPClient實(shí)例的創(chuàng)建和銷(xiāo)毀 * @author heaven */ public class FtpClientFactory implements PoolableObjectFactory{ private static Logger logger = LoggerFactory.getLogger("file"); private FTPClientConfigure config; //給工廠(chǎng)傳入一個(gè)參數(shù)對(duì)象,方便配置FTPClient的相關(guān)參數(shù) public FtpClientFactory(FTPClientConfigure config){ this.config=config; } /* (non-Javadoc) * @see org.apache.commons.pool.PoolableObjectFactory#makeObject() */ public FTPClient makeObject() throws Exception { FTPClient ftpClient = new FTPClient(); ftpClient.setConnectTimeout(config.getClientTimeout()); try { ftpClient.connect(config.getHost(), config.getPort()); int reply = ftpClient.getReplyCode(); if (!FTPReply.isPositiveCompletion(reply)) { ftpClient.disconnect(); logger.warn("FTPServer refused connection"); return null; } boolean result = ftpClient.login(config.getUsername(), config.getPassword()); if (!result) { throw new FTPClientException("ftpClient登陸失敗! userName:" + config.getUsername() + " ; password:" + config.getPassword()); } ftpClient.setFileType(config.getTransferFileType()); ftpClient.setBufferSize(1024); ftpClient.setControlEncoding(config.getEncoding()); if (config.getPassiveMode().equals("true")) { ftpClient.enterLocalPassiveMode(); } } catch (IOException e) { e.printStackTrace(); } catch (FTPClientException e) { e.printStackTrace(); } return ftpClient; } /* (non-Javadoc) * @see org.apache.commons.pool.PoolableObjectFactory#destroyObject(java.lang.Object) */ public void destroyObject(FTPClient ftpClient) throws Exception { try { if (ftpClient != null && ftpClient.isConnected()) { ftpClient.logout(); } } catch (IOException io) { io.printStackTrace(); } finally { // 注意,一定要在finally代碼中斷開(kāi)連接,否則會(huì)導(dǎo)致占用ftp連接情況 try { ftpClient.disconnect(); } catch (IOException io) { io.printStackTrace(); } } } /* (non-Javadoc) * @see org.apache.commons.pool.PoolableObjectFactory#validateObject(java.lang.Object) */ public boolean validateObject(FTPClient ftpClient) { try { return ftpClient.sendNoOp(); } catch (IOException e) { throw new RuntimeException("Failed to validate client: " + e, e); } } public void activateObject(FTPClient ftpClient) throws Exception { } public void passivateObject(FTPClient ftpClient) throws Exception { } }
最后,我們最好給工廠(chǎng)傳遞一個(gè)參數(shù)對(duì)象,方便我們?cè)O(shè)置FTPClient的一些參數(shù)
package org.apache.commons.pool.impl.contrib; /** * FTPClient配置類(lèi),封裝了FTPClient的相關(guān)配置 * * @author heaven */ public class FTPClientConfigure { private String host; private int port; private String username; private String password; private String passiveMode; private String encoding; private int clientTimeout; private int threadNum; private int transferFileType; private boolean renameUploaded; private int retryTimes; public String getHost() { return host; } public void setHost(String host) { this. host = host; } public int getPort() { return port; } public void setPort(int port) { this. port = port; } public String getUsername() { return username; } public void setUsername(String username) { this. username = username; } public String getPassword() { return password; } public void setPassword(String password) { this. password = password; } public String getPassiveMode() { return passiveMode; } public void setPassiveMode(String passiveMode) { this. passiveMode = passiveMode; } public String getEncoding() { return encoding; } public void setEncoding(String encoding) { this. encoding = encoding; } public int getClientTimeout() { return clientTimeout; } public void setClientTimeout( int clientTimeout) { this. clientTimeout = clientTimeout; } public int getThreadNum() { return threadNum; } public void setThreadNum( int threadNum) { this. threadNum = threadNum; } public int getTransferFileType() { return transferFileType; } public void setTransferFileType( int transferFileType) { this. transferFileType = transferFileType; } public boolean isRenameUploaded() { return renameUploaded; } public void setRenameUploaded( boolean renameUploaded) { this. renameUploaded = renameUploaded; } public int getRetryTimes() { return retryTimes; } public void setRetryTimes( int retryTimes) { this. retryTimes = retryTimes; } @Override public String toString() { return "FTPClientConfig [host=" + host + "\n port=" + port + "\n username=" + username + "\n password=" + password + "\n passiveMode=" + passiveMode + "\n encoding=" + encoding + "\n clientTimeout=" + clientTimeout + "\n threadNum=" + threadNum + "\n transferFileType=" + transferFileType + "\n renameUploaded=" + renameUploaded + "\n retryTimes=" + retryTimes + "]" ; } }
FTPClientPool連接池類(lèi)管理FTPClient對(duì)象的生命周期,負(fù)責(zé)對(duì)象的借出、規(guī)劃、池的銷(xiāo)毀等;FTPClientPool類(lèi)依賴(lài)于FtpClientFactory類(lèi),由這個(gè)工程類(lèi)來(lái)制造和銷(xiāo)毀對(duì)象;FtpClientFactory又依賴(lài)FTPClientConfigure類(lèi),F(xiàn)TPClientConfigure負(fù)責(zé)封裝FTPClient的配置參數(shù)。至此,我們的FTPClient連接池就開(kāi)發(fā)完成了。
需要注意的是,F(xiàn)TPClientPool中用到了一個(gè)阻塞隊(duì)列ArrayBlockingQueue來(lái)管理存放FTPClient對(duì)象,關(guān)于阻塞隊(duì)列,請(qǐng)參考我的這篇文章: 【Java并發(fā)之】BlockingQueue
以上就是本文的全部?jī)?nèi)容,希望對(duì)大家的學(xué)習(xí)有所幫助,也希望大家多多支持創(chuàng)新互聯(lián)。