Source code for sdanalysis.relaxation

#! /usr/bin/env python3
# -*- coding: utf-8 -*-
# vim:fenc=utf-8
#
# Copyright © 2017 Malcolm Ramsay <malramsay64@gmail.com>
#
# Distributed under terms of the MIT license.
"""These are a series of summary values of the dynamics quantities.

This provides methods of easily comparing values across variables.
"""

import logging
from pathlib import Path
from typing import NamedTuple

import numpy as np
import pandas
import tables
from scipy.optimize import curve_fit, newton
from scipy.stats import hmean

logger = logging.getLogger(__name__)


[docs]class Result(NamedTuple): """Hold the result of a relaxation calculation. This uses the NamedTuple class to make the access of the returned values more transparent and easier to understand. """ mean: float error: float
def _msd_function(x: np.ndarray, m: float, b: float) -> np.ndarray: """Fit for the mean squared displacement. The relaxation value of the mean squared displacement (MSD) is found by fitting the line .. math:: y = mx + b with the relaxation value, i.e. the Diffusion constant, being proportional to :math:`m`. """ return m * x + b def _exponential_decay(x: np.ndarray, a: float, b: float, c: float = 0) -> np.ndarray: """Fit for exponential decay. This is is the functional form used to fit a function which exhibits exponential decay. The important parameter here is :math:`b`, which is the rate of the decay. """ return a * np.exp(-b * x) + c # pylint: disable=unused-argument def _ddx_exponential_decay( x: np.ndarray, a: float, b: float, c: float = 0 ) -> np.ndarray: """First derivative of the exponential decay function. This is the analytical first derivative of the function used for the fit of the exponential decay. This is used to speed up the root finding step. .. note: This function needs to have the same input arguments as the function it is a derivative of. """ return -b * a * np.exp(-b * x) def _d2dx2_exponential_decay( x: np.ndarray, a: float, b: float, c: float = 0 ) -> np.ndarray: """Second derivative of the exponential decay function. This is the analytical first derivative of the function used for the fit of the exponential decay. This is used to speed up the root finding step. .. note: This function needs to have the same input arguments as the function it is a derivative of. """ return b * b * a * np.exp(-b * x) # pylint: enable=unused-argument
[docs]def diffusion_constant( time: np.ndarray, msd: np.ndarray, dimensions: int = 2 ) -> Result: """Compute the diffusion_constant from the mean squared displacement. Args: time: The timesteps corresponding to each msd value. msd: Values of the mean squared displacement Returns: diffusion_constant (float): The diffusion constant error (float): The error in the fit of the diffusion constant """ try: linear_region = np.logical_and(2 < msd, msd < 50) if np.sum(linear_region) <= 2: return Result(np.nan, np.nan) except FloatingPointError as err: logger.exception("%s", err) return Result(np.nan, np.nan) try: popt, pcov = curve_fit(_msd_function, time[linear_region], msd[linear_region]) except TypeError as err: logger.debug("time: %s", time[linear_region]) logger.exception("%s", err) return Result(np.nan, np.nan) perr = 2 * np.sqrt(np.diag(pcov)) return Result(popt[0] / (2 * dimensions), perr[0] / (2 * dimensions))
[docs]def threshold_relaxation( time: np.ndarray, value: np.ndarray, threshold: float = 1 / np.exp(1), decay: bool = True, ) -> Result: """Compute the relaxation through the reaching of a specific value. Args: time: The timesteps corresponding to each msd value. value: Values of the relaxation parameter Returns: relaxation time (float): The relaxation time for the given quantity. error (float): The error in the fit of the relaxation """ try: if decay: index = np.argmax(value < threshold) else: index = np.argmax(value > threshold) except FloatingPointError: return Result(np.nan, np.nan) return Result(time[index], time[index] - time[index - 1])
[docs]def exponential_relaxation( time: np.ndarray, value: np.ndarray, sigma: np.ndarray = None, value_width: float = 0.3, ) -> Result: """Fit a region of the exponential relaxation with an exponential. This fits an exponential to the small region around the value 1/e. A small region is chosen as the interest here is the time for the decay to reach a value, rather than a fit to the overall curve, so this provides a method of getting an accurate time, while including a collection of points. Args: time: The timesteps corresponding to each value value: The value at each point in time sigma: The uncertainty associated with each point value_width: The width of values over which the fit takes place Returns: relaxation_time (float): The relaxation time for the given quantity error (float): Estimated error of the relaxation time. """ if time.shape != value.shape: raise RuntimeError( "Time and value have different shapes. " "time: {time.shape}, values: {value.shape}" ) exp_value = 1 / np.exp(1) mask = np.isfinite(value) time = time[mask] value = value[mask] if np.any(~np.isfinite(value)): raise ValueError("There are non-finite values present in `value`.") fit_region = np.logical_and( (exp_value - value_width / 2) < value, (exp_value + value_width / 2) > value ) logger.debug("Num elements: %d", np.sum(fit_region)) # The number of values has to be greater than the number of fit parameters. if np.sum(fit_region) <= 3: return Result(np.nan, np.nan) zero_est = time[np.argmin(np.abs(value - exp_value))] try: p0 = (1.0, 1 / zero_est) except (ZeroDivisionError, FloatingPointError) as err: logger.warning("Handled exception in estimating zero\n%s", err) p0 = (1.0, 0.0) if sigma is not None: sigma = sigma[fit_region] try: popt, pcov = curve_fit( _exponential_decay, time[fit_region], value[fit_region], p0=p0, sigma=sigma ) except (RuntimeError, FloatingPointError) as err: logger.warning("Exception in fitting curve, returning Nan\n%s", err) return Result(np.nan, np.nan) try: perr = 2 * np.sqrt(np.diag(pcov)) except FloatingPointError as err: logger.exception("%s", err) return Result(np.nan, np.nan) logger.debug("Fit Parameters: %s", popt) def find_root(a, b): return newton( _exponential_decay, args=(a, b, -exp_value), x0=zero_est, fprime=_ddx_exponential_decay, maxiter=20, tol=1e-4, ) try: val_mean: float = find_root(*popt) try: val_min: float = find_root(*(popt - perr)) val_max: float = find_root(*(popt + perr)) except FloatingPointError as err: logger.warning("Handled Exception in calculating Error bars\n%s", err) val_min = 0 val_max = val_mean - val_min if val_mean > 0: return Result(val_mean, val_max - val_min) logger.warning("Rate is less than 0, returning Nan") return Result(np.nan, np.nan) except RuntimeError as err: logger.warning("Failed to converge on value with\n%s\nReturning NaN", err) return Result(np.nan, np.nan)
[docs]def max_time_relaxation(time: np.ndarray, value: np.ndarray) -> Result: """Time at which the maximum value is recorded. Args: time: The time index value: The value at each of the time indices Returns: float: The time at which the maximum value occurs. float: Estimate of the error of the time """ if time.shape != value.shape: raise RuntimeError( "Time and value have different shapes. " "time: {time.shape}, values: {value.shape}" ) try: max_val_index = np.nanargmax(value) except ValueError as err: logger.exception("%s", err) return Result(np.nan, np.nan) if max_val_index == len(value) - 1: error = time[max_val_index] - time[max_val_index - 1] elif max_val_index == 0: error = time[max_val_index + 1] - time[max_val_index] else: error = (time[max_val_index + 1] - time[max_val_index - 1]) / 2 return Result(time[max_val_index], error)
[docs]def max_value_relaxation(time: np.ndarray, value: np.ndarray) -> Result: """Maximum value recorded. Args: time: The time index value: The value at each of the time indices Returns: float: The value at which the maximum value occurs. float: Estimate of the error in the maximum value. """ if time.shape != value.shape: raise RuntimeError( "Time and value have different shapes. " "time: {time.shape}, values: {value.shape}" ) try: max_val_index = np.nanargmax(value) except ValueError as err: logger.exception("%s", err) return Result(np.nan, np.nan) if max_val_index == len(value) - 1: error = value[max_val_index] - value[max_val_index - 1] elif max_val_index == 0: error = value[max_val_index] - value[max_val_index + 1] else: error = ( (value[max_val_index] - value[max_val_index - 1]) + (value[max_val_index] - value[max_val_index + 1]) ) / 2 return Result(value[max_val_index], error)
[docs]def translate_relaxation(quantity: str) -> str: """Convert names of dynamic quantities to their relaxations. Args: quantity: The name of the quantity to convert the name of. Returns: The translated name. """ translation = { "alpha": "max_alpha_time", "gamma": "max_gamma_time", "com_struct": "tau_F", "msd": "diffusion_constant", "rot1": "tau_R1", "rot2": "tau_R2", "struct": "tau_S", } return translation.get(quantity, quantity)
[docs]def compute_relaxation_value( timesteps: np.ndarray, values: np.ndarray, relax_type: str ) -> Result: """Compute a single representative value for each dynamic quantity. Args: timesteps: The timestep for each value of the relaxation. values: The values of the relaxation quantity for each time interval. relax_type: A string describing the relaxation. Returns: The representative relaxation time for a quantity. There are some special values of the relaxation which are treated in a special way. The main one of these is the "msd", for which the relaxation is fitted to a straight line. The "struct_msd" relaxation, is a threshold_relaxation, with the time required to pass the threshold of 0.16. The other relaxations which are treated separately are the "alpha" and "gamma" relaxations, where the relaxation time is the maximum of these functions. All other relaxations are assumed to have the behaviour of exponential decay, with the representative time being how long it takes to decay to the value 1/e. """ if timesteps.shape != values.shape: raise RuntimeError( "Timesteps and values have different shapes. " "timesteps: {timesteps.shape}, values: {values.shape}" ) if relax_type in ["msd"]: return diffusion_constant(timesteps, values, dimensions=2) if relax_type in ["msr"]: return diffusion_constant(timesteps, values, dimensions=1) if relax_type in ["struct_msd"]: return threshold_relaxation(timesteps, values, threshold=0.16, decay=False) if relax_type in ["alpha", "gamma"]: return max_time_relaxation(timesteps, values) return threshold_relaxation(timesteps, values)
[docs]def series_relaxation_value(series: pandas.Series) -> float: """Calculate the relaxation of a pandas Series. When a `pandas.Series` object, which has an index being the timesteps, and the name of the series being the dynamic quantity, this function provides a simple method of calculating the relaxation aggregation. In particular this function is useful to use with the aggregate function. Args: series: The series containing the relaxation quantities Returns: The calculated value of the relaxation. .. note: This function will discard the error in the relaxation calculation for simplicity in working with the resulting DataFrame. """ if series.index.values.shape != series.values.shape: raise RuntimeError( "Index and values have different shapes." f"index: {series.index.value.shape}, values: {series.value.shape}" ) for level in ["temperature", "pressure"]: if level in series.index.names: series.reset_index(level=level, drop=True, inplace=True) result = compute_relaxation_value(series.index.values, series.values, series.name) return result.mean
[docs]def compute_relaxations(infile) -> None: """Summary time value for the dynamic quantities. This computes the characteristic timescale of the dynamic quantities which have been calculated and are present in INFILE. The INFILE is a path to the pre-computed dynamic quantities and needs to be in the HDF5 format with either the '.hdf5' or '.h5' extension. The output is written to the table 'relaxations' in INFILE. """ infile = Path(infile) # Check is actually an HDF5 file try: with tables.open_file(str(infile)): pass except tables.HDF5ExtError: raise ValueError("The argument 'infile' requires an hdf5 input file.") # Check input file contains the tables required with tables.open_file(str(infile)) as src: if "/dynamics" not in src: raise KeyError( "Table 'dynamics' not found in input file," " try rerunning `sdanalysis comp_dynamics`." ) if "/molecular_relaxations" not in src: raise KeyError( f"Table 'molecular_relaxations' not found in input file," " try rerunning `sdanalysis comp_dynamics`." ) with pandas.HDFStore(infile) as src: df_dyn = src.get("dynamics") logger.debug(df_dyn.columns) # Remove columns with no relaxation value to calculate extra_columns = ["mean_displacement", "mean_rotation", "mfd", "overlap"] for col in extra_columns: if col in df_dyn.columns: df_dyn.drop(columns=col, inplace=True) relaxations = df_dyn.groupby(["keyframe", "temperature", "pressure"]).agg( series_relaxation_value ) logger.info("Shape of df_dyn relaxations is %s", df_dyn.shape) with pandas.HDFStore(infile) as src: df_mol_input = src.get("molecular_relaxations") logger.info("Shape of df_mol_input relaxations is %s", df_mol_input.shape) df_mol = compute_molecular_relaxations( pandas.read_hdf(infile, "molecular_relaxations") ) logger.info("Shape of df_mol relaxations is %s", df_mol.shape) df_all = df_mol.join( relaxations, on=["keyframe", "temperature", "pressure"], lsuffix="mol" ).reset_index() logger.info("Shape of all relaxations is %s", df_all.shape) if "temperature" not in df_all.columns: raise RuntimeError( "Temperature not in columns, something has gone really wrong." ) if "pressure" not in df_all.columns: raise RuntimeError("Pressure not in columns, something has gone really wrong.") df_all.to_hdf(infile, "relaxations")
[docs]def compute_molecular_relaxations(df: pandas.DataFrame) -> pandas.DataFrame: if "temperature" not in df.columns: raise ValueError("The column 'temperature' is required") if "pressure" not in df.columns: raise ValueError("The column 'pressure' is required") if "keyframe" not in df.columns: raise ValueError("The column 'keyframe' is required") logger.debug("Initial molecular shape: %s", df.shape) df.replace(2 ** 32 - 1, np.nan, inplace=True) # Initial frames with any NaN value are excluded from analysis. It is assumed they # didn't run for a long enough time. df = df.groupby(["keyframe", "temperature", "pressure"]).filter( lambda x: x.isna().sum().sum() == 0 ) logger.debug("Filtered molecular shape: %s", df.shape) # Calculate statistics for each initial_frame df = df.groupby(["keyframe", "temperature", "pressure"]).agg(["mean", hmean]) logger.debug("Aggregated molecular shape: %s", df.shape) df.columns = ["_".join(f) for f in df.columns.tolist()] return df