Permalink
Cannot retrieve contributors at this time
Name already in use
A tag already exists with the provided branch name. Many Git commands accept both tag and branch names, so creating this branch may cause unexpected behavior. Are you sure you want to create this branch?
mri-data-analysis/mriDataAnalysis.py
Go to fileThis commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
executable file
289 lines (240 sloc)
12.4 KB
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Some generalizable scripts that enable MR data analysis
# genR: linear exp fit to data at TE
# MODELS: different models to fit MR data w/ noise
# -- parabula: to include quadratic exp decay
# -- parabula2d: same for 2D data
# -- noisyQuadDecaySe/Ge: simple (false) noise floor implementation
# -- biasedMagnitude: implementation of rician noise floor.
# -- biasedQuadExp: helper for above function.
# -- ricianFit3D: rician fit to 3D data.
# -- getImgs, getData: load Nifti images / their data into pandas dataframe (ordered according to TE)
# -- nii2folders: sort nifti/json pairs into per-sequence subfolders
__all__ = [
    'genR',
    'parabula', 'parabula2d',
    'noisyQuadDecaySe', 'noisyQuadDecayGe',
    'biasedMagnitude', 'biasedQuadExp', 'ricianFit3D',
    'getImgs', 'loadSequenceData', 'loadSequenceImgs', 'getData',
    'saveRmap', 'saveNifti',
    'maskFromData', 'maskFromNoise',
    'nii2folders',  # defined below but missing from the original __all__
]
import numpy as np | |
from scipy.optimize import minimize | |
import pandas as pd | |
import os | |
import nibabel as nib | |
from scipy.special import iv | |
import json | |
import scipy.ndimage as ndimage | |
def genR(TE, data):
    """Estimate a decay-rate map R from multi-echo data by linear least squares.

    CAUTION: the data is clipped at 1e-25 before the log to avoid NaNs, which
    might bias predictions of very steep decays.  Behaves exactly like a linear
    least-squares fit with curve_fit (weight construction copied from spm).

    data: array shaped [time steps, xdim, ydim]; negative rates are clipped to 0.
    """
    # design matrix [1, TE] and its pseudo-inverse rows; the slope row (negated)
    # gives per-echo weights for the log-signal.
    design = np.array([np.ones(len(TE)), TE]).transpose()
    pinv_rows = np.linalg.solve(np.dot(design.transpose(), design), design.transpose())
    slope_w = -pinv_rows[1, :].transpose()
    upper = data.max() + 1e-25  # loop-invariant clip ceiling
    R = sum(slope_w[step] * np.log(np.clip(data[step], 1e-25, upper))
            for step in range(slope_w.shape[0]))
    return np.clip(R, 0, np.amax(R))
# MODELS | |
def parabula(t, rho, R, A):
    """Quadratic-exponential log-signal model: A - |R|*t - (rho*t)**2 / 2."""
    quad = -t**2 * rho**2 / 2
    lin = np.absolute(R) * t
    return quad - lin + A
def parabula2d(t, rho, R, A):
    """2D image version of parabula: broadcasts time over per-pixel maps.

    t is 1D (echoes); rho, R, A are 2D maps.  Returns [time, x, y].
    """
    tcol = t[:, None, None]
    quad = -tcol**2 * rho[None, :, :]**2 / 2
    lin = np.absolute(R[None, :, :]) * tcol
    return quad - lin + A[None, :, :]
def noisyQuadDecaySe(t, rho, R, A, B=20, TwoD=False):
    """SE quadratic-exponential decay with a simple (false) noise floor B.

    Picks the 2D model when TwoD is set; the floor is added in quadrature.
    """
    model = parabula2d if TwoD else parabula
    signal = np.exp(model(t, rho, R, A))
    return np.sqrt(signal**2 + B**2)
def noisyQuadDecayGe(t, rho, R, A, B=13, TwoD=False):
    """GE quadratic-exponential decay with a simple (false) noise floor B.

    Identical to noisyQuadDecaySe apart from the default floor level (13 vs 20).
    """
    model = parabula2d if TwoD else parabula
    signal = np.exp(model(t, rho, R, A))
    return np.sqrt(signal**2 + B**2)
