Source code for psctb.analyse._thermokin

from __future__ import division, print_function
from __future__ import absolute_import
from __future__ import unicode_literals

from os import path

from numpy import log10, array, NaN, nanmin, nanmax, savetxt, hstack
from pysces import ModelMap, ParScanner, Scanner
from sympy import sympify, diff, Symbol

from ._thermokin_file_tools import get_subs_dict, get_reqn_path, \
    get_all_terms, get_term_types_from_raw_data, create_reqn_data, \
    write_reqn_file, create_gamma_keq_reqn_data, term_to_file
from ..latextools import LatexExpr
from ..modeltools import make_path, get_file_path
from ..utils.misc import do_safe_state, get_value, silence_print, print_f, \
    is_number, stringify, scanner_range_setup, DotDict, formatter_factory, \
    find_min, find_max
from ..utils.plotting import Data2D

__author__ = 'carl'
__all__ = ['ThermoKin']


def mult(lst):
    """
    Multiplies values of a list with each other and returns the result.

    Parameters
    ----------
    lst : list of numbers

    Returns
    -------
    number
        Same type as numbers in ``lst``.

    """
    ans = 1
    for each in lst:
        ans *= each
    return ans


def get_repr_latex(obj):
    """
    Creates the string that will be returned by the ``__repr_latex__``
    method of any of objects of ``Thermokin``. The value of the
    ``value`` field is used to dermine the float format.

    Parameters
    ----------
    obj : RateTerm, Term or RateEqn

    Returns
    -------
    str

    """
    if obj.value == 0:
        fmt = '$%s = %s = %.3f$'
    elif abs(obj.value) < 0.001 or abs(obj.value) > 10000:
        fmt = '$%s = %s = %.3e$'
    else:
        fmt = '$%s = %s = %.3f$'
    return fmt % (obj.latex_name,
                  obj.latex_expression,
                  obj.value)


@silence_print
def silent_state(mod):
    mod.doMca()
    mod.doState()


