Skip to content

Commit

Permalink
replace staticmethods with pure functions
Browse files Browse the repository at this point in the history
We should not use static methods as a substitute for pure function

Change-Id: Id2760291027bb0df3c276a5e271617389ff86389
  • Loading branch information
s-kipnis committed Jul 7, 2023
1 parent 7e9d7e7 commit 579f698
Showing 1 changed file with 126 additions and 127 deletions.
253 changes: 126 additions & 127 deletions cmk/active_checks/check_disk_smb.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,8 +169,8 @@ def __call__(
ip_address: str | None = None,
configfile: str | None = None,
) -> ErrorResult | SMBShare:
return self._analyse_completed_result(
self._execute_disk_usage_command(
return _analyse_completed_result(
_execute_disk_usage_command(
share=share,
hostname=hostname,
user=user,
Expand All @@ -184,132 +184,131 @@ def __call__(
share,
)

@staticmethod
def _execute_disk_usage_command(
*,
share: str,
hostname: str,
user: str,
password: str,
workgroup: str | None = None,
port: int | None = None,
ip_address: str | None = None,
configfile: str | None = None,
) -> str:
smbclient = shutil.which("smbclient")

cmd = [
smbclient or "/usr/bin/smbclient",
f"//{hostname}/{share}",
"-U",
f"{user}%{password}",
"-c",
"du",
]

if workgroup:
cmd += ["-W", workgroup]
if port:
cmd += ["-p", str(port)]
if configfile:
cmd += ["-s", configfile]
if ip_address:
cmd += ["-I", ip_address]

try:
return subprocess.run(
cmd,
capture_output=True,
encoding="utf8",
check=True,
).stdout

except subprocess.CalledProcessError as e:
return e.stderr or e.stdout

def _analyse_completed_result(
self, completed_result: str, hostname: str, share: str
) -> ErrorResult | SMBShare:
result_lines = self._cleanup_result(completed_result)

for line in result_lines:
if disk_usage := re.search(
r"\s*(\d*) blocks of size (\d*)\. (\d*) blocks available", line
):
# The line matches the regex
return self._extract_data_from_matching_line(
block_count=int(disk_usage[1]),
block_size=int(disk_usage[2]),
available_blocks=int(disk_usage[3]),
hostname=hostname,
share=share,
)

# No line matches the regex
return self._extract_data_from_not_matching_lines(result_lines, hostname, share)

@staticmethod
def _cleanup_result(completed_result: str) -> Sequence[str]:
# Remove \t and split on \n
result_lines = completed_result.replace("\t", "").splitlines()

if len(result_lines) == 1: # If there is an error, there will be only one sentence.
return result_lines
return [
line for line in result_lines[:-1] if line
] # We don't need the last line and empty lines

@staticmethod
def _extract_data_from_matching_line(
*,
block_count: int,
block_size: int,
available_blocks: int,
hostname: str,
share: str,
) -> SMBShare:
"""
>>> _SMBShareDiskUsage._extract_data_from_matching_line(block_count=100, block_size=1024, available_blocks=50, hostname="hostname", share="share")
SMBShare(mountpoint='\\\\\\\\hostname\\\\share', total_bytes=102400, available_bytes=51200)
"""

return SMBShare(
mountpoint=f"\\\\{hostname}\\{share}",
total_bytes=block_size * block_count,
available_bytes=block_size * available_blocks,
)

@staticmethod
def _extract_data_from_not_matching_lines(
result_lines: Sequence[str], hostname: str, share: str
) -> ErrorResult:
"""
>>> _SMBShareDiskUsage._extract_data_from_not_matching_lines(["session setup failed: NT_STATUS_LOGON_FAILURE"], "hostname", "share")
ErrorResult(state=2, summary='Access Denied')
>>> _SMBShareDiskUsage._extract_data_from_not_matching_lines(["do_connect: Connection to 192.168.0.11 failed (Error NT_STATUS_HOST_UNREACHABLE)"], "hostname", "share")
ErrorResult(state=2, summary='Connection to 192.168.0.11 failed')
>>> _SMBShareDiskUsage._extract_data_from_not_matching_lines(["tree connect failed: NT_STATUS_BAD_NETWORK_NAME"], "hostname", "share")
ErrorResult(state=2, summary='Invalid share name \\\\\\\\hostname\\\\share')
>>> _SMBShareDiskUsage._extract_data_from_not_matching_lines(["some other error"], "hostname", "share")
ErrorResult(state=3, summary='Result from smbclient not suitable')
"""
# Default case
state, summary = 3, "Result from smbclient not suitable"

for line in result_lines:
# access denied or logon failure
if re.search(r"(Access denied|NT_STATUS_LOGON_FAILURE|NT_STATUS_ACCESS_DENIED)", line):
return ErrorResult(2, "Access Denied")

# unknown host or connection failure
if (error := re.search(r"(Unknown host \w*|Connection.*failed)", line)) is not None:
return ErrorResult(2, error[0])

# invalid share name
if re.search(r"(You specified an invalid share name|NT_STATUS_BAD_NETWORK_NAME)", line):
return ErrorResult(2, f"Invalid share name \\\\{hostname}\\{share}")

return ErrorResult(state, summary)
def _execute_disk_usage_command(
*,
share: str,
hostname: str,
user: str,
password: str,
workgroup: str | None = None,
port: int | None = None,
ip_address: str | None = None,
configfile: str | None = None,
) -> str:
smbclient = shutil.which("smbclient")

cmd = [
smbclient or "/usr/bin/smbclient",
f"//{hostname}/{share}",
"-U",
f"{user}%{password}",
"-c",
"du",
]

if workgroup:
cmd += ["-W", workgroup]
if port:
cmd += ["-p", str(port)]
if configfile:
cmd += ["-s", configfile]
if ip_address:
cmd += ["-I", ip_address]

try:
return subprocess.run(
cmd,
capture_output=True,
encoding="utf8",
check=True,
).stdout

except subprocess.CalledProcessError as e:
return e.stderr or e.stdout


def _analyse_completed_result(
completed_result: str, hostname: str, share: str
) -> ErrorResult | SMBShare:
result_lines = _cleanup_result(completed_result)

for line in result_lines:
if disk_usage := re.search(r"\s*(\d*) blocks of size (\d*)\. (\d*) blocks available", line):
# The line matches the regex
return _extract_data_from_matching_line(
block_count=int(disk_usage[1]),
block_size=int(disk_usage[2]),
available_blocks=int(disk_usage[3]),
hostname=hostname,
share=share,
)

# No line matches the regex
return _extract_data_from_not_matching_lines(result_lines, hostname, share)


def _cleanup_result(completed_result: str) -> Sequence[str]:
# Remove \t and split on \n
result_lines = completed_result.replace("\t", "").splitlines()

if len(result_lines) == 1: # If there is an error, there will be only one sentence.
return result_lines
return [
line for line in result_lines[:-1] if line
] # We don't need the last line and empty lines


def _extract_data_from_matching_line(
*,
block_count: int,
block_size: int,
available_blocks: int,
hostname: str,
share: str,
) -> SMBShare:
"""
>>> _extract_data_from_matching_line(block_count=100, block_size=1024, available_blocks=50, hostname="hostname", share="share")
SMBShare(mountpoint='\\\\\\\\hostname\\\\share', total_bytes=102400, available_bytes=51200)
"""

return SMBShare(
mountpoint=f"\\\\{hostname}\\{share}",
total_bytes=block_size * block_count,
available_bytes=block_size * available_blocks,
)


def _extract_data_from_not_matching_lines(
result_lines: Sequence[str], hostname: str, share: str
) -> ErrorResult:
"""
>>> _extract_data_from_not_matching_lines(["session setup failed: NT_STATUS_LOGON_FAILURE"], "hostname", "share")
ErrorResult(state=2, summary='Access Denied')
>>> _extract_data_from_not_matching_lines(["do_connect: Connection to 192.168.0.11 failed (Error NT_STATUS_HOST_UNREACHABLE)"], "hostname", "share")
ErrorResult(state=2, summary='Connection to 192.168.0.11 failed')
>>> _extract_data_from_not_matching_lines(["tree connect failed: NT_STATUS_BAD_NETWORK_NAME"], "hostname", "share")
ErrorResult(state=2, summary='Invalid share name \\\\\\\\hostname\\\\share')
>>> _extract_data_from_not_matching_lines(["some other error"], "hostname", "share")
ErrorResult(state=3, summary='Result from smbclient not suitable')
"""
# Default case
state, summary = 3, "Result from smbclient not suitable"

for line in result_lines:
# access denied or logon failure
if re.search(r"(Access denied|NT_STATUS_LOGON_FAILURE|NT_STATUS_ACCESS_DENIED)", line):
return ErrorResult(2, "Access Denied")

# unknown host or connection failure
if (error := re.search(r"(Unknown host \w*|Connection.*failed)", line)) is not None:
return ErrorResult(2, error[0])

# invalid share name
if re.search(r"(You specified an invalid share name|NT_STATUS_BAD_NETWORK_NAME)", line):
return ErrorResult(2, f"Invalid share name \\\\{hostname}\\{share}")

return ErrorResult(state, summary)


def _check_smb_share(
Expand Down

0 comments on commit 579f698

Please sign in to comment.