Closed Bug 210125 Opened 21 years ago Closed 21 years ago

need to be able to AsyncWait for closure only


(Core :: Networking, defect, P2)






(Reporter: darin.moz, Assigned: darin.moz)



(Keywords: arch)


(3 files, 2 obsolete files)

we need the ability to asynchronously wait for an async input/output stream to
be closed.  currently, we can only wait for the streams to be "ready" which
means either readable/writable or closed.  however, there are a number of times
when we don't want to read from a stream, but we still want to know if the
stream enters an error condition.

 * NS_AsyncCopy needs this so that it can monitor both streams for errors while
   it is waiting on only one of them for data.  this has been the source of 
   some memory leaks in the past.

 * nsInputStreamPump needs this so it can respond to errors on the underlying 
   stream while it is in the suspended state.  note: today, if an input stream
   pump is canceled, it will fire OnStopRequest even if it was in the suspended
   state, so we need to do the same if the underlying stream hits an error 
   condition.  this has been the source of some bugs having to do with delayed
   error notifications (i think there are bugs filed against IMAP and FTP on 

there are a number of bugs related to this.  i think solving this bug might 
entail adding a flags parameter to AsyncWait.  callers would have the option
of being notified only when the stream enters the closed state.
Severity: normal → major
Priority: -- → P2
Target Milestone: --- → mozilla1.5alpha
Blocks: 200219
Blocks: 178210
Target Milestone: mozilla1.5alpha → mozilla1.5beta
Target Milestone: mozilla1.5beta → mozilla1.6alpha
Attached patch v1 patch (obsolete) — Splinter Review
Comment on attachment 130497 [details] [diff] [review]
v1 patch

bz: i'd really like to get your feedback on this patch since it incorporates
many of your older review comments.  man, has it really taken me this long to
write up this patch?!? ;-)
Attachment #130497 - Flags: review?(bz-vacation)
I'll try to get to this over the weekend...
Comment on attachment 130497 [details] [diff] [review]
v1 patch

nevermind bz.. this patch isn't right.	it has a nasty race condition.	it's
going to take me some time to figure out a good solution.
Attachment #130497 - Flags: review?(bz-vacation)
Attached patch v2 patch (obsolete) — Splinter Review
ok, this patch is much bigger.	summary of changes:

1- introduces nsIEventTarget.  this is like nsIEventQueue, only it only has a
PostEvent method (and a IsOnCurrentThread method).  the idea is that code that
wants to post an event should only need to talk to nsIEventTarget.  this
interface is now a superclass for nsIEventQueue.  however, nsIEventTarget is
also implemented by the nsSocketTransportService (to allow you to post PLEvents
to the socket transport thread), and it is also implemented by the necko i/o
thread pool (nsIOThreadPool).  this means that all threads in necko (minus the
DNS threads) support nsIEventTarget.  as a result, a lot of complex recursion
situations in necko are completely avoided and thread synchronization problems
are likewise simplified greatly.

2- NS_AsyncCopy now has a revised interface.  it accepts an nsIEventTarget
parameter which specificies the thread that should be used to perform the
stream copy.  it also now operates on nsIInputStream and nsIOutputStream
instead of only the nsIAsync* stream variants.	this means that NS_AsyncCopy
can be used for a great many other things.  as a result, the implementation of
nsAsyncStreamCopier has been greatly simplified (code reduction).  in the
future, NS_AsyncCopy can also be used to eliminate some complex stream logic in

3- nsIThreadPool is removed in this patch since necko no longer uses it.  necko
was the only consumer.

4- nsIInputStreamNotify got renamed to nsIInputStreamCallback b/c we all agree
the former name sucked.   "callback" seems like a better name that "observer"
or "listener" since the OnInputStreamReady event is only going to be called
once.  the notification is "one-shot" so "observer" or "listener" seem wrong --
they suggest one or many notifications.

5- closeEx got renamed to closeWithStatus (was in the previous patch).

6- nsStreamTransportService implementation was greatly simplified.  most of the
code was removed in favor of some very small code to setup a NS_AsyncCopy
process between the given blocking stream and a non-blocking pipe that operates
using the necko i/o threads (a nsIEventTarget implementation).

eventually, i think more things like xpcom proxies and timers should use
nsIEventTarget.  that will be done in a follow up patch.  support for timers
being able to post timer events to necko threads (especially the socket
transport thread) is going to be very crucial to the implementation of a number
of necko features (like pipelining fallback and support for expect-100-continue
POST requests).

