import numpy as np
from copy import copy
from ...constants import ZERO_TOLERANCE
from ..motion import MotionProfile
from ..machine import Machine
from ..extruders import ScrewExtruder
from ..machine_options import BAAMOptions
from ..operations import AdditiveProcessData, AdditiveOperation, ProcessDataArrays
from ..registry import register
from ..commands import (
Move,
Config,
Purge,
)
class BAAMMotionProfile(MotionProfile):
    """Motion profile with BAAM-specific handling of vertical (Z / W) moves.

    On top of the limits handled by :class:`MotionProfile`, this profile keeps
    separate velocity caps for the Z axis and the W table and applies them in
    :meth:`_apply_machine_constraints`.

    Args:
        max_jerk: Forwarded unchanged to ``MotionProfile.__init__``.
        max_accleration: Forwarded unchanged to ``MotionProfile.__init__``
            (the keyword is spelled this way in that call, so it is kept).
        max_velocity: Forwarded to ``MotionProfile.__init__``.
        max_velocity_z: Velocity cap for Z-axis moves, or ``None``.
        max_velocity_w: Velocity cap for W-table moves, or ``None``.
        junction_deviation: Forwarded unchanged to ``MotionProfile.__init__``.

    Attributes:
        max_velocity_z: Stored ``max_velocity_z`` argument.
        max_velocity_w: Stored ``max_velocity_w`` argument.
        max_veloicty_w: Misspelled alias of ``max_velocity_w``, kept so code
            that reads or writes the historical attribute name keeps working.
    """

    def __init__(
        self,
        max_jerk=None,
        max_accleration=None,
        max_velocity=None,
        max_velocity_z=None,
        max_velocity_w=None,
        junction_deviation=0.05,
    ):
        super().__init__(
            max_jerk=max_jerk,
            max_accleration=max_accleration,
            max_velocity=max_velocity,
            junction_deviation=junction_deviation,
        )
        self.max_velocity_z = max_velocity_z
        self.max_velocity_w = max_velocity_w

    @property
    def max_veloicty_w(self):
        """Backward-compatible (misspelled) alias of ``max_velocity_w``."""
        return self.max_velocity_w

    @max_veloicty_w.setter
    def max_veloicty_w(self, value):
        self.max_velocity_w = value

    def _apply_machine_constraints(
        self,
        initial_positions,
        final_positions,
        target_velocities,
        junction_velocities,
    ):
        """Enforce BAAM-specific motion rules before the two-pass planner runs.

        A segment counts as *vertical* when its X and Y displacement are both
        below ``ZERO_TOLERANCE`` and its Z displacement is above it.

        Rule 1 — Full stop around vertical (Z-only) moves: layer changes
        require the machine to come to a full stop. The junction velocity is
        set to zero at both ends of every vertical segment (the entry
        ``idx - 1`` and the entry ``idx``) so the backward pass propagates the
        required deceleration through the full approach.

        Rule 2 — Cap speed on vertical moves: pure Z moves are limited to
        ``max_velocity_w`` (W-table) or ``max_velocity_z`` (Z-axis), falling
        back to ``max_velocity`` if neither is set. A limit of ``0`` is
        treated the same as "not set" by this fallback chain.

        Args:
            initial_positions: 2-D array of segment start points; columns
                0-2 are read as X, Y, Z.
            final_positions: 2-D array of segment end points, same layout.
            target_velocities: Per-segment target velocity array (assumed to
                be a NumPy array).
            junction_velocities: Junction velocity array indexed by segment
                (assumed to be a NumPy array).

        Returns:
            tuple: ``(target_velocities, junction_velocities)``. When there is
            no vertical segment the input objects are returned untouched;
            otherwise modified copies are returned and the inputs are left
            unchanged.
        """
        delta = final_positions - initial_positions
        is_vertical = (
            (np.abs(delta[:, 0]) < ZERO_TOLERANCE)
            & (np.abs(delta[:, 1]) < ZERO_TOLERANCE)
            & (np.abs(delta[:, 2]) > ZERO_TOLERANCE)
        )
        vertical_idx = np.flatnonzero(is_vertical)
        if vertical_idx.size == 0:
            return target_velocities, junction_velocities

        # Work on copies so the caller's arrays are never mutated.
        target_velocities = target_velocities.copy()
        junction_velocities = junction_velocities.copy()

        vertical_limit = (
            self.max_velocity_w or self.max_velocity_z or self.max_velocity
        )
        if vertical_limit is not None:
            target_velocities[vertical_idx] = np.minimum(
                target_velocities[vertical_idx], vertical_limit
            )

        # Force a full stop at both ends of every vertical segment so the
        # backward pass propagates deceleration through the whole approach.
        # Segment 0 has no preceding junction entry, hence the ``> 0`` filter.
        junction_velocities[vertical_idx[vertical_idx > 0] - 1] = 0.0
        junction_velocities[vertical_idx] = 0.0
        return target_velocities, junction_velocities
