Skip to content

DataState

Source code in canopyrs/engine/data_state.py
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
@dataclass
class DataState:
    """
    Mutable state object holding input/output paths, the current inference
    GeoDataFrame, pending side processes and a registry of per-component outputs.

    Every field has a default, so an empty ``DataState()`` can be created and
    filled in progressively.
    """

    imagery_path: Optional[str] = None
    parent_output_path: Optional[str] = None
    product_name: Optional[str] = None  # Derived from imagery filename or "tiled_input" if only tiles

    tiles_path: Optional[str] = None

    infer_coco_path: Optional[str] = None
    # Replaced through update_infer_gdf(), which checks the type and the object id column.
    infer_gdf: Optional[gpd.GeoDataFrame] = None
    infer_gdf_columns_to_pass: set = field(default_factory=set)
    infer_gdf_columns_to_delete_on_save: List = field(default_factory=list)

    background_executor: Optional = None
    # Entries consumed by clean_side_processes() are tuples of
    # (attribute_name, future_or_result[, registration_info_dict]).
    side_processes: List = field(default_factory=list)

    # Both registries are keyed by get_component_folder_name(component_id, component_name).
    component_output_folders: Dict = field(default_factory=dict)  # key -> folder path
    component_output_files: Dict = field(default_factory=dict)  # key -> {file_type: file_path}

    def update_infer_gdf(self, infer_gdf: gpd.GeoDataFrame) -> None:
        """
        Replace the stored inference GeoDataFrame.

        Args:
            infer_gdf: GeoDataFrame that must contain the `object_id_column_name` column.

        Raises:
            AssertionError: If `infer_gdf` is not a GeoDataFrame or lacks the object id column.
        """
        # NOTE(review): assert statements are skipped when Python runs with -O,
        # so these checks are not enforced in optimized mode.
        assert isinstance(infer_gdf, gpd.GeoDataFrame)
        assert object_id_column_name in infer_gdf.columns, f"Columns of the infer_gdf must contain a '{object_id_column_name}'."
        self.infer_gdf = infer_gdf

    def register_component_folder(self, component_name: str, component_id: int, folder_path: Path) -> None:
        """
        Register the output folder for a component.

        A later registration for the same component overwrites the previous folder.
        """
        key = get_component_folder_name(component_id, component_name)
        self.component_output_folders[key] = folder_path

    def register_output_file(self, component_name: str, component_id: int, file_type: str, file_path: Path) -> None:
        """
        Register an output file created by a component.

        Files are stored per component and per `file_type`; registering the same
        `file_type` twice for a component keeps only the latest path.
        """
        key = get_component_folder_name(component_id, component_name)

        if key not in self.component_output_files:
            self.component_output_files[key] = {}

        self.component_output_files[key][file_type] = file_path

    def get_component_folder(self, component_name: str, component_id: int) -> Optional[Path]:
        """Get the output folder for a specific component, or None if it was never registered."""
        key = get_component_folder_name(component_id, component_name)
        return self.component_output_folders.get(key)

    def get_output_file(self, component_name: str, component_id: int, file_type: str) -> Optional[Path]:
        """Get a specific output file from a component, or None if it was never registered."""
        key = get_component_folder_name(component_id, component_name)
        if key in self.component_output_files:
            return self.component_output_files[key].get(file_type)
        return None

    def get_latest_output_by_type(self, file_type: str) -> Optional[Path]:
        """
        Get the most recent output file of a specific type from any component.

        "Most recent" means the highest component id, parsed as the integer found
        before the first underscore of each registry key. Returns None when no
        component registered an output of this type.
        """
        latest_id = -1
        latest_path = None

        for key, files in self.component_output_files.items():
            if file_type in files:
                component_id = int(key.split('_')[0])
                if component_id > latest_id:
                    latest_id = component_id
                    latest_path = files[file_type]

        return latest_path

    def get_all_outputs(self) -> Dict:
        """Get all registered output files organized by component (the live registry, not a copy)."""
        return self.component_output_files

    def clean_side_processes(self, key: Optional[str] = None):
        """
        Wait for and process side processes.

        Each processed entry is expected to be a tuple
        ``(attribute_name, future_or_result[, registration_info])``. The second
        item is resolved by calling ``.result()`` when it has such a method,
        otherwise it is used as is. The resolved value is stored on this object
        under `attribute_name` (when that name is truthy). If a
        `registration_info` dict is present, the produced file and its parent
        folder are registered for the component it describes.

        Args:
            key: If provided, only process side processes for this attribute.
                 If None, process all side processes.

        Returns:
            This DataState instance.
        """
        to_process = []
        to_keep = []

        for side_process in self.side_processes:
            if key is None or (isinstance(side_process, tuple) and side_process[0] == key):
                to_process.append(side_process)
            else:
                to_keep.append(side_process)

        # Entries that were not selected stay queued for a later call. They must
        # not be cleared here, otherwise their results would be silently lost.
        self.side_processes = to_keep

        for side_process in to_process:
            if not isinstance(side_process, tuple):
                continue

            attribute_name = side_process[0]
            future_or_result = side_process[1]

            # Check if this is a Future-like object with a .result() method
            if hasattr(future_or_result, 'result'):
                result = future_or_result.result()
            else:
                result = future_or_result  # It's already a result

            # Update the data_state attribute
            if attribute_name:
                setattr(self, attribute_name, result)

            # Without registration info there is nothing more to do for this entry
            if len(side_process) <= 2 or not isinstance(side_process[2], dict):
                continue
            reg_info = side_process[2]

            # If an expected_path was provided, use it
            if 'expected_path' in reg_info:
                file_path = Path(reg_info['expected_path'])
            # Otherwise try to get a path from the result
            elif isinstance(result, (str, Path)):
                file_path = Path(result)
            else:
                file_path = None

            if file_path:
                # Register the component folder first
                self.register_component_folder(
                    reg_info['component_name'],
                    reg_info['component_id'],
                    file_path.parent
                )
                # Then register the file
                self.register_output_file(
                    reg_info['component_name'],
                    reg_info['component_id'],
                    reg_info['file_type'],
                    file_path
                )

        return self

