"""Presolve passes for :class:`~pympcc.MPCCProblem`.
Two passes are applied (both opt-in via ``MPCCSolver(..., presolve=True)``):
1. **Pinned-variable elimination** — variables with ``xl[j] == xu[j]``
(finite, equal) are substituted out of the optimisation. The reduced
problem has fewer variables; callbacks are wrapped to reinject the
fixed values before evaluation.
2. **Dead-pair pruning** — complementarity pairs whose Jacobian row is
structurally empty (no x-dependence) and whose value at ``x0`` is
strictly positive are dropped. The constraint ``G_i ≥ 0`` (resp.
``H_i ≥ 0``) is trivially satisfied by a positive constant.
Restricted to the COO-sparsity case; falls back to a no-op when
sparsity is not provided.
The mapping ``PresolveMap`` is used by the solver to expand the result
back to the original variable / comp-pair space before returning.
Module layout
-------------
The detection passes live in :mod:`pympcc._presolve_detect`, FBBT in
:mod:`pympcc._presolve_fbbt`, and the reduction step in
:mod:`pympcc._presolve_reduce`. This module owns :class:`PresolveMap`
and the public :func:`presolve` entry point, and re-exports the
private helpers so existing call sites that import them via
``pympcc._presolve`` keep working.
"""
from __future__ import annotations
import logging
import warnings
from dataclasses import dataclass, field
from typing import Optional
import numpy as np
# Re-export the split-out helpers so external callers (and the test
# suite) can keep importing them via ``pympcc._presolve``. Each of the
# three submodules is single-purpose and self-contained.
from ._presolve_detect import (
_detect_dead,
_detect_empty_cols,
_detect_empty_rows,
_detect_forced,
_detect_pinned_from_bounds,
_detect_prefix_eq,
)
from ._presolve_fbbt import _fbbt
from ._presolve_reduce import _build_reduced, _identity_map
from .problem import MPCCProblem
from .result import IterationInfo, MPCCResult
_log = logging.getLogger(__name__)
__all__ = ["PresolveMap", "presolve"]
[docs]
@dataclass
class PresolveMap:
"""Mapping between an original :class:`MPCCProblem` and its reduced form.
Use :meth:`expand_result` to lift a result on the reduced problem
back to the original variable / comp-pair indexing.
"""
keep_vars: np.ndarray # original variable indices kept
fixed_vars: np.ndarray # original variable indices pinned
fixed_vals: np.ndarray # value each pinned variable was fixed to
keep_comp: np.ndarray # original comp-pair indices kept
n_orig: int # original number of variables
n_comp_orig: int # original number of comp pairs
n_ineq: int = 0 # original number of inequality constraints
n_eq: int = 0 # original number of equality constraints
keep_ineq: Optional[np.ndarray] = None # original ineq rows kept (None ⇒ all)
keep_eq: Optional[np.ndarray] = None # original eq rows kept (None ⇒ all)
# B2 — forced-pair pruning: pairs whose H_i ≡ 0 (resp. G_i ≡ 0)
# structurally were dropped from the comp block; their G_i (resp. H_i)
# was promoted to a regular inequality. Multipliers on those promoted
# rows are dropped on expand_result (info loss; documented).
promote_G: np.ndarray = field(default_factory=lambda: np.empty(0, dtype=np.intp))
promote_H: np.ndarray = field(default_factory=lambda: np.empty(0, dtype=np.intp))
# B3 — pre-fixing on linear sign analysis: pairs where one side is
# linear and provably > 0 over [xl, xu] are dropped; the other side
# is enforced as an equality (H_i = 0 if G_i > 0; G_i = 0 if H_i > 0).
# Multipliers on those promoted eq rows are dropped on expand_result.
prefix_H_eq: np.ndarray = field(default_factory=lambda: np.empty(0, dtype=np.intp))
prefix_G_eq: np.ndarray = field(default_factory=lambda: np.empty(0, dtype=np.intp))
@property
def is_identity(self) -> bool:
n_ineq_kept = self.n_ineq if self.keep_ineq is None else self.keep_ineq.size
n_eq_kept = self.n_eq if self.keep_eq is None else self.keep_eq.size
return (self.fixed_vars.size == 0
and self.keep_comp.size == self.n_comp_orig
and n_ineq_kept == self.n_ineq
and n_eq_kept == self.n_eq
and self.promote_G.size == 0
and self.promote_H.size == 0
and self.prefix_H_eq.size == 0
and self.prefix_G_eq.size == 0)
[docs]
def expand_x(self, x_red: np.ndarray) -> np.ndarray:
"""Scatter a reduced-space ``x`` back to the original ``n_orig`` slots."""
x_red = np.asarray(x_red, dtype=float)
if self.fixed_vars.size == 0:
return x_red.copy()
x_full = np.empty(self.n_orig, dtype=float)
x_full[self.keep_vars] = x_red
x_full[self.fixed_vars] = self.fixed_vals
return x_full
[docs]
def expand_comp(self, vec_red: np.ndarray, *, fill: float = 0.0) -> np.ndarray:
"""Scatter a length-``len(keep_comp)`` vector back to ``n_comp_orig``."""
vec_red = np.asarray(vec_red, dtype=float)
if self.keep_comp.size == self.n_comp_orig:
return vec_red.copy()
out = np.full(self.n_comp_orig, fill, dtype=float)
out[self.keep_comp] = vec_red
return out
[docs]
def expand_result(
self,
result: MPCCResult,
problem_orig: MPCCProblem,
) -> MPCCResult:
"""Expand ``result`` (built on the reduced problem) into original space.
Mutates ``result`` in place and also returns it.
"""
if self.is_identity:
return result
# Primal solution.
x_full = self.expand_x(result.x)
# G, H: re-evaluate on the original problem so values reflect the
# original constraint, including pruned (constant-positive) pairs.
G_full = np.asarray(problem_orig.comp_G(x_full), dtype=float) # type: ignore[misc]
H_full = np.asarray(problem_orig.comp_H(x_full), dtype=float) # type: ignore[misc]
# mult_g: layout is [ineq | eq | <comp blocks>], comp blocks each
# of length ``n_comp_red``. Rebuild with zeros at pruned ineq/eq
# rows and dropped comp pairs. The reduced ineq block is
# ``[orig_kept | promoted_G | promoted_H]``; only the orig-kept
# slice maps back — promoted multipliers are dropped (info loss).
mult_g = result.mult_g
if mult_g is not None:
n_ineq_kept = self.n_ineq if self.keep_ineq is None else self.keep_ineq.size
n_promote = self.promote_G.size + self.promote_H.size
n_ineq_red = n_ineq_kept + n_promote
n_eq_kept = self.n_eq if self.keep_eq is None else self.keep_eq.size
n_prefix_eq = self.prefix_H_eq.size + self.prefix_G_eq.size
n_eq_red = n_eq_kept + n_prefix_eq
n_comp_red = self.keep_comp.size
n_pre_red = n_ineq_red + n_eq_red
ineq_red = np.asarray(mult_g[:n_ineq_kept]) # promoted slice ignored
eq_red = np.asarray(mult_g[n_ineq_red:n_ineq_red + n_eq_kept]) # prefix slice ignored
tail = np.asarray(mult_g[n_pre_red:])
ineq_full = np.zeros(self.n_ineq)
if self.keep_ineq is None:
ineq_full = ineq_red
else:
ineq_full[self.keep_ineq] = ineq_red
eq_full = np.zeros(self.n_eq)
if self.keep_eq is None:
eq_full = eq_red
else:
eq_full[self.keep_eq] = eq_red
tail_full = tail
if (n_comp_red > 0 and tail.size % n_comp_red == 0
and self.keep_comp.size != self.n_comp_orig):
K = tail.size // n_comp_red
blocks_red = tail.reshape(K, n_comp_red)
blocks_full = np.zeros((K, self.n_comp_orig))
blocks_full[:, self.keep_comp] = blocks_red
tail_full = blocks_full.reshape(-1)
if (n_ineq_red != self.n_ineq
or n_eq_red != self.n_eq
or self.keep_comp.size != self.n_comp_orig):
mult_g = np.concatenate([ineq_full, eq_full, tail_full])
# comp_*_scale: pad pruned slots with 1.0 (no rescaling on those pairs).
cG_scale = result.comp_G_scale
cH_scale = result.comp_H_scale
if cG_scale is not None and len(cG_scale) != self.n_comp_orig:
cG_scale = self.expand_comp(cG_scale, fill=1.0)
if cH_scale is not None and len(cH_scale) != self.n_comp_orig:
cH_scale = self.expand_comp(cH_scale, fill=1.0)
# history: scatter each iter's primal back to n_orig.
new_history: list[IterationInfo] = []
for info in result.history:
new_history.append(IterationInfo(
epsilon=info.epsilon,
x=self.expand_x(info.x),
obj=info.obj,
status=info.status,
message=info.message,
comp_residual=info.comp_residual,
comp_residual_mean=info.comp_residual_mean,
n_ipopt_iter=info.n_ipopt_iter,
iter_time=info.iter_time,
kkt_residual=info.kkt_residual,
restoration_iter_count=info.restoration_iter_count,
entered_restoration=info.entered_restoration,
))
result.x = x_full
result.G = G_full
result.H = H_full
result.mult_g = mult_g
result.comp_G_scale = cG_scale
result.comp_H_scale = cH_scale
result.history = new_history
return result
# --------------------------------------------------------------------------- #
# Public entry point #
# --------------------------------------------------------------------------- #
[docs]
def presolve(problem: MPCCProblem) -> tuple[MPCCProblem, PresolveMap]:
"""Reduce *problem* by FBBT, pinned-var elimination and dead-pair pruning.
Returns ``(reduced_problem, map)``. When nothing can be eliminated,
or when reduction would yield an invalid problem (e.g. an empty
Jacobian sparsity pattern), returns ``(problem, identity_map)``.
Detected linear infeasibility (an FBBT bound crossing) emits a
``UserWarning`` and falls back to identity so the original problem
can surface the issue through the strategy's normal infeasibility
path.
"""
# 1. Empty-row detection on the original sparsity.
drop_ineq, drop_eq, infeasible = _detect_empty_rows(problem)
if infeasible:
warnings.warn(
"pympcc.presolve: structurally-empty constraint row violates its "
"RHS at x0; falling back to identity (no presolve).",
UserWarning,
stacklevel=2,
)
return problem, _identity_map(problem)
# 2. FBBT: tighten xl/xu from linear ineq/eq rows.
xl_t, xu_t, infeasible = _fbbt(problem)
if infeasible:
warnings.warn(
"pympcc.presolve: FBBT detected an infeasible linear constraint; "
"falling back to identity (no presolve).",
UserWarning,
stacklevel=2,
)
return problem, _identity_map(problem)
# 3. Empty-column detection — pin free variables via xl=xu=clip(0,...).
free_cols = _detect_empty_cols(problem, xl_t, xu_t)
if free_cols.size:
for j in free_cols:
lo, hi = xl_t[j], xu_t[j]
fix = 0.0
if np.isfinite(lo) and np.isfinite(hi):
fix = float(np.clip(0.0, lo, hi))
elif np.isfinite(lo):
fix = float(max(0.0, lo))
elif np.isfinite(hi):
fix = float(min(0.0, hi))
xl_t[j] = fix
xu_t[j] = fix
# 4. Pinned-var detection on the (possibly tightened) bounds.
pinned_idx, pinned_vals = _detect_pinned_from_bounds(xl_t, xu_t)
dead = _detect_dead(problem)
promote_G, promote_H, extra_dead = _detect_forced(problem, dead)
if extra_dead.size:
dead = dead.copy()
dead[extra_dead] = True
dropped_pairs = dead.copy()
if promote_G.size:
dropped_pairs[promote_G] = True
if promote_H.size:
dropped_pairs[promote_H] = True
# B3 — linear sign analysis using FBBT-tightened bounds.
prefix_H_eq, prefix_G_eq, b3_infeasible = _detect_prefix_eq(
problem, xl_t, xu_t, dropped_pairs,
)
if b3_infeasible:
warnings.warn(
"pympcc.presolve: linear sign analysis detected infeasible "
"complementarity (G_i > 0 and H_i > 0 simultaneously); "
"falling back to identity (no presolve).",
UserWarning,
stacklevel=2,
)
return problem, _identity_map(problem)
if prefix_H_eq.size:
dropped_pairs[prefix_H_eq] = True
if prefix_G_eq.size:
dropped_pairs[prefix_G_eq] = True
keep_comp = np.where(~dropped_pairs)[0].astype(np.intp)
bounds_changed = (
not np.array_equal(xl_t, np.asarray(problem.xl, dtype=float))
or not np.array_equal(xu_t, np.asarray(problem.xu, dtype=float))
)
rows_dropped = bool(drop_ineq.any() or drop_eq.any())
if (pinned_idx.size == 0
and keep_comp.size == problem.n_comp
and not bounds_changed
and not rows_dropped
and promote_G.size == 0
and promote_H.size == 0
and prefix_H_eq.size == 0
and prefix_G_eq.size == 0):
return problem, _identity_map(problem)
# MPCC requires n_comp >= 1; bail out of comp pruning rather than
# destroy that invariant.
if keep_comp.size == 0:
keep_comp = np.arange(problem.n_comp, dtype=np.intp)
promote_G = np.empty(0, dtype=np.intp)
promote_H = np.empty(0, dtype=np.intp)
prefix_H_eq = np.empty(0, dtype=np.intp)
prefix_G_eq = np.empty(0, dtype=np.intp)
if pinned_idx.size == 0 and not bounds_changed and not rows_dropped:
return problem, _identity_map(problem)
keep_vars = np.setdiff1d(
np.arange(problem.n, dtype=np.intp),
pinned_idx,
assume_unique=True,
)
# cyipopt requires n >= 1. When *every* variable is pinned, fall
# back to identity so the strategy / NLP layer evaluates the
# original problem at its single feasible point (where IPOPT
# terminates after one iteration); avoids constructing an
# n_red == 0 NLP that cyipopt rejects.
if keep_vars.size == 0:
return problem, _identity_map(problem)
pmap = PresolveMap(
keep_vars=keep_vars,
fixed_vars=pinned_idx.astype(np.intp),
fixed_vals=np.asarray(pinned_vals, dtype=float),
keep_comp=keep_comp,
n_orig=problem.n,
n_comp_orig=problem.n_comp,
n_ineq=problem.n_ineq,
n_eq=problem.n_eq,
promote_G=promote_G,
promote_H=promote_H,
prefix_H_eq=prefix_H_eq,
prefix_G_eq=prefix_G_eq,
)
try:
reduced = _build_reduced(
problem, pmap, xl_full=xl_t, xu_full=xu_t,
drop_ineq=drop_ineq, drop_eq=drop_eq,
)
except ValueError as e:
warnings.warn(
f"pympcc.presolve: reduction would invalidate the problem ({e}); "
"falling back to identity (no presolve).",
UserWarning,
stacklevel=3,
)
return problem, _identity_map(problem)
return reduced, pmap