Skip to content

Commit

Permalink
Add: Draft new implementation for GMP protocol classes base on the co…
Browse files Browse the repository at this point in the history
…re module

Use the core module to re-implement the current GMP protocol classes.
  • Loading branch information
bjoernricks authored and greenbonebot committed Jun 14, 2024
1 parent a65c046 commit d00f702
Show file tree
Hide file tree
Showing 4 changed files with 238 additions and 31 deletions.
12 changes: 12 additions & 0 deletions gvm/protocols/gmp/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# SPDX-FileCopyrightText: 2024 Greenbone AG
#
# SPDX-License-Identifier: GPL-3.0-or-later

from ._gmp import GMP

Gmp = GMP # for backwards compatibility

__all__ = (
"GMP",
"Gmp",
)
53 changes: 22 additions & 31 deletions gvm/protocols/gmp.py → gvm/protocols/gmp/_gmp.py
Original file line number Diff line number Diff line change
@@ -1,29 +1,23 @@
# SPDX-FileCopyrightText: 2019-2024 Greenbone AG
#
# SPDX-License-Identifier: GPL-3.0-or-later
#

"""
Module for communication with gvmd
"""
from types import TracebackType
from typing import Any, Callable, Optional, Type, Union
from typing import Callable, Optional, Type, Union

from gvm.connections import GvmConnection
from gvm.errors import GvmError
from gvm.protocols.base import GvmConnection, GvmProtocol
from gvm.protocols.gmpv208 import Gmp as Gmpv208
from gvm.protocols.gmpv214 import Gmp as Gmpv214
from gvm.protocols.gmpv224 import Gmp as Gmpv224
from gvm.protocols.gmpv225 import Gmp as Gmpv225
from gvm.transforms import EtreeCheckCommandTransform
from gvm.xml import XmlCommand

SUPPORTED_GMP_VERSIONS = Union[ # pylint: disable=invalid-name
Gmpv208, Gmpv214, Gmpv224, Gmpv225
]
from .._protocol import GvmProtocol, T, str_transform
from ._gmp224 import GMPv224
from ._gmp225 import GMPv225
from .core import Response
from .core.requests import Version

SUPPORTED_GMP_VERSIONS = Union[GMPv224, GMPv225]


