自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

SeaweedFS 分布式文件系統(tǒng)源碼分析

開發(fā) 架構(gòu)
Master Server 支持多節(jié)點(diǎn)(奇數(shù))部署。使用 Raft 一致性算法來選舉 Leader 節(jié)點(diǎn),這樣可以保證在 Leader 節(jié)點(diǎn)宕機(jī)的情況下,其他節(jié)點(diǎn)可以重新選舉出新的 Leader 節(jié)點(diǎn),從而保證系統(tǒng)的高可用性。

本文基于 seaweedfs 3.46[1]

SeaweedFS 的架構(gòu)包括 Master Server、Volume Server 和 Filer Server 。

圖片

啟動(dòng) Master Server

啟動(dòng)一個(gè) Master Server 可以使用以下命令:

weed master -ip=127.0.0.1 -ip.bind=0.0.0.0

啟動(dòng)入口以及所有的參數(shù)定義在 weed/command/master.go ,默認(rèn)情況 http 監(jiān)聽端口使用 9333 ,grpc 監(jiān)聽端口則在 http 端口的基礎(chǔ)上加 10000 (所有組件的默認(rèn)規(guī)則)即 19333 :

if *masterOption.portGrpc == 0 {
 *masterOption.portGrpc = 10000 + *masterOption.port
}

Master Server 支持多節(jié)點(diǎn)(奇數(shù))部署。使用 Raft 一致性算法來選舉 Leader 節(jié)點(diǎn),這樣可以保證在 Leader 節(jié)點(diǎn)宕機(jī)的情況下,其他節(jié)點(diǎn)可以重新選舉出新的 Leader 節(jié)點(diǎn),從而保證系統(tǒng)的高可用性。

如下,啟動(dòng)一個(gè)由三個(gè) Master Server 節(jié)點(diǎn)所組成的集群:

weed master -ip=127.0.0.1 -ip.bind=0.0.0.0 -port=9333 -peers="127.0.0.1:9333,127.0.0.1:9334,127.0.0.1:9335"
weed master -ip=127.0.0.1 -ip.bind=0.0.0.0 -port=9334 -peers="127.0.0.1:9333,127.0.0.1:9334,127.0.0.1:9335"
weed master -ip=127.0.0.1 -ip.bind=0.0.0.0 -port=9335 -peers="127.0.0.1:9333,127.0.0.1:9334,127.0.0.1:9335"

當(dāng) Master Server 啟動(dòng)時(shí),它會(huì)嘗試加入集群并參與 Leader 選舉。一旦選舉完成,Leader 節(jié)點(diǎn)將負(fù)責(zé)管理整個(gè)集群以及 Volume Server 。

首先會(huì)創(chuàng)建一個(gè) Master Server 包裝的 weed_server.RaftServer 對(duì)象:

raftServer, err = weed_server.NewRaftServer(raftServerOption)
if raftServer == nil {
 glog.Fatalf("please verify %s is writable, see https://github.com/seaweedfs/seaweedfs/issues/717: %s", *masterOption.metaFolder, err)
}

在 weed_server.NewRaftServer() 方法中會(huì)創(chuàng)建好 Raft 節(jié)點(diǎn)所需的各種參數(shù)和對(duì)象,然后調(diào)用 github.com/seaweedfs/raft[2] 庫創(chuàng)建 RaftServer 對(duì)象并啟動(dòng) Raft 節(jié)點(diǎn):

type RaftServer struct {
 // 存儲(chǔ)初始節(jié)點(diǎn)信息
 peers map[string]pb.ServerAddress
 // Raft 節(jié)點(diǎn)
 raftServer raft.Server
 // HashiCorp Raft 節(jié)點(diǎn)
 RaftHashicorp *hashicorpRaft.Raft
 // 用于管理 Raft 節(jié)點(diǎn)之間的通信
 TransportManager *transport.Manager
 // Raft 節(jié)點(diǎn)的數(shù)據(jù)目錄
 dataDir string
 // Raft 節(jié)點(diǎn)的地址
 serverAddr pb.ServerAddress
 // Raft 集群的拓?fù)浣Y(jié)構(gòu)
 topo *topology.Topology
 // Raft 節(jié)點(diǎn)的 gRPC 服務(wù)
 *raft.GrpcServer
}

