Skip to content

Commit

Permalink
Move abort detection into test invocation (#3040)
Browse files Browse the repository at this point in the history
Similar to reboot detection, hiding the details from individual
execution plugins.
  • Loading branch information
happz authored and kkaarreell committed Aug 6, 2024
1 parent 6f9636f commit 812227b
Showing 1 changed file with 97 additions and 88 deletions.
185 changes: 97 additions & 88 deletions tmt/steps/report/reportportal.py
Original file line number Diff line number Diff line change
Expand Up @@ -480,110 +480,119 @@ def go(self, *, logger: Optional[tmt.log.Logger] = None) -> None:
# For each test
for test in self.step.plan.discover.tests():
test_time = self.time()
results = [None]
if executed:
result = next((result for result in self.step.plan.execute.results()
if test.serial_number == result.serial_number), None)
results = [result for result in self.step.plan.execute.results()
if test.serial_number == result.serial_number]
for result in results:
# TODO: for happz, connect Test to Result if possible
# (but now it is probably too hackish to be fixed)

item_attributes = attributes.copy()

if result:
test_time = result.start_time or self.time()
# TODO: for happz, connect Test to Result if possible
# (but now it is probably too hackish to be fixed)

item_attributes = attributes.copy()
if test.contact:
item_attributes.append({"key": "contact", "value": test.contact[0]})
env_vars = [
{'key': key, 'value': value}
for key, value in test.environment.items()
if not re.search(envar_pattern, key)]

if create_test:

test_description = test.summary or ""
if ((self.data.upload_to_launch and launch_per_plan)
or self.data.upload_to_suite):
test_description = self.append_description(test_description)

# Create a test item
self.info("test", test.name, color="cyan")
response = session.post(
url=f"{self.get_url()}/item{f'/{suite_uuid}' if suite_uuid else ''}",
headers=self.get_headers(),
json={"name": test.name,
"description": test_description,
"attributes": item_attributes,
"parameters": env_vars,
"codeRef": test.web_link() or None,
"launchUuid": launch_uuid,
"type": "step",
"testCaseId": test.id or None,
"startTime": test_time})
self.handle_response(response)
item_uuid = yaml_to_dict(response.text).get("id")
assert item_uuid is not None
self.verbose("uuid", item_uuid, "yellow", shift=1)
self.data.test_uuids[test.serial_number] = item_uuid
else:
item_uuid = self.data.test_uuids[test.serial_number]

# Support for idle tests
status = "SKIPPED"
if executed and result:
# For each log
for index, log_path in enumerate(result.log):
try:
log = self.step.plan.execute.read(log_path)
except tmt.utils.FileError:
continue

level = "INFO" if log_path == result.log[0] else "TRACE"
status = self.TMT_TO_RP_RESULT_STATUS[result.result]

# Upload log
# for multi-host tests store also provision name and role
if result.guest.name != 'default-0':
item_attributes.append({'key': 'guest_name', 'value': result.guest.name})
if result.guest.role:
item_attributes.append({'key': 'guest_role', 'value': result.guest.role})

if test.contact:
item_attributes.append({"key": "contact", "value": test.contact[0]})
env_vars = [
{'key': key, 'value': value}
for key, value in test.environment.items()
if not re.search(envar_pattern, key)]

if create_test:

test_description = test.summary or ""
if ((self.data.upload_to_launch and launch_per_plan)
or self.data.upload_to_suite):
test_description = self.append_description(test_description)

# Create a test item
self.info("test", test.name, color="cyan")
response = session.post(
url=f"{self.get_url()}/log/entry",
url=f"{self.get_url()}/item{f'/{suite_uuid}' if suite_uuid else ''}",
headers=self.get_headers(),
json={"message": _filter_invalid_chars(log),
"itemUuid": item_uuid,
json={"name": test.name,
"description": test_description,
"attributes": item_attributes,
"parameters": env_vars,
"codeRef": test.web_link() or None,
"launchUuid": launch_uuid,
"level": level,
"time": result.end_time})
"type": "step",
"testCaseId": test.id or None,
"startTime": test_time})
self.handle_response(response)

# Write out failures
if index == 0 and status == "FAILED":
message = _filter_invalid_chars(result.failures(log))
item_uuid = yaml_to_dict(response.text).get("id")
assert item_uuid is not None
self.verbose("uuid", item_uuid, "yellow", shift=1)
self.data.test_uuids[test.serial_number] = item_uuid
else:
item_uuid = self.data.test_uuids[test.serial_number]

# Support for idle tests
status = "SKIPPED"
if executed and result:
# For each log
for index, log_path in enumerate(result.log):
try:
log = self.step.plan.execute.read(log_path)
except tmt.utils.FileError:
continue

level = "INFO" if log_path == result.log[0] else "TRACE"
status = self.TMT_TO_RP_RESULT_STATUS[result.result]

# Upload log
response = session.post(
url=f"{self.get_url()}/log/entry",
headers=self.get_headers(),
json={"message": message,
json={"message": _filter_invalid_chars(log),
"itemUuid": item_uuid,
"launchUuid": launch_uuid,
"level": "ERROR",
"level": level,
"time": result.end_time})
self.handle_response(response)

# TODO: Add tmt files as attachments

test_time = result.end_time or self.time()

# Finish the test item
response = session.put(
url=f"{self.get_url()}/item/{item_uuid}",
headers=self.get_headers(),
json={
"launchUuid": launch_uuid,
"endTime": test_time,
"status": status,
"issue": {
"issueType": self.get_defect_type_locator(session, defect_type)}})
self.handle_response(response)
launch_time = test_time
# Write out failures
if index == 0 and status == "FAILED":
message = _filter_invalid_chars(result.failures(log))
response = session.post(
url=f"{self.get_url()}/log/entry",
headers=self.get_headers(),
json={"message": message,
"itemUuid": item_uuid,
"launchUuid": launch_uuid,
"level": "ERROR",
"time": result.end_time})
self.handle_response(response)

# TODO: Add tmt files as attachments

test_time = result.end_time or self.time()

# Finish the test item
response = session.put(
url=f"{self.get_url()}/item/{item_uuid}",
headers=self.get_headers(),
json={
"launchUuid": launch_uuid,
"endTime": test_time,
"status": status,
"issue": {
"issueType": self.get_defect_type_locator(session, defect_type)}})
self.handle_response(response)
launch_time = test_time

# TODO: Resolve the problem with reporting original defect type (idle)
# after additional report of results
# Temporary solution idea:
# if again_additional_tests and status failed,
# get test_id, report passed and then again failed
# TODO: Resolve the problem with reporting original defect type (idle)
# after additional report of results
# Temporary solution idea:
# if again_additional_tests and status failed,
# get test_id, report passed and then again failed

if create_suite:
# Finish the test suite
Expand Down

0 comments on commit 812227b

Please sign in to comment.