Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@
except ImportError:
bigquery_magics = None

if sys.version_info < (3, 10):
if sys.version_info < (3, 10): # pragma: NO COVER

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd prefer we add a comment explaining why this is safe. In this case, the check is there to protect against a user somehow installing this package on an old Python version despite us dropping support in #17187.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Regarding the comment on the version check: I looked at the gapic-generator-python templates and how this is handled across the rest of the monorepo. The generator templates do not include an explanatory comment here, and the vast majority of our libraries don't use one. The code is essentially self-documenting since the warnings.warn string states why the check is firing (to warn users who are on an unsupported Python version). To maintain parity with our generated code and the rest of the libraries, I'd prefer to leave the comment off.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Regarding the comment on the version check: I looked at the gapic-generator-python templates and how this is handled across the rest of the monorepo. The generator templates do not include an explanatory comment here, and the vast majority of our libraries don't use one. The code is essentially self-documenting since the warnings.warn string states why the check is firing (to warn users who are on an unsupported Python version). To maintain parity with our generated code and the rest of the libraries, I'd prefer to leave the comment off.

warnings.warn(
"The python-bigquery library no longer supports Python <= 3.9. "
f"Your Python version is {sys.version_info.major}.{sys.version_info.minor}.{sys.version_info.micro}. We "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ def close(self):

if self._owns_bqstorage_client:
# There is no close() on the BQ Storage client itself.
self._bqstorage_client._transport.grpc_channel.close()
self._bqstorage_client._transport.close()

for cursor_ in self._cursors_created:
if not cursor_._closed:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -773,4 +773,4 @@ def _close_transports(client, bqstorage_client):
"""
client.close()
if bqstorage_client is not None:
bqstorage_client._transport.grpc_channel.close()
bqstorage_client._transport.close()
Original file line number Diff line number Diff line change
Expand Up @@ -2353,7 +2353,9 @@ def to_arrow(
progress_bar.close()
finally:
if owns_bqstorage_client:
bqstorage_client._transport.grpc_channel.close() # type: ignore
# mypy: bqstorage_client is guaranteed to be not None when owns_bqstorage_client is True,
# but mypy cannot infer this correlation. We ignore the union-attr error here.
bqstorage_client._transport.close() # type: ignore[union-attr]

if record_batches and bqstorage_client is not None:
return pyarrow.Table.from_batches(record_batches)
Expand Down
28 changes: 27 additions & 1 deletion packages/google-cloud-bigquery/tests/system/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import contextlib
import datetime
import decimal
import uuid
Expand All @@ -21,7 +22,6 @@

from google.cloud._helpers import UTC


_naive = datetime.datetime(2016, 12, 5, 12, 41, 9)
_naive_microseconds = datetime.datetime(2016, 12, 5, 12, 41, 9, 250000)
_stamp = "%s %s" % (_naive.date().isoformat(), _naive.time().isoformat())
Expand Down Expand Up @@ -104,3 +104,29 @@ def _rate_limit_exceeded(forbidden):
google.api_core.exceptions.Forbidden,
error_predicate=_rate_limit_exceeded,
)


@contextlib.contextmanager
def patch_tracked_requests():
    """Temporarily patch google-auth ``Request`` creation and close tracked sessions.

    While the context is active, every ``google.auth.transport.requests.Request``
    constructed without a caller-supplied ``session`` is recorded. On exit the
    original ``__init__`` is restored and the ``session`` of each recorded
    request is closed. This prevents socket leaks in system tests that use
    Workload Identity or metadata server auth.

    Yields:
        list: The recorded ``Request`` instances; the list keeps growing while
        the context is active.
    """
    import google.auth.transport.requests

    request_cls = google.auth.transport.requests.Request
    saved_init = request_cls.__init__
    tracked_requests = []

    def patched_init(self, session=None):
        saved_init(self, session=session)
        # Requests given an explicit session are not tracked, so that session
        # is never closed by this helper.
        if session is None:
            tracked_requests.append(self)

    request_cls.__init__ = patched_init
    try:
        yield tracked_requests
    finally:
        # Undo the patch before closing anything so later Request objects are
        # constructed with the original initializer.
        request_cls.__init__ = saved_init
        for tracked in tracked_requests:
            owned_session = getattr(tracked, "session", None)
            if owned_session is not None:
                owned_session.close()
75 changes: 43 additions & 32 deletions packages/google-cloud-bigquery/tests/system/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@

from . import helpers


JOB_TIMEOUT = 120 # 2 minutes
DATA_PATH = pathlib.Path(__file__).parent.parent / "data"

Expand Down Expand Up @@ -234,23 +233,29 @@ def _create_bucket(self, bucket_name, location=None):

def test_close_releases_open_sockets(self):
current_process = psutil.Process()
conn_count_start = len(current_process.net_connections())
conn_start = current_process.net_connections()
conn_count_start = len(conn_start)

with helpers.patch_tracked_requests():
client = Config.CLIENT
client.query(
"""
SELECT
source_year AS year, COUNT(is_male) AS birth_count
FROM `bigquery-public-data.samples.natality`
GROUP BY year
ORDER BY year DESC
LIMIT 15
"""
)

client = Config.CLIENT
client.query(
"""
SELECT
source_year AS year, COUNT(is_male) AS birth_count
FROM `bigquery-public-data.samples.natality`
GROUP BY year
ORDER BY year DESC
LIMIT 15
"""
)
client.close()

client.close()
import gc

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've recently experienced pytest still hanging onto resources even after explicit del calls (pola-rs/polars-bigquery-client#3). Might make sense to add a layer of indirection like I did in https://github.com/pola-rs/polars-bigquery-client/pull/3/changes#diff-d9ee89704040c8f986c8e0a4ab3757806eebe9caa54073a72a3672b5626df3cf

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've reviewed the polars-bigquery PR. While pytest's reference-holding is a concern, I believe this PR is less susceptible because we've moved toward explicit closure of the transport and auth sessions (via the tracker) rather than relying solely on object destruction. Since we aren't dealing with Rust-backed objects that have opaque lifecycles, the explicit .close() calls should release the underlying sockets even if pytest keeps a reference to the Python wrapper alive. I'll monitor for flakiness, but the current gc.collect() approach seems to resolve the leaks in my testing.


conn_count_end = len(current_process.net_connections())
gc.collect()
conn_end = current_process.net_connections()
conn_count_end = len(conn_end)
self.assertLessEqual(conn_count_end, conn_count_start)

def test_create_dataset(self):
Expand Down Expand Up @@ -2174,25 +2179,31 @@ def test_dbapi_dry_run_query(self):
def test_dbapi_connection_does_not_leak_sockets(self):
pytest.importorskip("google.cloud.bigquery_storage")
current_process = psutil.Process()
conn_count_start = len(current_process.net_connections())

# Provide no explicit clients, so that the connection will create and own them.
connection = dbapi.connect()
cursor = connection.cursor()

cursor.execute(
conn_start = current_process.net_connections()
conn_count_start = len(conn_start)

with helpers.patch_tracked_requests():
# Provide no explicit clients, so that the connection will create and own them.
connection = dbapi.connect()
cursor = connection.cursor()

cursor.execute(
"""
SELECT id, `by`, timestamp
FROM `bigquery-public-data.hacker_news.full`
ORDER BY `id` ASC
LIMIT 100000
"""
SELECT id, `by`, timestamp
FROM `bigquery-public-data.hacker_news.full`
ORDER BY `id` ASC
LIMIT 100000
"""
)
rows = cursor.fetchall()
self.assertEqual(len(rows), 100000)
)
rows = cursor.fetchall()
self.assertEqual(len(rows), 100000)

