Source code for sdanalysis.read._read

#! /usr/bin/env python3
# -*- coding: utf-8 -*-
# vim:fenc=utf-8
#
# Copyright © 2017 Malcolm Ramsay <malramsay64@gmail.com>
#
# Distributed under terms of the MIT license.
"""Read input files and compute dynamic and thermodynamic quantities."""

import datetime
import logging
from pathlib import Path
from typing import Any, Dict, Iterator, List, Optional, Tuple

import attr
import gsd.hoomd
import numpy as np
import pandas
import tqdm

from ..dynamics import Dynamics, Relaxations
from ..frame import Frame, HoomdFrame
from ..molecules import Trimer
from ..util import get_filename_vars
from ._gsd import FileIterator, read_gsd_trajectory
from ._lammps import parse_lammpstrj, read_lammps_trajectory

logger = logging.getLogger(__name__)


@attr.s(auto_attribs=True)
class WriteCache:
    _filename: Optional[Path] = None
    group: str = "dynamics"
    cache_multiplier: int = 1
    to_append: bool = False

    _cache: List[Any] = attr.ib(default=attr.Factory(list), init=False)
    _cache_default: int = attr.ib(default=8192, init=False)
    _emptied_count: int = attr.ib(default=0, init=False)

    def __attr_post_init__(self):
        if self.group is None:
            raise ValueError("Group can not be None.")

    @property
    def _cache_size(self) -> int:
        return self.cache_multiplier * self._cache_default

    def append(self, item: Any) -> None:
        # Cache of size 0 or with val None will act as list
        if self._cache and len(self._cache) == self._cache_size:
            self.flush()
            self._emptied_count += 1
        self._cache.append(item)

    def _flush_file(self, df) -> None:
        assert self.filename is not None
        assert self.group is not None
        df.to_hdf(self.filename, self.group, format="table", append=self.to_append)
        self.to_append = True

    def flush(self) -> None:
        df = self.to_dataframe()
        self._flush_file(df)
        self._cache.clear()

    @property
    def filename(self) -> Optional[Path]:
        if self._filename is not None:
            return Path(self._filename)
        return None

    def __len__(self) -> int:
        # Total number of elements added
        return self._cache_size * self._emptied_count + len(self._cache)

    def to_dataframe(self):
        return pandas.DataFrame.from_records(self._cache)


[docs]def process_file( infile: Path, wave_number: float, steps_max: Optional[int] = None, linear_steps: Optional[int] = None, keyframe_interval: int = 1_000_000, keyframe_max: int = 500, mol_relaxations: List[Dict[str, Any]] = None, outfile: Optional[Path] = None, scattering_function: bool = False, ) -> Optional[pandas.DataFrame]: """Read a file and compute the dynamics quantities. This computes the dynamic quantities from a file returning the result as a pandas DataFrame. This is only suitable for cases where all the data will fit in memory, as there is no writing to a file. Args: Returns: (py:class:`pandas.DataFrame`): DataFrame with the dynamics quantities. """ assert infile is not None infile = Path(infile) logger.info("Processing %s", infile) start_time = datetime.datetime.now() sim_variables = get_filename_vars(infile) if outfile is not None: dataframes = WriteCache(filename=outfile) else: dataframes = WriteCache() keyframes: Dict[int, Tuple[Dynamics, Relaxations]] = {} if infile.suffix == ".gsd": file_iterator: FileIterator = read_gsd_trajectory( infile=infile, steps_max=steps_max, linear_steps=linear_steps, keyframe_interval=keyframe_interval, keyframe_max=keyframe_max, ) elif infile.suffix == ".lammpstrj": file_iterator = read_lammps_trajectory(infile, steps_max=steps_max) for indexes, frame in file_iterator: if frame.position.shape[0] == 0: logger.warning("Found malformed frame in %s... continuing", infile.name) continue logger.info("Indexes for step %s: %s", frame.timestep, indexes) for index in indexes: dyn, relax = keyframes.setdefault( index, ( Dynamics.from_frame(frame, Trimer(), wave_number), Relaxations.from_frame(frame, Trimer(), wave_number), ), ) # Set custom relaxation functions if mol_relaxations is not None: relax.set_mol_relax(mol_relaxations) try: dynamics_series = dyn.compute_all( timestep=frame.timestep, position=frame.position, orientation=frame.orientation, scattering_function=scattering_function, ) relax.add(frame.timestep, frame.position, frame.orientation) except (ValueError, RuntimeError) as e: logger.warning(e) continue logger.debug("Series: %s", index) dynamics_series["keyframe"] = index if sim_variables.temperature is None: dynamics_series["temperature"] = np.nan else: dynamics_series["temperature"] = float(sim_variables.temperature) if sim_variables.pressure is None: dynamics_series["pressure"] = np.nan else: dynamics_series["pressure"] = float(sim_variables.pressure) dataframes.append(dynamics_series) end_time = datetime.datetime.now() processing_time = end_time - start_time logger.info("Finished processing %s, took %s", infile, processing_time) if outfile is not None: dataframes.flush() mol_relax = pandas.concat( (r.summary() for _, r in keyframes.values()), keys=list(keyframes.keys()) ) if sim_variables.temperature is None: mol_relax["temperature"] = np.nan else: mol_relax["temperature"] = float(sim_variables.temperature) if sim_variables.pressure is None: mol_relax["pressure"] = np.nan else: mol_relax["pressure"] = float(sim_variables.pressure) mol_relax.index.names = ["keyframe", "molecule"] mol_relax = mol_relax.reset_index() mol_relax.to_hdf(outfile, "molecular_relaxations") return None df = dataframes.to_dataframe() assert len(df) > 0 return dataframes.to_dataframe()
[docs]def open_trajectory( filename: Path, progressbar=None, frame_interval=1 ) -> Iterator[Frame]: """Open a simulation trajectory for processing. This reads each configuration in turn from the trajectory, handling most of the common errors with reading a file. This handles trajectories in both the gsd file format and simple lammpstrj files. Args: filename: The path to the file which is to be opened. progressbar: Whether to display a progress bar when reading the file. Returns: Generator which returns class:`Frame` objects. """ filename = Path(filename) if filename.suffix == ".gsd": with gsd.hoomd.open(str(filename)) as trj: # Work out which iterator to use if progressbar is None: iterator = range(0, len(trj), frame_interval) elif progressbar is True: iterator = tqdm.trange(0, len(trj), frame_interval) elif progressbar == "notebook": iterator = tqdm.tqdm_notebook(range(0, len(trj), frame_interval)) else: raise ValueError( "Invalid value for progressbar. Takes either True or 'notebook'" ) for index in iterator: try: yield HoomdFrame(trj[index]) except RuntimeError: logger.info("Found corrupt frame at index %s continuing", index) continue elif filename.suffix == ".lammpstrj": trj = parse_lammpstrj(filename) for frame in trj: yield frame