Skip to content

Commit

Permalink
fix: update tests for synchronous io
Browse files Browse the repository at this point in the history
  • Loading branch information
dpeckett committed Feb 21, 2024
1 parent 60ccf6a commit c79be39
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 20 deletions.
3 changes: 1 addition & 2 deletions examples/main.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package main

import (
"context"
"io"
"log"
"net"
Expand All @@ -24,7 +23,7 @@ func main() {
log.Printf("Listening for RPC messages on %s", s.LocalAddr().String())

for {
msg, err := s.Recv(context.TODO())
msg, err := s.Recv()
if err != nil {
log.Fatal(err)
}
Expand Down
58 changes: 40 additions & 18 deletions socket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package homa_test

import (
"bytes"
"context"
"crypto/rand"
"crypto/sha256"
"errors"
Expand Down Expand Up @@ -52,37 +53,58 @@ func TestHomaRPC(t *testing.T) {
clientSock, err := homa.NewSocket(clientAddr)
require.NoError(t, err)

var g errgroup.Group
g, ctx := errgroup.WithContext(context.Background())

ctx, cancel := context.WithCancel(ctx)

g.Go(func() error {
for {
msg, err := serverSock.Recv()
if err != nil {
if errors.Is(err, net.ErrClosed) {
return nil
defer serverSock.Close()

errCh := make(chan error, 1)
defer close(errCh)

go func() {
for {
msg, err := serverSock.Recv()
if err != nil {
if errors.Is(err, net.ErrClosed) {
errCh <- nil
return
}

errCh <- err
return
}

return err
}
h := sha256.New()
if _, err := io.Copy(h, msg); err != nil {
errCh <- err
return
}

h := sha256.New()
if _, err := io.Copy(h, msg); err != nil {
return err
}
if err := msg.Close(); err != nil {
errCh <- err
return
}

if err := msg.Close(); err != nil {
return err
if err := serverSock.Reply(msg.PeerAddr(), msg.ID(), h.Sum(nil)); err != nil {
errCh <- err
return
}
}
}()

if err := serverSock.Reply(msg.PeerAddr(), msg.ID(), h.Sum(nil)); err != nil {
return err
}
select {
case <-ctx.Done():
return nil
case err := <-errCh:
return err
}
})

g.Go(func() error {
defer clientSock.Close()
defer serverSock.Close()
defer cancel()

for i := 0; i < 100; i++ {
size, err := rand.Int(rand.Reader, big.NewInt(homa.HOMA_MAX_MESSAGE_LENGTH-1))
Expand Down

0 comments on commit c79be39

Please sign in to comment.