Closed Bug 1407452 Opened 7 years ago Closed 7 years ago

decision task does not fail when a task cannot be created

Categories: Firefox Build System :: Task Configuration, task
Status: RESOLVED FIXED
Target Milestone: mozilla58
People: (Reporter: garndt, Assigned: dustin)
Attachments: (1 file)

In this decision task, a task could not be created because the scheduler ID of one of the tasks did not match the scheduler ID of the decision task.  The queue returned a 409, which is displayed in the logs, but the decision task still looks like it completed successfully.

https://tools.taskcluster.net/groups/IYfHXCm_Qi-p7OIrvgbMPQ/tasks/IYfHXCm_Qi-p7OIrvgbMPQ/runs/0/logs/public%2Flogs%2Flive.log#L1437
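The gist of the fix is to make task creation surface HTTP errors such as this 409 instead of only logging them. A minimal, hypothetical sketch (the endpoint URL, function signature, and error type here are illustrative, not the actual patch):

```python
def create_task(session, task_id, label, task_def):
    # illustrative endpoint; the real create.py builds its own URL
    res = session.put('https://queue.taskcluster.net/v1/task/%s' % task_id,
                      json=task_def)
    if res.status_code != 200:
        # raising here makes the decision task fail,
        # instead of logging the 409 and carrying on
        raise RuntimeError('failed to create task %s (%s): %s'
                           % (label, res.status_code, res.text))
```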
Assignee: nobody → dustin
Comment on attachment 8917415 [details]
Bug 1407452 - fix concurrent task creation to handle exceptions;

https://reviewboard.mozilla.org/r/188412/#review193754

I might be wrong, but I think there is a much simpler way to do this.

1) Do a `task_id_to_future[dep].result()` for each dependency,
2) Rely on `try: ... finally: ...` and just cancel everything in finally.

This relies on co-routines for the scheduling: with one co-routine per task, a graph of how the co-routines wait on each other would match the task-graph dependencies.
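For illustration only (not the patch under review), the coroutine framing could be sketched with asyncio, where awaiting a dependency suspends the coroutine instead of blocking a thread; the graph and names below are hypothetical:

```python
import asyncio

graph = {'a': [], 'b': ['a'], 'c': ['a', 'b']}  # task -> dependencies
created = []

async def schedule(task_id, tasks):
    # wait for every dependency's coroutine; the waiter-graph
    # mirrors the task-graph
    await asyncio.gather(*(tasks[dep] for dep in graph[task_id]))
    created.append(task_id)  # stand-in for the real create_task call

async def main():
    tasks = {}
    # every entry exists before the loop starts running any coroutine body
    for task_id in graph:
        tasks[task_id] = asyncio.ensure_future(schedule(task_id, tasks))
    await asyncio.gather(*tasks.values())

asyncio.run(main())
```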

::: taskcluster/taskgraph/create.py:110
(Diff revision 1)
> -                    fs[task_id] = e.submit(create_task, session, slugid(),
> -                                           taskid_to_label[task_id], task_def)
> +                    submit(slugid(), taskid_to_label[task_id], task_def)
>              tasklist.difference_update(to_remove)
>  
> +            # as each of those futures complete, try to schedule more tasks
> +            for f in futures.as_completed(new):

So as each future completes you iterate over all remaining tasks and try to schedule them...

I think you can do something much simpler than this: wait on the tasks' futures, possibly more than once, like:

```python
    with futures.ThreadPoolExecutor(CONCURRENCY) as e:
        # Map from task_id to a future that is resolved when the task has been created
        task_id_to_future = {}

        def schedule_task(task_id):
            task_def = taskgraph.tasks[task_id].task
            # Wait for all dependencies to be scheduled
            # We know task_id_to_future[dep] exists because we schedule in post-order
            for dep in task_def.get('dependencies', []):
                task_id_to_future[dep].result()
            create_task(session, task_id, taskid_to_label[task_id], task_def)

        # Schedule tasks in post-order, populating task_id_to_future[task_id]
        # before we schedule anything depending on task_id
        for task_id in taskgraph.graph.visit_postorder():
            task_id_to_future[task_id] = e.submit(schedule_task, task_id)

        # Wait for all scheduled tasks to be done
        try:
            for future in task_id_to_future.values():
                # Notice that we might call future.result() more than once,
                # but this is perfectly okay, futures are only ever resolved once. 
                future.result()
        finally:
            # Cancel everything that is still running, in case of exceptions
            for future in task_id_to_future.values():
                future.cancel()
```

This doesn't do the `duplicates` thing, nor does it check whether task ids are in `tasklist`, but those seem like small issues to fix.
Attachment #8917415 - Flags: review?(jopsen) → review-
The suggested approach ends up blocking unnecessarily in `task_id_to_future[dep].result()` -- that blocks the thread (of which there are only CONCURRENCY) waiting for another task creation to finish, when in fact there may be a task available to create in that thread.  That is roughly how things worked before the recent refactor.

Looked at another way, these aren't coroutines -- they are functions run in a thread pool.

I don't think the cancellation is an issue -- the exception exits the main process which implicitly kills the other threads.
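To make the thread-blocking hazard concrete, here is a minimal, hypothetical repro: with a pool of size 1, a worker that blocks in `.result()` on a dependency's future starves the pool, and the dependency can never run:

```python
import threading
from concurrent import futures

with futures.ThreadPoolExecutor(1) as e:
    results = {}
    submitted = threading.Event()

    def child():
        submitted.wait()  # make sure results['parent'] exists
        # this blocks the pool's only worker thread; 'parent' is queued
        # behind us and can never start, so the wait times out
        return results['parent'].result(timeout=2)

    results['child'] = e.submit(child)            # occupies the worker
    results['parent'] = e.submit(lambda: 'done')  # queued behind child
    submitted.set()

    try:
        results['child'].result()
        deadlocked = False
    except futures.TimeoutError:
        deadlocked = True
```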

I had considered vendoring `promise`, in which case we could make a graph of promise dependencies, with each promise wrapping a future.  Something like:

```python
for task in tasks:
    promises[task.taskid] = Promise.all(
        [promises[tid] for tid in task.dependencies]
    ).then(lambda: submit_task(task))

def fail(err):
    raise err

Promise.all(promises.values()).catch(fail)
```
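Sticking to the stdlib, the `Promise.all(...).then(...)` shape can be approximated with `add_done_callback`; this hypothetical sketch (`all_then` and the toy graph are made up for illustration) omits error propagation entirely:

```python
import threading
from concurrent import futures

def all_then(deps, fn, executor):
    """Rough Promise.all(deps).then(fn): a future resolving with fn()'s
    result once every future in deps has resolved (no error handling)."""
    out = futures.Future()
    remaining = [len(deps)]
    lock = threading.Lock()

    def fire():
        inner = executor.submit(fn)
        inner.add_done_callback(lambda f: out.set_result(f.result()))

    def on_dep(_):
        with lock:
            remaining[0] -= 1
            last = remaining[0] == 0
        if last:
            fire()

    if not deps:
        fire()
    for dep in deps:
        dep.add_done_callback(on_dep)
    return out

graph = {'a': [], 'b': ['a'], 'c': ['a', 'b']}  # task -> dependencies
created = []

with futures.ThreadPoolExecutor(2) as e:
    promises = {}
    for tid in ('a', 'b', 'c'):  # dependencies before dependents
        promises[tid] = all_then([promises[d] for d in graph[tid]],
                                 lambda t=tid: created.append(t) or t, e)
    order = [p.result() for p in promises.values()]
```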
We should figure out how to fix this -- the current situation is definitely broken for small task-graphs.
Flags: needinfo?(jopsen)
Comment on attachment 8917415 [details]
Bug 1407452 - fix concurrent task creation to handle exceptions;

https://reviewboard.mozilla.org/r/188412/#review193754

> So as each future completes you iterate over all remaining tasks and try to schedule them...
> 
> I think you can do much simpler than this, try waiting on the tasks multiple times, like:
> [...]

You are right, this won't work with threads :(

hmm, thinking a bit on this... but yeah, it won't be pretty.
Comment on attachment 8917415 [details]
Bug 1407452 - fix concurrent task creation to handle exceptions;

https://reviewboard.mozilla.org/r/188412/#review195000

Okay, I see no nice way to do this...

A different take on the same would be:
```python
with futures.ThreadPoolExecutor(CONCURRENCY) as e:
    # Track which task_ids are completed and which are still pending
    completed = set()
    pending = set(taskgraph.graph.visit_postorder())
    running = []  # list of futures currently in progress

    def schedule_task(task_def, task_id, label):
        create_task(session, task_id, label, task_def)
        return task_id

    def schedule_ready_tasks():
        for task_id in pending:
            task_def = taskgraph.tasks[task_id].task
            # Check if dependencies are completed
            if not set(task_def.get('dependencies', [])).issubset(completed):
                continue  # Skip this task, if dependencies are not completed
            # schedule task
            label = taskid_to_label[task_id]
            pending.remove(task_id)
            running.append(e.submit(schedule_task, task_def, task_id, label))
            # schedule duplicates
            dups = taskgraph.tasks[task_id].attributes.get('task_duplicates', 1)
            for i in range(1, dups):
                running.append(e.submit(schedule_task, task_def, slugid(), label))

    try:
        schedule_ready_tasks()
        while len(pending) + len(running) > 0:
            for future in futures.as_completed(running):
                # As each task is created, mark it completed and
                # schedule any tasks that have become ready
                task_id = future.result()
                running.remove(future)
                completed.add(task_id)
                schedule_ready_tasks()
                # break to outer-while loop, so as_completed is called again
                # as we always wait for the next future to be done
                break
    finally:
        for future in running:
            future.cancel()
```

It's basically the same, but I avoid recursive calls to `schedule_tasks`. I don't think it matters; the only motivation is that it could be slightly easier to follow.

I really wish there was a better way of doing concurrency in Python. IMO, condition variables are prettier than what we're doing here :)
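For comparison, the condition-variable alternative might look like this hypothetical sketch: worker threads sleep on a `Condition` until some pending task has all of its dependencies completed (the graph and names are illustrative):

```python
import threading

graph = {'a': [], 'b': ['a'], 'c': ['b']}  # task -> dependencies
pending = set(graph)
completed = set()
created = []
cond = threading.Condition()

def pick_ready():
    for task_id in pending:
        if set(graph[task_id]) <= completed:
            return task_id
    return None

def worker():
    while True:
        with cond:
            while True:
                if not pending:
                    return
                ready = pick_ready()
                if ready is not None:
                    break
                cond.wait()  # some dependency is still being created
            pending.remove(ready)
        created.append(ready)  # stand-in for create_task(session, ...)
        with cond:
            completed.add(ready)
            cond.notify_all()  # wake workers waiting on this dependency

threads = [threading.Thread(target=worker) for _ in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```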
Attachment #8917415 - Flags: review- → review+
I say ship this in either form :)
Flags: needinfo?(jopsen)
I'm going to land my version since it's already in hg and tested.  You're right -- it's equivalent to yours in comment 9, and neither is awesome.  Condition variables or Promises might improve things, but probably not much.
Pushed by dmitchell@mozilla.com:
https://hg.mozilla.org/integration/autoland/rev/8fe459c1eaa7
fix concurrent task creation to handle exceptions; r=jonasfj
https://hg.mozilla.org/mozilla-central/rev/8fe459c1eaa7
Status: NEW → RESOLVED
Closed: 7 years ago
Resolution: --- → FIXED
Target Milestone: --- → mozilla58
Product: TaskCluster → Firefox Build System