This article implements a concurrent file downloader that can recover from errors without restarting the entire download. It does so by downloading the file in chunks.
Idea
Start by probing the file with an HTTP HEAD request. On servers that support it, one of the returned headers is Content-Length, which gives the file size in bytes. Knowing the size, we spawn several goroutines, each assigned a byte range to download. Each goroutine issues a GET request with a Range header that tells the server which slice of the file to return. When a goroutine finishes its chunk, it sends the data back over a channel. Once all goroutines are done, the chunks are joined and written out as the file.
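For illustration, a ranged request and the server's partial-content response look roughly like this (the range values are made up for the example; 536870912 bytes is the size of the 512 MB test file used later):

GET /512MB.zip HTTP/1.1
Range: bytes=0-1023

HTTP/1.1 206 Partial Content
Content-Range: bytes 0-1023/536870912
Content-Length: 1024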
Implementation
Probe
The probe package is responsible for the probing step: it detects whether the response for the file to be downloaded includes a Content-Length header. If it does, it returns the chunk size for the concurrent download:
package probe

import (
    "fmt"
    "net/http"
    "strconv"
)

type Probe struct {
    workers int
    url     string
}

func NewProbe(workers int, url string) *Probe {
    return &Probe{
        workers: workers,
        url:     url,
    }
}

// GetFileSize sends a HEAD request and, if the server reports a
// Content-Length, returns the chunk size each worker should download.
func (p *Probe) GetFileSize() (int, error) {
    client := &http.Client{}
    req, err := http.NewRequest("HEAD", p.url, nil)
    if err != nil {
        return -1, err
    }
    resp, err := client.Do(req)
    if err != nil {
        return -1, err
    }
    defer resp.Body.Close()
    header, ok := resp.Header["Content-Length"]
    if !ok {
        return -1, fmt.Errorf("file size was not provided")
    }
    fileSize, err := strconv.Atoi(header[0])
    if err != nil {
        return -1, fmt.Errorf("file size could not be determined: %w", err)
    }
    return fileSize / p.workers, nil
}
A single HEAD request is enough to learn the size of the target file, from which we derive the chunk size for each concurrent worker.
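For the 512 MB test file used below, that works out to 536870912 / 5 = 107374182 bytes per chunk after integer division, which matches the byte ranges printed in the run log at the end of this article.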
Downloader
Next comes the downloader. First, define its struct:
type Downloader struct {
    result  chan Part
    size    int
    workers int
}
The downloader holds a channel of file parts, where a part is defined as:
type Part struct {
    Data  []byte
    Index int
}
A part carries a chunk's bytes along with its index in the original file. The downloader also records the chunk size and the number of concurrent workers.
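The main function later calls a NewDownloader constructor that the original excerpt does not show; a minimal version matching that call simply wires the fields together:

func NewDownloader(result chan Part, size, workers int) *Downloader {
    return &Downloader{
        result:  result,
        size:    size,
        workers: workers,
    }
}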
func (d *Downloader) Download(index int, url string) {
    client := &http.Client{}
    // Calculate this chunk's offset by multiplying
    // the index by the chunk size.
    start := index * d.size
    // Write the data range in the correct format.
    // The end is reduced by one because the next
    // chunk starts there.
    dataRange := fmt.Sprintf("bytes=%d-%d", start, start+d.size-1)
    // If this goroutine downloads the last chunk,
    // rewrite the header: an open-ended range is an
    // easy way to request the rest of the file.
    if index == d.workers-1 {
        dataRange = fmt.Sprintf("bytes=%d-", start)
    }
    log.Println(dataRange)
    req, err := http.NewRequest("GET", url, nil)
    if err != nil {
        // TODO: restart download
        return
    }
    req.Header.Add("Range", dataRange)
    resp, err := client.Do(req)
    if err != nil {
        // TODO: restart download
        return
    }
    defer resp.Body.Close()
    body, err := io.ReadAll(resp.Body)
    if err != nil {
        // TODO: restart download
        return
    }
    d.result <- Part{Index: index, Data: body}
}
When a download runs, the method adds a Range header to the GET request, specifying which slice of the file to fetch. The last worker uses an open-ended range (bytes=start-) because integer division of the file size leaves a few remainder bytes, and the open-ended range picks them up. Once the HTTP request completes, the data is sent on the downloader's result channel.
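The TODO comments mark where error recovery belongs. One possible way to realize the stated goal of recovering without restarting the whole download is to let the chunk download return its error and retry only that chunk. The sketch below is an illustration, not part of the original code; downloadPart and DownloadWithRetry are hypothetical names, and it assumes fmt, io, net/http, and time are imported in the download package.

// downloadPart is a hypothetical variant of Download that returns its
// error instead of leaving a TODO, so the caller can retry this chunk.
func (d *Downloader) downloadPart(index int, url string) error {
    start := index * d.size
    dataRange := fmt.Sprintf("bytes=%d-%d", start, start+d.size-1)
    if index == d.workers-1 {
        dataRange = fmt.Sprintf("bytes=%d-", start)
    }
    req, err := http.NewRequest("GET", url, nil)
    if err != nil {
        return err
    }
    req.Header.Add("Range", dataRange)
    resp, err := http.DefaultClient.Do(req)
    if err != nil {
        return err
    }
    defer resp.Body.Close()
    body, err := io.ReadAll(resp.Body)
    if err != nil {
        return err
    }
    d.result <- Part{Index: index, Data: body}
    return nil
}

// DownloadWithRetry retries a failed chunk a few times with a short
// backoff, so one bad response never forces the whole file to restart.
func (d *Downloader) DownloadWithRetry(index int, url string) {
    for attempt := 1; attempt <= 3; attempt++ {
        if err := d.downloadPart(index, url); err == nil {
            return
        }
        time.Sleep(time.Duration(attempt) * time.Second)
    }
}

If every attempt fails, Merge would still block waiting for the missing part; a fuller version would also report the failure, for example over a second channel.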
Once the downloads start, there is no need to wait for all of them to finish before merging, because Go channels are safe to read while other goroutines are still writing to them. Merge keeps receiving completed chunks from the channel and, once all of them have arrived, writes them to the local file in index order.
func (d *Downloader) Merge(filename string) error {
    log.Println("start to merge data")
    parts := make([][]byte, d.workers)
    counter := 0
    for part := range d.result {
        counter++
        parts[part.Index] = part.Data
        if counter == d.workers {
            break
        }
    }
    log.Println("sort data as original order")
    file := []byte{}
    for _, part := range parts {
        file = append(file, part...)
    }
    log.Println("write data into buffer array")
    return os.WriteFile(filename, file, 0777)
}
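As written, Merge buffers the entire file in memory before a single write. A possible alternative, sketched below rather than taken from the original code, writes each chunk at its byte offset as soon as it arrives via os.File.WriteAt, so memory usage stays near one chunk; MergeStreaming is a hypothetical name and assumes the os package is imported.

// MergeStreaming is a hypothetical variant of Merge that writes each
// chunk directly at its offset as it arrives, instead of buffering
// the whole file in memory first.
func (d *Downloader) MergeStreaming(filename string) error {
    f, err := os.Create(filename)
    if err != nil {
        return err
    }
    defer f.Close()
    for counter := 0; counter < d.workers; counter++ {
        part := <-d.result
        // A chunk's offset is its index times the chunk size.
        offset := int64(part.Index) * int64(d.size)
        if _, err := f.WriteAt(part.Data, offset); err != nil {
            return err
        }
    }
    return nil
}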
Run
With everything in place, we can write a main function to test the concurrent downloader. The target file is http://212.183.159.230/512MB.zip, 512 MB in size; we set the concurrency to 5 and measure how long the download takes.
package main

import (
    "flag"
    "log"
    "time"

    "go-store/applications/downloader/download"
    "go-store/applications/downloader/probe"
)

var (
    // url of the file to download, for testing over the internet
    url = flag.String("url", "http://212.183.159.230/512MB.zip", "download url")
    // number of goroutines to spawn for the download
    workers = flag.Int("worker", 5, "concurrent downloader number")
    // filename for the downloaded file
    filename = flag.String("file", "data.zip", "downloaded filename")
)

func main() {
    flag.Parse()
    start := time.Now()
    probe := probe.NewProbe(*workers, *url)
    size, err := probe.GetFileSize()
    if err != nil {
        panic(err)
    }
    results := make(chan download.Part, *workers)
    downloader := download.NewDownloader(results, size, *workers)
    for i := 0; i < *workers; i++ {
        go downloader.Download(i, *url)
    }
    err = downloader.Merge(*filename)
    end := time.Now()
    if err != nil {
        panic(err)
    }
    log.Println("cost time: ", end.Sub(start))
}
The result:
song@ubuntu20-04:~/go/src/github.com/surzia/go-store/applications/downloader$ go build main.go
song@ubuntu20-04:~/go/src/github.com/surzia/go-store/applications/downloader$ ./main
2023/02/26 12:13:59 bytes=429496728-
2023/02/26 12:13:59 bytes=107374182-214748363
2023/02/26 12:13:59 bytes=214748364-322122545
2023/02/26 12:13:59 bytes=322122546-429496727
2023/02/26 12:13:59 bytes=0-107374181
2023/02/26 12:14:21 start to merge data
2023/02/26 12:14:21 sort data as original order
2023/02/26 12:14:23 write data into buffer array
2023/02/26 12:14:23 cost time: 24.43482453s
The download took about 25s. Compare that with downloading the file directly:
song@ubuntu20-04:~/Downloads$ curl http://212.183.159.230/512MB.zip -o 512M.zip
% Total % Received % Xferd Average Speed Time Time Time Current
Dload Upload Total Spent Left Speed
100 512M 100 512M 0 0 14.6M 0 0:00:34 0:00:34 --:--:-- 17.9M
That took 34s, so the concurrent downloader shaved off roughly 10s.
Conclusion
Go is a language with native concurrency support, and by taking advantage of it we can greatly improve a program's efficiency.