src: implement DataQueue

See documentation in dataqueue/queue.h for details

Co-authored-by: flakey5 <73616808+flakey5@users.noreply.github.com>
PR-URL: https://github.com/nodejs/node/pull/45258
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
This commit is contained in:
James M Snell 2022-12-17 13:07:42 -08:00
parent 9a82938b82
commit c8cc7e89e6
5 changed files with 1506 additions and 0 deletions

View File

@ -477,6 +477,7 @@
'src/cleanup_queue.cc',
'src/connect_wrap.cc',
'src/connection_wrap.cc',
'src/dataqueue/queue.cc',
'src/debug_utils.cc',
'src/env.cc',
'src/fs_event_wrap.cc',
@ -580,6 +581,7 @@
'src/cleanup_queue-inl.h',
'src/connect_wrap.h',
'src/connection_wrap.h',
'src/dataqueue/queue.h',
'src/debug_utils.h',
'src/debug_utils-inl.h',
'src/env_properties.h',
@ -991,6 +993,7 @@
'test/cctest/test_sockaddr.cc',
'test/cctest/test_traced_value.cc',
'test/cctest/test_util.cc',
'test/cctest/test_dataqueue.cc',
],
'conditions': [

1202
src/dataqueue/queue.cc Normal file

File diff suppressed because it is too large Load Diff

295
src/dataqueue/queue.h Normal file
View File

@ -0,0 +1,295 @@
#pragma once
#if defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS
#include <base_object.h>
#include <node.h>
#include <node_bob.h>
#include <node_file.h>
#include <memory_tracker.h>
#include <stream_base.h>
#include <v8.h>
#include <uv.h>
#include <memory>
#include <vector>
namespace node {
// Represents a sequenced collection of data sources that can be
// consumed as a single logical stream of data. Sources can be
// memory-resident or streaming.
//
// There are two essential kinds of DataQueue:
//
// * Idempotent - Multiple reads always produce the same result.
// This is even the case if individual sources
// are not memory-resident. Reads never change
// the state of the DataQueue. Every entry in
// an Idempotent DataQueue must also be idempotent.
//
// * Non-idempotent - Reads are destructive of the internal state.
// A non-idempotent DataQueue can be read at
// most once and only by a single reader.
// Entries in a non-idempotent DataQueue can
// be a mix of idempotent and non-idempotent
// entries.
//
// The DataQueue is essentially a collection of DataQueue::Entry
// instances. A DataQueue::Entry is a single logical source of
// data. The data may be memory-resident or streaming. The entry
// can be idempotent or non-idempotent. An entry cannot be read
// by itself, it must be part of a DataQueue to be consumed.
//
// Example of creating an idempotent DataQueue:
//
// std::shared_ptr<v8::BackingStore> store1 = getBackingStoreSomehow();
// std::shared_ptr<v8::BackingStore> store2 = getBackingStoreSomehow();
//
// std::vector<std::unique_ptr<DataQueue::Entry>> list;
// list.push_back(DataQueue::CreateInMemoryEntryFromBackingStore(
// store1, 0, len1));
// list.push_back(DataQueue::CreateInMemoryEntryFromBackingStore(
// store2, 0, len2));
//
// std::shared_ptr<DataQueue> data_queue =
// DataQueue::CreateIdempotent(std::move(list));
//
// Importantly, idempotent DataQueue's are immutable and all entries
// must be provided when the DataQueue is constructed. Every entry
// must be idempotent with known sizes. The entries may be memory
// resident or streaming. Streaming entries must be capable of
// being read multiple times.
//
// Because idempotent DataQueue's will always produce the same results
// when read, they can be sliced. Slices yield a new DataQueue instance
// that is a subset view over the original:
//
// std::shared_ptr<DataQueue> slice = data_queue.slice(
// 5, v8::Just(10UL));
//
// Example of creating a non-idempotent DataQueue:
//
// std::shared_ptr<v8::BackingStore> store1 = getBackingStoreSomehow();
// std::shared_ptr<v8::BackingStore> store2 = getBackingStoreSomehow();
//
// std::shared_ptr<DataQueue> data_queue = DataQueue::Create();
//
// data_queue->append(DataQueue::CreateInMemoryEntryFromBackingStore(
// store1, 0, len1));
//
// data_queue->append(DataQueue::CreateInMemoryEntryFromBackingStore(
// store2, 0, len2));
//
// These data-queues can have new entries appended to them. Entries can
// be memory-resident or streaming. Streaming entries might not have
// a known size. Entries may not be capable of being read multiple
// times.
//
// A non-idempotent data queue will, by default, allow any amount of
// entries to be appended to it. To limit the size of the DataQueue,
// or the close the DataQueue (preventing new entries from being
// appending), use the cap() method. The DataQueue can be capped
// at a specific size or whatever size it currently it.
//
// It might not be possible for a non-idempotent DataQueue to provide
// a size because it might not know how much data a streaming entry
// will ultimately provide.
//
// Non-idempotent DataQueues cannot be sliced.
//
// To read from a DataQueue, we use the node::bob::Source API
// (see src/node_bob.h).
//
// std::unique_ptr<DataQueue::Reader> reader = data_queue->getReader();
//
// reader->Pull(
// [](int status, const DataQueue::Vec* vecs, size_t count, Done done) {
// // status is one of node::bob::Status
// // vecs is zero or more data buffers containing the read data
// // count is the number of vecs
// // done is a callback to be invoked when done processing the data
// }, options, nullptr, 0, 16);
//
// Keep calling Pull() until status is equal to node::bob::Status::STATUS_END.
//
// For idempotent DataQueues, any number of readers can be created and
// pull concurrently from the same DataQueue. The DataQueue can be read
// multiple times. Succesful reads should always produce the same result.
// If, for whatever reason, the implementation cannot ensure that the
// data read will remain the same, the read must fail with an error status.
//
// For non-idempotent DataQueues, only a single reader is ever allowed for
// the DataQueue, and the data can only ever be read once.
class DataQueue : public MemoryRetainer {
public:
struct Vec {
uint8_t* base;
size_t len;
};
// A DataQueue::Reader consumes the DataQueue. If the data queue is
// idempotent, multiple Readers can be attached to the DataQueue at
// any given time, all guaranteed to yield the same result when the
// data is read. Otherwise, only a single Reader can be attached.
class Reader : public MemoryRetainer,
public bob::Source<Vec> {
public:
using Next = bob::Next<Vec>;
using Done = bob::Done;
};
// A DataQueue::Entry represents a logical chunk of data in the queue.
// The entry may or may not represent memory-resident data. It may
// or may not be consumable more than once.
class Entry : public MemoryRetainer {
public:
// Returns a new Entry that is a view over this entries data
// from the start offset to the ending offset. If the end
// offset is omitted, the slice extends to the end of the
// data.
//
// Creating a slice is only possible if isIdempotent() returns
// true. This is because consuming either the original entry or
// the new entry would change the state of the other in non-
// deterministic ways. When isIdempotent() returns false, slice()
// must return a nulled unique_ptr.
//
// Creating a slice is also only possible if the size of the
// entry is known. If size() returns v8::Nothing<size_t>, slice()
// must return a nulled unique_ptr.
virtual std::unique_ptr<Entry> slice(
size_t start,
v8::Maybe<size_t> end = v8::Nothing<size_t>()) = 0;
// Returns the number of bytes represented by this Entry if it is
// known. Certain types of entries, such as those backed by streams
// might not know the size in advance and therefore cannot provide
// a value. In such cases, size() must return v8::Nothing<size_t>.
//
// If the entry is idempotent, a size should always be available.
virtual v8::Maybe<size_t> size() const = 0;
// When true, multiple reads on the object must produce the exact
// same data or the reads will fail. Some sources of entry data,
// such as streams, may not be capable of preserving idempotency
// and therefore must not claim to be. If an entry claims to be
// idempotent and cannot preserve that quality, subsequent reads
// must fail with an error when a variance is detected.
virtual bool isIdempotent() const = 0;
};
// Creates an idempotent DataQueue with a pre-established collection
// of entries. All of the entries must also be idempotent otherwise
// an empty std::unique_ptr will be returned.
static std::shared_ptr<DataQueue> CreateIdempotent(
std::vector<std::unique_ptr<Entry>> list);
// Creates a non-idempotent DataQueue. This kind of queue can be
// mutated and updated such that multiple reads are not guaranteed
// to produce the same result. The entries added can be of any type.
static std::shared_ptr<DataQueue> Create(
v8::Maybe<size_t> capped = v8::Nothing<size_t>());
// Creates an idempotent Entry from a v8::ArrayBufferView. To help
// ensure idempotency, the underlying ArrayBuffer is detached from
// the BackingStore. It is the callers responsibility to ensure that
// the BackingStore is not otherwise modified through any other
// means. If the ArrayBuffer is not detachable, nullptr will be
// returned.
static std::unique_ptr<Entry> CreateInMemoryEntryFromView(
v8::Local<v8::ArrayBufferView> view);
// Creates an idempotent Entry from a v8::BackingStore. It is the
// callers responsibility to ensure that the BackingStore is not
// otherwise modified through any other means. If the ArrayBuffer
// is not detachable, nullptr will be returned.
static std::unique_ptr<Entry> CreateInMemoryEntryFromBackingStore(
std::shared_ptr<v8::BackingStore> store,
size_t offset,
size_t length);
static std::unique_ptr<Entry> CreateDataQueueEntry(
std::shared_ptr<DataQueue> data_queue);
static std::unique_ptr<Entry> CreateFdEntry(
BaseObjectPtr<fs::FileHandle> handle);
// Creates a Reader for the given queue. If the queue is idempotent,
// any number of readers can be created, all of which are guaranteed
// to provide the same data. Otherwise, only a single reader is
// permitted.
virtual std::unique_ptr<Reader> getReader() = 0;
// Append a single new entry to the queue. Appending is only allowed
// when isIdempotent() is false. v8::Nothing<bool>() will be returned
// if isIdempotent() is true. v8::Just(false) will be returned if the
// data queue is not idempotent but the entry otherwise cannot be added.
virtual v8::Maybe<bool> append(std::unique_ptr<Entry> entry) = 0;
// Caps the size of this DataQueue preventing additional entries to
// be added if those cause the size to extend beyond the specified
// limit.
//
// If limit is zero, or is less than the known current size of the
// data queue, the limit is set to the current known size, meaning
// that no additional entries can be added at all.
//
// If the size of the data queue is not known, the limit will be
// ignored and no additional entries will be allowed at all.
//
// If isIdempotent is true capping is unnecessary because the data
// queue cannot be appended to. In that case, cap() is a non-op.
//
// If the data queue has already been capped, cap can be called
// again with a smaller size.
virtual void cap(size_t limit = 0) = 0;
// Returns a new DataQueue that is a view over this queues data
// from the start offset to the ending offset. If the end offset
// is omitted, the slice extends to the end of the data.
//
// The slice will coverage a range from start up to, but excluding, end.
//
// Creating a slice is only possible if isIdempotent() returns
// true. This is because consuming either the original DataQueue or
// the new queue would change the state of the other in non-
// deterministic ways. When isIdempotent() returns false, slice()
// must return a nulled unique_ptr.
//
// Creating a slice is also only possible if the size of the
// DataQueue is known. If size() returns v8::Nothing<size_t>, slice()
// must return a null unique_ptr.
virtual std::shared_ptr<DataQueue> slice(
size_t start,
v8::Maybe<size_t> end = v8::Nothing<size_t>()) = 0;
// The size of DataQueue is the total size of all of its member entries.
// If any of the entries is not able to specify a size, the DataQueue
// will also be incapable of doing so, in which case size() must return
// v8::Nothing<size_t>.
virtual v8::Maybe<size_t> size() const = 0;
// A DataQueue is idempotent only if all of its member entries are
// idempotent.
virtual bool isIdempotent() const = 0;
// True only if cap is called or the data queue is a limited to a
// fixed size.
virtual bool isCapped() const = 0;
// If the data queue has been capped, and the size of the data queue
// is known, maybeCapRemaining will return the number of additional
// bytes the data queue can receive before reaching the cap limit.
// If the size of the queue cannot be known, or the cap has not
// been set, maybeCapRemaining() will return v8::Nothing<size_t>.
virtual v8::Maybe<size_t> maybeCapRemaining() const = 0;
static void Initialize(Environment* env, v8::Local<v8::Object> target);
static void RegisterExternalReferences(ExternalReferenceRegistry* registry);
};
} // namespace node
#endif // defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS

View File

@ -10,6 +10,10 @@ constexpr size_t kMaxCountHint = 16;
// Negative status codes indicate error conditions.
enum Status : int {
// Indicates that there was an error while pulling.
// Should be treated similar to STATUS_EOS
STATUS_FAILED = -2,
// Indicates that an attempt was made to pull after end.
STATUS_EOS = -1,
@ -72,6 +76,7 @@ using Next = std::function<void(int, const T*, size_t count, Done done)>;
template <typename T>
class Source {
public:
virtual ~Source() = default;
virtual int Pull(
Next<T> next,
int options,

View File

@ -156,6 +156,7 @@ ERRORS_WITH_CODE(V)
"Context not associated with Node.js environment") \
V(ERR_INVALID_ADDRESS, "Invalid socket address") \
V(ERR_INVALID_MODULE, "No such module") \
V(ERR_INVALID_STATE, "Invalid state") \
V(ERR_INVALID_THIS, "Value of \"this\" is the wrong type") \
V(ERR_INVALID_TRANSFER_OBJECT, "Found invalid object in transferList") \
V(ERR_MEMORY_ALLOCATION_FAILED, "Failed to allocate memory") \