Source code for PUMI.engine

import argparse
from configparser import SafeConfigParser

from PUMI._version import get_versions
from nipype.pipeline.engine.workflows import *
from nipype.pipeline.engine.nodes import *
import nipype.interfaces.utility as utility
from nipype.interfaces import BIDSDataGrabber
from nipype.interfaces.io import DataSink
from nipype import Function
from nipype.utils.filemanip import list_to_filename
from hashlib import sha1
import re
import ast
from PUMI import globals

def _parameterization_dir(param):
    """
    Map a parameterization string to the directory name used for it.

    Strings of up to 32 characters are used as they are; anything longer is
    replaced by the SHA-1 hex digest of its encoded form.

    Returns
        the directory name for the given parameterization string
    """
    fits_as_is = len(param) <= 32
    return param if fits_as_is else sha1(param.encode()).hexdigest()


class NestedNode(Node):
    """
    Node with a customized working-directory layout.

    When the node is parameterized, the parameterization directories are placed
    directly below the top-level workflow directory and the directories of
    nested sub-workflows follow after them.
    """

    def _parameterization_names(self, bids_like=False):
        """
        Return one directory-name fragment per parameterization entry.

        Every entry is formatted as a string. With ``bids_like=True`` a leading
        ``_subject_`` is replaced by ``sub-``. Unless the ``parameterize_dirs``
        execution option is switched on, each fragment is finally passed
        through ``_parameterization_dir``.
        """
        names = ["{}".format(entry) for entry in self.parameterization]
        if bids_like:
            # regexp magic to make subject handling more BIDS-like
            names = [re.sub('^_subject_', 'sub-', name) for name in names]
        if str2bool(self.config["execution"]["parameterize_dirs"]):
            return names
        return [_parameterization_dir(name) for name in names]

    def output_dir(self):
        """Return the location of the output directory for the node"""
        if self._output_dir:
            # already computed earlier: reuse the cached value
            return self._output_dir

        if self.base_dir is None:
            self.base_dir = mkdtemp()

        levels = self._hierarchy.split(".") if self._hierarchy else []
        location = self.base_dir
        # todo: maybe does not work with multiple mapnodes/iterables
        if self.parameterization:
            fragments = self._parameterization_names()
            if levels:
                # top level remains outside
                location = op.join(location, levels[0])
            location = op.join(location, *fragments)
            if levels:
                # subworkflows go inside
                location = op.join(location, *levels[1:])
        elif levels:
            location = op.join(location, *levels)

        self._output_dir = op.realpath(op.join(location, self.name))

        # Parameterized DataSink nodes get one more directory level, named
        # after the '-'-joined, BIDS-like parameterization fragments.
        if isinstance(self._interface, DataSink) and self.parameterization:
            suffix = '-'.join(self._parameterization_names(bids_like=True))
            self._output_dir = op.join(self._output_dir, suffix)
            print(self._output_dir)

        return self._output_dir
class NestedMapNode(MapNode, NestedNode):
    """
    MapNode counterpart of ``NestedNode`` (customized directories).

    ``NestedNode`` is listed as a second base class so that its
    ``output_dir()`` is the one found through the method resolution order.
    """

    def output_dir(self):
        """Return the node's output directory as computed by the next class in the MRO."""
        resolved_dir = super().output_dir()
        return resolved_dir
