From 7c2f060a4b053e075a51d2d1cbe89ca3a70e4515 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ujfalusi=20S=C3=A1ndor?= Date: Tue, 12 May 2026 11:01:22 +0200 Subject: [PATCH 01/14] Comment registration support for MSSQL engine MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Ujfalusi Sándor --- docs/concepts/models/overview.md | 2 +- sqlmesh/core/engine_adapter/mssql.py | 66 ++++++++++++++++++++++++- tests/core/engine_adapter/test_mssql.py | 23 +++++++++ 3 files changed, 88 insertions(+), 3 deletions(-) diff --git a/docs/concepts/models/overview.md b/docs/concepts/models/overview.md index d6356462b4..e37be3822a 100644 --- a/docs/concepts/models/overview.md +++ b/docs/concepts/models/overview.md @@ -184,7 +184,7 @@ This table lists each engine's support for `TABLE` and `VIEW` object comments: | DuckDB <=0.9 | N | N | | DuckDB >=0.10 | Y | Y | | MySQL | Y | Y | -| MSSQL | N | N | +| MSSQL | Y | Y | | Postgres | Y | Y | | GCP Postgres | Y | Y | | Redshift | Y | N | diff --git a/sqlmesh/core/engine_adapter/mssql.py b/sqlmesh/core/engine_adapter/mssql.py index e381c0a198..6a39557d3f 100644 --- a/sqlmesh/core/engine_adapter/mssql.py +++ b/sqlmesh/core/engine_adapter/mssql.py @@ -2,6 +2,7 @@ from __future__ import annotations +from textwrap import dedent import typing as t import logging @@ -53,8 +54,8 @@ class MSSQLEngineAdapter( SUPPORTS_TUPLE_IN = False SUPPORTS_MATERIALIZED_VIEWS = False CURRENT_CATALOG_EXPRESSION = exp.func("db_name") - COMMENT_CREATION_TABLE = CommentCreationTable.UNSUPPORTED - COMMENT_CREATION_VIEW = CommentCreationView.UNSUPPORTED + COMMENT_CREATION_TABLE = CommentCreationTable.COMMENT_COMMAND_ONLY + COMMENT_CREATION_VIEW = CommentCreationView.COMMENT_COMMAND_ONLY SUPPORTS_REPLACE_TABLE = False MAX_IDENTIFIER_LENGTH = 128 SUPPORTS_QUERY_EXECUTION_TRACKING = True @@ -457,3 +458,64 @@ def delete_from(self, table_name: TableName, where: t.Union[str, exp.Expr]) -> N ) return super().delete_from(table_name, where) + + def _build_create_comment_column_exp( + self, table: exp.Table, column_name: str, column_comment: str, table_kind: str = "TABLE" + ) -> exp.Comment | str: + tsql_text = dedent(f""" + SET NOCOUNT ON; + + DECLARE @comment sql_variant = {exp.Literal.string(column_comment).sql(dialect=self.dialect) if column_comment is not None else "NULL"}; + DECLARE @property_name VARCHAR(128) = 'MS_Description'; + DECLARE @schema_name VARCHAR(128) = '{table.db if table.db else "dbo"}'; + DECLARE @object_name VARCHAR(128) = '{table.name}'; + DECLARE @object_kind VARCHAR(128) = '{table_kind}'; + DECLARE @column_name VARCHAR(128) = '{column_name}'; + DECLARE @existing sql_variant; + + SELECT TOP 1 @existing = CAST(VALUE AS NVARCHAR) FROM fn_listextendedproperty(@property_name, 'schema', @schema_name, @object_kind, @object_name, 'column', @column_name); + + IF @comment IS NULL + BEGIN + IF @existing IS NOT NULL + EXEC sp_dropextendedproperty @property_name, 'schema', @schema_name, @object_kind, @object_name, 'column', @column_name; + END + ELSE + BEGIN + IF @existing IS NULL + EXEC sp_addextendedproperty @property_name,@comment, 'schema', @schema_name, @object_kind, @object_name, 'column', @column_name; + ELSE IF @existing != @comment + EXEC sp_updateextendedproperty @property_name, @comment, 'schema', @schema_name, @object_kind, @object_name, 'column', @column_name; + END + """) + return tsql_text + + def _build_create_comment_table_exp( + self, table: exp.Table, table_comment: str, table_kind: str + ) -> exp.Comment | str: + tsql_text = dedent(f""" + SET NOCOUNT ON; + + DECLARE @comment sql_variant = {exp.Literal.string(table_comment).sql(dialect=self.dialect) if table_comment is not None else "NULL"}; + DECLARE @property_name VARCHAR(128) = 'MS_Description'; + DECLARE @schema_name VARCHAR(128) = '{table.db if table.db else "dbo"}'; + DECLARE @object_name VARCHAR(128) = '{table.name}'; + DECLARE @object_kind VARCHAR(128) = '{table_kind}'; + DECLARE @existing sql_variant; + + SELECT TOP 1 @existing = CAST(VALUE AS NVARCHAR) FROM fn_listextendedproperty(@property_name, 'schema', @schema_name, @object_kind, @object_name, DEFAULT, DEFAULT); + + IF @comment IS NULL + BEGIN + IF @existing IS NOT NULL + EXEC sp_dropextendedproperty @property_name, 'schema', @schema_name, @object_kind, @object_name; + END + ELSE + BEGIN + IF @existing IS NULL + EXEC sp_addextendedproperty @property_name,@comment, 'schema', @schema_name, @object_kind, @object_name; + ELSE IF @existing != @comment + EXEC sp_updateextendedproperty @property_name, @comment, 'schema', @schema_name, @object_kind, @object_name; + END + """) + return tsql_text diff --git a/tests/core/engine_adapter/test_mssql.py b/tests/core/engine_adapter/test_mssql.py index ec6a4ba3e8..1123e0511a 100644 --- a/tests/core/engine_adapter/test_mssql.py +++ b/tests/core/engine_adapter/test_mssql.py @@ -1002,3 +1002,26 @@ def python_scd2_model(context, **kwargs): snapshot: Snapshot = make_snapshot(m) assert snapshot.node.physical_properties == m.physical_properties assert snapshot.node.physical_properties.get("mssql_merge_exists") + + +def test_comments(make_mocked_engine_adapter: t.Callable, mocker: MockerFixture): + adapter = make_mocked_engine_adapter(MSSQLEngineAdapter) + + columns_to_types = { + "cola": exp.DataType.build("INT"), + "colb": exp.DataType.build("TEXT"), + } + adapter.create_table( + "test_table", columns_to_types, table_description="\\", column_descriptions={"cola": "\\"} + ) + + sql_calls = to_sql_calls(adapter) + assert sql_calls == [ + """IF NOT EXISTS (SELECT * FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_NAME = 'test_table') EXEC('CREATE TABLE [test_table] ([cola] INTEGER, [colb] VARCHAR(MAX))');""", + adapter._build_create_comment_table_exp( + exp.table_("test_table", quoted=True), "\\", "TABLE" + ), + adapter._build_create_comment_column_exp( + exp.table_("test_table", quoted=True), "cola", "\\", "TABLE" + ), + ] From 04a89efbc11e7bd397722af3ad97ed05f15dc0dc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ujfalusi=20S=C3=A1ndor?= Date: Mon, 8 Jun 2026 20:15:47 +0200 Subject: [PATCH 02/14] mssql test fixes MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Ujfalusi Sándor --- .../engine_adapter/integration/__init__.py | 18 +++++++++++++++ tests/core/engine_adapter/test_mssql.py | 23 ------------------- 2 files changed, 18 insertions(+), 23 deletions(-) diff --git a/tests/core/engine_adapter/integration/__init__.py b/tests/core/engine_adapter/integration/__init__.py index 47ccdc876a..811175a37d 100644 --- a/tests/core/engine_adapter/integration/__init__.py +++ b/tests/core/engine_adapter/integration/__init__.py @@ -520,6 +520,14 @@ def get_table_comment( AND c.relkind = '{"v" if table_kind == "VIEW" else "r"}' ; """ + elif self.dialect == "tsql": + kind = "table" if table_kind == "BASE TABLE" else "view" + query = f""" + SELECT + ep.name, + CAST(ep.value AS NVARCHAR(MAX)) comment + FROM fn_listextendedproperty('MS_Description', 'schema', '{schema_name}', '{kind}', '{table_name}', DEFAULT, DEFAULT) ep + """ result = self.engine_adapter.fetchall(query) @@ -629,6 +637,16 @@ def get_column_comments( AND c.relkind = '{"v" if table_kind == "VIEW" else "r"}' ; """ + elif self.dialect == "tsql": + kind = "table" if table_kind == "BASE TABLE" else "view" + query = f""" + SELECT + col.COLUMN_NAME column_name, + CAST(ep.value AS NVARCHAR(MAX)) comment + FROM INFORMATION_SCHEMA.COLUMNS col + CROSS APPLY fn_listextendedproperty('MS_Description', 'schema', col.TABLE_SCHEMA, '{kind}', col.TABLE_NAME, 'column', col.COLUMN_NAME) ep + WHERE col.TABLE_SCHEMA = '{schema_name}' AND col.TABLE_NAME = '{table_name}' + """ result = self.engine_adapter.fetchall(query) diff --git a/tests/core/engine_adapter/test_mssql.py b/tests/core/engine_adapter/test_mssql.py index 1123e0511a..ec6a4ba3e8 100644 --- a/tests/core/engine_adapter/test_mssql.py +++ b/tests/core/engine_adapter/test_mssql.py @@ -1002,26 +1002,3 @@ def python_scd2_model(context, **kwargs): snapshot: Snapshot = make_snapshot(m) assert snapshot.node.physical_properties == m.physical_properties assert snapshot.node.physical_properties.get("mssql_merge_exists") - - -def test_comments(make_mocked_engine_adapter: t.Callable, mocker: MockerFixture): - adapter = make_mocked_engine_adapter(MSSQLEngineAdapter) - - columns_to_types = { - "cola": exp.DataType.build("INT"), - "colb": exp.DataType.build("TEXT"), - } - adapter.create_table( - "test_table", columns_to_types, table_description="\\", column_descriptions={"cola": "\\"} - ) - - sql_calls = to_sql_calls(adapter) - assert sql_calls == [ - """IF NOT EXISTS (SELECT * FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_NAME = 'test_table') EXEC('CREATE TABLE [test_table] ([cola] INTEGER, [colb] VARCHAR(MAX))');""", - adapter._build_create_comment_table_exp( - exp.table_("test_table", quoted=True), "\\", "TABLE" - ), - adapter._build_create_comment_column_exp( - exp.table_("test_table", quoted=True), "cola", "\\", "TABLE" - ), - ] From 3ebaa8364cf4b53c0d2ce7390d773ee9502203f2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ujfalusi=20S=C3=A1ndor?= Date: Thu, 18 Jun 2026 23:40:38 +0200 Subject: [PATCH 03/14] Comment registration support for MSSQL engine MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Ujfalusi Sándor --- sqlmesh/core/engine_adapter/mssql.py | 65 ++++++++++++++++------------ 1 file changed, 38 insertions(+), 27 deletions(-) diff --git a/sqlmesh/core/engine_adapter/mssql.py b/sqlmesh/core/engine_adapter/mssql.py index 6a39557d3f..45345a9b7f 100644 --- a/sqlmesh/core/engine_adapter/mssql.py +++ b/sqlmesh/core/engine_adapter/mssql.py @@ -459,63 +459,74 @@ def delete_from(self, table_name: TableName, where: t.Union[str, exp.Expr]) -> N return super().delete_from(table_name, where) - def _build_create_comment_column_exp( - self, table: exp.Table, column_name: str, column_comment: str, table_kind: str = "TABLE" + def _build_create_comment_table_exp( + self, table: exp.Table, table_comment: str, table_kind: str ) -> exp.Comment | str: - tsql_text = dedent(f""" - SET NOCOUNT ON; - - DECLARE @comment sql_variant = {exp.Literal.string(column_comment).sql(dialect=self.dialect) if column_comment is not None else "NULL"}; + template = dedent(""" + DECLARE @comment sql_variant = {comment}; DECLARE @property_name VARCHAR(128) = 'MS_Description'; - DECLARE @schema_name VARCHAR(128) = '{table.db if table.db else "dbo"}'; - DECLARE @object_name VARCHAR(128) = '{table.name}'; - DECLARE @object_kind VARCHAR(128) = '{table_kind}'; - DECLARE @column_name VARCHAR(128) = '{column_name}'; + DECLARE @schema_name VARCHAR(128) = '{schema_name}'; + DECLARE @object_name VARCHAR(128) = '{object_name}'; + DECLARE @object_kind VARCHAR(128) = '{object_kind}'; DECLARE @existing sql_variant; - SELECT TOP 1 @existing = CAST(VALUE AS NVARCHAR) FROM fn_listextendedproperty(@property_name, 'schema', @schema_name, @object_kind, @object_name, 'column', @column_name); + SELECT TOP 1 @existing = CAST(VALUE AS NVARCHAR) FROM fn_listextendedproperty(@property_name, 'schema', @schema_name, @object_kind, @object_name, DEFAULT, DEFAULT); IF @comment IS NULL BEGIN IF @existing IS NOT NULL - EXEC sp_dropextendedproperty @property_name, 'schema', @schema_name, @object_kind, @object_name, 'column', @column_name; + EXEC sp_dropextendedproperty @property_name, 'schema', @schema_name, @object_kind, @object_name; END ELSE BEGIN IF @existing IS NULL - EXEC sp_addextendedproperty @property_name,@comment, 'schema', @schema_name, @object_kind, @object_name, 'column', @column_name; + EXEC sp_addextendedproperty @property_name,@comment, 'schema', @schema_name, @object_kind, @object_name; ELSE IF @existing != @comment - EXEC sp_updateextendedproperty @property_name, @comment, 'schema', @schema_name, @object_kind, @object_name, 'column', @column_name; + EXEC sp_updateextendedproperty @property_name, @comment, 'schema', @schema_name, @object_kind, @object_name; END """) + tsql_text = template.format( + comment = exp.Literal.string(table_comment).sql(dialect=self.dialect) if table_comment is not None else "NULL", + schema_name = table.db if table.db else "dbo", + object_name = table.name, + object_kind = table_kind, + ) return tsql_text - def _build_create_comment_table_exp( - self, table: exp.Table, table_comment: str, table_kind: str + def _build_create_comment_column_exp( + self, table: exp.Table, column_name: str, column_comment: str, table_kind: str ) -> exp.Comment | str: - tsql_text = dedent(f""" - SET NOCOUNT ON; - DECLARE @comment sql_variant = {exp.Literal.string(table_comment).sql(dialect=self.dialect) if table_comment is not None else "NULL"}; + template = dedent(""" + DECLARE @comment sql_variant = {comment}; DECLARE @property_name VARCHAR(128) = 'MS_Description'; - DECLARE @schema_name VARCHAR(128) = '{table.db if table.db else "dbo"}'; - DECLARE @object_name VARCHAR(128) = '{table.name}'; - DECLARE @object_kind VARCHAR(128) = '{table_kind}'; + DECLARE @schema_name VARCHAR(128) = '{schema_name}'; + DECLARE @object_name VARCHAR(128) = '{object_name}'; + DECLARE @object_kind VARCHAR(128) = '{object_kind}'; + DECLARE @column_name VARCHAR(128) = '{column_name}'; DECLARE @existing sql_variant; - SELECT TOP 1 @existing = CAST(VALUE AS NVARCHAR) FROM fn_listextendedproperty(@property_name, 'schema', @schema_name, @object_kind, @object_name, DEFAULT, DEFAULT); + SELECT TOP 1 @existing = CAST(VALUE AS NVARCHAR) FROM fn_listextendedproperty(@property_name, 'schema', @schema_name, @object_kind, @object_name, 'column', @column_name); IF @comment IS NULL BEGIN IF @existing IS NOT NULL - EXEC sp_dropextendedproperty @property_name, 'schema', @schema_name, @object_kind, @object_name; + EXEC sp_dropextendedproperty @property_name, 'schema', @schema_name, @object_kind, @object_name, 'column', @column_name; END ELSE BEGIN IF @existing IS NULL - EXEC sp_addextendedproperty @property_name,@comment, 'schema', @schema_name, @object_kind, @object_name; + EXEC sp_addextendedproperty @property_name,@comment, 'schema', @schema_name, @object_kind, @object_name, 'column', @column_name; ELSE IF @existing != @comment - EXEC sp_updateextendedproperty @property_name, @comment, 'schema', @schema_name, @object_kind, @object_name; + EXEC sp_updateextendedproperty @property_name, @comment, 'schema', @schema_name, @object_kind, @object_name, 'column', @column_name; END """) - return tsql_text + tsql_text = template.format( + comment = exp.Literal.string(column_comment).sql(dialect=self.dialect) if column_comment is not None else "NULL", + schema_name = table.db if table.db else "dbo", + object_name = table.name, + object_kind = table_kind, + column_name = column_name, + ) + + return tsql_text \ No newline at end of file From 23fff392062cf1aaa75c313cbafad7bfe62a3089 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ujfalusi=20S=C3=A1ndor?= Date: Fri, 19 Jun 2026 09:39:43 +0200 Subject: [PATCH 04/14] Comment registration support for MSSQL engine MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Ujfalusi Sándor --- sqlmesh/core/engine_adapter/mssql.py | 29 +++++++++++++++------------- 1 file changed, 16 insertions(+), 13 deletions(-) diff --git a/sqlmesh/core/engine_adapter/mssql.py b/sqlmesh/core/engine_adapter/mssql.py index 45345a9b7f..8acb9e318c 100644 --- a/sqlmesh/core/engine_adapter/mssql.py +++ b/sqlmesh/core/engine_adapter/mssql.py @@ -460,7 +460,7 @@ def delete_from(self, table_name: TableName, where: t.Union[str, exp.Expr]) -> N return super().delete_from(table_name, where) def _build_create_comment_table_exp( - self, table: exp.Table, table_comment: str, table_kind: str + self, table: exp.Table, table_comment: str, table_kind: str = "TABLE" ) -> exp.Comment | str: template = dedent(""" DECLARE @comment sql_variant = {comment}; @@ -486,17 +486,18 @@ def _build_create_comment_table_exp( END """) tsql_text = template.format( - comment = exp.Literal.string(table_comment).sql(dialect=self.dialect) if table_comment is not None else "NULL", - schema_name = table.db if table.db else "dbo", - object_name = table.name, - object_kind = table_kind, + comment=exp.Literal.string(table_comment).sql(dialect=self.dialect) + if table_comment is not None + else "NULL", + schema_name=table.db if table.db else "dbo", + object_name=table.name, + object_kind=table_kind, ) return tsql_text def _build_create_comment_column_exp( - self, table: exp.Table, column_name: str, column_comment: str, table_kind: str + self, table: exp.Table, column_name: str, column_comment: str, table_kind: str = "TABLE" ) -> exp.Comment | str: - template = dedent(""" DECLARE @comment sql_variant = {comment}; DECLARE @property_name VARCHAR(128) = 'MS_Description'; @@ -522,11 +523,13 @@ def _build_create_comment_column_exp( END """) tsql_text = template.format( - comment = exp.Literal.string(column_comment).sql(dialect=self.dialect) if column_comment is not None else "NULL", - schema_name = table.db if table.db else "dbo", - object_name = table.name, - object_kind = table_kind, - column_name = column_name, + comment=exp.Literal.string(column_comment).sql(dialect=self.dialect) + if column_comment is not None + else "NULL", + schema_name=table.db if table.db else "dbo", + object_name=table.name, + object_kind=table_kind, + column_name=column_name, ) - return tsql_text \ No newline at end of file + return tsql_text From f372f6259d7dd20b8c80695829cb5e2c02131221 Mon Sep 17 00:00:00 2001 From: mday-io Date: Fri, 19 Jun 2026 11:27:33 -0400 Subject: [PATCH 05/14] fix(dbt): pin pyOpenSSL>=24.0.0 for dbt 1.6 and 1.7 test environments (#5852) Signed-off-by: mday-io --- Makefile | 6 ++++-- sqlmesh/utils/__init__.py | 4 +--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/Makefile b/Makefile index 3892731352..9c41f62500 100644 --- a/Makefile +++ b/Makefile @@ -49,11 +49,13 @@ install-dev-dbt-%: $(MAKE) install-dev; \ if [ "$$version" = "1.6.0" ]; then \ echo "Applying overrides for dbt 1.6.0"; \ - $(PIP) install 'pydantic>=2.0.0' 'google-cloud-bigquery==3.30.0' 'databricks-sdk==0.28.0' --reinstall; \ + $(PIP) install 'pydantic>=2.0.0' 'google-cloud-bigquery==3.30.0' 'databricks-sdk==0.28.0' \ + 'pyOpenSSL>=24.0.0' --reinstall; \ fi; \ if [ "$$version" = "1.7.0" ]; then \ echo "Applying overrides for dbt 1.7.0"; \ - $(PIP) install 'databricks-sdk==0.28.0' --reinstall; \ + $(PIP) install 'databricks-sdk==0.28.0' \ + 'pyOpenSSL>=24.0.0' --reinstall; \ fi; \ if [ "$$version" = "1.5.0" ]; then \ echo "Applying overrides for dbt 1.5.0"; \ diff --git a/sqlmesh/utils/__init__.py b/sqlmesh/utils/__init__.py index 5b1b077216..59605893ba 100644 --- a/sqlmesh/utils/__init__.py +++ b/sqlmesh/utils/__init__.py @@ -246,9 +246,7 @@ def wrap(*args: t.Any, **kwargs: t.Any) -> t.Any: class classproperty(property): - """ - Similar to a normal property but works for class methods - """ + """Similar to a normal property but works for class methods""" def __get__(self, obj: t.Any, owner: t.Any = None) -> t.Any: return classmethod(self.fget).__get__(None, owner)() # type: ignore From 1ff0ed4a91a2704f1f5d68fd2bba27d5026d48d7 Mon Sep 17 00:00:00 2001 From: mday-io Date: Fri, 19 Jun 2026 17:29:14 -0400 Subject: [PATCH 06/14] fix(clickhouse): support multi-gateway projects with catalog-aware engines (#5826) Signed-off-by: mday-io Signed-off-by: Michael Day --- docs/integrations/engines/clickhouse.md | 51 ++- sqlmesh/core/config/connection.py | 20 +- sqlmesh/core/config/scheduler.py | 22 +- sqlmesh/core/context.py | 67 +++ sqlmesh/core/engine_adapter/base.py | 25 ++ sqlmesh/core/engine_adapter/clickhouse.py | 54 ++- tests/core/engine_adapter/test_clickhouse.py | 168 ++++++++ tests/core/test_context.py | 409 +++++++++++++++++++ 8 files changed, 812 insertions(+), 4 deletions(-) diff --git a/docs/integrations/engines/clickhouse.md b/docs/integrations/engines/clickhouse.md index 14e931b046..4c2aab6e78 100644 --- a/docs/integrations/engines/clickhouse.md +++ b/docs/integrations/engines/clickhouse.md @@ -420,6 +420,54 @@ If a model has many records in each partition, you may see additional performanc Choose a model's time partitioning granularity based on the characteristics of the data it will process, making sure the total number of partitions is 1000 or fewer. +## Multi-gateway setup + +ClickHouse does not have a catalog concept — its fully-qualified table names are two-level (`database.table`), not three-level (`catalog.database.table`). + +When a SQLMesh project uses ClickHouse alongside a catalog-aware gateway such as Trino or BigQuery, the two gateway types produce FQNs with different nesting depths. SQLMesh's internal schema tracking requires uniform nesting, so it assigns a **virtual catalog** to ClickHouse models at load time. + +### How the virtual catalog works + +- SQLMesh automatically detects the nesting mismatch and injects a virtual catalog into each ClickHouse adapter when a catalog-aware gateway is also present. +- ClickHouse models will appear with three-level FQNs in `sqlmesh plan` output and logs — for example, `__ch_prod__.mydb.mytable` for a gateway named `ch_prod`. +- The virtual catalog prefix is **never sent to ClickHouse**. It is stripped from every DDL and DML statement before execution. +- When ClickHouse is the only gateway in a project, no virtual catalog is assigned and models remain two-level. + +### Adding a second gateway to an existing ClickHouse-only project + +!!! warning "Re-materialization required" + Adding a catalog-aware gateway (such as Trino or BigQuery) to a project that previously used ClickHouse as the only gateway triggers a **full re-materialization of every ClickHouse model** on the next `sqlmesh apply`. Plan for this before making the change. + +If your project previously used ClickHouse as the only gateway, your models were fingerprinted with 2-level FQNs (`db.table`). Adding a catalog-aware gateway causes all ClickHouse models to be treated as new versions (their FQNs change to `__{gateway_name}__.db.table`): + +- `FULL` models are recreated once — cost is proportional to the size of each table. +- `INCREMENTAL_BY_TIME_RANGE` models require a **full historical backfill** from the model's configured start date. +- The old 2-level model names appear as **Removed** in the plan and will be cleaned up after the environment TTL expires. + +This is a one-time cost at the transition point and does not recur. There is no way to skip it — `--forward-only` does not apply because SQLMesh treats the 3-level names as new models, not modified ones. + +### Virtual catalog naming + +By default, the virtual catalog name is derived from **the gateway name you chose in your config**, wrapped in double underscores — for example, a gateway named `clickhouse` produces `__clickhouse__`, and a gateway named `ch_prod` produces `__ch_prod__`. The double-underscore wrapping makes it visually clear that this is an internal SQLMesh concept, not a real ClickHouse object. + +You can override the default name by setting `virtual_catalog` in your ClickHouse connection configuration: + +```yaml +gateways: + clickhouse: + connection: + type: clickhouse + host: my-clickhouse-host + username: default + virtual_catalog: ch_virtual # optional; defaults to __{gateway_name}__ (e.g. __clickhouse__) + trino: + connection: + type: trino + ... +``` + +With this configuration, ClickHouse models will appear as `ch_virtual.mydb.mytable` in plan output instead of `__clickhouse__.mydb.mytable`. + ## Local/Built-in Scheduler **Engine Adapter Type**: `clickhouse` @@ -446,4 +494,5 @@ If a model has many records in each partition, you may see additional performanc | `server_host_name` | The ClickHouse server hostname as identified by the CN or SNI of its TLS certificate. Set this to avoid SSL errors when connecting through a proxy or tunnel with a different hostname. | string | N | | `tls_mode` | Controls advanced TLS behavior. proxy and strict do not invoke ClickHouse mutual TLS connection, but do send client cert and key. mutual assumes ClickHouse mutual TLS auth with a client certificate. | string | N | | `connection_settings` | Additional [connection settings](https://clickhouse.com/docs/integrations/python#settings-argument) | dict | N | -| `connection_pool_options` | Additional [options](https://clickhouse.com/docs/integrations/python#customizing-the-http-connection-pool) for the HTTP connection pool | dict | N | \ No newline at end of file +| `connection_pool_options` | Additional [options](https://clickhouse.com/docs/integrations/python#customizing-the-http-connection-pool) for the HTTP connection pool | dict | N | +| `virtual_catalog` | Override the virtual catalog name used when ClickHouse runs alongside a catalog-aware gateway (e.g. Trino). Defaults to `__{gateway_name}__`. See [Multi-gateway setup](#multi-gateway-setup) for details. | string | N | diff --git a/sqlmesh/core/config/connection.py b/sqlmesh/core/config/connection.py index 7ed6b315c9..3c13035220 100644 --- a/sqlmesh/core/config/connection.py +++ b/sqlmesh/core/config/connection.py @@ -2086,6 +2086,7 @@ class ClickhouseConnectionConfig(ConnectionConfig): password: t.Optional[str] = None port: t.Optional[int] = None cluster: t.Optional[str] = None + virtual_catalog: t.Optional[str] = None connect_timeout: int = 10 send_receive_timeout: int = 300 query_limit: int = 0 @@ -2120,6 +2121,19 @@ class ClickhouseConnectionConfig(ConnectionConfig): _engine_import_validator = _get_engine_import_validator("clickhouse_connect", "clickhouse") + @field_validator("virtual_catalog") + def validate_virtual_catalog(cls, v: t.Optional[str]) -> t.Optional[str]: + if v is not None and not v.strip(): + raise ConfigError( + "virtual_catalog cannot be an empty string. " + "Omit the field to use the default synthetic prefix (____)." + ) + if v is not None and "." in v: + raise ConfigError( + f"virtual_catalog must be a single identifier with no dots (got: {v!r})" + ) + return v + @property def _connection_kwargs_keys(self) -> t.Set[str]: kwargs = { @@ -2181,7 +2195,11 @@ def cloud_mode(self) -> bool: @property def _extra_engine_config(self) -> t.Dict[str, t.Any]: - return {"cluster": self.cluster, "cloud_mode": self.cloud_mode} + return { + "cluster": self.cluster, + "cloud_mode": self.cloud_mode, + "virtual_catalog": self.virtual_catalog, + } @property def _static_connection_kwargs(self) -> t.Dict[str, t.Any]: diff --git a/sqlmesh/core/config/scheduler.py b/sqlmesh/core/config/scheduler.py index 9d9d1d3c79..4cce9b0f76 100644 --- a/sqlmesh/core/config/scheduler.py +++ b/sqlmesh/core/config/scheduler.py @@ -138,9 +138,29 @@ def create_plan_evaluator(self, context: GenericContext) -> PlanEvaluator: def get_default_catalog_per_gateway(self, context: GenericContext) -> t.Dict[str, str]: default_catalogs_per_gateway: t.Dict[str, str] = {} + unsupported_gateways = [] + for gateway, adapter in context.engine_adapters.items(): - if catalog := adapter.default_catalog: + if adapter.catalog_support.is_unsupported: + unsupported_gateways.append((gateway, adapter)) + elif catalog := adapter.default_catalog: default_catalogs_per_gateway[gateway] = catalog + + # When catalog-aware gateways exist, assign the gateway name as a virtual catalog for + # catalog-unsupported gateways that opt in (e.g. ClickHouse) so that all models in the + # project have a uniform 3-level FQN and the MappingSchema nesting level check passes. + # Only adapters that explicitly return True from supports_virtual_catalog() are mutated; + # other UNSUPPORTED adapters are left unchanged to avoid silent breakage. + if default_catalogs_per_gateway and unsupported_gateways: + for gateway, adapter in unsupported_gateways: + if adapter.supports_virtual_catalog(): + adapter.inject_virtual_catalog(gateway) + # Read the actual virtual catalog name back from the adapter — it may differ + # from the gateway name if the user configured a custom virtual_catalog value. + # inject_virtual_catalog() always sets _default_catalog so default_catalog + # cannot return None at this point. + default_catalogs_per_gateway[gateway] = adapter.default_catalog # type: ignore[assignment] + return default_catalogs_per_gateway diff --git a/sqlmesh/core/context.py b/sqlmesh/core/context.py index 14e37e1313..5902977331 100644 --- a/sqlmesh/core/context.py +++ b/sqlmesh/core/context.py @@ -492,6 +492,7 @@ def engine_adapter(self) -> EngineAdapter: @property def snapshot_evaluator(self) -> SnapshotEvaluator: if not self._snapshot_evaluator: + self._ensure_virtual_catalog_injection() self._snapshot_evaluator = SnapshotEvaluator( { gateway: adapter.with_settings(execute_log_level=logging.INFO) @@ -502,6 +503,15 @@ def snapshot_evaluator(self) -> SnapshotEvaluator: ) return self._snapshot_evaluator + def _ensure_virtual_catalog_injection(self) -> None: + """Ensure virtual catalog injection has run before adapters are cloned for SnapshotEvaluator. + + Injection is a side effect of get_default_catalog_per_gateway. In normal usage it fires + earlier (default_catalog is accessed during model loading), but this guard covers the edge + case where snapshot_evaluator is accessed directly on a fresh context before any model ops. + """ + _ = self.default_catalog_per_gateway + def execution_context( self, deployability_index: t.Optional[DeployabilityIndex] = None, @@ -1440,6 +1450,8 @@ def plan( plan = plan_builder.build() + self._warn_if_virtual_catalog_rematerialization(plan) + if no_auto_categorization or plan.uncategorized: # Prompts are required if the auto categorization is disabled # or if there are any uncategorized snapshots in the plan @@ -2746,6 +2758,61 @@ def _run_plan_tests(self, skip_tests: bool = False) -> t.Optional[ModelTextTestR return result return None + def _warn_if_virtual_catalog_rematerialization(self, plan: "Plan") -> None: + """Warn when ClickHouse models appear as new snapshots solely because a virtual catalog + prefix was added to their FQNs after a catalog-aware gateway joined the project. + + This situation causes every previously-applied ClickHouse model to be treated as brand-new + by SQLMesh, triggering full re-materialization and historical backfills. Emitting a warning + before the plan is displayed gives users a chance to understand the cost before applying. + """ + from sqlglot import exp + + # Collect the set of old 2-level snapshot names from the current environment so we can + # detect which new 3-level names are renames rather than genuinely new models. + old_names: t.Set[str] = set() + for s_id in plan.context_diff.removed_snapshots: + old_names.add(s_id.name) + for name in plan.context_diff.snapshots_by_name: + old_names.add(name) + + affected: t.List[t.Tuple[str, str]] = [] # (new_3level_name, old_2level_name) + + for gateway, adapter in self.engine_adapters.items(): + if not adapter.supports_virtual_catalog() or not adapter._default_catalog: + continue + virtual_catalog = adapter._default_catalog + + for snapshot in plan.new_snapshots: + table = exp.to_table(snapshot.name) + if table.catalog != virtual_catalog: + continue + # Reconstruct the 2-level name that would have been used before injection. + old_name = f"{table.db}.{table.name}" + if old_name in old_names: + affected.append((snapshot.name, old_name)) + + if not affected: + return + + max_display = 10 + model_lines = "\n".join( + f" - {new_name} (was: {old_name})" for new_name, old_name in affected[:max_display] + ) + if len(affected) > max_display: + model_lines += f"\n ... and {len(affected) - max_display} more" + + self.console.log_warning( + "ClickHouse models are being re-materialized due to virtual catalog FQN change.\n\n" + "The following ClickHouse models appear as new because their fully-qualified\n" + "names changed from 2-level (db.table) to 3-level (__gateway__.db.table):\n\n" + f"{model_lines}\n\n" + "FULL models will be recreated once. INCREMENTAL_BY_TIME_RANGE models will\n" + "require a full historical backfill from their configured start date.\n\n" + "This is a one-time cost when first adding a catalog-aware gateway to an\n" + "existing ClickHouse project. To proceed, run `sqlmesh apply`." + ) + @property def _model_tables(self) -> t.Dict[str, str]: """Mapping of model name to physical table name. diff --git a/sqlmesh/core/engine_adapter/base.py b/sqlmesh/core/engine_adapter/base.py index 7295714087..54a27c0920 100644 --- a/sqlmesh/core/engine_adapter/base.py +++ b/sqlmesh/core/engine_adapter/base.py @@ -223,6 +223,31 @@ def comments_enabled(self) -> bool: def catalog_support(self) -> CatalogSupport: return CatalogSupport.UNSUPPORTED + def supports_virtual_catalog(self) -> bool: + """Return True if this adapter can accept a virtual catalog for multi-gateway nesting alignment. + + When a project mixes catalog-aware gateways (e.g. DuckDB) with catalog-unsupported gateways + (e.g. ClickHouse), all adapters need a uniform 3-level FQN so MappingSchema nesting stays + consistent. Adapters that return True here opt in to receiving an injected virtual catalog + via inject_virtual_catalog(), which causes the set_catalog decorator to strip the catalog + from DDL expressions rather than raising UnsupportedCatalogOperationError. + """ + return False + + def inject_virtual_catalog(self, gateway: str) -> None: + """Inject a gateway name to configure the adapter's virtual catalog. + + The adapter determines the final catalog name from the gateway name (e.g. ClickHouse + wraps it as __{gateway}__). Only call this on adapters that return True from + supports_virtual_catalog(). After injection, catalog_support should return + SINGLE_CATALOG_ONLY so the set_catalog decorator strips the virtual catalog from DDL + expressions instead of raising an error. + """ + raise NotImplementedError( + f"{self.dialect} does not support virtual catalog injection. " + "Override supports_virtual_catalog() to return True and implement inject_virtual_catalog()." + ) + @cached_property def schema_differ(self) -> SchemaDiffer: return SchemaDiffer( diff --git a/sqlmesh/core/engine_adapter/clickhouse.py b/sqlmesh/core/engine_adapter/clickhouse.py index c41681ade2..f5ef705b08 100644 --- a/sqlmesh/core/engine_adapter/clickhouse.py +++ b/sqlmesh/core/engine_adapter/clickhouse.py @@ -8,6 +8,7 @@ from sqlmesh.core.engine_adapter.mixins import LogicalMergeMixin from sqlmesh.core.engine_adapter.base import EngineAdapterWithIndexSupport from sqlmesh.core.engine_adapter.shared import ( + CatalogSupport, DataObject, DataObjectType, EngineRunMode, @@ -42,6 +43,22 @@ class ClickhouseEngineAdapter(EngineAdapterWithIndexSupport, LogicalMergeMixin): DEFAULT_TABLE_ENGINE = "MergeTree" ORDER_BY_TABLE_ENGINE_REGEX = "^.*?MergeTree.*$" + @property + def catalog_support(self) -> CatalogSupport: + # This property is intentionally dynamic: it transitions from UNSUPPORTED to + # SINGLE_CATALOG_ONLY after inject_virtual_catalog() sets _default_catalog. Callers must + # not cache the result — always read it live so they see the post-injection state. + if self._default_catalog: + return CatalogSupport.SINGLE_CATALOG_ONLY + return CatalogSupport.UNSUPPORTED + + def supports_virtual_catalog(self) -> bool: + return True + + def inject_virtual_catalog(self, gateway: str) -> None: + configured = self._extra_config.get("virtual_catalog") + self._default_catalog = f"__{gateway}__" if configured is None else configured + @property def engine_run_mode(self) -> EngineRunMode: if self._extra_config.get("cloud_mode"): @@ -172,10 +189,27 @@ def create_schema( Clickhouse has a two-level naming scheme [database].[table]. """ + from sqlmesh.utils.errors import SQLMeshError + properties_copy = properties.copy() if self.engine_run_mode.is_cluster: properties_copy.append(exp.OnCluster(this=exp.to_identifier(self.cluster))) + # ClickHouse does not support catalogs. When a virtual catalog has been injected + # (self._default_catalog is set), strip it from the schema name. This mirrors the + # SINGLE_CATALOG_ONLY branch in the set_catalog decorator, which does not apply here + # because this override is not wrapped by @set_catalog(). + if self._default_catalog: + schema_exp = to_schema(schema_name) + catalog_name = schema_exp.catalog + if catalog_name: + if catalog_name != self._default_catalog: + raise SQLMeshError( + f"clickhouse requires that all catalog operations be against a single catalog: " + f"{self._default_catalog}. Provided catalog: {catalog_name}" + ) + schema_name = self._strip_virtual_catalog(schema_exp) + # can't call super() because it will try to set a catalog return self._create_schema( schema_name=schema_name, @@ -446,6 +480,7 @@ def insert_overwrite_by_partition( target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None, source_columns: t.Optional[t.List[str]] = None, ) -> None: + table_name = self._strip_virtual_catalog(table_name) source_queries, target_columns_to_types = self._get_source_queries_and_columns_to_types( query_or_df, target_columns_to_types, @@ -560,6 +595,20 @@ def _create_table( target_columns_to_types or self.columns(table_name), ) + def _strip_virtual_catalog(self, name: "TableName") -> exp.Table: + """Strip the virtual catalog prefix from a table name if present. + + When a virtual catalog has been injected, ClickHouse table names carry a + synthetic catalog prefix (e.g. ``__gw__``) so they match the 3-level FQN + depth of catalog-aware peers. This helper removes that prefix before any + SQL is sent to the wire, since ClickHouse only supports a two-level + ``[database].[table]`` naming scheme. + """ + table = exp.to_table(name) + if self._default_catalog and table.catalog == self._default_catalog: + table.set("catalog", None) + return table + def _exchange_tables( self, old_table_name: TableName, @@ -598,7 +647,7 @@ def _rename_table( self.execute(f"RENAME TABLE {old_table_sql} TO {new_table_sql}{self._on_cluster_sql()}") def delete_from(self, table_name: TableName, where: t.Union[str, exp.Expr]) -> None: - delete_expr = exp.delete(table_name, where) + delete_expr = exp.delete(self._strip_virtual_catalog(table_name), where) if self.engine_run_mode.is_cluster: delete_expr.set("cluster", exp.OnCluster(this=exp.to_identifier(self.cluster))) self.execute(delete_expr) @@ -614,6 +663,9 @@ def alter_table( for alter_expression in [ x.expression if isinstance(x, TableAlterOperation) else x for x in alter_expressions ]: + if self._default_catalog and isinstance(alter_expression.this, exp.Table): + if alter_expression.this.catalog == self._default_catalog: + alter_expression.this.set("catalog", None) if self.engine_run_mode.is_cluster: alter_expression.set( "cluster", exp.OnCluster(this=exp.to_identifier(self.cluster)) diff --git a/tests/core/engine_adapter/test_clickhouse.py b/tests/core/engine_adapter/test_clickhouse.py index b2ff0592d2..110f937ccb 100644 --- a/tests/core/engine_adapter/test_clickhouse.py +++ b/tests/core/engine_adapter/test_clickhouse.py @@ -1407,3 +1407,171 @@ def test_exchange_tables( 'RENAME TABLE "table2" TO "table1"', 'DROP TABLE IF EXISTS "__temp_table1_abcd"', ] + + +def test_virtual_catalog_ddl_stripping(make_mocked_engine_adapter: t.Callable): + """After inject_virtual_catalog(), create_schema() with the virtual catalog prefix must strip + the catalog and execute without raising, and with a wrong catalog must raise SQLMeshError.""" + from sqlmesh.utils.errors import SQLMeshError + + adapter = make_mocked_engine_adapter(ClickhouseEngineAdapter) + + assert adapter.supports_virtual_catalog() is True + adapter.inject_virtual_catalog("clickhouse_gw") + + # catalog_support must switch to SINGLE_CATALOG_ONLY after injection + from sqlmesh.core.engine_adapter.shared import CatalogSupport + + assert adapter.catalog_support == CatalogSupport.SINGLE_CATALOG_ONLY + # The default synthetic virtual catalog wraps the gateway name in double underscores. + assert adapter._default_catalog == "__clickhouse_gw__" + + # create_schema with the virtual catalog prefix must strip the catalog and not raise + adapter.create_schema("__clickhouse_gw__.mydb") + assert to_sql_calls(adapter) == ['CREATE DATABASE IF NOT EXISTS "mydb"'] + + # create_schema with a wrong catalog must raise SQLMeshError + with pytest.raises(SQLMeshError, match="__clickhouse_gw__"): + adapter.create_schema("wrong_catalog.mydb") + + +def test_supports_virtual_catalog_returns_true(): + """ClickhouseEngineAdapter.supports_virtual_catalog() must return True without any connection.""" + from unittest.mock import MagicMock + + adapter = ClickhouseEngineAdapter( + lambda *a, **k: MagicMock(), + dialect="clickhouse", + ) + assert adapter.supports_virtual_catalog() is True + assert adapter._default_catalog is None + + +def test_inject_virtual_catalog_uses_custom_config(make_mocked_engine_adapter: t.Callable): + """When virtual_catalog is set in _extra_config, inject_virtual_catalog uses that value + instead of the synthetic __gateway_name__ default.""" + adapter = make_mocked_engine_adapter( + ClickhouseEngineAdapter, + virtual_catalog="my_custom_catalog", + ) + + adapter.inject_virtual_catalog("clickhouse_gw") + + # The user-configured value must take precedence over the synthetic default. + assert adapter._default_catalog == "my_custom_catalog" + + from sqlmesh.core.engine_adapter.shared import CatalogSupport + + assert adapter.catalog_support == CatalogSupport.SINGLE_CATALOG_ONLY + + +def test_clickhouse_connection_config_virtual_catalog_extra_engine_config(): + """virtual_catalog set on ClickhouseConnectionConfig must appear in _extra_engine_config + so that the value reaches the adapter's _extra_config dict.""" + from sqlmesh.core.config.connection import ClickhouseConnectionConfig + + config = ClickhouseConnectionConfig( + host="localhost", username="user", virtual_catalog="my_catalog" + ) + assert config._extra_engine_config.get("virtual_catalog") == "my_catalog" + + +def test_clickhouse_connection_config_virtual_catalog_empty_string_rejected(): + """virtual_catalog: "" is a footgun — the empty string propagates to _default_catalog, + which is falsy, so catalog_support stays UNSUPPORTED and the nesting error persists. + Reject it at config parse time with a clear message.""" + import pytest + + from sqlmesh.core.config.connection import ClickhouseConnectionConfig + from sqlmesh.utils.errors import ConfigError + + with pytest.raises(ConfigError, match="virtual_catalog cannot be an empty string"): + ClickhouseConnectionConfig(host="localhost", username="user", virtual_catalog="") + + with pytest.raises(ConfigError, match="virtual_catalog cannot be an empty string"): + ClickhouseConnectionConfig(host="localhost", username="user", virtual_catalog=" ") + + +def test_virtual_catalog_stripped_in_delete_from(make_mocked_engine_adapter: t.Callable): + """delete_from() must strip the virtual catalog prefix before building the DELETE expression.""" + adapter = make_mocked_engine_adapter(ClickhouseEngineAdapter) + adapter.inject_virtual_catalog("ch_gw") + assert adapter._default_catalog == "__ch_gw__" + + adapter.delete_from("__ch_gw__.mydb.my_table", "a = 1") + + sql_calls = to_sql_calls(adapter) + assert len(sql_calls) == 1 + assert "__ch_gw__" not in sql_calls[0] + assert "mydb" in sql_calls[0] + assert "my_table" in sql_calls[0] + assert "DELETE FROM" in sql_calls[0] + + +def test_virtual_catalog_stripped_in_insert_overwrite_by_partition( + make_mocked_engine_adapter: t.Callable, mocker: MockerFixture +): + """insert_overwrite_by_partition() must strip the virtual catalog prefix before any SQL is sent.""" + adapter = make_mocked_engine_adapter(ClickhouseEngineAdapter) + adapter.inject_virtual_catalog("ch_gw") + assert adapter._default_catalog == "__ch_gw__" + + # Patch _insert_overwrite_by_condition so we can inspect how table_name was passed. + overwrite_mock = mocker.patch.object(adapter, "_insert_overwrite_by_condition") + # Also patch _get_source_queries_and_columns_to_types to avoid needing a real query. + source_query_mock = mocker.patch.object( + adapter, + "_get_source_queries_and_columns_to_types", + return_value=([], {}), + ) + + adapter.insert_overwrite_by_partition( + "__ch_gw__.mydb.my_table", + parse_one("SELECT 1 AS col"), + partitioned_by=[exp.column("ds")], + ) + + # The table_name passed to _insert_overwrite_by_condition must not contain the virtual catalog. + assert overwrite_mock.called + table_name_arg = overwrite_mock.call_args[0][0] + table_name_sql = ( + table_name_arg.sql("clickhouse") + if isinstance(table_name_arg, exp.Expression) + else str(table_name_arg) + ) + assert "__ch_gw__" not in table_name_sql + + # The target_table passed to _get_source_queries_and_columns_to_types must also be stripped. + assert source_query_mock.called + target_table_kwarg = source_query_mock.call_args[1].get( + "target_table", + source_query_mock.call_args[0][2] if len(source_query_mock.call_args[0]) > 2 else None, + ) + if target_table_kwarg is not None: + target_sql = ( + target_table_kwarg.sql("clickhouse") + if isinstance(target_table_kwarg, exp.Expression) + else str(target_table_kwarg) + ) + assert "__ch_gw__" not in target_sql + + +def test_virtual_catalog_stripped_in_alter_table(make_mocked_engine_adapter: t.Callable): + """alter_table() must strip the virtual catalog prefix from each ALTER TABLE statement.""" + adapter = make_mocked_engine_adapter(ClickhouseEngineAdapter) + adapter.inject_virtual_catalog("ch_gw") + assert adapter._default_catalog == "__ch_gw__" + + alter_expr = exp.Alter( + this=exp.to_table("__ch_gw__.mydb.my_table"), + kind="TABLE", + actions=[exp.Drop(this=exp.to_column("col_a"), kind="COLUMN")], + ) + adapter.alter_table([alter_expr]) + + sql_calls = to_sql_calls(adapter) + assert len(sql_calls) == 1 + assert "__ch_gw__" not in sql_calls[0] + assert "mydb" in sql_calls[0] + assert "my_table" in sql_calls[0] + assert "ALTER TABLE" in sql_calls[0] diff --git a/tests/core/test_context.py b/tests/core/test_context.py index 49b7e56e55..365d31d3fd 100644 --- a/tests/core/test_context.py +++ b/tests/core/test_context.py @@ -399,6 +399,415 @@ def test_multiple_gateways(tmp_path: Path): assert context.dag._sorted == ['"db"."staging"."stg_model"', '"db"."main"."final_model"'] +def test_multi_gateway_catalog_aware_and_unsupported(tmp_path: Path, mocker): + """ClickHouse (catalog UNSUPPORTED) alongside DuckDB (catalog FULL_SUPPORT) must not raise a + nesting-level SchemaError when models are loaded. + + Expected behaviour after the fix: + - get_default_catalog_per_gateway assigns the gateway name as a virtual catalog for + catalog-unsupported gateways when catalog-aware gateways are present. + - ClickHouse models end up with a 3-level FQN so the MappingSchema nesting is uniform. + - The virtual catalog is stripped from DDL expressions (not raised as an error) because the + adapter's catalog_support flips to SINGLE_CATALOG_ONLY when _default_catalog is set. + """ + + from sqlmesh.core.config.scheduler import BuiltInSchedulerConfig + from sqlmesh.core.engine_adapter.clickhouse import ClickhouseEngineAdapter + from sqlmesh.core.engine_adapter.duckdb import DuckDBEngineAdapter + from sqlmesh.core.engine_adapter.shared import CatalogSupport + + db_path = str(tmp_path / "db.db") + + # Build a real DuckDB adapter for the primary gateway. + duck_adapter = DuckDBEngineAdapter( + lambda *a, **k: __import__("duckdb").connect(db_path), + dialect="duckdb", + ) + + # Build a minimal ClickHouse adapter stub — no real connection needed. + ch_adapter = ClickhouseEngineAdapter( + lambda *a, **k: mocker.NonCallableMock(), + dialect="clickhouse", + ) + + # Simulate the context's engine_adapters dict and call the scheduler directly. + engine_adapters = { + "duckdb_gw": duck_adapter, + "clickhouse_gw": ch_adapter, + } + + ctx_mock = mocker.MagicMock() + ctx_mock.engine_adapters = engine_adapters + + scheduler = BuiltInSchedulerConfig() + catalog_per_gw = scheduler.get_default_catalog_per_gateway(ctx_mock) + + # DuckDB gateway must have a real catalog entry. + assert "duckdb_gw" in catalog_per_gw + # DuckDB's default catalog is the database filename without extension. + assert catalog_per_gw["duckdb_gw"] == "db" + # ClickHouse gateway must now also have a virtual catalog wrapped in double underscores. + assert "clickhouse_gw" in catalog_per_gw + assert catalog_per_gw["clickhouse_gw"] == "__clickhouse_gw__" + + # The ClickHouse adapter's _default_catalog must be mutated to the synthetic virtual catalog. + assert ch_adapter._default_catalog == "__clickhouse_gw__" + + # The adapter's catalog_support must now be SINGLE_CATALOG_ONLY (not UNSUPPORTED), + # so that the set_catalog decorator strips the virtual catalog instead of raising. + assert ch_adapter.catalog_support == CatalogSupport.SINGLE_CATALOG_ONLY + + # Loading models for both gateways must not raise a SchemaError. + duckdb_model = load_sql_based_model( + parse("MODEL(name main.duckdb_tbl, kind FULL, gateway duckdb_gw);\nSELECT 1 AS col"), + default_catalog="db", + ) + ch_model = load_sql_based_model( + parse("MODEL(name mydb.ch_tbl, kind FULL, gateway clickhouse_gw);\nSELECT 1 AS col"), + default_catalog="__clickhouse_gw__", + ) + + # Both models must have 3-level FQNs so MappingSchema nesting is uniform. + # count(".") == 2 means 3 parts (catalog.db.table), i.e. a 3-level FQN. + assert duckdb_model.fqn.count(".") == 2, ( + f"Expected 3-level FQN for duckdb model, got: {duckdb_model.fqn}" + ) + assert ch_model.fqn.count(".") == 2, ( + f"Expected 3-level FQN for ch model, got: {ch_model.fqn}" + ) # 3 parts = 2 dots + + # Both models loaded into the same MappingSchema must not raise a nesting SchemaError. + from sqlglot.schema import MappingSchema + + schema = MappingSchema(normalize=False) + schema.add_table(duckdb_model.fqn, duckdb_model.columns_to_types or {}) + schema.add_table(ch_model.fqn, ch_model.columns_to_types or {}) + + +def test_single_gateway_clickhouse_no_virtual_catalog(mocker): + """When ClickHouse is the only gateway (no catalog-aware peer), it must NOT receive a virtual + catalog. Models remain 2-level and catalog_support stays UNSUPPORTED.""" + from sqlmesh.core.config.scheduler import BuiltInSchedulerConfig + from sqlmesh.core.engine_adapter.clickhouse import ClickhouseEngineAdapter + from sqlmesh.core.engine_adapter.shared import CatalogSupport + + ch_adapter = ClickhouseEngineAdapter( + lambda *a, **k: mocker.NonCallableMock(), + dialect="clickhouse", + ) + + ctx_mock = mocker.MagicMock() + ctx_mock.engine_adapters = {"clickhouse_gw": ch_adapter} + + scheduler = BuiltInSchedulerConfig() + catalog_per_gw = scheduler.get_default_catalog_per_gateway(ctx_mock) + + # With only a catalog-unsupported gateway there must be no entry at all. + assert "clickhouse_gw" not in catalog_per_gw + + # The adapter must remain unchanged — no virtual catalog injected. + assert ch_adapter._default_catalog is None + assert ch_adapter.catalog_support == CatalogSupport.UNSUPPORTED + + +def test_multi_gateway_clickhouse_custom_virtual_catalog(tmp_path: Path, mocker): + """When virtual_catalog is configured on the ClickHouse connection, that value is used as the + virtual catalog instead of the synthetic __gateway_name__ default.""" + from sqlmesh.core.config.scheduler import BuiltInSchedulerConfig + from sqlmesh.core.engine_adapter.clickhouse import ClickhouseEngineAdapter + from sqlmesh.core.engine_adapter.duckdb import DuckDBEngineAdapter + from sqlmesh.core.engine_adapter.shared import CatalogSupport + + db_path = str(tmp_path / "db.db") + + duck_adapter = DuckDBEngineAdapter( + lambda *a, **k: __import__("duckdb").connect(db_path), + dialect="duckdb", + ) + + # Pass virtual_catalog via _extra_config (the same path used by ClickhouseConnectionConfig). + ch_adapter = ClickhouseEngineAdapter( + lambda *a, **k: mocker.NonCallableMock(), + dialect="clickhouse", + virtual_catalog="my_custom_catalog", + ) + + ctx_mock = mocker.MagicMock() + ctx_mock.engine_adapters = {"duckdb_gw": duck_adapter, "clickhouse_gw": ch_adapter} + + scheduler = BuiltInSchedulerConfig() + catalog_per_gw = scheduler.get_default_catalog_per_gateway(ctx_mock) + + # The configured virtual_catalog value must be used, not __clickhouse_gw__. + assert catalog_per_gw["clickhouse_gw"] == "my_custom_catalog" + assert ch_adapter._default_catalog == "my_custom_catalog" + assert ch_adapter.catalog_support == CatalogSupport.SINGLE_CATALOG_ONLY + + +def test_snapshot_evaluator_calls_ensure_virtual_catalog_injection(mocker): + """snapshot_evaluator must call _ensure_virtual_catalog_injection before cloning adapters. + + This guards the edge case where snapshot_evaluator is the first property accessed on a fresh + context — before default_catalog fires during model loading — and ensures virtual catalog + injection still happens even in that order. + """ + ctx = Context(config=Config()) + ctx._snapshot_evaluator = None # force re-initialization + + inject_spy = mocker.patch.object(ctx, "_ensure_virtual_catalog_injection") + + _ = ctx.snapshot_evaluator + + inject_spy.assert_called_once() + + +@pytest.mark.fast +def test_multi_gateway_virtual_catalog_create_schema_strips_prefix(tmp_path: Path, mocker): + """Integration test: create_schema with a 3-level virtual-catalog FQN must strip the synthetic + catalog prefix before sending DDL to ClickHouse. + + Flow exercised: + 1. get_default_catalog_per_gateway detects a catalog-aware gateway (DuckDB) alongside + a catalog-unsupported gateway (ClickHouse) and calls inject_virtual_catalog(). + 2. The ClickHouse adapter's _default_catalog is set to ``__clickhouse_gw__``. + 3. A ClickHouse model loaded with that virtual catalog gets a 3-level FQN. + 4. When create_schema is called with the 3-level schema name the virtual catalog prefix + is stripped, so the SQL that reaches the wire uses only a 2-level name. + 5. The DuckDB adapter's _default_catalog is NOT set to a synthetic value. + """ + from sqlmesh.core.config.scheduler import BuiltInSchedulerConfig + from sqlmesh.core.engine_adapter.clickhouse import ClickhouseEngineAdapter + from sqlmesh.core.engine_adapter.duckdb import DuckDBEngineAdapter + from sqlmesh.core.engine_adapter.shared import CatalogSupport + + db_path = str(tmp_path / "db.db") + + duck_adapter = DuckDBEngineAdapter( + lambda *a, **k: __import__("duckdb").connect(db_path), + dialect="duckdb", + ) + + # ClickHouse adapter with a mocked connection — no real server needed. + ch_adapter = ClickhouseEngineAdapter( + lambda *a, **k: mocker.NonCallableMock(), + dialect="clickhouse", + ) + + ctx_mock = mocker.MagicMock() + ctx_mock.engine_adapters = { + "duckdb_gw": duck_adapter, + "clickhouse_gw": ch_adapter, + } + + scheduler = BuiltInSchedulerConfig() + catalog_per_gw = scheduler.get_default_catalog_per_gateway(ctx_mock) + + # --- Phase 1: virtual catalog injection assertions --- + + # DuckDB gateway must carry a real catalog entry. + assert "duckdb_gw" in catalog_per_gw + + # ClickHouse gateway must receive the synthetic ``__clickhouse_gw__`` virtual catalog. + assert catalog_per_gw["clickhouse_gw"] == "__clickhouse_gw__" + + # The ClickHouse adapter's _default_catalog must be mutated. + assert ch_adapter._default_catalog == "__clickhouse_gw__" + + # catalog_support must flip to SINGLE_CATALOG_ONLY so the set_catalog decorator strips + # the virtual catalog instead of raising when DDL is executed. + assert ch_adapter.catalog_support == CatalogSupport.SINGLE_CATALOG_ONLY + + # DuckDB adapter must be untouched — it already has real catalog support. + assert duck_adapter._default_catalog != "__duckdb_gw__" + + # --- Phase 2: FQN uniformity --- + + ch_model = load_sql_based_model( + parse("MODEL(name mydb.ch_tbl, kind FULL, gateway clickhouse_gw);\nSELECT 1 AS col"), + default_catalog="__clickhouse_gw__", + ) + duckdb_model = load_sql_based_model( + parse("MODEL(name main.duckdb_tbl, kind FULL, gateway duckdb_gw);\nSELECT 1 AS col"), + default_catalog=catalog_per_gw["duckdb_gw"], + ) + + # Both models must have 3-level FQNs (catalog.db.table → 2 dots) so MappingSchema nesting + # is uniform and does not raise a SchemaError. + assert ch_model.fqn.count(".") == 2, ( + f"Expected 3-level FQN for ClickHouse model, got: {ch_model.fqn}" + ) + assert duckdb_model.fqn.count(".") == 2, ( + f"Expected 3-level FQN for DuckDB model, got: {duckdb_model.fqn}" + ) + + from sqlglot.schema import MappingSchema + + schema = MappingSchema(normalize=False) + schema.add_table(ch_model.fqn, ch_model.columns_to_types or {}) + schema.add_table(duckdb_model.fqn, duckdb_model.columns_to_types or {}) + + # --- Phase 3: create_schema strips the virtual catalog prefix --- + + # Spy on _create_schema to inspect what schema name is passed after stripping. + create_schema_calls: t.List[str] = [] + + def _capture_create_schema( + schema_name, + ignore_if_exists, + warn_on_error, + properties, + kind, + ): + create_schema_calls.append( + schema_name.sql(dialect="clickhouse") + if hasattr(schema_name, "sql") + else str(schema_name) + ) + + mocker.patch.object(ch_adapter, "_create_schema", side_effect=_capture_create_schema) + + # Call create_schema with the 3-level virtual-catalog-prefixed schema name. + ch_adapter.create_schema("__clickhouse_gw__.mydb") + + assert len(create_schema_calls) == 1, "Expected exactly one _create_schema call" + passed_schema = create_schema_calls[0] + # The virtual catalog prefix must NOT appear in the SQL sent to the wire. + assert "__clickhouse_gw__" not in passed_schema, ( + f"Virtual catalog prefix should be stripped before reaching _create_schema, got: {passed_schema!r}" + ) + + +@pytest.mark.fast +def test_warn_if_virtual_catalog_rematerialization_emits_warning(mocker): + """_warn_if_virtual_catalog_rematerialization must emit a log_warning when new snapshots have + 3-level FQNs that map to existing 2-level FQNs in the current environment, indicating that the + virtual catalog prefix was added to previously-applied ClickHouse models.""" + from unittest.mock import MagicMock + + from sqlmesh.core.engine_adapter.clickhouse import ClickhouseEngineAdapter + from sqlmesh.core.snapshot.definition import SnapshotId + + # Build a minimal Context with no models. + ctx = Context(config=Config()) + + # Create a ClickHouse adapter with a virtual catalog already injected. + ch_adapter = ClickhouseEngineAdapter( + lambda *a, **k: mocker.NonCallableMock(), + dialect="clickhouse", + ) + ch_adapter._default_catalog = "__ch_gw__" + + # Override engine_adapters so the context sees our prepared adapter. + mocker.patch.object( + type(ctx), "engine_adapters", new_callable=PropertyMock, return_value={"ch_gw": ch_adapter} + ) + + # Build a mock snapshot with a 3-level name that has the virtual catalog prefix. + new_snapshot = MagicMock() + new_snapshot.name = "__ch_gw__.mydb.my_table" + + # The old 2-level name must appear in snapshots_by_name so we detect the rename. + old_snapshot_id = SnapshotId(name="mydb.my_table", identifier="abc123") + + context_diff = MagicMock() + context_diff.new_snapshots = {new_snapshot.name: new_snapshot} + context_diff.removed_snapshots = {} + context_diff.snapshots_by_name = {"mydb.my_table": MagicMock()} + + plan = MagicMock() + plan.new_snapshots = [new_snapshot] + plan.context_diff = context_diff + + warning_mock = mocker.patch.object(ctx.console, "log_warning") + + ctx._warn_if_virtual_catalog_rematerialization(plan) + + warning_mock.assert_called_once() + warning_text = warning_mock.call_args[0][0] + assert "__ch_gw__" in warning_text + assert "mydb.my_table" in warning_text + + +@pytest.mark.fast +def test_warn_if_virtual_catalog_rematerialization_no_warning_when_genuinely_new(mocker): + """_warn_if_virtual_catalog_rematerialization must NOT warn when there is no matching old + 2-level name — i.e. the model is a brand-new model, not a renamed existing one.""" + from unittest.mock import MagicMock + + from sqlmesh.core.engine_adapter.clickhouse import ClickhouseEngineAdapter + + ctx = Context(config=Config()) + + ch_adapter = ClickhouseEngineAdapter( + lambda *a, **k: mocker.NonCallableMock(), + dialect="clickhouse", + ) + ch_adapter._default_catalog = "__ch_gw__" + + mocker.patch.object( + type(ctx), "engine_adapters", new_callable=PropertyMock, return_value={"ch_gw": ch_adapter} + ) + + new_snapshot = MagicMock() + new_snapshot.name = "__ch_gw__.mydb.brand_new_table" + + context_diff = MagicMock() + context_diff.new_snapshots = {new_snapshot.name: new_snapshot} + context_diff.removed_snapshots = {} + # No matching old name. + context_diff.snapshots_by_name = {} + + plan = MagicMock() + plan.new_snapshots = [new_snapshot] + plan.context_diff = context_diff + + warning_mock = mocker.patch.object(ctx.console, "log_warning") + + ctx._warn_if_virtual_catalog_rematerialization(plan) + + warning_mock.assert_not_called() + + +@pytest.mark.fast +def test_warn_if_virtual_catalog_rematerialization_no_warning_without_virtual_catalog(mocker): + """_warn_if_virtual_catalog_rematerialization must NOT warn when the ClickHouse adapter has no + virtual catalog injected (i.e. _default_catalog is None).""" + from unittest.mock import MagicMock + + from sqlmesh.core.engine_adapter.clickhouse import ClickhouseEngineAdapter + + ctx = Context(config=Config()) + + ch_adapter = ClickhouseEngineAdapter( + lambda *a, **k: mocker.NonCallableMock(), + dialect="clickhouse", + ) + # No virtual catalog injected — adapter stays at 2-level mode. + assert ch_adapter._default_catalog is None + + mocker.patch.object( + type(ctx), "engine_adapters", new_callable=PropertyMock, return_value={"ch_gw": ch_adapter} + ) + + new_snapshot = MagicMock() + new_snapshot.name = "mydb.my_table" + + context_diff = MagicMock() + context_diff.new_snapshots = {new_snapshot.name: new_snapshot} + context_diff.removed_snapshots = {} + context_diff.snapshots_by_name = {} + + plan = MagicMock() + plan.new_snapshots = [new_snapshot] + plan.context_diff = context_diff + + warning_mock = mocker.patch.object(ctx.console, "log_warning") + + ctx._warn_if_virtual_catalog_rematerialization(plan) + + warning_mock.assert_not_called() + + def test_plan_execution_time(): context = Context(config=Config()) context.upsert_model( From 4bbc3296f683842f96b14b5566a8c14ce3898ba3 Mon Sep 17 00:00:00 2001 From: "nk(Enuke)" <143652584+nkwork9999@users.noreply.github.com> Date: Sat, 20 Jun 2026 14:39:13 +0900 Subject: [PATCH 07/14] fix(dlt): surface underlying error and hint when attaching to pipeline fails (#5836) Signed-off-by: nk Co-authored-by: Claude Fable 5 --- docs/integrations/dlt.md | 10 +++++----- docs/reference/cli.md | 3 ++- sqlmesh/cli/main.py | 4 ++-- sqlmesh/integrations/dlt.py | 20 +++++++++++++++++--- sqlmesh/magics.py | 4 ++-- tests/cli/test_cli.py | 8 +++++++- 6 files changed, 35 insertions(+), 14 deletions(-) diff --git a/docs/integrations/dlt.md b/docs/integrations/dlt.md index d8d38cb864..ffa5e87754 100644 --- a/docs/integrations/dlt.md +++ b/docs/integrations/dlt.md @@ -28,12 +28,12 @@ This will create the configuration file and directories, which are found in all SQLMesh will also automatically generate models to ingest data from the pipeline incrementally. Incremental loading is ideal for large datasets where recomputing entire tables is resource-intensive. In this case utilizing the [`INCREMENTAL_BY_TIME_RANGE` model kind](../concepts/models/model_kinds.md#incremental_by_time_range). However, these model definitions can be customized to meet your specific project needs. -#### Specify the path to the pipelines directory +#### Specify the path to the pipelines working directory -The default location for dlt pipelines is `~/.dlt/pipelines/`. If your pipelines are in a [different directory](https://dlthub.com/docs/general-usage/pipeline#separate-working-environments-with-pipelines_dir), use the `--dlt-path` argument to specify the path explicitly: +The default location for dlt pipeline working state is `~/.dlt/pipelines/`. If dlt stores your pipeline state in a [different pipelines working directory](https://dlthub.com/docs/general-usage/pipeline#separate-working-environments-with-pipelines_dir), use the `--dlt-path` argument to specify that directory explicitly. This should be the directory where dlt stores pipeline state, not the directory containing your pipeline scripts: ```bash -sqlmesh init -t dlt --dlt-pipeline --dlt-path dialect +sqlmesh init -t dlt --dlt-pipeline --dlt-path dialect ``` ### Generating models on demand @@ -58,10 +58,10 @@ sqlmesh dlt_refresh --force sqlmesh dlt_refresh --table ``` -- **Provide the explicit path to the pipelines directory** (using `--dlt-path`): +- **Provide the explicit path to the pipelines working directory** (using `--dlt-path`): ```bash -sqlmesh dlt_refresh --dlt-path +sqlmesh dlt_refresh --dlt-path ``` #### Configuration diff --git a/docs/reference/cli.md b/docs/reference/cli.md index a6d4fa9514..b65f8256ac 100644 --- a/docs/reference/cli.md +++ b/docs/reference/cli.md @@ -279,7 +279,8 @@ Options: empty. --dlt-pipeline TEXT DLT pipeline for which to generate a SQLMesh project. Use alongside template: dlt - --dlt-path TEXT The directory where the DLT pipeline resides. Use + --dlt-path TEXT The DLT pipelines working directory, where DLT stores + pipeline state (by default ~/.dlt/pipelines). Use alongside template: dlt --help Show this message and exit. ``` diff --git a/sqlmesh/cli/main.py b/sqlmesh/cli/main.py index c19d2ca629..b3c7a7027b 100644 --- a/sqlmesh/cli/main.py +++ b/sqlmesh/cli/main.py @@ -169,7 +169,7 @@ def cli( @click.option( "--dlt-path", type=str, - help="The directory where the DLT pipeline resides. Use alongside template: dlt", + help="The DLT pipelines working directory, where DLT stores pipeline state (by default ~/.dlt/pipelines). Use alongside template: dlt", ) @click.pass_context @error_handler @@ -1155,7 +1155,7 @@ def table_name( @click.option( "--dlt-path", type=str, - help="The directory where the DLT pipeline resides.", + help="The DLT pipelines working directory, where DLT stores pipeline state (by default ~/.dlt/pipelines).", ) @click.pass_context @error_handler diff --git a/sqlmesh/integrations/dlt.py b/sqlmesh/integrations/dlt.py index 2d601a0e22..a2202bea02 100644 --- a/sqlmesh/integrations/dlt.py +++ b/sqlmesh/integrations/dlt.py @@ -22,7 +22,8 @@ def generate_dlt_models_and_settings( pipeline_name: The name of the DLT pipeline to attach to. dialect: The SQL dialect to use for generating SQLMesh models. tables: A list of table names to include. - dlt_path: The path to the directory containing the DLT pipelines. + dlt_path: The path to the DLT pipelines working directory, where DLT stores + pipeline state (by default ~/.dlt/pipelines). Returns: A tuple containing a set of the SQLMesh model definitions, the connection config and the start date. @@ -34,8 +35,21 @@ def generate_dlt_models_and_settings( try: pipeline = dlt.attach(pipeline_name=pipeline_name, pipelines_dir=dlt_path or "") - except CannotRestorePipelineException: - raise click.ClickException(f"Could not attach to pipeline {pipeline_name}") + except CannotRestorePipelineException as e: + from pathlib import Path + from dlt.common.pipeline import get_dlt_pipelines_dir + + searched_dir = dlt_path or get_dlt_pipelines_dir() + msg = f"Could not attach to pipeline {pipeline_name}.\nSearched in: {searched_dir}\n{e}" + if dlt_path and (Path(get_dlt_pipelines_dir()) / pipeline_name).exists(): + msg += ( + f"\nHint: A pipeline named '{pipeline_name}' exists in the default pipelines " + f"working directory '{get_dlt_pipelines_dir()}'. Note that --dlt-path must " + "point to the directory where DLT stores pipeline working state (by default " + "~/.dlt/pipelines), not the directory containing your pipeline scripts. " + "Try omitting --dlt-path." + ) + raise click.ClickException(msg) schema = pipeline.default_schema dataset = pipeline.dataset_name diff --git a/sqlmesh/magics.py b/sqlmesh/magics.py index 57dd150af2..3a59fc4f7b 100644 --- a/sqlmesh/magics.py +++ b/sqlmesh/magics.py @@ -232,7 +232,7 @@ def context(self, line: str) -> None: @argument( "--dlt-path", type=str, - help="The directory where the DLT pipeline resides. Use alongside template: dlt", + help="The DLT pipelines working directory, where DLT stores pipeline state (by default ~/.dlt/pipelines). Use alongside template: dlt", ) @line_magic def init(self, line: str) -> None: @@ -886,7 +886,7 @@ def table_name(self, context: Context, line: str) -> None: @argument( "--dlt-path", type=str, - help="The directory where the DLT pipeline resides.", + help="The DLT pipelines working directory, where DLT stores pipeline state (by default ~/.dlt/pipelines).", ) @line_magic @pass_sqlmesh_context diff --git a/tests/cli/test_cli.py b/tests/cli/test_cli.py index ba71e35843..938f90cc74 100644 --- a/tests/cli/test_cli.py +++ b/tests/cli/test_cli.py @@ -995,7 +995,7 @@ def test_dlt_pipeline(runner, tmp_path): exec(file.read()) # This should fail since it won't be able to locate the pipeline in this path - with pytest.raises(ClickException, match=r".*Could not attach to pipeline*"): + with pytest.raises(ClickException, match=r".*Could not attach to pipeline*") as excinfo: init_example_project( tmp_path, "duckdb", @@ -1004,6 +1004,12 @@ def test_dlt_pipeline(runner, tmp_path): dlt_path="./dlt2/pipelines", ) + # The error should surface where the pipeline was searched for and, since the + # pipeline exists in the default working directory, a hint about --dlt-path + error_message = str(excinfo.value) + assert "Searched in: ./dlt2/pipelines" in error_message + assert "Try omitting --dlt-path" in error_message + # By setting the pipelines path where the pipeline directory is located, it should work dlt_path = get_dlt_pipelines_dir() init_example_project( From bfda1b900d7113f5604e5c629a102e85d0e72392 Mon Sep 17 00:00:00 2001 From: Lafir <136463254+lafirm@users.noreply.github.com> Date: Sat, 20 Jun 2026 19:26:52 +0530 Subject: [PATCH 08/14] fix(docs): pin pygments to 2.19.2 and keep syntax highlighting (#5854) Signed-off-by: lafirm <136463254+lafirm@users.noreply.github.com> --- docs/requirements.txt | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/requirements.txt b/docs/requirements.txt index 1035ffe94a..b76fca0c4d 100644 --- a/docs/requirements.txt +++ b/docs/requirements.txt @@ -4,3 +4,4 @@ mkdocs-material==9.0.5 mkdocs-material-extensions==1.1.1 mkdocs-glightbox==0.3.7 pdoc==14.5.1 +pygments==2.19.2 # Temporary pin: 2.20.0 breaks docs build; revisit after the fix \ No newline at end of file From 36b04f9264c9dcbdc2a7b5311428d8965871d448 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ujfalusi=20S=C3=A1ndor?= Date: Mon, 22 Jun 2026 13:50:20 +0200 Subject: [PATCH 09/14] Comment registration support for MSSQL engine MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Ujfalusi Sándor --- sqlmesh/core/engine_adapter/mssql.py | 37 ++++++++++++++++------------ 1 file changed, 21 insertions(+), 16 deletions(-) diff --git a/sqlmesh/core/engine_adapter/mssql.py b/sqlmesh/core/engine_adapter/mssql.py index 8acb9e318c..fca6b4cc9f 100644 --- a/sqlmesh/core/engine_adapter/mssql.py +++ b/sqlmesh/core/engine_adapter/mssql.py @@ -465,8 +465,8 @@ def _build_create_comment_table_exp( template = dedent(""" DECLARE @comment sql_variant = {comment}; DECLARE @property_name VARCHAR(128) = 'MS_Description'; - DECLARE @schema_name VARCHAR(128) = '{schema_name}'; - DECLARE @object_name VARCHAR(128) = '{object_name}'; + DECLARE @schema_name VARCHAR(128) = {schema_name}; + DECLARE @object_name VARCHAR(128) = {object_name}; DECLARE @object_kind VARCHAR(128) = '{object_kind}'; DECLARE @existing sql_variant; @@ -486,11 +486,13 @@ def _build_create_comment_table_exp( END """) tsql_text = template.format( - comment=exp.Literal.string(table_comment).sql(dialect=self.dialect) - if table_comment is not None - else "NULL", - schema_name=table.db if table.db else "dbo", - object_name=table.name, + comment=exp.Literal.string(table_comment or "NULL").sql( + dialect=self.dialect, identify=False + ), + schema_name=exp.Literal.string(table.db or "dbo").sql( + dialect=self.dialect, identify=False + ), + object_name=exp.Literal.string(table.name).sql(dialect=self.dialect, identify=False), object_kind=table_kind, ) return tsql_text @@ -501,10 +503,10 @@ def _build_create_comment_column_exp( template = dedent(""" DECLARE @comment sql_variant = {comment}; DECLARE @property_name VARCHAR(128) = 'MS_Description'; - DECLARE @schema_name VARCHAR(128) = '{schema_name}'; - DECLARE @object_name VARCHAR(128) = '{object_name}'; + DECLARE @schema_name VARCHAR(128) = {schema_name}; + DECLARE @object_name VARCHAR(128) = {object_name}; DECLARE @object_kind VARCHAR(128) = '{object_kind}'; - DECLARE @column_name VARCHAR(128) = '{column_name}'; + DECLARE @column_name VARCHAR(128) = {column_name}; DECLARE @existing sql_variant; SELECT TOP 1 @existing = CAST(VALUE AS NVARCHAR) FROM fn_listextendedproperty(@property_name, 'schema', @schema_name, @object_kind, @object_name, 'column', @column_name); @@ -522,14 +524,17 @@ def _build_create_comment_column_exp( EXEC sp_updateextendedproperty @property_name, @comment, 'schema', @schema_name, @object_kind, @object_name, 'column', @column_name; END """) + tsql_text = template.format( - comment=exp.Literal.string(column_comment).sql(dialect=self.dialect) - if column_comment is not None - else "NULL", - schema_name=table.db if table.db else "dbo", - object_name=table.name, + comment=exp.Literal.string(column_comment or "NULL").sql( + dialect=self.dialect, identify=False + ), + schema_name=exp.Literal.string(table.db or "dbo").sql( + dialect=self.dialect, identify=False + ), + object_name=exp.Literal.string(table.name).sql(dialect=self.dialect, identify=False), object_kind=table_kind, - column_name=column_name, + column_name=exp.Literal.string(column_name).sql(dialect=self.dialect, identify=False), ) return tsql_text From 7805a82fe4cdd2b2087c8e2b8a1ce95d6514f5bb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ujfalusi=20S=C3=A1ndor?= Date: Tue, 12 May 2026 11:01:22 +0200 Subject: [PATCH 10/14] Comment registration support for MSSQL engine MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Ujfalusi Sándor --- docs/concepts/models/overview.md | 2 +- sqlmesh/core/engine_adapter/mssql.py | 66 ++++++++++++++++++++++++- tests/core/engine_adapter/test_mssql.py | 23 +++++++++ 3 files changed, 88 insertions(+), 3 deletions(-) diff --git a/docs/concepts/models/overview.md b/docs/concepts/models/overview.md index d6356462b4..e37be3822a 100644 --- a/docs/concepts/models/overview.md +++ b/docs/concepts/models/overview.md @@ -184,7 +184,7 @@ This table lists each engine's support for `TABLE` and `VIEW` object comments: | DuckDB <=0.9 | N | N | | DuckDB >=0.10 | Y | Y | | MySQL | Y | Y | -| MSSQL | N | N | +| MSSQL | Y | Y | | Postgres | Y | Y | | GCP Postgres | Y | Y | | Redshift | Y | N | diff --git a/sqlmesh/core/engine_adapter/mssql.py b/sqlmesh/core/engine_adapter/mssql.py index e381c0a198..6a39557d3f 100644 --- a/sqlmesh/core/engine_adapter/mssql.py +++ b/sqlmesh/core/engine_adapter/mssql.py @@ -2,6 +2,7 @@ from __future__ import annotations +from textwrap import dedent import typing as t import logging @@ -53,8 +54,8 @@ class MSSQLEngineAdapter( SUPPORTS_TUPLE_IN = False SUPPORTS_MATERIALIZED_VIEWS = False CURRENT_CATALOG_EXPRESSION = exp.func("db_name") - COMMENT_CREATION_TABLE = CommentCreationTable.UNSUPPORTED - COMMENT_CREATION_VIEW = CommentCreationView.UNSUPPORTED + COMMENT_CREATION_TABLE = CommentCreationTable.COMMENT_COMMAND_ONLY + COMMENT_CREATION_VIEW = CommentCreationView.COMMENT_COMMAND_ONLY SUPPORTS_REPLACE_TABLE = False MAX_IDENTIFIER_LENGTH = 128 SUPPORTS_QUERY_EXECUTION_TRACKING = True @@ -457,3 +458,64 @@ def delete_from(self, table_name: TableName, where: t.Union[str, exp.Expr]) -> N ) return super().delete_from(table_name, where) + + def _build_create_comment_column_exp( + self, table: exp.Table, column_name: str, column_comment: str, table_kind: str = "TABLE" + ) -> exp.Comment | str: + tsql_text = dedent(f""" + SET NOCOUNT ON; + + DECLARE @comment sql_variant = {exp.Literal.string(column_comment).sql(dialect=self.dialect) if column_comment is not None else "NULL"}; + DECLARE @property_name VARCHAR(128) = 'MS_Description'; + DECLARE @schema_name VARCHAR(128) = '{table.db if table.db else "dbo"}'; + DECLARE @object_name VARCHAR(128) = '{table.name}'; + DECLARE @object_kind VARCHAR(128) = '{table_kind}'; + DECLARE @column_name VARCHAR(128) = '{column_name}'; + DECLARE @existing sql_variant; + + SELECT TOP 1 @existing = CAST(VALUE AS NVARCHAR) FROM fn_listextendedproperty(@property_name, 'schema', @schema_name, @object_kind, @object_name, 'column', @column_name); + + IF @comment IS NULL + BEGIN + IF @existing IS NOT NULL + EXEC sp_dropextendedproperty @property_name, 'schema', @schema_name, @object_kind, @object_name, 'column', @column_name; + END + ELSE + BEGIN + IF @existing IS NULL + EXEC sp_addextendedproperty @property_name,@comment, 'schema', @schema_name, @object_kind, @object_name, 'column', @column_name; + ELSE IF @existing != @comment + EXEC sp_updateextendedproperty @property_name, @comment, 'schema', @schema_name, @object_kind, @object_name, 'column', @column_name; + END + """) + return tsql_text + + def _build_create_comment_table_exp( + self, table: exp.Table, table_comment: str, table_kind: str + ) -> exp.Comment | str: + tsql_text = dedent(f""" + SET NOCOUNT ON; + + DECLARE @comment sql_variant = {exp.Literal.string(table_comment).sql(dialect=self.dialect) if table_comment is not None else "NULL"}; + DECLARE @property_name VARCHAR(128) = 'MS_Description'; + DECLARE @schema_name VARCHAR(128) = '{table.db if table.db else "dbo"}'; + DECLARE @object_name VARCHAR(128) = '{table.name}'; + DECLARE @object_kind VARCHAR(128) = '{table_kind}'; + DECLARE @existing sql_variant; + + SELECT TOP 1 @existing = CAST(VALUE AS NVARCHAR) FROM fn_listextendedproperty(@property_name, 'schema', @schema_name, @object_kind, @object_name, DEFAULT, DEFAULT); + + IF @comment IS NULL + BEGIN + IF @existing IS NOT NULL + EXEC sp_dropextendedproperty @property_name, 'schema', @schema_name, @object_kind, @object_name; + END + ELSE + BEGIN + IF @existing IS NULL + EXEC sp_addextendedproperty @property_name,@comment, 'schema', @schema_name, @object_kind, @object_name; + ELSE IF @existing != @comment + EXEC sp_updateextendedproperty @property_name, @comment, 'schema', @schema_name, @object_kind, @object_name; + END + """) + return tsql_text diff --git a/tests/core/engine_adapter/test_mssql.py b/tests/core/engine_adapter/test_mssql.py index ec6a4ba3e8..1123e0511a 100644 --- a/tests/core/engine_adapter/test_mssql.py +++ b/tests/core/engine_adapter/test_mssql.py @@ -1002,3 +1002,26 @@ def python_scd2_model(context, **kwargs): snapshot: Snapshot = make_snapshot(m) assert snapshot.node.physical_properties == m.physical_properties assert snapshot.node.physical_properties.get("mssql_merge_exists") + + +def test_comments(make_mocked_engine_adapter: t.Callable, mocker: MockerFixture): + adapter = make_mocked_engine_adapter(MSSQLEngineAdapter) + + columns_to_types = { + "cola": exp.DataType.build("INT"), + "colb": exp.DataType.build("TEXT"), + } + adapter.create_table( + "test_table", columns_to_types, table_description="\\", column_descriptions={"cola": "\\"} + ) + + sql_calls = to_sql_calls(adapter) + assert sql_calls == [ + """IF NOT EXISTS (SELECT * FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_NAME = 'test_table') EXEC('CREATE TABLE [test_table] ([cola] INTEGER, [colb] VARCHAR(MAX))');""", + adapter._build_create_comment_table_exp( + exp.table_("test_table", quoted=True), "\\", "TABLE" + ), + adapter._build_create_comment_column_exp( + exp.table_("test_table", quoted=True), "cola", "\\", "TABLE" + ), + ] From a760fd4eb30cbbc6c9b0c723d1db6ac37456aa31 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ujfalusi=20S=C3=A1ndor?= Date: Mon, 8 Jun 2026 20:15:47 +0200 Subject: [PATCH 11/14] mssql test fixes MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Ujfalusi Sándor --- .../engine_adapter/integration/__init__.py | 18 +++++++++++++++ tests/core/engine_adapter/test_mssql.py | 23 ------------------- 2 files changed, 18 insertions(+), 23 deletions(-) diff --git a/tests/core/engine_adapter/integration/__init__.py b/tests/core/engine_adapter/integration/__init__.py index ee19a92a56..11bf95f3d6 100644 --- a/tests/core/engine_adapter/integration/__init__.py +++ b/tests/core/engine_adapter/integration/__init__.py @@ -526,6 +526,14 @@ def get_table_comment( AND c.relkind = '{"v" if table_kind == "VIEW" else "r"}' ; """ + elif self.dialect == "tsql": + kind = "table" if table_kind == "BASE TABLE" else "view" + query = f""" + SELECT + ep.name, + CAST(ep.value AS NVARCHAR(MAX)) comment + FROM fn_listextendedproperty('MS_Description', 'schema', '{schema_name}', '{kind}', '{table_name}', DEFAULT, DEFAULT) ep + """ result = self.engine_adapter.fetchall(query) @@ -636,6 +644,16 @@ def get_column_comments( AND c.relkind = '{"v" if table_kind == "VIEW" else "r"}' ; """ + elif self.dialect == "tsql": + kind = "table" if table_kind == "BASE TABLE" else "view" + query = f""" + SELECT + col.COLUMN_NAME column_name, + CAST(ep.value AS NVARCHAR(MAX)) comment + FROM INFORMATION_SCHEMA.COLUMNS col + CROSS APPLY fn_listextendedproperty('MS_Description', 'schema', col.TABLE_SCHEMA, '{kind}', col.TABLE_NAME, 'column', col.COLUMN_NAME) ep + WHERE col.TABLE_SCHEMA = '{schema_name}' AND col.TABLE_NAME = '{table_name}' + """ result = self.engine_adapter.fetchall(query) diff --git a/tests/core/engine_adapter/test_mssql.py b/tests/core/engine_adapter/test_mssql.py index 1123e0511a..ec6a4ba3e8 100644 --- a/tests/core/engine_adapter/test_mssql.py +++ b/tests/core/engine_adapter/test_mssql.py @@ -1002,26 +1002,3 @@ def python_scd2_model(context, **kwargs): snapshot: Snapshot = make_snapshot(m) assert snapshot.node.physical_properties == m.physical_properties assert snapshot.node.physical_properties.get("mssql_merge_exists") - - -def test_comments(make_mocked_engine_adapter: t.Callable, mocker: MockerFixture): - adapter = make_mocked_engine_adapter(MSSQLEngineAdapter) - - columns_to_types = { - "cola": exp.DataType.build("INT"), - "colb": exp.DataType.build("TEXT"), - } - adapter.create_table( - "test_table", columns_to_types, table_description="\\", column_descriptions={"cola": "\\"} - ) - - sql_calls = to_sql_calls(adapter) - assert sql_calls == [ - """IF NOT EXISTS (SELECT * FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_NAME = 'test_table') EXEC('CREATE TABLE [test_table] ([cola] INTEGER, [colb] VARCHAR(MAX))');""", - adapter._build_create_comment_table_exp( - exp.table_("test_table", quoted=True), "\\", "TABLE" - ), - adapter._build_create_comment_column_exp( - exp.table_("test_table", quoted=True), "cola", "\\", "TABLE" - ), - ] From 24804ddb3a297612448b3c1651605d38f5979a2a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ujfalusi=20S=C3=A1ndor?= Date: Thu, 18 Jun 2026 23:40:38 +0200 Subject: [PATCH 12/14] Comment registration support for MSSQL engine MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Ujfalusi Sándor --- sqlmesh/core/engine_adapter/mssql.py | 65 ++++++++++++++++------------ 1 file changed, 38 insertions(+), 27 deletions(-) diff --git a/sqlmesh/core/engine_adapter/mssql.py b/sqlmesh/core/engine_adapter/mssql.py index 6a39557d3f..45345a9b7f 100644 --- a/sqlmesh/core/engine_adapter/mssql.py +++ b/sqlmesh/core/engine_adapter/mssql.py @@ -459,63 +459,74 @@ def delete_from(self, table_name: TableName, where: t.Union[str, exp.Expr]) -> N return super().delete_from(table_name, where) - def _build_create_comment_column_exp( - self, table: exp.Table, column_name: str, column_comment: str, table_kind: str = "TABLE" + def _build_create_comment_table_exp( + self, table: exp.Table, table_comment: str, table_kind: str ) -> exp.Comment | str: - tsql_text = dedent(f""" - SET NOCOUNT ON; - - DECLARE @comment sql_variant = {exp.Literal.string(column_comment).sql(dialect=self.dialect) if column_comment is not None else "NULL"}; + template = dedent(""" + DECLARE @comment sql_variant = {comment}; DECLARE @property_name VARCHAR(128) = 'MS_Description'; - DECLARE @schema_name VARCHAR(128) = '{table.db if table.db else "dbo"}'; - DECLARE @object_name VARCHAR(128) = '{table.name}'; - DECLARE @object_kind VARCHAR(128) = '{table_kind}'; - DECLARE @column_name VARCHAR(128) = '{column_name}'; + DECLARE @schema_name VARCHAR(128) = '{schema_name}'; + DECLARE @object_name VARCHAR(128) = '{object_name}'; + DECLARE @object_kind VARCHAR(128) = '{object_kind}'; DECLARE @existing sql_variant; - SELECT TOP 1 @existing = CAST(VALUE AS NVARCHAR) FROM fn_listextendedproperty(@property_name, 'schema', @schema_name, @object_kind, @object_name, 'column', @column_name); + SELECT TOP 1 @existing = CAST(VALUE AS NVARCHAR) FROM fn_listextendedproperty(@property_name, 'schema', @schema_name, @object_kind, @object_name, DEFAULT, DEFAULT); IF @comment IS NULL BEGIN IF @existing IS NOT NULL - EXEC sp_dropextendedproperty @property_name, 'schema', @schema_name, @object_kind, @object_name, 'column', @column_name; + EXEC sp_dropextendedproperty @property_name, 'schema', @schema_name, @object_kind, @object_name; END ELSE BEGIN IF @existing IS NULL - EXEC sp_addextendedproperty @property_name,@comment, 'schema', @schema_name, @object_kind, @object_name, 'column', @column_name; + EXEC sp_addextendedproperty @property_name,@comment, 'schema', @schema_name, @object_kind, @object_name; ELSE IF @existing != @comment - EXEC sp_updateextendedproperty @property_name, @comment, 'schema', @schema_name, @object_kind, @object_name, 'column', @column_name; + EXEC sp_updateextendedproperty @property_name, @comment, 'schema', @schema_name, @object_kind, @object_name; END """) + tsql_text = template.format( + comment = exp.Literal.string(table_comment).sql(dialect=self.dialect) if table_comment is not None else "NULL", + schema_name = table.db if table.db else "dbo", + object_name = table.name, + object_kind = table_kind, + ) return tsql_text - def _build_create_comment_table_exp( - self, table: exp.Table, table_comment: str, table_kind: str + def _build_create_comment_column_exp( + self, table: exp.Table, column_name: str, column_comment: str, table_kind: str ) -> exp.Comment | str: - tsql_text = dedent(f""" - SET NOCOUNT ON; - DECLARE @comment sql_variant = {exp.Literal.string(table_comment).sql(dialect=self.dialect) if table_comment is not None else "NULL"}; + template = dedent(""" + DECLARE @comment sql_variant = {comment}; DECLARE @property_name VARCHAR(128) = 'MS_Description'; - DECLARE @schema_name VARCHAR(128) = '{table.db if table.db else "dbo"}'; - DECLARE @object_name VARCHAR(128) = '{table.name}'; - DECLARE @object_kind VARCHAR(128) = '{table_kind}'; + DECLARE @schema_name VARCHAR(128) = '{schema_name}'; + DECLARE @object_name VARCHAR(128) = '{object_name}'; + DECLARE @object_kind VARCHAR(128) = '{object_kind}'; + DECLARE @column_name VARCHAR(128) = '{column_name}'; DECLARE @existing sql_variant; - SELECT TOP 1 @existing = CAST(VALUE AS NVARCHAR) FROM fn_listextendedproperty(@property_name, 'schema', @schema_name, @object_kind, @object_name, DEFAULT, DEFAULT); + SELECT TOP 1 @existing = CAST(VALUE AS NVARCHAR) FROM fn_listextendedproperty(@property_name, 'schema', @schema_name, @object_kind, @object_name, 'column', @column_name); IF @comment IS NULL BEGIN IF @existing IS NOT NULL - EXEC sp_dropextendedproperty @property_name, 'schema', @schema_name, @object_kind, @object_name; + EXEC sp_dropextendedproperty @property_name, 'schema', @schema_name, @object_kind, @object_name, 'column', @column_name; END ELSE BEGIN IF @existing IS NULL - EXEC sp_addextendedproperty @property_name,@comment, 'schema', @schema_name, @object_kind, @object_name; + EXEC sp_addextendedproperty @property_name,@comment, 'schema', @schema_name, @object_kind, @object_name, 'column', @column_name; ELSE IF @existing != @comment - EXEC sp_updateextendedproperty @property_name, @comment, 'schema', @schema_name, @object_kind, @object_name; + EXEC sp_updateextendedproperty @property_name, @comment, 'schema', @schema_name, @object_kind, @object_name, 'column', @column_name; END """) - return tsql_text + tsql_text = template.format( + comment = exp.Literal.string(column_comment).sql(dialect=self.dialect) if column_comment is not None else "NULL", + schema_name = table.db if table.db else "dbo", + object_name = table.name, + object_kind = table_kind, + column_name = column_name, + ) + + return tsql_text \ No newline at end of file From baa7599ead23d3fe83caf3e42995a40a00b46702 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ujfalusi=20S=C3=A1ndor?= Date: Fri, 19 Jun 2026 09:39:43 +0200 Subject: [PATCH 13/14] Comment registration support for MSSQL engine MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Ujfalusi Sándor --- sqlmesh/core/engine_adapter/mssql.py | 29 +++++++++++++++------------- 1 file changed, 16 insertions(+), 13 deletions(-) diff --git a/sqlmesh/core/engine_adapter/mssql.py b/sqlmesh/core/engine_adapter/mssql.py index 45345a9b7f..8acb9e318c 100644 --- a/sqlmesh/core/engine_adapter/mssql.py +++ b/sqlmesh/core/engine_adapter/mssql.py @@ -460,7 +460,7 @@ def delete_from(self, table_name: TableName, where: t.Union[str, exp.Expr]) -> N return super().delete_from(table_name, where) def _build_create_comment_table_exp( - self, table: exp.Table, table_comment: str, table_kind: str + self, table: exp.Table, table_comment: str, table_kind: str = "TABLE" ) -> exp.Comment | str: template = dedent(""" DECLARE @comment sql_variant = {comment}; @@ -486,17 +486,18 @@ def _build_create_comment_table_exp( END """) tsql_text = template.format( - comment = exp.Literal.string(table_comment).sql(dialect=self.dialect) if table_comment is not None else "NULL", - schema_name = table.db if table.db else "dbo", - object_name = table.name, - object_kind = table_kind, + comment=exp.Literal.string(table_comment).sql(dialect=self.dialect) + if table_comment is not None + else "NULL", + schema_name=table.db if table.db else "dbo", + object_name=table.name, + object_kind=table_kind, ) return tsql_text def _build_create_comment_column_exp( - self, table: exp.Table, column_name: str, column_comment: str, table_kind: str + self, table: exp.Table, column_name: str, column_comment: str, table_kind: str = "TABLE" ) -> exp.Comment | str: - template = dedent(""" DECLARE @comment sql_variant = {comment}; DECLARE @property_name VARCHAR(128) = 'MS_Description'; @@ -522,11 +523,13 @@ def _build_create_comment_column_exp( END """) tsql_text = template.format( - comment = exp.Literal.string(column_comment).sql(dialect=self.dialect) if column_comment is not None else "NULL", - schema_name = table.db if table.db else "dbo", - object_name = table.name, - object_kind = table_kind, - column_name = column_name, + comment=exp.Literal.string(column_comment).sql(dialect=self.dialect) + if column_comment is not None + else "NULL", + schema_name=table.db if table.db else "dbo", + object_name=table.name, + object_kind=table_kind, + column_name=column_name, ) - return tsql_text \ No newline at end of file + return tsql_text From cc0ffbe5ad7919617e4a64dbd354f4e7608ca7c4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ujfalusi=20S=C3=A1ndor?= Date: Mon, 22 Jun 2026 13:50:20 +0200 Subject: [PATCH 14/14] Comment registration support for MSSQL engine MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Ujfalusi Sándor --- sqlmesh/core/engine_adapter/mssql.py | 37 ++++++++++++++++------------ 1 file changed, 21 insertions(+), 16 deletions(-) diff --git a/sqlmesh/core/engine_adapter/mssql.py b/sqlmesh/core/engine_adapter/mssql.py index 8acb9e318c..fca6b4cc9f 100644 --- a/sqlmesh/core/engine_adapter/mssql.py +++ b/sqlmesh/core/engine_adapter/mssql.py @@ -465,8 +465,8 @@ def _build_create_comment_table_exp( template = dedent(""" DECLARE @comment sql_variant = {comment}; DECLARE @property_name VARCHAR(128) = 'MS_Description'; - DECLARE @schema_name VARCHAR(128) = '{schema_name}'; - DECLARE @object_name VARCHAR(128) = '{object_name}'; + DECLARE @schema_name VARCHAR(128) = {schema_name}; + DECLARE @object_name VARCHAR(128) = {object_name}; DECLARE @object_kind VARCHAR(128) = '{object_kind}'; DECLARE @existing sql_variant; @@ -486,11 +486,13 @@ def _build_create_comment_table_exp( END """) tsql_text = template.format( - comment=exp.Literal.string(table_comment).sql(dialect=self.dialect) - if table_comment is not None - else "NULL", - schema_name=table.db if table.db else "dbo", - object_name=table.name, + comment=exp.Literal.string(table_comment or "NULL").sql( + dialect=self.dialect, identify=False + ), + schema_name=exp.Literal.string(table.db or "dbo").sql( + dialect=self.dialect, identify=False + ), + object_name=exp.Literal.string(table.name).sql(dialect=self.dialect, identify=False), object_kind=table_kind, ) return tsql_text @@ -501,10 +503,10 @@ def _build_create_comment_column_exp( template = dedent(""" DECLARE @comment sql_variant = {comment}; DECLARE @property_name VARCHAR(128) = 'MS_Description'; - DECLARE @schema_name VARCHAR(128) = '{schema_name}'; - DECLARE @object_name VARCHAR(128) = '{object_name}'; + DECLARE @schema_name VARCHAR(128) = {schema_name}; + DECLARE @object_name VARCHAR(128) = {object_name}; DECLARE @object_kind VARCHAR(128) = '{object_kind}'; - DECLARE @column_name VARCHAR(128) = '{column_name}'; + DECLARE @column_name VARCHAR(128) = {column_name}; DECLARE @existing sql_variant; SELECT TOP 1 @existing = CAST(VALUE AS NVARCHAR) FROM fn_listextendedproperty(@property_name, 'schema', @schema_name, @object_kind, @object_name, 'column', @column_name); @@ -522,14 +524,17 @@ def _build_create_comment_column_exp( EXEC sp_updateextendedproperty @property_name, @comment, 'schema', @schema_name, @object_kind, @object_name, 'column', @column_name; END """) + tsql_text = template.format( - comment=exp.Literal.string(column_comment).sql(dialect=self.dialect) - if column_comment is not None - else "NULL", - schema_name=table.db if table.db else "dbo", - object_name=table.name, + comment=exp.Literal.string(column_comment or "NULL").sql( + dialect=self.dialect, identify=False + ), + schema_name=exp.Literal.string(table.db or "dbo").sql( + dialect=self.dialect, identify=False + ), + object_name=exp.Literal.string(table.name).sql(dialect=self.dialect, identify=False), object_kind=table_kind, - column_name=column_name, + column_name=exp.Literal.string(column_name).sql(dialect=self.dialect, identify=False), ) return tsql_text