Skip to content

Commit 0ea3188

Browse files
committed
Merge branch 'feat/sync-wal-lag-metric' into 'master'
feat: add sync WAL lag Prometheus metric for physical mode Closes #673 See merge request postgres-ai/database-lab!1093
2 parents 712b576 + 6801c02 commit 0ea3188

4 files changed

Lines changed: 327 additions & 0 deletions

File tree

PROMETHEUS.md

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,17 @@ The endpoint is publicly accessible (no authentication required) and returns met
7474
| `dblab_datasets_total` | Gauge | `pool` | Total number of datasets (slots) in the pool |
7575
| `dblab_datasets_available` | Gauge | `pool` | Number of available (non-busy) dataset slots for reuse |
7676

77+
### Sync Instance Metrics (Physical Mode)
78+
79+
These metrics are only available when DBLab is running in physical mode with a sync instance enabled. They track the WAL replay status of the sync instance.
80+
81+
| Metric Name | Type | Labels | Description |
82+
|-------------|------|--------|-------------|
83+
| `dblab_sync_status` | Gauge | `status` | Status of the sync instance; the series whose `status` label matches the current status code is set to 1 |
84+
| `dblab_sync_wal_lag_seconds` | Gauge | - | WAL replay lag in seconds for the sync instance |
85+
| `dblab_sync_uptime_seconds` | Gauge | - | Uptime of the sync instance in seconds |
86+
| `dblab_sync_last_replayed_timestamp` | Gauge | - | Unix timestamp of the last replayed transaction |
87+
7788
### Observability Metrics
7889

7990
These metrics help monitor the health of the metrics collection system itself.
@@ -146,6 +157,18 @@ dblab_clones_by_status
146157
time() - dblab_scrape_success_timestamp
147158
```
148159

160+
### WAL Replay Lag (Physical Mode)
161+
162+
```promql
163+
dblab_sync_wal_lag_seconds
164+
```
165+
166+
### Time Since Last Replayed Transaction
167+
168+
```promql
169+
time() - dblab_sync_last_replayed_timestamp
170+
```
171+
149172
## Alerting Examples
150173

151174
### Low Disk Space Alert
@@ -200,6 +223,32 @@ time() - dblab_scrape_success_timestamp
200223
description: "DBLab metrics have not been updated for more than 5 minutes"
201224
```
202225
226+
### High WAL Replay Lag Alert (Physical Mode)
227+
228+
```yaml
229+
- alert: DBLabHighWALLag
230+
expr: dblab_sync_wal_lag_seconds > 3600
231+
for: 10m
232+
labels:
233+
severity: warning
234+
annotations:
235+
summary: "DBLab sync instance has high WAL lag"
236+
description: "DBLab sync instance WAL replay is {{ $value | humanizeDuration }} behind"
237+
```
238+
239+
### Sync Instance Down Alert (Physical Mode)
240+
241+
```yaml
242+
- alert: DBLabSyncDown
243+
expr: dblab_sync_status{status="down"} == 1 or dblab_sync_status{status="error"} == 1
244+
for: 5m
245+
labels:
246+
severity: critical
247+
annotations:
248+
summary: "DBLab sync instance is down"
249+
description: "DBLab sync instance is not healthy"
250+
```
251+
203252
## OpenTelemetry Integration
204253
205254
DBLab metrics can be exported to OpenTelemetry-compatible backends using the OpenTelemetry Collector. This allows you to send metrics to Grafana Cloud, Datadog, New Relic, and other observability platforms.

engine/internal/srv/metrics/collector.go

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,9 @@ const (
2727
dockerStatsTimeout = 5 * time.Second
2828
dockerStatsWorkers = 10
2929
cpuNoData = -1.0
30+
31+
// postgresTimestampFormat is the default text representation of PostgreSQL's timestamp with time zone.
32+
postgresTimestampFormat = "2006-01-02 15:04:05.999999-07"
3033
)
3134

