Merge branch 'master' into rally-567
Rick Boyd committed Oct 25, 2020
2 parents e2a6289 + 33a7a70 commit fdd418f
Showing 31 changed files with 1,184 additions and 178 deletions.
4 changes: 0 additions & 4 deletions .pylintrc
@@ -171,15 +171,11 @@ disable=print-statement,
C0330,
C0415,
C4001,
C4002,
E1120,
R0904,
R0916,
W0201,
W0212,
W0221,
W0404,
W0612,
W0613,
W0621,
invalid-docstring-quote,
1 change: 1 addition & 0 deletions esrally/__init__.py
@@ -44,6 +44,7 @@
"""


# pylint: disable=C4002
SKULL = '''
uuuuuuu
uu$$$$$$$$$$$uu
2 changes: 1 addition & 1 deletion esrally/chart_generator.py
@@ -608,7 +608,7 @@ def query(environment, race_config, q):
@staticmethod
def index(environment, race_configs, title):
filters = []
for idx, race_config in enumerate(race_configs):
for race_config in race_configs:
label = index_label(race_config)
# the assumption is that we only have one bulk task
for bulk_task in race_config.bulk_tasks:
1 change: 0 additions & 1 deletion esrally/config.py
@@ -527,7 +527,6 @@ def migrate(config_file, current_version, target_version, out=print, i=input):
raise ConfigError("The config file in {} is too old. Please delete it and reconfigure Rally from scratch with {} configure."
.format(config_file.location, PROGRAM_NAME))

prompter = Prompter(i=i, o=out, assume_defaults=False)
logger.info("Upgrading configuration from version [%s] to [%s].", current_version, target_version)
# Something is really fishy. We don't want to downgrade the configuration.
if current_version >= target_version:
24 changes: 12 additions & 12 deletions esrally/driver/driver.py
@@ -209,32 +209,32 @@ def receiveMsg_ChildActorExited(self, msg, sender):
def receiveUnrecognizedMessage(self, msg, sender):
self.logger.info("Main driver received unknown message [%s] (ignoring).", str(msg))

@actor.no_retry("driver")
@actor.no_retry("driver") # pylint: disable=no-value-for-parameter
def receiveMsg_PrepareBenchmark(self, msg, sender):
self.start_sender = sender
self.coordinator = Driver(self, msg.config)
self.coordinator.prepare_benchmark(msg.track)

@actor.no_retry("driver")
@actor.no_retry("driver") # pylint: disable=no-value-for-parameter
def receiveMsg_StartBenchmark(self, msg, sender):
self.start_sender = sender
self.coordinator.start_benchmark()
self.wakeupAfter(datetime.timedelta(seconds=DriverActor.WAKEUP_INTERVAL_SECONDS))

@actor.no_retry("driver")
@actor.no_retry("driver") # pylint: disable=no-value-for-parameter
def receiveMsg_TrackPrepared(self, msg, sender):
self.transition_when_all_children_responded(sender, msg,
expected_status=None, new_status=None, transition=self.after_track_prepared)

@actor.no_retry("driver")
@actor.no_retry("driver") # pylint: disable=no-value-for-parameter
def receiveMsg_JoinPointReached(self, msg, sender):
self.coordinator.joinpoint_reached(msg.worker_id, msg.worker_timestamp, msg.task)

@actor.no_retry("driver")
@actor.no_retry("driver") # pylint: disable=no-value-for-parameter
def receiveMsg_UpdateSamples(self, msg, sender):
self.coordinator.update_samples(msg.samples)

@actor.no_retry("driver")
@actor.no_retry("driver") # pylint: disable=no-value-for-parameter
def receiveMsg_WakeupMessage(self, msg, sender):
if msg.payload == DriverActor.RESET_RELATIVE_TIME_MARKER:
self.coordinator.reset_relative_time()
@@ -313,7 +313,7 @@ def load_local_config(coordinator_config):


class TrackPreparationActor(actor.RallyActor):
@actor.no_retry("track preparator")
@actor.no_retry("track preparator") # pylint: disable=no-value-for-parameter
def receiveMsg_PrepareTrack(self, msg, sender):
# load node-specific config to have correct paths available
cfg = load_local_config(msg.config)
@@ -746,7 +746,7 @@ def calculate_worker_assignments(host_configs, client_count):
clients_per_worker[c % workers_on_this_host] += 1

# assign client ids to workers
for worker_idx, client_count_for_worker in enumerate(clients_per_worker):
for client_count_for_worker in clients_per_worker:
worker_assignment = []
assignment["workers"].append(worker_assignment)
for c in range(client_idx, client_idx + client_count_for_worker):
@@ -816,7 +816,7 @@ def __init__(self):
self.wakeup_interval = Worker.WAKEUP_INTERVAL_SECONDS
self.sample_queue_size = None

@actor.no_retry("worker")
@actor.no_retry("worker") # pylint: disable=no-value-for-parameter
def receiveMsg_StartWorker(self, msg, sender):
self.logger.info("Worker[%d] is about to start.", msg.worker_id)
self.master = sender
@@ -837,15 +837,15 @@ def receiveMsg_StartWorker(self, msg, sender):
track.load_track_plugins(self.config, runner.register_runner, scheduler.register_scheduler)
self.drive()

@actor.no_retry("worker")
@actor.no_retry("worker") # pylint: disable=no-value-for-parameter
def receiveMsg_Drive(self, msg, sender):
sleep_time = datetime.timedelta(seconds=msg.client_start_timestamp - time.perf_counter())
self.logger.info("Worker[%d] is continuing its work at task index [%d] on [%f], that is in [%s].",
self.worker_id, self.current_task_index, msg.client_start_timestamp, sleep_time)
self.start_driving = True
self.wakeupAfter(sleep_time)

@actor.no_retry("worker")
@actor.no_retry("worker") # pylint: disable=no-value-for-parameter
def receiveMsg_CompleteCurrentTask(self, msg, sender):
# finish now ASAP. Remaining samples will be sent with the next WakeupMessage. We will also need to skip to the next
# JoinPoint. But if we are already at a JoinPoint at the moment, there is nothing to do.
@@ -857,7 +857,7 @@ def receiveMsg_CompleteCurrentTask(self, msg, sender):
str(self.worker_id), self.current_task_index)
self.complete.set()

@actor.no_retry("worker")
@actor.no_retry("worker") # pylint: disable=no-value-for-parameter
def receiveMsg_WakeupMessage(self, msg, sender):
# it would be better if we could send ourselves a message at a specific time, simulate this with a boolean...
if self.start_driving:
51 changes: 48 additions & 3 deletions esrally/driver/runner.py
@@ -50,6 +50,8 @@ def register_default_runners():
register_runner(track.OperationType.Refresh.name, Retry(Refresh()), async_runner=True)
register_runner(track.OperationType.CreateIndex.name, Retry(CreateIndex()), async_runner=True)
register_runner(track.OperationType.DeleteIndex.name, Retry(DeleteIndex()), async_runner=True)
register_runner(track.OperationType.CreateDataStream.name, Retry(CreateDataStream()), async_runner=True)
register_runner(track.OperationType.DeleteDataStream.name, Retry(DeleteDataStream()), async_runner=True)
register_runner(track.OperationType.CreateIndexTemplate.name, Retry(CreateIndexTemplate()), async_runner=True)
register_runner(track.OperationType.DeleteIndexTemplate.name, Retry(DeleteIndexTemplate()), async_runner=True)
register_runner(track.OperationType.ShrinkIndex.name, Retry(ShrinkIndex()), async_runner=True)
@@ -127,7 +129,7 @@ def __init__(self, *args, **kwargs):
async def __aenter__(self):
return self

async def __call__(self, *args):
async def __call__(self, es, params):
"""
Runs the actual method that should be benchmarked.
@@ -543,7 +545,7 @@ def detailed_stats(self, params, bulk_size, response):

bulk_request_size_bytes += line_size

for idx, item in enumerate(response["items"]):
for item in response["items"]:
# there is only one (top-level) item
op, data = next(iter(item.items()))
if op not in ops:
@@ -591,7 +593,7 @@ def simple_stats(self, bulk_size, response):
if props.get("errors", False):
# Reparse fully in case of errors - this will be slower
parsed_response = json.loads(response.getvalue())
for idx, item in enumerate(parsed_response["items"]):
for item in parsed_response["items"]:
data = next(iter(item.values()))
if data["status"] > 299 or ('_shards' in data and data["_shards"]["failed"] > 0):
bulk_error_count += 1
@@ -1067,6 +1069,22 @@ def __repr__(self, *args, **kwargs):
return "create-index"


class CreateDataStream(Runner):
"""
Execute the `create data stream API <https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-create-data-stream.html>`_.
"""

async def __call__(self, es, params):
data_streams = mandatory(params, "data-streams", self)
request_params = mandatory(params, "request-params", self)
for data_stream in data_streams:
await es.indices.create_data_stream(data_stream, params=request_params)
return len(data_streams), "ops"

def __repr__(self, *args, **kwargs):
return "create-data-stream"


class DeleteIndex(Runner):
"""
Execute the `delete index API <https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-delete-index.html>`_.
@@ -1094,6 +1112,33 @@ def __repr__(self, *args, **kwargs):
return "delete-index"


class DeleteDataStream(Runner):
"""
Execute the `delete data stream API <https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-delete-data-stream.html>`_.
"""

async def __call__(self, es, params):
ops = 0

data_streams = mandatory(params, "data-streams", self)
only_if_exists = mandatory(params, "only-if-exists", self)
request_params = mandatory(params, "request-params", self)

for data_stream in data_streams:
if not only_if_exists:
await es.indices.delete_data_stream(data_stream, ignore=[404], params=request_params)
ops += 1
elif only_if_exists and await es.indices.exists(index=data_stream):
self.logger.info("Data stream [%s] already exists. Deleting it.", data_stream)
await es.indices.delete_data_stream(data_stream, params=request_params)
ops += 1

return ops, "ops"

def __repr__(self, *args, **kwargs):
return "delete-data-stream"


class CreateIndexTemplate(Runner):
"""
Execute the `PUT index template API <https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-templates.html>`_.
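A minimal, hypothetical sketch of how the two runners added above (CreateDataStream and DeleteDataStream) could be exercised in isolation. It assumes the import path esrally.driver.runner from this commit and stands in unittest.mock.AsyncMock (Python 3.8+) for the real async Elasticsearch client; the data stream names and expected counts are illustrative only and are not part of the commit.

import asyncio
from unittest import mock

from esrally.driver.runner import CreateDataStream, DeleteDataStream


async def main():
    # Stand-in for the async Elasticsearch client that runners receive as `es`.
    es = mock.AsyncMock()
    # DeleteDataStream consults es.indices.exists() when "only-if-exists" is set.
    es.indices.exists.return_value = True

    # CreateDataStream requires "data-streams" and "request-params" and
    # reports one operation per data stream created.
    ops, unit = await CreateDataStream()(es, {
        "data-streams": ["logs-rally", "metrics-rally"],  # hypothetical names
        "request-params": {},
    })
    print(ops, unit)  # expected: 2 ops

    # With "only-if-exists" set, DeleteDataStream deletes only streams the
    # exists() check confirms and counts those deletions as ops.
    ops, unit = await DeleteDataStream()(es, {
        "data-streams": ["logs-rally"],  # hypothetical name
        "only-if-exists": True,
        "request-params": {},
    })
    print(ops, unit)  # expected: 1 ops


asyncio.run(main())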
16 changes: 8 additions & 8 deletions esrally/mechanic/mechanic.py
@@ -281,7 +281,7 @@ def to_ip_port(hosts):

def extract_all_node_ips(ip_port_pairs):
all_node_ips = set()
for ip, port in ip_port_pairs:
for ip, _ in ip_port_pairs:
all_node_ips.add(ip)
return all_node_ips

@@ -342,7 +342,7 @@ def receiveMsg_PoisonMessage(self, msg, sender):
self.logger.error(failmsg)
self.send(self.race_control, actor.BenchmarkFailure(failmsg))

@actor.no_retry("mechanic")
@actor.no_retry("mechanic") # pylint: disable=no-value-for-parameter
def receiveMsg_StartEngine(self, msg, sender):
self.logger.info("Received signal from race control to start engine.")
self.race_control = sender
@@ -380,7 +380,7 @@ def receiveMsg_StartEngine(self, msg, sender):
self.status = "starting"
self.received_responses = []

@actor.no_retry("mechanic")
@actor.no_retry("mechanic") # pylint: disable=no-value-for-parameter
def receiveMsg_NodesStarted(self, msg, sender):
# Initially the addresses of the children are not
# known and there is just a None placeholder in the
@@ -392,7 +392,7 @@ def receiveMsg_NodesStarted(self, msg, sender):

self.transition_when_all_children_responded(sender, msg, "starting", "cluster_started", self.on_all_nodes_started)

@actor.no_retry("mechanic")
@actor.no_retry("mechanic") # pylint: disable=no-value-for-parameter
def receiveMsg_ResetRelativeTime(self, msg, sender):
if msg.reset_in_seconds > 0:
self.wakeupAfter(msg.reset_in_seconds, payload=MechanicActor.WAKEUP_RESET_RELATIVE_TIME)
@@ -408,7 +408,7 @@ def receiveMsg_WakeupMessage(self, msg, sender):
def receiveMsg_BenchmarkFailure(self, msg, sender):
self.send(self.race_control, msg)

@actor.no_retry("mechanic")
@actor.no_retry("mechanic") # pylint: disable=no-value-for-parameter
def receiveMsg_StopEngine(self, msg, sender):
# we might have experienced a launch error or the user has cancelled the benchmark. Hence we need to allow to stop the
# cluster from various states and we don't check here for a specific one.
@@ -417,7 +417,7 @@ def receiveMsg_StopEngine(self, msg, sender):
else:
self.send_to_children_and_transition(sender, StopNodes(), [], "cluster_stopping")

@actor.no_retry("mechanic")
@actor.no_retry("mechanic") # pylint: disable=no-value-for-parameter
def receiveMsg_NodesStopped(self, msg, sender):
self.transition_when_all_children_responded(sender, msg, "cluster_stopping", "cluster_stopped", self.on_all_nodes_stopped)

@@ -456,7 +456,7 @@ def __init__(self):
self.pending = None
self.remotes = None

@actor.no_retry("mechanic dispatcher")
@actor.no_retry("mechanic dispatcher") # pylint: disable=no-value-for-parameter
def receiveMsg_StartEngine(self, startmsg, sender):
self.start_sender = sender
self.pending = []
@@ -569,7 +569,7 @@ def receiveMsg_StartNodes(self, msg, sender):
self.logger.exception("Cannot process message [%s]", msg)
# avoid "can't pickle traceback objects"
import traceback
ex_type, ex_value, ex_traceback = sys.exc_info()
_, ex_value, _ = sys.exc_info()
self.send(getattr(msg, "reply_to", sender), actor.BenchmarkFailure(ex_value, traceback.format_exc()))

def receiveMsg_PoisonMessage(self, msg, sender):
4 changes: 2 additions & 2 deletions esrally/mechanic/provisioner.py
@@ -141,7 +141,7 @@ def delete_path(p):

def _apply_config(source_root_path, target_root_path, config_vars):
logger = logging.getLogger(__name__)
for root, dirs, files in os.walk(source_root_path):
for root, _, files in os.walk(source_root_path):
env = jinja2.Environment(loader=jinja2.FileSystemLoader(root))

relative_root = root[len(source_root_path) + 1:]
@@ -438,7 +438,7 @@ def prepare(self, binary):
mounts = {}

for car_config_path in self.car.config_paths:
for root, dirs, files in os.walk(car_config_path):
for root, _, files in os.walk(car_config_path):
env = jinja2.Environment(loader=jinja2.FileSystemLoader(root))

relative_root = root[len(car_config_path) + 1:]
4 changes: 2 additions & 2 deletions esrally/mechanic/supplier.py
@@ -40,7 +40,7 @@ def create(cfg, sources, distribution, build, car, plugins=None):
distribution_version = cfg.opts("mechanic", "distribution.version", mandatory=False)
supply_requirements = _supply_requirements(sources, distribution, build, plugins, revisions, distribution_version)
build_needed = any([build for _, _, build in supply_requirements.values()])
es_supplier_type, es_version, es_build = supply_requirements["elasticsearch"]
es_supplier_type, es_version, _ = supply_requirements["elasticsearch"]
src_config = cfg.all_opts("source")
suppliers = []

@@ -108,7 +108,7 @@ def create(cfg, sources, distribution, build, car, plugins=None):
suppliers.append(ElasticsearchDistributionSupplier(repo, es_version, distributions_root))

for plugin in plugins:
supplier_type, plugin_version, build_plugin = supply_requirements[plugin.name]
supplier_type, plugin_version, _ = supply_requirements[plugin.name]

if supplier_type == "source":
if CorePluginSourceSupplier.can_handle(plugin):
(The remaining changed files in this commit were not loaded in this view.)
