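// Tests for the data-plane subscription utilities: ConsecutiveList (tracking
// when consecutive ordered items have completed), LatestQueue (collapsing queued
// work down to the most recently added task), and PartitionedQueue (parallel
// across partitions, serial within a partition).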
import { wait } from '@atproto/common'
import { randomStr } from '@atproto/crypto'
import {
  ConsecutiveList,
  LatestQueue,
  PartitionedQueue,
} from '../../../src/data-plane/server/subscription/util'

describe('subscription utils', () => {
  describe('ConsecutiveList', () => {
    it('tracks consecutive complete items.', () => {
      const consecutive = new ConsecutiveList<number>()
      // add items
      const item1 = consecutive.push(1)
      const item2 = consecutive.push(2)
      const item3 = consecutive.push(3)
      expect(item1.isComplete).toEqual(false)
      expect(item2.isComplete).toEqual(false)
      expect(item3.isComplete).toEqual(false)
      // complete items out of order
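      // completing item2 first returns no items, since item1 at the head is still pending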
      expect(consecutive.list.length).toBe(3)
      expect(item2.complete()).toEqual([])
      expect(item2.isComplete).toEqual(true)
      expect(consecutive.list.length).toBe(3)
      expect(item1.complete()).toEqual([1, 2])
      expect(item1.isComplete).toEqual(true)
      expect(consecutive.list.length).toBe(1)
      expect(item3.complete()).toEqual([3])
      expect(consecutive.list.length).toBe(0)
      expect(item3.isComplete).toEqual(true)
    })
  })

  describe('LatestQueue', () => {
    it('only performs most recently queued item.', async () => {
      const latest = new LatestQueue()
      const complete: number[] = []
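      // queue four tasks back-to-back: the first runs immediately, then 2 and 3
      // are superseded by 4 before they get a chance to run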
      latest.add(async () => {
        await wait(1)
        complete.push(1)
      })
      latest.add(async () => {
        await wait(1)
        complete.push(2)
      })
      latest.add(async () => {
        await wait(1)
        complete.push(3)
      })
      latest.add(async () => {
        await wait(1)
        complete.push(4)
      })
      await latest.queue.onIdle()
      expect(complete).toEqual([1, 4]) // skip 2, 3
      latest.add(async () => {
        await wait(1)
        complete.push(5)
      })
      latest.add(async () => {
        await wait(1)
        complete.push(6)
      })
      await latest.queue.onIdle()
      expect(complete).toEqual([1, 4, 5, 6])
    })

    it('stops processing queued messages on destroy.', async () => {
      const latest = new LatestQueue()
      const complete: number[] = []
      latest.add(async () => {
        await wait(1)
        complete.push(1)
      })
      latest.add(async () => {
        await wait(1)
        complete.push(2)
      })
      const destroyed = latest.destroy()
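      // work added after destroy() should never run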
      latest.add(async () => {
        await wait(1)
        complete.push(3)
      })
      await destroyed
      expect(complete).toEqual([1]) // 2 was cleared, 3 was after destroy
      // show that waiting on destroyed above was already enough to reflect all complete items
      await latest.queue.onIdle()
      expect(complete).toEqual([1])
    })
  })

  describe('PartitionedQueue', () => {
    it('performs work in parallel across partitions, serial within a partition.', async () => {
      const partitioned = new PartitionedQueue({ concurrency: Infinity })
      const complete: number[] = []
      // partition 1 items start slow but get faster: slow should still complete first.
      partitioned.add('1', async () => {
        await wait(30)
        complete.push(11)
      })
      partitioned.add('1', async () => {
        await wait(20)
        complete.push(12)
      })
      partitioned.add('1', async () => {
        await wait(1)
        complete.push(13)
      })
      expect(partitioned.partitions.size).toEqual(1)
      // partition 2 items complete quickly except the last, which is slowest of all events.
      partitioned.add('2', async () => {
        await wait(1)
        complete.push(21)
      })
      partitioned.add('2', async () => {
        await wait(1)
        complete.push(22)
      })
      partitioned.add('2', async () => {
        await wait(1)
        complete.push(23)
      })
      partitioned.add('2', async () => {
        await wait(60)
        complete.push(24)
      })
      expect(partitioned.partitions.size).toEqual(2)
      await partitioned.main.onIdle()
      expect(complete).toEqual([21, 22, 23, 11, 12, 13, 24])
      expect(partitioned.partitions.size).toEqual(0)
    })

    it('limits overall concurrency.', async () => {
      const partitioned = new PartitionedQueue({ concurrency: 1 })
      const complete: number[] = []
      // if concurrency were not constrained, partition 1 would complete all items
      // before any items from partition 2. since it is constrained, the work is complete in the order added.
      partitioned.add('1', async () => {
        await wait(1)
        complete.push(11)
      })
      partitioned.add('2', async () => {
        await wait(10)
        complete.push(21)
      })
      partitioned.add('1', async () => {
        await wait(1)
        complete.push(12)
      })
      partitioned.add('2', async () => {
        await wait(10)
        complete.push(22)
      })
      // only partition 1 exists so far due to the concurrency
      expect(partitioned.partitions.size).toEqual(1)
      await partitioned.main.onIdle()
      expect(complete).toEqual([11, 21, 12, 22])
      expect(partitioned.partitions.size).toEqual(0)
    })

    it('settles with many items.', async () => {
      const partitioned = new PartitionedQueue({ concurrency: 100 })
      const complete: { partition: string; id: number }[] = []
      const partitions = new Set<string>()
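      // enqueue 500 items across random single-hex-char partitions (at most 16 distinct),
      // alternating between 0ms and 2ms of simulated work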
      for (let i = 0; i < 500; ++i) {
        const partition = randomStr(1, 'base16').slice(0, 1)
        partitions.add(partition)
        partitioned.add(partition, async () => {
          await wait((i % 2) * 2)
          complete.push({ partition, id: i })
        })
      }
      expect(partitioned.partitions.size).toEqual(partitions.size)
      await partitioned.main.onIdle()
      expect(complete.length).toEqual(500)
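      // within each partition, items must have completed in the order they were added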
      for (const partition of partitions) {
        const ids = complete
          .filter((item) => item.partition === partition)
          .map((item) => item.id)
        expect(ids).toEqual([...ids].sort((a, b) => a - b))
      }
      expect(partitioned.partitions.size).toEqual(0)
    })
  })
})