# Source code for alphatims.utils

#!python
"""This module provides generic utilities.
These utilities primarily focus on:

    - logging
    - compilation
    - parallelization
    - generic io
"""

# builtin
import logging
import os
import sys
import json
import contextlib
import multiprocessing
# local
import alphatims
# external
import numpy as np


# Resource directories bundled with the alphatims package, all relative to
# the location of this module.
BASE_PATH = os.path.dirname(__file__)
EXT_PATH = os.path.join(BASE_PATH, "ext")
IMG_PATH = os.path.join(BASE_PATH, "img")
LIB_PATH = os.path.join(BASE_PATH, "lib")
LOG_PATH = os.path.join(BASE_PATH, "logs")
DOC_PATH = os.path.join(BASE_PATH, "docs")
# Default (maximum) thread count; can be overridden with set_threads.
MAX_THREADS = multiprocessing.cpu_count()
# Used by check_github_version to read the latest released __version__.
LATEST_GITHUB_INIT_FILE = (
    "https://raw.githubusercontent.com/MannLabs/alphatims/"
    "master/alphatims/__init__.py"
)
# Global progress-callback style; see set_progress_callback for options.
PROGRESS_CALLBACK = True
DEMO_SAMPLE = (
    "20201207_tims03_Evo03_PS_SA_HeLa_200ng_EvoSep_prot_DDA_"
    "21min_8cm_S1-C10_1_22476.d"
)
# Local location of the demo sample (redundant f-string removed).
DEMO_FILE_NAME = os.path.join(
    BASE_PATH,
    "sandbox_data",
    DEMO_SAMPLE
)
# Remote location of the zipped demo sample.
DEMO_FILE_NAME_GITHUB = (
    "https://github.com/MannLabs/alphatims/releases/"
    f"download/0.1.210317/{DEMO_SAMPLE}.zip"
)

def set_logger(
    *,
    log_file_name="",
    stream: bool = True,
    log_level: int = logging.INFO,
    overwrite: bool = False,
) -> str:
    """Set the log stream and file.

    All previously set handlers will be disabled with this command.

    Parameters
    ----------
    log_file_name : str, None
        The file name to where the log is written.
        Folders are automatically created if needed.
        This is relative to the current path. When an empty string is
        provided, a log is written to the AlphaTims "logs" folder with
        the name "log_yymmddhhmmss" (reversed timestamp year to seconds).
        If None, no log file is saved.
        Default is "".
    stream : bool
        If False, no log data is sent to stream.
        If True, all logging can be tracked with stdout stream.
        Default is True.
    log_level : int
        The logging level. Usable values are defined in Python's "logging"
        module. Default is logging.INFO.
    overwrite : bool
        If True, overwrite the log_file if one exists.
        If False, append to this log file.
        Default is False.

    Returns
    -------
    : str
        The file name to where the log is written.
    """
    import time
    root = logging.getLogger()
    formatter = logging.Formatter(
        '%(asctime)s> %(message)s', "%Y-%m-%d %H:%M:%S"
    )
    root.setLevel(log_level)
    # Detach AND close all existing handlers: the previous implementation
    # only removed them, leaking open file descriptors of old FileHandlers.
    while root.hasHandlers():
        handler = root.handlers[0]
        root.removeHandler(handler)
        handler.close()
    if stream:
        stream_handler = logging.StreamHandler(sys.stdout)
        stream_handler.setLevel(log_level)
        stream_handler.setFormatter(formatter)
        root.addHandler(stream_handler)
    if log_file_name is not None:
        if log_file_name == "":
            if not os.path.exists(LOG_PATH):
                os.makedirs(LOG_PATH)
            log_file_name = LOG_PATH
        log_file_name = os.path.abspath(log_file_name)
        if os.path.isdir(log_file_name):
            # Reversed timestamp (year down to second) keeps log file names
            # lexicographically sorted by creation time.
            current_time = time.strftime("%Y%m%d%H%M%S")
            log_file_name = os.path.join(
                log_file_name,
                f"log_{current_time}.txt"
            )
        directory = os.path.dirname(log_file_name)
        if not os.path.exists(directory):
            os.makedirs(directory)
        file_handler = logging.FileHandler(
            log_file_name,
            mode="w" if overwrite else "a"
        )
        file_handler.setLevel(log_level)
        file_handler.setFormatter(formatter)
        root.addHandler(file_handler)
    return log_file_name
