Source code for gcode_reader.emulate.motion

import logging

import numpy as np
import pandas as pd

from ..constants import ZERO_TOLERANCE
from .operations import Operation, ProcessDataArrays


[docs] class MotionProfile: """ Factory for generating `MotionSegment` objects from process data. Holds machine-wide kinematic constraints and orchestrates a two-pass (GRBL-style) velocity planning algorithm: junction-deviation corner limiting followed by forward and backward kinematic propagation. Args: max_jerk (float, optional): Maximum allowed jerk (units/s³). Used only by SCurve segments. Defaults to None. max_accleration (float, optional): Maximum allowed acceleration (units/s²). Defaults to None. max_velocity (float, optional): Ceiling on all target velocities (units/s). Defaults to None. junction_deviation (float, optional): Junction-deviation parameter (same length units as positions). Larger values permit higher corner velocities. Defaults to 0.05. Attributes: feed_rate_time_base_s (float): Divisor applied to feed-rate values to convert them from units/minute to units/second. Defaults to 60.0. """ def __init__( self, max_jerk: float = None, max_accleration: float = None, max_velocity: float = None, junction_deviation: float = 0.05, ): self.max_jerk = max_jerk self.max_acceleration = max_accleration self.max_velocity = max_velocity self.junction_deviation = junction_deviation self._profile_type = Trapezoid self.feed_rate_time_base_s = 60.0 @property def profile_type(self): return self._profile_type @profile_type.setter def profile_type(self, t: type): if not issubclass(t, MotionSegment): raise ValueError( f"MotionProfile.profile_type: Must be subclass of MotionSegment. Received {t}" ) self._profile_type = t def _compute_junction_velocities( self, initial_positions: np.ndarray, final_positions: np.ndarray, max_acceleration: float, junction_deviation: float, ) -> np.ndarray: """ Compute the maximum allowable speed at each segment boundary using the junction-deviation (centripetal-acceleration) model. The result at index ``i`` is the speed limit at the junction between segment ``i`` and segment ``i+1``. The last element is always 0 because the machine must stop at the end of a move sequence. The model derives from: if the tool were to follow a circular arc of radius R through the corner, the centripetal acceleration is v²/R. Setting that equal to ``max_acceleration`` and using the geometric relation between R, ``junction_deviation`` (the max arc-chord deviation δ), and the half-corner angle α gives: v_junction² = a_max · δ · sin(α) / (1 − sin(α)) where α = (π − θ) / 2 and θ is the angle between the incoming and outgoing direction vectors. Straight-through corners (θ≈0) yield a very large (effectively unconstrained) junction velocity; reversals (θ=π) yield zero. Args: initial_positions: Shape (N, 3) array of segment start positions. final_positions: Shape (N, 3) array of segment end positions. max_acceleration: Machine acceleration limit (units/s²). junction_deviation: Maximum allowed arc-chord deviation δ (same length units as positions). Returns: Shape (N,) array of junction velocity limits (units/s). """ incoming_directions = final_positions - initial_positions outgoing_start_positions = np.roll(final_positions, -1, axis=0) outgoing_directions = outgoing_start_positions - final_positions incoming_norms = np.linalg.norm(incoming_directions, axis=1) outgoing_norms = np.linalg.norm(outgoing_directions, axis=1) dot_products = np.einsum("ij,ij->i", incoming_directions, outgoing_directions) norm_products = incoming_norms * outgoing_norms # Zero-length segments produce a zero denominator; treat them as straight-through. with np.errstate(divide="ignore", invalid="ignore"): cosine_angle = np.where( norm_products > ZERO_TOLERANCE, dot_products / norm_products, 1.0, ) cosine_angle = np.clip(cosine_angle, -1.0, 1.0) corner_angle = np.arccos(cosine_angle) # 0 = straight, π = reversal # Half-corner angle α = (π − θ) / 2: # θ=0 (straight) -> α=π/2 -> sin(α)=1 -> v_junction -> ∞ (clamped by target_velocity) # θ=π (reversal) -> α=0 -> sin(α)=0 -> v_junction = 0 half_corner_angle = (np.pi - corner_angle) / 2.0 sin_half_corner = np.sin(half_corner_angle) if max_acceleration is None: # No acceleration limit -> no corner speed constraint; machine stops at the end. junction_velocities = np.full(len(final_positions), np.inf) junction_velocities[-1] = 0.0 return junction_velocities junction_denom = np.maximum(1.0 - sin_half_corner, ZERO_TOLERANCE) junction_velocity_squared = ( max_acceleration * junction_deviation * sin_half_corner / junction_denom ) junction_velocities = np.sqrt(np.maximum(junction_velocity_squared, 0.0)) junction_velocities[-1] = 0.0 # machine stops after the last segment return junction_velocities def _forward_pass( self, distances: np.ndarray, target_velocities: np.ndarray, junction_velocities: np.ndarray, max_acceleration: float, entry_velocity: float = 0.0, ) -> tuple: """ Forward kinematic pass: propagate achievable exit velocities from the first segment to the last. For each segment the exit velocity is the minimum of: - the segment's cruise target velocity, - the junction velocity limit at the end of the segment, and - the physically reachable exit speed given the entry speed and distance: ``v_exit_reachable = sqrt(v_entry² + 2 · a_max · distance)``. The exit of segment ``i`` becomes the entry of segment ``i+1``. Args: distances: Shape (N,) segment lengths. target_velocities: Shape (N,) cruise velocity ceilings (units/s). junction_velocities: Shape (N,) corner velocity limits (units/s). Index ``i`` is the limit at the boundary after segment ``i``. max_acceleration: Machine acceleration limit (units/s²). entry_velocity: Speed at the very start of the sequence. Defaults to 0.0 (machine starts from rest). Returns: Tuple ``(segment_entry_velocities, segment_exit_velocities)``, each a shape (N,) array. """ num_segments = len(distances) segment_entry_velocities = np.zeros(num_segments) segment_exit_velocities = np.zeros(num_segments) segment_entry_velocities[0] = entry_velocity for i in range(num_segments): exit_velocity_reachable = ( np.sqrt( segment_entry_velocities[i] ** 2 + 2.0 * max_acceleration * distances[i] ) if max_acceleration is not None else np.inf ) segment_exit_velocities[i] = min( target_velocities[i], junction_velocities[i], exit_velocity_reachable, ) if i + 1 < num_segments: segment_entry_velocities[i + 1] = segment_exit_velocities[i] return segment_entry_velocities, segment_exit_velocities def _backward_pass( self, distances: np.ndarray, segment_entry_velocities: np.ndarray, segment_exit_velocities: np.ndarray, max_acceleration: float, terminal_velocity: float = 0.0, ) -> tuple: """ Backward kinematic pass: tighten entry velocities so every segment can decelerate to its already-constrained exit velocity. For each segment (walking from last to first) the entry velocity is capped by what allows a stop at the exit: ``v_entry_max = sqrt(v_exit² + 2 · a_max · distance)``. The tightened entry of segment ``i`` is propagated back as the exit limit of segment ``i−1``. After both passes the invariant ``entry[i] == exit[i-1]`` holds for all interior boundaries, and ``exit[-1] ≤ terminal_velocity``. Args: distances: Shape (N,) segment lengths. segment_entry_velocities: Shape (N,) entry velocities from the forward pass (mutated in place). segment_exit_velocities: Shape (N,) exit velocities from the forward pass (mutated in place). max_acceleration: Machine acceleration limit (units/s²). terminal_velocity: Required speed at the very end of the sequence. Defaults to 0.0 (machine stops). Returns: Tuple ``(segment_entry_velocities, segment_exit_velocities)`` — the same arrays passed in, now tightened. """ num_segments = len(distances) segment_exit_velocities[-1] = min( segment_exit_velocities[-1], terminal_velocity ) for i in range(num_segments - 1, -1, -1): max_entry_velocity = ( np.sqrt( segment_exit_velocities[i] ** 2 + 2.0 * max_acceleration * distances[i] ) if max_acceleration is not None else np.inf ) segment_entry_velocities[i] = min( segment_entry_velocities[i], max_entry_velocity ) if i - 1 >= 0: segment_exit_velocities[i - 1] = min( segment_exit_velocities[i - 1], segment_entry_velocities[i] ) return segment_entry_velocities, segment_exit_velocities def _apply_machine_constraints( self, initial_positions: np.ndarray, final_positions: np.ndarray, target_velocities: np.ndarray, junction_velocities: np.ndarray, ) -> tuple: """Hook for subclasses to inject machine-specific velocity constraints. Called after junction velocities are computed but before the forward and backward passes run, so any forced stops or speed caps are automatically propagated through the full toolpath by the planner. The default implementation is a no-op. Subclasses override this to enforce machine-specific rules (e.g. stop before layer changes, cap speed on vertical moves). Args: initial_positions: Shape (N, 3) segment start positions. final_positions: Shape (N, 3) segment end positions. target_velocities: Shape (N,) cruise velocity ceilings (units/s). junction_velocities: Shape (N,) corner velocity limits (units/s). Returns: Tuple of (target_velocities, junction_velocities), each shape (N,). Return copies if you need to modify — do not mutate the inputs. """ return target_velocities, junction_velocities
[docs] def from_process_data(self, process_data, home=(0, 0, 0)): """ Convert process data into a timed motion profile. Accepts either a ``List[ProcessData]`` or a ``ProcessDataArrays``. Zero-length moves (duplicate waypoints) yield ``None``. Example:: planner = MotionProfile(max_accleration=500.0, max_velocity=50.0) segments = list(planner.from_process_data(operation.process_data)) segments = [s for s in segments if s is not None] total_time = sum(s.elapsed_time for s in segments) Args: process_data: Move data as a list of ``ProcessData`` objects or a pre-built ``ProcessDataArrays``. home: Machine position before the first move. Defaults to (0, 0, 0). Yields: A ``MotionSegment`` for each move, or ``None`` for zero-length moves. """ # # 1. Extract final_positions and feed_rate as numpy arrays. # if isinstance(process_data, ProcessDataArrays): final_positions = process_data.location # (n, 3), already numpy feed_rate = process_data.feed_rate # (n,), already numpy else: n = len(process_data) final_positions = np.empty((n, 3), dtype=np.float64) feed_rate = np.empty(n, dtype=np.float64) for i, data in enumerate(process_data): final_positions[i] = data.location feed_rate[i] = data.feed_rate n = len(final_positions) initial_positions = np.empty_like(final_positions) initial_positions[0] = home initial_positions[1:] = final_positions[:-1] # # 2. Compute target (cruise) velocity for each segment. # # ffill propagates the last explicit feed-rate; the machine always # specifies a feed-rate before it begins moving, so filling with 0 # only affects any leading stationary points. target_velocities = pd.Series(feed_rate).ffill().fillna(0.0).to_numpy() target_velocities = target_velocities / self.feed_rate_time_base_s if self.max_velocity is not None: target_velocities = target_velocities.clip(max=self.max_velocity) # # 3. Two-pass velocity planning. # distances = np.linalg.norm(final_positions - initial_positions, axis=1) junction_velocities = self._compute_junction_velocities( initial_positions, final_positions, self.max_acceleration, self.junction_deviation, ) # Allow subclasses to inject machine-specific constraints (e.g. forced # stops before layer changes) before the passes lock in final velocities. target_velocities, junction_velocities = self._apply_machine_constraints( initial_positions, final_positions, target_velocities, junction_velocities ) segment_entry_velocities, segment_exit_velocities = self._forward_pass( distances, target_velocities, junction_velocities, self.max_acceleration ) segment_entry_velocities, segment_exit_velocities = self._backward_pass( distances, segment_entry_velocities, segment_exit_velocities, self.max_acceleration, ) # # 4. Yield segments. # for i in range(n): if distances[i] == 0.0: yield None continue yield self.profile_type( initial_position=initial_positions[i], final_position=final_positions[i], initial_velocity=segment_entry_velocities[i], target_velocity=target_velocities[i], final_velocity=segment_exit_velocities[i], max_acceleration=self.max_acceleration, max_jerk=self.max_jerk, _distance=float(distances[i]), )
[docs] def from_operation(self, operation: Operation, home=(0, 0, 0)): """ Convenience wrapper around ``from_process_data`` that attaches the resulting profile directly to an ``Operation``. Mutates ``operation`` in place: sets ``operation.motion_profile`` to the list of segments and writes ``elapsed_time`` back onto each corresponding ``ProcessData`` entry. Returns the same object so the call can be chained. Example:: planner = MotionProfile(max_accleration=500.0, max_velocity=50.0) operation = planner.from_operation(operation) total_time = sum( pd.elapsed_time for pd in operation.process_data ) Args: operation: The ``Operation`` to profile. Mutated in place. home: Machine position before the first move. Defaults to (0, 0, 0). Returns: The same ``Operation``, with ``motion_profile`` and ``elapsed_time`` populated. """ src = operation.arrays or operation.process_data profile = list(self.from_process_data(src, home)) operation.motion_profile = profile # Write elapsed_time back to both the object list AND the cached arrays so that # downstream consumers (e.g. AdditivePart) see correct values from either path. arr = ( operation._arrays ) # None if not yet built (won't be if src was process_data) for i, segment in enumerate(profile): if segment is not None: et = segment.elapsed_time operation.process_data[i].elapsed_time = et if arr is not None: arr.elapsed_time[i] = et return operation
[docs] class MotionSegment: def __init__( self, initial_position: tuple, final_position: tuple, _distance: float = None, **_, ): """Base class for all motion profiles Args: initial_position (tuple): Initial position final_position (tuple): Final position _distance (float, optional): Precomputed distance to skip np.linalg.norm. """ if not isinstance(initial_position, (tuple, list, np.ndarray)): raise TypeError( f"Initial position must be one of (tuple, list, np.ndarray). Received {type(initial_position)}" ) if not isinstance(final_position, (tuple, list, np.ndarray)): raise TypeError( f"Final position must be one of (tuple, list, np.ndarray). Received {type(final_position)}" ) self.initial_position = np.array(initial_position) self.final_position = np.array(final_position) self.distance = ( _distance if _distance is not None else np.linalg.norm(np.subtract(final_position, initial_position)) ) if self.distance < ZERO_TOLERANCE: raise ValueError( f"MotionProfile distance is zero: ({self.initial_position}, {self.final_position})" ) self.direction = np.subtract(final_position, initial_position) / self.distance self._dt = 0.0 def _validate_time(self, t: float): """Raises ValueError if t is outside the valid time range [0, _dt].""" if not (0 <= t <= self._dt): raise ValueError( f"Time {t} is outside the valid range [0, {self.elapsed_time}]" )
[docs] def calculate(self): pass
[docs] def jerk(self, t: float): """Instantaneous jerk (da/dt) at a time (t) on the line segment initial_position, final_position""" return None
[docs] def acceleration(self, t: float): """Instantaneous acceleration at a time (t) on the line segment initial_position, final_position""" return None
[docs] def velocity(self, t: float): """Instantaneous velocity at a time (t) on the line segment initial_position, final_position""" return None
[docs] def position(self, t: float): """Instantaneous position at a time (t) on the line segment initial_position, final_position""" return None
@property def elapsed_time(self): """Elapsed time required to travel the line segment initial_position, final_position""" return self._dt def _s_on_segment(self, s: tuple, tol=ZERO_TOLERANCE): """Determines if a position (s) falls on the line segment (final_position - initial_position) Args: s (tuple): position in space tol (float, optional): numerical tolerance. Defaults to ZERO_TOLERANCE. Returns: bool: If position s is on segment self.final_position - self.initial_position """ if not isinstance(s, (tuple, list, np.ndarray)): raise TypeError( f"position s must be one of (tuple, list, np.ndarray). Received {type(s)}" ) s = np.array(s) ac = s - self.initial_position ab = self.final_position - self.initial_position ab_cross_ac = np.cross(ab, ac) if all(np.isclose(ab_cross_ac, [0.0, 0.0, 0.0], rtol=tol)): k_ab = np.dot(ab, ab) k_ac = np.dot(ab, ac) if k_ac < 0.0: return False elif k_ac > k_ab: return False elif abs(k_ac) < tol: return True elif abs(k_ac - k_ab) < tol: return True elif 0.0 < k_ac < k_ab: return True return False
[docs] class ConstantVelocity(MotionSegment): def __init__( self, initial_position: tuple, final_position: tuple, target_velocity: float, **_, ): """Constant Velocity motion profile""" super().__init__(initial_position, final_position) self.v = target_velocity if self.v == 0.0: raise ValueError( f"ConsantVelocity Motion Profile for Line ({self.initial_position.tolist()}, {self.final_position.tolist()}). Velocity is zero. Infinite elapsed time!" ) self.calculate()
[docs] def calculate(self): self._dt = self.distance / self.v
[docs] def acceleration(self, t: float): self._validate_time(t) return 0.0
[docs] def velocity(self, t: float): self._validate_time(t) return self.v
[docs] def position(self, t): self._validate_time(t) return self.direction * self.v * t + self.initial_position
@property def elapsed_time(self): return super().elapsed_time @elapsed_time.setter def elapsed_time(self, dt: float): self._dt = dt if dt == 0.0: raise ValueError( f"ConsantVelocity Motion Profile for Line ({self.initial_position.tolist()}, {self.final_position.tolist()}). Elapsed time is zero. Infinite velocity!" ) self.v = self.distance / dt
[docs] class LinearRamp(MotionSegment): def __init__( self, initial_position: tuple, final_position: tuple, initial_velocity: float, final_velocity: float, max_acceleration: float = None, _distance: float = None, **_, ): """Linear Velocity motion profile""" super().__init__(initial_position, final_position, _distance=_distance) self.initial_velocity = initial_velocity self.final_velocity = final_velocity self.max_acceleration = max_acceleration self.calculate()
[docs] def calculate(self): # From ads = vdv # a_c*dist = (v_f**2 - v_i**2)/2. self._acceleration_const = ( self.final_velocity**2.0 - self.initial_velocity**2.0 ) / (2.0 * self.distance) if self.max_acceleration and abs(self._acceleration_const) > abs( self.max_acceleration ): self._acceleration_const = self.max_acceleration self.final_velocity = np.sqrt( 2.0 * self.max_acceleration * self.distance + self.initial_velocity**2.0 ) logging.warning( f"LinearRamp Motion Profile for Line ({self.initial_position.tolist()}, {self.final_position.tolist()}). Acceleration exceeds maximum ({self.max_acceleration}). Modifying final velocity to meet maximum acceleration criteria: {self.final_velocity}" ) self._dt = ( self.final_velocity - self.initial_velocity ) / self._acceleration_const
[docs] def acceleration(self, t: float): self._validate_time(t) return self._acceleration_const
[docs] def velocity(self, t: float): self._validate_time(t) return self.initial_velocity + self._acceleration_const * t
[docs] def position(self, t): self._validate_time(t) return ( 0.5 * self.direction * self._acceleration_const * t**2.0 + self.direction * self.initial_velocity * t + self.initial_position )
@property def elapsed_time(self): return super().elapsed_time @elapsed_time.setter def elapsed_time(self, dt): self._dt = dt self._acceleration_const = (self.final_velocity - self.initial_velocity) / dt if self.max_acceleration and abs(self._acceleration_const) > abs( self.max_acceleration ): self._acceleration_const = self.max_acceleration self.final_velocity = self.max_acceleration * dt + self.initial_velocity logging.warning( f"LinearRamp Motion Profile for Line ({self.initial_position.tolist()}, {self.final_position.tolist()}). Acceleration exceeds maximum ({self.max_acceleration}). Modifying final velocity to meet maximum acceleration criteria: {self.final_velocity}" )
[docs] class Trapezoid(MotionSegment): def __init__( self, initial_position: tuple, final_position: tuple, initial_velocity: float, target_velocity: float, final_velocity: float = None, max_acceleration: float = None, _distance: float = None, **_, ): """ Models a trapezoidal motion profile for a linear segment. This profile consists of three phases: 1. A constant acceleration phase to ramp up to a maximum velocity. 2. A constant velocity phase. 3. A constant deceleration phase to ramp down to a final velocity. The profile operates in one of two modes: - **Distance-Limited:** If `max_acceleration` is not provided, the profile shape is determined by assuming the ramp-up and ramp-down phases each occupy one-third of the total move distance. - **Acceleration-Limited:** If `max_acceleration` is provided, this limit is enforced. This may result in a "triangular" or "linear ramp" profile if the move is too short to reach the requested maximum velocity. Args: initial_position (tuple): The starting coordinate of the move. final_position (tuple): The ending coordinate of the move. initial_velocity (float): The velocity at the start of the move. target_velocity (float): The target cruising velocity for the move. final_velocity (float, optional): The velocity at the end of the move. If None, defaults to `initial_velocity`. max_acceleration (float, optional): The maximum allowed acceleration and deceleration. If None, the profile is distance-limited. """ super().__init__(initial_position, final_position, _distance=_distance) self.initial_velocity = initial_velocity if final_velocity is None: final_velocity = initial_velocity self.final_velocity = final_velocity self.target_velocity = target_velocity # specifies ramp up and ramp down distances # This is the default, which is only applied to the case where no acceleration limit is provided self._ramp: tuple = (1 / 3.0 * self.distance, 1 / 3.0 * self.distance) # Check to make sure that the max velocity is greater than initial and final if self.initial_velocity > self.target_velocity: self.initial_velocity = self.target_velocity if self.final_velocity > self.target_velocity: self.final_velocity = self.target_velocity self.max_acceleration = max_acceleration if self.max_acceleration: self._acceleration_limited_profile() else: self._distance_limited_profile() self.s_space = () self.t_space = () self.calculate()
[docs] def calculate(self): """Calculates the final position and time frames for the profile. This method uses the finalized ramp distances (`self._ramp`) and the calculated accelerations to determine the duration of each motion phase. It populates the `s_space` (position) and `t_space` (time) tuples that define the boundaries of the three motion phases. """ self.s_space = ( self.initial_position, self.direction * self._ramp[0] + self.initial_position, self.final_position - self.direction * self._ramp[1], self.final_position, ) t_ramp_up = ( (self.target_velocity - self.initial_velocity) / self._ramp_up_acceleration if self._ramp_up_acceleration > 0.0 else 0.0 ) t_ramp_down = ( (self.target_velocity - self.final_velocity) / self._ramp_down_acceleration if self._ramp_down_acceleration > 0.0 else 0.0 ) t_const_velocity = ( (self.distance - (self._ramp[0] + self._ramp[1])) / self.target_velocity if (self.target_velocity > 0) else 0.0 ) self._dt = t_ramp_up + t_const_velocity + t_ramp_down self.t_space = (0.0, t_ramp_up, t_ramp_up + t_const_velocity, self._dt)
def _calculate_ramp_distances(self): """Calculates the theoretical distances for ramp-up and ramp-down phases. Uses the current instance attributes (`target_velocity`, `initial_velocity`, `final_velocity`, and ramp accelerations) to compute the distance required for acceleration and deceleration based on the standard kinematic equation: d = (v_f^2 - v_i^2) / 2a. Returns: tuple[float, float]: A tuple containing the (ramp_up_distance, ramp_down_distance). """ ramp_up_distance = ( self.target_velocity**2.0 - self.initial_velocity**2.0 ) / (2.0 * self._ramp_up_acceleration) ramp_down_distance = ( self.target_velocity**2.0 - self.final_velocity**2.0 ) / (2.0 * self._ramp_down_acceleration) return ramp_up_distance, ramp_down_distance def _distance_limited_profile(self): """ Calculates profile accelerations when no `max_acceleration` is given. This method handles the "distance-limited" case. It uses the default ramp distance assumption (that each ramp phase occupies 1/3 of the total move distance) to derive the necessary accelerations required to achieve the requested velocity profile. Side Effects: Sets the following instance attributes: - `self._ramp_up_acceleration` - `self._ramp_down_acceleration` """ self._ramp_up_acceleration = ( self.target_velocity**2.0 - self.initial_velocity**2.0 ) / (2.0 * self._ramp[0]) self._ramp_down_acceleration = ( self.target_velocity**2.0 - self.final_velocity**2.0 ) / (2.0 * self._ramp[1]) def _acceleration_limited_profile(self): """Adjusts the profile parameters when an acceleration limit is provided. This helper method handles acceleration-limited scenarios by determining if the requested `target_velocity` is physically achievable given the move distance and `max_acceleration`. It handles three distinct cases: 1. **Full Trapezoid:** If the move is long enough, the profile remains a standard trapezoid, and no velocity parameters are changed. 2. **Triangular Profile:** If the move is too short to reach the requested `target_velocity` and then decelerate, the profile becomes triangular. The `target_velocity` is lowered to the true physical peak achievable for the given distance and constraints. 3. **Linear Ramp:** In the extreme case that the move is too short to even achieve the target `final_velocity`, the profile degenerates into a simple linear ramp. Both `target_velocity` and `final_velocity` are capped at the theoretical maximum, and the deceleration phase is removed. Side Effects: Modifies several instance attributes based on the calculated profile: - `self.target_velocity`: May be lowered to the physically achievable peak. - `self.final_velocity`: May be lowered if it's unachievable. - `self._ramp_up_acceleration`: Set to `self.max_acceleration`. - `self._ramp_down_acceleration`: Set to `self.max_acceleration` or 0. - `self._dt`, `self.t_space`, `self.s_space`: Set directly in the linear ramp case. """ if self.max_acceleration is None: return self._ramp_up_acceleration = self.max_acceleration self._ramp_down_acceleration = self.max_acceleration # Case 3: Linear Ramp # We do not have enough distance to actually even achieve the specified final velocity # This is the absolute maximum velocity acheivable if the entire distance were a linear ramp theoretical_max = np.sqrt( 2.0 * self.max_acceleration * self.distance + self.initial_velocity**2.0 ) if ( theoretical_max < self.target_velocity and theoretical_max < self.final_velocity ): self.final_velocity = theoretical_max self.target_velocity = theoretical_max self._ramp_down_acceleration = 0.0 self._ramp = (self.distance, 0.0) return # Determine if we have enough distance to achieve the max velocity # If we do not, then re-calculate the maximum velocity assuming that the ramp up/down uses 1/3 of the available distance ramp_up, ramp_down = self._calculate_ramp_distances() original_target_velocity = self.target_velocity if (ramp_up + ramp_down) >= self.distance: # Case 2: Triangular profile # from V_f**2. = V_i**2 + 2*a*d we can derive # 2*V_max**2 - V_i**2 - V_f**2 = 2*a*d # Assuming that ramp_up + ramp_down = distance AND a = a_max self.target_velocity = np.sqrt( self.max_acceleration * self.distance + 0.5 * (self.initial_velocity**2.0 + self.final_velocity**2.0) ) self._ramp = self._calculate_ramp_distances() logging.warning( f"Trapezoid Profile for line {self.initial_position.tolist()} -> {self.final_position.tolist()}: " f"The requested max velocity ({original_target_velocity}) was unachievable with the " f"max acceleration limit ({self.max_acceleration}). " f"Max velocity has been reduced to {self.target_velocity}." ) else: self._ramp = (ramp_up, ramp_down)
[docs] def acceleration(self, t: float): self._validate_time(t) if t <= self.t_space[1]: return self._ramp_up_acceleration elif self.t_space[1] < t <= self.t_space[2]: return 0.0 elif t <= self.t_space[3]: return -self._ramp_down_acceleration return None
[docs] def velocity(self, t: float): self._validate_time(t) if t <= self.t_space[1]: return self.initial_velocity + self._ramp_up_acceleration * t elif self.t_space[1] < t <= self.t_space[2]: return self.target_velocity elif t <= self.t_space[3]: return self.target_velocity - self._ramp_down_acceleration * ( t - self.t_space[2] ) return None
[docs] def position(self, t): self._validate_time(t) if t <= self.t_space[1]: return ( 0.5 * self._ramp_up_acceleration * self.direction * t**2.0 + self.initial_velocity * t * self.direction + self.initial_position ) elif self.t_space[1] < t <= self.t_space[2]: return self.s_space[1] + self.target_velocity * self.direction * ( t - self.t_space[1] ) elif t <= self.t_space[3]: return ( -0.5 * self._ramp_down_acceleration * self.direction * (t - self.t_space[2]) ** 2.0 + self.target_velocity * self.direction * (t - self.t_space[2]) + self.s_space[2] ) return None
[docs] class SCurve(MotionSegment): def __init__( self, initial_position: tuple, final_position: tuple, initial_velocity: float, target_velocity: float, final_velocity: float = None, max_acceleration: float = None, max_jerk: float = None, **_, ): """ Models an S-curve motion profile for a linear segment. The acceleration profile is trapezoidal (rather than step-wise), giving continuous acceleration and bounded jerk. The profile has up to seven phases: three for ramp-up, a constant-velocity cruise, and three for ramp-down. The profile operates in one of two modes: - **Distance-Limited:** If `max_jerk` is not provided, ramp-up and ramp-down each occupy one-third of the total move distance, and the required accelerations are derived from that constraint. - **Jerk-Limited:** If `max_jerk` is provided, it governs the acceleration shape. If `max_acceleration` is also supplied it acts as a ceiling on the jerk-derived acceleration. The `target_velocity` is reduced automatically when the move is too short for the requested velocity (triangular case). Args: initial_position (tuple): The starting coordinate of the move. final_position (tuple): The ending coordinate of the move. initial_velocity (float): The velocity at the start of the move. target_velocity (float): The target cruising velocity. final_velocity (float, optional): The velocity at the end of the move. If None, defaults to `initial_velocity`. max_acceleration (float, optional): Acceleration ceiling used only in jerk-limited mode. Ignored in distance-limited mode. max_jerk (float, optional): Maximum jerk. If provided the profile is jerk-limited; if None it is distance-limited. """ super().__init__(initial_position, final_position) self.initial_velocity = initial_velocity if final_velocity is None: final_velocity = initial_velocity self.final_velocity = final_velocity self.target_velocity = target_velocity if self.initial_velocity > self.target_velocity: self.initial_velocity = self.target_velocity if self.final_velocity > self.target_velocity: self.final_velocity = self.target_velocity self.max_acceleration = max_acceleration self.max_jerk = max_jerk self.t_space = () self.s_space = () if self.max_jerk: self._jerk_limited_profile() else: self._distance_limited_profile() self.calculate() @staticmethod def _scurve_ramp_distance(v_start, v_end, a_max): """Distance covered by one S-curve ramp segment. For an S-curve ramp the average velocity equals (v_start + v_end)/2 and the ramp duration is (3/2)*|dv|/a_max, so distance = t_ramp * v_avg. """ if a_max <= 0 or abs(v_end - v_start) < ZERO_TOLERANCE: return 0.0 t_ramp = (3.0 / 2.0) * abs(v_end - v_start) / a_max return t_ramp * (v_start + v_end) / 2.0 def _distance_limited_profile(self): """Derives ramp accelerations from the 1/3-distance assumption. Each ramp (up and down) occupies one-third of the total move distance. The required acceleration is back-calculated from the S-curve kinematics: a = (9/4) * (v_target^2 - v_end^2) / distance """ d = self.distance dv_up = self.target_velocity - self.initial_velocity dv_down = self.target_velocity - self.final_velocity self._ramp_up_acceleration = ( (9.0 / 4.0) * (self.target_velocity**2 - self.initial_velocity**2) / d if dv_up > ZERO_TOLERANCE else 0.0 ) self._ramp_down_acceleration = ( (9.0 / 4.0) * (self.target_velocity**2 - self.final_velocity**2) / d if dv_down > ZERO_TOLERANCE else 0.0 ) def _jerk_limited_profile(self): """Derives ramp accelerations from max_jerk (with optional max_acceleration cap). Reduces target_velocity via binary search if the combined ramp distances exceed the move distance (triangular case). """ dv_up = self.target_velocity - self.initial_velocity dv_down = self.target_velocity - self.final_velocity a_up = np.sqrt(self.max_jerk * dv_up / 2.0) if dv_up > ZERO_TOLERANCE else 0.0 a_down = ( np.sqrt(self.max_jerk * dv_down / 2.0) if dv_down > ZERO_TOLERANCE else 0.0 ) if self.max_acceleration: a_up = min(a_up, self.max_acceleration) a_down = min(a_down, self.max_acceleration) d_up = self._scurve_ramp_distance( self.initial_velocity, self.target_velocity, a_up ) d_down = self._scurve_ramp_distance( self.final_velocity, self.target_velocity, a_down ) if d_up + d_down >= self.distance: original_target = self.target_velocity self.target_velocity = self._solve_peak_velocity() dv_up = self.target_velocity - self.initial_velocity dv_down = self.target_velocity - self.final_velocity a_up = ( np.sqrt(self.max_jerk * dv_up / 2.0) if dv_up > ZERO_TOLERANCE else 0.0 ) a_down = ( np.sqrt(self.max_jerk * dv_down / 2.0) if dv_down > ZERO_TOLERANCE else 0.0 ) if self.max_acceleration: a_up = min(a_up, self.max_acceleration) a_down = min(a_down, self.max_acceleration) logging.warning( f"SCurve Profile for line {self.initial_position.tolist()} -> {self.final_position.tolist()}: " f"The requested max velocity ({original_target}) was unachievable with the " f"max jerk limit ({self.max_jerk}). " f"Max velocity has been reduced to {self.target_velocity}." ) self._ramp_up_acceleration = a_up self._ramp_down_acceleration = a_down def _solve_peak_velocity(self): """Binary search for the peak velocity when ramp distances exceed move distance.""" lo = max(self.initial_velocity, self.final_velocity) hi = self.target_velocity for _ in range(64): mid = (lo + hi) / 2.0 dv_up = mid - self.initial_velocity dv_down = mid - self.final_velocity a_up = ( np.sqrt(self.max_jerk * dv_up / 2.0) if dv_up > ZERO_TOLERANCE else 0.0 ) a_down = ( np.sqrt(self.max_jerk * dv_down / 2.0) if dv_down > ZERO_TOLERANCE else 0.0 ) if self.max_acceleration: a_up = min(a_up, self.max_acceleration) a_down = min(a_down, self.max_acceleration) d_up = self._scurve_ramp_distance(self.initial_velocity, mid, a_up) d_down = self._scurve_ramp_distance(self.final_velocity, mid, a_down) if d_up + d_down < self.distance: lo = mid else: hi = mid # early exit if (hi - lo) < ZERO_TOLERANCE: break return (lo + hi) / 2.0
[docs] def calculate(self): """Builds s_space and t_space from the ramp accelerations.""" a_up = self._ramp_up_acceleration a_down = self._ramp_down_acceleration dv_up = self.target_velocity - self.initial_velocity dv_down = self.target_velocity - self.final_velocity t_ramp_up = (3.0 / 2.0) * dv_up / a_up if a_up > ZERO_TOLERANCE else 0.0 t_ramp_down = (3.0 / 2.0) * dv_down / a_down if a_down > ZERO_TOLERANCE else 0.0 dt_up = t_ramp_up / 3.0 dt_down = t_ramp_down / 3.0 # Ramp-up positions (forward from initial_position) if t_ramp_up > ZERO_TOLERANCE: s1 = ( (a_up / (2.0 * t_ramp_up)) * self.direction * dt_up**3 + self.initial_velocity * self.direction * dt_up + self.initial_position ) v1 = (3.0 * a_up / (2.0 * t_ramp_up)) * dt_up**2 + self.initial_velocity s2 = ( (a_up / 2.0) * self.direction * dt_up**2 + v1 * self.direction * dt_up + s1 ) v2 = v1 + a_up * dt_up s3 = ( (a_up / 2.0) * self.direction * dt_up**2 - (a_up / (2.0 * t_ramp_up)) * self.direction * dt_up**3 + v2 * self.direction * dt_up + s2 ) else: s1 = s2 = s3 = self.initial_position.copy() ramp_up_distance = np.linalg.norm(s3 - self.initial_position) # Ramp-down total distance; s4 is placed so that ramp-down ends at final_position ramp_down_distance = ( t_ramp_down * (self.target_velocity + self.final_velocity) / 2.0 ) s4 = self.final_position - ramp_down_distance * self.direction cruise_distance = self.distance - ramp_up_distance - ramp_down_distance if cruise_distance < -ZERO_TOLERANCE: logging.warning( f"SCurve Motion profile for Line ({self.initial_position.tolist()}, {self.final_position.tolist()}). " f"Ramp distances ({ramp_up_distance:.3f} + {ramp_down_distance:.3f}) exceed line distance {self.distance:.3f}" ) cruise_distance = 0.0 t_cruise = ( cruise_distance / self.target_velocity if self.target_velocity > ZERO_TOLERANCE else 0.0 ) # Ramp-down positions (forward from s4 at v_target toward final_position) if t_ramp_down > ZERO_TOLERANCE: v5 = ( self.target_velocity - (3.0 * a_down / (2.0 * t_ramp_down)) * dt_down**2 ) s5 = ( self.target_velocity * self.direction * dt_down - (a_down / (2.0 * t_ramp_down)) * self.direction * dt_down**3 + s4 ) s6 = ( v5 * self.direction * dt_down - (a_down / 2.0) * self.direction * dt_down**2 + s5 ) else: s5 = s6 = self.final_position.copy() self.s_space = [ self.initial_position, s1, s2, s3, s4, s5, s6, self.final_position, ] self._dt = t_ramp_up + t_cruise + t_ramp_down self.t_space = [ 0.0, dt_up, 2.0 * dt_up, t_ramp_up, t_ramp_up + t_cruise, t_ramp_up + t_cruise + dt_down, t_ramp_up + t_cruise + 2.0 * dt_down, self._dt, ]
# ------------------------------------------------------------------ # Kinematic query functions # ------------------------------------------------------------------
[docs] def jerk(self, t): super().jerk(t) t_ramp_up = self.t_space[3] t_ramp_down = self.t_space[7] - self.t_space[4] a_up = self._ramp_up_acceleration a_down = self._ramp_down_acceleration if t <= self.t_space[1]: return 3.0 * a_up / t_ramp_up if t_ramp_up > ZERO_TOLERANCE else 0.0 elif self.t_space[1] < t <= self.t_space[2]: return 0.0 elif self.t_space[2] < t <= self.t_space[3]: return -3.0 * a_up / t_ramp_up if t_ramp_up > ZERO_TOLERANCE else 0.0 elif self.t_space[3] < t <= self.t_space[4]: return 0.0 elif self.t_space[4] < t <= self.t_space[5]: return -3.0 * a_down / t_ramp_down if t_ramp_down > ZERO_TOLERANCE else 0.0 elif self.t_space[5] < t <= self.t_space[6]: return 0.0 elif self.t_space[6] < t <= self.t_space[7]: return 3.0 * a_down / t_ramp_down if t_ramp_down > ZERO_TOLERANCE else 0.0
[docs] def acceleration(self, t): self._validate_time(t) t_ramp_up = self.t_space[3] t_ramp_down = self.t_space[7] - self.t_space[4] a_up = self._ramp_up_acceleration a_down = self._ramp_down_acceleration if t <= self.t_space[1]: return (3.0 * a_up / t_ramp_up) * t if t_ramp_up > ZERO_TOLERANCE else 0.0 elif self.t_space[1] < t <= self.t_space[2]: return a_up elif self.t_space[2] < t <= self.t_space[3]: return a_up - (3.0 * a_up / t_ramp_up) * (t - self.t_space[2]) elif self.t_space[3] < t <= self.t_space[4]: return 0.0 elif self.t_space[4] < t <= self.t_space[5]: return ( -(3.0 * a_down / t_ramp_down) * (t - self.t_space[4]) if t_ramp_down > ZERO_TOLERANCE else 0.0 ) elif self.t_space[5] < t <= self.t_space[6]: return -a_down elif self.t_space[6] < t <= self.t_space[7]: return -a_down + (3.0 * a_down / t_ramp_down) * (t - self.t_space[6])
[docs] def velocity(self, t): self._validate_time(t) t_ramp_up = self.t_space[3] t_ramp_down = self.t_space[7] - self.t_space[4] a_up = self._ramp_up_acceleration a_down = self._ramp_down_acceleration if t <= self.t_space[1]: return ( (3.0 * a_up / (2.0 * t_ramp_up)) * t**2 + self.initial_velocity if t_ramp_up > ZERO_TOLERANCE else self.initial_velocity ) elif self.t_space[1] < t <= self.t_space[2]: dt = t - self.t_space[1] v1 = (3.0 * a_up / (2.0 * t_ramp_up)) * ( t_ramp_up / 3.0 ) ** 2 + self.initial_velocity return a_up * dt + v1 elif self.t_space[2] < t <= self.t_space[3]: dt = t - self.t_space[2] v1 = (3.0 * a_up / (2.0 * t_ramp_up)) * ( t_ramp_up / 3.0 ) ** 2 + self.initial_velocity v2 = a_up * (t_ramp_up / 3.0) + v1 return a_up * dt - (3.0 * a_up / (2.0 * t_ramp_up)) * dt**2 + v2 elif self.t_space[3] < t <= self.t_space[4]: return self.target_velocity elif self.t_space[4] < t <= self.t_space[5]: dt = t - self.t_space[4] return ( self.target_velocity - (3.0 * a_down / (2.0 * t_ramp_down)) * dt**2 if t_ramp_down > ZERO_TOLERANCE else self.target_velocity ) elif self.t_space[5] < t <= self.t_space[6]: dt = t - self.t_space[5] v5 = ( self.target_velocity - (3.0 * a_down / (2.0 * t_ramp_down)) * (t_ramp_down / 3.0) ** 2 ) return v5 - a_down * dt elif self.t_space[6] < t <= self.t_space[7]: dt = t - self.t_space[6] v5 = ( self.target_velocity - (3.0 * a_down / (2.0 * t_ramp_down)) * (t_ramp_down / 3.0) ** 2 ) v6 = v5 - a_down * (t_ramp_down / 3.0) return v6 - a_down * dt + (3.0 * a_down / (2.0 * t_ramp_down)) * dt**2
[docs] def position(self, t): self._validate_time(t) t_ramp_up = self.t_space[3] t_ramp_down = self.t_space[7] - self.t_space[4] a_up = self._ramp_up_acceleration a_down = self._ramp_down_acceleration if t <= self.t_space[1]: cubic = ( (a_up / (2.0 * t_ramp_up)) * t**3 if t_ramp_up > ZERO_TOLERANCE else 0.0 ) return ( cubic * self.direction + self.initial_velocity * self.direction * t + self.initial_position ) elif self.t_space[1] < t <= self.t_space[2]: dt = t - self.t_space[1] v1 = self.velocity(self.t_space[1]) return ( (a_up / 2.0) * self.direction * dt**2 + v1 * self.direction * dt + self.s_space[1] ) elif self.t_space[2] < t <= self.t_space[3]: dt = t - self.t_space[2] v2 = self.velocity(self.t_space[2]) return ( (a_up / 2.0) * self.direction * dt**2 - (a_up / (2.0 * t_ramp_up)) * self.direction * dt**3 + v2 * self.direction * dt + self.s_space[2] ) elif self.t_space[3] < t <= self.t_space[4]: dt = t - self.t_space[3] return self.target_velocity * self.direction * dt + self.s_space[3] elif self.t_space[4] < t <= self.t_space[5]: dt = t - self.t_space[4] cubic = ( (a_down / (2.0 * t_ramp_down)) * dt**3 if t_ramp_down > ZERO_TOLERANCE else 0.0 ) return ( self.target_velocity * self.direction * dt - cubic * self.direction + self.s_space[4] ) elif self.t_space[5] < t <= self.t_space[6]: dt = t - self.t_space[5] v5 = self.velocity(self.t_space[5]) return ( v5 * self.direction * dt - (a_down / 2.0) * self.direction * dt**2 + self.s_space[5] ) elif self.t_space[6] < t <= self.t_space[7]: dt = t - self.t_space[6] v6 = self.velocity(self.t_space[6]) return ( v6 * self.direction * dt - (a_down / 2.0) * self.direction * dt**2 + (a_down / (2.0 * t_ramp_down)) * self.direction * dt**3 + self.s_space[6] )
[docs] def extract_motion_scalars(operation) -> dict: """Extract per-segment velocity scalars from a processed operation. Returns three numpy arrays aligned 1:1 with ``operation.process_data``: - ``initial_velocity`` — entry speed into the segment (units/s). - ``target_velocity`` — commanded cruise ceiling (units/s). - ``final_velocity`` — exit speed from the segment (units/s). ``None`` entries in ``motion_profile`` (zero-length moves) produce ``NaN``. ``ConstantVelocity`` segments, which have no separate target/final velocity, use their single ``v`` attribute for all three fields. Args: operation: A processed ``Operation`` whose ``motion_profile`` list has been populated by ``MotionProfile.from_operation()``. Returns: Dict with keys ``"initial_velocity"``, ``"target_velocity"``, ``"final_velocity"``, each a shape ``(N,)`` float64 array. Raises: ValueError: If ``operation.motion_profile`` is empty or not set. """ if not operation.motion_profile: raise ValueError( "operation.motion_profile is empty. " "Call MotionProfile.from_operation() before extracting scalars." ) n = len(operation.process_data) initial_velocity = np.full(n, np.nan) target_velocity = np.full(n, np.nan) final_velocity = np.full(n, np.nan) for i, segment in enumerate(operation.motion_profile): if segment is None: continue # ConstantVelocity has a single `v`; all others have separate fields. v = getattr(segment, "v", None) initial_velocity[i] = getattr(segment, "initial_velocity", v) target_velocity[i] = getattr(segment, "target_velocity", v) final_velocity[i] = getattr(segment, "final_velocity", v) return { "initial_velocity": initial_velocity, "target_velocity": target_velocity, "final_velocity": final_velocity, }