worker: add ability to unshift message from MessagePort

In combination with Atomics, this makes it possible to implement
generic synchronous functionality, e.g. `importScript()`, in Workers
purely by communicating with other threads.

This is a continuation of https://github.com/nodejs/node/pull/26686,
where a preference for a solution was voiced that allowed reading
individual messages, rather than emitting all messages through events.

PR-URL: https://github.com/nodejs/node/pull/27294
Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com>
Reviewed-By: James M Snell <jasnell@gmail.com>
Reviewed-By: Gus Caplan <me@gus.host>
Reviewed-By: Ruben Bridgewater <ruben@bridgewater.de>
This commit is contained in:
Anna Henningsen 2019-04-18 00:56:57 +02:00
parent 13d2df530b
commit 76f2168393
No known key found for this signature in database
GPG Key ID: 9C63F3A6CD2AD8F9
7 changed files with 139 additions and 49 deletions

View File

@ -125,6 +125,34 @@ if (isMainThread) {
}
```
## worker.receiveMessageOnPort(port)
<!-- YAML
added: REPLACEME
-->
* `port` {MessagePort}
* Returns: {Object|undefined}
Receive a single message from a given `MessagePort`. If no message is available,
`undefined` is returned, otherwise an object with a single `message` property
that contains the message payload, corresponding to the oldest message in the
`MessagePort`s queue.
```js
const { MessageChannel, receiveMessageOnPort } = require('worker_threads');
const { port1, port2 } = new MessageChannel();
port1.postMessage({ hello: 'world' });
console.log(receiveMessageOnPort(port2));
// Prints: { message: { hello: 'world' } }
console.log(receiveMessageOnPort(port2));
// Prints: undefined
```
When this function is used, no `'message'` event will be emitted and the
`onmessage` listener will not be invoked.
## worker.SHARE_ENV
<!-- YAML
added: v11.14.0

View File

@ -4,13 +4,15 @@ const { Object } = primordials;
const {
handle_onclose: handleOnCloseSymbol,
oninit: onInitSymbol
oninit: onInitSymbol,
no_message_symbol: noMessageSymbol
} = internalBinding('symbols');
const {
MessagePort,
MessageChannel,
drainMessagePort,
moveMessagePortToContext,
receiveMessageOnPort: receiveMessageOnPort_,
stopMessagePort
} = internalBinding('messaging');
const {
@ -235,6 +237,12 @@ function createWorkerStdio() {
};
}
function receiveMessageOnPort(port) {
const message = receiveMessageOnPort_(port);
if (message === noMessageSymbol) return undefined;
return { message };
}
module.exports = {
drainMessagePort,
messageTypes,
@ -245,6 +253,7 @@ module.exports = {
moveMessagePortToContext,
MessagePort,
MessageChannel,
receiveMessageOnPort,
setupPortReferencing,
ReadableWorkerStdio,
WritableWorkerStdio,

View File

@ -11,6 +11,7 @@ const {
MessagePort,
MessageChannel,
moveMessagePortToContext,
receiveMessageOnPort
} = require('internal/worker/io');
module.exports = {
@ -18,6 +19,7 @@ module.exports = {
MessagePort,
MessageChannel,
moveMessagePortToContext,
receiveMessageOnPort,
threadId,
SHARE_ENV,
Worker,

View File

@ -130,6 +130,7 @@ constexpr size_t kFsStatsBufferLength = kFsStatsFieldsNumber * 2;
// for the sake of convenience.
#define PER_ISOLATE_SYMBOL_PROPERTIES(V) \
V(handle_onclose_symbol, "handle_onclose") \
V(no_message_symbol, "no_message_symbol") \
V(oninit_symbol, "oninit") \
V(owner_symbol, "owner") \

View File

@ -569,6 +569,40 @@ MessagePort* MessagePort::New(
return port;
}
MaybeLocal<Value> MessagePort::ReceiveMessage(Local<Context> context,
bool only_if_receiving) {
Message received;
{
// Get the head of the message queue.
Mutex::ScopedLock lock(data_->mutex_);
Debug(this, "MessagePort has message");
bool wants_message = receiving_messages_ || !only_if_receiving;
// We have nothing to do if:
// - There are no pending messages
// - We are not intending to receive messages, and the message we would
// receive is not the final "close" message.
if (data_->incoming_messages_.empty() ||
(!wants_message &&
!data_->incoming_messages_.front().IsCloseMessage())) {
return env()->no_message_symbol();
}
received = std::move(data_->incoming_messages_.front());
data_->incoming_messages_.pop_front();
}
if (received.IsCloseMessage()) {
Close();
return env()->no_message_symbol();
}
if (!env()->can_call_into_js()) return MaybeLocal<Value>();
return received.Deserialize(env(), context);
}
void MessagePort::OnMessage() {
Debug(this, "Running MessagePort::OnMessage()");
HandleScope handle_scope(env()->isolate());
@ -579,32 +613,12 @@ void MessagePort::OnMessage() {
// messages, so we need to check that this handle still owns its `data_` field
// on every iteration.
while (data_) {
Message received;
{
// Get the head of the message queue.
Mutex::ScopedLock lock(data_->mutex_);
HandleScope handle_scope(env()->isolate());
Context::Scope context_scope(context);
Debug(this, "MessagePort has message, receiving = %d",
static_cast<int>(receiving_messages_));
// We have nothing to do if:
// - There are no pending messages
// - We are not intending to receive messages, and the message we would
// receive is not the final "close" message.
if (data_->incoming_messages_.empty() ||
(!receiving_messages_ &&
!data_->incoming_messages_.front().IsCloseMessage())) {
break;
}
received = std::move(data_->incoming_messages_.front());
data_->incoming_messages_.pop_front();
}
if (received.IsCloseMessage()) {
Close();
return;
}
Local<Value> payload;
if (!ReceiveMessage(context, true).ToLocal(&payload)) break;
if (payload == env()->no_message_symbol()) break;
if (!env()->can_call_into_js()) {
Debug(this, "MessagePort drains queue because !can_call_into_js()");
@ -612,28 +626,20 @@ void MessagePort::OnMessage() {
continue;
}
{
// Call the JS .onmessage() callback.
HandleScope handle_scope(env()->isolate());
Context::Scope context_scope(context);
Local<Object> event;
Local<Value> payload;
Local<Value> cb_args[1];
if (!received.Deserialize(env(), context).ToLocal(&payload) ||
!env()->message_event_object_template()->NewInstance(context)
.ToLocal(&event) ||
event->Set(context, env()->data_string(), payload).IsNothing() ||
event->Set(context, env()->target_string(), object()).IsNothing() ||
(cb_args[0] = event, false) ||
MakeCallback(env()->onmessage_string(),
arraysize(cb_args),
cb_args).IsEmpty()) {
// Re-schedule OnMessage() execution in case of failure.
if (data_)
TriggerAsync();
return;
}
Local<Object> event;
Local<Value> cb_args[1];
if (!env()->message_event_object_template()->NewInstance(context)
.ToLocal(&event) ||
event->Set(context, env()->data_string(), payload).IsNothing() ||
event->Set(context, env()->target_string(), object()).IsNothing() ||
(cb_args[0] = event, false) ||
MakeCallback(env()->onmessage_string(),
arraysize(cb_args),
cb_args).IsEmpty()) {
// Re-schedule OnMessage() execution in case of failure.
if (data_)
TriggerAsync();
return;
}
}
}
@ -754,11 +760,26 @@ void MessagePort::Stop(const FunctionCallbackInfo<Value>& args) {
void MessagePort::Drain(const FunctionCallbackInfo<Value>& args) {
MessagePort* port;
CHECK(args[0]->IsObject());
ASSIGN_OR_RETURN_UNWRAP(&port, args[0].As<Object>());
port->OnMessage();
}
void MessagePort::ReceiveMessage(const FunctionCallbackInfo<Value>& args) {
CHECK(args[0]->IsObject());
MessagePort* port = Unwrap<MessagePort>(args[0].As<Object>());
if (port == nullptr) {
// Return 'no messages' for a closed port.
args.GetReturnValue().Set(
Environment::GetCurrent(args)->no_message_symbol());
return;
}
MaybeLocal<Value> payload =
port->ReceiveMessage(port->object()->CreationContext(), false);
if (!payload.IsEmpty())
args.GetReturnValue().Set(payload.ToLocalChecked());
}
void MessagePort::MoveToContext(const FunctionCallbackInfo<Value>& args) {
Environment* env = Environment::GetCurrent(args);
if (!args[0]->IsObject() ||
@ -875,6 +896,7 @@ static void InitMessaging(Local<Object> target,
// the browser equivalents do not provide them.
env->SetMethod(target, "stopMessagePort", MessagePort::Stop);
env->SetMethod(target, "drainMessagePort", MessagePort::Drain);
env->SetMethod(target, "receiveMessageOnPort", MessagePort::ReceiveMessage);
env->SetMethod(target, "moveMessagePortToContext",
MessagePort::MoveToContext);
}

View File

@ -163,6 +163,7 @@ class MessagePort : public HandleWrap {
static void Start(const v8::FunctionCallbackInfo<v8::Value>& args);
static void Stop(const v8::FunctionCallbackInfo<v8::Value>& args);
static void Drain(const v8::FunctionCallbackInfo<v8::Value>& args);
static void ReceiveMessage(const v8::FunctionCallbackInfo<v8::Value>& args);
/* static */
static void MoveToContext(const v8::FunctionCallbackInfo<v8::Value>& args);
@ -200,6 +201,8 @@ class MessagePort : public HandleWrap {
void OnClose() override;
void OnMessage();
void TriggerAsync();
v8::MaybeLocal<v8::Value> ReceiveMessage(v8::Local<v8::Context> context,
bool only_if_receiving);
std::unique_ptr<MessagePortData> data_ = nullptr;
bool receiving_messages_ = false;

View File

@ -0,0 +1,25 @@
'use strict';
const common = require('../common');
const assert = require('assert');
const { MessageChannel, receiveMessageOnPort } = require('worker_threads');
const { port1, port2 } = new MessageChannel();
const message1 = { hello: 'world' };
const message2 = { foo: 'bar' };
// Make sure receiveMessageOnPort() works in a FIFO way, the same way it does
// when were using events.
assert.deepStrictEqual(receiveMessageOnPort(port2), undefined);
port1.postMessage(message1);
port1.postMessage(message2);
assert.deepStrictEqual(receiveMessageOnPort(port2), { message: message1 });
assert.deepStrictEqual(receiveMessageOnPort(port2), { message: message2 });
assert.deepStrictEqual(receiveMessageOnPort(port2), undefined);
assert.deepStrictEqual(receiveMessageOnPort(port2), undefined);
// Make sure message handlers arent called.
port2.on('message', common.mustNotCall());
port1.postMessage(message1);
assert.deepStrictEqual(receiveMessageOnPort(port2), { message: message1 });
port1.close();