Closed
Bug 1180897
Opened 9 years ago
Closed 9 years ago
[mozillapulse] Make GenericConsumer able to take multiple exchanges
Categories
(Webtools :: Pulse, defect)
Tracking
(Not tracked)
RESOLVED
FIXED
People
(Reporter: adusca, Assigned: adusca)
References
Details
Attachments
(1 file, 2 obsolete files)
8.56 KB, patch | mcote: review+
It would be useful to have one listener that could listen to different exchanges at the same time.

Steps:
- make the 'exchange' parameter support a list in addition to a single string (https://hg.mozilla.org/automation/mozillapulse/file/f5dc2a54d170/mozillapulse/consumers.py#l34)
- then you'd just have to declare & bind to the list of exchanges here: https://hg.mozilla.org/automation/mozillapulse/file/f5dc2a54d170/mozillapulse/consumers.py#l146
Assignee | ||
Comment 1•9 years ago
Here the API for multiple exchanges is just passing a list of exchanges and a list of topics to GenericConsumer such that topic[i] corresponds to exchange[i]. This requires a user who wants to listen to two topics (topic1, topic2) on exchange1 and one topic (topic3) on exchange2 to pass ['exchange1', 'exchange1', 'exchange2'], ['topic1', 'topic2', 'topic3']. It is not pretty, but it was the best I could think of.

I tested this with the cases:
* ('exchange1', 'topic') - still working
* ('exchange1', ['topic1', 'topic2']) - still working
* (['exchange1', 'exchange2'], ['topic1', 'topic2']) - works now
* (['exchange1', 'exchange2'], ['topic1', 'topic2', 'topic3']) - raises an error, which I think is the right behaviour
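For readers following along, the pairing rule and the four tested cases can be sketched as a small standalone helper. This is only an illustration under assumptions: `pair_bindings` is a hypothetical name, the choice of `ValueError` is a guess (the comment only says "raises an error"), and the real GenericConsumer may normalise its arguments differently. It is written for Python 3, whereas the patch targets Python 2.

```python
def pair_bindings(exchange, topic):
    """Return a list of (exchange, topic) pairs.

    Hypothetical helper mirroring the four cases listed in the comment;
    it is not part of mozillapulse.
    """
    # Accept either a single topic string or a list of topics.
    topics = [topic] if isinstance(topic, str) else list(topic)

    if isinstance(exchange, str):
        # A single exchange is shared by every topic.
        return [(exchange, t) for t in topics]

    exchanges = list(exchange)
    if len(exchanges) != len(topics):
        # Assumed error type; the source only says an error is raised.
        raise ValueError(
            "got %d exchanges but %d topics; the lists must line up"
            % (len(exchanges), len(topics)))

    # topic[i] corresponds to exchange[i].
    return list(zip(exchanges, topics))
```

With this shape, listening to two topics on exchange1 and one on exchange2 means repeating exchange1 in the first list, exactly as described in the comment.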
Attachment #8631582 -
Flags: review?(mcote)
Comment 2•9 years ago
Comment on attachment 8631582
bug1180897.patch

Review of attachment 8631582:
-----------------------------------------------------------------

This looks great--but could you write a test for it as well?

::: mozillapulse/consumers.py
@@ +165,5 @@
> # Bind to the first key.
> consumer.queues[0].queue_bind()
>
> # Bind to any additional keys.
> + for i in xrange(1, len(self.topic)):

Trailing space. Also check out enumerate(), which is a nicer way to iterate over a list and generate an index at the same time.
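As a rough illustration of the enumerate() suggestion, the snippet below compares an index-based loop with the enumerate() form. The topic list is made-up sample data, and `range` stands in for Python 2's `xrange` so the sketch runs on Python 3; it is not the code from the patch.

```python
topics = ['topic1', 'topic2', 'topic3']  # sample data, not from the patch

# Index-based loop in the style of the reviewed line, skipping element 0.
by_index = []
for i in range(1, len(topics)):
    by_index.append((i, topics[i]))

# enumerate() yields (index, item) pairs directly; start=1 keeps the
# indexes aligned after slicing off the first element.
by_enumerate = []
for i, topic in enumerate(topics[1:], start=1):
    by_enumerate.append((i, topic))
```

Both loops visit the same (index, topic) pairs; the second avoids indexing back into the list by hand.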
Attachment #8631582 -
Flags: review?(mcote) → review-
Comment 3•9 years ago
(In reply to Alice Scarpa [:adusca] from comment #1)
> Here the API for multiple exchanges is just passing a list of exchanges and
> a list of topics to GenericConsumer such that topic[i] corresponds to
> exchange[i]. This requires a user that wants to 2 topics (topic1, topic2) on
> exchange1 and one topic (topic3) on exchange2 to pass ['exchange1',
> 'exchange1', 'exchange2'], ['topic1', 'topic2', 'topic3']. It is not pretty,
> but it was the best I could think of.

Yeah it's a little gross, but as you say I don't think there's any better way to do it without changing the constructor's signature. So it works for me.
Assignee | ||
Comment 4•9 years ago
Since I was only using the indexes to zip the lists I believe it might be better to just use zip() instead of using enumerate().
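A minimal sketch of the zip() approach, assuming two parallel lists like the ones described in comment 1 (the variable names here are illustrative, not taken from the patch):

```python
exchanges = ['exchange1', 'exchange1', 'exchange2']
topics = ['topic1', 'topic2', 'topic3']

# zip() walks both lists in lockstep, so no index variable is needed.
bindings = []
for exchange, topic in zip(exchanges, topics):
    bindings.append((exchange, topic))

# Caveat: zip() silently stops at the shorter input, so a length check
# has to happen separately if mismatched lists should raise an error.
truncated = list(zip(['exchange1', 'exchange2'],
                     ['topic1', 'topic2', 'topic3']))
```

Because of that truncation behaviour, zip() on its own would not produce the error reported for the mismatched case in comment 1; some explicit validation is still needed.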
Attachment #8631582 -
Attachment is obsolete: true
Assignee | ||
Updated•9 years ago
Attachment #8633801 -
Flags: review?(mcote)
Comment 5•9 years ago
Comment on attachment 8633801
bug1180897.patch

Review of attachment 8633801:
-----------------------------------------------------------------

With a fresh vagrant, the test run fails for me (see below). A second run passes, but this should work the first time.

$ python runtests.py
....Process ConsumerSubprocess-9:
Traceback (most recent call last):
  File "/usr/local/Cellar/python/2.7.10_2/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "runtests.py", line 44, in run
    consumer.listen()
  File "/Users/mcote/projects/mozillapulse/src/mozillapulse/mozillapulse/consumers.py", line 171, in listen
    exchange(self.connection).declare(passive=True)
  File "/Users/mcote/projects/mozillapulse/lib/python2.7/site-packages/kombu-3.0.26-py2.7.egg/kombu/entity.py", line 174, in declare
    nowait=nowait, passive=passive,
  File "/Users/mcote/projects/mozillapulse/lib/python2.7/site-packages/amqp-1.4.6-py2.7.egg/amqp/channel.py", line 620, in exchange_declare
    (40, 11), # Channel.exchange_declare_ok
  File "/Users/mcote/projects/mozillapulse/lib/python2.7/site-packages/amqp-1.4.6-py2.7.egg/amqp/abstract_channel.py", line 69, in wait
    return self.dispatch_method(method_sig, args, content)
  File "/Users/mcote/projects/mozillapulse/lib/python2.7/site-packages/amqp-1.4.6-py2.7.egg/amqp/abstract_channel.py", line 87, in dispatch_method
    return amqp_method(self, args)
  File "/Users/mcote/projects/mozillapulse/lib/python2.7/site-packages/amqp-1.4.6-py2.7.egg/amqp/channel.py", line 241, in _close
    reply_code, reply_text, (class_id, method_id), ChannelError,
NotFound: Exchange.declare: (404) NOT_FOUND - no exchange 'exchange/pulse/test' in vhost '/'
Process ConsumerSubprocess-10:
Traceback (most recent call last):
  File "/usr/local/Cellar/python/2.7.10_2/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "runtests.py", line 44, in run
    consumer.listen()
  File "/Users/mcote/projects/mozillapulse/src/mozillapulse/mozillapulse/consumers.py", line 171, in listen
    exchange(self.connection).declare(passive=True)
  File "/Users/mcote/projects/mozillapulse/lib/python2.7/site-packages/kombu-3.0.26-py2.7.egg/kombu/entity.py", line 174, in declare
    nowait=nowait, passive=passive,
  File "/Users/mcote/projects/mozillapulse/lib/python2.7/site-packages/amqp-1.4.6-py2.7.egg/amqp/channel.py", line 620, in exchange_declare
    (40, 11), # Channel.exchange_declare_ok
  File "/Users/mcote/projects/mozillapulse/lib/python2.7/site-packages/amqp-1.4.6-py2.7.egg/amqp/abstract_channel.py", line 69, in wait
    return self.dispatch_method(method_sig, args, content)
  File "/Users/mcote/projects/mozillapulse/lib/python2.7/site-packages/amqp-1.4.6-py2.7.egg/amqp/abstract_channel.py", line 87, in dispatch_method
    return amqp_method(self, args)
  File "/Users/mcote/projects/mozillapulse/lib/python2.7/site-packages/amqp-1.4.6-py2.7.egg/amqp/channel.py", line 241, in _close
    reply_code, reply_text, (class_id, method_id), ChannelError,
NotFound: Exchange.declare: (404) NOT_FOUND - no exchange 'exchange/pulse/test' in vhost '/'
FProcess ConsumerSubprocess-11:
Traceback (most recent call last):
  File "/usr/local/Cellar/python/2.7.10_2/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "runtests.py", line 44, in run
    consumer.listen()
  File "/Users/mcote/projects/mozillapulse/src/mozillapulse/mozillapulse/consumers.py", line 171, in listen
    exchange(self.connection).declare(passive=True)
  File "/Users/mcote/projects/mozillapulse/lib/python2.7/site-packages/kombu-3.0.26-py2.7.egg/kombu/entity.py", line 174, in declare
    nowait=nowait, passive=passive,
  File "/Users/mcote/projects/mozillapulse/lib/python2.7/site-packages/amqp-1.4.6-py2.7.egg/amqp/channel.py", line 620, in exchange_declare
    (40, 11), # Channel.exchange_declare_ok
  File "/Users/mcote/projects/mozillapulse/lib/python2.7/site-packages/amqp-1.4.6-py2.7.egg/amqp/abstract_channel.py", line 69, in wait
    return self.dispatch_method(method_sig, args, content)
  File "/Users/mcote/projects/mozillapulse/lib/python2.7/site-packages/amqp-1.4.6-py2.7.egg/amqp/abstract_channel.py", line 87, in dispatch_method
    return amqp_method(self, args)
  File "/Users/mcote/projects/mozillapulse/lib/python2.7/site-packages/amqp-1.4.6-py2.7.egg/amqp/channel.py", line 241, in _close
    reply_code, reply_text, (class_id, method_id), ChannelError,
NotFound: Exchange.declare: (404) NOT_FOUND - no exchange 'exchange/pulse/test' in vhost '/'
F......
======================================================================
FAIL: test_durable (__main__.TestMultipleExchanges)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "runtests.py", line 147, in test_durable
    self._get_verify_msg(msg)
  File "runtests.py", line 94, in _get_verify_msg
    self.fail('did not receive message from consumer process')
AssertionError: did not receive message from consumer process

======================================================================
FAIL: test_nondurable (__main__.TestMultipleExchanges)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "runtests.py", line 117, in test_nondurable
    self._get_verify_msg(msg)
  File "runtests.py", line 94, in _get_verify_msg
    self.fail('did not receive message from consumer process')
AssertionError: did not receive message from consumer process

----------------------------------------------------------------------
Ran 12 tests in 11.545s

FAILED (failures=2)

::: test/runtests.py
@@ +244,5 @@
> + msg.set_data('who', 'somedev@mozilla.com')
> + return msg
> +
> +
> +class TestMultipleExchangesAgain(PulseTestMixin, unittest.TestCase):

Hm this works but says to me that the testing architecture is not great for handling this. Maybe PulseTestMixin could be updated somehow so that we don't have to have two TestCase classes that are together verifying one thing, which is a bit weird.

Anyway this is fine for now (aside from the error above); we can revisit it in another bug later. I don't want to hold you up from your main task any longer. :)
Attachment #8633801 -
Flags: review?(mcote) → review-
Assignee | ||
Comment 6•9 years ago
After investigating this, I discovered that I get the 'fails on the first run, passes on every subsequent run' behaviour even when I run the tests without my patch, but the error I get on the first run without my patch [1] is different than the error I get on the first run with my patch (which is the same error posted in comment #5). Is this a bug with the tests, or am I doing something wrong?

[1]
(venv)test$ python runtests.py
E.......
======================================================================
ERROR: test_durable (__main__.TestBuild)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "runtests.py", line 126, in test_durable
    publisher.publish(msg)
  File "/home/adusca/mozillapulse/mozillapulse/publishers.py", line 62, in publish
    routing_key=message.routing_key)
  File "/home/adusca/pulse_actions/venv/local/lib/python2.7/site-packages/kombu-3.0.26-py2.7.egg/kombu/messaging.py", line 85, in __init__
    self.revive(self._channel)
  File "/home/adusca/pulse_actions/venv/local/lib/python2.7/site-packages/kombu-3.0.26-py2.7.egg/kombu/messaging.py", line 222, in revive
    self.declare()
  File "/home/adusca/pulse_actions/venv/local/lib/python2.7/site-packages/kombu-3.0.26-py2.7.egg/kombu/messaging.py", line 105, in declare
    self.exchange.declare()
  File "/home/adusca/pulse_actions/venv/local/lib/python2.7/site-packages/kombu-3.0.26-py2.7.egg/kombu/entity.py", line 171, in declare
    return self.channel.exchange_declare(
  File "/home/adusca/pulse_actions/venv/local/lib/python2.7/site-packages/kombu-3.0.26-py2.7.egg/kombu/abstract.py", line 115, in channel
    channel = self._channel = channel()
  File "/home/adusca/pulse_actions/venv/local/lib/python2.7/site-packages/kombu-3.0.26-py2.7.egg/kombu/utils/__init__.py", line 422, in __call__
    value = self.__value__ = self.__contract__()
  File "/home/adusca/pulse_actions/venv/local/lib/python2.7/site-packages/kombu-3.0.26-py2.7.egg/kombu/messaging.py", line 209, in <lambda>
    channel = ChannelPromise(lambda: connection.default_channel)
  File "/home/adusca/pulse_actions/venv/local/lib/python2.7/site-packages/kombu-3.0.26-py2.7.egg/kombu/connection.py", line 756, in default_channel
    self.connection
  File "/home/adusca/pulse_actions/venv/local/lib/python2.7/site-packages/kombu-3.0.26-py2.7.egg/kombu/connection.py", line 741, in connection
    self._connection = self._establish_connection()
  File "/home/adusca/pulse_actions/venv/local/lib/python2.7/site-packages/kombu-3.0.26-py2.7.egg/kombu/connection.py", line 696, in _establish_connection
    conn = self.transport.establish_connection()
  File "/home/adusca/pulse_actions/venv/local/lib/python2.7/site-packages/kombu-3.0.26-py2.7.egg/kombu/transport/pyamqp.py", line 116, in establish_connection
    conn = self.Connection(**opts)
  File "/home/adusca/pulse_actions/venv/local/lib/python2.7/site-packages/amqp-1.4.6-py2.7.egg/amqp/connection.py", line 165, in __init__
    self.transport = self.Transport(host, connect_timeout, ssl)
  File "/home/adusca/pulse_actions/venv/local/lib/python2.7/site-packages/amqp-1.4.6-py2.7.egg/amqp/connection.py", line 186, in Transport
    return create_transport(host, connect_timeout, ssl)
  File "/home/adusca/pulse_actions/venv/local/lib/python2.7/site-packages/amqp-1.4.6-py2.7.egg/amqp/transport.py", line 299, in create_transport
    return TCPTransport(host, connect_timeout)
  File "/home/adusca/pulse_actions/venv/local/lib/python2.7/site-packages/amqp-1.4.6-py2.7.egg/amqp/transport.py", line 95, in __init__
    raise socket.error(last_err)
error: timed out

----------------------------------------------------------------------
Ran 8 tests in 8.829s

FAILED (errors=1)
(venv)test$ python runtests.py
........
----------------------------------------------------------------------
Ran 8 tests in 0.948s

OK
(venv)test$
Flags: needinfo?(mcote)
Comment 7•9 years ago
Huh. I don't get that error. I'm using OS X 10.10.4 with Vagrant 1.7.3. Full log below, running tip:

(mozillapulse)retina-wireless:test mcote$ vagrant destroy
    default: Are you sure you want to destroy the 'default' VM? [y/N] y
==> default: Forcing shutdown of VM...
==> default: Destroying VM and associated drives...
==> default: Running cleanup tasks for 'puppet' provisioner...
(mozillapulse)retina-wireless:test mcote$ vagrant up
Bringing machine 'default' up with 'virtualbox' provider...
==> default: Importing base box 'trusty-cloud-i386'...
==> default: Matching MAC address for NAT networking...
==> default: Setting the name of the VM: test_default_1437015766015_1344
==> default: Clearing any previously set forwarded ports...
==> default: Clearing any previously set network interfaces...
==> default: Preparing network interfaces based on configuration...
    default: Adapter 1: nat
    default: Adapter 2: hostonly
==> default: Forwarding ports...
    default: 22 => 2222 (adapter 1)
==> default: Booting VM...
==> default: Waiting for machine to boot. This may take a few minutes...
    default: SSH address: 127.0.0.1:2222
    default: SSH username: vagrant
    default: SSH auth method: private key
    default: Warning: Connection timeout. Retrying...
    default:
    default: Vagrant insecure key detected. Vagrant will automatically replace
    default: this with a newly generated keypair for better security.
    default:
    default: Inserting generated public key within guest...
    default: Removing insecure key from the guest if it's present...
    default: Key inserted! Disconnecting and reconnecting using new SSH key...
==> default: Machine booted and ready!
==> default: Checking for guest additions in VM...
==> default: Configuring and enabling network interfaces...
==> default: Mounting shared folders...
    default: /vagrant => /Users/mcote/projects/mozillapulse/src/mozillapulse/test
    default: /tmp/vagrant-puppet/manifests-846018e2aa141a5eb79a64b4015fc5f3 => /Users/mcote/projects/mozillapulse/src/mozillapulse/test/puppet/manifests
==> default: Running provisioner: puppet...
==> default: Running Puppet with vagrant.pp...
==> default: stdin: is not a tty
==> default: Notice: Compiled catalog for vagrant-ubuntu-trusty-32.int.mrcote.info in environment production in 0.18 seconds
==> default: Notice: /Stage[main]/Init/Exec[update_apt]/returns: executed successfully
==> default: Notice: /Stage[main]/Rabbitmq/Package[rabbitmq-server]/ensure: ensure changed 'purged' to 'present'
==> default: Notice: /Stage[main]/Rabbitmq/Exec[create-rabbitmq-user-test]/returns: executed successfully
==> default: Notice: /Stage[main]/Rabbitmq/Exec[create-rabbitmq-user-code]/returns: executed successfully
==> default: Notice: /Stage[main]/Rabbitmq/Exec[grant-rabbitmq-permissions-code]/returns: executed successfully
==> default: Notice: /Stage[main]/Rabbitmq/Exec[grant-rabbitmq-permissions-test]/returns: executed successfully
==> default: Notice: /Stage[main]/Rabbitmq/Exec[create-rabbitmq-user-build]/returns: executed successfully
==> default: Notice: /Stage[main]/Rabbitmq/Exec[grant-rabbitmq-permissions-build]/returns: executed successfully
==> default: Notice: Finished catalog run in 26.34 seconds
(mozillapulse)retina-wireless:test mcote$ python runtests.py
........
----------------------------------------------------------------------
Ran 8 tests in 0.873s

OK
Flags: needinfo?(mcote)
Updated•9 years ago
Assignee: nobody → alicescarpa
Status: UNCONFIRMED → ASSIGNED
Ever confirmed: true
Comment 8•9 years ago
Okay I don't know why the tests on tip are currently failing for you, but I do know why your new tests fail. The consumer expects all its exchanges to already exist; they should have been created by the publisher. But the way the PulseTestMixin is structured means only one publisher is created (whatever is in the 'publisher' class variable), so the other exchange doesn't exist when the consumer is created, and it throws an exception when calling declare(). Now that I've thought about it, it's probably not too difficult to modify PulseTestMixin to accept multiple publishers. That would also allow you to combine your two TestCase classes into one. This assumes you can get the tests passing at all... I can try to help you debug that tomorrow.
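The failure mode described here can be modelled without a real broker. The sketch below is a toy, in-memory stand-in, not kombu or RabbitMQ: `ToyBroker`, `NotFound`, and `start_consumer` are hypothetical names, and 'exchange/pulse/other' is a made-up second exchange. It only illustrates why a passive declare fails when no publisher has created the exchange yet.

```python
class NotFound(Exception):
    """Stands in for the 404 NOT_FOUND channel error seen in the traceback."""


class ToyBroker:
    """In-memory model of a broker's exchange table (illustrative only)."""

    def __init__(self):
        self.exchanges = set()

    def declare(self, name, passive=False):
        if passive:
            # A passive declare only checks for existence; it never creates.
            if name not in self.exchanges:
                raise NotFound("no exchange %r" % name)
            return
        # A normal declare creates the exchange if it is missing.
        self.exchanges.add(name)


def start_consumer(broker, exchanges):
    """Mimic a consumer that expects every exchange to exist already."""
    for name in exchanges:
        broker.declare(name, passive=True)
    return True


broker = ToyBroker()
# Only one publisher has run, so only one exchange exists.
broker.declare('exchange/pulse/test')

wanted = ['exchange/pulse/test', 'exchange/pulse/other']  # second is made up
try:
    start_consumer(broker, wanted)
    first_attempt_ok = True
except NotFound:
    first_attempt_ok = False

# Once a second publisher declares its exchange, the consumer can start.
broker.declare('exchange/pulse/other')
second_attempt_ok = start_consumer(broker, wanted)
```

In this model, letting the test mixin create one publisher per exchange before the consumer starts is what makes the second attempt succeed, which matches the change suggested in the comment.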
Assignee | ||
Comment 9•9 years ago
The tests still don't pass on the first run on my machine, but now the error I get is the same one I get when running on tip, so it is probably some weirdness in my setup.
Attachment #8633801 -
Attachment is obsolete: true
Attachment #8635425 -
Flags: review?(mcote)
Comment 10•9 years ago
Comment on attachment 8635425
bug1180897.patch

Review of attachment 8635425:
-----------------------------------------------------------------

Works for me. Thanks!
Attachment #8635425 -
Flags: review?(mcote) → review+
Comment 11•9 years ago
http://hg.mozilla.org/automation/mozillapulse/rev/81738cad82c5
Status: ASSIGNED → RESOLVED
Closed: 9 years ago
Resolution: --- → FIXED