Commit 4746938

[common] Support variant type
1 parent 9a299db commit 4746938

72 files changed

Lines changed: 8310 additions & 46 deletions


fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetcher.java

Lines changed: 7 additions & 1 deletion

```diff
@@ -528,9 +528,15 @@ Map<Integer, FetchLogRequest> prepareFetchLogRequests() {
                     new PbFetchLogReqForTable().setTableId(finalTableId);
             if (readContext.isProjectionPushDowned()) {
                 assert projection != null;
+                // When shredding is enabled, use the expanded projection
+                // that includes shredded columns for the server request
+                int[] projectedFields =
+                        readContext.getStorageProjectionInOrder() != null
+                                ? readContext.getStorageProjectionInOrder()
+                                : projection.getProjectionInOrder();
                 reqForTable
                         .setProjectionPushdownEnabled(true)
-                        .setProjectedFields(projection.getProjectionInOrder());
+                        .setProjectedFields(projectedFields);
             } else {
                 reqForTable.setProjectionPushdownEnabled(false);
             }
```
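
To make the fallback concrete, here is a minimal, self-contained sketch of the selection logic above. The class name and the column indices (a user projection of `[0, 2]` where column 2 is a Variant column, hypothetically expanded to `[0, 2, 3, 4]` by shredding) are illustrative, not taken from the commit:

```java
import java.util.Arrays;

// Sketch of the projection fallback in prepareFetchLogRequests(); indices are hypothetical.
public class ProjectionFallbackSketch {

    // Prefer the expanded storage projection (user columns plus shredded sub-columns)
    // when shredding produced one; otherwise fall back to the user-visible projection.
    static int[] chooseProjectedFields(int[] storageProjection, int[] userProjection) {
        return storageProjection != null ? storageProjection : userProjection;
    }

    public static void main(String[] args) {
        int[] userProjection = {0, 2};          // column 2 is a Variant column
        int[] storageProjection = {0, 2, 3, 4}; // hypothetically, $v.x -> 3, $v.y -> 4

        // With shredding enabled, the expanded projection is sent to the server:
        System.out.println(
                Arrays.toString(chooseProjectedFields(storageProjection, userProjection)));
        // [0, 2, 3, 4]

        // Without a storage projection, the user projection is used as-is:
        System.out.println(Arrays.toString(chooseProjectedFields(null, userProjection)));
        // [0, 2]
    }
}
```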

fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogScannerImpl.java

Lines changed: 3 additions & 1 deletion

```diff
@@ -112,7 +112,9 @@ public LogScannerImpl(
      */
     @Nullable
     private Projection sanityProjection(@Nullable int[] projectedFields, TableInfo tableInfo) {
-        RowType tableRowType = tableInfo.getRowType();
+        // Validate against the user-visible row type (excludes internal shredded columns like $v.x)
+        // so that projection indices from callers (e.g. Flink) stay within user-visible bounds.
+        RowType tableRowType = tableInfo.getUserRowType();
         if (projectedFields != null) {
             for (int projectedField : projectedFields) {
                 if (projectedField < 0 || projectedField >= tableRowType.getFieldCount()) {
```
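
A small sketch of the bounds check this change tightens, assuming a hypothetical table with 3 user-visible columns whose storage schema has grown to 5 after shredding; class and method names are illustrative:

```java
// Sketch: validate projection indices against the user-visible field count,
// not the (possibly larger) storage field count. Counts are hypothetical.
public class ProjectionSanitySketch {

    static void sanityCheck(int[] projectedFields, int userFieldCount) {
        for (int projectedField : projectedFields) {
            if (projectedField < 0 || projectedField >= userFieldCount) {
                throw new IllegalArgumentException(
                        "Projected field " + projectedField
                                + " is out of user-visible bounds [0, " + userFieldCount + ")");
            }
        }
    }

    public static void main(String[] args) {
        int userFieldCount = 3; // user-visible columns: e.g. id, name, $v
        // Storage may have 5 fields once $v is shredded into $v.x and $v.y,
        // but index 4 must still be rejected for callers such as Flink:
        sanityCheck(new int[] {0, 2}, userFieldCount); // ok
        try {
            sanityCheck(new int[] {4}, userFieldCount); // rejected
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```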

fluss-client/src/main/java/org/apache/fluss/client/write/RecordAccumulator.java

Lines changed: 117 additions & 1 deletion

```diff
@@ -33,10 +33,13 @@
 import org.apache.fluss.metadata.TableInfo;
 import org.apache.fluss.metrics.MetricNames;
 import org.apache.fluss.record.LogRecordBatchStatisticsCollector;
+import org.apache.fluss.row.InternalRow;
 import org.apache.fluss.row.arrow.ArrowWriter;
 import org.apache.fluss.row.arrow.ArrowWriterPool;
+import org.apache.fluss.rpc.gateway.CoordinatorGateway;
 import org.apache.fluss.shaded.arrow.org.apache.arrow.memory.BufferAllocator;
 import org.apache.fluss.shaded.arrow.org.apache.arrow.memory.RootAllocator;
+import org.apache.fluss.types.variant.ShreddingSchemaInferrer;
 import org.apache.fluss.utils.CopyOnWriteMap;
 import org.apache.fluss.utils.MathUtils;
 import org.apache.fluss.utils.clock.Clock;
@@ -113,6 +116,19 @@ public final class RecordAccumulator {
     private final Clock clock;
     private final DynamicWriteBatchSizeEstimator batchSizeEstimator;
 
+    /**
+     * Optional Coordinator gateway used to send {@code ApplyShreddingSchema} RPCs. Set after
+     * construction via {@link #setCoordinatorGateway(CoordinatorGateway)}.
+     */
+    @Nullable private volatile CoordinatorGateway coordinatorGateway;
+
+    /**
+     * Per-table {@link VariantShreddingManager}s. Created lazily on first append to an ARROW_LOG
+     * table with Variant shredding enabled and at least one Variant column.
+     */
+    private final ConcurrentMap<PhysicalTablePath, VariantShreddingManager> shreddingManagers =
+            new CopyOnWriteMap<>();
+
     // TODO add retryBackoffMs to retry the produce request upon receiving an error.
     // TODO add deliveryTimeoutMs to report success or failure on record delivery.
     // TODO add nextBatchExpiryTimeMs
@@ -158,6 +174,91 @@ private void registerMetrics(WriterMetricGroup writerMetricGroup) {
                 MetricNames.WRITER_BUFFER_WAITING_THREADS, writerBufferPool::queued);
     }
 
+    /**
+     * Sets the {@link CoordinatorGateway} to use for Variant shredding schema evolution RPCs.
+     *
+     * <p>This must be called once after the accumulator is constructed (and after the coordinator
+     * server is known) to enable automatic shredding. If not called, Variant statistics will still
+     * be collected locally but no schema evolution RPC will be fired.
+     */
+    public void setCoordinatorGateway(CoordinatorGateway gateway) {
+        this.coordinatorGateway = gateway;
+    }
+
+    /**
+     * Collects Variant statistics for the row being appended, and — once enough samples have been
+     * observed — fires an async schema-evolution RPC to the Coordinator.
+     *
+     * <p>This method is a no-op when:
+     *
+     * <ul>
+     *   <li>the write format is not {@link WriteFormat#ARROW_LOG}
+     *   <li>the table has no Variant columns
+     *   <li>Variant shredding is disabled in the table's configuration
+     *   <li>the coordinator gateway has not been set
+     * </ul>
+     */
+    private void maybeCollectVariantStats(
+            PhysicalTablePath physicalTablePath,
+            TableInfo tableInfo,
+            WriteFormat writeFormat,
+            InternalRow row) {
+        if (coordinatorGateway == null) {
+            return;
+        }
+        if (writeFormat != WriteFormat.ARROW_LOG) {
+            return;
+        }
+        if (!tableInfo.isVariantShreddingEnabled()) {
+            return;
+        }
+        int[] variantIndices = tableInfo.getVariantColumnIndices();
+        if (variantIndices.length == 0) {
+            return;
+        }
+
+        VariantShreddingManager manager =
+                shreddingManagers.computeIfAbsent(
+                        physicalTablePath,
+                        path -> {
+                            String[] colNames = new String[variantIndices.length];
+                            for (int i = 0; i < variantIndices.length; i++) {
+                                colNames[i] =
+                                        tableInfo
+                                                .getRowType()
+                                                .getFields()
+                                                .get(variantIndices[i])
+                                                .getName();
+                            }
+                            ShreddingSchemaInferrer inferrer =
+                                    new ShreddingSchemaInferrer()
+                                            .setPresenceThreshold(
+                                                    tableInfo
+                                                            .getTableConfig()
+                                                            .getVariantShreddingPresenceThreshold())
+                                            .setTypeConsistencyThreshold(
+                                                    tableInfo
+                                                            .getTableConfig()
+                                                            .getVariantShreddingTypeConsistencyThreshold())
+                                            .setMaxShreddedFields(
+                                                    tableInfo
+                                                            .getTableConfig()
+                                                            .getVariantShreddingMaxFields())
+                                            .setMinSampleSize(
+                                                    tableInfo
+                                                            .getTableConfig()
+                                                            .getVariantShreddingMinSampleSize());
+                            CoordinatorGateway gw = coordinatorGateway;
+                            return new VariantShreddingManager(
+                                    path.getTablePath(),
+                                    variantIndices,
+                                    colNames,
+                                    inferrer,
+                                    gw::applyShreddingSchema);
+                        });
+        manager.collectRow(row);
+    }
+
     /**
      * Add a record to the accumulator, return to append result.
      *
@@ -195,6 +296,12 @@ public RecordAppendResult append(
             synchronized (dq) {
                 RecordAppendResult appendResult = tryAppend(writeRecord, callback, dq);
                 if (appendResult != null) {
+                    // Row was appended to an existing batch; collect Variant statistics.
+                    maybeCollectVariantStats(
+                            physicalTablePath,
+                            tableInfo,
+                            writeRecord.getWriteFormat(),
+                            writeRecord.getRow());
                     return appendResult;
                 }
             }
@@ -212,6 +319,12 @@ public RecordAppendResult append(
                             writeRecord, callback, bucketId, tableInfo, dq, memorySegments);
             if (appendResult.newBatchCreated) {
                 memorySegments = Collections.emptyList();
+                // Row was appended to the new batch; collect Variant statistics.
+                maybeCollectVariantStats(
+                        physicalTablePath,
+                        tableInfo,
+                        writeRecord.getWriteFormat(),
+                        writeRecord.getRow());
             }
             return appendResult;
         }
@@ -628,7 +741,10 @@ private WriteBatch createWriteBatch(
                             schemaId,
                             outputView.getPreAllocatedSize(),
                             tableInfo.getRowType(),
-                            tableInfo.getTableConfig().getArrowCompressionInfo());
+                            tableInfo.getTableConfig().getArrowCompressionInfo(),
+                            tableInfo.getShreddingSchemas().isEmpty()
+                                    ? null
+                                    : tableInfo.getShreddingSchemas());
         LogRecordBatchStatisticsCollector statisticsCollector = null;
         if (tableInfo.isStatisticsEnabled()) {
             statisticsCollector =
```
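
The inferrer construction above reads four thresholds from the table config. The sketch below shows the same builder chain with hypothetical literal values in place of the config lookups; the threshold semantics in the comments are an interpretation of the setter names, not documented behavior:

```java
import org.apache.fluss.types.variant.ShreddingSchemaInferrer;

// Sketch of the inferrer configuration built inside maybeCollectVariantStats(),
// with hypothetical values standing in for the TableConfig lookups.
public class InferrerConfigSketch {
    public static void main(String[] args) {
        ShreddingSchemaInferrer inferrer =
                new ShreddingSchemaInferrer()
                        // presumably: shred a field only if it appears in enough sampled variants
                        .setPresenceThreshold(0.9)
                        // presumably: ... and its value type is consistent often enough
                        .setTypeConsistencyThreshold(0.95)
                        // cap the number of shredded sub-columns per Variant column
                        .setMaxShreddedFields(16)
                        // don't infer before this many rows have been sampled
                        .setMinSampleSize(1000);
        System.out.println("min sample size = " + inferrer.getMinSampleSize());
    }
}
```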
fluss-client/src/main/java/org/apache/fluss/client/write/VariantShreddingManager.java

Lines changed: 199 additions & 0 deletions (new file)

```java
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.fluss.client.write;

import org.apache.fluss.annotation.Internal;
import org.apache.fluss.metadata.TablePath;
import org.apache.fluss.row.InternalRow;
import org.apache.fluss.rpc.messages.ApplyShreddingSchemaRequest;
import org.apache.fluss.rpc.messages.ApplyShreddingSchemaResponse;
import org.apache.fluss.rpc.messages.PbTablePath;
import org.apache.fluss.types.variant.ShreddingSchema;
import org.apache.fluss.types.variant.ShreddingSchemaInferrer;
import org.apache.fluss.types.variant.Variant;
import org.apache.fluss.types.variant.VariantStatisticsCollector;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;

/**
 * Manages automatic Variant shredding inference for a single table on the client write path.
 *
 * <p>Each time a row is appended to a write batch ({@link #collectRow(InternalRow)}), this manager
 * extracts the Variant values from the row's Variant-typed columns and feeds them into per-column
 * {@link VariantStatisticsCollector}s. Once the minimum sample threshold is met and a non-empty
 * {@link ShreddingSchema} is inferred, an asynchronous RPC is dispatched to the Coordinator to
 * trigger server-side schema evolution.
 *
 * <p>Schema evolution is triggered <em>at most once</em> per manager instance. If the RPC fails
 * (e.g., transient network error), the {@link #schemaTriggered} flag is reset to allow a retry on
 * the next collected row.
 *
 * <p>Thread safety: {@link #collectRow} is called from the writer thread and is guarded by the
 * deque lock in {@link RecordAccumulator}. The {@link #schemaTriggered} flag is an {@link
 * AtomicBoolean} so it can safely be reset from the RPC callback thread.
 */
@Internal
public class VariantShreddingManager {

    private static final Logger LOG = LoggerFactory.getLogger(VariantShreddingManager.class);

    private final TablePath tablePath;

    /**
     * Column indices (into the row's schema) of all Variant-typed columns. Each index maps to the
     * corresponding {@link VariantStatisticsCollector} in {@link #collectors} at the same array
     * position.
     */
    private final int[] variantColumnIndices;

    /**
     * Names of the Variant columns, used to construct the column-name-based {@link
     * ShreddingSchema}.
     */
    private final String[] variantColumnNames;

    /** One statistics collector per Variant column. */
    private final VariantStatisticsCollector[] collectors;

    /** Inferrer, configured from the table's shredding options. */
    private final ShreddingSchemaInferrer inferrer;

    /**
     * Guards against duplicate schema evolution RPCs. Set to {@code true} when an RPC is in flight;
     * reset to {@code false} on RPC failure so the next {@link #collectRow} call can retry.
     */
    private final AtomicBoolean schemaTriggered = new AtomicBoolean(false);

    /**
     * Callback that sends the {@link ApplyShreddingSchemaRequest} to the Coordinator and returns a
     * future. Injected by {@link RecordAccumulator} so this class does not depend on a concrete RPC
     * client.
     */
    private final Function<
                    ApplyShreddingSchemaRequest, CompletableFuture<ApplyShreddingSchemaResponse>>
            rpcCaller;

    public VariantShreddingManager(
            TablePath tablePath,
            int[] variantColumnIndices,
            String[] variantColumnNames,
            ShreddingSchemaInferrer inferrer,
            Function<ApplyShreddingSchemaRequest, CompletableFuture<ApplyShreddingSchemaResponse>>
                    rpcCaller) {
        this.tablePath = tablePath;
        this.variantColumnIndices = variantColumnIndices;
        this.variantColumnNames = variantColumnNames;
        this.inferrer = inferrer;
        this.rpcCaller = rpcCaller;

        this.collectors = new VariantStatisticsCollector[variantColumnIndices.length];
        for (int i = 0; i < variantColumnIndices.length; i++) {
            this.collectors[i] = new VariantStatisticsCollector();
        }
    }

    /**
     * Collects statistics from one row that is about to be (or has just been) written.
     *
     * <p>This method extracts the Variant value at each variant-column index from {@code row} and
     * feeds it into the corresponding {@link VariantStatisticsCollector}. If the inferrer produces
     * a non-empty schema for any column, {@link #triggerSchemaEvolution} is called.
     *
     * @param row the row being written
     */
    public void collectRow(InternalRow row) {
        if (schemaTriggered.get()) {
            return;
        }

        for (int c = 0; c < variantColumnIndices.length; c++) {
            int colIdx = variantColumnIndices[c];
            Variant variant = row.isNullAt(colIdx) ? null : row.getVariant(colIdx);
            collectors[c].collect(variant);
        }

        maybeInferAndTrigger();
    }

    // --------------------------------------------------------------------------------------------
    // Internal helpers
    // --------------------------------------------------------------------------------------------

    private void maybeInferAndTrigger() {
        for (int c = 0; c < variantColumnIndices.length; c++) {
            VariantStatisticsCollector collector = collectors[c];
            long totalRecords = collector.getTotalRecords();

            // Skip inference until we have enough samples to be statistically meaningful.
            // This avoids creating empty ShreddingSchema objects on every row.
            if (totalRecords < inferrer.getMinSampleSize()) {
                continue;
            }

            ShreddingSchema schema =
                    inferrer.infer(variantColumnNames[c], collector.getStatistics(), totalRecords);
            if (!schema.getFields().isEmpty()) {
                triggerSchemaEvolution(schema);
                return;
            }
        }
    }

    private void triggerSchemaEvolution(ShreddingSchema schema) {
        if (!schemaTriggered.compareAndSet(false, true)) {
            return;
        }

        String schemaJson = schema.toJson();
        LOG.info(
                "Triggering Variant shredding schema evolution for table {}: {}",
                tablePath,
                schemaJson);

        ApplyShreddingSchemaRequest request =
                new ApplyShreddingSchemaRequest()
                        .setTablePath(
                                new PbTablePath()
                                        .setDatabaseName(tablePath.getDatabaseName())
                                        .setTableName(tablePath.getTableName()))
                        .setShreddingSchemaJson(schemaJson);

        rpcCaller
                .apply(request)
                .whenComplete(
                        (resp, ex) -> {
                            if (ex != null) {
                                LOG.warn(
                                        "Failed to apply Variant shredding schema for table {}, "
                                                + "will retry on next row: {}",
                                        tablePath,
                                        ex.getMessage());
                                schemaTriggered.set(false);
                            } else {
                                LOG.info(
                                        "Successfully applied Variant shredding schema for table {}",
                                        tablePath);
                            }
                        });
    }
}
```
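
The at-most-once trigger with retry-on-failure in `triggerSchemaEvolution` reduces to a small reusable pattern: `compareAndSet` closes a gate before the RPC goes out, and a failed future re-opens it. Below is a self-contained sketch of just that pattern, stripped of the Fluss types; the class name and the String-typed future are illustrative:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

// Sketch of the trigger/retry pattern: at most one RPC in flight, retry after failure.
public class TriggerOnceSketch {

    private final AtomicBoolean triggered = new AtomicBoolean(false);

    void maybeTrigger(CompletableFuture<String> rpc) {
        if (!triggered.compareAndSet(false, true)) {
            return; // an RPC is already in flight or has already succeeded
        }
        rpc.whenComplete(
                (resp, ex) -> {
                    if (ex != null) {
                        triggered.set(false); // re-arm so the next row can retry
                    }
                });
    }

    public static void main(String[] args) {
        TriggerOnceSketch s = new TriggerOnceSketch();

        // A failing RPC re-opens the gate:
        CompletableFuture<String> failing = new CompletableFuture<>();
        s.maybeTrigger(failing);
        failing.completeExceptionally(new RuntimeException("transient network error"));

        // So a later attempt goes through; success keeps the gate closed for good:
        s.maybeTrigger(CompletableFuture.completedFuture("applied"));
        System.out.println("triggered = " + s.triggered.get()); // true
    }
}
```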
