Skip to content
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
23ccc3b
Add Apache Iceberg export support to LQP
mjschleich Mar 19, 2026
32a6ec8
Merge branch 'main' of github.com:RelationalAI/logical-query-protocol…
vustef Mar 26, 2026
4c28a80
proto refactor, includes iceberg_data for the read path
vustef Mar 26, 2026
af3eb4f
map type in codegen
vustef Mar 26, 2026
6e73897
fix transactions.proto: remove IcebergCatalogProperties
vustef Mar 26, 2026
fc2ef78
switch back to auth_properties, to support potential evolution of dif…
vustef Mar 26, 2026
f105bb9
update grammar
vustef Mar 26, 2026
4fec6ac
make protobuf
vustef Mar 26, 2026
b375628
make force-parsers
vustef Mar 26, 2026
c97f4d6
make force-printers
vustef Mar 26, 2026
892c99b
tests + make update-bins + make update-snapshots
vustef Mar 26, 2026
d9e5b17
cd meta && uv run ruff format
vustef Mar 26, 2026
fcefae5
equality, not auto-generated?
vustef Mar 26, 2026
1933735
grammar and template updates
vustef Mar 26, 2026
d8faea8
make force-parsers
vustef Mar 26, 2026
598487f
cd meta && uv run ruff format
vustef Mar 26, 2026
4b59a1b
fix equality.jl
vustef Mar 26, 2026
b082b3a
fix cd meta && uv run python -m pytest
vustef Mar 26, 2026
3d74d01
uv run ruff format
vustef Mar 26, 2026
e0fd473
fix go test
vustef Mar 26, 2026
0e18be4
revert _extract_value_int64 return type change in favor of casting in…
vustef Mar 26, 2026
ce29c19
update snapshots, fix julia errors (hopefully)
vustef Mar 26, 2026
8bca5c5
remove unneeded things
vustef Mar 26, 2026
47e53a0
.
vustef Mar 26, 2026
804eb56
update protos based on comments
vustef Mar 27, 2026
cfc532a
.
vustef Mar 27, 2026
65d373a
the rest
vustef Mar 27, 2026
39c9be5
rename to iceberg_catalog_config in the grammar
vustef Mar 27, 2026
e78edfc
update proto to make it possible to pass table_def vs gnf_def. Distin…
vustef Mar 27, 2026
c85d213
fix nested repeated (not allowed in proto) + grammar + equality
vustef Mar 27, 2026
7827ff2
generated files
vustef Mar 27, 2026
5732cdb
remove source_gnf_defs option
vustef Mar 27, 2026
7e5bd13
bring back the todo
vustef Mar 27, 2026
5169c28
change grammar a bit to nest source_table_def
vustef Mar 27, 2026
5c3c45c
prettify the manual lqp
vustef Mar 27, 2026
5899a98
remove types from columns, they are implicit
vustef Mar 30, 2026
ca9a01f
bring back comment
vustef Mar 30, 2026
adbf5ad
PR feedback, proto
vustef Mar 30, 2026
e35f5e1
grammar and generated files
vustef Mar 30, 2026
e9d82d3
mask_secret_value
vustef Mar 30, 2026
0f00380
change grammar to use non-terminals, for better pretty printing. Roun…
vustef Mar 30, 2026
de73f13
fix bug in memoization, it had collisions when the same proto object …
vustef Mar 30, 2026
718dcf2
fix pretty printing recursion error
vustef Mar 30, 2026
17c9b1d
move snapshot info out of locator, for reuse with export. rename expo…
vustef Mar 31, 2026
a429b3a
forgot a file
vustef Mar 31, 2026
9b9086c
equality tests
vustef Apr 1, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions meta/src/meta/codegen_templates.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ class BuiltinTemplate:
"make_empty_bytes": BuiltinTemplate('b""'),
"dict_from_list": BuiltinTemplate("dict({0})"),
"dict_get": BuiltinTemplate("{0}.get({1})"),
"dict_to_pairs": BuiltinTemplate("sorted({0}.items())"),
"has_proto_field": BuiltinTemplate("{0}.HasField({1})"),
"string_to_upper": BuiltinTemplate("{0}.upper()"),
"string_in_list": BuiltinTemplate("{0} in {1}"),
Expand Down Expand Up @@ -144,6 +145,7 @@ class BuiltinTemplate:
"make_empty_bytes": BuiltinTemplate("UInt8[]"),
"dict_from_list": BuiltinTemplate("Dict({0})"),
"dict_get": BuiltinTemplate("get({0}, {1}, nothing)"),
"dict_to_pairs": BuiltinTemplate("sort([(k, v) for (k, v) in {0}])"),
"has_proto_field": BuiltinTemplate("_has_proto_field({0}, Symbol({1}))"),
"string_to_upper": BuiltinTemplate("uppercase({0})"),
"string_in_list": BuiltinTemplate("({0} in {1})"),
Expand Down Expand Up @@ -259,6 +261,7 @@ class BuiltinTemplate:
"make_empty_bytes": BuiltinTemplate("[]byte{}"),
"dict_from_list": BuiltinTemplate("dictFromList({0})"),
"dict_get": BuiltinTemplate("dictGetValue({0}, {1})"),
"dict_to_pairs": BuiltinTemplate("dictToPairs({0})"),
"has_proto_field": BuiltinTemplate("hasProtoField({0}, {1})"),
"string_to_upper": BuiltinTemplate("strings.ToUpper({0})"),
"string_in_list": BuiltinTemplate("stringInList({0}, {1})"),
Expand Down
153 changes: 152 additions & 1 deletion meta/src/meta/grammar.y
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,14 @@
%nonterm export_csv_config transactions.ExportCSVConfig
%nonterm export_csv_path String
%nonterm export_csv_source transactions.ExportCSVSource
%nonterm export_iceberg_config transactions.ExportIcebergConfig
%nonterm iceberg_config logic.IcebergConfig
%nonterm iceberg_config_scope String
%nonterm iceberg_data logic.IcebergData
%nonterm iceberg_export_column transactions.IcebergExportColumn
%nonterm iceberg_locator logic.IcebergLocator
%nonterm iceberg_property_entry Tuple[String, String]
%nonterm iceberg_to_snapshot String
%nonterm false logic.Disjunction
%nonterm ffi logic.FFI
%nonterm ffi_args Sequence[logic.Abstraction]
Expand Down Expand Up @@ -1039,6 +1047,10 @@ data
construct: $$ = logic.Data(csv_data=$1)
deconstruct if builtin.has_proto_field($$, 'csv_data'):
$1: logic.CSVData = $$.csv_data
| iceberg_data
construct: $$ = logic.Data(iceberg_data=$1)
deconstruct if builtin.has_proto_field($$, 'iceberg_data'):
$1: logic.IcebergData = $$.iceberg_data