class NestedWorkflow(Workflow):
    """
    Workflow with convenient input/output field naming.

    There is no need to write 'inputspec.in_file' or 'outputspec.out_file'
    when connecting a sub-workflow: the plain field name ('in_file') can be
    used as if the sub-workflow was a node. This is realized by rewriting the
    arguments of ``connect``. In addition, ``connect`` accepts node names
    instead of node objects (handy for the pre-specified inputspec/outputspec
    nodes).
    """

    def _resolve_node_ref(self, node):
        """Return ``node`` itself, or the node of this workflow named ``node`` if a string is given."""
        if not isinstance(node, str):
            return node
        found = self.get_node(node)
        if found is None:
            # Fail early with a readable message instead of an AttributeError
            # on None further down.
            raise ValueError(
                "connect(): workflow '%s' has no node named '%s'" % (self.name, node)
            )
        return found

    def connect(self, *args, **kwargs):
        """
        Connect nodes, applying the sub-workflow shortcuts first.

        Accepts the same two call forms as the parent class:
        ``connect(source, sourcefield, dest, destfield)`` or
        ``connect([(source, dest, [(sourcefield, destfield), ...]), ...])``.

        For every connection:
        - a source/destination given as a string is replaced by the node of
          this workflow with that name;
        - if the source does not provide the requested output but provides
          'outputspec.<output>', the prefixed name is used instead. A source
          given as a tuple (output name followed by a function and its
          arguments) keeps everything after the name;
        - if the destination does not accept the requested input but accepts
          'inputspec.<input>', the prefixed name is used instead.

        The connection list passed in by the caller is not modified; the
        rewritten connections are collected in a new list which is handed to
        the parent implementation together with ``kwargs``.

        Raises
            TypeError: if neither 1 nor 4 positional arguments are given.
            ValueError: if a node name cannot be resolved.
            Exception: if a source is neither a tuple nor a string.
        """
        if len(args) == 1:
            connection_list = args[0]
        elif len(args) == 4:
            connection_list = [(args[0], args[2], [(args[1], args[3])])]
        else:
            raise TypeError(
                "connect() takes either 4 arguments, or 1 list of"
                " connection tuples (%d args given)" % len(args)
            )

        resolved_connections = []
        for srcnode, destnode, connects in connection_list:
            srcnode = self._resolve_node_ref(srcnode)
            destnode = self._resolve_node_ref(destnode)

            resolved_connects = []
            for source, dest in connects:
                if isinstance(source, tuple):
                    # handles the case that source is specified
                    # with a function
                    sourcename = source[0]
                elif isinstance(source, (str, bytes)):
                    sourcename = source
                else:
                    raise Exception(
                        (
                            "Unknown source specification in "
                            "connection from output of %s"
                        )
                        % srcnode.name
                    )

                if sourcename and not srcnode._check_outputs(sourcename):
                    outputspec_source = 'outputspec.' + sourcename
                    if srcnode._check_outputs(outputspec_source):
                        if isinstance(source, tuple):
                            # keep the function (and its arguments) attached
                            # to the renamed output
                            source = (outputspec_source,) + tuple(source[1:])
                        else:
                            source = outputspec_source

                if not destnode._check_inputs(dest):
                    inputspec_dest = 'inputspec.' + dest
                    if destnode._check_inputs(inputspec_dest):
                        dest = inputspec_dest

                resolved_connects.append((source, dest))

            resolved_connections.append((srcnode, destnode, resolved_connects))

        # connect nodes
        super().connect(resolved_connections, **kwargs)
