2020-08-20 13:51:45 +12:00
/**********************************************************************
scheduler . c
$ Author $
Copyright ( C ) 2020 Samuel Grant Dawson Williams
* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
2020-10-16 14:25:58 +13:00
# include "vm_core.h"
2021-02-09 19:39:56 +13:00
# include "ruby/fiber/scheduler.h"
2020-08-20 13:51:45 +12:00
# include "ruby/io.h"
2021-07-02 22:41:16 +12:00
# include "ruby/io/buffer.h"
2024-11-20 19:40:17 +13:00
# include "ruby/thread.h"
2025-06-05 12:49:02 +09:00
// For `ruby_thread_has_gvl_p`:
2021-07-16 15:22:17 +12:00
# include "internal/thread.h"
2020-08-20 13:51:45 +12:00
2025-06-05 12:49:02 +09:00
// For atomic operations:
# include "ruby_atomic.h"
2020-09-20 11:34:02 +12:00
static ID id_close ;
2021-07-19 10:14:51 +12:00
static ID id_scheduler_close ;
2020-09-20 11:34:02 +12:00
2020-09-17 14:30:40 +02:00
static ID id_block ;
static ID id_unblock ;
2020-09-20 11:34:02 +12:00
2020-12-26 22:09:49 +13:00
static ID id_timeout_after ;
2020-09-20 11:34:02 +12:00
static ID id_kernel_sleep ;
2020-12-08 09:29:09 +13:00
static ID id_process_wait ;
2020-09-20 11:34:02 +12:00
2021-12-23 12:20:09 +13:00
static ID id_io_read , id_io_pread ;
static ID id_io_write , id_io_pwrite ;
2020-08-20 13:51:45 +12:00
static ID id_io_wait ;
2022-10-15 19:59:04 +13:00
static ID id_io_select ;
2021-07-02 22:41:16 +12:00
static ID id_io_close ;
2020-08-20 13:51:45 +12:00
2021-06-14 16:21:08 +12:00
static ID id_address_resolve ;
2024-11-20 19:40:17 +13:00
static ID id_blocking_operation_wait ;
2025-05-23 14:55:05 +09:00
static ID id_fiber_interrupt ;
2024-11-20 19:40:17 +13:00
2022-10-15 21:43:45 +13:00
static ID id_fiber_schedule ;
2025-06-05 12:49:02 +09:00
// Our custom blocking operation class (anonymous; see Init_Fiber_Scheduler).
static VALUE rb_cFiberSchedulerBlockingOperation;

/*
 * Custom blocking operation structure for blocking operations.
 * This replaces the use of Ruby procs to avoid use-after-free issues
 * and provides a cleaner C API for native work pools.
 */
typedef enum {
    RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_QUEUED,    // Submitted but not started
    RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_EXECUTING, // Currently running
    RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_COMPLETED, // Finished (success/error)
    RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_CANCELLED  // Cancelled
} rb_fiber_blocking_operation_status_t;

struct rb_fiber_scheduler_blocking_operation {
    void *(*function)(void *);  // The blocking function to invoke.
    void *data;                 // Argument passed to `function`.

    rb_unblock_function_t *unblock_function; // Used to interrupt `function` (may be NULL).
    void *data2;                // Argument passed to `unblock_function`.

    int flags;                  // Flags forwarded to rb_nogvl (see blocking_operation_call).
    struct rb_fiber_scheduler_blocking_operation_state *state; // Receives result and saved errno.

    // Execution status; transitioned via RUBY_ATOMIC_CAS so executors and
    // cancellation can race safely.
    volatile rb_atomic_t status;
};
static void
blocking_operation_mark ( void * ptr )
{
// No Ruby objects to mark in our struct
}
// GC free callback: the struct owns no auxiliary allocations, so releasing
// the struct itself is sufficient.
static void
blocking_operation_free(void *ptr)
{
    ruby_xfree(ptr);
}
// GC memsize callback: the struct is fixed-size with no owned allocations,
// so the size does not depend on the particular instance.
static size_t
blocking_operation_memsize(const void *ptr)
{
    (void)ptr;

    return sizeof(rb_fiber_scheduler_blocking_operation_t);
}
// TypedData description for BlockingOperation instances; freed immediately
// on GC since the struct contains no references to other Ruby objects.
static const rb_data_type_t blocking_operation_data_type = {
    "Fiber::Scheduler::BlockingOperation",
    {
        blocking_operation_mark,
        blocking_operation_free,
        blocking_operation_memsize,
    },
    0, 0, RUBY_TYPED_FREE_IMMEDIATELY | RUBY_TYPED_WB_PROTECTED
};
/*
 * Allocate a new blocking operation in the QUEUED state, with every field
 * cleared. The fields are filled in later by
 * rb_fiber_scheduler_blocking_operation_new.
 */
static VALUE
blocking_operation_alloc(VALUE klass)
{
    rb_fiber_scheduler_blocking_operation_t *operation;
    VALUE object = TypedData_Make_Struct(klass, rb_fiber_scheduler_blocking_operation_t, &blocking_operation_data_type, operation);

    operation->function = NULL;
    operation->data = NULL;
    operation->unblock_function = NULL;
    operation->data2 = NULL;
    operation->flags = 0;
    operation->state = NULL;
    operation->status = RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_QUEUED;

    return object;
}
/*
 * Get the blocking operation struct from a Ruby object.
 *
 * Raises TypeError (via TypedData_Get_Struct) if `obj` is not a
 * BlockingOperation instance.
 */
static rb_fiber_scheduler_blocking_operation_t *
get_blocking_operation(VALUE obj)
{
    rb_fiber_scheduler_blocking_operation_t *blocking_operation;
    TypedData_Get_Struct(obj, rb_fiber_scheduler_blocking_operation_t, &blocking_operation_data_type, blocking_operation);
    return blocking_operation;
}
/*
 * Document-method: Fiber::Scheduler::BlockingOperation#call
 *
 * Execute the blocking operation. This method releases the GVL and calls
 * the blocking function, then restores the errno value.
 *
 * Returns nil. The actual result is stored in the associated state object.
 */
