Source code for buzzard._dataset_pools_container

import threading
import collections
import multiprocessing as mp
import multiprocessing.pool

[docs]class PoolsContainer(object): """Manages thread/process pools and aliases for a Dataset""" def __init__(self): self._aliases_per_pool = collections.defaultdict(set) self._aliases = {} self._managed_pools = set() self._lock = threading.Lock()
[docs] def alias(self, key, pool_or_none): """Register the given pool under the given key in this Dataset. The key can then be used to refer to that pool from within the async raster constructors. Parameters ---------- key: hashable (like a string) .. pool_or_none: multiprocessing.pool.Pool or multiprocessing.pool.ThreadPool or None .. """ with self._lock: if key in self._aliases: # pragma: no cover raise ValueError('Key `{}` is already bound to `{}`'.format( key, self._aliases[key] )) self._aliases_per_pool[pool_or_none].add(key) self._aliases[key] = pool_or_none
[docs] def manage(self, pool): """Add the given pool to the list of pools that must be terminated upon Dataset closing. Parameters ---------- pool: multiprocessing.pool.Pool or multiprocessing.pool.ThreadPool .. """ if not isinstance(pool, (mp.pool.Pool, mp.pool.ThreadPool)): # pragma: no cover raise TypeError('Can only manage pools') with self._lock: self._managed_pools.add(pool)
[docs] def __len__(self): """Number of pools registered in this Dataset""" with self._lock: return len( p for p in self._aliases_per_pool.keys() if p is not None )
[docs] def __iter__(self): """Generator of pools registered in this Dataset""" for p in self._aliases_per_pool.keys(): if p is not None: yield p
[docs] def __getitem__(self, key): """Pool or none getter from alias""" return self._aliases[key]
[docs] def __contains__(self, obj): """Is pool or alias registered in this Dataset""" with self._lock: return obj in self._aliases or obj in self._aliases_per_pool
# Private interface with Dataset ********************************************************* ** def _close(self): things_to_join = [] for pool in self._managed_pools: if isinstance(pool, mp.pool.ThreadPool): pool.terminate() things_to_join.append(pool) else: things_to_join.append(_create_process_pool_killer(pool)) for joinable in things_to_join: joinable.join() self._aliases.clear() self._aliases_per_pool.clear() self._managed_pools.clear() def _normalize_pool_parameter(self, pool_param, param_name): if isinstance(pool_param, (mp.pool.Pool, mp.pool.ThreadPool)): return pool_param if pool_param is None: return None if not (hasattr(pool_param, '__hash__') and hasattr(pool_param, '__eq__')): # pragma: no cover types = [ 'multiprocessing.pool.Pool', 'multiprocessing.pool.ThreadPool', 'None', 'hashable', ] raise TypeError('`{}` parameter should be one of {}'.format( param_name, ', '.join(types) )) with self._lock: if pool_param not in self._aliases: p = mp.pool.ThreadPool(mp.cpu_count()) self._aliases[pool_param] = p self._aliases_per_pool[p] = [pool_param] self._managed_pools.add(p) return self._aliases[pool_param]
def _create_process_pool_killer(pp): """ https://stackoverflow.com/questions/42782953/python-concurrent-futures-how-to-make-it-cancelable/45515052#45515052 """ def kill_process_pool_from_thread(): pp.close() pp.terminate() pp.join() t = threading.Thread(target=kill_process_pool_from_thread) t.start() return t