Open Bug 1068234 Opened 10 years ago Updated 2 years ago

Add a queue module to be able to handle multiple async queues without jank

Categories

(Chat Core :: General, defect)

defect

Tracking

(Not tracked)

People

(Reporter: aleth, Unassigned)

References

Details

Attachments

(1 file, 2 obsolete files)

Attached patch process.diff (obsolete)
This deduplicates the various async queues we have in the current code to avoid jank while processing data. Apart from the duplication, the problem with all these separate queues is that there is still noticeable jank when more than one queue is active at the same time (e.g. the socket one and the display one during LIST). This patch avoids that problem. With it, there doesn't seem to be any jank even while running many queues at once (e.g. multiple simultaneous /list while also switching between long logs in the log viewer).
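A minimal sketch of the single-driver idea, written in modern JavaScript rather than Task.jsm and assuming `setTimeout` is available for yielding to the event loop. `createQueue`, `drive`, `whenIdle` and the 14 ms `SLICE_MS` budget are illustrative names, not the patch's API. One loop takes an item from each active queue in turn and yields once its slice is used up, so two busy queues share one budget instead of each spending its own.

```javascript
// Hypothetical sketch, not the patch's API: one driver loop shared by every
// queue, so concurrently active queues share a single time budget.
const SLICE_MS = 14; // assumed per-slice budget before yielding

const activeQueues = new Set(); // queues with pending items
let driver = null; // promise for the running driver loop, if any

// Yield to the event loop via a macrotask so pending events can run.
function yieldToEventLoop() {
  return new Promise((resolve) => setTimeout(resolve, 0));
}

function createQueue(handler) {
  return {
    handler,
    items: [],
    add(item) {
      this.items.push(item);
      activeQueues.add(this);
      if (!driver) {
        driver = drive(); // only ever one driver
      }
    },
  };
}

async function drive() {
  await yieldToEventLoop(); // never handle items synchronously inside add()
  let sliceStart = Date.now();
  while (activeQueues.size) {
    // Round robin: one item from each active queue per pass.
    for (const queue of activeQueues) {
      if (!queue.items.length) {
        activeQueues.delete(queue); // idle queues are not retained
        continue;
      }
      queue.handler(queue.items.shift());
      if (Date.now() - sliceStart >= SLICE_MS) {
        await yieldToEventLoop();
        sliceStart = Date.now();
      }
    }
  }
  driver = null;
}

function whenIdle() {
  return driver || Promise.resolve();
}
```

For example, if queues `a` and `b` each receive two items before the driver starts, the handlers run in the order a1, b1, a2, b2.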

Something like bug 930517 would/will help to make this better as it could take account of any other async stuff that is going on (e.g. async file I/O) too.
Attachment #8490318 - Flags: feedback?(florian)
Attachment #8490318 - Flags: feedback?(clokep)
Depends on: 1060891
Comment on attachment 8490318 [details] [diff] [review]
process.diff

Review of attachment 8490318 [details] [diff] [review]:
-----------------------------------------------------------------

This is pretty awesome! It's complicated enough that I'd really appreciate more comments (or maybe I'm just not as well versed in Promises as I thought I was...). Additionally, I'd like to see tests for this before landing.

::: chat/modules/Queue.jsm
@@ +12,5 @@
> + *
> + * Usage:
> + *   new Queue(aHandler)
> + *     @aHandler: A function that handles a single item of data.
> + *                It should be expected take much less than 15ms to execute.

I think you forgot a word in here.

@@ +19,5 @@
> + *                If the handler returns true, the queue is aborted.
> + *     @aUpdater: An optional function that is called between each handled
> + *                batch of items (at the beginning of the next batch).
> + *     @aFinally: Optional function called whenever all available data has
> + *                been processed.

Does this mean the queue is empty or the queue is being destroyed or some other condition?

@@ +33,5 @@
> + *             memory as early as possible.
> + *             nsISimpleEnumerators and generator functions are both allowed.
> + *   Queue.abort() Stop handling existing data and ignore any future add()s.
> + *
> + *   Queue.isActive: Boolean, true if any data is currently in the queue.

"isActive" to me sounds like "is currently processing data".

@@ +44,5 @@
> +// Symbol.iterator is implemented.
> +// This is known as @@iterator in the ES6 spec.  Until it is bound to
> +// some well-known name, find the @@iterator object by expecting it as
> +// the first property accessed on a for-of iterable.
> +const iteratorSymbol = (function() {

I'm ignoring this code.

@@ +61,5 @@
> +  this.updater = aUpdater;
> +  this.finally = aFinally;
> +}
> +Queue.prototype = {
> +  add: function(aData) {

Doesn't this need to check cancelled and throw an exception if it's called?

@@ +100,5 @@
> +      }
> +    }
> +
> +    if (!this.data) {
> +      this.data = [];

Define this.data in the constructor and check whether this.promise exists or not.

@@ +101,5 @@
> +    }
> +
> +    if (!this.data) {
> +      this.data = [];
> +      this.promise = new Promise((resolve) => { this.resolve = resolve; });

What is "resolve" in this line? Is it a function? A boolean?

@@ +107,5 @@
> +    }
> +    this.data.push(item);
> +    return this.promise;
> +  },
> +  abort: function() {

Does this need to clear data out or anything of that sort? Cancel this.promise?

@@ +124,5 @@
> +  }
> +};
> +
> +var queues = new Set();
> +function startQueue(aQueue) {

Would this be simpler as a singleton object? (You could spawn the task in the constructor and then just have an "add" method that adds to the set.)

@@ +127,5 @@
> +var queues = new Set();
> +function startQueue(aQueue) {
> +  if (!queues.size) {
> +    Task.spawn(function*() {
> +      yield Promise.resolve();

Can you add some comments about what these lines / sections are doing? Promises can be a bit confusing, so more comments are better. (I.e. this is returning immediately to allow the event loop to continue, right?)

@@ +130,5 @@
> +    Task.spawn(function*() {
> +      yield Promise.resolve();
> +      let t = Date.now();
> +      while (queues.size) {
> +        for (let queue of queues) {

I assume the double loop is here to actually allow multiple iterations of queues during a single call to startQueue? This needs a comment explaining it.

@@ +134,5 @@
> +        for (let queue of queues) {
> +          if (!queue.data.length || queue.cancelled) {
> +            queue._done();
> +            queues.delete(queue);
> +            continue;

So when a queue is completed it gets deleted? You can't keep empty queues around?

@@ +137,5 @@
> +            queues.delete(queue);
> +            continue;
> +          }
> +
> +          let item = queue.data[0];

I understand what you're doing here, but I think it needs to be made clear that adding an iterable (e.g. an array) does not guarantee those items will be processed as a batch. (In fact, they likely won't be!)

@@ +170,5 @@
> +          // Unblock every 15ms.
> +          if (Date.now() > t + 14) {
> +            yield Promise.resolve();
> +            t = Date.now();
> +            queues.forEach((q) => { if (q.updater) q.updater(); });

It looks like you could theoretically end up calling updater multiple times without actually processing any data. Is this OK?

::: chat/modules/socket.jsm
@@ +465,2 @@
>      // The stop request will be handled when the queue is next empty.
> +    this._queue.add([]).then(() => { this._handleStopRequest(aStatus); });

Wouldn't this._queue.promise.then be simpler? Or is that not identical?

::: im/components/ibConvStatsService.js
@@ +6,5 @@
>  Cu.import("resource:///modules/imXPCOMUtils.jsm");
>  Cu.import("resource:///modules/imServices.jsm");
>  Cu.import("resource://gre/modules/Task.jsm")
>  Cu.import("resource://gre/modules/osfile.jsm");
> +Cu.import("resource:///modules/Queue.jsm");

I ignored this file.
Attachment #8490318 - Flags: feedback?(clokep) → feedback+
(In reply to Patrick Cloke [:clokep] from comment #1)

Thanks for looking at this!

> I was...). Additionally, I'd like to see tests for this before landing.

Yes, this would definitely need tests :)

> > + *     @aFinally: Optional function called whenever all available data has
> > + *                been processed.
> 
> Does this mean the queue is empty or the queue is being destroyed or some
> other condition?

Whenever the queue is empty. 

> @@ +33,5 @@
> > + *   Queue.isActive: Boolean, true if any data is currently in the queue.
> 
> "isActive" to me sounds like "is currently processing data".

Yes, that's intentional - it's the same thing.

> @@ +44,5 @@
> > +// Symbol.iterator is implemented.
> > +// This is known as @@iterator in the ES6 spec.  Until it is bound to
> > +// some well-known name, find the @@iterator object by expecting it as
> > +// the first property accessed on a for-of iterable.
> > +const iteratorSymbol = (function() {
> 
> I'm ignoring this code.

Some of that iterator stuff was really just me playing around to understand the different ways iterables can arise...

> @@ +61,5 @@
> > +  this.updater = aUpdater;
> > +  this.finally = aFinally;
> > +}
> > +Queue.prototype = {
> > +  add: function(aData) {
> 
> Doesn't this need to check cancelled and throw an exception if it's called?

It may be a good idea to check cancelled, thanks. It shouldn't throw an exception, though: the whole point of being able to abort a queue is that if some other async code still calls add() later, you don't have to worry about it.
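A minimal sketch of that behavior, assuming an aborted queue should accept and silently discard late data. `AbortableQueue` and its boolean return value are hypothetical; in the patch, add() returns a promise.

```javascript
// Hypothetical sketch: once aborted, add() silently ignores new data instead
// of throwing, so late callbacks need no special handling.
class AbortableQueue {
  constructor() {
    this._items = [];
    this._cancelled = false;
  }

  add(item) {
    if (this._cancelled) {
      return false; // ignored: the queue has been aborted
    }
    this._items.push(item);
    return true;
  }

  abort() {
    this._cancelled = true;
    // This simplified sketch has no separate _done() step, so it drops
    // the pending data here.
    this._items.length = 0;
  }

  get pending() {
    return this._items.length;
  }
}

const q = new AbortableQueue();
q.add("first line");
q.abort();
const accepted = q.add("late line"); // e.g. a socket callback firing after abort()
```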

> @@ +101,5 @@
> > +    }
> > +
> > +    if (!this.data) {
> > +      this.data = [];
> > +      this.promise = new Promise((resolve) => { this.resolve = resolve; });
> 
> What is "resolve" in this line? Is it a function? A boolean?

A function.
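For reference, the executor passed to `new Promise` runs synchronously and receives `resolve` as a function, which can be stored and called later. A minimal standalone sketch (`makeDeferred` is a hypothetical helper name):

```javascript
// The executor runs synchronously, so `resolve` (a function) can be
// captured and called later, e.g. when a queue runs out of data.
function makeDeferred() {
  let resolve;
  const promise = new Promise((res) => {
    resolve = res;
  });
  return { promise, resolve };
}

const deferred = makeDeferred();
console.log(typeof deferred.resolve); // prints "function"
deferred.promise.then((completed) => console.log("done:", completed));
deferred.resolve(true); // "done: true" is printed once the callback runs
```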

> > +  abort: function() {
> 
> Does this need to clear data out or anything of that sort? Cancel
> this.promise?

No, that's all done in _done().

> > +var queues = new Set();
> > +function startQueue(aQueue) {
> 
> Would this be simpler as a singleton object? (You could spawn the task in
> the constructor and then just have an "add" method that adds to the set.)

I don't understand how this would work - the Task generally isn't just spawned once.

> @@ +127,5 @@
> > +var queues = new Set();
> > +function startQueue(aQueue) {
> > +  if (!queues.size) {
> > +    Task.spawn(function*() {
> > +      yield Promise.resolve();
> 
> Can you add some comments about what these lines / sections are doing?
> Promises can be a bit confusing, so more comments are better. (I.e. this is
> returning immediately to allow the event loop to continue, right?)

Yes (more precisely, in this case it yields so that none of the code in the function* is run synchronously).
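A small illustration using an async function in place of `Task.spawn(function*() {...})`, assuming native promises: awaiting an already-resolved promise suspends the function, so its body runs only after the caller's synchronous code has finished. With native promises this only defers to the microtask queue, which does not by itself let pending UI events run; how Promise.jsm scheduled its callbacks at the time is not covered by this sketch.

```javascript
const log = [];

async function task() {
  // Awaiting an already-resolved promise suspends the function; nothing
  // after this line runs until the current synchronous code has finished.
  await Promise.resolve();
  log.push("task body");
}

const done = task();
log.push("after spawning task");
// At this point log is ["after spawning task"]; "task body" is appended
// once the microtask queue runs.
```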
 
> > +      while (queues.size) {
> > +        for (let queue of queues) {
> 
> I assume the double loop is here to actually allow multiple iterations of
> queues during a single call to startQueue? This needs a comment explaining
> it.

The loop is to ensure that if there are multiple queues, we take turns handling their data. There can be multiple calls to startQueue, but there is only ever one Task.
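The turn-taking can be illustrated synchronously with a hypothetical `roundRobin` generator over plain iterables. It also shows why items added as one array are not handled as a batch: they interleave with the other queue's items.

```javascript
// Hypothetical helper: take one item from each iterable in turn, the way a
// single driver takes turns between queues.
function* roundRobin(...iterables) {
  const iterators = iterables.map((it) => it[Symbol.iterator]());
  while (iterators.length) {
    for (let i = 0; i < iterators.length; ) {
      const { done, value } = iterators[i].next();
      if (done) {
        iterators.splice(i, 1); // finished: stop visiting this one
      } else {
        yield value;
        i++;
      }
    }
  }
}

const order = [...roundRobin(["socket1", "socket2", "socket3"], ["log1", "log2"])];
// order is ["socket1", "log1", "socket2", "log2", "socket3"]
```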

> > +          if (!queue.data.length || queue.cancelled) {
> > +            queue._done();
> > +            queues.delete(queue);
> > +            continue;
> 
> So when a queue is completed it gets deleted? You can't keep empty queues
> around?

Sure, you can keep empty Queues around, and then add more data later. But the Task doesn't need to process them any more, and we certainly don't want the module to retain references to inactive Queues.

> > +          let item = queue.data[0];
> 
> I understand what you're doing here, but I think it needs to be made clear that
> adding an iterable (e.g. an array) does not guarantee those items will be
> processed as a batch. (In fact, they likely won't be!)

In fact, it would be pretty bad if they were ;)

> @@ +170,5 @@
> > +          // Unblock every 15ms.
> > +          if (Date.now() > t + 14) {
> > +            yield Promise.resolve();
> > +            t = Date.now();
> > +            queues.forEach((q) => { if (q.updater) q.updater(); });
> 
> It looks like you could theoretically end up calling updater multiple times
> without actually processing any data. Is this OK?

I think it's OK, but it's a good point. It may be better to keep track of which queues have actually had data handled in that batch.
 
> ::: chat/modules/socket.jsm
> @@ +465,2 @@
> >      // The stop request will be handled when the queue is next empty.
> > +    this._queue.add([]).then(() => { this._handleStopRequest(aStatus); });
> 
> Wouldn't this._queue.promise.then be simpler? Or is that not identical?

It wouldn't work if the queue had never had any data added to it.
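If the queue's promise is only created by the first add(), `queue.promise` is undefined on a queue that never received data, and calling `.then` on it throws. A sketch of one workaround, assuming the queue exposes an already-resolved promise while idle, similar in spirit to the `promise: Promise.resolve(true)` prototype default in process.diff 3. `IdleAwareQueue` and `drainNow()` are hypothetical; `drainNow()` stands in for the real asynchronous processing loop.

```javascript
// Hypothetical minimal queue: `promise` is already resolved while the queue
// is idle, and replaced by a pending one while data is outstanding.
class IdleAwareQueue {
  constructor() {
    this._items = [];
    this._resolve = null;
    this.promise = Promise.resolve(true); // idle: already resolved
  }

  add(item) {
    if (!this._items.length) {
      // Becoming busy: hand out a fresh pending promise.
      this.promise = new Promise((resolve) => {
        this._resolve = resolve;
      });
    }
    this._items.push(item);
    return this.promise;
  }

  // Stand-in for the real asynchronous processing loop.
  drainNow(handler) {
    while (this._items.length) {
      handler(this._items.shift());
    }
    if (this._resolve) {
      this._resolve(true);
      this._resolve = null;
    }
  }
}

const idle = new IdleAwareQueue();
// Safe even though nothing was ever added:
const stopped = idle.promise.then(() => "stop request handled");
```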
Attached patch process.diff 2 (obsolete)
Changed the API slightly to avoid the need for the empty add() in this._queue.add([]).then. Also calls the update function only for those queues which have actually had data handled in the current batch.

I'd like to be certain about the form of the API before adding proper tests (I do have some in a WIP).
Attachment #8490318 - Attachment is obsolete: true
Attachment #8490318 - Flags: feedback?(florian)
Attachment #8498105 - Flags: feedback?(florian)
Attachment #8498105 - Flags: feedback?(clokep)
Attachment #8498105 - Flags: feedback?(dteller)
Attached patch process.diff 3
Attachment #8498105 - Attachment is obsolete: true
Attachment #8498105 - Flags: feedback?(florian)
Attachment #8498105 - Flags: feedback?(dteller)
Attachment #8498105 - Flags: feedback?(clokep)
Attachment #8498120 - Flags: feedback?(florian)
Attachment #8498120 - Flags: feedback?(dteller)
Attachment #8498120 - Flags: feedback?(clokep)
Comment on attachment 8498120 [details] [diff] [review]
process.diff 3

Review of attachment 8498120 [details] [diff] [review]:
-----------------------------------------------------------------

Queue.jsm looks useful – see below for detailed feedback.
I wonder if you have already discussed alternatives that may have simpler APIs.

For instance, I suspect that we could patch Task.jsm to support something along the lines of:
  yield Scheduler.schedule();
where `Scheduler.schedule()` checks how much time we have spent in the current tick and may decide to either return immediately or delay by ~15ms.

::: chat/modules/Queue.jsm
@@ +1,4 @@
> +/* This Source Code Form is subject to the terms of the Mozilla Public
> + * License, v. 2.0. If a copy of the MPL was not distributed with this
> + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
> +

Nit: "use strict";

@@ +4,5 @@
> +
> +const EXPORTED_SYMBOLS = ["Queue"];
> +
> +/*
> + * Asynchronically process a queue of sequential data.

What's a queue of _data_?

@@ +6,5 @@
> +
> +/*
> + * Asynchronically process a queue of sequential data.
> + * If there are multiple queues, they run alternately in rotation,
> + * and therefore without causing any jank.

Nit: Unless you're doing multi-threading, that is a promise you can't keep. You should probably reformulate this sentence.

@@ +9,5 @@
> + * If there are multiple queues, they run alternately in rotation,
> + * and therefore without causing any jank.
> + * It is possible to continually add data to an existing queue.
> + *
> + * Usage:

Matter of taste, but I personally prefer when the documentation is with the methods, fields and constructor, instead of gathered on top.

@@ +10,5 @@
> + * and therefore without causing any jank.
> + * It is possible to continually add data to an existing queue.
> + *
> + * Usage:
> + *   new Queue(aHandler, aUpdater, aFinally)

Rather than three arguments, I would suggest passing an object `{onHandled: ..., onBatchComplete: ..., onEmpty: ...}`, to avoid confusions.

@@ +14,5 @@
> + *   new Queue(aHandler, aUpdater, aFinally)
> + *     @aHandler: A function that handles a single item of data.
> + *                It should ideally take much less than 15ms to execute.
> + *                Any errors during execution are reported to the console
> + *                but not passed on.

Errors should probably be converted into promise rejections, to also cause test failures.
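A minimal sketch of that suggestion, assuming the remaining items should still be handled after a failure: every error is still reported, and the completion promise rejects with the first one. `processAll` is a hypothetical, synchronous stand-in for the queue's processing loop.

```javascript
// Hypothetical sketch: keep handling the remaining items after a handler
// error, but reject the completion promise with the first error instead of
// only logging it, so callers and tests notice the failure.
function processAll(items, handler) {
  return new Promise((resolve, reject) => {
    let failed = false;
    let firstError;
    for (const item of items) {
      try {
        handler(item);
      } catch (e) {
        console.error(e); // still report every error
        if (!failed) {
          failed = true;
          firstError = e;
        }
      }
    }
    if (failed) {
      reject(firstError);
    } else {
      resolve(true);
    }
  });
}
```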

@@ +17,5 @@
> + *                Any errors during execution are reported to the console
> + *                but not passed on.
> + *                If the handler returns true, the queue is aborted.
> + *     @aUpdater: An optional function that is called between each handled
> + *                batch of items (at the beginning of the next batch).

Nit: define "batch"

@@ +19,5 @@
> + *                If the handler returns true, the queue is aborted.
> + *     @aUpdater: An optional function that is called between each handled
> + *                batch of items (at the beginning of the next batch).
> + *     @aFinally: Optional function called whenever all available data has
> + *                been processed.

Nit: aOnEmpty, maybe?

@@ +21,5 @@
> + *                batch of items (at the beginning of the next batch).
> + *     @aFinally: Optional function called whenever all available data has
> + *                been processed.
> + *
> + *   Queue.isActive: Boolean, true if any data is currently being processed.

I read this as `true` if we are currently executing `aHandler`. I assume that's not what you mean.

@@ +54,5 @@
> +  catch(name) {
> +    return name;
> +  }
> +  throw new TypeError;
> +})();

So why do you need this, exactly?

@@ +56,5 @@
> +  }
> +  throw new TypeError;
> +})();
> +
> +function Queue(aHandler, aUpdater, aFinally) {

You could add default arguments for `aUpdater` and `aFinally`.

@@ +59,5 @@
> +
> +function Queue(aHandler, aUpdater, aFinally) {
> +  this.handler = aHandler;
> +  this.updater = aUpdater;
> +  this.finally = aFinally;

I suspect that you want to mark these as private (i.e. prefix with "_").
Also, you should define the fields that you are going to add later.
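A sketch combining these suggestions, assuming an ES class is acceptable: one options object using the `onHandled`/`onBatchComplete`/`onEmpty` names proposed above, no-op defaults for the optional callbacks, underscore-prefixed fields, and every field the queue will use declared in the constructor. The exact field list is an assumption.

```javascript
// Hypothetical constructor shape: one options object with no-op defaults,
// and every field declared up front so instances never change shape.
const noop = () => {};

class Queue {
  constructor({ onHandled, onBatchComplete = noop, onEmpty = noop } = {}) {
    if (typeof onHandled != "function") {
      throw new TypeError("onHandled must be a function");
    }
    this._onHandled = onHandled;
    this._onBatchComplete = onBatchComplete;
    this._onEmpty = onEmpty;
    this._data = []; // pending items
    this._cancelled = false;
    this._promise = Promise.resolve(true);
    this._resolve = null;
  }
}

const q = new Queue({ onHandled: (item) => console.log(item) });
```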

@@ +63,5 @@
> +  this.finally = aFinally;
> +}
> +Queue.prototype = {
> +  promise: Promise.resolve(true),
> +  add: function(aData) {

Do you really need to handle (and sniff) every single kind of iterable on Earth?
For a first version, I would suggest accepting only the bare minimum of types of arguments, to keep the code simple and open for evolution in possibly unexpected directions.

@@ +68,5 @@
> +    // Creates an object implementing the iterator protocol when appropriate.
> +    let item;
> +    if (Array.isArray(aData)) {
> +      // We use our own destructive iterator to release memory as early as
> +      // possible.

Nit: I'm not a big fan of modifying arguments. I would perform a shallow copy of the array and destruct only the shallow copy.
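A minimal sketch, assuming arrays are the only input that needs this treatment (`destructiveIterator` is a hypothetical name). One trade-off: while the caller still holds its own array, the items stay reachable through it, so early release only helps once the caller drops that reference.

```javascript
// Hypothetical helper: consume a shallow copy of `array`, dropping each
// element from the copy as soon as it has been handed out. The caller's
// array is never modified.
function* destructiveIterator(array) {
  const copy = array.slice(); // shallow copy
  copy.reverse(); // so that pop() yields elements in their original order
  while (copy.length) {
    yield copy.pop(); // pop() avoids the re-indexing that shift() may do
  }
}

const input = ["a", "b", "c"];
const seen = [...destructiveIterator(input)];
// seen is ["a", "b", "c"], and input is still ["a", "b", "c"]
```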

@@ +81,5 @@
> +    }
> +    else if (typeof aData == "function") {
> +      // Generator function.
> +      item = aData();
> +      if (item.toString() != "[object Generator]")

Nit: Our style guide requires curly braces around the statement.

@@ +103,5 @@
> +    }
> +
> +    if (!this.data) {
> +      this.data = [];
> +      this.promise = new Promise((resolve) => { this.resolve = resolve; });

Please don't add fields dynamically. Also, I suspect that this should be `this._resolve`.

@@ +110,5 @@
> +    this.data.push(item);
> +    return this.promise;
> +  },
> +  abort: function() {
> +    this.cancelled = true;

Please don't add fields dynamically.

@@ +114,5 @@
> +    this.cancelled = true;
> +    return this.promise;
> +  },
> +  get isActive() !!this.data,
> +  _done: function() {

Nit: Doc? Also, generally, methods should be verbs.

@@ +116,5 @@
> +  },
> +  get isActive() !!this.data,
> +  _done: function() {
> +    let completed = !this.data.length;
> +    if (this.finally)

{...}

@@ +121,5 @@
> +      this.finally(completed);
> +    this.resolve(completed);
> +    delete this.promise;
> +    delete this.resolve;
> +    delete this.data;

Please don't `delete` fields.

@@ +127,5 @@
> +};
> +
> +var queues = new Set();
> +function startQueue(aQueue) {
> +  if (!queues.size) {

If I read this correctly, this should rather be

queues.add(aQueue);
if (queues.size > 1) {
  // Already initialized
  return;
}
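One caveat worth checking: if `startQueue()` can ever be called with a queue that is already in the set, `queues.size > 1` stays false and a second driver would start. A sketch that keeps a separate flag instead; `driverRunning`, `runDriver` and the `driversStarted` counter are hypothetical, and the rotation loop itself is elided.

```javascript
// Hypothetical sketch: the set holds queues with pending data; a separate
// flag records whether the single driver is already running.
const queues = new Set();
let driverRunning = false;
let driversStarted = 0; // for illustration only

function startQueue(queue) {
  queues.add(queue);
  if (driverRunning) {
    return; // already initialized
  }
  driverRunning = true;
  driversStarted++;
  runDriver();
}

async function runDriver() {
  await Promise.resolve(); // never run queue work synchronously
  // ... process `queues` in rotation until it is empty (elided) ...
  queues.clear(); // stand-in for the elided loop
  driverRunning = false;
}

const a = {};
startQueue(a);
startQueue(a); // same queue again: `queues.size` is still 1 here
startQueue({});
```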

@@ +144,5 @@
> +          }
> +
> +          let item = queue.data[0];
> +          if (item.next) {
> +            // Iterator.

That would be `if ("next" in item)`, to keep strict mode happy (and possibly for the sake of speed).

@@ +150,5 @@
> +              item = item.next();
> +            }
> +            catch(e) {
> +              // Ignore broken iterators.
> +              Cu.reportError(e);

I would suggest `Promise.reject(e)`, which will provide better error reporting and fail tests.

@@ +157,5 @@
> +            if (item.done) {
> +              queue.data.shift();
> +              continue;
> +            }
> +            item = item.value;

All these different possible types for `item` make reading a bit difficult. I would suggest making an iterator for the contents of `data` and doing something along the lines of

let {done, value} = queue._data.next();
if (done) {
  continue;
}
// use `value`
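A sketch of such an iterator, assuming pending data is kept as an array of sources where each source is either a single item or an iterable (strings are treated as single items). `flattenSources` is a hypothetical name. Sources appended before the iterator reports `done` are picked up; after that, a fresh iterator is needed.

```javascript
// Hypothetical helper: expose a queue's pending sources as one iterator.
// `sources` is re-checked on every pass, so sources appended before the
// iterator finishes are picked up too.
function* flattenSources(sources) {
  while (sources.length) {
    const source = sources.shift();
    const iterable =
      source != null &&
      typeof source != "string" &&
      typeof source[Symbol.iterator] == "function";
    if (iterable) {
      yield* source; // arrays, generators, other iterables
    } else {
      yield source; // a single item
    }
  }
}

const data = [1, [2, 3], (function* () { yield 4; })()];
const iterator = flattenSources(data);
const out = [];
let { done, value } = iterator.next();
while (!done) {
  out.push(value);
  ({ done, value } = iterator.next());
}
// out is [1, 2, 3, 4]
```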

@@ +173,5 @@
> +            Cu.reportError(e);
> +          }
> +
> +          // Unblock every 15ms.
> +          if (Date.now() > t + 14) {

Please make this 14 a named constant. Also, given that this is the key part of this entire module, this should be emphasized somehow.

@@ +176,5 @@
> +          // Unblock every 15ms.
> +          if (Date.now() > t + 14) {
> +            yield Promise.resolve();
> +            t = Date.now();
> +            batch.forEach((q) => { if (q.updater) q.updater(); });

Wait, what? Since when does `WeakSet` have a `forEach` method? Also, that doesn't seem to match the documentation of `updater`.
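`WeakSet` indeed has no `forEach` and is not iterable, since its members can be collected at any time, so a batch that must be walked afterwards needs a regular `Set` that is cleared after each batch. A minimal sketch of calling the updater only for queues that handled data in the current batch; `handleOne` and `endBatch` are hypothetical names.

```javascript
// WeakSet only has add(), has() and delete(); it is not iterable.
console.log(typeof new WeakSet().forEach); // prints "undefined"

// Hypothetical sketch: queues that handled at least one item in the
// current batch. A plain Set is used so it can be walked at batch end.
const batch = new Set();

function handleOne(queue, item) {
  queue.handler(item);
  batch.add(queue); // adding the same queue twice keeps one entry
}

function endBatch() {
  for (const queue of batch) {
    if (queue.updater) {
      queue.updater();
    }
  }
  batch.clear(); // do not keep finished queues alive between batches
}
```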
Attachment #8498120 - Flags: feedback?(dteller) → feedback+
Attachment #8498120 - Flags: feedback?(florian)
Attachment #8498120 - Flags: feedback?(clokep)
See my questions at the start of comment 5.
Flags: needinfo?(aleth)
(In reply to David Rajchenbach-Teller [:Yoric] (use "needinfo") from comment #5)
> Queue.jsm looks useful – see below for detailed feedback.
> I wonder if you have already discussed alternatives that may have simpler
> APIs.
> 
> For instance, I suspect that we could patch Task.jsm to support something
> along the lines of:
>   yield Scheduler.schedule();
> where `Scheduler.schedule()` checks how much time we have spent in the
> current tick and may decide to either return immediately or delay by ~15ms.

It would be easy to add a method that allows a Task to ensure it doesn't run for more than 15ms before yielding. The problem is that when we yield from a Task, we don't know what will happen next. We may handle UI events, but we may also end up in the next Task. So if each queue is implemented as its own Task, I don't see a way to avoid multiple queues piling up, potentially causing jank. That's why the WIP only had a single Task that internally rotated between queues. Of course there can then still be jank from collisions with other stuff going on on the main thread, but it does limit the problem.

But maybe I misunderstood something?

> Do you really need to handle (and sniff) every single kind of iterable on
> Earth?

Sorry about that - I did a tour of iterables to learn how many types there actually are and how they differ (it's unfortunate they don't all support the newer JS iterable API when accessed from JS). It's certainly not needed to keep them all in a final patch.

> @@ +121,5 @@
> > +      this.finally(completed);
> > +    this.resolve(completed);
> > +    delete this.promise;
> > +    delete this.resolve;
> > +    delete this.data;
> 
> Please don't `delete` fields.

Does this only apply to fields that are not present in the prototype (generally not a good idea anyway) or is there a performance/style aspect to this I am missing?
Flags: needinfo?(aleth)
(In reply to aleth [:aleth] from comment #7)
> > For instance, I suspect that we could patch Task.jsm to support something
> > along the lines of:
> >   yield Scheduler.schedule();
> > where `Scheduler.schedule()` checks how much time we have spent in the
> > current tick and may decide to either return immediately or delay by ~15ms.
> 
> It would be easy to add a method that allows a Task to ensure it doesn't run
> for more than 15ms before yielding. The problem is that when we yield from a
> Task, we don't know what will happen next. We may handle UI events, but we
> may also end up in the next Task. So if each queue is implemented as its own
> Task, I don't see a way to avoid multiple queues piling up, potentially
> causing jank. That's why the WIP only had a single Task that internally
> rotated between queues. Of course there can then still be jank from
> collisions with other stuff going on on the main thread, but it does limit
> the problem.
> 
> But maybe I misunderstood something?

My idea was something along the lines of:

Scheduler.schedule = function() {
  if (nsIScheduler.getTimeSinceWeEnteredTheEventLoop() < 15 /*ms*/) {// FIXME: Implement nsIScheduler
    return Task.DO_NOT_DEFER; // FIXME: Implement Task.DO_NOT_DEFER
  }
  return Promise.resolve(); // This causes execution to be deferred to the next tick.
}

Now, I'm not sure how we can implement nsIScheduler, but this doesn't sound too hard.
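A rough approximation of that idea in modern JavaScript, assuming "time since we entered the event loop" can be approximated by the time since the last yield, which only counts code that calls `schedule()`, not other main-thread work. Returning `null` plays the role of the proposed `Task.DO_NOT_DEFER`: the caller skips `await` entirely, because even `await null` would suspend until the next microtask. `BUDGET_MS` and `processAll` are hypothetical.

```javascript
// Hypothetical sketch: yield to the event loop only once a time budget has
// been used up since the last yield.
const BUDGET_MS = 15;
let sliceStart = Date.now();

function schedule() {
  if (Date.now() - sliceStart < BUDGET_MS) {
    return null; // within budget: the caller should not await at all
  }
  return new Promise((resolve) => {
    // setTimeout queues a macrotask, so other events can run first.
    setTimeout(() => {
      sliceStart = Date.now();
      resolve();
    }, 0);
  });
}

async function processAll(items, handler) {
  for (const item of items) {
    handler(item);
    const pause = schedule();
    if (pause) {
      await pause;
    }
  }
}
```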

> > Please don't `delete` fields.
> 
> Does this only apply to fields that are not present in the prototype
> (generally not a good idea anyway) or is there a performance/style aspect to
> this I am missing?

Adding/removing fields in an object changes its Shape and Type, both of which hurt the JIT compiler. Each Shape and Type that `foo` may take costs a bit of memory and adds, respectively, an `if` case or an indirection to the generated code whenever the JIT compiler compiles an access like `foo.bar`.
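A minimal sketch of the pattern this implies, with hypothetical names: create every field in the constructor and reset it on completion instead of using `delete`. Shapes and Types are engine internals that script cannot observe, so the sketch can only show that the set of own properties stays fixed.

```javascript
// Hypothetical sketch: keep the object's set of properties fixed for its
// whole lifetime by resetting fields instead of deleting them.
class StableQueue {
  constructor() {
    this._data = [];
    this._promise = Promise.resolve(true);
    this._resolve = null;
  }

  _finish(completed) {
    if (this._resolve) {
      this._resolve(completed);
    }
    // Reset rather than `delete`, so the property layout never changes.
    this._data = [];
    this._promise = Promise.resolve(completed);
    this._resolve = null;
  }
}

const sq = new StableQueue();
const keysBefore = Object.keys(sq).join();
sq._finish(true);
const keysAfter = Object.keys(sq).join();
// Both are "_data,_promise,_resolve"
```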
(In reply to David Rajchenbach-Teller [:Yoric] (use "needinfo") from comment #8)
> My idea was something along the lines of:
> 
> Scheduler.schedule = function() {
>   if (nsIScheduler.getTimeSinceWeEnteredTheEventLoop() < 15 /*ms*/) {// FIXME: Implement nsIScheduler
>     return Task.DO_NOT_DEFER; // FIXME: Implement Task.DO_NOT_DEFER
>   }
>   return Promise.resolve(); // This causes execution to be deferred to the next tick.
> }
> 
> Now, I'm not sure how we can implement nsIScheduler, but this doesn't sound
> too hard.

Ah OK, you're proposing we fix bug 930517 and bug 930520 that we discussed at last year's summit first, before tackling what's left of this bug ;) I had assumed that (like the WIP here itself) your suggestion was intended to work around their absence...

> Adding/removing fields in an object changes its Shape and Type, both of
> which hurt the JIT compiler. Each Shape and Type that `foo` may take takes a
> bit of memory and adds respectively an `if` case or an indirection in the
> generated code whenever the JIT compiler compiles a call `foo.bar`.

Thanks, that's good to know.
As far as we know, Aleth is no longer working on this. Clokep, Florian, can anybody take over?
Assignee: aleth → nobody
Status: ASSIGNED → NEW
Severity: normal → S3