EYHN aa4874a55c
feat(core): use cloud indexer for search (#12899)
<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
## Summary by CodeRabbit

- **New Features**
- Added enhanced error handling and user-friendly error messages in
quick search and document search menus.
  - Introduced loading state indicators for search operations.
  - Quick Search now provides explicit error feedback in the UI.

- **Improvements**
- Search and aggregation operations can now prefer remote or local
indexers based on user or system preference.
- Streamlined indexer logic for more consistent and reliable search
experiences.
- Refined error handling in messaging and synchronization layers for
improved stability.
- Enhanced error object handling in messaging for clearer error
propagation.
- Updated cloud workspace storage to always use IndexedDB locally and
CloudIndexer remotely.
- Shifted indexer operations to use synchronized indexer layer for
better consistency.
  - Simplified indexer client by consolidating storage and sync layers.
- Improved error propagation in messaging handlers by wrapping error
objects.
- Updated document search to prioritize remote indexer results by
default.

- **Bug Fixes**
- Improved robustness of search features by handling errors gracefully
and preventing potential runtime issues.

- **Style**
  - Added new styles for displaying error messages in search interfaces.

- **Chores**
- Removed the obsolete "Enable Cloud Indexer" feature flag; cloud
indexer behavior is now always enabled where applicable.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->
2025-06-25 02:55:27 +00:00

208 lines
5.4 KiB
TypeScript

import EventEmitter2 from 'eventemitter2';
import { pick } from 'lodash-es';
import { defer, from, fromEvent, Observable, of, take, takeUntil } from 'rxjs';
import { MANUALLY_STOP } from '../utils';
import {
AutoMessageHandler,
type CallMessage,
fetchTransferables,
type MessageHandlers,
type ReturnMessage,
type SubscribeMessage,
type SubscriptionCompleteMessage,
type SubscriptionErrorMessage,
type SubscriptionNextMessage,
} from './message';
import type { OpInput, OpNames, OpOutput, OpSchema } from './types';
/**
 * Per-invocation context passed as the second argument to every op handler.
 */
interface OpCallContext {
/**
 * Signal tied to the request being processed. OpConsumer aborts it with
 * `MANUALLY_STOP` as the reason when a matching `cancel` / `unsubscribe`
 * message arrives or when the consumer is destroyed, so long-running handlers
 * can observe it to stop work early.
 */
signal: AbortSignal;
}
/**
 * Signature of a function that implements the operation `Op` of schema `Ops`.
 *
 * @param payload - The first element of the operation's input tuple.
 * @param ctx - Invocation context; carries the abort signal for this request.
 * @returns The operation output, either directly, as a promise, or as an
 * observable. `OpConsumer.ob$` normalizes all three shapes into an observable
 * before delivering results.
 */
export type OpHandler<Ops extends OpSchema, Op extends OpNames<Ops>> = (
payload: OpInput<Ops, Op>[0],
ctx: OpCallContext
) =>
| OpOutput<Ops, Op>
| Promise<OpOutput<Ops, Op>>
| Observable<OpOutput<Ops, Op>>;
/**
 * Receiving side of the op messaging protocol.
 *
 * Handlers are registered per operation name. Incoming `call` and `subscribe`
 * messages are dispatched to the matching handler and the outcome is posted
 * back on `this.port` as `return` / `next` / `error` / `complete` messages
 * that carry the id of the originating request.
 */
export class OpConsumer<Ops extends OpSchema> extends AutoMessageHandler {
  /** Carries the `before:<op>` / `after:<op>` hook events. */
  private readonly eventBus = new EventEmitter2();

  /** Operation name -> handler registered for it. */
  private readonly registeredOpHandlers = new Map<
    OpNames<Ops>,
    OpHandler<Ops, any>
  >();

  /**
   * Request id -> abort controller of a call/subscription that has not yet
   * terminated. Entries are removed when the request completes or errors.
   */
  private readonly processing = new Map<string, AbortController>();

  /** Message-type -> handler table consumed by the base class. */
  override get handlers() {
    return {
      call: this.handleCallMessage,
      cancel: this.handleCancelMessage,
      subscribe: this.handleSubscribeMessage,
      unsubscribe: this.handleCancelMessage,
    };
  }

  /**
   * Handles a one-shot `call`: runs the operation, takes its first emitted
   * value and posts it back as a `return` message. `before:<op>` is emitted
   * before the handler runs and `after:<op>` once a value is available.
   */
  private readonly handleCallMessage: MessageHandlers['call'] = msg => {
    const abortController = new AbortController();
    this.processing.set(msg.id, abortController);
    this.eventBus.emit(`before:${msg.name}`, msg.payload);
    this.ob$(msg, abortController.signal)
      .pipe(take(1))
      .subscribe({
        next: data => {
          this.eventBus.emit(`after:${msg.name}`, msg.payload, data);
          const transferables = fetchTransferables(data);
          this.port.postMessage(
            {
              type: 'return',
              id: msg.id,
              data,
            } satisfies ReturnMessage,
            { transfer: transferables }
          );
        },
        error: error => {
          // `error` and `complete` are mutually exclusive, so the bookkeeping
          // entry must be released here as well; otherwise every failed call
          // leaves its AbortController in `processing` until destroy().
          this.processing.delete(msg.id);
          // Only a fixed set of plain fields is forwarded to the caller.
          this.port.postMessage({
            type: 'return',
            id: msg.id,
            error: pick(error, [
              'name',
              'message',
              'code',
              'type',
              'status',
              'data',
              'stacktrace',
            ]),
          } satisfies ReturnMessage);
        },
        complete: () => {
          this.processing.delete(msg.id);
        },
      });
  };

  /**
   * Handles a `subscribe`: runs the operation and forwards every emitted
   * value as a `next` message, followed by a terminal `error` or `complete`
   * message.
   */
  private readonly handleSubscribeMessage: MessageHandlers['subscribe'] =
    msg => {
      const abortController = new AbortController();
      this.processing.set(msg.id, abortController);
      this.ob$(msg, abortController.signal).subscribe({
        next: data => {
          const transferables = fetchTransferables(data);
          this.port.postMessage(
            {
              type: 'next',
              id: msg.id,
              data,
            } satisfies SubscriptionNextMessage,
            { transfer: transferables }
          );
        },
        error: error => {
          // A failed subscription never reaches `complete`; release its
          // entry here so `processing` does not grow without bound.
          this.processing.delete(msg.id);
          this.port.postMessage({
            type: 'error',
            id: msg.id,
            error: pick(error, [
              'name',
              'message',
              'code',
              'type',
              'status',
              'data',
              'stacktrace',
            ]),
          } satisfies SubscriptionErrorMessage);
        },
        complete: () => {
          this.port.postMessage({
            type: 'complete',
            id: msg.id,
          } satisfies SubscriptionCompleteMessage);
          this.processing.delete(msg.id);
        },
      });
    };

  /**
   * Handles both `cancel` and `unsubscribe`: aborts the in-flight request
   * with the given id, if any. Unknown ids are ignored.
   */
  private readonly handleCancelMessage: MessageHandlers['cancel'] &
    MessageHandlers['unsubscribe'] = msg => {
    const abortController = this.processing.get(msg.id);
    if (!abortController) {
      return;
    }
    // Aborting makes the running observable complete (see `ob$`), which in
    // turn removes the entry from `processing`.
    abortController.abort(MANUALLY_STOP);
  };

  /**
   * Registers `handler` as the implementation of operation `op`, replacing
   * any handler previously registered under the same name.
   */
  register<Op extends OpNames<Ops>>(op: Op, handler: OpHandler<Ops, Op>) {
    this.registeredOpHandlers.set(op, handler);
  }

  /**
   * Registers every entry of `handlers`, keyed by operation name.
   */
  registerAll(
    handlers: OpNames<Ops> extends string
      ? { [K in OpNames<Ops>]: OpHandler<Ops, K> }
      : never
  ) {
    for (const [op, handler] of Object.entries(handlers)) {
      this.register(op as any, handler as any);
    }
  }

  /**
   * Adds a hook invoked with the payload before operation `op` is run for a
   * `call` message. Subscriptions do not emit this event.
   */
  before<Op extends OpNames<Ops>>(
    op: Op,
    handler: (...input: OpInput<Ops, Op>) => void
  ) {
    this.eventBus.on(`before:${op}`, handler);
  }

  /**
   * Adds a hook invoked with the payload and the result after operation `op`
   * produced a value for a `call` message. Subscriptions do not emit this
   * event.
   */
  after<Op extends OpNames<Ops>>(
    op: Op,
    handler: (...args: [...OpInput<Ops, Op>, OpOutput<Ops, Op>]) => void
  ) {
    this.eventBus.on(`after:${op}`, handler);
  }

  /**
   * Builds the observable that executes the operation described by `op`.
   *
   * The handler lookup and invocation are deferred until subscription, so a
   * missing handler surfaces as an error notification rather than a
   * synchronous throw. The handler's return value (plain value, promise or
   * observable) is normalized to an observable that completes as soon as
   * `signal` fires its `abort` event.
   *
   * @internal
   */
  ob$(op: CallMessage | SubscribeMessage, signal: AbortSignal) {
    return defer(() => {
      const handler = this.registeredOpHandlers.get(op.name as any);
      if (!handler) {
        throw new Error(
          `Handler for operation [${op.name}] is not registered.`
        );
      }

      const ret$ = handler(op.payload, { signal });

      let ob$: Observable<any>;
      if (ret$ instanceof Promise) {
        ob$ = from(ret$);
      } else if (ret$ instanceof Observable) {
        ob$ = ret$;
      } else {
        ob$ = of(ret$);
      }

      return ob$.pipe(takeUntil(fromEvent(signal, 'abort')));
    });
  }

  /**
   * Tears the consumer down: closes the base message handler, drops all
   * registered op handlers, aborts every in-flight request and removes all
   * hook listeners.
   */
  destroy() {
    super.close();
    this.registeredOpHandlers.clear();
    this.processing.forEach(controller => {
      controller.abort(MANUALLY_STOP);
    });
    this.processing.clear();
    this.eventBus.removeAllListeners();
  }
}