47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
class Pipeline:
    """
    Orchestrates component execution with centralized I/O handling.

    Responsibilities:
    - Component instantiation and ordering
    - Pre-run validation of entire pipeline
    - State management (updating DataState from ComponentResult)
    - File I/O (saving gpkg, COCO generation)
    - Output registration
    - Background task management
    """

    def __init__(
        self,
        components: List[BaseComponent],
        data_state: DataState,
        output_path: Path,
        verbose: bool = True,
    ):
        """
        Initialize pipeline.

        Args:
            components: List of component instances (already configured)
            data_state: Initial data state
            output_path: Base output directory
            verbose: Whether to print the flow chart and status messages
        """
        self.components = components
        self.data_state = data_state
        self.output_path = Path(output_path)
        self.verbose = verbose
        self.output_path.mkdir(parents=True, exist_ok=True)

        # Setup background executor for async COCO generation
        self.background_executor = ProcessPoolExecutor(max_workers=1)
        self.data_state.background_executor = self.background_executor

        # Assign component IDs and output paths
        for i, component in enumerate(self.components):
            component.component_id = i
            component.output_path = self.output_path / f"{i}_{component.name}"

        # Validate pipeline configuration immediately to catch errors early
        if self.verbose:
            self._print_flow_chart()
        self._validate_pipeline(raise_on_error=True)

    @classmethod
    def from_config(cls, io_config: InferIOConfig, config: PipelineConfig, verbose: bool = True) -> 'Pipeline':
        """
        Create a Pipeline from configuration objects.

        This matches the interface of the original pipeline.py for backward compatibility.

        Args:
            io_config: Input/output configuration
            config: Pipeline configuration with component configs
            verbose: Whether to print the flow chart and status messages

        Returns:
            Configured Pipeline instance
        """
        # Import component classes here to avoid circular imports
        from canopyrs.engine.components.aggregator import AggregatorComponent
        from canopyrs.engine.components.detector import DetectorComponent
        from canopyrs.engine.components.segmenter import SegmenterComponent
        from canopyrs.engine.components.tilerizer import TilerizerComponent
        from canopyrs.engine.components.classifier import ClassifierComponent

        output_path = Path(io_config.output_folder)

        # Initialize AOI configuration (Area of Interest, used by the Tilerizer)
        infer_aois_config = parse_tilerizer_aoi_config(
            aoi_config=io_config.aoi_config,
            aoi_type=io_config.aoi_type,
            aois={INFER_AOI_NAME: io_config.aoi}
        )

        # Instantiate components from config
        components = []
        for component_id, (component_type, component_config) in enumerate(config.components_configs):
            if component_type == 'tilerizer':
                component = TilerizerComponent(
                    component_config, output_path, component_id, infer_aois_config
                )
            elif component_type == 'detector':
                component = DetectorComponent(component_config, output_path, component_id)
            elif component_type == 'aggregator':
                component = AggregatorComponent(component_config, output_path, component_id)
            elif component_type == 'segmenter':
                component = SegmenterComponent(component_config, output_path, component_id)
            elif component_type == 'classifier':
                component = ClassifierComponent(component_config, output_path, component_id)
            else:
                raise ValueError(f'Invalid component type: {component_type}')
            components.append(component)

        # Initialize data state from the io (input/output) config
        infer_gdf = gpd.read_file(io_config.input_gpkg) if io_config.input_gpkg else None
        infer_gdf_columns_to_pass = (
            set(io_config.infer_gdf_columns_to_pass)
            if io_config.infer_gdf_columns_to_pass else set()
        )

        # If an infer_gdf from a previous pipeline run is provided,
        # make sure to pass the special columns if present
        for special_column_name in [object_id_column_name, tile_path_column_name]:
            if infer_gdf is not None and special_column_name in infer_gdf.columns:
                infer_gdf_columns_to_pass.add(special_column_name)

        # Derive product name from imagery path, or use default for tiled input
        if io_config.input_imagery:
            product_name = validate_and_convert_product_name(
                strip_all_extensions_and_path(Path(io_config.input_imagery))
            )
        else:
            product_name = "tiled_input"

        data_state = DataState(
            imagery_path=io_config.input_imagery,
            parent_output_path=io_config.output_folder,
            product_name=product_name,
            tiles_path=io_config.tiles_path,
            infer_coco_path=io_config.input_coco,
            infer_gdf=infer_gdf,
            infer_gdf_columns_to_pass=infer_gdf_columns_to_pass,
        )

        green_print("Pipeline initialized")

        return cls(
            components=components,
            data_state=data_state,
            output_path=output_path,
            verbose=verbose,
        )

    # -------------------------------------------------------------------------
    # Execution
    # -------------------------------------------------------------------------
    def run(self, strict_rgb_validation: bool = True) -> DataState:
        """
        Run the pipeline.

        Args:
            strict_rgb_validation: If True, enforce strict RGB band validation

        Returns:
            Final DataState with all outputs
        """
        # Validate input raster/tiles bands
        self._validate_input_data(strict_rgb_validation)

        try:
            for component in self.components:
                green_print(f"Running {component.name}...")
                self._wait_for_required_state(component)

                # Run component - returns ComponentResult
                component.output_path.mkdir(parents=True, exist_ok=True)
                result = component(self.data_state)

                # Pipeline handles all I/O and state updates
                self._process_result(component, result)

            # Final cleanup of async tasks
            self.data_state.clean_side_processes()

            # Save final GDF to root output folder if one was produced
            if self.data_state.infer_gdf is not None:
                green_print("Saving final GeoDataFrame...")
                final_gpkg_path = self._save_final_gpkg()
                num_polygons = len(self.data_state.infer_gdf)
                print(f"Final GDF containing {num_polygons} polygons saved to: {final_gpkg_path}")

            green_print("Pipeline finished")
        finally:
            # Always release the worker process, even if a component raised
            self.background_executor.shutdown(wait=True)

        return self.data_state

    def __call__(self) -> DataState:
        """Alias for run()."""
        return self.run()

    def _wait_for_required_state(self, component: BaseComponent) -> None:
        """Wait for any required state still being produced by background processes."""
        if self.data_state.side_processes is None or len(self.data_state.side_processes) == 0:
            return
        for key in component.requires_state:
            # A missing required key may still be in flight; block until it resolves
            if getattr(self.data_state, key, None) is None:
                self.data_state.clean_side_processes(key)

    def _process_result(self, component: BaseComponent, result: ComponentResult):
        """
        Process ComponentResult: update state and handle I/O.

        This is where all I/O and merging is centralized.
        """
        # Register component folder
        self.data_state.register_component_folder(
            component.name, component.component_id, component.output_path
        )

        # Apply state updates (e.g., tiles_path, infer_coco_path)
        for key, value in result.state_updates.items():
            setattr(self.data_state, key, value)

        # Merge GDF if provided (this replaces the old update_infer_gdf call)
        if result.gdf is not None:
            if len(result.gdf) > 0:
                if result.objects_are_new:
                    # New objects: replace existing GDF entirely
                    self.data_state.infer_gdf = self._set_as_new_gdf(result.gdf)
                else:
                    # Existing objects refined: merge into existing GDF
                    merged_gdf = self._merge_result_gdf(result.gdf)
                    self.data_state.infer_gdf = merged_gdf
            else:
                warnings.warn(f"{component.name} returned an empty GeoDataFrame (0 results).")
                self.data_state.infer_gdf = result.gdf

            # Update columns to pass based on what's actually in the merged GDF
            # (not just what the component claims to produce, since merge may add columns)
            if self.data_state.infer_gdf is not None:
                # Use all non-geometry columns from the merged GDF
                actual_columns = set(self.data_state.infer_gdf.columns) - {Col.GEOMETRY}
                self.data_state.infer_gdf_columns_to_pass = actual_columns

        # Register any files the component already wrote itself (e.g. geodataset internals)
        for file_type, file_path in result.output_files.items():
            if file_path is not None:
                self.data_state.register_output_file(
                    component.name, component.component_id, file_type, Path(file_path)
                )

        # Save GeoPackage if requested (use merged GDF from data_state)
        if result.save_gpkg and self.data_state.infer_gdf is not None and len(self.data_state.infer_gdf) > 0:
            gpkg_path = self._save_gpkg(component, self.data_state.infer_gdf, result.gpkg_name_suffix)
            file_type = 'gpkg' if result.gpkg_name_suffix == 'aggregated' else 'pre_aggregated_gpkg'
            self.data_state.register_output_file(
                component.name, component.component_id, file_type, gpkg_path
            )

        # Queue COCO generation if requested (use merged GDF from data_state)
        if result.save_coco and self.data_state.infer_gdf is not None and len(self.data_state.infer_gdf) > 0:
            future_coco = self._queue_coco_generation(component, result)
            if future_coco:
                # side_processes may legitimately be None (see _wait_for_required_state);
                # initialize it lazily instead of raising AttributeError
                if self.data_state.side_processes is None:
                    self.data_state.side_processes = []
                self.data_state.side_processes.append(future_coco)

    def _merge_result_gdf(self, result_gdf: Union[gpd.GeoDataFrame, pd.DataFrame]) -> gpd.GeoDataFrame:
        """
        Merge component output with existing infer_gdf (objects_are_new=False).

        Only called when a component refines existing objects (e.g., prompted
        segmenter replacing detector boxes with masks, or aggregator filtering).

        Rules:
        1. No existing GDF → set directly, assign object_ids if missing
        2. Result has geometry + valid merge key → becomes base, merge in other columns
        3. Result has geometry + no merge key → full replacement
        4. Result has no geometry → merge attributes into existing
        5. Merge key priority: object_id > tile_path (if unique)

        Args:
            result_gdf: Output from component (GeoDataFrame or DataFrame)

        Returns:
            Merged GeoDataFrame
        """
        existing_gdf = self.data_state.infer_gdf

        # Case 1: No existing GDF - set directly
        if existing_gdf is None:
            return self._set_as_new_gdf(result_gdf)

        # Check if result has geometry
        has_geometry = Col.GEOMETRY in result_gdf.columns and result_gdf[Col.GEOMETRY].notna().any()

        # Try to determine merge key
        merge_key = self._determine_merge_key(result_gdf, existing_gdf, raise_on_error=False)

        # Case 2: Result has geometry
        if has_geometry:
            if merge_key:
                # Merge with existing to get other columns
                return self._merge_with_new_geometry(result_gdf, existing_gdf, merge_key)
            else:
                # No merge key - full replacement
                return self._set_as_new_gdf(result_gdf)

        # Case 3: Result has no geometry - must merge into existing
        if not merge_key:
            raise ValueError(
                "Cannot merge DataFrame into existing GDF: no valid merge key found. "
                f"Result columns: {list(result_gdf.columns)}"
            )

        # no new geometry, merge attributes into existing
        return self._merge_into_existing(result_gdf, existing_gdf, merge_key)

    def _set_as_new_gdf(self, result_gdf: Union[gpd.GeoDataFrame, pd.DataFrame]) -> gpd.GeoDataFrame:
        """Set result as new infer_gdf, assigning object_ids if missing."""
        # Assign object_ids if not present
        if Col.OBJECT_ID not in result_gdf.columns:
            result_gdf = result_gdf.copy()
            result_gdf[Col.OBJECT_ID] = range(len(result_gdf))

        # Ensure it's a GeoDataFrame
        if isinstance(result_gdf, gpd.GeoDataFrame):
            return result_gdf

        # Convert DataFrame to GeoDataFrame (no geometry)
        if Col.GEOMETRY in result_gdf.columns:
            return gpd.GeoDataFrame(result_gdf, geometry=Col.GEOMETRY)
        return gpd.GeoDataFrame(result_gdf)

    def _determine_merge_key(
        self,
        result_gdf: Union[gpd.GeoDataFrame, pd.DataFrame],
        existing_gdf: gpd.GeoDataFrame,
        raise_on_error: bool = True
    ) -> Optional[str]:
        """Determine which column to use for merging."""
        # Try object_id first
        if (Col.OBJECT_ID in result_gdf.columns and
                Col.OBJECT_ID in existing_gdf.columns and
                result_gdf[Col.OBJECT_ID].notna().all()):
            return Col.OBJECT_ID

        # Fall back to tile_path if unique in result
        if (Col.TILE_PATH in result_gdf.columns and
                Col.TILE_PATH in existing_gdf.columns and
                result_gdf[Col.TILE_PATH].is_unique):
            return Col.TILE_PATH

        if raise_on_error:
            raise ValueError(
                "Cannot merge GDFs: need valid object_id or unique tile_path. "
                f"Result columns: {list(result_gdf.columns)}, "
                f"Existing columns: {list(existing_gdf.columns)}"
            )
        return None

    def _merge_with_new_geometry(
        self,
        result_gdf: gpd.GeoDataFrame,
        existing_gdf: gpd.GeoDataFrame,
        merge_key: str
    ) -> gpd.GeoDataFrame:
        """Merge when result has new geometry (e.g., segmenter masks replace detector boxes)."""
        # Get columns from existing that aren't in result (except geometry)
        existing_cols_to_keep = [merge_key]
        for col in existing_gdf.columns:
            if col not in result_gdf.columns and col != Col.GEOMETRY:
                existing_cols_to_keep.append(col)

        # Merge: result is base, pull in other columns from existing.
        # indicator=True lets us reliably count result rows with no match in
        # existing (checking the merge key for NaN cannot detect this, since
        # the key values come from the left side of the merge).
        merged = result_gdf.merge(
            existing_gdf[existing_cols_to_keep],
            on=merge_key,
            how='left',
            indicator=True
        )

        # Warn about unmatched rows, then drop the bookkeeping column
        unmatched = (merged['_merge'] == 'left_only').sum()
        merged = merged.drop(columns='_merge')
        if unmatched > 0:
            warnings.warn(f"{unmatched} rows in result had no match in existing GDF")

        # Check if merge_key is unique in merged. If merge key is OBJECT_ID, warn and assign new unique object_ids to duplicates. If merge key is TILE_PATH, raise error.
        if merged[merge_key].duplicated().any():
            if merge_key == Col.OBJECT_ID:
                warnings.warn(f"Duplicate OBJECT_IDs found in merged GDF. Assigning new unique OBJECT_IDs. This can happen if a component duplicated objects, like a labeled tilerizer with overlap > 0.")
                merged = merged.copy()
                # Identify duplicates (keep='first' ensures the first occurrence retains its ID)
                duplicates_mask = merged.duplicated(subset=[Col.OBJECT_ID], keep='first')
                # Find the current highest ID to start incrementing from
                current_max_id = merged[Col.OBJECT_ID].max()
                num_duplicates = duplicates_mask.sum()
                # Generate a range of new IDs
                new_ids = range(current_max_id + 1, current_max_id + 1 + num_duplicates)
                # Assign new IDs only to the rows identified as duplicates
                merged.loc[duplicates_mask, Col.OBJECT_ID] = new_ids
            else:
                raise ValueError(f"Duplicate TILE_PATHs found in merged GDF after merging. This should not happen as TILE_PATH is the merge_key and is expected to be unique.")

        return gpd.GeoDataFrame(merged, geometry=Col.GEOMETRY, crs=result_gdf.crs)

    def _merge_into_existing(
        self,
        result_gdf: Union[gpd.GeoDataFrame, pd.DataFrame],
        existing_gdf: gpd.GeoDataFrame,
        merge_key: str
    ) -> gpd.GeoDataFrame:
        """Merge attributes into existing GDF (e.g., classifier adds scores)."""
        # Drop geometry from result if present (we keep existing geometry)
        result_cols = [col for col in result_gdf.columns if col != Col.GEOMETRY]
        result_data = result_gdf[result_cols]

        # Merge: existing is base, add new columns from result
        merged = existing_gdf.merge(
            result_data,
            on=merge_key,
            how='left'
        )

        # Warn about unmatched rows
        new_cols = [col for col in result_cols if col != merge_key]
        if new_cols:
            unmatched = merged[new_cols[0]].isna().sum()
            if unmatched > 0:
                warnings.warn(f"{unmatched} rows in existing GDF had no match in result")

        return gpd.GeoDataFrame(merged, geometry=Col.GEOMETRY, crs=existing_gdf.crs)

    # -------------------------------------------------------------------------
    # File I/O
    # -------------------------------------------------------------------------
    def _save_gpkg(self, component: BaseComponent, gdf: gpd.GeoDataFrame, suffix: str) -> Path:
        """Save GeoDataFrame to GeoPackage."""
        gpkg_name = self._generate_gpkg_name(suffix)
        gpkg_path = component.output_path / gpkg_name
        gdf.to_file(gpkg_path, driver='GPKG')
        return gpkg_path

    def _save_final_gpkg(self) -> Path:
        """Save final GeoDataFrame to root output folder."""
        gpkg_name = self._generate_gpkg_name("final")
        gpkg_path = self.output_path / gpkg_name
        self.data_state.infer_gdf.to_file(gpkg_path, driver='GPKG')
        return gpkg_path

    def _generate_gpkg_name(self, suffix: str) -> str:
        """Generate GeoPackage filename using product name from data state."""
        product_name = self.data_state.product_name or "output"
        fold = f"{INFER_AOI_NAME}{suffix}" if suffix else INFER_AOI_NAME
        return GeoPackageNameConvention.create_name(
            product_name=product_name,
            fold=fold,
            scale_factor=1.0,
            ground_resolution=None
        )

    def _queue_coco_generation(self, component: BaseComponent, result: ComponentResult):
        """Queue async COCO generation."""
        return generate_future_coco(
            future_key=StateKey.INFER_COCO_PATH,
            executor=self.background_executor,
            component_name=component.name,
            component_id=component.component_id,
            description=f"{component.name} inference",
            gdf=self.data_state.infer_gdf,  # Use merged GDF from data_state
            tiles_paths_column=Col.TILE_PATH,
            polygons_column=Col.GEOMETRY,
            scores_column=result.coco_scores_column,
            categories_column=result.coco_categories_column,
            other_attributes_columns=result.produced_columns - {Col.GEOMETRY, Col.TILE_PATH},
            output_path=component.output_path,
            use_rle_for_labels=False,
            n_workers=4,
            coco_categories_list=None
        )

    # -------------------------------------------------------------------------
    # Validation
    # -------------------------------------------------------------------------
    def _validate_input_data(self, strict_rgb_validation: bool = True) -> None:
        """
        Validate input raster/tiles at pipeline start.

        Uses utility functions from raster_validation module to check RGB band properties.

        Args:
            strict_rgb_validation: If True, raise error for missing color interpretation.
                If False, only warn if color interpretation is not R,G,B.

        Raises:
            PipelineValidationError: If validation fails
        """
        try:
            if self.data_state.imagery_path:
                print("Validating input raster bands...")
                validate_input_raster_or_tiles(
                    imagery_path=self.data_state.imagery_path,
                    strict_color_interp=strict_rgb_validation
                )
                print("Input raster validation passed")
            elif self.data_state.tiles_path:
                print("Validating input tiles...")
                validate_input_raster_or_tiles(
                    tiles_path=self.data_state.tiles_path,
                    strict_color_interp=strict_rgb_validation
                )
                print("Tile validation passed")
        except RasterValidationError as e:
            # Convert to PipelineValidationError for consistency,
            # keeping the original as __cause__ for debugging
            raise PipelineValidationError(str(e)) from e

    def _validate_pipeline(self, raise_on_error: bool = True) -> List[str]:
        """
        Validate entire pipeline before running.

        Simulates state flow through components to catch errors early.

        Args:
            raise_on_error: If True, raise PipelineValidationError on first error

        Returns:
            List of all validation errors (empty if valid)
        """
        all_errors = []

        # Start with initial state
        available_state = self._get_initial_state_keys()
        available_columns = self._get_initial_columns()

        # Simulate running through each component
        for i, component in enumerate(self.components):
            # Validate component
            errors = component.validate(
                available_state=available_state,
                available_columns=available_columns,
                raise_on_error=False,
            )
            if errors:
                all_errors.append(f"Component {i} ({component.name}):")
                all_errors.extend(f"  {e}" for e in errors)

            # Update available state/columns with what this component produces
            available_state = available_state | component.produces_state
            available_columns = available_columns | component.produces_columns

        if all_errors and raise_on_error:
            error_msg = "Pipeline validation failed:\n" + "\n".join(all_errors)
            raise PipelineValidationError(error_msg)

        return all_errors

    def _print_flow_chart(self) -> None:
        """
        Print a flow chart showing state/column availability through the pipeline.

        This visualizes the data flow through all components, showing what each
        component requires, produces, and what passes through.
        """
        visualizer = PipelineFlowVisualizer(
            components=self.components,
            initial_state_keys=self._get_initial_state_keys(),
            initial_columns=self._get_initial_columns(),
        )
        visualizer.print()

    def _get_initial_state_keys(self) -> Set[str]:
        """Get state keys available at pipeline start."""
        available = set()
        if self.data_state.imagery_path:
            available.add(StateKey.IMAGERY_PATH)
        if self.data_state.tiles_path:
            available.add(StateKey.TILES_PATH)
        if self.data_state.infer_gdf is not None:
            available.add(StateKey.INFER_GDF)
        if self.data_state.infer_coco_path:
            available.add(StateKey.INFER_COCO_PATH)
        if self.data_state.product_name:
            available.add(StateKey.PRODUCT_NAME)
        return available

    def _get_initial_columns(self) -> Set[str]:
        """Get GDF columns available at pipeline start."""
        if self.data_state.infer_gdf is not None:
            return set(self.data_state.infer_gdf.columns)
        return set()
|