Skip to content

Commit

Permalink
Add type hints
Browse files Browse the repository at this point in the history
  • Loading branch information
SkypLabs committed Aug 13, 2023
1 parent 1475ad2 commit 27b1992
Show file tree
Hide file tree
Showing 2 changed files with 142 additions and 142 deletions.
176 changes: 91 additions & 85 deletions hdlcontroller/hdlcontroller.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from queue import Full, Queue
from threading import Event, Lock, Thread
from time import sleep, time
from typing import Callable, Dict, NewType, Union

from yahdlc import (
FRAME_ACK,
Expand All @@ -12,52 +13,56 @@
get_data,
)

SequenceNumber = NewType("SequenceNumber", int)
Timeout = NewType("Timeout", float)

ReadFunction = Callable[[], bytes]
WriteFunction = Callable[[bytes], Union[int, None]]

Callback = Callable[[bytes], None]


class HDLController:
"""
An HDLC controller based on python4yahdlc.
"""

# pylint: disable=too-many-instance-attributes

MAX_SEQ_NO = 8
MIN_SENDING_TIMEOUT = 0.5

def __init__(
self,
read_func,
write_func,
sending_timeout=2,
window=3,
frames_queue_size=0,
fcs_nack=True,
read_func: ReadFunction,
write_func: WriteFunction,
sending_timeout: Timeout = Timeout(2.0),
window: int = 3,
frames_queue_size: int = 0,
fcs_nack: bool = True,
):
# pylint: disable=too-many-arguments
if not callable(read_func):
raise TypeError("'read_func' is not callable")

if not hasattr(read_func, "__call__"):
raise TypeError("The read function parameter is not a callable object")
if not callable(write_func):
raise TypeError("'write_func' is not callable")

if not hasattr(write_func, "__call__"):
raise TypeError("The write function parameter is not a callable object")
self.read: ReadFunction = read_func
self.write: WriteFunction = write_func

self.read = read_func
self.write = write_func
self.window: int = window
self.fcs_nack: bool = fcs_nack
self.senders: Dict[SequenceNumber, HDLController.Sender] = {}
self.send_lock: Lock = Lock()
self.new_seq_no: SequenceNumber = SequenceNumber(0)

self.window = window
self.fcs_nack = fcs_nack
self.senders = {}
self.send_lock = Lock()
self.new_seq_no = 0

self.send_callback = None
self.receive_callback = None
self.send_callback: Union[Callback, None] = None
self.receive_callback: Union[Callback, None] = None

self.set_sending_timeout(sending_timeout)

self.receiver = None
self.frames_received = Queue(maxsize=frames_queue_size)
self.receiver: Union[HDLController.Receiver, None] = None
self.frames_received: Queue = Queue(maxsize=frames_queue_size)

def start(self):
def start(self) -> None:
"""
Starts HDLC controller's threads.
"""
Expand All @@ -74,7 +79,7 @@ def start(self):

self.receiver.start()

def stop(self):
def stop(self) -> None:
"""
Stops HDLC controller's threads.
"""
Expand All @@ -85,7 +90,7 @@ def stop(self):
for sender in self.senders.values():
sender.join()

def set_send_callback(self, callback):
def set_send_callback(self, callback: Callback) -> None:
"""
Sets the send callback function.
Expand All @@ -94,39 +99,39 @@ def set_send_callback(self, callback):
sent.
"""

if not hasattr(callback, "__call__"):
raise TypeError("The callback function parameter is not a callable object")
if not callable(callback):
raise TypeError("'callback' is not callable")

self.send_callback = callback

def set_receive_callback(self, callback):
def set_receive_callback(self, callback: Callback) -> None:
"""
Sets the receive callback function.
This method has to be called before starting the HDLC controller.
"""

if not hasattr(callback, "__call__"):
raise TypeError("The callback function parameter is not a callable object")
if not callable(callback):
raise TypeError("'callback' is not callable")

self.receive_callback = callback

def set_sending_timeout(self, sending_timeout):
def set_sending_timeout(self, sending_timeout: Timeout) -> None:
"""
Sets the sending timeout.
"""

if sending_timeout >= HDLController.MIN_SENDING_TIMEOUT:
self.sending_timeout = sending_timeout

def get_senders_number(self):
def get_senders_number(self) -> int:
"""
Returns the number of active senders.
"""

return len(self.senders)

