fidalco/streams

Streams API

Where Did All the Text Go?

We are in the process of transitioning this specification from a GitHub README into something a bit more palatable. The official-lookin' version is developed in the index.bs file and then deployed to the gh-pages branch; you can see it at https://whatwg.github.io/streams/.

Right now, we've transferred over most of the concepts and text, but none of the algorithms or APIs. We'll be iterating on the APIs a bit more here, in Markdown format, until we feel confident in them. In the meantime, please check out the rendered spec for all of the interesting stage-setting text.

By the way, this transition is being tracked as #62.

Readable Stream APIs

ReadableStream

class ReadableStream {
    constructor({
        function start = () => {},
        function pull = () => {},
        function cancel = () => {},
        object strategy = new CountQueuingStrategy({ highWaterMark: 1 }),
    })

    // Reading data from the underlying source
    any read()
    Promise<undefined> wait()
    get ReadableStreamState state

    // Composing with other streams
    WritableStream pipeTo(WritableStream dest, { ToBoolean close = true } = {})
    ReadableStream pipeThrough({ WritableStream writable, ReadableStream readable }, options)

    // Stop accumulating data
    Promise<undefined> cancel(any reason)

    // Useful helper
    get Promise<undefined> closed

    // Internal slots
    [[queue]]
    [[started]] = false
    [[draining]] = false
    [[pulling]] = false
    [[state]] = "waiting"
    [[storedError]]
    [[waitPromise]]
    [[closedPromise]]
    [[startedPromise]]

    // Holders for stuff given by the underlying source
    [[onCancel]]
    [[onPull]]
    [[strategy]]

    // Internal methods for use by the underlying source
    [[enqueue]](any chunk)
    [[close]]()
    [[error]](any e)

    // Other internal helper methods
    [[callOrSchedulePull]]()
    [[callPull]]()
}

enum ReadableStreamState {
    "readable"  // the queue has something in it; read at will
    "waiting"   // the source is not ready or the queue is empty; you should call wait
    "closed"    // all data has been read from both the source and the queue
    "errored"   // the source errored so the stream is now dead
}

Properties of the ReadableStream prototype

constructor({ start, pull, cancel, strategy })

The constructor is passed several functions, all optional:

  • start(enqueue, close, error) is called immediately, so it is typically used to adapt a push-based data source by setting up any relevant event listeners, or to acquire access to a pull-based data source. If this process is asynchronous, it can return a promise to signal success or failure.
  • pull(enqueue, close, error) is called in reaction to read calls, so it is typically used to adapt a pull-based data source, or to start the flow of data in a push-based data source. Once it is called, it will not be called again until the enqueue function passed to it is called.
  • cancel(reason) is called when the readable stream is canceled, and should perform whatever source-specific steps are necessary to clean up and stop reading. If this process is asynchronous, it can return a promise to signal success or failure.

Both start and pull are given the ability to manipulate the stream's internal queue and state by being passed the this.[[enqueue]], this.[[close]], and this.[[error]] functions.

  1. Set this.[[onCancel]] to cancel.
  2. Set this.[[onPull]] to pull.
  3. Set this.[[strategy]] to strategy.
  4. Let this.[[waitPromise]] be a new promise.
  5. Let this.[[closedPromise]] be a new promise.
  6. Let this.[[queue]] be a new empty List.
  7. Let startResult be the result of start(this.[[enqueue]], this.[[close]], this.[[error]]).
  8. ReturnIfAbrupt(startResult).
  9. Let this.[[startedPromise]] be the result of resolving startResult as a promise.
  10. Upon fulfillment of this.[[startedPromise]], set this.[[started]] to true.
  11. Upon rejection of this.[[startedPromise]] with reason r, call this.[[error]](r).
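
As a rough illustration of the constructor options described above, here is a minimal sketch of adapting a push source. The `makeSourceOptions` helper and the `pushSource` object (with its `ondata`/`onend`/`onerror` callbacks and `pause`/`resume`/`stop` methods) are hypothetical and not part of this specification. The options are exercised by hand with stand-ins for `this.[[enqueue]]`, `this.[[close]]`, and `this.[[error]]`; a real stream would make these calls itself.

```javascript
// Builds constructor options that adapt a push source to a readable stream.
// `pushSource` is hypothetical: it invokes ondata/onend/onerror callbacks and
// exposes pause(), resume(), and stop().
function makeSourceOptions(pushSource) {
    return {
        start(enqueue, close, error) {
            pushSource.ondata = chunk => {
                // enqueue returns false once the strategy asks for backpressure
                // (or the stream is already closed or errored).
                if (!enqueue(chunk)) {
                    pushSource.pause();
                }
            };
            pushSource.onend = close;
            pushSource.onerror = error;
        },
        pull() {
            // The consumer wants more data, so let the source flow again.
            pushSource.resume();
        },
        cancel() {
            pushSource.stop();
        }
    };
}

// Exercise the options by hand with stand-ins for the stream's internals.
const events = [];
const fakePushSource = {
    pause() { events.push("pause"); },
    resume() { events.push("resume"); },
    stop() { events.push("stop"); }
};
const received = [];
const options = makeSourceOptions(fakePushSource);
options.start(
    chunk => { received.push(chunk); return received.length < 2; },
    () => events.push("close"),
    e => events.push("error")
);
fakePushSource.ondata("a"); // the stand-in enqueue reports room for more
fakePushSource.ondata("b"); // the stand-in enqueue reports backpressure, so the source pauses
options.pull();             // resumes the source
fakePushSource.onend();     // closes the stream
```

Pausing the source when enqueue returns false is one possible policy; a source that cannot be paused could simply ignore the return value and let the queue grow.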
get state
  1. Return this.[[state]].
read()
  1. If this.[[state]] is "waiting" or "closed", throw a TypeError exception.
  2. If this.[[state]] is "errored", throw this.[[storedError]].
  3. Assert: this.[[state]] is "readable".
  4. Assert: this.[[queue]] is not empty.
  5. Let chunk be DequeueValue(this.[[queue]]).
  6. If this.[[queue]] is now empty,
    1. If this.[[draining]] is true,
      1. Set this.[[state]] to "closed".
      2. Resolve this.[[closedPromise]] with undefined.
    2. If this.[[draining]] is false,
      1. Set this.[[state]] to "waiting".
      2. Let this.[[waitPromise]] be a new promise.
      3. Call this.[[callOrSchedulePull]]().
  7. Return chunk.
wait()
  1. If this.[[state]] is "waiting",
    1. Call this.[[callOrSchedulePull]]().
  2. Return this.[[waitPromise]].
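
To make the interplay of state, read(), and wait() more concrete, here is a minimal sketch of a consumer loop. It assumes only an object with the `state`, `read`, and `wait` members described above. The `readAll` and `makeFakeStream` names are hypothetical, and the fake stream is a stand-in for demonstration, not an implementation of the algorithms in this document.

```javascript
// Collects every chunk from a stream exposing state, read(), and wait().
function readAll(stream) {
    const chunks = [];
    function pump() {
        // Drain everything that is available synchronously.
        while (stream.state === "readable") {
            chunks.push(stream.read());
        }
        if (stream.state === "closed") {
            return chunks;
        }
        // "waiting": wait() fulfills once there is something new to look at.
        // "errored": wait() returns a rejected promise, which propagates.
        return stream.wait().then(pump);
    }
    return new Promise(resolve => resolve(pump()));
}

// Hypothetical stand-in: delivers up to two chunks per wait() call.
function makeFakeStream(source) {
    const pending = source.slice();
    const queue = [];
    return {
        get state() {
            if (queue.length > 0) return "readable";
            return pending.length > 0 ? "waiting" : "closed";
        },
        read() {
            if (queue.length === 0) throw new TypeError("Nothing to read.");
            return queue.shift();
        },
        wait() {
            // Simulate the source delivering data on a later tick.
            return Promise.resolve().then(() => {
                queue.push(...pending.splice(0, 2));
            });
        }
    };
}

const allChunks = readAll(makeFakeStream(["a", "b", "c"]));
allChunks.then(chunks => console.log(chunks));
```

The loop never calls read() outside the "readable" state, so it avoids the TypeError that read() throws for "waiting" and "closed" streams.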
cancel(reason)
  1. If this.[[state]] is "closed", return a new promise resolved with undefined.
  2. If this.[[state]] is "errored", return a new promise rejected with this.[[storedError]].
  3. If this.[[state]] is "waiting", resolve this.[[waitPromise]] with undefined.
  4. Let this.[[queue]] be a new empty List.
  5. Set this.[[state]] to "closed".
  6. Resolve this.[[closedPromise]] with undefined.
  7. Let cancelPromise be a new promise.
  8. Let sourceCancelPromise be the result of promise-calling this.[[onCancel]](reason).
  9. Upon fulfillment of sourceCancelPromise, resolve cancelPromise with undefined.
  10. Upon rejection of sourceCancelPromise with reason r, reject cancelPromise with r.
  11. Return cancelPromise.
get closed
  1. Return this.[[closedPromise]].
pipeTo(dest, { close })

The pipeTo method is one of the more complex methods, and is undergoing some revision and edge-case bulletproofing before we write it up in prose.

For now, please consider the reference implementation normative: see the pipeTo method in reference-implementation/lib/readable-stream.js.

pipeThrough({ writable, readable }, options)
  1. If Type(writable) is not Object, then throw a TypeError exception.
  2. If Type(readable) is not Object, then throw a TypeError exception.
  3. Let stream be the this value.
  4. Let result be Invoke(stream, "pipeTo", (writable, options)).
  5. ReturnIfAbrupt(result).
  6. Return readable.
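
The steps above translate almost directly into JavaScript. This is a sketch only: it is written as a free function taking the stream as its first argument, and the `isObject` helper and the fake `source` used to exercise it are assumptions for illustration.

```javascript
// Type(x) is Object covers both ordinary objects and functions.
function isObject(x) {
    return (typeof x === "object" && x !== null) || typeof x === "function";
}

function pipeThrough(stream, { writable, readable }, options) {
    if (!isObject(writable)) {
        throw new TypeError("writable must be an object.");
    }
    if (!isObject(readable)) {
        throw new TypeError("readable must be an object.");
    }
    // Any exception thrown by pipeTo propagates to the caller (ReturnIfAbrupt).
    stream.pipeTo(writable, options);
    return readable;
}

// A fake source that only records how pipeTo was called.
const calls = [];
const source = {
    pipeTo(dest, options) {
        calls.push({ dest, options });
        return dest;
    }
};
const transform = {
    writable: { name: "writable side" },
    readable: { name: "readable side" }
};
const result = pipeThrough(source, transform, { close: false });
```

Returning the readable side is what makes chained calls such as `a.pipeThrough(b).pipeThrough(c)` possible.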

Internal Methods of ReadableStream

[[enqueue]](chunk)
  1. If this.[[state]] is "errored" or "closed", return false.
  2. If this.[[draining]] is true, throw a TypeError exception.
  3. Let chunkSize be Invoke(this.[[strategy]], "size", (chunk)).
  4. If chunkSize is an abrupt completion,
    1. Call this.[[error]](chunkSize.[[value]]).
    2. Return false.
  5. EnqueueValueWithSize(this.[[queue]], chunk, chunkSize.[[value]]).
  6. Set this.[[pulling]] to false.
  7. Let queueSize be GetTotalQueueSize(this.[[queue]]).
  8. Let shouldApplyBackpressure be ToBoolean(Invoke(this.[[strategy]], "shouldApplyBackpressure", (queueSize))).
  9. If shouldApplyBackpressure is an abrupt completion,
    1. Call this.[[error]](shouldApplyBackpressure.[[value]]).
    2. Return false.
  10. If this.[[state]] is "waiting",
    1. Set this.[[state]] to "readable".
    2. Resolve this.[[waitPromise]] with undefined.
  11. If shouldApplyBackpressure.[[value]] is true, return false.
  12. Return true.
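
The [[enqueue]] steps can be sketched in plain JavaScript as follows. This is an approximation under stated assumptions: internal slots are modeled as ordinary properties on a `stream` object, `resolveWait` and `error` are hypothetical stand-ins for resolving this.[[waitPromise]] and calling this.[[error]], and the size validation done by EnqueueValueWithSize is omitted.

```javascript
function enqueue(stream, chunk) {
    if (stream.state === "errored" || stream.state === "closed") {
        return false;
    }
    if (stream.draining) {
        throw new TypeError("Cannot enqueue after the stream has been closed.");
    }

    let chunkSize;
    try {
        chunkSize = stream.strategy.size(chunk);
    } catch (e) {
        stream.error(e);
        return false;
    }
    stream.queue.push({ value: chunk, size: Number(chunkSize) });
    stream.pulling = false;

    const queueSize = stream.queue.reduce((sum, pair) => sum + pair.size, 0);
    let shouldApplyBackpressure;
    try {
        shouldApplyBackpressure = Boolean(stream.strategy.shouldApplyBackpressure(queueSize));
    } catch (e) {
        stream.error(e);
        return false;
    }

    if (stream.state === "waiting") {
        stream.state = "readable";
        stream.resolveWait();
    }
    // false tells the underlying source to slow down.
    return !shouldApplyBackpressure;
}

// A stand-in stream whose strategy behaves like a count strategy with highWaterMark 1.
const stream = {
    state: "waiting",
    draining: false,
    pulling: true,
    queue: [],
    strategy: { size: () => 1, shouldApplyBackpressure: queueSize => queueSize > 1 },
    waitResolved: false,
    resolveWait() { this.waitResolved = true; },
    error(e) { this.state = "errored"; this.storedError = e; }
};
const first = enqueue(stream, "a");  // true: one chunk queued, no backpressure yet
const second = enqueue(stream, "b"); // false: two chunks queued, backpressure applies
```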
[[close]]()
  1. If this.[[state]] is "waiting",
    1. Resolve this.[[waitPromise]] with undefined.
    2. Resolve this.[[closedPromise]] with undefined.
    3. Set this.[[state]] to "closed".
  2. If this.[[state]] is "readable",
    1. Set this.[[draining]] to true.
[[error]](e)
  1. If this.[[state]] is "waiting",
    1. Set this.[[state]] to "errored".
    2. Set this.[[storedError]] to e.
    3. Reject this.[[waitPromise]] with e.
    4. Reject this.[[closedPromise]] with e.
  2. If this.[[state]] is "readable",
    1. Let this.[[queue]] be a new empty List.
    2. Set this.[[state]] to "errored".
    3. Set this.[[storedError]] to e.
    4. Let this.[[waitPromise]] be a new promise rejected with e.
    5. Reject this.[[closedPromise]] with e.
[[callOrSchedulePull]]()
  1. If this.[[pulling]] is true, return.
  2. Set this.[[pulling]] to true.
  3. If this.[[started]] is false,
    1. Upon fulfillment of this.[[startedPromise]], call this.[[callPull]].
  4. If this.[[started]] is true, call this.[[callPull]].
[[callPull]]()
  1. Let pullResult be the result of this.[[onPull]](this.[[enqueue]], this.[[close]], this.[[error]]).
  2. If pullResult is an abrupt completion, call this.[[error]](pullResult.[[value]]).

Writable Stream APIs

WritableStream

class WritableStream {
    constructor({
        function start = () => {},
        function write = () => {},
        function close = () => {},
        function abort = close,
        object strategy = new CountQueuingStrategy({ highWaterMark: 0 })
    })

    // Writing data to the underlying sink
    Promise<undefined> write(any chunk)
    Promise<undefined> wait()
    get WritableStreamState state

    // Close off the underlying sink gracefully; we are done.
    Promise<undefined> close()

    // Close off the underlying sink forcefully; everything written so far is suspect.
    Promise<undefined> abort(any reason)

    // Useful helpers
    get Promise<undefined> closed

    // Internal methods
    [[error]](any e)
    [[callOrScheduleAdvanceQueue]]()
    [[advanceQueue]]()
    [[syncStateWithQueue]]()
    [[doClose]]()

    // Internal slots
    [[queue]]
    [[started]] = false
    [[writing]] = false
    [[state]] = "writable"
    [[storedError]]
    [[writablePromise]]
    [[closedPromise]]
    [[startedPromise]]

    // Holders for stuff given by the underlying sink
    [[onWrite]]
    [[onClose]]
    [[onAbort]]
    [[strategy]]
}

enum WritableStreamState {
    "writable" // the sink is ready and the queue is not yet full; write at will
    "waiting"  // the sink is not ready or the queue is full; you should call wait
    "closing"  // the sink is being closed; no more writing
    "closed"   // the sink has been closed
    "errored"  // the sink errored so the stream is now dead
}

Properties of the WritableStream prototype

constructor({ start, write, close, abort, strategy })

The constructor is passed several functions, all optional:

  • start(error) is called when the writable stream is created, and should open the underlying writable sink. If this process is asynchronous, it can return a promise to signal success or failure.
  • write(chunk) should write chunk to the underlying sink. It can return a promise to signal success or failure of the write operation to the underlying sink. The stream implementation guarantees that this function will be called only after previous writes have succeeded, and never after close or abort is called.
  • close() should close the underlying sink. If this process is asynchronous, it can return a promise to signal success or failure. The stream implementation guarantees that this function will be called only after all queued-up writes have succeeded.
  • abort(reason) is an abrupt close, signaling that all data written so far is suspect. It should clean up underlying resources, much like close, but perhaps with some custom handling. Unlike close, abort will be called even if writes are queued up, throwing away those chunks. If this process is asynchronous, it can return a promise to signal success or failure.

In reaction to calls to the stream's .write() method, the write constructor option is given a chunk from the internal queue. It signals whether the chunk was written successfully through the promise it returns.

  1. Set this.[[onWrite]] to write.
  2. Set this.[[onClose]] to close.
  3. Set this.[[onAbort]] to abort.
  4. Set this.[[strategy]] to strategy.
  5. Let this.[[writablePromise]] be a new promise resolved with undefined.
  6. Let this.[[closedPromise]] be a new promise.
  7. Let this.[[queue]] be a new empty List.
  8. Let startResult be the result of start(this.[[error]]).
  9. ReturnIfAbrupt(startResult).
  10. Let this.[[startedPromise]] be the result of resolving startResult as a promise.
  11. Upon fulfillment of this.[[startedPromise]], set this.[[started]] to true.
  12. Upon rejection of this.[[startedPromise]] with reason r, call this.[[error]](r).
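
For illustration, here is a minimal sketch of the constructor options for a sink. The `makeSinkOptions` helper and the `fakeFile` object (with `open`, `append`, `close`, and `discard` methods) are hypothetical. The options are called by hand here; a real stream would call them itself and wait on each returned promise before moving on.

```javascript
// Builds constructor options that forward to a hypothetical file-like sink.
function makeSinkOptions(file) {
    return {
        start() { return file.open(); },
        write(chunk) { return file.append(chunk); },
        close() { return file.close(); },
        abort(reason) { return file.discard(reason); }
    };
}

// A stand-in sink that records each operation and succeeds immediately.
const log = [];
const fakeFile = {
    open() { log.push("open"); return Promise.resolve(); },
    append(chunk) { log.push("append " + chunk); return Promise.resolve(); },
    close() { log.push("close"); return Promise.resolve(); },
    discard(reason) { log.push("discard"); return Promise.resolve(); }
};

const options = makeSinkOptions(fakeFile);
options.start(e => {});
options.write("a");
options.write("b");
options.close();
```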
get closed
  1. Return this.[[closedPromise]].
get state
  1. Return this.[[state]].
write(chunk)
  1. If this.[[state]] is "waiting" or "writable",
    1. Let chunkSize be Invoke(this.[[strategy]], "size", (chunk)).
    2. ReturnIfAbrupt(chunkSize).
    3. Let promise be a new promise.
    4. EnqueueValueWithSize(this.[[queue]], Record{[[type]]: "chunk", [[promise]]: promise, [[chunk]]: chunk}, chunkSize).
    5. Let syncResult be this.[[syncStateWithQueue]]().
    6. If syncResult is an abrupt completion,
      1. Call this.[[error]](syncResult.[[value]]).
      2. Return promise.
    7. Call this.[[callOrScheduleAdvanceQueue]]().
    8. Return promise.
  2. If this.[[state]] is "closing" or "closed",
    1. Return a promise rejected with a TypeError exception.
  3. If this.[[state]] is "errored",
    1. Return a promise rejected with this.[[storedError]].
close()
  1. If this.[[state]] is "closing" or "closed", return a promise rejected with a TypeError exception.
  2. If this.[[state]] is "errored", return a promise rejected with this.[[storedError]].
  3. If this.[[state]] is "writable",
    1. Set this.[[writablePromise]] to a new promise rejected with a TypeError exception.
  4. If this.[[state]] is "waiting",
    1. Reject this.[[writablePromise]] with a TypeError exception.
  5. Set this.[[state]] to "closing".
  6. EnqueueValueWithSize(this.[[queue]], Record{[[type]]: "close", [[promise]]: undefined, [[chunk]]: undefined}, 0).
  7. Call this.[[callOrScheduleAdvanceQueue]]().
  8. Return this.[[closedPromise]].
abort(reason)
  1. If this.[[state]] is "closed", return a new promise resolved with undefined.
  2. If this.[[state]] is "errored", return a new promise rejected with this.[[storedError]].
  3. Call this.[[error]](reason).
  4. Let abortPromise be a new promise.
  5. Let sinkAbortPromise be the result of promise-calling this.[[onAbort]](reason).
  6. Upon fulfillment of sinkAbortPromise, resolve abortPromise with undefined.
  7. Upon rejection of sinkAbortPromise with reason r, reject abortPromise with r.
  8. Return abortPromise.
wait()
  1. Return this.[[writablePromise]].
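
To show how a producer might respect backpressure using state, write(), and wait(), here is a minimal sketch. The `writeAll` and `makeFakeWritable` names are hypothetical, and the fake writable is a stand-in for demonstration rather than an implementation of the algorithms in this document.

```javascript
// Writes all chunks, pausing whenever the stream leaves the "writable" state.
function writeAll(stream, chunks) {
    let index = 0;
    function pump() {
        while (index < chunks.length && stream.state === "writable") {
            // Per-chunk failures also surface through wait() and close().
            stream.write(chunks[index++]).catch(() => {});
        }
        if (index === chunks.length) {
            return stream.close();
        }
        // "waiting": wait() fulfills once the stream is writable again.
        // Any other state: wait() returns a rejected promise, which propagates.
        return stream.wait().then(pump);
    }
    return new Promise(resolve => resolve(pump()));
}

// Hypothetical stand-in: becomes "waiting" after `capacity` writes, then drains.
function makeFakeWritable(capacity) {
    const written = [];
    let queued = 0;
    let state = "writable";
    let writablePromise = Promise.resolve();
    return {
        written,
        get state() { return state; },
        write(chunk) {
            written.push(chunk);
            queued += 1;
            if (queued >= capacity) {
                // Pretend the queue is full, then drain it on a later tick.
                state = "waiting";
                writablePromise = Promise.resolve().then(() => {
                    queued = 0;
                    state = "writable";
                });
            }
            return Promise.resolve();
        },
        wait() { return writablePromise; },
        close() { state = "closed"; return Promise.resolve(); }
    };
}

const sink = makeFakeWritable(2);
const done = writeAll(sink, ["a", "b", "c"]);
```

Calling write() while the stream is "waiting" is allowed by the algorithm above (the chunk is simply queued), so pausing is a courtesy to the sink rather than a requirement.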

Internal Methods of WritableStream

[[error]](e)
  1. If this.[[state]] is "closed" or "errored", return.
  2. Repeat while this.[[queue]] is not empty:
    1. Let writeRecord be DequeueValue(this.[[queue]]).
    2. If writeRecord.[[type]] is "chunk", reject writeRecord.[[promise]] with e.
  3. Set this.[[storedError]] to e.
  4. If this.[[state]] is "writable" or "closing", set this.[[writablePromise]] to a new promise rejected with e.
  5. If this.[[state]] is "waiting", reject this.[[writablePromise]] with e.
  6. Reject this.[[closedPromise]] with e.
  7. Set this.[[state]] to "errored".
[[callOrScheduleAdvanceQueue]]()
  1. If this.[[started]] is false,
    1. Upon fulfillment of this.[[startedPromise]], call this.[[advanceQueue]].
  2. If this.[[started]] is true, call this.[[advanceQueue]].
[[advanceQueue]]()
  1. If this.[[queue]] is empty, or this.[[writing]] is true, return.
  2. Let writeRecord be PeekQueueValue(this.[[queue]]).
  3. If writeRecord.[[type]] is "close",
    1. Assert: this.[[state]] is "closing".
    2. DequeueValue(this.[[queue]]).
    3. Assert: this.[[queue]] is empty.
    4. Call this.[[doClose]]().
  4. Otherwise,
    1. Assert: writeRecord.[[type]] is "chunk".
    2. Set this.[[writing]] to true.
    3. Let writeResult be the result of promise-calling this.[[onWrite]](writeRecord.[[chunk]]).
    4. Upon fulfillment of writeResult,
      1. If this.[[state]] is "errored", return.
      2. Set this.[[writing]] to false.
      3. Resolve writeRecord.[[promise]] with undefined.
      4. DequeueValue(this.[[queue]]).
      5. Let syncResult be this.[[syncStateWithQueue]]().
      6. If syncResult is an abrupt completion, then
        1. Call this.[[error]](syncResult.[[value]]).
        2. Return.
      7. Call this.[[advanceQueue]]().
    5. Upon rejection of writeResult with reason r, call this.[[error]](r).

Note: the early-exit clause in the fulfillment handler will be hit if the constructor's write option returns a promise that settles after the stream has been aborted.

Note: the peeking-then-dequeuing dance is necessary so that during the call to the user-supplied function, this.[[onWrite]], the queue and corresponding public state property correctly reflect the ongoing write. The write record only leaves the queue after the chunk has been successfully written to the underlying sink, and we can advance the queue.

[[syncStateWithQueue]]()
  1. If this.[[state]] is "closing", return.
  2. Assert: this.[[state]] is either "writable" or "waiting".
  3. If this.[[state]] is "waiting" and this.[[queue]] is empty,
    1. Set this.[[state]] to "writable".
    2. Resolve this.[[writablePromise]] with undefined.
    3. Return.
  4. Let queueSize be GetTotalQueueSize(this.[[queue]]).
  5. Let shouldApplyBackpressure be ToBoolean(Invoke(this.[[strategy]], "shouldApplyBackpressure", (queueSize))).
  6. ReturnIfAbrupt(shouldApplyBackpressure).
  7. If shouldApplyBackpressure is true and this.[[state]] is "writable",
    1. Set this.[[state]] to "waiting".
    2. Set this.[[writablePromise]] to a new promise.
  8. If shouldApplyBackpressure is false and this.[[state]] is "waiting",
    1. Set this.[[state]] to "writable".
    2. Resolve this.[[writablePromise]] with undefined.
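
The state transitions above can be sketched as follows. As before, internal slots are modeled as ordinary properties on a `stream` object, and `resolveWritable` is a hypothetical stand-in for resolving this.[[writablePromise]]. Exceptions thrown by the strategy simply propagate, which plays the role of ReturnIfAbrupt.

```javascript
function syncStateWithQueue(stream) {
    if (stream.state === "closing") {
        return;
    }
    if (stream.state === "waiting" && stream.queue.length === 0) {
        stream.state = "writable";
        stream.resolveWritable();
        return;
    }

    const queueSize = stream.queue.reduce((sum, pair) => sum + pair.size, 0);
    const shouldApplyBackpressure =
        Boolean(stream.strategy.shouldApplyBackpressure(queueSize));

    if (shouldApplyBackpressure && stream.state === "writable") {
        stream.state = "waiting";
        // A fresh pending promise, resolved when the queue drains.
        stream.writablePromise = new Promise(resolve => {
            stream.resolveWritable = resolve;
        });
    } else if (!shouldApplyBackpressure && stream.state === "waiting") {
        stream.state = "writable";
        stream.resolveWritable();
    }
}

// A stand-in stream whose strategy behaves like a count strategy with highWaterMark 0.
const stream = {
    state: "writable",
    queue: [{ value: "a", size: 1 }],
    strategy: { shouldApplyBackpressure: queueSize => queueSize > 0 },
    writablePromise: Promise.resolve(),
    resolveWritable() {}
};

syncStateWithQueue(stream);
const stateWhenQueued = stream.state; // "waiting": one queued chunk exceeds the mark

stream.queue.shift();
syncStateWithQueue(stream);
const stateWhenEmpty = stream.state;  // "writable": the queue has drained
```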
[[doClose]]()
  1. Assert: this.[[state]] is "closing".
  2. Let sinkClosePromise be the result of promise-calling this.[[onClose]]().
  3. Upon fulfillment of sinkClosePromise,
    1. If this.[[state]] is "errored", return.
    2. Assert: this.[[state]] is "closing".
    3. Resolve this.[[closedPromise]] with undefined.
    4. Set this.[[state]] to "closed".
  4. Upon rejection of sinkClosePromise with reason r,
    1. Call this.[[error]](r).

Helper APIs

TransformStream

Transform streams have been developed in the testable implementation, but not yet re-encoded in spec language. We are waiting to validate their design before doing so. In the meantime, see reference-implementation/lib/transform-stream.js.

TeeStream

A "tee stream" is a writable stream which, when written to, itself writes to multiple destinations. It aggregates backpressure and abort signals from those destinations, propagating the appropriate aggregate signals backward.

class TeeStream extends WritableStream {
    constructor() {
        this.[[outputs]] = [];

        super({
            write(chunk) {
                return Promise.all(this.[[outputs]].map(o => o.dest.write(chunk)));
            },
            close() {
                const outputsToClose = this.[[outputs]].filter(o => o.close);
                return Promise.all(outputsToClose.map(o => o.dest.close()));
            },
            abort(reason) {
                return Promise.all(this.[[outputs]].map(o => o.dest.abort(reason)));
            }
        });
    }

    addOut(dest, { close = true } = {}) {
        this.[[outputs]].push({ dest, close });
    }
}
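
The class above uses internal-slot notation, so it is not directly runnable. As a rough, runnable approximation of the same fan-out logic, the sketch below keeps the outputs in a closure and returns a plain object instead of extending WritableStream. The `makeTeeSink` and `makeRecordingDest` names are hypothetical.

```javascript
// Fans each operation out to every registered destination.
function makeTeeSink() {
    const outputs = [];
    return {
        addOut(dest, { close = true } = {}) {
            outputs.push({ dest, close });
        },
        write(chunk) {
            // Fulfills only once every destination has accepted the chunk.
            return Promise.all(outputs.map(o => o.dest.write(chunk)));
        },
        close() {
            // Destinations added with { close: false } are left open.
            const outputsToClose = outputs.filter(o => o.close);
            return Promise.all(outputsToClose.map(o => o.dest.close()));
        },
        abort(reason) {
            return Promise.all(outputs.map(o => o.dest.abort(reason)));
        }
    };
}

// A stand-in destination that records what it was asked to do.
function makeRecordingDest() {
    const log = [];
    return {
        log,
        write(chunk) { log.push("write " + chunk); return Promise.resolve(); },
        close() { log.push("close"); return Promise.resolve(); },
        abort(reason) { log.push("abort"); return Promise.resolve(); }
    };
}

const tee = makeTeeSink();
const destA = makeRecordingDest();
const destB = makeRecordingDest();
tee.addOut(destA);
tee.addOut(destB, { close: false });
tee.write("x");
tee.close();
```

Because write uses Promise.all, the slowest destination determines when the write completes, which is how backpressure from any one destination is propagated backward.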

ByteLengthQueuingStrategy

A common queuing strategy when dealing with binary data is to apply backpressure once the accumulated byteLength of the queued chunks exceeds a specified highWaterMark. As such, this is provided as a built-in helper along with the stream APIs.

class ByteLengthQueuingStrategy {
    constructor({ highWaterMark }) {
        this.highWaterMark = Number(highWaterMark);

        if (Number.isNaN(this.highWaterMark) || this.highWaterMark < 0) {
            throw new RangeError("highWaterMark must be a nonnegative number.");
        }
    }

    size(chunk) {
        return chunk.byteLength;
    }

    shouldApplyBackpressure(queueSize) {
        return queueSize > this.highWaterMark;
    }
}
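
As a quick illustration of how this strategy behaves, the sketch below repeats the class so the block is self-contained, then feeds it a few Uint8Array chunks. The running `queueSize` total is tracked by hand here; in a stream it would come from GetTotalQueueSize.

```javascript
class ByteLengthQueuingStrategy {
    constructor({ highWaterMark }) {
        this.highWaterMark = Number(highWaterMark);

        if (Number.isNaN(this.highWaterMark) || this.highWaterMark < 0) {
            throw new RangeError("highWaterMark must be a nonnegative number.");
        }
    }

    size(chunk) {
        return chunk.byteLength;
    }

    shouldApplyBackpressure(queueSize) {
        return queueSize > this.highWaterMark;
    }
}

const strategy = new ByteLengthQueuingStrategy({ highWaterMark: 16 });
const chunks = [new Uint8Array(8), new Uint8Array(8), new Uint8Array(4)];

let queueSize = 0;
const decisions = chunks.map(chunk => {
    queueSize += strategy.size(chunk);
    return strategy.shouldApplyBackpressure(queueSize);
});
// decisions is [false, false, true]: totals of 8 and 16 bytes stay within the
// mark, and the third chunk pushes the total to 20 bytes.
```

Note that the comparison is strict, so a queue sitting exactly at highWaterMark does not yet trigger backpressure.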

CountQueuingStrategy

A common queuing strategy when dealing with object streams is to simply count the number of objects that have been accumulated so far, applying backpressure once this number exceeds a specified highWaterMark. As such, this strategy is also provided as a built-in helper.

class CountQueuingStrategy {
    constructor({ highWaterMark }) {
        this.highWaterMark = Number(highWaterMark);

        if (Number.isNaN(this.highWaterMark) || this.highWaterMark < 0) {
            throw new RangeError("highWaterMark must be a nonnegative number.");
        }
    }

    size(chunk) {
        return 1;
    }

    shouldApplyBackpressure(queueSize) {
        return queueSize > this.highWaterMark;
    }
}
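
The default strategies named in the class outlines earlier (highWaterMark 1 for readable streams, 0 for writable streams) behave quite differently, which the sketch below tries to show. The class is repeated so the block is self-contained; the variable names are only for illustration.

```javascript
class CountQueuingStrategy {
    constructor({ highWaterMark }) {
        this.highWaterMark = Number(highWaterMark);

        if (Number.isNaN(this.highWaterMark) || this.highWaterMark < 0) {
            throw new RangeError("highWaterMark must be a nonnegative number.");
        }
    }

    size(chunk) {
        return 1;
    }

    shouldApplyBackpressure(queueSize) {
        return queueSize > this.highWaterMark;
    }
}

const readableDefault = new CountQueuingStrategy({ highWaterMark: 1 });
const writableDefault = new CountQueuingStrategy({ highWaterMark: 0 });

// Every chunk counts as 1, whatever it contains.
const oneChunk = readableDefault.size({ any: "object" });

// With a mark of 1, a single queued chunk is fine but a second one is not.
const readableAfterOne = readableDefault.shouldApplyBackpressure(oneChunk);     // false
const readableAfterTwo = readableDefault.shouldApplyBackpressure(oneChunk * 2); // true

// With a mark of 0, any queued chunk at all triggers backpressure.
const writableAfterOne = writableDefault.shouldApplyBackpressure(oneChunk);     // true
```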

Queue-with-Sizes Operations

The streams in this specification use a "queue-with-sizes" data structure to store queued up values, along with their determined sizes. A queue-with-sizes is a List of records with [[value]] and [[size]] fields (although in implementations it would of course be backed by a more efficient data structure).

A number of operations are used to make working with queues-with-sizes more pleasant:

EnqueueValueWithSize ( queue, value, size )

  1. Let size be ToNumber(size).
  2. ReturnIfAbrupt(size).
  3. If size is NaN, +∞, or −∞, throw a RangeError exception.
  4. Append Record{[[value]]: value, [[size]]: size} as the last element of queue.

DequeueValue ( queue )

  1. Assert: queue is not empty.
  2. Let pair be the first element of queue.
  3. Remove pair from queue, shifting all other elements downward (so that the second becomes the first, and so on).
  4. Return pair.[[value]].

PeekQueueValue ( queue )

  1. Assert: queue is not empty.
  2. Let pair be the first element of queue.
  3. Return pair.[[value]].

GetTotalQueueSize ( queue )

  1. Let totalSize be 0.
  2. Repeat for each Record{[[value]], [[size]]} pair that is an element of queue,
    1. Assert: pair.[[size]] is a valid, non-NaN number.
    2. Add pair.[[size]] to totalSize.
  3. Return totalSize.
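
The four operations above can be sketched over a plain array as follows. The camelCase function names are our own, `Number(size)` only approximates ToNumber, and the array-backed queue is the naive representation rather than the more efficient structure an implementation would use.

```javascript
// A queue-with-sizes modeled as an array of { value, size } records.
function enqueueValueWithSize(queue, value, size) {
    size = Number(size);
    if (!Number.isFinite(size)) {
        // NaN, +Infinity, and -Infinity are all rejected.
        throw new RangeError("Size must be a finite, non-NaN number.");
    }
    queue.push({ value, size });
}

function dequeueValue(queue) {
    if (queue.length === 0) {
        throw new Error("Assertion failed: queue is not empty.");
    }
    return queue.shift().value;
}

function peekQueueValue(queue) {
    if (queue.length === 0) {
        throw new Error("Assertion failed: queue is not empty.");
    }
    return queue[0].value;
}

function getTotalQueueSize(queue) {
    let totalSize = 0;
    for (const pair of queue) {
        totalSize += pair.size;
    }
    return totalSize;
}

const queue = [];
enqueueValueWithSize(queue, "a", 1);
enqueueValueWithSize(queue, "b", 2.5);
const totalBefore = getTotalQueueSize(queue); // 3.5
const first = dequeueValue(queue);            // "a"
const next = peekQueueValue(queue);           // "b", which stays in the queue
```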
