Reading a 16 GB File in 25 Seconds with Go
Every computer system in today's world generates a huge volume of logs or data each day. As a system grows, storing this debug data in a database becomes impractical: the data is immutable and is only ever used for analytics and fault resolution. So most companies tend to store logs in files, and those files usually live on local disk.
We will use Go to extract logs from a 16 GB .txt or .log file.
Let's start coding…
First, we open the file. We will use the standard Go os.File for all file I/O.
```go
f, err := os.Open(fileName)
if err != nil {
    fmt.Println("cannot read the file", err)
    return
}
// Close the file only after the open error has been checked.
defer f.Close() // do not forget to close the file
```
After opening the file, we have the following two options:
1. Read the file line by line. This keeps memory pressure low, but takes more time.
2. Read the entire file into memory at once and process it there. This consumes far more memory, but cuts the time significantly.
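For reference, option one would look roughly like the following minimal sketch using bufio.Scanner (the file name app.log is only a placeholder); note that Scanner also caps a single line at 64 KB by default, which matters for very long log lines.

```go
package main

import (
    "bufio"
    "fmt"
    "os"
)

func main() {
    f, err := os.Open("app.log") // placeholder file name, for illustration only
    if err != nil {
        fmt.Println("cannot read the file", err)
        return
    }
    defer f.Close()

    scanner := bufio.NewScanner(f)
    for scanner.Scan() {
        line := scanner.Text() // one log line, without the trailing '\n'
        _ = line               // process the line here
    }
    if err := scanner.Err(); err != nil {
        fmt.Println("error while scanning the file", err)
    }
}
```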
由于文件太大,即16 GB,因此無法將整個文件加載到內(nèi)存中。但是第一種選擇對我們來說也是不可行的,因?yàn)槲覀兿M趲酌腌妰?nèi)處理文件。
但你猜怎么著,還有第三種選擇。瞧……相比于將整個文件加載到內(nèi)存中,在Go語言中,我們還可以使用bufio.NewReader()將文件分塊加載。
```go
r := bufio.NewReader(f)

for {
    buf := make([]byte, 4*1024) // the chunk size
    n, err := r.Read(buf)       // load a chunk into the buffer
    buf = buf[:n]
    if n == 0 {
        if err == io.EOF {
            break
        }
        if err != nil {
            fmt.Println(err)
            break
        }
        return err
    }
}
```
Once we have the file in chunks, we can fork a thread, i.e. a Go routine, to process multiple chunks concurrently. The code above then becomes:
```go
// sync pools to reuse memory and decrease the pressure on the garbage collector
linesPool := sync.Pool{New: func() interface{} {
    lines := make([]byte, 500*1024)
    return lines
}}

stringPool := sync.Pool{New: func() interface{} {
    lines := ""
    return lines
}}

slicePool := sync.Pool{New: func() interface{} {
    lines := make([]string, 100)
    return lines
}}

r := bufio.NewReader(f)

var wg sync.WaitGroup // wait group to keep track of all spawned goroutines

for {
    buf := linesPool.Get().([]byte)

    n, err := r.Read(buf)
    buf = buf[:n]
    if n == 0 {
        if err == io.EOF {
            break
        }
        if err != nil {
            fmt.Println(err)
            break
        }
        return err
    }

    // read until the next newline so that a log line is never split across chunks
    nextUntillNewline, err := r.ReadBytes('\n')
    if err != io.EOF {
        buf = append(buf, nextUntillNewline...)
    }

    wg.Add(1)
    go func() {
        // process each chunk concurrently
        // start -> log start time, end -> log end time (taken from the command line)
        ProcessChunk(buf, &linesPool, &stringPool, &slicePool, start, end)
        wg.Done()
    }()
}

wg.Wait()
```
The code above introduces two optimizations:
- sync.Pool is a powerful pool of objects that can be reused to relieve pressure on the garbage collector. We reuse the memory allocated for each chunk, which lowers memory consumption and speeds the job up considerably.
- Go routines let us process multiple buffered chunks concurrently, which significantly increases processing speed.
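As a standalone illustration of the Get/Put pattern used above, here is a minimal sketch that is not part of the extractor itself:

```go
package main

import (
    "fmt"
    "sync"
)

func main() {
    // A pool that hands out reusable 4 KB buffers.
    bufPool := sync.Pool{New: func() interface{} {
        return make([]byte, 4*1024)
    }}

    buf := bufPool.Get().([]byte) // take a buffer from the pool (or allocate a new one via New)
    copy(buf, "some data")        // use the buffer
    fmt.Println("buffer length:", len(buf))

    bufPool.Put(buf) // return the buffer so it can be reused instead of garbage collected
}
```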
Now let's implement the ProcessChunk function, which processes log lines in the following format:
```
2020-01-31T20:12:38.1234Z, Some Field, Other Field, And so on, Till new line,...\n
```
We will extract the logs that fall between the timestamps supplied on the command line.
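The timestamp at the start of each line is parsed with Go's reference-time layout. Here is a minimal standalone sketch of just that step (the sample line is made up for illustration):

```go
package main

import (
    "fmt"
    "strings"
    "time"
)

func main() {
    line := "2020-01-31T20:12:38.1234Z, Some Field, Other Field"

    // Split only on the first comma: everything before it is the timestamp.
    parts := strings.SplitN(line, ",", 2)

    // "2006-01-02T15:04:05.0000Z" is Go's reference time written in the
    // same layout as the log's timestamps.
    ts, err := time.Parse("2006-01-02T15:04:05.0000Z", parts[0])
    if err != nil {
        fmt.Println("could not parse the timestamp:", err)
        return
    }
    fmt.Println("parsed timestamp:", ts)
}
```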
```go
func ProcessChunk(chunk []byte, linesPool *sync.Pool, stringPool *sync.Pool, slicePool *sync.Pool, start time.Time, end time.Time) {

    // another wait group to process every chunk further
    var wg2 sync.WaitGroup

    logs := stringPool.Get().(string)
    logs = string(chunk)

    linesPool.Put(chunk) // put the chunk back into the pool

    // split the string by "\n", so that we have a slice of log lines
    logsSlice := strings.Split(logs, "\n")

    stringPool.Put(logs) // put the string back into the pool

    chunkSize := 100 // process the logs in batches of 100 per goroutine
    n := len(logsSlice)
    noOfThread := n / chunkSize

    if n%chunkSize != 0 { // account for the remainder
        noOfThread++
    }

    // traverse the slice of log lines, one batch per goroutine
    for i := 0; i < noOfThread; i++ {

        wg2.Add(1)

        // process each batch in a separate goroutine
        go func(s int, e int) {
            defer wg2.Done()
            for i := s; i < e; i++ {
                text := logsSlice[i]
                if len(text) == 0 {
                    continue
                }
                logParts := strings.SplitN(text, ",", 2)
                logCreationTimeString := logParts[0]
                logCreationTime, err := time.Parse("2006-01-02T15:04:05.0000Z", logCreationTimeString)
                if err != nil {
                    fmt.Printf("\n Could not parse the time :%s for log : %v", logCreationTimeString, text)
                    return
                }
                // check if the log's timestamp falls within our desired period
                if logCreationTime.After(start) && logCreationTime.Before(end) {
                    fmt.Println(text)
                }
            }
        }(i*chunkSize, int(math.Min(float64((i+1)*chunkSize), float64(len(logsSlice)))))
        // passing the start and end indexes of the batch
    }

    wg2.Wait() // wait for all batches of this chunk to finish
    logsSlice = nil
}
```
Benchmarking the code above against a 16 GB log file, extracting the logs takes roughly 25 seconds.
The complete code example is given below:
```go
package main

import (
    "bufio"
    "fmt"
    "io"
    "math"
    "os"
    "strings"
    "sync"
    "time"
)

func main() {

    s := time.Now()
    args := os.Args[1:]
    if len(args) != 6 { // expected format: LogExtractor.exe -f "From Time" -t "To Time" -i "Log file directory location"
        fmt.Println("Please give proper command line arguments")
        return
    }
    startTimeArg := args[1]
    finishTimeArg := args[3]
    fileName := args[5]

    file, err := os.Open(fileName)
    if err != nil {
        fmt.Println("cannot read the file", err)
        return
    }
    defer file.Close() // close the file after the error check

    queryStartTime, err := time.Parse("2006-01-02T15:04:05.0000Z", startTimeArg)
    if err != nil {
        fmt.Println("Could not parse the start time", startTimeArg)
        return
    }

    queryFinishTime, err := time.Parse("2006-01-02T15:04:05.0000Z", finishTimeArg)
    if err != nil {
        fmt.Println("Could not parse the finish time", finishTimeArg)
        return
    }

    filestat, err := file.Stat()
    if err != nil {
        fmt.Println("Could not get the file stat")
        return
    }

    fileSize := filestat.Size()
    offset := fileSize - 1
    lastLineSize := 0

    // walk backwards from the end of the file to find the start of the last line
    for {
        b := make([]byte, 1)
        n, err := file.ReadAt(b, offset)
        if err != nil {
            fmt.Println("Error reading file ", err)
            break
        }
        char := string(b[0])
        if char == "\n" {
            break
        }
        offset--
        lastLineSize += n
    }

    lastLine := make([]byte, lastLineSize)
    _, err = file.ReadAt(lastLine, offset+1)
    if err != nil {
        fmt.Println("Could not read the last line with offset", offset, "and last line size", lastLineSize)
        return
    }

    // only scan the file if the last log entry can still fall inside the queried period
    logSlice := strings.SplitN(string(lastLine), ",", 2)
    logCreationTimeString := logSlice[0]

    lastLogCreationTime, err := time.Parse("2006-01-02T15:04:05.0000Z", logCreationTimeString)
    if err != nil {
        fmt.Println("could not parse time : ", err)
    }

    if lastLogCreationTime.After(queryStartTime) && lastLogCreationTime.Before(queryFinishTime) {
        Process(file, queryStartTime, queryFinishTime)
    }

    fmt.Println("\nTime taken - ", time.Since(s))
}
```
```go
func Process(f *os.File, start time.Time, end time.Time) error {

    linesPool := sync.Pool{New: func() interface{} {
        lines := make([]byte, 250*1024)
        return lines
    }}

    stringPool := sync.Pool{New: func() interface{} {
        lines := ""
        return lines
    }}

    r := bufio.NewReader(f)

    var wg sync.WaitGroup

    for {
        buf := linesPool.Get().([]byte)

        n, err := r.Read(buf)
        buf = buf[:n]
        if n == 0 {
            if err == io.EOF {
                break
            }
            if err != nil {
                fmt.Println(err)
                break
            }
            return err
        }

        // read until the next newline so that a log line is never split across chunks
        nextUntillNewline, err := r.ReadBytes('\n')
        if err != io.EOF {
            buf = append(buf, nextUntillNewline...)
        }

        wg.Add(1)
        go func() {
            ProcessChunk(buf, &linesPool, &stringPool, start, end)
            wg.Done()
        }()
    }

    wg.Wait()
    return nil
}
```
```go
func ProcessChunk(chunk []byte, linesPool *sync.Pool, stringPool *sync.Pool, start time.Time, end time.Time) {

    var wg2 sync.WaitGroup

    logs := stringPool.Get().(string)
    logs = string(chunk)

    linesPool.Put(chunk)

    logsSlice := strings.Split(logs, "\n")

    stringPool.Put(logs)

    chunkSize := 300
    n := len(logsSlice)
    noOfThread := n / chunkSize

    if n%chunkSize != 0 {
        noOfThread++
    }

    for i := 0; i < noOfThread; i++ {

        wg2.Add(1)
        go func(s int, e int) {
            defer wg2.Done() // to avoid deadlocks
            for i := s; i < e; i++ {
                text := logsSlice[i]
                if len(text) == 0 {
                    continue
                }
                logSlice := strings.SplitN(text, ",", 2)
                logCreationTimeString := logSlice[0]

                logCreationTime, err := time.Parse("2006-01-02T15:04:05.0000Z", logCreationTimeString)
                if err != nil {
                    fmt.Printf("\n Could not parse the time :%s for log : %v", logCreationTimeString, text)
                    return
                }
                // print the log line if its timestamp falls within the queried period
                if logCreationTime.After(start) && logCreationTime.Before(end) {
                    fmt.Println(text)
                }
            }
        }(i*chunkSize, int(math.Min(float64((i+1)*chunkSize), float64(len(logsSlice)))))
    }

    wg2.Wait()
    logsSlice = nil
}
```
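To run the extractor, build it and pass the start time, end time and file path in the order the argument check in main expects; the timestamps and path below are placeholders:

```
go build -o LogExtractor
./LogExtractor -f "2020-01-31T20:12:38.1234Z" -t "2020-01-31T22:12:38.1234Z" -i "/var/log/app.log"
```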