diff --git a/lib/internal/streams/pipeline.js b/lib/internal/streams/pipeline.js index ad3f0796875aae..4359c9a5d47c2f 100644 --- a/lib/internal/streams/pipeline.js +++ b/lib/internal/streams/pipeline.js @@ -13,7 +13,6 @@ const { const { eos } = require('internal/streams/end-of-stream'); const { once } = require('internal/util'); const destroyImpl = require('internal/streams/destroy'); -const Duplex = require('internal/streams/duplex'); const { AbortError, aggregateTwoErrors, @@ -36,6 +35,8 @@ const { isIterable, isReadable, isReadableNodeStream, + isWritableNodeStream, + isWritableStream, isNodeStream, isTransformStream, isWebStream, @@ -159,7 +160,8 @@ async function pumpToWeb(readable, writable, finish, { end }) { try { for await (const chunk of readable) { await writer.ready; - writer.write(chunk).catch(() => {}); + writer.write(chunk).catch(() => { + }); } await writer.ready; @@ -179,6 +181,68 @@ async function pumpToWeb(readable, writable, finish, { end }) { } } +function isValidPipelineSource(stream) { + return ( + isIterable(stream) || + isReadableNodeStream(stream) || + isReadableStream(stream) || + isTransformStream(stream) + ); +} + +function isValidPipelineTransform(stream) { + return ( + isTransformStream(stream) || + (isNodeStream(stream) && + isReadableNodeStream(stream) && + isWritableNodeStream(stream)) + ); +} + +function isValidPipelineDestination(stream) { + return ( + isWritableNodeStream(stream) || + isWritableStream(stream) || + isTransformStream(stream) + ); +} + +function validatePipelineStream(stream, i, len) { + if (typeof stream === 'function') { + return; + } + + if (i === 0) { + if (!isValidPipelineSource(stream)) { + throw new ERR_INVALID_ARG_TYPE( + 'source', + ['Readable', 'Iterable', 'AsyncIterable', 'ReadableStream', 'TransformStream'], + stream, + ); + } + return; + } + + if (i === len - 1) { + if (!isValidPipelineDestination(stream)) { + throw new ERR_INVALID_ARG_TYPE( + 'destination', + ['Writable', 'WritableStream', 'TransformStream'], + stream, + ); + } + return; + } + + if (!isValidPipelineTransform(stream)) { + throw new ERR_INVALID_ARG_TYPE( + `transform[${i - 1}]`, + ['Duplex', 'Transform', 'TransformStream'], + stream, + ); + } +} + function pipeline(...streams) { return pipelineImpl(streams, once(popCallback(streams))); } @@ -192,6 +256,10 @@ function pipelineImpl(streams, callback, opts) { throw new ERR_MISSING_ARGS('streams'); } + for (let i = 0; i < streams.length; i++) { + validatePipelineStream(streams[i], i, streams.length); + } + const ac = new AbortController(); const signal = ac.signal; const outerSignal = opts?.signal; @@ -298,10 +366,8 @@ function pipelineImpl(streams, callback, opts) { throw new ERR_INVALID_RETURN_VALUE( 'Iterable, AsyncIterable or Stream', 'source', ret); } - } else if (isIterable(stream) || isReadableNodeStream(stream) || isTransformStream(stream)) { - ret = stream; } else { - ret = Duplex.from(stream); + ret = stream; } } else if (typeof stream === 'function') { if (isTransformStream(ret)) { @@ -402,8 +468,6 @@ function pipelineImpl(streams, callback, opts) { 'val', ['Readable', 'Iterable', 'AsyncIterable', 'ReadableStream', 'TransformStream'], ret); } ret = stream; - } else { - ret = Duplex.from(stream); } }