def show_platform_info() -> None:
    """Log all platform information.

    This is done in the following format:

        - [timestamp]> Platform information:
        - [timestamp]> system     - [...]
        - [timestamp]> release    - [...]
        - [timestamp]> version    - [...]
        - [timestamp]> machine    - [...]
        - [timestamp]> processor  - [...]
        - [timestamp]> cpu count  - [...]
        - [timestamp]> cpu frequency - [...]
        - [timestamp]> ram        - [...]/[...] Gb (available/total)
    """
    import platform
    import psutil
    system = platform.system()
    # macOS reports a more useful version through mac_ver.
    if system == "Darwin":
        version = platform.mac_ver()[0]
    else:
        version = platform.version()
    memory = psutil.virtual_memory()
    details = [
        ("system", system),
        ("release", platform.release()),
        ("version", version),
        ("machine", platform.machine()),
        ("processor", platform.processor()),
        ("cpu count", psutil.cpu_count()),
        ("cpu frequency", f"{psutil.cpu_freq().current:.2f} Mhz"),
        (
            "ram",
            f"{memory.available/1024**3:.1f}/"
            f"{memory.total/1024**3:.1f} Gb "
            f"(available/total)"
        ),
    ]
    logging.info("Platform information:")
    for label, value in details:
        logging.info(f"{label} - {value}")
    logging.info("")
def show_python_info() -> None:
    """Log all Python information.

    This is done in the following format:

        - [timestamp]> Python information:
        - [timestamp]> alphatims          - [current_version]
        - [timestamp]> [required package] - [current_version]
        - ...
        - [timestamp]> [required package] - [current_version]
    """
    import importlib.metadata
    import platform
    module_versions = {
        "python": platform.python_version(),
        "alphatims": alphatims.__version__
    }
    requirements = importlib.metadata.requires("alphatims")
    for requirement in requirements:
        parts = requirement.split(";")
        if len(parts) > 1:
            # Skip extras that are not part of a default installation.
            if "development" in parts[1]:
                continue
            if "win32" in parts[1]:
                continue
        # Strip version specifiers, e.g. "numpy>=1.17" -> "numpy".
        module_name = parts[0].split("=")[0].split()[0]
        try:
            module_version = importlib.metadata.version(module_name)
        except importlib.metadata.PackageNotFoundError:
            module_version = ""
        module_versions[module_name] = module_version
    max_len = max(len(key) for key in module_versions)
    logging.info("Python information:")
    for key, value in sorted(module_versions.items()):
        # BUGFIX: the format spec was garbled ("{key:<56,022}"), which is
        # invalid for str values and raised ValueError; pad to the longest
        # module name instead (this also makes max_len actually used).
        logging.info(f"{key:<{max_len}} - {value}")
    logging.info("")
def check_github_version(silent=False) -> str:
    """Checks and logs the current version of AlphaTims.

    Check if the local version equals the AlphaTims GitHub master branch.
    This is only possible with an active internet connection and
    if no credentials are required for GitHub.

    Parameters
    ----------
    silent : str
        Use the logger to display the obtained conclusion.
        Default is False.

    Returns
    -------
    : str
        The version on the AlphaTims GitHub master branch.
        "" if no version can be found on GitHub
    """
    import urllib.request
    import urllib.error
    try:
        with urllib.request.urlopen(LATEST_GITHUB_INIT_FILE) as version_file:
            for line in version_file.read().decode('utf-8').split("\n"):
                if not line.startswith("__version__"):
                    continue
                # Line looks like: __version__ = "x.y.z" -> strip quotes.
                github_version = line.split()[2][1:-1]
                if not silent:
                    if github_version != alphatims.__version__:
                        logging.info(
                            f"You are currently using AlphaTims version "
                            f"{alphatims.__version__}. "
                            f"However, the latest version of AlphaTims on "
                            f"GitHub is {github_version}. Checkout "
                            "https://github.com/MannLabs/alphatims.git "
                            "for instructions on how to update AlphaTims"
                            "..."
                        )
                        logging.info("")
                    else:
                        logging.info(
                            "Current AlphaTims version is up-to-date "
                            "with GitHub."
                        )
                        logging.info("")
                return github_version
    except (IndexError, urllib.error.URLError):
        # Best effort only: no connection, or an unexpected file layout.
        logging.info(
            "Could not check GitHub for the latest AlphaTims release."
        )
        logging.info("")
        return ""
