Source code for gcode_reader.emulate.exporters

import logging
import re

import numpy as np
import pandas as pd
from pathlib import Path

from . import commands
from .. import read
from . import tag_literals

from ..syntax import resolve_flavor
from .operations import ProcessData
from .additive_part import AdditivePart


class GcodeExporter:
    """Translates abstract Command objects into G-code strings or data series.

    This class uses configurable "syntax flavors" to handle different G-code
    dialects. It can export a sequence of Command objects into a final G-code
    file or into a format suitable for data analysis (e.g., a Pandas
    DataFrame).

    Attributes:
        syntax (dict): The syntax definition for the chosen flavor, as
            returned by `resolve_flavor`.
        word_map (dict): A map to translate Command attribute names to G-code
            words. Read from `syntax["word_map"]`; empty when absent.
        code_map (dict): A map for specific G-code numbers (e.g., rapid move
            code). Read from `syntax["code_map"]`; empty when absent.
        gcode_line_helpers (tuple): Generator methods used, in order, to
            construct a G-code line. Each accepts the words as keyword
            arguments and yields strings.
        write_order (tuple): G-code word keys defining their output order.
    """

    # Keys of the words dict that are never written out as parameter words.
    _NON_PARAMETER_KEYS = frozenset({"G", "M", "comment", "raw", "tags"})

    def __init__(self, syntax_name="default"):
        """Initializes the GcodeExporter with a specific syntax flavor.

        Args:
            syntax_name (str, optional): The name of the syntax flavor to use,
                which corresponds to a definition in
                `gcode_reader.syntax.flavors`. Defaults to "default".
        """
        # Assign through the property setter so the flavor gets resolved.
        self.syntax = syntax_name
        # Helpers must yield str (return an iterable).
        self.gcode_line_helpers = (
            self._command_to_gcode_line_gm_code_helper,
            self._command_to_gcode_line_parameter_helper,
            self._command_to_gcode_line_comment_helper,
        )
        self.write_order = ("G", "M", "X", "Y", "Z", "A", "B", "C", "F", "E")

    @property
    def syntax(self):
        """The resolved syntax definition currently in use."""
        return self._syntax

    @syntax.setter
    def syntax(self, val):
        # Whatever is assigned is passed through `resolve_flavor` first.
        self._syntax = resolve_flavor(val)

    @property
    def code_map(self):
        """The syntax's ``code_map`` entry, or an empty dict when missing."""
        return self.syntax.get("code_map", {})

    @property
    def word_map(self):
        """The syntax's ``word_map`` entry, or an empty dict when missing."""
        return self.syntax.get("word_map", {})

    def command_to_gcode_words(self, command: "commands.Command") -> dict:
        """Converts a Command's data into a dictionary of G-code words.

        This method uses the exporter's `word_map` to translate the Command's
        internal attributes (e.g., `position`) into the corresponding G-code
        words and values (e.g., {'X': 10, 'Y': 20}). It's the first step in
        converting a command to either a string or a data series.

        Args:
            command (Command): The Command object to process.

        Returns:
            dict: A dictionary mapping G-code words (e.g., 'G', 'X', 'F') to
            their respective values. The keys 'G', 'M' and 'comment' are
            always present (possibly with value ``None``).

        Raises:
            TypeError: If ``command`` is not a `Command` instance.
        """
        if not isinstance(command, commands.Command):
            raise TypeError(
                f"command argument must be instance of `Command`. Received type {type(command)}"
            )

        # The command dictionary has keys that match the Command's attributes;
        # these are mapped to G-code words through the syntax's word_map.
        command_dict = command.to_dict()

        # Always have G, M, comment in the dictionary.
        words = {
            "G": command_dict.get("G"),
            "M": command_dict.get("M"),
            "comment": command_dict.get("comment"),
        }

        # Config commands carry their extra information (e.g. M5 S200) in
        # "settings". Those values are re-inserted into "words" so they are
        # written out.
        # TODO: Should the config commands be refactored because of this?
        settings = command_dict.get("settings")
        if isinstance(settings, dict):
            words.update(settings)

        # word_map = { attr_key: word_key } where word_key is either a single
        # word or a list/tuple of words paired element-wise with the value.
        for attr_key, word_key in self.word_map.items():
            value = command_dict.get(attr_key)
            if value is None:
                continue

            if isinstance(word_key, (list, tuple)):
                # Only list/tuple values of matching length are expanded;
                # anything else is skipped without error.
                if isinstance(value, (list, tuple)) and len(value) == len(word_key):
                    for key, component in zip(word_key, value):
                        if component is not None:
                            words[key] = component
            elif word_key is not None:
                words[word_key] = value

        return words

    def _command_to_gcode_line_gm_code_helper(self, **words):
        """Generator helper that yields the primary G or M code for a line.

        'G' is prioritized over 'M': if a non-None 'G' value exists it is
        yielded, otherwise a non-None 'M' value is. At most one string is
        yielded, so a line never carries both.

        Args:
            **words: A dictionary of G-code words and their values.

        Yields:
            str: The formatted G or M code string (e.g., "G1").
        """
        g_val = words.get("G")
        if g_val is not None:
            yield f"G{g_val}"
            return

        m_val = words.get("M")
        if m_val is not None:
            yield f"M{m_val}"

    def _command_to_gcode_line_parameter_helper(self, **words):
        """Generator helper that yields all parameter words (X, Y, F, etc.).

        Command codes ('G', 'M') and metadata ('comment', 'raw', 'tags') are
        ignored. Remaining words are ordered by their position in
        `self.write_order`; words not listed there follow, sorted
        alphabetically by their upper-cased key.

        Args:
            **words: A dictionary of G-code words and their values.

        Yields:
            str: A formatted parameter string (e.g., "X10.5", "F500").
        """
        # Build the rank table once instead of a linear search per sort key.
        # setdefault keeps the first occurrence, matching tuple.index().
        rank = {}
        for position, word in enumerate(self.write_order):
            rank.setdefault(word, position)

        def sort_key(key):
            upper = key.upper()
            return (rank.get(upper, float("inf")), upper)

        command_keys = sorted(
            (k for k in words if k not in self._NON_PARAMETER_KEYS),
            key=sort_key,
        )

        for key in command_keys:
            value = words.get(key)
            if value is None:
                continue
            try:
                clean_value = read._clean_word_value(key, value, self.syntax)
            except Exception:
                # Best effort: a word that cannot be cleaned is logged and
                # left out of the line rather than aborting the export.
                logging.error(
                    f"GcodeExporter._command_to_gcode_line_parameter_helper: Failed to clean word ({key}) value ({value}) "
                )
                continue
            if clean_value is not None:
                yield f"{key.upper()}{clean_value}"

    def _command_to_gcode_line_comment_helper(self, **words):
        """Generator helper that yields the formatted comment for a line.

        The syntax's ``comment_re`` (a pattern string or compiled pattern) is
        turned into an output template by removing backslashes and everything
        from the first ``$`` onward; each ``(.*)`` in the result is then
        replaced by the comment text.

        Args:
            **words: A dictionary of G-code words and their values.

        Yields:
            str: The formatted comment string (e.g., "; layer 10"), or an
            empty string when there is no comment, no ``comment_re``, or the
            template could not be built.
        """
        comment_text = words.get("comment")
        comment_str = ""

        if comment_text and isinstance(comment_text, str):
            comment_re = self.syntax.get("comment_re")
            if comment_re:
                try:
                    comment_re_str = (
                        comment_re.pattern
                        if hasattr(comment_re, "pattern")
                        else comment_re
                    )
                    template = re.sub(r"\\|\$.*", "", comment_re_str)
                    # A callable replacement inserts the text verbatim, so
                    # backslashes in the comment are neither interpreted as
                    # escapes nor collapsed.
                    comment_str = re.sub(
                        r"\(\.\*\)", lambda _match: comment_text, template
                    )
                except re.error as e:
                    # No comment is added when the regex cannot be processed.
                    logging.error(
                        f'GcodeExporter._command_to_gcode_line_comment_helper: Invalid comment regex in syntax: "{comment_re}". Error: {e}'
                    )

        yield comment_str

    def command_to_gcode_line(self, command: "commands.Command") -> str:
        """Constructs a complete G-code line string from a Command object.

        The command is first converted to a dictionary of words; each helper
        in `gcode_line_helpers` then contributes zero or more strings, and
        the non-empty ones are joined with single spaces.

        Args:
            command (Command): The Command to convert into a G-code line.

        Returns:
            str: A single, formatted G-code line terminated by a newline.

        Raises:
            TypeError: If ``command`` is not a `Command` instance.
        """
        if not isinstance(command, commands.Command):
            raise TypeError(
                f"command argument must be instance of `Command`. Received type {type(command)}"
            )

        words = self.command_to_gcode_words(command)
        if not words:
            return ""

        command_parts = []
        for helper_fn in self.gcode_line_helpers:
            command_parts.extend(helper_fn(**words))

        line = " ".join(part for part in command_parts if part)
        return f"{line}\n"

    def _command_to_pd_series_handle_move(self, command, series_data):
        """Internal helper to add descriptive tags for commands.Move commands.

        For a `commands.Move` that is not a `commands.MoveExtrude`, the
        "tags" entry is overwritten with the rapid-move tag when the row's
        'G' value equals the syntax's rapid code, and with the linear-move
        tag otherwise. All other commands are returned untouched.

        Args:
            command (Command): The command being processed.
            series_data (dict): The data dictionary for the command; modified
                in place.

        Returns:
            dict: The same `series_data` dictionary.
        """
        # NOTE: The default rapid move G code is assumed to be G0. A machine
        # profile or syntax definition can override it via `code_map`.
        rapid_move = self.code_map.get("rapid", 0)

        is_plain_move = isinstance(command, commands.Move) and not isinstance(
            command, commands.MoveExtrude
        )
        if is_plain_move:
            if "G" in series_data and series_data["G"] == rapid_move:
                series_data["tags"] = f"{tag_literals.RAPID_MOVE}"
            else:
                series_data["tags"] = f"{tag_literals.LINEAR_MOVE}"

        return series_data

    def command_to_dict(self, command: "commands.Command") -> dict:
        """Converts a `Command` instance into a plain dict suitable for a `DataFrame` row.

        Prefer this over `command_to_pd_series` when building a DataFrame from
        many commands — passing a list of dicts to `pd.DataFrame()` is
        significantly faster than passing a list of `pd.Series`.

        Args:
            command (`Command`): The Command object to convert.

        Returns:
            dict: A dictionary containing the command's G-code words plus the
            "tags" and "raw" metadata entries.

        Raises:
            TypeError: If ``command`` is not a `Command` instance.
        """
        if not isinstance(command, commands.Command):
            raise TypeError(
                f"command argument must be instance of `Command`. Received type {type(command)}"
            )

        series_data = dict(self.command_to_gcode_words(command))
        series_data["tags"] = f"{command.TAG}"
        series_data["raw"] = command.raw
        return self._command_to_pd_series_handle_move(command, series_data)

    def command_to_pd_series(self, command: "commands.Command"):
        """Converts a `Command` instance into a `pd.Series`.

        For bulk DataFrame construction use `command_to_dict` instead.
        """
        return pd.Series(self.command_to_dict(command))
