diff --git a/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightConnection.java b/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightConnection.java index c7c6e65d3..e94ae1cce 100644 --- a/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightConnection.java +++ b/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightConnection.java @@ -260,4 +260,9 @@ BufferAllocator getBufferAllocator() { public ArrowFlightMetaImpl getMeta() { return (ArrowFlightMetaImpl) this.meta; } + + /** Returns the connection-level query timeout in seconds (0 means no timeout). */ + public int getQueryTimeoutSeconds() { + return config.getQueryTimeoutSeconds(); + } } diff --git a/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightMetaImpl.java b/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightMetaImpl.java index 64529b50c..089aedf67 100644 --- a/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightMetaImpl.java +++ b/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightMetaImpl.java @@ -27,6 +27,7 @@ import org.apache.arrow.driver.jdbc.client.ArrowFlightSqlClientHandler.PreparedStatement; import org.apache.arrow.driver.jdbc.utils.AvaticaParameterBinder; import org.apache.arrow.driver.jdbc.utils.ConvertUtils; +import org.apache.arrow.driver.jdbc.utils.QueryTimeoutCallOption; import org.apache.arrow.util.Preconditions; import org.apache.arrow.vector.types.pojo.Schema; import org.apache.calcite.avatica.AvaticaConnection; @@ -112,7 +113,9 @@ public ExecuteResult execute( if (statementHandle.signature == null || statementHandle.signature.statementType == StatementType.IS_DML) { // Update query - long updatedCount = preparedStatement.executeUpdate(); + long updatedCount = + preparedStatement.executeUpdate( + QueryTimeoutCallOption.fromStatementHandle(statementHandle.id, connection)); return new ExecuteResult( Collections.singletonList( MetaResultSet.count(statementHandle.connectionId, statementHandle.id, updatedCount))); @@ -157,7 +160,10 @@ public ExecuteBatchResult executeBatch( } // Update query - long[] updatedCounts = {preparedStatement.executeUpdate()}; + long[] updatedCounts = { + preparedStatement.executeUpdate( + QueryTimeoutCallOption.fromStatementHandle(statementHandle.id, connection)) + }; return new ExecuteBatchResult(updatedCounts); } @@ -215,7 +221,10 @@ public ExecuteResult prepareAndExecute( final StatementType statementType = preparedStatement.getType(); final long updateCount = - statementType.equals(StatementType.UPDATE) ? preparedStatement.executeUpdate() : -1; + statementType.equals(StatementType.UPDATE) + ? preparedStatement.executeUpdate( + QueryTimeoutCallOption.fromStatementHandle(handle.id, connection)) + : -1; synchronized (callback.getMonitor()) { callback.clear(); callback.assign(handle.signature, null, updateCount); diff --git a/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightPreparedStatement.java b/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightPreparedStatement.java index d7af6902f..bc42b47b5 100644 --- a/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightPreparedStatement.java +++ b/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightPreparedStatement.java @@ -19,6 +19,7 @@ import java.sql.PreparedStatement; import java.sql.SQLException; import org.apache.arrow.driver.jdbc.client.ArrowFlightSqlClientHandler; +import org.apache.arrow.driver.jdbc.utils.QueryTimeoutCallOption; import org.apache.arrow.flight.FlightInfo; import org.apache.arrow.util.Preconditions; import org.apache.calcite.avatica.AvaticaPreparedStatement; @@ -76,6 +77,6 @@ public synchronized void close() throws SQLException { @Override public FlightInfo executeFlightInfoQuery() throws SQLException { - return preparedStatement.executeQuery(); + return preparedStatement.executeQuery(QueryTimeoutCallOption.fromStatement(this)); } } diff --git a/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightStatement.java b/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightStatement.java index 577aee3b4..187a1eb5c 100644 --- a/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightStatement.java +++ b/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightStatement.java @@ -19,6 +19,7 @@ import java.sql.SQLException; import org.apache.arrow.driver.jdbc.client.ArrowFlightSqlClientHandler.PreparedStatement; import org.apache.arrow.driver.jdbc.utils.ConvertUtils; +import org.apache.arrow.driver.jdbc.utils.QueryTimeoutCallOption; import org.apache.arrow.flight.FlightInfo; import org.apache.arrow.vector.types.pojo.Schema; import org.apache.calcite.avatica.AvaticaStatement; @@ -56,6 +57,6 @@ public FlightInfo executeFlightInfoQuery() throws SQLException { ConvertUtils.convertArrowFieldsToColumnMetaDataList(resultSetSchema.getFields())); setSignature(signature); - return preparedStatement.executeQuery(); + return preparedStatement.executeQuery(QueryTimeoutCallOption.fromStatement(this)); } } diff --git a/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/client/ArrowFlightSqlClientHandler.java b/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/client/ArrowFlightSqlClientHandler.java index 60dc7e57e..6a13ea72d 100644 --- a/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/client/ArrowFlightSqlClientHandler.java +++ b/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/client/ArrowFlightSqlClientHandler.java @@ -39,6 +39,7 @@ import org.apache.arrow.driver.jdbc.client.utils.FlightClientCache; import org.apache.arrow.driver.jdbc.client.utils.FlightLocationQueue; import org.apache.arrow.driver.jdbc.utils.ArrowFlightProxyDetector; +import org.apache.arrow.driver.jdbc.utils.QueryTimeoutCallOption; import org.apache.arrow.flight.CallOption; import org.apache.arrow.flight.CallStatus; import org.apache.arrow.flight.CloseSessionRequest; @@ -356,17 +357,19 @@ public interface PreparedStatement extends AutoCloseable { /** * Executes this {@link PreparedStatement}. * + * @param extraOptions additional {@link CallOption}s to include for this call only. * @return the {@link FlightInfo} representing the outcome of this query execution. * @throws SQLException on error. */ - FlightInfo executeQuery() throws SQLException; + FlightInfo executeQuery(CallOption... extraOptions) throws SQLException; /** * Executes a {@link StatementType#UPDATE} query. * + * @param extraOptions additional {@link CallOption}s to include for this call only. * @return the number of rows affected. */ - long executeUpdate(); + long executeUpdate(CallOption... extraOptions); /** * Gets the {@link StatementType} of this {@link PreparedStatement}. @@ -435,6 +438,16 @@ public void setCatalog(final String catalog) throws SQLException { } } + /** + * Merges the handler's base {@link CallOption}s with the provided extra options. + * + * @param extraOptions additional options for a single call. + * @return a combined array of options. + */ + private CallOption[] mergeOptions(final CallOption... extraOptions) { + return QueryTimeoutCallOption.mergeOptions(getOptions(), extraOptions); + } + /** * Creates a new {@link PreparedStatement} for the given {@code query}. * @@ -446,13 +459,13 @@ public PreparedStatement prepare(final String query) { sqlClient.prepare(query, getOptions()); return new PreparedStatement() { @Override - public FlightInfo executeQuery() throws SQLException { - return preparedStatement.execute(getOptions()); + public FlightInfo executeQuery(final CallOption... extraOptions) throws SQLException { + return preparedStatement.execute(mergeOptions(extraOptions)); } @Override - public long executeUpdate() { - return preparedStatement.executeUpdate(getOptions()); + public long executeUpdate(final CallOption... extraOptions) { + return preparedStatement.executeUpdate(mergeOptions(extraOptions)); } @Override diff --git a/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/utils/ArrowFlightConnectionConfigImpl.java b/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/utils/ArrowFlightConnectionConfigImpl.java index cf35bb1dd..f1c7ffb08 100644 --- a/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/utils/ArrowFlightConnectionConfigImpl.java +++ b/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/utils/ArrowFlightConnectionConfigImpl.java @@ -177,6 +177,12 @@ public Duration getConnectTimeout() { return Duration.ofMillis(timeout); } + /** The default query timeout in seconds (0 means no timeout). */ + public int getQueryTimeoutSeconds() { + Integer timeout = ArrowFlightConnectionProperty.QUERY_TIMEOUT_SECONDS.getInteger(properties); + return timeout != null ? timeout : 0; + } + /** Whether to enable the client cache. */ public boolean useClientCache() { return ArrowFlightConnectionProperty.USE_CLIENT_CACHE.getBoolean(properties); @@ -282,6 +288,7 @@ public enum ArrowFlightConnectionProperty implements ConnectionProperty { RETAIN_AUTH("retainAuth", true, Type.BOOLEAN, false), CATALOG("catalog", null, Type.STRING, false), CONNECT_TIMEOUT_MILLIS("connectTimeoutMs", 10000, Type.NUMBER, false), + QUERY_TIMEOUT_SECONDS("queryTimeout", 0, Type.NUMBER, false), USE_CLIENT_CACHE("useClientCache", true, Type.BOOLEAN, false), // OAuth configuration properties diff --git a/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/utils/QueryTimeoutCallOption.java b/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/utils/QueryTimeoutCallOption.java new file mode 100644 index 000000000..30bb04a46 --- /dev/null +++ b/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/utils/QueryTimeoutCallOption.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.arrow.driver.jdbc.utils; + +import java.sql.SQLException; +import java.time.Duration; +import org.apache.arrow.driver.jdbc.ArrowFlightConnection; +import org.apache.arrow.driver.jdbc.ArrowFlightInfoStatement; +import org.apache.arrow.flight.CallOption; +import org.apache.arrow.flight.FlightCallHeaders; +import org.apache.arrow.flight.HeaderCallOption; +import org.apache.calcite.avatica.AvaticaConnection; +import org.apache.calcite.avatica.AvaticaStatement; + +/** Utility for building query-timeout {@link CallOption}s for Flight SQL calls. */ +public final class QueryTimeoutCallOption { + + private QueryTimeoutCallOption() {} + + /** + * Merges base call options with extra options. + * + * @param baseOptions the base options + * @param extraOptions additional options to merge + * @return combined array of options + */ + public static CallOption[] mergeOptions(CallOption[] baseOptions, CallOption... extraOptions) { + if (baseOptions == null || baseOptions.length == 0) { + return extraOptions; + } + if (extraOptions.length == 0) { + return baseOptions; + } + final CallOption[] combined = new CallOption[baseOptions.length + extraOptions.length]; + System.arraycopy(baseOptions, 0, combined, 0, baseOptions.length); + System.arraycopy(extraOptions, 0, combined, baseOptions.length, extraOptions.length); + return combined; + } + + /** + * Builds a CallOption array from a statement's timeout configuration. + * + * @param statement the statement to extract timeouts from + * @return CallOption array with timeout header, or empty array if no timeout configured + * @throws SQLException on error retrieving timeout values + */ + public static CallOption[] fromStatement(ArrowFlightInfoStatement statement) throws SQLException { + final Duration statementTimeout = Duration.ofSeconds(statement.getQueryTimeout()); + final Duration connectionTimeout = + Duration.ofSeconds(statement.getConnection().getQueryTimeoutSeconds()); + return build(statementTimeout, connectionTimeout); + } + + /** + * Builds a CallOption array from a statement ID and connection. + * + * @param statementId the statement ID to look up in the connection's statement map + * @param connection the Avatica connection to extract timeouts from + * @return CallOption array with timeout header, or empty array if no timeout configured + */ + public static CallOption[] fromStatementHandle(int statementId, AvaticaConnection connection) { + Duration statementTimeout = Duration.ZERO; + final AvaticaStatement stmt = connection.statementMap.get(statementId); + if (stmt != null) { + try { + statementTimeout = Duration.ofSeconds(stmt.getQueryTimeout()); + } catch (final SQLException ignored) { + // fall through to connection-level timeout + } + } + Duration connectionTimeout = Duration.ZERO; + if (connection instanceof ArrowFlightConnection) { + connectionTimeout = + Duration.ofSeconds(((ArrowFlightConnection) connection).getQueryTimeoutSeconds()); + } + return build(statementTimeout, connectionTimeout); + } + + /** + * Builds a CallOption array containing query timeout header if a timeout is configured. + * + * @param statementTimeout timeout at statement level (ZERO means use connection-level) + * @param connectionTimeout timeout at connection level + * @return CallOption array with timeout header, or empty array if no timeout configured + */ + private static CallOption[] build(Duration statementTimeout, Duration connectionTimeout) { + final Duration effectiveTimeout = + !statementTimeout.isZero() ? statementTimeout : connectionTimeout; + if (effectiveTimeout.isZero()) { + return new CallOption[0]; + } + final FlightCallHeaders headers = new FlightCallHeaders(); + headers.insert("x-query-timeout-ms", String.valueOf(effectiveTimeout.toMillis())); + return new CallOption[] {new HeaderCallOption(headers)}; + } +} diff --git a/flight/flight-sql-jdbc-core/src/test/java/org/apache/arrow/driver/jdbc/QueryTimeoutHeaderTest.java b/flight/flight-sql-jdbc-core/src/test/java/org/apache/arrow/driver/jdbc/QueryTimeoutHeaderTest.java new file mode 100644 index 000000000..dfa57babc --- /dev/null +++ b/flight/flight-sql-jdbc-core/src/test/java/org/apache/arrow/driver/jdbc/QueryTimeoutHeaderTest.java @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.arrow.driver.jdbc; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.Properties; +import org.apache.arrow.driver.jdbc.utils.CoreMockedSqlProducers; +import org.apache.arrow.flight.FlightMethod; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +/** + * Tests that the {@code x-query-timeout-ms} header is correctly sent (or omitted) as a Flight call + * header, based on the query timeout configured at statement or connection level. + */ +public class QueryTimeoutHeaderTest { + + private static final String QUERY_TIMEOUT_HEADER = "x-query-timeout-ms"; + + @RegisterExtension + public static final FlightServerTestExtension FLIGHT_SERVER_TEST_EXTENSION = + FlightServerTestExtension.createStandardTestExtension( + CoreMockedSqlProducers.getLegacyProducer()); + + /** + * When {@link Statement#setQueryTimeout(int)} is set, the {@code x-query-timeout-ms} header must + * be sent on the {@code GET_FLIGHT_INFO} RPC with the value converted to milliseconds. + */ + @Test + public void testStatementTimeoutSendsHeader() throws SQLException { + final int timeoutSeconds = 10; + try (Connection connection = FLIGHT_SERVER_TEST_EXTENSION.getConnection(false); + Statement statement = connection.createStatement()) { + + statement.setQueryTimeout(timeoutSeconds); + try (ResultSet rs = statement.executeQuery(CoreMockedSqlProducers.LEGACY_REGULAR_SQL_CMD)) { + // consume results to ensure the RPC completed + while (rs.next()) {} + } + + String headerValue = + FLIGHT_SERVER_TEST_EXTENSION + .getInterceptorFactory() + .getHeader(FlightMethod.GET_FLIGHT_INFO, QUERY_TIMEOUT_HEADER); + + assertEquals( + String.valueOf((long) timeoutSeconds * 1000), + headerValue, + "x-query-timeout-ms header should be set to timeoutSeconds * 1000"); + } + } + + /** + * When the connection-level {@code queryTimeout} property is set and no statement-level timeout + * overrides it, the {@code x-query-timeout-ms} header must be sent with the connection timeout + * converted to milliseconds. + */ + @Test + public void testConnectionLevelTimeoutSendsHeader() throws SQLException { + final int timeoutSeconds = 30; + + final Properties props = new Properties(); + props.put("user", FlightServerTestExtension.DEFAULT_USER); + props.put("password", FlightServerTestExtension.DEFAULT_PASSWORD); + props.put("queryTimeout", timeoutSeconds); + + final String url = + String.format( + "jdbc:arrow-flight-sql://localhost:%d?useEncryption=false", + FLIGHT_SERVER_TEST_EXTENSION.getPort()); + + try (Connection connection = DriverManager.getConnection(url, props); + Statement statement = connection.createStatement(); + ResultSet rs = statement.executeQuery(CoreMockedSqlProducers.LEGACY_REGULAR_SQL_CMD)) { + while (rs.next()) {} + + String headerValue = + FLIGHT_SERVER_TEST_EXTENSION + .getInterceptorFactory() + .getHeader(FlightMethod.GET_FLIGHT_INFO, QUERY_TIMEOUT_HEADER); + + assertEquals( + String.valueOf((long) timeoutSeconds * 1000), + headerValue, + "x-query-timeout-ms header should reflect connection-level queryTimeout"); + } + } + + /** + * When no timeout is configured (neither at statement nor connection level), the {@code + * x-query-timeout-ms} header must NOT be sent. + */ + @Test + public void testNoTimeoutOmitsHeader() throws SQLException { + try (Connection connection = FLIGHT_SERVER_TEST_EXTENSION.getConnection(false); + Statement statement = connection.createStatement()) { + + // explicitly ensure timeout is 0 (no timeout) + statement.setQueryTimeout(0); + + try (ResultSet rs = statement.executeQuery(CoreMockedSqlProducers.LEGACY_REGULAR_SQL_CMD)) { + while (rs.next()) {} + } + + String headerValue = + FLIGHT_SERVER_TEST_EXTENSION + .getInterceptorFactory() + .getHeader(FlightMethod.GET_FLIGHT_INFO, QUERY_TIMEOUT_HEADER); + + assertNull( + headerValue, "x-query-timeout-ms header should NOT be present when no timeout is set"); + } + } + + /** + * When the statement timeout overrides a non-zero connection-level timeout, the header value must + * reflect the statement-level timeout (not the connection one). + */ + @Test + public void testStatementTimeoutOverridesConnectionTimeout() throws SQLException { + final int connectionTimeoutSeconds = 60; + final int statementTimeoutSeconds = 5; + + final Properties props = new Properties(); + props.put("user", FlightServerTestExtension.DEFAULT_USER); + props.put("password", FlightServerTestExtension.DEFAULT_PASSWORD); + props.put("queryTimeout", connectionTimeoutSeconds); + + final String url = + String.format( + "jdbc:arrow-flight-sql://localhost:%d?useEncryption=false", + FLIGHT_SERVER_TEST_EXTENSION.getPort()); + + try (Connection connection = DriverManager.getConnection(url, props); + Statement statement = connection.createStatement()) { + + statement.setQueryTimeout(statementTimeoutSeconds); + + try (ResultSet rs = statement.executeQuery(CoreMockedSqlProducers.LEGACY_REGULAR_SQL_CMD)) { + while (rs.next()) {} + } + + String headerValue = + FLIGHT_SERVER_TEST_EXTENSION + .getInterceptorFactory() + .getHeader(FlightMethod.GET_FLIGHT_INFO, QUERY_TIMEOUT_HEADER); + + assertEquals( + String.valueOf((long) statementTimeoutSeconds * 1000), + headerValue, + "x-query-timeout-ms header should reflect statement-level timeout when both are set"); + } + } +}