Fix blocking_operation_wait
use-after-free bug.
This commit is contained in:
parent
81a23c5793
commit
ead14b19aa
Notes:
git
2025-06-06 04:13:28 +00:00
@ -394,11 +394,54 @@ VALUE rb_fiber_scheduler_io_close(VALUE scheduler, VALUE io);
|
|||||||
*/
|
*/
|
||||||
VALUE rb_fiber_scheduler_address_resolve(VALUE scheduler, VALUE hostname);
|
VALUE rb_fiber_scheduler_address_resolve(VALUE scheduler, VALUE hostname);
|
||||||
|
|
||||||
|
// The state of the blocking operation execution.
|
||||||
struct rb_fiber_scheduler_blocking_operation_state {
|
struct rb_fiber_scheduler_blocking_operation_state {
|
||||||
void *result;
|
void *result;
|
||||||
int saved_errno;
|
int saved_errno;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
// The opaque handle for the blocking operation.
|
||||||
|
typedef struct rb_fiber_scheduler_blocking_operation rb_fiber_scheduler_blocking_operation_t;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Extract the blocking operation handle from a BlockingOperationRuby object.
|
||||||
|
*
|
||||||
|
* This function safely extracts the opaque handle from a BlockingOperation VALUE
|
||||||
|
* while holding the GVL. The returned pointer can be passed to worker threads
|
||||||
|
* and used with rb_fiber_scheduler_blocking_operation_execute.
|
||||||
|
*
|
||||||
|
* @param[in] self The BlockingOperation VALUE to extract from
|
||||||
|
* @return The opaque struct pointer on success, NULL on error
|
||||||
|
* @note Experimental.
|
||||||
|
*/
|
||||||
|
rb_fiber_scheduler_blocking_operation_t *rb_fiber_scheduler_blocking_operation_extract(VALUE self);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Execute blocking operation from handle (GVL not required).
|
||||||
|
*
|
||||||
|
* This function executes a blocking operation using the opaque handle
|
||||||
|
* obtained from rb_fiber_scheduler_blocking_operation_extract.
|
||||||
|
* It can be called from native threads without holding the GVL.
|
||||||
|
*
|
||||||
|
* @param[in] blocking_operation The opaque handle.
|
||||||
|
* @return 0 on success, -1 on error.
|
||||||
|
* @note Experimental. Can be called from any thread without holding the GVL
|
||||||
|
*/
|
||||||
|
int rb_fiber_scheduler_blocking_operation_execute(rb_fiber_scheduler_blocking_operation_t *blocking_operation);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Cancel a blocking operation.
|
||||||
|
*
|
||||||
|
* This function cancels a blocking operation. If the operation is queued,
|
||||||
|
* it just marks it as cancelled. If it's executing, it marks it as cancelled
|
||||||
|
* and calls the unblock function to interrupt the operation.
|
||||||
|
*
|
||||||
|
* @param blocking_operation The opaque struct pointer
|
||||||
|
* @return 1 if unblock function was called, 0 if just marked cancelled, -1 on error
|
||||||
|
* @note Experimental.
|
||||||
|
*/
|
||||||
|
int rb_fiber_scheduler_blocking_operation_cancel(rb_fiber_scheduler_blocking_operation_t *blocking_operation);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Defer the execution of the passed function to the scheduler.
|
* Defer the execution of the passed function to the scheduler.
|
||||||
*
|
*
|
||||||
|
2
inits.c
2
inits.c
@ -63,9 +63,9 @@ rb_call_inits(void)
|
|||||||
CALL(ISeq);
|
CALL(ISeq);
|
||||||
CALL(Thread);
|
CALL(Thread);
|
||||||
CALL(signal);
|
CALL(signal);
|
||||||
|
CALL(Cont);
|
||||||
CALL(Fiber_Scheduler);
|
CALL(Fiber_Scheduler);
|
||||||
CALL(process);
|
CALL(process);
|
||||||
CALL(Cont);
|
|
||||||
CALL(Rational);
|
CALL(Rational);
|
||||||
CALL(Complex);
|
CALL(Complex);
|
||||||
CALL(MemoryView);
|
CALL(MemoryView);
|
||||||
|
345
scheduler.c
345
scheduler.c
@ -15,9 +15,12 @@
|
|||||||
|
|
||||||
#include "ruby/thread.h"
|
#include "ruby/thread.h"
|
||||||
|
|
||||||
// For `ruby_thread_has_gvl_p`.
|
// For `ruby_thread_has_gvl_p`:
|
||||||
#include "internal/thread.h"
|
#include "internal/thread.h"
|
||||||
|
|
||||||
|
// For atomic operations:
|
||||||
|
#include "ruby_atomic.h"
|
||||||
|
|
||||||
static ID id_close;
|
static ID id_close;
|
||||||
static ID id_scheduler_close;
|
static ID id_scheduler_close;
|
||||||
|
|
||||||
@ -41,7 +44,219 @@ static ID id_fiber_interrupt;
|
|||||||
|
|
||||||
static ID id_fiber_schedule;
|
static ID id_fiber_schedule;
|
||||||
|
|
||||||
|
// Our custom blocking operation class
|
||||||
|
static VALUE rb_cFiberSchedulerBlockingOperation;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
* Custom blocking operation structure for blocking operations
|
||||||
|
* This replaces the use of Ruby procs to avoid use-after-free issues
|
||||||
|
* and provides a cleaner C API for native work pools.
|
||||||
|
*/
|
||||||
|
|
||||||
|
typedef enum {
|
||||||
|
RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_QUEUED, // Submitted but not started
|
||||||
|
RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_EXECUTING, // Currently running
|
||||||
|
RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_COMPLETED, // Finished (success/error)
|
||||||
|
RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_CANCELLED // Cancelled
|
||||||
|
} rb_fiber_blocking_operation_status_t;
|
||||||
|
|
||||||
|
struct rb_fiber_scheduler_blocking_operation {
|
||||||
|
void *(*function)(void *);
|
||||||
|
void *data;
|
||||||
|
rb_unblock_function_t *unblock_function;
|
||||||
|
void *data2;
|
||||||
|
int flags;
|
||||||
|
struct rb_fiber_scheduler_blocking_operation_state *state;
|
||||||
|
|
||||||
|
// Execution status
|
||||||
|
volatile rb_atomic_t status;
|
||||||
|
};
|
||||||
|
|
||||||
|
static void
|
||||||
|
blocking_operation_mark(void *ptr)
|
||||||
|
{
|
||||||
|
// No Ruby objects to mark in our struct
|
||||||
|
}
|
||||||
|
|
||||||
|
static void
|
||||||
|
blocking_operation_free(void *ptr)
|
||||||
|
{
|
||||||
|
rb_fiber_scheduler_blocking_operation_t *blocking_operation = (rb_fiber_scheduler_blocking_operation_t *)ptr;
|
||||||
|
ruby_xfree(blocking_operation);
|
||||||
|
}
|
||||||
|
|
||||||
|
static size_t
|
||||||
|
blocking_operation_memsize(const void *ptr)
|
||||||
|
{
|
||||||
|
return sizeof(rb_fiber_scheduler_blocking_operation_t);
|
||||||
|
}
|
||||||
|
|
||||||
|
static const rb_data_type_t blocking_operation_data_type = {
|
||||||
|
"Fiber::Scheduler::BlockingOperation",
|
||||||
|
{
|
||||||
|
blocking_operation_mark,
|
||||||
|
blocking_operation_free,
|
||||||
|
blocking_operation_memsize,
|
||||||
|
},
|
||||||
|
0, 0, RUBY_TYPED_FREE_IMMEDIATELY | RUBY_TYPED_WB_PROTECTED
|
||||||
|
};
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Allocate a new blocking operation
|
||||||
|
*/
|
||||||
|
static VALUE
|
||||||
|
blocking_operation_alloc(VALUE klass)
|
||||||
|
{
|
||||||
|
rb_fiber_scheduler_blocking_operation_t *blocking_operation;
|
||||||
|
VALUE obj = TypedData_Make_Struct(klass, rb_fiber_scheduler_blocking_operation_t, &blocking_operation_data_type, blocking_operation);
|
||||||
|
|
||||||
|
blocking_operation->function = NULL;
|
||||||
|
blocking_operation->data = NULL;
|
||||||
|
blocking_operation->unblock_function = NULL;
|
||||||
|
blocking_operation->data2 = NULL;
|
||||||
|
blocking_operation->flags = 0;
|
||||||
|
blocking_operation->state = NULL;
|
||||||
|
blocking_operation->status = RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_QUEUED;
|
||||||
|
|
||||||
|
return obj;
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Get the blocking operation struct from a Ruby object
|
||||||
|
*/
|
||||||
|
static rb_fiber_scheduler_blocking_operation_t *
|
||||||
|
get_blocking_operation(VALUE obj)
|
||||||
|
{
|
||||||
|
rb_fiber_scheduler_blocking_operation_t *blocking_operation;
|
||||||
|
TypedData_Get_Struct(obj, rb_fiber_scheduler_blocking_operation_t, &blocking_operation_data_type, blocking_operation);
|
||||||
|
return blocking_operation;
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Document-method: Fiber::Scheduler::BlockingOperation#call
|
||||||
|
*
|
||||||
|
* Execute the blocking operation. This method releases the GVL and calls
|
||||||
|
* the blocking function, then restores the errno value.
|
||||||
|
*
|
||||||
|
* Returns nil. The actual result is stored in the associated state object.
|
||||||
|
*/
|
||||||
|
static VALUE
|
||||||
|
blocking_operation_call(VALUE self)
|
||||||
|
{
|
||||||
|
rb_fiber_scheduler_blocking_operation_t *blocking_operation = get_blocking_operation(self);
|
||||||
|
|
||||||
|
if (blocking_operation->status != RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_QUEUED) {
|
||||||
|
rb_raise(rb_eRuntimeError, "Blocking operation has already been executed!");
|
||||||
|
}
|
||||||
|
|
||||||
|
if (blocking_operation->function == NULL) {
|
||||||
|
rb_raise(rb_eRuntimeError, "Blocking operation has no function to execute!");
|
||||||
|
}
|
||||||
|
|
||||||
|
if (blocking_operation->state == NULL) {
|
||||||
|
rb_raise(rb_eRuntimeError, "Blocking operation has no result object!");
|
||||||
|
}
|
||||||
|
|
||||||
|
// Mark as executing
|
||||||
|
blocking_operation->status = RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_EXECUTING;
|
||||||
|
|
||||||
|
// Execute the blocking operation without GVL
|
||||||
|
blocking_operation->state->result = rb_nogvl(blocking_operation->function, blocking_operation->data,
|
||||||
|
blocking_operation->unblock_function, blocking_operation->data2,
|
||||||
|
blocking_operation->flags);
|
||||||
|
blocking_operation->state->saved_errno = rb_errno();
|
||||||
|
|
||||||
|
// Mark as completed
|
||||||
|
blocking_operation->status = RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_COMPLETED;
|
||||||
|
|
||||||
|
return Qnil;
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* C API: Extract blocking operation struct from Ruby object (GVL required)
|
||||||
|
*
|
||||||
|
* This function safely extracts the opaque struct from a BlockingOperation VALUE
|
||||||
|
* while holding the GVL. The returned pointer can be passed to worker threads
|
||||||
|
* and used with rb_fiber_scheduler_blocking_operation_execute_opaque_nogvl.
|
||||||
|
*
|
||||||
|
* Returns the opaque struct pointer on success, NULL on error.
|
||||||
|
* Must be called while holding the GVL.
|
||||||
|
*/
|
||||||
|
rb_fiber_scheduler_blocking_operation_t *
|
||||||
|
rb_fiber_scheduler_blocking_operation_extract(VALUE self)
|
||||||
|
{
|
||||||
|
return get_blocking_operation(self);
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* C API: Execute blocking operation from opaque struct (GVL not required)
|
||||||
|
*
|
||||||
|
* This function executes a blocking operation using the opaque struct pointer
|
||||||
|
* obtained from rb_fiber_scheduler_blocking_operation_extract.
|
||||||
|
* It can be called from native threads without holding the GVL.
|
||||||
|
*
|
||||||
|
* Returns 0 on success, -1 on error.
|
||||||
|
*/
|
||||||
|
int
|
||||||
|
rb_fiber_scheduler_blocking_operation_execute(rb_fiber_scheduler_blocking_operation_t *blocking_operation)
|
||||||
|
{
|
||||||
|
if (blocking_operation == NULL) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (blocking_operation->function == NULL || blocking_operation->state == NULL) {
|
||||||
|
return -1; // Invalid blocking operation
|
||||||
|
}
|
||||||
|
|
||||||
|
// Atomically check if we can transition from QUEUED to EXECUTING
|
||||||
|
rb_atomic_t expected = RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_QUEUED;
|
||||||
|
if (RUBY_ATOMIC_CAS(blocking_operation->status, expected, RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_EXECUTING) != expected) {
|
||||||
|
// Already cancelled or in wrong state
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Now we're executing - call the function
|
||||||
|
blocking_operation->state->result = blocking_operation->function(blocking_operation->data);
|
||||||
|
blocking_operation->state->saved_errno = errno;
|
||||||
|
|
||||||
|
// Atomically transition to completed (unless cancelled during execution)
|
||||||
|
expected = RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_EXECUTING;
|
||||||
|
if (RUBY_ATOMIC_CAS(blocking_operation->status, expected, RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_COMPLETED) == expected) {
|
||||||
|
// Successfully completed
|
||||||
|
return 0;
|
||||||
|
} else {
|
||||||
|
// Was cancelled during execution
|
||||||
|
blocking_operation->state->saved_errno = EINTR;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* C API: Create a new blocking operation
|
||||||
|
*
|
||||||
|
* This creates a blocking operation that can be executed by native work pools.
|
||||||
|
* The blocking operation holds references to the function and data safely.
|
||||||
|
*/
|
||||||
|
VALUE
|
||||||
|
rb_fiber_scheduler_blocking_operation_new(void *(*function)(void *), void *data,
|
||||||
|
rb_unblock_function_t *unblock_function, void *data2,
|
||||||
|
int flags, struct rb_fiber_scheduler_blocking_operation_state *state)
|
||||||
|
{
|
||||||
|
VALUE self = blocking_operation_alloc(rb_cFiberSchedulerBlockingOperation);
|
||||||
|
rb_fiber_scheduler_blocking_operation_t *blocking_operation = get_blocking_operation(self);
|
||||||
|
|
||||||
|
blocking_operation->function = function;
|
||||||
|
blocking_operation->data = data;
|
||||||
|
blocking_operation->unblock_function = unblock_function;
|
||||||
|
blocking_operation->data2 = data2;
|
||||||
|
blocking_operation->flags = flags;
|
||||||
|
blocking_operation->state = state;
|
||||||
|
|
||||||
|
return self;
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
*
|
||||||
* Document-class: Fiber::Scheduler
|
* Document-class: Fiber::Scheduler
|
||||||
*
|
*
|
||||||
* This is not an existing class, but documentation of the interface that Scheduler
|
* This is not an existing class, but documentation of the interface that Scheduler
|
||||||
@ -121,6 +336,15 @@ Init_Fiber_Scheduler(void)
|
|||||||
|
|
||||||
id_fiber_schedule = rb_intern_const("fiber");
|
id_fiber_schedule = rb_intern_const("fiber");
|
||||||
|
|
||||||
|
// Define an anonymous BlockingOperation class for internal use only
|
||||||
|
// This is completely hidden from Ruby code and cannot be instantiated directly
|
||||||
|
rb_cFiberSchedulerBlockingOperation = rb_class_new(rb_cObject);
|
||||||
|
rb_define_alloc_func(rb_cFiberSchedulerBlockingOperation, blocking_operation_alloc);
|
||||||
|
rb_define_method(rb_cFiberSchedulerBlockingOperation, "call", blocking_operation_call, 0);
|
||||||
|
|
||||||
|
// Register the anonymous class as a GC root so it doesn't get collected
|
||||||
|
rb_gc_register_mark_object(rb_cFiberSchedulerBlockingOperation);
|
||||||
|
|
||||||
#if 0 /* for RDoc */
|
#if 0 /* for RDoc */
|
||||||
rb_cFiberScheduler = rb_define_class_under(rb_cFiber, "Scheduler", rb_cObject);
|
rb_cFiberScheduler = rb_define_class_under(rb_cFiber, "Scheduler", rb_cObject);
|
||||||
rb_define_method(rb_cFiberScheduler, "close", rb_fiber_scheduler_close, 0);
|
rb_define_method(rb_cFiberScheduler, "close", rb_fiber_scheduler_close, 0);
|
||||||
@ -136,7 +360,7 @@ Init_Fiber_Scheduler(void)
|
|||||||
rb_define_method(rb_cFiberScheduler, "timeout_after", rb_fiber_scheduler_timeout_after, 3);
|
rb_define_method(rb_cFiberScheduler, "timeout_after", rb_fiber_scheduler_timeout_after, 3);
|
||||||
rb_define_method(rb_cFiberScheduler, "block", rb_fiber_scheduler_block, 2);
|
rb_define_method(rb_cFiberScheduler, "block", rb_fiber_scheduler_block, 2);
|
||||||
rb_define_method(rb_cFiberScheduler, "unblock", rb_fiber_scheduler_unblock, 2);
|
rb_define_method(rb_cFiberScheduler, "unblock", rb_fiber_scheduler_unblock, 2);
|
||||||
rb_define_method(rb_cFiberScheduler, "fiber", rb_fiber_scheduler, -2);
|
rb_define_method(rb_cFiberScheduler, "fiber", rb_fiber_scheduler_fiber, -2);
|
||||||
rb_define_method(rb_cFiberScheduler, "blocking_operation_wait", rb_fiber_scheduler_blocking_operation_wait, -2);
|
rb_define_method(rb_cFiberScheduler, "blocking_operation_wait", rb_fiber_scheduler_blocking_operation_wait, -2);
|
||||||
#endif
|
#endif
|
||||||
}
|
}
|
||||||
@ -798,60 +1022,52 @@ rb_fiber_scheduler_address_resolve(VALUE scheduler, VALUE hostname)
|
|||||||
return rb_check_funcall(scheduler, id_address_resolve, 1, arguments);
|
return rb_check_funcall(scheduler, id_address_resolve, 1, arguments);
|
||||||
}
|
}
|
||||||
|
|
||||||
struct rb_blocking_operation_wait_arguments {
|
|
||||||
void *(*function)(void *);
|
|
||||||
void *data;
|
|
||||||
rb_unblock_function_t *unblock_function;
|
|
||||||
void *data2;
|
|
||||||
int flags;
|
|
||||||
|
|
||||||
struct rb_fiber_scheduler_blocking_operation_state *state;
|
|
||||||
};
|
|
||||||
|
|
||||||
static VALUE
|
|
||||||
rb_fiber_scheduler_blocking_operation_wait_proc(RB_BLOCK_CALL_FUNC_ARGLIST(value, _arguments))
|
|
||||||
{
|
|
||||||
struct rb_blocking_operation_wait_arguments *arguments = (struct rb_blocking_operation_wait_arguments*)_arguments;
|
|
||||||
|
|
||||||
if (arguments->state == NULL) {
|
|
||||||
rb_raise(rb_eRuntimeError, "Blocking function was already invoked!");
|
|
||||||
}
|
|
||||||
|
|
||||||
arguments->state->result = rb_nogvl(arguments->function, arguments->data, arguments->unblock_function, arguments->data2, arguments->flags);
|
|
||||||
arguments->state->saved_errno = rb_errno();
|
|
||||||
|
|
||||||
// Make sure it's only invoked once.
|
|
||||||
arguments->state = NULL;
|
|
||||||
|
|
||||||
return Qnil;
|
|
||||||
}
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Document-method: Fiber::Scheduler#blocking_operation_wait
|
* Document-method: Fiber::Scheduler#blocking_operation_wait
|
||||||
* call-seq: blocking_operation_wait(work)
|
* call-seq: blocking_operation_wait(blocking_operation)
|
||||||
*
|
*
|
||||||
* Invoked by Ruby's core methods to run a blocking operation in a non-blocking way.
|
* Invoked by Ruby's core methods to run a blocking operation in a non-blocking way.
|
||||||
|
* The blocking_operation is a Fiber::Scheduler::BlockingOperation that encapsulates the blocking operation.
|
||||||
|
*
|
||||||
|
* If the scheduler doesn't implement this method, or if the scheduler doesn't execute
|
||||||
|
* the blocking operation, Ruby will fall back to the non-scheduler implementation.
|
||||||
*
|
*
|
||||||
* Minimal suggested implementation is:
|
* Minimal suggested implementation is:
|
||||||
*
|
*
|
||||||
* def blocking_operation_wait(work)
|
* def blocking_operation_wait(blocking_operation)
|
||||||
* Thread.new(&work).join
|
* Thread.new { blocking_operation.call }.join
|
||||||
* end
|
* end
|
||||||
*/
|
*/
|
||||||
VALUE rb_fiber_scheduler_blocking_operation_wait(VALUE scheduler, void* (*function)(void *), void *data, rb_unblock_function_t *unblock_function, void *data2, int flags, struct rb_fiber_scheduler_blocking_operation_state *state)
|
VALUE rb_fiber_scheduler_blocking_operation_wait(VALUE scheduler, void* (*function)(void *), void *data, rb_unblock_function_t *unblock_function, void *data2, int flags, struct rb_fiber_scheduler_blocking_operation_state *state)
|
||||||
{
|
{
|
||||||
struct rb_blocking_operation_wait_arguments arguments = {
|
// Check if scheduler supports blocking_operation_wait before creating the object
|
||||||
.function = function,
|
if (!rb_respond_to(scheduler, id_blocking_operation_wait)) {
|
||||||
.data = data,
|
return Qundef;
|
||||||
.unblock_function = unblock_function,
|
}
|
||||||
.data2 = data2,
|
|
||||||
.flags = flags,
|
|
||||||
.state = state
|
|
||||||
};
|
|
||||||
|
|
||||||
VALUE proc = rb_proc_new(rb_fiber_scheduler_blocking_operation_wait_proc, (VALUE)&arguments);
|
// Create a new BlockingOperation with the blocking operation
|
||||||
|
VALUE blocking_operation = rb_fiber_scheduler_blocking_operation_new(function, data, unblock_function, data2, flags, state);
|
||||||
|
|
||||||
return rb_check_funcall(scheduler, id_blocking_operation_wait, 1, &proc);
|
VALUE result = rb_funcall(scheduler, id_blocking_operation_wait, 1, blocking_operation);
|
||||||
|
|
||||||
|
// Get the operation data to check if it was executed
|
||||||
|
rb_fiber_scheduler_blocking_operation_t *operation = get_blocking_operation(blocking_operation);
|
||||||
|
rb_atomic_t current_status = RUBY_ATOMIC_LOAD(operation->status);
|
||||||
|
|
||||||
|
// Invalidate the operation now that we're done with it
|
||||||
|
operation->function = NULL;
|
||||||
|
operation->state = NULL;
|
||||||
|
operation->data = NULL;
|
||||||
|
operation->data2 = NULL;
|
||||||
|
operation->unblock_function = NULL;
|
||||||
|
|
||||||
|
// If the blocking operation was never executed, return Qundef to signal
|
||||||
|
// the caller to use rb_nogvl instead
|
||||||
|
if (current_status != RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_COMPLETED) {
|
||||||
|
return Qundef;
|
||||||
|
}
|
||||||
|
|
||||||
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
VALUE rb_fiber_scheduler_fiber_interrupt(VALUE scheduler, VALUE fiber, VALUE exception)
|
VALUE rb_fiber_scheduler_fiber_interrupt(VALUE scheduler, VALUE fiber, VALUE exception)
|
||||||
@ -890,3 +1106,46 @@ rb_fiber_scheduler_fiber(VALUE scheduler, int argc, VALUE *argv, int kw_splat)
|
|||||||
{
|
{
|
||||||
return rb_funcall_passing_block_kw(scheduler, id_fiber_schedule, argc, argv, kw_splat);
|
return rb_funcall_passing_block_kw(scheduler, id_fiber_schedule, argc, argv, kw_splat);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* C API: Cancel a blocking operation
|
||||||
|
*
|
||||||
|
* This function cancels a blocking operation. If the operation is queued,
|
||||||
|
* it just marks it as cancelled. If it's executing, it marks it as cancelled
|
||||||
|
* and calls the unblock function to interrupt the operation.
|
||||||
|
*
|
||||||
|
* Returns 1 if unblock function was called, 0 if just marked cancelled, -1 on error.
|
||||||
|
*/
|
||||||
|
int
|
||||||
|
rb_fiber_scheduler_blocking_operation_cancel(rb_fiber_scheduler_blocking_operation_t *blocking_operation)
|
||||||
|
{
|
||||||
|
if (blocking_operation == NULL) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
rb_atomic_t current_state = RUBY_ATOMIC_LOAD(blocking_operation->status);
|
||||||
|
|
||||||
|
switch (current_state) {
|
||||||
|
case RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_QUEUED:
|
||||||
|
// Work hasn't started - just mark as cancelled
|
||||||
|
if (RUBY_ATOMIC_CAS(blocking_operation->status, current_state, RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_CANCELLED) == current_state) {
|
||||||
|
return 0; // Successfully cancelled before execution
|
||||||
|
}
|
||||||
|
// Fall through if state changed between load and CAS
|
||||||
|
|
||||||
|
case RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_EXECUTING:
|
||||||
|
// Work is running - mark cancelled AND call unblock function
|
||||||
|
RUBY_ATOMIC_SET(blocking_operation->status, RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_CANCELLED);
|
||||||
|
if (blocking_operation->unblock_function) {
|
||||||
|
blocking_operation->unblock_function(blocking_operation->data2);
|
||||||
|
}
|
||||||
|
return 1; // Cancelled during execution (unblock function called)
|
||||||
|
|
||||||
|
case RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_COMPLETED:
|
||||||
|
case RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_CANCELLED:
|
||||||
|
// Already finished or cancelled
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
@ -341,7 +341,7 @@ class Scheduler
|
|||||||
end
|
end
|
||||||
|
|
||||||
def blocking_operation_wait(work)
|
def blocking_operation_wait(work)
|
||||||
thread = Thread.new(&work)
|
thread = Thread.new{work.call}
|
||||||
|
|
||||||
thread.join
|
thread.join
|
||||||
|
|
||||||
|
@ -16,9 +16,8 @@ class TestFiberIOClose < Test::Unit::TestCase
|
|||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
# Problematic on Windows.
|
|
||||||
def test_io_close_across_fibers
|
def test_io_close_across_fibers
|
||||||
omit "Interrupting a io_wait read is not supported!" if RUBY_PLATFORM =~ /mswin|mingw/
|
# omit "Interrupting a io_wait read is not supported!" if RUBY_PLATFORM =~ /mswin|mingw/
|
||||||
|
|
||||||
with_socket_pair do |i, o|
|
with_socket_pair do |i, o|
|
||||||
error = nil
|
error = nil
|
||||||
@ -45,7 +44,6 @@ class TestFiberIOClose < Test::Unit::TestCase
|
|||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
# Okay on all platforms.
|
|
||||||
def test_io_close_blocking_thread
|
def test_io_close_blocking_thread
|
||||||
omit "Interrupting a io_wait read is not supported!" if RUBY_PLATFORM =~ /mswin|mingw/
|
omit "Interrupting a io_wait read is not supported!" if RUBY_PLATFORM =~ /mswin|mingw/
|
||||||
|
|
||||||
@ -77,9 +75,8 @@ class TestFiberIOClose < Test::Unit::TestCase
|
|||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
# Problematic on Windows.
|
|
||||||
def test_io_close_blocking_fiber
|
def test_io_close_blocking_fiber
|
||||||
omit "Interrupting a io_wait read is not supported!" if RUBY_PLATFORM =~ /mswin|mingw/
|
# omit "Interrupting a io_wait read is not supported!" if RUBY_PLATFORM =~ /mswin|mingw/
|
||||||
|
|
||||||
with_socket_pair do |i, o|
|
with_socket_pair do |i, o|
|
||||||
error = nil
|
error = nil
|
||||||
|
2
thread.c
2
thread.c
@ -1552,7 +1552,7 @@ rb_nogvl(void *(*func)(void *), void *data1,
|
|||||||
if (flags & RB_NOGVL_OFFLOAD_SAFE) {
|
if (flags & RB_NOGVL_OFFLOAD_SAFE) {
|
||||||
VALUE scheduler = rb_fiber_scheduler_current();
|
VALUE scheduler = rb_fiber_scheduler_current();
|
||||||
if (scheduler != Qnil) {
|
if (scheduler != Qnil) {
|
||||||
struct rb_fiber_scheduler_blocking_operation_state state;
|
struct rb_fiber_scheduler_blocking_operation_state state = {0};
|
||||||
|
|
||||||
VALUE result = rb_fiber_scheduler_blocking_operation_wait(scheduler, func, data1, ubf, data2, flags, &state);
|
VALUE result = rb_fiber_scheduler_blocking_operation_wait(scheduler, func, data1, ubf, data2, flags, &state);
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user