connection.close()
import gc

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same suggestion for this function: consider splitting the gc-dependent tests into a separate module.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above comment.


connection.close()
conn_count_end = len(current_process.net_connections())
gc.collect()
conn_end = current_process.net_connections()
conn_count_end = len(conn_end)
self.assertLessEqual(conn_count_end, conn_count_start)

def _load_table_for_dml(self, rows, dataset_id, table_id):
Expand Down
46 changes: 25 additions & 21 deletions packages/google-cloud-bigquery/tests/system/test_magics.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import pytest
import psutil

from . import helpers

IPython = pytest.importorskip("IPython")
io = pytest.importorskip("IPython.utils.io")
Expand Down Expand Up @@ -48,27 +49,30 @@ def ipython_interactive(ipython):
def test_bigquery_magic(ipython_interactive):
ip = IPython.get_ipython()
current_process = psutil.Process()
conn_count_start = len(current_process.net_connections())

# Deprecated, but should still work in google-cloud-bigquery 3.x.
with pytest.warns(FutureWarning, match="bigquery_magics"):
ip.extension_manager.load_extension("google.cloud.bigquery")

sql = """
SELECT
CONCAT(
'https://stackoverflow.com/questions/',
CAST(id as STRING)) as url,
view_count
FROM `bigquery-public-data.stackoverflow.posts_questions`
WHERE tags like '%google-bigquery%'
ORDER BY view_count DESC
LIMIT 10
"""
with io.capture_output() as captured:
result = ip.run_cell_magic("bigquery", "--use_rest_api", sql)

