Skip to content

Commit

Permalink
Fixed some issues in request_status
Browse files Browse the repository at this point in the history
  • Loading branch information
Igor8mr authored and jleaniz committed May 30, 2024
1 parent e1358a8 commit e40d212
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 33 deletions.
22 changes: 11 additions & 11 deletions turbinia/api/models/request_status.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,19 +58,23 @@ def get_request_data(

self.request_id = request_id

if not summary:
self.tasks = tasks if tasks else _state_manager.get_task_data(
instance=turbinia_config.INSTANCE_ID, request_id=request_id)

# Gets the information from the request if it is stored in Redis
if _state_manager.key_exists(f'TurbiniaRequest:{request_id}'):
saved_request = _state_manager.get_request_data(request_id)
self.evidence_name = saved_request.get('original_evidence').get('name')
self.evidence_id = saved_request.get('original_evidence').get('id')
self.requester = saved_request.get('requester')
self.reason = saved_request.get('reason')
self.task_status = saved_request.get('status')
self.task_last_update = saved_request.get('last_update')
self.sucessful_tasks = len(saved_request.get('succesful_tasks'))
self.failed_tasks = len(saved_request.get('failed_tasks'))
self.queued_tasks = len(saved_request.get('queued_tasks'))
self.running_tasks = len(saved_request.get('running_tasks'))
self.status = saved_request.get('status')
self.last_task_update_time = saved_request.get('last_update')
self.successful_tasks = len(saved_request.get('succesful_tasks', []))
self.failed_tasks = len(saved_request.get('failed_tasks', []))
self.queued_tasks = len(saved_request.get('queued_tasks', []))
self.running_tasks = len(saved_request.get('running_tasks', []))
# If the request is not stored in redis, uses legacy get_request_data
else:
if not tasks:
Expand Down Expand Up @@ -194,17 +198,13 @@ class RequestsSummary(BaseModel):
"""Represents a summary view of multiple Turbinia requests."""
requests_status: List[RequestStatus] = []

#Todo(igormr): Change this to iterate over requests only

def get_requests_summary(self) -> bool:
"""Generates a status summary for each Turbinia request."""
_state_manager = state_manager.get_state_manager()
request_ids = [request_key.split(':')[1] for request_key in _state_manager.iterate_keys('Requests')]
request_ids = set()
request_ids = [request_key.split(':')[1] for request_key in _state_manager.iterate_keys('Request')]

for request_id in request_ids:
request_status = RequestStatus()
request_status.get_request_data(request_id, summary=True)
self.requests_status.append(request_status)

return bool(self.requests_status)
80 changes: 58 additions & 22 deletions turbinia/state_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -296,24 +296,36 @@ def update_request_task(self, task):
request_key = ':'.join(('TurbiniaRequest', task.request_id))
self.add_to_list(request_key, 'task_ids', task.id)
request_last_update = datetime.strptime(self.get_attribute(
request_key, 'last_update', decode_json=False).decode(), DATETIME_FORMAT)
last_update = max(request_last_update, task.last_update).strftime(
DATETIME_FORMAT)
request_key, 'last_update'), DATETIME_FORMAT)
try:
last_update = json.dumps(max(request_last_update, task.last_update).strftime(
DATETIME_FORMAT))
except redis.RedisError as exception:
error_message = f'Error encoding key {request_key} in Redis'
log.error(f'{error_message}: {exception}')
raise TurbiniaException(error_message) from exception
self.set_attribute(request_key, 'last_update',last_update)
statuses_to_remove = ['succesful_tasks', 'failed_tasks','running_tasks', 'queued_tasks']
# 'successful' could be None or False, which means different things.
# If False, the task has failed, If None, could be queued or running.
if hasattr(task, 'succesful'):
if task.successful:
self.add_to_list(request_key, 'succesful_tasks', task.id)
statuses_to_remove.remove('succesful_tasks')
if task.successful is False:
self.add_to_list(request_key, 'failed_tasks', task.id)
statuses_to_remove.remove('failed_tasks')
elif task.successful is None:
if task.status:
if 'running' in task.status:
self.add_to_list(request_key, 'running_tasks', task.id)
statuses_to_remove.remove('running_tasks')
else:
# 'successful' is None and 'status' is None
self.add_to_list(request_key, 'running_tasks', task.id)
self.add_to_list(request_key, 'queued_tasks', task.id)
statuses_to_remove.remove('queued_tasks')
for status_name in statuses_to_remove:
self.remove_from_list(request_key, status_name, task.id)