# decorator class
class PumiPipeline:
    """
    Decorator class that turns a pipeline function into a workflow factory.

    The decorated function is replaced by a wrapper which creates a
    ``NestedWorkflow``, adds the 'inputspec'/'outputspec' identity nodes (only
    if fields were given for them) and a 'sinker' DataSink node, and then calls
    the pipeline function with the workflow as ``wf`` keyword argument.

    Parameters
        inputspec_fields: field names of the 'inputspec' node (default: none)
        outputspec_fields: field names of the 'outputspec' node (default: none)
        regexp_sub: regexp substitutions assigned to the sinker (default: none)
    """

    def __init__(self, inputspec_fields=None, outputspec_fields=None, regexp_sub=None):
        self.inputspec_fields = [] if inputspec_fields is None else inputspec_fields
        self.outputspec_fields = [] if outputspec_fields is None else outputspec_fields
        self.regexp_sub = [] if regexp_sub is None else regexp_sub

    def __call__(self, pipeline_fun):
        """
        Wrap ``pipeline_fun`` and return the wrapper.

        The wrapper has the signature
        ``wrapper(name, base_dir='.', sink_dir=None, qc_dir=None, **kwargs)``
        and returns the workflow it created. ``sink_dir`` and ``qc_dir`` are
        looked up in the 'SINKING' section of ``globals.cfg_parser`` when not
        given (fallbacks: 'derivatives' and 'qc'). Paths not starting with '/'
        are made absolute; ``qc_dir`` is taken relative to ``sink_dir``.
        """
        from functools import wraps

        def make_spec_node(node_name, fields):
            # inputspec and outputspec are built the same way
            interface = utility.IdentityInterface(fields=fields, mandatory_inputs=True)
            return NestedNode(interface, name=node_name)

        @wraps(pipeline_fun)  # So that decorated functions can be documented properly
        def wrapper(name, base_dir='.', sink_dir=None, qc_dir=None, **kwargs):
            config = globals.cfg_parser

            if sink_dir is None:
                sink_dir = config.get('SINKING', 'sink_dir', fallback='derivatives')
            if not sink_dir.startswith('/'):
                sink_dir = os.path.abspath(sink_dir)

            if qc_dir is None:
                qc_dir = config.get('SINKING', 'qc_dir', fallback='qc')
            if not qc_dir.startswith('/'):
                qc_dir = os.path.abspath(os.path.join(sink_dir, qc_dir))

            wf = NestedWorkflow(name, base_dir)
            wf.sink_dir = sink_dir
            wf.qc_dir = qc_dir
            wf.cfg_parser = config

            if len(self.inputspec_fields) > 0:
                wf.add_nodes([make_spec_node('inputspec', self.inputspec_fields)])

            if len(self.outputspec_fields) > 0:
                wf.add_nodes([make_spec_node('outputspec', self.outputspec_fields)])

            # QC pipelines sink into the QC directory, all others into the sink directory
            sinker = NestedNode(DataSink(), name='sinker')
            if isinstance(self, QcPipeline):
                sinker.inputs.base_directory = wf.qc_dir
            else:
                sinker.inputs.base_directory = wf.sink_dir
            sinker.inputs.regexp_substitutions = self.regexp_sub
            wf.add_nodes([sinker])

            pipeline_fun(wf=wf, **kwargs)

            # todo: should we do any post workflow checks
            # e.g. is outputspec connected
            # or unconnected nodes

            return wf

        return wrapper

    def _regex(self):
        """Print the regexp substitutions of this pipeline."""
        print('Regexp substitutions:', self.regexp_sub)
class AnatPipeline(PumiPipeline):
    """
    ``PumiPipeline`` variant with default regexp substitutions for anatomical pipelines.

    With ``default_regexp_sub=True`` two substitutions are put in front of the
    user-supplied ``regexp_sub``: the first removes the third-to-last component
    of a path, the second replaces '_subject_' by 'sub-'.
    """

    def __init__(self, inputspec_fields, outputspec_fields, regexp_sub=None, default_regexp_sub=True):
        if default_regexp_sub:
            substitutions = [
                (r'(.*\/)([^\/]+)\/([^\/]+)\/([^\/]+)$', r'\g<1>\g<3>/\g<4>'),
                ('_subject_', 'sub-'),
            ]
        else:
            substitutions = []

        # user-supplied substitutions always come after the defaults
        if regexp_sub is not None:
            substitutions.extend(regexp_sub)

        super().__init__(inputspec_fields, outputspec_fields, substitutions)

    def __call__(self, anat_fun):
        """Decorate ``anat_fun`` exactly like ``PumiPipeline.__call__`` does."""
        decorated = super().__call__(anat_fun)
        return decorated
class QcPipeline(PumiPipeline):
    """
    ``PumiPipeline`` variant with default regexp substitutions for quality-check pipelines.

    With ``default_regexp_sub=True`` two substitutions are put in front of the
    user-supplied ``regexp_sub``: the first replaces the last two components of
    a path ('<a>/<b>') by '<a>.png', the second replaces '_subject_' by 'sub-'.
    """

    def __init__(self, inputspec_fields, outputspec_fields, regexp_sub=None, default_regexp_sub=True):
        defaults = [
            (r'(.*\/)([^\/]+)\/([^\/]+)$', r'\g<1>\g<2>.png'),
            ('_subject_', 'sub-'),
        ]
        substitutions = defaults if default_regexp_sub else []

        # user-supplied substitutions always come after the defaults
        if regexp_sub is not None:
            substitutions.extend(regexp_sub)

        super().__init__(inputspec_fields, outputspec_fields, substitutions)

    def __call__(self, qc_fun):
        """Decorate ``qc_fun`` exactly like ``PumiPipeline.__call__`` does."""
        decorated = super().__call__(qc_fun)
        return decorated
