# Source code for aiearth.predict.dataset.datasource.raster_dataset_reader

import math
import pandas as pd
from typing import Any, Dict, List, Optional, Union, Callable, Tuple
from ray.data.block import Block, BlockMetadata
from ray.data.datasource.datasource import Datasource, Reader, ReadTask

from aiearth.predict.dataset.raster import RasterDataset
from aiearth.predict.dataset.vector import VectorDataset
from aiearth.predict.utils import Box, PadBounds


class RasterDatasetReader(Reader):
    """Ray Data ``Reader`` that cuts a raster dataset into sliding-window chips.

    The raster extent is covered with square windows of ``image_size`` pixels
    that overlap by ``bound`` pixels on each side. Consecutive windows are
    grouped into batches of ``block_size`` and each batch becomes one
    ``ReadTask`` that yields a single pandas ``DataFrame`` block.
    """

    def __init__(
        self,
        raster_dataset: RasterDataset,
        image_size,
        bound,
        block_size,
        block_memory_buffer,  # MiB
        aoi_vector_dataset: VectorDataset,
        read_block_image_column,
        read_block_box_column,
        read_block_pad_bounds_column,
        size_bytes_growth_factor,
    ):
        """Store the read configuration and derive the per-chip size estimate.

        Args:
            raster_dataset: Source raster; queried for its extent, channel
                count, item byte size and pixel data.
            image_size: Side length of each square window.
            bound: Overlap margin applied on every side of a window.
            block_size: Number of windows per block. When ``None`` it is
                derived from ``block_memory_buffer`` and the chip size.
            block_memory_buffer: Memory budget per block, in MiB. Only used
                when ``block_size`` is ``None``.
            aoi_vector_dataset: Optional area of interest. When not ``None``,
                windows that do not intersect it are skipped.
            read_block_image_column: Column name for the image data.
            read_block_box_column: Column name for the window boxes.
            read_block_pad_bounds_column: Column name for the pad bounds.
            size_bytes_growth_factor: Multiplier applied to the raw chip size
                when estimating its in-memory footprint.
        """
        self.raster_dataset = raster_dataset
        self.image_size = image_size
        self.bound = bound
        self.block_size = block_size
        self.aoi_vector_dataset = aoi_vector_dataset
        self.read_block_image_column = read_block_image_column
        self.read_block_box_column = read_block_box_column
        self.read_block_pad_bounds_column = read_block_pad_bounds_column
        self.size_bytes_growth_factor = size_bytes_growth_factor

        # Estimated footprint of one chip:
        # width * height * channels * bytes-per-item * growth factor.
        self.chip_size_bytes = (
            self.image_size
            * self.image_size
            * self.raster_dataset.get_chip_num_channels()
            * self.raster_dataset.get_chip_item_byte_size()
            * self.size_bytes_growth_factor
        )

        if self.block_size is None:
            # Clamp to at least one chip per block. A derived size of 0 (chip
            # larger than the buffer) would never match ``len(boxes)`` in
            # ``_get_read_tasks`` and would lump every window into one block.
            self.block_size = max(
                1,
                math.floor(
                    block_memory_buffer * 1024 * 1024 / self.chip_size_bytes
                ),
            )

    def _get_sliding_windows(self) -> Tuple[list, list]:
        """Enumerate the window boxes covering the raster extent.

        Returns:
            A ``(windows, bounds)`` pair of equal-length lists: the ``Box`` of
            each window and its matching ``PadBounds``.

        Raises:
            ValueError: If ``image_size - 2 * bound`` is not positive, because
                the windows could then never advance across the extent.
        """
        extent = self.raster_dataset.get_extent()
        # Stride between windows: the part of a window left after removing
        # the margin on both sides.
        valid_size = self.image_size - self.bound * 2
        if valid_size <= 0:
            raise ValueError(
                "image_size must be greater than 2 * bound, got "
                f"image_size={self.image_size}, bound={self.bound}"
            )

        windows = []
        bounds = []
        # Start one margin before the extent origin so the first window's
        # margin lies outside the extent.
        for y in range(extent.ymin - self.bound, extent.ymax, valid_size):
            for x in range(extent.xmin - self.bound, extent.xmax, valid_size):
                box = Box.from_xywh(x, y, self.image_size, self.image_size)

                if self.aoi_vector_dataset is not None:
                    p_bounds = self.raster_dataset.calc_window_box_bounds(box)
                    if not self.aoi_vector_dataset.is_intersects_with_box(
                        p_bounds
                    ):
                        continue

                # A window overhanging the max edge of the extent gets a
                # larger pad on that side; otherwise the pad equals ``bound``.
                right_x_bound = max(
                    self.bound, x + self.image_size - extent.xmax
                )
                bottom_y_bound = max(
                    self.bound, y + self.image_size - extent.ymax
                )
                # NOTE(review): argument order presumably is
                # (top, bottom, left, right) -- confirm against PadBounds.
                pad_bounds = PadBounds(
                    self.bound, bottom_y_bound, self.bound, right_x_bound
                )

                windows.append(box)
                bounds.append(pad_bounds)
        return (windows, bounds)

    def _read_blocks(self, boxes, pad_bounds) -> List[Block]:
        """Read the pixel data for ``boxes`` and wrap it in a single block.

        Args:
            boxes: Window boxes to read from the raster dataset.
            pad_bounds: Pad bounds matching ``boxes`` one-to-one.

        Returns:
            A one-element list holding a ``DataFrame`` with the image,
            box and pad-bounds columns.
        """
        images = self.raster_dataset.get_data_by_window_box(boxes)
        block = pd.DataFrame(
            {
                self.read_block_image_column: images,
                self.read_block_box_column: boxes,
                self.read_block_pad_bounds_column: pad_bounds,
            }
        )
        return [block]

    def _make_read_task(self, boxes, pad_bounds) -> ReadTask:
        """Build the ``ReadTask`` and its metadata for one batch of windows."""
        num_rows = len(boxes)
        blocks_meta = BlockMetadata(
            num_rows=num_rows,
            # The growth factor may be fractional; keep the estimate integral.
            size_bytes=int(num_rows * self.chip_size_bytes),
            schema=None,
            input_files=None,
            exec_stats=None,
        )
        # Bind the batch through default arguments so the task keeps its own
        # lists regardless of later rebinding in the caller.
        return ReadTask(
            lambda boxes=boxes, pad_bounds=pad_bounds: self._read_blocks(
                boxes, pad_bounds
            ),
            blocks_meta,
        )

    def _get_read_tasks(self, parallelism):
        """Group the sliding windows into read tasks of ``block_size`` windows.

        Args:
            parallelism: Accepted for interface compatibility; the number of
                tasks is determined by ``block_size``, not by this value.

        Returns:
            A list of ``ReadTask`` objects, the last of which may hold fewer
            than ``block_size`` windows.
        """
        windows, bounds = self._get_sliding_windows()
        block_size = self.block_size

        read_tasks = []
        boxes = []
        pad_bounds = []
        for window, bound in zip(windows, bounds):
            boxes.append(window)
            pad_bounds.append(bound)
            if len(boxes) == block_size:
                read_tasks.append(self._make_read_task(boxes, pad_bounds))
                boxes = []
                pad_bounds = []

        # Flush the trailing, partially filled batch.
        if boxes:
            read_tasks.append(self._make_read_task(boxes, pad_bounds))
        return read_tasks

    def estimate_inmemory_data_size(self) -> Optional[int]:
        """Return ``None``: no up-front in-memory size estimate is provided."""
        return None

    def get_read_tasks(self, parallelism: int) -> List[ReadTask]:
        """Return the read tasks for this dataset (see ``_get_read_tasks``)."""
        return self._get_read_tasks(parallelism)