mozillapulse consumers die with TimeoutError on Python 3.3+

RESOLVED WONTFIX

Status

defect
RESOLVED WONTFIX
a year ago
16 days ago

People

(Reporter: mars, Unassigned)

Tracking

Trunk

Details

I wrote a Pulse message consumer following the project README and connected it to a fairly low-traffic queue during testing. After about seven minutes (22:09:48 to 22:17:00 in the log below) the consumer died with "TimeoutError: [Errno 110] Connection timed out".

The traceback below indicates that the error comes from ssl.py.  TimeoutError was introduced in Python 3.3 (https://docs.python.org/3/whatsnew/3.3.html#pep-3151-reworking-the-os-and-io-exception-hierarchy).  I am running Python 3.6.4.

Looking at the mozillapulse.consumers.GenericConsumer code, the consumer catches the socket.timeout exception and restarts the event loop (https://github.com/mozilla-services/mozillapulse/blob/4ceb179fbb33641a580ec38cc5c9fb3bf21a4d99/mozillapulse/consumers.py#L195). However, ssl.py is raising the TimeoutError added in Python 3.3, not socket.timeout. On this Python version the two are separate OSError subclasses, so a handler for socket.timeout does not catch TimeoutError. The GenericConsumer event loop does not handle the new exception, so my program dies. (This may not have been caught in testing because the mozillapulse docs say testing is done without SSL and with an event frequency that doesn't trigger the timeout code.)
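To illustrate the kind of handling I mean, here is a minimal sketch of one loop iteration that catches both exception types. It is not the mozillapulse code: drain_once and its return values are hypothetical names, and drain_events stands in for any callable that accepts a timeout keyword, such as kombu's Connection.drain_events. The sketch assumes that an exception carrying errno ETIMEDOUT means the connection is gone and needs to be re-established, while a timeout without an errno just means no message arrived in time.

```python
import errno
import socket


def drain_once(drain_events, timeout):
    """Run one iteration of a consumer loop (hypothetical helper).

    Returns "event" if drain_events returned normally, "idle" if the read
    simply timed out, and "reconnect" if the OS reported ETIMEDOUT.
    """
    try:
        drain_events(timeout=timeout)
    except (socket.timeout, TimeoutError) as exc:
        # Listing both names works whether or not they are the same class
        # on the running Python version.
        if exc.errno == errno.ETIMEDOUT:
            # Assumption: the underlying connection is dead, so retrying
            # on the same socket would not help.
            return "reconnect"
        # An ordinary read timeout carries no errno: nothing arrived.
        return "idle"
    return "event"
```

A caller would loop on drain_once, continuing on "event" and "idle" and rebuilding the connection on "reconnect". Whether reconnecting is the right reaction for Pulse is my assumption, not something the mozillapulse docs state.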


The full traceback:

22:09:48 worker.1   |  INFO:committelemetry.pulse:Listening to Pulse messages for exchange/hgpushes/v2 with topic integration/mozilla-inbound
22:09:48 worker.1   |  INFO:committelemetry.worker:Running the Pulse worker
22:09:48 worker.1   |  DEBUG:amqp:Start from server, version: 0.9, properties: {'capabilities': {'publisher_confirms': True, 'exchange_exchange_bindings': True, 'basic.nack': True, 'consumer_cancel_notify': True, 'connection.blocked': True, 'consumer_priorities': True, 'authentication_failure_close': True, 'per_consumer_qos': True}, 'cluster_name': 'orange-antelope', 'copyright': 'Copyright (C) 2007-2015 Pivotal Software, Inc.', 'information': 'Licensed under the MPL.  See http://www.rabbitmq.com/', 'platform': 'Erlang/OTP', 'product': 'RabbitMQ', 'version': '3.5.7'}, mechanisms: [b'AMQPLAIN', b'PLAIN'], locales: ['en_US']
22:09:48 worker.1   |  DEBUG:amqp:using channel_id: 1
22:09:48 worker.1   |  DEBUG:amqp:Channel open
22:17:00 worker.1   |  Traceback (most recent call last):
22:17:00 worker.1   |    File "/home/mars/.pyenv/versions/commit-telemetry-service/bin/flask", line 11, in <module>
22:17:00 worker.1   |      sys.exit(main())
22:17:00 worker.1   |    File "/home/mars/.pyenv/versions/commit-telemetry-service/lib/python3.6/site-packages/flask/cli.py", line 513, in main
22:17:00 worker.1   |      cli.main(args=args, prog_name=name)
22:17:00 worker.1   |    File "/home/mars/.pyenv/versions/commit-telemetry-service/lib/python3.6/site-packages/flask/cli.py", line 380, in main
22:17:00 worker.1   |      return AppGroup.main(self, *args, **kwargs)
22:17:00 worker.1   |    File "/home/mars/.pyenv/versions/commit-telemetry-service/lib/python3.6/site-packages/click/core.py", line 697, in main
22:17:00 worker.1   |      rv = self.invoke(ctx)
22:17:00 worker.1   |    File "/home/mars/.pyenv/versions/commit-telemetry-service/lib/python3.6/site-packages/click/core.py", line 1066, in invoke
22:17:00 worker.1   |      return _process_result(sub_ctx.command.invoke(sub_ctx))
22:17:00 worker.1   |    File "/home/mars/.pyenv/versions/commit-telemetry-service/lib/python3.6/site-packages/click/core.py", line 895, in invoke
22:17:00 worker.1   |      return ctx.invoke(self.callback, **ctx.params)
22:17:00 worker.1   |    File "/home/mars/.pyenv/versions/commit-telemetry-service/lib/python3.6/site-packages/click/core.py", line 535, in invoke
22:17:00 worker.1   |      return callback(*args, **kwargs)
22:17:00 worker.1   |    File "/home/mars/.pyenv/versions/commit-telemetry-service/lib/python3.6/site-packages/click/decorators.py", line 17, in new_func
22:17:00 worker.1   |      return f(get_current_context(), *args, **kwargs)
22:17:00 worker.1   |    File "/home/mars/.pyenv/versions/commit-telemetry-service/lib/python3.6/site-packages/flask/cli.py", line 257, in decorator
22:17:00 worker.1   |      return __ctx.invoke(f, *args, **kwargs)
22:17:00 worker.1   |    File "/home/mars/.pyenv/versions/commit-telemetry-service/lib/python3.6/site-packages/click/core.py", line 535, in invoke
22:17:00 worker.1   |      return callback(*args, **kwargs)
22:17:00 worker.1   |    File "/home/mars/work/commit-telemetry-service/worker.py", line 11, in run_pulse_worker
22:17:00 worker.1   |      run()
22:17:00 worker.1   |    File "/home/mars/work/commit-telemetry-service/committelemetry/worker.py", line 55, in run
22:17:00 worker.1   |    File "/home/mars/.pyenv/versions/commit-telemetry-service/lib/python3.6/site-packages/mozillapulse/consumers.py", line 151, in listen
22:17:00 worker.1   |      self._drain_events_loop()
22:17:00 worker.1   |    File "/home/mars/.pyenv/versions/commit-telemetry-service/lib/python3.6/site-packages/mozillapulse/consumers.py", line 198, in _drain_events_loop
22:17:00 worker.1   |      self.connection.drain_events(timeout=self.timeout)
22:17:00 worker.1   |    File "/home/mars/.pyenv/versions/commit-telemetry-service/lib/python3.6/site-packages/kombu/connection.py", line 301, in drain_events
22:17:00 worker.1   |      return self.transport.drain_events(self.connection, **kwargs)
22:17:00 worker.1   |    File "/home/mars/.pyenv/versions/commit-telemetry-service/lib/python3.6/site-packages/kombu/transport/pyamqp.py", line 103, in drain_events
22:17:00 worker.1   |      return connection.drain_events(**kwargs)
22:17:00 worker.1   |    File "/home/mars/.pyenv/versions/commit-telemetry-service/lib/python3.6/site-packages/amqp/connection.py", line 471, in drain_events
22:17:00 worker.1   |      while not self.blocking_read(timeout):
22:17:00 worker.1   |    File "/home/mars/.pyenv/versions/commit-telemetry-service/lib/python3.6/site-packages/amqp/connection.py", line 476, in blocking_read
22:17:00 worker.1   |      frame = self.transport.read_frame()
22:17:00 worker.1   |    File "/home/mars/.pyenv/versions/commit-telemetry-service/lib/python3.6/site-packages/amqp/transport.py", line 226, in read_frame
22:17:00 worker.1   |      frame_header = read(7, True)
22:17:00 worker.1   |    File "/home/mars/.pyenv/versions/commit-telemetry-service/lib/python3.6/site-packages/amqp/transport.py", line 346, in _read
22:17:00 worker.1   |      s = recv(n - len(rbuf))  # see note above
22:17:00 worker.1   |    File "/home/mars/.pyenv/versions/3.6.4/lib/python3.6/ssl.py", line 871, in read
22:17:00 worker.1   |      return self._sslobj.read(len, buffer)
22:17:00 worker.1   |    File "/home/mars/.pyenv/versions/3.6.4/lib/python3.6/ssl.py", line 633, in read
22:17:00 worker.1   |      v = self._sslobj.read(len)
22:17:00 worker.1   |  TimeoutError: [Errno 110] Connection timed out
Summary: Pulse consumers die with TimeoutError on Python 3.3+ → mozillapulse consumers die with TimeoutError on Python 3.3+

We recommend connecting to Pulse with an AMQP client directly rather than through Pulse-specific wrappers.

Status: NEW → RESOLVED
Last Resolved: 16 days ago
Resolution: --- → WONTFIX