Skip to content

Commit 226f6e8

Browse files
committed
[core] Introduce blob v2
1 parent f955994 commit 226f6e8

46 files changed

Lines changed: 616 additions & 164 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

docs/content/append-table/blob.md

Lines changed: 49 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,19 @@ For details about the blob file format structure, see [File Format - BLOB]({{< r
9494
<td>No</td>
9595
<td style="word-wrap: break-word;">false</td>
9696
<td>Boolean</td>
97-
<td>When set to true, the blob field input is treated as a serialized BlobDescriptor. Paimon reads from the descriptor's URI and streams the data into Paimon's blob files in small chunks, avoiding loading the entire blob into memory. This is useful for writing very large blobs that cannot fit in memory. When reading, if set to true, returns the BlobDescriptor bytes; if false, returns actual blob bytes.</td>
97+
<td>Controls read output format for blob fields. When set to true, queries return serialized BlobDescriptor bytes; when false, queries return actual blob bytes. This option is dynamic and can be changed with <code>ALTER TABLE ... SET</code>.</td>
98+
</tr>
99+
<tr>
100+
<td><h5>blob.stored-descriptor-fields</h5></td>
101+
<td>No</td>
102+
<td style="word-wrap: break-word;">(none)</td>
103+
<td>String</td>
104+
<td>
105+
Comma-separated BLOB field names stored as serialized <code>BlobDescriptor</code> bytes inline in normal data files.
106+
By default, all blob fields store blob bytes in separate <code>.blob</code> files.
107+
If configured, a single table can mix both storage forms:
108+
some BLOB fields stored in <code>.blob</code> files and others stored as descriptor references.
109+
</td>
98110
</tr>
99111
<tr>
100112
<td><h5>blob.target-file-size</h5></td>
@@ -217,31 +229,18 @@ SELECT id, name FROM image_table;
217229
SELECT * FROM image_table WHERE id = 1;
218230
```
219231

220-
### Blob Descriptor Mode
232+
### Blob Read Output Mode (`blob-as-descriptor`)
221233

222-
When you want to store references from external blob data (stored in object storage) without loading the entire blob into memory, you can use the `blob-as-descriptor` option:
234+
`blob-as-descriptor` only controls how blob values are returned when reading.
223235

224236
```sql
225-
-- Create table in descriptor mode
226-
CREATE TABLE blob_table (
227-
id INT,
228-
name STRING,
229-
image BYTES
230-
) WITH (
231-
'row-tracking.enabled' = 'true',
232-
'data-evolution.enabled' = 'true',
233-
'blob-field' = 'image',
234-
'blob-as-descriptor' = 'true'
235-
);
236-
237-
-- Insert with serialized BlobDescriptor bytes
238-
-- The BlobDescriptor contains: version (1 byte) + uri_length (4 bytes) + uri_bytes + offset (8 bytes) + length (8 bytes)
239-
-- Paimon will read from the descriptor's URI and stream data into Paimon's blob files in small chunks, avoiding loading the entire blob into memory
240-
INSERT INTO blob_table VALUES (1, 'photo', X'<serialized_blob_descriptor_hex>');
237+
-- Return descriptor bytes
238+
ALTER TABLE blob_table SET ('blob-as-descriptor' = 'true');
239+
SELECT image FROM blob_table;
241240

242-
-- Toggle this setting to control read output format:
241+
-- Return actual blob bytes
243242
ALTER TABLE blob_table SET ('blob-as-descriptor' = 'false');
244-
SELECT * FROM blob_table; -- Returns actual blob bytes from Paimon storage
243+
SELECT image FROM blob_table;
245244
```
246245

247246
## Java API Usage
@@ -442,17 +441,13 @@ long offset = descriptor.offset(); // Starting position in the file
442441
long length = descriptor.length(); // Length of the blob data
443442
```
444443

445-
### Blob Descriptor Mode
444+
### Descriptor-Aware Write Behavior
446445

447-
The `blob-as-descriptor` option enables **memory-efficient writing** for very large blobs. When enabled, you provide a `BlobDescriptor` pointing to external data, and Paimon streams the data from the external source into Paimon's `.blob` files without loading the entire blob into memory.
446+
The Paimon write path is automatically descriptor-aware:
448447

449-
**How it works:**
450-
1. **Writing**: You provide a serialized `BlobDescriptor` (containing URI, offset, length) as the blob field value
451-
2. **Paimon copies the data**: Paimon reads from the descriptor's URI in small chunks (e.g., 1024 bytes at a time) and writes to Paimon's `.blob` files
452-
3. **Data is stored in Paimon**: The blob data IS copied to Paimon storage, but in a streaming fashion
453-
454-
**Key benefit:**
455-
- **Memory efficiency**: For very large blobs (e.g., gigabyte-sized videos), you don't need to load the entire file into memory. Paimon streams the data incrementally.
448+
1. For blob fields stored in `.blob` files, input can be either blob bytes or a `BlobDescriptor`.
449+
2. For fields configured in `blob.stored-descriptor-fields`, Paimon stores descriptor bytes inline in data files (no `.blob` files for those fields), and input must be a descriptor.
450+
3. This behavior does not depend on `blob-as-descriptor`.
456451

457452
```java
458453
import org.apache.paimon.catalog.Catalog;
@@ -484,21 +479,21 @@ public class BlobDescriptorExample {
484479
Catalog catalog = CatalogFactory.createCatalog(catalogContext);
485480
catalog.createDatabase("my_db", true);
486481

487-
// Create table with blob-as-descriptor enabled
482+
// Create table: store "video" as descriptor bytes inline
488483
Schema schema = Schema.newBuilder()
489484
.column("id", DataTypes.INT())
490485
.column("name", DataTypes.STRING())
491486
.column("video", DataTypes.BLOB())
492487
.option(CoreOptions.ROW_TRACKING_ENABLED.key(), "true")
493488
.option(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true")
494-
.option(CoreOptions.BLOB_AS_DESCRIPTOR.key(), "true") // This is not necessary in java api
489+
.option(CoreOptions.BLOB_STORED_DESCRIPTOR_FIELDS.key(), "video")
495490
.build();
496491

497492
Identifier tableId = Identifier.create("my_db", "video_table");
498493
catalog.createTable(tableId, schema, true);
499494
Table table = catalog.getTable(tableId);
500495

501-
// Write large blob using descriptor (memory-efficient)
496+
// Write blob using descriptor reference
502497
writeLargeBlobWithDescriptor(table);
503498

504499
// Read blob data
@@ -514,7 +509,7 @@ public class BlobDescriptorExample {
514509
// For a very large file (e.g., 2GB video), instead of loading into memory:
515510
// byte[] hugeVideo = Files.readAllBytes(...); // This would cause OutOfMemoryError!
516511
//
517-
// Use BlobDescriptor to let Paimon stream the data:
512+
// Create a descriptor reference to external blob
518513
String externalUri = "s3://my-bucket/videos/large_video.mp4";
519514
long fileSize = 2L * 1024 * 1024 * 1024; // 2GB
520515

@@ -524,8 +519,6 @@ public class BlobDescriptorExample {
524519
UriReader uriReader = UriReader.fromFile(fileIO);
525520
Blob blob = Blob.fromDescriptor(uriReader, descriptor);
526521

527-
// Write the serialized descriptor as blob data
528-
// Paimon will read from the URI and copy data to .blob files in chunks
529522
GenericRow row = GenericRow.of(
530523
1,
531524
BinaryString.fromString("large_video"),
@@ -535,7 +528,7 @@ public class BlobDescriptorExample {
535528
commit.commit(write.prepareCommit());
536529
}
537530

538-
System.out.println("Successfully wrote large blob using descriptor mode");
531+
System.out.println("Successfully wrote large blob using descriptor reference");
539532
}
540533

541534
private static void readBlobData(Table table) throws Exception {
@@ -548,20 +541,19 @@ public class BlobDescriptorExample {
548541
String name = row.getString(1).toString();
549542
Blob blob = row.getBlob(2);
550543

551-
// The blob data is now stored in Paimon's .blob files
552-
// blob.toDescriptor() returns a descriptor pointing to Paimon's internal storage
544+
// Field is configured in blob.stored-descriptor-fields, so descriptor is stored inline
553545
BlobDescriptor descriptor = blob.toDescriptor();
554546
System.out.println("Row " + id + ": " + name);
555-
System.out.println(" Paimon blob URI: " + descriptor.uri());
547+
System.out.println(" Blob URI: " + descriptor.uri());
556548
System.out.println(" Length: " + descriptor.length());
557549
});
558550
}
559551
}
560552
```
561553

562-
**Reading blob data with different modes:**
554+
**Reading blob data with different output modes:**
563555

564-
The `blob-as-descriptor` option also affects how data is returned when reading:
556+
The `blob-as-descriptor` option affects only read output:
565557

566558
```sql
567559
-- When blob-as-descriptor = true: Returns BlobDescriptor bytes (reference to Paimon blob file)
@@ -573,21 +565,30 @@ ALTER TABLE video_table SET ('blob-as-descriptor' = 'false');
573565
SELECT * FROM video_table; -- Returns actual blob bytes from Paimon storage
574566
```
575567

568+
### Descriptor Fields: Reuse by Descriptor (No Copy)
569+
570+
If you want downstream tables to **reuse** upstream blob files (no copying and no new <code>.blob</code> files), configure the target blob field(s):
571+
572+
```sql
573+
'blob.stored-descriptor-fields' = 'image'
574+
```
575+
576+
For these configured fields, Paimon stores only serialized <code>BlobDescriptor</code> bytes in normal data files. Reading the blob follows the descriptor URI to access bytes, and writing requires descriptor input for those fields.
577+
576578
## Limitations
577579

578-
1. **Single Blob Field**: Currently, only one blob field per table is supported.
579-
2. **Append Table Only**: Blob type is designed for append-only tables. Primary key tables are not supported.
580-
3. **No Predicate Pushdown**: Blob columns cannot be used in filter predicates.
581-
4. **No Statistics**: Statistics collection is not supported for blob columns.
582-
5. **Required Options**: `row-tracking.enabled` and `data-evolution.enabled` must be set to `true`.
580+
1. **Append Table Only**: Blob type is designed for append-only tables. Primary key tables are not supported.
581+
2. **No Predicate Pushdown**: Blob columns cannot be used in filter predicates.
582+
3. **No Statistics**: Statistics collection is not supported for blob columns.
583+
4. **Required Options**: `row-tracking.enabled` and `data-evolution.enabled` must be set to `true`.
583584

584585
## Best Practices
585586

586587
1. **Use Column Projection**: Always select only the columns you need. Avoid `SELECT *` if you don't need blob data.
587588

588589
2. **Set Appropriate Target File Size**: Configure `blob.target-file-size` based on your blob sizes. Larger values mean fewer files but larger individual files.
589590

590-
3. **Consider Descriptor Mode**: For very large blobs that cannot fit in memory, use `blob-as-descriptor` mode to stream data from external sources into Paimon without loading the entire blob into memory.
591+
3. **Use Descriptor Fields When Reusing External Blob Files**: Configure `blob.stored-descriptor-fields` for fields that should keep descriptor references instead of writing new `.blob` files.
591592

592593
4. **Use Partitioning**: Partition your blob tables by date or other dimensions to improve query performance and data management.
593594

paimon-api/src/main/java/org/apache/paimon/CoreOptions.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2146,6 +2146,15 @@ public InlineElement getDescription() {
21462146
"Specifies column names that should be stored as blob type. "
21472147
+ "This is used when you want to treat a BYTES column as a BLOB.");
21482148

2149+
@Immutable
2150+
public static final ConfigOption<String> BLOB_STORED_DESCRIPTOR_FIELDS =
2151+
key("blob.stored-descriptor-fields")
2152+
.stringType()
2153+
.noDefaultValue()
2154+
.withDescription(
2155+
"Comma-separated BLOB field names to store as serialized BlobDescriptor "
2156+
+ "bytes inline in data files.");
2157+
21492158
public static final ConfigOption<Boolean> BLOB_AS_DESCRIPTOR =
21502159
key("blob-as-descriptor")
21512160
.booleanType()
@@ -2710,6 +2719,22 @@ public boolean blobSplitByFileSize() {
27102719
.orElse(!options.get(BLOB_AS_DESCRIPTOR));
27112720
}
27122721

2722+
/**
2723+
* Resolve blob fields that should be stored as serialized descriptor bytes in data files.
2724+
*
2725+
* <p>If this option is not set, all blob fields are stored in '.blob' files by default.
2726+
*/
2727+
public Set<String> blobStoredDescriptorFields() {
2728+
return options.getOptional(BLOB_STORED_DESCRIPTOR_FIELDS)
2729+
.map(
2730+
s ->
2731+
Arrays.stream(s.split(","))
2732+
.map(String::trim)
2733+
.filter(str -> !str.isEmpty())
2734+
.collect(Collectors.toSet()))
2735+
.orElse(Collections.emptySet());
2736+
}
2737+
27132738
public long compactionFileSize(boolean hasPrimaryKey) {
27142739
// file size to join the compaction, we don't process on middle file size to avoid
27152740
// compact a same file twice (the compression is not calculate so accurately. the output

paimon-api/src/main/java/org/apache/paimon/types/BlobType.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323

2424
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
2627

2728
/**
2829
* Data type of binary large object.
@@ -67,13 +68,25 @@ public <R> R accept(DataTypeVisitor<R> visitor) {
6768
}
6869

6970
public static Pair<RowType, RowType> splitBlob(RowType rowType) {
71+
return splitBlob(rowType, java.util.Collections.emptySet());
72+
}
73+
74+
/**
75+
* Split row fields into normal fields and blob-file fields.
76+
*
77+
* <p>Blob fields contained in {@code blobStoredDescriptorFields} are treated as normal fields
78+
* (stored inline as serialized descriptor bytes), while other blob fields are treated as
79+
* blob-file fields.
80+
*/
81+
public static Pair<RowType, RowType> splitBlob(
82+
RowType rowType, Set<String> blobStoredDescriptorFields) {
7083
List<DataField> fields = rowType.getFields();
7184
List<DataField> normalFields = new ArrayList<>();
7285
List<DataField> blobFields = new ArrayList<>();
7386

7487
for (DataField field : fields) {
7588
DataTypeRoot type = field.type().getTypeRoot();
76-
if (type == DataTypeRoot.BLOB) {
89+
if (type == DataTypeRoot.BLOB && !blobStoredDescriptorFields.contains(field.name())) {
7790
blobFields.add(field);
7891
} else {
7992
normalFields.add(field);

paimon-common/src/main/java/org/apache/paimon/data/BlobDescriptor.java

Lines changed: 34 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,8 @@
4545
public class BlobDescriptor implements Serializable {
4646

4747
private static final long serialVersionUID = 1L;
48-
49-
private static final byte CURRENT_VERSION = 1;
48+
private static final long MAGIC = 0x424C4F4244455343L; // "BLOBDESC"
49+
private static final byte CURRENT_VERSION = 2;
5050

5151
private final byte version;
5252
private final String uri;
@@ -113,11 +113,12 @@ public byte[] serialize() {
113113
byte[] uriBytes = uri.getBytes(UTF_8);
114114
int uriLength = uriBytes.length;
115115

116-
int totalSize = 1 + 4 + uriLength + 8 + 8;
116+
int totalSize = 1 + 8 + 4 + uriLength + 8 + 8;
117117
ByteBuffer buffer = ByteBuffer.allocate(totalSize);
118118
buffer.order(ByteOrder.LITTLE_ENDIAN);
119119

120120
buffer.put(version);
121+
buffer.putLong(MAGIC);
121122
buffer.putInt(uriLength);
122123
buffer.put(uriBytes);
123124

@@ -130,16 +131,26 @@ public byte[] serialize() {
130131
public static BlobDescriptor deserialize(byte[] bytes) {
131132
ByteBuffer buffer = ByteBuffer.wrap(bytes);
132133
buffer.order(ByteOrder.LITTLE_ENDIAN);
133-
134134
byte version = buffer.get();
135-
if (version != CURRENT_VERSION) {
135+
if (version > CURRENT_VERSION) {
136136
throw new UnsupportedOperationException(
137-
"Expecting BlobDescriptor version to be "
137+
"Expecting BlobDescriptor version to be less than or equal to "
138138
+ CURRENT_VERSION
139139
+ ", but found "
140140
+ version
141141
+ ".");
142142
}
143+
144+
if (version > 1) {
145+
if (MAGIC != buffer.getLong()) {
146+
throw new IllegalArgumentException(
147+
"Invalid BlobDescriptor: missing magic header. Expected magic: "
148+
+ MAGIC
149+
+ ", but found: "
150+
+ buffer.getLong());
151+
}
152+
}
153+
143154
int uriLength = buffer.getInt();
144155
byte[] uriBytes = new byte[uriLength];
145156
buffer.get(uriBytes);
@@ -149,4 +160,21 @@ public static BlobDescriptor deserialize(byte[] bytes) {
149160
long length = buffer.getLong();
150161
return new BlobDescriptor(version, uri, offset, length);
151162
}
163+
164+
public static boolean isBlobDescriptor(byte[] bytes) {
165+
if (bytes.length < 9) {
166+
return false;
167+
}
168+
ByteBuffer buffer = ByteBuffer.wrap(bytes);
169+
buffer.order(ByteOrder.LITTLE_ENDIAN);
170+
171+
byte version = buffer.get();
172+
if (version == 1) {
173+
return true;
174+
} else if (version > CURRENT_VERSION) {
175+
return false;
176+
} else {
177+
return MAGIC == buffer.getLong();
178+
}
179+
}
152180
}

0 commit comments

Comments
 (0)