import logging
from gcode_reader.emulate.motion import MotionProfile
from ..machine import Machine
from ..extruders import ScrewExtruder, ScrewModel
from ..operations import AdditiveProcessData, Operation, AdditiveOperation, ProcessData
from ..commands import Config, Command, G, M, Dwell, Move, Extrude
from ..machine_options import IngersollOptions
from ..parsers import GcodeParser
from ..registry import register
from ..tag_literals import LINEAR_MOVE, MOVE_EXTRUDE
from ...constants import ZERO_TOLERANCE
class MasterPrintMotionProfile(MotionProfile):
    """Kinematic limits and rapid-move defaults for the MasterPrint machine.

    Attributes:
        traverse_feed_rate (float): Feed rate used for rapid (G0) moves.
    """

    def __init__(
        self,
        max_jerk=None,
        max_accleration=None,
        max_velocity=None,
        traverse_feed_rate=10000,
        angle_factor=0.5,
    ):
        """Create the profile and remember the traverse feed rate.

        Args:
            max_jerk (float, optional): Jerk limit handed to the base
                ``MotionProfile``. Defaults to None.
            max_accleration (float, optional): Acceleration limit handed to
                the base ``MotionProfile``. The spelling is part of the
                public signature and is kept as-is. Defaults to None.
            max_velocity (float, optional): Velocity limit handed to the base
                ``MotionProfile``. Defaults to None.
            traverse_feed_rate (float, optional): Feed rate for G0 moves in
                mm/min. Defaults to 10000.
            angle_factor (float, optional): Cornering factor handed to the
                base ``MotionProfile``. Defaults to 0.5.
        """
        # The base class receives its limits positionally, in this order.
        super().__init__(max_jerk, max_accleration, max_velocity, angle_factor)

        # Only this value is stored by the subclass itself.
        self.traverse_feed_rate = traverse_feed_rate