conn_count_end = len(current_process.net_connections())
conn_start = current_process.net_connections()
conn_count_start = len(conn_start)

with helpers.patch_tracked_requests():
# Deprecated, but should still work in google-cloud-bigquery 3.x.
with pytest.warns(FutureWarning, match="bigquery_magics"):
ip.extension_manager.load_extension("google.cloud.bigquery")

sql = """
SELECT
CONCAT(
'https://stackoverflow.com/questions/',
CAST(id as STRING)) as url,
view_count
FROM `bigquery-public-data.stackoverflow.posts_questions`
WHERE tags like '%google-bigquery%'
ORDER BY view_count DESC
LIMIT 10
"""
with io.capture_output() as captured:
result = ip.run_cell_magic("bigquery", "--use_rest_api", sql)

conn_end = current_process.net_connections()
conn_count_end = len(conn_end)

lines = re.split("\n|\r", captured.stdout)
# Removes blanks & terminal code (result of display clearing)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ def _mock_bqstorage_client(self):
from google.cloud import bigquery_storage

mock_client = mock.create_autospec(bigquery_storage.BigQueryReadClient)
mock_client._transport = mock.Mock(spec=["channel"])
mock_client._transport = mock.Mock(spec=["channel", "close"])
mock_client._transport.grpc_channel = mock.Mock(spec=["close"])
return mock_client

Expand Down Expand Up @@ -176,7 +176,7 @@ def test_close_closes_all_created_bigquery_clients(self):
connection.close()

self.assertTrue(client.close.called)
self.assertTrue(bqstorage_client._transport.grpc_channel.close.called)
self.assertTrue(bqstorage_client._transport.close.called)

def test_close_does_not_close_bigquery_clients_passed_to_it(self):
pytest.importorskip("google.cloud.bigquery_storage")
Expand All @@ -187,7 +187,7 @@ def test_close_does_not_close_bigquery_clients_passed_to_it(self):
connection.close()

self.assertFalse(client.close.called)
self.assertFalse(bqstorage_client._transport.grpc_channel.close.called)
self.assertFalse(bqstorage_client._transport.close.called)

def test_close_closes_all_created_cursors(self):
connection = self._make_one(client=self._mock_client())
Expand Down
7 changes: 2 additions & 5 deletions packages/google-cloud-bigquery/tests/unit/test_magics.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@

@pytest.fixture()
def use_local_magics_context(monkeypatch):
if magics is not None:
if magics is not None: # pragma: NO COVER
local_context = magics.Context()
local_context._project = "unit-test-project"
mock_credentials = mock.create_autospec(
Expand Down Expand Up @@ -2195,13 +2195,10 @@ def test_bigquery_magic_create_dataset_fails(monkeypatch):


@pytest.mark.usefixtures("ipython_interactive")
def test_bigquery_magic_with_location(monkeypatch):
def test_bigquery_magic_with_location(monkeypatch, use_local_magics_context):
ip = IPython.get_ipython()
monkeypatch.setattr(bigquery, "bigquery_magics", None)
bigquery.load_ipython_extension(ip)
magics.context.credentials = mock.create_autospec(
google.auth.credentials.Credentials, instance=True
)

run_query_patch = mock.patch(
"google.cloud.bigquery.magics.magics._run_query", autospec=True
Expand Down
Loading
Loading