From a73516f9b488aa174b6ea4208c85c67e0cc0b292 Mon Sep 17 00:00:00 2001 From: Ettore Di Giacinto Date: Mon, 29 Jun 2026 22:51:17 +0000 Subject: [PATCH 1/2] fix(distributed): don't let a dead worker pin the model-load advisory lock In distributed mode a chat request could fail with: failed to route model with internal loader: routing model ...: loading model ...: advisorylock: acquiring lock : ERROR: canceling statement due to lock timeout (SQLSTATE 55P03) Root cause is two independent defects in the cross-replica model-load path: 1. SmartRouter.Route holds a per-model PostgreSQL advisory lock for the whole cold-load sequence, which includes installBackendOnNode -> InstallBackend, a NATS request-reply with a 15m deadline (DefaultBackendInstallTimeout) that ignored ctx. When the chosen worker died mid-install, the holder sat on the lock for up to 15m. The detached loadCtx (WithoutCancel) had no deadline, so nothing capped the hold. 2. The acquiring statement, pg_advisory_lock(), is subject to any deployment global lock_timeout. A common operator setting (e.g. 10s) aborts the wait with SQLSTATE 55P03, so every other replica's request for that model hard -errored instead of waiting for the in-progress load and reusing it. For the ~15m window the model was effectively unroutable. Fixes: - advisorylock.WithLockCtx (postgres): SET lock_timeout = 0 on its dedicated connection (RESET before it returns to the pool) so the Go context, not a deployment-wide GUC, governs how long we wait. Waiters now block and then re-check, reusing the model another replica just loaded. - SmartRouter: bound the detached loadCtx with a single ModelLoadCeiling so the lock is always released in bounded time even if a sub-step wedges. Default is the configured backend.install deadline + 10m (staging + LoadModel margin), so a legitimately slow load is never cut. - installBackendOnNode: use singleflight.DoChan + select on ctx.Done() so the install wait honors cancellation; the ceiling can then actually free a caller pinned behind a dead worker. The shared install still coalesces via singleflight. Reproduced both defects as failing tests first (a real 55P03 against a testcontainer with a short lock_timeout; a wedged install that blocks Route) and confirmed green. Signed-off-by: Ettore Di Giacinto Assisted-by: Claude:claude-opus-4-8 [Claude Code] --- core/application/distributed.go | 6 ++ core/services/advisorylock/advisorylock.go | 14 +++++ .../advisorylock/advisorylock_test.go | 47 +++++++++++++++ core/services/nodes/router.go | 60 ++++++++++++++++--- core/services/nodes/router_test.go | 38 ++++++++++++ 5 files changed, 156 insertions(+), 9 deletions(-) diff --git a/core/application/distributed.go b/core/application/distributed.go index a1efe03045c0..40cc9fff3dfa 100644 --- a/core/application/distributed.go +++ b/core/application/distributed.go @@ -356,6 +356,12 @@ func initDistributed(cfg *config.ApplicationConfig, authDB *gorm.DB, configLoade PrefixConfig: prefixCfg, Pressure: pressure, SharedModels: cfg.Distributed.SharedModels, + // Cap how long a cold load may hold the per-model advisory lock: the + // configured backend.install deadline plus a margin for file staging and + // the remote LoadModel. Derived from the install timeout so raising it + // (for slow links pulling multi-GB images) widens the ceiling too, + // instead of letting the static default cut a legitimately slow load. + ModelLoadCeiling: cfg.Distributed.BackendInstallTimeoutOrDefault() + 10*time.Minute, }) // Wire staging-progress broadcasting so file-staging shows up on every diff --git a/core/services/advisorylock/advisorylock.go b/core/services/advisorylock/advisorylock.go index f51a6357e6ff..1ef0acffcb01 100644 --- a/core/services/advisorylock/advisorylock.go +++ b/core/services/advisorylock/advisorylock.go @@ -130,6 +130,20 @@ func WithLockCtx(ctx context.Context, db *gorm.DB, key int64, fn func() error) e } defer conn.Close() + // Neutralize any deployment-wide lock_timeout on this dedicated connection. + // Operators commonly set a short global lock_timeout (on the role or + // database) to bound ordinary row-lock waits. Applied to the blocking + // pg_advisory_lock below, it aborts the wait with SQLSTATE 55P03 and turns + // LocalAI's intentional cross-replica "wait your turn, then re-check" + // coordination into a hard error for the caller (e.g. a chat request that + // just wanted to reuse a model another replica is loading). Let the Go + // context be the single source of truth for how long we wait instead. + if _, err := conn.ExecContext(ctx, "SET lock_timeout = 0"); err != nil { + return fmt.Errorf("advisorylock: disabling lock_timeout: %w", err) + } + // Restore the session default before this pooled connection is reused. + defer func() { _, _ = conn.ExecContext(context.Background(), "RESET lock_timeout") }() + if _, err := conn.ExecContext(ctx, "SELECT pg_advisory_lock($1)", key); err != nil { return fmt.Errorf("advisorylock: acquiring lock %d: %w", key, err) } diff --git a/core/services/advisorylock/advisorylock_test.go b/core/services/advisorylock/advisorylock_test.go index 709f77c292f1..e37034916396 100644 --- a/core/services/advisorylock/advisorylock_test.go +++ b/core/services/advisorylock/advisorylock_test.go @@ -158,6 +158,53 @@ var _ = Describe("AdvisoryLock", func() { Expect(err).To(HaveOccurred()) }) + It("waits out a short server-side lock_timeout instead of failing with 55P03", func() { + const lockKey int64 = 703 + + // Reproduce the production deployment that triggered this: a short + // global lock_timeout set on the database. Without the fix, a waiter + // blocked on pg_advisory_lock() is aborted by the server after this + // window and surfaces SQLSTATE 55P03 ("canceling statement due to + // lock timeout") to the caller instead of waiting for its turn. + Expect(db.Exec("ALTER DATABASE testdb SET lock_timeout = '300ms'").Error).ToNot(HaveOccurred()) + sqlDB, err := db.DB() + Expect(err).ToNot(HaveOccurred()) + // Drop pooled connections so subsequent ones reconnect and inherit + // the new database-level lock_timeout default. + sqlDB.SetMaxIdleConns(0) + + holding := make(chan struct{}) + released := make(chan struct{}) + go func() { + defer GinkgoRecover() + herr := WithLockCtx(context.Background(), db, lockKey, func() error { + close(holding) + // Hold well past the 300ms server lock_timeout. + time.Sleep(1 * time.Second) + return nil + }) + Expect(herr).ToNot(HaveOccurred()) + close(released) + }() + + <-holding // ensure the holder owns the lock before we contend + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + executed := false + start := time.Now() + werr := WithLockCtx(ctx, db, lockKey, func() error { + executed = true + return nil + }) + Expect(werr).ToNot(HaveOccurred(), + "waiter should wait out the in-progress hold, not fail with lock_timeout (55P03)") + Expect(executed).To(BeTrue()) + Expect(time.Since(start)).To(BeNumerically(">=", 400*time.Millisecond), + "waiter should have actually waited for the holder to release") + <-released + }) + It("serializes concurrent WithLockCtx on same key", func() { const lockKey int64 = 702 diff --git a/core/services/nodes/router.go b/core/services/nodes/router.go index 664672e395aa..670c9ca21bb4 100644 --- a/core/services/nodes/router.go +++ b/core/services/nodes/router.go @@ -68,6 +68,13 @@ type SmartRouterOptions struct { // the absolute model paths untouched so the worker loads them directly from // the shared volume (#10556). See config.DistributedConfig.SharedModels. SharedModels bool + // ModelLoadCeiling is the hard upper bound on how long a single cold-load + // attempt (node selection -> backend install -> file staging -> LoadModel) + // may run while holding the per-model advisory lock. It backstops every + // sub-step's own timeout so a wedged worker can never pin the lock - and + // every other replica's request for that model - indefinitely. Zero selects + // defaultModelLoadCeiling. + ModelLoadCeiling time.Duration } // SmartRouter routes inference requests to the best available backend node. @@ -101,8 +108,18 @@ type SmartRouter struct { // sharedModels skips file staging when all nodes mount the same models // directory at the same path (see SmartRouterOptions.SharedModels). sharedModels bool + // modelLoadCeiling bounds how long a cold load may hold the per-model + // advisory lock (see SmartRouterOptions.ModelLoadCeiling). + modelLoadCeiling time.Duration } +// defaultModelLoadCeiling is the fallback hold ceiling for a cold model load. +// It must comfortably exceed the slowest legitimate load - a multi-GB backend +// install (DefaultBackendInstallTimeout, 15m) plus staging and the remote +// LoadModel (5m) - so it never cuts a real load short; it only ever fires when +// a step is genuinely wedged (e.g. a worker that died mid-install). +const defaultModelLoadCeiling = 25 * time.Minute + // probeCacheTTL is how long a successful gRPC HealthCheck on a backend is // trusted before the next request re-probes. Matches healthCheckTTL in // pkg/model/model.go so the single-process and distributed paths share a @@ -117,6 +134,10 @@ func NewSmartRouter(registry ModelRouter, opts SmartRouterOptions) *SmartRouter if factory == nil { factory = &tokenClientFactory{token: opts.AuthToken} } + ceiling := opts.ModelLoadCeiling + if ceiling <= 0 { + ceiling = defaultModelLoadCeiling + } return &SmartRouter{ registry: registry, unloader: opts.Unloader, @@ -131,6 +152,7 @@ func NewSmartRouter(registry ModelRouter, opts SmartRouterOptions) *SmartRouter prefixConfig: opts.PrefixConfig, pressure: opts.Pressure, sharedModels: opts.SharedModels, + modelLoadCeiling: ceiling, } } @@ -383,11 +405,19 @@ func (r *SmartRouter) Route(ctx context.Context, modelID, modelName, backendType // the request context. If staging were bound to it, the multi-GB upload // aborts with "context canceled" mid-transfer and large models can never // finish staging (the model-load outage). WithoutCancel keeps the request's - // values (prefix chain, etc.) but drops its cancellation/deadline. Each - // long step still has its own bound (the file stager's resume budget, - // LoadModel's 5m timeout), and the per-model advisory lock below de-dupes - // concurrent loaders across replicas. - loadCtx := context.WithoutCancel(ctx) + // values (prefix chain, etc.) but drops its cancellation/deadline. + // + // Detaching from the caller is necessary, but it must not be unbounded: the + // load runs while holding the per-model advisory lock, and a worker that + // dies mid-install (its backend.install never replies) would otherwise pin + // that lock (and every other replica's request for the same model) until + // the NATS install deadline alone expires. Re-impose a single hard ceiling + // over the whole sequence so the lock is always released in bounded time, + // even if a sub-step wedges. Each long step still has its own (tighter) + // bound; this only backstops them. The per-model advisory lock below + // de-dupes concurrent loaders across replicas. + loadCtx, cancelLoad := context.WithTimeout(context.WithoutCancel(ctx), r.modelLoadCeiling) + defer cancelLoad() loadModel := func(ctx context.Context) (*RouteResult, error) { // Re-check after acquiring lock — another request may have loaded it node, nm, err := r.registry.FindAndLockNodeWithModel(ctx, trackingKey, candidateNodeIDs, pref) @@ -916,7 +946,14 @@ func (r *SmartRouter) installBackendOnNode(ctx context.Context, node *BackendNod } key := fmt.Sprintf("%s|%s|%s|%d", node.ID, backendType, modelID, replicaIndex) - v, err, _ := r.installFlight.Do(key, func() (any, error) { + // DoChan rather than Do so this wait honors ctx cancellation. InstallBackend + // blocks for its full NATS deadline (15m by default) when a worker accepts + // the request but never replies (e.g. it died mid-install). Without ctx + // awareness the caller (holding the per-model advisory lock) would sit there + // the whole time; here a cancelled ctx (typically the model-load ceiling) + // frees the caller promptly. The shared install keeps running in the + // background and still coalesces other callers via singleflight. + resCh := r.installFlight.DoChan(key, func() (any, error) { reply, err := r.unloader.InstallBackend(node.ID, backendType, modelID, r.galleriesJSON, "", "", "", replicaIndex, "", nil) if err != nil { return "", err @@ -931,10 +968,15 @@ func (r *SmartRouter) installBackendOnNode(ctx context.Context, node *BackendNod } return addr, nil }) - if err != nil { - return "", err + select { + case <-ctx.Done(): + return "", ctx.Err() + case res := <-resCh: + if res.Err != nil { + return "", res.Err + } + return res.Val.(string), nil } - return v.(string), nil } func (r *SmartRouter) buildClientForAddr(node *BackendNode, addr string, parallel bool) grpc.Backend { diff --git a/core/services/nodes/router_test.go b/core/services/nodes/router_test.go index e97dae1222ff..81696ac4d28a 100644 --- a/core/services/nodes/router_test.go +++ b/core/services/nodes/router_test.go @@ -493,6 +493,44 @@ var _ = Describe("SmartRouter", func() { Expect(result.Node.ID).To(Equal("n3")) }) }) + + Context("worker wedges mid-install (dead node holding the lock)", func() { + It("aborts the load at the ModelLoadCeiling instead of blocking forever", func() { + // Simulate the production incident: the chosen worker accepts the + // backend.install but never replies (it died), so InstallBackend + // would otherwise block for its full NATS deadline (15m by + // default) while pinning the per-model advisory lock. Route must + // give up at the ceiling so the lock is released promptly. + reg.findAndLockErr = errors.New("not found") + reg.findIdleNode = &BackendNode{ID: "n4", Name: "dead-node", Address: "10.0.0.4:50051"} + + block := make(chan struct{}) + defer close(block) // let the background install goroutine drain at test end + unloader.installHook = func() { <-block } + + router := NewSmartRouter(reg, SmartRouterOptions{ + Unloader: unloader, + ClientFactory: factory, + ModelLoadCeiling: 200 * time.Millisecond, + }) + + done := make(chan error, 1) + start := time.Now() + go func() { + defer GinkgoRecover() + _, err := router.Route(context.Background(), "wedged-model", + "models/wedged.gguf", "llama-cpp", + &pb.ModelOptions{Model: "models/wedged.gguf"}, false) + done <- err + }() + + var routeErr error + Eventually(done, 5*time.Second).Should(Receive(&routeErr), + "Route must not block on a wedged install past the ceiling") + Expect(routeErr).To(HaveOccurred()) + Expect(time.Since(start)).To(BeNumerically("<", 5*time.Second)) + }) + }) }) Describe("scheduleNewModel (mock-based, via Route)", func() { From 2972165e53a65b899c7a29e3bbd8d12bdfa584e9 Mon Sep 17 00:00:00 2001 From: Ettore Di Giacinto Date: Tue, 30 Jun 2026 07:23:12 +0000 Subject: [PATCH 2/2] fix(distributed): bound advisory-lock wait instead of disabling lock_timeout Setting lock_timeout = 0 to override a deployment's short global lock_timeout meant "wait forever" server-side. Safe for SmartRouter.Route (its loadCtx now carries the model-load ceiling) but unsafe for the schema-migration callers that pass context.Background(): a holder whose session never releases would hang them indefinitely. Derive the server-side lock_timeout from the caller's context instead: its remaining budget plus a margin (so the Go context's cancellation still wins with a clean error and the server bound is only a backstop), or a finite 30m backstop when the context has no deadline. Never zero - "wait forever" is no longer possible, while a deployment's hostile short lock_timeout is still overridden so legitimate cross-replica waits don't fail with 55P03. Added a spec proving a deadline-less waiter gives up at the (shrunk) backstop rather than hanging. Signed-off-by: Ettore Di Giacinto Assisted-by: Claude:claude-opus-4-8 [Claude Code] --- core/services/advisorylock/advisorylock.go | 46 +++++++++++++++++-- .../advisorylock/advisorylock_test.go | 34 ++++++++++++++ 2 files changed, 75 insertions(+), 5 deletions(-) diff --git a/core/services/advisorylock/advisorylock.go b/core/services/advisorylock/advisorylock.go index 1ef0acffcb01..a37403c31940 100644 --- a/core/services/advisorylock/advisorylock.go +++ b/core/services/advisorylock/advisorylock.go @@ -6,10 +6,39 @@ import ( "hash/fnv" "strings" "sync" + "time" "gorm.io/gorm" ) +// advisoryLockWaitBackstop bounds, server-side, how long we will wait to +// acquire a blocking advisory lock when the caller's context carries no +// deadline (e.g. a startup schema migration using context.Background()). It +// only exists so such a caller cannot hang forever behind a holder whose +// session never releases the lock; it is far longer than any legitimate +// guarded section. A var (not const) so tests can shrink it. +var advisoryLockWaitBackstop = 30 * time.Minute + +// advisoryLockTimeoutMargin is added to a context's remaining budget when +// deriving the server-side lock_timeout, so the Go context's own (cleaner) +// cancellation fires first and the server bound is only ever a backstop. +const advisoryLockTimeoutMargin = 30 * time.Second + +// advisoryLockWaitBudget returns the server-side lock_timeout to use for a +// blocking acquire: the caller context's remaining time plus a margin (so the +// Go context still governs), or the backstop when the context has no deadline. +// Never returns zero - "wait forever" must not be possible. +func advisoryLockWaitBudget(ctx context.Context) time.Duration { + if dl, ok := ctx.Deadline(); ok { + budget := time.Until(dl) + advisoryLockTimeoutMargin + if budget < time.Second { + budget = time.Second + } + return budget + } + return advisoryLockWaitBackstop +} + // localLocks holds one buffered channel (capacity 1) per lock key, used as an // in-process mutex for non-PostgreSQL dialects (SQLite). A SQLite auth DB is // effectively single-process, so serializing guarded sections within this @@ -130,16 +159,23 @@ func WithLockCtx(ctx context.Context, db *gorm.DB, key int64, fn func() error) e } defer conn.Close() - // Neutralize any deployment-wide lock_timeout on this dedicated connection. + // Override any deployment-wide lock_timeout on this dedicated connection. // Operators commonly set a short global lock_timeout (on the role or // database) to bound ordinary row-lock waits. Applied to the blocking // pg_advisory_lock below, it aborts the wait with SQLSTATE 55P03 and turns // LocalAI's intentional cross-replica "wait your turn, then re-check" // coordination into a hard error for the caller (e.g. a chat request that - // just wanted to reuse a model another replica is loading). Let the Go - // context be the single source of truth for how long we wait instead. - if _, err := conn.ExecContext(ctx, "SET lock_timeout = 0"); err != nil { - return fmt.Errorf("advisorylock: disabling lock_timeout: %w", err) + // just wanted to reuse a model another replica is loading). + // + // We do NOT disable it outright (lock_timeout = 0 would wait forever, which + // is unsafe for the schema-migration callers that pass context.Background()). + // Instead we set a bound derived from the caller's context: its remaining + // budget plus a margin so the Go context's cancellation wins with a clean + // error, or a finite backstop when the context has no deadline. + waitBudget := advisoryLockWaitBudget(ctx) + if _, err := conn.ExecContext(ctx, + fmt.Sprintf("SET lock_timeout = %d", waitBudget.Milliseconds())); err != nil { + return fmt.Errorf("advisorylock: setting lock_timeout: %w", err) } // Restore the session default before this pooled connection is reused. defer func() { _, _ = conn.ExecContext(context.Background(), "RESET lock_timeout") }() diff --git a/core/services/advisorylock/advisorylock_test.go b/core/services/advisorylock/advisorylock_test.go index e37034916396..a77df56c446a 100644 --- a/core/services/advisorylock/advisorylock_test.go +++ b/core/services/advisorylock/advisorylock_test.go @@ -205,6 +205,40 @@ var _ = Describe("AdvisoryLock", func() { <-released }) + It("bounds a deadline-less waiter with the backstop instead of waiting forever", func() { + const lockKey int64 = 704 + + // A caller with no context deadline (e.g. startup schema migration + // passing context.Background()) must not hang forever if the holder + // never releases. Shrink the backstop so the test is fast. + origBackstop := advisoryLockWaitBackstop + advisoryLockWaitBackstop = 500 * time.Millisecond + DeferCleanup(func() { advisoryLockWaitBackstop = origBackstop }) + + holding := make(chan struct{}) + release := make(chan struct{}) + go func() { + defer GinkgoRecover() + _ = WithLockCtx(context.Background(), db, lockKey, func() error { + close(holding) + <-release // hold until the test releases us + return nil + }) + }() + defer close(release) + + <-holding + + start := time.Now() + err := WithLockCtx(context.Background(), db, lockKey, func() error { + Fail("waiter should not have acquired the still-held lock") + return nil + }) + Expect(err).To(HaveOccurred(), "deadline-less waiter should give up at the backstop, not hang") + Expect(time.Since(start)).To(BeNumerically("<", 5*time.Second), + "backstop must cap the wait well under the test timeout") + }) + It("serializes concurrent WithLockCtx on same key", func() { const lockKey int64 = 702