[docs]class ThermoKin(object): def __init__(self, mod, path_to_reqn_file=None, overwrite=False, warnings=True, ltxe=None): super(ThermoKin, self).__init__() self.mod = mod silent_state(mod) self._analysis_method = 'thermokin' self._working_dir = make_path(self.mod, self._analysis_method) if ltxe: self._ltxe = ltxe else: self._ltxe = LatexExpr(mod) if path_to_reqn_file: self._path_to = path_to_reqn_file else: self._path_to = get_reqn_path(self.mod) self._do_auto_actions(overwrite, warnings) self._raw_data, self._add_raw_data = get_all_terms(self._path_to) self._do_gamma_keq(overwrite, warnings) term_types = get_term_types_from_raw_data(self._raw_data).union( get_term_types_from_raw_data(self._add_raw_data)) self._ltxe.add_term_types(term_types) self._populate_object() self._populate_ec_results() def _do_gamma_keq(self, overwrite, warnings): if overwrite: return None gamma_keq_todo = [] add_raw_data = self._add_raw_data for reaction in self._raw_data.keys(): if not add_raw_data.get(reaction) or not add_raw_data.get( reaction).get('gamma_keq'): gamma_keq_todo.append(reaction) if len(gamma_keq_todo) != 0: reaction_printout = ', '.join(gamma_keq_todo[:-1]) + ' or ' + \ gamma_keq_todo[-1] print_f('%s does not contain Gamma/Keq terms for %s:' % ( self._path_to, reaction_printout), warnings) gamma_keq_data, messages = create_gamma_keq_reqn_data(self.mod) for required in gamma_keq_todo: print_f('{:10.10}: {}'.format(required, messages[required]), warnings) if required not in add_raw_data: add_raw_data[required] = {} add_raw_data[required]['gamma_keq'] = gamma_keq_data[required] def _do_auto_actions(self, overwrite, warnings): condition_1 = path.exists(self._path_to) and overwrite condition_2 = not path.exists(self._path_to) if condition_1: print_f( 'The file %s will be overwritten with automatically generated file.' % self._path_to, warnings) elif condition_2: print_f('A new file will be created at "%s".' % self._path_to, warnings) if condition_1 or condition_2: ma_terms, vc_binding_terms, gamma_keq_terms, messages = create_reqn_data( self.mod) for k, v in messages.items(): print_f('{:10.10}: {}'.format(k, v), warnings) write_reqn_file(self._path_to, self.mod.ModelFile, ma_terms, vc_binding_terms, gamma_keq_terms, messages) def _populate_object(self): self.reaction_results = DotDict() self.reaction_results._make_repr('"$" + v.latex_name + "$"', 'v.value', formatter_factory()) for reaction, terms_dict in self._raw_data.items(): additional_terms = self._add_raw_data.get(reaction) reqn_obj = RateEqn(self.mod, reaction, terms_dict, self._ltxe, additional_terms) setattr(self, 'J_' + reaction, reqn_obj) self.reaction_results['J_' + reaction] = reqn_obj for term in reqn_obj.terms.values(): self.reaction_results[term.name] = term def _populate_ec_results(self): self.ec_results = DotDict() self.ec_results._make_repr('"$" + v.latex_name + "$"', 'v.value', formatter_factory()) for rate_eqn in self.reaction_results.values(): self.ec_results.update(rate_eqn.ec_results)
[docs] def save_results(self, file_name=None, separator=',',fmt='%.9f'): file_name = get_file_path(working_dir=self._working_dir, internal_filename='tk_summary', fmt='csv', file_name=file_name, ) values = [] max_len = 0 for reaction_name in sorted(self.reaction_results.keys()): cols = (reaction_name, self.reaction_results[reaction_name].value, self.reaction_results[reaction_name].latex_name, self.reaction_results[reaction_name].latex_expression) values.append(cols) if len(cols[3]) > max_len: max_len = len(cols[3]) for elasticity_name in sorted( [ec for ec in list(self.ec_results.keys()) if ec.startswith('ec')]): if self.ec_results[elasticity_name].expression != 0: related_ecs = sorted([ec for ec in list(self.ec_results.keys()) if elasticity_name in ec]) for related_ec_name in related_ecs: cols = (related_ec_name, self.ec_results[related_ec_name].value, self.ec_results[related_ec_name].latex_name, self.ec_results[related_ec_name].latex_expression) values.append(cols) if len(cols[3]) > max_len: max_len = len(cols[3]) str_fmt = 'U%s' % max_len head = ['name', 'value', 'latex_name', 'latex_expression'] X = array(values, dtype=[(head[0], str_fmt), (head[1], 'float'), (head[2], str_fmt), (head[3], str_fmt)]) try: savetxt(fname=file_name, X=X, header=separator.join(head), delimiter=separator, fmt=['%s', fmt, '%s', '%s'], ) except IOError as e: print(e.strerror)
class RateEqn(object): def __init__(self, mod, name, term_dict, ltxe, additional_terms=None): super(RateEqn, self).__init__() self.mod = mod self.terms = DotDict() self.terms._make_repr('"$" + v.latex_name + "$"', 'v.value', formatter_factory()) self._unfac_expression = 1 self.name = 'J_' + name self._rname = name self._ltxe = ltxe for val in term_dict.values(): self._unfac_expression = self._unfac_expression * (sympify(val)) for term_name, expression in term_dict.items(): term = RateTerm(parent=self, mod=self.mod, name='J_%s_%s' % (self._rname, term_name), rname=term_name, expression=expression, ltxe=self._ltxe) setattr(self, term_name, term) self.terms[term_name] = term if additional_terms: for term_name, expression in additional_terms.items(): term = AdditionalRateTerm(parent=self, mod=self.mod, name='J_%s_%s' % ( self._rname, term_name), rname=term_name, expression=expression, ltxe=self._ltxe) setattr(self, term_name, term) self.terms[term_name] = term self._value = None self._str_expression_ = None self._expression = None self._latex_expression = None self._latex_name = None self.ec_results = DotDict() self.ec_results._make_repr('"$" + v.latex_name + "$"', 'v.value', formatter_factory()) self._populate_ec_results() def _populate_ec_results(self): expression_symbols = self._unfac_expression.atoms(Symbol) for each in expression_symbols: each = sympify(each) ec = diff(self._unfac_expression, each) * \ (each / self._unfac_expression) ec_name = 'ec%s_%s' % (self._rname, each) self.ec_results[ec_name] = Term(self, self.mod, ec_name, self._rname, ec, self._ltxe) for each in self.terms.values(): self.ec_results.update(each.ec_results) def _repr_latex_(self): return get_repr_latex(self) @property def _str_expression(self): if not self._str_expression_: self._str_expression_ = str(self._unfac_expression) return self._str_expression_ @property def expression(self): if not self._expression: self._expression = self._unfac_expression.factor() return self._expression @property def value(self): self._calc_value() return self._value @property def latex_name(self): if not self._latex_name: self._latex_name = self._ltxe.expression_to_latex(self.name) return self._latex_name @property def latex_expression(self): if not self._latex_expression: self._latex_expression = self._ltxe.expression_to_latex( self.expression, mul_symbol='dot') return self._latex_expression def _calc_value(self): subs_dict = get_subs_dict(self._unfac_expression, self.mod) for each in self.terms.values(): if type(each) is not AdditionalRateTerm: each._calc_value(subs_dict) self._value = mult([each._value for each in self.terms.values() if type(each) is not AdditionalRateTerm]) def _valscan_x(self, parameter, scan_range): scan_res = [list() for _ in range(len(list(self.terms.values())) + 2)] scan_res[0] = scan_range for parvalue in scan_range: state_valid = do_safe_state(self.mod, parameter, parvalue) for i, term in enumerate(self.terms.values()): if state_valid: scan_res[i + 1].append(term.value) else: scan_res[i + 1].append(NaN) if state_valid: scan_res[i + 2].append(self.value) else: scan_res[i + 2].append(NaN) return scan_res def _valscan(self, parameter, scan_range, par_scan=False, par_engine='multiproc'): # choose between parscanner or scanner if par_scan: # This is experimental scanner = ParScanner(self.mod, par_engine) else: scanner = Scanner(self.mod) scanner.quietRun = True # parameter scan setup and execution start, end, points, log = scanner_range_setup(scan_range) scanner.addScanParameter(parameter, start=start, end=end, points=points, log=log) needed_symbols = [parameter] + \ stringify(list(self.expression.atoms(Symbol))) scanner.addUserOutput(*needed_symbols) scanner.Run() # getting term/reaction values via substitution subs_dict = {} for i, symbol in enumerate(scanner.UserOutputList): subs_dict[symbol] = scanner.UserOutputResults[:, i] term_expressions = [term.expression for term in list(self.terms.values())]\ + [self.expression] term_str_expressions = stringify(term_expressions) parameter_values = subs_dict[parameter].reshape(points, 1) scan_res = [] # collecting results in an array for expr in term_str_expressions: scan_res.append(get_value(expr, subs_dict)) scan_res = array(scan_res).transpose() scan_res = hstack([parameter_values, scan_res]) return scan_res def _ecscan(self, parameter, scan_range, par_scan=False, par_engine='multiproc'): # choose between parscanner or scanner if par_scan: # This is experimental scanner = ParScanner(self.mod, par_engine) else: scanner = Scanner(self.mod) scanner.quietRun = True # parameter scan setup and execution start, end, points, log = scanner_range_setup(scan_range) scanner.addScanParameter(parameter, start=start, end=end, points=points, log=log) needed_symbols = [parameter] + \ stringify(list(self.expression.atoms(Symbol))) scanner.addUserOutput(*needed_symbols) scanner.Run() # getting term/reaction values via substitution subs_dict = {} for i, symbol in enumerate(scanner.UserOutputList): subs_dict[symbol] = scanner.UserOutputResults[:, i] # we include all ec_terms that are not zero (even though they are # included in the main dict) ec_term_expressions = [ec_term.expression for ec_term in list(self.ec_results.values()) if ec_term.expression != 0 and not ec_term.name.endswith('gamma_keq')] ec_term_str_expressions = stringify(ec_term_expressions) parameter_values = subs_dict[parameter].reshape(points, 1) scan_res = [] # collecting results in an array for expr in ec_term_str_expressions: val = get_value(expr, subs_dict) scan_res.append(val) scan_res = array(scan_res).transpose() scan_res = hstack([parameter_values, scan_res]) return scan_res def _ecscan_x(self, parameter, scan_range): mca_objects = [ec_term for ec_term in list(self.ec_results.values()) if ec_term.expression != 0 and not ec_term.name.endswith( 'gamma_keq')] scan_res = [list() for _ in range(len(mca_objects) + 1)] scan_res[0] = scan_range for parvalue in scan_range: state_valid = do_safe_state(self.mod, parameter, parvalue, type='mca') for i, term in enumerate(mca_objects): if state_valid: scan_res[i + 1].append(term.value) else: scan_res[i + 1].append(NaN) return scan_res def do_par_scan(self, parameter, scan_range, scan_type='value', init_return=True, par_scan=False, par_engine='multiproc'): try: assert scan_type in ['elasticity', 'value'], 'scan_type must be one\ of "value" or "elasticity".' except AssertionError as ae: print(ae) init = getattr(self.mod, parameter) if scan_type == 'elasticity': mca_objects = [ec_term for ec_term in list(self.ec_results.values()) if ec_term.expression != 0 and not ec_term.name.endswith('gamma_keq')] additional_cat_classes = { 'All Coefficients': ['Term Elasticities']} additional_cats = { 'Term Elasticities': [ec_term.name for ec_term in mca_objects if ec_term.name.startswith('p')]} column_names = [parameter] + \ [ec_term.name for ec_term in mca_objects] y_label = 'Elasticity Coefficient' scan_res = self._ecscan(parameter, scan_range, par_scan, par_engine) data_array = scan_res # ylim = [nanmin(data_array[:, 1:]), # nanmax(data_array[:, 1:]) * 1.1] yscale = 'linear' category_manifest = {pec: True for pec in additional_cats['Term Elasticities']} category_manifest['Elasticity Coefficients'] = True category_manifest['Term Elasticities'] = True elif scan_type == 'value': additional_cat_classes = {'All Fluxes/Reactions/Species': ['Term Rates']} term_names = [term.name for term in list(self.terms.values())] additional_cats = {'Term Rates': term_names} column_names = [parameter] + term_names + [self.name] y_label = 'Reaction/Term rate' scan_res = self._valscan(parameter, scan_range, par_scan, par_engine) data_array = scan_res # ylim = [nanmin(data_array[:, 1:]), # nanmax(data_array[:, 1:]) * 1.1] yscale = 'log' category_manifest = {'Flux Rates': True, 'Term Rates': True} if init_return: self.mod.SetQuiet() setattr(self.mod, parameter, init) self.mod.doMca() self.mod.SetLoud() mm = ModelMap(self.mod) species = mm.hasSpecies() if parameter in species: x_label = '[%s]' % parameter.replace('_', ' ') else: x_label = parameter xscale = 'log' if scanner_range_setup(scan_range)[3] else 'linear' ax_properties = {'ylabel': y_label, 'xlabel': x_label, 'xscale': xscale, 'yscale': yscale, } data = Data2D(mod=self.mod, column_names=column_names, data_array=data_array, ltxe=self._ltxe, analysis_method='thermokin', ax_properties=ax_properties, additional_cat_classes=additional_cat_classes, additional_cats=additional_cats, category_manifest=category_manifest,) if scan_type == 'elasticity': ec_names = [ec_term.name for ec_term in mca_objects if ec_term.name.startswith('ec')] for line in data._lines: for ec_name in ec_names: condition1 = line.name != ec_name condition2 = self.ec_results[line.name]._rname == ec_name if condition1 and condition2: line.categories.append(ec_name) return data def __add__(self, other): return generic_term_operation(self, other, '+') def __mul__(self, other): return generic_term_operation(self, other, '*') def __sub__(self, other): return generic_term_operation(self, other, '-') def __div__(self, other): return generic_term_operation(self, other, '/') def __radd__(self, other): return generic_term_operation(self, other, '+') def __rmul__(self, other): return generic_term_operation(self, other, '*') def __rsub__(self, other): return generic_term_operation(self, other, 'rsub') def __rdiv__(self, other): return generic_term_operation(self, other, 'rdiv') def __neg__(self): return AdditionalTerm(self, self.mod, '-' + self.name, '-' + self._rname, (-self._unfac_expression), self._ltxe, '-' + self.name) def __pow__(self, power, modulo=None): return AdditionalTerm(self, self.mod, self.name + '**' + str(power), self._rname + '**' + str(power), self._unfac_expression ** power, self._ltxe, self.name + '**' + str(power)) class Term(object): def __init__(self, parent, mod, name, rname, expression, ltxe): super(Term, self).__init__() self.name = name self._rname = rname self._unfac_expression = sympify(expression) self._parent = parent self.mod = mod self._ltxe = ltxe # properties self._expression = None self._str_expression_ = None self._value = None self._latex_name = None self._latex_expression = None def _repr_latex_(self): return get_repr_latex(self) @property def _str_expression(self): if not self._str_expression_: self._str_expression_ = str(self._unfac_expression) return self._str_expression_ @property def expression(self): if not self._expression: self._expression = self._unfac_expression.factor() return self._expression @property def value(self): self._calc_value() return self._value @property def latex_name(self): if not self._latex_name: self._latex_name = self._ltxe.expression_to_latex(self.name) return self._latex_name @property def latex_expression(self): if not self._latex_expression: self._latex_expression = self._ltxe.expression_to_latex( self.expression, mul_symbol='dot') return self._latex_expression def _calc_value(self, subs_dict=None): if not subs_dict: subs_dict = get_subs_dict(self._unfac_expression, self.mod) self._value = get_value(self._str_expression, subs_dict) def __add__(self, other): return generic_term_operation(self, other, '+') def __mul__(self, other): return generic_term_operation(self, other, '*') def __sub__(self, other): return generic_term_operation(self, other, '-') def __div__(self, other): return generic_term_operation(self, other, '/') def __radd__(self, other): return generic_term_operation(self, other, '+') def __rmul__(self, other): return generic_term_operation(self, other, '*') def __rsub__(self, other): return generic_term_operation(self, other, 'rsub') def __rdiv__(self, other): return generic_term_operation(self, other, 'rdiv') def __neg__(self): return AdditionalTerm(self._parent, self.mod, '-' + self.name, '-' + self._rname, (-self._unfac_expression), self._ltxe, '-' + self.name) def __pow__(self, power, modulo=None): return AdditionalTerm(self._parent, self.mod, self.name + '**' + str(power), self._rname + '**' + str(power), self._unfac_expression ** power, self._ltxe, self.name + '**' + str(power)) class RateTerm(Term): def __init__(self, parent, mod, name, rname, expression, ltxe): super(RateTerm, self).__init__(parent, mod, name, rname, expression, ltxe) self.ec_results = DotDict() self.ec_results._make_repr('"$" + v.latex_name + "$"', 'v.value', formatter_factory()) self._populate_ec_results() self._percentage = None @property def percentage(self): per = (log10(self.value) / log10(self._parent.value)) * 100 return per def _populate_ec_results(self): expression_symbols = self._parent._unfac_expression.atoms(Symbol) expression_symbols.update(self._unfac_expression.atoms(Symbol)) for each in expression_symbols: each = sympify(each) ec_name = 'ec%s_%s' % (self._parent._rname, each) pec_name = 'p%s_%s' % (ec_name, self._rname) ec = diff(self._unfac_expression, each) * \ (each / self._unfac_expression) self.ec_results[pec_name] = Term(self._parent, self.mod, pec_name, ec_name, ec, self._ltxe) class AdditionalRateTerm(RateTerm): @property def percentage(self): return 0.0 def append_to_file(self, file_name, term_name=None, parent=None): if not parent: parent = self._parent._rname if not term_name: term_name = self._rname term_to_file(file_name, self._unfac_expression, parent, term_name) class AdditionalTerm(Term): def __init__(self, parent, mod, name, rname, expression, ltxe, creation_operation): super(AdditionalTerm, self).__init__(parent, mod, name, rname, expression, ltxe) self.creation_operation = creation_operation def simplify_expression(self): self._expression = self._unfac_expression.factor() self._latex_expression = None def get_elasticity(self, var_par, term_name=None): if not term_name: term_name = self.name var_par = sympify(var_par) ec = diff(self._unfac_expression, var_par) * \ (var_par / self._unfac_expression) ec_name = 'ec_%s_%s' % (term_name, var_par) ec_term = Term(self, self.mod, ec_name, ec_name, ec, self._ltxe) ec_term._latex_name = '\\varepsilon^{%s}_{%s}' % ( term_name.replace('_', ''), str(var_par).replace('_', '')) return ec_term def append_to_file(self, file_name, term_name=None, parent=None): if not parent and self._parent: parent = self._parent._rname if not term_name and self.name != 'new_term': term_name = self.name term_to_file(file_name, self._unfac_expression, parent, term_name) @property def expression(self): if not self._expression: self._expression = self._unfac_expression return self._expression def generic_term_operation(self, other, operator, parent=None, name=None, rname=None): def get_parent(self): if type(self) is RateEqn: parent = self else: parent = self._parent return parent if operator == 'rsub': self = -self operator = '+' if operator == 'rdiv': self = AdditionalTerm(get_parent(self), self.mod, self.name, self._rname, (1 / self._unfac_expression), self._ltxe, '1/' + self.name) operator = '*' if is_number(other): other = AdditionalTerm(get_parent(self), self.mod, str(other), str(other), sympify(other), self._ltxe, str(other)) # TODO this type check is a hack - no idea how to check specifically for # sympy expressions elif 'sympy' in str(type(other)): other = AdditionalTerm(get_parent(self), self.mod, str(other), str(other), other, self._ltxe, str(other)) mod = self.mod operated_on = [] for term in (self, other): if type(term) is AdditionalTerm: operated_on.append(term.creation_operation) else: operated_on.append(term.name) if not name: name = 'new_term' if not rname: rname = 'new_term' if not parent: if hasattr(self, '_parent') and hasattr(other, '_parent'): parent = self._parent elif type(self) is RateEqn and type(other) is not RateEqn: parent = self elif type(other) is RateEqn and type(self) is not RateEqn: parent = other creation_operation = sympify( '%s %s %s' % (operated_on[0], operator, operated_on[1])) expression = sympify('(%s) %s (%s)' % ( self._str_expression, operator, other._str_expression)) ltxe = self._ltxe return AdditionalTerm(parent, mod, name, rname, expression, ltxe, creation_operation)