Source code for gcode_reader.emulate.operations

import logging
from typing import List, Tuple, Dict
from dataclasses import dataclass, field, asdict, fields

import numpy as np

from .commands import Command


[docs] @dataclass(slots=True) class ProcessData: """Stores data associated with Command execution by a Machine.""" elapsed_time: float = 0.0 location: Tuple[float, float, float] = None tool_direction: Tuple[float, float, float] = (0, 0, 1) relative_movement: Tuple[float, float, float] = None distance: float = 0.0 feed_rate: float = 0.0
[docs] def to_dict(self): return {name: getattr(self, name) for name in self.attribute_names()}
[docs] def attribute_names(self): """Convenience function for getting attribute_names""" return [f.name for f in fields(self)]
[docs] @dataclass(slots=True) class AdditiveProcessData(ProcessData): """Stores ProcessData associated with Command execution by a Machine for an additive operation.""" deposited_volume: float = 0.0 bead_area: float = None extruder_type: type = None
[docs] @dataclass(slots=True) class SubtractiveProcessData(ProcessData): """Stores process_data associated with Command execution by a Machine for a subtractive operation.""" spindle_speed: float = 0.0 removed_volume: float = 0.0
[docs] class ProcessDataArrays: """Columnar (struct-of-arrays) view of a List[ProcessData]. Downstream consumers (motion.py, additive_part.py) iterate the object list to extract numpy arrays, paying O(N) Python overhead twice. Building this once after the machine loop lets both consumers read pre-built arrays directly. Construction is intentionally lazy via from_process_data_list() rather than being built inside the machine loop so that the ProcessData objects (still needed for tool callbacks) remain the primary output. """ __slots__ = ( "elapsed_time", "location", "tool_direction", "relative_movement", "distance", "feed_rate", "deposited_volume", "bead_area", "extruder_type", "spindle_speed", "removed_volume", "_has_additive", "_has_subtractive", ) def __init__(self): for s in self.__slots__: object.__setattr__(self, s, None) def __len__(self): return len(self.elapsed_time)
[docs] @classmethod def from_process_data_list(cls, process_data: list) -> "ProcessDataArrays": """Build columnar arrays from a List[ProcessData] in a single pass.""" n = len(process_data) obj = cls() pd0 = process_data[0] has_additive = hasattr(pd0, "deposited_volume") has_subtractive = hasattr(pd0, "spindle_speed") object.__setattr__(obj, "_has_additive", has_additive) object.__setattr__(obj, "_has_subtractive", has_subtractive) elapsed_time = np.empty(n, dtype=np.float64) location = np.empty((n, 3), dtype=np.float64) tool_direction = np.empty((n, 3), dtype=np.float64) relative_movement = np.empty((n, 3), dtype=np.float64) distance = np.empty(n, dtype=np.float64) feed_rate = np.empty(n, dtype=np.float64) if has_additive: deposited_volume = np.empty(n, dtype=np.float64) bead_area = np.empty(n, dtype=object) extruder_type = np.empty(n, dtype=object) if has_subtractive: spindle_speed = np.empty(n, dtype=np.float64) removed_volume = np.empty(n, dtype=np.float64) _nan3 = (np.nan, np.nan, np.nan) for i, p in enumerate(process_data): elapsed_time[i] = p.elapsed_time loc = p.location if p.location is not None else _nan3 location[i, 0] = loc[0] location[i, 1] = loc[1] location[i, 2] = loc[2] td = p.tool_direction tool_direction[i, 0] = td[0] tool_direction[i, 1] = td[1] tool_direction[i, 2] = td[2] rm = p.relative_movement if p.relative_movement is not None else _nan3 relative_movement[i, 0] = rm[0] relative_movement[i, 1] = rm[1] relative_movement[i, 2] = rm[2] distance[i] = p.distance feed_rate[i] = p.feed_rate if has_additive: deposited_volume[i] = p.deposited_volume bead_area[i] = p.bead_area extruder_type[i] = p.extruder_type if has_subtractive: spindle_speed[i] = p.spindle_speed removed_volume[i] = p.removed_volume object.__setattr__(obj, "elapsed_time", elapsed_time) object.__setattr__(obj, "location", location) object.__setattr__(obj, "tool_direction", tool_direction) object.__setattr__(obj, "relative_movement", relative_movement) object.__setattr__(obj, "distance", distance) object.__setattr__(obj, "feed_rate", feed_rate) if has_additive: object.__setattr__(obj, "deposited_volume", deposited_volume) object.__setattr__(obj, "bead_area", bead_area) object.__setattr__(obj, "extruder_type", extruder_type) if has_subtractive: object.__setattr__(obj, "spindle_speed", spindle_speed) object.__setattr__(obj, "removed_volume", removed_volume) return obj
_process_data_map: dict[type, type] = {}
[docs] def register_process_data(operation_cls: type, process_data_cls: type) -> None: """Associate an Operation subclass with the ProcessData subclass it produces. Args: operation_cls: The Operation subclass (e.g. AdditiveOperation). process_data_cls: The ProcessData subclass to instantiate for it. """ _process_data_map[operation_cls] = process_data_cls
[docs] def make_process_data(operation_cls: type) -> "ProcessData": """Return a fresh ProcessData instance appropriate for *operation_cls*. If *operation_cls* is not a class registered in the map (e.g. a caller accidentally passed an instance), falls back to a plain ``ProcessData``. """ if not isinstance(operation_cls, type): return ProcessData() return _process_data_map.get(operation_cls, ProcessData)()
[docs] @dataclass class Operation: """Represents a single manufacturing step. Holds sequence of commands and stores the resulting process data after simulation or execution by a machine model. Attributes: name: A human-readable name for the operation. commands: A list of `Command` objects that define the machine's actions. process_data: A list of `ProcessData` objects, populated after processing, containing detailed information like tool locations and timing. processed_by_machine: Stores the type of the machine that last processed this operation. """ name: str = None commands: List[Command] = field(default_factory=list) process_data: List[ProcessData] = field(default_factory=list) motion_profile: List = field(default_factory=list) processed_by_machine: type = None _arrays: "ProcessDataArrays" = field(default=None, repr=False, compare=False) @property def arrays(self) -> "ProcessDataArrays": """Columnar view of process_data; built once on first access.""" if self._arrays is None and self.process_data: self._arrays = ProcessDataArrays.from_process_data_list(self.process_data) return self._arrays def _invalidate_arrays(self): self._arrays = None def _validate_process_data_type(self): """Validates self.process_data by checking if all ProcessData entries have the type name""" return len({type(entry) for entry in self.process_data}) <= 1
[docs] def get_total_elapsed_time(self) -> float: """Calculates the total elapsed time for the operation. Returns: The total elapsed time in seconds as a float. """ return sum( data.elapsed_time if data.elapsed_time is not None else 0.0 for data in self.process_data )
[docs] def get_all_locations(self) -> np.ndarray: """Gathers all valid (non-None) location coordinates from the process data. Returns: A NumPy array of shape (N, 3) containing the [x, y, z] coordinates for N data points. Returns an empty array if no locations are found. """ return np.asarray( [data.location for data in self.process_data if data.location is not None], dtype=float, )
[docs] def get_bounds(self): """Calculates the 3D Axis-aligned bounding box of the operation from its process data. This method checks if the operation has been processed before attempting to calculate the bounds. A warning is logged if it has not been processed. Returns: A tuple containing the minimum and maximum coordinates: `((min_x, min_y, min_z), (max_x, max_y, max_z))`. Returns `None` if the operation has not been processed or contains no location data. """ if self.processed_by_machine == None: logging.warning( "Operation has not been processed since last modification. Bounds cannot be calculated" ) return None locations = self.get_all_locations() if locations.size == 0: return None return tuple(np.min(locations, axis=0)), tuple(np.max(locations, axis=0))
[docs] def get_statistics(self): """Compiles key statistics about the operation into a dictionary.""" stats = dict( operation_name=self.name, elapsed_time=0.0, bounds=None, n_commands=len(self.commands), ) # Check for processed flag if self.processed_by_machine == None: logging.warning( "Operation has not been processed since last modification. Statistics cannot be calculated" ) return None stats["elapsed_time"] = self.get_total_elapsed_time() stats["bounds"] = self.get_bounds() return stats
[docs] def process_data_to_numpy_dict( self, bounds: Tuple[int, int] = None ) -> Dict[str, np.ndarray]: """Converts process_data to a dict of NumPy arrays, keyed by attribute name. Args: bounds: Optional (start, end) index bounds. Defaults to the full list. Returns: A dict mapping each ProcessData attribute name to a NumPy array. Returns an empty dict if the bounds is zero-length. Raises: TypeError: If bounds is not a tuple/list, or if process_data contains mixed types. ValueError: If bounds does not have exactly 2 elements. IndexError: If bounds are out of range. """ if bounds is not None: if not isinstance(bounds, (tuple, list)): raise TypeError( f"bounds must be an iterable (tuple or list) of length 2. I.e. (0,2)" ) if len(bounds) != 2: raise ValueError(f"bounds must be iterable of length 2. I.e. (0,2)") if bounds[1] > len(self.process_data): raise IndexError( f"upper bound {bounds[1]} > {len(self.process_data)} length of process_data list" ) if bounds[0] < 0: raise IndexError(f"lower bound {bounds[0]} < 0") if bounds[0] > bounds[1]: bounds = (bounds[1], bounds[0]) # flip the around else: bounds = (0, len(self.process_data)) start, end = bounds n = end - start # Validate that all process_data entries are the same type (and thus have the same attributes) if not self._validate_process_data_type(): types = {type(e).__name__ for e in self.process_data} raise TypeError( f"process_data contains mixed types: {types}. All entries must be the same ProcessData subclass." ) if start == end: return {} # Once we get this far we can assume that there is process data and that each entry has the same attributes process_data_subset = self.process_data[start:end] process_data_attrs = process_data_subset[0].attribute_names() arrays = {} for attr in process_data_attrs: first_val = next( ( getattr(item, attr) for item in process_data_subset if getattr(item, attr) is not None ), None, ) # Skip the attribute if you cannot create a numpy array from it if not np.isscalar(first_val) and not isinstance( first_val, (tuple, list, np.ndarray) ): continue elif first_val is None: arrays[attr] = np.full(n, np.nan) else: shape = (n, *np.shape(first_val)) arrays[attr] = np.full(shape, np.nan) # Fill in place for i, item in enumerate(process_data_subset): for attr in arrays.keys(): val = getattr(item, attr) if val is not None: arrays[attr][i] = val return arrays
[docs] @dataclass class AdditiveOperation(Operation): """An `Operation` specialized for additive processes (e.g., 3D printing).""" pass
[docs] @dataclass class SubtractiveOperation(Operation): """An `Operation` specialized for subtractive processes (e.g., milling, CNC).""" pass
# Default associations — new operation types can call register_process_data() themselves. register_process_data(AdditiveOperation, AdditiveProcessData) register_process_data(SubtractiveOperation, SubtractiveProcessData) def _assert_process_data_arrays_covers_subclasses(): # Hard-fail at import if a ProcessData subclass adds a field that # ProcessDataArrays.__slots__ doesn't carry — silent data drops in # from_process_data_list() were the failure mode this is guarding. expected = set() for cls in (ProcessData, AdditiveProcessData, SubtractiveProcessData): expected.update(f.name for f in fields(cls)) private_slots = {"_has_additive", "_has_subtractive"} covered = set(ProcessDataArrays.__slots__) - private_slots missing = expected - covered if missing: raise TypeError( f"ProcessDataArrays.__slots__ is missing fields {sorted(missing)}; " f"add them to __slots__ and update from_process_data_list()." ) _assert_process_data_arrays_covers_subclasses()