def save_parameters(parameter_file_name: str, paramaters: dict) -> None:
    """Save parameters to a parameter file.

    IMPORTANT NOTE: This overwrites any existing file.

    Parameters
    ----------
    parameter_file_name : str
        The file name to where the parameters are written.
    paramaters : dict
        A dictionary with parameters.
    """
    logging.info(f"Saving parameters to {parameter_file_name}")
    serialized = json.dumps(paramaters, indent=4, sort_keys=True)
    with open(parameter_file_name, "w") as outfile:
        outfile.write(serialized)
def load_parameters(parameter_file_name: str) -> dict:
    """Load a parameter dict from a file.

    Parameters
    ----------
    parameter_file_name : str
        A file name that contains parameters in .json format.

    Returns
    -------
    : dict
        A dict with parameters.
    """
    with open(parameter_file_name, "r") as infile:
        contents = infile.read()
    return json.loads(contents)
def set_threads(threads: int, set_global: bool = True) -> int:
    """Parse and set the (global) number of threads.

    Parameters
    ----------
    threads : int
        The number of threads.
        If larger than available cores, it is trimmed to the available
        maximum. If 0, it is set to the maximum cores available.
        If negative, it indicates how many cores NOT to use.
    set_global : bool
        If False, the number of threads is only parsed to a valid value.
        If True, the number of threads is saved as a global variable.
        Default is True.

    Returns
    -------
    : int
        The number of threads.
    """
    available_cores = multiprocessing.cpu_count()
    if threads > available_cores:
        parsed_threads = available_cores
    else:
        parsed_threads = threads
        # Non-positive counts wrap around: 0 -> all cores, -k -> all but k.
        while parsed_threads <= 0:
            parsed_threads += available_cores
    if set_global:
        global MAX_THREADS
        MAX_THREADS = parsed_threads
    return parsed_threads
def threadpool(
    _func=None,
    *,
    thread_count=None,
    include_progress_callback: bool = True,
    return_results: bool = False,
) -> None:
    """A decorator that parallelizes a function with threads and callback.

    The original function should accept a single element as its first
    argument. If the caller function provides an iterable as first argument,
    the function is applied to each element of this iterable in parallel.

    Parameters
    ----------
    _func : callable, None
        The function to decorate.
    thread_count : int, None
        The number of threads to use. This is always parsed with
        alphatims.utils.set_threads. Not possible as positional arguments,
        it always needs to be an explicit keyword argument.
        Default is None.
    include_progress_callback : bool
        If True, the default progress callback will be used as callback.
        (See "progress_callback" function.) If False, no callback is added.
        See `set_progress_callback` for callback styles.
        Default is True.
    return_results : bool
        If True, it returns the results in the same order as the iterable.
        This can be much slower than not returning results. It is better
        to store them in a buffer results array instead (be careful to
        avoid race conditions). If the iterable is not an iterable but a
        single index, a result is always returned.
        Default is False.

    Returns
    -------
    : function
        A parallelized decorated function.
    """
    import multiprocessing.pool
    import functools

    def parallel_func_inner(func):
        def wrapper(iterable, *args, **kwargs):
            def apply_func(element):
                return func(element, *args, **kwargs)
            try:
                iter(iterable)
            except TypeError:
                # Single index instead of an iterable: apply directly.
                return func(iterable, *args, **kwargs)
            if thread_count is None:
                current_thread_count = MAX_THREADS
            else:
                current_thread_count = set_threads(
                    thread_count,
                    set_global=False
                )
            with multiprocessing.pool.ThreadPool(
                current_thread_count
            ) as pool:
                if return_results:
                    # imap preserves the order of the input iterable.
                    return [
                        result for result in progress_callback(
                            pool.imap(apply_func, iterable),
                            total=len(iterable),
                            include_progress_callback=include_progress_callback
                        )
                    ]
                # Unordered is faster when results are discarded; the loop
                # only drives the pool and the progress callback.
                for _ in progress_callback(
                    pool.imap_unordered(apply_func, iterable),
                    total=len(iterable),
                    include_progress_callback=include_progress_callback
                ):
                    pass
        return functools.wraps(func)(wrapper)

    if _func is None:
        return parallel_func_inner
    return parallel_func_inner(_func)
