From afe8bec92f73bfb08bf4361f3eeacffd3664e096 Mon Sep 17 00:00:00 2001 From: Yuya Ebihara Date: Sat, 30 May 2026 09:50:02 +0900 Subject: [PATCH 1/2] Add support for environment context --- pyiceberg/environment_context.py | 43 +++++++++++++++++++ pyiceberg/table/snapshots.py | 4 ++ pyiceberg/view/metadata.py | 3 +- tests/integration/test_deletes.py | 3 ++ tests/integration/test_inspect_table.py | 5 +++ .../test_writes/test_partitioned_writes.py | 15 +++++++ tests/integration/test_writes/test_writes.py | 25 +++++++++++ tests/table/test_snapshots.py | 7 +++ tests/test_environment_context.py | 34 +++++++++++++++ 9 files changed, 138 insertions(+), 1 deletion(-) create mode 100644 pyiceberg/environment_context.py create mode 100644 tests/test_environment_context.py diff --git a/pyiceberg/environment_context.py b/pyiceberg/environment_context.py new file mode 100644 index 0000000000..8791ae247f --- /dev/null +++ b/pyiceberg/environment_context.py @@ -0,0 +1,43 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from importlib.metadata import version + + +class EnvironmentContext: + _PROPERTIES: dict[str, str] = { + "engine-name": "pyiceberg", + "engine-version": version("pyiceberg"), + } + + def __init__(self) -> None: + raise NotImplementedError("EnvironmentContext is a utility class and cannot be instantiated.") + + @classmethod + def get(cls) -> dict[str, str]: + """Return a read-only copy of all properties.""" + return cls._PROPERTIES.copy() + + @classmethod + def put(cls, key: str, value: str) -> None: + """Will add the given key/value pair in a global properties map.""" + cls._PROPERTIES[key] = value + + @classmethod + def remove(cls, key: str) -> str | None: + """Remove the key from the global properties map.""" + return cls._PROPERTIES.pop(key, None) diff --git a/pyiceberg/table/snapshots.py b/pyiceberg/table/snapshots.py index 7e4c6eb1ec..c18aeb00f8 100644 --- a/pyiceberg/table/snapshots.py +++ b/pyiceberg/table/snapshots.py @@ -25,6 +25,7 @@ from pydantic import Field, PrivateAttr, model_serializer +from pyiceberg.environment_context import EnvironmentContext from pyiceberg.io import FileIO from pyiceberg.manifest import DataFile, DataFileContent, ManifestFile, _manifests from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec @@ -402,6 +403,9 @@ def _update_totals(total_property: str, added_property: str, removed_property: s removed_property=REMOVED_EQUALITY_DELETES, ) + for key, value in EnvironmentContext.get().items(): + summary.__setitem__(key, value) + return summary diff --git a/pyiceberg/view/metadata.py b/pyiceberg/view/metadata.py index 33766040e3..648810b319 100644 --- a/pyiceberg/view/metadata.py +++ b/pyiceberg/view/metadata.py @@ -21,6 +21,7 @@ from pydantic import Field, RootModel, field_validator +from pyiceberg.environment_context import EnvironmentContext from pyiceberg.schema import Schema from pyiceberg.typedef import IcebergBaseModel, Identifier, Properties from pyiceberg.typedef import ViewVersion as ViewVersionLiteral @@ -51,7 +52,7 @@ class ViewVersion(IcebergBaseModel): """ID of the schema for the view version""" timestamp_ms: int = Field(alias="timestamp-ms", default_factory=lambda: int(time.time() * 1000)) """Timestamp when the version was created (ms from epoch)""" - summary: dict[str, str] = Field(default_factory=dict) + summary: dict[str, str] = Field(default_factory=lambda: EnvironmentContext.get()) """A string to string map of summary metadata about the version""" representations: list[ViewRepresentation] = Field() """A list of representations for the view definition""" diff --git a/tests/integration/test_deletes.py b/tests/integration/test_deletes.py index c49689b716..ca84d6f7ed 100644 --- a/tests/integration/test_deletes.py +++ b/tests/integration/test_deletes.py @@ -23,6 +23,7 @@ from pyspark.sql import SparkSession from pyiceberg.catalog.rest import RestCatalog +from pyiceberg.environment_context import EnvironmentContext from pyiceberg.exceptions import NoSuchTableError from pyiceberg.expressions import AlwaysTrue, EqualTo, LessThanOrEqual from pyiceberg.manifest import ManifestEntryStatus @@ -480,6 +481,8 @@ def test_partitioned_table_positional_deletes_sequence_number(spark: SparkSessio "total-files-size": snapshots[2].summary["total-files-size"], "total-position-deletes": "1", "total-records": "4", + "engine-name": "pyiceberg", + "engine-version": EnvironmentContext.get().get("engine-version"), }, ) diff --git a/tests/integration/test_inspect_table.py b/tests/integration/test_inspect_table.py index 03d4437d18..cf6f484b60 100644 --- a/tests/integration/test_inspect_table.py +++ b/tests/integration/test_inspect_table.py @@ -27,6 +27,7 @@ from pytest_lazy_fixtures import lf from pyiceberg.catalog import Catalog +from pyiceberg.environment_context import EnvironmentContext from pyiceberg.exceptions import NoSuchTableError from pyiceberg.expressions import ( And, @@ -277,6 +278,8 @@ def test_inspect_snapshots( ("total-files-size", str(file_size)), ("total-position-deletes", "0"), ("total-equality-deletes", "0"), + ("engine-name", "pyiceberg"), + ("engine-version", EnvironmentContext.get().get("engine-version")), ] # Delete @@ -290,6 +293,8 @@ def test_inspect_snapshots( ("total-files-size", "0"), ("total-position-deletes", "0"), ("total-equality-deletes", "0"), + ("engine-name", "pyiceberg"), + ("engine-version", EnvironmentContext.get().get("engine-version")), ] lhs = spark.table(f"{identifier}.snapshots").toPandas() diff --git a/tests/integration/test_writes/test_partitioned_writes.py b/tests/integration/test_writes/test_partitioned_writes.py index 1d1488255f..abfb9bad10 100644 --- a/tests/integration/test_writes/test_partitioned_writes.py +++ b/tests/integration/test_writes/test_partitioned_writes.py @@ -25,6 +25,7 @@ from pyspark.sql import SparkSession from pyiceberg.catalog import Catalog +from pyiceberg.environment_context import EnvironmentContext from pyiceberg.exceptions import NoSuchTableError from pyiceberg.partitioning import PartitionField, PartitionSpec from pyiceberg.schema import Schema @@ -498,6 +499,8 @@ def test_summaries_with_null(spark: SparkSession, session_catalog: Catalog, arro "total-files-size": str(file_size), "total-position-deletes": "0", "total-records": "3", + "engine-name": "pyiceberg", + "engine-version": EnvironmentContext.get().get("engine-version"), } assert summaries[1] == { @@ -511,6 +514,8 @@ def test_summaries_with_null(spark: SparkSession, session_catalog: Catalog, arro "total-files-size": str(file_size * 2), "total-position-deletes": "0", "total-records": "6", + "engine-name": "pyiceberg", + "engine-version": EnvironmentContext.get().get("engine-version"), } assert summaries[2] == { "removed-files-size": str(file_size * 2), @@ -523,6 +528,8 @@ def test_summaries_with_null(spark: SparkSession, session_catalog: Catalog, arro "total-files-size": "0", "total-data-files": "0", "total-records": "0", + "engine-name": "pyiceberg", + "engine-version": EnvironmentContext.get().get("engine-version"), } assert summaries[3] == { "changed-partition-count": "3", @@ -535,6 +542,8 @@ def test_summaries_with_null(spark: SparkSession, session_catalog: Catalog, arro "total-files-size": str(file_size), "total-data-files": "3", "total-records": "3", + "engine-name": "pyiceberg", + "engine-version": EnvironmentContext.get().get("engine-version"), } assert summaries[4] == { "changed-partition-count": "3", @@ -547,6 +556,8 @@ def test_summaries_with_null(spark: SparkSession, session_catalog: Catalog, arro "total-files-size": str(file_size * 2), "total-data-files": "6", "total-records": "6", + "engine-name": "pyiceberg", + "engine-version": EnvironmentContext.get().get("engine-version"), } assert "removed-files-size" in summaries[5] assert "total-files-size" in summaries[5] @@ -561,6 +572,8 @@ def test_summaries_with_null(spark: SparkSession, session_catalog: Catalog, arro "total-files-size": summaries[5]["total-files-size"], "total-data-files": "2", "total-records": "2", + "engine-name": "pyiceberg", + "engine-version": EnvironmentContext.get().get("engine-version"), } assert "added-files-size" in summaries[6] assert "total-files-size" in summaries[6] @@ -575,6 +588,8 @@ def test_summaries_with_null(spark: SparkSession, session_catalog: Catalog, arro "total-files-size": summaries[6]["total-files-size"], "total-data-files": "4", "total-records": "4", + "engine-name": "pyiceberg", + "engine-version": EnvironmentContext.get().get("engine-version"), } diff --git a/tests/integration/test_writes/test_writes.py b/tests/integration/test_writes/test_writes.py index 2a0c50a921..a111f4df8f 100644 --- a/tests/integration/test_writes/test_writes.py +++ b/tests/integration/test_writes/test_writes.py @@ -44,6 +44,7 @@ from pyiceberg.catalog import Catalog, load_catalog from pyiceberg.catalog.hive import HiveCatalog from pyiceberg.catalog.sql import SqlCatalog +from pyiceberg.environment_context import EnvironmentContext from pyiceberg.exceptions import CommitFailedException, NoSuchTableError from pyiceberg.expressions import And, EqualTo, GreaterThanOrEqual, In, LessThan, Not from pyiceberg.io.pyarrow import UnsupportedPyArrowTypeException, _dataframe_to_data_files @@ -231,6 +232,8 @@ def test_summaries(spark: SparkSession, session_catalog: Catalog, arrow_table_wi "total-files-size": str(file_size), "total-position-deletes": "0", "total-records": "3", + "engine-name": "pyiceberg", + "engine-version": EnvironmentContext.get().get("engine-version"), } # Append @@ -244,6 +247,8 @@ def test_summaries(spark: SparkSession, session_catalog: Catalog, arrow_table_wi "total-files-size": str(file_size * 2), "total-position-deletes": "0", "total-records": "6", + "engine-name": "pyiceberg", + "engine-version": EnvironmentContext.get().get("engine-version"), } # Delete @@ -257,6 +262,8 @@ def test_summaries(spark: SparkSession, session_catalog: Catalog, arrow_table_wi "total-files-size": "0", "total-position-deletes": "0", "total-records": "0", + "engine-name": "pyiceberg", + "engine-version": EnvironmentContext.get().get("engine-version"), } # Append @@ -270,6 +277,8 @@ def test_summaries(spark: SparkSession, session_catalog: Catalog, arrow_table_wi "total-files-size": str(file_size), "total-position-deletes": "0", "total-records": "3", + "engine-name": "pyiceberg", + "engine-version": EnvironmentContext.get().get("engine-version"), } @@ -326,6 +335,8 @@ def test_summaries_partial_overwrite(spark: SparkSession, session_catalog: Catal "total-files-size": summaries[0]["total-files-size"], "total-position-deletes": "0", "total-records": "5", + "engine-name": "pyiceberg", + "engine-version": EnvironmentContext.get().get("engine-version"), } # Java produces: # { @@ -367,6 +378,8 @@ def test_summaries_partial_overwrite(spark: SparkSession, session_catalog: Catal "total-files-size": summaries[1]["total-files-size"], "total-position-deletes": "0", "total-records": "4", + "engine-name": "pyiceberg", + "engine-version": EnvironmentContext.get().get("engine-version"), } assert len(tbl.scan().to_pandas()) == 4 @@ -831,6 +844,8 @@ def test_summaries_with_only_nulls( "total-files-size": "0", "total-position-deletes": "0", "total-records": "0", + "engine-name": "pyiceberg", + "engine-version": EnvironmentContext.get().get("engine-version"), } assert summaries[1] == { @@ -843,6 +858,8 @@ def test_summaries_with_only_nulls( "total-files-size": str(file_size), "total-position-deletes": "0", "total-records": "2", + "engine-name": "pyiceberg", + "engine-version": EnvironmentContext.get().get("engine-version"), } assert summaries[2] == { @@ -855,6 +872,8 @@ def test_summaries_with_only_nulls( "total-files-size": "0", "total-position-deletes": "0", "total-records": "0", + "engine-name": "pyiceberg", + "engine-version": EnvironmentContext.get().get("engine-version"), } assert summaries[3] == { @@ -864,6 +883,8 @@ def test_summaries_with_only_nulls( "total-files-size": "0", "total-position-deletes": "0", "total-records": "0", + "engine-name": "pyiceberg", + "engine-version": EnvironmentContext.get().get("engine-version"), } @@ -1156,6 +1177,8 @@ def test_inspect_snapshots( ("total-files-size", str(file_size)), ("total-position-deletes", "0"), ("total-equality-deletes", "0"), + ("engine-name", "pyiceberg"), + ("engine-version", EnvironmentContext.get().get("engine-version")), ] # Delete @@ -1169,6 +1192,8 @@ def test_inspect_snapshots( ("total-files-size", "0"), ("total-position-deletes", "0"), ("total-equality-deletes", "0"), + ("engine-name", "pyiceberg"), + ("engine-version", EnvironmentContext.get().get("engine-version")), ] lhs = spark.table(f"{identifier}.snapshots").toPandas() diff --git a/tests/table/test_snapshots.py b/tests/table/test_snapshots.py index 077027f7b9..485a541630 100644 --- a/tests/table/test_snapshots.py +++ b/tests/table/test_snapshots.py @@ -19,6 +19,7 @@ import pytest +from pyiceberg.environment_context import EnvironmentContext from pyiceberg.manifest import DataFile, DataFileContent, ManifestContent, ManifestFile from pyiceberg.partitioning import PartitionField, PartitionSpec from pyiceberg.schema import Schema @@ -315,6 +316,8 @@ def test_merge_snapshot_summaries_empty() -> None: "total-files-size": "0", "total-position-deletes": "0", "total-equality-deletes": "0", + "engine-name": "pyiceberg", + "engine-version": EnvironmentContext.get().get("engine-version"), }, ) @@ -349,6 +352,8 @@ def test_merge_snapshot_summaries_new_summary() -> None: "total-files-size": "4", "total-position-deletes": "5", "total-equality-deletes": "3", + "engine-name": "pyiceberg", + "engine-version": EnvironmentContext.get().get("engine-version"), }, ) @@ -391,6 +396,8 @@ def test_merge_snapshot_summaries_overwrite_summary() -> None: "total-files-size": "5", "total-position-deletes": "6", "total-equality-deletes": "4", + "engine-name": "pyiceberg", + "engine-version": EnvironmentContext.get().get("engine-version"), } assert actual.additional_properties == expected diff --git a/tests/test_environment_context.py b/tests/test_environment_context.py new file mode 100644 index 0000000000..fe0b44fd75 --- /dev/null +++ b/tests/test_environment_context.py @@ -0,0 +1,34 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +import re + +from pyiceberg.environment_context import EnvironmentContext + + +def test_default_value() -> None: + actual = EnvironmentContext.get() + assert len(actual) == 2 + assert actual["engine-name"] == "pyiceberg" + assert re.match(r"^\d+\.\d+\.\d+", actual["engine-version"]) + + +def test_put_and_remove() -> None: + EnvironmentContext.put("test-key", "test-value") + assert EnvironmentContext.get()["test-key"] == "test-value" + + EnvironmentContext.remove("test-key") + assert "test-key" not in EnvironmentContext.get() From 8f55791a75673955e4b58e4f1cc97015ab5aeaeb Mon Sep 17 00:00:00 2001 From: Yuya Ebihara Date: Thu, 25 Jun 2026 04:36:30 +0900 Subject: [PATCH 2/2] fixup! Add support for environment context --- pyiceberg/environment_context.py | 4 +- pyiceberg/table/snapshots.py | 2 +- pyiceberg/view/metadata.py | 3 +- tests/integration/test_deletes.py | 3 +- tests/integration/test_inspect_table.py | 57 ++-- .../test_writes/test_partitioned_writes.py | 199 ++++++----- tests/integration/test_writes/test_writes.py | 313 +++++++++--------- tests/integration/test_writes/utils.py | 9 + tests/table/test_snapshots.py | 9 +- tests/test_environment_context.py | 28 +- 10 files changed, 321 insertions(+), 306 deletions(-) diff --git a/pyiceberg/environment_context.py b/pyiceberg/environment_context.py index 8791ae247f..e9da38c874 100644 --- a/pyiceberg/environment_context.py +++ b/pyiceberg/environment_context.py @@ -15,13 +15,13 @@ # specific language governing permissions and limitations # under the License. -from importlib.metadata import version +from pyiceberg import __version__ class EnvironmentContext: _PROPERTIES: dict[str, str] = { "engine-name": "pyiceberg", - "engine-version": version("pyiceberg"), + "engine-version": __version__, } def __init__(self) -> None: diff --git a/pyiceberg/table/snapshots.py b/pyiceberg/table/snapshots.py index c18aeb00f8..bf2656e29b 100644 --- a/pyiceberg/table/snapshots.py +++ b/pyiceberg/table/snapshots.py @@ -404,7 +404,7 @@ def _update_totals(total_property: str, added_property: str, removed_property: s ) for key, value in EnvironmentContext.get().items(): - summary.__setitem__(key, value) + summary[key] = value return summary diff --git a/pyiceberg/view/metadata.py b/pyiceberg/view/metadata.py index 648810b319..33766040e3 100644 --- a/pyiceberg/view/metadata.py +++ b/pyiceberg/view/metadata.py @@ -21,7 +21,6 @@ from pydantic import Field, RootModel, field_validator -from pyiceberg.environment_context import EnvironmentContext from pyiceberg.schema import Schema from pyiceberg.typedef import IcebergBaseModel, Identifier, Properties from pyiceberg.typedef import ViewVersion as ViewVersionLiteral @@ -52,7 +51,7 @@ class ViewVersion(IcebergBaseModel): """ID of the schema for the view version""" timestamp_ms: int = Field(alias="timestamp-ms", default_factory=lambda: int(time.time() * 1000)) """Timestamp when the version was created (ms from epoch)""" - summary: dict[str, str] = Field(default_factory=lambda: EnvironmentContext.get()) + summary: dict[str, str] = Field(default_factory=dict) """A string to string map of summary metadata about the version""" representations: list[ViewRepresentation] = Field() """A list of representations for the view definition""" diff --git a/tests/integration/test_deletes.py b/tests/integration/test_deletes.py index ca84d6f7ed..feab384c02 100644 --- a/tests/integration/test_deletes.py +++ b/tests/integration/test_deletes.py @@ -481,8 +481,7 @@ def test_partitioned_table_positional_deletes_sequence_number(spark: SparkSessio "total-files-size": snapshots[2].summary["total-files-size"], "total-position-deletes": "1", "total-records": "4", - "engine-name": "pyiceberg", - "engine-version": EnvironmentContext.get().get("engine-version"), + **EnvironmentContext.get(), }, ) diff --git a/tests/integration/test_inspect_table.py b/tests/integration/test_inspect_table.py index cf6f484b60..c9ded5dcc0 100644 --- a/tests/integration/test_inspect_table.py +++ b/tests/integration/test_inspect_table.py @@ -54,6 +54,11 @@ TimestamptzType, ) + +def with_environment_context_tuples(summary: list[tuple[str, str]]) -> list[tuple[str, str]]: + return summary + list(EnvironmentContext.get().items()) + + TABLE_SCHEMA = Schema( NestedField(field_id=1, name="bool", field_type=BooleanType(), required=False), NestedField(field_id=2, name="string", field_type=StringType(), required=False), @@ -268,34 +273,34 @@ def test_inspect_snapshots( assert file_size > 0 # Append - assert df["summary"][0].as_py() == [ - ("added-files-size", str(file_size)), - ("added-data-files", "1"), - ("added-records", "3"), - ("total-data-files", "1"), - ("total-delete-files", "0"), - ("total-records", "3"), - ("total-files-size", str(file_size)), - ("total-position-deletes", "0"), - ("total-equality-deletes", "0"), - ("engine-name", "pyiceberg"), - ("engine-version", EnvironmentContext.get().get("engine-version")), - ] + assert df["summary"][0].as_py() == with_environment_context_tuples( + [ + ("added-files-size", str(file_size)), + ("added-data-files", "1"), + ("added-records", "3"), + ("total-data-files", "1"), + ("total-delete-files", "0"), + ("total-records", "3"), + ("total-files-size", str(file_size)), + ("total-position-deletes", "0"), + ("total-equality-deletes", "0"), + ] + ) # Delete - assert df["summary"][1].as_py() == [ - ("removed-files-size", str(file_size)), - ("deleted-data-files", "1"), - ("deleted-records", "3"), - ("total-data-files", "0"), - ("total-delete-files", "0"), - ("total-records", "0"), - ("total-files-size", "0"), - ("total-position-deletes", "0"), - ("total-equality-deletes", "0"), - ("engine-name", "pyiceberg"), - ("engine-version", EnvironmentContext.get().get("engine-version")), - ] + assert df["summary"][1].as_py() == with_environment_context_tuples( + [ + ("removed-files-size", str(file_size)), + ("deleted-data-files", "1"), + ("deleted-records", "3"), + ("total-data-files", "0"), + ("total-delete-files", "0"), + ("total-records", "0"), + ("total-files-size", "0"), + ("total-position-deletes", "0"), + ("total-equality-deletes", "0"), + ] + ) lhs = spark.table(f"{identifier}.snapshots").toPandas() rhs = df.to_pandas() diff --git a/tests/integration/test_writes/test_partitioned_writes.py b/tests/integration/test_writes/test_partitioned_writes.py index abfb9bad10..2ca4d476dc 100644 --- a/tests/integration/test_writes/test_partitioned_writes.py +++ b/tests/integration/test_writes/test_partitioned_writes.py @@ -25,7 +25,6 @@ from pyspark.sql import SparkSession from pyiceberg.catalog import Catalog -from pyiceberg.environment_context import EnvironmentContext from pyiceberg.exceptions import NoSuchTableError from pyiceberg.partitioning import PartitionField, PartitionSpec from pyiceberg.schema import Schema @@ -43,7 +42,7 @@ from pyiceberg.types import ( StringType, ) -from utils import TABLE_SCHEMA, _create_table +from utils import TABLE_SCHEMA, _create_table, with_environment_context @pytest.mark.integration @@ -488,109 +487,109 @@ def test_summaries_with_null(spark: SparkSession, session_catalog: Catalog, arro file_size = int(summaries[0]["added-files-size"]) assert file_size > 0 - assert summaries[0] == { - "changed-partition-count": "3", - "added-data-files": "3", - "added-files-size": str(file_size), - "added-records": "3", - "total-data-files": "3", - "total-delete-files": "0", - "total-equality-deletes": "0", - "total-files-size": str(file_size), - "total-position-deletes": "0", - "total-records": "3", - "engine-name": "pyiceberg", - "engine-version": EnvironmentContext.get().get("engine-version"), - } + assert summaries[0] == with_environment_context( + { + "changed-partition-count": "3", + "added-data-files": "3", + "added-files-size": str(file_size), + "added-records": "3", + "total-data-files": "3", + "total-delete-files": "0", + "total-equality-deletes": "0", + "total-files-size": str(file_size), + "total-position-deletes": "0", + "total-records": "3", + } + ) - assert summaries[1] == { - "changed-partition-count": "3", - "added-data-files": "3", - "added-files-size": str(file_size), - "added-records": "3", - "total-data-files": "6", - "total-delete-files": "0", - "total-equality-deletes": "0", - "total-files-size": str(file_size * 2), - "total-position-deletes": "0", - "total-records": "6", - "engine-name": "pyiceberg", - "engine-version": EnvironmentContext.get().get("engine-version"), - } - assert summaries[2] == { - "removed-files-size": str(file_size * 2), - "changed-partition-count": "3", - "total-equality-deletes": "0", - "deleted-data-files": "6", - "total-position-deletes": "0", - "total-delete-files": "0", - "deleted-records": "6", - "total-files-size": "0", - "total-data-files": "0", - "total-records": "0", - "engine-name": "pyiceberg", - "engine-version": EnvironmentContext.get().get("engine-version"), - } - assert summaries[3] == { - "changed-partition-count": "3", - "added-data-files": "3", - "total-equality-deletes": "0", - "added-records": "3", - "total-position-deletes": "0", - "added-files-size": str(file_size), - "total-delete-files": "0", - "total-files-size": str(file_size), - "total-data-files": "3", - "total-records": "3", - "engine-name": "pyiceberg", - "engine-version": EnvironmentContext.get().get("engine-version"), - } - assert summaries[4] == { - "changed-partition-count": "3", - "added-data-files": "3", - "total-equality-deletes": "0", - "added-records": "3", - "total-position-deletes": "0", - "added-files-size": str(file_size), - "total-delete-files": "0", - "total-files-size": str(file_size * 2), - "total-data-files": "6", - "total-records": "6", - "engine-name": "pyiceberg", - "engine-version": EnvironmentContext.get().get("engine-version"), - } + assert summaries[1] == with_environment_context( + { + "changed-partition-count": "3", + "added-data-files": "3", + "added-files-size": str(file_size), + "added-records": "3", + "total-data-files": "6", + "total-delete-files": "0", + "total-equality-deletes": "0", + "total-files-size": str(file_size * 2), + "total-position-deletes": "0", + "total-records": "6", + } + ) + assert summaries[2] == with_environment_context( + { + "removed-files-size": str(file_size * 2), + "changed-partition-count": "3", + "total-equality-deletes": "0", + "deleted-data-files": "6", + "total-position-deletes": "0", + "total-delete-files": "0", + "deleted-records": "6", + "total-files-size": "0", + "total-data-files": "0", + "total-records": "0", + } + ) + assert summaries[3] == with_environment_context( + { + "changed-partition-count": "3", + "added-data-files": "3", + "total-equality-deletes": "0", + "added-records": "3", + "total-position-deletes": "0", + "added-files-size": str(file_size), + "total-delete-files": "0", + "total-files-size": str(file_size), + "total-data-files": "3", + "total-records": "3", + } + ) + assert summaries[4] == with_environment_context( + { + "changed-partition-count": "3", + "added-data-files": "3", + "total-equality-deletes": "0", + "added-records": "3", + "total-position-deletes": "0", + "added-files-size": str(file_size), + "total-delete-files": "0", + "total-files-size": str(file_size * 2), + "total-data-files": "6", + "total-records": "6", + } + ) assert "removed-files-size" in summaries[5] assert "total-files-size" in summaries[5] - assert summaries[5] == { - "removed-files-size": summaries[5]["removed-files-size"], - "changed-partition-count": "2", - "total-equality-deletes": "0", - "deleted-data-files": "4", - "total-position-deletes": "0", - "total-delete-files": "0", - "deleted-records": "4", - "total-files-size": summaries[5]["total-files-size"], - "total-data-files": "2", - "total-records": "2", - "engine-name": "pyiceberg", - "engine-version": EnvironmentContext.get().get("engine-version"), - } + assert summaries[5] == with_environment_context( + { + "removed-files-size": summaries[5]["removed-files-size"], + "changed-partition-count": "2", + "total-equality-deletes": "0", + "deleted-data-files": "4", + "total-position-deletes": "0", + "total-delete-files": "0", + "deleted-records": "4", + "total-files-size": summaries[5]["total-files-size"], + "total-data-files": "2", + "total-records": "2", + } + ) assert "added-files-size" in summaries[6] assert "total-files-size" in summaries[6] - assert summaries[6] == { - "changed-partition-count": "2", - "added-data-files": "2", - "total-equality-deletes": "0", - "added-records": "2", - "total-position-deletes": "0", - "added-files-size": summaries[6]["added-files-size"], - "total-delete-files": "0", - "total-files-size": summaries[6]["total-files-size"], - "total-data-files": "4", - "total-records": "4", - "engine-name": "pyiceberg", - "engine-version": EnvironmentContext.get().get("engine-version"), - } + assert summaries[6] == with_environment_context( + { + "changed-partition-count": "2", + "added-data-files": "2", + "total-equality-deletes": "0", + "added-records": "2", + "total-position-deletes": "0", + "added-files-size": summaries[6]["added-files-size"], + "total-delete-files": "0", + "total-files-size": summaries[6]["total-files-size"], + "total-data-files": "4", + "total-records": "4", + } + ) @pytest.mark.integration diff --git a/tests/integration/test_writes/test_writes.py b/tests/integration/test_writes/test_writes.py index a111f4df8f..606684099a 100644 --- a/tests/integration/test_writes/test_writes.py +++ b/tests/integration/test_writes/test_writes.py @@ -44,7 +44,6 @@ from pyiceberg.catalog import Catalog, load_catalog from pyiceberg.catalog.hive import HiveCatalog from pyiceberg.catalog.sql import SqlCatalog -from pyiceberg.environment_context import EnvironmentContext from pyiceberg.exceptions import CommitFailedException, NoSuchTableError from pyiceberg.expressions import And, EqualTo, GreaterThanOrEqual, In, LessThan, Not from pyiceberg.io.pyarrow import UnsupportedPyArrowTypeException, _dataframe_to_data_files @@ -67,7 +66,7 @@ UUIDType, ) from pyiceberg.view.metadata import SQLViewRepresentation, ViewVersion -from utils import TABLE_SCHEMA, _create_table +from utils import TABLE_SCHEMA, _create_table, with_environment_context, with_environment_context_tuples @pytest.fixture(scope="session", autouse=True) @@ -222,64 +221,64 @@ def test_summaries(spark: SparkSession, session_catalog: Catalog, arrow_table_wi assert file_size > 0 # Append - assert summaries[0] == { - "added-data-files": "1", - "added-files-size": str(file_size), - "added-records": "3", - "total-data-files": "1", - "total-delete-files": "0", - "total-equality-deletes": "0", - "total-files-size": str(file_size), - "total-position-deletes": "0", - "total-records": "3", - "engine-name": "pyiceberg", - "engine-version": EnvironmentContext.get().get("engine-version"), - } + assert summaries[0] == with_environment_context( + { + "added-data-files": "1", + "added-files-size": str(file_size), + "added-records": "3", + "total-data-files": "1", + "total-delete-files": "0", + "total-equality-deletes": "0", + "total-files-size": str(file_size), + "total-position-deletes": "0", + "total-records": "3", + } + ) # Append - assert summaries[1] == { - "added-data-files": "1", - "added-files-size": str(file_size), - "added-records": "3", - "total-data-files": "2", - "total-delete-files": "0", - "total-equality-deletes": "0", - "total-files-size": str(file_size * 2), - "total-position-deletes": "0", - "total-records": "6", - "engine-name": "pyiceberg", - "engine-version": EnvironmentContext.get().get("engine-version"), - } + assert summaries[1] == with_environment_context( + { + "added-data-files": "1", + "added-files-size": str(file_size), + "added-records": "3", + "total-data-files": "2", + "total-delete-files": "0", + "total-equality-deletes": "0", + "total-files-size": str(file_size * 2), + "total-position-deletes": "0", + "total-records": "6", + } + ) # Delete - assert summaries[2] == { - "deleted-data-files": "2", - "deleted-records": "6", - "removed-files-size": str(file_size * 2), - "total-data-files": "0", - "total-delete-files": "0", - "total-equality-deletes": "0", - "total-files-size": "0", - "total-position-deletes": "0", - "total-records": "0", - "engine-name": "pyiceberg", - "engine-version": EnvironmentContext.get().get("engine-version"), - } + assert summaries[2] == with_environment_context( + { + "deleted-data-files": "2", + "deleted-records": "6", + "removed-files-size": str(file_size * 2), + "total-data-files": "0", + "total-delete-files": "0", + "total-equality-deletes": "0", + "total-files-size": "0", + "total-position-deletes": "0", + "total-records": "0", + } + ) # Append - assert summaries[3] == { - "added-data-files": "1", - "added-files-size": str(file_size), - "added-records": "3", - "total-data-files": "1", - "total-delete-files": "0", - "total-equality-deletes": "0", - "total-files-size": str(file_size), - "total-position-deletes": "0", - "total-records": "3", - "engine-name": "pyiceberg", - "engine-version": EnvironmentContext.get().get("engine-version"), - } + assert summaries[3] == with_environment_context( + { + "added-data-files": "1", + "added-files-size": str(file_size), + "added-records": "3", + "total-data-files": "1", + "total-delete-files": "0", + "total-equality-deletes": "0", + "total-files-size": str(file_size), + "total-position-deletes": "0", + "total-records": "3", + } + ) @pytest.mark.integration @@ -324,20 +323,20 @@ def test_summaries_partial_overwrite(spark: SparkSession, session_catalog: Catal # APPEND assert "added-files-size" in summaries[0] assert "total-files-size" in summaries[0] - assert summaries[0] == { - "added-data-files": "3", - "added-files-size": summaries[0]["added-files-size"], - "added-records": "5", - "changed-partition-count": "3", - "total-data-files": "3", - "total-delete-files": "0", - "total-equality-deletes": "0", - "total-files-size": summaries[0]["total-files-size"], - "total-position-deletes": "0", - "total-records": "5", - "engine-name": "pyiceberg", - "engine-version": EnvironmentContext.get().get("engine-version"), - } + assert summaries[0] == with_environment_context( + { + "added-data-files": "3", + "added-files-size": summaries[0]["added-files-size"], + "added-records": "5", + "changed-partition-count": "3", + "total-data-files": "3", + "total-delete-files": "0", + "total-equality-deletes": "0", + "total-files-size": summaries[0]["total-files-size"], + "total-position-deletes": "0", + "total-records": "5", + } + ) # Java produces: # { # "added-data-files": "1", @@ -364,23 +363,23 @@ def test_summaries_partial_overwrite(spark: SparkSession, session_catalog: Catal assert "added-files-size" in summaries[1] assert "removed-files-size" in summaries[1] assert "total-files-size" in summaries[1] - assert summaries[1] == { - "added-data-files": "1", - "added-files-size": summaries[1]["added-files-size"], - "added-records": "2", - "changed-partition-count": "1", - "deleted-data-files": "1", - "deleted-records": "3", - "removed-files-size": summaries[1]["removed-files-size"], - "total-data-files": "3", - "total-delete-files": "0", - "total-equality-deletes": "0", - "total-files-size": summaries[1]["total-files-size"], - "total-position-deletes": "0", - "total-records": "4", - "engine-name": "pyiceberg", - "engine-version": EnvironmentContext.get().get("engine-version"), - } + assert summaries[1] == with_environment_context( + { + "added-data-files": "1", + "added-files-size": summaries[1]["added-files-size"], + "added-records": "2", + "changed-partition-count": "1", + "deleted-data-files": "1", + "deleted-records": "3", + "removed-files-size": summaries[1]["removed-files-size"], + "total-data-files": "3", + "total-delete-files": "0", + "total-equality-deletes": "0", + "total-files-size": summaries[1]["total-files-size"], + "total-position-deletes": "0", + "total-records": "4", + } + ) assert len(tbl.scan().to_pandas()) == 4 @@ -837,55 +836,55 @@ def test_summaries_with_only_nulls( file_size = int(summaries[1]["added-files-size"]) assert file_size > 0 - assert summaries[0] == { - "total-data-files": "0", - "total-delete-files": "0", - "total-equality-deletes": "0", - "total-files-size": "0", - "total-position-deletes": "0", - "total-records": "0", - "engine-name": "pyiceberg", - "engine-version": EnvironmentContext.get().get("engine-version"), - } + assert summaries[0] == with_environment_context( + { + "total-data-files": "0", + "total-delete-files": "0", + "total-equality-deletes": "0", + "total-files-size": "0", + "total-position-deletes": "0", + "total-records": "0", + } + ) - assert summaries[1] == { - "added-data-files": "1", - "added-files-size": str(file_size), - "added-records": "2", - "total-data-files": "1", - "total-delete-files": "0", - "total-equality-deletes": "0", - "total-files-size": str(file_size), - "total-position-deletes": "0", - "total-records": "2", - "engine-name": "pyiceberg", - "engine-version": EnvironmentContext.get().get("engine-version"), - } + assert summaries[1] == with_environment_context( + { + "added-data-files": "1", + "added-files-size": str(file_size), + "added-records": "2", + "total-data-files": "1", + "total-delete-files": "0", + "total-equality-deletes": "0", + "total-files-size": str(file_size), + "total-position-deletes": "0", + "total-records": "2", + } + ) - assert summaries[2] == { - "deleted-data-files": "1", - "deleted-records": "2", - "removed-files-size": str(file_size), - "total-data-files": "0", - "total-delete-files": "0", - "total-equality-deletes": "0", - "total-files-size": "0", - "total-position-deletes": "0", - "total-records": "0", - "engine-name": "pyiceberg", - "engine-version": EnvironmentContext.get().get("engine-version"), - } + assert summaries[2] == with_environment_context( + { + "deleted-data-files": "1", + "deleted-records": "2", + "removed-files-size": str(file_size), + "total-data-files": "0", + "total-delete-files": "0", + "total-equality-deletes": "0", + "total-files-size": "0", + "total-position-deletes": "0", + "total-records": "0", + } + ) - assert summaries[3] == { - "total-data-files": "0", - "total-delete-files": "0", - "total-equality-deletes": "0", - "total-files-size": "0", - "total-position-deletes": "0", - "total-records": "0", - "engine-name": "pyiceberg", - "engine-version": EnvironmentContext.get().get("engine-version"), - } + assert summaries[3] == with_environment_context( + { + "total-data-files": "0", + "total-delete-files": "0", + "total-equality-deletes": "0", + "total-files-size": "0", + "total-position-deletes": "0", + "total-records": "0", + } + ) @pytest.mark.integration @@ -1167,34 +1166,34 @@ def test_inspect_snapshots( assert file_size > 0 # Append - assert df["summary"][0].as_py() == [ - ("added-files-size", str(file_size)), - ("added-data-files", "1"), - ("added-records", "3"), - ("total-data-files", "1"), - ("total-delete-files", "0"), - ("total-records", "3"), - ("total-files-size", str(file_size)), - ("total-position-deletes", "0"), - ("total-equality-deletes", "0"), - ("engine-name", "pyiceberg"), - ("engine-version", EnvironmentContext.get().get("engine-version")), - ] + assert df["summary"][0].as_py() == with_environment_context_tuples( + [ + ("added-files-size", str(file_size)), + ("added-data-files", "1"), + ("added-records", "3"), + ("total-data-files", "1"), + ("total-delete-files", "0"), + ("total-records", "3"), + ("total-files-size", str(file_size)), + ("total-position-deletes", "0"), + ("total-equality-deletes", "0"), + ] + ) # Delete - assert df["summary"][1].as_py() == [ - ("removed-files-size", str(file_size)), - ("deleted-data-files", "1"), - ("deleted-records", "3"), - ("total-data-files", "0"), - ("total-delete-files", "0"), - ("total-records", "0"), - ("total-files-size", "0"), - ("total-position-deletes", "0"), - ("total-equality-deletes", "0"), - ("engine-name", "pyiceberg"), - ("engine-version", EnvironmentContext.get().get("engine-version")), - ] + assert df["summary"][1].as_py() == with_environment_context_tuples( + [ + ("removed-files-size", str(file_size)), + ("deleted-data-files", "1"), + ("deleted-records", "3"), + ("total-data-files", "0"), + ("total-delete-files", "0"), + ("total-records", "0"), + ("total-files-size", "0"), + ("total-position-deletes", "0"), + ("total-equality-deletes", "0"), + ] + ) lhs = spark.table(f"{identifier}.snapshots").toPandas() rhs = df.to_pandas() diff --git a/tests/integration/test_writes/utils.py b/tests/integration/test_writes/utils.py index 4ab54d97e7..967f37e957 100644 --- a/tests/integration/test_writes/utils.py +++ b/tests/integration/test_writes/utils.py @@ -20,6 +20,7 @@ import pyarrow as pa from pyiceberg.catalog import Catalog +from pyiceberg.environment_context import EnvironmentContext from pyiceberg.exceptions import NoSuchTableError from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec from pyiceberg.schema import Schema @@ -79,3 +80,11 @@ def _create_table( tbl.append(d) return tbl + + +def with_environment_context(summary: dict[str, str]) -> dict[str, str]: + return {**summary, **EnvironmentContext.get()} + + +def with_environment_context_tuples(summary: list[tuple[str, str]]) -> list[tuple[str, str]]: + return summary + list(EnvironmentContext.get().items()) diff --git a/tests/table/test_snapshots.py b/tests/table/test_snapshots.py index 485a541630..8374184b30 100644 --- a/tests/table/test_snapshots.py +++ b/tests/table/test_snapshots.py @@ -316,8 +316,7 @@ def test_merge_snapshot_summaries_empty() -> None: "total-files-size": "0", "total-position-deletes": "0", "total-equality-deletes": "0", - "engine-name": "pyiceberg", - "engine-version": EnvironmentContext.get().get("engine-version"), + **EnvironmentContext.get(), }, ) @@ -352,8 +351,7 @@ def test_merge_snapshot_summaries_new_summary() -> None: "total-files-size": "4", "total-position-deletes": "5", "total-equality-deletes": "3", - "engine-name": "pyiceberg", - "engine-version": EnvironmentContext.get().get("engine-version"), + **EnvironmentContext.get(), }, ) @@ -396,8 +394,7 @@ def test_merge_snapshot_summaries_overwrite_summary() -> None: "total-files-size": "5", "total-position-deletes": "6", "total-equality-deletes": "4", - "engine-name": "pyiceberg", - "engine-version": EnvironmentContext.get().get("engine-version"), + **EnvironmentContext.get(), } assert actual.additional_properties == expected diff --git a/tests/test_environment_context.py b/tests/test_environment_context.py index fe0b44fd75..2ee3e6fb53 100644 --- a/tests/test_environment_context.py +++ b/tests/test_environment_context.py @@ -14,21 +14,29 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. -import re - +from pyiceberg import __version__ from pyiceberg.environment_context import EnvironmentContext def test_default_value() -> None: - actual = EnvironmentContext.get() - assert len(actual) == 2 - assert actual["engine-name"] == "pyiceberg" - assert re.match(r"^\d+\.\d+\.\d+", actual["engine-version"]) + assert EnvironmentContext.get() == { + "engine-name": "pyiceberg", + "engine-version": __version__, + } -def test_put_and_remove() -> None: - EnvironmentContext.put("test-key", "test-value") - assert EnvironmentContext.get()["test-key"] == "test-value" +def test_get_returns_copy() -> None: + actual = EnvironmentContext.get() + actual["test-key"] = "test-value" - EnvironmentContext.remove("test-key") assert "test-key" not in EnvironmentContext.get() + + +def test_put_and_remove() -> None: + try: + EnvironmentContext.put("test-key", "test-value") + assert EnvironmentContext.get()["test-key"] == "test-value" + assert EnvironmentContext.remove("test-key") == "test-value" + assert "test-key" not in EnvironmentContext.get() + finally: + EnvironmentContext.remove("test-key")