Skip to content

Commit

Permalink
feat(#3338): Add schema for ALB table
Browse files Browse the repository at this point in the history
  • Loading branch information
nico-stefani committed Jan 9, 2023
1 parent f5eef7a commit 0fdddf3
Showing 1 changed file with 19 additions and 4 deletions.
23 changes: 19 additions & 4 deletions deps/wazuh_testing/wazuh_testing/modules/aws/db_utils.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,29 @@
import sqlite3
from collections import namedtuple
from pathlib import Path
from typing import Iterator
from typing import Iterator, Type

from .constants import S3_CLOUDTRAIL_DB_PATH
from .constants import S3_CLOUDTRAIL_DB_PATH, CLOUD_TRAIL_TYPE, ALB_TYPE

SELECT_QUERY_TEMPLATE = 'SELECT * FROM {table_name}'

S3CloudTrailRow = namedtuple(
'S3CloudTrailRow', 'bucket_path aws_account_id aws_region log_key processed_date created_date'
)

S3ALBFlowRow = namedtuple(
'S3ALBFlowRow', 'bucket_path aws_account_id log_key processed_date created_date'
)

s3_rows_map = {
CLOUD_TRAIL_TYPE: S3CloudTrailRow,
ALB_TYPE: S3ALBFlowRow
}


def _get_s3_row_type(bucket_type: str) -> Type[S3CloudTrailRow]:
return s3_rows_map.get(bucket_type, S3CloudTrailRow)


def get_db_connection(path: Path) -> sqlite3.Connection:
return sqlite3.connect(path)
Expand Down Expand Up @@ -45,8 +58,9 @@ def get_s3_db_row(table_name: str) -> S3CloudTrailRow:
connection = get_db_connection(S3_CLOUDTRAIL_DB_PATH)
cursor = connection.cursor()
result = cursor.execute(SELECT_QUERY_TEMPLATE.format(table_name=table_name)).fetchone()
row_type = _get_s3_row_type(table_name)

return S3CloudTrailRow(*result)
return row_type(*result)


def get_multiple_s3_db_row(table_name: str) -> Iterator[S3CloudTrailRow]:
Expand All @@ -60,9 +74,10 @@ def get_multiple_s3_db_row(table_name: str) -> Iterator[S3CloudTrailRow]:
"""
connection = get_db_connection(S3_CLOUDTRAIL_DB_PATH)
cursor = connection.cursor()
row_type = _get_s3_row_type(table_name)

for row in cursor.execute(SELECT_QUERY_TEMPLATE.format(table_name=table_name)):
yield S3CloudTrailRow(*row)
yield row_type(*row)


def table_exists(table_name: str) -> bool:
Expand Down

0 comments on commit 0fdddf3

Please sign in to comment.