func NewRaftServer(option *RaftServerOption) (*RaftServer, error) {
 // 通過 option 創(chuàng)建一個(gè) RaftServer 對(duì)象 s
 s := &RaftServer{
  peers:      option.Peers,
  serverAddr: option.ServerAddr,
  dataDir:    option.DataDir,
  topo:       option.Topo,
 }

 //...

 // 調(diào)用 github.com/seaweedfs/raft 庫,創(chuàng)建 RaftServer 對(duì)象
 s.raftServer, err = raft.NewServer(string(s.serverAddr), s.dataDir, transporter, stateMachine, option.Topo, "")

 //...

 // 啟動(dòng) Raft 節(jié)點(diǎn)
 if err := s.raftServer.Start(); err != nil {
  return nil, err
 }

 // 將節(jié)點(diǎn)加入到 Raft 集群中
 for name, peer := range s.peers {
  if err := s.raftServer.AddPeer(name, peer.ToGrpcAddress()); err != nil {
   return nil, err
  }
 }

 //...

 glog.V(0).Infof("current cluster leader: %v", s.raftServer.Leader())

 return s, nil
}

最后,會(huì)打印出當(dāng)前的 Leader 節(jié)點(diǎn),如果對(duì) Raft 選舉算法的處理細(xì)節(jié)感興趣,可以繼續(xù)深入 s.raftServer.Start() 的實(shí)現(xiàn)。

Raft 節(jié)點(diǎn)啟動(dòng)成功后,Master Server 會(huì)注冊(cè)一些集群相關(guān)的接口,方便查看集群狀態(tài):

r.HandleFunc("/cluster/status", raftServer.StatusHandler).Methods("GET")
r.HandleFunc("/cluster/healthz", raftServer.HealthzHandler).Methods("GET", "HEAD")
if *masterOption.raftHashicorp {
 r.HandleFunc("/raft/stats", raftServer.StatsRaftHandler).Methods("GET")
}

請(qǐng)求如下:

$ curl http://127.0.0.1:9333/cluster/status
{"IsLeader":true,"Leader":"127.0.0.1:9333","Peers":["127.0.0.1:9335","127.0.0.1:9334"]}
$ curl http://127.0.0.1:9334/cluster/status
{"Leader":"127.0.0.1:9333","Peers":["127.0.0.1:9335","127.0.0.1:9333"]}
$ curl http://127.0.0.1:9335/cluster/status
{"Leader":"127.0.0.1:9333","Peers":["127.0.0.1:9333","127.0.0.1:9334"]}

啟動(dòng) Volume Server

啟動(dòng)一個(gè) Volume Server 可以使用以下命令:

weed volume -mserver="127.0.0.1:9333" -dir=data -ip=127.0.0.1 -ip.bind=0.0.0.0

啟動(dòng)入口以及所有的參數(shù)定義在 weed/command/volume.go ,默認(rèn)情況 http 監(jiān)聽端口使用 8080 ,grpc 監(jiān)聽端口使用 18080 。

其中,-mserver 為 Master Server 連接地址,當(dāng)需要連接的 Master Server 為集群時(shí),可以將多個(gè) Master Server 的連接地址用逗號(hào)分隔; -dir 則用來指定 Volume Server 存儲(chǔ)數(shù)據(jù)文件的目錄。

和 Master Server 不同,Volume Server 支持橫向擴(kuò)展,其節(jié)點(diǎn)數(shù)量規(guī)??梢噪S著數(shù)據(jù)量和性能需求的變化而隨時(shí)動(dòng)態(tài)調(diào)整。

一旦 Volume Server 啟動(dòng)后,就會(huì)與 Master Server 保持通信,匯報(bào)自身的狀態(tài),并根據(jù) Master Server 的指示執(zhí)行創(chuàng)建、刪除、修復(fù)等操作。

核心邏輯在 weed/server/volume_grpc_client_to_master.go 的 VolumeServer.doHeartbeat 方法。

首先會(huì)創(chuàng)建一個(gè) Master Server 的 gRPC 連接客戶端,并使用該客戶端調(diào)用 SendHeartbeat 方法:

// 創(chuàng)建 Master Server 的 gRPC 連接客戶端
client := master_pb.NewSeaweedClient(grpcConnection)
// 調(diào)用 SendHeartbeat
stream, err := client.SendHeartbeat(ctx)
if err != nil {
 glog.V(0).Infof("SendHeartbeat to %s: %v", masterAddress, err)
 return "", err
}

SendHeartbeat 方法是一個(gè)雙向流式 RPC ,允許在一次調(diào)用中發(fā)送多個(gè)請(qǐng)求和響應(yīng),其 ProtoBuf 定義如下:

rpc SendHeartbeat (stream Heartbeat) returns (stream HeartbeatResponse) {
}

接著創(chuàng)建一個(gè) goroutine 用來處理從 Master Server 發(fā)送過來的 Heartbeat 請(qǐng)求:

go func() {
 for {
  // 從輸入流中讀取 Heartbeat 請(qǐng)求
  in, err := stream.Recv()
  if err != nil {
   doneChan <- err
   return
  }
  // ...

  // 如果 Heartbeat 請(qǐng)求中包含了卷大小限制,并且該限制和當(dāng)前 Volume Server 中保存的限制不同
  if in.GetVolumeSizeLimit() != 0 && vs.store.GetVolumeSizeLimit() != in.GetVolumeSizeLimit() {
   // 將 Volume Server 中保存的限制更新為 Heartbeat 請(qǐng)求中的限制
   vs.store.SetVolumeSizeLimit(in.GetVolumeSizeLimit())
   // 調(diào)用 vs.store.MaybeAdjustVolumeMax() 方法重新計(jì)算卷的最大容量
   if vs.store.MaybeAdjustVolumeMax() {
    // 如果計(jì)算結(jié)果發(fā)生了變化,則使用 stream.Send() 方法向 Master Server 發(fā)送 Heartbeat 響應(yīng)
    if err = stream.Send(vs.store.CollectHeartbeat()); err != nil {
     glog.V(0).Infof("Volume Server Failed to talk with master %s: %v", vs.currentMaster, err)
     return
    }
   }
  }
  // 如果 Heartbeat 請(qǐng)求中包含了新的 Master Server 地址,并且該地址和當(dāng)前地址不同
  if in.GetLeader() != "" && string(vs.currentMaster) != in.GetLeader() {
   // 通知主函數(shù)切換新的 Master Server 地址作為 Leader
   glog.V(0).Infof("Volume Server found a new master newLeader: %v instead of %v", in.GetLeader(), vs.currentMaster)
   newLeader = pb.ServerAddress(in.GetLeader())
   doneChan <- nil
   return
  }
 }
}()

最后使用一個(gè) for select 來監(jiān)聽來自 Volume Server 存儲(chǔ)層的四個(gè)通道:NewVolumesChan、NewEcShardsChan、DeletedVolumesChan 和 DeletedEcShardsChan。每當(dāng)有新的卷或 EC 分片被創(chuàng)建或刪除時(shí),會(huì)生成一個(gè) Heartbeat 消息,并使用 stream.Send() 方法將其發(fā)送到 Master Server ,同時(shí)也會(huì)定期發(fā)送心跳消息給 Master Server :

for {
 select {
 // 有新的卷被創(chuàng)建
 case volumeMessage := <-vs.store.NewVolumesChan:
  // ...
  // 通知 Master Server
  glog.V(0).Infof("volume server %s:%d adds volume %d", vs.store.Ip, vs.store.Port, volumeMessage.Id)
  if err = stream.Send(deltaBeat); err != nil {
   glog.V(0).Infof("Volume Server Failed to update to master %s: %v", masterAddress, err)
   return "", err
  }
 // 有新的 EC 分片被創(chuàng)建
 case ecShardMessage := <-vs.store.NewEcShardsChan:
  // ...
  // 通知 Master Server
  glog.V(0).Infof("volume server %s:%d adds ec shard %d:%d", vs.store.Ip, vs.store.Port, ecShardMessage.Id,
   erasure_coding.ShardBits(ecShardMessage.EcIndexBits).ShardIds())
  if err = stream.Send(deltaBeat); err != nil {
   glog.V(0).Infof("Volume Server Failed to update to master %s: %v", masterAddress, err)
   return "", err
  }
 // 有卷被刪除
 case volumeMessage := <-vs.store.DeletedVolumesChan:
  // ...
  // 通知 Master Server
  glog.V(0).Infof("volume server %s:%d deletes volume %d", vs.store.Ip, vs.store.Port, volumeMessage.Id)
  if err = stream.Send(deltaBeat); err != nil {
   glog.V(0).Infof("Volume Server Failed to update to master %s: %v", masterAddress, err)
   return "", err
  }
 // 有 EC 分片被刪除
 case ecShardMessage := <-vs.store.DeletedEcShardsChan:
  // ...
  // 通知 Master Server
  glog.V(0).Infof("volume server %s:%d deletes ec shard %d:%d", vs.store.Ip, vs.store.Port, ecShardMessage.Id,
   erasure_coding.ShardBits(ecShardMessage.EcIndexBits).ShardIds())
  if err = stream.Send(deltaBeat); err != nil {
   glog.V(0).Infof("Volume Server Failed to update to master %s: %v", masterAddress, err)
   return "", err
  }
 // 發(fā)送卷信息的心跳消息
 case <-volumeTickChan.C:
  glog.V(4).Infof("volume server %s:%d heartbeat", vs.store.Ip, vs.store.Port)
  vs.store.MaybeAdjustVolumeMax()
  if err = stream.Send(vs.store.CollectHeartbeat()); err != nil {
   glog.V(0).Infof("Volume Server Failed to talk with master %s: %v", masterAddress, err)
   return "", err
  }
 // 發(fā)送 EC 分片信息的心跳消息
 case <-ecShardTickChan.C:
  glog.V(4).Infof("volume server %s:%d ec heartbeat", vs.store.Ip, vs.store.Port)
  if err = stream.Send(vs.store.CollectErasureCodingHeartbeat()); err != nil {
   glog.V(0).Infof("Volume Server Failed to talk with master %s: %v", masterAddress, err)
   return "", err
  }
 // Volume Server 停止,退出監(jiān)聽
 case err = <-doneChan:
  return
 // 用于在 Volume Server 停止時(shí)發(fā)送最終的心跳消息
 case <-vs.stopChan:
  // ...
  glog.V(1).Infof("volume server %s:%d stops and deletes all volumes", vs.store.Ip, vs.store.Port)
  if err = stream.Send(emptyBeat); err != nil {
   glog.V(0).Infof("Volume Server Failed to update to master %s: %v", masterAddress, err)
   return "", err
  }
  return
 }
}