edb_path
: "[" STRING* "]"
Expand Down Expand Up @@ -1126,6 +1138,57 @@ gnf_column
$4: Optional[logic.RelationId] = $$.target_id if builtin.has_proto_field($$, "target_id") else None
$6: Sequence[logic.Type] = $$.types

iceberg_property_entry
: "(" "prop" STRING STRING ")"
construct: $$ = builtin.tuple($3, $4)
deconstruct:
$3: String = $$[0]
$4: String = $$[1]

iceberg_locator
: "(" "iceberg_locator" "(" "table_name" STRING ")" "(" "namespace" STRING* ")" "(" "warehouse" STRING ")" ")"
construct: $$ = logic.IcebergLocator(table_name=$5, namespace=$9, warehouse=$13)
deconstruct:
$5: String = $$.table_name
$9: Sequence[String] = $$.namespace
$13: String = $$.warehouse

iceberg_config_scope
: "(" "scope" STRING ")"
construct: $$ = $3
deconstruct: $3: String = $$

iceberg_config
: "(" "iceberg_config" "(" "catalog_uri" STRING ")" iceberg_config_scope? "(" "properties" iceberg_property_entry* ")" "(" "auth_properties" iceberg_property_entry* ")" ")"
construct: $$ = construct_iceberg_config($5, $7, $10, $14)
deconstruct:
$5: String = $$.catalog_uri
$7: Optional[String] = deconstruct_iceberg_config_scope_optional($$)
$10: Sequence[Tuple[String, String]] = builtin.dict_to_pairs($$.properties)
$14: Sequence[Tuple[String, String]] = builtin.dict_to_pairs($$.auth_properties)

