refactor: migrate ingestion helper to fastapi, modernize packaging, and add request logging #526
dwnoble wants to merge 4 commits into
Not up to standards ⛔
🔴 Issues

| Category | Results |
|---|---|
| UnusedCode | 1 medium |
| Documentation | 6 minor |
| ErrorProne | 3 high |
| Security | 1 high |
| CodeStyle | 51 minor |
| Complexity | 2 medium |

🟢 Metrics: 99 complexity · 11 duplication
Code Review
This pull request refactors the ingestion-helper service from a Google Cloud Functions framework to a structured FastAPI microservice on Google Cloud Run, updating GCP workflows to use dedicated REST endpoints instead of a single entry point. Key feedback includes optimizing resource usage by caching Spanner and Storage clients as singletons in dependencies.py, raising a 400 Bad Request instead of a 500 Internal Server Error for client-side validation failures in routes/imports.py, and utilizing time.perf_counter() instead of time.time() in utils/logging.py for more reliable duration measurements.
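The duration-measurement point can be illustrated with a small sketch. `time.time()` reads the wall clock, which can jump when the system clock is adjusted, whereas `time.perf_counter()` is a monotonic, high-resolution clock intended for measuring intervals. The `timed` helper below is hypothetical and is not taken from `utils/logging.py`.

```python
import time
from typing import Any, Callable, Tuple


def timed(func: Callable[..., Any], *args: Any, **kwargs: Any) -> Tuple[Any, float]:
    """Call func and return (result, elapsed_seconds).

    Hypothetical helper for illustration only; not part of the PR.
    """
    start = time.perf_counter()  # monotonic: unaffected by wall-clock adjustments
    result = func(*args, **kwargs)
    elapsed = time.perf_counter() - start
    return result, elapsed


if __name__ == "__main__":
    total, seconds = timed(sum, range(1_000_000))
    print(f"sum={total} took {seconds * 1000:.2f} ms")
```

Only the difference between two `perf_counter()` readings is meaningful; the absolute value has no defined reference point, so it should not be logged as a timestamp.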

    def get_spanner_client() -> SpannerClient:
        if not config.SPANNER_PROJECT_ID or not config.SPANNER_INSTANCE_ID or not config.SPANNER_DATABASE_ID:
            raise HTTPException(
                status_code=500,
                detail="Spanner configuration is missing. Ensure SPANNER_PROJECT_ID, SPANNER_INSTANCE_ID, and SPANNER_DATABASE_ID are set."
            )
        return SpannerClient(
            config.SPANNER_PROJECT_ID,
            config.SPANNER_INSTANCE_ID,
            config.SPANNER_DATABASE_ID,
            graph_database_id=config.SPANNER_GRAPH_DATABASE_ID,
            location=config.LOCATION,
            model_id=config.EMBEDDING_MODEL_ID
        )


    def get_storage_client() -> StorageClient:
        if not config.GCS_BUCKET_ID:
            raise HTTPException(
                status_code=500,
                detail="GCS Bucket ID configuration is missing. Ensure GCS_BUCKET_ID is set."
            )
        return StorageClient(config.GCS_BUCKET_ID)

Currently, get_spanner_client and get_storage_client instantiate a new SpannerClient and StorageClient on every single request. This is highly inefficient because SpannerClient initializes a new spanner.Client and Database object, which sets up new connection pools, session pools, and gRPC channels. This can quickly lead to session leaks, socket exhaustion, and high latency under load.
To optimize performance and resource utilization, cache these client instances as singletons so they are reused across requests.

    _spanner_client = None
    _storage_client = None


    def get_spanner_client() -> SpannerClient:
        global _spanner_client
        if _spanner_client is None:
            if not config.SPANNER_PROJECT_ID or not config.SPANNER_INSTANCE_ID or not config.SPANNER_DATABASE_ID:
                raise HTTPException(
                    status_code=500,
                    detail="Spanner configuration is missing. Ensure SPANNER_PROJECT_ID, SPANNER_INSTANCE_ID, and SPANNER_DATABASE_ID are set."
                )
            _spanner_client = SpannerClient(
                config.SPANNER_PROJECT_ID,
                config.SPANNER_INSTANCE_ID,
                config.SPANNER_DATABASE_ID,
                graph_database_id=config.SPANNER_GRAPH_DATABASE_ID,
                location=config.LOCATION,
                model_id=config.EMBEDDING_MODEL_ID
            )
        return _spanner_client


    def get_storage_client() -> StorageClient:
        global _storage_client
        if _storage_client is None:
            if not config.GCS_BUCKET_ID:
                raise HTTPException(
                    status_code=500,
                    detail="GCS Bucket ID configuration is missing. Ensure GCS_BUCKET_ID is set."
                )
            _storage_client = StorageClient(config.GCS_BUCKET_ID)
        return _storage_client

Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
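As a possible alternative to module-level globals, the same "construct once, reuse afterwards" behaviour can be sketched with `functools.lru_cache` from the standard library. `FakeSpannerClient` and the literal IDs below are stand-ins invented for illustration; the real `SpannerClient` constructor and `config` values are the ones shown in the suggestion above.

```python
from functools import lru_cache


class FakeSpannerClient:
    """Stand-in for the PR's SpannerClient; counts how often it is constructed."""

    instances = 0

    def __init__(self, project_id: str, instance_id: str, database_id: str) -> None:
        FakeSpannerClient.instances += 1
        self.project_id = project_id
        self.instance_id = instance_id
        self.database_id = database_id


@lru_cache(maxsize=1)
def get_spanner_client() -> FakeSpannerClient:
    # The body runs when the cache is empty; later calls return the cached object.
    # If this function raised (for example on missing config), nothing would be
    # cached and the next call would try again.
    return FakeSpannerClient("my-project", "my-instance", "my-database")


if __name__ == "__main__":
    first = get_spanner_client()
    second = get_spanner_client()
    print(first is second, FakeSpannerClient.instances)
```

A function wrapped this way can still be passed to FastAPI's `Depends(...)`, since `Depends` accepts any callable. Neither this sketch nor the global-variable version guards against two threads racing on the very first call, in which case the client may be constructed more than once; if that matters, an explicit lock or construction at application startup would be needed.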
Description
Migrates the legacy Cloud Function ingestion helper to a modular FastAPI service deployed on Cloud Run, modernizes packaging with `uv`, and centralizes environment configuration. Adds structured, correlated request logging.
Key Changes
1. API Architecture
- […] `actionType` dispatcher with FastAPI.
- […] `routes/` (`/imports`, `/database`, `/embeddings`, `/aggregation`, `/cache`).
2. Code Reorganization & Packaging
- […] `clients/` and `utils/` packages.
- […] `clients/spanner_test.py`).
- […] `uv` for dependency resolution and Hatchling for build-system packaging (dynamic versioning from `__init__.py`).
- […] `config.py`.
3. Observability
- […] `request_id` for every request.
- […] `/embeddings/ingest`) decorated with `@log_start` log both `START` and `END` events; transactional routes log only on completion.
4. Documentation
- […] `/docs` route to show API documentation.
To view in GCP, start the Cloud Run proxy, then visit http://127.0.0.1:8080/docs
Verification
Unit Tests
- […] `uv run pytest`).
Staging Integration
- […] `dev-test-logging`) to the staging environment (`datcom-ci`).
Coordinated Merge Required
This PR is paired with the helper service changes in datacommonsorg/datacommons#111. Both PRs must be merged at the same time to avoid breaking the ingestion pipeline and administrative database setup commands.
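For reviewers unfamiliar with the logging behaviour listed under Observability, here is a rough sketch of how a `@log_start`-style decorator could emit correlated `START` and `END` events. It is not the PR's implementation: the logger name, the `request_id_var` context variable, the message format, and `ingest_embeddings` are all assumptions made for illustration, and only synchronous callables are handled.

```python
import contextvars
import functools
import logging
import time
import uuid

logger = logging.getLogger("ingestion_helper_sketch")  # hypothetical logger name

# Holds the current request's ID; a middleware would normally set this per request.
request_id_var = contextvars.ContextVar("request_id", default="-")


def log_start(func):
    """Log START before and END after the wrapped call, tagged with the request ID."""

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        rid = request_id_var.get()
        logger.info("START %s request_id=%s", func.__name__, rid)
        started = time.perf_counter()
        try:
            return func(*args, **kwargs)
        finally:
            # Runs on success and on exceptions, so END is always emitted.
            duration_ms = (time.perf_counter() - started) * 1000
            logger.info(
                "END %s request_id=%s duration_ms=%.1f", func.__name__, rid, duration_ms
            )

    return wrapper


@log_start
def ingest_embeddings(batch_size: int) -> int:
    # Placeholder for a long-running job.
    return batch_size * 2


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    request_id_var.set(uuid.uuid4().hex)
    ingest_embeddings(10)
```

Using a `ContextVar` rather than a plain global keeps the ID isolated per request when requests are handled concurrently. An `async def` route would need an async variant of the wrapper that awaits the wrapped call.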