import collections
import weakref
import glob
import os
import numpy as np
import rtree.index
from buzzard._actors.message import Msg
from buzzard._a_raster_recipe import ARasterRecipe, ABackRasterRecipe
from buzzard._actors.cached.cache_extractor import ActorCacheExtractor
from buzzard._actors.cached.cache_supervisor import ActorCacheSupervisor
from buzzard._actors.cached.file_checker import ActorFileChecker
from buzzard._actors.cached.merger import ActorMerger
from buzzard._actors.cached.producer import ActorProducer
from buzzard._actors.cached.queries_handler import ActorQueriesHandler
from buzzard._actors.cached.reader import ActorReader
from buzzard._actors.cached.writer import ActorWriter
from buzzard._actors.computation_accumulator import ActorComputationAccumulator
from buzzard._actors.computation_gate1 import ActorComputationGate1
from buzzard._actors.computation_gate2 import ActorComputationGate2
from buzzard._actors.computer import ActorComputer
from buzzard._actors.production_gate import ActorProductionGate
from buzzard._actors.resampler import ActorResampler
class CachedRasterRecipe(ARasterRecipe):
    """Concrete class defining the behavior of a raster that is computed on the fly and that
    fills a cache to avoid subsequent computations.

    See `help(Dataset.create_cached_raster_recipe)` for the meaning of the construction
    parameters.
    """

    def __init__(
            self, ds,
            fp, dtype, channel_count, channels_schema, sr,
            compute_array, merge_arrays,
            cache_dir, overwrite,
            primitives_back, primitives_kwargs, convert_footprint_per_primitive,
            computation_pool, merge_pool, io_pool, resample_pool,
            cache_tiles, computation_tiles,
            max_resampling_size,
            debug_observers,
    ):
        # The back object receives a weak proxy of this facade, so holding it does not keep
        # the facade alive.
        back = BackCachedRasterRecipe(
            ds._back,
            weakref.proxy(self),
            fp, dtype, channel_count, channels_schema, sr,
            compute_array, merge_arrays,
            cache_dir, overwrite,
            primitives_back, primitives_kwargs, convert_footprint_per_primitive,
            computation_pool, merge_pool, io_pool, resample_pool,
            cache_tiles, computation_tiles,
            max_resampling_size,
            debug_observers,
        )
        super().__init__(ds=ds, back=back)

    @property
    def cache_tiles(self):
        """Cache tiles provided or created at construction (a copy, safe to modify)"""
        return self._back.cache_fps.copy()

    @property
    def cache_dir(self):
        """Cache directory path provided at construction"""
        return self._back.cache_dir