def njit(_func=None, *args, **kwargs):
    """A wrapper for the numba.njit decorator.

    The "cache" option is set to True by default.
    This can be overriden with kwargs.

    Parameters
    ----------
    _func : callable, None
        The function to decorate.
    *args
        See numba.njit decorator.
    **kwargs
        See numba.njit decorator.

    Returns
    -------
    : function
        A numba.njit decorated function.
    """
    import numba
    # Default to caching compiled machine code unless explicitly disabled.
    cache = kwargs.pop("cache", True)
    return numba.njit(_func, *args, cache=cache, **kwargs)
def pjit(
    _func=None,
    *,
    thread_count=None,
    include_progress_callback: bool = True,
    cache: bool = True,
    **kwargs
):
    """A decorator that parallelizes the numba.njit decorator with threads.

    The first argument of the decorated function need to be an iterable.
    A range-object will be most performant as iterable.
    The original function should accept a single element of this iterable
    as its first argument.
    The original function cannot return values, instead it should store
    results in e.g. one if its input arrays that acts as a buffer array.
    The original function needs to be numba.njit compatible.
    Numba argument "nogil" is always set to True.

    Parameters
    ----------
    _func : callable, None
        The function to decorate.
    thread_count : int, None
        The number of threads to use. This is always parsed with
        alphatims.utils.set_threads. Not possible as positional arguments,
        it always needs to be an explicit keyword argument.
        Default is None.
    include_progress_callback : bool
        If True, the default progress callback will be used as callback.
        (See "progress_callback" function.) If False, no callback is added.
        See `set_progress_callback` for callback styles.
        Default is True.
    cache : bool
        See numba.njit decorator. Default is True (in contrast to numba).

    Returns
    -------
    : function
        A parallelized numba.njit decorated function.
    """
    import functools
    import threading
    import numba
    import numpy as np

    def parallel_compiled_func_inner(func):
        # BUGFIX: "cache" was previously re-popped from **kwargs, where it
        # can never appear (it is bound to the explicit keyword-only
        # parameter), so pjit(cache=False) was silently ignored. Use the
        # parameter directly.
        numba_func = numba.njit(nogil=True, cache=cache, **kwargs)(func)

        @numba.njit(nogil=True, cache=cache)
        def numba_func_parallel(
            iterable,
            thread_id,
            progress_counter,
            start,
            stop,
            step,
            *args,
        ):
            # An empty iterable signals that (start, stop, step) describe
            # a range to iterate instead (ranges cannot cross the numba
            # thread boundary directly).
            if len(iterable) == 0:
                for i in range(start, stop, step):
                    numba_func(i, *args)
                    progress_counter[thread_id] += 1
            else:
                for i in iterable:
                    numba_func(i, *args)
                    progress_counter[thread_id] += 1

        def wrapper(iterable, *args):
            if thread_count is None:
                current_thread_count = MAX_THREADS
            else:
                current_thread_count = set_threads(
                    thread_count,
                    set_global=False
                )
            threads = []
            # One slot per thread avoids write contention on the counter.
            progress_counter = np.zeros(current_thread_count, dtype=np.int64)
            for thread_id in range(current_thread_count):
                # Round-robin partitioning of the work.
                local_iterable = iterable[thread_id::current_thread_count]
                if isinstance(local_iterable, range):
                    start = local_iterable.start
                    stop = local_iterable.stop
                    step = local_iterable.step
                    local_iterable = np.array([], dtype=np.int64)
                else:
                    start = -1
                    stop = -1
                    step = -1
                thread = threading.Thread(
                    target=numba_func_parallel,
                    args=(
                        local_iterable,
                        thread_id,
                        progress_counter,
                        start,
                        stop,
                        step,
                        *args
                    ),
                    daemon=True
                )
                thread.start()
                threads.append(thread)
            if include_progress_callback:
                import time
                # Poll the shared counters and advance the callback until
                # all elements have been processed.
                progress_bar = 0
                progress_count = np.sum(progress_counter)
                for result in progress_callback(
                    iterable,
                    include_progress_callback=include_progress_callback
                ):
                    while progress_bar >= progress_count:
                        time.sleep(0.01)
                        progress_count = np.sum(progress_counter)
                    progress_bar += 1
            for thread in threads:
                thread.join()
                del thread
        return functools.wraps(func)(wrapper)

    if _func is None:
        return parallel_compiled_func_inner
    else:
        return parallel_compiled_func_inner(_func)
