Skip to content

Commit

Permalink
Add test to check order of ConnectionManager trans
Browse files Browse the repository at this point in the history
- In unregisterInboundConnectionImpl: OutboundDupState Ticking -> OutboundDupState Expired
- In withConnectionManager cleanup, where we put the state in TerminatedState is missing a trace. However, since this can be async the connState can come out of order and/or not making much sense.
- In requestOutboundConnectionImpl bracketOnError: TerminatedState -> Unknown
- In includeInboundConnectionImpl: on readPromise Left handleError case, where we put the state in either TerminatingState or TerminatedState
- In unregisterInboundConnectionImpl: Only trace TerminatingState -> TerminatedState, after having written to the connVar
- In a possible race between withConnectionManager finally block and forkConnectionHandler cleanup function, where the latter can run first and break an assumption made by withConnectionManager that it is the first to run so a connection shouldn't be in TerminatingState (which is not the case, since there is a race condition). So an out of order and possibly not making much sense transition is logged.
- When accept call returns first than connect and the connVar gets overwritten. In requestOutboundConnectionImpl cleanup function we wrongly trace * -> TerminatedState transition of a removed connVar, we shouldn't care about that connVar anymore.
- In promotedToWarmRemoteImpl, in OutboundIdleState case we are not updating the state correctly.
  • Loading branch information
bolt12 committed Sep 22, 2021
1 parent 9530510 commit b68bb6d
Show file tree
Hide file tree
Showing 2 changed files with 205 additions and 72 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ import Data.Proxy (Proxy (..))
import Data.Typeable (Typeable)
import GHC.Stack (CallStack, HasCallStack, callStack)

import Data.Map (Map)
import Data.Map (Map, traverseWithKey)
import qualified Data.Map as Map
import qualified Data.Set as Set