iceberg_export_column
: "(" "iceberg_column" STRING type boolean_value ")"
construct: $$ = transactions.IcebergExportColumn(name=$3, type=$4, nullable=$5)
deconstruct:
$3: String = $$.name
$4: logic.Type = $$.type
$5: Boolean = $$.nullable

iceberg_to_snapshot
: "(" "to_snapshot" STRING ")"
construct: $$ = $3
deconstruct: $3: String = $$

iceberg_data
: "(" "iceberg_data" iceberg_locator iceberg_config gnf_columns iceberg_to_snapshot? ")"
construct: $$ = logic.IcebergData(locator=$3, config=$4, columns=$5, to_snapshot=iceberg_optional_string_field($6))
deconstruct:
$3: logic.IcebergLocator = $$.locator
$4: logic.IcebergConfig = $$.config
$5: Sequence[logic.GNFColumn] = $$.columns
$6: Optional[String] = deconstruct_iceberg_data_to_snapshot_optional($$)

undefine
: "(" "undefine" fragment_id ")"
construct: $$ = transactions.Undefine(fragment_id=$3)
Expand Down Expand Up @@ -1202,7 +1265,12 @@ abort
export
: "(" "export" export_csv_config ")"
construct: $$ = transactions.Export(csv_config=$3)
deconstruct: $3: transactions.ExportCSVConfig = $$.csv_config
deconstruct if builtin.has_proto_field($$, 'csv_config'):
$3: transactions.ExportCSVConfig = $$.csv_config
Comment thread
vustef marked this conversation as resolved.
| "(" "export_iceberg" export_iceberg_config ")"
construct: $$ = transactions.Export(iceberg_config=$3)
deconstruct if builtin.has_proto_field($$, 'iceberg_config'):
$3: transactions.ExportIcebergConfig = $$.iceberg_config

export_csv_config
: "(" "export_csv_config_v2" export_csv_path export_csv_source csv_config ")"
Expand Down Expand Up @@ -1241,6 +1309,15 @@ export_csv_source
deconstruct if builtin.has_proto_field($$, 'table_def'):
$3: logic.RelationId = $$.table_def

export_iceberg_config
: "(" "export_iceberg_config" iceberg_locator iceberg_config "(" "columns" iceberg_export_column* ")" config_dict? ")"
construct: $$ = construct_export_iceberg_config_full($3, $4, $7, $9)
deconstruct:
$3: logic.IcebergLocator = $$.locator
$4: logic.IcebergConfig = $$.config
$7: Sequence[transactions.IcebergExportColumn] = $$.columns
$9: Optional[Sequence[Tuple[String, logic.Value]]] = deconstruct_export_iceberg_config_optional($$)


%%

