ruby/ractor_sync.c
Koichi Sasada ef2bb61018 Ractor::Port
* Added `Ractor::Port`
  * `Ractor::Port#receive` (support multi-threads)
  * `Ractor::Port#close`
  * `Ractor::Port#closed?`
* Added some methods
  * `Ractor#join`
  * `Ractor#value`
  * `Ractor#monitor`
  * `Ractor#unmonitor`
* Removed some methods
  * `Ractor#take`
  * `Ractor.yield`
* Change the spec
  * `Ractor.select`

You can wait for multiple sequences of messages with `Ractor::Port`.

```ruby
ports = 3.times.map{ Ractor::Port.new }
ports.map.with_index do |port, ri|
  Ractor.new port,ri do |port, ri|
    3.times{|i| port << "r#{ri}-#{i}"}
  end
end

p ports.each{|port| pp 3.times.map{port.receive}}

```

In this example, we use 3 ports, and 3 Ractors send messages to them respectively.
We can receive a series of messages from each port.

You can use `Ractor#value` to get the last value of a Ractor's block:

```ruby
result = Ractor.new do
  heavy_task()
end.value
```

You can wait for the termination of a Ractor with `Ractor#join` like this:

```ruby
Ractor.new do
  some_task()
end.join
```

`#value` and `#join` are similar to `Thread#value` and `Thread#join`.

To implement `#join`, `Ractor#monitor` (and `Ractor#unmonitor`) is introduced.

This commit changes `Ractor.select()` method.
It now only accepts ports or Ractors, and returns when a port receives a message or a Ractor terminates.

We remove `Ractor.yield` and `Ractor#take` because:
* `Ractor::Port` supports most of similar use cases in a simpler manner.
* Removing them significantly simplifies the code.

We also change the internal thread scheduler code (thread_pthread.c):
* During barrier synchronization, we keep the `ractor_sched` lock to avoid deadlocks.
  This lock is released by `rb_ractor_sched_barrier_end()`
  which is called at the end of operations that require the barrier.
* fix potential deadlock issues by checking interrupts just before setting UBF.

https://bugs.ruby-lang.org/issues/21262
2025-05-31 04:01:33 +09:00

1490 lines
34 KiB
C