3235
// CloningService defines the interface for clone and snapshot operations needed by metrics.
@@ -39,6 +42,7 @@ type CloningService interface {
3942
type RetrievalService interface {
4043
GetRetrievalMode() models.RetrievalMode
4144
GetRetrievalStatus() models.RetrievalStatus
45+
ReportSyncStatus(ctx context.Context) (*models.Sync, error)
4246
}
4347

4448
// PoolService defines the interface for pool operations needed by metrics.
@@ -152,6 +156,7 @@ func (c *Collector) collectAll(ctx context.Context) {
152156
c.collectCloneMetrics(ctx)
153157
c.collectSnapshotMetrics()
154158
c.collectBranchMetrics()
159+
c.collectSyncMetrics(ctx)
155160

156161
c.metrics.ScrapeDurationSeconds.Set(time.Since(start).Seconds())
157162
c.metrics.ScrapeSuccessTimestamp.Set(float64(time.Now().Unix()))
@@ -564,3 +569,57 @@ func (c *Collector) collectBranchMetrics() {
564569

565570
c.metrics.BranchesTotal.Set(float64(totalBranches))
566571
}
572+
573+
func (c *Collector) collectSyncMetrics(ctx context.Context) {
574+
if c.retrieval.GetRetrievalMode() != models.Physical {
575+
return
576+
}
577+
578+
syncState, err := c.retrieval.ReportSyncStatus(ctx)
579+
if err != nil {
580+
log.Dbg("failed to get sync status for metrics:", err)
581+
c.metrics.ScrapeErrorsTotal.Inc()
582+
c.setSyncStatusNotAvailable()
583+
584+
return
585+
}
586+
587+
if syncState == nil {
588+
c.setSyncStatusNotAvailable()
589+
590+
return
591+
}
592+
593+
c.metrics.SyncStatus.Reset()
594+
c.metrics.SyncStatus.WithLabelValues(string(syncState.Status.Code)).Set(1)
595+
c.metrics.SyncWALLagSeconds.Set(float64(syncState.ReplicationLag))
596+
c.metrics.SyncUptimeSeconds.Set(float64(syncState.ReplicationUptime))
597+
598+
if ts := parseTimestamp(syncState.LastReplayedLsnAt); ts != nil {
599+
c.metrics.SyncLastReplayedAt.Set(float64(ts.Unix()))
600+
}
601+
}
602+
603+
func (c *Collector) setSyncStatusNotAvailable() {
604+
c.metrics.SyncStatus.Reset()
605+
c.metrics.SyncStatus.WithLabelValues(string(models.SyncStatusNotAvailable)).Set(1)
606+
c.metrics.SyncUptimeSeconds.Set(0)
607+
}
608+
609+
func parseTimestamp(value string) *time.Time {
610+
if value == "" {
611+
return nil
612+
}
613+
614+
formats := []string{time.RFC3339Nano, postgresTimestampFormat}
615+
616+
for _, format := range formats {
617+
if ts, err := time.Parse(format, value); err == nil {
618+
return &ts
619+
}
620+
}
621+
622+
log.Dbg("failed to parse timestamp:", value)
623+
624+
return nil
625+
}

engine/internal/srv/metrics/collector_test.go

Lines changed: 177 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616
"github.com/docker/docker/api/types/container"
1717
"github.com/prometheus/client_golang/prometheus"
1818
"github.com/prometheus/client_golang/prometheus/promhttp"
19+
dto "github.com/prometheus/client_model/go"
1920
"github.com/stretchr/testify/assert"
2021
"github.com/stretchr/testify/require"
2122

@@ -36,6 +37,10 @@ func (m *mockRetrievalService) GetRetrievalMode() models.RetrievalMode { return
3637

3738
func (m *mockRetrievalService) GetRetrievalStatus() models.RetrievalStatus { return models.Inactive }
3839

40+
func (m *mockRetrievalService) ReportSyncStatus(_ context.Context) (*models.Sync, error) {
41+
return nil, nil
42+
}
43+
3944
type mockPoolService struct{}
4045

4146
func (m *mockPoolService) GetFSManagerList() []pool.FSManager { return nil }
@@ -446,3 +451,175 @@ func TestCalculateCPUPercent_Concurrent(t *testing.T) {
446451

447452
assert.Len(t, c.prevCPUStats, cloneCount)
448453
}
454+
455+
type configMockRetrievalService struct {
456+
mode models.RetrievalMode
457+
syncStatus *models.Sync
458+
syncErr error
459+
}
460+
461+
func (m *configMockRetrievalService) GetRetrievalMode() models.RetrievalMode { return m.mode }
462+
463+
func (m *configMockRetrievalService) GetRetrievalStatus() models.RetrievalStatus {
464+
return models.Inactive
465+
}
466+
func (m *configMockRetrievalService) ReportSyncStatus(_ context.Context) (*models.Sync, error) {
467+
return m.syncStatus, m.syncErr
468+
}
469+
470+
func TestCollectSyncMetrics(t *testing.T) {
471+
t.Run("non-physical mode skips collection", func(t *testing.T) {
472+
m := NewMetrics()
473+
retrieval := &configMockRetrievalService{mode: models.Logical}
474+
c := &Collector{metrics: m, retrieval: retrieval, prevCPUStats: make(map[string]containerCPUState)}
475+
476+
c.collectSyncMetrics(context.Background())
477+
478+
assert.Equal(t, float64(0), getGaugeValue(m.SyncWALLagSeconds))
479+
assert.Equal(t, float64(0), getGaugeValue(m.SyncUptimeSeconds))
480+
})
481+
482+
t.Run("error from ReportSyncStatus sets not available status", func(t *testing.T) {
483+
m := NewMetrics()
484+
retrieval := &configMockRetrievalService{mode: models.Physical, syncErr: fmt.Errorf("connection failed")}
485+
c := &Collector{metrics: m, retrieval: retrieval, prevCPUStats: make(map[string]containerCPUState)}
486+
487+
initialErrors := getCounterValue(m.ScrapeErrorsTotal)
488+
c.collectSyncMetrics(context.Background())
489+
490+
assert.Equal(t, initialErrors+1, getCounterValue(m.ScrapeErrorsTotal))
491+
assert.Equal(t, float64(0), getGaugeValue(m.SyncUptimeSeconds))
492+
assert.Equal(t, float64(1), getGaugeVecValue(m.SyncStatus, string(models.SyncStatusNotAvailable)))
493+
})
494+
495+
t.Run("nil sync status sets not available status and resets uptime", func(t *testing.T) {
496+
m := NewMetrics()
497+
retrieval := &configMockRetrievalService{mode: models.Physical, syncStatus: nil}
498+
c := &Collector{metrics: m, retrieval: retrieval, prevCPUStats: make(map[string]containerCPUState)}
499+
500+
c.collectSyncMetrics(context.Background())
501+
502+
assert.Equal(t, float64(0), getGaugeValue(m.SyncWALLagSeconds))
503+
assert.Equal(t, float64(0), getGaugeValue(m.SyncUptimeSeconds))
504+
assert.Equal(t, float64(1), getGaugeVecValue(m.SyncStatus, string(models.SyncStatusNotAvailable)))
505+
})
506+
507+
t.Run("successful collection sets all metrics", func(t *testing.T) {
508+
m := NewMetrics()
509+
syncStatus := &models.Sync{
510+
Status: models.Status{Code: models.StatusOK},
511+
ReplicationLag: 120,
512+
ReplicationUptime: 3600,
513+
LastReplayedLsnAt: "2025-01-15T10:30:00.123456789Z",
514+
}
515+
retrieval := &configMockRetrievalService{mode: models.Physical, syncStatus: syncStatus}
516+
c := &Collector{metrics: m, retrieval: retrieval, prevCPUStats: make(map[string]containerCPUState)}
517+
518+
c.collectSyncMetrics(context.Background())
519+
520+
assert.Equal(t, float64(120), getGaugeValue(m.SyncWALLagSeconds))
521+
assert.Equal(t, float64(3600), getGaugeValue(m.SyncUptimeSeconds))
522+
assert.Equal(t, float64(1), getGaugeVecValue(m.SyncStatus, "OK"))
523+
assert.Greater(t, getGaugeValue(m.SyncLastReplayedAt), float64(0))
524+
})
525+
526+
t.Run("postgres timestamp format is parsed correctly", func(t *testing.T) {
527+
m := NewMetrics()
528+
syncStatus := &models.Sync{
529+
Status: models.Status{Code: models.StatusOK},
530+
ReplicationLag: 60,
531+
ReplicationUptime: 1800,
532+
LastReplayedLsnAt: "2025-01-15 10:30:00.123456+00",
533+
}
534+
retrieval := &configMockRetrievalService{mode: models.Physical, syncStatus: syncStatus}
535+
c := &Collector{metrics: m, retrieval: retrieval, prevCPUStats: make(map[string]containerCPUState)}
536+
537+
c.collectSyncMetrics(context.Background())
538+
539+
assert.Equal(t, float64(60), getGaugeValue(m.SyncWALLagSeconds))
540+
assert.Equal(t, float64(1800), getGaugeValue(m.SyncUptimeSeconds))
541+
assert.Greater(t, getGaugeValue(m.SyncLastReplayedAt), float64(0))
542+
})
543+
544+
t.Run("empty timestamp leaves metric unchanged", func(t *testing.T) {
545+
m := NewMetrics()
546+
syncStatus := &models.Sync{
547+
Status: models.Status{Code: models.StatusOK},
548+
ReplicationLag: 30,
549+
ReplicationUptime: 900,
550+
LastReplayedLsnAt: "",
551+
}
552+
retrieval := &configMockRetrievalService{mode: models.Physical, syncStatus: syncStatus}
553+
c := &Collector{metrics: m, retrieval: retrieval, prevCPUStats: make(map[string]containerCPUState)}
554+
555+
c.collectSyncMetrics(context.Background())
556+
557+
assert.Equal(t, float64(30), getGaugeValue(m.SyncWALLagSeconds))
558+
assert.Equal(t, float64(900), getGaugeValue(m.SyncUptimeSeconds))
559+
assert.Equal(t, float64(0), getGaugeValue(m.SyncLastReplayedAt))
560+
})
561+
562+
t.Run("unparseable timestamp leaves metric unchanged", func(t *testing.T) {
563+
m := NewMetrics()
564+
syncStatus := &models.Sync{
565+
Status: models.Status{Code: models.StatusOK},
566+
ReplicationLag: 45,
567+
ReplicationUptime: 1200,
568+
LastReplayedLsnAt: "invalid-timestamp",
569+
}
570+
retrieval := &configMockRetrievalService{mode: models.Physical, syncStatus: syncStatus}
571+
c := &Collector{metrics: m, retrieval: retrieval, prevCPUStats: make(map[string]containerCPUState)}
572+
573+
c.collectSyncMetrics(context.Background())
574+
575+
assert.Equal(t, float64(45), getGaugeValue(m.SyncWALLagSeconds))
576+
assert.Equal(t, float64(1200), getGaugeValue(m.SyncUptimeSeconds))
577+
assert.Equal(t, float64(0), getGaugeValue(m.SyncLastReplayedAt))
578+
})
579+
}
580+
581+
func getGaugeValue(g prometheus.Gauge) float64 {
582+
ch := make(chan prometheus.Metric, 1)
583+
g.Collect(ch)
584+
select {
585+
case m := <-ch:
586+
var metric dto.Metric
587+
_ = m.Write(&metric)
588+
if metric.Gauge != nil {
589+
return *metric.Gauge.Value
590+
}
591+
default:
592+
}
593+
return 0
594+
}
595+
596+
func getGaugeVecValue(g *prometheus.GaugeVec, label string) float64 {
597+
ch := make(chan prometheus.Metric, 10)
598+
g.Collect(ch)
599+
close(ch)
600+
for m := range ch {
601+
var metric dto.Metric
602+
_ = m.Write(&metric)
603+
for _, lp := range metric.Label {
604+
if lp.GetValue() == label && metric.Gauge != nil {
605+
return *metric.Gauge.Value
606+
}
607+
}
608+
}
609+
return 0
610+
}
611+
612+
func getCounterValue(c prometheus.Counter) float64 {
613+
ch := make(chan prometheus.Metric, 1)
614+
c.Collect(ch)
615+
select {
616+
case m := <-ch:
617+
var metric dto.Metric
618+
_ = m.Write(&metric)
619+
if metric.Counter != nil {
620+
return *metric.Counter.Value
621+
}
622+
default:
623+
}
624+
return 0
625+
}

0 commit comments

Comments
 (0)