Add support for custom headers and request-timeout #1070

Merged: 9 commits, Oct 27, 2020
3 changes: 3 additions & 0 deletions docs/track.rst
@@ -475,6 +475,9 @@ Each operation consists of the following properties:
* ``name`` (mandatory): The name of this operation. You can choose this name freely. It is only needed to reference the operation when defining schedules.
* ``operation-type`` (mandatory): Type of this operation. See below for the operation types that are supported out of the box in Rally. You can also add arbitrary operations by defining :doc:`custom runners </adding_tracks>`.
* ``include-in-reporting`` (optional, defaults to ``true`` for normal operations and to ``false`` for administrative operations): Whether or not this operation should be included in the command line report. For example you might want Rally to create an index for you but you are not interested in detailed metrics about it. Note that Rally will still record all metrics in the metrics store.
* ``request-timeout`` (optional, defaults to ``None``): The client-side timeout for this operation. Represented as a floating-point number in seconds, e.g. ``1.5``.
* ``headers`` (optional, defaults to ``None``): A dictionary of key-value pairs to pass as headers in the operation.
* ``opaque-id`` (optional, defaults to ``None``): A special ID set as the value of the ``x-opaque-id`` header in the client headers of the operation. Overrides any existing ``x-opaque-id`` entry in ``headers`` (matched case-insensitively).
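
As an illustration, an operation that combines all three of the new properties might look like the following hypothetical snippet (the operation name, index, header, and ID values are invented for this example)::

    {
      "name": "timed-search",
      "operation-type": "search",
      "index": "logs-*",
      "request-timeout": 7.5,
      "headers": {
        "x-my-header": "benchmark-run-1"
      },
      "opaque-id": "rally-timed-search"
    }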

Some of the operations below are also retryable (marked accordingly below). Retryable operations expose the following properties:

140 changes: 88 additions & 52 deletions esrally/driver/runner.py
@@ -144,6 +144,30 @@ async def __call__(self, es, params):
async def __aexit__(self, exc_type, exc_val, exc_tb):
return False

def _default_kw_params(self, params):
# map of API kwargs to Rally config parameters
kw_dict = {
"body": "body",
"headers": "headers",
"index": "index",
"opaque_id": "opaque-id",
"params": "request-params",
"request_timeout": "request-timeout",
}
full_result = {k: params.get(v) for (k, v) in kw_dict.items()}
# filter Nones
return dict(filter(lambda kv: kv[1] is not None, full_result.items()))

def _transport_request_params(self, params):
request_params = params.get("request-params", {})
request_timeout = params.get("request-timeout")
if request_timeout is not None:
request_params["request_timeout"] = request_timeout
headers = params.get("headers") or {}
opaque_id = params.get("opaque-id")
if opaque_id is not None:
headers.update({"x-opaque-id": opaque_id})
return request_params, headers
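
To make the behavior of the two helpers above concrete, here is a minimal, self-contained sketch; the helper is re-implemented as a free function purely for illustration, and all parameter values are invented:

    def transport_request_params(params):
        # mirrors Runner._transport_request_params above
        request_params = params.get("request-params", {})
        request_timeout = params.get("request-timeout")
        if request_timeout is not None:
            request_params["request_timeout"] = request_timeout
        headers = params.get("headers") or {}
        opaque_id = params.get("opaque-id")
        if opaque_id is not None:
            headers.update({"x-opaque-id": opaque_id})
        return request_params, headers

    # hypothetical operation parameters as they would appear in a track
    params = {
        "request-timeout": 7.5,
        "headers": {"x-my-header": "v1"},
        "opaque-id": "rally-run-42",
    }
    request_params, headers = transport_request_params(params)
    # request_params == {"request_timeout": 7.5}
    # headers == {"x-my-header": "v1", "x-opaque-id": "rally-run-42"}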

class Delegator:
"""
@@ -322,7 +346,8 @@ async def __call__(self, es, params):
is in the single digit microsecond range when this feature is disabled and in the single digit millisecond range when this feature
is enabled; numbers based on a bulk size of 500 elements and no errors). For details please refer to the respective benchmarks
in ``benchmarks/driver``.

* ``request-timeout``: a non-negative float indicating the client-side timeout for the operation. If not present, defaults to
``None`` and potentially falls back to the global timeout setting.

Returned meta data
``````````````````
@@ -460,7 +485,7 @@ async def __call__(self, es, params):
}
"""
detailed_results = params.get("detailed-results", False)
index = params.get("index")
api_kwargs = self._default_kw_params(params)

bulk_params = {}
if "pipeline" in params:
@@ -475,15 +500,16 @@
es.return_raw_response()

if with_action_metadata:
api_kwargs.pop("index", None)
# only half of the lines are documents
response = await es.bulk(body=params["body"], params=bulk_params)
response = await es.bulk(params=bulk_params, **api_kwargs)
else:
response = await es.bulk(body=params["body"], index=index, doc_type=params.get("type"), params=bulk_params)
response = await es.bulk(doc_type=params.get("type"), params=bulk_params, **api_kwargs)

stats = self.detailed_stats(params, bulk_size, response) if detailed_results else self.simple_stats(bulk_size, response)

meta_data = {
"index": str(index) if index else None,
"index": params.get("index"),
"weight": bulk_size,
"unit": "docs",
"bulk-size": bulk_size
@@ -612,21 +638,18 @@ class ForceMerge(Runner):
async def __call__(self, es, params):
import elasticsearch
max_num_segments = params.get("max-num-segments")
# preliminary support for overriding the global request timeout (see #567). As force-merge falls back to
# the raw transport API (where the keyword argument is called `timeout`) in some cases we will always need
# a special handling for the force-merge API.
request_timeout = params.get("request-timeout")
mode = params.get("mode")
merge_params = {"request_timeout": request_timeout}
merge_params = self._default_kw_params(params)
if max_num_segments:
merge_params["max_num_segments"] = max_num_segments
try:
if mode == "polling":
# we ignore the request_timeout if we are in polling mode and deliberately timeout early
# no reason to wait as long as a whole {polling-period} (which has a minimum of 1 second)
merge_params["request_timeout"] = 1
complete = False
try:
await es.indices.forcemerge(index=params.get("index"), **merge_params)
await es.indices.forcemerge(**merge_params)
complete = True
except elasticsearch.ConnectionTimeout:
pass
@@ -637,15 +660,14 @@ async def __call__(self, es, params):
# empty nodes response indicates no tasks
complete = True
else:
await es.indices.forcemerge(index=params.get("index"), **merge_params)
await es.indices.forcemerge(**merge_params)
except elasticsearch.TransportError as e:
# this is caused by older versions of Elasticsearch (< 2.1), fall back to optimize
if e.status_code == 400:
params, headers = self._transport_request_params(params)
if max_num_segments:
await es.transport.perform_request("POST", f"/_optimize?max_num_segments={max_num_segments}",
timeout=request_timeout)
else:
await es.transport.perform_request("POST", "/_optimize", timeout=request_timeout)
params["max_num_segments"] = max_num_segments
await es.transport.perform_request(method="POST", url="/_optimize", params=params, headers=headers)
else:
raise e

@@ -670,10 +692,10 @@ def _safe_string(self, v):
return str(v) if v is not None else None

async def __call__(self, es, params):
index = params.get("index", "_all")
api_kwargs = self._default_kw_params(params)
index = api_kwargs.pop("index", "_all")
condition = params.get("condition")

response = await es.indices.stats(index=index, metric="_all")
response = await es.indices.stats(index=index, metric="_all", **api_kwargs)
if condition:
path = mandatory(condition, "path", repr(self))
expected_value = mandatory(condition, "expected-value", repr(self))
@@ -707,7 +729,8 @@ class NodeStats(Runner):
"""

async def __call__(self, es, params):
await es.nodes.stats(metric="_all")
request_timeout = params.get("request-timeout")
await es.nodes.stats(metric="_all", request_timeout=request_timeout)

def __repr__(self, *args, **kwargs):
return "node-stats"
@@ -769,7 +792,8 @@ class Query(Runner):
corresponding response in more detail, this might incur additional
overhead which can skew measurement results. This flag is ineffective
for scroll queries (detailed meta-data are always returned).

* ``request-timeout``: a non-negative float indicating the client-side timeout for the operation. If not present,
defaults to ``None`` and potentially falls back to the global timeout setting.
If the following parameters are present in addition, a scroll query will be issued:

* `pages`: Number of pages to retrieve at most for this scroll. If a scroll query yields fewer results than the specified number of
@@ -799,17 +823,25 @@ async def __call__(self, es, params):
return await self.request_body_query(es, params)

async def request_body_query(self, es, params):
request_params = self._default_request_params(params)
request_params, headers = self._transport_request_params(params)
index = params.get("index", "_all")
body = mandatory(params, "body", self)
doc_type = params.get("type")
detailed_results = params.get("detailed-results", False)
headers = self._headers(params)
encoding_header = self._query_headers(params)
if encoding_header is not None:
headers.update(encoding_header)

cache = params.get("cache")
if cache is not None:
request_params["request_cache"] = str(cache).lower()
if not bool(headers):
# counter-intuitive but preserves prior behavior
headers = None
# disable eager response parsing - responses might be huge thus skewing results
es.return_raw_response()

r = await self._raw_search(es, doc_type, index, body, request_params, headers)
r = await self._raw_search(es, doc_type, index, body, request_params, headers=headers)

if detailed_results:
props = parse(r, ["hits.total", "hits.total.value", "hits.total.relation", "timed_out", "took"])
@@ -835,7 +867,7 @@ async def request_body_query(self, es, params):
}

async def scroll_query(self, es, params):
request_params = self._default_request_params(params)
request_params, headers = self._transport_request_params(params)
hits = 0
hits_relation = None
retrieved_pages = 0
@@ -844,9 +876,16 @@
# explicitly convert to int to provoke an error otherwise
total_pages = sys.maxsize if params["pages"] == "all" else int(params["pages"])
size = params.get("results-per-page")
headers = self._headers(params)
encoding_header = self._query_headers(params)
if encoding_header is not None:
headers.update(encoding_header)
scroll_id = None

cache = params.get("cache")
if cache is not None:
request_params["request_cache"] = str(cache).lower()
if not bool(headers):
# counter-intuitive but preserves prior behavior
headers = None
# disable eager response parsing - responses might be huge thus skewing results
es.return_raw_response()

@@ -858,7 +897,7 @@
sort = "_doc"
scroll = "10s"
doc_type = params.get("type")
params = request_params
params = request_params.copy()
params["sort"] = sort
params["scroll"] = scroll
params["size"] = size
@@ -876,6 +915,7 @@
else:
r = await es.transport.perform_request("GET", "/_search/scroll",
body={"scroll_id": scroll_id, "scroll": "10s"},
params=request_params,
headers=headers)
props = parse(r, ["hits.total", "hits.total.value", "hits.total.relation", "timed_out", "took"], ["hits.hits"])
timed_out = timed_out or props.get("timed_out", False)
@@ -914,14 +954,7 @@ async def _raw_search(self, es, doc_type, index, body, params, headers=None):
path = "/".join(components)
return await es.transport.perform_request("GET", "/" + path, params=params, body=body, headers=headers)

def _default_request_params(self, params):
request_params = params.get("request-params", {})
cache = params.get("cache")
if cache is not None:
request_params["request_cache"] = str(cache).lower()
return request_params

def _headers(self, params):
def _query_headers(self, params):
# reduces overhead due to decompression of very large responses
if params.get("response-compression-enabled", True):
return None
@@ -960,8 +993,8 @@ def status(v):
except (KeyError, AttributeError):
return ClusterHealthStatus.UNKNOWN

index = params.get("index")
request_params = params.get("request-params", {})
api_kw_params = self._default_kw_params(params)
# by default, Elasticsearch will not wait and thus we treat this as success
expected_cluster_status = request_params.get("wait_for_status", str(ClusterHealthStatus.UNKNOWN))
# newer ES versions >= 5.0
@@ -972,7 +1005,7 @@ def status(v):
# either the user has defined something or we're good with any count of relocating shards.
expected_relocating_shards = int(request_params.get("wait_for_relocating_shards", sys.maxsize))

result = await es.cluster.health(index=index, params=request_params)
result = await es.cluster.health(**api_kw_params)
cluster_status = result["status"]
relocating_shards = result["relocating_shards"]

@@ -1024,9 +1057,12 @@ class CreateIndex(Runner):

async def __call__(self, es, params):
indices = mandatory(params, "indices", self)
request_params = params.get("request-params", {})
api_params = self._default_kw_params(params)
# ignore invalid entries rather than erroring
for term in ["index", "body"]:
api_params.pop(term, None)
for index, body in indices:
await es.indices.create(index=index, body=body, params=request_params)
await es.indices.create(index=index, body=body, **api_params)
return len(indices), "ops"

def __repr__(self, *args, **kwargs):
@@ -1457,19 +1493,20 @@ def __repr__(self, *args, **kwargs):

class RawRequest(Runner):
async def __call__(self, es, params):
request_params = {}
request_params, headers = self._transport_request_params(params)
if "ignore" in params:
request_params["ignore"] = params["ignore"]
request_params.update(params.get("request-params", {}))

path = mandatory(params, "path", self)
if not path.startswith("/"):
self.logger.error("RawRequest failed. Path parameter: [%s] must begin with a '/'.", path)
raise exceptions.RallyAssertionError(f"RawRequest [{path}] failed. Path parameter must begin with a '/'.")
if not bool(headers):
# counter-intuitive, but preserves prior behavior
headers = None

await es.transport.perform_request(method=params.get("method", "GET"),
url=path,
headers=params.get("headers"),
headers=headers,
body=params.get("body"),
params=request_params)

@@ -1519,16 +1556,16 @@ class CreateSnapshot(Runner):
Creates a new snapshot
"""
async def __call__(self, es, params):
request_params = params.get("request-params", {})
wait_for_completion = params.get("wait-for-completion", False)
repository = mandatory(params, "repository", repr(self))
snapshot = mandatory(params, "snapshot", repr(self))
body = mandatory(params, "body", repr(self))
# just assert, gets set in _default_kw_params
mandatory(params, "body", repr(self))
api_kwargs = self._default_kw_params(params)
await es.snapshot.create(repository=repository,
snapshot=snapshot,
body=body,
params=request_params,
wait_for_completion=wait_for_completion)
wait_for_completion=wait_for_completion,
**api_kwargs)

def __repr__(self, *args, **kwargs):
return "create-snapshot"
@@ -1587,12 +1624,11 @@ class RestoreSnapshot(Runner):
Restores a snapshot from an already registered repository
"""
async def __call__(self, es, params):
request_params = params.get("request-params", {})
api_kwargs = self._default_kw_params(params)
await es.snapshot.restore(repository=mandatory(params, "repository", repr(self)),
snapshot=mandatory(params, "snapshot", repr(self)),
body=params.get("body"),
wait_for_completion=params.get("wait-for-completion", False),
params=request_params)
**api_kwargs)

def __repr__(self, *args, **kwargs):
return "restore-snapshot"
5 changes: 5 additions & 0 deletions esrally/resources/track-schema.json
@@ -398,6 +398,11 @@
"type": "string",
"description": "Type of this operation."
},
"request-timeout": {
"type": "number",
"minimum": 0,
"description": "Client-side per-request timeout in seconds for this operation."
},
"bulk-size": {
"type": "integer",
"minimum": 1,