啟動(dòng) Filer Server

啟動(dòng)一個(gè) Filer Server 可以使用以下命令:

weed filer -s3 -master="127.0.0.1:9333" -ip=127.0.0.1 -ip.bind=0.0.0.0

啟動(dòng)入口以及所有的參數(shù)定義在 weed/command/filer.go ,默認(rèn)情況 http 監(jiān)聽端口使用 8888 ,grpc 監(jiān)聽端口使用 18888 。

在這里,-master 為 Master Server 連接地址,同樣地,當(dāng)需要連接的 Master Server 為集群時(shí),可以將多個(gè) Master Server 的連接地址用逗號(hào)分隔; -s3 則代表要啟動(dòng) S3 網(wǎng)關(guān)功能,默認(rèn)監(jiān)聽 8333 端口。

Filer Server 可以理解為一個(gè)文件管理器,通過向下對(duì)接 Volume Server 與 Master Server,對(duì)外提供豐富的功能與特性,除了自身提供的 API 接口,還支持?jǐn)U展其它比如 POSIX ,WebDAV,S3 等的文件操作接口。

Filer Server 通過外部數(shù)據(jù)庫存儲(chǔ)文件的元數(shù)據(jù)信息。默認(rèn)情況下,使用的是 leveldb ,支持替換為其它流行的數(shù)據(jù)庫,例如 Sqlite、MySql、Etcd 等,具體可以參考 wiki/Filer-Stores[3] 。

作為一個(gè) API Server ,F(xiàn)iler Server 在架構(gòu)上就是一個(gè)服務(wù)端+數(shù)據(jù)庫模型,其節(jié)點(diǎn)的數(shù)量和規(guī)??梢愿鶕?jù)不同的工作負(fù)載和使用情況進(jìn)行優(yōu)化和調(diào)整。

上傳文件

首先分析 Filer Server 自身提供的 API 接口,上傳文件可以直接調(diào)用 :

$ curl -F "file_name=@test.txt" -X POST "http://127.0.0.1:8888"
{"name":"test.txt","size":14}

文件上傳的接口定義在 weed/server/filer_server_handlers_write.go 的 PostHandler 方法:

func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request, contentLength int64) {
 // 解析請(qǐng)求的目標(biāo)路徑
 // ...
 // 解析請(qǐng)求的查詢參數(shù),用于確定文件的存儲(chǔ)位置和屬性
 // ...
 if query.Has("mv.from") {
  // 若查詢參數(shù)中出現(xiàn) mv.from ,則進(jìn)行文件移動(dòng)操作
  fs.move(ctx, w, r, so)
 } else {
  // 文件上傳操作,自動(dòng)分塊
  fs.autoChunk(ctx, w, r, contentLength, so)
 }

 util.CloseRequest(r)
}

跟蹤到 fs.autoChunk 方法:

func (fs *FilerServer) autoChunk(ctx context.Context, w http.ResponseWriter, r *http.Request, contentLength int64, so *operation.StorageOption) {
 //...

 if r.Method == "POST" {
  // 上傳文件
  if r.Header.Get("Content-Type") == "" && strings.HasSuffix(r.URL.Path, "/") {
   reply, err = fs.mkdir(ctx, w, r)
  } else {
   // 自動(dòng)分塊上傳
   reply, md5bytes, err = fs.doPostAutoChunk(ctx, w, r, chunkSize, contentLength, so)
  }
 } else {
  // 創(chuàng)建目錄
  reply, md5bytes, err = fs.doPutAutoChunk(ctx, w, r, chunkSize, contentLength, so)
 }

 //...
}

繼續(xù)來到 fs.doPostAutoChunk 方法:

