Skip to content

Commit dbea9d1

Browse files
committed
Harden full-pipeline export retries
1 parent a8d2b93 commit dbea9d1

3 files changed

Lines changed: 115 additions & 5 deletions

File tree

docs/operations/deepseek_gcp_a100_setup.md

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,24 @@ After correcting those bootstrap defects, the same fresh node was able to:
105105
- initialize a direct one-GPU `LLM(...)`
106106
- start a real `openarchives_ocr_run_node` workload with `runtime_backend=vllm`
107107

108+
The same node was also used for a real `10`-PDF `extract -> clean -> ocr`
109+
checkpoint:
110+
111+
- the stable end-to-end shape on that node was:
112+
- multi-GPU extraction
113+
- `workers_per_device=1`
114+
- multi-GPU DeepSeek OCR with `workers_per_gpu=1`
115+
- an isolated extraction benchmark with `workers_per_device=2` was faster on the
116+
same sample, but the first full-pipeline replay hit a Docling allocator crash:
117+
- `malloc_consolidate(): unaligned fastbin chunk detected`
118+
- treat `workers_per_device=2` as benchmark-only / experimental until it is
119+
proven stable in the full Corpus pipeline, not just in extract-only tests
120+
121+
The full-pipeline checkpoint harness also now retries the JSONL export when OCR
122+
has already filled text into parquet rows but the first export pass still emits
123+
zero records. This guards the observed end-of-run export race on the benchmark
124+
node without changing the OCR output contract itself.
125+
108126
## Current runner expectation
109127

110128
`glossapi.ocr.deepseek.runner._build_env()` now auto-discovers

src/glossapi/scripts/full_pipeline_checkpoint.py

Lines changed: 36 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,37 @@ def _count_jsonl_records(path: Path) -> int:
106106
return sum(1 for line in fp if line.strip())
107107

108108

109+
def _export_jsonl_with_retry(
    corpus: "Corpus",
    *,
    export_path: Path,
    metadata_path: Path,
    text_key: str,
    metadata_key: str,
    post_ocr_counts: Dict[str, int],
    max_attempts: int = 4,
    retry_delay_sec: float = 1.0,
) -> int:
    """Export corpus rows to JSONL, retrying when a race yields zero records.

    OCR can finish filling text into the parquet rows shortly before the
    export reads them; an end-of-run race was observed where the first
    export pass emitted zero records even though non-empty text existed.
    When ``post_ocr_counts`` shows non-empty text, the export is retried up
    to ``max_attempts`` times with ``retry_delay_sec`` seconds between
    passes.  The OCR output contract itself is unchanged.

    Args:
        corpus: Corpus whose ``jsonl`` method performs the export.
        export_path: Destination JSONL file; recreated on every attempt.
        metadata_path: Parquet metadata file forwarded to ``corpus.jsonl``.
        text_key: JSON key under which the record text is written.
        metadata_key: JSON key under which per-record metadata is written.
        post_ocr_counts: Post-OCR metadata counts; only ``text_nonempty``
            is consulted to decide whether retrying is warranted.
        max_attempts: Upper bound on export passes when retrying (clamped
            to at least one pass).
        retry_delay_sec: Sleep between consecutive passes.

    Returns:
        Number of non-blank records in the final export file (0 when every
        attempt produced an empty file).
    """
    # Retry only when metadata proves there is text to export: an all-empty
    # corpus legitimately yields zero records on the first (and only) pass.
    needs_retry = int(post_ocr_counts.get("text_nonempty", 0) or 0) > 0
    # Clamp so a non-positive max_attempts still performs one export pass
    # rather than skipping the loop entirely and returning 0 with no file.
    attempts = max(1, max_attempts) if needs_retry else 1

    for attempt in range(attempts):
        # Start each pass from a clean slate so a stale or partial file from
        # a previous attempt cannot inflate the record count.
        if export_path.exists():
            export_path.unlink()
        corpus.jsonl(
            export_path,
            text_key=text_key,
            metadata_key=metadata_key,
            include_remaining_metadata=False,
            metadata_path=metadata_path,
        )
        export_records = _count_jsonl_records(export_path)
        if export_records > 0 or attempt == attempts - 1:
            return export_records
        time.sleep(retry_delay_sec)
    # Unreachable: the loop always returns on its final iteration; kept as a
    # defensive fallback for static analysis.
    return 0