def biasedMagnitude(trueMagnitude, sig):
    """Expected magnitude of a Rician-distributed signal (noise-floor bias).

    For SNR = trueMagnitude/sig below 10 the exact Rician mean is used
    (Bessel-function expression); at or above 10 the cheaper high-SNR
    approximation sqrt(M**2 + sig**2) applies.

    Fix: the original branched `if ratio < 10 ... elif ratio > 10`, leaving
    entries with ratio exactly 10 at zero; the second branch is now `else`.

    trueMagnitude: 1D array of noise-free magnitudes; sig: scalar noise level.
    """
    A = trueMagnitude**2 / (4 * sig**2)
    biasedMagn = np.zeros(trueMagnitude.shape)
    for i in range(trueMagnitude.shape[0]):
        if trueMagnitude[i] / sig < 10:
            # exact Rician mean: sig*sqrt(pi/2)*exp(-A)*((1+2A)*I0(A) + 2A*I1(A))
            biasedMagn[i] = (sig * np.sqrt(np.pi / 2) * np.exp(-A[i])
                             * ((1 + 2 * A[i]) * iv(0, A[i]) + 2 * A[i] * iv(1, A[i])))
        else:
            # high-SNR approximation (now also covers ratio == 10)
            biasedMagn[i] = np.sqrt(trueMagnitude[i]**2 + sig**2)
    return biasedMagn
def biasedQuadExp(T, rho, R, A, sig):
    """Rician-biased magnitude of the quadratic-exponential decay model."""
    noiseFree = np.exp(parabula(T, rho, R, A))
    return biasedMagnitude(noiseFree, sig)
def ricianFit3D(data,TE,sig0,regweight1,regweight2=1,rho0=50,A0=.8,R2s=0,R2sGiven=False,limitRandSig=True):
    """Voxel-wise regularized fit of the Rician-biased quadratic-exponential model.

    data: array in format [time, x, y, z]; TE: 1D array of echo times.
    sig0: start value / empirical estimate of the Rician noise-floor parameter.
    regweight1: regularization weight on the quadratic (rho**2) contribution.
    regweight2: regularization weight on the deviation of sig from sig0.
    rho0, A0: start values for rho and A; R2s: per-voxel start map for R
        (used only when R2sGiven is True, otherwise estimated via genR).
    limitRandSig: bound R from above by twice the max relative inclination
        between the first and second echo, and bound sig away from zero.
    Returns [rho, R, A, sig], each shaped like data[0].
    chi2 not yet implemented!!!!!
    """
    if R2sGiven==False:
        # start values for R from a quick log-linear fit over all echoes
        R2s=genR(TE,data)
    if limitRandSig:
        Rmax = 2*np.nan_to_num((data[0]-data[1])/(data[0])).max()/(TE[1]-TE[0])
        print("WARNING: R is bounded from above by", Rmax)
        sigMin = 1e-6
    # per-voxel output parameter maps
    rho = np.zeros(data[0].shape)
    R = np.zeros(data[0].shape)
    A = np.zeros(data[0].shape)
    sig = np.zeros(data[0].shape)
    # NOTE(review): chi2 is allocated but never written or returned (docstring: not implemented)
    chi2 = np.zeros([data[0].shape[0],data[0].shape[1],data[0].shape[2],3,3])
    for i in range(data[0].shape[0]):
        print(np.around(i/data[0].shape[0]*1e2,1), '% done.') # progress number
        for j in range(data[0].shape[1]):
            for k in range(data[0].shape[2]):
                # mean of the elementwise comparison equals True only when ALL
                # echoes of this voxel are exactly zero, i.e. the voxel is masked
                if (np.mean(data[:,i,j,k] == np.zeros(TE.shape))==True): # masked voxels set to zero
                    rho[i,j,k] = 0
                    R[i,j,k] = 0
                    A[i,j,k] = 0
                    sig[i,j,k] = 0
                else:
                    def cost(params): # simply use globally defined data
                        # mean squared residual against the Rician-biased model,
                        # plus the two regularization terms
                        rho, R, A, sig = params
                        reg1 = rho**2
                        reg2 = (sig-sig0)**2 # deviation from empiric noise
                        return (((biasedMagnitude(np.exp(parabula(TE,rho, R, A)),sig) - data[:,i,j,k])**2).mean() # standard cost function
                                +reg1*regweight1 # regularization of quadratic term
                                +reg2*regweight2) # regularization of Noise floor
                    try:
                        if limitRandSig:
                            res = minimize(cost, [rho0, R2s[i,j,k],A0,sig0], bounds=[(0,None),(0,Rmax),(None,None),(sigMin,None)])
                        else:
                            res = minimize(cost, [rho0, R2s[i,j,k],A0,sig0], bounds=[(0,None),(0,None),(None,None),(0,None)])
                        par = res.x
                    except RuntimeError:
                        print('Warning: Runtime ERROR\nResult set to zero.')
                        par = [0,0,0,0]
                    # store magnitudes so sign ambiguities cannot leak into the maps
                    rho[i,j,k] = np.absolute(par[0])
                    R[i,j,k] = np.absolute(par[1])
                    A[i,j,k] = par[2]
                    sig[i,j,k] = par[3]
    return [rho,R,A,sig]
