"""KSpace IO API."""
__all__ = ["read_rawdata"]
import math
import time
import numpy as np
import torch
from . import gehc as _gehc
from . import mrd as _mrd
# from . import siemens as _siemens
[docs]def read_rawdata(filepath, acqheader=None, device="cpu", verbose=0):
"""
Read kspace data from file.
Currently, handles data written in ISMRMD format [1] (vendor agnostic)
and GEHC proprietary raw data (requires access to a private repository).
Parameters
----------
filepath : str
Path to kspace file. Supports wildcard (e.g., ``/path-to-data-folder/*.h5``).
acqheader : Header, optional
Acquisition header loaded from trajectory.
If not provided, assume Cartesian acquisition and infer from data.
The default is ``None``.
device : str, optional
Computational device for internal attributes. The default is ``cpu``.
verbose : int, optional
Verbosity level ``(0=Silent, 1=Less, 2=More)``. The default is ``0``.
Returns
-------
data : torch.tensor
Complex k-space data.
head : Header
Metadata for image reconstruction.
Example
-------
>>> import deepmr
Get the filename for an example ``.mrd`` file.
>>> filepath = deepmr.testdata("mrd")
Load the file contents.
>>> data, head = deepmr.io.read_rawdata(filepath)
The result is a data/header pair. ``Data`` contains k-space data.
Here, it represents a 2D spiral acquisition with 1 slice, 36 coils, 32 arms and 1284 samples per arm:
>>> data.shape
torch.Size([1, 36, 1, 32, 1284])
``Head`` contains the acquisition information. We can inspect the k-space trajectory and dcf size,
the expected image shape and resolution:
>>> head.traj.shape
torch.Size([1, 32, 1284, 2])
>>> head.dcf.shape
torch.Size([1, 32, 1284])
>>> head.shape
tensor([ 1, 192, 192])
>>> head.ref_dicom.SliceThickness
'5.0'
>>> head.ref_dicom.PixelSpacing
[1.56, 1.56]
``Head`` also contains contrast information (for forward simulation and parameter inference):
>>> head.FA
10.0
>>> head.TE
0.86
>>> head.TR
4.96
Notes
-----
The returned ``data`` tensor contains raw k-space data. Dimensions are defined as following:
* **2Dcart:** ``(nslices, ncoils, ncontrasts, ny, nx)``.
* **2Dnoncart:** ``(nslices, ncoils, ncontrasts, nviews, nsamples)``.
* **3Dcart:** ``(nx, ncoils, ncontrasts, nz, ny)``.
* **3Dnoncart:** ``(ncoils, ncontrasts, nviews, nsamples)``.
When possible, data are already pre-processed:
* For Cartesian data (2D and 3D) readout oversampling is removed if the number of samples along readout is larger than the number of rows in the image space (shape[-1]).
* For Non-Cartesian (2D and 3D), fov is centered according to trajectory and isocenter info from the header.
* For separable acquisitions (3D stack-of-Non-Cartesians and 3D Cartesians), k-space is decoupled via FFT (along slice and readout axes, respectively).
The returned ``head`` (:func:`deepmr.Header`) is a structure with the following fields:
* shape (torch.Tensor):
This is the expected image size of shape ``(nz, ny, nx)``.
* resolution (torch.Tensor):
This is the expected image resolution in mm of shape ``(dz, dy, dx)``.
* t (torch.Tensor):
This is the readout sampling time ``(0, t_read)`` in ``ms``.
with shape ``(nsamples,)``.
* traj (torch.Tensor):
This is the k-space trajectory normalized as ``(-0.5, 0.5)``
with shape ``(ncontrasts, nviews, nsamples, ndims)``.
* dcf (torch.Tensor):
This is the k-space sampling density compensation factor
with shape ``(ncontrasts, nviews, nsamples)``.
* FA (torch.Tensor, float):
This is either the acquisition flip angle in degrees or the list
of flip angles of shape ``(ncontrasts,)`` for each image in the series.
* TR (torch.Tensor, float):
This is either the repetition time in ms or the list
of repetition times of shape ``(ncontrasts,)`` for each image in the series.
* TE (torch.Tensor, float):
This is either the echo time in ms or the list
of echo times of shape ``(ncontrasts,)`` for each image in the series.
* TI (torch.Tensor, float):
This is either the inversion time in ms or the list
of inversion times of shape ``(ncontrasts,)`` for each image in the series.
* user (dict):
User parameters. Some examples are:
* ordering (torch.Tensor):
Indices for reordering (acquisition to reconstruction)
of acquired k-space data, shaped ``(3, nslices * ncontrasts * nview)``, whose rows are
``contrast_index``, ``slice_index`` and ``view_index``, respectively.
* mode (str):
Acquisition mode (``2Dcart``, ``3Dcart``, ``2Dnoncart``, ``3Dnoncart``).
* separable (bool):
Whether the acquisition can be decoupled by fft along ``slice``/ ``readout`` directions
(3D stack-of-noncartesian / 3D cartesian, respectively) or not (3D noncartesian and 2D acquisitions).
* slice_profile (torch.Tensor):
Flip angle scaling along slice profile of shape ``(nlocs,)``.
* basis (torch.Tensor):
Low rank subspace basis for subspace reconstruction of shape ``(ncoeff, ncontrasts)``.
* affine (np.ndarray):
Affine matrix describing image spacing, orientation and origin of shape ``(4, 4)``.
* ref_dicom (pydicom.Dataset):
Template dicom for image export.
* flip (list):
List of spatial axis to be flipped after image reconstruction.
The default is an empty list (no flipping).
* transpose (list):
Permutation of image dimensions after reconstruction, depending on acquisition mode:
* **2Dcart:** reconstructed image has ``(nslices, ncontrasts, ny, nx) -> transpose = [1, 0, 2, 3]``
* **2Dnoncart:** reconstructed image has ``(nslices, ncontrasts, ny, nx) -> transpose = [1, 0, 2, 3]``
* **3Dcart:** reconstructed image has ``(ncontrasts, nz, ny, nx) -> transpose = [0, 1, 2, 3]``
* **3Dnoncart:** reconstructed image has ``(nx, ncontrasts, nz, ny) -> transpose = [1, 2, 3, 0]``
The default is an empty list (no transposition).
References
----------
[1]: Inati, S.J., Naegele, J.D., Zwart, N.R., Roopchansingh, V., Lizak, M.J., Hansen, D.C., Liu, C.-Y., Atkinson, D.,
Kellman, P., Kozerke, S., Xue, H., Campbell-Washburn, A.E., Sørensen, T.S. and Hansen, M.S. (2017),
ISMRM Raw data format: A proposed standard for MRI raw datasets. Magn. Reson. Med., 77: 411-421.
https://doi.org/10.1002/mrm.26089
"""
tstart = time.time()
if verbose >= 1:
print(f"Reading raw k-space from file {filepath}...", end="\t")
done = False
# convert header to numpy
if acqheader is not None:
acqheader.numpy()
# mrd
if verbose == 2:
t0 = time.time()
try:
data, head = _mrd.read_mrd_rawdata(filepath)
done = True
except Exception as e:
msg0 = _get_error(e)
# gehc
if not (done):
try:
data, head = _gehc.read_gehc_rawdata(filepath, acqheader)
done = True
except Exception as e:
msg1 = _get_error(e)
# siemens
# if not(done):
# try:
# head = _siemens.read_siemens_rawdata(filepath, acqheader)
# done = True
# except Exception:
# pass
# check if we loaded data
if not (done):
raise RuntimeError(
f"File (={filepath}) not recognized! Errors:\nMRD: {msg0}\nGEHC: {msg1}"
)
if verbose == 2:
t1 = time.time()
print(f"done! Elapsed time: {round(t1-t0, 2)} s")
# transpose
data = data.transpose(2, 0, 1, 3, 4) # (slice, coil, contrast, view, sample)
# select actual readout
if verbose == 2:
nsamples = data.shape[-1]
print("Selecting actual readout samples...", end="\t")
t0 = time.time()
data = _select_readout(data, head)
if verbose == 2:
t1 = time.time()
print(
f"done! Selected {data.shape[-1]} out of {nsamples} samples. Elapsed time: {round(t1-t0, 2)} s"
)
# center fov
if verbose == 2:
if head.traj is not None:
t0 = time.time()
ndim = head.traj.shape[-1]
shift = head._shift[:ndim]
if ndim == 2:
print(f"Shifting FoV by (dx={shift[0]}, dy={shift[1]}) mm", end="\t")
if ndim == 3:
print(
f"Shifting FoV by (dx={shift[0]}, dy={shift[1]}, dz={shift[2]}) mm",
end="\t",
)
data = _fov_centering(data, head)
if verbose == 2:
if head.traj is not None:
t1 = time.time()
print(f"done! Elapsed time: {round(t1-t0, 2)} s")
# remove oversampling for Cartesian
if "mode" in head.user:
if head.user["mode"][2:] == "cart":
if verbose == 2:
t0 = time.time()
ns1 = data.shape[0]
ns2 = head.shape[0]
print(
f"Removing oversampling along readout ({round(ns1/ns2, 2)})...",
end="\t",
)
data, head = _remove_oversampling(data, head)
if verbose == 2:
t1 = time.time()
print(f"done! Elapsed time: {round(t1-t0, 2)} s")
# transpose readout in slice direction for 3D Cartesian
if "mode" in head.user:
if head.user["mode"] == "3Dcart":
data = data.transpose(
-1, 1, 2, 0, 3
) # (z, ch, e, y, x) -> (x, ch, e, z, y)
# decouple separable acquisition
if "separable" in head.user and head.user["separable"]:
if verbose == 2:
t0 = time.time()
print("Separable 3D acquisition, performing FFT along slice...", end="\t")
data = _fft(data, 0)
if verbose == 2:
t1 = time.time()
print(f"done! Elapsed time: {round(t1-t0, 4)} s")
# set-up transposition
if "mode" in head.user:
if head.user["mode"] == "2Dcart":
head.transpose = [1, 0, 2, 3]
if verbose == 2:
print("Acquisition mode: 2D Cartesian")
print(
f"K-space shape: (nslices={data.shape[0]}, nchannels={data.shape[1]}, ncontrasts={data.shape[2]}, ny={data.shape[3]}, nx={data.shape[4]})"
)
print(
f"Expected image shape: (nslices={data.shape[0]}, nchannels={data.shape[1]}, ncontrasts={data.shape[2]}, ny={head.shape[1]}, nx={head.shape[2]})"
)
elif head.user["mode"] == "2Dnoncart":
head.transpose = [1, 0, 2, 3]
if verbose == 2:
print("Acquisition mode: 2D Non-Cartesian")
print(
f"K-space shape: (nslices={data.shape[0]}, nchannels={data.shape[1]}, ncontrasts={data.shape[2]}, nviews={data.shape[3]}, nsamples={data.shape[4]})"
)
print(
f"Expected image shape: (nslices={data.shape[0]}, nchannels={data.shape[1]}, ncontrasts={data.shape[2]}, ny={head.shape[1]}, nx={head.shape[2]})"
)
elif head.user["mode"] == "3Dnoncart":
data = data[0]
head.transpose = [1, 0, 2, 3]
if verbose == 2:
print("Acquisition mode: 3D Non-Cartesian")
print(
f"K-space shape: (nchannels={data.shape[0]}, ncontrasts={data.shape[1]}, nviews={data.shape[2]}, nsamples={data.shape[3]})"
)
print(
f"Expected image shape: (nchannels={data.shape[0]}, ncontrasts={data.shape[1]}, nz={head.shape[0]}, ny={head.shape[1]}, nx={head.shape[2]})"
)
elif head.user["mode"] == "3Dcart":
head.transpose = [1, 2, 3, 0]
if verbose == 2:
print("Acquisition mode: 3D Cartesian")
print(
f"K-space shape: (nx={data.shape[0]}, nchannels={data.shape[1]}, ncontrasts={data.shape[2]}, nz={data.shape[3]}, ny={data.shape[4]})"
)
print(
f"Expected image shape: (nx={head.shape[2]}, nchannels={data.shape[1]}, ncontrasts={data.shape[2]}, nz={head.shape[0]}, ny={head.shape[1]})"
)
# remove unused trajectory for cartesian
if head.user["mode"][2:] == "cart":
head.traj = None
head.dcf = None
# clean header
head.user.pop("mode", None)
head.user.pop("separable", None)
# final report
if verbose == 2:
print(f"Readout time: {round(float(head.t[-1]), 2)} ms")
if head.traj is not None:
print(f"Trajectory range: ({head.traj.min()},{head.traj.max()})")
print(
f"Trajectory shape: (ncontrasts={head.traj.shape[0]}, nviews={head.traj.shape[1]}, nsamples={head.traj.shape[2]}, ndim={head.traj.shape[-1]})"
)
if head.dcf is not None:
if len(head.dcf.shape) == 1:
print(f"DCF shape: (nsamples={head.dcf.shape[-1]})")
else:
print(
f"DCF shape: (ncontrasts={head.dcf.shape[0]}, nviews={head.dcf.shape[1]}, nsamples={head.dcf.shape[2]})"
)
if head.FA is not None:
if len(np.unique(head.FA)) > 1:
if verbose == 2:
print(f"Flip Angle train length: {len(head.FA)}")
else:
FA = float(np.unique(head.FA)[0])
head.FA = FA
if verbose == 2:
print(f"Constant FA: {round(abs(FA), 2)} deg")
if head.TR is not None:
if len(np.unique(head.TR)) > 1:
if verbose == 2:
print(f"TR train length: {len(head.TR)}")
else:
TR = float(np.unique(head.TR)[0])
head.TR = TR
if verbose == 2:
print(f"Constant TR: {round(TR, 2)} ms")
if head.TE is not None:
if len(np.unique(head.TE)) > 1:
if verbose == 2:
print(f"Echo train length: {len(head.TE)}")
else:
TE = float(np.unique(head.TE)[0])
head.TE = TE
if verbose == 2:
print(f"Constant TE: {round(TE, 2)} ms")
if head.TI is not None and np.allclose(head.TI, 0.0) is False:
if len(np.unique(head.TI)) > 1:
if verbose == 2:
print(f"Inversion train length: {len(head.TI)}")
else:
TI = float(np.unique(head.TI)[0])
head.TI = TI
if verbose == 2:
print(f"Constant TI: {round(TI, 2)} ms")
# cast
data = torch.as_tensor(
np.ascontiguousarray(data), dtype=torch.complex64, device=device
)
head.torch(device)
tend = time.time()
if verbose == 1:
print(f"done! Elapsed time: {round(tend-tstart, 2)} s")
elif verbose == 2:
print(f"Total elapsed time: {round(tend-tstart, 2)} s")
return data, head
# %% sub routines
def _get_error(ex):
trace = []
tb = ex.__traceback__
while tb is not None:
trace.append(
{
"filename": tb.tb_frame.f_code.co_filename,
"name": tb.tb_frame.f_code.co_name,
"lineno": tb.tb_lineno,
}
)
tb = tb.tb_next
return str({"type": type(ex).__name__, "message": str(ex), "trace": trace})
def _select_readout(data, head):
if head._adc is not None:
if head._adc[-1] == data.shape[-1]:
data = data[..., head._adc[0] :]
else:
data = data[..., head._adc[0] : head._adc[1] + 1]
return data
def _fov_centering(data, head):
if head.traj is not None and np.allclose(head._shift, 0.0) is False:
# ndimensions
ndim = head.traj.shape[-1]
# shift (mm)
dr = np.asarray(head._shift)[:ndim]
# convert in units of voxels
dr = dr / head.resolution[::-1][:ndim] / head.shape[::-1][:ndim]
# apply
data *= np.exp(1j * 2 * math.pi * (head.traj * dr).sum(axis=-1))
return data
def _remove_oversampling(data, head):
if data.shape[-1] != head.shape[-1]: # oversampled
center = int(data.shape[-1] // 2)
hwidth = int(head.shape[-1] // 2)
data = _fft(data, -1)
data = data[..., center - hwidth : center + hwidth]
data = _fft(data, -1)
head.t = np.linspace(0, head.t[-1], data.shape[-1])
return data, head
def _fft(data, axis):
tmp = torch.as_tensor(data)
tmp = torch.fft.fftshift(
torch.fft.fft(torch.fft.fftshift(tmp, dim=axis), dim=axis), dim=axis
)
return tmp.numpy()