from __future__ import annotations
import json
from dataclasses import dataclass, field
from enum import IntEnum
from typing import Any, Literal, Optional
import numpy as np
__all__ = ["IPOPTStatus", "IterationInfo", "MPCCResult", "unscale_multipliers"]
PairStatus = Literal["G_active", "H_active", "biactive", "inactive"]
[docs]
class IPOPTStatus(IntEnum):
"""IPOPT solver return codes (integer values match cyipopt's status field)."""
SOLVED = 0 # Solve_Succeeded
ACCEPTABLE = 1 # Solved_To_Acceptable_Level
INFEASIBLE = 2 # Infeasible_Problem_Detected
SMALL_STEP = 3 # Search_Direction_Becomes_Too_Small
DIVERGING = 4 # Diverging_Iterates
USER_STOP = 5 # User_Requested_Stop
MAX_ITER = -1 # Maximum_Iterations_Exceeded
RESTORE_FAIL = -2 # Restoration_Failed
STEP_ERROR = -3 # Error_In_Step_Computation
MAX_TIME = -4 # Maximum_CpuTime_Exceeded
NO_DOF = -10 # Not_Enough_Degrees_Of_Freedom
BAD_PROBLEM = -11 # Invalid_Problem_Definition
BAD_OPTION = -12 # Invalid_Option
BAD_NUMBER = -13 # Invalid_Number_Detected
EXCEPTION = -100 # Unrecoverable_Exception
NO_MEMORY = -102 # Insufficient_Memory
INTERNAL_ERR = -199 # Internal_Error
[docs]
@dataclass
class IterationInfo:
"""Diagnostic snapshot for one NLP solve in an iterative strategy."""
epsilon: float
x: np.ndarray
obj: float
status: int
message: str
comp_residual: float # max_i |G_i * H_i|
comp_residual_mean: float # mean_i |G_i * H_i|
n_ipopt_iter: int # IPOPT iterations in this NLP solve
iter_time: float # wall-clock seconds for this NLP solve
kkt_residual: Optional[float] = None # MPCC stationarity residual (∞-norm)
restoration_iter_count: int = 0 # IPOPT iterations in feasibility restoration
entered_restoration: bool = False # True if any iter ran in restoration mode
[docs]
@dataclass
class MPCCResult:
"""
Result returned by :func:`pympcc.solve` or :meth:`pympcc.MPCCSolver.solve`.
Attributes
----------
x : ndarray
Solution vector.
obj : float
Objective value at the solution.
status : int
IPOPT exit code of the final NLP solve.
Common values: 0 = Solve_Succeeded, 1 = Solved_To_Acceptable_Level,
-1 = Maximum_Iterations_Exceeded, 2 = Infeasible_Problem_Detected.
message : str
Human-readable status from IPOPT.
G : ndarray, shape (n_comp,)
G(x*) values at the solution.
H : ndarray, shape (n_comp,)
H(x*) values at the solution.
comp_residual : float
Complementarity feasibility measure: ``max_i |G_i(x*) * H_i(x*)|``.
comp_residual_mean : float
Mean complementarity residual: ``mean_i |G_i(x*) * H_i(x*)|``.
success : bool
``True`` when IPOPT status is 0 (Solve_Succeeded) or
1 (Solved_To_Acceptable_Level).
strategy : str
Name of the reformulation strategy used.
solve_time : float or None
Sum of wall-clock seconds spent inside each ``nlp.solve()`` call.
Excludes strategy setup (NLP construction, sparsity computation),
callback invocations, and post-solve stationarity classification.
``None`` when not measured (e.g. results constructed programmatically).
history : list[IterationInfo]
Per-iteration diagnostics for iterative strategies (Scholtes,
smoothing). Empty for the direct strategy.
"""
x: np.ndarray
obj: float
status: int
message: str
G: np.ndarray
H: np.ndarray
comp_residual: float
comp_residual_mean: float
success: bool
strategy: str
history: list[IterationInfo] = field(default_factory=list)
solve_time: Optional[float] = None
mult_g: Optional[np.ndarray] = field(default=None)
stationarity: str = "unknown"
kkt_residual: Optional[float] = None
# Active-set cleanup phase (post-continuation polish solve).
# Populated only when the strategy was invoked with cleanup={True,"auto"}.
cleanup_status: Optional[int] = None # IPOPT status of cleanup NLP
cleanup_n_iter: Optional[int] = None # IPOPT iters in cleanup NLP
cleanup_obj: Optional[float] = None # objective at cleanup solution
cleanup_active_set: Optional[tuple] = None # (I_G_active, I_H_active)
cleanup_accepted: Optional[bool] = None # True iff cleanup result replaces continuation
# Complementarity-pair diagonal scaling that was active during the solve.
# ``None`` when the problem was solved without rescaling. Multipliers
# ``mpcc_mult_G`` / ``mpcc_mult_H`` returned via ``mult_g`` are in
# *scaled* space; multiply by these vectors to recover original-space
# KKT duals. See :func:`pympcc.unscale_multipliers`.
comp_G_scale: Optional[np.ndarray] = None
comp_H_scale: Optional[np.ndarray] = None
# Constraint-qualification diagnostics (populated when the solver was
# invoked with ``diagnostics=True``). See ``pympcc._diagnostics``.
cq: Optional[str] = None # "MPCC-LICQ" | "MPCC-MFCQ" | "none" | "unknown"
cq_active_set_sizes: Optional[dict] = None # {"g","h","G","H","biactive","xL","xU"}
cq_rank_deficit: Optional[int] = None # 0 ⇔ LICQ
# B-stationarity diagnostics (populated alongside ``cq``). See
# ``pympcc._stationarity.verify_b_stationarity``.
b_stationary: Optional[str] = None # "B-stationary" | "not B-stationary" | "intractable" | "unknown"
b_stationary_witness: Optional[tuple] = None # branch chars where descent was found
b_stationary_min_descent: Optional[float] = None
# Per-pair complementarity status. Always populated by MPCCSolver. Each
# entry is one of: "G_active" (G_i≈0, H_i>0), "H_active" (H_i≈0, G_i>0),
# "biactive" (both≈0), "inactive" (both>0 — infeasible, should not occur).
per_pair_status: Optional[list] = None
# MPCC-stationarity multipliers μ_G = −λ_G, μ_H = −λ_H (literature
# sign convention). Populated when ``tnlp_refine=True`` from the TNLP
# active-set re-solve; None otherwise to avoid confusion with approximate
# relaxed-NLP multipliers.
mult_comp_G_mpcc: Optional[np.ndarray] = None
mult_comp_H_mpcc: Optional[np.ndarray] = None
# TNLP active-set refinement result (§2.6). Populated when the solver is
# called with ``tnlp_refine=True``.
tnlp_refined: Optional[Any] = None # TNLPResult, typed as Any to avoid circular import
# MPCC second-order sufficient conditions (§2.3). Populated when the
# solver is invoked with ``diagnostics=True``.
# ``True`` — reduced Hessian is PD on the critical cone.
# ``False`` — at least one non-positive eigenvalue (not a strict local min).
# ``None`` — check skipped (non-converged, biactive pairs, or Hessian
# unavailable); see ``sosc_skipped_reason``.
sosc: Optional[bool] = None
sosc_min_eigenvalue: Optional[float] = None
sosc_skipped_reason: Optional[str] = None
# Condition-number diagnostics (§6.2). Populated when ``diagnostics=True``.
# ``jac_condition`` — κ₂ of the active-constraint Jacobian. Large values
# coincide with near-LICQ failure.
# ``hessian_condition_estimate`` — κ₂ of the reduced Lagrangian Hessian
# when SOSC is computable and PD. ``None`` when SOSC was skipped or
# the reduced Hessian is indefinite.
jac_condition: Optional[float] = None
hessian_condition_estimate: Optional[float] = None
# Time-limit incumbent flag (§6.3). ``True`` when a ``time_limit`` was
# set and the outer loop terminated early because of it; the returned
# iterate is the best feasible incumbent seen up to that point.
time_limit_hit: bool = False
# PATH-style multi-merit & degeneracy diagnostics (§2.7). Populated
# when the solver is invoked with ``diagnostics=True``. See
# :mod:`pympcc._diagnostics` for the dict schemas.
merit_cross_check: Optional[dict] = None
jac_row_norms: Optional[dict] = None
jac_col_norms: Optional[dict] = None
degeneracy_report: Optional[dict] = None
initial_point_stats: Optional[dict] = None
# Hot-start telemetry (§6.5). ``n_ipopt_iter_total`` aggregates the
# IPOPT iteration counts across every inner NLP solve for this result;
# ``warmstart_savings_iter`` is populated on warm
# :meth:`MPCCSolver.resolve` calls and reports the iteration delta vs
# the cold-baseline solve recorded by the same solver instance.
n_ipopt_iter_total: Optional[int] = None
warmstart_savings_iter: Optional[int] = None
[docs]
def unscale_comp_multipliers(
self,
mpcc_mult_G: np.ndarray,
mpcc_mult_H: np.ndarray,
) -> tuple[np.ndarray, np.ndarray]:
"""Convert scaled-space comp multipliers back to original space.
If the problem was solved with ``comp_G_scale``/``comp_H_scale``
active, IPOPT's multipliers correspond to the *scaled* constraint
``s · G(x) ≥ 0``. Original-space duals satisfy ``μ = s · μ̃``
(chain rule). Returns the input unchanged if no scaling was used.
"""
s_G = self.comp_G_scale if self.comp_G_scale is not None else 1.0
s_H = self.comp_H_scale if self.comp_H_scale is not None else 1.0
return s_G * mpcc_mult_G, s_H * mpcc_mult_H
[docs]
def to_json(self) -> str:
"""Serialize the result to a JSON string.
Arrays are converted to lists. Fields that are ``None`` are included
as JSON ``null``. The ``history`` list is omitted (use
``summary(verbosity=2)`` for human-readable history).
Returns
-------
str
JSON-encoded result suitable for logging, benchmarking, or storage.
"""
def _arr(a):
return a.tolist() if isinstance(a, np.ndarray) else a
d: dict = {
"obj": self.obj,
"success": self.success,
"status": self.status,
"message": self.message,
"strategy": self.strategy,
"comp_residual": self.comp_residual,
"comp_residual_mean": self.comp_residual_mean,
"stationarity": self.stationarity,
"kkt_residual": self.kkt_residual,
"x": _arr(self.x),
"G": _arr(self.G),
"H": _arr(self.H),
"per_pair_status": self.per_pair_status,
"mult_comp_G_mpcc": _arr(self.mult_comp_G_mpcc),
"mult_comp_H_mpcc": _arr(self.mult_comp_H_mpcc),
"cq": self.cq,
"cq_rank_deficit": self.cq_rank_deficit,
"b_stationary": self.b_stationary,
"solve_time": self.solve_time,
"merit_cross_check": self.merit_cross_check,
"jac_row_norms": self.jac_row_norms,
"jac_col_norms": self.jac_col_norms,
"degeneracy_report": self.degeneracy_report,
"initial_point_stats": self.initial_point_stats,
}
if self.tnlp_refined is not None:
tnlp = self.tnlp_refined
d["tnlp_refined"] = {
"success": tnlp.success,
"status": tnlp.status,
"obj": tnlp.obj,
"stationarity": tnlp.stationarity,
"kkt_residual": tnlp.kkt_residual,
"mult_comp_G": _arr(tnlp.mult_comp_G),
"mult_comp_H": _arr(tnlp.mult_comp_H),
"n_iter": tnlp.n_iter,
"solve_time": tnlp.solve_time,
}
return json.dumps(d)
[docs]
def to_dataframe(self):
"""Return a per-complementarity-pair :class:`pandas.DataFrame`.
Columns: ``pair``, ``G``, ``H``, ``GH``, ``status``, and (when
available from TNLP refinement) ``mu_G``, ``mu_H``.
Requires ``pandas``.
Returns
-------
pandas.DataFrame
One row per complementarity pair.
Raises
------
ImportError
When ``pandas`` is not installed.
"""
try:
import pandas as pd
except ImportError as exc:
raise ImportError(
"pandas is required for MPCCResult.to_dataframe(). "
"Install it with: pip install pandas"
) from exc
rows = []
for i, (g, h) in enumerate(zip(self.G, self.H)):
row: dict = {
"pair": i,
"G": float(g),
"H": float(h),
"GH": float(g * h),
"status": (self.per_pair_status[i]
if self.per_pair_status is not None else None),
}
if self.mult_comp_G_mpcc is not None:
row["mu_G"] = float(self.mult_comp_G_mpcc[i])
if self.mult_comp_H_mpcc is not None:
row["mu_H"] = float(self.mult_comp_H_mpcc[i])
rows.append(row)
return pd.DataFrame(rows)
def __repr__(self) -> str: # pragma: no cover
kkt_str = (f", kkt_residual={self.kkt_residual:.3e}"
if self.kkt_residual is not None else "")
return (
f"MPCCResult(strategy={self.strategy!r}, success={self.success}, "
f"obj={self.obj:.6g}, comp_residual={self.comp_residual:.3e}, "
f"comp_residual_mean={self.comp_residual_mean:.3e}, "
f"stationarity={self.stationarity!r}{kkt_str}, status={self.status})"
)
[docs]
def summary(self, verbosity: int = 1) -> str:
"""Return a formatted summary of the result.
Parameters
----------
verbosity : int
``0`` — single-line headline.
``1`` — multi-section block (default); shows objective, residuals,
stationarity, populated diagnostics, and performance.
``2`` — also includes the per-iteration history table.
"""
try:
status_name = IPOPTStatus(self.status).name
except ValueError:
status_name = "UNKNOWN"
if verbosity <= 0:
kkt = (f", kkt={self.kkt_residual:.2e}"
if self.kkt_residual is not None else "")
return (
f"MPCCResult({self.strategy}): success={self.success}, "
f"obj={self.obj:.6g}, comp={self.comp_residual:.2e}{kkt}"
)
lines: list[str] = []
lines.append(
f"MPCCResult — strategy={self.strategy!r}, success={self.success}"
)
lines.append(f" Status: {self.status} ({status_name}) — {self.message}")
lines.append(" Solution:")
lines.append(f" objective = {self.obj: .6e}")
lines.append(
f" comp_residual = {self.comp_residual:.3e} (max), "
f"{self.comp_residual_mean:.3e} (mean)"
)
if self.kkt_residual is not None:
lines.append(f" KKT residual = {self.kkt_residual:.3e}")
stat_lines: list[str] = []
if self.stationarity and self.stationarity != "unknown":
stat_lines.append(f" class = {self.stationarity}")
if self.cq is not None:
cq_extra = ""
if self.cq_rank_deficit is not None:
cq_extra = f" (rank deficit {self.cq_rank_deficit})"
stat_lines.append(f" CQ = {self.cq}{cq_extra}")
if self.b_stationary is not None:
bs_extra = ""
if self.b_stationary_min_descent is not None:
bs_extra = f" (min descent {self.b_stationary_min_descent:.3e})"
stat_lines.append(f" B-stationary = {self.b_stationary}{bs_extra}")
if stat_lines:
lines.append(" Stationarity:")
lines.extend(stat_lines)
if self.cq_active_set_sizes is not None:
sizes = self.cq_active_set_sizes
parts = [f"|I_{k}|={v}" for k, v in sizes.items()]
lines.append(" Active set:")
lines.append(" " + ", ".join(parts))
if self.merit_cross_check is not None:
mc = self.merit_cross_check
lines.append(" Merit cross-check (max-norms):")
lines.append(
f" FB={mc['fb_max']:.3e}, "
f"min-map={mc['min_map_max']:.3e}, "
f"G·H={mc['inner_product_max']:.3e}, "
f"disagreement={mc['disagreement_ratio']:.2f}"
)
if self.degeneracy_report is not None:
dr = self.degeneracy_report
min_sv = dr.get("min_singular_value")
min_sv_s = f"{min_sv:.3e}" if isinstance(min_sv, float) else "n/a"
lines.append(
f" Degeneracy: biactive={dr.get('n_biactive')}, "
f"zero-rows={dr.get('n_zero_rows')}, "
f"zero-cols={dr.get('n_zero_cols')}, "
f"σ_min={min_sv_s}"
)
perf_lines: list[str] = []
if self.solve_time is not None:
perf_lines.append(f" solve_time = {self.solve_time:.3f}s")
if self.history:
n_outer = len(self.history)
n_ipopt = sum(it.n_ipopt_iter for it in self.history)
n_restoration = sum(it.restoration_iter_count for it in self.history)
restored = any(it.entered_restoration for it in self.history)
iters_str = f" outer iters = {n_outer} (IPOPT iters: {n_ipopt}"
if restored:
iters_str += f", restoration: {n_restoration}"
iters_str += ")"
perf_lines.append(iters_str)
if perf_lines:
lines.append(" Performance:")
lines.extend(perf_lines)
if self.cleanup_status is not None:
lines.append(" Cleanup:")
lines.append(
f" accepted={self.cleanup_accepted}, "
f"status={self.cleanup_status}, "
f"obj={self.cleanup_obj:.6e}, "
f"n_iter={self.cleanup_n_iter}"
)
if self.per_pair_status is not None:
from collections import Counter
counts = Counter(self.per_pair_status)
parts = []
for key in ("G_active", "H_active", "biactive", "inactive"):
if counts[key]:
parts.append(f"{key}={counts[key]}")
lines.append(" Complementarity pairs: " + ", ".join(parts))
if self.tnlp_refined is not None:
tnlp = self.tnlp_refined
lines.append(" TNLP Refinement:")
lines.append(
f" success={tnlp.success}, "
f"status={tnlp.status}, "
f"obj={tnlp.obj:.6e}, "
f"stationarity={tnlp.stationarity!r}, "
f"n_iter={tnlp.n_iter}"
)
if tnlp.kkt_residual is not None:
lines.append(f" KKT residual = {tnlp.kkt_residual:.3e}")
if verbosity >= 2 and self.history:
lines.append(" Per-iteration history:")
lines.append(
" k epsilon obj comp_max "
"kkt n_iter time"
)
for k, it in enumerate(self.history):
kkt_cell = (f"{it.kkt_residual:.3e}"
if it.kkt_residual is not None else " - ")
lines.append(
f" {k:<4d} {it.epsilon:.3e} {it.obj: .3e} "
f"{it.comp_residual:.3e} {kkt_cell} "
f"{it.n_ipopt_iter:<8d} {it.iter_time:.3f}s"
)
return "\n".join(lines)
[docs]
def unscale_multipliers(
result: "MPCCResult",
mpcc_mult_G: np.ndarray,
mpcc_mult_H: np.ndarray,
) -> tuple[np.ndarray, np.ndarray]:
"""Module-level alias for :meth:`MPCCResult.unscale_comp_multipliers`."""
return result.unscale_comp_multipliers(mpcc_mult_G, mpcc_mult_H)