Skip to content

refactor: migrate ingestion helper to fastapi, modernize packaging, and add request logging#526

Open
dwnoble wants to merge 4 commits into
datacommonsorg:masterfrom
dwnoble:ingestion-helper-fastapi
Open

refactor: migrate ingestion helper to fastapi, modernize packaging, and add request logging#526
dwnoble wants to merge 4 commits into
datacommonsorg:masterfrom
dwnoble:ingestion-helper-fastapi

Conversation

@dwnoble
Copy link
Copy Markdown
Contributor

@dwnoble dwnoble commented Jun 7, 2026

Description

Migrates the legacy Cloud Function ingestion helper to a modular FastAPI service deployed on Cloud Run, modernizes packaging with uv, and centralizes environment configuration. Adds structured, correlated request logging.

Key Changes

1. API Architecture

  • Replaced the monolithic actionType dispatcher with FastAPI.
  • Organized endpoints into domain-specific routers under routes/ (/imports, /database, /embeddings, /aggregation, /cache).
  • Centralized request/response validation using Pydantic schemas.

2. Code Reorganization & Packaging

  • Split business logic into structured clients/ and utils/ packages.
  • Colocated unit tests next to target modules (e.g., clients/spanner_test.py).
  • Migrated to uv for dependency resolution and Hatchling for build-system packaging (dynamic versioning from __init__.py).
  • Centralized all environment lookups in config.py.

3. Observability

  • Implemented an ASGI logging middleware that generates a unique request_id for every request.
  • Heavy routes (e.g., /embeddings/ingest) decorated with @log_start log both START and END events; transactional routes log only on completion.

4 Documentation

  • Added /docs route to show API documentation

To view in GCP, start the cloud run proxy:

$ gcloud run services proxy ingestion-helper-service --project=datcom-ci --region=us-central1

Then visit http://127.0.0.1:8080/docs

3XHaUzwWKM9JLc7

Verification

Unit Tests

  • Unit tests pass (uv run pytest).

Staging Integration

  • Deployed custom-tagged container (dev-test-logging) to the staging environment (datcom-ci).
  • Executed end-to-end integration tests successfully; verified that Cloud Workflows successfully resolved, completed, and wrote to Spanner.

Coordinated Merge Required

This PR is paired with the helper service changes in datacommonsorg/datacommons#111. Both PRs must be merged at the same time to avoid breaking the ingestion pipeline and administrative database setup commands.

@codacy-production
Copy link
Copy Markdown

codacy-production Bot commented Jun 7, 2026

Not up to standards ⛔

🔴 Issues 4 high · 3 medium · 57 minor

Alerts:
⚠ 64 issues (≤ 0 issues of at least minor severity)

Results:
64 new issues

Category Results
UnusedCode 1 medium
Documentation 6 minor
ErrorProne 3 high
Security 1 high
CodeStyle 51 minor
Complexity 2 medium

View in Codacy

🟢 Metrics 99 complexity · 11 duplication

Metric Results
Complexity 99
Duplication 11

View in Codacy

NEW Get contextual insights on your PRs based on Codacy's metrics, along with PR and Jira context, without leaving GitHub. Enable AI reviewer
TIP This summary will be updated as you push new changes.

Copy link
Copy Markdown
Contributor

@gemini-code-assist gemini-code-assist Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request refactors the ingestion-helper service from a Google Cloud Functions framework to a structured FastAPI microservice on Google Cloud Run, updating GCP workflows to use dedicated REST endpoints instead of a single entry point. Key feedback includes optimizing resource usage by caching Spanner and Storage clients as singletons in dependencies.py, raising a 400 Bad Request instead of a 500 Internal Server Error for client-side validation failures in routes/imports.py, and utilizing time.perf_counter() instead of time.time() in utils/logging.py for more reliable duration measurements.

Important

The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.

Comment on lines +20 to +41
def get_spanner_client() -> SpannerClient:
if not config.SPANNER_PROJECT_ID or not config.SPANNER_INSTANCE_ID or not config.SPANNER_DATABASE_ID:
raise HTTPException(
status_code=500,
detail="Spanner configuration is missing. Ensure SPANNER_PROJECT_ID, SPANNER_INSTANCE_ID, and SPANNER_DATABASE_ID are set."
)
return SpannerClient(
config.SPANNER_PROJECT_ID,
config.SPANNER_INSTANCE_ID,
config.SPANNER_DATABASE_ID,
graph_database_id=config.SPANNER_GRAPH_DATABASE_ID,
location=config.LOCATION,
model_id=config.EMBEDDING_MODEL_ID
)

def get_storage_client() -> StorageClient:
if not config.GCS_BUCKET_ID:
raise HTTPException(
status_code=500,
detail="GCS Bucket ID configuration is missing. Ensure GCS_BUCKET_ID is set."
)
return StorageClient(config.GCS_BUCKET_ID)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Currently, get_spanner_client and get_storage_client instantiate a new SpannerClient and StorageClient on every single request. This is highly inefficient because SpannerClient initializes a new spanner.Client and Database object, which sets up new connection pools, session pools, and gRPC channels. This can quickly lead to session leaks, socket exhaustion, and high latency under load.

To optimize performance and resource utilization, cache these client instances as singletons so they are reused across requests.

_spanner_client = None
_storage_client = None

def get_spanner_client() -> SpannerClient:
    global _spanner_client
    if _spanner_client is None:
        if not config.SPANNER_PROJECT_ID or not config.SPANNER_INSTANCE_ID or not config.SPANNER_DATABASE_ID:
            raise HTTPException(
                status_code=500,
                detail="Spanner configuration is missing. Ensure SPANNER_PROJECT_ID, SPANNER_INSTANCE_ID, and SPANNER_DATABASE_ID are set."
            )
        _spanner_client = SpannerClient(
            config.SPANNER_PROJECT_ID,
            config.SPANNER_INSTANCE_ID,
            config.SPANNER_DATABASE_ID,
            graph_database_id=config.SPANNER_GRAPH_DATABASE_ID,
            location=config.LOCATION,
            model_id=config.EMBEDDING_MODEL_ID
        )
    return _spanner_client

def get_storage_client() -> StorageClient:
    global _storage_client
    if _storage_client is None:
        if not config.GCS_BUCKET_ID:
            raise HTTPException(
                status_code=500,
                detail="GCS Bucket ID configuration is missing. Ensure GCS_BUCKET_ID is set."
            )
        _storage_client = StorageClient(config.GCS_BUCKET_ID)
    return _storage_client

Comment thread pipeline/workflow/ingestion-helper/routes/imports.py Outdated
Comment thread pipeline/workflow/ingestion-helper/utils/logging.py Outdated
dwnoble and others added 2 commits June 7, 2026 17:10
Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant