Skip to content

Commit 6d41a79

Browse files
[FLINK-38742][cdc-pipeline/postgres] Fix TIMESTAMPTZ type mapping to TIMESTAMP_LTZ
PostgreSQL TIMESTAMPTZ columns were causing NumberFormatException because PostgresTypeUtils incorrectly mapped them to ZonedTimestampType (TIMESTAMP_WITH_TIME_ZONE). PostgreSQL TIMESTAMPTZ stores values internally in UTC and converts on display based on session timezone, which semantically matches TIMESTAMP_LTZ, not TIMESTAMP_WITH_TIME_ZONE. Additionally, Debezium's PostgreSQL connector always converts TIMESTAMPTZ values to UTC format (e.g., '2025-12-30T05:59:50.724893Z') before serialization. The type mismatch caused sinks to call getZonedTimestamp() on LocalZonedTimestampData, resulting in NumberFormatException when trying to parse binary data as a string. This commit fixes PostgresTypeUtils to correctly map TIMESTAMPTZ and TIMESTAMPTZ_ARRAY to TIMESTAMP_LTZ type, matching both PostgreSQL semantics and Debezium's serialization format. The E2E test is updated to verify the fix.
1 parent 44fb79f commit 6d41a79

3 files changed

Lines changed: 25 additions & 25 deletions

File tree

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/utils/PostgresTypeUtils.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919

2020
import org.apache.flink.cdc.common.types.DataType;
2121
import org.apache.flink.cdc.common.types.DataTypes;
22-
import org.apache.flink.cdc.common.types.ZonedTimestampType;
2322
import org.apache.flink.table.types.logical.DecimalType;
2423

2524
import io.debezium.config.CommonConnectorConfig;
@@ -166,9 +165,9 @@ private static DataType convertFromColumn(
166165
return DataTypes.ARRAY(
167166
handleTimestampWithTemporalMode(temporalPrecisionMode, scale));
168167
case PgOid.TIMESTAMPTZ:
169-
return new ZonedTimestampType(scale);
168+
return DataTypes.TIMESTAMP_LTZ(scale);
170169
case PgOid.TIMESTAMPTZ_ARRAY:
171-
return DataTypes.ARRAY(new ZonedTimestampType(scale));
170+
return DataTypes.ARRAY(DataTypes.TIMESTAMP_LTZ(scale));
172171
case PgOid.TIME:
173172
return handleTimeWithTemporalMode(temporalPrecisionMode, scale);
174173
case PgOid.TIME_ARRAY:

flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/PostgresE2eITCase.java

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -142,16 +142,16 @@ void testSyncWholeDatabase() throws Exception {
142142
"DataChangeEvent{tableId=%s.customers, before=[], after=[103, Edward, Walker, ed@walker.com], op=INSERT, meta=()}",
143143
"DataChangeEvent{tableId=%s.customers, before=[], after=[102, George, Bailey, gbailey@foobar.com], op=INSERT, meta=()}",
144144
"DataChangeEvent{tableId=%s.customers, before=[], after=[101, Sally, Thomas, sally.thomas@acme.com], op=INSERT, meta=()}",
145-
"CreateTableEvent{tableId=%s.products, schema=columns={`id` INT NOT NULL 'nextval('inventory.products_id_seq'::regclass)',`name` VARCHAR(255) NOT NULL,`description` VARCHAR(512),`weight` FLOAT}, primaryKeys=id, options=()}",
146-
"DataChangeEvent{tableId=%s.products, before=[], after=[109, spare tire, 24 inch spare tire, 22.2], op=INSERT, meta=()}",
147-
"DataChangeEvent{tableId=%s.products, before=[], after=[107, rocks, box of assorted rocks, 5.3], op=INSERT, meta=()}",
148-
"DataChangeEvent{tableId=%s.products, before=[], after=[108, jacket, water resistent black wind breaker, 0.1], op=INSERT, meta=()}",
149-
"DataChangeEvent{tableId=%s.products, before=[], after=[105, hammer, 14oz carpenter's hammer, 0.875], op=INSERT, meta=()}",
150-
"DataChangeEvent{tableId=%s.products, before=[], after=[106, hammer, 16oz carpenter's hammer, 1.0], op=INSERT, meta=()}",
151-
"DataChangeEvent{tableId=%s.products, before=[], after=[103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.8], op=INSERT, meta=()}",
152-
"DataChangeEvent{tableId=%s.products, before=[], after=[104, hammer, 12oz carpenter's hammer, 0.75], op=INSERT, meta=()}",
153-
"DataChangeEvent{tableId=%s.products, before=[], after=[101, scooter, Small 2-wheel scooter, 3.14], op=INSERT, meta=()}",
154-
"DataChangeEvent{tableId=%s.products, before=[], after=[102, car battery, 12V car battery, 8.1], op=INSERT, meta=()}");
145+
"CreateTableEvent{tableId=%s.products, schema=columns={`id` INT NOT NULL 'nextval('inventory.products_id_seq'::regclass)',`name` VARCHAR(255) NOT NULL,`description` VARCHAR(512),`weight` FLOAT,`created_at` TIMESTAMP_LTZ(6)}, primaryKeys=id, options=()}",
146+
"DataChangeEvent{tableId=%s.products, before=[], after=[109, spare tire, 24 inch spare tire, 22.2, 2024-01-09T18:00], op=INSERT, meta=()}",
147+
"DataChangeEvent{tableId=%s.products, before=[], after=[107, rocks, box of assorted rocks, 5.3, 2024-01-07T16:45], op=INSERT, meta=()}",
148+
"DataChangeEvent{tableId=%s.products, before=[], after=[108, jacket, water resistent black wind breaker, 0.1, 2024-01-08T17:00], op=INSERT, meta=()}",
149+
"DataChangeEvent{tableId=%s.products, before=[], after=[105, hammer, 14oz carpenter's hammer, 0.875, 2024-01-05T14:20], op=INSERT, meta=()}",
150+
"DataChangeEvent{tableId=%s.products, before=[], after=[106, hammer, 16oz carpenter's hammer, 1.0, 2024-01-06T15:30], op=INSERT, meta=()}",
151+
"DataChangeEvent{tableId=%s.products, before=[], after=[103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.8, 2024-01-03T12:00], op=INSERT, meta=()}",
152+
"DataChangeEvent{tableId=%s.products, before=[], after=[104, hammer, 12oz carpenter's hammer, 0.75, 2024-01-04T13:15], op=INSERT, meta=()}",
153+
"DataChangeEvent{tableId=%s.products, before=[], after=[101, scooter, Small 2-wheel scooter, 3.14, 2024-01-01T10:00], op=INSERT, meta=()}",
154+
"DataChangeEvent{tableId=%s.products, before=[], after=[102, car battery, 12V car battery, 8.1, 2024-01-02T11:30], op=INSERT, meta=()}");
155155

