ruby/thread_pthread.h
Koichi Sasada ef2bb61018 Ractor::Port
* Added `Ractor::Port`
  * `Ractor::Port#receive` (supports multiple threads)
  * `Ractor::Port#close`
  * `Ractor::Port#closed?`
* Added some methods
  * `Ractor#join`
  * `Ractor#value`
  * `Ractor#monitor`
  * `Ractor#unmonitor`
* Removed some methods
  * `Ractor#take`
  * `Ractor.yield`
* Changed the spec
  * `Ractor.select`

You can wait for multiple sequences of messages with `Ractor::Port`.

```ruby
ports = 3.times.map{ Ractor::Port.new }

ports.each.with_index do |port, ri|
  Ractor.new port, ri do |port, ri|
    3.times{|i| port << "r#{ri}-#{i}"}
  end
end

ports.each{|port| pp 3.times.map{port.receive}}
```

In this example, we create 3 ports, and each of the 3 Ractors sends its messages to its own port.
We can then receive each port's series of messages.
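Since messages from a single sender should arrive in the order they were sent, the first port would print `["r0-0", "r0-1", "r0-2"]`, and so on for the others (ordering across different ports is independent).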

You can use `Ractor#value` to get the last value of a Ractor's block:

```ruby
result = Ractor.new do
  heavy_task()
end.value
```

You can wait for the termination of a Ractor with `Ractor#join` like this:

```ruby
Ractor.new do
  some_task()
end.join
```

`#value` and `#join` are similar to `Thread#value` and `Thread#join`.
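
For instance, collecting results from several Ractors mirrors the familiar `Thread` pattern. A small illustration using only the methods above:

```ruby
ractors = 3.times.map{|i| Ractor.new(i){|n| n * 2}}
p ractors.map(&:value) #=> [0, 2, 4]
```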

To implement `#join`, `Ractor#monitor` and `Ractor#unmonitor` are introduced.
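
A minimal sketch of a `#join`-style helper built on these, assuming `Ractor#monitor(port)` delivers a termination notification to the port when the Ractor finishes (the exact notification value is left unspecified here):

```ruby
# Hypothetical helper, not the actual implementation of Ractor#join.
# Assumes monitor(port) sends a notification to port on termination.
def wait_for(ractor)
  port = Ractor::Port.new
  ractor.monitor(port)
  port.receive # blocks until the termination notification arrives
  ractor
ensure
  port&.close
end
```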

This commit also changes the `Ractor.select` method.
It now accepts only ports or Ractors, and returns when a port receives a message or a Ractor terminates.
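
For example (a sketch; we assume the return value pairs the ready object with the received message or the Ractor's result):

```ruby
port = Ractor::Port.new
worker = Ractor.new(port){|p| p << :progress; :done}

# Returns when the port receives a message or the worker terminates.
# Assumed return shape: [ready_port_or_ractor, value].
ready, value = Ractor.select(port, worker)
```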

We removed `Ractor.yield` and `Ractor#take` because:
* `Ractor::Port` covers most of the same use cases in a simpler manner (see the sketch after this list).
* Removing them significantly simplifies the code.
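
For instance, a producer that used `Ractor.yield` with a consumer calling `Ractor#take` can be rewritten with a port. A sketch, where `:msg` is just an illustrative payload:

```ruby
# Old style (removed):  r = Ractor.new{ Ractor.yield :msg }; r.take
# Port style:
port = Ractor::Port.new
Ractor.new(port){|p| p << :msg}
p port.receive #=> :msg
```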

We also changed the internal thread scheduler code (thread_pthread.c):
* During barrier synchronization, we keep the `ractor_sched` lock to avoid deadlocks.
  This lock is released by `rb_ractor_sched_barrier_end()`
  which is called at the end of operations that require the barrier.
* Fixed potential deadlock issues by checking interrupts just before setting the UBF (unblock function).

https://bugs.ruby-lang.org/issues/21262
2025-05-31 04:01:33 +09:00

#ifndef RUBY_THREAD_PTHREAD_H
#define RUBY_THREAD_PTHREAD_H

/**********************************************************************

  thread_pthread.h -

  $Author$

  Copyright (C) 2004-2007 Koichi Sasada

**********************************************************************/

#ifdef HAVE_PTHREAD_NP_H
#include <pthread_np.h>
#endif

#define RB_NATIVETHREAD_LOCK_INIT PTHREAD_MUTEX_INITIALIZER
#define RB_NATIVETHREAD_COND_INIT PTHREAD_COND_INITIALIZER

// this data should be protected by timer_th.waiting_lock
struct rb_thread_sched_waiting {
    enum thread_sched_waiting_flag {
        thread_sched_waiting_none     = 0x00,
        thread_sched_waiting_timeout  = 0x01,
        thread_sched_waiting_io_read  = 0x02,
        thread_sched_waiting_io_write = 0x08,
        thread_sched_waiting_io_force = 0x40, // ignore readable
    } flags;

    struct {
        // should be compat with hrtime.h
#ifdef MY_RUBY_BUILD_MAY_TIME_TRAVEL
        int128_t timeout;
#else
        uint64_t timeout;
#endif
        int fd; // -1 for timeout only
        int result;
    } data;

    // connected to timer_th.waiting
    struct ccan_list_node node;
};

// per-Thread scheduler helper data
struct rb_thread_sched_item {
    struct {
        struct ccan_list_node ubf;

        // connected to ractor->threads.sched.readyq
        // locked by ractor->threads.sched.lock
        struct ccan_list_node readyq;

        // connected to vm->ractor.sched.timeslice_threads
        // locked by vm->ractor.sched.lock
        struct ccan_list_node timeslice_threads;

        // connected to vm->ractor.sched.running_threads
        // locked by vm->ractor.sched.lock
        struct ccan_list_node running_threads;

        // connected to vm->ractor.sched.zombie_threads
        struct ccan_list_node zombie_threads;
    } node;

    struct rb_thread_sched_waiting waiting_reason;

    bool finished;
    bool malloc_stack;
    void *context_stack;
    struct coroutine_context *context;
};

struct rb_native_thread {
    rb_atomic_t serial;
    struct rb_vm_struct *vm;

    rb_nativethread_id_t thread_id;
#ifdef RB_THREAD_T_HAS_NATIVE_ID
    int tid;
#endif

    struct rb_thread_struct *running_thread;

    // to control native thread
#if defined(__GLIBC__) || defined(__FreeBSD__)
    union
#else
    /*
     * assume the platform condvars are badly implemented and have a
     * "memory" of which mutex they're associated with
     */
    struct
#endif
    {
        rb_nativethread_cond_t intr;   /* th->interrupt_lock */
        rb_nativethread_cond_t readyq; /* use sched->lock */
    } cond;

#ifdef USE_SIGALTSTACK
    void *altstack;
#endif

    struct coroutine_context *nt_context;
    int dedicated;

    size_t machine_stack_maxsize;
};
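
// these names may be defined as macros by platform headers
// (assumption: e.g. SEH-style try/except/finally/leave);
// undefine them so they do not clash with identifiers below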
#undef except
#undef try
#undef leave
#undef finally

// per-Ractor
struct rb_thread_sched {
    rb_nativethread_lock_t lock_;
#if VM_CHECK_MODE
    struct rb_thread_struct *lock_owner;
#endif
    struct rb_thread_struct *running; // running thread or NULL
    bool is_running;
    bool is_running_timeslice;
    bool enable_mn_threads;

    struct ccan_list_head readyq;
    int readyq_cnt;

    // ractor scheduling
    struct ccan_list_node grq_node;
};

#ifdef RB_THREAD_LOCAL_SPECIFIER
NOINLINE(void rb_current_ec_set(struct rb_execution_context_struct *));

# if defined(__arm64__) || defined(__aarch64__)
// on Arm64, TLS can not be accessed across .so
NOINLINE(struct rb_execution_context_struct *rb_current_ec(void));
# else
RUBY_EXTERN RB_THREAD_LOCAL_SPECIFIER struct rb_execution_context_struct *ruby_current_ec;

// for RUBY_DEBUG_LOG()
RUBY_EXTERN RB_THREAD_LOCAL_SPECIFIER rb_atomic_t ruby_nt_serial;
#define RUBY_NT_SERIAL 1
# endif
#else
typedef pthread_key_t native_tls_key_t;

static inline void *
native_tls_get(native_tls_key_t key)
{
    // return value should be checked by caller
    return pthread_getspecific(key);
}

static inline void
native_tls_set(native_tls_key_t key, void *ptr)
{
    if (UNLIKELY(pthread_setspecific(key, ptr) != 0)) {
        rb_bug("pthread_setspecific error");
    }
}

RUBY_EXTERN native_tls_key_t ruby_current_ec_key;
#endif

struct rb_ractor_struct;

void rb_ractor_sched_wait(struct rb_execution_context_struct *ec, struct rb_ractor_struct *cr, rb_unblock_function_t *ubf, void *ptr);
void rb_ractor_sched_wakeup(struct rb_ractor_struct *r, struct rb_thread_struct *th);

#endif /* RUBY_THREAD_PTHREAD_H */