class ReadableBase
implements [NodeJS.ReadableStream]
extends Stream
Usage in Deno
```typescript import { ReadableBase } from "node:node__stream.d.ts"; ```readonly
closed: boolean
Is `true` after `'close'` has been emitted.
destroyed: boolean
Is `true` after `readable.destroy()` has been called.
readonly
errored: Error | null
Returns error if the stream has been destroyed with an error.
readable: boolean
Is `true` if it is safe to call read, which means
the stream has not been destroyed or emitted `'error'` or `'end'`.
readonly
readableAborted: boolean
Returns whether the stream was destroyed or errored before emitting `'end'`.
readonly
readableDidRead: boolean
Returns whether `'data'` has been emitted.
readonly
readableEncoding: BufferEncoding | null
Getter for the property `encoding` of a given `Readable` stream. The `encoding` property can be set using the setEncoding method.
readonly
readableEnded: boolean
Becomes `true` when [`'end'`](https://nodejs.org/docs/latest-v22.x/api/stream.html#event-end) event is emitted.
readonly
readableFlowing: boolean | null
This property reflects the current state of a `Readable` stream as described
in the [Three states](https://nodejs.org/docs/latest-v22.x/api/stream.html#three-states) section.
readonly
readableHighWaterMark: number
Returns the value of `highWaterMark` passed when creating this `Readable`.
readonly
readableLength: number
This property contains the number of bytes (or objects) in the queue
ready to be read. The value provides introspection data regarding
the status of the `highWaterMark`.
readonly
readableObjectMode: boolean
Getter for the property `objectMode` of a given `Readable` stream.
[Symbol.asyncDispose](): Promise<void>
Calls `readable.destroy()` with an `AbortError` and returns a promise that fulfills when the stream is finished.
[Symbol.asyncIterator](): AsyncIterableIterator<any>
abstract
_construct(callback: (error?: Error | null) => void): void
_destroy(error: Error | null,callback: (error?: Error | null) => void,): void
_read(size: number): void
addListener(event: "close",listener: () => void,): this
Event emitter
The defined events on documents including:
1. close
2. data
3. end
4. error
5. pause
6. readable
7. resume
addListener(event: "data",listener: (chunk: any) => void,): this
addListener(event: "end",listener: () => void,): this
addListener(event: "error",listener: (err: Error) => void,): this
addListener(event: "pause",listener: () => void,): this
addListener(event: "readable",listener: () => void,): this
addListener(event: "resume",listener: () => void,): this
addListener(event: string | symbol,listener: (...args: any[]) => void,): this
asIndexedPairs(options?: Pick<ArrayOptions, "signal">): Readable
This method returns a new stream with chunks of the underlying stream paired with a counter
in the form `[index, chunk]`. The first index value is `0` and it increases by 1 for each chunk produced.
destroy(error?: Error): this
Destroy the stream. Optionally emit an `'error'` event, and emit a `'close'` event (unless `emitClose` is set to `false`). After this call, the readable
stream will release any internal resources and subsequent calls to `push()` will be ignored.
Once `destroy()` has been called any further calls will be a no-op and no
further errors except from `_destroy()` may be emitted as `'error'`.
Implementors should not override this method, but instead implement `readable._destroy()`.
drop(limit: number,options?: Pick<ArrayOptions, "signal">,): Readable
This method returns a new stream with the first *limit* chunks dropped from the start.
emit(event: "close"): boolean
emit(event: "data",chunk: any,): boolean
emit(event: "end"): boolean
emit(event: "error",err: Error,): boolean
emit(event: "pause"): boolean
emit(event: "readable"): boolean
emit(event: "resume"): boolean
emit(event: string | symbol,...args: any[],): boolean
every(fn: (data: any,options?: Pick<ArrayOptions, "signal">,) => boolean | Promise<boolean>,options?: ArrayOptions,): Promise<boolean>
This method is similar to `Array.prototype.every` and calls *fn* on each chunk in the stream
to check if all awaited return values are truthy value for *fn*. Once an *fn* call on a chunk
`await`ed return value is falsy, the stream is destroyed and the promise is fulfilled with `false`.
If all of the *fn* calls on the chunks return a truthy value, the promise is fulfilled with `true`.
filter(fn: (data: any,options?: Pick<ArrayOptions, "signal">,) => boolean | Promise<boolean>,options?: ArrayOptions,): Readable
This method allows filtering the stream. For each chunk in the stream the *fn* function will be called
and if it returns a truthy value, the chunk will be passed to the result stream.
If the *fn* function returns a promise - that promise will be `await`ed.
find<T>(fn: (data: any,options?: Pick<ArrayOptions, "signal">,) => data is T,options?: ArrayOptions,): Promise<T | undefined>
This method is similar to `Array.prototype.find` and calls *fn* on each chunk in the stream
to find a chunk with a truthy value for *fn*. Once an *fn* call's awaited return value is truthy,
the stream is destroyed and the promise is fulfilled with value for which *fn* returned a truthy value.
If all of the *fn* calls on the chunks return a falsy value, the promise is fulfilled with `undefined`.
find(fn: (data: any,options?: Pick<ArrayOptions, "signal">,) => boolean | Promise<boolean>,options?: ArrayOptions,): Promise<any>
flatMap(fn: (data: any,options?: Pick<ArrayOptions, "signal">,) => any,options?: ArrayOptions,): Readable
This method returns a new stream by applying the given callback to each chunk of the stream
and then flattening the result.
It is possible to return a stream or another iterable or async iterable from *fn* and the result streams
will be merged (flattened) into the returned stream.
forEach(fn: (data: any,options?: Pick<ArrayOptions, "signal">,) => void | Promise<void>,options?: ArrayOptions,): Promise<void>
This method allows iterating a stream. For each chunk in the stream the *fn* function will be called.
If the *fn* function returns a promise - that promise will be `await`ed.
This method is different from `for await...of` loops in that it can optionally process chunks concurrently.
In addition, a `forEach` iteration can only be stopped by having passed a `signal` option
and aborting the related AbortController while `for await...of` can be stopped with `break` or `return`.
In either case the stream will be destroyed.
This method is different from listening to the `'data'` event in that it uses the `readable` event
in the underlying machinary and can limit the number of concurrent *fn* calls.
isPaused(): boolean
The `readable.isPaused()` method returns the current operating state of the `Readable`.
This is used primarily by the mechanism that underlies the `readable.pipe()` method.
In most typical cases, there will be no reason to use this method directly.
```js
const readable = new stream.Readable();
readable.isPaused(); // === false
readable.pause();
readable.isPaused(); // === true
readable.resume();
readable.isPaused(); // === false
```
iterator(options?: { destroyOnReturn?: boolean; }): AsyncIterableIterator<any>
The iterator created by this method gives users the option to cancel the destruction
of the stream if the `for await...of` loop is exited by `return`, `break`, or `throw`,
or if the iterator should destroy the stream if the stream emitted an error during iteration.
map(fn: (data: any,options?: Pick<ArrayOptions, "signal">,) => any,options?: ArrayOptions,): Readable
This method allows mapping over the stream. The *fn* function will be called for every chunk in the stream.
If the *fn* function returns a promise - that promise will be `await`ed before being passed to the result stream.
on(event: "close",listener: () => void,): this
on(event: "data",listener: (chunk: any) => void,): this
on(event: "end",listener: () => void,): this
on(event: "error",listener: (err: Error) => void,): this
on(event: "pause",listener: () => void,): this
on(event: "readable",listener: () => void,): this
on(event: "resume",listener: () => void,): this
on(event: string | symbol,listener: (...args: any[]) => void,): this
once(event: "close",listener: () => void,): this
once(event: "data",listener: (chunk: any) => void,): this
once(event: "end",listener: () => void,): this
once(event: "error",listener: (err: Error) => void,): this
once(event: "pause",listener: () => void,): this
once(event: "readable",listener: () => void,): this
once(event: "resume",listener: () => void,): this
once(event: string | symbol,listener: (...args: any[]) => void,): this
pause(): this
The `readable.pause()` method will cause a stream in flowing mode to stop
emitting `'data'` events, switching out of flowing mode. Any data that
becomes available will remain in the internal buffer.
```js
const readable = getReadableStreamSomehow();
readable.on('data', (chunk) => {
console.log(`Received ${chunk.length} bytes of data.`);
readable.pause();
console.log('There will be no additional data for 1 second.');
setTimeout(() => {
console.log('Now data will start flowing again.');
readable.resume();
}, 1000);
});
```
The `readable.pause()` method has no effect if there is a `'readable'` event listener.
prependListener(event: "close",listener: () => void,): this
prependListener(event: "data",listener: (chunk: any) => void,): this
prependListener(event: "end",listener: () => void,): this
prependListener(event: "error",listener: (err: Error) => void,): this
prependListener(event: "pause",listener: () => void,): this
prependListener(event: "readable",listener: () => void,): this
prependListener(event: "resume",listener: () => void,): this
prependListener(event: string | symbol,listener: (...args: any[]) => void,): this
prependOnceListener(event: "close",listener: () => void,): this
prependOnceListener(event: "data",listener: (chunk: any) => void,): this
prependOnceListener(event: "end",listener: () => void,): this
prependOnceListener(event: "error",listener: (err: Error) => void,): this
prependOnceListener(event: "pause",listener: () => void,): this
prependOnceListener(event: "readable",listener: () => void,): this
prependOnceListener(event: "resume",listener: () => void,): this
prependOnceListener(event: string | symbol,listener: (...args: any[]) => void,): this
push(chunk: any,encoding?: BufferEncoding,): boolean
read(size?: number): any
The `readable.read()` method reads data out of the internal buffer and
returns it. If no data is available to be read, `null` is returned. By default,
the data is returned as a `Buffer` object unless an encoding has been
specified using the `readable.setEncoding()` method or the stream is operating
in object mode.
The optional `size` argument specifies a specific number of bytes to read. If
`size` bytes are not available to be read, `null` will be returned _unless_ the
stream has ended, in which case all of the data remaining in the internal buffer
will be returned.
If the `size` argument is not specified, all of the data contained in the
internal buffer will be returned.
The `size` argument must be less than or equal to 1 GiB.
The `readable.read()` method should only be called on `Readable` streams
operating in paused mode. In flowing mode, `readable.read()` is called
automatically until the internal buffer is fully drained.
```js
const readable = getReadableStreamSomehow();
// 'readable' may be triggered multiple times as data is buffered in
readable.on('readable', () => {
let chunk;
console.log('Stream is readable (new data received in buffer)');
// Use a loop to make sure we read all currently available data
while (null !== (chunk = readable.read())) {
console.log(`Read ${chunk.length} bytes of data...`);
}
});
// 'end' will be triggered once when there is no more data available
readable.on('end', () => {
console.log('Reached end of stream.');
});
```
Each call to `readable.read()` returns a chunk of data, or `null`. The chunks
are not concatenated. A `while` loop is necessary to consume all data
currently in the buffer. When reading a large file `.read()` may return `null`,
having consumed all buffered content so far, but there is still more data to
come not yet buffered. In this case a new `'readable'` event will be emitted
when there is more data in the buffer. Finally the `'end'` event will be
emitted when there is no more data to come.
Therefore to read a file's whole contents from a `readable`, it is necessary
to collect chunks across multiple `'readable'` events:
```js
const chunks = [];
readable.on('readable', () => {
let chunk;
while (null !== (chunk = readable.read())) {
chunks.push(chunk);
}
});
readable.on('end', () => {
const content = chunks.join('');
});
```
A `Readable` stream in object mode will always return a single item from
a call to `readable.read(size)`, regardless of the value of the `size` argument.
If the `readable.read()` method returns a chunk of data, a `'data'` event will
also be emitted.
Calling read after the `'end'` event has
been emitted will return `null`. No runtime error will be raised.
reduce<T = any>(): Promise<T>
This method calls *fn* on each chunk of the stream in order, passing it the result from the calculation
on the previous element. It returns a promise for the final value of the reduction.
If no *initial* value is supplied the first chunk of the stream is used as the initial value.
If the stream is empty, the promise is rejected with a `TypeError` with the `ERR_INVALID_ARGS` code property.
The reducer function iterates the stream element-by-element which means that there is no *concurrency* parameter
or parallelism. To perform a reduce concurrently, you can extract the async function to `readable.map` method.
reduce<T = any>(): Promise<T>
removeListener(event: "close",listener: () => void,): this
removeListener(event: "data",listener: (chunk: any) => void,): this
removeListener(event: "end",listener: () => void,): this
removeListener(event: "error",listener: (err: Error) => void,): this
removeListener(event: "pause",listener: () => void,): this
removeListener(event: "readable",listener: () => void,): this
removeListener(event: "resume",listener: () => void,): this
removeListener(event: string | symbol,listener: (...args: any[]) => void,): this
resume(): this
The `readable.resume()` method causes an explicitly paused `Readable` stream to
resume emitting `'data'` events, switching the stream into flowing mode.
The `readable.resume()` method can be used to fully consume the data from a
stream without actually processing any of that data:
```js
getReadableStreamSomehow()
.resume()
.on('end', () => {
console.log('Reached the end, but did not read anything.');
});
```
The `readable.resume()` method has no effect if there is a `'readable'` event listener.
setEncoding(encoding: BufferEncoding): this
The `readable.setEncoding()` method sets the character encoding for
data read from the `Readable` stream.
By default, no encoding is assigned and stream data will be returned as `Buffer` objects. Setting an encoding causes the stream data
to be returned as strings of the specified encoding rather than as `Buffer` objects. For instance, calling `readable.setEncoding('utf8')` will cause the
output data to be interpreted as UTF-8 data, and passed as strings. Calling `readable.setEncoding('hex')` will cause the data to be encoded in hexadecimal
string format.
The `Readable` stream will properly handle multi-byte characters delivered
through the stream that would otherwise become improperly decoded if simply
pulled from the stream as `Buffer` objects.
```js
const readable = getReadableStreamSomehow();
readable.setEncoding('utf8');
readable.on('data', (chunk) => {
assert.equal(typeof chunk, 'string');
console.log('Got %d characters of string data:', chunk.length);
});
```
some(fn: (data: any,options?: Pick<ArrayOptions, "signal">,) => boolean | Promise<boolean>,options?: ArrayOptions,): Promise<boolean>
This method is similar to `Array.prototype.some` and calls *fn* on each chunk in the stream
until the awaited return value is `true` (or any truthy value). Once an *fn* call on a chunk
`await`ed return value is truthy, the stream is destroyed and the promise is fulfilled with `true`.
If none of the *fn* calls on the chunks return a truthy value, the promise is fulfilled with `false`.
take(limit: number,options?: Pick<ArrayOptions, "signal">,): Readable
This method returns a new stream with the first *limit* chunks.
toArray(options?: Pick<ArrayOptions, "signal">): Promise<any[]>
This method allows easily obtaining the contents of a stream.
As this method reads the entire stream into memory, it negates the benefits of streams. It's intended
for interoperability and convenience, not as the primary way to consume streams.
unpipe(destination?: WritableStream): this
The `readable.unpipe()` method detaches a `Writable` stream previously attached
using the pipe method.
If the `destination` is not specified, then _all_ pipes are detached.
If the `destination` is specified, but no pipe is set up for it, then
the method does nothing.
```js
import fs from 'node:fs';
const readable = getReadableStreamSomehow();
const writable = fs.createWriteStream('file.txt');
// All the data from readable goes into 'file.txt',
// but only for the first second.
readable.pipe(writable);
setTimeout(() => {
console.log('Stop writing to file.txt.');
readable.unpipe(writable);
console.log('Manually close the file stream.');
writable.end();
}, 1000);
```
unshift(chunk: any,encoding?: BufferEncoding,): void
Passing `chunk` as `null` signals the end of the stream (EOF) and behaves the
same as `readable.push(null)`, after which no more data can be written. The EOF
signal is put at the end of the buffer and any buffered data will still be
flushed.
The `readable.unshift()` method pushes a chunk of data back into the internal
buffer. This is useful in certain situations where a stream is being consumed by
code that needs to "un-consume" some amount of data that it has optimistically
pulled out of the source, so that the data can be passed on to some other party.
The `stream.unshift(chunk)` method cannot be called after the `'end'` event
has been emitted or a runtime error will be thrown.
Developers using `stream.unshift()` often should consider switching to
use of a `Transform` stream instead. See the `API for stream implementers` section for more information.
```js
// Pull off a header delimited by \n\n.
// Use unshift() if we get too much.
// Call the callback with (error, header, stream).
import { StringDecoder } from 'node:string_decoder';
function parseHeader(stream, callback) {
stream.on('error', callback);
stream.on('readable', onReadable);
const decoder = new StringDecoder('utf8');
let header = '';
function onReadable() {
let chunk;
while (null !== (chunk = stream.read())) {
const str = decoder.write(chunk);
if (str.includes('\n\n')) {
// Found the header boundary.
const split = str.split(/\n\n/);
header += split.shift();
const remaining = split.join('\n\n');
const buf = Buffer.from(remaining, 'utf8');
stream.removeListener('error', callback);
// Remove the 'readable' listener before unshifting.
stream.removeListener('readable', onReadable);
if (buf.length)
stream.unshift(buf);
// Now the body of the message can be read from the stream.
callback(null, header, stream);
return;
}
// Still reading the header.
header += str;
}
}
}
```
Unlike push, `stream.unshift(chunk)` will not
end the reading process by resetting the internal reading state of the stream.
This can cause unexpected results if `readable.unshift()` is called during a
read (i.e. from within a _read implementation on a
custom stream). Following the call to `readable.unshift()` with an immediate push will reset the reading state appropriately,
however it is best to simply avoid calling `readable.unshift()` while in the
process of performing a read.
wrap(stream: ReadableStream): this
Prior to Node.js 0.10, streams did not implement the entire `node:stream` module API as it is currently defined. (See `Compatibility` for more
information.)
When using an older Node.js library that emits `'data'` events and has a pause method that is advisory only, the `readable.wrap()` method can be used to create a `Readable`
stream that uses
the old stream as its data source.
It will rarely be necessary to use `readable.wrap()` but the method has been
provided as a convenience for interacting with older Node.js applications and
libraries.
```js
import { OldReader } from './old-api-module.js';
import { Readable } from 'node:stream';
const oreader = new OldReader();
const myReader = new Readable().wrap(oreader);
myReader.on('readable', () => {
myReader.read(); // etc.
});
```
from(iterable: Iterable<any> | AsyncIterable<any>,options?: ReadableOptions,): Readable
A utility method for creating Readable Streams out of iterators.
isDisturbed(stream: Readable | ReadableStream): boolean
Returns whether the stream has been read from or cancelled.