Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions cmd/bricksllm/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ import (
"github.com/redis/go-redis/v9"
)

const numEventConsumers = 16

func main() {
modePtr := flag.String("m", "dev", "select the mode that bricksllm runs in")
privacyPtr := flag.String("p", "strict", "select the privacy mode that bricksllm runs in")
Expand Down Expand Up @@ -361,12 +363,12 @@ func main() {
c := cache.NewCache(apiCache)

messageBus := message.NewMessageBus()
eventMessageChan := make(chan message.Message)
eventMessageChan := make(chan message.Message, 1000)
messageBus.Subscribe("event", eventMessageChan)

handler := message.NewHandler(rec, log, ace, ce, vllme, aoe, v, uv, m, um, rlm, accessCache, userAccessCache)

eventConsumer := message.NewConsumer(eventMessageChan, log, 4, handler.HandleEventWithRequestAndResponse)
eventConsumer := message.NewConsumer(eventMessageChan, log, numEventConsumers, handler.HandleEventWithRequestAndResponse)
eventConsumer.StartEventMessageConsumers()
Comment on lines +366 to 372

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Inspect consumer stop/drain behavior and channel lifecycle.
ast-grep --lang go --pattern $'func (c *Consumer) Stop() { $$$ }'
rg -n -C4 'func \(c \*Consumer\) Stop\(|StartEventMessageConsumers\(|close\(c\.done\)|c\.done <-|WaitGroup|close\(.*event.*chan|close\(.*message.*chan'
rg -n -C3 'Subscribe\("event"|Publish\("event"'

Repository: codio/BricksLLM

Length of output: 50373


🏁 Script executed:

# Get full Consumer implementation
wc -l internal/message/consumer.go
head -80 internal/message/consumer.go

# Check shutdown sequence in main.go
grep -n "eventConsumer.Stop\|Stop()\|close\|defer" cmd/bricksllm/main.go | head -30

# Check for signal handling and graceful shutdown
rg -n "sigint|sigterm|signal\|interrupt|shutdown|graceful" cmd/bricksllm/main.go -i

Repository: codio/BricksLLM

Length of output: 2058


🏁 Script executed:

# Get context around shutdown in main.go
sed -n '385,415p' cmd/bricksllm/main.go

Repository: codio/BricksLLM

Length of output: 721


Verify graceful drain semantics for buffered event queue on shutdown.

Line 366 increases in-memory backlog to 1000 and Line 371 increases concurrency to 16 workers. The Consumer.Stop() method sends a signal on an unbuffered done channel, but only one worker receives it; the remaining 15 workers remain blocked in their select loops and never exit. Additionally, Stop() does not wait for workers to finish—it returns immediately—allowing the subsequent server shutdown (lines 401–408) to proceed without guaranteeing the 1000-buffered messageChan has been drained. Implement a sync.WaitGroup in the Consumer to coordinate all worker goroutines and ensure Stop() blocks until all workers acknowledge the shutdown signal.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@cmd/bricksllm/main.go` around lines 366 - 372, The Consumer needs to ensure
all worker goroutines cleanly exit on shutdown using a sync.WaitGroup. Add a
WaitGroup field to the Consumer struct, then in the StartEventMessageConsumers
method, add each worker goroutine to the WaitGroup before spawning it (Add
call), and have each worker call Done() when it exits after receiving the stop
signal. Finally, modify the Stop() method to call wg.Wait() at the end so it
blocks until all workers have completed, ensuring the buffered eventMessageChan
is fully drained before returning and allowing the server shutdown to proceed.


detector, err := amazon.NewClient(cfg.AmazonRequestTimeout, cfg.AmazonConnectionTimeout, log, cfg.AmazonRegion)
Expand Down
23 changes: 20 additions & 3 deletions internal/message/consumer.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package message

import (
"sync"

"github.com/bricks-cloud/bricksllm/internal/event"
"github.com/bricks-cloud/bricksllm/internal/key"
"go.uber.org/zap"
Expand All @@ -12,6 +14,7 @@ type Consumer struct {
log *zap.Logger
numOfEventConsumers int
handle func(Message) error
wg sync.WaitGroup
}

type recorder interface {
Expand All @@ -33,12 +36,25 @@ func NewConsumer(mc <-chan Message, log *zap.Logger, num int, handle func(Messag

func (c *Consumer) StartEventMessageConsumers() {
for i := 0; i < c.numOfEventConsumers; i++ {
c.wg.Add(1)
go func() {
defer c.wg.Done()

for {
select {
case <-c.done:
c.log.Info("event message consumer stoped...")
return
for {
select {
case m := <-c.messageChan:
err := c.handle(m)
if err != nil {
continue
}
default:
c.log.Info("event message consumer stoped...")
return
}
}

case m := <-c.messageChan:
err := c.handle(m)
Expand All @@ -56,5 +72,6 @@ func (c *Consumer) StartEventMessageConsumers() {
func (c *Consumer) Stop() {
c.log.Info("shutting down consumer...")

c.done <- true
close(c.done)
c.wg.Wait()
}
6 changes: 6 additions & 0 deletions internal/provider/openai/cost.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,10 @@ func init() {
for model := range OpenAiPerThousandTokenCost["images-tokens-input"] {
imageModelsWithTokensCost[model] = struct{}{}
}

for _, tool := range AllowedTools {
AllowedToolsSet[tool] = struct{}{}
}
}

var OpenAiPerThousandCallsToolCost = map[string]float64{
Expand Down Expand Up @@ -401,6 +405,8 @@ var AllowedTools = []string{
"skills",
}

var AllowedToolsSet = map[string]struct{}{}

type tokenCounter interface {
Count(model string, input string) (int, error)
}
Expand Down
2 changes: 1 addition & 1 deletion internal/server/web/proxy/anthropic.go
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,7 @@ func getMessagesHandler(prod, private bool, client http.Client, e anthropicEstim
model := c.GetString("model")

if !isStreaming && res.StatusCode == http.StatusOK {
dur := time.Now().Sub(start)
dur := time.Since(start)
telemetry.Timing("bricksllm.proxy.get_messages_handler.latency", dur, nil, 1)

bytes, err := io.ReadAll(res.Body)
Expand Down
Loading
Loading