Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
dd75799
chore: remove all skip_targets(["clickhouse"]) markers from test files
devin-ai-integration[bot] Feb 25, 2026
0a2dd80
ci: temporarily limit CI matrix to clickhouse-only for iteration
devin-ai-integration[bot] Feb 25, 2026
41701e9
fix: use NOT IN instead of LEFT JOIN IS NULL for ClickHouse compatibi…
devin-ai-integration[bot] Feb 25, 2026
1d2fe6c
fix: ClickHouse Nullable(Float32) cast + HTTP API seed null fix
devin-ai-integration[bot] Feb 25, 2026
d43adc6
fix: address CodeRabbit review + revert NOT IN back to LEFT JOIN
devin-ai-integration[bot] Feb 25, 2026
e36e33e
fix: address CodeRabbit review round 2 - env vars, timeout, SQL injec…
devin-ai-integration[bot] Feb 25, 2026
ca0db37
fix: ClickHouse full_names adapter.dispatch, seasonality macros, even…
devin-ai-integration[bot] Feb 25, 2026
749d582
fix: ClickHouse event_freshness timediff NULL handling + list_concat …
devin-ai-integration[bot] Feb 25, 2026
4cd2e2a
fix: dynamically resolve ClickHouse schema from dbt profiles.yml inst…
devin-ai-integration[bot] Feb 25, 2026
956c061
ci: restore full CI matrix with all warehouse types
devin-ai-integration[bot] Feb 25, 2026
ad08596
refactor: extract ClickHouse seed repair utils + dispatch empty-strin…
devin-ai-integration[bot] Feb 25, 2026
92d0e89
refactor: remove unused clickhouse__ dispatch from replace_empty_stri…
devin-ai-integration[bot] Feb 25, 2026
48ea275
ci: retrigger CI to verify flaky test_seed_group_attribute failure
devin-ai-integration[bot] Feb 25, 2026
81219e3
Merge origin/master into core-397-clickhouse-support
devin-ai-integration[bot] Feb 27, 2026
8893238
Merge origin/master into core-397-clickhouse-support
devin-ai-integration[bot] Feb 28, 2026
1915c34
refactor: replace clickhouse_utils.py with ClickHouseDirectSeeder
devin-ai-integration[bot] Mar 1, 2026
5b38158
fix: add type inference to ClickHouseDirectSeeder
devin-ai-integration[bot] Mar 1, 2026
4d6e7c2
fix: treat booleans as strings in ClickHouseDirectSeeder
devin-ai-integration[bot] Mar 1, 2026
601db95
fix: use Nullable(Bool) for boolean columns in ClickHouseDirectSeeder
devin-ai-integration[bot] Mar 1, 2026
207d9bd
fix: write CSV for dbt node discovery in ClickHouseDirectSeeder
devin-ai-integration[bot] Mar 1, 2026
335fbf3
refactor: remove run_operation retry logic from run_query path
devin-ai-integration[bot] Mar 1, 2026
6f886e7
Merge origin/master into core-397-clickhouse-support
devin-ai-integration[bot] Mar 1, 2026
8823952
docs: add comment explaining why clickhouse__has_temp_table_support r…
devin-ai-integration[bot] Mar 1, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,37 +1,45 @@
{# Entry point: dispatches to the adapter-specific implementation
   (clickhouse__* or default__*) registered in the 'elementary_tests' package. #}
{% macro replace_empty_strings_with_nulls(table_name) %}
{{ return(adapter.dispatch('replace_empty_strings_with_nulls', 'elementary_tests')(table_name)) }}
{% endmacro %}

{% macro default__replace_empty_strings_with_nulls(table_name) %}
{# Default implementation: for every string-typed column of the seed table,
   rewrite empty-string values to SQL NULL via a plain UPDATE statement. #}
{% set relation = ref(table_name) %}
{% set columns = adapter.get_columns_in_relation(relation) %}

{% for col in columns %}
{% set data_type = elementary.get_column_data_type(col) %}
{% set normalized_data_type = elementary.normalize_data_type(data_type) %}

{# Only string columns can hold the '' sentinel that seeds produce for NULL. #}
{% if normalized_data_type == "string" %}
{% set quoted_col = adapter.quote(col["name"]) %}
{% set update_query %}
update {{ relation }}
set {{ quoted_col }} = NULL
where {{ quoted_col }} = ''
{% endset %}
{% do elementary.run_query(update_query) %}
{% endif %}
{% endfor %}
{% endmacro %}

{% macro clickhouse__replace_empty_strings_with_nulls(table_name) %}
{# ClickHouse implementation. The diff view had merged the removed
   target.type branch with the new body; this is the final version only —
   the target.type check is redundant because adapter.dispatch already
   routes ClickHouse targets here. #}
{% set relation = ref(table_name) %}
{% set columns = adapter.get_columns_in_relation(relation) %}

{# On ClickHouse, columns are non-Nullable by default so NULLs in CSV seeds become
   empty strings. We first ALTER each string column to Nullable(String), then use
   ALTER TABLE UPDATE to convert empty strings to NULLs.
   We use statement blocks for DDL since dbt.run_query may not handle DDL on ClickHouse. #}
{% for col in columns %}
{% set data_type = elementary.get_column_data_type(col) %}
{% set normalized_data_type = elementary.normalize_data_type(data_type) %}
{% if normalized_data_type == "string" %}
{% call statement('alter_nullable_' ~ col['name'], fetch_result=False) %}
alter table {{ relation }} modify column `{{ col['name'] }}` Nullable(String)
{% endcall %}
{# mutations_sync = 1 makes the mutation synchronous so subsequent reads
   see the repaired NULLs immediately. #}
{% call statement('update_nulls_' ~ col['name'], fetch_result=False) %}
alter table {{ relation }} update `{{ col['name'] }}` = NULL where `{{ col['name'] }}` = '' settings mutations_sync = 1
{% endcall %}
{% endif %}
{% endfor %}
{% endmacro %}
202 changes: 202 additions & 0 deletions integration_tests/tests/clickhouse_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,202 @@
"""Utilities for fixing ClickHouse seed tables where NULL values became default values.
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've been thinking, in a different spark PR - https://github.com/elementary-data/dbt-data-reliability/pull/946/files#diff-bb81036cdf10ae73e7ae9702a8706675542e49e5d5fbeecb06e627d9035e34afR61

we added a direct seeder for Spark.
Given all the logic here, do you think it makes sense to do the same for ClickHouse?

(I think eventually I want to do this for all adapters for performance if nothing else, but since this is a new integration maybe it's a good chance).

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, a ClickHouse direct seeder makes a lot of sense — probably even more so than for Spark, because it would eliminate clickhouse_utils.py entirely.

Current approach (2-step, post-hoc repair):

  1. dbt seed → creates non-Nullable columns, NULLs become defaults (0, '', etc.)
  2. fix_clickhouse_seed_nulls() → queries system.columns, rebuilds table via CREATE AS SELECT ... nullIf(), EXCHANGE TABLES, DROP

Direct seeder approach (1-step, correct from the start):

  1. CREATE TABLE ... (col1 Nullable(String), col2 Nullable(String), ...) ENGINE = MergeTree() ORDER BY tuple()
  2. INSERT INTO ... VALUES (NULL, 'foo', ...) — NULLs are correct immediately

Benefits:

  • Eliminates the HTTP API dependency — the current repair uses urllib.request to hit ClickHouse's HTTP API directly because run_operation can't reliably execute DDL. A direct seeder can use AdapterQueryRunner.execute_sql() (from feat: add Docker-based Spark integration test setup #946) which goes through the dbt adapter connection, which is cleaner.
  • No post-hoc repair needed — no nullIf() heuristics, no EXCHANGE TABLES, no temp table cleanup.
  • Faster — skip dbt seed subprocess overhead (~4s) + skip the rebuild step.
  • Simpler — ~200 lines of clickhouse_utils.py replaced by ~50 lines in a ClickHouseDirectSeeder class.
  • Same pattern as Spark — consistent architecture across adapters.

One consideration: ClickHouse's INSERT VALUES syntax is slightly different from Spark's (ClickHouse uses \N for NULL in some contexts, but NULL literal works in INSERT VALUES), and we'd need to handle ClickHouse-specific escaping (backtick quoting works for identifiers). But these are minor.

Plan: I can implement this as part of this PR — create a ClickHouseDirectSeeder following the same pattern as SparkDirectSeeder, add execute_sql() to AdapterQueryRunner (from #946), and delete clickhouse_utils.py. Want me to go ahead?


ClickHouse columns are non-Nullable by default, so NULL values in CSV seeds become
default values (0 for Int, '' for String, etc.). This module provides functions to
repair those tables by:
1. Determining which columns had NULL values in the original data
2. Querying column types from system.columns
3. Rebuilding the table via INSERT SELECT with nullIf() to restore NULLs

Uses the ClickHouse HTTP API directly because dbt's run_query/statement
don't reliably execute DDL on ClickHouse.
"""

import os
import re
import urllib.parse
import urllib.request
from pathlib import Path
from typing import List, Set, Tuple

from logger import get_logger
from ruamel.yaml import YAML

logger = get_logger(__name__)

SCHEMA_NAME_SUFFIX_ENV = os.environ.get("PYTEST_XDIST_WORKER", None)
_SCHEMA_NAME_SUFFIX = f"_{SCHEMA_NAME_SUFFIX_ENV}" if SCHEMA_NAME_SUFFIX_ENV else ""


def get_clickhouse_schema(schema_name_suffix: str = _SCHEMA_NAME_SUFFIX) -> str:
    """Get the ClickHouse database (schema) name from dbt profiles.yml.

    In ClickHouse, database and schema are the same concept. The schema
    name comes from the dbt profile's 'schema' property, with the
    schema_name_suffix appended for parallel test workers.
    """
    profiles_file = Path.home() / ".dbt" / "profiles.yml"
    with open(profiles_file) as handle:
        profiles = YAML().load(handle)
    # Walk profile -> outputs -> clickhouse target; each step tolerates absence.
    clickhouse_output = (
        profiles.get("elementary_tests", {})
        .get("outputs", {})
        .get("clickhouse", {})
    )
    base_schema = clickhouse_output.get("schema", "default")
    return f"{base_schema}{schema_name_suffix}"


def _get_clickhouse_connection_params() -> Tuple[str, str, str]:
"""Return (base_url, user, password) for ClickHouse HTTP API."""
ch_host = os.environ.get("CLICKHOUSE_HOST", "localhost")
ch_port = os.environ.get("CLICKHOUSE_PORT", "8123")
ch_user = os.environ.get("CLICKHOUSE_USER", "default")
ch_password = os.environ.get("CLICKHOUSE_PASSWORD", "default")
ch_url = f"http://{ch_host}:{ch_port}"
return ch_url, ch_user, ch_password


def clickhouse_query_with_api(query: str) -> str:
    """Execute a SQL query against ClickHouse via the HTTP API.

    Uses URL-encoded credentials to handle special characters safely.
    """
    base_url, user, password = _get_clickhouse_connection_params()
    # mutations_sync=1 makes ALTER ... UPDATE mutations block until applied,
    # so callers can immediately read their own writes.
    params = urllib.parse.urlencode(
        {"user": user, "password": password, "mutations_sync": 1}
    )
    request = urllib.request.Request(
        f"{base_url}/?{params}",
        data=query.encode("utf-8"),
    )
    with urllib.request.urlopen(request, timeout=60) as response:  # noqa: S310
        return response.read().decode("utf-8")


def _find_nullable_columns(data: List[dict]) -> Set[str]:
"""Find columns that contain at least one NULL value in the original data."""
nullable_columns: Set[str] = set()
for row in data:
for col_name, value in row.items():
if value is None:
nullable_columns.add(col_name)
return nullable_columns


def _get_column_types(schema: str, table_name: str) -> List[Tuple[str, str]]:
"""Query system.columns for (name, type) pairs of the given table."""
# Validate identifiers to prevent SQL injection
if not re.fullmatch(r"[A-Za-z0-9_]+", schema):
raise ValueError(f"Invalid schema name: {schema!r}")
if not re.fullmatch(r"[A-Za-z0-9_]+", table_name):
raise ValueError(f"Invalid table name: {table_name!r}")

cols_result = clickhouse_query_with_api(
f"SELECT name, type FROM system.columns "
f"WHERE database = '{schema}' AND table = '{table_name}'"
).strip()
if not cols_result:
return []

columns: List[Tuple[str, str]] = []
for line in cols_result.split("\n"):
parts = line.strip().split("\t")
if len(parts) == 2:
columns.append((parts[0], parts[1]))
return columns


def _build_select_with_null_repair(
columns: List[Tuple[str, str]], nullable_columns: Set[str]
) -> str:
"""Build a SELECT expression list that uses nullIf() for nullable columns."""
select_exprs: List[str] = []
for col_name, col_type in columns:
if col_name in nullable_columns:
# Strip Nullable(...) wrapper from a prior run to avoid
# Nullable(Nullable(...)) nesting
base_type = col_type
if base_type.startswith("Nullable(") and base_type.endswith(")"):
base_type = base_type[len("Nullable(") : -1]
# Get the default value for this type to use with nullIf
if (
base_type == "String"
or base_type.startswith("FixedString")
or base_type.startswith("LowCardinality")
):
default_val = "''"
elif base_type.startswith("Int") or base_type.startswith("UInt"):
default_val = "0"
elif base_type.startswith("Float"):
default_val = "0"
else:
default_val = "defaultValueOfTypeName('" + base_type + "')"
select_exprs.append(
f"nullIf(`{col_name}`, {default_val})::Nullable({base_type}) as `{col_name}`"
)
else:
select_exprs.append(f"`{col_name}`")
return ", ".join(select_exprs)


def _rebuild_table_with_nulls(
    schema: str,
    table_name: str,
    select_sql: str,
    nullable_columns: Set[str],
) -> None:
    """Rebuild a ClickHouse table using CREATE temp / EXCHANGE / DROP."""
    scratch = f"{table_name}_tmp_nullable"

    logger.info(
        "ClickHouse fix: rebuilding %s.%s with Nullable columns: %s",
        schema,
        table_name,
        nullable_columns,
    )

    # Clear any stale scratch table left behind by a previous failed run.
    clickhouse_query_with_api(f"DROP TABLE IF EXISTS {schema}.{scratch}")
    try:
        create_sql = (
            f"CREATE TABLE {schema}.{scratch} "
            f"ENGINE = MergeTree() ORDER BY tuple() "
            f"AS SELECT {select_sql} FROM {schema}.{table_name}"
        )
        clickhouse_query_with_api(create_sql)
        # EXCHANGE swaps the two tables atomically.
        clickhouse_query_with_api(
            f"EXCHANGE TABLES {schema}.{table_name} AND {schema}.{scratch}"
        )
    finally:
        # After a successful exchange the scratch table holds the old data;
        # on failure it may hold a partial copy. Drop it either way.
        clickhouse_query_with_api(f"DROP TABLE IF EXISTS {schema}.{scratch}")


def fix_clickhouse_seed_nulls(
    table_name: str, data: List[dict], schema_name_suffix: str
) -> None:
    """Fix ClickHouse seed tables where NULL values became default values.

    This is the main entry point. It:
    1. Finds which columns had NULL values in the original data
    2. Queries column types from system.columns
    3. Rebuilds the table via INSERT SELECT with nullIf() to restore NULLs
    """
    null_bearing = _find_nullable_columns(data)
    if not null_bearing:
        # Seed contained no NULLs at all; nothing to repair.
        return

    database = get_clickhouse_schema(schema_name_suffix)
    live_columns = _get_column_types(database, table_name)
    if not live_columns:
        logger.warning(
            "ClickHouse fix: no columns found for %s.%s - "
            "schema may be wrong (using '%s'). NULLs will not be repaired.",
            database,
            table_name,
            database,
        )
        return

    repair_select = _build_select_with_null_repair(live_columns, null_bearing)
    _rebuild_table_with_nulls(database, table_name, repair_select, null_bearing)
Loading