The `open()` method on fs read and write streams has been deprecated for many years. It's time to remove it while still allowing the open method to be monkeypatched since that's still apparently a thing. PR-URL: https://github.com/nodejs/node/pull/58529 Reviewed-By: Robert Nagy <ronagy@icloud.com> Reviewed-By: Yagiz Nizipli <yagiz@nizipli.com> Reviewed-By: LiviaMedeiros <livia@cirno.name> Reviewed-By: Michaël Zasso <targos@protonmail.com> Reviewed-By: Jason Zhang <xzha4350@gmail.com>
556 lines
15 KiB
JavaScript
556 lines
15 KiB
JavaScript
'use strict';
|
|
|
|
const {
|
|
Array,
|
|
FunctionPrototypeBind,
|
|
MathMin,
|
|
ObjectDefineProperty,
|
|
ObjectSetPrototypeOf,
|
|
PromisePrototypeThen,
|
|
ReflectApply,
|
|
Symbol,
|
|
} = primordials;
|
|
|
|
const {
|
|
ERR_INVALID_ARG_TYPE,
|
|
ERR_METHOD_NOT_IMPLEMENTED,
|
|
ERR_OUT_OF_RANGE,
|
|
ERR_STREAM_DESTROYED,
|
|
ERR_SYSTEM_ERROR,
|
|
} = require('internal/errors').codes;
|
|
const {
|
|
kEmptyObject,
|
|
} = require('internal/util');
|
|
const {
|
|
validateBoolean,
|
|
validateFunction,
|
|
validateInteger,
|
|
} = require('internal/validators');
|
|
const { errorOrDestroy } = require('internal/streams/destroy');
|
|
const fs = require('fs');
|
|
const { kRef, kUnref, FileHandle } = require('internal/fs/promises');
|
|
const { Buffer } = require('buffer');
|
|
const {
|
|
copyObject,
|
|
getOptions,
|
|
getValidatedFd,
|
|
validatePath,
|
|
} = require('internal/fs/utils');
|
|
const { Readable, Writable, finished } = require('stream');
|
|
const { toPathIfFileURL } = require('internal/url');
|
|
const kIoDone = Symbol('kIoDone');
|
|
const kIsPerformingIO = Symbol('kIsPerformingIO');
|
|
|
|
const kFs = Symbol('kFs');
|
|
const kHandle = Symbol('kHandle');
|
|
|
|
function _construct(callback) {
|
|
const stream = this;
|
|
if (typeof stream.fd === 'number') {
|
|
callback();
|
|
return;
|
|
}
|
|
|
|
if (typeof stream.open === 'function') {
|
|
// Backwards compat for monkey patching open().
|
|
const orgEmit = stream.emit;
|
|
stream.emit = function(...args) {
|
|
if (args[0] === 'open') {
|
|
this.emit = orgEmit;
|
|
callback();
|
|
ReflectApply(orgEmit, this, args);
|
|
} else if (args[0] === 'error') {
|
|
this.emit = orgEmit;
|
|
callback(args[1]);
|
|
} else {
|
|
ReflectApply(orgEmit, this, args);
|
|
}
|
|
};
|
|
stream.open();
|
|
} else {
|
|
stream[kFs].open(stream.path, stream.flags, stream.mode, (er, fd) => {
|
|
if (er) {
|
|
callback(er);
|
|
} else {
|
|
stream.fd = fd;
|
|
callback();
|
|
stream.emit('open', stream.fd);
|
|
stream.emit('ready');
|
|
}
|
|
});
|
|
}
|
|
}
|
|
|
|
// This generates an fs operations structure for a FileHandle
|
|
const FileHandleOperations = (handle) => {
|
|
return {
|
|
open: (path, flags, mode, cb) => {
|
|
throw new ERR_METHOD_NOT_IMPLEMENTED('open()');
|
|
},
|
|
close: (fd, cb) => {
|
|
handle[kUnref]();
|
|
PromisePrototypeThen(handle.close(),
|
|
() => cb(), cb);
|
|
},
|
|
fsync: (fd, cb) => {
|
|
PromisePrototypeThen(handle.sync(), () => cb(), cb);
|
|
},
|
|
read: (fd, buf, offset, length, pos, cb) => {
|
|
PromisePrototypeThen(handle.read(buf, offset, length, pos),
|
|
(r) => cb(null, r.bytesRead, r.buffer),
|
|
(err) => cb(err, 0, buf));
|
|
},
|
|
write: (fd, buf, offset, length, pos, cb) => {
|
|
PromisePrototypeThen(handle.write(buf, offset, length, pos),
|
|
(r) => cb(null, r.bytesWritten, r.buffer),
|
|
(err) => cb(err, 0, buf));
|
|
},
|
|
writev: (fd, buffers, pos, cb) => {
|
|
PromisePrototypeThen(handle.writev(buffers, pos),
|
|
(r) => cb(null, r.bytesWritten, r.buffers),
|
|
(err) => cb(err, 0, buffers));
|
|
},
|
|
};
|
|
};
|
|
|
|
function close(stream, err, cb) {
|
|
if (!stream.fd) {
|
|
cb(err);
|
|
} else if (stream.flush) {
|
|
stream[kFs].fsync(stream.fd, (flushErr) => {
|
|
_close(stream, err || flushErr, cb);
|
|
});
|
|
} else {
|
|
_close(stream, err, cb);
|
|
}
|
|
}
|
|
|
|
function _close(stream, err, cb) {
|
|
stream[kFs].close(stream.fd, (er) => {
|
|
cb(er || err);
|
|
});
|
|
stream.fd = null;
|
|
}
|
|
|
|
function importFd(stream, options) {
|
|
if (typeof options.fd === 'number') {
|
|
// When fd is a raw descriptor, we must keep our fingers crossed
|
|
// that the descriptor won't get closed, or worse, replaced with
|
|
// another one
|
|
// https://github.com/nodejs/node/issues/35862
|
|
stream[kFs] = options.fs || fs;
|
|
return options.fd;
|
|
} else if (typeof options.fd === 'object' &&
|
|
options.fd instanceof FileHandle) {
|
|
// When fd is a FileHandle we can listen for 'close' events
|
|
if (options.fs) {
|
|
// FileHandle is not supported with custom fs operations
|
|
throw new ERR_METHOD_NOT_IMPLEMENTED('FileHandle with fs');
|
|
}
|
|
stream[kHandle] = options.fd;
|
|
stream[kFs] = FileHandleOperations(stream[kHandle]);
|
|
stream[kHandle][kRef]();
|
|
options.fd.on('close', FunctionPrototypeBind(stream.close, stream));
|
|
return options.fd.fd;
|
|
}
|
|
|
|
throw new ERR_INVALID_ARG_TYPE('options.fd',
|
|
['number', 'FileHandle'], options.fd);
|
|
}
|
|
|
|
function ReadStream(path, options) {
|
|
if (!(this instanceof ReadStream))
|
|
return new ReadStream(path, options);
|
|
|
|
// A little bit bigger buffer and water marks by default
|
|
options = copyObject(getOptions(options, kEmptyObject));
|
|
if (options.highWaterMark === undefined)
|
|
options.highWaterMark = 64 * 1024;
|
|
|
|
if (options.autoDestroy === undefined) {
|
|
options.autoDestroy = false;
|
|
}
|
|
|
|
if (options.fd == null) {
|
|
this.fd = null;
|
|
this[kFs] = options.fs || fs;
|
|
validateFunction(this[kFs].open, 'options.fs.open');
|
|
|
|
// Path will be ignored when fd is specified, so it can be falsy
|
|
this.path = toPathIfFileURL(path);
|
|
this.flags = options.flags === undefined ? 'r' : options.flags;
|
|
this.mode = options.mode === undefined ? 0o666 : options.mode;
|
|
|
|
validatePath(this.path);
|
|
} else {
|
|
this.fd = getValidatedFd(importFd(this, options));
|
|
}
|
|
|
|
options.autoDestroy = options.autoClose === undefined ?
|
|
true : options.autoClose;
|
|
|
|
validateFunction(this[kFs].read, 'options.fs.read');
|
|
|
|
if (options.autoDestroy) {
|
|
validateFunction(this[kFs].close, 'options.fs.close');
|
|
}
|
|
|
|
this.start = options.start;
|
|
this.end = options.end;
|
|
this.pos = undefined;
|
|
this.bytesRead = 0;
|
|
this[kIsPerformingIO] = false;
|
|
|
|
if (this.start !== undefined) {
|
|
validateInteger(this.start, 'start', 0);
|
|
|
|
this.pos = this.start;
|
|
}
|
|
|
|
|
|
if (this.end === undefined) {
|
|
this.end = Infinity;
|
|
} else if (this.end !== Infinity) {
|
|
validateInteger(this.end, 'end', 0);
|
|
|
|
if (this.start !== undefined && this.start > this.end) {
|
|
throw new ERR_OUT_OF_RANGE(
|
|
'start',
|
|
`<= "end" (here: ${this.end})`,
|
|
this.start,
|
|
);
|
|
}
|
|
}
|
|
|
|
ReflectApply(Readable, this, [options]);
|
|
}
|
|
ObjectSetPrototypeOf(ReadStream.prototype, Readable.prototype);
|
|
ObjectSetPrototypeOf(ReadStream, Readable);
|
|
|
|
ObjectDefineProperty(ReadStream.prototype, 'autoClose', {
|
|
__proto__: null,
|
|
get() {
|
|
return this._readableState.autoDestroy;
|
|
},
|
|
set(val) {
|
|
this._readableState.autoDestroy = val;
|
|
},
|
|
});
|
|
|
|
ReadStream.prototype._construct = _construct;
|
|
|
|
ReadStream.prototype._read = function(n) {
|
|
n = this.pos !== undefined ?
|
|
MathMin(this.end - this.pos + 1, n) :
|
|
MathMin(this.end - this.bytesRead + 1, n);
|
|
|
|
if (n <= 0) {
|
|
this.push(null);
|
|
return;
|
|
}
|
|
|
|
const buf = Buffer.allocUnsafeSlow(n);
|
|
|
|
this[kIsPerformingIO] = true;
|
|
this[kFs]
|
|
.read(this.fd, buf, 0, n, this.pos, (er, bytesRead, buf) => {
|
|
this[kIsPerformingIO] = false;
|
|
|
|
// Tell ._destroy() that it's safe to close the fd now.
|
|
if (this.destroyed) {
|
|
this.emit(kIoDone, er);
|
|
return;
|
|
}
|
|
|
|
if (er) {
|
|
errorOrDestroy(this, er);
|
|
} else if (bytesRead > 0) {
|
|
if (this.pos !== undefined) {
|
|
this.pos += bytesRead;
|
|
}
|
|
|
|
this.bytesRead += bytesRead;
|
|
|
|
if (bytesRead !== buf.length) {
|
|
// Slow path. Shrink to fit.
|
|
// Copy instead of slice so that we don't retain
|
|
// large backing buffer for small reads.
|
|
const dst = Buffer.allocUnsafeSlow(bytesRead);
|
|
buf.copy(dst, 0, 0, bytesRead);
|
|
buf = dst;
|
|
}
|
|
|
|
this.push(buf);
|
|
} else {
|
|
this.push(null);
|
|
}
|
|
});
|
|
};
|
|
|
|
ReadStream.prototype._destroy = function(err, cb) {
|
|
// Usually for async IO it is safe to close a file descriptor
|
|
// even when there are pending operations. However, due to platform
|
|
// differences file IO is implemented using synchronous operations
|
|
// running in a thread pool. Therefore, file descriptors are not safe
|
|
// to close while used in a pending read or write operation. Wait for
|
|
// any pending IO (kIsPerformingIO) to complete (kIoDone).
|
|
if (this[kIsPerformingIO]) {
|
|
this.once(kIoDone, (er) => close(this, err || er, cb));
|
|
} else {
|
|
close(this, err, cb);
|
|
}
|
|
};
|
|
|
|
ReadStream.prototype.close = function(cb) {
|
|
if (typeof cb === 'function') finished(this, cb);
|
|
this.destroy();
|
|
};
|
|
|
|
ObjectDefineProperty(ReadStream.prototype, 'pending', {
|
|
__proto__: null,
|
|
get() { return this.fd === null; },
|
|
configurable: true,
|
|
});
|
|
|
|
function WriteStream(path, options) {
|
|
if (!(this instanceof WriteStream))
|
|
return new WriteStream(path, options);
|
|
|
|
options = copyObject(getOptions(options, kEmptyObject));
|
|
|
|
// Only buffers are supported.
|
|
options.decodeStrings = true;
|
|
|
|
if (options.fd == null) {
|
|
this.fd = null;
|
|
this[kFs] = options.fs || fs;
|
|
validateFunction(this[kFs].open, 'options.fs.open');
|
|
|
|
// Path will be ignored when fd is specified, so it can be falsy
|
|
this.path = toPathIfFileURL(path);
|
|
this.flags = options.flags === undefined ? 'w' : options.flags;
|
|
this.mode = options.mode === undefined ? 0o666 : options.mode;
|
|
|
|
validatePath(this.path);
|
|
} else {
|
|
this.fd = getValidatedFd(importFd(this, options));
|
|
}
|
|
|
|
options.autoDestroy = options.autoClose === undefined ?
|
|
true : options.autoClose;
|
|
|
|
if (!this[kFs].write && !this[kFs].writev) {
|
|
throw new ERR_INVALID_ARG_TYPE('options.fs.write', 'function',
|
|
this[kFs].write);
|
|
}
|
|
|
|
if (this[kFs].write) {
|
|
validateFunction(this[kFs].write, 'options.fs.write');
|
|
}
|
|
|
|
if (this[kFs].writev) {
|
|
validateFunction(this[kFs].writev, 'options.fs.writev');
|
|
}
|
|
|
|
if (options.autoDestroy) {
|
|
validateFunction(this[kFs].close, 'options.fs.close');
|
|
}
|
|
|
|
this.flush = options.flush;
|
|
if (this.flush == null) {
|
|
this.flush = false;
|
|
} else {
|
|
validateBoolean(this.flush, 'options.flush');
|
|
validateFunction(this[kFs].fsync, 'options.fs.fsync');
|
|
}
|
|
|
|
// It's enough to override either, in which case only one will be used.
|
|
if (!this[kFs].write) {
|
|
this._write = null;
|
|
}
|
|
if (!this[kFs].writev) {
|
|
this._writev = null;
|
|
}
|
|
|
|
this.start = options.start;
|
|
this.pos = undefined;
|
|
this.bytesWritten = 0;
|
|
this[kIsPerformingIO] = false;
|
|
|
|
if (this.start !== undefined) {
|
|
validateInteger(this.start, 'start', 0);
|
|
|
|
this.pos = this.start;
|
|
}
|
|
|
|
ReflectApply(Writable, this, [options]);
|
|
|
|
if (options.encoding)
|
|
this.setDefaultEncoding(options.encoding);
|
|
}
|
|
ObjectSetPrototypeOf(WriteStream.prototype, Writable.prototype);
|
|
ObjectSetPrototypeOf(WriteStream, Writable);
|
|
|
|
ObjectDefineProperty(WriteStream.prototype, 'autoClose', {
|
|
__proto__: null,
|
|
get() {
|
|
return this._writableState.autoDestroy;
|
|
},
|
|
set(val) {
|
|
this._writableState.autoDestroy = val;
|
|
},
|
|
});
|
|
|
|
WriteStream.prototype._construct = _construct;
|
|
|
|
function writeAll(data, size, pos, cb, retries = 0) {
|
|
this[kFs].write(this.fd, data, 0, size, pos, (er, bytesWritten, buffer) => {
|
|
// No data currently available and operation should be retried later.
|
|
if (er?.code === 'EAGAIN') {
|
|
er = null;
|
|
bytesWritten = 0;
|
|
}
|
|
|
|
if (this.destroyed || er) {
|
|
return cb(er || new ERR_STREAM_DESTROYED('write'));
|
|
}
|
|
|
|
this.bytesWritten += bytesWritten;
|
|
|
|
retries = bytesWritten ? 0 : retries + 1;
|
|
size -= bytesWritten;
|
|
pos += bytesWritten;
|
|
|
|
// Try writing non-zero number of bytes up to 5 times.
|
|
if (retries > 5) {
|
|
cb(new ERR_SYSTEM_ERROR('write failed'));
|
|
} else if (size) {
|
|
writeAll.call(this, buffer.slice(bytesWritten), size, pos, cb, retries);
|
|
} else {
|
|
cb();
|
|
}
|
|
});
|
|
}
|
|
|
|
function writevAll(chunks, size, pos, cb, retries = 0) {
|
|
this[kFs].writev(this.fd, chunks, this.pos, (er, bytesWritten, buffers) => {
|
|
// No data currently available and operation should be retried later.
|
|
if (er?.code === 'EAGAIN') {
|
|
er = null;
|
|
bytesWritten = 0;
|
|
}
|
|
|
|
if (this.destroyed || er) {
|
|
return cb(er || new ERR_STREAM_DESTROYED('writev'));
|
|
}
|
|
|
|
this.bytesWritten += bytesWritten;
|
|
|
|
retries = bytesWritten ? 0 : retries + 1;
|
|
size -= bytesWritten;
|
|
pos += bytesWritten;
|
|
|
|
// Try writing non-zero number of bytes up to 5 times.
|
|
if (retries > 5) {
|
|
cb(new ERR_SYSTEM_ERROR('writev failed'));
|
|
} else if (size) {
|
|
writevAll.call(this, [Buffer.concat(buffers).slice(bytesWritten)], size, pos, cb, retries);
|
|
} else {
|
|
cb();
|
|
}
|
|
});
|
|
}
|
|
|
|
WriteStream.prototype._write = function(data, encoding, cb) {
|
|
this[kIsPerformingIO] = true;
|
|
writeAll.call(this, data, data.length, this.pos, (er) => {
|
|
this[kIsPerformingIO] = false;
|
|
if (this.destroyed) {
|
|
// Tell ._destroy() that it's safe to close the fd now.
|
|
cb(er);
|
|
return this.emit(kIoDone, er);
|
|
}
|
|
|
|
cb(er);
|
|
});
|
|
|
|
if (this.pos !== undefined)
|
|
this.pos += data.length;
|
|
};
|
|
|
|
WriteStream.prototype._writev = function(data, cb) {
|
|
const len = data.length;
|
|
const chunks = new Array(len);
|
|
let size = 0;
|
|
|
|
for (let i = 0; i < len; i++) {
|
|
const chunk = data[i].chunk;
|
|
|
|
chunks[i] = chunk;
|
|
size += chunk.length;
|
|
}
|
|
|
|
this[kIsPerformingIO] = true;
|
|
writevAll.call(this, chunks, size, this.pos, (er) => {
|
|
this[kIsPerformingIO] = false;
|
|
if (this.destroyed) {
|
|
// Tell ._destroy() that it's safe to close the fd now.
|
|
cb(er);
|
|
return this.emit(kIoDone, er);
|
|
}
|
|
|
|
cb(er);
|
|
});
|
|
|
|
if (this.pos !== undefined)
|
|
this.pos += size;
|
|
};
|
|
|
|
WriteStream.prototype._destroy = function(err, cb) {
|
|
// Usually for async IO it is safe to close a file descriptor
|
|
// even when there are pending operations. However, due to platform
|
|
// differences file IO is implemented using synchronous operations
|
|
// running in a thread pool. Therefore, file descriptors are not safe
|
|
// to close while used in a pending read or write operation. Wait for
|
|
// any pending IO (kIsPerformingIO) to complete (kIoDone).
|
|
if (this[kIsPerformingIO]) {
|
|
this.once(kIoDone, (er) => close(this, err || er, cb));
|
|
} else {
|
|
close(this, err, cb);
|
|
}
|
|
};
|
|
|
|
WriteStream.prototype.close = function(cb) {
|
|
if (cb) {
|
|
if (this.closed) {
|
|
process.nextTick(cb);
|
|
return;
|
|
}
|
|
this.on('close', cb);
|
|
}
|
|
|
|
// If we are not autoClosing, we should call
|
|
// destroy on 'finish'.
|
|
if (!this.autoClose) {
|
|
this.on('finish', this.destroy);
|
|
}
|
|
|
|
// We use end() instead of destroy() because of
|
|
// https://github.com/nodejs/node/issues/2006
|
|
this.end();
|
|
};
|
|
|
|
// There is no shutdown() for files.
|
|
WriteStream.prototype.destroySoon = WriteStream.prototype.end;
|
|
|
|
ObjectDefineProperty(WriteStream.prototype, 'pending', {
|
|
__proto__: null,
|
|
get() { return this.fd === null; },
|
|
configurable: true,
|
|
});
|
|
|
|
module.exports = {
|
|
ReadStream,
|
|
WriteStream,
|
|
};
|