// this file is included by ractor.c
// Payload of a Ractor::Port object: the ractor the port belongs to plus a port id.
struct ractor_port {
// ractor this port was created for (see ractor_port_init)
rb_ractor_t *r;
// port id generated by ractor_genid_for_port(); read it through ractor_port_id()
st_data_t id_;
};
// Accessor for the id stored in port `rp`.
static st_data_t
ractor_port_id(const struct ractor_port *rp)
{
    const st_data_t port_id = rp->id_;

    return port_id;
}
static VALUE rb_cRactorPort;
static VALUE ractor_receive(rb_execution_context_t *ec, const struct ractor_port *rp);
static VALUE ractor_send(rb_execution_context_t *ec, const struct ractor_port *rp, VALUE obj, VALUE move);
static VALUE ractor_try_send(rb_execution_context_t *ec, const struct ractor_port *rp, VALUE obj, VALUE move);
static void ractor_add_port(rb_ractor_t *r, st_data_t id);
static void
ractor_port_mark(void *ptr)
{
const struct ractor_port *rp = (struct ractor_port *)ptr;
if (rp->r) {
rb_gc_mark(rp->r->pub.self);
}
}
// GC free callback for Ractor::Port: releases the struct ractor_port storage.
// The queue registered for the port is not touched here.
static void
ractor_port_free(void *ptr)
{
    xfree(ptr);
}
// GC memsize callback for Ractor::Port: the payload has a fixed size, so
// `ptr` is not inspected.
static size_t
ractor_port_memsize(const void *ptr)
{
    const size_t payload_size = sizeof(struct ractor_port);

    return payload_size;
}
// TypedData descriptor for Ractor::Port objects (payload: struct ractor_port).
// NOTE(review): the type is flagged RUBY_TYPED_WB_PROTECTED while the mark
// callback marks rp->r->pub.self — confirm every writer of rp->r issues a
// write barrier.
static const rb_data_type_t ractor_port_data_type = {
"ractor/port",
{
ractor_port_mark,
ractor_port_free,
ractor_port_memsize,
NULL, // update
},
0, 0, RUBY_TYPED_FREE_IMMEDIATELY | RUBY_TYPED_WB_PROTECTED,
};
// Hands out the next port id of ractor `cr` by post-incrementing its
// per-ractor counter (`cr->sync.next_port_id`).
static st_data_t
ractor_genid_for_port(rb_ractor_t *cr)
{
    // TODO: enough?
    const st_data_t fresh_id = cr->sync.next_port_id;

    cr->sync.next_port_id = fresh_id + 1;
    return fresh_id;
}
// Unwraps a Ractor::Port object into its struct ractor_port payload.
// The type is only verified by VM_ASSERT; callers must pass a port object.
static struct ractor_port *
RACTOR_PORT_PTR(VALUE self)
{
    VM_ASSERT(rb_typeddata_is_kind_of(self, &ractor_port_data_type));

    return DATA_PTR(self);
}
// Allocator for Ractor::Port: creates the wrapper object together with its
// struct ractor_port payload. The payload is filled in by ractor_port_init().
static VALUE
ractor_port_alloc(VALUE klass)
{
    struct ractor_port *payload; // required by the macro; not used further

    return TypedData_Make_Struct(klass, struct ractor_port, &ractor_port_data_type, payload);
}
// Binds the freshly allocated port object `rpv` to ractor `r`:
// stores the owner, generates a new port id, registers a queue for that id
// in `r` (ractor_add_port) and freezes the port object.
// Returns `rpv`.
static VALUE
ractor_port_init(VALUE rpv, rb_ractor_t *r)
{
    struct ractor_port *rp = RACTOR_PORT_PTR(rpv);

    rp->r = r;
    // The port type is RUBY_TYPED_WB_PROTECTED and ractor_port_mark() marks
    // r->pub.self, so the GC has to be told about this new reference.
    RB_OBJ_WRITTEN(rpv, Qundef, r->pub.self);
    rp->id_ = ractor_genid_for_port(r);

    ractor_add_port(r, ractor_port_id(rp));

    rb_obj_freeze(rpv);

    return rpv;
}
// Ractor::Port#initialize: binds the new port to the current ractor.
static VALUE
ractor_port_initialzie(VALUE self)
{
    rb_ractor_t *current = GET_RACTOR();

    return ractor_port_init(self, current);
}
// Ractor::Port#initialize_copy: makes `self` refer to the same ractor and
// port id as `orig`. No new queue is registered; both objects name one port.
// Returns `self`.
static VALUE
ractor_port_initialzie_copy(VALUE self, VALUE orig)
{
    struct ractor_port *dst = RACTOR_PORT_PTR(self);
    struct ractor_port *src = RACTOR_PORT_PTR(orig);

    dst->r = src->r;
    // The port type is RUBY_TYPED_WB_PROTECTED and ractor_port_mark() marks
    // dst->r->pub.self, so the GC has to be told about this new reference.
    RB_OBJ_WRITTEN(self, Qundef, dst->r->pub.self);
    dst->id_ = ractor_port_id(src);

    return self;
}
// Creates a Ractor::Port object bound to ractor `r` and returns it.
static VALUE
ractor_port_new(rb_ractor_t *r)
{
    VALUE port = ractor_port_alloc(rb_cRactorPort);

    return ractor_port_init(port, r); // ractor_port_init() returns its first argument
}
// Tells whether `self` is a Ractor::Port typed-data object.
static bool
ractor_port_p(VALUE self)
{
    const bool is_port = rb_typeddata_is_kind_of(self, &ractor_port_data_type);

    return is_port;
}
// Ractor::Port#receive: receives a message from the port.
// Raises Ractor::Error unless the calling ractor is the one the port belongs to.
static VALUE
ractor_port_receive(rb_execution_context_t *ec, VALUE self)
{
    const struct ractor_port *port = RACTOR_PORT_PTR(self);
    rb_ractor_t *current = rb_ec_ractor_ptr(ec);

    if (current != port->r) {
        rb_raise(rb_eRactorError, "only allowed from the creator Ractor of this port");
    }

    return ractor_receive(ec, port);
}
// Ractor::Port#send: sends `obj` to the port; a truthy `move` requests a move
// instead of a copy. Raises (through ractor_send) when the port is closed.
// Returns `self`.
static VALUE
ractor_port_send(rb_execution_context_t *ec, VALUE self, VALUE obj, VALUE move)
{
    const struct ractor_port *rp = RACTOR_PORT_PTR(self);

    // ractor_send() takes `move` as a VALUE and applies RTEST() itself, so the
    // Ruby value is forwarded unchanged instead of a C truth value.
    ractor_send(ec, rp, obj, move);

    return self;
}
static bool ractor_closed_port_p(rb_execution_context_t *ec, rb_ractor_t *r, const struct ractor_port *rp);
static bool ractor_close_port(rb_execution_context_t *ec, rb_ractor_t *r, const struct ractor_port *rp);
// Ractor::Port#closed?: Qtrue when ractor_closed_port_p() reports the port as
// closed (or no longer registered), Qfalse otherwise.
static VALUE
ractor_port_closed_p(rb_execution_context_t *ec, VALUE self)
{
    const struct ractor_port *port = RACTOR_PORT_PTR(self);

    return ractor_closed_port_p(ec, port->r, port) ? Qtrue : Qfalse;
}
// Ractor::Port#close: closes the port. Only the ractor the port belongs to
// may close it; any other ractor gets Ractor::Error. Returns `self`.
static VALUE
ractor_port_close(rb_execution_context_t *ec, VALUE self)
{
    rb_ractor_t *current = rb_ec_ractor_ptr(ec);
    const struct ractor_port *port = RACTOR_PORT_PTR(self);

    if (port->r != current) {
        rb_raise(rb_eRactorError, "closing port by other ractors is not allowed");
    }

    ractor_close_port(ec, current, port);

    return self;
}
// ractor-internal
// ractor-internal - ractor_basket
// How the payload of a basket is (to be) transferred.
enum ractor_basket_type {
// basket is empty
// (also passed to ractor_prepare_payload() to mean "decide between ref and copy")
basket_type_none,
// value is available
// payload is the sender's object itself
basket_type_ref,
// payload was produced by ractor_copy()
basket_type_copy,
// payload was produced by ractor_move()
basket_type_move,
};
// One message in flight: the payload plus routing information.
struct ractor_basket {
// transfer kind of the payload
enum ractor_basket_type type;
// read by ractor_basket_accept() when it builds a remote exception
// NOTE(review): no assignment to this field is visible in this chunk — confirm where it is set
VALUE sender;
// id of the destination port; set by ractor_send_basket()
st_data_t port_id;
struct {
// the value being sent
VALUE v;
// when true, ractor_basket_accept() raises instead of returning `v`
bool exception;
} p; // payload
// linkage into ractor_queue.set
struct ccan_list_node node;
};
// Disabled (compiled out) basket type predicates.
#if 0
// true when basket `b` has the given type
static inline bool
ractor_basket_type_p(const struct ractor_basket *b, enum ractor_basket_type type)
{
return b->type == type;
}
// true when basket `b` is empty (basket_type_none)
static inline bool
ractor_basket_none_p(const struct ractor_basket *b)
{
return ractor_basket_type_p(b, basket_type_none);
}
#endif
static void
ractor_basket_mark(const struct ractor_basket *b)
{
rb_gc_mark(b->p.v);
}
// Releases the storage of basket `b`. The basket must already be unlinked
// from any queue; the payload value itself is left to the GC.
static void
ractor_basket_free(struct ractor_basket *b)
{
    xfree(b);
}
// Allocates a zero-filled basket.
// Zero-filling matters because ractor_basket_new() does not assign every
// field (`sender` and `port_id` are left alone), yet ractor_basket_accept()
// reads `sender` for exception baskets; without it that read would see
// indeterminate memory. A zeroed basket has type basket_type_none.
static struct ractor_basket *
ractor_basket_alloc(void)
{
    struct ractor_basket *b = ZALLOC(struct ractor_basket);

    return b;
}
// ractor-internal - ractor_queue
// FIFO of baskets (enqueue at the tail, dequeue from the head) with a closed flag.
struct ractor_queue {
// list of struct ractor_basket, linked through ractor_basket.node
struct ccan_list_head set;
// set by ractor_queue_close(); the queue may still hold baskets afterwards
bool closed;
};
// Puts `rq` into its initial state: open and empty.
static void
ractor_queue_init(struct ractor_queue *rq)
{
    rq->closed = false;
    ccan_list_head_init(&rq->set);
}
// Allocates a queue and initializes it as open and empty.
// Release it with ractor_queue_free().
static struct ractor_queue *
ractor_queue_new(void)
{
    struct ractor_queue *queue = ALLOC(struct ractor_queue);

    ractor_queue_init(queue);

    return queue;
}
// Marks every basket currently stored in `rq` for the GC.
static void
ractor_queue_mark(const struct ractor_queue *rq)
{
    const struct ractor_basket *basket;

    ccan_list_for_each(&rq->set, basket, node) {
        ractor_basket_mark(basket);
    }
}
static void
ractor_queue_free(struct ractor_queue *rq)
{
struct ractor_basket *b, *nxt;
ccan_list_for_each_safe(&rq->set, b, nxt, node) {
ccan_list_del_init(&b->node);
ractor_basket_free(b);
}
VM_ASSERT(ccan_list_empty(&rq->set));
xfree(rq);
}
// Counts the baskets stored in `rq` by walking the list (linear time).
RBIMPL_ATTR_MAYBE_UNUSED()
static size_t
ractor_queue_size(const struct ractor_queue *rq)
{
    const struct ractor_basket *basket;
    size_t count = 0;

    ccan_list_for_each(&rq->set, basket, node) {
        count += 1;
    }

    return count;
}
// Flags `rq` as closed. Baskets already in the queue are kept.
static void
ractor_queue_close(struct ractor_queue *rq)
{
    rq->closed = true;
}
// Moves all baskets of `src_rq` into `dst_rq` in constant time and leaves
// `src_rq` empty. Whatever `dst_rq->set` held before is overwritten, so the
// destination must be empty or not yet linked. The `closed` flags are not
// touched.
static void
ractor_queue_move(struct ractor_queue *dst_rq, struct ractor_queue *src_rq)
{
    struct ccan_list_head *src = &src_rq->set;
    struct ccan_list_head *dst = &dst_rq->set;

    // An empty source head points at itself; splicing it would leave `dst`
    // linked to the source head. The result of moving nothing is an empty
    // destination.
    if (ccan_list_empty(src)) {
        ccan_list_head_init(dst);
        return;
    }

    // take over the first and last nodes ...
    dst->n.next = src->n.next;
    dst->n.prev = src->n.prev;
    // ... and make them point back at the new head
    dst->n.next->prev = &dst->n;
    dst->n.prev->next = &dst->n;

    ccan_list_head_init(src);
}
// Disabled (compiled out): returns the first basket of `rq` without removing it.
#if 0
static struct ractor_basket *
ractor_queue_head(rb_ractor_t *r, struct ractor_queue *rq)
{
return ccan_list_top(&rq->set, struct ractor_basket, node);
}
#endif
// Tells whether `rq` holds no baskets. `r` is not used.
static bool
ractor_queue_empty_p(rb_ractor_t *r, const struct ractor_queue *rq)
{
    const bool empty = ccan_list_empty(&rq->set);

    return empty;
}
// Removes and returns the basket at the head of `rq`, or NULL when the queue
// is empty. Must be called by ractor `r` itself (checked by VM_ASSERT).
static struct ractor_basket *
ractor_queue_deq(rb_ractor_t *r, struct ractor_queue *rq)
{
    VM_ASSERT(GET_RACTOR() == r);

    struct ractor_basket *head = ccan_list_pop(&rq->set, struct ractor_basket, node);

    return head;
}
// Appends `basket` to the tail of `rq`. `r` is not used.
// `rq` must not be NULL.
static void
ractor_queue_enq(rb_ractor_t *r, struct ractor_queue *rq, struct ractor_basket *basket)
{
    struct ccan_list_head *list = &rq->set;

    ccan_list_add_tail(list, &basket->node);
}
// Disabled (compiled out) debugging aid: prints index, type and address of
// every basket in `rq` to stderr.
#if 0
static void
rq_dump(const struct ractor_queue *rq)
{
int i=0;
struct ractor_basket *b;
ccan_list_for_each(&rq->set, b, node) {
fprintf(stderr, "%d type:%s %p\n", i, basket_type_name(b->type), (void *)b);
i++;
}
}
#endif
static void ractor_delete_port(rb_ractor_t *cr, st_data_t id, bool locked);
// Looks up the queue registered for port `id` in the current ractor `cr`.
// Returns NULL when there is no such port. A queue that is closed and already
// drained is deleted on the spot and NULL is returned for it as well.
// `locked` tells whether the caller already holds the ractor lock (it is
// forwarded to ractor_delete_port).
static struct ractor_queue *
ractor_get_queue(rb_ractor_t *cr, st_data_t id, bool locked)
{
    VM_ASSERT(cr == GET_RACTOR());

    struct ractor_queue *rq;

    if (cr->sync.ports == NULL || !st_lookup(cr->sync.ports, id, (st_data_t *)&rq)) {
        return NULL;
    }

    if (rq->closed && ractor_queue_empty_p(cr, rq)) {
        ractor_delete_port(cr, id, locked);
        return NULL;
    }

    return rq;
}
// ractor-internal - ports
// Registers a new, empty queue for port `id` in ractor `r`.
// The caller must not hold the lock of `r`; it is taken here around the
// table insertion.
static void
ractor_add_port(rb_ractor_t *r, st_data_t id)
{
    // the queue is allocated before the lock is taken
    struct ractor_queue *queue = ractor_queue_new();

    ASSERT_ractor_unlocking(r);
    RUBY_DEBUG_LOG("id:%u", (unsigned int)id);

    RACTOR_LOCK(r);
    // memo: can cause GC, but GC doesn't use ractor locking.
    st_insert(r->sync.ports, id, (st_data_t)queue);
    RACTOR_UNLOCK(r);
}
// Removes port `id` from the port table of `cr` and frees its queue.
// The ractor lock must be held. A missing port is treated as a bug
// (VM_ASSERT) and otherwise ignored.
static void
ractor_delete_port_locked(rb_ractor_t *cr, st_data_t id)
{
    ASSERT_ractor_locking(cr);
    RUBY_DEBUG_LOG("id:%u", (unsigned int)id);

    struct ractor_queue *removed;

    if (!st_delete(cr->sync.ports, &id, (st_data_t *)&removed)) {
        VM_ASSERT(0);
        return;
    }

    ractor_queue_free(removed);
}
// Deletes port `id` of `cr`. When `locked` is false the ractor lock is taken
// around the deletion; when true the caller already holds it.
static void
ractor_delete_port(rb_ractor_t *cr, st_data_t id, bool locked)
{
    if (!locked) {
        RACTOR_LOCK_SELF(cr);
    }

    ractor_delete_port_locked(cr, id);

    if (!locked) {
        RACTOR_UNLOCK_SELF(cr);
    }
}
// Returns the struct ractor_port behind the default port object of `r`.
static const struct ractor_port *
ractor_default_port(rb_ractor_t *r)
{
    const VALUE port_obj = r->sync.default_port_value;

    return RACTOR_PORT_PTR(port_obj);
}
// Returns the default port of `r` as a Ruby object.
static VALUE
ractor_default_port_value(rb_ractor_t *r)
{
    const VALUE port_obj = r->sync.default_port_value;

    return port_obj;
}
// Tells whether port `rp` can no longer accept messages: true when its queue
// is flagged closed, or when the port is not registered (any more) in the
// port table of `rp->r`. The `r` parameter is not used; `rp->r` is consulted.
// Callers other than the owning ractor must hold the lock of `rp->r`
// (checked by VM_ASSERT).
static bool
ractor_closed_port_p(rb_execution_context_t *ec, rb_ractor_t *r, const struct ractor_port *rp)
{
    VM_ASSERT(rb_ec_ractor_ptr(ec) == rp->r ? 1 : (ASSERT_ractor_locking(rp->r), 1));

    const struct ractor_queue *rq;

    if (rp->r->sync.ports == NULL) {
        return true;
    }
    if (!st_lookup(rp->r->sync.ports, ractor_port_id(rp), (st_data_t *)&rq)) {
        return true;
    }

    return rq->closed;
}
static void ractor_deliver_incoming_messages(rb_execution_context_t *ec, rb_ractor_t *cr);
static bool ractor_queue_empty_p(rb_ractor_t *r, const struct ractor_queue *rq);
// Closes port `rp` of the current ractor `cr`.
// Pending incoming messages are delivered to their queues first. The queue is
// then flagged closed and, if it is already empty, removed from the port
// table. A closed queue that still holds messages stays registered.
// Returns true when the port was found in the table.
static bool
ractor_close_port(rb_execution_context_t *ec, rb_ractor_t *cr, const struct ractor_port *rp)
{
    VM_ASSERT(cr == rp->r);

    const st_data_t id = ractor_port_id(rp);
    struct ractor_queue *rq = NULL;
    bool found = false;

    RACTOR_LOCK_SELF(cr);
    {
        ractor_deliver_incoming_messages(ec, cr); // check incoming messages

        found = st_lookup(rp->r->sync.ports, id, (st_data_t *)&rq) != 0;

        if (found) {
            ractor_queue_close(rq);

            if (ractor_queue_empty_p(cr, rq)) {
                // delete from the table
                ractor_delete_port(cr, id, true);
            }
            // TODO: free rq
        }
    }
    RACTOR_UNLOCK_SELF(cr);

    return found;
}
// st_foreach callback used by ractor_free_all_ports(): frees the queue stored
// as the table value. The key and the extra argument are not used.
static int
ractor_free_all_ports_i(st_data_t port_id, st_data_t val, st_data_t dat)
{
    struct ractor_queue *queue = (struct ractor_queue *)val;

    ractor_queue_free(queue);

    return ST_CONTINUE;
}
// Frees every port queue of `cr`, the port table itself and the receive
// queue, and resets both pointers to NULL. Safe to call more than once.
static void
ractor_free_all_ports(rb_ractor_t *cr)
{
    struct st_table *ports = cr->sync.ports;
    struct ractor_queue *incoming = cr->sync.recv_queue;

    if (ports != NULL) {
        st_foreach(ports, ractor_free_all_ports_i, (st_data_t)cr);
        st_free_table(ports);
        cr->sync.ports = NULL;
    }

    if (incoming != NULL) {
        ractor_queue_free(incoming);
        cr->sync.recv_queue = NULL;
    }
}
// Fork handling for ractor `r`: drops all of its ports and queues and stores
// nil as its result value (so it is no longer Qundef). `vm` is not used.
static void
ractor_sync_terminate_atfork(rb_vm_t *vm, rb_ractor_t *r)
{
    ractor_free_all_ports(r);

    r->sync.legacy = Qnil;
}
// Ractor#monitor
// One registered monitor: a port that gets an exit token when the monitored
// ractor terminates (see ractor_notify_exit).
struct ractor_monitor {
// copy of the port to notify (made in ractor_monitor())
struct ractor_port port;
// linkage into the monitored ractor's sync.monitors list
struct ccan_list_node node;
};
// Marks, for every monitor registered on `r`, the ractor object that owns the
// monitoring port.
static void
ractor_mark_monitors(rb_ractor_t *r)
{
    const struct ractor_monitor *monitor;

    ccan_list_for_each(&r->sync.monitors, monitor, node) {
        const VALUE owner = monitor->port.r->pub.self;

        rb_gc_mark(owner);
    }
}
// Returns the symbol sent to monitoring ports on termination:
// :aborted when the ractor ended with an exception (`exc`), :exited otherwise.
static VALUE
ractor_exit_token(bool exc)
{
    if (!exc) {
        RUBY_DEBUG_LOG("exited");
        return ID2SYM(rb_intern("exited"));
    }

    RUBY_DEBUG_LOG("aborted");
    return ID2SYM(rb_intern("aborted"));
}
// Ractor#monitor: registers `port` to be notified when ractor `self`
// terminates.
// Returns Qtrue when the monitor was registered. If the ractor has already
// terminated, nothing is registered; the exit token is sent to `port` right
// away and Qfalse is returned.
static VALUE
ractor_monitor(rb_execution_context_t *ec, VALUE self, VALUE port)
{
    rb_ractor_t *r = RACTOR_PTR(self);
    const struct ractor_port *rp = RACTOR_PORT_PTR(port);
    bool registered;

    // allocated before the lock is taken
    struct ractor_monitor *rm = ALLOC(struct ractor_monitor);
    rm->port = *rp; // copy port information

    RACTOR_LOCK(r);
    {
        registered = UNDEF_P(r->sync.legacy); // Qundef means: not terminated

        if (registered) {
            RUBY_DEBUG_LOG("OK/r:%u -> port:%u@r%u", (unsigned int)rb_ractor_id(r), (unsigned int)ractor_port_id(&rm->port), (unsigned int)rb_ractor_id(rm->port.r));
            ccan_list_add_tail(&r->sync.monitors, &rm->node);
        }
        else {
            RUBY_DEBUG_LOG("NG/r:%u -> port:%u@r%u", (unsigned int)rb_ractor_id(r), (unsigned int)ractor_port_id(&rm->port), (unsigned int)rb_ractor_id(rm->port.r));
        }
    }
    RACTOR_UNLOCK(r);

    if (registered) {
        return Qtrue;
    }

    xfree(rm);
    ractor_port_send(ec, port, ractor_exit_token(r->sync.legacy_exc), Qfalse);

    return Qfalse;
}
// Ractor#unmonitor: removes every monitor registered on ractor `self` for
// `port`. Does nothing when the ractor has already terminated (its monitor
// list is consumed by ractor_notify_exit). Returns `self`.
static VALUE
ractor_unmonitor(rb_execution_context_t *ec, VALUE self, VALUE port)
{
    rb_ractor_t *r = RACTOR_PTR(self);
    const struct ractor_port *rp = RACTOR_PORT_PTR(port);

    RACTOR_LOCK(r);
    {
        if (UNDEF_P(r->sync.legacy)) { // not terminated
            struct ractor_monitor *rm, *nxt;

            ccan_list_for_each_safe(&r->sync.monitors, rm, nxt, node) {
                // Port ids are generated per ractor (ractor_genid_for_port),
                // so two ports of different ractors can share an id: the
                // owning ractor has to match as well.
                if (rm->port.r == rp->r &&
                    ractor_port_id(&rm->port) == ractor_port_id(rp)) {
                    RUBY_DEBUG_LOG("r:%u -> port:%u@r%u",
                                   (unsigned int)rb_ractor_id(r),
                                   (unsigned int)ractor_port_id(&rm->port),
                                   (unsigned int)rb_ractor_id(rm->port.r));
                    ccan_list_del(&rm->node);
                    xfree(rm);
                }
            }
        }
    }
    RACTOR_UNLOCK(r);

    return self;
}
// Records the termination of the current ractor `cr` and notifies its monitors.
//   legacy: the value stored as the ractor's result (must not be Qundef)
//   exc:    true when the ractor ended with an exception
// Under the ractor lock all ports are freed and the result is stored.
// After the lock is released every registered monitor port is sent the exit
// token (:exited / :aborted) with ractor_try_send(), and the monitor entry
// is freed.
static void
ractor_notify_exit(rb_execution_context_t *ec, rb_ractor_t *cr, VALUE legacy, bool exc)
{
RUBY_DEBUG_LOG("exc:%d", exc);
VM_ASSERT(!UNDEF_P(legacy));
VM_ASSERT(cr->sync.legacy == Qundef);
RACTOR_LOCK_SELF(cr);
{
ractor_free_all_ports(cr);
// from here on sync.legacy is no longer Qundef, which ractor_monitor() and
// ractor_unmonitor() check under the lock before touching the monitor list
cr->sync.legacy = legacy;
cr->sync.legacy_exc = exc;
}
RACTOR_UNLOCK_SELF(cr);
// send token
VALUE token = ractor_exit_token(exc);
struct ractor_monitor *rm, *nxt;
ccan_list_for_each_safe(&cr->sync.monitors, rm, nxt, node)
{
RUBY_DEBUG_LOG("port:%u@r%u", (unsigned int)ractor_port_id(&rm->port), (unsigned int)rb_ractor_id(rm->port.r));
// try_send: a closed monitor port does not raise
ractor_try_send(ec, &rm->port, token, false);
ccan_list_del(&rm->node);
xfree(rm);
}
VM_ASSERT(ccan_list_empty(&cr->sync.monitors));
}
// ractor-internal - initialize, mark, free, memsize
// st_foreach callback used by ractor_sync_mark(): the table maps
// port id -> ractor_queue, and every basket of that queue gets marked.
static int
ractor_mark_ports_i(st_data_t key, st_data_t val, st_data_t data)
{
    const struct ractor_queue *queue = (struct ractor_queue *)val;

    ractor_queue_mark(queue);

    return ST_CONTINUE;
}
// GC marking for the sync part of ractor `r`: the default port object, the
// baskets in the receive queue and in every port queue (only while the port
// table exists), and the owners of all monitoring ports.
static void
ractor_sync_mark(rb_ractor_t *r)
{
    rb_gc_mark(r->sync.default_port_value);

    if (r->sync.ports != NULL) {
        ractor_queue_mark(r->sync.recv_queue);
        st_foreach(r->sync.ports, ractor_mark_ports_i, 0);
    }

    ractor_mark_monitors(r);
}
// Releases the sync resources of ractor `r`.
// Freeing only the port table would leak every queue stored in it (and the
// baskets they hold) as well as the receive queue, so this delegates to
// ractor_free_all_ports(), which frees all of them. That helper tolerates
// NULL pointers and resets them, so it is harmless when the ractor has
// already dropped its ports on termination.
static void
ractor_sync_free(rb_ractor_t *r)
{
    ractor_free_all_ports(r);
}
static size_t
ractor_sync_memsize(const rb_ractor_t *r)
{
return st_table_size(r->sync.ports);
}
// Initializes the sync part of ractor `r`: lock, monitor and waiter lists,
// receive queue, port table, default port and the "not terminated" marker.
static void
ractor_sync_init(rb_ractor_t *r)
{
// lock
rb_native_mutex_initialize(&r->sync.lock);
// monitors
ccan_list_head_init(&r->sync.monitors);
// waiters
ccan_list_head_init(&r->sync.waiters);
// receiving queue
r->sync.recv_queue = ractor_queue_new();
// ports
// the table must exist before ractor_port_new(), which inserts the default
// port's queue into it (via ractor_add_port)
r->sync.ports = st_init_numtable();
r->sync.default_port_value = ractor_port_new(r);
FL_SET_RAW(r->sync.default_port_value, RUBY_FL_SHAREABLE); // only default ports are shareable
// legacy
// Qundef means "not terminated yet" (checked with UNDEF_P elsewhere in this file)
r->sync.legacy = Qundef;
#ifndef RUBY_THREAD_PTHREAD_H
rb_native_cond_initialize(&r->sync.wakeup_cond);
#endif
}
// Ractor#value
// Makes `cr` the successor of `r` unless a successor has been set before,
// and returns whichever ractor is the successor afterwards.
// The first check is done without the lock; the assignment itself is decided
// again under the lock of `r`.
static rb_ractor_t *
ractor_set_successor_once(rb_ractor_t *r, rb_ractor_t *cr)
{
    if (r->sync.successor == NULL) {
        RACTOR_LOCK(r);
        {
            if (r->sync.successor == NULL) {
                r->sync.successor = cr;
            }
            // otherwise: already `value`ed
        }
        RACTOR_UNLOCK(r);
    }

    VM_ASSERT(r->sync.successor != NULL);

    return r->sync.successor;
}
static VALUE ractor_reset_belonging(VALUE obj);
// Builds a Ractor::RemoteError whose cause is `cause` and whose @ractor
// instance variable is `sender`. The exception is returned, not raised.
static VALUE
ractor_make_remote_exception(VALUE cause, VALUE sender)
{
    VALUE remote_error = rb_exc_new_cstr(rb_eRactorRemoteError, "thrown by remote Ractor.");

    rb_ivar_set(remote_error, rb_intern("@ractor"), sender);
    rb_ec_setup_exception(NULL, remote_error, cause);

    return remote_error;
}
// Ractor#value: returns the stored result of ractor `self`, or raises a
// Ractor::RemoteError wrapping it when the ractor ended with an exception.
// Only the first ractor that asks (the successor) may take the value; any
// other ractor gets Ractor::Error.
// NOTE(review): this function does not wait for termination itself —
// presumably the caller joins first; confirm.
static VALUE
ractor_value(rb_execution_context_t *ec, VALUE self)
{
    rb_ractor_t *cr = rb_ec_ractor_ptr(ec);
    rb_ractor_t *r = RACTOR_PTR(self);
    rb_ractor_t *successor = ractor_set_successor_once(r, cr);

    if (successor != cr) {
        rb_raise(rb_eRactorError, "Only the successor ractor can take a value");
    }

    ractor_reset_belonging(r->sync.legacy);

    if (r->sync.legacy_exc) {
        rb_exc_raise(ractor_make_remote_exception(r->sync.legacy, self));
    }

    return r->sync.legacy;
}
static VALUE ractor_move(VALUE obj); // in this file
static VALUE ractor_copy(VALUE obj); // in this file
// Produces the value that actually travels in a basket.
//   *ptype == basket_type_ref:  `obj` is passed as is
//   *ptype == basket_type_move: the result of ractor_move(obj)
//   anything else:              `obj` itself when it is shareable (and *ptype
//                               becomes basket_type_ref), otherwise the result
//                               of ractor_copy(obj) (and *ptype becomes
//                               basket_type_copy)
// `ec` is not used.
static VALUE
ractor_prepare_payload(rb_execution_context_t *ec, VALUE obj, enum ractor_basket_type *ptype)
{
    const enum ractor_basket_type requested = *ptype;

    if (requested == basket_type_ref) {
        return obj;
    }
    if (requested == basket_type_move) {
        return ractor_move(obj);
    }

    if (rb_ractor_shareable_p(obj)) {
        *ptype = basket_type_ref;
        return obj;
    }

    *ptype = basket_type_copy;
    return ractor_copy(obj);
}
// Creates a basket carrying `obj`. The payload is prepared first (copy, move
// or reference, see ractor_prepare_payload) and only then is the basket
// allocated. `exc` flags the payload as an exception to be raised on the
// receiving side. `sender` and `port_id` are not set here.
static struct ractor_basket *
ractor_basket_new(rb_execution_context_t *ec, VALUE obj, enum ractor_basket_type type, bool exc)
{
    VALUE payload = ractor_prepare_payload(ec, obj, &type); // may adjust `type`

    struct ractor_basket *basket = ractor_basket_alloc();

    basket->p.exception = exc;
    basket->p.v = payload;
    basket->type = type;

    return basket;
}
static VALUE
ractor_basket_value(struct ractor_basket *b)
{
switch (b->type) {
case basket_type_ref:
break;
case basket_type_copy:
case basket_type_move:
ractor_reset_belonging(b->p.v);
break;
default:
VM_ASSERT(0); // unreachable
}
VM_ASSERT(!RB_TYPE_P(b->p.v, T_NONE));
return b->p.v;
}
// Consumes basket `b`: frees it and returns its payload, or, for an exception
// basket, frees it and raises a Ractor::RemoteError wrapping the payload.
static VALUE
ractor_basket_accept(struct ractor_basket *b)
{
    VALUE payload = ractor_basket_value(b);

    if (b->p.exception) {
        // build the error before the basket goes away; `sender` lives in it
        VALUE remote_error = ractor_make_remote_exception(payload, b->sender);

        ractor_basket_free(b);
        rb_exc_raise(remote_error);
    }

    ractor_basket_free(b);

    return payload;
}
// Ractor blocking by receive
// Why a waiting thread was woken up.
enum ractor_wakeup_status {
// still waiting (initial state set by ractor_wait())
wakeup_none,
// a message was sent (see ractor_send_basket())
wakeup_by_send,
// the unblock function ran (see ubf_ractor_wait())
wakeup_by_interrupt,
// wakeup_by_close,
};
// A thread blocked in ractor_wait(); lives on that function's stack.
struct ractor_waiter {
// wakeup_none while waiting; set by whoever wakes the thread up
enum ractor_wakeup_status wakeup_status;
// the waiting thread
rb_thread_t *th;
// linkage into the ractor's sync.waiters list
struct ccan_list_node node;
};
#if VM_CHECK_MODE > 0
// Debug helper: tells whether thread `th` is on the waiter list of `cr`.
// The ractor lock must be held.
static bool
ractor_waiter_included(rb_ractor_t *cr, rb_thread_t *th)
{
    ASSERT_ractor_locking(cr);

    bool found = false;
    struct ractor_waiter *waiter;

    ccan_list_for_each(&cr->sync.waiters, waiter, node) {
        if (waiter->th == th) {
            found = true;
            break;
        }
    }

    return found;
}
#endif
#if USE_RUBY_DEBUG_LOG
// Debug-log helper: name of a wakeup status. An unknown value is a bug.
static const char *
wakeup_status_str(enum ractor_wakeup_status wakeup_status)
{
    switch (wakeup_status) {
      case wakeup_none:
        return "none";
      case wakeup_by_send:
        return "by_send";
      case wakeup_by_interrupt:
        return "by_interrupt";
      // case wakeup_by_close: return "by_close";
    }

    rb_bug("unreachable");
}
// Debug-log helper: name of a basket type. An unknown value trips VM_ASSERT
// and yields NULL.
static const char *
basket_type_name(enum ractor_basket_type type)
{
    switch (type) {
      case basket_type_none:
        return "none";
      case basket_type_ref:
        return "ref";
      case basket_type_copy:
        return "copy";
      case basket_type_move:
        return "move";
    }

    VM_ASSERT(0);
    return NULL;
}
#endif // USE_RUBY_DEBUG_LOG
#ifdef RUBY_THREAD_PTHREAD_H
//
#else // win32
// Waits on r->sync.wakeup_cond with r->sync.lock as the associated mutex.
// With RACTOR_CHECK_MODE enabled, the `locked_by` marker is set to nil for
// the duration of the wait and restored afterwards.
static void
ractor_cond_wait(rb_ractor_t *r)
{
#if RACTOR_CHECK_MODE > 0
VALUE locked_by = r->sync.locked_by;
r->sync.locked_by = Qnil;
#endif
rb_native_cond_wait(&r->sync.wakeup_cond, &r->sync.lock);
#if RACTOR_CHECK_MODE > 0
r->sync.locked_by = locked_by;
#endif
}
// Body passed to rb_nogvl() by rb_ractor_sched_wait(): takes the ractor lock
// and, unless the waiter has already been woken up, waits once on the
// ractor's condition variable. `ptr` is the struct ractor_waiter.
// Always returns NULL.
static void *
ractor_wait_no_gvl(void *ptr)
{
    struct ractor_waiter *w = ptr;
    rb_ractor_t *cr = w->th->ractor;

    RACTOR_LOCK_SELF(cr);
    if (w->wakeup_status == wakeup_none) {
        ractor_cond_wait(cr);
    }
    RACTOR_UNLOCK_SELF(cr);

    return NULL;
}
// Blocks the current thread until it is woken up or interrupted.
// Called with the lock of `cr` held: the lock is released while
// ractor_wait_no_gvl() runs through rb_nogvl() and re-acquired before
// returning. `ubf` is installed as the unblock function and receives the
// waiter (`ptr`). `ec` is not used.
static void
rb_ractor_sched_wait(rb_execution_context_t *ec, rb_ractor_t *cr, rb_unblock_function_t *ubf, void *ptr)
{
    struct ractor_waiter *w = ptr;
    const int nogvl_flags = RB_NOGVL_UBF_ASYNC_SAFE | RB_NOGVL_INTR_FAIL;

    RACTOR_UNLOCK(cr);
    rb_nogvl(ractor_wait_no_gvl, w, ubf, w, nogvl_flags);
    RACTOR_LOCK(cr);
}
// Wakes up waiters of ractor `r` by broadcasting on its condition variable;
// every waiting thread is signalled, so `th` is not used.
// The ractor lock is held by the caller.
static void
rb_ractor_sched_wakeup(rb_ractor_t *r, rb_thread_t *th)
{
    // ractor lock is acquired
    rb_native_cond_broadcast(&r->sync.wakeup_cond);
}
#endif
// Wakes up every thread waiting on ractor `r`: each waiter is taken off the
// list, gets `wakeup_status` recorded and is handed to the scheduler.
// The caller must not hold the lock of `r`. Returns true when at least one
// waiter was woken up.
static bool
ractor_wakeup_all(rb_ractor_t *r, enum ractor_wakeup_status wakeup_status)
{
    ASSERT_ractor_unlocking(r);
    RUBY_DEBUG_LOG("r:%u wakeup:%s", rb_ractor_id(r), wakeup_status_str(wakeup_status));

    bool woke_any = false;
    struct ractor_waiter *waiter;

    RACTOR_LOCK(r);
    while ((waiter = ccan_list_pop(&r->sync.waiters, struct ractor_waiter, node)) != NULL) {
        VM_ASSERT(waiter->wakeup_status == wakeup_none);

        waiter->wakeup_status = wakeup_status;
        rb_ractor_sched_wakeup(r, waiter->th);
        woke_any = true;
    }
    RACTOR_UNLOCK(r);

    return woke_any;
}
// Unblock function installed by ractor_wait(); `ptr` is the struct ractor_waiter.
// If the waiter has not been woken up yet, it is marked wakeup_by_interrupt,
// removed from the ractor's waiter list and woken up.
// NOTE(review): the function unlocks th->interrupt_lock first and re-locks it
// at the end, so it presumably runs with that lock held by the caller — confirm.
static void
ubf_ractor_wait(void *ptr)
{
struct ractor_waiter *waiter = (struct ractor_waiter *)ptr;
rb_thread_t *th = waiter->th;
rb_ractor_t *r = th->ractor;
// clear ubf and nobody can kick UBF
th->unblock.func = NULL;
th->unblock.arg = NULL;
// the interrupt lock is released while the ractor lock is taken
rb_native_mutex_unlock(&th->interrupt_lock);
{
RACTOR_LOCK(r);
{
// a status other than wakeup_none means somebody else already woke the
// waiter up and took it off the list
if (waiter->wakeup_status == wakeup_none) {
RUBY_DEBUG_LOG("waiter:%p", (void *)waiter);
waiter->wakeup_status = wakeup_by_interrupt;
ccan_list_del(&waiter->node);
rb_ractor_sched_wakeup(r, waiter->th);
}
}
RACTOR_UNLOCK(r);
}
rb_native_mutex_lock(&th->interrupt_lock);
}
// Blocks the current thread of ractor `cr` until it is woken up.
// Must be called with the ractor lock held; the lock is held again on return.
// A waiter living on this stack frame is put on cr->sync.waiters, then
// rb_ractor_sched_wait() does the actual blocking. Afterwards pending
// interrupts are checked with the ractor lock temporarily released.
// Returns the recorded wakeup status (wakeup_none if nobody set one).
static enum ractor_wakeup_status
ractor_wait(rb_execution_context_t *ec, rb_ractor_t *cr)
{
rb_thread_t *th = rb_ec_thread_ptr(ec);
struct ractor_waiter waiter = {
.wakeup_status = wakeup_none,
.th = th,
};
RUBY_DEBUG_LOG("wait%s", "");
ASSERT_ractor_locking(cr);
VM_ASSERT(GET_RACTOR() == cr);
VM_ASSERT(!ractor_waiter_included(cr, th));
ccan_list_add_tail(&cr->sync.waiters, &waiter.node);
// resume another ready thread and wait for an event
rb_ractor_sched_wait(ec, cr, ubf_ractor_wait, &waiter);
// whoever sets a status also unlinks the waiter; with no status set the
// waiter is still on the list and has to be unlinked here
if (waiter.wakeup_status == wakeup_none) {
ccan_list_del(&waiter.node);
}
RUBY_DEBUG_LOG("wakeup_status:%s", wakeup_status_str(waiter.wakeup_status));
// rb_ec_check_ints() runs without the ractor lock
RACTOR_UNLOCK_SELF(cr);
{
rb_ec_check_ints(ec);
}
RACTOR_LOCK_SELF(cr);
VM_ASSERT(!ractor_waiter_included(cr, th));
return waiter.wakeup_status;
}
// Moves every basket from the receive queue of `cr` to the queue of the port
// it is addressed to. The ractor lock must be held. `ec` is not used.
// NOTE(review): the destination queue is used without a NULL check, so every
// pending basket is assumed to target a port that still exists — confirm.
static void
ractor_deliver_incoming_messages(rb_execution_context_t *ec, rb_ractor_t *cr)
{
    ASSERT_ractor_locking(cr);

    struct ractor_queue *incoming = cr->sync.recv_queue;
    struct ractor_basket *basket;

    for (basket = ractor_queue_deq(cr, incoming);
         basket != NULL;
         basket = ractor_queue_deq(cr, incoming)) {
        struct ractor_queue *destination = ractor_get_queue(cr, basket->port_id, true);

        ractor_queue_enq(cr, destination, basket);
    }
}
// Takes all pending baskets out of the receive queue of `cr`.
// When there are any, `messages` is initialized and receives them, and true
// is returned. Otherwise `messages` is left untouched and false is returned.
// The ractor lock must be held.
static bool
ractor_check_received(rb_ractor_t *cr, struct ractor_queue *messages)
{
    struct ractor_queue *incoming = cr->sync.recv_queue;

    ASSERT_ractor_locking(cr);

    const bool received = !ractor_queue_empty_p(cr, incoming);

    if (received) {
        // messages <- incoming
        ractor_queue_init(messages);
        ractor_queue_move(messages, incoming);
    }
    else {
        RUBY_DEBUG_LOG("empty");
    }

    VM_ASSERT(ractor_queue_empty_p(cr, incoming));
    RUBY_DEBUG_LOG("received:%d", received);

    return received;
}
// One receive step for ractor `cr`: if messages are pending in the receive
// queue they are taken out under the lock and then, without the lock,
// distributed to the queues of their ports. If nothing is pending, the
// thread waits once (ractor_wait) and returns without distributing anything;
// callers loop.
static void
ractor_wait_receive(rb_execution_context_t *ec, rb_ractor_t *cr)
{
    struct ractor_queue messages;
    bool delivered = false;

    RACTOR_LOCK_SELF(cr);
    {
        delivered = ractor_check_received(cr, &messages);

        if (!delivered) {
            ractor_wait(ec, cr);
        }
    }
    RACTOR_UNLOCK_SELF(cr);

    if (!delivered) {
        return;
    }

    VM_ASSERT(!ractor_queue_empty_p(cr, &messages));

    struct ractor_basket *basket;

    while ((basket = ractor_queue_deq(cr, &messages)) != NULL) {
        struct ractor_queue *destination = ractor_get_queue(cr, basket->port_id, false);

        ractor_queue_enq(cr, destination, basket);
    }
}
// Non-blocking receive from port `rp` of the current ractor `cr`.
// Returns the received value, or Qundef when the port's queue is empty.
// Raises Ractor::ClosedError when the port no longer exists, and
// Ractor::RemoteError when the message is an exception basket.
// A closed queue is deleted as soon as its last message has been taken.
static VALUE
ractor_try_receive(rb_execution_context_t *ec, rb_ractor_t *cr, const struct ractor_port *rp)
{
    const st_data_t id = ractor_port_id(rp);
    struct ractor_queue *rq = ractor_get_queue(cr, id, false);

    if (rq == NULL) {
        rb_raise(rb_eRactorClosedError, "The port was already closed");
    }

    struct ractor_basket *basket = ractor_queue_deq(cr, rq);

    if (rq->closed && ractor_queue_empty_p(cr, rq)) {
        ractor_delete_port(cr, id, false);
    }

    return basket ? ractor_basket_accept(basket) : Qundef;
}
// Blocking receive from port `rp`, which must belong to the current ractor.
// Alternates between trying the port's queue and waiting for new messages
// until a value arrives. Exceptions raised by ractor_try_receive() propagate.
static VALUE
ractor_receive(rb_execution_context_t *ec, const struct ractor_port *rp)
{
    rb_ractor_t *cr = rb_ec_ractor_ptr(ec);

    VM_ASSERT(cr == rp->r);
    RUBY_DEBUG_LOG("port:%u", (unsigned int)ractor_port_id(rp));

    for (;;) {
        VALUE received = ractor_try_receive(ec, cr, rp);

        if (received != Qundef) {
            return received;
        }

        ractor_wait_receive(ec, cr);
    }
}
// Ractor#send
// Delivers basket `b` to port `rp`.
// Under the lock of the receiving ractor the basket is stamped with the port
// id and appended to that ractor's receive queue; its waiters are woken up
// afterwards. Ownership of `b` always passes to this function: on success the
// receiver frees it, and when the port is closed it is freed here.
// When the port is closed, Ractor::ClosedError is raised if `raise_on_error`
// is set; otherwise the message is dropped silently.
static void
ractor_send_basket(rb_execution_context_t *ec, const struct ractor_port *rp, struct ractor_basket *b, bool raise_on_error)
{
    bool closed = false;

    RUBY_DEBUG_LOG("port:%u@r%u b:%s v:%p", (unsigned int)ractor_port_id(rp), rb_ractor_id(rp->r), basket_type_name(b->type), (void *)b->p.v);

    RACTOR_LOCK(rp->r);
    {
        if (ractor_closed_port_p(ec, rp->r, rp)) {
            closed = true;
        }
        else {
            b->port_id = ractor_port_id(rp);
            ractor_queue_enq(rp->r, rp->r->sync.recv_queue, b);
        }
    }
    RACTOR_UNLOCK(rp->r);

    // NOTE: ref r -> b->p.v is created, but Ractor is unprotected object, so no problem on that.

    if (!closed) {
        ractor_wakeup_all(rp->r, wakeup_by_send);
        return;
    }

    RUBY_DEBUG_LOG("closed:%u@r%u", (unsigned int)ractor_port_id(rp), rb_ractor_id(rp->r));

    // The basket was never enqueued, so nobody else will free it. It has to
    // be released before rb_raise(), which does not return.
    ractor_basket_free(b);

    if (raise_on_error) {
        rb_raise(rb_eRactorClosedError, "The port was already closed");
    }
}
// Common part of ractor_send() and ractor_try_send(): wraps `obj` in a basket
// (moved when `move` is truthy, otherwise shared or copied) and sends it to
// port `rp`. `raise_on_error` selects whether a closed port raises.
// Returns the Ruby object of the receiving ractor.
static VALUE
ractor_send0(rb_execution_context_t *ec, const struct ractor_port *rp, VALUE obj, VALUE move, bool raise_on_error)
{
    const enum ractor_basket_type type = RTEST(move) ? basket_type_move : basket_type_none;
    struct ractor_basket *basket = ractor_basket_new(ec, obj, type, false);

    ractor_send_basket(ec, rp, basket, raise_on_error);
    RB_GC_GUARD(obj);

    return rp->r->pub.self;
}
// Sends `obj` to port `rp`; raises Ractor::ClosedError when the port is closed.
static VALUE
ractor_send(rb_execution_context_t *ec, const struct ractor_port *rp, VALUE obj, VALUE move)
{
    const bool raise_on_error = true;

    return ractor_send0(ec, rp, obj, move, raise_on_error);
}
// Sends `obj` to port `rp`; a closed port is ignored instead of raising.
static VALUE
ractor_try_send(rb_execution_context_t *ec, const struct ractor_port *rp, VALUE obj, VALUE move)
{
    const bool raise_on_error = false;

    return ractor_send0(ec, rp, obj, move, raise_on_error);
}
// Ractor::Selector
// Payload of a Ractor::Selector object: the set of ports to wait on.
struct ractor_selector {
// NOTE(review): never assigned in the visible code — confirm whether it is still needed
rb_ractor_t *r;
// port object (VALUE) -> its struct ractor_port
struct st_table *ports; // rpv -> rp
};
// st_foreach callback used by ractor_selector_mark(): marks the port object
// stored as the table key.
static int
ractor_selector_mark_i(st_data_t key, st_data_t val, st_data_t dmy)
{
    const VALUE port_obj = (VALUE)key; // rpv

    rb_gc_mark(port_obj);

    return ST_CONTINUE;
}
static void
ractor_selector_mark(void *ptr)
{
struct ractor_selector *s = ptr;
if (s->ports) {
st_foreach(s->ports, ractor_selector_mark_i, 0);
}
}
static void
ractor_selector_free(void *ptr)
{
struct ractor_selector *s = ptr;
st_free_table(s->ports);
ruby_xfree(ptr);
}
// GC memsize callback for Ractor::Selector: size of the payload plus the
// memory used by the port table, when the table exists.
static size_t
ractor_selector_memsize(const void *ptr)
{
    const struct ractor_selector *s = ptr;
    size_t size = sizeof(struct ractor_selector);

    // As in ractor_selector_mark(), the table may be missing.
    if (s->ports) {
        size += st_memsize(s->ports);
    }

    return size;
}
// TypedData descriptor for Ractor::Selector objects (payload: struct ractor_selector).
static const rb_data_type_t ractor_selector_data_type = {
"ractor/selector",
{
ractor_selector_mark,
ractor_selector_free,
ractor_selector_memsize,
NULL, // update
},
0, 0, RUBY_TYPED_FREE_IMMEDIATELY,
};
// Unwraps a Ractor::Selector object into its struct ractor_selector payload.
// The type is only verified by VM_ASSERT.
static struct ractor_selector *
RACTOR_SELECTOR_PTR(VALUE selv)
{
    VM_ASSERT(rb_typeddata_is_kind_of(selv, &ractor_selector_data_type));

    struct ractor_selector *selector = (struct ractor_selector *)DATA_PTR(selv);

    return selector;
}
// Ractor::Selector.new
// Creates a selector object of class `klass` with an empty port table.
static VALUE
ractor_selector_create(VALUE klass)
{
    struct ractor_selector *selector;
    VALUE selector_obj = TypedData_Make_Struct(klass, struct ractor_selector, &ractor_selector_data_type, selector);

    selector->ports = st_init_numtable(); // TODO

    return selector_obj;
}
// Ractor::Selector#add(port)

