A Guide to Using Apache Arrow for Go Developers: Advanced Data Structures
The previous two articles, "Arrow Data Types"[1] and "Memory Management in the Arrow Go Implementation"[2], covered the various Arrow array types and their in-memory layouts. They also covered some of the mechanisms and usage principles behind memory management in the Go Arrow implementation.
An Arrow array is just a fixed-length sequence of values of the same type. In real applications, arrays usually serve only as building blocks, and we need higher-level data structures that can combine them. In this article we look at the advanced data structures provided by the Arrow specification and by some of its implementations: Record Batch, Chunked Array, and Table.
Let's start with Record Batch[3].
1. Record Batch
The name Record reminds me of the Record type in the [Pascal programming language](https://en.wikipedia.org/wiki/Pascal_(programming_language) "Pascal programming language"). In Pascal, a Record plays roughly the same role as a struct in Go: it is a collection of heterogeneous fields. Here is a Record example from the book "In-Memory Analytics with Apache Arrow"[4]:
```go
// Rendered in Go
type Archer struct {
	archer   string
	location string
	year     int16
}
```
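A Record Batch is, as the name suggests, a batch of Records: a collection of Records such as [N]Archer.
If we treat each field of the Record as a column and each Record in the collection as a row, we get the structure shown in the diagram below: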
[Figure: a Record Batch, with the Archer fields as columns and each Record as a row]
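The row/column picture can be made concrete with a short stdlib-only sketch. It is not the Arrow API. It turns a row-oriented []Archer into one slice per field, which is roughly the shape a Record Batch stores. The ArcherColumns type and the toColumns function are hypothetical names chosen for illustration. Real Arrow columns also carry validity bitmaps and offset buffers, and those are omitted here.

```go
package main

import "fmt"

// Archer is the row-oriented record from the example above.
type Archer struct {
	archer   string
	location string
	year     int16
}

// ArcherColumns is a hypothetical columnar view: one slice per field,
// all of the same length. It mirrors the shape of a Record Batch,
// not Arrow's actual buffer layout.
type ArcherColumns struct {
	archer   []string
	location []string
	year     []int16
}

// toColumns transposes rows into columns.
func toColumns(rows []Archer) ArcherColumns {
	cols := ArcherColumns{
		archer:   make([]string, 0, len(rows)),
		location: make([]string, 0, len(rows)),
		year:     make([]int16, 0, len(rows)),
	}
	for _, r := range rows {
		cols.archer = append(cols.archer, r.archer)
		cols.location = append(cols.location, r.location)
		cols.year = append(cols.year, r.year)
	}
	return cols
}

func main() {
	rows := []Archer{
		{"tony", "beijing", 1992},
		{"amy", "shanghai", 1993},
		{"jim", "chengdu", 1994},
	}
	cols := toColumns(rows)
	fmt.Println(cols.archer)   // [tony amy jim]
	fmt.Println(cols.location) // [beijing shanghai chengdu]
	fmt.Println(cols.year)     // [1992 1993 1994]
}
```

Each column is contiguous in this layout. That is why columnar formats scan a single field efficiently.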
The Go Arrow implementation does not use the name "Record Batch" directly. It uses "Record" instead, and this "Record" actually represents a Record Batch. Here is the Record interface defined by the Go Arrow implementation:
```go
// github.com/apache/arrow/go/arrow/record.go
// Record is a collection of equal-length arrays matching a particular Schema.
// Also known as a RecordBatch in the spec and in some implementations.
//
// It is also possible to construct a Table from a collection of Records that
// all have the same schema.
type Record interface {
	json.Marshaler

	Release()
	Retain()

	Schema() *Schema

	NumRows() int64
	NumCols() int64

	Columns() []Array
	Column(i int) Array
	ColumnName(i int) string
	SetColumn(i int, col Array) (Record, error)

	// NewSlice constructs a zero-copy slice of the record with the indicated
	// indices i and j, corresponding to array[i:j].
	// The returned record must be Release()'d after use.
	//
	// NewSlice panics if the slice is outside the valid range of the record array.
	// NewSlice panics if j < i.
	NewSlice(i, j int64) Record
}
```
As before, we can use the Builder pattern to create an arrow.Record. Let's write Go code that creates the [N]Archer Record Batch:
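```go
// record_batch.go
package main

import (
	"fmt"
	// The major version (v13) is an assumption; match it to your go.mod.
	"github.com/apache/arrow/go/v13/arrow"
	"github.com/apache/arrow/go/v13/arrow/array"
	"github.com/apache/arrow/go/v13/arrow/memory"
)

func main() {
	schema := arrow.NewSchema(
		[]arrow.Field{
			{Name: "archer", Type: arrow.BinaryTypes.String},
			{Name: "location", Type: arrow.BinaryTypes.String},
			{Name: "year", Type: arrow.PrimitiveTypes.Int16},
		},
		nil, // no schema metadata
	)

	// RecordBuilder holds one builder per field; assert each to its concrete type.
	rb := array.NewRecordBuilder(memory.DefaultAllocator, schema)
	defer rb.Release()

	rb.Field(0).(*array.StringBuilder).AppendValues([]string{"tony", "amy", "jim"}, nil)
	rb.Field(1).(*array.StringBuilder).AppendValues([]string{"beijing", "shanghai", "chengdu"}, nil)
	rb.Field(2).(*array.Int16Builder).AppendValues([]int16{1992, 1993, 1994}, nil)

	// NewRecord assembles the Record Batch and resets the builder.
	rec := rb.NewRecord()
	defer rec.Release()

	fmt.Println(rec)
}
```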
Running the example above produces the following output:
```
$ go run record_batch.go
record:
schema:
fields: 3
- archer: type=utf8
- location: type=utf8
- year: type=int16
rows: 3
col[0][archer]: ["tony" "amy" "jim"]
col[1][location]: ["beijing" "shanghai" "chengdu"]
col[2][year]: [1992 1993 1994]
```
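This example introduces a concept called Schema: NewRecordBuilder must be given an arrow.Schema instance. Like a database table schema, an Arrow Schema is metadata. It lists the name and type of each field that serves as a "column". Record Batch is not the only user of Schema; it is also a required element of the Table covered later.
arrow.Record's NewSlice method shares a Record Batch's memory in a zero-copy way. NewSlice creates a new Record Batch whose columns reference the same underlying buffers as the original instead of copying them. Like any Record, the slice must be released after use: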
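The interface comment above describes a Record as "a collection of equal-length arrays matching a particular Schema". The stdlib-only sketch below checks exactly those two invariants on toy types. Field, Column, and validateRecord are hypothetical names, and the column type is a plain string tag rather than an arrow.DataType. This is an illustration of the rule, not how the Go library implements it.

```go
package main

import (
	"errors"
	"fmt"
)

// Field is a hypothetical stand-in for arrow.Field: a name plus a type tag.
type Field struct {
	Name string
	Type string
}

// Column is a hypothetical column summary: its type tag and length.
type Column struct {
	Type string
	Len  int
}

// validateRecord checks the Record Batch invariants: one column per schema
// field, each column matching its field's type, and all columns equally long.
func validateRecord(schema []Field, cols []Column) (rows int, err error) {
	if len(schema) != len(cols) {
		return 0, fmt.Errorf("schema has %d fields but got %d columns", len(schema), len(cols))
	}
	for i, c := range cols {
		if c.Type != schema[i].Type {
			return 0, fmt.Errorf("column %q: type %s does not match schema type %s",
				schema[i].Name, c.Type, schema[i].Type)
		}
		if i == 0 {
			rows = c.Len
		} else if c.Len != rows {
			return 0, errors.New("columns have different lengths")
		}
	}
	return rows, nil
}

func main() {
	schema := []Field{{"archer", "utf8"}, {"location", "utf8"}, {"year", "int16"}}
	rows, err := validateRecord(schema, []Column{{"utf8", 3}, {"utf8", 3}, {"int16", 3}})
	fmt.Println(rows, err) // 3 <nil>
	_, err = validateRecord(schema, []Column{{"utf8", 3}, {"utf8", 2}, {"int16", 3}})
	fmt.Println(err) // columns have different lengths
}
```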
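```go
// record_batch_slice.go (continues main from record_batch.go, reusing rec)
sl := rec.NewSlice(0, 2) // rows [0, 2), zero-copy
defer sl.Release()       // the slice is a new Record and must be released too
fmt.Println(sl)

cols := sl.Columns()
a1 := cols[0] // the "archer" column of the slice
fmt.Println(a1)
```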
The new record sl holds the first two rows of rec. Printing sl and then its first column gives:
```
record:
schema:
fields: 3
- archer: type=utf8
- location: type=utf8
- year: type=int16
rows: 2
col[0][archer]: ["tony" "amy"]
col[1][location]: ["beijing" "shanghai"]
col[2][year]: [1992 1993]
["tony" "amy"]
```
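Zero-copy slicing behaves much like re-slicing a Go slice: the new view points into the existing buffer instead of copying it. The stdlib-only sketch below models this with a hypothetical toyRecord that holds one Go slice per column. It shows only the memory sharing and does not reproduce Arrow's offset or reference-count bookkeeping.

```go
package main

import "fmt"

// toyRecord is a hypothetical record batch: equal-length columns.
type toyRecord struct {
	names []string
	years []int16
}

// newSlice returns rows [i, j) without copying: both columns are re-sliced,
// so the result shares backing arrays with r. Like NewSlice, it panics when
// j < i or the range is out of bounds (via Go's own slice bounds checks).
func (r toyRecord) newSlice(i, j int) toyRecord {
	return toyRecord{names: r.names[i:j], years: r.years[i:j]}
}

func main() {
	rec := toyRecord{
		names: []string{"tony", "amy", "jim"},
		years: []int16{1992, 1993, 1994},
	}
	sl := rec.newSlice(0, 2)
	fmt.Println(sl.names, sl.years) // [tony amy] [1992 1993]

	// Same backing memory: the first element has the same address.
	fmt.Println(&sl.years[0] == &rec.years[0]) // true
}
```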
Record batches with the same schema can be merged. We allocate a larger Record Batch and copy both record batches into it. However, this copies every value and is therefore expensive.
Some Arrow implementations provide the concept of a Chunked Array, which appends arrays to a column at much lower cost.
Note: Chunked Array is not part of the Arrow Columnar Format.
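The cost difference between the two approaches shows up clearly with plain Go slices. In this hypothetical sketch, concatCopy merges one column of two record batches by allocating a new buffer and copying every value. appendChunk only records a reference to the existing buffer, which is the idea behind a Chunked Array. Neither function is part of the Arrow API.

```go
package main

import "fmt"

// concatCopy merges two columns the Record Batch way: allocate a new
// contiguous buffer and copy every element (O(len(a)+len(b))).
func concatCopy(a, b []int64) []int64 {
	out := make([]int64, 0, len(a)+len(b))
	out = append(out, a...)
	out = append(out, b...)
	return out
}

// appendChunk merges the Chunked Array way: keep the existing buffers and
// record one more chunk (amortized O(1) per chunk, no element copies).
func appendChunk(chunks [][]int64, b []int64) [][]int64 {
	return append(chunks, b)
}

func main() {
	a := []int64{1, 2, 3, 4, 5}
	b := []int64{6, 7}

	merged := concatCopy(a, b)
	fmt.Println(merged, &merged[0] == &a[0]) // [1 2 3 4 5 6 7] false

	chunks := appendChunk([][]int64{a}, b)
	fmt.Println(len(chunks), &chunks[1][0] == &b[0]) // 2 true
}
```

The trade-off is that the chunked form gives up contiguity: readers must hop from chunk to chunk.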
2. Chunked Array
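If a Record Batch is essentially a horizontal aggregation of arrays of different types, then a Chunked Array is a vertical aggregation of arrays of the same type. In Go syntax that is [N]Array or []Array, an array of arrays. Here is a diagram of the Chunked Array structure: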
[Figure: a Chunked Array made of several arrays of the same type and different lengths]
As we can see, the Go implementation of Chunked Array is built on a slice of Arrays:
```go
// github.com/apache/arrow/go/arrow/table.go
// Chunked manages a collection of primitives arrays as one logical large array.
type Chunked struct {
	refCount int64 // refCount must be first in the struct for 64 bit alignment and sync/atomic (https://github.com/golang/go/issues/37262)

	chunks []Array

	length int
	nulls  int
	dtype  DataType
}
```
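The chunks slice only holds references to Array values, and each Array owns its own buffers. As a result, the memory buffers of the Arrays in a Chunked Array are not contiguous. As the diagram shows, the Arrays do not need to have the same length either.
Note: Lesson 15 of the course "Go Language: First Lesson"[5] explains in depth what a Go slice really is.
We can create a Chunked Array with the NewChunked function of the arrow package, as in the following source: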
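```go
// chunked_array.go
package main

import (
	"fmt"

	// The major version (v13) is an assumption; match it to your go.mod.
	"github.com/apache/arrow/go/v13/arrow"
	"github.com/apache/arrow/go/v13/arrow/array"
	"github.com/apache/arrow/go/v13/arrow/memory"
)

func main() {
	ib := array.NewInt64Builder(memory.DefaultAllocator)
	defer ib.Release()

	// Each NewInt64Array call returns the values appended so far and resets
	// the builder, so the three arrays have lengths 5, 2 and 3.
	ib.AppendValues([]int64{1, 2, 3, 4, 5}, nil)
	i1 := ib.NewInt64Array()
	defer i1.Release()

	ib.AppendValues([]int64{6, 7}, nil)
	i2 := ib.NewInt64Array()
	defer i2.Release()

	ib.AppendValues([]int64{8, 9, 10}, nil)
	i3 := ib.NewInt64Array()
	defer i3.Release()

	// NewChunked retains every chunk, so releasing i1..i3 above is safe.
	c := arrow.NewChunked(
		arrow.PrimitiveTypes.Int64,
		[]arrow.Array{i1, i2, i3},
	)
	defer c.Release()

	for _, arr := range c.Chunks() {
		fmt.Println(arr)
	}

	fmt.Println("chunked length =", c.Len())
	fmt.Println("chunked null count=", c.NullN())
}
```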
The Chunked Array aggregates several arrow.Array instances of different lengths. arrow.Chunked's Len() returns the sum of the lengths of its Arrays. Here is the program's output:
```
$ go run chunked_array.go
[1 2 3 4 5]
[6 7]
[8 9 10]
chunked length = 10
chunked null count= 0
```
Seen this way, a Chunked Array can be treated as one logically large Array.
That raises a question. A Record Batch aggregates arrays of equal length; is there a data structure that aggregates Chunked Arrays of equal length? There is, and it is called Table.
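Treating a Chunked Array as one logical array mostly means translating a logical index into a (chunk, offset) pair. The minimal stdlib-only sketch below performs that translation. toyChunked and its At method are hypothetical; only the Len-is-the-sum behavior matches what arrow.Chunked's Len() reports above. A linear scan over the chunks is the simplest choice. With many chunks, precomputing cumulative offsets and binary-searching them would be faster.

```go
package main

import "fmt"

// toyChunked is a hypothetical chunked column: several independently
// allocated slices of the same element type, possibly of different lengths.
type toyChunked struct {
	chunks [][]int64
}

// Len returns the logical length: the sum of all chunk lengths.
func (c toyChunked) Len() int {
	n := 0
	for _, ch := range c.chunks {
		n += len(ch)
	}
	return n
}

// At returns the element at logical index i by walking the chunks and
// subtracting each chunk's length until i falls inside one of them.
func (c toyChunked) At(i int) int64 {
	if i < 0 {
		panic("index out of range")
	}
	for _, ch := range c.chunks {
		if i < len(ch) {
			return ch[i]
		}
		i -= len(ch)
	}
	panic("index out of range")
}

func main() {
	c := toyChunked{chunks: [][]int64{{1, 2, 3, 4, 5}, {6, 7}, {8, 9, 10}}}
	fmt.Println(c.Len())                   // 10
	fmt.Println(c.At(0), c.At(5), c.At(9)) // 1 6 10
}
```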
3. Table
Like Chunked Array, Table is not part of the Arrow Columnar Format. It began as a data structure in the Arrow C++ implementation, and the Go Arrow implementation supports it too.
Here is a diagram of the Table structure (taken from the book "In-Memory Analytics with Apache Arrow"[6]):
[Figure: Table structure, with each column a chunked array]
In a Record Batch each column is an array. In a Table, by contrast, each column is a chunked array. All columns' chunked arrays have the same length, but the lengths of the arrays inside each column's chunked array can differ.
Like a Record Batch, a Table has its own Schema.
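Here is what that means in a stdlib-only sketch. In the hypothetical toyTable below, each column is chunked independently, so the chunk boundaries do not line up. Rows can still be read across columns because all columns share the same total length. The chunk lengths mirror col1 and col2 of the table example that follows, but the types and helpers (lenOf, at, newToyTable) are illustrations, not the Arrow API. The code uses generics, so it needs Go 1.18 or later.

```go
package main

import (
	"errors"
	"fmt"
)

// lenOf returns the total length of a chunked column.
func lenOf[T any](chunks [][]T) int {
	n := 0
	for _, c := range chunks {
		n += len(c)
	}
	return n
}

// at maps a logical row index to (chunk, offset) and returns the value.
func at[T any](chunks [][]T, i int) T {
	for _, c := range chunks {
		if i < len(c) {
			return c[i]
		}
		i -= len(c)
	}
	panic("row out of range")
}

// toyTable holds two independently chunked columns.
type toyTable struct {
	col1 [][]int32
	col2 [][]float64
}

// newToyTable enforces the Table invariant: equal total length per column.
func newToyTable(col1 [][]int32, col2 [][]float64) (toyTable, error) {
	if lenOf(col1) != lenOf(col2) {
		return toyTable{}, errors.New("columns must have the same total length")
	}
	return toyTable{col1: col1, col2: col2}, nil
}

func main() {
	tbl, err := newToyTable(
		[][]int32{{1, 2, 3}, {4, 5, 6, 7, 8, 9, 10}},
		[][]float64{{1.1, 2.2, 3.3, 4.4, 5.5}, {6.6, 7.7}, {8.8, 9.9, 10.0}},
	)
	if err != nil {
		panic(err)
	}
	// Row 4 lives in chunk 1 of col1 but in chunk 0 of col2.
	fmt.Println(at(tbl.col1, 4), at(tbl.col2, 4)) // 5 5.5
}
```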
The diagram below (from here[7]) gives a very intuitive comparison of Table and Record Batch:
[Figure: Table compared with Record Batch, from the Apache Arrow Glossary]
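Record Batch is part of the Arrow Columnar Format, and every language implementation supports it. Table is not part of the format spec, and not every language implementation supports it.
The figure also shows a cost of this design. A Table uses Chunked Arrays as its columns, and the arrays inside a chunked array are not contiguous in memory. The Table therefore gives up some memory locality at runtime.
Now let's use the Go Arrow implementation to create a table with 3 columns and 10 rows: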
```go
// table.go
package main

import (
	"fmt"

	// The major version (v13) is an assumption; match it to your go.mod.
	"github.com/apache/arrow/go/v13/arrow"
	"github.com/apache/arrow/go/v13/arrow/array"
	"github.com/apache/arrow/go/v13/arrow/memory"
)

func main() {
	schema := arrow.NewSchema(
		[]arrow.Field{
			{Name: "col1", Type: arrow.PrimitiveTypes.Int32},
			{Name: "col2", Type: arrow.PrimitiveTypes.Float64},
			{Name: "col3", Type: arrow.BinaryTypes.String},
		},
		nil,
	)

	// col1: 10 int32 values in chunks of length 3 and 7.
	col1 := func() *arrow.Column {
		chunk := func() *arrow.Chunked {
			ib := array.NewInt32Builder(memory.DefaultAllocator)
			defer ib.Release()
			ib.AppendValues([]int32{1, 2, 3}, nil)
			i1 := ib.NewInt32Array()
			defer i1.Release()
			ib.AppendValues([]int32{4, 5, 6, 7, 8, 9, 10}, nil)
			i2 := ib.NewInt32Array()
			defer i2.Release()
			c := arrow.NewChunked(
				arrow.PrimitiveTypes.Int32,
				[]arrow.Array{i1, i2},
			)
			return c
		}()
		defer chunk.Release()
		// NewColumn retains the chunked array, so releasing chunk here is safe.
		return arrow.NewColumn(schema.Field(0), chunk)
	}()
	defer col1.Release()

	// col2: 10 float64 values in chunks of length 5, 2 and 3.
	col2 := func() *arrow.Column {
		chunk := func() *arrow.Chunked {
			fb := array.NewFloat64Builder(memory.DefaultAllocator)
			defer fb.Release()
			fb.AppendValues([]float64{1.1, 2.2, 3.3, 4.4, 5.5}, nil)
			f1 := fb.NewFloat64Array()
			defer f1.Release()
			fb.AppendValues([]float64{6.6, 7.7}, nil)
			f2 := fb.NewFloat64Array()
			defer f2.Release()
			fb.AppendValues([]float64{8.8, 9.9, 10.0}, nil)
			f3 := fb.NewFloat64Array()
			defer f3.Release()
			c := arrow.NewChunked(
				arrow.PrimitiveTypes.Float64,
				[]arrow.Array{f1, f2, f3},
			)
			return c
		}()
		defer chunk.Release()
		return arrow.NewColumn(schema.Field(1), chunk)
	}()
	defer col2.Release()

	// col3: 10 strings in chunks of length 2, 2 and 6.
	col3 := func() *arrow.Column {
		chunk := func() *arrow.Chunked {
			sb := array.NewStringBuilder(memory.DefaultAllocator)
			defer sb.Release()
			sb.AppendValues([]string{"s1", "s2"}, nil)
			s1 := sb.NewStringArray()
			defer s1.Release()
			sb.AppendValues([]string{"s3", "s4"}, nil)
			s2 := sb.NewStringArray()
			defer s2.Release()
			sb.AppendValues([]string{"s5", "s6", "s7", "s8", "s9", "s10"}, nil)
			s3 := sb.NewStringArray()
			defer s3.Release()
			c := arrow.NewChunked(
				arrow.BinaryTypes.String,
				[]arrow.Array{s1, s2, s3},
			)
			return c
		}()
		defer chunk.Release()
		return arrow.NewColumn(schema.Field(2), chunk)
	}()
	defer col3.Release()

	// rows = -1 lets NewTable infer the row count (10) from the columns.
	var tbl arrow.Table
	tbl = array.NewTable(schema, []arrow.Column{*col1, *col2, *col3}, -1)
	defer tbl.Release()

	dumpTable(tbl)
}

// dumpTable prints the schema, the table dimensions and the chunks of each column.
func dumpTable(tbl arrow.Table) {
	s := tbl.Schema()
	fmt.Println(s)
	fmt.Println("------")
	fmt.Println("the count of table columns=", tbl.NumCols())
	fmt.Println("the count of table rows=", tbl.NumRows())
	fmt.Println("------")

	for i := 0; i < int(tbl.NumCols()); i++ {
		col := tbl.Column(i)
		fmt.Printf("arrays in column(%s):\n", col.Name())
		chunk := col.Data() // the column's *arrow.Chunked
		for _, arr := range chunk.Chunks() {
			fmt.Println(arr)
		}
		fmt.Println("------")
	}
}
```
Before creating the table, we prepare a schema and the individual columns. Each column wraps a chunked array.
Running the code above gives the following result:
```
$ go run table.go
schema:
fields: 3
- col1: type=int32
- col2: type=float64
- col3: type=utf8
------
the count of table columns= 3
the count of table rows= 10
------
arrays in column(col1):
[1 2 3]
[4 5 6 7 8 9 10]
------
arrays in column(col2):
[1.1 2.2 3.3 4.4 5.5]
[6.6 7.7]
[8.8 9.9 10]
------
arrays in column(col3):
["s1" "s2"]
["s3" "s4"]
["s5" "s6" "s7" "s8" "s9" "s10"]
------
```
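A table also supports schema changes. Building on the code above, we can add a column with AddColumn, which returns a new table whose schema includes the extra field: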
```go
// table_schema_change.go
package main

import (
	"fmt"

	// The major version (v13) is an assumption; match it to your go.mod.
	"github.com/apache/arrow/go/v13/arrow"
	"github.com/apache/arrow/go/v13/arrow/array"
	"github.com/apache/arrow/go/v13/arrow/memory"
)

// dumpTable is the same function as in table.go; copy it into this file
// to run the example on its own.

func main() {
	schema := arrow.NewSchema(
		[]arrow.Field{
			{Name: "col1", Type: arrow.PrimitiveTypes.Int32},
			{Name: "col2", Type: arrow.PrimitiveTypes.Float64},
			{Name: "col3", Type: arrow.BinaryTypes.String},
		},
		nil,
	)

	// col1..col3 are built exactly as in table.go.
	col1 := func() *arrow.Column {
		chunk := func() *arrow.Chunked {
			ib := array.NewInt32Builder(memory.DefaultAllocator)
			defer ib.Release()
			ib.AppendValues([]int32{1, 2, 3}, nil)
			i1 := ib.NewInt32Array()
			defer i1.Release()
			ib.AppendValues([]int32{4, 5, 6, 7, 8, 9, 10}, nil)
			i2 := ib.NewInt32Array()
			defer i2.Release()
			c := arrow.NewChunked(
				arrow.PrimitiveTypes.Int32,
				[]arrow.Array{i1, i2},
			)
			return c
		}()
		defer chunk.Release()
		return arrow.NewColumn(schema.Field(0), chunk)
	}()
	defer col1.Release()

	col2 := func() *arrow.Column {
		chunk := func() *arrow.Chunked {
			fb := array.NewFloat64Builder(memory.DefaultAllocator)
			defer fb.Release()
			fb.AppendValues([]float64{1.1, 2.2, 3.3, 4.4, 5.5}, nil)
			f1 := fb.NewFloat64Array()
			defer f1.Release()
			fb.AppendValues([]float64{6.6, 7.7}, nil)
			f2 := fb.NewFloat64Array()
			defer f2.Release()
			fb.AppendValues([]float64{8.8, 9.9, 10.0}, nil)
			f3 := fb.NewFloat64Array()
			defer f3.Release()
			c := arrow.NewChunked(
				arrow.PrimitiveTypes.Float64,
				[]arrow.Array{f1, f2, f3},
			)
			return c
		}()
		defer chunk.Release()
		return arrow.NewColumn(schema.Field(1), chunk)
	}()
	defer col2.Release()

	col3 := func() *arrow.Column {
		chunk := func() *arrow.Chunked {
			sb := array.NewStringBuilder(memory.DefaultAllocator)
			defer sb.Release()
			sb.AppendValues([]string{"s1", "s2"}, nil)
			s1 := sb.NewStringArray()
			defer s1.Release()
			sb.AppendValues([]string{"s3", "s4"}, nil)
			s2 := sb.NewStringArray()
			defer s2.Release()
			sb.AppendValues([]string{"s5", "s6", "s7", "s8", "s9", "s10"}, nil)
			s3 := sb.NewStringArray()
			defer s3.Release()
			c := arrow.NewChunked(
				arrow.BinaryTypes.String,
				[]arrow.Array{s1, s2, s3},
			)
			return c
		}()
		defer chunk.Release()
		return arrow.NewColumn(schema.Field(2), chunk)
	}()
	defer col3.Release()

	var tbl arrow.Table
	tbl = array.NewTable(schema, []arrow.Column{*col1, *col2, *col3}, -1)
	defer tbl.Release()

	dumpTable(tbl)

	// col4: new string column chunked as 2, 3 and 5; total length 10 matches the table.
	col4 := func() *arrow.Column {
		chunk := func() *arrow.Chunked {
			sb := array.NewStringBuilder(memory.DefaultAllocator)
			defer sb.Release()
			sb.AppendValues([]string{"ss1", "ss2"}, nil)
			s1 := sb.NewStringArray()
			defer s1.Release()
			sb.AppendValues([]string{"ss3", "ss4", "ss5"}, nil)
			s2 := sb.NewStringArray()
			defer s2.Release()
			sb.AppendValues([]string{"ss6", "ss7", "ss8", "ss9", "ss10"}, nil)
			s3 := sb.NewStringArray()
			defer s3.Release()
			c := arrow.NewChunked(
				arrow.BinaryTypes.String,
				[]arrow.Array{s1, s2, s3},
			)
			return c
		}()
		defer chunk.Release()
		return arrow.NewColumn(arrow.Field{Name: "col4", Type: arrow.BinaryTypes.String}, chunk)
	}()
	defer col4.Release()

	// AddColumn returns a new table (col4 at position 3) rather than modifying
	// tbl; the new table holds its own references and must be released too.
	tbl2, err := tbl.AddColumn(
		3,
		arrow.Field{Name: "col4", Type: arrow.BinaryTypes.String},
		*col4,
	)
	if err != nil {
		panic(err)
	}
	defer tbl2.Release()

	dumpTable(tbl2)
}
```
Running the example produces the following output:
```
$ go run table_schema_change.go
schema:
fields: 3
- col1: type=int32
- col2: type=float64
- col3: type=utf8
------
the count of table columns= 3
the count of table rows= 10
------
arrays in column(col1):
[1 2 3]
[4 5 6 7 8 9 10]
------
arrays in column(col2):
[1.1 2.2 3.3 4.4 5.5]
[6.6 7.7]
[8.8 9.9 10]
------
arrays in column(col3):
["s1" "s2"]
["s3" "s4"]
["s5" "s6" "s7" "s8" "s9" "s10"]
------
schema:
fields: 4
- col1: type=int32
- col2: type=float64
- col3: type=utf8
- col4: type=utf8
------
the count of table columns= 4
the count of table rows= 10
------
arrays in column(col1):
[1 2 3]
[4 5 6 7 8 9 10]
------
arrays in column(col2):
[1.1 2.2 3.3 4.4 5.5]
[6.6 7.7]
[8.8 9.9 10]
------
arrays in column(col3):
["s1" "s2"]
["s3" "s4"]
["s5" "s6" "s7" "s8" "s9" "s10"]
------
arrays in column(col4):
["ss1" "ss2"]
["ss3" "ss4" "ss5"]
["ss6" "ss7" "ss8" "ss9" "ss10"]
------
```
This kind of support for schema changes is also very useful in real-world development.
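Conceptually, adding a column means inserting a field at a position, checking that the new column has the table's row count, and returning a new table. The stdlib-only sketch below mimics that on toy types. toyTable, toyColumn, and addColumn are hypothetical. The only idea taken from the example above is "insert at position, check length, return a new table"; the real implementation may validate more, for example field and column types.

```go
package main

import "fmt"

// toyColumn is a hypothetical column: a field name and its string values.
type toyColumn struct {
	name   string
	values []string
}

// toyTable is a hypothetical table with a fixed number of rows.
type toyTable struct {
	rows int
	cols []toyColumn
}

// addColumn returns a new table with col inserted at position pos.
// The receiver is not modified: the column list is copied into a new slice.
func (t toyTable) addColumn(pos int, col toyColumn) (toyTable, error) {
	if pos < 0 || pos > len(t.cols) {
		return toyTable{}, fmt.Errorf("invalid column position %d", pos)
	}
	if len(col.values) != t.rows {
		return toyTable{}, fmt.Errorf("column %q has %d rows, table has %d", col.name, len(col.values), t.rows)
	}
	cols := make([]toyColumn, 0, len(t.cols)+1)
	cols = append(cols, t.cols[:pos]...)
	cols = append(cols, col)
	cols = append(cols, t.cols[pos:]...)
	return toyTable{rows: t.rows, cols: cols}, nil
}

func main() {
	tbl := toyTable{rows: 2, cols: []toyColumn{
		{"col1", []string{"1", "2"}},
		{"col3", []string{"s1", "s2"}},
	}}
	tbl2, err := tbl.addColumn(2, toyColumn{"col4", []string{"ss1", "ss2"}})
	if err != nil {
		panic(err)
	}
	fmt.Println(len(tbl.cols), len(tbl2.cols), tbl2.cols[2].name) // 2 3 col4
}
```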
4. Summary
This article covered three advanced data structures built on arrays: Record Batch, Chunked Array, and Table. Record Batch is a structure defined by the Arrow Columnar Format, so every language that implements Arrow supports it. Chunked Array and Table were created by some language implementations.
The three concepts are easy to confuse, so here is a simple way to remember them:
- Record Batch: schema + multiple arrays of the same length
- Chunked Array: []array
- Table: schema + multiple Chunked Arrays of the same total length
Note: the source code for this article can be downloaded here[8].
5. References
- Apache Arrow Glossary[9]
- [1] "Arrow Data Types": https://tonybai.com/2023/06/25/a-guide-of-using-apache-arrow-for-gopher-part1
- [2] "Memory Management in the Arrow Go Implementation": https://tonybai.com/2023/06/30/a-guide-of-using-apache-arrow-for-gopher-part2
- [3] Record Batch: https://arrow.apache.org/docs/format/Glossary.html#term-record-batch
- [4] "In-Memory Analytics with Apache Arrow": https://book.douban.com/subject/35954154/
- [5] "Go Language: First Lesson": http://gk.link/a/10AVZ
- [6] "In-Memory Analytics with Apache Arrow": https://book.douban.com/subject/35954154/
- [7] here: https://arrow.apache.org/docs/format/Glossary.html#term-table
- [8] here: https://github.com/bigwhite/experiments/blob/master/arrow/advanced-datastructure
- [9] Apache Arrow Glossary: https://arrow.apache.org/docs/format/Glossary.html