Closed Bug 1395254 Opened 3 years ago Closed 1 year ago

Consume TaskCluster events from the standard Pulse exchanges rather than via taskcluster-treeherder


(Tree Management :: Treeherder: Data Ingestion, enhancement, P2)



(Not tracked)



(Reporter: emorley, Assigned: armenzg)


(Blocks 4 open bugs)



(5 files)

(In reply to Jonas Finnemann Jensen (:jonasfj) from 1066272 comment #9)
> Note: There are two ways to get this data:
> B) Delete taskcluster-treeherder an ingest messages from taskcluster
> directly in treeherder.
>    Since taskcluster-treeherder is effectively just a rewriting messages
> specifically for treeherder,
>    and TC is probably the primary source of input for TH these days.
>    Given that TC exchanges are stable and all messages are verified against
> documented schemas,
>    this wouldn't create tons of bugs.

We should just switch to consuming the messages directly to remove another layer of complexity that needs maintaining. To do this, we'll have to move the transformations performed by taskcluster-treeherder into Treeherder - though this will be a good thing, since it will let us later refactor and remove buildbot-isms that had to be added to taskcluster-treeherder so that Treeherder could handle the payloads.
Blocks: 1062827
There HAD been complications with the old revision_hash that taskcluster-treeherder worked around, as I recall.  It looks like there are still some jobs coming with with revision_hash (Aug 30th):

Regardless, I'm totally in support of this.  :)
But that's orthogonal surely? Since:

(In reply to Ed Morley [:emorley] from comment #0)
> To do this, we'll have to move the transformations performed by taskcluster-treeherder into Treeherder

ie the same handling will occur, just in Treeherder now instead.

Though that Treeherder will soon not support revision_hash anyway (bug 1257602). I think one of the last submitters is:

...though very soon I'm going to be removing support regardless - people have had long enough to stop using it.
Sorry, I didn't mean to imply that it's a blocker by any means.  I just meant to mention for scoping purposes that there had been some complicated logic regarding revision_hashes.  I think the revision_hash had been part of the routing key that had to be extracted.  My memory on this isn't very clear anymore, however.  I just remember GregA struggling with it a bit.

Nevertheless, I agree.  Nothing stopping us from doing it.  I just meant to give a heads up that it wasn't as simple as a mapping between formats.  Though, if the revision_hash actually DOES go away, then it may well BE just a simple mapping.  :)
Assignee: nobody → cdawson
I'm not sure if there is anything in TC using revision hash any longer.  Even esr52 seems to be using the push revision.  I'm curious to know what is running in TC that might not be using that.
In bug 1257602 I've been tracking the last remaining usages of revision_hash. There don't appear to be any via Treeherder REST submission, but there are still Funsize jobs doing so via pulse (bug 1399440). Hopefully now these have been switched to in-tree today in bug 1342392 the old jobs can be turned off soon.
Depends on: 1257602
Moving to a P2 since I think we should consider this for the next quarter or two, given it blocks both bug 1066272 and bug 1395337 (the latter since it would be best to avoid making people move from REST submission to a soon to be legacy pulse schema).
Blocks: 1395337
Priority: P3 → P2
Blocks: 1349182
No longer blocks: 1395337
This no longer blocks bug 1349182 (removing support for submitting jobs via Treeherder's REST API), since we're not going to support people submitting to Treeherder directly via Pulse (which would have wanted us to stabilise the API via this bug and others first) - and instead will have Taskcluster as the sole method of submitting data to Treeherder.

As such, this is lower priority, and something the Treeherder team will not be working on any time soon - though we'd gladly help anyone who wanted to take this on.
No longer blocks: 1349182
Priority: P2 → P3
Duplicate of this bug: 1341345
Assignee: cdawson → nobody
Summary: Consume TaskCluster events from the standard Pulse exchanges rather than via treeherder-taskcluster → Consume TaskCluster events from the standard Pulse exchanges rather than via taskcluster-treeherder
See Also: → 1291012

The service that's currently translating taskcluster messages to treeherder messages will become unmaintained as of July or so. It could continue to work, in a don't-breathe-around-this kind of fashion, after that, but I think this bug is the way forward. If that helps to determine priority.

I will take this after my parental leave. I come back on May 20th.

Assignee: nobody → armenzg

Let me know if what I'm understanding is off.

Taskcluster publishes messages to exchange/taskcluster-treeherder/v1/jobs.# and exchange/taskcluster-queue/v1/task-defined.#.

We want Treeherder to consume from exchange/taskcluster-queue/v1/task-defined.# so we can shutdown the treeherder-taskcluster service.

This means that I will need to do some transformations as done in here:

Specifically this transformation:

This should probably be handled within job_loader:

Flags: needinfo?(cdawson)

Close! There are a few exchanges that the queue publishes on, for different events in the lifetime of a task. task-defined, task-pending, task-running, etc. In fact, we don't translate task-defined into anything for treeherder. Events on each of those exchanges are handled in specific callbacks after line 287.

As a note, the exchanges do not have a trailling .# on them -- I suspect you're confusing exchanges with routing key patterns (which do often have a trailing .#).

Also, just to be clear, shutting down taskcluster-treeherder will mean that we no longer publish on exchange/taskcluster-treeherder/v1/jobs.

It sounds like Dustin already answered you. Thanks Dustin! But please n-i me again if you have another question. :)

Flags: needinfo?(cdawson)

dustin, give this Pulse message of a running task [1], how can I get to the task metadata?

  owner: task.metadata.owner


    "exchange": "exchange/taskcluster-queue/v1/task-running",
    "routingKey": "primary.HpeJLImbSf-oOOKGzf9LWA.0.signing-linux-v1.signing-linux-2.scriptworker-prov-v1.signing-linux-v1.gecko-level-3.amVL6lQQSpiobyFbYuNT0g._",
    "payload": {
      "status": {
        "taskId": "HpeJLImbSf-oOOKGzf9LWA",
        "provisionerId": "scriptworker-prov-v1",
        "workerType": "signing-linux-v1",
        "schedulerId": "gecko-level-3",
        "taskGroupId": "amVL6lQQSpiobyFbYuNT0g",
        "deadline": "2019-06-04T12:39:51.729Z",
        "expires": "2020-06-02T12:39:51.729Z",
        "retriesLeft": 5,
        "state": "running",
        "runs": [
            "runId": 0,
            "state": "running",
            "reasonCreated": "scheduled",
            "workerGroup": "signing-linux-v1",
            "workerId": "signing-linux-2",
            "takenUntil": "2019-06-03T13:19:00.963Z",
            "scheduled": "2019-06-03T12:54:09.074Z",
            "started": "2019-06-03T12:59:01.035Z"
      "runId": 0,
      "workerGroup": "signing-linux-v1",
      "workerId": "signing-linux-2",
      "takenUntil": "2019-06-03T13:19:00.963Z",
      "version": 1

  async handleMessage(message) {
    let taskId = message.payload.status.taskId;
    let task = await this.queue.task(taskId);

just call queue.task(..) to get the task.

camd: I need to write a script that can takes downloaded messages from the Pulse inspector, retrieve the metadata for each task and save it in a file.

The downloaded messages and task metadata will be used for running tests.

Any suggestions as to where to place the script?
Any place you would like this documented?

This would be used to refresh the text fixtures with more recent data.

Flags: needinfo?(cdawson)

Would this work as a django management command in /etl? If not, then perhaps somewhere in /etl.

Flags: needinfo?(cdawson)

camd: Yes, I'm using a management command.

Unfortunately, this is going to take longer than I had anticipated. Making test changes (even as little invasive as possible) is requirying a lot of work.
If I don't have a PR ready by Friday this won't be ready until after the All Hands.

I found the bug making me sad the last two days.
The PR is ready to be reviewed.
Instructions are found in the commit message of the first commit.
I'm now going to focus on updating the tests.

Blocks: 1560596
See Also: → 1560596

Armen: One test that I did when working on the taskcluster-treeherder feature last time was to write a script to call the /jobs API and download the jobs for the same push on stage and the test branch and have it compare every job looking for discrepancies. We did, in fact, find a few stragglers that got dropped for several oddball reasons I can't recall now. Mostly some random text parsing issue, iirc.

I would recommend running a script like this against several pushes. Testing completed pushes is easier. I think you can determine if you're getting the pending and running jobs in just fine by direct observation.

Your idea to compare two pushes is a great one. I caught few bugs!

Unfortunately, comparing two pushes is rather difficult since you need the celery worker running before a push is pushed and keep it running until all tasks have completed. Otherwise, production would keep completing tasks while your local instance would diverge.

Fortunately, it's a problem that itched me enough and I started writing a command that will take all tasks for a push and ingest them into Treeherder. My last commit includes a WIP.

However, this endeavour required changing the code to support async/await correctly.
The script can consume all tasks for a push within minutes. I would prefer few seconds, however, it might require a lot more effort.
If you have anyone that's an expert on Python's async coding please ask them to give some feedback on the last commit; thank you!

Unfortunately I'm hitting this issue when a lot of tasks are processed.

Are there lots of calls being made in parallel to the queue? It's best to limit API calls to, say, 50 at a time to avoid this kind of issue. Whether it's a bottleneck at the server or just on the client's network connection, running 1000's of simultaneous API calls can result in timeouts.

Another possibility is that there is something else going on at the same time that is not async, and that is taking more than 30 seconds so when the event loop gets control back, the HTTP connection has already timed out.

I removed the timeout issues like this:

- session = taskcluster.aio.createSession(loop=loop)
+ # Limiting the connection pool just in case we have too many
+ conn = aiohttp.TCPConnector(limit=30)
+ # Remove default timeout limit of 5 minutes
+ timeout = aiohttp.ClientTimeout(total=0)
+ session = taskcluster.aio.createSession(loop=loop, connector=conn, timeout=timeout)

Aki gave me some general feedback:

I took a quick look. I think you disabled the timeout with total=0, though I haven't used that yet. You might need to disable each of the timeouts, not just the total?
Another option might be to allow a timeout, but retry. We have a retry_async here... you basically

await retry_async(my_func, args=(args, to, pass, to, my_func), kwargs={"kwargs": "to", "pass": "to", "myfunc": 0})

which would retry my_func 5 times on timeout by default. This would allow you to reenable the timeouts, but retry on failure.

I'm not entirely convinced we want to retry because all the work to fetch data would get wasted.

Tomorrow I will be working on the code to compare pushes and see if there are any bugs found.

Priority: P3 → P2

I had to increase the number of Heroku workers for store_pulse_jobs since I see a spike (expectedly) on the rabbitmq queue.
I've also bumped production so we don't foget the day we deploy this to production.

It's expected that store_pulse_jobs will take more of the slice.
We also have aiohttp taking part of the slice.

For some very odd reason, I believe some taskcluster-treeherder messages (binding to exchange/taskcluster-treeherder/v1/jobs) was left over inside of the Pulse queue when switching over. I saw 54 errors like this while my changes were live.

I wonder if we need to rename the queue so we can better transition or if the names will get stuck.

I can see these attributes in NewRelic:

exchange exchange/taskcluster-treeherder/v1/jobs
routing_key tc-treeherder.autoland._

Maybe I need to land a change like this so these messages don't get requeue:

diff --git a/treeherder/etl/taskcluster_pulse/ b/treeherder/etl/taskcluster_pulse/
index ebd7cd0ef..180f24d5c 100644
--- a/treeherder/etl/taskcluster_pulse/
+++ b/treeherder/etl/taskcluster_pulse/
@@ -2,6 +2,7 @@
 import asyncio
 import logging
 import os
+import pprint

 import jsonschema
 import slugid
@@ -111,7 +112,13 @@ def validateTask(task):
 # This will generate a list of messages that need to be ingested by Treeherder
 async def handleMessage(message, taskDefinition=None):
     jobs = []
-    taskId = message["payload"]["status"]["taskId"]
+    try:
+        taskId = message["payload"]["status"]["taskId"]
+    except KeyError as e:
+        logger.exception(e)
+        logger.debug(pprint.pprint(message))
+        return jobs
     task = (await asyncQueue.task(taskId)) if not taskDefinition else taskDefinition
         parsedRoute = parseRouteInfo("tc-treeherder", taskId, task["routes"], task)
Attachment #9080124 - Attachment description: image.png → Screenshot of one of treeherder-staging queues

Side note, I landed a fix for this:

I don't know how I did not notice it locally.

Yesterday we backed out the changes.

We want to see if we can avoid loosing some tasks due to the transition/configuration period of Pulse.
Both the old exchanges and new exchanges use the same queue name, thus, getting into a funky situation for few minutes.

We will try to consume between both queues for a short period of time.
I was thinking of naming a new queue with the new bindings with a new Procfile entry.
We would then reduce the number of workers to 0 on Heroku and remove the old queue and the Procfile entry.
Let's see if that works.

If I can do anything with the current tc-treeherder application (like scaling to zero dynos) let me know.

I would let you know.
I'm going to wait until Monday/Tuesday to deploy it.

Deployed to treeherder-stage at 9:55 AM ET.

Actions taken:

  • Added nodes for pulse_listener_tasks (open the flood for new system) and store_pulse_jobs (which resumes consumption of old jobs)
  • Brought to 0 nodes pulse_listener_jobs (close pipeline for old system)
  • Waited for store_pulse_jobs celery queue to empty
  • Brought to 0 nodes worker_store_pulse_old
  • Deleted queue/treeherder-staging/jobs queue


  • Check New Relic for 'Error analytics'
    • Land fix which got lost from last week's backout
    • No more errors after that \o/
  • Run against few autoland pushes
    • Noticed that production has few jobs as "pending" instead of "running"
      • I was expecting production to catch up sooner or later, however, this task is still showing as 'pending' while Taskcluster is showing it as running.
      • Oddly, production seems to have some jobs as pending while they should be running (see screenshot)
      • I've increased production's worker_store_pulse_data from 4 nodes to 5 nodes to see if we're just not processing fast enough

So far, the only difference I'm seeing is that stage is better than production at correctly transitioning from 'pending' to 'running'.

I will re-verify a bunch of pushes this afternoon. At the moment I'm very satisfied.

As of now, there are no more issues, however, I will be deploying this to production next week as I will be PTO starting on Thursday (if not Wed).

Today I discovered two new issues that were not caught during all of my previous testing:

NewRelic made it easy to notice this issues. I will need to investigate why they were not caught before and write tests for them.

This has been live for the last 30 minutes. No issues found so far.

As far as I know we're done.

I will follow up with bug 1560596 with anything that not handled in here.

Closed: 1 year ago
Resolution: --- → FIXED
You need to log in before you can comment on or make changes to this bug.