def loadSequenceData(datadir, sequencenumber):
    """Loads nifti data from sequence sequencenumber in directory datadir.

    Matches files named '<sequencenumber>_*.nii' and reads the echo time from
    the JSON sidecar of the same base name.  Returns [TE, data], both ordered
    by ascending echo time; data is stacked as [echo, ...voxel dims...].

    Fixes: JSON sidecars are now closed (the original leaked the handle),
    paths are built with os.path.join instead of string concatenation, and
    the removed nibabel .get_data() API is replaced by .get_fdata().
    """
    meta = []
    data = []
    print("Collected files:")
    for file in os.listdir(datadir):
        if file[-3:] == 'nii':  # just look at niftis
            # correct sequence: filename starts with '<sequencenumber>_'
            if file[0:len(sequencenumber)] == sequencenumber and file[len(sequencenumber)] == '_':
                print(file)
                with open(os.path.join(datadir, file[:-4] + '.json')) as sidecar:
                    meta.append(json.load(sidecar))
                # get_fdata() is the supported replacement for the removed
                # get_data(); NOTE(review): it always returns float64
                data.append(nib.load(os.path.join(datadir, file)).get_fdata())
    TE = [m['EchoTime'] for m in meta]
    frame = pd.DataFrame(data={'TE': TE, 'Imtx': data}).sort_values(by=['TE'])
    TE = np.array(frame['TE'])
    data = np.array([frame['Imtx'].iloc[i] for i in range(len(frame['Imtx']))])
    return [TE, data]
def loadSequenceImgs(datadir, sequencenumber):
    """Loads nifti images from sequence sequencenumber in directory datadir.

    Same matching logic as loadSequenceData, but keeps the nibabel image
    objects instead of their voxel arrays.  Returns [TE, imgs], both ordered
    by ascending echo time.

    Fixes: JSON sidecars are now closed (the original leaked the handle) and
    paths are built with os.path.join instead of string concatenation.
    """
    meta = []
    data = []
    print("Collected files:")
    for file in os.listdir(datadir):
        if file[-3:] == 'nii':  # just look at niftis
            # correct sequence: filename starts with '<sequencenumber>_'
            if file[0:len(sequencenumber)] == sequencenumber and file[len(sequencenumber)] == '_':
                print(file)
                with open(os.path.join(datadir, file[:-4] + '.json')) as sidecar:
                    meta.append(json.load(sidecar))
                data.append(nib.load(os.path.join(datadir, file)))
    TE = [m['EchoTime'] for m in meta]
    frame = pd.DataFrame(data={'TE': TE, 'Imtx': data}).sort_values(by=['TE'])
    TE = np.array(frame['TE'])
    # NOTE: an array of image objects has dtype=object, as in the original
    data = np.array([frame['Imtx'].iloc[i] for i in range(len(frame['Imtx']))])
    return [TE, data]