/*
 *  call-seq:
 *    add(port) -> self
 *
 *  Adds _port_ (a Ractor::Port) to +self+.  Raises ArgumentError if _port_ is
 *  not a Ractor::Port or is already added.
 *  Returns +self+.
 */
static VALUE
ractor_selector_add(VALUE selv, VALUE rpv)
{
    if (!ractor_port_p(rpv)) {
        rb_raise(rb_eArgError, "Not a Ractor::Port object");
    }

    struct ractor_selector *selector = RACTOR_SELECTOR_PTR(selv);
    const struct ractor_port *port = RACTOR_PORT_PTR(rpv);
    const st_data_t key = (st_data_t)rpv;

    if (st_lookup(selector->ports, key, NULL)) {
        rb_raise(rb_eArgError, "already added");
    }

    // port object -> its payload; the object is kept alive by ractor_selector_mark()
    st_insert(selector->ports, key, (st_data_t)port);

    return selv;
}
// Ractor::Selector#remove(port)

/*
 *  call-seq:
 *    remove(port) -> self
 *
 *  Removes _port_ (a Ractor::Port) from +self+.  Raises ArgumentError if
 *  _port_ is not a Ractor::Port or has not been added.
 *  Returns +self+.
 */
static VALUE
ractor_selector_remove(VALUE selv, VALUE rpv)
{
    if (!ractor_port_p(rpv)) {
        rb_raise(rb_eArgError, "Not a Ractor::Port object");
    }

    struct ractor_selector *selector = RACTOR_SELECTOR_PTR(selv);
    st_data_t key = (st_data_t)rpv;

    // st_delete() reports whether the key was present, so no separate lookup
    // is needed
    if (!st_delete(selector->ports, &key, NULL)) {
        rb_raise(rb_eArgError, "not added yet");
    }

    return selv;
}
// Ractor::Selector#clear

