Skip to content

Commit 5528f61

Browse files
committed
Merge branch 'codex/docling-281' into development
2 parents 2e053ca + e23d4f1 commit 5528f61

2 files changed

Lines changed: 228 additions & 39 deletions

File tree

src/glossapi/scripts/openarchives_pdf_stage_pull.py

Lines changed: 170 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ class TransferItem:
4545
"""
4646

4747
PDF_NAME_PATTERN = re.compile(r"([A-Za-z0-9._-]+\.pdf(?:\.[A-Za-z0-9_-]+)?)", re.IGNORECASE)
48+
FILENAME_KEYS = ("filename", "canonical_filename", "md_filename", "source_filename")
4849

4950

5051
def parse_args(argv: Optional[Sequence[str]] = None) -> argparse.Namespace:
@@ -56,6 +57,7 @@ def parse_args(argv: Optional[Sequence[str]] = None) -> argparse.Namespace:
5657
p.add_argument("--work-root", required=True, help="Root directory for downloads, partials, logs, and state.")
5758
p.add_argument("--remote-host", default="debian@83.212.80.170")
5859
p.add_argument("--password-env", default="GREECE_BOX_PASSWORD", help="Environment variable containing the remote SSH password.")
60+
p.add_argument("--transport", choices=("sftp", "rsync"), default="sftp")
5961
p.add_argument("--max-attempts", type=int, default=20)
6062
p.add_argument("--connect-timeout", type=int, default=30)
6163
p.add_argument("--io-timeout", type=int, default=180)
@@ -323,6 +325,40 @@ def append_event(path: Path, payload: dict) -> None:
323325
handle.write(json.dumps(payload, ensure_ascii=False) + "\n")
324326

325327

328+
def sshpass_env(password_env: str) -> dict[str, str]:
329+
env = os.environ.copy()
330+
secret = env.get(password_env)
331+
if not secret:
332+
raise SystemExit(f"Password env var '{password_env}' is not set.")
333+
env["SSHPASS"] = secret
334+
return env
335+
336+
337+
def ssh_transport_options(connect_timeout: int) -> list[str]:
338+
return [
339+
"-o",
340+
"BatchMode=no",
341+
"-o",
342+
"PreferredAuthentications=password",
343+
"-o",
344+
"PubkeyAuthentication=no",
345+
"-o",
346+
"KbdInteractiveAuthentication=yes",
347+
"-o",
348+
f"ConnectTimeout={int(connect_timeout)}",
349+
"-o",
350+
"ServerAliveInterval=15",
351+
"-o",
352+
"ServerAliveCountMax=3",
353+
"-o",
354+
"ConnectionAttempts=3",
355+
"-o",
356+
"StrictHostKeyChecking=no",
357+
"-o",
358+
"UserKnownHostsFile=/tmp/greece_box_known_hosts",
359+
]
360+
361+
326362
def canonicalize_pdf_name(raw: str) -> Optional[str]:
327363
text = os.path.basename(str(raw).strip())
328364
if not text:
@@ -337,6 +373,74 @@ def canonicalize_pdf_name(raw: str) -> Optional[str]:
337373
return None
338374

339375

376+
def _walk_json_strings(obj) -> Iterable[str]:
377+
if isinstance(obj, dict):
378+
for key, value in obj.items():
379+
if isinstance(key, str):
380+
yield key
381+
yield from _walk_json_strings(value)
382+
elif isinstance(obj, list):
383+
for item in obj:
384+
yield from _walk_json_strings(item)
385+
elif isinstance(obj, str):
386+
yield obj
387+
388+
389+
def _extract_priority_filenames_from_csv(path: Path) -> set[str]:
390+
results: set[str] = set()
391+
with path.open("r", encoding="utf-8", errors="ignore", newline="") as handle:
392+
reader = csv.DictReader(handle)
393+
fields = {field.strip() for field in (reader.fieldnames or []) if field}
394+
keyed = any(key in fields for key in FILENAME_KEYS)
395+
for row in reader:
396+
if keyed:
397+
for key in FILENAME_KEYS:
398+
value = row.get(key)
399+
if value:
400+
canonical = canonicalize_pdf_name(value)
401+
if canonical is not None:
402+
results.add(canonical)
403+
break
404+
else:
405+
for value in row.values():
406+
if not value:
407+
continue
408+
for match in PDF_NAME_PATTERN.findall(str(value)):
409+
canonical = canonicalize_pdf_name(match)
410+
if canonical is not None:
411+
results.add(canonical)
412+
return results
413+
414+
415+
def _extract_priority_filenames_from_json(path: Path) -> set[str]:
416+
results: set[str] = set()
417+
data = json.loads(path.read_text(encoding="utf-8", errors="ignore"))
418+
for text in _walk_json_strings(data):
419+
canonical = canonicalize_pdf_name(text)
420+
if canonical is not None:
421+
results.add(canonical)
422+
continue
423+
for match in PDF_NAME_PATTERN.findall(text):
424+
canonical = canonicalize_pdf_name(match)
425+
if canonical is not None:
426+
results.add(canonical)
427+
return results
428+
429+
430+
def _extract_priority_filenames_from_text(path: Path) -> set[str]:
431+
results: set[str] = set()
432+
text = path.read_text(encoding="utf-8", errors="ignore")
433+
for line in text.splitlines():
434+
canonical = canonicalize_pdf_name(line)
435+
if canonical is not None:
436+
results.add(canonical)
437+
for match in PDF_NAME_PATTERN.findall(text):
438+
canonical = canonicalize_pdf_name(match)
439+
if canonical is not None:
440+
results.add(canonical)
441+
return results
442+
443+
340444
def load_priority_filenames(priority_dir: Path) -> set[str]:
341445
results: set[str] = set()
342446
if not priority_dir.exists():
@@ -348,17 +452,60 @@ def load_priority_filenames(priority_dir: Path) -> set[str]:
348452
if direct is not None:
349453
results.add(direct)
350454
continue
455+
suffix = path.suffix.lower()
351456
try:
352-
text = path.read_text(encoding="utf-8", errors="ignore")
457+
if suffix == ".csv":
458+
results.update(_extract_priority_filenames_from_csv(path))
459+
elif suffix == ".json":
460+
results.update(_extract_priority_filenames_from_json(path))
461+
elif suffix in {".txt", ".list", ".lst", ".log"}:
462+
results.update(_extract_priority_filenames_from_text(path))
463+
else:
464+
continue
353465
except Exception:
354466
continue
355-
for match in PDF_NAME_PATTERN.findall(text):
356-
canonical = canonicalize_pdf_name(match)
357-
if canonical is not None:
358-
results.add(canonical)
359467
return results
360468

361469

470+
def rsync_one(
471+
*,
472+
remote_host: str,
473+
remote_path: str,
474+
temp_path: Path,
475+
password_env: str,
476+
connect_timeout: int,
477+
io_timeout: int,
478+
) -> subprocess.CompletedProcess[str]:
479+
ssh_cmd = (
480+
"ssh "
481+
"-o BatchMode=no "
482+
"-o PreferredAuthentications=password "
483+
"-o PubkeyAuthentication=no "
484+
"-o KbdInteractiveAuthentication=yes "
485+
f"-o ConnectTimeout={int(connect_timeout)} "
486+
"-o ServerAliveInterval=15 "
487+
"-o ServerAliveCountMax=3 "
488+
"-o ConnectionAttempts=3 "
489+
"-o StrictHostKeyChecking=no "
490+
"-o UserKnownHostsFile=/tmp/greece_box_known_hosts"
491+
)
492+
cmd = [
493+
"sshpass",
494+
"-e",
495+
"rsync",
496+
"-av",
497+
"--partial",
498+
"--append-verify",
499+
"--inplace",
500+
f"--timeout={int(io_timeout)}",
501+
"-e",
502+
ssh_cmd,
503+
f"{remote_host}:{remote_path}",
504+
str(temp_path),
505+
]
506+
return subprocess.run(cmd, capture_output=True, text=True, env=sshpass_env(password_env))
507+
508+
362509
def sftp_one(
363510
*,
364511
remote_host: str,
@@ -372,37 +519,13 @@ def sftp_one(
372519
"sshpass",
373520
"-e",
374521
"sftp",
375-
"-o",
376-
"BatchMode=no",
377-
"-o",
378-
"PreferredAuthentications=password",
379-
"-o",
380-
"PubkeyAuthentication=no",
381-
"-o",
382-
"KbdInteractiveAuthentication=yes",
383-
"-o",
384-
f"ConnectTimeout={int(connect_timeout)}",
385-
"-o",
386-
"ServerAliveInterval=15",
387-
"-o",
388-
"ServerAliveCountMax=3",
389-
"-o",
390-
"ConnectionAttempts=3",
391-
"-o",
392-
"StrictHostKeyChecking=no",
393-
"-o",
394-
"UserKnownHostsFile=/tmp/greece_box_known_hosts",
522+
*ssh_transport_options(connect_timeout),
395523
"-b",
396524
"-",
397525
remote_host,
398526
]
399-
env = os.environ.copy()
400-
secret = env.get(password_env)
401-
if not secret:
402-
raise SystemExit(f"Password env var '{password_env}' is not set.")
403-
env["SSHPASS"] = secret
404527
batch = f'reget "{remote_path}" "{temp_path}"\n'
405-
return subprocess.run(cmd, capture_output=True, text=True, env=env, input=batch)
528+
return subprocess.run(cmd, capture_output=True, text=True, env=sshpass_env(password_env), input=batch)
406529

407530

408531
def run(argv: Optional[Sequence[str]] = None) -> int:
@@ -496,6 +619,7 @@ def refresh_priorities() -> dict[str, int]:
496619
current_path,
497620
{
498621
"updated_at": utc_now(),
622+
"transport": str(args.transport),
499623
"canonical_filename": canonical,
500624
"remote_path": remote_path,
501625
"remote_size_bytes": remote_size,
@@ -509,6 +633,7 @@ def refresh_priorities() -> dict[str, int]:
509633
{
510634
"ts": utc_now(),
511635
"event": "start",
636+
"transport": str(args.transport),
512637
"canonical_filename": canonical,
513638
"remote_path": remote_path,
514639
"remote_size_bytes": remote_size,
@@ -517,14 +642,18 @@ def refresh_priorities() -> dict[str, int]:
517642
},
518643
)
519644

520-
result = sftp_one(
521-
remote_host=str(args.remote_host),
522-
remote_path=remote_path,
523-
temp_path=temp_path,
524-
password_env=str(args.password_env),
525-
connect_timeout=int(args.connect_timeout),
526-
io_timeout=int(args.io_timeout),
527-
)
645+
transfer_kwargs = {
646+
"remote_host": str(args.remote_host),
647+
"remote_path": remote_path,
648+
"temp_path": temp_path,
649+
"password_env": str(args.password_env),
650+
"connect_timeout": int(args.connect_timeout),
651+
"io_timeout": int(args.io_timeout),
652+
}
653+
if str(args.transport) == "rsync":
654+
result = rsync_one(**transfer_kwargs)
655+
else:
656+
result = sftp_one(**transfer_kwargs)
528657

529658
if result.returncode == 0 and temp_path.exists():
530659
actual_size = temp_path.stat().st_size
@@ -543,6 +672,7 @@ def refresh_priorities() -> dict[str, int]:
543672
{
544673
"ts": utc_now(),
545674
"event": "completed",
675+
"transport": str(args.transport),
546676
"canonical_filename": canonical,
547677
"size_bytes": actual_size,
548678
},
@@ -556,6 +686,7 @@ def refresh_priorities() -> dict[str, int]:
556686
{
557687
"ts": utc_now(),
558688
"event": "failed",
689+
"transport": str(args.transport),
559690
"canonical_filename": canonical,
560691
"return_code": int(result.returncode),
561692
"partial_size_bytes": actual_size,

tests/test_openarchives_pdf_stage_pull.py

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
from __future__ import annotations
22

3+
import subprocess
34
from pathlib import Path
45

56
from glossapi.scripts.openarchives_pdf_stage_pull import (
@@ -8,6 +9,7 @@
89
canonicalize_pdf_name,
910
load_priority_filenames,
1011
read_manifest,
12+
run,
1113
)
1214

1315

@@ -131,3 +133,59 @@ def test_transfer_state_priorities_are_selected_first(tmp_path: Path) -> None:
131133
assert counts["priority_total"] == 1
132134
assert counts["priority_pending"] == 1
133135
state.close()
136+
137+
138+
def test_load_priority_filenames_ignores_parquet_and_reads_csv_columns(tmp_path: Path) -> None:
139+
priority_dir = tmp_path / "priority"
140+
priority_dir.mkdir()
141+
(priority_dir / "unreachable_from_source_20260331.csv").write_text(
142+
"filename,source_unreachable_reason\n"
143+
"ZFV_051.pdf,connect_timeout\n"
144+
"ZGA_056.pdf,connect_timeout\n",
145+
encoding="utf-8",
146+
)
147+
(priority_dir / "unreachable_from_source_20260331.parquet").write_bytes(b"PAR1junkZXY_999.pdfjunk")
148+
149+
names = load_priority_filenames(priority_dir)
150+
151+
assert names == {"ZFV_051.pdf", "ZGA_056.pdf"}
152+
153+
154+
def test_run_uses_rsync_transport_when_requested(tmp_path: Path, monkeypatch) -> None:
155+
manifest = tmp_path / "manifest.tsv"
156+
_write_manifest(manifest)
157+
work_root = tmp_path / "work"
158+
seen: list[str] = []
159+
160+
def _fake_rsync_one(**kwargs):
161+
seen.append("rsync")
162+
Path(kwargs["temp_path"]).parent.mkdir(parents=True, exist_ok=True)
163+
Path(kwargs["temp_path"]).write_bytes(b"x" * 10)
164+
return subprocess.CompletedProcess(args=["rsync"], returncode=0, stdout="", stderr="")
165+
166+
def _fake_sftp_one(**kwargs):
167+
seen.append("sftp")
168+
return subprocess.CompletedProcess(args=["sftp"], returncode=1, stdout="", stderr="unexpected")
169+
170+
monkeypatch.setenv("GREECE_BOX_PASSWORD", "secret")
171+
monkeypatch.setattr("glossapi.scripts.openarchives_pdf_stage_pull.rsync_one", _fake_rsync_one)
172+
monkeypatch.setattr("glossapi.scripts.openarchives_pdf_stage_pull.sftp_one", _fake_sftp_one)
173+
174+
rc = run(
175+
[
176+
"--manifest",
177+
str(manifest),
178+
"--work-root",
179+
str(work_root),
180+
"--transport",
181+
"rsync",
182+
"--limit",
183+
"1",
184+
"--summary-interval-seconds",
185+
"0",
186+
]
187+
)
188+
189+
assert rc == 0
190+
assert seen == ["rsync"]
191+
assert (work_root / "downloads" / "AAA_456.pdf").exists()

0 commit comments

Comments
 (0)