worker: properly handle env and NODE_OPTIONS in workers
PR-URL: https://github.com/nodejs/node/pull/31711
Fixes: https://github.com/nodejs/node/issues/30627
Reviewed-By: Anna Henningsen <anna@addaleax.net>
Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com>
commit d63bcdd9cd
parent 84b8857098
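For context, a quick sketch (not part of the commit) of the user-facing behavior this change targets: NODE_OPTIONS supplied through a Worker's env option is now parsed and applied to the worker thread itself, and an explicitly provided env whose NODE_OPTIONS cannot be accepted is rejected when the Worker is constructed. The option values below are illustrative.

'use strict';
const { Worker } = require('worker_threads');

// NODE_OPTIONS from the worker's own env now applies inside the worker thread
// (here, --no-warnings only affects the worker, not the parent).
new Worker('console.log("B")', {
  eval: true,
  env: { ...process.env, NODE_OPTIONS: '--no-warnings' },
});

// An explicitly provided env with a NODE_OPTIONS value the parser rejects
// (e.g. -e, which is assumed to be disallowed in NODE_OPTIONS) throws
// synchronously from the constructor.
try {
  new Worker('', {
    eval: true,
    env: { ...process.env, NODE_OPTIONS: '-e "42"' },
  });
} catch (err) {
  console.log(err.code);  // ERR_WORKER_INVALID_EXEC_ARGV
}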
@@ -1358,8 +1358,8 @@ E('ERR_VM_MODULE_NOT_MODULE',
   'Provided module is not an instance of Module', Error);
 E('ERR_VM_MODULE_STATUS', 'Module status %s', Error);
 E('ERR_WASI_ALREADY_STARTED', 'WASI instance has already started', Error);
-E('ERR_WORKER_INVALID_EXEC_ARGV', (errors) =>
-  `Initiated Worker with invalid execArgv flags: ${errors.join(', ')}`,
+E('ERR_WORKER_INVALID_EXEC_ARGV', (errors, msg = 'invalid execArgv flags') =>
+  `Initiated Worker with ${msg}: ${errors.join(', ')}`,
   Error);
 E('ERR_WORKER_NOT_RUNNING', 'Worker instance not running', Error);
 E('ERR_WORKER_OUT_OF_MEMORY', 'Worker terminated due to reaching memory limit',
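Illustration only (not part of the diff): format below is a hypothetical standalone mirror of the template that E() wires into the error class, showing the default message and the new custom msg parameter.

const format = (errors, msg = 'invalid execArgv flags') =>
  `Initiated Worker with ${msg}: ${errors.join(', ')}`;

console.log(format(['--foo']));
// Initiated Worker with invalid execArgv flags: --foo
console.log(format(['-e is not allowed in NODE_OPTIONS'],
                   'invalid NODE_OPTIONS env variable'));
// Initiated Worker with invalid NODE_OPTIONS env variable: -e is not allowed in NODE_OPTIONS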
@@ -125,17 +125,16 @@ class Worker extends EventEmitter {
     const url = options.eval ? null : pathToFileURL(filename);
     // Set up the C++ handle for the worker, as well as some internal wiring.
-    this[kHandle] = new WorkerImpl(url, options.execArgv,
+    this[kHandle] = new WorkerImpl(url,
+                                   env === process.env ? null : env,
+                                   options.execArgv,
                                    parseResourceLimits(options.resourceLimits));
     if (this[kHandle].invalidExecArgv) {
       throw new ERR_WORKER_INVALID_EXEC_ARGV(this[kHandle].invalidExecArgv);
     }
-    if (env === process.env) {
-      // This may be faster than manually cloning the object in C++, especially
-      // when recursively spawning Workers.
-      this[kHandle].cloneParentEnvVars();
-    } else if (env !== undefined) {
-      this[kHandle].setEnvVars(env);
+    if (this[kHandle].invalidNodeOptions) {
+      throw new ERR_WORKER_INVALID_EXEC_ARGV(
+        this[kHandle].invalidNodeOptions, 'invalid NODE_OPTIONS env variable');
     }
     this[kHandle].onexit = (code, customErr) => this[kOnExit](code, customErr);
     this[kPort] = this[kHandle].messagePort;
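The second WorkerImpl argument carries the user's env option. A sketch of the mapping (not part of the diff), assuming the option normalization earlier in worker.js (not shown in this hunk) sets env to process.env when the option is omitted and leaves it undefined for SHARE_ENV:

const { Worker, SHARE_ENV } = require('worker_threads');

// env omitted       -> null is passed       -> native side clones process.env
new Worker('process.exit(0)', { eval: true });
// env is an object  -> the object is passed -> native side builds a fresh store
new Worker('process.exit(0)', { eval: true, env: { FOO: 'bar' } });
// env: SHARE_ENV    -> undefined is passed  -> worker shares the parent's env
new Worker('process.exit(0)', { eval: true, env: SHARE_ENV });

The three native branches (null, object, shared) are visible in the Worker::New hunk further down.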
@@ -16,6 +16,7 @@
 #include <string>
 #include <vector>

+using node::kAllowedInEnvironment;
 using node::kDisallowedInEnvironment;
 using v8::Array;
 using v8::ArrayBuffer;
@@ -47,14 +48,15 @@ Worker::Worker(Environment* env,
                Local<Object> wrap,
                const std::string& url,
                std::shared_ptr<PerIsolateOptions> per_isolate_opts,
-               std::vector<std::string>&& exec_argv)
+               std::vector<std::string>&& exec_argv,
+               std::shared_ptr<KVStore> env_vars)
     : AsyncWrap(env, wrap, AsyncWrap::PROVIDER_WORKER),
       per_isolate_opts_(per_isolate_opts),
       exec_argv_(exec_argv),
       platform_(env->isolate_data()->platform()),
       start_profiler_idle_notifier_(env->profiler_idle_notifier_started()),
       thread_id_(Environment::AllocateThreadId()),
-      env_vars_(env->env_vars()) {
+      env_vars_(env_vars) {
   Debug(this, "Creating new worker instance with thread id %llu", thread_id_);

   // Set up everything that needs to be set up in the parent environment.
@@ -441,6 +443,7 @@ Worker::~Worker() {

 void Worker::New(const FunctionCallbackInfo<Value>& args) {
   Environment* env = Environment::GetCurrent(args);
+  Isolate* isolate = args.GetIsolate();

   CHECK(args.IsConstructCall());

@@ -451,24 +454,81 @@ void Worker::New(const FunctionCallbackInfo<Value>& args) {
   std::string url;
   std::shared_ptr<PerIsolateOptions> per_isolate_opts = nullptr;
+  std::shared_ptr<KVStore> env_vars = nullptr;

   std::vector<std::string> exec_argv_out;
-  bool has_explicit_exec_argv = false;

-  CHECK_EQ(args.Length(), 3);
+  CHECK_EQ(args.Length(), 4);
   // Argument might be a string or URL
   if (!args[0]->IsNullOrUndefined()) {
     Utf8Value value(
-        args.GetIsolate(),
-        args[0]->ToString(env->context()).FromMaybe(Local<String>()));
+        isolate, args[0]->ToString(env->context()).FromMaybe(Local<String>()));
     url.append(value.out(), value.length());
   }

-  if (args[1]->IsArray()) {
-    Local<Array> array = args[1].As<Array>();
+  if (args[1]->IsNull()) {
+    // Means worker.env = { ...process.env }.
+    env_vars = env->env_vars()->Clone(isolate);
+  } else if (args[1]->IsObject()) {
+    // User provided env.
+    env_vars = KVStore::CreateMapKVStore();
+    env_vars->AssignFromObject(isolate->GetCurrentContext(),
+                               args[1].As<Object>());
+  } else {
+    // Env is shared.
+    env_vars = env->env_vars();
+  }
+
+  if (args[1]->IsObject() || args[2]->IsArray()) {
+    per_isolate_opts.reset(new PerIsolateOptions());
+
+    HandleEnvOptions(
+        per_isolate_opts->per_env, [isolate, &env_vars](const char* name) {
+          MaybeLocal<String> value =
+              env_vars->Get(isolate, OneByteString(isolate, name));
+          return value.IsEmpty() ? std::string{}
+                                 : std::string(*String::Utf8Value(
+                                       isolate, value.ToLocalChecked()));
+        });
+
+#ifndef NODE_WITHOUT_NODE_OPTIONS
+    MaybeLocal<String> maybe_node_opts =
+        env_vars->Get(isolate, OneByteString(isolate, "NODE_OPTIONS"));
+    if (!maybe_node_opts.IsEmpty()) {
+      std::string node_options(
+          *String::Utf8Value(isolate, maybe_node_opts.ToLocalChecked()));
+      std::vector<std::string> errors{};
+      std::vector<std::string> env_argv =
+          ParseNodeOptionsEnvVar(node_options, &errors);
+      // [0] is expected to be the program name, add dummy string.
+      env_argv.insert(env_argv.begin(), "");
+      std::vector<std::string> invalid_args{};
+      options_parser::Parse(&env_argv,
+                            nullptr,
+                            &invalid_args,
+                            per_isolate_opts.get(),
+                            kAllowedInEnvironment,
+                            &errors);
+      if (errors.size() > 0 && args[1]->IsObject()) {
+        // Only fail for explicitly provided env, this protects from failures
+        // when NODE_OPTIONS from parent's env is used (which is the default).
+        Local<Value> error;
+        if (!ToV8Value(env->context(), errors).ToLocal(&error)) return;
+        Local<String> key =
+            FIXED_ONE_BYTE_STRING(env->isolate(), "invalidNodeOptions");
+        // Ignore the return value of Set() because exceptions bubble up to JS
+        // when we return anyway.
+        USE(args.This()->Set(env->context(), key, error));
+        return;
+      }
+    }
+#endif
+  }
+
+  if (args[2]->IsArray()) {
+    Local<Array> array = args[2].As<Array>();
     // The first argument is reserved for program name, but we don't need it
     // in workers.
-    has_explicit_exec_argv = true;
     std::vector<std::string> exec_argv = {""};
     uint32_t length = array->Length();
     for (uint32_t i = 0; i < length; i++) {
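The errors.size() > 0 && args[1]->IsObject() guard above makes a bad NODE_OPTIONS fatal only when the env was supplied explicitly. A sketch of the observable difference (not part of the commit; -e is assumed to be rejected because it is not allowed in NODE_OPTIONS, and the values are illustrative):

const { Worker } = require('worker_threads');

// Default (inherited) env: a NODE_OPTIONS value in the parent's environment is
// not re-parsed for the worker, so worker creation is not aborted by it.
process.env.NODE_OPTIONS = '-e "42"';           // hypothetical bad value
new Worker('process.exit(0)', { eval: true });  // still starts

// Explicitly provided env: the same value is rejected before the thread starts.
try {
  new Worker('process.exit(0)', {
    eval: true,
    env: { ...process.env, NODE_OPTIONS: '-e "42"' },
  });
} catch (err) {
  console.log(err.code);  // ERR_WORKER_INVALID_EXEC_ARGV
}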
@@ -490,8 +550,6 @@ void Worker::New(const FunctionCallbackInfo<Value>& args) {

     std::vector<std::string> invalid_args{};
     std::vector<std::string> errors{};
-    per_isolate_opts.reset(new PerIsolateOptions());
-
     // Using invalid_args as the v8_args argument as it stores unknown
     // options for the per isolate parser.
     options_parser::Parse(
@@ -518,40 +576,24 @@ void Worker::New(const FunctionCallbackInfo<Value>& args) {
       USE(args.This()->Set(env->context(), key, error));
       return;
     }
-  }
-  if (!has_explicit_exec_argv)
+  } else {
     exec_argv_out = env->exec_argv();
+  }

-  Worker* worker =
-      new Worker(env, args.This(), url, per_isolate_opts,
-                 std::move(exec_argv_out));
+  Worker* worker = new Worker(env,
+                              args.This(),
+                              url,
+                              per_isolate_opts,
+                              std::move(exec_argv_out),
+                              env_vars);

-  CHECK(args[2]->IsFloat64Array());
-  Local<Float64Array> limit_info = args[2].As<Float64Array>();
+  CHECK(args[3]->IsFloat64Array());
+  Local<Float64Array> limit_info = args[3].As<Float64Array>();
   CHECK_EQ(limit_info->Length(), kTotalResourceLimitCount);
   limit_info->CopyContents(worker->resource_limits_,
                            sizeof(worker->resource_limits_));
 }

-void Worker::CloneParentEnvVars(const FunctionCallbackInfo<Value>& args) {
-  Worker* w;
-  ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
-  CHECK(w->thread_joined_);  // The Worker has not started yet.
-
-  w->env_vars_ = w->env()->env_vars()->Clone(args.GetIsolate());
-}
-
-void Worker::SetEnvVars(const FunctionCallbackInfo<Value>& args) {
-  Worker* w;
-  ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
-  CHECK(w->thread_joined_);  // The Worker has not started yet.
-
-  CHECK(args[0]->IsObject());
-  w->env_vars_ = KVStore::CreateMapKVStore();
-  w->env_vars_->AssignFromObject(args.GetIsolate()->GetCurrentContext(),
-                                 args[0].As<Object>());
-}
-
 void Worker::StartThread(const FunctionCallbackInfo<Value>& args) {
   Worker* w;
   ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
@@ -723,8 +765,6 @@ void InitWorker(Local<Object> target,
     w->InstanceTemplate()->SetInternalFieldCount(1);
     w->Inherit(AsyncWrap::GetConstructorTemplate(env));

-    env->SetProtoMethod(w, "setEnvVars", Worker::SetEnvVars);
-    env->SetProtoMethod(w, "cloneParentEnvVars", Worker::CloneParentEnvVars);
     env->SetProtoMethod(w, "startThread", Worker::StartThread);
     env->SetProtoMethod(w, "stopThread", Worker::StopThread);
     env->SetProtoMethod(w, "ref", Worker::Ref);
@@ -26,7 +26,8 @@ class Worker : public AsyncWrap {
          v8::Local<v8::Object> wrap,
          const std::string& url,
          std::shared_ptr<PerIsolateOptions> per_isolate_opts,
-         std::vector<std::string>&& exec_argv);
+         std::vector<std::string>&& exec_argv,
+         std::shared_ptr<KVStore> env_vars);
   ~Worker() override;

   // Run the worker. This is only called from the worker thread.
@@ -7,6 +7,7 @@ if (process.config.variables.node_without_node_options)

 const assert = require('assert');
 const exec = require('child_process').execFile;
+const { Worker } = require('worker_threads');

 const tmpdir = require('../common/tmpdir');
 tmpdir.refresh();
@@ -14,13 +15,13 @@ tmpdir.refresh();
 const printA = require.resolve('../fixtures/printA.js');
 const printSpaceA = require.resolve('../fixtures/print A.js');

-expect(` -r ${printA} `, 'A\nB\n');
-expect(`-r ${printA}`, 'A\nB\n');
-expect(`-r ${JSON.stringify(printA)}`, 'A\nB\n');
-expect(`-r ${JSON.stringify(printSpaceA)}`, 'A\nB\n');
-expect(`-r ${printA} -r ${printA}`, 'A\nB\n');
-expect(` -r ${printA} -r ${printA}`, 'A\nB\n');
-expect(` --require ${printA} --require ${printA}`, 'A\nB\n');
+expectNoWorker(` -r ${printA} `, 'A\nB\n');
+expectNoWorker(`-r ${printA}`, 'A\nB\n');
+expectNoWorker(`-r ${JSON.stringify(printA)}`, 'A\nB\n');
+expectNoWorker(`-r ${JSON.stringify(printSpaceA)}`, 'A\nB\n');
+expectNoWorker(`-r ${printA} -r ${printA}`, 'A\nB\n');
+expectNoWorker(` -r ${printA} -r ${printA}`, 'A\nB\n');
+expectNoWorker(` --require ${printA} --require ${printA}`, 'A\nB\n');
 expect('--no-deprecation', 'B\n');
 expect('--no-warnings', 'B\n');
 expect('--no_warnings', 'B\n');
@@ -28,16 +29,22 @@ expect('--trace-warnings', 'B\n');
 expect('--redirect-warnings=_', 'B\n');
 expect('--trace-deprecation', 'B\n');
 expect('--trace-sync-io', 'B\n');
-expect('--trace-events-enabled', 'B\n');
+expectNoWorker('--trace-events-enabled', 'B\n');
 expect('--track-heap-objects', 'B\n');
-expect('--throw-deprecation', 'B\n');
-expect('--zero-fill-buffers', 'B\n');
-expect('--v8-pool-size=10', 'B\n');
-expect('--trace-event-categories node', 'B\n');
+expect('--throw-deprecation',
+       /.*DeprecationWarning: Buffer\(\) is deprecated due to security and usability issues.*/,
+       'new Buffer(42)',
+       true);
+expectNoWorker('--zero-fill-buffers', 'B\n');
+expectNoWorker('--v8-pool-size=10', 'B\n');
+expectNoWorker('--trace-event-categories node', 'B\n');
+expectNoWorker(
+  // eslint-disable-next-line no-template-curly-in-string
+  '--trace-event-file-pattern {pid}-${rotation}.trace_events',
+  'B\n'
+);
-// eslint-disable-next-line no-template-curly-in-string
-expect('--trace-event-file-pattern {pid}-${rotation}.trace_events', 'B\n');
 // eslint-disable-next-line no-template-curly-in-string
-expect('--trace-event-file-pattern {pid}-${rotation}.trace_events ' +
+expectNoWorker('--trace-event-file-pattern {pid}-${rotation}.trace_events ' +
                '--trace-event-categories node.async_hooks', 'B\n');
 expect('--unhandled-rejections=none', 'B\n');
@@ -53,9 +60,9 @@ if (common.isLinux && ['arm', 'x64'].includes(process.arch)) {
 }

 if (common.hasCrypto) {
-  expect('--use-openssl-ca', 'B\n');
-  expect('--use-bundled-ca', 'B\n');
-  expect('--openssl-config=_ossl_cfg', 'B\n');
+  expectNoWorker('--use-openssl-ca', 'B\n');
+  expectNoWorker('--use-bundled-ca', 'B\n');
+  expectNoWorker('--openssl-config=_ossl_cfg', 'B\n');
 }

 // V8 options
@@ -63,14 +70,20 @@ expect('--abort_on-uncaught_exception', 'B\n');
 expect('--disallow-code-generation-from-strings', 'B\n');
 expect('--max-old-space-size=0', 'B\n');
 expect('--stack-trace-limit=100',
-       /(\s*at f \(\[eval\]:1:\d*\)\r?\n){100}/,
+       /(\s*at f \(\[(eval|worker eval)\]:1:\d*\)\r?\n)/,
        '(function f() { f(); })();',
        true);
 // Unsupported on arm. See https://crbug.com/v8/8713.
 if (!['arm', 'arm64'].includes(process.arch))
   expect('--interpreted-frames-native-stack', 'B\n');

-function expect(opt, want, command = 'console.log("B")', wantsError = false) {
+function expectNoWorker(opt, want, command, wantsError) {
+  expect(opt, want, command, wantsError, false);
+}
+
+function expect(
+  opt, want, command = 'console.log("B")', wantsError = false, testWorker = true
+) {
   const argv = ['-e', command];
   const opts = {
     cwd: tmpdir.path,
@@ -79,15 +92,52 @@ function expect(opt, want, command = 'console.log("B")', wantsError = false) {
   };
   if (typeof want === 'string')
     want = new RegExp(want);
-  exec(process.execPath, argv, opts, common.mustCall((err, stdout, stderr) => {
+
+  const test = (type) => common.mustCall((err, stdout) => {
+    const o = JSON.stringify(opt);
     if (wantsError) {
-      stdout = stderr;
+      assert.ok(err, `${type}: expected error for ${o}`);
+      stdout = err.stack;
     } else {
-      assert.ifError(err);
+      assert.ifError(err, `${type}: failed for ${o}`);
     }
     if (want.test(stdout)) return;

-    const o = JSON.stringify(opt);
-    assert.fail(`For ${o}, failed to find ${want} in: <\n${stdout}\n>`);
+    assert.fail(
+      `${type}: for ${o}, failed to find ${want} in: <\n${stdout}\n>`
+    );
+  });
+
+  exec(process.execPath, argv, opts, test('child process'));
+  if (testWorker)
+    workerTest(opts, command, wantsError, test('worker'));
+}
+
+async function collectStream(readable) {
+  readable.setEncoding('utf8');
+  let data = '';
+  for await (const chunk of readable) {
+    data += chunk;
+  }
+  return data;
+}
+
+function workerTest(opts, command, wantsError, test) {
+  let workerError = null;
+  const worker = new Worker(command, {
+    ...opts,
+    execArgv: [],
+    eval: true,
+    stdout: true,
+    stderr: true
+  });
+  worker.on('error', (err) => {
+    workerError = err;
+  });
+  worker.on('exit', common.mustCall((code) => {
+    assert.strictEqual(code, wantsError ? 1 : 0);
+    collectStream(worker.stdout).then((stdout) => {
+      test(workerError, stdout);
+    });
   }));
 }
@@ -12,6 +12,7 @@ const { spawn } = require('child_process');
 const fs = require('fs');
 const path = require('path');
 const process = require('process');
+const { Worker } = require('worker_threads');

 // Setup: Copy fixtures to tmp directory.
@@ -53,7 +54,7 @@ const linkDir = path.join(dirName,

 const linkTarget = path.join('..', '..', 'dep2');

-const linkScript = 'linkscript.js';
+const linkScript = './linkscript.js';

 const linkScriptTarget = path.join(dirName, 'symlinked.js');

@@ -84,4 +85,31 @@ function test() {
     assert.strictEqual(code, 0);
     assert(!signal);
   });
+
+  // Also verify that symlinks works for setting preserve via env variables in
+  // Workers.
+  const worker = new Worker(linkScript, {
+    env: { ...process.env, NODE_PRESERVE_SYMLINKS: '1' }
+  });
+  worker.on('error', (err) => {
+    console.log('Worker failed');
+    throw err;
+  });
+  worker.on('exit', common.mustCall((code) => {
+    assert.strictEqual(code, 0);
+  }));
+
+  // Also verify that symlinks works for setting preserve via env variables in
+  // Workers with explicit execArgv.
+  const workerArgv = new Worker(linkScript, {
+    execArgv: [],
+    env: { ...process.env, NODE_PRESERVE_SYMLINKS: '1' }
+  });
+  workerArgv.on('error', (err) => {
+    console.log('Worker with execArgv failed');
+    throw err;
+  });
+  workerArgv.on('exit', common.mustCall((code) => {
+    assert.strictEqual(code, 0);
+  }));
 }