@register("cincinnati", aliases=["baam", "cincinnatibaam"])
class CincinnatiBAAM(Machine):
    """
    An emulation environment for a Cincinnati BAAM (Big Area Additive Manufacturing)
    machine.

    This class extends the generic Machine to include BAAM-specific behaviors:

    1.  **4-Axis Coordinate System (X, Y, Z, W):** It handles the W-axis,
        which acts as a dynamic Z-offset for the build plate. The final Z
        position is calculated as `Z_command - W_command`.
    2.  **Custom M-Codes:** It processes BAAM-specific G-code commands for
        extruder control (M3/M5), parking (M68), etc.
    """

    # Fallback W-table speed (units/s) used by ``process_operation`` when the
    # motion profile carries no W-table limit.
    _W_DEFAULT_SPEED = 50.0

    def __init__(self, **options):
        """Create the machine, its single screw extruder and its motion profile.

        Args:
            **options: Machine options. They are forwarded to
                ``Machine.__init__`` and then re-assigned to ``self.options``
                after ``_options_type`` has been switched to ``BAAMOptions``.
        """
        super().__init__(
            **options,
        )
        self._options_type = BAAMOptions
        self.options = options

        # The BAAM has one screw extruder; it starts switched off (M3 enables it).
        self.tools = [ScrewExtruder()]
        self.tools[0].on = False
        self._last_w = self._initial_w()

        # motion limits and profile
        opts = self.options.all_options
        max_velocity = opts.get("max_xy_speed")
        # NOTE(review): "machine_max_feedrate_e", when present, replaces the
        # XY speed as the profile's max velocity — confirm this is intended.
        max_velocity = opts.get("machine_max_feedrate_e", max_velocity)
        max_velocity = max_velocity / 60.0 if max_velocity is not None else None
        max_acceleration = opts.get("default_acceleration")
        # NOTE(review): unlike max_velocity, "z_speed" and "w_table_speed" are
        # used as given (no / 60.0) — confirm their units match.
        max_velocity_z = opts.get("z_speed", max_velocity)

        self.motion_profile = BAAMMotionProfile(
            max_velocity=max_velocity,
            max_accleration=max_acceleration,
            max_velocity_z=max_velocity_z,
            max_velocity_w=opts.get("w_table_speed", max_velocity_z),
            junction_deviation=0.5,
        )

        self.process_command_helpers = (
            self._config,
            self._move,
            self._dwell,
        )

    def _initial_w(self):
        """Return the starting W value: ``options.offset[3]`` if present, else 0.0."""
        offset = self.options.offset
        if offset and len(offset) > 3:
            return offset[3]
        return 0.0

    def clear_history(self):
        """Clears history and resets BAAM-specific state."""
        super().clear_history()
        self._last_w = self._initial_w()
        if self.tools:
            self.tools[0].on = False

    def _config(
        self, command: Config, process_data: AdditiveProcessData
    ) -> AdditiveProcessData:
        """Processes BAAM-specific M-codes for extruder and machine control.

        The base-class ``_config`` runs first. Commands that are not
        ``Config`` instances are then returned as-is.

        Handled codes:
            * ``3``  — extruder on; the ``S`` word, if present, becomes the
              tool's ``last_extrusion_rate``.
            * ``5``  — extruder off; extrusion rate is reset to 0.
            * ``68`` — park; ``process_data.location`` is set to ``self.home``.

        Args:
            command: The command being processed.
            process_data: Process data record for this command.

        Returns:
            AdditiveProcessData: The (possibly updated) process data.
        """
        process_data = super()._config(command, process_data)
        if not isinstance(command, Config):
            return process_data

        tool: ScrewExtruder = self.tools[0]
        code = command.code

        # NOTE: THESE ARE SPECIFIC M CODES FOR THE BAAM
        # TODO: Does the BAAM use modal groups? E.g. (M3 S300 M100)
        if code == 3:  # Extruder On. EX: M3 S300
            tool.on = True
            # We need to safely pull out the S parameter, that is the RPM control.
            # Prefer the "settings" entry; fall back to "words" when it is absent.
            payload = command.to_dict()
            settings = payload.get("settings")
            if settings is None:
                settings = payload.get("words") or {}
            if "S" in settings:
                tool.last_extrusion_rate = float(settings["S"])
            tool.last_deposited_volume = 0.0
            process_data = tool._set_process_data(process_data)
        elif code == 5:  # Extruder Off
            tool.on = False
            tool.last_extrusion_rate = 0.0
            tool.last_deposited_volume = 0.0
            process_data = tool._set_process_data(process_data)
        elif code == 68:  # Park (M68)
            process_data.location = self.home

        return process_data

    def _dwell(self, command, process_data):
        """Processes Dwell and Purge commands.

        Note: this override is called via the slow ``process_command`` path only.
        ``_process_commands_batch`` inlines dwell logic and never calls this method,
        so the Purge feed_rate assignment below is bypassed in the fast path. That
        is intentional: BAAM's screw extruder uses RPM × elapsed_time for volume
        (not feed_rate), so the stored feed_rate on a Purge command does not affect
        any downstream calculation.
        """
        process_data = super()._dwell(command, process_data)
        if isinstance(command, Purge):
            process_data.feed_rate = command.feed_rate
        return process_data

    def gcode_file_to_operation(self, filepath: str, operation_type=AdditiveOperation):
        """Delegate to the base class, defaulting ``operation_type`` to ``AdditiveOperation``."""
        return super().gcode_file_to_operation(filepath, operation_type)

    def process_operation(
        self, operation: AdditiveOperation, force=False
    ) -> AdditiveOperation:
        """Fully processes an additive operation in three passes.

        This method extends the base `Machine`'s processing pipeline to accommodate
        the `ScrewExtruder`'s dependency on accurate time estimates and the
        BAAM's W-axis (table) offset.

        The process is as follows:

        1.  It first calls `super().process_operation()` with ``skip_tool=True``,
            which runs the kinematic simulation and applies the motion profile
            on raw (un-W-corrected) locations. This is the step that populates
            `elapsed_time` for each movement command. W correction is
            deferred so W-only moves (zero XYZ distance) don't appear as
            vertical toolhead displacements and corrupt `elapsed_time`.
        2.  It then replays every command through the tools, so the extruder
            can use the now-known `elapsed_time` to calculate
            `deposited_volume`.
        3.  Finally it walks the ``Move`` commands, assigns
            ``|ΔW| / w_speed`` as `elapsed_time` to pure W moves and
            subtracts the current W value from every move's Z.

        Args:
            operation (AdditiveOperation): The additive manufacturing operation to process.
            force (bool, optional): Forwarded to the base class. Defaults to False.

        Returns:
            AdditiveOperation: The operation with fully populated process data,
            including time-dependent extruder calculations.
        """
        # NOTE(review): passes 2 and 3 always run. If the base class returns
        # already-processed data when ``force`` is False, pass 3 would subtract
        # W from Z a second time — confirm against Machine.process_operation.

        # Pass 1: XYZ motion profile with raw (un-W-corrected) locations.
        operation = super().process_operation(operation, force, skip_tool=True)

        # Pass 2: tool second pass — deposited_volume from elapsed_time.
        self._replay_tool_pass(operation)

        # Pass 3: W-axis location correction + W-axis elapsed_time.
        self._apply_w_axis_correction(operation)

        operation._invalidate_arrays()
        return operation

    def _replay_tool_pass(self, operation):
        """Replay all commands through the tools so volumes use final elapsed times."""
        # Reset tool state so we replay from the start of the file; otherwise
        # last_extrusion_rate is left at its end-of-pass-1 value and volumes are wrong.
        for tool in self.tools:
            tool.reset_history()
        self.tools[0].on = False  # BAAM starts with extruder off; M3 enables it

        for i, command in enumerate(operation.commands):
            if isinstance(command, Config):
                # Replay _config to update tool state (M3/M5 set RPM and on/off).
                # Save/restore location so M68 park doesn't overwrite pass-1 positions.
                saved_location = operation.process_data[i].location
                operation.process_data[i] = self._config(
                    command, operation.process_data[i]
                )
                operation.process_data[i].location = saved_location
            for tool in self.tools:
                operation.process_data[i] = tool.process_command(
                    command, operation.process_data[i]
                )

    def _apply_w_axis_correction(self, operation):
        """Apply the W-table offset to every Move and time pure W moves."""
        # Runs after the motion profile so XYZ elapsed_time is already committed.
        # Pure W moves (table movements) get elapsed_time = |ΔW| / w_table_speed.
        # All Move commands get their Z corrected to abs_Z = Z_gcode - W_table.
        # The limit is read through the ``max_veloicty_w`` spelling that
        # BAAMMotionProfile exposes.
        w_speed = self.motion_profile.max_veloicty_w or self._W_DEFAULT_SPEED
        last_w = self._initial_w()

        for i, command in enumerate(operation.commands):
            if not isinstance(command, Move):
                continue

            loc = command.location
            has_w = len(loc) > 3 and loc[3] is not None
            # A move without an explicit W keeps the table where it was.
            w = loc[3] if has_w else last_w

            # "Pure W" means none of X, Y, Z is given on the command.
            is_pure_w = all(loc[j] is None for j in range(min(3, len(loc))))
            if is_pure_w:
                operation.process_data[i].elapsed_time = abs(w - last_w) / w_speed

            x, y, z = operation.process_data[i].location
            operation.process_data[i].location = (x, y, z - float(w))
            last_w = w