clean_side_processes(key=None)

Wait for and process side processes.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| key | str | If provided, only process side processes for this attribute. If None, process all side processes. | None |
Source code in canopyrs/engine/data_state.py
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
def clean_side_processes(self, key: Optional[str] = None):
    """
    Wait for and process side processes.

    Each processed entry is expected to be a tuple
    ``(attribute_name, future_or_result[, registration_info])``. The second
    item is resolved by calling ``.result()`` when it has such a method,
    otherwise it is used as is. The resolved value is stored on this object
    under `attribute_name` (when that name is truthy). If a
    `registration_info` dict is present, the produced file and its parent
    folder are registered for the component it describes.

    Args:
        key: If provided, only process side processes for this attribute.
             If None, process all side processes.

    Returns:
        The object the method was called on.
    """
    to_process = []
    to_keep = []

    for side_process in self.side_processes:
        if key is None or (isinstance(side_process, tuple) and side_process[0] == key):
            to_process.append(side_process)
        else:
            to_keep.append(side_process)

    # Entries that were not selected stay queued for a later call. They must
    # not be cleared here, otherwise their results would be silently lost.
    self.side_processes = to_keep

    for side_process in to_process:
        if not isinstance(side_process, tuple):
            continue

        attribute_name = side_process[0]
        future_or_result = side_process[1]

        # Check if this is a Future-like object with a .result() method
        if hasattr(future_or_result, 'result'):
            result = future_or_result.result()
        else:
            result = future_or_result  # It's already a result

        # Update the data_state attribute
        if attribute_name:
            setattr(self, attribute_name, result)

        # Without registration info there is nothing more to do for this entry
        if len(side_process) <= 2 or not isinstance(side_process[2], dict):
            continue
        reg_info = side_process[2]

        # If an expected_path was provided, use it
        if 'expected_path' in reg_info:
            file_path = Path(reg_info['expected_path'])
        # Otherwise try to get a path from the result
        elif isinstance(result, (str, Path)):
            file_path = Path(result)
        else:
            file_path = None

        if file_path:
            # Register the component folder first
            self.register_component_folder(
                reg_info['component_name'],
                reg_info['component_id'],
                file_path.parent
            )
            # Then register the file
            self.register_output_file(
                reg_info['component_name'],
                reg_info['component_id'],
                reg_info['file_type'],
                file_path
            )

    return self