def write_new_task(self, task):
"""Writes task into redis.
Expand Down Expand Up @@ -344,6 +356,7 @@ def update_task(self, task):
if not task_key:
self.write_new_task(task)
return
self.update_request_task(task)
stored_task_dict = self.get_task(task_key)
stored_evidence_size = stored_task_dict.get('evidence_size')
stored_evidence_id = stored_task_dict.get('evidence_id')
Expand All @@ -354,7 +367,6 @@ def update_task(self, task):
log.info(f'Updating task {task.name:s} in Redis')
task_dict = self.format_task(task)
self.write_hash_object(task_key, task_dict)

return task_key

def set_attribute(
Expand Down Expand Up @@ -462,13 +474,13 @@ def iterate_attributes(self, key: str) -> Iterator[tuple]:
error_message = f'Error getting attributes from {key} in Redis'
log.error(f'{error_message}: {exception}')
raise TurbiniaException(error_message) from exception
try:
for attribute in attributes:
for attribute in attributes:
try:
yield (attribute[0].decode(), json.loads(attribute[1]))
except (TypeError, ValueError) as exception:
error_message = f'Error decoding attribute in {key} in Redis'
log.error(f'{error_message}: {exception}')
raise TurbiniaException(error_message) from exception
except (TypeError, ValueError) as exception:
error_message = f'Error decoding {attribute} in {key} in Redis'
log.error(f'{error_message}: {exception}')
raise TurbiniaException(error_message) from exception

def key_exists(self, redis_key) -> bool:
"""Checks if the key is saved in Redis.
Expand Down Expand Up @@ -538,23 +550,41 @@ def add_to_list(self, redis_key, list_name, new_item, allow_repeated=False):
list_name (str): Name of the list attribute.
new_item (Any): Item to be saved.
repeated (bool): Allows repeated items to be saved.
Returns:
redis_key (str): The key corresponding to the object in Redis
"""
if not self.attribute_exists(redis_key, list_name):
list_attribute = [new_item]
else:
list_attribute = self.get_attribute(redis_key, list_name)
if new_item not in list_attribute and not allow_repeated:
list_attribute.append(new_item)
try:
list_attribute = self.get_attribute(redis_key, list_name)
if new_item not in list_attribute and not allow_repeated:
list_attribute.append(new_item)
self.set_attribute(redis_key, list_name, json.dumps(list_attribute))
except (TypeError, ValueError) as exception:
error_message = (
f'Error encoding list {list_attribute} from {redis_key} in Redis')
log.error(f'{error_message}: {exception}')
raise TurbiniaException(error_message) from exception
self.set_attribute(redis_key, list_name, json.dumps(list_attribute))

def remove_from_list(self, redis_key, list_name, item):
"""Removes an item from a list attribute in a hashed Redis object.
Args:
redis_key (str): Key of the Redis object.
list_name (str): Name of the list attribute.
item (Any): Item to be removed.
"""
if not self.attribute_exists(redis_key, list_name):
return
list_attribute = self.get_attribute(redis_key, list_name)
if item in list_attribute:
list_attribute.remove(item)
try:
self.set_attribute(redis_key, list_name, json.dumps(list_attribute))
except (TypeError, ValueError) as exception:
error_message = (
f'Error encoding list {list_attribute} from {redis_key} in Redis')
log.error(f'{error_message}: {exception}')
raise TurbiniaException(error_message) from exception

def write_hash_object(self, redis_key, object_dict):
"""Writes new hash object into redis. To save storage, the function does not
Expand Down Expand Up @@ -732,10 +762,16 @@ def write_request(self, request_dict: dict, update=False):
error_message = 'Error deserializing request attribute.'
log.error(f'{error_message}: {exception}')
raise TurbiniaException(error_message) from exception
if not request_dict.get('last_update'):
request_dict['start_time'] = datetime.now().strftime(DATETIME_FORMAT)
if not request_dict.get('last_update'):
request_dict['last_update'] = datetime.now().strftime(DATETIME_FORMAT)
try:
if not request_dict.get('last_update'):
request_dict['start_time'] = json.dumps(datetime.now().strftime(DATETIME_FORMAT))
if not request_dict.get('last_update'):
request_dict['last_update'] = json.dumps(datetime.now().strftime(DATETIME_FORMAT))
request_dict['status'] = json.dumps(f'Task scheduled at {datetime.now().strftime(DATETIME_FORMAT)}')
except redis.RedisError as exception:
error_message = f'Error encoding key {request_key} in Redis'
log.error(f'{error_message}: {exception}')
raise TurbiniaException(error_message) from exception
# Either updates or write new key
if update == self.key_exists(request_key):
self.write_hash_object(request_key, request_dict)
Expand Down

0 comments on commit e40d212

Please sign in to comment.