import warnings
from pathlib import Path
from threading import RLock
from typing import Union
import dask.dataframe as dd
from cachetools import LRUCache, cached
from cbgen import bgen_file, bgen_metafile
from cbgen.typing import Partition
from dask.delayed import delayed
from pandas import DataFrame
from ._environment import BGEN_READER_CACHE_HOME
from ._file import (
assert_file_exist,
assert_file_readable,
is_file_writable,
path_to_filename,
)
# Warning template emitted when the metafile cannot be written at its preferred
# location; ``{filepath}`` is filled in with the attempted metafile path.
_metafile_nowrite_dir = """\
You don't have permission to write `{filepath}`. This might prevent speeding-up the reading process
in future runs.
"""
def infer_metafile_filepath(bgen_filepath: Path, suffix: str = ".metafile") -> Path:
    """
    Decide where the metafile associated with a BGEN file should live.

    The preferred location is ``bgen_filepath`` with ``suffix`` appended to its
    existing suffix, i.e., right next to the BGEN file. That location is used when

    - the metafile already exists and ``assert_file_readable`` accepts it, or
    - the metafile does not exist yet and ``is_file_writable`` reports it can be
      written.

    Otherwise a ``UserWarning`` is issued and the returned path points into

        BGEN_READER_CACHE_HOME / "metafile"

    using a file name derived from the preferred path via ``path_to_filename``.

    Parameters
    ----------
    bgen_filepath
        Path to the BGEN file.
    suffix
        Text appended to the BGEN file name to form the metafile name.

    Returns
    -------
    Path
        Location at which the metafile should be read or created.
    """
    candidate = bgen_filepath.with_suffix(bgen_filepath.suffix + suffix)

    if candidate.exists():
        try:
            assert_file_readable(candidate)
        except RuntimeError as err:
            # Exists but cannot be read: report why and use the cache directory.
            problem = str(err)
        else:
            return candidate
    elif is_file_writable(candidate):
        return candidate
    else:
        problem = _metafile_nowrite_dir.format(filepath=candidate)

    # Warn before resolving the fallback path, as both original branches did.
    warnings.warn(problem, UserWarning)
    return BGEN_READER_CACHE_HOME / "metafile" / path_to_filename(candidate)
def create_variants(filepath: Path, nvariants: int, npartitions: int, part_size: int):
    """
    Assemble a lazy dask dataframe describing the variants stored in a metafile.

    One delayed ``_read_partition(filepath, i)`` call is scheduled for every
    partition index ``i`` in ``range(npartitions)``; nothing is read until the
    dask graph is computed.

    Parameters
    ----------
    filepath
        Metafile path forwarded to ``_read_partition``.
    nvariants
        Total number of variants; ``nvariants - 1`` closes the divisions.
    npartitions
        Number of partitions to schedule.
    part_size
        Step between the starting divisions of consecutive partitions.

    Returns
    -------
    dask.dataframe.DataFrame
        Dataframe with columns ``id``, ``rsid``, ``chrom``, ``pos``,
        ``nalleles``, ``allele_ids`` and ``vaddr``.
    """
    partition_ids = range(npartitions)
    lazy_parts = [delayed(_read_partition)(filepath, idx) for idx in partition_ids]

    # Partition ``idx`` is declared to start at ``idx * part_size``; the final
    # entry is the last valid row position.
    # NOTE(review): these divisions assume each partition's index continues
    # where the previous one ended -- confirm `_read_partition` produces such an
    # index.
    boundaries = [idx * part_size for idx in partition_ids]
    boundaries.append(nvariants - 1)

    schema = dd.utils.make_meta(
        [
            ("id", str),
            ("rsid", str),
            ("chrom", str),
            ("pos", int),
            ("nalleles", int),
            ("allele_ids", str),
            ("vaddr", int),
        ]
    )
    return dd.from_delayed(lazy_parts, meta=schema, divisions=boundaries)
# LRU cache (maxsize=3) memoizing the results of `_read_partition`.
cache = LRUCache(maxsize=3)
# Re-entrant lock handed to `cached` together with `cache` on `_read_partition`.
lock = RLock()
@cached(cache, lock=lock)
def _read_partition(filepath: Path, partition: int) -> DataFrame:
    """
    Load one metafile partition as a pandas dataframe.

    Results are memoized in the module-level ``cache`` keyed by the call
    arguments, so repeated calls may return the same cached object.

    Parameters
    ----------
    filepath
        Path of the metafile to open with ``bgen_metafile``.
    partition
        Index of the partition to read.

    Returns
    -------
    DataFrame
        Columns ``id``, ``rsid``, ``chrom``, ``pos``, ``nalleles``,
        ``allele_ids`` and ``vaddr``, in that order.
    """
    column_order = ["id", "rsid", "chrom", "pos", "nalleles", "allele_ids", "vaddr"]

    with bgen_metafile(filepath) as metafile:
        chunk: Partition = metafile.read_partition(partition)
        variants = chunk.variants

    # Rename the metafile's variant fields to the public column names and
    # normalize their dtypes.
    frame = DataFrame(
        {
            "id": variants.id.astype(str),
            "rsid": variants.rsid.astype(str),
            "chrom": variants.chromosome.astype(str),
            "pos": variants.position.astype(int),
            "nalleles": variants.nalleles.astype(int),
            "allele_ids": variants.allele_ids.astype(str),
            "vaddr": variants.offset.astype(int),
        }
    )
    return frame[column_order]