From 9b3f19b8c732e65f4905fb155bc9998233f2a2ee Mon Sep 17 00:00:00 2001 From: Rovshan Baghirov Date: Tue, 28 Apr 2026 02:09:53 +0400 Subject: [PATCH] feat: Add connection-level query timeout support and related tests --- .gitignore | 1 + .../driver/jdbc/ArrowFlightConnection.java | 5 + .../driver/jdbc/ArrowFlightInfoStatement.java | 16 ++ .../driver/jdbc/ArrowFlightMetaImpl.java | 33 +++- .../jdbc/ArrowFlightPreparedStatement.java | 2 +- .../driver/jdbc/ArrowFlightStatement.java | 2 +- .../client/ArrowFlightSqlClientHandler.java | 31 ++- .../ArrowFlightConnectionConfigImpl.java | 7 + .../driver/jdbc/QueryTimeoutHeaderTest.java | 180 ++++++++++++++++++ 9 files changed, 263 insertions(+), 14 deletions(-) create mode 100644 flight/flight-sql-jdbc-core/src/test/java/org/apache/arrow/driver/jdbc/QueryTimeoutHeaderTest.java diff --git a/.gitignore b/.gitignore index 17d1d43ae1..cb206e51d7 100644 --- a/.gitignore +++ b/.gitignore @@ -13,6 +13,7 @@ .project .settings/ .vscode/ +.metals/ /*-build/ /.mvn/.develocity/ /apache-arrow-java-* diff --git a/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightConnection.java b/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightConnection.java index c7c6e65d3c..6878252326 100644 --- a/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightConnection.java +++ b/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightConnection.java @@ -260,4 +260,9 @@ BufferAllocator getBufferAllocator() { public ArrowFlightMetaImpl getMeta() { return (ArrowFlightMetaImpl) this.meta; } + + /** Returns the connection-level query timeout in seconds (0 means no timeout). */ + public int getConnectionQueryTimeoutSeconds() { + return config.getQueryTimeoutSeconds(); + } } diff --git a/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightInfoStatement.java b/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightInfoStatement.java index 37ee93722a..bbc1b11923 100644 --- a/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightInfoStatement.java +++ b/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightInfoStatement.java @@ -18,7 +18,10 @@ import java.sql.SQLException; import java.sql.Statement; +import org.apache.arrow.flight.CallOption; +import org.apache.arrow.flight.FlightCallHeaders; import org.apache.arrow.flight.FlightInfo; +import org.apache.arrow.flight.HeaderCallOption; /** A {@link Statement} that deals with {@link FlightInfo}. */ public interface ArrowFlightInfoStatement extends Statement { @@ -33,4 +36,17 @@ public interface ArrowFlightInfoStatement extends Statement { * @throws SQLException on error. */ FlightInfo executeFlightInfoQuery() throws SQLException; + + default CallOption[] buildTimeoutOption() throws SQLException { + int timeoutSeconds = getQueryTimeout(); + if (timeoutSeconds == 0) { + timeoutSeconds = getConnection().getConnectionQueryTimeoutSeconds(); + } + if (timeoutSeconds > 0) { + final FlightCallHeaders headers = new FlightCallHeaders(); + headers.insert("x-query-timeout-ms", String.valueOf((long) timeoutSeconds * 1000)); + return new CallOption[] {new HeaderCallOption(headers)}; + } + return new CallOption[0]; + } } diff --git a/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightMetaImpl.java b/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightMetaImpl.java index 64529b50c8..592d341278 100644 --- a/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightMetaImpl.java +++ b/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightMetaImpl.java @@ -27,10 +27,14 @@ import org.apache.arrow.driver.jdbc.client.ArrowFlightSqlClientHandler.PreparedStatement; import org.apache.arrow.driver.jdbc.utils.AvaticaParameterBinder; import org.apache.arrow.driver.jdbc.utils.ConvertUtils; +import org.apache.arrow.flight.CallOption; +import org.apache.arrow.flight.FlightCallHeaders; +import org.apache.arrow.flight.HeaderCallOption; import org.apache.arrow.util.Preconditions; import org.apache.arrow.vector.types.pojo.Schema; import org.apache.calcite.avatica.AvaticaConnection; import org.apache.calcite.avatica.AvaticaParameter; +import org.apache.calcite.avatica.AvaticaStatement; import org.apache.calcite.avatica.ColumnMetaData; import org.apache.calcite.avatica.MetaImpl; import org.apache.calcite.avatica.NoSuchStatementException; @@ -87,6 +91,27 @@ public void closeStatement(final StatementHandle statementHandle) { } } + private CallOption[] buildTimeoutOption(final StatementHandle statementHandle) { + int timeoutSeconds = 0; + final AvaticaStatement stmt = connection.statementMap.get(statementHandle.id); + if (stmt != null) { + try { + timeoutSeconds = stmt.getQueryTimeout(); + } catch (final SQLException ignored) { + // fall through to connection-level timeout + } + } + if (timeoutSeconds == 0) { + timeoutSeconds = ((ArrowFlightConnection) connection).getConnectionQueryTimeoutSeconds(); + } + if (timeoutSeconds > 0) { + final FlightCallHeaders headers = new FlightCallHeaders(); + headers.insert("x-query-timeout-ms", String.valueOf((long) timeoutSeconds * 1000)); + return new CallOption[] {new HeaderCallOption(headers)}; + } + return new CallOption[0]; + } + @Override public void commit(final ConnectionHandle connectionHandle) { // TODO Fill this stub. @@ -112,7 +137,7 @@ public ExecuteResult execute( if (statementHandle.signature == null || statementHandle.signature.statementType == StatementType.IS_DML) { // Update query - long updatedCount = preparedStatement.executeUpdate(); + long updatedCount = preparedStatement.executeUpdate(buildTimeoutOption(statementHandle)); return new ExecuteResult( Collections.singletonList( MetaResultSet.count(statementHandle.connectionId, statementHandle.id, updatedCount))); @@ -157,7 +182,7 @@ public ExecuteBatchResult executeBatch( } // Update query - long[] updatedCounts = {preparedStatement.executeUpdate()}; + long[] updatedCounts = {preparedStatement.executeUpdate(buildTimeoutOption(statementHandle))}; return new ExecuteBatchResult(updatedCounts); } @@ -215,7 +240,9 @@ public ExecuteResult prepareAndExecute( final StatementType statementType = preparedStatement.getType(); final long updateCount = - statementType.equals(StatementType.UPDATE) ? preparedStatement.executeUpdate() : -1; + statementType.equals(StatementType.UPDATE) + ? preparedStatement.executeUpdate(buildTimeoutOption(handle)) + : -1; synchronized (callback.getMonitor()) { callback.clear(); callback.assign(handle.signature, null, updateCount); diff --git a/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightPreparedStatement.java b/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightPreparedStatement.java index d7af6902f4..aef0ef8f3d 100644 --- a/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightPreparedStatement.java +++ b/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightPreparedStatement.java @@ -76,6 +76,6 @@ public synchronized void close() throws SQLException { @Override public FlightInfo executeFlightInfoQuery() throws SQLException { - return preparedStatement.executeQuery(); + return preparedStatement.executeQuery(buildTimeoutOption()); } } diff --git a/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightStatement.java b/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightStatement.java index 577aee3b4a..f30ab389de 100644 --- a/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightStatement.java +++ b/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightStatement.java @@ -56,6 +56,6 @@ public FlightInfo executeFlightInfoQuery() throws SQLException { ConvertUtils.convertArrowFieldsToColumnMetaDataList(resultSetSchema.getFields())); setSignature(signature); - return preparedStatement.executeQuery(); + return preparedStatement.executeQuery(buildTimeoutOption()); } } diff --git a/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/client/ArrowFlightSqlClientHandler.java b/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/client/ArrowFlightSqlClientHandler.java index 60dc7e57e0..b8fa730516 100644 --- a/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/client/ArrowFlightSqlClientHandler.java +++ b/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/client/ArrowFlightSqlClientHandler.java @@ -356,17 +356,19 @@ public interface PreparedStatement extends AutoCloseable { /** * Executes this {@link PreparedStatement}. * + * @param extraOptions additional {@link CallOption}s to include for this call only. * @return the {@link FlightInfo} representing the outcome of this query execution. * @throws SQLException on error. */ - FlightInfo executeQuery() throws SQLException; + FlightInfo executeQuery(CallOption... extraOptions) throws SQLException; /** * Executes a {@link StatementType#UPDATE} query. * + * @param extraOptions additional {@link CallOption}s to include for this call only. * @return the number of rows affected. */ - long executeUpdate(); + long executeUpdate(CallOption... extraOptions); /** * Gets the {@link StatementType} of this {@link PreparedStatement}. @@ -436,23 +438,34 @@ public void setCatalog(final String catalog) throws SQLException { } /** - * Creates a new {@link PreparedStatement} for the given {@code query}. + * Merges the handler's base {@link CallOption}s with the provided extra options. * - * @param query the SQL query. - * @return a new prepared statement. + * @param extraOptions additional options for a single call. + * @return a combined array of options. */ + private CallOption[] mergeOptions(final CallOption... extraOptions) { + final CallOption[] baseOptions = getOptions(); + if (extraOptions.length == 0) { + return baseOptions; + } + final CallOption[] combined = new CallOption[baseOptions.length + extraOptions.length]; + System.arraycopy(baseOptions, 0, combined, 0, baseOptions.length); + System.arraycopy(extraOptions, 0, combined, baseOptions.length, extraOptions.length); + return combined; + } + public PreparedStatement prepare(final String query) { final FlightSqlClient.PreparedStatement preparedStatement = sqlClient.prepare(query, getOptions()); return new PreparedStatement() { @Override - public FlightInfo executeQuery() throws SQLException { - return preparedStatement.execute(getOptions()); + public FlightInfo executeQuery(final CallOption... extraOptions) throws SQLException { + return preparedStatement.execute(mergeOptions(extraOptions)); } @Override - public long executeUpdate() { - return preparedStatement.executeUpdate(getOptions()); + public long executeUpdate(final CallOption... extraOptions) { + return preparedStatement.executeUpdate(mergeOptions(extraOptions)); } @Override diff --git a/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/utils/ArrowFlightConnectionConfigImpl.java b/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/utils/ArrowFlightConnectionConfigImpl.java index cf35bb1dd0..f1c7ffb085 100644 --- a/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/utils/ArrowFlightConnectionConfigImpl.java +++ b/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/utils/ArrowFlightConnectionConfigImpl.java @@ -177,6 +177,12 @@ public Duration getConnectTimeout() { return Duration.ofMillis(timeout); } + /** The default query timeout in seconds (0 means no timeout). */ + public int getQueryTimeoutSeconds() { + Integer timeout = ArrowFlightConnectionProperty.QUERY_TIMEOUT_SECONDS.getInteger(properties); + return timeout != null ? timeout : 0; + } + /** Whether to enable the client cache. */ public boolean useClientCache() { return ArrowFlightConnectionProperty.USE_CLIENT_CACHE.getBoolean(properties); @@ -282,6 +288,7 @@ public enum ArrowFlightConnectionProperty implements ConnectionProperty { RETAIN_AUTH("retainAuth", true, Type.BOOLEAN, false), CATALOG("catalog", null, Type.STRING, false), CONNECT_TIMEOUT_MILLIS("connectTimeoutMs", 10000, Type.NUMBER, false), + QUERY_TIMEOUT_SECONDS("queryTimeout", 0, Type.NUMBER, false), USE_CLIENT_CACHE("useClientCache", true, Type.BOOLEAN, false), // OAuth configuration properties diff --git a/flight/flight-sql-jdbc-core/src/test/java/org/apache/arrow/driver/jdbc/QueryTimeoutHeaderTest.java b/flight/flight-sql-jdbc-core/src/test/java/org/apache/arrow/driver/jdbc/QueryTimeoutHeaderTest.java new file mode 100644 index 0000000000..b8f23a6e36 --- /dev/null +++ b/flight/flight-sql-jdbc-core/src/test/java/org/apache/arrow/driver/jdbc/QueryTimeoutHeaderTest.java @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.arrow.driver.jdbc; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.Properties; +import org.apache.arrow.driver.jdbc.utils.CoreMockedSqlProducers; +import org.apache.arrow.flight.FlightMethod; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +/** + * Tests that the {@code x-query-timeout-ms} header is correctly sent (or omitted) as a Flight call + * header, based on the query timeout configured at statement or connection level. + */ +public class QueryTimeoutHeaderTest { + + private static final String QUERY_TIMEOUT_HEADER = "x-query-timeout-ms"; + + @RegisterExtension + public static final FlightServerTestExtension FLIGHT_SERVER_TEST_EXTENSION = + FlightServerTestExtension.createStandardTestExtension( + CoreMockedSqlProducers.getLegacyProducer()); + + /** + * When {@link Statement#setQueryTimeout(int)} is set, the {@code x-query-timeout-ms} header must + * be sent on the {@code GET_FLIGHT_INFO} RPC with the value converted to milliseconds. + */ + @Test + public void testStatementTimeoutSendsHeader() throws SQLException { + final int timeoutSeconds = 10; + try (Connection connection = FLIGHT_SERVER_TEST_EXTENSION.getConnection(false); + Statement statement = connection.createStatement()) { + + statement.setQueryTimeout(timeoutSeconds); + try (ResultSet rs = + statement.executeQuery(CoreMockedSqlProducers.LEGACY_REGULAR_SQL_CMD)) { + // consume results to ensure the RPC completed + while (rs.next()) {} + } + + String headerValue = + FLIGHT_SERVER_TEST_EXTENSION + .getInterceptorFactory() + .getHeader(FlightMethod.GET_FLIGHT_INFO, QUERY_TIMEOUT_HEADER); + + assertEquals( + String.valueOf((long) timeoutSeconds * 1000), + headerValue, + "x-query-timeout-ms header should be set to timeoutSeconds * 1000"); + } + } + + /** + * When the connection-level {@code queryTimeout} property is set and no statement-level timeout + * overrides it, the {@code x-query-timeout-ms} header must be sent with the connection timeout + * converted to milliseconds. + */ + @Test + public void testConnectionLevelTimeoutSendsHeader() throws SQLException { + final int timeoutSeconds = 30; + + final Properties props = new Properties(); + props.put("user", FlightServerTestExtension.DEFAULT_USER); + props.put("password", FlightServerTestExtension.DEFAULT_PASSWORD); + props.put("queryTimeout", timeoutSeconds); + + final String url = + String.format( + "jdbc:arrow-flight-sql://localhost:%d?useEncryption=false", + FLIGHT_SERVER_TEST_EXTENSION.getPort()); + + try (Connection connection = DriverManager.getConnection(url, props); + Statement statement = connection.createStatement(); + ResultSet rs = + statement.executeQuery(CoreMockedSqlProducers.LEGACY_REGULAR_SQL_CMD)) { + while (rs.next()) {} + + String headerValue = + FLIGHT_SERVER_TEST_EXTENSION + .getInterceptorFactory() + .getHeader(FlightMethod.GET_FLIGHT_INFO, QUERY_TIMEOUT_HEADER); + + assertEquals( + String.valueOf((long) timeoutSeconds * 1000), + headerValue, + "x-query-timeout-ms header should reflect connection-level queryTimeout"); + } + } + + /** + * When no timeout is configured (neither at statement nor connection level), the {@code + * x-query-timeout-ms} header must NOT be sent. + */ + @Test + public void testNoTimeoutOmitsHeader() throws SQLException { + try (Connection connection = FLIGHT_SERVER_TEST_EXTENSION.getConnection(false); + Statement statement = connection.createStatement()) { + + // explicitly ensure timeout is 0 (no timeout) + statement.setQueryTimeout(0); + + try (ResultSet rs = + statement.executeQuery(CoreMockedSqlProducers.LEGACY_REGULAR_SQL_CMD)) { + while (rs.next()) {} + } + + String headerValue = + FLIGHT_SERVER_TEST_EXTENSION + .getInterceptorFactory() + .getHeader(FlightMethod.GET_FLIGHT_INFO, QUERY_TIMEOUT_HEADER); + + assertNull( + headerValue, "x-query-timeout-ms header should NOT be present when no timeout is set"); + } + } + + /** + * When the statement timeout overrides a non-zero connection-level timeout, the header value must + * reflect the statement-level timeout (not the connection one). + */ + @Test + public void testStatementTimeoutOverridesConnectionTimeout() throws SQLException { + final int connectionTimeoutSeconds = 60; + final int statementTimeoutSeconds = 5; + + final Properties props = new Properties(); + props.put("user", FlightServerTestExtension.DEFAULT_USER); + props.put("password", FlightServerTestExtension.DEFAULT_PASSWORD); + props.put("queryTimeout", connectionTimeoutSeconds); + + final String url = + String.format( + "jdbc:arrow-flight-sql://localhost:%d?useEncryption=false", + FLIGHT_SERVER_TEST_EXTENSION.getPort()); + + try (Connection connection = DriverManager.getConnection(url, props); + Statement statement = connection.createStatement()) { + + statement.setQueryTimeout(statementTimeoutSeconds); + + try (ResultSet rs = + statement.executeQuery(CoreMockedSqlProducers.LEGACY_REGULAR_SQL_CMD)) { + while (rs.next()) {} + } + + String headerValue = + FLIGHT_SERVER_TEST_EXTENSION + .getInterceptorFactory() + .getHeader(FlightMethod.GET_FLIGHT_INFO, QUERY_TIMEOUT_HEADER); + + assertEquals( + String.valueOf((long) statementTimeoutSeconds * 1000), + headerValue, + "x-query-timeout-ms header should reflect statement-level timeout when both are set"); + } + } +} +