Skip to content

Commit

Permalink
Refactor of the HTTPMultibin WebSocket handlers
Browse files Browse the repository at this point in the history
Refactor for better handling and asserting closing operations.
  • Loading branch information
codebien committed Mar 9, 2023
1 parent e1952ba commit f364159
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 44 deletions.
45 changes: 30 additions & 15 deletions js/modules/k6/ws/ws_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,17 +262,16 @@ func TestSessionTimeout(t *testing.T) {
test := newTestState(t)
_, err := test.VU.Runtime().RunString(sr(`
var start = new Date().getTime();
var ellapsed = new Date().getTime() - start;
var elapsed = new Date().getTime() - start;
var res = ws.connect("WSBIN_URL/ws-echo", function(socket){
socket.setTimeout(function () {
ellapsed = new Date().getTime() - start;
elapsed = new Date().getTime() - start;
socket.close();
}, 500);
});
if (ellapsed > 3000 || ellapsed < 500) {
throw new Error ("setTimeout occurred after " + ellapsed + "ms, expected 500<T<3000");
}
`))
if (elapsed > 3000 || elapsed < 500) {
throw new Error ("setTimeout occurred after " + elapsed + "ms, expected 500<T<3000")
}`))
require.NoError(t, err)
}

Expand Down Expand Up @@ -390,7 +389,7 @@ func TestSessionClose(t *testing.T) {
name string
endpoint string
}{
{"server_close_ok", "/ws-echo"},
{"server_close_ok", "/ws-close"},
// Ensure we correctly handle invalid WS server
// implementations that close the connection prematurely
// without sending a close control frame first.
Expand Down Expand Up @@ -718,18 +717,20 @@ func TestErrors(t *testing.T) {
test := newTestState(t)
_, err := test.VU.Runtime().RunString(sr(`
var closed = false;
var res = ws.connect("WSBIN_URL/ws-close", function(socket){
var onerror = false;
var res = ws.connect("WSBIN_URL/ws-close-invalid", function(socket){
socket.on('open', function open() {
socket.setInterval(function timeout() {
socket.ping();
}, 1000);
});
socket.on("ping", function() {
socket.close();
socket.on("pong", function() {
socket.close();
});
socket.on("error", function(errorEvent) {
onerror = true;
if (errorEvent == null) {
throw new Error(JSON.stringify(errorEvent));
}
Expand All @@ -738,10 +739,23 @@ func TestErrors(t *testing.T) {
socket.close();
}
});
socket.on("close", function() {
closed = true;
})
});
if (res.status != 101) {
throw new Error("connection failed with status: " + res.status);
}
if (!closed) {
throw new Error ("conn no closed");
}
if (!onerror) {
throw new Error ("error event has not triggered");
}
`))
require.NoError(t, err)
assertSessionMetricsEmitted(t, metrics.GetBufferedSamples(test.samples), "", sr("WSBIN_URL/ws-close"), statusProtocolSwitch, "")
assertSessionMetricsEmitted(t, metrics.GetBufferedSamples(test.samples), "", sr("WSBIN_URL/ws-close-invalid"), statusProtocolSwitch, "")
})
}

Expand Down Expand Up @@ -821,13 +835,13 @@ func TestTLSConfig(t *testing.T) {
}

_, err := test.VU.Runtime().RunString(sr(`
var res = ws.connect("WSSBIN_URL/ws-close", function(socket){
var res = ws.connect("WSSBIN_URL/ws-echo", function(socket){
socket.close()
});
if (res.status != 101) { throw new Error("TLS connection failed with status: " + res.status); }
`))
require.NoError(t, err)
assertSessionMetricsEmitted(t, metrics.GetBufferedSamples(test.samples), "", sr("WSSBIN_URL/ws-close"), statusProtocolSwitch, "")
assertSessionMetricsEmitted(t, metrics.GetBufferedSamples(test.samples), "", sr("WSSBIN_URL/ws-echo"), statusProtocolSwitch, "")
})

t.Run("custom certificates", func(t *testing.T) {
Expand All @@ -839,15 +853,15 @@ func TestTLSConfig(t *testing.T) {
test.VU.StateField.TLSConfig = tb.TLSClientConfig

_, err := test.VU.Runtime().RunString(sr(`
var res = ws.connect("WSSBIN_URL/ws-close", function(socket){
var res = ws.connect("WSSBIN_URL/ws-echo", function(socket){
socket.close()
});
if (res.status != 101) {
throw new Error("TLS connection failed with status: " + res.status);
}
`))
require.NoError(t, err)
assertSessionMetricsEmitted(t, metrics.GetBufferedSamples(test.samples), "", sr("WSSBIN_URL/ws-close"), statusProtocolSwitch, "")
assertSessionMetricsEmitted(t, metrics.GetBufferedSamples(test.samples), "", sr("WSSBIN_URL/ws-echo"), statusProtocolSwitch, "")
})
}

Expand Down Expand Up @@ -1263,6 +1277,7 @@ func TestWSConnectDisableThrowErrorOption(t *testing.T) {
if (res == null && res.error == null) {
throw new Error("res.error is expected to be not null");
}
`)
require.NoError(t, err)
entries := logHook.Drain()
Expand Down
91 changes: 62 additions & 29 deletions lib/testutils/httpmultibin/httpmultibin.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,40 +88,73 @@ type jsonBody struct {
Compression string `json:"compression"`
}

func getWebsocketHandler(echo bool, closePrematurely bool) http.Handler {
// autocloseHandler handles requests just opening
// then closing the connection without waiting for the client input.
// It simulates the server-side closing operation.
func autocloseHandler(t testing.TB) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
conn, err := (&websocket.Upgrader{}).Upgrade(w, req, w.Header())
require.NoError(t, err)

err = conn.WriteControl(
websocket.CloseMessage,
websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""),
time.Now().Add(time.Second))
require.NoError(t, err)

err = conn.Close()
require.NoError(t, err)
})
}

// echoHandler handles requests proxying the same request input to the client.
// If closePrematurely is false then it waits for the client's request to close the connection.
// If closePrematurely is true then it closes the connection in a brutally
// without respecting the protocol.
func echoHandler(t testing.TB, closePrematurely bool) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
conn, err := (&websocket.Upgrader{}).Upgrade(w, req, w.Header())
require.NoError(t, err)

defer func() {
_ = conn.Close()
}()

messageType, r, e := conn.NextReader()
if e != nil {
return
}
var wc io.WriteCloser
wc, err = conn.NextWriter(messageType)
if err != nil {
return
}
if echo {
messageType, r, e := conn.NextReader()
if e != nil {
return
}
var wc io.WriteCloser
wc, err = conn.NextWriter(messageType)
if err != nil {
return
}
if _, err = io.Copy(wc, r); err != nil {
return
}
if err = wc.Close(); err != nil {
return
}
if _, err = io.Copy(wc, r); err != nil {
return
}
if err = wc.Close(); err != nil {
return
}

// closePrematurely=true mimics an invalid WS server that doesn't
// send a close control frame before closing the connection.
if !closePrematurely {
closeMsg := websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")
_ = conn.WriteControl(websocket.CloseMessage, closeMsg, time.Now().Add(time.Second))
// Wait for response control frame
<-time.After(time.Second)
}
err = conn.Close()
if err != nil {
return
// Closing is delegated to the client,
// it waits the control message for closing.
closeReceived := make(chan struct{})
defaultCloseHandler := conn.CloseHandler()
conn.SetCloseHandler(func(code int, text string) error {
close(closeReceived)
return defaultCloseHandler(code, text)
})

for {
_, _, e := conn.ReadMessage()
if e != nil {
break
}
}
<-closeReceived
}
})
}
Expand Down Expand Up @@ -256,10 +289,10 @@ func NewHTTPMultiBin(t testing.TB) *HTTPMultiBin {
// Create a http.ServeMux and set the httpbin handler as the default
mux := http.NewServeMux()
mux.Handle("/brotli", getEncodedHandler(t, "br"))
mux.Handle("/ws-echo", getWebsocketHandler(true, false))
mux.Handle("/ws-echo-invalid", getWebsocketHandler(true, true))
mux.Handle("/ws-close", getWebsocketHandler(false, false))
mux.Handle("/ws-close-invalid", getWebsocketHandler(false, true))
mux.Handle("/ws-echo", echoHandler(t, false))
mux.Handle("/ws-echo-invalid", echoHandler(t, true))
mux.Handle("/ws-close", autocloseHandler(t))
mux.Handle("/ws-close-invalid", echoHandler(t, true))
mux.Handle("/zstd", getEncodedHandler(t, "zstd"))
mux.Handle("/zstd-br", getZstdBrHandler(t))
mux.Handle("/", httpbin.New().Handler())
Expand Down

0 comments on commit f364159

Please sign in to comment.