Source code for gcode_reader.emulate.diagnostics

"""Sanity checks for AdditivePart inputs and derived state.

Surfaces common silent-failure modes that otherwise propagate as zero-valued or
oddly-shaped statistics:

- ``check_extrusion``: extrusion did not register (often a wrong-machine config),
  no ``is_depositing`` rows, or zero depositing distance/volume.
- ``check_geometry``: degenerate bounding-box dimensions, suspicious aspect ratios.
- ``check_layer_grouping``: non-monotonic layer ordering, misaligned ``layer_normal``,
  coordinate-resolution fragmentation, or outlier moves at unrelated heights
  (the failure mode that motivated the original layer-grouping fix).
- ``check_additive_part``: runs all of the above plus cross-cutting checks
  (1 layer despite measurable build extent, implausible implied layer thickness,
  zero deposition length/volume despite depositing rows).

Each check returns a dict with the raw numbers and a list of human-readable
``warnings``. The ``format_*_report`` helpers render those dicts as text.
"""

from typing import Optional

import numpy as np
import pandas as pd


[docs] def check_layer_grouping( df: pd.DataFrame, layer_index_col: str = "layer_index", layer_height_col: str = "layer_height", sliver_count: int = 5, max_gap_ratio: float = 10.0, ) -> dict: """Compute layer-grouping sanity checks for a tagged G-code DataFrame. Args: df: DataFrame with ``layer_index`` and ``layer_height`` columns (the output of ``insert_layer_indices_gcode_dataframe``). layer_index_col: Column name for layer indices. layer_height_col: Column name for projected heights. sliver_count: A layer with fewer than this many rows is flagged as a sliver. max_gap_ratio: Gaps between consecutive layers larger than ``max_gap_ratio * median_gap`` are flagged as outliers (e.g. a probe move at an unrelated height). Returns: A dict with summary statistics and a ``per_layer`` DataFrame. Keys: - ``n_layers``: number of distinct layer indices - ``monotonic``: True if ``layer_index`` is in ascending physical-height order - ``rank_correlation``: Pearson correlation between layer_index and physical height (near +1 = healthy, near 0 = scrambled ordering) - ``min_gap``, ``median_gap``, ``max_gap``: gaps between consecutive layer heights - ``min_spread``, ``median_spread``, ``max_spread``: within-layer height spread (large values relative to gap suggest the normal is misaligned) - ``n_slivers``: number of layers with fewer than ``sliver_count`` rows - ``n_outlier_gaps``: number of gaps larger than ``max_gap_ratio * median_gap`` - ``per_layer``: DataFrame indexed by layer_index with min/max/count/spread columns - ``warnings``: list of human-readable strings flagging anomalies """ if layer_index_col not in df.columns or layer_height_col not in df.columns: raise KeyError( f"check_layer_grouping: DataFrame must contain {layer_index_col!r} and " f"{layer_height_col!r} columns. Run insert_layer_indices_gcode_dataframe first." ) per_layer = df.groupby(layer_index_col)[layer_height_col].agg( ["min", "max", "count"] ) per_layer["spread"] = per_layer["max"] - per_layer["min"] first_height = df.groupby(layer_index_col)[layer_height_col].first() monotonic = bool(first_height.is_monotonic_increasing) if len(first_height) >= 2: rank_correlation = float( np.corrcoef(first_height.index.to_numpy(), first_height.to_numpy())[0, 1] ) else: rank_correlation = float("nan") sorted_heights = np.sort(first_height.to_numpy()) gaps = np.diff(sorted_heights) if len(sorted_heights) >= 2 else np.array([]) median_gap = float(np.median(gaps)) if gaps.size else float("nan") min_gap = float(gaps.min()) if gaps.size else float("nan") max_gap = float(gaps.max()) if gaps.size else float("nan") n_outlier_gaps = ( int((gaps > max_gap_ratio * median_gap).sum()) if gaps.size and median_gap > 0 else 0 ) n_slivers = int((per_layer["count"] < sliver_count).sum()) warnings = [] if not monotonic: warnings.append( "layer_index is not monotonic in physical height — downstream code that " "iterates layers in numeric order may walk them out of build order." ) if median_gap > 0 and per_layer["spread"].median() > 0.1 * median_gap: warnings.append( f"median within-layer spread ({per_layer['spread'].median():.3g}) is " f">10% of median inter-layer gap ({median_gap:.3g}) — layer_normal may be " "misaligned with the true layer planes." ) if len(per_layer) > 0 and n_slivers / len(per_layer) > 0.1: warnings.append( f"{n_slivers}/{len(per_layer)} layers contain <{sliver_count} rows — " "possible coordinate-resolution fragmentation or many travel-only layers." ) if n_outlier_gaps > 0: warnings.append( f"{n_outlier_gaps} inter-layer gap(s) exceed {max_gap_ratio}x the median " f"gap (median {median_gap:.3g}, max {max_gap:.3g}) — likely outlier moves " "(probe, purge, retract to safe height)." ) return { "n_layers": int(len(per_layer)), "monotonic": monotonic, "rank_correlation": rank_correlation, "min_gap": min_gap, "median_gap": median_gap, "max_gap": max_gap, "min_spread": ( float(per_layer["spread"].min()) if len(per_layer) else float("nan") ), "median_spread": ( float(per_layer["spread"].median()) if len(per_layer) else float("nan") ), "max_spread": ( float(per_layer["spread"].max()) if len(per_layer) else float("nan") ), "n_slivers": n_slivers, "n_outlier_gaps": n_outlier_gaps, "per_layer": per_layer, "warnings": warnings, }
[docs] def check_extrusion(df: pd.DataFrame) -> dict: """Verify that extrusion data is present and non-trivial. A common silent failure mode is configuring the wrong machine/extruder, in which case ``deposited_volume`` stays at zero and ``is_depositing`` is empty even though the G-code clearly contains print moves. Downstream stats (layer count, volume, mass, deposition bounds) then come back as zero or missing with no error raised. Returns a dict with: - ``has_is_depositing_col``, ``has_deposited_volume_col`` - ``n_depositing_rows``: number of rows tagged ``is_depositing == True`` - ``depositing_fraction``: fraction of rows that are depositing - ``total_deposited_volume``: sum of the ``deposited_volume`` column - ``n_depositions``: distinct ``deposition_id`` values among depositing rows - ``warnings``: human-readable flags """ warnings = [] has_dep_col = "is_depositing" in df.columns has_vol_col = "deposited_volume" in df.columns has_dist_col = "distance" in df.columns n_depositing = int(df["is_depositing"].sum()) if has_dep_col else 0 depositing_fraction = n_depositing / len(df) if has_dep_col and len(df) > 0 else 0.0 total_volume = float(df["deposited_volume"].sum()) if has_vol_col else float("nan") if has_dep_col and has_dist_col: depositing_distance = float(df.loc[df["is_depositing"], "distance"].sum()) else: depositing_distance = float("nan") if has_dep_col and "deposition_id" in df.columns: n_depositions = int(df.loc[df["is_depositing"], "deposition_id"].nunique()) else: n_depositions = 0 if not has_dep_col: warnings.append( "DataFrame has no 'is_depositing' column — extrusion was not tagged. " "Check that the part was built from process data, not raw G-code." ) elif n_depositing == 0: warnings.append( "No depositing rows found — extrusion did not register. Most common cause: " "wrong machine/extruder configured for this G-code dialect, so extrude " "commands are not being parsed." ) if has_vol_col and total_volume <= 0: warnings.append( f"Total deposited_volume is {total_volume:.4g} — extrusion volume did not " "register. Check the extruder model (bead area, density) and machine choice." ) if has_dep_col and len(df) > 0 and 0 < depositing_fraction < 0.01: warnings.append( f"Only {depositing_fraction:.2%} of rows are depositing — unusually low. " "Could indicate that most extrude commands are being parsed as travel." ) if has_dep_col and n_depositing > 0 and has_dist_col and depositing_distance <= 0: warnings.append( "is_depositing rows exist but their total distance is 0 — extrude moves " "may have been parsed as zero-length. Check the machine/extruder model." ) return { "has_is_depositing_col": has_dep_col, "has_deposited_volume_col": has_vol_col, "n_depositing_rows": n_depositing, "depositing_fraction": depositing_fraction, "total_deposited_volume": total_volume, "depositing_distance": depositing_distance, "n_depositions": n_depositions, "warnings": warnings, }
[docs] def check_geometry(part) -> dict: """Sanity-check the deposition bounding box and overall geometry of an AdditivePart. Flags degenerate dimensions (zero extent in any axis), unusually thin parts (aspect ratio > 1000), and missing bounds. A zero extent in the build direction is often a tell-tale of extrusion not registering or of a single-layer parse. Returns a dict with: - ``bounds``: ``((xmin, ymin, zmin), (xmax, ymax, zmax))`` or None - ``dims``: ``(dx, dy, dz)`` or None - ``aspect_ratio``: max(dim) / min(positive dim), or inf if any dim is zero - ``warnings`` """ warnings = [] try: bounds = part.calc_deposition_bounds() if part.dataframe is not None else None except ValueError: # calc_deposition_bounds raises when there are zero deposition paths bounds = None if bounds is None: warnings.append( "No deposition bounds — part has no recorded deposition locations. " "Likely upstream: extrusion not registering (see check_extrusion)." ) return { "bounds": None, "dims": None, "aspect_ratio": float("nan"), "warnings": warnings, } mins, maxs = np.asarray(bounds[0]), np.asarray(bounds[1]) dims = maxs - mins zero_axes = [name for name, d in zip("xyz", dims) if d == 0.0] if zero_axes: warnings.append( f"Bounding box has zero extent in {','.join(zero_axes)} — geometry is " "degenerate in that axis (planar print, single move, or extrusion not " "registering)." ) positive_dims = dims[dims > 0] if positive_dims.size > 0: aspect = float(dims.max() / positive_dims.min()) else: aspect = float("inf") if positive_dims.size > 0 and aspect > 1000: warnings.append( f"Aspect ratio {aspect:.0f}:1 — one dimension is >1000x smaller than the " "largest. Plausible for thin walls/sheets, but worth verifying units." ) return { "bounds": (tuple(mins), tuple(maxs)), "dims": tuple(float(d) for d in dims), "aspect_ratio": aspect, "warnings": warnings, }
[docs] def check_additive_part(part) -> dict: """Run the full battery of sanity checks on an AdditivePart. Combines ``check_extrusion``, ``check_geometry``, and ``check_layer_grouping``, plus a few cross-cutting checks (layer count plausibility, deposition length). Returns a dict with one sub-dict per check and a flat ``warnings`` list that aggregates every warning, prefixed by which check raised it. """ warnings = [] extrusion = ( check_extrusion(part.dataframe) if part.dataframe is not None else { "warnings": ["No dataframe on part."], } ) for w in extrusion.get("warnings", []): warnings.append(f"extrusion: {w}") geometry = check_geometry(part) for w in geometry.get("warnings", []): warnings.append(f"geometry: {w}") if part.dataframe is not None and "layer_index" in part.dataframe.columns: layer_grouping = check_layer_grouping(part.dataframe) for w in layer_grouping.get("warnings", []): warnings.append(f"layer_grouping: {w}") else: layer_grouping = {"warnings": ["No layer_index column on part dataframe."]} warnings.append("layer_grouping: No layer_index column on part dataframe.") # Cross-cutting: layer count vs build-direction extent n_layers = part.n_layers cross_warnings = [] if geometry.get("dims") is not None and n_layers > 0: # Project bounding box onto the layer normal to get expected build height. n = np.asarray(part.layer_normal, dtype=float) n = n / np.linalg.norm(n) mins, maxs = np.asarray(geometry["bounds"][0]), np.asarray( geometry["bounds"][1] ) build_extent = float(abs((maxs - mins) @ n)) implied_thickness = build_extent / n_layers if n_layers > 0 else 0.0 if n_layers == 1 and build_extent > 0: cross_warnings.append( f"Only 1 layer detected but build extent along layer_normal is " f"{build_extent:.4g} — layer_normal may be misaligned." ) if implied_thickness > 0 and ( implied_thickness < 1e-4 or implied_thickness > 100 ): cross_warnings.append( f"Implied layer thickness ({implied_thickness:.4g}) is outside the " "0.0001 – 100 range — check units, layer_normal, or n_layers." ) elif n_layers == 0: cross_warnings.append("n_layers == 0 — no layers were detected.") for w in cross_warnings: warnings.append(f"cross: {w}") deposition_length = float(part.deposition_length) try: deposition_volume = float(part.volume) except Exception: deposition_volume = float("nan") if deposition_length <= 0 and extrusion.get("n_depositing_rows", 0) == 0: # Already covered by extrusion warnings; skip duplication. pass elif deposition_length <= 0: warnings.append( "cross: deposition_length is 0 despite depositing rows existing." ) if ( not np.isnan(deposition_volume) and deposition_volume <= 0 and extrusion.get("n_depositing_rows", 0) > 0 ): warnings.append( "cross: deposition_volume is 0 despite depositing rows existing — bead " "area or process_data deposited_volume is likely zero." ) return { "extrusion": extrusion, "geometry": geometry, "layer_grouping": layer_grouping, "n_layers": n_layers, "deposition_length": deposition_length, "deposition_volume": deposition_volume, "warnings": warnings, }
[docs] def format_additive_part_report(diagnostics: dict) -> str: """Render the output of ``check_additive_part`` as a human-readable string.""" lines = ["AdditivePart diagnostics", "=" * 32] ext = diagnostics.get("extrusion", {}) lines.append("") lines.append("Extrusion") lines.append("-" * 32) lines.append(f"depositing rows: {ext.get('n_depositing_rows', 'n/a')}") lines.append( f"depositing frac: {ext.get('depositing_fraction', float('nan')):.2%}" ) lines.append( f"depositing dist: {ext.get('depositing_distance', float('nan')):.4g}" ) if ext.get("has_deposited_volume_col"): lines.append( f"col total volume: {ext.get('total_deposited_volume', float('nan')):.4g}" ) lines.append(f"n depositions: {ext.get('n_depositions', 'n/a')}") geom = diagnostics.get("geometry", {}) lines.append("") lines.append("Geometry") lines.append("-" * 32) if geom.get("dims") is not None: lines.append( f"dims (x,y,z): " f"({geom['dims'][0]:.4g}, {geom['dims'][1]:.4g}, {geom['dims'][2]:.4g})" ) lines.append(f"aspect ratio: {geom.get('aspect_ratio', float('nan')):.4g}") else: lines.append("dims: none") lines.append("") lines.append(f"n_layers: {diagnostics.get('n_layers', 'n/a')}") lines.append( f"deposition length: {diagnostics.get('deposition_length', float('nan')):.4g}" ) lines.append( f"deposition volume: {diagnostics.get('deposition_volume', float('nan')):.4g}" ) lg = diagnostics.get("layer_grouping", {}) if lg and "n_layers" in lg: lines.append("") lines.append(format_layer_grouping_report(lg)) warnings = diagnostics.get("warnings", []) lines.append("") if warnings: lines.append("All warnings") lines.append("-" * 32) for w in warnings: lines.append(f" - {w}") else: lines.append("No warnings — all checks passed.") return "\n".join(lines)
[docs] def format_layer_grouping_report(diagnostics: dict) -> str: """Render the output of ``check_layer_grouping`` as a human-readable string.""" lines = [ "Layer grouping diagnostics", "-" * 32, f"n_layers: {diagnostics['n_layers']}", f"monotonic: {diagnostics['monotonic']}", f"rank correlation: {diagnostics['rank_correlation']:.4f}", f"gap min/med/max: {diagnostics['min_gap']:.4g} / " f"{diagnostics['median_gap']:.4g} / {diagnostics['max_gap']:.4g}", f"spread min/med/max:{diagnostics['min_spread']:.4g} / " f"{diagnostics['median_spread']:.4g} / {diagnostics['max_spread']:.4g}", f"slivers: {diagnostics['n_slivers']}", f"outlier gaps: {diagnostics['n_outlier_gaps']}", ] if diagnostics["warnings"]: lines.append("") lines.append("Warnings:") for w in diagnostics["warnings"]: lines.append(f" - {w}") return "\n".join(lines)