Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Concurrent Search Tasks Response Updates #7673

Merged
merged 3 commits into from
Jun 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),

### Changed
- Replace jboss-annotations-api_1.2_spec with jakarta.annotation-api ([#7836](https:/opensearch-project/OpenSearch/pull/7836))
- Add min, max, average and thread info to resource stats in tasks API ([#7673](https:/opensearch-project/OpenSearch/pull/7673))

### Deprecated

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.opensearch.tasks.Task;
import org.opensearch.tasks.TaskId;
import org.opensearch.tasks.TaskInfo;
import org.opensearch.tasks.TaskThreadUsage;
import org.opensearch.test.OpenSearchTestCase;

import java.io.IOException;
Expand Down Expand Up @@ -138,12 +139,12 @@ private static RawTaskStatus randomRawTaskStatus() {
}

private static TaskResourceStats randomResourceStats() {
return randomBoolean() ? null : new TaskResourceStats(new HashMap<String, TaskResourceUsage>() {
return randomBoolean() ? null : new TaskResourceStats(new HashMap<>() {
{
for (int i = 0; i < randomInt(5); i++) {
put(randomAlphaOfLength(5), new TaskResourceUsage(randomNonNegativeLong(), randomNonNegativeLong()));
}
}
});
}, new TaskThreadUsage(randomInt(10), randomInt(10)));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,13 @@
- do:
tasks.list:
group_by: parents
detailed: true
- set:
tasks._arbitrary_key_: task_id

- is_true: tasks
- is_true: tasks.$task_id.resource_stats
- is_true: tasks.$task_id.resource_stats.total

---
"tasks_list headers":
Expand All @@ -32,3 +37,21 @@

- is_true: tasks
- match: { tasks.0.headers.X-Opaque-Id: "That is me" }

---
"tasks_list detailed":
- skip:
version: " - 2.99.99"
reason: thread_info was introduced in 3.0.0

- do:
tasks.list:
group_by: parents
detailed: true
- set:
tasks._arbitrary_key_: task_id

- is_true: tasks
- is_true: tasks.$task_id.resource_stats
- is_true: tasks.$task_id.resource_stats.thread_info
- is_true: tasks.$task_id.resource_stats.total
104 changes: 103 additions & 1 deletion server/src/main/java/org/opensearch/tasks/Task.java
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,14 @@ public class Task {

private static final String TOTAL = "total";

private static final String AVERAGE = "average";

private static final String MIN = "min";

private static final String MAX = "max";

public static final String THREAD_INFO = "thread_info";

private final long id;

private final String type;
Expand Down Expand Up @@ -175,8 +183,11 @@ private TaskInfo taskInfo(String localNodeId, boolean detailed, boolean excludeS
resourceStats = new TaskResourceStats(new HashMap<>() {
{
put(TOTAL, getTotalResourceStats());
put(AVERAGE, getAverageResourceStats());
put(MIN, getMinResourceStats());
put(MAX, getMaxResourceStats());
}
});
}, getThreadUsage());
}
return taskInfo(localNodeId, description, status, resourceStats);
}
Expand Down Expand Up @@ -289,6 +300,27 @@ public TaskResourceUsage getTotalResourceStats() {
return new TaskResourceUsage(getTotalResourceUtilization(ResourceStats.CPU), getTotalResourceUtilization(ResourceStats.MEMORY));
}

/**
* Returns current average per-execution resource usage of the task.
*/
public TaskResourceUsage getAverageResourceStats() {
return new TaskResourceUsage(getAverageResourceUtilization(ResourceStats.CPU), getAverageResourceUtilization(ResourceStats.MEMORY));
}

/**
* Returns current min per-execution resource usage of the task.
*/
public TaskResourceUsage getMinResourceStats() {
return new TaskResourceUsage(getMinResourceUtilization(ResourceStats.CPU), getMinResourceUtilization(ResourceStats.MEMORY));
}

/**
* Returns current max per-execution resource usage of the task.
*/
public TaskResourceUsage getMaxResourceStats() {
return new TaskResourceUsage(getMaxResourceUtilization(ResourceStats.CPU), getMaxResourceUtilization(ResourceStats.MEMORY));
}

/**
* Returns total resource consumption for a specific task stat.
*/
Expand All @@ -305,6 +337,76 @@ public long getTotalResourceUtilization(ResourceStats stats) {
return totalResourceConsumption;
}

/**
* Returns average per-execution resource consumption for a specific task stat.
*/
private long getAverageResourceUtilization(ResourceStats stats) {
long totalResourceConsumption = 0L;
int threadResourceInfoCount = 0;
for (List<ThreadResourceInfo> threadResourceInfosList : resourceStats.values()) {
for (ThreadResourceInfo threadResourceInfo : threadResourceInfosList) {
final ResourceUsageInfo.ResourceStatsInfo statsInfo = threadResourceInfo.getResourceUsageInfo().getStatsInfo().get(stats);
if (threadResourceInfo.getStatsType().isOnlyForAnalysis() == false && statsInfo != null) {
totalResourceConsumption += statsInfo.getTotalValue();
threadResourceInfoCount++;
}
}
}
return (threadResourceInfoCount > 0) ? totalResourceConsumption / threadResourceInfoCount : 0;
}

/**
* Returns minimum per-execution resource consumption for a specific task stat.
*/
private long getMinResourceUtilization(ResourceStats stats) {
if (resourceStats.size() == 0) {
return 0L;
}
long minResourceConsumption = Long.MAX_VALUE;
for (List<ThreadResourceInfo> threadResourceInfosList : resourceStats.values()) {
for (ThreadResourceInfo threadResourceInfo : threadResourceInfosList) {
final ResourceUsageInfo.ResourceStatsInfo statsInfo = threadResourceInfo.getResourceUsageInfo().getStatsInfo().get(stats);
if (threadResourceInfo.getStatsType().isOnlyForAnalysis() == false && statsInfo != null) {
minResourceConsumption = Math.min(minResourceConsumption, statsInfo.getTotalValue());
}
}
}
return minResourceConsumption;
}

/**
* Returns maximum per-execution resource consumption for a specific task stat.
*/
private long getMaxResourceUtilization(ResourceStats stats) {
long maxResourceConsumption = 0L;
for (List<ThreadResourceInfo> threadResourceInfosList : resourceStats.values()) {
for (ThreadResourceInfo threadResourceInfo : threadResourceInfosList) {
final ResourceUsageInfo.ResourceStatsInfo statsInfo = threadResourceInfo.getResourceUsageInfo().getStatsInfo().get(stats);
if (threadResourceInfo.getStatsType().isOnlyForAnalysis() == false && statsInfo != null) {
maxResourceConsumption = Math.max(maxResourceConsumption, statsInfo.getTotalValue());
}
}
}
return maxResourceConsumption;
}

/**
* Returns the total and active number of thread executions for the task.
*/
public TaskThreadUsage getThreadUsage() {
int numThreadExecutions = 0;
int activeThreads = 0;
for (List<ThreadResourceInfo> threadResourceInfosList : resourceStats.values()) {
numThreadExecutions += threadResourceInfosList.size();
for (ThreadResourceInfo threadResourceInfo : threadResourceInfosList) {
if (threadResourceInfo.isActive()) {
activeThreads++;
}
}
}
return new TaskThreadUsage(numThreadExecutions, activeThreads);
}

/**
* Adds thread's starting resource consumption information
* @param threadId ID of the thread
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package org.opensearch.tasks;

import org.opensearch.Version;
import org.opensearch.common.Strings;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
Expand All @@ -22,6 +23,8 @@
import java.util.Map;
import java.util.Objects;

import static org.opensearch.tasks.Task.THREAD_INFO;

/**
* Resource information about a currently running task.
* <p>
Expand All @@ -32,27 +35,42 @@
*/
public class TaskResourceStats implements Writeable, ToXContentFragment {
private final Map<String, TaskResourceUsage> resourceUsage;
private final TaskThreadUsage threadUsage;

public TaskResourceStats(Map<String, TaskResourceUsage> resourceUsage) {
public TaskResourceStats(Map<String, TaskResourceUsage> resourceUsage, TaskThreadUsage threadUsage) {
this.resourceUsage = Objects.requireNonNull(resourceUsage, "resource usage is required");
this.threadUsage = Objects.requireNonNull(threadUsage, "thread usage is required");
}

/**
* Read from a stream.
*/
public TaskResourceStats(StreamInput in) throws IOException {
resourceUsage = in.readMap(StreamInput::readString, TaskResourceUsage::readFromStream);
if (in.getVersion().onOrAfter(Version.V_3_0_0)) {
threadUsage = TaskThreadUsage.readFromStream(in);
} else {
// Initialize TaskThreadUsage in case it is not found in mixed cluster case
threadUsage = new TaskThreadUsage(0, 0);
}
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeMap(resourceUsage, StreamOutput::writeString, (stream, stats) -> stats.writeTo(stream));
jed326 marked this conversation as resolved.
Show resolved Hide resolved
if (out.getVersion().onOrAfter(Version.V_3_0_0)) {
threadUsage.writeTo(out);
}
}

public Map<String, TaskResourceUsage> getResourceUsageInfo() {
return resourceUsage;
}

public TaskThreadUsage getThreadUsage() {
return threadUsage;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
for (Map.Entry<String, TaskResourceUsage> resourceUsageEntry : resourceUsage.entrySet()) {
Expand All @@ -62,6 +80,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
}
builder.endObject();
}
builder.startObject(THREAD_INFO);
threadUsage.toXContent(builder, params);
builder.endObject();
return builder;
}

Expand All @@ -74,17 +95,24 @@ public static TaskResourceStats fromXContent(XContentParser parser) throws IOExc
token = parser.nextToken();
}
final Map<String, TaskResourceUsage> resourceStats = new HashMap<>();
// Initialize TaskThreadUsage in case it is not found in mixed cluster case
TaskThreadUsage threadUsage = new TaskThreadUsage(0, 0);
if (token == XContentParser.Token.FIELD_NAME) {
assert parser.currentToken() == XContentParser.Token.FIELD_NAME : "Expected field name but saw [" + parser.currentToken() + "]";
do {
// Must point to field name
String fieldName = parser.currentName();
// And then the value
TaskResourceUsage value = TaskResourceUsage.fromXContent(parser);
resourceStats.put(fieldName, value);

if (fieldName.equals(THREAD_INFO)) {
threadUsage = TaskThreadUsage.fromXContent(parser);
} else {
TaskResourceUsage value = TaskResourceUsage.fromXContent(parser);
resourceStats.put(fieldName, value);
}
} while (parser.nextToken() == XContentParser.Token.FIELD_NAME);
}
return new TaskResourceStats(resourceStats);
return new TaskResourceStats(resourceStats, threadUsage);
}

@Override
Expand Down
Loading