[APIE-1040] Add generic --wait framework; refactor Flink statement create #3361

Open
Daniel Ayaz (danielayaz) wants to merge 2 commits into main from dayaz/apie-1040-wait-framework

Conversation

@danielayaz
Member

@danielayaz Daniel Ayaz (danielayaz) commented May 19, 2026

Release Notes

New Features

  • Adds shared --wait / --wait-timeout flags via pkg/cmd.AddWaitFlag and pkg/cmd.AddWaitTimeoutFlag, backed by a new generic pkg/wait.Poll[T] polling framework. Reference adoption is confluent flink statement create (cloud + on-prem); no behavior change to existing --wait usage. Follow-up tickets extend the flags to other async-provisioning resources.

Checklist

  • I have successfully built and used a custom CLI binary, without linter issues from this PR.
  • I have clearly specified in the What section below whether this PR applies to Confluent Cloud, Confluent Platform, or both.
  • I have verified this PR in Confluent Cloud pre-prod or production environment, if applicable. (verified via integration mocks; live verification pending if reviewer requests)
  • I have verified this PR in Confluent Platform on-premises environment, if applicable. (verified via integration mocks; live verification pending if reviewer requests)
  • I have attached manual CLI verification results or screenshots in the Test & Review section below.
  • I have added appropriate CLI integration or unit tests for any new or updated commands and functionality.
  • I confirm that this PR introduces no breaking changes or backward compatibility issues.
  • I have indicated the potential customer impact if something goes wrong in the Blast Radius section below.
  • I have put checkmarks below confirming that the feature associated with this PR is enabled in:
    • Confluent Cloud prod (N/A — refactor preserves existing behavior; --wait-timeout is additive)
    • Confluent Cloud stag (N/A — same reason)
    • Confluent Platform (N/A — same reason)
    • Check this box if the feature is enabled for certain organizations only

What

Applies to: Confluent Cloud AND Confluent Platform.

Problem. --wait exists today on exactly two commands (flink statement create cloud + on-prem) as near-duplicate inline retry.Retry(...) blocks. Many other 202-style create commands (network/*, flink compute-pool/connection, kafka cluster, ksql cluster, tableflow, ccpm) lack --wait entirely. Copy-pasting the inline pattern to add --wait everywhere would compound the duplication.

Approach.

  1. pkg/wait — a new generic Poll[T](ctx, Options[T]) with pluggable IsTerminal / IsFailed predicates, configurable Tick and Timeout, sentinel errors (ErrTimeout, ErrFailed), and context-cancellation support. Direct loop (not a wrapper around pkg/retry) because retry.Retry's func() error signature can't return the polled value or distinguish "still pending" from "permanently failed" from "fetch error" cleanly.
  2. pkg/cmd/flags.go: AddWaitFlag and AddWaitTimeoutFlag(cmd, defaultTimeout) helpers so future adoptions are one-liners. pkg/cmd/flags.go is the only file in pkg/cmd/ in CLAUDE.md's "edit freely" zone; the rest of pkg/cmd/ is untouched.
  3. Reference adoption. internal/flink/command_statement_create.go (1s tick, 1m default timeout) and internal/flink/command_statement_create_onprem.go (2s tick, 1m default timeout) refactored to use the framework. Local variable wait renamed to shouldWait to avoid colliding with the wait package name.
  4. No behavior change to existing flink statement create --wait. The existing create-wait.golden is unchanged — parity proof. The new --wait-timeout flag is additive with the same 1m default as the previous hardcoded timeout.

Notable detail (pflag gotcha). pflag treats the first backtick-quoted word in a flag's usage string as the value-name placeholder shown in help output, overriding the default type name (duration for a Duration flag). AddWaitTimeoutFlag's description therefore uses plain --wait (no backticks) so the help renders as --wait-timeout duration instead of --wait-timeout --wait. CLAUDE.md's backtick convention still applies in Short/Long/error messages.
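The back-quote rule can be reproduced with Go's standard library flag package, which pflag is modeled on and which exposes the same UnquoteUsage helper. The sketch below uses the standard library so that it runs without dependencies; the usage strings are hypothetical and not the PR's actual help text.

```go
package main

import (
	"flag"
	"fmt"
	"time"
)

// placeholder registers a duration flag with the given usage text and returns
// the value name that help output would print after the flag name.
func placeholder(usage string) string {
	fs := flag.NewFlagSet("demo", flag.ContinueOnError)
	fs.Duration("wait-timeout", time.Minute, usage)
	name, _ := flag.UnquoteUsage(fs.Lookup("wait-timeout"))
	return name
}

func main() {
	// Back-quoted word: it replaces the type name in the help line.
	fmt.Println(placeholder("Maximum time to wait when `--wait` is set."))
	// Plain text: the help line falls back to the type name.
	fmt.Println(placeholder("Maximum time to wait when --wait is set."))
}
```

The first call prints --wait and the second prints duration, which is the difference between the broken and the intended help line described above.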

Blast Radius

  • Confluent Cloud customers using confluent flink statement create --wait — risk of regression in polling behavior. Mitigated by parity check (existing create-wait.golden unchanged) and unit tests covering immediate-ready, eventually-ready, failed, timeout, fetch-error, and context-cancel paths.
  • Confluent Platform customers using confluent flink statement create --wait (on-prem) — same risk, same mitigation.
  • All commands using pkg/cmd/flags.go — additive helpers, no existing call site touched, so risk is bounded to the new helpers themselves.
  • pkg/retry callers — unchanged; the package still exists with 5+ other call sites in internal/asyncapi, internal/kafka, internal/connect, pkg/auth. Flink statement create just no longer imports it.

References

Test & Review

Automated tests run locally (Go 1.25.7; repo pins .go-version to 1.26.1 which isn't installed locally, so 1.25.7 was used — CI will pick up 1.26.1):

| Suite | Result |
| --- | --- |
| go test ./pkg/wait/ ./pkg/cmd/ ./internal/flink/ (unit) | PASS: 7 new wait tests, 100% coverage of pkg/wait |
| make lint (golangci-lint + hunspell spell check) | PASS |
| make integration-test -run TestCLI/TestFlinkStatementCreate$ (cloud) | PASS: includes existing --wait case (unchanged golden) and new --wait-timeout 100ms case (exit 1, timeout error) |
| make integration-test -run TestCLI/TestFlinkStatementCreateOnPrem (on-prem) | PASS: 2 help-output goldens regenerated for the new --wait description and added --wait-timeout duration line; no other goldens touched |

Pre-existing test failure not introduced by this PR:

  • pkg/flink/internal/controller/TestInteractiveOutputControllerTestSuite fails in non-TTY environments with open /dev/tty: device not configured (tview/tcell needs a real terminal). Reproduced on baseline (origin/main without these changes). Expected to pass in Semaphore CI which provides a pseudo-TTY.

Golden file changes (all intentional):

  • create-wait.golden: unchanged (parity proof).
  • create-wait-timeout.golden: new (timeout error path).
  • create-missing-sql-failure.golden, create-missing-compute-pool-failure.golden — regenerated to reflect the new --wait description ("Block until the resource reaches a terminal state.") and the new --wait-timeout duration line. Diff is exactly two lines in each fixture.

Backwards compatibility. Per CLAUDE.md compatibility rules: adding --wait-timeout is non-breaking (additive flag); the --wait help text changed but the flag name and default value (false) did not, so no contract was broken. Existing scripts that pass --wait continue to work identically.

🤖 Generated with Claude Code

…eate

Extract a generic pkg/wait.Poll[T] from the two inline retry blocks in
flink statement create (cloud + on-prem) and expose shared --wait /
--wait-timeout flag registrars in pkg/cmd/flags.go.

The framework is the foundation for adding --wait to other 202-style
async-provisioning resources (network/*, kafka cluster, ksqlDB cluster,
tableflow, ccpm, flink compute pool/connection) in follow-up tickets.

Behavior of flink statement create --wait is preserved (existing
create-wait.golden unchanged).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@danielayaz Daniel Ayaz (danielayaz) requested a review from a team as a code owner May 19, 2026 19:29
Copilot AI review requested due to automatic review settings May 19, 2026 19:29
@confluent-cla-assistant

🎉 All Contributor License Agreements have been signed. Ready to merge.
Please push an empty commit if you would like to re-run the checks to verify CLA status for all contributors.

Copilot AI left a comment

Pull request overview

This PR introduces a reusable wait/polling framework and shared wait flags, then refactors Flink statement creation to use them for both Cloud and on-prem paths.

Changes:

  • Adds generic pkg/wait.Poll[T] with timeout, tick, failed-state, and context-cancellation handling.
  • Adds shared --wait and --wait-timeout flag helpers in pkg/cmd.
  • Updates Flink statement create polling and adds Cloud timeout integration coverage/fixtures.

Reviewed changes

Copilot reviewed 10 out of 10 changed files in this pull request and generated 4 comments.

| File | Description |
| --- | --- |
| pkg/wait/wait.go | Adds the generic polling framework. |
| pkg/wait/wait_test.go | Adds unit tests for polling outcomes. |
| pkg/cmd/flags.go | Adds shared wait flag helpers. |
| internal/flink/command_statement_create.go | Refactors Cloud statement create wait behavior. |
| internal/flink/command_statement_create_onprem.go | Refactors on-prem statement create wait behavior. |
| test/flink_test.go | Adds Cloud wait-timeout integration coverage. |
| test/test-server/flink_gateway_router.go | Adds a pending statement mock response. |
| test/fixtures/output/flink/statement/create-wait-timeout.golden | Adds expected timeout output. |
| test/fixtures/output/flink/statement/create-missing-sql-failure.golden | Updates on-prem help output. |
| test/fixtures/output/flink/statement/create-missing-compute-pool-failure.golden | Updates on-prem help output. |

Comment thread pkg/wait/wait.go
Comment on lines +31 to +33
    v, err := opts.Fetch()
    if err != nil {
        return last, true, err

Comment on lines +173 to +174
    if err != nil {
        return errors.NewErrorWithSuggestions(err.Error(), "Increase `--wait-timeout` or omit `--wait`.")

Comment on lines +143 to +144
    if err != nil {
        return errors.NewErrorWithSuggestions(err.Error(), "Increase `--wait-timeout` or omit `--wait`.")

Comment on lines +128 to +133
    if shouldWait {
        timeout, err := cmd.Flags().GetDuration("wait-timeout")
        if err != nil {
            return err
        }
        finalStatement, err = wait.Poll(cmd.Context(), wait.Options[cmfsdk.Statement]{

CI surfaced two help-output goldens I missed locally because their
fixture paths are constructed dynamically by TestHelp's tree walker,
not literal in any test file. Diff is the two intended changes:
  - --wait description swapped to the generic one
  - --wait-timeout duration line added

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@sonarqube-confluent

        return client.GetStatement(environmentId, name, c.Context.LastOrgId)
    },
    IsTerminal: func(s flinkgatewayv1.SqlV1Statement) bool {
        return s.Status.GetPhase() != "PENDING"
Contributor

"PENDING" may not be the only intermediate status; Flink statements also have "FAILING" and "STOPPING":
https://docs.confluent.io/cloud/current/ccloud/get-sqlv-1-statement/?_highlight=statement

Thinking one level higher: is there a way to retrieve information from the OpenAPI spec to determine which states are pending and which are terminal? This is the question we need to answer for ourselves.

For example, a naive idea would be: if the status ends with "ING", keep waiting; otherwise, stop.
https://github.com/confluentinc/api/blob/master/flink-gateway/v1/openapi.yaml#L3313

Can you check whether this idea can be applied to other async resources?
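The suffix idea is easy to prototype, and doing so surfaces one wrinkle: "RUNNING" also ends in "ING", yet the original --wait help text says the command blocks until the statement is running or has failed. Below is a rough sketch of the heuristic with an exception list; the function names and the phase list are illustrative assumptions, not values read from the OpenAPI spec.

```go
package main

import (
	"fmt"
	"strings"
)

// isPendingBySuffix is the naive rule: any phase ending in "ING" is still in progress.
func isPendingBySuffix(phase string) bool {
	return strings.HasSuffix(strings.ToUpper(phase), "ING")
}

// isPending applies the suffix rule but lets the caller name phases that end
// in "ING" and should nevertheless stop the wait (hypothetical exception list).
func isPending(phase string, steady map[string]bool) bool {
	if steady[strings.ToUpper(phase)] {
		return false
	}
	return isPendingBySuffix(phase)
}

func main() {
	steady := map[string]bool{"RUNNING": true}
	for _, p := range []string{"PENDING", "FAILING", "STOPPING", "RUNNING", "FAILED"} {
		fmt.Printf("%-9s suffix=%-5t withExceptions=%t\n",
			p, isPendingBySuffix(p), isPending(p, steady))
	}
}
```

Whether a per-resource exception list is acceptable, or whether the set of pending states should come from the spec instead, is the open design question raised in this comment.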

Comment thread pkg/wait/wait.go
type Options[T any] struct {
    Fetch func() (T, error)
    IsTerminal func(T) bool
    IsFailed func(T) bool
Contributor

IsFailed is not wired up in the command source files, so we can't tell whether the wait ended in a successful or a failed state. Can we handle this better?

    IsTerminal: func(s cmfsdk.Statement) bool {
        return s.GetStatus().Phase != "PENDING"
    },
    Tick: 2 * time.Second,
Contributor

2 * time.Second may work for Flink statements but not for other resources: a short interval increases the chance of being rate limited, while a long interval may frustrate users.

Also, please investigate how TF handles the polling interval and timeout. Is it a purely manual setting, a semi-manual setting with a default, something we can derive from the OpenAPI spec, or something we can write to registry.yaml as an override value? Once you have a proposal, please sync with Kostya Linou (@linouk23) on the unified implementation.

cmd.Flags().String("flink-configuration", "", "The file path to hold the Flink configuration for the statement.")
cmd.Flags().Bool("wait", false, "Boolean flag to block until the statement is running or has failed.")
pcmd.AddWaitFlag(cmd)
pcmd.AddWaitTimeoutFlag(cmd, time.Minute)
Contributor

TF doesn't have a user-configurable wait timeout; it's handled entirely inside an internal function with a hardcoded value. Since we want a unified UX for CLI and TF users, please take this into consideration when designing: we need to either alter the existing TF behavior or let the CLI follow the TF behavior.

3 participants