func (fs *FilerServer) doPostAutoChunk(ctx context.Context, w http.ResponseWriter, r *http.Request, chunkSize int32, contentLength int64, so *operation.StorageOption) (filerResult *FilerPostResult, md5bytes []byte, replyerr error) {

 // 讀取上傳的文件內(nèi)容
 multipartReader, multipartReaderErr := r.MultipartReader()
 if multipartReaderErr != nil {
  return nil, nil, multipartReaderErr
 }

 // 讀取第一個(gè)分塊,在這里,我們只需要讀取第一個(gè)分塊,即上傳文件的內(nèi)容的分塊
 part1, part1Err := multipartReader.NextPart()
 if part1Err != nil {
  return nil, nil, part1Err
 }

 // 獲取文件名和 Content-Type
 fileName := part1.FileName()
 if fileName != "" {
  fileName = path.Base(fileName)
 }
 contentType := part1.Header.Get("Content-Type")
 if contentType == "application/octet-stream" {
  contentType = ""
 }

 // 核心邏輯
 // 將上傳的文件內(nèi)容轉(zhuǎn)換為文件分塊,并返回文件分塊的相關(guān)信息
 fileChunks, md5Hash, chunkOffset, err, smallContent := fs.uploadReaderToChunks(w, r, part1, chunkSize, fileName, contentType, contentLength, so)
 if err != nil {
  return nil, nil, err
 }

 // 計(jì)算文件內(nèi)容的 MD5 值
 md5bytes = md5Hash.Sum(nil)
 // 保存文件元數(shù)據(jù)信息
 filerResult, replyerr = fs.saveMetaData(ctx, r, fileName, contentType, so, md5bytes, fileChunks, chunkOffset, smallContent)
 if replyerr != nil {
  fs.filer.DeleteChunks(fileChunks)
 }

 return
}

這些都比較好讀,繼續(xù)跟蹤到核心邏輯處 fs.uploadReaderToChunks ,方法內(nèi)首先會(huì)進(jìn)行一些正確性校驗(yàn)和必要變量的初始化,然后開啟一個(gè)循環(huán),不斷讀取數(shù)據(jù)并將其轉(zhuǎn)換為一個(gè)或多個(gè) Chunk :

func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Request, reader io.Reader, chunkSize int32, fileName, contentType string, contentLength int64, so *operation.StorageOption) (fileChunks []*filer_pb.FileChunk, md5Hash hash.Hash, chunkOffset int64, uploadErr error, smallContent []byte) {
 // ...一系列操作
 // 進(jìn)行一些正確性校驗(yàn)和必要變量的初始化

 for {

  // 使用對(duì)象池機(jī)制限制 bytes.Buffer 對(duì)象的數(shù)量,優(yōu)化內(nèi)存占用
  bytesBufferLimitCond.L.Lock()
  for atomic.LoadInt64(&bytesBufferCounter) >= 4 {
   glog.V(4).Infof("waiting for byte buffer %d", atomic.LoadInt64(&bytesBufferCounter))
   bytesBufferLimitCond.Wait()
  }
  atomic.AddInt64(&bytesBufferCounter, 1)
  bytesBufferLimitCond.L.Unlock()

  bytesBuffer := bufPool.Get().(*bytes.Buffer)
  glog.V(4).Infof("received byte buffer %d", atomic.LoadInt64(&bytesBufferCounter))

  // 【關(guān)鍵】分塊操作,每個(gè)塊就是一個(gè) bytes.Buffer
  // 根據(jù) chunkSize 從 partReader 中讀取數(shù)據(jù),并將讀取的數(shù)據(jù)保存到 bytes.Buffer 對(duì)象中
  limitedReader := io.LimitReader(partReader, int64(chunkSize))

  bytesBuffer.Reset()

  dataSize, err := bytesBuffer.ReadFrom(limitedReader)

  // 處理讀取數(shù)據(jù)時(shí)可能出現(xiàn)的錯(cuò)誤,以及在讀取完整個(gè)文件時(shí)的處理
  // ...

  wg.Add(1)
  // 開啟 goroutine 處理
  go func(offset int64) {
   defer func() {
    // 將 bytes.Buffer 對(duì)象歸還對(duì)象池
    bufPool.Put(bytesBuffer)
    atomic.AddInt64(&bytesBufferCounter, -1)
    // 通知其他 goroutine 可以使用更多的 bytes.Buffer 對(duì)象
    bytesBufferLimitCond.Signal()
    wg.Done()
   }()

   // 【關(guān)鍵】上傳數(shù)據(jù)塊
   chunks, toChunkErr := fs.dataToChunk(fileName, contentType, bytesBuffer.Bytes(), offset, so)

   if toChunkErr != nil {
    // 記錄上傳錯(cuò)誤
    uploadErrLock.Lock()
    if uploadErr == nil {
     uploadErr = toChunkErr
    }
    uploadErrLock.Unlock()
   }
   if chunks != nil {
    fileChunksLock.Lock()
    fileChunksSize := len(fileChunks) + len(chunks)
    for _, chunk := range chunks {
     // 【關(guān)鍵】將當(dāng)前上傳的數(shù)據(jù)塊添加到 fileChunks 列表中
     fileChunks = append(fileChunks, chunk)
     glog.V(4).Infof("uploaded %s chunk %d to %s [%d,%d)", fileName, fileChunksSize, chunk.FileId, offset, offset+int64(chunk.Size))
    }
    fileChunksLock.Unlock()
   }
  }(chunkOffset)

  // 更新已經(jīng)讀取的數(shù)據(jù)塊的大小
  chunkOffset = chunkOffset + dataSize

  if dataSize < int64(chunkSize) {
   // 已經(jīng)讀取完整個(gè)文件
   break
  }
 }

 wg.Wait()

 if uploadErr != nil {
  // 上傳出錯(cuò),刪除 fileChunks
  fs.filer.DeleteChunks(fileChunks)
  return nil, md5Hash, 0, uploadErr, nil
 }
 // 【關(guān)鍵】對(duì)已經(jīng)上傳的數(shù)據(jù)塊,即 fileChunks 進(jìn)行排序,以便后續(xù)可以正確地進(jìn)行數(shù)據(jù)合并
 slices.SortFunc(fileChunks, func(a, b *filer_pb.FileChunk) bool {
  return a.Offset < b.Offset
 })
 // 返回 fileChunks 給調(diào)用方保存
 return fileChunks, md5Hash, chunkOffset, nil, smallContent
}

