Source code for paws.core.operations.IO.PIF.ShipToDataSet

from collections import OrderedDict
import os

from pypif import pif
import pypif.obj as pifobj 

from ... import Operation as opmod 
from ...Operation import Operation

# Default input and output slots for the ShipToDataSet operation.
#
# The dictionaries are built from an ordered sequence of (name, default)
# pairs rather than keyword arguments: keyword-argument order is only
# guaranteed from Python 3.6 onwards (PEP 468), so on older interpreters
# OrderedDict(a=..., b=...) can scramble the declared order.
inputs = OrderedDict([
    ('pif', None),
    ('client', None),
    ('dsid', None),
    ('json_dirpath', None),
    ('json_filename', None),
    ('keep_json', False),
    ('ship_flag', False),
])
outputs = OrderedDict([('response', None)])
        
class ShipToDataSet(Operation):
    """
    Take a pypif.obj.System object and ship it to a given Citrination data set.

    The pif record(s) are first serialized to a .json file, which is then
    uploaded through the Citrination client when ``ship_flag`` is set.
    The file is removed afterwards unless ``keep_json`` is set.
    """

    def __init__(self):
        super(ShipToDataSet, self).__init__(inputs, outputs)
        self.input_doc['pif'] = 'A pypif.obj.System object or an array/list thereof'
        self.input_doc['client'] = 'A running Citrination client'
        self.input_doc['dsid'] = 'Data set ID where the pif record(s) will be stored on Citrination'
        self.input_doc['json_dirpath'] = 'Filesystem path where a json file of the pif(s) will be saved'
        self.input_doc['json_filename'] = 'Name of the .json file where the pif(s) will be saved'
        self.input_doc['keep_json'] = 'Flag for whether or not to keep the json file'
        self.input_doc['ship_flag'] = 'Flag for shipping the pif- set to False for a dry run'
        self.output_doc['response'] = 'The Citrination server response to the shipment'

    def run(self):
        """
        Dump the pif to a json file and optionally upload it.

        Reads all of the operation inputs, writes the pif(s) to
        ``json_dirpath/json_filename`` (appending ``.json`` when the file
        name does not already carry that extension), uploads the file to
        data set ``dsid`` if ``ship_flag`` is true, deletes the file unless
        ``keep_json`` is true, and stores the upload response (or a dry-run
        message) in ``self.outputs['response']``.
        """
        cl = self.inputs['client']
        dsid = self.inputs['dsid']
        p = self.inputs['pif']
        json_dir = self.inputs['json_dirpath']
        json_file = self.inputs['json_filename']
        json_flag = self.inputs['keep_json']
        ship_flag = self.inputs['ship_flag']

        # os.path.splitext returns the extension WITH its leading dot, so
        # the comparison must be against '.json'; comparing against 'json'
        # would always append a second suffix.
        if os.path.splitext(json_file)[1] != '.json':
            json_file = json_file + '.json'
        json_file = os.path.join(json_dir, json_file)

        # Close the file before uploading or removing it so that its
        # contents are fully flushed to disk and the handle is released.
        with open(json_file, 'w') as json_handle:
            pif.dump(p, json_handle)

        if ship_flag:
            r = cl.upload_file(json_file, dataset_id=dsid)
        else:
            r = 'dry run: no shipment occurred.'

        if not json_flag:
            os.remove(json_file)
        self.outputs['response'] = r