-
Notifications
You must be signed in to change notification settings - Fork 281
Add realtime_trace_jsonl recipe for structured real-time optimization progress streaming #1177
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
08308d7
408b6da
868022f
9d2dfac
4ab23b2
1f1bead
2c9444c
211359a
2e8f3bd
afbe9e0
6bf7355
cc41cad
f646e51
b822dbc
7fd53cf
987251f
3478efb
fc869bc
c7d36b9
7e81977
d8e849e
8edf0cc
488226c
8d965ca
0a24299
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,37 +1,185 @@ | ||
| import json | ||
|
|
||
| from pyscipopt import SCIP_EVENTTYPE, Eventhdlr, Model | ||
|
|
||
| _TRACE_HANDLER_KEY = "_structured_optimization_trace_handler" | ||
|
|
||
|
|
||
class _TraceEventhdlr(Eventhdlr):
    """SCIP event handler that forwards progress events to the active trace.

    Attributes:
        trace: The trace controller currently receiving events, or ``None``
            when no trace is active.
        write_run_end_active: ``True`` while a trace that writes a ``run_end``
            record is active; used to reject overlapping traces.
    """

    # Event types that are caught and forwarded to the active trace.
    _EVENT_TYPES = (
        SCIP_EVENTTYPE.BESTSOLFOUND,
        SCIP_EVENTTYPE.DUALBOUNDIMPROVED,
    )

    def __init__(self):
        self.trace = None
        self.write_run_end_active = False
        # Event types currently registered with SCIP for this handler.
        self._caught_events = set()

    # NOTE(review): a maintainer asked here (remark truncated in the captured
    # page): "What happens if we attach the handler to a model, solve it, call
    # something like ...". The eventexit() hook below is meant to make a
    # repeated solve of the same model re-register the events.

    def eventinit(self):
        """Catch every traced event type that is not registered yet."""
        for event_type in self._EVENT_TYPES:
            if event_type not in self._caught_events:
                self.model.catchEvent(event_type, self)
                self._caught_events.add(event_type)

    def eventexit(self):
        """Drop the caught events so the next ``eventinit`` catches them again.

        NOTE(review): assumes SCIP calls eventexit/eventinit around every
        transform cycle and that dropping in eventexit is the expected
        counterpart of catching in eventinit -- confirm against the PySCIPOpt
        event handler documentation.
        """
        for event_type in tuple(self._caught_events):
            self.model.dropEvent(event_type, self)
        self._caught_events.clear()

    def eventexec(self, event):
        """Forward ``event`` to the active trace, if there is one."""
        if self.trace is not None:
            self.trace._handle_event(event)
