10 億行數(shù)據(jù)集處理挑戰(zhàn):從 15 分鐘到 5 秒
0 億行挑戰(zhàn)[2]在編程界引起了廣泛關(guān)注,其主要目的是測(cè)試不同編程語(yǔ)言處理并匯總包含 10 億行的海量數(shù)據(jù)集的速度,而以性能和并發(fā)能力著稱的 Go 是有力競(jìng)爭(zhēng)者。目前性能最好的 Java 實(shí)現(xiàn)處理數(shù)據(jù)的時(shí)間僅為 1.535 秒,我們看看 Go 的表現(xiàn)如何。
本文將基于 Go 特有的功能進(jìn)行優(yōu)化。注:所有基準(zhǔn)數(shù)據(jù)都是在多次運(yùn)行后計(jì)算得出的。
硬件設(shè)置
在配備 M2 Pro 芯片的 16 英寸 MacBook Pro 上進(jìn)行了所有測(cè)試,該芯片擁有 12 個(gè) CPU 核和 36 GB 內(nèi)存。不同環(huán)境運(yùn)行的測(cè)試結(jié)果可能因硬件而異,但相對(duì)性能差異應(yīng)該差不多。
什么是 "10 億行挑戰(zhàn)"?
挑戰(zhàn)很簡(jiǎn)單:處理包含 10 億個(gè)任意溫度測(cè)量值的文本文件,并計(jì)算每個(gè)站點(diǎn)的匯總統(tǒng)計(jì)數(shù)據(jù)(最小值、平均值和最大值)。問(wèn)題在于如何高效處理如此龐大的數(shù)據(jù)集。
數(shù)據(jù)集通過(guò)代碼倉(cāng)庫(kù)中的createMeasurements.go 腳本生成。運(yùn)行腳本后,將得到一個(gè) 12.8 GB 大的由分號(hào)分隔的文本文件(measurements.txt),包含兩列數(shù)據(jù):站點(diǎn)名稱和測(cè)量值。
我們需要處理該文件,并輸出以下結(jié)果:
{Station1=12.3/25.6/38.9, Station2=10.0/22.5/35.0, ...}
我們看看如何實(shí)現(xiàn)這一目標(biāo)。
Go 初始實(shí)現(xiàn)
1.單核版本
我們從基本的單線程實(shí)現(xiàn)開(kāi)始。該版本逐行讀取文件,解析數(shù)據(jù),并更新映射以跟蹤統(tǒng)計(jì)數(shù)據(jù)。
package main
import (
"bufio"
"fmt"
"os"
"strconv"
"strings"
)
type Stats struct {
Min float64
Max float64
Sum float64
Count int64
}
func processFile(filename string) {
file, err := os.Open(filename)
if err != nil {
panic(err)
}
defer file.Close()
statsMap := make(map[string]*Stats)
scanner := bufio.NewScanner(file)
for scanner.Scan() {
line := scanner.Text()
parts := strings.Split(line, ";")
if len(parts) != 2 {
continue
}
station := parts[0]
measurement, err := strconv.ParseFloat(parts[1], 64)
if err != nil {
continue
}
stat, exists := statsMap[station]
if !exists {
statsMap[station] = &Stats{
Min: measurement,
Max: measurement,
Sum: measurement,
Count: 1,
}
} else {
if measurement < stat.Min {
stat.Min = measurement
}
if measurement > stat.Max {
stat.Max = measurement
}
stat.Sum += measurement
stat.Count++
}
}
if err := scanner.Err(); err != nil {
panic(err)
}
fmt.Print("{")
for station, stat := range statsMap {
mean := stat.Sum / float64(stat.Count)
fmt.Printf("%s=%.1f/%.1f/%.1f, ", station, stat.Min, mean, stat.Max)
}
fmt.Print("\b\b} \n")
}
func main() {
processFile("data/measurements.txt")
}
2.多核版本
為了充分利用多個(gè) CPU 核,我們把文件分成若干塊,然后利用Goroutine 和Channel 并行處理。
package main
import (
"bufio"
"fmt"
"os"
"runtime"
"strconv"
"strings"
"sync"
)
type Stats struct {
Min float64
Max float64
Sum float64
Count int64
}
func worker(lines []string, wg *sync.WaitGroup, statsChan chan map[string]*Stats) {
defer wg.Done()
statsMap := make(map[string]*Stats)
for _, line := range lines {
parts := strings.Split(line, ";")
if len(parts) != 2 {
continue
}
station := parts[0]
measurement, err := strconv.ParseFloat(parts[1], 64)
if err != nil {
continue
}
stat, exists := statsMap[station]
if !exists {
statsMap[station] = &Stats{
Min: measurement,
Max: measurement,
Sum: measurement,
Count: 1,
}
} else {
if measurement < stat.Min {
stat.Min = measurement
}
if measurement > stat.Max {
stat.Max = measurement
}
stat.Sum += measurement
stat.Count++
}
}
statsChan <- statsMap
}
func processFile(filename string) {
file, err := os.Open(filename)
if err != nil {
panic(err)
}
defer file.Close()
numCPU := runtime.NumCPU()
linesPerWorker := 1000000
scanner := bufio.NewScanner(file)
lines := make([]string, 0, linesPerWorker)
var wg sync.WaitGroup
statsChan := make(chan map[string]*Stats, numCPU)
go func() {
wg.Wait()
close(statsChan)
}()
for scanner.Scan() {
lines = append(lines, scanner.Text())
if len(lines) >= linesPerWorker {
wg.Add(1)
go worker(lines, &wg, statsChan)
lines = make([]string, 0, linesPerWorker)
}
}
if len(lines) > 0 {
wg.Add(1)
go worker(lines, &wg, statsChan)
}
if err := scanner.Err(); err != nil {
panic(err)
}
finalStats := make(map[string]*Stats)
for partialStats := range statsChan {
for station, stat := range partialStats {
existingStat, exists := finalStats[station]
if !exists {
finalStats[station] = stat
} else {
if stat.Min < existingStat.Min {
existingStat.Min = stat.Min
}
if stat.Max > existingStat.Max {
existingStat.Max = stat.Max
}
existingStat.Sum += stat.Sum
existingStat.Count += stat.Count
}
}
}
fmt.Print("{")
for station, stat := range finalStats {
mean := stat.Sum / float64(stat.Count)
fmt.Printf("%s=%.1f/%.1f/%.1f, ", station, stat.Min, mean, stat.Max)
}
fmt.Print("\b\b} \n")
}
func main() {
processFile("data/measurements.txt")
}
3.Go 實(shí)現(xiàn)結(jié)果
單核和多核版本的運(yùn)行結(jié)果分別如下:
- 單核版本:15 分 30 秒
- 多核版本:6 分 45 秒
雖然多核版本有明顯改善,但處理數(shù)據(jù)仍然花了好幾分鐘。下面看看如何進(jìn)一步優(yōu)化。
利用 Go 的并發(fā)和緩沖 I/O 進(jìn)行優(yōu)化
為了提高性能,我們考慮利用緩沖 I/O,并優(yōu)化 Goroutine。
package main
import (
"bufio"
"fmt"
"os"
"runtime"
"strconv"
"strings"
"sync"
)
type Stats struct {
Min float64
Max float64
Sum float64
Count int64
}
func worker(id int, jobs <-chan []string, results chan<- map[string]*Stats, wg *sync.WaitGroup) {
defer wg.Done()
for lines := range jobs {
statsMap := make(map[string]*Stats)
for _, line := range lines {
parts := strings.Split(line, ";")
if len(parts) != 2 {
continue
}
station := parts[0]
measurement, err := strconv.ParseFloat(parts[1], 64)
if err != nil {
continue
}
stat, exists := statsMap[station]
if !exists {
statsMap[station] = &Stats{
Min: measurement,
Max: measurement,
Sum: measurement,
Count: 1,
}
} else {
if measurement < stat.Min {
stat.Min = measurement
}
if measurement > stat.Max {
stat.Max = measurement
}
stat.Sum += measurement
stat.Count++
}
}
results <- statsMap
}
}
func processFile(filename string) {
file, err := os.Open(filename)
if err != nil {
panic(err)
}
defer file.Close()
numCPU := runtime.NumCPU()
jobs := make(chan []string, numCPU)
results := make(chan map[string]*Stats, numCPU)
var wg sync.WaitGroup
for i := 0; i < numCPU; i++ {
wg.Add(1)
go worker(i, jobs, results, &wg)
}
go func() {
wg.Wait()
close(results)
}()
scanner := bufio.NewScanner(file)
bufferSize := 1000000
lines := make([]string, 0, bufferSize)
for scanner.Scan() {
lines = append(lines, scanner.Text())
if len(lines) >= bufferSize {
jobs <- lines
lines = make([]string, 0, bufferSize)
}
}
if len(lines) > 0 {
jobs <- lines
}
close(jobs)
if err := scanner.Err(); err != nil {
panic(err)
}
finalStats := make(map[string]*Stats)
for partialStats := range results {
for station, stat := range partialStats {
existingStat, exists := finalStats[station]
if !exists {
finalStats[station] = stat
} else {
if stat.Min < existingStat.Min {
existingStat.Min = stat.Min
}
if stat.Max > existingStat.Max {
existingStat.Max = stat.Max
}
existingStat.Sum += stat.Sum
existingStat.Count += stat.Count
}
}
}
fmt.Print("{")
for station, stat := range finalStats {
mean := stat.Sum / float64(stat.Count)
fmt.Printf("%s=%.1f/%.1f/%.1f, ", station, stat.Min, mean, stat.Max)
}
fmt.Print("\b\b} \n")
}
func main() {
processFile("data/measurements.txt")
}
優(yōu)化 Go 實(shí)現(xiàn)結(jié)果
優(yōu)化后,處理時(shí)間大大縮短:
多核優(yōu)化版:3 分 50 秒
確實(shí)獲得了實(shí)質(zhì)性改進(jìn),但仍然無(wú)法與最快的 Java 實(shí)現(xiàn)相媲美。
利用高效數(shù)據(jù)格式
由于文本文件不是處理大型數(shù)據(jù)集的最有效方式,我們考慮將數(shù)據(jù)轉(zhuǎn)換為 Parquet 等二進(jìn)制格式來(lái)提高效率。
1.轉(zhuǎn)換為 Parquet
可以用 Apache Arrow 等工具將文本文件轉(zhuǎn)換為 Parquet 文件。為簡(jiǎn)單起見(jiàn),假設(shè)數(shù)據(jù)已轉(zhuǎn)換為measurements.parquet。
2.用 Go 處理 Parquet 文件
我們用parquet-go 庫(kù)來(lái)讀取 Parquet 文件。
package main
import (
"fmt"
"log"
"sort"
"github.com/xitongsys/parquet-go/reader"
"github.com/xitongsys/parquet-go/source/local"
)
type Measurement struct {
StationName string `parquet:"name=station_name, type=BYTE_ARRAY, convertedtype=UTF8, encoding=PLAIN_DICTIONARY"`
Measurement float64 `parquet:"name=measurement, type=DOUBLE"`
}
type Stats struct {
Min float64
Max float64
Sum float64
Count int64
}
func processParquetFile(filename string) {
fr, err := local.NewLocalFileReader(filename)
if err != nil {
log.Fatal(err)
}
defer fr.Close()
pr, err := reader.NewParquetReader(fr, new(Measurement), 4)
if err != nil {
log.Fatal(err)
}
defer pr.ReadStop()
num := int(pr.GetNumRows())
statsMap := make(map[string]*Stats)
for i := 0; i < num; i += 1000000 {
readNum := 1000000
if i+readNum > num {
readNum = num - i
}
measurements := make([]Measurement, readNum)
if err = pr.Read(&measurements); err != nil {
log.Fatal(err)
}
for _, m := range measurements {
stat, exists := statsMap[m.StationName]
if !exists {
statsMap[m.StationName] = &Stats{
Min: m.Measurement,
Max: m.Measurement,
Sum: m.Measurement,
Count: 1,
}
} else {
if m.Measurement < stat.Min {
stat.Min = m.Measurement
}
if m.Measurement > stat.Max {
stat.Max = m.Measurement
}
stat.Sum += m.Measurement
stat.Count++
}
}
}
stationNames := make([]string, 0, len(statsMap))
for station := range statsMap {
stationNames = append(stationNames, station)
}
sort.Strings(stationNames)
fmt.Print("{")
for _, station := range stationNames {
stat := statsMap[station]
mean := stat.Sum / float64(stat.Count)
fmt.Printf("%s=%.1f/%.1f/%.1f, ", station, stat.Min, mean, stat.Max)
}
fmt.Print("\b\b} \n")
}
func main() {
processParquetFile("data/measurements.parquet")
}
3.Parquet 處理結(jié)果
通過(guò)以 Parquet 格式處理數(shù)據(jù),取得了顯著的性能提升:
Parquet 處理時(shí)間:5 秒
Go 的性能更進(jìn)一步接近了最快的 Java 實(shí)現(xiàn)。
結(jié)論
Go 在 10 億行挑戰(zhàn)中表現(xiàn)出色。通過(guò)利用 Go 的并發(fā)模型和優(yōu)化 I/O 操作,大大縮短了處理時(shí)間。通過(guò)將數(shù)據(jù)集轉(zhuǎn)換為二進(jìn)制格式(如 Parquet)可進(jìn)一步提高性能。
主要收獲:
- Go 高效的并發(fā)機(jī)制使其適合處理大型數(shù)據(jù)集。
- 優(yōu)化 I/O 和使用緩沖讀取可大幅提高性能。
- 利用 Parquet 等高效數(shù)據(jù)格式可大大縮短處理時(shí)間。
最終想法:
盡管 Go 可能無(wú)法取代速度最快的 Java 實(shí)現(xiàn),但在高效處理大數(shù)據(jù)方面展示了令人印象深刻的能力。工具的選擇和優(yōu)化可以縮小性能差距,使 Go 成為數(shù)據(jù)密集型任務(wù)的可行選擇。
參考資料
- [1]Go One Billion Row Challenge — From 15 Minutes to 5 Seconds:https://medium.com/@code-geass/go-one-billion-row-challenge-from-15-minutes-to-5-seconds-a1206611e230
- [2]10 億行挑戰(zhàn):https://1brc.dev