這篇文章主要介紹了java如何使用多線程讀取超大文件,具有一定借鑒價(jià)值,感興趣的朋友可以參考下,希望大家閱讀完這篇文章之后大有收獲,下面讓小編帶著大家一起了解一下。
公司主營(yíng)業(yè)務(wù):網(wǎng)站設(shè)計(jì)、做網(wǎng)站、移動(dòng)網(wǎng)站開發(fā)等業(yè)務(wù)。幫助企業(yè)客戶真正實(shí)現(xiàn)互聯(lián)網(wǎng)宣傳,提高企業(yè)的競(jìng)爭(zhēng)能力。成都創(chuàng)新互聯(lián)公司是一支青春激揚(yáng)、勤奮敬業(yè)、活力青春激揚(yáng)、勤奮敬業(yè)、活力澎湃、和諧高效的團(tuán)隊(duì)。公司秉承以“開放、自由、嚴(yán)謹(jǐn)、自律”為核心的企業(yè)文化,感謝他們對(duì)我們的高要求,感謝他們從不同領(lǐng)域給我們帶來(lái)的挑戰(zhàn),讓我們激情的團(tuán)隊(duì)有機(jī)會(huì)用頭腦與智慧不斷的給客戶帶來(lái)驚喜。成都創(chuàng)新互聯(lián)公司推出壽縣免費(fèi)做網(wǎng)站回饋大家。
基本思路如下:
1.計(jì)算出文件總大小
2.分段處理,計(jì)算出每個(gè)線程讀取文件的開始與結(jié)束位置
(文件大小/線程數(shù))*N,N是指第幾個(gè)線程,這樣能得到每個(gè)線程在讀該文件的大概起始位置
使用"大概起始位置",作為讀文件的開始偏移量(fileChannel.position("大概起始位置")),來(lái)讀取該文件,直到讀到第一個(gè)換行符,記錄下這個(gè)換行符的位置,作為該線程的準(zhǔn)確起 始位置.同時(shí)它也是上一個(gè)線程的結(jié)束位置.最后一個(gè)線程的結(jié)束位置也直接設(shè)置為-1
3.啟動(dòng)線程,每個(gè)線程從開始位置讀取到結(jié)束位置為止
代碼如下:
讀文件工具類
import java.io.*;import java.nio.ByteBuffer;import java.nio.channels.FileChannel;import java.util.Observable; /** * Created with IntelliJ IDEA. * User: okey * Date: 14-4-2 * Time: 下午3:12 * 讀取文件 */public class ReadFile extends Observable { private int bufSize = 1024; // 換行符 private byte key = "\n".getBytes()[0]; // 當(dāng)前行數(shù) private long lineNum = 0; // 文件編碼,默認(rèn)為gb2312 private String encode = "gb2312"; // 具體業(yè)務(wù)邏輯監(jiān)聽器 private ReaderFileListener readerListener; public void setEncode(String encode) { this.encode = encode; } public void setReaderListener(ReaderFileListener readerListener) { this.readerListener = readerListener; } /** * 獲取準(zhǔn)確開始位置 * @param file * @param position * @return * @throws Exception */ public long getStartNum(File file, long position) throws Exception { long startNum = position; FileChannel fcin = new RandomAccessFile(file, "r").getChannel(); fcin.position(position); try { int cache = 1024; ByteBuffer rBuffer = ByteBuffer.allocate(cache); // 每次讀取的內(nèi)容 byte[] bs = new byte[cache]; // 緩存 byte[] tempBs = new byte[0]; String line = ""; while (fcin.read(rBuffer) != -1) { int rSize = rBuffer.position(); rBuffer.rewind(); rBuffer.get(bs); rBuffer.clear(); byte[] newStrByte = bs; // 如果發(fā)現(xiàn)有上次未讀完的緩存,則將它加到當(dāng)前讀取的內(nèi)容前面 if (null != tempBs) { int tL = tempBs.length; newStrByte = new byte[rSize + tL]; System.arraycopy(tempBs, 0, newStrByte, 0, tL); System.arraycopy(bs, 0, newStrByte, tL, rSize); } // 獲取開始位置之后的第一個(gè)換行符 int endIndex = indexOf(newStrByte, 0); if (endIndex != -1) { return startNum + endIndex; } tempBs = substring(newStrByte, 0, newStrByte.length); startNum += 1024; } } catch (Exception e) { e.printStackTrace(); } finally { fcin.close(); } return position; } /** * 從設(shè)置的開始位置讀取文件,一直到結(jié)束為止。如果 end設(shè)置為負(fù)數(shù),剛讀取到文件末尾 * @param fullPath * @param start * @param end * @throws Exception */ public void readFileByLine(String fullPath, long start, long end) throws Exception { File fin = new File(fullPath); if (fin.exists()) { FileChannel fcin = new RandomAccessFile(fin, "r").getChannel(); fcin.position(start); try { ByteBuffer rBuffer = ByteBuffer.allocate(bufSize); // 每次讀取的內(nèi)容 byte[] bs = new byte[bufSize]; // 緩存 byte[] tempBs = new byte[0]; String line = ""; // 當(dāng)前讀取文件位置 long nowCur = start; while (fcin.read(rBuffer) != -1) { nowCur += bufSize; int rSize = rBuffer.position(); rBuffer.rewind(); rBuffer.get(bs); rBuffer.clear(); byte[] newStrByte = bs; // 如果發(fā)現(xiàn)有上次未讀完的緩存,則將它加到當(dāng)前讀取的內(nèi)容前面 if (null != tempBs) { int tL = tempBs.length; newStrByte = new byte[rSize + tL]; System.arraycopy(tempBs, 0, newStrByte, 0, tL); System.arraycopy(bs, 0, newStrByte, tL, rSize); } // 是否已經(jīng)讀到最后一位 boolean isEnd = false; // 如果當(dāng)前讀取的位數(shù)已經(jīng)比設(shè)置的結(jié)束位置大的時(shí)候,將讀取的內(nèi)容截取到設(shè)置的結(jié)束位置 if (end > 0 && nowCur > end) { // 緩存長(zhǎng)度 - 當(dāng)前已經(jīng)讀取位數(shù) - 最后位數(shù) int l = newStrByte.length - (int) (nowCur - end); newStrByte = substring(newStrByte, 0, l); isEnd = true; } int fromIndex = 0; int endIndex = 0; // 每次讀一行內(nèi)容,以 key(默認(rèn)為\n) 作為結(jié)束符 while ((endIndex = indexOf(newStrByte, fromIndex)) != -1) { byte[] bLine = substring(newStrByte, fromIndex, endIndex); line = new String(bLine, 0, bLine.length, encode); lineNum++; // 輸出一行內(nèi)容,處理方式由調(diào)用方提供 readerListener.outLine(line.trim(), lineNum, false); fromIndex = endIndex + 1; } // 將未讀取完成的內(nèi)容放到緩存中 tempBs = substring(newStrByte, fromIndex, newStrByte.length); if (isEnd) { break; } } // 將剩下的最后內(nèi)容作為一行,輸出,并指明這是最后一行 String lineStr = new String(tempBs, 0, tempBs.length, encode); readerListener.outLine(lineStr.trim(), lineNum, true); } catch (Exception e) { e.printStackTrace(); } finally { fcin.close(); } } else { throw new FileNotFoundException("沒(méi)有找到文件:" + fullPath); } // 通知觀察者,當(dāng)前工作已經(jīng)完成 setChanged(); notifyObservers(start+"-"+end); } /** * 查找一個(gè)byte[]從指定位置之后的一個(gè)換行符位置 * * @param src * @param fromIndex * @return * @throws Exception */ private int indexOf(byte[] src, int fromIndex) throws Exception { for (int i = fromIndex; i < src.length; i++) { if (src[i] == key) { return i; } } return -1; } /** * 從指定開始位置讀取一個(gè)byte[]直到指定結(jié)束位置為止生成一個(gè)全新的byte[] * * @param src * @param fromIndex * @param endIndex * @return * @throws Exception */ private byte[] substring(byte[] src, int fromIndex, int endIndex) throws Exception { int size = endIndex - fromIndex; byte[] ret = new byte[size]; System.arraycopy(src, fromIndex, ret, 0, size); return ret; } }
讀文件線程
/** * Created with IntelliJ IDEA. * User: okey * Date: 14-4-2 * Time: 下午4:50 * To change this template use File | Settings | File Templates. */public class ReadFileThread extends Thread { private ReaderFileListener processPoiDataListeners; private String filePath; private long start; private long end; public ReadFileThread(ReaderFileListener processPoiDataListeners,long start,long end,String file) { this.setName(this.getName()+"-ReadFileThread"); this.start = start; this.end = end; this.filePath = file; this.processPoiDataListeners = processPoiDataListeners; } @Override public void run() { ReadFile readFile = new ReadFile(); readFile.setReaderListener(processPoiDataListeners); readFile.setEncode(processPoiDataListeners.getEncode());// readFile.addObserver(); try { readFile.readFileByLine(filePath, start, end + 1); } catch (Exception e) { e.printStackTrace(); } }}
具體業(yè)務(wù)邏輯監(jiān)聽
/** * Created with Okey * User: Okey * Date: 13-3-14 * Time: 下午3:19 * NIO逐行讀數(shù)據(jù)回調(diào)方法 */public abstract class ReaderFileListener { // 一次讀取行數(shù),默認(rèn)為500 private int readColNum = 500; private String encode; private List
線程調(diào)度
import java.io.File;import java.io.FileInputStream;import java.io.IOException; /** * Created with IntelliJ IDEA. * User: okey * Date: 14-4-1 * Time: 下午6:03 * To change this template use File | Settings | File Templates. */public class BuildData { public static void main(String[] args) throws Exception { File file = new File("E:\\1396341974289.csv"); FileInputStream fis = null; try { ReadFile readFile = new ReadFile(); fis = new FileInputStream(file); int available = fis.available(); int maxThreadNum = 50; // 線程粗略開始位置 int i = available / maxThreadNum; for (int j = 0; j < maxThreadNum; j++) { // 計(jì)算精確開始位置 long startNum = j == 0 ? 0 : readFile.getStartNum(file, i * j); long endNum = j + 1 < maxThreadNum ? readFile.getStartNum(file, i * (j + 1)) : -2; // 具體監(jiān)聽實(shí)現(xiàn) ProcessDataByPostgisListeners listeners = new ProcessDataByPostgisListeners("gbk"); new ReadFileThread(listeners, startNum, endNum, file.getPath()).start(); } } catch (IOException e) { e.printStackTrace(); } catch (Exception e) { e.printStackTrace(); } }}
現(xiàn)在就可以盡情的調(diào)整maxThreadNum來(lái)享受風(fēng)一般的速度吧!
感謝你能夠認(rèn)真閱讀完這篇文章,希望小編分享的“java如何使用多線程讀取超大文件”這篇文章對(duì)大家有幫助,同時(shí)也希望大家多多支持創(chuàng)新互聯(lián),關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道,更多相關(guān)知識(shí)等著你來(lái)學(xué)習(xí)!