-
Notifications
You must be signed in to change notification settings - Fork 295
feat(dashboard-api): supabase auth users sync background runner #2247
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 5 commits
c50d248
68a6265
9d8fe63
1e5f28b
56154df
3a431aa
eb69678
bb9fa39
461d2ec
ce0cbd1
4d622ce
84ecd06
3f56979
386d0c6
3e3fa3c
5e0972b
6b5a5c8
1113654
fb0afd4
bd4dd80
97e305a
c98878f
e314b96
db124a4
0e7f147
eed510b
8226f67
8485d7a
37f842f
c0b1102
87ef8f0
45a7ccb
638429a
07d4c25
bd1ebb1
126d984
d4ba4c2
2ac1f83
da28e6a
1512ede
d25d531
fe217ea
40a5cd0
c25caba
18f7839
9667b77
37eb463
c5d82f0
e4d5bff
4862139
4ecc627
29bbf59
48a620c
2eda1d4
bb31094
2a2311a
11d0f0f
1dc74e1
1359257
15291ac
2c86213
fbb8835
1838a3b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,21 @@ | ||
| package supabaseauthusersync | ||
|
|
||
| import "time" | ||
|
|
||
| type Config struct { | ||
| Enabled bool | ||
| BatchSize int32 | ||
| PollInterval time.Duration | ||
| LockTimeout time.Duration | ||
| MaxAttempts int32 | ||
| } | ||
|
|
||
| func DefaultConfig() Config { | ||
| return Config{ | ||
| Enabled: false, | ||
| BatchSize: 50, | ||
| PollInterval: 2 * time.Second, | ||
| LockTimeout: 2 * time.Minute, | ||
| MaxAttempts: 20, | ||
| } | ||
| } | ||
|
cursor[bot] marked this conversation as resolved.
Outdated
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,135 @@ | ||
| package supabaseauthusersync | ||
|
|
||
| import ( | ||
| "context" | ||
| "errors" | ||
| "fmt" | ||
| "runtime/debug" | ||
| "time" | ||
|
|
||
| "github.com/google/uuid" | ||
| "github.com/jackc/pgx/v5" | ||
| "go.uber.org/zap" | ||
|
|
||
| "github.com/e2b-dev/infra/packages/shared/pkg/logger" | ||
| ) | ||
|
|
||
| type processorStore interface { | ||
| Ack(ctx context.Context, id int64) error | ||
| Retry(ctx context.Context, id int64, backoff time.Duration, lastError string) error | ||
| DeadLetter(ctx context.Context, id int64, lastError string) error | ||
| GetAuthUser(ctx context.Context, userID uuid.UUID) (*AuthUser, error) | ||
| UpsertPublicUser(ctx context.Context, id uuid.UUID, email string) error | ||
| DeletePublicUser(ctx context.Context, id uuid.UUID) error | ||
| } | ||
|
|
||
| type Processor struct { | ||
| store processorStore | ||
| maxAttempts int32 | ||
| l logger.Logger | ||
| } | ||
|
|
||
| func NewProcessor(store processorStore, maxAttempts int32, l logger.Logger) *Processor { | ||
| return &Processor{ | ||
| store: store, | ||
| maxAttempts: maxAttempts, | ||
| l: l, | ||
| } | ||
| } | ||
|
|
||
| func (p *Processor) Process(ctx context.Context, item QueueItem) { | ||
| err := p.processOnce(ctx, item) | ||
|
|
||
| if err == nil { | ||
| if ackErr := p.store.Ack(ctx, item.ID); ackErr != nil { | ||
| p.l.Error(ctx, "failed to ack queue item", | ||
| zap.Int64("queue_item_id", item.ID), | ||
| zap.String("user_id", item.UserID.String()), | ||
| zap.Error(ackErr), | ||
| ) | ||
| } | ||
|
|
||
| return | ||
| } | ||
|
|
||
| p.l.Warn(ctx, "failed to process queue item", | ||
| zap.Int64("queue_item_id", item.ID), | ||
| zap.String("user_id", item.UserID.String()), | ||
| zap.Int32("attempt", item.AttemptCount), | ||
| zap.Error(err), | ||
| ) | ||
|
|
||
| if item.AttemptCount >= p.maxAttempts { | ||
| if dlErr := p.store.DeadLetter(ctx, item.ID, err.Error()); dlErr != nil { | ||
| p.l.Error(ctx, "failed to dead-letter queue item", | ||
| zap.Int64("queue_item_id", item.ID), | ||
| zap.Error(dlErr), | ||
| ) | ||
| } | ||
|
|
||
| return | ||
| } | ||
|
|
||
| backoff := retryBackoff(item.AttemptCount) | ||
|
|
||
| if retryErr := p.store.Retry(ctx, item.ID, backoff, err.Error()); retryErr != nil { | ||
| p.l.Error(ctx, "failed to retry queue item", | ||
| zap.Int64("queue_item_id", item.ID), | ||
| zap.Error(retryErr), | ||
| ) | ||
| } | ||
| } | ||
|
|
||
| func (p *Processor) processOnce(ctx context.Context, item QueueItem) (err error) { | ||
| defer func() { | ||
| if recovered := recover(); recovered != nil { | ||
| p.l.Error(ctx, "panic while processing queue item", | ||
| zap.Int64("queue_item_id", item.ID), | ||
| zap.String("user_id", item.UserID.String()), | ||
| zap.String("panic", fmt.Sprint(recovered)), | ||
| zap.String("stack", string(debug.Stack())), | ||
| ) | ||
|
|
||
| err = fmt.Errorf("panic while processing queue item: %v", recovered) | ||
| } | ||
| }() | ||
|
|
||
| return p.reconcile(ctx, item) | ||
| } | ||
|
|
||
| func (p *Processor) reconcile(ctx context.Context, item QueueItem) error { | ||
| authUser, err := p.store.GetAuthUser(ctx, item.UserID) | ||
|
|
||
| if errors.Is(err, pgx.ErrNoRows) { | ||
| if delErr := p.store.DeletePublicUser(ctx, item.UserID); delErr != nil { | ||
| return fmt.Errorf("delete public.users %s: %w", item.UserID, delErr) | ||
| } | ||
|
|
||
| return nil | ||
| } | ||
|
|
||
| if err != nil { | ||
| return fmt.Errorf("get auth.users %s: %w", item.UserID, err) | ||
| } | ||
|
|
||
| if err = p.store.UpsertPublicUser(ctx, authUser.ID, authUser.Email); err != nil { | ||
| return fmt.Errorf("upsert public.users %s: %w", authUser.ID, err) | ||
| } | ||
|
|
||
| return nil | ||
| } | ||
|
|
||
| func retryBackoff(attempt int32) time.Duration { | ||
| switch { | ||
| case attempt <= 1: | ||
| return 5 * time.Second | ||
| case attempt <= 3: | ||
| return 30 * time.Second | ||
| case attempt <= 6: | ||
| return 2 * time.Minute | ||
| case attempt <= 10: | ||
| return 5 * time.Minute | ||
| default: | ||
| return 15 * time.Minute | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,109 @@ | ||
| package supabaseauthusersync | ||
|
|
||
| import ( | ||
| "context" | ||
| "testing" | ||
| "time" | ||
|
|
||
| "github.com/google/uuid" | ||
| "github.com/stretchr/testify/require" | ||
|
|
||
| "github.com/e2b-dev/infra/packages/shared/pkg/logger" | ||
| ) | ||
|
|
||
| type retryCall struct { | ||
| id int64 | ||
| backoff time.Duration | ||
| lastError string | ||
| } | ||
|
|
||
| type deadLetterCall struct { | ||
| id int64 | ||
| lastError string | ||
| } | ||
|
|
||
| type fakeProcessorStore struct { | ||
| getAuthUserFn func(context.Context, uuid.UUID) (*AuthUser, error) | ||
|
|
||
| ackCalls []int64 | ||
| retryCalls []retryCall | ||
| deadLetterCalls []deadLetterCall | ||
| } | ||
|
|
||
| func (s *fakeProcessorStore) Ack(_ context.Context, id int64) error { | ||
| s.ackCalls = append(s.ackCalls, id) | ||
| return nil | ||
| } | ||
|
|
||
| func (s *fakeProcessorStore) Retry(_ context.Context, id int64, backoff time.Duration, lastError string) error { | ||
| s.retryCalls = append(s.retryCalls, retryCall{ | ||
| id: id, | ||
| backoff: backoff, | ||
| lastError: lastError, | ||
| }) | ||
| return nil | ||
| } | ||
|
|
||
| func (s *fakeProcessorStore) DeadLetter(_ context.Context, id int64, lastError string) error { | ||
| s.deadLetterCalls = append(s.deadLetterCalls, deadLetterCall{ | ||
| id: id, | ||
| lastError: lastError, | ||
| }) | ||
| return nil | ||
| } | ||
|
|
||
| func (s *fakeProcessorStore) GetAuthUser(ctx context.Context, userID uuid.UUID) (*AuthUser, error) { | ||
| return s.getAuthUserFn(ctx, userID) | ||
| } | ||
|
|
||
| func (s *fakeProcessorStore) UpsertPublicUser(_ context.Context, _ uuid.UUID, _ string) error { | ||
| return nil | ||
| } | ||
|
|
||
| func (s *fakeProcessorStore) DeletePublicUser(_ context.Context, _ uuid.UUID) error { | ||
| return nil | ||
| } | ||
|
|
||
| func TestProcessorProcessRetriesRecoveredPanic(t *testing.T) { | ||
|
Check failure on line 67 in packages/dashboard-api/internal/supabaseauthusersync/processor_test.go
|
||
| store := &fakeProcessorStore{ | ||
| getAuthUserFn: func(context.Context, uuid.UUID) (*AuthUser, error) { | ||
| panic("boom") | ||
| }, | ||
| } | ||
| processor := NewProcessor(store, 3, logger.NewNopLogger()) | ||
| item := QueueItem{ | ||
| ID: 1, | ||
| UserID: uuid.New(), | ||
| AttemptCount: 1, | ||
| } | ||
|
|
||
| require.NotPanics(t, func() { | ||
| processor.Process(context.Background(), item) | ||
| }) | ||
| require.Empty(t, store.ackCalls) | ||
| require.Len(t, store.retryCalls, 1) | ||
| require.Contains(t, store.retryCalls[0].lastError, "panic while processing queue item") | ||
| require.Empty(t, store.deadLetterCalls) | ||
| } | ||
|
|
||
| func TestProcessorProcessDeadLettersRecoveredPanicAtMaxAttempts(t *testing.T) { | ||
| store := &fakeProcessorStore{ | ||
| getAuthUserFn: func(context.Context, uuid.UUID) (*AuthUser, error) { | ||
| panic("boom") | ||
| }, | ||
| } | ||
| processor := NewProcessor(store, 3, logger.NewNopLogger()) | ||
| item := QueueItem{ | ||
| ID: 1, | ||
| UserID: uuid.New(), | ||
| AttemptCount: 3, | ||
| } | ||
|
|
||
| require.NotPanics(t, func() { | ||
| processor.Process(context.Background(), item) | ||
| }) | ||
| require.Empty(t, store.ackCalls) | ||
| require.Empty(t, store.retryCalls) | ||
| require.Len(t, store.deadLetterCalls, 1) | ||
| require.Contains(t, store.deadLetterCalls[0].lastError, "panic while processing queue item") | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,69 @@ | ||
| package supabaseauthusersync | ||
|
|
||
| import ( | ||
| "context" | ||
| "time" | ||
|
|
||
| "go.uber.org/zap" | ||
|
|
||
| "github.com/e2b-dev/infra/packages/shared/pkg/logger" | ||
| ) | ||
|
|
||
| type Runner struct { | ||
| cfg Config | ||
| store *Store | ||
| processor *Processor | ||
| lockOwner string | ||
| l logger.Logger | ||
| } | ||
|
|
||
| func NewRunner(cfg Config, store *Store, lockOwner string, l logger.Logger) *Runner { | ||
| return &Runner{ | ||
| cfg: cfg, | ||
| store: store, | ||
| processor: NewProcessor(store, cfg.MaxAttempts, l), | ||
| lockOwner: lockOwner, | ||
| l: l, | ||
| } | ||
| } | ||
|
|
||
| func (r *Runner) Run(ctx context.Context) error { | ||
| r.l.Info(ctx, "starting supabase auth user sync worker", | ||
| zap.String("lock_owner", r.lockOwner), | ||
| zap.Duration("poll_interval", r.cfg.PollInterval), | ||
| zap.Int32("batch_size", r.cfg.BatchSize), | ||
| ) | ||
|
|
||
| ticker := time.NewTicker(r.cfg.PollInterval) | ||
| defer ticker.Stop() | ||
|
|
||
| for { | ||
| select { | ||
| case <-ctx.Done(): | ||
| r.l.Info(ctx, "stopping supabase auth user sync worker") | ||
|
|
||
| return ctx.Err() | ||
| case <-ticker.C: | ||
| r.poll(ctx) | ||
| } | ||
| } | ||
| } | ||
|
|
||
| func (r *Runner) poll(ctx context.Context) { | ||
| items, err := r.store.ClaimBatch(ctx, r.lockOwner, r.cfg.LockTimeout, r.cfg.BatchSize) | ||
| if err != nil { | ||
| r.l.Error(ctx, "failed to claim queue batch", zap.Error(err)) | ||
|
|
||
| return | ||
| } | ||
|
|
||
| if len(items) == 0 { | ||
| return | ||
| } | ||
|
|
||
| r.l.Debug(ctx, "claimed queue batch", zap.Int("count", len(items))) | ||
|
|
||
| for _, item := range items { | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Items in a batch are processed sequentially. Each item does 2–3 synchronous DB round-trips ( |
||
| r.processor.Process(ctx, item) | ||
| } | ||
| } | ||
Uh oh!
There was an error while loading. Please reload this page.