Expand Down Expand Up @@ -1558,6 +1635,80 @@ def deconstruct_export_csv_config(msg: transactions.ExportCSVConfig) -> List[Tup
return builtin.list_sort(result)


# Build a logic.IcebergConfig proto from parsed grammar components.
# The grammar yields properties/auth_properties as ordered (key, value)
# pairs; they are folded into the proto map<string,string> fields here.
# `scope_opt` is the optional `(scope ...)` clause, None when omitted.
def construct_iceberg_config(
    catalog_uri: String,
    scope_opt: Optional[String],
    property_pairs: Sequence[Tuple[String, String]],
    auth_property_pairs: Sequence[Tuple[String, String]],
) -> logic.IcebergConfig:
    props: Dict[String, String] = builtin.dict_from_list(property_pairs)
    auth_props: Dict[String, String] = builtin.dict_from_list(auth_property_pairs)
    # Normalize the parser-level optional into the explicit Option wrapper
    # expected for the proto's `optional string scope` field.
    scope_pb: Optional[String] = iceberg_optional_string_field(scope_opt)
    return logic.IcebergConfig(
        catalog_uri=catalog_uri,
        scope=scope_pb,
        properties=props,
        auth_properties=auth_props,
    )


# Wrap a parser-level Optional[String] into the explicit Option form used
# for optional proto string fields: None -> none(), value -> some(value).
def iceberg_optional_string_field(s: Optional[String]) -> Optional[String]:
    if s is None:
        return builtin.none()
    return builtin.some(s)


# Pretty-printer helper: read IcebergConfig.scope as an Option so the
# grammar can omit the `(scope ...)` clause when the proto field is unset.
def deconstruct_iceberg_config_scope_optional(msg: logic.IcebergConfig) -> Optional[String]:
    if builtin.has_proto_field(msg, "scope"):
        return builtin.some(builtin.unwrap_option(msg.scope))
    return builtin.none()


# Pretty-printer helper: read IcebergData.to_snapshot as an Option so the
# grammar can omit the `(to_snapshot ...)` clause when the field is unset.
def deconstruct_iceberg_data_to_snapshot_optional(msg: logic.IcebergData) -> Optional[String]:
    if builtin.has_proto_field(msg, "to_snapshot"):
        return builtin.some(builtin.unwrap_option(msg.to_snapshot))
    return builtin.none()


# Build a transactions.ExportIcebergConfig from parsed grammar components.
# The optional trailing config_dict clause carries export tuning settings
# ("prefix", "target_file_size_bytes", "compression"); any key that is
# absent — or the whole clause — falls back to the defaults below.
def construct_export_iceberg_config_full(
    locator: logic.IcebergLocator,
    config: logic.IcebergConfig,
    columns: Sequence[transactions.IcebergExportColumn],
    config_dict: Optional[Sequence[Tuple[String, logic.Value]]],
) -> transactions.ExportIcebergConfig:
    # Defaults used when a setting is not supplied in the config dict.
    prefix: String = ""
    target_file_size_bytes: int = 0
    compression: String = ""
    if config_dict is not None:
        cfg: Dict[String, logic.Value] = builtin.dict_from_list(builtin.unwrap_option(config_dict))
        prefix = _extract_value_string(builtin.dict_get(cfg, "prefix"), "")
        target_file_size_bytes = _extract_value_int64(builtin.dict_get(cfg, "target_file_size_bytes"), 0)
        compression = _extract_value_string(builtin.dict_get(cfg, "compression"), "")
    # NOTE(review): prefix/target_file_size_bytes are wrapped in some() even
    # when defaulted, while compression is passed bare — presumably the first
    # two are `optional` proto fields and compression is not; confirm against
    # transactions.proto.
    return transactions.ExportIcebergConfig(
        locator=locator,
        config=config,
        columns=columns,
        prefix=builtin.some(prefix),
        target_file_size_bytes=builtin.some(target_file_size_bytes),
        compression=compression,
    )


# Pretty-printer counterpart of construct_export_iceberg_config_full:
# re-emit only the non-default settings as sorted (key, value) pairs, or
# nothing at all, so default values round-trip as an absent config clause.
def deconstruct_export_iceberg_config_optional(
    msg: transactions.ExportIcebergConfig,
) -> Optional[Sequence[Tuple[String, logic.Value]]]:
    result: List[Tuple[String, logic.Value]] = list[Tuple[String, logic.Value]]()
    if builtin.unwrap_option(msg.prefix) != "":
        builtin.list_push(result, builtin.tuple("prefix", _make_value_string(builtin.unwrap_option(msg.prefix))))
    if builtin.unwrap_option(msg.target_file_size_bytes) != 0:
        builtin.list_push(result, builtin.tuple("target_file_size_bytes", _make_value_int64(builtin.unwrap_option(msg.target_file_size_bytes))))
    if msg.compression != "":
        builtin.list_push(result, builtin.tuple("compression", _make_value_string(msg.compression)))
    if builtin.length(result) == 0:
        # NOTE(review): sibling helpers return builtin.none() for the empty
        # case; plain None may be equivalent in this DSL — confirm codegen
        # treats them identically across targets.
        return None
    return builtin.some(builtin.list_sort(result))


# Render a RelationId as its string name. Assumes relation_id_to_string
# yields a value for every id reaching the printer; unwrap fails otherwise.
def deconstruct_relation_id_string(msg: logic.RelationId) -> String:
    name: Optional[String] = builtin.relation_id_to_string(msg)
    return builtin.unwrap_option(name)
Expand Down
3 changes: 3 additions & 0 deletions meta/src/meta/proto_ast.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@ class ProtoField:
number: int
is_repeated: bool = False
is_optional: bool = False
is_map: bool = False
map_key_type: str = ""
map_value_type: str = ""


@dataclass
Expand Down
24 changes: 24 additions & 0 deletions meta/src/meta/proto_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
_NESTED_ENUM_PATTERN = re.compile(r"enum\s+(\w+)\s*\{([^}]+)\}")
_ONEOF_PATTERN = re.compile(r"oneof\s+(\w+)\s*\{((?:[^{}]|\{[^}]*\})*)\}")
_FIELD_PATTERN = re.compile(r"(repeated|optional)?\s*(\w+)\s+(\w+)\s*=\s*(\d+);")
_MAP_FIELD_PATTERN = re.compile(r"map<\s*(\w+)\s*,\s*(\w+)\s*>\s+(\w+)\s*=\s*(\d+);")
_ONEOF_FIELD_PATTERN = re.compile(r"(\w+)\s+(\w+)\s*=\s*(\d+);")
_ENUM_VALUE_PATTERN = re.compile(r"(\w+)\s*=\s*(\d+);")
_RESERVED_PATTERN = re.compile(r"reserved\s+([^;]+);")
Expand Down Expand Up @@ -193,6 +194,29 @@ def _parse_message(self, name: str, body: str) -> ProtoMessage:
)
message.fields.append(proto_field)

