Skip to content

Commit

Permalink
Add optimized ReadAll function
Browse files Browse the repository at this point in the history
This adds an optimized io.ReadAll that uses a bytes.Buffer + io.Copy
internally, improving the buffer growth ratio over the runtime append
and taking advantage of io.WriterTo when available.

The only scenario where this optimized version is slower than io.ReadAll is
when reading < 512 bytes from a reader that does not implement io.WriterTo.

For now I only updated tests to use the new function as updating the code
requires more careful consideration.

While at it, migrate from the deprecated ioutil.ReadAll to io.ReadAll.

goos: linux
goarch: amd64
pkg: github.com/elastic/elastic-agent-libs/iobuf
cpu: Intel(R) Xeon(R) CPU E5-2697A v4 @ 2.60GHz
BenchmarkReadAll/size_32B/io.ReadAll/WriterTo-32         	 7479658	       162.4 ns/op	     512 B/op	       1 allocs/op
BenchmarkReadAll/size_32B/io.ReadAll/Reader-32           	 7000012	       169.3 ns/op	     512 B/op	       1 allocs/op
BenchmarkReadAll/size_32B/ReadAll/WriterTo-32            	10630838	       112.3 ns/op	     112 B/op	       2 allocs/op
BenchmarkReadAll/size_32B/ReadAll/Reader-32              	 2111246	       567.3 ns/op	    1584 B/op	       3 allocs/op
BenchmarkReadAll/size_64B/io.ReadAll/WriterTo-32         	 7154652	       163.2 ns/op	     512 B/op	       1 allocs/op
BenchmarkReadAll/size_64B/io.ReadAll/Reader-32           	 7223264	       166.1 ns/op	     512 B/op	       1 allocs/op
BenchmarkReadAll/size_64B/ReadAll/WriterTo-32            	10400562	       113.5 ns/op	     112 B/op	       2 allocs/op
BenchmarkReadAll/size_64B/ReadAll/Reader-32              	 2129949	       558.7 ns/op	    1584 B/op	       3 allocs/op
BenchmarkReadAll/size_512B/io.ReadAll/WriterTo-32        	 2843871	       419.1 ns/op	    1408 B/op	       2 allocs/op
BenchmarkReadAll/size_512B/io.ReadAll/Reader-32          	 2871580	       413.1 ns/op	    1408 B/op	       2 allocs/op
BenchmarkReadAll/size_512B/ReadAll/WriterTo-32           	 4976233	       241.5 ns/op	     560 B/op	       2 allocs/op
BenchmarkReadAll/size_512B/ReadAll/Reader-32             	 2183186	       552.9 ns/op	    1584 B/op	       3 allocs/op
BenchmarkReadAll/size_10KB/io.ReadAll/WriterTo-32        	  142633	      8235 ns/op	   46080 B/op	      10 allocs/op
BenchmarkReadAll/size_10KB/io.ReadAll/Reader-32          	  148326	      8229 ns/op	   46080 B/op	      10 allocs/op
BenchmarkReadAll/size_10KB/ReadAll/WriterTo-32           	  574903	      2210 ns/op	   10288 B/op	       2 allocs/op
BenchmarkReadAll/size_10KB/ReadAll/Reader-32             	  147210	      7995 ns/op	   32304 B/op	       7 allocs/op
BenchmarkReadAll/size_100KB/io.ReadAll/WriterTo-32       	   13171	     90853 ns/op	  514304 B/op	      18 allocs/op
BenchmarkReadAll/size_100KB/io.ReadAll/Reader-32         	   12892	     91787 ns/op	  514304 B/op	      18 allocs/op
BenchmarkReadAll/size_100KB/ReadAll/WriterTo-32          	   51472	     22420 ns/op	  106544 B/op	       2 allocs/op
BenchmarkReadAll/size_100KB/ReadAll/Reader-32            	   21568	     55070 ns/op	  261680 B/op	      10 allocs/op
BenchmarkReadAll/size_1MB/io.ReadAll/WriterTo-32         	    1220	    983276 ns/op	 5241098 B/op	      27 allocs/op
BenchmarkReadAll/size_1MB/io.ReadAll/Reader-32           	    1089	    990818 ns/op	 5241100 B/op	      27 allocs/op
BenchmarkReadAll/size_1MB/ReadAll/WriterTo-32            	    4153	    294507 ns/op	 1048627 B/op	       2 allocs/op
BenchmarkReadAll/size_1MB/ReadAll/Reader-32              	    1195	    944781 ns/op	 4193852 B/op	      14 allocs/op
  • Loading branch information
