Commit 34ccd84

tchivs authored and leonardBang committed

[FLINK-38839][cdc-runtime] Support custom delimiter for table-options

1 parent: 78d6adf

11 files changed: 240 additions & 25 deletions


docs/content.zh/docs/core-concept/transform.md

Lines changed: 20 additions & 13 deletions
```diff
@@ -31,16 +31,17 @@ under the License.
 # Parameters
 To define a transform rule, the following parameters can be used:
 
-| Parameter                 | Meaning                                                                           | Required |
-|---------------------------|-----------------------------------------------------------------------------------|----------|
-| source-table              | Source table ID; supports regular expressions                                     | required |
-| projection                | Projection rule; supports syntax similar to a SQL SELECT clause                   | optional |
-| filter                    | Filter rule; supports syntax similar to a SQL WHERE clause                        | optional |
-| primary-keys              | Sink table primary keys, separated by commas                                      | optional |
-| partition-keys            | Sink table partition keys, separated by commas                                    | optional |
-| table-options             | Used to configure the table creation statement when automatically creating tables | optional |
-| converter-after-transform | Used to add a converter that modifies DataChangeEvent after transform             | optional |
-| description               | Transform rule description                                                        | optional |
+| Parameter                 | Meaning                                                                           | Required |
+|---------------------------|-----------------------------------------------------------------------------------|----------|
+| source-table              | Source table ID; supports regular expressions                                     | required |
+| projection                | Projection rule; supports syntax similar to a SQL SELECT clause                   | optional |
+| filter                    | Filter rule; supports syntax similar to a SQL WHERE clause                        | optional |
+| primary-keys              | Sink table primary keys, separated by commas                                      | optional |
+| partition-keys            | Sink table partition keys, separated by commas                                    | optional |
+| table-options             | Used to configure the table creation statement when automatically creating tables | optional |
+| table-options.delimiter   | Delimiter between multiple table options; defaults to `,`                         | optional |
+| converter-after-transform | Used to add a converter that modifies DataChangeEvent after transform             | optional |
+| description               | Transform rule description                                                        | optional |
 
 Multiple transform rules can be declared in one pipeline YAML file.
```

````diff
@@ -335,7 +336,13 @@ transform:
     table-options: comment=web order
   description: auto creating table options example
 ```
-Tip: the format of table-options is `key1=value1,key2=value2`.
+Tip: the format of table-options is `key1=value1,key2=value2`; if a value contains commas or other special characters, you can use `table-options.delimiter` to specify a custom delimiter (such as `;`, `|`, `$`, etc.):
+```yaml
+transform:
+  - source-table: mydb.web_order
+    table-options: sequence.field=gxsj,jjsj;file-index.bloom-filter.columns=jjdbh
+    table-options.delimiter: ";"
+```
 
 ## Classification mapping
 When one table is matched by multiple transform rules at the same time,
````
```diff
@@ -467,8 +474,8 @@ transform:
 
 ## Embedding AI Model
 
-Embedding AI models can be used in transform rules.
-To use an Embedding AI model, you need to download the jar of the built-in model, then add `--jar {$BUILT_IN_MODEL_PATH}` to the `flink-cdc.sh` command.
+Built-in AI models can be used in transform rules.
+To use a built-in AI model, you need to download the jar of the built-in model, then add `--jar {$BUILT_IN_MODEL_PATH}` to the `flink-cdc.sh` command.
 
 How to define an Embedding AI model:
```

docs/content/docs/core-concept/transform.md

Lines changed: 9 additions & 1 deletion
```diff
@@ -39,6 +39,7 @@ To describe a transform rule, the following parameters can be used:
 | primary-keys              | Sink table primary keys, separated by commas                                      | optional |
 | partition-keys            | Sink table partition keys, separated by commas                                    | optional |
 | table-options             | used to configure the table creation statement when automatically creating tables | optional |
+| table-options.delimiter   | delimiter for table-options key-value pairs, default is `,`                       | optional |
 | converter-after-transform | used to add a converter to change DataChangeEvent after transform                 | optional |
 | description               | Transform rule description                                                        | optional |
```

````diff
@@ -339,6 +340,13 @@ transform:
   description: auto creating table options example
 ```
 Tips: The format of table-options is `key1=value1,key2=value2`.
+If option values contain commas or other special characters, you can specify a custom delimiter using `table-options.delimiter` (such as `;`, `|`, `$`, etc.):
+```yaml
+transform:
+  - source-table: mydb.web_order
+    table-options: sequence.field=gxsj,jjsj;file-index.bloom-filter.columns=jjdbh
+    table-options.delimiter: ";"
+```
 
 ## Classification mapping
 If a table hits multiple transform rules, only the first matched transform rule will apply.
````
```diff
@@ -538,4 +546,4 @@ The following built-in models are provided:
 |---------------|--------|----------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
 | openai.model  | STRING | required | Name of model to be called, for example: "text-embedding-3-small", Available options are "text-embedding-3-small", "text-embedding-3-large", "text-embedding-ada-002".   |
 | openai.host   | STRING | required | Host of the Model server to be connected, for example: `http://langchain4j.dev/demo/openai/v1`.                                                                          |
-| openai.apikey | STRING | required | Api Key for verification of the Model server, for example, "demo". |
+| openai.apikey | STRING | required | Api Key for verification of the Model server, for example, "demo". |
```
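Read together, the delimiter hunks above say: with the default `,` every comma separates key=value pairs, so commas cannot appear inside values; switching the delimiter frees the comma for use inside values. A side-by-side sketch (the `bucket=4` option and the `app_order` table name are illustrative, not taken from this commit):

```yaml
transform:
  # Default delimiter: ',' separates pairs, so values must not contain commas.
  - source-table: mydb.web_order
    table-options: comment=web order,bucket=4
  # Custom delimiter: ';' separates pairs, so commas may stay inside values.
  - source-table: mydb.app_order
    table-options: sequence.field=gxsj,jjsj;file-index.bloom-filter.columns=jjdbh
    table-options.delimiter: ";"
```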

flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java

Lines changed: 8 additions & 0 deletions
```diff
@@ -105,6 +105,8 @@ public class YamlPipelineDefinitionParser implements PipelineDefinitionParser {
 
     public static final String TRANSFORM_TABLE_OPTION_KEY = "table-options";
 
+    public static final String TRANSFORM_TABLE_OPTION_DELIMITER_KEY = "table-options.delimiter";
+
     private final ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
 
     /** Parse the specified pipeline definition file. */
@@ -333,6 +335,7 @@ private TransformDef toTransformDef(JsonNode transformNode) {
                         TRANSFORM_PRIMARY_KEY_KEY,
                         TRANSFORM_PARTITION_KEY_KEY,
                         TRANSFORM_TABLE_OPTION_KEY,
+                        TRANSFORM_TABLE_OPTION_DELIMITER_KEY,
                         TRANSFORM_DESCRIPTION_KEY,
                         TRANSFORM_CONVERTER_AFTER_TRANSFORM_KEY));
@@ -366,6 +369,10 @@ private TransformDef toTransformDef(JsonNode transformNode) {
                 Optional.ofNullable(transformNode.get(TRANSFORM_TABLE_OPTION_KEY))
                         .map(JsonNode::asText)
                         .orElse(null);
+        String tableOptionsDelimiter =
+                Optional.ofNullable(transformNode.get(TRANSFORM_TABLE_OPTION_DELIMITER_KEY))
+                        .map(JsonNode::asText)
+                        .orElse(null);
         String description =
                 Optional.ofNullable(transformNode.get(TRANSFORM_DESCRIPTION_KEY))
                         .map(JsonNode::asText)
@@ -382,6 +389,7 @@ private TransformDef toTransformDef(JsonNode transformNode) {
                 primaryKeys,
                 partitionKeys,
                 tableOptions,
+                tableOptionsDelimiter,
                 description,
                 postTransformConverter);
     }
```
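The parser change repeats the file's existing pattern for optional keys: fetch the node, convert to text when present, otherwise fall back to `null` so downstream code decides the default. Stripped of Jackson, the idiom can be sketched over a plain `Map` — the names here are illustrative, not the actual parser API:

```java
import java.util.Map;
import java.util.Optional;

public class OptionalKeyDemo {
    // Mirrors Optional.ofNullable(node.get(KEY)).map(JsonNode::asText).orElse(null)
    static String optionalKey(Map<String, ?> node, String key) {
        return Optional.ofNullable(node.get(key)).map(Object::toString).orElse(null);
    }

    public static void main(String[] args) {
        Map<String, String> transformNode =
                Map.of("table-options", "k1=v1;k2=v2", "table-options.delimiter", ";");
        // Present key: the configured value is returned.
        System.out.println(optionalKey(transformNode, "table-options.delimiter")); // ;
        // Absent key: null, leaving the default choice to the consumer.
        System.out.println(optionalKey(transformNode, "description")); // null
    }
}
```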

flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/definition/TransformDef.java

Lines changed: 34 additions & 4 deletions
```diff
@@ -30,12 +30,14 @@
  *       definition.
  *   <li>filter: a string for filtering the row of matched table as output. Optional for the
  *       definition.
- *   <li>primaryKeys: a string for primary key columns for matching input table IDs, seperated by
+ *   <li>primaryKeys: a string for primary key columns for matching input table IDs, separated by
  *       `,`. Optional for the definition.
- *   <li>partitionKeys: a string for partition key columns for matching input table IDs, seperated
+ *   <li>partitionKeys: a string for partition key columns for matching input table IDs, separated
  *       by `,`. Optional for the definition.
  *   <li>tableOptions: a string for table options for matching input table IDs, options are
- *       seperated by `,`, key and value are seperated by `=`. Optional for the definition.
+ *       separated by `,`, key and value are separated by `=`. Optional for the definition.
+ *   <li>tableOptionsDelimiter: a string for delimiter of table options, default is `,`. Optional
+ *       for the definition.
  *   <li>description: description for the transformation. Optional for the definition.
  * </ul>
  */
@@ -47,6 +49,7 @@ public class TransformDef {
     private final String primaryKeys;
     private final String partitionKeys;
     private final String tableOptions;
+    private final String tableOptionsDelimiter;
     private final String postTransformConverter;
 
     public TransformDef(
@@ -56,6 +59,7 @@ public TransformDef(
             String primaryKeys,
             String partitionKeys,
             String tableOptions,
+            String tableOptionsDelimiter,
             String description,
             String postTransformConverter) {
         this.sourceTable = sourceTable;
@@ -64,10 +68,30 @@ public TransformDef(
         this.primaryKeys = primaryKeys;
         this.partitionKeys = partitionKeys;
         this.tableOptions = tableOptions;
+        this.tableOptionsDelimiter = tableOptionsDelimiter;
         this.description = description;
         this.postTransformConverter = postTransformConverter;
     }
-
+
+    public TransformDef(
+            String sourceTable,
+            String projection,
+            String filter,
+            String primaryKeys,
+            String partitionKeys,
+            String tableOptions,
+            String description,
+            String postTransformConverter) {
+        this(
+                sourceTable,
+                projection,
+                filter,
+                primaryKeys,
+                partitionKeys,
+                tableOptions,
+                ",",
+                description,
+                postTransformConverter);
+    }
     public String getSourceTable() {
         return sourceTable;
     }
@@ -96,6 +120,10 @@ public String getTableOptions() {
         return tableOptions;
     }
 
+    public String getTableOptionsDelimiter() {
+        return tableOptionsDelimiter;
+    }
+
     public String getPostTransformConverter() {
         return postTransformConverter;
     }
@@ -137,6 +165,7 @@ public boolean equals(Object o) {
                 && Objects.equals(primaryKeys, that.primaryKeys)
                 && Objects.equals(partitionKeys, that.partitionKeys)
                 && Objects.equals(tableOptions, that.tableOptions)
+                && Objects.equals(tableOptionsDelimiter, that.tableOptionsDelimiter)
                 && Objects.equals(postTransformConverter, that.postTransformConverter);
     }
 
@@ -150,6 +179,7 @@ public int hashCode() {
                 primaryKeys,
                 partitionKeys,
                 tableOptions,
+                tableOptionsDelimiter,
                 postTransformConverter);
     }
 }
```
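The second `TransformDef` constructor is a backward-compatibility shim: existing callers keep the old eight-argument signature and implicitly get `","` as the delimiter (note that the builders touched elsewhere in this commit pass `null` instead and rely on `SchemaMetadataTransform` to default it). The pattern in miniature, with a hypothetical two-field stand-in class:

```java
public class ConstructorDefaultDemo {
    // Hypothetical stand-in for TransformDef, reduced to the two relevant fields.
    static class Def {
        final String tableOptions;
        final String tableOptionsDelimiter;

        Def(String tableOptions, String tableOptionsDelimiter) {
            this.tableOptions = tableOptions;
            this.tableOptionsDelimiter = tableOptionsDelimiter;
        }

        // Backward-compatible overload: old call sites compile unchanged and
        // keep the historical comma delimiter.
        Def(String tableOptions) {
            this(tableOptions, ",");
        }
    }

    public static void main(String[] args) {
        System.out.println(new Def("k1=v1,k2=v2").tableOptionsDelimiter);      // ,
        System.out.println(new Def("k1=v1;k2=v2", ";").tableOptionsDelimiter); // ;
    }
}
```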

flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/TransformTranslator.java

Lines changed: 2 additions & 0 deletions
```diff
@@ -74,6 +74,7 @@ private PreTransformOperator generatePreTransform(
                 transform.getPrimaryKeys(),
                 transform.getPartitionKeys(),
                 transform.getTableOptions(),
+                transform.getTableOptionsDelimiter(),
                 transform.getPostTransformConverter(),
                 supportedMetadataColumns);
     }
@@ -111,6 +112,7 @@ public DataStream<Event> translatePostTransform(
                 transform.getPrimaryKeys(),
                 transform.getPartitionKeys(),
                 transform.getTableOptions(),
+                transform.getTableOptionsDelimiter(),
                 transform.getPostTransformConverter(),
                 supportedMetadataColumns);
     }
```

flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorBuilder.java

Lines changed: 24 additions & 0 deletions
```diff
@@ -44,6 +44,28 @@ public PostTransformOperatorBuilder addTransform(
             String tableOptions,
             String postTransformConverter,
             SupportedMetadataColumn[] supportedMetadataColumns) {
+        return addTransform(
+                tableInclusions,
+                projection,
+                filter,
+                primaryKey,
+                partitionKey,
+                tableOptions,
+                null,
+                postTransformConverter,
+                supportedMetadataColumns);
+    }
+
+    public PostTransformOperatorBuilder addTransform(
+            String tableInclusions,
+            @Nullable String projection,
+            @Nullable String filter,
+            String primaryKey,
+            String partitionKey,
+            String tableOptions,
+            String tableOptionsDelimiter,
+            String postTransformConverter,
+            SupportedMetadataColumn[] supportedMetadataColumns) {
         transformRules.add(
                 new TransformRule(
                         tableInclusions,
@@ -52,6 +74,7 @@ public PostTransformOperatorBuilder addTransform(
                         primaryKey,
                         partitionKey,
                         tableOptions,
+                        tableOptionsDelimiter,
                         postTransformConverter,
                         supportedMetadataColumns));
         return this;
@@ -67,6 +90,7 @@ public PostTransformOperatorBuilder addTransform(
                         "",
                         "",
                         "",
+                        "",
                         null,
                         new SupportedMetadataColumn[0]));
         return this;
```

flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperator.java

Lines changed: 6 additions & 1 deletion
```diff
@@ -110,6 +110,7 @@ public void setup(
             String primaryKeys = transformRule.getPrimaryKey();
             String partitionKeys = transformRule.getPartitionKey();
             String tableOptions = transformRule.getTableOption();
+            String tableOptionsDelimiter = transformRule.getTableOptionsDelimiter();
             Selectors selectors =
                     new Selectors.SelectorsBuilder().includeTables(tableInclusions).build();
             transforms.add(
@@ -120,7 +121,11 @@ public void setup(
             schemaMetadataTransformers.add(
                     new Tuple2<>(
                             selectors,
-                            new SchemaMetadataTransform(primaryKeys, partitionKeys, tableOptions)));
+                            new SchemaMetadataTransform(
+                                    primaryKeys,
+                                    partitionKeys,
+                                    tableOptions,
+                                    tableOptionsDelimiter)));
         }
         this.preTransformProcessorMap = new ConcurrentHashMap<>();
         this.hasAsteriskMap = new ConcurrentHashMap<>();
```

flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperatorBuilder.java

Lines changed: 24 additions & 0 deletions
```diff
@@ -43,6 +43,7 @@ public PreTransformOperatorBuilder addTransform(
                         "",
                         "",
                         "",
+                        "",
                         null,
                         new SupportedMetadataColumn[0]));
         return this;
@@ -57,6 +58,28 @@ public PreTransformOperatorBuilder addTransform(
             String tableOption,
             @Nullable String postTransformConverter,
             SupportedMetadataColumn[] supportedMetadataColumns) {
+        return addTransform(
+                tableInclusions,
+                projection,
+                filter,
+                primaryKey,
+                partitionKey,
+                tableOption,
+                null,
+                postTransformConverter,
+                supportedMetadataColumns);
+    }
+
+    public PreTransformOperatorBuilder addTransform(
+            String tableInclusions,
+            @Nullable String projection,
+            @Nullable String filter,
+            String primaryKey,
+            String partitionKey,
+            String tableOption,
+            String tableOptionsDelimiter,
+            @Nullable String postTransformConverter,
+            SupportedMetadataColumn[] supportedMetadataColumns) {
         transformRules.add(
                 new TransformRule(
                         tableInclusions,
@@ -65,6 +88,7 @@ public PreTransformOperatorBuilder addTransform(
                         primaryKey,
                         partitionKey,
                         tableOption,
+                        tableOptionsDelimiter,
                         postTransformConverter,
                         supportedMetadataColumns));
         return this;
```

flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/SchemaMetadataTransform.java

Lines changed: 15 additions & 6 deletions
```diff
@@ -42,7 +42,10 @@ public class SchemaMetadataTransform implements Serializable {
     private Map<String, String> options = new HashMap<>();
 
     public SchemaMetadataTransform(
-            String primaryKeyString, String partitionKeyString, String tableOptionString) {
+            String primaryKeyString,
+            String partitionKeyString,
+            String tableOptionString,
+            String tableOptionsDelimiter) {
         if (!StringUtils.isNullOrWhitespaceOnly(primaryKeyString)) {
             String[] primaryKeyArr = primaryKeyString.split(",");
             for (int i = 0; i < primaryKeyArr.length; i++) {
@@ -58,13 +61,19 @@ public SchemaMetadataTransform(
             partitionKeys = Arrays.asList(partitionKeyArr);
         }
         if (!StringUtils.isNullOrWhitespaceOnly(tableOptionString)) {
-            for (String tableOption : tableOptionString.split(",")) {
-                String[] kv = tableOption.split("=");
+            // Use custom delimiter if provided, otherwise default to comma for backward
+            // compatibility.
+            String delimiter =
+                    StringUtils.isNullOrWhitespaceOnly(tableOptionsDelimiter)
+                            ? ","
+                            : tableOptionsDelimiter;
+            for (String tableOption : tableOptionString.split(delimiter)) {
+                String[] kv = tableOption.split("=", 2);
                 if (kv.length != 2) {
                     throw new IllegalArgumentException(
-                            "table option format error: "
-                                    + tableOptionString
-                                    + ", it should be like `key1=value1,key2=value2`.");
+                            String.format(
+                                    "table option format error: %s, it should be like `key1=value1%skey2=value2`.",
+                                    tableOptionString, delimiter));
                 }
                 options.put(kv[0].trim(), kv[1].trim());
             }
```
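The parsing change above has three parts: default the delimiter to `,` when unset, split pairs on the configured delimiter, and split each pair with `split("=", 2)` so values may themselves contain `=`. The sketch below re-implements that logic standalone. One deliberate deviation: `String.split` treats its argument as a regex, and the committed code passes the delimiter through unquoted, so this sketch wraps it in `Pattern.quote` to make metacharacter delimiters such as `|` or `$` (both suggested in the docs) split literally:

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Pattern;

public class SchemaMetadataTransformSketch {
    static Map<String, String> parseTableOptions(String tableOptionString, String tableOptionsDelimiter) {
        Map<String, String> options = new LinkedHashMap<>();
        // Default to comma when no delimiter is configured, matching the commit.
        String delimiter =
                (tableOptionsDelimiter == null || tableOptionsDelimiter.trim().isEmpty())
                        ? ","
                        : tableOptionsDelimiter;
        // Pattern.quote is this sketch's addition: it makes regex metacharacters
        // such as '|' and '$' behave as literal delimiters.
        for (String tableOption : tableOptionString.split(Pattern.quote(delimiter))) {
            String[] kv = tableOption.split("=", 2);
            if (kv.length != 2) {
                throw new IllegalArgumentException(
                        String.format(
                                "table option format error: %s, it should be like `key1=value1%skey2=value2`.",
                                tableOptionString, delimiter));
            }
            options.put(kv[0].trim(), kv[1].trim());
        }
        return options;
    }

    public static void main(String[] args) {
        Map<String, String> opts =
                parseTableOptions(
                        "sequence.field=gxsj,jjsj;file-index.bloom-filter.columns=jjdbh", ";");
        System.out.println(opts.get("sequence.field")); // gxsj,jjsj
        // split("=", 2) keeps '=' characters inside values intact:
        System.out.println(parseTableOptions("filter=a=b", null).get("filter")); // a=b
    }
}
```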
