From d0397110d9ad109f26007a5e4b749cf5fee99fb0 Mon Sep 17 00:00:00 2001 From: Finn <67562851+finnroblin@users.noreply.github.com> Date: Fri, 30 Aug 2024 14:22:10 -0700 Subject: [PATCH 1/3] Add approximate filters (#607) Signed-off-by: Finn Roblin --- osbenchmark/utils/dataset.py | 5 + osbenchmark/utils/parse.py | 16 +- osbenchmark/worker_coordinator/runner.py | 26 +- osbenchmark/workload/params.py | 192 +++++++----- tests/utils/dataset_helper.py | 49 +++ tests/worker_coordinator/runner_test.py | 120 +++++--- tests/workload/params_test.py | 370 ++++++++++++++++++----- 7 files changed, 591 insertions(+), 187 deletions(-) diff --git a/osbenchmark/utils/dataset.py b/osbenchmark/utils/dataset.py index 7e773d586..0e990609a 100644 --- a/osbenchmark/utils/dataset.py +++ b/osbenchmark/utils/dataset.py @@ -27,6 +27,7 @@ class Context(Enum): MAX_DISTANCE_NEIGHBORS = 4 MIN_SCORE_NEIGHBORS = 5 PARENTS = 6 + ATTRIBUTES = 7 class DataSet(ABC): @@ -133,6 +134,7 @@ def size(self): def reset(self): self.current = self.BEGINNING + # pylint: disable=R0911 @staticmethod def parse_context(context: Context) -> str: if context == Context.NEIGHBORS: @@ -152,6 +154,9 @@ def parse_context(context: Context) -> str: if context == Context.MIN_SCORE_NEIGHBORS: return "min_score_neighbors" + if context == Context.ATTRIBUTES: + return "attributes" + raise Exception("Unsupported context") diff --git a/osbenchmark/utils/parse.py b/osbenchmark/utils/parse.py index f7ca381fe..213965ce5 100644 --- a/osbenchmark/utils/parse.py +++ b/osbenchmark/utils/parse.py @@ -22,7 +22,7 @@ def parse_string_parameter(key: str, params: dict, default: str = None) -> str: def parse_int_parameter(key: str, params: dict, default: int = None) -> int: if key not in params: - if default: + if default is not None: return default raise ConfigurationError( "Value cannot be None for param {}".format(key) @@ -46,3 +46,17 @@ def parse_float_parameter(key: str, params: dict, default: float = None) -> floa return params[key] raise ConfigurationError("Value must be a float for param {}".format(key)) + + +def parse_bool_parameter(key: str, params: dict, default: bool = None) -> bool: + if key not in params: + if default is not None: + return default + raise ConfigurationError( + "Value cannot be None for param {}".format(key) + ) + + if isinstance(params[key], bool): + return params[key] + + raise ConfigurationError("Value must be a bool for param {}".format(key)) diff --git a/osbenchmark/worker_coordinator/runner.py b/osbenchmark/worker_coordinator/runner.py index 21ed83b6a..7d8d69de7 100644 --- a/osbenchmark/worker_coordinator/runner.py +++ b/osbenchmark/worker_coordinator/runner.py @@ -1254,6 +1254,17 @@ def _get_field_value(content, field_name): return _get_field_value(content["_source"], field_name) return None + def binary_search_for_last_negative_1(neighbors): + low = 0 + high = len(neighbors) + while low < high: + mid = (low + high) // 2 + if neighbors[mid] == "-1": + high = mid + else: + low = mid + 1 + return low - 1 + def calculate_topk_search_recall(predictions, neighbors, top_k): """ Calculates the recall by comparing top_k neighbors with predictions. @@ -1270,7 +1281,20 @@ def calculate_topk_search_recall(predictions, neighbors, top_k): self.logger.info("No neighbors are provided for recall calculation") return 0.0 min_num_of_results = min(top_k, len(neighbors)) + last_neighbor_is_negative_1 = int(neighbors[min_num_of_results-1]) == -1 truth_set = neighbors[:min_num_of_results] + if last_neighbor_is_negative_1: + self.logger.debug("Last neighbor is -1") + last_neighbor_idx = binary_search_for_last_negative_1(truth_set) + + # Note: we do - 1 since list indexing is inclusive, and we want to ignore the first '-1' in neighbors. + truth_set = truth_set[:last_neighbor_idx-1] + if not truth_set: + self.logger.info("No true neighbors after filtering, returning recall = 1.\n" + "Total neighbors in prediction: [%d].", len(predictions)) + return 1.0 + + for j in range(min_num_of_results): if j >= len(predictions): self.logger.info("No more neighbors in prediction to compare against ground truth.\n" @@ -1280,7 +1304,7 @@ def calculate_topk_search_recall(predictions, neighbors, top_k): if predictions[j] in truth_set: correct += 1.0 - return correct / min_num_of_results + return correct / len(truth_set) def calculate_radial_search_recall(predictions, neighbors, enable_top_1_recall=False): """ diff --git a/osbenchmark/workload/params.py b/osbenchmark/workload/params.py index 270fb6178..92cc61c98 100644 --- a/osbenchmark/workload/params.py +++ b/osbenchmark/workload/params.py @@ -40,7 +40,7 @@ from osbenchmark import exceptions from osbenchmark.utils import io from osbenchmark.utils.dataset import DataSet, get_data_set, Context -from osbenchmark.utils.parse import parse_string_parameter, parse_int_parameter, parse_float_parameter +from osbenchmark.utils.parse import parse_string_parameter, parse_int_parameter from osbenchmark.workload import workload __PARAM_SOURCES_BY_OP = {} @@ -1041,12 +1041,12 @@ class VectorSearchPartitionParamSource(VectorDataSetPartitionParamSource): request-params: query parameters that can be passed to search request """ PARAMS_NAME_K = "k" - PARAMS_NAME_MAX_DISTANCE = "max_distance" - PARAMS_NAME_MIN_SCORE = "min_score" PARAMS_NAME_BODY = "body" PARAMS_NAME_SIZE = "size" PARAMS_NAME_QUERY = "query" PARAMS_NAME_FILTER = "filter" + PARAMS_NAME_FILTER_TYPE = "filter_type" + PARAMS_NAME_FILTER_BODY = "filter_body" PARAMS_NAME_REPETITIONS = "repetitions" PARAMS_NAME_NEIGHBORS_DATA_SET_FORMAT = "neighbors_data_set_format" PARAMS_NAME_NEIGHBORS_DATA_SET_PATH = "neighbors_data_set_path" @@ -1057,27 +1057,11 @@ class VectorSearchPartitionParamSource(VectorDataSetPartitionParamSource): PARAMS_NAME_REQUEST_PARAMS = "request-params" PARAMS_NAME_SOURCE = "_source" PARAMS_NAME_ALLOW_PARTIAL_RESULTS = "allow_partial_search_results" - MIN_SCORE_QUERY_TYPE = "min_score" - MAX_DISTANCE_QUERY_TYPE = "max_distance" - KNN_QUERY_TYPE = "knn" - DEFAULT_RADIAL_SEARCH_QUERY_RESULT_SIZE = 10000 def __init__(self, workloads, params, query_params, **kwargs): super().__init__(workloads, params, Context.QUERY, **kwargs) self.logger = logging.getLogger(__name__) - self.k = None - self.distance = None - self.score = None - if self.PARAMS_NAME_K in params: - self.k = parse_int_parameter(self.PARAMS_NAME_K, params) - self.query_type = self.KNN_QUERY_TYPE - if self.PARAMS_NAME_MAX_DISTANCE in params: - self.distance = parse_float_parameter(self.PARAMS_NAME_MAX_DISTANCE, params) - self.query_type = self.MAX_DISTANCE_QUERY_TYPE - if self.PARAMS_NAME_MIN_SCORE in params: - self.score = parse_float_parameter(self.PARAMS_NAME_MIN_SCORE, params) - self.query_type = self.MIN_SCORE_QUERY_TYPE - self._validate_query_type_parameters() + self.k = parse_int_parameter(self.PARAMS_NAME_K, params) self.repetitions = parse_int_parameter(self.PARAMS_NAME_REPETITIONS, params, 1) self.current_rep = 1 self.neighbors_data_set_format = parse_string_parameter( @@ -1090,24 +1074,28 @@ def __init__(self, workloads, params, query_params, **kwargs): self.PARAMS_VALUE_VECTOR_SEARCH) self.query_params = query_params self.query_params.update({ + self.PARAMS_NAME_K: self.k, self.PARAMS_NAME_OPERATION_TYPE: operation_type, self.PARAMS_NAME_ID_FIELD_NAME: params.get(self.PARAMS_NAME_ID_FIELD_NAME), }) - if self.PARAMS_NAME_K in params: - self.query_params.update({ - self.PARAMS_NAME_K: self.k - }) - if self.PARAMS_NAME_MAX_DISTANCE in params: + + self.filter_type = self.query_params.get(self.PARAMS_NAME_FILTER_TYPE) + self.filter_body = self.query_params.get(self.PARAMS_NAME_FILTER_BODY) + + + if self.PARAMS_NAME_FILTER in params: self.query_params.update({ - self.PARAMS_NAME_MAX_DISTANCE: self.distance + self.PARAMS_NAME_FILTER: params.get(self.PARAMS_NAME_FILTER) }) - if self.PARAMS_NAME_MIN_SCORE in params: + + if self.PARAMS_NAME_FILTER_TYPE in params: self.query_params.update({ - self.PARAMS_NAME_MIN_SCORE: self.score + self.PARAMS_NAME_FILTER_TYPE: params.get(self.PARAMS_NAME_FILTER_TYPE) }) - if self.PARAMS_NAME_FILTER in params: + + if self.PARAMS_NAME_FILTER_BODY in params: self.query_params.update({ - self.PARAMS_NAME_FILTER: params.get(self.PARAMS_NAME_FILTER) + self.PARAMS_NAME_FILTER_BODY: params.get(self.PARAMS_NAME_FILTER_BODY) }) # if neighbors data set is defined as corpus, extract corresponding corpus from workload # and add it to corpora list @@ -1115,10 +1103,6 @@ def __init__(self, workloads, params, query_params, **kwargs): neighbors_corpora = self.extract_corpora(self.neighbors_data_set_corpus, self.neighbors_data_set_format) self.corpora.extend(corpora for corpora in neighbors_corpora if corpora not in self.corpora) - def _validate_query_type_parameters(self): - if bool(self.k) + bool(self.distance) + bool(self.score) > 1: - raise ValueError("Only one of k, max_distance, or min_score can be specified in vector search.") - @staticmethod def _validate_neighbors_data_set(file_path, corpus): if file_path and corpus: @@ -1133,31 +1117,26 @@ def _update_request_params(self): self.PARAMS_NAME_ALLOW_PARTIAL_RESULTS, "false") self.query_params.update({self.PARAMS_NAME_REQUEST_PARAMS: request_params}) - def _get_query_neighbors(self): - if self.query_type == self.KNN_QUERY_TYPE: - return Context.NEIGHBORS - if self.query_type == self.MIN_SCORE_QUERY_TYPE: - return Context.MIN_SCORE_NEIGHBORS - if self.query_type == self.MAX_DISTANCE_QUERY_TYPE: - return Context.MAX_DISTANCE_NEIGHBORS - raise Exception("Unknown query type [%s]" % self.query_type) - - def _get_query_size(self): - if self.query_type == self.KNN_QUERY_TYPE: - return self.k - return self.DEFAULT_RADIAL_SEARCH_QUERY_RESULT_SIZE - def _update_body_params(self, vector): # accept body params if passed from workload, else, create empty dictionary body_params = self.query_params.get(self.PARAMS_NAME_BODY) or dict() if self.PARAMS_NAME_SIZE not in body_params: - body_params[self.PARAMS_NAME_SIZE] = self._get_query_size() + body_params[self.PARAMS_NAME_SIZE] = self.k if self.PARAMS_NAME_QUERY in body_params: self.logger.warning( "[%s] param from body will be replaced with vector search query.", self.PARAMS_NAME_QUERY) - efficient_filter = self.query_params.get(self.PARAMS_NAME_FILTER) + + self.logger.info("Here, we have query_params: %s ", self.query_params) + filter_type=self.query_params.get(self.PARAMS_NAME_FILTER_TYPE) + filter_body=self.query_params.get(self.PARAMS_NAME_FILTER_BODY) + efficient_filter = filter_body if filter_type == "efficient" else None + # override query params with vector search query - body_params[self.PARAMS_NAME_QUERY] = self._build_vector_search_query_body(vector, efficient_filter) + body_params[self.PARAMS_NAME_QUERY] = self._build_vector_search_query_body(vector, efficient_filter, filter_type, filter_body) + + if filter_type == "post_filter": + body_params["post_filter"] = filter_body + self.query_params.update({self.PARAMS_NAME_BODY: body_params}) def partition(self, partition_index, total_partitions): @@ -1171,7 +1150,7 @@ def partition(self, partition_index, total_partitions): self.neighbors_data_set_path = self.data_set_path # add neighbor instance to partition partition.neighbors_data_set = get_data_set( - self.neighbors_data_set_format, self.neighbors_data_set_path, self._get_query_neighbors()) + self.neighbors_data_set_format, self.neighbors_data_set_path, Context.NEIGHBORS) partition.neighbors_data_set.seek(partition.offset) return partition @@ -1190,7 +1169,7 @@ def params(self): raise StopIteration vector = self.data_set.read(1)[0] neighbor = self.neighbors_data_set.read(1)[0] - true_neighbors = list(map(str, neighbor[:self.k] if self.k else neighbor)) + true_neighbors = list(map(str, neighbor[:self.k])) self.query_params.update({ "neighbors": true_neighbors, }) @@ -1200,35 +1179,18 @@ def params(self): self.percent_completed = self.current / self.total return self.query_params - def _build_vector_search_query_body(self, vector, efficient_filter=None) -> dict: - """Builds a vector search request that can be used to execute an approximate nearest + def _build_vector_search_query_body(self, vector, efficient_filter=None, filter_type=None, filter_body=None) -> dict: + """Builds a k-NN request that can be used to execute an approximate nearest neighbor search against a k-NN plugin index Args: vector: vector used for query - efficient_filter: efficient filter used for query Returns: A dictionary containing the body used for search query """ - query = {} - if self.query_type == self.KNN_QUERY_TYPE: - query.update({ - "k": self.k, - }) - elif self.query_type == self.MIN_SCORE_QUERY_TYPE: - query.update({ - "min_score": self.score, - }) - elif self.query_type == self.MAX_DISTANCE_QUERY_TYPE: - query.update({ - "max_distance": self.distance, - }) - else: - raise Exception("Unknown query type [%s]" % self.query_type) - - query.update({ + query = { "vector": vector, - }) - + "k": self.k, + } if efficient_filter: query.update({ "filter": efficient_filter, @@ -1241,7 +1203,7 @@ def _build_vector_search_query_body(self, vector, efficient_filter=None) -> dict } if self.is_nested: - outer_field_name, _ = self.get_split_fields() + outer_field_name, _inner_field_name = self.get_split_fields() return { "nested": { "path": outer_field_name, @@ -1249,8 +1211,36 @@ def _build_vector_search_query_body(self, vector, efficient_filter=None) -> dict } } + if filter_type and not efficient_filter and not filter_type == "post_filter": + return self._knn_query_with_filter(vector, knn_search_query, filter_type, filter_body) + return knn_search_query + def _knn_query_with_filter(self, vector, knn_query, filter_type, filter_body) -> dict: + if filter_type == "script": + return { + "script_score": { + "query": {"bool": {"filter": filter_body}}, + "script": { + "source": "knn_score", + "lang": "knn", + "params": { + "field": self.field_name, + "query_value": vector, + "space_type": "l2" + } + } + } + } + + if filter_type == "boolean": + return { + "bool": { + "filter": filter_body, + "must": [knn_query] + } + } + raise exceptions.ConfigurationError("Unsupported filter type: %s" % filter_type) class BulkVectorsFromDataSetParamSource(VectorDataSetPartitionParamSource): """ Create bulk index requests from a data set of vectors. @@ -1272,6 +1262,7 @@ def __init__(self, workload, params, **kwargs): self.id_field_name: str = parse_string_parameter( self.PARAMS_NAME_ID_FIELD_NAME, params, self.DEFAULT_ID_FIELD_NAME ) + self.filter_attributes: List[Any] = params.get("filter_attributes", []) self.action_buffer = None self.num_nested_vectors = 10 @@ -1303,8 +1294,43 @@ def partition(self, partition_index, total_partitions): ) partition.parent_data_set.seek(partition.offset) + if self.filter_attributes: + partition.attributes_data_set = get_data_set( + self.parent_data_set_format, self.parent_data_set_path, Context.ATTRIBUTES + ) + partition.attributes_data_set.seek(partition.offset) + return partition + def bulk_transform_add_attributes(self, partition: np.ndarray, action, attributes: np.ndarray) -> List[Dict[str, Any]]: + """attributes is a (partition_len x 3) matrix. """ + actions = [] + + _ = [ + actions.extend([action(self.id_field_name, i + self.current), None]) + for i in range(len(partition)) + ] + bulk_contents = [] + + add_id_field_to_body = self.id_field_name != self.DEFAULT_ID_FIELD_NAME + for vec, attribute_list, identifier in zip( + partition.tolist(), attributes.tolist(), range(self.current, self.current + len(partition)) + ): + row = {self.field_name: vec} + for idx, attribute_name in zip(range(len(self.filter_attributes)), self.filter_attributes): + attribute = attribute_list[idx].decode() + if attribute != "None": + row.update({attribute_name : attribute}) + if add_id_field_to_body: + row.update({self.id_field_name: identifier}) + bulk_contents.append(row) + + actions[1::2] = bulk_contents + + self.logger.info("Actions: %s", actions) + return actions + + def bulk_transform_non_nested(self, partition: np.ndarray, action) -> List[Dict[str, Any]]: """ Create bulk ingest actions for data with a non-nested field. @@ -1333,7 +1359,7 @@ def bulk_transform_non_nested(self, partition: np.ndarray, action) -> List[Dict[ def bulk_transform( - self, partition: np.ndarray, action, parents_ids: Optional[np.ndarray] + self, partition: np.ndarray, action, parents_ids: Optional[np.ndarray], attributes: Optional[np.ndarray] ) -> List[Dict[str, Any]]: """Partitions and transforms a list of vectors into OpenSearch's bulk injection format. @@ -1345,9 +1371,12 @@ def bulk_transform( An array of transformed vectors in bulk format. """ - if not self.is_nested: + if not self.is_nested and not self.filter_attributes: return self.bulk_transform_non_nested(partition, action) + # TODO: Assumption: we won't add attributes if we're also doing a nested query. + if self.filter_attributes: + return self.bulk_transform_add_attributes(partition, action, attributes) actions = [] outer_field_name, inner_field_name = self.get_split_fields() @@ -1430,7 +1459,12 @@ def action(id_field_name, doc_id): else: parent_ids = None - body = self.bulk_transform(partition, action, parent_ids) + if self.filter_attributes: + attributes = self.attributes_data_set.read(bulk_size) + else: + attributes = None + + body = self.bulk_transform(partition, action, parent_ids, attributes) size = len(body) // 2 if not self.is_nested: diff --git a/tests/utils/dataset_helper.py b/tests/utils/dataset_helper.py index c13a27b39..2a565b7eb 100644 --- a/tests/utils/dataset_helper.py +++ b/tests/utils/dataset_helper.py @@ -193,6 +193,27 @@ def _build_data_set(self, context: DataSetBuildContext): # file with distance. context.vectors.tofile(f) + +def create_attributes(num_vectors: int) -> np.ndarray: + rng = np.random.default_rng() + + # Random strings and None + strings = ["str1", "str2", "str3"] + + # First column: random choice from strings + col1 = rng.choice(strings, num_vectors).astype("S10") + + # Second column: random choice from strings + col2 = rng.choice(strings, num_vectors).astype("S10") + + # Third column: random numbers between 0 and 100 + col3 = rng.integers(0, 101, num_vectors).astype("S10") + + # Combine columns into a single array + random_vector = np.column_stack((col1, col2, col3)) + + return random_vector + def create_parent_ids(num_vectors: int, group_size: int = 10) -> np.ndarray: num_ids = (num_vectors + group_size - 1) // group_size # Calculate total number of different IDs needed ids = np.arange(1, num_ids + 1) # Create an array of IDs starting from 1 @@ -245,6 +266,34 @@ def create_data_set( return data_set_path +def create_attributes_data_set( + num_vectors: int, + dimension: int, + extension: str, + data_set_context: Context, + data_set_dir, + file_path: str = None +) -> str: + if file_path: + data_set_path = file_path + else: + file_name_base = ''.join(random.choice(string.ascii_letters) for _ in + range(DEFAULT_RANDOM_STRING_LENGTH)) + data_set_file_name = "{}.{}".format(file_name_base, extension) + data_set_path = os.path.join(data_set_dir, data_set_file_name) + context = DataSetBuildContext( + data_set_context, + create_attributes(num_vectors), + data_set_path) + + if extension == HDF5DataSet.FORMAT_NAME: + HDF5Builder().add_data_set_build_context(context).build() + else: + BigANNVectorBuilder().add_data_set_build_context(context).build() + + return data_set_path + + def create_parent_data_set( num_vectors: int, dimension: int, diff --git a/tests/worker_coordinator/runner_test.py b/tests/worker_coordinator/runner_test.py index fbf2ecff4..a33953676 100644 --- a/tests/worker_coordinator/runner_test.py +++ b/tests/worker_coordinator/runner_test.py @@ -3125,7 +3125,7 @@ async def test_query_vector_search_with_custom_id_field_inside_source(self, open @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') @mock.patch("opensearchpy.OpenSearch") @run_async - async def test_query_vector_radial_search_with_min_score(self, opensearch, on_client_request_start, on_client_request_end): + async def test_calculate_recall_with_negative_one_neighbors(self, opensearch, on_client_request_start, on_client_request_end): search_response = { "timed_out": False, "took": 5, @@ -3142,10 +3142,6 @@ async def test_query_vector_radial_search_with_min_score(self, opensearch, on_cl { "_id": 102, "_score": 0.88 - }, - { - "_id": 103, - "_score": 0.87 } ] } @@ -3159,8 +3155,8 @@ async def test_query_vector_radial_search_with_min_score(self, opensearch, on_cl "operation-type": "vector-search", "detailed-results": True, "response-compression-enabled": False, - "min_score": 0.80, - "neighbors": [101, 102, 103], + "k": 4, + "neighbors": [101, 102, -1, -1], "body": { "query": { "knn": { @@ -3169,27 +3165,19 @@ async def test_query_vector_radial_search_with_min_score(self, opensearch, on_cl 5, 4 ], - "min_score": 0.80, + "k": 3 } - } - } + }} } } async with query_runner: result = await query_runner(opensearch, params) - self.assertEqual(1, result["weight"]) - self.assertEqual("ops", result["unit"]) - self.assertEqual(3, result["hits"]) - self.assertEqual("eq", result["hits_relation"]) - self.assertFalse(result["timed_out"]) - self.assertEqual(5, result["took"]) + self.assertEqual(result["recall@k"], 1.0) self.assertIn("recall_time_ms", result.keys()) - self.assertIn("recall@min_score", result.keys()) - self.assertEqual(result["recall@min_score"], 1.0) - self.assertIn("recall@min_score_1", result.keys()) - self.assertEqual(result["recall@min_score_1"], 1.0) + self.assertIn("recall@1", result.keys()) + self.assertEqual(result["recall@1"], 1) self.assertNotIn("error-type", result.keys()) opensearch.transport.perform_request.assert_called_once_with( @@ -3204,7 +3192,7 @@ async def test_query_vector_radial_search_with_min_score(self, opensearch, on_cl @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') @mock.patch("opensearchpy.OpenSearch") @run_async - async def test_query_vector_radial_search_with_max_distance(self, opensearch, on_client_request_start, on_client_request_end): + async def test_calculate_recall_with_some_negative_one_neighbors(self, opensearch, on_client_request_start, on_client_request_end): search_response = { "timed_out": False, "took": 5, @@ -3221,10 +3209,74 @@ async def test_query_vector_radial_search_with_max_distance(self, opensearch, on { "_id": 102, "_score": 0.88 + } + ] + } + } + opensearch.transport.perform_request.return_value = as_future(io.StringIO(json.dumps(search_response))) + + query_runner = runner.Query() + + params = { + "index": "unittest", + "operation-type": "vector-search", + "detailed-results": True, + "response-compression-enabled": False, + "k": 6, + "neighbors": [101, 102, 103, 104, -1, -1], + "body": { + "query": { + "knn": { + "location": { + "vector": [ + 5, + 4 + ], + "k": 3 + } + }} + } + } + + async with query_runner: + result = await query_runner(opensearch, params) + + self.assertEqual(result["recall@k"], 0.5) + self.assertIn("recall_time_ms", result.keys()) + self.assertIn("recall@1", result.keys()) + self.assertEqual(result["recall@1"], 1) + self.assertNotIn("error-type", result.keys()) + + opensearch.transport.perform_request.assert_called_once_with( + "GET", + "/unittest/_search", + params={}, + body=params["body"], + headers={"Accept-Encoding": "identity"} + ) + + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') + @mock.patch("opensearchpy.OpenSearch") + @run_async + async def test_calculate_recall_with_intermediate_negative_one_neighbors(self, opensearch, + on_client_request_start, on_client_request_end): + search_response = { + "timed_out": False, + "took": 5, + "hits": { + "total": { + "value": 3, + "relation": "eq" + }, + "hits": [ + { + "_id": 101, + "_score": 0.95 }, { - "_id": 103, - "_score": 0.87 + "_id": 102, + "_score": 0.88 } ] } @@ -3238,8 +3290,8 @@ async def test_query_vector_radial_search_with_max_distance(self, opensearch, on "operation-type": "vector-search", "detailed-results": True, "response-compression-enabled": False, - "max_distance": 15.0, - "neighbors": [101, 102, 103, 104], + "k": 4, + "neighbors": [101, 103,102, 104, -1], "body": { "query": { "knn": { @@ -3248,27 +3300,19 @@ async def test_query_vector_radial_search_with_max_distance(self, opensearch, on 5, 4 ], - "max_distance": 15.0, + "k": 3 } - } - } + }} } } async with query_runner: result = await query_runner(opensearch, params) - self.assertEqual(1, result["weight"]) - self.assertEqual("ops", result["unit"]) - self.assertEqual(3, result["hits"]) - self.assertEqual("eq", result["hits_relation"]) - self.assertFalse(result["timed_out"]) - self.assertEqual(5, result["took"]) + self.assertEqual(result["recall@k"], 0.5) self.assertIn("recall_time_ms", result.keys()) - self.assertIn("recall@max_distance", result.keys()) - self.assertEqual(result["recall@max_distance"], 0.75) - self.assertIn("recall@max_distance_1", result.keys()) - self.assertEqual(result["recall@max_distance_1"], 1.0) + self.assertIn("recall@1", result.keys()) + self.assertEqual(result["recall@1"], 1) self.assertNotIn("error-type", result.keys()) opensearch.transport.perform_request.assert_called_once_with( diff --git a/tests/workload/params_test.py b/tests/workload/params_test.py index 17c904e3e..51921f5f4 100644 --- a/tests/workload/params_test.py +++ b/tests/workload/params_test.py @@ -13,7 +13,7 @@ # not use this file except in compliance with the License. # You may obtain a copy of the License at # -# http://www.apache.org/licenses/LICENSE-2.0 +# http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an @@ -37,7 +37,7 @@ from osbenchmark.workload import params, workload from osbenchmark.workload.params import VectorDataSetPartitionParamSource, VectorSearchPartitionParamSource, \ BulkVectorsFromDataSetParamSource -from tests.utils.dataset_helper import create_data_set, create_parent_data_set +from tests.utils.dataset_helper import create_data_set, create_attributes_data_set, create_parent_data_set from tests.utils.dataset_test import DEFAULT_NUM_VECTORS @@ -2900,7 +2900,7 @@ def test_params_default(self): with self.assertRaises(StopIteration): query_param_source_partition.params() - def test_params_custom_body(self): + def test_post_filter(self): # Create a data set k = 12 data_set_path = create_data_set( @@ -2915,134 +2915,188 @@ def test_params_custom_body(self): self.DEFAULT_DIMENSION, self.DEFAULT_TYPE, Context.NEIGHBORS, - self.data_set_dir + self.data_set_dir, ) - filter_body = { - "key": "value" - } # Create a QueryVectorsFromDataSetParamSource with relevant params + + POST_FILTER_BODY = {"range": {"price": {"gte": 5, "lte": 10}}} test_param_source_params = { "field": self.DEFAULT_FIELD_NAME, "data_set_format": self.DEFAULT_TYPE, "data_set_path": data_set_path, "neighbors_data_set_path": neighbors_data_set_path, "k": k, - "filter": filter_body, + "filter_type": "post_filter", + "filter_body": POST_FILTER_BODY, } query_param_source = VectorSearchPartitionParamSource( workload.Workload(name="unit-test"), - test_param_source_params, { + test_param_source_params, + { "index": self.DEFAULT_INDEX_NAME, "request-params": {}, "body": { "size": 100, - } - } + }, + }, ) query_param_source_partition = query_param_source.partition(0, 1) # Check each for _ in range(DEFAULT_NUM_VECTORS): + params = query_param_source_partition.params() self._check_params( - query_param_source_partition.params(), + params, self.DEFAULT_FIELD_NAME, self.DEFAULT_DIMENSION, k, 100, - filter_body, ) + post_filter = params.get("body").get("post_filter") + self.assertIsInstance(post_filter, dict) + self.assertEqual(post_filter, POST_FILTER_BODY) # Assert last call creates stop iteration with self.assertRaises(StopIteration): query_param_source_partition.params() - def test_params_when_multiple_query_type_provided_then_raise_exception(self): + def test_bool_filter(self): # Create a data set + k = 12 data_set_path = create_data_set( self.DEFAULT_NUM_VECTORS, self.DEFAULT_DIMENSION, self.DEFAULT_TYPE, Context.QUERY, - self.data_set_dir + self.data_set_dir, ) neighbors_data_set_path = create_data_set( self.DEFAULT_NUM_VECTORS, self.DEFAULT_DIMENSION, self.DEFAULT_TYPE, Context.NEIGHBORS, - self.data_set_dir + self.data_set_dir, ) + # Create a QueryVectorsFromDataSetParamSource with relevant params - test_param_source_params_1 = { + BOOL_FILTER_BODY = { + "bool": { + "must": [ + {"range": {"rating": {"gte": 8, "lte": 10}}}, + {"term": {"parking": "true"}}, + ] + } + } + test_param_source_params = { "field": self.DEFAULT_FIELD_NAME, "data_set_format": self.DEFAULT_TYPE, "data_set_path": data_set_path, "neighbors_data_set_path": neighbors_data_set_path, - "k": 10, - "min_score": 0.5, + "k": k, + "filter_type": "boolean", + "filter_body": BOOL_FILTER_BODY, } + query_param_source = VectorSearchPartitionParamSource( + workload.Workload(name="unit-test"), + test_param_source_params, + { + "index": self.DEFAULT_INDEX_NAME, + "request-params": {}, + "body": { + "size": 100, + }, + }, + ) + query_param_source_partition = query_param_source.partition(0, 1) - with self.assertRaisesRegex(ValueError, "Only one of k, max_distance, or min_score can be specified in vector search."): - query_param_source = VectorSearchPartitionParamSource( - workload.Workload(name="unit-test"), - test_param_source_params_1, { - "index": self.DEFAULT_INDEX_NAME, - "request-params": {}, - "body": { - "size": 100, - } - } + # Check each + for _ in range(DEFAULT_NUM_VECTORS): + params = query_param_source_partition.params() + self._check_params_bool( + params, + self.DEFAULT_FIELD_NAME, + self.DEFAULT_DIMENSION, + k, + 100, + BOOL_FILTER_BODY, ) - # This line won't be executed if exception is raised during initialization - query_param_source.partition(0, 1) + # post_filter = params.get("body").get("post_filter") + # self.assertIsInstance(post_filter, dict) + # self.assertEqual(post_filter, BOOL_FILTER_BODY) - test_param_source_params_2 = { - "field": self.DEFAULT_FIELD_NAME, - "data_set_format": self.DEFAULT_TYPE, - "data_set_path": data_set_path, - "neighbors_data_set_path": neighbors_data_set_path, - "k": 10, - "max_distance": 100.0, - } + # Assert last call creates stop iteration + with self.assertRaises(StopIteration): + query_param_source_partition.params() - with self.assertRaisesRegex(ValueError, "Only one of k, max_distance, or min_score can be specified in vector search."): - query_param_source = VectorSearchPartitionParamSource( - workload.Workload(name="unit-test"), - test_param_source_params_2, { - "index": self.DEFAULT_INDEX_NAME, - "request-params": {}, - "body": { - "size": 100, - } - } - ) - # This line won't be executed if exception is raised during initialization - query_param_source.partition(0, 1) + def test_script_score_filter(self): + # Create a data set + k = 12 + data_set_path = create_data_set( + self.DEFAULT_NUM_VECTORS, + self.DEFAULT_DIMENSION, + self.DEFAULT_TYPE, + Context.QUERY, + self.data_set_dir, + ) + neighbors_data_set_path = create_data_set( + self.DEFAULT_NUM_VECTORS, + self.DEFAULT_DIMENSION, + self.DEFAULT_TYPE, + Context.NEIGHBORS, + self.data_set_dir, + ) - test_param_source_params_3 = { + # Create a QueryVectorsFromDataSetParamSource with relevant params + + SCRIPT_SCORE_FILTER_BODY = { + "bool": { + "must": [ + {"range": {"rating": {"gte": 8, "lte": 10}}}, + {"term": {"parking": "true"}}, + ] + } + } + test_param_source_params = { "field": self.DEFAULT_FIELD_NAME, "data_set_format": self.DEFAULT_TYPE, "data_set_path": data_set_path, "neighbors_data_set_path": neighbors_data_set_path, - "min_score": 0.5, - "max_distance": 100.0, - "k": 10, + "k": k, + "filter_type": "script", + "filter_body": SCRIPT_SCORE_FILTER_BODY, } + query_param_source = VectorSearchPartitionParamSource( + workload.Workload(name="unit-test"), + test_param_source_params, + { + "index": self.DEFAULT_INDEX_NAME, + "request-params": {}, + "body": { + "size": 100, + }, + }, + ) + query_param_source_partition = query_param_source.partition(0, 1) - with self.assertRaisesRegex(ValueError, "Only one of k, max_distance, or min_score can be specified in vector search."): - query_param_source = VectorSearchPartitionParamSource( - workload.Workload(name="unit-test"), - test_param_source_params_3, { - "index": self.DEFAULT_INDEX_NAME, - "request-params": {}, - "body": { - "size": 100, - } - } + # Check each + for _ in range(DEFAULT_NUM_VECTORS): + params = query_param_source_partition.params() + self._check_params_script_score( + params, + self.DEFAULT_FIELD_NAME, + self.DEFAULT_DIMENSION, + k, + 100, + SCRIPT_SCORE_FILTER_BODY, ) - # This line won't be executed if exception is raised during initialization - query_param_source.partition(0, 1) + # post_filter = params.get("body").get("post_filter") + # self.assertIsInstance(post_filter, dict) + # self.assertEqual(post_filter, BOOL_FILTER_BODY) + + # Assert last call creates stop iteration + with self.assertRaises(StopIteration): + query_param_source_partition.params() def _check_params( self, @@ -3073,6 +3127,78 @@ def _check_params( self.assertEqual(size, expected_size if expected_size else expected_k) self.assertEqual(field.get("filter"), expected_filter) + def _check_params_bool( + self, + actual_params: dict, + expected_field: str, + expected_dimension: int, + expected_k: int, + expected_size=None, + expected_bool_query=None, + check_vectors=True, + ): + body = actual_params.get("body") + self.assertIsInstance(body, dict) + query = body.get("query") + self.assertIsInstance(query, dict) + query_bool = query.get("bool") + self.assertIsInstance(query_bool, dict) + filter = query_bool.get("filter") + self.assertIsInstance(filter, dict) + self.assertEqual(filter, expected_bool_query) + + must_clause = query_bool.get("must") + self.assertIsInstance(must_clause, list) + + if check_vectors: + knn_dict = must_clause[0] + + repacked = {"body": {"query": knn_dict, "size": body.get("size") }, + "neighbors": actual_params.get("neighbors") + } + + self._check_params(repacked, expected_field, expected_dimension, expected_k,expected_size) + + def _check_params_script_score( + self, + actual_params: dict, + expected_field: str, + expected_dimension: int, + expected_k: int, + expected_size=None, + expected_script_query=None + ): + body = actual_params.get("body") + self.assertIsInstance(body, dict) + query = body.get("query") + self.assertIsInstance(query, dict) + script_score_query = query.get("script_score") + self.assertIsInstance(script_score_query, dict) + bool_from_script_score = script_score_query.get("query").get("bool").get("filter") + + self.assertEqual(bool_from_script_score, expected_script_query) + + script = script_score_query.get("script") + self.assertIsInstance(script, dict) + + source = script.get("source") + self.assertEqual(source, "knn_score") + + lang = script.get("lang") + self.assertEqual(lang, "knn") + + params = script.get("params") + self.assertIsInstance(params, dict) + + field = params.get("field") + self.assertEqual(field, expected_field) + + vector = params.get("query_value") + self.assertIsInstance(vector, np.ndarray) + self.assertEqual(len(list(vector)), expected_dimension) + + space_type = params.get("space_type") + self.assertEqual(space_type, "l2") # TODO change this once it's all modifiable. class BulkVectorsFromDataSetParamSourceTestCase(TestCase): @@ -3208,6 +3334,114 @@ def _check_params( self.assertTrue(expected_id_field in req_body) +class BulkVectorsAttributeCase(TestCase): + DEFAULT_INDEX_NAME = "test-partition-index" + DEFAULT_VECTOR_FIELD_NAME = "test-vector-field" + DEFAULT_CONTEXT = Context.INDEX + DEFAULT_TYPE = HDF5DataSet.FORMAT_NAME + DEFAULT_NUM_VECTORS = 10 + DEFAULT_DIMENSION = 10 + DEFAULT_RANDOM_STRING_LENGTH = 8 + DEFAULT_ID_FIELD_NAME = "_id" + ATTRIBUTES_LIST = ['taste', 'color', 'age'] + + def setUp(self) -> None: + self.data_set_dir = tempfile.mkdtemp() + + def tearDown(self): + shutil.rmtree(self.data_set_dir) + + def test_params_efficient_filter( + self + ): + num_vectors = 49 + bulk_size = 10 + data_set_path = create_data_set( + num_vectors, + self.DEFAULT_DIMENSION, + self.DEFAULT_TYPE, + Context.INDEX, + self.data_set_dir + ) + parent_data_set_path = create_attributes_data_set( + num_vectors, + self.DEFAULT_DIMENSION, + self.DEFAULT_TYPE, + Context.ATTRIBUTES, + self.data_set_dir, + ) + + test_param_source_params = { + "index": self.DEFAULT_INDEX_NAME, + "field": self.DEFAULT_VECTOR_FIELD_NAME, + "data_set_format": self.DEFAULT_TYPE, + "data_set_path": data_set_path, + "bulk_size": bulk_size, + "id-field-name": self.DEFAULT_ID_FIELD_NAME, + "filter_attributes": self.ATTRIBUTES_LIST + } + bulk_param_source = BulkVectorsFromDataSetParamSource( + workload.Workload(name="unit-test"), test_param_source_params + ) + bulk_param_source.parent_data_set_path = parent_data_set_path + bulk_param_source_partition = bulk_param_source.partition(0, 1) + # Check each payload returned + vectors_consumed = 0 + while vectors_consumed < num_vectors: + expected_num_vectors = min(num_vectors - vectors_consumed, bulk_size) + actual_params = bulk_param_source_partition.params() + self._check_params_attributes( + actual_params, + self.DEFAULT_INDEX_NAME, + self.DEFAULT_VECTOR_FIELD_NAME, + self.DEFAULT_DIMENSION, + expected_num_vectors, + self.DEFAULT_ID_FIELD_NAME, + ) + vectors_consumed += expected_num_vectors + + # Assert last call creates stop iteration + with self.assertRaises(StopIteration): + bulk_param_source_partition.params() + + def _check_params_attributes( + self, + actual_params: dict, + expected_index: str, + expected_vector_field: str, + expected_dimension: int, + expected_num_vectors_in_payload: int, + expected_id_field: str, + ): + size = actual_params.get("size") + self.assertEqual(size, expected_num_vectors_in_payload) + body = actual_params.get("body") + self.assertIsInstance(body, list) + self.assertEqual(len(body) // 2, expected_num_vectors_in_payload) + + # Bulk payload has 2 parts: first one is the header and the second one + # is the body. The header will have the index name and the body will + # have the vector + for header, req_body in zip(*[iter(body)] * 2): + index = header.get("index") + self.assertIsInstance(index, dict) + + index_name = index.get("_index") + self.assertEqual(index_name, expected_index) + + vector = req_body.get(expected_vector_field) + self.assertIsInstance(vector, list) + self.assertEqual(len(vector), expected_dimension) + + for attribute in self.ATTRIBUTES_LIST: + self.assertTrue(attribute in req_body) + if expected_id_field in index: + self.assertEqual(self.DEFAULT_ID_FIELD_NAME, expected_id_field) + self.assertFalse(expected_id_field in req_body) + continue + self.assertTrue(expected_id_field in req_body) + + class VectorsNestedCase(TestCase): DEFAULT_INDEX_NAME = "test-partition-index" DEFAULT_VECTOR_FIELD_NAME = "nested.test-vector-field" From 6f0ae23dda1b606c18a0864a409dcfa7afbd3106 Mon Sep 17 00:00:00 2001 From: Govind Kamat Date: Wed, 4 Sep 2024 13:30:29 -0700 Subject: [PATCH 2/3] Refactored the OSB build system (#632) Signed-off-by: Govind Kamat --- .ci/build.sh | 116 ++------------ .../{docker-test.yml => docker-build.yml} | 16 +- .../{manual-integ.yml => integ-test.yml} | 40 ++--- .github/workflows/{main.yml => unit-test.yml} | 7 +- DEVELOPER_GUIDE.md | 11 +- Makefile | 145 +++++++----------- docker/Dockerfile | 44 ++++-- setup.py | 2 +- 8 files changed, 118 insertions(+), 263 deletions(-) rename .github/workflows/{docker-test.yml => docker-build.yml} (60%) rename .github/workflows/{manual-integ.yml => integ-test.yml} (56%) rename .github/workflows/{main.yml => unit-test.yml} (87%) diff --git a/.ci/build.sh b/.ci/build.sh index 28f79a7ab..e3c4d5e22 100644 --- a/.ci/build.sh +++ b/.ci/build.sh @@ -1,128 +1,40 @@ #!/usr/bin/env bash -# Licensed to Elasticsearch B.V. under one or more contributor -# license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright -# ownership. Elasticsearch B.V. licenses this file to you under -# the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -# fail this script immediately if any command fails with a non-zero exit code -set -e -# fail on pipeline errors, e.g. when grepping -set -o pipefail -# fail on any unset environment variables -set -u - -function update_pyenv { - # need to have the latest pyenv version to ensure latest patch releases are installable - cd $HOME/.pyenv/plugins/python-build/../.. && git pull origin master --rebase && cd - +pyenv_init() { + PATH=$HOME/.pyenv/shims:$PATH:$HOME/.pyenv/bin } -function build { +function setup { export THESPLOG_FILE="${THESPLOG_FILE:-${BENCHMARK_HOME}/.benchmark/logs/actor-system-internal.log}" # this value is in bytes, the default is 50kB. We increase it to 200kiB. export THESPLOG_FILE_MAXSIZE=${THESPLOG_FILE_MAXSIZE:-204800} # adjust the default log level from WARNING export THESPLOG_THRESHOLD="INFO" - # turn nounset off because some of the following commands fail if nounset is turned on - set +u - - export PATH="$HOME/.pyenv/bin:$PATH" + pyenv_init export TERM=dumb export LC_ALL=en_US.UTF-8 - update_pyenv - eval "$(pyenv init -)" - # ensure pyenv shims are added to PATH, see https://github.com/pyenv/pyenv/issues/1906 - eval "$(pyenv init --path)" - eval "$(pyenv virtualenv-init -)" +} + +function build { + setup - make prereq - make install - make precommit + set -e + make install-devel + make lint make test } function build_it { - export THESPLOG_FILE="${THESPLOG_FILE:-${BENCHMARK_HOME}/.benchmark/logs/actor-system-internal.log}" - # this value is in bytes, the default is 50kB. We increase it to 200kiB. - export THESPLOG_FILE_MAXSIZE=${THESPLOG_FILE_MAXSIZE:-204800} - # adjust the default log level from WARNING - export THESPLOG_THRESHOLD="INFO" + setup - # turn nounset off because some of the following commands fail if nounset is turned on - set +u - - export PATH="$HOME/.pyenv/bin:$PATH" - export TERM=dumb - export LC_ALL=en_US.UTF-8 export BENCHMARK_HOME="$GITHUB_WORKSPACE" - update_pyenv - eval "$(pyenv init -)" - # ensure pyenv shims are added to PATH, see https://github.com/pyenv/pyenv/issues/1906 - eval "$(pyenv init --path)" - eval "$(pyenv virtualenv-init -)" - - python3_version=`python3 --version` - echo "Python3 version is ... $python3_version" - - python3 -m pip install opensearch-benchmark docker pull ubuntu/squid:latest - make prereq - make install - make precommit - - # make it38, it39, etc. + # make it38, it39, etc. so they run as concurrent GHA jobs make "it${1//./}" } -function license-scan { - # turn nounset off because some of the following commands fail if nounset is turned on - set +u - - export PATH="$HOME/.pyenv/bin:$PATH" - eval "$(pyenv init -)" - # ensure pyenv shims are added to PATH, see https://github.com/pyenv/pyenv/issues/1906 - eval "$(pyenv init --path)" - eval "$(pyenv virtualenv-init -)" - - make prereq - # only install depdencies that are needed by end users - make install-user - fossa analyze -} - -function archive { - # Treat unset env variables as an error, but only in this function as there are other functions that allow unset variables - set -u - - # this will only be done if the build number variable is present - BENCHMARK_DIR=${BENCHMARK_HOME}/.benchmark - if [[ -d ${BENCHMARK_DIR} ]]; then - find ${BENCHMARK_DIR} -name "*.log" -printf "%P\\0" | tar -cvjf ${BENCHMARK_DIR}/${BUILD_NUMBER}.tar.bz2 -C ${BENCHMARK_DIR} --transform "s,^,ci-${BUILD_NUMBER}/," --null -T - - else - echo "Benchmark directory [${BENCHMARK_DIR}] not present. Ensure the BENCHMARK_DIR environment variable is correct" - exit 1 - fi -} +$@ -if declare -F "$1" > /dev/null; then - "$@" - exit -else - echo "Please specify a function to run" - exit 1 -fi diff --git a/.github/workflows/docker-test.yml b/.github/workflows/docker-build.yml similarity index 60% rename from .github/workflows/docker-test.yml rename to .github/workflows/docker-build.yml index 039733521..da87a35d3 100644 --- a/.github/workflows/docker-test.yml +++ b/.github/workflows/docker-build.yml @@ -1,4 +1,4 @@ -name: Docker Build and Test +name: Docker Build on: pull_request: workflow_dispatch: @@ -28,14 +28,12 @@ jobs: with: version: 'v0.9.1' - uses: actions/checkout@v4 - with: - path: 'opensearch-benchmark-git' + - name: Docker Build ${{ matrix.platform }} run: | docker buildx version - cp -a opensearch-benchmark-git/* ./ - echo "Disable VERSION arg to enter docker build test mode" - PLATFORM=${{ matrix.platform }} - PLATFORM=`echo $PLATFORM | tr '/' '-'` - docker buildx build --platform ${{ matrix.platform }} --build-arg BUILD_ENV=testing --build-arg BUILD_DATE=`date -u +%Y-%m-%dT%H:%M:%SZ` -f "docker/Dockerfile" -t "osb/osb-$PLATFORM" -o type=docker . - docker images | grep "osb/osb-$PLATFORM" + tag=osb/osb-`echo ${{ matrix.platform }} | tr '/' '-'` + set -x + docker buildx build --platform ${{ matrix.platform }} --build-arg VERSION=`cat version.txt` --build-arg BUILD_DATE=`date -u +%Y-%m-%dT%H:%M:%SZ` -f docker/Dockerfile -t "$tag" -o type=docker . + set +x + docker images | grep "$tag" diff --git a/.github/workflows/manual-integ.yml b/.github/workflows/integ-test.yml similarity index 56% rename from .github/workflows/manual-integ.yml rename to .github/workflows/integ-test.yml index 66906fae1..a8c3cf444 100644 --- a/.github/workflows/manual-integ.yml +++ b/.github/workflows/integ-test.yml @@ -1,4 +1,4 @@ -name: Integ Actions +name: Run Integration Tests on: [workflow_dispatch, pull_request] jobs: Integration-Tests: @@ -12,11 +12,11 @@ jobs: runs-on: ubuntu-latest steps: - uses: actions/setup-python@v4 - with: - python-version: ${{ matrix.python-version }} + - uses: KengoTODA/actions-setup-docker-compose@v1 with: version: '1.29.2' + # - name: Enforce docker-compose v1 # run: | # echo "GitHub starts to switch runners to include docker-compose v2" @@ -27,43 +27,21 @@ jobs: # sudo pip install docker-compose==1.29.2 # docker --version # docker-compose --version + - name: Check out repository code uses: actions/checkout@v2 - - name: Clone pyenv + + - name: Install pyenv run: git clone https://github.com/pyenv/pyenv.git ~/.pyenv - - name: Clone pyenv-virtualenv - run: git clone https://github.com/pyenv/pyenv-virtualenv.git ~/.pyenv/plugins/pyenv-virtualenv - - name: Install JDK 8 - uses: actions/setup-java@v3 - with: - distribution: 'adopt' - java-version: '8' - - run: echo "JAVA8_HOME=$JAVA_HOME" >> $GITHUB_ENV - - name: Install JDK 11 - uses: actions/setup-java@v3 - with: - distribution: 'adopt' - java-version: '11' - - run: echo "JAVA11_HOME=$JAVA_HOME" >> $GITHUB_ENV - - name: Install JDK 15 - uses: actions/setup-java@v3 - with: - distribution: 'adopt' - java-version: '15' - - run: echo "JAVA15_HOME=$JAVA_HOME" >> $GITHUB_ENV - - name: Install JDK 16 - uses: actions/setup-java@v3 - with: - distribution: 'adopt' - java-version: '16' - - run: echo "JAVA16_HOME=$JAVA_HOME" >> $GITHUB_ENV + - name: Install JDK 17 uses: actions/setup-java@v3 with: distribution: 'adopt' java-version: '17' - run: echo "JAVA17_HOME=$JAVA_HOME" >> $GITHUB_ENV - - name: Run the CI build_it script + + - name: Run the CI build script run: bash .ci/build.sh build_it ${{ matrix.python-version }} env: BENCHMARK_HOME: env.GITHUB_WORKSPACE diff --git a/.github/workflows/main.yml b/.github/workflows/unit-test.yml similarity index 87% rename from .github/workflows/main.yml rename to .github/workflows/unit-test.yml index 42e872782..44393e995 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/unit-test.yml @@ -1,5 +1,5 @@ -name: CI Actions -on: [pull_request] +name: Run Unit Tests +on: [workflow_dispatch, pull_request] jobs: Unit-Tests: strategy: @@ -8,11 +8,14 @@ jobs: - ubuntu-latest - macos-latest runs-on: ${{ matrix.os }} + steps: - name: Check out repository code uses: actions/checkout@v2 + - name: Clone pyenv run: git clone https://github.com/pyenv/pyenv.git ~/.pyenv + - name: Run the CI build script run: bash .ci/build.sh build env: diff --git a/DEVELOPER_GUIDE.md b/DEVELOPER_GUIDE.md index bd7767d9d..b895bb6fe 100644 --- a/DEVELOPER_GUIDE.md +++ b/DEVELOPER_GUIDE.md @@ -23,7 +23,7 @@ This document will walk you through on what's needed to start contributing code ### Prerequisites - - **Pyenv** : Install `pyenv` and follow the instructions in the output of `pyenv init` to set up your shell and restart it before proceeding. + - **Pyenv**: Install `pyenv` and follow the instructions in the output of `pyenv init` to set up your shell and restart it before proceeding. For more details please refer to the [PyEnv installation instructions](https://github.com/pyenv/pyenv#installation). **Optional Step:** For Debian-based systems, install the following modules to continue with the next steps: @@ -54,15 +54,8 @@ For those working on WSL2, it is recommended to clone the repository and set up After you git cloned the forked copy of OpenSearch Benchmark, use the following command-line instructions to set up OpenSearch Benchmark for development: ``` cd opensearch-benchmark -make prereq -make install -``` - -NOTE: `make prereq` produces the following message. -``` -IMPORTANT: please add `eval "$(pyenv init -)"` to your bash profile and restart your terminal before proceeding any further. +make install-devel ``` -This line is commonly thought of as an error message but rather it's just a warning. Unless you haven't already added `eval "$(pyenv init -)"` to your bash profile and restarted your terminal, then feel free to proceed forward. This eval statement is necessary in the startup configuration as it allows Pyenv to manage python versions by adding python shims to your path. If you experience any issues, please see https://github.com/pyenv/pyenv. Depending on the platform and shell you have, use the following command to activate the virtual environment: diff --git a/Makefile b/Makefile index dcf66d8fa..55f4a0248 100644 --- a/Makefile +++ b/Makefile @@ -16,38 +16,20 @@ # under the License. SHELL = /bin/bash -# We assume an active virtualenv for development -PYENV_REGEX = .pyenv/shims -PY_BIN = python3 -PY_PREFIX = python -# https://github.com/pypa/pip/issues/5599 -PIP_WRAPPER = $(PY_BIN) -m pip -export PY38 = $(shell jq -r '.python_versions.PY38' .ci/variables.json) -export PY38_BIN = $(PY_PREFIX)$(shell cut -d '.' -f 1,2 <<< $(PY38)) -export PY39 = $(shell jq -r '.python_versions.PY39' .ci/variables.json) -export PY39_BIN = $(PY_PREFIX)$(shell cut -d '.' -f 1,2 <<< $(PY39)) -export PY310 = $(shell jq -r '.python_versions.PY310' .ci/variables.json) -export PY310_BIN = $(PY_PREFIX)$(shell cut -d '.' -f 1,2 <<< $(PY310)) -export PY311 = $(shell jq -r '.python_versions.PY311' .ci/variables.json) -export PY311_BIN = $(PY_PREFIX)$(shell cut -d '.' -f 1,2 <<< $(PY311)) -VENV_NAME ?= .venv -VENV_ACTIVATE_FILE = $(VENV_NAME)/bin/activate -VENV_ACTIVATE = . $(VENV_ACTIVATE_FILE) -VEPYTHON = $(VENV_NAME)/bin/$(PY_BIN) -VEPYLINT = $(VENV_NAME)/bin/pylint -PYENV_ERROR = "\033[0;31mIMPORTANT\033[0m: Please install pyenv.\n" -PYENV_PREREQ_HELP = "\033[0;31mIMPORTANT\033[0m: If you haven't already, please add \033[0;31meval \"\$$(pyenv init -)\"\033[0m to your bash profile and restart your terminal before proceeding any further.\n" -VE_MISSING_HELP = "\033[0;31mIMPORTANT\033[0m: Couldn't find $(PWD)/$(VENV_NAME); have you executed make venv-create?\033[0m\n" - -prereq: - pyenv install --skip-existing $(PY38) - pyenv install --skip-existing $(PY39) - pyenv install --skip-existing $(PY310) - pyenv install --skip-existing $(PY311) - pyenv local $(PY38) - @# Ensure all Python versions are registered for this project - @ jq -r '.python_versions | [.[] | tostring] | join("\n")' .ci/variables.json > .python-version - -@ printf $(PYENV_PREREQ_HELP) +PIP = pip3 +VERSIONS = $(shell jq -r '.python_versions | .[]' .ci/variables.json | sed '$$d') +VERSION38 = $(shell jq -r '.python_versions | .[]' .ci/variables.json | sed '$$d' | grep 3\.8) +PYENV_ERROR = "\033[0;31mIMPORTANT\033[0m: Please install pyenv and run \033[0;31meval \"\$$(pyenv init -)\"\033[0m.\n" + +pyinst: + @which pyenv > /dev/null 2>&1 || { printf $(PYENV_ERROR); exit 1; } + @for i in $(VERSIONS); do pyenv install --skip-existing $$i; done + pyenv local $(VERSIONS) + +pyinst38: + @which pyenv > /dev/null 2>&1 || { printf $(PYENV_ERROR); exit 1; } + pyenv install --skip-existing $(VERSION38) + pyenv local $(VERSION38) check-java: @if ! test "$(JAVA_HOME)" || ! java --version > /dev/null 2>&1 || ! javadoc --help > /dev/null 2>&1; then \ @@ -58,87 +40,64 @@ check-java: echo "NOTE: Java version 17 required to have all integration tests pass" >&2; \ fi -venv-create: - @if [[ ! -x $$(command -v pyenv) ]]; then \ - printf $(PYENV_ERROR); \ - exit 1; \ - fi; - @if [[ ! -f $(VENV_ACTIVATE_FILE) ]]; then \ - eval "$$(pyenv init -)" && eval "$$(pyenv init --path)" && $(PY38_BIN) -mvenv $(VENV_NAME); \ - eval "$$(pyenv init -)" && eval "$$(pyenv init --path)" && $(PY39_BIN) -mvenv $(VENV_NAME); \ - eval "$$(pyenv init -)" && eval "$$(pyenv init --path)" && $(PY310_BIN) -mvenv $(VENV_NAME); \ - eval "$$(pyenv init -)" && eval "$$(pyenv init --path)" && $(PY311_BIN) -mvenv $(VENV_NAME); \ - printf "Created python3 venv under $(PWD)/$(VENV_NAME).\n"; \ - fi; - -check-venv: - @if [[ ! -f $(VENV_ACTIVATE_FILE) ]]; then \ - printf $(VE_MISSING_HELP); \ - fi +install-deps: pyinst38 + # @if test `uname` = Darwin -o `python3 --version | sed 's/.* 3.\([0-9]*\).*/3\1/'` -lt 38; then make pyinst38; fi + $(PIP) install --upgrade pip setuptools wheel -install-user: venv-create - . $(VENV_ACTIVATE_FILE); $(PIP_WRAPPER) install --upgrade pip setuptools wheel - . $(VENV_ACTIVATE_FILE); PIP_ONLY_BINARY=h5py $(PIP_WRAPPER) install -e . +install-user: install-deps + PIP_ONLY_BINARY=h5py $(PIP) install -e . -install: install-user - # Also install development dependencies - . $(VENV_ACTIVATE_FILE); $(PIP_WRAPPER) install -e .[develop] +install-devel: install-deps + $(PIP) install -e .[develop] -clean: nondocs-clean docs-clean +wheel: + $(PIP) install --upgrade pip setuptools wheel + PIP_ONLY_BINARY=h5py $(PIP) wheel . -nondocs-clean: - rm -rf .benchmarks .eggs .tox .benchmark_it .cache build dist osbenchmark.egg-info logs junit-py*.xml NOTICE.txt +install: wheel + PIP_ONLY_BINARY=h5py $(PIP) install opensearch_benchmark-*.whl + rm -r *.whl *.egg-info -docs-clean: - cd docs && $(MAKE) clean +clean: + rm -rf .benchmarks .eggs .tox .benchmark_it .cache build dist *.egg-info logs junit-py*.xml *.whl NOTICE.txt # Avoid conflicts between .pyc/pycache related files created by local Python interpreters and other interpreters in Docker python-caches-clean: -@find . -name "__pycache__" -prune -exec rm -rf -- \{\} \; -@find . -name ".pyc" -prune -exec rm -rf -- \{\} \; -# Force recreation of the virtual environment used by tox. -# -# See https://github.com/opensearch-project/OpenSearch-Benchmark/blob/main/DEVELOPER_GUIDE.md: -# -# > Note pip will not update project dependencies (specified either in the install_requires or the extras -# > section of the setup.py) if any version already exists in the virtual environment; therefore we recommend -# > to recreate your environments whenever your project dependencies change. +# Note: pip will not update project dependencies (specified either in the install_requires or the extras +# section of the setup.py) if any version is already installed; therefore we recommend +# recreating your environments whenever your project dependencies change. tox-env-clean: rm -rf .tox -lint: check-venv - @find osbenchmark benchmarks scripts tests it -name "*.py" -exec $(VEPYLINT) -j0 -rn --load-plugins pylint_quotes --rcfile=$(CURDIR)/.pylintrc \{\} + - -docs: check-venv - @. $(VENV_ACTIVATE_FILE); cd docs && $(MAKE) html - -serve-docs: check-venv - @. $(VENV_ACTIVATE_FILE); cd docs && $(MAKE) serve - -test: check-venv - . $(VENV_ACTIVATE_FILE); pytest tests/ +lint: + @find osbenchmark benchmarks scripts tests it -name "*.py" -exec pylint -j0 -rn --load-plugins pylint_quotes --rcfile=$(CURDIR)/.pylintrc \{\} + -precommit: lint +test: + pytest tests/ -it: check-java check-venv python-caches-clean tox-env-clean - . $(VENV_ACTIVATE_FILE); tox +it: pyinst check-java python-caches-clean tox-env-clean + @which tox || $(PIP) install tox + tox -it38 it39 it310 it311: check-java check-venv python-caches-clean tox-env-clean - . $(VENV_ACTIVATE_FILE); tox -e $(@:it%=py%) +it38 it39 it310 it311: pyinst check-java python-caches-clean tox-env-clean + @which tox || $(PIP) install tox + tox -e $(@:it%=py%) -benchmark: check-venv - . $(VENV_ACTIVATE_FILE); pytest benchmarks/ +benchmark: + pytest benchmarks/ -coverage: check-venv - . $(VENV_ACTIVATE_FILE); coverage run setup.py test - . $(VENV_ACTIVATE_FILE); coverage html +coverage: + coverage run setup.py test + coverage html -release-checks: check-venv - . $(VENV_ACTIVATE_FILE); ./release-checks.sh $(release_version) $(next_version) +release-checks: + ./release-checks.sh $(release_version) $(next_version) # usage: e.g. make release release_version=0.9.2 next_version=0.9.3 -release: check-venv release-checks clean docs it - . $(VENV_ACTIVATE_FILE); ./release.sh $(release_version) $(next_version) +release: release-checks clean it + ./release.sh $(release_version) $(next_version) -.PHONY: install clean nondocs-clean docs-clean python-caches-clean tox-env-clean docs serve-docs test it it38 benchmark coverage release release-checks prereq venv-create check-env +.PHONY: install clean python-caches-clean tox-env-clean test it it38 benchmark coverage release release-checks pyinst diff --git a/docker/Dockerfile b/docker/Dockerfile index 147b5b861..104a10972 100644 --- a/docker/Dockerfile +++ b/docker/Dockerfile @@ -2,36 +2,48 @@ # Install OpenSearch Benchmark to build a Docker image # ######################################################## -ARG VERSION -ARG BUILD_ENV=production +# +# Stage 1: build packages and compile where needed +# +FROM python:3.11.2-slim AS build-stage +WORKDIR /opensearch-benchmark + +RUN apt-get -y update && \ + apt-get install -y curl git gcc pbzip2 pigz make jq && \ + apt-get -y upgrade + +COPY . opensearch-benchmark -FROM python:3.11.2-slim as build_env_testing -ONBUILD COPY opensearch-benchmark-git/ ./ +RUN cd opensearch-benchmark; make wheel -FROM python:3.11.2-slim as build_env_production -ONBUILD RUN echo Production Environment -FROM build_env_${BUILD_ENV} +# +# Stage 2: create image +# +FROM python:3.11.2-slim AS image-stage WORKDIR /opensearch-benchmark ENV BENCHMARK_RUNNING_IN_DOCKER=True -RUN apt-get -y update && \ - apt-get install -y curl git gcc pbzip2 pigz && \ - apt-get -y upgrade && \ - rm -rf /var/lib/apt/lists/* - RUN groupadd --gid 1000 opensearch-benchmark && \ useradd -d /opensearch-benchmark -m -k /dev/null -g 1000 -N -u 1000 -l -s /bin/bash benchmark -ENV PIP_ONLY_BINARY=h5py -RUN if [ "$BUILD_ENV" = "testing" ] ; then echo Testing; ls -l; python3 -m pip install -e . ; \ - else echo Production; if [ -z "$VERSION" ] ; then python3 -m pip install opensearch-benchmark ; else python3 -m pip install opensearch-benchmark==$VERSION ; fi; fi - RUN mkdir -p /opensearch-benchmark/.benchmark && \ chown -R 1000:0 /opensearch-benchmark/.benchmark +COPY --from=build-stage /opensearch-benchmark/opensearch-benchmark/yappi-*.whl /opensearch-benchmark/opensearch-benchmark/opensearch_benchmark-*.whl ./ + +# There is no binary package currently available for yappi on ARM. +RUN set -ex; \ + apt-get -y update; \ + apt-get install -y git pbzip2; \ + apt-get -y upgrade; \ + rm -rf /var/lib/apt/lists/*; \ + PIP_ONLY_BINARY=h5py pip install yappi-*.whl opensearch_benchmark-*.whl; \ + rm *.whl + USER 1000 +ARG VERSION ARG BUILD_DATE LABEL org.label-schema.schema-version="1.0" \ diff --git a/setup.py b/setup.py index 5e0cc4c7f..55ba5b85a 100644 --- a/setup.py +++ b/setup.py @@ -121,7 +121,7 @@ def str_from_file(name): "tox==3.14.0", "coverage==5.5", "twine==1.15.0", - "wheel==0.38.4", + "wheel>=0.38.4", "github3.py==1.3.0", "pylint==2.6.0", "pylint-quotes==0.2.1" From 54037002ee7101c677cdbe0c90433f103daef422 Mon Sep 17 00:00:00 2001 From: Finn <67562851+finnroblin@users.noreply.github.com> Date: Thu, 5 Sep 2024 12:09:59 -0400 Subject: [PATCH 3/3] Specify multiple search clients for easier benchmarking (#614) Signed-off-by: Finn Roblin --- osbenchmark/results_publisher.py | 55 +++++++++++++++++++++++---- osbenchmark/workload/loader.py | 34 +++++++++++++++-- tests/workload/loader_test.py | 64 ++++++++++++++++++++++++++++++++ 3 files changed, 141 insertions(+), 12 deletions(-) diff --git a/osbenchmark/results_publisher.py b/osbenchmark/results_publisher.py index 32b2e4ecb..4b3ebebd6 100644 --- a/osbenchmark/results_publisher.py +++ b/osbenchmark/results_publisher.py @@ -27,6 +27,8 @@ import io import logging import sys +import re +from enum import Enum import tabulate @@ -43,6 +45,11 @@ ------------------------------------------------------ """ +class Throughput(Enum): + MEAN = "mean" + MAX = "max" + MIN = "min" + MEDIAN = "median" def summarize(results, cfg): SummaryResultsPublisher(results, cfg).publish() @@ -126,6 +133,17 @@ def __init__(self, results, config): "throughput":comma_separated_string_to_number_list(config.opts("workload", "throughput.percentiles", mandatory=False)), "latency": comma_separated_string_to_number_list(config.opts("workload", "latency.percentiles", mandatory=False)) } + self.logger = logging.getLogger(__name__) + + def publish_operational_statistics(self, metrics_table: list, warnings: list, record, task): + metrics_table.extend(self._publish_throughput(record, task)) + metrics_table.extend(self._publish_latency(record, task)) + metrics_table.extend(self._publish_service_time(record, task)) + # this is mostly needed for debugging purposes but not so relevant to end users + if self.show_processing_time: + metrics_table.extend(self._publish_processing_time(record, task)) + metrics_table.extend(self._publish_error_rate(record, task)) + self.add_warnings(warnings, record, task) def publish(self): print_header(FINAL_SCORE) @@ -145,16 +163,33 @@ def publish(self): metrics_table.extend(self._publish_transform_stats(stats)) + # These variables are used with the clients_list parameter in test_procedures to find the max throughput. + max_throughput = -1 + record_with_best_throughput = None + + throughput_pattern = r"_(\d+)_clients$" + + for record in stats.op_metrics: task = record["task"] - metrics_table.extend(self._publish_throughput(record, task)) - metrics_table.extend(self._publish_latency(record, task)) - metrics_table.extend(self._publish_service_time(record, task)) - # this is mostly needed for debugging purposes but not so relevant to end users - if self.show_processing_time: - metrics_table.extend(self._publish_processing_time(record, task)) - metrics_table.extend(self._publish_error_rate(record, task)) - self.add_warnings(warnings, record, task) + is_task_part_of_throughput_testing = re.search(throughput_pattern, task) + if is_task_part_of_throughput_testing: + # assumption: all units are the same and only maximizing throughput over one operation (i.e. not both ingest and search). + # To maximize throughput over multiple operations, would need a list/dictionary of maximum throughputs. + task_throughput = record["throughput"][Throughput.MEAN.value] + self.logger.info("Task %s has throughput %s", task, task_throughput) + if task_throughput > max_throughput: + max_throughput = task_throughput + record_with_best_throughput = record + + else: + self.publish_operational_statistics(metrics_table=metrics_table, warnings=warnings, record=record, task=task) + + # The following code is run when the clients_list parameter is specified and publishes the max throughput. + if max_throughput != -1 and record_with_best_throughput is not None: + self.publish_operational_statistics(metrics_table=metrics_table, warnings=warnings, record=record_with_best_throughput, + task=record_with_best_throughput["task"]) + metrics_table.extend(self._publish_best_client_settings(record_with_best_throughput, record_with_best_throughput["task"])) for record in stats.correctness_metrics: task = record["task"] @@ -217,6 +252,10 @@ def _publish_recall(self, values, task): self._line("Mean recall@1", task, recall_1_mean, "", lambda v: "%.2f" % v) ) + def _publish_best_client_settings(self, record, task): + num_clients = re.search(r"_(\d+)_clients$", task).group(1) + return self._join(self._line("Number of clients that achieved max throughput", "", num_clients, "")) + def _publish_percentiles(self, name, task, value, unit="ms"): lines = [] percentiles = self.display_percentiles.get(name, metrics.GlobalStatsCalculator.OTHER_PERCENTILES) diff --git a/osbenchmark/workload/loader.py b/osbenchmark/workload/loader.py index a84c07559..0d8114234 100644 --- a/osbenchmark/workload/loader.py +++ b/osbenchmark/workload/loader.py @@ -1596,11 +1596,25 @@ def _create_test_procedures(self, workload_spec): schedule = [] for op in self._r(test_procedure_spec, "schedule", error_ctx=name): - if "parallel" in op: - task = self.parse_parallel(op["parallel"], ops, name) + if "clients_list" in op: + self.logger.info("Clients list specified: %s. Running multiple search tasks, "\ + "each scheduled with the corresponding number of clients from the list.", op["clients_list"]) + for num_clients in op["clients_list"]: + op["clients"] = num_clients + + new_name = self._rename_task_based_on_num_clients(name, num_clients) + + new_name = name + "_" + str(num_clients) + "_clients" + new_task = self.parse_task(op, ops, new_name) + new_task.name = new_name + schedule.append(new_task) else: - task = self.parse_task(op, ops, name) - schedule.append(task) + if "parallel" in op: + task = self.parse_parallel(op["parallel"], ops, name) + else: + task = self.parse_task(op, ops, name) + + schedule.append(task) # verify we don't have any duplicate task names (which can be confusing / misleading in results_publishing). known_task_names = set() @@ -1635,6 +1649,18 @@ def _create_test_procedures(self, workload_spec): % ", ".join([c.name for c in test_procedures])) return test_procedures + def _rename_task_based_on_num_clients(self, name: str, num_clients: int) -> str: + has_underscore = "_" in name + has_hyphen = "-" in name + if has_underscore and has_hyphen: + self.logger.warning("The test procedure name %s contains a mix of _ and -. "\ + "Consider changing the name to avoid frustrating bugs in the future.", name) + return name + "_" + str(num_clients) + "_clients" + elif has_hyphen: + return name + "-" + str(num_clients) + "-clients" + else: + return name + "_" + str(num_clients) + "_clients" + def _get_test_procedure_specs(self, workload_spec): schedule = self._r(workload_spec, "schedule", mandatory=False) test_procedure = self._r(workload_spec, "test_procedure", mandatory=False) diff --git a/tests/workload/loader_test.py b/tests/workload/loader_test.py index eeccc14ec..285e52772 100644 --- a/tests/workload/loader_test.py +++ b/tests/workload/loader_test.py @@ -2477,6 +2477,70 @@ def test_parse_unique_task_names(self): self.assertEqual("search-two-clients", schedule[1].name) self.assertEqual("search", schedule[1].operation.name) + def test_parse_clients_list(self): + workload_specification = { + "description": "description for unit test", + "operations": [ + { + "name": "search", + "operation-type": "search", + "index": "_all" + } + ], + "test_procedure": { + "name": "default-test-procedure", + "schedule": [ + { + "name": "search-one-client", + "operation": "search", + "clients": 1, + "clients_list": [1,2,3] + }, + { + "name": "search-two-clients", + "operation": "search", + "clients": 2 + } + ] + } + } + + reader = loader.WorkloadSpecificationReader(selected_test_procedure="default-test-procedure") + resulting_workload = reader("unittest", workload_specification, "/mappings") + self.assertEqual("unittest", resulting_workload.name) + test_procedure = resulting_workload.test_procedures[0] + self.assertTrue(test_procedure.selected) + schedule = test_procedure.schedule + self.assertEqual(4, len(schedule)) + + self.assertEqual("default-test-procedure_1_clients", schedule[0].name) + self.assertEqual("search", schedule[0].operation.name) + self.assertEqual("default-test-procedure_2_clients", schedule[1].name) + self.assertEqual("search", schedule[1].operation.name) + self.assertEqual("default-test-procedure_3_clients", schedule[2].name) + self.assertEqual("search", schedule[2].operation.name) + + self.assertEqual("search-two-clients", schedule[3].name) + self.assertEqual("search", schedule[3].operation.name) + # pylint: disable=W0212 + def test_naming_with_clients_list(self): + reader = loader.WorkloadSpecificationReader(selected_test_procedure="default-test_procedure") + # Test case 1: name contains both "_" and "-" + result = reader._rename_task_based_on_num_clients("test_name-task", 5) + self.assertEqual(result, "test_name-task_5_clients") + + # Test case 2: name contains only "-" + result = reader._rename_task_based_on_num_clients("test-name", 3) + self.assertEqual(result, "test-name-3-clients") + + # Test case 3: name contains only "_" + result = reader._rename_task_based_on_num_clients("test_name", 2) + self.assertEqual(result, "test_name_2_clients") + + # Test case 4: name contains neither "_" nor "-" + result = reader._rename_task_based_on_num_clients("testname", 1) + self.assertEqual(result, "testname_1_clients") + def test_parse_indices_valid_workload_specification(self): workload_specification = { "description": "description for unit test",