lib: improved diagnostics_channel subscribe/unsubscribe
Adds a new top-level subscribe/unsubscribe which will ref/unref the channel WeakReference to prevent subscriptions from getting garbage collected. PR-URL: https://github.com/nodejs/node/pull/42714 Reviewed-By: James M Snell <jasnell@gmail.com> Reviewed-By: Chengzhong Wu <legendecas@gmail.com> Reviewed-By: Gerhard Stöbich <deb2001-github@yahoo.de> Reviewed-By: Rafael Gonzaga <rafael.nunu@hotmail.com> Reviewed-By: Vladimir de Turckheim <vlad2t@hotmail.com>
This commit is contained in:
parent
f561f31f1c
commit
e9b6d321f3
@ -43,10 +43,12 @@ import diagnostics_channel from 'node:diagnostics_channel';
|
||||
// Get a reusable channel object
|
||||
const channel = diagnostics_channel.channel('my-channel');
|
||||
|
||||
// Subscribe to the channel
|
||||
channel.subscribe((message, name) => {
|
||||
function onMessage(message, name) {
|
||||
// Received data
|
||||
});
|
||||
}
|
||||
|
||||
// Subscribe to the channel
|
||||
diagnostics_channel.subscribe('my-channel', onMessage);
|
||||
|
||||
// Check if the channel has an active subscriber
|
||||
if (channel.hasSubscribers) {
|
||||
@ -55,6 +57,9 @@ if (channel.hasSubscribers) {
|
||||
some: 'data'
|
||||
});
|
||||
}
|
||||
|
||||
// Unsubscribe from the channel
|
||||
diagnostics_channel.unsubscribe('my-channel', onMessage);
|
||||
```
|
||||
|
||||
```cjs
|
||||
@ -63,10 +68,12 @@ const diagnostics_channel = require('node:diagnostics_channel');
|
||||
// Get a reusable channel object
|
||||
const channel = diagnostics_channel.channel('my-channel');
|
||||
|
||||
// Subscribe to the channel
|
||||
channel.subscribe((message, name) => {
|
||||
function onMessage(message, name) {
|
||||
// Received data
|
||||
});
|
||||
}
|
||||
|
||||
// Subscribe to the channel
|
||||
diagnostics_channel.subscribe('my-channel', onMessage);
|
||||
|
||||
// Check if the channel has an active subscriber
|
||||
if (channel.hasSubscribers) {
|
||||
@ -75,6 +82,9 @@ if (channel.hasSubscribers) {
|
||||
some: 'data'
|
||||
});
|
||||
}
|
||||
|
||||
// Unsubscribe from the channel
|
||||
diagnostics_channel.unsubscribe('my-channel', onMessage);
|
||||
```
|
||||
|
||||
#### `diagnostics_channel.hasSubscribers(name)`
|
||||
@ -121,7 +131,7 @@ added:
|
||||
* `name` {string|symbol} The channel name
|
||||
* Returns: {Channel} The named channel object
|
||||
|
||||
This is the primary entry-point for anyone wanting to interact with a named
|
||||
This is the primary entry-point for anyone wanting to publish to a named
|
||||
channel. It produces a channel object which is optimized to reduce overhead at
|
||||
publish time as much as possible.
|
||||
|
||||
@ -137,6 +147,76 @@ const diagnostics_channel = require('node:diagnostics_channel');
|
||||
const channel = diagnostics_channel.channel('my-channel');
|
||||
```
|
||||
|
||||
#### `diagnostics_channel.subscribe(name, onMessage)`
|
||||
|
||||
<!-- YAML
|
||||
added:
|
||||
- REPLACEME
|
||||
-->
|
||||
|
||||
* `name` {string|symbol} The channel name
|
||||
* `onMessage` {Function} The handler to receive channel messages
|
||||
* `message` {any} The message data
|
||||
* `name` {string|symbol} The name of the channel
|
||||
|
||||
Register a message handler to subscribe to this channel. This message handler
|
||||
will be run synchronously whenever a message is published to the channel. Any
|
||||
errors thrown in the message handler will trigger an [`'uncaughtException'`][].
|
||||
|
||||
```mjs
|
||||
import diagnostics_channel from 'diagnostics_channel';
|
||||
|
||||
diagnostics_channel.subscribe('my-channel', (message, name) => {
|
||||
// Received data
|
||||
});
|
||||
```
|
||||
|
||||
```cjs
|
||||
const diagnostics_channel = require('diagnostics_channel');
|
||||
|
||||
diagnostics_channel.subscribe('my-channel', (message, name) => {
|
||||
// Received data
|
||||
});
|
||||
```
|
||||
|
||||
#### `diagnostics_channel.unsubscribe(name, onMessage)`
|
||||
|
||||
<!-- YAML
|
||||
added:
|
||||
- REPLACEME
|
||||
-->
|
||||
|
||||
* `name` {string|symbol} The channel name
|
||||
* `onMessage` {Function} The previous subscribed handler to remove
|
||||
* Returns: {boolean} `true` if the handler was found, `false` otherwise.
|
||||
|
||||
Remove a message handler previously registered to this channel with
|
||||
[`diagnostics_channel.subscribe(name, onMessage)`][].
|
||||
|
||||
```mjs
|
||||
import diagnostics_channel from 'diagnostics_channel';
|
||||
|
||||
function onMessage(message, name) {
|
||||
// Received data
|
||||
}
|
||||
|
||||
diagnostics_channel.subscribe('my-channel', onMessage);
|
||||
|
||||
diagnostics_channel.unsubscribe('my-channel', onMessage);
|
||||
```
|
||||
|
||||
```cjs
|
||||
const diagnostics_channel = require('diagnostics_channel');
|
||||
|
||||
function onMessage(message, name) {
|
||||
// Received data
|
||||
}
|
||||
|
||||
diagnostics_channel.subscribe('my-channel', onMessage);
|
||||
|
||||
diagnostics_channel.unsubscribe('my-channel', onMessage);
|
||||
```
|
||||
|
||||
### Class: `Channel`
|
||||
|
||||
<!-- YAML
|
||||
@ -344,4 +424,5 @@ Emitted when server sends a response.
|
||||
|
||||
[`'uncaughtException'`]: process.md#event-uncaughtexception
|
||||
[`channel.subscribe(onMessage)`]: #channelsubscribeonmessage
|
||||
[`diagnostics_channel.subscribe(name, onMessage)`]: #diagnostics_channelunsubscribename_onmessage
|
||||
[`diagnostics_channel.channel(name)`]: #diagnostics_channelchannelname
|
||||
|
@ -109,6 +109,22 @@ function channel(name) {
|
||||
return channel;
|
||||
}
|
||||
|
||||
function subscribe(name, subscription) {
|
||||
const chan = channel(name);
|
||||
channels[name].incRef();
|
||||
chan.subscribe(subscription);
|
||||
}
|
||||
|
||||
function unsubscribe(name, subscription) {
|
||||
const chan = channel(name);
|
||||
if (!chan.unsubscribe(subscription)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
channels[name].decRef();
|
||||
return true;
|
||||
}
|
||||
|
||||
function hasSubscribers(name) {
|
||||
let channel;
|
||||
const ref = channels[name];
|
||||
@ -123,5 +139,7 @@ function hasSubscribers(name) {
|
||||
module.exports = {
|
||||
channel,
|
||||
hasSubscribers,
|
||||
subscribe,
|
||||
unsubscribe,
|
||||
Channel
|
||||
};
|
||||
|
44
test/parallel/test-diagnostics-channel-pub-sub.js
Normal file
44
test/parallel/test-diagnostics-channel-pub-sub.js
Normal file
@ -0,0 +1,44 @@
|
||||
'use strict';
|
||||
|
||||
const common = require('../common');
|
||||
const dc = require('diagnostics_channel');
|
||||
const assert = require('assert');
|
||||
const { Channel } = dc;
|
||||
|
||||
const name = 'test';
|
||||
const input = {
|
||||
foo: 'bar'
|
||||
};
|
||||
|
||||
// Individual channel objects can be created to avoid future lookups
|
||||
const channel = dc.channel(name);
|
||||
assert.ok(channel instanceof Channel);
|
||||
|
||||
// No subscribers yet, should not publish
|
||||
assert.ok(!channel.hasSubscribers);
|
||||
|
||||
const subscriber = common.mustCall((message, name) => {
|
||||
assert.strictEqual(name, channel.name);
|
||||
assert.deepStrictEqual(message, input);
|
||||
});
|
||||
|
||||
// Now there's a subscriber, should publish
|
||||
dc.subscribe(name, subscriber);
|
||||
assert.ok(channel.hasSubscribers);
|
||||
|
||||
// The ActiveChannel prototype swap should not fail instanceof
|
||||
assert.ok(channel instanceof Channel);
|
||||
|
||||
// Should trigger the subscriber once
|
||||
channel.publish(input);
|
||||
|
||||
// Should not publish after subscriber is unsubscribed
|
||||
assert.ok(dc.unsubscribe(name, subscriber));
|
||||
assert.ok(!channel.hasSubscribers);
|
||||
|
||||
// unsubscribe() should return false when subscriber is not found
|
||||
assert.ok(!dc.unsubscribe(name, subscriber));
|
||||
|
||||
assert.throws(() => {
|
||||
dc.subscribe(name, null);
|
||||
}, { code: 'ERR_INVALID_ARG_TYPE' });
|
Loading…
x
Reference in New Issue
Block a user