import logging
import numpy as np
import pandas as pd
from ..constants import ZERO_TOLERANCE
from .operations import Operation, ProcessDataArrays
[docs]
class MotionProfile:
"""
Factory for generating `MotionSegment` objects from process data.
Holds machine-wide kinematic constraints and orchestrates a two-pass
(GRBL-style) velocity planning algorithm: junction-deviation corner
limiting followed by forward and backward kinematic propagation.
Args:
max_jerk (float, optional): Maximum allowed jerk (units/s³). Used only
by SCurve segments. Defaults to None.
max_accleration (float, optional): Maximum allowed acceleration
(units/s²). Defaults to None.
max_velocity (float, optional): Ceiling on all target velocities
(units/s). Defaults to None.
junction_deviation (float, optional): Junction-deviation parameter
(same length units as positions). Larger values permit higher
corner velocities. Defaults to 0.05.
Attributes:
feed_rate_time_base_s (float): Divisor applied to feed-rate values to
convert them from units/minute to units/second. Defaults to 60.0.
"""
def __init__(
self,
max_jerk: float = None,
max_accleration: float = None,
max_velocity: float = None,
junction_deviation: float = 0.05,
):
self.max_jerk = max_jerk
self.max_acceleration = max_accleration
self.max_velocity = max_velocity
self.junction_deviation = junction_deviation
self._profile_type = Trapezoid
self.feed_rate_time_base_s = 60.0
@property
def profile_type(self):
return self._profile_type
@profile_type.setter
def profile_type(self, t: type):
if not issubclass(t, MotionSegment):
raise ValueError(
f"MotionProfile.profile_type: Must be subclass of MotionSegment. Received {t}"
)
self._profile_type = t
def _compute_junction_velocities(
self,
initial_positions: np.ndarray,
final_positions: np.ndarray,
max_acceleration: float,
junction_deviation: float,
) -> np.ndarray:
"""
Compute the maximum allowable speed at each segment boundary using the
junction-deviation (centripetal-acceleration) model.
The result at index ``i`` is the speed limit at the junction between
segment ``i`` and segment ``i+1``. The last element is always 0
because the machine must stop at the end of a move sequence.
The model derives from: if the tool were to follow a circular arc of
radius R through the corner, the centripetal acceleration is v²/R.
Setting that equal to ``max_acceleration`` and using the geometric
relation between R, ``junction_deviation`` (the max arc-chord deviation
δ), and the half-corner angle α gives:
v_junction² = a_max · δ · sin(α) / (1 − sin(α))
where α = (π − θ) / 2 and θ is the angle between the incoming and
outgoing direction vectors. Straight-through corners (θ≈0) yield a
very large (effectively unconstrained) junction velocity; reversals
(θ=π) yield zero.
Args:
initial_positions: Shape (N, 3) array of segment start positions.
final_positions: Shape (N, 3) array of segment end positions.
max_acceleration: Machine acceleration limit (units/s²).
junction_deviation: Maximum allowed arc-chord deviation δ (same
length units as positions).
Returns:
Shape (N,) array of junction velocity limits (units/s).
"""
incoming_directions = final_positions - initial_positions
outgoing_start_positions = np.roll(final_positions, -1, axis=0)
outgoing_directions = outgoing_start_positions - final_positions
incoming_norms = np.linalg.norm(incoming_directions, axis=1)
outgoing_norms = np.linalg.norm(outgoing_directions, axis=1)
dot_products = np.einsum("ij,ij->i", incoming_directions, outgoing_directions)
norm_products = incoming_norms * outgoing_norms
# Zero-length segments produce a zero denominator; treat them as straight-through.
with np.errstate(divide="ignore", invalid="ignore"):
cosine_angle = np.where(
norm_products > ZERO_TOLERANCE,
dot_products / norm_products,
1.0,
)
cosine_angle = np.clip(cosine_angle, -1.0, 1.0)
corner_angle = np.arccos(cosine_angle) # 0 = straight, π = reversal
# Half-corner angle α = (π − θ) / 2:
# θ=0 (straight) -> α=π/2 -> sin(α)=1 -> v_junction -> ∞ (clamped by target_velocity)
# θ=π (reversal) -> α=0 -> sin(α)=0 -> v_junction = 0
half_corner_angle = (np.pi - corner_angle) / 2.0
sin_half_corner = np.sin(half_corner_angle)
if max_acceleration is None:
# No acceleration limit -> no corner speed constraint; machine stops at the end.
junction_velocities = np.full(len(final_positions), np.inf)
junction_velocities[-1] = 0.0
return junction_velocities
junction_denom = np.maximum(1.0 - sin_half_corner, ZERO_TOLERANCE)
junction_velocity_squared = (
max_acceleration * junction_deviation * sin_half_corner / junction_denom
)
junction_velocities = np.sqrt(np.maximum(junction_velocity_squared, 0.0))
junction_velocities[-1] = 0.0 # machine stops after the last segment
return junction_velocities
def _forward_pass(
self,
distances: np.ndarray,
target_velocities: np.ndarray,
junction_velocities: np.ndarray,
max_acceleration: float,
entry_velocity: float = 0.0,
) -> tuple:
"""
Forward kinematic pass: propagate achievable exit velocities from the
first segment to the last.
For each segment the exit velocity is the minimum of:
- the segment's cruise target velocity,
- the junction velocity limit at the end of the segment, and
- the physically reachable exit speed given the entry speed and distance:
``v_exit_reachable = sqrt(v_entry² + 2 · a_max · distance)``.
The exit of segment ``i`` becomes the entry of segment ``i+1``.
Args:
distances: Shape (N,) segment lengths.
target_velocities: Shape (N,) cruise velocity ceilings (units/s).
junction_velocities: Shape (N,) corner velocity limits (units/s).
Index ``i`` is the limit at the boundary after segment ``i``.
max_acceleration: Machine acceleration limit (units/s²).
entry_velocity: Speed at the very start of the sequence. Defaults
to 0.0 (machine starts from rest).
Returns:
Tuple ``(segment_entry_velocities, segment_exit_velocities)``, each
a shape (N,) array.
"""
num_segments = len(distances)
segment_entry_velocities = np.zeros(num_segments)
segment_exit_velocities = np.zeros(num_segments)
segment_entry_velocities[0] = entry_velocity
for i in range(num_segments):
exit_velocity_reachable = (
np.sqrt(
segment_entry_velocities[i] ** 2
+ 2.0 * max_acceleration * distances[i]
)
if max_acceleration is not None
else np.inf
)
segment_exit_velocities[i] = min(
target_velocities[i],
junction_velocities[i],
exit_velocity_reachable,
)
if i + 1 < num_segments:
segment_entry_velocities[i + 1] = segment_exit_velocities[i]
return segment_entry_velocities, segment_exit_velocities
def _backward_pass(
self,
distances: np.ndarray,
segment_entry_velocities: np.ndarray,
segment_exit_velocities: np.ndarray,
max_acceleration: float,
terminal_velocity: float = 0.0,
) -> tuple:
"""
Backward kinematic pass: tighten entry velocities so every segment
can decelerate to its already-constrained exit velocity.
For each segment (walking from last to first) the entry velocity is
capped by what allows a stop at the exit:
``v_entry_max = sqrt(v_exit² + 2 · a_max · distance)``.
The tightened entry of segment ``i`` is propagated back as the exit
limit of segment ``i−1``.
After both passes the invariant ``entry[i] == exit[i-1]`` holds for
all interior boundaries, and ``exit[-1] ≤ terminal_velocity``.
Args:
distances: Shape (N,) segment lengths.
segment_entry_velocities: Shape (N,) entry velocities from the
forward pass (mutated in place).
segment_exit_velocities: Shape (N,) exit velocities from the
forward pass (mutated in place).
max_acceleration: Machine acceleration limit (units/s²).
terminal_velocity: Required speed at the very end of the sequence.
Defaults to 0.0 (machine stops).
Returns:
Tuple ``(segment_entry_velocities, segment_exit_velocities)`` —
the same arrays passed in, now tightened.
"""
num_segments = len(distances)
segment_exit_velocities[-1] = min(
segment_exit_velocities[-1], terminal_velocity
)
for i in range(num_segments - 1, -1, -1):
max_entry_velocity = (
np.sqrt(
segment_exit_velocities[i] ** 2
+ 2.0 * max_acceleration * distances[i]
)
if max_acceleration is not None
else np.inf
)
segment_entry_velocities[i] = min(
segment_entry_velocities[i], max_entry_velocity
)
if i - 1 >= 0:
segment_exit_velocities[i - 1] = min(
segment_exit_velocities[i - 1], segment_entry_velocities[i]
)
return segment_entry_velocities, segment_exit_velocities
def _apply_machine_constraints(
self,
initial_positions: np.ndarray,
final_positions: np.ndarray,
target_velocities: np.ndarray,
junction_velocities: np.ndarray,
) -> tuple:
"""Hook for subclasses to inject machine-specific velocity constraints.
Called after junction velocities are computed but before the forward and
backward passes run, so any forced stops or speed caps are automatically
propagated through the full toolpath by the planner.
The default implementation is a no-op. Subclasses override this to
enforce machine-specific rules (e.g. stop before layer changes, cap
speed on vertical moves).
Args:
initial_positions: Shape (N, 3) segment start positions.
final_positions: Shape (N, 3) segment end positions.
target_velocities: Shape (N,) cruise velocity ceilings (units/s).
junction_velocities: Shape (N,) corner velocity limits (units/s).
Returns:
Tuple of (target_velocities, junction_velocities), each shape (N,).
Return copies if you need to modify — do not mutate the inputs.
"""
return target_velocities, junction_velocities
[docs]
def from_process_data(self, process_data, home=(0, 0, 0)):
"""
Convert process data into a timed motion profile.
Accepts either a ``List[ProcessData]`` or a ``ProcessDataArrays``.
Zero-length moves (duplicate waypoints) yield ``None``.
Example::
planner = MotionProfile(max_accleration=500.0, max_velocity=50.0)
segments = list(planner.from_process_data(operation.process_data))
segments = [s for s in segments if s is not None]
total_time = sum(s.elapsed_time for s in segments)
Args:
process_data: Move data as a list of ``ProcessData`` objects or a
pre-built ``ProcessDataArrays``.
home: Machine position before the first move. Defaults to (0, 0, 0).
Yields:
A ``MotionSegment`` for each move, or ``None`` for zero-length moves.
"""
#
# 1. Extract final_positions and feed_rate as numpy arrays.
#
if isinstance(process_data, ProcessDataArrays):
final_positions = process_data.location # (n, 3), already numpy
feed_rate = process_data.feed_rate # (n,), already numpy
else:
n = len(process_data)
final_positions = np.empty((n, 3), dtype=np.float64)
feed_rate = np.empty(n, dtype=np.float64)
for i, data in enumerate(process_data):
final_positions[i] = data.location
feed_rate[i] = data.feed_rate
n = len(final_positions)
initial_positions = np.empty_like(final_positions)
initial_positions[0] = home
initial_positions[1:] = final_positions[:-1]
#
# 2. Compute target (cruise) velocity for each segment.
#
# ffill propagates the last explicit feed-rate; the machine always
# specifies a feed-rate before it begins moving, so filling with 0
# only affects any leading stationary points.
target_velocities = pd.Series(feed_rate).ffill().fillna(0.0).to_numpy()
target_velocities = target_velocities / self.feed_rate_time_base_s
if self.max_velocity is not None:
target_velocities = target_velocities.clip(max=self.max_velocity)
#
# 3. Two-pass velocity planning.
#
distances = np.linalg.norm(final_positions - initial_positions, axis=1)
junction_velocities = self._compute_junction_velocities(
initial_positions,
final_positions,
self.max_acceleration,
self.junction_deviation,
)
# Allow subclasses to inject machine-specific constraints (e.g. forced
# stops before layer changes) before the passes lock in final velocities.
target_velocities, junction_velocities = self._apply_machine_constraints(
initial_positions, final_positions, target_velocities, junction_velocities
)
segment_entry_velocities, segment_exit_velocities = self._forward_pass(
distances, target_velocities, junction_velocities, self.max_acceleration
)
segment_entry_velocities, segment_exit_velocities = self._backward_pass(
distances,
segment_entry_velocities,
segment_exit_velocities,
self.max_acceleration,
)
#
# 4. Yield segments.
#
for i in range(n):
if distances[i] == 0.0:
yield None
continue
yield self.profile_type(
initial_position=initial_positions[i],
final_position=final_positions[i],
initial_velocity=segment_entry_velocities[i],
target_velocity=target_velocities[i],
final_velocity=segment_exit_velocities[i],
max_acceleration=self.max_acceleration,
max_jerk=self.max_jerk,
_distance=float(distances[i]),
)
[docs]
def from_operation(self, operation: Operation, home=(0, 0, 0)):
"""
Convenience wrapper around ``from_process_data`` that attaches the
resulting profile directly to an ``Operation``.
Mutates ``operation`` in place: sets ``operation.motion_profile`` to
the list of segments and writes ``elapsed_time`` back onto each
corresponding ``ProcessData`` entry. Returns the same object so the
call can be chained.
Example::
planner = MotionProfile(max_accleration=500.0, max_velocity=50.0)
operation = planner.from_operation(operation)
total_time = sum(
pd.elapsed_time for pd in operation.process_data
)
Args:
operation: The ``Operation`` to profile. Mutated in place.
home: Machine position before the first move. Defaults to (0, 0, 0).
Returns:
The same ``Operation``, with ``motion_profile`` and
``elapsed_time`` populated.
"""
src = operation.arrays or operation.process_data
profile = list(self.from_process_data(src, home))
operation.motion_profile = profile
# Write elapsed_time back to both the object list AND the cached arrays so that
# downstream consumers (e.g. AdditivePart) see correct values from either path.
arr = (
operation._arrays
) # None if not yet built (won't be if src was process_data)
for i, segment in enumerate(profile):
if segment is not None:
et = segment.elapsed_time
operation.process_data[i].elapsed_time = et
if arr is not None:
arr.elapsed_time[i] = et
return operation
[docs]
class MotionSegment:
def __init__(
self,
initial_position: tuple,
final_position: tuple,
_distance: float = None,
**_,
):
"""Base class for all motion profiles
Args:
initial_position (tuple): Initial position
final_position (tuple): Final position
_distance (float, optional): Precomputed distance to skip np.linalg.norm.
"""
if not isinstance(initial_position, (tuple, list, np.ndarray)):
raise TypeError(
f"Initial position must be one of (tuple, list, np.ndarray). Received {type(initial_position)}"
)
if not isinstance(final_position, (tuple, list, np.ndarray)):
raise TypeError(
f"Final position must be one of (tuple, list, np.ndarray). Received {type(final_position)}"
)
self.initial_position = np.array(initial_position)
self.final_position = np.array(final_position)
self.distance = (
_distance
if _distance is not None
else np.linalg.norm(np.subtract(final_position, initial_position))
)
if self.distance < ZERO_TOLERANCE:
raise ValueError(
f"MotionProfile distance is zero: ({self.initial_position}, {self.final_position})"
)
self.direction = np.subtract(final_position, initial_position) / self.distance
self._dt = 0.0
def _validate_time(self, t: float):
"""Raises ValueError if t is outside the valid time range [0, _dt]."""
if not (0 <= t <= self._dt):
raise ValueError(
f"Time {t} is outside the valid range [0, {self.elapsed_time}]"
)
[docs]
def calculate(self):
pass
[docs]
def jerk(self, t: float):
"""Instantaneous jerk (da/dt) at a time (t) on the line segment initial_position, final_position"""
return None
[docs]
def acceleration(self, t: float):
"""Instantaneous acceleration at a time (t) on the line segment initial_position, final_position"""
return None
[docs]
def velocity(self, t: float):
"""Instantaneous velocity at a time (t) on the line segment initial_position, final_position"""
return None
[docs]
def position(self, t: float):
"""Instantaneous position at a time (t) on the line segment initial_position, final_position"""
return None
@property
def elapsed_time(self):
"""Elapsed time required to travel the line segment initial_position, final_position"""
return self._dt
def _s_on_segment(self, s: tuple, tol=ZERO_TOLERANCE):
"""Determines if a position (s) falls on the line segment (final_position - initial_position)
Args:
s (tuple): position in space
tol (float, optional): numerical tolerance. Defaults to ZERO_TOLERANCE.
Returns:
bool: If position s is on segment self.final_position - self.initial_position
"""
if not isinstance(s, (tuple, list, np.ndarray)):
raise TypeError(
f"position s must be one of (tuple, list, np.ndarray). Received {type(s)}"
)
s = np.array(s)
ac = s - self.initial_position
ab = self.final_position - self.initial_position
ab_cross_ac = np.cross(ab, ac)
if all(np.isclose(ab_cross_ac, [0.0, 0.0, 0.0], rtol=tol)):
k_ab = np.dot(ab, ab)
k_ac = np.dot(ab, ac)
if k_ac < 0.0:
return False
elif k_ac > k_ab:
return False
elif abs(k_ac) < tol:
return True
elif abs(k_ac - k_ab) < tol:
return True
elif 0.0 < k_ac < k_ab:
return True
return False
[docs]
class ConstantVelocity(MotionSegment):
def __init__(
self,
initial_position: tuple,
final_position: tuple,
target_velocity: float,
**_,
):
"""Constant Velocity motion profile"""
super().__init__(initial_position, final_position)
self.v = target_velocity
if self.v == 0.0:
raise ValueError(
f"ConsantVelocity Motion Profile for Line ({self.initial_position.tolist()}, {self.final_position.tolist()}). Velocity is zero. Infinite elapsed time!"
)
self.calculate()
[docs]
def calculate(self):
self._dt = self.distance / self.v
[docs]
def acceleration(self, t: float):
self._validate_time(t)
return 0.0
[docs]
def velocity(self, t: float):
self._validate_time(t)
return self.v
[docs]
def position(self, t):
self._validate_time(t)
return self.direction * self.v * t + self.initial_position
@property
def elapsed_time(self):
return super().elapsed_time
@elapsed_time.setter
def elapsed_time(self, dt: float):
self._dt = dt
if dt == 0.0:
raise ValueError(
f"ConsantVelocity Motion Profile for Line ({self.initial_position.tolist()}, {self.final_position.tolist()}). Elapsed time is zero. Infinite velocity!"
)
self.v = self.distance / dt
[docs]
class LinearRamp(MotionSegment):
def __init__(
self,
initial_position: tuple,
final_position: tuple,
initial_velocity: float,
final_velocity: float,
max_acceleration: float = None,
_distance: float = None,
**_,
):
"""Linear Velocity motion profile"""
super().__init__(initial_position, final_position, _distance=_distance)
self.initial_velocity = initial_velocity
self.final_velocity = final_velocity
self.max_acceleration = max_acceleration
self.calculate()
[docs]
def calculate(self):
# From ads = vdv
# a_c*dist = (v_f**2 - v_i**2)/2.
self._acceleration_const = (
self.final_velocity**2.0 - self.initial_velocity**2.0
) / (2.0 * self.distance)
if self.max_acceleration and abs(self._acceleration_const) > abs(
self.max_acceleration
):
self._acceleration_const = self.max_acceleration
self.final_velocity = np.sqrt(
2.0 * self.max_acceleration * self.distance + self.initial_velocity**2.0
)
logging.warning(
f"LinearRamp Motion Profile for Line ({self.initial_position.tolist()}, {self.final_position.tolist()}). Acceleration exceeds maximum ({self.max_acceleration}). Modifying final velocity to meet maximum acceleration criteria: {self.final_velocity}"
)
self._dt = (
self.final_velocity - self.initial_velocity
) / self._acceleration_const
[docs]
def acceleration(self, t: float):
self._validate_time(t)
return self._acceleration_const
[docs]
def velocity(self, t: float):
self._validate_time(t)
return self.initial_velocity + self._acceleration_const * t
[docs]
def position(self, t):
self._validate_time(t)
return (
0.5 * self.direction * self._acceleration_const * t**2.0
+ self.direction * self.initial_velocity * t
+ self.initial_position
)
@property
def elapsed_time(self):
return super().elapsed_time
@elapsed_time.setter
def elapsed_time(self, dt):
self._dt = dt
self._acceleration_const = (self.final_velocity - self.initial_velocity) / dt
if self.max_acceleration and abs(self._acceleration_const) > abs(
self.max_acceleration
):
self._acceleration_const = self.max_acceleration
self.final_velocity = self.max_acceleration * dt + self.initial_velocity
logging.warning(
f"LinearRamp Motion Profile for Line ({self.initial_position.tolist()}, {self.final_position.tolist()}). Acceleration exceeds maximum ({self.max_acceleration}). Modifying final velocity to meet maximum acceleration criteria: {self.final_velocity}"
)
[docs]
class Trapezoid(MotionSegment):
def __init__(
self,
initial_position: tuple,
final_position: tuple,
initial_velocity: float,
target_velocity: float,
final_velocity: float = None,
max_acceleration: float = None,
_distance: float = None,
**_,
):
"""
Models a trapezoidal motion profile for a linear segment.
This profile consists of three phases:
1. A constant acceleration phase to ramp up to a maximum velocity.
2. A constant velocity phase.
3. A constant deceleration phase to ramp down to a final velocity.
The profile operates in one of two modes:
- **Distance-Limited:** If `max_acceleration` is not provided, the profile shape
is determined by assuming the ramp-up and ramp-down phases each occupy
one-third of the total move distance.
- **Acceleration-Limited:** If `max_acceleration` is provided, this limit is
enforced. This may result in a "triangular" or "linear ramp" profile
if the move is too short to reach the requested maximum velocity.
Args:
initial_position (tuple): The starting coordinate of the move.
final_position (tuple): The ending coordinate of the move.
initial_velocity (float): The velocity at the start of the move.
target_velocity (float): The target cruising velocity for the move.
final_velocity (float, optional): The velocity at the end of the move.
If None, defaults to `initial_velocity`.
max_acceleration (float, optional): The maximum allowed acceleration
and deceleration. If None, the profile is distance-limited.
"""
super().__init__(initial_position, final_position, _distance=_distance)
self.initial_velocity = initial_velocity
if final_velocity is None:
final_velocity = initial_velocity
self.final_velocity = final_velocity
self.target_velocity = target_velocity
# specifies ramp up and ramp down distances
# This is the default, which is only applied to the case where no acceleration limit is provided
self._ramp: tuple = (1 / 3.0 * self.distance, 1 / 3.0 * self.distance)
# Check to make sure that the max velocity is greater than initial and final
if self.initial_velocity > self.target_velocity:
self.initial_velocity = self.target_velocity
if self.final_velocity > self.target_velocity:
self.final_velocity = self.target_velocity
self.max_acceleration = max_acceleration
if self.max_acceleration:
self._acceleration_limited_profile()
else:
self._distance_limited_profile()
self.s_space = ()
self.t_space = ()
self.calculate()
[docs]
def calculate(self):
"""Calculates the final position and time frames for the profile.
This method uses the finalized ramp distances (`self._ramp`) and the
calculated accelerations to determine the duration of each motion phase.
It populates the `s_space` (position) and `t_space` (time) tuples
that define the boundaries of the three motion phases.
"""
self.s_space = (
self.initial_position,
self.direction * self._ramp[0] + self.initial_position,
self.final_position - self.direction * self._ramp[1],
self.final_position,
)
t_ramp_up = (
(self.target_velocity - self.initial_velocity) / self._ramp_up_acceleration
if self._ramp_up_acceleration > 0.0
else 0.0
)
t_ramp_down = (
(self.target_velocity - self.final_velocity) / self._ramp_down_acceleration
if self._ramp_down_acceleration > 0.0
else 0.0
)
t_const_velocity = (
(self.distance - (self._ramp[0] + self._ramp[1])) / self.target_velocity
if (self.target_velocity > 0)
else 0.0
)
self._dt = t_ramp_up + t_const_velocity + t_ramp_down
self.t_space = (0.0, t_ramp_up, t_ramp_up + t_const_velocity, self._dt)
def _calculate_ramp_distances(self):
"""Calculates the theoretical distances for ramp-up and ramp-down phases.
Uses the current instance attributes (`target_velocity`, `initial_velocity`,
`final_velocity`, and ramp accelerations) to compute the distance required
for acceleration and deceleration based on the standard kinematic equation:
d = (v_f^2 - v_i^2) / 2a.
Returns:
tuple[float, float]: A tuple containing the (ramp_up_distance, ramp_down_distance).
"""
ramp_up_distance = (
self.target_velocity**2.0 - self.initial_velocity**2.0
) / (2.0 * self._ramp_up_acceleration)
ramp_down_distance = (
self.target_velocity**2.0 - self.final_velocity**2.0
) / (2.0 * self._ramp_down_acceleration)
return ramp_up_distance, ramp_down_distance
def _distance_limited_profile(self):
"""
Calculates profile accelerations when no `max_acceleration` is given.
This method handles the "distance-limited" case. It uses the default
ramp distance assumption (that each ramp phase occupies 1/3 of the
total move distance) to derive the necessary accelerations required
to achieve the requested velocity profile.
Side Effects:
Sets the following instance attributes:
- `self._ramp_up_acceleration`
- `self._ramp_down_acceleration`
"""
self._ramp_up_acceleration = (
self.target_velocity**2.0 - self.initial_velocity**2.0
) / (2.0 * self._ramp[0])
self._ramp_down_acceleration = (
self.target_velocity**2.0 - self.final_velocity**2.0
) / (2.0 * self._ramp[1])
def _acceleration_limited_profile(self):
"""Adjusts the profile parameters when an acceleration limit is provided.
This helper method handles acceleration-limited scenarios by determining if
the requested `target_velocity` is physically achievable given the move
distance and `max_acceleration`.
It handles three distinct cases:
1. **Full Trapezoid:** If the move is long enough, the profile remains a
standard trapezoid, and no velocity parameters are changed.
2. **Triangular Profile:** If the move is too short to reach the requested
`target_velocity` and then decelerate, the profile becomes triangular.
The `target_velocity` is lowered to the true physical peak achievable for
the given distance and constraints.
3. **Linear Ramp:** In the extreme case that the move is too short to even
achieve the target `final_velocity`, the profile degenerates into a
simple linear ramp. Both `target_velocity` and `final_velocity` are
capped at the theoretical maximum, and the deceleration phase is removed.
Side Effects:
Modifies several instance attributes based on the calculated profile:
- `self.target_velocity`: May be lowered to the physically achievable peak.
- `self.final_velocity`: May be lowered if it's unachievable.
- `self._ramp_up_acceleration`: Set to `self.max_acceleration`.
- `self._ramp_down_acceleration`: Set to `self.max_acceleration` or 0.
- `self._dt`, `self.t_space`, `self.s_space`: Set directly in the
linear ramp case.
"""
if self.max_acceleration is None:
return
self._ramp_up_acceleration = self.max_acceleration
self._ramp_down_acceleration = self.max_acceleration
# Case 3: Linear Ramp
# We do not have enough distance to actually even achieve the specified final velocity
# This is the absolute maximum velocity acheivable if the entire distance were a linear ramp
theoretical_max = np.sqrt(
2.0 * self.max_acceleration * self.distance + self.initial_velocity**2.0
)
if (
theoretical_max < self.target_velocity
and theoretical_max < self.final_velocity
):
self.final_velocity = theoretical_max
self.target_velocity = theoretical_max
self._ramp_down_acceleration = 0.0
self._ramp = (self.distance, 0.0)
return
# Determine if we have enough distance to achieve the max velocity
# If we do not, then re-calculate the maximum velocity assuming that the ramp up/down uses 1/3 of the available distance
ramp_up, ramp_down = self._calculate_ramp_distances()
original_target_velocity = self.target_velocity
if (ramp_up + ramp_down) >= self.distance:
# Case 2: Triangular profile
# from V_f**2. = V_i**2 + 2*a*d we can derive
# 2*V_max**2 - V_i**2 - V_f**2 = 2*a*d
# Assuming that ramp_up + ramp_down = distance AND a = a_max
self.target_velocity = np.sqrt(
self.max_acceleration * self.distance
+ 0.5 * (self.initial_velocity**2.0 + self.final_velocity**2.0)
)
self._ramp = self._calculate_ramp_distances()
logging.warning(
f"Trapezoid Profile for line {self.initial_position.tolist()} -> {self.final_position.tolist()}: "
f"The requested max velocity ({original_target_velocity}) was unachievable with the "
f"max acceleration limit ({self.max_acceleration}). "
f"Max velocity has been reduced to {self.target_velocity}."
)
else:
self._ramp = (ramp_up, ramp_down)
[docs]
def acceleration(self, t: float):
self._validate_time(t)
if t <= self.t_space[1]:
return self._ramp_up_acceleration
elif self.t_space[1] < t <= self.t_space[2]:
return 0.0
elif t <= self.t_space[3]:
return -self._ramp_down_acceleration
return None
[docs]
def velocity(self, t: float):
self._validate_time(t)
if t <= self.t_space[1]:
return self.initial_velocity + self._ramp_up_acceleration * t
elif self.t_space[1] < t <= self.t_space[2]:
return self.target_velocity
elif t <= self.t_space[3]:
return self.target_velocity - self._ramp_down_acceleration * (
t - self.t_space[2]
)
return None
[docs]
def position(self, t):
self._validate_time(t)
if t <= self.t_space[1]:
return (
0.5 * self._ramp_up_acceleration * self.direction * t**2.0
+ self.initial_velocity * t * self.direction
+ self.initial_position
)
elif self.t_space[1] < t <= self.t_space[2]:
return self.s_space[1] + self.target_velocity * self.direction * (
t - self.t_space[1]
)
elif t <= self.t_space[3]:
return (
-0.5
* self._ramp_down_acceleration
* self.direction
* (t - self.t_space[2]) ** 2.0
+ self.target_velocity * self.direction * (t - self.t_space[2])
+ self.s_space[2]
)
return None
[docs]
class SCurve(MotionSegment):
def __init__(
self,
initial_position: tuple,
final_position: tuple,
initial_velocity: float,
target_velocity: float,
final_velocity: float = None,
max_acceleration: float = None,
max_jerk: float = None,
**_,
):
"""
Models an S-curve motion profile for a linear segment.
The acceleration profile is trapezoidal (rather than step-wise), giving
continuous acceleration and bounded jerk. The profile has up to seven
phases: three for ramp-up, a constant-velocity cruise, and three for
ramp-down.
The profile operates in one of two modes:
- **Distance-Limited:** If `max_jerk` is not provided, ramp-up and
ramp-down each occupy one-third of the total move distance, and the
required accelerations are derived from that constraint.
- **Jerk-Limited:** If `max_jerk` is provided, it governs the
acceleration shape. If `max_acceleration` is also supplied it acts as
a ceiling on the jerk-derived acceleration. The `target_velocity` is
reduced automatically when the move is too short for the requested
velocity (triangular case).
Args:
initial_position (tuple): The starting coordinate of the move.
final_position (tuple): The ending coordinate of the move.
initial_velocity (float): The velocity at the start of the move.
target_velocity (float): The target cruising velocity.
final_velocity (float, optional): The velocity at the end of the
move. If None, defaults to `initial_velocity`.
max_acceleration (float, optional): Acceleration ceiling used only
in jerk-limited mode. Ignored in distance-limited mode.
max_jerk (float, optional): Maximum jerk. If provided the profile
is jerk-limited; if None it is distance-limited.
"""
super().__init__(initial_position, final_position)
self.initial_velocity = initial_velocity
if final_velocity is None:
final_velocity = initial_velocity
self.final_velocity = final_velocity
self.target_velocity = target_velocity
if self.initial_velocity > self.target_velocity:
self.initial_velocity = self.target_velocity
if self.final_velocity > self.target_velocity:
self.final_velocity = self.target_velocity
self.max_acceleration = max_acceleration
self.max_jerk = max_jerk
self.t_space = ()
self.s_space = ()
if self.max_jerk:
self._jerk_limited_profile()
else:
self._distance_limited_profile()
self.calculate()
@staticmethod
def _scurve_ramp_distance(v_start, v_end, a_max):
"""Distance covered by one S-curve ramp segment.
For an S-curve ramp the average velocity equals (v_start + v_end)/2 and
the ramp duration is (3/2)*|dv|/a_max, so distance = t_ramp * v_avg.
"""
if a_max <= 0 or abs(v_end - v_start) < ZERO_TOLERANCE:
return 0.0
t_ramp = (3.0 / 2.0) * abs(v_end - v_start) / a_max
return t_ramp * (v_start + v_end) / 2.0
def _distance_limited_profile(self):
"""Derives ramp accelerations from the 1/3-distance assumption.
Each ramp (up and down) occupies one-third of the total move distance.
The required acceleration is back-calculated from the S-curve kinematics:
a = (9/4) * (v_target^2 - v_end^2) / distance
"""
d = self.distance
dv_up = self.target_velocity - self.initial_velocity
dv_down = self.target_velocity - self.final_velocity
self._ramp_up_acceleration = (
(9.0 / 4.0) * (self.target_velocity**2 - self.initial_velocity**2) / d
if dv_up > ZERO_TOLERANCE
else 0.0
)
self._ramp_down_acceleration = (
(9.0 / 4.0) * (self.target_velocity**2 - self.final_velocity**2) / d
if dv_down > ZERO_TOLERANCE
else 0.0
)
def _jerk_limited_profile(self):
"""Derives ramp accelerations from max_jerk (with optional max_acceleration cap).
Reduces target_velocity via binary search if the combined ramp distances
exceed the move distance (triangular case).
"""
dv_up = self.target_velocity - self.initial_velocity
dv_down = self.target_velocity - self.final_velocity
a_up = np.sqrt(self.max_jerk * dv_up / 2.0) if dv_up > ZERO_TOLERANCE else 0.0
a_down = (
np.sqrt(self.max_jerk * dv_down / 2.0) if dv_down > ZERO_TOLERANCE else 0.0
)
if self.max_acceleration:
a_up = min(a_up, self.max_acceleration)
a_down = min(a_down, self.max_acceleration)
d_up = self._scurve_ramp_distance(
self.initial_velocity, self.target_velocity, a_up
)
d_down = self._scurve_ramp_distance(
self.final_velocity, self.target_velocity, a_down
)
if d_up + d_down >= self.distance:
original_target = self.target_velocity
self.target_velocity = self._solve_peak_velocity()
dv_up = self.target_velocity - self.initial_velocity
dv_down = self.target_velocity - self.final_velocity
a_up = (
np.sqrt(self.max_jerk * dv_up / 2.0) if dv_up > ZERO_TOLERANCE else 0.0
)
a_down = (
np.sqrt(self.max_jerk * dv_down / 2.0)
if dv_down > ZERO_TOLERANCE
else 0.0
)
if self.max_acceleration:
a_up = min(a_up, self.max_acceleration)
a_down = min(a_down, self.max_acceleration)
logging.warning(
f"SCurve Profile for line {self.initial_position.tolist()} -> {self.final_position.tolist()}: "
f"The requested max velocity ({original_target}) was unachievable with the "
f"max jerk limit ({self.max_jerk}). "
f"Max velocity has been reduced to {self.target_velocity}."
)
self._ramp_up_acceleration = a_up
self._ramp_down_acceleration = a_down
def _solve_peak_velocity(self):
"""Binary search for the peak velocity when ramp distances exceed move distance."""
lo = max(self.initial_velocity, self.final_velocity)
hi = self.target_velocity
for _ in range(64):
mid = (lo + hi) / 2.0
dv_up = mid - self.initial_velocity
dv_down = mid - self.final_velocity
a_up = (
np.sqrt(self.max_jerk * dv_up / 2.0) if dv_up > ZERO_TOLERANCE else 0.0
)
a_down = (
np.sqrt(self.max_jerk * dv_down / 2.0)
if dv_down > ZERO_TOLERANCE
else 0.0
)
if self.max_acceleration:
a_up = min(a_up, self.max_acceleration)
a_down = min(a_down, self.max_acceleration)
d_up = self._scurve_ramp_distance(self.initial_velocity, mid, a_up)
d_down = self._scurve_ramp_distance(self.final_velocity, mid, a_down)
if d_up + d_down < self.distance:
lo = mid
else:
hi = mid
# early exit
if (hi - lo) < ZERO_TOLERANCE:
break
return (lo + hi) / 2.0
[docs]
def calculate(self):
"""Builds s_space and t_space from the ramp accelerations."""
a_up = self._ramp_up_acceleration
a_down = self._ramp_down_acceleration
dv_up = self.target_velocity - self.initial_velocity
dv_down = self.target_velocity - self.final_velocity
t_ramp_up = (3.0 / 2.0) * dv_up / a_up if a_up > ZERO_TOLERANCE else 0.0
t_ramp_down = (3.0 / 2.0) * dv_down / a_down if a_down > ZERO_TOLERANCE else 0.0
dt_up = t_ramp_up / 3.0
dt_down = t_ramp_down / 3.0
# Ramp-up positions (forward from initial_position)
if t_ramp_up > ZERO_TOLERANCE:
s1 = (
(a_up / (2.0 * t_ramp_up)) * self.direction * dt_up**3
+ self.initial_velocity * self.direction * dt_up
+ self.initial_position
)
v1 = (3.0 * a_up / (2.0 * t_ramp_up)) * dt_up**2 + self.initial_velocity
s2 = (
(a_up / 2.0) * self.direction * dt_up**2
+ v1 * self.direction * dt_up
+ s1
)
v2 = v1 + a_up * dt_up
s3 = (
(a_up / 2.0) * self.direction * dt_up**2
- (a_up / (2.0 * t_ramp_up)) * self.direction * dt_up**3
+ v2 * self.direction * dt_up
+ s2
)
else:
s1 = s2 = s3 = self.initial_position.copy()
ramp_up_distance = np.linalg.norm(s3 - self.initial_position)
# Ramp-down total distance; s4 is placed so that ramp-down ends at final_position
ramp_down_distance = (
t_ramp_down * (self.target_velocity + self.final_velocity) / 2.0
)
s4 = self.final_position - ramp_down_distance * self.direction
cruise_distance = self.distance - ramp_up_distance - ramp_down_distance
if cruise_distance < -ZERO_TOLERANCE:
logging.warning(
f"SCurve Motion profile for Line ({self.initial_position.tolist()}, {self.final_position.tolist()}). "
f"Ramp distances ({ramp_up_distance:.3f} + {ramp_down_distance:.3f}) exceed line distance {self.distance:.3f}"
)
cruise_distance = 0.0
t_cruise = (
cruise_distance / self.target_velocity
if self.target_velocity > ZERO_TOLERANCE
else 0.0
)
# Ramp-down positions (forward from s4 at v_target toward final_position)
if t_ramp_down > ZERO_TOLERANCE:
v5 = (
self.target_velocity - (3.0 * a_down / (2.0 * t_ramp_down)) * dt_down**2
)
s5 = (
self.target_velocity * self.direction * dt_down
- (a_down / (2.0 * t_ramp_down)) * self.direction * dt_down**3
+ s4
)
s6 = (
v5 * self.direction * dt_down
- (a_down / 2.0) * self.direction * dt_down**2
+ s5
)
else:
s5 = s6 = self.final_position.copy()
self.s_space = [
self.initial_position,
s1,
s2,
s3,
s4,
s5,
s6,
self.final_position,
]
self._dt = t_ramp_up + t_cruise + t_ramp_down
self.t_space = [
0.0,
dt_up,
2.0 * dt_up,
t_ramp_up,
t_ramp_up + t_cruise,
t_ramp_up + t_cruise + dt_down,
t_ramp_up + t_cruise + 2.0 * dt_down,
self._dt,
]
# ------------------------------------------------------------------
# Kinematic query functions
# ------------------------------------------------------------------
[docs]
def jerk(self, t):
super().jerk(t)
t_ramp_up = self.t_space[3]
t_ramp_down = self.t_space[7] - self.t_space[4]
a_up = self._ramp_up_acceleration
a_down = self._ramp_down_acceleration
if t <= self.t_space[1]:
return 3.0 * a_up / t_ramp_up if t_ramp_up > ZERO_TOLERANCE else 0.0
elif self.t_space[1] < t <= self.t_space[2]:
return 0.0
elif self.t_space[2] < t <= self.t_space[3]:
return -3.0 * a_up / t_ramp_up if t_ramp_up > ZERO_TOLERANCE else 0.0
elif self.t_space[3] < t <= self.t_space[4]:
return 0.0
elif self.t_space[4] < t <= self.t_space[5]:
return -3.0 * a_down / t_ramp_down if t_ramp_down > ZERO_TOLERANCE else 0.0
elif self.t_space[5] < t <= self.t_space[6]:
return 0.0
elif self.t_space[6] < t <= self.t_space[7]:
return 3.0 * a_down / t_ramp_down if t_ramp_down > ZERO_TOLERANCE else 0.0
[docs]
def acceleration(self, t):
self._validate_time(t)
t_ramp_up = self.t_space[3]
t_ramp_down = self.t_space[7] - self.t_space[4]
a_up = self._ramp_up_acceleration
a_down = self._ramp_down_acceleration
if t <= self.t_space[1]:
return (3.0 * a_up / t_ramp_up) * t if t_ramp_up > ZERO_TOLERANCE else 0.0
elif self.t_space[1] < t <= self.t_space[2]:
return a_up
elif self.t_space[2] < t <= self.t_space[3]:
return a_up - (3.0 * a_up / t_ramp_up) * (t - self.t_space[2])
elif self.t_space[3] < t <= self.t_space[4]:
return 0.0
elif self.t_space[4] < t <= self.t_space[5]:
return (
-(3.0 * a_down / t_ramp_down) * (t - self.t_space[4])
if t_ramp_down > ZERO_TOLERANCE
else 0.0
)
elif self.t_space[5] < t <= self.t_space[6]:
return -a_down
elif self.t_space[6] < t <= self.t_space[7]:
return -a_down + (3.0 * a_down / t_ramp_down) * (t - self.t_space[6])
[docs]
def velocity(self, t):
self._validate_time(t)
t_ramp_up = self.t_space[3]
t_ramp_down = self.t_space[7] - self.t_space[4]
a_up = self._ramp_up_acceleration
a_down = self._ramp_down_acceleration
if t <= self.t_space[1]:
return (
(3.0 * a_up / (2.0 * t_ramp_up)) * t**2 + self.initial_velocity
if t_ramp_up > ZERO_TOLERANCE
else self.initial_velocity
)
elif self.t_space[1] < t <= self.t_space[2]:
dt = t - self.t_space[1]
v1 = (3.0 * a_up / (2.0 * t_ramp_up)) * (
t_ramp_up / 3.0
) ** 2 + self.initial_velocity
return a_up * dt + v1
elif self.t_space[2] < t <= self.t_space[3]:
dt = t - self.t_space[2]
v1 = (3.0 * a_up / (2.0 * t_ramp_up)) * (
t_ramp_up / 3.0
) ** 2 + self.initial_velocity
v2 = a_up * (t_ramp_up / 3.0) + v1
return a_up * dt - (3.0 * a_up / (2.0 * t_ramp_up)) * dt**2 + v2
elif self.t_space[3] < t <= self.t_space[4]:
return self.target_velocity
elif self.t_space[4] < t <= self.t_space[5]:
dt = t - self.t_space[4]
return (
self.target_velocity - (3.0 * a_down / (2.0 * t_ramp_down)) * dt**2
if t_ramp_down > ZERO_TOLERANCE
else self.target_velocity
)
elif self.t_space[5] < t <= self.t_space[6]:
dt = t - self.t_space[5]
v5 = (
self.target_velocity
- (3.0 * a_down / (2.0 * t_ramp_down)) * (t_ramp_down / 3.0) ** 2
)
return v5 - a_down * dt
elif self.t_space[6] < t <= self.t_space[7]:
dt = t - self.t_space[6]
v5 = (
self.target_velocity
- (3.0 * a_down / (2.0 * t_ramp_down)) * (t_ramp_down / 3.0) ** 2
)
v6 = v5 - a_down * (t_ramp_down / 3.0)
return v6 - a_down * dt + (3.0 * a_down / (2.0 * t_ramp_down)) * dt**2
[docs]
def position(self, t):
self._validate_time(t)
t_ramp_up = self.t_space[3]
t_ramp_down = self.t_space[7] - self.t_space[4]
a_up = self._ramp_up_acceleration
a_down = self._ramp_down_acceleration
if t <= self.t_space[1]:
cubic = (
(a_up / (2.0 * t_ramp_up)) * t**3 if t_ramp_up > ZERO_TOLERANCE else 0.0
)
return (
cubic * self.direction
+ self.initial_velocity * self.direction * t
+ self.initial_position
)
elif self.t_space[1] < t <= self.t_space[2]:
dt = t - self.t_space[1]
v1 = self.velocity(self.t_space[1])
return (
(a_up / 2.0) * self.direction * dt**2
+ v1 * self.direction * dt
+ self.s_space[1]
)
elif self.t_space[2] < t <= self.t_space[3]:
dt = t - self.t_space[2]
v2 = self.velocity(self.t_space[2])
return (
(a_up / 2.0) * self.direction * dt**2
- (a_up / (2.0 * t_ramp_up)) * self.direction * dt**3
+ v2 * self.direction * dt
+ self.s_space[2]
)
elif self.t_space[3] < t <= self.t_space[4]:
dt = t - self.t_space[3]
return self.target_velocity * self.direction * dt + self.s_space[3]
elif self.t_space[4] < t <= self.t_space[5]:
dt = t - self.t_space[4]
cubic = (
(a_down / (2.0 * t_ramp_down)) * dt**3
if t_ramp_down > ZERO_TOLERANCE
else 0.0
)
return (
self.target_velocity * self.direction * dt
- cubic * self.direction
+ self.s_space[4]
)
elif self.t_space[5] < t <= self.t_space[6]:
dt = t - self.t_space[5]
v5 = self.velocity(self.t_space[5])
return (
v5 * self.direction * dt
- (a_down / 2.0) * self.direction * dt**2
+ self.s_space[5]
)
elif self.t_space[6] < t <= self.t_space[7]:
dt = t - self.t_space[6]
v6 = self.velocity(self.t_space[6])
return (
v6 * self.direction * dt
- (a_down / 2.0) * self.direction * dt**2
+ (a_down / (2.0 * t_ramp_down)) * self.direction * dt**3
+ self.s_space[6]
)