Bug 1128959 (streams)

Implement the WHATWG Streams spec

Status: NEW
Product: Core
Component: DOM
Reported: 3 years ago
Last modified: 2 days ago

People

(Reporter: Michael[tm] Smith, Assigned: baku)

Tracking

(Depends on: 3 bugs, Blocks: 4 bugs, {dev-doc-needed})

Firefox Tracking Flags

(Not tracked)

Attachments

(14 attachments, 32 obsolete attachments)

3.40 KB, patch, bkelly: review+
37.66 KB, patch
30.62 KB, patch
5.11 KB, patch, bkelly: review+
8.33 KB, patch, bkelly: review+
25.92 KB, patch, bkelly: review+
2.79 KB, patch, bkelly: review+
37.36 KB, patch, bkelly: review+
33.22 KB, patch, bkelly: review+
15.59 KB, patch, annevk: review+
5.72 KB, patch, bkelly: review+
3.82 KB, patch, bkelly: review+
37.62 KB, patch
14.25 KB, patch
(Reporter)

Description

3 years ago
https://streams.spec.whatwg.org/
Status: UNCONFIRMED → NEW
Ever confirmed: true

Comment 1

3 years ago
Please also study this while looking at this specification:

  https://github.com/yutakahirano/fetch-with-streams/

Google is pushing this, so if we want something fundamentally different, now would be the time to say something.
Blocks: 813450
Status: NEW → UNCONFIRMED
Ever confirmed: false

Updated

3 years ago
Status: UNCONFIRMED → NEW
Ever confirmed: true
(Reporter)

Updated

3 years ago
Component: DOM: Core & HTML → DOM
Does this allow receiving a ReadableStream from, for example, a filesystem API or a network API, and then transferring that ReadableStream to a Worker thread using postMessage({ readThis: myStream }, [myStream])?

Likewise, can a page instantiate a ReadableStream (through a ReadableStream constructor or through some other means) that can then be transferred to a Worker thread?

This doesn't mean that all ReadableStreams have to be transferable. For example, a stream of plain JS objects can obviously not be transferred. So maybe we need a notion of transferable and non-transferable streams.


I also don't understand the purpose of having the "start"/"pull"/"cancel" interface which is used for the ReadableStream constructor. It seems like we're ending up with two writing interfaces, "start"/"pull"/"cancel" and WritableStream. Why do we need two? It seems silly if the only use-case is to enable having a standalone ReadableStream constructor. The pattern that's used for MessageChannel seems much better.


These are not new questions, I've asked these questions before so I assume that there are thought out answers.


I'd also like to understand whether this API is intended to be a binary stream or a generic object stream. I.e., is it appropriate to use for something like filesystem directory enumeration?

Finally, I'd really like to get some sort of feedback from TC39 on this. Especially if this API is intended to be used for things like directory enumeration since that likely means that we will eventually want ES syntax which uses this (like how "async function" will use Promise). Is TC39 prepared to build that on top of this Stream proposal, or are we going to end up with two incompatible solutions?
I'm sure Domenic would know best, but here's my read on this particular question.

(In reply to Jonas Sicking (:sicking) from comment #2)
> I also don't understand the purpose of having the "start"/"pull"/"cancel"
> interface which is used for the ReadableStream constructor. It seems like
> we're ending up with two writing interfaces, "start"/"pull"/"cancel" and
> WritableStream. Why do we need two? It seems silly if the only use-case is
> to enable having a standalone ReadableStream constructor. The pattern that's
> used for MessageChannel seems much better.

How would you implement a ReadableStream with a pull-oriented source using WritableStream?  The "pull"/"cancel" feature seems reasonable and unique for that purpose.

The "start" feature is a bit redundant since the spec could also provide some kind of pipe class that provides both readable and writable.  Then a push source would just call write() on that class.  This is a bit clumsy for extension, though, because now you are forcing objects to provide a WritableStream interface just to take data from a push source.  This means you can't easily build a duplex object where writing sends data out and reading pulls data in.

One could also envisage a hybrid source that pushes a certain amount of pre-buffered data at "start", and then waits for "pull" to retrieve the rest.  This would also not be easily achievable using solely WritableStream.

Overall I like the ReadableStream's constructor "start".
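The pull-oriented case discussed above can be sketched with the spec's ReadableStream constructor. This is a minimal illustration, not code from any attached patch; `fakeSource` and its `read`/`close` methods are made-up stand-ins for a real pull source such as a file handle (Node 18+ and modern browsers expose `ReadableStream` globally):

```javascript
// Hypothetical pull source: each read() pretends to be an async OS-level
// operation that eventually runs dry and returns null.
const fakeSource = {
  data: ["chunk1", "chunk2", "chunk3"],
  read() {
    return Promise.resolve(this.data.shift() ?? null);
  },
  close() { /* release underlying resources here */ },
};

const stream = new ReadableStream({
  // start: one-time setup; a hybrid source could pre-buffer data here
  // via controller.enqueue() before any pull happens.
  start(controller) {},
  // pull: invoked whenever the internal queue wants more data, which is
  // how backpressure reaches the underlying source.
  async pull(controller) {
    const chunk = await fakeSource.read();
    if (chunk === null) {
      controller.close();
    } else {
      controller.enqueue(chunk);
    }
  },
  // cancel: the consumer lost interest; tear down the source.
  cancel(reason) {
    fakeSource.close();
  },
});
```

Reading via `stream.getReader()` then drives `pull` on demand, which is exactly the behavior that a WritableStream-only design would struggle to express.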
Transferring does not seem addressed in the spec.  I opened an issue to ask the question:

  https://github.com/whatwg/streams/issues/276

Comment 5

3 years ago
> Transferring does not seem addressed in the spec.  I opened an issue to ask the question:

I'll respond to this over there.

> I also don't understand the purpose of having the "start"/"pull"/"cancel" interface which is used for the ReadableStream constructor.

I did some analysis on that a while back (along with some other issues): https://gist.github.com/domenic/197ae6fab8adc6890c7a#part-3-streams-as-two-sided-pipes

Basically, trying to move to this model makes it extremely awkward to author streams, as Ben hinted at. Please take that document with a grain of salt as it is old (so a few signatures are outdated) and cumulative (so section 3, which I linked to, assumes some of the stuff from sections 1 and 2, which also did not make it into the spec). But hopefully it explains why this path did not bear fruit. I can write out full examples of the awkwardness if you'd like, perhaps adapting all of these https://streams.spec.whatwg.org/#creating-examples into the shared-queue style.


> I'd also like to understand whether this API is intended to be a binary stream or a generic object stream. I.e., is it appropriate to use for something like filesystem directory enumeration?

It is intended to be an I/O stream, but in a broad sense. I/O often starts and ends with bytes, but in between you often have strings, JSON, video frames, protobufs, and other data structures. The interface is focused around mapping efficiently to low-level I/O primitives, but the chunks that pass through streams can be of any type.
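To make the "chunks can be of any type" point concrete, here is a small illustrative sketch (not from the spec or any patch) where the chunks are plain objects, say parsed JSON records, rather than bytes:

```javascript
// A push-style source that enqueues structured chunks up front and closes.
// The stream machinery is byte-agnostic: chunks are arbitrary JS values.
const recordStream = new ReadableStream({
  start(controller) {
    controller.enqueue({ id: 1, name: "alpha" });
    controller.enqueue({ id: 2, name: "beta" });
    controller.close();
  },
});
```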

I do not think this is the best primitive for filesystem directory enumeration, although it could do in a pinch. Let me explain. In general, given that we have values for sync/singular, iterators for sync/plural, and promises for async/singular, we'd like some shared abstraction for async/plural. Right now async iterator [1] or observable [2] are the leading candidates in TC39. A (readable) stream would be a specific subtype of the more general type, optimized for I/O. This is similar to how an array is a specific subtype of the more general iterable type, optimized for O(1) indexed access and memory locality. The parallels even extend to the development timeline, with array/stream being fleshed out first and then later retrofitted to be a subtype of iterable/async iterable once those were/are developed.

I think filesystem directory enumeration could use a readable stream, but I'm not sure the extra abilities it comes with (e.g. backpressure control, exclusive reader, piping through transform streams and to writable streams) would make very much sense for a directory listing. Maybe they would, especially backpressure---I'd have to learn more about the underlying OS APIs to understand that. But it does feel a bit like killing a fly with a bazooka.

[1]: https://github.com/zenparsing/async-iteration/
[2]: https://github.com/jhusain/asyncgenerator#introducing-observable

> Finally, I'd really like to get some sort of feedback from TC39 on this. Especially if this API is intended to be used for things like directory enumeration since that likely means that we will eventually want ES syntax which uses this (like how "async function" will use Promise). Is TC39 prepared to build that on top of this Stream proposal, or are we going to end up with two incompatible solutions?

This was brought up at TC39 and there was general agreement both on the fact that it should be a subtype of a more abstract solution (as above).
(In reply to Domenic Denicola from comment #5)
> > Transferring does not seem addressed in the spec.  I opened an issue to ask the question:
> 
> I'll respond to this over there.

FWIW, given that this targets IO streams, this seems like an extra important issue to solve since many times it's beneficial to do IO and parsing on background threads.

> > I also don't understand the purpose of having the "start"/"pull"/"cancel" interface which is used for the ReadableStream constructor.
> 
> I did some analysis on that a while back (along with some other issues):
> https://gist.github.com/domenic/197ae6fab8adc6890c7a#part-3-streams-as-two-
> sided-pipes

This argument seems to boil down to "JS doesn't have any syntax for instantiating two objects which have a co-dependency". I don't feel competent enough in JS to have an opinion about this. I'd love to hear Dave Herman's or Yehuda's opinion.


The way stuff like this is handled in C++ is through interfaces rather than classes. I.e. ReadableStream and WritableStream would be interfaces that don't have direct constructors, but instead just act as contracts. So no one (not even some ominous platform backend) would be able to instantiate ReadableStream/WritableStream; instead you'd instantiate other objects which happen to expose ReadableStream/WritableStream interfaces.

But JS doesn't really have interfaces. The equivalent is duck typing. So there are no "thenable" or "iterable" interfaces; instead, anything with a .then() function is a thenable, and anything with a @@iterator() function is an iterable.

By that (quite possibly very flawed) logic, ReadableStream and WritableStream could fairly easily be created by having a Pipe constructor like:

class Pipe {
  constructor()

  get readable()
  get writable()
}

where the objects returned by .readable and .writable contain functions which all close over the same shared state and thus can share access to underlying buffers. So there would be no ReadableStream class and thus no need for a ReadableStream constructor, and hence no cyclic dependency between the ReadableStream constructor and the WritableStream constructor.

So this is definitely doable without needing to rely on magic platform objects. The whole setup can be explained and implemented in JS.

But like I said, I defer to people who know JS better than me (such as yourself) as to whether this has other problems beyond "not impossible" :)
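The Pipe idea sketched above can indeed be written in plain JS. The following is a hedged illustration of that comment's proposal only; the names (`createPipe`, `read`, `write`) are invented here and are not part of the Streams spec. Both ends are duck-typed objects closing over one shared queue:

```javascript
// Sketch of a closure-based pipe: no ReadableStream/WritableStream classes,
// just two objects sharing a buffer and a list of pending readers.
function createPipe() {
  const queue = [];   // chunks written but not yet read
  const waiters = []; // resolvers for read() calls that arrived first
  let closed = false;

  const writable = {
    write(chunk) {
      if (closed) throw new Error("pipe closed");
      const waiter = waiters.shift();
      if (waiter) waiter({ value: chunk, done: false });
      else queue.push(chunk);
    },
    close() {
      closed = true;
      for (const w of waiters.splice(0)) w({ value: undefined, done: true });
    },
  };

  const readable = {
    read() {
      if (queue.length) return Promise.resolve({ value: queue.shift(), done: false });
      if (closed) return Promise.resolve({ value: undefined, done: true });
      return new Promise((resolve) => waiters.push(resolve));
    },
  };

  return { readable, writable };
}
```

This shows the "no cyclic constructor dependency" property: neither end has its own constructor, and the shared state lives entirely in the closure.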

> > I'd also like to understand whether this API is intended to be a binary stream or a generic object stream. I.e., is it appropriate to use for something like filesystem directory enumeration?
> 
> It is intended to be an I/O stream, but in a broad sense. I/O often starts
> and ends with bytes, but in between you often have strings, JSON, video
> frames, protobufs, and other data structures. The interface is focused
> around mapping efficiently to low-level I/O primitives, but the chunks that
> pass through streams can be of any type.

Ok, that makes sense.
Jonas, how precisely spec'd out do you think the transferability needs to be before we proceed?  I think we have a rough idea of what to do. [1]  However, it sounds like Domenic would like to wait until it's spec'd for promises first, unless we really feel this is necessary for streams immediately.

[1]: https://github.com/whatwg/streams/issues/276
Flags: needinfo?(jonas)

Comment 8

3 years ago
To be clear, I don't really insist on any particular ordering; I just thought it'd be better to tackle the simple case (promises) first before trying streams. But we didn't get very far with that plan, so if there's motivation to "skip ahead" to streams then that seems fine.
I thought that some of the most important use cases we had for Streams were around inter-thread communication. So it seems important that that can be done without making too many sacrifices.
Flags: needinfo?(jonas)
(Assignee)

Updated

3 years ago
OS: Mac OS X → All
Hardware: x86 → All
I believe we have consensus on moving forward.  I've sent an intent to implement to dev-platform and blogged a bit about it here:

  https://blog.wanderview.com/blog/2015/06/19/intent-to-implement-streams-in-firefox/
Assignee: nobody → bkelly
Status: NEW → ASSIGNED
Keywords: dev-doc-needed
Duplicate of this bug: 891286
Alias: streams
Depends on: 1230148
We will need Promises implemented in JavaScript engine directly -- see bug 911216.
Depends on: 911216
(In reply to Yury Delendik (:yury) from comment #12)
> We will need Promises implemented in JavaScript engine directly -- see bug
> 911216.

I spoke with Boris about this previously and it's more of a nice-to-have.  If JS promises are not done, we can have an intrinsic that creates one of our DOM promises.
Created attachment 8706963 [details] [diff] [review]
[WIP] ReadableStream self-hosted implementation

WIP for some of ReadableStream, ReadableStreamReader, ReadableStreamController
TODOs: use JS Promises instead of DOM/Workers hacks; tee and pipe methods
I have the WIP from comment 14 just about rebased.  Just trying to figure out one last issue with integrating with the SPIDERMONKEY_PROMISE bits.
Created attachment 8749929 [details] [diff] [review]
Rebased wip ReadableStream patch

This rebases the previous patch onto current mozilla-central with --enable-sm-promise switched on.

There is currently a problem with the spidermonkey promises, though.  It seems if self hosted code does this:

  _GetOriginalPromiseConstructor().resolve()

Then we end up hitting this assertion in Debug_CheckSelfHosted():

  https://dxr.mozilla.org/mozilla-central/source/js/src/vm/Interpreter.cpp#186

The Promise code does some complicated things to support chrome wrappers.  It seems like the Promise_static_resolve() or the promiseCapability.resolve() functions are not properly marked as self hosted.

Till, do you have any ideas here?  For now I have commented out the assertions, but I'd like to make the spidermonkey promises usable from self hosted code.
Attachment #8706963 - Attachment is obsolete: true
Flags: needinfo?(till)
Note, if you want to see this yourself you can:

0) fix a couple of bit-rot issues with the SPIDERMONKEY_PROMISES build
1) apply this patch
2) uncomment the assertions in Interpreter.cpp and GlobalObject.h
3) run ./mach web-platform-tests streams/readable-streams/general.https.html
I think Boris solved this for me.  I didn't know that I had to use callFunction() in self hosted code.  Sorry for the noise.
Flags: needinfo?(till)
Duplicate of this bug: 1271950
Created attachment 8751999 [details] [diff] [review]
Rebased wip ReadableStream.patch

Updated patch that uses callFunction().

I still have an issue with the bad-underlying-source tests hitting this assert, though:

https://dxr.mozilla.org/mozilla-central/source/js/src/vm/GlobalObject.h#678
Attachment #8749929 - Attachment is obsolete: true
Created attachment 8752001 [details] [diff] [review]
Rebased wip ReadableStream.patch

A little more cleanup that I missed in the last patch.
Attachment #8751999 - Attachment is obsolete: true
Created attachment 8752223 [details] [diff] [review]
Rebased wip ReadableStream.patch

Fix the last issue I mentioned.  Turns out a "use strict" violation that tries to create a new global variable in self hosting triggers an assertion in setIntrinsic().  That was not intuitive to me.
Attachment #8752001 - Attachment is obsolete: true
Depends on: 1272697
Comment on attachment 8752223 [details] [diff] [review]
Rebased wip ReadableStream.patch

I moved this patch over to bug 1272697.  I'll land this as P1 in that bug with further refinements in additional patches.
Attachment #8752223 - Attachment is obsolete: true
It's been a while since I commented here, but work is progressing.  Bringing the self-hosted JS up to spec is taking a while because streams were majorly refactored.  At the end this will have support for both byte stream optimizations and tee'ing.

I'm maintaining my patch queue here:

https://github.com/wanderview/gecko-patches/tree/dev-streams

It's not functional yet, but I'm almost done translating the JS side of things.  I then need to add the C++ intrinsics necessary and get the tests working again.
Blocks: 1283191
Duplicate of this bug: 1294957
Unfortunately I have to focus on a large service worker effort at the moment.  I need to drop this.  I believe we have someone lined up to take it on, though.
Assignee: bkelly → nobody
Status: ASSIGNED → NEW

Comment 27

10 months ago
Anything moved on?

Updated

9 months ago
Blocks: 802882

Updated

8 months ago
Blocks: 1191981
Depends on: 1333800
Depends on: 1335397

Comment 28

4 months ago
till, is there a bug to watch specifically about writable streams progress? I know you had a prototype written up in an afternoon some time recently.

FYI, the spec is quite stable these days. We have been tweaking edge case behaviors over the last month (e.g. what happens if you abort() then immediately close()) and have arrived at a place we are happy with and hoping to ship in Chrome.
Interesting and very useful feature.

Comment 30

2 months ago
any updates on this?
(Assignee)

Comment 31

2 months ago
Till is taking care of the JS part. I tried his patches and they just work :)
I'm working on the DOM integration. Currently, with my patches, it's possible to expose ReadableStream objects via WebIDL.
We are working on connecting JS and DOM but I don't know exactly when this will be completed.
(Assignee)

Updated

2 months ago
Depends on: 1367375
(Assignee)

Comment 32

a month ago
Created attachment 8878090 [details] [diff] [review]
part 1 - WebIDL bindings

I want to start uploading some code. All this code works correctly but I need to fix details before asking for a review.
Assignee: nobody → amarchesini
(Assignee)

Comment 33

a month ago
Created attachment 8878091 [details] [diff] [review]
part 2 - Using of ReadableStream in WebIDL
(Assignee)

Comment 34

a month ago
Created attachment 8878092 [details] [diff] [review]
part 3 - Fetch.body implementation
(Assignee)

Comment 35

a month ago
Created attachment 8878093 [details] [diff] [review]
part 4 - WPT enabled

With these patches we already pass a large number of WPTs.
(Assignee)

Comment 36

a month ago
Created attachment 8879605 [details] [diff] [review]
part 3 - Fetch.body implementation
Attachment #8878092 - Attachment is obsolete: true
(Assignee)

Comment 37

a month ago
Created attachment 8880339 [details] [diff] [review]
part 1 - WebIDL bindings

Bz, do you mind taking a look at this patch? Here I introduced ReadableStream in the WebIDL bindings. This is not covered by the WebIDL spec yet.
Attachment #8880339 - Flags: review?(bzbarsky)
(Assignee)

Updated

a month ago
Attachment #8878090 - Attachment is obsolete: true
(Assignee)

Comment 38

a month ago
Created attachment 8880340 [details] [diff] [review]
part 3 - Using of ReadableStream in WebIDL files
Attachment #8878091 - Attachment is obsolete: true
Attachment #8880340 - Flags: review?(bkelly)
(Assignee)

Comment 39

a month ago
Created attachment 8880342 [details] [diff] [review]
part 3 - Fetch.body implementation

This works fine and it's green in WPTs.
Attachment #8879605 - Attachment is obsolete: true
Attachment #8880342 - Flags: feedback?(bkelly)
(Assignee)

Comment 40

a month ago
Created attachment 8880343 [details] [diff] [review]
part 4 - disable by prefs
Attachment #8878093 - Attachment is obsolete: true
Attachment #8880343 - Flags: feedback?(bkelly)
(Assignee)

Comment 41

a month ago
Created attachment 8880344 [details] [diff] [review]
part 5 - Response.body + Response CTOR
Attachment #8880344 - Flags: feedback?(bkelly)
Comment on attachment 8880340 [details] [diff] [review]
part 3 - Using of ReadableStream in WebIDL files

Review of attachment 8880340 [details] [diff] [review]:
-----------------------------------------------------------------

::: dom/webidl/Response.webidl
@@ +29,5 @@
>    [ChromeOnly, NewObject, Throws] Response cloneUnfiltered();
>  };
>  Response implements Body;
>  
> +// This should be part of Body but we don't want to expose body to request yet.

Can you write a follow-up bug and include the bug number here?
Attachment #8880340 - Flags: review?(bkelly) → review+
(Assignee)

Comment 43

a month ago
Created attachment 8880437 [details] [diff] [review]
part 5 - Response.body + Response CTOR
Attachment #8880344 - Attachment is obsolete: true
Attachment #8880344 - Flags: feedback?(bkelly)
Attachment #8880437 - Flags: review?(bkelly)
(Assignee)

Updated

a month ago
Attachment #8880437 - Flags: review?(bkelly) → feedback?(bkelly)
Comment on attachment 8880342 [details] [diff] [review]
part 3 - Fetch.body implementation

Review of attachment 8880342 [details] [diff] [review]:
-----------------------------------------------------------------

This is a good start, but I think we should use pipe and NS_AsyncCopy() here.  Also, I'm not sure this is currently safe on worker threads.

::: dom/fetch/Fetch.cpp
@@ +1516,5 @@
>    SetBodyUsed();
>  
> +  nsCOMPtr<nsIGlobalObject> global = DerivedClass()->GetParentObject();
> +
> +  // If we already created a ReadableStreamBody we have to close it now.

Uh, why do we need to close the existing readable stream?  How can you consume it then?

@@ +1585,5 @@
>  
>  template <class Derived>
>  void
>  FetchBody<Derived>::GetBody(JSContext* aCx,
> +                            JS::MutableHandle<JSObject*> aMessage,

Can you rename "aMessage" to "aBodyOut"?

@@ +1591,2 @@
>  {
> +  if (!mReadableStreamBody) {

Can you add a comment here saying something like:

// We don't have a js-defined ReadableStream, so wrap our native binary
// stream in a ReadableStream and return it.

Or something describing the behavior.

@@ +1596,5 @@
> +      return;
> +    }
> +
> +    // If the body has been already consumed, we close the stream.
> +    if (BodyUsed() && !ReadableStreamClose(aCx, body)) {

I'm confused.  Why do we want to return a closed stream here?  Can't we just avoid returning anything if the body is used?

@@ +1601,5 @@
> +      aRv.StealExceptionFromJSContext(aCx);
> +      return;
> +    }
> +
> +    mReadableStreamBody = body;

Oh, do we want to leave a closed stream here if BodyUsed()?  Still not sure I understand why.

::: dom/fetch/Fetch.h
@@ +176,5 @@
>  
>  protected:
>    nsCOMPtr<nsIGlobalObject> mOwner;
>  
> +  JS::Heap<JSObject*> mReadableStreamBody;

This can be a script provided readable stream or a native stream wrapped as a ReadableStream?  Can we add a comment describing that?

::: dom/fetch/FetchStream.cpp
@@ +60,5 @@
> +  MOZ_ASSERT(aUnderlyingSource);
> +
> +  RefPtr<FetchStream> stream = static_cast<FetchStream*>(aUnderlyingSource);
> +
> +  MOZ_ASSERT(stream->mWritingState == eInitializing);

MOZ_DIAGNOSTIC_ASSERT()

@@ +63,5 @@
> +
> +  MOZ_ASSERT(stream->mWritingState == eInitializing);
> +  stream->mWritingState = eWriting;
> +
> +  MOZ_ASSERT(!stream->mTaskQueue);

Can we make this a MOZ_DIAGNOSTIC_ASSERT()?  Are we sure content script can't force us to get called twice here in some way?  Should we have a runtime check as well?

@@ +89,5 @@
> +                                          size_t* aByteWritten)
> +{
> +  MOZ_ASSERT(aUnderlyingSource);
> +  MOZ_ASSERT(aBuffer);
> +  MOZ_ASSERT(aByteWritten);

I feel like all the various input assertions should be diagnostics.  They are cheap and validate our interface expectations with jsapi.

@@ +105,5 @@
> +    stream->mQueue.RemoveElementAt(0);
> +  }
> +
> +  MOZ_ASSERT(aLength >= queue->mSize);
> +  memcpy(aBuffer, queue->mBuffer, queue->mSize);

This seems undesirable in a pretty common case.  If we have a non-blocking stream we can read it directly on the target thread.  No need to copy to a buffer on the I/O thread and then copy again to the JS buffer on the target thread.

Also, we should be able to use ReadSegments() instead of Read().

Actually, I would recommend removing PopulateQueueOnIOThread() and modifying RequestDataCallback to do this:

1. If the stream is non-blocking (and maybe async) then just keep the current stream to read from directly on the target thread.
2. If the stream is blocking, then create an async non-blocking pipe with an equivalent size to what you are doing now.  Use NS_AsyncCopy on STS to populate it.  Back pressure will just do its thing normally.  Then read from the pipe's reader side on the target thread.

The advantage of doing it this way is there is no extra copying in the cases where we already have a non-blocking, async stream; e.g. an existing pipe or string stream.

I think it might also be less code overall since you don't have to worry about managing buffers, doing the copying, etc.  That is all handled by the pipe and NS_AsyncCopy code.

@@ +112,5 @@
> +  // Let's fetch new data.
> +  nsCOMPtr<nsIRunnable> r = NS_NewRunnableFunction([stream] () {
> +    stream->FlushQueueOnTargetThread();
> +  });
> +  stream->mTargetThread->Dispatch(r.forget());

What happens here if the stream is on a Worker and it's shutting down?  This can fail, right?  Do we care?  I guess that goes for all mTargetThread dispatches here.

@@ +162,5 @@
> +  : mWritingState(eInitializing)
> +  , mReadingState(eActive)
> +  , mGlobal(aGlobal)
> +  , mInputStream(aInputStream)
> +  , mTargetThread(NS_GetCurrentThread())

I think this should use aGlobal->EventTargetFor() instead.  We will need to fix bug 1366089 for Worker globals.  (Feel free to steal that from me if you want.)

@@ +169,5 @@
> +{}
> +
> +FetchStream::~FetchStream()
> +{
> +  NS_ProxyRelease(mTargetThread, mGlobal.forget());

I'm not sure this will work if the FetchStream outlives the Worker thread.  We probably need a WorkerHolder until the global is destroyed.

Do we need to listen for xpcom shutdown?

::: dom/fetch/FetchStream.h
@@ +89,5 @@
> +
> +  uint32_t QueueSize()
> +  {
> +    MutexAutoLock lock(mMutex);
> +    return mQueue.Length();

Can you put this in the cpp file?  It makes it easier to find the code, and things compile faster since there is less header pollution.  If it's shown to be hot in a profile we can move it back to being inlined.

@@ +121,5 @@
> +    eInactive,
> +  };
> +
> +  // Touched only on the I/O thread.
> +  ReadingState mReadingState;

I think the enums are probably only 8-bit, which means we are going to have a gap in our packing on some platforms here.  Can you move them to the end?

@@ +129,5 @@
> +  nsCOMPtr<nsIThread> mTargetThread;
> +
> +  JS::Heap<JSObject*> mReadableStream;
> +
> +  RefPtr<TaskQueue> mTaskQueue;

Can you add a comment that this points to the STS as an IO work thread?

@@ +131,5 @@
> +  JS::Heap<JSObject*> mReadableStream;
> +
> +  RefPtr<TaskQueue> mTaskQueue;
> +
> +  struct Queue {

Could we rename this to Buffer, Block, or something?  I saw Queue in the cpp and thought this was a push/pop style queue data structure.

@@ +133,5 @@
> +  RefPtr<TaskQueue> mTaskQueue;
> +
> +  struct Queue {
> +    char mBuffer[4096];
> +    uint32_t mSize;

This is going to fit poorly with our memory allocator.  It's just over the size of one block, so you will likely end up wasting a lot of space.

@@ +137,5 @@
> +    uint32_t mSize;
> +  };
> +
> +  // Protected by mMutex
> +  nsTArray<UniquePtr<Queue>> mQueue;

Can we make this:

  AutoTArray<UniquePtr<Queue>, MAX_BUFFERS> mQueue;

Maybe changing MAX_BUFFERS to:

  static const uint32_t sMaxBuffers = 3;

To avoid macro conflict.

::: js/src/builtin/Stream.cpp
@@ +3182,5 @@
>      controller->setFixedSlot(QueueContainerSlot_TotalSize, Int32Value(0));
>      // Step 8: Set this.[[started]] and this.[[closeRequested]] to false.
>      // Step 9: Set this.[[strategyHWM]] to
>      //         ? ValidateAndNormalizeHighWaterMark(highWaterMark).
> +    controller->setFixedSlot(ControllerSlot_StrategyHWM, Int32Value(0));

Not sure I understand this.
Attachment #8880342 - Flags: feedback?(bkelly)
Attachment #8880343 - Flags: feedback?(bkelly) → feedback+
Comment on attachment 8880437 [details] [diff] [review]
part 5 - Response.body + Response CTOR

Review of attachment 8880437 [details] [diff] [review]:
-----------------------------------------------------------------

::: dom/fetch/BodyExtractor.cpp
@@ +217,5 @@
> +                                            aContentTypeWithCharset, aCharset);
> +  }
> +
> +  // TODO
> +  return NS_ERROR_FAILURE;

Please use an error code with a good message since this will be displayed in the web console.  I assume we will be doing this at least for Request bodies for the time being.

::: dom/fetch/FetchStream.cpp
@@ +419,5 @@
> +  MOZ_ASSERT(aUnderlyingReadableStreamSource);
> +  MOZ_ASSERT(aInputStream);
> +
> +  RefPtr<FetchStream> stream =
> +    static_cast<FetchStream*>(aUnderlyingReadableStreamSource);

Is it OK to call this if it's already been read?  Do we need a check for used/disturbed?

Also, using pipe/NS_AsyncCopy in the reading code would make this possibly easier.  You can return the stream that is currently being read without worrying about partially copied buffers, etc.

::: dom/webidl/Fetch.webidl
@@ +7,5 @@
>   * http://fetch.spec.whatwg.org/
>   */
>  
>  typedef object JSON;
> +typedef (Blob or BufferSource or FormData or URLSearchParams or ReadableStream or USVString) BodyInit;

Is there a feature detection issue here?  How does chrome support ReadableStream in body init for response, but not request?
Attachment #8880437 - Flags: feedback?(bkelly) → feedback+

Comment 46

a month ago
> This is not covered by WebIDL spec yet.

OK, but is there some description of what the goal behavior is here?  That would help a lot with getting the review done quickly...  Is it basically "make it act more or less like typed arrays"?
(Assignee)

Comment 47

a month ago
> Uh, why do we need to close the existing readable stream?  How can you
> consume it then?

You cannot. By spec, if the body is consumed, the stream should be considered closed.

> I'm confused.  Why do we want to return a closed stream here?  Can't we just
> avoid returning anything if the body is used?

Currently, per the spec (and the WPT tests as well), we must always return a stream, possibly one that is already closed.
(Assignee)

Comment 48

27 days ago
> OK, but is there some description of what the goal behavior is here?  That
> would help a lot with getting the review done quickly...  Is it basically
> "make it act more or less like typed arrays"?

Yes. It is basically that. I have to admit that I implemented it in such a way that I was able to work on the rest, the DOM part.

Comment 49

23 days ago
Comment on attachment 8880339 [details] [diff] [review]
part 1 - WebIDL bindings

>+++ b/dom/bindings/BindingDeclarations.h

Why do you need jsfriendapi.h here?  I don't think you do, at first glance.

I'm also not quite sure why we're adding these to BindingDeclarations.h.  I think it would make more sense to put SpidermonkeyInterfaceObjectStorage and company in SpidermonkeyInterface.h, include SpidermonkeyInterface.h from ReadableStream.h and TypedArray.h, and change the code that computes the includes to include the appropriate one of those, basically by adding a helper function that checks whether the type is a ReadableStream or not and returns the appropriate string.

That will also avoid the weird recursive include you have right now where SpidermonkeyInterface.h includes TypedArray.h, which includes SpidermonkeyInterface.h

Also, I think the capitalization used throughout bindings is SpiderMonkey, with capital 'M'.  Please do that.

>+  JSObject* mObj;

This was called mTypedObj for a reason.  It's guaranteed to be the actual object we're interested in, not a cross-compartment wrapper or whatever.  As in, having the right type.

Maybe we should call it mInterfaceObj or mImplObj or something.  But mObj is too generic.



>+    JS::UnsafeTraceRoot(trc, &mObj, "SpidermonkeyInterfaceObjectStorage.mWrappedObj");

This needs to trace &mWrappedObj.  Please make sure to fix this!

>+// And a specialization for dealing with nullable typed arrays

"nullable SpiderMonkey interfaces", right?

>+class MOZ_RAII SpidermonkeyInterfaceRooter<Nullable<InterfaceType> > :

s/> >/>>/, please.  I know it was preexisting.

>+// Class for easily setting up a rooted typed array object on the stack

"a rooted SpiderMonkey interface object".

>+++ b/dom/bindings/Codegen.py
>             # Optional<RootedTypedArray<ArrayType> >.  So do a holder if we're
>             # optional and use a RootedTypedArray otherwise.

Please fix these comments to refer to Optional<RootedSpiderMonkeyInterface<InterfaceType>> and RootedSpiderMonkeyInterface, respectively.

There are various other places in this file that assume that isSpiderMonkeyInterface means "is a typed array".  Search for typedArraysAreStructs and some of the comments around it.  A separate changeset to fix up all that stuff would be good, so it doesn't get mixed in with the substantive changes here.

>+++ b/dom/bindings/ReadableStream.h
>+  inline bool inited() const

This could live on the shared base class.  It used to not matter, because all typed array types inherited from both TypedArray_base (which is where this lived), and that was the only thing inheriting from TypedArrayObjectStorage.  But now we should hoist as much as possible to the shared storage class.

>+  inline bool WrapIntoNewCompartment(JSContext* cx)

Should live on the base class.

>+  inline JSObject *Obj() const

Should live on the base class.

The rest looks good, but please fix the above.
Attachment #8880339 - Flags: review?(bzbarsky) → review-

Updated

23 days ago
Blocks: 1377602
(Assignee)

Comment 50

20 days ago
Created attachment 8883236 [details] [diff] [review]
part 1 - WebIDL bindings
Attachment #8880339 - Attachment is obsolete: true
(Assignee)

Comment 51

20 days ago
Created attachment 8883237 [details] [diff] [review]
part 2 - WebIDL internal renaming

bz, can you take a look at patch 1 and 2 when you are back? Thanks!
Flags: needinfo?(bzbarsky)
(Assignee)

Updated

20 days ago
Attachment #8880340 - Attachment description: part 2 - Using of ReadableStream in WebIDL files → part 3 - Using of ReadableStream in WebIDL files
(Assignee)

Comment 52

20 days ago
Created attachment 8883238 [details] [diff] [review]
part 4 - Fetch.body implementation

Note that nsITransport::CreateInputTransport/OpenInputStream uses nsIPipe and NS_AsyncCopy internally.
Attachment #8880342 - Attachment is obsolete: true
Attachment #8883238 - Flags: review?(bkelly)
(Assignee)

Comment 53

20 days ago
Created attachment 8883239 [details] [diff] [review]
part 5 - Disable Streams API by default
Attachment #8883239 - Flags: review?(bkelly)
(Assignee)

Updated

20 days ago
Attachment #8880343 - Attachment is obsolete: true
(Assignee)

Updated

20 days ago
Attachment #8883239 - Attachment description: stream_05_disable.patch → part 5 - Disable Streams API by default
(Assignee)

Comment 54

19 days ago
Created attachment 8883551 [details] [diff] [review]
part 6 - shutting down
Attachment #8880437 - Attachment is obsolete: true
Attachment #8883551 - Flags: review?(bkelly)
(Assignee)

Comment 55

19 days ago
Created attachment 8883552 [details] [diff] [review]
part 7 - Response CTOR with a ReadableStream as BodyInit
Attachment #8883552 - Flags: review?(bkelly)
Comment on attachment 8883238 [details] [diff] [review]
part 4 - Fetch.body implementation

Review of attachment 8883238 [details] [diff] [review]:
-----------------------------------------------------------------

This is looking pretty good, but I'd like to see it again.  In particular, I think this current patch has a bug if code does this:

  // creates FetchStream and starts copying to the pipe
  let rs = response.body;

  // let the copy work for a while
  await delay(1000);

  // Don't use that FetchStream created above, but instead try to drain the original stream.
  // This will miss data copied to the pipe already.
  response.text();

::: dom/fetch/Fetch.cpp
@@ +879,5 @@
>      aRv.ThrowTypeError<MSG_FETCH_BODY_CONSUMED_ERROR>();
>      return nullptr;
>    }
>  
> +  // If the stream body has been already used.

Maybe clarify this comment by just quoting the text from the spec?

// If this object is disturbed or locked, return a new promise rejected with a TypeError.

@@ +884,5 @@
> +  if (mReadableStreamBody) {
> +    JS::Rooted<JSObject*> body(aCx, mReadableStreamBody);
> +    if (JS::ReadableStreamIsDisturbed(body) ||
> +        JS::ReadableStreamIsLocked(body)) {
> +      aRv.Throw(NS_ERROR_DOM_TYPE_ERR);

This should have an error message defined for the type error.  I actually thought ErrorResult asserted if you called Throw(NS_ERROR_DOM_TYPE_ERR) instead of using ThrowTypeError().

@@ +893,1 @@
>    SetBodyUsed();

Can we change GetBodyUsed() to just check JS::ReadableStreamIsDisturbed()?  Then we don't have to track a separate body used flag.  I believe this is what the spec does.

@@ +893,5 @@
>    SetBodyUsed();
>  
> +  nsCOMPtr<nsIGlobalObject> global = DerivedClass()->GetParentObject();
> +
> +  // If we already created a ReadableStreamBody we have to close it now.

I'm confused.  Where does the spec say to do this?  AFAICT it says:

3. Let reader be the result of getting a reader from stream. If that threw an exception, return a new promise rejected with that exception.
4. Let promise be the result of reading all bytes from stream with reader.
5. Return the result of transforming promise by a fulfillment handler that returns the result of the package data algorithm with its first argument, type and this object’s MIME type.

This suggests to me we should lock the stream, not throw an error on it.  I understand we are reading from the underlying source directly here, but I don't think we want an observable difference to script that has already gotten a handle to the ReadableStream body, right?

Also, now that I look closer, I don't think this will be correct.  We can't just read from the underlying source here any more.  FetchStream::Create() called in GetBody() earlier might start draining the underlying source stream to the async pipe.  This means some data will have already been drained from the underlying source.  So we can't just go straight to the underlying source.  

You should probably extract the stream back out of the FetchStream here and re-set it as the body stream before starting the consume.  This will let us use the new pipe stream if it's been constructed.

@@ +898,5 @@
> +  if (mReadableStreamBody) {
> +    JS::Rooted<JSObject*> body(aCx, mReadableStreamBody);
> +
> +    nsCOMPtr<nsPIDOMWindowInner> window = do_QueryInterface(global);
> +    RefPtr<DOMError> error = new DOMError(window, NS_ERROR_TYPE_ERR);

Please provide a message with the error.  I think you can just pass it as a string literal as the third argument.

::: dom/fetch/FetchStream.cpp
@@ +27,5 @@
> +  nsCOMPtr<nsIInputStream> inputStream;
> +  aBody->GetBody(getter_AddRefs(inputStream));
> +
> +  nsCOMPtr<nsIAsyncInputStream> asyncStream = do_QueryInterface(inputStream);
> +  if (!asyncStream) {

This is not quite adequate.  You need to also verify that the stream is non-blocking by calling:

http://searchfox.org/mozilla-central/source/xpcom/io/nsIInputStream.idl#146

For example, it's possible to create a pipe that has a blocking input.  This blocking pipe will also happen to QI to nsIAsyncInputStream.

Note, we should also file a bug to fix this same problem here:

http://searchfox.org/mozilla-central/source/ipc/glue/IPCStreamUtils.cpp#143

@@ +101,5 @@
> +  MOZ_DIAGNOSTIC_ASSERT(stream->mState == eWaiting);
> +  stream->mState = eReading;
> +
> +  nsresult rv =
> +    stream->mInputStream->AsyncWait(stream, 0, 0, NS_GetCurrentThread());

We should use mGlobal->EventTargetFor(TaskCategory::Other) here.

@@ +133,5 @@
> +  }
> +
> +  *aByteWritten = written;
> +
> +  if (written == 0) {

Some streams will also set their status to NS_BASE_STREAM_CLOSED.  I think you need to handle that case separately for calls like Read(), Available(), etc.  We should close the stream instead of throwing an error.  Or maybe you can handle NS_BASE_STREAM_CLOSED in ErrorPropagation() instead.

@@ +150,5 @@
> +
> +  RefPtr<FetchStream> stream = static_cast<FetchStream*>(aUnderlyingSource);
> +
> +  stream->mState = eClosed;
> +  stream->mInputStream->CloseWithStatus(NS_BINDING_ABORTED);

The current code will cause a DOMError to be created for this if Read() or Available() is called after aborting, right?  Perhaps we should pass NS_BASE_STREAM_CLOSED instead, since that matches the spec intent better.

@@ +183,5 @@
> +                         nsIAsyncInputStream* aInputStream)
> +  : mState(eWaiting)
> +  , mGlobal(aGlobal)
> +  , mInputStream(aInputStream)
> +  , mTargetThread(NS_GetCurrentThread())

mGlobal->EventTargetFor(TaskCategory::Other)

@@ +191,5 @@
> +}
> +
> +FetchStream::~FetchStream()
> +{
> +  NS_ProxyRelease("FetchStream::mGlobal", mTargetThread, mGlobal.forget());

Same problem as the other patch we have been looking at.  This can leak the entire global if the worker thread has started shutting down.  Is there a holder that covers this?  It's not obvious from this patch.  And we need to do something in worker code to ensure the event target continues to function while there is a holder.

Also, I can't figure out how this gets referenced off its owning JS thread.  Can you explain that process?  Or maybe it's in a later patch?

If we can make this a single thread object then this problem goes away.

@@ +222,5 @@
> +  mState = eClosed;
> +
> +  nsCOMPtr<nsPIDOMWindowInner> window = do_QueryInterface(mGlobal);
> +  // TODO: we should decide with errors we want to propagate.
> +  RefPtr<DOMError> error = new DOMError(window, aError);

I think we should perhaps convert this to a TypeError for now instead of exposing our internal error codes.  At least a generic message to start would be good and then we can add more specific ones for different internal codes.

@@ +243,5 @@
> +
> +  MOZ_DIAGNOSTIC_ASSERT(mState == eReading);
> +  mState = eWriting;
> +
> +  uint64_t size;

Initialize to zero, please.

@@ +252,5 @@
> +  }
> +
> +  AutoJSAPI jsapi;
> +  if (NS_WARN_IF(!jsapi.Init(mGlobal))) {
> +    return NS_OK;

Shouldn't we propagate an error here?  The stream seems dead.

::: dom/fetch/FetchStream.h
@@ +85,5 @@
> +  nsCOMPtr<nsIGlobalObject> mGlobal;
> +  nsCOMPtr<nsIAsyncInputStream> mInputStream;
> +  nsCOMPtr<nsIThread> mTargetThread;
> +
> +  JS::Heap<JSObject*> mReadableStream;

Is this safe on an NS_DECL_THREADSAFE_ISUPPORTS class?  How do we guarantee it's released on the correct thread?  Or can we make it NS_DECL_ISUPPORTS instead?  It's not obvious to me why it needs thread-safe ref counting.
Attachment #8883238 - Flags: review?(bkelly) → review-
Comment on attachment 8883239 [details] [diff] [review]
part 5 - Disable Streams API by default

Review of attachment 8883239 [details] [diff] [review]:
-----------------------------------------------------------------

::: modules/libpref/init/all.js
@@ +4970,5 @@
> +// Streams API
> +#ifdef NIGHTLY_BUILD
> +pref("dom.streams.enabled", true);
> +#else
> +pref("dom.streams.enabled", false);

I think we should start disabled on nightly so we can do a pref study there.
Attachment #8883239 - Flags: review?(bkelly) → review+
Comment on attachment 8883551 [details] [diff] [review]
part 6 - shutting down

Review of attachment 8883551 [details] [diff] [review]:
-----------------------------------------------------------------

::: dom/fetch/FetchStream.cpp
@@ +128,5 @@
> +    if (NS_WARN_IF(aRv.Failed())) {
> +      return nullptr;
> +    }
> +
> +    aRv = os->AddObserver(stream, DOM_WINDOW_FROZEN_TOPIC, true);

Do we really want this for frozen?  If something goes into bfcache and then comes back out this patch will leave its streams closed without any recovery.  I think perhaps we should just leave the streams open while its in the bfcache.  We might get a few extra copies to the pipe, but it will hit its limit and then apply back pressure.
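The back-pressure behavior mentioned here is observable from script via the stream controller's desiredSize. A minimal sketch (assuming a runtime with the WHATWG Streams globals, e.g. a modern browser console or Node 18+):

```javascript
// Sketch: with a highWaterMark of 2, desiredSize drops to 0 when the
// internal queue is full and goes negative when it overflows — the signal
// a well-behaved underlying source uses to pause copying (back pressure).
let ctrl;
const rs = new ReadableStream(
  { start(c) { ctrl = c; } },
  new CountQueuingStrategy({ highWaterMark: 2 })
);

console.log(ctrl.desiredSize); // 2 — room for two chunks
ctrl.enqueue("a");
ctrl.enqueue("b");
console.log(ctrl.desiredSize); // 0 — queue full; the source should pause
ctrl.enqueue("c");
console.log(ctrl.desiredSize); // -1 — over the limit
```

So even if a frozen page's stream keeps copying for a while, the queue limit bounds how much extra data piles up.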

@@ +139,5 @@
> +    MOZ_ASSERT(workerPrivate);
> +
> +    stream->mWorkerHolder.reset(new FetchStreamWorkerHolder(stream));
> +    if (NS_WARN_IF(!stream->mWorkerHolder->HoldWorker(workerPrivate, Closing))) {
> +      aRv.Throw(NS_ERROR_FAILURE);

NS_ERROR_DOM_INVALID_STATE_ERR would be a bit nicer.

@@ +252,5 @@
>  FetchStream::FinalizeCallback(void* aUnderlyingSource)
>  {
>    MOZ_DIAGNOSTIC_ASSERT(aUnderlyingSource);
>  
> +  // This can be called in any thread.

How can this get called off the js thread that initiated the creation of FetchStream?

@@ +403,5 @@
> +  MOZ_ASSERT((strcmp(aTopic, DOM_WINDOW_FROZEN_TOPIC) == 0) ||
> +             (strcmp(aTopic, DOM_WINDOW_DESTROYED_TOPIC) == 0));
> +
> +  nsCOMPtr<nsPIDOMWindowInner> window = do_QueryInterface(mGlobal);
> +  if (SameCOMIdentity(aSubject, window)) {

The whole observer thing seems very inefficient since we have a reference to our owning global.  It seems like we should have some kind of "observe this nsIGlobalObject for shutdown" API.  I filed bug 1379209 for this.
Attachment #8883551 - Flags: review?(bkelly) → review+
Comment on attachment 8883552 [details] [diff] [review]
part 7 - Response CTOR with a ReadableStream as BodyInit

Review of attachment 8883552 [details] [diff] [review]:
-----------------------------------------------------------------

::: dom/fetch/BodyExtractor.cpp
@@ +223,5 @@
> +    return rv;
> +  }
> +
> +  // JS generated ReadableStream.
> +  // TODO

Will this be another patch in this bug?  I don't think we can enable the feature in this state.

Also, I don't think we will have a content length in this case either.

::: dom/fetch/FetchStream.cpp
@@ +388,5 @@
> +  nsCOMPtr<nsIInputStream> inputStream =
> +    do_QueryInterface(stream->mInputStream);
> +  MOZ_ASSERT(inputStream);
> +
> +  nsresult rv = inputStream->Available(aContentLength);

This doesn't seem right.  If the original stream was variable length then this will be incorrect.  If the original stream is being converted to a pipe then this will be incorrect.

We probably need a better way to track fixed length streams vs variable length streams.

::: dom/fetch/Request.cpp
@@ +564,5 @@
>        const fetch::OwningBodyInit& bodyInit = bodyInitNullable.Value();
>        nsCOMPtr<nsIInputStream> stream;
>        nsAutoCString contentTypeWithCharset;
>        uint64_t contentLengthUnused;
> +      aRv = ExtractByteStreamFromBody(global, bodyInit,

I thought we were not supporting this in Request yet?

::: dom/xhr/XMLHttpRequestWorker.cpp
@@ +2242,5 @@
>  void
> +XMLHttpRequestWorker::Send(JSContext* aCx, const ReadableStream& aBody,
> +                           ErrorResult& aRv)
> +{
> +  // TODO

Are you going to do a patch in this bug?  If not please note the follow-up bug.

@@ +2243,5 @@
> +XMLHttpRequestWorker::Send(JSContext* aCx, const ReadableStream& aBody,
> +                           ErrorResult& aRv)
> +{
> +  // TODO
> +  aRv.Throw(NS_ERROR_FAILURE);

Please use a better error code.  I think we have a not supported or unimplemented thing.
Attachment #8883552 - Flags: review?(bkelly) → review-
(Assignee)

Comment 60

15 days ago
> Also, now that I look closer, I don't think this will be correct.  We can't
> just read from the underlying source here any more.  FetchStream::Create()
> called in GetBody() earlier might start draining the underlying source
> stream to the async pipe.  This means some data will have already been
> drained from the underlying source.  So we can't just go straight to the
> underlying source.

If this happens, the stream is locked (or disturbed). GetBodyUsed() will return true and we will not be in this scenario.


> Also, I can't figure out how this gets referenced off its owning js thread. 
> Can you explain that process?  Or maybe its in a later patch?

Yes, it's in another patch. But basically, FinalizeCallback() can be called on any thread.
Workers are not covered yet in this patch.
(In reply to Andrea Marchesini [:baku] from comment #60)
> > Also, now that I look closer, I don't think this will be correct.  We can't
> > just read from the underlying source here any more.  FetchStream::Create()
> > called in GetBody() earlier might start draining the underlying source
> > stream to the async pipe.  This means some data will have already been
> > drained from the underlying source.  So we can't just go straight to the
> > underlying source.
> 
> If this happens, the stream is locked (or disturbed). GetBodyUsed() will
> return true and we will not be in this scenario.

I'm pretty sure you can use the .body getter without locking the stream.  You have to do response.body.getReader() to lock it.  You can of course also unlock it again.  And if you don't actually read anything this does not disturb the stream AFAIK.
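The locking distinction described above can be checked directly against the Streams API. A sketch (assuming a runtime with the ReadableStream global):

```javascript
// Sketch: merely holding a stream handle does not lock it; acquiring a
// reader does, and releasing the reader unlocks it again. None of this
// disturbs (reads from) the stream.
const stream = new ReadableStream({
  start(controller) {
    controller.enqueue("chunk");
    controller.close();
  }
});

console.log(stream.locked);        // false — just holding the stream
const reader = stream.getReader();
console.log(stream.locked);        // true — a reader locks the stream
reader.releaseLock();
console.log(stream.locked);        // false — unlocked again, still undisturbed
```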
(Assignee)

Comment 62

15 days ago
What I meant is that that code just works. I don't see why I'm locking the stream in this code:

  let rs = response.body;
  await delay(1000);
  response.text();

The reading doesn't happen, and the stream is not locked.
(Assignee)

Comment 63

15 days ago
Yeah, I confirm. It works. And there is also a WPT about it.
(Assignee)

Comment 64

15 days ago
Created attachment 8884663 [details] [diff] [review]
part 4 - Fetch.body implementation

All the comments are applied. Some additional notes:

1. thread-safety is needed because FinalizeCallback can be executed on any thread. This is what Till discussed with me some weeks ago.

2. the worker part is covered by part 6, where I removed NS_ProxyRelease. I'll ask you to take a look at that patch again.

3. the test you were proposing passes with this patch because the body locking happens only when the first DataRequestCallback is executed. This happens when the reader is created. So, just doing: var body = response.body; sleep(1000); doesn't produce any locking.
Attachment #8883238 - Attachment is obsolete: true
Attachment #8884663 - Flags: review?(bkelly)
(Assignee)

Comment 65

15 days ago
Created attachment 8884664 [details] [diff] [review]
part 6 - shutting down

Same patch, with comments applied. I want you to take a look at how I nullify mGlobal without using NS_ProxyRelease.
Attachment #8883551 - Attachment is obsolete: true
Attachment #8884664 - Flags: review?(bkelly)
(In reply to Andrea Marchesini [:baku] from comment #62)
> What I meant is that that code just works. I don't see why I'm locking the
> stream in this code:
> 
>   let rs = response.body;
>   await delay(1000);
>   response.text();
> 
> The reading doesn't happen, and the stream is not locked.

But my original comment was not about locking.  I thought you said my comment didn't matter because the stream would be locked.  That is the only reason I mentioned locking here.

My issue is with what happens when the response body is blocking, like from cache API.  In these cases you start an async copy in the background.  Let me see if I can write a larger test that I expect to fail with the code I last reviewed.

If you open the console on this page in your streams enabled build, what output do you get?

https://response-body-getter-test.glitch.me/

The last line should be:

original="123456789abcdef" from-cache="123456789abcdef"

But based on the code I reviewed previously I don't think it will be.
Flags: needinfo?(amarchesini)
(Assignee)

Comment 67

13 days ago
Created attachment 8885197 [details] [diff] [review]
part 4 - Fetch.body implementation

This passes your test.
Attachment #8884663 - Attachment is obsolete: true
Attachment #8884663 - Flags: review?(bkelly)
Flags: needinfo?(amarchesini)
Attachment #8885197 - Flags: review?(bkelly)
(Assignee)

Comment 68

13 days ago
Created attachment 8885199 [details] [diff] [review]
part 7 - Response CTOR with a ReadableStream as BodyInit

All the TODOs will be covered by other patches
Attachment #8883552 - Attachment is obsolete: true
Attachment #8885199 - Flags: review?(bkelly)
(Assignee)

Comment 69

13 days ago
Created attachment 8885271 [details] [diff] [review]
part 4 - Fetch.body implementation
Attachment #8885197 - Attachment is obsolete: true
Attachment #8885197 - Flags: review?(bkelly)
Attachment #8885271 - Flags: review?(bkelly)
(Assignee)

Comment 70

13 days ago
Created attachment 8885350 [details] [diff] [review]
part 7 - Response CTOR with a ReadableStream as BodyInit

1. Response.Clone() works as it should.
2. ReadableStream exposed just to Response objects.
Attachment #8885199 - Attachment is obsolete: true
Attachment #8885199 - Flags: review?(bkelly)
Attachment #8885350 - Flags: review?(bkelly)
Comment on attachment 8885271 [details] [diff] [review]
part 4 - Fetch.body implementation

Review of attachment 8885271 [details] [diff] [review]:
-----------------------------------------------------------------

Thanks!  r=me with comments addressed.  My biggest concern is that we leave .body locked if one of the consuming methods, like text(), is called.  ReadableStreamClose() does not lock it in the spec AFAICT, but I'm not sure about JS::ReadableStreamClose().

::: dom/fetch/Fetch.cpp
@@ +878,5 @@
> +    return true;
> +  }
> +
> +  // If this object is disturbed or locked, return a new promise rejected with a
> +  // TypeError.

This comment seems wrong. This method returns a boolean, not a promise.

@@ +922,5 @@
> +
> +  // If we already created a ReadableStreamBody we have to close it now.
> +  if (mReadableStreamBody) {
> +    JS::Rooted<JSObject*> body(aCx, mReadableStreamBody);
> +    JS::ReadableStreamClose(aCx, body);

Shouldn't we just lock this?  AFAICT that is the observable change to `response.body.getReader()` after running:

https://fetch.spec.whatwg.org/#concept-body-consume-body

Or does close leave the stream locked?  It's a bit unclear to me what that JS call does.

@@ +999,5 @@
> +    }
> +
> +    MOZ_ASSERT(body);
> +
> +    // If the body has been already consumed, we close the stream.

Again, shouldn't this be a locked stream or does "ReadableStreamClose" do that?  The spec ReadableStreamClose does not lock the stream:

https://streams.spec.whatwg.org/#readable-stream-close

It would also be nice to clarify the comment a bit more.  Maybe something like:

// If the body has already been consumed via text(), etc, we must still
// expose a .body ReadableStream.  This stream should be locked to reflect
// that the other consuming method has been called.

@@ +1000,5 @@
> +
> +    MOZ_ASSERT(body);
> +
> +    // If the body has been already consumed, we close the stream.
> +    if (BodyUsed() && !ReadableStreamClose(aCx, body)) {

nit:  You refer to this as JS::ReadableStreamClose() above, but not here.  Let's use a consistent style.
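For reference, the script-observable behavior the spec text above implies can be sketched as follows (assuming a runtime with the Fetch API globals):

```javascript
// Sketch: consuming a body via text() disturbs the stream and leaves the
// exposed .body locked — a later getReader() must fail with a TypeError.
const r = new Response("hello");
const pending = r.text();          // consume the body

console.log(r.bodyUsed);           // true — the stream is now disturbed
console.log(r.body.locked);        // true — and it stays locked
try {
  r.body.getReader();              // locking it again must throw
} catch (e) {
  console.log(e.name);             // TypeError
}
pending.then(t => console.log(t)); // "hello"
```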

::: dom/fetch/FetchStream.cpp
@@ +106,5 @@
> +        return;
> +      }
> +
> +      nsCOMPtr<nsIInputStream> wrapper;
> +      rv = transport->OpenInputStream(/* aFlags */ 0,

So, this is only ok if we are guaranteed that the stream will be marked disturbed after starting this drain.  Can we MOZ_DIAGNOSTIC_ASSERT(JS::ReadableStreamIsDisturbed(aStream)) here?  Also add a comment that it's safe to start draining the original stream, since the disturbed flag will block any other consuming methods on the Request/Response.

@@ +233,5 @@
> +}
> +
> +FetchStream::~FetchStream()
> +{
> +  NS_ProxyRelease("FetchStream::mGlobal", mOwningEventTarget, mGlobal.forget());

What about mReadableStream?  Is it safe to destroy that JS::Heap<> member off the js thread?  I guess I should look at part 6.

@@ +292,5 @@
> +  uint64_t size = 0;
> +  nsresult rv = mInputStream->Available(&size);
> +  if (NS_SUCCEEDED(rv) && size == 0) {
> +    // In theory this should not happen. If size is 0, the stream should be
> +    // considered closed.

I think this is only true since this is a callback from AsyncWait(), right?  Available() can return zero in other cases without being closed yet.
Attachment #8885271 - Flags: review?(bkelly) → review+
(Assignee)

Comment 72

12 days ago
Created attachment 8885693 [details] [diff] [review]
part 7 - Response CTOR with a ReadableStream as BodyInit

Much better approach. The next patch is about FetchStreamReader: an nsIInputStream able to read data from a ReadableStream.
Attachment #8885350 - Attachment is obsolete: true
Attachment #8885350 - Flags: review?(bkelly)
Attachment #8885693 - Flags: review?(bkelly)
(Assignee)

Updated

12 days ago
Duplicate of this bug: 1147061
Comment on attachment 8884664 [details] [diff] [review]
part 6 - shutting down

Review of attachment 8884664 [details] [diff] [review]:
-----------------------------------------------------------------

r=me with comments addressed.

::: dom/fetch/FetchStream.cpp
@@ +23,5 @@
> +class FetchStreamWorkerHolder final : public WorkerHolder
> +{
> +public:
> +  explicit FetchStreamWorkerHolder(FetchStream* aStream)
> +    : mStream(aStream)

I think this should call WorkerHolder() constructor with AllowIdleShutdownStart.  Otherwise having a stream that is not being read in a worker will prevent that worker from GC'ing.

@@ +143,5 @@
> +    MOZ_ASSERT(workerPrivate);
> +
> +    stream->mWorkerHolder.reset(new FetchStreamWorkerHolder(stream));
> +    if (NS_WARN_IF(!stream->mWorkerHolder->HoldWorker(workerPrivate, Closing))) {
> +      aRv.Throw(NS_ERROR_DOM_INVALID_STATE_ERR);

I think this will leak the stream if HoldWorker() returns false.  The WorkerHolder and the stream form a cycle.  Perhaps change it to:

  UniquePtr<FetchStreamWorkerHolder> holder(new FetchStreamWorkerHolder(stream));
  if (!holder->HoldWorker(workerPrivate, Closing)) {
    // ...
  }

  // Note, this will create a ref-cycle between the holder and the stream.  The
  // cycle is broken when the stream is closed or the worker begins shutting down.
  stream->mWorkerHolder = Move(holder);

@@ +263,5 @@
>  FetchStream::FinalizeCallback(void* aUnderlyingSource)
>  {
>    MOZ_DIAGNOSTIC_ASSERT(aUnderlyingSource);
>  
> +  // This can be called in any thread.

Can we short circuit here if we are already closed?  It seems Close() does the same work.  Or are we guaranteed this will not be called if Close() was called?  In that case, can we do a MOZ_DIAGNOSTIC_ASSERT()?

@@ +285,5 @@
> +        }
> +        stream->mGlobal = nullptr;
> +      });
> +
> +    SystemGroup::Dispatch("FetchStream::FinalizeCallback",

We have a global here, so I don't think we need system group.  Let's do `stream->mGlobal->Dispatch(...)` so it's labeled.

Also, you are using different strings for the name here.

@@ +403,5 @@
> +  JS::ReadableStreamClose(cx, stream);
> +
> +  // We are in the correct thread, let's nullify the global now.
> +  mGlobal = nullptr;
> +  mWorkerHolder = nullptr;

This should remove the main thread window observer as well.
Attachment #8884664 - Flags: review?(bkelly) → review+
(Assignee)

Comment 75

12 days ago
Created attachment 8885798 [details] [diff] [review]
part 8 - JS ReadableStream as CTOR
Attachment #8885798 - Flags: review?(bkelly)
(Assignee)

Comment 76

12 days ago
Created attachment 8885801 [details] [diff] [review]
part 7 - Response CTOR with a ReadableStream as BodyInit
Attachment #8885693 - Attachment is obsolete: true
Attachment #8885693 - Flags: review?(bkelly)
Attachment #8885801 - Flags: review?(bkelly)
(Assignee)

Comment 77

12 days ago
Created attachment 8885802 [details] [diff] [review]
part 8 - NS_NewCancelableRunnableFunction
Attachment #8885802 - Flags: review?(bkelly)
(Assignee)

Comment 78

12 days ago
Created attachment 8885814 [details] [diff] [review]
part 10 - WPT
Attachment #8885814 - Flags: review?(bkelly)
(Assignee)

Comment 79

12 days ago
> ReadableStreamClose() does not lock it in the spec AFAICT, but not sure
> about JS::ReadableStreamClose().

Currently JS::ReadableStreamClose() locks the stream. Till will let me know if I have to use something else, in case this changes behavior.

> What about mReadableStream?  Is it safe to destroy that JS::Heap<> member
> off the js thread?  I guess I should look at part 6.

mReadableStream can be released in any thread. But I'll check it again.

> > +  nsresult rv = mInputStream->Available(&size);
> > +  if (NS_SUCCEEDED(rv) && size == 0) {
> 
> I think this is only true since this is a callback from AsyncWait(), right? 
> Available() can return zero in other cases without being closed yet.

Exactly.
Comment on attachment 8885801 [details] [diff] [review]
part 7 - Response CTOR with a ReadableStream as BodyInit

Review of attachment 8885801 [details] [diff] [review]:
-----------------------------------------------------------------

r- for the clone issue.

::: dom/fetch/FetchStream.cpp
@@ +440,5 @@
> +  if (NS_WARN_IF(!stream->mOriginalInputStream)) {
> +    return NS_ERROR_DOM_INVALID_STATE_ERR;
> +  }
> +
> +  nsresult rv = stream->mOriginalInputStream->Available(aContentLength);

Again, I don't think this is a fair measure of content-length of the stream.  The available bytes might be only part of the stream.  I think you probably want to return UNKNOWN_BODY_SIZE here:

http://searchfox.org/mozilla-central/source/dom/fetch/InternalResponse.h#298

::: dom/fetch/Response.cpp
@@ +250,5 @@
> +      nsCOMPtr<nsIInputStream> bodyStream;
> +      uint64_t bodySize = 0;
> +
> +      MOZ_ASSERT(underlyingSource);
> +      aRv = FetchStream::RetrieveInputStream(underlyingSource,

I realize part 9 is changing this, but I have a couple concerns:

1) This assumes underlyingSource is coming from the fetch API.  As we expand streams support through the DOM it seems like we may have to handle non-fetch native stream sources.
2) I think this code effectively neuters the ReadableStream passed in to the constructor.  Is that correct?  Should you be able to do this?

  let body = // some ReadableStream

  // create a Response using the body stream, but don't actually
  // use this response for anything.
  let r = new Response(body);

  // Drain the original ReadableStream passed to the response directly.
  body.getReader().read();

I don't think this will work here.

@@ -225,2 @@
>      }
> -    internalResponse->SetBody(bodyStream, bodySize);

nit: I don't think it was necessary to move the bodyStream declaration and SetBody() calls into the nested if/else statements.  They still just call SetBody() at the end and use the same bodyStream variable.

@@ +309,5 @@
>    RefPtr<Response> response = new Response(mOwner, ir);
> +
> +  // This response and the cloned one must share the same ReadableStream.
> +  JS::Rooted<JSObject*> body(aCx);
> +  GetBody(aCx, &body, aRv);

This seems quite wrong.  The spec requires that the cloned response have a tee of the ReadableStream:

https://fetch.spec.whatwg.org/#concept-body-clone

Now, we do a clone of the internal nsIInputStream in InternalResponse::Clone(), so maybe the tee() is happening that way.  But we definitely don't want r1.body and r2.body to have object identity.  They should be readable separately.
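The spec's tee() behavior referenced above can be sketched in script (assuming a runtime with the ReadableStream global); this is the observable shape a compliant clone() should produce — two distinct, independently readable streams:

```javascript
// Sketch: tee() returns two independent branches and locks the original.
const source = new ReadableStream({
  start(c) { c.enqueue("payload"); c.close(); }
});

const [branchA, branchB] = source.tee();
console.log(source.locked);          // true — the original is locked by tee()
console.log(branchA === branchB);    // false — distinct stream objects
console.log(branchA.locked);         // false — each branch readable on its own
```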

@@ +332,5 @@
>    RefPtr<Response> ref = new Response(mOwner, ir);
> +
> +  // This response and the cloned one must share the same ReadableStream.
> +  JS::Rooted<JSObject*> body(aCx);
> +  GetBody(aCx, &body, aRv);

Again, I don't think this is right.

::: dom/webidl/Response.webidl
@@ +7,5 @@
>   * https://fetch.spec.whatwg.org/#response-class
>   */
>  
> +// This should be Constructor(optional BodyInit... but BodyInit doesn't include
> +// ReadableStream yet because we don't want to expose Streams API to Request.

Is there any advantage to doing this instead of just adding ReadableStream to BodyInit and throwing in the Request constructor if it's a ReadableStream?  That would seem a bit simpler.
Attachment #8885801 - Flags: review?(bkelly) → review-
(Assignee)

Comment 81

11 days ago
> 1) This assumes underlyingSource is coming from the fetch API.  As we expand
> streams support through the DOM it seems like we may have to handle
> non-fetch native stream sources.
> 2) I think this code effectively neuters the ReadableStream passed in to the
> constructor.  Is that correct?  Should you be able to do this?

Yes, part 9 covers this part. The current code only works with DOM-created ReadableStreams.
I also asked to have an extra flag to differentiate the type of underlyingSource, but for now we only have Fetch ReadableStream sources.

> Is there any advantage to do this instead of just adding ReadableStream to
> BodyInit and throwing in the Request constructor if its ReadableStream? 
> That would seem a bit simpler.

BodyInit is also used by XHR, Request and Beacon. If we do as you propose, we would need to introduce that check in all of these components.
Comment on attachment 8885798 [details] [diff] [review]
part 8 - JS ReadableStream as CTOR

Review of attachment 8885798 [details] [diff] [review]:
-----------------------------------------------------------------

Sorry, but I had "opinions" on this patch.  I'd like to redesign it pretty significantly. :-)

In particular, I think we should lean on nsPipe for a lot of the work here.  I think this will be more maintainable and performant.  Also, I think it will make it easier to handle our Response-vs-InternalResponse split a bit better and should just work for clones.

::: dom/fetch/FetchStreamReader.cpp
@@ +158,5 @@
> +  }
> +
> +  RefPtr<FetchStreamReader> self(this);
> +  RefPtr<Runnable> r = NS_NewCancelableRunnableFunction(
> +    "FetchStreamReader::AsyncWait RetrieveData",

If I'm reading this right we:

1) Issue a runnable to js owning thread every time AsyncWait() is called.
2) This runnable will copy one buffer of data which we let Read() drain.
3) We immediately go back to WOULD_BLOCK state.
4) The next AsyncWait() issues another runnable to copy as step (1).

This will work, but it seems extremely slow.  It requires ping-ponging a single buffer between threads.  (Or even the same thread!)

I really think we should lean on nsPipe again.  When you need to convert a js ReadableStream to an nsIInputStream you can:

1) Create an nsPipe.
2) Start a read() loop on the js owning thread that pushes data into the pipe writer end.
  a) When the pipe buffer is full the pipe writer end will return WOULD_BLOCK and you can stop reading from the ReadableStream.
  b) When the pipe writer end's AsyncWait() fires you can start reading from the ReadableStream again.
3) You can then pass the nsPipe reader end to any gecko code that takes an nsIInputStream on any thread.  The pipe handles cross-thread safety via its mutex.

This has the advantage of reusing the existing pipe code which is well tested at this point.  All that is really required is to write the code that drains from ReadableStream and writes to nsIAsyncOutputStream.
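A rough JS analogue of that drain loop, just to illustrate the backpressure handshake — `sink` is a hypothetical stand-in for the nsPipe writer end (its `write()` returns the number of bytes accepted, 0 meaning WOULD_BLOCK, and `asyncWait()` stands in for nsIAsyncOutputStream::AsyncWait), not a real Gecko API:

```javascript
// Sketch of the proposed pump: drain a ReadableStream of Uint8Array
// chunks into a backpressure-aware sink, pausing the read loop whenever
// the sink is full. All sink methods are hypothetical stand-ins.
async function pumpToSink(stream, sink) {
  const reader = stream.getReader();
  for (;;) {
    const { done, value } = await reader.read();
    if (done) {
      sink.close();
      return;
    }
    let offset = 0;
    while (offset < value.length) {
      // write() returns how many bytes were accepted; 0 means WOULD_BLOCK.
      const written = sink.write(value.subarray(offset));
      if (written === 0) {
        // Stop reading from the ReadableStream until the pipe drains.
        await new Promise(resolve => sink.asyncWait(resolve));
        continue;
      }
      offset += written;
    }
  }
}
```

The key property is the one described above: while the sink is full, no further `reader.read()` is issued, so backpressure propagates to the ReadableStream with no intermediate copy.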

This whole process should also probably have a WorkerHolder for thread lifetime.  It can also terminate the loop when the worker shuts down.

::: dom/fetch/FetchStreamReader.h
@@ +14,5 @@
> +namespace mozilla {
> +namespace dom {
> +
> +class FetchStreamReader final : public nsIAsyncInputStream
> +                              , public PromiseNativeHandler

Exposing PromiseNativeHandler on the returned object is a bit confusing.  AFAICT it's only used internally by FetchStreamReader.  The external caller is not intended to use the Promise code directly.  Right?

@@ +22,5 @@
> +  NS_DECL_NSIASYNCINPUTSTREAM
> +  NS_DECL_NSIINPUTSTREAM
> +
> +  static nsresult
> +  Create(JSContext* aCx, nsIGlobalObject* aGlobal, JS::HandleObject aStream,

Can you document this a bit?  Why are we returning the reader in an output variable?

@@ +33,5 @@
> +  RejectedCallback(JSContext* aCx, JS::Handle<JS::Value> aValue) override;
> +
> +private:
> +  FetchStreamReader(nsIGlobalObject* aGlobal, JS::HandleObject aReader);
> +  ~FetchStreamReader();

This can be = default.

::: dom/fetch/Response.cpp
@@ +350,5 @@
> +{
> +  *aStream = nullptr;
> +
> +  // Maybe we have to retrieve the stream from the readableStream using a
> +  // reader.

Are we sure there is no code that bypasses Response::GetBody() and calls InternalResponse::GetBody() directly?  In those cases they will get a nullptr body when there is a js ReadableStream body on the Response object.  Right?

For example, I don't think this will work right for this call site:

http://searchfox.org/mozilla-central/source/dom/workers/ServiceWorkerEvents.cpp#670

It goes straight to the InternalResponse() to call GetUnfilteredBody().

Perhaps if the body is a js ReadableStream you could:

1) Create the pipe as I suggest in my other comment.
2) Wrap the pipe reader end in an nsIAsyncInputStream that dispatches an nsIRunnable to a target on the first read.  TriggeringAsyncInputStream or something.  Otherwise it's just a pass-through.
3) Set the wrapped trigger stream as the InternalResponse body.
4) Start reading from the ReadableStream into the pipe writer end when the TriggeringAsyncInputStream fires its runnable.

This would allow the ReadableStream to be returned via the body getter normally, or we can start consuming it via the InternalResponse body as we do today.

For clones it would pretty much just work as well.  The pipe reader end already handles clones with zero copies.  The new TriggeringAsyncInputStream wrapper would need to be cloneable based on its underlying stream.  The ReadableStream draining code would just need to be smart enough to handle more than one triggering runnable firing.  Just ignore the extras.
Attachment #8885798 - Flags: review?(bkelly) → review-
Comment on attachment 8885802 [details] [diff] [review]
part 8 - NS_NewCancelableRunnableFunction

Review of attachment 8885802 [details] [diff] [review]:
-----------------------------------------------------------------

r=me with an extra argument to set the cancellation callback.

::: xpcom/threads/nsThreadUtils.h
@@ +518,5 @@
> +class CancelableRunnableFunction : public CancelableRunnable
> +{
> +public:
> +  template <typename F>
> +  explicit CancelableRunnableFunction(F&& aFunction)

It seems this should probably take a separate function for that cancel operation?  If we're just going to ignore cancel you could just make NS_NewRunnable use CancelableRunnable, right?
Attachment #8885802 - Flags: review?(bkelly) → review+
Comment on attachment 8885814 [details] [diff] [review]
part 10 - WPT

Review of attachment 8885814 [details] [diff] [review]:
-----------------------------------------------------------------

r- mostly for the changes in response-stream-disturbed-5.html.

::: testing/web-platform/meta/fetch/api/basic/stream-response.any.js.ini
@@ +1,4 @@
>  [stream-response.any.worker.html]
>    type: testharness
>    [Stream response's body]
>      expected: FAIL

Why do we still fail this one?

::: testing/web-platform/meta/fetch/api/response/response-clone.html.ini
@@ +3,1 @@
>    [Check response clone use structureClone for teed ReadableStreams (Int8Arraychunk)]

Maybe these would be fixed with the tee on clone?

::: testing/web-platform/meta/fetch/api/response/response-consume-stream.html.ini
@@ +3,2 @@
>    [Read form data response's body as readableStream]
>      expected: FAIL

Why does this fail?  We support FormData, right?

::: testing/web-platform/tests/fetch/api/response/response-consume-stream.html
@@ +63,5 @@
>  test(function() {
>      assert_equals(Response.error().body, null);
>  }, "Getting an error Response stream");
>  
> +test(function() {

Was this necessary or just a drive-by cleanup?

::: testing/web-platform/tests/fetch/api/response/response-stream-disturbed-5.html
@@ +15,5 @@
>  promise_test(function() {
>      return fetch("../resources/data.json").then(function(response) {
>          response.blob();
>          assert_not_equals(response.body, null);
> +        return response.body.getReader().read();

This seems wrong to me.  Calling a consuming method like .blob() should mark the stream disturbed synchronously.  Right?
Attachment #8885814 - Flags: review?(bkelly) → review-

Comment 85

11 days ago
Comment on attachment 8883236 [details] [diff] [review]
part 1 - WebIDL bindings

r=me
Flags: needinfo?(bzbarsky)
Attachment #8883236 - Flags: review+

Comment 86

11 days ago
Comment on attachment 8883237 [details] [diff] [review]
part 2 - WebIDL internal renaming

>+                # If our spiderMonkey interface is nullable, this will set the

"SpiderMonkey interface"

>+    spiderMonkey interface, "result" is one of the

"SpiderMonkey interface"

>+            # (which includes spiderMonkey interfaces and arraybuffers) at the

No, this is wrong.  arraybuffers are SpiderMonkey interfaces (but are not typed arrays!).  So just "(which includes SpiderMonkey interfaces)", please.

>+                # The code for unwrapping non-callback interfaces, spiderMonkey

"SpiderMonkey"

>+        If spiderMonkeyInterfacesAreStructs is false, spiderMonkey interfaces

"SpiderMonkey interfaces"

r=me with that.
Attachment #8883237 - Flags: review+
(Assignee)

Comment 87

11 days ago
Created attachment 8886039 [details] [diff] [review]
part 7 - Response CTOR with a ReadableStream as BodyInit
Attachment #8885801 - Attachment is obsolete: true
Attachment #8886039 - Flags: review?(bkelly)
(Assignee)

Comment 88

11 days ago
> ::: testing/web-platform/meta/fetch/api/response/response-clone.html.ini
> @@ +3,1 @@
> >    [Check response clone use structureClone for teed ReadableStreams (Int8Arraychunk)]
> 
> Maybe these would be fixed with the tee on clone?

No, this has to be fixed by the JS engine part.

> testing/web-platform/meta/fetch/api/response/response-consume-stream.html.ini
> @@ +3,2 @@
> >    [Read form data response's body as readableStream]
> >      expected: FAIL
> 
> Why does this fail?  We support FormData, right?

Because we serialize the form data using FSMultipartFormData. I have to check whether this is a bug in the test or in our code. I'll do it as a follow-up.

> > +test(function() {
> 
> Was this necessary or just a driveby cleanup?

The test was broken. I can land it as a separate patch in a separate bug.

> testing/web-platform/tests/fetch/api/response/response-stream-disturbed-5.
> 
> This seems wrong to me.  Calling a consuming method like .blob() should mark
> the stream disturbed synchronously.  Right?

By spec, getReader() doesn't throw.
Flags: needinfo?(bkelly)
(Assignee)

Comment 89

11 days ago
Created attachment 8886139 [details] [diff] [review]
part 7 - Response CTOR with a ReadableStream as BodyInit
Attachment #8886039 - Attachment is obsolete: true
Attachment #8886039 - Flags: review?(bkelly)
Attachment #8886139 - Flags: review?(bkelly)
(Assignee)

Comment 90

11 days ago
Created attachment 8886141 [details] [diff] [review]
part 9 - JS ReadableStream as CTOR
Attachment #8885798 - Attachment is obsolete: true
Attachment #8886141 - Flags: review?(bkelly)
(Assignee)

Updated

11 days ago
Attachment #8886141 - Attachment description: part 8 - JS ReadableStream as CTOR → part 9 - JS ReadableStream as CTOR
(Assignee)

Updated

11 days ago
Attachment #8885802 - Attachment description: part 9 - NS_NewCancelableRunnableFunction → part 8 - NS_NewCancelableRunnableFunction
(In reply to Andrea Marchesini [:baku] from comment #88)
> > testing/web-platform/tests/fetch/api/response/response-stream-disturbed-5.
> > 
> > This seems wrong to me.  Calling a consuming method like .blob() should mark
> > the stream disturbed synchronously.  Right?
> 
> By spec, getReader() doesn't throw.

Uh, it seems to in the spec I am looking at.  See step 1 here:

https://streams.spec.whatwg.org/#rs-get-reader

And the locked check is in the reader constructor bit, step 2 here:

https://streams.spec.whatwg.org/#default-reader-constructor
Flags: needinfo?(bkelly)
(Assignee)

Comment 92

11 days ago
Ok, this seems to be a bug in our JS implementation. I'll check this with till. Ignore the WPT patch.
(Assignee)

Comment 93

11 days ago
Created attachment 8886259 [details] [diff] [review]
part 7 - Response CTOR with a ReadableStream as BodyInit

The lock is done in patch 9 using a Reader.
Attachment #8886139 - Attachment is obsolete: true
Attachment #8886139 - Flags: review?(bkelly)
Attachment #8886259 - Flags: review?(bkelly)
(Assignee)

Comment 94

11 days ago
Created attachment 8886260 [details] [diff] [review]
part 9 - JS ReadableStream as CTOR
Attachment #8886141 - Attachment is obsolete: true
Attachment #8886141 - Flags: review?(bkelly)
Attachment #8886260 - Flags: review?(bkelly)
(Assignee)

Comment 95

11 days ago
Created attachment 8886261 [details] [diff] [review]
part 10 - WPT
Attachment #8885814 - Attachment is obsolete: true
Attachment #8886261 - Flags: review?(bkelly)
Comment on attachment 8886259 [details] [diff] [review]
part 7 - Response CTOR with a ReadableStream as BodyInit

Review of attachment 8886259 [details] [diff] [review]:
-----------------------------------------------------------------

r=me with comments addressed.

::: dom/fetch/BodyExtractor.h
@@ +39,5 @@
>    nsresult GetAsStream(nsIInputStream** aResult,
>                         uint64_t* aContentLength,
>                         nsACString& aContentTypeWithCharset,
>                         nsACString& aCharset) const override;
> +

nit: blank line

::: dom/fetch/Fetch.cpp
@@ +1139,5 @@
> +
> +  // If this is a ReadableStream with an external source, this has been
> +  // generated by a Fetch. In this case, Fetch will be able to recreate it
> +  // again when GetBody() is called.
> +  if (JS::ReadableStreamGetMode(stream) == JS::ReadableStreamMode::ExternalSource) {

Should we throw in this case if mReadableStreamBody exists, but is locked?  I believe ReadableStreamTee() will throw in that case.

I guess Response::Clone() does this already by checking BodyUsed().  Let's MOZ_DIAGNOSTIC_ASSERT(BodyUsed()) in this ExternalSource case to avoid accidentally breaking this in the future.

::: dom/fetch/Response.cpp
@@ +308,5 @@
> +  if (NS_WARN_IF(aRv.Failed())) {
> +    return nullptr;
> +  }
> +
> +  if (body) {

Please add a comment here indicating that we can have a body, but get nullptr here if it's a native stream.  In this case the InternalResponse will have a clone of the native body and the ReadableStream will be created lazily if needed.

@@ +333,5 @@
> +  if (NS_WARN_IF(aRv.Failed())) {
> +    return nullptr;
> +  }
> +
> +  if (body) {

Same comment here.
Attachment #8886259 - Flags: review?(bkelly) → review+
Comment on attachment 8886260 [details] [diff] [review]
part 9 - JS ReadableStream as CTOR

Review of attachment 8886260 [details] [diff] [review]:
-----------------------------------------------------------------

This looks better, but it still does more copying than I would like.  I tried to sketch out some pseudo-code below showing the algorithm I had in mind.  I think we can do it without any additional NS_AsyncCopy() pump or copying.  (This assumes that we can hold a ref to the Uint8Array chunk if the pipe writer is full and we have to wait, though.)

I'm going on PTO tomorrow, but I will check bugmail and try to review in my spare time.  I don't want to leave this for a week until I come back.  Thanks for humoring me here!

::: dom/fetch/Fetch.h
@@ +213,5 @@
>    }
>  
> +  // FetchStreamHolder
> +  void
> +  StoreAndTraceReader(JS::HandleObject aReader) override

What is the significance of the "trace" term here?  Does it imply cycle tracing or something?

::: dom/fetch/FetchStreamReader.cpp
@@ +113,5 @@
> +  if (NS_WARN_IF(NS_FAILED(rv))) {
> +    return rv;
> +  }
> +
> +  rv = NS_AsyncCopy(streamReader, pipeOut,

I'm sorry, but I really would like to avoid copying to an intermediate buffer here.  I don't think we should copy to mBuffer in FetchStreamReader.  Also, I don't think we need an NS_AsyncCopy() pump here.

See comment below for some more details.

@@ +427,5 @@
> +    DispatchAsyncCallback();
> +    return;
> +  }
> +
> +  memcpy(mBuffer.Elements(), data, len);

I think we should just do:

  uint32_t numWritten = pipeWriter.Write(data, len);
  if (numWritten < len) {
    // Hold a strong ref to the FetchReadableStreamDataArray Uint8Array

    // Call nsIAsyncOutputStream::AsyncWait()

    // When it calls back, write the next part of the array

    // If we finish writing this chunk then call the next reader().read() to perform the async loop.
  }

To expand on the overall algorithm, I think we should effectively have two async loops:

1) The "outer" loop is doing:

  function outer() {
    // read next chunk
    return stream.reader().read().then(chunk => {
      // stop once the stream is exhausted
      if (chunk.done) {
        return;
      }
      // wait until we have pushed it into the pipe, then repeat
      return inner(chunk).then(outer);
    });
  }

2) The "inner" loop is doing:

  function inner(chunk) {
    return new Promise(resolve => {
      let remaining = chunk.length;
      let offset = 0;

      function writeSegment() {
        let written = pipeWriter.write(chunk + offset, remaining);
        remaining -= written;
        offset += written;

        // done
        if (remaining < 1) {
          resolve();
          return;
        }

        // wait for more room, then loop
        pipeWriter.asyncWait(writeSegment);
      }

      // kick off the first write
      writeSegment();
    });
  };

Obviously this is just pseudo code.  I wouldn't use real promises here.  (I guess you could try MozPromise if you wanted, but might not be worth it.)

The goal, though, is to use the existing pump mechanisms in ReadableStream /nsIAsyncOutputStream and to avoid any extra copies.

This overall mechanism would be triggered the first time gecko tries to read the pipe reader stream.
Attachment #8886260 - Flags: review?(bkelly) → review-
Comment on attachment 8886261 [details] [diff] [review]
part 10 - WPT

Review of attachment 8886261 [details] [diff] [review]:
-----------------------------------------------------------------

r=me with comments addressed.  In particular, I think we must indicate prefs correctly in our test_interfaces.js files, etc.

::: dom/tests/mochitest/general/test_interfaces.js
@@ +56,5 @@
>      "Object",
>      "Promise",
>      "Proxy",
>      "RangeError",
> +    "ReadableStream",

See other comments below about needing pref annotations for these.

::: dom/workers/test/serviceworkers/test_serviceworker_interfaces.js
@@ +27,5 @@
>      "ArrayBuffer",
>      "Atomics",
>      "Boolean",
> +    "ByteLengthQueuingStrategy",
> +    "CountQueuingStrategy",

Why isn't ReadableStream necessary here?  I don't think we can ship ReadableStream on window without also exposing it on ServiceWorker.  I know at least this site feature-detects ReadableStream before calling SW register():

https://jakearchibald.com/

::: dom/workers/test/test_worker_interfaces.js
@@ +51,5 @@
>      "Object",
>      "Promise",
>      "Proxy",
>      "RangeError",
> +    "ReadableStream",

Not technically a WPT test.  Please fix the commit log to indicate it changes both WPT tests and mochitests.

Also, I think this needs a pref annotation.  In particular, I would still like to start with this stuff default off on nightly.  I think I asked for this in a previous patch, but perhaps you aren't testing with that yet.

::: testing/web-platform/meta/fetch/api/response/response-clone.html.ini
@@ +3,2 @@
>    [Check response clone use structureClone for teed ReadableStreams (Int8Arraychunk)]
>      expected: FAIL

Is there a follow-up bug here?  You said this is a bug in the jsapi bits?  Can you note the bug here?

::: testing/web-platform/meta/fetch/api/response/response-consume-stream.html.ini
@@ +3,2 @@
>    [Read form data response's body as readableStream]
>      expected: FAIL

Can you add the follow-up bug here?
Attachment #8886261 - Flags: review?(bkelly) → review+
For testing streams with service workers, see:

  https://jakearchibald.com/

And this in-development code in a google library:

  https://twitter.com/jeffposnick/status/885314388818894848
(Assignee)

Comment 100

10 days ago
Having just a pipe is an interesting approach, but we still need to expose a custom nsIInputStream because we need to know when the real reading starts: writing data immediately would lock the ReadableStream, and that is not what we want. If the inputStream is not used, the Reader should not be created.
Right.  But it should be a custom stream wrapper around the pipe reader, not the js readable stream.  When the first thing in gecko calls read then trigger the start of algorithm above.  This custom stream can be tiny and pass through almost everything.  It just needs to call a function on first read.  It doesn't need to manage a buffer.
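A minimal JS sketch of that wrapper idea — all names here are hypothetical, and the real version would be a C++ pass-through around the nsPipe reader end, not JS:

```javascript
// Sketch of a pass-through wrapper that fires a one-shot callback on
// the first read, then delegates everything to the underlying source.
function makeTriggeringSource(underlying, onFirstRead) {
  let triggered = false;
  return {
    read(...args) {
      if (!triggered) {
        triggered = true;
        onFirstRead(); // e.g. kick off StartConsuming()
      }
      return underlying.read(...args);
    },
    // Every other method would pass straight through unchanged.
    close: (...args) => underlying.close(...args),
  };
}
```

The wrapper holds no buffer of its own; its only job is the one-shot trigger, which is what keeps it tiny.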
(Assignee)

Comment 102

10 days ago
Created attachment 8886588 [details] [diff] [review]
part 9 - JS ReadableStream as CTOR
Attachment #8886260 - Attachment is obsolete: true
Attachment #8886588 - Flags: review?(bkelly)
(Assignee)

Comment 103

10 days ago
Created attachment 8886598 [details] [diff] [review]
part 10 - WPT
Attachment #8886261 - Attachment is obsolete: true
Attachment #8886598 - Flags: review?(annevk)

Comment 104

10 days ago
Comment on attachment 8886598 [details] [diff] [review]
part 10 - WPT

Review of attachment 8886598 [details] [diff] [review]:
-----------------------------------------------------------------

This looks good.
Attachment #8886598 - Flags: review?(annevk) → review+
Comment on attachment 8886588 [details] [diff] [review]
part 9 - JS ReadableStream as CTOR

Review of attachment 8886588 [details] [diff] [review]:
-----------------------------------------------------------------

Partial review.  I need to look at the cloning stuff closer before completing.  Later today or tomorrow.

::: dom/fetch/FetchStreamReader.cpp
@@ +75,5 @@
> +      streamReader->mPipeOut->CloseWithStatus(NS_ERROR_DOM_INVALID_STATE_ERR);
> +      return NS_ERROR_DOM_INVALID_STATE_ERR;
> +    }
> +
> +    streamReader->mWorkerHolder = Move(holder);

Please add a comment that we have a ref-cycle here that is broken when the stream is closed or the worker shuts down.

Also, not sure if it matters, but we have a cycle in the worker case and not the main-thread case.

@@ +101,5 @@
> +
> +void
> +FetchStreamReader::CloseAndRelease(nsresult aStatus)
> +{
> +  if (mStreamClosed) {

Can you do an NS_ASSERT_OWNINGTHREAD(FetchStreamReader) here?

@@ +111,5 @@
> +
> +  mGlobal = nullptr;
> +
> +  mPipeOut->CloseWithStatus(aStatus);
> +  mPipeOut = nullptr;

Can you do a kungFuDeathGrip here in case the mPipeOut::AsyncWait() or holder is the last thing holding us alive?  It seems like we could be released during cancelation/close in the middle of the method here.

@@ +126,5 @@
> +                                  JS::MutableHandle<JSObject*> aReader,
> +                                  ErrorResult& aRv)
> +{
> +  MOZ_ASSERT(!mReader);
> +  MOZ_ASSERT(aStream);

Can you make cheap checks like these MOZ_DIAGNOSTIC_ASSERT()?

Also, please add NS_ASSERT_OWNINGTHREAD(FetchStreamReader).

@@ +152,5 @@
> +NS_IMETHODIMP
> +FetchStreamReader::OnOutputStreamReady(nsIAsyncOutputStream* aStream)
> +{
> +  MOZ_ASSERT(aStream == mPipeOut);
> +  MOZ_ASSERT(mReader);

NS_ASSERT_OWNINGTHREAD(FetchStreamReader)

@@ +154,5 @@
> +{
> +  MOZ_ASSERT(aStream == mPipeOut);
> +  MOZ_ASSERT(mReader);
> +
> +  if (mBuffer) {

I think we should check mStreamClosed here before doing anything.  It's possible the StreamReady runnable is in the event queue when the stream is closed.  We should short-circuit any further action here.

@@ +193,5 @@
> +FetchStreamReader::ResolvedCallback(JSContext* aCx,
> +                                    JS::Handle<JS::Value> aValue)
> +{
> +  // This promise should be resolved with { done: boolean, value: something },
> +  // "value" is interesting only if done is false.

Short-circuit based on mStreamClosed.

@@ +224,5 @@
> +  Uint8Array& array = value->mValue.Value();
> +  array.ComputeLengthAndData();
> +  uint32_t len = array.Length();
> +
> +  MOZ_ASSERT(!mBuffer);

MOZ_DIAGNOSTIC_ASSERT(!mBuffer);

Also maybe, MOZ_DIAGNOSTIC_ASSERT(len > 0)?  Or is that possible here?  Maybe it just works.

@@ +228,5 @@
> +  MOZ_ASSERT(!mBuffer);
> +  mBuffer = Move(value);
> +
> +  mBufferOffset = 0;
> +  mBufferSize = len;

I think "size" is confusing here.  Can you rename to mBufferRemaining or something?

@@ +260,5 @@
> +      mBuffer = nullptr;
> +    }
> +  }
> +
> +  rv = mPipeOut->AsyncWait(this, 0, 0, mOwningEventTarget);

I think this will work for nsPipeOutputStream because we know that implementation does its best to fully write the desired number of bytes.  It's possible for other nsIOutputStream impls, though, to write just a smaller number of bytes while still not blocking.  In theory we would want to loop here to catch that.  The AsyncWait() will still fire immediately, but at the cost of an added runnable in this case.

Anyway, maybe just a comment here that we don't need to loop since we know we are writing to nsPipeOutputStream and we know it will fully write if possible.

::: dom/fetch/InternalResponse.cpp
@@ +151,5 @@
>    if (!mBody) {
>      return clone.forget();
>    }
>  
> +  if (aCloneType ==eCloneInputStream) {

nit: space after ==
Comment on attachment 8886588 [details] [diff] [review]
part 9 - JS ReadableStream as CTOR

Review of attachment 8886588 [details] [diff] [review]:
-----------------------------------------------------------------

Please see comment 105 and these comments.  (Assuming splinter doesn't repeat them.  It's acting weird for me.)

This is looking really good!

I think the only thing really missing now is a wrapper stream to call StartConsuming() when gecko tries to read the FetchStreamReader's corresponding nsPipeInputStream.  r- is mainly because I want to look at this stream as well.  Sorry.

Also, can you please make a few mochitests that:

1. Test consuming a js ReadableStream via the FetchStreamReader.
2. Like (1), but test what happens when it's run in a Worker and worker.terminate() is called in the middle.
3. Test that we properly error out when a js ReadableStream sends a non Uint8Array chunk through the stream.

I think we probably want these tests before we try enabling this feature.  Ideally we would include the tests in this bug.

::: dom/fetch/Fetch.cpp
@@ +993,5 @@
> +      }
> +    } else {
> +      // If this is not a native ReadableStream, let's activate the
> +      // FetchStreamReader.
> +      MOZ_ASSERT(mFetchStreamReader);

I'm a little confused about this.  Why do we have an mFetchStreamReader already and then overwrite it after the StartConsuming() below?  What is the original mFetchStreamReader and what is it replaced with?

@@ +1102,5 @@
> +  }
> +
> +  MOZ_ASSERT(body);
> +
> +  // If the body has been already consumed, we close the stream.

s/close/lock/g

::: dom/fetch/FetchStreamReader.cpp
@@ +78,5 @@
> +
> +    streamReader->mWorkerHolder = Move(holder);
> +  }
> +
> +  pipeIn.forget(aInputStream);

Is this correct?  We had discussed having a wrapper nsIAsyncInputStream that call StartConsuming() on first Read()/ReadSegments().  I don't see how StartConsuming is triggered in that case.

::: dom/fetch/FetchStreamReader.h
@@ +25,5 @@
> +public:
> +  NS_DECL_ISUPPORTS
> +  NS_DECL_NSIOUTPUTSTREAMCALLBACK
> +
> +  // This creates a nsIInputStream able to retrieve data from the ReadableStream

This is a little confusing since "the" ReadableStream hasn't been specified yet.  Create() just seems to set things up and provides a variety of out parameters.  Maybe document that StartConsuming() needs to be called later to actually attach to a particular ReadableStream.

::: dom/fetch/InternalResponse.cpp
@@ +151,5 @@
>    if (!mBody) {
>      return clone.forget();
>    }
>  
> +  if (aCloneType ==eCloneInputStream) {

Also, just move this aCloneType check into the previous if-statement:

  if (!mBody || aCloneType != eCloneInputStream) {
    return clone.forget();
  }

::: dom/fetch/InternalResponse.h
@@ +43,5 @@
>    ToIPC(IPCInternalResponse* aIPCResponse,
>          M* aManager,
>          UniquePtr<mozilla::ipc::AutoIPCStream>& aAutoStream);
>  
> +  enum CloneType

nit: I prefer enum class these days, but this works too.

::: dom/fetch/Response.cpp
@@ +266,5 @@
> +        // If this is a JS-created ReadableStream, let's create a
> +        // FetchStreamReader.
> +        aRv = FetchStreamReader::Create(aGlobal.Context(), global,
> +                                        getter_AddRefs(r->mFetchStreamReader),
> +                                        getter_AddRefs(bodyStream));

How does StartConsuming get called in this case where gecko consumes the nsIAsyncInputStream?  I think we need that wrapper stream to start it on Read/ReadSegments.
Attachment #8886588 - Flags: review?(bkelly) → review-
(In reply to Ben Kelly [PTO, back July 24][:bkelly] from comment #106)
> Also, can you please make a few mochitests that:
> 
> 1. Test consuming a js ReadableStream via the FetchStreamReader.
> 2. Like (1) but test what happens when it's done in a Worker when
> worker.terminate() is called in the middle.
> 3. Test that we properly error out when a js ReadableStream sends a non
> Uint8Array chunk through the stream.

I guess a lot of this is covered by streams WPT tests.  We need to test the termination case, though.

Also, please ask jgraham to do an import of WPT tests so we can run this service worker test:

https://github.com/w3c/web-platform-tests/blob/master/service-workers/service-worker/fetch-event-respond-with-readable-stream.https.html

I'm not sure why we don't already have it in tree.  Have we really not sync'd since May 19?

Or pull it in manually.  We need to test that case.
(Assignee)

Comment 108

7 days ago
> I think the only thing really missing now is a wrapper stream to call
> StartConsuming() when gecko tries to read the FetchStreamReader's
> corresponding nsPipeInputStream.  r- is mainly because I want to look at
> this stream as well.  Sorry.

No, we don't need it. We want to start the reading only when the consuming of the body starts. We create the FetchStreamReader immediately, and with it, we create the pipe. But the creation of the ReadableStreamReader should happen only when the body consuming starts for real. Otherwise the stream is locked as soon as the Response object is created.

> Also, can you please make a few mochitests that:

Yeah. I have several tests. I will attach them as an extra patch.

> ::: dom/fetch/Fetch.cpp
> @@ +993,5 @@
> > +      }
> > +    } else {
> > +      // If this is not a native ReadableStream, let's activate the
> > +      // FetchStreamReader.
> > +      MOZ_ASSERT(mFetchStreamReader);
> 
> I'm a little confused about this.  Why do we have a mFetchStreamReader
> already and then override it again after the StartConsuming below?  What is
> the original mFetchStreamReader and what is it replaced with?

We have a FetchStreamReader because that is created when the Response is created in case the BodyInit is a ReadableStream. In this way we also have a valid nsIInputStream.

As you can see, here I activate the reading by calling StartConsuming.

> Is this correct?  We had discussed having a wrapper nsIAsyncInputStream that
> call StartConsuming() on first Read()/ReadSegments().  I don't see how
> StartConsuming is triggered in that case.

At the first Read()/ReadSegments() we want to return WOULD_BLOCK. If the reading starts before the ::ConsumeBody(), we don't want to create the ReadableStreamReader, otherwise the stream will be locked.

All the other comments are applied. I'll submit an extra patch with tests.
Flags: needinfo?(bkelly)
(Assignee)

Comment 109

7 days ago
Created attachment 8886960 [details] [diff] [review]
part 11 - tests
Attachment #8886960 - Flags: review?(bkelly)
(In reply to Andrea Marchesini [:baku] from comment #108)
> > I think the only thing really missing now is a wrapper stream to call
> > StartConsuming() when gecko tries to read the FetchStreamReader's
> > corresponding nsPipeInputStream.  r- is mainly because I want to look at
> > this stream as well.  Sorry.
> 
> No, we don't need it. We want to start the reading only when the consuming
> of the body starts. We create the FetchStreamReaer immediately, and with it,
> we create the pipe. But the creation of the ReadableStreamReader should
> happen only when the body consuming starts for real. Otherwise the stream is
> locked when the Response object is created.

I don't understand this.  Something like FetchEvent.respondWith() does not call one of the consuming methods.  It gets the nsIAsyncInputStream straight out of InternalResponse and tries to read it.  As far as I can tell your current implementation will never start to read the js stream in that case.

http://searchfox.org/mozilla-central/source/dom/workers/ServiceWorkerEvents.cpp#670

Similarly Cache API just operates on the nsIAsyncInputStream:

http://searchfox.org/mozilla-central/source/dom/cache/TypeUtils.cpp#222

These gecko integration points need to start consuming the js stream in some way.  Ideally this would "just work" when you try to read the stream.
Flags: needinfo?(bkelly)
Comment on attachment 8886960 [details] [diff] [review]
part 11 - tests

Review of attachment 8886960 [details] [diff] [review]:
-----------------------------------------------------------------

Looks reasonable, but please add a test for worker termination.

We also need to see the SW WPT test from upstream.  I don't think we will pass it with the current patches yet, though.
Attachment #8886960 - Flags: review?(bkelly) → review+
Also, a simple test that tries to put a Response with a js ReadableStream in Cache API would be useful.
(Assignee)

Comment 113

6 days ago
Created attachment 8887447 [details] [diff] [review]
part 12 - StartConsuming in SetBodyUsed()

This is the missing bit of patch 9. I also added a couple of cache tests.
Attachment #8887447 - Flags: review?(bkelly)
Comment on attachment 8887447 [details] [diff] [review]
part 12 - StartConsuming in SetBodyUsed()

Review of attachment 8887447 [details] [diff] [review]:
-----------------------------------------------------------------

The plan to StartConsuming() from SetBodyUsed() looks good.  I will r+ the previous patch since we are adding this.

It does, however, raise another problem with what global/JSContext to use for that first read.  For one, Boris suggests AutoJSAPI is not appropriate if we might trigger script like Read() does.  We should use AutoEntryScript.

Also, we need to clarify what happens if the ReadableStream, Response, Cache, etc are all created in different globals.  We might need to talk to Domenic or at least check what Chrome does.

So r- for this global issue.  Please talk to Boris and Domenic to figure out the best plan here.

::: dom/cache/TypeUtils.cpp
@@ +221,5 @@
>    nsCOMPtr<nsIInputStream> stream;
>    ir->GetUnfilteredBody(getter_AddRefs(stream));
>    if (stream) {
> +    AutoJSAPI jsapi;
> +    if (NS_WARN_IF(!jsapi.Init(GetGlobalObject()))) {

Is this necessarily the right global?  It seems you could have a Cache object from another compartment from the Response.  We probably want to use the Response's global since that is where its internal functions live?

Also, Boris suggests that an AutoJSAPI is wrong here since we can run script when we read the stream.  He suggests this should be an AutoEntryScript instead.

In general, we also need to clarify with the spec editors how cross-global issues should be handled.  Should it be the entry global, incumbent global, etc?

::: dom/fetch/Fetch.cpp
@@ +970,5 @@
>  FetchBody<Response>::BodyUsed() const;
>  
>  template <class Derived>
> +void
> +FetchBody<Derived>::SetBodyUsed(JSContext* aCx, ErrorResult& aRv)

Why do we pass a JSContext here?  The FetchBody already has an nsIGlobalObject mOwner attribute.  It seems we could get the JSContext from that global automatically?  It would remove the possibility of someone using the wrong context/global for draining the stream.

Also, I think we should probably assert that we are on the correct thread for the global.  Something like:

  MOZ_ASSERT(mOwner->EventTargetFor(TaskCategory::Other)->IsOnCurrentThread());

@@ +972,5 @@
>  template <class Derived>
> +void
> +FetchBody<Derived>::SetBodyUsed(JSContext* aCx, ErrorResult& aRv)
> +{
> +  mBodyUsed = true;

We should either assert this only happens once or short-circuit on second call.  We don't want to lock/startConsuming twice.

@@ +989,5 @@
> +      // If this is not a native ReadableStream, let's activate the
> +      // FetchStreamReader.
> +      MOZ_ASSERT(mFetchStreamReader);
> +      JS::Rooted<JSObject*> reader(aCx);
> +      mFetchStreamReader->StartConsuming(aCx, readableStreamObj, &reader, aRv);

Ok.  This seems reasonable.  It also helps ensure that we properly mark the stream used when we consume it directly in gecko.  The wrapper stream would not have helped there.

::: dom/fetch/Fetch.h
@@ +192,5 @@
>  
>    // Utility public methods accessed by various runnables.
>  
>    void
> +  SetBodyUsed(JSContext* aCx, ErrorResult& aRv);

Can you add a documenting comment here?  I think this method is getting a bit complicated now.  We should probably mention:

1) It uses an internal flag to track if the body is used.  This is tracked separately from the ReadableStream disturbed state due to purely native streams.
2) If there is a ReadableStream reflector for the native stream it is Locked.
3) If there is a JS ReadableStream then we begin pumping it into the native body stream.  This effectively locks and disturbs the stream.

As I mentioned in my other comment, I'm not sure we should require passing a JSContext here.

::: dom/fetch/Request.cpp
@@ +605,5 @@
>      nsCOMPtr<nsIInputStream> body;
>      inputReq->GetBody(getter_AddRefs(body));
>      if (body) {
>        inputReq->SetBody(nullptr);
> +      inputReq->SetBodyUsed(aGlobal.Context(), aRv);

Same question about which global to use.

::: dom/tests/mochitest/fetch/common_readableStreams.js
@@ +132,5 @@
> +  is(new Uint8Array(cacheBody)[0], 0x01, "First byte is correct");
> +  is(new Uint8Array(cacheBody)[1], 0x02, "Second byte is correct");
> +  is(new Uint8Array(cacheBody)[2], 0x03, "Third byte is correct");
> +
> +  next();

Please delete the cache objects before the test ends to avoid polluting the shared test state.
Attachment #8887447 - Flags: review?(bkelly) → review-
Comment on attachment 8886588 [details] [diff] [review]
part 9 - JS ReadableStream as CTOR

Switching this to r+ since part 12 patch fixes the issue with calling StartConsume().  Please just make sure all the other comments are addressed.  Thanks!
Attachment #8886588 - Flags: review- → review+
Andrea, can you please write a test like the Cache.put() one that pipes a relatively large stream?  For example, generate 13-byte chunks 100,000 times.  This will ensure we test the multi-chunk path.  It should also force us to hit the pipe writer's AsyncWait(), since it will cross the 64kb buffer boundary inside one of the ReadableStream chunks.
Flags: needinfo?(amarchesini)
(Assignee)

Comment 117

4 days ago
Created attachment 8888293 [details] [diff] [review]
part 13 - tests with buffers of 1mb

Still green.
Flags: needinfo?(amarchesini)
Attachment #8888293 - Flags: review?(bkelly)
(Assignee)

Comment 118

3 days ago
Created attachment 8888771 [details] [diff] [review]
part 12 - StartConsuming in SetBodyUsed()

This patch doesn't cover the AutoEntryScript issue. It is just about having the correct state of the JSContext at all entry points into the Streams API.
Attachment #8887447 - Attachment is obsolete: true
Attachment #8888771 - Flags: review?(bzbarsky)
(Assignee)

Comment 119

3 days ago
Created attachment 8888774 [details] [diff] [review]
part 14 - cross-compartment tests
Attachment #8888774 - Flags: feedback?(bzbarsky)
Comment on attachment 8888293 [details] [diff] [review]
part 13 - tests with buffers of 1mb

Review of attachment 8888293 [details] [diff] [review]:
-----------------------------------------------------------------

This exercises the "chunk exceeds pipe buffer limit" case, but it would be nice to have a test that also exercises the "many chunks into a single pipe buffer" case.
Attachment #8888293 - Flags: review?(bkelly) → review+
Comment on attachment 8888293 [details] [diff] [review]
part 13 - tests with buffers of 1mb

Review of attachment 8888293 [details] [diff] [review]:
-----------------------------------------------------------------

This exercises the "chunk exceeds pipe buffer limit" case, but it would be nice to have a test that also exercises the "many chunks into a single pipe buffer" case.

::: dom/tests/mochitest/fetch/common_readableStreams.js
@@ +143,5 @@
>    info("Converting the response to text");
>    let cacheBody = await cacheResponse.arrayBuffer();
>  
>    ok(cacheBody instanceof ArrayBuffer, "Body is an array buffer");
> +  is(cacheBody.byteLength, BIG_BUFFER_SIZE, "Body length is correct");

Oh, it would also be useful to validate that the bytes across chunks/pipe writes are in order.  That's partly why I suggested using some odd chunk size that you could populate with sequential numbers.  Just filling with 42 doesn't really do that, unfortunately.
Comment on attachment 8886588 [details] [diff] [review]
part 9 - JS ReadableStream as CTOR

>+    aRv.StealExceptionFromJSContext(aCx);

Why is that not NoteJSContextException?  Can it become that after part 12?

This happens in LockStream() and StartConsuming().
Comment on attachment 8888771 [details] [diff] [review]
part 12 - StartConsuming in SetBodyUsed()

This needs a sane commit message.  It should presumably mention something about passing the JSContext down from binding entrypoints to where we start reading the stream.

It should also explain _why_ we would want to do such a thing.

r- until the commit message is fixed.

That said, some code comments:

>+++ b/dom/cache/Cache.cpp
>  ResolvedCallback(JSContext* aCx, JS::Handle<JS::Value> aValue) override

There's no guarantee that the state of aCx here matches the spec in any way.  There's even no guarantee that this is specified sanely.

I would like to see an analysis of how and whether that state matches the spec.

>+Cache::MatchAll(JSContext* aCx, const Optional<RequestOrUSVString>& aRequest,
>+    RefPtr<InternalRequest> ir = ToInternalRequest(aCx, aRequest.Value(),
>                                                      IgnoreBody, aRv);

Fix the indent, please.  I know it's a preexisting problem....

>+++ b/dom/fetch/Fetch.h
> Doing this,
>+  // if the body is a ReadableStream, the reading starts.

  "If the body is a ReadableStream, this method will start reading the stream."

>+++ b/dom/flyweb/FlyWebServerEvents.cpp

Hmm, so I guess this could be one reason to not just leave the exception on aCx?  But it's pretty weird to silently drop the exception like this...  It's not even being reported anywhere.

>+++ b/dom/workers/ScriptLoader.cpp

Please do NOT use AutoSafeJSContext: it's clearly documented as deprecated.

Furthermore, AutoSafeJSContext does not set up the right state to allow running script, and as I understand Put() and Match() can end up running script, right?

Again, I'd like to see an analysis of what the spec says should happen here in terms of globals: should they be passed in from somewhere, or do we need an AutoEntryScript here, or something else?  This applies to both uses of AutoSafeJSContext.

>+++ b/dom/workers/ServiceWorkerEvents.cpp

Again, it's a bit weird to just silently swallow exceptions.  Unlike flyweb, this is code that's going to be used on the web; silently swallowing exceptions here is not ok.  We need to either make sure the exception is on aCx before we return from here (and presumably the caller will then do something with that?) or we need to report it, or _something_.

Actually, it's not entirely clear to me that the caller does anything useful with exceptions here.  This is part of a promise reaction, but doesn't have a promise to reject, so it might just drop exceptions on the floor no matter what we do.  Check with till?

Ideally we'd just have a testcase that exercises this case.

>+++ b/dom/workers/ServiceWorkerScriptCache.cpp

This code is also silently swallowing the exception, albeit in a non-obvious way.  Maybe it's OK... it does use the nsresult for something, but that nsresult is likely to be "JS threw an exception" or some such, with all information about the actual exception lost.  Followup bug to make this better is fine, but in the meantime we should make sure we make this work with the "NoteJSContextException" thing if we decide to do that.

This happens both in ManageOldCache and in WriteToCache....

>CompareCache::Initialize(Cache* const aCache, const nsAString& aURL)
>+  AutoJSAPI jsapi;

Don't we plan to call something that can run JS code?  Or do we know it's OK because we're definitely not passing in a stream?  At the very least, this needs some comments!
Attachment #8888771 - Flags: review?(bzbarsky) → review-
Comment on attachment 8888774 [details] [diff] [review]
part 14 - cross-compartment tests

>+async function test_nativeStream_continue(r, that) {

"instanceof that.Response" tells you nothing about the global of the object.  If you really want to detect that, compare Object.getPrototypeOf(whatever) to that.Response.prototype.

>+  that.ok(blob instanceof Blob, "We have a blob");

Blob, or that.Blob?  Again, in Gecko both will pass, but if you're trying to check the proto, you may want Object.getPrototypeOf.  Both places in this function.

>+  let d = await a.body.getReader().read();

Might be worth testing which global the return value of "getReader()" lives in.

Similar throughout these tests.

Ideally, by the way, these would be web platform tests.  So maybe s/compartment/global/ in all this stuff so they're easier to convert?

>+++ b/dom/tests/mochitest/fetch/iframe_readableStreams.html
>+<iframe src="iframe_readableStreams.html" id="iframe"></iframe>

This doesn't make sense.  That load will just get blocked by the load-recursion blocker.  Why are we doing that?

>+  self[data.func].call(iframe, data.args, parent);

Why "iframe" as the first arg?  When called in the parent, "this" is the parent window.  Are we trying to make that be the case here too?  If so, we should pass "parent".  If not, we can just self[data.func](data.args, parent) and have "this" be our window...

>+++ b/dom/tests/mochitest/fetch/mochitest.ini
>+  iframe_readableStreams.html

Add it as a test-specific support file?

So now my main question: do any of these tests _fail_?  Because given all the stuff in the JS stream code and our C++ consumers that clearly should fail when things are called in interesting compartments, I'd really hope we actually have some test failures.  If we don't, then we're not exercising those codepaths....
Attachment #8888774 - Flags: feedback?(bzbarsky) → feedback+