# Parse map fields
for match in _MAP_FIELD_PATTERN.finditer(body):
if any(
start <= match.start() and match.end() <= end
for start, end in excluded_spans
):
continue

key_type = match.group(1)
value_type = match.group(2)
field_name = match.group(3)
field_number = int(match.group(4))

proto_field = ProtoField(
name=field_name,
type=f"map<{key_type},{value_type}>",
number=field_number,
is_map=True,
map_key_type=key_type,
map_value_type=value_type,
)
message.fields.append(proto_field)

# Parse nested enums
for match in _NESTED_ENUM_PATTERN.finditer(body):
enum_name = match.group(1)
Expand Down
1 change: 1 addition & 0 deletions meta/src/meta/target_builtins.py
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,7 @@ def is_builtin(name: str) -> bool:
# === Dict operations ===
register_builtin("dict_from_list", [SequenceType(TupleType([K, V]))], DictType(K, V))
register_builtin("dict_get", [DictType(K, V), K], OptionType(V))
register_builtin("dict_to_pairs", [DictType(K, V)], ListType(TupleType([K, V])))

# === Protobuf operations ===
register_builtin("has_proto_field", [T, STRING], BOOLEAN) # msg.HasField(field_name)
Expand Down
14 changes: 14 additions & 0 deletions meta/src/meta/type_env.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from .proto_parser import ProtoParser
from .target import (
BaseType,
DictType,
FunctionType,
MessageType,
OptionType,
Expand All @@ -27,6 +28,13 @@
}