文件的分塊操作都是在 Filer Server 完成的。而其中上傳數(shù)據(jù)塊的 fs.dataToChunk 方法會(huì)與 Master Server 進(jìn)行交互。

該方法首先會(huì)調(diào)用 fs.assignNewFileInfo 向 Master Server 請(qǐng)求分配一個(gè)新的文件 ID(fid)以及上傳 URL :

fileId, urlLocation, auth, uploadErr = fs.assignNewFileInfo(so)
if uploadErr != nil {
 // ...
 return uploadErr
}

然后使用分配的 fid 調(diào)用上傳 URL 上傳數(shù)據(jù)塊:

uploadResult, uploadErr, _ = fs.doUpload(urlLocation, dataReader, fileName, contentType, nil, auth)
if uploadErr != nil {
 // ...
 return uploadErr
}

這個(gè)由 Master Server 所分配的上傳 URL ,實(shí)際就是 Volume Server 的上傳地址,例 http://127.0.0.1:8080/14,1f343c431d ,其中 14,1f343c431d 就是文件 ID ,其實(shí)這個(gè)文件 ID 更準(zhǔn)確地說應(yīng)該是代表一個(gè)數(shù)據(jù)塊的文件 ID。

SeaweedFS 會(huì)根據(jù) maxMB 參數(shù),來把文件拆分成多個(gè)塊存儲(chǔ),默認(rèn)大小是 4MB 。即一個(gè) 100MB 大小的文件,上傳到 SeaweedFS 后會(huì)被分成 25 個(gè)塊存儲(chǔ),也就是申請(qǐng)分配了 25 個(gè)文件 ID 。

f.maxMB = cmdFiler.Flag.Int("maxMB", 4, "split files larger than the limit")

到這里,總算捋清流程了。

那還有一個(gè) S3 接口的文件上傳呢?

不用擔(dān)心,SeaweedFS S3 只是做了一個(gè) API 的代理轉(zhuǎn)發(fā),依舊轉(zhuǎn)發(fā)到 Filer Server 自身提供的 API 接口,邏輯依舊和上面一致,代碼位置在 weed/s3api/s3api_object_handlers.go :

// 這里的 uploadUrl 實(shí)際就是 Filer Server 的地址
// 例如在名稱為 test 的 S3 Bucket 中上傳 test.txt 文件
// 則 uploadUrl 為: http://127.0.0.1:8888/buckets/test/test.txt
etag, errCode := s3a.putToFiler(r, uploadUrl, dataReader, "")

下載文件

和上傳文件一樣,SeaweedFS S3 為文件下載做了一個(gè)代理轉(zhuǎn)發(fā),轉(zhuǎn)發(fā)到 Filer Server 自身提供的 API 接口:

// 這里的 destUrl 實(shí)際就是 Filer Server 的地址
// 例如要下載 test Bucket 中的 test.txt 文件
// 則 destUrl 為: http://127.0.0.1:8888/buckets/test/test.txt
s3a.proxyToFiler(w, r, destUrl, false, passThroughResponse)

所以,當(dāng)下載一個(gè)文件時(shí):

$ curl http://127.0.0.1:8888/test.txt
hello test.txt

直接來看 weed/server/filer_server_handlers_read.go 的 GetOrHeadHandler 接口:

