Skip to content

Commit

Permalink
Start adding tests for orca fc (#16)
Browse files Browse the repository at this point in the history
* avoid error when running tests that don't use the tmpdir

* add option to return hexdump output as list of strings

* add tests of orca_packet.py

* add ability to load packets by index

* add start of tests for orca fc decoding

* check packet counting function

* keep packet_id aligned with loc

* switch to new file and add some more tests

* fix last-packet check

* start to add status packet tests

* eliminate unnecessary build_raw warnings (do late eval)

* add missing include

* style: pre-commit fixes

* fixes for flake8 and dox

---------

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
  • Loading branch information
jasondet and pre-commit-ci[bot] authored Oct 23, 2023
1 parent a564e3f commit 18eb650
Show file tree
Hide file tree
Showing 6 changed files with 223 additions and 25 deletions.
21 changes: 14 additions & 7 deletions src/daq2lh5/orca/orca_packet.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,9 @@ def hex_dump(
as_short: bool = False,
id_dict: dict = None,
use_logging: bool = True,
return_output=False,
) -> None:
dump_cmd = print # noqa: T202
if use_logging:
dump_cmd = log.debug

output = []
data_id = get_data_id(packet, shift=shift_data_id)
n_words = get_n_words(packet)
if id_dict is not None:
Expand All @@ -62,9 +60,9 @@ def hex_dump(
else:
heading = f"data ID = {data_id}"
if print_n_words:
dump_cmd(f"{heading}: {n_words} words")
output.append(f"{heading}: {n_words} words")
else:
dump_cmd(f"{heading}:")
output.append(f"{heading}:")
n_to_print = int(np.minimum(n_words, max_words))
pad = int(np.ceil(np.log10(n_to_print)))
for i in range(n_to_print):
Expand All @@ -76,4 +74,13 @@ def hex_dump(
line += f" {packet[i]}"
if as_short:
line += f" {np.frombuffer(packet[i:i+1].tobytes(), dtype='uint16')}"
dump_cmd(line)
output.append(line)

dump_cmd = print # noqa: T202
if use_logging:
dump_cmd = log.debug
for line in output:
dump_cmd(line)

if return_output:
return output
140 changes: 126 additions & 14 deletions src/daq2lh5/orca/orca_streamer.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,30 +32,138 @@ class OrcaStreamer(DataStreamer):
def __init__(self) -> None:
    """Create a streamer with no open stream, no header and no decoders."""
    super().__init__()

    # --- input stream state ---
    self.in_stream = None
    # file position of each packet header seen so far, indexed by packet id
    self.packet_locs = []
    # start with a 4 kB packet buffer
    self.buffer = np.empty(1024, dtype="uint32")

    # --- file header ---
    self.header = None
    self.header_decoder = OrcaHeaderDecoder()

    # --- per-data_id bookkeeping ---
    self.decoder_id_dict = {}  # dict of data_id to decoder object
    self.rbl_id_dict = {}  # dict of RawBufferLists for each data_id
    # data_ids whose decoder name has no implementation
    self.missing_decoders = []

def load_packet_header(self) -> np.uint32 | None:
"""Loads the packet header at the current read location into the buffer
and updates internal variables.
"""
pkt_hdr = self.buffer[:1]
n_bytes_read = self.in_stream.readinto(pkt_hdr) # buffer is at least 4 kB long
self.n_bytes_read += n_bytes_read
if n_bytes_read == 0: # EOF
return None
if n_bytes_read != 4:
raise RuntimeError(f"only got {n_bytes_read} bytes for packet header")

# packet is valid. Can set the packet_id and log its location
self.packet_id += 1
filepos = self.in_stream.tell() - n_bytes_read
if self.packet_id < len(self.packet_locs):
if self.packet_locs[self.packet_id] != filepos:
raise RuntimeError(
f"filepos for packet {self.packet_id} was {filepos} but {self.packet_locs[self.packet_id]} was expected"
)
else:
if len(self.packet_locs) != self.packet_id:
raise RuntimeError(
f"loaded packet {self.packet_id} after packet {len(self.packet_locs)-1}"
)
self.packet_locs.append(filepos)

return pkt_hdr

def skip_packet(self, n: int = 1) -> bool:
    """Skip one or more packets without loading them into the internal buffer.

    Only the header word of each packet is read (through
    :meth:`load_packet_header`); the rest of the packet is jumped over with
    a relative seek.

    Parameters
    ----------
    n
        number of packets to skip. Converted with ``int()`` and must not be
        negative.

    Returns
    -------
    succeeded
        ``False`` if EOF was reached, otherwise ``True``.
    """
    if self.in_stream is None:
        raise RuntimeError("self.in_stream is None")
    n_to_skip = int(n)
    if n_to_skip < 0:
        raise ValueError(f"n must be a non-negative int, can't be {n}")
    for _ in range(n_to_skip):
        header = self.load_packet_header()
        if header is None:
            return False
        # the header word has already been consumed, hence the "- 1"
        remaining_bytes = (orca_packet.get_n_words(header) - 1) * 4
        self.in_stream.seek(remaining_bytes, 1)
    return True

def build_packet_locs(self, saveloc=True) -> None:
    """Scan to the end of the file so that ``self.packet_locs`` is complete.

    Scanning resumes from the last packet already recorded in
    ``self.packet_locs`` (or from the current position if none has been
    recorded yet) and skips packets until EOF.

    Parameters
    ----------
    saveloc
        if ``True`` (default), the stream position and ``self.packet_id``
        are restored to their values on entry, also when the scan raises.
        If ``False``, the stream is left wherever the scan stopped.

    Raises
    ------
    RuntimeError
        if no input stream is open.
    """
    if self.in_stream is None:
        raise RuntimeError("self.in_stream is None")
    loc = self.in_stream.tell()
    pid = self.packet_id
    try:
        if len(self.packet_locs) > 0:
            # Re-read the last known packet: its index is len - 1, and the
            # header loader moves on to packet_id + 1, hence "- 2".
            self.in_stream.seek(self.packet_locs[-1])
            self.packet_id = len(self.packet_locs) - 2
        while self.skip_packet():
            pass  # builds the rest of the packet_locs list
    finally:
        # restore the caller's read position even if the scan failed
        if saveloc:
            self.in_stream.seek(loc)
            self.packet_id = pid

def count_packets(self, saveloc=True) -> int:
    """Return the number of packets in the file.

    The whole file is scanned with :meth:`build_packet_locs` first, so the
    count is the length of the completed ``self.packet_locs`` list.

    Parameters
    ----------
    saveloc
        forwarded to :meth:`build_packet_locs`.
    """
    self.build_packet_locs(saveloc=saveloc)
    return len(self.packet_locs)

# TODO: need to correct for endianness?
def load_packet(self, skip_unknown_ids: bool = False) -> np.uint32 | None:
def load_packet(
self, index: int = None, whence: int = 0, skip_unknown_ids: bool = False
) -> np.uint32 | None:
"""Loads the next packet into the internal buffer.
Returns packet as a :class:`numpy.uint32` view of the buffer (a slice),
returns ``None`` at EOF.
Parameters
----------
index
Optionally give an index of packet to skip to, relative to the
"whence" location. Can be positive or negative. If out-of-range for
the file, None will be returned.
whence
used when an index is supplied. Follows the file.seek() convention:
whence = 0 (default) means index is relative to the beginning of the
file; whence = 1 means index is relative to the current position in
the file; whence = 2 means relative to the end of the file.
Returns
----------
packet
a view of the internal buffer spanning the packet data (uint32
ndarray). If you want to hold on to the packet data while you load
more packets, you can call copy() on the view to make a copy.
"""
if self.in_stream is None:
raise RuntimeError("self.in_stream is None")

# read packet header
pkt_hdr = self.buffer[:1]
n_bytes_read = self.in_stream.readinto(pkt_hdr) # buffer is at least 4 kB long
self.n_bytes_read += n_bytes_read
if n_bytes_read == 0:
if index is not None:
if whence not in [0, 1, 2]:
raise ValueError(f"whence can't be {whence}")
index = int(index)
# convert whence 1 or 2 to whence = 0
if whence == 1: # index is relative to current position
index += self.packet_id - 1
elif whence == 2: # index is relative to end of file
self.build_packet_locs(saveloc=False)
index += len(self.packet_locs) - 2
if index < 0:
self.in_stream.seek(0)
self.packet_id = -1
return None
while index >= len(self.packet_locs):
if not self.skip_packet():
return None
self.in_stream.seek(self.packet_locs[index])
self.packet_id = index - 1

# load packet header
pkt_hdr = self.load_packet_header()
if pkt_hdr is None:
return None
if n_bytes_read != 4:
raise RuntimeError(f"only got {n_bytes_read} bytes for packet header")

# if it's a short packet, we are done
if orca_packet.is_short(pkt_hdr):
Expand All @@ -69,7 +177,6 @@ def load_packet(self, skip_unknown_ids: bool = False) -> np.uint32 | None:
not in self.decoder_id_dict
):
self.in_stream.seek((n_words - 1) * 4, 1)
self.n_bytes_read += (n_words - 1) * 4 # well, we didn't really read it...
return pkt_hdr

# load into buffer, resizing as necessary
Expand Down Expand Up @@ -204,15 +311,17 @@ def open_stream(
"""

self.set_in_stream(stream_name)
self.packet_id = -1

# read in the header
packet = self.load_packet()
if packet is None:
raise RuntimeError(f"no orca data in file {stream_name}")
if orca_packet.get_data_id(packet) != 0:
raise RuntimeError(
f"got data id {orca_packet.get_data_id(packet)} for header"
)

self.packet_id = 0
self.any_full |= self.header_decoder.decode_packet(packet, self.packet_id)
self.header = self.header_decoder.header

Expand Down Expand Up @@ -240,9 +349,7 @@ def open_stream(
name = id_to_dec_name_dict[data_id]
if name not in instantiated_decoders:
if name not in globals():
log.warning(
f"no implementation of {name}, corresponding packets will be skipped"
)
self.missing_decoders.append(data_id)
continue
decoder = globals()[name]
instantiated_decoders[name] = decoder(header=self.header)
Expand Down Expand Up @@ -296,13 +403,18 @@ def read_packet(self) -> bool:
packet = self.load_packet(skip_unknown_ids=True)
if packet is None:
return False
self.packet_id += 1

# look up the data id, decoder, and rbl
data_id = orca_packet.get_data_id(packet, shift=False)
log.debug(
f"packet {self.packet_id}: data_id = {data_id}, decoder = {'None' if data_id not in self.decoder_id_dict else type(self.decoder_id_dict[data_id]).__name__}"
)
if data_id in self.missing_decoders:
name = self.header.get_id_to_decoder_name_dict(shift_data_id=False)[
data_id
]
log.warning(f"no implementation of {name}, packets were skipped")
continue
if data_id in self.rbl_id_dict:
break

Expand Down
2 changes: 1 addition & 1 deletion tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ def tmptestdir():


def pytest_sessionfinish(session, exitstatus):
if exitstatus == 0:
if exitstatus == 0 and os.path.exists(_tmptestdir):
shutil.rmtree(_tmptestdir)


Expand Down
2 changes: 1 addition & 1 deletion tests/orca/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,6 @@
def orca_stream(lgnd_test_data):
orstr = OrcaStreamer()
orstr.open_stream(
lgnd_test_data.get_path("orca/fc/L200-comm-20220519-phy-geds.orca")
lgnd_test_data.get_path("orca/fc/l200-p02-r008-phy-20230113T174010Z.orca")
)
return orstr
40 changes: 40 additions & 0 deletions tests/orca/test_orca_fc.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
import pytest

from daq2lh5.orca import orca_packet


@pytest.fixture(scope="module")
def fc_packets(orca_stream):
    """Copies of one config, one status and one waveform packet, in that order."""
    # packet indices in the test file
    wanted = (
        3,  # config
        4,  # status
        13,  # waveform
    )
    # copy each packet so it does not depend on the stream's internal buffer
    packets = [orca_stream.load_packet(idx).copy() for idx in wanted]
    orca_stream.close_stream()  # avoid warning that file is still open
    return packets


def test_orfc_config_decoding(orca_stream, fc_packets):
    """The first fixture packet maps to the FlashCam config decoder."""
    packet = fc_packets[0]
    assert packet is not None

    packet_data_id = orca_packet.get_data_id(packet)
    decoder_names = orca_stream.header.get_id_to_decoder_name_dict()
    assert decoder_names[packet_data_id] == "ORFlashCamListenerConfigDecoder"


def test_orfc_status_decoding(orca_stream, fc_packets):
    """The second fixture packet maps to the FlashCam status decoder."""
    packet = fc_packets[1]
    assert packet is not None

    packet_data_id = orca_packet.get_data_id(packet)
    decoder_names = orca_stream.header.get_id_to_decoder_name_dict()
    assert decoder_names[packet_data_id] == "ORFlashCamListenerStatusDecoder"


def test_orfc_waveform_decoding(orca_stream, fc_packets):
    """The third fixture packet maps to the FlashCam waveform decoder."""
    packet = fc_packets[2]
    assert packet is not None

    packet_data_id = orca_packet.get_data_id(packet)
    decoder_names = orca_stream.header.get_id_to_decoder_name_dict()
    assert decoder_names[packet_data_id] == "ORFlashCamWaveformDecoder"
43 changes: 41 additions & 2 deletions tests/orca/test_orca_packet.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,41 @@
def test_orca_packet_import():
pass
from daq2lh5.orca import orca_packet


def test_orca_packet_funcs(orca_stream):
    """Exercise packet counting, indexed loading and the orca_packet helpers."""
    # The values in this test are particular to the test orca file in
    # legend-testdata and may need to be changed if that file is changed

    n_packets = orca_stream.count_packets()
    assert n_packets == 911

    packet = orca_stream.load_packet()
    assert orca_packet.is_short(packet) is False
    assert orca_packet.get_data_id(packet) == 3
    assert orca_packet.get_n_words(packet) == 4
    assert orca_packet.hex_dump(packet, return_output=True)[-1] == "3 0x63c1977a"

    id_dict = orca_stream.header.get_id_to_decoder_name_dict()
    seen = []
    for ii in range(100):
        packet = orca_stream.load_packet(ii)
        if packet is None:
            break
        name = id_dict[orca_packet.get_data_id(packet)]
        if ii == 0:
            assert name == "OrcaHeaderDecoder"
        if ii == 1:
            assert name == "ORRunDecoderForRun"
        if name not in seen:
            seen.append(name)
    expected = [
        "OrcaHeaderDecoder",
        "ORRunDecoderForRun",
        "ORFlashCamListenerConfigDecoder",
        "ORFlashCamListenerStatusDecoder",
        "ORFlashCamWaveformDecoder",
    ]
    assert seen == expected

    # The last packet (index 910) is beyond the range of the loop above, so
    # it is loaded explicitly; checking it inside the loop could never run.
    last_packet = orca_stream.load_packet(n_packets - 1)
    assert last_packet is not None
    assert id_dict[orca_packet.get_data_id(last_packet)] == "ORRunDecoderForRun"

    orca_stream.close_stream()  # avoid warning that file is still open

0 comments on commit 18eb650

Please sign in to comment.