Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
Show all changes
63 commits
Select commit Hold shift + click to select a range
c50d248
feat: supabase auth users sync background runner in dashboard api
ben-fornefeld Mar 28, 2026
68a6265
chore: auto-commit generated changes
github-actions[bot] Mar 28, 2026
9d8fe63
feat(db): enhance auth user sync triggers to retain direct operations…
ben-fornefeld Mar 30, 2026
1e5f28b
Merge branch 'feature/supabase-users-sync-worker' of https://github.c…
ben-fornefeld Mar 30, 2026
56154df
feat(sync): implement user sync queue with enhanced error handling an…
ben-fornefeld Mar 30, 2026
3a431aa
feat(sync): add supabase auth user sync configuration and secrets man…
ben-fornefeld Mar 31, 2026
eb69678
chore: remove smoke test
ben-fornefeld Mar 31, 2026
bb9fa39
add: e2e runner test
ben-fornefeld Mar 31, 2026
461d2ec
chore: change dashboard-api env variable management
ben-fornefeld Mar 31, 2026
ce0cbd1
refactor(sync): update user sync logic to utilize new database structure
ben-fornefeld Mar 31, 2026
4d622ce
fix: lint
ben-fornefeld Mar 31, 2026
84ecd06
test: apply database migrations in end-to-end test setup
ben-fornefeld Mar 31, 2026
3f56979
Merge origin/main into feature/supabase-users-sync-worker
ben-fornefeld Apr 1, 2026
386d0c6
feat(sync): enhance user sync processing and acknowledgment
ben-fornefeld Apr 1, 2026
3e3fa3c
refactor(gcp): remove auth_db_connection_string resources and update …
ben-fornefeld Apr 1, 2026
5e0972b
chore(dashboard-api): refactor environment variable management
ben-fornefeld Apr 1, 2026
6b5a5c8
refactor: use river for queue worker
ben-fornefeld Apr 1, 2026
1113654
chore: auto-commit generated changes
github-actions[bot] Apr 1, 2026
fb0afd4
chore: auto-commit generated changes
github-actions[bot] Apr 1, 2026
bd4dd80
chore: update pgx dependency to v5.9.1 and golang.org/x/mod to v0.34.…
ben-fornefeld Apr 1, 2026
97e305a
Merge branch 'feature/supabase-users-sync-worker' of https://github.c…
ben-fornefeld Apr 1, 2026
c98878f
chore: fix lint
ben-fornefeld Apr 2, 2026
e314b96
chore: auto-commit generated changes
github-actions[bot] Apr 2, 2026
db124a4
improve: retrying for river jobs
ben-fornefeld Apr 2, 2026
0e7f147
Merge branch 'feature/supabase-users-sync-worker' of https://github.c…
ben-fornefeld Apr 2, 2026
eed510b
refactor: enhance auth user sync worker and streamline environment va…
ben-fornefeld Apr 2, 2026
8226f67
chore: rename config and fix test
ben-fornefeld Apr 2, 2026
8485d7a
improve: config
ben-fornefeld Apr 2, 2026
37f842f
Merge origin/main into feature/supabase-users-sync-worker
ben-fornefeld Apr 2, 2026
c0b1102
chore: auto-commit generated changes
github-actions[bot] Apr 2, 2026
87ef8f0
fix: correct env var name in template to match Go config
cursoragent Apr 2, 2026
45a7ccb
fix: update permissions for trigger_user on river_job and sequences i…
ben-fornefeld Apr 3, 2026
638429a
Merge branch 'feature/supabase-users-sync-worker' of https://github.c…
ben-fornefeld Apr 3, 2026
07d4c25
refactor: update environment variable handling and improve auth user …
ben-fornefeld Apr 6, 2026
bd1ebb1
chore: auto-commit generated changes
github-actions[bot] Apr 6, 2026
126d984
fix: lint
ben-fornefeld Apr 6, 2026
d4ba4c2
Merge branch 'feature/supabase-users-sync-worker' of https://github.c…
ben-fornefeld Apr 6, 2026
2ac1f83
Merge main into feature/supabase-users-sync-worker
ben-fornefeld Apr 9, 2026
da28e6a
chore: auto-commit generated changes
github-actions[bot] Apr 9, 2026
1512ede
chore: auto-commit generated changes
github-actions[bot] Apr 9, 2026
d25d531
refactor(dashboard-api): align auth user sync worker with review feed…
ben-fornefeld Apr 9, 2026
fe217ea
chore: auto-commit generated changes
github-actions[bot] Apr 9, 2026
40a5cd0
refactor(db): split supabase auth sync source client
ben-fornefeld Apr 9, 2026
c25caba
chore(db): address follow-up nits
ben-fornefeld Apr 9, 2026
18f7839
fix(dashboard-api): stop river worker gracefully on shutdown
ben-fornefeld Apr 9, 2026
9667b77
refactor(dashboard-api): simplify service lifecycle orchestration
ben-fornefeld Apr 9, 2026
37eb463
chore: auto-commit generated changes
github-actions[bot] Apr 9, 2026
c5d82f0
refactor(db): drop supabase replica client plumbing
ben-fornefeld Apr 9, 2026
e4d5bff
fix: use write db for supabase db get user read
ben-fornefeld Apr 9, 2026
4862139
chore: fix lint
ben-fornefeld Apr 9, 2026
4ecc627
fix(dashboard-api): lazy-init supabase worker client
ben-fornefeld Apr 9, 2026
29bbf59
chore(dashboard-api): reduce supabase worker pool size
ben-fornefeld Apr 9, 2026
48a620c
test(dashboard-api): tighten auth user sync coverage
ben-fornefeld Apr 10, 2026
2eda1d4
chore: auto-commit generated changes
github-actions[bot] Apr 10, 2026
bb31094
feat(dashboard-api): add supabase db config override
ben-fornefeld Apr 10, 2026
2a2311a
Merge remote-tracking branch 'origin/main' into feature/supabase-user…
ben-fornefeld Apr 10, 2026
11d0f0f
refactor(dashboard-api): narrow infra sync runner changes
ben-fornefeld Apr 10, 2026
1dc74e1
chore: auto-commit generated changes
github-actions[bot] Apr 10, 2026
1359257
chore: auto-commit generated changes
github-actions[bot] Apr 10, 2026
15291ac
refactor(supabase): drop unused trigger role setup
ben-fornefeld Apr 10, 2026
2c86213
refactor(dashboard-api): use shared not-found helper
ben-fornefeld Apr 10, 2026
fbb8835
chore: remove env vars artifact
ben-fornefeld Apr 10, 2026
1838a3b
refactor(dashboard-api): tie river worker to signal context
ben-fornefeld Apr 10, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion packages/dashboard-api/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ require (
github.com/jackc/pgx/v5 v5.7.5
github.com/oapi-codegen/gin-middleware v1.0.2
github.com/oapi-codegen/runtime v1.1.1
github.com/stretchr/testify v1.11.1
go.uber.org/zap v1.27.1
)