get_all_outputs()

Get all registered output files organized by component.

Source code in canopyrs/engine/data_state.py
78
79
80
def get_all_outputs(self) -> Dict:
    """Return the registry of output files, grouped per component.

    The registry object itself is handed back (no copy is made).
    """
    registered_outputs = self.component_output_files
    return registered_outputs

get_component_folder(component_name, component_id)

Get the output folder for a specific component.

Source code in canopyrs/engine/data_state.py
52
53
54
55
def get_component_folder(self, component_name: str, component_id: int) -> Optional[Path]:
    """Look up the registered output folder of one component.

    Returns None when no folder was registered for that component.
    """
    folder_registry = self.component_output_folders
    lookup_key = get_component_folder_name(component_id, component_name)
    return folder_registry.get(lookup_key)

get_latest_output_by_type(file_type)

Get the most recent output file of a specific type from any component.

Source code in canopyrs/engine/data_state.py
64
65
66
67
68
69
70
71
72
73
74
75
76
def get_latest_output_by_type(self, file_type: str) -> Optional[Path]:
    """Return the `file_type` output registered by the component with the highest id.

    The id of a component is the integer found before the first underscore of
    its registry key. Returns None when no component registered that file type.
    """
    candidates = [
        (int(component_key.split('_')[0]), outputs[file_type])
        for component_key, outputs in self.component_output_files.items()
        if file_type in outputs
    ]

    # Ids of -1 or lower never win; on equal ids the earliest entry is kept.
    eligible = [pair for pair in candidates if pair[0] > -1]
    if not eligible:
        return None

    return max(eligible, key=lambda pair: pair[0])[1]

get_output_file(component_name, component_id, file_type)

Get a specific output file from a component.

Source code in canopyrs/engine/data_state.py
57
58
59
60
61
62
def get_output_file(self, component_name: str, component_id: int, file_type: str) -> Optional[Path]:
    """Look up one registered output file of a component.

    Returns None when the component has no registered files or none of the
    requested `file_type`.
    """
    lookup_key = get_component_folder_name(component_id, component_name)
    files_of_component = self.component_output_files.get(lookup_key, {})
    return files_of_component.get(file_type)

register_component_folder(component_name, component_id, folder_path)

Register the output folder for a component.

Source code in canopyrs/engine/data_state.py
34
35
36
37
38
39
def register_component_folder(self, component_name: str, component_id: int, folder_path: Path) -> None:
    """
    Record `folder_path` as the output folder of the given component.

    A previous registration for the same component is overwritten.
    """
    folder_registry = self.component_output_folders
    folder_registry[get_component_folder_name(component_id, component_name)] = folder_path

register_output_file(component_name, component_id, file_type, file_path)

Register an output file created by a component.

Source code in canopyrs/engine/data_state.py
41
42
43
44
45
46
47
48
49
50
def register_output_file(self, component_name: str, component_id: int, file_type: str, file_path: Path) -> None:
    """
    Record `file_path` as the `file_type` output of the given component.

    The per-component mapping is created on first use; registering the same
    `file_type` again replaces the earlier path.
    """
    registry_key = get_component_folder_name(component_id, component_name)
    files_of_component = self.component_output_files.setdefault(registry_key, {})
    files_of_component[file_type] = file_path