def progress_callback(
    iterable,
    include_progress_callback: bool = True,
    total: int = -1
):
    """A generator that adds progress callback to an iterable.

    Parameters
    ----------
    iterable
        An iterable.
    include_progress_callback : bool
        If True, the default progress callback will be used as callback.
        If False, no callback is added.
        See `set_progress_callback` for callback styles.
        Default is True.
    total : int
        The length of the iterable.
        If -1, this will be read as len(iterable), if __len__ is
        implemented.
        Default is -1.

    Returns
    -------
    : iterable
        A generator over the iterable with added callback.
    """
    global PROGRESS_CALLBACK
    # Only read the global when a callback is requested.
    if include_progress_callback:
        current_progress_callback = PROGRESS_CALLBACK
    else:
        current_progress_callback = None
    if total == -1:
        total = len(iterable)
    if current_progress_callback is None:
        # No callback: pass elements straight through.
        yield from iterable
    elif isinstance(
        current_progress_callback,
        bool
    ) and current_progress_callback:
        # Textual callback via tqdm.
        import tqdm
        with tqdm.tqdm(total=total) as progress_bar:
            for element in iterable:
                yield element
                progress_bar.update()
    else:
        # Widget-style callback: any object with "max" and "value".
        try:
            current_progress_callback.max = total
            current_progress_callback.value = 0
        except AttributeError:
            raise ValueError("Not a valid progress callback")
        # Update the widget at most ~1000 times to avoid flooding it.
        steps = current_progress_callback.max / 1000
        progress = 0
        for element in iterable:
            progress += 1
            if progress % steps < 1:
                current_progress_callback.value = progress
            yield element
        current_progress_callback.value = total
def set_progress_callback(progress_callback):
    """Set the global progress callback.

    NOTE(review): the parameter name shadows the module-level
    ``progress_callback`` generator; renaming it would break callers that
    pass it as a keyword argument, so it is kept as-is.

    Parameters
    ----------
    progress_callback :
        The new global progress callback. Options are:

            - None, no progress callback will be used
            - True, a textual progress callback (tqdm) will be enabled
            - Any object that supports a `max` and `value` variable.
    """
    global PROGRESS_CALLBACK
    PROGRESS_CALLBACK = progress_callback
