diff --git a/src/ripple/app/misc/NetworkOPs.cpp b/src/ripple/app/misc/NetworkOPs.cpp index 3e36fa27b72..22ea1e4145e 100644 --- a/src/ripple/app/misc/NetworkOPs.cpp +++ b/src/ripple/app/misc/NetworkOPs.cpp @@ -2937,7 +2937,7 @@ NetworkOPsImp::reportFeeChange() if (f != mLastFeeSummary) { m_job_queue.addJob( - jtCLIENT, "reportFeeChange->pubServer", [this](Job&) { + jtCLIENT_FEE_CHANGE, "reportFeeChange->pubServer", [this](Job&) { pubServer(); }); } @@ -2947,7 +2947,7 @@ void NetworkOPsImp::reportConsensusStateChange(ConsensusPhase phase) { m_job_queue.addJob( - jtCLIENT, + jtCLIENT_CONSENSUS, "reportConsensusStateChange->pubConsensus", [this, phase](Job&) { pubConsensus(phase); }); } @@ -3340,7 +3340,7 @@ NetworkOPsImp::addAccountHistoryJob(SubAccountHistoryInfoWeak subInfo) } app_.getJobQueue().addJob( - jtCLIENT, + jtCLIENT_ACCT_HIST, "AccountHistoryTxStream", [this, dbType = databaseType, subInfo](Job&) { auto const& accountId = subInfo.index_->accountId_; diff --git a/src/ripple/core/Job.h b/src/ripple/core/Job.h index 8f603e8d292..66aa0d051f7 100644 --- a/src/ripple/core/Job.h +++ b/src/ripple/core/Job.h @@ -39,34 +39,41 @@ enum JobType { // earlier jobs having lower priority than later jobs. If you wish to // insert a job at a specific priority, simply add it at the right location. - jtPACK, // Make a fetch pack for a peer - jtPUBOLDLEDGER, // An old ledger has been accepted - jtCLIENT, // A websocket command from the client - jtRPC, // A websocket command from the client - jtVALIDATION_ut, // A validation from an untrusted source - jtUPDATE_PF, // Update pathfinding requests - jtTRANSACTION_l, // A local transaction - jtREPLAY_REQ, // Peer request a ledger delta or a skip list - jtLEDGER_REQ, // Peer request ledger/txnset data - jtPROPOSAL_ut, // A proposal from an untrusted source - jtREPLAY_TASK, // A Ledger replay task/subtask - jtLEDGER_DATA, // Received data for a ledger we're acquiring - jtSWEEP, // Sweep for stale structures - jtTRANSACTION, // A transaction received from the network - jtMISSING_TXN, // Request missing transactions - jtREQUESTED_TXN, // Reply with requested transactions - jtBATCH, // Apply batched transactions - jtADVANCE, // Advance validated/acquired ledgers - jtPUBLEDGER, // Publish a fully-accepted ledger - jtTXN_DATA, // Fetch a proposed set - jtWAL, // Write-ahead logging - jtVALIDATION_t, // A validation from a trusted source - jtWRITE, // Write out hashed objects - jtACCEPT, // Accept a consensus ledger - jtPROPOSAL_t, // A proposal from a trusted source - jtNETOP_CLUSTER, // NetworkOPs cluster peer report - jtNETOP_TIMER, // NetworkOPs net timer processing - jtADMIN, // An administrative operation + jtPACK, // Make a fetch pack for a peer + jtPUBOLDLEDGER, // An old ledger has been accepted + jtCLIENT, // A placeholder for the priority of all jtCLIENT jobs + jtCLIENT_SUBSCRIBE, // A websocket subscription by a client + jtCLIENT_FEE_CHANGE, // Subscription for fee change by a client + jtCLIENT_CONSENSUS, // Subscription for consensus state change by a client + jtCLIENT_ACCT_HIST, // Subscription for account history by a client + jtCLIENT_SHARD, // Client request for shard archiving + jtCLIENT_RPC, // Client RPC request + jtCLIENT_WEBSOCKET, // Client websocket request + jtRPC, // A websocket command from the client + jtSWEEP, // Sweep for stale structures + jtVALIDATION_ut, // A validation from an untrusted source + jtUPDATE_PF, // Update pathfinding requests + jtTRANSACTION_l, // A local transaction + jtREPLAY_REQ, // Peer request a ledger delta or a skip list + jtLEDGER_REQ, // Peer request ledger/txnset data + jtPROPOSAL_ut, // A proposal from an untrusted source + jtREPLAY_TASK, // A Ledger replay task/subtask + jtLEDGER_DATA, // Received data for a ledger we're acquiring + jtTRANSACTION, // A transaction received from the network + jtMISSING_TXN, // Request missing transactions + jtREQUESTED_TXN, // Reply with requested transactions + jtBATCH, // Apply batched transactions + jtADVANCE, // Advance validated/acquired ledgers + jtPUBLEDGER, // Publish a fully-accepted ledger + jtTXN_DATA, // Fetch a proposed set + jtWAL, // Write-ahead logging + jtVALIDATION_t, // A validation from a trusted source + jtWRITE, // Write out hashed objects + jtACCEPT, // Accept a consensus ledger + jtPROPOSAL_t, // A proposal from a trusted source + jtNETOP_CLUSTER, // NetworkOPs cluster peer report + jtNETOP_TIMER, // NetworkOPs net timer processing + jtADMIN, // An administrative operation // Special job types which are not dispatched by the job pool jtPEER, diff --git a/src/ripple/core/JobTypes.h b/src/ripple/core/JobTypes.h index 624ba227ed6..b28bc3d59c7 100644 --- a/src/ripple/core/JobTypes.h +++ b/src/ripple/core/JobTypes.h @@ -67,46 +67,55 @@ class JobTypes }; // clang-format off - add(jtPACK, "makeFetchPack", 1, 0ms, 0ms); - add(jtPUBOLDLEDGER, "publishAcqLedger", 2, 10000ms, 15000ms); - add(jtVALIDATION_ut, "untrustedValidation", maxLimit, 2000ms, 5000ms); - add(jtTRANSACTION_l, "localTransaction", maxLimit, 100ms, 500ms); - add(jtREPLAY_REQ, "ledgerReplayRequest", 10, 250ms, 1000ms); - add(jtLEDGER_REQ, "ledgerRequest", 4, 0ms, 0ms); - add(jtPROPOSAL_ut, "untrustedProposal", maxLimit, 500ms, 1250ms); - add(jtREPLAY_TASK, "ledgerReplayTask", maxLimit, 0ms, 0ms); - add(jtLEDGER_DATA, "ledgerData", 4, 0ms, 0ms); - add(jtCLIENT, "clientCommand", maxLimit, 2000ms, 5000ms); - add(jtRPC, "RPC", maxLimit, 0ms, 0ms); - add(jtUPDATE_PF, "updatePaths", 1, 0ms, 0ms); - add(jtTRANSACTION, "transaction", maxLimit, 250ms, 1000ms); - add(jtBATCH, "batch", maxLimit, 250ms, 1000ms); - add(jtADVANCE, "advanceLedger", maxLimit, 0ms, 0ms); - add(jtPUBLEDGER, "publishNewLedger", maxLimit, 3000ms, 4500ms); - add(jtTXN_DATA, "fetchTxnData", 5, 0ms, 0ms); - add(jtWAL, "writeAhead", maxLimit, 1000ms, 2500ms); - add(jtVALIDATION_t, "trustedValidation", maxLimit, 500ms, 1500ms); - add(jtWRITE, "writeObjects", maxLimit, 1750ms, 2500ms); - add(jtACCEPT, "acceptLedger", maxLimit, 0ms, 0ms); - add(jtPROPOSAL_t, "trustedProposal", maxLimit, 100ms, 500ms); - add(jtSWEEP, "sweep", 1, 0ms, 0ms); - add(jtNETOP_CLUSTER, "clusterReport", 1, 9999ms, 9999ms); - add(jtNETOP_TIMER, "heartbeat", 1, 999ms, 999ms); - add(jtADMIN, "administration", maxLimit, 0ms, 0ms); - add(jtMISSING_TXN, "handleHaveTransactions", 1200, 0ms, 0ms); - add(jtREQUESTED_TXN, "doTransactions", 1200, 0ms, 0ms); - - add(jtPEER, "peerCommand", 0, 200ms, 2500ms); - add(jtDISK, "diskAccess", 0, 500ms, 1000ms); - add(jtTXN_PROC, "processTransaction", 0, 0ms, 0ms); - add(jtOB_SETUP, "orderBookSetup", 0, 0ms, 0ms); - add(jtPATH_FIND, "pathFind", 0, 0ms, 0ms); - add(jtHO_READ, "nodeRead", 0, 0ms, 0ms); - add(jtHO_WRITE, "nodeWrite", 0, 0ms, 0ms); - add(jtGENERIC, "generic", 0, 0ms, 0ms); - add(jtNS_SYNC_READ, "SyncReadNode", 0, 0ms, 0ms); - add(jtNS_ASYNC_READ, "AsyncReadNode", 0, 0ms, 0ms); - add(jtNS_WRITE, "WriteNode", 0, 0ms, 0ms); + // avg peak + // JobType name limit latency latency + add(jtPACK, "makeFetchPack", 1, 0ms, 0ms); + add(jtPUBOLDLEDGER, "publishAcqLedger", 2, 10000ms, 15000ms); + add(jtVALIDATION_ut, "untrustedValidation", maxLimit, 2000ms, 5000ms); + add(jtTRANSACTION_l, "localTransaction", maxLimit, 100ms, 500ms); + add(jtREPLAY_REQ, "ledgerReplayRequest", 10, 250ms, 1000ms); + add(jtLEDGER_REQ, "ledgerRequest", 4, 0ms, 0ms); + add(jtPROPOSAL_ut, "untrustedProposal", maxLimit, 500ms, 1250ms); + add(jtREPLAY_TASK, "ledgerReplayTask", maxLimit, 0ms, 0ms); + add(jtLEDGER_DATA, "ledgerData", 4, 0ms, 0ms); + add(jtCLIENT, "clientCommand", maxLimit, 2000ms, 5000ms); + add(jtCLIENT_SUBSCRIBE, "clientSubscribe", maxLimit, 2000ms, 5000ms); + add(jtCLIENT_FEE_CHANGE, "clientFeeChange", maxLimit, 2000ms, 5000ms); + add(jtCLIENT_CONSENSUS, "clientConsensus", maxLimit, 2000ms, 5000ms); + add(jtCLIENT_ACCT_HIST, "clientAccountHistory", maxLimit, 2000ms, 5000ms); + add(jtCLIENT_SHARD, "clientShardArchive", maxLimit, 2000ms, 5000ms); + add(jtCLIENT_RPC, "clientRPC", maxLimit, 2000ms, 5000ms); + add(jtCLIENT_WEBSOCKET, "clientWebsocket", maxLimit, 2000ms, 5000ms); + add(jtRPC, "RPC", maxLimit, 0ms, 0ms); + add(jtUPDATE_PF, "updatePaths", 1, 0ms, 0ms); + add(jtTRANSACTION, "transaction", maxLimit, 250ms, 1000ms); + add(jtBATCH, "batch", maxLimit, 250ms, 1000ms); + add(jtADVANCE, "advanceLedger", maxLimit, 0ms, 0ms); + add(jtPUBLEDGER, "publishNewLedger", maxLimit, 3000ms, 4500ms); + add(jtTXN_DATA, "fetchTxnData", 5, 0ms, 0ms); + add(jtWAL, "writeAhead", maxLimit, 1000ms, 2500ms); + add(jtVALIDATION_t, "trustedValidation", maxLimit, 500ms, 1500ms); + add(jtWRITE, "writeObjects", maxLimit, 1750ms, 2500ms); + add(jtACCEPT, "acceptLedger", maxLimit, 0ms, 0ms); + add(jtPROPOSAL_t, "trustedProposal", maxLimit, 100ms, 500ms); + add(jtSWEEP, "sweep", 1, 0ms, 0ms); + add(jtNETOP_CLUSTER, "clusterReport", 1, 9999ms, 9999ms); + add(jtNETOP_TIMER, "heartbeat", 1, 999ms, 999ms); + add(jtADMIN, "administration", maxLimit, 0ms, 0ms); + add(jtMISSING_TXN, "handleHaveTransactions", 1200, 0ms, 0ms); + add(jtREQUESTED_TXN, "doTransactions", 1200, 0ms, 0ms); + + add(jtPEER, "peerCommand", 0, 200ms, 2500ms); + add(jtDISK, "diskAccess", 0, 500ms, 1000ms); + add(jtTXN_PROC, "processTransaction", 0, 0ms, 0ms); + add(jtOB_SETUP, "orderBookSetup", 0, 0ms, 0ms); + add(jtPATH_FIND, "pathFind", 0, 0ms, 0ms); + add(jtHO_READ, "nodeRead", 0, 0ms, 0ms); + add(jtHO_WRITE, "nodeWrite", 0, 0ms, 0ms); + add(jtGENERIC, "generic", 0, 0ms, 0ms); + add(jtNS_SYNC_READ, "SyncReadNode", 0, 0ms, 0ms); + add(jtNS_ASYNC_READ, "AsyncReadNode", 0, 0ms, 0ms); + add(jtNS_WRITE, "WriteNode", 0, 0ms, 0ms); // clang-format on } diff --git a/src/ripple/core/impl/JobQueue.cpp b/src/ripple/core/impl/JobQueue.cpp index d87664416a0..09191afd7fc 100644 --- a/src/ripple/core/impl/JobQueue.cpp +++ b/src/ripple/core/impl/JobQueue.cpp @@ -94,7 +94,9 @@ JobQueue::addRefCountedJob( // FIXME: Workaround incorrect client shutdown ordering // do not add jobs to a queue with no threads - assert(type == jtCLIENT || m_workers.getNumberOfThreads() > 0); + assert( + (type >= jtCLIENT && type <= jtCLIENT_WEBSOCKET) || + m_workers.getNumberOfThreads() > 0); { std::lock_guard lock(m_mutex); diff --git a/src/ripple/net/impl/RPCSub.cpp b/src/ripple/net/impl/RPCSub.cpp index f9b08e5c074..f65f9a3614b 100644 --- a/src/ripple/net/impl/RPCSub.cpp +++ b/src/ripple/net/impl/RPCSub.cpp @@ -96,7 +96,9 @@ class RPCSubImp : public RPCSub JLOG(j_.info()) << "RPCCall::fromNetwork start"; mSending = m_jobQueue.addJob( - jtCLIENT, "RPCSub::sendThread", [this](Job&) { sendThread(); }); + jtCLIENT_SUBSCRIBE, "RPCSub::sendThread", [this](Job&) { + sendThread(); + }); } } diff --git a/src/ripple/rpc/impl/RPCHandler.cpp b/src/ripple/rpc/impl/RPCHandler.cpp index 5430f3a6a5e..b04a6f0ed2a 100644 --- a/src/ripple/rpc/impl/RPCHandler.cpp +++ b/src/ripple/rpc/impl/RPCHandler.cpp @@ -129,12 +129,11 @@ fillHandler(JsonContext& context, Handler const*& result) { if (!isUnlimited(context.role)) { - // VFALCO NOTE Should we also add up the jtRPC jobs? - // - int jc = context.app.getJobQueue().getJobCountGE(jtCLIENT); - if (jc > Tuning::maxJobQueueClients) + // Count all jobs at jtCLIENT priority or higher. + int const jobCount = context.app.getJobQueue().getJobCountGE(jtCLIENT); + if (jobCount > Tuning::maxJobQueueClients) { - JLOG(context.j.debug()) << "Too busy for command: " << jc; + JLOG(context.j.debug()) << "Too busy for command: " << jobCount; return rpcTOO_BUSY; } } diff --git a/src/ripple/rpc/impl/ServerHandlerImp.cpp b/src/ripple/rpc/impl/ServerHandlerImp.cpp index b2f52e9a551..b9b2637eb1c 100644 --- a/src/ripple/rpc/impl/ServerHandlerImp.cpp +++ b/src/ripple/rpc/impl/ServerHandlerImp.cpp @@ -292,7 +292,7 @@ ServerHandlerImp::onRequest(Session& session) std::shared_ptr detachedSession = session.detach(); auto const postResult = m_jobQueue.postCoro( - jtCLIENT, + jtCLIENT_RPC, "RPC-Client", [this, detachedSession](std::shared_ptr coro) { processSession(detachedSession, coro); @@ -339,7 +339,7 @@ ServerHandlerImp::onWSMessage( JLOG(m_journal.trace()) << "Websocket received '" << jv << "'"; auto const postResult = m_jobQueue.postCoro( - jtCLIENT, + jtCLIENT_WEBSOCKET, "WS-Client", [this, session, jv = std::move(jv)]( std::shared_ptr const& coro) { diff --git a/src/ripple/rpc/impl/ShardArchiveHandler.cpp b/src/ripple/rpc/impl/ShardArchiveHandler.cpp index 7ad8bcbbad2..c52c1b50174 100644 --- a/src/ripple/rpc/impl/ShardArchiveHandler.cpp +++ b/src/ripple/rpc/impl/ShardArchiveHandler.cpp @@ -382,7 +382,7 @@ ShardArchiveHandler::next(std::lock_guard const& l) return onClosureFailed( "failed to wrap closure for starting download", l); - app_.getJobQueue().addJob(jtCLIENT, "ShardArchiveHandler", *wrapper); + app_.getJobQueue().addJob(jtCLIENT_SHARD, "ShardArchiveHandler", *wrapper); return true; } @@ -465,7 +465,7 @@ ShardArchiveHandler::complete(path dstPath) } // Process in another thread to not hold up the IO service - app_.getJobQueue().addJob(jtCLIENT, "ShardArchiveHandler", *wrapper); + app_.getJobQueue().addJob(jtCLIENT_SHARD, "ShardArchiveHandler", *wrapper); } void