Skip to content

Commit

Permalink
Add: Add a new GvmProtocol class that will replace the one in gvm.base
Browse files Browse the repository at this point in the history
Changes compared to the gvm.base class:
* Changed send_command into a private method that expects Request
  instances
* Changed the signatures of the transform callable to expect a Response
  instance as argument instead of bytes
* Changed the new GvmProtocol class into a Generic
  • Loading branch information
bjoernricks authored and greenbonebot committed Jun 14, 2024
1 parent 94e3998 commit a65c046
Showing 1 changed file with 126 additions and 0 deletions.
126 changes: 126 additions & 0 deletions gvm/protocols/_protocol.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
# SPDX-FileCopyrightText: 2019-2024 Greenbone AG
#
# SPDX-License-Identifier: GPL-3.0-or-later

from types import TracebackType
from typing import Callable, Generic, Optional, Type

from typing_extensions import Self, TypeVar

from gvm.connections import GvmConnection

from .gmp.core import Connection, Request, Response

T = TypeVar("T", default=str)


def str_transform(response: Response) -> str:
return str(response)


class GvmProtocol(Generic[T]):
"""Base class for different GVM protocols
Attributes:
connection: Connection to use to talk with the remote daemon. See
:mod:`gvm.connections` for possible connection types.
transform: Optional transform `callable`_ to convert response data.
After each request the callable gets passed the plain response data
which can be used to check the data and/or conversion into different
representations like a xml dom.
"""

def __init__(
self,
connection: GvmConnection,
*,
transform: Callable[[Response], T] = str_transform, # type: ignore[assignment] # this should work with mypy 1.9.0 without an ignore
):
self._connection = connection
self._protocol = Connection()

self._connected = False

self._transform_callable = transform

def __enter__(self) -> Self:
self.connect()
return self

def __exit__(
self,
exc_type: Optional[Type[BaseException]],
exc_value: Optional[BaseException],
traceback: Optional[TracebackType],
) -> None:
self.disconnect()

def _read(self) -> bytes:
"""Read a command response from gvmd
Returns:
str: Response from server.
"""
return self._connection.read()

def _send(self, data: bytes) -> None:
"""Send a command to the server
Arguments:
data (str): Data to be send over the connection to the server
"""
self.connect()
self._connection.send(data)

def is_connected(self) -> bool:
"""Status of the current connection
Returns:
True if a connection to the remote server has been established.
"""
return self._connected

def connect(self) -> None:
"""Initiates a protocol connection
Normally connect is not called directly. Either it is called
automatically when sending a protocol command or when using a
`with statement`_.
.. _with statement:
https://docs.python.org/3/reference/datamodel.html#with-statement-context-managers
"""
if not self.is_connected():
self._connection.connect()
self._connected = True

def disconnect(self) -> None:
"""Disconnect the connection
Ends and closes the connection.
"""
if self.is_connected():
self._connection.disconnect()
self._connected = False

self._protocol.close()

def _transform(self, response: Response) -> T:
transform = self._transform_callable
return transform(response)

def _send_command(self, cmd: Request) -> Response:
try:
send_data = self._protocol.send(cmd)
self._send(send_data)
response: Optional[Response] = None
while not response:
received_data = self._read()
response = self._protocol.receive_data(received_data)
return response
except Exception as e:
self.disconnect()
raise e

def _send_and_transform_command(self, cmd: Request) -> T:
return self._transform(self._send_command(cmd))

0 comments on commit a65c046

Please sign in to comment.