class Gmp(GvmProtocol):
class GMP(GvmProtocol[T]):
"""Dynamically select supported GMP protocol of the remote manager daemon.
Must be used as a `Context Manager
Expand Down Expand Up @@ -59,19 +53,18 @@ def __init__(
self,
connection: GvmConnection,
*,
transform: Optional[Callable[[str], Any]] = None,
transform: Callable[[Response], T] = str_transform, # type: ignore[assignment] # this should work with mypy 1.9.0 without an ignore
):
super().__init__(connection, transform=EtreeCheckCommandTransform())
self._gmp_transform = transform
super().__init__(connection, transform=transform)

def determine_remote_gmp_version(self) -> str:
"""Determine the supported GMP version of the remote daemon"""
self.connect()
resp = self._send_xml_command(XmlCommand("get_version"))
resp = self._send_command(Version.get_version())
self.disconnect()

version_el = resp.find("version")
if version_el is None:
version_el = resp.xml().find("version")
if version_el is None or not version_el.text:
raise GvmError(
"Invalid response from manager daemon while requesting the "
"version information."
Expand All @@ -86,21 +79,19 @@ def determine_supported_gmp(self) -> SUPPORTED_GMP_VERSIONS:
version_str = self.determine_remote_gmp_version().split(".", 1)
major_version = int(version_str[0])
minor_version = int(version_str[1])
if major_version == 20:
gmp_class = Gmpv208
elif major_version == 21 and minor_version == 4:
gmp_class = Gmpv214
elif major_version == 22 and minor_version == 4:
gmp_class = Gmpv224
# if major_version == 22 and minor_version == 4:
# gmp_class = Gmpv224
if major_version == 22 and minor_version == 4:
gmp_class = GMPv224
elif major_version == 22 and minor_version == 5:
gmp_class = Gmpv225
gmp_class = GMPv225
else:
raise GvmError(
"Remote manager daemon uses an unsupported version of GMP. "
f"The GMP version was {major_version}.{minor_version}"
)

return gmp_class(self._connection, transform=self._gmp_transform)
return gmp_class(self._connection, transform=self._transform_callable)

def __enter__(self):
self._gmp = self.determine_supported_gmp()
Expand All @@ -114,6 +105,6 @@ def __exit__(
exc_type: Optional[Type[BaseException]],
exc_value: Optional[BaseException],
traceback: Optional[TracebackType],
) -> Any:
) -> None:
self._gmp.disconnect()
self._gmp = None
149 changes: 149 additions & 0 deletions gvm/protocols/gmp/_gmp224.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
# SPDX-FileCopyrightText: 2024 Greenbone AG
#
# SPDX-License-Identifier: GPL-3.0-or-later

from typing import Optional, Union

from .._protocol import GvmProtocol, T
from .core.requests import (
Authentication,
PortList,
PortRangeType,
Version,
)


class GMPv224(GvmProtocol[T]):
_authenticated = False

def is_authenticated(self) -> bool:
"""Checks if the user is authenticated
If the user is authenticated privileged GMP commands like get_tasks
may be send to gvmd.
Returns:
bool: True if an authenticated connection to gvmd has been
established.
"""
return self._authenticated

def authenticate(self, username: str, password: str) -> T:
"""Authenticate to gvmd.
The generated authenticate command will be send to server.
Afterwards the response is read, transformed and returned.
Arguments:
username: Username
password: Password
"""
response = self._send_command(
Authentication.authenticate(username=username, password=password)
)

if response.is_success:
self._authenticated = True

return self._transform(response)

def describe_auth(self) -> T:
"""Describe authentication methods
Returns a list of all used authentication methods if such a list is
available.
Returns:
The response. See :py:meth:`send_command` for details.
"""
return self._send_and_transform_command(Authentication.describe_auth())

def modify_auth(
self, group_name: str, auth_conf_settings: dict[str, str]
) -> T:
"""Modifies an existing auth.
Arguments:
group_name: Name of the group to be modified.
auth_conf_settings: The new auth config.
"""
return self._send_and_transform_command(
Authentication.modify_auth(group_name, auth_conf_settings)
)

def get_version(self) -> T:
return self._send_and_transform_command(Version.get_version())

def clone_port_list(self, port_list_id: str) -> T:
return self._send_and_transform_command(
PortList.clone_port_list(port_list_id)
)

def create_port_list(
self, name: str, port_range: str, *, comment: Optional[str] = None
) -> T:
return self._send_and_transform_command(
PortList.create_port_list(name, port_range, comment=comment)
)

def create_port_range(
self,
port_list_id: str,
start: int,
end: int,
port_range_type: Union[str, PortRangeType],
*,
comment: Optional[str] = None,
) -> T:
return self._send_and_transform_command(
PortList.create_port_range(
port_list_id, start, end, port_range_type, comment=comment
)
)

def delete_port_list(
self, port_list_id: str, *, ultimate: bool = False
) -> T:
return self._send_and_transform_command(
PortList.delete_port_list(port_list_id, ultimate=ultimate)
)

def delete_port_range(self, port_range_id: str) -> T:
return self._send_and_transform_command(
PortList.delete_port_range(port_range_id)
)

def get_port_lists(
self,
*,
filter_string: Optional[str] = None,
filter_id: Optional[str] = None,
details: Optional[bool] = None,
targets: Optional[bool] = None,
trash: Optional[bool] = None,
) -> T:
return self._send_and_transform_command(
PortList.get_port_lists(
filter_string=filter_string,
filter_id=filter_id,
details=details,
targets=targets,
trash=trash,
)
)

def get_port_list(self, port_list_id: str) -> T:
return self._send_and_transform_command(
PortList.get_port_list(port_list_id)
)

def modify_port_list(
self,
port_list_id: str,
*,
comment: Optional[str] = None,
name: Optional[str] = None,
) -> T:
return self._send_and_transform_command(
PortList.modify_port_list(port_list_id, comment=comment, name=name)
)
55 changes: 55 additions & 0 deletions gvm/protocols/gmp/_gmp225.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
# SPDX-FileCopyrightText: 2024 Greenbone AG
#
# SPDX-License-Identifier: GPL-3.0-or-later

from typing import Optional

from .._protocol import T
from ._gmp224 import GMPv224
from .core.requests import (
ResourceNames,
ResourceType,
)


class GMPv225(GMPv224[T]):
def get_resource_names(
self,
resource_type: ResourceType,
*,
filter_string: Optional[str] = None,
) -> T:
"""Request a list of resource names and IDs
Arguments:
resource_type: Type must be either ALERT, CERT_BUND_ADV,
CONFIG, CPE, CREDENTIAL, CVE, DFN_CERT_ADV, FILTER,
GROUP, HOST, NOTE, NVT, OS, OVERRIDE, PERMISSION,
PORT_LIST, REPORT_FORMAT, REPORT, RESULT, ROLE,
SCANNER, SCHEDULE, TARGET, TASK, TLS_CERTIFICATE
or USER
filter_string: Filter term to use for the query
"""
return self._send_and_transform_command(
ResourceNames.get_resource_names(
resource_type, filter_string=filter_string
)
)

def get_resource_name(
self, resource_id: str, resource_type: ResourceType
) -> T:
"""Request a single resource name
Arguments:
resource_id: ID of an existing resource
resource_type: Type must be either ALERT, CERT_BUND_ADV,
CONFIG, CPE, CREDENTIAL, CVE, DFN_CERT_ADV, FILTER,
GROUP, HOST, NOTE, NVT, OS, OVERRIDE, PERMISSION,
PORT_LIST, REPORT_FORMAT, REPORT, RESULT, ROLE,
SCANNER, SCHEDULE, TARGET, TASK, TLS_CERTIFICATE
or USER
"""
return self._send_and_transform_command(
ResourceNames.get_resource_name(resource_id, resource_type)
)

0 comments on commit d00f702

Please sign in to comment.