func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request) {

 // ...
 // 從 URL 中獲取文件或文件夾路徑

 // 根據(jù)文件或文件夾的完整路徑從元數(shù)據(jù)數(shù)據(jù)庫中查找出 Entry 記錄(即文件的元數(shù)據(jù)信息)

 // 若是文件夾,則列出文件夾下的文件
 // ...

 // 如果指定了 metadata=true 參數(shù),則直接返回文件或文件夾的元數(shù)據(jù)信息
 if query.Get("metadata") == "true" {
  // ...
  return
 }

 // 減少服務(wù)器帶寬
 // 通過 Etag 資源標(biāo)識(shí)對(duì)比資源是否發(fā)生變化
 etag := filer.ETagEntry(entry)
 if checkPreconditions(w, r, entry) {
  // 如果資源未發(fā)生改變,則返回 304 Not Modified 響應(yīng),不返回具體的資源
  // 客戶端可以直接讀取緩存中的數(shù)據(jù)
  return
 }

 // 設(shè)置 ETag 標(biāo)識(shí)到響應(yīng)頭
 setEtag(w, etag)

 // ...

 // 這里是用來處理獲取圖片文件的邏輯
 if rangeReq := r.Header.Get("Range"); rangeReq == "" {
  // ...
 }

 // 獲取普通文件核心邏輯
 processRangeRequest(r, w, totalSize, mimeType, func(writer io.Writer, offset int64, size int64) error {
  // 偏移量從請(qǐng)求頭中獲取,例 Range: bytes=100-199
  // 若無指定偏移量,默認(rèn)為 0
  // 判斷請(qǐng)求的范圍是否在文件的內(nèi)容大小范圍內(nèi)
  if offset+size <= int64(len(entry.Content)) {
   // ...
   return err
  }
  // 從元數(shù)據(jù)數(shù)據(jù)庫獲取到的chunks信息
  chunks := entry.GetChunks()
  // 判斷文件是否只存在于遠(yuǎn)程存儲(chǔ)中,例如 AWS S3 、Google Cloud Storage 等
  if entry.IsInRemoteOnly() {
   // 將遠(yuǎn)程對(duì)象緩存到本地集群,并更新新的chunks
   // ...
  }

  // 【核心】開始讀取文件并寫入 HTTP 響應(yīng)
  // MasterClient :Master 節(jié)點(diǎn)的客戶端
  // chunks :要讀取的文件數(shù)據(jù)塊列表
  // offset :請(qǐng)求的文件內(nèi)容的起始位置
  // size :請(qǐng)求的文件內(nèi)容的大小
  // DownloadMaxBytesPs :下載速率的限制,單位是字節(jié)/秒
  err = filer.StreamContentWithThrottler(fs.filer.MasterClient, writer, chunks, offset, size, fs.option.DownloadMaxBytesPs)
  if err != nil {
   stats.FilerRequestCounter.WithLabelValues(stats.ErrorReadStream).Inc()
   glog.Errorf("failed to stream content %s: %v", r.URL, err)
  }
  return err
 })
}

根據(jù)代碼,我們可以直接通過 metadata=true 查詢參數(shù)查看文件的元數(shù)據(jù)信息:

$ curl http://127.0.0.1:8888/test.txt?metadata=true
{"FullPath":"/test.txt","Mtime":"2023-04-23T17:18:37+08:00","Crtime":"2023-04-23T17:18:37+08:00","Mode":432,"Uid":4294967295,"Gid":4294967295,"Mime":"text/plain","TtlSec":0,"UserName":"","GroupNames":null,"SymlinkTarget":"","Md5":"wuSNy045Bd4p8mTjIc40cg==","FileSize":14,"Rdev":0,"Inode":0,"Extended":null,"chunks":[{"file_id":"14,1f343c431d","size":14,"modified_ts_ns":1682241517592601300,"e_tag":"wuSNy045Bd4p8mTjIc40cg==","fid":{"volume_id":14,"file_key":31,"cookie":876364573},"is_compressed":true}],"HardLinkId":null,"HardLinkCounter":0,"Content":null,"Remote":null,"Quota":0}

其中最重要的就是 chunks 信息,里面定義了該文件的所有數(shù)據(jù)塊信息,只要把所有數(shù)據(jù)塊拼湊一起,就可以還原出整個(gè)文件。文件大小的原因,這里剛好只有一個(gè)塊,其文件 ID 為 14,1f343c431d 。

繼續(xù)解讀文件下載的核心方法 filer.StreamContentWithThrottler ,首先獲取所有文件 ID 所對(duì)應(yīng)的 URL 列表:

// 將 chunks 轉(zhuǎn)換為視圖列表 chunkViews
chunkViews := ViewFromChunks(masterClient.GetLookupFileIdFunction(), chunks, offset, size)

fileId2Url := make(map[string][]string)