class MasterPrintScrewModel(ScrewModel):
    """Calibration model of the MasterPrint screw extruder.

    Converts between material throughput (kg/hr) and screw RPM with two cubic
    polynomials of the form ``a*x^3 + b*x^2 + c*x + d``.

    Attributes:
        rpm_coefficients (tuple): ``(a, b, c, d)`` giving RPM as a function of
            throughput ``x`` in kg/hr.
        throughput_coefficients (tuple): ``(a, b, c, d)`` giving throughput in
            kg/hr as a function of RPM ``x``.
    """

    def __init__(self, nozzle_diameter=0, displacement=1):
        """Initialize the screw model with linear (identity) calibration.

        Args:
            nozzle_diameter (float, optional): Diameter of the nozzle in mm.
                Defaults to 0.
            displacement (float, optional): Theoretical displacement per
                revolution. Defaults to 1.
        """
        super().__init__(nozzle_diameter, displacement)
        self._rpm_coeff = None
        self._throughput_coeff = None
        # NOTE: These are a linear default where RPM = kg/hr.
        self.rpm_coefficients = (0, 0, 1, 0)
        # NOTE: These are a linear default where kg/hr = RPM.
        self.throughput_coefficients = (0, 0, 1, 0)

    @staticmethod
    def _validated_coefficients(val, label):
        """Return ``val`` as a tuple after checking it is a 4-item list/tuple."""
        if not isinstance(val, (tuple, list)) or len(val) != 4:
            raise TypeError(f"{label} coefficients must be Iterable of length 4.")
        return tuple(val)

    @staticmethod
    def _evaluate_cubic(coefficients, x):
        """Evaluate ``a*x^3 + b*x^2 + c*x + d`` for ``coefficients = (a, b, c, d)``."""
        a, b, c, d = coefficients
        return a * x**3 + b * x**2 + c * x + d

    @staticmethod
    def _elapsed_hours(process_data):
        """Return ``process_data.elapsed_time`` (seconds) in hours; None counts as 0."""
        elapsed = process_data.elapsed_time
        return (elapsed if elapsed is not None else 0.0) / 3600.0

    @property
    def rpm_coefficients(self):
        """Get the coefficients for the Throughput -> RPM polynomial."""
        return self._rpm_coeff

    @rpm_coefficients.setter
    def rpm_coefficients(self, val):
        """Set the coefficients for the Throughput -> RPM polynomial.

        Args:
            val (list | tuple): Exactly 4 coefficients ``(a, b, c, d)``.

        Raises:
            TypeError: If val is not a list or tuple of length 4.
        """
        self._rpm_coeff = self._validated_coefficients(val, "rpm")

    @property
    def throughput_coefficients(self):
        """Get the coefficients for the RPM -> Throughput polynomial."""
        return self._throughput_coeff

    @throughput_coefficients.setter
    def throughput_coefficients(self, val):
        """Set the coefficients for the RPM -> Throughput polynomial.

        Args:
            val (list | tuple): Exactly 4 coefficients ``(a, b, c, d)``.

        Raises:
            TypeError: If val is not a list or tuple of length 4.
        """
        self._throughput_coeff = self._validated_coefficients(val, "throughput")

    def calc_extrusion_rate(
        self,
        extruder: ScrewExtruder,
        command: Extrude,
        process_data: AdditiveProcessData,
        pellet_density=1100,
        volumetric_mode=False,
    ):
        """Calculate the RPM and deposited volume for a command.

        In volumetric mode the flow is derived from the extruder's cached bead
        area and the current feed rate. Otherwise, for ``Extrude`` commands,
        the flow is the commanded volume divided by the elapsed time. The flow
        is converted to throughput (kg/hr) and then to RPM through
        ``rpm_coefficients``.

        Args:
            extruder (ScrewExtruder): The extruder invoking this calculation;
                its ``last_bead_area`` is read in volumetric mode.
            command (Extrude): The current G-code command.
            process_data (AdditiveProcessData): State providing ``feed_rate``
                and ``elapsed_time``. A missing (None) value of either is
                treated as zero.
            pellet_density (float, optional): Density of material in kg/m^3.
                Defaults to 1100.
            volumetric_mode (bool, optional): If True, calculates flow based
                on bead cross-sectional area. Defaults to False.

        Returns:
            (float, float): A tuple of (RPM, deposited_volume). Note the order
            differs from ``calc_deposited_volume``.
        """
        dt_hr = self._elapsed_hours(process_data)
        feed_rate = (
            process_data.feed_rate if process_data.feed_rate is not None else 0.0
        )
        if feed_rate <= ZERO_TOLERANCE:
            logging.debug(
                f"Feed rate is zero or negative ({feed_rate} mm/min). Extrusion calculations may be inaccurate"
            )

        volume = 0
        flow = 0
        if volumetric_mode:
            # Variable bead mode: RPM follows from bead area and feed rate.
            logging.debug(
                "Extruder in Volumetric mode. Calculating flow based on bead area and feed rate."
            )
            # NOTE(review): MasterPrint._handle_ca_virtual_axis already
            # multiplies the CA word by 1000 before it is cached in
            # last_bead_area, so this second factor may be a double
            # conversion. Behaviour is kept unchanged; confirm the intended
            # units of last_bead_area.
            bead_area = (
                extruder.last_bead_area if extruder.last_bead_area is not None else 0.0
            ) * 1000.0  # Converts from mm^2/1000 back to mm^2
            flow = bead_area * feed_rate * 60  # (mm^2)(mm/min)(min/hr) -> mm^3/hr
            volume = flow * dt_hr
        elif isinstance(command, Extrude):
            # Volume specification mode: the command carries the volume and
            # process data supplies the time component.
            volume = command.deposited_volume  # mm^3
            if volume is None:
                # No volume on the command: nothing is deposited.
                volume = 0.0
            flow = volume / dt_hr if dt_hr > ZERO_TOLERANCE else 0.0  # mm^3/hr

        # Per Ingersoll docs: RPM is calibrated with throughput (kg/hr) on the
        # x-axis and RPM on the y-axis.
        # (mm^3/hr) (kg/m^3) (m^3/mm^3) -> kg/hr
        throughput = flow * pellet_density * 1e-9
        rpm = self._evaluate_cubic(self.rpm_coefficients, throughput)
        return rpm, volume

    def calc_deposited_volume(
        self,
        extruder: ScrewExtruder,
        command: Extrude,
        process_data: AdditiveProcessData,
        pellet_density=1100,
    ):
        """Calculate the deposited volume given a known RPM.

        The RPM is taken from the command's ``extrusion_rate`` when it is an
        ``Extrude`` carrying one, otherwise from the extruder's
        ``last_extrusion_rate``; a missing value counts as 0. The RPM is
        converted to throughput through ``throughput_coefficients`` and then
        integrated over the elapsed time.

        Args:
            extruder (ScrewExtruder): The extruder instance.
            command (Extrude): The G-code command (containing explicit RPM if
                present).
            process_data (AdditiveProcessData): State providing
                ``elapsed_time``; None is treated as zero.
            pellet_density (float, optional): Density of material in kg/m^3.
                Defaults to 1100.

        Returns:
            (float, float): A tuple of (deposited_volume, RPM). Note the order
            differs from ``calc_extrusion_rate``.
        """
        rpm = extruder.last_extrusion_rate
        if isinstance(command, Extrude) and command.extrusion_rate is not None:
            rpm = command.extrusion_rate
        if rpm is None:
            rpm = 0.0

        throughput = self._evaluate_cubic(self.throughput_coefficients, rpm)  # kg/hr
        flow = throughput / pellet_density * 1e9  # mm^3/hr
        volume = flow * self._elapsed_hours(process_data)  # mm^3
        return volume, rpm
