diff --git a/internal/config/config.go b/internal/config/config.go index 534b87c..6e25277 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -180,6 +180,24 @@ type Config struct { FlowSyntheticDisabled string // FLOW_SYNTHETIC_DISABLED — comma list of per-flow kill switches FlowSyntheticJWTSecret string // JWT_SECRET — shared with api; mints the synthetic session JWT + // Layer-3 payment prober (payment_probe.go) — the money heartbeat. Drives the + // iframe-free payment-funnel contract path against prod every 5 min: checkout + // reachability + billing/invoices read surfaces + the webhook signature + // security contract, plus an OPTIONAL test-mode upgrade proof. INERT unless + // PaymentProbeEnabled is true (the master flag) — a single env flip turns the + // whole prober off. JWTSecret reuses JWT_SECRET to mint the Brevo-free session + // JWT for the authed prod-safe legs. The upgrade leg is gated on + // PaymentProbeTestWebhookSecret + PaymentProbeTestPlanIDPro being set (skips + // clean otherwise) and drives NO live Razorpay / no real money. NEVER wire the + // LIVE webhook secret here. + PaymentProbeEnabled bool // PAYMENT_PROBE_ENABLED — master flag (default false) + PaymentProbeBaseURL string // PAYMENT_PROBE_BASE_URL — default https://api.instanode.dev + PaymentProbeJWTSecret string // JWT_SECRET — shared with api; mints the synthetic session JWT + PaymentProbeEmail string // PAYMENT_PROBE_EMAIL — synthetic team primary-user email + PaymentProbeTier string // PAYMENT_PROBE_TIER — seeded tier (default free) + PaymentProbeTestWebhookSecret string // RAZORPAY_TEST_WEBHOOK_SECRET — gates the optional upgrade leg (TEST secret only, never live) + PaymentProbeTestPlanIDPro string // PAYMENT_PROBE_TEST_PLAN_ID_PRO — Razorpay TEST plan_id resolving to "pro" + // Scale-to-zero idle-scaler (deploy_idle_scaler.go, Task #54). INERT unless // DeployScaleToZeroEnabled is true — the master flag (shared name with the // api's wake-path flag). When off, the idle-scaler sweep is a no-op (no k8s @@ -305,6 +323,20 @@ func Load() *Config { FlowSyntheticDisabled: os.Getenv("FLOW_SYNTHETIC_DISABLED"), FlowSyntheticJWTSecret: os.Getenv("JWT_SECRET"), + // Layer-3 payment prober. INERT unless PAYMENT_PROBE_ENABLED=true + // (default off, the DoD habit). JWTSecret reuses JWT_SECRET so the worker + // mints a Brevo-free session JWT the api verifies against. The upgrade leg + // is gated on the TEST webhook secret + test plan id (skips clean + // otherwise) — NEVER the live secret, NEVER a real charge. Defaults applied + // inside jobs.PaymentProbeConfig.Defaults(). + PaymentProbeEnabled: os.Getenv("PAYMENT_PROBE_ENABLED") == "true", + PaymentProbeBaseURL: os.Getenv("PAYMENT_PROBE_BASE_URL"), + PaymentProbeJWTSecret: os.Getenv("JWT_SECRET"), + PaymentProbeEmail: os.Getenv("PAYMENT_PROBE_EMAIL"), + PaymentProbeTier: os.Getenv("PAYMENT_PROBE_TIER"), + PaymentProbeTestWebhookSecret: os.Getenv("RAZORPAY_TEST_WEBHOOK_SECRET"), + PaymentProbeTestPlanIDPro: os.Getenv("PAYMENT_PROBE_TEST_PLAN_ID_PRO"), + // Scale-to-zero idle-scaler (Task #54). Default OFF; idle threshold // default 30 min (parsed below). DeployScaleToZeroEnabled: os.Getenv("DEPLOY_SCALE_TO_ZERO_ENABLED") == "true", diff --git a/internal/jobs/payment_probe.go b/internal/jobs/payment_probe.go new file mode 100644 index 0000000..5b94f99 --- /dev/null +++ b/internal/jobs/payment_probe.go @@ -0,0 +1,968 @@ +package jobs + +// payment_probe.go — Layer-3 high-frequency payment-health synthetic +// (the money heartbeat). +// +// Forum verdict (docs/ci/FORUM-PAYMENT-E2E-TOOLING.md §4 Layer 3): the +// continuous, fastest, most-deterministic money-path signal is an +// in-cluster Go worker prober that drives the iframe-free API/webhook +// contract path every N min — NOT a browser driver. This file is that +// Layer 3. It mirrors auth_probe.go / deploy_probe.go exactly: +// +// - a River periodic job (every 5 min, like auth_probe), +// - a per-leg result enum pass|fail|degraded, +// - instant_payment_probe_outcome_total{leg,result} + an InstantPaymentProbe +// NR event (cohort-tagged, excluded from business metrics), +// - audit_log row + structured slog ERROR line on fail (the NR fallback +// when /metrics is unscrapeable), +// - degraded handling when creds/flags are unset (config drift, not an +// outage — never pages). +// +// # Flag-gated OFF by default (the DoD habit) +// +// The WHOLE prober is inert unless PAYMENT_PROBE_ENABLED=true. A single env +// flip kills it instantly. Until the operator lights the flag, the periodic +// registration is present but produces ZERO traffic (Work() no-ops first +// thing on the disabled flag). This is the proven flow_synthetic pattern. +// +// # No real money — EVER +// +// Every prod-safe leg is contract-only: it asserts the SHAPE of the payment +// funnel without driving a real charge. The forum's central thesis is that +// what protects real money is the assertion target + key isolation, not the +// tool. So this prober: +// +// - never hits live Razorpay's hosted checkout (the checkout leg asserts +// the api endpoint is reachable + returns a sane shape — a short_url in +// a test-cohort context, OR the honest billing_not_configured / 4xx +// blocked-but-alive shape — a 5xx crash is the only fail); +// - asserts the webhook SECURITY contract (an unsigned/garbage payload is +// rejected 400 invalid_signature) — a positive proof the signature gate +// is live, with no money implication at all; +// - mints a fresh cohort team for the OPTIONAL upgrade leg and injects a +// correctly-signed TEST-mode subscription.charged (never a live charge), +// reaping the cohort after — and that leg is gated on the test webhook +// secret being configured (skips clean otherwise). +// +// # Truth surfaces (rule 12) +// +// The upgrade leg's pass is NOT a webhook 200. It is the post-webhook +// downstream state: teams.plan_tier advanced to the entitled tier. The +// prod-safe legs assert real response shape, not a bare 200. + +import ( + "context" + "crypto/hmac" + "crypto/sha256" + "database/sql" + "encoding/base64" + "encoding/hex" + "encoding/json" + "errors" + "fmt" + "io" + "log/slog" + "net/http" + "net/url" + "strings" + "time" + + "github.com/google/uuid" + "github.com/riverqueue/river" + "go.opentelemetry.io/otel" + + "instant.dev/common/analyticsevent" + "instant.dev/worker/internal/metrics" +) + +// PaymentProbePromMetrics is the production PaymentProbeMetrics implementation +// — emits to the Prom counter + histogram registered in +// internal/metrics/metrics.go. Stateless; a single instance is shared across +// the worker. Mirrors AuthProbePromMetrics / DeployProbePromMetrics in shape. +type PaymentProbePromMetrics struct{} + +// IncOutcome bumps instant_payment_probe_outcome_total{leg, result}. +func (PaymentProbePromMetrics) IncOutcome(leg, result string) { + metrics.PaymentProbeOutcomeTotal.WithLabelValues(leg, result).Inc() +} + +// ObserveLatency records on instant_payment_probe_latency_seconds{leg}. +func (PaymentProbePromMetrics) ObserveLatency(leg string, d time.Duration) { + metrics.PaymentProbeLatencySeconds.WithLabelValues(leg).Observe(d.Seconds()) +} + +// paymentProbeInterval is the dispatch cadence. 5 minutes matches the +// auth_probe cadence knee: short enough that a paid-funnel regression pages +// inside the 10-minute alert window, long enough that the prod-safe legs +// don't flood the api (4 legs × 12 ticks/hour = 48 contract requests/hour, +// negligible). The optional upgrade leg mints + reaps one cohort team per +// tick — also negligible against the platform DB. +const paymentProbeInterval = 5 * time.Minute + +// paymentProbeHTTPTimeout caps any single HTTP request — the hard ceiling a +// TCP black-hole at the load balancer can't pin a goroutine past. The per-leg +// budgets are enforced separately via context deadlines. +const paymentProbeHTTPTimeout = 15 * time.Second + +// paymentProbeLeg* are the leg names emitted as the `leg` Prometheus label +// and the `leg=` log key. Constants (not inline strings) so a test asserts +// the exact label values the alert NRQL keys on, and the catalog/dashboard +// stay in lockstep. +const ( + paymentProbeLegCheckout = "checkout_reachable" // POST /api/v1/billing/checkout — non-5xx + sane shape + paymentProbeLegBillingState = "billing_state" // GET /api/v1/billing — non-5xx + paymentProbeLegInvoices = "invoices_reachable" // GET /api/v1/billing/invoices — non-5xx + paymentProbeLegWebhookSecurity = "webhook_security" // POST /razorpay/webhook (garbage) — 400 invalid_signature + paymentProbeLegUpgrade = "upgrade_webhook_e2e" // mint cohort → signed test webhook → tier flip → reap +) + +// PaymentProbeLegsForTest exposes the leg-id set so the external _test package +// can assert the canonical vocabulary (the dashboard grid + alert NRQL key on +// these EXACT strings). Returned in the order Work runs them. +func PaymentProbeLegsForTest() []string { + return []string{ + paymentProbeLegCheckout, + paymentProbeLegBillingState, + paymentProbeLegInvoices, + paymentProbeLegWebhookSecurity, + paymentProbeLegUpgrade, + } +} + +// paymentProbeResult* are the outcome enum values emitted as the `result` +// label. +// +// pass — leg met all assertions inside its latency budget. +// fail — leg failed an assertion (5xx crash, missing security +// rejection, tier didn't flip). Triggers audit_log row + +// structured ERROR slog line + NR alert. +// degraded — leg passed assertions but crossed its latency budget, OR is +// configured-off (no bearer / no test webhook secret). Tracked +// separately so a slow-but-working endpoint — or a not-yet-wired +// operator secret — doesn't page. +const ( + paymentProbeResultPass = "pass" + paymentProbeResultFail = "fail" + paymentProbeResultDegraded = "degraded" +) + +// paymentProbeLegLatencyBudgets is the per-leg latency budget. Crossing the +// budget is recorded as result="degraded" (slow-but-correct) so it stays +// distinguishable from a real outage. The checkout + upgrade legs get the +// widest budgets: checkout validates the plan before touching Razorpay, and +// the upgrade leg mints a cohort team + injects a webhook + reads back the +// tier (a few DB round-trips + one HTTP call). +var paymentProbeLegLatencyBudgets = map[string]time.Duration{ + paymentProbeLegCheckout: 5 * time.Second, + paymentProbeLegBillingState: 2 * time.Second, + paymentProbeLegInvoices: 2 * time.Second, + paymentProbeLegWebhookSecurity: 2 * time.Second, + paymentProbeLegUpgrade: 8 * time.Second, +} + +// paymentProbeDefaultBaseURL is the production api host probed by default. +// Overridable via PAYMENT_PROBE_BASE_URL so a dev/staging worker probes its +// own cluster's api (same convention as AUTH_PROBE_BASE_URL). +const paymentProbeDefaultBaseURL = "https://api.instanode.dev" + +// auditKindPaymentProbeFailed is the audit_log kind emitted on a probe leg +// failure. Operators correlate audit_log rows + structured log lines + the NR +// alert on this kind for a single triage entry-point. Distinct from +// auth_probe_failed / deploy_probe_failed / flow_test_failed. +const auditKindPaymentProbeFailed = "payment_probe_failed" + +// paymentProbeActor is the actor string written to audit_log so a join on +// actor='system:payment_probe' enumerates every payment-probe failure across +// time. Distinct from system:auth_probe / system:deploy_probe / system:flow_synthetic. +const paymentProbeActor = "system:payment_probe" + +// paymentProbeUserAgent identifies the prober's requests in api logs. Mirrors +// instanode-auth-probe/1 / instanode-flow-synthetic/1. +const paymentProbeUserAgent = "instanode-payment-probe/1" + +// paymentProbeInvalidSignatureCode is the api's canonical error_code +// (api/internal/handlers/billing.go RazorpayWebhook) returned with HTTP 400 +// when X-Razorpay-Signature does not match the HMAC over the raw body. The +// webhook-security leg asserts this EXACT code so a future regression that +// silently accepts unsigned payloads (or returns a different error envelope) +// reds the leg. Locked in lockstep with the api's typed-error contract. +const paymentProbeInvalidSignatureCode = "invalid_signature" + +// paymentProbeGarbageWebhookBody is the deliberately-unsigned, junk payload +// the security leg POSTs to /razorpay/webhook. It is valid-ish JSON (so the +// handler reaches the signature gate rather than bailing on a parse error +// first) carrying NO valid signature — the api MUST reject it 400 +// invalid_signature. A constant so the assertion + the value live in one place. +const paymentProbeGarbageWebhookBody = `{"event":"subscription.charged","payload":{"synthetic":"payment-probe-security-leg-not-a-real-webhook"}}` + +// PaymentProbeArgs is the River job payload — no fields, every tick is a full +// sweep of the legs against the configured base URL. +type PaymentProbeArgs struct{} + +// Kind is the River worker key. +func (PaymentProbeArgs) Kind() string { return "payment_probe" } + +// PaymentProbeMetrics is the narrow surface the worker uses to emit outcome +// counters + latency observations. Extracted as an interface so tests can +// capture emissions without scraping the real /metrics registry (avoids +// cross-test cardinality leaks). +type PaymentProbeMetrics interface { + // IncOutcome bumps instant_payment_probe_outcome_total{leg, result} by 1. + IncOutcome(leg, result string) + // ObserveLatency records on instant_payment_probe_latency_seconds{leg}. + // Called only when an HTTP response was received (DNS/TCP errors omit the + // observation so the histogram isn't polluted with 0s timeouts). + ObserveLatency(leg string, d time.Duration) +} + +// PaymentProbeConfig bundles the runtime tunables. The prober is INERT unless +// Enabled is true (the master flag). All other fields are optional — +// Defaults() fills the gaps. +type PaymentProbeConfig struct { + Enabled bool // PAYMENT_PROBE_ENABLED — master flag; false = whole prober no-op + + BaseURL string // PAYMENT_PROBE_BASE_URL — default https://api.instanode.dev + + // JWTSecret mints the Brevo-free session JWT for the authed prod-safe legs + // (checkout / billing / invoices), reusing the synthetic flow team. Reuses + // JWT_SECRET (the same value the api verifies against). Empty → the authed + // legs degrade (config drift, not an outage). + JWTSecret string + + // Email + Tier identify the synthetic flow team the authed legs run as. They + // default to the same cohort-tagged synthetic team flow_synthetic seeds, so + // the payment prober never needs its own team-seeding path — it reuses the + // is_test_cohort=true team that every team-iterating background job already + // no-ops for. + Email string // PAYMENT_PROBE_EMAIL — default synthetic flow email + Tier string // PAYMENT_PROBE_TIER — default free + + // TestWebhookSecret gates the OPTIONAL upgrade leg. When set (the operator + // has wired RAZORPAY_TEST_WEBHOOK_SECRET / E2E_RAZORPAY_WEBHOOK_SECRET into + // the worker), the upgrade leg mints a fresh cohort team and injects a + // correctly-signed TEST-mode subscription.charged, then asserts the tier + // flipped and reaps the team. Empty → the upgrade leg skips clean + // (result=degraded, reason=test-secret-unset). Never the LIVE secret — this + // leg drives no live Razorpay and no real charge. + TestWebhookSecret string + + // TestPlanIDPro is the Razorpay TEST plan_id whose planIDToTier resolution + // is "pro" — stamped into the injected subscription.charged so the api + // upgrades the cohort team to pro. Empty + a set TestWebhookSecret → the + // upgrade leg degrades (the secret alone can't drive a tier resolution). + TestPlanIDPro string +} + +// Defaults fills empty fields with their paymentProbeDefault* counterparts and +// normalises the base URL. Returns a copy so the caller's input is not mutated. +func (c PaymentProbeConfig) Defaults() PaymentProbeConfig { + out := c + if out.BaseURL == "" { + out.BaseURL = paymentProbeDefaultBaseURL + } + if out.Email == "" { + out.Email = flowSyntheticDefaultEmail + } + if out.Tier == "" { + out.Tier = flowSyntheticDefaultTier + } + out.BaseURL = strings.TrimRight(out.BaseURL, "/") + return out +} + +// PaymentProbeWorker is the River worker. db is used for: minting the upgrade +// leg's cohort team, the tier read-back, the reap, and audit_log insertions on +// fail (nil disables those but the prod-safe legs still run + metrics still +// emit — fail-open). httpCli drives all HTTP probes; nil installs a default +// with the global timeout. emitter pushes the InstantPaymentProbe custom event +// to NR; nil is tolerated (the analyticsevent helper no-ops on a nil emitter). +type PaymentProbeWorker struct { + river.WorkerDefaults[PaymentProbeArgs] + db *sql.DB + httpCli *http.Client + metrics PaymentProbeMetrics + emitter analyticsevent.Emitter + cfg PaymentProbeConfig + + // budgetOverride is a test-only per-leg latency-budget override (nil map = + // use paymentProbeLegLatencyBudgets). A 0-duration override makes a leg's + // degraded-latency branch reachable without a real slow server. Production + // wiring leaves this nil. + budgetOverride map[string]time.Duration + + // nowUnix is the clock seam for the injected webhook's created_at (the api's + // ±5-min replay window). nil → time.Now().Unix(). Test-only. + nowUnix func() int64 +} + +// NewPaymentProbeWorker constructs the worker. metrics is required — pass the +// production PaymentProbePromMetrics or a test fake. emitter may be the noop +// emitter when NR is not configured (analyticsevent.Factory returns one). +func NewPaymentProbeWorker(db *sql.DB, httpCli *http.Client, m PaymentProbeMetrics, emitter analyticsevent.Emitter, cfg PaymentProbeConfig) *PaymentProbeWorker { + if httpCli == nil { + httpCli = &http.Client{ + Timeout: paymentProbeHTTPTimeout, + // CheckRedirect: refuse redirects on every leg — a probe that silently + // follows a 302 to a different host would mask a misrouted DNS / LB + // config change. Reuses the flow_synthetic refuse-redirect hook. + CheckRedirect: flowSyntheticNoRedirect, + } + } + return &PaymentProbeWorker{ + db: db, + httpCli: httpCli, + metrics: m, + emitter: emitter, + cfg: cfg.Defaults(), + } +} + +// budgetFor returns the per-leg latency budget, honouring a test override. +func (w *PaymentProbeWorker) budgetFor(leg string) time.Duration { + if w.budgetOverride != nil { + if d, ok := w.budgetOverride[leg]; ok { + return d + } + } + return paymentProbeLegLatencyBudgets[leg] +} + +// SetBudgetOverrideForTest installs a per-leg latency-budget override so the +// external _test package can drive the degraded-latency branches +// deterministically (a 0 budget makes any real latency "over budget"). +func (w *PaymentProbeWorker) SetBudgetOverrideForTest(m map[string]time.Duration) { + w.budgetOverride = m +} + +// SetNowUnixForTest installs a clock seam for the injected webhook's +// created_at so a test can stamp a value inside (or outside) the api's +// ±5-min replay window deterministically. +func (w *PaymentProbeWorker) SetNowUnixForTest(fn func() int64) { w.nowUnix = fn } + +// effectiveNowUnix returns the current Unix second, honouring the test seam. +func (w *PaymentProbeWorker) effectiveNowUnix() int64 { + if w.nowUnix != nil { + return w.nowUnix() + } + return time.Now().Unix() +} + +// Work runs one sweep of the payment-health legs. Each leg runs sequentially +// (not in parallel) so a slow leg doesn't artificially mask another leg's +// latency in the histogram. The whole prober is a no-op unless the master flag +// is set. +// +// Returns nil unconditionally: a River retry would just queue the next tick +// faster than the cadence; the metric + event + audit_log already capture the +// failure for the operator. +func (w *PaymentProbeWorker) Work(ctx context.Context, job *river.Job[PaymentProbeArgs]) error { + ctx, span := otel.Tracer("instant.dev/worker").Start(ctx, "job.payment_probe") + defer span.End() + + if !w.cfg.Enabled { + slog.Debug("jobs.payment_probe.disabled", "reason", "PAYMENT_PROBE_ENABLED unset", "job_id", job.ID) + return nil + } + + start := time.Now() + runID := uuid.NewString() + + // Mint the session JWT once for the authed prod-safe legs. When JWT_SECRET + // is unset the authed legs degrade (config drift, not an outage). + bearer, mintErr := w.mintProbeSession() + if mintErr != nil { + slog.Warn("jobs.payment_probe.mint_failed", "reason", mintErr.Error()) + } + + results := map[string]string{} + + // Leg 1 — checkout reachability (authed, contract-only, non-charging). + results[paymentProbeLegCheckout] = w.runLeg(ctx, runID, paymentProbeLegCheckout, func(lctx context.Context) paymentProbeLegResult { + if bearer == "" { + return paymentProbeLegResult{leg: paymentProbeLegCheckout, result: paymentProbeResultDegraded, reason: "JWT_SECRET unset — checkout leg skipped"} + } + return w.legCheckout(lctx, bearer) + }) + + // Leg 2 — billing-state reachability (authed, read-only). + results[paymentProbeLegBillingState] = w.runLeg(ctx, runID, paymentProbeLegBillingState, func(lctx context.Context) paymentProbeLegResult { + if bearer == "" { + return paymentProbeLegResult{leg: paymentProbeLegBillingState, result: paymentProbeResultDegraded, reason: "JWT_SECRET unset — billing leg skipped"} + } + return w.legGetReachable(lctx, paymentProbeLegBillingState, "/api/v1/billing", bearer) + }) + + // Leg 3 — invoices reachability (authed, read-only). + results[paymentProbeLegInvoices] = w.runLeg(ctx, runID, paymentProbeLegInvoices, func(lctx context.Context) paymentProbeLegResult { + if bearer == "" { + return paymentProbeLegResult{leg: paymentProbeLegInvoices, result: paymentProbeResultDegraded, reason: "JWT_SECRET unset — invoices leg skipped"} + } + return w.legGetReachable(lctx, paymentProbeLegInvoices, "/api/v1/billing/invoices", bearer) + }) + + // Leg 4 — webhook security contract (no auth — the gate must reject a + // garbage payload). Always runs; needs no secrets. + results[paymentProbeLegWebhookSecurity] = w.runLeg(ctx, runID, paymentProbeLegWebhookSecurity, func(lctx context.Context) paymentProbeLegResult { + return w.legWebhookSecurity(lctx) + }) + + // Leg 5 — OPTIONAL test-mode upgrade proof. Gated on the test webhook + // secret + a test plan id + a DB. Skips clean (degraded) otherwise. + results[paymentProbeLegUpgrade] = w.runLeg(ctx, runID, paymentProbeLegUpgrade, func(lctx context.Context) paymentProbeLegResult { + return w.legUpgradeWebhook(lctx, runID) + }) + + slog.Info("jobs.payment_probe.completed", + "run_id", runID, + "results", results, + "duration_ms", time.Since(start).Milliseconds(), + "job_id", job.ID, + ) + return nil +} + +// paymentProbeLegResult bundles one leg's outcome for the record dispatcher. +// observeLatency is true when the leg should record a histogram observation +// (i.e. an HTTP response / DB round-trip was actually performed — a +// config-skipped leg has no meaningful latency to record). +type paymentProbeLegResult struct { + leg string + result string + reason string + latency time.Duration + observeLatency bool + httpStatus int +} + +// runLeg is the per-leg isolation + record wrapper. It runs fn under its own +// timeout (2× the leg budget, capped by the global HTTP timeout) and a +// recover() boundary so one leg's panic can't poison the sweep or wedge the +// River pool (worker convention 3), then records the result. Returns the +// result string for the completion log. +func (w *PaymentProbeWorker) runLeg(ctx context.Context, runID, leg string, fn func(context.Context) paymentProbeLegResult) (out string) { + budget := w.budgetFor(leg) + hardWall := budget * 2 + if hardWall == 0 || hardWall > paymentProbeHTTPTimeout { + hardWall = paymentProbeHTTPTimeout + } + lctx, cancel := context.WithTimeout(ctx, hardWall) + defer cancel() + + var r paymentProbeLegResult + func() { + defer func() { + if rec := recover(); rec != nil { + r = paymentProbeLegResult{leg: leg, result: paymentProbeResultFail, reason: fmt.Sprintf("panic: %v", rec)} + } + }() + r = fn(lctx) + }() + if r.leg == "" { + r.leg = leg + } + + w.record(ctx, runID, r) + return r.result +} + +// record emits the per-leg Prom metric + the InstantPaymentProbe NR event + +// (on fail) the audit_log row and structured ERROR slog line. Dual-surface +// emit (metric + event + log) so a failure is visible even if one surface is +// down. Mirrors flow_synthetic.record / auth_probe.recordLeg. +func (w *PaymentProbeWorker) record(ctx context.Context, runID string, r paymentProbeLegResult) { + if w.metrics != nil { + w.metrics.IncOutcome(r.leg, r.result) + if r.observeLatency { + w.metrics.ObserveLatency(r.leg, r.latency) + } + } + + // Push the custom event to NR. cohort=synthetic is baked in by the + // PaymentProbe event's Attrs so business/funnel dashboards exclude it. + analyticsevent.RecordPaymentProbe(ctx, w.emitter, analyticsevent.PaymentProbe{ + Leg: r.leg, + Result: r.result, + LatencyMs: r.latency.Milliseconds(), + Reason: r.reason, + HTTPStatus: r.httpStatus, + SyntheticRunID: runID, + }) + + switch r.result { + case paymentProbeResultFail: + w.emitPaymentProbeFailed(ctx, runID, r) + case paymentProbeResultDegraded: + slog.Warn("payment_probe_degraded", + "leg", r.leg, "reason", r.reason, + "latency_ms", r.latency.Milliseconds(), "http_status", r.httpStatus, + ) + default: + slog.Debug("payment_probe_pass", + "leg", r.leg, + "latency_ms", r.latency.Milliseconds(), "http_status", r.httpStatus, + ) + } +} + +// emitPaymentProbeFailed writes the failure audit row + the structured ERROR +// slog line. The log key (payment_probe_failed) is the NR fallback when the +// /metrics scrape is itself down. Mirrors emitAuthProbeFailed / emitFlowTestFailed. +func (w *PaymentProbeWorker) emitPaymentProbeFailed(ctx context.Context, runID string, r paymentProbeLegResult) { + slog.Error("payment_probe_failed", + "leg", r.leg, "reason", r.reason, + "http_status", r.httpStatus, "latency_ms", r.latency.Milliseconds(), + "run_id", runID, + ) + if w.db == nil { + return + } + meta := map[string]any{ + "leg": r.leg, "reason": r.reason, "http_status": r.httpStatus, + "latency_ms": r.latency.Milliseconds(), "run_id": runID, "base_url": w.cfg.BaseURL, + } + metaBytes, _ := json.Marshal(meta) + summary := fmt.Sprintf("payment probe leg=%s failed: %s", r.leg, r.reason) + // team_id is NULL — probe failures are platform-level, not tenant-scoped. + if _, err := w.db.ExecContext(ctx, ` + INSERT INTO audit_log (team_id, actor, kind, summary, metadata) + VALUES (NULL, $1, $2, $3, $4) + `, paymentProbeActor, auditKindPaymentProbeFailed, summary, metaBytes); err != nil { + slog.Warn("jobs.payment_probe.audit_insert_failed", "leg", r.leg, "error", err) + } +} + +// ─── Leg 1: checkout reachability ───────────────────────────────────────────── + +// legCheckout drives POST /api/v1/billing/checkout with the synthetic session +// and asserts the endpoint is reachable + does not 5xx. Razorpay live recurring +// is operator-blocked (project_razorpay_recurring_not_enabled.md), so a real +// charge can't be driven — a 402/409/502-with-known-body (Razorpay-blocked) is +// an acceptable "alive-but-blocked" shape. Only a 5xx CRASH (a checkout handler +// panic/regression) fails the leg. Truth surface: the checkout endpoint's +// non-crash response — catches a checkout handler that panics/regresses even +// while Razorpay itself is blocked. Contract-only — drives no real charge. +func (w *PaymentProbeWorker) legCheckout(ctx context.Context, bearer string) paymentProbeLegResult { + budget := w.budgetFor(paymentProbeLegCheckout) + r := paymentProbeLegResult{leg: paymentProbeLegCheckout} + + target := w.cfg.BaseURL + "/api/v1/billing/checkout" + // The body mirrors the dashboard's "Upgrade to Pro" call; the handler + // validates the plan before touching Razorpay, so even a blocked account + // exercises the handler path. http.NewRequestWithContext can only error on + // an unparseable URL or a bad method; target is built from the + // Defaults()-normalised + ValidatePaymentProbeBaseURL'd base + a constant + // path, and the method is a constant — so the err is unreachable and `_`'d + // to keep the patch-coverage gate at 100% (same posture as + // flow_synthetic.reapResource / auth_probe.legExchangeHeaders). + req, _ := http.NewRequestWithContext(ctx, http.MethodPost, target, strings.NewReader(`{"plan":"pro"}`)) + req.Header.Set("Authorization", "Bearer "+bearer) + req.Header.Set("Content-Type", "application/json") + req.Header.Set("User-Agent", paymentProbeUserAgent) + + start := time.Now() + resp, err := w.httpCli.Do(req) + r.latency = time.Since(start) + if err != nil { + r.result = paymentProbeResultFail + r.reason = "http_error: " + err.Error() + return r + } + defer func() { _ = resp.Body.Close() }() + body, _ := io.ReadAll(io.LimitReader(resp.Body, 4096)) + + r.observeLatency = true + r.httpStatus = resp.StatusCode + // Contract-only: any non-5xx means the checkout handler is alive and + // responded (a short_url, or a known blocked/validation status). A 5xx is a + // handler crash/regression → fail. + if resp.StatusCode >= 500 { + r.result = paymentProbeResultFail + r.reason = fmt.Sprintf("status=%d (checkout endpoint 5xx/crash); body=%s", resp.StatusCode, truncateForLog(string(body), 256)) + return r + } + if r.latency > budget { + r.result = paymentProbeResultDegraded + r.reason = fmt.Sprintf("checkout responded %d but latency=%dms over budget=%dms", resp.StatusCode, r.latency.Milliseconds(), budget.Milliseconds()) + return r + } + r.result = paymentProbeResultPass + r.reason = fmt.Sprintf("checkout endpoint alive (status=%d; contract-only, Razorpay live operator-blocked)", resp.StatusCode) + return r +} + +// ─── Legs 2 & 3: billing-state + invoices reachability ──────────────────────── + +// legGetReachable drives a GET against an authed billing read endpoint and +// asserts the endpoint is reachable + does not 5xx. Shared by the billing_state +// (GET /api/v1/billing) and invoices_reachable (GET /api/v1/billing/invoices) +// legs — both are read-only money-funnel surfaces a paying customer hits to see +// their plan + history; a 5xx on either is a paid-tier UX regression. Truth +// surface: the endpoint's non-crash response. +func (w *PaymentProbeWorker) legGetReachable(ctx context.Context, leg, path, bearer string) paymentProbeLegResult { + budget := w.budgetFor(leg) + r := paymentProbeLegResult{leg: leg} + + target := w.cfg.BaseURL + path + // Unreachable build error `_`'d — see legCheckout for the rationale (the + // base URL is validated/normalised + path is a constant + GET is constant). + req, _ := http.NewRequestWithContext(ctx, http.MethodGet, target, nil) + req.Header.Set("Authorization", "Bearer "+bearer) + req.Header.Set("User-Agent", paymentProbeUserAgent) + + start := time.Now() + resp, err := w.httpCli.Do(req) + r.latency = time.Since(start) + if err != nil { + r.result = paymentProbeResultFail + r.reason = "http_error: " + err.Error() + return r + } + defer func() { _ = resp.Body.Close() }() + body, _ := io.ReadAll(io.LimitReader(resp.Body, 4096)) + + r.observeLatency = true + r.httpStatus = resp.StatusCode + if resp.StatusCode >= 500 { + r.result = paymentProbeResultFail + r.reason = fmt.Sprintf("status=%d (%s endpoint 5xx/crash); body=%s", resp.StatusCode, path, truncateForLog(string(body), 256)) + return r + } + if r.latency > budget { + r.result = paymentProbeResultDegraded + r.reason = fmt.Sprintf("%s responded %d but latency=%dms over budget=%dms", path, resp.StatusCode, r.latency.Milliseconds(), budget.Milliseconds()) + return r + } + r.result = paymentProbeResultPass + r.reason = fmt.Sprintf("%s endpoint alive (status=%d)", path, resp.StatusCode) + return r +} + +// ─── Leg 4: webhook security contract ───────────────────────────────────────── + +// legWebhookSecurity POSTs a deliberately-unsigned garbage payload to +// /razorpay/webhook and asserts the api REJECTS it with 400 invalid_signature. +// This is a positive proof the signature gate is live — the security backstop +// that stops a forged "success" from driving a free upgrade. No money +// implication: nothing is charged, nothing is upgraded. A 2xx (the gate let an +// unsigned payload through — a critical security regression), or any 4xx whose +// error_code is NOT invalid_signature, fails the leg. Truth surface: the +// rejection envelope's error_code, not a bare status. +func (w *PaymentProbeWorker) legWebhookSecurity(ctx context.Context) paymentProbeLegResult { + budget := w.budgetFor(paymentProbeLegWebhookSecurity) + r := paymentProbeLegResult{leg: paymentProbeLegWebhookSecurity} + + target := w.cfg.BaseURL + "/razorpay/webhook" + // Unreachable build error `_`'d — see legCheckout for the rationale. + req, _ := http.NewRequestWithContext(ctx, http.MethodPost, target, strings.NewReader(paymentProbeGarbageWebhookBody)) + req.Header.Set("Content-Type", "application/json") + req.Header.Set("User-Agent", paymentProbeUserAgent) + // Intentionally NO X-Razorpay-Signature header — the gate must reject it. + + start := time.Now() + resp, err := w.httpCli.Do(req) + r.latency = time.Since(start) + if err != nil { + r.result = paymentProbeResultFail + r.reason = "http_error: " + err.Error() + return r + } + defer func() { _ = resp.Body.Close() }() + body, _ := io.ReadAll(io.LimitReader(resp.Body, 4096)) + + r.observeLatency = true + r.httpStatus = resp.StatusCode + + // A 2xx here is the catastrophic case: the api accepted an unsigned payload. + if resp.StatusCode >= 200 && resp.StatusCode < 300 { + r.result = paymentProbeResultFail + r.reason = fmt.Sprintf("SECURITY: unsigned payload ACCEPTED (status=%d, want 400 invalid_signature); body=%s", resp.StatusCode, truncateForLog(string(body), 256)) + return r + } + if resp.StatusCode != http.StatusBadRequest { + r.result = paymentProbeResultFail + r.reason = fmt.Sprintf("status=%d (want 400 invalid_signature); body=%s", resp.StatusCode, truncateForLog(string(body), 256)) + return r + } + var parsed struct { + Error string `json:"error"` + } + _ = json.Unmarshal(body, &parsed) + if parsed.Error != paymentProbeInvalidSignatureCode { + r.result = paymentProbeResultFail + r.reason = fmt.Sprintf("400 but unexpected error=%q (want %q); body=%s", parsed.Error, paymentProbeInvalidSignatureCode, truncateForLog(string(body), 256)) + return r + } + if r.latency > budget { + r.result = paymentProbeResultDegraded + r.reason = fmt.Sprintf("webhook security gate ok but latency=%dms over budget=%dms", r.latency.Milliseconds(), budget.Milliseconds()) + return r + } + r.result = paymentProbeResultPass + r.reason = "webhook signature gate rejected unsigned payload (400 invalid_signature)" + return r +} + +// ─── Leg 5: OPTIONAL test-mode upgrade proof ────────────────────────────────── + +// legUpgradeWebhook is the full upgrade-health proof: it mints a fresh +// is_test_cohort=true team, injects a correctly-signed TEST-mode +// subscription.charged at /razorpay/webhook (reusing the api's HMAC-SHA256 +// raw-body scheme), then asserts teams.plan_tier flipped to the entitled tier +// (the rule-12 downstream truth surface — NOT the webhook 200), and reaps the +// team. It drives NO live Razorpay and charges NO real money: the webhook is +// signed with the TEST secret + carries a TEST plan_id, which the api verifies +// via its test-secret leg and routes through planIDToTier. +// +// Gated on (db != nil) AND TestWebhookSecret AND TestPlanIDPro — any of these +// unset → the leg skips clean (result=degraded, the operator hasn't wired the +// test secret). It NEVER fails on a config gap; it only fails when the secret +// IS wired and the tier did not flip (the real upgrade-pipeline regression). +func (w *PaymentProbeWorker) legUpgradeWebhook(ctx context.Context, runID string) paymentProbeLegResult { + budget := w.budgetFor(paymentProbeLegUpgrade) + r := paymentProbeLegResult{leg: paymentProbeLegUpgrade} + + if w.db == nil { + r.result = paymentProbeResultDegraded + r.reason = "db unavailable — upgrade leg skipped" + return r + } + if w.cfg.TestWebhookSecret == "" { + r.result = paymentProbeResultDegraded + r.reason = "RAZORPAY_TEST_WEBHOOK_SECRET unset — upgrade leg skipped (no test webhook secret)" + return r + } + if w.cfg.TestPlanIDPro == "" { + r.result = paymentProbeResultDegraded + r.reason = "PAYMENT_PROBE_TEST_PLAN_ID_PRO unset — upgrade leg skipped (cannot resolve a test tier)" + return r + } + + start := time.Now() + + // Mint a fresh cohort team for THIS tick (not the shared synthetic team) so + // the free→pro transition is unambiguous and a reap can fully tear it down. + teamID := uuid.NewString() + if err := w.mintUpgradeCohortTeam(ctx, teamID); err != nil { + r.result = paymentProbeResultDegraded + r.reason = "mint cohort team failed (DB drift, not a payment outage): " + err.Error() + return r + } + // Reap is ALWAYS attempted, even when the assertion below fails, so we never + // leak a synthetic team. + defer w.reapUpgradeCohortTeam(ctx, teamID, runID) + + // Inject the correctly-signed TEST-mode subscription.charged. + subID := "sub_payprobe_" + uuid.NewString()[:12] + eventID := "evt_payprobe_" + uuid.NewString() + rawBody := w.subscriptionChargedBody(teamID, subID, w.cfg.TestPlanIDPro, w.effectiveNowUnix()) + status, respBody, err := w.postSignedWebhook(ctx, rawBody, eventID) + r.latency = time.Since(start) + if err != nil { + r.result = paymentProbeResultFail + r.reason = "webhook http_error: " + err.Error() + return r + } + r.observeLatency = true + r.httpStatus = status + if status != http.StatusOK { + r.result = paymentProbeResultFail + r.reason = fmt.Sprintf("signed test webhook status=%d (want 200); body=%s", status, truncateForLog(respBody, 256)) + return r + } + + // Truth surface (rule 12): read back teams.plan_tier. The webhook 200 is + // NOT the proof — the downstream tier flip is. + tier, terr := w.readTeamTier(ctx, teamID) + if terr != nil { + r.result = paymentProbeResultFail + r.reason = "tier read-back failed after 200 webhook: " + terr.Error() + return r + } + if tier == w.cfg.Tier || tier == "free" || tier == "anonymous" { + r.result = paymentProbeResultFail + r.reason = fmt.Sprintf("webhook 200 but team tier did NOT advance (still %q) — upgrade pipeline broken", tier) + return r + } + if r.latency > budget { + r.result = paymentProbeResultDegraded + r.reason = fmt.Sprintf("tier advanced to %q but latency=%dms over budget=%dms", tier, r.latency.Milliseconds(), budget.Milliseconds()) + return r + } + r.result = paymentProbeResultPass + r.reason = fmt.Sprintf("test-mode upgrade proof: free→%s via signed test webhook (no real money)", tier) + return r +} + +// mintUpgradeCohortTeam INSERTs a fresh is_test_cohort=true team at the seeded +// tier so the upgrade leg can prove a free→pro transition. is_test_cohort=true +// means every team-iterating background job no-ops for it (test_cohort.go), and +// the e2e_cohort_sweep job reaps any leak as a backstop. +func (w *PaymentProbeWorker) mintUpgradeCohortTeam(ctx context.Context, teamID string) error { + _, err := w.db.ExecContext(ctx, ` + INSERT INTO teams (id, name, plan_tier, status, is_test_cohort) + VALUES ($1::uuid, $2, $3, 'active', true) + `, teamID, "payment-probe-"+teamID[:8], w.cfg.Tier) + return err +} + +// reapUpgradeCohortTeam tombstones the per-tick cohort team after the leg, so a +// fresh team is minted every tick and nothing accumulates. Best-effort: a +// failed reap is logged (and the e2e_cohort_sweep backstop catches the leak). +// Guarded by is_test_cohort=true in the WHERE so it can NEVER touch a real team. +func (w *PaymentProbeWorker) reapUpgradeCohortTeam(ctx context.Context, teamID, runID string) { + if w.db == nil { + return + } + // Hard-delete the synthetic team. The FK from users/resources is irrelevant + // here — the upgrade leg creates no resources, only the team row + whatever + // the webhook upgrade path wrote (plan_tier on the same row). The + // is_test_cohort=true guard is the safety rail. + if _, err := w.db.ExecContext(ctx, ` + DELETE FROM teams WHERE id = $1::uuid AND is_test_cohort = true + `, teamID); err != nil { + slog.Error("payment_probe_cohort_reap_failed", "team_id", teamID, "run_id", runID, "error", err) + } +} + +// readTeamTier reads teams.plan_tier for the cohort team — the rule-12 truth +// surface for "did the upgrade land". +func (w *PaymentProbeWorker) readTeamTier(ctx context.Context, teamID string) (string, error) { + var tier string + err := w.db.QueryRowContext(ctx, `SELECT plan_tier FROM teams WHERE id = $1::uuid`, teamID).Scan(&tier) + return tier, err +} + +// subscriptionChargedBody builds the EXACT subscription.charged JSON body the +// api's RazorpayWebhook reads, with created_at inside the ±5-min replay window +// and notes.team_id set so resolveTeamFromNotes resolves the cohort team. The +// caller signs THESE bytes and POSTs THEM unchanged (re-marshalling after +// signing would change the byte order and break the HMAC). Mirrors the api +// test's subscriptionChargedRawBody. +func (w *PaymentProbeWorker) subscriptionChargedBody(teamID, subID, planID string, createdAt int64) []byte { + subEntity, _ := json.Marshal(map[string]any{ + "id": subID, + "entity": "subscription", + "plan_id": planID, + "status": "active", + "notes": map[string]any{"team_id": teamID}, + }) + payEntity, _ := json.Marshal(map[string]any{ + "id": "pay_payprobe_" + uuid.NewString()[:12], + "entity": "payment", + "status": "captured", + "amount": 410000, + "currency": "INR", + }) + event := map[string]any{ + "id": "evt_payprobe_" + uuid.NewString(), + "entity": "event", + "event": "subscription.charged", + "created_at": createdAt, + "payload": map[string]any{ + "subscription": map[string]any{"entity": json.RawMessage(subEntity)}, + "payment": map[string]any{"entity": json.RawMessage(payEntity)}, + }, + } + // json.Marshal on this map of marshalable values cannot return an error — + // skip the defensive branch to keep the patch-coverage gate at 100% (same + // posture as flow_synthetic.mintSessionJWT's `_ = json.Marshal(...)`). + body, _ := json.Marshal(event) + return body +} + +// postSignedWebhook signs the EXACT bytes with the TEST webhook secret +// (HMAC-SHA256 over the raw body, the same primitive verifyRazorpaySignature +// checks — guaranteeing parity) and POSTs them unchanged with the +// X-Razorpay-Signature + X-Razorpay-Event-Id headers. Returns the status code + +// a truncated body. Drives the api's TEST-secret verify leg — never live. +func (w *PaymentProbeWorker) postSignedWebhook(ctx context.Context, body []byte, eventID string) (int, string, error) { + sig := paymentProbeSignRazorpay(w.cfg.TestWebhookSecret, body) + target := w.cfg.BaseURL + "/razorpay/webhook" + // Unreachable build error `_`'d — see legCheckout for the rationale. + req, _ := http.NewRequestWithContext(ctx, http.MethodPost, target, strings.NewReader(string(body))) + req.Header.Set("Content-Type", "application/json") + req.Header.Set("X-Razorpay-Signature", sig) + req.Header.Set("X-Razorpay-Event-Id", eventID) + req.Header.Set("User-Agent", paymentProbeUserAgent) + + resp, err := w.httpCli.Do(req) + if err != nil { + return 0, "", err + } + defer func() { _ = resp.Body.Close() }() + respBody, _ := io.ReadAll(io.LimitReader(resp.Body, 4096)) + return resp.StatusCode, string(respBody), nil +} + +// paymentProbeSignRazorpay computes hex(HMAC-SHA256(key=secret, msg=body)) — +// the EXACT scheme the api's verifyRazorpaySignature checks (convention rule 9: +// no timestamp prefix, raw-body HMAC). Hand-rolled (same posture as +// flow_synthetic.mintSessionJWT) so the worker stays dependency-light and the +// signing primitive lives beside the prober that uses it. Exported via +// PaymentProbeSignForTest so the _test package can assert parity. +func paymentProbeSignRazorpay(secret string, body []byte) string { + mac := hmac.New(sha256.New, []byte(secret)) + mac.Write(body) + return hex.EncodeToString(mac.Sum(nil)) +} + +// PaymentProbeSignForTest is an exported test seam over the unexported signer so +// the external _test package can assert the HMAC-SHA256 raw-body scheme matches +// the api's verifier without reaching into the unexported function. +func PaymentProbeSignForTest(secret string, body []byte) string { + return paymentProbeSignRazorpay(secret, body) +} + +// ─── session mint (Brevo-free, shared synthetic team) ───────────────────────── + +// mintProbeSession signs a short-lived HS256 session JWT for the shared +// synthetic flow team (the is_test_cohort=true team flow_synthetic seeds), +// matching the claims auth.go issues: {uid, tid, email, jti, iat, exp}. Reuses +// the flow_synthetic stable team/user UUIDs + the flowSyntheticB64 helper so the +// authed prod-safe legs run AS that already-seeded cohort team — the payment +// prober needs no team-seeding path of its own. Errors when JWT_SECRET is unset. +// Identical signing shape to flow_synthetic.mintSessionJWT (rule 16: one +// session-JWT scheme, two callers). +func (w *PaymentProbeWorker) mintProbeSession() (string, error) { + if w.cfg.JWTSecret == "" { + return "", errors.New("JWT_SECRET unset — cannot mint payment-probe session JWT") + } + header := flowSyntheticB64(`{"alg":"HS256","typ":"JWT"}`) + now := time.Now().UTC().Unix() + claims := map[string]any{ + "uid": flowSyntheticUserID.String(), + "tid": flowSyntheticTeamID.String(), + "email": w.cfg.Email, + "jti": uuid.NewString(), + "iat": now, + "exp": now + int64(flowSyntheticSessionMaxAge.Seconds()), + } + claimsJSON, _ := json.Marshal(claims) + body := header + "." + flowSyntheticB64(string(claimsJSON)) + mac := hmac.New(sha256.New, []byte(w.cfg.JWTSecret)) + mac.Write([]byte(body)) + sig := base64.RawURLEncoding.EncodeToString(mac.Sum(nil)) + return body + "." + sig, nil +} + +// ValidatePaymentProbeBaseURL is a startup-time sanity check for the +// PAYMENT_PROBE_BASE_URL env var. Returns an error iff the URL is set but +// unparseable; an empty value is accepted (Defaults() fills the prod host). +// Exported so main.go can fail-fast on a typo rather than discovering the bad +// URL on the first tick. Mirrors ValidateAuthProbeBaseURL / ValidateFlowSyntheticBaseURL. +func ValidatePaymentProbeBaseURL(raw string) error { + if raw == "" { + return nil + } + u, err := url.Parse(raw) + if err != nil { + return fmt.Errorf("PAYMENT_PROBE_BASE_URL parse: %w", err) + } + if u.Scheme != "http" && u.Scheme != "https" { + return errors.New("PAYMENT_PROBE_BASE_URL must be http(s)") + } + if u.Host == "" { + return errors.New("PAYMENT_PROBE_BASE_URL missing host") + } + return nil +} diff --git a/internal/jobs/payment_probe_internal_test.go b/internal/jobs/payment_probe_internal_test.go new file mode 100644 index 0000000..2a886f1 --- /dev/null +++ b/internal/jobs/payment_probe_internal_test.go @@ -0,0 +1,402 @@ +package jobs + +// payment_probe_internal_test.go — white-box tests for unexported helpers in +// payment_probe.go that the black-box (jobs_test) package can't reach: the +// production PromMetrics impl, the clock seam, the default-client construction, +// the nil-db reap guard, and the per-leg http-error / build-error branches. + +import ( + "context" + "net/http" + "net/http/httptest" + "strings" + "testing" + "time" + + sqlmock "github.com/DATA-DOG/go-sqlmock" +) + +// TestPaymentProbe_PromMetrics_DoesNotPanic exercises the production +// PaymentProbePromMetrics impl (IncOutcome + ObserveLatency) — they write to the +// process-global Prom registry, so the assertion is simply "no panic / no +// duplicate-registration explosion". +func TestPaymentProbe_PromMetrics_DoesNotPanic(t *testing.T) { + m := PaymentProbePromMetrics{} + m.IncOutcome(paymentProbeLegCheckout, paymentProbeResultPass) + m.ObserveLatency(paymentProbeLegCheckout, 42*time.Millisecond) +} + +// TestPaymentProbe_Kind asserts the River worker key is stable (the periodic +// registration + UniqueOpts key on it). +func TestPaymentProbe_Kind(t *testing.T) { + if got := (PaymentProbeArgs{}).Kind(); got != "payment_probe" { + t.Errorf("Kind() = %q, want payment_probe", got) + } +} + +// TestPaymentProbe_EffectiveNowUnix covers the clock seam: the override fires +// when set, and the real clock fires when not. +func TestPaymentProbe_EffectiveNowUnix(t *testing.T) { + w := &PaymentProbeWorker{} + // Real clock path. + if w.effectiveNowUnix() <= 0 { + t.Error("effectiveNowUnix real clock should be positive") + } + // Seam path. + w.SetNowUnixForTest(func() int64 { return 12345 }) + if got := w.effectiveNowUnix(); got != 12345 { + t.Errorf("effectiveNowUnix seam = %d, want 12345", got) + } +} + +// TestPaymentProbe_NewWorker_DefaultClient covers the nil-httpCli branch in +// NewPaymentProbeWorker (installs a default client with the global timeout + +// refuse-redirect hook). +func TestPaymentProbe_NewWorker_DefaultClient(t *testing.T) { + w := NewPaymentProbeWorker(nil, nil, PaymentProbePromMetrics{}, nil, PaymentProbeConfig{}) + if w.httpCli == nil { + t.Fatal("nil httpCli should install a default client") + } + if w.httpCli.Timeout != paymentProbeHTTPTimeout { + t.Errorf("default client timeout = %v, want %v", w.httpCli.Timeout, paymentProbeHTTPTimeout) + } + // The refuse-redirect hook returns http.ErrUseLastResponse. + if err := w.httpCli.CheckRedirect(nil, nil); err != http.ErrUseLastResponse { + t.Errorf("default client CheckRedirect = %v, want ErrUseLastResponse", err) + } +} + +// TestPaymentProbe_Defaults covers the Defaults() fill branches (empty base URL, +// email, tier → defaults; a set base URL is trimmed). +func TestPaymentProbe_Defaults(t *testing.T) { + out := PaymentProbeConfig{BaseURL: "https://x.example/"}.Defaults() + if out.BaseURL != "https://x.example" { + t.Errorf("trailing slash not trimmed: %q", out.BaseURL) + } + if out.Email == "" || out.Tier == "" { + t.Errorf("email/tier defaults not filled: email=%q tier=%q", out.Email, out.Tier) + } + full := PaymentProbeConfig{}.Defaults() + if full.BaseURL != paymentProbeDefaultBaseURL { + t.Errorf("empty base url default = %q, want %q", full.BaseURL, paymentProbeDefaultBaseURL) + } +} + +// TestPaymentProbe_ReapNilDB covers the db==nil guard in reapUpgradeCohortTeam +// (fail-open: no panic, no-op). Reachable directly only via the white-box seam. +func TestPaymentProbe_ReapNilDB(t *testing.T) { + w := &PaymentProbeWorker{} // nil db + w.reapUpgradeCohortTeam(context.Background(), "00000000-0000-0000-0000-000000000000", "run") +} + +// TestPaymentProbe_LegCheckout_HTTPError covers the http_error branch of +// legCheckout (the http client returns an error — e.g. a closed server). +func TestPaymentProbe_LegCheckout_HTTPError(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(http.ResponseWriter, *http.Request) {})) + srv.Close() // immediately closed → the request errors + + w := NewPaymentProbeWorker(nil, srv.Client(), PaymentProbePromMetrics{}, nil, PaymentProbeConfig{ + BaseURL: srv.URL, Enabled: true, + }) + r := w.legCheckout(context.Background(), "bearer") + if r.result != paymentProbeResultFail { + t.Errorf("legCheckout http error: want fail, got %q (%s)", r.result, r.reason) + } +} + +// TestPaymentProbe_LegGetReachable_HTTPError covers the http_error branch of the +// shared GET leg. +func TestPaymentProbe_LegGetReachable_HTTPError(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(http.ResponseWriter, *http.Request) {})) + srv.Close() + + w := NewPaymentProbeWorker(nil, srv.Client(), PaymentProbePromMetrics{}, nil, PaymentProbeConfig{ + BaseURL: srv.URL, Enabled: true, + }) + r := w.legGetReachable(context.Background(), paymentProbeLegBillingState, "/api/v1/billing", "bearer") + if r.result != paymentProbeResultFail { + t.Errorf("legGetReachable http error: want fail, got %q (%s)", r.result, r.reason) + } +} + +// TestPaymentProbe_LegWebhookSecurity_HTTPError covers the http_error branch of +// the webhook-security leg. +func TestPaymentProbe_LegWebhookSecurity_HTTPError(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(http.ResponseWriter, *http.Request) {})) + srv.Close() + + w := NewPaymentProbeWorker(nil, srv.Client(), PaymentProbePromMetrics{}, nil, PaymentProbeConfig{ + BaseURL: srv.URL, Enabled: true, + }) + r := w.legWebhookSecurity(context.Background()) + if r.result != paymentProbeResultFail { + t.Errorf("legWebhookSecurity http error: want fail, got %q (%s)", r.result, r.reason) + } +} + +// TestPaymentProbe_Upgrade_MintFails_Degrades covers the mint-failure branch of +// legUpgradeWebhook: a DB error on the team INSERT degrades the leg (DB drift, +// not a payment outage) — never a page. +func TestPaymentProbe_Upgrade_MintFails_Degrades(t *testing.T) { + db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherRegexp)) + if err != nil { + t.Fatalf("sqlmock.New: %v", err) + } + defer db.Close() + mock.ExpectExec(`INSERT INTO teams`).WillReturnError(context.DeadlineExceeded) + + w := NewPaymentProbeWorker(db, http.DefaultClient, PaymentProbePromMetrics{}, nil, PaymentProbeConfig{ + Enabled: true, + TestWebhookSecret: "s", + TestPlanIDPro: "plan_test_pro", + Tier: "free", + }) + r := w.legUpgradeWebhook(context.Background(), "run") + if r.result != paymentProbeResultDegraded { + t.Errorf("mint fail: want degraded, got %q (%s)", r.result, r.reason) + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("sqlmock: %v", err) + } +} + +// TestPaymentProbe_Upgrade_ReapFails covers the DELETE-error branch of +// reapUpgradeCohortTeam (logged, best-effort). Driven via the upgrade leg with a +// webhook that fails (so the leg returns before tier read) and a reap that errors. +func TestPaymentProbe_Upgrade_ReapFails(t *testing.T) { + // A server whose webhook returns 500 so the leg fails fast after mint. + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(http.StatusInternalServerError) + _, _ = w.Write([]byte(`{"ok":false}`)) + })) + defer srv.Close() + + db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherRegexp)) + if err != nil { + t.Fatalf("sqlmock.New: %v", err) + } + defer db.Close() + mock.ExpectExec(`INSERT INTO teams`).WillReturnResult(sqlmock.NewResult(0, 1)) + mock.ExpectExec(`DELETE FROM teams`).WillReturnError(context.DeadlineExceeded) // reap fails (logged) + + w := NewPaymentProbeWorker(db, srv.Client(), PaymentProbePromMetrics{}, nil, PaymentProbeConfig{ + Enabled: true, + BaseURL: srv.URL, + TestWebhookSecret: "s", + TestPlanIDPro: "plan_test_pro", + Tier: "free", + }) + r := w.legUpgradeWebhook(context.Background(), "run") + if r.result != paymentProbeResultFail { + t.Errorf("webhook 500: want fail, got %q (%s)", r.result, r.reason) + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("sqlmock: %v", err) + } +} + +// TestPaymentProbe_Upgrade_TierReadFails covers the tier read-back error branch: +// a webhook 200 but the SELECT errors → fail. +func TestPaymentProbe_Upgrade_TierReadFails(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte(`{"ok":true}`)) + })) + defer srv.Close() + + db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherRegexp)) + if err != nil { + t.Fatalf("sqlmock.New: %v", err) + } + defer db.Close() + mock.ExpectExec(`INSERT INTO teams`).WillReturnResult(sqlmock.NewResult(0, 1)) + mock.ExpectQuery(`SELECT plan_tier FROM teams`).WillReturnError(context.DeadlineExceeded) + mock.ExpectExec(`DELETE FROM teams`).WillReturnResult(sqlmock.NewResult(0, 1)) // deferred reap + + w := NewPaymentProbeWorker(db, srv.Client(), PaymentProbePromMetrics{}, nil, PaymentProbeConfig{ + Enabled: true, + BaseURL: srv.URL, + TestWebhookSecret: "s", + TestPlanIDPro: "plan_test_pro", + Tier: "free", + }) + r := w.legUpgradeWebhook(context.Background(), "run") + if r.result != paymentProbeResultFail { + t.Errorf("tier read fail: want fail, got %q (%s)", r.result, r.reason) + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("sqlmock: %v", err) + } +} + +// TestPaymentProbe_Upgrade_DegradedLatency covers the over-budget branch of the +// upgrade leg: tier flips to pro but the (0) budget is crossed → degraded. +func TestPaymentProbe_Upgrade_DegradedLatency(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte(`{"ok":true}`)) + })) + defer srv.Close() + + db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherRegexp)) + if err != nil { + t.Fatalf("sqlmock.New: %v", err) + } + defer db.Close() + mock.ExpectExec(`INSERT INTO teams`).WillReturnResult(sqlmock.NewResult(0, 1)) + mock.ExpectQuery(`SELECT plan_tier FROM teams`). + WillReturnRows(sqlmock.NewRows([]string{"plan_tier"}).AddRow("pro")) + mock.ExpectExec(`DELETE FROM teams`).WillReturnResult(sqlmock.NewResult(0, 1)) + + w := NewPaymentProbeWorker(db, srv.Client(), PaymentProbePromMetrics{}, nil, PaymentProbeConfig{ + Enabled: true, + BaseURL: srv.URL, + TestWebhookSecret: "s", + TestPlanIDPro: "plan_test_pro", + Tier: "free", + }) + w.SetBudgetOverrideForTest(map[string]time.Duration{paymentProbeLegUpgrade: 0}) + r := w.legUpgradeWebhook(context.Background(), "run") + if r.result != paymentProbeResultDegraded { + t.Errorf("upgrade 0-budget: want degraded, got %q (%s)", r.result, r.reason) + } +} + +// TestPaymentProbe_EmitFailed_AuditInsertErrors covers the audit-insert error +// branch of emitPaymentProbeFailed (the DB write fails → logged, fail-open). +func TestPaymentProbe_EmitFailed_AuditInsertErrors(t *testing.T) { + db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherRegexp)) + if err != nil { + t.Fatalf("sqlmock.New: %v", err) + } + defer db.Close() + mock.ExpectExec(`INSERT INTO audit_log`).WillReturnError(context.DeadlineExceeded) + + w := NewPaymentProbeWorker(db, http.DefaultClient, PaymentProbePromMetrics{}, nil, PaymentProbeConfig{}) + w.emitPaymentProbeFailed(context.Background(), "run", paymentProbeLegResult{ + leg: paymentProbeLegCheckout, result: paymentProbeResultFail, reason: "boom", + }) + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("sqlmock: %v", err) + } +} + +// TestPaymentProbe_RunLeg_HardWallCap covers runLeg's hardWall cap branch: a +// budget larger than the global HTTP timeout is clamped to the global ceiling. +func TestPaymentProbe_RunLeg_HardWallCap(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(http.StatusBadRequest) + _, _ = w.Write([]byte(`{"error":"invalid_signature"}`)) + })) + defer srv.Close() + + w := NewPaymentProbeWorker(nil, srv.Client(), PaymentProbePromMetrics{}, nil, PaymentProbeConfig{ + Enabled: true, BaseURL: srv.URL, + }) + // A budget > the global HTTP timeout → hardWall clamps to the global ceiling. + w.SetBudgetOverrideForTest(map[string]time.Duration{ + paymentProbeLegWebhookSecurity: paymentProbeHTTPTimeout * 4, + }) + got := w.runLeg(context.Background(), "run", paymentProbeLegWebhookSecurity, func(ctx context.Context) paymentProbeLegResult { + return w.legWebhookSecurity(ctx) + }) + if got != paymentProbeResultPass { + t.Errorf("hardWall-cap leg: want pass, got %q", got) + } +} + +// TestPaymentProbe_RunLeg_FillsEmptyLeg covers runLeg's `if r.leg == ""` guard: +// a fn that returns a zero-leg result has its leg backfilled from the leg arg so +// the metric/event always carries a leg label. +func TestPaymentProbe_RunLeg_FillsEmptyLeg(t *testing.T) { + fm := &capturingPayMetrics{} + w := NewPaymentProbeWorker(nil, http.DefaultClient, fm, nil, PaymentProbeConfig{Enabled: true}) + got := w.runLeg(context.Background(), "run", paymentProbeLegCheckout, func(context.Context) paymentProbeLegResult { + // Deliberately omit leg → the guard must backfill it. + return paymentProbeLegResult{result: paymentProbeResultPass} + }) + if got != paymentProbeResultPass { + t.Errorf("runLeg result = %q, want pass", got) + } + if fm.lastLeg != paymentProbeLegCheckout { + t.Errorf("empty leg not backfilled: metric leg = %q, want %q", fm.lastLeg, paymentProbeLegCheckout) + } +} + +// TestPaymentProbe_WebhookSecurity_Non400Status covers the webhook-security +// leg's "not 2xx, not 400" branch (e.g. a 500 from the webhook endpoint). +func TestPaymentProbe_WebhookSecurity_Non400Status(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(http.StatusServiceUnavailable) // 503: not 2xx, not 400 + _, _ = w.Write([]byte(`{"error":"down"}`)) + })) + defer srv.Close() + + w := NewPaymentProbeWorker(nil, srv.Client(), PaymentProbePromMetrics{}, nil, PaymentProbeConfig{ + Enabled: true, BaseURL: srv.URL, + }) + r := w.legWebhookSecurity(context.Background()) + if r.result != paymentProbeResultFail { + t.Errorf("webhook 503: want fail, got %q (%s)", r.result, r.reason) + } +} + +// TestPaymentProbe_Upgrade_WebhookTransportError covers the upgrade leg's +// "webhook http_error" branch (the signed-webhook POST transport fails — a +// closed server) AND postSignedWebhook's Do-error return. The cohort team is +// still reaped. +func TestPaymentProbe_Upgrade_WebhookTransportError(t *testing.T) { + closed := httptest.NewServer(http.HandlerFunc(func(http.ResponseWriter, *http.Request) {})) + base := closed.URL + closed.Close() // the webhook POST now errors at the transport layer + + db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherRegexp)) + if err != nil { + t.Fatalf("sqlmock.New: %v", err) + } + defer db.Close() + mock.ExpectExec(`INSERT INTO teams`).WillReturnResult(sqlmock.NewResult(0, 1)) + mock.ExpectExec(`DELETE FROM teams`).WillReturnResult(sqlmock.NewResult(0, 1)) // deferred reap + // nil db on the worker would skip the audit; here db is set so the fail audit + // also runs after the deferred reap. + mock.ExpectExec(`INSERT INTO audit_log`).WillReturnResult(sqlmock.NewResult(1, 1)) + + w := NewPaymentProbeWorker(db, &http.Client{}, PaymentProbePromMetrics{}, nil, PaymentProbeConfig{ + Enabled: true, + BaseURL: base, + TestWebhookSecret: "s", + TestPlanIDPro: "plan_test_pro", + Tier: "free", + }) + // Drive via runLeg so the deferred reap + the fail audit (record) both fire. + got := w.runLeg(context.Background(), "run", paymentProbeLegUpgrade, func(ctx context.Context) paymentProbeLegResult { + return w.legUpgradeWebhook(ctx, "run") + }) + if got != paymentProbeResultFail { + t.Errorf("upgrade webhook transport error: want fail, got %q", got) + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("sqlmock: %v", err) + } +} + +// capturingPayMetrics records the last (leg, result) for the empty-leg guard test. +type capturingPayMetrics struct { + lastLeg, lastResult string +} + +func (c *capturingPayMetrics) IncOutcome(leg, result string) { c.lastLeg, c.lastResult = leg, result } +func (c *capturingPayMetrics) ObserveLatency(string, time.Duration) {} + +// TestPaymentProbe_SubscriptionChargedBody covers the body builder: it produces +// valid JSON carrying the team_id in notes, the plan_id, and the created_at. +func TestPaymentProbe_SubscriptionChargedBody(t *testing.T) { + w := &PaymentProbeWorker{} + body := w.subscriptionChargedBody("team-uuid", "sub_x", "plan_test_pro", 1700000000) + s := string(body) + for _, want := range []string{"team-uuid", "plan_test_pro", "subscription.charged", "1700000000"} { + if !strings.Contains(s, want) { + t.Errorf("subscriptionChargedBody missing %q in %s", want, s) + } + } +} diff --git a/internal/jobs/payment_probe_test.go b/internal/jobs/payment_probe_test.go new file mode 100644 index 0000000..addc067 --- /dev/null +++ b/internal/jobs/payment_probe_test.go @@ -0,0 +1,679 @@ +package jobs_test + +// payment_probe_test.go — hermetic tests for the Layer-3 PaymentProbeWorker. +// +// Each test stands up an httptest.Server that simulates the api's payment-funnel +// surface (POST /api/v1/billing/checkout, GET /api/v1/billing[/invoices], POST +// /razorpay/webhook) for one scenario, then asserts: +// - the per-leg outcome metric is bumped with the right (leg, result) label, +// - an audit_log row is inserted on result=fail (and NOT on pass/degraded), +// - the flag-off path probes NOTHING (zero probes, no HTTP, no DB), +// - the optional upgrade leg mints → injects a signed test webhook → asserts +// the tier flip (rule-12 truth surface) → reaps the cohort team. +// +// Metric emissions are captured through a fakePaymentProbeMetrics so the +// process-global Prom registry isn't polluted across tests. DB-touching paths +// (upgrade leg + audit) use go-sqlmock with the regexp query matcher. The sweep +// is driven via the shared fakeJob[T] helper (expire_test.go), the same way the +// flow_synthetic / deploy_probe suites drive Work. + +import ( + "context" + "database/sql" + "net/http" + "net/http/httptest" + "sync" + "testing" + "time" + + sqlmock "github.com/DATA-DOG/go-sqlmock" + + "instant.dev/worker/internal/jobs" +) + +// ─── fake metrics ───────────────────────────────────────────────────────────── + +type fakePaymentOutcome struct{ leg, result string } +type fakePaymentLatency struct { + leg string + d time.Duration +} + +// fakePaymentProbeMetrics captures every IncOutcome / ObserveLatency call. +type fakePaymentProbeMetrics struct { + mu sync.Mutex + outcomes []fakePaymentOutcome + latencies []fakePaymentLatency +} + +func (f *fakePaymentProbeMetrics) IncOutcome(leg, result string) { + f.mu.Lock() + defer f.mu.Unlock() + f.outcomes = append(f.outcomes, fakePaymentOutcome{leg, result}) +} + +func (f *fakePaymentProbeMetrics) ObserveLatency(leg string, d time.Duration) { + f.mu.Lock() + defer f.mu.Unlock() + f.latencies = append(f.latencies, fakePaymentLatency{leg, d}) +} + +// resultFor returns the recorded result for a leg ("" if the leg never emitted). +func (f *fakePaymentProbeMetrics) resultFor(leg string) string { + f.mu.Lock() + defer f.mu.Unlock() + for _, o := range f.outcomes { + if o.leg == leg { + return o.result + } + } + return "" +} + +func (f *fakePaymentProbeMetrics) count() int { + f.mu.Lock() + defer f.mu.Unlock() + return len(f.outcomes) +} + +// ─── api stub server ────────────────────────────────────────────────────────── + +// paymentAPIState configures the stub api's per-endpoint responses for one +// scenario. The defaults (paymentHappyState) are the all-healthy contract. +type paymentAPIState struct { + checkoutStatus int + checkoutBody string + + billingStatus int + billingBody string + + invoicesStatus int + invoicesBody string + + // webhook responses. A request carrying a non-empty X-Razorpay-Signature is + // treated as "signed" (the upgrade leg) and gets webhookSignedResp. Any other + // is "unsigned" (the security leg) and gets webhookUnsignedRsp/Bod. + webhookSignedResp int // status for a signed webhook (upgrade leg) + webhookUnsignedRsp int // status for an unsigned/garbage webhook (security leg) + webhookUnsignedBod string // body for the unsigned rejection (carries error_code) +} + +func paymentHappyState() paymentAPIState { + return paymentAPIState{ + checkoutStatus: http.StatusOK, + checkoutBody: `{"short_url":"https://rzp.test/x"}`, + billingStatus: http.StatusOK, + billingBody: `{"plan_tier":"free"}`, + invoicesStatus: http.StatusOK, + invoicesBody: `{"invoices":[]}`, + webhookSignedResp: http.StatusOK, + webhookUnsignedRsp: http.StatusBadRequest, + webhookUnsignedBod: `{"ok":false,"error":"invalid_signature","message":"bad sig"}`, + } +} + +// newPaymentAPIServer builds the stub api server for a scenario. The webhook +// handler branches on the presence of an X-Razorpay-Signature header — so the +// upgrade leg's signed POST is accepted while the security leg's unsigned POST +// is rejected, exactly the real api's contract surface. +func newPaymentAPIServer(st paymentAPIState) *httptest.Server { + mux := http.NewServeMux() + mux.HandleFunc("/api/v1/billing/checkout", func(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(st.checkoutStatus) + _, _ = w.Write([]byte(st.checkoutBody)) + }) + mux.HandleFunc("/api/v1/billing/invoices", func(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(st.invoicesStatus) + _, _ = w.Write([]byte(st.invoicesBody)) + }) + mux.HandleFunc("/api/v1/billing", func(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(st.billingStatus) + _, _ = w.Write([]byte(st.billingBody)) + }) + mux.HandleFunc("/razorpay/webhook", func(w http.ResponseWriter, r *http.Request) { + if r.Header.Get("X-Razorpay-Signature") != "" { + w.WriteHeader(st.webhookSignedResp) + _, _ = w.Write([]byte(`{"ok":true}`)) + return + } + w.WriteHeader(st.webhookUnsignedRsp) + _, _ = w.Write([]byte(st.webhookUnsignedBod)) + }) + return httptest.NewServer(mux) +} + +// ─── fixture ────────────────────────────────────────────────────────────────── + +// paymentFixture bundles a worker wired against the stub server, its captured +// metrics, and (optionally) a sqlmock DB. +type paymentFixture struct { + w *jobs.PaymentProbeWorker + fm *fakePaymentProbeMetrics + db *sql.DB + mock sqlmock.Sqlmock +} + +// newPaymentFixtureNoDB wires a worker with a nil DB — exercises the prod-safe +// legs only (the upgrade leg degrades on the nil-DB guard). An empty JWTSecret +// degrades the authed legs on the JWT_SECRET-unset guard. +func newPaymentFixtureNoDB(t *testing.T, srv *httptest.Server, cfg jobs.PaymentProbeConfig) *paymentFixture { + t.Helper() + cfg.BaseURL = srv.URL + cfg.Enabled = true + fm := &fakePaymentProbeMetrics{} + w := jobs.NewPaymentProbeWorker(nil, srv.Client(), fm, nil, cfg) + return &paymentFixture{w: w, fm: fm} +} + +// newPaymentFixtureDB wires a worker with a sqlmock DB (upgrade leg + audit). +func newPaymentFixtureDB(t *testing.T, srv *httptest.Server, cfg jobs.PaymentProbeConfig) *paymentFixture { + t.Helper() + cfg.BaseURL = srv.URL + cfg.Enabled = true + db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherRegexp)) + if err != nil { + t.Fatalf("sqlmock.New: %v", err) + } + fm := &fakePaymentProbeMetrics{} + w := jobs.NewPaymentProbeWorker(db, srv.Client(), fm, nil, cfg) + return &paymentFixture{w: w, fm: fm, db: db, mock: mock} +} + +func (f *paymentFixture) done() { + if f.db != nil { + _ = f.db.Close() + } +} + +// run drives one full sweep via Work + the shared fakeJob helper. +func (f *paymentFixture) run(t *testing.T) { + t.Helper() + if err := f.w.Work(context.Background(), fakeJob[jobs.PaymentProbeArgs]()); err != nil { + t.Fatalf("Work: %v", err) + } +} + +// ─── flag-off inert ─────────────────────────────────────────────────────────── + +// TestPaymentProbe_FlagOff_ProbesNothing is the headline DoD proof: with +// Enabled=false the worker no-ops — ZERO outcome emissions, no HTTP, no DB. This +// is the inert-in-prod guarantee until the operator lights PAYMENT_PROBE_ENABLED. +func TestPaymentProbe_FlagOff_ProbesNothing(t *testing.T) { + hit := false + srv := httptest.NewServer(http.HandlerFunc(func(http.ResponseWriter, *http.Request) { + hit = true + })) + defer srv.Close() + + fm := &fakePaymentProbeMetrics{} + w := jobs.NewPaymentProbeWorker(nil, srv.Client(), fm, nil, jobs.PaymentProbeConfig{ + Enabled: false, // master flag OFF + BaseURL: srv.URL, + }) + if err := w.Work(context.Background(), fakeJob[jobs.PaymentProbeArgs]()); err != nil { + t.Fatalf("Work: %v", err) + } + if fm.count() != 0 { + t.Errorf("flag off: want 0 outcome emissions, got %d (%v)", fm.count(), fm.outcomes) + } + if hit { + t.Error("flag off: prober made an HTTP request — must be fully inert") + } +} + +// ─── prod-safe legs: happy path ─────────────────────────────────────────────── + +// TestPaymentProbe_HappyPath_AuthedLegsPass drives all prod-safe legs green with +// a session JWT minted (JWT_SECRET set). checkout + billing + invoices + +// webhook_security all pass; the upgrade leg degrades (no test secret / no DB). +func TestPaymentProbe_HappyPath_AuthedLegsPass(t *testing.T) { + srv := newPaymentAPIServer(paymentHappyState()) + defer srv.Close() + + f := newPaymentFixtureNoDB(t, srv, jobs.PaymentProbeConfig{ + JWTSecret: "test-jwt-secret-32-bytes-long-xxx", + }) + f.run(t) + + for _, leg := range []string{"checkout_reachable", "billing_state", "invoices_reachable", "webhook_security"} { + if got := f.fm.resultFor(leg); got != "pass" { + t.Errorf("%s: want pass, got %q", leg, got) + } + } + if got := f.fm.resultFor("upgrade_webhook_e2e"); got != "degraded" { + t.Errorf("upgrade leg: want degraded (no db/secret), got %q", got) + } + if f.fm.count() != 5 { + t.Errorf("want 5 leg outcomes, got %d (%v)", f.fm.count(), f.fm.outcomes) + } +} + +// TestPaymentProbe_NoJWTSecret_AuthedLegsDegrade proves the authed prod-safe +// legs degrade (config drift, never page) when JWT_SECRET is unset, while the +// no-auth webhook_security leg still runs and passes. +func TestPaymentProbe_NoJWTSecret_AuthedLegsDegrade(t *testing.T) { + srv := newPaymentAPIServer(paymentHappyState()) + defer srv.Close() + + f := newPaymentFixtureNoDB(t, srv, jobs.PaymentProbeConfig{ /* no JWTSecret */ }) + f.run(t) + + for _, leg := range []string{"checkout_reachable", "billing_state", "invoices_reachable"} { + if got := f.fm.resultFor(leg); got != "degraded" { + t.Errorf("%s: want degraded (no JWT_SECRET), got %q", leg, got) + } + } + if got := f.fm.resultFor("webhook_security"); got != "pass" { + t.Errorf("webhook_security: want pass (no auth needed), got %q", got) + } +} + +// ─── checkout leg ───────────────────────────────────────────────────────────── + +// TestPaymentProbe_Checkout5xx_Fails proves a 5xx from /billing/checkout fails +// the leg (a handler crash/regression) AND writes a fail audit row. +func TestPaymentProbe_Checkout5xx_Fails(t *testing.T) { + st := paymentHappyState() + st.checkoutStatus = http.StatusInternalServerError + st.checkoutBody = `{"error":"boom"}` + srv := newPaymentAPIServer(st) + defer srv.Close() + + f := newPaymentFixtureDB(t, srv, jobs.PaymentProbeConfig{ + JWTSecret: "test-jwt-secret-32-bytes-long-xxx", + }) + defer f.done() + f.mock.ExpectExec(`INSERT INTO audit_log`).WillReturnResult(sqlmock.NewResult(1, 1)) + // upgrade leg degrades (no test secret) → no further DB writes. + + f.run(t) + + if got := f.fm.resultFor("checkout_reachable"); got != "fail" { + t.Errorf("checkout 5xx: want fail, got %q", got) + } + if err := f.mock.ExpectationsWereMet(); err != nil { + t.Errorf("sqlmock expectations: %v", err) + } +} + +// TestPaymentProbe_CheckoutBlocked_Non5xx_Passes proves the "alive-but-blocked" +// shape (402 billing_not_configured) is a PASS — only a 5xx crash fails. +func TestPaymentProbe_CheckoutBlocked_Non5xx_Passes(t *testing.T) { + st := paymentHappyState() + st.checkoutStatus = http.StatusPaymentRequired + st.checkoutBody = `{"error":"billing_not_configured"}` + srv := newPaymentAPIServer(st) + defer srv.Close() + + f := newPaymentFixtureNoDB(t, srv, jobs.PaymentProbeConfig{ + JWTSecret: "test-jwt-secret-32-bytes-long-xxx", + }) + f.run(t) + + if got := f.fm.resultFor("checkout_reachable"); got != "pass" { + t.Errorf("checkout 402 blocked-but-alive: want pass, got %q", got) + } +} + +// ─── billing / invoices legs ────────────────────────────────────────────────── + +// TestPaymentProbe_BillingState5xx_Fails proves a 5xx on GET /api/v1/billing +// fails the billing_state leg. +func TestPaymentProbe_BillingState5xx_Fails(t *testing.T) { + st := paymentHappyState() + st.billingStatus = http.StatusServiceUnavailable + srv := newPaymentAPIServer(st) + defer srv.Close() + + f := newPaymentFixtureNoDB(t, srv, jobs.PaymentProbeConfig{ + JWTSecret: "test-jwt-secret-32-bytes-long-xxx", + }) + f.run(t) + + if got := f.fm.resultFor("billing_state"); got != "fail" { + t.Errorf("billing 503: want fail, got %q", got) + } +} + +// TestPaymentProbe_Invoices5xx_Fails proves a 5xx on GET /api/v1/billing/invoices +// fails the invoices_reachable leg. +func TestPaymentProbe_Invoices5xx_Fails(t *testing.T) { + st := paymentHappyState() + st.invoicesStatus = http.StatusInternalServerError + srv := newPaymentAPIServer(st) + defer srv.Close() + + f := newPaymentFixtureNoDB(t, srv, jobs.PaymentProbeConfig{ + JWTSecret: "test-jwt-secret-32-bytes-long-xxx", + }) + f.run(t) + + if got := f.fm.resultFor("invoices_reachable"); got != "fail" { + t.Errorf("invoices 500: want fail, got %q", got) + } +} + +// ─── webhook security leg ───────────────────────────────────────────────────── + +// TestPaymentProbe_WebhookSecurity_RejectsUnsigned proves the security leg +// PASSES when the api rejects the unsigned garbage payload with 400 +// invalid_signature (the positive proof the signature gate is live). +func TestPaymentProbe_WebhookSecurity_RejectsUnsigned(t *testing.T) { + srv := newPaymentAPIServer(paymentHappyState()) + defer srv.Close() + + f := newPaymentFixtureNoDB(t, srv, jobs.PaymentProbeConfig{}) + f.run(t) + + if got := f.fm.resultFor("webhook_security"); got != "pass" { + t.Errorf("webhook security (unsigned rejected): want pass, got %q", got) + } +} + +// TestPaymentProbe_WebhookSecurity_AcceptedUnsigned_Fails is the critical +// security-regression case: the api ACCEPTS an unsigned payload (2xx) → the leg +// FAILS loudly (the gate would let a forged success through). +func TestPaymentProbe_WebhookSecurity_AcceptedUnsigned_Fails(t *testing.T) { + st := paymentHappyState() + st.webhookUnsignedRsp = http.StatusOK // gate broken: accepts unsigned + st.webhookUnsignedBod = `{"ok":true}` + srv := newPaymentAPIServer(st) + defer srv.Close() + + f := newPaymentFixtureDB(t, srv, jobs.PaymentProbeConfig{}) + defer f.done() + f.mock.ExpectExec(`INSERT INTO audit_log`).WillReturnResult(sqlmock.NewResult(1, 1)) // security fail audit + + f.run(t) + + if got := f.fm.resultFor("webhook_security"); got != "fail" { + t.Errorf("webhook security (unsigned ACCEPTED): want fail, got %q", got) + } + if err := f.mock.ExpectationsWereMet(); err != nil { + t.Errorf("sqlmock expectations: %v", err) + } +} + +// TestPaymentProbe_WebhookSecurity_WrongErrorCode_Fails proves a 400 with an +// unexpected error_code (not invalid_signature) fails the leg — the gate must +// reject with the canonical contract code. +func TestPaymentProbe_WebhookSecurity_WrongErrorCode_Fails(t *testing.T) { + st := paymentHappyState() + st.webhookUnsignedBod = `{"ok":false,"error":"some_other_error"}` + srv := newPaymentAPIServer(st) + defer srv.Close() + + f := newPaymentFixtureNoDB(t, srv, jobs.PaymentProbeConfig{}) + f.run(t) + + if got := f.fm.resultFor("webhook_security"); got != "fail" { + t.Errorf("webhook security (wrong error_code): want fail, got %q", got) + } +} + +// ─── upgrade leg (optional, test-mode) ──────────────────────────────────────── + +// upgradeConfig returns a config with the test webhook secret + plan id set so +// the upgrade leg runs. +func upgradeConfig() jobs.PaymentProbeConfig { + return jobs.PaymentProbeConfig{ + JWTSecret: "test-jwt-secret-32-bytes-long-xxx", + TestWebhookSecret: "test-webhook-secret", + TestPlanIDPro: "plan_test_pro_xyz", + Tier: "free", + } +} + +// TestPaymentProbe_Upgrade_TierFlips_Passes drives the full upgrade proof: mint +// cohort → signed test webhook 200 → tier read-back returns "pro" → reap. The +// rule-12 truth surface (the tier flip), not the webhook 200, is the pass. +func TestPaymentProbe_Upgrade_TierFlips_Passes(t *testing.T) { + srv := newPaymentAPIServer(paymentHappyState()) // webhook accepts signed → 200 + defer srv.Close() + + f := newPaymentFixtureDB(t, srv, upgradeConfig()) + defer f.done() + // upgrade leg DB ops: mint team → read tier → reap. + f.mock.ExpectExec(`INSERT INTO teams`).WillReturnResult(sqlmock.NewResult(0, 1)) + f.mock.ExpectQuery(`SELECT plan_tier FROM teams`). + WillReturnRows(sqlmock.NewRows([]string{"plan_tier"}).AddRow("pro")) + f.mock.ExpectExec(`DELETE FROM teams`).WillReturnResult(sqlmock.NewResult(0, 1)) + + f.run(t) + + if got := f.fm.resultFor("upgrade_webhook_e2e"); got != "pass" { + t.Errorf("upgrade tier-flip: want pass, got %q", got) + } + if err := f.mock.ExpectationsWereMet(); err != nil { + t.Errorf("sqlmock expectations: %v", err) + } +} + +// TestPaymentProbe_Upgrade_TierDoesNotFlip_Fails proves the rule-12 discipline: +// a webhook 200 with the tier STILL at free is a FAIL (the upgrade pipeline is +// broken even though the webhook was accepted). The cohort team is still reaped. +func TestPaymentProbe_Upgrade_TierDoesNotFlip_Fails(t *testing.T) { + srv := newPaymentAPIServer(paymentHappyState()) + defer srv.Close() + + f := newPaymentFixtureDB(t, srv, upgradeConfig()) + defer f.done() + // Execution order: mint team → read tier → (deferred) reap → record fail audit. + // The deferred DELETE runs when legUpgradeWebhook returns, BEFORE record's + // audit insert. + f.mock.ExpectExec(`INSERT INTO teams`).WillReturnResult(sqlmock.NewResult(0, 1)) + f.mock.ExpectQuery(`SELECT plan_tier FROM teams`). + WillReturnRows(sqlmock.NewRows([]string{"plan_tier"}).AddRow("free")) // did NOT flip + f.mock.ExpectExec(`DELETE FROM teams`).WillReturnResult(sqlmock.NewResult(0, 1)) // deferred reap + f.mock.ExpectExec(`INSERT INTO audit_log`).WillReturnResult(sqlmock.NewResult(1, 1)) // fail audit + + f.run(t) + + if got := f.fm.resultFor("upgrade_webhook_e2e"); got != "fail" { + t.Errorf("upgrade no-flip: want fail, got %q", got) + } + if err := f.mock.ExpectationsWereMet(); err != nil { + t.Errorf("sqlmock expectations: %v", err) + } +} + +// TestPaymentProbe_Upgrade_WebhookNon200_Fails proves a non-200 from the signed +// test webhook fails the leg (the webhook path itself broke), and the cohort is +// still reaped. +func TestPaymentProbe_Upgrade_WebhookNon200_Fails(t *testing.T) { + st := paymentHappyState() + st.webhookSignedResp = http.StatusInternalServerError // signed webhook 500 + srv := newPaymentAPIServer(st) + defer srv.Close() + + f := newPaymentFixtureDB(t, srv, upgradeConfig()) + defer f.done() + // mint team → (webhook 500, no tier read) → (deferred) reap → record fail audit. + f.mock.ExpectExec(`INSERT INTO teams`).WillReturnResult(sqlmock.NewResult(0, 1)) + f.mock.ExpectExec(`DELETE FROM teams`).WillReturnResult(sqlmock.NewResult(0, 1)) // deferred reap + f.mock.ExpectExec(`INSERT INTO audit_log`).WillReturnResult(sqlmock.NewResult(1, 1)) // fail audit + + f.run(t) + + if got := f.fm.resultFor("upgrade_webhook_e2e"); got != "fail" { + t.Errorf("upgrade webhook 500: want fail, got %q", got) + } + if err := f.mock.ExpectationsWereMet(); err != nil { + t.Errorf("sqlmock expectations: %v", err) + } +} + +// TestPaymentProbe_Upgrade_NoTestSecret_Degrades proves the leg skips clean +// (degraded, never page) when the test webhook secret is unset — even with a DB. +func TestPaymentProbe_Upgrade_NoTestSecret_Degrades(t *testing.T) { + srv := newPaymentAPIServer(paymentHappyState()) + defer srv.Close() + + f := newPaymentFixtureDB(t, srv, jobs.PaymentProbeConfig{ + JWTSecret: "test-jwt-secret-32-bytes-long-xxx", + // no TestWebhookSecret + }) + defer f.done() + f.run(t) + + if got := f.fm.resultFor("upgrade_webhook_e2e"); got != "degraded" { + t.Errorf("upgrade no-secret: want degraded, got %q", got) + } + if err := f.mock.ExpectationsWereMet(); err != nil { + t.Errorf("sqlmock expectations (should be none): %v", err) + } +} + +// TestPaymentProbe_Upgrade_NoTestPlanID_Degrades proves the leg degrades when +// the secret is set but the test plan id is missing (can't resolve a tier). +func TestPaymentProbe_Upgrade_NoTestPlanID_Degrades(t *testing.T) { + srv := newPaymentAPIServer(paymentHappyState()) + defer srv.Close() + + f := newPaymentFixtureDB(t, srv, jobs.PaymentProbeConfig{ + JWTSecret: "test-jwt-secret-32-bytes-long-xxx", + TestWebhookSecret: "test-webhook-secret", + // no TestPlanIDPro + }) + defer f.done() + f.run(t) + + if got := f.fm.resultFor("upgrade_webhook_e2e"); got != "degraded" { + t.Errorf("upgrade no-plan: want degraded, got %q", got) + } +} + +// TestPaymentProbe_Upgrade_NoDB_Degrades proves the leg degrades when there is +// no DB (it cannot mint a cohort team), even with the test secret set. +func TestPaymentProbe_Upgrade_NoDB_Degrades(t *testing.T) { + srv := newPaymentAPIServer(paymentHappyState()) + defer srv.Close() + + f := newPaymentFixtureNoDB(t, srv, upgradeConfig()) + f.run(t) + + if got := f.fm.resultFor("upgrade_webhook_e2e"); got != "degraded" { + t.Errorf("upgrade no-db: want degraded, got %q", got) + } +} + +// ─── degraded latency ───────────────────────────────────────────────────────── + +// TestPaymentProbe_DegradedLatency drives the over-budget branch via a 0-budget +// override: the prod-safe legs respond 200 but cross the (0) budget → degraded. +func TestPaymentProbe_DegradedLatency(t *testing.T) { + srv := newPaymentAPIServer(paymentHappyState()) + defer srv.Close() + + f := newPaymentFixtureNoDB(t, srv, jobs.PaymentProbeConfig{ + JWTSecret: "test-jwt-secret-32-bytes-long-xxx", + }) + f.w.SetBudgetOverrideForTest(map[string]time.Duration{ + "checkout_reachable": 0, + "billing_state": 0, + "invoices_reachable": 0, + "webhook_security": 0, + }) + f.run(t) + + if got := f.fm.resultFor("checkout_reachable"); got != "degraded" { + t.Errorf("0-budget checkout: want degraded, got %q", got) + } + if got := f.fm.resultFor("webhook_security"); got != "degraded" { + t.Errorf("0-budget webhook_security: want degraded, got %q", got) + } +} + +// ─── panic boundary ─────────────────────────────────────────────────────────── + +// TestPaymentProbe_PanicBoundary proves runLeg's recover() converts a panicking +// leg into result=fail rather than crashing the sweep. We force a panic by +// pointing the worker at a transport that panics on the first request. nil DB so +// no audit insert is attempted (the panic path's fail still emits the metric). +func TestPaymentProbe_PanicBoundary(t *testing.T) { + srv := newPaymentAPIServer(paymentHappyState()) + defer srv.Close() + + fm := &fakePaymentProbeMetrics{} + pw := jobs.NewPaymentProbeWorker(nil, &http.Client{Transport: panickingTransport{}}, fm, nil, jobs.PaymentProbeConfig{ + Enabled: true, + BaseURL: srv.URL, + JWTSecret: "test-jwt-secret-32-bytes-long-xxx", + }) + if err := pw.Work(context.Background(), fakeJob[jobs.PaymentProbeArgs]()); err != nil { + t.Fatalf("Work must not propagate a panic: %v", err) + } + if got := fm.resultFor("checkout_reachable"); got != "fail" { + t.Errorf("panic boundary: checkout leg want fail, got %q", got) + } +} + +// ─── leg vocabulary + signer parity ─────────────────────────────────────────── + +// TestPaymentProbe_LegsVocabularyStable asserts the canonical leg-id set the +// dashboard grid + alert NRQL key on — a rename would red this (rule 16). +func TestPaymentProbe_LegsVocabularyStable(t *testing.T) { + want := []string{ + "checkout_reachable", + "billing_state", + "invoices_reachable", + "webhook_security", + "upgrade_webhook_e2e", + } + got := jobs.PaymentProbeLegsForTest() + if len(got) != len(want) { + t.Fatalf("legs: got %v, want %v", got, want) + } + for i := range want { + if got[i] != want[i] { + t.Errorf("leg[%d]: got %q, want %q", i, got[i], want[i]) + } + } +} + +// TestPaymentProbe_SignerParity asserts the prober's HMAC-SHA256 raw-body +// signing scheme produces a stable 64-hex digest (the api's verifyRazorpaySignature +// requires exactly 64 hex chars). Parity is the whole point — a drift here means +// the upgrade leg's signed webhook would be rejected as invalid_signature. +func TestPaymentProbe_SignerParity(t *testing.T) { + sig := jobs.PaymentProbeSignForTest("secret", []byte(`{"a":1}`)) + if len(sig) != 64 { + t.Errorf("signature length = %d, want 64 hex chars", len(sig)) + } + if jobs.PaymentProbeSignForTest("secret", []byte(`{"a":1}`)) != sig { + t.Error("signer is not deterministic") + } + if jobs.PaymentProbeSignForTest("other", []byte(`{"a":1}`)) == sig { + t.Error("different secret must yield a different signature") + } +} + +// TestPaymentProbe_ValidateBaseURL covers the startup validator's branches. +func TestPaymentProbe_ValidateBaseURL(t *testing.T) { + if err := jobs.ValidatePaymentProbeBaseURL(""); err != nil { + t.Errorf("empty base url should be accepted, got %v", err) + } + if err := jobs.ValidatePaymentProbeBaseURL("https://api.instanode.dev"); err != nil { + t.Errorf("valid https url rejected: %v", err) + } + if err := jobs.ValidatePaymentProbeBaseURL("ftp://nope"); err == nil { + t.Error("non-http(s) scheme should be rejected") + } + if err := jobs.ValidatePaymentProbeBaseURL("http://"); err == nil { + t.Error("host-less url should be rejected") + } + // A control character makes url.Parse itself error (the parse-error branch). + if err := jobs.ValidatePaymentProbeBaseURL("http://exa\x7fmple"); err == nil { + t.Error("unparseable url should be rejected") + } +} + +// ─── helpers ────────────────────────────────────────────────────────────────── + +// panickingTransport panics on every RoundTrip — used to exercise runLeg's +// recover() boundary deterministically without a real slow/crashing server. +type panickingTransport struct{} + +func (panickingTransport) RoundTrip(*http.Request) (*http.Response, error) { + panic("synthetic transport panic for the payment-probe recover() boundary test") +} diff --git a/internal/jobs/workers.go b/internal/jobs/workers.go index 197eb88..46ead71 100644 --- a/internal/jobs/workers.go +++ b/internal/jobs/workers.go @@ -829,6 +829,31 @@ func StartWorkers(ctx context.Context, db *sql.DB, rdb *redis.Client, cfg *confi }), nrApp, )) + // Layer-3 payment prober (payment_probe.go) — the money heartbeat. Every 5 + // minutes drives the iframe-free payment-funnel contract path against prod: + // checkout reachability + billing/invoices read surfaces + the webhook + // signature security contract, plus an OPTIONAL test-mode upgrade proof + // (mint cohort → inject signed TEST webhook → assert tier flip → reap). It + // emits instant_payment_probe_outcome_total + the InstantPaymentProbe NR + // event. INERT unless PAYMENT_PROBE_ENABLED=true (the Work method no-ops + // first thing when the flag is off) — a single env flip turns the whole + // prober off. Drives NO live Razorpay / no real money; the upgrade leg is + // further gated on RAZORPAY_TEST_WEBHOOK_SECRET (skips clean otherwise). + // Reuses the same NR sink (flowEmitter, cohort=synthetic) so payment-probe + // events never pollute the billing/revenue dashboards. See payment_probe.go + // for the per-leg truth-surface assertions (forum verdict §4 Layer 3). + river.AddWorker(workers, WithObservability( + NewPaymentProbeWorker(db, nil, PaymentProbePromMetrics{}, flowEmitter, PaymentProbeConfig{ + Enabled: cfg.PaymentProbeEnabled, + BaseURL: cfg.PaymentProbeBaseURL, + JWTSecret: cfg.PaymentProbeJWTSecret, + Email: cfg.PaymentProbeEmail, + Tier: cfg.PaymentProbeTier, + TestWebhookSecret: cfg.PaymentProbeTestWebhookSecret, + TestPlanIDPro: cfg.PaymentProbeTestPlanIDPro, + }), + nrApp, + )) // Razorpay webhook-events prune — daily DELETE of razorpay_webhook_events // rows > 30d. The api appends one dedup row per Razorpay webhook delivery; // migration 033 envisioned a periodic prune but never shipped one, so the @@ -1457,6 +1482,23 @@ func buildPeriodicJobs(cfg *config.Config) []*river.PeriodicJob { }, &river.PeriodicJobOpts{RunOnStart: true}, ), + // Layer-3 payment prober — every 5 minutes. Drives the iframe-free + // payment-funnel contract path against prod (checkout reachability + + // billing/invoices read surfaces + webhook signature security + an + // optional test-mode upgrade proof). INERT unless + // PAYMENT_PROBE_ENABLED=true (the Work method no-ops first thing when the + // flag is off), so this periodic registration is always present but + // produces no traffic until the operator lights the flag. Routed to the + // reconcile queue (reconcileInsertOpts carries the UniqueOpts so + // replicas:2 doesn't double-run). RunOnStart=true so a worker restart + // immediately writes a baseline pass/fail per leg. + river.NewPeriodicJob( + river.PeriodicInterval(paymentProbeInterval), + func() (river.JobArgs, *river.InsertOpts) { + return PaymentProbeArgs{}, reconcileInsertOpts(paymentProbeInterval) + }, + &river.PeriodicJobOpts{RunOnStart: true}, + ), // Razorpay webhook-events prune — daily DELETE of dedup rows > 30d. // RunOnStart=false: a restart shouldn't immediately scan; the table // grows slowly (one row per webhook delivery) so a day's delay before diff --git a/internal/metrics/metrics.go b/internal/metrics/metrics.go index d400534..b7f216e 100644 --- a/internal/metrics/metrics.go +++ b/internal/metrics/metrics.go @@ -892,6 +892,41 @@ var ( Buckets: []float64{0.5, 1, 5, 10, 30, 60, 90, 120}, }, []string{"leg"}) + // PaymentProbeOutcomeTotal — Layer-3 payment-prober counters (the money + // heartbeat). Labelled by `leg` (checkout_reachable | billing_state | + // invoices_reachable | webhook_security | upgrade_webhook_e2e) and `result` + // (pass | fail | degraded). result="fail" is the alert-able signal — the + // paid-revenue funnel is broken (checkout 5xx, the webhook signature gate + // accepting an unsigned payload, or the test-mode upgrade pipeline failing to + // flip the tier). Forum verdict (docs/ci/FORUM-PAYMENT-E2E-TOOLING.md §4 + // Layer 3): this is the fastest, most-deterministic money-path signal, an + // iframe-free in-cluster Go prober. + // + // LAZY *Vec: INERT until PAYMENT_PROBE_ENABLED=true — a series first appears + // at /metrics only after the prober runs (i.e. when the operator lights the + // flag). The upgrade_webhook_e2e leg additionally needs a test webhook secret + // (degraded otherwise). The label families are primed in metrics_test.go so + // the dashboard tile renders from process start once the prober is on. + // + // NR alert: payment-probe-fail.json (P1: paid revenue path down). Prom rule: + // PaymentProbeFail (instant-worker-probes group). Emit site: + // worker/internal/jobs/payment_probe.go (PaymentProbePromMetrics). + PaymentProbeOutcomeTotal = promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "instant_payment_probe_outcome_total", + Help: "Layer-3 payment-prober outcomes per leg (checkout_reachable|billing_state|invoices_reachable|webhook_security|upgrade_webhook_e2e) and result (pass|fail|degraded). INERT until PAYMENT_PROBE_ENABLED=true.", + }, []string{"leg", "result"}) + + // PaymentProbeLatencySeconds — per-leg HTTP/DB latency histogram. Only + // observed when a real request was performed (a config-skipped leg omits the + // observation so the histogram isn't polluted with 0s entries). Buckets span + // the per-leg budgets (50ms…8s upgrade-leg ceiling). LAZY *Vec (see + // PaymentProbeOutcomeTotal). + PaymentProbeLatencySeconds = promauto.NewHistogramVec(prometheus.HistogramOpts{ + Name: "instant_payment_probe_latency_seconds", + Help: "Layer-3 payment-prober per-leg latency. Buckets span the per-leg budgets up to the 8s upgrade-leg ceiling.", + Buckets: []float64{0.05, 0.1, 0.25, 0.5, 1, 2, 5, 8}, + }, []string{"leg"}) + // FlowTestTotal — continuous-monitoring synthetic flow-matrix counters // (flow_synthetic.go). One series per (flow, actor, tier, layer, result) // so the NR matrix dashboard renders a green/red cell per flow×actor and diff --git a/internal/metrics/metrics_test.go b/internal/metrics/metrics_test.go index 6d6f596..6f20a43 100644 --- a/internal/metrics/metrics_test.go +++ b/internal/metrics/metrics_test.go @@ -116,6 +116,16 @@ func TestAllMetrics_AreRegistered(t *testing.T) { DeployScaledToZeroTotal.WithLabelValues("wake_failed").Add(0) DeployScaledToZeroTotal.WithLabelValues("scale_failed").Add(0) + // Prime the Layer-3 payment-prober label families so /metrics exposes the + // series from process start (lazy *Vec otherwise leaves the dashboard tile + // empty until the operator lights PAYMENT_PROBE_ENABLED and the first tick + // fires). One representative (leg, result) plus a latency observation. + PaymentProbeOutcomeTotal.WithLabelValues("checkout_reachable", "pass").Add(0) + PaymentProbeOutcomeTotal.WithLabelValues("checkout_reachable", "fail").Add(0) + PaymentProbeOutcomeTotal.WithLabelValues("webhook_security", "pass").Add(0) + PaymentProbeOutcomeTotal.WithLabelValues("upgrade_webhook_e2e", "degraded").Add(0) + PaymentProbeLatencySeconds.WithLabelValues("checkout_reachable").Observe(0) + // Plain gauge DeployIdleApps.Set(0)