Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add "bytes" readableType to TransformStream transformer #601

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 17 additions & 7 deletions index.bs
Original file line number Diff line number Diff line change
Expand Up @@ -1885,13 +1885,7 @@ ReadableByteStreamController(<var>stream</var>, <var>underlyingByteSource</var>,

<emu-alg>
1. If IsReadableByteStreamController(*this*) is *false*, throw a *TypeError* exception.
1. If *this*.[[byobRequest]] is *undefined* and *this*.[[pendingPullIntos]] is not empty,
1. Let _firstDescriptor_ be the first element of *this*.[[pendingPullIntos]].
1. Let _view_ be ! Construct(<a idl>%Uint8Array%</a>, « _firstDescriptor_.[[buffer]],
_firstDescriptor_.[[byteOffset]] + _firstDescriptor_.[[bytesFilled]], _firstDescriptor_.[[byteLength]] −
_firstDescriptor_.[[bytesFilled]] »).
1. Set *this*.[[byobRequest]] to ! Construct(`<a idl>ReadableStreamBYOBRequest</a>`, « *this*, _view_ »).
1. Return *this*.[[byobRequest]].
1. Return ! ReadableByteStreamControllerGetBYOBRequest(*this*).
</emu-alg>

<h5 id="rbs-controller-desired-size" attribute for="ReadableByteStreamController" lt="desiredSize">get desiredSize</h5>
Expand Down Expand Up @@ -2187,6 +2181,8 @@ nothrow>ReadableByteStreamControllerEnqueue ( <var>controller</var>, <var>chunk<
1. Let _stream_ be _controller_.[[controlledReadableStream]].
1. Assert: _controller_.[[closeRequested]] is *false*.
1. Assert: _stream_.[[state]] is `"readable"`.
1. Assert: Type(_chunk_) is Object.
1. Assert: _chunk_ has a [[ViewedArrayBuffer]] internal slot.
1. Let _buffer_ be _chunk_.[[ViewedArrayBuffer]].
1. Let _byteOffset_ be _chunk_.[[ByteOffset]].
1. Let _byteLength_ be _chunk_.[[ByteLength]].
Expand Down Expand Up @@ -2285,6 +2281,20 @@ nothrow>ReadableByteStreamControllerFillPullIntoDescriptorFromQueue ( <var>contr
1. Return _ready_.
</emu-alg>

<h4 id="readable-byte-stream-controller-get-byob-request" aoid="ReadableByteStreamControllerGetBYOBRequest"
nothrow>ReadableByteStreamControllerGetBYOBRequest ( <var>controller</var> )</h4>

<emu-alg>
1. If _controller_.[[byobRequest]] is *undefined* and _controller_.[[pendingPullIntos]] is not empty,
1. Let _firstDescriptor_ be the first element of _controller_.[[pendingPullIntos]].
1. Let _view_ be ! Construct(<a idl>%Uint8Array%</a>, « _firstDescriptor_.[[buffer]],
_firstDescriptor_.[[byteOffset]] + _firstDescriptor_.[[bytesFilled]], _firstDescriptor_.[[byteLength]] −
_firstDescriptor_.[[bytesFilled]] »).
1. Set _controller_.[[byobRequest]] to ! Construct(`<a idl>ReadableStreamBYOBRequest</a>`,
« _controller_, _view_ »).
1. Return _controller_.[[byobRequest]].
</emu-alg>

<h4 id="readable-byte-stream-controller-get-desired-size" aoid="ReadableByteStreamControllerGetDesiredSize"
nothrow>ReadableByteStreamControllerGetDesiredSize ( <var>controller</var> )</h4>

Expand Down
30 changes: 20 additions & 10 deletions reference-implementation/lib/readable-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,11 @@ class ReadableStream {
module.exports = {
ReadableStream,
IsReadableStreamDisturbed,
ReadableByteStreamControllerClose,
ReadableByteStreamControllerEnqueue,
ReadableByteStreamControllerError,
ReadableByteStreamControllerGetBYOBRequest,
ReadableByteStreamControllerGetDesiredSize,
ReadableStreamDefaultControllerClose,
ReadableStreamDefaultControllerEnqueue,
ReadableStreamDefaultControllerError,
Expand Down Expand Up @@ -1212,16 +1217,7 @@ class ReadableByteStreamController {
throw byteStreamControllerBrandCheckException('byobRequest');
}

if (this._byobRequest === undefined && this._pendingPullIntos.length > 0) {
const firstDescriptor = this._pendingPullIntos[0];
const view = new Uint8Array(firstDescriptor.buffer,
firstDescriptor.byteOffset + firstDescriptor.bytesFilled,
firstDescriptor.byteLength - firstDescriptor.bytesFilled);

this._byobRequest = new ReadableStreamBYOBRequest(this, view);
}

return this._byobRequest;
return ReadableByteStreamControllerGetBYOBRequest(this);
}

get desiredSize() {
Expand Down Expand Up @@ -1735,6 +1731,7 @@ function ReadableByteStreamControllerEnqueue(controller, chunk) {

assert(controller._closeRequested === false);
assert(stream._state === 'readable');
assert(ArrayBuffer.isView(chunk) === true);

const buffer = chunk.buffer;
const byteOffset = chunk.byteOffset;
Expand Down Expand Up @@ -1772,6 +1769,19 @@ function ReadableByteStreamControllerError(controller, e) {
ReadableStreamError(stream, e);
}

function ReadableByteStreamControllerGetBYOBRequest(controller) {
if (controller._byobRequest === undefined && controller._pendingPullIntos.length > 0) {
const firstDescriptor = controller._pendingPullIntos[0];
const view = new Uint8Array(firstDescriptor.buffer,
firstDescriptor.byteOffset + firstDescriptor.bytesFilled,
firstDescriptor.byteLength - firstDescriptor.bytesFilled);

controller._byobRequest = new ReadableStreamBYOBRequest(controller, view);
}

return controller._byobRequest;
}

function ReadableByteStreamControllerGetDesiredSize(controller) {
return controller._strategyHWM - controller._totalQueuedBytes;
}
Expand Down
107 changes: 83 additions & 24 deletions reference-implementation/lib/transform-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@ const assert = require('assert');
const { InvokeOrNoop, PromiseInvokeOrPerformFallback, PromiseInvokeOrNoop, typeIsObject } = require('./helpers.js');
const { ReadableStream, ReadableStreamDefaultControllerClose,
ReadableStreamDefaultControllerEnqueue, ReadableStreamDefaultControllerError,
ReadableStreamDefaultControllerGetDesiredSize } = require('./readable-stream.js');
ReadableStreamDefaultControllerGetDesiredSize,
ReadableByteStreamControllerClose, ReadableByteStreamControllerEnqueue,
ReadableByteStreamControllerError, ReadableByteStreamControllerGetBYOBRequest,
ReadableByteStreamControllerGetDesiredSize } = require('./readable-stream.js');
const { WritableStream, WritableStreamDefaultControllerError } = require('./writable-stream.js');

// Methods on the transform stream controller object
Expand Down Expand Up @@ -37,19 +40,28 @@ function TransformStreamEnqueueToReadable(transformStream, chunk) {
// accept TransformStreamEnqueueToReadable() calls.

const controller = transformStream._readableController;
const type = transformStream._readableType;
assert(type === undefined || type === 'bytes');

try {
ReadableStreamDefaultControllerEnqueue(controller, chunk);
} catch (e) {
// This happens when readableStrategy.size() throws.
// The ReadableStream has already errored itself.
transformStream._readableClosed = true;
TransformStreamErrorIfNeeded(transformStream, e);

throw transformStream._storedError;
if (type === 'bytes') {
if (ArrayBuffer.isView(chunk) === false) {
throw new TypeError('You can only enqueue array buffer views when readableType is "bytes"');
}
ReadableByteStreamControllerEnqueue(controller, chunk);
} else if (type === undefined) {
try {
ReadableStreamDefaultControllerEnqueue(controller, chunk);
} catch (e) {
// This happens when readableStrategy.size() throws.
// The ReadableStream has already errored itself.
transformStream._readableClosed = true;
TransformStreamErrorIfNeeded(transformStream, e);

throw transformStream._storedError;
}
}

const desiredSize = ReadableStreamDefaultControllerGetDesiredSize(controller);
const desiredSize = TransformStreamGetDesiredSize(transformStream);
const maybeBackpressure = desiredSize <= 0;

if (maybeBackpressure === true && transformStream._backpressure === false) {
Expand All @@ -76,10 +88,14 @@ function TransformStreamCloseReadableInternal(transformStream) {
assert(transformStream._errored === false);
assert(transformStream._readableClosed === false);

try {
ReadableStreamDefaultControllerClose(transformStream._readableController);
} catch (e) {
assert(false);
const readableController = transformStream._readableController;
const type = transformStream._readableType;
assert(type === undefined || type === 'bytes');

if (type === 'bytes') {
ReadableByteStreamControllerClose(readableController);
} else if (type === undefined) {
ReadableStreamDefaultControllerClose(readableController);
}

transformStream._readableClosed = true;
Expand All @@ -99,11 +115,23 @@ function TransformStreamErrorInternal(transformStream, e) {
transformStream._errored = true;
transformStream._storedError = e;

const writableController = transformStream._writableController;
const readableController = transformStream._readableController;

const writableType = transformStream._writableType;
const readableType = transformStream._readableType;
assert(writableType === undefined);
assert(readableType === 'bytes' || readableType === undefined);

if (transformStream._writableDone === false) {
WritableStreamDefaultControllerError(transformStream._writableController, e);
WritableStreamDefaultControllerError(writableController, e);
}
if (transformStream._readableClosed === false) {
ReadableStreamDefaultControllerError(transformStream._readableController, e);
if (readableType === 'bytes') {
ReadableByteStreamControllerError(readableController, e);
} else if (readableType === undefined) {
ReadableStreamDefaultControllerError(readableController, e);
}
}
}

Expand Down Expand Up @@ -176,6 +204,19 @@ function TransformStreamTransform(transformStream, chunk) {
e => TransformStreamErrorIfNeeded(transformStream, e));
}

function TransformStreamGetDesiredSize(transformStream) {
const type = transformStream._readableType;
const controller = transformStream._readableController;

assert(type === undefined || type === 'bytes');

if (type === 'bytes') {
return ReadableByteStreamControllerGetDesiredSize(controller);
} else if (type === undefined) {
return ReadableStreamDefaultControllerGetDesiredSize(controller);
}
}

function IsTransformStreamDefaultController(x) {
if (!typeIsObject(x)) {
return false;
Expand All @@ -201,9 +242,10 @@ function IsTransformStream(x) {
}

class TransformStreamSink {
constructor(transformStream, startPromise) {
constructor(transformStream, startPromise, type) {
this._transformStream = transformStream;
this._startPromise = startPromise;
this.type = type;
}

start(c) {
Expand Down Expand Up @@ -256,9 +298,10 @@ class TransformStreamSink {
}

class TransformStreamSource {
constructor(transformStream, startPromise) {
constructor(transformStream, startPromise, type) {
this._transformStream = transformStream;
this._startPromise = startPromise;
this.type = type;
}

start(c) {
Expand Down Expand Up @@ -321,15 +364,28 @@ class TransformStreamDefaultController {
this._controlledTransformStream = transformStream;
}

get byobRequest() {
if (IsTransformStreamDefaultController(this) === false) {
throw defaultControllerBrandCheckException('byobRequest');
}

if (this._readableType !== 'bytes') {
return undefined;
}

const readableController = this._readableController;

return ReadableByteStreamControllerGetBYOBRequest(readableController);
}

get desiredSize() {
if (IsTransformStreamDefaultController(this) === false) {
throw defaultControllerBrandCheckException('desiredSize');
}

const transformStream = this._controlledTransformStream;
const readableController = transformStream._readableController;

return ReadableStreamDefaultControllerGetDesiredSize(readableController);
return TransformStreamGetDesiredSize(transformStream);
}

enqueue(chunk) {
Expand Down Expand Up @@ -379,23 +435,26 @@ class TransformStream {

this._transformStreamController = new TransformStreamDefaultController(this);

this._readableType = transformer.readableType;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a reason not to validate the value of readableType and writableType here? It seems weird to me.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was relying on the other two stream constructors already validating it, but I suppose not.

this._writableType = transformer.writableType;

let startPromise_resolve;
const startPromise = new Promise(resolve => {
startPromise_resolve = resolve;
});

const source = new TransformStreamSource(this, startPromise);
const source = new TransformStreamSource(this, startPromise, this._readableType);

this._readable = new ReadableStream(source, readableStrategy);

const sink = new TransformStreamSink(this, startPromise);
const sink = new TransformStreamSink(this, startPromise, this._writableType);

this._writable = new WritableStream(sink, writableStrategy);

assert(this._writableController !== undefined);
assert(this._readableController !== undefined);

const desiredSize = ReadableStreamDefaultControllerGetDesiredSize(this._readableController);
const desiredSize = TransformStreamGetDesiredSize(this);
// Set _backpressure based on desiredSize. As there is no read() at this point, we can just interpret
// desiredSize being non-positive as backpressure.
TransformStreamSetBackpressure(this, desiredSize <= 0);
Expand Down