From e392e18fbd8617069b9be8010c2c0c81d4116555 Mon Sep 17 00:00:00 2001 From: Philip Potter Date: Fri, 4 Aug 2023 22:15:38 +0100 Subject: [PATCH 1/4] WIP: use less memory by downloading to sparse file pget has a problem that it needs to download everything into memory along each of the different parallel connections before finally saving everything to disk. This has two consequences: - pget needs to consume at least as much memory as the size of the downloaded file - pget cannot begin writing to disk until all the bytes have been downloaded from source I had a thought that we could precreate a sparse file of the correct length by using the os.Truncate(size) call. Then each goroutine can write to different offsets within the same file, in parallel. I tested this locally on macOS and I was able to download a 308MB file with pget's memory usage only reaching 26MB. This commit is a WIP because I've commented out the untar stuff to keep the proof of concept simple. We should restore (or deprecate?) that functionality before merging this. We should also do more extensive performance testing. --- main.go | 60 +++++++++++++++++++++++++++++++-------------------------- 1 file changed, 33 insertions(+), 27 deletions(-) diff --git a/main.go b/main.go index 4641cc7..ac6724c 100644 --- a/main.go +++ b/main.go @@ -6,7 +6,6 @@ import ( "flag" "fmt" "io" - "io/ioutil" "net/http" "os" "path/filepath" @@ -34,17 +33,28 @@ func getRemoteFileSize(url string) (int64, error) { return fileSize, nil } -func downloadFileToBuffer(url string, concurrency int) (*bytes.Buffer, error) { +func downloadFile(url string, dest string, concurrency int) error { fileSize, err := getRemoteFileSize(url) if err != nil { - return nil, err + return err + } + + destFile, err := os.Create(dest) + defer destFile.Close() + if err != nil { + fmt.Printf("Error creating file: %v\n", err) + os.Exit(1) + } + + err = destFile.Truncate(fileSize) + if err != nil { + return err } chunkSize := fileSize / int64(concurrency) var wg sync.WaitGroup wg.Add(concurrency) - data := make([]byte, fileSize) errc := make(chan error, concurrency) startTime := time.Now() @@ -58,6 +68,11 @@ func downloadFileToBuffer(url string, concurrency int) (*bytes.Buffer, error) { go func(start, end int64) { defer wg.Done() + fh, err := os.OpenFile(dest, os.O_RDWR, 0644) + if err != nil { + errc <- fmt.Errorf("Failed to reopen file: %v", err) + } + defer fh.Close() retries := 5 for retries > 0 { @@ -85,14 +100,22 @@ func downloadFileToBuffer(url string, concurrency int) (*bytes.Buffer, error) { } defer resp.Body.Close() - n, err := io.ReadFull(resp.Body, data[start:end+1]) + _, err = fh.Seek(start, 0) + if err != nil { + fmt.Printf("Error seeking in file: %v\n", err) + retries-- + time.Sleep(time.Millisecond * 100) // wait 100 milliseconds before retrying + continue + } + + n, err := io.CopyN(fh, resp.Body, end-start+1) if err != nil && err != io.EOF { fmt.Printf("Error reading response: %v\n", err) retries-- time.Sleep(time.Millisecond * 100) // wait 100 milliseconds before retrying continue } - if n != int(end-start+1) { + if n != end-start+1 { fmt.Printf("Downloaded %d bytes instead of %d\n", n, end-start+1) retries-- time.Sleep(time.Millisecond * 100) // wait 100 milliseconds before retrying @@ -112,15 +135,14 @@ func downloadFileToBuffer(url string, concurrency int) (*bytes.Buffer, error) { close(errc) // close the error channel for err := range errc { if err != nil { - return nil, err // return the first error we encounter + return err // return the first error we encounter } } elapsed := time.Since(startTime).Seconds() througput := humanize.Bytes(uint64(float64(fileSize) / elapsed)) fmt.Printf("Downloaded %s bytes in %.3fs (%s/s)\n", humanize.Bytes(uint64(fileSize)), elapsed, througput) - buffer := bytes.NewBuffer(data) - return buffer, nil + return nil } func extractTarFile(buffer *bytes.Buffer, destDir string) error { @@ -176,7 +198,7 @@ func extractTarFile(buffer *bytes.Buffer, destDir string) error { func main() { // define flags concurrency := flag.Int("c", runtime.GOMAXPROCS(0)*4, "concurrency level - default 4 * cores") - extract := flag.Bool("x", false, "extract tar file") + //extract := flag.Bool("x", false, "extract tar file") flag.Parse() // check required positional arguments @@ -195,26 +217,10 @@ func main() { os.Exit(1) } - buffer, err := downloadFileToBuffer(url, *concurrency) + err := downloadFile(url, dest, *concurrency) if err != nil { fmt.Printf("Error downloading file: %v\n", err) os.Exit(1) } - // extract the tar file if the -x flag was provided - if *extract { - err = extractTarFile(buffer, dest) - if err != nil { - fmt.Printf("Error extracting tar file: %v\n", err) - os.Exit(1) - } - } else { - // if -x flag is not set, save the buffer to a file - err = ioutil.WriteFile(dest, buffer.Bytes(), 0644) - if err != nil { - fmt.Printf("Error writing file: %v\n", err) - os.Exit(1) - } - } - } From 4402844abb410b2272a7da9e0e3c525dfc8bc777 Mon Sep 17 00:00:00 2001 From: Philip Potter Date: Mon, 7 Aug 2023 17:31:28 +0100 Subject: [PATCH 2/4] restore tar extract functionality This also does the download to a temp partial file. If the user does not pass `-x`, we finish by renaming the temp file to the target file. This means that if pget is interrupted, we don't leave an incomplete file with the target filename. If the users does pass `-x`, we pass the temp file as input to TarReader, then delete it at the end. --- main.go | 49 ++++++++++++++++++++++++++++++++++++++++--------- 1 file changed, 40 insertions(+), 9 deletions(-) diff --git a/main.go b/main.go index ac6724c..8985f6d 100644 --- a/main.go +++ b/main.go @@ -2,7 +2,6 @@ package main import ( "archive/tar" - "bytes" "flag" "fmt" "io" @@ -33,14 +32,12 @@ func getRemoteFileSize(url string) (int64, error) { return fileSize, nil } -func downloadFile(url string, dest string, concurrency int) error { +func downloadFile(url string, destFile *os.File, concurrency int) error { fileSize, err := getRemoteFileSize(url) if err != nil { return err } - destFile, err := os.Create(dest) - defer destFile.Close() if err != nil { fmt.Printf("Error creating file: %v\n", err) os.Exit(1) @@ -68,7 +65,7 @@ func downloadFile(url string, dest string, concurrency int) error { go func(start, end int64) { defer wg.Done() - fh, err := os.OpenFile(dest, os.O_RDWR, 0644) + fh, err := os.OpenFile(destFile.Name(), os.O_RDWR, 0644) if err != nil { errc <- fmt.Errorf("Failed to reopen file: %v", err) } @@ -145,9 +142,9 @@ func downloadFile(url string, dest string, concurrency int) error { return nil } -func extractTarFile(buffer *bytes.Buffer, destDir string) error { +func extractTarFile(input io.Reader, destDir string) error { startTime := time.Now() - tarReader := tar.NewReader(buffer) + tarReader := tar.NewReader(input) for { header, err := tarReader.Next() @@ -198,7 +195,7 @@ func extractTarFile(buffer *bytes.Buffer, destDir string) error { func main() { // define flags concurrency := flag.Int("c", runtime.GOMAXPROCS(0)*4, "concurrency level - default 4 * cores") - //extract := flag.Bool("x", false, "extract tar file") + extract := flag.Bool("x", false, "extract tar file") flag.Parse() // check required positional arguments @@ -217,10 +214,44 @@ func main() { os.Exit(1) } - err := downloadFile(url, dest, *concurrency) + // create tempfile for downloading to + cwd, err := os.Getwd() + if err != nil { + fmt.Printf("Error getting cwd: %v\n", err) + os.Exit(1) + } + destTemp, err := os.CreateTemp(cwd, dest+".partial") + if err != nil { + fmt.Printf("Failed to create temp file: %v\n", err) + os.Exit(1) + } + + err = downloadFile(url, destTemp, *concurrency) if err != nil { fmt.Printf("Error downloading file: %v\n", err) os.Exit(1) } + // extract the tar file if the -x flag was provided + if *extract { + _, err = destTemp.Seek(0, 0) + if err != nil { + fmt.Printf("Error extracting tar file: %v\n", err) + os.Exit(1) + } + err = extractTarFile(destTemp, dest) + if err != nil { + fmt.Printf("Error extracting tar file: %v\n", err) + os.Exit(1) + } + destTemp.Close() + os.Remove(destTemp.Name()) + } else { + // move destTemp to dest + err = os.Rename(destTemp.Name(), dest) + if err != nil { + fmt.Printf("Error moving downloaded file to correct location: %v\n", err) + os.Exit(1) + } + } } From 06a6aa08981e2afa69369f70d639e969e706801b Mon Sep 17 00:00:00 2001 From: Philip Potter Date: Mon, 7 Aug 2023 17:34:44 +0100 Subject: [PATCH 3/4] fix usage text Flags have to go before arguments. From https://pkg.go.dev/flag : > Flag parsing stops just before the first non-flag argument ("-" is a non-flag argument) or after the terminator "--". --- main.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/main.go b/main.go index 8985f6d..e947dbc 100644 --- a/main.go +++ b/main.go @@ -201,7 +201,7 @@ func main() { // check required positional arguments args := flag.Args() if len(args) < 2 { - fmt.Println("Usage: pcurl [-c concurrency] [-x]") + fmt.Println("Usage: pcurl [-c concurrency] [-x] ") os.Exit(1) } From be1a93eb87110804c62e9b3d4b3a973ee2028748 Mon Sep 17 00:00:00 2001 From: Philip Potter Date: Mon, 7 Aug 2023 17:36:04 +0100 Subject: [PATCH 4/4] allow pushing prerelease versions so v0.0.3-sparse can be released and tested without it being a full release --- .github/workflows/ci.yaml | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index d41df3a..362dbac 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -28,7 +28,12 @@ jobs: go-version-file: go.mod - run: script/build - uses: ncipollo/release-action@v1 - if: ${{ startsWith(github.ref, 'refs/tags') }} + if: github.ref_type=='tag' && !contains(github.ref_name, '-') with: artifacts: "pget" + - uses: ncipollo/release-action@v1 + if: github.ref_type=='tag' && contains(github.ref_name, '-') + with: + artifacts: "pget" + prerelease: true