Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve names returned by server_info counters #4031

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions src/ripple/app/misc/NetworkOPs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2937,7 +2937,7 @@ NetworkOPsImp::reportFeeChange()
if (f != mLastFeeSummary)
{
m_job_queue.addJob(
jtCLIENT, "reportFeeChange->pubServer", [this](Job&) {
jtCLIENT_FEE_CHANGE, "reportFeeChange->pubServer", [this](Job&) {
pubServer();
});
}
Expand All @@ -2947,7 +2947,7 @@ void
NetworkOPsImp::reportConsensusStateChange(ConsensusPhase phase)
{
m_job_queue.addJob(
jtCLIENT,
jtCLIENT_CONSENSUS,
"reportConsensusStateChange->pubConsensus",
[this, phase](Job&) { pubConsensus(phase); });
}
Expand Down Expand Up @@ -3340,7 +3340,7 @@ NetworkOPsImp::addAccountHistoryJob(SubAccountHistoryInfoWeak subInfo)
}

app_.getJobQueue().addJob(
jtCLIENT,
jtCLIENT_ACCT_HIST,
"AccountHistoryTxStream",
[this, dbType = databaseType, subInfo](Job&) {
auto const& accountId = subInfo.index_->accountId_;
Expand Down
63 changes: 35 additions & 28 deletions src/ripple/core/Job.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,34 +39,41 @@ enum JobType {
// earlier jobs having lower priority than later jobs. If you wish to
// insert a job at a specific priority, simply add it at the right location.

jtPACK, // Make a fetch pack for a peer
jtPUBOLDLEDGER, // An old ledger has been accepted
jtCLIENT, // A websocket command from the client
jtRPC, // A websocket command from the client
jtVALIDATION_ut, // A validation from an untrusted source
jtUPDATE_PF, // Update pathfinding requests
jtTRANSACTION_l, // A local transaction
jtREPLAY_REQ, // Peer request a ledger delta or a skip list
jtLEDGER_REQ, // Peer request ledger/txnset data
jtPROPOSAL_ut, // A proposal from an untrusted source
jtREPLAY_TASK, // A Ledger replay task/subtask
jtLEDGER_DATA, // Received data for a ledger we're acquiring
jtSWEEP, // Sweep for stale structures
jtTRANSACTION, // A transaction received from the network
jtMISSING_TXN, // Request missing transactions
jtREQUESTED_TXN, // Reply with requested transactions
jtBATCH, // Apply batched transactions
jtADVANCE, // Advance validated/acquired ledgers
jtPUBLEDGER, // Publish a fully-accepted ledger
jtTXN_DATA, // Fetch a proposed set
jtWAL, // Write-ahead logging
jtVALIDATION_t, // A validation from a trusted source
jtWRITE, // Write out hashed objects
jtACCEPT, // Accept a consensus ledger
jtPROPOSAL_t, // A proposal from a trusted source
jtNETOP_CLUSTER, // NetworkOPs cluster peer report
jtNETOP_TIMER, // NetworkOPs net timer processing
jtADMIN, // An administrative operation
jtPACK, // Make a fetch pack for a peer
jtPUBOLDLEDGER, // An old ledger has been accepted
jtCLIENT, // A placeholder for the priority of all jtCLIENT jobs
jtCLIENT_SUBSCRIBE, // A websocket subscription by a client
jtCLIENT_FEE_CHANGE, // Subscription for fee change by a client
jtCLIENT_CONSENSUS, // Subscription for consensus state change by a client
jtCLIENT_ACCT_HIST, // Subscription for account history by a client
jtCLIENT_SHARD, // Client request for shard archiving
jtCLIENT_RPC, // Client RPC request
jtCLIENT_WEBSOCKET, // Client websocket request
jtRPC, // A websocket command from the client
jtSWEEP, // Sweep for stale structures
jtVALIDATION_ut, // A validation from an untrusted source
jtUPDATE_PF, // Update pathfinding requests
jtTRANSACTION_l, // A local transaction
jtREPLAY_REQ, // Peer request a ledger delta or a skip list
jtLEDGER_REQ, // Peer request ledger/txnset data
jtPROPOSAL_ut, // A proposal from an untrusted source
jtREPLAY_TASK, // A Ledger replay task/subtask
jtLEDGER_DATA, // Received data for a ledger we're acquiring
jtTRANSACTION, // A transaction received from the network
jtMISSING_TXN, // Request missing transactions
jtREQUESTED_TXN, // Reply with requested transactions
jtBATCH, // Apply batched transactions
jtADVANCE, // Advance validated/acquired ledgers
jtPUBLEDGER, // Publish a fully-accepted ledger
jtTXN_DATA, // Fetch a proposed set
jtWAL, // Write-ahead logging
jtVALIDATION_t, // A validation from a trusted source
jtWRITE, // Write out hashed objects
jtACCEPT, // Accept a consensus ledger
jtPROPOSAL_t, // A proposal from a trusted source
jtNETOP_CLUSTER, // NetworkOPs cluster peer report
jtNETOP_TIMER, // NetworkOPs net timer processing
jtADMIN, // An administrative operation

// Special job types which are not dispatched by the job pool
jtPEER,
Expand Down
89 changes: 49 additions & 40 deletions src/ripple/core/JobTypes.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,46 +67,55 @@ class JobTypes
};

// clang-format off
add(jtPACK, "makeFetchPack", 1, 0ms, 0ms);
add(jtPUBOLDLEDGER, "publishAcqLedger", 2, 10000ms, 15000ms);
add(jtVALIDATION_ut, "untrustedValidation", maxLimit, 2000ms, 5000ms);
add(jtTRANSACTION_l, "localTransaction", maxLimit, 100ms, 500ms);
add(jtREPLAY_REQ, "ledgerReplayRequest", 10, 250ms, 1000ms);
add(jtLEDGER_REQ, "ledgerRequest", 4, 0ms, 0ms);
add(jtPROPOSAL_ut, "untrustedProposal", maxLimit, 500ms, 1250ms);
add(jtREPLAY_TASK, "ledgerReplayTask", maxLimit, 0ms, 0ms);
add(jtLEDGER_DATA, "ledgerData", 4, 0ms, 0ms);
add(jtCLIENT, "clientCommand", maxLimit, 2000ms, 5000ms);
add(jtRPC, "RPC", maxLimit, 0ms, 0ms);
add(jtUPDATE_PF, "updatePaths", 1, 0ms, 0ms);
add(jtTRANSACTION, "transaction", maxLimit, 250ms, 1000ms);
add(jtBATCH, "batch", maxLimit, 250ms, 1000ms);
add(jtADVANCE, "advanceLedger", maxLimit, 0ms, 0ms);
add(jtPUBLEDGER, "publishNewLedger", maxLimit, 3000ms, 4500ms);
add(jtTXN_DATA, "fetchTxnData", 5, 0ms, 0ms);
add(jtWAL, "writeAhead", maxLimit, 1000ms, 2500ms);
add(jtVALIDATION_t, "trustedValidation", maxLimit, 500ms, 1500ms);
add(jtWRITE, "writeObjects", maxLimit, 1750ms, 2500ms);
add(jtACCEPT, "acceptLedger", maxLimit, 0ms, 0ms);
add(jtPROPOSAL_t, "trustedProposal", maxLimit, 100ms, 500ms);
add(jtSWEEP, "sweep", 1, 0ms, 0ms);
add(jtNETOP_CLUSTER, "clusterReport", 1, 9999ms, 9999ms);
add(jtNETOP_TIMER, "heartbeat", 1, 999ms, 999ms);
add(jtADMIN, "administration", maxLimit, 0ms, 0ms);
add(jtMISSING_TXN, "handleHaveTransactions", 1200, 0ms, 0ms);
add(jtREQUESTED_TXN, "doTransactions", 1200, 0ms, 0ms);

add(jtPEER, "peerCommand", 0, 200ms, 2500ms);
add(jtDISK, "diskAccess", 0, 500ms, 1000ms);
add(jtTXN_PROC, "processTransaction", 0, 0ms, 0ms);
add(jtOB_SETUP, "orderBookSetup", 0, 0ms, 0ms);
add(jtPATH_FIND, "pathFind", 0, 0ms, 0ms);
add(jtHO_READ, "nodeRead", 0, 0ms, 0ms);
add(jtHO_WRITE, "nodeWrite", 0, 0ms, 0ms);
add(jtGENERIC, "generic", 0, 0ms, 0ms);
add(jtNS_SYNC_READ, "SyncReadNode", 0, 0ms, 0ms);
add(jtNS_ASYNC_READ, "AsyncReadNode", 0, 0ms, 0ms);
add(jtNS_WRITE, "WriteNode", 0, 0ms, 0ms);
// avg peak
// JobType name limit latency latency
add(jtPACK, "makeFetchPack", 1, 0ms, 0ms);
add(jtPUBOLDLEDGER, "publishAcqLedger", 2, 10000ms, 15000ms);
add(jtVALIDATION_ut, "untrustedValidation", maxLimit, 2000ms, 5000ms);
add(jtTRANSACTION_l, "localTransaction", maxLimit, 100ms, 500ms);
add(jtREPLAY_REQ, "ledgerReplayRequest", 10, 250ms, 1000ms);
add(jtLEDGER_REQ, "ledgerRequest", 5, 0ms, 0ms);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The limit for this job type used to be 4. Now it's 5. Was this intentional?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No. That was not an intentional change. Good catch. Thanks.

add(jtPROPOSAL_ut, "untrustedProposal", maxLimit, 500ms, 1250ms);
add(jtREPLAY_TASK, "ledgerReplayTask", maxLimit, 0ms, 0ms);
add(jtLEDGER_DATA, "ledgerData", 5, 0ms, 0ms);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The limit for this job type used to be 4. Now it's 5. Was the change intentional?

add(jtCLIENT, "clientCommand", maxLimit, 2000ms, 5000ms);
add(jtCLIENT_SUBSCRIBE, "clientSubscribe", maxLimit, 2000ms, 5000ms);
add(jtCLIENT_FEE_CHANGE, "clientFeeChange", maxLimit, 2000ms, 5000ms);
add(jtCLIENT_CONSENSUS, "clientConsensus", maxLimit, 2000ms, 5000ms);
add(jtCLIENT_ACCT_HIST, "clientAccountHistory", maxLimit, 2000ms, 5000ms);
add(jtCLIENT_SHARD, "clientShardArchive", maxLimit, 2000ms, 5000ms);
add(jtCLIENT_RPC, "clientRPC", maxLimit, 2000ms, 5000ms);
add(jtCLIENT_WEBSOCKET, "clientWebsocket", maxLimit, 2000ms, 5000ms);
add(jtRPC, "RPC", maxLimit, 0ms, 0ms);
add(jtUPDATE_PF, "updatePaths", 1, 0ms, 0ms);
add(jtTRANSACTION, "transaction", maxLimit, 250ms, 1000ms);
add(jtBATCH, "batch", maxLimit, 250ms, 1000ms);
add(jtADVANCE, "advanceLedger", maxLimit, 0ms, 0ms);
add(jtPUBLEDGER, "publishNewLedger", maxLimit, 3000ms, 4500ms);
add(jtTXN_DATA, "fetchTxnData", 5, 0ms, 0ms);
add(jtWAL, "writeAhead", maxLimit, 1000ms, 2500ms);
add(jtVALIDATION_t, "trustedValidation", maxLimit, 500ms, 1500ms);
add(jtWRITE, "writeObjects", maxLimit, 1750ms, 2500ms);
add(jtACCEPT, "acceptLedger", maxLimit, 0ms, 0ms);
add(jtPROPOSAL_t, "trustedProposal", maxLimit, 100ms, 500ms);
add(jtSWEEP, "sweep", 1, 0ms, 0ms);
add(jtNETOP_CLUSTER, "clusterReport", 1, 9999ms, 9999ms);
add(jtNETOP_TIMER, "heartbeat", 1, 999ms, 999ms);
add(jtADMIN, "administration", maxLimit, 0ms, 0ms);
add(jtMISSING_TXN, "handleHaveTransactions", 1200, 0ms, 0ms);
add(jtREQUESTED_TXN, "doTransactions", 1200, 0ms, 0ms);

add(jtPEER, "peerCommand", 0, 200ms, 2500ms);
add(jtDISK, "diskAccess", 0, 500ms, 1000ms);
add(jtTXN_PROC, "processTransaction", 0, 0ms, 0ms);
add(jtOB_SETUP, "orderBookSetup", 0, 0ms, 0ms);
add(jtPATH_FIND, "pathFind", 0, 0ms, 0ms);
add(jtHO_READ, "nodeRead", 0, 0ms, 0ms);
add(jtHO_WRITE, "nodeWrite", 0, 0ms, 0ms);
add(jtGENERIC, "generic", 0, 0ms, 0ms);
add(jtNS_SYNC_READ, "SyncReadNode", 0, 0ms, 0ms);
add(jtNS_ASYNC_READ, "AsyncReadNode", 0, 0ms, 0ms);
add(jtNS_WRITE, "WriteNode", 0, 0ms, 0ms);
// clang-format on
}

Expand Down
4 changes: 3 additions & 1 deletion src/ripple/core/impl/JobQueue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,9 @@ JobQueue::addRefCountedJob(

// FIXME: Workaround incorrect client shutdown ordering
// do not add jobs to a queue with no threads
assert(type == jtCLIENT || m_workers.getNumberOfThreads() > 0);
assert(
(type >= jtCLIENT && type <= jtCLIENT_WEBSOCKET) ||
m_workers.getNumberOfThreads() > 0);

{
std::lock_guard lock(m_mutex);
Expand Down
4 changes: 3 additions & 1 deletion src/ripple/net/impl/RPCSub.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,9 @@ class RPCSubImp : public RPCSub
JLOG(j_.info()) << "RPCCall::fromNetwork start";

mSending = m_jobQueue.addJob(
jtCLIENT, "RPCSub::sendThread", [this](Job&) { sendThread(); });
jtCLIENT_SUBSCRIBE, "RPCSub::sendThread", [this](Job&) {
sendThread();
});
}
}

