Skip to content

[server] Manage users for sasl/plain authentication via cluster properties.#3047

Open
loserwang1024 wants to merge 1 commit intoapache:mainfrom
loserwang1024:fluss-add-user
Open

[server] Manage users for sasl/plain authentication via cluster properties.#3047
loserwang1024 wants to merge 1 commit intoapache:mainfrom
loserwang1024:fluss-add-user

Conversation

@loserwang1024
Copy link
Copy Markdown
Contributor

Purpose

Linked issue: close #3046

Brief change log

Tests

API and Format

Documentation

@loserwang1024 loserwang1024 changed the title [server] Manager users for sasl/plain authentication via cluster properties. [server] Manage users for sasl/plain authentication via cluster properties. Apr 9, 2026
@loserwang1024 loserwang1024 requested a review from Copilot April 10, 2026 02:45
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Note

Copilot was unable to run its full agentic suite in this review.

Adds dynamic management of SASL/PLAIN users via cluster properties (append/subtract semantics) and wires server-side components to apply these updates at runtime (close #3046).

Changes:

  • Introduce security.sasl.users as a list config option and support APPEND/SUBTRACT operations in dynamic config updates.
  • Make FlussProtocolPlugin reconfigurable so SASL user changes can be applied without server restarts.
  • Add Flink procedures and tests/IT cases covering append/subtract and runtime authentication changes.

Reviewed changes

Copilot reviewed 16 out of 16 changed files in this pull request and generated 11 comments.

Show a summary per file
File Description
fluss-server/src/test/java/org/apache/fluss/server/DynamicConfigChangeTest.java Adds unit tests for APPEND/SUBTRACT behavior on list-typed configs
fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletServer.java Registers protocol plugin for dynamic SASL updates
fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorServer.java Registers protocol plugin for dynamic SASL updates
fluss-server/src/main/java/org/apache/fluss/server/DynamicServerConfig.java Allows security.sasl.users among dynamically alterable server configs
fluss-server/src/main/java/org/apache/fluss/server/DynamicConfigManager.java Implements APPEND/SUBTRACT operations with list-type validation
fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/authenticate/SaslAuthenticationITCase.java Adds IT verifying add/delete user via runtime reconfigure
fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/authenticate/AuthenticationTest.java Switches tests to use security.sasl.users instead of JAAS string
fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/server/NettyServer.java Exposes FlussProtocolPlugin instance for registration/reconfigure
fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/server/FlussProtocolPlugin.java Implements ServerReconfigurable and generates JAAS from user list
fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/procedure/FlinkProcedureITCase.java Adds IT for append/subtract procedures managing SASL users
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/SubtractClusterConfigsProcedure.java New procedure for SUBTRACT list-config entries
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/ProcedureManager.java Registers new append/subtract procedures
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/AppendClusterConfigsProcedure.java New procedure for APPEND list-config entries
fluss-common/src/test/java/org/apache/fluss/security/auth/AuthenticationFactoryTest.java Updates tests for new supplier-based server authenticator loading API
fluss-common/src/main/java/org/apache/fluss/security/auth/AuthenticationFactory.java Changes server authenticator loading to accept a config supplier
fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java Adds SERVER_SASL_USERS list config option

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

@@ -83,7 +83,8 @@ public static Supplier<ClientAuthenticator> loadClientAuthenticatorSupplier(
* authenticators.
*/
public static Map<String, Supplier<ServerAuthenticator>> loadServerAuthenticatorSuppliers(
Copy link

Copilot AI Apr 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This changes a public static method signature from (Configuration) to (Supplier<Configuration>), which is a source/binary breaking API change for any external callers. Consider re-introducing an overload loadServerAuthenticatorSuppliers(Configuration configuration) that delegates to the new supplier-based method (optionally deprecate it) to preserve compatibility.

Suggested change
public static Map<String, Supplier<ServerAuthenticator>> loadServerAuthenticatorSuppliers(
public static Map<String, Supplier<ServerAuthenticator>> loadServerAuthenticatorSuppliers(
Configuration configuration) {
return loadServerAuthenticatorSuppliers(() -> configuration);
}
/**
* Loads suppliers for server authenticators for each endpoint, based on listener-specific
* protocols.
*
* @param configurationSupplier The configuration supplier containing authentication settings
* and protocol definitions.
* @return A map mapping listener names to suppliers for their corresponding server
* authenticators.
*/
public static Map<String, Supplier<ServerAuthenticator>> loadServerAuthenticatorSuppliers(

Copilot uses AI. Check for mistakes.
Comment on lines +95 to +119
@Override
public void validate(Configuration newConfig) throws ConfigException {
List<String> users = newConfig.get(ConfigOptions.SERVER_SASL_USERS);
if (users == null) {
return;
}
Set<String> uniqueUsernames = new HashSet<>();
for (int i = 0; i < users.size(); i++) {
String entry = users.get(i).trim();
int colonIdx = entry.indexOf(':');
if (colonIdx <= 0 || colonIdx == entry.length() - 1) {
throw new ConfigException(
String.format(
"security.sasl.users[%d] must be in 'username:password' format, but got '%s'.",
i, entry));
}
String username = entry.substring(0, colonIdx);
if (!uniqueUsernames.add(username)) {
throw new ConfigException(
"security.sasl.users must not contain duplicate usernames: '"
+ username
+ "'.");
}
}
}
Copy link

Copilot AI Apr 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The generated JAAS config later uses user_%s=\"%s\" with raw username/password. Without validating/escaping, usernames may contain characters invalid for JAAS option keys (and passwords may contain quotes/backslashes), leading to invalid configs or potential injection into the JAAS string. Recommend: (1) validate username against a strict allowlist suitable for user_<name> keys, (2) either disallow or correctly escape \" and \\ in passwords (and other problematic characters), and (3) trim username/password components consistently (currently only the whole entry is trimmed in validate()).

Copilot uses AI. Check for mistakes.
Comment on lines +131 to +150
private static Configuration enrichWithJaasConfig(Configuration config) {
List<String> users = config.get(ConfigOptions.SERVER_SASL_USERS);
if (users == null) {
return config;
}
StringBuilder sb =
new StringBuilder(
"org.apache.fluss.security.auth.sasl.plain.PlainLoginModule required");
for (String entry : users) {
int colonIdx = entry.indexOf(':');
checkArgument(colonIdx > 0, "Invalid user entry format: '%s'", entry);
String username = entry.substring(0, colonIdx);
String password = entry.substring(colonIdx + 1);
sb.append(String.format(" user_%s=\"%s\"", username, password));
}
sb.append(";");
Configuration enriched = new Configuration(config);
enriched.setString("security.sasl.plain.jaas.config", sb.toString());
return enriched;
}
Copy link

Copilot AI Apr 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

enrichWithJaasConfig() uses checkArgument(...) which throws IllegalArgumentException, even though the surrounding reconfiguration path is declared to throw ConfigException and other code/tests appear to expect config-related failures as ConfigException. To keep error handling consistent (and avoid unexpected runtime exceptions), convert this validation to throw ConfigException with a user-facing message, and apply the same trimming logic here as in validate() (currently validate() trims but enrichment does not).

Copilot uses AI. Check for mistakes.
Comment on lines +161 to +174
case SUBTRACT:
validateListType(configPropName);
String existingSubtract = configsProps.get(configPropName);
if (existingSubtract != null) {
List<String> items =
new ArrayList<>(Arrays.asList(existingSubtract.split(",")));
items.remove(configPropValue);
if (items.isEmpty()) {
configsProps.remove(configPropName);
} else {
configsProps.put(configPropName, String.join(",", items));
}
}
break;
Copy link

Copilot AI Apr 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SUBTRACT uses a raw split(\",\") and then items.remove(configPropValue). This will fail to remove entries if the stored list contains whitespace (e.g. \"a, b\" -> item is \" b\"), and it only removes the first matching occurrence. Consider normalizing items (trim each element and filter empty segments) and removing all matches (e.g., removeIf(v -> v.equals(configPropValue))) so SUBTRACT behaves predictably.

Copilot uses AI. Check for mistakes.
serverMetricGroup,
RequestsMetrics.createCoordinatorServerRequestMetrics(
serverMetricGroup));
// Register FlussProtocolPlugin for dynamic SASL config updates
Copy link

Copilot AI Apr 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This hard-casts rpcServer to NettyServer. If RpcServer can be implemented by other server types (now or in the future), this will throw ClassCastException during startup. Prefer either (a) exposing the needed hook via the RpcServer abstraction, or (b) guarding with instanceof NettyServer and failing with a clear ConfigException/IllegalStateException that explains the requirement.

Suggested change
// Register FlussProtocolPlugin for dynamic SASL config updates
// Register FlussProtocolPlugin for dynamic SASL config updates
if (!(rpcServer instanceof NettyServer)) {
throw new IllegalStateException(
"CoordinatorServer requires RpcServer to be a NettyServer for dynamic "
+ "SASL config updates, but got: "
+ rpcServer.getClass().getName());
}

Copilot uses AI. Check for mistakes.
Comment on lines +275 to +283
/** Returns the FlussProtocolPlugin instance used by this server. */
public FlussProtocolPlugin getFlussProtocolPlugin() {
for (NetworkProtocolPlugin protocol : protocols) {
if (protocol instanceof FlussProtocolPlugin) {
return (FlussProtocolPlugin) protocol;
}
}
throw new IllegalStateException("FlussProtocolPlugin not found in loaded protocols.");
}
Copy link

Copilot AI Apr 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This introduces a hard failure if FlussProtocolPlugin is not present in protocols. Since callers use this to wire dynamic SASL updates, consider making the contract explicit and easier to handle (e.g., return Optional<FlussProtocolPlugin> or provide a boolean/nullable getter) so startup can either (a) clearly error with context about why the plugin is required, or (b) skip registration when the protocol isn't loaded.

Copilot uses AI. Check for mistakes.
.withDescription(
"List of user credentials for SASL/PLAIN authentication in 'username:password' format. "
+ "For example: 'admin:admin-secret,bob:bob-secret'. "
+ "This is syntactic sugar that auto-generates the JAAS config string.");
Copy link

Copilot AI Apr 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This option stores plaintext credentials and is now exposed as a regular cluster config. To reduce accidental leakage (logs, get_cluster_configs, UIs), consider marking this option as sensitive (if the config framework supports it) and/or ensuring config listing APIs mask/redact values for security.sasl.users (and the derived security.sasl.plain.jaas.config). Also consider documenting the security implications explicitly in the description.

Suggested change
+ "This is syntactic sugar that auto-generates the JAAS config string.");
+ "This is syntactic sugar that auto-generates the JAAS config string. "
+ "Warning: this option stores plaintext credentials and should be handled as sensitive configuration. "
+ "Avoid placing it in shared config files or exposing it through logs, config listing APIs, or UIs.");

Copilot uses AI. Check for mistakes.
Comment on lines +839 to +859
// Verify "bob" can authenticate by creating a catalog with bob's credentials
tEnv.executeSql(createCatalogDDL).await();

// Grant bob DESCRIBE permission on cluster so bob can query configs
tEnv.executeSql(
String.format(
"Call %s.sys.add_acl('CLUSTER', 'ALLOW', 'User:bob', 'DESCRIBE', '*')",
CATALOG_NAME))
.await();

// Bob should be able to get cluster configs
try (CloseableIterator<Row> resultIterator =
tEnv.executeSql(
String.format(
"Call %s.sys.get_cluster_configs('%s')",
bobCatalog, ConfigOptions.SERVER_SASL_USERS.key()))
.collect()) {
List<Row> results = CollectionUtil.iteratorToList(resultIterator);
assertThat(results).hasSize(1);
}
tEnv.executeSql("drop catalog " + bobCatalog);
Copy link

Copilot AI Apr 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This IT adds cluster state (catalog + ACL) but cleanup is not protected by a finally block. If an assertion or intermediate call fails after creating the catalog or adding the ACL, the test can leak state and make subsequent tests flaky. Wrap catalog creation/ACL grant in try/finally to reliably drop the catalog and remove the ACL.

Suggested change
// Verify "bob" can authenticate by creating a catalog with bob's credentials
tEnv.executeSql(createCatalogDDL).await();
// Grant bob DESCRIBE permission on cluster so bob can query configs
tEnv.executeSql(
String.format(
"Call %s.sys.add_acl('CLUSTER', 'ALLOW', 'User:bob', 'DESCRIBE', '*')",
CATALOG_NAME))
.await();
// Bob should be able to get cluster configs
try (CloseableIterator<Row> resultIterator =
tEnv.executeSql(
String.format(
"Call %s.sys.get_cluster_configs('%s')",
bobCatalog, ConfigOptions.SERVER_SASL_USERS.key()))
.collect()) {
List<Row> results = CollectionUtil.iteratorToList(resultIterator);
assertThat(results).hasSize(1);
}
tEnv.executeSql("drop catalog " + bobCatalog);
boolean bobCatalogCreated = false;
boolean bobDescribeAclAdded = false;
try {
// Verify "bob" can authenticate by creating a catalog with bob's credentials
tEnv.executeSql(createCatalogDDL).await();
bobCatalogCreated = true;
// Grant bob DESCRIBE permission on cluster so bob can query configs
tEnv.executeSql(
String.format(
"Call %s.sys.add_acl('CLUSTER', 'ALLOW', 'User:bob', 'DESCRIBE', '*')",
CATALOG_NAME))
.await();
bobDescribeAclAdded = true;
// Bob should be able to get cluster configs
try (CloseableIterator<Row> resultIterator =
tEnv.executeSql(
String.format(
"Call %s.sys.get_cluster_configs('%s')",
bobCatalog, ConfigOptions.SERVER_SASL_USERS.key()))
.collect()) {
List<Row> results = CollectionUtil.iteratorToList(resultIterator);
assertThat(results).hasSize(1);
}
} finally {
if (bobDescribeAclAdded) {
tEnv.executeSql(
String.format(
"Call %s.sys.remove_acl('CLUSTER', 'ALLOW', 'User:bob', 'DESCRIBE', '*')",
CATALOG_NAME))
.await();
}
if (bobCatalogCreated) {
tEnv.executeSql("drop catalog " + bobCatalog).await();
}
}

Copilot uses AI. Check for mistakes.
Comment on lines +839 to +859
// Verify "bob" can authenticate by creating a catalog with bob's credentials
tEnv.executeSql(createCatalogDDL).await();

// Grant bob DESCRIBE permission on cluster so bob can query configs
tEnv.executeSql(
String.format(
"Call %s.sys.add_acl('CLUSTER', 'ALLOW', 'User:bob', 'DESCRIBE', '*')",
CATALOG_NAME))
.await();

// Bob should be able to get cluster configs
try (CloseableIterator<Row> resultIterator =
tEnv.executeSql(
String.format(
"Call %s.sys.get_cluster_configs('%s')",
bobCatalog, ConfigOptions.SERVER_SASL_USERS.key()))
.collect()) {
List<Row> results = CollectionUtil.iteratorToList(resultIterator);
assertThat(results).hasSize(1);
}
tEnv.executeSql("drop catalog " + bobCatalog);
Copy link

Copilot AI Apr 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This IT adds cluster state (catalog + ACL) but cleanup is not protected by a finally block. If an assertion or intermediate call fails after creating the catalog or adding the ACL, the test can leak state and make subsequent tests flaky. Wrap catalog creation/ACL grant in try/finally to reliably drop the catalog and remove the ACL.

Suggested change
// Verify "bob" can authenticate by creating a catalog with bob's credentials
tEnv.executeSql(createCatalogDDL).await();
// Grant bob DESCRIBE permission on cluster so bob can query configs
tEnv.executeSql(
String.format(
"Call %s.sys.add_acl('CLUSTER', 'ALLOW', 'User:bob', 'DESCRIBE', '*')",
CATALOG_NAME))
.await();
// Bob should be able to get cluster configs
try (CloseableIterator<Row> resultIterator =
tEnv.executeSql(
String.format(
"Call %s.sys.get_cluster_configs('%s')",
bobCatalog, ConfigOptions.SERVER_SASL_USERS.key()))
.collect()) {
List<Row> results = CollectionUtil.iteratorToList(resultIterator);
assertThat(results).hasSize(1);
}
tEnv.executeSql("drop catalog " + bobCatalog);
boolean bobCatalogCreated = false;
boolean bobDescribeAclAdded = false;
try {
// Verify "bob" can authenticate by creating a catalog with bob's credentials
tEnv.executeSql(createCatalogDDL).await();
bobCatalogCreated = true;
// Grant bob DESCRIBE permission on cluster so bob can query configs
tEnv.executeSql(
String.format(
"Call %s.sys.add_acl('CLUSTER', 'ALLOW', 'User:bob', 'DESCRIBE', '*')",
CATALOG_NAME))
.await();
bobDescribeAclAdded = true;
// Bob should be able to get cluster configs
try (CloseableIterator<Row> resultIterator =
tEnv.executeSql(
String.format(
"Call %s.sys.get_cluster_configs('%s')",
bobCatalog, ConfigOptions.SERVER_SASL_USERS.key()))
.collect()) {
List<Row> results = CollectionUtil.iteratorToList(resultIterator);
assertThat(results).hasSize(1);
}
} finally {
try {
if (bobDescribeAclAdded) {
tEnv.executeSql(
String.format(
"Call %s.sys.remove_acl('CLUSTER', 'ALLOW', 'User:bob', 'DESCRIBE', '*')",
CATALOG_NAME))
.await();
}
} finally {
if (bobCatalogCreated) {
tEnv.executeSql("drop catalog " + bobCatalog).await();
}
}
}

Copilot uses AI. Check for mistakes.
Comment on lines +862 to +898
try (CloseableIterator<Row> resultIterator =
tEnv.executeSql(
String.format(
"Call %s.sys.subtract_cluster_configs('%s', 'bob:bob_pass')",
CATALOG_NAME, ConfigOptions.SERVER_SASL_USERS.key()))
.collect()) {
List<Row> results = CollectionUtil.iteratorToList(resultIterator);
assertThat(results).hasSize(1);
assertThat(results.get(0).getField(0))
.asString()
.contains("Successfully subtracted")
.contains(ConfigOptions.SERVER_SASL_USERS.key());
}

// Verify "bob" was deleted from config
try (CloseableIterator<Row> resultIterator =
tEnv.executeSql(
String.format(
"Call %s.sys.get_cluster_configs('%s')",
CATALOG_NAME, ConfigOptions.SERVER_SASL_USERS.key()))
.collect()) {
List<Row> results = CollectionUtil.iteratorToList(resultIterator);
// After subtracting the only dynamically-added entry, the config may be empty
if (!results.isEmpty()) {
assertThat((String) results.get(0).getField(1)).doesNotContain("bob");
}
}

// Verify "bob" can no longer authenticate
assertThatThrownBy(() -> tEnv.executeSql(createCatalogDDL).await())
.hasMessageContaining("Invalid username or password");
// Cleanup: remove bob's ACL
tEnv.executeSql(
String.format(
"Call %s.sys.drop_acl('CLUSTER', 'ALLOW', 'User:bob', 'DESCRIBE', '*')",
CATALOG_NAME))
.await();
Copy link

Copilot AI Apr 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This IT adds cluster state (catalog + ACL) but cleanup is not protected by a finally block. If an assertion or intermediate call fails after creating the catalog or adding the ACL, the test can leak state and make subsequent tests flaky. Wrap catalog creation/ACL grant in try/finally to reliably drop the catalog and remove the ACL.

Suggested change
try (CloseableIterator<Row> resultIterator =
tEnv.executeSql(
String.format(
"Call %s.sys.subtract_cluster_configs('%s', 'bob:bob_pass')",
CATALOG_NAME, ConfigOptions.SERVER_SASL_USERS.key()))
.collect()) {
List<Row> results = CollectionUtil.iteratorToList(resultIterator);
assertThat(results).hasSize(1);
assertThat(results.get(0).getField(0))
.asString()
.contains("Successfully subtracted")
.contains(ConfigOptions.SERVER_SASL_USERS.key());
}
// Verify "bob" was deleted from config
try (CloseableIterator<Row> resultIterator =
tEnv.executeSql(
String.format(
"Call %s.sys.get_cluster_configs('%s')",
CATALOG_NAME, ConfigOptions.SERVER_SASL_USERS.key()))
.collect()) {
List<Row> results = CollectionUtil.iteratorToList(resultIterator);
// After subtracting the only dynamically-added entry, the config may be empty
if (!results.isEmpty()) {
assertThat((String) results.get(0).getField(1)).doesNotContain("bob");
}
}
// Verify "bob" can no longer authenticate
assertThatThrownBy(() -> tEnv.executeSql(createCatalogDDL).await())
.hasMessageContaining("Invalid username or password");
// Cleanup: remove bob's ACL
tEnv.executeSql(
String.format(
"Call %s.sys.drop_acl('CLUSTER', 'ALLOW', 'User:bob', 'DESCRIBE', '*')",
CATALOG_NAME))
.await();
try {
try (CloseableIterator<Row> resultIterator =
tEnv.executeSql(
String.format(
"Call %s.sys.subtract_cluster_configs('%s', 'bob:bob_pass')",
CATALOG_NAME, ConfigOptions.SERVER_SASL_USERS.key()))
.collect()) {
List<Row> results = CollectionUtil.iteratorToList(resultIterator);
assertThat(results).hasSize(1);
assertThat(results.get(0).getField(0))
.asString()
.contains("Successfully subtracted")
.contains(ConfigOptions.SERVER_SASL_USERS.key());
}
// Verify "bob" was deleted from config
try (CloseableIterator<Row> resultIterator =
tEnv.executeSql(
String.format(
"Call %s.sys.get_cluster_configs('%s')",
CATALOG_NAME, ConfigOptions.SERVER_SASL_USERS.key()))
.collect()) {
List<Row> results = CollectionUtil.iteratorToList(resultIterator);
// After subtracting the only dynamically-added entry, the config may be empty
if (!results.isEmpty()) {
assertThat((String) results.get(0).getField(1)).doesNotContain("bob");
}
}
// Verify "bob" can no longer authenticate
assertThatThrownBy(() -> tEnv.executeSql(createCatalogDDL).await())
.hasMessageContaining("Invalid username or password");
} finally {
// Cleanup: remove bob's ACL
tEnv.executeSql(
String.format(
"Call %s.sys.drop_acl('CLUSTER', 'ALLOW', 'User:bob', 'DESCRIBE', '*')",
CATALOG_NAME))
.await();
}

Copilot uses AI. Check for mistakes.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Support add users for SASL/Plain authentication

2 participants