class FuncPipeline(PumiPipeline):
    """
    ``PumiPipeline`` variant with default regexp substitutions for functional pipelines.

    With ``default_regexp_sub=True`` two substitutions are put in front of the
    user-supplied ``regexp_sub``: the first removes the third-to-last component
    of a path, the second replaces '_subject_' by 'sub-'.
    """

    def __init__(self, inputspec_fields, outputspec_fields, regexp_sub=None, default_regexp_sub=True):
        substitutions = []
        if default_regexp_sub:
            substitutions.append((r'(.*\/)([^\/]+)\/([^\/]+)\/([^\/]+)$', r'\g<1>\g<3>/\g<4>'))
            substitutions.append(('_subject_', 'sub-'))

        # user-supplied substitutions always come after the defaults
        if regexp_sub is not None:
            substitutions.extend(regexp_sub)

        super().__init__(inputspec_fields, outputspec_fields, substitutions)

    def __call__(self, func_fun):
        """Decorate ``func_fun`` exactly like ``PumiPipeline.__call__`` does."""
        decorated = super().__call__(func_fun)
        return decorated
class GroupPipeline(PumiPipeline):
    """
    ``PumiPipeline`` variant with a default regexp substitution for group-level pipelines.

    With ``default_regexp_sub=True`` one substitution is put in front of the
    user-supplied ``regexp_sub``: it inserts a 'group' level in front of the
    last two components of a path.

    Parameters
        inputspec_fields: field names of the 'inputspec' node
        outputspec_fields: field names of the 'outputspec' node
        regexp_sub: additional regexp substitutions, applied after the default one
        default_regexp_sub: whether the default substitution is used
    """

    def __init__(self, inputspec_fields, outputspec_fields, regexp_sub=None, default_regexp_sub=True):
        # The constructor used to parse 'settings.ini' to look up the sink
        # directory, but the value was never used; that dead code (and the file
        # access it caused) has been removed.
        substitutions = []
        if default_regexp_sub:
            # NOTE(review): the replacement starts with '//' right after a group
            # that already ends with '/', so the resulting path contains
            # consecutive slashes - confirm that this is intended.
            substitutions.append((r'(.*\/)([^\/]+)\/([^\/]+)$', r'\g<1>//group/\g<2>/\g<3>'))

        # user-supplied substitutions always come after the default
        if regexp_sub is not None:
            substitutions.extend(regexp_sub)

        super().__init__(inputspec_fields, outputspec_fields, substitutions)

    def __call__(self, group_fun):
        """Decorate ``group_fun`` exactly like ``PumiPipeline.__call__`` does."""
        return super().__call__(group_fun)