Expand Down
9 changes: 4 additions & 5 deletions src/ripple/rpc/impl/RPCHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -129,12 +129,11 @@ fillHandler(JsonContext& context, Handler const*& result)
{
if (!isUnlimited(context.role))
{
// VFALCO NOTE Should we also add up the jtRPC jobs?
//
int jc = context.app.getJobQueue().getJobCountGE(jtCLIENT);
if (jc > Tuning::maxJobQueueClients)
// Count all jobs at jtCLIENT priority or higher.
int const jobCount = context.app.getJobQueue().getJobCountGE(jtCLIENT);
if (jobCount > Tuning::maxJobQueueClients)
{
JLOG(context.j.debug()) << "Too busy for command: " << jc;
JLOG(context.j.debug()) << "Too busy for command: " << jobCount;
return rpcTOO_BUSY;
}
}
Expand Down
4 changes: 2 additions & 2 deletions src/ripple/rpc/impl/ServerHandlerImp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ ServerHandlerImp::onRequest(Session& session)

std::shared_ptr<Session> detachedSession = session.detach();
auto const postResult = m_jobQueue.postCoro(
jtCLIENT,
jtCLIENT_RPC,
"RPC-Client",
[this, detachedSession](std::shared_ptr<JobQueue::Coro> coro) {
processSession(detachedSession, coro);
Expand Down Expand Up @@ -339,7 +339,7 @@ ServerHandlerImp::onWSMessage(
JLOG(m_journal.trace()) << "Websocket received '" << jv << "'";

auto const postResult = m_jobQueue.postCoro(
jtCLIENT,
jtCLIENT_WEBSOCKET,
"WS-Client",
[this, session, jv = std::move(jv)](
std::shared_ptr<JobQueue::Coro> const& coro) {
Expand Down
4 changes: 2 additions & 2 deletions src/ripple/rpc/impl/ShardArchiveHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -382,7 +382,7 @@ ShardArchiveHandler::next(std::lock_guard<std::mutex> const& l)
return onClosureFailed(
"failed to wrap closure for starting download", l);

app_.getJobQueue().addJob(jtCLIENT, "ShardArchiveHandler", *wrapper);
app_.getJobQueue().addJob(jtCLIENT_SHARD, "ShardArchiveHandler", *wrapper);

return true;
}
Expand Down Expand Up @@ -465,7 +465,7 @@ ShardArchiveHandler::complete(path dstPath)
}

// Process in another thread to not hold up the IO service
app_.getJobQueue().addJob(jtCLIENT, "ShardArchiveHandler", *wrapper);
app_.getJobQueue().addJob(jtCLIENT_SHARD, "ShardArchiveHandler", *wrapper);
}

void
Expand Down