Skip to content
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
* [FEATURE] Distributor: Add a per-tenant flag `-distributor.enable-type-and-unit-labels` that enables adding `__unit__` and `__type__` labels for remote write v2 and OTLP requests. This is a breaking change; the `-distributor.otlp.enable-type-and-unit-labels` flag is now deprecated, operates as a no-op, and has been consolidated into this new flag. #7077
* [FEATURE] Querier: Add experimental projection pushdown support in Parquet Queryable. #7152
* [FEATURE] Ingester: Add experimental active series queried metric. #7173
* [FEATURE] StoreGateway: Add optional limit `blocks-storage.bucket-store.max-concurrent-bytes` on series bytes per store gateway to protect against OOM kills. When the limit is exceeded, requests fail with an error that is retryable at the querier level. #7271
* [FEATURE] Update the Prometheus Alertmanager to v0.31.1 and add a new Mattermost integration. #7267
* [ENHANCEMENT] Distributor: Add `cortex_distributor_push_requests_total` metric to track the number of push requests by type. #7239
* [ENHANCEMENT] Querier: Add `-querier.store-gateway-series-batch-size` flag to configure the maximum number of series to be batched in a single gRPC response message from Store Gateways. #7203
Expand Down
6 changes: 6 additions & 0 deletions docs/blocks-storage/querier.md
Original file line number Diff line number Diff line change
Expand Up @@ -637,6 +637,12 @@ blocks_storage:
# CLI flag: -blocks-storage.bucket-store.max-inflight-requests
[max_inflight_requests: <int> | default = 0]

# Max number of bytes being processed concurrently across all queries. When
# the limit is reached, new requests are rejected with HTTP 503. 0 to
# disable.
# CLI flag: -blocks-storage.bucket-store.max-concurrent-bytes
[max_concurrent_bytes: <int> | default = 0]

# Maximum number of concurrent tenants syncing blocks.
# CLI flag: -blocks-storage.bucket-store.tenant-sync-concurrency
[tenant_sync_concurrency: <int> | default = 10]
Expand Down
6 changes: 6 additions & 0 deletions docs/blocks-storage/store-gateway.md
Original file line number Diff line number Diff line change
Expand Up @@ -703,6 +703,12 @@ blocks_storage:
# CLI flag: -blocks-storage.bucket-store.max-inflight-requests
[max_inflight_requests: <int> | default = 0]

# Max number of bytes being processed concurrently across all queries. When
# the limit is reached, new requests are rejected with HTTP 503. 0 to
# disable.
# CLI flag: -blocks-storage.bucket-store.max-concurrent-bytes
[max_concurrent_bytes: <int> | default = 0]

# Maximum number of concurrent tenants syncing blocks.
# CLI flag: -blocks-storage.bucket-store.tenant-sync-concurrency
[tenant_sync_concurrency: <int> | default = 10]
Expand Down
5 changes: 5 additions & 0 deletions docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -1325,6 +1325,11 @@ bucket_store:
# CLI flag: -blocks-storage.bucket-store.max-inflight-requests
[max_inflight_requests: <int> | default = 0]

# Max number of bytes being processed concurrently across all queries. When
# the limit is reached, new requests are rejected with HTTP 503. 0 to disable.
# CLI flag: -blocks-storage.bucket-store.max-concurrent-bytes
[max_concurrent_bytes: <int> | default = 0]

# Maximum number of concurrent tenants syncing blocks.
# CLI flag: -blocks-storage.bucket-store.tenant-sync-concurrency
[tenant_sync_concurrency: <int> | default = 10]
Expand Down
13 changes: 10 additions & 3 deletions pkg/frontend/transport/retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/weaveworks/common/httpgrpc"

"github.com/cortexproject/cortex/pkg/querier/tripperware"
"github.com/cortexproject/cortex/pkg/storegateway"
)