def _scalar_to_target(type_name: str) -> TargetType:
    """Map a scalar proto type name (map key/value type) to a TargetType.

    Raises ValueError for any name outside the primitive-type table, since
    proto map keys/values handled here must be scalar primitives.
    """
    base = _PRIMITIVE_TO_BASE_TYPE.get(type_name)
    if base is None:
        raise ValueError(f"Unknown scalar proto type for map: {type_name}")
    return BaseType(base)


class TypeEnv:
"""Type environment for validating grammar expressions.

Expand Down Expand Up @@ -87,6 +95,12 @@ def _is_enum_type(self, type_name: str) -> bool:

def _proto_type_to_target(self, proto_field: ProtoField) -> TargetType:
"""Convert a protobuf field to its target type."""
# Handle map fields
if proto_field.is_map:
key_type = _scalar_to_target(proto_field.map_key_type)
value_type = _scalar_to_target(proto_field.map_value_type)
return DictType(key_type, value_type)

# Get base type
base_type: TargetType
if proto_field.type in _PRIMITIVE_TO_BASE_TYPE:
Expand Down
20 changes: 16 additions & 4 deletions meta/src/meta/yacc_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
)
from .target import (
BaseType,
DictType,
ListType,
MessageType,
OptionType,
Expand Down Expand Up @@ -756,10 +757,21 @@ def _make_field_type_lookup(

for (module, msg_name), proto_msg in proto_messages.items():
for field in proto_msg.fields:
field_type = _proto_type_to_target_type(
field.type, field.is_repeated, field.is_optional, name_to_module
)
field_types[(module, msg_name, field.name)] = field_type
if field.is_map:
key_type = _proto_type_to_target_type(
field.map_key_type, False, name_to_module=name_to_module
)
value_type = _proto_type_to_target_type(
field.map_value_type, False, name_to_module=name_to_module
)
field_types[(module, msg_name, field.name)] = DictType(
key_type, value_type
)
else:
field_type = _proto_type_to_target_type(
field.type, field.is_repeated, field.is_optional, name_to_module
)
field_types[(module, msg_name, field.name)] = field_type

# Also add oneof fields
for oneof in proto_msg.oneofs:
Expand Down
24 changes: 22 additions & 2 deletions proto/relationalai/lqp/v1/logic.proto
Original file line number Diff line number Diff line change
Expand Up @@ -228,14 +228,14 @@ message Attribute {
}

//
// Input data (base relations, CSVs)
// Input data (base relations, CSVs, Iceberg)
//
message Data {
oneof data_type {
EDB edb = 1;
BeTreeRelation betree_relation = 2;
CSVData csv_data = 3;
// IcebergData iceberg_data = 4;
IcebergData iceberg_data = 4;
}
}

Expand Down Expand Up @@ -314,6 +314,26 @@ message CSVConfig {
int64 partition_size_mb = 12;
}

message IcebergData {
IcebergLocator locator = 1;
IcebergConfig config = 2;
repeated GNFColumn columns = 3;
optional string to_snapshot = 4;
Comment thread
vustef marked this conversation as resolved.
Outdated
}

// Identifies an Iceberg table within a catalog.
message IcebergLocator {
  // Name of the table within its namespace.
  string table_name = 1;
  // Namespace path components — presumably outermost level first; confirm
  // against the catalog integration that consumes this.
  repeated string namespace = 2;
  // Warehouse location/identifier the catalog resolves the table against.
  string warehouse = 3;
}

message IcebergConfig {
Comment thread
vustef marked this conversation as resolved.
Outdated
string catalog_uri = 1;
optional string scope = 2;
Comment thread
vustef marked this conversation as resolved.
map<string, string> properties = 3;
Comment thread
vustef marked this conversation as resolved.
map<string, string> auth_properties = 4;
}

message GNFColumn {
repeated string column_path = 1; // Column identifier path (was: string column_name)
optional RelationId target_id = 2; // Target relation (now explicit optional)
Expand Down
Loading
Loading