Closed Bug 1074747 Opened 10 years ago Closed 8 years ago

Async Generators

Categories: Toolkit :: Async Tooling, defect
Severity: normal
Status: RESOLVED WONTFIX
People: (Reporter: Yoric, Unassigned)
Attachments: (1 file, 2 obsolete files)

Promise (and Task.jsm) work nicely to handle the case of a function that returns a single result asynchronously, but they are not very useful for functions that produce many results asynchronously, such as database requests.

It would be useful if we had conventions and support in Task.jsm to handle such cases.
Blocks: 856878
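To make the starting point concrete, here is a minimal sketch of the callback style this bug wants to replace: each result is delivered through a callback, and only overall completion is observable through the returned promise. (`queryRows` and its rows are illustrative, not a real API.)

```javascript
// Hypothetical callback-based API: rows arrive one at a time via a
// callback; the returned promise only signals completion.
function queryRows(onRow) {
  const rows = [{ id: 1 }, { id: 2 }, { id: 3 }];
  // Deliver each row asynchronously, in order.
  return rows.reduce(
    (p, row) => p.then(() => onRow(row)),
    Promise.resolve()
  );
}

// Usage: results can only be observed as side effects of the callback,
// which is exactly what makes composing such APIs awkward.
const ids = [];
queryRows(row => ids.push(row.id)).then(() => {
  console.log("done", ids);
});
```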
Attached patch Async Streams (obsolete) — Splinter Review
Assignee: nobody → dteller
Attachment #8497449 - Flags: feedback?(ttaubert)
Attachment #8497449 - Flags: feedback?(paolo.mozmail)
Comment on attachment 8497449 [details] [diff] [review]
Async Streams

Review of attachment 8497449 [details] [diff] [review]:
-----------------------------------------------------------------

(Are the PromiseWorker changes related?)

I wonder whether we actually gain much by shipping a custom stream API; it would be hard to use with existing filter and map functions. For future compatibility I think it would be better to stick with the WhatWG Streams proposal and write a utility like:

Task.consume = Task.async(function* (stream, cb) {
  function* readChunk() {
    let value;
    while ((value = stream.read()) !== null) {
      yield value;
    }
  }

  while (isStreamAlive(stream)) {
    yield stream.wait();
    yield* cb(readChunk());
  }
});

And then you could do something like:

Task.consume(db.query(), function* (rows) {
  for (let row of rows) {
    console.log("step", row);
  }

  // We could block here if we want to wait with processing
  // the next chunk until after something is done.
  yield Promise.resolve();
}).then(() => {
  console.log("done");
}, err => {
  console.log("error", err);
});

::: toolkit/components/promiseworker/PromiseWorker.jsm
@@ +262,5 @@
>        if (args) {
>          args = yield Promise.resolve(Promise.all(args));
>        }
> +      if (transfers) {
> +        transfers = yield Promise.resolve(Promise.all(transfers));

The extra Promise.resolve() call seems unnecessary. What exactly are the changes here for? Are they related to the async streams?

::: toolkit/modules/asyncstream.jsm
@@ +9,5 @@
> + * Use as follows:
> + *
> + * for (let stream = yield someRequest();
> + *      stream != null;
> + *      stream = yield stream.next()) {

I'd rather have someRequest() return the stream immediately and start the loop with |let chunk = yield stream.next()|.
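To make that suggestion concrete, here is a minimal sketch of the stateful shape being proposed, using async/await as a stand-in for Task.jsm's generator-based `yield`. (`someRequest` and its chunks are illustrative, not from the patch.)

```javascript
// someRequest() returns the stream object synchronously; each call to
// next() returns a promise for the next chunk, or null when exhausted.
function someRequest() {
  const chunks = ["a", "b", "c"];
  return {
    next() {
      return Promise.resolve(chunks.length ? chunks.shift() : null);
    },
  };
}

// Consumption starts the loop with the first next() call, as suggested.
async function consume() {
  const stream = someRequest();
  const out = [];
  for (let chunk = await stream.next();
       chunk !== null;
       chunk = await stream.next()) {
    out.push(chunk);
  }
  return out;
}
```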

@@ +16,5 @@
> + *        // Do something
> + * }
> + *
> + * @param (any) value
> + *              The value at the current position in the stream.

This seems like the stream would only ever hold a single value. For the example of a db query returning rows this seems insufficient, as multiple rows would represent the progress of the async operation.
Attachment #8497449 - Flags: feedback?(ttaubert)
Comment on attachment 8497449 [details] [diff] [review]
Async Streams

(In reply to Tim Taubert [:ttaubert] from comment #2)
> Task.consume(db.query(), function* (rows) {
>   for (let row of rows) {
>     console.log("step", row);
>   }
> 
>   // We could block here if we want to wait with processing
>   // the next chunk until after something is done.
>   yield Promise.resolve();
> })

I think we're confusing "implement streams" with the original goal of this bug, which I'd restate as "use an iteration statement rather than a callback to both request and consume data asynchronously".

In other words, no callbacks or nested tasks. This way, you can use "return" to exit from the outer task. When this or an exception happens, the iterator is automatically closed, stopping the data flow and freeing resources.

Closing the iterator should interrupt the data flow as well. You can't wait for this cleanup to occur asynchronously, however, which can be a problem in some cases. A workaround could be to wait on a special Promise if this is needed.

<start operation>
try {
  for (... of <async generator>) {
    <wait and then process asynchronously>;
    if (condition) {
      return;
    }
  }
} finally {
  yield <completion promise>;
}

If you don't need to wait, a simpler syntax should be used instead:

for (... of <start operation and get async generator>) {
  <wait and then process asynchronously>;
  if (condition) {
    return;
  }
}
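A concrete sketch of the pattern described above, using an async generator and for-await-of (which eventually standardized this iteration style) in place of the Task.jsm generators of the era. `rowSource`, `process`, and the limit are illustrative names, not from the patch.

```javascript
// Stand-in async data source producing rows one at a time.
async function* rowSource() {
  for (const row of [10, 20, 30, 40]) {
    yield row;
  }
}

// Iterate, process, and exit early with "return"; returning (or
// throwing) from inside for-await-of closes the iterator, which stops
// the data flow and frees its resources.
async function process(limit) {
  const seen = [];
  try {
    for await (const row of rowSource()) {
      seen.push(row);
      if (seen.length >= limit) {
        return seen; // early exit closes the iterator
      }
    }
  } finally {
    // Cleanup runs whether we returned early or drained the source.
  }
  return seen;
}
```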
Attachment #8497449 - Flags: feedback?(paolo.mozmail)
Attached patch Async Streams — Splinter Review
Sorry, I got rid of the qref'ed code by accident.
Attachment #8497449 - Attachment is obsolete: true
(In reply to Tim Taubert [:ttaubert] from comment #2)
> Comment on attachment 8497449 [details] [diff] [review]
> Async Streams
> 
> Review of attachment 8497449 [details] [diff] [review]:
> -----------------------------------------------------------------
> 
> (Are the PromiseWorker changes related?)

No, my bad.

> I wonder whether we actually gain a lot by shipping a custom stream API,
> that would make it hard to use it with existing filter and map functions.
> For future compatibility I think it would be better to stick with the WhatWG
> proposal and write a utility like:

[...]

I have the impression that what you are suggesting is actually orthogonal.
Also, I don't see how that interacts with `filter` and `map`.

> ::: toolkit/modules/asyncstream.jsm
> @@ +9,5 @@
> > + * Use as follows:
> > + *
> > + * for (let stream = yield someRequest();
> > + *      stream != null;
> > + *      stream = yield stream.next()) {
> 
> I'd rather have someRequest() return the stream immediately and start the
> loop with |let chunk = yield stream.next()|.

So you would prefer a stateful stream (each call to `next` consumes the current state and moves to the next one) to a stateless one (calls to `next` have no side-effect)?

This would certainly work, but according to my quick experiments, the resulting code seems slightly more complicated. What would we gain?
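For contrast, a sketch of the stateless shape described here: each stream node is an immutable (value, next) pair, and calling `next()` has no side effects, so a node can be re-read freely. (`makeStream` and `collect` are illustrative names, not the patch's API.)

```javascript
// Each node holds the value at its position plus a side-effect-free
// next() that resolves to the following node, or null at the end.
function makeStream(values, i = 0) {
  if (i >= values.length) {
    return Promise.resolve(null);
  }
  return Promise.resolve({
    value: values[i],
    next: () => makeStream(values, i + 1),
  });
}

// Consumption matches the loop shape from the patch's doc comment.
async function collect(values) {
  const out = [];
  for (let node = await makeStream(values);
       node !== null;
       node = await node.next()) {
    out.push(node.value);
  }
  return out;
}
```

Because `next()` is pure, calling it twice on the same node yields the same value, which is the property a stateful `read()`-style stream gives up.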

> @@ +16,5 @@
> > + *        // Do something
> > + * }
> > + *
> > + * @param (any) value
> > + *              The value at the current position in the stream.
> 
> This seems like the stream would only ever hold a single value. For the
> example of a db query returning rows this seems insufficient as multiple
> rows would represent the progress of the async operation.

Right now, I'm trying to replace callbacks that are invoked with one value at a time. Now, if specific producers of Stream want `value` to be an array of all results accumulated to this point, that works, too.

(In reply to :Paolo Amadini from comment #3)
> In other words, no callbacks or nested tasks. This way, you can use "return"
> to exit from the outer task. When this or an exception happens, the iterator
> is automatically closed, stopping the data flow and freeing resources.

Actually, this goes beyond my current use case, so I wasn't planning to work on that in this bug. That said, we can easily add a `close` operation (passed as an argument to the `Stream` constructor) and build some utilities on top of it. But in general, auto-closing things at predictable points is not something JS does reliably, so I believe that particular issue is beyond the scope of this bug.
Not actively working on this.
Assignee: dteller → nobody
There is an async iteration proposal for JavaScript that, in conjunction with async/await, should cover this, I think: https://github.com/tc39/proposal-async-iteration

Please reopen if you disagree.
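For reference, that proposal later shipped as async generators and for-await-of (ES2018), which cover this bug's original db-rows scenario directly. A minimal sketch (`queryRows` is illustrative):

```javascript
// An async generator stands in for a query producing rows over time.
async function* queryRows() {
  for (const row of [{ id: 1 }, { id: 2 }]) {
    yield row;
  }
}

// for-await-of requests and consumes results with an iteration
// statement rather than a callback, as this bug originally asked.
async function main() {
  const ids = [];
  for await (const row of queryRows()) {
    ids.push(row.id);
  }
  return ids;
}
```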
Status: NEW → RESOLVED
Closed: 8 years ago
Resolution: --- → WONTFIX