import logging
import re
import numpy as np
import pandas as pd
from pathlib import Path
from . import commands
from .. import read
from . import tag_literals
from ..syntax import resolve_flavor
from .operations import ProcessData
from .additive_part import AdditivePart
[docs]
class GcodeExporter:
"""Translates abstract Command objects into G-code strings or data series.
This class uses configurable "syntax flavors" to handle different G-code
dialects. It can export a sequence of Command objects into a final G-code
file or into a format suitable for data analysis (e.g., a Pandas DataFrame).
Attributes:
syntax (dict): The syntax definition dictionary for the chosen flavor.
word_map (dict): A map to translate Command attribute names to G-code words.
code_map (dict): A map for specific G-code numbers (e.g., rapid move code).
gcode_line_helpers (list): A list of helper methods used to construct a G-code line.
write_order (list): A list of G-code word keys defining their output order.
"""
def __init__(self, syntax_name="default"):
"""Initializes the GcodeExporter with a specific syntax flavor.
Args:
syntax_name (str, optional): The name of the syntax flavor to use,
which corresponds to a definition in `gcode_reader.syntax.flavors`.
Defaults to "default".
"""
# utilize the setter to
self.syntax = syntax_name
self.gcode_line_helpers = (
self._command_to_gcode_line_gm_code_helper,
self._command_to_gcode_line_parameter_helper,
self._command_to_gcode_line_comment_helper,
) # Helpers must yield str (return an iterable)
self.write_order = ("G", "M", "X", "Y", "Z", "A", "B", "C", "F", "E")
@property
def syntax(self):
return self._syntax
@syntax.setter
def syntax(self, val):
self._syntax = resolve_flavor(val)
@property
def code_map(self):
return self.syntax.get("code_map", {})
@property
def word_map(self):
return self.syntax.get("word_map", {})
[docs]
def command_to_gcode_words(self, command: commands.Command) -> dict:
"""Converts a Command's data into a dictionary of G-code words.
This method uses the exporter's `word_map` to translate the Command's
internal attributes (e.g., `position`) into the corresponding G-code
words and values (e.g., {'X': 10, 'Y': 20}). It's the first step in
converting a command to either a string or a data series.
Args:
command (Command): The Command object to process.
Returns:
dict: A dictionary mapping G-code words (e.g., 'G', 'X', 'F') to
their respective values.
"""
if not isinstance(command, commands.Command):
raise TypeError(
f"command argument must be instance of `Command`. Received type {type(command)}"
)
# The command dictionary will have keys that match the Command's attributes
# We need to map these to the gcode words defined in the syntax's word_map
command_dict = command.to_dict()
words = {}
# Always have G, M, comment in the dictionary
words["G"] = command_dict.get("G")
words["M"] = command_dict.get("M")
words["comment"] = command_dict.get("comment")
# Config commands carry their extra information (e.g. M5 S200) in "settings"
# We need to re-insert those values into "words" so they are written out
# These values are stored in a dictionary inside of "settings" to help contextualize them
# at a more abstract level. TODO: Should the config commands be refactored because of this?
settings = command_dict.get("settings")
if settings is not None and isinstance(settings, dict):
for key, val in settings.items():
words[key] = val
# word_map = [ { attr_key: word_key } ]
# Assign the value of command_data[attr_key] to words[word_key]
for attr_key, word_key in self.word_map.items():
value = command_dict.get(attr_key)
if value is None:
continue
if isinstance(word_key, (list, tuple)):
if isinstance(value, (list, tuple)) and len(value) == len(word_key):
for i, k in enumerate(word_key):
if value[i] is not None:
words[k] = value[i]
elif word_key is not None:
value = command_dict.get(attr_key)
if value is not None:
words[word_key] = value
return words
def _command_to_gcode_line_gm_code_helper(self, **words):
"""Generator helper that yields the primary G or M code for a line.
This function prioritizes 'G' over 'M'. If a 'G' key exists in the
words, it yields the formatted G-code. Otherwise, it checks for an
'M' key. It assumes a single line will not contain both a primary
G and M code.
Args:
**words: A dictionary of G-code words and their values.
Yields:
str: The formatted G or M code string (e.g., "G1").
"""
g_val = words.get("G")
if g_val is not None:
yield f"G{g_val}"
else:
m_val = words.get("M")
if m_val is not None:
yield f"M{m_val}"
def _command_to_gcode_line_parameter_helper(self, **words):
"""Generator helper that yields all parameter words (X, Y, F, etc.).
This function iterates through the provided words, ignoring command
codes ('G', 'M') and metadata ('comment', 'tags', etc). It formats each parameter
and yields it as a string. The output order is determined by the
`self.write_order` attribute to ensure consistent formatting.
Args:
**words: A dictionary of G-code words and their values.
Yields:
str: A formatted parameter string (e.g., "X10.5", "F500").
"""
ignore_keys = ["G", "M", "comment", "raw", "tags"]
command_keys = [k for k in words if k not in ignore_keys]
# This allows us to ensure a consistent output format
# User can define their desired output format in self.write_order
command_keys.sort(
key=lambda k: (
(
self.write_order.index(k.upper())
if k.upper() in self.write_order
else float("inf")
),
k.upper(),
)
)
for key in command_keys:
value = words.get(key)
if value is None:
continue
try:
clean_value = read._clean_word_value(key, value, self.syntax)
if clean_value is not None:
yield f"{key.upper()}{clean_value}"
except Exception:
logging.error(
f"GcodeExporter._command_to_gcode_line_parameter_helper: Failed to clean word ({key}) value ({value}) "
)
def _command_to_gcode_line_comment_helper(self, **words):
"""Generator helper that yields the formatted comment for a line.
Args:
**words: A dictionary of G-code words and their values.
Yields:
str: The formatted comment string (e.g., "; layer 10").
"""
comment_text = words.get("comment")
comment_str = ""
if comment_text and isinstance(comment_text, str):
comment_re = self.syntax.get("comment_re")
if not comment_re:
pass
else:
try:
escaped_comment = comment_text.replace("\\", "\\\\")
comment_re_str = (
comment_re.pattern
if hasattr(comment_re, "pattern")
else comment_re
)
template = re.sub(
r"\\|\$.*",
"",
comment_re_str,
)
formatted_comment = re.sub(r"\(\.\*\)", escaped_comment, template)
comment_str = formatted_comment.replace("\\\\", "\\")
except re.error as e:
logging.error(
f'GcodeExporter._command_to_gcode_line_comment_helper: Invalid comment regex in syntax: "{comment_re}". Error: {e}'
)
pass # no comment added
yield comment_str
[docs]
def command_to_gcode_line(self, command: commands.Command) -> str:
"""Constructs a complete G-code line string from a Command object.
This method orchestrates the G-code generation process. It first
converts the command to a dictionary of words, then uses the list of
`gcode_line_helpers` to assemble those words into a final, space-
delimited G-code string.
Args:
command (Command): The Command to convert into a G-code line.
Returns:
str: A single, formatted G-code line.
"""
if not isinstance(command, commands.Command):
raise TypeError(
f"command argument must be instance of `Command`. Received type {type(command)}"
)
words = self.command_to_gcode_words(command)
if not words:
return ""
command_parts = []
for helper_fn in self.gcode_line_helpers:
command_parts.extend(helper_fn(**words))
line = " ".join(p for p in command_parts if p)
return f"{line}\n"
def _command_to_pd_series_handle_move(self, command, series_data):
"""Internal helper to add descriptive tags for commands.Move commands.
This method inspects a `commands.Move` command to add context-specific tags
to its data series, which is useful for later analysis. It identifies
and tags rapid moves, linear moves, and pure Z-axis moves.
Args:
command (Command): The command being processed, expected to be a `commands.Move`.
series_data (dict): The data dictionary for the command, which will
be modified by adding tags.
Returns:
dict: The modified `series_data` dictionary with new tags.
"""
# NOTE: Assuming the default rapid move G code is G0. This provides a mechanism for a machine profile or syntax definition to override this default via the `code_map`
rapid_move = self.code_map.get("rapid", 0)
if isinstance(command, commands.Move):
if not isinstance(command, commands.MoveExtrude):
if "G" in series_data.keys() and series_data["G"] == rapid_move:
series_data["tags"] = f"{tag_literals.RAPID_MOVE}"
else:
series_data["tags"] = f"{tag_literals.LINEAR_MOVE}"
return series_data
[docs]
def command_to_dict(self, command: commands.Command) -> dict:
"""Converts a `Command` instance into a plain dict suitable for a `DataFrame` row.
Prefer this over `command_to_pd_series` when building a DataFrame from many
commands — passing a list of dicts to `pd.DataFrame()` is significantly faster
than passing a list of `pd.Series`.
Args:
command (`Command`): The Command object to convert.
Returns:
dict: A dictionary containing the command's G-code words and metadata.
"""
if not isinstance(command, commands.Command):
raise TypeError(
f"command argument must be instance of `Command`. Received type {type(command)}"
)
series_data = dict(**self.command_to_gcode_words(command))
series_data["tags"] = f"{command.TAG}"
series_data["raw"] = command.raw
series_data = self._command_to_pd_series_handle_move(command, series_data)
return series_data
[docs]
def command_to_pd_series(self, command: commands.Command):
"""Converts a `Command` instance into a `pd.Series`.
For bulk DataFrame construction use `command_to_dict` instead.
"""
return pd.Series(self.command_to_dict(command))
[docs]
class VTKExporter:
"""Exports geometry and scalar data to PyVista and VTK formats."""
[docs]
@staticmethod
def additive_part_to_polydata(
additive_part: AdditivePart,
deposition_range: tuple = None,
custom_scalars: dict = None,
) -> "pv.PolyData":
"""Converts an AdditivePart to a PyVista PolyData mesh with scalar fields.
Args:
additive_part (AdditivePart): The part to convert.
deposition_range (tuple, optional): ``(start, end)`` pair of
0-based indices into the ordered sequence of depositing segments,
selecting a contiguous subset to mesh. ``None`` includes all
depositing segments.
custom_scalars (dict, optional): Mapping of ``{name: array}`` for
additional or replacement scalar arrays stored as cell data.
Use this to supply measured data (e.g. temperature) or any
other derived field. Array length must equal
``mesh.n_cells`` (one value per line-segment).
Returns:
pv.PolyData: Mesh with scalar arrays in ``cell_data``.
Raises:
TypeError: If ``additive_part`` is not an ``AdditivePart``.
ImportError: If pyvista is not installed.
"""
try:
import pyvista as pv # noqa: F401 — validates availability early
except ImportError:
raise ImportError(
"VTK export requires pyvista. Install with: pip install gcode-reader[viz]"
) from None
if not isinstance(additive_part, AdditivePart):
raise TypeError(
f"additive_part must be an instance of AdditivePart. "
f"Received {type(additive_part).__name__}"
)
return VTKExporter._build_additive_part_polydata(
additive_part, deposition_range, custom_scalars
)
[docs]
@staticmethod
def additive_part_to_vtk(
additive_part: AdditivePart,
filepath: str,
binary: bool = True,
deposition_range: tuple = None,
):
"""Exports an AdditivePart to a ``.vtk`` file with embedded scalar fields.
Args:
additive_part (AdditivePart): The part to export.
filepath (str): Destination path; must end with ``.vtk``.
binary (bool, optional): Write binary VTK format. Defaults to True.
deposition_range (tuple, optional): ``(start, end)`` pair of
0-based indices into the ordered sequence of depositing segments
to export. ``None`` exports all depositing segments.
Raises:
TypeError: If ``additive_part`` is not an ``AdditivePart``.
ValueError: If ``filepath`` does not end with ``.vtk``.
FileNotFoundError: If the parent directory of ``filepath`` does
not exist.
"""
if not isinstance(additive_part, AdditivePart):
raise TypeError(
f"additive_part must be an instance of AdditivePart. "
f"Received {type(additive_part).__name__}"
)
path_obj = Path(filepath)
if path_obj.suffix.lower() != ".vtk":
raise ValueError(
f"Filepath must end with '.vtk'. Received: '{path_obj.suffix}'"
)
if not path_obj.parent.exists():
raise FileNotFoundError(
f"The directory '{path_obj.parent}' does not exist."
)
mesh = VTKExporter.additive_part_to_polydata(additive_part, deposition_range)
mesh.save(filepath, binary=binary)
@staticmethod
def _build_additive_part_polydata(
additive_part: AdditivePart,
deposition_range: tuple,
custom_scalars: dict,
) -> "pv.PolyData":
"""Constructs the PolyData geometry and all scalar arrays in one pass.
Iterates depositing segments within the requested deposition range. For
each segment of N process-data records, N nodes are produced and N-1
cell scalars are computed. Scalars for cell k (the edge from node k to
node k+1) come from ``process_data_slice[k+1]`` — the a->b record
housed at the destination index b — so there is no off-by-one error
from using the anchor (index 0) record whose scalars belong to the
preceding travel move.
Scalars are stored as ``cell_data`` (one value per segment).
Returns an empty ``pv.PolyData`` when the filtered range contains no
depositing segments.
"""
import pyvista as pv
depositing = additive_part.deposition_dataframe
if depositing is None or depositing.empty:
return pv.PolyData()
n_depositions = len(depositing)
if deposition_range is None:
deposition_range = (0, n_depositions)
else:
if (
not isinstance(deposition_range, (tuple, list))
or len(deposition_range) != 2
):
raise TypeError(
"deposition_range must be a (start, end) tuple of integer indices."
)
if deposition_range[1] > n_depositions:
raise IndexError(
f"deposition_range upper bound {deposition_range[1]} exceeds "
f"{n_depositions} total depositing segments."
)
depositing = depositing.iloc[deposition_range[0] : deposition_range[1]]
if depositing.empty:
return pv.PolyData()
layer_times = additive_part.layer_times
bead = additive_part.bead
# Pre-extract scalar fields from the relevant process_data into
# contiguous numpy arrays in a single loop.
start_deposition = int(depositing["start"].min())
end_deposition = int(depositing["end"].max()) + 1 # inclusive
process_data_dict = additive_part.operation.process_data_to_numpy_dict(
(start_deposition, end_deposition)
)
# Include planned velocity scalars when a motion profile is available.
# The velocity arrays share the same index space as process_data_dict, so
# they are sliced with the same s+1:e window inside the cell loop below.
motion_scalars = {}
if additive_part.operation.motion_profile:
try:
from .motion import extract_motion_scalars
full_motion_scalars = extract_motion_scalars(additive_part.operation)
for field, arr in full_motion_scalars.items():
motion_scalars[field] = arr[start_deposition:end_deposition]
except Exception:
logging.warning(
"VTKExporter: could not extract motion scalars — "
"velocity fields will be absent from the mesh."
)
nodes_list = []
edges_list = []
# Merge all per-index array sources into one dict; motion_scalars may be empty.
scalar_sources = {**process_data_dict, **motion_scalars}
# Constant-per-segment fill values; layer_time is updated each row.
fill_values = {
"layer_time": 0.0,
"temperature": np.nan,
}
#: Scalar fields attached to every mesh as **cell data** (one value per
#: line-segment cell, not per point). process_data[i] describes the a->b
#: move that ends at location i, so cell k (nodes k->k+1) uses
#: process_data[k+1] — the record housed at the destination index.
scalar_fields = {name: [] for name in [*scalar_sources, *fill_values]}
# Keys present in scalar_sources that should not be written to the mesh.
_skip_scalar_fields = {"location", "relative_movement"}
point_offset = 0
for row in depositing.itertuples():
s = row.start - start_deposition
e = row.end - start_deposition + 1 # exclusive
n = e - s
if n < 2:
continue
n_cells = n - 1
fill_values["layer_time"] = layer_times.get(row.layer_index, 0.0)
# Nodes: one per process-data record (N total) ---
nodes_list.append(process_data_dict["location"][s:e])
# Cell scalars: one per line segment
# Cell k connects node k -> node k+1 and is described by process_data_slice[k+1]
for field_name, arr in scalar_sources.items():
scalar_fields[field_name].append(arr[s + 1 : e])
for field_name, val in fill_values.items():
scalar_fields[field_name].append(np.full(n_cells, val))
idx = np.arange(point_offset, point_offset + n)
edges_list.append(np.stack((idx[:-1], idx[1:]), axis=1))
point_offset += n
for key in _skip_scalar_fields:
scalar_fields.pop(key)
if not nodes_list:
return pv.PolyData()
nodes_arr = np.vstack(nodes_list)
edges_arr = np.vstack(edges_list)
padding = np.full(len(edges_arr), 2, dtype=int)
cells = np.column_stack((padding, edges_arr))
mesh = pv.PolyData(nodes_arr, cells)
for name, arrays in scalar_fields.items():
mesh.cell_data[name] = np.concatenate(arrays)
# Mechanism for user to specify custom scalar data (also cell data)
if custom_scalars:
for name, arr in custom_scalars.items():
mesh.cell_data[name] = np.asarray(arr, dtype=float)
return mesh