class BackCachedRasterRecipe(ABackRasterRecipe):
    """Implementation of CachedRasterRecipe's specifications"""

    def __init__(
            self, back_ds, facade_proxy,
            fp, dtype, channel_count, channels_schema, sr,
            compute_array, merge_arrays,
            cache_dir, overwrite,
            primitives_back, primitives_kwargs, convert_footprint_per_primitive,
            computation_pool, merge_pool, io_pool, resample_pool,
            cache_tiles, computation_tiles,
            max_resampling_size,
            debug_observers,
    ):
        super().__init__(
            # Source
            back_ds=back_ds,
            wkt_stored=sr,
            # RasterSource
            channels_schema=channels_schema,
            dtype=dtype,
            fp_stored=fp,
            channel_count=channel_count,
            # Recipe
            facade_proxy=facade_proxy,
            computation_pool=computation_pool,
            merge_pool=merge_pool,
            compute_array=compute_array,
            merge_arrays=merge_arrays,
            primitives_back=primitives_back,
            primitives_kwargs=primitives_kwargs,
            convert_footprint_per_primitive=convert_footprint_per_primitive,
            # Async
            resample_pool=resample_pool,
            max_resampling_size=max_resampling_size,
            debug_observers=debug_observers,
        )

        self.io_pool = io_pool
        # Array of cache tile footprints (iterated with `.flat` and `np.ndenumerate` below)
        self.cache_fps = cache_tiles
        self.cache_dir = cache_dir
        self.overwrite = overwrite

        # Tilings shortcuts ****************************************************
        # R-tree over the cache tiles; each entry's id is the tile's flat index in `cache_tiles`
        self._cache_footprint_index = self._build_cache_fps_index(
            cache_tiles,
        )

        # computation tile -> list of the cache tiles it overlaps
        self.cache_fps_of_compute_fp = {
            compute_fp: self.cache_fps_of_fp(compute_fp)
            for compute_fp in computation_tiles.flat
        }

        # cache tile -> list of the computation tiles overlapping it (inverse of the map above)
        self.compute_fps_of_cache_fp = collections.defaultdict(list)
        for compute_fp, cache_fps in self.cache_fps_of_compute_fp.items():
            for cache_fp in cache_fps:
                self.compute_fps_of_cache_fp[cache_fp].append(compute_fp)

        # cache tile -> its (y, x) index in the `cache_tiles` array
        self.indices_of_cache_fp = {
            cache_fp: indices
            for indices, cache_fp in np.ndenumerate(cache_tiles)
        }

        # Scheduler notification ***********************************************
        self.back_ds.put_message(Msg(
            '/Global/TopLevel', 'new_raster', self,
        ))

    # ******************************************************************************************* **
    def cache_fps_of_fp(self, fp):
        """Return the list of cache tiles that overlap `fp`.

        `fp` must lie on the same grid as `self.fp`. The query runs in the raster (pixel)
        coordinates of `self.fp` against the R-tree built by `_build_cache_fps_index`. Because
        that index is inset by a quarter pixel, tiles that only share an edge with `fp` are not
        returned. The order of the result follows the R-tree query and is not guaranteed.
        """
        assert fp.same_grid(self.fp)
        rtl = self.fp.spatial_to_raster(fp.tl, dtype=float)
        # (x_min, y_min, x_max, y_max) in pixels of `self.fp`
        bounds = np.r_[rtl, rtl + fp.rsize]
        return [
            self.cache_fps.flat[i]
            for i in self._cache_footprint_index.intersection(bounds)
        ]

    def fname_prefix_of_cache_fp(self, cache_fp):
        """Return the file name prefix of a cache tile.

        The format is `buzz_x{col:03d}-y{row:03d}_x{px:05d}-y{py:05d}`.
        - `col`/`row` are the tile's indices in the cache tiling.
        - `px`/`py` are the raster coordinates of the tile's top-left corner in `self.fp`.
        """
        y, x = self.indices_of_cache_fp[cache_fp]
        params = np.r_[
            x,
            y,
            self.fp.spatial_to_raster(cache_fp.tl),
        ]
        return "buzz_x{:03d}-y{:03d}_x{:05d}-y{:05d}".format(*params)

    def list_cache_path_candidates(self, cache_fp=None):
        """List the files of `cache_dir` whose names look like cache tile files.

        Parameters
        ----------
        cache_fp: Footprint or None
            If provided, only list the files of that cache tile, matched on the prefix built by
            `fname_prefix_of_cache_fp`. Otherwise list the files of all cache tiles.

        Returns
        -------
        list of str
            Paths found by `glob.glob`. The glob patterns are looser than the real naming scheme
            (e.g. `[0-9]*` matches one digit followed by anything), hence "candidates".
        """
        # TODO: Use regex
        if cache_fp is None:
            fname_pattern = 'buzz_x[0-9]*-y[0-9]*_x[0-9]*-y[0-9]*_[0123456789abcdef]*.tif'
        else:
            prefix = self.fname_prefix_of_cache_fp(cache_fp)
            fname_pattern = prefix + '_[0123456789abcdef]*.tif'
        # Escape the directory so that glob metacharacters it may contain (`[`, `*`, `?`) are
        # matched literally instead of being interpreted as part of the pattern.
        pattern = os.path.join(glob.escape(str(self.cache_dir)), fname_pattern)
        return glob.glob(pattern)

    def create_actors(self):
        """Instantiate the actors that implement this raster and report each of them to the
        debug manager as an `object_allocated` event.
        """
        actors = [
            ActorCacheExtractor(self),
            ActorCacheSupervisor(self),
            ActorFileChecker(self),
            ActorMerger(self),
            ActorProducer(self),
            ActorQueriesHandler(self),
            ActorReader(self),
            ActorWriter(self),
            ActorComputationAccumulator(self),
            ActorComputationGate1(self),
            ActorComputationGate2(self),
            ActorComputer(self),
            ActorProductionGate(self),
            ActorResampler(self),
        ]
        for a in actors:
            self.debug_mngr.event('object_allocated', a)
        return actors

    # ******************************************************************************************* **
    def _build_cache_fps_index(self, cache_fps):
        """Build an R-tree of the cache tiles, in raster coordinates of `self.fp`.

        Each tile is inserted with its flat index in `cache_fps` as id. Its bounds are
        (x_min, y_min, x_max, y_max) in pixels, shrunk by a quarter pixel on every side. The
        inset makes tiles that merely touch a queried footprint's edge not count as
        intersecting it.
        """
        idx = rtree.index.Index()
        bounds_inset = np.asarray([
            + 1 / 4,
            + 1 / 4,
            - 1 / 4,
            - 1 / 4,
        ])
        for i, fp in enumerate(cache_fps.flat):
            rtl = self.fp.spatial_to_raster(fp.tl, dtype=float)
            bounds = np.r_[rtl, rtl + fp.rsize] + bounds_inset
            idx.insert(i, bounds)
        return idx