/*
 *  call-seq:
 *    clear -> self
 *
 *  Removes all ports from +self+.  Returns +self+.
 */
static VALUE
ractor_selector_clear(VALUE selv)
{
    struct ractor_selector *selector = RACTOR_SELECTOR_PTR(selv);

    st_clear(selector->ports);

    return selv;
}
/*
 *  call-seq:
 *    empty? -> true or false
 *
 *  Returns +true+ if no port is added.
 */
static VALUE
ractor_selector_empty_p(VALUE selv)
{
    struct ractor_selector *selector = RACTOR_SELECTOR_PTR(selv);

    if (selector->ports->num_entries == 0) {
        return Qtrue;
    }

    return Qfalse;
}
// Ractor::Selector#wait
// Argument block passed through st_foreach() to ractor_selector_wait_i().
struct ractor_selector_wait_data {
// in: current ractor
rb_ractor_t *cr;
// in: current execution context
rb_execution_context_t *ec;
// out: true once some port yielded a value
bool found;
// out: the received value (valid when `found`)
VALUE v;
// out: the port object the value came from (valid when `found`)
VALUE rpv;
};
// st_foreach callback used by ractor_selector__wait(): tries a non-blocking
// receive on one port (key: port object, val: its struct ractor_port).
// On success the result is recorded in the wait data and the iteration stops.
static int
ractor_selector_wait_i(st_data_t key, st_data_t val, st_data_t data)
{
    struct ractor_selector_wait_data *wait = (struct ractor_selector_wait_data *)data;
    const struct ractor_port *port = (const struct ractor_port *)val;

    VALUE received = ractor_try_receive(wait->ec, wait->cr, port);

    if (received == Qundef) {
        return ST_CONTINUE; // nothing on this port; try the next one
    }

    wait->found = true;
    wait->v = received;
    wait->rpv = (VALUE)key;

    return ST_STOP;
}
// Waits until one of the ports registered in `selector` has a message.
// All ports are polled in turn; when none has a message the thread waits for
// new incoming messages and polls again.
// Returns a two-element array: [port object, received value].
// Exceptions raised by ractor_try_receive() propagate.
static VALUE
ractor_selector__wait(rb_execution_context_t *ec, VALUE selector)
{
    rb_ractor_t *cr = rb_ec_ractor_ptr(ec);
    struct ractor_selector *s = RACTOR_SELECTOR_PTR(selector);

    struct ractor_selector_wait_data wait = {
        .ec = ec,
        .cr = cr,
        .found = false,
    };

    for (;;) {
        st_foreach(s->ports, ractor_selector_wait_i, (st_data_t)&wait);

        if (wait.found) {
            break;
        }

        ractor_wait_receive(ec, cr);
    }

    return rb_ary_new_from_args(2, wait.rpv, wait.v);
}
/*
 *  call-seq:
 *    wait -> [port, value]
 *
 *  Waits until any port added to +self+ receives a message and returns the
 *  port together with the received value.
 */