# OLD, VERY SPECIFIC -> see loadSequence for newer version
def getImgs(datadir, sequencenumber, ordered=True, timeInit=-11, timeEnd=-6, timeReadError=False):
    """Old loader: TE is parsed out of a fixed filename slice [timeInit:timeEnd].

    Stray letters in the slice are mapped to '0' so it parses as a float.
    With timeReadError=True only the extraction is printed for debugging and a
    joke string is returned instead of a DataFrame.

    Fixes: the original if/else printed identical lines in both branches
    (dead branching), and evaluated the five-fold .replace() chain three
    times per file; both are collapsed here.
    """
    TE = []
    imgs = []
    for file in os.listdir(datadir):
        if file[1:3] == sequencenumber and file[3] == '_':
            # map the stray letters in the filename slice to '0' (computed once)
            extracted = (file[timeInit:timeEnd].replace('E', '0').replace('T', '0')
                         .replace('_', '0').replace('R', '0').replace('e', '0'))
            print('Complete file name:', file)
            print('Extracted sequence:', extracted)
            if not timeReadError:
                TE.append(float(extracted))
                imgs.append(nib.load(datadir + file))
    if timeReadError:
        return 'Good luck spotting the error : )'
    frame = pd.DataFrame(data={'TE': TE, 'Imtx': imgs})
    if ordered:  # Sort according to echo times
        frame = frame.sort_values(by=['TE'])
    return frame
def getData(datadir, sequencenumber, ordered=True, timeInit=-11, timeEnd=-6, timeReadError=False):
    """Old loader: like getImgs but returns voxel arrays instead of images.

    TE is parsed out of the fixed filename slice [timeInit:timeEnd], with stray
    letters mapped to '0'.  With timeReadError=True only the extraction is
    printed and a joke string is returned instead of a DataFrame.

    Fixes: identical if/else print branches collapsed, the five-fold
    .replace() chain hoisted (was evaluated three times per file), and the
    removed nibabel .get_data() API replaced by .get_fdata().
    """
    TE = []
    imgs = []
    for file in os.listdir(datadir):
        if file[1:3] == sequencenumber and file[3] == '_':
            # map the stray letters in the filename slice to '0' (computed once)
            extracted = (file[timeInit:timeEnd].replace('E', '0').replace('T', '0')
                         .replace('_', '0').replace('R', '0').replace('e', '0'))
            print('Complete file name:', file)
            print('Extracted sequence:', extracted)
            if not timeReadError:
                TE.append(float(extracted))
                # get_fdata() replaces the removed get_data(); returns float64
                imgs.append(nib.load(datadir + file).get_fdata())
    if timeReadError:
        return 'Good luck spotting the error : )'
    frame = pd.DataFrame(data={'TE': TE, 'Imtx': imgs})
    if ordered:  # Sort according to echo times
        frame = frame.sort_values(by=['TE'])
    return frame
def saveRmap(data, originalImage, filename):
    """Save data as NIfTI, reusing affine and header from originalImage."""
    mapped = nib.Nifti1Image(data, originalImage.affine, originalImage.header)
    # flag that the header was inherited, not rebuilt for this data
    mapped.header['descrip'] = 'Reconstructed with MRI data analysis by MDB. Header copied from original image data, so maybe wrong bits.'
    nib.save(mapped, filename)
    return 'Result saved as '+filename+'.'
def saveNifti(data, filename):
    """Save data as NIfTI with an identity affine (unspecified coordinates)."""
    identity = np.eye(4)
    nib.save(nib.Nifti1Image(data, identity), filename)
    return 'Data saved as '+filename+'.'