static VALUE
blocking_operation_call(VALUE self)
{
    rb_fiber_scheduler_blocking_operation_t *blocking_operation = get_blocking_operation(self);

    if (blocking_operation->status != RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_QUEUED) {
        rb_raise(rb_eRuntimeError, "Blocking operation has already been executed!");
    }

    if (blocking_operation->function == NULL) {
        rb_raise(rb_eRuntimeError, "Blocking operation has no function to execute!");
    }

    if (blocking_operation->state == NULL) {
        rb_raise(rb_eRuntimeError, "Blocking operation has no result object!");
    }

    // Atomically transition QUEUED -> EXECUTING. A plain store would race
    // with cancellation and with rb_fiber_scheduler_blocking_operation_execute,
    // both of which use the same CAS protocol on `status`.
    rb_atomic_t expected = RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_QUEUED;
    if (RUBY_ATOMIC_CAS(blocking_operation->status, expected, RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_EXECUTING) != expected) {
        rb_raise(rb_eRuntimeError, "Blocking operation has already been executed!");
    }

    // Execute the blocking operation without the GVL:
    blocking_operation->state->result = rb_nogvl(blocking_operation->function, blocking_operation->data,
                                                 blocking_operation->unblock_function, blocking_operation->data2,
                                                 blocking_operation->flags);
    blocking_operation->state->saved_errno = rb_errno();

    // Mark as completed, unless it was cancelled while executing — in that
    // case, preserve the CANCELLED status:
    expected = RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_EXECUTING;
    RUBY_ATOMIC_CAS(blocking_operation->status, expected, RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_COMPLETED);

    return Qnil;
}
/*
 * C API: Extract blocking operation struct from Ruby object (GVL required).
 *
 * This function safely extracts the opaque struct from a BlockingOperation
 * VALUE while holding the GVL. The returned pointer can be passed to worker
 * threads and used with rb_fiber_scheduler_blocking_operation_execute.
 *
 * Returns the opaque struct pointer on success; raises TypeError for a
 * non-BlockingOperation argument. Must be called while holding the GVL.
 */
rb_fiber_scheduler_blocking_operation_t *
rb_fiber_scheduler_blocking_operation_extract(VALUE self)
{
    return get_blocking_operation(self);
}
/*
 * C API: Execute blocking operation from opaque struct (GVL not required).
 *
 * This function executes a blocking operation using the opaque struct pointer
 * obtained from rb_fiber_scheduler_blocking_operation_extract.
 * It can be called from native threads without holding the GVL.
 *
 * Returns 0 on success, -1 on error (invalid operation, already executed,
 * or cancelled).
 */
int
rb_fiber_scheduler_blocking_operation_execute(rb_fiber_scheduler_blocking_operation_t *blocking_operation)
{
    if (blocking_operation == NULL) {
        return -1;
    }

    if (blocking_operation->function == NULL || blocking_operation->state == NULL) {
        return -1; // Invalid blocking operation
    }

    // Atomically check if we can transition from QUEUED to EXECUTING:
    rb_atomic_t expected = RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_QUEUED;
    if (RUBY_ATOMIC_CAS(blocking_operation->status, expected, RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_EXECUTING) != expected) {
        // Already cancelled or in wrong state:
        return -1;
    }

    // Now we're executing - call the function. Plain `errno` is used (rather
    // than rb_errno()) because this path may run without the GVL:
    blocking_operation->state->result = blocking_operation->function(blocking_operation->data);
    blocking_operation->state->saved_errno = errno;

    // Atomically transition to completed (unless cancelled during execution):
    expected = RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_EXECUTING;
    if (RUBY_ATOMIC_CAS(blocking_operation->status, expected, RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_COMPLETED) == expected) {
        // Successfully completed:
        return 0;
    } else {
        // Was cancelled during execution; report interruption to the caller:
        blocking_operation->state->saved_errno = EINTR;
        return -1;
    }
}
2022-10-15 21:43:45 +13:00
/*
2025-06-05 12:49:02 +09:00
* C API : Create a new blocking operation
*
* This creates a blocking operation that can be executed by native work pools .
* The blocking operation holds references to the function and data safely .
*/
/*
 * C API: Create a new blocking operation.
 *
 * This creates a blocking operation that can be executed by native work
 * pools. The blocking operation holds references to the function and data
 * safely. Ownership of `state` remains with the caller; it must outlive the
 * operation's execution.
 */
