Closed Bug 1180897 Opened 9 years ago Closed 9 years ago

[mozillapulse] Make GenericConsumer able to take multiple exchanges

Categories

(Webtools :: Pulse, defect)

defect
Not set
normal

Tracking

(Not tracked)

RESOLVED FIXED

People

(Reporter: adusca, Assigned: adusca)

References

Details

Attachments

(1 file, 2 obsolete files)

It would be useful to have one listener that could listen to different exchanges at the same time.

Steps:

-  make the 'exchange' parameter support a list in addition to a single string (https://hg.mozilla.org/automation/mozillapulse/file/f5dc2a54d170/mozillapulse/consumers.py#l34)

-  then you'd just have to declare & bind to the list of exchanges here: https://hg.mozilla.org/automation/mozillapulse/file/f5dc2a54d170/mozillapulse/consumers.py#l146
Attached patch bug1180897.patch (obsolete) — Splinter Review
Here the API for multiple exchanges is just passing a list of exchanges and a list of topics to GenericConsumer such that topic[i] corresponds to exchange[i]. This requires a user that wants to 2 topics (topic1, topic2) on exchange1 and one topic (topic3) on exchange2 to pass ['exchange1', 'exchange1', 'exchange2'], ['topic1', 'topic2', 'topic3']. It is not pretty, but it was the best I could think of.

I tested this with the cases:

* ('exchange1', 'topic')  - still working

* ('exchange1', ['topic1', 'topic2']) - still working

* (['exchange1', 'exchange2'], ['topic1', 'topic2']) - works now

* (['exchange1', 'exchange2'], ['topic1', 'topic2', 'topic3']) - raises an error, which I think it's the right behaviour
Attachment #8631582 - Flags: review?(mcote)
Comment on attachment 8631582 [details] [diff] [review]
bug1180897.patch

Review of attachment 8631582 [details] [diff] [review]:
-----------------------------------------------------------------

This looks great--but could you write a test for it as well?

::: mozillapulse/consumers.py
@@ +165,5 @@
>          # Bind to the first key.
>          consumer.queues[0].queue_bind()
>  
>          # Bind to any additional keys.
> +        for i in xrange(1, len(self.topic)): 

Trailing space.  Also check out enumerate(), which is a nicer way to iterate over a list and generate an index at the same time.
Attachment #8631582 - Flags: review?(mcote) → review-
(In reply to Alice Scarpa [:adusca] from comment #1)
> Here the API for multiple exchanges is just passing a list of exchanges and
> a list of topics to GenericConsumer such that topic[i] corresponds to
> exchange[i]. This requires a user that wants to 2 topics (topic1, topic2) on
> exchange1 and one topic (topic3) on exchange2 to pass ['exchange1',
> 'exchange1', 'exchange2'], ['topic1', 'topic2', 'topic3']. It is not pretty,
> but it was the best I could think of.

Yeah it's a little gross, but as you say I don't think there's any better way to do it without changing the constructor's signature.  So it works for me.
Attached patch bug1180897.patch (obsolete) — Splinter Review
Since I was only using the indexes to zip the lists I believe it might be better to just use zip() instead of using enumerate().
Attachment #8631582 - Attachment is obsolete: true
Attachment #8633801 - Flags: review?(mcote)
Comment on attachment 8633801 [details] [diff] [review]
bug1180897.patch

Review of attachment 8633801 [details] [diff] [review]:
-----------------------------------------------------------------

With a fresh vagrant, the test run fails for me (see below).  A second run passes, but this should work the first time.

$ python runtests.py
....Process ConsumerSubprocess-9:
Traceback (most recent call last):
  File "/usr/local/Cellar/python/2.7.10_2/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "runtests.py", line 44, in run
    consumer.listen()
  File "/Users/mcote/projects/mozillapulse/src/mozillapulse/mozillapulse/consumers.py", line 171, in listen
    exchange(self.connection).declare(passive=True)
  File "/Users/mcote/projects/mozillapulse/lib/python2.7/site-packages/kombu-3.0.26-py2.7.egg/kombu/entity.py", line 174, in declare
    nowait=nowait, passive=passive,
  File "/Users/mcote/projects/mozillapulse/lib/python2.7/site-packages/amqp-1.4.6-py2.7.egg/amqp/channel.py", line 620, in exchange_declare
    (40, 11),  # Channel.exchange_declare_ok
  File "/Users/mcote/projects/mozillapulse/lib/python2.7/site-packages/amqp-1.4.6-py2.7.egg/amqp/abstract_channel.py", line 69, in wait
    return self.dispatch_method(method_sig, args, content)
  File "/Users/mcote/projects/mozillapulse/lib/python2.7/site-packages/amqp-1.4.6-py2.7.egg/amqp/abstract_channel.py", line 87, in dispatch_method
    return amqp_method(self, args)
  File "/Users/mcote/projects/mozillapulse/lib/python2.7/site-packages/amqp-1.4.6-py2.7.egg/amqp/channel.py", line 241, in _close
    reply_code, reply_text, (class_id, method_id), ChannelError,
NotFound: Exchange.declare: (404) NOT_FOUND - no exchange 'exchange/pulse/test' in vhost '/'
Process ConsumerSubprocess-10:
Traceback (most recent call last):
  File "/usr/local/Cellar/python/2.7.10_2/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "runtests.py", line 44, in run
    consumer.listen()
  File "/Users/mcote/projects/mozillapulse/src/mozillapulse/mozillapulse/consumers.py", line 171, in listen
    exchange(self.connection).declare(passive=True)
  File "/Users/mcote/projects/mozillapulse/lib/python2.7/site-packages/kombu-3.0.26-py2.7.egg/kombu/entity.py", line 174, in declare
    nowait=nowait, passive=passive,
  File "/Users/mcote/projects/mozillapulse/lib/python2.7/site-packages/amqp-1.4.6-py2.7.egg/amqp/channel.py", line 620, in exchange_declare
    (40, 11),  # Channel.exchange_declare_ok
  File "/Users/mcote/projects/mozillapulse/lib/python2.7/site-packages/amqp-1.4.6-py2.7.egg/amqp/abstract_channel.py", line 69, in wait
    return self.dispatch_method(method_sig, args, content)
  File "/Users/mcote/projects/mozillapulse/lib/python2.7/site-packages/amqp-1.4.6-py2.7.egg/amqp/abstract_channel.py", line 87, in dispatch_method
    return amqp_method(self, args)
  File "/Users/mcote/projects/mozillapulse/lib/python2.7/site-packages/amqp-1.4.6-py2.7.egg/amqp/channel.py", line 241, in _close
    reply_code, reply_text, (class_id, method_id), ChannelError,
NotFound: Exchange.declare: (404) NOT_FOUND - no exchange 'exchange/pulse/test' in vhost '/'
FProcess ConsumerSubprocess-11:
Traceback (most recent call last):
  File "/usr/local/Cellar/python/2.7.10_2/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "runtests.py", line 44, in run
    consumer.listen()
  File "/Users/mcote/projects/mozillapulse/src/mozillapulse/mozillapulse/consumers.py", line 171, in listen
    exchange(self.connection).declare(passive=True)
  File "/Users/mcote/projects/mozillapulse/lib/python2.7/site-packages/kombu-3.0.26-py2.7.egg/kombu/entity.py", line 174, in declare
    nowait=nowait, passive=passive,
  File "/Users/mcote/projects/mozillapulse/lib/python2.7/site-packages/amqp-1.4.6-py2.7.egg/amqp/channel.py", line 620, in exchange_declare
    (40, 11),  # Channel.exchange_declare_ok
  File "/Users/mcote/projects/mozillapulse/lib/python2.7/site-packages/amqp-1.4.6-py2.7.egg/amqp/abstract_channel.py", line 69, in wait
    return self.dispatch_method(method_sig, args, content)
  File "/Users/mcote/projects/mozillapulse/lib/python2.7/site-packages/amqp-1.4.6-py2.7.egg/amqp/abstract_channel.py", line 87, in dispatch_method
    return amqp_method(self, args)
  File "/Users/mcote/projects/mozillapulse/lib/python2.7/site-packages/amqp-1.4.6-py2.7.egg/amqp/channel.py", line 241, in _close
    reply_code, reply_text, (class_id, method_id), ChannelError,
NotFound: Exchange.declare: (404) NOT_FOUND - no exchange 'exchange/pulse/test' in vhost '/'
F......
======================================================================
FAIL: test_durable (__main__.TestMultipleExchanges)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "runtests.py", line 147, in test_durable
    self._get_verify_msg(msg)
  File "runtests.py", line 94, in _get_verify_msg
    self.fail('did not receive message from consumer process')
AssertionError: did not receive message from consumer process

======================================================================
FAIL: test_nondurable (__main__.TestMultipleExchanges)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "runtests.py", line 117, in test_nondurable
    self._get_verify_msg(msg)
  File "runtests.py", line 94, in _get_verify_msg
    self.fail('did not receive message from consumer process')
AssertionError: did not receive message from consumer process

----------------------------------------------------------------------
Ran 12 tests in 11.545s

FAILED (failures=2)

::: test/runtests.py
@@ +244,5 @@
> +        msg.set_data('who', 'somedev@mozilla.com')
> +        return msg
> +
> +
> +class TestMultipleExchangesAgain(PulseTestMixin, unittest.TestCase):

Hm this works but says to me that the testing architecture is not great for handling this.  Maybe PulseTestMixin could be updated somehow so that we don't have to have two TestCase classes that are together verifying one thing, which is a bit weird.

Anyway this is fine for now (aside from the error above); we can revisit it in another bug later.  I don't want to hold you up from your main task any longer. :)
Attachment #8633801 - Flags: review?(mcote) → review-
After investigating this I discovered that I get the 'failed in the first run, pass on every subsequent run' behaviour even when I try running the tests without my patch, but the error I get on the first run without my patch [1] is different then the error I get on the first run with my patch (I get the same error posted on comment #5). Is this a bug with the tests or am I doing something wrong?

[1]
(venv)test$ python runtests.py 
E.......
======================================================================
ERROR: test_durable (__main__.TestBuild)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "runtests.py", line 126, in test_durable
    publisher.publish(msg)
  File "/home/adusca/mozillapulse/mozillapulse/publishers.py", line 62, in publish
    routing_key=message.routing_key)
  File "/home/adusca/pulse_actions/venv/local/lib/python2.7/site-packages/kombu-3.0.26-py2.7.egg/kombu/messaging.py", line 85, in __init__
    self.revive(self._channel)
  File "/home/adusca/pulse_actions/venv/local/lib/python2.7/site-packages/kombu-3.0.26-py2.7.egg/kombu/messaging.py", line 222, in revive
    self.declare()
  File "/home/adusca/pulse_actions/venv/local/lib/python2.7/site-packages/kombu-3.0.26-py2.7.egg/kombu/messaging.py", line 105, in declare
    self.exchange.declare()
  File "/home/adusca/pulse_actions/venv/local/lib/python2.7/site-packages/kombu-3.0.26-py2.7.egg/kombu/entity.py", line 171, in declare
    return self.channel.exchange_declare(
  File "/home/adusca/pulse_actions/venv/local/lib/python2.7/site-packages/kombu-3.0.26-py2.7.egg/kombu/abstract.py", line 115, in channel
    channel = self._channel = channel()
  File "/home/adusca/pulse_actions/venv/local/lib/python2.7/site-packages/kombu-3.0.26-py2.7.egg/kombu/utils/__init__.py", line 422, in __call__
    value = self.__value__ = self.__contract__()
  File "/home/adusca/pulse_actions/venv/local/lib/python2.7/site-packages/kombu-3.0.26-py2.7.egg/kombu/messaging.py", line 209, in <lambda>
    channel = ChannelPromise(lambda: connection.default_channel)
  File "/home/adusca/pulse_actions/venv/local/lib/python2.7/site-packages/kombu-3.0.26-py2.7.egg/kombu/connection.py", line 756, in default_channel
    self.connection
  File "/home/adusca/pulse_actions/venv/local/lib/python2.7/site-packages/kombu-3.0.26-py2.7.egg/kombu/connection.py", line 741, in connection
    self._connection = self._establish_connection()
  File "/home/adusca/pulse_actions/venv/local/lib/python2.7/site-packages/kombu-3.0.26-py2.7.egg/kombu/connection.py", line 696, in _establish_connection
    conn = self.transport.establish_connection()
  File "/home/adusca/pulse_actions/venv/local/lib/python2.7/site-packages/kombu-3.0.26-py2.7.egg/kombu/transport/pyamqp.py", line 116, in establish_connection
    conn = self.Connection(**opts)
  File "/home/adusca/pulse_actions/venv/local/lib/python2.7/site-packages/amqp-1.4.6-py2.7.egg/amqp/connection.py", line 165, in __init__
    self.transport = self.Transport(host, connect_timeout, ssl)
  File "/home/adusca/pulse_actions/venv/local/lib/python2.7/site-packages/amqp-1.4.6-py2.7.egg/amqp/connection.py", line 186, in Transport
    return create_transport(host, connect_timeout, ssl)
  File "/home/adusca/pulse_actions/venv/local/lib/python2.7/site-packages/amqp-1.4.6-py2.7.egg/amqp/transport.py", line 299, in create_transport
    return TCPTransport(host, connect_timeout)
  File "/home/adusca/pulse_actions/venv/local/lib/python2.7/site-packages/amqp-1.4.6-py2.7.egg/amqp/transport.py", line 95, in __init__
    raise socket.error(last_err)
error: timed out

----------------------------------------------------------------------
Ran 8 tests in 8.829s

FAILED (errors=1)
(venv)test$ python runtests.py 
........
----------------------------------------------------------------------
Ran 8 tests in 0.948s

OK
(venv)test$
Flags: needinfo?(mcote)
Huh.  I don't get that error.  I'm using OS X 10.10.4 with Vagrant 1.7.3.  Full log below, running tip:

(mozillapulse)retina-wireless:test mcote$ vagrant destroy
    default: Are you sure you want to destroy the 'default' VM? [y/N] y
==> default: Forcing shutdown of VM...
==> default: Destroying VM and associated drives...
==> default: Running cleanup tasks for 'puppet' provisioner...
(mozillapulse)retina-wireless:test mcote$ vagrant up
Bringing machine 'default' up with 'virtualbox' provider...
==> default: Importing base box 'trusty-cloud-i386'...
==> default: Matching MAC address for NAT networking...
==> default: Setting the name of the VM: test_default_1437015766015_1344
==> default: Clearing any previously set forwarded ports...
==> default: Clearing any previously set network interfaces...
==> default: Preparing network interfaces based on configuration...
    default: Adapter 1: nat
    default: Adapter 2: hostonly
==> default: Forwarding ports...
    default: 22 => 2222 (adapter 1)
==> default: Booting VM...
==> default: Waiting for machine to boot. This may take a few minutes...
    default: SSH address: 127.0.0.1:2222
    default: SSH username: vagrant
    default: SSH auth method: private key
    default: Warning: Connection timeout. Retrying...
    default:
    default: Vagrant insecure key detected. Vagrant will automatically replace
    default: this with a newly generated keypair for better security.
    default:
    default: Inserting generated public key within guest...
    default: Removing insecure key from the guest if it's present...
    default: Key inserted! Disconnecting and reconnecting using new SSH key...
==> default: Machine booted and ready!
==> default: Checking for guest additions in VM...
==> default: Configuring and enabling network interfaces...
==> default: Mounting shared folders...
    default: /vagrant => /Users/mcote/projects/mozillapulse/src/mozillapulse/test
    default: /tmp/vagrant-puppet/manifests-846018e2aa141a5eb79a64b4015fc5f3 => /Users/mcote/projects/mozillapulse/src/mozillapulse/test/puppet/manifests
==> default: Running provisioner: puppet...
==> default: Running Puppet with vagrant.pp...
==> default: stdin: is not a tty
==> default: Notice: Compiled catalog for vagrant-ubuntu-trusty-32.int.mrcote.info in environment production in 0.18 seconds
==> default: Notice: /Stage[main]/Init/Exec[update_apt]/returns: executed successfully
==> default: Notice: /Stage[main]/Rabbitmq/Package[rabbitmq-server]/ensure: ensure changed 'purged' to 'present'
==> default: Notice: /Stage[main]/Rabbitmq/Exec[create-rabbitmq-user-test]/returns: executed successfully
==> default: Notice: /Stage[main]/Rabbitmq/Exec[create-rabbitmq-user-code]/returns: executed successfully
==> default: Notice: /Stage[main]/Rabbitmq/Exec[grant-rabbitmq-permissions-code]/returns: executed successfully
==> default: Notice: /Stage[main]/Rabbitmq/Exec[grant-rabbitmq-permissions-test]/returns: executed successfully
==> default: Notice: /Stage[main]/Rabbitmq/Exec[create-rabbitmq-user-build]/returns: executed successfully
==> default: Notice: /Stage[main]/Rabbitmq/Exec[grant-rabbitmq-permissions-build]/returns: executed successfully
==> default: Notice: Finished catalog run in 26.34 seconds
(mozillapulse)retina-wireless:test mcote$ python runtests.py
........
----------------------------------------------------------------------
Ran 8 tests in 0.873s

OK
Flags: needinfo?(mcote)
Assignee: nobody → alicescarpa
Status: UNCONFIRMED → ASSIGNED
Ever confirmed: true
Okay I don't know why the tests on tip are currently failing for you, but I do know why your new tests fail.  The consumer expects all its exchanges to already exist; they should have been created by the publisher.  But the way the PulseTestMixin is structured means only one publisher is created (whatever is in the 'publisher' class variable), so the other exchange doesn't exist when the consumer is created, and it throws an exception when calling declare().

Now that I've thought about it, it's probably not too difficult to modify PulseTestMixin to accept multiple publishers.  That would also allow you to combine your two TestCase classes into one.  This assumes you can get the tests passing at all... I can try to help you debug that tomorrow.
Attached patch bug1180897.patchSplinter Review
The tests still don't pass in the first run on my machine, but now the error I get is the same  I get when running on tip, which is probably some weirdness on my set up.
Attachment #8633801 - Attachment is obsolete: true
Attachment #8635425 - Flags: review?(mcote)
Comment on attachment 8635425 [details] [diff] [review]
bug1180897.patch

Review of attachment 8635425 [details] [diff] [review]:
-----------------------------------------------------------------

Works for me.  Thanks!
Attachment #8635425 - Flags: review?(mcote) → review+
http://hg.mozilla.org/automation/mozillapulse/rev/81738cad82c5
Status: ASSIGNED → RESOLVED
Closed: 9 years ago
Resolution: --- → FIXED
You need to log in before you can comment on or make changes to this bug.