138+
139+
109140
def main(argv: Optional[List[str]] = None) -> int:
110141
args = _parse_args(argv)
111142
_apply_cli_tuning_overrides(args)
@@ -178,15 +209,15 @@ def main(argv: Optional[List[str]] = None) -> int:
178209
post_ocr_counts = _read_metadata_counts(metadata_path)
179210

180211
export_start = time.perf_counter()
181-
corpus.jsonl(
182-
export_path,
212+
export_records = _export_jsonl_with_retry(
213+
corpus,
214+
export_path=export_path,
215+
metadata_path=metadata_path,
183216
text_key=str(args.text_key),
184217
metadata_key=str(args.metadata_key),
185-
include_remaining_metadata=False,
186-
metadata_path=metadata_path,
218+
post_ocr_counts=post_ocr_counts,
187219
)
188220
export_elapsed = float(time.perf_counter() - export_start)
189-
export_records = _count_jsonl_records(export_path)
190221

191222
finished_at = time.time()
192223
report: Dict[str, Any] = {

tests/test_full_pipeline_checkpoint.py

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -208,3 +208,64 @@ def jsonl(self, output_path, **kwargs):
208208
assert rc == 0
209209
assert captured["repair_exec_batch_target_pages"] == 64
210210
assert captured["repair_exec_batch_target_items"] == 24
211+
212+
213+
def test_full_pipeline_checkpoint_retries_empty_export_when_ocr_text_exists(tmp_path, monkeypatch):
    """An empty first export is retried once OCR has produced non-empty text."""
    jsonl_calls = {"count": 0}

    class DummyCorpus:
        """Stand-in Corpus: OCR fills in text, but the first export pass
        simulates the observed race by emitting an empty file."""

        def __init__(self, input_dir, output_dir):
            self.input_dir = input_dir
            self.output_dir = output_dir

        def _metadata_path(self):
            # Mirror the on-disk layout the pipeline uses for its metadata.
            path = self.output_dir / "download_results" / "download_results.parquet"
            path.parent.mkdir(parents=True, exist_ok=True)
            return path

        def extract(self, **kwargs):
            rows = [{"filename": "doc.pdf", "needs_ocr": True, "ocr_success": False, "text": ""}]
            pd.DataFrame(rows).to_parquet(self._metadata_path(), index=False)

        def clean(self, **kwargs):
            return None

        def ocr(self, **kwargs):
            rows = [{"filename": "doc.pdf", "needs_ocr": False, "ocr_success": True, "text": "fixed text"}]
            pd.DataFrame(rows).to_parquet(self._metadata_path(), index=False)

        def jsonl(self, output_path, **kwargs):
            jsonl_calls["count"] += 1
            if jsonl_calls["count"] == 1:
                # First pass: file written but contains zero records.
                output_path.write_text("", encoding="utf-8")
                return
            output_path.write_text(json.dumps({"text": "fixed text"}) + "\n", encoding="utf-8")

    monkeypatch.setattr(checkpoint, "Corpus", DummyCorpus)

    input_dir = tmp_path / "in"
    input_dir.mkdir()
    output_dir = tmp_path / "out"
    export_path = tmp_path / "export.jsonl"
    report_path = tmp_path / "report.json"

    argv = [
        "--input-dir",
        str(input_dir),
        "--output-dir",
        str(output_dir),
        "--export-path",
        str(export_path),
        "--report-path",
        str(report_path),
    ]
    rc = checkpoint.main(argv)

    assert rc == 0
    # Exactly one retry: the empty first export plus one successful pass.
    assert jsonl_calls["count"] == 2
    report = json.loads(report_path.read_text(encoding="utf-8"))
    assert report["post_ocr_counts"]["text_nonempty"] == 1
    assert report["export_records"] == 1

0 commit comments

Comments
 (0)