Skip to content

Commit

Permalink
fixes a race condition in request-response handling and simplifies Li…
Browse files Browse the repository at this point in the history
…mits
  • Loading branch information
rsms committed Oct 27, 2020
1 parent 3c3e00f commit d32a1ab
Show file tree
Hide file tree
Showing 13 changed files with 464 additions and 382 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ Build them with `go build`:
$ cd examples/websocket-chat
$ go build
$ ./websocket-chat
Listening on http://0.0.0.0:1235/
Listening on http://localhost:1235/
```

Here's a minimal but complete example program: (`examples/websocket-minimal`)
Expand Down
3 changes: 2 additions & 1 deletion examples/limits/requestor.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,12 @@ func requestor(port string) {
}
i--
} else if res.IsError() {
panic(res)
panic(fmt.Sprintf("error response: %v", res.Error()))
} else {
fmt.Printf("requestor: received response: %q\n", string(res.Data))
}
}

fmt.Printf("done. Closing socket\n")
s.Close()
}
14 changes: 10 additions & 4 deletions examples/limits/responder.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,16 @@ package main

import (
"fmt"
"github.com/rsms/gotalk"
"time"

"github.com/rsms/gotalk"
)

func responder(port string) {
// A simple echo operation with a 500ms response delay
gotalk.HandleBufferRequest("echo", func(s *gotalk.Sock, op string, buf []byte) ([]byte, error) {
fmt.Printf("responder: handling request\n")
time.Sleep(time.Millisecond * 400)
time.Sleep(time.Millisecond * 10)
return buf, nil
})

Expand All @@ -20,8 +21,13 @@ func responder(port string) {
panic(err)
}

// Limit this server to 5 concurrent requests (and disable streaming messages)
s.Limits = gotalk.NewLimits(5, 0)
// Limit this server to 5 concurrent requests and set really low wait times
// to that this demo doesn't take forever to run.
s.Limits = &gotalk.Limits{
BufferRequests: 5,
BufferMinWait: 10 * time.Millisecond,
BufferMaxWait: 100 * time.Millisecond,
}

// Accept connections
fmt.Printf("listening at %q\n", s.Addr())
Expand Down
103 changes: 72 additions & 31 deletions examples/read-timeout/requestor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,12 @@ package main

import (
"fmt"
"github.com/rsms/gotalk"
"io"
"strings"
"sync"
"time"

"github.com/rsms/gotalk"
)

// slowWriter simulates slow writing, to demonstrate timeout
Expand All @@ -24,65 +27,103 @@ func (rwc *slowWriter) Read(p []byte) (n int, err error) {
}
return
}

func (rwc *slowWriter) Write(p []byte) (n int, err error) {
// Delay anything but writing single-request headers
if err == nil && (len(p) < 13 || p[0] != byte(gotalk.MsgTypeSingleReq)) {

// tname := "?"
// switch gotalk.MsgType(p[0]) {
// case gotalk.MsgTypeSingleReq: tname = "SingleReq"
// case gotalk.MsgTypeStreamReq: tname = "StreamReq"
// case gotalk.MsgTypeStreamReqPart: tname = "StreamReqPart"
// case gotalk.MsgTypeSingleRes: tname = "SingleRes"
// case gotalk.MsgTypeStreamRes: tname = "StreamRes"
// case gotalk.MsgTypeErrorRes: tname = "ErrorRes"
// case gotalk.MsgTypeRetryRes: tname = "RetryRes"
// case gotalk.MsgTypeNotification: tname = "Notification"
// case gotalk.MsgTypeHeartbeat: tname = "Heartbeat"
// case gotalk.MsgTypeProtocolError: tname = "ProtocolError"
// default:
// fmt.Printf("slowWriter.Write (rest) %q\n", p)
// return rwc.c.Write(p)
// }
// fmt.Printf("slowWriter.Write %s\n", tname)

if err == nil && p[0] == byte(gotalk.MsgTypeSingleReq) {
time.Sleep(rwc.delay)
n, err = rwc.c.Write(p)
return
}
return rwc.c.Write(p)
}

func (rwc *slowWriter) Close() error {
return rwc.c.Close()
}

func sendRequest(s *gotalk.Sock) {
func sendRequest(s *gotalk.Sock) error {
fmt.Printf("requestor: sending 'echo' request\n")
b, err := s.BufferRequest("echo", []byte("Hello"))
if err == gotalk.ErrTimeout {
fmt.Printf("requestor: timed out\n")
} else if err != nil {
fmt.Printf("requestor: error %v\n", err.Error())
} else {
fmt.Printf("requestor: success: %v\n", string(b))
fmt.Printf("requestor: success: %q\n", b)
}
return err
}

func timeoutRequest(port string) {
func sendSlowWritingRequest(port string) error {
s, err := gotalk.Connect("tcp", "localhost:"+port)
if err != nil {
panic(err)
}
fmt.Printf("requestor: connected to %q\n", s.Addr())
if err == nil {
fmt.Printf("requestor: connected to %q on Sock@%p\n", s.Addr(), s)

// Wrap the connection for slow writing to simulate a poor connection
s.Adopt(&slowWriter{s.Conn(), 1000 * time.Millisecond})
// Wrap the connection for slow writing to simulate a poor connection
s.Adopt(&slowWriter{c: s.Conn(), delay: 200 * time.Millisecond})

// Send a request -- it will take too long and time out
sendRequest(s)
// Send a request -- it will take too long and time out
fmt.Printf("requestor: send slow request\n")
err = sendRequest(s)

s.Close()
s.Close()
}
return err
}

func heartbeatKeepAlive(port string) {
func sendRegularRequest(port string) error {
s, err := gotalk.Connect("tcp", "localhost:"+port)
if err != nil {
panic(err)
if err == nil {
fmt.Printf("requestor: connected to %q on Sock@%p\n", s.Addr(), s)
err = sendRequest(s)
s.Close()
}
fmt.Printf("requestor: connected to %q\n", s.Addr())

// set our heartbeat interval to half that time of the responder timeout
s.HeartbeatInterval = 500 * time.Millisecond

// Sleep for 3 seconds
time.Sleep(3 * time.Second)

// Send a request, which will work since we have kept the connection alive with heartbeats
sendRequest(s)

s.Close()
return err
}

func requestor(port string) {
timeoutRequest(port)
heartbeatKeepAlive(port)
var wg sync.WaitGroup
for i := 0; i < 3; i++ {
wg.Add(1)
go func() {
err := sendSlowWritingRequest(port)
if err == nil {
panic(fmt.Sprintf("expected error from sendSlowWritingRequest (no error)"))
} else {
s := err.Error()
if !strings.Contains(s, "timeout") && !strings.Contains(s, "socket closed") {
panic(fmt.Sprintf(
"expected timeout or socket closed from sendSlowWritingRequest but got %v",
err))
}
}
wg.Done()
}()
}
wg.Wait()

// time.Sleep(10 * time.Millisecond)
// if err := sendRegularRequest(port); err != nil {
// panic(fmt.Sprintf("expected sendRegularRequest to succeed but got error %v", err))
// }
}
6 changes: 4 additions & 2 deletions examples/read-timeout/responder.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,10 @@ func responder(port string) {
}

// Configure limits with a read timeout of 200 milliseconds
s.Limits = gotalk.NewLimits(0, 0)
s.Limits.SetReadTimeout(200 * time.Millisecond)
s.Limits = &gotalk.Limits{
BufferRequests: gotalk.Unlimited,
ReadTimeout: 100 * time.Millisecond,
}

// Accept connections
fmt.Printf("responder: listening at %q\n", s.Addr())
Expand Down
Loading

0 comments on commit d32a1ab

Please sign in to comment.