ValueError: Invalid file descriptor: -1 when building KafkaAdminClient #177

Closed
etripier opened this issue Apr 5, 2024 · 7 comments
Labels: bug (Something isn't working)

etripier commented Apr 5, 2024

  File "/usr/local/lib/python3.11/site-packages/kafka/admin/client.py", line 369, in _send_request_to_node
    self._client.poll()
  File "/usr/local/lib/python3.11/site-packages/kafka/client_async.py", line 594, in poll
    self._poll(timeout / 1000)
  File "/usr/local/lib/python3.11/site-packages/kafka/client_async.py", line 649, in _poll
    self._selector.modify(
  File "/usr/local/lib/python3.11/selectors.py", line 377, in modify
    key = self._fd_to_key[self._fileobj_lookup(fileobj)]
                          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/selectors.py", line 225, in _fileobj_lookup
    return _fileobj_to_fd(fileobj)
           ^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/selectors.py", line 42, in _fileobj_to_fd
    raise ValueError("Invalid file descriptor: {}".format(fd))
ValueError: Invalid file descriptor: -1

We started seeing this error after upgrading to kafka-python-ng 2.1.0. We think it's caused by https://github.com/wbarnha/kafka-python-ng/pull/156/files. We'll put a repro case together later if we can, but in the meantime, do you have an idea of what could be going wrong?

wbarnha (Collaborator) commented Apr 6, 2024

I'm surprised this wasn't caught by the integration tests. If you can put together a reproducible example for me, that would be appreciated!

wbarnha added the bug label Apr 6, 2024

dingxiong commented Apr 6, 2024

Hi @wbarnha, I work with @etripier at the same company.
I think the PR mentioned above broke SSL connections.

Below is how I reproduced the issue in a Debian virtual machine:

> cat /etc/os-release
PRETTY_NAME="Debian GNU/Linux 11 (bullseye)"

First, generate the CA and the server-side certificate:

mkdir -p /tmp/test
cd /tmp/test

# generate private key `ca-key` and CA root `ca-cert`
openssl req -new -x509 -keyout ca-key -out ca-cert -days 365 -passout pass:1234 -subj /C=CN/ST=GuangDong/L=Guangzhou/O="Localhost Server"/CN=not-important

# generate server private key `server.key` and certificate signing request `server.csr`
openssl req -new -nodes -keyout server.key -out server.csr -days 365 -passout pass:1234 -subj /CN=localhost

# sign the server certificate with the CA
openssl x509 -req -in server.csr -CA ca-cert -CAkey ca-key -CAcreateserial -out server.pem -passin pass:1234

# combine key and certificate.
touch server.keystore.pem
cat server.key >> server.keystore.pem
cat server.pem >> server.keystore.pem

Then, add the following settings to /usr/local/kafka/config/server.properties:

listeners=PLAINTEXT://:9092,SSL://:9093
advertised.listeners=PLAINTEXT://localhost:9092,SSL://localhost:9093
listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL
ssl.keystore.type=PEM
ssl.keystore.location=/tmp/test/server.keystore.pem

and restart Kafka: sudo systemctl restart kafka

Now, test the connection.

With kafka-python-ng==2.0.3

In [1]: import kafka

In [2]: c = kafka.KafkaAdminClient(client_id='test', bootstrap_servers='localhost:9093', security_protocol='SSL', ssl_cafile='/tmp/test/ca-cert')
[2024-04-06 17:25:00,829] INFO in conn: <BrokerConnection node_id=bootstrap-0 host=localhost:9093 <connecting> [IPv4 ('127.0.0.1', 9093)]>: connecting to localhost:9093 [('127.0.0.1', 9093) IPv4]
[2024-04-06 17:25:00,829] INFO in conn: Probing node bootstrap-0 broker version
[2024-04-06 17:25:00,830] INFO in conn: <BrokerConnection node_id=bootstrap-0 host=localhost:9093 <handshake> [IPv4 ('127.0.0.1', 9093)]>: Loading SSL CA from /tmp/test/ca-cert
[2024-04-06 17:25:00,858] INFO in conn: <BrokerConnection node_id=bootstrap-0 host=localhost:9093 <handshake> [IPv4 ('127.0.0.1', 9093)]>: Connection complete.
[2024-04-06 17:25:00,961] INFO in conn: Broker version identified as 2.6.0
[2024-04-06 17:25:00,962] INFO in conn: Set configuration api_version=(2, 6, 0) to skip auto check_version requests on startup
[2024-04-06 17:25:00,962] INFO in conn: Probing node bootstrap-0 broker version
[2024-04-06 17:25:01,064] INFO in conn: Broker version identified as 2.6.0
[2024-04-06 17:25:01,064] INFO in conn: Set configuration api_version=(2, 6, 0) to skip auto check_version requests on startup
[2024-04-06 17:25:01,067] INFO in conn: <BrokerConnection node_id=0 host=localhost:9093 <connecting> [IPv4 ('127.0.0.1', 9093)]>: connecting to localhost:9093 [('127.0.0.1', 9093) IPv4]
[2024-04-06 17:25:01,067] INFO in conn: Probing node 0 broker version
[2024-04-06 17:25:01,068] INFO in conn: <BrokerConnection node_id=0 host=localhost:9093 <handshake> [IPv4 ('127.0.0.1', 9093)]>: Loading SSL CA from /tmp/test/ca-cert
[2024-04-06 17:25:01,082] INFO in conn: <BrokerConnection node_id=0 host=localhost:9093 <handshake> [IPv4 ('127.0.0.1', 9093)]>: Connection complete.
[2024-04-06 17:25:01,083] INFO in conn: <BrokerConnection node_id=bootstrap-0 host=localhost:9093 <connected> [IPv4 ('127.0.0.1', 9093)]>: Closing connection.
[2024-04-06 17:25:01,185] INFO in conn: Broker version identified as 2.6.0
[2024-04-06 17:25:01,185] INFO in conn: Set configuration api_version=(2, 6, 0) to skip auto check_version requests on startup

With kafka-python-ng==2.2.1

In [5]: import kafka
In [6]: c = kafka.KafkaAdminClient(client_id='test', bootstrap_servers='localhost:9093', security_protocol='SSL', ssl_cafile='/tmp/test/ca-cert')
[2024-04-06 17:24:17,400] INFO in conn: <BrokerConnection node_id=bootstrap-0 host=localhost:9093 <connecting> [IPv4 ('127.0.0.1', 9093)]>: connecting to localhost:9093 [('127.0.0.1', 9093) IPv4]
[2024-04-06 17:24:17,400] INFO in conn: Probing node bootstrap-0 broker version
[2024-04-06 17:24:17,401] INFO in conn: <BrokerConnection node_id=bootstrap-0 host=localhost:9093 <handshake> [IPv4 ('127.0.0.1', 9093)]>: Loading SSL CA from /tmp/test/ca-cert
[2024-04-06 17:24:17,429] INFO in conn: <BrokerConnection node_id=bootstrap-0 host=localhost:9093 <handshake> [IPv4 ('127.0.0.1', 9093)]>: Connection complete.
[2024-04-06 17:24:17,531] INFO in conn: Broker version identified as 2.6.0
[2024-04-06 17:24:17,531] INFO in conn: Set configuration api_version=(2, 6, 0) to skip auto check_version requests on startup
[2024-04-06 17:24:17,531] INFO in conn: Probing node bootstrap-0 broker version
[2024-04-06 17:24:17,633] INFO in conn: Broker version identified as 2.6.0
[2024-04-06 17:24:17,633] INFO in conn: Set configuration api_version=(2, 6, 0) to skip auto check_version requests on startup
---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
Cell In[6], line 1
----> 1 c = kafka.KafkaAdminClient(client_id='test', bootstrap_servers='localhost:9093', security_protocol='SSL', ssl_cafile='/tmp/test/ca-cert')

File ~/.pyenv/versions/3.11.3/lib/python3.11/site-packages/kafka/admin/client.py:219, in KafkaAdminClient.__init__(self, **configs)
    216     self.config['api_version'] = self._client.config['api_version']
    218 self._closed = False
--> 219 self._refresh_controller_id()
    220 log.debug("KafkaAdminClient started.")

File ~/.pyenv/versions/3.11.3/lib/python3.11/site-packages/kafka/admin/client.py:272, in KafkaAdminClient._refresh_controller_id(self)
    270 if 1 <= version <= 6:
    271     request = MetadataRequest[version]()
--> 272     future = self._send_request_to_node(self._client.least_loaded_node(), request)
    274     self._wait_for_futures([future])
    276     response = future.value

File ~/.pyenv/versions/3.11.3/lib/python3.11/site-packages/kafka/admin/client.py:369, in KafkaAdminClient._send_request_to_node(self, node_id, request, wakeup)
    356 """Send a Kafka protocol message to a specific broker.
    357
    358 Returns a future that may be polled for status and results.
   (...)
    364 :exception: The exception if the message could not be sent.
    365 """
    366 while not self._client.ready(node_id):
    367     # poll until the connection to broker is ready, otherwise send()
    368     # will fail with NodeNotReadyError
--> 369     self._client.poll()
    370 return self._client.send(node_id, request, wakeup)

File ~/.pyenv/versions/3.11.3/lib/python3.11/site-packages/kafka/client_async.py:594, in KafkaClient.poll(self, timeout_ms, future)
    591             timeout = min(timeout, self.config['retry_backoff_ms'])
    592         timeout = max(0, timeout)  # avoid negative timeouts
--> 594     self._poll(timeout / 1000)
    596 # called without the lock to avoid deadlock potential
    597 # if handlers need to acquire locks
    598 responses.extend(self._fire_pending_completed_requests())

File ~/.pyenv/versions/3.11.3/lib/python3.11/site-packages/kafka/client_async.py:649, in KafkaClient._poll(self, timeout)
    645 if conn.send_pending_requests_v2():
    646     # If send is complete, we dont need to track write readiness
    647     # for this socket anymore
    648     if key.events ^ selectors.EVENT_WRITE:
--> 649         self._selector.modify(
    650             key.fileobj,
    651             key.events ^ selectors.EVENT_WRITE,
    652             key.data)
    653     else:
    654         self._selector.unregister(key.fileobj)

File ~/.pyenv/versions/3.11.3/lib/python3.11/selectors.py:377, in _PollLikeSelector.modify(self, fileobj, events, data)
    375 def modify(self, fileobj, events, data=None):
    376     try:
--> 377         key = self._fd_to_key[self._fileobj_lookup(fileobj)]
    378     except KeyError:
    379         raise KeyError(f"{fileobj!r} is not registered") from None

File ~/.pyenv/versions/3.11.3/lib/python3.11/selectors.py:225, in _BaseSelectorImpl._fileobj_lookup(self, fileobj)
    216 """Return a file descriptor from a file object.
    217
    218 This wraps _fileobj_to_fd() to do an exhaustive search in case
   (...)
    222 used by _SelectorMapping.
    223 """
    224 try:
--> 225     return _fileobj_to_fd(fileobj)
    226 except ValueError:
    227     # Do an exhaustive search.
    228     for key in self._fd_to_key.values():

File ~/.pyenv/versions/3.11.3/lib/python3.11/selectors.py:42, in _fileobj_to_fd(fileobj)
     39         raise ValueError("Invalid file object: "
     40                          "{!r}".format(fileobj)) from None
     41 if fd < 0:
---> 42     raise ValueError("Invalid file descriptor: {}".format(fd))
     43 return fd

ValueError: Invalid file descriptor: -1


dingxiong commented Apr 6, 2024

Hi @wbarnha
When SSL is turned on, the bootstrap socket is wrapped in an SSLSocket (see code), which detaches the underlying socket. The original socket object is therefore marked as closed and its fileno() returns -1, but the file descriptor itself is not closed: the new SSLSocket takes ownership of it.
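A minimal standard-library sketch of the detach behavior described above (an illustration, not kafka-python-ng code). It assumes an unconnected TCP socket as a stand-in for the bootstrap socket and uses "localhost" only as a placeholder hostname; because the socket is never connected, no TLS handshake takes place.

```python
import socket
import ssl

# An unconnected TCP socket stands in for the bootstrap socket.
raw_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
original_fd = raw_sock.fileno()

# wrap_socket() builds an SSLSocket on the same descriptor and detaches
# raw_sock. No handshake runs because the socket is not connected;
# "localhost" is only a placeholder hostname.
context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
ssl_sock = context.wrap_socket(raw_sock, server_hostname="localhost",
                               do_handshake_on_connect=False)
wrapped_fd = ssl_sock.fileno()

print("raw socket fd:", raw_sock.fileno())   # detached, so -1
print("SSL socket fd:", wrapped_fd)
print("same descriptor:", wrapped_fd == original_fd)

ssl_sock.close()  # the SSLSocket now owns (and closes) the descriptor
```

Any code that still holds a reference to the original socket object, such as a selector key registered before the wrap, now sees a file object whose fileno() is -1 even though the descriptor is alive.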

I added some logging and found that, in this case, key.fileobj reports a file descriptor of -1 while key.fd does not. Quite a few places have the following logic:

try:
    self._selector.register(sock, selectors.EVENT_WRITE, conn)
except KeyError:
    self._selector.modify(sock, selectors.EVENT_WRITE, conn)

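This logic breaks when key.fileobj no longer reports the same file descriptor as key.fd: after the KeyError, modify() only updates the events and data of the existing key, so the key keeps pointing at the detached socket. I presume the following logic is better: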

try:
    self._selector.register(sock, selectors.EVENT_WRITE, conn)
except KeyError:
    self._selector.unregister(sock)
    self._selector.register(sock, selectors.EVENT_WRITE, conn)
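The difference between the two fallbacks can be reproduced with the standard library alone. The sketch below is an illustration under stated assumptions, not kafka-python-ng code: it registers a plain socket with a selector, wraps it in an SSLSocket (detaching the plain socket), then registers the wrapped socket using each fallback. The helpers wrap, register_with_modify, register_with_reregister and demo are hypothetical names chosen for this example.

```python
import selectors
import socket
import ssl


def wrap(raw_sock):
    """Hypothetical helper: wrap an unconnected socket (detaches raw_sock)."""
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    return context.wrap_socket(raw_sock, server_hostname="localhost",
                               do_handshake_on_connect=False)


def register_with_modify(selector, sock, events, data):
    # Current fallback: modify() keeps the old key's fileobj.
    try:
        selector.register(sock, events, data)
    except KeyError:
        selector.modify(sock, events, data)


def register_with_reregister(selector, sock, events, data):
    # Proposed fallback: drop the stale key, then register the new object.
    try:
        selector.register(sock, events, data)
    except KeyError:
        selector.unregister(sock)
        selector.register(sock, events, data)


def demo(register_fn):
    """Return (key.fileobj.fileno(), key.fd) after re-registering."""
    selector = selectors.DefaultSelector()
    raw_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    selector.register(raw_sock, selectors.EVENT_WRITE, "conn")
    ssl_sock = wrap(raw_sock)  # raw_sock.fileno() is now -1
    register_fn(selector, ssl_sock, selectors.EVENT_WRITE, "conn")
    key = selector.get_key(ssl_sock)
    result = (key.fileobj.fileno(), key.fd)
    selector.unregister(ssl_sock)
    selector.close()
    ssl_sock.close()
    return result


print("modify fallback:", demo(register_with_modify))
print("unregister/register fallback:", demo(register_with_reregister))
```

With the modify() fallback the first value should be -1 (the detached socket) while key.fd is a valid descriptor, which is the mismatch described above; with the unregister()/register() fallback both values match.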

BTW, with the change below, kafka.KafkaAdminClient works correctly:

diff --git a/kafka/client_async.py b/kafka/client_async.py
index b46b879..b285c0c 100644
--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -266,7 +266,8 @@ class KafkaClient:
                 try:
                     self._selector.register(sock, selectors.EVENT_WRITE, conn)
                 except KeyError:
-                    self._selector.modify(sock, selectors.EVENT_WRITE, conn)
+                    self._selector.unregister(sock)
+                    self._selector.register(sock, selectors.EVENT_WRITE, conn)

                 if self.cluster.is_bootstrap(node_id):
                     self._last_bootstrap = time.time()

Also, I see there is a gen_ssl_resources function, but it is not used in any test. Am I right that the tests never cover SSL connections?

I am happy to help. I can raise a PR to fix this issue and add integration tests for SSL connections as well.

wbarnha (Collaborator) commented Apr 6, 2024

Hey, thanks for looking into this further! If you can open up a PR with the necessary changes (and tests hopefully), I'll get it merged in and released ASAP! Sorry for the inconvenience!

I'm glad we're catching these mistakes before they're merged into kafka-python. Imagine how many tickets would be opened!

dingxiong commented

Yeah, I'm happy to help. But now it's the weekend. I can do it next week if you can wait.

dingxiong commented

Hi @wbarnha, this is my PR: #178.
Please take a look.

wbarnha (Collaborator) commented Apr 10, 2024

Closed in #178.

3 participants