SeaweedFS 分布式文件系統(tǒng)源碼分析
本文基于 seaweedfs 3.46[1]
SeaweedFS 的架構(gòu)包括 Master Server、Volume Server 和 Filer Server 。
啟動(dòng) Master Server
啟動(dòng)一個(gè) Master Server 可以使用以下命令:
weed master -ip=127.0.0.1 -ip.bind=0.0.0.0
啟動(dòng)入口以及所有的參數(shù)定義在 weed/command/master.go ,默認(rèn)情況 http 監(jiān)聽端口使用 9333 ,grpc 監(jiān)聽端口則在 http 端口的基礎(chǔ)上加 10000 (所有組件的默認(rèn)規(guī)則)即 19333 :
if *masterOption.portGrpc == 0 {
*masterOption.portGrpc = 10000 + *masterOption.port
}
Master Server 支持多節(jié)點(diǎn)(奇數(shù))部署。使用 Raft 一致性算法來選舉 Leader 節(jié)點(diǎn),這樣可以保證在 Leader 節(jié)點(diǎn)宕機(jī)的情況下,其他節(jié)點(diǎn)可以重新選舉出新的 Leader 節(jié)點(diǎn),從而保證系統(tǒng)的高可用性。
如下,啟動(dòng)一個(gè)由三個(gè) Master Server 節(jié)點(diǎn)所組成的集群:
weed master -ip=127.0.0.1 -ip.bind=0.0.0.0 -port=9333 -peers="127.0.0.1:9333,127.0.0.1:9334,127.0.0.1:9335"
weed master -ip=127.0.0.1 -ip.bind=0.0.0.0 -port=9334 -peers="127.0.0.1:9333,127.0.0.1:9334,127.0.0.1:9335"
weed master -ip=127.0.0.1 -ip.bind=0.0.0.0 -port=9335 -peers="127.0.0.1:9333,127.0.0.1:9334,127.0.0.1:9335"
當(dāng) Master Server 啟動(dòng)時(shí),它會(huì)嘗試加入集群并參與 Leader 選舉。一旦選舉完成,Leader 節(jié)點(diǎn)將負(fù)責(zé)管理整個(gè)集群以及 Volume Server 。
首先會(huì)創(chuàng)建一個(gè) Master Server 包裝的 weed_server.RaftServer 對(duì)象:
raftServer, err = weed_server.NewRaftServer(raftServerOption)
if raftServer == nil {
glog.Fatalf("please verify %s is writable, see https://github.com/seaweedfs/seaweedfs/issues/717: %s", *masterOption.metaFolder, err)
}
在 weed_server.NewRaftServer() 方法中會(huì)創(chuàng)建好 Raft 節(jié)點(diǎn)所需的各種參數(shù)和對(duì)象,然后調(diào)用 github.com/seaweedfs/raft[2] 庫創(chuàng)建 RaftServer 對(duì)象并啟動(dòng) Raft 節(jié)點(diǎn):
type RaftServer struct {
// 存儲(chǔ)初始節(jié)點(diǎn)信息
peers map[string]pb.ServerAddress
// Raft 節(jié)點(diǎn)
raftServer raft.Server
// HashiCorp Raft 節(jié)點(diǎn)
RaftHashicorp *hashicorpRaft.Raft
// 用于管理 Raft 節(jié)點(diǎn)之間的通信
TransportManager *transport.Manager
// Raft 節(jié)點(diǎn)的數(shù)據(jù)目錄
dataDir string
// Raft 節(jié)點(diǎn)的地址
serverAddr pb.ServerAddress
// Raft 集群的拓?fù)浣Y(jié)構(gòu)
topo *topology.Topology
// Raft 節(jié)點(diǎn)的 gRPC 服務(wù)
*raft.GrpcServer
}
func NewRaftServer(option *RaftServerOption) (*RaftServer, error) {
// 通過 option 創(chuàng)建一個(gè) RaftServer 對(duì)象 s
s := &RaftServer{
peers: option.Peers,
serverAddr: option.ServerAddr,
dataDir: option.DataDir,
topo: option.Topo,
}
//...
// 調(diào)用 github.com/seaweedfs/raft 庫,創(chuàng)建 RaftServer 對(duì)象
s.raftServer, err = raft.NewServer(string(s.serverAddr), s.dataDir, transporter, stateMachine, option.Topo, "")
//...
// 啟動(dòng) Raft 節(jié)點(diǎn)
if err := s.raftServer.Start(); err != nil {
return nil, err
}
// 將節(jié)點(diǎn)加入到 Raft 集群中
for name, peer := range s.peers {
if err := s.raftServer.AddPeer(name, peer.ToGrpcAddress()); err != nil {
return nil, err
}
}
//...
glog.V(0).Infof("current cluster leader: %v", s.raftServer.Leader())
return s, nil
}
最后,會(huì)打印出當(dāng)前的 Leader 節(jié)點(diǎn),如果對(duì) Raft 選舉算法的處理細(xì)節(jié)感興趣,可以繼續(xù)深入 s.raftServer.Start() 的實(shí)現(xiàn)。
Raft 節(jié)點(diǎn)啟動(dòng)成功后,Master Server 會(huì)注冊(cè)一些集群相關(guān)的接口,方便查看集群狀態(tài):
r.HandleFunc("/cluster/status", raftServer.StatusHandler).Methods("GET")
r.HandleFunc("/cluster/healthz", raftServer.HealthzHandler).Methods("GET", "HEAD")
if *masterOption.raftHashicorp {
r.HandleFunc("/raft/stats", raftServer.StatsRaftHandler).Methods("GET")
}
請(qǐng)求如下:
$ curl http://127.0.0.1:9333/cluster/status
{"IsLeader":true,"Leader":"127.0.0.1:9333","Peers":["127.0.0.1:9335","127.0.0.1:9334"]}
$ curl http://127.0.0.1:9334/cluster/status
{"Leader":"127.0.0.1:9333","Peers":["127.0.0.1:9335","127.0.0.1:9333"]}
$ curl http://127.0.0.1:9335/cluster/status
{"Leader":"127.0.0.1:9333","Peers":["127.0.0.1:9333","127.0.0.1:9334"]}
啟動(dòng) Volume Server
啟動(dòng)一個(gè) Volume Server 可以使用以下命令:
weed volume -mserver="127.0.0.1:9333" -dir=data -ip=127.0.0.1 -ip.bind=0.0.0.0
啟動(dòng)入口以及所有的參數(shù)定義在 weed/command/volume.go ,默認(rèn)情況 http 監(jiān)聽端口使用 8080 ,grpc 監(jiān)聽端口使用 18080 。
其中,-mserver 為 Master Server 連接地址,當(dāng)需要連接的 Master Server 為集群時(shí),可以將多個(gè) Master Server 的連接地址用逗號(hào)分隔; -dir 則用來指定 Volume Server 存儲(chǔ)數(shù)據(jù)文件的目錄。
和 Master Server 不同,Volume Server 支持橫向擴(kuò)展,其節(jié)點(diǎn)數(shù)量規(guī)??梢噪S著數(shù)據(jù)量和性能需求的變化而隨時(shí)動(dòng)態(tài)調(diào)整。
一旦 Volume Server 啟動(dòng)后,就會(huì)與 Master Server 保持通信,匯報(bào)自身的狀態(tài),并根據(jù) Master Server 的指示執(zhí)行創(chuàng)建、刪除、修復(fù)等操作。
核心邏輯在 weed/server/volume_grpc_client_to_master.go 的 VolumeServer.doHeartbeat 方法。
首先會(huì)創(chuàng)建一個(gè) Master Server 的 gRPC 連接客戶端,并使用該客戶端調(diào)用 SendHeartbeat 方法:
// 創(chuàng)建 Master Server 的 gRPC 連接客戶端
client := master_pb.NewSeaweedClient(grpcConnection)
// 調(diào)用 SendHeartbeat
stream, err := client.SendHeartbeat(ctx)
if err != nil {
glog.V(0).Infof("SendHeartbeat to %s: %v", masterAddress, err)
return "", err
}
SendHeartbeat 方法是一個(gè)雙向流式 RPC ,允許在一次調(diào)用中發(fā)送多個(gè)請(qǐng)求和響應(yīng),其 ProtoBuf 定義如下:
rpc SendHeartbeat (stream Heartbeat) returns (stream HeartbeatResponse) {
}
接著創(chuàng)建一個(gè) goroutine 用來處理從 Master Server 發(fā)送過來的 Heartbeat 請(qǐng)求:
go func() {
for {
// 從輸入流中讀取 Heartbeat 請(qǐng)求
in, err := stream.Recv()
if err != nil {
doneChan <- err
return
}
// ...
// 如果 Heartbeat 請(qǐng)求中包含了卷大小限制,并且該限制和當(dāng)前 Volume Server 中保存的限制不同
if in.GetVolumeSizeLimit() != 0 && vs.store.GetVolumeSizeLimit() != in.GetVolumeSizeLimit() {
// 將 Volume Server 中保存的限制更新為 Heartbeat 請(qǐng)求中的限制
vs.store.SetVolumeSizeLimit(in.GetVolumeSizeLimit())
// 調(diào)用 vs.store.MaybeAdjustVolumeMax() 方法重新計(jì)算卷的最大容量
if vs.store.MaybeAdjustVolumeMax() {
// 如果計(jì)算結(jié)果發(fā)生了變化,則使用 stream.Send() 方法向 Master Server 發(fā)送 Heartbeat 響應(yīng)
if err = stream.Send(vs.store.CollectHeartbeat()); err != nil {
glog.V(0).Infof("Volume Server Failed to talk with master %s: %v", vs.currentMaster, err)
return
}
}
}
// 如果 Heartbeat 請(qǐng)求中包含了新的 Master Server 地址,并且該地址和當(dāng)前地址不同
if in.GetLeader() != "" && string(vs.currentMaster) != in.GetLeader() {
// 通知主函數(shù)切換新的 Master Server 地址作為 Leader
glog.V(0).Infof("Volume Server found a new master newLeader: %v instead of %v", in.GetLeader(), vs.currentMaster)
newLeader = pb.ServerAddress(in.GetLeader())
doneChan <- nil
return
}
}
}()
最后使用一個(gè) for select 來監(jiān)聽來自 Volume Server 存儲(chǔ)層的四個(gè)通道:NewVolumesChan、NewEcShardsChan、DeletedVolumesChan 和 DeletedEcShardsChan。每當(dāng)有新的卷或 EC 分片被創(chuàng)建或刪除時(shí),會(huì)生成一個(gè) Heartbeat 消息,并使用 stream.Send() 方法將其發(fā)送到 Master Server ,同時(shí)也會(huì)定期發(fā)送心跳消息給 Master Server :
for {
select {
// 有新的卷被創(chuàng)建
case volumeMessage := <-vs.store.NewVolumesChan:
// ...
// 通知 Master Server
glog.V(0).Infof("volume server %s:%d adds volume %d", vs.store.Ip, vs.store.Port, volumeMessage.Id)
if err = stream.Send(deltaBeat); err != nil {
glog.V(0).Infof("Volume Server Failed to update to master %s: %v", masterAddress, err)
return "", err
}
// 有新的 EC 分片被創(chuàng)建
case ecShardMessage := <-vs.store.NewEcShardsChan:
// ...
// 通知 Master Server
glog.V(0).Infof("volume server %s:%d adds ec shard %d:%d", vs.store.Ip, vs.store.Port, ecShardMessage.Id,
erasure_coding.ShardBits(ecShardMessage.EcIndexBits).ShardIds())
if err = stream.Send(deltaBeat); err != nil {
glog.V(0).Infof("Volume Server Failed to update to master %s: %v", masterAddress, err)
return "", err
}
// 有卷被刪除
case volumeMessage := <-vs.store.DeletedVolumesChan:
// ...
// 通知 Master Server
glog.V(0).Infof("volume server %s:%d deletes volume %d", vs.store.Ip, vs.store.Port, volumeMessage.Id)
if err = stream.Send(deltaBeat); err != nil {
glog.V(0).Infof("Volume Server Failed to update to master %s: %v", masterAddress, err)
return "", err
}
// 有 EC 分片被刪除
case ecShardMessage := <-vs.store.DeletedEcShardsChan:
// ...
// 通知 Master Server
glog.V(0).Infof("volume server %s:%d deletes ec shard %d:%d", vs.store.Ip, vs.store.Port, ecShardMessage.Id,
erasure_coding.ShardBits(ecShardMessage.EcIndexBits).ShardIds())
if err = stream.Send(deltaBeat); err != nil {
glog.V(0).Infof("Volume Server Failed to update to master %s: %v", masterAddress, err)
return "", err
}
// 發(fā)送卷信息的心跳消息
case <-volumeTickChan.C:
glog.V(4).Infof("volume server %s:%d heartbeat", vs.store.Ip, vs.store.Port)
vs.store.MaybeAdjustVolumeMax()
if err = stream.Send(vs.store.CollectHeartbeat()); err != nil {
glog.V(0).Infof("Volume Server Failed to talk with master %s: %v", masterAddress, err)
return "", err
}
// 發(fā)送 EC 分片信息的心跳消息
case <-ecShardTickChan.C:
glog.V(4).Infof("volume server %s:%d ec heartbeat", vs.store.Ip, vs.store.Port)
if err = stream.Send(vs.store.CollectErasureCodingHeartbeat()); err != nil {
glog.V(0).Infof("Volume Server Failed to talk with master %s: %v", masterAddress, err)
return "", err
}
// Volume Server 停止,退出監(jiān)聽
case err = <-doneChan:
return
// 用于在 Volume Server 停止時(shí)發(fā)送最終的心跳消息
case <-vs.stopChan:
// ...
glog.V(1).Infof("volume server %s:%d stops and deletes all volumes", vs.store.Ip, vs.store.Port)
if err = stream.Send(emptyBeat); err != nil {
glog.V(0).Infof("Volume Server Failed to update to master %s: %v", masterAddress, err)
return "", err
}
return
}
}
啟動(dòng) Filer Server
啟動(dòng)一個(gè) Filer Server 可以使用以下命令:
weed filer -s3 -master="127.0.0.1:9333" -ip=127.0.0.1 -ip.bind=0.0.0.0
啟動(dòng)入口以及所有的參數(shù)定義在 weed/command/filer.go ,默認(rèn)情況 http 監(jiān)聽端口使用 8888 ,grpc 監(jiān)聽端口使用 18888 。
在這里,-master 為 Master Server 連接地址,同樣地,當(dāng)需要連接的 Master Server 為集群時(shí),可以將多個(gè) Master Server 的連接地址用逗號(hào)分隔; -s3 則代表要啟動(dòng) S3 網(wǎng)關(guān)功能,默認(rèn)監(jiān)聽 8333 端口。
Filer Server 可以理解為一個(gè)文件管理器,通過向下對(duì)接 Volume Server 與 Master Server,對(duì)外提供豐富的功能與特性,除了自身提供的 API 接口,還支持?jǐn)U展其它比如 POSIX ,WebDAV,S3 等的文件操作接口。
Filer Server 通過外部數(shù)據(jù)庫存儲(chǔ)文件的元數(shù)據(jù)信息。默認(rèn)情況下,使用的是 leveldb ,支持替換為其它流行的數(shù)據(jù)庫,例如 Sqlite、MySql、Etcd 等,具體可以參考 wiki/Filer-Stores[3] 。
作為一個(gè) API Server ,F(xiàn)iler Server 在架構(gòu)上就是一個(gè)服務(wù)端+數(shù)據(jù)庫模型,其節(jié)點(diǎn)的數(shù)量和規(guī)??梢愿鶕?jù)不同的工作負(fù)載和使用情況進(jìn)行優(yōu)化和調(diào)整。
上傳文件
首先分析 Filer Server 自身提供的 API 接口,上傳文件可以直接調(diào)用 :
$ curl -F "file_name=@test.txt" -X POST "http://127.0.0.1:8888"
{"name":"test.txt","size":14}
文件上傳的接口定義在 weed/server/filer_server_handlers_write.go 的 PostHandler 方法:
func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request, contentLength int64) {
// 解析請(qǐng)求的目標(biāo)路徑
// ...
// 解析請(qǐng)求的查詢參數(shù),用于確定文件的存儲(chǔ)位置和屬性
// ...
if query.Has("mv.from") {
// 若查詢參數(shù)中出現(xiàn) mv.from ,則進(jìn)行文件移動(dòng)操作
fs.move(ctx, w, r, so)
} else {
// 文件上傳操作,自動(dòng)分塊
fs.autoChunk(ctx, w, r, contentLength, so)
}
util.CloseRequest(r)
}
跟蹤到 fs.autoChunk 方法:
func (fs *FilerServer) autoChunk(ctx context.Context, w http.ResponseWriter, r *http.Request, contentLength int64, so *operation.StorageOption) {
//...
if r.Method == "POST" {
// 上傳文件
if r.Header.Get("Content-Type") == "" && strings.HasSuffix(r.URL.Path, "/") {
reply, err = fs.mkdir(ctx, w, r)
} else {
// 自動(dòng)分塊上傳
reply, md5bytes, err = fs.doPostAutoChunk(ctx, w, r, chunkSize, contentLength, so)
}
} else {
// 創(chuàng)建目錄
reply, md5bytes, err = fs.doPutAutoChunk(ctx, w, r, chunkSize, contentLength, so)
}
//...
}
繼續(xù)來到 fs.doPostAutoChunk 方法:
func (fs *FilerServer) doPostAutoChunk(ctx context.Context, w http.ResponseWriter, r *http.Request, chunkSize int32, contentLength int64, so *operation.StorageOption) (filerResult *FilerPostResult, md5bytes []byte, replyerr error) {
// 讀取上傳的文件內(nèi)容
multipartReader, multipartReaderErr := r.MultipartReader()
if multipartReaderErr != nil {
return nil, nil, multipartReaderErr
}
// 讀取第一個(gè)分塊,在這里,我們只需要讀取第一個(gè)分塊,即上傳文件的內(nèi)容的分塊
part1, part1Err := multipartReader.NextPart()
if part1Err != nil {
return nil, nil, part1Err
}
// 獲取文件名和 Content-Type
fileName := part1.FileName()
if fileName != "" {
fileName = path.Base(fileName)
}
contentType := part1.Header.Get("Content-Type")
if contentType == "application/octet-stream" {
contentType = ""
}
// 核心邏輯
// 將上傳的文件內(nèi)容轉(zhuǎn)換為文件分塊,并返回文件分塊的相關(guān)信息
fileChunks, md5Hash, chunkOffset, err, smallContent := fs.uploadReaderToChunks(w, r, part1, chunkSize, fileName, contentType, contentLength, so)
if err != nil {
return nil, nil, err
}
// 計(jì)算文件內(nèi)容的 MD5 值
md5bytes = md5Hash.Sum(nil)
// 保存文件元數(shù)據(jù)信息
filerResult, replyerr = fs.saveMetaData(ctx, r, fileName, contentType, so, md5bytes, fileChunks, chunkOffset, smallContent)
if replyerr != nil {
fs.filer.DeleteChunks(fileChunks)
}
return
}
這些都比較好讀,繼續(xù)跟蹤到核心邏輯處 fs.uploadReaderToChunks ,方法內(nèi)首先會(huì)進(jìn)行一些正確性校驗(yàn)和必要變量的初始化,然后開啟一個(gè)循環(huán),不斷讀取數(shù)據(jù)并將其轉(zhuǎn)換為一個(gè)或多個(gè) Chunk :
func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Request, reader io.Reader, chunkSize int32, fileName, contentType string, contentLength int64, so *operation.StorageOption) (fileChunks []*filer_pb.FileChunk, md5Hash hash.Hash, chunkOffset int64, uploadErr error, smallContent []byte) {
// ...一系列操作
// 進(jìn)行一些正確性校驗(yàn)和必要變量的初始化
for {
// 使用對(duì)象池機(jī)制限制 bytes.Buffer 對(duì)象的數(shù)量,優(yōu)化內(nèi)存占用
bytesBufferLimitCond.L.Lock()
for atomic.LoadInt64(&bytesBufferCounter) >= 4 {
glog.V(4).Infof("waiting for byte buffer %d", atomic.LoadInt64(&bytesBufferCounter))
bytesBufferLimitCond.Wait()
}
atomic.AddInt64(&bytesBufferCounter, 1)
bytesBufferLimitCond.L.Unlock()
bytesBuffer := bufPool.Get().(*bytes.Buffer)
glog.V(4).Infof("received byte buffer %d", atomic.LoadInt64(&bytesBufferCounter))
// 【關(guān)鍵】分塊操作,每個(gè)塊就是一個(gè) bytes.Buffer
// 根據(jù) chunkSize 從 partReader 中讀取數(shù)據(jù),并將讀取的數(shù)據(jù)保存到 bytes.Buffer 對(duì)象中
limitedReader := io.LimitReader(partReader, int64(chunkSize))
bytesBuffer.Reset()
dataSize, err := bytesBuffer.ReadFrom(limitedReader)
// 處理讀取數(shù)據(jù)時(shí)可能出現(xiàn)的錯(cuò)誤,以及在讀取完整個(gè)文件時(shí)的處理
// ...
wg.Add(1)
// 開啟 goroutine 處理
go func(offset int64) {
defer func() {
// 將 bytes.Buffer 對(duì)象歸還對(duì)象池
bufPool.Put(bytesBuffer)
atomic.AddInt64(&bytesBufferCounter, -1)
// 通知其他 goroutine 可以使用更多的 bytes.Buffer 對(duì)象
bytesBufferLimitCond.Signal()
wg.Done()
}()
// 【關(guān)鍵】上傳數(shù)據(jù)塊
chunks, toChunkErr := fs.dataToChunk(fileName, contentType, bytesBuffer.Bytes(), offset, so)
if toChunkErr != nil {
// 記錄上傳錯(cuò)誤
uploadErrLock.Lock()
if uploadErr == nil {
uploadErr = toChunkErr
}
uploadErrLock.Unlock()
}
if chunks != nil {
fileChunksLock.Lock()
fileChunksSize := len(fileChunks) + len(chunks)
for _, chunk := range chunks {
// 【關(guān)鍵】將當(dāng)前上傳的數(shù)據(jù)塊添加到 fileChunks 列表中
fileChunks = append(fileChunks, chunk)
glog.V(4).Infof("uploaded %s chunk %d to %s [%d,%d)", fileName, fileChunksSize, chunk.FileId, offset, offset+int64(chunk.Size))
}
fileChunksLock.Unlock()
}
}(chunkOffset)
// 更新已經(jīng)讀取的數(shù)據(jù)塊的大小
chunkOffset = chunkOffset + dataSize
if dataSize < int64(chunkSize) {
// 已經(jīng)讀取完整個(gè)文件
break
}
}
wg.Wait()
if uploadErr != nil {
// 上傳出錯(cuò),刪除 fileChunks
fs.filer.DeleteChunks(fileChunks)
return nil, md5Hash, 0, uploadErr, nil
}
// 【關(guān)鍵】對(duì)已經(jīng)上傳的數(shù)據(jù)塊,即 fileChunks 進(jìn)行排序,以便后續(xù)可以正確地進(jìn)行數(shù)據(jù)合并
slices.SortFunc(fileChunks, func(a, b *filer_pb.FileChunk) bool {
return a.Offset < b.Offset
})
// 返回 fileChunks 給調(diào)用方保存
return fileChunks, md5Hash, chunkOffset, nil, smallContent
}
文件的分塊操作都是在 Filer Server 完成的。而其中上傳數(shù)據(jù)塊的 fs.dataToChunk 方法會(huì)與 Master Server 進(jìn)行交互。
該方法首先會(huì)調(diào)用 fs.assignNewFileInfo 向 Master Server 請(qǐng)求分配一個(gè)新的文件 ID(fid)以及上傳 URL :
fileId, urlLocation, auth, uploadErr = fs.assignNewFileInfo(so)
if uploadErr != nil {
// ...
return uploadErr
}
然后使用分配的 fid 調(diào)用上傳 URL 上傳數(shù)據(jù)塊:
uploadResult, uploadErr, _ = fs.doUpload(urlLocation, dataReader, fileName, contentType, nil, auth)
if uploadErr != nil {
// ...
return uploadErr
}
這個(gè)由 Master Server 所分配的上傳 URL ,實(shí)際就是 Volume Server 的上傳地址,例 http://127.0.0.1:8080/14,1f343c431d ,其中 14,1f343c431d 就是文件 ID ,其實(shí)這個(gè)文件 ID 更準(zhǔn)確地說應(yīng)該是代表一個(gè)數(shù)據(jù)塊的文件 ID。
SeaweedFS 會(huì)根據(jù) maxMB 參數(shù),來把文件拆分成多個(gè)塊存儲(chǔ),默認(rèn)大小是 4MB 。即一個(gè) 100MB 大小的文件,上傳到 SeaweedFS 后會(huì)被分成 25 個(gè)塊存儲(chǔ),也就是申請(qǐng)分配了 25 個(gè)文件 ID 。
f.maxMB = cmdFiler.Flag.Int("maxMB", 4, "split files larger than the limit")
到這里,總算捋清流程了。
那還有一個(gè) S3 接口的文件上傳呢?
不用擔(dān)心,SeaweedFS S3 只是做了一個(gè) API 的代理轉(zhuǎn)發(fā),依舊轉(zhuǎn)發(fā)到 Filer Server 自身提供的 API 接口,邏輯依舊和上面一致,代碼位置在 weed/s3api/s3api_object_handlers.go :
// 這里的 uploadUrl 實(shí)際就是 Filer Server 的地址
// 例如在名稱為 test 的 S3 Bucket 中上傳 test.txt 文件
// 則 uploadUrl 為: http://127.0.0.1:8888/buckets/test/test.txt
etag, errCode := s3a.putToFiler(r, uploadUrl, dataReader, "")
下載文件
和上傳文件一樣,SeaweedFS S3 為文件下載做了一個(gè)代理轉(zhuǎn)發(fā),轉(zhuǎn)發(fā)到 Filer Server 自身提供的 API 接口:
// 這里的 destUrl 實(shí)際就是 Filer Server 的地址
// 例如要下載 test Bucket 中的 test.txt 文件
// 則 destUrl 為: http://127.0.0.1:8888/buckets/test/test.txt
s3a.proxyToFiler(w, r, destUrl, false, passThroughResponse)
所以,當(dāng)下載一個(gè)文件時(shí):
$ curl http://127.0.0.1:8888/test.txt
hello test.txt
直接來看 weed/server/filer_server_handlers_read.go 的 GetOrHeadHandler 接口:
func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request) {
// ...
// 從 URL 中獲取文件或文件夾路徑
// 根據(jù)文件或文件夾的完整路徑從元數(shù)據(jù)數(shù)據(jù)庫中查找出 Entry 記錄(即文件的元數(shù)據(jù)信息)
// 若是文件夾,則列出文件夾下的文件
// ...
// 如果指定了 metadata=true 參數(shù),則直接返回文件或文件夾的元數(shù)據(jù)信息
if query.Get("metadata") == "true" {
// ...
return
}
// 減少服務(wù)器帶寬
// 通過 Etag 資源標(biāo)識(shí)對(duì)比資源是否發(fā)生變化
etag := filer.ETagEntry(entry)
if checkPreconditions(w, r, entry) {
// 如果資源未發(fā)生改變,則返回 304 Not Modified 響應(yīng),不返回具體的資源
// 客戶端可以直接讀取緩存中的數(shù)據(jù)
return
}
// 設(shè)置 ETag 標(biāo)識(shí)到響應(yīng)頭
setEtag(w, etag)
// ...
// 這里是用來處理獲取圖片文件的邏輯
if rangeReq := r.Header.Get("Range"); rangeReq == "" {
// ...
}
// 獲取普通文件核心邏輯
processRangeRequest(r, w, totalSize, mimeType, func(writer io.Writer, offset int64, size int64) error {
// 偏移量從請(qǐng)求頭中獲取,例 Range: bytes=100-199
// 若無指定偏移量,默認(rèn)為 0
// 判斷請(qǐng)求的范圍是否在文件的內(nèi)容大小范圍內(nèi)
if offset+size <= int64(len(entry.Content)) {
// ...
return err
}
// 從元數(shù)據(jù)數(shù)據(jù)庫獲取到的chunks信息
chunks := entry.GetChunks()
// 判斷文件是否只存在于遠(yuǎn)程存儲(chǔ)中,例如 AWS S3 、Google Cloud Storage 等
if entry.IsInRemoteOnly() {
// 將遠(yuǎn)程對(duì)象緩存到本地集群,并更新新的chunks
// ...
}
// 【核心】開始讀取文件并寫入 HTTP 響應(yīng)
// MasterClient :Master 節(jié)點(diǎn)的客戶端
// chunks :要讀取的文件數(shù)據(jù)塊列表
// offset :請(qǐng)求的文件內(nèi)容的起始位置
// size :請(qǐng)求的文件內(nèi)容的大小
// DownloadMaxBytesPs :下載速率的限制,單位是字節(jié)/秒
err = filer.StreamContentWithThrottler(fs.filer.MasterClient, writer, chunks, offset, size, fs.option.DownloadMaxBytesPs)
if err != nil {
stats.FilerRequestCounter.WithLabelValues(stats.ErrorReadStream).Inc()
glog.Errorf("failed to stream content %s: %v", r.URL, err)
}
return err
})
}
根據(jù)代碼,我們可以直接通過 metadata=true 查詢參數(shù)查看文件的元數(shù)據(jù)信息:
$ curl http://127.0.0.1:8888/test.txt?metadata=true
{"FullPath":"/test.txt","Mtime":"2023-04-23T17:18:37+08:00","Crtime":"2023-04-23T17:18:37+08:00","Mode":432,"Uid":4294967295,"Gid":4294967295,"Mime":"text/plain","TtlSec":0,"UserName":"","GroupNames":null,"SymlinkTarget":"","Md5":"wuSNy045Bd4p8mTjIc40cg==","FileSize":14,"Rdev":0,"Inode":0,"Extended":null,"chunks":[{"file_id":"14,1f343c431d","size":14,"modified_ts_ns":1682241517592601300,"e_tag":"wuSNy045Bd4p8mTjIc40cg==","fid":{"volume_id":14,"file_key":31,"cookie":876364573},"is_compressed":true}],"HardLinkId":null,"HardLinkCounter":0,"Content":null,"Remote":null,"Quota":0}
其中最重要的就是 chunks 信息,里面定義了該文件的所有數(shù)據(jù)塊信息,只要把所有數(shù)據(jù)塊拼湊一起,就可以還原出整個(gè)文件。文件大小的原因,這里剛好只有一個(gè)塊,其文件 ID 為 14,1f343c431d 。
繼續(xù)解讀文件下載的核心方法 filer.StreamContentWithThrottler ,首先獲取所有文件 ID 所對(duì)應(yīng)的 URL 列表:
// 將 chunks 轉(zhuǎn)換為視圖列表 chunkViews
chunkViews := ViewFromChunks(masterClient.GetLookupFileIdFunction(), chunks, offset, size)
fileId2Url := make(map[string][]string)
// 通過 chunkViews.Front() 獲取 chunkViews 列表的頭部元素,然后在每次迭代中將 x 移動(dòng)到下一個(gè)元素,直到遍歷完整個(gè)列表
for x := chunkViews.Front(); x != nil; x = x.Next {
// 從 x.Value 中獲取 chunkView 對(duì)象
chunkView := x.Value
var urlStrings []string
var err error
// 獲取 chunkView 對(duì)應(yīng)的文件 ID 的 URL 列表,并將 URL 列表存儲(chǔ)在 urlStrings 變量中
// 在分布式系統(tǒng)中,網(wǎng)絡(luò)故障和其他因素可能導(dǎo)致某些請(qǐng)求失敗,因此需要多次嘗試獲取 URL 列表,以提高獲取成功的概率
for _, backoff := range getLookupFileIdBackoffSchedule {
urlStrings, err = masterClient.GetLookupFileIdFunction()(chunkView.FileId)
if err == nil && len(urlStrings) > 0 {
break
}
glog.V(4).Infof("waiting for chunk: %s", chunkView.FileId)
time.Sleep(backoff)
}
// 錯(cuò)誤處理
// ...
fileId2Url[chunkView.FileId] = urlStrings
}
然后,通過獲取到的 URL 列表下載文件的所有 chunk :
// 下載速度限制器
downloadThrottler := util.NewWriteThrottler(downloadMaxBytesPs)
remaining := size
// 通過遍歷 chunkViews 列表來下載每個(gè) chunk
for x := chunkViews.Front(); x != nil; x = x.Next {
chunkView := x.Value
// 檢查文件偏移量
if offset < chunkView.ViewOffset {
// ...
}
urlStrings := fileId2Url[chunkView.FileId]
start := time.Now()
// 【核心】從 URL 列表中讀取 chunkView 的數(shù)據(jù),并將數(shù)據(jù)寫入到 writer 中給到客戶端
err := retriedStreamFetchChunkData(writer, urlStrings, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.OffsetInChunk, int(chunkView.ViewSize))
// 更新文件偏移量
offset += int64(chunkView.ViewSize)
// 更新剩余數(shù)據(jù)大小
remaining -= int64(chunkView.ViewSize)
// ...
}
// 檢查文件的所有數(shù)據(jù)是否都已經(jīng)成功下載
if remaining > 0 {
glog.V(4).Infof("zero [%d,%d)", offset, offset+remaining)
err := writeZero(writer, remaining)
if err != nil {
return fmt.Errorf("write zero [%d,%d)", offset, offset+remaining)
}
}
可以總結(jié)出,下載文件本質(zhì)也是和 Master Server 交互,通過文件 ID 獲取到對(duì)應(yīng) Volume Server 的數(shù)據(jù)塊下載地址列表,按照列表順序請(qǐng)求下載數(shù)據(jù)塊,最后重新整合成了一個(gè)完整的文件返回給客戶端。
最后,附上文件下載的流程:
參考資料
[1]seaweedfs 3.46: https://github.com/seaweedfs/seaweedfs/tree/3.46
[2]github.com/seaweedfs/raft: https://github.com/seaweedfs/raft/tree/v1.1.0
[3]wiki/Filer-Stores: https://github.com/seaweedfs/seaweedfs/wiki/Filer-Stores