class MasterPrintExtruder(ScrewExtruder):
    """Represents the screw extruder hardware on the MasterPrint.

    Tracks volumetric mode, material density and motor state, and routes
    commands to the underlying `MasterPrintScrewModel`.

    Attributes:
        volumetric_mode (bool): If True, extruder operates based on bead geometry.
        pellet_density (float): Material density in kg/m^3.
        on (bool): Current state of the extruder motor.
    """

    def __init__(
        self,
        options=None,
        model=None,
        pellet_density=1100,
        volumetric_mode=False,
    ):
        """Initialize the MasterPrint extruder.

        Args:
            options (dict, optional): Configuration options. Defaults to None.
            model (MasterPrintScrewModel, optional): The physical model of the
                screw. When omitted (None) a fresh ``MasterPrintScrewModel()``
                is created for this extruder.
            pellet_density (float, optional): Material density. Defaults to 1100.
            volumetric_mode (bool, optional): Initial state of volumetric mode.
                Defaults to False.

        Raises:
            TypeError: If the provided model is not a MasterPrintScrewModel.
        """
        # Build the default per instance. A default evaluated in the signature
        # would be one shared object, so changing calibration coefficients on
        # one extruder would silently change them on every other.
        if model is None:
            model = MasterPrintScrewModel()
        if not isinstance(model, MasterPrintScrewModel):
            raise TypeError(
                "MasterPrintExtruder requires MasterPrintScrewModel as model attribute"
            )
        super().__init__(options, model)
        self.volumetric_mode = bool(volumetric_mode)
        self.pellet_density = float(pellet_density)
        self.on = False

    def _extruder_on_from_command(self, command: Command):
        """Determine if the extruder should be on based on the command.

        An ``Extrude`` command switches the motor according to the sign of its
        ``extrusion_rate`` or, when no rate is given, its ``deposited_volume``.
        Any other command leaves the state unchanged.

        Args:
            command (Command): The G-code command to inspect.

        Returns:
            bool: True if the extruder should be on, False otherwise.
        """
        if isinstance(command, Extrude):
            if command.extrusion_rate is not None:
                return command.extrusion_rate > 0
            if command.deposited_volume is not None:
                return command.deposited_volume > 0
        # Default to current state if no explicit extrusion parameters are found.
        return self.on

    def process_command(self, command: Command, process_data: AdditiveProcessData):
        """Process a G-code command and calculate extrusion parameters.

        Updates the motor state from the command, caches any bead area found
        on ``process_data``, and then computes RPM and deposited volume:

        - motor off: both are 0;
        - volumetric mode, or an ``Extrude`` giving a volume but no rate: RPM
          is derived via ``model.calc_extrusion_rate``;
        - otherwise: volume is derived from RPM via
          ``model.calc_deposited_volume``.

        Args:
            command: The G-code command to process.
            process_data: Current process state containing bead geometry,
                feed rate, and other parameters.

        Returns:
            AdditiveProcessData: The value returned by
            ``self._set_process_data(process_data)`` after the extruder's
            last rate and volume have been updated.
        """
        volume_specified = self.volumetric_mode
        self.on = self._extruder_on_from_command(command)

        if (
            isinstance(command, Extrude)
            and command.extrusion_rate is None
            and command.deposited_volume is not None
        ):
            volume_specified = True

        # Remember the most recent bead area so later commands (e.g. dwells)
        # that carry none can still be evaluated in volumetric mode.
        if process_data.bead_area is not None:
            self.last_bead_area = process_data.bead_area

        if not self.on:
            logging.debug(
                f"Extruder is off. No extrusion will occur for command: {command.to_dict()}"
            )
            rpm, volume = 0, 0
        elif volume_specified:
            if isinstance(command, Dwell) and self.last_bead_area is None:
                logging.warning(
                    "MasterPrintExtruder. Volumetric mode dwell encountered without prior bead area. RPM calculation may be inaccurate. Check AdditiveProcessData outputs"
                )
            rpm, volume = self.model.calc_extrusion_rate(
                self,
                command,
                process_data,
                pellet_density=self.pellet_density,
                volumetric_mode=self.volumetric_mode,
            )
        else:
            # Note the swapped order of the returned pair for this call.
            volume, rpm = self.model.calc_deposited_volume(
                self, command, process_data, self.pellet_density
            )

        self.last_extrusion_rate = rpm
        self.last_deposited_volume = volume
        process_data = self._set_process_data(process_data)
        return process_data
