這篇文章主要講解了“怎么用Java高效讀取大文件”,文中的講解內(nèi)容簡單清晰,易于學(xué)習(xí)與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學(xué)習(xí)“怎么用Java高效讀取大文件”吧!
創(chuàng)新互聯(lián)是一家專業(yè)提供大慶企業(yè)網(wǎng)站建設(shè),專注與網(wǎng)站建設(shè)、網(wǎng)站設(shè)計、H5開發(fā)、小程序制作等業(yè)務(wù)。10年已為大慶眾多企業(yè)、政府機(jī)構(gòu)等服務(wù)。創(chuàng)新互聯(lián)專業(yè)網(wǎng)絡(luò)公司優(yōu)惠進(jìn)行中。
內(nèi)存讀取
第一個版本,阿粉采用內(nèi)存讀取的方式,所有的數(shù)據(jù)首先讀讀取到內(nèi)存中,程序代碼如下:
Stopwatch stopwatch = Stopwatch.createStarted(); // 將全部行數(shù)讀取的內(nèi)存中 Listlines = FileUtils.readLines(new File("temp/test.txt"), Charset.defaultCharset()); for (String line : lines) { // pass } stopwatch.stop(); System.out.println("read all lines spend " + stopwatch.elapsed(TimeUnit.SECONDS) + " s"); // 計算內(nèi)存占用 logMemory();
logMemory方法如下:
MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean(); //堆內(nèi)存使用情況 MemoryUsage memoryUsage = memoryMXBean.getHeapMemoryUsage(); //初始的總內(nèi)存 long totalMemorySize = memoryUsage.getInit(); //已使用的內(nèi)存 long usedMemorySize = memoryUsage.getUsed(); System.out.println("Total Memory: " + totalMemorySize / (1024 * 1024) + " Mb"); System.out.println("Free Memory: " + usedMemorySize / (1024 * 1024) + " Mb");
上述程序中,阿粉使用 Apache Common-Io 開源第三方庫,F(xiàn)ileUtils#readLines將會把文件中所有內(nèi)容,全部讀取到內(nèi)存中。
這個程序簡單測試并沒有什么問題,但是等拿到真正的數(shù)據(jù)文件,運(yùn)行程序,很快程序發(fā)生了 OOM。
之所以會發(fā)生 OOM,主要原因是因?yàn)檫@個數(shù)據(jù)文件太大。假設(shè)上面測試文件 test.txt總共有 200W 行數(shù)據(jù),文件大小為:740MB。
通過上述程序讀取到內(nèi)存之后,在我的電腦上內(nèi)存占用情況如下:
可以看到一個實(shí)際大小為 700 多 M 的文件,讀到內(nèi)存中占用內(nèi)存量為 1.5G 之多。而我之前的程序,虛擬機(jī)設(shè)置內(nèi)存大小只有 1G,所以程序發(fā)生了 OOM。
當(dāng)然這里最簡單的辦法就是加內(nèi)存唄,將虛擬機(jī)內(nèi)存設(shè)置到 2G,甚至更多。不過機(jī)器內(nèi)存始終有限,如果文件更大,還是沒有辦法全部都加載到內(nèi)存。
不過仔細(xì)一想真的需要將全部數(shù)據(jù)一次性加載到內(nèi)存中?
很顯然,不需要!
在上述的場景中,我們將數(shù)據(jù)到加載內(nèi)存中,最后不還是一條條處理數(shù)據(jù)。
所以下面我們將讀取方式修改成逐行讀取。
逐行讀取
逐行讀取的方式比較多,這里阿粉主要介紹兩種方式:
BufferReader
Apache Commons IO
Java8 stream
BufferReader
我們可以使用 BufferReader#readLine 逐行讀取數(shù)據(jù)。
try (BufferedReader fileBufferReader = new BufferedReader(new FileReader("temp/test.txt"))) { String fileLineContent; while ((fileLineContent = fileBufferReader.readLine()) != null) { // process the line. } } catch (FileNotFoundException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); }
Apache Commons IOCommon-IO
中有一個方法 FileUtils#lineIterator可以實(shí)現(xiàn)逐行讀取方式,使用代碼如下:
Stopwatch stopwatch = Stopwatch.createStarted(); LineIterator fileContents = FileUtils.lineIterator(new File("temp/test.txt"), StandardCharsets.UTF_8.name()); while (fileContents.hasNext()) { fileContents.nextLine(); // pass } logMemory(); fileContents.close(); stopwatch.stop(); System.out.println("read all lines spend " + stopwatch.elapsed(TimeUnit.SECONDS) + " s");
這個方法返回一個迭代器,每次我們都可以獲取的一行數(shù)據(jù)。
其實(shí)我們查看代碼,其實(shí)可以發(fā)現(xiàn) FileUtils#lineIterator,其實(shí)用的就是 BufferReader,感興趣的同學(xué)可以自己查看一下源碼。
由于公號內(nèi)無法插入外鏈,關(guān)注『Java極客技術(shù)』,回復(fù)『20200610』 獲取源碼
Java8 stream
Java8 Files 類新增了一個 lines,可以返回 Stream我們可以逐行處理數(shù)據(jù)。
Stopwatch stopwatch = Stopwatch.createStarted(); // lines(Path path, Charset cs) try (StreaminputStream = Files.lines(Paths.get("temp/test.txt"), StandardCharsets.UTF_8)) { inputStream .filter(str -> str.length() > 5)// 過濾數(shù)據(jù) .forEach(o -> { // pass do sample logic }); } logMemory(); stopwatch.stop(); System.out.println("read all lines spend " + stopwatch.elapsed(TimeUnit.SECONDS) + " s");
使用這個方法有個好處在于,我們可以方便使用 Stream 鏈?zhǔn)讲僮?,做一些過濾操作。
注意:這里我們使用 try-with-resources 方式,可以安全的確保讀取結(jié)束,流可以被安全的關(guān)閉。
并發(fā)讀取
逐行的讀取的方式,解決我們 OOM 的問題。不過如果數(shù)據(jù)很多,我們這樣一行行處理,需要花費(fèi)很多時間。
上述的方式,只有一個線程在處理數(shù)據(jù),那其實(shí)我們可以多來幾個線程,增加并行度。
下面在上面的基礎(chǔ)上,阿粉就拋磚引玉,介紹下阿粉自己比較常用兩種并行處理方式。
逐行批次打包
第一種方式,先逐行讀取數(shù)據(jù),加載到內(nèi)存中,等到積累一定數(shù)據(jù)之后,然后再交給線程池異步處理。
@SneakyThrows public static void readInApacheIOWithThreadPool() { // 創(chuàng)建一個 最大線程數(shù)為 10,隊列最大數(shù)為 100 的線程池 ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10, 10, 60l, TimeUnit.SECONDS, new LinkedBlockingDeque<>(100)); // 使用 Apache 的方式逐行讀取數(shù)據(jù) LineIterator fileContents = FileUtils.lineIterator(new File("temp/test.txt"), StandardCharsets.UTF_8.name()); Listlines = Lists.newArrayList(); while (fileContents.hasNext()) { String nextLine = fileContents.nextLine(); lines.add(nextLine); // 讀取到十萬的時候 if (lines.size() == 100000) { // 拆分成兩個 50000 ,交給異步線程處理 List > partition = Lists.partition(lines, 50000); List
futureList = Lists.newArrayList(); for (List strings : partition) { Future> future = threadPoolExecutor.submit(() -> { processTask(strings); }); futureList.add(future); } // 等待兩個線程將任務(wù)執(zhí)行結(jié)束之后,再次讀取數(shù)據(jù)。這樣的目的防止,任務(wù)過多,加載的數(shù)據(jù)過多,導(dǎo)致 OOM for (Future future : futureList) { // 等待執(zhí)行結(jié)束 future.get(); } // 清除內(nèi)容 lines.clear(); } } // lines 若還有剩余,繼續(xù)執(zhí)行結(jié)束 if (!lines.isEmpty()) { // 繼續(xù)執(zhí)行 processTask(lines); } threadPoolExecutor.shutdown(); } private static void processTask(List strings) { for (String line : strings) { // 模擬業(yè)務(wù)執(zhí)行 try { TimeUnit.MILLISECONDS.sleep(10L); } catch (InterruptedException e) { e.printStackTrace(); } } }
上述方法,等到內(nèi)存的數(shù)據(jù)到達(dá) 10000 的時候,拆封兩個任務(wù)交給異步線程執(zhí)行,每個任務(wù)分別處理 50000 行數(shù)據(jù)。
后續(xù)使用 future#get(),等待異步線程執(zhí)行完成之后,主線程才能繼續(xù)讀取數(shù)據(jù)。
之所以這么做,主要原因是因?yàn)?,線程池的任務(wù)過多,再次導(dǎo)致 OOM 的問題。
大文件拆分成小文件第二種方式,首先我們將一個大文件拆分成幾個小文件,然后使用多個異步線程分別逐行處理數(shù)據(jù)。
public static void splitFileAndRead() throws Exception { // 先將大文件拆分成小文件 ListfileList = splitLargeFile("temp/test.txt"); // 創(chuàng)建一個 最大線程數(shù)為 10,隊列最大數(shù)為 100 的線程池 ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10, 10, 60l, TimeUnit.SECONDS, new LinkedBlockingDeque<>(100)); List futureList = Lists.newArrayList(); for (File file : fileList) { Future> future = threadPoolExecutor.submit(() -> { try (Stream inputStream = Files.lines(file.toPath(), StandardCharsets.UTF_8)) { inputStream.forEach(o -> { // 模擬執(zhí)行業(yè)務(wù) try { TimeUnit.MILLISECONDS.sleep(10L); } catch (InterruptedException e) { e.printStackTrace(); } }); } catch (IOException e) { e.printStackTrace(); } }); futureList.add(future); } for (Future future : futureList) { // 等待所有任務(wù)執(zhí)行結(jié)束 future.get(); } threadPoolExecutor.shutdown(); } private static List splitLargeFile(String largeFileName) throws IOException { LineIterator fileContents = FileUtils.lineIterator(new File(largeFileName), StandardCharsets.UTF_8.name()); List lines = Lists.newArrayList(); // 文件序號 int num = 1; List files = Lists.newArrayList(); while (fileContents.hasNext()) { String nextLine = fileContents.nextLine(); lines.add(nextLine); // 每個文件 10w 行數(shù)據(jù) if (lines.size() == 100000) { createSmallFile(lines, num, files); num++; } } // lines 若還有剩余,繼續(xù)執(zhí)行結(jié)束 if (!lines.isEmpty()) { // 繼續(xù)執(zhí)行 createSmallFile(lines, num, files); } return files; }
上述方法,首先將一個大文件拆分成多個保存 10W 行的數(shù)據(jù)的小文件,然后再將小文件交給線程池異步處理。
由于這里的異步線程每次都是逐行從小文件的讀取數(shù)據(jù),所以這種方式不用像上面方法一樣擔(dān)心 OOM 的問題。
另外,上述我們使用 Java 代碼,將大文件拆分成小文件。這里阿粉還有一個簡單的辦法,我們可以直接使用下述命令,直接將大文件拆分成小文件:
# 將大文件拆分成 100000 的小文件 split -l 100000 test.txt
后續(xù) Java 代碼只需要直接讀取小文件即可。
總結(jié)當(dāng)我們從文件讀取數(shù)據(jù)時,如果文件不是很大,我們可以考慮一次性讀取到內(nèi)存中,然后快速處理。
如果文件過大,我們就沒辦法一次性加載到內(nèi)存中,所以我們需要考慮逐行讀取,然后處理數(shù)據(jù)。但是單線程處理數(shù)據(jù)畢竟有限,所以我們考慮使用多線程,加快處理數(shù)據(jù)。
感謝各位的閱讀,以上就是“怎么用Java高效讀取大文件”的內(nèi)容了,經(jīng)過本文的學(xué)習(xí)后,相信大家對怎么用Java高效讀取大文件這一問題有了更深刻的體會,具體使用情況還需要大家實(shí)踐驗(yàn)證。這里是創(chuàng)新互聯(lián),小編將為大家推送更多相關(guān)知識點(diǎn)的文章,歡迎關(guān)注!