from __future__ import division, print_function
from __future__ import absolute_import
from __future__ import unicode_literals
from os import path
from colorsys import hsv_to_rgb, rgb_to_hsv
from collections import OrderedDict
from random import shuffle
import numpy
from pysces.PyscesModelMap import ModelMap
from pysces import Scanner
import pysces
from matplotlib.pyplot import get_cmap
from .. import modeltools
from ..latextools import LatexExpr
from ..utils.plotting import ScanFig, LineData, Data2D
from ..utils.misc import silence_print
from ..utils.misc import DotDict
from ..utils.misc import formatter_factory
exportLAWH = silence_print(pysces.write.exportLabelledArrayWithHeader)
__all__ = ['RateChar']
def strip_nan_from_scan(array_like):
# this function assumes that column
# zero contains valid data (the scan input)
t_f = list(numpy.isnan(array_like[:, 1]))
start = t_f.index(False)
end = len(t_f) - t_f[::-1].index(False)
return array_like[start:end, :]
[docs]class RateChar(object):
def __init__(self, mod, min_concrange_factor=100,
max_concrange_factor=100,
scan_points=256,
auto_load=False):
super(RateChar, self).__init__()
self.mod = mod
self.mod.SetQuiet()
self._model_map = ModelMap(mod)
self.mod.doState()
self._analysis_method = 'ratechar'
self._working_dir = modeltools.make_path(self.mod,
self._analysis_method)
self._min_concrange_factor = min_concrange_factor
self._max_concrange_factor = max_concrange_factor
self._scan_points = scan_points
self._ltxe = LatexExpr(self.mod)
for species in self.mod.species:
setattr(self, species, None)
if auto_load:
self.load_session()
[docs] def do_ratechar(self, fixed='all',
scan_min=None,
scan_max=None,
min_concrange_factor=None,
max_concrange_factor=None,
scan_points=None,
solver=0,
auto_save=False):
# this function wraps _do_scan functionality in a user friendly bubble
if fixed == 'all':
to_scan = self.mod.species
elif type(fixed) is list or type(fixed) is tuple:
for each in fixed:
assert each in self.mod.species, 'Invalid species'
to_scan = fixed
else:
assert fixed in self.mod.species, 'Invalid species'
to_scan = [fixed]
for i in to_scan:
each = str(i) # fix for Python 2 compatibility
fixed_mod, fixed_ss = self._fix_at_ss(each)
scan_start = self._min_max_chooser(fixed_ss,
scan_min,
min_concrange_factor,
'min')
scan_end = self._min_max_chooser(fixed_ss,
scan_max,
max_concrange_factor,
'max')
# here there could be a situation where a scan_min > scan_max
# I wonder what will happen....
if not scan_points:
scan_points = self._scan_points
column_names, results = self._do_scan(fixed_mod,
each,
scan_start,
scan_end,
scan_points)
cleaned_results = strip_nan_from_scan(results)
rcd = RateCharData(fixed_ss,
fixed_mod,
self.mod,
column_names,
cleaned_results,
self._model_map,
self._ltxe)
setattr(self, each, rcd)
if auto_save:
self.save_session()
def _min_max_chooser(self, ss, point, concrange, min_max):
# chooses a minimum or maximum point based
# on the information given by a user
# ie if a specific min/max point is given - use that
# if only concentration range is given -use that
# if nothing is given - use the defualt conc_range_factor
# pretty simple stuff
if point:
the_point = point
if not point and concrange:
if min_max == 'min':
the_point = ss / concrange
elif min_max == 'max':
the_point = ss * concrange
if not point and not concrange:
if min_max == 'min':
the_point = ss / self._min_concrange_factor
elif min_max == 'max':
the_point = ss * self._max_concrange_factor
return the_point
@silence_print
def _do_scan(self,
fixed_mod,
fixed,
scan_min,
scan_max,
scan_points,
solver=0):
# do scan is a simplified interface to pysces.Scanner
# more intuitive than Scan1 (functional vs OO??)
# returns the names of the scanned blocks together with
# the results of the scan
assert solver in (0, 1, 2), 'Solver mode can only be one of 0, 1 or 2'
fixed_mod.mode_solver = solver
demand_blocks = [
'J_' + r for r in getattr(self._model_map, fixed).isSubstrateOf()]
demand_blocks = [str(i) for i in demand_blocks]
supply_blocks = [
'J_' + r for r in getattr(self._model_map, fixed).isProductOf()]
supply_blocks = [str(i) for i in supply_blocks]
user_output = [fixed] + demand_blocks + supply_blocks
scanner = Scanner(fixed_mod)
scanner.quietRun = True
scanner.addScanParameter(
fixed, scan_min, scan_max, scan_points, log=True)
scanner.addUserOutput(*user_output)
scanner.Run()
return user_output, scanner.UserOutputResults
@silence_print
def _fix_at_ss(self, fixed):
# fixes the metabolite at the steady_state
# (calls psctb.modeltools.fix_metabolite)
# and returns both the ss value and the fixed model
self.mod.doState()
fixed_ss = getattr(self.mod, fixed + '_ss')
fixed_mod = modeltools.fix_metabolite(self.mod, fixed)
fixed_mod.SetQuiet()
# i don't like this approach at all, too many possible unintended side
# effects
# setattr(fixed_mod, fixed, fixed_ss)
# setattr(fixed_mod, 'fixed', fixed)
# setattr(fixed_mod, 'fixed_ss', fixed_ss)
fixed_mod.doState()
return fixed_mod, fixed_ss
[docs] def save_session(self, file_name=None):
file_name = modeltools.get_file_path(working_dir=self._working_dir,
internal_filename='save_data',
fmt='npz',
file_name=file_name,
write_suffix=False)
to_save = {}
for species in self.mod.species:
species_object = getattr(self, species)
try:
column_array = numpy.array(species_object._column_names)
scan_results = species_object._scan_results
to_save['col_{0}'.format(species)] = column_array
to_save['res_{0}'.format(species)] = scan_results
except:
pass
numpy.savez(file_name, **to_save)
[docs] def save_results(self, folder=None, separator=',',format='%f'):
base_folder = folder
for species in self.mod.species:
if folder:
folder = path.join(base_folder, species)
getattr(self, species).save_all_results(folder=folder,
separator=separator)
[docs] def load_session(self, file_name=None):
file_name = modeltools.get_file_path(working_dir=self._working_dir,
internal_filename='save_data',
fmt='npz',
file_name=file_name,
write_suffix=False)
loaded_data = {}
try:
with numpy.load(file_name) as data_file:
for k, v in data_file.items():
loaded_data[k] = v
except IOError as e:
raise e
for species in self.mod.species:
try:
column_names = [str(each) for each in
list(loaded_data['col_{0}'.format(species)])]
scan_results = loaded_data['res_{0}'.format(species)]
fixed_species = species
fixed_mod, fixed_ss = self._fix_at_ss(fixed_species)
rcd = RateCharData(fixed_ss=fixed_ss,
fixed_mod=fixed_mod,
basemod=self.mod, column_names=column_names,
scan_results=scan_results,
model_map=self._model_map, ltxe=self._ltxe)
setattr(self, fixed_species, rcd)
except:
pass
class RateCharData(object):
def __init__(self,
fixed_ss,
fixed_mod,
basemod,
column_names,
scan_results,
model_map,
ltxe):
super(RateCharData, self).__init__()
self.mod = fixed_mod
self.scan_results = DotDict()
self.mca_results = DotDict()
self._slope_range_factor = 3.0
self.scan_results['fixed'] = column_names[0]
self.scan_results['fixed_ss'] = fixed_ss
self.scan_results['scan_range'] = scan_results[:, 0]
self.scan_results['flux_names'] = column_names[1:]
self.scan_results['flux_data'] = scan_results[:, 1:]
self.scan_results['scan_points'] = len(self.scan_results.scan_range)
self.scan_results['flux_max'] = None
self.scan_results['flux_min'] = None
self.scan_results['scan_max'] = None
self.scan_results['scan_min'] = None
self.scan_results['ec_names'] = None
self.scan_results['ec_data'] = None
self.scan_results['rc_names'] = None
self.scan_results['rc_data'] = None
self.scan_results['prc_names'] = None
self.scan_results['prc_data'] = None
self._column_names = column_names
self._scan_results = scan_results
self._model_map = model_map
self._analysis_method = 'ratechar'
self._basemod = basemod
self._working_dir = modeltools.make_path(self._basemod,
self._analysis_method,
[self.scan_results.fixed])
self._ltxe = ltxe
self._color_dict_ = None
self._data_setup()
self.mca_results._ltxe = ltxe
self.mca_results._make_repr(
'"$" + self._ltxe.expression_to_latex(k) + "$"', 'v',
formatter_factory())
# del self.scan_results
# del self.mca_results
def _data_setup(self):
# reset value to do mcarc
setattr(self.mod, self.scan_results.fixed, self.scan_results.fixed_ss)
self.mod.doMcaRC()
self._make_attach_total_fluxes()
self._min_max_setup()
self._attach_fluxes_to_self()
self._make_all_coefficient_lines()
self._attach_all_coefficients_to_self()
self._make_all_summary()
self._make_all_line_data()
def _change_colour_order(self, order=None):
if not order:
order = list(self._color_dict_.keys())
shuffle(order)
self._color_dict_ = dict(list(zip(order, list(self._color_dict_.values()))))
self._make_all_line_data()
def _make_all_line_data(self):
self._make_flux_ld()
self._make_ec_ld()
self._make_rc_ld()
self._make_prc_ld()
self._make_total_flux_ld()
self._line_data_dict = OrderedDict()
self._line_data_dict.update(self._prc_ld_dict)
self._line_data_dict.update(self._flux_ld_dict)
self._line_data_dict.update(self._total_flux_ld_dict)
self._line_data_dict.update(self._ec_ld_dict)
self._line_data_dict.update(self._rc_ld_dict)
del self._flux_ld_dict
del self._ec_ld_dict
del self._rc_ld_dict
del self._prc_ld_dict
del self._total_flux_ld_dict
def _make_all_summary(self):
self._make_ec_summary()
self._make_cc_summary()
self._make_rc_summary()
self._make_prc_summary()
self.mca_results.update(self._ec_summary)
self.mca_results.update(self._cc_summary)
self.mca_results.update(self._rc_summary)
self.mca_results.update(self._prc_summary)
del self._ec_summary
del self._cc_summary
del self._rc_summary
del self._prc_summary
def _make_ec_summary(self):
ecs = {}
reagent_of = [each[2:] for each in self.scan_results.flux_names]
modifier_of = getattr(
self._model_map, self.scan_results.fixed).isModifierOf()
all_reactions = reagent_of + modifier_of
for reaction in all_reactions:
name = 'ec%s_%s' % (reaction, self.scan_results.fixed)
val = getattr(self.mod, name)
ecs[name] = val
self._ec_summary = ecs
def _make_rc_summary(self):
rcs = {}
for flux in self.scan_results.flux_names:
reaction = flux[2:]
name = '%s_%s' % (reaction, self.scan_results.fixed)
val = getattr(self.mod.rc, name)
name = 'rcJ' + name
rcs[name] = val
self._rc_summary = rcs
def _make_cc_summary(self):
ccs = {}
reagent_of = [each[2:] for each in self.scan_results.flux_names]
modifier_of = getattr(
self._model_map, self.scan_results.fixed).isModifierOf()
all_reactions = reagent_of + modifier_of
for flux_reaction in reagent_of:
for reaction in all_reactions:
name = 'ccJ%s_%s' % (flux_reaction, reaction)
val = getattr(self.mod, name)
ccs[name] = val
self._cc_summary = ccs
def _make_prc_summary(self):
prcs = {}
reagent_of = [each[2:] for each in self.scan_results.flux_names]
modifier_of = getattr(
self._model_map, self.scan_results.fixed).isModifierOf()
all_reactions = reagent_of + modifier_of
for flux_reaction in reagent_of:
for route_reaction in all_reactions:
ec = getattr(self.mod,
'ec%s_%s' % (
route_reaction, self.scan_results.fixed))
cc = getattr(self.mod,
'ccJ%s_%s' % (flux_reaction, route_reaction))
val = ec * cc
name = 'prcJ%s_%s_%s' % (flux_reaction,
self.scan_results.fixed,
route_reaction)
prcs[name] = val
self._prc_summary = prcs
def save_summary(self, file_name=None, separator=',',fmt='%f'):
file_name = modeltools.get_file_path(working_dir=self._working_dir,
internal_filename='mca_summary',
fmt='csv',
file_name=file_name, )
keys = list(self.mca_results.keys())
keys.sort()
values = numpy.array([self.mca_results[k]
for k in keys]).reshape(len(keys), 1)
try:
exportLAWH(values,
names=keys,
header=['Value'],
fname=file_name,
sep=separator,
format=fmt)
except IOError as e:
print(e.strerror)
def save_flux_results(self, file_name=None, separator=',',fmt='%f'):
file_name = modeltools.get_file_path(working_dir=self._working_dir,
internal_filename='flux_results',
fmt='csv',
file_name=file_name, )
scan_points = self.scan_results.scan_points
all_cols = numpy.hstack([
self._scan_results,
self.scan_results.total_supply.reshape(scan_points, 1),
self.scan_results.total_demand.reshape(scan_points, 1)])
column_names = self._column_names + ['Total Supply', 'Total Demand']
try:
exportLAWH(all_cols,
names=None,
header=column_names,
fname=file_name,
sep=separator,
format=fmt)
except IOError as e:
print(e.strerror)
def save_coefficient_results(self,
coefficient,
file_name=None,
separator=',',
folder=None,
fmt='%f'):
assert_message = 'coefficient must be one of "ec", "rc" or "prc"'
assert coefficient in ['rc', 'ec', 'prc'], assert_message
base_name = coefficient + '_results'
file_name = modeltools.get_file_path(working_dir=self._working_dir,
internal_filename=base_name,
fmt='csv',
file_name=file_name, )
results = getattr(self.scan_results, coefficient + '_data')
names = getattr(self.scan_results, coefficient + '_names')
new_names = []
for each in names:
new_names.append('x_vals')
new_names.append(each)
try:
exportLAWH(results,
names=None,
header=new_names,
fname=file_name,
sep=separator,
format=fmt)
except IOError as e:
print(e.strerror)
# TODO fix this method so that folder is a parameter only her
def save_all_results(self, folder=None, separator=',',fmt='%f'):
if not folder:
folder = self._working_dir
file_name = modeltools.get_file_path(working_dir=folder,
internal_filename='flux_results',
fmt='csv')
self.save_flux_results(separator=separator, file_name=file_name,fmt=fmt)
file_name = modeltools.get_file_path(working_dir=folder,
internal_filename='mca_summary',
fmt='csv')
self.save_summary(separator=separator, file_name=file_name, fmt=fmt)
for each in ['ec', 'rc', 'prc']:
base_name = each + '_results'
file_name = modeltools.get_file_path(working_dir=folder,
internal_filename=base_name,
fmt='csv')
self.save_coefficient_results(coefficient=each,
separator=separator,
file_name=file_name,
fmt=fmt)
def _min_max_setup(self):
# Negative minimum linear values mean nothing
# because they don't translate to a log space
# therefore we want the minimum non-negative/non-zero values.
# lets make sure there are no zeros
n_z_f = self.scan_results.flux_data[
numpy.nonzero(self.scan_results.flux_data)]
n_z_s = self.scan_results.scan_range[
numpy.nonzero(self.scan_results.scan_range)]
totals = numpy.vstack([self.scan_results.total_demand,
self.scan_results.total_supply])
n_z_t = totals[numpy.nonzero(totals)]
# and that the array is not now somehow empty
# although if this happens-you have bigger problems
if len(n_z_f) == 0:
n_z_f = numpy.array([0.01, 1])
if len(n_z_s) == 0:
n_z_s = numpy.array([0.01, 1])
# lets also (clumsily) find the non-negative mins and maxes
# by converting to logspace (to get NaNs) and back
# and then getting the min/max non-NaN
# PS flux max is the max of the totals
with numpy.errstate(all='ignore'):
self.scan_results.flux_max = numpy.nanmax(10 ** numpy.log10(n_z_t))
self.scan_results.flux_min = numpy.nanmin(10 ** numpy.log10(n_z_f))
self.scan_results.scan_max = numpy.nanmax(10 ** numpy.log10(n_z_s))
self.scan_results.scan_min = numpy.nanmin(10 ** numpy.log10(n_z_s))
def _attach_fluxes_to_self(self):
for i, each in enumerate(self.scan_results.flux_names):
# setattr(self, each, self.scan_results.flux_data[:, i])
self.scan_results[each] = self.scan_results.flux_data[:, i]
def _attach_all_coefficients_to_self(self):
setup_for = ['ec', 'rc', 'prc']
for each in setup_for:
eval('self._attach_coefficients_to_self(self.scan_results.' + each + '_names,\
self.scan_results.' + each + '_data)')
def _make_all_coefficient_lines(self):
setup_for = ['ec', 'rc', 'prc']
for each in setup_for:
eval('self._make_' + each + '_lines()')
def _make_attach_total_fluxes(self):
demand_blocks = getattr(
self._model_map, self.scan_results.fixed).isSubstrateOf()
supply_blocks = getattr(
self._model_map, self.scan_results.fixed).isProductOf()
dem_pos = [self.scan_results.flux_names.index('J_' + flux)
for flux in demand_blocks]
sup_pos = [self.scan_results.flux_names.index('J_' + flux)
for flux in supply_blocks]
self.scan_results['total_demand'] = numpy.sum(
[self.scan_results.flux_data[:, i]
for i in dem_pos],
axis=0)
self.scan_results['total_supply'] = numpy.sum(
[self.scan_results.flux_data[:, i]
for i in sup_pos],
axis=0)
def _make_rc_lines(self):
names = []
resps = []
for each in self.scan_results.flux_names:
reaction = each[2:]
name = reaction + '_' + self.scan_results.fixed
J_ss = getattr(self.mod, each)
slope = getattr(self.mod.rc, name)
resp = self._tangent_line(J_ss, slope)
name = 'rcJ' + name
names.append(name)
resps.append(resp)
resps = numpy.hstack(resps)
self.scan_results.rc_names = names
self.scan_results.rc_data = resps
def _make_prc_lines(self):
names = []
prcs = []
reagent_of = [each[2:] for each in self.scan_results.flux_names]
all_reactions = reagent_of + \
getattr(self._model_map,
self.scan_results.fixed).isModifierOf()
for flux_reaction in self.scan_results.flux_names:
J_ss = getattr(self.mod, flux_reaction)
reaction = flux_reaction[2:]
for route_reaction in all_reactions:
ec = getattr(
self.mod,
'ec' + route_reaction + '_' + self.scan_results.fixed)
cc = getattr(self.mod, 'ccJ' + reaction + '_' + route_reaction)
slope = ec * cc
prc = self._tangent_line(J_ss, slope)
name = 'prcJ%s_%s_%s' % (reaction,
self.scan_results.fixed,
route_reaction)
names.append(name)
prcs.append(prc)
prcs = numpy.hstack(prcs)
self.scan_results.prc_names = names
self.scan_results.prc_data = prcs
def _make_ec_lines(self):
names = []
elasts = []
for each in self.scan_results.flux_names:
reaction = each[2:]
name = 'ec' + reaction + '_' + self.scan_results.fixed
J_ss = getattr(self.mod, each)
slope = getattr(self.mod, name)
elast = self._tangent_line(J_ss, slope)
names.append(name)
elasts.append(elast)
elasts = numpy.hstack(elasts)
self.scan_results.ec_names = names
self.scan_results.ec_data = elasts
def _attach_coefficients_to_self(self, names, tangent_lines):
sp = 0
ep = 2
for name in names:
# setattr(self, name, tangent_lines[:, sp:ep])
self.scan_results[name] = tangent_lines[:, sp:ep]
sp = ep
ep += 2
def _tangent_line(self, J_ss, slope):
fix_ss = self.scan_results.fixed_ss
constant = J_ss / (fix_ss ** slope)
ydist = numpy.log10(self.scan_results.flux_max / self.scan_results.flux_min)
xdist = numpy.log10(self.scan_results.scan_max / self.scan_results.scan_min)
golden_ratio = (1 + numpy.sqrt(5)) / 2
xyscale = xdist / (ydist * golden_ratio * 1.5)
scale_factor = numpy.cos(numpy.arctan(slope * xyscale))
distance = numpy.log10(self._slope_range_factor) * scale_factor
range_min = fix_ss / (10 ** distance)
range_max = fix_ss * (10 ** distance)
scan_range = numpy.linspace(range_min, range_max, num=2)
rate = constant * scan_range ** (slope)
return numpy.vstack((scan_range, rate)).transpose()
@property
def _color_dict(self):
if not self._color_dict_:
fix_map = getattr(self._model_map, self.scan_results.fixed)
relavent_reactions = fix_map.isProductOf() + \
fix_map.isSubstrateOf() + \
fix_map.isModifierOf()
num_of_cols = len(relavent_reactions) + 3
cmap = get_cmap('Set2')(
numpy.linspace(0, 1.0, num_of_cols))[:, :3]
color_list = [rgb_to_hsv(*cmap[i, :]) for i in range(num_of_cols)]
relavent_reactions.sort()
color_dict = dict(
list(zip(['Total Supply'] +
['J_' + reaction for reaction in relavent_reactions] +
['Total Demand'],
color_list)))
# just to darken the colors a bit
for k, v in color_dict.items():
color_dict[k] = [v[0], 1, v[2]]
self._color_dict_ = color_dict
return self._color_dict_
def _make_flux_ld(self):
color_dict = self._color_dict
flux_ld_dict = {}
demand_blocks = ['J_' + dem_reac for dem_reac in getattr(
self._model_map, self.scan_results.fixed).isSubstrateOf()]
supply_blocks = ['J_' + sup_reac for sup_reac in getattr(
self._model_map, self.scan_results.fixed).isProductOf()]
for flux in self.scan_results.flux_names:
flux_col = self.scan_results.flux_names.index(flux)
x_data = self.scan_results.scan_range
y_data = self.scan_results.flux_data[:, flux_col]
latex_expr = self._ltxe.expression_to_latex(flux)
flux_color = self._color_dict[flux]
color = hsv_to_rgb(flux_color[0],
flux_color[1],
flux_color[2] * 0.9)
for dem in demand_blocks:
if dem == flux:
flux_ld_dict[flux] = \
LineData(name=flux,
x_data=x_data,
y_data=y_data,
categories=['Fluxes',
'Demand',
flux],
properties={'label': '$%s$' % latex_expr,
'color': color})
break
for sup in supply_blocks:
if sup == flux:
flux_ld_dict[flux] = \
LineData(name=flux,
x_data=x_data,
y_data=y_data,
categories=['Fluxes',
'Supply',
flux],
properties={'label': '$%s$' % latex_expr,
'color': color})
break
self._flux_ld_dict = flux_ld_dict
def _make_ec_ld(self):
ec_ld_dict = {}
for ec_name in self.scan_results.ec_names:
ec_name = str(ec_name) # Py2 fix with unicode_literals
for flux, flux_ld in self._flux_ld_dict.items():
ec_reaction = flux[2:]
if 'ec' + ec_reaction + '_' + self.scan_results.fixed in ec_name:
flux_color = self._color_dict[flux]
color = hsv_to_rgb(flux_color[0],
flux_color[1] * 0.5,
flux_color[2])
ec_data = self.scan_results[ec_name]
categories = ['Elasticity Coefficients'] + \
flux_ld.categories[1:]
latex_expr = self._ltxe.expression_to_latex(ec_name)
ec_ld_dict[ec_name] = \
LineData(name=ec_name,
x_data=ec_data[:, 0],
y_data=ec_data[:, 1],
categories=categories,
properties={'label': '$%s$' % latex_expr,
'color': color})
self._ec_ld_dict = ec_ld_dict
def _make_rc_ld(self):
rc_ld_dict = {}
for rc_name in self.scan_results.rc_names:
rc_name = str(rc_name) # Py2 fix with unicode_literals
for flux, flux_ld in self._flux_ld_dict.items():
rc_flux = 'J' + flux[2:]
if 'rc' + rc_flux + '_' in rc_name:
flux_color = self._color_dict[flux]
color = hsv_to_rgb(flux_color[0],
flux_color[1],
flux_color[2] * 0.7)
rc_data = self.scan_results[rc_name]
categories = ['Response Coefficients'] + \
flux_ld.categories[1:]
latex_expr = self._ltxe.expression_to_latex(rc_name)
rc_ld_dict[rc_name] = \
LineData(name=rc_name,
x_data=rc_data[:, 0],
y_data=rc_data[:, 1],
categories=categories,
properties={'label': '$%s$' % latex_expr,
'color': color,
'ls': '--'})
self._rc_ld_dict = rc_ld_dict
def _make_prc_ld(self):
def get_prc_route(prc, flux, fixed):
without_prefix = prc.split('prc')[1]
without_flux = without_prefix.split(flux)[1][1:]
route = without_flux.split(fixed)[1][1:]
return route
prc_ld_dict = {}
for prc_name in self.scan_results.prc_names:
prc_name = str(prc_name) # Py2 fix with unicode_literals
for flux, flux_ld in self._flux_ld_dict.items():
prc_flux = 'J' + flux[2:]
if 'prc' + prc_flux + '_' + self.scan_results.fixed in prc_name:
route_reaction = get_prc_route(prc_name,
prc_flux,
self.scan_results.fixed)
flux_color = self._color_dict['J_' + route_reaction]
color = hsv_to_rgb(flux_color[0],
flux_color[1] * 0.5,
flux_color[2])
prc_data = self.scan_results[prc_name]
categories = ['Partial Response Coefficients'] + \
flux_ld.categories[1:]
latex_expr = self._ltxe.expression_to_latex(prc_name)
prc_ld_dict[prc_name] = \
LineData(name=prc_name,
x_data=prc_data[:, 0],
y_data=prc_data[:, 1],
categories=categories,
properties={'label': '$%s$' % latex_expr,
'color': color})
self._prc_ld_dict = prc_ld_dict
def _make_total_flux_ld(self):
total_flux_ld_dict = {}
col = self._color_dict['Total Supply']
total_flux_ld_dict['Total Supply'] = \
LineData(name='Total Supply',
x_data=self.scan_results.scan_range,
y_data=self.scan_results.total_supply,
categories=['Fluxes',
'Supply',
'Total Supply'],
properties={'label': '$%s$' % 'Total\,Supply',
'color': hsv_to_rgb(col[0], col[1],
col[2] * 0.9),
'ls': '--'})
col = self._color_dict['Total Demand']
total_flux_ld_dict['Total Demand'] = \
LineData(name='Total Demand',
x_data=self.scan_results.scan_range,
y_data=self.scan_results.total_demand,
categories=['Fluxes',
'Demand',
'Total Demand'],
properties={'label': '$%s$' % 'Total\,Demand',
'color': hsv_to_rgb(col[0], col[1],
col[2] * 0.9),
'ls': '--'})
self._total_flux_ld_dict = total_flux_ld_dict
def plot(self):
category_classes = OrderedDict([
('Supply/Demand', [
'Supply',
'Demand']),
('Reaction Blocks',
self.scan_results.flux_names +
['Total Supply', 'Total Demand']),
('Lines', [
'Fluxes',
'Elasticity Coefficients',
'Response Coefficients',
'Partial Response Coefficients'])])
line_data_list = [v for v in self._line_data_dict.values()]
scan_fig = ScanFig(line_data_list,
ax_properties={'xlabel': '[%s]' %
self.scan_results.fixed.replace(
'_', ' '),
'ylabel': 'Rate',
'xscale': 'log',
'yscale': 'log',
'xlim': [self.scan_results.scan_min,
self.scan_results.scan_max],
'ylim': [self.scan_results.flux_min,
self.scan_results.flux_max * 2
]},
category_classes=category_classes,
base_name=self._analysis_method,
working_dir=self._working_dir)
scan_fig.toggle_category('Supply', True)
scan_fig.toggle_category('Demand', True)
scan_fig.toggle_category('Fluxes', True)
scan_fig.ax.axvline(self.scan_results.fixed_ss, ls=':', color='gray')
return scan_fig
def plot_decompose(self):
from warnings import warn, simplefilter
simplefilter('always', DeprecationWarning)
warn('plot_decompose has been renamed to `do_mca_scan, use that '
'method in the future`', DeprecationWarning, stacklevel=1)
simplefilter('default', DeprecationWarning)
return self.do_mca_scan()
@silence_print
def do_mca_scan(self):
ecs = []
ccs = []
prc_names = []
rc_names = []
rc_pos = []
reagent_of = [each[2:] for each in self.scan_results.flux_names]
all_reactions = reagent_of + \
getattr(self._model_map,
self.scan_results.fixed).isModifierOf()
arl = len(all_reactions)
strt = 0
stp = arl
for flux_reaction in self.scan_results.flux_names:
reaction = flux_reaction[2:]
rc_names.append('rcJ%s_%s' % (reaction, self.scan_results.fixed))
rc_pos.append(list(range(strt, stp)))
strt += arl
stp += arl
for route_reaction in all_reactions:
ec = 'ec' + route_reaction + '_' + self.scan_results.fixed
cc = 'ccJ' + reaction + '_' + route_reaction
name = 'prcJ%s_%s_%s' % (reaction,
self.scan_results.fixed,
route_reaction)
# ecs.append(ec)
if ec not in ecs:
ecs.append(ec)
ccs.append(cc)
prc_names.append(name)
ec_len = len(ecs)
user_output = [self.scan_results.fixed] + ecs + ccs
scanner = pysces.Scanner(self.mod)
scanner.quietRun = True
scanner.addScanParameter(self.scan_results.fixed,
self.scan_results.scan_min,
self.scan_results.scan_max,
self.scan_results.scan_points,
log=True)
scanner.addUserOutput(*user_output)
scanner.Run()
ax_properties = {'ylabel': 'Coefficient Value',
'xlabel': '[%s]' %
self.scan_results.fixed.replace('_', ' '),
'xscale': 'log',
'yscale': 'linear',
'xlim': [self.scan_results.scan_min,
self.scan_results.scan_max]}
cc_ec_data_obj = Data2D(mod=self.mod,
column_names=user_output,
data_array=scanner.UserOutputResults,
ltxe=self._ltxe,
analysis_method=self._analysis_method,
ax_properties=ax_properties,
file_name='cc_ec_scan',
num_of_groups=ec_len,
working_dir=path.split(self._working_dir)[0])
rc_data = []
all_outs = scanner.UserOutputResults[:, 1:]
ec_outs = all_outs[:, :ec_len]
cc_outs = all_outs[:, ec_len:]
ec_positions = list(range(ec_len)) * (len(prc_names)/ec_len)
for i, prc_name in enumerate(prc_names):
ec_col_data = ec_outs[:, ec_positions[i]]
cc_col_data = cc_outs[:, i]
# ec_col_data = outs[:, i]
# cc_col_data = outs[:, i + cc_s_pos]
col = ec_col_data * cc_col_data
rc_data.append(col)
temp = numpy.vstack(rc_data).transpose()
rc_data += [numpy.sum(temp[:, rc_pos[i]], axis=1) for i in
range(len(rc_names))]
rc_out_arr = [scanner.UserOutputResults[:, 0]] + rc_data
rc_out_arr = numpy.vstack(rc_out_arr).transpose()
rc_data_obj = Data2D(mod=self.mod,
column_names=[self.scan_results.fixed] + prc_names + rc_names,
data_array=rc_out_arr,
ltxe=self._ltxe,
analysis_method=self._analysis_method,
ax_properties=ax_properties,
file_name='prc_scan',
num_of_groups=ec_len,
working_dir=path.split(self._working_dir)[0])
#rc_data_obj._working_dir = path.split(self._working_dir)[0]
#cc_ec_data_obj._working_dir = path.split(self._working_dir)[0]
return rc_data_obj, cc_ec_data_obj