-
Notifications
You must be signed in to change notification settings - Fork 149
Expand file tree
/
Copy pathmulti_file.rs
More file actions
62 lines (52 loc) · 2.18 KB
/
multi_file.rs
File metadata and controls
62 lines (52 loc) · 2.18 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors
use std::path::Path;
use std::sync::Arc;
use url::Url;
use vortex::error::VortexResult;
use vortex::error::vortex_err;
use vortex::file::multi::MultiFileDataSource;
use vortex::io::runtime::BlockingRuntime;
use vortex::scan::api::DataSourceRef;
use crate::RUNTIME;
use crate::SESSION;
use crate::datasource::DataSourceTableFunction;
use crate::duckdb::BindInputRef;
use crate::duckdb::ClientContextRef;
use crate::duckdb::LogicalType;
use crate::filesystem::resolve_filesystem;
/// Vortex multi-file scan table function (`vortex_scan` / `read_vortex`).
///
/// This is a stateless marker type: it has no fields and exists only to carry the
/// [`DataSourceTableFunction`] implementation.
///
/// The function takes a single file glob parameter (given either as a URL or as a
/// filesystem path) and resolves it into a [`MultiFileDataSource`]. All other table
/// function logic is provided by the blanket [`DataSourceTableFunction`]
/// implementation.
#[derive(Debug)]
pub struct VortexMultiFileScan;
impl DataSourceTableFunction for VortexMultiFileScan {
    /// Declares the positional parameters of the table function: a single `VARCHAR`
    /// holding the file glob.
    fn parameters() -> Vec<LogicalType> {
        vec![LogicalType::varchar()]
    }

    /// Resolves the file glob parameter into a [`MultiFileDataSource`].
    ///
    /// The parameter is interpreted as follows:
    ///
    /// 1. If it parses as a URL, it is used as-is.
    /// 2. Otherwise it is treated as a filesystem path. Relative paths (for example
    ///    `FROM 'test.vortex'`) are resolved against the process working directory,
    ///    then converted to a `file://` URL.
    ///
    /// The URL is then split into a base URL (scheme, host, etc. with an empty path),
    /// which selects the filesystem, and the path component, which is passed on as
    /// the glob pattern.
    ///
    /// # Errors
    ///
    /// Returns an error if the glob parameter is missing, if the working directory
    /// cannot be determined while resolving a relative path, if the value is neither
    /// a URL nor a usable path, if the filesystem cannot be resolved, or if building
    /// the data source fails.
    fn bind(ctx: &ClientContextRef, input: &BindInputRef) -> VortexResult<DataSourceRef> {
        let glob_url_parameter = input
            .get_parameter(0)
            .ok_or_else(|| vortex_err!("Missing file glob parameter"))?;

        // Parse the URL and separate the base URL (keep scheme, host, etc.) from the path.
        let glob_url_str = glob_url_parameter.as_string();
        let glob_str = glob_url_str.as_str();
        let glob_url = match Url::parse(glob_str) {
            Ok(url) => url,
            Err(_) => {
                let path = Path::new(glob_str);
                // `Url::from_file_path` only accepts absolute paths, so anchor relative
                // ones at the current working directory first. An empty string is left
                // untouched so that it still fails below instead of silently globbing
                // the working directory itself.
                let absolute = if path.is_relative() && !path.as_os_str().is_empty() {
                    std::env::current_dir()
                        .map_err(|e| {
                            vortex_err!("Cannot resolve relative path '{}': {}", glob_str, e)
                        })?
                        .join(path)
                } else {
                    path.to_path_buf()
                };
                Url::from_file_path(&absolute)
                    .map_err(|_| vortex_err!("Neither URL nor path: '{}' ", glob_str))?
            }
        };

        // The base URL identifies the filesystem; the path carries the glob pattern.
        let mut base_url = glob_url.clone();
        base_url.set_path("");

        let fs = resolve_filesystem(&base_url, ctx)?;

        // Building the data source is async; drive it to completion on the shared runtime.
        RUNTIME.block_on(async {
            let builder = MultiFileDataSource::new(SESSION.clone())
                .with_filesystem(fs)
                .with_glob(glob_url.path());
            let ds = builder.build().await?;
            VortexResult::Ok(Arc::new(ds) as DataSourceRef)
        })
    }
}