Skip to content

Commit 77407f5

Browse files
authored
fixes panic when using snappy with ingester client (#7459)
* Triggers issue in #7456 Signed-off-by: Friedrich Gonzalez <1517449+friedrichg@users.noreply.github.com> * Including integration tests Signed-off-by: Friedrich Gonzalez <1517449+friedrichg@users.noreply.github.com> * Fix snappy register in grpcclient.go Signed-off-by: Friedrich Gonzalez <1517449+friedrichg@users.noreply.github.com> * Improve integration test Signed-off-by: Friedrich Gonzalez <1517449+friedrichg@users.noreply.github.com> * Update changelog and spawn grpc integration test for this Signed-off-by: Friedrich Gonzalez <1517449+friedrichg@users.noreply.github.com> --------- Signed-off-by: Friedrich Gonzalez <1517449+friedrichg@users.noreply.github.com>
1 parent d398376 commit 77407f5

5 files changed

Lines changed: 145 additions & 0 deletions

File tree

.github/workflows/test-build-deploy.yml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,9 @@ jobs:
194194
- runner: ubuntu-24.04
195195
arch: amd64
196196
tags: integration_remote_write_v2
197+
- runner: ubuntu-24.04
198+
arch: amd64
199+
tags: integration_grpc
197200
- runner: ubuntu-24.04-arm
198201
arch: arm64
199202
tags: requires_docker
@@ -224,6 +227,9 @@ jobs:
224227
- runner: ubuntu-24.04-arm
225228
arch: arm64
226229
tags: integration_querier_microservices_mode
230+
- runner: ubuntu-24.04-arm
231+
arch: arm64
232+
tags: integration_grpc
227233
steps:
228234
- name: Upgrade golang
229235
uses: actions/setup-go@4dc6199c7b1a012772edbd06daecab0f50c9053c # v6.1.0

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
* [BUGFIX] Metrics Helper: Fix non-deterministic bucket order in merged histograms by sorting buckets after map iteration, matching Prometheus client library behavior. #7380
1919
* [BUGFIX] Distributor: Return HTTP 401 Unauthorized when tenant ID resolution fails in the Prometheus Remote Write 2.0 path. #7389
2020
* [BUGFIX] Packaging: Fix RPM and deb packages to install the binary to `/usr/bin`, install the systemd unit to the correct system path (`/usr/lib/systemd/system` for RPM, `/lib/systemd/system` for deb), and mark the sysconfig/default env file as a config file so it is not overwritten on upgrade. #7445
21+
* [BUGFIX] gRPC: Fix panic when `grpc_compression` is set to `snappy` on ingester client or store-gateway client configurations. #7459
2122

2223
## 1.21.0 2026-04-24
2324

Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
//go:build integration_grpc
2+
3+
package integration
4+
5+
import (
6+
"fmt"
7+
"testing"
8+
"time"
9+
10+
"github.com/prometheus/common/model"
11+
"github.com/prometheus/prometheus/model/labels"
12+
"github.com/prometheus/prometheus/prompb"
13+
"github.com/stretchr/testify/assert"
14+
"github.com/stretchr/testify/require"
15+
16+
"github.com/cortexproject/cortex/integration/e2e"
17+
e2edb "github.com/cortexproject/cortex/integration/e2e/db"
18+
"github.com/cortexproject/cortex/integration/e2ecortex"
19+
)
20+
21+
func TestGRPCCompression(t *testing.T) {
22+
compressions := []string{"snappy", "snappy-block", "gzip", "zstd"}
23+
24+
for _, compression := range compressions {
25+
t.Run(fmt.Sprintf("ingester client/%s", compression), func(t *testing.T) {
26+
s, err := e2e.NewScenario(networkName)
27+
require.NoError(t, err)
28+
defer s.Close()
29+
30+
flags := mergeFlags(BlocksStorageFlags(), map[string]string{
31+
"-distributor.replication-factor": "1",
32+
"-ingester.client.grpc-compression": compression,
33+
})
34+
35+
consul := e2edb.NewConsul()
36+
minio := e2edb.NewMinio(9000, flags["-blocks-storage.s3.bucket-name"])
37+
require.NoError(t, s.StartAndWaitReady(consul, minio))
38+
39+
distributor := e2ecortex.NewDistributor("distributor", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
40+
ingester := e2ecortex.NewIngester("ingester-1", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
41+
querier := e2ecortex.NewQuerier("querier", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
42+
require.NoError(t, s.StartAndWaitReady(distributor, ingester, querier))
43+
44+
require.NoError(t, distributor.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"cortex_ring_members"}, e2e.WithLabelMatchers(
45+
labels.MustNewMatcher(labels.MatchEqual, "name", "ingester"),
46+
labels.MustNewMatcher(labels.MatchEqual, "state", "ACTIVE"))))
47+
48+
c, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), querier.HTTPEndpoint(), "", "", userID)
49+
require.NoError(t, err)
50+
51+
now := time.Now()
52+
series, expectedVector := generateSeries("series_1", now, prompb.Label{Name: "foo", Value: "bar"})
53+
54+
res, err := c.Push(series)
55+
require.NoError(t, err)
56+
require.Equal(t, 200, res.StatusCode)
57+
58+
result, err := c.Query("series_1", now)
59+
require.NoError(t, err)
60+
require.Equal(t, model.ValVector, result.Type())
61+
assert.Equal(t, expectedVector, result.(model.Vector))
62+
})
63+
64+
t.Run(fmt.Sprintf("store gateway client/%s", compression), func(t *testing.T) {
65+
const blockRangePeriod = 5 * time.Second
66+
67+
s, err := e2e.NewScenario(networkName)
68+
require.NoError(t, err)
69+
defer s.Close()
70+
71+
flags := mergeFlags(BlocksStorageFlags(), map[string]string{
72+
"-distributor.replication-factor": "1",
73+
"-blocks-storage.tsdb.block-ranges-period": blockRangePeriod.String(),
74+
"-blocks-storage.tsdb.ship-interval": "1s",
75+
"-blocks-storage.bucket-store.sync-interval": "1s",
76+
"-blocks-storage.tsdb.retention-period": ((blockRangePeriod * 2) - 1).String(),
77+
"-store-gateway.sharding-enabled": "false",
78+
"-querier.store-gateway-client.grpc-compression": compression,
79+
})
80+
81+
consul := e2edb.NewConsul()
82+
minio := e2edb.NewMinio(9000, flags["-blocks-storage.s3.bucket-name"])
83+
require.NoError(t, s.StartAndWaitReady(consul, minio))
84+
85+
distributor := e2ecortex.NewDistributor("distributor", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
86+
ingester := e2ecortex.NewIngester("ingester-1", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
87+
storeGateway := e2ecortex.NewStoreGateway("store-gateway", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
88+
require.NoError(t, s.StartAndWaitReady(distributor, ingester, storeGateway))
89+
90+
querierFlags := mergeFlags(flags, map[string]string{
91+
"-querier.store-gateway-addresses": storeGateway.NetworkGRPCEndpoint(),
92+
})
93+
querier := e2ecortex.NewQuerier("querier", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), querierFlags, "")
94+
require.NoError(t, s.StartAndWaitReady(querier))
95+
96+
require.NoError(t, distributor.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"cortex_ring_members"}, e2e.WithLabelMatchers(
97+
labels.MustNewMatcher(labels.MatchEqual, "name", "ingester"),
98+
labels.MustNewMatcher(labels.MatchEqual, "state", "ACTIVE"))))
99+
100+
c, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), querier.HTTPEndpoint(), "", "", userID)
101+
require.NoError(t, err)
102+
103+
// Push two series far enough apart to trigger TSDB head compaction and block shipping.
104+
series1Timestamp := time.Now()
105+
series2Timestamp := series1Timestamp.Add(blockRangePeriod * 2)
106+
series1, expectedVector1 := generateSeries("series_1", series1Timestamp, prompb.Label{Name: "foo", Value: "bar"})
107+
series2, _ := generateSeries("series_2", series2Timestamp, prompb.Label{Name: "foo", Value: "baz"})
108+
109+
res, err := c.Push(series1)
110+
require.NoError(t, err)
111+
require.Equal(t, 200, res.StatusCode)
112+
113+
res, err = c.Push(series2)
114+
require.NoError(t, err)
115+
require.Equal(t, 200, res.StatusCode)
116+
117+
// Wait until the TSDB head is shipped to storage and the store-gateway picks it up.
118+
require.NoError(t, ingester.WaitSumMetrics(e2e.Greater(0), "cortex_ingester_shipper_uploads_total"))
119+
require.NoError(t, storeGateway.WaitSumMetrics(e2e.Greater(0), "cortex_storegateway_bucket_sync_total"))
120+
121+
// Query the first series — this goes through the store-gateway with snappy compression.
122+
result, err := c.Query("series_1", series1Timestamp)
123+
require.NoError(t, err)
124+
require.Equal(t, model.ValVector, result.Type())
125+
assert.Equal(t, expectedVector1, result.(model.Vector))
126+
})
127+
}
128+
}

pkg/util/grpcclient/grpcclient.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,9 @@ func (cfg *Config) Validate(log log.Logger) error {
7575
default:
7676
return errors.Errorf("unsupported compression type: %s", cfg.GRPCCompression)
7777
}
78+
if cfg.GRPCCompression == snappy.Name {
79+
snappy.Register()
80+
}
7881
return nil
7982
}
8083

pkg/util/grpcencoding/snappy/snappy.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,13 @@ func init() {
1515
encoding.RegisterCompressor(newCompressor())
1616
}
1717

18+
// Register re-registers the Cortex snappy compressor with gRPC, overriding any
19+
// previously registered compressor with the same name (e.g. the Thanos vendored
20+
// one whose Read() panics). See https://github.com/cortexproject/cortex/issues/7456.
21+
func Register() {
22+
encoding.RegisterCompressor(newCompressor())
23+
}
24+
1825
type compressor struct {
1926
writersPool sync.Pool
2027
readersPool sync.Pool

0 commit comments

Comments
 (0)