Source code for buzzard._numpy_raster

import numpy as np

from buzzard._a_stored_raster import AStoredRaster, ABackStoredRaster

[docs]class NumpyRaster(AStoredRaster): """Concrete class defining the behavior of a wrapped numpy array >>> help(Dataset.wrap_numpy_raster) Features Defined ---------------- - Has an `array` property that points to the numpy array provided at construction. """ def __init__(self, ds, fp, array, channels_schema, wkt, mode): self._arr_shape = array.shape self._arr_address = array.__array_interface__['data'][0] back = BackNumpyRaster( ds._back, fp, array, channels_schema, wkt, mode ) super().__init__(ds=ds, back=back) @property def array(self): """Returns the Raster's full input data as a Numpy array""" assert ( self._arr_address == self._back._arr.__array_interface__['data'][0] ) return self._back._arr.reshape(*self._arr_shape)
class BackNumpyRaster(ABackStoredRaster): """Implementation of NumpyRaster""" def __init__(self, back_ds, fp, array, channels_schema, wkt, mode): array = np.atleast_3d(array) self._arr = array channel_count = array.shape[-1] if 'nodata' not in channels_schema: channels_schema['nodata'] = [None] * channel_count if 'interpretation' not in channels_schema: channels_schema['interpretation'] = ['undefined'] * channel_count if 'offset' not in channels_schema: channels_schema['offset'] = [0.] * channel_count if 'scale' not in channels_schema: channels_schema['scale'] = [1.] * channel_count if 'mask' not in channels_schema: channels_schema['mask'] = ['all_valid'] super().__init__( back_ds=back_ds, wkt_stored=wkt, channels_schema=channels_schema, dtype=array.dtype, fp_stored=fp, mode=mode, ) self._should_tranform = ( any(v != 0 for v in channels_schema['offset']) or any(v != 1 for v in channels_schema['scale']) ) def get_data(self, fp, channel_ids, dst_nodata, interpolation): samplefp = self.build_sampling_footprint(fp, interpolation) if samplefp is None: return np.full( np.r_[fp.shape, len(channel_ids)], dst_nodata, self.dtype ) chans_indexer = self._best_indexers_of_channel_ids(channel_ids) key = list(samplefp.slice_in(self.fp)) + [chans_indexer] key = tuple(key) array = self._arr[key] if self._should_tranform: array = ( array * np.asarray(self.channels_schema['scale'])[chans_indexer] + np.asarray(self.channels_schema['offset'])[chans_indexer] ) array = self.remap( samplefp, fp, array=array, mask=None, src_nodata=self.nodata, dst_nodata=dst_nodata, mask_mode='erode', interpolation=interpolation, ) array = array.astype(self.dtype, copy=False) return array def set_data(self, array, fp, channel_ids, interpolation, mask): if not fp.share_area(self.fp): return if not fp.same_grid(self.fp) and mask is None: mask = np.ones(fp.shape, bool) dstfp = self.fp.intersection(fp) # Remap **************************************************************** ret = self.remap( fp, dstfp, array=array, mask=mask, src_nodata=self.nodata, dst_nodata=self.nodata or 0, mask_mode='erode', interpolation=interpolation, ) if mask is not None: array, mask = ret else: array = ret del ret array = array.astype(self.dtype, copy=False) fp = dstfp del dstfp # Write **************************************************************** slices = fp.slice_in(self.fp) for i in channel_ids: if mask is not None: self._arr[slices + (i,)][mask] = array[..., i][mask] else: self._arr[slices + (i,)] = array[..., i] def fill(self, value, channel_ids): for i in channel_ids: self._arr[..., i] = value def close(self): super().close() del self._arr @staticmethod def _best_indexers_of_channel_ids(channel_ids): """Create an object to pick the channels of the numpy array. Returns either a slice object or a list of int to perform fancy-indexing""" l = list(channel_ids) if np.all(np.diff(l) == 1): start, stop = l[0], l[-1] + 1 l = slice(start, stop) elif np.all(np.diff(l) == -1): start, stop = l[0], l[-1] - 1 if stop < 0: stop = None l = slice(start, stop, -1) return l