Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,10 @@ BufferAllocator getBufferAllocator() {
return allocator;
}

long getMaxRetriesPerExecute() {
return maxRetriesPerExecute;
}

public ArrowFlightMetaImpl getMeta() {
return (ArrowFlightMetaImpl) this.meta;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import java.sql.SQLException;
import java.util.Properties;
import java.util.TimeZone;
import org.apache.arrow.driver.jdbc.client.ArrowFlightSqlClientHandler;
import org.apache.arrow.driver.jdbc.client.ArrowFlightSqlClientHandler.SqlStatement;
import org.apache.arrow.memory.RootAllocator;
import org.apache.calcite.avatica.AvaticaConnection;
import org.apache.calcite.avatica.AvaticaFactory;
Expand Down Expand Up @@ -82,8 +82,8 @@ public ArrowFlightPreparedStatement newPreparedStatement(
final int resultSetHoldability)
throws SQLException {
final ArrowFlightConnection flightConnection = (ArrowFlightConnection) connection;
ArrowFlightSqlClientHandler.PreparedStatement preparedStatement =
flightConnection.getMeta().getPreparedStatement(statementHandle);
SqlStatement preparedStatement =
flightConnection.getMeta().getStatement(statementHandle);

return ArrowFlightPreparedStatement.newPreparedStatement(
flightConnection,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ static ArrowFlightJdbcFlightStreamResultSet fromFlightInfo(
final TimeZone timeZone = TimeZone.getDefault();
final QueryState state = new QueryState();

final Meta.Signature signature = ArrowFlightMetaImpl.newSignature(null, null, null);
final Meta.Signature signature = ArrowFlightMetaImpl.newStatementSignature(null);

final AvaticaResultSetMetaData resultSetMetaData =
new AvaticaResultSetMetaData(null, null, signature);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public static ArrowFlightJdbcVectorSchemaRootResultSet fromVectorSchemaRoot(
final TimeZone timeZone = TimeZone.getDefault();
final QueryState state = new QueryState();

final Meta.Signature signature = ArrowFlightMetaImpl.newSignature(null, null, null);
final Meta.Signature signature = ArrowFlightMetaImpl.newStatementSignature(null);

final AvaticaResultSetMetaData resultSetMetaData =
new AvaticaResultSetMetaData(null, null, signature);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.arrow.driver.jdbc.client.ArrowFlightSqlClientHandler.PreparedStatement;
import org.apache.arrow.driver.jdbc.client.ArrowFlightSqlClientHandler.SqlStatement;
import org.apache.arrow.driver.jdbc.utils.AvaticaParameterBinder;
import org.apache.arrow.driver.jdbc.utils.ConvertUtils;
import org.apache.arrow.util.Preconditions;
Expand All @@ -39,7 +40,7 @@

/** Metadata handler for Arrow Flight. */
public class ArrowFlightMetaImpl extends MetaImpl {
private final Map<StatementHandleKey, PreparedStatement> statementHandlePreparedStatementMap;
private final Map<StatementHandleKey, SqlStatement> statementHandleMap;

/**
* Constructs a {@link MetaImpl} object specific for Arrow Flight.
Expand All @@ -48,12 +49,16 @@ public class ArrowFlightMetaImpl extends MetaImpl {
*/
public ArrowFlightMetaImpl(final AvaticaConnection connection) {
super(connection);
this.statementHandlePreparedStatementMap = new ConcurrentHashMap<>();
this.statementHandleMap = new ConcurrentHashMap<>();
setDefaultConnectionProperties();
}

/** Construct a signature. */
static Signature newSignature(final String sql, Schema resultSetSchema, Schema parameterSchema) {
static Signature newSignature(
final String sql,
final Schema resultSetSchema,
final Schema parameterSchema,
final StatementType statementType) {
List<ColumnMetaData> columnMetaData =
resultSetSchema == null
? new ArrayList<>()
Expand All @@ -62,10 +67,6 @@ static Signature newSignature(final String sql, Schema resultSetSchema, Schema p
parameterSchema == null
? new ArrayList<>()
: ConvertUtils.convertArrowFieldsToAvaticaParameters(parameterSchema.getFields());
StatementType statementType =
resultSetSchema == null || resultSetSchema.getFields().isEmpty()
? StatementType.IS_DML
: StatementType.SELECT;
return new Signature(
columnMetaData,
sql,
Expand All @@ -75,15 +76,23 @@ static Signature newSignature(final String sql, Schema resultSetSchema, Schema p
statementType);
}

static Signature newStatementSignature(final String sql) {
return newSignature(sql, null, null, StatementType.SELECT);
}

static Signature newUpdateSignature(final String sql) {
return newSignature(sql, null, null, StatementType.IS_DML);
}

@Override
public void closeStatement(final StatementHandle statementHandle) {
PreparedStatement preparedStatement =
statementHandlePreparedStatementMap.remove(new StatementHandleKey(statementHandle));
SqlStatement statementHandleInstance =
statementHandleMap.remove(new StatementHandleKey(statementHandle));
// Testing if the prepared statement was created because the statement can be
// not created until
// this moment
if (preparedStatement != null) {
preparedStatement.close();
if (statementHandleInstance != null) {
statementHandleInstance.close();
}
}

Expand All @@ -92,14 +101,23 @@ public void commit(final ConnectionHandle connectionHandle) {
// TODO Fill this stub.
}

@Override
public StatementHandle createStatement(final ConnectionHandle connectionHandle) {
final StatementHandle handle = super.createStatement(connectionHandle);
statementHandleMap.put(
new StatementHandleKey(handle),
((ArrowFlightConnection) connection).getClientHandler().createAdhocStatement());
return handle;
}

@Override
public ExecuteResult execute(
final StatementHandle statementHandle,
final List<TypedValue> typedValues,
final long maxRowCount) {
Preconditions.checkArgument(
connection.id.equals(statementHandle.connectionId), "Connection IDs are not consistent");
PreparedStatement preparedStatement = getPreparedStatement(statementHandle);
SqlStatement preparedStatement = getStatement(statementHandle);

if (preparedStatement == null) {
throw new IllegalStateException("Prepared statement not found: " + statementHandle);
Expand All @@ -109,8 +127,11 @@ public ExecuteResult execute(
preparedStatement, ((ArrowFlightConnection) connection).getBufferAllocator())
.bind(typedValues);

if (statementHandle.signature == null
|| statementHandle.signature.statementType == StatementType.IS_DML) {
final StatementType statementType =
statementHandle.signature == null
? StatementType.IS_DML
: statementHandle.signature.statementType;
if (statementType == StatementType.IS_DML || statementType == StatementType.UPDATE) {
// Update query
long updatedCount = preparedStatement.executeUpdate();
return new ExecuteResult(
Expand Down Expand Up @@ -143,7 +164,7 @@ public ExecuteBatchResult executeBatch(
throws IllegalStateException {
Preconditions.checkArgument(
connection.id.equals(statementHandle.connectionId), "Connection IDs are not consistent");
PreparedStatement preparedStatement = getPreparedStatement(statementHandle);
SqlStatement preparedStatement = getStatement(statementHandle);

if (preparedStatement == null) {
throw new IllegalStateException("Prepared statement not found: " + statementHandle);
Expand Down Expand Up @@ -178,15 +199,22 @@ private PreparedStatement prepareForHandle(final String query, StatementHandle h
((ArrowFlightConnection) connection).getClientHandler().prepare(query);
handle.signature =
newSignature(
query, preparedStatement.getDataSetSchema(), preparedStatement.getParameterSchema());
statementHandlePreparedStatementMap.put(new StatementHandleKey(handle), preparedStatement);
query,
preparedStatement.getDataSetSchema(),
preparedStatement.getParameterSchema(),
preparedStatement.getType());
final StatementHandleKey key = new StatementHandleKey(handle);
final SqlStatement previous = statementHandleMap.put(key, preparedStatement);
if (previous != null && previous != preparedStatement) {
previous.close();
}
return preparedStatement;
}

@Override
public StatementHandle prepare(
final ConnectionHandle connectionHandle, final String query, final long maxRowCount) {
final StatementHandle handle = super.createStatement(connectionHandle);
final StatementHandle handle = createStatement(connectionHandle);
prepareForHandle(query, handle);
return handle;
}
Expand Down Expand Up @@ -280,8 +308,16 @@ void setDefaultConnectionProperties() {
.setTransactionIsolation(Connection.TRANSACTION_NONE);
}

PreparedStatement getPreparedStatement(StatementHandle statementHandle) {
return statementHandlePreparedStatementMap.get(new StatementHandleKey(statementHandle));
SqlStatement getStatement(StatementHandle statementHandle) {
return statementHandleMap.get(new StatementHandleKey(statementHandle));
}

void updateStatementHandle(StatementHandle statementHandle, SqlStatement newHandle) {
final StatementHandleKey key = new StatementHandleKey(statementHandle);
final SqlStatement previous = statementHandleMap.put(key, newHandle);
if (previous != null && previous != newHandle) {
previous.close();
}
}

// Helper used to look up prepared statement instances later. Avatica doesn't
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.sql.PreparedStatement;
import java.sql.SQLException;
import org.apache.arrow.driver.jdbc.client.ArrowFlightSqlClientHandler;
import org.apache.arrow.driver.jdbc.client.ArrowFlightSqlClientHandler.SqlStatement;
import org.apache.arrow.flight.FlightInfo;
import org.apache.arrow.util.Preconditions;
import org.apache.calcite.avatica.AvaticaPreparedStatement;
Expand All @@ -29,24 +30,25 @@
public class ArrowFlightPreparedStatement extends AvaticaPreparedStatement
implements ArrowFlightInfoStatement {

private final ArrowFlightSqlClientHandler.PreparedStatement preparedStatement;
private final SqlStatement preparedStatement;

private ArrowFlightPreparedStatement(
final ArrowFlightConnection connection,
final ArrowFlightSqlClientHandler.PreparedStatement preparedStatement,
final SqlStatement preparedStatement,
final StatementHandle handle,
final Signature signature,
final int resultSetType,
final int resultSetConcurrency,
final int resultSetHoldability)
throws SQLException {
super(connection, handle, signature, resultSetType, resultSetConcurrency, resultSetHoldability);
Preconditions.checkArgument(preparedStatement instanceof ArrowFlightSqlClientHandler.PreparedStatement);
this.preparedStatement = Preconditions.checkNotNull(preparedStatement);
}

static ArrowFlightPreparedStatement newPreparedStatement(
final ArrowFlightConnection connection,
final ArrowFlightSqlClientHandler.PreparedStatement preparedStmt,
final SqlStatement preparedStmt,
final StatementHandle statementHandle,
final Signature signature,
final int resultSetType,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,15 @@
*/
package org.apache.arrow.driver.jdbc;

import java.sql.ResultSet;
import java.sql.SQLException;
import org.apache.arrow.driver.jdbc.client.ArrowFlightSqlClientHandler.PreparedStatement;
import org.apache.arrow.driver.jdbc.client.ArrowFlightSqlClientHandler.SqlStatement;
import org.apache.arrow.driver.jdbc.utils.ConvertUtils;
import org.apache.arrow.flight.FlightInfo;
import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.calcite.avatica.AvaticaConnection;
import org.apache.calcite.avatica.AvaticaResultSet;
import org.apache.calcite.avatica.AvaticaStatement;
import org.apache.calcite.avatica.Meta;
import org.apache.calcite.avatica.Meta.StatementHandle;
Expand All @@ -42,20 +46,99 @@ public ArrowFlightConnection getConnection() throws SQLException {
return (ArrowFlightConnection) super.getConnection();
}

@Override
public ResultSet executeQuery(final String sql) throws SQLException {
checkOpen();
updateCount = -1;
resetAdhocHandle();
try {
final Meta.Signature signature = ArrowFlightMetaImpl.newStatementSignature(sql);
setSignature(signature);
return executeQueryInternal(signature, false);
} catch (RuntimeException exception) {
throw AvaticaConnection.HELPER.createException(
"Error while executing SQL \"" + sql + "\": " + exception.getMessage(),
exception);
}
}

@Override
public long executeLargeUpdate(final String sql) throws SQLException {
checkOpen();
clearOpenResultSet();
updateCount = -1;

resetAdhocHandle();
try {
final ArrowFlightMetaImpl meta = getConnection().getMeta();
final SqlStatement statementHandle = meta.getStatement(handle);
final long updatedCount = statementHandle.executeUpdate(sql);
setSignature(ArrowFlightMetaImpl.newUpdateSignature(sql));
updateCount = updatedCount;
return updatedCount;
} catch (RuntimeException exception) {
throw AvaticaConnection.HELPER.createException(
"Error while executing SQL \"" + sql + "\": " + exception.getMessage(),
exception);
}
}

@Override
public FlightInfo executeFlightInfoQuery() throws SQLException {
final PreparedStatement preparedStatement =
getConnection().getMeta().getPreparedStatement(handle);
final ArrowFlightConnection connection = getConnection();
final ArrowFlightMetaImpl meta = connection.getMeta();
final Meta.Signature signature = getSignature();
if (signature == null) {
return null;
}

final Schema resultSetSchema = preparedStatement.getDataSetSchema();
signature.columns.addAll(
ConvertUtils.convertArrowFieldsToColumnMetaDataList(resultSetSchema.getFields()));
setSignature(signature);
final SqlStatement statement = meta.getStatement(handle);
// Statement.execute(String) goes through Meta.prepareAndExecute, which stores a prepared
// handle even though the JDBC object is still an ArrowFlightStatement. Therefore,
// executeFlightInfoQuery must handle both direct and prepared handles here.
if (statement instanceof PreparedStatement) {
final Schema resultSetSchema = statement.getDataSetSchema();
signature.columns.addAll(
ConvertUtils.convertArrowFieldsToColumnMetaDataList(resultSetSchema.getFields()));
setSignature(signature);
return statement.executeQuery();
}

final FlightInfo flightInfo =
statement != null
? statement.executeQuery(signature.sql)
: connection.getClientHandler().getInfo(signature.sql);
final Schema resultSetSchema = flightInfo.getSchemaOptional().orElse(null);
if (resultSetSchema != null) {
signature.columns.addAll(
ConvertUtils.convertArrowFieldsToColumnMetaDataList(resultSetSchema.getFields()));
setSignature(signature);
}
return flightInfo;
}

return preparedStatement.executeQuery();
private void clearOpenResultSet() throws SQLException {
synchronized (this) {
if (openResultSet != null) {
final AvaticaResultSet resultSet = openResultSet;
openResultSet = null;
try {
resultSet.close();
} catch (Exception exception) {
throw AvaticaConnection.HELPER.createException(
"Error while closing previous result set", exception);
}
}
}
}

private void resetAdhocHandle() throws SQLException {
final ArrowFlightConnection conn = getConnection();
final ArrowFlightMetaImpl meta = conn.getMeta();
final SqlStatement statementHandle = meta.getStatement(handle);
if (statementHandle == null || statementHandle instanceof PreparedStatement) {
final SqlStatement newStatementHandle = conn.getClientHandler().createAdhocStatement();
meta.updateStatementHandle(handle, newStatementHandle);
}
}
}
Loading