from PUMI.engine import AnatPipeline, QcPipeline
from PUMI.interfaces.HDBet import HDBet
from PUMI.utils import create_segmentation_qc
from nipype.interfaces import fsl
from nipype.interfaces.utility import Split
from matplotlib.colors import LinearSegmentedColormap
from nipype import Function
from PUMI.engine import AnatPipeline
from PUMI.engine import NestedNode as Node
import os
from PUMI.pipelines.multimodal.image_manipulation import pick_volume
[docs]@QcPipeline(inputspec_fields=['background', 'overlay'],
outputspec_fields=['out_file'])
def qc_segmentation(wf, fmri=False, **kwargs):
"""
Create quality check images for background extraction workflows
Inputs:
background (str): Path to the extracted brain.
overlay (str): Path to the overlay.
Outputs:
out_file (str): Path to the quality check image
Sinking:
- The quality check image
"""
plot = Node(Function(input_names=['overlay', 'bg_img', 'cmap'],
output_names=['out_file'],
function=create_segmentation_qc),
name='plot')
wf.connect('inputspec', 'background', plot, 'bg_img')
wf.connect('inputspec', 'overlay', plot, 'overlay')
# sinking
if fmri:
wf.connect(plot, 'out_file', 'sinker', 'qc_func_segmentation')
else:
wf.connect(plot, 'out_file', 'sinker', 'qc_anat_segmentation')
# output
wf.connect(plot, 'out_file', 'outputspec', 'out_file')
[docs]@QcPipeline(inputspec_fields=['in_file'],
outputspec_fields=['out_file'])
def qc_tissue_segmentation(wf, **kwargs):
"""
Create quality check images for tissue segmentation workflows
Inputs:
in_file (str): Path to the partial volume map
Outputs:
out_file (str): Path to the quality check image
Sinking:
- The quality check image
"""
plot = Node(Function(input_names=['overlay', 'cmap'],
output_names=['out_file'],
function=create_segmentation_qc),
name='plot')
colors = ['#00A859', '#FFCC29', '#3E4095']
plot.inputs.cmap = LinearSegmentedColormap.from_list('tissue_segmentation_colors', colors, N=3)
wf.connect('inputspec', 'in_file', plot, 'overlay')
# sinking
wf.connect(plot, 'out_file', 'sinker', 'qc_tissue_segmentation')
# output
wf.connect(plot, 'out_file', 'outputspec', 'out_file')
[docs]@AnatPipeline(inputspec_fields=['in_file'],
outputspec_fields=['out_file', 'brain_mask'])
def bet_fsl(wf, fmri=False, volume='middle', **kwargs):
"""
Performs Brain extraction of a 3d-vloume.
User can choose the position of the 3d-volume
Parameter:
fmri(bool):
Inputs:
in_file(str): Path to the functional 4d-image.
Outputs:
out_file(str):
brain_mask(str):
"""
# bet
bet = Node(interface=fsl.BET(), name='bet')
bet.inputs.mask = True
bet.inputs.vertical_gradient = wf.cfg_parser.getfloat('FSL', 'bet_vertical_gradient', fallback=-0.3)
wf.connect('inputspec', 'in_file', bet, 'in_file')
if fmri:
bet.inputs.frac = wf.cfg_parser.getfloat('FSL', 'bet_frac_func', fallback=0.3)
bet.inputs.functional = True
bet_vol = pick_volume('bet_vol', volume=volume)
wf.connect(bet, 'mask_file', bet_vol, 'in_file')
apply_mask = Node(fsl.ApplyMask(), name="apply_mask")
wf.connect(bet_vol, 'out_file', apply_mask, 'mask_file')
wf.connect('inputspec', 'in_file', apply_mask, 'in_file')
else:
bet.inputs.frac = wf.cfg_parser.getfloat('FSL', 'bet_frac_anat', fallback=0.3)
bet.inputs.robust = True
# quality check
if fmri:
overlay = pick_volume('qc_overlay', volume=volume)
wf.connect(apply_mask, 'out_file', overlay, 'in_file')
background = pick_volume('qc_background')
wf.connect('inputspec', 'in_file', background, 'in_file')
qc = qc_segmentation(name='qc_segmentation', fmri=True, qc_dir=wf.qc_dir)
wf.connect(overlay, 'out_file', qc, 'overlay')
wf.connect(background, 'out_file', qc, 'background')
#wf.connect(overlay, 'out_file', qc, 'background')
#wf.connect(background, 'out_file', qc, 'overlay')
else:
qc = qc_segmentation(name='qc_segmentation', qc_dir=wf.qc_dir)
wf.connect(bet, 'out_file', qc, 'overlay')
wf.connect('inputspec', 'in_file', qc, 'background')
# sinking
wf.connect(bet, 'out_file', 'sinker', 'out_file')
wf.connect(bet, 'mask_file', 'sinker', 'mask_file')
# output
wf.connect(bet, 'mask_file', 'outputspec', 'brain_mask')
if fmri:
wf.connect(apply_mask, 'out_file', 'outputspec', 'out_file')
else:
wf.connect(bet, 'out_file', 'outputspec', 'out_file')
[docs]@AnatPipeline(inputspec_fields=['in_file'],
outputspec_fields=['out_file', 'brain_mask'])
def bet_hd(wf, **kwargs):
"""
Does brain extraction with HD-Bet.
"""
# bet
bet = Node(interface=HDBet(), name='bet')
bet.inputs.mode = kwargs.get('mode', wf.cfg_parser.get('HD-Bet', 'mode', fallback='accurate'))
bet.inputs.device = kwargs.get('device', wf.cfg_parser.get('HD-Bet', 'device', fallback='0'))
bet.inputs.tta = kwargs.get('tta', wf.cfg_parser.getint('HD-Bet', 'tta', fallback=1))
bet.inputs.postprocessing = kwargs.get('postprocessing',
wf.cfg_parser.getint('HD-Bet', 'postprocessing', fallback=1))
bet.inputs.save_mask = kwargs.get('save_mask', wf.cfg_parser.getint('HD-Bet', 'save_mask', fallback=1))
bet.inputs.overwrite_existing = kwargs.get('overwrite_existing',
wf.cfg_parser.getint('HD-Bet', 'overwrite_existing', fallback=1))
wf.connect('inputspec', 'in_file', bet, 'in_file')
# quality check
qc = qc_segmentation(name='qc_segmentation', qc_dir=wf.qc_dir)
wf.connect('inputspec', 'in_file', qc, 'background')
wf.connect(bet, 'out_file', qc, 'overlay')
wf.connect(bet, 'out_file', 'outputspec', 'out_file')
wf.connect(bet, 'mask_file', 'outputspec', 'brain_mask')
[docs]@AnatPipeline(inputspec_fields=['brain', 'stand2anat_xfm'],
outputspec_fields=['probmap_csf', 'probmap_gm', 'probmap_wm', 'mixeltype', 'parvol_csf', 'parvol_gm',
'parvol_wm', 'partial_volume_map'])
def tissue_segmentation_fsl(wf, priormap=True, **kwargs):
"""
Perform segmentation of a brain extracted T1w image.
Parameters:
priormap (bool): Set to True if you want to use prior probability maps.
In that case the stand2anat_xfm input is needed (otherwise not).
Inputs:
brain (str): Path to the brain which should be segmented.
stand2anat_xfm (str): Path to standard2input matrix calculated by FSL FLIRT.
Only necessary when using prior probability maps!
Outputs:
probmap_csf (str): Path to csf probability map.
probmap_gm (str): Path to gm probability map.
probmap_wm (str): Path to wm probability map
mixeltype (str): Path to mixeltype volume file
parvol_csf (str): Path to csf partial volume file
parvol_gm (str): Path to gm partial volume file
parvol_wm (str): Path to wm partial volume file
partial_volume_map (str): Path to partial volume map
Sinking:
- The probability maps for csf, gm and wm.
Acknowledgements:
Modified version of CPAC.seg_preproc.seg_preproc (https://github.com/FCP-INDI/C-PAC) and Balint Kinces code (2018)
For a deeper insight:
- https://fsl.fmrib.ox.ac.uk/fsl/fslwiki/FAST
- https://nipype.readthedocs.io/en/latest/api/generated/nipype.interfaces.fsl.preprocess.html
"""
if priormap:
priorprob_csf = os.path.join(os.environ['FSLDIR'], '/data/standard/tissuepriors/avg152T1_csf.hdr')
priorprob_gm = os.path.join(os.environ['FSLDIR'], '/data/standard/tissuepriors/avg152T1_gray.hdr')
priorprob_wm = os.path.join(os.environ['FSLDIR'], '/data/standard/tissuepriors/avg152T1_white.hdr')
fast = Node(interface=fsl.FAST(), name='fast')
fast.inputs.img_type = 1
fast.inputs.segments = True
fast.inputs.probability_maps = True
fast.inputs.out_basename = 'fast'
wf.connect('inputspec', 'brain', fast, 'in_files')
if priormap:
wf.connect('inputspec', 'stand2anat_xfm', fast, 'init_transform')
split_probability_maps = Node(interface=Split(), name='split_probability_maps')
split_probability_maps.inputs.splits = [1, 1, 1]
split_probability_maps.inputs.squeeze = True
wf.connect(fast, 'probability_maps', split_probability_maps, 'inlist')
split_partial_volume_files = Node(interface=Split(), name='split_partial_volume_files')
split_partial_volume_files.inputs.splits = [1, 1, 1]
split_partial_volume_files.inputs.squeeze = True
wf.connect(fast, 'partial_volume_files', split_partial_volume_files, 'inlist')
qc = qc_tissue_segmentation(name='qc', qc_dir=wf.qc_dir)
wf.connect(fast, 'partial_volume_map', qc, 'in_file')
# sinking
wf.connect(split_probability_maps, 'out1', 'sinker', 'fast_csf')
wf.connect(split_probability_maps, 'out2', 'sinker', 'fast_gm')
wf.connect(split_probability_maps, 'out3', 'sinker', 'fast_wm')
# output
wf.get_node('outputspec').inputs.probmap_csf = priorprob_csf
wf.get_node('outputspec').inputs.probmap_gm = priorprob_gm
wf.get_node('outputspec').inputs.probmap_wm = priorprob_wm
wf.connect(fast, 'mixeltype', 'outputspec', 'mixeltype')
wf.connect(fast, 'partial_volume_map', 'outputspec', 'partial_volume_map')
wf.connect(split_probability_maps, 'out1', 'outputspec', 'probmap_csf')
wf.connect(split_probability_maps, 'out2', 'outputspec', 'probmap_gm')
wf.connect(split_probability_maps, 'out3', 'outputspec', 'probmap_wm')
wf.connect(split_partial_volume_files, 'out1', 'outputspec', 'parvol_csf')
wf.connect(split_partial_volume_files, 'out2', 'outputspec', 'parvol_gm')
wf.connect(split_partial_volume_files, 'out3', 'outputspec', 'parvol_wm')
[docs]def pydeface_wrapper(infile, force):
"""
Workaround to store the defaced image in the correct place with useful name
"""
from pydeface import utils
outfile = os.path.join(os.getcwd(), 'defaced_' + os.path.basename(infile))
warped_mask_img, warped_mask, template_reg, template_reg_mat = utils.deface_image(infile=infile,
outfile=outfile,
force=force)
return outfile, warped_mask
'''
Pipline for defacing anatomical images
in_file : String
Path to anat image
outfile : String
Path to anat image
If not specified, the output will be stored in the folder of the input.
deface_mask : String
Path to defacing mask
'''
[docs]@AnatPipeline(inputspec_fields=['in_file'],
outputspec_fields=['out_file', 'deface_mask'])
def defacing(wf, **kwargs):
"""
Pipeline for defacing anatomical images.
Also creates quality check images
Inputs
in_file(str): Path to anat image.
Outputs
out_file(nii.gz) : Defaced anatomical image
deface_mask(str): Path to the defacing mask.
Sinker
- Defaced anatomical image
"""
deface_node = Node(Function(input_names=['infile', 'force'],
output_names=['warped_mask_img', 'warped_mask'],
function=pydeface_wrapper,
imports=[
'import sys', 'import shutil', 'from nipype.interfaces import fsl',
'from pydeface.utils import initial_checks, output_checks, generate_tmpfiles',
'from pydeface.utils import cleanup_files, get_outfile_type, generate_tmpfiles',
'from nibabel import load, Nifti1Image', 'import os']
),
name='deface_node')
# If set to True, the previous defaced img will be overwritten
deface_node.inputs.force = True
wf.connect('inputspec', 'in_file', deface_node, 'infile')
# Quality Check
defacing_qc = qc_segmentation(name='defacing_qc', qc_dir=wf.qc_dir)
wf.connect('inputspec', 'in_file', defacing_qc, 'background')
wf.connect(deface_node, 'warped_mask', defacing_qc, 'overlay')
# Sink defaced image
wf.connect('deface_node', 'warped_mask_img', 'sinker', 'defaced')
wf.connect('deface_node', 'warped_mask_img', 'outputspec', 'out_file')
wf.connect('deface_node', 'warped_mask', 'outputspec', 'deface_mask')