Source code for discopat.repositories.local

from __future__ import annotations

import json
from io import BytesIO
from pathlib import Path

import numpy as np
import yaml

from discopat.core import Frame, Movie, NNModel
from discopat.repositories.repository import Repository

DATA_DIR_PATH = Path.home() / "data"
DISCOPATH = DATA_DIR_PATH / "pattern_discovery"


[docs] class LocalRepository(Repository): data_source = "local" def __init__(self, name: str): super().__init__(name) self._directory_path = DISCOPATH / self.name self._directory_path.mkdir(parents=True, exist_ok=True)
[docs] class LocalFrameRepository(LocalRepository): def __init__(self, name: str): self.name = name self._directory_path = { "input_frames": DISCOPATH / "input", "output_frames": DISCOPATH / "output", }[name]
[docs] def read(self, content_path: str or Path) -> Frame: experiment, field, frame_id = self._parse_frame_name(str(content_path)) file_stem = f"{field}_frame_{frame_id}" metadata_path = self._directory_path / experiment / f"{file_stem}.json" image_array_path = ( self._directory_path / experiment / f"{file_stem}.txt" ) if not metadata_path.exists() and not image_array_path.exists(): msg = f""" Could not find any information on frame '{content_path}'. Please make sure that either: - a JSON file with metadata or - a txt file containing the image array exists in the folder. """.strip() raise FileNotFoundError(msg) if not metadata_path.exists(): image_array = np.loadtxt(image_array_path) height, width = image_array.shape return Frame( name=file_stem, width=width, height=height, annotations=[], image_array=image_array, ) with Path.open(metadata_path) as f: frame = Frame.from_dict(json.load(f)) if not image_array_path.exists(): return frame frame.image_array = np.loadtxt(image_array_path) return frame
[docs] def write(self, content_path: str or Path, content: Frame) -> None: raise NotImplementedError
@staticmethod def _parse_frame_name(frame_name: str) -> tuple[str, str, int]: experiment = frame_name.split("/")[0] field, _, frame_id = frame_name.split("/")[1].split("_") return experiment, field, int(frame_id)
[docs] class LocalNNModelRepository(LocalRepository):
[docs] def read(self, content_path: str or Path) -> dict[str, dict or BytesIO]: return { "label_map": self._load_label_map(content_path), "model_parameters": self._load_model_parameters(content_path), "raw_net": self._load_raw_net(content_path), }
[docs] def write(self, content_path: str or Path, content: NNModel) -> None: raise NotImplementedError
def _load_label_map(self, content_path: str or Path) -> dict[str, int]: full_content_path = ( self._directory_path / content_path / "label_map.yaml" ) with Path.open(full_content_path) as f: return yaml.safe_load(f) def _load_model_parameters( self, content_path: str or Path ) -> dict[str, dict]: full_content_path = ( self._directory_path / content_path / "model_parameters.yaml" ) with Path.open(full_content_path) as f: return yaml.safe_load(f) def _load_raw_net(self, content_path: str or Path) -> BytesIO: full_content_path = self._directory_path / content_path / "net.pth" with Path.open(full_content_path, "rb") as f: return BytesIO(f.read())
[docs] class LocalMovieRepository(LocalRepository): def __init__(self, name: str): self.name = name self._directory_path = { "input_movies": DISCOPATH / "input", "output_movies": DISCOPATH / "output", }[name]
[docs] def read(self, content_path: str or Path) -> Movie: experiment, field = self._parse_movie_name(movie_name=str(content_path)) full_content_path = ( self._directory_path / experiment / f"{field}_movie.json" ) with Path.open(full_content_path) as f: movie = Movie.from_dict(json.load(f)) self._load_image_arrays(movie) return movie
[docs] def write(self, content_path: str or Path, content: Movie) -> None: experiment, field = self._parse_movie_name(movie_name=str(content_path)) full_content_path = ( self._directory_path / experiment / f"{field}_movie.json" ) full_content_path.parent.mkdir(exist_ok=True) with Path.open(full_content_path, "w") as f: json.dump(content.to_dict(), f, indent=2)
@staticmethod def _parse_movie_name(movie_name: str) -> tuple[str, str]: return tuple(movie_name.split("/")) def _load_image_array(self, movie: Movie, frame: Frame) -> None: experiment, field = self._parse_movie_name(movie_name=movie.name) frame_id = int(frame.name) image_array_path = ( self._directory_path / f"{experiment}/{field}_frame_{frame_id}.txt" ) frame.image_array = np.loadtxt(image_array_path) def _load_image_arrays(self, movie: Movie) -> None: for frame in movie.frames: self._load_image_array(movie, frame)