diagnostics_channel: fix unsubscribe during publish
PR-URL: https://github.com/nodejs/node/pull/55116 Reviewed-By: Stephen Belanger <admin@stephenbelanger.com> Reviewed-By: Claudio Wunder <cwunder@gnome.org>
This commit is contained in:
parent
488ce99d76
commit
80b56bbab0
@ -4,6 +4,8 @@ const {
|
|||||||
ArrayPrototypeAt,
|
ArrayPrototypeAt,
|
||||||
ArrayPrototypeIndexOf,
|
ArrayPrototypeIndexOf,
|
||||||
ArrayPrototypePush,
|
ArrayPrototypePush,
|
||||||
|
ArrayPrototypePushApply,
|
||||||
|
ArrayPrototypeSlice,
|
||||||
ArrayPrototypeSplice,
|
ArrayPrototypeSplice,
|
||||||
ObjectDefineProperty,
|
ObjectDefineProperty,
|
||||||
ObjectGetPrototypeOf,
|
ObjectGetPrototypeOf,
|
||||||
@ -97,6 +99,7 @@ function wrapStoreRun(store, data, next, transform = defaultTransform) {
|
|||||||
class ActiveChannel {
|
class ActiveChannel {
|
||||||
subscribe(subscription) {
|
subscribe(subscription) {
|
||||||
validateFunction(subscription, 'subscription');
|
validateFunction(subscription, 'subscription');
|
||||||
|
this._subscribers = ArrayPrototypeSlice(this._subscribers);
|
||||||
ArrayPrototypePush(this._subscribers, subscription);
|
ArrayPrototypePush(this._subscribers, subscription);
|
||||||
channels.incRef(this.name);
|
channels.incRef(this.name);
|
||||||
}
|
}
|
||||||
@ -105,7 +108,10 @@ class ActiveChannel {
|
|||||||
const index = ArrayPrototypeIndexOf(this._subscribers, subscription);
|
const index = ArrayPrototypeIndexOf(this._subscribers, subscription);
|
||||||
if (index === -1) return false;
|
if (index === -1) return false;
|
||||||
|
|
||||||
ArrayPrototypeSplice(this._subscribers, index, 1);
|
const before = ArrayPrototypeSlice(this._subscribers, 0, index);
|
||||||
|
const after = ArrayPrototypeSlice(this._subscribers, index + 1);
|
||||||
|
this._subscribers = before;
|
||||||
|
ArrayPrototypePushApply(this._subscribers, after);
|
||||||
|
|
||||||
channels.decRef(this.name);
|
channels.decRef(this.name);
|
||||||
maybeMarkInactive(this);
|
maybeMarkInactive(this);
|
||||||
@ -137,9 +143,10 @@ class ActiveChannel {
|
|||||||
}
|
}
|
||||||
|
|
||||||
publish(data) {
|
publish(data) {
|
||||||
for (let i = 0; i < (this._subscribers?.length || 0); i++) {
|
const subscribers = this._subscribers;
|
||||||
|
for (let i = 0; i < (subscribers?.length || 0); i++) {
|
||||||
try {
|
try {
|
||||||
const onMessage = this._subscribers[i];
|
const onMessage = subscribers[i];
|
||||||
onMessage(data, this.name);
|
onMessage(data, this.name);
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
process.nextTick(() => {
|
process.nextTick(() => {
|
||||||
|
@ -9,6 +9,7 @@ const published_data = 'some message';
|
|||||||
const onMessageHandler = common.mustCall(() => dc.unsubscribe(channel_name, onMessageHandler));
|
const onMessageHandler = common.mustCall(() => dc.unsubscribe(channel_name, onMessageHandler));
|
||||||
|
|
||||||
dc.subscribe(channel_name, onMessageHandler);
|
dc.subscribe(channel_name, onMessageHandler);
|
||||||
|
dc.subscribe(channel_name, common.mustCall());
|
||||||
|
|
||||||
// This must not throw.
|
// This must not throw.
|
||||||
dc.channel(channel_name).publish(published_data);
|
dc.channel(channel_name).publish(published_data);
|
||||||
|
Loading…
x
Reference in New Issue
Block a user