def create_hdf_group_from_dict(
    hdf_group,
    data_dict: dict,
    *,
    overwrite: bool = False,
    compress: bool = False,
    recursed: bool = False,
    chunked: bool = False
) -> None:
    """Save a dict to an open hdf group.

    Parameters
    ----------
    hdf_group : h5py.File.group
        An open and writable HDF group.
    data_dict : dict
        A dict that needs to be written to HDF.
        Keys always need to be strings. Values are stored as follows:

            - subdicts -> subgroups.
            - np.array -> array
            - pd.dataframes -> subdicts with "is_pd_dataframe: True"
              attribute.
            - bool, int, float and str -> attrs.
            - None values are skipped and not stored explicitly.
    overwrite : bool
        If True, existing subgroups, arrays and attrs are fully
        truncated/overwritten. If False, the existing value in HDF remains
        unchanged. Default is False.
    compress : bool
        If True, all arrays are compressed with binary shuffle and "lzf"
        compression. If False, arrays are saved as provided. On average,
        compression halves file sizes, at the cost of 2-10 time longer
        accession times. Default is False.
    recursed : bool
        If False, the default progress callback is added while iterating
        over the keys of the data_dict. If True, no callback is added,
        allowing subdicts to not trigger callback. Default is False.
    chunked : bool
        If True, all arrays are chunked. If False, arrays are saved as
        provided. Default is False.

    Raises
    ------
    ValueError
        When a value of data_dict cannot be converted to an HDF value
        (see data_dict).
    KeyError
        When a key of data_dict is not a string.
    """
    import pandas as pd
    import h5py
    if recursed:
        iterable_dict = data_dict.items()
    else:
        iterable_dict = progress_callback(data_dict.items())
    for key, value in iterable_dict:
        if not isinstance(key, str):
            raise KeyError(f"Key {key} is not a string.")
        if isinstance(value, pd.core.frame.DataFrame):
            # DataFrames become subgroups flagged with "is_pd_dataframe".
            new_dict = {key: dict(value)}
            new_dict[key]["is_pd_dataframe"] = True
            create_hdf_group_from_dict(
                hdf_group,
                new_dict,
                overwrite=overwrite,
                recursed=True,
                compress=compress,
                chunked=chunked,
            )
        elif isinstance(value, (np.ndarray, pd.core.series.Series)):
            if isinstance(value, (pd.core.series.Series)):
                value = value.values
            if overwrite and (key in hdf_group):
                del hdf_group[key]
            if key not in hdf_group:
                if value.dtype.type == np.str_:
                    value = value.astype(np.dtype('O'))
                if value.dtype == np.dtype('O'):
                    # Object/string arrays need an explicit HDF string
                    # dtype and cannot be compressed with lzf+shuffle here.
                    hdf_group.create_dataset(
                        key,
                        data=value,
                        dtype=h5py.string_dtype()
                    )
                else:
                    hdf_group.create_dataset(
                        key,
                        data=value,
                        compression="lzf" if compress else None,
                        # compression="gzip" if compress else None,
                        # TODO slower to make, faster to load?
                        shuffle=compress,
                        chunks=True if chunked else None,
                    )
        elif isinstance(value, (bool, int, float, str, np.bool_)):
            if overwrite or (key not in hdf_group.attrs):
                hdf_group.attrs[key] = value
        elif isinstance(value, dict):
            if key not in hdf_group:
                hdf_group.create_group(key)
            create_hdf_group_from_dict(
                hdf_group[key],
                value,
                overwrite=overwrite,
                recursed=True,
                compress=compress,
                # BUGFIX: "chunked" was not propagated into nested dicts,
                # so chunking silently only applied to top-level arrays.
                chunked=chunked,
            )
        elif value is None:
            continue
        else:
            raise ValueError(
                f"The type of {key} is {type(value)}, which "
                "cannot be converted to an HDF value."
            )