static VALUE
ractor_selector_wait(VALUE selector)
{
    rb_execution_context_t *ec = GET_EC();

    return ractor_selector__wait(ec, selector);
}
// Ractor::Selector.new(*ports): creates a selector of class `klass` and adds
// each of the `argc` objects in `ractors` to it, in order.
// ractor_selector_add() raises for anything that is not a Ractor::Port and
// for duplicates.
static VALUE
ractor_selector_new(int argc, VALUE *ractors, VALUE klass)
{
    VALUE selector = ractor_selector_create(klass);
    int idx = 0;

    while (idx < argc) {
        ractor_selector_add(selector, ractors[idx]);
        idx++;
    }

    return selector;
}
// Backend of Ractor.select: builds a temporary selector from the array
// `ports` and waits on it. Returns [port object, received value].
// `self` is not used.
static VALUE
ractor_select_internal(rb_execution_context_t *ec, VALUE self, VALUE ports)
{
    const int port_count = RARRAY_LENINT(ports);
    VALUE *port_values = (VALUE *)RARRAY_CONST_PTR(ports);

    VALUE selector = ractor_selector_new(port_count, port_values, rb_cRactorSelector);
    VALUE result = ractor_selector__wait(ec, selector);

    // keep both objects alive while their internals are in use above
    RB_GC_GUARD(selector);
    RB_GC_GUARD(ports);

    return result;
}
#ifndef USE_RACTOR_SELECTOR
#define USE_RACTOR_SELECTOR 0
#endif
RUBY_SYMBOL_EXPORT_BEGIN
void rb_init_ractor_selector(void);
RUBY_SYMBOL_EXPORT_END
/*
* Document-class: Ractor::Selector
* :nodoc: currently
*
* Selects multiple Ractors to be activated.
*/
// Defines Ractor::Selector and its methods. Instances are created only
// through the singleton `new` (the allocator is undefined).
void
rb_init_ractor_selector(void)
{
rb_cRactorSelector = rb_define_class_under(rb_cRactor, "Selector", rb_cObject);
rb_undef_alloc_func(rb_cRactorSelector);
// arity -1: `new` receives (argc, argv, klass)
rb_define_singleton_method(rb_cRactorSelector, "new", ractor_selector_new , -1);
rb_define_method(rb_cRactorSelector, "add", ractor_selector_add, 1);
rb_define_method(rb_cRactorSelector, "remove", ractor_selector_remove, 1);
rb_define_method(rb_cRactorSelector, "clear", ractor_selector_clear, 0);
rb_define_method(rb_cRactorSelector, "empty?", ractor_selector_empty_p, 0);
rb_define_method(rb_cRactorSelector, "wait", ractor_selector_wait, 0);
}
// Defines Ractor::Port with its allocator, `initialize` and `initialize_copy`.
// Ractor::Selector is set up as well when USE_RACTOR_SELECTOR is enabled.
// NOTE(review): the remaining port methods (receive, send, close, closed?)
// are not registered here — presumably they are bound elsewhere; confirm.
static void
Init_RactorPort(void)
{
rb_cRactorPort = rb_define_class_under(rb_cRactor, "Port", rb_cObject);
rb_define_alloc_func(rb_cRactorPort, ractor_port_alloc);
rb_define_method(rb_cRactorPort, "initialize", ractor_port_initialzie, 0);
rb_define_method(rb_cRactorPort, "initialize_copy", ractor_port_initialzie_copy, 1);
#if USE_RACTOR_SELECTOR
rb_init_ractor_selector();
#endif
}