[APIE-1040] Add generic --wait framework; refactor Flink statement create#3361
[APIE-1040] Add generic --wait framework; refactor Flink statement create#3361Daniel Ayaz (danielayaz) wants to merge 2 commits into
Conversation
…eate Extract a generic pkg/wait.Poll[T] from the two inline retry blocks in flink statement create (cloud + on-prem) and expose shared --wait / --wait-timeout flag registrars in pkg/cmd/flags.go. The framework is the foundation for adding --wait to other 202-style async-provisioning resources (network/*, kafka cluster, ksqlDB cluster, tableflow, ccpm, flink compute pool/connection) in follow-up tickets. Behavior of flink statement create --wait is preserved (existing create-wait.golden unchanged). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
|
🎉 All Contributor License Agreements have been signed. Ready to merge. |
There was a problem hiding this comment.
Pull request overview
This PR introduces a reusable wait/polling framework and shared wait flags, then refactors Flink statement creation to use them for both Cloud and on-prem paths.
Changes:
- Adds generic
pkg/wait.Poll[T]with timeout, tick, failed-state, and context-cancellation handling. - Adds shared
--waitand--wait-timeoutflag helpers inpkg/cmd. - Updates Flink statement create polling and adds Cloud timeout integration coverage/fixtures.
Reviewed changes
Copilot reviewed 10 out of 10 changed files in this pull request and generated 4 comments.
Show a summary per file
| File | Description |
|---|---|
pkg/wait/wait.go |
Adds the generic polling framework. |
pkg/wait/wait_test.go |
Adds unit tests for polling outcomes. |
pkg/cmd/flags.go |
Adds shared wait flag helpers. |
internal/flink/command_statement_create.go |
Refactors Cloud statement create wait behavior. |
internal/flink/command_statement_create_onprem.go |
Refactors on-prem statement create wait behavior. |
test/flink_test.go |
Adds Cloud wait-timeout integration coverage. |
test/test-server/flink_gateway_router.go |
Adds a pending statement mock response. |
test/fixtures/output/flink/statement/create-wait-timeout.golden |
Adds expected timeout output. |
test/fixtures/output/flink/statement/create-missing-sql-failure.golden |
Updates on-prem help output. |
test/fixtures/output/flink/statement/create-missing-compute-pool-failure.golden |
Updates on-prem help output. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| v, err := opts.Fetch() | ||
| if err != nil { | ||
| return last, true, err |
| if err != nil { | ||
| return errors.NewErrorWithSuggestions(err.Error(), "Increase `--wait-timeout` or omit `--wait`.") |
| if err != nil { | ||
| return errors.NewErrorWithSuggestions(err.Error(), "Increase `--wait-timeout` or omit `--wait`.") |
| if shouldWait { | ||
| timeout, err := cmd.Flags().GetDuration("wait-timeout") | ||
| if err != nil { | ||
| return err | ||
| } | ||
| finalStatement, err = wait.Poll(cmd.Context(), wait.Options[cmfsdk.Statement]{ |
CI surfaced two help-output goldens I missed locally because their fixture paths are constructed dynamically by TestHelp's tree walker, not literal in any test file. Diff is the two intended changes: - --wait description swapped to the generic one - --wait-timeout duration line added Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
|
| return client.GetStatement(environmentId, name, c.Context.LastOrgId) | ||
| }, | ||
| IsTerminal: func(s flinkgatewayv1.SqlV1Statement) bool { | ||
| return s.Status.GetPhase() != "PENDING" |
There was a problem hiding this comment.
"PENDING" may not be the only intermediate status, there are "FAILING" and "STOPPING" for Flink statement:
https://docs.confluent.io/cloud/current/ccloud/get-sqlv-1-statement/?_highlight=statement
If we think even one level higher, is there a way we can retrieve information from OpenAPI spec to determine which states are pending states, and which states are terminal states, this is the question we need to give answer to ourselves.
For example, a naive idea would be, if the status ends with "ING", we need to wait, otherwise we should terminate.
https://github.com/confluentinc/api/blob/master/flink-gateway/v1/openapi.yaml#L3313
Can you check if such idea can be applied to other async resources?
| type Options[T any] struct { | ||
| Fetch func() (T, error) | ||
| IsTerminal func(T) bool | ||
| IsFailed func(T) bool |
There was a problem hiding this comment.
IsFailed is not wired up to the command source file, so we can't tell if the successful wait status or failed wait status, can we handle this better?
| IsTerminal: func(s cmfsdk.Statement) bool { | ||
| return s.GetStatus().Phase != "PENDING" | ||
| }, | ||
| Tick: 2 * time.Second, |
There was a problem hiding this comment.
2 * time.Second may work for Flink statement but not work for others, a short interval increases the chance of being rate limited, a long interval may upset users.
Also, please investigate on how TF handles the polling interval and timeout, is it purely manual setting, is it semi-manual setting with default, is it something we can derive from OpenAPI spec, is it something we can write to registry.yaml as override value, once you have a proposal, please sync with Kostya Linou (@linouk23) for the unified implementation.
| cmd.Flags().String("flink-configuration", "", "The file path to hold the Flink configuration for the statement.") | ||
| cmd.Flags().Bool("wait", false, "Boolean flag to block until the statement is running or has failed.") | ||
| pcmd.AddWaitFlag(cmd) | ||
| pcmd.AddWaitTimeoutFlag(cmd, time.Minute) |
There was a problem hiding this comment.
TF doesn't have the user configurable wait timeout, as it's purely handled inside an internal function with hardcoded value, since we want to have unified UX for CLI and TF users, when designing please take this into consideration, we need to either alter the existing TF behavior or let CLI follow the CLI behavior.




Release Notes
New Features
--wait/--wait-timeoutflags viapkg/cmd.AddWaitFlagandpkg/cmd.AddWaitTimeoutFlag, backed by a new genericpkg/wait.Poll[T]polling framework. Reference adoption isconfluent flink statement create(cloud + on-prem); no behavior change to existing--waitusage. Follow-up tickets extend the flags to other async-provisioning resources.Checklist
Whatsection below whether this PR applies to Confluent Cloud, Confluent Platform, or both.Test & Reviewsection below.Blast Radiussection below.--wait-timeoutis additive)What
Applies to: Confluent Cloud AND Confluent Platform.
Problem.
--waitexists today on exactly two commands (flink statement createcloud + on-prem) as near-duplicate inlineretry.Retry(...)blocks. Many other 202-style create commands (network/*,flink compute-pool/connection,kafka cluster,ksql cluster,tableflow,ccpm) lack--waitentirely. Copy-pasting the inline pattern to add--waiteverywhere would compound the duplication.Approach.
pkg/wait— a new genericPoll[T](ctx, Options[T])with pluggableIsTerminal/IsFailedpredicates, configurableTickandTimeout, sentinel errors (ErrTimeout,ErrFailed), and context-cancellation support. Direct loop (not a wrapper aroundpkg/retry) becauseretry.Retry'sfunc() errorsignature can't return the polled value or distinguish "still pending" from "permanently failed" from "fetch error" cleanly.pkg/cmd/flags.go—AddWaitFlagandAddWaitTimeoutFlag(cmd, defaultTimeout)helpers so future adoptions are one-liners.pkg/cmd/flags.gois the only file inpkg/cmd/in CLAUDE.md's "edit freely" zone; the rest ofpkg/cmd/is untouched.internal/flink/command_statement_create.go(1s tick, 1m default timeout) andinternal/flink/command_statement_create_onprem.go(2s tick, 1m default timeout) refactored to use the framework. Local variable renamedwait→shouldWaitto avoid colliding with thewaitpackage name.flink statement create --wait. The existingcreate-wait.goldenis unchanged — parity proof. The new--wait-timeoutflag is additive with the same 1m default as the previous hardcoded timeout.Notable detail (pflag gotcha).
pflag.Durationinterprets the first backtick-quoted word in a flag description as a type-name override.AddWaitTimeoutFlag's description therefore uses plain--wait(no backticks) so the help renders as--wait-timeout durationinstead of--wait-timeout --wait. CLAUDE.md's backtick convention still applies inShort/Long/error messages.Blast Radius
confluent flink statement create --wait— risk of regression in polling behavior. Mitigated by parity check (existingcreate-wait.goldenunchanged) and unit tests covering immediate-ready, eventually-ready, failed, timeout, fetch-error, and context-cancel paths.confluent flink statement create --wait(on-prem) — same risk, same mitigation.pkg/cmd/flags.go— additive helpers, no existing call site touched, so risk is bounded to the new helpers themselves.pkg/retrycallers — unchanged; the package still exists with 5+ other call sites ininternal/asyncapi,internal/kafka,internal/connect,pkg/auth. Flink statement create just no longer imports it.References
Test & Review
Automated tests run locally (Go 1.25.7; repo pins
.go-versionto 1.26.1 which isn't installed locally, so 1.25.7 was used — CI will pick up 1.26.1):go test ./pkg/wait/ ./pkg/cmd/ ./internal/flink/(unit)pkg/waitmake lint(golangci-lint + hunspell spell check)make integration-test -run TestCLI/TestFlinkStatementCreate$(cloud)--waitcase (unchanged golden) and new--wait-timeout 100mscase (exit 1, timeout error)make integration-test -run TestCLI/TestFlinkStatementCreateOnPrem(on-prem)--waitdescription and added--wait-timeout durationline; no other goldens touchedPre-existing test failure not introduced by this PR:
pkg/flink/internal/controller/TestInteractiveOutputControllerTestSuitefails in non-TTY environments withopen /dev/tty: device not configured(tview/tcell needs a real terminal). Reproduced on baseline (origin/mainwithout these changes). Expected to pass in Semaphore CI which provides a pseudo-TTY.Golden file changes (all intentional):
create-wait.golden— unchanged (parity proof).create-wait-timeout.golden— new (timeout error path).create-missing-sql-failure.golden,create-missing-compute-pool-failure.golden— regenerated to reflect the new--waitdescription ("Block until the resource reaches a terminal state.") and the new--wait-timeout durationline. Diff is exactly two lines in each fixture.Backwards compatibility. Per CLAUDE.md compatibility rules: adding
--wait-timeoutis non-breaking (additive flag); the--waithelp text changed but the flag name and default value (false) did not, so no contract was broken. Existing scripts that pass--waitcontinue to work identically.🤖 Generated with Claude Code