def create_dict_from_hdf_group(
    hdf_group,
    mmap_arrays=None,
    parent_file_name: str = None,
) -> dict:
    """Convert the contents of an HDF group and return as normal Python dict.

    Parameters
    ----------
    hdf_group : h5py.File.group
        An open and readable HDF group.
    mmap_arrays : iterable
        These array will be mmapped instead of pre-loaded.
        Default is None
    parent_file_name : str
        The parent_file_name. This is required when mmap_arrays is not
        None. Default is None.

    Returns
    -------
    : dict
        A Python dict.
        Keys of the dict are names of arrays, attrs and subgroups.
        Values are corresponding arrays and attrs.
        Subgroups are converted to subdicts.
        If a subgroup has an "is_pd_dataframe=True" attr, it is
        automatically converted to a pd.dataFrame.

    Raises
    ------
    ValueError
        When an attr value in the HDF group is not an int, float, str
        or bool.
    """
    import h5py
    import pandas as pd
    result = {}
    # First convert all scalar attrs, coercing numpy scalars to plain
    # Python types.
    for key in hdf_group.attrs:
        value = hdf_group.attrs[key]
        if isinstance(value, np.integer):
            result[key] = int(value)
        elif isinstance(value, np.float64):
            result[key] = float(value)
        elif isinstance(value, (str, bool, np.bool_)):
            result[key] = value
        else:
            raise ValueError(
                f"The type of {key} is {type(value)}, which "
                "cannot be converted properly."
            )
    # Then convert datasets and subgroups.
    for key in hdf_group:
        subgroup = hdf_group[key]
        if isinstance(subgroup, h5py.Dataset):
            if (mmap_arrays is not None) and (subgroup.name in mmap_arrays):
                # Map the raw bytes of the dataset directly from disk.
                # Only possible for contiguous (uncompressed, unchunked)
                # datasets, where get_offset() returns a file offset.
                offset = subgroup.id.get_offset()
                if offset is not None:
                    shape = subgroup.shape
                    import mmap
                    with open(parent_file_name, "rb") as raw_hdf_file:
                        mmap_obj = mmap.mmap(
                            raw_hdf_file.fileno(),
                            0,
                            access=mmap.ACCESS_READ
                        )
                        result[key] = np.frombuffer(
                            mmap_obj,
                            dtype=subgroup.dtype,
                            count=np.prod(shape),
                            offset=offset
                        ).reshape(shape)
                        # TODO WARNING: mmap is not closed! The returned
                        # array keeps the mapping alive for its lifetime.
                else:
                    raise IOError(
                        f"Array {subgroup.name} cannot be mmapped. "
                        "Perhaps it is compressed or chunked?"
                    )
            else:
                # Eagerly load the full dataset into memory.
                result[key] = subgroup[:]
        else:
            if "is_pd_dataframe" in subgroup.attrs:
                # Subgroups flagged by create_hdf_group_from_dict are
                # reassembled into DataFrames, columns sorted by name.
                result[key] = pd.DataFrame(
                    {
                        column: subgroup[column][:] for column in sorted(
                            subgroup
                        )
                    }
                )
            else:
                result[key] = create_dict_from_hdf_group(
                    hdf_group[key],
                    mmap_arrays,
                    parent_file_name,
                )
    return result
class Option_Stack(object):
    """A stack with the option to redo and undo."""

    def __init__(self, option_name: str, option_initial_value):
        """Create an option stack.

        Parameters
        ----------
        option_name : str
            The name of this option.
        option_initial_value : type
            The initial value of this stack.
            Can be any object that supports the "!=" operator.
        """
        self._stack = [option_initial_value]
        self._stack_pointer = 0
        self._option_name = option_name

    @property
    def current_value(self):
        """: type : The current value of this stack."""
        return self._stack[self._stack_pointer]

    @property
    def size(self) -> int:
        """: int : The size of this stack without the initial value."""
        return len(self._stack) - 1

    @property
    def option_name(self) -> str:
        """: str : The name of this stack."""
        return self._option_name

    def update(self, option_value) -> bool:
        """Update this stack with the value.

        Parameters
        ----------
        option_value : type
            A value to add to this stack.
            Can be any object that supports the "!=" operator.

        Returns
        -------
        bool
            True if the stack was updated. False if the provided value
            equaled the current value of this stack.
        """
        if self.current_value == option_value:
            return False
        # Discard any redoable values before pushing the new one.
        self.trim()
        self._stack.append(option_value)
        self._stack_pointer += 1
        return True

    def redo(self):
        """Increase the stack pointer with 1.

        Returns
        -------
        type
            None if the pointer was already at the maximum.
            Otherwise the new value if the pointer was increased.
        """
        if self._stack_pointer >= self.size:
            return None
        self._stack_pointer += 1
        return self.current_value

    def undo(self):
        """Reduce the stack pointer with 1.

        Returns
        -------
        type
            None if the pointer was already at the minimum.
            Otherwise the new value if the pointer was reduced.
        """
        if self._stack_pointer <= 0:
            return None
        self._stack_pointer -= 1
        return self.current_value

    def trim(self) -> bool:
        """Remove all elements above the current stack pointer.

        Returns
        -------
        bool
            True if something was removed, i.e. if the stack pointer was
            not at the top. False if nothing could be deleted, i.e. the
            stack pointer was at the top.
        """
        if self._stack_pointer == self.size:
            return False
        del self._stack[self._stack_pointer + 1:]
        return True

    def __str__(self):
        return f"{self._stack_pointer} {self._option_name} {self._stack}"