type Retry struct {
Expand Down Expand Up @@ -77,9 +78,15 @@ func (r *Retry) Do(ctx context.Context, f func() (*httpgrpc.HTTPResponse, error)
}

// isBodyRetryable reports whether a response body describes an error that is
// safe to retry at the query frontend. Bodies matching store-gateway
// resource-exhaustion errors (chunk pool exhausted, concurrent bytes limit
// exceeded) are excluded: retrying those from the frontend could make the
// overload worse, so we rely on querier-level retries instead.
func isBodyRetryable(body string) bool {
	nonRetryableMessages := []string{
		pool.ErrPoolExhausted.Error(),
		storegateway.ErrMaxConcurrentBytesLimitExceeded.Error(),
	}
	for _, msg := range nonRetryableMessages {
		if strings.Contains(body, msg) {
			return false
		}
	}
	return true
}

func yoloString(b []byte) string {
Expand Down
24 changes: 24 additions & 0 deletions pkg/frontend/transport/retry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"github.com/thanos-io/thanos/pkg/pool"
"github.com/weaveworks/common/httpgrpc"
"go.uber.org/atomic"

"github.com/cortexproject/cortex/pkg/storegateway"
)

func TestRetry(t *testing.T) {
Expand Down Expand Up @@ -52,3 +54,25 @@ func TestNoRetryOnChunkPoolExhaustion(t *testing.T) {
require.NoError(t, err)
require.Equal(t, int32(500), res.Code)
}

// TestNoRetryOnMaxConcurrentBytesLimitExceeded verifies that a 500 response
// whose body contains the store-gateway concurrent-bytes-limit error is NOT
// retried: the first failing response must be returned as-is, even though a
// later attempt would have succeeded.
func TestNoRetryOnMaxConcurrentBytesLimitExceeded(t *testing.T) {
	remaining := atomic.NewInt64(3)
	retrier := NewRetry(3, nil)

	res, err := retrier.Do(context.Background(), func() (*httpgrpc.HTTPResponse, error) {
		if remaining.Dec() > 1 {
			// First attempt: fail with the non-retryable limit error.
			return &httpgrpc.HTTPResponse{
				Code: 500,
				Body: []byte(storegateway.ErrMaxConcurrentBytesLimitExceeded.Error()),
			}, nil
		}
		// Subsequent attempts would succeed, but must never be reached.
		return &httpgrpc.HTTPResponse{Code: 200}, nil
	})

	require.NoError(t, err)
	require.Equal(t, int32(500), res.Code)
}
5 changes: 3 additions & 2 deletions pkg/querier/blocks_store_queryable.go
Original file line number Diff line number Diff line change
Expand Up @@ -1239,8 +1239,9 @@ func isRetryableError(err error) bool {
case codes.Canceled:
return strings.Contains(err.Error(), "grpc: the client connection is closing")
case codes.Unknown:
// Catch chunks pool exhaustion error only.
return strings.Contains(err.Error(), pool.ErrPoolExhausted.Error())
// Catch chunks pool exhaustion error or concurrent bytes limit exceeded error.
return strings.Contains(err.Error(), pool.ErrPoolExhausted.Error()) ||
strings.Contains(err.Error(), storegateway.ErrMaxConcurrentBytesLimitExceeded.Error())
default:
return false
}
Expand Down
30 changes: 30 additions & 0 deletions pkg/querier/blocks_store_queryable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1404,6 +1404,35 @@ func TestBlocksStoreQuerier_Select(t *testing.T) {
},
},
},
"multiple store-gateways has the block, but one of them fails to return due to max concurrent bytes limit exceeded": {
finderResult: bucketindex.Blocks{
&bucketindex.Block{ID: block1},
},
storeSetResponses: []any{
map[BlocksStoreClient][]ulid.ULID{
&storeGatewayClientMock{
remoteAddr: "1.1.1.1",
mockedSeriesErr: status.Error(codes.Unknown, storegateway.ErrMaxConcurrentBytesLimitExceeded.Error()),
}: {block1},
},
map[BlocksStoreClient][]ulid.ULID{
&storeGatewayClientMock{remoteAddr: "2.2.2.2", mockedSeriesResponses: []*storepb.SeriesResponse{
mockSeriesResponse(labels.FromStrings(metricNameLabel.Name, metricNameLabel.Value, series1Label.Name, series1Label.Value), []cortexpb.Sample{{Value: 2, TimestampMs: minT}}, nil, nil),
mockHintsResponse(block1),
}}: {block1},
},
},
limits: &blocksStoreLimitsMock{},
queryLimiter: noOpQueryLimiter,
expectedSeries: []seriesResult{
{
lbls: labels.New(metricNameLabel, series1Label),
values: []valueResult{
{t: minT, v: 2},
},
},
},
},
"all store-gateways return PermissionDenied": {
finderResult: bucketindex.Blocks{
&bucketindex.Block{ID: block1},
Expand Down Expand Up @@ -2603,6 +2632,7 @@ func TestBlocksStoreQuerier_isRetryableError(t *testing.T) {
require.True(t, isRetryableError(limiter.ErrResourceLimitReached))
require.True(t, isRetryableError(status.Error(codes.Canceled, "grpc: the client connection is closing")))
require.True(t, isRetryableError(errors.New("pool exhausted")))
require.True(t, isRetryableError(errors.New("max concurrent bytes limit exceeded")))

require.False(t, isRetryableError(status.Error(codes.ResourceExhausted, "some other error")))
require.False(t, isRetryableError(status.Error(codes.Canceled, "some other error")))
Expand Down
6 changes: 6 additions & 0 deletions pkg/storage/tsdb/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ var (
ErrInvalidTokenBucketBytesLimiterMode = errors.New("invalid token bucket bytes limiter mode")
ErrInvalidLazyExpandedPostingGroupMaxKeySeriesRatio = errors.New("lazy expanded posting group max key series ratio needs to be equal or greater than 0")
ErrInvalidBucketStoreType = errors.New("invalid bucket store type")
ErrInvalidMaxConcurrentBytes = errors.New("max concurrent bytes must be non-negative")
)

// BlocksStorageConfig holds the config information for the blocks storage.
Expand Down Expand Up @@ -281,6 +282,7 @@ type BucketStoreConfig struct {
SyncInterval time.Duration `yaml:"sync_interval"`
MaxConcurrent int `yaml:"max_concurrent"`
MaxInflightRequests int `yaml:"max_inflight_requests"`
MaxConcurrentBytes int64 `yaml:"max_concurrent_bytes"`
TenantSyncConcurrency int `yaml:"tenant_sync_concurrency"`
BlockSyncConcurrency int `yaml:"block_sync_concurrency"`
MetaSyncConcurrency int `yaml:"meta_sync_concurrency"`
Expand Down Expand Up @@ -365,6 +367,7 @@ func (cfg *BucketStoreConfig) RegisterFlags(f *flag.FlagSet) {
f.IntVar(&cfg.ChunkPoolMaxBucketSizeBytes, "blocks-storage.bucket-store.chunk-pool-max-bucket-size-bytes", ChunkPoolDefaultMaxBucketSize, "Size - in bytes - of the largest chunks pool bucket.")
f.IntVar(&cfg.MaxConcurrent, "blocks-storage.bucket-store.max-concurrent", 100, "Max number of concurrent queries to execute against the long-term storage. The limit is shared across all tenants.")
f.IntVar(&cfg.MaxInflightRequests, "blocks-storage.bucket-store.max-inflight-requests", 0, "Max number of inflight queries to execute against the long-term storage. The limit is shared across all tenants. 0 to disable.")
f.Int64Var(&cfg.MaxConcurrentBytes, "blocks-storage.bucket-store.max-concurrent-bytes", 0, "Max number of bytes being processed concurrently across all queries. When the limit is reached, new requests are rejected with HTTP 503. 0 to disable.")
f.IntVar(&cfg.TenantSyncConcurrency, "blocks-storage.bucket-store.tenant-sync-concurrency", 10, "Maximum number of concurrent tenants syncing blocks.")
f.IntVar(&cfg.BlockSyncConcurrency, "blocks-storage.bucket-store.block-sync-concurrency", 20, "Maximum number of concurrent blocks syncing per tenant.")
f.IntVar(&cfg.MetaSyncConcurrency, "blocks-storage.bucket-store.meta-sync-concurrency", 20, "Number of Go routines to use when syncing block meta files from object storage per tenant.")
Expand Down Expand Up @@ -429,6 +432,9 @@ func (cfg *BucketStoreConfig) Validate() error {
if cfg.LazyExpandedPostingGroupMaxKeySeriesRatio < 0 {
return ErrInvalidLazyExpandedPostingGroupMaxKeySeriesRatio
}
if cfg.MaxConcurrentBytes < 0 {
return ErrInvalidMaxConcurrentBytes
}
return nil
}

Expand Down
18 changes: 18 additions & 0 deletions pkg/storage/tsdb/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,24 @@ func TestConfig_Validate(t *testing.T) {
},
expectedErr: errUnSupportedWALCompressionType,
},
"should fail on negative max concurrent bytes": {
setup: func(cfg *BlocksStorageConfig) {
cfg.BucketStore.MaxConcurrentBytes = -1
},
expectedErr: ErrInvalidMaxConcurrentBytes,
},
"should pass on zero max concurrent bytes (disabled)": {
setup: func(cfg *BlocksStorageConfig) {
cfg.BucketStore.MaxConcurrentBytes = 0
},
expectedErr: nil,
},
"should pass on positive max concurrent bytes": {
setup: func(cfg *BlocksStorageConfig) {
cfg.BucketStore.MaxConcurrentBytes = 1024 * 1024 * 1024 // 1GB
},
expectedErr: nil,
},
}

for testName, testData := range tests {
Expand Down
46 changes: 31 additions & 15 deletions pkg/storegateway/bucket_stores.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,13 @@ type ThanosBucketStores struct {
// Keeps number of inflight requests
inflightRequests *util.InflightRequestTracker

// Concurrent bytes tracker for limiting bytes being processed across all queries.
concurrentBytesTracker ConcurrentBytesTracker

// Holder for per-request bytes trackers. The BytesLimiterFactory (created
// once per user store) reads from this to find the current request's tracker.
requestBytesTrackerHolder *requestBytesTrackerHolder

// Metrics.
syncTimes prometheus.Histogram
syncLastSuccess prometheus.Gauge
Expand Down Expand Up @@ -133,20 +140,22 @@ func newThanosBucketStores(cfg tsdb.BlocksStorageConfig, shardingStrategy Shardi
}).Set(float64(cfg.BucketStore.MaxConcurrent))

u := &ThanosBucketStores{
logger: logger,
cfg: cfg,
limits: limits,
bucket: cachingBucket,
shardingStrategy: shardingStrategy,
stores: map[string]*store.BucketStore{},
storesErrors: map[string]error{},
logLevel: logLevel,
bucketStoreMetrics: NewBucketStoreMetrics(),
metaFetcherMetrics: NewMetadataFetcherMetrics(),
queryGate: queryGate,
partitioner: newGapBasedPartitioner(cfg.BucketStore.PartitionerMaxGapBytes, reg),
userTokenBuckets: make(map[string]*util.TokenBucket),
inflightRequests: util.NewInflightRequestTracker(),
logger: logger,
cfg: cfg,
limits: limits,
bucket: cachingBucket,
shardingStrategy: shardingStrategy,
stores: map[string]*store.BucketStore{},
storesErrors: map[string]error{},
logLevel: logLevel,
bucketStoreMetrics: NewBucketStoreMetrics(),
metaFetcherMetrics: NewMetadataFetcherMetrics(),
queryGate: queryGate,
partitioner: newGapBasedPartitioner(cfg.BucketStore.PartitionerMaxGapBytes, reg),
userTokenBuckets: make(map[string]*util.TokenBucket),
inflightRequests: util.NewInflightRequestTracker(),
concurrentBytesTracker: NewConcurrentBytesTracker(uint64(cfg.BucketStore.MaxConcurrentBytes), reg),
requestBytesTrackerHolder: &requestBytesTrackerHolder{},
syncTimes: promauto.With(reg).NewHistogram(prometheus.HistogramOpts{
Name: "cortex_bucket_stores_blocks_sync_seconds",
Help: "The total time it takes to perform a sync stores",
Expand Down Expand Up @@ -381,6 +390,13 @@ func (u *ThanosBucketStores) Series(req *storepb.SeriesRequest, srv storepb.Stor
defer u.inflightRequests.Dec()
}

reqTracker := newRequestBytesTracker(u.concurrentBytesTracker)
u.requestBytesTrackerHolder.Set(reqTracker)
defer func() {
u.requestBytesTrackerHolder.Clear()
reqTracker.ReleaseAll()
}()

err = store.Series(req, spanSeriesServer{
Store_SeriesServer: srv,
ctx: spanCtx,
Expand Down Expand Up @@ -697,7 +713,7 @@ func (u *ThanosBucketStores) getOrCreateStore(userID string) (*store.BucketStore
u.syncDirForUser(userID),
newChunksLimiterFactory(u.limits, userID),
newSeriesLimiterFactory(u.limits, userID),
newBytesLimiterFactory(u.limits, userID, u.getUserTokenBucket(userID), u.instanceTokenBucket, u.cfg.BucketStore.TokenBucketBytesLimiter, u.getTokensToRetrieve),
newBytesLimiterFactory(u.limits, userID, u.getUserTokenBucket(userID), u.instanceTokenBucket, u.cfg.BucketStore.TokenBucketBytesLimiter, u.getTokensToRetrieve, u.requestBytesTrackerHolder),
u.partitioner,
u.cfg.BucketStore.BlockSyncConcurrency,
false, // No need to enable backward compatibility with Thanos pre 0.8.0 queriers
Expand Down
Loading
Loading