class VTKExporter:
    """Exports geometry and scalar data to PyVista and VTK formats."""

    @staticmethod
    def additive_part_to_polydata(
        additive_part: "AdditivePart",
        deposition_range: tuple = None,
        custom_scalars: dict = None,
    ) -> "pv.PolyData":
        """Converts an AdditivePart to a PyVista PolyData mesh with scalar fields.

        Args:
            additive_part (AdditivePart): The part to convert.
            deposition_range (tuple, optional): ``(start, end)`` pair of
                0-based indices into the ordered sequence of depositing
                segments, selecting a contiguous subset to mesh. ``None``
                includes all depositing segments.
            custom_scalars (dict, optional): Mapping of ``{name: array}`` for
                additional or replacement scalar arrays stored as cell data.
                Use this to supply measured data (e.g. temperature) or any
                other derived field. Array length must equal ``mesh.n_cells``
                (one value per line-segment).

        Returns:
            pv.PolyData: Mesh with scalar arrays in ``cell_data``.

        Raises:
            TypeError: If ``additive_part`` is not an ``AdditivePart``.
            ImportError: If pyvista is not installed.
        """
        try:
            import pyvista as pv  # noqa: F401 — validates availability early
        except ImportError:
            raise ImportError(
                "VTK export requires pyvista. Install with: pip install gcode-reader[viz]"
            ) from None

        if not isinstance(additive_part, AdditivePart):
            raise TypeError(
                f"additive_part must be an instance of AdditivePart. "
                f"Received {type(additive_part).__name__}"
            )

        return VTKExporter._build_additive_part_polydata(
            additive_part, deposition_range, custom_scalars
        )

    @staticmethod
    def additive_part_to_vtk(
        additive_part: "AdditivePart",
        filepath: str,
        binary: bool = True,
        deposition_range: tuple = None,
        custom_scalars: dict = None,
    ):
        """Exports an AdditivePart to a ``.vtk`` file with embedded scalar fields.

        Args:
            additive_part (AdditivePart): The part to export.
            filepath (str): Destination path; must end with ``.vtk``.
            binary (bool, optional): Write binary VTK format. Defaults to True.
            deposition_range (tuple, optional): ``(start, end)`` pair of
                0-based indices into the ordered sequence of depositing
                segments to export. ``None`` exports all depositing segments.
            custom_scalars (dict, optional): Mapping of ``{name: array}``
                forwarded to `additive_part_to_polydata` and stored as cell
                data. Defaults to None (no extra arrays).

        Raises:
            TypeError: If ``additive_part`` is not an ``AdditivePart``.
            ValueError: If ``filepath`` does not end with ``.vtk``.
            FileNotFoundError: If the parent directory of ``filepath`` does
                not exist.
        """
        if not isinstance(additive_part, AdditivePart):
            raise TypeError(
                f"additive_part must be an instance of AdditivePart. "
                f"Received {type(additive_part).__name__}"
            )

        path_obj = Path(filepath)
        if path_obj.suffix.lower() != ".vtk":
            raise ValueError(
                f"Filepath must end with '.vtk'. Received: '{path_obj.suffix}'"
            )
        if not path_obj.parent.exists():
            raise FileNotFoundError(
                f"The directory '{path_obj.parent}' does not exist."
            )

        mesh = VTKExporter.additive_part_to_polydata(
            additive_part, deposition_range, custom_scalars
        )
        mesh.save(filepath, binary=binary)

    @staticmethod
    def _resolve_deposition_range(deposition_range, n_depositions):
        """Validates a deposition range and returns it as a ``(start, end)`` tuple.

        Args:
            deposition_range (tuple | list | None): Requested range; ``None``
                selects every depositing segment.
            n_depositions (int): Total number of depositing segments.

        Returns:
            tuple: ``(start, end)`` bounds usable as a positional slice.

        Raises:
            TypeError: If the range is not a two-element tuple or list.
            IndexError: If the upper bound exceeds ``n_depositions`` or the
                lower bound is negative.
        """
        if deposition_range is None:
            return (0, n_depositions)

        if (
            not isinstance(deposition_range, (tuple, list))
            or len(deposition_range) != 2
        ):
            raise TypeError(
                "deposition_range must be a (start, end) tuple of integer indices."
            )

        start, end = deposition_range
        if end > n_depositions:
            raise IndexError(
                f"deposition_range upper bound {end} exceeds "
                f"{n_depositions} total depositing segments."
            )
        # A negative start would wrap around in a positional slice and select
        # segments from the tail instead of failing.
        if start is not None and start < 0:
            raise IndexError(
                f"deposition_range lower bound {start} must not be negative."
            )
        return (start, end)

    @staticmethod
    def _build_additive_part_polydata(
        additive_part: "AdditivePart",
        deposition_range: tuple,
        custom_scalars: dict,
    ) -> "pv.PolyData":
        """Constructs the PolyData geometry and all scalar arrays in one pass.

        Iterates depositing segments within the requested deposition range.
        For each segment of N process-data records, N nodes are produced and
        N-1 cell scalars are computed. Scalars for cell k (the edge from node
        k to node k+1) come from ``process_data_slice[k+1]`` — the a->b record
        housed at the destination index b — so there is no off-by-one error
        from using the anchor (index 0) record whose scalars belong to the
        preceding travel move.

        Scalars are stored as ``cell_data`` (one value per segment).

        Returns an empty ``pv.PolyData`` when the filtered range contains no
        depositing segments.

        Raises:
            TypeError: If ``deposition_range`` is not a two-element tuple/list.
            IndexError: If ``deposition_range`` lies outside the available
                depositing segments.
        """
        import pyvista as pv

        depositing = additive_part.deposition_dataframe
        if depositing is None or depositing.empty:
            return pv.PolyData()

        range_start, range_end = VTKExporter._resolve_deposition_range(
            deposition_range, len(depositing)
        )
        depositing = depositing.iloc[range_start:range_end]
        if depositing.empty:
            return pv.PolyData()

        layer_times = additive_part.layer_times

        # Pre-extract scalar fields from the relevant process_data into
        # numpy arrays covering the whole selected window.
        start_deposition = int(depositing["start"].min())
        end_deposition = int(depositing["end"].max()) + 1  # "end" is inclusive
        process_data_dict = additive_part.operation.process_data_to_numpy_dict(
            (start_deposition, end_deposition)
        )

        # Include planned velocity scalars when a motion profile is available.
        # The velocity arrays share the same index space as process_data_dict,
        # so they are sliced with the same s+1:e window inside the loop below.
        motion_scalars = {}
        if additive_part.operation.motion_profile:
            try:
                from .motion import extract_motion_scalars

                full_motion_scalars = extract_motion_scalars(additive_part.operation)
                for field, arr in full_motion_scalars.items():
                    motion_scalars[field] = arr[start_deposition:end_deposition]
            except Exception:
                # Best effort: the mesh is still built without velocity data.
                logging.warning(
                    "VTKExporter: could not extract motion scalars — "
                    "velocity fields will be absent from the mesh.",
                    exc_info=True,
                )

        # Merge all per-index array sources; motion_scalars may be empty.
        scalar_sources = {**process_data_dict, **motion_scalars}

        # Constant-per-segment fill values; layer_time is updated each row.
        fill_values = {
            "layer_time": 0.0,
            "temperature": np.nan,
        }
        # A fill value is used only when no per-index source has the same
        # name; otherwise both would be appended to one list and the array
        # would be twice as long as the number of cells.
        fill_names = [name for name in fill_values if name not in scalar_sources]

        # Scalar fields are attached as cell data (one value per line-segment
        # cell, not per point).
        scalar_fields = {name: [] for name in [*scalar_sources, *fill_names]}

        # Keys present in scalar_sources that should not be written to the mesh.
        _skip_scalar_fields = {"location", "relative_movement"}

        nodes_list = []
        edges_list = []
        point_offset = 0

        for row in depositing.itertuples():
            s = row.start - start_deposition
            e = row.end - start_deposition + 1  # exclusive
            n = e - s
            if n < 2:
                # A single record cannot form a line segment.
                continue
            n_cells = n - 1

            fill_values["layer_time"] = layer_times.get(row.layer_index, 0.0)

            # Nodes: one per process-data record (N total).
            nodes_list.append(process_data_dict["location"][s:e])

            # Cell scalars: cell k connects node k -> node k+1 and is
            # described by the record at k+1, hence the s+1 offset.
            for field_name, arr in scalar_sources.items():
                scalar_fields[field_name].append(arr[s + 1 : e])
            for field_name in fill_names:
                scalar_fields[field_name].append(
                    np.full(n_cells, fill_values[field_name])
                )

            idx = np.arange(point_offset, point_offset + n)
            edges_list.append(np.stack((idx[:-1], idx[1:]), axis=1))
            point_offset += n

        if not nodes_list:
            return pv.PolyData()

        # Tolerate skip keys that the process data does not provide.
        for key in _skip_scalar_fields:
            scalar_fields.pop(key, None)

        nodes_arr = np.vstack(nodes_list)
        edges_arr = np.vstack(edges_list)
        # Each connectivity entry is prefixed with its point count (2).
        padding = np.full(len(edges_arr), 2, dtype=int)
        cells = np.column_stack((padding, edges_arr))

        # NOTE(review): the second positional argument of pv.PolyData is
        # `faces`; these 2-point cells look like they are meant to be
        # `lines=` — confirm against downstream consumers before changing.
        mesh = pv.PolyData(nodes_arr, cells)

        for name, arrays in scalar_fields.items():
            mesh.cell_data[name] = np.concatenate(arrays)

        # Mechanism for user to specify custom scalar data (also cell data).
        if custom_scalars:
            for name, arr in custom_scalars.items():
                mesh.cell_data[name] = np.asarray(arr, dtype=float)

        return mesh