VALUE
rb_fiber_scheduler_blocking_operation_new(void *(*function)(void *), void *data,
    rb_unblock_function_t *unblock_function, void *data2,
    int flags, struct rb_fiber_scheduler_blocking_operation_state *state)
{
    VALUE self = blocking_operation_alloc(rb_cFiberSchedulerBlockingOperation);
    rb_fiber_scheduler_blocking_operation_t *operation = get_blocking_operation(self);

    // The allocator left the operation zeroed and QUEUED; fill it in:
    operation->function = function;
    operation->data = data;
    operation->unblock_function = unblock_function;
    operation->data2 = data2;
    operation->flags = flags;
    operation->state = state;

    return self;
}
/*
*
2022-10-15 21:43:45 +13:00
* Document - class : Fiber : : Scheduler
*
* This is not an existing class , but documentation of the interface that Scheduler
* object should comply to in order to be used as argument to Fiber . scheduler and handle non - blocking
* fibers . See also the " Non-blocking fibers " section in Fiber class docs for explanations
* of some concepts .
*
* Scheduler ' s behavior and usage are expected to be as follows :
*
* * When the execution in the non - blocking Fiber reaches some blocking operation ( like
* sleep , wait for a process , or a non - ready I / O ) , it calls some of the scheduler ' s
* hook methods , listed below .
* * Scheduler somehow registers what the current fiber is waiting on , and yields control
* to other fibers with Fiber . yield ( so the fiber would be suspended while expecting its
* wait to end , and other fibers in the same thread can perform )
* * At the end of the current thread execution , the scheduler ' s method # scheduler_close is called
* * The scheduler runs into a wait loop , checking all the blocked fibers ( which it has
* registered on hook calls ) and resuming them when the awaited resource is ready
* ( e . g . I / O ready or sleep time elapsed ) .
*
* This way concurrent execution will be achieved transparently for every
* individual Fiber ' s code .
*
* Scheduler implementations are provided by gems , like
 * Async[https://github.com/socketry/async].
*
* Hook methods are :
*
* * # io_wait , # io_read , # io_write , # io_pread , # io_pwrite , and # io_select , # io_close
* * # process_wait
* * # kernel_sleep
* * # timeout_after
* * # address_resolve
* * # block and # unblock
2024-12-13 14:21:15 -05:00
* * # blocking_operation_wait
2022-10-15 21:43:45 +13:00
* * ( the list is expanded as Ruby developers make more methods having non - blocking calls )
*
* When not specified otherwise , the hook implementations are mandatory : if they are not
* implemented , the methods trying to call hook will fail . To provide backward compatibility ,
* in the future hooks will be optional ( if they are not implemented , due to the scheduler
* being created for the older Ruby version , the code which needs this hook will not fail ,
* and will just behave in a blocking fashion ) .
*
* It is also strongly recommended that the scheduler implements the # fiber method , which is
* delegated to by Fiber . schedule .
*
* Sample _toy_ implementation of the scheduler can be found in Ruby ' s code , in
* < tt > test / fiber / scheduler . rb < / tt >
*
*/
2020-08-20 13:51:45 +12:00
// Intern all scheduler hook method names and set up the internal
// BlockingOperation class. Called once during VM initialization.
void
Init_Fiber_Scheduler(void)
{
    id_close = rb_intern_const("close");
    id_scheduler_close = rb_intern_const("scheduler_close");

    id_block = rb_intern_const("block");
    id_unblock = rb_intern_const("unblock");

    id_timeout_after = rb_intern_const("timeout_after");
    id_kernel_sleep = rb_intern_const("kernel_sleep");
    id_process_wait = rb_intern_const("process_wait");

    id_io_read = rb_intern_const("io_read");
    id_io_pread = rb_intern_const("io_pread");
    id_io_write = rb_intern_const("io_write");
    id_io_pwrite = rb_intern_const("io_pwrite");

    id_io_wait = rb_intern_const("io_wait");
    id_io_select = rb_intern_const("io_select");
    id_io_close = rb_intern_const("io_close");

    id_address_resolve = rb_intern_const("address_resolve");

    id_blocking_operation_wait = rb_intern_const("blocking_operation_wait");
    id_fiber_interrupt = rb_intern_const("fiber_interrupt");

    // Note: the hook is named #fiber, not #fiber_schedule:
    id_fiber_schedule = rb_intern_const("fiber");

    // Define an anonymous BlockingOperation class for internal use only.
    // This is completely hidden from Ruby code and cannot be instantiated directly:
    rb_cFiberSchedulerBlockingOperation = rb_class_new(rb_cObject);
    rb_define_alloc_func(rb_cFiberSchedulerBlockingOperation, blocking_operation_alloc);
    rb_define_method(rb_cFiberSchedulerBlockingOperation, "call", blocking_operation_call, 0);

    // Register the anonymous class as a GC root so it doesn't get collected:
    rb_gc_register_mark_object(rb_cFiberSchedulerBlockingOperation);

#if 0 /* for RDoc */
    rb_cFiberScheduler = rb_define_class_under(rb_cFiber, "Scheduler", rb_cObject);
    rb_define_method(rb_cFiberScheduler, "close", rb_fiber_scheduler_close, 0);
    rb_define_method(rb_cFiberScheduler, "process_wait", rb_fiber_scheduler_process_wait, 2);
    rb_define_method(rb_cFiberScheduler, "io_wait", rb_fiber_scheduler_io_wait, 3);
    rb_define_method(rb_cFiberScheduler, "io_read", rb_fiber_scheduler_io_read, 4);
    rb_define_method(rb_cFiberScheduler, "io_write", rb_fiber_scheduler_io_write, 4);
    rb_define_method(rb_cFiberScheduler, "io_pread", rb_fiber_scheduler_io_pread, 5);
    rb_define_method(rb_cFiberScheduler, "io_pwrite", rb_fiber_scheduler_io_pwrite, 5);
    rb_define_method(rb_cFiberScheduler, "io_select", rb_fiber_scheduler_io_select, 4);
    rb_define_method(rb_cFiberScheduler, "kernel_sleep", rb_fiber_scheduler_kernel_sleep, 1);
    rb_define_method(rb_cFiberScheduler, "address_resolve", rb_fiber_scheduler_address_resolve, 1);
    rb_define_method(rb_cFiberScheduler, "timeout_after", rb_fiber_scheduler_timeout_after, 3);
    rb_define_method(rb_cFiberScheduler, "block", rb_fiber_scheduler_block, 2);
    rb_define_method(rb_cFiberScheduler, "unblock", rb_fiber_scheduler_unblock, 2);
    rb_define_method(rb_cFiberScheduler, "fiber", rb_fiber_scheduler_fiber, -2);
    rb_define_method(rb_cFiberScheduler, "blocking_operation_wait", rb_fiber_scheduler_blocking_operation_wait, -2);
#endif
}
2020-10-16 14:25:58 +13:00
// Return the current thread's fiber scheduler (Qnil if none is set).
// Must be called while holding the GVL.
VALUE
rb_fiber_scheduler_get(void)
{
    RUBY_ASSERT(ruby_thread_has_gvl_p());

    rb_thread_t *thread = GET_THREAD();
    RUBY_ASSERT(thread);

    return thread->scheduler;
}
2021-06-14 17:56:53 +12:00
// Validate that `scheduler` implements the mandatory hooks (#block, #unblock,
// #kernel_sleep, #io_wait), raising ArgumentError for the first one missing.
// #fiber_interrupt is only recommended, so its absence just produces a warning.
static void
verify_interface(VALUE scheduler)
{
    if (!rb_respond_to(scheduler, id_block)) {
        rb_raise(rb_eArgError, "Scheduler must implement #block");
    }

    if (!rb_respond_to(scheduler, id_unblock)) {
        rb_raise(rb_eArgError, "Scheduler must implement #unblock");
    }

    if (!rb_respond_to(scheduler, id_kernel_sleep)) {
        rb_raise(rb_eArgError, "Scheduler must implement #kernel_sleep");
    }

    if (!rb_respond_to(scheduler, id_io_wait)) {
        rb_raise(rb_eArgError, "Scheduler must implement #io_wait");
    }

    if (!rb_respond_to(scheduler, id_fiber_interrupt)) {
        rb_warn("Scheduler should implement #fiber_interrupt");
    }
}
2024-02-22 00:33:18 +13:00
// rb_ensure-compatible wrapper around rb_fiber_scheduler_close.
static VALUE
fiber_scheduler_close(VALUE scheduler)
{
    return rb_fiber_scheduler_close(scheduler);
}
// rb_ensure handler: always detach the scheduler from the thread, even when
// the scheduler's close hook raised.
static VALUE
fiber_scheduler_close_ensure(VALUE _thread)
{
    rb_thread_t *thread = (rb_thread_t *)_thread;
    thread->scheduler = Qnil;

    return Qnil;
}
2020-10-16 14:25:58 +13:00
VALUE
2021-02-09 19:39:56 +13:00
rb_fiber_scheduler_set ( VALUE scheduler )
2020-10-16 14:25:58 +13:00
{
2024-11-21 12:00:55 +13:00
RUBY_ASSERT ( ruby_thread_has_gvl_p ( ) ) ;
2021-07-16 15:22:17 +12:00
2020-10-16 14:25:58 +13:00
rb_thread_t * thread = GET_THREAD ( ) ;
2024-11-21 12:00:55 +13:00
RUBY_ASSERT ( thread ) ;
2020-10-16 14:25:58 +13:00
2021-06-14 17:56:53 +12:00
if ( scheduler ! = Qnil ) {
verify_interface ( scheduler ) ;
}
2022-10-15 21:43:45 +13:00
// We invoke Scheduler#close when setting it to something else, to ensure
// the previous scheduler runs to completion before changing the scheduler.
// That way, we do not need to consider interactions, e.g., of a Fiber from
// the previous scheduler with the new scheduler.
2020-10-16 14:25:58 +13:00
if ( thread - > scheduler ! = Qnil ) {
2024-02-22 00:33:18 +13:00
// rb_fiber_scheduler_close(thread->scheduler);
rb_ensure ( fiber_scheduler_close , thread - > scheduler , fiber_scheduler_close_ensure , ( VALUE ) thread ) ;
2020-10-16 14:25:58 +13:00
}
thread - > scheduler = scheduler ;
return thread - > scheduler ;
}
static VALUE
2021-02-09 19:39:56 +13:00
rb_fiber_scheduler_current_for_threadptr ( rb_thread_t * thread )
2020-10-16 14:25:58 +13:00
{
2024-11-21 12:00:55 +13:00
RUBY_ASSERT ( thread ) ;
2020-10-16 14:25:58 +13:00
if ( thread - > blocking = = 0 ) {
return thread - > scheduler ;
2021-06-16 22:07:05 +09:00
}
else {
2020-10-16 14:25:58 +13:00
return Qnil ;
}
}
// Return the effective scheduler for the current thread, or Qnil when the
// current execution context is blocking.
VALUE
rb_fiber_scheduler_current(void)
{
    return rb_fiber_scheduler_current_for_threadptr(GET_THREAD());
}
2021-02-09 19:39:56 +13:00
// As rb_fiber_scheduler_current, but for an arbitrary Thread VALUE.
VALUE rb_fiber_scheduler_current_for_thread(VALUE thread)
{
    return rb_fiber_scheduler_current_for_threadptr(rb_thread_ptr(thread));
}
2022-10-15 21:43:45 +13:00
/*
*
* Document - method : Fiber : : Scheduler # close
*
* Called when the current thread exits . The scheduler is expected to implement this
* method in order to allow all waiting fibers to finalize their execution .
*
* The suggested pattern is to implement the main event loop in the # close method .
*
*/
2020-10-01 13:44:29 +09:00
// Invoke the scheduler's close hook (#scheduler_close, falling back to
// #close). Returns the hook's result, or Qnil if neither is implemented.
// Must be called while holding the GVL.
VALUE
rb_fiber_scheduler_close(VALUE scheduler)
{
    RUBY_ASSERT(ruby_thread_has_gvl_p());

    VALUE result;

    // The reason for calling `scheduler_close` before calling `close` is for
    // legacy schedulers which implement `close` and expect the user to call
    // it. Subsequently, that method would call `Fiber.set_scheduler(nil)`
    // which should call `scheduler_close`. If it were to call `close`, it
    // would create an infinite loop.

    result = rb_check_funcall(scheduler, id_scheduler_close, 0, NULL);
    if (!UNDEF_P(result)) return result;

    result = rb_check_funcall(scheduler, id_close, 0, NULL);
    if (!UNDEF_P(result)) return result;

    // Neither hook was implemented:
    return Qnil;
}
2020-08-20 13:51:45 +12:00
VALUE
2021-02-09 19:39:56 +13:00
rb_fiber_scheduler_make_timeout ( struct timeval * timeout )
2020-10-01 13:44:29 +09:00
{
2020-08-20 13:51:45 +12:00
if ( timeout ) {
2025-02-25 16:37:21 +13:00
return rb_float_new ( ( double ) timeout - > tv_sec + ( 0.000001 * timeout - > tv_usec ) ) ;
2020-08-20 13:51:45 +12:00
}
return Qnil ;
}
2022-10-15 21:43:45 +13:00
/*
* Document - method : Fiber : : Scheduler # kernel_sleep
* call - seq : kernel_sleep ( duration = nil )
*
* Invoked by Kernel # sleep and Mutex # sleep and is expected to provide
* an implementation of sleeping in a non - blocking way . Implementation might
* register the current fiber in some list of " which fiber wait until what
* moment " , call Fiber.yield to pass control, and then in #close resume
* the fibers whose wait period has elapsed .
*
*/
2020-12-26 22:09:49 +13:00
// Delegate Kernel#sleep / Mutex#sleep to the scheduler's mandatory
// #kernel_sleep hook. `timeout` may be nil for an indefinite sleep.
VALUE
rb_fiber_scheduler_kernel_sleep(VALUE scheduler, VALUE timeout)
{
    return rb_funcall(scheduler, id_kernel_sleep, 1, timeout);
}
2021-02-11 19:17:54 +13:00
// Variadic form of rb_fiber_scheduler_kernel_sleep, forwarding `argc`
// arguments to the #kernel_sleep hook unchanged.
VALUE
rb_fiber_scheduler_kernel_sleepv(VALUE scheduler, int argc, VALUE *argv)
{
    return rb_funcallv(scheduler, id_kernel_sleep, argc, argv);
}
2021-03-30 17:33:15 +13:00
#if 0
2022-10-15 21:43:45 +13:00
/*
* Document - method : Fiber : : Scheduler # timeout_after
* call - seq : timeout_after ( duration , exception_class , * exception_arguments , & block ) - > result of block
*
* Invoked by Timeout . timeout to execute the given + block + within the given
* + duration + . It can also be invoked directly by the scheduler or user code .
*
* Attempt to limit the execution time of a given + block + to the given
* + duration + if possible . When a non - blocking operation causes the + block + ' s
* execution time to exceed the specified + duration + , that non - blocking
* operation should be interrupted by raising the specified + exception_class +
* constructed with the given + exception_arguments + .
*
* General execution timeouts are often considered risky . This implementation
* will only interrupt non - blocking operations . This is by design because it ' s
* expected that non - blocking operations can fail for a variety of
* unpredictable reasons , so applications should already be robust in handling
* these conditions and by implication timeouts .
*
* However , as a result of this design , if the + block + does not invoke any
* non - blocking operations , it will be impossible to interrupt it . If you
* desire to provide predictable points for timeouts , consider adding
* + sleep ( 0 ) + .
*
* If the block is executed successfully , its result will be returned .
*
* The exception will typically be raised using Fiber # raise .
*/
2020-10-01 13:44:29 +09:00
VALUE
2021-03-30 17:33:15 +13:00
rb_fiber_scheduler_timeout_after ( VALUE scheduler , VALUE timeout , VALUE exception , VALUE message )
2020-08-20 13:51:45 +12:00
{
2021-03-30 17:33:15 +13:00
VALUE arguments [ ] = {
timeout , exception , message
} ;
return rb_check_funcall ( scheduler , id_timeout_after , 3 , arguments ) ;
2020-08-20 13:51:45 +12:00
}
2020-10-01 13:44:29 +09:00
VALUE
2021-03-30 17:33:15 +13:00
rb_fiber_scheduler_timeout_afterv ( VALUE scheduler , int argc , VALUE * argv )
2020-08-20 13:51:45 +12:00
{
2021-03-30 17:33:15 +13:00
return rb_check_funcall ( scheduler , id_timeout_after , argc , argv ) ;
2020-08-20 13:51:45 +12:00
}
2021-03-30 17:33:15 +13:00
# endif
2020-08-20 13:51:45 +12:00
2022-10-15 21:43:45 +13:00
/*
* Document - method : Fiber : : Scheduler # process_wait
* call - seq : process_wait ( pid , flags )
*
* Invoked by Process : : Status . wait in order to wait for a specified process .
* See that method description for arguments description .
*
* Suggested minimal implementation :
*
* Thread . new do
* Process : : Status . wait ( pid , flags )
* end . value
*
* This hook is optional : if it is not present in the current scheduler ,
* Process : : Status . wait will behave as a blocking method .
*
* Expected to return a Process : : Status instance .
*/
2020-12-08 09:29:09 +13:00
// Delegate Process::Status.wait to the scheduler's optional #process_wait
// hook. Returns Qundef when the hook is not implemented, letting the caller
// fall back to blocking behavior.
VALUE
rb_fiber_scheduler_process_wait(VALUE scheduler, rb_pid_t pid, int flags)
{
    VALUE arguments[] = {
        PIDT2NUM(pid), RB_INT2NUM(flags)
    };

    return rb_check_funcall(scheduler, id_process_wait, 2, arguments);
}
2022-10-15 21:43:45 +13:00
/*
* Document - method : Fiber : : Scheduler # block
* call - seq : block ( blocker , timeout = nil )
*
* Invoked by methods like Thread . join , and by Mutex , to signify that current
* Fiber is blocked until further notice ( e . g . # unblock ) or until + timeout + has
* elapsed .
*
* + blocker + is what we are waiting on , informational only ( for debugging and
* logging ) . There are no guarantee about its value .
*
* Expected to return boolean , specifying whether the blocking operation was
* successful or not .
*/
2020-10-01 13:44:29 +09:00
// Notify the scheduler (mandatory #block hook) that the current fiber is
// blocked on `blocker` until #unblock is called or `timeout` elapses.
VALUE
rb_fiber_scheduler_block(VALUE scheduler, VALUE blocker, VALUE timeout)
{
    return rb_funcall(scheduler, id_block, 2, blocker, timeout);
}
2022-10-15 21:43:45 +13:00
/*
* Document - method : Fiber : : Scheduler # unblock
* call - seq : unblock ( blocker , fiber )
*
* Invoked to wake up Fiber previously blocked with # block ( for example , Mutex # lock
* calls # block and Mutex # unlock calls # unblock ) . The scheduler should use
* the + fiber + parameter to understand which fiber is unblocked .
*
* + blocker + is what was awaited for , but it is informational only ( for debugging
* and logging ) , and it is not guaranteed to be the same value as the + blocker + for
* # block .
*
*/
2020-10-01 13:44:29 +09:00
// Wake up a fiber previously blocked with #block, via the scheduler's
// mandatory #unblock hook. Preserves errno across the hook invocation.
VALUE
rb_fiber_scheduler_unblock(VALUE scheduler, VALUE blocker, VALUE fiber)
{
    RUBY_ASSERT(rb_obj_is_fiber(fiber));

    // `rb_fiber_scheduler_unblock` can be called from points where `errno` is
    // expected to be preserved. Therefore, we should save and restore it. For
    // example `io_binwrite` calls `rb_fiber_scheduler_unblock` and if `errno`
    // is reset to 0 by user code, it will break the error handling in
    // `io_write`. If we explicitly preserve `errno` in `io_binwrite` and other
    // similar functions (e.g. by returning it), this code is no longer needed.
    // I hope in the future we will be able to remove it.
    int saved_errno = errno;

#ifdef RUBY_DEBUG
    // Calling into user code with a pending interrupt would defer/lose the
    // interrupt; in debug builds, treat this as a VM bug.
    rb_execution_context_t *ec = GET_EC();
    if (RUBY_VM_INTERRUPTED(ec)) {
        rb_bug("rb_fiber_scheduler_unblock called with pending interrupt");
    }
#endif

    VALUE result = rb_funcall(scheduler, id_unblock, 2, blocker, fiber);

    errno = saved_errno;

    return result;
}
2022-10-15 21:43:45 +13:00
/*
* Document - method : Fiber : : Scheduler # io_wait
* call - seq : io_wait ( io , events , timeout )
*
* Invoked by IO # wait , IO # wait_readable , IO # wait_writable to ask whether the
* specified descriptor is ready for specified events within
* the specified + timeout + .
*
* + events + is a bit mask of < tt > IO : : READABLE < / tt > , < tt > IO : : WRITABLE < / tt > , and
* < tt > IO : : PRIORITY < / tt > .
*
* Suggested implementation should register which Fiber is waiting for which
* resources and immediately calling Fiber . yield to pass control to other
* fibers . Then , in the # close method , the scheduler might dispatch all the
* I / O resources to fibers waiting for it .
*
* Expected to return the subset of events that are ready immediately .
*
*/
2025-05-23 14:55:05 +09:00
// Trampoline for rb_thread_io_blocking_operation: invokes
// scheduler.io_wait(io, events, timeout) where arguments[0] is the scheduler.
static VALUE
fiber_scheduler_io_wait(VALUE _argument)
{
    VALUE *arguments = (VALUE *)_argument;

    return rb_funcallv(arguments[0], id_io_wait, 3, arguments + 1);
}
2020-10-01 13:44:29 +09:00
// Delegate IO#wait (and friends) to the scheduler's mandatory #io_wait hook.
// When the scheduler supports #fiber_interrupt, the call is routed through
// rb_thread_io_blocking_operation so it can be interrupted.
VALUE
rb_fiber_scheduler_io_wait(VALUE scheduler, VALUE io, VALUE events, VALUE timeout)
{
    // Packed as [scheduler, io, events, timeout] for the trampoline:
    VALUE arguments[] = {
        scheduler, io, events, timeout
    };

    if (rb_respond_to(scheduler, id_fiber_interrupt)) {
        return rb_thread_io_blocking_operation(io, fiber_scheduler_io_wait, (VALUE)&arguments);
    } else {
        return fiber_scheduler_io_wait((VALUE)&arguments);
    }
}
2020-10-01 13:44:29 +09:00
// Convenience wrapper: wait until `io` is readable, using the io's own timeout.
VALUE
rb_fiber_scheduler_io_wait_readable(VALUE scheduler, VALUE io)
{
    return rb_fiber_scheduler_io_wait(scheduler, io, RB_UINT2NUM(RUBY_IO_READABLE), rb_io_timeout(io));
}
2020-10-01 13:44:29 +09:00
// Convenience wrapper: wait until `io` is writable, using the io's own timeout.
VALUE
rb_fiber_scheduler_io_wait_writable(VALUE scheduler, VALUE io)
{
    return rb_fiber_scheduler_io_wait(scheduler, io, RB_UINT2NUM(RUBY_IO_WRITABLE), rb_io_timeout(io));
}
2022-10-15 21:43:45 +13:00
/*
* Document - method : Fiber : : Scheduler # io_select
* call - seq : io_select ( readables , writables , exceptables , timeout )
*
* Invoked by IO . select to ask whether the specified descriptors are ready for
* specified events within the specified + timeout + .
*
* Expected to return the 3 - tuple of Array of IOs that are ready .
*
*/
2022-10-15 19:59:04 +13:00
// Delegate IO.select to the scheduler's optional #io_select hook.
VALUE rb_fiber_scheduler_io_select(VALUE scheduler, VALUE readables, VALUE writables, VALUE exceptables, VALUE timeout)
{
    VALUE arguments[] = {
        readables, writables, exceptables, timeout
    };

    return rb_fiber_scheduler_io_selectv(scheduler, 4, arguments);
}
// Variadic form of rb_fiber_scheduler_io_select; returns Qundef when the
// hook is not implemented.
VALUE rb_fiber_scheduler_io_selectv(VALUE scheduler, int argc, VALUE *argv)
{
    // I wondered about extracting argv, and checking if there is only a single
    // IO instance, and instead calling `io_wait`. However, it would require a
    // decent amount of work and it would be hard to preserve the exact
    // semantics of IO.select.
    return rb_check_funcall(scheduler, id_io_select, argc, argv);
}
2022-10-15 21:43:45 +13:00
/*
* Document - method : Fiber : : Scheduler # io_read
2023-09-09 15:50:21 +09:00
* call - seq : io_read ( io , buffer , length , offset ) - > read length or - errno
2022-10-15 21:43:45 +13:00
*
2023-03-25 18:36:27 +13:00
* Invoked by IO # read or IO # Buffer . read to read + length + bytes from + io + into a
2023-03-31 00:48:55 +13:00
* specified + buffer + ( see IO : : Buffer ) at the given + offset + .
2022-10-15 21:43:45 +13:00
*
2023-03-31 00:48:55 +13:00
* The + length + argument is the " minimum length to be read " . If the IO buffer
* size is 8 KiB , but the + length + is + 1024 + ( 1 KiB ) , up to 8 KiB might be read ,
* but at least 1 KiB will be . Generally , the only case where less data than
* + length + will be read is if there is an error reading the data .
2022-10-15 21:43:45 +13:00
*
2023-03-25 18:36:27 +13:00
* Specifying a + length + of 0 is valid and means try reading at least once and
* return any available data .
2022-10-15 21:43:45 +13:00
*
* Suggested implementation should try to read from + io + in a non - blocking
* manner and call # io_wait if the + io + is not ready ( which will yield control
* to other fibers ) .
*
* See IO : : Buffer for an interface available to return data .
*
2023-03-25 18:36:27 +13:00
* Expected to return number of bytes read , or , in case of an error ,
* < tt > - errno < / tt > ( negated number corresponding to system ' s error code ) .
2022-10-15 21:43:45 +13:00
*
* The method should be considered _experimental_ .
*/
2025-05-23 14:55:05 +09:00
// Trampoline: `packed` is {scheduler, io, buffer, length, offset}; invokes
// `scheduler.io_read(io, buffer, length, offset)`.
static VALUE
fiber_scheduler_io_read(VALUE packed)
{
    VALUE *args = (VALUE*)packed;
    return rb_funcallv(args[0], id_io_read, 4, args + 1);
}
2020-10-01 13:44:29 +09:00
VALUE
2022-10-12 12:59:05 +13:00
rb_fiber_scheduler_io_read ( VALUE scheduler , VALUE io , VALUE buffer , size_t length , size_t offset )
2020-08-21 00:53:08 +12:00
{
2025-05-23 14:55:05 +09:00
if ( ! rb_respond_to ( scheduler , id_io_read ) ) {
return RUBY_Qundef ;
}
2021-02-09 19:39:56 +13:00
VALUE arguments [ ] = {
2025-05-23 14:55:05 +09:00
scheduler , io , buffer , SIZET2NUM ( length ) , SIZET2NUM ( offset )
2021-02-09 19:39:56 +13:00
} ;
2021-02-09 16:59:15 +09:00
2025-06-02 05:50:23 -04:00
if ( rb_respond_to ( scheduler , id_fiber_interrupt ) ) {
return rb_thread_io_blocking_operation ( io , fiber_scheduler_io_read , ( VALUE ) & arguments ) ;
} else {
return fiber_scheduler_io_read ( ( VALUE ) & arguments ) ;
}
2020-08-21 00:53:08 +12:00
}
2022-12-24 00:48:58 +02:00
/*
 * Document-method: Fiber::Scheduler#io_pread
* call - seq : io_pread ( io , buffer , from , length , offset ) - > read length or - errno
*
2023-03-31 00:48:55 +13:00
* Invoked by IO # pread or IO : : Buffer # pread to read + length + bytes from + io +
* at offset + from + into a specified + buffer + ( see IO : : Buffer ) at the given
* + offset + .
*
* This method is semantically the same as # io_read , but it allows to specify
* the offset to read from and is often better for asynchronous IO on the same
* file .
2022-12-24 00:48:58 +02:00
*
2023-03-31 00:48:55 +13:00
* The method should be considered _experimental_ .
2022-12-24 00:48:58 +02:00
*/
2025-05-23 14:55:05 +09:00
// Trampoline: `packed` is {scheduler, io, buffer, from, length, offset};
// invokes `scheduler.io_pread(io, buffer, from, length, offset)`.
static VALUE
fiber_scheduler_io_pread(VALUE packed)
{
    VALUE *args = (VALUE*)packed;
    return rb_funcallv(args[0], id_io_pread, 5, args + 1);
}
2021-12-23 12:20:09 +13:00
VALUE
2022-10-12 12:59:05 +13:00
rb_fiber_scheduler_io_pread ( VALUE scheduler , VALUE io , rb_off_t from , VALUE buffer , size_t length , size_t offset )
2021-12-23 12:20:09 +13:00
{
2025-05-23 14:55:05 +09:00
if ( ! rb_respond_to ( scheduler , id_io_pread ) ) {
return RUBY_Qundef ;
}
2021-12-23 12:20:09 +13:00
VALUE arguments [ ] = {
2025-05-23 14:55:05 +09:00
scheduler , io , buffer , OFFT2NUM ( from ) , SIZET2NUM ( length ) , SIZET2NUM ( offset )
2021-12-23 12:20:09 +13:00
} ;
2025-06-02 05:50:23 -04:00
if ( rb_respond_to ( scheduler , id_fiber_interrupt ) ) {
return rb_thread_io_blocking_operation ( io , fiber_scheduler_io_pread , ( VALUE ) & arguments ) ;
} else {
return fiber_scheduler_io_pread ( ( VALUE ) & arguments ) ;
}
2021-12-23 12:20:09 +13:00
}
2022-10-15 21:43:45 +13:00
/*
 * Document-method: Fiber::Scheduler#io_write
2023-09-09 15:50:21 +09:00
* call - seq : io_write ( io , buffer , length , offset ) - > written length or - errno
2022-10-15 21:43:45 +13:00
*
2023-03-25 18:36:27 +13:00
* Invoked by IO # write or IO : : Buffer # write to write + length + bytes to + io + from
2023-03-31 00:48:55 +13:00
* from a specified + buffer + ( see IO : : Buffer ) at the given + offset + .
2022-10-15 21:43:45 +13:00
*
2023-03-31 00:48:55 +13:00
* The + length + argument is the " minimum length to be written " . If the IO
* buffer size is 8 KiB , but the + length + specified is 1024 ( 1 KiB ) , at most 8 KiB
* will be written , but at least 1 KiB will be . Generally , the only case where
* less data than + length + will be written is if there is an error writing the
* data .
2022-10-15 21:43:45 +13:00
*
2023-03-25 18:36:27 +13:00
* Specifying a + length + of 0 is valid and means try writing at least once , as
* much data as possible .
2022-10-15 21:43:45 +13:00
*
* Suggested implementation should try to write to + io + in a non - blocking
* manner and call # io_wait if the + io + is not ready ( which will yield control
* to other fibers ) .
*
2023-03-25 18:36:27 +13:00
* See IO : : Buffer for an interface available to get data from buffer
* efficiently .
2022-10-15 21:43:45 +13:00
*
2023-03-25 18:36:27 +13:00
* Expected to return number of bytes written , or , in case of an error ,
* < tt > - errno < / tt > ( negated number corresponding to system ' s error code ) .
2022-10-15 21:43:45 +13:00
*
* The method should be considered _experimental_ .
*/
2025-05-23 14:55:05 +09:00
// Trampoline: `packed` is {scheduler, io, buffer, length, offset}; invokes
// `scheduler.io_write(io, buffer, length, offset)`.
static VALUE
fiber_scheduler_io_write(VALUE packed)
{
    VALUE *args = (VALUE*)packed;
    return rb_funcallv(args[0], id_io_write, 4, args + 1);
}
2020-10-01 13:44:29 +09:00
VALUE
2022-10-12 12:59:05 +13:00
rb_fiber_scheduler_io_write ( VALUE scheduler , VALUE io , VALUE buffer , size_t length , size_t offset )
2020-08-20 13:51:45 +12:00
{
2025-05-23 14:55:05 +09:00
if ( ! rb_respond_to ( scheduler , id_io_write ) ) {
return RUBY_Qundef ;
}
2021-02-09 19:39:56 +13:00
VALUE arguments [ ] = {
2025-05-23 14:55:05 +09:00
scheduler , io , buffer , SIZET2NUM ( length ) , SIZET2NUM ( offset )
2021-02-09 19:39:56 +13:00
} ;
2025-06-02 05:50:23 -04:00
if ( rb_respond_to ( scheduler , id_fiber_interrupt ) ) {
return rb_thread_io_blocking_operation ( io , fiber_scheduler_io_write , ( VALUE ) & arguments ) ;
} else {
return fiber_scheduler_io_write ( ( VALUE ) & arguments ) ;
}
2021-07-02 22:41:16 +12:00
}
2022-12-24 00:48:58 +02:00
/*
* Document - method : Fiber : : Scheduler # io_pwrite
* call - seq : io_pwrite ( io , buffer , from , length , offset ) - > written length or - errno
*
2023-03-31 00:48:55 +13:00
 * Invoked by IO#pwrite or IO::Buffer#pwrite to write +length+ bytes to +io+
 * at offset +from+ from a specified +buffer+ (see IO::Buffer) at the given
 * +offset+.
*
* This method is semantically the same as # io_write , but it allows to specify
* the offset to write to and is often better for asynchronous IO on the same
* file .
*
* The method should be considered _experimental_ .
2022-12-24 00:48:58 +02:00
*
*/
2025-05-23 14:55:05 +09:00
// Trampoline: `packed` is {scheduler, io, buffer, from, length, offset};
// invokes `scheduler.io_pwrite(io, buffer, from, length, offset)`.
static VALUE
fiber_scheduler_io_pwrite(VALUE packed)
{
    VALUE *args = (VALUE*)packed;
    return rb_funcallv(args[0], id_io_pwrite, 5, args + 1);
}
2021-12-23 12:20:09 +13:00
VALUE
2022-10-12 12:59:05 +13:00
rb_fiber_scheduler_io_pwrite ( VALUE scheduler , VALUE io , rb_off_t from , VALUE buffer , size_t length , size_t offset )
2021-12-23 12:20:09 +13:00
{
2025-05-23 14:55:05 +09:00
if ( ! rb_respond_to ( scheduler , id_io_pwrite ) ) {
return RUBY_Qundef ;
}
2021-12-23 12:20:09 +13:00
VALUE arguments [ ] = {
2025-05-23 14:55:05 +09:00
scheduler , io , buffer , OFFT2NUM ( from ) , SIZET2NUM ( length ) , SIZET2NUM ( offset )
2021-12-23 12:20:09 +13:00
} ;
2025-06-02 05:50:23 -04:00
if ( rb_respond_to ( scheduler , id_fiber_interrupt ) ) {
return rb_thread_io_blocking_operation ( io , fiber_scheduler_io_pwrite , ( VALUE ) & arguments ) ;
} else {
return fiber_scheduler_io_pwrite ( ( VALUE ) & arguments ) ;
}
2021-12-23 12:20:09 +13:00
}
2021-07-02 22:41:16 +12:00
VALUE
rb_fiber_scheduler_io_read_memory(VALUE scheduler, VALUE io, void *base, size_t size, size_t length)
{
    // Wrap the caller's raw memory in a locked IO::Buffer for the duration of
    // the scheduler-driven read, then release the lock and free the wrapper.
    VALUE buffer = rb_io_buffer_new(base, size, RB_IO_BUFFER_LOCKED);

    VALUE result = rb_fiber_scheduler_io_read(scheduler, io, buffer, length, 0);

    rb_io_buffer_free_locked(buffer);

    return result;
}
VALUE
rb_fiber_scheduler_io_write_memory(VALUE scheduler, VALUE io, const void *base, size_t size, size_t length)
{
    // Wrap the caller's raw memory in a locked, read-only IO::Buffer for the
    // duration of the scheduler-driven write, then free the wrapper.
    VALUE buffer = rb_io_buffer_new((void*)base, size, RB_IO_BUFFER_LOCKED|RB_IO_BUFFER_READONLY);

    VALUE result = rb_fiber_scheduler_io_write(scheduler, io, buffer, length, 0);

    rb_io_buffer_free_locked(buffer);

    return result;
}
VALUE
rb_fiber_scheduler_io_pread_memory(VALUE scheduler, VALUE io, rb_off_t from, void *base, size_t size, size_t length)
{
    // Positioned variant of rb_fiber_scheduler_io_read_memory: same locked
    // IO::Buffer wrapping, but reading from the explicit offset `from`.
    VALUE buffer = rb_io_buffer_new(base, size, RB_IO_BUFFER_LOCKED);

    VALUE result = rb_fiber_scheduler_io_pread(scheduler, io, from, buffer, length, 0);

    rb_io_buffer_free_locked(buffer);

    return result;
}
VALUE
rb_fiber_scheduler_io_pwrite_memory(VALUE scheduler, VALUE io, rb_off_t from, const void *base, size_t size, size_t length)
{
    // Positioned variant of rb_fiber_scheduler_io_write_memory: same locked,
    // read-only IO::Buffer wrapping, but writing at the explicit offset `from`.
    VALUE buffer = rb_io_buffer_new((void*)base, size, RB_IO_BUFFER_LOCKED|RB_IO_BUFFER_READONLY);

    VALUE result = rb_fiber_scheduler_io_pwrite(scheduler, io, from, buffer, length, 0);

    rb_io_buffer_free_locked(buffer);

    return result;
}
VALUE
rb_fiber_scheduler_io_close(VALUE scheduler, VALUE io)
{
    // Delegate to `scheduler.io_close(io)` if the scheduler defines that hook.
    VALUE arguments[] = {io};

    return rb_check_funcall(scheduler, id_io_close, 1, arguments);
}
2021-06-14 16:21:08 +12:00
2022-10-15 21:43:45 +13:00
/*
* Document - method : Fiber : : Scheduler # address_resolve
* call - seq : address_resolve ( hostname ) - > array_of_strings or nil
*
* Invoked by any method that performs a non - reverse DNS lookup . The most
 * notable method is Addrinfo.getaddrinfo, but there are many others.
*
* The method is expected to return an array of strings corresponding to ip
* addresses the + hostname + is resolved to , or + nil + if it can not be resolved .
*
* Fairly exhaustive list of all possible call - sites :
*
* - Addrinfo . getaddrinfo
* - Addrinfo . tcp
* - Addrinfo . udp
* - Addrinfo . ip
* - Addrinfo . new
* - Addrinfo . marshal_load
* - SOCKSSocket . new
* - TCPServer . new
* - TCPSocket . new
* - IPSocket . getaddress
* - TCPSocket . gethostbyname
* - UDPSocket # connect
* - UDPSocket # bind
* - UDPSocket # send
* - Socket . getaddrinfo
* - Socket . gethostbyname
* - Socket . pack_sockaddr_in
* - Socket . sockaddr_in
* - Socket . unpack_sockaddr_in
*/
2021-06-14 16:21:08 +12:00
VALUE
rb_fiber_scheduler_address_resolve(VALUE scheduler, VALUE hostname)
{
    // Delegate to `scheduler.address_resolve(hostname)` if the hook exists.
    VALUE arguments[] = {hostname};

    return rb_check_funcall(scheduler, id_address_resolve, 1, arguments);
}
2022-10-15 21:43:45 +13:00
2024-11-20 19:40:17 +13:00
/*
* Document - method : Fiber : : Scheduler # blocking_operation_wait
2025-06-05 12:49:02 +09:00
* call - seq : blocking_operation_wait ( blocking_operation )
2024-11-20 19:40:17 +13:00
*
* Invoked by Ruby ' s core methods to run a blocking operation in a non - blocking way .
2025-06-05 12:49:02 +09:00
* The blocking_operation is a Fiber : : Scheduler : : BlockingOperation that encapsulates the blocking operation .
*
* If the scheduler doesn ' t implement this method , or if the scheduler doesn ' t execute
* the blocking operation , Ruby will fall back to the non - scheduler implementation .
2024-11-20 19:40:17 +13:00
*
* Minimal suggested implementation is :
*
2025-06-05 12:49:02 +09:00
* def blocking_operation_wait ( blocking_operation )
* Thread . new { blocking_operation . call } . join
2024-11-20 19:40:17 +13:00
* end
*/
VALUE rb_fiber_scheduler_blocking_operation_wait(VALUE scheduler, void* (*function)(void *), void *data, rb_unblock_function_t *unblock_function, void *data2, int flags, struct rb_fiber_scheduler_blocking_operation_state *state)
{
    // Bail out before allocating anything if the scheduler has no
    // `blocking_operation_wait` hook:
    if (!rb_respond_to(scheduler, id_blocking_operation_wait)) {
        return Qundef;
    }

    // Wrap the C-level work in a BlockingOperation object the scheduler can call:
    VALUE blocking_operation = rb_fiber_scheduler_blocking_operation_new(function, data, unblock_function, data2, flags, state);

    VALUE result = rb_funcall(scheduler, id_blocking_operation_wait, 1, blocking_operation);

    // Snapshot the status so we can tell whether the scheduler actually ran it:
    rb_fiber_scheduler_blocking_operation_t *operation = get_blocking_operation(blocking_operation);
    rb_atomic_t status = RUBY_ATOMIC_LOAD(operation->status);

    // Invalidate the operation so a reference retained by the scheduler cannot
    // reach the (now stale) pointers:
    operation->function = NULL;
    operation->state = NULL;
    operation->data = NULL;
    operation->data2 = NULL;
    operation->unblock_function = NULL;

    // Still queued means the operation was never executed; return Qundef to
    // signal the caller to fall back to rb_nogvl instead:
    if (status == RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_QUEUED) {
        return Qundef;
    }

    return result;
}
2025-05-23 14:55:05 +09:00
VALUE rb_fiber_scheduler_fiber_interrupt(VALUE scheduler, VALUE fiber, VALUE exception)
{
#ifdef RUBY_DEBUG
    // In debug builds, assert that no interrupt is already pending on the
    // current execution context when delivering a scheduler-level interrupt.
    rb_execution_context_t *ec = GET_EC();

    if (RUBY_VM_INTERRUPTED(ec)) {
        rb_bug("rb_fiber_scheduler_fiber_interrupt called with pending interrupt");
    }
#endif

    // Delegate to `scheduler.fiber_interrupt(fiber, exception)` if defined.
    VALUE arguments[] = {fiber, exception};

    return rb_check_funcall(scheduler, id_fiber_interrupt, 2, arguments);
}
2022-10-15 21:43:45 +13:00
/*
* Document - method : Fiber : : Scheduler # fiber
* call - seq : fiber ( & block )
*
* Implementation of the Fiber . schedule . The method is < em > expected < / em > to immediately
* run the given block of code in a separate non - blocking fiber , and to return that Fiber .
*
* Minimal suggested implementation is :
*
* def fiber ( & block )
* fiber = Fiber . new ( blocking : false , & block )
* fiber . resume
* fiber
* end
*/
VALUE
rb_fiber_scheduler_fiber(VALUE scheduler, int argc, VALUE *argv, int kw_splat)
{
    // Forward to `scheduler.fiber(...)`, propagating the caller's block and
    // keyword-splat flag unchanged.
    return rb_funcall_passing_block_kw(scheduler, id_fiber_schedule, argc, argv, kw_splat);
}
2025-06-05 12:49:02 +09:00
/*
* C API : Cancel a blocking operation
*
* This function cancels a blocking operation . If the operation is queued ,
* it just marks it as cancelled . If it ' s executing , it marks it as cancelled
* and calls the unblock function to interrupt the operation .
*
* Returns 1 if unblock function was called , 0 if just marked cancelled , - 1 on error .
*/
int
rb_fiber_scheduler_blocking_operation_cancel(rb_fiber_scheduler_blocking_operation_t *blocking_operation)
{
    if (blocking_operation == NULL) {
        return -1;
    }

    rb_atomic_t state = RUBY_ATOMIC_LOAD(blocking_operation->status);

    switch (state) {
      case RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_QUEUED:
        // Work has not started: try to move straight to cancelled.
        if (RUBY_ATOMIC_CAS(blocking_operation->status, state, RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_CANCELLED) == state) {
            return 0; // Successfully cancelled before execution.
        }
        // The status changed between the load and the CAS - treat as executing.
        /* fallthrough */
      case RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_EXECUTING:
        // Work is running: mark cancelled first, then interrupt it.
        RUBY_ATOMIC_SET(blocking_operation->status, RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_CANCELLED);
        if (blocking_operation->unblock_function) {
            blocking_operation->unblock_function(blocking_operation->data2);
        }
        return 1; // Cancelled during execution (unblock function called).
      case RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_COMPLETED:
      case RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_CANCELLED:
        // Already finished or already cancelled - nothing to do.
        return 0;
    }

    return 0;
}