Skip to content

Commit

Permalink
review updates
Browse files Browse the repository at this point in the history
  • Loading branch information
nh13 committed Jul 16, 2024
1 parent 0ecb32c commit 5079331
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 23 deletions.
71 changes: 55 additions & 16 deletions fgpyo/fasta/sequence_dictionary.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,9 @@
Create a `pysam.AlignmentHeader` from a sequence dictionary:
>>> sd.to_sam()
>>> sd.to_sam_header()
<pysam.libcalignmentfile.AlignmentHeader object at 0x10e93f5f0>
>>> print(sd.to_sam())
>>> print(sd.to_sam_header())
@HD VN:1.5
@SQ SN:chr1 LN:101 AN:foo,bar,car
@SQ SN:chr2 LN:101
Expand All @@ -82,11 +82,11 @@
Create a `pysam.AlignmentHeader` from a sequence dictionary with extra header items:
>>> sd.to_sam(
>>> sd.to_sam_header(
... extra_header={"RG": [{"ID": "A", "LB": "a-library"}, {"ID": "B", "LB": "b-library"}]}
... )
<pysam.libcalignmentfile.AlignmentHeader object at 0x10e93fe30>
>>> print(sd.to_sam(
>>> print(sd.to_sam_header(
... extra_header={"RG": [{"ID": "A", "LB": "a-library"}, {"ID": "B", "LB": "b-library"}]}
... ))
@HD VN:1.5
Expand Down Expand Up @@ -194,13 +194,17 @@ def parse(value: str) -> "AlternateLocus":


@dataclass(frozen=True, init=True)
class SequenceMetadata(MutableMapping[str, str]):
class SequenceMetadata(MutableMapping[Union[Keys, str], str]):
"""Stores information about a single Sequence (ex. chromosome, contig).
Implements the mutable mapping interface, which provide access to the attributes of this
sequence, excluding name, length, and index.
sequence, including name, length, but not index. The length will be an `str` when accessing
via `get`; access the length directly or use `len`.
The length method returns the length of the sequence, not the length of the attributes.
The `len` method returns the length of the sequence, not the length of the attributes.
All attributes except name and length may be set. Use `dataclasses.replace` to create a new
copy in such cases.
Attributes:
name: the primary name of the sequence
Expand All @@ -212,7 +216,7 @@ class SequenceMetadata(MutableMapping[str, str]):
name: str
length: int
index: int
attributes: Dict[str, str] = field(default_factory=dict)
attributes: Dict[Union[Keys, str], str] = field(default_factory=dict)

def __post_init__(self) -> None:
"""Any post initialization validation should go here"""
Expand Down Expand Up @@ -327,17 +331,25 @@ def from_sam(meta: Dict[Union[Keys, str], Any], index: int) -> "SequenceMetadata
del attributes[Keys.SEQUENCE_LENGTH]
return SequenceMetadata(name=name, length=length, index=index, attributes=attributes)

def __getitem__(self, key: str) -> str:
def __getitem__(self, key: Union[Keys, str]) -> Any:
if key == Keys.SEQUENCE_NAME.value:
return self.name
elif key == Keys.SEQUENCE_LENGTH.value:
return f"{self.length}"
return self.attributes[key]

def __setitem__(self, key: str, value: str) -> None:
def __setitem__(self, key: Union[Keys, str], value: str) -> None:
if key == Keys.SEQUENCE_NAME or key == Keys.SEQUENCE_LENGTH:
raise KeyError(f"Cannot set '{key}' on SequenceMetadata with name '{self.name}'")
self.attributes[key] = value

def __delitem__(self, key: str) -> None:
def __delitem__(self, key: Union[Keys, str]) -> None:
if key == Keys.SEQUENCE_NAME or key == Keys.SEQUENCE_LENGTH:
raise KeyError(f"Cannot delete '{key}' on SequenceMetadata with name '{self.name}'")
del self.attributes[key]

def __iter__(self) -> Iterator[str]:
return iter(self.attributes)
def __iter__(self) -> Iterator[Union[Keys, str]]:
return iter([Keys.SEQUENCE_NAME, Keys.SEQUENCE_LENGTH] + list(self.attributes))

def __len__(self) -> int:
return self.length
Expand All @@ -353,6 +365,13 @@ def __index__(self) -> int:
class SequenceDictionary(Mapping[Union[str, int], SequenceMetadata]):
"""Contains an ordered collection of sequences.
A specific `SequenceMetadata` may be retrieved by name (`str`) or index (`int`), either by
using the generic `get` method or by the correspondingly named `by_name` and `by_index` methods.
The latter methods provide faster retrieval when the type is known.
This _mapping_ collection iterates over the _keys_. To iterate over each `SequenceMetadata`,
either use the typical `values()` method or access the metadata directly with `infos`.
Attributes:
infos: the ordered collection of sequence metadata
"""
Expand Down Expand Up @@ -382,7 +401,11 @@ def same_as(self, other: "SequenceDictionary") -> bool:
return False

Check warning on line 401 in fgpyo/fasta/sequence_dictionary.py

View check run for this annotation

Codecov / codecov/patch

fgpyo/fasta/sequence_dictionary.py#L401

Added line #L401 was not covered by tests
return all(this.same_as(that) for this, that in zip(self.infos, other.infos))

def to_sam(
def to_sam(self) -> List[Dict[str, Any]]:
"""Converts the list of dictionaries, one per sequence."""
return [meta.to_sam() for meta in self.infos]

def to_sam_header(
self,
extra_header: Optional[Dict[str, Any]] = None,
) -> pysam.AlignmentHeader:
Expand All @@ -394,7 +417,7 @@ def to_sam(
"""
header_dict: Dict[str, Any] = {
"HD": {"VN": "1.5"},
"SQ": [meta.to_sam() for meta in self.infos],
"SQ": self.to_sam(),
}
if extra_header is not None:
header_dict = {**header_dict, **extra_header}
Expand All @@ -412,6 +435,8 @@ def from_sam(header: List[Dict[str, Any]]) -> "SequenceDictionary": ...
def from_sam(
header: Union[pysam.AlignmentHeader, List[Dict[str, Any]]],
) -> "SequenceDictionary":
"""Creates a `SequenceDictionary` from either a `pysam.AlignmentHeader` or from
the list of sequences returned by `pysam.AlignmentHeader#to_dict()["SQ"]`."""
if isinstance(header, pysam.AlignmentHeader):
return SequenceDictionary.from_sam(header=header.to_dict()["SQ"])

Expand All @@ -421,7 +446,7 @@ def from_sam(

return SequenceDictionary(infos=infos)

# FIXME: mypyp doesn't like these
# TODO: mypyp doesn't like these
# @overload
# def __getitem__(self, key: str) -> SequenceMetadata: ...
#
Expand All @@ -431,6 +456,20 @@ def from_sam(
def __getitem__(self, key: Union[str, int]) -> SequenceMetadata:
return self._dict[key] if isinstance(key, str) else self.infos[key]

def get_by_name(self, name: str) -> Optional[SequenceMetadata]:
"""Gets a `SequenceMetadata` explicitly by `name`. Returns None if
the name does not exist in this dictionary"""
return self._dict.get(name)

Check warning on line 462 in fgpyo/fasta/sequence_dictionary.py

View check run for this annotation

Codecov / codecov/patch

fgpyo/fasta/sequence_dictionary.py#L462

Added line #L462 was not covered by tests

def by_name(self, name: str) -> SequenceMetadata:
"""Gets a `SequenceMetadata` explicitly by `name`. The name must exist."""
return self._dict[name]

Check warning on line 466 in fgpyo/fasta/sequence_dictionary.py

View check run for this annotation

Codecov / codecov/patch

fgpyo/fasta/sequence_dictionary.py#L466

Added line #L466 was not covered by tests

def by_index(self, index: int) -> SequenceMetadata:
"""Gets a `SequenceMetadata` explicitly by `name`. Raises an `IndexError`
if the index is out of bounds."""
return self.infos[index]

Check warning on line 471 in fgpyo/fasta/sequence_dictionary.py

View check run for this annotation

Codecov / codecov/patch

fgpyo/fasta/sequence_dictionary.py#L471

Added line #L471 was not covered by tests

def __iter__(self) -> Iterator[str]:
return iter(self._dict)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,12 @@ def test_sequence_metadata_alternate() -> None:

def test_sequence_metadata_keys() -> None:
meta = SequenceMetadata(name="1", length=1, index=0)
for key in Keys:
for key in Keys.attributes():
assert meta.get(key) is None, f"key: {key} meta: {meta}"
assert meta[Keys.SEQUENCE_NAME] == "1"
assert meta[Keys.SEQUENCE_LENGTH] == "1"
assert meta.length == 1
assert len(meta) == 1

valid_attributes = Keys.attributes()
attributes = {key: f"value-{key}" for key in valid_attributes}
Expand All @@ -106,8 +110,10 @@ def test_sequence_metadata_keys() -> None:
if key in valid_attributes:
assert meta.get(key) is not None, f"key: {key} meta: {meta}"
assert meta[key] == attributes[key]
else:
assert meta.get(key) is None, f"key: {key} meta: {meta}"
assert meta[Keys.SEQUENCE_NAME] == "1"
assert meta[Keys.SEQUENCE_LENGTH] == "1"
assert meta.length == 1
assert len(meta) == 1

assert meta.md5 == attributes[Keys.MD5]
assert meta.assembly == attributes[Keys.ASSEMBLY]
Expand Down Expand Up @@ -219,17 +225,28 @@ def test_sequence_dictionary_mutable_mapping() -> None:
# __setitem__
meta[Keys.MD5] = "foo"
assert meta[Keys.MD5] == "foo"
for key in [Keys.SEQUENCE_NAME, Keys.SEQUENCE_LENGTH]:
with pytest.raises(KeyError, match="Cannot set"):
meta[key] = "foo"
# __delitem__
del meta[Keys.MD5]
assert Keys.MD5 not in meta
assert meta.get(Keys.MD5) is None
for key in [Keys.SEQUENCE_NAME, Keys.SEQUENCE_LENGTH]:
with pytest.raises(KeyError, match="Cannot delete"):
del meta[key]
# __iter__
assert list(meta) == [Keys.ALIASES, Keys.ASSEMBLY]
assert list(meta) == [Keys.SEQUENCE_NAME, Keys.SEQUENCE_LENGTH, Keys.ALIASES, Keys.ASSEMBLY]
# __len__
assert len(meta) == 123
# other methods
assert list(meta.values()) == ["2", "as"]
assert list(meta.items()) == [(Keys.ALIASES, "2"), (Keys.ASSEMBLY, "as")]
assert list(meta.values()) == ["1", "123", "2", "as"]
assert list(meta.items()) == [
(Keys.SEQUENCE_NAME, "1"),
(Keys.SEQUENCE_LENGTH, "123"),
(Keys.ALIASES, "2"),
(Keys.ASSEMBLY, "as"),
]


@pytest.mark.parametrize(
Expand Down Expand Up @@ -319,7 +336,7 @@ def test_sequence_dictionary_to_and_from_sam() -> None:

assert SequenceDictionary.from_sam(mapping) == sd
assert SequenceDictionary.from_sam(header) == sd
assert sd.to_sam(extra_header={"RG": [{"ID": "foo"}]})
assert sd.to_sam_header(extra_header={"RG": [{"ID": "foo"}]})


def test_sequence_dictionary_mapping() -> None:
Expand Down

0 comments on commit 5079331

Please sign in to comment.