@register("ingersoll", aliases=["masterprint", "ingersollmasterprint"])
class MasterPrint(Machine):
    """Represents the Ingersoll MasterPrint Manufacturing system.

    Coordinates the machine's motion profile, extruder tool and G-code
    parsing. Handles Ingersoll-specific G-codes for units and the 'CA'
    virtual axis (bead cross-sectional area).
    """

    def __init__(self, **options):
        """Create the machine with one MasterPrintExtruder and the Ingersoll parser.

        Args:
            **options: Machine options, assigned to ``self.options`` after
                ``_options_type`` has been set to ``IngersollOptions``.
        """
        super().__init__(
            # NOTE(review): the parentheses do not make a tuple; a single
            # extruder instance is passed. Presumably the base class accepts
            # that, since ``self.tools`` is indexed and iterated below.
            tools=(MasterPrintExtruder()),
            parser=GcodeParser(syntax_name="ingersoll"),
        )
        self._options_type = IngersollOptions
        self.options = options
        self.motion_profile = MasterPrintMotionProfile()
        # NOTE(review): if assigning ``options`` can set volumetric_mode, this
        # reset would discard the machine-level value - confirm.
        self._volumetric_mode = None
        # Helpers applied to each command, in this order.
        self.process_command_helpers = (
            self._config,
            self._move,
            self._handle_ca_virtual_axis,
            self._apply_traverse_speed,
            self._dwell,
        )

    # NOTE: using a property ensures that the tool variable bead setting is
    # coupled to the machine setting
    @property
    def volumetric_mode(self):
        """Get the machine-wide volumetric mode state (True, False or None)."""
        return self._volumetric_mode

    @volumetric_mode.setter
    def volumetric_mode(self, val):
        """Set the machine-wide volumetric mode.

        A non-None value is also propagated to the first tool.

        Args:
            val (bool | None): True/False to set the mode explicitly, or None
                to leave it undecided.

        Raises:
            TypeError: If val is neither a bool nor None.
        """
        if val is not None and not isinstance(val, bool):
            raise TypeError(
                f"Expected type bool or None. Received {type(val).__name__}"
            )
        self._volumetric_mode = val
        if val is not None:
            self.tools[0].volumetric_mode = val

    def _config(
        self, command: Config, process_data: AdditiveProcessData
    ) -> AdditiveProcessData:
        """Process configuration commands (G codes) specific to Ingersoll.

        Handles, after the base class has processed the command:
            - G94: Units per minute feed rate mode.
            - G70/G71: Inch/Metric unit selection.

        Args:
            command (Config): The configuration command.
            process_data (AdditiveProcessData): Current process state.

        Returns:
            AdditiveProcessData: Updated process state.
        """
        process_data = super()._config(command, process_data)
        if not isinstance(command, Config):
            return process_data

        # G modal commands (no Ingersoll-specific M codes are handled here).
        if isinstance(command, G):
            if command.code == 94:  # unit/min mode
                self.feed_rate_time_base_s = 60.0
            elif command.code == 70:  # inch mode
                # NOTE(review): 0.0254 is metres per inch while the metric
                # branch uses 1.0 - confirm how the base class interprets
                # unit_per_meter.
                self.unit_per_meter = 0.0254
            elif command.code == 71:  # metric mode
                self.unit_per_meter = 1.0
        return process_data

    def _dwell(self, command: Dwell, process_data):
        """Process a Dwell command, setting its elapsed time.

        Also runs every tool's ``process_command`` so extrusion calculations
        are performed for the dwell period (e.g., priming or purging).
        Non-dwell commands pass through untouched.

        Args:
            command (Dwell): The dwell command.
            process_data: The current process data.

        Returns:
            AdditiveProcessData: Updated process data.
        """
        if isinstance(command, Dwell):
            process_data.elapsed_time = command.time_s
            for tool in self.tools:
                process_data = tool.process_command(command, process_data)
        return process_data

    def _handle_ca_virtual_axis(
        self, command: Command, process_data: ProcessData
    ) -> ProcessData:
        """Parse the 'CA' virtual axis from G1 moves.

        The 'CA' parameter represents the cross-sectional area of the bead.
        This method multiplies it by 1000 and stores the result in
        ``process_data.bead_area``.

        Ingersoll Logic:
            - Expected units in G-code are mm^2/1000.
            - This method converts them back to standard mm^2.
            - If ``volumetric_mode`` is still None it is switched to True
              (with a warning); if it was explicitly set to False an error is
              raised.

        Args:
            command (Command): The current command to inspect.
            process_data (ProcessData): The current process state.

        Returns:
            ProcessData: Updated process data containing bead area info.

        Raises:
            ValueError: If CA is specified but volumetric_mode is False.
        """
        # Ingersoll Docs:
        # Virtual axis representing the cross-sectional area of the bead as
        # calculated by the software. Used by HSVP to determine spindle speed.
        # Expected units are mm^2/1000.
        # Prior to extrusion (whether first extrusion or after a travel move
        # later in the print):
        #   G0 ...(travel move to extrusion start point)
        #   G1 CA=STARTING_VALUE FB=_CA_VELO ;(Software will choose correct starting value)
        #   FXXXX.XXX ;print feed rate
        #   EXTRUDERB(True)
        #   G4 FX.XX ;(If adding a dwell)
        #   G1 ...(continue with motion here)... FXXXX.XXX ;(use same print feed rate as before)
        # Note: setting the feed rate prior to extrusion is necessary for the
        # dwell spindle speed to be calculated correctly.
        if not isinstance(command, Move) or command.G != 1:
            logging.debug(
                f"MasterPrint._handle_ca_virtual_axis only processes G1 moves. Received command: {command.to_dict()}"
            )
            return process_data

        # CA is only applicable for Additive
        if not isinstance(process_data, AdditiveProcessData):
            logging.debug(
                f"MasterPrint._handle_ca_virtual_axis only processes AdditiveProcessData. Received process_data of type {type(process_data).__name__}"
            )
            return process_data

        ca_val = command._words.get("CA")
        if ca_val is None:
            return process_data

        if self.volumetric_mode is None:
            logging.warning(
                "CA parameter detected in G-code. Automatically enabling volumetric_mode. "
                "Set MasterPrint.volumetric_mode = True/False explicitly before parsing to override this behavior."
            )
            self.volumetric_mode = True
        elif not self.volumetric_mode:
            raise ValueError(
                "MasterPrint.volumetric_mode must be True when specifying bead cross sectional area. e.g. CA=20/1000"
            )

        # CA specifies a bead cross sectional area. Store it in bead_area on
        # the process data and let the extruder handle it.
        # NOTE: Convert back to mm^2. The G-code specifies mm^2/1000, and the
        # parser evaluates that.
        process_data.bead_area = float(ca_val) * 1000.0
        return process_data

    def _move(self, command: Command, process_data: ProcessData) -> ProcessData:
        """Process a Move command, handling the MasterPrint's traverse logic.

        After the base class has processed the move, rapid moves (G0) get the
        fixed traverse feed rate from the motion profile.

        Args:
            command (Command): The move command.
            process_data (ProcessData): Current process state.

        Returns:
            ProcessData: Updated process data with corrected feed rates for rapids.
        """
        process_data = super()._move(command, process_data)
        # Single source of truth for the G0 override.
        return self._apply_traverse_speed(command, process_data)

    def _apply_traverse_speed(
        self, command: Command, process_data: ProcessData
    ) -> ProcessData:
        """Ensure G0 rapid moves use the traverse feed rate.

        ``_process_commands_batch`` inlines move logic and treats ``_move`` as a
        base helper, so the G0 override in ``_move`` is never reached in the hot
        path. This extra helper is registered explicitly so it runs after the
        inlined feed-rate assignment and correctly sets the traverse speed.

        Args:
            command (Command): The command to inspect.
            process_data (ProcessData): Current process state.

        Returns:
            ProcessData: The same process data, with ``feed_rate`` overridden
            for G0 moves.
        """
        if isinstance(command, Move) and command.G == 0:
            process_data.feed_rate = self.motion_profile.traverse_feed_rate
        return process_data

    def process_operation(self, operation: Operation, force=False) -> Operation:
        """Fully process an operation, including a second pass for extruder data.

        The process is as follows:
            1. ``super().process_operation()`` is called with
               ``skip_tool=True``. Per the original documentation, this pass
               runs the kinematic simulation and populates ``elapsed_time``
               for each command.
            2. A second pass hands every command and its now time-aware
               process data to each tool, so the extruder can use
               ``elapsed_time`` when computing ``deposited_volume``.
            3. The operation's cached arrays are invalidated so the next
               access rebuilds them from the updated process data.

        Args:
            operation (AdditiveOperation): The additive manufacturing
                operation to process.
            force (bool, optional): Forwarded to the base implementation
                (re-process even if already processed). Defaults to False.

        Returns:
            AdditiveOperation: The operation with fully populated process
            data, including time-dependent extruder calculations.
        """
        operation = super().process_operation(operation, force, skip_tool=True)
        for i, command in enumerate(operation.commands):
            for tool in self.tools:
                operation.process_data[i] = tool.process_command(
                    command, operation.process_data[i]
                )
        # Second pass updated process_data (deposited_volume etc.) after motion
        # profiling already built the _arrays cache. Drop it so the next access
        # to operation.arrays rebuilds from the complete process_data.
        operation._invalidate_arrays()
        return operation

    def operation_to_dataframe(self, operation: Operation):
        """Convert an Operation to a DataFrame with MasterPrint-aware tagging.

        Extends the base implementation with a pass that corrects tags for
        rows where material was deposited. Such rows are tagged LINEAR_MOVE by
        default (since the G-code issues separate EXTRUDERB commands rather
        than inline extrusion parameters), but should be MOVE_EXTRUDE when
        deposited_volume > 0.

        Args:
            operation (Operation): The operation to convert.

        Returns:
            pd.DataFrame: DataFrame with corrected MOVE_EXTRUDE tags.
        """
        df = super().operation_to_dataframe(operation)
        # na=False keeps the mask strictly boolean when a row has no tags.
        has_linear_tag = df["tags"].str.contains(LINEAR_MOVE, regex=False, na=False)
        depositing_move = (df["deposited_volume"] > 0) & has_linear_tag
        df.loc[depositing_move, "tags"] = df.loc[depositing_move, "tags"].str.replace(
            LINEAR_MOVE, MOVE_EXTRUDE, regex=False
        )
        return df

    def gcode_file_to_operation(self, filepath: str, operation_type=AdditiveOperation):
        """Parse a G-code file and emulate into an Operation object.

        Thin override of the base method whose only difference visible here
        is the ``AdditiveOperation`` default for ``operation_type``.

        Args:
            filepath (str): Path to the G-code file.
            operation_type (type, optional): The class to instantiate for the
                operation. Defaults to AdditiveOperation.

        Returns:
            Operation: The emulated operation.
        """
        return super().gcode_file_to_operation(filepath, operation_type)