Skip to content

Comments

Add Oracle UPSERT#645

Open
psainics wants to merge 8 commits intodata-integrations:developfrom
cloudsufi:orc-upsert
Open

Add Oracle UPSERT#645
psainics wants to merge 8 commits intodata-integrations:developfrom
cloudsufi:orc-upsert

Conversation

@psainics
Copy link
Contributor

This pull request introduces support for upsert operations in the Oracle batch sink plugin.

  • Added a new class OracleETLDBOutputFormat that extends ETLDBOutputFormat and implements the constructUpsertQuery method, generating an Oracle-compatible MERGE-based upsert SQL statement.
  • Updated OracleSink to use the new OracleETLDBOutputFormat by adding an output context with SinkOutputFormatProvider.

UI Update

image

@psainics psainics requested a review from prince-cs January 30, 2026 03:09
@psainics psainics self-assigned this Jan 30, 2026
@psainics psainics added the build label Jan 30, 2026
Copy link
Contributor

@prince-cs prince-cs left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@psainics psainics requested a review from prince-cs February 19, 2026 04:35
@psainics psainics marked this pull request as ready for review February 19, 2026 04:35
@Override
public String constructUpsertQuery(String table, String[] fieldNames, String[] listKeys) {
if (listKeys == null) {
throw new IllegalArgumentException("Column names to be updated should not be null");
Copy link
Contributor

@itsankit-google itsankit-google Feb 23, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what is the name for this config in UI?

Column names does not help understand which config are we exactly talking about.

Also, can we move these to sink config validations when UPSERT is selected?

if (listKeys == null) {
throw new IllegalArgumentException("Column names to be updated should not be null");
} else if (fieldNames == null) {
throw new IllegalArgumentException("Field names should not be null");
Copy link
Contributor

@itsankit-google itsankit-google Feb 23, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

similar comment here.

Better to move these to sink config validations to fail early.

@Override
protected void writeNonNullToDB(PreparedStatement stmt, Schema fieldSchema,
String fieldName, int fieldIndex) throws SQLException {
int sqlType = columnTypes.get(fieldIndex).getType();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In which case, is it possible that fieldIndex is greater than columnTypes.size()-1?

int sqlType = Types.OTHER;
// avoid OOB exception in case of mismatch between columnTypes and record schema fields
for (ColumnType columnType : columnTypes) {
if (columnType.getName().equals(fieldName)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we be using equalsIgnoreCase like

Schema.Field field = record.getSchema().getField(columnType.getName(), true);
?

String fieldName, int fieldIndex) throws SQLException {
int sqlType = columnTypes.get(fieldIndex).getType();
int sqlIndex = fieldIndex + 1;
int sqlType = Types.OTHER;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we be throwing an error if none of the fieldName matches in the columnType.getName() list?

Why are we assuming Types.OTHER when there is no match?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants