2025-01-10 08:20:27 -08:00
export default async function pMap (
2020-10-02 17:52:19 -04:00
iterable ,
mapper ,
{
2025-01-10 08:20:27 -08:00
concurrency = Number . POSITIVE _INFINITY ,
stopOnError = true ,
signal ,
} = { } ,
) {
return new Promise ( ( resolve _ , reject _ ) => {
if ( iterable [ Symbol . iterator ] === undefined && iterable [ Symbol . asyncIterator ] === undefined ) {
throw new TypeError ( ` Expected \` input \` to be either an \` Iterable \` or \` AsyncIterable \` , got ( ${ typeof iterable } ) ` ) ;
}
2020-10-02 17:52:19 -04:00
if ( typeof mapper !== 'function' ) {
throw new TypeError ( 'Mapper function is required' ) ;
}
2025-01-10 08:20:27 -08:00
if ( ! ( ( Number . isSafeInteger ( concurrency ) && concurrency >= 1 ) || concurrency === Number . POSITIVE _INFINITY ) ) {
2020-10-02 17:52:19 -04:00
throw new TypeError ( ` Expected \` concurrency \` to be an integer from 1 and up or \` Infinity \` , got \` ${ concurrency } \` ( ${ typeof concurrency } ) ` ) ;
}
const result = [ ] ;
const errors = [ ] ;
2025-01-10 08:20:27 -08:00
const skippedIndexesMap = new Map ( ) ;
2020-10-02 17:52:19 -04:00
let isRejected = false ;
2025-01-10 08:20:27 -08:00
let isResolved = false ;
2020-10-02 17:52:19 -04:00
let isIterableDone = false ;
let resolvingCount = 0 ;
let currentIndex = 0 ;
2025-01-10 08:20:27 -08:00
const iterator = iterable [ Symbol . iterator ] === undefined ? iterable [ Symbol . asyncIterator ] ( ) : iterable [ Symbol . iterator ] ( ) ;
const signalListener = ( ) => {
reject ( signal . reason ) ;
} ;
2020-10-02 17:52:19 -04:00
2025-01-10 08:20:27 -08:00
const cleanup = ( ) => {
signal ? . removeEventListener ( 'abort' , signalListener ) ;
} ;
const resolve = value => {
resolve _ ( value ) ;
cleanup ( ) ;
} ;
const reject = reason => {
isRejected = true ;
isResolved = true ;
reject _ ( reason ) ;
cleanup ( ) ;
} ;
if ( signal ) {
if ( signal . aborted ) {
reject ( signal . reason ) ;
}
signal . addEventListener ( 'abort' , signalListener , { once : true } ) ;
}
const next = async ( ) => {
if ( isResolved ) {
2020-10-02 17:52:19 -04:00
return ;
}
2025-01-10 08:20:27 -08:00
const nextItem = await iterator . next ( ) ;
2020-10-02 17:52:19 -04:00
const index = currentIndex ;
currentIndex ++ ;
2025-01-10 08:20:27 -08:00
// Note: `iterator.next()` can be called many times in parallel.
// This can cause multiple calls to this `next()` function to
// receive a `nextItem` with `done === true`.
// The shutdown logic that rejects/resolves must be protected
// so it runs only one time as the `skippedIndex` logic is
// non-idempotent.
2020-10-02 17:52:19 -04:00
if ( nextItem . done ) {
isIterableDone = true ;
2025-01-10 08:20:27 -08:00
if ( resolvingCount === 0 && ! isResolved ) {
if ( ! stopOnError && errors . length > 0 ) {
reject ( new AggregateError ( errors ) ) ; // eslint-disable-line unicorn/error-message
return ;
}
isResolved = true ;
if ( skippedIndexesMap . size === 0 ) {
2020-10-02 17:52:19 -04:00
resolve ( result ) ;
2025-01-10 08:20:27 -08:00
return ;
}
const pureResult = [ ] ;
// Support multiple `pMapSkip`'s.
for ( const [ index , value ] of result . entries ( ) ) {
if ( skippedIndexesMap . get ( index ) === pMapSkip ) {
continue ;
}
pureResult . push ( value ) ;
2020-10-02 17:52:19 -04:00
}
2025-01-10 08:20:27 -08:00
resolve ( pureResult ) ;
2020-10-02 17:52:19 -04:00
}
return ;
}
resolvingCount ++ ;
2025-01-10 08:20:27 -08:00
// Intentionally detached
2020-10-02 17:52:19 -04:00
( async ( ) => {
try {
const element = await nextItem . value ;
2025-01-10 08:20:27 -08:00
if ( isResolved ) {
return ;
}
const value = await mapper ( element , index ) ;
// Use Map to stage the index of the element.
if ( value === pMapSkip ) {
skippedIndexesMap . set ( index , value ) ;
}
result [ index ] = value ;
2020-10-02 17:52:19 -04:00
resolvingCount -- ;
2025-01-10 08:20:27 -08:00
await next ( ) ;
2020-10-02 17:52:19 -04:00
} catch ( error ) {
if ( stopOnError ) {
reject ( error ) ;
} else {
errors . push ( error ) ;
resolvingCount -- ;
2025-01-10 08:20:27 -08:00
// In that case we can't really continue regardless of `stopOnError` state
// since an iterable is likely to continue throwing after it throws once.
// If we continue calling `next()` indefinitely we will likely end up
// in an infinite loop of failed iteration.
try {
await next ( ) ;
} catch ( error ) {
reject ( error ) ;
}
2020-10-02 17:52:19 -04:00
}
}
} ) ( ) ;
} ;
2025-01-10 08:20:27 -08:00
// Create the concurrent runners in a detached (non-awaited)
// promise. We need this so we can await the `next()` calls
// to stop creating runners before hitting the concurrency limit
// if the iterable has already been marked as done.
// NOTE: We *must* do this for async iterators otherwise we'll spin up
// infinite `next()` calls by default and never start the event loop.
( async ( ) => {
for ( let index = 0 ; index < concurrency ; index ++ ) {
try {
// eslint-disable-next-line no-await-in-loop
await next ( ) ;
} catch ( error ) {
reject ( error ) ;
break ;
}
2020-10-02 17:52:19 -04:00
2025-01-10 08:20:27 -08:00
if ( isIterableDone || isRejected ) {
break ;
}
2020-10-02 17:52:19 -04:00
}
2025-01-10 08:20:27 -08:00
} ) ( ) ;
2020-10-02 17:52:19 -04:00
} ) ;
2025-01-10 08:20:27 -08:00
}
export function pMapIterable (
iterable ,
mapper ,
{
concurrency = Number . POSITIVE _INFINITY ,
backpressure = concurrency ,
} = { } ,
) {
if ( iterable [ Symbol . iterator ] === undefined && iterable [ Symbol . asyncIterator ] === undefined ) {
throw new TypeError ( ` Expected \` input \` to be either an \` Iterable \` or \` AsyncIterable \` , got ( ${ typeof iterable } ) ` ) ;
}
if ( typeof mapper !== 'function' ) {
throw new TypeError ( 'Mapper function is required' ) ;
}
if ( ! ( ( Number . isSafeInteger ( concurrency ) && concurrency >= 1 ) || concurrency === Number . POSITIVE _INFINITY ) ) {
throw new TypeError ( ` Expected \` concurrency \` to be an integer from 1 and up or \` Infinity \` , got \` ${ concurrency } \` ( ${ typeof concurrency } ) ` ) ;
}
if ( ! ( ( Number . isSafeInteger ( backpressure ) && backpressure >= concurrency ) || backpressure === Number . POSITIVE _INFINITY ) ) {
throw new TypeError ( ` Expected \` backpressure \` to be an integer from \` concurrency \` ( ${ concurrency } ) and up or \` Infinity \` , got \` ${ backpressure } \` ( ${ typeof backpressure } ) ` ) ;
}
return {
async * [ Symbol . asyncIterator ] ( ) {
const iterator = iterable [ Symbol . asyncIterator ] === undefined ? iterable [ Symbol . iterator ] ( ) : iterable [ Symbol . asyncIterator ] ( ) ;
const promises = [ ] ;
let runningMappersCount = 0 ;
let isDone = false ;
let index = 0 ;
function trySpawn ( ) {
if ( isDone || ! ( runningMappersCount < concurrency && promises . length < backpressure ) ) {
return ;
}
const promise = ( async ( ) => {
const { done , value } = await iterator . next ( ) ;
if ( done ) {
return { done : true } ;
}
runningMappersCount ++ ;
// Spawn if still below concurrency and backpressure limit
trySpawn ( ) ;
try {
const returnValue = await mapper ( await value , index ++ ) ;
runningMappersCount -- ;
if ( returnValue === pMapSkip ) {
const index = promises . indexOf ( promise ) ;
if ( index > 0 ) {
promises . splice ( index , 1 ) ;
}
}
// Spawn if still below backpressure limit and just dropped below concurrency limit
trySpawn ( ) ;
return { done : false , value : returnValue } ;
} catch ( error ) {
isDone = true ;
return { error } ;
}
} ) ( ) ;
promises . push ( promise ) ;
}
trySpawn ( ) ;
while ( promises . length > 0 ) {
const { error , done , value } = await promises [ 0 ] ; // eslint-disable-line no-await-in-loop
promises . shift ( ) ;
if ( error ) {
throw error ;
}
if ( done ) {
return ;
}
// Spawn if just dropped below backpressure limit and below the concurrency limit
trySpawn ( ) ;
if ( value === pMapSkip ) {
continue ;
}
yield value ;
}
} ,
} ;
}
export const pMapSkip = Symbol ( 'skip' ) ;