流處理方式
分片處理
去年的面試中我被問到超大文件你怎么處理,這個問題確實當(dāng)時沒多想,回來之后仔細(xì)研究和討論了下這個問題,對大文件讀取做了一個分析
比如我們有一個log文件,運行了幾年,有100G之大。按照我們之前的操作可能代碼會這樣寫:
func ReadFile(filePath string) []byte{ content, err := ioutil.ReadFile(filePath) if err != nil { log.Println("Read error") } return content }
上面的代碼讀取幾兆的文件可以,但是如果大于你本身及其內(nèi)存,那就直接翻車了。因為上面的代碼,是把文件所有的內(nèi)容全部都讀取到內(nèi)存之后返回,幾兆的文件,你內(nèi)存夠大可以處理,但是一旦上幾百兆的文件,就沒那么好處理了。
那么,正確的方法有兩種
func ReadFile(filePath string, handle func(string)) error { f, err := os.Open(filePath) defer f.Close() if err != nil { return err } buf := bufio.NewReader(f) for { line, err := buf.ReadLine("\n") line = strings.TrimSpace(line) handle(line) if err != nil { if err == io.EOF{ return nil } return err } return nil } }
當(dāng)讀取的是二進(jìn)制文件,沒有換行符的時候,使用下面的方案一樣處理大文件
func ReadBigFile(fileName string, handle func([]byte)) error { f, err := os.Open(fileName) if err != nil { fmt.Println("can't opened this file") return err } defer f.Close() s := make([]byte, 4096) for { switch nr, err := f.Read(s[:]); true { case nr 0: fmt.Fprintf(os.Stderr, "cat: error reading: %s\n
補(bǔ)充:golang 讀取大文件處理sync.pool + bufio.NewReader(f)
文件大小
package main import ( "bufio" "fmt" "io" //"math" "os" "strings" "sync" "time" ) func main() { /* 文件數(shù)據(jù)樣例 {"remark": "來電時間: 2021/04/15 13:52:07客戶電話:13913xx39xx ", "no": "600020510132021101310210547639", "title": "b-ae0e-0242ac100907", "call_in_date": "2021-04-15 13:52:12", "name": "張三", "_date": "2021-06-15", "name": "張三", "meet": "1"} 1、我們?nèi)〕?call_in_date": "2021-04-15 13:52:1的數(shù)據(jù)寫入另一個文件 */ var ( s time.Time //當(dāng)前時間 file *os.File fileStat os.FileInfo err error lastLineSize int64 ) s = time.Now() if file, err = os.Open("/Users/zhangsan/Downloads/log.txt");err != nil{ fmt.Println(err) } defer func() { err = file.Close() //close after checking err }() //queryStartTime, err := time.Parse("2006-01-02T15:04:05.0000Z", startTimeArg) //if err != nil { // fmt.Println("Could not able to parse the start time", startTimeArg) // return //} // //queryFinishTime, err := time.Parse("2006-01-02T15:04:05.0000Z", finishTimeArg) //if err != nil { // fmt.Println("Could not able to parse the finish time", finishTimeArg) // return //} /** * {name:"log.log", size:911100961, mode:0x1a4, modTime:time.Time{wall:0x656c25c, ext:63742660691, loc:(*time.Location)(0x1192c80)}, sys:syscall.Stat_t{Dev:16777220, Mode:0x81a4, Nlink:0x1, Ino:0x118cba7, Uid:0x1f5, Gid:0x14, Rdev:0, Pad_cgo_0:[4]uint8{0x0, 0x0, 0x0, 0x0}, Atimespec:syscall.Timespec{Sec:1607063899, Nsec:977970393}, Mtimespec:syscall.Timespec{Sec:1607063891, Nsec:106349148}, Ctimespec:syscall.Timespec{Sec:1607063891, Nsec:258847043}, Birthtimespec:syscall.Timespec{Sec:1607063883, Nsec:425808150}, Size:911100961, Blocks:1784104, Blksize:4096, Flags:0x0, Gen:0x0, Lspare:0, Qspare:[2]int64{0, 0}} * */ if fileStat, err = file.Stat();err != nil { return } fileSize := fileStat.Size()//72849354767 offset := fileSize - 1 //檢測是不是都是空行 只有\(zhòng)n for { var ( b []byte n int char string ) b = make([]byte, 1) //從指定位置讀取 if n, err = file.ReadAt(b, offset);err != nil { fmt.Println("Error reading file ", err) break } char = string(b[0]) if char == "\n" { break } offset-- //獲取一行的大小 lastLineSize += int64(n) } var ( lastLine []byte logSlice []string logSlice1 []string ) //初始化一行大小的空間 lastLine = make([]byte, lastLineSize) _, err = file.ReadAt(lastLine, offset) if err != nil { fmt.Println("Could not able to read last line with offset", offset, "and lastline size", lastLineSize) return } //根據(jù)條件進(jìn)行區(qū)分 logSlice = strings.Split(strings.Trim(string(lastLine),"\n"),"next_pay_date") logSlice1 = strings.Split(logSlice[1],"\"") if logSlice1[2] == "2021-06-15"{ Process(file) } fmt.Println("\nTime taken - ", time.Since(s)) fmt.Println(err) } func Process(f *os.File) error { //讀取數(shù)據(jù)的key,減小gc壓力 linesPool := sync.Pool{New: func() interface{} { lines := make([]byte, 250*1024) return lines }} //讀取回來的數(shù)據(jù)池 stringPool := sync.Pool{New: func() interface{} { lines := "" return lines }} //一個文件對象本身是實現(xiàn)了io.Reader的 使用bufio.NewReader去初始化一個Reader對象,存在buffer中的,讀取一次就會被清空 r := bufio.NewReader(f) // //設(shè)置讀取緩沖池大小 默認(rèn)16 r = bufio.NewReaderSize(r,250 *1024) var wg sync.WaitGroup for { buf := linesPool.Get().([]byte) //讀取Reader對象中的內(nèi)容到[]byte類型的buf中 n, err := r.Read(buf) buf = buf[:n] if n == 0 { if err != nil { fmt.Println(err) break } if err == io.EOF { break } return err } //補(bǔ)齊剩下沒滿足的剩余 nextUntillNewline, err := r.ReadBytes('\n') //fmt.Println(string(nextUntillNewline)) if err != io.EOF { buf = append(buf, nextUntillNewline...) } wg.Add(1) go func() { ProcessChunk(buf, linesPool, stringPool) wg.Done() }() } wg.Wait() return nil } func ProcessChunk(chunk []byte, linesPool *sync.Pool,stringPool *sync.Pool) { //做相應(yīng)的處理 }
執(zhí)行
go run test2.go "2020-01-01T00:00:00.0000Z" "2020-02-02T00:00:00.0000Z" /Users/zhangsan/go/src/workspace/test/log.log
EOF Time taken - 20.023517675s nil>
以上為個人經(jīng)驗,希望能給大家一個參考,也希望大家多多支持腳本之家。如有錯誤或未考慮完全的地方,望不吝賜教。
標(biāo)簽:重慶 吐魯番 梅河口 銅川 雞西 汕頭 欽州 蘭州
巨人網(wǎng)絡(luò)通訊聲明:本文標(biāo)題《Golang 實現(xiàn)超大文件讀取的兩種方法》,本文關(guān)鍵詞 Golang,實現(xiàn),超大,文件,讀,;如發(fā)現(xiàn)本文內(nèi)容存在版權(quán)問題,煩請?zhí)峁┫嚓P(guān)信息告之我們,我們將及時溝通與處理。本站內(nèi)容系統(tǒng)采集于網(wǎng)絡(luò),涉及言論、版權(quán)與本站無關(guān)。