|
|
||
|
|
||
class _StructuredOptimizationTrace:
    """Internal trace controller shared by both public APIs.

    Acts as a context manager. Entering registers (or reuses) the model's
    trace event handler, resets ``model.data["trace"]`` and optionally opens a
    JSONL file. Exiting optionally appends a ``run_end`` record, closes the
    file and detaches from the handler.
    """

    def __init__(self, model: Model, path=None, write_run_end=True):
        """
        Args:
            model: SCIP Model whose progress is recorded.
            path: Optional JSONL output path. If None, records are only stored
                in ``model.data["trace"]``.
            write_run_end: Whether ``__exit__`` appends a final ``run_end``
                record. Also controls whether this trace blocks other traces
                on the same model while it is active.
        """
        self.model = model
        self.path = path
        self.write_run_end = write_run_end
        self._fh = None
        self._handler = None
        # Most recent progress snapshot; merged into the ``run_end`` record.
        self._last_snapshot: dict[str, object] = {}

    def __enter__(self):
        """Start tracing and return ``self``.

        Raises:
            RuntimeError: If a trace that writes ``run_end`` is already active
                for this model.
        """
        if not hasattr(self.model, "data") or self.model.data is None:
            self.model.data = {}

        # One handler per model: reuse the registered one when present.
        self._handler = self.model.data.get(_TRACE_HANDLER_KEY)
        if self._handler is None:
            self._handler = _TraceEventhdlr()
            self.model.includeEventhdlr(
                self._handler,
                "structured_trace",
                "Structured optimization trace handler",
            )
            self.model.data[_TRACE_HANDLER_KEY] = self._handler

        if self._handler.write_run_end_active:
            raise RuntimeError(
                "structured optimization trace is already active for this model"
            )

        # Open the output before touching any trace state, so a path that
        # cannot be opened leaves the previous trace and handler untouched.
        if self.path is not None:
            self._fh = open(self.path, "w", encoding="utf-8")

        self.model.data["trace"] = []
        # Forget snapshots from an earlier use of this controller so they do
        # not leak into this run's ``run_end`` record.
        self._last_snapshot = {}

        self._handler.trace = self
        if self.write_run_end:
            self._handler.write_run_end_active = True

        return self

    def __exit__(self, exc_type, exc, tb):
        """Finish tracing; never suppresses the exception (returns False)."""
        fields = dict(self._last_snapshot)

        if exc_type is None:
            fields["status"] = "finished"
        else:
            fields.update(
                {
                    "status": "exception",
                    "exception": exc_type.__name__,
                    "message": str(exc) if exc is not None else None,
                }
            )

        try:
            if self.write_run_end:
                self._write_event("run_end", fields)
        finally:
            try:
                if self._fh is not None:
                    self._fh.close()
            finally:
                self._fh = None
                # Detach inside ``finally``: if writing or closing fails, the
                # handler must still be released, otherwise every later trace
                # on this model would be rejected as "already active".
                if self._handler is not None and self._handler.trace is self:
                    self._handler.trace = None
                    if self.write_run_end:
                        self._handler.write_run_end_active = False

        return False

    def _handle_event(self, event):
        """Record a snapshot labelled after the SCIP event type of ``event``."""
        event_type = event.getType()
        if event_type == SCIP_EVENTTYPE.BESTSOLFOUND:
            self._write_snapshot("bestsol_found")
        elif event_type == SCIP_EVENTTYPE.DUALBOUNDIMPROVED:
            self._write_snapshot("dualbound_improved")

    def _snapshot_now(self):
        """Return the model's current progress figures as a dict."""
        return {
            "time": self.model.getSolvingTime(),
            "primalbound": self.model.getPrimalbound(),
            "dualbound": self.model.getDualbound(),
            "gap": self.model.getGap(),
            "nodes": self.model.getNNodes(),
            "nsol": self.model.getNSols(),
        }

    def _write_snapshot(self, event_type):
        """Take a snapshot, remember it, and record it as ``event_type``."""
        snapshot = self._snapshot_now()
        self._last_snapshot = snapshot
        self._write_event(event_type, snapshot)

    def _write_event(self, event_type, fields=None):
        """Append one record to ``model.data["trace"]`` and the JSONL file.

        Args:
            event_type: Value stored under the record's ``"type"`` key.
            fields: Optional extra key/value pairs merged into the record.
        """
        event = {"type": event_type}
        if fields:
            event.update(fields)

        self.model.data["trace"].append(event)
        if self._fh is not None:
            self._fh.write(json.dumps(event) + "\n")
            # Flush after every record so the file can be followed live.
            self._fh.flush()
|
|
||
|
|
||
# NOTE(review): a maintainer remarked: "I feel like most of this docstring
# would work better at the top of the file".
# NOTE(review): a second maintainer remark on the usage example is truncated
# in the captured page: "Why are you talking about ...".
def structured_optimization_trace(model: Model, path=None):
    """
    Return a context manager that traces optimization progress.

    While the model is optimized inside the ``with`` block, progress records
    are collected in ``model.data["trace"]``. When ``path`` is given, each
    record is also written to that file as one JSON line and flushed
    immediately.

    Leaving the block appends a closing ``run_end`` record and closes the
    JSONL file. If the block is left through a Python exception, the
    ``run_end`` record carries the exception metadata and the exception is
    re-raised.

    Args:
        model: SCIP Model.
        path: Optional JSONL output path. If None, records are only stored in
            ``model.data["trace"]``.

    Usage:
        with structured_optimization_trace(model, path="trace.jsonl"):
            model.optimize()

        with structured_optimization_trace(model):
            model.optimizeNogil()
    """
    controller = _StructuredOptimizationTrace(model, path=path, write_run_end=True)
    return controller
