PDS: ActorStore Refactor (#3492)

* Similification of the `pds` package by:
- Removing `DetailedAccountStore` class (moving its logic inside `AccountManager`)
- Factorizes image URL building into its own class (for easy re-use from `AccountManager`)
- Adds an `AppView` class that exposes an `agent: AtpAgent` and url builder function (used by the `ImageUrlBuilder`).
- Reworks the `ActorStore` to avoid circular dependency between `AccountManager` and `LocalViewerCreator` (needed because of first item)

* tidy

* move classes in their own file
This commit is contained in:
Matthieu Sieben 2025-02-05 13:37:16 +01:00 committed by GitHub
parent 8a30e0ed92
commit 53a577fd4b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
37 changed files with 522 additions and 530 deletions

View File

@ -0,0 +1,5 @@
---
"@atproto/pds": patch
---
Code refactor

View File

@ -1,5 +1,6 @@
import { HOUR, wait } from '@atproto/common'
import {
Account,
AccountInfo,
AccountStore,
Code,
@ -20,12 +21,15 @@ import {
UpdateRequestData,
} from '@atproto/oauth-provider'
import { AuthRequiredError } from '@atproto/xrpc-server'
import { Selectable } from 'kysely'
import { CID } from 'multiformats/cid'
import { KeyObject } from 'node:crypto'
import { ActorStore } from '../actor-store/actor-store'
import { AuthScope } from '../auth-verifier'
import { BackgroundQueue } from '../background'
import { softDeleted } from '../db'
import { ImageUrlBuilder } from '../image/image-url-builder'
import { StatusAttr } from '../lexicon/types/com/atproto/admin/defs'
import { AccountDb, EmailTokenPurpose, getDb, getMigrator } from './db'
import * as account from './helpers/account'
@ -50,6 +54,8 @@ export class AccountManager
db: AccountDb
constructor(
private actorStore: ActorStore,
private imageUrlBuilder: ImageUrlBuilder,
private backgroundQueue: BackgroundQueue,
dbLocation: string,
private jwtKey: KeyObject,
@ -500,6 +506,29 @@ export class AccountManager
// AccountStore
private async buildAccount(row: Selectable<ActorAccount>): Promise<Account> {
const account = deviceAccount.toAccount(row, this.serviceDid)
if (!account.name || !account.picture) {
const did = account.sub
const profile = await this.actorStore.read(did, async (store) => {
return store.record.getProfileRecord()
})
if (profile) {
const { avatar, displayName } = profile
account.name ||= displayName
account.picture ||= avatar
? this.imageUrlBuilder.build('avatar', did, avatar.ref.toString())
: undefined
}
}
return account
}
async authenticateAccount(
{ username: identifier, password, remember = false }: SignInCredentials,
deviceId: DeviceId,
@ -554,7 +583,7 @@ export class AccountManager
if (!row) return null
return {
account: deviceAccount.toAccount(row, this.serviceDid),
account: await this.buildAccount(row),
info: deviceAccount.toDeviceAccountInfo(row),
}
}
@ -564,10 +593,12 @@ export class AccountManager
.listRememberedQB(this.db, deviceId)
.execute()
return rows.map((row) => ({
account: deviceAccount.toAccount(row, this.serviceDid),
info: deviceAccount.toDeviceAccountInfo(row),
}))
return Promise.all(
rows.map(async (row) => ({
account: await this.buildAccount(row),
info: deviceAccount.toDeviceAccountInfo(row),
})),
)
}
async removeDeviceAccount(deviceId: DeviceId, sub: string): Promise<void> {

View File

@ -0,0 +1,45 @@
import { Keypair } from '@atproto/crypto'
import { ActorStoreResources } from './actor-store-resources'
import { ActorDb } from './db'
import { PreferenceReader } from './preference/reader'
import { RecordReader } from './record/reader'
import { RepoReader } from './repo/reader'
import { ActorStoreTransactor } from './actor-store-transactor'
export class ActorStoreReader {
public readonly repo: RepoReader
public readonly record: RecordReader
public readonly pref: PreferenceReader
constructor(
public readonly did: string,
protected readonly db: ActorDb,
protected readonly resources: ActorStoreResources,
public readonly keypair: () => Promise<Keypair>,
) {
const blobstore = resources.blobstore(did)
this.repo = new RepoReader(db, blobstore)
this.record = new RecordReader(db)
this.pref = new PreferenceReader(db)
// Invoke "keypair" once. Also avoids leaking "this" as keypair context.
let keypairPromise: Promise<Keypair>
this.keypair = () => (keypairPromise ??= Promise.resolve().then(keypair))
}
async transact<T>(
fn: (fn: ActorStoreTransactor) => T | PromiseLike<T>,
): Promise<T> {
const keypair = await this.keypair()
return this.db.transaction((dbTxn) => {
const store = new ActorStoreTransactor(
this.did,
dbTxn,
keypair,
this.resources,
)
return fn(store)
})
}
}

View File

@ -0,0 +1,8 @@
import { BlobStore } from '@atproto/repo'
import { BackgroundQueue } from '../background'
export type ActorStoreResources = {
blobstore: (did: string) => BlobStore
backgroundQueue: BackgroundQueue
reservedKeyDir?: string
}

View File

@ -0,0 +1,31 @@
import { Keypair } from '@atproto/crypto'
import { ActorStoreResources } from './actor-store-resources'
import { ActorDb } from './db'
import { PreferenceTransactor } from './preference/transactor'
import { RecordTransactor } from './record/transactor'
import { RepoTransactor } from './repo/transactor'
export class ActorStoreTransactor {
public readonly record: RecordTransactor
public readonly repo: RepoTransactor
public readonly pref: PreferenceTransactor
constructor(
public readonly did: string,
protected readonly db: ActorDb,
protected readonly keypair: Keypair,
protected readonly resources: ActorStoreResources,
) {
const blobstore = resources.blobstore(did)
this.record = new RecordTransactor(db, blobstore)
this.pref = new PreferenceTransactor(db)
this.repo = new RepoTransactor(
db,
blobstore,
did,
keypair,
resources.backgroundQueue,
)
}
}

View File

@ -0,0 +1,17 @@
import { ActorStoreTransactor } from './actor-store-transactor'
export class ActorStoreWriter extends ActorStoreTransactor {
async transact<T>(
fn: (fn: ActorStoreTransactor) => T | PromiseLike<T>,
): Promise<T> {
return this.db.transaction((dbTxn) => {
const transactor = new ActorStoreTransactor(
this.did,
dbTxn,
this.keypair,
this.resources,
)
return fn(transactor)
})
}
}

View File

@ -1,35 +1,23 @@
import path from 'path'
import assert from 'assert'
import fs from 'fs/promises'
import * as crypto from '@atproto/crypto'
import { Keypair, ExportableKeypair } from '@atproto/crypto'
import { BlobStore } from '@atproto/repo'
import {
chunkArray,
fileExists,
readIfExists,
rmIfExists,
} from '@atproto/common'
import { ActorDb, getDb, getMigrator } from './db'
import { BackgroundQueue } from '../background'
import { RecordReader } from './record/reader'
import { PreferenceReader } from './preference/reader'
import { RepoReader } from './repo/reader'
import { RepoTransactor } from './repo/transactor'
import { PreferenceTransactor } from './preference/transactor'
import * as crypto from '@atproto/crypto'
import { ExportableKeypair, Keypair } from '@atproto/crypto'
import { InvalidRequestError } from '@atproto/xrpc-server'
import { RecordTransactor } from './record/transactor'
import { CID } from 'multiformats/cid'
import DiskBlobStore from '../disk-blobstore'
import { mkdir } from 'fs/promises'
import assert from 'node:assert'
import fs, { mkdir } from 'node:fs/promises'
import path from 'node:path'
import { ActorStoreConfig } from '../config'
import { retrySqlite } from '../db'
type ActorStoreResources = {
blobstore: (did: string) => BlobStore
backgroundQueue: BackgroundQueue
reservedKeyDir?: string
}
import DiskBlobStore from '../disk-blobstore'
import { ActorStoreReader } from './actor-store-reader'
import { ActorStoreResources } from './actor-store-resources'
import { ActorStoreTransactor } from './actor-store-transactor'
import { ActorStoreWriter } from './actor-store-writer'
import { ActorDb, getDb, getMigrator } from './db'
export class ActorStore {
reservedKeyDir: string
@ -82,50 +70,39 @@ export class ActorStore {
return db
}
async read<T>(did: string, fn: ActorStoreReadFn<T>) {
async read<T>(did: string, fn: (fn: ActorStoreReader) => T | PromiseLike<T>) {
const db = await this.openDb(did)
try {
const reader = createActorReader(did, db, this.resources, () =>
this.keypair(did),
)
return await fn(reader)
const getKeypair = () => this.keypair(did)
return await fn(new ActorStoreReader(did, db, this.resources, getKeypair))
} finally {
db.close()
}
}
async transact<T>(did: string, fn: ActorStoreTransactFn<T>) {
async transact<T>(
did: string,
fn: (fn: ActorStoreTransactor) => T | PromiseLike<T>,
) {
const keypair = await this.keypair(did)
const db = await this.openDb(did)
try {
return await db.transaction((dbTxn) => {
const store = createActorTransactor(did, dbTxn, keypair, this.resources)
return fn(store)
return fn(new ActorStoreTransactor(did, dbTxn, keypair, this.resources))
})
} finally {
db.close()
}
}
async writeNoTransaction<T>(did: string, fn: ActorStoreWriterFn<T>) {
async writeNoTransaction<T>(
did: string,
fn: (fn: ActorStoreWriter) => T | PromiseLike<T>,
) {
const keypair = await this.keypair(did)
const db = await this.openDb(did)
try {
const writer = createActorTransactor(did, db, keypair, this.resources)
return await fn({
...writer,
transact: async <T>(fn: ActorStoreTransactFn<T>): Promise<T> => {
return db.transaction((dbTxn) => {
const transactor = createActorTransactor(
did,
dbTxn,
keypair,
this.resources,
)
return fn(transactor)
})
},
})
return await fn(new ActorStoreWriter(did, db, keypair, this.resources))
} finally {
db.close()
}
@ -157,10 +134,9 @@ export class ActorStore {
if (blobstore instanceof DiskBlobStore) {
await blobstore.deleteAll()
} else {
const blobRows = await this.read(did, (store) =>
store.db.db.selectFrom('blob').select('cid').execute(),
const cids = await this.read(did, async (store) =>
store.repo.blob.getBlobCids(),
)
const cids = blobRows.map((row) => CID.parse(row.cid))
await Promise.allSettled(
chunkArray(cids, 500).map((chunk) => blobstore.deleteMany(chunk)),
)
@ -226,73 +202,6 @@ const loadKey = async (loc: string): Promise<ExportableKeypair | undefined> => {
return crypto.Secp256k1Keypair.import(privKey, { exportable: true })
}
const createActorTransactor = (
did: string,
db: ActorDb,
keypair: Keypair,
resources: ActorStoreResources,
): ActorStoreTransactor => {
const { blobstore, backgroundQueue } = resources
const userBlobstore = blobstore(did)
return {
did,
db,
repo: new RepoTransactor(db, did, keypair, userBlobstore, backgroundQueue),
record: new RecordTransactor(db, userBlobstore),
pref: new PreferenceTransactor(db),
}
}
const createActorReader = (
did: string,
db: ActorDb,
resources: ActorStoreResources,
getKeypair: () => Promise<Keypair>,
): ActorStoreReader => {
const { blobstore } = resources
return {
did,
db,
repo: new RepoReader(db, blobstore(did)),
record: new RecordReader(db),
pref: new PreferenceReader(db),
keypair: getKeypair,
transact: async <T>(fn: ActorStoreTransactFn<T>): Promise<T> => {
const keypair = await getKeypair()
return db.transaction((dbTxn) => {
const store = createActorTransactor(did, dbTxn, keypair, resources)
return fn(store)
})
},
}
}
export type ActorStoreReadFn<T> = (fn: ActorStoreReader) => Promise<T>
export type ActorStoreTransactFn<T> = (fn: ActorStoreTransactor) => Promise<T>
export type ActorStoreWriterFn<T> = (fn: ActorStoreWriter) => Promise<T>
export type ActorStoreReader = {
did: string
db: ActorDb
repo: RepoReader
record: RecordReader
pref: PreferenceReader
keypair: () => Promise<Keypair>
transact: <T>(fn: ActorStoreTransactFn<T>) => Promise<T>
}
export type ActorStoreTransactor = {
did: string
db: ActorDb
repo: RepoTransactor
record: RecordTransactor
pref: PreferenceTransactor
}
export type ActorStoreWriter = ActorStoreTransactor & {
transact: <T>(fn: ActorStoreTransactFn<T>) => Promise<T>
}
function assertSafePathPart(part: string) {
const normalized = path.normalize(part)
assert(

View File

@ -1,10 +1,10 @@
import stream from 'stream'
import { CID } from 'multiformats/cid'
import { BlobNotFoundError, BlobStore } from '@atproto/repo'
import { InvalidRequestError } from '@atproto/xrpc-server'
import { ActorDb } from '../db'
import { CID } from 'multiformats/cid'
import stream from 'stream'
import { countAll, countDistinct, notSoftDeletedClause } from '../../db/util'
import { StatusAttr } from '../../lexicon/types/com/atproto/admin/defs'
import { ActorDb } from '../db'
export class BlobReader {
constructor(
@ -95,6 +95,16 @@ export class BlobReader {
return res.map((row) => row.recordUri)
}
async getBlobsForRecord(recordUri: string): Promise<string[]> {
const res = await this.db.db
.selectFrom('blob')
.innerJoin('record_blob', 'record_blob.blobCid', 'blob.cid')
.where('recordUri', '=', recordUri)
.select('blob.cid')
.execute()
return res.map((row) => row.cid)
}
async blobCount(): Promise<number> {
const res = await this.db.db
.selectFrom('blob')
@ -138,4 +148,9 @@ export class BlobReader {
recordUri: row.recordUri,
}))
}
async getBlobCids() {
const blobRows = await this.db.db.selectFrom('blob').select('cid').execute()
return blobRows.map((row) => CID.parse(row.cid))
}
}

View File

@ -38,6 +38,21 @@ export class BlobTransactor extends BlobReader {
super(db, blobstore)
}
async insertBlobs(recordUri: string, blobs: Iterable<BlobRef>) {
const values = Array.from(blobs, (cid) => ({
recordUri,
blobCid: cid.ref.toString(),
}))
if (values.length) {
await this.db.db
.insertInto('record_blob')
.values(values)
.onConflict((oc) => oc.doNothing())
.execute()
}
}
async uploadBlobAndGetMetadata(
userSuggestedMime: string,
blobStream: stream.Readable,

View File

@ -1,12 +1,21 @@
import { RepoRecord } from '@atproto/lexicon'
import { cborToLexRecord, CidSet, formatDataKey } from '@atproto/repo'
import * as syntax from '@atproto/syntax'
import { AtUri, ensureValidAtUri } from '@atproto/syntax'
import { cborToLexRecord } from '@atproto/repo'
import { CID } from 'multiformats/cid'
import { countAll, notSoftDeletedClause } from '../../db/util'
import { ids } from '../../lexicon/lexicons'
import { ActorDb, Backlink } from '../db'
import { Record as ProfileRecord } from '../../lexicon/types/app/bsky/actor/profile'
import { Record as PostRecord } from '../../lexicon/types/app/bsky/feed/post'
import { StatusAttr } from '../../lexicon/types/com/atproto/admin/defs'
import { RepoRecord } from '@atproto/lexicon'
import { LocalRecords } from '../../read-after-write/types'
import { ActorDb, Backlink } from '../db'
export type RecordDescript = {
uri: string
path: string
cid: CID
}
export class RecordReader {
constructor(public db: ActorDb) {}
@ -19,6 +28,30 @@ export class RecordReader {
return res?.count ?? 0
}
async listAll(): Promise<RecordDescript[]> {
const records: RecordDescript[] = []
let cursor: string | undefined = ''
while (cursor !== undefined) {
const res = await this.db.db
.selectFrom('record')
.select(['uri', 'cid'])
.where('uri', '>', cursor)
.orderBy('uri', 'asc')
.limit(1000)
.execute()
for (const row of res) {
const parsed = new AtUri(row.uri)
records.push({
uri: row.uri,
path: formatDataKey(parsed.collection, parsed.rkey),
cid: CID.parse(row.cid),
})
}
cursor = res.at(-1)?.uri
}
return records
}
async listCollections(): Promise<string[]> {
const collections = await this.db.db
.selectFrom('record')
@ -194,6 +227,94 @@ export class RecordReader {
.flat()
.map(({ rkey }) => AtUri.make(uri.hostname, uri.collection, rkey))
}
async listExistingBlocks(): Promise<CidSet> {
const cids = new CidSet()
let cursor: string | undefined = ''
while (cursor !== undefined) {
const res = await this.db.db
.selectFrom('repo_block')
.select('cid')
.where('cid', '>', cursor)
.orderBy('cid', 'asc')
.limit(1000)
.execute()
for (const row of res) {
cids.add(CID.parse(row.cid))
}
cursor = res.at(-1)?.cid
}
return cids
}
async getProfileRecord() {
const row = await this.db.db
.selectFrom('record')
.leftJoin('repo_block', 'repo_block.cid', 'record.cid')
.where('record.collection', '=', ids.AppBskyActorProfile)
.where('record.rkey', '=', 'self')
.selectAll()
.executeTakeFirst()
if (!row?.content) return null
return cborToLexRecord(row.content) as ProfileRecord
}
async getRecordsSinceRev(rev: string): Promise<LocalRecords> {
const result: LocalRecords = { count: 0, profile: null, posts: [] }
const res = await this.db.db
.selectFrom('record')
.innerJoin('repo_block', 'repo_block.cid', 'record.cid')
.select([
'repo_block.content',
'uri',
'repo_block.cid',
'record.indexedAt',
])
.where('record.repoRev', '>', rev)
.limit(10)
.orderBy('record.repoRev', 'asc')
.execute()
// sanity check to ensure that the clock received is not before _all_ local records (for instance in case of account migration)
if (res.length > 0) {
const sanityCheckRes = await this.db.db
.selectFrom('record')
.selectAll()
.where('record.repoRev', '<=', rev)
.limit(1)
.executeTakeFirst()
if (!sanityCheckRes) {
return result
}
}
for (const cur of res) {
result.count++
const uri = new AtUri(cur.uri)
if (uri.collection === ids.AppBskyActorProfile && uri.rkey === 'self') {
result.profile = {
uri,
cid: CID.parse(cur.cid),
indexedAt: cur.indexedAt,
record: cborToLexRecord(cur.content) as ProfileRecord,
}
} else if (uri.collection === ids.AppBskyFeedPost) {
result.posts.push({
uri,
cid: CID.parse(cur.cid),
indexedAt: cur.indexedAt,
record: cborToLexRecord(cur.content) as PostRecord,
})
}
}
return result
}
}
// @NOTE in the future this can be replaced with a more generic routine that pulls backlinks based on lex docs.

View File

@ -1,8 +1,10 @@
import { BlobStore } from '@atproto/repo'
import { SqlRepoReader } from './sql-repo-reader'
import { CID } from 'multiformats/cid'
import { BlobReader } from '../blob/reader'
import { ActorDb } from '../db'
import { RecordReader } from '../record/reader'
import { SqlRepoReader } from './sql-repo-reader'
export class RepoReader {
blob: BlobReader

View File

@ -19,13 +19,14 @@ export class SqlRepoReader extends ReadableBlockstore {
async getRoot(): Promise<CID> {
const root = await this.getRootDetailed()
return root?.cid ?? null
return root.cid
}
async getRootDetailed(): Promise<{ cid: CID; rev: string }> {
const res = await this.db.db
.selectFrom('repo_root')
.selectAll()
.select(['cid', 'rev'])
.limit(1)
.executeTakeFirstOrThrow()
return {
cid: CID.parse(res.cid),

View File

@ -21,21 +21,28 @@ export class RepoTransactor extends RepoReader {
blob: BlobTransactor
record: RecordTransactor
storage: SqlRepoTransactor
now: string
constructor(
public db: ActorDb,
public blobstore: BlobStore,
public did: string,
public signingKey: crypto.Keypair,
public blobstore: BlobStore,
public backgroundQueue: BackgroundQueue,
now?: string,
public now: string = new Date().toISOString(),
) {
super(db, blobstore)
this.blob = new BlobTransactor(db, blobstore, backgroundQueue)
this.record = new RecordTransactor(db, blobstore)
this.now = now ?? new Date().toISOString()
this.storage = new SqlRepoTransactor(db, this.did, this.now)
this.storage = new SqlRepoTransactor(db, did, now)
}
async maybeLoadRepo(): Promise<Repo | null> {
const res = await this.db.db
.selectFrom('repo_root')
.select('cid')
.limit(1)
.executeTakeFirst()
return res ? Repo.load(this.storage, CID.parse(res.cid)) : null
}
async createRepo(writes: PreparedCreate[]): Promise<CommitData> {

View File

@ -3,7 +3,8 @@ import AppContext from '../../../../context'
import { AuthScope } from '../../../../auth-verifier'
export default function (server: Server, ctx: AppContext) {
if (!ctx.cfg.bskyAppView) return
if (!ctx.bskyAppView) return
server.app.bsky.actor.getPreferences({
auth: ctx.authVerifier.accessStandard({
additional: [AuthScope.Takendown],

View File

@ -8,8 +8,8 @@ import {
} from '../../../../read-after-write'
export default function (server: Server, ctx: AppContext) {
const { bskyAppView } = ctx.cfg
if (!bskyAppView) return
if (!ctx.bskyAppView) return
server.app.bsky.actor.getProfile({
auth: ctx.authVerifier.accessStandard(),
handler: async (reqCtx) => {

View File

@ -8,8 +8,8 @@ import {
} from '../../../../read-after-write'
export default function (server: Server, ctx: AppContext) {
const { bskyAppView } = ctx.cfg
if (!bskyAppView) return
if (!ctx.bskyAppView) return
server.app.bsky.actor.getProfiles({
auth: ctx.authVerifier.accessStandard(),
handler: async (reqCtx) => {

View File

@ -4,7 +4,8 @@ import AppContext from '../../../../context'
import { AccountPreference } from '../../../../actor-store/preference/reader'
export default function (server: Server, ctx: AppContext) {
if (!ctx.cfg.bskyAppView) return
if (!ctx.bskyAppView) return
server.app.bsky.actor.putPreferences({
auth: ctx.authVerifier.accessStandard({ checkTakedown: true }),
handler: async ({ auth, input }) => {

View File

@ -8,8 +8,8 @@ import {
} from '../../../../read-after-write'
export default function (server: Server, ctx: AppContext) {
const { bskyAppView } = ctx.cfg
if (!bskyAppView) return
if (!ctx.bskyAppView) return
server.app.bsky.feed.getActorLikes({
auth: ctx.authVerifier.accessStandard(),
handler: async (reqCtx) => {

View File

@ -9,8 +9,8 @@ import {
} from '../../../../read-after-write'
export default function (server: Server, ctx: AppContext) {
const { bskyAppView } = ctx.cfg
if (!bskyAppView) return
if (!ctx.bskyAppView) return
server.app.bsky.feed.getAuthorFeed({
auth: ctx.authVerifier.accessStandard(),
handler: async (reqCtx) => {

View File

@ -6,16 +6,16 @@ import { AtUri } from '@atproto/syntax'
import { InvalidRequestError } from '@atproto/oauth-provider'
export default function (server: Server, ctx: AppContext) {
const { appViewAgent } = ctx
const { bskyAppView } = ctx.cfg
if (!appViewAgent || !bskyAppView) return
const { bskyAppView } = ctx
if (!bskyAppView) return
server.app.bsky.feed.getFeed({
auth: ctx.authVerifier.accessStandard(),
handler: async ({ params, auth, req }) => {
const requester = auth.credentials.did
const feedUrl = new AtUri(params.feed)
const { data } = await appViewAgent.com.atproto.repo.getRecord({
const { data } = await bskyAppView.agent.com.atproto.repo.getRecord({
repo: feedUrl.hostname,
collection: feedUrl.collection,
rkey: feedUrl.rkey,

View File

@ -25,8 +25,8 @@ import {
import { ids } from '../../../../lexicon/lexicons'
export default function (server: Server, ctx: AppContext) {
const { bskyAppView } = ctx.cfg
if (!bskyAppView) return
if (!ctx.bskyAppView) return
server.app.bsky.feed.getPostThread({
auth: ctx.authVerifier.accessStandard(),
handler: async (reqCtx) => {
@ -191,11 +191,12 @@ const readAfterWriteNotFound = async (
const highestParent = getHighestParent(thread)
if (highestParent) {
try {
assert(ctx.appViewAgent)
const parentsRes = await ctx.appViewAgent.api.app.bsky.feed.getPostThread(
{ uri: highestParent, parentHeight: params.parentHeight, depth: 0 },
await ctx.appviewAuthHeaders(requester, ids.AppBskyFeedGetPostThread),
)
assert(ctx.bskyAppView)
const parentsRes =
await ctx.bskyAppView.agent.app.bsky.feed.getPostThread(
{ uri: highestParent, parentHeight: params.parentHeight, depth: 0 },
await ctx.appviewAuthHeaders(requester, ids.AppBskyFeedGetPostThread),
)
thread.parent = parentsRes.data.thread
} catch (err) {
// do nothing

View File

@ -8,8 +8,8 @@ import {
} from '../../../../read-after-write'
export default function (server: Server, ctx: AppContext) {
const { bskyAppView } = ctx.cfg
if (!bskyAppView) return
if (!ctx.bskyAppView) return
server.app.bsky.feed.getTimeline({
auth: ctx.authVerifier.accessStandard(),
handler: async (reqCtx) => {

View File

@ -8,8 +8,9 @@ import { AuthScope } from '../../../../auth-verifier'
import { ids } from '../../../../lexicon/lexicons'
export default function (server: Server, ctx: AppContext) {
const { appViewAgent } = ctx
if (!appViewAgent) return
const { bskyAppView } = ctx
if (!bskyAppView) return
server.app.bsky.notification.registerPush({
auth: ctx.authVerifier.accessStandard({
additional: [AuthScope.SignupQueued],
@ -26,8 +27,8 @@ export default function (server: Server, ctx: AppContext) {
ids.AppBskyNotificationRegisterPush,
)
if (ctx.cfg.bskyAppView?.did === serviceDid) {
await appViewAgent.api.app.bsky.notification.registerPush(input.body, {
if (bskyAppView.did === serviceDid) {
await bskyAppView.agent.app.bsky.notification.registerPush(input.body, {
...authHeaders,
encoding: 'application/json',
})

View File

@ -1,4 +1,3 @@
import { AtpAgent } from '@atproto/api'
import { InvalidRequestError } from '@atproto/xrpc-server'
import * as ident from '@atproto/syntax'
import { Server } from '../../../../lexicon'
@ -33,8 +32,16 @@ export default function (server: Server, ctx: AppContext) {
}
// this is not someone on our server, but we help with resolving anyway
if (!did && ctx.appViewAgent) {
did = await tryResolveFromAppView(ctx.appViewAgent, handle)
if (!did && ctx.bskyAppView) {
try {
const result =
await ctx.bskyAppView.agent.com.atproto.identity.resolveHandle({
handle,
})
did = result.data.did
} catch {
// Ignore
}
}
if (!did) {
@ -51,14 +58,3 @@ export default function (server: Server, ctx: AppContext) {
}
})
}
async function tryResolveFromAppView(agent: AtpAgent, handle: string) {
try {
const result = await agent.api.com.atproto.identity.resolveHandle({
handle,
})
return result.data.did
} catch (_err) {
return
}
}

View File

@ -1,9 +1,8 @@
import { Server } from '../../../../lexicon'
import AppContext from '../../../../context'
import { ActorStoreTransactor } from '../../../../actor-store'
import { ActorStoreTransactor } from '../../../../actor-store/actor-store-transactor'
import { TID } from '@atproto/common'
import {
Repo,
WriteOpAction,
getAndParseRecord,
readCarStream,
@ -44,13 +43,7 @@ const importRepo = async (
if (roots.length !== 1) {
throw new InvalidRequestError('expected one root')
}
const currRoot = await actorStore.db.db
.selectFrom('repo_root')
.selectAll()
.executeTakeFirst()
const currRepo = currRoot
? await Repo.load(actorStore.repo.storage, CID.parse(currRoot.cid))
: null
const currRepo = await actorStore.repo.maybeLoadRepo()
const diff = await verifyDiff(
currRepo,
blocks,
@ -89,18 +82,10 @@ const importRepo = async (
now,
)
const recordBlobs = findBlobRefs(parsedRecord)
const blobValues = recordBlobs.map((cid) => ({
recordUri: uri.toString(),
blobCid: cid.ref.toString(),
}))
const indexRecordBlobs =
blobValues.length > 0
? actorStore.db.db
.insertInto('record_blob')
.values(blobValues)
.onConflict((oc) => oc.doNothing())
.execute()
: Promise.resolve()
const indexRecordBlobs = actorStore.repo.blob.insertBlobs(
uri.toString(),
recordBlobs,
)
await Promise.all([indexRecord, indexRecordBlobs])
}
},

View File

@ -1,21 +1,22 @@
import { CID } from 'multiformats/cid'
import { BlobRef } from '@atproto/lexicon'
import { CommitData } from '@atproto/repo'
import { AtUri } from '@atproto/syntax'
import { AuthRequiredError, InvalidRequestError } from '@atproto/xrpc-server'
import { CommitData } from '@atproto/repo'
import { BlobRef } from '@atproto/lexicon'
import { Server } from '../../../../lexicon'
import { prepareUpdate, prepareCreate } from '../../../../repo'
import { CID } from 'multiformats/cid'
import { ActorStoreTransactor } from '../../../../actor-store/actor-store-transactor'
import AppContext from '../../../../context'
import { Server } from '../../../../lexicon'
import { ids } from '../../../../lexicon/lexicons'
import { Record as ProfileRecord } from '../../../../lexicon/types/app/bsky/actor/profile'
import {
BadCommitSwapError,
BadRecordSwapError,
InvalidRecordError,
prepareCreate,
PreparedCreate,
PreparedUpdate,
prepareUpdate,
} from '../../../../repo'
import { ids } from '../../../../lexicon/lexicons'
import { Record as ProfileRecord } from '../../../../lexicon/types/app/bsky/actor/profile'
import { ActorStoreTransactor } from '../../../../actor-store'
export default function (server: Server, ctx: AppContext) {
server.com.atproto.repo.putRecord({

View File

@ -0,0 +1,26 @@
import { format } from 'node:util'
import { AtpAgent } from '@atproto/api'
export type AppViewOptions = {
url: string
did: string
cdnUrlPattern?: string
}
export class BskyAppView {
public did: string
public url: string
public agent: AtpAgent
private cdnUrlPattern?: string
constructor(options: AppViewOptions) {
this.did = options.did
this.url = options.url
this.agent = new AtpAgent({ service: options.url })
this.cdnUrlPattern = options.cdnUrlPattern
}
getImageUrl(pattern: string, did: string, cid: string): string | undefined {
if (this.cdnUrlPattern) return format(this.cdnUrlPattern, pattern, did, cid)
}
}

View File

@ -41,8 +41,10 @@ import { DidSqliteCache } from './did-cache'
import { Crawlers } from './crawlers'
import { DiskBlobStore } from './disk-blobstore'
import { getRedisClient } from './redis'
import { ActorStore } from './actor-store'
import { ActorStore } from './actor-store/actor-store'
import { LocalViewer, LocalViewerCreator } from './read-after-write/viewer'
import { BskyAppView } from './bsky-app-view'
import { ImageUrlBuilder } from './image/image-url-builder'
export type AppContextOptions = {
actorStore: ActorStore
@ -59,7 +61,7 @@ export type AppContextOptions = {
redisScratch?: Redis
ratelimitCreator?: RateLimiterCreator
crawlers: Crawlers
appViewAgent?: AtpAgent
bskyAppView?: BskyAppView
moderationAgent?: AtpAgent
reportingAgent?: AtpAgent
entrywayAgent?: AtpAgent
@ -86,7 +88,7 @@ export class AppContext {
public redisScratch?: Redis
public ratelimitCreator?: RateLimiterCreator
public crawlers: Crawlers
public appViewAgent: AtpAgent | undefined
public bskyAppView?: BskyAppView
public moderationAgent: AtpAgent | undefined
public reportingAgent: AtpAgent | undefined
public entrywayAgent: AtpAgent | undefined
@ -112,7 +114,7 @@ export class AppContext {
this.redisScratch = opts.redisScratch
this.ratelimitCreator = opts.ratelimitCreator
this.crawlers = opts.crawlers
this.appViewAgent = opts.appViewAgent
this.bskyAppView = opts.bskyAppView
this.moderationAgent = opts.moderationAgent
this.reportingAgent = opts.reportingAgent
this.entrywayAgent = opts.entrywayAgent
@ -214,9 +216,10 @@ export class AppContext {
}
}
const appViewAgent = cfg.bskyAppView
? new AtpAgent({ service: cfg.bskyAppView.url })
const bskyAppView = cfg.bskyAppView
? new BskyAppView(cfg.bskyAppView)
: undefined
const moderationAgent = cfg.modService
? new AtpAgent({ service: cfg.modService.url })
: undefined
@ -232,7 +235,19 @@ export class AppContext {
? createPublicKeyObject(cfg.entryway.jwtPublicKeyHex)
: null
const imageUrlBuilder = new ImageUrlBuilder(
cfg.service.hostname,
bskyAppView,
)
const actorStore = new ActorStore(cfg.actorStore, {
blobstore,
backgroundQueue,
})
const accountManager = new AccountManager(
actorStore,
imageUrlBuilder,
backgroundQueue,
cfg.db.accountDbLoc,
jwtSecretKey,
@ -250,18 +265,11 @@ export class AppContext {
secrets.plcRotationKey.privateKeyHex,
)
const actorStore = new ActorStore(cfg.actorStore, {
blobstore,
backgroundQueue,
})
const localViewer = LocalViewer.creator({
const localViewer = LocalViewer.creator(
accountManager,
appViewAgent,
pdsHostname: cfg.service.hostname,
appviewDid: cfg.bskyAppView?.did,
appviewCdnUrlPattern: cfg.bskyAppView?.cdnUrlPattern,
})
imageUrlBuilder,
bskyAppView,
)
// An agent for performing HTTP requests based on user provided URLs.
const proxyAgentBase = new undici.Agent({
@ -322,8 +330,6 @@ export class AppContext {
await JoseKey.fromKeyLike(jwtSecretKey, undefined, 'HS256'),
],
accountManager,
actorStore,
localViewer,
redis: redisScratch,
dpopSecret: secrets.dpopSecret,
customization: cfg.oauth.provider.customization,
@ -371,7 +377,7 @@ export class AppContext {
redisScratch,
ratelimitCreator,
crawlers,
appViewAgent,
bskyAppView,
moderationAgent,
reportingAgent,
entrywayAgent,
@ -386,8 +392,8 @@ export class AppContext {
}
async appviewAuthHeaders(did: string, lxm: string) {
assert(this.cfg.bskyAppView)
return this.serviceAuthHeaders(did, this.cfg.bskyAppView.did, lxm)
assert(this.bskyAppView)
return this.serviceAuthHeaders(did, this.bskyAppView.did, lxm)
}
async serviceAuthHeaders(did: string, aud: string, lxm: string) {

View File

@ -51,7 +51,7 @@ export class Database<Schema> {
}
async transactionNoRetry<T>(
fn: (db: Database<Schema>) => T | Promise<T>,
fn: (db: Database<Schema>) => T | PromiseLike<T>,
): Promise<T> {
this.assertNotTransaction()
const leakyTxPlugin = new LeakyTxPlugin()
@ -77,7 +77,7 @@ export class Database<Schema> {
}
async transaction<T>(
fn: (db: Database<Schema>) => T | Promise<T>,
fn: (db: Database<Schema>) => T | PromiseLike<T>,
): Promise<T> {
return retrySqlite(() => this.transactionNoRetry(fn))
}

View File

@ -0,0 +1,16 @@
import { BskyAppView } from '../bsky-app-view'
import { ids } from '../lexicon/lexicons'
export class ImageUrlBuilder {
constructor(
readonly pdsHostname: string,
readonly bskyAppView?: BskyAppView,
) {}
build(pattern: string, did: string, cid: string): string {
return (
this.bskyAppView?.getImageUrl(pattern, did, cid) ??
`https://${this.pdsHostname}/xrpc/${ids.ComAtprotoSyncGetBlob}?did=${did}&cid=${cid}`
)
}
}

View File

@ -1,96 +0,0 @@
import {
AccountInfo,
AccountStore,
DeviceId,
SignInCredentials,
} from '@atproto/oauth-provider'
import { AccountManager } from '../account-manager/index'
import { ActorStore } from '../actor-store/index'
import { ProfileViewBasic } from '../lexicon/types/app/bsky/actor/defs'
import { LocalViewerCreator } from '../read-after-write/index'
/**
* Although the {@link AccountManager} class implements the {@link AccountStore}
* interface, the accounts it returns do not contain any profile information
* (display name, avatar, etc). This is due to the fact that the account manager
* does not have access to the account's repos. The {@link DetailedAccountStore}
* is a wrapper around the {@link AccountManager} that enriches the accounts
* with profile information using the account's repos through the
* {@link ActorStore}.
*/
export class DetailedAccountStore implements AccountStore {
constructor(
private accountManager: AccountManager,
private actorStore: ActorStore,
private localViewer: LocalViewerCreator,
) {}
private async getProfile(did: string): Promise<ProfileViewBasic | null> {
// TODO: Should we cache this?
return this.actorStore.read(did, async (actorStoreReader) => {
const localViewer = this.localViewer(actorStoreReader)
return localViewer.getProfileBasic()
})
}
private async enrichAccountInfo(
accountInfo: AccountInfo,
): Promise<AccountInfo> {
const { account } = accountInfo
if (!account.picture || !account.name) {
const profile = await this.getProfile(account.sub)
if (profile) {
account.picture ||= profile.avatar
account.name ||= profile.displayName
}
}
return accountInfo
}
async authenticateAccount(
credentials: SignInCredentials,
deviceId: DeviceId,
): Promise<AccountInfo | null> {
const accountInfo = await this.accountManager.authenticateAccount(
credentials,
deviceId,
)
if (!accountInfo) return null
return this.enrichAccountInfo(accountInfo)
}
async addAuthorizedClient(
deviceId: DeviceId,
sub: string,
clientId: string,
): Promise<void> {
return this.accountManager.addAuthorizedClient(deviceId, sub, clientId)
}
async getDeviceAccount(
deviceId: DeviceId,
sub: string,
): Promise<AccountInfo | null> {
const accountInfo = await this.accountManager.getDeviceAccount(
deviceId,
sub,
)
if (!accountInfo) return null
return this.enrichAccountInfo(accountInfo)
}
async listDeviceAccounts(deviceId: DeviceId): Promise<AccountInfo[]> {
const accountInfos = await this.accountManager.listDeviceAccounts(deviceId)
return Promise.all(
accountInfos.map(async (accountInfo) =>
this.enrichAccountInfo(accountInfo),
),
)
}
async removeDeviceAccount(deviceId: DeviceId, sub: string): Promise<void> {
return this.accountManager.removeDeviceAccount(deviceId, sub)
}
}

View File

@ -5,15 +5,10 @@ import {
} from '@atproto/oauth-provider'
import { AccountManager } from '../account-manager/index'
import { ActorStore } from '../actor-store/index'
import { oauthLogger } from '../logger'
import { LocalViewerCreator } from '../read-after-write/index'
import { DetailedAccountStore } from './detailed-account-store'
export type AuthProviderOptions = {
accountManager: AccountManager
actorStore: ActorStore
localViewer: LocalViewerCreator
} & Pick<
OAuthProviderOptions,
'issuer' | 'redis' | 'keyset' | 'dpopSecret' | 'customization'
@ -23,8 +18,6 @@ export type AuthProviderOptions = {
export class PdsOAuthProvider extends OAuthProvider {
constructor({
accountManager,
actorStore,
localViewer,
keyset,
redis,
dpopSecret,
@ -39,6 +32,7 @@ export class PdsOAuthProvider extends OAuthProvider {
redis,
safeFetch,
customization,
store: accountManager,
metadata: {
// PdsOAuthProvider is used when the PDS is both an authorization server
// & resource server, in which case the issuer origin is also the
@ -48,15 +42,6 @@ export class PdsOAuthProvider extends OAuthProvider {
scopes_supported: ['transition:generic', 'transition:chat.bsky'],
},
accountStore: new DetailedAccountStore(
accountManager,
actorStore,
localViewer,
),
requestStore: accountManager,
deviceStore: accountManager,
tokenStore: accountManager,
// If the PDS is both an authorization server & resource server (no
// entryway), there is no need to use JWTs as access tokens. Instead,
// the PDS can use tokenId as access tokens. This allows the PDS to

View File

@ -16,7 +16,6 @@ import {
pipethrough,
} from '../pipethrough'
import { HandlerResponse, LocalRecords, MungeFn } from './types'
import { getRecordsSinceRev } from './viewer'
const REPO_REV_HEADER = 'atproto-repo-rev'
@ -62,7 +61,7 @@ export const pipethroughReadAfterWrite = async <T>(
const lxm = parseReqNsid(req)
return await ctx.actorStore.read(requester, async (store) => {
const local = await getRecordsSinceRev(store, rev)
const local = await store.record.getRecordsSinceRev(rev)
if (local.count === 0) return streamRes
const { buffer } = (bufferRes = await asPipeThroughBuffer(streamRes))

View File

@ -1,8 +1,4 @@
import util from 'util'
import { CID } from 'multiformats/cid'
import { AtUri, INVALID_HANDLE } from '@atproto/syntax'
import { cborToLexRecord } from '@atproto/repo'
import { AtpAgent } from '@atproto/api'
import { createServiceAuthHeaders } from '@atproto/xrpc-server'
import { Record as PostRecord } from '../lexicon/types/app/bsky/feed/post'
import { Record as ProfileRecord } from '../lexicon/types/app/bsky/actor/profile'
@ -36,98 +32,75 @@ import {
Main as EmbedRecordWithMedia,
isMain as isEmbedRecordWithMedia,
} from '../lexicon/types/app/bsky/embed/recordWithMedia'
import { ActorStoreReader } from '../actor-store'
import { ActorStoreReader } from '../actor-store/actor-store-reader'
import { LocalRecords, RecordDescript } from './types'
import { AccountManager } from '../account-manager'
import { BskyAppView } from '../bsky-app-view'
import { ImageUrlBuilder } from '../image/image-url-builder'
type CommonSignedUris = 'avatar' | 'banner' | 'feed_thumbnail' | 'feed_fullsize'
export type LocalViewerCreator = (actorStore: ActorStoreReader) => LocalViewer
export type LocalViewerCreator = (
actorStoreReader: ActorStoreReader,
) => LocalViewer
export class LocalViewer {
did: string
actorStore: ActorStoreReader
accountManager: AccountManager
pdsHostname: string
appViewAgent?: AtpAgent
appviewDid?: string
appviewCdnUrlPattern?: string
constructor(
public readonly actorStoreReader: ActorStoreReader,
public readonly accountManager: AccountManager,
public readonly imageUrlBuilder: ImageUrlBuilder,
public readonly bskyAppView?: BskyAppView,
) {}
constructor(params: {
actorStore: ActorStoreReader
accountManager: AccountManager
pdsHostname: string
appViewAgent?: AtpAgent
appviewDid?: string
appviewCdnUrlPattern?: string
}) {
this.did = params.actorStore.did
this.actorStore = params.actorStore
this.accountManager = params.accountManager
this.pdsHostname = params.pdsHostname
this.appViewAgent = params.appViewAgent
this.appviewDid = params.appviewDid
this.appviewCdnUrlPattern = params.appviewCdnUrlPattern
get did() {
return this.actorStoreReader.did
}
static creator(params: {
accountManager: AccountManager
pdsHostname: string
appViewAgent?: AtpAgent
appviewDid?: string
appviewCdnUrlPattern?: string
}): LocalViewerCreator {
return (actorStore) => {
return new LocalViewer({ ...params, actorStore })
}
static creator(
accountManager: AccountManager,
imageUrlBuilder: ImageUrlBuilder,
bskyAppView?: BskyAppView,
): LocalViewerCreator {
return (actorStore) =>
new LocalViewer(actorStore, accountManager, imageUrlBuilder, bskyAppView)
}
getImageUrl(pattern: CommonSignedUris, cid: string) {
if (!this.appviewCdnUrlPattern) {
return `https://${this.pdsHostname}/xrpc/${ids.ComAtprotoSyncGetBlob}?did=${this.did}&cid=${cid}`
}
return util.format(this.appviewCdnUrlPattern, pattern, this.did, cid)
return this.imageUrlBuilder.build(pattern, this.did, cid)
}
async serviceAuthHeaders(did: string, lxm: string) {
if (!this.appviewDid) {
if (!this.bskyAppView) {
throw new Error('Could not find bsky appview did')
}
const keypair = await this.actorStore.keypair()
const keypair = await this.actorStoreReader.keypair()
return createServiceAuthHeaders({
iss: did,
aud: this.appviewDid,
aud: this.bskyAppView.did,
lxm,
keypair,
})
}
async getRecordsSinceRev(rev: string): Promise<LocalRecords> {
return getRecordsSinceRev(this.actorStore, rev)
return this.actorStoreReader.record.getRecordsSinceRev(rev)
}
async getProfileBasic(): Promise<ProfileViewBasic | null> {
const profileQuery = this.actorStore.db.db
.selectFrom('record')
.leftJoin('repo_block', 'repo_block.cid', 'record.cid')
.where('record.collection', '=', ids.AppBskyActorProfile)
.where('record.rkey', '=', 'self')
.selectAll()
const [profileRes, accountRes] = await Promise.all([
profileQuery.executeTakeFirst(),
this.actorStoreReader.record.getProfileRecord(),
this.accountManager.getAccount(this.did),
])
if (!accountRes) return null
const record = profileRes?.content
? (cborToLexRecord(profileRes.content) as ProfileRecord)
: null
return {
did: this.did,
handle: accountRes.handle ?? INVALID_HANDLE,
displayName: record?.displayName,
avatar: record?.avatar
? this.getImageUrl('avatar', record.avatar.ref.toString())
displayName: profileRes?.displayName,
avatar: profileRes?.avatar
? this.getImageUrl('avatar', profileRes.avatar.ref.toString())
: undefined,
}
}
@ -239,12 +212,12 @@ export class LocalViewer {
private async formatRecordEmbedInternal(
embed: EmbedRecord,
): Promise<null | ViewRecord | GeneratorView | ListView> {
if (!this.appViewAgent || !this.appviewDid) {
if (!this.bskyAppView) {
return null
}
const collection = new AtUri(embed.record.uri).collection
if (collection === ids.AppBskyFeedPost) {
const res = await this.appViewAgent.api.app.bsky.feed.getPosts(
const res = await this.bskyAppView.agent.app.bsky.feed.getPosts(
{ uris: [embed.record.uri] },
await this.serviceAuthHeaders(this.did, ids.AppBskyFeedGetPosts),
)
@ -261,7 +234,7 @@ export class LocalViewer {
indexedAt: post.indexedAt,
}
} else if (collection === ids.AppBskyFeedGenerator) {
const res = await this.appViewAgent.api.app.bsky.feed.getFeedGenerator(
const res = await this.bskyAppView.agent.app.bsky.feed.getFeedGenerator(
{ feed: embed.record.uri },
await this.serviceAuthHeaders(
this.did,
@ -273,7 +246,7 @@ export class LocalViewer {
...res.data.view,
}
} else if (collection === ids.AppBskyGraphList) {
const res = await this.appViewAgent.api.app.bsky.graph.getList(
const res = await this.bskyAppView.agent.app.bsky.graph.getList(
{ list: embed.record.uri },
await this.serviceAuthHeaders(this.did, ids.AppBskyGraphGetList),
)
@ -330,50 +303,3 @@ export class LocalViewer {
}
}
}
export const getRecordsSinceRev = async (
actorStore: ActorStoreReader,
rev: string,
): Promise<LocalRecords> => {
const res = await actorStore.db.db
.selectFrom('record')
.innerJoin('repo_block', 'repo_block.cid', 'record.cid')
.select(['repo_block.content', 'uri', 'repo_block.cid', 'record.indexedAt'])
.where('record.repoRev', '>', rev)
.limit(10)
.orderBy('record.repoRev', 'asc')
.execute()
// sanity check to ensure that the clock received is not before _all_ local records (for instance in case of account migration)
if (res.length > 0) {
const sanityCheckRes = await actorStore.db.db
.selectFrom('record')
.selectAll()
.where('record.repoRev', '<=', rev)
.limit(1)
.executeTakeFirst()
if (!sanityCheckRes) {
return { count: 0, profile: null, posts: [] }
}
}
return res.reduce(
(acc, cur) => {
const descript = {
uri: new AtUri(cur.uri),
cid: CID.parse(cur.cid),
indexedAt: cur.indexedAt,
record: cborToLexRecord(cur.content),
}
if (
descript.uri.collection === ids.AppBskyActorProfile &&
descript.uri.rkey === 'self'
) {
acc.profile = descript as RecordDescript<ProfileRecord>
} else if (descript.uri.collection === ids.AppBskyFeedPost) {
acc.posts.push(descript as RecordDescript<PostRecord>)
}
acc.count++
return acc
},
{ count: 0, profile: null, posts: [] } as LocalRecords,
)
}

View File

@ -1,16 +1,12 @@
import readline from 'node:readline/promises'
import { CID } from 'multiformats/cid'
import { TID } from '@atproto/common'
import {
BlockMap,
CidSet,
MST,
MemoryBlockstore,
formatDataKey,
signCommit,
} from '@atproto/repo'
import { AtUri } from '@atproto/syntax'
import { TID } from '@atproto/common'
import { ActorStoreTransactor } from '../actor-store'
import readline from 'node:readline/promises'
import AppContext from '../context'
export const rebuildRepo = async (ctx: AppContext, args: string[]) => {
@ -23,8 +19,8 @@ export const rebuildRepo = async (ctx: AppContext, args: string[]) => {
const rev = TID.nextStr()
const commit = await ctx.actorStore.transact(did, async (store) => {
const [records, existingCids] = await Promise.all([
listAllRecords(store),
listExistingBlocks(store),
store.record.listAll(),
store.record.listExistingBlocks(),
])
let mst = await MST.create(memoryStore)
for (const record of records) {
@ -81,53 +77,6 @@ export const rebuildRepo = async (ctx: AppContext, args: string[]) => {
await ctx.sequencer.sequenceCommit(did, commit, [])
}
const listExistingBlocks = async (
store: ActorStoreTransactor,
): Promise<CidSet> => {
const cids = new CidSet()
let cursor: string | undefined = ''
while (cursor !== undefined) {
const res = await store.db.db
.selectFrom('repo_block')
.select('cid')
.where('cid', '>', cursor)
.orderBy('cid', 'asc')
.limit(1000)
.execute()
for (const row of res) {
cids.add(CID.parse(row.cid))
}
cursor = res.at(-1)?.cid
}
return cids
}
const listAllRecords = async (
store: ActorStoreTransactor,
): Promise<RecordDescript[]> => {
const records: RecordDescript[] = []
let cursor: string | undefined = ''
while (cursor !== undefined) {
const res = await store.db.db
.selectFrom('record')
.select(['uri', 'cid'])
.where('uri', '>', cursor)
.orderBy('uri', 'asc')
.limit(1000)
.execute()
for (const row of res) {
const parsed = new AtUri(row.uri)
records.push({
uri: row.uri,
path: formatDataKey(parsed.collection, parsed.rkey),
cid: CID.parse(row.cid),
})
}
cursor = res.at(-1)?.uri
}
return records
}
const promptContinue = async (): Promise<boolean> => {
const rl = readline.createInterface({
input: process.stdin,
@ -136,9 +85,3 @@ const promptContinue = async (): Promise<boolean> => {
const answer = await rl.question('Continue? y/n ')
return answer === ''
}
type RecordDescript = {
uri: string
path: string
cid: CID
}

View File

@ -40,9 +40,7 @@ describe('blob deletes', () => {
})
const getDbBlobsForDid = (did: string) => {
return ctx.actorStore.read(did, (store) =>
store.db.db.selectFrom('blob').selectAll().execute(),
)
return ctx.actorStore.read(did, (store) => store.repo.blob.getBlobCids())
}
it('deletes blob when record is deleted', async () => {
@ -79,7 +77,7 @@ describe('blob deletes', () => {
const dbBlobs = await getDbBlobsForDid(alice)
expect(dbBlobs.length).toBe(1)
expect(dbBlobs[0].cid).toEqual(img2.image.ref.toString())
expect(dbBlobs[0].toString()).toEqual(img2.image.ref.toString())
const hasImg = await ctx.blobstore(alice).hasStored(img.image.ref)
expect(hasImg).toBeFalsy()

View File

@ -1,4 +1,5 @@
import fs from 'fs/promises'
import assert from 'node:assert'
import fs from 'node:fs/promises'
import { AtUri } from '@atproto/syntax'
import { AtpAgent } from '@atproto/api'
import { BlobRef } from '@atproto/lexicon'
@ -828,22 +829,17 @@ describe('crud operations', () => {
const record = await ctx.actorStore.read(aliceAgent.accountDid, (store) =>
store.record.getRecord(new AtUri(res.data.uri), res.data.cid),
)
expect(record?.value).toMatchObject({
assert(record)
expect(record.value).toMatchObject({
$type: 'com.example.record',
blah: 'thing',
})
const recordBlobs = await ctx.actorStore.read(
aliceAgent.accountDid,
(store) =>
store.db.db
.selectFrom('blob')
.innerJoin('record_blob', 'record_blob.blobCid', 'blob.cid')
.where('recordUri', '=', res.data.uri)
.selectAll()
.execute(),
aliceAgent.assertDid,
(store) => store.repo.blob.getBlobsForRecord(record.uri),
)
expect(recordBlobs.length).toBe(1)
expect(recordBlobs.at(0)?.cid).toBe(uploadedRes.data.blob.ref.toString())
expect(recordBlobs.at(0)).toBe(uploadedRes.data.blob.ref.toString())
})
it('enforces record type constraint even when unvalidated', async () => {