Expand Down Expand Up @@ -556,19 +556,37 @@ withConnectionManager ConnectionManagerArguments {
k connectionManager
`finally` do
traceWith tracer TrShutdown

state <- atomically $ readTMVar stateVar
traverse_
(\connVar -> do
void $ traverseWithKey
(\peerAddr connVar -> do
-- cleanup handler for that thread will close socket associated
-- with the thread. We put each connection in 'TerminatedState' to
-- guarantee, that non of the connection threads will enter
-- try that none of the connection threads will enter
-- 'TerminatingState' (and thus delay shutdown for 'tcp_WAIT_TIME'
-- seconds) when receiving the 'AsyncCancelled' exception.
connState <- atomically $ do
-- seconds) when receiving the 'AsyncCancelled' exception. However this code
-- races with the one in 'forkConnectionHandler' and in the case that this
-- finally block loses the race the AsyncCancelled received should unblock
-- the threadDelay.
--
-- TODO: Check if this block of code had only AsyncCancelled exceptions in
-- mind, and if given that when an AsyncCancelled is received threadDelay
-- will be unblocked; if it is really necessary the extra effort of putting
-- connections in TerminatedState.
(connState, tr, shouldTrace) <- atomically $ do
connState <- readTVar connVar
writeTVar connVar (TerminatedState Nothing)
return connState
traverse_ cancel (getConnThread connState) )
let connState' = TerminatedState Nothing
tr = TransitionTrace peerAddr (mkTransition connState connState')
absConnState = abstractState (Known connState)
shouldTrace = absConnState /= TerminatedSt

writeTVar connVar connState'
return (connState, tr, shouldTrace)

when shouldTrace $
traceWith trTracer tr
traverse_ cancel (getConnThread connState)
)
state
where
traceCounters :: StrictTMVar m (ConnectionManagerState peerAddr handle handleError version m) -> m ()
Expand Down Expand Up @@ -635,35 +653,36 @@ withConnectionManager ConnectionManagerArguments {
Just connVar -> do
connState <- readTVar connVar
let connState' = TerminatedState Nothing
transition = TransitionTrace peerAddr (mkTransition connState connState')
transition = mkTransition connState connState'
transitionTrace = TransitionTrace peerAddr transition
case connState of
ReservedOutboundState -> do
writeTVar connVar connState'
return $ There (Just transition)
return $ There (Just transitionTrace)
UnnegotiatedState {} -> do
writeTVar connVar connState'
return $ There (Just transition)
return $ There (Just transitionTrace)
OutboundUniState {} -> do
writeTVar connVar connState'
return $ There (Just transition)
return $ There (Just transitionTrace)
OutboundDupState {} -> do
writeTVar connVar connState'
return $ There (Just transition)
return $ There (Just transitionTrace)
OutboundIdleState {} -> do
writeTVar connVar connState'
return $ There (Just transition)
return $ There (Just transitionTrace)
InboundIdleState {} -> do
writeTVar connVar connState'
return $ There (Just transition)
return $ There (Just transitionTrace)
InboundState {} -> do
writeTVar connVar connState'
return $ There (Just transition)
return $ There (Just transitionTrace)
DuplexState {} -> do
writeTVar connVar connState'
return $ There (Just transition)
return $ There (Just transitionTrace)
TerminatingState {} -> do
return $ Here (connVar, transition)
TerminatedState {} ->
TerminatedState {} -> do
return $ There Nothing

case wConnVar of
Expand Down Expand Up @@ -710,11 +729,10 @@ withConnectionManager ConnectionManagerArguments {
-- - `Terminate: TerminatingState → TerminatedState` transition.
traceWith tracer (TrConnectionTimeWaitDone connId)

-- TerminatingState -> TerminatedState transition
traceWith trTracer transition
traceCounters stateVar

trs <- atomically $ do
connState <- readTVar connVar
let transition' = transition { fromState = Known connState }
shouldTrace = abstractState (Known connState) /= TerminatedSt
writeTVar connVar (TerminatedState Nothing)
-- We have to be careful when deleting it from
-- 'ConnectionManagerState'.
Expand All @@ -739,11 +757,19 @@ withConnectionManager ConnectionManagerArguments {
-- Key was present in the dictionary (stateVar) and
-- removed so we trace the removal.
Just Nothing ->
return [ Transition
{ fromState = Known (TerminatedState Nothing)
, toState = Unknown
}
]
return $
if shouldTrace
then [ transition'
, Transition
{ fromState = Known (TerminatedState Nothing)
, toState = Unknown
}
]
else [ Transition
{ fromState = Known (TerminatedState Nothing)
, toState = Unknown
}
]
-- Key was not present in the dictionary (stateVar),
-- so we do not trace anything as it was already traced upon
-- deletion.
Expand Down Expand Up @@ -812,13 +838,24 @@ withConnectionManager ConnectionManagerArguments {
res <- atomically $ readPromise reader
case res of
Left handleError -> do
atomically $ do
writeTVar connVar $
case classifyHandleError handleError of
HandshakeFailure -> TerminatingState connId connThread
(Just handleError)
HandshakeProtocolViolation -> TerminatedState (Just handleError)
(tr, shouldTrace) <- atomically $ do
connState <- readTVar connVar
let connState' =
case classifyHandleError handleError of
HandshakeFailure -> TerminatingState connId connThread
(Just handleError)
HandshakeProtocolViolation -> TerminatedState (Just handleError)
tr = TransitionTrace peerAddr (mkTransition connState connState')
absConnState = abstractState (Known connState)
writeTVar connVar connState'
modifyTMVarPure_ stateVar (Map.delete peerAddr)
return (tr, absConnState /= TerminatedSt)

-- If 'connState' can be in 'TerminatedState' (due to some 'SomeAsyncException'
-- for example) and we do not want to transition possibly invalid
-- transitions such as TerminatedState -> TerminatingState
when shouldTrace $
traceWith trTracer tr
traceCounters stateVar
return (Disconnected connId (Just handleError))

Expand Down Expand Up @@ -881,7 +918,6 @@ withConnectionManager ConnectionManagerArguments {
TerminatedState {} -> return Nothing

traverse_ (traceWith trTracer . TransitionTrace peerAddr) mbTransition
traceCounters stateVar

traceCounters stateVar

Expand Down Expand Up @@ -941,9 +977,10 @@ withConnectionManager ConnectionManagerArguments {
-- → OutboundState Duplex
-- @
OutboundDupState connId connThread handle Ticking -> do
writeTVar connVar (OutboundDupState connId connThread handle Expired)
let connState' = OutboundDupState connId connThread handle Expired
writeTVar connVar connState'
return ( Nothing
, Nothing
, Just (mkTransition connState connState')
, OperationSuccess KeepTr )
OutboundDupState _connId _connThread _handle Expired ->
assert False $
Expand Down Expand Up @@ -1198,18 +1235,47 @@ withConnectionManager ConnectionManagerArguments {
(openToConnect cmSnocket peerAddr)
(\socket -> do
close cmSnocket socket
tr <- atomically $ do
connState <- readTVar connVar
let connState' = TerminatedState Nothing
writeTVar connVar connState'
modifyTMVarPure_ stateVar $
(Map.update (\connVar' ->
if eqTVar (Proxy :: Proxy m) connVar' connVar
then Nothing
else Just connVar')
peerAddr)
return (mkTransition connState connState')
traceWith trTracer (TransitionTrace peerAddr tr)
res <- atomically $ do
state <- takeTMVar stateVar

case Map.lookup peerAddr state of
-- Lookup failed, which means connection was already removed.
-- So we just update the connVar and trace accordingly.
Nothing -> do
connState <- readTVar connVar
let connState' = TerminatedState Nothing
writeTVar connVar connState'
putTMVar stateVar state
return $
Right ( mkTransition connState connState'
, Transition (Known connState') Unknown)

-- Current connVar.
Just connVar' ->
-- If accept call returned first than connect then the
-- connVar will be replaced. If it was replaced then we do
-- not need to do anything. Otherwise, we need to remove
-- the connVar from the state and trace accordingly.
if eqTVar (Proxy :: Proxy m) connVar' connVar
then do
connState <- readTVar connVar
let state' = Map.delete peerAddr state
connState' = TerminatedState Nothing
writeTVar connVar connState'
putTMVar stateVar state'
return $
Right ( mkTransition connState connState'
, Transition (Known connState') Unknown)
else do
putTMVar stateVar state
return $
Left ()

case res of
Left _ -> pure ()
Right (tr, tr') -> do
traceWith trTracer (TransitionTrace peerAddr tr)
traceWith trTracer (TransitionTrace peerAddr tr')
traceCounters stateVar

)
Expand Down Expand Up @@ -1268,28 +1334,39 @@ withConnectionManager ConnectionManagerArguments {
res <- atomically (readPromise reader)
case res of
Left handleError -> do
modifyTMVar stateVar $ \state -> do
(transition, shouldTrace) <- atomically $ do
connState <- readTVar connVar

let connState' =
case classifyHandleError handleError of
HandshakeFailure ->
TerminatingState connId connThread
(Just handleError)
HandshakeProtocolViolation ->
TerminatedState (Just handleError)
transition = TransitionTrace peerAddr (mkTransition connState connState')
absConnState = abstractState (Known connState)

-- 'handleError' might be either a handshake negotiation
-- a protocol failure (an IO exception, a timeout or
-- codec failure). In the first case we should not reset
-- the connection as this is not a protocol error.
atomically $ writeTVar connVar $
case classifyHandleError handleError of
HandshakeFailure ->
TerminatingState connId connThread
(Just handleError)
HandshakeProtocolViolation ->
TerminatedState (Just handleError)

return ( Map.update
(\connVar' ->
if eqTVar (Proxy :: Proxy m) connVar' connVar
then Nothing
else Just connVar')
peerAddr
state
, Disconnected connId (Just handleError)
)
writeTVar connVar connState'

modifyTMVarPure_ stateVar
(Map.update
(\connVar' ->
if eqTVar (Proxy :: Proxy m) connVar' connVar
then Nothing
else Just connVar')
peerAddr)
return (transition, absConnState /= TerminatedSt)

when shouldTrace $
traceWith trTracer transition
traceCounters stateVar

return (Disconnected connId (Just handleError))

Right (handle, version) -> do
let dataFlow = connectionDataFlow version
Expand Down Expand Up @@ -1774,7 +1851,7 @@ withConnectionManager ConnectionManagerArguments {
-- → InboundState Duplex
-- @
let connState' = InboundState connId connThread handle dataFlow
writeTVar connVar connState
writeTVar connVar connState'
return (OperationSuccess (mkTransition connState connState'), Nothing)
OutboundIdleState _connId _connThread _handle
dataFlow@Unidirectional ->
Expand Down
Loading

0 comments on commit b68bb6d

Please sign in to comment.