class Global_Stack(object):
    """A stack that holds multiple option stacks.

    The current value of each option stack can be retrieved by indexing,
    i.e. option_value = self[option_key].
    """

    def __init__(self, all_available_options: dict):
        """Create a global stack.

        Parameters
        ----------
        all_available_options : dict
            A dictionary whose items are (str, type), which can be used
            to create an Option_Stack.
        """
        self._option_stacks = {}
        for option_key, option_value in all_available_options.items():
            self._option_stacks[option_key] = Option_Stack(
                option_key,
                option_value
            )
        self._number_of_options = len(all_available_options)
        self._stack_pointer = 0
        # The global stack records WHICH option changed at each step.
        self._stack = [None]
        self._is_locked = False
        self._key = -1

    @property
    def is_locked(self):
        """: bool : A flag to check if this stack is modifiable."""
        return self._is_locked

    @contextlib.contextmanager
    def lock(self):
        """A context manager to lock this stack and prevent modification."""
        import random
        key = random.random()
        try:
            # Only the outermost lock acquires the key; nested locks are
            # no-ops and do not unlock on exit.
            if self._key == -1:
                self._key = key
                self._is_locked = True
            yield self
        finally:
            if self._key == key:
                self._is_locked = False
                self._key = -1

    @property
    def current_values(self) -> dict:
        """: dict : A dict with (option_key: option_value) mapping."""
        return {
            key: stack.current_value
            for key, stack in self._option_stacks.items()
        }

    @property
    def size(self):
        """: int : The size of this stack without the initial value."""
        return len(self._stack) - 1

    def __getitem__(self, key: str):
        return self._option_stacks[key].current_value

    def update(self, option_key: str, option_value) -> tuple:
        """Update an option stack with a value.

        Parameters
        ----------
        option_key : str
            The name of the option stack to update.
        option_value : type
            A value to add to this stack.
            Can be any object that supports the "!=" operator.

        Returns
        -------
        tuple
            ("", None) if the pointer was not updated, i.e. the latest
            update was equal to the current update.
            Otherwise (option_name, new_value).
        """
        if self.is_locked:
            return "", None
        if self[option_key] == option_value:
            return "", None
        self._option_stacks[option_key].update(option_value)
        self.trim()
        self._stack_pointer += 1
        self._stack.append(option_key)
        return option_key, option_value

    def redo(self) -> tuple:
        """Increase the stack pointer with 1.

        Returns
        -------
        tuple
            ("", None) if the pointer was already at the maximum.
            Otherwise (option_name, new_value) if the pointer was
            increased.
        """
        if self.is_locked or (self._stack_pointer >= self.size):
            return "", None
        self._stack_pointer += 1
        option_key = self._stack[self._stack_pointer]
        option_value = self._option_stacks[option_key].redo()
        if option_value is None:
            return "", None
        return option_key, option_value

    def undo(self) -> tuple:
        """Reduce the stack pointer with 1.

        Returns
        -------
        tuple
            ("", None) if the pointer was already at the minimum.
            Otherwise (option_name, new_value) if the pointer was reduced.
        """
        if self.is_locked or (self._stack_pointer <= 0):
            return "", None
        option_key = self._stack[self._stack_pointer]
        self._stack_pointer -= 1
        option_value = self._option_stacks[option_key].undo()
        if option_value is None:
            return "", None
        return option_key, option_value

    def trim(self) -> bool:
        """Remove all elements above the current stack pointer.

        Returns
        -------
        bool
            True if something was removed, i.e. if the stack pointer was
            not at the top. False if nothing could be deleted, i.e. the
            stack pointer was at the top.
        """
        if self._stack_pointer == self.size:
            return False
        del self._stack[self._stack_pointer + 1:]
        for stack in self._option_stacks.values():
            stack.trim()
        return True

    def __str__(self):
        stack_reprs = " ".join(
            str(stack) for stack in self._option_stacks.values()
        )
        global_repr = " ".join(
            [str(self._stack_pointer), "global", str(self._stack)]
        )
        return stack_reprs + " " + global_repr