-
Notifications
You must be signed in to change notification settings - Fork 3
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Ted Brookings
committed
Jul 11, 2023
1 parent
0c17fe0
commit 97ef17c
Showing
3 changed files
with
732 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,369 @@ | ||
""" | ||
Classes for generating VCF and records for testing | ||
-------------------------------------------------- | ||
This module contains utility classes for the generation of VCF files and variant records, for use | ||
in testing. | ||
The module contains the following public classes: | ||
- :class:`~variantbuilder.VariantBuilder` -- A builder class that allows the | ||
accumulation of variant records and access as a list and writing to file. | ||
Examples | ||
~~~~~~~~ | ||
Typically, we have :class:`~cyvcf2.Variant` records obtained from reading from a VCF file. The | ||
:class:`~variantbuilder.VariantBuilder` class builds such records. The variants produced | ||
are from a single-sample. | ||
The preferred usage is within a context manager (i.e. use `with`): | ||
>>> from fgpyo.vcf import VariantBuilder | ||
>>> with VariantBuilder() as builder: | ||
>>> builder.add() # uses the defaults | ||
This enables the automatic release resources used by `VariantBuilder` to create `Variant` records. | ||
One may also use the :func:`~variantbuilder.VariantBuilder.close()` method: | ||
>>> from fgpyo.vcf import VariantBuilder | ||
>>> builder: VariantBuilder = VariantBuilder() | ||
>>> builder.add() # uses the defaults | ||
>>> builder.close() | ||
Variants are added with the :func:`~variantbuilder.VariantBuilder.add()` method, which | ||
returns a `Variant`. | ||
>>> with VariantBuilder() as builder: | ||
>>> variant: Variant = builder.add( | ||
>>> chr="chr2", pos=1001, id="rs1234", ref="C", alt=["T"], | ||
>>> qual=40, flt=["PASS"], gt="0|1" | ||
>>> ) | ||
The variants stored in the builder can be retrieved as a coordinate sorted VCF file via the | ||
:func:`~variantbuilder.VariantBuilder.to_path()` method: | ||
>>> from pathlib import Path | ||
>>> path_to_vcf: Path = builder.to_path() | ||
The variants may also be retrieved in the order they were added via the | ||
:func:`~variantbuilder.VariantBuilder.to_unsorted_list()` method and in coordinate sorted | ||
order via the :func:`~variantbuilder.VariantBuilder.to_sorted_list()` method. | ||
""" | ||
import os | ||
import sys | ||
from abc import ABC | ||
from abc import abstractmethod | ||
from collections import OrderedDict | ||
from contextlib import contextmanager | ||
from pathlib import Path | ||
from tempfile import NamedTemporaryFile | ||
from typing import Any | ||
from typing import ClassVar | ||
from typing import Dict | ||
from typing import Generator | ||
from typing import Generic | ||
from typing import Iterable | ||
from typing import List | ||
from typing import Optional | ||
from typing import TextIO | ||
from typing import Tuple | ||
from typing import TypeVar | ||
from typing import Union | ||
|
||
from pysam import VariantFile | ||
from pysam import VariantFile as VcfReader | ||
from pysam import VariantFile as VcfWriter | ||
from pysam import VariantHeader | ||
|
||
from fgpyo.sam.builder import SamBuilder | ||
|
||
Variant = TypeVar("Variant") | ||
|
||
|
||
class Defaults: | ||
Contig: ClassVar[str] = "chr1" | ||
Position: ClassVar[int] = 1000 | ||
Id: ClassVar[str] = "." | ||
Ref: ClassVar[str] = "A" | ||
Alts: ClassVar[Optional[List[str]]] = None | ||
Qual: ClassVar[int] = 60 | ||
|
||
@staticmethod | ||
def init( | ||
chr: Optional[str] = None, | ||
pos: Optional[int] = None, | ||
id: Optional[str] = None, | ||
ref: Optional[str] = None, | ||
alts: Optional[Iterable[str]] = None, | ||
qual: Optional[int] = None, | ||
) -> Tuple[str, int, str, str, List[str], int]: | ||
"""Sets the defaults if the values are not defined. | ||
Args: | ||
chr: the chromosome name | ||
pos: the position | ||
id: the variant id | ||
ref: the reference allele | ||
alts: the list of alternate alleles, None if no alternates | ||
qual: the variant quality | ||
""" | ||
return ( | ||
Defaults.Contig if chr is None else chr, | ||
Defaults.Position if pos is None else pos, | ||
Defaults.Id if id is None else id, | ||
Defaults.Ref if ref is None else ref, | ||
Defaults.Alts if alts is None else list(alts), | ||
Defaults.Qual if qual is None else qual, | ||
) | ||
|
||
|
||
"""The valid base classes for opening a SAM/BAM/CRAM file.""" | ||
VcfPath = Union[Path, str, TextIO] | ||
|
||
|
||
@contextmanager | ||
def redirect_dev_null(file_num: int = sys.stderr.fileno()) -> Generator[None, None, None]: | ||
"""A context manager that redirects output of file handle to /dev/null | ||
Args: | ||
file_num: number of filehandle to redirect. Uses stderr by default | ||
""" | ||
# open /dev/null for writing | ||
f_devnull = os.open(os.devnull, os.O_RDWR) | ||
# save old file descriptor and redirect stderr to /dev/null | ||
save_stderr = os.dup(file_num) | ||
os.dup2(f_devnull, file_num) | ||
|
||
yield | ||
|
||
# restore file descriptor and close devnull | ||
os.dup2(save_stderr, file_num) | ||
os.close(f_devnull) | ||
|
||
|
||
@contextmanager | ||
def reader(path: VcfPath) -> Generator[VcfReader, None, None]: | ||
"""Opens the given path for VCF reading | ||
Args: | ||
path: the path to a VCF, or an open file handle | ||
""" | ||
if isinstance(path, (str, Path, TextIO)): | ||
with redirect_dev_null(): | ||
# to avoid spamming log about index older than vcf, redirect stderr to /dev/null: only | ||
# when first opening the file | ||
_reader = VariantFile(path, mode="r") # type: ignore | ||
# now stderr is back, so any later stderr messages will go through | ||
yield _reader | ||
_reader.close() | ||
else: | ||
raise TypeError(f"Cannot open '{type(path)}' for VCF reading.") | ||
|
||
|
||
@contextmanager | ||
def writer( | ||
path: VcfPath, reader: Optional[VcfReader] = None, header: Optional[VariantHeader] = None | ||
) -> Generator[VcfWriter, None, None]: | ||
"""Opens the given path for VCF writing. Either reader or header must be specified. | ||
Args: | ||
path: the path to a VCF, or an open filehandle | ||
reader: the source VCF from which variants are read | ||
header: the VCF header to use for the output VCF | ||
""" | ||
# Convert Path to str such that pysam will autodetect to write as a gzipped file if provided | ||
# with a .vcf.gz suffix. | ||
if isinstance(path, Path): | ||
path = str(path) | ||
if not isinstance(path, (str, TextIO)): | ||
raise TypeError(f"Cannot open '{type(path)}' for VCF writing.") | ||
if reader is not None and header is not None: | ||
raise ValueError("Either reader or header must be specified when writing a VCF.") | ||
elif reader is not None: | ||
_writer = VariantFile(path, header=reader.header, mode="w") | ||
elif header is not None: | ||
_writer = VariantFile(path, header=header, mode="w") | ||
else: | ||
raise ValueError("Either reader or header must be given when writing a VCF.") | ||
yield _writer | ||
_writer.close() | ||
|
||
|
||
class VariantBuilder(Generic[Variant], ABC): | ||
"""Builder for constructing one or more variant records (Variant in cyvcf2 terms) for a | ||
single-sample VCF. | ||
Provides the ability to manufacture variants from minimal arguments, while generating | ||
any remaining attributes to ensure a valid variant. | ||
A builder is constructed with a handful of defaults including the sample name and sequence | ||
dictionary. | ||
Variants are then added using the :func:`~fgpyo.vcf.VariantBuilder.add` method. | ||
Once accumulated the variants can be accessed in the order in which they were created through | ||
the :func:`~fgpyo.vcf.VariantBuilder.to_unsorted_list` function, or in a | ||
list sorted by coordinate order via | ||
:func:`~fgpyo.vcf.VariantBuilder.to_sorted_list`. Lastly, the records can | ||
be written to a temporary file using :func:`~fgpyo.vcf.VariantBuilder.to_path`. | ||
Attributes: | ||
samples: the sample name(s) | ||
sd: list of dictionaries with names and lengths of each contig | ||
seq_idx_lookup: dictionary mapping contig name to index of contig in sd | ||
records: the list of variant records | ||
""" | ||
|
||
samples: List[str] | ||
sd: Dict[str, Dict[str, Any]] | ||
seq_idx_lookup: Dict[str, int] | ||
records: List[Variant] | ||
|
||
@classmethod | ||
def default_sd(cls) -> Dict[str, Dict[str, Any]]: | ||
"""Generates the sequence dictionary that is used by default by VariantBuilder. | ||
Matches the names and lengths of the HG19 reference. | ||
Returns: | ||
A new copy of the sequence dictionary as a map of contig name to dictionary, one per | ||
contig. | ||
""" | ||
sd: Dict[str, Dict[str, Any]] = OrderedDict() | ||
for sequence in SamBuilder.default_sd(): | ||
contig = sequence["SN"] | ||
sd[contig] = {"ID": contig, "length": sequence["LN"]} | ||
return sd | ||
|
||
@classmethod | ||
def build_header_string( | ||
cls, samples: Optional[List[str]] = None, sd: Optional[Dict[str, Dict[str, Any]]] = None | ||
) -> str: | ||
"""Builds the VCF header with the given sample name(s) and sequence dictionary. | ||
Args: | ||
samples: the sample name(s). | ||
sd: the sequence dictionary mapping the contig name to the key-value pairs for the | ||
given contig. Must include "ID" and "length" for each contig. If no sequence | ||
dictionary is given, will use the default dictionary. | ||
""" | ||
if sd is None: | ||
sd = VariantBuilder.default_sd() | ||
buffer = "##fileformat=VCFv4.2\n" | ||
buffer += '##FORMAT=<ID=GT,Number=1,Type=String,Description="Genotype">\n' | ||
for d in sd.values(): | ||
assert "ID" in d, "ID missing in sequence dictionary" | ||
assert "length" in d, "length missing in sequence dictionary" | ||
contig_id = d["ID"] | ||
contig_length = d["length"] | ||
contig_buffer = f"##contig=<ID={contig_id},length={contig_length}" | ||
for key, value in d.items(): | ||
if key == "ID" or key == "length": | ||
continue | ||
contig_buffer += f",{key}={value}" | ||
contig_buffer += ">\n" | ||
buffer += contig_buffer | ||
|
||
buffer += "#CHROM\tPOS\tID\tREF\tALT\tQUAL\tFILTER\tINFO\tFORMAT" | ||
if samples is not None: | ||
buffer += "\t" | ||
buffer += "\t".join(samples) | ||
buffer += "\n" | ||
|
||
return buffer | ||
|
||
def __init__( | ||
self, samples: Optional[List[str]] = None, sd: Optional[Dict[str, Dict[str, Any]]] = None | ||
) -> None: | ||
"""Initializes a new VariantBuilder for generating variants and VCF files. | ||
Args: | ||
samples: the name of the sample(s) | ||
sd: optional sequence dictionary | ||
""" | ||
self.samples: List[str] = samples if samples is not None else [] | ||
self.sd: Dict[str, Dict[str, Any]] = sd if sd is not None else VariantBuilder.default_sd() | ||
self.seq_idx_lookup: Dict[str, int] = {name: i for i, name in enumerate(self.sd.keys())} | ||
self.records: List[Variant] = [] | ||
|
||
def __enter__(self) -> "VariantBuilder": | ||
return self | ||
|
||
def __exit__(self, *args: Any) -> bool: # type: ignore | ||
self.close() | ||
return False | ||
|
||
@abstractmethod | ||
def close(self) -> None: | ||
"""Closes this builder""" | ||
pass | ||
|
||
@abstractmethod | ||
def add( | ||
self, | ||
chr: Optional[str] = None, | ||
pos: Optional[int] = None, | ||
id: Optional[str] = None, | ||
ref: Optional[str] = None, | ||
alts: Optional[List[str]] = None, | ||
qual: Optional[int] = None, | ||
filters: Optional[List[str]] = None, | ||
infos: Optional[Dict[str, Any]] = None, | ||
formats: Optional[Dict[str, Dict[str, Any]]] = None, | ||
) -> Variant: | ||
"""Generates a new variant and adds it to the internal collection. | ||
Args: | ||
chr: the chromosome name | ||
pos: the position | ||
id: the variant id | ||
ref: the reference allele | ||
alts: the list of alternate alleles, None if no alternates | ||
qual: the variant quality | ||
filters: the list of filters, None if no filters (ex. PASS) | ||
infos: the dictionary of INFO key-value pairs | ||
formats: the dictionary of sample name to FORMAT key-value pairs. | ||
""" | ||
pass | ||
|
||
@abstractmethod | ||
def to_path(self, path: Optional[Path] = None) -> Path: | ||
"""Returns a path to a VCF for variants added to this builder. | ||
Args: | ||
path: optional path to the VCF | ||
""" | ||
pass | ||
|
||
@staticmethod | ||
def _update_path(path: Optional[Path]) -> Path: | ||
"""Gets the path to a VCF file. If path is a directory, a temporary VCF will be created in | ||
that directory. If path is `None`, then a temporary VCF will be created. Otherwise, the | ||
given path is simply returned. | ||
Args: | ||
path: optionally the path to the VCF, or a directory to create a temporary VCF. | ||
""" | ||
if path is None: | ||
with NamedTemporaryFile(suffix=".vcf", delete=False) as fp: | ||
path = Path(fp.name) | ||
assert path.is_file() | ||
return path | ||
|
||
def to_unsorted_list(self) -> List[Variant]: | ||
"""Returns the accumulated records in the order they were created.""" | ||
return list(self.records) | ||
|
||
@abstractmethod | ||
def _get_chrom_pos_end(self, variant: Variant) -> Tuple[str, int, int]: | ||
"""Getter for chromosome, start and end pos from Variant""" | ||
pass | ||
|
||
def _sort_key(self, variant: Variant) -> Tuple[int, int, int]: | ||
chrom, pos, end = self._get_chrom_pos_end(variant) | ||
return self.seq_idx_lookup[chrom], pos, end | ||
|
||
def to_sorted_list(self) -> List[Variant]: | ||
"""Returns the accumulated records in coordinate order.""" | ||
chrom, pos, end = self._get_chrom_pos_end(self.records[0]) | ||
return sorted(self.records, key=self._sort_key) |
Oops, something went wrong.