Go項(xiàng)目實(shí)戰(zhàn):一步步構(gòu)建一個(gè)并發(fā)文件下載器
大家好,我是 polarisxu。
今天為大家?guī)硪粋€(gè)實(shí)戰(zhàn)項(xiàng)目。建議你一定要?jiǎng)邮謱?shí)踐。
在往下看之前,你不妨思考下,用 Go 如何實(shí)現(xiàn)一個(gè)并發(fā)下載器。
01 原理
對(duì)于服務(wù)器上的某個(gè)文件,我們要并發(fā)下載到本地,很容易想到,應(yīng)該將文件分成多個(gè)部分,然后開多個(gè) goroutine 并發(fā)地去下載,最后將這多個(gè)部分合并成一個(gè)文件,實(shí)現(xiàn)并發(fā)下載的目的。
現(xiàn)在的問題是,服務(wù)器上的一個(gè)文件,我們怎么做到分成多個(gè)呢?
這需要 HTTP 協(xié)議相關(guān)知識(shí)了。
HTTP 協(xié)議有一個(gè)響應(yīng)頭:Accept-Ranges,服務(wù)器通過該頭來標(biāo)識(shí)自身支持部分請(qǐng)求(partial requests),也叫范圍請(qǐng)求。如果服務(wù)端支持部分請(qǐng)求,我們就可以實(shí)現(xiàn)并發(fā)下載。該頭有兩個(gè)可能的值:
- Accept-Ranges: bytes
- Accept-Ranges: none
- none:不支持任何部分請(qǐng)求單位,由于其等同于沒有返回此頭部,因此很少使用。不過一些瀏覽器,比如 IE9,會(huì)依據(jù)該頭部去禁用或者移除下載管理器的暫停按鈕。
- bytes:部分請(qǐng)求的單位是 bytes (字節(jié))。
所以,我們在并發(fā)下載之前,應(yīng)該先發(fā)起一個(gè) Head 請(qǐng)求,來確認(rèn)服務(wù)端是否支持部分請(qǐng)求。比如:
- resp, err := http.Head("https://studygolang.com/dl/golang/go1.16.5.src.tar.gz")
- if err != nil {
- return err
- }
- if resp.StatusCode == http.StatusOK && resp.Header.Get("Accept-Ranges") == "bytes" {
- // 支持部分請(qǐng)求
- }
確認(rèn)了服務(wù)器支持部分請(qǐng)求,接下來就是如何進(jìn)行部分請(qǐng)求。
這就用到 HTTP 的一個(gè)請(qǐng)求頭部:Range。(詳情參考: https://developer.mozilla.org/zh-CN/docs/Web/HTTP/Headers/Range )
Range 告知服務(wù)器返回文件的哪一部分。在一個(gè) Range 頭部中,可以一次性請(qǐng)求多個(gè)部分,服務(wù)器會(huì)以 multipart 文件的形式將其返回。如果服務(wù)器返回的是范圍響應(yīng),需要使用 206 Partial Content 狀態(tài)碼。假如所請(qǐng)求的范圍不合法,那么服務(wù)器會(huì)返回 416 Range Not Satisfiable 狀態(tài)碼,表示客戶端錯(cuò)誤。服務(wù)器允許忽略 Range 首部,從而返回整個(gè)文件,狀態(tài)碼用 200。
具體語法:
- Range: <unit>=<range-start>-
- Range: <unit>=<range-start>-<range-end>
- Range: <unit>=<range-start>-<range-end>, <range-start>-<range-end>
- Range: <unit>=<range-start>-<range-end>, <range-start>-<range-end>, <range-start>-<range-end>
-
<unit>
范圍所采用的單位,通常是字節(jié)(bytes)。
-
<range-start>
一個(gè)整數(shù),表示在特定單位下,范圍的起始值。
-
<range-end>
一個(gè)整數(shù),表示在特定單位下,范圍的結(jié)束值。這個(gè)值是可選的,如果不存在,表示此范圍一直延伸到文檔結(jié)束。
例如:
- Range: bytes=200-1000, 2000-6576, 19000-
掌握了以上知識(shí)點(diǎn),最后要做的就是將下載下來的各個(gè)部分合并成一個(gè)文件。需要注意各個(gè)部分的順序,比如根據(jù)順序,按 1、2、3 等編號(hào)。
02 動(dòng)手實(shí)現(xiàn)一個(gè)
知道了原理不代表你真的就會(huì)了,我們應(yīng)該實(shí)際動(dòng)手實(shí)現(xiàn)一個(gè),加深理解。
在本地某個(gè)目錄下創(chuàng)建目錄:downloader。
- $ mkdir downloader
- $ cd downloader
- $ go mod init github.com/polaris1119/downloader
命令行參數(shù)控制
為了讓工具更好用,我們應(yīng)該支持命令行參數(shù),而不是代碼寫死一個(gè),比如要下載的 URL、并發(fā)數(shù)、輸出的文件名等。關(guān)于命令行參數(shù)控制,除了使用標(biāo)準(zhǔn)庫 flag,我比較喜歡 github.com/urfave/cli,最新版本 v2。
創(chuàng)建一個(gè)文件 main.go,內(nèi)容如下:
- package main
- import (
- "log"
- "os"
- "runtime"
- "github.com/urfave/cli/v2"
- )
- func main() {
- // 默認(rèn)并發(fā)數(shù)
- concurrencyN := runtime.NumCPU()
- app := &cli.App{
- Name: "downloader",
- Usage: "File concurrency downloader",
- Flags: []cli.Flag{
- &cli.StringFlag{
- Name: "url",
- Aliases: []string{"u"},
- Usage: "`URL` to download",
- Required: true,
- },
- &cli.StringFlag{
- Name: "output",
- Aliases: []string{"o"},
- Usage: "Output `filename`",
- },
- &cli.IntFlag{
- Name: "concurrency",
- Aliases: []string{"n"},
- Value: concurrencyN,
- Usage: "Concurrency `number`",
- },
- },
- Action: func(c *cli.Context) error {
- return nil
- },
- }
- err := app.Run(os.Args)
- if err != nil {
- log.Fatal(err)
- }
- }
執(zhí)行 go mod tidy,下載必要的包。然后執(zhí)行:
- $ go run main.go -h
- NAME:
- downloader - File concurrency downloader
- USAGE:
- downloader [global options] command [command options] [arguments...]
- COMMANDS:
- help, h Shows a list of commands or help for one command
- GLOBAL OPTIONS:
- --url URL, -u URL URL to download
- --output filename, -o filename Output filename
- --concurrency number, -n number Concurrency number (default: 8)
- --help, -h show help (default: false)
關(guān)于 cli 這個(gè)庫的使用,可以參閱官方文檔,寫的很詳細(xì),也有很多例子。
檢查是否支持并發(fā)下載
創(chuàng)建另外一個(gè)文件 downloader.go,定義一個(gè)結(jié)構(gòu)體 Dowloader:
- package main
- type Downloader struct {
- concurrency int
- }
- func NewDownloader(concurrency int) *Downloader {
- return &Downloader{concurrency: concurrency}
- }
為該結(jié)構(gòu)體增加 Download 方法:
- func (d *Downloader) Download(strURL, filename string) error {
- if filename == "" {
- filename = path.Base(strURL)
- }
- resp, err := http.Head(strURL)
- if err != nil {
- return err
- }
- if resp.StatusCode == http.StatusOK && resp.Header.Get("Accept-Ranges") == "bytes" {
- return d.multiDownload(strURL, filename, int(resp.ContentLength))
- }
- return d.singleDownload(strURL, filename)
- }
- func (d *Downloader) multiDownload(strURL, filename string, contentLen int) error {
- return nil
- }
- func (d *Downloader) singleDownload(strURL, filename string) error {
- return nil
- }
- 通過 Head 請(qǐng)求,判斷是否支持部分請(qǐng)求。在原理部分已經(jīng)講解;
- 如果不支持,就直接下載整個(gè)文件;
當(dāng)支持部分請(qǐng)求時(shí),文件總大小通過 Head 請(qǐng)求的響應(yīng)中的 ContentLength 可以獲得。有了文件總大小和并發(fā)數(shù),就可以知道每個(gè)部分的大小了。
并發(fā)下載
這部分第一個(gè)要點(diǎn)是如何發(fā)起部分請(qǐng)求:
- req, err := http.NewRequest("GET", "https://apache.claz.org/zookeeper/zookeeper-3.7.0/apache-zookeeper-3.7.0-bin.tar.gz", nil)
- if err != nil {
- return err
- }
- rangeStart := 2000
- rangeStop := 3000
- req.Header.Set("Range", fmt.Sprintf("bytes=%d-%d", rangeStart, rangeStop))
- res, err := http.DefaultClient.Do(req)
我們可以將其封裝成一個(gè)方法:
- func (d *Downloader) downloadPartial(strURL, filename string, rangeStart, rangeEnd, i int) {
- if rangeStart >= rangeEnd {
- return
- }
- req, err := http.NewRequest("GET", strURL, nil)
- if err != nil {
- log.Fatal(err)
- }
- req.Header.Set("Range", fmt.Sprintf("bytes=%d-%d", rangeStart, rangeEnd))
- resp, err := http.DefaultClient.Do(req)
- if err != nil {
- log.Fatal(err)
- }
- defer resp.Body.Close()
- flags := os.O_CREATE | os.O_WRONLY
- partFile, err := os.OpenFile(d.getPartFilename(filename, i), flags, 0666)
- if err != nil {
- log.Fatal(err)
- }
- defer partFile.Close()
- buf := make([]byte, 32*1024)
- _, err = io.CopyBuffer(partFile, resp.Body, buf)
- if err != nil {
- if err == io.EOF {
- return
- }
- log.Fatal(err)
- }
- }
- // getPartDir 部分文件存放的目錄
- func (d *Downloader) getPartDir(filename string) string {
- return strings.SplitN(filename, ".", 2)[0]
- }
- // getPartFilename 構(gòu)造部分文件的名字
- func (d *Downloader) getPartFilename(filename string, partNum int) string {
- partDir := d.getPartDir(filename)
- return fmt.Sprintf("%s/%s-%d", partDir, filename, partNum)
- }
- 通過發(fā)起 Range 請(qǐng)求后,將請(qǐng)求的內(nèi)容寫入本地文件中;
- 為了方便后續(xù)合并,文件名加上了序號(hào),這就是 downloadPartial 最后一個(gè)參數(shù)的作用;
- rangeStart 和 rangeEnd 分別表示 Range 的開始和結(jié)束;
然后就是 multiDownload 方法中怎么分部分,這和并發(fā)請(qǐng)求多個(gè) URL 很類似,使用 sync.WaitGroup 進(jìn)行控制:
- func (d *Downloader) multiDownload(strURL, filename string, contentLen int) error {
- partSize := contentLen / d.concurrency
- // 創(chuàng)建部分文件的存放目錄
- partDir := d.getPartDir(filename)
- os.Mkdir(partDir, 0777)
- defer os.RemoveAll(partDir)
- var wg sync.WaitGroup
- wg.Add(d.concurrency)
- rangeStart := 0
- for i := 0; i < d.concurrency; i++ {
- // 并發(fā)請(qǐng)求
- go func(i, rangeStart int) {
- defer wg.Done()
- rangeEnd := rangeStart + partSize
- // 最后一部分,總長度不能超過 ContentLength
- if i == d.concurrency-1 {
- rangeEnd = contentLen
- }
- d.downloadPartial(strURL, filename, rangeStart, rangeEnd, i)
- }(i, rangeStart)
- rangeStart += partSize + 1
- }
- wg.Wait()
- // 合并文件
- d.merge(filename)
- return nil
- }
- func (d *Downloader) merge(filename string) error {
- return nil
- }
- 計(jì)算出每個(gè)部分的大??;
- 通過 sync.WaitGroup 協(xié)調(diào)并發(fā)請(qǐng)求;
- 注意每個(gè)部分的 rangeStart 和 rangeEnd 的計(jì)算規(guī)則,特別注意最后一部分;
- 所有部分都請(qǐng)求完成后,需要進(jìn)行合并;
因?yàn)榘衙坎糠謫为?dú)保存為文件了,所以合并只需要按照順序處理這些文件即可:
- func (d *Downloader) merge(filename string) error {
- destFile, err := os.OpenFile(filename, os.O_CREATE|os.O_WRONLY, 0666)
- if err != nil {
- return err
- }
- defer destFile.Close()
- for i := 0; i < d.concurrency; i++ {
- partFileName := d.getPartFilename(filename, i)
- partFile, err := os.Open(partFileName)
- if err != nil {
- return err
- }
- io.Copy(destFile, partFile)
- partFile.Close()
- os.Remove(partFileName)
- }
- return nil
- }
連接程序
到這里,程序的核心部分已經(jīng)完成。接下來該在 main.go 中的 Action 作如下處理:
- Action: func(c *cli.Context) error {
- strURL := c.String("url")
- filename := c.String("output")
- concurrency := c.Int("concurrency")
- return NewDownloader(concurrency).Download(strURL, filename)
- },
到這里可以運(yùn)行測試下:
- go run . --url https://apache.claz.org/zookeeper/zookeeper-3.7.0/apache-zookeeper-3.7.0-bin.tar.gz
不出意外的話文件會(huì)下載成功。
03 總結(jié)
實(shí)現(xiàn)了基本功能,讀者朋友們可以進(jìn)一步做優(yōu)化、完善。比如:
- 看到下載過程,體驗(yàn)更友好,可以加入 github.com/schollz/progressbar 庫;
- 可以暫停下載,然后繼續(xù)下載。即端點(diǎn)續(xù)傳;
- 不支持并發(fā)下載的,支持單個(gè)下載,即完成 singleDownload 方法;
類似下面這樣:
這個(gè)實(shí)現(xiàn)的完整代碼我放在了 GitHub: https://github.com/polaris1119/downloader 。
還有兩點(diǎn)大家可以注意下:
- 并發(fā)下載并不一定總是比簡單下載快,一般文件越大,并發(fā)下載的優(yōu)勢才能體現(xiàn)。不過,并發(fā)下載可以端點(diǎn)續(xù)傳;
- 并發(fā)下載可以進(jìn)一步優(yōu)化,畢竟寫文件,再打開文件合并,是需要時(shí)間的;
最后,再提醒一次,記得自己動(dòng)手實(shí)現(xiàn)一個(gè)哦。