diff --git a/Cargo.lock b/Cargo.lock index 0e9337b50e6f2..9f34b5f136145 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2664,6 +2664,7 @@ dependencies = [ name = "datafusion-wasmtest" version = "52.0.0" dependencies = [ + "bytes", "chrono", "console_error_panic_hook", "datafusion", @@ -2673,6 +2674,7 @@ dependencies = [ "datafusion-optimizer", "datafusion-physical-plan", "datafusion-sql", + "futures", "getrandom 0.3.4", "object_store", "tokio", diff --git a/datafusion/wasmtest/Cargo.toml b/datafusion/wasmtest/Cargo.toml index 16fa9790f65b6..08a4d3999cc0f 100644 --- a/datafusion/wasmtest/Cargo.toml +++ b/datafusion/wasmtest/Cargo.toml @@ -47,7 +47,7 @@ chrono = { version = "0.4", features = ["wasmbind"] } # all the `std::fmt` and `std::panicking` infrastructure, so isn't great for # code size when deploying. console_error_panic_hook = { version = "0.1.1", optional = true } -datafusion = { workspace = true, features = ["parquet", "sql"] } +datafusion = { workspace = true, features = ["compression", "parquet", "sql"] } datafusion-common = { workspace = true } datafusion-execution = { workspace = true } datafusion-expr = { workspace = true } @@ -59,6 +59,8 @@ getrandom = { version = "0.3", features = ["wasm_js"] } wasm-bindgen = "0.2.99" [dev-dependencies] +bytes = { workspace = true } +futures = { workspace = true } object_store = { workspace = true } # needs to be compiled tokio = { workspace = true } diff --git a/datafusion/wasmtest/src/lib.rs b/datafusion/wasmtest/src/lib.rs index c5948bd7343a6..9a4096a297ca2 100644 --- a/datafusion/wasmtest/src/lib.rs +++ b/datafusion/wasmtest/src/lib.rs @@ -80,6 +80,8 @@ mod test { use std::sync::Arc; use super::*; + use bytes::Bytes; + use datafusion::datasource::file_format::file_compression_type::FileCompressionType; use datafusion::{ arrow::{ array::{ArrayRef, Int32Array, RecordBatch, StringArray}, @@ -87,8 +89,9 @@ mod test { }, datasource::MemTable, execution::context::SessionContext, + prelude::CsvReadOptions, }; - use 
datafusion_common::test_util::batches_to_string; + use datafusion_common::{DataFusionError, test_util::batches_to_string}; use datafusion_execution::{ config::SessionConfig, disk_manager::{DiskManagerBuilder, DiskManagerMode}, @@ -96,7 +99,8 @@ }; use datafusion_physical_plan::collect; use datafusion_sql::parser::DFParser; - use object_store::{ObjectStore, memory::InMemory, path::Path}; + use futures::{StreamExt, TryStreamExt, stream}; + use object_store::{ObjectStore, PutPayload, memory::InMemory, path::Path}; use url::Url; use wasm_bindgen_test::wasm_bindgen_test; @@ -259,4 +263,55 @@ mod test { +----+-------+" ); } + + #[wasm_bindgen_test(unsupported = tokio::test)] + async fn test_csv_read_xz_compressed() { + let csv_data = "id,value\n1,a\n2,b\n3,c\n"; + let input = Bytes::from(csv_data.as_bytes().to_vec()); + let input_stream = + stream::iter(vec![Ok::<Bytes, DataFusionError>(input)]).boxed(); + + let compressed_stream = FileCompressionType::XZ + .convert_to_compress_stream(input_stream) + .unwrap(); + let compressed_data: Vec<Bytes> = compressed_stream.try_collect().await.unwrap(); + + let store = InMemory::new(); + let path = Path::from("data.csv.xz"); + store + .put(&path, PutPayload::from_iter(compressed_data)) + .await + .unwrap(); + + let url = Url::parse("memory://").unwrap(); + let ctx = SessionContext::new(); + ctx.register_object_store(&url, Arc::new(store)); + + let csv_options = CsvReadOptions::new() + .has_header(true) + .file_compression_type(FileCompressionType::XZ) + .file_extension("csv.xz"); + ctx.register_csv("compressed", "memory:///data.csv.xz", csv_options) + .await + .unwrap(); + + let result = ctx + .sql("SELECT * FROM compressed") + .await + .unwrap() + .collect() + .await + .unwrap(); + + assert_eq!( + batches_to_string(&result), + "+----+-------+\n\ + | id | value |\n\ + +----+-------+\n\ + | 1 | a |\n\ + | 2 | b |\n\ + | 3 | c |\n\ + +----+-------+" + ); + } }