Automatically update BigQuery table schema when unknown fields are seen. #38058

Open

reuvenlax wants to merge 1 commit into apache:master from reuvenlax:bq_add_new_columns

Conversation

@reuvenlax
Contributor

No description provided.

@reuvenlax
Contributor Author

/gemini review

@gemini-code-assist
Contributor

Warning

Gemini encountered an error creating the summary. You can try again by commenting /gemini summary.

@reuvenlax
Contributor Author

/gemini summary

@gemini-code-assist
Contributor

Summary of Changes

This pull request introduces a significant enhancement to the BigQuery Storage Write API connector, allowing it to automatically adapt to evolving data schemas. When data with new fields or relaxed field constraints arrives, the system will now detect these changes, buffer the affected records, and dynamically update the BigQuery table's schema. This capability greatly simplifies data ingestion pipelines by removing the need for manual schema management in response to minor data evolution, ensuring data flow continues uninterrupted.

Highlights

  • Automatic Schema Evolution: Implemented a mechanism for BigQuery Storage Write API to automatically detect and apply schema updates (field additions and relaxation) to target tables when unknown fields or schema mismatches are encountered in incoming data.
  • Schema Mismatch Detection and Buffering: Introduced logic to identify when an incoming record's schema differs from the BigQuery table's current schema. Records requiring schema updates are buffered and reprocessed after the table schema has been patched.
  • Incremental Schema Generation and Patching: Developed utilities to generate an 'incremental schema' based on detected differences, which is then used to patch the BigQuery table. This process is throttled to prevent excessive API calls.
  • Enhanced Error Collection: Refactored TableRowToStorageApiProto to include an ErrorCollector that gathers schema conversion exceptions, allowing for more granular error handling and the generation of incremental schema updates.
  • API Integration for Schema Updates: Extended BigQueryServices with a patchTableSchema method, enabling programmatic updates to BigQuery table schemas directly from the Beam pipeline.
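
The add-and-relax merge behavior described in the highlights can be sketched roughly as follows. This is a hypothetical illustration, not the PR's code: `SchemaMergeSketch` and the plain name-to-mode maps stand in for Beam's real `TableSchema`/`TableFieldSchema` handling in `UpgradeTableSchema` and `MergeSchemaCombineFn`.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch: merge an existing schema with an incremental schema.
// Fields are modeled as name -> mode ("REQUIRED" or "NULLABLE") for simplicity.
public class SchemaMergeSketch {
  static Map<String, String> merge(Map<String, String> oldSchema, Map<String, String> incremental) {
    Map<String, String> merged = new LinkedHashMap<>(oldSchema);
    for (Map.Entry<String, String> e : incremental.entrySet()) {
      String oldMode = merged.get(e.getKey());
      if (oldMode == null) {
        merged.put(e.getKey(), e.getValue()); // unknown field: add it
      } else if ("REQUIRED".equals(oldMode) && "NULLABLE".equals(e.getValue())) {
        merged.put(e.getKey(), "NULLABLE"); // relaxation: REQUIRED -> NULLABLE
      } // other changes (e.g. NULLABLE -> REQUIRED) are not legal evolutions
    }
    return merged;
  }

  public static void main(String[] args) {
    Map<String, String> old = new LinkedHashMap<>();
    old.put("id", "REQUIRED");
    old.put("name", "NULLABLE");
    Map<String, String> incoming = new LinkedHashMap<>();
    incoming.put("id", "NULLABLE");    // relaxed mode
    incoming.put("email", "NULLABLE"); // newly seen field
    System.out.println(merge(old, incoming));
    // -> {id=NULLABLE, name=NULLABLE, email=NULLABLE}
  }
}
```

The real implementation must also recurse into STRUCT fields and reject type changes; this sketch only shows the add/relax decision at one level.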


Changelog
  • examples/java/src/main/java/org/apache/beam/examples/GroupIntoBatchesExample.java
    • Updated class declaration with an extra space for formatting consistency.
  • sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AppendClientInfo.java
    • Imported Preconditions for utility functions.
    • Modified encodeUnknownFields to pass ErrorCollector.DONT_COLLECT to messageFromTableRow.
  • sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
    • Added validation to prevent schema update options from being used with auto schema update, Beam schemas, or direct proto writes.
    • Passed elementCoder and destinationCoder to the StorageApiLoads constructor.
    • Updated StorageApiDynamicDestinationsTableRow instantiation to include schemaUpdateOptions.
  • sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
    • Added patchTableSchema method to the BigQueryRpc interface for updating table schemas.
  • sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
    • Implemented the patchTableSchema method in BigQueryRpcImpl to update BigQuery table schemas.
    • Added DONT_RETRY_INVALID_ARG_OR_PRECONDITION retry function for specific API errors.
  • sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/MergeSchemaCombineFn.java
    • Added new class MergeSchemaCombineFn which extends Combine.CombineFn to merge BigQuery TableSchema objects.
  • sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/SplittingIterable.java
    • Imported isPayloadSchemaOutOfDate and Descriptors.
    • Added schemaMismatchSeen field to Value and SplittingIterable.
    • Updated constructor to accept getCurrentTableSchemaHash and getCurrentTableSchemaDescriptor suppliers.
    • Modified next method to detect and track schema mismatches using isPayloadSchemaOutOfDate.
  • sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiConvertMessages.java
    • Refactored expand method to integrate schema update logic, including new PatchTableSchemaDoFn and SchemaUpdateHoldingFn.
    • Added new output tags (patchTableSchemaTag, elementsWaitingForSchemaTag) for schema patching and buffering elements.
    • Introduced SchemaUpdateHoldingFn to buffer elements awaiting schema updates and PatchTableSchemaDoFn to apply schema patches.
    • Modified ConvertMessagesDoFn to use an ErrorCollector for schema conversion errors and to output elements for schema patching or retry.
    • Added BufferedCollectorInformation inner class to track schema errors per destination.
  • sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinations.java
    • Added getSchemaHash and updateSchemaFromTable methods to the MessageConverter interface.
    • Modified toMessage method in MessageConverter to accept an ErrorCollector parameter.
  • sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsBeamRow.java
    • Implemented getSchemaHash and updateSchemaFromTable methods.
    • Updated toMessage method to accept collectedExceptions parameter.
  • sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsGenericRecord.java
    • Implemented getSchemaHash and updateSchemaFromTable methods.
    • Updated toMessage method to accept collectedExceptions parameter.
  • sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsProto.java
    • Implemented getSchemaHash and updateSchemaFromTable methods.
    • Updated toMessage method to accept collectedExceptions parameter.
  • sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsTableRow.java
    • Introduced SchemaUpgradingTableRowConverter to manage schema updates for TableRow conversions.
    • Added schemaUpdateOptions parameter to the constructor.
    • Implemented getSchemaHash and updateSchemaFromTable methods.
    • Modified toMessage method to accept collectedExceptions and include schema hash in the payload.
  • sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiLoads.java
    • Added elementCoder and destinationCoder to the constructor.
    • Updated StorageApiConvertMessages instantiation to pass elementCoder and destinationCoder.
  • sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritePayload.java
    • Added getSchemaHash and setSchemaHash methods to StorageApiWritePayload for tracking schema versions.
  • sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
    • Imported isPayloadSchemaOutOfDate for schema mismatch detection.
    • Added schema mismatch detection logic within addMessage and flush methods.
    • Updated SplittingIterable constructor call to pass schema hash and descriptor suppliers.
  • sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java
    • Added schema mismatch detection logic within process method.
    • Updated SplittingIterable constructor call to pass schema hash and descriptor suppliers.
    • Refined error handling for append failures, including closing clients for non-quota errors.
  • sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java
    • Imported toSet for stream collectors.
    • Introduced ErrorCollector class to gather schema conversion exceptions.
    • Added new SchemaConversionException types: SchemaTooNarrowException and SchemaMissingRequiredFieldException.
    • Implemented schema hashing logic (tableSchemaHash) for TableSchema and TableFieldSchema.
    • Modified messageFromMap and messageFromTableRow to accept and utilize ErrorCollector for schema evolution handling.
    • Updated messageValueFromFieldValue and singularFieldToProtoValue to propagate ErrorCollector and handle missing required fields.
  • sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableSchemaCache.java
    • Added putSchema method to explicitly set a schema in the cache.
    • Modified refreshSchema to use putIfAbsent for tablesToRefresh to avoid overwriting existing refresh requests.
  • sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/UpgradeTableSchema.java
    • Added new class UpgradeTableSchema to encapsulate schema upgrade logic.
    • Implemented newErrorCollector to create an ErrorCollector instance.
    • Provided getIncrementalSchema to construct a new schema based on collected errors and an old schema.
    • Implemented mergeSchemas and mergeFields for combining BigQuery table schemas.
    • Added isPayloadSchemaOutOfDate to check for schema mismatches between payload and current table schema.
    • Included hasUnknownFields to recursively check for unknown fields in a protobuf message.
  • sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java
    • Added getStorageException helper for creating Storage API exceptions.
    • Modified appendRows to return Exceptions.StorageException instead of throwing RuntimeException.
    • Implemented patchTableSchema to simulate BigQuery schema updates.
    • Added checkSchemaChanges and checkSchemaChangesProtos to validate schema evolution rules.
    • Removed check for unknown fields in appendRows to align with new schema evolution behavior.
  • sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
    • Added testAutoPatchTableSchemaTest to verify automatic schema patching functionality.
  • sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoTest.java
    • Updated calls to messageFromTableRow to include the new ErrorCollector parameter.
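
The schema-hash mechanism listed in the changelog (`tableSchemaHash` on the schema side, `getSchemaHash`/`setSchemaHash` on the payload side, and `isPayloadSchemaOutOfDate` for the comparison) can be sketched as below. This is a hypothetical illustration, not Beam's code: `SchemaHashSketch` uses SHA-256 over lowercased field names with a fixed charset, where the real implementation hashes full `TableFieldSchema` structure.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.List;

// Hypothetical sketch: each payload records the hash of the schema it was
// encoded against; if that hash differs from the current table schema's hash,
// the payload is considered out of date and must be re-encoded.
public class SchemaHashSketch {
  static String schemaHash(List<String> fieldNames) {
    try {
      MessageDigest md = MessageDigest.getInstance("SHA-256");
      for (String name : fieldNames) {
        // A fixed charset avoids environment-dependent hashes.
        md.update(name.toLowerCase().getBytes(StandardCharsets.UTF_8));
        md.update((byte) 0); // separator so ["ab","c"] != ["a","bc"]
      }
      StringBuilder sb = new StringBuilder();
      for (byte b : md.digest()) sb.append(String.format("%02x", b));
      return sb.toString();
    } catch (NoSuchAlgorithmException e) {
      throw new AssertionError(e); // SHA-256 is always available
    }
  }

  static boolean isPayloadOutOfDate(String payloadSchemaHash, List<String> currentFields) {
    return !payloadSchemaHash.equals(schemaHash(currentFields));
  }

  public static void main(String[] args) {
    String encodedAgainst = schemaHash(List.of("id", "name"));
    System.out.println(isPayloadOutOfDate(encodedAgainst, List.of("id", "name")));          // false
    System.out.println(isPayloadOutOfDate(encodedAgainst, List.of("id", "name", "email"))); // true
  }
}
```

Carrying a cheap hash on each payload lets the writers detect a stale descriptor without shipping the whole schema alongside every record.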

@reuvenlax
Contributor Author

/gemini review


@gemini-code-assist bot left a comment


Code Review

This pull request implements automatic schema evolution for the BigQuery Storage Write API. It introduces a mechanism to detect schema mismatches using hashing and handles missing field errors by automatically patching the BigQuery table schema when permitted. Key changes include the UpgradeTableSchema utility for schema merging and a stateful buffering mechanism in StorageApiConvertMessages to hold records during schema updates. The review feedback recommends replacing System.err calls with proper logging, optimizing object allocations in loops, removing redundant semicolons, and using a fixed charset for hashing to ensure consistency across environments.

ElementT value = element.getValue();
boolean needsNewTimer = false;
if (value != null) {
System.err.println("BUFFERING ELEMENT");
Contributor


medium

This debug print should be removed or replaced with a proper logger call. System.err is not appropriate for production library code as it bypasses logging configurations.

boolean schemaOutOfDate = false;
do {
try {
System.err.println("TRYING TO PATCH TO " + updatedSchema);
Contributor


medium

This debug print should be removed or replaced with a proper logger call.

}
return;
} catch (IOException e) {
ApiErrorExtractor errorExtractor = new ApiErrorExtractor();
Contributor


medium

ApiErrorExtractor is stateless and can be instantiated once outside of the retry loop to avoid unnecessary object allocation in each iteration.
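
The suggestion can be illustrated with a minimal sketch. `ErrorClassifier` is a hypothetical stand-in for `ApiErrorExtractor`; the point is only that a stateless helper is allocated once, outside the retry loop, instead of on every attempt.

```java
import java.io.IOException;

// Hypothetical sketch of hoisting a stateless helper out of a retry loop.
public class RetrySketch {
  // Stand-in for ApiErrorExtractor: no mutable state, safe to share.
  static final class ErrorClassifier {
    boolean isRetryable(Exception e) {
      return e instanceof IOException; // treat I/O errors as transient
    }
  }

  // Allocated once, not once per retry iteration.
  private static final ErrorClassifier CLASSIFIER = new ErrorClassifier();

  // Simulates an operation that fails `failures` times before succeeding;
  // returns the number of attempts used.
  static int attemptsUsed(int failures) {
    int attempts = 0;
    while (true) {
      attempts++;
      try {
        if (attempts <= failures) {
          throw new IOException("transient");
        }
        return attempts;
      } catch (Exception e) {
        if (!CLASSIFIER.isRetryable(e)) {
          throw new RuntimeException(e); // non-retryable: give up
        }
      }
    }
  }

  public static void main(String[] args) {
    System.out.println(attemptsUsed(2)); // two failures, succeeds on attempt 3
  }
}
```

The allocation cost of one small object is tiny, but in a tight retry loop inside a streaming writer it is still idiomatic to construct shared, stateless helpers once.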

Comment on lines +668 to +669
collectors.clear();
;
Contributor


medium

There is a redundant semicolon here.

Suggested change:
  - collectors.clear();
  - ;
  + collectors.clear();
Comment on lines +156 to +157
this.isStruct = isStruct;
;
Contributor


medium

There is a redundant semicolon here.

Suggested change:
  - this.isStruct = isStruct;
  - ;
  + this.isStruct = isStruct;

prefix.isEmpty()
? tableFieldSchema.getName()
: String.join(".", prefix, tableFieldSchema.getName());
hashCodes.add(SCHEMA_HASH_FUNCTION.hashString(name.toLowerCase(), Charset.defaultCharset()));
Contributor


medium

Using Charset.defaultCharset() can lead to inconsistent results across different environments. It is safer to use a fixed charset like StandardCharsets.UTF_8 for hashing.

Suggested change:
  - hashCodes.add(SCHEMA_HASH_FUNCTION.hashString(name.toLowerCase(), Charset.defaultCharset()));
  + hashCodes.add(SCHEMA_HASH_FUNCTION.hashString(name.toLowerCase(), StandardCharsets.UTF_8));

@github-actions
Contributor

github-actions bot commented Apr 3, 2026

Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment assign set of reviewers
