#!python
"""This module provides functions to handle Bruker data.
It primarily implements the TimsTOF class, that acts as an in-memory container
for Bruker data accession and storage.
"""
# builtin
import os
import sys
import contextlib
import logging
# external
import numpy as np
import pandas as pd
import h5py
# local
import alphatims
import alphatims.utils
import alphatims.tempmmap as tm
if sys.platform[:5] == "win32":
BRUKER_DLL_FILE_NAME = os.path.join(
alphatims.utils.EXT_PATH,
"timsdata.dll"
)
elif sys.platform[:5] == "linux":
BRUKER_DLL_FILE_NAME = os.path.join(
alphatims.utils.EXT_PATH,
"timsdata.so"
)
else:
logging.warning(
"WARNING: "
"No Bruker libraries are available for this operating system. "
"Mobility and m/z values need to be estimated. "
"While this estimation often returns acceptable results with errors "
"< 0.02 Th, huge errors (e.g. offsets of 6 Th) have already been "
"observed for some samples!"
)
logging.info("")
BRUKER_DLL_FILE_NAME = ""
[docs]def init_bruker_dll(bruker_dll_file_name: str = BRUKER_DLL_FILE_NAME):
"""Open a bruker.dll in Python.
Five functions are defined for this dll:
- tims_open: [c_char_p, c_uint32] -> c_uint64
- tims_close: [c_char_p, c_uint32] -> c_uint64
- tims_read_scans_v2: [c_uint64, c_int64, c_uint32, c_uint32, c_void_p, c_uint32] -> c_uint32
- tims_index_to_mz: [c_uint64, c_int64, POINTER(c_double), POINTER(c_double), c_uint32] -> None
- tims_scannum_to_oneoverk0: Same as "tims_index_to_mz"
Parameters
----------
bruker_dll_file_name : str
The absolute path to the timsdata.dll.
Default is alphatims.utils.BRUKER_DLL_FILE_NAME.
Returns
-------
: ctypes.cdll
The Bruker dll library.
"""
import ctypes
bruker_dll = ctypes.cdll.LoadLibrary(
os.path.realpath(bruker_dll_file_name)
)
bruker_dll.tims_open.argtypes = [ctypes.c_char_p, ctypes.c_uint32]
bruker_dll.tims_open.restype = ctypes.c_uint64
bruker_dll.tims_close.argtypes = [ctypes.c_uint64]
bruker_dll.tims_close.restype = None
bruker_dll.tims_read_scans_v2.argtypes = [
ctypes.c_uint64,
ctypes.c_int64,
ctypes.c_uint32,
ctypes.c_uint32,
ctypes.c_void_p,
ctypes.c_uint32
]
bruker_dll.tims_read_scans_v2.restype = ctypes.c_uint32
bruker_dll.tims_index_to_mz.argtypes = [
ctypes.c_uint64,
ctypes.c_int64,
ctypes.POINTER(ctypes.c_double),
ctypes.POINTER(ctypes.c_double),
ctypes.c_uint32
]
bruker_dll.tims_index_to_mz.restype = ctypes.c_uint32
bruker_dll.tims_scannum_to_oneoverk0.argtypes = [
ctypes.c_uint64,
ctypes.c_int64,
ctypes.POINTER(ctypes.c_double),
ctypes.POINTER(ctypes.c_double),
ctypes.c_uint32
]
bruker_dll.tims_scannum_to_oneoverk0.restype = ctypes.c_uint32
bruker_dll.tims_set_num_threads.argtypes = [ctypes.c_uint64]
bruker_dll.tims_set_num_threads.restype = None
bruker_dll.tims_set_num_threads(alphatims.utils.MAX_THREADS)
# multiple threads is equally fast as just 1 for io?
# bruker_dll.tims_set_num_threads(1)
return bruker_dll
[docs]@contextlib.contextmanager
def open_bruker_d_folder(
bruker_d_folder_name: str,
bruker_dll_file_name=BRUKER_DLL_FILE_NAME,
) -> tuple:
"""A context manager for a bruker dll connection to a .d folder.
Parameters
----------
bruker_d_folder_name : str
The name of a Bruker .d folder.
bruker_dll_file_name : str, ctypes.cdll
The path to Bruker' timsdata.dll library.
Alternatively, the library itself can be passed as argument.
Default is alphatims.utils.BRUKER_DLL_FILE_NAME,
which in itself is dependent on the OS.
Returns
-------
: tuple (ctypes.cdll, int).
The opened bruker dll and identifier of the .d folder.
"""
try:
if isinstance(bruker_dll_file_name, str):
bruker_dll = init_bruker_dll(bruker_dll_file_name)
logging.info(f"Opening handle for {bruker_d_folder_name}")
bruker_d_folder_handle = bruker_dll.tims_open(
bruker_d_folder_name.encode('utf-8'),
0
)
yield bruker_dll, bruker_d_folder_handle
finally:
logging.info(f"Closing handle for {bruker_d_folder_name}")
bruker_dll.tims_close(bruker_d_folder_handle)
[docs]def read_bruker_sql(
bruker_d_folder_name: str,
add_zeroth_frame: bool = True,
drop_polarity: bool = True,
convert_polarity_to_int: bool = True,
) -> tuple:
"""Read metadata, (fragment) frames and precursors from a Bruker .d folder.
Parameters
----------
bruker_d_folder_name : str
The name of a Bruker .d folder.
add_zeroth_frame : bool
Bruker uses 1-indexing for frames.
If True, a zeroth frame is added without any TOF detections to
make Python simulate this 1-indexing.
If False, frames are 0-indexed.
Default is True.
drop_polarity : bool
The polarity column of the frames table contains "+" or "-" and
is not numerical.
If True, the polarity column is dropped from the frames table.
this ensures a fully numerical pd.DataFrame.
If False, this column is kept, resulting in a pd.DataFrame with
dtype=object.
Default is True.
convert_polarity_to_int : bool
Convert the polarity to int (-1 or +1).
This allows to keep it in numerical form.
This is ignored if the polarity is dropped.
Default is True.
Returns
-------
: tuple
(str, dict, pd.DataFrame, pd.DataFrame, pd.DataFrame).
The acquisition_mode, global_meta_data, frames, fragment_frames
and precursors.
For diaPASEF, precursors is None.
"""
import sqlite3
logging.info(f"Reading frame metadata for {bruker_d_folder_name}")
with sqlite3.connect(
os.path.join(bruker_d_folder_name, "analysis.tdf")
) as sql_database_connection:
global_meta_data = pd.read_sql_query(
"SELECT * from GlobalMetaData",
sql_database_connection
)
frames = pd.read_sql_query(
"SELECT * FROM Frames",
sql_database_connection
)
if 9 in frames.MsMsType.values:
acquisition_mode = "diaPASEF"
fragment_frames = pd.read_sql_query(
"SELECT * FROM DiaFrameMsMsInfo",
sql_database_connection
)
fragment_frame_groups = pd.read_sql_query(
"SELECT * from DiaFrameMsMsWindows",
sql_database_connection
)
fragment_frames = fragment_frames.merge(
fragment_frame_groups,
how="left"
)
fragment_frames.rename(
columns={"WindowGroup": "Precursor"},
inplace=True
)
precursors = None
elif 8 in frames.MsMsType.values:
acquisition_mode = "ddaPASEF"
fragment_frames = pd.read_sql_query(
"SELECT * from PasefFrameMsMsInfo",
sql_database_connection
)
precursors = pd.read_sql_query(
"SELECT * from Precursors",
sql_database_connection
)
else:
acquisition_mode = "noPASEF"
fragment_frames = pd.DataFrame(
{
"Frame": np.array([0]),
"ScanNumBegin": np.array([0]),
"ScanNumEnd": np.array([0]),
"IsolationWidth": np.array([0]),
"IsolationMz": np.array([0]),
"Precursor": np.array([0]),
}
)
precursors = None
# raise ValueError("Scan mode is not ddaPASEF or diaPASEF")
if add_zeroth_frame:
frames = pd.concat(
[
pd.DataFrame(frames.iloc[0]).T,
frames,
],
ignore_index=True
)
frames.Id[0] = 0
frames.Time[0] = 0
frames.MaxIntensity[0] = 0
frames.SummedIntensities[0] = 0
frames.NumPeaks[0] = 0
polarity_col = frames["Polarity"].copy()
frames = pd.DataFrame(
{
col: pd.to_numeric(
frames[col]
) for col in frames if col != "Polarity"
}
)
if not drop_polarity:
if convert_polarity_to_int:
frames['Polarity'] = polarity_col.apply(
lambda x: 1 if x == "+" else -1
).astype(np.int8)
else:
frames['Polarity'] = polarity_col
return (
acquisition_mode,
global_meta_data,
frames,
fragment_frames,
precursors
)
[docs]@alphatims.utils.njit(nogil=True)
def parse_decompressed_bruker_binary_type2(decompressed_bytes: bytes) -> tuple:
"""Parse a Bruker binary frame buffer into scans, tofs and intensities.
Parameters
----------
decompressed_bytes : bytes
A Bruker frame binary buffer that is already decompressed with pyzstd.
Returns
-------
: tuple (np.uint32[:], np.uint32[:], np.uint32[:]).
The scan_indices, tof_indices and intensities present in this binary
array
"""
temp = np.frombuffer(decompressed_bytes, dtype=np.uint8)
buffer = np.frombuffer(temp.reshape(4, -1).T.flatten(), dtype=np.uint32)
scan_count = buffer[0]
scan_indices = buffer[:scan_count].copy() // 2
scan_indices[0] = 0
tof_indices = buffer[scan_count::2].copy()
index = 0
for size in scan_indices:
current_sum = 0
for i in range(size):
current_sum += tof_indices[index]
tof_indices[index] = current_sum
index += 1
intensities = buffer[scan_count + 1::2]
last_scan = len(intensities) - np.sum(scan_indices[1:])
scan_indices[:-1] = scan_indices[1:]
scan_indices[-1] = last_scan
return scan_indices, tof_indices, intensities
[docs]@alphatims.utils.njit(nogil=True)
def parse_decompressed_bruker_binary_type1(
decompressed_bytes: bytes,
scan_indices_: np.ndarray,
tof_indices_: np.ndarray,
intensities_: np.ndarray,
scan_start: int,
scan_index: int,
) -> int:
"""Parse a Bruker binary scan buffer into tofs and intensities.
Parameters
----------
decompressed_bytes : bytes
A Bruker scan binary buffer that is already decompressed with lzf.
scan_indices_ : np.ndarray
The scan_indices_ buffer array.
tof_indices_ : np.ndarray
The tof_indices_ buffer array.
intensities_ : np.ndarray
The intensities_ buffer array.
scan_start : int
The offset where to start new tof_indices and intensity_values.
scan_index : int
The scan index.
Returns
-------
: int
The number of peaks in this scan.
"""
buffer = np.frombuffer(decompressed_bytes, dtype=np.int32)
tof_index = 0
previous_was_intensity = True
current_index = scan_start
for value in buffer:
if value >= 0:
if previous_was_intensity:
tof_index += 1
tof_indices_[current_index] = tof_index
intensities_[current_index] = value
previous_was_intensity = True
current_index += 1
else:
tof_index -= value
previous_was_intensity = False
scan_size = current_index - scan_start
scan_indices_[scan_index] = scan_size
return scan_size
[docs]def process_frame(
frame_id: int,
tdf_bin_file_name: str,
tims_offset_values: np.ndarray,
scan_indptr: np.ndarray,
intensities: np.ndarray,
tof_indices: np.ndarray,
frame_indptr: np.ndarray,
max_scan_count: int,
compression_type: int,
max_peaks_per_scan: int,
) -> None:
"""Read and parse a frame directly from a Bruker .d.analysis.tdf_bin.
Parameters
----------
frame_id : int
The frame number that should be processed.
Note that this is interpreted as 1-indixed instead of 0-indexed,
so that it is compatible with Bruker.
tdf_bin_file_name : str
The full file name of the SQL database "analysis.tdf_bin" in a Bruker
.d folder.
tims_offset_values : np.int64[:]
The offsets that indicate the starting indices of each frame in the
binary.
These are contained in the "TimsId" column of the frames table in
"analysis.tdf_bin".
scan_indptr : np.int64[:]
A buffer containing zeros that can store the cumulative number of
detections per scan.
The size should be equal to max_scan_count * len(frames) + 1.
A dummy 0-indexed frame is required to be present for len(frames).
The last + 1 allows to explicitly interpret the end of a scan as
the start of a subsequent scan.
intensities : np.uint16[:]
A buffer that can store the intensities of all detections.
It's size can be determined by summing the "NumPeaks" column from
the frames table in "analysis.tdf_bin".
tof_indices : np.uint32[:]
A buffer that can store the tof indices of all detections.
It's size can be determined by summing the "NumPeaks" column from
the frames table in "analysis.tdf_bin".
frame_indptr : np.int64[:]
The cumulative sum of the number of detections per frame.
The size should be equal to len(frames) + 1.
A dummy 0-indexed frame is required to be present for len(frames).
The last + 1 allows to explicitly interpret the end of a frame as
the start of a subsequent frame.
max_scan_count : int
The maximum number of scans a single frame can have.
compression_type : int
The compression type. This must be either 1 or 2.
Should be treieved from the global metadata.
max_peaks_per_scan : int
The maximum number of peaks per scan.
Should be retrieved from the global metadata.
"""
with open(tdf_bin_file_name, "rb") as infile:
frame_start = frame_indptr[frame_id]
frame_end = frame_indptr[frame_id + 1]
if frame_start != frame_end:
offset = tims_offset_values[frame_id]
infile.seek(offset)
bin_size = int.from_bytes(infile.read(4), "little")
scan_count = int.from_bytes(infile.read(4), "little")
max_peak_count = min(
max_peaks_per_scan,
frame_end - frame_start
)
if compression_type == 1:
import lzf
compression_offset = 8 + (scan_count + 1) * 4
scan_offsets = np.frombuffer(
infile.read((scan_count + 1) * 4),
dtype=np.int32
) - compression_offset
compressed_data = infile.read(bin_size - compression_offset)
scan_indices_ = np.zeros(scan_count, dtype=np.int64)
tof_indices_ = np.empty(
frame_end - frame_start,
dtype=np.uint32
)
intensities_ = np.empty(
frame_end - frame_start,
dtype=np.uint16
)
scan_start = 0
for scan_index in range(scan_count):
start = scan_offsets[scan_index]
end = scan_offsets[scan_index + 1]
if start == end:
continue
decompressed_bytes = lzf.decompress(
compressed_data[start: end],
max_peak_count * 4 * 2
)
scan_start += parse_decompressed_bruker_binary_type1(
decompressed_bytes,
scan_indices_,
tof_indices_,
intensities_,
scan_start,
scan_index,
)
elif compression_type == 2:
import pyzstd
compressed_data = infile.read(bin_size - 8)
decompressed_bytes = pyzstd.decompress(compressed_data)
(
scan_indices_,
tof_indices_,
intensities_
) = parse_decompressed_bruker_binary_type2(decompressed_bytes)
else:
raise ValueError("TimsCompressionType is not 1 or 2.")
scan_start = frame_id * max_scan_count
scan_end = scan_start + scan_count
scan_indptr[scan_start: scan_end] = scan_indices_
tof_indices[frame_start: frame_end] = tof_indices_
intensities[frame_start: frame_end] = intensities_
[docs]def read_bruker_binary(
frames: np.ndarray,
bruker_d_folder_name: str,
compression_type: int,
max_peaks_per_scan: int,
mmap_detector_events: bool = None,
) -> tuple:
"""Read all data from an "analysis.tdf_bin" of a Bruker .d folder.
Parameters
----------
frames : pd.DataFrame
The frames from the "analysis.tdf" SQL database of a Bruker .d folder.
These can be acquired with e.g. alphatims.bruker.read_bruker_sql.
bruker_d_folder_name : str
The full path to a Bruker .d folder.
compression_type : int
The compression type. This must be either 1 or 2.
max_peaks_per_scan : int
The maximum number of peaks per scan.
Should be treieved from the global metadata.
mmap_detector_events : bool
Do not save the intensity_values and tof_indices in memory,
but use an mmap instead.
Default is True
Returns
-------
: tuple (np.int64[:], np.uint32[:], np.uint16[:]).
The scan_indptr, tof_indices and intensities.
"""
frame_indptr = np.empty(frames.shape[0] + 1, dtype=np.int64)
frame_indptr[0] = 0
frame_indptr[1:] = np.cumsum(frames.NumPeaks.values)
max_scan_count = frames.NumScans.max() + 1
scan_count = max_scan_count * frames.shape[0]
scan_indptr = np.zeros(scan_count + 1, dtype=np.int64)
if mmap_detector_events:
intensities = tm.empty(int(frame_indptr[-1]), dtype=np.uint16)
tof_indices = tm.empty(int(frame_indptr[-1]), dtype=np.uint32)
else:
intensities = np.empty(int(frame_indptr[-1]), dtype=np.uint16)
tof_indices = np.empty(int(frame_indptr[-1]), dtype=np.uint32)
tdf_bin_file_name = os.path.join(bruker_d_folder_name, "analysis.tdf_bin")
tims_offset_values = frames.TimsId.values
logging.info(
f"Reading {frame_indptr.size - 2:,} frames with "
f"{frame_indptr[-1]:,} detector events for {bruker_d_folder_name}"
)
if compression_type == 1:
process_frame_func = alphatims.utils.threadpool(
process_frame,
thread_count=1
)
else:
process_frame_func = alphatims.utils.threadpool(process_frame)
process_frame_func(
range(1, len(frames)),
tdf_bin_file_name,
tims_offset_values,
scan_indptr,
intensities,
tof_indices,
frame_indptr,
max_scan_count,
compression_type,
max_peaks_per_scan,
)
scan_indptr[1:] = np.cumsum(scan_indptr[:-1])
scan_indptr[0] = 0
return scan_indptr, tof_indices, intensities
[docs]class TimsTOF(object):
"""A class that stores Bruker TimsTOF data in memory for fast access.
Data can be read directly from a Bruker .d folder.
All OS's are supported,
but reading mz_values and mobility_values from a .d folder
requires Windows or Linux due to availability of Bruker libraries.
On MacOS, they are estimated based on metadata,
but these values are not guaranteed to be correct.
Often they fall within 0.02 Th, but errors up to 6 Th have already
been observed!
A TimsTOF object can also be exported to HDF for subsequent access.
This file format is portable to all OS's.
As such, initial reading on Windows with correct mz_values and
mobility_values can be done and the resulting HDF file can
safely be read on MacOS.
This HDF file also provides improved accession times for subsequent use.
After reading, data can be accessed with traditional Python slices.
As TimsTOF data is 5-dimensional, the data can be sliced in 5 dimensions
as well. These dimensions follows the design of the TimsTOF Pro:
1 LC: rt_values, frame_indices
The first dimension allows to slice retention_time values
or frames indices. These values and indices
have a one-to-one relationship.
2 TIMS: mobility_values, scan_indices
The second dimension allows to slice mobility values or
scan indices (i.e. a single push).
These values and indices have a one-to-one relationship.
3 QUAD: quad_mz_values, precursor_indices
The third dimension focusses on the quadrupole and indirectly
on the collision cell. It allows to slice lower and upper
quadrupole mz values (e.g. the m/z of
unfragmented ions / precursors). If set to -1, the quadrupole and
collision cell are assumed to be inactive, i.e. precursor ions
are detected instead of fragments.
Equally, this dimension allows to slice precursor indices.
Precursor index 0 defaults to all precusors (i.e. quad mz values
equal to -1). In DDA, precursor indices larger than 0 point
to ddaPASEF MSMS spectra.
In DIA, precursor indices larger than 0 point to windows,
i.e. all scans in a frame with equal quadrupole and collision
settings that is repeated once per full cycle.
Note that these values do not have a one-to-one relationship.
4 TOF: mz_values, tof_indices
The fourth dimension allows to slice (fragment) mz_values
or tof indices. Note that the quadrupole dimension determines
if precursors are detected or fragments.
These values and indices have a one-to-one relationship.
5 DETECTOR: intensity_values
The fifth dimension allows to slice intensity values.
Note that all dimensions except for the detector have both
(float) values and (integer) indices.
For each dimension, slices can be provided in several different ways:
- int:
A single int can be used to select a single index.
If used in the fifth dimension, it still allows to select
intensity_values
- float:
A single float can be used to select a single value.
As the values arrays are discrete, the smallest index with a value
equal to or larger than this value is actually selected.
For intensity_value slicing, the exact value is used.
- slice:
A Python slice with start, stop and step can be provided.
Start and stop values can independently be set to int or float.
If a float is provided it conversed to an int as previously
described.
The step always needs to be provided as an int.
Since there is not one-to-one relation from values to indices for
QUAD and DETECTOR, the step value is ignored in these cases and
only start and stop can be used.
**IMPORTANT NOTE:** negative start, step and stop integers are not
supported!
- iterable:
An iterable with (mixed) floats and ints can also be provided,
in a similar fashion as Numpy's fancy indexing.
**IMPORTANT NOTE:** The resulting integers after float->int
conversion need to be sorted in ascending order!
- np.ndarray:
Multiple slicing is supported by providing either a
np.int64[:, 3] array, where each row is assumed to be a
(start, stop, step) tuple or np.float64[:, 2] where each row
is assumed to be a (start, stop) tuple.
**IMPORTANT NOTE:** These arrays need to be sorted,
disjunct and strictly increasing
(i.e. np.all(np.diff(precursor_slices[:, :2].ravel()) >= 0)
= True).
Alternatively, a dictionary can be used to define filters for each
dimension (see examples).
The result of such slicing is a pd.DataFrame with the following columns:
- raw_indices
- frame_indices
- scan_indices
- precursor_indices
- tof_indices
- rt_values
- mobility_values
- quad_low_mz_values
- quad_high_mz_values
- mz_values
- intensity_values
Instead of returning a pd.DataFrame, raw indices can be returned by
setting the last slice element to "raw".
Examples
--------
>>> data[:100.0]
# Return all datapoints with rt_values < 100.0 seconds
>>> data[:, 450]
# Return all datapoints with scan_index = 450
>>> data[:, :, 700.: 710.]
# Return all datapoints with 700.0 <= quad_mz_values < 710.0
>>> data[:, :, :, 621.9: 191000]
# Return all datapoints with 621.9 <= mz_values and
# tof_indices < 191000
>>> data[[1, 8, 10], :, 0, 621.9: np.inf]
# Return all datapoints from frames 1, 8 and 10, which are unfragmented
# (precursor_index = 0) and with 621.9 <= mz_values < np.inf
>>> data[:, :, 999]
# Return all datapoints from precursor 999
# (for diaPASEF this is a traditional MSMS spectrum)
>>> scan_slices = np.array([[10, 20, 1], [100, 200, 10]])
>>> data[:, scan_slices, :, :, :]
# Return all datapoints with scan_indices in range(10, 20) or
# range(100, 200, 10)
>>> df = data[
... {
... "frame_indices": [1, 191],
... "scan_indices": slice(300, 800, 10),
... "mz_values": slice(None, 400.5),
... "intensity_values": 50,
... }
... ]
# Slice by using a dictionary
>>> data[:, :, 999, "raw"]
# Return the raw indices of datapoints from precursor 999
"""
@property
def sample_name(self):
""": str : The sample name of this TimsTOF object."""
file_name = os.path.basename(self.bruker_d_folder_name)
return '.'.join(file_name.split('.')[:-1])
@property
def directory(self):
""": str : The directory of this TimsTOF object."""
return os.path.dirname(self.bruker_d_folder_name)
@property
def is_compressed(self):
""": bool : HDF array is compressed or not."""
return self._compressed
@property
def version(self):
""": str : AlphaTims version used to create this TimsTOF object."""
return self._version
@property
def acquisition_mode(self):
""": str : The acquisition mode."""
return self._acquisition_mode
@property
def meta_data(self):
""": dict : The metadata for the acquisition."""
return self._meta_data
@property
def rt_values(self):
""": np.ndarray : np.float64[:] : The rt values."""
return self._rt_values
@property
def mobility_values(self):
""": np.ndarray : np.float64[:] : The mobility values."""
return self._mobility_values
@property
def mz_values(self):
""": np.ndarray : np.float64[:] : The mz values."""
if self._use_calibrated_mz_values_as_default:
return self._calibrated_mz_values
else:
return self._mz_values
@property
def calibrated_mz_values(self):
""": np.ndarray : np.float64[:] : The global calibrated mz values."""
return self._calibrated_mz_values
@property
def quad_mz_values(self):
""": np.ndarray : np.float64[:, 2] : The (low, high) quad mz values."""
return self._quad_mz_values
@property
def intensity_values(self):
""": np.ndarray : np.uint16[:] : The intensity values."""
return self._intensity_values
@property
def frame_max_index(self):
""": int : The maximum frame index."""
return self._frame_max_index
@property
def scan_max_index(self):
""": int : The maximum scan index."""
return self._scan_max_index
@property
def tof_max_index(self):
""": int : The maximum tof index."""
return self._tof_max_index
@property
def precursor_max_index(self):
""": int : The maximum precursor index."""
return self._precursor_max_index
@property
def mz_min_value(self):
""": float : The minimum mz value."""
return self.mz_values[0]
@property
def mz_max_value(self):
""": float : The maximum mz value."""
return self.mz_values[-1]
@property
def calibrated_mz_min_value(self):
""": float : The minimum calibrated mz value."""
return self.calibrated_mz_values[0]
@property
def calibrated_mz_max_value(self):
""": float : The maximum calibrated mz value."""
return self.calibrated_mz_values[-1]
@property
def rt_max_value(self):
""": float : The maximum rt value."""
return self.rt_values[-1]
@property
def quad_mz_min_value(self):
""": float : The minimum quad mz value."""
return self._quad_min_mz_value
@property
def quad_mz_max_value(self):
""": float : The maximum quad mz value."""
return self._quad_max_mz_value
@property
def mobility_min_value(self):
""": float : The minimum mobility value."""
return self._mobility_min_value
@property
def mobility_max_value(self):
""": float : The maximum mobility value."""
return self._mobility_max_value
@property
def intensity_min_value(self):
""": float : The minimum intensity value."""
return self._intensity_min_value
@property
def intensity_max_value(self):
""": float : The maximum intensity value."""
return self._intensity_max_value
@property
def frames(self):
""": pd.DataFrame : The frames table of the analysis.tdf SQL."""
return self._frames
@property
def fragment_frames(self):
""": pd.DataFrame : The fragment frames table."""
return self._fragment_frames
@property
def precursors(self):
""": pd.DataFrame : The precursor table."""
return self._precursors
@property
def tof_indices(self):
""": np.ndarray : np.uint32[:] : The tof indices."""
return self._tof_indices
@property
def push_indptr(self):
""": np.ndarray : np.int64[:] : The tof indptr."""
return self._push_indptr
@property
def quad_indptr(self):
""": np.ndarray : np.int64[:] : The quad indptr (tof_indices)."""
return self._quad_indptr
@property
def raw_quad_indptr(self):
""": np.ndarray : np.int64[:] : The raw quad indptr (push indices)."""
return self._raw_quad_indptr
@property
def precursor_indices(self):
""": np.ndarray : np.int64[:] : The precursor indices."""
return self._precursor_indices
@property
def dia_precursor_cycle(self):
""": np.ndarray : np.int64[:] : The precursor indices of a DIA cycle."""
return self._dia_precursor_cycle
@property
def dia_mz_cycle(self):
""": np.ndarray : np.float64[:, 2] : The mz_values of a DIA cycle."""
return self._dia_mz_cycle
@property
def zeroth_frame(self):
""": bool : A blank zeroth frame is present so frames are 1-indexed."""
return self._zeroth_frame
@property
def max_accumulation_time(self):
""": float : The maximum accumulation time of all frames."""
return self._max_accumulation_time
@property
def accumulation_times(self):
""": np.ndarray : The accumulation times of all frames."""
return self._accumulation_times
@property
def intensity_corrections(self):
""": np.ndarray : The intensity_correction per frame."""
return self._intensity_corrections
[docs] def __init__(
self,
bruker_d_folder_name: str,
*,
mz_estimation_from_frame: int = 1,
mobility_estimation_from_frame: int = 1,
slice_as_dataframe: bool = True,
use_calibrated_mz_values_as_default: int = 0,
use_hdf_if_available: bool = True,
mmap_detector_events: bool = True,
drop_polarity: bool = True,
convert_polarity_to_int: bool = True,
):
"""Create a Bruker TimsTOF object that contains all data in-memory.
Parameters
----------
bruker_d_folder_name : str
The full file name to a Bruker .d folder.
Alternatively, the full file name of an already exported .hdf
can be provided as well.
mz_estimation_from_frame : int
If larger than 0, mz_values from this frame are read as
default mz_values with the Bruker library.
If 0, mz_values are being estimated with the metadata
based on "MzAcqRangeLower" and "MzAcqRangeUpper".
IMPORTANT NOTE: MacOS defaults to 0, as no Bruker library
is available.
Default is 1.
mobility_estimation_from_frame : int
If larger than 0, mobility_values from this frame are read as
default mobility_values with the Bruker library.
If 0, mobility_values are being estimated with the metadata
based on "OneOverK0AcqRangeLower" and "OneOverK0AcqRangeUpper".
IMPORTANT NOTE: MacOS defaults to 0, as no Bruker library
is available.
Default is 1.
slice_as_dataframe : bool
If True, slicing returns a pd.DataFrame by default.
If False, slicing provides a np.int64[:] with raw indices.
This value can also be modified after creation.
Default is True.
use_calibrated_mz_values : int
If not 0, the mz_values are overwritten with global
calibrated_mz_values.
If 1, calibration at the MS1 level is performed.
If 2, calibration at the MS2 level is performed.
Default is 0.
use_hdf_if_available : bool
If an HDF file is available, use this instead of the .d folder.
Default is True.
mmap_detector_events : bool
Do not save the intensity_values and tof_indices in memory,
but use an mmap instead.
Default is True
drop_polarity : bool
The polarity column of the frames table contains "+" or "-" and
is not numerical.
If True, the polarity column is dropped from the frames table.
this ensures a fully numerical pd.DataFrame.
If False, this column is kept, resulting in a pd.DataFrame with
dtype=object.
Default is True.
convert_polarity_to_int : bool
Convert the polarity to int (-1 or +1).
This allows to keep it in numerical form.
This is ignored if the polarity is dropped.
Default is True.
"""
if bruker_d_folder_name.endswith("/"):
bruker_d_folder_name = bruker_d_folder_name[:-1]
logging.info(f"Importing data from {bruker_d_folder_name}")
if bruker_d_folder_name.endswith(".d"):
bruker_hdf_file_name = f"{bruker_d_folder_name[:-2]}.hdf"
hdf_file_exists = os.path.exists(bruker_hdf_file_name)
if use_hdf_if_available and hdf_file_exists:
self._import_data_from_hdf_file(
bruker_hdf_file_name,
mmap_detector_events,
)
self.bruker_hdf_file_name = bruker_hdf_file_name
else:
self.bruker_d_folder_name = os.path.abspath(
bruker_d_folder_name
)
self._import_data_from_d_folder(
bruker_d_folder_name,
mz_estimation_from_frame,
mobility_estimation_from_frame,
drop_polarity,
convert_polarity_to_int,
mmap_detector_events,
)
elif bruker_d_folder_name.endswith(".hdf"):
self._import_data_from_hdf_file(
bruker_d_folder_name,
mmap_detector_events,
)
self.bruker_hdf_file_name = bruker_d_folder_name
else:
raise NotImplementedError(
"WARNING: file extension not understood"
)
if not hasattr(self, "version"):
self._version = "N.A."
if self.version != alphatims.__version__:
logging.info(
"WARNING: "
f"AlphaTims version {self.version} was used to initialize "
f"{bruker_d_folder_name}, while the current version of "
f"AlphaTims is {alphatims.__version__}."
)
self.slice_as_dataframe = slice_as_dataframe
self.use_calibrated_mz_values_as_default(
use_calibrated_mz_values_as_default
)
# Precompile
self[0, "raw"]
logging.info(f"Successfully imported data from {bruker_d_folder_name}")
def __len__(self):
return len(self.intensity_values)
def __hash__(self):
return hash(self.bruker_d_folder_name)
def _import_data_from_d_folder(
self,
bruker_d_folder_name: str,
mz_estimation_from_frame: int,
mobility_estimation_from_frame: int,
drop_polarity: bool = True,
convert_polarity_to_int: bool = True,
mmap_detector_events: bool = True
):
logging.info(f"Using .d import for {bruker_d_folder_name}")
self._version = alphatims.__version__
self._zeroth_frame = True
(
self._acquisition_mode,
global_meta_data,
self._frames,
self._fragment_frames,
self._precursors,
) = read_bruker_sql(
bruker_d_folder_name,
self._zeroth_frame,
drop_polarity,
convert_polarity_to_int,
)
self._meta_data = dict(
zip(global_meta_data.Key, global_meta_data.Value)
)
(
self._push_indptr,
self._tof_indices,
self._intensity_values,
) = read_bruker_binary(
self.frames,
bruker_d_folder_name,
int(self._meta_data["TimsCompressionType"]),
int(self._meta_data["MaxNumPeaksPerScan"]),
)
logging.info(f"Indexing {bruker_d_folder_name}...")
self._use_calibrated_mz_values_as_default = False
self._frame_max_index = self.frames.shape[0]
self._scan_max_index = int(self.frames.NumScans.max()) + 1
self._tof_max_index = int(self.meta_data["DigitizerNumSamples"]) + 1
self._rt_values = self.frames.Time.values.astype(np.float64)
self._mobility_min_value = float(
self.meta_data["OneOverK0AcqRangeLower"]
)
self._mobility_max_value = float(
self.meta_data["OneOverK0AcqRangeUpper"]
)
self._accumulation_times = self.frames.AccumulationTime.values.astype(
np.float64
)
self._max_accumulation_time = np.max(self._accumulation_times)
self._intensity_corrections = self._max_accumulation_time / self._accumulation_times
bruker_dll_available = BRUKER_DLL_FILE_NAME != ""
if (mobility_estimation_from_frame != 0) and bruker_dll_available:
import ctypes
with alphatims.bruker.open_bruker_d_folder(
bruker_d_folder_name
) as (bruker_dll, bruker_d_folder_handle):
logging.info(
f"Fetching mobility values from {bruker_d_folder_name}"
)
indices = np.arange(self.scan_max_index).astype(np.float64)
self._mobility_values = np.empty_like(indices)
bruker_dll.tims_scannum_to_oneoverk0(
bruker_d_folder_handle,
mobility_estimation_from_frame,
indices.ctypes.data_as(
ctypes.POINTER(ctypes.c_double)
),
self.mobility_values.ctypes.data_as(
ctypes.POINTER(ctypes.c_double)
),
self.scan_max_index
)
else:
if (mobility_estimation_from_frame != 0):
logging.info(
"Bruker DLL not available, estimating mobility values"
)
self._mobility_values = self.mobility_max_value - (
self.mobility_max_value - self.mobility_min_value
) / self.scan_max_index * np.arange(self.scan_max_index)
mz_min_value = float(self.meta_data["MzAcqRangeLower"])
mz_max_value = float(self.meta_data["MzAcqRangeUpper"])
tof_intercept = np.sqrt(mz_min_value)
tof_slope = (
np.sqrt(mz_max_value) - tof_intercept
) / self.tof_max_index
if (mz_estimation_from_frame != 0) and bruker_dll_available:
import ctypes
with alphatims.bruker.open_bruker_d_folder(
bruker_d_folder_name
) as (bruker_dll, bruker_d_folder_handle):
logging.info(
f"Fetching mz values from {bruker_d_folder_name}"
)
indices = np.arange(self.tof_max_index).astype(np.float64)
self._mz_values = np.empty_like(indices)
bruker_dll.tims_index_to_mz(
bruker_d_folder_handle,
mz_estimation_from_frame,
indices.ctypes.data_as(
ctypes.POINTER(ctypes.c_double)
),
self._mz_values.ctypes.data_as(
ctypes.POINTER(ctypes.c_double)
),
self.tof_max_index
)
else:
if (mz_estimation_from_frame != 0):
logging.info(
"Bruker DLL not available, estimating mz values"
)
self._mz_values = (
tof_intercept + tof_slope * np.arange(self.tof_max_index)
)**2
self._parse_quad_indptr()
self._intensity_min_value = int(np.min(self.intensity_values))
self._intensity_max_value = int(np.max(self.intensity_values))
[docs] def save_as_hdf(
self,
directory: str,
file_name: str,
overwrite: bool = False,
compress: bool = False,
return_as_bytes_io: bool = False,
):
"""Save the TimsTOF object as an hdf file.
Parameters
----------
directory : str
The directory where to save the HDF file.
Ignored if return_as_bytes_io == True.
file_name : str
The file name of the HDF file.
Ignored if return_as_bytes_io == True.
overwrite : bool
If True, an existing file is truncated.
If False, the existing file is appended to only if the original
group, array or property does not exist yet.
Default is False.
compress : bool
If True, compression is used.
This roughly halves files sizes (on-disk),
at the cost of taking 3-6 longer accession times.
See also alphatims.utils.create_hdf_group_from_dict.
If False, no compression is used
Default is False.
return_as_bytes_io
If True, the HDF file is only created in memory and returned
as a bytes stream.
If False, the file is written to disk.
Default is False.
Returns
-------
str, io.BytesIO
The full file name or a bytes stream containing the HDF file.
"""
import io
if overwrite:
hdf_mode = "w"
else:
hdf_mode = "a"
if return_as_bytes_io:
full_file_name = io.BytesIO()
else:
full_file_name = os.path.join(
directory,
file_name
)
logging.info(
f"Writing TimsTOF data to {full_file_name}."
)
self._compressed = compress
with h5py.File(full_file_name, hdf_mode) as hdf_root:
# hdf_root.swmr_mode = True
alphatims.utils.create_hdf_group_from_dict(
hdf_root.create_group("raw"),
self.__dict__,
overwrite=overwrite,
compress=compress,
)
if return_as_bytes_io:
full_file_name.seek(0)
else:
logging.info(
f"Successfully wrote TimsTOF data to {full_file_name}."
)
return full_file_name
def _import_data_from_hdf_file(
self,
bruker_d_folder_name: str,
mmap_detector_events: bool = False,
):
logging.info(f"Using HDF import for {bruker_d_folder_name}")
with h5py.File(bruker_d_folder_name, "r") as hdf_root:
mmap_arrays = []
if mmap_detector_events:
mmap_arrays.append("/raw/_tof_indices")
mmap_arrays.append("/raw/_intensity_values")
self.__dict__ = alphatims.utils.create_dict_from_hdf_group(
hdf_root["raw"],
mmap_arrays,
bruker_d_folder_name,
)
[docs] def convert_from_indices(
self,
raw_indices,
*,
frame_indices=None,
quad_indices=None,
scan_indices=None,
tof_indices=None,
return_raw_indices: bool = False,
return_frame_indices: bool = False,
return_scan_indices: bool = False,
return_quad_indices: bool = False,
return_tof_indices: bool = False,
return_precursor_indices: bool = False,
return_rt_values: bool = False,
return_rt_values_min: bool = False,
return_mobility_values: bool = False,
return_quad_mz_values: bool = False,
return_push_indices: bool = False,
return_mz_values: bool = False,
return_intensity_values: bool = False,
return_corrected_intensity_values: bool = False,
raw_indices_sorted: bool = False,
) -> dict:
"""Convert selected indices to a dict.
Parameters
----------
raw_indices : np.int64[:], None
The raw indices for which coordinates need to be retrieved.
frame_indices : np.int64[:], None
The frame indices for which coordinates need to be retrieved.
quad_indices : np.int64[:], None
The quad indices for which coordinates need to be retrieved.
scan_indices : np.int64[:], None
The scan indices for which coordinates need to be retrieved.
tof_indices : np.int64[:], None
The tof indices for which coordinates need to be retrieved.
return_raw_indices : bool
If True, include "raw_indices" in the dict.
Default is False.
return_frame_indices : bool
If True, include "frame_indices" in the dict.
Default is False.
return_scan_indices : bool
If True, include "scan_indices" in the dict.
Default is False.
return_quad_indices : bool
If True, include "quad_indices" in the dict.
Default is False.
return_tof_indices : bool
If True, include "tof_indices" in the dict.
Default is False.
return_precursor_indices : bool
If True, include "precursor_indices" in the dict.
Default is False.
return_rt_values : bool
If True, include "rt_values" in the dict.
Default is False.
return_rt_values_min : bool
If True, include "rt_values_min" in the dict.
Default is False.
return_mobility_values : bool
If True, include "mobility_values" in the dict.
Default is False.
return_quad_mz_values : bool
If True, include "quad_low_mz_values" and
"quad_high_mz_values" in the dict.
Default is False.
return_push_indices : bool
If True, include "push_indices" in the dict.
Default is False.
return_mz_values : bool
If True, include "mz_values" in the dict.
Default is False.
return_intensity_values : bool
If True, include "intensity_values" in the dict.
Default is False.
return_corrected_intensity_values : bool
If True, include "corrected_intensity_values" in the dict.
Default is False.
raw_indices_sorted : bool
If True, raw_indices are assumed to be sorted,
resulting in a faster conversion.
Default is False.
Returns
-------
dict
A dict with all requested columns.
"""
try:
iter(raw_indices)
except TypeError:
raw_indices = [raw_indices]
if not isinstance(raw_indices, np.ndarray):
raw_indices = np.array(raw_indices)
result = {}
if (raw_indices is not None) and any(
[
return_frame_indices,
return_scan_indices,
return_quad_indices,
return_rt_values,
return_rt_values_min,
return_mobility_values,
return_quad_mz_values,
return_precursor_indices,
return_push_indices,
return_corrected_intensity_values,
]
):
if raw_indices_sorted:
push_indices = indptr_lookup(
self.push_indptr,
raw_indices,
)
else:
push_indices = np.searchsorted(
self.push_indptr,
raw_indices,
"right"
) - 1
if (
any(
[
return_frame_indices,
return_rt_values,
return_rt_values_min,
return_corrected_intensity_values,
]
)
) and (
frame_indices is None
):
frame_indices = push_indices // self.scan_max_index
if (return_scan_indices or return_mobility_values) and (
scan_indices is None
):
scan_indices = push_indices % self.scan_max_index
if any(
[
return_quad_indices,
return_quad_mz_values,
return_precursor_indices
]
) and (
quad_indices is None
):
if raw_indices_sorted:
quad_indices = indptr_lookup(
self.quad_indptr,
raw_indices,
)
else:
quad_indices = np.searchsorted(
self.quad_indptr,
raw_indices,
"right"
) - 1
if (return_tof_indices or return_mz_values) and (tof_indices is None):
tof_indices = self.tof_indices[raw_indices]
if return_raw_indices:
result["raw_indices"] = raw_indices
if return_frame_indices:
result["frame_indices"] = frame_indices
if return_scan_indices:
result["scan_indices"] = scan_indices
if return_quad_indices:
result["quad_indices"] = quad_indices
if return_precursor_indices:
result["precursor_indices"] = self.precursor_indices[quad_indices]
if return_push_indices:
result["push_indices"] = push_indices
if return_tof_indices:
result["tof_indices"] = tof_indices
if return_rt_values:
result["rt_values"] = self.rt_values[frame_indices]
if return_rt_values_min:
if "rt_values" in result:
result['rt_values_min'] = result["rt_values"] / 60
else:
result['rt_values_min'] = self.rt_values[frame_indices] / 60
if return_mobility_values:
result["mobility_values"] = self.mobility_values[scan_indices]
if return_quad_mz_values:
selected_quad_values = self.quad_mz_values[quad_indices]
low_mz_values = selected_quad_values[:, 0]
high_mz_values = selected_quad_values[:, 1]
result["quad_low_mz_values"] = low_mz_values
result["quad_high_mz_values"] = high_mz_values
if return_mz_values:
result["mz_values"] = self.mz_values[tof_indices]
if return_intensity_values:
result["intensity_values"] = self.intensity_values[raw_indices]
if return_corrected_intensity_values:
result["corrected_intensity_values"] = (
self.intensity_values[raw_indices] * self.intensity_corrections[frame_indices]
).astype(np.uint32)
return result
[docs] def convert_to_indices(
self,
values: np.ndarray,
*,
return_frame_indices: bool = False,
return_scan_indices: bool = False,
return_tof_indices: bool = False,
side: str = "left",
return_type: str = "",
):
"""Convert selected values to an array in the requested dimension.
Parameters
----------
values : float, np.float64[...], iterable
The raw values for which indices need to be retrieved.
return_frame_indices : bool
If True, convert the values to "frame_indices".
Default is False.
return_scan_indices : bool
If True, convert the values to "scan_indices".
Default is False.
return_tof_indices : bool
If True, convert the values to "tof_indices".
Default is False.
side : str
If there is an exact match between the values and reference array,
which index should be chosen. See also np.searchsorted.
Options are "left" or "right".
Default is "left".
return_type : str
Alternative way to define the return type.
Options are "frame_indices", "scan_indices" or "tof_indices".
Default is "".
Returns
-------
np.int64[...], int
An array with the same shape as values or iterable or an int
which corresponds to the requested value.
Raises
------
PrecursorFloatError
When trying to convert a quad float other than np.inf or -np.inf
to precursor index.
"""
if return_frame_indices:
return_type = "frame_indices"
elif return_scan_indices:
return_type = "scan_indices"
elif return_tof_indices:
return_type = "tof_indices"
if return_type == "frame_indices":
return np.searchsorted(self.rt_values, values, side)
elif return_type == "scan_indices":
return self.scan_max_index - np.searchsorted(
self.mobility_values[::-1],
values,
"left" if side == "right" else "right"
)
elif return_type == "tof_indices":
return np.searchsorted(self.mz_values, values, side)
elif return_type == "precursor_indices":
try:
if values not in [-np.inf, np.inf]:
raise PrecursorFloatError(
"Can not convert values to precursor_indices"
)
except ValueError:
raise PrecursorFloatError(
"Can not convert values to precursor_indices"
)
if values == -np.inf:
return 0
elif values == np.inf:
return self.precursor_max_index
else:
raise KeyError(f"return_type '{return_type}' is invalid")
def __getitem__(self, keys):
if not isinstance(keys, tuple):
keys = tuple([keys])
if isinstance(keys[-1], str):
if keys[-1] == "df":
as_dataframe = True
elif keys[-1] == "raw":
as_dataframe = False
else:
raise ValueError(f"Cannot use {keys[-1]} as a key")
keys = keys[:-1]
else:
as_dataframe = self.slice_as_dataframe
parsed_keys = parse_keys(self, keys)
raw_indices = filter_indices(
frame_slices=parsed_keys["frame_indices"],
scan_slices=parsed_keys["scan_indices"],
precursor_slices=parsed_keys["precursor_indices"],
tof_slices=parsed_keys["tof_indices"],
quad_slices=parsed_keys["quad_values"],
intensity_slices=parsed_keys["intensity_values"],
frame_max_index=self.frame_max_index,
scan_max_index=self.scan_max_index,
push_indptr=self.push_indptr,
precursor_indices=self.precursor_indices,
quad_mz_values=self.quad_mz_values,
quad_indptr=self.quad_indptr,
tof_indices=self.tof_indices,
intensities=self.intensity_values
)
if as_dataframe:
return self.as_dataframe(raw_indices)
else:
return raw_indices
[docs] def estimate_strike_count(
self,
frame_slices: np.ndarray,
scan_slices: np.ndarray,
precursor_slices: np.ndarray,
tof_slices: np.ndarray,
quad_slices: np.ndarray,
) -> int:
"""Estimate the number of detector events, given a set of slices.
Parameters
----------
frame_slices : np.int64[:, 3]
Each row of the array is assumed to be a (start, stop, step) tuple.
This array is assumed to be sorted,
disjunct and strictly increasing
(i.e. np.all(np.diff(frame_slices[:, :2].ravel()) >= 0) = True).
scan_slices : np.int64[:, 3]
Each row of the array is assumed to be a (start, stop, step) tuple.
This array is assumed to be sorted,
disjunct and strictly increasing
(i.e. np.all(np.diff(scan_slices[:, :2].ravel()) >= 0) = True).
precursor_slices : np.int64[:, 3]
Each row of the array is assumed to be a (start, stop, step) tuple.
This array is assumed to be sorted,
disjunct and strictly increasing
(i.e. np.all(np.diff(precursor_slices[:, :2].ravel()) >= 0)
= True).
tof_slices : np.int64[:, 3]
Each row of the array is assumed to be a (start, stop, step) tuple.
This array is assumed to be sorted,
disjunct and strictly increasing
(i.e. np.all(np.diff(tof_slices[:, :2].ravel()) >= 0) = True).
quad_slices : np.float64[:, 2]
Each row of the array is assumed to be (lower_mz, upper_mz) tuple.
This array is assumed to be sorted,
disjunct and strictly increasing
(i.e. np.all(np.diff(quad_slices.ravel()) >= 0) = True).
Returns
-------
int
The estimated number of detector events given these slices.
"""
frame_count = 0
for frame_start, frame_end, frame_stop in frame_slices:
frame_count += len(range(frame_start, frame_end, frame_stop))
scan_count = 0
for scan_start, scan_end, scan_stop in scan_slices:
scan_count += len(range(scan_start, scan_end, scan_stop))
tof_count = 0
for tof_start, tof_end, tof_stop in tof_slices:
tof_count += len(range(tof_start, tof_end, tof_stop))
precursor_count = 0
precursor_index_included = False
for precursor_start, precursor_end, precursor_stop in precursor_slices:
precursor_count += len(
range(precursor_start, precursor_end, precursor_stop)
)
if 0 in range(precursor_start, precursor_end, precursor_stop):
precursor_index_included = True
quad_count = 0
precursor_quad_included = False
for quad_start, quad_end in quad_slices:
if quad_start < 0:
precursor_quad_included = True
if quad_start < self.quad_mz_min_value:
quad_start = self.quad_mz_min_value
if quad_end > self.quad_mz_max_value:
quad_end = self.quad_mz_max_value
if quad_start < quad_end:
quad_count += quad_end - quad_start
estimated_count = len(self)
estimated_count *= frame_count / self.frame_max_index
estimated_count *= scan_count / self.scan_max_index
estimated_count *= tof_count / self.tof_max_index
fragment_multiplier = 0.5 * min(
precursor_count / (self.precursor_max_index),
quad_count / (
self.quad_mz_max_value - self.quad_mz_min_value
)
)
if fragment_multiplier < 0:
fragment_multiplier = 0
if precursor_index_included and precursor_quad_included:
fragment_multiplier += 0.5
estimated_count *= fragment_multiplier
return int(estimated_count)
[docs] def bin_intensities(self, indices: np.ndarray, axis: tuple):
"""Sum and project the intensities of the indices along 1 or 2 axis.
Parameters
----------
indices : np.int64[:]
The selected indices whose coordinates need to be summed along
the selected axis.
axis : tuple
Must be length 1 or 2 and can only contain the elements
"rt_values", "mobility_values" and "mz_values".
Returns
-------
np.float64[:], np.float64[:, 2]
An array or heatmap that express the summed intensity along
the selected axis.
"""
intensities = self.intensity_values[indices].astype(np.float64)
max_index = {
"rt_values": self.frame_max_index,
"mobility_values": self.scan_max_index,
"mz_values": self.tof_max_index,
}
parsed_indices = self.convert_from_indices(
indices,
return_frame_indices="rt_values" in axis,
return_scan_indices="mobility_values" in axis,
return_tof_indices="mz_values" in axis,
)
binned_intensities = np.zeros(tuple([max_index[ax] for ax in axis]))
parse_dict = {
"rt_values": "frame_indices",
"mobility_values": "scan_indices",
"mz_values": "tof_indices",
}
add_intensity_to_bin(
range(indices.size),
intensities,
tuple(
[
parsed_indices[parse_dict[ax]] for ax in axis
]
),
binned_intensities
)
return binned_intensities
[docs] def as_dataframe(
self,
indices: np.ndarray,
*,
raw_indices: bool = True,
frame_indices: bool = True,
scan_indices: bool = True,
quad_indices: bool = False,
tof_indices: bool = True,
precursor_indices: bool = True,
rt_values: bool = True,
rt_values_min: bool = True,
mobility_values: bool = True,
quad_mz_values: bool = True,
push_indices: bool = True,
mz_values: bool = True,
intensity_values: bool = True,
corrected_intensity_values: bool = True,
raw_indices_sorted: bool = False,
):
"""Convert raw indices to a pd.DataFrame.
Parameters
----------
indices : np.int64[:]
The raw indices for which coordinates need to be retrieved.
raw_indices : bool
If True, include "raw_indices" in the dataframe.
Default is True.
frame_indices : bool
If True, include "frame_indices" in the dataframe.
Default is True.
scan_indices : bool
If True, include "scan_indices" in the dataframe.
Default is True.
quad_indices : bool
If True, include "quad_indices" in the dataframe.
Default is False.
tof_indices : bool
If True, include "tof_indices" in the dataframe.
Default is True.
precursor_indices : bool
If True, include "precursor_indices" in the dataframe.
Default is True.
rt_values : bool
If True, include "rt_values" in the dataframe.
Default is True.
rt_values_min : bool
If True, include "rt_values_min" in the dataframe.
Default is True.
mobility_values : bool
If True, include "mobility_values" in the dataframe.
Default is True.
quad_mz_values : bool
If True, include "quad_low_mz_values" and
"quad_high_mz_values" in the dict.
Default is True.
push_indices : bool
If True, include "push_indices" in the dataframe.
Default is True.
mz_values : bool
If True, include "mz_values" in the dataframe.
Default is True.
intensity_values : bool
If True, include "intensity_values" in the dataframe.
Default is True.
corrected_intensity_values : bool
If True, include "corrected_intensity_values" in the dataframe.
Default is True.
raw_indices_sorted : bool
If True, raw_indices are assumed to be sorted,
resulting in a faster conversion.
Default is False.
Returns
-------
pd.DataFrame
A dataframe with all requested columns.
"""
return pd.DataFrame(
self.convert_from_indices(
indices,
return_raw_indices=raw_indices,
return_frame_indices=frame_indices,
return_scan_indices=scan_indices,
return_quad_indices=quad_indices,
return_precursor_indices=precursor_indices,
return_tof_indices=tof_indices,
return_rt_values=rt_values,
return_rt_values_min=rt_values_min,
return_mobility_values=mobility_values,
return_quad_mz_values=quad_mz_values,
return_push_indices=push_indices,
return_mz_values=mz_values,
return_intensity_values=intensity_values,
return_corrected_intensity_values=corrected_intensity_values,
raw_indices_sorted=raw_indices_sorted,
)
)
def _parse_quad_indptr(self) -> None:
logging.info("Indexing quadrupole dimension")
frame_ids = self.fragment_frames.Frame.values + 1
scan_begins = self.fragment_frames.ScanNumBegin.values
scan_ends = self.fragment_frames.ScanNumEnd.values
isolation_mzs = self.fragment_frames.IsolationMz.values
isolation_widths = self.fragment_frames.IsolationWidth.values
precursors = self.fragment_frames.Precursor.values
if (precursors[0] is None):
if self.zeroth_frame:
frame_groups = self.frames.MsMsType.values[1:]
else:
frame_groups = self.frames.MsMsType.values
precursor_frames = np.flatnonzero(frame_groups == 0)
group_sizes = np.diff(precursor_frames)
group_size = group_sizes[0]
if np.any(group_sizes != group_size):
raise ValueError("Sample type not understood")
precursors = (1 + frame_ids - frame_ids[0]) % group_size
if self.zeroth_frame:
precursors[0] = 0
self.fragment_frames.Precursor = precursors
self._acquisition_mode = "diaPASEF"
scan_max_index = self.scan_max_index
frame_max_index = self.frame_max_index
quad_indptr = [0]
quad_low_values = []
quad_high_values = []
precursor_indices = []
high = -1
for (
frame_id,
scan_begin,
scan_end,
isolation_mz,
isolation_width,
precursor
) in zip(
frame_ids - 1,
scan_begins,
scan_ends,
isolation_mzs,
isolation_widths / 2,
precursors
):
low = frame_id * scan_max_index + scan_begin
# TODO: CHECK?
# if low < high:
# print(frame_id, low, frame_id * scan_max_index + scan_end, high, low - high)
if low != high:
quad_indptr.append(low)
quad_low_values.append(-1)
quad_high_values.append(-1)
precursor_indices.append(0)
high = frame_id * scan_max_index + scan_end
quad_indptr.append(high)
quad_low_values.append(isolation_mz - isolation_width)
quad_high_values.append(isolation_mz + isolation_width)
precursor_indices.append(precursor)
quad_max_index = scan_max_index * frame_max_index
if high < quad_max_index:
quad_indptr.append(quad_max_index)
quad_low_values.append(-1)
quad_high_values.append(-1)
precursor_indices.append(0)
self._quad_mz_values = np.stack([quad_low_values, quad_high_values]).T
self._precursor_indices = np.array(precursor_indices)
self._raw_quad_indptr = np.array(quad_indptr)
self._quad_indptr = self.push_indptr[self._raw_quad_indptr]
self._quad_max_mz_value = np.max(self.quad_mz_values[:, 1])
self._quad_min_mz_value = np.min(
self.quad_mz_values[
self.quad_mz_values[:, 0] >= 0,
0
]
)
self._precursor_max_index = int(np.max(self.precursor_indices)) + 1
if self._acquisition_mode == "diaPASEF":
offset = int(self.zeroth_frame)
cycle_index = np.searchsorted(
self.raw_quad_indptr,
(self.scan_max_index) * (self.precursor_max_index + offset),
"r"
) + 1
repeats = np.diff(self.raw_quad_indptr[: cycle_index])
if self.zeroth_frame:
repeats[0] -= self.scan_max_index
cycle_length = self.scan_max_index * self.precursor_max_index
repeat_length = np.sum(repeats)
if repeat_length != cycle_length:
repeats[-1] -= repeat_length - cycle_length
self._dia_mz_cycle = np.empty((cycle_length, 2))
self._dia_mz_cycle[:, 0] = np.repeat(
self.quad_mz_values[: cycle_index - 1, 0],
repeats
)
self._dia_mz_cycle[:, 1] = np.repeat(
self.quad_mz_values[: cycle_index - 1, 1],
repeats
)
self._dia_precursor_cycle = np.repeat(
self.precursor_indices[: cycle_index - 1],
repeats
)
else:
self._dia_mz_cycle = np.empty((0, 2))
self._dia_precursor_cycle = np.empty(0, dtype=np.int64)
[docs] def index_precursors(
self,
centroiding_window: int = 0,
keep_n_most_abundant_peaks: int = -1,
) -> tuple:
"""Retrieve all MS2 spectra acquired with DDA.
IMPORTANT NOTE: This function is intended for DDA samples.
While it in theory works for DIA sample too, this probably has little
value.
Parameters
----------
centroiding_window : int
The centroiding window to use.
If 0, no centroiding is performed.
Default is 0.
keep_n_most_abundant_peaks : int
Keep the n most abundant peaks.
If -1, all peaks are retained.
Default is -1.
Returns
-------
: tuple (np.int64[:], np.uint32[:], np.uint32[:])
The spectrum_indptr array, spectrum_tof_indices array and
spectrum_intensity_values array.
"""
precursor_order = np.argsort(self.precursor_indices)
precursor_offsets = np.empty(
self.precursor_max_index + 1,
dtype=np.int64
)
precursor_offsets[0] = 0
precursor_offsets[1:-1] = np.flatnonzero(
np.diff(self.precursor_indices[precursor_order]) > 0) + 1
precursor_offsets[-1] = len(precursor_order)
offset = precursor_offsets[1]
offsets = precursor_order[offset:]
counts = np.empty(len(offsets) + 1, dtype=np.int)
counts[0] = 0
counts[1:] = np.cumsum(
self.quad_indptr[offsets + 1] - self.quad_indptr[offsets]
)
spectrum_indptr = np.empty(
self.precursor_max_index + 1,
dtype=np.int64
)
spectrum_indptr[1:] = counts[
precursor_offsets[1:] - precursor_offsets[1]
]
spectrum_indptr[0] = 0
spectrum_counts = np.zeros_like(spectrum_indptr)
spectrum_tof_indices = np.empty(spectrum_indptr[-1], dtype=np.uint32)
spectrum_intensity_values = np.empty(
len(spectrum_tof_indices),
dtype=np.float64
)
set_precursor(
range(1, self.precursor_max_index),
precursor_order,
precursor_offsets,
self.quad_indptr,
self.tof_indices,
self.intensity_values,
spectrum_tof_indices,
spectrum_intensity_values,
spectrum_indptr,
spectrum_counts,
)
if centroiding_window > 0:
centroid_spectra(
range(1, self.precursor_max_index),
spectrum_indptr,
spectrum_counts,
spectrum_tof_indices,
spectrum_intensity_values,
centroiding_window,
)
if keep_n_most_abundant_peaks > -1:
filter_spectra_by_abundant_peaks(
range(1, self.precursor_max_index),
spectrum_indptr,
spectrum_counts,
spectrum_tof_indices,
spectrum_intensity_values,
keep_n_most_abundant_peaks,
)
new_spectrum_indptr = np.empty_like(spectrum_counts)
new_spectrum_indptr[1:] = np.cumsum(spectrum_counts[:-1])
new_spectrum_indptr[0] = 0
trimmed_spectrum_tof_indices = np.empty(
new_spectrum_indptr[-1],
dtype=np.uint32
)
trimmed_spectrum_intensity_values = np.empty(
len(trimmed_spectrum_tof_indices),
dtype=np.float64
)
spectrum_intensity_values
trim_spectra(
range(1, self.precursor_max_index),
spectrum_tof_indices,
spectrum_intensity_values,
spectrum_indptr,
trimmed_spectrum_tof_indices,
trimmed_spectrum_intensity_values,
new_spectrum_indptr,
)
return (
new_spectrum_indptr,
trimmed_spectrum_tof_indices,
trimmed_spectrum_intensity_values
)
[docs] def save_as_mgf(
self,
directory: str,
file_name: str,
overwrite: bool = False,
centroiding_window: int = 5,
keep_n_most_abundant_peaks: int = -1,
):
"""Save profile spectra from this TimsTOF object as an mgf file.
Parameters
----------
directory : str
The directory where to save the mgf file.
file_name : str
The file name of the mgf file.
overwrite : bool
If True, an existing file is truncated.
If False, nothing happens if a file already exists.
Default is False.
centroiding_window : int
The centroiding window to use.
If 0, no centroiding is performed.
Default is 5.
keep_n_most_abundant_peaks : int
Keep the n most abundant peaks.
If -1, all peaks are retained.
Default is -1.
Returns
-------
str
The full file name of the mgf file.
"""
full_file_name = os.path.join(
directory,
file_name
)
if self.acquisition_mode != "ddaPASEF":
logging.info(
f"File {self.bruker_d_folder_name} is not "
"a ddaPASEF file, nothing to do."
)
return full_file_name
if os.path.exists(full_file_name):
if not overwrite:
logging.info(
f"File {full_file_name} already exists, nothing to do."
)
return full_file_name
logging.info(f"Indexing spectra of {self.bruker_d_folder_name}...")
(
spectrum_indptr,
spectrum_tof_indices,
spectrum_intensity_values,
) = self.index_precursors(
centroiding_window=centroiding_window,
keep_n_most_abundant_peaks=keep_n_most_abundant_peaks,
)
mono_mzs = self.precursors.MonoisotopicMz.values
average_mzs = self.precursors.AverageMz.values
charges = self.precursors.Charge.values
charges[np.flatnonzero(np.isnan(charges))] = 0
charges = charges.astype(np.int64)
rtinseconds = self.rt_values[self.precursors.Parent.values]
intensities = self.precursors.Intensity.values
mobilities = self.mobility_values[
self.precursors.ScanNumber.values.astype(np.int64)
]
with open(full_file_name, "w") as infile:
logging.info(f"Exporting profile spectra to {full_file_name}...")
for index in alphatims.utils.progress_callback(
range(1, self.precursor_max_index)
):
start = spectrum_indptr[index]
end = spectrum_indptr[index + 1]
title = (
f"index: {index}, "
f"intensity: {intensities[index - 1]:.1f}, "
f"mobility: {mobilities[index - 1]:.3f}, "
f"average_mz: {average_mzs[index - 1]:.3f}"
)
infile.write("BEGIN IONS\n")
infile.write(f'TITLE="{title}"\n')
infile.write(f"PEPMASS={mono_mzs[index - 1]:.6f}\n")
infile.write(f"CHARGE={charges[index - 1]}\n")
infile.write(f"RTINSECONDS={rtinseconds[index - 1]:.2f}\n")
for mz, intensity in zip(
self.mz_values[spectrum_tof_indices[start: end]],
spectrum_intensity_values[start: end],
):
infile.write(f"{mz:.6f} {intensity}\n")
infile.write("END IONS\n")
logging.info(
f"Succesfully wrote {self.precursor_max_index - 1:,} "
f"spectra to {full_file_name}."
)
return full_file_name
[docs] def calculate_global_calibrated_mz_values(
self,
calibrant1: tuple = (922.009798, 1.1895, slice(0, 1)),
calibrant2: tuple = (1221.990637, 1.3820, slice(0, 1)),
mz_tolerance: float = 10, # in Th
mobility_tolerance: float = 0.1, # in 1/k0,
) -> None:
"""Calculate global calibrated_mz_values based on two calibrant ions.
Parameters
----------
calibrant1 : tuple
The first calibrant ion.
This is a tuple with (mz, mobility, precursor_slice) foat values.
Default is (922.009798, 1.1895, slice(0, 1)).
calibrant2 : tuple
The first calibrant ion.
This is a tuple with (mz, mobility, precursor_slice) foat values.
Default is (1221.990637, 1.3820, slice(0, 1)).
mz_tolerance : float
The tolerance window (in Th) with respect to the
uncalibrated mz_values. If this is too large,
the calibrant ion might not be the most intense ion anymore.
If this is too small, the calibrant ion might not be contained.
Default is 10.
mobility_tolerance : float
The tolerance window with respect to the
uncalibrated mobility_values. If this is too large,
the calibrant ion might not be the most intense ion anymore.
If this is too small, the calibrant ion might not be contained.
Default is 0.1.
"""
logging.info("Calculating global calibrated mz values...")
if calibrant1[0] > calibrant2[0]:
calibrant1, calibrant2 = calibrant2, calibrant1
calibrant1_lower_mz = calibrant1[0] - mz_tolerance
calibrant1_upper_mz = calibrant1[0] + mz_tolerance
calibrant1_lower_mobility = calibrant1[1] - mobility_tolerance
calibrant1_upper_mobility = calibrant1[1] + mobility_tolerance
calibrant1_tof = np.argmax(
np.bincount(
self.tof_indices[
self[
:,
calibrant1_lower_mobility: calibrant1_upper_mobility,
calibrant1[2],
calibrant1_lower_mz: calibrant1_upper_mz,
"raw"
]
]
)
)
calibrant2_lower_mz = calibrant2[0] - mz_tolerance
calibrant2_upper_mz = calibrant2[0] + mz_tolerance
calibrant2_lower_mobility = calibrant2[1] - mobility_tolerance
calibrant2_upper_mobility = calibrant2[1] + mobility_tolerance
calibrant2_tof = np.argmax(
np.bincount(
self.tof_indices[
self[
:,
calibrant2_lower_mobility: calibrant2_upper_mobility,
calibrant2[2],
calibrant2_lower_mz: calibrant2_upper_mz,
"raw"
]
]
)
)
tof_slope = (
np.sqrt(calibrant2[0]) - np.sqrt(calibrant1[0])
) / (calibrant2_tof - calibrant1_tof)
tof_intercept = np.sqrt(calibrant1[0]) - tof_slope * calibrant1_tof
self._calibrated_mz_values = (
tof_intercept + tof_slope * np.arange(self.tof_max_index)
)**2
ppms = 10**6 * (
self._mz_values - self._calibrated_mz_values
) / self._mz_values
logging.info(
"Global calibration of mz values yielded differences between "
f"{np.min(ppms):.2f} and {np.max(ppms):.2f} ppm."
)
[docs] def use_calibrated_mz_values_as_default(
self,
use_calibrated_mz_values: int
) -> None:
"""Override the default mz_values with the global calibrated_mz_values.
Calibrated_mz_values will be calculated if they do not exist yet.
Parameters
----------
use_calibrated_mz_values : int
If not 0, the mz_values are overwritten with global
calibrated_mz_values.
If 1, calibration at the MS1 level is performed.
If 2, calibration at the MS2 level is performed.
"""
if use_calibrated_mz_values != 0:
if not hasattr(self, "_calibrated_mz_values"):
if use_calibrated_mz_values == 1:
ms_level = 0
if use_calibrated_mz_values == 2:
ms_level = slice(1, None)
self.calculate_global_calibrated_mz_values(
calibrant1=(922.009798, 1.1895, ms_level),
calibrant2=(1221.990637, 1.3820, ms_level),
mz_tolerance=1
)
self._use_calibrated_mz_values_as_default = use_calibrated_mz_values
[docs]class PrecursorFloatError(TypeError):
"""Used to indicate that a precursor value is not an int but a float."""
pass
[docs]@alphatims.utils.pjit(
signature_or_function="void(i8,i8[:],i8[:],i8[:],u4[:],u2[:],u4[:],f8[:],i8[:],i8[:])"
)
def set_precursor(
precursor_index: int,
offset_order: np.ndarray,
precursor_offsets: np.ndarray,
quad_indptr: np.ndarray,
tof_indices: np.ndarray,
intensities: np.ndarray,
spectrum_tof_indices: np.ndarray,
spectrum_intensity_values: np.ndarray,
spectrum_indptr: np.ndarray,
spectrum_counts: np.ndarray,
) -> None:
"""Sum the intensities of all pushes belonging to a single precursor.
IMPORTANT NOTE: This function is decorated with alphatims.utils.pjit.
The first argument is thus expected to be provided as an iterable
containing ints instead of a single int.
Parameters
----------
precursor_index : int
The precursor index indicating which MS2 spectrum to determine.
offset_order : np.int64[:]
The order of self.precursor_indices, obtained with np.argsort.
precursor_offsets : np.int64[:]
An index pointer array for precursor offsets.
quad_indptr : np.int64[:]
The self.quad_indptr array of a TimsTOF object.
tof_indices : np.uint32[:]
The self.tof_indices array of a TimsTOF object.
intensities : np.uint16[:]
The self.intensity_values array of a TimsTOF object.
spectrum_tof_indices : np.uint32[:]
A buffer array to store tof indices of the new spectrum.
spectrum_intensity_values : np.float64[:]
A buffer array to store intensity values of the new spectrum.
spectrum_indptr : np.int64[:]
An index pointer array defining the original spectrum boundaries.
spectrum_counts : np. int64[:]
An buffer array defining how many distinct tof indices the new
spectrum has.
"""
offset = spectrum_indptr[precursor_index]
precursor_offset_lower = precursor_offsets[precursor_index]
precursor_offset_upper = precursor_offsets[precursor_index + 1]
selected_offsets = offset_order[
precursor_offset_lower: precursor_offset_upper
]
starts = quad_indptr[selected_offsets]
ends = quad_indptr[selected_offsets + 1]
offset_index = offset
for start, end in zip(starts, ends):
spectrum_tof_indices[
offset_index: offset_index + end - start
] = tof_indices[start: end]
spectrum_intensity_values[
offset_index: offset_index + end - start
] = intensities[start: end]
offset_index += end - start
offset_end = spectrum_indptr[precursor_index + 1]
order = np.argsort(spectrum_tof_indices[offset: offset_end])
current_index = offset - 1
previous_tof_index = -1
for tof_index, intensity in zip(
spectrum_tof_indices[offset: offset_end][order],
spectrum_intensity_values[offset: offset_end][order],
):
if tof_index != previous_tof_index:
current_index += 1
spectrum_tof_indices[current_index] = tof_index
spectrum_intensity_values[current_index] = intensity
previous_tof_index = tof_index
else:
spectrum_intensity_values[current_index] += intensity
spectrum_tof_indices[current_index + 1: offset_end] = 0
spectrum_intensity_values[current_index + 1: offset_end] = 0
spectrum_counts[precursor_index] = current_index + 1 - offset
[docs]@alphatims.utils.pjit
def centroid_spectra(
index: int,
spectrum_indptr: np.ndarray,
spectrum_counts: np.ndarray,
spectrum_tof_indices: np.ndarray,
spectrum_intensity_values: np.ndarray,
window_size: int,
):
"""Smoothen and centroid a profile spectrum (inplace operation).
IMPORTANT NOTE: This function will overwrite all input arrays.
IMPORTANT NOTE: This function is decorated with alphatims.utils.pjit.
The first argument is thus expected to be provided as an iterable
containing ints instead of a single int.
Parameters
----------
index : int
The push index whose intensity_values and tof_indices will be
centroided.
spectrum_indptr : np.int64[:]
An index pointer array defining the (untrimmed) spectrum boundaries.
spectrum_counts : np. int64[:]
The original array defining how many distinct tof indices each
spectrum has.
spectrum_tof_indices : np.uint32[:]
The original array containing tof indices.
spectrum_intensity_values : np.float64[:]
The original array containing intensity values.
window_size : int
The window size to use for smoothing and centroiding peaks.
"""
start = spectrum_indptr[index]
end = start + spectrum_counts[index]
if start == end:
return
mzs = spectrum_tof_indices[start: end]
ints = spectrum_intensity_values[start: end]
smooth_ints = ints.copy()
for i, self_mz in enumerate(mzs[:-1]):
for j in range(i + 1, len(mzs)):
other_mz = mzs[j]
diff = other_mz - self_mz + 1
if diff >= window_size:
break
smooth_ints[i] += ints[j] / diff
smooth_ints[j] += ints[i] / diff
pre_apex = True
maxima = [mzs[0]]
intensities = [ints[0]]
for i, self_mz in enumerate(mzs[1:], 1):
if self_mz > mzs[i - 1] + window_size:
maxima.append(mzs[i])
intensities.append(0)
pre_apex = True
elif pre_apex:
if smooth_ints[i] < smooth_ints[i - 1]:
pre_apex = False
maxima[-1] = mzs[i - 1]
elif smooth_ints[i] > smooth_ints[i - 1]:
maxima.append(mzs[i])
intensities.append(0)
pre_apex = True
intensities[-1] += ints[i]
spectrum_tof_indices[start: start + len(maxima)] = np.array(
maxima,
dtype=spectrum_tof_indices.dtype
)
spectrum_intensity_values[start: start + len(maxima)] = np.array(
intensities,
dtype=spectrum_intensity_values.dtype
)
spectrum_counts[index] = len(maxima)
[docs]@alphatims.utils.pjit
def filter_spectra_by_abundant_peaks(
index: int,
spectrum_indptr: np.ndarray,
spectrum_counts: np.ndarray,
spectrum_tof_indices: np.ndarray,
spectrum_intensity_values: np.ndarray,
keep_n_most_abundant_peaks: int,
):
"""Filter a spectrum to retain only the most abundant peaks.
IMPORTANT NOTE: This function will overwrite all input arrays.
IMPORTANT NOTE: This function is decorated with alphatims.utils.pjit.
The first argument is thus expected to be provided as an iterable
containing ints instead of a single int.
Parameters
----------
index : int
The push index whose intensity_values and tof_indices will be
centroided.
spectrum_indptr : np.int64[:]
An index pointer array defining the (untrimmed) spectrum boundaries.
spectrum_counts : np. int64[:]
The original array defining how many distinct tof indices each
spectrum has.
spectrum_tof_indices : np.uint32[:]
The original array containing tof indices.
spectrum_intensity_values : np.float64[:]
The original array containing intensity values.
keep_n_most_abundant_peaks : int
Keep only this many abundant peaks.
"""
start = spectrum_indptr[index]
end = start + spectrum_counts[index]
if end - start <= keep_n_most_abundant_peaks:
return
mzs = spectrum_tof_indices[start: end]
ints = spectrum_intensity_values[start: end]
selected_indices = np.sort(
np.argsort(ints)[-keep_n_most_abundant_peaks:]
)
count = len(selected_indices)
spectrum_tof_indices[start: start + count] = mzs[selected_indices]
spectrum_intensity_values[start: start + count] = ints[selected_indices]
spectrum_counts[index] = count
[docs]@alphatims.utils.pjit
def trim_spectra(
index: int,
spectrum_tof_indices: np.ndarray,
spectrum_intensity_values: np.ndarray,
spectrum_indptr: np.ndarray,
trimmed_spectrum_tof_indices: np.ndarray,
trimmed_spectrum_intensity_values: np.ndarray,
new_spectrum_indptr: np.ndarray,
) -> None:
"""Trim remaining bytes after merging of multiple pushes.
IMPORTANT NOTE: This function is decorated with alphatims.utils.pjit.
The first argument is thus expected to be provided as an iterable
containing ints instead of a single int.
Parameters
----------
index : int
The push index whose intensity_values and tof_indices will be trimmed.
spectrum_tof_indices : np.uint32[:]
The original array containing tof indices.
spectrum_intensity_values : np.float64[:]
The original array containing intensity values.
spectrum_indptr : np.int64[:]
An index pointer array defining the original spectrum boundaries.
trimmed_spectrum_tof_indices : np.uint32[:]
A buffer array to store new tof indices.
trimmed_spectrum_intensity_values : np.float64[:]
A buffer array to store new intensity values.
new_spectrum_indptr : np.int64[:]
An index pointer array defining the trimmed spectrum boundaries.
"""
start = spectrum_indptr[index]
new_start = new_spectrum_indptr[index]
new_end = new_spectrum_indptr[index + 1]
trimmed_spectrum_tof_indices[new_start: new_end] = spectrum_tof_indices[
start: start + new_end - new_start
]
trimmed_spectrum_intensity_values[
new_start: new_end
] = spectrum_intensity_values[
start: start + new_end - new_start
]
[docs]def parse_keys(data: TimsTOF, keys) -> dict:
"""Convert different keys to a key dict with defined types.
NOTE: Negative slicing is not supported and all indiviudal keys
are assumed to be sorted, disjunct and strictly increasing
Parameters
----------
data : alphatims.bruker.TimsTOF
The TimsTOF objext for which to get slices.
keys : tuple
A tuple of at most 5 elemens, containing
slices, ints, floats, Nones, and/or iterables.
See `alphatims.bruker.convert_slice_key_to_int_array` and
`alphatims.bruker.convert_slice_key_to_float_array` for more details.
Returns
-------
: dict
The resulting dict always has the following items:
- "frame_indices": np.int64[:, 3]
- "scan_indices": np.int64[:, 3]
- "tof_indices": np.int64[:, 3]
- "precursor_indices": np.int64[:, 3]
- "quad_values": np.float64[:, 2]
- "intensity_values": np.float64[:, 2]
"""
dimensions = [
"frame_indices",
"scan_indices",
"precursor_indices",
"tof_indices",
]
dimension_slices = {}
if len(keys) > (len(dimensions) + 1):
raise KeyError(
"LC-IMS-MSMS data can be sliced in maximum 5 dimensions. "
"Integers are assumed to be indices, while "
"floats are assumed as values. Intensity is always casted "
"to integer values, regardless of input type."
)
if isinstance(keys[0], dict):
new_keys = []
dimension_translations = {
"frame_indices": "rt_values",
"scan_indices": "mobility_values",
"precursor_indices": "quad_mz_values",
"tof_indices": "mz_values",
}
for indices, values in dimension_translations.items():
if indices in keys[0]:
new_keys.append(keys[0][indices])
elif values in keys[0]:
new_keys.append(keys[0][values])
else:
new_keys.append(slice(None))
if "intensity_values" in keys[0]:
new_keys.append(keys[0]["intensity_values"])
keys = new_keys
for i, dimension in enumerate(dimensions):
try:
dimension_slices[
dimension
] = convert_slice_key_to_int_array(
data,
keys[i] if (i < len(keys)) else slice(None),
dimension
)
except PrecursorFloatError:
dimension_slices[
"precursor_indices"
] = convert_slice_key_to_int_array(
data,
slice(None),
"precursor_indices"
)
dimension_slices[
"quad_values"
] = convert_slice_key_to_float_array(keys[i])
dimension_slices[
"intensity_values"
] = convert_slice_key_to_float_array(
keys[-1] if (len(keys) > len(dimensions)) else slice(None)
)
if "quad_values" not in dimension_slices:
dimension_slices["quad_values"] = np.array(
[[-np.inf, np.inf]],
dtype=np.float64
)
return dimension_slices
[docs]def convert_slice_key_to_float_array(key):
"""Convert a key to a slice float array.
NOTE: This function should only be used for QUAD or DETECTOR dimensions.
Parameters
----------
key : slice, int, float, None, iterable
The key that needs to be converted.
Returns
-------
: np.float64[:, 2]
Each row represent a a (start, stop) slice.
Raises
------
ValueError
When the key is an np.ndarray with more than 2 columns.
"""
try:
iter(key)
except TypeError:
if key is None:
key = slice(None)
if isinstance(key, slice):
start = key.start
if start is None:
start = -np.inf
stop = key.stop
if stop is None:
stop = np.inf
else:
start = key
stop = key
return np.array([[start, stop]], dtype=np.float64)
else:
if not isinstance(key, np.ndarray):
key = np.array(key, dtype=np.float64)
key = key.astype(np.float64)
if len(key.shape) == 1:
return np.array([key, key]).T
elif len(key.shape) == 2:
if key.shape[1] != 2:
raise ValueError
return key
else:
raise ValueError
[docs]def convert_slice_key_to_int_array(data: TimsTOF, key, dimension: str):
"""Convert a key of a data dimension to a slice integer array.
Parameters
----------
data : alphatims.bruker.TimsTOF
The TimsTOF objext for which to get slices.
key : slice, int, float, None, iterable
The key that needs to be converted.
dimension : str
The dimension for which the key needs to be retrieved
Returns
-------
: np.int64[:, 3]
Each row represent a a (start, stop, step) slice.
Raises
------
ValueError
When the key contains elements other than int or float.
PrecursorFloatError
When trying to convert a quad float to precursor index.
"""
result = np.empty((0, 3), dtype=np.int64)
inverse_of_scans = False
try:
iter(key)
except TypeError:
if key is None:
key = slice(None)
if isinstance(key, slice):
if dimension == "scan_indices":
if isinstance(key.start, (np.inexact, float)) or isinstance(
key.stop,
(np.inexact, float)
):
key = slice(key.stop, key.start, key.step)
start = key.start
if not isinstance(start, (np.integer, int)):
if start is None:
if dimension == "scan_indices":
start = np.inf
else:
start = -np.inf
if not isinstance(start, (np.inexact, float)):
raise ValueError
start = data.convert_to_indices(
start,
return_type=dimension
)
stop = key.stop
if not isinstance(stop, (np.integer, int)):
if stop is None:
if dimension == "scan_indices":
stop = -np.inf
else:
stop = np.inf
if not isinstance(stop, (np.inexact, float)):
raise ValueError
stop = data.convert_to_indices(
stop,
return_type=dimension,
)
step = key.step
if not isinstance(step, (np.integer, int)):
if step is not None:
raise ValueError
step = 1
result = np.array([[start, stop, step]])
elif isinstance(key, (np.integer, int)):
result = np.array([[key, key + 1, 1]])
elif isinstance(key, (np.inexact, float)):
key = data.convert_to_indices(key, return_type=dimension)
if dimension == "scan_indices":
result = np.array([[key - 1, key, 1]])
else:
result = np.array([[key, key + 1, 1]])
else:
raise ValueError
else:
if not isinstance(key, np.ndarray):
key = np.array(key)
step = 1
if not isinstance(key.ravel()[0], np.integer):
key = data.convert_to_indices(key, return_type=dimension)
if dimension == "scan_indices":
key -= 1
if len(key.shape) == 1:
result = np.array([key, key + 1, np.repeat(step, key.size)]).T
elif len(key.shape) == 2:
if key.shape[1] != 3:
raise ValueError
result = key
else:
raise ValueError
if inverse_of_scans:
return result[:, [1, 0, 2]]
else:
return result
[docs]@alphatims.utils.njit
def calculate_dia_cycle_mask(
dia_mz_cycle: np.ndarray,
quad_slices: np.ndarray,
dia_precursor_cycle: np.ndarray = None,
precursor_slices: np.ndarray = None,
):
"""Calculate a boolean mask for cyclic push indices satisfying queries.
Parameters
----------
dia_mz_cycle : np.float64[:, 2]
An array with (upper, lower) mz values of a DIA cycle (per push).
quad_slices : np.float64[:, 2]
Each row of the array is assumed to be (lower_mz, upper_mz) tuple.
This array is assumed to be sorted, disjunct and strictly increasing
(i.e. np.all(np.diff(quad_slices.ravel()) >= 0) = True).
dia_precursor_cycle : np.int64[:]
An array with precursor indices of a DIA cycle (per push).
precursor_slices : np.int64[:, 3]
Each row of the array is assumed to be a (start, stop, step) tuple.
This array is assumed to be sorted, disjunct and strictly increasing
(i.e. np.all(np.diff(precursor_slices[:, :2].ravel()) >= 0) = True).
Returns
-------
: np.bool_[:]
A mask that determines if a cyclic push index is valid given the
requested slices.
"""
mz_mask = np.zeros(len(dia_mz_cycle), dtype=np.bool_)
for i, (mz_start, mz_stop) in enumerate(dia_mz_cycle):
for quad_mz_start, quad_mz_stop in quad_slices:
if (quad_mz_start <= mz_stop) and (quad_mz_stop >= mz_start):
mz_mask[i] = True
break
if precursor_slices is not None:
precursor_mask = np.zeros(len(dia_mz_cycle), dtype=np.bool_)
for i, precursor_index in enumerate(dia_precursor_cycle):
for (start, stop, step) in precursor_slices:
if precursor_index in range(start, stop, step):
precursor_mask[i] = True
break
return mz_mask & precursor_mask
else:
return mz_mask
[docs]@alphatims.utils.njit
def valid_quad_mz_values(
low_mz_value: float,
high_mz_value: float,
quad_slices: np.ndarray,
) -> bool:
"""Check if the low and high quad mz values are included in the slices.
NOTE: Just a part of the quad range needs to overlap with a part
of a single slice.
Parameters
----------
low_mz_value : float
The lower mz value of the current quad selection.
high_mz_value : float
The upper mz value of the current quad selection.
quad_slices : np.float64[:, 2]
Each row of the array is assumed to be (lower_mz, upper_mz) tuple.
This array is assumed to be sorted, disjunct and strictly increasing
(i.e. np.all(np.diff(quad_slices.ravel()) >= 0) = True).
Returns
-------
: bool
True if some part of the quad overlaps with some part of some slice.
False if there is no overlap in the range.
"""
slice_index = np.searchsorted(
quad_slices[:, 0].ravel(),
high_mz_value,
"right"
)
if slice_index == 0:
return False
if low_mz_value <= quad_slices[slice_index - 1, 1]:
return True
return False
[docs]@alphatims.utils.njit
def valid_precursor_index(
precursor_index: int,
precursor_slices: np.ndarray
) -> bool:
"""Check if a precursor index is included in the slices.
Parameters
----------
precursor_index : int
The precursor index to validate.
precursor_slices : np.int64[:, 3]
Each row of the array is assumed to be a (start, stop, step) tuple.
This array is assumed to be sorted, disjunct and strictly increasing
(i.e. np.all(np.diff(precursor_slices[:, :2].ravel()) >= 0) = True).
Returns
-------
: bool
True if the precursor index is present in any of the slices.
False otherwise.
"""
slice_index = np.searchsorted(
precursor_slices[:, 0].ravel(),
precursor_index,
side="right"
)
if slice_index == 0:
return False
return precursor_index in range(
precursor_slices[slice_index - 1, 0],
precursor_slices[slice_index - 1, 1],
precursor_slices[slice_index - 1, 2],
)
[docs]@alphatims.utils.njit
def filter_indices(
frame_slices: np.ndarray,
scan_slices: np.ndarray,
precursor_slices: np.ndarray,
tof_slices: np.ndarray,
quad_slices: np.ndarray,
intensity_slices: np.ndarray,
frame_max_index: int,
scan_max_index: int,
push_indptr: np.ndarray,
precursor_indices: np.ndarray,
quad_mz_values: np.ndarray,
quad_indptr: np.ndarray,
tof_indices: np.ndarray,
intensities: np.ndarray,
):
"""Filter raw indices by slices from all dimensions.
Parameters
----------
frame_slices : np.int64[:, 3]
Each row of the array is assumed to be a (start, stop, step) tuple.
This array is assumed to be sorted, disjunct and strictly increasing
(i.e. np.all(np.diff(frame_slices[:, :2].ravel()) >= 0) = True).
scan_slices : np.int64[:, 3]
Each row of the array is assumed to be a (start, stop, step) tuple.
This array is assumed to be sorted, disjunct and strictly increasing
(i.e. np.all(np.diff(scan_slices[:, :2].ravel()) >= 0) = True).
precursor_slices : np.int64[:, 3]
Each row of the array is assumed to be a (start, stop, step) tuple.
This array is assumed to be sorted, disjunct and strictly increasing
(i.e. np.all(np.diff(precursor_slices[:, :2].ravel()) >= 0) = True).
tof_slices : np.int64[:, 3]
Each row of the array is assumed to be a (start, stop, step) tuple.
This array is assumed to be sorted, disjunct and strictly increasing
(i.e. np.all(np.diff(tof_slices[:, :2].ravel()) >= 0) = True).
quad_slices : np.float64[:, 2]
Each row of the array is assumed to be (lower_mz, upper_mz) tuple.
This array is assumed to be sorted, disjunct and strictly increasing
(i.e. np.all(np.diff(quad_slices.ravel()) >= 0) = True).
intensity_slices : np.float64[:, 2]
Each row of the array is assumed to be (lower_mz, upper_mz) tuple.
This array is assumed to be sorted, disjunct and strictly increasing
(i.e. np.all(np.diff(intensity_slices.ravel()) >= 0) = True).
frame_max_index : int
The maximum frame index of a TimsTOF object.
scan_max_index : int
The maximum scan index of a TimsTOF object.
push_indptr : np.int64[:]
The self.push_indptr array of a TimsTOF object.
precursor_indices : np.int64[:]
The self.precursor_indices array of a TimsTOF object.
quad_mz_values : np.float64[:, 2]
The self.quad_mz_values array of a TimsTOF object.
quad_indptr : np.int64[:]
The self.quad_indptr array of a TimsTOF object.
tof_indices : np.uint32[:]
The self.tof_indices array of a TimsTOF object.
intensities : np.uint16[:]
The self.intensity_values array of a TimsTOF object.
Returns
-------
: np.int64[:]
The raw indices that satisfy all the slices.
"""
result = []
quad_index = -1
new_quad_index = -1
quad_end = -1
is_valid_quad_index = True
starts = push_indptr[:-1].reshape(
frame_max_index,
scan_max_index
)
ends = push_indptr[1:].reshape(
frame_max_index,
scan_max_index
)
for frame_start, frame_stop, frame_step in frame_slices:
for frame_start_slice, frame_end_slice in zip(
starts[slice(frame_start, frame_stop, frame_step)],
ends[slice(frame_start, frame_stop, frame_step)]
):
for scan_start, scan_stop, scan_step in scan_slices:
for sparse_start, sparse_end in zip(
frame_start_slice[slice(scan_start, scan_stop, scan_step)],
frame_end_slice[slice(scan_start, scan_stop, scan_step)]
):
if (sparse_start == sparse_end):
continue
while quad_end < sparse_end:
new_quad_index += 1
quad_end = quad_indptr[new_quad_index + 1]
if quad_index != new_quad_index:
quad_index = new_quad_index
if not valid_quad_mz_values(
quad_mz_values[quad_index, 0],
quad_mz_values[quad_index, 1],
quad_slices
):
is_valid_quad_index = False
elif not valid_precursor_index(
precursor_indices[quad_index],
precursor_slices,
):
is_valid_quad_index = False
else:
is_valid_quad_index = True
if not is_valid_quad_index:
continue
idx = sparse_start
for tof_start, tof_stop, tof_step in tof_slices:
idx += np.searchsorted(
tof_indices[idx: sparse_end],
tof_start
)
tof_value = tof_indices[idx]
while (tof_value < tof_stop) and (idx < sparse_end):
if tof_value in range(
tof_start,
tof_stop,
tof_step
):
intensity = intensities[idx]
for (
low_intensity,
high_intensity
) in intensity_slices:
if (low_intensity <= intensity):
if (intensity <= high_intensity):
result.append(idx)
break
idx += 1
tof_value = tof_indices[idx]
return np.array(result)
# Overhead of using more than 1 threads is actually slower
[docs]@alphatims.utils.pjit(thread_count=1, include_progress_callback=False)
def add_intensity_to_bin(
query_index: int,
intensities: np.ndarray,
parsed_indices: np.ndarray,
intensity_bins: np.ndarray,
) -> None:
"""Add the intensity of a query to the appropriate bin.
IMPORTANT NOTE: This function is decorated with alphatims.utils.pjit.
The first argument is thus expected to be provided as an iterable
containing ints instead of a single int.
Parameters
----------
query_index : int
The query whose intensity needs to be binned
The first argument is thus expected to be provided as an iterable
containing ints instead of a single int.
intensities : np.float64[:]
An array with intensities that need to be binned.
parsed_indices : np.int64[:], np.int64[:, :]
Description of parameter `parsed_indices`.
intensity_bins : np.float64[:]
A buffer with intensity bins to which the current query will be added.
"""
intensity = intensities[query_index]
if len(parsed_indices) == 1:
intensity_bins[parsed_indices[0][query_index]] += intensity
elif len(parsed_indices) == 2:
intensity_bins[
parsed_indices[0][query_index],
parsed_indices[1][query_index]
] += intensity
[docs]@alphatims.utils.njit(nogil=True)
def indptr_lookup(
targets: np.ndarray,
queries: np.ndarray,
momentum_amplifier: int = 2
):
"""Find the indices of queries in targets.
This function is equivalent to
"np.searchsorted(targets, queries, "right") - 1".
By utilizing the fact that queries are also sorted,
it is significantly faster though.
Parameters
----------
targets : np.int64[:]
A sorted list of index pointers where queries needs to be looked up.
queries : np.int64[:]
A sorted list of queries whose index pointers needs to be looked up.
momentum_amplifier : int
Factor to add momentum to linear searching, attempting to quickly
discard empty range without hits.
Invreasing it can speed up searching of queries if they are sparsely
spread out in targets.
Returns
-------
: np.int64[:]
The indices of queries in targets.
"""
hits = np.empty_like(queries)
target_index = 0
no_target_overflow = True
for i, query_index in enumerate(queries):
while no_target_overflow:
momentum = 1
while targets[target_index] <= query_index:
target_index += momentum
if target_index >= len(targets):
break
momentum *= momentum_amplifier
else:
if momentum <= momentum_amplifier:
break
else:
target_index -= momentum // momentum_amplifier - 1
continue
if momentum == 1:
no_target_overflow = False
else:
target_index -= momentum
hits[i] = target_index - 1
return hits
[docs]@alphatims.utils.njit(nogil=True)
def get_dia_push_indices(
frame_slices: np.ndarray,
scan_slices: np.ndarray,
quad_slices: np.ndarray,
scan_max_index: int,
dia_mz_cycle: np.ndarray,
dia_precursor_cycle: np.ndarray = None,
precursor_slices: np.ndarray = None,
zeroth_frame: bool = True,
):
"""Filter DIA push indices by slices from LC, TIMS and QUAD.
Parameters
----------
frame_slices : np.int64[:, 3]
Each row of the array is assumed to be a (start, stop, step) tuple.
This array is assumed to be sorted, disjunct and strictly increasing
(i.e. np.all(np.diff(frame_slices[:, :2].ravel()) >= 0) = True).
scan_slices : np.int64[:, 3]
Each row of the array is assumed to be a (start, stop, step) tuple.
This array is assumed to be sorted, disjunct and strictly increasing
(i.e. np.all(np.diff(scan_slices[:, :2].ravel()) >= 0) = True).
quad_slices : np.float64[:, 2]
Each row of the array is assumed to be (lower_mz, upper_mz) tuple.
This array is assumed to be sorted, disjunct and strictly increasing
(i.e. np.all(np.diff(quad_slices.ravel()) >= 0) = True).
scan_max_index : int
The maximum scan index of a TimsTOF object.
dia_mz_cycle : np.float64[:, 2]
An array with (upper, lower) mz values of a DIA cycle (per push).
dia_precursor_cycle : np.int64[:]
An array with precursor indices of a DIA cycle (per push).
precursor_slices : np.int64[:, 3]
Each row of the array is assumed to be a (start, stop, step) tuple.
This array is assumed to be sorted, disjunct and strictly increasing
(i.e. np.all(np.diff(precursor_slices[:, :2].ravel()) >= 0) = True).
zeroth_frame : bool
Indicates if a zeroth frame was used before a DIA cycle.
Returns
-------
: np.int64[:]
The raw push indices that satisfy all the slices.
"""
result = []
quad_mask = calculate_dia_cycle_mask(
dia_mz_cycle=dia_mz_cycle,
quad_slices=quad_slices,
dia_precursor_cycle=dia_precursor_cycle,
precursor_slices=precursor_slices
)
for frame_start, frame_stop, frame_step in frame_slices:
for frame_index in range(frame_start, frame_stop, frame_step):
for scan_start, scan_stop, scan_step in scan_slices:
for scan_index in range(scan_start, scan_stop, scan_step):
push_index = frame_index * scan_max_index + scan_index
if zeroth_frame:
cyclic_push_index = push_index - scan_max_index
else:
cyclic_push_index = push_index
if quad_mask[cyclic_push_index % len(dia_mz_cycle)]:
result.append(push_index)
return np.array(result)
[docs]@alphatims.utils.njit(nogil=True)
def filter_tof_to_csr(
tof_slices: np.ndarray,
push_indices: np.ndarray,
tof_indices: np.ndarray,
push_indptr: np.ndarray,
) -> tuple:
"""Get a CSR-matrix with raw indices satisfying push indices and tof slices.
Parameters
----------
tof_slices : np.int64[:, 3]
Each row of the array is assumed to be a (start, stop, step) tuple.
This array is assumed to be sorted, disjunct and strictly increasing
(i.e. np.all(np.diff(tof_slices[:, :2].ravel()) >= 0) = True).
push_indices : np.int64[:]
The push indices from where to retrieve the TOF slices.
tof_indices : np.uint32[:]
The self.tof_indices array of a TimsTOF object.
push_indptr : np.int64[:]
The self.push_indptr array of a TimsTOF object.
Returns
-------
(np.int64[:], np.int64[:], np.int64[:],)
An (indptr, values, columns) tuple, where indptr are push indices,
values raw indices, and columns the tof_slices.
"""
indptr = [0]
values = []
columns = []
for push_index in push_indices:
start = push_indptr[push_index]
end = push_indptr[push_index + 1]
idx = start
for i, (tof_start, tof_stop, tof_step) in enumerate(tof_slices):
idx += np.searchsorted(tof_indices[idx: end], tof_start)
tof_value = tof_indices[idx]
while (tof_value < tof_stop) and (idx < end):
if tof_value in range(tof_start, tof_stop, tof_step):
values.append(idx)
columns.append(i)
break # TODO what if multiple hits?
idx += 1
tof_value = tof_indices[idx]
indptr.append(len(values))
return np.array(indptr), np.array(values), np.array(columns)