overall, there is a good amount of code being removed by this patch.  things
are more flexible and much simpler to understand IMO (especially NS_AsyncCopy).
Attachment #130497 - Attachment is obsolete: true
Attached patch v2.1 patchSplinter Review
slightly revised patch.  forgot to include some new files.  this patch

  o  newly added files netwerk/base/src/nsTransportUtils.{h,cpp}, which provide

     a proxy implementation of nsITransportEventSink that is smart about 
     coalescing events.  this replaces the implementation that was in 
     nsHttpTransaction and is now used with both nsITransport implementations.
     care was taken to ensure that unique status events would not be coalesced
     by default since FTP depends on getting the STATE_CONNECTED event.

  o  needed to use NS_ProxyRelease in nsTransportUtils to ensure that the
     real nsITransportEventSink gets destroyed on the proper thread.  this
     prompted me to move the implementation of NS_ProxyRelease into a .cpp
     file to save code size.

otherwise, this is the same as the v2 patch.
Attachment #131151 - Attachment is obsolete: true
Attachment #131195 - Flags: superreview?(bz-vacation)
Attachment #131195 - Flags: review?(dougt)
Comment on attachment 131195 [details] [diff] [review]
v2.1 patch

>Index: layout/html/base/src/nsFrameManager.cpp

>       // Add the event to our linked list of posted events
>       ev->mNext = mPostedEvents;
>       mPostedEvents = ev;
>       // Post the event
>-      eventQueue->PostEvent(ev);
>+      rv = eventQueue->PostEvent(ev);
>+      if (NS_FAILED(rv)) {

Shouldn't we only add the event to the list if PostEvent succeeds?

>Index: layout/html/base/src/nsPresShell.cpp

>-    eventQueue->PostEvent(ev);
>+    if (NS_FAILED(eventQueue->PostEvent(ev))) {
>+      NS_ERROR("failed to post reflow event");
>+      PL_DestroyEvent(ev);
>+    }
>     mReflowEventQueue = eventQueue;

We should only set mReflowEventQueue if posting succeeded.

>Index: modules/plugin/samples/SanePlugin/nsSanePlugin.cpp

>-    if (eventQ->PostEvent(event) != PR_SUCCESS) {
>+    if (NS_FAILED(eventQ->PostEvent(event))) {
>         NS_ERROR("Error trying to post event!\n");
>         return;


>Index: netwerk/base/public/nsITransport.idl

>-     *        any thread.  (NOTE: the event queue must be resolved.)
>+     *        any thread.

Does the event queue no longer have to be resolved, if that's what our
nsIEventTarget is?

>Index: netwerk/base/src/nsAsyncStreamCopier.cpp

> nsAsyncStreamCopier::Init(nsIInputStream *source,
>+    // protect against someone initializing us twice (whatever!)
>+    if (!mLock) {

Wouldn't it be best to do:

if (mLock) 

and then proceed?  It seems to me that being inited a second time would just be
a bad idea for correctness of this code... ;)

>+    // we want to receive progress notifications...

Add a comment that says that the release happens in OnAsyncCopyComplete?  And
maybe add a comment there that says that the addref happens here.

>Index: netwerk/base/src/nsIOService.cpp

>-    mStreamTransportService = do_GetService(kStreamTransportServiceCID, &rv);

Why is this no longer needed?

>Index: netwerk/base/src/nsSocketTransport2.cpp
>+nsSocketInputStream::AsyncWait(nsIInputStreamCallback *callback,
>+                               PRUint32 flags,
>+                               PRUint32 amount,
>+                               nsIEventTarget *eventQ)

Shouldn't the arg be named eventTarget or something like that?

>+nsSocketOutputStream::AsyncWait(nsIOutputStreamCallback *callback,
>+                                PRUint32 flags,
>+                                PRUint32 amount,
>+                                nsIEventTarget *eventQ)


>@@ -1378,34 +1448,33 @@ nsSocketTransport::OpenInputStream(PRUin
>         net_ResolveSegmentParams(segsize, segcount);
>         nsIMemory *segalloc = net_GetSegmentAlloc(segsize);
>-        rv = NS_AsyncCopy(&mInput, pipeOut, PR_FALSE, PR_TRUE,
>-                          segsize, 1, segalloc);
>+        rv = NS_AsyncCopy(&mInput, pipeOut, gSocketTransportService,
>+                          NS_ASYNCCOPY_VIA_WRITESEGMENTS, segsize);

So we no longer use segalloc?

>@@ -1426,48 +1495,47 @@ nsSocketTransport::OpenOutputStream(PRUi


>Index: netwerk/base/src/nsSocketTransportService2.cpp

>+#define PLEVENT_FROM_LINK(_link) \
>+    ((PLEvent*) ((char*) (_link) - offsetof(PLEvent, link)))

Could we make those casts NS_REINTERPRET_CAST just to make it clear what's
going on?

More coming later.
>>Index: netwerk/base/src/nsIOService.cpp
>>-    mStreamTransportService = do_GetService(kStreamTransportServiceCID, &rv);
>Why is this no longer needed?

this was in the IO service because the IO service needed to tell the stream
transport service to shutdown.  now that stream transport service is really just
a thin wrapper around the nsIOThreadPool, i decided to have the nsIOThreadPool
just listen for "xpcom-shutdown" and do its own cleanup at that time.  thus,
there is no need for the IO service to communicate anything to the
nsIOThreadPool or nsStreamTransportService at shutdown time.

>So we no longer use segalloc?

segalloc was only needed to handle the case where neither of the streams given
to NS_AsyncCopy are buffered.  in that case we would allocate a pipe, and then
create two "async-copy" processes.  i prefer to return an error if neither
stream is buffered.  that forces users to think about what they are doing when
they use NS_AsyncCopy.  chances are good that they do have a stream which is
buffered... if they don't then they can allocate the pipe and call NS_AsyncCopy
twice.  but so far i've never found a need for this in the code.
.... and thanks for the review comments!  keep 'em coming ;-)
> segalloc was only needed

Well, if we don't need it, we should not be getting it.  ;)
Comment on attachment 131195 [details] [diff] [review]
v2.1 patch

>Index: netwerk/base/src/nsStreamTransportService.cpp
>+class nsInputStreamTransport : public nsITransport
>+                             , public nsIInputStream

It would be good to document why this implement nsIInputSteam (that we
asynccopy from it).

> nsOutputStreamTransport::OpenOutputStream(PRUint32 flags,
>+    rv = NS_AsyncCopy(pipeIn, this, target,
>+                      NS_ASYNCCOPY_VIA_READSEGMENTS, segsize);
>+    if (NS_SUCCEEDED(rv))
>+        NS_ADDREF(*result = mPipeOut);
>-    NS_ADDREF(*result = mPipeOut);
>     return NS_OK;

"return rv;", no?

>+nsOutputStreamTransport::Write(const char *buf, PRUint32 count, PRUint32 *result)

>+    // limit amount read
>+    PRUint32 max = mLimit - mOffset;

"written", not "read"

>Index: netwerk/base/src/nsTransportUtils.cpp

>+    PR_STATIC_CALLBACK(void*) HandleEvent(PLEvent *event)
>+        // since this event is being handled, we need to clear the proxy's ref
>+        {
>+            nsAutoLock lock(proxy->mLock);
>+            proxy->mLastEvent = nsnull;
>+        }

Don't we only want to do this if this == proxy->mLastEvent ? Otherwise we could
prevent some coalescing that could happen at various points...

>Index: netwerk/protocol/ftp/src/nsFtpConnectionThread.cpp
>+        // perform the data copy on the socket transport thread.

Say why (that is, because "output" is a socket stream anyway, so this way all
the work is done on a single thread).

>Index: netwerk/protocol/http/src/nsHttpChannel.cpp
>+    nsresult rv = eventQ->PostEvent(event);
>+    if (NS_FAILED(rv)) {
>         delete event;

PL_DestroyEvent(), no?

>Index: netwerk/protocol/http/src/nsHttpConnectionMgr.h

>+        return PR_AtomicIncrement((PRInt32 *) &mRef);

mRef is a PRInt32.  Why the casting?

>+        nsrefcnt n = PR_AtomicDecrement((PRInt32 *) &mRef);


>Index: netwerk/protocol/http/src/nsHttpTransaction.cpp

>@@ -415,22 +398,30 @@ nsHttpTransaction::WriteSegments(nsAHttp
>+        nsCOMPtr<nsIEventTarget> target;
>+        gHttpHandler->GetSocketThreadEventTarget(getter_AddRefs(target));

Comment why we want to use that thread as the target?

>@@ -930,22 +887,23 @@ nsHttpTransaction::DeleteSelfOnConsumerT

>-        NS_ASSERTION(status == PR_SUCCESS, "PostEvent failed");
>+        if (NS_FAILED(status))
>+            NS_ERROR("PostEvent failed");

NS_ASSERTION(NS_SUCCEEDED(status), "PostEvent failed");

otherwise you execute the NS_FAILED thing in opt builds if the compiler is
stupid enough....

> nsHttpTransaction::OnOutputStreamReady(nsIAsyncOutputStream *out)

Comment that this will be called on the socket thread.

>Index: xpcom/io/nsIAsyncInputStream.idl

>+     * @param aEventTarget
>+     *        Specify NULL to receive notification on ANY thread (possibly even
>+     *        recursively on the calling thread)

Recursively does not imply synchronously here, right?  We should make that

>+     * If passed to asyncWait, this flags overrides the default behavior,

"this flag overrides"

>Index: xpcom/io/nsIAsyncOutputStream.idl

Same things, since this is all the same as the input stream.

>Index: xpcom/io/nsStreamUtils.cpp

>+    virtual PRUint32 DoCopy(nsresult &sourceCondition, nsresult &sinkCondition) = 0;

If those are straight out params, make them nsresult*?	If they are inout, then
this is better....

>+    nsresult PostContinuationEvent()

This is called in a bunch of places.  Do we really want to inline it?  (It's
small, so I can see how we may want to...)

>+    nsresult PostContinuationEvent_Locked()

This one is much bigger, though, and has a _lot_ of callers (once you inline
PostContinuationEvent, especially).

>Index: xpcom/io/nsStreamUtils.h

>+ * A null event target is not permitted.

The impl of NS_AsyncCopy should have an NS_PRECONDITION to that effect...

>Index: xpcom/proxy/src/nsProxyRelease.cpp
>+        // we do not release doomed here since it may cause a delete on the the

Double "the" at the end there.

This patch does not seem to include nsIOThreadPool.cpp (or .h, for that
Comment on attachment 132243 [details] [diff] [review]
missing nsIOThreadPool.{h,cpp} from previous patch

>Index: nsIOThreadPool.cpp

>+    PRLock    *mLock;

Comment what members need to be protected by mLock?

>+nsIOThreadPool::PostEvent(PLEvent *event)
>+    PR_Lock(mLock);
>+    if (mShutdown)
>+        return NS_ERROR_UNEXPECTED;

This leaves mLock locked if PostEvent is called when mShutdown is true, no? 
Sounds bad...  Any reason not to use an nsAutoLock here?

sr=bzbarsky with those changes.
Attachment #132243 - Flags: superreview+
Blocks: 220566

good catch on the nsAutoLock thing!

>>-     *        any thread.  (NOTE: the event queue must be resolved.)
>>+     *        any thread.
>Does the event queue no longer have to be resolved, if that's what our
>nsIEventTarget is?

there are no magic values for nsIEventTarget.  that only applies to
nsIEventQueue, so there is no need to mention anything about "resolving the
event target" since it just never makes sense to speak of an unresolved event
target.  i guess i could document this somewhere, like in nsIEventTarget.idl,
but i felt that excluding documentation was sufficient to indicate that there is
no support for unresolved event targets.  but, maybe for help with those trying
to understand this new stuff, it would be helpful.

>Don't we only want to do this if this == proxy->mLastEvent ? Otherwise we could
>prevent some coalescing that could happen at various points...

proxy->mLastEvent always == self.  i added an assertion.

>>+     * @param aEventTarget
>>+     *        Specify NULL to receive notification on ANY thread (possibly >even
>>+     *        recursively on the calling thread)
>Recursively does not imply synchronously here, right?  We should make that

yes, it does mean synchronously.  without giving an event target, the callback
could occur immediately before AsyncWait returns... same thread, same stack,
etc.  this is why specifying an event target is really a good idea.  not doing
so makes things complicated.  i think "recursively on the calling thread"
explains this pretty clearly, but i can add "i.e., synchronously" or something
like that to clarify.

thx bz!
> proxy->mLastEvent always == self.

Why?  Say we get an OnTransportStatus notification.  We post event1, set
mLastEvent = event1.  Then we get a second OnTransportStatus notification, can't
coalesce, post event2, set mLastEvent = event2.  Now event1 fires, and self !=
proxy->mLastEvent in its HandleEvent method...

> i can add "i.e., synchronously"

That would be much appreciated.
bz: you're right... i should have known that!  guess i haven't had my head
around this code for a while.  i'll fix that up as you suggested... thx!
ok, patch is in on the trunk.
Closed: 21 years ago
Resolution: --- → FIXED
Comment on attachment 131195 [details] [diff] [review]
v2.1 patch

I guess this was "sr=me with these changes"... ;)
Attachment #131195 - Flags: superreview?(bzbarsky) → superreview+
nebiros is busted. This patch fixed my local solaris build. (It also backs out
Darin's earlier attempt to fix it.)

Looks like the compiler's initial complaint was justified. But I cant see why
Darin's attempt to fix it by using friend did not work.
> using friend

Doesn't that only give access to "protected" stuff?  The typedef in question is
Comment on attachment 131195 [details] [diff] [review]
v2.1 patch

... and dougt had given me r= via email or aim.
Attachment #131195 - Flags: review?(dougt)
solaris fix checked in.  thx!
You need to log in before you can comment on or make changes to this bug.