import functools
import logging
import time
from math import sqrt, radians, cos, sin
from typing import List, Tuple
import collections.abc
import pandas as pd
import numpy as np
from ..constants import ZERO_TOLERANCE
from .. import analysis
from . import machine_options
from . import motion
from .tools import Tool
from .operations import (
Operation,
AdditiveOperation,
SubtractiveOperation,
AdditiveProcessData,
SubtractiveProcessData,
ProcessData,
make_process_data,
_process_data_map,
)
from .parsers import GcodeParser
from .exporters import GcodeExporter
from . import commands
from .commands import (
Command,
Config,
Move,
FeedRate,
Dwell,
)
from .registry import register
[docs]
@register("default")
class Machine:
"""An emulation environment for a generic CNC machine.
This class acts as an orchestrator for processing sequences of machine
commands (Operations). It maintains the machine's state, such as position
and feed rate, and applies kinematic models (motion profiles) to each movement.
It is designed to be configurable with different tools, parsers, exporters,
and machine-specific options.
Attributes:
options (machine_options.MachineOptions): The configuration object
containing parameters like max feedrate and acceleration.
tools (Tuple[Tool, ...]): A tuple of tool objects attached to the
machine.
parser (GcodeParser): The parser instance used to convert G-code
files into Command objects.
exporter (GcodeExporter): The exporter instance used to convert
Command objects back into G-code strings or other formats.
home (Tuple[float, float, float]): The machine's home coordinates (X, Y, Z).
motion_profile_type (type): The motion profile class (e.g., Trapezoid)
used to calculate movement kinematics.
"""
def __init__(
self,
tools: Tuple = None,
parser: GcodeParser = GcodeParser(),
exporter: GcodeExporter = GcodeExporter(),
**options,
):
"""Initializes a new Machine instance.
Args:
tools (Tuple[Tool, ...], optional): A tuple of Tool instances for
the machine to use. Defaults to None.
parser (GcodeParser, optional): An instance of a GcodeParser.
Defaults to a new GcodeParser().
exporter (GcodeExporter, optional): An instance of a GcodeExporter.
Defaults to a new GcodeExporter().
**options: Keyword arguments passed to the constructor of the
machine's `MachineOptions` type.
"""
self._options_type: type = machine_options.MachineOptions
self._options: machine_options.MachineOptions = None
self._tools: List[Tool] = None
self._parser: GcodeParser = None
self._exporter: GcodeExporter = None
self._home: Tuple[float, float, float] = (0, 0, 0)
self._motion_profile: motion.MotionProfile = motion.MotionProfile()
# use the setter functions here to enforce types
# 'options' must be last to correctly propagate settings accross machine components.
self.parser = parser
self.exporter = exporter
self.tools = tools
self.options = options
# State management
self._history: List[ProcessData] = []
self._last_location: Tuple[float, float, float] = self._home
self._last_feed_rate: float = 0.0
self._absolute_position = True
self._tool_direction: Tuple[float, float, float] = (0, 0, 1)
# Command process helpers. The order determines order of operations
self.process_command_helpers = (
self._config,
self._move,
self._dwell,
self._tool_rotate,
)
# Coordinate system and positioning
self._coordinate_system = "machine"
self._work_offset = (0, 0, 0)
self.feed_rate_time_base_s = 1.0 # multiplier to convert time unit to seconds (for unit/min conversion mainly)
self.unit_per_meter = 1.0 # multiplier to convert length unit to meters
[docs]
def clear_history(self):
"""Clears this Machine's command history."""
self._history = []
self._last_location = self.home
self._last_feed_rate = 0.0
self._tool_direction = (0, 0, 1)
for tool in self.tools or ():
tool.reset_history()
# ##################################################################
# # Properties # #
# ##################################################################
@property
def options(self):
return self._options
@options.setter
def options(self, values):
if isinstance(values, dict):
self._options = self._options_type(**values)
elif isinstance(values, self._options_type):
self._options = values
else:
raise TypeError(
f"{type(self).__name__} expects options of type {self._options_type}. Received {type(values).__name__}"
)
for tool in self.tools or ():
tool.options = self._options
self.parser.syntax = self.options.syntax
self.exporter.syntax = self.options.syntax
@property
def tools(self):
return self._tools
@tools.setter
def tools(self, value):
if isinstance(value, collections.abc.Iterable):
self._tools = tuple(value)
elif isinstance(value, Tool):
self._tools = (value,)
elif value is None:
self._tools = ()
else:
raise TypeError(
f"Machine.tools.setter: Unknown type receivied {type(value).__name__} for tools."
)
for tool in self._tools:
tool.options = self.options
@property
def parser(self):
return self._parser
@parser.setter
def parser(self, value):
if not isinstance(value, GcodeParser):
raise TypeError(
f"Machine.parser.setter: Unknown type receivied {type(value).__name__} for parser."
)
self._parser = value
# NOTE: DO not sync the parser syntax with self.options.syntax here. This allows someone to override the default parser
@property
def exporter(self):
return self._exporter
@exporter.setter
def exporter(self, value):
if not isinstance(value, GcodeExporter):
raise TypeError(
f"Machine.exporter.setter: Unknown type receivied {type(value).__name__} for exporter. Ignoring"
)
self._exporter = value
# NOTE: DO not sync the exporter syntax with self.options.syntax here. This allows someone to override the default exporter
@property
def home(self):
return self._home
@home.setter
def home(self, location):
self._home = tuple(location)
@property
def motion_profile(self):
return self._motion_profile
@motion_profile.setter
def motion_profile(self, profile: type):
if profile is None or not isinstance(profile, motion.MotionProfile):
raise TypeError(
f"Machine.motion_profile.setter: Unkown motion profile type. Received ({profile}). Expected instance of gcode_reader.emulate.motion.MotionProfile"
)
self._motion_profile = profile
@property
def coordinate_system(self):
return self._coordinate_system
@coordinate_system.setter
def coordinate_system(self, val: str):
if not isinstance(val, str):
raise TypeError(f"{type(self).__name__}:coordinate_system must be type str")
clean_val = val.lower().strip()
if not clean_val in ["machine", "work"]:
raise ValueError(
f'{type(self).__name__}:coordinate_system must be one of ("machine", "gcode")'
)
self._coordinate_system = clean_val
@property
def work_offset(self):
return self._work_offset
@work_offset.setter
def work_offset(self, val: tuple):
if not isinstance(val, (tuple, list, np.ndarray)):
raise TypeError(
f"{type(self).__name__}:work_offset must be type tuple or list"
)
if not len(val) == 3:
raise IndexError(
f"{type(self).__name__}:work_offset must have length 3. I.e. (10,0,0)"
)
self._work_offset = tuple([float(v) for v in val])
@property
def absolute_position(self):
return self._absolute_position
@absolute_position.setter
def absolute_position(self, val):
if not isinstance(val, bool):
raise TypeError(
f"{type(self).__name__}:absolute_position must be type bool"
)
self._absolute_position = val
@property
def incremental_position(self):
return not self.absolute_position
@incremental_position.setter
def incremental_position(self, val):
if not isinstance(val, bool):
raise TypeError(
f"{type(self).__name__}:incremental_position must be type bool"
)
self._absolute_position = not val
# #################################################################
# # Parsers & Exporters # #
# ##################################################################
[docs]
def gcode_file_to_operation(self, filepath: str, operation_type: type = Operation):
"""Parses a G-code file and returns a processed Operation.
This method uses the machine's configured parser to read a G-code
file into a list of commands, wraps them in the specified
`operation_type`, and then processes the operation to populate
its process data.
Args:
filepath (str): The path to the G-code file.
operation_type (type, optional): The class of Operation to create
(e.g., `Operation`, `AdditiveOperation`). Defaults to `Operation`.
Returns:
Operation: A new, processed Operation instance.
Raises:
ValueError: If `operation_type` is not a valid subclass of `Operation`.
"""
if not issubclass(operation_type, Operation):
raise ValueError(
f"operation_type must be one of (Operation, AdditiveOperation, SubtractiveOperation). Recieved {operation_type}"
)
commands = self.parser.gcode_file_to_commands(filepath)
operation = operation_type(name=filepath, commands=commands)
return self.process_operation(operation)
[docs]
def operation_to_dataframe(self, operation: Operation) -> pd.DataFrame:
"""Processes an Operation and converts it into a Pandas DataFrame.
This method ensures an operation is fully processed, then converts each
command and its corresponding process data into a pandas Series.
These are combined into a single DataFrame for analysis.
Args:
operation (Operation): The operation to be converted.
Returns:
pd.DataFrame: A DataFrame where each row represents a command and
its associated process data.
"""
# NOTE: Keeping this in the machine emulator in case we need machine specific
# implementations or data cleaning
operation = self.process_operation(operation)
command_data = [
self.exporter.command_to_dict(command) for command in operation.commands
]
df = pd.DataFrame(command_data)
# Receive dictionary of process data arrays. This will convert readily to DataFrame columns
process_data_dict = operation.process_data_to_numpy_dict()
for key, val in process_data_dict.items():
# We want to split out tuple/list data to its components
if len(val.shape) > 1 and val.shape[1] > 1:
for j in range(val.shape[1]):
df[f"{key}_{j}"] = val[:, j]
else:
df[key] = val
return df
[docs]
def write_gcode_file(
self, filepath: str, operation: Operation, overwrite: bool = False
):
"""Processes an Operation and writes the resulting G-code to a file.
This method processes the sequence of commands in the operation and uses
the exporter to generate a formatted G-code string for each one. The
resulting lines are then written to the specified file.
WARNING: ALWAYS REVIEW GENERATED G-CODE BEFORE FOR CORRECTNESS AND SAFETY BEFORE EXECUTING ON A MACHINE.
Args:
filepath (str): The path to the output file.
operation (Operation): The operation containing the commands to write.
overwrite (bool, optional): If True, an existing file at the path will
be overwritten. If False (default), the operation will fail if
the file already exists.
"""
# NOTE: Restrict G-code file writing to operations that were processed by this machine only.
# this does not completely eliminate risk of someone processing an operation with another
# machine and then writing a G-code file to this machine
if operation.processed_by_machine != type(self):
raise TypeError(
f"Mismatched machine types. This writer belongs to a "
f"'{type(self).__name__}' machine, but the operation was processed "
f"by a '{operation.processed_by_machine.__name__}' machine. "
)
mode = "x"
if overwrite:
mode = "w"
self.process_operation(operation)
gcode_lines = [
self.exporter.command_to_gcode_line(command)
for command in operation.commands
]
try:
with open(filepath, mode) as file:
file.writelines(gcode_lines)
logging.warning(
f"G-code file written to: {filepath}.\n\nIMPORTANT: ALWAYS REVIEW GENERATED G-CODE BEFORE FOR CORRECTNESS AND SAFETY BEFORE EXECUTING ON A MACHINE."
)
except FileExistsError as exc:
logging.error(
f"Machine.write_gcode_file. File ({filepath}) already exists. Cannot write with mode={mode}. Exception: {exc}"
)
raise exc
except Exception as exc:
logging.error(f"Machine.write_gcode_file. Unknown Exception: {exc}")
raise exc
[docs]
def export_operation_event_series(
self,
operation: Operation,
spatial_scale_factor: float = 1e-3,
destination: str = None,
):
"""Write a process event series for an Operation
Args:
operation (Operation): The Operation object to be processed. It should
have been created or processed by this same machine type.
spatial_scale_factor (float, optional): A multiplicative factor to
convert spatial units from the Operation (typically millimeters)
to the desired output units (typically meters). Defaults to 1e-3.
destination (str | None, optional): The full path for the output file.
If `None`, a path is automatically generated from the operation's
name, e.g., 'MyPart_event_series.aes'. Defaults to None.
Returns:
pandas.DataFrame: A DataFrame containing the event series.
"""
t0 = time.perf_counter()
logging.debug(f"Machine.export_operation_event_series")
operation = self.process_operation(operation)
event_series = analysis.operation_to_event_series(
operation=operation,
spatial_scale_factor=spatial_scale_factor,
)
if destination is None:
name = "operation"
if operation.name is not None:
import os.path
name, _ = os.path.splitext(operation.name)
destination = f"{name}_event_series.aes"
logging.debug(f"\tWriting CSV File to {destination}")
event_series.to_csv(destination, header=False, index=False)
logging.debug(
f"...Machine.export_operation_event_series Completed. Total Elapsed time: {time.perf_counter() - t0:0.3f} (s)"
)
return event_series
# ##################################################################
# # Command and Operation Processing # #
# ##################################################################
[docs]
def process_operation(
self, operation: Operation, force=False, skip_tool=False
) -> Operation:
"""Fully processes an Operation, calculating all derived data.
This is the main entry point for simulating an operation. It first
calculates basic process data (location, distance, etc.) for each
command. It then applies the machine-wide motion profile to calculate
kinematic data like elapsed time.
It avoids re-processing an operation that has already been processed
by this same machine instance unless `force` is True.
Args:
operation: The operation to process.
force: If True, re-processes the operation even if it has been
processed by this machine before. Defaults to False.
skip_tool: If True, skips tool processing during the command
processing phase. Useful for two-pass processing where tools
depend on motion profile data (e.g., elapsed_time) that isn't
available until after the first pass. Defaults to False.
Returns:
Operation: A new, fully processed Operation instance.
"""
this_machine = type(self)
previous_machine = operation.processed_by_machine
operation_type = type(operation)
if not force and previous_machine == this_machine:
logging.debug(
f"Operation {operation.name} already processed by {this_machine}. Returning unmodified."
)
return operation
# NOTE: Currently not allowing conversion between machines! (unless forced)
elif previous_machine is not None and previous_machine != type(self):
if not force:
raise TypeError(
f"Cannot re-process operation {operation.name} with a different machine. This operation was "
f"processed by '{previous_machine.__name__}' and cannot be re-processed by '{this_machine.__name__}'."
)
elif force:
logging.warning(
f"Force re-processing enabled for operation '{operation.name}'."
f"Operation was previously processed by '{previous_machine.__name__}' and is now being processed by '{this_machine.__name__}'."
f"IMPORTANT: ALWAYS REVIEW OUTPUT FOR CORRECTNESS AND SAFETY BEFORE APPLYING TO A REAL-WORLD MACHINE."
)
logging.debug(
f"{operation_type.__name__} {operation.name} being processed by {this_machine}."
)
process_data = self._process_commands_batch(
operation.commands, operation_type, skip_tool
)
operation = operation_type(
name=operation.name,
commands=operation.commands,
process_data=process_data,
processed_by_machine=this_machine,
)
# The motion profile needs the full set of process data since it uses the previous and next movement to determine time estimates
operation = self.apply_motion_profile_to_operation(operation)
return operation
def _process_commands_batch(self, cmd_list, operation_type, skip_tool=False):
"""Process all commands in one pass with inlined move math.
Replaces the per-command process_command loop for the base Machine class.
Eliminates ~5N Python function-call round-trips (_config, _move, _dwell,
_tool_rotate, make_process_data) by inlining Move math and reusing pre-
allocated ProcessData objects. Non-move commands still call self._config()
so machine subclass overrides remain active. Tool processing is interleaved
in command order so tool-state changes (e.g. extruder on/off) apply to
exactly the right moves.
"""
N = len(cmd_list)
if N == 0:
return []
# Subclass helpers beyond the 4 base ones (e.g. MasterPrint._handle_ca_virtual_axis).
# Called per-command so overrides like CA detection remain active.
_base_helpers = {self._config, self._move, self._dwell, self._tool_rotate}
extra_helpers = [
h for h in self.process_command_helpers if h not in _base_helpers
]
loc_dim = len(self._last_location)
work_offset = self._work_offset # tuple — re-read after each _config call
# Local state copies — avoid repeated property access in the hot loop
is_absolute = self._absolute_position
is_work_coords = self._coordinate_system == "work"
last_location = list(self._last_location) # mutable for in-place update
last_feed_rate = self._last_feed_rate
tool_direction = list(self._tool_direction)
do_tools = bool(self.tools) and not skip_tool
process_data_cls = _process_data_map.get(operation_type, ProcessData)
process_data_list = [process_data_cls() for _ in range(N)]
for i, cmd in enumerate(cmd_list):
process_data_entry = process_data_list[i]
if isinstance(cmd, Move):
# ── Hot path: inline _move logic ──────────────────────────────
raw_loc = cmd.location
n_axes = len(raw_loc)
prev0, prev1, prev2 = (
last_location[0],
last_location[1],
last_location[2],
)
for j in range(loc_dim):
v = raw_loc[j] if j < n_axes else None
if v is not None:
last_location[j] = (
v + (work_offset[j] if is_work_coords else 0.0)
if is_absolute
else last_location[j] + v
)
dx = last_location[0] - prev0
dy = last_location[1] - prev1
dz = last_location[2] - prev2 if loc_dim > 2 else 0.0
dist = sqrt(dx * dx + dy * dy + dz * dz)
process_data_entry.location = (
last_location[0],
last_location[1],
last_location[2],
)
process_data_entry.relative_movement = (dx, dy, dz)
process_data_entry.distance = dist
cmd_feed_rate = cmd.feed_rate
if cmd_feed_rate is not None:
last_feed_rate = cmd_feed_rate
process_data_entry.feed_rate = last_feed_rate
else:
# ── Non-move: propagate state, call _config ────────────────────
process_data_entry.location = (
last_location[0],
last_location[1],
last_location[2],
)
process_data_entry.feed_rate = last_feed_rate
process_data_entry.distance = 0.0
process_data_entry.relative_movement = (0.0, 0.0, 0.0)
self._config(cmd, process_data_entry)
# Re-sync after _config (subclass may have changed machine state)
is_absolute = self._absolute_position
is_work_coords = self._coordinate_system == "work"
work_offset = self._work_offset
if isinstance(cmd, Dwell):
process_data_entry.elapsed_time = cmd.time_s
# ── Inline _tool_rotate ────────────────────────────────────────────
settings = getattr(cmd, "settings", None)
if settings:
rotation = settings.get("rotation")
if rotation is not None and len(rotation) == 3:
A = float(rotation[0]) if rotation[0] is not None else 0.0
B = float(rotation[1]) if rotation[1] is not None else 0.0
C = float(rotation[2]) if rotation[2] is not None else 0.0
tool_direction = list(self.abc_to_tool_direction(A, B, C))
process_data_entry.tool_direction = (
tool_direction[0],
tool_direction[1],
tool_direction[2],
)
# ── Subclass extra helpers (e.g. MasterPrint._handle_ca_virtual_axis) ──
for helper in extra_helpers:
process_data_entry = helper(cmd, process_data_entry)
# ── Tool processing (in command order, so tool state is correct) ───
if do_tools:
for tool in self.tools:
process_data_entry = tool.process_command(cmd, process_data_entry)
process_data_list[i] = process_data_entry
self._history.append(cmd)
# Update machine state
self._absolute_position = is_absolute
self._last_location = tuple(last_location)
self._last_feed_rate = last_feed_rate
self._tool_direction = tuple(tool_direction)
return process_data_list
[docs]
def process_command(
self, command: commands.Command, operation_type=Operation, skip_tool=False
) -> ProcessData:
"""Processes a single command to generate its ProcessData.
This method dispatches a command to the appropriate internal helper
(_move, _dwell, etc.) and optionally to any attached tools to calculate
derived data like new position, distance traveled, and tool-specific
metrics.
Args:
command: The command to process.
operation_type: The parent operation's type, used to determine
the type of ProcessData to create. Defaults to Operation.
skip_tool: If True, skips tool processing. Defaults to False.
Returns:
ProcessData: The populated data object for the command.
"""
process_data = make_process_data(operation_type)
for helper_fn in self.process_command_helpers:
process_data = helper_fn(command, process_data)
# Tools will process tool specific commands such as Extrude, Mill, etc
if self.tools is not None and not skip_tool:
for tool in self.tools or ():
process_data = tool.process_command(command, process_data)
self._history.append(command)
return process_data
def _tool_rotate(self, command: commands.Command, process_data: ProcessData):
"""Updates tool direction from A/B/C rotation angles encoded in a command."""
settings = getattr(command, "settings", None)
rotation = settings.get("rotation") if settings else None
if rotation is not None and len(rotation) == 3:
A = float(rotation[0]) if rotation[0] is not None else 0.0
B = float(rotation[1]) if rotation[1] is not None else 0.0
C = float(rotation[2]) if rotation[2] is not None else 0.0
self._tool_direction = self.abc_to_tool_direction(A, B, C)
process_data.tool_direction = self._tool_direction
return process_data
def _config(self, command: commands.Config, process_data: ProcessData):
"""Processes Machine state configuration commands"""
# NOTE: Only universally applicable configuration commands should be handled within the base Machine _config function
if isinstance(command, commands.FeedRate) and command.code is not None:
process_data.feed_rate = float(command.code)
elif isinstance(command, commands.AbsolutePosition):
self.absolute_position = True
elif isinstance(command, commands.IncrementalPosition):
self.absolute_position = False
elif isinstance(command, commands.WorkCoordinates):
self.coordinate_system = "work"
elif isinstance(command, commands.MachineCoordinates):
self.coordinate_system = "machine"
return process_data
def _dwell(self, command: Dwell, process_data: ProcessData):
"""Processes a Dwell command, setting its elapsed time."""
if isinstance(command, Dwell):
process_data.elapsed_time = command.time_s
return process_data
def _move(self, command: commands.Move, process_data: ProcessData):
"""Processes a Move command to calculate new position and distance."""
# We always sync current machine location and feed rate here
if not isinstance(command, commands.Move):
process_data.location = self._last_location
process_data.feed_rate = self._last_feed_rate
return process_data
# Determine which coordinate system we are using:
is_work_coords = self.coordinate_system == "work"
# Location
last_location = self._last_location
current_location = tuple(
(
(
command.location[i]
+ (
self.work_offset[i] if is_work_coords else 0.0
) # Only apply the work offset if we are in "work" coordinate mode
if self.absolute_position
else last_location[i] + command.location[i]
)
# only if the G-code provided a coordinate for this axis, otherwise don't update axis
if i < len(command.location) and command.location[i] is not None
else last_location[i]
)
for i in range(len(last_location))
)
process_data.location = current_location
# Distance calculation
if current_location != last_location:
delta = tuple([a - b for a, b in zip(current_location, last_location)])
process_data.relative_movement = delta
sum_squares = sum(dx**2.0 for dx in delta)
process_data.distance = sqrt(sum_squares)
else:
process_data.relative_movement = (0, 0, 0)
process_data.distance = 0
if command.feed_rate is not None:
process_data.feed_rate = command.feed_rate
else:
process_data.feed_rate = self._last_feed_rate
# Update state
self._last_location = process_data.location
self._last_feed_rate = process_data.feed_rate
return process_data
[docs]
def get_motion_limits(self):
"""Retrieves and prepares motion limits from the machine's options.
Returns:
dict: A dictionary containing 'velocity', 'acceleration', and 'jerk'
limits, with velocity converted to units/sec.
"""
velocity_limit = None
if self.options.max_feedrate is not None:
velocity_limit = self.options.max_feedrate / self.feed_rate_time_base_s
acceleration_limit = self.options.max_acceleration
jerk_limit = self.options.max_jerk
logging.debug(
f"Machine motion limits ({velocity_limit, acceleration_limit, jerk_limit})"
)
if velocity_limit is not None:
self._motion_profile.max_velocity = velocity_limit
if acceleration_limit is not None:
self._motion_profile.max_acceleration = acceleration_limit
if jerk_limit is not None:
self._motion_profile.max_jerk = jerk_limit
return dict(
velocity=velocity_limit, acceleration=acceleration_limit, jerk=jerk_limit
)
[docs]
def apply_motion_profile_to_operation(self, operation: Operation):
limits = self.get_motion_limits() # fetch limits before running the profile
return self._motion_profile.from_operation(operation, self.home)
[docs]
def translate_command(self, command: commands.Command, old_meta: ProcessData):
"""Translates a command for a different machine context.
This helper function is intended to be called when transferring an
operation processed by one machine to another. It allows for machine-
specific adjustments to the command itself.
Args:
command (Command): The command to translate.
old_meta (ProcessData): The process data of the command as
calculated by the previous machine.
Returns:
Command: The translated command.
"""
return command