feat(dashboard-api): supabase auth users sync background runner #2247
ben-fornefeld wants to merge 33 commits into main from feature/supabase-users-sync-worker
Conversation
PR Summary (Medium Risk Overview). Written by Cursor Bugbot for commit 638429a. This will update automatically on new commits.
packages/db/pkg/auth/sql_queries/supabase_auth_user_sync/get_auth_user.sql
```sql
);

CREATE INDEX auth_user_sync_queue_pending_idx
ON auth.user_sync_queue (id)
```
The partial index only covers dead_lettered_at IS NULL AND locked_at IS NULL, so every unlocked non-dead-lettered row satisfies it — including rows with a future next_attempt_at. The claim query then discards those rows in a post-index filter. Under sustained retry load (many rows waiting with next_attempt_at > now()), the index returns a large candidate set that must be filtered in-memory before the LIMIT is applied. Adding next_attempt_at as the leading column — ON auth.user_sync_queue (next_attempt_at, id) WHERE dead_lettered_at IS NULL AND locked_at IS NULL — lets PostgreSQL range-scan only rows that are actually ready to be claimed.
```go
r.l.Debug(ctx, "claimed queue batch", zap.Int("count", len(items)))

for _, item := range items {
```
Items in a batch are processed sequentially. Each item does 2–3 synchronous DB round-trips (GetAuthUser + UpsertPublicUser/DeletePublicUser + Ack/Retry), so batch throughput is proportional to batchSize × DB latency. With the default BatchSize=50 and typical Postgres latency, a full batch can take several seconds — longer than PollInterval=2s. If sustained write volume is expected, consider processing items in a small goroutine pool.
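The bounded goroutine pool suggested above could look roughly like this. This is a hedged sketch: `processItem` and `processBatch` are hypothetical stand-ins for the runner's per-item work, not functions from this PR.

```go
package main

import (
	"fmt"
	"sync"
)

// processItem stands in for the runner's per-item work
// (GetAuthUser + Upsert/Delete + Ack/Retry round-trips).
func processItem(id int) string {
	return fmt.Sprintf("processed %d", id)
}

// processBatch fans a claimed batch out to at most `workers` goroutines,
// so batch latency scales with ceil(len(items)/workers) * DB latency
// instead of len(items) * DB latency.
func processBatch(items []int, workers int) []string {
	results := make([]string, len(items))
	sem := make(chan struct{}, workers) // bounds concurrency
	var wg sync.WaitGroup
	for i, item := range items {
		wg.Add(1)
		sem <- struct{}{}
		go func(i, item int) {
			defer wg.Done()
			defer func() { <-sem }()
			results[i] = processItem(item)
		}(i, item)
	}
	wg.Wait()
	return results
}

func main() {
	out := processBatch([]int{1, 2, 3, 4, 5}, 3)
	fmt.Println(len(out), out[0])
}
```

Writing results by index keeps output order deterministic even though items complete out of order.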
… while enqueuing for processing
…om/e2b-dev/infra into feature/supabase-users-sync-worker
…d recovery mechanisms
- Updated the sync runner to use `RunWithRestart` for improved error recovery.
- Introduced a new `UserSyncQueue` model to manage user synchronization tasks.
- Added SQL migration for creating the `user_sync_queue` table with necessary triggers.
- Implemented tests for the processor and supervisor to ensure robust handling of retries and panics.
- Refactored existing queries to target the new `public.user_sync_queue` table.
packages/dashboard-api/internal/supabaseauthusersync/runner_test.go
…agement
- Introduced `supabase_auth_user_sync_enabled` variable to control user synchronization.
- Updated Nomad job configuration to include the new sync setting.
- Added Google Secret Manager resources for managing the sync configuration securely.
- Enhanced the dashboard API to utilize the new sync configuration in processing logic.
- Refactored related components to improve error handling and logging for the sync process.
packages/dashboard-api/internal/supabaseauthusersync/runner_test.go
- Replaced the previous `Store` implementation with a new structure that integrates both authentication and main database queries.
- Updated the `Runner` and `NewRunner` functions to accommodate the new database client structure.
- Removed obsolete SQL queries and migration files related to the `user_sync_queue` table.
- Enhanced the test suite to reflect changes in the runner's initialization and database interactions.
- Updated the `TestSupabaseAuthUserSyncRunner_EndToEnd` to apply necessary database migrations before running tests.
- Refactored the `SetupDatabase` function to include a new method `ApplyMigrations` for better migration management.
Made-with: Cursor
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 3f56979748
```sql
-- +goose Up
-- +goose StatementBegin
CREATE TABLE public.user_sync_queue (
```
Ensure auth queue migration is executed by db-migrator
The queue/table trigger migration was added under packages/db/pkg/auth/migrations, but the deployed migrator workflow only runs ./migrations from packages/db/scripts/migrator.go (and the db Dockerfile only copies db/migrations). That means the standard prestart migration job will never create public.user_sync_queue or the new triggers, so enabling the sync worker will fail at runtime when it tries to claim from a non-existent table.
we want to execute this manually + it should not be tied to main db migrations since authDB will be separate going forward
- Introduced a new process outcome `ready_to_ack` to streamline acknowledgment handling.
- Refactored the `process` method to prepare for batch acknowledgment of processed items.
- Added a new `AckBatch` method in the store to handle multiple acknowledgments efficiently.
- Updated the `Runner` to process items in batches and finalize acknowledgments accordingly.
- Removed obsolete SQL query for single item acknowledgment as part of the refactor.
- Enhanced tests to cover new deletion logic and acknowledgment scenarios.
```sql
-- name: GetAuthUserByID :one
SELECT id, email
FROM auth.users
WHERE id = sqlc.arg(user_id)::uuid;
```
🔴 The SQL query selects email from auth.users without null handling, and the generated AuthUser.Email field is a non-nullable string; Supabase supports phone-only and anonymous auth where email is NULL, so pgx returns a scan error (not pgx.ErrNoRows) for those users. In reconcile(), that error triggers the generic retry path, causing indefinite retries until the item is dead-lettered and the user is never synced to public.users. Fix by changing the query to COALESCE(email, '') or by mapping Email to *string and skipping the upsert when absent.
Extended reasoning...
What the bug is and how it manifests
The file packages/db/pkg/auth/sql_queries/supabase_auth_user_sync/get_auth_user.sql (lines 1-4) contains:
```sql
-- name: GetAuthUserByID :one
SELECT id, email
FROM auth.users
WHERE id = sqlc.arg(user_id)::uuid;
```
The generated code in get_auth_user.sql.go scans the result into AuthUser, whose Email field is declared as a non-nullable string in models.go. In pgx v5, scanning a SQL NULL into a plain Go string returns a scan error — it does not return pgx.ErrNoRows.
The specific code path that triggers it
In processor.go, reconcile() calls p.store.GetAuthUser(ctx, item.UserID). It checks errors.Is(err, pgx.ErrNoRows) to decide whether to delete the public user. A NULL-email scan error is not pgx.ErrNoRows, so it falls into the generic if err != nil { return "", fmt.Errorf(...) } branch, which propagates back to process(). Since the item is not yet at maxAttempts, process() calls store.Retry(...), scheduling the item for another attempt with exponential backoff.
Why existing code does not prevent it
The reconcile() function only has two exit paths for errors from GetAuthUser: the pgx.ErrNoRows special case (leading to a delete) and a generic retry path. There is no handling for scan errors caused by NULL columns, and no defensive COALESCE or nullable type in the generated SQL/struct.
Impact
Any Supabase deployment using phone-only or anonymous authentication will have users with auth.users.email = NULL. Every such user that enters the sync queue will be retried up to MaxAttempts (default 20) times with growing backoffs and then dead-lettered. Those users are permanently excluded from public.users, causing any feature that relies on public.users to silently omit them.
How to fix it
The simplest fix is to add COALESCE in the SQL query:
```sql
SELECT id, COALESCE(email, '') AS email
FROM auth.users
WHERE id = sqlc.arg(user_id)::uuid;
```
Alternatively, change the sqlc config so email maps to `*string`, update `AuthUser.Email` to `*string`, and skip the upsert (or pass an empty string) when `Email` is nil.
Step-by-step proof
1. A Supabase phone-only user signs up. The `auth.users` row has `id = <uuid>`, `email = NULL`.
2. The `sync_insert_auth_users_to_public_users_trigger` fires, inserting `(user_id, 'upsert')` into `public.user_sync_queue`.
3. The worker calls `ClaimBatch`, picks up the item, and increments `attempt_count` to 1.
4. `processor.process()` -> `processOnce()` -> `reconcile()` -> `store.GetAuthUser(ctx, userID)`.
5. `GetAuthUserByID` executes `SELECT id, email FROM auth.users WHERE id = $1`. The row exists, so pgx does not return `pgx.ErrNoRows`. Instead, `row.Scan(&i.ID, &i.Email)` attempts to scan NULL into `string` and returns a pgx scan error.
6. Back in `reconcile()`, `errors.Is(err, pgx.ErrNoRows)` is `false`. The error is returned as `fmt.Errorf("get auth.users %s: %w", ...)`.
7. `process()` sees `item.AttemptCount (1) < maxAttempts (20)`, so it calls `store.Retry(...)` with a 5s backoff.
8. Steps 3-7 repeat until `attempt_count >= 20`, at which point `DeadLetter` is called.
9. The user is now dead-lettered and will never be synced to `public.users`.
fine, we require emails for auth
```hcl
%{ for key, val in env }
${ key } = "${ val }"
%{ endfor }
```
🟡 The %{ for key, val in env } loop in dashboard-api.hcl renders env var values directly inside double-quoted HCL strings without escaping. A value containing a double-quote produces syntactically invalid HCL (Nomad job submission fails), and a value containing ${ causes Terraform templatefile() to attempt expression evaluation at render time (plan-time error). DASHBOARD_API_ENV_VARS is documented as accepting arbitrary operator-supplied values, making these characters plausible; fix by escaping double-quotes in values or passing extra env vars via a Nomad template stanza instead.
Extended reasoning...
What the bug is and how it manifests
In iac/modules/job-dashboard-api/jobs/dashboard-api.hcl lines 86-88, a Terraform templatefile() directive renders arbitrary env var values directly inside double-quoted HCL strings:
```hcl
%{ for key, val in env }
${ key } = "${ val }"
%{ endfor }
```
Two distinct injection vectors exist. First, if val contains a double-quote character, the rendered output becomes syntactically invalid HCL (e.g. MY_VAR = "conn"string" breaks the HCL parser), causing Nomad to reject the job spec at submit time. Second, if val contains ${, Terraform's templatefile() function interprets it as an expression interpolation during template rendering and produces a plan-time error before the job is ever submitted.
The specific code path that triggers it
The value flows through: DASHBOARD_API_ENV_VARS env var -> parsed as map(string) in iac/provider-gcp/variables.tf -> passed to module.nomad as dashboard_api_env_vars -> passed to module.dashboard_api as env -> filtered in local.env in main.tf (removes null/empty only) -> passed to templatefile() -> rendered at line 87 with no escaping.
Why existing code does not prevent it
The locals block in main.tf only filters out null/empty values (if value != null && value != ""); it does not escape or sanitize values. Terraform's templatefile() does not automatically escape template variable contents when they are interpolated. The HCL string delimiters are hard-coded double quotes with no escaping mechanism.
What the impact would be
For a double-quote injection (e.g. a PostgreSQL DSN like postgresql://user:p"ass@host/db), the Nomad job submission fails at terraform apply time with an HCL parse error -- the deployment is blocked. For a ${ injection (e.g. a JWT secret or template string containing ${), Terraform fails during plan or apply with an evaluation error. Neither failure is silent, but both are surprising and difficult to debug without knowing the root cause.
How to fix it
The safest fix is to use replace(val, "\"", "\\\"") to escape double-quotes within the current templatefile() approach, or pass extra env vars through a Nomad template stanza which has proper escaping semantics. The ${ vector can be addressed by replacing ${ with $${ (Terraform's escape sequence for a literal ${).
Step-by-step proof
1. Operator sets `DASHBOARD_API_ENV_VARS='{"DB_URL":"postgresql://user:p\"ass@host/db"}'` in their env file.
2. Terraform receives `var.dashboard_api_env_vars = {DB_URL = "postgresql://user:p\"ass@host/db"}`.
3. `local.env` passes it through unchanged (non-null, non-empty).
4. `templatefile()` renders line 87 as `DB_URL = "postgresql://user:p"ass@host/db"` -- the embedded `"` terminates the HCL string prematurely.
5. Nomad's job spec parser receives invalid HCL and rejects the job with a parse error.
6. Alternatively, for a value containing `${SOME_VAR}`: `templatefile()` evaluates `${SOME_VAR}` as a Terraform expression, returning an undefined variable error during `terraform apply`.
Note: the identical ${ key } = "${ value }" pattern exists in job-api and job-orchestrator for their job_env_vars, but those maps contain hardcoded infrastructure constants (VOLUME_TOKEN_ISSUER, etc.) unlikely to contain special characters. The new DASHBOARD_API_ENV_VARS is explicitly documented as a freeform map for arbitrary operator-supplied values, removing that safety assumption.
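The two escape rules from the suggested fix (backslash-escape `"`, and turn `${` into `$${`) can be illustrated with a small helper. This is a sketch of the idea only; the actual fix would use Terraform's `replace()` function, not Go:

```go
package main

import (
	"fmt"
	"strings"
)

// escapeHCLValue applies the two escapes discussed above so an
// arbitrary value can be embedded inside a double-quoted HCL string:
// a literal `"` is backslash-escaped, and `${` becomes `$${` so the
// template engine treats it literally instead of as interpolation.
func escapeHCLValue(v string) string {
	v = strings.ReplaceAll(v, `"`, `\"`)
	v = strings.ReplaceAll(v, "${", "$${")
	return v
}

func main() {
	fmt.Println(escapeHCLValue(`postgresql://user:p"ass@host/db`))
	fmt.Println(escapeHCLValue(`token-${SOME_VAR}`))
}
```

Applying the quote escape before the `${` escape matters here only in exotic inputs; for typical values the two replacements are independent.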
…om/e2b-dev/infra into feature/supabase-users-sync-worker
dobrac left a comment:
Looks so much simpler with the library!
```go
	metric.WithUnit("{job}"),
)
if err != nil {
	l.Warn(context.Background(), "failed to initialize auth user sync metric", zap.Error(err))
```
let's pass the context as a parameter
```sql
-- +goose Down
-- +goose StatementBegin

/* We don't want to drop the schema, as it is used by other services. */
```
why not drop the schema here if the dashboard-api is the source of truth?
belt has a migration that creates this schema. we don't want to delete that accidentally but i guess we can add the down migration here and just execute the migration manually
what are these files generated from?
good catch. these were artifacts that the generator did not clean up manually
packages/dashboard-api/Makefile
```make
make build
NODE_ID=$(HOSTNAME) ./bin/dashboard-api
@EXTRA_ENV=$(DASHBOARD_API_EXTRA_ENV); \
eval "env NODE_ID=$(HOSTNAME) $$EXTRA_ENV ./bin/dashboard-api"
```
is the usage of eval safe (secure) here? Can we avoid it?
i think it was fine but i cleaned it up to not rely on eval
packages/dashboard-api/Makefile
```make
HOSTNAME := $(shell hostname 2> /dev/null || hostnamectl hostname 2> /dev/null)
$(if $(HOSTNAME),,$(error Failed to determine hostname: both 'hostname' and 'hostnamectl' failed))

define DASHBOARD_API_EXTRA_ENV
```
I think it might have been simpler to expose the env variable directly rather than having it nested under extra env
i had a conversation with @djeebus on this, but i think this way it's much simpler to add environment variables to our dev setups, without the need to put them inside secrets or hook up a new makefile -> terraform -> nomad code path
```go
if config.SupabaseAuthUserSyncEnabled {
	workerLogger := l.With(zap.String("worker", "supabase_auth_user_sync"))

	authPool := authDB.WritePool()
```
the authDB is not Supabase DB, but a preparation for a future split of public.users, public.teams, etc to its own database
is this about the naming of the config?
…riable handling
- Updated the AuthUserSyncWorker to utilize OpenTelemetry metrics for better monitoring.
- Refactored the Makefile to improve environment variable exportation and streamline build/run commands.
- Removed outdated SQL query files related to user sync queue operations to clean up the codebase.
- Adjusted the database migration script to drop the auth_custom schema if it exists, ensuring a cleaner migration process.
packages/db/pkg/auth/migrations/20260401000003_river_auth_user_sync_triggers.sql
packages/db/pkg/auth/migrations/20260401000003_river_auth_user_sync_triggers.sql
Use AUTH_USER_SYNC_BACKGROUND_WORKER_ENABLED instead of SUPABASE_AUTH_USER_SYNC_ENABLED to match the actual config field in packages/dashboard-api/internal/cfg/model.go Co-authored-by: Ben Fornefeld <ben-fornefeld@users.noreply.github.com>
Cursor Bugbot has reviewed your changes and found 1 potential issue.
packages/db/pkg/auth/migrations/20260401000003_river_auth_user_sync_triggers.sql
…n auth_custom schema
- Changed REVOKE statement to specify INSERT permission on river_job.
- Added REVOKE for USAGE and SELECT on all sequences in the auth_custom schema for trigger_user.
…om/e2b-dev/infra into feature/supabase-users-sync-worker
```hcl
%{ for key in sort(keys(env)) ~}
${key} = "${env[key]}"
%{ endfor ~}
```
You don't need to sort it manually; terraform does it for you:

```diff
-%{ for key in sort(keys(env)) ~}
-${key} = "${env[key]}"
-%{ endfor ~}
+%{ for key, value in env ~}
+${key} = "${value}"
+%{ endfor ~}
```
```hcl
conflicting_extra_env_keys = sort(tolist(setintersection(
  toset(keys(local.base_env)),
  toset(keys(local.extra_env)),
)))
```
This is kind of an anti-pattern. If someone sets the values above, it's b/c they wanted to override it. We shouldn't forbid it.
```hcl
extra_env = {
  for key, value in var.extra_env : key => value
  if value != null && trimspace(value) != ""
}
```
Why? Empty strings are valid env vars, and could be distinct from populated values.
```go
// must match the schema used in packages/db/pkg/auth/migrations for River tables and triggers
AuthCustomSchema = "auth_custom"

// must match the queue value in packages/db/pkg/auth/migrations trigger SQL inserts
AuthUserProjectionQueue = "auth_user_projection"

// must match the kind value in packages/db/pkg/auth/migrations trigger SQL inserts
AuthUserProjectionKind = "auth_user_projection"

AuthUserProjectionMaxWorkers = 10
```
Let's unexport these. The constants don't add value to the one log message they're attached to. If we really want to log them out, add something like "client configured with " in the NewRiverClient function. Keeping them private gives us more flexibility wrt refactors in the future.
```go
authMigrationsDir := "packages/db/pkg/auth/migrations"

db.ApplyMigrationsUpTo(t, 20260401000001, authMigrationsDir)
```
To ensure that future tests function correctly, I'd migrate to latest every time.
```go
runUpsertProjection(t, db)
runDeleteProjection(t, db)
runBurstBacklog(t, db)
```
This test scares me in general. I'd much rather have a 200 line test that makes clear setup steps, action steps, and verification steps (even if they're interleaved). It's really unclear what this test does, and why it does it.
```go
RedisClusterURL  string `env:"REDIS_CLUSTER_URL"`
RedisTLSCABase64 string `env:"REDIS_TLS_CA_BASE64"`

AuthUserSyncBackgroundWorkerEnabled bool `env:"AUTH_USER_SYNC_BACKGROUND_WORKER_ENABLED" envDefault:"false"`
```
We should probably avoid the yoda-speak:

```diff
-AuthUserSyncBackgroundWorkerEnabled bool `env:"AUTH_USER_SYNC_BACKGROUND_WORKER_ENABLED" envDefault:"false"`
+EnableAuthUserSyncBackgroundWorker bool `env:"ENABLE_AUTH_USER_SYNC_BACKGROUND_WORKER" envDefault:"false"`
```
```go
userID, err := uuid.Parse(job.Args.UserID)
if err != nil {
	telemetry.ReportError(ctx, "auth user sync parse user_id", err, attrs...)
	w.observeJob(ctx, job.Args.Operation, "cancelled")
```
maybe "validation-failed" or "invalid arguments" instead?
```go
if job.Args.Email == "" {
	err := fmt.Errorf("missing email in job args for user %s", userID)
	telemetry.ReportError(ctx, "auth user sync missing email", err, attrs...)
	w.observeJob(ctx, job.Args.Operation, "cancelled")
```

```go
default:
	err := fmt.Errorf("unknown operation %q for user %s", job.Args.Operation, userID)
	telemetry.ReportError(ctx, "auth user sync unknown operation", err, attrs...)
	w.observeJob(ctx, job.Args.Operation, "cancelled")
```
```go
workerLogger := l.With(zap.String("worker", backgroundworker.AuthUserProjectionKind))
workerMeter := tel.MeterProvider.Meter("github.com/e2b-dev/infra/packages/dashboard-api")

authPool := authDB.WritePool()
if err := backgroundworker.RunRiverMigrations(ctx, authPool); err != nil {
	l.Fatal(ctx, "failed to run River migrations on auth DB", zap.Error(err))
}

workers := river.NewWorkers()
river.AddWorker(workers, backgroundworker.NewAuthUserSyncWorker(ctx, db, workerMeter, workerLogger))

riverClient, err = backgroundworker.NewRiverClient(authPool, workers)
if err != nil {
	l.Fatal(ctx, "failed to create River client", zap.Error(err))
}

if err := riverClient.Start(signalCtx); err != nil {
	l.Fatal(ctx, "failed to start River client", zap.Error(err))
}

l.Info(ctx, "background worker started", zap.String("queue", backgroundworker.AuthUserProjectionQueue), zap.String("schema", backgroundworker.AuthCustomSchema))
```
maybe cleaner to put this in its own function: setupRiverWorker or something
```make
$(if $(HOSTNAME),,$(error Failed to determine hostname: both 'hostname' and 'hostnamectl' failed))

define export_extra_env
export $$(printf '%s' "$$DASHBOARD_API_ENV_VARS" | jq -r '(if .=="" then empty elif type=="string" then (fromjson? // empty) else . end) | to_entries? // [] | map("\(.key)=\(.value|tostring)") | .[]' 2>/dev/null) 2>/dev/null;
```
Where are you defining these? You could just run ENV=a ENV=bb make run and it'll receive both those env vars
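For readers puzzling over the dense jq filter above, here is a hedged Go rendering of roughly the same logic: accept either a JSON object or a JSON-encoded string containing one, and emit KEY=value pairs (this is an illustration of the filter's behavior, not code from the PR):

```go
package main

import (
	"encoding/json"
	"fmt"
	"sort"
)

// parseExtraEnv mirrors the jq pipeline: empty input yields nothing;
// a JSON string is decoded once more (the `fromjson?` step); any
// non-object result yields nothing; otherwise each entry becomes "KEY=value".
func parseExtraEnv(raw string) []string {
	if raw == "" {
		return nil
	}
	var v any
	if err := json.Unmarshal([]byte(raw), &v); err != nil {
		return nil
	}
	if s, ok := v.(string); ok { // the fromjson? step
		if err := json.Unmarshal([]byte(s), &v); err != nil {
			return nil
		}
	}
	obj, ok := v.(map[string]any)
	if !ok {
		return nil
	}
	var out []string
	for k, val := range obj {
		out = append(out, fmt.Sprintf("%s=%v", k, val))
	}
	sort.Strings(out) // deterministic order for display
	return out
}

func main() {
	fmt.Println(parseExtraEnv(`{"FOO":"bar"}`))
}
```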
```go
func (db *Database) ApplyMigrationsUpTo(t *testing.T, version int64, migrationDirs ...string) {
	t.Helper()

	db.applyGooseMigrations(t, version, migrationDirs...)
```
I don't think we should ever be running tests on an old version of the database. These tests will be stale as soon as a new migration is created, and then we'll be testing that new code works against old dbs, which serves no purpose.
djeebus left a comment:
Bunch of comments; none are critical, just a lot of clean up.

No description provided.