
ReadableStream should be an async iterable #778

Closed · mariusGundersen opened this issue Aug 25, 2017 · 22 comments · Fixed by #980

Comments

@mariusGundersen

(I'm adding this mostly as a tracking bug after I asked about it in the wrong place)

Node.js is looking into supporting async iterables as a way to stream data, and it would be great if fetch (or the readable-stream part of fetch) supported the same interface. This would make it easier to move code between Node.js and the browser.

@jakearchibald

@mariusGundersen (Author)

Excellent! I really look forward to this!

@domenic (Member) commented Mar 9, 2018

There are two approaches to this:

  • Have [Symbol.asyncIterator] directly return the default reader object, and add aliases to the reader: next() aliases read(); return() aliases releaseLock(). And I guess make releaseLock() return an empty object?
  • Have [Symbol.asyncIterator] return a small wrapper object, of the sort in Jake's blog post.

After realizing we'd have to make releaseLock() return an empty object, I'm leaning toward the latter. But, does anyone think the former would be better?
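
For reference, a minimal sketch of the wrapper-object approach along the lines of Jake's blog post (illustrative only, not the spec'd API):

function streamAsyncIterator(stream) {
  const reader = stream.getReader();
  return {
    // read() already resolves to the { value, done } shape the
    // async iteration protocol expects, so next() maps onto it directly.
    next: () => reader.read(),
    return(value) {
      reader.releaseLock();
      return Promise.resolve({ value, done: true });
    },
    [Symbol.asyncIterator]() {
      return this;
    },
  };
}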

@domenic (Member) commented Mar 9, 2018

Further questions that have come up as I contemplate this all a bit more seriously:

  • Should the async iterator's return() (which is called, remember, when you break out of a for-await-of loop via return/break/throw) cancel the reader, or just release the lock? I.e., if you break out of a loop, should we assume you'll never want to continue looping from the current point and we should clean up for you? On the one hand, that seems presumptuous. On the other hand, it's kind of annoying to make people manually acquire a new reader and call cancel() if they break out, especially if that break out is due to an unexpected throw.
  • If you iterate to the end, should we release the lock for you? This is somewhat of an edge case, and just governs what happens if someone tries to acquire a new iterator or lock after iterating to the end.

I'm leaning toward auto-canceling and auto-releasing, because otherwise, cleanup is quite annoying:

try {
  for await (const chunk of rs) {
    await somethingThatMightReject(chunk);
  }
} finally {
  try {
    // Might throw if the reader is still locked because we finished
    // successfully without breaking or throwing.
    await rs.cancel();
  } catch {}
}

It's starting to feel a bit magic, though…

@mariusGundersen (Author)

Here is the implementation that I currently use to convert a ReadableStream to an async iterator, and it works well enough for what I need from it. I don't think I ever cancel iterating the stream, though; maybe only when an exception is thrown in a loop.

I also agree that it should be auto-cancelled and auto-closed, as (async) iterators are most often single-consumer. I think the API should be optimized for the single-consumer case and require a small wrapper for cases where it should not automatically close. That seems like the most common scenario, and it doesn't make the multi-consumer scenario impossible.
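
For context, one common shape for such a conversion is an async generator; this is an illustrative sketch, not necessarily the implementation referenced above:

async function* iterateStream(stream) {
  const reader = stream.getReader();
  try {
    while (true) {
      const { value, done } = await reader.read();
      if (done) return;
      yield value;
    }
  } finally {
    // Runs on normal completion, break, or throw. Whether to
    // releaseLock() or cancel() here is exactly the auto-cancel
    // question being discussed in this thread.
    reader.releaseLock();
  }
}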

@ricea (Collaborator) commented Mar 12, 2018

I prefer wrapper object and auto-cancel.

Not auto-cancelling leads to simpler code when your input has a distinct header and body: you can break out of the first loop when you get to the end of the header, and then have a second loop which processes the body. This is a niche use case, though, and there are other ways of doing it. The rest of the time, auto-cancel is cleaner and less error-prone.
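
A sketch of that header/body pattern, assuming an opt-out along the lines of the preventCancel option discussed later in this thread (isHeaderEnd and processBody are made-up helpers):

// First loop: consume the header, then break without cancelling.
for await (const chunk of stream.iterator({ preventCancel: true })) {
  if (isHeaderEnd(chunk)) break;
}
// The lock is released but the stream is still readable, so a second
// loop can pick up the body where the first one left off.
for await (const chunk of stream) {
  processBody(chunk);
}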

I don't think I favour auto-release. Assuming we have auto-cancel and auto-close, then there are extremely limited extra capabilities you'd gain from auto-release:

  • Get a reader. Mixing iteration with using readers is not something I'd want to encourage.
  • Use a reader to observe that the stream is closed, which it always will be. Pointless.
  • pipeTo or pipeThrough. Basically a really obfuscated way of closing a WritableStream. Unwise.
  • Use it as an async iterator that immediately ends. Pointless.

Maybe auto-release has some aesthetic benefits I haven't considered.

@jakearchibald

Wrapper object seems like a clear winner.

I didn't like the sound of auto-cancel, but given @domenic's code example it sounds like the better thing to do.

We could have:

for await (const chunk of stream.iterator({ preventClose: true })) {
  // …
}

Then stream[Symbol.asyncIterator] could alias stream.iterator.
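
In other words, something like this hypothetical sketch (neither .iterator() nor the alias is spec'd yet):

// Hypothetical: the well-known symbol reuses the same method, so
// `for await (const chunk of stream)` gets the default options.
ReadableStream.prototype[Symbol.asyncIterator] =
  ReadableStream.prototype.iterator;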

@mcollina

@jakearchibald @domenic I would like Node.js Readable to be as close as possible to whatwg Readable when used with for await, i.e. stream.iterator(opts). I think it would help define a best practice for how iterators should be used. Also, it would improve code portability.
Is this API spec'd yet? Could I be involved in the process?

One thing that is not obvious is whether we should close the stream when there is a break.

@jakearchibald

@mcollina see the above comments for API discussion so far. Is there somewhere we can read up on the justifications and tradeoffs for the pattern node is shipping?

@domenic (Member) commented Apr 12, 2018

break invokes return(), which per the above design will cancel the stream. I'm not sure it's "obvious" that it should cancel the stream, but it's probably better for the common case, and @jakearchibald had a good idea for how we could allow an escape hatch for the uncommon case.
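
To illustrate, a toy async iterable showing how break routes through return() (ticker is a made-up example, not ReadableStream's actual iterator):

const ticker = {
  [Symbol.asyncIterator]() {
    let i = 0;
    return {
      next: () => Promise.resolve({ value: i++, done: false }),
      return(value) {
        // For a ReadableStream iterator, this is where cancel() would run.
        console.log('return() invoked');
        return Promise.resolve({ value, done: true });
      },
    };
  },
};

(async () => {
  for await (const n of ticker) {
    if (n >= 2) break; // break invokes the iterator's return()
  }
})();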

@mcollina

@jakearchibald I added async iterators based on what I thought would make sense for Node.js streams; we also did a couple of implementations, and this one turned out to be more performant.* It's an experimental feature that ships for the first time in 2 weeks (it will print a warning to notify users). We can definitely change any part of the implementation before it leaves experimental status (ideally before Node 10 goes LTS in October, but even later if things are in flux).

break invokes return(), which per the above design will cancel the stream.

I guess my implementation of return should be correct then, thanks!

* It will need to be benchmarked again in the future; V8 is shipping a lot of optimizations for promises, generators, and async iterators.

@domenic (Member) commented Aug 16, 2018

@devsnek has generously volunteered to help with the spec and tests here, and I'm very excited. Summing up some IRC discussion:

  • I think we should have auto-release. There's no reason for any consumer to forever hold a lock and not have any way of giving it back, even if the stream is definitely closed. At least in tests we often check whether something's errored or closed by doing .getReader().closed.then(...); we shouldn't break that, I think (see the sketch after this list).
  • I agree with auto-cancel as the default return() behavior. I like @jakearchibald's idea of .iterator({ preventCancel: true }) as a way of customizing. (Jake said "preventClose", but "preventCancel" is more accurate, unless I'm missing something.)
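
The test pattern from the first bullet looks like this, assuming rs is a ReadableStream whose previous consumer released its lock (which is what auto-release guarantees):

// Acquire a throwaway reader just to observe the stream's final state.
rs.getReader().closed.then(
  () => console.log('stream closed'),
  (err) => console.log('stream errored:', err),
);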

@benjamingr (Member)

I think we should have auto-release.

In a large code base using our own stream-to-async-iterator conversion, we did this and are pretty happy with it as a default.

At least in tests we often check whether something's errored or closed by doing .getReader().closed.then(...); we shouldn't break that, I think.

Definitely agree from our perspective.

I agree with auto-cancel as the default return() behavior.

We also do this and find it quite useful. It is also easy to opt out of even without providing .iterator({ preventCancel: true }) by either:

  • Wrapping the stream with another stream for the purpose of that iteration and not forwarding cancellation.
  • Wrapping the async iterator with another async iterator and not forwarding the .return there (a sketch follows this list).
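
A minimal sketch of that second option (withoutCancel is a made-up name):

// Wrap the iterator and omit return(), so breaking out of a
// for-await-of loop never reaches the underlying iterator and the
// stream is therefore not cancelled.
function withoutCancel(iterable) {
  const inner = iterable[Symbol.asyncIterator]();
  return {
    next: () => inner.next(),
    [Symbol.asyncIterator]() {
      return this;
    },
  };
}

Used as for await (const chunk of withoutCancel(stream)) { … }.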

That said, I am not objecting to .iterator({ preventCancel: true }); I'm just saying that we haven't had to use it in our own code.


I've also been using async iterators with Node streams, and they have been working well, though admittedly less than I'd like. I have been trying (by emailing relevant parties) to solicit feedback but haven't been able to get any. We hope to gather feedback through a Node.js Foundation survey that we plan to send in about a month.

@domenic (Member) commented Jan 27, 2019

Given the precedent that is somewhat being established in WICG/kv-storage#6, I am wondering whether we should rename .iterator() to .values(), hmm.

@ricea (Collaborator) commented Jan 29, 2019

I prefer .iterator() for ReadableStream. I think people are likely to see .values() and expect it to be a shortcut for slurping the whole stream. .iterator() is more specific about the functionality it provides.

@MattiasBuelens (Collaborator)

I am wondering whether we should rename .iterator() to .values(), hmm.

I prefer .iterator() for ReadableStream.

Uhm, at the moment it's called .getIterator() instead of .iterator(). If you want to rename it, let me know in #980.

@ricea closed this as completed in #980 on Feb 6, 2019

@ricea (Collaborator) commented Feb 7, 2019

@liudonghua123

I have some code like this, but VS Code shows an error (Type 'Promise<ReadableStreamReadResult<Uint8Array>>' must have a '[Symbol.asyncIterator]()' method that returns an async iterator.) around reader.read().

    const response = await fetch(`${download_url_prefix}${node_executable_filename}`)
    const contentLength = +(response.headers.get('Content-Length') ?? 0);
    const reader = response.body?.getReader();
    if(!reader) {
      throw new Error(`Failed to get reader from response body`);
    }
    let receivedLength = 0;
    let chunks = []; 
    for await (let {done, value} of reader.read()) {
      chunks.push(value);
      receivedLength += value.length;
      console.log(`Received ${receivedLength} of ${contentLength}`)
    }

I had to rewrite the above for await loop as a while loop.

    while (true) {
      const { done, value } = await reader.read();
      if (done) {
        break;
      }
      chunks.push(value);
      receivedLength += value.length;
      console.log(`Received ${receivedLength} of ${contentLength}`)
    }
@MattiasBuelens (Collaborator) commented Jan 16, 2024

@liudonghua123 You shouldn't use getReader() or reader.read() in this case; instead, you should for await over the ReadableStream itself:

const readable = response.body;
if (!readable) {
  throw new Error('Failed to get response body');
}
let receivedLength = 0;
let chunks = []; 
for await (const value of readable) {
  chunks.push(value);
  receivedLength += value.length;
  console.log(`Received ${receivedLength} of ${contentLength}`)
}

MDN has more examples on async-iterating a stream. 😉

(Note that this GitHub project is not really a support forum. For future questions, I would recommend searching on e.g. StackOverflow, where there are already answers for similar questions.)

@liudonghua123
Copy link

@MattiasBuelens Thanks, I see now. 😄
