diff --git a/python/tests/test_tsfile_dataset.py b/python/tests/test_tsfile_dataset.py
index 379bb5470..ef6522b62 100644
--- a/python/tests/test_tsfile_dataset.py
+++ b/python/tests/test_tsfile_dataset.py
@@ -25,7 +25,7 @@
 from tsfile import AlignedTimeseries, Timeseries, TsFileDataFrame
 from tsfile.dataset.formatting import format_timestamp
 from tsfile.dataset.metadata import MetadataCatalog, build_series_path, resolve_series_path
-from tsfile.dataset.reader import TsFileSeriesReader
+from tsfile.dataset.reader import TsFileSeriesReader, _build_exact_tag_filter
 
 
 def _write_weather_file(path, start):
@@ -63,6 +63,19 @@ def _write_weather_rows_file(path, rows):
     writer.write_dataframe(df)
 
 
+def _write_empty_weather_file(path):
+    schema = TableSchema(
+        "weather",
+        [
+            ColumnSchema("device", TSDataType.STRING, ColumnCategory.TAG),
+            ColumnSchema("temperature", TSDataType.DOUBLE, ColumnCategory.FIELD),
+            ColumnSchema("humidity", TSDataType.DOUBLE, ColumnCategory.FIELD),
+        ],
+    )
+    with TsFileTableWriter(str(path), schema):
+        pass
+
+
 def _write_numeric_and_text_file(path):
     schema = TableSchema(
         "weather",
@@ -229,6 +242,162 @@ def test_dataset_basic_access_patterns(tmp_path, capsys):
     assert "AlignedTimeseries(6 rows, 2 series)" in capsys.readouterr().out
 
 
+def test_dataset_loc_aligns_timestamp_union_and_preserves_requested_order(tmp_path):
+    path = tmp_path / "weather_sparse.tsfile"
+    _write_weather_rows_file(
+        path,
+        {
+            "time": [0, 1, 2],
+            "device": ["device_a", "device_a", "device_a"],
+            "temperature": [10.0, np.nan, 30.0],
+            "humidity": [np.nan, 200.0, 300.0],
+        },
+    )
+
+    with TsFileDataFrame(str(path), show_progress=False) as tsdf:
+        aligned = tsdf.loc[
+            0:2,
+            [
+                "weather.device_a.humidity",
+                "weather.device_a.temperature",
+            ],
+        ]
+
+        assert isinstance(aligned, AlignedTimeseries)
+        assert aligned.series_names == [
+            "weather.device_a.humidity",
+            "weather.device_a.temperature",
+        ]
+        np.testing.assert_array_equal(aligned.timestamps, np.array([0, 1, 2], dtype=np.int64))
+        assert aligned.shape == (3, 2)
+        assert np.isnan(aligned.values[0, 0])
+        assert aligned.values[0, 1] == 10.0
+        assert aligned.values[1, 0] == 200.0
+        assert np.isnan(aligned.values[1, 1])
+        assert aligned.values[2, 0] == 300.0
+        assert aligned.values[2, 1] == 30.0
+
+
+def test_dataset_loc_supports_single_timestamp_and_mixed_series_specifiers(tmp_path):
+    path = tmp_path / "weather.tsfile"
+    _write_weather_file(path, 0)
+
+    with TsFileDataFrame(str(path), show_progress=False) as tsdf:
+        aligned = tsdf.loc[1, [0, "weather.device_a.humidity"]]
+
+        assert isinstance(aligned, AlignedTimeseries)
+        assert aligned.series_names == [
+            "weather.device_a.temperature",
+            "weather.device_a.humidity",
+        ]
+        np.testing.assert_array_equal(aligned.timestamps, np.array([1], dtype=np.int64))
+        np.testing.assert_array_equal(aligned.values, np.array([[21.5, 52.0]]))
+
+
+def test_dataset_loc_supports_open_ended_ranges_and_negative_series_index(tmp_path):
+    path = tmp_path / "weather.tsfile"
+    _write_weather_file(path, 100)
+
+    with TsFileDataFrame(str(path), show_progress=False) as tsdf:
+        aligned = tsdf.loc[:101, [-1]]
+
+        assert isinstance(aligned, AlignedTimeseries)
+        assert aligned.series_names == ["weather.device_a.humidity"]
+        np.testing.assert_array_equal(aligned.timestamps, np.array([100, 101], dtype=np.int64))
+        np.testing.assert_array_equal(aligned.values, np.array([[50.0], [52.0]]))
+
+
+def test_dataset_loc_with_nulls_does_not_expand_beyond_requested_time_range(tmp_path):
+    path = tmp_path / "weather_sparse_range.tsfile"
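+    # The rows inside [1, 2] are mostly null; the loc read below must not
+    # widen the window toward the non-null values at t=0 and t=3.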
+    _write_weather_rows_file(
+        path,
+        {
+            "time": [0, 1, 2, 3],
+            "device": ["device_a", "device_a", "device_a", "device_a"],
+            "temperature": [10.0, np.nan, np.nan, 40.0],
+            "humidity": [np.nan, 20.0, np.nan, 50.0],
+        },
+    )
+
+    with TsFileDataFrame(str(path), show_progress=False) as tsdf:
+        aligned = tsdf.loc[
+            1:2,
+            [
+                "weather.device_a.temperature",
+                "weather.device_a.humidity",
+            ],
+        ]
+
+        assert isinstance(aligned, AlignedTimeseries)
+        np.testing.assert_array_equal(aligned.timestamps, np.array([1, 2], dtype=np.int64))
+        assert aligned.shape == (2, 2)
+        assert np.isnan(aligned.values[0, 0])
+        assert aligned.values[0, 1] == 20.0
+        assert np.isnan(aligned.values[1, 0])
+        assert np.isnan(aligned.values[1, 1])
+
+
+def test_dataset_loc_single_timestamp_with_nulls_keeps_exact_time_window(tmp_path):
+    path = tmp_path / "weather_sparse_point.tsfile"
+    _write_weather_rows_file(
+        path,
+        {
+            "time": [0, 1, 2],
+            "device": ["device_a", "device_a", "device_a"],
+            "temperature": [10.0, np.nan, 30.0],
+            "humidity": [np.nan, 20.0, 40.0],
+        },
+    )
+
+    with TsFileDataFrame(str(path), show_progress=False) as tsdf:
+        aligned = tsdf.loc[
+            1,
+            [
+                "weather.device_a.temperature",
+                "weather.device_a.humidity",
+            ],
+        ]
+
+        assert isinstance(aligned, AlignedTimeseries)
+        np.testing.assert_array_equal(aligned.timestamps, np.array([1], dtype=np.int64))
+        assert aligned.shape == (1, 2)
+        assert np.isnan(aligned.values[0, 0])
+        assert aligned.values[0, 1] == 20.0
+
+
+def test_dataset_repr_only_builds_preview_rows(tmp_path, monkeypatch):
+    path = tmp_path / "weather.tsfile"
+    _write_weather_file(path, 0)
+
+    with TsFileDataFrame(str(path), show_progress=False) as tsdf:
+        tsdf._index.series_refs_ordered = [(0, 0)] * 1000
+
+        built_rows = []
+
+        def fake_build_series_info(series_ref):
+            built_rows.append(series_ref)
+            return {
+                "table_name": "weather",
+                "field": "temperature",
+                "tag_columns": ("device",),
+                "tag_values": {"device": "device_a"},
+                "min_time": 0,
+                "max_time": 2,
+                "count": 3,
+            }
+
+        def fail_build_series_name(_series_ref):
+            raise AssertionError("__repr__ should not build full series names for preview output")
+
+        monkeypatch.setattr(tsdf, "_build_series_info", fake_build_series_info)
+        monkeypatch.setattr(tsdf, "_build_series_name", fail_build_series_name)
+
+        rendered = repr(tsdf)
+        assert "TsFileDataFrame(1000 time series, 1 files)" in rendered
+        assert "..." in rendered
+        assert len(built_rows) == 20
+
+
 def test_dataset_exposes_only_numeric_fields_and_keeps_nan(tmp_path):
     path = tmp_path / "numeric_and_text.tsfile"
     _write_numeric_and_text_file(path)
@@ -359,6 +528,31 @@ def test_dataset_rejects_incompatible_table_schemas_across_shards(tmp_path):
         TsFileDataFrame([str(path1), str(path2)], show_progress=False)
 
 
+def test_dataset_skips_empty_tsfile_shards(tmp_path):
+    empty_path = tmp_path / "empty.tsfile"
+    data_path = tmp_path / "part.tsfile"
+    _write_empty_weather_file(empty_path)
+    _write_weather_file(data_path, 0)
+
+    with TsFileDataFrame([str(empty_path), str(data_path)], show_progress=False) as tsdf:
+        assert tsdf.list_timeseries() == [
+            "weather.device_a.temperature",
+            "weather.device_a.humidity",
+        ]
+
+
+def test_reader_allows_empty_tsfile(tmp_path):
+    path = tmp_path / "empty.tsfile"
+    _write_empty_weather_file(path)
+
+    reader = TsFileSeriesReader(str(path), show_progress=False)
+    try:
+        assert reader.series_paths == []
+        assert reader.catalog.series_count == 0
+    finally:
+        reader.close()
+
+
 def test_dataset_multi_tag_metadata_discovery(tmp_path):
     path = tmp_path / "multi_tag.tsfile"
     _write_multi_tag_file(path)
@@ -453,6 +647,61 @@ def test_reader_catalog_shares_device_metadata_and_resolves_paths(tmp_path):
         reader.close()
 
 
+def test_reader_read_series_by_row_retries_across_native_row_query_boundaries():
+    class _FakeResultSet:
+        def __init__(self, rows):
+            self._rows = rows
+            self._index = -1
+
+        def __enter__(self):
+            return self
+
+        def __exit__(self, exc_type, exc_val, exc_tb):
+            return False
+
+        def next(self):
+            self._index += 1
+            return self._index < len(self._rows)
+
+        def get_value_by_name(self, name):
+            return self._rows[self._index][name]
+
+    class _FakeNativeReader:
+        def __init__(self, timestamps, values, boundary):
+            self._timestamps = timestamps
+            self._values = values
+            self._boundary = boundary
+
+        def query_table_by_row(self, table_name, column_names, offset=0, limit=-1, tag_filter=None):
+            assert table_name == "pvf"
+            assert column_names == ["totalcloudcover"]
+            assert tag_filter is None
+            if limit < 0:
+                stop = len(self._timestamps)
+            else:
+                stop = min(offset + limit, len(self._timestamps))
+
+            # Simulate the current native bug: one row query cannot cross the
+            # next internal boundary, so callers must re-issue from the
+            # advanced offset to complete a large logical window.
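+            # With boundary=10, a query at offset=5 with limit=12 stops at
+            # row 9; the reader must re-issue at offset=10 to reach row 16.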
+            chunk_stop = min(stop, ((offset // self._boundary) + 1) * self._boundary)
+            rows = [
+                {"time": int(self._timestamps[idx]), "totalcloudcover": float(self._values[idx])}
+                for idx in range(offset, chunk_stop)
+            ]
+            return _FakeResultSet(rows)
+
+    reader = object.__new__(TsFileSeriesReader)
+    reader._reader = _FakeNativeReader(np.arange(30, dtype=np.int64), np.arange(30, dtype=np.float64), boundary=10)
+    reader._catalog = MetadataCatalog()
+    table_id = reader._catalog.add_table("pvf", (), (), ("totalcloudcover",))
+    device_id = reader._catalog.add_device(table_id, (), 0, 29)
+
+    ts_arr, values = reader.read_series_by_row(device_id, 0, 5, 12)
+    np.testing.assert_array_equal(ts_arr, np.arange(5, 17, dtype=np.int64))
+    np.testing.assert_array_equal(values, np.arange(5, 17, dtype=np.float64))
+
+
 def test_series_path_resolution_allows_prefix_tag_values():
     catalog = MetadataCatalog()
     table_id = catalog.add_table(
@@ -474,3 +723,217 @@ def test_series_path_resolution_allows_prefix_tag_values():
     series_path = build_series_path(catalog, device_id, 0)
     assert series_path == "weather.site_a.device_a.temperature"
     assert resolve_series_path(catalog, series_path) == (table_id, device_id, 0)
+
+
+def test_series_path_resolution_allows_missing_trailing_tag_value():
+    catalog = MetadataCatalog()
+    table_id = catalog.add_table(
+        "weather",
+        ("device",),
+        (TSDataType.STRING,),
+        ("temperature",),
+    )
+    device_id = catalog.add_device(table_id, (), 0, 1)
+    catalog.series_stats_by_ref[(device_id, 0)] = {
+        "length": 1,
+        "min_time": 0,
+        "max_time": 0,
+        "timeline_length": 1,
+        "timeline_min_time": 0,
+        "timeline_max_time": 0,
+    }
+
+    series_path = build_series_path(catalog, device_id, 0)
+    assert series_path == "weather.temperature"
+    assert resolve_series_path(catalog, series_path) == (table_id, device_id, 0)
+
+
+def test_series_path_resolution_uses_named_tags_for_sparse_non_prefix_values():
+    catalog = MetadataCatalog()
+    table_id = catalog.add_table(
+        "weather",
+        ("city", "device", "region"),
+        (TSDataType.STRING, TSDataType.STRING, TSDataType.STRING),
+        ("temperature",),
+    )
+    device_id = catalog.add_device(table_id, (None, "device_a", None), 0, 1)
+    catalog.series_stats_by_ref[(device_id, 0)] = {
+        "length": 1,
+        "min_time": 0,
+        "max_time": 0,
+        "timeline_length": 1,
+        "timeline_min_time": 0,
+        "timeline_max_time": 0,
+    }
+
+    series_path = build_series_path(catalog, device_id, 0)
+    assert series_path == "weather.device_a.temperature"
+    assert resolve_series_path(catalog, series_path) == (table_id, device_id, 0)
+
+
+def test_reader_metadata_tag_values_trim_trailing_none():
+    class _Group:
+        segments = ("weather", "device_a", None, None)
+
+    assert TsFileSeriesReader._metadata_tag_values(_Group(), 3) == ("device_a",)
+    assert TsFileSeriesReader._metadata_tag_values(_Group(), 1) == ("device_a",)
+
+
+def test_exact_tag_filter_rejects_none_tag_values():
+    with pytest.raises(NotImplementedError, match="IS NULL / IS NOT NULL"):
+        _build_exact_tag_filter({"device": None})
+    with pytest.raises(NotImplementedError, match="IS NULL / IS NOT NULL"):
+        _build_exact_tag_filter({"city": "beijing", "device": None})
+
+
+def test_reader_exact_match_with_none_tag_values_fails_fast():
+    class _FakeNativeReader:
+        def query_table(self, *args, **kwargs):
+            raise AssertionError("query should not be issued when None-tag exact matching is unsupported")
+
+        def query_table_by_row(self, *args, **kwargs):
+            raise AssertionError("row query should not be issued when None-tag exact matching is unsupported")
+
+    reader = object.__new__(TsFileSeriesReader)
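+    # __init__ is bypassed so no native file handle is opened; the fake above
+    # only needs to prove that no query is ever issued.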
+    reader._reader = _FakeNativeReader()
+    reader._catalog = MetadataCatalog()
+    table_id = reader._catalog.add_table(
+        "weather",
+        ("city", "device", "region"),
+        (TSDataType.STRING, TSDataType.STRING, TSDataType.STRING),
+        ("temperature",),
+    )
+    device_id = reader._catalog.add_device(table_id, (None, "device_a", "north"), 0, 1)
+    reader._catalog.series_stats_by_ref[(device_id, 0)] = {
+        "length": 2,
+        "min_time": 0,
+        "max_time": 1,
+        "timeline_length": 2,
+        "timeline_min_time": 0,
+        "timeline_max_time": 1,
+    }
+
+    with pytest.raises(NotImplementedError, match="IS NULL / IS NOT NULL"):
+        reader.read_series_by_ref(device_id, 0, 0, 1)
+    with pytest.raises(NotImplementedError, match="IS NULL / IS NOT NULL"):
+        reader.read_series_by_row(device_id, 0, 0, 2)
+
+
+def test_dataframe_resolves_named_sparse_tag_series_path():
+    tsdf = object.__new__(TsFileDataFrame)
+    tsdf._index = dataframe_module._LogicalIndex()
+    tsdf._index.table_entries["weather"] = dataframe_module.TableEntry(
+        table_name="weather",
+        tag_columns=("city", "device", "region"),
+        tag_types=(TSDataType.STRING, TSDataType.STRING, TSDataType.STRING),
+        field_columns=("temperature",),
+    )
+    device_key = ("weather", (None, "device_a"))
+    tsdf._index.device_order = [device_key]
+    tsdf._index.device_index_by_key = {device_key: 0}
+    tsdf._index.tables_with_sparse_tag_values = {"weather"}
+    tsdf._index.sparse_device_indices_by_compressed_path = {("weather", ("device_a",)): [0]}
+    tsdf._index.device_refs = [[]]
+    tsdf._index.series_refs_ordered = [(0, 0)]
+    tsdf._index.series_ref_set = {(0, 0)}
+    tsdf._index.series_ref_map = {(0, 0): []}
+
+    assert tsdf.list_timeseries() == ["weather.device_a.temperature"]
+    assert tsdf._resolve_series_name("weather.device_a.temperature") == (0, 0)
+
+
+def test_dataframe_list_timeseries_filters_named_sparse_tag_prefix():
+    tsdf = object.__new__(TsFileDataFrame)
+    tsdf._index = dataframe_module._LogicalIndex()
+    tsdf._index.table_entries["weather"] = dataframe_module.TableEntry(
+        table_name="weather",
+        tag_columns=("city", "device", "region"),
+        tag_types=(TSDataType.STRING, TSDataType.STRING, TSDataType.STRING),
+        field_columns=("temperature",),
+    )
+    tsdf._index.device_order = [
+        ("weather", (None, "device_a")),
+        ("weather", ("beijing", "device_b")),
+    ]
+    tsdf._index.device_index_by_key = {
+        ("weather", (None, "device_a")): 0,
+        ("weather", ("beijing", "device_b")): 1,
+    }
+    tsdf._index.tables_with_sparse_tag_values = {"weather"}
+    tsdf._index.sparse_device_indices_by_compressed_path = {
+        ("weather", ("device_a",)): [0],
+        ("weather", ("beijing", "device_b")): [1],
+    }
+    tsdf._index.device_refs = [[], []]
+    tsdf._index.series_refs_ordered = [(0, 0), (1, 0)]
+    tsdf._index.series_ref_set = {(0, 0), (1, 0)}
+    tsdf._index.series_ref_map = {(0, 0): [], (1, 0): []}
+
+    assert tsdf.list_timeseries("weather.device_a") == ["weather.device_a.temperature"]
+
+
+def test_dataframe_list_timeseries_prefix_can_skip_full_name_build(tmp_path, monkeypatch):
+    path = tmp_path / "weather.tsfile"
+    _write_weather_file(path, 0)
+
+    with TsFileDataFrame(str(path), show_progress=False) as tsdf:
+        tsdf._index.series_refs_ordered = [(0, 0)] * 1000
+
+        def fail_build_series_name(_series_ref):
+            raise AssertionError("list_timeseries(prefix) should not build full names for non-matching series")
+
+        monkeypatch.setattr(tsdf, "_build_series_name", fail_build_series_name)
+        assert tsdf.list_timeseries("pvf") == []
+
+
+def test_series_path_resolution_reports_ambiguous_sparse_path():
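+    # Tag tuples ("beijing", None) and (None, "beijing") compress to the same
+    # visible path "weather.beijing.temperature", so resolution must refuse to
+    # guess between them.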
+    catalog = MetadataCatalog()
+    table_id = catalog.add_table(
+        "weather",
+        ("city", "device"),
+        (TSDataType.STRING, TSDataType.STRING),
+        ("temperature",),
+    )
+    first_id = catalog.add_device(table_id, ("beijing", None), 0, 1)
+    second_id = catalog.add_device(table_id, (None, "beijing"), 0, 1)
+    for device_id in (first_id, second_id):
+        catalog.series_stats_by_ref[(device_id, 0)] = {
+            "length": 1,
+            "min_time": 0,
+            "max_time": 0,
+            "timeline_length": 1,
+            "timeline_min_time": 0,
+            "timeline_max_time": 0,
+        }
+
+    assert build_series_path(catalog, first_id, 0) == "weather.beijing.temperature"
+    assert build_series_path(catalog, second_id, 0) == "weather.beijing.temperature"
+    with pytest.raises(ValueError, match="Ambiguous series path"):
+        resolve_series_path(catalog, "weather.beijing.temperature")
+
+
+def test_reader_show_progress_reports_start_immediately(tmp_path, capsys):
+    path = tmp_path / "weather.tsfile"
+    _write_weather_file(path, 0)
+
+    reader = TsFileSeriesReader(str(path), show_progress=True)
+    try:
+        stderr = capsys.readouterr().err
+        assert "Reading TsFile metadata: 0/1" in stderr
+        assert "Reading TsFile metadata: 1 table(s), 2 series ... done" in stderr
+    finally:
+        reader.close()
+
+
+def test_dataframe_parallel_show_progress_reports_start_immediately(tmp_path, capsys):
+    path1 = tmp_path / "part1.tsfile"
+    path2 = tmp_path / "part2.tsfile"
+    _write_weather_file(path1, 0)
+    _write_weather_file(path2, 3)
+
+    with TsFileDataFrame([str(path1), str(path2)], show_progress=True):
+        pass
+
+    stderr = capsys.readouterr().err
+    assert "Loading TsFile shards: 0/2" in stderr
+    assert "Loading TsFile shards: 2/2 (4 series) ... done" in stderr
diff --git a/python/tsfile/dataset/dataframe.py b/python/tsfile/dataset/dataframe.py
index baef77c31..4cc373cae 100644
--- a/python/tsfile/dataset/dataframe.py
+++ b/python/tsfile/dataset/dataframe.py
@@ -29,7 +29,12 @@
 import numpy as np
 
 from .formatting import format_dataframe_table
-from .metadata import TableEntry, _coerce_path_component, build_logical_series_path, split_logical_series_path
+from .metadata import (
+    TableEntry,
+    build_logical_series_components,
+    build_logical_series_path,
+    split_logical_series_path,
+)
 from .merge import build_aligned_matrix, merge_time_value_parts, merge_timestamp_parts
 from .timeseries import AlignedTimeseries, Timeseries
 
@@ -60,6 +65,11 @@ class _LogicalIndex:
     device_order: List[DeviceKey] = field(default_factory=list)
     # Map one logical device key to its dataframe-local device index.
    device_index_by_key: Dict[DeviceKey, int] = field(default_factory=dict)
+    # Tables that need sparse compressed-path lookup because some devices
+    # contain non-trailing missing tag values.
+    tables_with_sparse_tag_values: Set[str] = field(default_factory=set)
+    # Map a compressed tree-style device path to only those sparse logical devices that collapse to it.
+    sparse_device_indices_by_compressed_path: Dict[Tuple[str, Tuple[str, ...]], List[int]] = field(default_factory=dict)
     # For each logical device, keep the contributing reader-local device refs.
     device_refs: List[List[DeviceRef]] = field(default_factory=list)
@@ -155,6 +165,15 @@ def _register_reader(
         index.device_index_by_key[device_key] = device_idx
         index.device_order.append(device_key)
         index.device_refs.append([])
+        if any(value is None for value in device_entry.tag_values):
+            index.tables_with_sparse_tag_values.add(table_entry.table_name)
+            compressed_components = tuple(
+                build_logical_series_components(
+                    table_entry.table_name, device_entry.tag_values, "", table_entry.tag_columns
+                )[1:-1]
+            )
+            compressed_key = (table_entry.table_name, compressed_components)
+            index.sparse_device_indices_by_compressed_path.setdefault(compressed_key, []).append(device_idx)
     index.device_refs[device_idx].append((reader, device_id))
 
     for field_idx in range(len(table_entry.field_columns)):
@@ -557,14 +576,31 @@
         if not self._index.series_refs_ordered:
             raise ValueError("No valid time series found in the provided TsFile files")
 
+    def _show_loading_progress(self, done: int, total: int, total_series: int = None):
+        if not self._show_progress or total <= 0:
+            return
+
+        if total_series is None:
+            sys.stderr.write(f"\rLoading TsFile shards: {done}/{total}")
+        else:
+            sys.stderr.write(f"\rLoading TsFile shards: {done}/{total} ({total_series} series) ... done\n")
+        sys.stderr.flush()
+
     def _load_metadata_serial(self, reader_class):
-        for file_path in self._paths:
+        total = len(self._paths)
+        self._show_loading_progress(0, total)
+
+        for index, file_path in enumerate(self._paths, start=1):
             _register_reader(
                 self._readers,
                 self._index,
                 file_path,
-                reader_class(file_path, show_progress=self._show_progress),
+                reader_class(file_path, show_progress=self._show_progress and total == 1),
             )
+            if total > 1:
+                self._show_loading_progress(index, total)
+
+        self._show_loading_progress(total, total, sum(reader.series_count for reader in self._readers.values()))
 
     def _load_metadata_parallel(self, reader_class):
         from concurrent.futures import ThreadPoolExecutor, as_completed
@@ -573,6 +609,7 @@ def open_file(file_path):
             return file_path, reader_class(file_path, show_progress=False)
 
         total = len(self._paths)
+        self._show_loading_progress(0, total)
         with ThreadPoolExecutor(max_workers=min(total, os.cpu_count() or 4)) as executor:
             futures = {executor.submit(open_file, path): path for path in self._paths}
             results = {}
@@ -581,14 +618,9 @@
                 file_path, reader = future.result()
                 results[file_path] = reader
                 done += 1
-                if self._show_progress:
-                    sys.stderr.write(f"\rLoading TsFile shards: {done}/{total}")
-                    sys.stderr.flush()
+                self._show_loading_progress(done, total)
 
-        if self._show_progress and total > 0:
-            total_series = sum(reader.series_count for reader in results.values())
-            sys.stderr.write(f"\rLoading TsFile shards: {total}/{total} ({total_series} series) ... done\n")
done\n") - sys.stderr.flush() + self._show_loading_progress(total, total, sum(reader.series_count for reader in results.values())) for file_path in self._paths: _register_reader( @@ -607,7 +639,7 @@ def _build_series_name(self, series_ref: SeriesRefKey) -> str: device_key, table_entry, field_idx = self._get_series_components(series_ref) table_name, tag_values = device_key field_name = table_entry.field_columns[field_idx] - return build_logical_series_path(table_name, tag_values, field_name) + return build_logical_series_path(table_name, tag_values, field_name, table_entry.tag_columns) def _resolve_series_name(self, series_name: str) -> SeriesRefKey: try: @@ -622,24 +654,33 @@ def _resolve_series_name(self, series_name: str) -> SeriesRefKey: raise KeyError(_series_lookup_hint(series_name)) table_entry = self._index.table_entries[table_name] - expected_parts = len(table_entry.tag_columns) + 2 - if len(parts) > expected_parts: - raise KeyError(_series_lookup_hint(series_name)) - field_name = parts[-1] try: field_idx = table_entry.get_field_index(field_name) except ValueError as exc: raise KeyError(_series_lookup_hint(series_name)) from exc - tag_values = tuple( - _coerce_path_component(raw_value, tag_type) - for raw_value, tag_type in zip(parts[1:-1], table_entry.tag_types) - ) - device_key = (table_name, tag_values) - device_idx = self._index.device_index_by_key.get(device_key) - if device_idx is None: - raise KeyError(_series_lookup_hint(series_name)) + tag_parts = parts[1:-1] + direct_device_idx = self._index.device_index_by_key.get((table_name, tuple(tag_parts))) + + if table_name not in self._index.tables_with_sparse_tag_values: + if direct_device_idx is None: + raise KeyError(_series_lookup_hint(series_name)) + device_idx = direct_device_idx + else: + compressed_key = (table_name, tuple(tag_parts)) + sparse_device_indices = self._index.sparse_device_indices_by_compressed_path.get(compressed_key, []) + candidate_indices = [] + if direct_device_idx is not None: + candidate_indices.append(direct_device_idx) + for device_idx in sparse_device_indices: + if device_idx not in candidate_indices: + candidate_indices.append(device_idx) + if not candidate_indices: + raise KeyError(_series_lookup_hint(series_name)) + if len(candidate_indices) > 1: + raise KeyError(f"Ambiguous series path: '{series_name}'.") + device_idx = candidate_indices[0] series_ref = (device_idx, field_idx) if series_ref not in self._index.series_ref_set: @@ -664,11 +705,26 @@ def __len__(self) -> int: return len(self._index.series_refs_ordered) def list_timeseries(self, path_prefix: str = "") -> List[str]: - names = [self._build_series_name(series_ref) for series_ref in self._index.series_refs_ordered] if not path_prefix: - return names - prefix = path_prefix if path_prefix.endswith(".") else path_prefix + "." 
-        return [name for name in names if name.startswith(prefix) or name == path_prefix]
+            return [self._build_series_name(series_ref) for series_ref in self._index.series_refs_ordered]
+
+        try:
+            prefix_parts = split_logical_series_path(path_prefix)
+        except ValueError:
+            return []
+
+        matched = []
+        for series_ref in self._index.series_refs_ordered:
+            device_key, table_entry, field_idx = self._get_series_components(series_ref)
+            components = build_logical_series_components(
+                table_entry.table_name,
+                device_key[1],
+                table_entry.field_columns[field_idx],
+                table_entry.tag_columns,
+            )
+            if prefix_parts == components[: len(prefix_parts)]:
+                matched.append(self._build_series_name(series_ref))
+        return matched
 
     def _get_timeseries(self, series_ref: SeriesRefKey) -> Timeseries:
         self._assert_open()
@@ -764,20 +820,44 @@ def _collect_tag_columns(self) -> List[str]:
             seen.setdefault(column, True)
         return list(seen.keys())
 
+    @staticmethod
+    def _preview_indices(indices: List[int], max_rows: int) -> Tuple[List[int], bool, int]:
+        total = len(indices)
+        if total <= max_rows:
+            return indices, False, total
+
+        head = max_rows // 2
+        tail = max_rows - head
+        return list(indices[:head]) + list(indices[-tail:]), True, head
+
     def _format_table(self, indices=None, max_rows: int = 20) -> str:
-        series_names = []
-        merged_info = {}
-        for series_ref in self._index.series_refs_ordered:
-            series_name = self._build_series_name(series_ref)
-            series_names.append(series_name)
-            merged_info[series_name] = self._build_series_info(series_ref)
+        if indices is None:
+            indices = list(range(len(self._index.series_refs_ordered)))
+        else:
+            indices = list(indices)
+
+        preview_indices, truncated, split_index = self._preview_indices(indices, max_rows)
+        rows = []
+        for idx in preview_indices:
+            series_ref = self._index.series_refs_ordered[idx]
+            info = self._build_series_info(series_ref)
+            row = {
+                "index": idx,
+                "table": info["table_name"],
+                "field": info["field"],
+                "start_time": info["min_time"],
+                "end_time": info["max_time"],
+                "count": info["count"],
+            }
+            row.update(info["tag_values"])
+            rows.append(row)
 
         return format_dataframe_table(
-            series_names,
-            merged_info,
+            rows,
             self._collect_tag_columns(),
-            indices=indices,
-            max_rows=max_rows,
+            total_count=len(indices),
+            truncated=truncated,
+            split_index=split_index,
         )
 
     def _repr_header(self) -> str:
diff --git a/python/tsfile/dataset/formatting.py b/python/tsfile/dataset/formatting.py
index 5e01bb39b..527d87988 100644
--- a/python/tsfile/dataset/formatting.py
+++ b/python/tsfile/dataset/formatting.py
@@ -19,7 +19,7 @@
 """String formatting helpers for dataset objects."""
 
 from datetime import datetime
-from typing import Dict, List, Optional
+from typing import List, Optional
 
 import numpy as np
 
@@ -97,50 +97,35 @@
 def format_dataframe_table(
-    series_list: List[str],
-    merged_info: Dict[str, dict],
+    rows: List[dict],
     tag_columns: List[str],
-    indices: Optional[List[int]] = None,
-    max_rows: int = 20,
+    total_count: int,
+    truncated: bool = False,
+    split_index: Optional[int] = None,
 ) -> str:
     """Render the metadata table used by TsFileDataFrame.__repr__."""
-    if indices is None:
-        indices = list(range(len(series_list)))
-    else:
-        indices = list(indices)
-
-    total = len(indices)
-    if total > max_rows:
-        show_indices = list(indices[: max_rows // 2]) + list(indices[-max_rows // 2 :])
-        truncated = True
-    else:
-        show_indices = indices
-        truncated = False
+    if not rows:
+        return "Empty TsFileDataFrame"
 
-    rows = []
-    for idx in show_indices:
-        name = series_list[idx]
-        info = merged_info[name]
-        row = {
-            "index": idx,
-            "table": info["table_name"],
-            "field": info["field"],
-            "start_time": format_timestamp(info["min_time"]),
-            "end_time": format_timestamp(info["max_time"]),
-            "count": info["count"],
+    rendered_rows = []
+    for row in rows:
+        rendered = {
+            "index": row["index"],
+            "table": row["table"],
+            "field": row["field"],
+            "start_time": format_timestamp(row["start_time"]),
+            "end_time": format_timestamp(row["end_time"]),
+            "count": row["count"],
         }
         for tag_col in tag_columns:
-            row[tag_col] = info["tag_values"].get(tag_col, "")
-        rows.append(row)
-
-    if not rows:
-        return "Empty TsFileDataFrame"
+            rendered[tag_col] = row.get(tag_col, "")
+        rendered_rows.append(rendered)
 
     headers = ["", "table"] + tag_columns + ["field", "start_time", "end_time", "count"]
     widths = {header: len(header) for header in headers}
-    widths[""] = max(len(str(row["index"])) for row in rows)
+    widths[""] = max(len(str(row["index"])) for row in rendered_rows)
 
-    for row in rows:
+    for row in rendered_rows:
         widths[""] = max(widths[""], len(str(row["index"])))
         widths["table"] = max(widths["table"], len(row["table"]))
         widths["field"] = max(widths["field"], len(row["field"]))
@@ -151,8 +136,8 @@
             widths[tag_col] = max(widths[tag_col], len(str(row[tag_col])))
 
     lines = [" ".join(header.rjust(widths[header]) for header in headers)]
-    split = len(rows) // 2 if truncated else len(rows)
-    for row_idx, row in enumerate(rows):
+    split = split_index if truncated and split_index is not None else len(rendered_rows)
+    for row_idx, row in enumerate(rendered_rows):
         if truncated and row_idx == split:
             lines.append("...")
         parts = [str(row["index"]).rjust(widths[""]), row["table"].rjust(widths["table"])]
diff --git a/python/tsfile/dataset/metadata.py b/python/tsfile/dataset/metadata.py
index 195e3e9f5..2a4938b37 100644
--- a/python/tsfile/dataset/metadata.py
+++ b/python/tsfile/dataset/metadata.py
@@ -69,6 +69,8 @@ class MetadataCatalog:
     device_entries: List[DeviceEntry] = field(default_factory=list)
     table_id_by_name: Dict[str, int] = field(default_factory=dict)
     device_id_by_key: Dict[Tuple[int, tuple], int] = field(default_factory=dict)
+    tables_with_sparse_tag_values: set = field(default_factory=set)
+    sparse_device_ids_by_compressed_path: Dict[Tuple[int, Tuple[str, ...]], List[int]] = field(default_factory=dict)
     series_stats_by_ref: Dict[Tuple[int, int], Dict[str, int]] = field(default_factory=dict)
 
     def add_table(
@@ -97,7 +99,8 @@ def add_device(
         min_time: int,
         max_time: int,
     ) -> int:
-        key = (table_id, tuple(tag_values))
+        normalized_tag_values = _normalize_tag_values(tag_values)
+        key = (table_id, normalized_tag_values)
         if key in self.device_id_by_key:
             return self.device_id_by_key[key]
 
@@ -105,12 +108,16 @@
         self.device_entries.append(
             DeviceEntry(
                 table_id=table_id,
-                tag_values=tuple(tag_values),
+                tag_values=normalized_tag_values,
                 min_time=min_time,
                 max_time=max_time,
             )
        )
         self.device_id_by_key[key] = device_id
+        if _has_sparse_tag_holes(normalized_tag_values):
+            self.tables_with_sparse_tag_values.add(table_id)
+            compressed_key = (table_id, _compressed_tag_path_components(normalized_tag_values))
+            self.sparse_device_ids_by_compressed_path.setdefault(compressed_key, []).append(device_id)
         return device_id
 
     @property
@@ -122,6 +129,21 @@ def _escape_path_component(value: Any) -> str:
     return str(value).replace(_PATH_ESCAPE, _PATH_ESCAPE * 2).replace(_PATH_SEPARATOR, _PATH_ESCAPE + _PATH_SEPARATOR)
 
 
+def _normalize_tag_values(tag_values: Iterable[Any]) -> Tuple[Any, ...]:
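+    # A trailing run of None tags is equivalent to the tags being absent, so
+    # both spellings of a device key normalize to the same tuple.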
+    values = list(tag_values)
+    while values and values[-1] is None:
+        values.pop()
+    return tuple(values)
+
+
+def _compressed_tag_path_components(tag_values: Iterable[Any]) -> Tuple[str, ...]:
+    return tuple(str(value) for value in tag_values if value is not None)
+
+
+def _has_sparse_tag_holes(tag_values: Iterable[Any]) -> bool:
+    return any(value is None for value in tag_values)
+
+
 def split_logical_series_path(series_path: str) -> List[str]:
     parts = []
     current = []
@@ -148,17 +170,37 @@
     return parts
 
 
-def build_logical_series_path(table_name: str, tag_values: Iterable[Any], field_name: str) -> str:
-    components = [table_name, *tag_values, field_name]
+def build_logical_series_path(
+    table_name: str,
+    tag_values: Iterable[Any],
+    field_name: str,
+    tag_columns: Iterable[str] = (),
+) -> str:
+    components = build_logical_series_components(table_name, tag_values, field_name, tag_columns)
     return _PATH_SEPARATOR.join(_escape_path_component(component) for component in components)
 
 
+def build_logical_series_components(
+    table_name: str,
+    tag_values: Iterable[Any],
+    field_name: str,
+    _tag_columns: Iterable[str] = (),
+) -> List[str]:
+    components = [table_name, *_compressed_tag_path_components(tag_values), field_name]
+    return [str(component) for component in components]
+
+
 def build_series_path(catalog: MetadataCatalog, device_id: int, field_idx: int) -> str:
     """Return the external logical series name for one device field."""
     device_entry = catalog.device_entries[device_id]
     table_entry = catalog.table_entries[device_entry.table_id]
     field_name = table_entry.field_columns[field_idx]
-    return build_logical_series_path(table_entry.table_name, device_entry.tag_values, field_name)
+    return build_logical_series_path(
+        table_entry.table_name,
+        device_entry.tag_values,
+        field_name,
+        table_entry.tag_columns,
+    )
 
 
 def iter_series_refs(catalog: MetadataCatalog) -> Iterator[Tuple[int, int]]:
@@ -187,25 +229,45 @@ def resolve_series_path(catalog: MetadataCatalog, series_path: str) -> Tuple[int
     table_id = catalog.table_id_by_name[table_name]
     table_entry = catalog.table_entries[table_id]
-    expected_parts = len(table_entry.tag_columns) + 2
-    if len(parts) > expected_parts:
-        raise ValueError(f"Series not found: {series_path}")
-
     field_name = parts[-1]
     try:
         field_idx = table_entry.get_field_index(field_name)
     except ValueError as exc:
         raise ValueError(f"Series not found: {series_path}") from exc
 
-    tag_values = tuple(
+    tag_parts = parts[1:-1]
+    direct_device_id = None
+    direct_tag_values = _normalize_tag_values(
         _coerce_path_component(raw_value, tag_type)
-        for raw_value, tag_type in zip(parts[1:-1], table_entry.tag_types)
+        for raw_value, tag_type in zip(tag_parts, table_entry.tag_types)
     )
-    key = (table_id, tag_values)
-    if key not in catalog.device_id_by_key:
+    direct_key = (table_id, direct_tag_values)
+    if direct_key in catalog.device_id_by_key:
+        direct_device_id = catalog.device_id_by_key[direct_key]
+
+    if table_id not in catalog.tables_with_sparse_tag_values:
+        if direct_device_id is None:
+            raise ValueError(f"Series not found: {series_path}")
+        return table_id, direct_device_id, field_idx
+
+    compressed_key = (table_id, tuple(tag_parts))
+    sparse_device_ids = catalog.sparse_device_ids_by_compressed_path.get(compressed_key, [])
+    candidate_ids = []
+    seen_ids = set()
+    if direct_device_id is not None:
+        candidate_ids.append(direct_device_id)
+        seen_ids.add(direct_device_id)
+    for device_id in sparse_device_ids:
+        if device_id in seen_ids:
+            continue
+        candidate_ids.append(device_id)
+        seen_ids.add(device_id)
+    if not candidate_ids:
         raise ValueError(f"Series not found: {series_path}")
+    if len(candidate_ids) > 1:
+        raise ValueError(f"Ambiguous series path: {series_path}")
 
-    return table_id, catalog.device_id_by_key[key], field_idx
+    return table_id, candidate_ids[0], field_idx
 
 
 def _coerce_path_component(value: str, data_type: TSDataType) -> Any:
diff --git a/python/tsfile/dataset/reader.py b/python/tsfile/dataset/reader.py
index 953a22cb0..cb5b70165 100644
--- a/python/tsfile/dataset/reader.py
+++ b/python/tsfile/dataset/reader.py
@@ -44,7 +44,16 @@ def _to_python_scalar(value):
     return value.item() if hasattr(value, "item") else value
 
 
+def _ensure_supported_exact_tag_values(tag_values: Dict[str, object]) -> None:
+    if any(tag_value is None for tag_value in tag_values.values()):
+        raise NotImplementedError(
+            "Exact tag matching with None tag values is not supported yet. "
+            "Native tag filter support for IS NULL / IS NOT NULL is required."
+        )
+
+
 def _build_exact_tag_filter(tag_values: Dict[str, object]):
+    _ensure_supported_exact_tag_values(tag_values)
     tag_filter = None
     for tag_column, tag_value in tag_values.items():
         expr = tag_eq(tag_column, str(tag_value))
@@ -119,6 +128,9 @@ def _cache_metadata_table_model(self):
         self._catalog = MetadataCatalog()
         table_names = list(table_schemas.keys())
         metadata_groups = self._reader.get_timeseries_metadata(None)
+        if self.show_progress:
+            sys.stderr.write(f"\rReading TsFile metadata: 0/{len(table_names)}")
+            sys.stderr.flush()
 
         for table_index, table_name in enumerate(table_names):
             table_schema = table_schemas[table_name]
@@ -183,15 +195,12 @@
             )
             sys.stderr.flush()
 
-        if self.show_progress and self.series_count > 0:
+        if self.show_progress:
             sys.stderr.write(
                 f"\rReading TsFile metadata: {len(table_names)} table(s), {self.series_count} series ... done\n"
             )
             sys.stderr.flush()
 
-        if self.series_count == 0:
-            raise ValueError("No valid numeric series found in TsFile")
-
     @staticmethod
     def _metadata_device_stats(group) -> dict:
         """Derive cheap device-level metadata hints from native field statistics.
@@ -218,11 +227,16 @@ def _metadata_tag_values(group, tag_count: int) -> tuple:
 
         A table-model DeviceID may only materialize a prefix of the declared
         tag columns. Preserve the available prefix rather than requiring a
-        full-length tag tuple here.
+        full-length tag tuple here. Some backends may still materialize
+        trailing missing tags as explicit ``None`` values; normalize those
+        back to the same prefix representation.
""" if tag_count == 0: return () - return tuple(group.segments[1 : min(len(group.segments), 1 + tag_count)]) + values = list(group.segments[1 : min(len(group.segments), 1 + tag_count)]) + while values and values[-1] is None: + values.pop() + return tuple(values) @staticmethod def _metadata_field_stats(group) -> Dict[str, dict]: @@ -308,25 +322,50 @@ def read_series_by_time_range(self, series_path: str, start_time: int, end_time: def read_series_by_row(self, device_id: int, field_idx: int, offset: int, limit: int) -> Tuple[np.ndarray, np.ndarray]: """Read one logical series by device-local row offset/limit.""" + if limit <= 0: + return np.array([], dtype=np.int64), np.array([], dtype=np.float64) + table_entry, device_entry, field_name = self._resolve_series_ref(device_id, field_idx) tag_values = dict(zip(table_entry.tag_columns, device_entry.tag_values)) tag_filter = _build_exact_tag_filter(tag_values) if tag_values else None - timestamps = [] - values = [] - with self._reader.query_table_by_row( - table_entry.table_name, - [field_name], - offset=offset, - limit=limit, - tag_filter=tag_filter, - ) as result_set: - while result_set.next(): - timestamps.append(result_set.get_value_by_name("time")) - value = result_set.get_value_by_name(field_name) - values.append(np.nan if value is None else float(value)) + # Some native row-query paths stop at an internal block boundary even + # when the requested window extends further. Re-issue from the advanced + # offset until we fill the caller's logical row window or reach EOF. + timestamp_parts = [] + value_parts = [] + remaining = limit + next_offset = offset + + while remaining > 0: + batch_timestamps = [] + batch_values = [] + with self._reader.query_table_by_row( + table_entry.table_name, + [field_name], + offset=next_offset, + limit=remaining, + tag_filter=tag_filter, + ) as result_set: + while result_set.next(): + batch_timestamps.append(result_set.get_value_by_name("time")) + value = result_set.get_value_by_name(field_name) + batch_values.append(np.nan if value is None else float(value)) + + if not batch_timestamps: + break + + timestamp_parts.append(np.asarray(batch_timestamps, dtype=np.int64)) + value_parts.append(np.asarray(batch_values, dtype=np.float64)) + read_count = len(batch_timestamps) + next_offset += read_count + remaining -= read_count - return np.asarray(timestamps, dtype=np.int64), np.asarray(values, dtype=np.float64) + if not timestamp_parts: + return np.array([], dtype=np.int64), np.array([], dtype=np.float64) + if len(timestamp_parts) == 1: + return timestamp_parts[0], value_parts[0] + return np.concatenate(timestamp_parts), np.concatenate(value_parts) def read_device_fields_by_time_range( self, device_id: int, field_indices: List[int], start_time: int, end_time: int @@ -394,7 +433,13 @@ def _read_arrow( {field_column: np.array([], dtype=np.float64) for field_column in field_columns}, ) - return ( - np.concatenate(timestamp_parts).astype(np.int64), - {field_column: np.concatenate(field_parts[field_column]) for field_column in field_columns}, - ) + timestamps = np.concatenate(timestamp_parts).astype(np.int64) + field_values = {field_column: np.concatenate(field_parts[field_column]) for field_column in field_columns} + + # Keep the dataset layer strict about the requested time window even if + # the underlying query path returns boundary-adjacent null rows. 
+        mask = (timestamps >= start_time) & (timestamps <= end_time)
+        timestamps = timestamps[mask]
+        field_values = {field_column: values[mask] for field_column, values in field_values.items()}
+
+        return timestamps, field_values
diff --git a/python/tsfile/tsfile_py_cpp.pxd b/python/tsfile/tsfile_py_cpp.pxd
index 55da081c7..7286e78fb 100644
--- a/python/tsfile/tsfile_py_cpp.pxd
+++ b/python/tsfile/tsfile_py_cpp.pxd
@@ -54,7 +54,7 @@
 cdef public api ResultSet tsfile_reader_query_table_on_tree_c(TsFileReader reader, object table_name, object column_list,
                                                               int64_t start_time, int64_t end_time)
 cdef public api ResultSet tsfile_reader_query_table_batch_c(TsFileReader reader, object table_name, object column_list,
                                                             int64_t start_time, int64_t end_time,
-                                                            TagFilterHandle tag_filter, int batch_size)
+                                                            void* tag_filter, int batch_size)
 cdef public api ResultSet tsfile_reader_query_paths_c(TsFileReader reader, object device_name, object sensor_list,
                                                       int64_t start_time, int64_t end_time)
@@ -64,11 +64,11 @@
 cdef public api ResultSet tsfile_reader_query_tree_by_row_c(TsFileReader reader, object device_name, object sensor_list,
                                                             int offset, int limit, int batch_size)
 cdef public api ResultSet tsfile_reader_query_table_by_row_c(TsFileReader reader, object table_name,
                                                              object column_list, int offset,
-                                                             int limit, TagFilterHandle tag_filter,
+                                                             int limit, void* tag_filter,
                                                              int batch_size)
 cdef public api ResultSet tsfile_reader_query_table_with_tag_filter_c(TsFileReader reader, object table_name,
                                                                       object column_list, int64_t start_time,
-                                                                      int64_t end_time, TagFilterHandle tag_filter,
+                                                                      int64_t end_time, void* tag_filter,
                                                                       int batch_size)
 
 cdef public api object get_table_schema(TsFileReader reader, object table_name)
@@ -78,4 +78,4 @@
 cdef public api object reader_get_all_devices_c(TsFileReader reader)
 cdef public api object reader_get_timeseries_metadata_c(TsFileReader reader, object device_ids)
 cpdef public api object get_tsfile_config()
-cpdef public api void set_tsfile_config(dict new_config)
\ No newline at end of file
+cpdef public api void set_tsfile_config(dict new_config)
diff --git a/python/tsfile/tsfile_py_cpp.pyx b/python/tsfile/tsfile_py_cpp.pyx
index 70518b70b..1aea243ed 100644
--- a/python/tsfile/tsfile_py_cpp.pyx
+++ b/python/tsfile/tsfile_py_cpp.pyx
@@ -811,7 +811,7 @@
 cdef ResultSet tsfile_reader_query_table_by_row_c(TsFileReader reader,
                                                   object table_name, object column_list,
                                                   int offset, int limit,
-                                                  TagFilterHandle tag_filter,
+                                                  void* tag_filter,
                                                   int batch_size):
     cdef ResultSet result
     cdef int column_num = len(column_list)
@@ -844,7 +844,7 @@
         columns = NULL
 
 cdef ResultSet tsfile_reader_query_table_batch_c(TsFileReader reader, object table_name, object column_list,
-                                                 int64_t start_time, int64_t end_time, TagFilterHandle tag_filter,
+                                                 int64_t start_time, int64_t end_time, void* tag_filter,
                                                  int batch_size):
     cdef ResultSet result
     cdef int column_num = len(column_list)
@@ -906,7 +906,7 @@ cdef ResultSet tsfile_reader_query_paths_c(TsFileReader reader, object device_na
 
 cdef ResultSet tsfile_reader_query_table_with_tag_filter_c(TsFileReader reader, object table_name,
                                                            object column_list, int64_t start_time,
-                                                           int64_t end_time, TagFilterHandle tag_filter,
+                                                           int64_t end_time, void* tag_filter,
                                                            int batch_size):
     cdef ResultSet result
     cdef int column_num = len(column_list)