Source code for paws.core.operations.PACKAGING.PIF.PifNPSynthExperiment
from collections import OrderedDict
import numpy as np
import pypif.obj as pifobj
from saxskit import saxs_math, saxs_piftools
from ... import Operation as opmod
from ...Operation import Operation
inputs=OrderedDict(
recipe=None,
experiment_id=None,
t_T=None,
t_params=None,
t_populations=None,
t_reports=None)
outputs=OrderedDict(pif=None)
[docs]class PifNPSynthExperiment(Operation):
"""
Analyze a series of PIFs generated in a nanoparticle synthesis experiment
and produce a PIF that describes the overall experiment.
"""
def __init__(self):
super(PifNPSynthExperiment,self).__init__(inputs,outputs)
self.input_doc['recipe'] = 'dict describing the synthesis recipe'
self.input_doc['experiment_id'] = 'experiment id'
self.input_doc['t_T'] = 'n-by-2 array, Temperature vs. time'
self.input_doc['t_params'] = 'n-by-2 array, saxs params vs. time'
self.input_doc['t_populations'] = 'n-by-2 array, populations vs. time'
self.input_doc['t_reports'] = 'n-by-2 array, fit reports vs. time'
self.output_doc['pif'] = 'pypif.obj.ChemicalSystem object '\
'describing the synthesis experiment'
[docs] def run(self):
rcp = self.inputs['recipe']
expt_id = self.inputs['experiment_id']
t_T = self.inputs['t_T']
t_pars = self.inputs['t_params']
t_pops = self.inputs['t_populations']
t_rpts = self.inputs['t_reports']
csys = pifobj.ChemicalSystem()
csys.uid = expt_id
csys.ids = []
csys.ids.append(saxs_piftools.id_tag('EXPERIMENT_ID',expt_id))
if rcp:
prep,comp = self.process_np_synth_recipe(rcp)
csys.preparation = prep
csys.composition = comp
csys.properties = []
if t_T is not None:
csys.properties.append(
self.property_vs_time(
'temperature',t_T,'seconds','degrees C'))
if t_pars is not None:
for pname in saxs_math.parameter_keys:
nt = len(t_pars)
# collect the time and parameter list for all times where pname is reported
t = np.array([t_pars[it][0] for it in range(nt) if pname in t_pars[it][1].keys()])
par = [t_pars[it][1][pname] for it in range(nt) if pname in t_pars[it][1].keys()]
# count the instances of this parameter at each time point
npar = np.array([len(pari) for pari in par])
# TODO: this needs some additional work
# to ensure the continuity of parameter identities,
# e.g., so that a given peak retains its identity
# throughout the synthesis reaction
for ipar in range(max(npar)):
# get all t points at which this parameter exists
tt = t[(npar>ipar)]
# harvest the parameter value at all of those t points
nt_pari = len(tt)
pari = [par[it][ipar] for it in range(nt_pari)]
t_pari = zip(list(tt),pari)
if any(t_pari):
property_name = saxs_piftools.parameter_description[pname]
if max(npar) > 1:
property_name = property_name + ' ({})'.format(ipar)
csys.properties.append(self.property_vs_time(
property_name,t_pari,'seconds',\
saxs_piftools.parameter_units[pname]))
if t_rpts is not None:
# fit obj vs. time
nrpts = len(t_rpts)
t_obj = [[t_rpts[it][0],t_rpts[it][1]['objective_value']] for
it in range(nrpts) if 'objective_value' in t_rpts[it][1]]
if any(t_obj):
csys.properties.append(self.property_vs_time(
'spectrum fitting error',t_obj,'arb'))
self.outputs['pif'] = csys
[docs] def property_vs_time(self,name,t_param,t_units=None,param_units=None):
pf = pifobj.Property()
pf.name = name
npts = len(t_param)
pf.scalars = [pifobj.Scalar(t_param[i][1]) for i in range(npts)]
if param_units:
pf.units = param_units
pf.conditions = []
pf.conditions.append( pifobj.Value('reaction time',
[pifobj.Scalar(t_param[i][0]) for i in range(npts)],
None,None,None,t_units) )
return pf
[docs] def process_np_synth_recipe(self,rcp):
return None,None