// atproto/packages/bsync/tests/operations.test.ts
//
// (Source-viewer metadata: 328 lines, 9.6 KiB, TypeScript)

import assert from 'node:assert'
import { Code, ConnectError } from '@connectrpc/connect'
import getPort from 'get-port'
import { wait } from '@atproto/common'
import {
BsyncClient,
BsyncService,
Database,
authWithApiKey,
createClient,
envToCfg,
} from '../src'
import { Method, Operation } from '../src/proto/bsync_pb'
describe('operations', () => {
// Service under test and the client used to call it; both are assigned in beforeAll.
let bsync: BsyncService
let client: BsyncClient
// Encodes a value as the JSON byte payload carried by an operation.
const jsonPayload = (value: unknown) => Buffer.from(JSON.stringify(value))
// Two distinct well-formed JSON payloads, plus one that is not parseable JSON.
const validPayload0 = jsonPayload({ value: 0 })
const validPayload1 = jsonPayload({ value: 1 })
const invalidPayload = Buffer.from('{invalid json}')
// Creates and starts a bsync service on a port obtained from getPort(), brings
// its database schema up to date, and builds the client shared by the tests.
beforeAll(async () => {
  const port = await getPort()
  const cfg = envToCfg({
    port,
    dbUrl: process.env.DB_POSTGRES_URL,
    dbSchema: 'bsync_operations',
    apiKeys: ['key-1'],
    // NOTE(review): presumably bounds how long an empty scan waits — confirm.
    longPollTimeoutMs: 500,
  })
  bsync = await BsyncService.create(cfg)
  await bsync.ctx.db.migrateToLatestOrThrow()
  await bsync.start()
  // The shared client sends the one API key the service was configured with.
  const baseUrl = `http://localhost:${bsync.ctx.cfg.service.port}`
  client = createClient({
    httpVersion: '1.1',
    baseUrl,
    interceptors: [authWithApiKey('key-1')],
  })
})
// Tear the service down once the whole suite has finished.
afterAll(async () => {
  const service = bsync
  await service.destroy()
})
// Every test starts with the `operation` table emptied.
beforeEach(async () => {
  const database = bsync.ctx.db
  await clearOps(database)
})
describe('putOperation', () => {
// putOperation must reject callers that send no API key or an unknown one.
it('requires auth.', async () => {
  const baseUrl = `http://localhost:${bsync.ctx.cfg.service.port}`
  // Builds the same request for each attempt; only the client's auth differs.
  const buildRequest = () => ({
    actorDid: 'did:example:a',
    namespace: 'app.bsky.some.col',
    key: 'key1',
    method: Method.CREATE,
    payload: validPayload0,
  })
  // unauthed: no interceptor is installed on this client
  const anonymousClient = createClient({
    httpVersion: '1.1',
    baseUrl,
  })
  const anonymousAttempt = anonymousClient.putOperation(buildRequest())
  await expect(anonymousAttempt).rejects.toEqual(
    new ConnectError('missing auth', Code.Unauthenticated),
  )
  // bad auth: 'key-bad' is not among the keys the service was configured with
  const wrongKeyClient = createClient({
    httpVersion: '1.1',
    baseUrl,
    interceptors: [authWithApiKey('key-bad')],
  })
  const wrongKeyAttempt = wrongKeyClient.putOperation(buildRequest())
  await expect(wrongKeyAttempt).rejects.toEqual(
    new ConnectError('invalid api key', Code.Unauthenticated),
  )
})
// Each malformed request must be rejected with InvalidArgument and a message
// naming the offending field.
it('fails on bad inputs.', async () => {
  // The request every case starts from; a case overrides only the field(s)
  // it is meant to break.
  const baseRequest = {
    actorDid: 'did:example:a',
    namespace: 'app.bsky.some.col',
    key: 'key1',
    method: Method.CREATE,
    payload: validPayload0,
  }
  const badInputs: {
    overrides: Partial<typeof baseRequest>
    message: string
  }[] = [
    {
      overrides: { namespace: 'bad-namespace' },
      message: 'operation namespace is invalid NSID',
    },
    {
      overrides: { actorDid: 'bad-did' },
      message: 'operation actor_did is invalid DID',
    },
    {
      overrides: { key: '' },
      message: 'operation key is required',
    },
    {
      overrides: { method: Method.UNSPECIFIED },
      message: 'operation method is invalid',
    },
    {
      overrides: { payload: invalidPayload },
      message: 'payload must be a valid JSON when method is CREATE or UPDATE',
    },
    {
      overrides: { method: Method.UPDATE, payload: invalidPayload },
      message: 'payload must be a valid JSON when method is CREATE or UPDATE',
    },
    {
      // DELETE keeps the base payload, and that payload is what gets rejected.
      overrides: { method: Method.DELETE },
      message: 'cannot specify a payload when method is DELETE',
    },
  ]
  // Requests are issued one at a time, in the order listed above.
  for (const { overrides, message } of badInputs) {
    await expect(
      client.putOperation({ ...baseRequest, ...overrides }),
    ).rejects.toEqual(new ConnectError(message, Code.InvalidArgument))
  }
})
// A CREATE followed by an UPDATE on the same key is stored as two rows, in order.
it('puts operations.', async () => {
  // Both operations address the same actor / namespace / key.
  const target = {
    actorDid: 'did:example:a',
    namespace: 'app.bsky.some.col',
    key: 'key1',
  }
  const created = await client.putOperation({
    ...target,
    method: Method.CREATE,
    payload: validPayload0,
  })
  const updated = await client.putOperation({
    ...target,
    method: Method.UPDATE,
    payload: validPayload1,
  })
  // The response carries the id as a string, the stored row as a number.
  expect(created.operation?.id).toBe('1')
  expect(updated.operation?.id).toBe('2')
  const storedRows = await dumpOps(bsync.ctx.db)
  expect(storedRows).toStrictEqual([
    {
      id: 1,
      ...target,
      method: Method.CREATE,
      payload: validPayload0,
      createdAt: expect.any(Date),
    },
    {
      id: 2,
      ...target,
      method: Method.UPDATE,
      payload: validPayload1,
      createdAt: expect.any(Date),
    },
  ])
})
// The putOperation response must echo back the operation that was stored.
it('returns the operations on creation.', async () => {
  const res = await client.putOperation({
    actorDid: 'did:example:a',
    namespace: 'app.bsky.some.col',
    key: 'key1',
    method: Method.CREATE,
    payload: validPayload0,
  })
  const op = res.operation
  assert(op)
  // beforeEach empties the table, so the row just written is the only one stored.
  const storedRows = await dumpOps(bsync.ctx.db)
  expect(storedRows).toHaveLength(1)
  const [storedRow] = storedRows
  assert(storedRow)
  // Compare each field individually to avoid custom serialization by proto response objects.
  // Ids keep counting up across tests even though rows are deleted in between,
  // so a hard-coded id would only hold when exactly two inserts ran before this
  // test. Checking against the stored row keeps the test valid on its own.
  expect(op.id).toBe(String(storedRow.id))
  expect(op.actorDid).toBe('did:example:a')
  expect(op.namespace).toBe('app.bsky.some.col')
  expect(op.key).toBe('key1')
  expect(op.method).toBe(Method.CREATE)
  expect(op.payload).toEqual(new Uint8Array(validPayload0))
})
})
describe('scanOperations', () => {
// scanOperations must reject callers that send no API key or an unknown one.
it('requires auth.', async () => {
  const baseUrl = `http://localhost:${bsync.ctx.cfg.service.port}`
  // unauthed: no interceptor is installed on this client
  const anonymousClient = createClient({
    httpVersion: '1.1',
    baseUrl,
  })
  const anonymousAttempt = anonymousClient.scanOperations({})
  await expect(anonymousAttempt).rejects.toEqual(
    new ConnectError('missing auth', Code.Unauthenticated),
  )
  // bad auth: 'key-bad' is not among the keys the service was configured with
  const wrongKeyClient = createClient({
    httpVersion: '1.1',
    baseUrl,
    interceptors: [authWithApiKey('key-bad')],
  })
  const wrongKeyAttempt = wrongKeyClient.scanOperations({})
  await expect(wrongKeyAttempt).rejects.toEqual(
    new ConnectError('invalid api key', Code.Unauthenticated),
  )
})
// Scanning with a page limit must eventually return every stored operation,
// in ascending id order.
it('pages over created ops.', async () => {
  // add 100 ops, one per actor, written one after another
  const actorDids = Array.from({ length: 100 }, (_, i) => `did:example:${i}`)
  for (const actorDid of actorDids) {
    await client.putOperation({
      actorDid,
      namespace: 'app.bsky.some.col',
      key: 'key1',
      method: Method.CREATE,
      payload: validPayload0,
    })
  }
  // Follow the cursor until a page comes back empty or without a cursor.
  const collected: Operation[] = []
  let cursor: string | undefined
  for (;;) {
    const page = await client.scanOperations({
      cursor,
      limit: 30,
    })
    for (const op of page.operations) {
      collected.push(op)
    }
    if (page.operations.length === 0 || !page.cursor) {
      break
    }
    cursor = page.cursor
  }
  expect(collected.length).toEqual(100)
  // The ids as received must already equal their numerically sorted copy.
  const receivedIds = collected.map(({ id }) => parseInt(id, 10))
  const sortedIds = receivedIds.slice().sort((x, y) => x - y)
  expect(receivedIds).toEqual(sortedIds)
})
// A scan started on an empty table must resolve with an operation that is
// written while the scan is still pending.
it('supports long-poll, finding an operation.', async () => {
  // Kick off the scan but do not await it yet.
  const pendingScan = client.scanOperations({})
  await wait(100) // would be complete by now if it wasn't long-polling for an item
  const putResponse = await client.putOperation({
    actorDid: 'did:example:a',
    namespace: 'app.bsky.some.col',
    key: 'key1',
    method: Method.CREATE,
    payload: validPayload0,
  })
  const written = putResponse.operation
  const scanned = await pendingScan
  // The scan returns exactly the operation just written, and its cursor is that operation's id.
  expect(scanned.operations.length).toEqual(1)
  expect(scanned.operations[0]).toEqual(written)
  expect(scanned.cursor).toEqual(written?.id)
})
// With nothing written, a scan resolves to an empty page and an empty cursor.
// NOTE(review): presumably this resolves once the long-poll timeout elapses — confirm.
it('supports long-poll, not finding an operation.', async () => {
  const { cursor, operations } = await client.scanOperations({})
  expect(cursor).toEqual('')
  expect(operations).toEqual([])
})
})
})
/** Returns every row of the `operation` table, ordered by ascending `id`. */
const dumpOps = async (db: Database) => {
  const allRowsById = db.db
    .selectFrom('operation')
    .selectAll()
    .orderBy('id', 'asc')
  return allRowsById.execute()
}
/** Deletes every row from the `operation` table. */
const clearOps = async (db: Database) => {
  const deleteAllRows = db.db.deleteFrom('operation')
  await deleteAllRows.execute()
}