class BidsPipeline(PumiPipeline):
    """
    decorator for top-level pipelines, with BIDS input

    Instead of an 'inputspec' that has to be fed from outside, the workflow
    gets a BIDSDataGrabber which is iterated over the subjects; the grabbed
    files are exposed through an 'inputspec' node with one field per entry of
    ``output_query``. The wrapper runs the workflow before returning it.

    Parameters
        output_query: query dictionary handed to the BIDSDataGrabber. Defaults
            to a 'T1w' (datatype 'anat') and a 'bold' (datatype 'func') query,
            both restricted to the extensions 'nii' and 'nii.gz'.
    """

    def __init__(self, output_query=None):
        if output_query is None:
            self.output_query = {
                'T1w': dict(
                    datatype='anat',
                    suffix='T1w',
                    extension=['nii', 'nii.gz']
                ),
                # 'rest': dict(  # todo: how to get rests only
                #     datatype='func',
                #     suffix='bold',
                #     extension=['nii', 'nii.gz']
                # ),
                'bold': dict(  # this should get all task, only
                    datatype='func',
                    suffix='bold',
                    extension=['nii', 'nii.gz']
                ),
                # 'fmap': dict(
                #     modality='fmap',
                #     extension=['nii', 'nii.gz']
                # )
                # 'dwi': dict(
                #     modality='dwi',
                #     extension=['nii', 'nii.gz']
                # )
            }
        else:
            self.output_query = output_query
        super().__init__(None, None, None)

    def __call__(self, pipeline_fun):
        """
        Wrap ``pipeline_fun`` and return the wrapper.

        The wrapper has the signature
        ``wrapper(name, bids_dir, subjects=None, base_dir='.', sink_dir=None,
        qc_dir=None, run_args=None, **kwargs)``. It builds the workflow, calls
        ``pipeline_fun(wf=wf, bids_dir=bids_dir, **kwargs)``, runs the workflow
        with ``run_args`` (default: the 'MultiProc' plugin) and returns it.

        The resolved ``sink_dir`` and ``qc_dir`` are written back to the
        'SINKING' section of ``globals.cfg_parser``.

        If ``subjects`` is None, the labels of all 'sub-*' directories found
        directly in ``bids_dir`` are used (sorted); a ValueError is raised if
        there is none.
        """
        from functools import wraps

        @wraps(pipeline_fun)  # So that decorated functions can be documented properly
        def wrapper(name, bids_dir, subjects=None, base_dir='.', sink_dir=None, qc_dir=None, run_args=None, **kwargs):
            # default: multiproc
            if run_args is None:
                run_args = {'plugin': 'MultiProc'}

            cfg_parser = globals.cfg_parser
            # set() below fails on a missing section, whereas get() with a
            # fallback does not - make sure the section exists.
            if not cfg_parser.has_section('SINKING'):
                cfg_parser.add_section('SINKING')

            if sink_dir is None:
                sink_dir = cfg_parser.get('SINKING', 'sink_dir', fallback='derivatives')
            if not sink_dir.startswith('/'):
                sink_dir = os.path.abspath(sink_dir)
            cfg_parser.set('SINKING', 'sink_dir', sink_dir)

            if qc_dir is None:
                qc_dir = cfg_parser.get('SINKING', 'qc_dir', fallback='qc')
            if not qc_dir.startswith('/'):
                qc_dir = os.path.abspath(os.path.join(sink_dir, qc_dir))
            cfg_parser.set('SINKING', 'qc_dir', qc_dir)

            # main workflow
            wf = NestedWorkflow(name, base_dir)
            wf.sink_dir = sink_dir
            wf.qc_dir = qc_dir
            wf.cfg_parser = cfg_parser

            # instead of inputspec, we need a bidsgrabber
            if subjects is None:
                # parse all subjects: only directories count, and sorting
                # makes the order independent of the file system
                subjects = sorted(
                    os.path.basename(path)[len('sub-'):]
                    for path in glob(os.path.join(bids_dir, 'sub-*'))
                    if os.path.isdir(path)
                )
                if not subjects:
                    raise ValueError(
                        'No subject directories (sub-*) found in "{}"!'.format(bids_dir)
                    )

            # Create a subroutine (subgraph) for every subject
            subject_iterator = Node(interface=utility.IdentityInterface(fields=['subject']), name='subject_iterator')
            subject_iterator.iterables = [('subject', subjects)]

            # create a BIDS-node
            bids_grabber = Node(BIDSDataGrabber(), name='bids_grabber')
            bids_grabber.inputs.base_dir = os.path.abspath(bids_dir)
            bids_grabber.inputs.output_query = self.output_query
            wf.connect(subject_iterator, 'subject', bids_grabber, 'subject')

            inputspec = NestedNode(
                utility.IdentityInterface(
                    fields=list(self.output_query)
                ),
                name='inputspec'
            )

            # 'Unpack' list from bids_grabber
            # bids_grabber returns a list with a string (path to the anat image of a subject),
            # but most other nodes do not take a list as input file
            for bids_modality in self.output_query:
                print(bids_modality)
                path_extractor = Node(
                    Function(
                        input_names=["filelist"],
                        output_names=[bids_modality],
                        function=list_to_filename
                    ),
                    name="path_extractor_" + bids_modality
                )
                wf.connect(bids_grabber, bids_modality, path_extractor, 'filelist')
                wf.connect(path_extractor, bids_modality, inputspec, bids_modality)

            # there is no outputspec, this pipeline should not be nested!
            # in case it's needed:
            sinker = NestedNode(
                DataSink(),
                name='sinker'
            )
            sinker.inputs.base_directory = wf.qc_dir if isinstance(self, QcPipeline) else wf.sink_dir
            sinker.inputs.regexp_substitutions = self.regexp_sub
            wf.add_nodes([sinker])

            pipeline_fun(wf=wf, bids_dir=bids_dir, **kwargs)

            # todo: should we do any post workflow checks
            # e.g. is outputspec connected
            # or unconnected nodes

            wf.run(**run_args)

            return wf

        return wrapper