mauri870 committed Sep 20, 2024
1 parent 4babd25 commit e3c4851
Show file tree
Hide file tree
Showing 15 changed files with 215 additions and 50 deletions.
4 changes: 2 additions & 2 deletions api/npipe/listener_windows_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ package npipe

import (
"fmt"
"io/ioutil"
"net/http"
"testing"

"github.com/elastic/elastic-agent-libs/transport/httpcommon"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -55,7 +55,7 @@ func TestHTTPOverNamedPipe(t *testing.T) {
// nolint:noctx // for testing purposes
r, err := c.Get("http://npipe/echo-hello")
require.NoError(t, err)
body, err := ioutil.ReadAll(r.Body)
body, err := httpcommon.ReadAll(r)
require.NoError(t, err)
defer r.Body.Close()

Expand Down
13 changes: 6 additions & 7 deletions api/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@ package api
import (
"context"
"fmt"
"io"
"io/ioutil"
"net"
"net/http"
"os"
Expand All @@ -32,6 +30,7 @@ import (
"github.com/stretchr/testify/require"

"github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/transport/httpcommon"
)

const (
Expand Down Expand Up @@ -71,7 +70,7 @@ func TestSocket(t *testing.T) {
}

t.Run("socket doesn't exist before", func(t *testing.T) {
tmpDir, err := ioutil.TempDir("", "testsocket")
tmpDir, err := os.MkdirTemp("", "testsocket")
require.NoError(t, err)
defer os.RemoveAll(tmpDir)

Expand Down Expand Up @@ -102,7 +101,7 @@ func TestSocket(t *testing.T) {
})

t.Run("starting beat and recover a dangling socket file", func(t *testing.T) {
tmpDir, err := ioutil.TempDir("", "testsocket")
tmpDir, err := os.MkdirTemp("", "testsocket")
require.NoError(t, err)
defer os.RemoveAll(tmpDir)

Expand Down Expand Up @@ -161,7 +160,7 @@ func getResponse(t *testing.T, sockFile, url string) string {
require.NoError(t, err)
defer r.Body.Close()

body, err := ioutil.ReadAll(r.Body)
body, err := httpcommon.ReadAll(r)
require.NoError(t, err)
return string(body)
}
Expand All @@ -188,7 +187,7 @@ func TestHTTP(t *testing.T) {
require.NoError(t, err)
}()

body, err := ioutil.ReadAll(r.Body)
body, err := httpcommon.ReadAll(r)
require.NoError(t, err)

assert.Equal(t, "ehlo!", string(body))
Expand Down Expand Up @@ -229,7 +228,7 @@ func TestAttachHandler(t *testing.T) {
require.NoError(t, err)
}()

body, err := io.ReadAll(r.Body)
body, err := httpcommon.ReadAll(r)
require.NoError(t, err)

assert.Equal(t, "test!", string(body))
Expand Down
4 changes: 2 additions & 2 deletions api/server_windows_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
package api

import (
"io/ioutil"
"net/http"
"testing"

Expand All @@ -29,6 +28,7 @@ import (

"github.com/elastic/elastic-agent-libs/api/npipe"
"github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/transport/httpcommon"
)

func TestNamedPipe(t *testing.T) {
Expand Down Expand Up @@ -57,7 +57,7 @@ func TestNamedPipe(t *testing.T) {
require.NoError(t, err)
defer r.Body.Close()

body, err := ioutil.ReadAll(r.Body)
body, err := httpcommon.ReadAll(r)
require.NoError(t, err)

assert.Equal(t, "ehlo!", string(body))
Expand Down
3 changes: 1 addition & 2 deletions file/fileinfo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
package file_test

import (
"io/ioutil"
"os"
"path/filepath"
"testing"
Expand All @@ -35,7 +34,7 @@ import (
)

func TestStat(t *testing.T) {
f, err := ioutil.TempFile("", "teststat")
f, err := os.CreateTemp("", "teststat")
if err != nil {
t.Fatal(err)
}
Expand Down
17 changes: 8 additions & 9 deletions file/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
package file

import (
"io/ioutil"
"os"
"path/filepath"
"testing"
Expand All @@ -29,19 +28,19 @@ import (
)

func TestSafeFileRotateExistingFile(t *testing.T) {
tempdir, err := ioutil.TempDir("", "")
tempdir, err := os.MkdirTemp("", "")
assert.NoError(t, err)
defer func() {
assert.NoError(t, os.RemoveAll(tempdir))
}()

// create an existing registry file
err = ioutil.WriteFile(filepath.Join(tempdir, "registry"),
err = os.WriteFile(filepath.Join(tempdir, "registry"),
[]byte("existing filebeat"), 0x777)
assert.NoError(t, err)

// create a new registry.new file
err = ioutil.WriteFile(filepath.Join(tempdir, "registry.new"),
err = os.WriteFile(filepath.Join(tempdir, "registry.new"),
[]byte("new filebeat"), 0x777)
assert.NoError(t, err)

Expand All @@ -50,35 +49,35 @@ func TestSafeFileRotateExistingFile(t *testing.T) {
filepath.Join(tempdir, "registry.new"))
assert.NoError(t, err)

contents, err := ioutil.ReadFile(filepath.Join(tempdir, "registry"))
contents, err := os.ReadFile(filepath.Join(tempdir, "registry"))
assert.NoError(t, err)
assert.Equal(t, []byte("new filebeat"), contents)

// do it again to make sure we deal with deleting the old file

err = ioutil.WriteFile(filepath.Join(tempdir, "registry.new"),
err = os.WriteFile(filepath.Join(tempdir, "registry.new"),
[]byte("new filebeat 1"), 0x777)
assert.NoError(t, err)

err = SafeFileRotate(filepath.Join(tempdir, "registry"),
filepath.Join(tempdir, "registry.new"))
assert.NoError(t, err)

contents, err = ioutil.ReadFile(filepath.Join(tempdir, "registry"))
contents, err = os.ReadFile(filepath.Join(tempdir, "registry"))
assert.NoError(t, err)
assert.Equal(t, []byte("new filebeat 1"), contents)

// and again for good measure

err = ioutil.WriteFile(filepath.Join(tempdir, "registry.new"),
err = os.WriteFile(filepath.Join(tempdir, "registry.new"),
[]byte("new filebeat 2"), 0x777)
assert.NoError(t, err)

err = SafeFileRotate(filepath.Join(tempdir, "registry"),
filepath.Join(tempdir, "registry.new"))
assert.NoError(t, err)

contents, err = ioutil.ReadFile(filepath.Join(tempdir, "registry"))
contents, err = os.ReadFile(filepath.Join(tempdir, "registry"))
assert.NoError(t, err)
assert.Equal(t, []byte("new filebeat 2"), contents)
}
59 changes: 59 additions & 0 deletions iobuf/iobuf.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package iobuf

import (
"bytes"
"io"
)

// ReadAll reads all data from r and returns it as a byte slice.
// A successful call returns err == nil, not err == io.EOF: reaching EOF is
// how a complete read ends, so it is not reported as an error.
//
// This function is similar to io.ReadAll, but it accumulates the data in a
// bytes.Buffer filled through io.Copy. The buffer has a more efficient
// growing algorithm, and io.Copy lets a reader that implements io.WriterTo
// write straight into the buffer.
//
// Compared to io.ReadAll, this implementation is more CPU and memory
// efficient in general, especially for reads larger than 512 bytes.
//
// If the copy fails, the error is returned together with whatever data had
// already been accumulated in the buffer.
func ReadAll(r io.Reader) ([]byte, error) {
	var buf bytes.Buffer
	_, err := io.Copy(&buf, r)
	return buf.Bytes(), err
}
120 changes: 120 additions & 0 deletions iobuf/iobuf_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package iobuf

import (
"bytes"
"fmt"
"io"
"log"
"strings"
"testing"
)

// ExampleReadAll reads a short sentence from a strings.Reader with ReadAll
// and prints the bytes it got back.
func ExampleReadAll() {
	src := strings.NewReader("The quick brown fox jumps over the lazy dog.")

	data, err := ReadAll(src)
	if err != nil {
		log.Fatal(err)
	}

	fmt.Print(string(data))

	// Output:
	// The quick brown fox jumps over the lazy dog.
}

// dumbReadSeeker wraps an io.ReadSeeker and exposes only Read and Seek, so
// the wrapped value's io.WriterTo implementation (if any) is hidden from
// callers that probe for it.
type dumbReadSeeker struct {
	rs io.ReadSeeker
}

// Read forwards to the wrapped reader.
func (d *dumbReadSeeker) Read(p []byte) (int, error) {
	n, err := d.rs.Read(p)
	return n, err
}

// Seek forwards to the wrapped seeker.
func (d *dumbReadSeeker) Seek(offset int64, whence int) (int64, error) {
	pos, err := d.rs.Seek(offset, whence)
	return pos, err
}

// genData returns a bytes.Reader positioned at the start of n bytes that are
// all the letter 'a'.
func genData(n int) io.ReadSeeker {
	payload := bytes.Repeat([]byte("a"), n)
	return bytes.NewReader(payload)
}

// genDataDumb returns the same data as genData(n), wrapped in a
// dumbReadSeeker so that only Read and Seek are reachable.
func genDataDumb(n int) io.ReadSeeker {
	wrapped := new(dumbReadSeeker)
	wrapped.rs = genData(n)
	return wrapped
}

func BenchmarkReadAll(b *testing.B) {
// Make sure we test different sizes to overcome initial buffer sizes:
// io.ReadAll uses a 512 bytes buffer
// bytes.Buffer uses a 64 bytes buffer
sizes := []int{
32, // 32 bytes
64, // 64 bytes
512, // 512 bytes
10 * 1024, // 10KB
100 * 1024, // 100KB
1024 * 1024, // 1MB
}
sizesReadable := []string{
"32B",
"64B",
"512B",
"10KB",
"100KB",
"1MB",
}

benchFunc := func(b *testing.B, size int, genFunc func(n int) io.ReadSeeker, readFunc func(io.Reader) ([]byte, error)) {
buf := genFunc(size)
b.ResetTimer()
for i := 0; i < b.N; i++ {
buf.Seek(0, io.SeekStart) // reset

Check failure on line 89 in iobuf/iobuf_test.go

View workflow job for this annotation

GitHub Actions / lint (linux)

Error return value of `buf.Seek` is not checked (errcheck)
data, err := readFunc(buf)
if err != nil {
b.Fatal(err)
}
if len(data) != size {
b.Fatalf("size does not match, expected %d, actual %d", size, len(data))
}
}
}

for i, size := range sizes {
b.Run(fmt.Sprintf("size %s", sizesReadable[i]), func(b *testing.B) {
b.Run("io.ReadAll", func(b *testing.B) {
b.Run("WriterTo", func(b *testing.B) {
benchFunc(b, size, genData, io.ReadAll)
})
b.Run("Reader", func(b *testing.B) {
benchFunc(b, size, genDataDumb, io.ReadAll)
})
})
b.Run("ReadAll", func(b *testing.B) {
b.Run("WriterTo", func(b *testing.B) {
benchFunc(b, size, genData, ReadAll)
})
b.Run("Reader", func(b *testing.B) {
benchFunc(b, size, genDataDumb, ReadAll)
})
})
})
}
}
Loading

0 comments on commit e3c4851

Please sign in to comment.