Skip to content

Commit 9b380b4

Browse files
committed
fix(cncf-kubernetes): use Configuration.socket_options instead of monkey-patching urllib3 (#68396)
The `enable_tcp_keepalive` config option in the Kubernetes provider relied on monkey-patching urllib3's `default_socket_options`. In urllib3 v2.x, the default value of the `socket_options` parameter of `HTTPConnection.__init__` is evaluated once, at import time, so changes made to `default_socket_options` after import are never picked up by new connections. The fix passes the socket options through the Kubernetes client's `Configuration.socket_options` field, which is properly threaded through ApiClient -> RESTClientObject -> urllib3.PoolManager. The socket options also include TCP_NODELAY, to preserve the default urllib3 behavior of disabling Nagle's algorithm. Closes: #68396
1 parent b91394a commit 9b380b4

3 files changed

Lines changed: 33 additions & 39 deletions

File tree

providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/kube_client.py

Lines changed: 19 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -51,26 +51,33 @@ def _disable_verify_ssl() -> None:
5151
_import_err = e
5252

5353

54-
def _enable_tcp_keepalive() -> None:
54+
def _enable_tcp_keepalive(configuration: Configuration) -> None:
5555
"""
56-
Enable TCP keepalive mechanism.
56+
Enable TCP keepalive mechanism on the provided Kubernetes client configuration.
5757
58-
This prevents urllib3 connection to hang indefinitely when idle connection
59-
is time-outed on services like cloud load balancers or firewalls.
58+
This prevents urllib3 connections from hanging indefinitely when an idle
59+
connection is timed out by services like cloud load balancers or firewalls.
6060
61-
See https://github.com/apache/airflow/pull/11406 for detailed explanation.
61+
Uses the ``socket_options`` field on the Kubernetes ``Configuration`` object,
62+
which is threaded through to the underlying urllib3 ``PoolManager`` and
63+
``HTTPConnection``, rather than monkey-patching urllib3 connection defaults
64+
(which no longer works with urllib3 v2.x).
65+
66+
See https://github.com/apache/airflow/pull/11406 for the original discussion
67+
and https://github.com/apache/airflow/issues/68396 for the urllib3 v2 fix.
6268
6369
Please ping @michalmisiewicz or @dimberman in the PR if you want to modify this function.
6470
"""
6571
import socket
6672

67-
from urllib3.connection import HTTPConnection, HTTPSConnection
68-
6973
tcp_keep_idle = conf.getint("kubernetes_executor", "tcp_keep_idle")
7074
tcp_keep_intvl = conf.getint("kubernetes_executor", "tcp_keep_intvl")
7175
tcp_keep_cnt = conf.getint("kubernetes_executor", "tcp_keep_cnt")
7276

73-
socket_options = [(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)]
77+
socket_options: list[tuple[int, int, int | bytes]] = [
78+
(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1),
79+
(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1),
80+
]
7481

7582
if hasattr(socket, "TCP_KEEPIDLE"):
7683
socket_options.append((socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, tcp_keep_idle))
@@ -87,17 +94,7 @@ def _enable_tcp_keepalive() -> None:
8794
else:
8895
log.debug("Unable to set TCP_KEEPCNT on this platform")
8996

90-
# Cast both the default options and our socket options
91-
socket_options_cast: list[tuple[int, int, int | bytes]] = [
92-
(level, opt, val) for level, opt, val in socket_options
93-
]
94-
default_options_cast: list[tuple[int, int, int | bytes]] = [
95-
(level, opt, val) for level, opt, val in HTTPSConnection.default_socket_options
96-
]
97-
98-
# Then use the cast versions for both HTTPS and HTTP
99-
HTTPSConnection.default_socket_options = default_options_cast + socket_options_cast
100-
HTTPConnection.default_socket_options = default_options_cast + socket_options_cast
97+
configuration.socket_options = socket_options
10198

10299

103100
def get_kube_client(
@@ -118,10 +115,10 @@ def get_kube_client(
118115
if not has_kubernetes:
119116
raise _import_err
120117

121-
if conf.getboolean("kubernetes_executor", "enable_tcp_keepalive"):
122-
_enable_tcp_keepalive()
123-
124118
configuration = _get_default_configuration()
119+
120+
if conf.getboolean("kubernetes_executor", "enable_tcp_keepalive"):
121+
_enable_tcp_keepalive(configuration)
125122
api_client_retry_configuration = conf.getjson(
126123
"kubernetes_executor", "api_client_retry_configuration", fallback={}
127124
)

providers/cncf/kubernetes/tests/unit/cncf/kubernetes/test_client.py

Lines changed: 13 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,10 @@
1616
# under the License.
1717
from __future__ import annotations
1818

19-
import socket
2019
from unittest import mock
2120

2221
import pytest
2322
from kubernetes.client import Configuration
24-
from urllib3.connection import HTTPConnection, HTTPSConnection
25-
2623
from airflow.providers.cncf.kubernetes.kube_client import (
2724
_disable_verify_ssl,
2825
_enable_tcp_keepalive,
@@ -65,19 +62,19 @@ def test_load_config_ssl_ca_cert(self, conf, config):
6562

6663
@pytest.mark.platform("linux")
6764
def test_enable_tcp_keepalive(self):
68-
socket_options = [
69-
(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1),
70-
(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 120),
71-
(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 30),
72-
(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, 6),
73-
]
74-
expected_http_connection_options = HTTPConnection.default_socket_options + socket_options
75-
expected_https_connection_options = HTTPSConnection.default_socket_options + socket_options
76-
77-
_enable_tcp_keepalive()
78-
79-
assert HTTPConnection.default_socket_options == expected_http_connection_options
80-
assert HTTPSConnection.default_socket_options == expected_https_connection_options
65+
import socket
66+
67+
configuration = Configuration()
68+
assert configuration.socket_options is None
69+
70+
_enable_tcp_keepalive(configuration)
71+
72+
assert configuration.socket_options is not None
73+
assert configuration.socket_options[0] == (socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
74+
assert configuration.socket_options[1] == (socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
75+
assert configuration.socket_options[2] == (socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 120)
76+
assert configuration.socket_options[3] == (socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 30)
77+
assert configuration.socket_options[4] == (socket.IPPROTO_TCP, socket.TCP_KEEPCNT, 6)
8178

8279
def test_disable_verify_ssl(self):
8380
configuration = Configuration()

providers/google/src/airflow/providers/google/cloud/hooks/kubernetes_engine.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ def get_conn(self) -> client.ApiClient:
7676
configuration = self._get_config()
7777
configuration.refresh_api_key_hook = self._refresh_api_key_hook
7878
if self.enable_tcp_keepalive:
79-
_enable_tcp_keepalive()
79+
_enable_tcp_keepalive(configuration)
8080
return client.ApiClient(configuration)
8181

8282
def _refresh_api_key_hook(self, configuration: client.configuration.Configuration):

0 commit comments

Comments
 (0)