worker: improve MessagePort performance
Use a JS function as the single entry point for emitting `.onmessage()` calls, avoiding the overhead of manually constructing each message event object in C++. confidence improvement accuracy (*) (**) (***) worker/echo.js n=100000 sendsPerBroadcast=1 payload='object' workers=1 *** 16.34 % ±1.16% ±1.54% ±1.99% worker/echo.js n=100000 sendsPerBroadcast=1 payload='string' workers=1 *** 24.41 % ±1.50% ±1.99% ±2.58% worker/echo.js n=100000 sendsPerBroadcast=10 payload='object' workers=1 *** 26.66 % ±1.54% ±2.05% ±2.65% worker/echo.js n=100000 sendsPerBroadcast=10 payload='string' workers=1 *** 32.72 % ±1.60% ±2.11% ±2.73% worker/messageport.js n=1000000 payload='object' *** 40.28 % ±1.48% ±1.95% ±2.52% worker/messageport.js n=1000000 payload='string' *** 76.95 % ±2.19% ±2.90% ±3.75% Also fix handling exceptions returned from `MessagePort::New`. PR-URL: https://github.com/nodejs/node/pull/31605 Reviewed-By: James M Snell <jasnell@gmail.com> Reviewed-By: Denys Otrishko <shishugi@gmail.com> Reviewed-By: Rich Trott <rtrott@gmail.com>
This commit is contained in:
parent
e2a3a87060
commit
a555be2e45
12
lib/internal/per_context/messageport.js
Normal file
12
lib/internal/per_context/messageport.js
Normal file
@ -0,0 +1,12 @@
|
||||
'use strict';
|
||||
class MessageEvent {
|
||||
constructor(data, target) {
|
||||
this.data = data;
|
||||
this.target = target;
|
||||
}
|
||||
}
|
||||
|
||||
exports.emitMessage = function(data) {
|
||||
if (typeof this.onmessage === 'function')
|
||||
this.onmessage(new MessageEvent(data, this));
|
||||
};
|
1
node.gyp
1
node.gyp
@ -37,6 +37,7 @@
|
||||
'lib/internal/bootstrap/switches/is_not_main_thread.js',
|
||||
'lib/internal/per_context/primordials.js',
|
||||
'lib/internal/per_context/domexception.js',
|
||||
'lib/internal/per_context/messageport.js',
|
||||
'lib/async_hooks.js',
|
||||
'lib/assert.js',
|
||||
'lib/buffer.js',
|
||||
|
@ -482,6 +482,7 @@ bool InitializeContextForSnapshot(Local<Context> context) {
|
||||
|
||||
static const char* context_files[] = {"internal/per_context/primordials",
|
||||
"internal/per_context/domexception",
|
||||
"internal/per_context/messageport",
|
||||
nullptr};
|
||||
|
||||
for (const char** module = context_files; *module != nullptr; module++) {
|
||||
|
@ -403,7 +403,6 @@ constexpr size_t kFsStatsBufferLength =
|
||||
V(http2stream_constructor_template, v8::ObjectTemplate) \
|
||||
V(http2ping_constructor_template, v8::ObjectTemplate) \
|
||||
V(libuv_stream_wrap_ctor_template, v8::FunctionTemplate) \
|
||||
V(message_event_object_template, v8::ObjectTemplate) \
|
||||
V(message_port_constructor_template, v8::FunctionTemplate) \
|
||||
V(pipe_constructor_template, v8::FunctionTemplate) \
|
||||
V(promise_wrap_template, v8::ObjectTemplate) \
|
||||
|
@ -29,7 +29,6 @@ using v8::Maybe;
|
||||
using v8::MaybeLocal;
|
||||
using v8::Nothing;
|
||||
using v8::Object;
|
||||
using v8::ObjectTemplate;
|
||||
using v8::SharedArrayBuffer;
|
||||
using v8::String;
|
||||
using v8::Symbol;
|
||||
@ -170,6 +169,20 @@ uint32_t Message::AddWASMModule(CompiledWasmModule&& mod) {
|
||||
|
||||
namespace {
|
||||
|
||||
MaybeLocal<Function> GetEmitMessageFunction(Local<Context> context) {
|
||||
Isolate* isolate = context->GetIsolate();
|
||||
Local<Object> per_context_bindings;
|
||||
Local<Value> emit_message_val;
|
||||
if (!GetPerContextExports(context).ToLocal(&per_context_bindings) ||
|
||||
!per_context_bindings->Get(context,
|
||||
FIXED_ONE_BYTE_STRING(isolate, "emitMessage"))
|
||||
.ToLocal(&emit_message_val)) {
|
||||
return MaybeLocal<Function>();
|
||||
}
|
||||
CHECK(emit_message_val->IsFunction());
|
||||
return emit_message_val.As<Function>();
|
||||
}
|
||||
|
||||
MaybeLocal<Function> GetDOMException(Local<Context> context) {
|
||||
Isolate* isolate = context->GetIsolate();
|
||||
Local<Object> per_context_bindings;
|
||||
@ -471,10 +484,14 @@ MessagePort::MessagePort(Environment* env,
|
||||
MessagePort* channel = ContainerOf(&MessagePort::async_, handle);
|
||||
channel->OnMessage();
|
||||
};
|
||||
|
||||
CHECK_EQ(uv_async_init(env->event_loop(),
|
||||
&async_,
|
||||
onmessage), 0);
|
||||
async_.data = static_cast<void*>(this);
|
||||
async_.data = nullptr; // Reset later to indicate success of the constructor.
|
||||
auto cleanup = OnScopeLeave([&]() {
|
||||
if (async_.data == nullptr) Close();
|
||||
});
|
||||
|
||||
Local<Value> fn;
|
||||
if (!wrap->Get(context, env->oninit_symbol()).ToLocal(&fn))
|
||||
@ -482,9 +499,16 @@ MessagePort::MessagePort(Environment* env,
|
||||
|
||||
if (fn->IsFunction()) {
|
||||
Local<Function> init = fn.As<Function>();
|
||||
USE(init->Call(context, wrap, 0, nullptr));
|
||||
if (init->Call(context, wrap, 0, nullptr).IsEmpty())
|
||||
return;
|
||||
}
|
||||
|
||||
Local<Function> emit_message_fn;
|
||||
if (!GetEmitMessageFunction(context).ToLocal(&emit_message_fn))
|
||||
return;
|
||||
emit_message_fn_.Reset(env->isolate(), emit_message_fn);
|
||||
|
||||
async_.data = static_cast<void*>(this);
|
||||
Debug(this, "Created message port");
|
||||
}
|
||||
|
||||
@ -532,6 +556,11 @@ MessagePort* MessagePort::New(
|
||||
return nullptr;
|
||||
MessagePort* port = new MessagePort(env, context, instance);
|
||||
CHECK_NOT_NULL(port);
|
||||
if (port->IsHandleClosing()) {
|
||||
// Construction failed with an exception.
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
if (data) {
|
||||
port->Detach();
|
||||
port->data_ = std::move(data);
|
||||
@ -624,16 +653,8 @@ void MessagePort::OnMessage() {
|
||||
continue;
|
||||
}
|
||||
|
||||
Local<Object> event;
|
||||
Local<Value> cb_args[1];
|
||||
if (!env()->message_event_object_template()->NewInstance(context)
|
||||
.ToLocal(&event) ||
|
||||
event->Set(context, env()->data_string(), payload).IsNothing() ||
|
||||
event->Set(context, env()->target_string(), object()).IsNothing() ||
|
||||
(cb_args[0] = event, false) ||
|
||||
MakeCallback(env()->onmessage_string(),
|
||||
arraysize(cb_args),
|
||||
cb_args).IsEmpty()) {
|
||||
Local<Function> emit_message = PersistentToLocal::Strong(emit_message_fn_);
|
||||
if (MakeCallback(emit_message, 1, &payload).IsEmpty()) {
|
||||
// Re-schedule OnMessage() execution in case of failure.
|
||||
if (data_)
|
||||
TriggerAsync();
|
||||
@ -902,6 +923,7 @@ void MessagePort::Entangle(MessagePort* a, MessagePortData* b) {
|
||||
|
||||
void MessagePort::MemoryInfo(MemoryTracker* tracker) const {
|
||||
tracker->TrackField("data", data_);
|
||||
tracker->TrackField("emit_message_fn", emit_message_fn_);
|
||||
}
|
||||
|
||||
Local<FunctionTemplate> GetMessagePortConstructorTemplate(Environment* env) {
|
||||
@ -911,8 +933,6 @@ Local<FunctionTemplate> GetMessagePortConstructorTemplate(Environment* env) {
|
||||
if (!templ.IsEmpty())
|
||||
return templ;
|
||||
|
||||
Isolate* isolate = env->isolate();
|
||||
|
||||
{
|
||||
Local<FunctionTemplate> m = env->NewFunctionTemplate(MessagePort::New);
|
||||
m->SetClassName(env->message_port_constructor_string());
|
||||
@ -923,13 +943,6 @@ Local<FunctionTemplate> GetMessagePortConstructorTemplate(Environment* env) {
|
||||
env->SetProtoMethod(m, "start", MessagePort::Start);
|
||||
|
||||
env->set_message_port_constructor_template(m);
|
||||
|
||||
Local<FunctionTemplate> event_ctor = FunctionTemplate::New(isolate);
|
||||
event_ctor->SetClassName(FIXED_ONE_BYTE_STRING(isolate, "MessageEvent"));
|
||||
Local<ObjectTemplate> e = event_ctor->InstanceTemplate();
|
||||
e->Set(env->data_string(), Null(isolate));
|
||||
e->Set(env->target_string(), Null(isolate));
|
||||
env->set_message_event_object_template(e);
|
||||
}
|
||||
|
||||
return GetMessagePortConstructorTemplate(env);
|
||||
@ -948,7 +961,13 @@ static void MessageChannel(const FunctionCallbackInfo<Value>& args) {
|
||||
Context::Scope context_scope(context);
|
||||
|
||||
MessagePort* port1 = MessagePort::New(env, context);
|
||||
if (port1 == nullptr) return;
|
||||
MessagePort* port2 = MessagePort::New(env, context);
|
||||
if (port2 == nullptr) {
|
||||
port1->Close();
|
||||
return;
|
||||
}
|
||||
|
||||
MessagePort::Entangle(port1, port2);
|
||||
|
||||
args.This()->Set(context, env->port1_string(), port1->object())
|
||||
|
@ -131,12 +131,16 @@ class MessagePortData : public MemoryRetainer {
|
||||
// the uv_async_t handle that is used to notify the current event loop of
|
||||
// new incoming messages.
|
||||
class MessagePort : public HandleWrap {
|
||||
public:
|
||||
private:
|
||||
// Create a new MessagePort. The `context` argument specifies the Context
|
||||
// instance that is used for creating the values emitted from this port.
|
||||
// This is called by MessagePort::New(), which is the public API used for
|
||||
// creating MessagePort instances.
|
||||
MessagePort(Environment* env,
|
||||
v8::Local<v8::Context> context,
|
||||
v8::Local<v8::Object> wrap);
|
||||
|
||||
public:
|
||||
~MessagePort() override;
|
||||
|
||||
// Create a new message port instance, optionally over an existing
|
||||
@ -205,6 +209,7 @@ class MessagePort : public HandleWrap {
|
||||
std::unique_ptr<MessagePortData> data_ = nullptr;
|
||||
bool receiving_messages_ = false;
|
||||
uv_async_t async_;
|
||||
v8::Global<v8::Function> emit_message_fn_;
|
||||
|
||||
friend class MessagePortData;
|
||||
};
|
||||
|
Loading…
x
Reference in New Issue
Block a user