Expand Down Expand Up @@ -117,7 +118,6 @@ require (
github.com/shirou/gopsutil/v4 v4.25.9 // indirect
github.com/shopspring/decimal v1.4.0 // indirect
github.com/sirupsen/logrus v1.9.3 // indirect
github.com/stretchr/testify v1.11.1 // indirect
github.com/testcontainers/testcontainers-go v0.40.0 // indirect
github.com/testcontainers/testcontainers-go/modules/postgres v0.39.0 // indirect
github.com/tklauser/go-sysconf v0.3.15 // indirect
Expand Down
8 changes: 8 additions & 0 deletions packages/dashboard-api/internal/cfg/model.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package cfg

import (
"time"

"github.com/caarlos0/env/v11"
)

Expand All @@ -12,6 +14,12 @@ type Config struct {

AuthDBConnectionString string `env:"AUTH_DB_CONNECTION_STRING"`
AuthDBReadReplicaConnectionString string `env:"AUTH_DB_READ_REPLICA_CONNECTION_STRING"`

SupabaseAuthUserSyncEnabled bool `env:"SUPABASE_AUTH_USER_SYNC_ENABLED" envDefault:"false"`
SupabaseAuthUserSyncBatchSize int32 `env:"SUPABASE_AUTH_USER_SYNC_BATCH_SIZE" envDefault:"50"`
SupabaseAuthUserSyncPollInterval time.Duration `env:"SUPABASE_AUTH_USER_SYNC_POLL_INTERVAL" envDefault:"2s"`
SupabaseAuthUserSyncLockTimeout time.Duration `env:"SUPABASE_AUTH_USER_SYNC_LOCK_TIMEOUT" envDefault:"2m"`
SupabaseAuthUserSyncMaxAttempts int32 `env:"SUPABASE_AUTH_USER_SYNC_MAX_ATTEMPTS" envDefault:"20"`
}

func Parse() (Config, error) {
Expand Down
21 changes: 21 additions & 0 deletions packages/dashboard-api/internal/supabaseauthusersync/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package supabaseauthusersync

import "time"

type Config struct {
Enabled bool
Comment thread
cursor[bot] marked this conversation as resolved.
Outdated
BatchSize int32
PollInterval time.Duration
LockTimeout time.Duration
MaxAttempts int32
}

func DefaultConfig() Config {
return Config{
Enabled: false,
BatchSize: 50,
PollInterval: 2 * time.Second,
LockTimeout: 2 * time.Minute,
MaxAttempts: 20,
}
}
Comment thread
cursor[bot] marked this conversation as resolved.
Outdated
135 changes: 135 additions & 0 deletions packages/dashboard-api/internal/supabaseauthusersync/processor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
package supabaseauthusersync

import (
"context"
"errors"
"fmt"
"runtime/debug"
"time"

"github.com/google/uuid"
"github.com/jackc/pgx/v5"
"go.uber.org/zap"

"github.com/e2b-dev/infra/packages/shared/pkg/logger"
)

type processorStore interface {
Ack(ctx context.Context, id int64) error
Retry(ctx context.Context, id int64, backoff time.Duration, lastError string) error
DeadLetter(ctx context.Context, id int64, lastError string) error
GetAuthUser(ctx context.Context, userID uuid.UUID) (*AuthUser, error)
UpsertPublicUser(ctx context.Context, id uuid.UUID, email string) error
DeletePublicUser(ctx context.Context, id uuid.UUID) error
}

type Processor struct {
store processorStore
maxAttempts int32
l logger.Logger
}

func NewProcessor(store processorStore, maxAttempts int32, l logger.Logger) *Processor {
return &Processor{
store: store,
maxAttempts: maxAttempts,
l: l,
}
}

func (p *Processor) Process(ctx context.Context, item QueueItem) {
err := p.processOnce(ctx, item)

if err == nil {
if ackErr := p.store.Ack(ctx, item.ID); ackErr != nil {
p.l.Error(ctx, "failed to ack queue item",
zap.Int64("queue_item_id", item.ID),
zap.String("user_id", item.UserID.String()),
zap.Error(ackErr),
)
}

return
}

p.l.Warn(ctx, "failed to process queue item",
zap.Int64("queue_item_id", item.ID),
zap.String("user_id", item.UserID.String()),
zap.Int32("attempt", item.AttemptCount),
zap.Error(err),
)

if item.AttemptCount >= p.maxAttempts {
if dlErr := p.store.DeadLetter(ctx, item.ID, err.Error()); dlErr != nil {
p.l.Error(ctx, "failed to dead-letter queue item",
zap.Int64("queue_item_id", item.ID),
zap.Error(dlErr),
)
}

return
}

backoff := retryBackoff(item.AttemptCount)

if retryErr := p.store.Retry(ctx, item.ID, backoff, err.Error()); retryErr != nil {
p.l.Error(ctx, "failed to retry queue item",
zap.Int64("queue_item_id", item.ID),
zap.Error(retryErr),
)
}
}

func (p *Processor) processOnce(ctx context.Context, item QueueItem) (err error) {
defer func() {
if recovered := recover(); recovered != nil {
p.l.Error(ctx, "panic while processing queue item",
zap.Int64("queue_item_id", item.ID),
zap.String("user_id", item.UserID.String()),
zap.String("panic", fmt.Sprint(recovered)),
zap.String("stack", string(debug.Stack())),
)

err = fmt.Errorf("panic while processing queue item: %v", recovered)
}
}()

return p.reconcile(ctx, item)
}

func (p *Processor) reconcile(ctx context.Context, item QueueItem) error {
authUser, err := p.store.GetAuthUser(ctx, item.UserID)

if errors.Is(err, pgx.ErrNoRows) {
if delErr := p.store.DeletePublicUser(ctx, item.UserID); delErr != nil {
return fmt.Errorf("delete public.users %s: %w", item.UserID, delErr)
}

return nil
}

if err != nil {
return fmt.Errorf("get auth.users %s: %w", item.UserID, err)
}

if err = p.store.UpsertPublicUser(ctx, authUser.ID, authUser.Email); err != nil {
return fmt.Errorf("upsert public.users %s: %w", authUser.ID, err)
}

return nil
}

func retryBackoff(attempt int32) time.Duration {
switch {
case attempt <= 1:
return 5 * time.Second
case attempt <= 3:
return 30 * time.Second
case attempt <= 6:
return 2 * time.Minute
case attempt <= 10:
return 5 * time.Minute
default:
return 15 * time.Minute
}
}
109 changes: 109 additions & 0 deletions packages/dashboard-api/internal/supabaseauthusersync/processor_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
package supabaseauthusersync

import (
"context"
"testing"
"time"

"github.com/google/uuid"
"github.com/stretchr/testify/require"

"github.com/e2b-dev/infra/packages/shared/pkg/logger"
)

type retryCall struct {
id int64
backoff time.Duration
lastError string
}

type deadLetterCall struct {
id int64
lastError string
}

type fakeProcessorStore struct {
getAuthUserFn func(context.Context, uuid.UUID) (*AuthUser, error)

ackCalls []int64
retryCalls []retryCall
deadLetterCalls []deadLetterCall
}

func (s *fakeProcessorStore) Ack(_ context.Context, id int64) error {
s.ackCalls = append(s.ackCalls, id)
return nil

Check failure on line 35 in packages/dashboard-api/internal/supabaseauthusersync/processor_test.go

View workflow job for this annotation

GitHub Actions / lint / golangci-lint (/home/runner/work/infra/infra/packages/dashboard-api)

return with no blank line before (nlreturn)
}

func (s *fakeProcessorStore) Retry(_ context.Context, id int64, backoff time.Duration, lastError string) error {
s.retryCalls = append(s.retryCalls, retryCall{
id: id,
backoff: backoff,
lastError: lastError,
})
return nil

Check failure on line 44 in packages/dashboard-api/internal/supabaseauthusersync/processor_test.go

View workflow job for this annotation

GitHub Actions / lint / golangci-lint (/home/runner/work/infra/infra/packages/dashboard-api)

return with no blank line before (nlreturn)
}

func (s *fakeProcessorStore) DeadLetter(_ context.Context, id int64, lastError string) error {
s.deadLetterCalls = append(s.deadLetterCalls, deadLetterCall{
id: id,
lastError: lastError,
})
return nil

Check failure on line 52 in packages/dashboard-api/internal/supabaseauthusersync/processor_test.go

View workflow job for this annotation

GitHub Actions / lint / golangci-lint (/home/runner/work/infra/infra/packages/dashboard-api)

return with no blank line before (nlreturn)
}

func (s *fakeProcessorStore) GetAuthUser(ctx context.Context, userID uuid.UUID) (*AuthUser, error) {
return s.getAuthUserFn(ctx, userID)
}

func (s *fakeProcessorStore) UpsertPublicUser(_ context.Context, _ uuid.UUID, _ string) error {
return nil
}

func (s *fakeProcessorStore) DeletePublicUser(_ context.Context, _ uuid.UUID) error {
return nil
}

func TestProcessorProcessRetriesRecoveredPanic(t *testing.T) {

Check failure on line 67 in packages/dashboard-api/internal/supabaseauthusersync/processor_test.go

View workflow job for this annotation

GitHub Actions / lint / golangci-lint (/home/runner/work/infra/infra/packages/dashboard-api)

Function TestProcessorProcessRetriesRecoveredPanic missing the call to method parallel (paralleltest)
store := &fakeProcessorStore{
getAuthUserFn: func(context.Context, uuid.UUID) (*AuthUser, error) {
panic("boom")
},
}
processor := NewProcessor(store, 3, logger.NewNopLogger())
item := QueueItem{
ID: 1,
UserID: uuid.New(),
AttemptCount: 1,
}

require.NotPanics(t, func() {
processor.Process(context.Background(), item)
})
require.Empty(t, store.ackCalls)
require.Len(t, store.retryCalls, 1)
require.Contains(t, store.retryCalls[0].lastError, "panic while processing queue item")
require.Empty(t, store.deadLetterCalls)
}

func TestProcessorProcessDeadLettersRecoveredPanicAtMaxAttempts(t *testing.T) {
store := &fakeProcessorStore{
getAuthUserFn: func(context.Context, uuid.UUID) (*AuthUser, error) {
panic("boom")
},
}
processor := NewProcessor(store, 3, logger.NewNopLogger())
item := QueueItem{
ID: 1,
UserID: uuid.New(),
AttemptCount: 3,
}

require.NotPanics(t, func() {
processor.Process(context.Background(), item)
})
require.Empty(t, store.ackCalls)
require.Empty(t, store.retryCalls)
require.Len(t, store.deadLetterCalls, 1)
require.Contains(t, store.deadLetterCalls[0].lastError, "panic while processing queue item")
}
69 changes: 69 additions & 0 deletions packages/dashboard-api/internal/supabaseauthusersync/runner.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package supabaseauthusersync

import (
"context"
"time"

"go.uber.org/zap"

"github.com/e2b-dev/infra/packages/shared/pkg/logger"
)

type Runner struct {
cfg Config
store *Store
processor *Processor
lockOwner string
l logger.Logger
}

func NewRunner(cfg Config, store *Store, lockOwner string, l logger.Logger) *Runner {
return &Runner{
cfg: cfg,
store: store,
processor: NewProcessor(store, cfg.MaxAttempts, l),
lockOwner: lockOwner,
l: l,
}
}

func (r *Runner) Run(ctx context.Context) error {
r.l.Info(ctx, "starting supabase auth user sync worker",
zap.String("lock_owner", r.lockOwner),
zap.Duration("poll_interval", r.cfg.PollInterval),
zap.Int32("batch_size", r.cfg.BatchSize),
)

ticker := time.NewTicker(r.cfg.PollInterval)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
r.l.Info(ctx, "stopping supabase auth user sync worker")

return ctx.Err()
case <-ticker.C:
r.poll(ctx)
}
}
}

func (r *Runner) poll(ctx context.Context) {
items, err := r.store.ClaimBatch(ctx, r.lockOwner, r.cfg.LockTimeout, r.cfg.BatchSize)
if err != nil {
r.l.Error(ctx, "failed to claim queue batch", zap.Error(err))

return
}

if len(items) == 0 {
return
}

r.l.Debug(ctx, "claimed queue batch", zap.Int("count", len(items)))

for _, item := range items {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Items in a batch are processed sequentially. Each item does 2–3 synchronous DB round-trips (GetAuthUser + UpsertPublicUser/DeletePublicUser + Ack/Retry), so batch throughput is proportional to batchSize × DB latency. With the default BatchSize=50 and typical Postgres latency, a full batch can take several seconds — longer than PollInterval=2s. If sustained write volume is expected, consider processing items in a small goroutine pool.

r.processor.Process(ctx, item)
}
}
Loading
Loading