Allow IO#close to interrupt IO operations on fibers using fiber_interrupt hook. (#12839)

This commit is contained in:
Samuel Williams 2025-05-23 14:55:05 +09:00 committed by GitHub
parent e32054736f
commit 73c9d6ccaa
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
Notes: git 2025-05-23 05:55:18 +00:00
Merged-By: ioquatix <samuel@codeotaku.com>
10 changed files with 328 additions and 24 deletions

View File

@ -35,6 +35,13 @@ Note: We're only listing outstanding class updates.
* Update Unicode to Version 16.0.0 and Emoji Version 16.0.
[[Feature #19908]][[Feature #20724]] (also applies to Regexp)
* Fiber::Scheduler
* Introduce `Fiber::Scheduler#fiber_interrupt` to interrupt a fiber with a
given exception. The initial use case is to interrupt a fiber that is
waiting on a blocking IO operation when the IO operation is closed.
[[Feature #21166]]
## Stdlib updates
The following bundled gems are promoted from default gems.
@ -134,6 +141,7 @@ The following bundled gems are updated.
[Feature #20724]: https://bugs.ruby-lang.org/issues/20724
[Feature #21047]: https://bugs.ruby-lang.org/issues/21047
[Bug #21049]: https://bugs.ruby-lang.org/issues/21049
[Feature #21166]: https://bugs.ruby-lang.org/issues/21166
[Feature #21216]: https://bugs.ruby-lang.org/issues/21216
[Feature #21258]: https://bugs.ruby-lang.org/issues/21258
[Feature #21287]: https://bugs.ruby-lang.org/issues/21287

13
benchmark/io_close.yml Normal file
View File

@ -0,0 +1,13 @@
prelude: |
  ios = 1000.times.map do
    100.times.map{IO.pipe}
  end
benchmark:
  # Close IO
  io_close: |
    # Process each batch of ios per iteration of the benchmark.
    ios.pop.each do |r, w|
      r.close
      w.close
    end
loop_count: 100

View File

@ -0,0 +1,21 @@
prelude: |
  # Each pipe gets a reader thread blocked on it, so closing is contended:
  # the closing thread must first interrupt the blocked reader.
  ios = 100.times.map do
    10.times.map do
      IO.pipe.tap do |r, w|
        Thread.new do
          r.read
        rescue IOError
          # Ignore
        end
      end
    end
  end
benchmark:
  # Close IO
  io_close_contended: |
    # Process each batch of ios per iteration of the benchmark.
    ios.pop.each do |r, w|
      r.close
      w.close
    end
loop_count: 10

View File

@ -212,6 +212,64 @@ I/O. Windows is a notable example where socket I/O can be non-blocking but pipe
I/O is blocking. Provided that there *is* a scheduler and the current thread *is
non-blocking*, the operation will invoke the scheduler.
##### `IO#close`
Closing an IO interrupts all blocking operations on that IO. When a thread calls `IO#close`, it first attempts to interrupt any threads or fibers that are blocked on that IO. The closing thread waits until all blocked threads and fibers have been properly interrupted and removed from the IO's blocking list. Each interrupted thread or fiber receives an `IOError` and is cleanly removed from the blocking operation. Only after all blocking operations have been interrupted and cleaned up will the actual file descriptor be closed, ensuring proper resource cleanup and preventing potential race conditions.
For fibers managed by a scheduler, the interruption process involves calling `rb_fiber_scheduler_fiber_interrupt` on the scheduler. This allows the scheduler to handle the interruption in a way that's appropriate for its event loop implementation. The scheduler can then notify the fiber, which will receive an `IOError` and be removed from the blocking operation. This mechanism ensures that fiber-based concurrency works correctly with IO operations, even when those operations are interrupted by `IO#close`.
```mermaid
sequenceDiagram
participant ThreadB
participant ThreadA
participant Scheduler
participant IO
participant Fiber1
participant Fiber2
Note over ThreadA: Thread A has a fiber scheduler
activate Scheduler
ThreadA->>Fiber1: Schedule Fiber 1
activate Fiber1
Fiber1->>IO: IO.read
IO->>Scheduler: rb_thread_io_blocking_region
deactivate Fiber1
ThreadA->>Fiber2: Schedule Fiber 2
activate Fiber2
Fiber2->>IO: IO.read
IO->>Scheduler: rb_thread_io_blocking_region
deactivate Fiber2
Note over Fiber1,Fiber2: Both fibers blocked on same IO
Note over ThreadB: IO.close
activate ThreadB
ThreadB->>IO: thread_io_close_notify_all
Note over ThreadB: rb_mutex_sleep
IO->>Scheduler: rb_fiber_scheduler_fiber_interrupt(Fiber1)
Scheduler->>Fiber1: fiber_interrupt with IOError
activate Fiber1
Note over IO: fiber_interrupt causes removal from blocking list
Fiber1->>IO: rb_io_blocking_operation_exit()
IO-->>ThreadB: Wakeup thread
deactivate Fiber1
IO->>Scheduler: rb_fiber_scheduler_fiber_interrupt(Fiber2)
Scheduler->>Fiber2: fiber_interrupt with IOError
activate Fiber2
Note over IO: fiber_interrupt causes removal from blocking list
Fiber2->>IO: rb_io_blocking_operation_exit()
IO-->>ThreadB: Wakeup thread
deactivate Fiber2
deactivate Scheduler
Note over ThreadB: Blocking operations list empty
ThreadB->>IO: close(fd)
deactivate ThreadB
```
#### Mutex
The `Mutex` class can be used in a non-blocking context and is fiber specific.

View File

@ -199,6 +199,8 @@ VALUE rb_fiber_scheduler_block(VALUE scheduler, VALUE blocker, VALUE timeout);
/**
* Wakes up a fiber previously blocked using rb_fiber_scheduler_block().
*
* This function may be called from a different thread.
*
* @param[in] scheduler Target scheduler.
* @param[in] blocker What was awaited for.
* @param[in] fiber What to unblock.
@ -411,6 +413,14 @@ struct rb_fiber_scheduler_blocking_operation_state {
*/
VALUE rb_fiber_scheduler_blocking_operation_wait(VALUE scheduler, void* (*function)(void *), void *data, rb_unblock_function_t *unblock_function, void *data2, int flags, struct rb_fiber_scheduler_blocking_operation_state *state);
/**
 * Interrupt a fiber by raising an exception. You can construct an exception using `rb_make_exception`.
 *
 * This hook may be invoked by a different thread.
 *
 * @param[in]  scheduler  Target scheduler.
 * @param[in]  fiber      The fiber to interrupt.
 * @param[in]  exception  The exception instance to raise in the fiber.
 */
VALUE rb_fiber_scheduler_fiber_interrupt(VALUE scheduler, VALUE fiber, VALUE exception);
/**
* Create and schedule a non-blocking fiber.
*

View File

@ -72,6 +72,9 @@ void *rb_thread_prevent_fork(void *(*func)(void *), void *data); /* for ext/sock
VALUE rb_thread_io_blocking_region(struct rb_io *io, rb_blocking_function_t *func, void *data1);
VALUE rb_thread_io_blocking_call(struct rb_io *io, rb_blocking_function_t *func, void *data1, int events);
// Invoke the given function, with the specified argument, in a way that `IO#close` from another execution context can interrupt it.
VALUE rb_thread_io_blocking_operation(VALUE self, VALUE(*function)(VALUE), VALUE argument);
/* thread.c (export) */
int ruby_thread_has_gvl_p(void); /* for ext/fiddle/closure.c */

View File

@ -2733,7 +2733,6 @@ io_buffer_blocking_region_ensure(VALUE _argument)
static VALUE
io_buffer_blocking_region(VALUE io, struct rb_io_buffer *buffer, rb_blocking_function_t *function, void *data)
{
io = rb_io_get_io(io);
struct rb_io *ioptr;
RB_IO_POINTER(io, ioptr);
@ -2798,6 +2797,8 @@ io_buffer_read_internal(void *_argument)
VALUE
rb_io_buffer_read(VALUE self, VALUE io, size_t length, size_t offset)
{
io = rb_io_get_io(io);
VALUE scheduler = rb_fiber_scheduler_current();
if (scheduler != Qnil) {
VALUE result = rb_fiber_scheduler_io_read(scheduler, io, self, length, offset);
@ -2915,6 +2916,8 @@ io_buffer_pread_internal(void *_argument)
VALUE
rb_io_buffer_pread(VALUE self, VALUE io, rb_off_t from, size_t length, size_t offset)
{
io = rb_io_get_io(io);
VALUE scheduler = rb_fiber_scheduler_current();
if (scheduler != Qnil) {
VALUE result = rb_fiber_scheduler_io_pread(scheduler, io, from, self, length, offset);
@ -3035,6 +3038,8 @@ io_buffer_write_internal(void *_argument)
VALUE
rb_io_buffer_write(VALUE self, VALUE io, size_t length, size_t offset)
{
io = rb_io_get_write_io(rb_io_get_io(io));
VALUE scheduler = rb_fiber_scheduler_current();
if (scheduler != Qnil) {
VALUE result = rb_fiber_scheduler_io_write(scheduler, io, self, length, offset);
@ -3099,6 +3104,7 @@ io_buffer_write(int argc, VALUE *argv, VALUE self)
return rb_io_buffer_write(self, io, length, offset);
}
struct io_buffer_pwrite_internal_argument {
// The file descriptor to write to:
int descriptor;
@ -3144,6 +3150,8 @@ io_buffer_pwrite_internal(void *_argument)
VALUE
rb_io_buffer_pwrite(VALUE self, VALUE io, rb_off_t from, size_t length, size_t offset)
{
io = rb_io_get_write_io(rb_io_get_io(io));
VALUE scheduler = rb_fiber_scheduler_current();
if (scheduler != Qnil) {
VALUE result = rb_fiber_scheduler_io_pwrite(scheduler, io, from, self, length, offset);

View File

@ -37,6 +37,7 @@ static ID id_io_close;
static ID id_address_resolve;
static ID id_blocking_operation_wait;
static ID id_fiber_interrupt;
static ID id_fiber_schedule;
@ -116,6 +117,7 @@ Init_Fiber_Scheduler(void)
id_address_resolve = rb_intern_const("address_resolve");
id_blocking_operation_wait = rb_intern_const("blocking_operation_wait");
id_fiber_interrupt = rb_intern_const("fiber_interrupt");
id_fiber_schedule = rb_intern_const("fiber");
@ -442,10 +444,21 @@ rb_fiber_scheduler_unblock(VALUE scheduler, VALUE blocker, VALUE fiber)
* Expected to return the subset of events that are ready immediately.
*
*/
// Trampoline executed under rb_thread_io_blocking_operation: unpacks
// {scheduler, io, events, timeout} and invokes the scheduler's `io_wait` hook.
static VALUE
fiber_scheduler_io_wait(VALUE _argument)
{
    VALUE *argv = (VALUE *)_argument;

    return rb_funcallv(argv[0], id_io_wait, 3, &argv[1]);
}
/*
 * Calls the scheduler's `io_wait` hook for the given IO and events.
 *
 * The call is registered with the IO via rb_thread_io_blocking_operation, so
 * that `IO#close` from another execution context can interrupt the wait.
 */
VALUE
rb_fiber_scheduler_io_wait(VALUE scheduler, VALUE io, VALUE events, VALUE timeout)
{
    // Note: a stale pre-refactor `return rb_funcall(...)` here previously made
    // the interruptible path below unreachable dead code.
    VALUE arguments[] = {
        scheduler, io, events, timeout
    };

    return rb_thread_io_blocking_operation(io, fiber_scheduler_io_wait, (VALUE)&arguments);
}
VALUE
@ -515,14 +528,25 @@ VALUE rb_fiber_scheduler_io_selectv(VALUE scheduler, int argc, VALUE *argv)
*
* The method should be considered _experimental_.
*/
// Trampoline executed under rb_thread_io_blocking_operation: unpacks
// {scheduler, io, buffer, length, offset} and invokes the scheduler's
// `io_read` hook.
static VALUE
fiber_scheduler_io_read(VALUE _argument)
{
    VALUE *argv = (VALUE *)_argument;

    return rb_funcallv(argv[0], id_io_read, 4, &argv[1]);
}
/*
 * Calls the scheduler's `io_read` hook, if implemented, registering the call
 * with the IO via rb_thread_io_blocking_operation so that `IO#close` from
 * another execution context can interrupt it.
 *
 * Returns RUBY_Qundef when the scheduler does not respond to `io_read`.
 */
VALUE
rb_fiber_scheduler_io_read(VALUE scheduler, VALUE io, VALUE buffer, size_t length, size_t offset)
{
    if (!rb_respond_to(scheduler, id_io_read)) {
        return RUBY_Qundef;
    }

    // The stale pre-refactor initializer line (without `scheduler`) and the
    // old `rb_check_funcall` return were removed; both diff variants had been
    // left interleaved here, which is not valid C.
    VALUE arguments[] = {
        scheduler, io, buffer, SIZET2NUM(length), SIZET2NUM(offset)
    };

    return rb_thread_io_blocking_operation(io, fiber_scheduler_io_read, (VALUE)&arguments);
}
/*
@ -539,14 +563,25 @@ rb_fiber_scheduler_io_read(VALUE scheduler, VALUE io, VALUE buffer, size_t lengt
*
* The method should be considered _experimental_.
*/
// Trampoline executed under rb_thread_io_blocking_operation: unpacks
// {scheduler, io, buffer, from, length, offset} and invokes the scheduler's
// `io_pread` hook.
static VALUE
fiber_scheduler_io_pread(VALUE _argument)
{
    VALUE *argv = (VALUE *)_argument;

    return rb_funcallv(argv[0], id_io_pread, 5, &argv[1]);
}
/*
 * Calls the scheduler's `io_pread` hook, if implemented, registering the call
 * with the IO via rb_thread_io_blocking_operation so that `IO#close` from
 * another execution context can interrupt it.
 *
 * Returns RUBY_Qundef when the scheduler does not respond to `io_pread`.
 */
VALUE
rb_fiber_scheduler_io_pread(VALUE scheduler, VALUE io, rb_off_t from, VALUE buffer, size_t length, size_t offset)
{
    if (!rb_respond_to(scheduler, id_io_pread)) {
        return RUBY_Qundef;
    }

    // Stale pre-refactor duplicate lines (initializer without `scheduler`,
    // old `rb_check_funcall` return) removed.
    VALUE arguments[] = {
        scheduler, io, buffer, OFFT2NUM(from), SIZET2NUM(length), SIZET2NUM(offset)
    };

    return rb_thread_io_blocking_operation(io, fiber_scheduler_io_pread, (VALUE)&arguments);
}
/*
@ -577,14 +612,25 @@ rb_fiber_scheduler_io_pread(VALUE scheduler, VALUE io, rb_off_t from, VALUE buff
*
* The method should be considered _experimental_.
*/
// Trampoline executed under rb_thread_io_blocking_operation: unpacks
// {scheduler, io, buffer, length, offset} and invokes the scheduler's
// `io_write` hook.
static VALUE
fiber_scheduler_io_write(VALUE _argument)
{
    VALUE *argv = (VALUE *)_argument;

    return rb_funcallv(argv[0], id_io_write, 4, &argv[1]);
}
/*
 * Calls the scheduler's `io_write` hook, if implemented, registering the call
 * with the IO via rb_thread_io_blocking_operation so that `IO#close` from
 * another execution context can interrupt it.
 *
 * Returns RUBY_Qundef when the scheduler does not respond to `io_write`.
 */
VALUE
rb_fiber_scheduler_io_write(VALUE scheduler, VALUE io, VALUE buffer, size_t length, size_t offset)
{
    if (!rb_respond_to(scheduler, id_io_write)) {
        return RUBY_Qundef;
    }

    // Stale pre-refactor duplicate lines (initializer without `scheduler`,
    // old `rb_check_funcall` return) removed.
    VALUE arguments[] = {
        scheduler, io, buffer, SIZET2NUM(length), SIZET2NUM(offset)
    };

    return rb_thread_io_blocking_operation(io, fiber_scheduler_io_write, (VALUE)&arguments);
}
/*
@ -602,14 +648,25 @@ rb_fiber_scheduler_io_write(VALUE scheduler, VALUE io, VALUE buffer, size_t leng
* The method should be considered _experimental_.
*
*/
// Trampoline executed under rb_thread_io_blocking_operation: unpacks
// {scheduler, io, buffer, from, length, offset} and invokes the scheduler's
// `io_pwrite` hook.
static VALUE
fiber_scheduler_io_pwrite(VALUE _argument)
{
    VALUE *argv = (VALUE *)_argument;

    return rb_funcallv(argv[0], id_io_pwrite, 5, &argv[1]);
}
/*
 * Calls the scheduler's `io_pwrite` hook, if implemented, registering the call
 * with the IO via rb_thread_io_blocking_operation so that `IO#close` from
 * another execution context can interrupt it.
 *
 * Returns RUBY_Qundef when the scheduler does not respond to `io_pwrite`.
 */
VALUE
rb_fiber_scheduler_io_pwrite(VALUE scheduler, VALUE io, rb_off_t from, VALUE buffer, size_t length, size_t offset)
{
    if (!rb_respond_to(scheduler, id_io_pwrite)) {
        return RUBY_Qundef;
    }

    // Stale pre-refactor duplicate lines (initializer without `scheduler`,
    // old `rb_check_funcall` return) removed.
    VALUE arguments[] = {
        scheduler, io, buffer, OFFT2NUM(from), SIZET2NUM(length), SIZET2NUM(offset)
    };

    return rb_thread_io_blocking_operation(io, fiber_scheduler_io_pwrite, (VALUE)&arguments);
}
VALUE
@ -766,6 +823,15 @@ VALUE rb_fiber_scheduler_blocking_operation_wait(VALUE scheduler, void* (*functi
return rb_check_funcall(scheduler, id_blocking_operation_wait, 1, &proc);
}
// Invokes the scheduler's `fiber_interrupt` hook with (fiber, exception).
// Uses rb_check_funcall, so RUBY_Qundef is returned when the scheduler does
// not implement the hook. May be called from a different thread than the one
// running the scheduler.
VALUE
rb_fiber_scheduler_fiber_interrupt(VALUE scheduler, VALUE fiber, VALUE exception)
{
    VALUE argv[2];
    argv[0] = fiber;
    argv[1] = exception;

    return rb_check_funcall(scheduler, id_fiber_interrupt, 2, argv);
}
/*
* Document-method: Fiber::Scheduler#fiber
* call-seq: fiber(&block)

View File

@ -68,9 +68,15 @@ class Scheduler
def run
# $stderr.puts [__method__, Fiber.current].inspect
readable = writable = nil
while @readable.any? or @writable.any? or @waiting.any? or @blocking.any?
# May only handle file descriptors up to 1024...
readable, writable = IO.select(@readable.keys + [@urgent.first], @writable.keys, [], next_timeout)
begin
readable, writable = IO.select(@readable.keys + [@urgent.first], @writable.keys, [], next_timeout)
rescue IOError
# Ignore - this can happen if the IO is closed while we are waiting.
end
# puts "readable: #{readable}" if readable&.any?
# puts "writable: #{writable}" if writable&.any?
@ -290,6 +296,30 @@ class Scheduler
io.write_nonblock('.')
end
# Wraps a fiber together with the exception to deliver to it, so the
# scheduler's ready list can treat the pending interruption like any other
# runnable entry (it quacks like a fiber: responds to #alive? and #transfer).
class FiberInterrupt
  def initialize(fiber, exception)
    @fiber, @exception = fiber, exception
  end

  # Mirrors Fiber#alive? so the run loop can skip entries whose target
  # fiber has already finished.
  def alive?
    @fiber.alive?
  end

  # Invoked when the scheduler resumes this entry: raises the stored
  # exception inside the target fiber.
  def transfer
    @fiber.raise(@exception)
  end
end
# Scheduler hook: deliver `exception` to `fiber`. This may be invoked from a
# different thread, so the ready list is mutated under @lock and the event
# loop is woken up by writing to the urgent pipe.
def fiber_interrupt(fiber, exception)
  entry = FiberInterrupt.new(fiber, exception)

  @lock.synchronize do
    @ready << entry
  end

  @urgent.last.write_nonblock('.')
end
# This hook is invoked by `Fiber.schedule`. Strictly speaking, you should use
# it to create scheduled fibers, but it is not required in practice;
# `Fiber.new` is usually sufficient.

113
thread.c
View File

@ -1698,7 +1698,8 @@ rb_io_blocking_operations(struct rb_io *io)
{
rb_serial_t fork_generation = GET_VM()->fork_gen;
// On fork, all existing entries in this list (which are stack allocated) become invalid.
// Therefore, we re-initialize the list which clears it.
if (io->fork_generation != fork_generation) {
ccan_list_head_init(&io->blocking_operations);
io->fork_generation = fork_generation;
@ -1707,6 +1708,16 @@ rb_io_blocking_operations(struct rb_io *io)
return &io->blocking_operations;
}
/*
* Registers a blocking operation for an IO object. This is used to track all threads and fibers
* that are currently blocked on this IO for reading, writing or other operations.
*
* When the IO is closed, all blocking operations will be notified via rb_fiber_scheduler_fiber_interrupt
* for fibers with a scheduler, or via rb_threadptr_interrupt for threads without a scheduler.
*
* @parameter io The IO object on which the operation will block
* @parameter blocking_operation The operation details including the execution context that will be blocked
*/
static void
rb_io_blocking_operation_enter(struct rb_io *io, struct rb_io_blocking_operation *blocking_operation)
{
@ -1740,6 +1751,16 @@ io_blocking_operation_exit(VALUE _arguments)
return Qnil;
}
/*
* Called when a blocking operation completes or is interrupted. Removes the operation from
* the IO's blocking_operations list and wakes up any waiting threads/fibers.
*
* If there's a wakeup_mutex (meaning an IO close is in progress), synchronizes the cleanup
* through that mutex to ensure proper coordination with the closing thread.
*
* @parameter io The IO object the operation was performed on
* @parameter blocking_operation The completed operation to clean up
*/
static void
rb_io_blocking_operation_exit(struct rb_io *io, struct rb_io_blocking_operation *blocking_operation)
{
@ -1758,6 +1779,49 @@ rb_io_blocking_operation_exit(struct rb_io *io, struct rb_io_blocking_operation
}
}
// Ensure callback for rb_thread_io_blocking_operation: unregisters the
// blocking operation from its IO regardless of whether the wrapped function
// returned normally or raised.
static VALUE
rb_thread_io_blocking_operation_ensure(VALUE _argument)
{
    struct io_blocking_operation_arguments *data =
        (struct io_blocking_operation_arguments *)_argument;

    rb_io_blocking_operation_exit(data->io, data->blocking_operation);

    return Qnil;
}
/*
* Executes a function that performs a blocking IO operation, while properly tracking
* the operation in the IO's blocking_operations list. This ensures proper cleanup
* and interruption handling if the IO is closed while blocked.
*
* The operation is automatically removed from the blocking_operations list when the function
* returns, whether normally or due to an exception.
*
* @parameter self The IO object
* @parameter function The function to execute that will perform the blocking operation
* @parameter argument The argument to pass to the function
* @returns The result of the blocking operation function
*/
VALUE
rb_thread_io_blocking_operation(VALUE self, VALUE(*function)(VALUE), VALUE argument)
{
    struct rb_io *io;
    RB_IO_POINTER(self, io);

    // The blocking operation is stack allocated; it is only linked into the
    // IO's list for the dynamic extent of the rb_ensure call below.
    rb_execution_context_t *ec = GET_EC();
    struct rb_io_blocking_operation blocking_operation = {
        .ec = ec,
    };
    // Go through the accessor rather than `&io->blocking_operations`: it
    // re-initializes the list after fork, when stack-allocated entries
    // inherited from the parent process would otherwise be dangling.
    ccan_list_add(rb_io_blocking_operations(io), &blocking_operation.list);

    struct io_blocking_operation_arguments io_blocking_operation_arguments = {
        .io = io,
        .blocking_operation = &blocking_operation
    };

    // The ensure callback removes the entry on both normal and exceptional
    // exit, so `IO#close` never observes a dangling operation.
    return rb_ensure(function, argument, rb_thread_io_blocking_operation_ensure, (VALUE)&io_blocking_operation_arguments);
}
static bool
thread_io_mn_schedulable(rb_thread_t *th, int events, const struct timeval *timeout)
{
@ -1859,7 +1923,7 @@ rb_thread_io_blocking_call(struct rb_io* io, rb_blocking_function_t *func, void
saved_errno = errno;
}, ubf_select, th, FALSE);
th = rb_ec_thread_ptr(ec);
RUBY_ASSERT(th == rb_ec_thread_ptr(ec));
if (events &&
blocking_call_retryable_p((int)val, saved_errno) &&
thread_io_wait_events(th, fd, events, NULL)) {
@ -2672,10 +2736,30 @@ rb_ec_reset_raised(rb_execution_context_t *ec)
return 1;
}
static size_t
thread_io_close_notify_all(struct rb_io *io)
/*
* Thread-safe IO closing mechanism.
*
* When an IO is closed while other threads or fibers are blocked on it, we need to:
* 1. Track and notify all blocking operations through io->blocking_operations
* 2. Ensure only one thread can close at a time using io->closing_ec
* 3. Synchronize cleanup using wakeup_mutex
*
* The close process works as follows:
* - First check if any thread is already closing (io->closing_ec)
* - Set up wakeup_mutex for synchronization
* - Iterate through all blocking operations in io->blocking_operations
* - For each blocked fiber with a scheduler:
* - Notify via rb_fiber_scheduler_fiber_interrupt
* - For each blocked thread without a scheduler:
* - Enqueue IOError via rb_threadptr_pending_interrupt_enque
* - Wake via rb_threadptr_interrupt
* - Wait on wakeup_mutex until all operations are cleaned up
* - Only then clear closing state and allow actual close to proceed
*/
static VALUE
thread_io_close_notify_all(VALUE _io)
{
RUBY_ASSERT_CRITICAL_SECTION_ENTER();
struct rb_io *io = (struct rb_io *)_io;
size_t count = 0;
rb_vm_t *vm = io->closing_ec->thread_ptr->vm;
@ -2687,17 +2771,17 @@ thread_io_close_notify_all(struct rb_io *io)
rb_thread_t *thread = ec->thread_ptr;
rb_threadptr_pending_interrupt_enque(thread, error);
// This operation is slow:
rb_threadptr_interrupt(thread);
if (thread->scheduler != Qnil) {
rb_fiber_scheduler_fiber_interrupt(thread->scheduler, rb_fiberptr_self(ec->fiber_ptr), error);
} else {
rb_threadptr_pending_interrupt_enque(thread, error);
rb_threadptr_interrupt(thread);
}
count += 1;
}
RUBY_ASSERT_CRITICAL_SECTION_LEAVE();
return count;
return (VALUE)count;
}
size_t
@ -2720,7 +2804,10 @@ rb_thread_io_close_interrupt(struct rb_io *io)
// This is used to ensure the correct execution context is woken up after the blocking operation is interrupted:
io->wakeup_mutex = rb_mutex_new();
return thread_io_close_notify_all(io);
// We need to use a mutex here as entering the fiber scheduler may cause a context switch:
VALUE result = rb_mutex_synchronize(io->wakeup_mutex, thread_io_close_notify_all, (VALUE)io);
return (size_t)result;
}
void