class BidsApp:
    """
    Command-line front end for a top-level (BIDS) pipeline.

    The constructor sets up an ``argparse`` parser (available as
    ``self.parser``, so that further, pipeline-specific arguments can be added
    to it) and stores default values. ``run()`` parses the command line and
    calls the pipeline; values given on the command line take precedence over
    the values passed to the constructor.

    Parameters
        pipeline: callable that is invoked by ``run()`` (mandatory via script)
        name: first positional argument passed to ``pipeline`` (mandatory via script)
        bids_dir: default for ``--bids_dir``
        output_dir: default for ``--output_dir``
        analysis_level: stored, but not used yet
        participant_label: default for ``--participant_label``
        working_dir: default for ``--working_dir``
        run_args: default run arguments; overridden by ``--plugin``/``--plugin_args``
        description: description shown in the help text of the parser
        **kwargs: additional keyword arguments passed on to ``pipeline``
    """

    def __init__(self, pipeline, name, bids_dir=None, output_dir=None, analysis_level=None, participant_label=None,
                 working_dir='.', run_args=None, description=None, **kwargs):
        # description=None is argparse's own default, i.e. "no description"
        self.parser = argparse.ArgumentParser(description=description)

        self.parser.add_argument(
            '--bids_dir',
            required=False,
            help='Root directory of the BIDS-compliant input dataset.'
        )

        self.parser.add_argument(
            '--output_dir',
            required=False,
            help='Directory where the results will be stored.'
        )

        self.parser.add_argument(
            '--analysis_level',
            required=False,
            choices=['participant'],
            help='Level of the analysis that will be performed. Default is participant.'
        )

        self.parser.add_argument(
            '--participant_label',
            required=False,
            help='Space delimited list of participant-label(s) (e.g. "001 002 003"). '
                 'Perform the tool on the given participants or if this parameter is not '
                 'provided then perform the procedure on all subjects.',
            nargs="+"
        )

        self.parser.add_argument(
            '--version',
            action='version',
            version='Version {}'.format(get_versions()['version']),
            help='Print version of PUMI'
        )

        self.parser.add_argument(
            '--working_dir',
            type=str,
            help='Directory where temporary data will be stored. Default is the current working directory.'
        )

        self.parser.add_argument(
            '--plugin',
            type=str,
            help='Nipype plugin (e.g. MultiProc, Slurm). If not set, MultiProc is used.'
        )

        self.parser.add_argument(
            '--n_procs',
            type=int,
            help='Amount of threads to execute in parallel. '
                 'If not set, the amount of CPU cores is used. '
                 'Caution: Does only work with the MultiProc-plugin!'
        )

        # argparse %-formats help strings, so a literal percent sign has to be
        # written as '%%'.
        self.parser.add_argument(
            '--memory_gb',
            type=int,
            help='Memory limit in GB. If not set, use 90%% of the available memory. '
                 'Caution: Does only work with the MultiProc-plugin!'
        )

        self.parser.add_argument(
            '--plugin_args',
            type=str,
            help='Nipype plugin arguments in dictionary format (e. g. {\'memory_gb\': 6}). '
                 'Caution: If set, you need to supply the --plugin argument and '
                 'the command line arguments --n_procs and --memory_gb will be ignored!'
        )

        self.pipeline = pipeline  # mandatory via script
        self.name = name  # mandatory via script
        self.bids_dir = bids_dir
        self.output_dir = output_dir
        self.analysis_level = analysis_level
        self.participant_label = participant_label
        self.working_dir = working_dir
        self.run_args = run_args
        self.kwargs = kwargs

    @staticmethod
    def _parse_plugin_args(text):
        """Turn the value of ``--plugin_args`` into a dictionary (ValueError if that is not possible)."""
        try:
            plugin_args = ast.literal_eval(text)
        except (ValueError, SyntaxError) as err:
            raise ValueError(
                'The argument "plugin_args" has to be a dictionary literal, got: {!r}'.format(text)
            ) from err
        if not isinstance(plugin_args, dict):
            raise ValueError(
                'The argument "plugin_args" has to be a dictionary literal, got: {!r}'.format(text)
            )
        return plugin_args

    def run(self):
        """
        Parse the command line and call the pipeline.

        The pipeline is called as
        ``pipeline(name, bids_dir=..., sink_dir=..., base_dir=..., subjects=..., **extra)``,
        plus ``run_args=...`` if run arguments were given (via constructor or
        command line). ``extra`` consists of all parsed command line arguments
        that are not BIDS-specified, followed by the keyword arguments passed
        to the constructor.

        Raises
            ValueError: if ``bids_dir`` is given neither on the command line
                nor to the constructor, or if ``--plugin_args`` is not a
                dictionary literal.
        """
        cli_args = self.parser.parse_args()

        bids_specified = {'bids_dir', 'output_dir', 'analysis_level', 'participant_label', 'version', 'working_dir'}
        # NOTE(review): plugin, n_procs, memory_gb and plugin_args are not in
        # bids_specified and therefore reach the pipeline as keyword arguments
        # too - confirm that this is intended.
        pipeline_specific_arguments = {
            key: value for key, value in vars(cli_args).items() if key not in bids_specified
        }

        if cli_args.plugin_args is not None:
            self.run_args = {
                'plugin': cli_args.plugin,
                'plugin_args': self._parse_plugin_args(cli_args.plugin_args)
            }
        elif cli_args.plugin is not None:
            self.run_args = {'plugin': cli_args.plugin}
            # NOTE(review): --n_procs/--memory_gb are only honoured when
            # --plugin MultiProc is given explicitly.
            if cli_args.plugin == 'MultiProc':
                multiproc_args = {}
                if cli_args.n_procs is not None:
                    multiproc_args['n_procs'] = cli_args.n_procs
                if cli_args.memory_gb is not None:
                    multiproc_args['memory_gb'] = cli_args.memory_gb
                self.run_args['plugin_args'] = multiproc_args

        # Use specification from CLI if available. Otherwise, use the specification from the BidsApp-constructor.
        if cli_args.bids_dir is not None:
            self.bids_dir = cli_args.bids_dir
        if self.bids_dir is None:
            raise ValueError('The argument "bids_dir" has to be set!')

        # If output_dir is None, BidsApp and PumiPipeline are going to read the location specified in the settings.ini
        if cli_args.output_dir is not None:
            self.output_dir = cli_args.output_dir
        if cli_args.participant_label is not None:
            self.participant_label = cli_args.participant_label
        if cli_args.working_dir is not None:
            self.working_dir = cli_args.working_dir

        # todo: integrate analysis_level

        call_arguments = dict(
            bids_dir=self.bids_dir,
            sink_dir=self.output_dir,
            base_dir=self.working_dir,
            subjects=self.participant_label
        )
        # run_args is only passed on when set, so that the pipeline's own
        # default applies otherwise
        if self.run_args is not None:
            call_arguments['run_args'] = self.run_args

        self.pipeline(self.name, **call_arguments, **pipeline_specific_arguments, **self.kwargs)