import logging
from typing import List, Tuple, Dict
from dataclasses import dataclass, field, asdict, fields
import numpy as np
from .commands import Command
[docs]
@dataclass(slots=True)
class ProcessData:
"""Stores data associated with Command execution by a Machine."""
elapsed_time: float = 0.0
location: Tuple[float, float, float] = None
tool_direction: Tuple[float, float, float] = (0, 0, 1)
relative_movement: Tuple[float, float, float] = None
distance: float = 0.0
feed_rate: float = 0.0
[docs]
def to_dict(self):
return {name: getattr(self, name) for name in self.attribute_names()}
[docs]
def attribute_names(self):
"""Convenience function for getting attribute_names"""
return [f.name for f in fields(self)]
[docs]
@dataclass(slots=True)
class AdditiveProcessData(ProcessData):
"""Stores ProcessData associated with Command execution by a Machine for an additive operation."""
deposited_volume: float = 0.0
bead_area: float = None
extruder_type: type = None
[docs]
@dataclass(slots=True)
class SubtractiveProcessData(ProcessData):
"""Stores process_data associated with Command execution by a Machine for a subtractive operation."""
spindle_speed: float = 0.0
removed_volume: float = 0.0
[docs]
class ProcessDataArrays:
"""Columnar (struct-of-arrays) view of a List[ProcessData].
Downstream consumers (motion.py, additive_part.py) iterate the object list to
extract numpy arrays, paying O(N) Python overhead twice. Building this once
after the machine loop lets both consumers read pre-built arrays directly.
Construction is intentionally lazy via from_process_data_list() rather than
being built inside the machine loop so that the ProcessData objects (still
needed for tool callbacks) remain the primary output.
"""
__slots__ = (
"elapsed_time",
"location",
"tool_direction",
"relative_movement",
"distance",
"feed_rate",
"deposited_volume",
"bead_area",
"extruder_type",
"spindle_speed",
"removed_volume",
"_has_additive",
"_has_subtractive",
)
def __init__(self):
for s in self.__slots__:
object.__setattr__(self, s, None)
def __len__(self):
return len(self.elapsed_time)
[docs]
@classmethod
def from_process_data_list(cls, process_data: list) -> "ProcessDataArrays":
"""Build columnar arrays from a List[ProcessData] in a single pass."""
n = len(process_data)
obj = cls()
pd0 = process_data[0]
has_additive = hasattr(pd0, "deposited_volume")
has_subtractive = hasattr(pd0, "spindle_speed")
object.__setattr__(obj, "_has_additive", has_additive)
object.__setattr__(obj, "_has_subtractive", has_subtractive)
elapsed_time = np.empty(n, dtype=np.float64)
location = np.empty((n, 3), dtype=np.float64)
tool_direction = np.empty((n, 3), dtype=np.float64)
relative_movement = np.empty((n, 3), dtype=np.float64)
distance = np.empty(n, dtype=np.float64)
feed_rate = np.empty(n, dtype=np.float64)
if has_additive:
deposited_volume = np.empty(n, dtype=np.float64)
bead_area = np.empty(n, dtype=object)
extruder_type = np.empty(n, dtype=object)
if has_subtractive:
spindle_speed = np.empty(n, dtype=np.float64)
removed_volume = np.empty(n, dtype=np.float64)
_nan3 = (np.nan, np.nan, np.nan)
for i, p in enumerate(process_data):
elapsed_time[i] = p.elapsed_time
loc = p.location if p.location is not None else _nan3
location[i, 0] = loc[0]
location[i, 1] = loc[1]
location[i, 2] = loc[2]
td = p.tool_direction
tool_direction[i, 0] = td[0]
tool_direction[i, 1] = td[1]
tool_direction[i, 2] = td[2]
rm = p.relative_movement if p.relative_movement is not None else _nan3
relative_movement[i, 0] = rm[0]
relative_movement[i, 1] = rm[1]
relative_movement[i, 2] = rm[2]
distance[i] = p.distance
feed_rate[i] = p.feed_rate
if has_additive:
deposited_volume[i] = p.deposited_volume
bead_area[i] = p.bead_area
extruder_type[i] = p.extruder_type
if has_subtractive:
spindle_speed[i] = p.spindle_speed
removed_volume[i] = p.removed_volume
object.__setattr__(obj, "elapsed_time", elapsed_time)
object.__setattr__(obj, "location", location)
object.__setattr__(obj, "tool_direction", tool_direction)
object.__setattr__(obj, "relative_movement", relative_movement)
object.__setattr__(obj, "distance", distance)
object.__setattr__(obj, "feed_rate", feed_rate)
if has_additive:
object.__setattr__(obj, "deposited_volume", deposited_volume)
object.__setattr__(obj, "bead_area", bead_area)
object.__setattr__(obj, "extruder_type", extruder_type)
if has_subtractive:
object.__setattr__(obj, "spindle_speed", spindle_speed)
object.__setattr__(obj, "removed_volume", removed_volume)
return obj
_process_data_map: dict[type, type] = {}
[docs]
def register_process_data(operation_cls: type, process_data_cls: type) -> None:
"""Associate an Operation subclass with the ProcessData subclass it produces.
Args:
operation_cls: The Operation subclass (e.g. AdditiveOperation).
process_data_cls: The ProcessData subclass to instantiate for it.
"""
_process_data_map[operation_cls] = process_data_cls
[docs]
def make_process_data(operation_cls: type) -> "ProcessData":
"""Return a fresh ProcessData instance appropriate for *operation_cls*.
If *operation_cls* is not a class registered in the map (e.g. a caller
accidentally passed an instance), falls back to a plain ``ProcessData``.
"""
if not isinstance(operation_cls, type):
return ProcessData()
return _process_data_map.get(operation_cls, ProcessData)()
[docs]
@dataclass
class Operation:
"""Represents a single manufacturing step.
Holds sequence of commands and stores the resulting process data after simulation or execution by a machine model.
Attributes:
name: A human-readable name for the operation.
commands: A list of `Command` objects that define the machine's actions.
process_data: A list of `ProcessData` objects, populated after processing,
containing detailed information like tool locations and timing.
processed_by_machine: Stores the type of the machine that last processed
this operation.
"""
name: str = None
commands: List[Command] = field(default_factory=list)
process_data: List[ProcessData] = field(default_factory=list)
motion_profile: List = field(default_factory=list)
processed_by_machine: type = None
_arrays: "ProcessDataArrays" = field(default=None, repr=False, compare=False)
@property
def arrays(self) -> "ProcessDataArrays":
"""Columnar view of process_data; built once on first access."""
if self._arrays is None and self.process_data:
self._arrays = ProcessDataArrays.from_process_data_list(self.process_data)
return self._arrays
def _invalidate_arrays(self):
self._arrays = None
def _validate_process_data_type(self):
"""Validates self.process_data by checking if all ProcessData entries have the type name"""
return len({type(entry) for entry in self.process_data}) <= 1
[docs]
def get_total_elapsed_time(self) -> float:
"""Calculates the total elapsed time for the operation.
Returns:
The total elapsed time in seconds as a float.
"""
return sum(
data.elapsed_time if data.elapsed_time is not None else 0.0
for data in self.process_data
)
[docs]
def get_all_locations(self) -> np.ndarray:
"""Gathers all valid (non-None) location coordinates from the process data.
Returns:
A NumPy array of shape (N, 3) containing the [x, y, z] coordinates
for N data points. Returns an empty array if no locations are found.
"""
return np.asarray(
[data.location for data in self.process_data if data.location is not None],
dtype=float,
)
[docs]
def get_bounds(self):
"""Calculates the 3D Axis-aligned bounding box of the operation from its process data.
This method checks if the operation has been processed before attempting to
calculate the bounds. A warning is logged if it has not been processed.
Returns:
A tuple containing the minimum and maximum coordinates:
`((min_x, min_y, min_z), (max_x, max_y, max_z))`.
Returns `None` if the operation has not been processed or contains no location data.
"""
if self.processed_by_machine == None:
logging.warning(
"Operation has not been processed since last modification. Bounds cannot be calculated"
)
return None
locations = self.get_all_locations()
if locations.size == 0:
return None
return tuple(np.min(locations, axis=0)), tuple(np.max(locations, axis=0))
[docs]
def get_statistics(self):
"""Compiles key statistics about the operation into a dictionary."""
stats = dict(
operation_name=self.name,
elapsed_time=0.0,
bounds=None,
n_commands=len(self.commands),
)
# Check for processed flag
if self.processed_by_machine == None:
logging.warning(
"Operation has not been processed since last modification. Statistics cannot be calculated"
)
return None
stats["elapsed_time"] = self.get_total_elapsed_time()
stats["bounds"] = self.get_bounds()
return stats
[docs]
def process_data_to_numpy_dict(
self, bounds: Tuple[int, int] = None
) -> Dict[str, np.ndarray]:
"""Converts process_data to a dict of NumPy arrays, keyed by attribute name.
Args:
bounds: Optional (start, end) index bounds. Defaults to the full list.
Returns:
A dict mapping each ProcessData attribute name to a NumPy array.
Returns an empty dict if the bounds is zero-length.
Raises:
TypeError: If bounds is not a tuple/list, or if process_data contains mixed types.
ValueError: If bounds does not have exactly 2 elements.
IndexError: If bounds are out of range.
"""
if bounds is not None:
if not isinstance(bounds, (tuple, list)):
raise TypeError(
f"bounds must be an iterable (tuple or list) of length 2. I.e. (0,2)"
)
if len(bounds) != 2:
raise ValueError(f"bounds must be iterable of length 2. I.e. (0,2)")
if bounds[1] > len(self.process_data):
raise IndexError(
f"upper bound {bounds[1]} > {len(self.process_data)} length of process_data list"
)
if bounds[0] < 0:
raise IndexError(f"lower bound {bounds[0]} < 0")
if bounds[0] > bounds[1]:
bounds = (bounds[1], bounds[0]) # flip the around
else:
bounds = (0, len(self.process_data))
start, end = bounds
n = end - start
# Validate that all process_data entries are the same type (and thus have the same attributes)
if not self._validate_process_data_type():
types = {type(e).__name__ for e in self.process_data}
raise TypeError(
f"process_data contains mixed types: {types}. All entries must be the same ProcessData subclass."
)
if start == end:
return {}
# Once we get this far we can assume that there is process data and that each entry has the same attributes
process_data_subset = self.process_data[start:end]
process_data_attrs = process_data_subset[0].attribute_names()
arrays = {}
for attr in process_data_attrs:
first_val = next(
(
getattr(item, attr)
for item in process_data_subset
if getattr(item, attr) is not None
),
None,
)
# Skip the attribute if you cannot create a numpy array from it
if not np.isscalar(first_val) and not isinstance(
first_val, (tuple, list, np.ndarray)
):
continue
elif first_val is None:
arrays[attr] = np.full(n, np.nan)
else:
shape = (n, *np.shape(first_val))
arrays[attr] = np.full(shape, np.nan)
# Fill in place
for i, item in enumerate(process_data_subset):
for attr in arrays.keys():
val = getattr(item, attr)
if val is not None:
arrays[attr][i] = val
return arrays
[docs]
@dataclass
class AdditiveOperation(Operation):
"""An `Operation` specialized for additive processes (e.g., 3D printing)."""
pass
[docs]
@dataclass
class SubtractiveOperation(Operation):
"""An `Operation` specialized for subtractive processes (e.g., milling, CNC)."""
pass
# Default associations — new operation types can call register_process_data() themselves.
register_process_data(AdditiveOperation, AdditiveProcessData)
register_process_data(SubtractiveOperation, SubtractiveProcessData)
def _assert_process_data_arrays_covers_subclasses():
# Hard-fail at import if a ProcessData subclass adds a field that
# ProcessDataArrays.__slots__ doesn't carry — silent data drops in
# from_process_data_list() were the failure mode this is guarding.
expected = set()
for cls in (ProcessData, AdditiveProcessData, SubtractiveProcessData):
expected.update(f.name for f in fields(cls))
private_slots = {"_has_additive", "_has_subtractive"}
covered = set(ProcessDataArrays.__slots__) - private_slots
missing = expected - covered
if missing:
raise TypeError(
f"ProcessDataArrays.__slots__ is missing fields {sorted(missing)}; "
f"add them to __slots__ and update from_process_data_list()."
)
_assert_process_data_arrays_covers_subclasses()