Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
.project
.settings/
.vscode/
.metals/
/*-build/
/.mvn/.develocity/
/apache-arrow-java-*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -260,4 +260,9 @@ BufferAllocator getBufferAllocator() {
public ArrowFlightMetaImpl getMeta() {
return (ArrowFlightMetaImpl) this.meta;
}

/** Returns the connection-level query timeout in seconds (0 means no timeout). */
public int getConnectionQueryTimeoutSeconds() {
return config.getQueryTimeoutSeconds();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@

import java.sql.SQLException;
import java.sql.Statement;
import org.apache.arrow.flight.CallOption;
import org.apache.arrow.flight.FlightCallHeaders;
import org.apache.arrow.flight.FlightInfo;
import org.apache.arrow.flight.HeaderCallOption;

/** A {@link Statement} that deals with {@link FlightInfo}. */
public interface ArrowFlightInfoStatement extends Statement {
Expand All @@ -33,4 +36,17 @@ public interface ArrowFlightInfoStatement extends Statement {
* @throws SQLException on error.
*/
FlightInfo executeFlightInfoQuery() throws SQLException;

default CallOption[] buildTimeoutOption() throws SQLException {
int timeoutSeconds = getQueryTimeout();
if (timeoutSeconds == 0) {
timeoutSeconds = getConnection().getConnectionQueryTimeoutSeconds();
}
if (timeoutSeconds > 0) {
final FlightCallHeaders headers = new FlightCallHeaders();
headers.insert("x-query-timeout-ms", String.valueOf((long) timeoutSeconds * 1000));
return new CallOption[] {new HeaderCallOption(headers)};
}
return new CallOption[0];
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,14 @@
import org.apache.arrow.driver.jdbc.client.ArrowFlightSqlClientHandler.PreparedStatement;
import org.apache.arrow.driver.jdbc.utils.AvaticaParameterBinder;
import org.apache.arrow.driver.jdbc.utils.ConvertUtils;
import org.apache.arrow.flight.CallOption;
import org.apache.arrow.flight.FlightCallHeaders;
import org.apache.arrow.flight.HeaderCallOption;
import org.apache.arrow.util.Preconditions;
import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.calcite.avatica.AvaticaConnection;
import org.apache.calcite.avatica.AvaticaParameter;
import org.apache.calcite.avatica.AvaticaStatement;
import org.apache.calcite.avatica.ColumnMetaData;
import org.apache.calcite.avatica.MetaImpl;
import org.apache.calcite.avatica.NoSuchStatementException;
Expand Down Expand Up @@ -87,6 +91,27 @@ public void closeStatement(final StatementHandle statementHandle) {
}
}

private CallOption[] buildTimeoutOption(final StatementHandle statementHandle) {
int timeoutSeconds = 0;
final AvaticaStatement stmt = connection.statementMap.get(statementHandle.id);
if (stmt != null) {
try {
timeoutSeconds = stmt.getQueryTimeout();
} catch (final SQLException ignored) {
// fall through to connection-level timeout
}
}
if (timeoutSeconds == 0) {
timeoutSeconds = ((ArrowFlightConnection) connection).getConnectionQueryTimeoutSeconds();
}
if (timeoutSeconds > 0) {
final FlightCallHeaders headers = new FlightCallHeaders();
headers.insert("x-query-timeout-ms", String.valueOf((long) timeoutSeconds * 1000));
return new CallOption[] {new HeaderCallOption(headers)};
}
return new CallOption[0];
}

@Override
public void commit(final ConnectionHandle connectionHandle) {
// TODO Fill this stub.
Expand All @@ -112,7 +137,7 @@ public ExecuteResult execute(
if (statementHandle.signature == null
|| statementHandle.signature.statementType == StatementType.IS_DML) {
// Update query
long updatedCount = preparedStatement.executeUpdate();
long updatedCount = preparedStatement.executeUpdate(buildTimeoutOption(statementHandle));
return new ExecuteResult(
Collections.singletonList(
MetaResultSet.count(statementHandle.connectionId, statementHandle.id, updatedCount)));
Expand Down Expand Up @@ -157,7 +182,7 @@ public ExecuteBatchResult executeBatch(
}

// Update query
long[] updatedCounts = {preparedStatement.executeUpdate()};
long[] updatedCounts = {preparedStatement.executeUpdate(buildTimeoutOption(statementHandle))};
return new ExecuteBatchResult(updatedCounts);
}

Expand Down Expand Up @@ -215,7 +240,9 @@ public ExecuteResult prepareAndExecute(
final StatementType statementType = preparedStatement.getType();

final long updateCount =
statementType.equals(StatementType.UPDATE) ? preparedStatement.executeUpdate() : -1;
statementType.equals(StatementType.UPDATE)
? preparedStatement.executeUpdate(buildTimeoutOption(handle))
: -1;
synchronized (callback.getMonitor()) {
callback.clear();
callback.assign(handle.signature, null, updateCount);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,6 @@ public synchronized void close() throws SQLException {

@Override
public FlightInfo executeFlightInfoQuery() throws SQLException {
return preparedStatement.executeQuery();
return preparedStatement.executeQuery(buildTimeoutOption());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,6 @@ public FlightInfo executeFlightInfoQuery() throws SQLException {
ConvertUtils.convertArrowFieldsToColumnMetaDataList(resultSetSchema.getFields()));
setSignature(signature);

return preparedStatement.executeQuery();
return preparedStatement.executeQuery(buildTimeoutOption());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -356,17 +356,19 @@ public interface PreparedStatement extends AutoCloseable {
/**
* Executes this {@link PreparedStatement}.
*
* @param extraOptions additional {@link CallOption}s to include for this call only.
* @return the {@link FlightInfo} representing the outcome of this query execution.
* @throws SQLException on error.
*/
FlightInfo executeQuery() throws SQLException;
FlightInfo executeQuery(CallOption... extraOptions) throws SQLException;

/**
* Executes a {@link StatementType#UPDATE} query.
*
* @param extraOptions additional {@link CallOption}s to include for this call only.
* @return the number of rows affected.
*/
long executeUpdate();
long executeUpdate(CallOption... extraOptions);

/**
* Gets the {@link StatementType} of this {@link PreparedStatement}.
Expand Down Expand Up @@ -436,23 +438,34 @@ public void setCatalog(final String catalog) throws SQLException {
}

/**
* Creates a new {@link PreparedStatement} for the given {@code query}.
* Merges the handler's base {@link CallOption}s with the provided extra options.
*
* @param query the SQL query.
* @return a new prepared statement.
* @param extraOptions additional options for a single call.
* @return a combined array of options.
*/
private CallOption[] mergeOptions(final CallOption... extraOptions) {
final CallOption[] baseOptions = getOptions();
if (extraOptions.length == 0) {
return baseOptions;
}
final CallOption[] combined = new CallOption[baseOptions.length + extraOptions.length];
System.arraycopy(baseOptions, 0, combined, 0, baseOptions.length);
System.arraycopy(extraOptions, 0, combined, baseOptions.length, extraOptions.length);
return combined;
}

public PreparedStatement prepare(final String query) {
final FlightSqlClient.PreparedStatement preparedStatement =
sqlClient.prepare(query, getOptions());
return new PreparedStatement() {
@Override
public FlightInfo executeQuery() throws SQLException {
return preparedStatement.execute(getOptions());
public FlightInfo executeQuery(final CallOption... extraOptions) throws SQLException {
return preparedStatement.execute(mergeOptions(extraOptions));
}

@Override
public long executeUpdate() {
return preparedStatement.executeUpdate(getOptions());
public long executeUpdate(final CallOption... extraOptions) {
return preparedStatement.executeUpdate(mergeOptions(extraOptions));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,12 @@ public Duration getConnectTimeout() {
return Duration.ofMillis(timeout);
}

/** The default query timeout in seconds (0 means no timeout). */
public int getQueryTimeoutSeconds() {
Integer timeout = ArrowFlightConnectionProperty.QUERY_TIMEOUT_SECONDS.getInteger(properties);
return timeout != null ? timeout : 0;
}

/** Whether to enable the client cache. */
public boolean useClientCache() {
return ArrowFlightConnectionProperty.USE_CLIENT_CACHE.getBoolean(properties);
Expand Down Expand Up @@ -282,6 +288,7 @@ public enum ArrowFlightConnectionProperty implements ConnectionProperty {
RETAIN_AUTH("retainAuth", true, Type.BOOLEAN, false),
CATALOG("catalog", null, Type.STRING, false),
CONNECT_TIMEOUT_MILLIS("connectTimeoutMs", 10000, Type.NUMBER, false),
QUERY_TIMEOUT_SECONDS("queryTimeout", 0, Type.NUMBER, false),
USE_CLIENT_CACHE("useClientCache", true, Type.BOOLEAN, false),

// OAuth configuration properties
Expand Down
Loading