def send(self, data):
def send(self, data: bytes) -> None:
"""
Sends a new data frame.
Expand All @@ -147,9 +152,11 @@ def send(self, data):
)

self.senders[self.new_seq_no].start()
self.new_seq_no = (self.new_seq_no + 1) % HDLController.MAX_SEQ_NO
self.new_seq_no = SequenceNumber(
(self.new_seq_no + 1) % HDLController.MAX_SEQ_NO
)

def get_data(self):
def get_data(self) -> bytes:
"""
Gets the next frame received.
Expand All @@ -164,34 +171,38 @@ class Sender(Thread):
"""

def __init__(
self, write_func, send_lock, data, seq_no, timeout=2, callback=None
self,
write_func: WriteFunction,
send_lock: Lock,
data: bytes,
seq_no: SequenceNumber,
timeout: Timeout = Timeout(2.0),
callback: Union[Callback, None] = None,
):
# pylint: disable=too-many-arguments

super().__init__()
self.write = write_func
self.send_lock = send_lock
self.data = data
self.seq_no = seq_no
self.timeout = timeout
self.callback = callback

self.stop_sender = Event()
self.stop_timeout = Event()
self.next_timeout = 0

def run(self):
self.write: WriteFunction = write_func
self.send_lock: Lock = send_lock
self.data: bytes = data
self.seq_no: SequenceNumber = seq_no
self.timeout: Timeout = timeout
self.callback: Union[Callback, None] = callback

self.stop_sender: Event = Event()
self.stop_timeout: Event = Event()
self.next_timeout: Timeout = Timeout(0.0)

def run(self) -> None:
while not self.stop_sender.isSet():
self.stop_timeout.wait(max(0, self.next_timeout - time()))
self.stop_timeout.clear()

if not self.stop_sender.isSet():
self.next_timeout = time() + self.timeout
self.next_timeout = Timeout(time() + self.timeout)

with self.send_lock:
self.__send_data()

def join(self, timeout=None):
def join(self, timeout: Union[Timeout, None] = None) -> None:
"""
Stops the current thread.
"""
Expand All @@ -200,23 +211,23 @@ def join(self, timeout=None):
self.stop_timeout.set()
super().join(timeout)

def ack_received(self):
def ack_received(self) -> None:
"""
Informs the sender that the related ACK frame has been received.
As a consequence, the current thread is being stopped.
"""

self.join()

def nack_received(self):
def nack_received(self) -> None:
"""
Informs the sender that an NACK frame has been received. As a
Informs the sender that an NACK frame has been received. As a
consequence, the data frame is being resent.
"""

self.stop_timeout.set()

def __send_data(self):
def __send_data(self) -> None:
"""
Sends a new data frame.
"""
Expand All @@ -233,30 +244,26 @@ class Receiver(Thread):

def __init__(
self,
read_func,
write_func,
send_lock,
senders_list,
frames_received,
callback=None,
fcs_nack=True,
read_func: ReadFunction,
write_func: WriteFunction,
send_lock: Lock,
senders_list: Dict[SequenceNumber, "HDLController.Sender"],
frames_received: Queue,
callback: Union[Callback, None] = None,
fcs_nack: bool = True,
):
# pylint: disable=too-many-arguments

super().__init__()
self.read = read_func
self.write = write_func
self.send_lock = send_lock
self.senders = senders_list
self.frames_received = frames_received
self.callback = callback
self.fcs_nack = fcs_nack
self.read: ReadFunction = read_func
self.write: WriteFunction = write_func
self.send_lock: Lock = send_lock
self.senders: Dict[SequenceNumber, "HDLController.Sender"] = senders_list
self.frames_received: Queue = frames_received
self.callback: Union[Callback, None] = callback
self.fcs_nack: bool = fcs_nack

self.stop_receiver = Event()
self.stop_receiver: Event = Event()

def run(self):
# pylint: disable=too-many-branches

while not self.stop_receiver.isSet():
try:
data, ftype, seq_no = get_data(self.read())
Expand All @@ -283,12 +290,11 @@ def run(self):
# Drops bad (N)ACKs.
pass
except Full:
# Drops new data frames when the receive queue
# is full.
# Drops new data frames when the receive queue is full.
pass
except FCSError as err:
# Sends back an NACK if a corrupted frame is received
# and if the FCS NACK option is enabled.
# Sends back an NACK if a corrupted frame is received and
# if the FCS NACK option is enabled.
if self.fcs_nack:
with self.send_lock:
self.__send_nack(err.args[0])
Expand All @@ -300,22 +306,22 @@ def run(self):
# 200 µs.
sleep(200 / 1000000.0)

def join(self, timeout=None):
def join(self, timeout: Union[Timeout, None] = None):
"""
Stops the current thread.
"""

self.stop_receiver.set()
super().join(timeout)

def __send_ack(self, seq_no):
def __send_ack(self, seq_no: SequenceNumber):
"""
Sends a new ACK frame.
"""

self.write(frame_data("", FRAME_ACK, seq_no))

def __send_nack(self, seq_no):
def __send_nack(self, seq_no: SequenceNumber):
"""
Sends a new NACK frame.
"""
Expand Down
Loading

0 comments on commit 27b1992

Please sign in to comment.