Closed Bug 1107597 Opened 5 years ago Closed 3 years ago

scheduler: Prototype big-graph scheduler


(Taskcluster Graveyard :: Scheduler, defect)

Not set


(Not tracked)



(Reporter: jonasfj, Assigned: jonasfj)



We should prototype a scheduler without graph boundaries. Basically we should just have one big task-graph.

It sounds bad, but it's not; it's very simple.
Basically this scheduler keeps a map from taskId to dependencies and requirements.
Hence, an azure table that looks like this:
  - taskId        (taskId of the task)
  - requires      (List of taskIds that must be completed successfully before this task is scheduled)
  - waitFor       (List of taskIds that must be resolved before this task is scheduled)
  - requiresLeft  (taskIds from requires not yet completed successfully)
  - waitForLeft   (taskIds from waitFor not yet resolved)
  - dependents    (List of taskIds that depend on this task)
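
As a rough illustration of the row layout listed above, here is a minimal Python sketch. The class name `TaskRow`, the helper `new_row`, and the `is_ready` check are hypothetical names chosen for this example; the real table would live in azure and not in memory.

```python
from dataclasses import dataclass, field
from typing import List, Set


@dataclass
class TaskRow:
    """One row of the scheduler table, keyed by taskId (illustrative only)."""
    task_id: str
    requires: List[str] = field(default_factory=list)    # must complete successfully
    wait_for: List[str] = field(default_factory=list)    # must merely be resolved
    requires_left: Set[str] = field(default_factory=set)  # subset of requires still pending
    wait_for_left: Set[str] = field(default_factory=set)  # subset of wait_for still pending
    dependents: List[str] = field(default_factory=list)  # tasks to notify on resolution

    def is_ready(self) -> bool:
        # The task can be scheduled once both "left" sets are empty.
        return not self.requires_left and not self.wait_for_left


def new_row(task_id, requires=(), wait_for=()):
    # Assumption: nothing is resolved at definition time, so the "left"
    # sets start out as copies of the full lists.
    return TaskRow(task_id=task_id,
                   requires=list(requires), wait_for=list(wait_for),
                   requires_left=set(requires), wait_for_left=set(wait_for))
```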
So, jlal and I have been discussing the option of moving the scheduler into the queue, for added
performance and lower request cost overhead. It makes sense because most tasks require scheduling
and the concept is fairly robust now.
Random notes:

### Concept:
  A task T0 depends on a set of tasks (T1, T2, T3...) to reach a certain state (completed).

I still suggest that we do the initial prototype as an external component.
This component just listens to the task-defined and task-completed exchanges and schedules
tasks according to dependencies defined in task.extra.dependents.

The problem with graph scheduling is that jlal has unreasonable scalability requirements :)
Okay, maybe his requirements are only non-trivial, and my problems largely hypothetical.
Anyways, I couldn't sleep last night, so I started drawing and estimating complexities of various solutions
to the issue. I think I found the perfect one. Granted, it is lovely crazy. Anyways, I'm writing it down
here so we don't forget. I think this pattern may be generally useful.

### Implementation Strategy A: The Simple Approach
To implement the concept we maintain for T0 a list of tasks that have yet to be completed:
  T0.waitFor = [T1, T2, T3]
And for each task T1, T2, T3, we maintain a list of tasks that must be updated when Tx is completed.
  T1.dependents = [T0, ...] ("...", because other tasks may depend on T1 too).
  T2.dependents = [T0, ...]
  T3.dependents = [T0, ...]
When the task T1 is completed, we load T1.dependents and for each entry Tx we remove T1
from Tx.waitFor, if Tx.waitFor is empty we call queue.scheduleTask(Tx).

Hence, when T1, T2, and T3 have all been completed, T0.waitFor will be empty and T0 will have been
scheduled by queue.scheduleTask(T0).
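
The bookkeeping for strategy (A) can be sketched in memory as follows. This is only a sketch: `SchedulerA` is a hypothetical name, the `schedule_task` callback stands in for queue.scheduleTask, and the dictionaries stand in for table rows, so none of the concurrency issues show up here. It also ignores dependencies that completed before the dependent task was defined.

```python
class SchedulerA:
    """In-memory sketch of strategy (A); not the real scheduler."""

    def __init__(self, schedule_task):
        self.wait_for = {}    # taskId -> set of taskIds not yet completed
        self.dependents = {}  # taskId -> list of taskIds to update on completion
        self.schedule_task = schedule_task  # stand-in for queue.scheduleTask

    def task_defined(self, task_id, wait_for):
        # O(|wait_for|): register this task as a dependent of each dependency.
        self.wait_for[task_id] = set(wait_for)
        for dep in wait_for:
            self.dependents.setdefault(dep, []).append(task_id)
        if not wait_for:
            self.schedule_task(task_id)

    def task_completed(self, task_id):
        # O(|dependents|): remove the completed task from each waitFor set.
        for tx in self.dependents.get(task_id, []):
            remaining = self.wait_for[tx]
            if task_id in remaining:
                remaining.remove(task_id)
                if not remaining:
                    self.schedule_task(tx)
```

For example, defining T0 with waitFor = [T1, T2, T3] and then completing T1, T2 and T3 in any order invokes the callback for T0 exactly once, on the last completion.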

  task-defined:       O(N) ,   where N = |T.waitFor| (we need to update `dependents` for each)
  task-completed:     O(N) ,   where N = |T.dependents| (we need to notify each)
  concurrent writes:  N    ,   in both cases

### The Problem with A
Imagine a task T that depends on N tasks, and imagine that these N tasks complete at the same time.
We now have N writers racing to update `T.waitFor`. In `base.Entity.modify` we retry edits if they
conflict under optimistic concurrency. But if N is greater than the number of retries allowed, this
will fail. Obviously, 100 retries is not reasonable.

The same problem occurs when we create N tasks that all depend on the same task T. We have N writers
racing to update `T.dependents`. Again, this is a problem for N > allowed retries.
Default retries is 10, which seems fair, considering the likelihood of this conflict with 10
writers is small.

However, if N is 1k or 10k, the likelihood of conflicts involving more than 10 tasks is going to grow.
Besides, we don't want scheduling to work only 99.9...9% of the time, because the math and
assessment of what error rate is acceptable is hard :)
Also, we already accept an error rate from S3; azure storage doesn't define one, but I'm sure they
have errors too.
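
To make the retry limit concrete, here is a small deterministic simulation of the worst case. It assumes a lock-step model that is purely illustrative: in every round all pending writers read the same version, exactly one conditional write wins, and the rest conflict. The function name and the `max_attempts` parameter are made up for this sketch, and real conflicts would be far less regular.

```python
def simulate_lockstep_writers(n_writers, max_attempts):
    """Return (succeeded, failed) for writers racing on a single entity.

    Worst-case model: each round, every pending writer loads the same
    version of the entity, so only one optimistic write can succeed.
    """
    attempts = [0] * n_writers
    pending = list(range(n_writers))
    succeeded = failed = 0
    while pending:
        for w in pending:
            attempts[w] += 1
        pending.pop(0)  # exactly one writer wins this round
        succeeded += 1
        still_pending = []
        for w in pending:
            if attempts[w] >= max_attempts:
                failed += 1  # out of attempts, the update is lost
            else:
                still_pending.append(w)
        pending = still_pending
    return succeeded, failed
```

Under this model, 5 writers with 10 attempts each all succeed, while 100 writers with 10 attempts each leave 90 updates lost, which is the failure mode described above.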

### Implementation Strategy B: Read-only Operations
This is a simple variation of strategy (A). Instead of removing from T.waitFor when one of
the dependencies is completed, we load the state of each entry in T.waitFor and check if
they are all completed. If so we schedule T.

  task-defined:       O(N)   ,   where N = |T.waitFor|
  task-completed:     O(N^2) ,   where N = max {|T.dependents|, |Tx.waitFor| : Tx in T.dependents}
  concurrent writes:  N      ,   for task-defined only, not task-completed.

This partly solves the problem, but adds a lot of operations.
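
A sketch of the read-only variant, again in memory and with hypothetical names (`SchedulerB`, `schedule_task`). Note the assumption that scheduling is idempotent: with truly concurrent completions, two handlers could both observe "all completed" and both schedule T, which this sequential sketch cannot show.

```python
class SchedulerB:
    """In-memory sketch of strategy (B): waitFor is never modified."""

    def __init__(self, schedule_task):
        self.wait_for = {}      # taskId -> tuple of taskIds, written once
        self.dependents = {}    # taskId -> list of dependent taskIds
        self.completed = set()  # stand-in for loading each task's state
        self.schedule_task = schedule_task  # stand-in for queue.scheduleTask

    def task_defined(self, task_id, wait_for):
        self.wait_for[task_id] = tuple(wait_for)
        for dep in wait_for:
            self.dependents.setdefault(dep, []).append(task_id)

    def task_completed(self, task_id):
        self.completed.add(task_id)
        # For each dependent, re-check every dependency: up to
        # |dependents| * |waitFor| state reads, hence the O(N^2) bound.
        for tx in self.dependents.get(task_id, []):
            if all(dep in self.completed for dep in self.wait_for[tx]):
                self.schedule_task(tx)
```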

### Implementation Strategy C: Intermediary Nodes with Max `k` Entries
This is similar to (A), but instead of building a long list in T.waitFor, we split the list
and create intermediary nodes. Hence, if T has |T.waitFor| > k, we create I.waitFor = T.waitFor
and set T.waitFor = I.
This way, for any node T or I there will be no more than k entries in the waitFor list.
A similar trick can be done with the dependents list: if T.dependents has more than k entries, you
don't modify T, but try to load an intermediary node I1 with the id `T+1`. If it exists, append to it
if it has fewer than k entries; otherwise go on to I2.
(There are also other ways similar things could be done with logarithmic complexity).

  task-defined:       O(N)   ,   where N = max {|T.waitFor|, |Tx.dependents| : Tx in T.waitFor}
  task-completed:     O(N)
  concurrent writes:  k      ,   for both cases.
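
One way the waitFor split could look is sketched below. This is an interpretation, not the definitive scheme: it chunks the list into groups of at most k, gives each group an intermediary node, and repeats until the top-level list fits. The function name and the `T/I<n>` id format are invented for the example.

```python
def split_wait_for(task_id, wait_for, k):
    """Return {nodeId: waitFor list} where every list has at most k entries."""
    if k < 2:
        raise ValueError("k must be at least 2, or splitting never terminates")
    nodes = {}
    level = list(wait_for)
    counter = 0
    while len(level) > k:
        next_level = []
        for i in range(0, len(level), k):
            counter += 1
            node_id = f"{task_id}/I{counter}"  # hypothetical intermediary id
            nodes[node_id] = level[i:i + k]
            next_level.append(node_id)
        level = next_level  # the parent now waits on the intermediaries
    nodes[task_id] = level
    return nodes
```

With 10 dependencies and k = 3 this yields four first-level intermediaries, two second-level ones, and a task node that waits on just those two, so no single waitFor list sees more than k concurrent writers.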

### Implementation Strategy D: Table Partitions as Sets
In this approach we need two auxiliary tables:
  WaitFor(partitionKey, rowKey)
  Dependents(partitionKey, rowKey)

When a task T with waitFor = [T1, T2, T3] is defined, we create the following entries:
  WaitFor.create(T, T1)
  WaitFor.create(T, T2)
  WaitFor.create(T, T3)
  Dependents.create(T1, T)
  Dependents.create(T2, T)
  Dependents.create(T3, T)
  For consistency we should also load T1 through T3 and validate that they didn't complete while
  we were creating these entries. If so we just ensure that the related waitFor is removed, etc. 
When a task T1 is completed we do the following:
  for each T in Dependents.listPartition(T1):
    WaitFor.delete(T, T1, ignoreIfNotExists = true)
    if WaitFor.listPartition(T, limit = 1) == []:
      queue.scheduleTask(T)

Note that listPartition is paged, but we can handle 1000 entries at a time.
Basically, we use each partition as a set, and each entity in a partition is an entry in that set.
We can check for emptiness of the set by listing the partition with a limit of 1 entry.
We can iterate over the entries of the set too, by listing entries in the partition.
And we can remove an element by just deleting its entity.

  - There is no need to load an entity before we can remove something from a set.
    If we want to remove T1 from the waitFor set for T we just delete the entry, because
    we have both partitionKey and rowKey.
  - Similarly, there is no need to load anything in order to append to a set.

  task-defined:       O(N) ,   where N = |T.waitFor|
  task-completed:     O(N) ,   where N = |T.dependents|
  concurrent writes:  0    ,   in both cases
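
The partition-as-set idea can be sketched with an in-memory stand-in for the two tables. `PartitionedTable` and `SchedulerD` are hypothetical names, `schedule_task` stands in for queue.scheduleTask, and the `completed` set stands in for loading task state from the queue. The point to notice is that no operation reads an entity before writing or deleting it.

```python
class PartitionedTable:
    """In-memory stand-in for a table addressed by (partitionKey, rowKey)."""

    def __init__(self):
        self.rows = {}  # partitionKey -> set of rowKeys

    def create(self, pk, rk):
        self.rows.setdefault(pk, set()).add(rk)

    def delete(self, pk, rk):
        # Behaves like ignoreIfNotExists = true.
        self.rows.get(pk, set()).discard(rk)

    def list_partition(self, pk, limit=None):
        keys = sorted(self.rows.get(pk, ()))
        return keys if limit is None else keys[:limit]


class SchedulerD:
    def __init__(self, schedule_task):
        self.wait_for = PartitionedTable()
        self.dependents = PartitionedTable()
        self.completed = set()
        self.schedule_task = schedule_task

    def task_defined(self, t, wait_for):
        for dep in wait_for:
            self.wait_for.create(t, dep)
            self.dependents.create(dep, t)
        # Consistency check: drop dependencies that completed in the meantime.
        for dep in wait_for:
            if dep in self.completed:
                self.wait_for.delete(t, dep)
        if not self.wait_for.list_partition(t, limit=1):
            self.schedule_task(t)

    def task_completed(self, t1):
        self.completed.add(t1)
        for t in self.dependents.list_partition(t1):
            self.wait_for.delete(t, t1)
            if not self.wait_for.list_partition(t, limit=1):
                self.schedule_task(t)
```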

The scalability limit is that you cannot create more than 2k tasks that depend on the same task
at the same time. That is about it. I might have missed something. I might be crazy.

Might need additional analysis. I suspect not, in fact this might be cheaper than (A) because we don't
have to load before we remove from the set. Granted there will be more entries to delete. But they can
possibly all be deleted when task.deadline has expired.
Also note that when creating the WaitFor entries we can use transactions and do up to 100 in a single
operation. We just need to add a createBatch method to base.Entity. Something similar might also be
possible when deleting to clean up from tasks that didn't get scheduled.
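
A sketch of how the WaitFor entries for one task might be grouped for such a createBatch call. The helper names are hypothetical and createBatch itself does not exist yet. The grouping relies on all WaitFor entries for T sharing the partition key T; the Dependents entries each land in a different partition, so this sketch does not batch them.

```python
def chunked(items, size=100):
    """Yield consecutive slices of at most `size` items."""
    for i in range(0, len(items), size):
        yield items[i:i + size]


def create_wait_for_batches(task_id, wait_for, size=100):
    """Group (partitionKey, rowKey) pairs for task_id into batches of <= size."""
    return [[(task_id, dep) for dep in chunk]
            for chunk in chunked(list(wait_for), size)]
```

A task with 250 dependencies would then need three batch operations instead of 250 individual creates.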


I suspect implementation strategy D is the way to go. Scalability of this is awesome.
And cost seems to be sane. All we need to do is clean-up nicely.
I've prototyped a big-graph scheduler using implementation strategy D here:

At this point it can only be used for waiting for tasks to be completed.
I think it's critically important that we have more expressiveness. Sometimes it's desirable just to wait
for tasks to be resolved (completed, failed or exception); for bi-section this is very useful.
Other times you may wish to schedule if a task is resolved as failed.

There are so many options. We could simulate anything if we just schedule tasks after dependencies are
resolved, however they are resolved. But as depending on them to be _completed_ is very common, we
should probably support this explicitly.

Hence, I suggest two properties:
  waitForCompleted  = list of tasks that must be completed, before the task is scheduled
  waitForResolved   = list of tasks that must be resolved, before the task is scheduled
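
The intended semantics of the two properties can be sketched as a readiness check. The function name and the `states` mapping (taskId to current state) are assumptions made for the example; the resolved states are the three named above.

```python
RESOLVED_STATES = {"completed", "failed", "exception"}


def is_ready(wait_for_completed, wait_for_resolved, states):
    """Return True if a task with these two lists may be scheduled.

    `states` maps taskId -> state string; unknown tasks count as unresolved.
    """
    completed_ok = all(states.get(t) == "completed" for t in wait_for_completed)
    resolved_ok = all(states.get(t) in RESOLVED_STATES for t in wait_for_resolved)
    return completed_ok and resolved_ok
```

A failed dependency therefore satisfies waitForResolved but not waitForCompleted.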

The names here are strictly illustrative. The biggest problem with implementing this behaviour is
to figure out how we can name these properties.

I've considered words such as: required, dependencies, dependents, waitFor, await, resolved, completed
And/or combinations thereof. Someone please help me find two good names.

It could be of the form:
Or the `scheduling` level could be left out.
Assignee: nobody → jopsen
Deployed prototype, try defining tasks using the pattern described here:

Note, we have no inspectors that work on a taskGroupId; if we add that, this could replace task-graph-scheduler.
For interested parties the following script will use env vars and submit tasks:
### Note from IRC discussion with jlal:

We should require that all parent tasks are defined, before we allow a child task to be defined.
This has two purposes:
 1. It prevents someone else from declaring your parent task before you (potential security issue)
 2. It ensures that the task-graph is acyclic

It would probably be preferable to integrate this into the queue. So that we can reject queue.defineTask
if the parent/required/waitFor tasks aren't defined yet.
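
A minimal sketch of that rule, assuming a hypothetical in-memory registry in place of the queue. Because a task can only name parents that already exist, and its own id does not exist yet when it is defined, no cycle (including a self-dependency) can ever be created.

```python
class UnknownParentError(Exception):
    """Raised when a task lists a parent that has not been defined."""


class TaskRegistry:
    def __init__(self):
        self.parents = {}  # taskId -> tuple of parent taskIds

    def define_task(self, task_id, wait_for=()):
        if task_id in self.parents:
            raise ValueError(f"task {task_id} is already defined")
        missing = [p for p in wait_for if p not in self.parents]
        if missing:
            # Reject, as queue.defineTask would under this proposal.
            raise UnknownParentError(f"undefined parent tasks: {missing}")
        self.parents[task_id] = tuple(wait_for)
```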

### Note, from another discussion with jlal, catlee and rail
If taskIds listed in waitFor had a name, i.e. task.extra.waitFor: [{name: "BUILD_TASK", taskId: ...}],
then something like docker-worker could inject BUILD_TASK_TASK_ID into the task. And if task
artifacts were given a name like {path: "...", type: 'file', name: 'FIREFOX_BINARY'}, those could
be combined to inject BUILD_TASK_FIREFOX_BINARY.
This might also be relevant in the read-only cache discussion.
(I have no solid idea about how or whether to do this yet).
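
Purely to illustrate the naming idea, the combination could look like the sketch below. Everything here is an assumption: the function name, the shape of `artifacts_by_task`, and in particular the choice to use the artifact path as the injected value, which the discussion did not settle.

```python
def injected_env(wait_for, artifacts_by_task):
    """Build env vars from named dependencies and their named artifacts.

    wait_for:          list of {"name": ..., "taskId": ...}
    artifacts_by_task: taskId -> list of {"path": ..., "type": ..., "name": ...}
    """
    env = {}
    for dep in wait_for:
        name, task_id = dep["name"], dep["taskId"]
        env[f"{name}_TASK_ID"] = task_id
        for artifact in artifacts_by_task.get(task_id, []):
            if "name" in artifact:  # unnamed artifacts are not injected
                env[f"{name}_{artifact['name']}"] = artifact["path"]
    return env
```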
Component: TaskCluster → Scheduler
Product: Testing → Taskcluster
Blocks: 1108154
Jonasfj: is this the prototype you wanted me to look at?


The current prototype is here:

There is no API for adding/remove dependencies at this point.

The prototype was implemented in bug 1107597:
Please play with it and feel free to add comments... Does it work the
way we want it to, etc...

I just outlined the design for the new scheduler in bug 1178516:
This is possibly a new iteration of the prototype, or a rewrite; the
prototype is pretty simple.

Please, do feel free to comment on the design and features planned.
Doing this scheduler is likely going to be a ... stretch goal. I hope to
get it done, but there is high-priority stuff going on first.

Anyways, I strongly encourage you to play with the prototype.
Self-dependencies can be circumvented by calling scheduleTask in the
task-inspector, however, if you do this before the other dependent tasks
are resolved you could be running a test before the build is done.
Closed: 3 years ago
Resolution: --- → FIXED
Product: Taskcluster → Taskcluster Graveyard