Skip to content
13 changes: 13 additions & 0 deletions datafusion/datasource-parquet/src/opener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,10 @@ pub(super) struct ParquetOpener {
pub max_predicate_cache_size: Option<usize>,
/// Whether to read row groups in reverse order
pub reverse_row_groups: bool,
/// Whether to read pages in reverse order within row groups
/// Phase 1: Infrastructure flag for future page-level reverse support
Comment thread
GaneshPatil7517 marked this conversation as resolved.
#[expect(dead_code)]
pub reverse_pages: bool,
}

/// Represents a prepared access plan with optional row selection
Expand Down Expand Up @@ -1051,6 +1055,7 @@ mod test {
coerce_int96: Option<arrow::datatypes::TimeUnit>,
max_predicate_cache_size: Option<usize>,
reverse_row_groups: bool,
reverse_pages: bool,
}

impl ParquetOpenerBuilder {
Expand All @@ -1076,6 +1081,7 @@ mod test {
coerce_int96: None,
max_predicate_cache_size: None,
reverse_row_groups: false,
reverse_pages: false,
}
}

Expand Down Expand Up @@ -1133,6 +1139,12 @@ mod test {
self
}

/// Set reverse pages flag.
fn with_reverse_pages(mut self, enable: bool) -> Self {
self.reverse_pages = enable;
self
}

/// Build the ParquetOpener instance.
///
/// # Panics
Expand Down Expand Up @@ -1183,6 +1195,7 @@ mod test {
encryption_factory: None,
max_predicate_cache_size: self.max_predicate_cache_size,
reverse_row_groups: self.reverse_row_groups,
reverse_pages: self.reverse_pages,
}
}
}
Expand Down
86 changes: 84 additions & 2 deletions datafusion/datasource-parquet/src/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,11 @@ pub struct ParquetSource {
/// so we still need to sort them after reading, so the reverse scan is inexact.
/// Used to optimize ORDER BY ... DESC on sorted data.
reverse_row_groups: bool,
/// If true, read pages within row groups in reverse order.
/// Used in sort pushdown phase 1 to optimize ORDER BY ... DESC on sorted data.
/// Note: This is infrastructure for phase 1; actual page reversal may be implemented
/// in future phases depending on arrow-rs capabilities.
reverse_pages: bool,
}

impl ParquetSource {
Expand All @@ -318,6 +323,7 @@ impl ParquetSource {
#[cfg(feature = "parquet_encryption")]
encryption_factory: None,
reverse_row_groups: false,
reverse_pages: false,
}
}

Expand Down Expand Up @@ -477,10 +483,21 @@ impl ParquetSource {
self.reverse_row_groups = reverse_row_groups;
self
}

pub(crate) fn with_reverse_pages(mut self, reverse_pages: bool) -> Self {
self.reverse_pages = reverse_pages;
self
}

#[cfg(test)]
pub(crate) fn reverse_row_groups(&self) -> bool {
self.reverse_row_groups
}

#[cfg(test)]
pub(crate) fn reverse_pages(&self) -> bool {
self.reverse_pages
}
}

/// Parses datafusion.common.config.ParquetOptions.coerce_int96 String to a arrow_schema.datatype.TimeUnit
Expand Down Expand Up @@ -567,6 +584,7 @@ impl FileSource for ParquetSource {
encryption_factory: self.get_encryption_factory_with_config(),
max_predicate_cache_size: self.max_predicate_cache_size(),
reverse_row_groups: self.reverse_row_groups,
reverse_pages: self.reverse_pages,
});
Ok(opener)
}
Expand Down Expand Up @@ -625,6 +643,10 @@ impl FileSource for ParquetSource {
write!(f, ", reverse_row_groups=true")?;
}

if self.reverse_pages {
write!(f, ", reverse_pages=true")?;
}

// Try to build a the pruning predicates.
Comment thread
GaneshPatil7517 marked this conversation as resolved.
// These are only generated here because it's useful to have *some*
// idea of what pushdown is happening when viewing plans.
Expand Down Expand Up @@ -803,9 +825,12 @@ impl FileSource for ParquetSource {
return Ok(SortOrderPushdownResult::Unsupported);
}

// Return Inexact because we're only reversing row group order,
// Return Inexact because we're only reversing row group and page order,
// not guaranteeing perfect row-level ordering
Comment thread
GaneshPatil7517 marked this conversation as resolved.
let new_source = self.clone().with_reverse_row_groups(true);
let new_source = self
.clone()
.with_reverse_row_groups(true)
.with_reverse_pages(true);
Ok(SortOrderPushdownResult::Inexact {
inner: Arc::new(new_source) as Arc<dyn FileSource>,
})
Expand All @@ -814,6 +839,7 @@ impl FileSource for ParquetSource {
// - File reordering based on min/max statistics
// - Detection of exact ordering (return Exact to remove Sort operator)
// - Partial sort pushdown for prefix matches
// - Actual page-level reversal implementation when arrow-rs supports it
}
}

Expand Down Expand Up @@ -916,4 +942,60 @@ mod tests {
assert!(source.reverse_row_groups());
assert!(source.filter().is_some());
}

#[test]
fn test_reverse_pages_default_value() {
use arrow::datatypes::Schema;

let schema = Arc::new(Schema::empty());
let source = ParquetSource::new(schema);

assert!(!source.reverse_pages());
}

#[test]
fn test_reverse_pages_with_setter() {
use arrow::datatypes::Schema;

let schema = Arc::new(Schema::empty());

let source = ParquetSource::new(schema.clone()).with_reverse_pages(true);
assert!(source.reverse_pages());

let source = source.with_reverse_pages(false);
assert!(!source.reverse_pages());
}

#[test]
fn test_reverse_pages_clone_preserves_value() {
use arrow::datatypes::Schema;

let schema = Arc::new(Schema::empty());

let source = ParquetSource::new(schema).with_reverse_pages(true);
let cloned = source.clone();

assert!(cloned.reverse_pages());
assert_eq!(source.reverse_pages(), cloned.reverse_pages());
}

#[test]
fn test_reverse_pages_independent_of_reverse_row_groups() {
use arrow::datatypes::Schema;

let schema = Arc::new(Schema::empty());

// Test: reverse_pages can be set independently
let source = ParquetSource::new(schema.clone())
.with_reverse_row_groups(true)
.with_reverse_pages(true);

assert!(source.reverse_row_groups());
assert!(source.reverse_pages());

// Test: reverse_pages can be set without reverse_row_groups
let source = ParquetSource::new(schema).with_reverse_pages(true);
assert!(!source.reverse_row_groups());
assert!(source.reverse_pages());
}
}
Loading