def maskFromData(data, noiseaxis, noiserows, threshold=5, kernelsize=3):
    """Generates a binary mask to get nicer R2x maps without noise around them.

    Noise statistics (mean, std) are estimated from the first `noiserows`
    slices along `noiseaxis`; voxels above mean + threshold*std are kept and
    the mask is cleaned by a morphological opening followed by a closing.

    data: 3D array (a singleton last axis is treated as 2D for morphology).
    Raises ValueError for noiseaxis outside {0, 1, 2} (the original crashed
    with a NameError on undefined `mu` in that case).
    """
    if noiseaxis == 0:
        noiseRegion = data[:noiserows, :, :]
    elif noiseaxis == 1:
        noiseRegion = data[:, :noiserows, :]
    elif noiseaxis == 2:
        noiseRegion = data[:, :, :noiserows]
    else:
        raise ValueError('noiseaxis must be 0, 1 or 2, got %r' % (noiseaxis,))
    mu = noiseRegion.mean()
    sig = noiseRegion.std()
    mask = data > mu + threshold * sig
    if min(data.shape) > 1:
        # genuinely 3D: open to remove speckles, then close to fill small holes
        structure = np.ones((kernelsize, kernelsize, kernelsize)).astype(mask.dtype)
        mask = ndimage.binary_closing(ndimage.binary_opening(mask, structure=structure),
                                      structure=structure)
    else:
        # effectively 2D (singleton last axis): operate on the slice, restore the axis
        structure = np.ones((kernelsize, kernelsize)).astype(mask.dtype)
        mask = ndimage.binary_closing(ndimage.binary_opening(mask[:, :, 0], structure=structure),
                                      structure=structure)[:, :, None]
    return mask
def maskFromNoise(data, noise, threshold=5, kernelsize=3):
    """Generates a binary mask to get nicer R2x maps without noise around them.

    noise: either an ndarray sample of pure noise (mean and std are estimated
    from it) or a scalar noise level (used as the mean, std taken as 0).

    Fix: the original called noise.std() even when `noise` was a scalar,
    crashing with AttributeError despite the isinstance branch clearly
    intending to support scalars; np.std handles both (0.0 for a scalar).
    """
    if isinstance(noise, np.ndarray):
        mu = noise.mean()
    else:
        mu = noise
    sig = np.std(noise)  # 0.0 for a scalar noise level
    mask = data > mu + threshold * sig
    if len(data.shape) == 3:
        structure = np.ones((kernelsize, kernelsize, kernelsize)).astype(mask.dtype)
        mask = ndimage.binary_closing(ndimage.binary_opening(mask, structure=structure),
                                      structure=structure)
    elif len(data.shape) == 2:
        structure = np.ones((kernelsize, kernelsize)).astype(mask.dtype)
        mask = ndimage.binary_closing(ndimage.binary_opening(mask, structure=structure),
                                      structure=structure)
    # other dimensionalities return the raw threshold mask, as before
    return mask
def nii2folders(sortdir):
    """Generates subfolders for nifti files that start with
    SEQUENCENUMBER_SEQENCENAME..._ECHONUMBER.nii/json.  Be careful with phase
    data with ECHONUMBER_ph.nii etc.

    The folder name is everything before the file's last '_'-separated chunk.
    Returns 0 on completion.

    Fix: the original used IPython shell magics (!mkdir / !cp) which are not
    valid plain Python; replaced with os.makedirs / shutil.copy.  Directories
    and files without '_' in the name are now skipped instead of crashing.
    """
    import shutil  # local import: only this housekeeping helper needs it
    for file in os.listdir(sortdir):
        filepath = sortdir + file
        if not os.path.isfile(filepath):
            continue  # skip folders created by earlier iterations/runs
        foldername = '_'.join(file.split('_')[:-1])
        if not foldername:
            continue  # no '_' in the name -> nothing to group by
        folderpath = sortdir + foldername
        if foldername not in os.listdir(sortdir):
            os.makedirs(folderpath)
        shutil.copy(filepath, folderpath)
    return 0