156156
LOG.info("Begin incremental reading stage.");
157157

@@ -165,9 +165,9 @@ void testSyncWholeDatabase() throws Exception {
165165

166166
// Perform DML changes after the wal log is generated
167167
waitUntilSpecificEvent(
168-
"DataChangeEvent{tableId=inventory.products, before=[106, hammer, 16oz carpenter's hammer, 1.0], after=[106, hammer, 18oz carpenter hammer, 1.0], op=UPDATE, meta=()}");
168+
"DataChangeEvent{tableId=inventory.products, before=[106, hammer, 16oz carpenter's hammer, 1.0, 2024-01-06T15:30], after=[106, hammer, 18oz carpenter hammer, 1.0, 2024-01-06T15:30], op=UPDATE, meta=()}");
169169
waitUntilSpecificEvent(
170-
"DataChangeEvent{tableId=inventory.products, before=[107, rocks, box of assorted rocks, 5.3], after=[107, rocks, box of assorted rocks, 5.1], op=UPDATE, meta=()}");
170+
"DataChangeEvent{tableId=inventory.products, before=[107, rocks, box of assorted rocks, 5.3, 2024-01-07T16:45], after=[107, rocks, box of assorted rocks, 5.1, 2024-01-07T16:45], op=UPDATE, meta=()}");
171171
} catch (Exception e) {
172172
LOG.error("Update table for CDC failed.", e);
173173
throw new RuntimeException(e);

flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/resources/ddl/postgres_inventory.sql

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -23,21 +23,22 @@ CREATE TABLE products (
2323
id SERIAL NOT NULL PRIMARY KEY,
2424
name VARCHAR(255) NOT NULL,
2525
description VARCHAR(512),
26-
weight FLOAT(24)
26+
weight FLOAT(24),
27+
created_at TIMESTAMPTZ
2728
);
2829
ALTER SEQUENCE products_id_seq RESTART WITH 101;
2930
ALTER TABLE products REPLICA IDENTITY FULL;
3031

3132
INSERT INTO products
32-
VALUES (default,'scooter','Small 2-wheel scooter',3.14),
33-
(default,'car battery','12V car battery',8.1),
34-
(default,'12-pack drill bits','12-pack of drill bits with sizes ranging from #40 to #3',0.8),
35-
(default,'hammer','12oz carpenter''s hammer',0.75),
36-
(default,'hammer','14oz carpenter''s hammer',0.875),
37-
(default,'hammer','16oz carpenter''s hammer',1.0),
38-
(default,'rocks','box of assorted rocks',5.3),
39-
(default,'jacket','water resistent black wind breaker',0.1),
40-
(default,'spare tire','24 inch spare tire',22.2);
33+
VALUES (default,'scooter','Small 2-wheel scooter',3.14,'2024-01-01 10:00:00+00'),
34+
(default,'car battery','12V car battery',8.1,'2024-01-02 11:30:00+00'),
35+
(default,'12-pack drill bits','12-pack of drill bits with sizes ranging from #40 to #3',0.8,'2024-01-03 12:00:00+00'),
36+
(default,'hammer','12oz carpenter''s hammer',0.75,'2024-01-04 13:15:00+00'),
37+
(default,'hammer','14oz carpenter''s hammer',0.875,'2024-01-05 14:20:00+00'),
38+
(default,'hammer','16oz carpenter''s hammer',1.0,'2024-01-06 15:30:00+00'),
39+
(default,'rocks','box of assorted rocks',5.3,'2024-01-07 16:45:00+00'),
40+
(default,'jacket','water resistent black wind breaker',0.1,'2024-01-08 17:00:00+00'),
41+
(default,'spare tire','24 inch spare tire',22.2,'2024-01-09 18:00:00+00');
4142

4243
-- Create customers table
4344
CREATE TABLE customers (

0 commit comments

Comments
 (0)