|
|
||
|
|
||
def attach_structured_optimization_trace(model: Model):
    """
    Attaches an event handler that records structured optimization progress.

    This is the attach-style API for simple in-memory tracing. It records
    progress events in ``model.data["trace"]`` but does not manage a Python
    finalization scope, so it does not emit a final ``run_end`` record.

    Use ``structured_optimization_trace(model, path=...)`` as a context manager
    when JSONL output, ``run_end`` records, flushing, closing, or exception
    finalization are required.

    Args:
        model: SCIP Model

    Returns:
        The same ``model`` that was passed in.

    Usage:
        attach_structured_optimization_trace(model)
        model.optimize()
        trace = model.data["trace"]
    """
    trace = _StructuredOptimizationTrace(model, write_run_end=False)
    # Entered by hand and deliberately never exited: there is no scope to
    # finalize, so the trace stays attached and no ``run_end`` is written.
    trace.__enter__()

    return model
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,32 +1,130 @@ | ||
| import json | ||
|
|
||
| import pytest | ||
| from helpers.utils import bin_packing_model | ||
|
|
||
| from pyscipopt.recipes.structured_optimization_trace import ( | ||
| attach_structured_optimization_trace, | ||
| structured_optimization_trace, | ||
| ) | ||
|
|
||
|
|
||
| def test_structured_optimization_trace(): | ||
| from random import randint | ||
|
|
||
| model = bin_packing_model(sizes=[randint(1, 40) for _ in range(120)], capacity=50) | ||
def _model():
    """Build the deterministic bin-packing model shared by the tests.

    The item sizes are 1..40 repeated three times, the bin capacity is 50 and
    ``limits/time`` is set to 5.
    """
    item_sizes = list(range(1, 41)) * 3
    packing = bin_packing_model(sizes=item_sizes, capacity=50)
    packing.setParam("limits/time", 5)
    return packing
|
|
||
|
|
||
def _assert_progress_records(records):
    """Assert that trace ``records`` are well-formed and monotone.

    Every record other than ``run_end`` must carry all progress fields. Across
    the records that carry them, primal bounds must never increase and dual
    bounds must never decrease.

    Args:
        records: List of trace record dicts, in the order they were written.

    Raises:
        AssertionError: If a field is missing or a bound moves the wrong way.
    """
    required_fields = {"time", "primalbound", "dualbound", "gap", "nodes", "nsol"}

    for record in records:
        # ``get`` so a record without a "type" key is validated like any other
        # progress record instead of failing with KeyError.
        if record.get("type") != "run_end":
            assert required_fields <= set(record.keys())

    primalbounds = [r["primalbound"] for r in records if "primalbound" in r]
    for previous, current in zip(primalbounds, primalbounds[1:]):
        assert current <= previous

    dualbounds = [r["dualbound"] for r in records if "dualbound" in r]
    for previous, current in zip(dualbounds, dualbounds[1:]):
        assert current >= previous
|
|
||
|
|
||
def test_attach_structured_optimization_trace_in_memory():
    """The attach-style API traces in memory and never writes ``run_end``."""
    model = _model()
    model.data = {"test": True}

    model = attach_structured_optimization_trace(model)

    # Attaching must keep existing user data and add the trace list.
    for key in ("test", "trace"):
        assert key in model.data

    model.optimize()

    trace = model.data["trace"]

    required_fields = {"time", "primalbound", "dualbound", "gap", "nodes", "nsol"}
    for record in trace:
        assert required_fields <= set(record.keys())
    assert trace
    assert all("type" in record for record in trace)
    assert "run_end" not in [record["type"] for record in trace]
    _assert_progress_records(trace)
|
|
||
| primalbounds = [r["primalbound"] for r in model.data["trace"]] | ||
| for i in range(1, len(primalbounds)): | ||
| assert primalbounds[i] <= primalbounds[i - 1] | ||
|
|
||
| dualbounds = [r["dualbound"] for r in model.data["trace"]] | ||
| for i in range(1, len(dualbounds)): | ||
| assert dualbounds[i] >= dualbounds[i - 1] | ||
@pytest.mark.parametrize("optimize", ["optimize", "optimizeNogil"])
def test_structured_optimization_trace_context_in_memory(optimize):
    """The context manager traces in memory and ends with a ``run_end``."""
    model = _model()
    model.data = {"test": True}

    solve = getattr(model, optimize)
    with structured_optimization_trace(model):
        solve()

    # Existing user data survives and the trace list is present.
    for key in ("test", "trace"):
        assert key in model.data

    trace = model.data["trace"]
    assert "run_end" in [record["type"] for record in trace]

    final_record = trace[-1]
    assert final_record["type"] == "run_end"
    assert final_record["status"] == "finished"
    _assert_progress_records(trace)
|
|
||
|
|
||
def test_structured_optimization_trace_file_output(tmp_path):
    """The JSONL file mirrors ``model.data["trace"]`` record for record."""
    model = _model()
    path = tmp_path / "trace.jsonl"

    with structured_optimization_trace(model, path=str(path)):
        model.optimize()

    assert path.exists()

    records = []
    for line in path.read_text().splitlines():
        records.append(json.loads(line))

    assert records == model.data["trace"]

    final_record = records[-1]
    assert final_record["type"] == "run_end"
    assert final_record["status"] == "finished"
    _assert_progress_records(records)
|
|
||
|
|
||
def test_structured_optimization_trace_records_run_end_on_exception():
    """An exception inside the context is re-raised and logged in ``run_end``."""
    model = _model()

    with pytest.raises(ValueError):
        with structured_optimization_trace(model):
            raise ValueError("test error")

    # No solve happened, so the only record is the closing one.
    expected_run_end = {
        "type": "run_end",
        "status": "exception",
        "exception": "ValueError",
        "message": "test error",
    }
    assert model.data["trace"] == [expected_run_end]
|
|
||
|
|
||
def test_structured_optimization_trace_reuses_handler_for_repeated_contexts(tmp_path):
    """Two consecutive contexts on one model share a single event handler."""

    def read_jsonl(jsonl_path):
        # Parse one JSON document per line of the trace file.
        return [json.loads(line) for line in jsonl_path.read_text().splitlines()]

    model = _model()
    first_path = tmp_path / "first.jsonl"
    second_path = tmp_path / "second.jsonl"

    with structured_optimization_trace(model, path=str(first_path)):
        pass

    handler = model.data["_structured_optimization_trace_handler"]

    with structured_optimization_trace(model, path=str(second_path)):
        pass

    assert model.data["_structured_optimization_trace_handler"] is handler

    first_records = read_jsonl(first_path)
    second_records = read_jsonl(second_path)

    # Nothing was solved, so each file holds only its closing record.
    only_run_end = [{"type": "run_end", "status": "finished"}]
    assert first_records == only_run_end
    assert second_records == only_run_end
|
|
||
|
|
||
def test_structured_optimization_trace_rejects_nested_contexts():
    """Opening a second trace context while one is active raises RuntimeError."""
    model = _model()

    with structured_optimization_trace(model):
        # Entering the inner context is what must fail.
        with pytest.raises(RuntimeError), structured_optimization_trace(model):
            pass
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Rather than replacing the old entry, this should be its own entry (and the CHANGELOG is outdated, I believe)