這篇文章將為大家詳細(xì)講解有關(guān)基于Java實(shí)現(xiàn)多線程下載并允許斷點(diǎn)續(xù)傳的方法,小編覺得挺實(shí)用的,因此分享給大家做個(gè)參考,希望大家閱讀完這篇文章后可以有所收獲。
專注于為中小企業(yè)提供網(wǎng)站建設(shè)、網(wǎng)站制作服務(wù),電腦端+手機(jī)端+微信端的三站合一,更高效的管理,為中小企業(yè)青秀免費(fèi)做網(wǎng)站提供優(yōu)質(zhì)的服務(wù)。我們立足成都,凝聚了一批互聯(lián)網(wǎng)行業(yè)人才,有力地推動了上千企業(yè)的穩(wěn)健成長,幫助中小企業(yè)通過網(wǎng)站建設(shè)實(shí)現(xiàn)規(guī)模擴(kuò)充和轉(zhuǎn)變。
Java是一門面向?qū)ο缶幊陶Z言,可以編寫桌面應(yīng)用程序、Web應(yīng)用程序、分布式系統(tǒng)和嵌入式系統(tǒng)應(yīng)用程序。
多線程下載及斷點(diǎn)續(xù)傳的實(shí)現(xiàn)是使用 HTTP/1.1 引入的 Range 請求參數(shù),可以訪問Web資源的指定區(qū)間的內(nèi)容。雖然實(shí)現(xiàn)了多線程及斷點(diǎn)續(xù)傳,但還有很多不完善的地方。
包含四個(gè)類:
Downloader: 主類,負(fù)責(zé)分配任務(wù)給各個(gè)子線程,及檢測進(jìn)度DownloadFile: 表示要下載的哪個(gè)文件,為了能寫輸入到文件的指定位置,使用 RandomAccessFile 類操作文件,多個(gè)線程寫同一個(gè)文件需要保證線程安全,這里直接調(diào)用 getChannel 方法,獲取一個(gè)文件通道,F(xiàn)ileChannel是線程安全的。DownloadTask: 實(shí)際執(zhí)行下載的線程,獲取 [lowerBound, upperBound] 區(qū)間的數(shù)據(jù),當(dāng)下載過程中出現(xiàn)異常時(shí)要通知其他線程(使用 AtomicBoolean),結(jié)束下載Logger: 實(shí)時(shí)記錄下載進(jìn)度,以便續(xù)傳時(shí)知道從哪開始。感覺這里做的比較差,為了能實(shí)時(shí)寫出日志及方便地使用Properties類的load/store方法格式化輸入輸出,每次都是打開后再關(guān)閉。
演示:
隨便找一個(gè)文件下載:
強(qiáng)行結(jié)束程序并重新運(yùn)行:
日志文件:
斷點(diǎn)續(xù)傳的關(guān)鍵是記錄各個(gè)線程的下載進(jìn)度,這里細(xì)節(jié)比較多,花了很久。只需要記錄每個(gè)線程請求的Range區(qū)間極客,每次成功寫數(shù)據(jù)到文件時(shí),就更新一次下載區(qū)間。下面是下載完成后的日志內(nèi)容。
代碼:
Downloader.java
package downloader; import java.io.*; import java.net.*; import java.nio.file.Files; import java.nio.file.Path; import java.util.concurrent.atomic.AtomicBoolean; public class Downloader { private static final int DEFAULT_THREAD_COUNT = 4; // 默認(rèn)線程數(shù)量 private AtomicBoolean canceled; // 取消狀態(tài),如果有一個(gè)子線程出現(xiàn)異常,則取消整個(gè)下載任務(wù) private DownloadFile file; // 下載的文件對象 private String storageLocation; private final int threadCount; // 線程數(shù)量 private long fileSize; // 文件大小 private final String url; private long beginTime; // 開始時(shí)間 private Logger logger; public Downloader(String url) { this(url, DEFAULT_THREAD_COUNT); } public Downloader(String url, int threadCount) { this.url = url; this.threadCount = threadCount; this.canceled = new AtomicBoolean(false); this.storageLocation = url.substring(url.lastIndexOf('/')+1); this.logger = new Logger(storageLocation + ".log", url, threadCount); } public void start() { boolean reStart = Files.exists(Path.of(storageLocation + ".log")); if (reStart) { logger = new Logger(storageLocation + ".log"); System.out.printf("* 繼續(xù)上次下載進(jìn)度[已下載:%.2fMB]:%s\n", logger.getWroteSize() / 1014.0 / 1024, url); } else { System.out.println("* 開始下載:" + url); } if (-1 == (this.fileSize = getFileSize())) return; System.out.printf("* 文件大?。?.2fMB\n", fileSize / 1024.0 / 1024); this.beginTime = System.currentTimeMillis(); try { this.file = new DownloadFile(storageLocation, fileSize, logger); if (reStart) { file.setWroteSize(logger.getWroteSize()); } // 分配線程下載 dispatcher(reStart); // 循環(huán)打印進(jìn)度 printDownloadProgress(); } catch (IOException e) { System.err.println("x 創(chuàng)建文件失敗[" + e.getMessage() + "]"); } } /** * 分配器,決定每個(gè)線程下載哪個(gè)區(qū)間的數(shù)據(jù) */ private void dispatcher(boolean reStart) { long blockSize = fileSize / threadCount; // 每個(gè)線程要下載的數(shù)據(jù)量 long lowerBound = 0, upperBound = 0; long[][] bounds = null; int threadID = 0; if (reStart) { bounds = logger.getBounds(); } for (int i = 0; i < threadCount; i++) { if (reStart) { threadID = (int)(bounds[i][0]); lowerBound = bounds[i][1]; upperBound = bounds[i][2]; } else { threadID = i; lowerBound = i * blockSize; // fileSize-1 !!!!! fu.ck,找了一下午的錯 upperBound = (i == threadCount - 1) ? fileSize-1 : lowerBound + blockSize; } new DownloadTask(url, lowerBound, upperBound, file, canceled, threadID).start(); } } /** * 循環(huán)打印進(jìn)度,直到下載完畢,或任務(wù)被取消 */ private void printDownloadProgress() { long downloadedSize = file.getWroteSize(); int i = 0; long lastSize = 0; // 三秒前的下載量 while (!canceled.get() && downloadedSize < fileSize) { if (i++ % 4 == 3) { // 每3秒打印一次 System.out.printf("下載進(jìn)度:%.2f%%, 已下載:%.2fMB,當(dāng)前速度:%.2fMB/s\n", downloadedSize / (double)fileSize * 100 , downloadedSize / 1024.0 / 1024, (downloadedSize - lastSize) / 1024.0 / 1024 / 3); lastSize = downloadedSize; i = 0; } try { Thread.sleep(1000); } catch (InterruptedException ignore) {} downloadedSize = file.getWroteSize(); } file.close(); if (canceled.get()) { try { Files.delete(Path.of(storageLocation)); } catch (IOException ignore) { } System.err.println("x 下載失敗,任務(wù)已取消"); } else { System.out.println("* 下載成功,本次用時(shí)"+ (System.currentTimeMillis() - beginTime) / 1000 +"秒"); } } /** * @return 要下載的文件的尺寸 */ private long getFileSize() { if (fileSize != 0) { return fileSize; } HttpURLConnection conn = null; try { conn = (HttpURLConnection)new URL(url).openConnection(); conn.setConnectTimeout(3000); conn.setRequestMethod("HEAD"); conn.connect(); System.out.println("* 連接DownloadTask.java
package downloader; import java.io.*; import java.net.HttpURLConnection; import java.net.URL; import java.nio.ByteBuffer; import java.nio.channels.Channels; import java.nio.channels.ReadableByteChannel; import java.util.concurrent.atomic.AtomicBoolean; class DownloadTask extends Thread { private final String url; private long lowerBound; // 下載的文件區(qū)間 private long upperBound; private AtomicBoolean canceled; private DownloadFile downloadFile; private int threadId; DownloadTask(String url, long lowerBound, long upperBound, DownloadFile downloadFile, AtomicBoolean canceled, int threadID) { this.url = url; this.lowerBound = lowerBound; this.upperBound = upperBound; this.canceled = canceled; this.downloadFile = downloadFile; this.threadId = threadID; } @Override public void run() { ReadableByteChannel input = null; try { ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024 * 2); // 2MB input = connect(); System.out.println("* [線程" + threadId + "]連接成功,開始下載..."); int len; while (!canceled.get() && lowerBound <= upperBound) { buffer.clear(); len = input.read(buffer); downloadFile.write(lowerBound, buffer, threadId, upperBound); lowerBound += len; } if (!canceled.get()) { System.out.println("* [線程" + threadId + "]下載完成" + ": " + lowerBound + "-" + upperBound); } } catch (IOException e) { canceled.set(true); System.err.println("x [線程" + threadId + "]遇到錯誤[" + e.getMessage() + "],結(jié)束下載"); } finally { if (input != null) { try { input.close(); } catch (IOException e) { e.printStackTrace(); } } } } /** * 連接WEB服務(wù)器,并返回一個(gè)數(shù)據(jù)通道 * @return 返回通道 * @throws IOException 網(wǎng)絡(luò)連接錯誤 */ private ReadableByteChannel connect() throws IOException { HttpURLConnection conn = (HttpURLConnection)new URL(url).openConnection(); conn.setConnectTimeout(3000); conn.setRequestMethod("GET"); conn.setRequestProperty("Range", "bytes=" + lowerBound + "-" + upperBound); // System.out.println("thread_"+ threadId +": " + lowerBound + "-" + upperBound); conn.connect(); int statusCode = conn.getResponseCode(); if (HttpURLConnection.HTTP_PARTIAL != statusCode) { conn.disconnect(); throw new IOException("狀態(tài)碼錯誤:" + statusCode); } return Channels.newChannel(conn.getInputStream()); } }DownloadFile.java
package downloader; import java.io.IOException; import java.io.RandomAccessFile; import java.nio.ByteBuffer; import java.nio.channels.FileChannel; import java.util.concurrent.atomic.AtomicLong; class DownloadFile { private final RandomAccessFile file; private final FileChannel channel; // 線程安全類 private AtomicLong wroteSize; // 已寫入的長度 private Logger logger; DownloadFile(String fileName, long fileSize, Logger logger) throws IOException { this.wroteSize = new AtomicLong(0); this.logger = logger; this.file = new RandomAccessFile(fileName, "rw"); file.setLength(fileSize); channel = file.getChannel(); } /** * 寫數(shù)據(jù) * @param offset 寫偏移量 * @param buffer 數(shù)據(jù) * @throws IOException 寫數(shù)據(jù)出現(xiàn)異常 */ void write(long offset, ByteBuffer buffer, int threadID, long upperBound) throws IOException { buffer.flip(); int length = buffer.limit(); while (buffer.hasRemaining()) { channel.write(buffer, offset); } wroteSize.addAndGet(length); logger.updateLog(threadID, length, offset + length, upperBound); // 更新日志 } /** * @return 已經(jīng)下載的數(shù)據(jù)量,為了知道何時(shí)結(jié)束整個(gè)任務(wù),以及統(tǒng)計(jì)信息 */ long getWroteSize() { return wroteSize.get(); } // 繼續(xù)下載時(shí)調(diào)用 void setWroteSize(long wroteSize) { this.wroteSize.set(wroteSize); } void close() { try { file.close(); } catch (IOException e) { e.printStackTrace(); } } }Logger.java
package downloader; import java.io.*; import java.util.Properties; class Logger { private String logFileName; // 下載的文件的名字 private Properties log; /** * 重新開始下載時(shí),使用該構(gòu)造函數(shù) * @param logFileName */ Logger(String logFileName) { this.logFileName = logFileName; log = new Properties(); FileInputStream fin = null; try { log.load(new FileInputStream(logFileName)); } catch (IOException ignore) { } finally { try { fin.close(); } catch (Exception ignore) {} } } Logger(String logFileName, String url, int threadCount) { this.logFileName = logFileName; this.log = new Properties(); log.put("url", url); log.put("wroteSize", "0"); log.put("threadCount", String.valueOf(threadCount)); for (int i = 0; i < threadCount; i++) { log.put("thread_" + i, "0-0"); } } synchronized void updateLog(int threadID, long length, long lowerBound, long upperBound) { log.put("thread_"+threadID, lowerBound + "-" + upperBound); log.put("wroteSize", String.valueOf(length + Long.parseLong(log.getProperty("wroteSize")))); FileOutputStream file = null; try { file = new FileOutputStream(logFileName); // 每次寫時(shí)都清空文件 log.store(file, null); } catch (IOException e) { e.printStackTrace(); } finally { if (file != null) { try { file.close(); } catch (IOException e) { e.printStackTrace(); } } } } /** * 獲取區(qū)間信息 * ret[i][0] = threadID, ret[i][1] = lowerBoundID, ret[i][2] = upperBoundID * @return */ long[][] getBounds() { long[][] bounds = new long[Integer.parseInt(log.get("threadCount").toString())][3]; int[] index = {0}; log.forEach((k, v) -> { String key = k.toString(); if (key.startsWith("thread_")) { String[] interval = v.toString().split("-"); bounds[index[0]][0] = Long.parseLong(key.substring(key.indexOf("_") + 1)); bounds[index[0]][1] = Long.parseLong(interval[0]); bounds[index[0]++][2] = Long.parseLong(interval[1]); } }); return bounds; } long getWroteSize() { return Long.parseLong(log.getProperty("wroteSize")); } }關(guān)于“基于Java實(shí)現(xiàn)多線程下載并允許斷點(diǎn)續(xù)傳的方法”這篇文章就分享到這里了,希望以上內(nèi)容可以對大家有一定的幫助,使各位可以學(xué)到更多知識,如果覺得文章不錯,請把它分享出去讓更多的人看到。
名稱欄目:基于Java實(shí)現(xiàn)多線程下載并允許斷點(diǎn)續(xù)傳的方法
本文鏈接:http://weahome.cn/article/jehhce.html