Skip to content

Commit

Permalink
refactor: use synchronous io in order to improve perf
Browse files Browse the repository at this point in the history
  • Loading branch information
dpeckett committed Feb 21, 2024
1 parent 09c5074 commit 60ccf6a
Show file tree
Hide file tree
Showing 7 changed files with 322 additions and 149 deletions.
177 changes: 177 additions & 0 deletions benchmark/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
/* SPDX-License-Identifier: ISC
*
* Copyright (c) 2019-2024 Stanford University
* Copyright (c) 2024 Damian Peckett <[email protected]>
*
* Permission to use, copy, modify, and/or distribute this software for any
* purpose with or without fee is hereby granted, provided that the above
* copyright notice and this permission notice appear in all copies.
*
* THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
* WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
* MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
* ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
* WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
* ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
* OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/

package main

import (
"context"
"errors"
"fmt"
"io"
"log"
"net"
"runtime"

"github.com/cheggaaa/pb/v3"
"github.com/dpeckett/go-homa"
"golang.org/x/sync/errgroup"
"golang.org/x/sync/semaphore"
)

func main() {
serverAddr, err := net.ResolveUDPAddr("udp", "localhost:0")
if err != nil {
log.Fatalf("could not resolve server address: %v", err)
}

serverSock, err := homa.NewSocket(serverAddr)
if err != nil {
log.Fatalf("could not create server socket: %v", err)
}
defer serverSock.Close()

var serverGroup errgroup.Group

nCPUs := runtime.GOMAXPROCS(0)

nReceivers := nCPUs / 2
for i := 0; i < nReceivers; i++ {
serverGroup.Go(func() error {
for {
msg, err := serverSock.Recv()
if err != nil {
return fmt.Errorf("could not receive message: %w", err)
}

data, err := io.ReadAll(msg)
if err != nil {
return fmt.Errorf("could not read message: %w", err)
}

if string(data) != "PING" {
return fmt.Errorf("unexpected message: %s", data)
}

if err := msg.Close(); err != nil {
return fmt.Errorf("could not close message: %w", err)
}

err = serverSock.Reply(msg.PeerAddr(), msg.ID(), []byte("PONG"))
if err != nil {
return fmt.Errorf("could not send reply: %w", err)
}
}
})
}

go func() {
if err := serverGroup.Wait(); err != nil && !errors.Is(err, net.ErrClosed) {
log.Fatalf("error: %v", err)
}
}()

var senderGroup errgroup.Group

const (
totalMessages = 1000000
maxOutstandingMessages = 100
)
sem := semaphore.NewWeighted(int64(maxOutstandingMessages))

bar := pb.StartNew(totalMessages)

nSenders := nCPUs / 2
for i := 0; i < nSenders; i++ {
senderGroup.Go(func() error {
senderAddr, err := net.ResolveUDPAddr("udp", "localhost:0")
if err != nil {
return fmt.Errorf("could not resolve sender address: %w", err)
}

senderSock, err := homa.NewSocket(senderAddr)
if err != nil {
return fmt.Errorf("could not create sender socket: %w", err)
}

var g errgroup.Group

nMessages := totalMessages / nSenders

g.Go(func() error {
defer senderSock.Close()

for i := 0; i < nMessages; i++ {
msg, err := senderSock.Recv()
if err != nil {
if errors.Is(err, net.ErrClosed) {
return nil
}

return fmt.Errorf("could not receive reply: %w", err)
}

data, err := io.ReadAll(msg)
if err != nil {
return fmt.Errorf("could not read reply: %w", err)
}

if string(data) != "PONG" {
return fmt.Errorf("unexpected reply: %s", data)
}

if err := msg.Close(); err != nil {
return fmt.Errorf("could not close reply: %w", err)
}

sem.Release(1)

bar.Increment()
}

return nil
})

g.Go(func() error {
for i := 0; i < nMessages; i++ {
if err := sem.Acquire(context.Background(), 1); err != nil {
return fmt.Errorf("failed to acquire semaphore: %w", err)
}

_, err := senderSock.Send(serverSock.LocalAddr(), []byte("PING"), 0)
if err != nil {
sem.Release(1)

return fmt.Errorf("could not send message: %w", err)
}
}

return nil
})

return g.Wait()
})
}

if err := senderGroup.Wait(); err != nil {
log.Fatalf("error: %v", err)
}

if err := serverSock.Close(); err != nil {
log.Fatalf("could not close server socket: %v", err)
}
}
7 changes: 7 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,21 @@ module github.com/dpeckett/go-homa
go 1.21.0

require (
github.com/cheggaaa/pb/v3 v3.1.5
github.com/daedaluz/goioctl v0.0.0-20220112121310-eef48b7845b0
github.com/stretchr/testify v1.8.4
golang.org/x/sync v0.6.0
golang.org/x/sys v0.17.0
)

require (
github.com/VividCortex/ewma v1.2.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/fatih/color v1.15.0 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.19 // indirect
github.com/mattn/go-runewidth v0.0.15 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/rivo/uniseg v0.2.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
17 changes: 17 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,13 +1,30 @@
github.com/VividCortex/ewma v1.2.0 h1:f58SaIzcDXrSy3kWaHNvuJgJ3Nmz59Zji6XoJR/q1ow=
github.com/VividCortex/ewma v1.2.0/go.mod h1:nz4BbCtbLyFDeC9SUHbtcT5644juEuWfUAUnGx7j5l4=
github.com/cheggaaa/pb/v3 v3.1.5 h1:QuuUzeM2WsAqG2gMqtzaWithDJv0i+i6UlnwSCI4QLk=
github.com/cheggaaa/pb/v3 v3.1.5/go.mod h1:CrxkeghYTXi1lQBEI7jSn+3svI3cuc19haAj6jM60XI=
github.com/daedaluz/goioctl v0.0.0-20220112121310-eef48b7845b0 h1:tLanypj7anfk592ujzQ4RrZrvy4KrQQ+ozdZ2usuloM=
github.com/daedaluz/goioctl v0.0.0-20220112121310-eef48b7845b0/go.mod h1:NbO2vzbi679q0yLroSQd/T4/NnTvpQgHWs7tgZCjiv8=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/fatih/color v1.15.0 h1:kOqh6YHBtK8aywxGerMG2Eq3H6Qgoqeo13Bk2Mv/nBs=
github.com/fatih/color v1.15.0/go.mod h1:0h5ZqXfHYED7Bhv2ZJamyIOUej9KtShiJESRwBDUSsw=
github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA=
github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg=
github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM=
github.com/mattn/go-isatty v0.0.19 h1:JITubQf0MOLdlGRuRq+jtsDlekdYPia9ZFsB8h/APPA=
github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
github.com/mattn/go-runewidth v0.0.15 h1:UNAjwbU9l54TA3KzvqLGxwWjHmMgBUVhBiTjelZgg3U=
github.com/mattn/go-runewidth v0.0.15/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/rivo/uniseg v0.2.0 h1:S1pD9weZBuJdFmowNwbpi7BJ8TNftyUImj/0WQi72jY=
github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
golang.org/x/sync v0.6.0 h1:5BMeUDZ7vkXGfEr1x9B4bRcTH4lpkTkpdh0T/J+qjbQ=
golang.org/x/sync v0.6.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.17.0 h1:25cE3gD+tdBA7lp7QfhuV+rJiE9YXTcS3VG1SqssI/Y=
golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
Expand Down
16 changes: 8 additions & 8 deletions homa.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,21 +58,21 @@ type SendmsgArgs struct {
CompletionCookie uint64
}

// SendmsgArgsFromBytes deserializes a sendmsgArgs from a byte slice.
// sendmsgArgsFromBytes deserializes a sendmsgArgs from a byte slice.
// We implement our own deserialization method here because the Go doesn't support packed structs
// and binary.Read uses reflection, which is very slow.
func SendmsgArgsFromBytes(buf []byte) SendmsgArgs {
func sendmsgArgsFromBytes(buf []byte) SendmsgArgs {
var args SendmsgArgs
args.ID = binary.NativeEndian.Uint64(buf[0:8])
args.CompletionCookie = binary.NativeEndian.Uint64(buf[8:16])

return args
}

// Bytes returns the byte representation of the sendmsgArgs, suitable for passing to the kernel.
// bytes returns the byte representation of the sendmsgArgs, suitable for passing to the kernel.
// We implement our own serialization method here because the Go doesn't support packed structs
// and binary.Write uses reflection, which is very slow.
func (s *SendmsgArgs) Bytes() []byte {
func (s *SendmsgArgs) bytes() []byte {
var buf [16]byte
binary.NativeEndian.PutUint64(buf[0:8], s.ID)
binary.NativeEndian.PutUint64(buf[8:16], s.CompletionCookie)
Expand Down Expand Up @@ -121,10 +121,10 @@ type RecvmsgArgs struct {
BPageOffsets [HOMA_MAX_BPAGES]uint32
}

// RecvmsgArgsFromBytes deserializes a recvmsgArgs from a byte slice.
// recvmsgArgsFromBytes deserializes a recvmsgArgs from a byte slice.
// We implement our own deserialization method here because the Go doesn't support packed structs
// and binary.Read uses reflection, which is very slow.
func RecvmsgArgsFromBytes(buf []byte) RecvmsgArgs {
func recvmsgArgsFromBytes(buf []byte) RecvmsgArgs {
var args RecvmsgArgs
args.ID = binary.NativeEndian.Uint64(buf[0:8])
args.CompletionCookie = binary.NativeEndian.Uint64(buf[8:16])
Expand All @@ -138,10 +138,10 @@ func RecvmsgArgsFromBytes(buf []byte) RecvmsgArgs {
return args
}

// Bytes returns the byte representation of the recvmsgArgs, suitable for passing to the kernel.
// bytes returns the byte representation of the recvmsgArgs, suitable for passing to the kernel.
// We implement our own serialization method here because the Go doesn't support packed structs
// and binary.Write uses reflection, which is very slow.
func (r *RecvmsgArgs) Bytes() []byte {
func (r *RecvmsgArgs) bytes() []byte {
var buf [120]byte
binary.NativeEndian.PutUint64(buf[0:8], r.ID)
binary.NativeEndian.PutUint64(buf[8:16], r.CompletionCookie)
Expand Down
Loading

0 comments on commit 60ccf6a

Please sign in to comment.