// 通過 chunkViews.Front() 獲取 chunkViews 列表的頭部元素,然后在每次迭代中將 x 移動(dòng)到下一個(gè)元素,直到遍歷完整個(gè)列表
for x := chunkViews.Front(); x != nil; x = x.Next {
 // 從 x.Value 中獲取 chunkView 對(duì)象
 chunkView := x.Value
 var urlStrings []string
 var err error
 // 獲取 chunkView 對(duì)應(yīng)的文件 ID 的 URL 列表,并將 URL 列表存儲(chǔ)在 urlStrings 變量中
 // 在分布式系統(tǒng)中,網(wǎng)絡(luò)故障和其他因素可能導(dǎo)致某些請(qǐng)求失敗,因此需要多次嘗試獲取 URL 列表,以提高獲取成功的概率
 for _, backoff := range getLookupFileIdBackoffSchedule {
  urlStrings, err = masterClient.GetLookupFileIdFunction()(chunkView.FileId)
  if err == nil && len(urlStrings) > 0 {
   break
  }
  glog.V(4).Infof("waiting for chunk: %s", chunkView.FileId)
  time.Sleep(backoff)
 }
 // 錯(cuò)誤處理
 // ...
 fileId2Url[chunkView.FileId] = urlStrings
}

然后,通過獲取到的 URL 列表下載文件的所有 chunk :

// 下載速度限制器
downloadThrottler := util.NewWriteThrottler(downloadMaxBytesPs)
remaining := size
// 通過遍歷 chunkViews 列表來下載每個(gè) chunk
for x := chunkViews.Front(); x != nil; x = x.Next {
 chunkView := x.Value
 // 檢查文件偏移量
 if offset < chunkView.ViewOffset {
  // ...
 }
 urlStrings := fileId2Url[chunkView.FileId]
 start := time.Now()
 // 【核心】從 URL 列表中讀取 chunkView 的數(shù)據(jù),并將數(shù)據(jù)寫入到 writer 中給到客戶端
 err := retriedStreamFetchChunkData(writer, urlStrings, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.OffsetInChunk, int(chunkView.ViewSize))
 // 更新文件偏移量
 offset += int64(chunkView.ViewSize)
 // 更新剩余數(shù)據(jù)大小
 remaining -= int64(chunkView.ViewSize)
 // ...
}
// 檢查文件的所有數(shù)據(jù)是否都已經(jīng)成功下載
if remaining > 0 {
 glog.V(4).Infof("zero [%d,%d)", offset, offset+remaining)
 err := writeZero(writer, remaining)
 if err != nil {
  return fmt.Errorf("write zero [%d,%d)", offset, offset+remaining)
 }
}

可以總結(jié)出,下載文件本質(zhì)也是和 Master Server 交互,通過文件 ID 獲取到對(duì)應(yīng) Volume Server 的數(shù)據(jù)塊下載地址列表,按照列表順序請(qǐng)求下載數(shù)據(jù)塊,最后重新整合成了一個(gè)完整的文件返回給客戶端。

最后,附上文件下載的流程:

圖片

參考資料

[1]seaweedfs 3.46: https://github.com/seaweedfs/seaweedfs/tree/3.46

[2]github.com/seaweedfs/raft: https://github.com/seaweedfs/raft/tree/v1.1.0

[3]wiki/Filer-Stores: https://github.com/seaweedfs/seaweedfs/wiki/Filer-Stores

責(zé)任編輯:武曉燕 來源: gopher云原生
相關(guān)推薦

2010-11-01 05:50:46

分布式文件系統(tǒng)

2017-10-17 08:33:31

存儲(chǔ)系統(tǒng)分布式

2010-06-04 18:45:43

Hadoop分布式文件

2012-09-19 13:43:13

OpenAFS分布式文件系統(tǒng)

2012-09-19 15:05:24

MogileFS分布式文件系統(tǒng)

2010-11-15 13:24:07

分布式文件系統(tǒng)

2013-01-07 10:29:31

大數(shù)據(jù)

2012-08-31 16:04:11

HDFS分布式文件系統(tǒng)

2013-06-18 14:00:59

HDFS分布式文件系統(tǒng)

2013-05-27 14:46:06

文件系統(tǒng)分布式文件系統(tǒng)

2012-07-20 14:40:22

2022-09-13 07:51:08

JuiceFS分布式文件系統(tǒng)

2011-07-15 17:48:27

Platform

2020-01-03 08:33:57

Ceph硬件系統(tǒng)

2011-03-16 14:23:38

分布式文件

2012-05-10 15:23:53

分布式文件系統(tǒng)測(cè)試

2012-10-09 16:43:47

FastDFS分布式文件系統(tǒng)

2012-05-10 14:04:07

分布式文件系統(tǒng)架構(gòu)

2013-01-09 10:16:09

HDFS

2021-04-13 08:06:17

分布式HDFS存儲(chǔ)大型數(shù)據(jù)
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)