Source code for gcode_reader.emulate.machine

import functools
import logging
import time
from math import sqrt, radians, cos, sin
from typing import List, Tuple
import collections.abc

import pandas as pd
import numpy as np

from ..constants import ZERO_TOLERANCE

from .. import analysis
from . import machine_options
from . import motion
from .tools import Tool
from .operations import (
    Operation,
    AdditiveOperation,
    SubtractiveOperation,
    AdditiveProcessData,
    SubtractiveProcessData,
    ProcessData,
    make_process_data,
    _process_data_map,
)
from .parsers import GcodeParser
from .exporters import GcodeExporter
from . import commands
from .commands import (
    Command,
    Config,
    Move,
    FeedRate,
    Dwell,
)
from .registry import register


[docs] @register("default") class Machine: """An emulation environment for a generic CNC machine. This class acts as an orchestrator for processing sequences of machine commands (Operations). It maintains the machine's state, such as position and feed rate, and applies kinematic models (motion profiles) to each movement. It is designed to be configurable with different tools, parsers, exporters, and machine-specific options. Attributes: options (machine_options.MachineOptions): The configuration object containing parameters like max feedrate and acceleration. tools (Tuple[Tool, ...]): A tuple of tool objects attached to the machine. parser (GcodeParser): The parser instance used to convert G-code files into Command objects. exporter (GcodeExporter): The exporter instance used to convert Command objects back into G-code strings or other formats. home (Tuple[float, float, float]): The machine's home coordinates (X, Y, Z). motion_profile_type (type): The motion profile class (e.g., Trapezoid) used to calculate movement kinematics. """ def __init__( self, tools: Tuple = None, parser: GcodeParser = GcodeParser(), exporter: GcodeExporter = GcodeExporter(), **options, ): """Initializes a new Machine instance. Args: tools (Tuple[Tool, ...], optional): A tuple of Tool instances for the machine to use. Defaults to None. parser (GcodeParser, optional): An instance of a GcodeParser. Defaults to a new GcodeParser(). exporter (GcodeExporter, optional): An instance of a GcodeExporter. Defaults to a new GcodeExporter(). **options: Keyword arguments passed to the constructor of the machine's `MachineOptions` type. """ self._options_type: type = machine_options.MachineOptions self._options: machine_options.MachineOptions = None self._tools: List[Tool] = None self._parser: GcodeParser = None self._exporter: GcodeExporter = None self._home: Tuple[float, float, float] = (0, 0, 0) self._motion_profile: motion.MotionProfile = motion.MotionProfile() # use the setter functions here to enforce types # 'options' must be last to correctly propagate settings accross machine components. self.parser = parser self.exporter = exporter self.tools = tools self.options = options # State management self._history: List[ProcessData] = [] self._last_location: Tuple[float, float, float] = self._home self._last_feed_rate: float = 0.0 self._absolute_position = True self._tool_direction: Tuple[float, float, float] = (0, 0, 1) # Command process helpers. The order determines order of operations self.process_command_helpers = ( self._config, self._move, self._dwell, self._tool_rotate, ) # Coordinate system and positioning self._coordinate_system = "machine" self._work_offset = (0, 0, 0) self.feed_rate_time_base_s = 1.0 # multiplier to convert time unit to seconds (for unit/min conversion mainly) self.unit_per_meter = 1.0 # multiplier to convert length unit to meters
[docs] def clear_history(self): """Clears this Machine's command history.""" self._history = [] self._last_location = self.home self._last_feed_rate = 0.0 self._tool_direction = (0, 0, 1) for tool in self.tools or (): tool.reset_history()
# ################################################################## # # Properties # # # ################################################################## @property def options(self): return self._options @options.setter def options(self, values): if isinstance(values, dict): self._options = self._options_type(**values) elif isinstance(values, self._options_type): self._options = values else: raise TypeError( f"{type(self).__name__} expects options of type {self._options_type}. Received {type(values).__name__}" ) for tool in self.tools or (): tool.options = self._options self.parser.syntax = self.options.syntax self.exporter.syntax = self.options.syntax @property def tools(self): return self._tools @tools.setter def tools(self, value): if isinstance(value, collections.abc.Iterable): self._tools = tuple(value) elif isinstance(value, Tool): self._tools = (value,) elif value is None: self._tools = () else: raise TypeError( f"Machine.tools.setter: Unknown type receivied {type(value).__name__} for tools." ) for tool in self._tools: tool.options = self.options @property def parser(self): return self._parser @parser.setter def parser(self, value): if not isinstance(value, GcodeParser): raise TypeError( f"Machine.parser.setter: Unknown type receivied {type(value).__name__} for parser." ) self._parser = value # NOTE: DO not sync the parser syntax with self.options.syntax here. This allows someone to override the default parser @property def exporter(self): return self._exporter @exporter.setter def exporter(self, value): if not isinstance(value, GcodeExporter): raise TypeError( f"Machine.exporter.setter: Unknown type receivied {type(value).__name__} for exporter. Ignoring" ) self._exporter = value # NOTE: DO not sync the exporter syntax with self.options.syntax here. This allows someone to override the default exporter @property def home(self): return self._home @home.setter def home(self, location): self._home = tuple(location) @property def motion_profile(self): return self._motion_profile @motion_profile.setter def motion_profile(self, profile: type): if profile is None or not isinstance(profile, motion.MotionProfile): raise TypeError( f"Machine.motion_profile.setter: Unkown motion profile type. Received ({profile}). Expected instance of gcode_reader.emulate.motion.MotionProfile" ) self._motion_profile = profile @property def coordinate_system(self): return self._coordinate_system @coordinate_system.setter def coordinate_system(self, val: str): if not isinstance(val, str): raise TypeError(f"{type(self).__name__}:coordinate_system must be type str") clean_val = val.lower().strip() if not clean_val in ["machine", "work"]: raise ValueError( f'{type(self).__name__}:coordinate_system must be one of ("machine", "gcode")' ) self._coordinate_system = clean_val @property def work_offset(self): return self._work_offset @work_offset.setter def work_offset(self, val: tuple): if not isinstance(val, (tuple, list, np.ndarray)): raise TypeError( f"{type(self).__name__}:work_offset must be type tuple or list" ) if not len(val) == 3: raise IndexError( f"{type(self).__name__}:work_offset must have length 3. I.e. (10,0,0)" ) self._work_offset = tuple([float(v) for v in val]) @property def absolute_position(self): return self._absolute_position @absolute_position.setter def absolute_position(self, val): if not isinstance(val, bool): raise TypeError( f"{type(self).__name__}:absolute_position must be type bool" ) self._absolute_position = val @property def incremental_position(self): return not self.absolute_position @incremental_position.setter def incremental_position(self, val): if not isinstance(val, bool): raise TypeError( f"{type(self).__name__}:incremental_position must be type bool" ) self._absolute_position = not val # ################################################################# # # Parsers & Exporters # # # ##################################################################
[docs] def gcode_file_to_operation(self, filepath: str, operation_type: type = Operation): """Parses a G-code file and returns a processed Operation. This method uses the machine's configured parser to read a G-code file into a list of commands, wraps them in the specified `operation_type`, and then processes the operation to populate its process data. Args: filepath (str): The path to the G-code file. operation_type (type, optional): The class of Operation to create (e.g., `Operation`, `AdditiveOperation`). Defaults to `Operation`. Returns: Operation: A new, processed Operation instance. Raises: ValueError: If `operation_type` is not a valid subclass of `Operation`. """ if not issubclass(operation_type, Operation): raise ValueError( f"operation_type must be one of (Operation, AdditiveOperation, SubtractiveOperation). Recieved {operation_type}" ) commands = self.parser.gcode_file_to_commands(filepath) operation = operation_type(name=filepath, commands=commands) return self.process_operation(operation)
[docs] def operation_to_dataframe(self, operation: Operation) -> pd.DataFrame: """Processes an Operation and converts it into a Pandas DataFrame. This method ensures an operation is fully processed, then converts each command and its corresponding process data into a pandas Series. These are combined into a single DataFrame for analysis. Args: operation (Operation): The operation to be converted. Returns: pd.DataFrame: A DataFrame where each row represents a command and its associated process data. """ # NOTE: Keeping this in the machine emulator in case we need machine specific # implementations or data cleaning operation = self.process_operation(operation) command_data = [ self.exporter.command_to_dict(command) for command in operation.commands ] df = pd.DataFrame(command_data) # Receive dictionary of process data arrays. This will convert readily to DataFrame columns process_data_dict = operation.process_data_to_numpy_dict() for key, val in process_data_dict.items(): # We want to split out tuple/list data to its components if len(val.shape) > 1 and val.shape[1] > 1: for j in range(val.shape[1]): df[f"{key}_{j}"] = val[:, j] else: df[key] = val return df
[docs] def write_gcode_file( self, filepath: str, operation: Operation, overwrite: bool = False ): """Processes an Operation and writes the resulting G-code to a file. This method processes the sequence of commands in the operation and uses the exporter to generate a formatted G-code string for each one. The resulting lines are then written to the specified file. WARNING: ALWAYS REVIEW GENERATED G-CODE BEFORE FOR CORRECTNESS AND SAFETY BEFORE EXECUTING ON A MACHINE. Args: filepath (str): The path to the output file. operation (Operation): The operation containing the commands to write. overwrite (bool, optional): If True, an existing file at the path will be overwritten. If False (default), the operation will fail if the file already exists. """ # NOTE: Restrict G-code file writing to operations that were processed by this machine only. # this does not completely eliminate risk of someone processing an operation with another # machine and then writing a G-code file to this machine if operation.processed_by_machine != type(self): raise TypeError( f"Mismatched machine types. This writer belongs to a " f"'{type(self).__name__}' machine, but the operation was processed " f"by a '{operation.processed_by_machine.__name__}' machine. " ) mode = "x" if overwrite: mode = "w" self.process_operation(operation) gcode_lines = [ self.exporter.command_to_gcode_line(command) for command in operation.commands ] try: with open(filepath, mode) as file: file.writelines(gcode_lines) logging.warning( f"G-code file written to: {filepath}.\n\nIMPORTANT: ALWAYS REVIEW GENERATED G-CODE BEFORE FOR CORRECTNESS AND SAFETY BEFORE EXECUTING ON A MACHINE." ) except FileExistsError as exc: logging.error( f"Machine.write_gcode_file. File ({filepath}) already exists. Cannot write with mode={mode}. Exception: {exc}" ) raise exc except Exception as exc: logging.error(f"Machine.write_gcode_file. Unknown Exception: {exc}") raise exc
[docs] def export_operation_event_series( self, operation: Operation, spatial_scale_factor: float = 1e-3, destination: str = None, ): """Write a process event series for an Operation Args: operation (Operation): The Operation object to be processed. It should have been created or processed by this same machine type. spatial_scale_factor (float, optional): A multiplicative factor to convert spatial units from the Operation (typically millimeters) to the desired output units (typically meters). Defaults to 1e-3. destination (str | None, optional): The full path for the output file. If `None`, a path is automatically generated from the operation's name, e.g., 'MyPart_event_series.aes'. Defaults to None. Returns: pandas.DataFrame: A DataFrame containing the event series. """ t0 = time.perf_counter() logging.debug(f"Machine.export_operation_event_series") operation = self.process_operation(operation) event_series = analysis.operation_to_event_series( operation=operation, spatial_scale_factor=spatial_scale_factor, ) if destination is None: name = "operation" if operation.name is not None: import os.path name, _ = os.path.splitext(operation.name) destination = f"{name}_event_series.aes" logging.debug(f"\tWriting CSV File to {destination}") event_series.to_csv(destination, header=False, index=False) logging.debug( f"...Machine.export_operation_event_series Completed. Total Elapsed time: {time.perf_counter() - t0:0.3f} (s)" ) return event_series
# ################################################################## # # Command and Operation Processing # # # ##################################################################
[docs] def process_operation( self, operation: Operation, force=False, skip_tool=False ) -> Operation: """Fully processes an Operation, calculating all derived data. This is the main entry point for simulating an operation. It first calculates basic process data (location, distance, etc.) for each command. It then applies the machine-wide motion profile to calculate kinematic data like elapsed time. It avoids re-processing an operation that has already been processed by this same machine instance unless `force` is True. Args: operation: The operation to process. force: If True, re-processes the operation even if it has been processed by this machine before. Defaults to False. skip_tool: If True, skips tool processing during the command processing phase. Useful for two-pass processing where tools depend on motion profile data (e.g., elapsed_time) that isn't available until after the first pass. Defaults to False. Returns: Operation: A new, fully processed Operation instance. """ this_machine = type(self) previous_machine = operation.processed_by_machine operation_type = type(operation) if not force and previous_machine == this_machine: logging.debug( f"Operation {operation.name} already processed by {this_machine}. Returning unmodified." ) return operation # NOTE: Currently not allowing conversion between machines! (unless forced) elif previous_machine is not None and previous_machine != type(self): if not force: raise TypeError( f"Cannot re-process operation {operation.name} with a different machine. This operation was " f"processed by '{previous_machine.__name__}' and cannot be re-processed by '{this_machine.__name__}'." ) elif force: logging.warning( f"Force re-processing enabled for operation '{operation.name}'." f"Operation was previously processed by '{previous_machine.__name__}' and is now being processed by '{this_machine.__name__}'." f"IMPORTANT: ALWAYS REVIEW OUTPUT FOR CORRECTNESS AND SAFETY BEFORE APPLYING TO A REAL-WORLD MACHINE." ) logging.debug( f"{operation_type.__name__} {operation.name} being processed by {this_machine}." ) process_data = self._process_commands_batch( operation.commands, operation_type, skip_tool ) operation = operation_type( name=operation.name, commands=operation.commands, process_data=process_data, processed_by_machine=this_machine, ) # The motion profile needs the full set of process data since it uses the previous and next movement to determine time estimates operation = self.apply_motion_profile_to_operation(operation) return operation
def _process_commands_batch(self, cmd_list, operation_type, skip_tool=False): """Process all commands in one pass with inlined move math. Replaces the per-command process_command loop for the base Machine class. Eliminates ~5N Python function-call round-trips (_config, _move, _dwell, _tool_rotate, make_process_data) by inlining Move math and reusing pre- allocated ProcessData objects. Non-move commands still call self._config() so machine subclass overrides remain active. Tool processing is interleaved in command order so tool-state changes (e.g. extruder on/off) apply to exactly the right moves. """ N = len(cmd_list) if N == 0: return [] # Subclass helpers beyond the 4 base ones (e.g. MasterPrint._handle_ca_virtual_axis). # Called per-command so overrides like CA detection remain active. _base_helpers = {self._config, self._move, self._dwell, self._tool_rotate} extra_helpers = [ h for h in self.process_command_helpers if h not in _base_helpers ] loc_dim = len(self._last_location) work_offset = self._work_offset # tuple — re-read after each _config call # Local state copies — avoid repeated property access in the hot loop is_absolute = self._absolute_position is_work_coords = self._coordinate_system == "work" last_location = list(self._last_location) # mutable for in-place update last_feed_rate = self._last_feed_rate tool_direction = list(self._tool_direction) do_tools = bool(self.tools) and not skip_tool process_data_cls = _process_data_map.get(operation_type, ProcessData) process_data_list = [process_data_cls() for _ in range(N)] for i, cmd in enumerate(cmd_list): process_data_entry = process_data_list[i] if isinstance(cmd, Move): # ── Hot path: inline _move logic ────────────────────────────── raw_loc = cmd.location n_axes = len(raw_loc) prev0, prev1, prev2 = ( last_location[0], last_location[1], last_location[2], ) for j in range(loc_dim): v = raw_loc[j] if j < n_axes else None if v is not None: last_location[j] = ( v + (work_offset[j] if is_work_coords else 0.0) if is_absolute else last_location[j] + v ) dx = last_location[0] - prev0 dy = last_location[1] - prev1 dz = last_location[2] - prev2 if loc_dim > 2 else 0.0 dist = sqrt(dx * dx + dy * dy + dz * dz) process_data_entry.location = ( last_location[0], last_location[1], last_location[2], ) process_data_entry.relative_movement = (dx, dy, dz) process_data_entry.distance = dist cmd_feed_rate = cmd.feed_rate if cmd_feed_rate is not None: last_feed_rate = cmd_feed_rate process_data_entry.feed_rate = last_feed_rate else: # ── Non-move: propagate state, call _config ──────────────────── process_data_entry.location = ( last_location[0], last_location[1], last_location[2], ) process_data_entry.feed_rate = last_feed_rate process_data_entry.distance = 0.0 process_data_entry.relative_movement = (0.0, 0.0, 0.0) self._config(cmd, process_data_entry) # Re-sync after _config (subclass may have changed machine state) is_absolute = self._absolute_position is_work_coords = self._coordinate_system == "work" work_offset = self._work_offset if isinstance(cmd, Dwell): process_data_entry.elapsed_time = cmd.time_s # ── Inline _tool_rotate ──────────────────────────────────────────── settings = getattr(cmd, "settings", None) if settings: rotation = settings.get("rotation") if rotation is not None and len(rotation) == 3: A = float(rotation[0]) if rotation[0] is not None else 0.0 B = float(rotation[1]) if rotation[1] is not None else 0.0 C = float(rotation[2]) if rotation[2] is not None else 0.0 tool_direction = list(self.abc_to_tool_direction(A, B, C)) process_data_entry.tool_direction = ( tool_direction[0], tool_direction[1], tool_direction[2], ) # ── Subclass extra helpers (e.g. MasterPrint._handle_ca_virtual_axis) ── for helper in extra_helpers: process_data_entry = helper(cmd, process_data_entry) # ── Tool processing (in command order, so tool state is correct) ─── if do_tools: for tool in self.tools: process_data_entry = tool.process_command(cmd, process_data_entry) process_data_list[i] = process_data_entry self._history.append(cmd) # Update machine state self._absolute_position = is_absolute self._last_location = tuple(last_location) self._last_feed_rate = last_feed_rate self._tool_direction = tuple(tool_direction) return process_data_list
[docs] def process_command( self, command: commands.Command, operation_type=Operation, skip_tool=False ) -> ProcessData: """Processes a single command to generate its ProcessData. This method dispatches a command to the appropriate internal helper (_move, _dwell, etc.) and optionally to any attached tools to calculate derived data like new position, distance traveled, and tool-specific metrics. Args: command: The command to process. operation_type: The parent operation's type, used to determine the type of ProcessData to create. Defaults to Operation. skip_tool: If True, skips tool processing. Defaults to False. Returns: ProcessData: The populated data object for the command. """ process_data = make_process_data(operation_type) for helper_fn in self.process_command_helpers: process_data = helper_fn(command, process_data) # Tools will process tool specific commands such as Extrude, Mill, etc if self.tools is not None and not skip_tool: for tool in self.tools or (): process_data = tool.process_command(command, process_data) self._history.append(command) return process_data
[docs] @staticmethod @functools.lru_cache(maxsize=512) def abc_to_tool_direction(A: float, B: float, C: float) -> tuple: """Convert 5-axis A/B/C Euler angles (degrees) to a unit tool direction vector. Rotations about X (A), Y (B), Z (C) are applied in that order from Z-up. Reference Image: https://www.mastercam.com/wp-content/uploads/2021/09/axes.png Args: A: Rotation about X-axis in degrees. B: Rotation about Y-axis in degrees. C: Rotation about Z-axis in degrees. Returns: Normalised (x, y, z) tuple representing the tool direction. """ import numpy as np A_rad, B_rad, C_rad = radians(A), radians(B), radians(C) cos_a, sin_a = cos(A_rad), sin(A_rad) cos_b, sin_b = cos(B_rad), sin(B_rad) cos_c, sin_c = cos(C_rad), sin(C_rad) Rx = np.array([[1, 0, 0], [0, cos_a, -sin_a], [0, sin_a, cos_a]]) Ry = np.array([[cos_b, 0, sin_b], [0, 1, 0], [-sin_b, 0, cos_b]]) Rz = np.array([[cos_c, -sin_c, 0], [sin_c, cos_c, 0], [0, 0, 1]]) v = Rx @ Ry @ Rz @ np.array([0.0, 0.0, 1.0]) norm = np.linalg.norm(v) return tuple(v / norm) if norm > 1e-10 else (0.0, 0.0, 1.0)
def _tool_rotate(self, command: commands.Command, process_data: ProcessData): """Updates tool direction from A/B/C rotation angles encoded in a command.""" settings = getattr(command, "settings", None) rotation = settings.get("rotation") if settings else None if rotation is not None and len(rotation) == 3: A = float(rotation[0]) if rotation[0] is not None else 0.0 B = float(rotation[1]) if rotation[1] is not None else 0.0 C = float(rotation[2]) if rotation[2] is not None else 0.0 self._tool_direction = self.abc_to_tool_direction(A, B, C) process_data.tool_direction = self._tool_direction return process_data def _config(self, command: commands.Config, process_data: ProcessData): """Processes Machine state configuration commands""" # NOTE: Only universally applicable configuration commands should be handled within the base Machine _config function if isinstance(command, commands.FeedRate) and command.code is not None: process_data.feed_rate = float(command.code) elif isinstance(command, commands.AbsolutePosition): self.absolute_position = True elif isinstance(command, commands.IncrementalPosition): self.absolute_position = False elif isinstance(command, commands.WorkCoordinates): self.coordinate_system = "work" elif isinstance(command, commands.MachineCoordinates): self.coordinate_system = "machine" return process_data def _dwell(self, command: Dwell, process_data: ProcessData): """Processes a Dwell command, setting its elapsed time.""" if isinstance(command, Dwell): process_data.elapsed_time = command.time_s return process_data def _move(self, command: commands.Move, process_data: ProcessData): """Processes a Move command to calculate new position and distance.""" # We always sync current machine location and feed rate here if not isinstance(command, commands.Move): process_data.location = self._last_location process_data.feed_rate = self._last_feed_rate return process_data # Determine which coordinate system we are using: is_work_coords = self.coordinate_system == "work" # Location last_location = self._last_location current_location = tuple( ( ( command.location[i] + ( self.work_offset[i] if is_work_coords else 0.0 ) # Only apply the work offset if we are in "work" coordinate mode if self.absolute_position else last_location[i] + command.location[i] ) # only if the G-code provided a coordinate for this axis, otherwise don't update axis if i < len(command.location) and command.location[i] is not None else last_location[i] ) for i in range(len(last_location)) ) process_data.location = current_location # Distance calculation if current_location != last_location: delta = tuple([a - b for a, b in zip(current_location, last_location)]) process_data.relative_movement = delta sum_squares = sum(dx**2.0 for dx in delta) process_data.distance = sqrt(sum_squares) else: process_data.relative_movement = (0, 0, 0) process_data.distance = 0 if command.feed_rate is not None: process_data.feed_rate = command.feed_rate else: process_data.feed_rate = self._last_feed_rate # Update state self._last_location = process_data.location self._last_feed_rate = process_data.feed_rate return process_data
[docs] def get_motion_limits(self): """Retrieves and prepares motion limits from the machine's options. Returns: dict: A dictionary containing 'velocity', 'acceleration', and 'jerk' limits, with velocity converted to units/sec. """ velocity_limit = None if self.options.max_feedrate is not None: velocity_limit = self.options.max_feedrate / self.feed_rate_time_base_s acceleration_limit = self.options.max_acceleration jerk_limit = self.options.max_jerk logging.debug( f"Machine motion limits ({velocity_limit, acceleration_limit, jerk_limit})" ) if velocity_limit is not None: self._motion_profile.max_velocity = velocity_limit if acceleration_limit is not None: self._motion_profile.max_acceleration = acceleration_limit if jerk_limit is not None: self._motion_profile.max_jerk = jerk_limit return dict( velocity=velocity_limit, acceleration=acceleration_limit, jerk=jerk_limit )
[docs] def apply_motion_profile_to_operation(self, operation: Operation): limits = self.get_motion_limits() # fetch limits before running the profile return self._motion_profile.from_operation(operation, self.home)
[docs] def translate_command(self, command: commands.Command, old_meta: ProcessData): """Translates a command for a different machine context. This helper function is intended to be called when transferring an operation processed by one machine to another. It allows for machine- specific adjustments to the command itself. Args: command (Command): The command to translate. old_meta (ProcessData): The process data of the command as calculated by the previous machine. Returns: Command: The translated command. """ return command