From 489972c6dc6c02b6a31a53bab8309c0a55e09cd4 Mon Sep 17 00:00:00 2001 From: Codex Date: Wed, 18 Feb 2026 15:06:53 +0000 Subject: [PATCH] feat(convex): add full domain schema and snapshot sync bridge --- convex/domain.ts | 819 ++++++++++++++++++++++++++++++++++ convex/schema.ts | 121 +++++ convex/state.ts | 14 +- test/convex-functions.test.js | 18 + 4 files changed, 970 insertions(+), 2 deletions(-) create mode 100644 convex/domain.ts diff --git a/convex/domain.ts b/convex/domain.ts new file mode 100644 index 0000000..a03e1af --- /dev/null +++ b/convex/domain.ts @@ -0,0 +1,819 @@ +import { mutation, query } from "./_generated/server"; +import { v } from "convex/values"; + +const WalletTxType = v.union(v.literal("credit"), v.literal("debit"), v.literal("refund")); +const JobStatus = v.union( + v.literal("received"), + v.literal("validated"), + v.literal("priced"), + v.literal("charged"), + v.literal("synthesizing"), + v.literal("uploaded"), + v.literal("completed"), + v.literal("failed_refunded"), + v.literal("failed_not_refunded"), +); + +function nowIso() { + return new Date().toISOString(); +} + +async function findWalletByUser(ctx: any, userId: any) { + return ctx.db + .query("wallets") + .withIndex("by_user_id", (q: any) => q.eq("user_id", userId)) + .first(); +} + +export const upsertUser = mutation({ + args: { + x_user_id: v.optional(v.string()), + username: v.optional(v.string()), + email: v.optional(v.string()), + }, + handler: async (ctx, args) => { + let existing = null; + + if (args.x_user_id) { + existing = await ctx.db + .query("users") + .withIndex("by_x_user_id", (q) => q.eq("x_user_id", args.x_user_id!)) + .first(); + } + + if (!existing && args.email) { + existing = await ctx.db + .query("users") + .withIndex("by_email", (q) => q.eq("email", args.email!)) + .first(); + } + + if (!existing && args.username) { + existing = await ctx.db + .query("users") + .withIndex("by_username", (q) => q.eq("username", args.username!)) + .first(); + } + + const updated_at = nowIso(); + if (existing) { + await ctx.db.patch(existing._id, { + x_user_id: args.x_user_id ?? existing.x_user_id, + username: args.username ?? existing.username, + email: args.email ?? existing.email, + updated_at, + }); + return existing._id; + } + + return ctx.db.insert("users", { + x_user_id: args.x_user_id, + username: args.username, + email: args.email, + created_at: updated_at, + updated_at, + }); + }, +}); + +export const getWallet = query({ + args: { user_id: v.id("users") }, + handler: async (ctx, args) => { + const wallet = await ctx.db + .query("wallets") + .withIndex("by_user_id", (q) => q.eq("user_id", args.user_id)) + .first(); + + return wallet || { + user_id: args.user_id, + balance_credits: 0, + updated_at: null, + }; + }, +}); + +export const applyWalletTransaction = mutation({ + args: { + user_id: v.id("users"), + type: WalletTxType, + amount: v.number(), + reason: v.optional(v.string()), + idempotency_key: v.string(), + }, + handler: async (ctx, args) => { + if (!Number.isInteger(args.amount) || args.amount <= 0) { + throw new Error("invalid_amount"); + } + + const existing = await ctx.db + .query("wallet_transactions") + .withIndex("by_idempotency_key", (q) => q.eq("idempotency_key", args.idempotency_key)) + .first(); + + if (existing) { + return { + deduped: true, + transaction: existing, + }; + } + + const wallet = await findWalletByUser(ctx, args.user_id); + const currentBalance = wallet ? wallet.balance_credits : 0; + + if (args.type === "debit" && currentBalance < args.amount) { + throw new Error("insufficient_credits"); + } + + const delta = args.type === "debit" ? -args.amount : args.amount; + const balance_after = currentBalance + delta; + const updated_at = nowIso(); + + if (wallet) { + await ctx.db.patch(wallet._id, { + balance_credits: balance_after, + updated_at, + }); + } else { + await ctx.db.insert("wallets", { + user_id: args.user_id, + balance_credits: balance_after, + updated_at, + }); + } + + const txId = await ctx.db.insert("wallet_transactions", { + user_id: args.user_id, + type: args.type, + amount: args.amount, + reason: args.reason, + idempotency_key: args.idempotency_key, + balance_after, + created_at: updated_at, + }); + + const tx = await ctx.db.get(txId); + return { + deduped: false, + transaction: tx, + }; + }, +}); + +export const recordMentionEvent = mutation({ + args: { + mention_post_id: v.string(), + mention_author_id: v.string(), + parent_post_id: v.string(), + status: v.string(), + error_code: v.optional(v.string()), + }, + handler: async (ctx, args) => { + const existing = await ctx.db + .query("mention_events") + .withIndex("by_mention_post_id", (q) => q.eq("mention_post_id", args.mention_post_id)) + .first(); + + const updated_at = nowIso(); + if (existing) { + await ctx.db.patch(existing._id, { + mention_author_id: args.mention_author_id, + parent_post_id: args.parent_post_id, + status: args.status, + error_code: args.error_code, + updated_at, + }); + return { id: existing._id, deduped: true }; + } + + const id = await ctx.db.insert("mention_events", { + mention_post_id: args.mention_post_id, + mention_author_id: args.mention_author_id, + parent_post_id: args.parent_post_id, + status: args.status, + error_code: args.error_code, + created_at: updated_at, + updated_at, + }); + + return { id, deduped: false }; + }, +}); + +export const upsertArticle = mutation({ + args: { + x_article_id: v.optional(v.string()), + parent_post_id: v.string(), + author_id: v.optional(v.string()), + title: v.string(), + raw_content: v.optional(v.string()), + char_count: v.optional(v.number()), + content_hash: v.optional(v.string()), + }, + handler: async (ctx, args) => { + const normalizedCount = Number.isInteger(args.char_count) + ? args.char_count + : (args.raw_content ? args.raw_content.length : 0); + + if (!Number.isInteger(normalizedCount) || normalizedCount <= 0) { + throw new Error("invalid_char_count"); + } + + let existing = null; + if (args.x_article_id) { + existing = await ctx.db + .query("articles") + .withIndex("by_x_article_id", (q) => q.eq("x_article_id", args.x_article_id!)) + .first(); + } + + if (!existing) { + existing = await ctx.db + .query("articles") + .withIndex("by_parent_post_id", (q) => q.eq("parent_post_id", args.parent_post_id)) + .first(); + } + + const updated_at = nowIso(); + if (existing) { + await ctx.db.patch(existing._id, { + x_article_id: args.x_article_id ?? existing.x_article_id, + parent_post_id: args.parent_post_id, + author_id: args.author_id, + title: args.title, + raw_content: args.raw_content, + char_count: normalizedCount, + content_hash: args.content_hash, + updated_at, + }); + return existing._id; + } + + return ctx.db.insert("articles", { + x_article_id: args.x_article_id, + parent_post_id: args.parent_post_id, + author_id: args.author_id, + title: args.title, + raw_content: args.raw_content, + char_count: normalizedCount, + content_hash: args.content_hash, + created_at: updated_at, + updated_at, + }); + }, +}); + +export const createAudioJob = mutation({ + args: { + user_id: v.id("users"), + mention_event_id: v.optional(v.id("mention_events")), + article_id: v.id("articles"), + status: v.optional(JobStatus), + credits_charged: v.number(), + tts_provider: v.optional(v.string()), + tts_model: v.optional(v.string()), + error: v.optional(v.string()), + }, + handler: async (ctx, args) => { + if (!Number.isInteger(args.credits_charged) || args.credits_charged < 0) { + throw new Error("invalid_credits_charged"); + } + + const timestamp = nowIso(); + return ctx.db.insert("audio_jobs", { + user_id: args.user_id, + mention_event_id: args.mention_event_id, + article_id: args.article_id, + status: args.status || "received", + credits_charged: args.credits_charged, + tts_provider: args.tts_provider, + tts_model: args.tts_model, + error: args.error, + created_at: timestamp, + updated_at: timestamp, + }); + }, +}); + +export const updateAudioJob = mutation({ + args: { + job_id: v.id("audio_jobs"), + status: v.optional(JobStatus), + credits_charged: v.optional(v.number()), + tts_provider: v.optional(v.string()), + tts_model: v.optional(v.string()), + error: v.optional(v.string()), + asset_id: v.optional(v.id("audio_assets")), + }, + handler: async (ctx, args) => { + const job = await ctx.db.get(args.job_id); + if (!job) { + throw new Error("job_not_found"); + } + + await ctx.db.patch(args.job_id, { + status: args.status ?? job.status, + credits_charged: args.credits_charged ?? job.credits_charged, + tts_provider: args.tts_provider ?? job.tts_provider, + tts_model: args.tts_model ?? job.tts_model, + error: args.error, + asset_id: args.asset_id ?? job.asset_id, + updated_at: nowIso(), + }); + + return ctx.db.get(args.job_id); + }, +}); + +export const createAudioAsset = mutation({ + args: { + job_id: v.id("audio_jobs"), + storage_key: v.optional(v.string()), + duration_sec: v.optional(v.number()), + size_bytes: v.optional(v.number()), + codec: v.optional(v.string()), + public_url_ttl: v.optional(v.number()), + }, + handler: async (ctx, args) => { + const job = await ctx.db.get(args.job_id); + if (!job) { + throw new Error("job_not_found"); + } + + const timestamp = nowIso(); + const assetId = await ctx.db.insert("audio_assets", { + job_id: args.job_id, + storage_key: args.storage_key, + duration_sec: args.duration_sec, + size_bytes: args.size_bytes, + codec: args.codec, + public_url_ttl: args.public_url_ttl, + created_at: timestamp, + updated_at: timestamp, + }); + + await ctx.db.patch(args.job_id, { + asset_id: assetId, + updated_at: timestamp, + }); + + return assetId; + }, +}); + +export const updateAudioAsset = mutation({ + args: { + asset_id: v.id("audio_assets"), + storage_key: v.optional(v.string()), + duration_sec: v.optional(v.number()), + size_bytes: v.optional(v.number()), + codec: v.optional(v.string()), + public_url_ttl: v.optional(v.number()), + deleted_at: v.optional(v.string()), + last_played_at: v.optional(v.string()), + }, + handler: async (ctx, args) => { + const asset = await ctx.db.get(args.asset_id); + if (!asset) { + throw new Error("asset_not_found"); + } + + await ctx.db.patch(args.asset_id, { + storage_key: args.storage_key ?? asset.storage_key, + duration_sec: args.duration_sec ?? asset.duration_sec, + size_bytes: args.size_bytes ?? asset.size_bytes, + codec: args.codec ?? asset.codec, + public_url_ttl: args.public_url_ttl ?? asset.public_url_ttl, + deleted_at: args.deleted_at, + last_played_at: args.last_played_at, + updated_at: nowIso(), + }); + + return ctx.db.get(args.asset_id); + }, +}); + +export const grantAudioAccess = mutation({ + args: { + audio_asset_id: v.id("audio_assets"), + user_id: v.id("users"), + granted_via: v.union(v.literal("owner"), v.literal("repurchase"), v.literal("admin")), + credits_paid: v.number(), + }, + handler: async (ctx, args) => { + const existing = await ctx.db + .query("audio_access_grants") + .withIndex("by_asset_and_user", (q) => q.eq("audio_asset_id", args.audio_asset_id).eq("user_id", args.user_id)) + .first(); + + if (existing) { + return { id: existing._id, deduped: true }; + } + + const id = await ctx.db.insert("audio_access_grants", { + audio_asset_id: args.audio_asset_id, + user_id: args.user_id, + granted_via: args.granted_via, + credits_paid: args.credits_paid, + created_at: nowIso(), + }); + + return { id, deduped: false }; + }, +}); + +export const hasAudioAccess = query({ + args: { + audio_asset_id: v.id("audio_assets"), + user_id: v.optional(v.id("users")), + }, + handler: async (ctx, args) => { + const asset = await ctx.db.get(args.audio_asset_id); + if (!asset) { + return { allowed: false, reason: "not_found" }; + } + + const job = await ctx.db.get(asset.job_id); + if (!job) { + return { allowed: false, reason: "job_not_found" }; + } + + if (!args.user_id) { + return { allowed: false, reason: "auth_required", credits_required: job.credits_charged }; + } + + if (job.user_id === args.user_id) { + return { allowed: true, reason: "owner", credits_required: 0 }; + } + + const grant = await ctx.db + .query("audio_access_grants") + .withIndex("by_asset_and_user", (q) => q.eq("audio_asset_id", args.audio_asset_id).eq("user_id", args.user_id!)) + .first(); + + if (grant) { + return { allowed: true, reason: "grant", credits_required: 0 }; + } + + return { + allowed: false, + reason: "payment_required", + credits_required: job.credits_charged, + }; + }, +}); + +export const recordPaymentEvent = mutation({ + args: { + provider: v.string(), + provider_event_id: v.string(), + status: v.string(), + payload_hash: v.optional(v.string()), + }, + handler: async (ctx, args) => { + const existing = await ctx.db + .query("payment_events") + .withIndex("by_provider_and_event", (q) => q.eq("provider", args.provider).eq("provider_event_id", args.provider_event_id)) + .first(); + + const updated_at = nowIso(); + if (existing) { + await ctx.db.patch(existing._id, { + status: args.status, + payload_hash: args.payload_hash, + updated_at, + }); + + return { id: existing._id, deduped: true }; + } + + const id = await ctx.db.insert("payment_events", { + provider: args.provider, + provider_event_id: args.provider_event_id, + status: args.status, + payload_hash: args.payload_hash, + created_at: updated_at, + updated_at, + }); + + return { id, deduped: false }; + }, +}); + +export const getJob = query({ + args: { job_id: v.id("audio_jobs") }, + handler: async (ctx, args) => { + const job = await ctx.db.get(args.job_id); + if (!job) { + return null; + } + + const article = await ctx.db.get(job.article_id); + const asset = job.asset_id ? await ctx.db.get(job.asset_id) : null; + + return { + job, + article, + asset, + }; + }, +}); + +export const listJobsForUser = query({ + args: { user_id: v.id("users") }, + handler: async (ctx, args) => { + const jobs = await ctx.db + .query("audio_jobs") + .withIndex("by_user_id", (q) => q.eq("user_id", args.user_id)) + .collect(); + + jobs.sort((a, b) => (a.created_at > b.created_at ? -1 : 1)); + return jobs; + }, +}); + +async function clearTable(ctx: any, tableName: string) { + const rows = await (ctx.db as any).query(tableName).collect(); + for (const row of rows) { + await (ctx.db as any).delete(row._id); + } +} + +function normalizeStatus(rawStatus: string | undefined) { + const allowed = new Set([ + "received", + "validated", + "priced", + "charged", + "synthesizing", + "uploaded", + "completed", + "failed_refunded", + "failed_not_refunded", + ]); + + if (rawStatus && allowed.has(rawStatus)) { + return rawStatus; + } + + return "received"; +} + +export async function syncFromEngineSnapshot(ctx: any, rawSnapshot: any) { + const engine = rawSnapshot && typeof rawSnapshot === "object" && rawSnapshot.engine + ? rawSnapshot.engine + : rawSnapshot; + + if (!engine || typeof engine !== "object") { + return { imported: false, reason: "missing_engine_snapshot" }; + } + + const walletsState = engine.wallets || {}; + const balances = walletsState.balances && typeof walletsState.balances === "object" + ? walletsState.balances + : {}; + const walletTransactions = Array.isArray(walletsState.transactions) + ? walletsState.transactions + : []; + const jobsById = engine.jobs && typeof engine.jobs === "object" ? engine.jobs : {}; + const assetsById = engine.assets && typeof engine.assets === "object" ? engine.assets : {}; + const mentionsByPostId = engine.mentions && typeof engine.mentions === "object" ? engine.mentions : {}; + const grantsByAssetId = engine.access && engine.access.grants && typeof engine.access.grants === "object" + ? engine.access.grants + : {}; + + await clearTable(ctx, "audio_access_grants"); + await clearTable(ctx, "audio_assets"); + await clearTable(ctx, "audio_jobs"); + await clearTable(ctx, "articles"); + await clearTable(ctx, "mention_events"); + await clearTable(ctx, "wallet_transactions"); + await clearTable(ctx, "wallets"); + await clearTable(ctx, "payment_events"); + await clearTable(ctx, "users"); + + const userKeys = new Set(); + for (const key of Object.keys(balances)) { + userKeys.add(String(key)); + } + for (const tx of walletTransactions) { + if (tx && tx.userId) { + userKeys.add(String(tx.userId)); + } + } + for (const job of Object.values(jobsById)) { + if (job && (job as any).callerUserId) { + userKeys.add(String((job as any).callerUserId)); + } + } + for (const asset of Object.values(assetsById)) { + if (asset && (asset as any).ownerUserId) { + userKeys.add(String((asset as any).ownerUserId)); + } + } + for (const users of Object.values(grantsByAssetId)) { + if (!Array.isArray(users)) { + continue; + } + for (const user of users) { + if (user) { + userKeys.add(String(user)); + } + } + } + + const userIdByLegacy = new Map(); + for (const userKey of Array.from(userKeys.values()).sort()) { + const ts = nowIso(); + const userId = await (ctx.db as any).insert("users", { + username: userKey, + email: userKey.includes("@") ? userKey : undefined, + x_user_id: undefined, + created_at: ts, + updated_at: ts, + }); + userIdByLegacy.set(userKey, userId); + } + + for (const [legacyUser, userId] of userIdByLegacy.entries()) { + await (ctx.db as any).insert("wallets", { + user_id: userId, + balance_credits: Number.isInteger((balances as any)[legacyUser]) ? (balances as any)[legacyUser] : 0, + updated_at: nowIso(), + }); + } + + walletTransactions.sort((a: any, b: any) => { + const aTime = String(a && a.createdAt ? a.createdAt : ""); + const bTime = String(b && b.createdAt ? b.createdAt : ""); + return aTime.localeCompare(bTime); + }); + for (const tx of walletTransactions) { + if (!tx || !tx.userId || !userIdByLegacy.has(String(tx.userId))) { + continue; + } + await (ctx.db as any).insert("wallet_transactions", { + user_id: userIdByLegacy.get(String(tx.userId)), + type: tx.type, + amount: Number.isInteger(tx.amount) ? tx.amount : 0, + reason: tx.reason || undefined, + idempotency_key: String(tx.idempotencyKey || `legacy-tx:${tx.id || nowIso()}`), + balance_after: Number.isInteger(tx.balanceAfter) ? tx.balanceAfter : 0, + created_at: String(tx.createdAt || nowIso()), + }); + } + + const mentionIdByPost = new Map(); + for (const [mentionPostId, legacyJobId] of Object.entries(mentionsByPostId)) { + const job = (jobsById as any)[legacyJobId]; + const id = await (ctx.db as any).insert("mention_events", { + mention_post_id: mentionPostId, + mention_author_id: String((job && job.callerUserId) || "unknown"), + parent_post_id: String((job && job.article && job.article.parentPostId) || "unknown"), + status: normalizeStatus(job && job.status), + error_code: job && job.error ? String(job.error) : undefined, + created_at: String((job && job.createdAt) || nowIso()), + updated_at: String((job && job.updatedAt) || nowIso()), + }); + mentionIdByPost.set(mentionPostId, id); + } + + const articleIdByLegacyKey = new Map(); + const articleIdByJobId = new Map(); + for (const [legacyJobId, rawJob] of Object.entries(jobsById)) { + const job: any = rawJob || {}; + const article = job.article || {}; + const legacyArticleKey = String(article.xArticleId || article.parentPostId || `job:${legacyJobId}`); + if (!articleIdByLegacyKey.has(legacyArticleKey)) { + const id = await (ctx.db as any).insert("articles", { + x_article_id: article.xArticleId || undefined, + parent_post_id: String(article.parentPostId || `legacy-parent:${legacyJobId}`), + author_id: article.authorId || undefined, + title: String(article.title || "Untitled"), + char_count: Number.isInteger(article.charCount) + ? article.charCount + : String(article.content || "").length, + content_hash: article.contentHash || undefined, + raw_content: typeof article.content === "string" ? article.content : undefined, + created_at: String(job.createdAt || nowIso()), + updated_at: String(job.updatedAt || nowIso()), + }); + articleIdByLegacyKey.set(legacyArticleKey, id); + } + articleIdByJobId.set(legacyJobId, articleIdByLegacyKey.get(legacyArticleKey)); + } + + const legacyJobByAssetId = new Map(); + const jobIdByLegacy = new Map(); + const sortedJobs = Object.entries(jobsById).sort(([, aRaw], [, bRaw]) => { + const a = aRaw as any; + const b = bRaw as any; + const aTime = String(a && a.createdAt ? a.createdAt : ""); + const bTime = String(b && b.createdAt ? b.createdAt : ""); + return aTime.localeCompare(bTime); + }); + + for (const [legacyJobId, rawJob] of sortedJobs) { + const job: any = rawJob || {}; + if (job.assetId) { + legacyJobByAssetId.set(String(job.assetId), legacyJobId); + } + const caller = String(job.callerUserId || ""); + const userId = userIdByLegacy.get(caller); + if (!userId) { + continue; + } + + const id = await (ctx.db as any).insert("audio_jobs", { + user_id: userId, + mention_event_id: mentionIdByPost.get(String(job.mentionPostId || "")) || undefined, + article_id: articleIdByJobId.get(legacyJobId), + status: normalizeStatus(job.status), + credits_charged: Number.isInteger(job.creditsCharged) ? job.creditsCharged : 0, + tts_provider: undefined, + tts_model: undefined, + error: job.error || undefined, + created_at: String(job.createdAt || nowIso()), + updated_at: String(job.updatedAt || nowIso()), + }); + jobIdByLegacy.set(legacyJobId, id); + } + + const assetIdByLegacy = new Map(); + for (const [legacyAssetId, rawAsset] of Object.entries(assetsById)) { + const asset: any = rawAsset || {}; + const legacyJobId = legacyJobByAssetId.get(legacyAssetId); + const jobId = legacyJobId ? jobIdByLegacy.get(legacyJobId) : null; + if (!jobId) { + continue; + } + + const id = await (ctx.db as any).insert("audio_assets", { + job_id: jobId, + storage_key: asset.storageKey || undefined, + duration_sec: Number.isInteger(asset.durationSec) ? asset.durationSec : undefined, + size_bytes: Number.isInteger(asset.sizeBytes) ? asset.sizeBytes : undefined, + codec: "mp3", + public_url_ttl: undefined, + deleted_at: asset.deletedAt || undefined, + last_played_at: asset.lastPlayedAt || undefined, + created_at: String(asset.createdAt || nowIso()), + updated_at: String(asset.updatedAt || asset.createdAt || nowIso()), + }); + assetIdByLegacy.set(legacyAssetId, id); + + await (ctx.db as any).patch(jobId, { + asset_id: id, + updated_at: nowIso(), + }); + } + + for (const [legacyAssetId, users] of Object.entries(grantsByAssetId)) { + if (!Array.isArray(users)) { + continue; + } + + const assetId = assetIdByLegacy.get(legacyAssetId); + if (!assetId) { + continue; + } + + const legacyJobId = legacyJobByAssetId.get(legacyAssetId); + const job = legacyJobId ? (jobsById as any)[legacyJobId] : null; + const owner = job && job.callerUserId ? String(job.callerUserId) : null; + const creditsCharged = job && Number.isInteger(job.creditsCharged) ? job.creditsCharged : 0; + + for (const legacyUserId of users) { + const userId = userIdByLegacy.get(String(legacyUserId)); + if (!userId) { + continue; + } + + const isOwner = owner && owner === String(legacyUserId); + await (ctx.db as any).insert("audio_access_grants", { + audio_asset_id: assetId, + user_id: userId, + granted_via: isOwner ? "owner" : "repurchase", + credits_paid: isOwner ? 0 : creditsCharged, + created_at: nowIso(), + }); + } + } + + return { + imported: true, + users: userIdByLegacy.size, + wallets: userIdByLegacy.size, + wallet_transactions: walletTransactions.length, + mention_events: mentionIdByPost.size, + articles: articleIdByLegacyKey.size, + audio_jobs: jobIdByLegacy.size, + audio_assets: assetIdByLegacy.size, + }; +} + +export const syncFromEngineSnapshotMutation = mutation({ + args: { + snapshot: v.any(), + }, + handler: async (ctx, args) => syncFromEngineSnapshot(ctx, args.snapshot), +}); diff --git a/convex/schema.ts b/convex/schema.ts index f6d5b6b..e07cf07 100644 --- a/convex/schema.ts +++ b/convex/schema.ts @@ -2,6 +2,127 @@ import { defineSchema, defineTable } from "convex/server"; import { v } from "convex/values"; export default defineSchema({ + users: defineTable({ + x_user_id: v.optional(v.string()), + username: v.optional(v.string()), + email: v.optional(v.string()), + created_at: v.string(), + updated_at: v.string(), + }) + .index("by_x_user_id", ["x_user_id"]) + .index("by_username", ["username"]) + .index("by_email", ["email"]), + + wallets: defineTable({ + user_id: v.id("users"), + balance_credits: v.number(), + updated_at: v.string(), + }).index("by_user_id", ["user_id"]), + + wallet_transactions: defineTable({ + user_id: v.id("users"), + type: v.union(v.literal("credit"), v.literal("debit"), v.literal("refund")), + amount: v.number(), + reason: v.optional(v.string()), + idempotency_key: v.string(), + balance_after: v.number(), + created_at: v.string(), + }) + .index("by_user_id", ["user_id"]) + .index("by_idempotency_key", ["idempotency_key"]), + + mention_events: defineTable({ + mention_post_id: v.string(), + mention_author_id: v.string(), + parent_post_id: v.string(), + status: v.string(), + error_code: v.optional(v.string()), + created_at: v.string(), + updated_at: v.string(), + }) + .index("by_mention_post_id", ["mention_post_id"]) + .index("by_parent_post_id", ["parent_post_id"]) + .index("by_mention_author_id", ["mention_author_id"]), + + articles: defineTable({ + x_article_id: v.optional(v.string()), + parent_post_id: v.string(), + author_id: v.optional(v.string()), + title: v.string(), + char_count: v.number(), + content_hash: v.optional(v.string()), + raw_content: v.optional(v.string()), + created_at: v.string(), + updated_at: v.string(), + }) + .index("by_x_article_id", ["x_article_id"]) + .index("by_parent_post_id", ["parent_post_id"]) + .index("by_content_hash", ["content_hash"]), + + audio_jobs: defineTable({ + user_id: v.id("users"), + mention_event_id: v.optional(v.id("mention_events")), + article_id: v.id("articles"), + status: v.union( + v.literal("received"), + v.literal("validated"), + v.literal("priced"), + v.literal("charged"), + v.literal("synthesizing"), + v.literal("uploaded"), + v.literal("completed"), + v.literal("failed_refunded"), + v.literal("failed_not_refunded"), + ), + credits_charged: v.number(), + tts_provider: v.optional(v.string()), + tts_model: v.optional(v.string()), + error: v.optional(v.string()), + asset_id: v.optional(v.id("audio_assets")), + created_at: v.string(), + updated_at: v.string(), + }) + .index("by_user_id", ["user_id"]) + .index("by_mention_event_id", ["mention_event_id"]) + .index("by_article_id", ["article_id"]) + .index("by_status", ["status"]), + + audio_assets: defineTable({ + job_id: v.id("audio_jobs"), + storage_key: v.optional(v.string()), + duration_sec: v.optional(v.number()), + size_bytes: v.optional(v.number()), + codec: v.optional(v.string()), + public_url_ttl: v.optional(v.number()), + deleted_at: v.optional(v.string()), + last_played_at: v.optional(v.string()), + created_at: v.string(), + updated_at: v.string(), + }).index("by_job_id", ["job_id"]), + + audio_access_grants: defineTable({ + audio_asset_id: v.id("audio_assets"), + user_id: v.id("users"), + granted_via: v.union(v.literal("owner"), v.literal("repurchase"), v.literal("admin")), + credits_paid: v.number(), + created_at: v.string(), + }) + .index("by_audio_asset_id", ["audio_asset_id"]) + .index("by_user_id", ["user_id"]) + .index("by_asset_and_user", ["audio_asset_id", "user_id"]), + + payment_events: defineTable({ + provider: v.string(), + provider_event_id: v.string(), + status: v.string(), + payload_hash: v.optional(v.string()), + created_at: v.string(), + updated_at: v.string(), + }) + .index("by_provider_event_id", ["provider_event_id"]) + .index("by_provider_and_event", ["provider", "provider_event_id"]), + + // Legacy compatibility table while runtime is still transitioning away from snapshot persistence. state_snapshots: defineTable({ snapshot: v.any(), updatedAt: v.string(), diff --git a/convex/state.ts b/convex/state.ts index 80e0f03..5a543fb 100644 --- a/convex/state.ts +++ b/convex/state.ts @@ -1,5 +1,6 @@ import { mutation, query } from "./_generated/server"; import { v } from "convex/values"; +import { syncFromEngineSnapshot } from "./domain"; export const getLatestSnapshot = query({ args: {}, @@ -29,17 +30,26 @@ export const saveSnapshot = mutation({ .order("desc") .first(); + const syncSummary = await syncFromEngineSnapshot(ctx, args.snapshot); + if (latest) { await ctx.db.patch(latest._id, { snapshot: args.snapshot, updatedAt: args.updatedAt, }); - return latest._id; + return { + snapshotId: latest._id, + syncSummary, + }; } - return ctx.db.insert("state_snapshots", { + const snapshotId = await ctx.db.insert("state_snapshots", { snapshot: args.snapshot, updatedAt: args.updatedAt, }); + return { + snapshotId, + syncSummary, + }; }, }); diff --git a/test/convex-functions.test.js b/test/convex-functions.test.js index 7cf43b2..9ebcbd9 100644 --- a/test/convex-functions.test.js +++ b/test/convex-functions.test.js @@ -7,8 +7,26 @@ const fs = require("node:fs"); test("convex state functions are present for configured function names", () => { const schema = fs.readFileSync("convex/schema.ts", "utf8"); const state = fs.readFileSync("convex/state.ts", "utf8"); + const domain = fs.readFileSync("convex/domain.ts", "utf8"); + assert.match(schema, /users: defineTable/); + assert.match(schema, /wallets: defineTable/); + assert.match(schema, /wallet_transactions: defineTable/); + assert.match(schema, /mention_events: defineTable/); + assert.match(schema, /articles: defineTable/); + assert.match(schema, /audio_jobs: defineTable/); + assert.match(schema, /audio_assets: defineTable/); + assert.match(schema, /audio_access_grants: defineTable/); + assert.match(schema, /payment_events: defineTable/); assert.match(schema, /state_snapshots/); assert.match(state, /export const getLatestSnapshot = query/); assert.match(state, /export const saveSnapshot = mutation/); + assert.match(domain, /export const upsertUser = mutation/); + assert.match(domain, /export const applyWalletTransaction = mutation/); + assert.match(domain, /export const recordMentionEvent = mutation/); + assert.match(domain, /export const upsertArticle = mutation/); + assert.match(domain, /export const createAudioJob = mutation/); + assert.match(domain, /export const createAudioAsset = mutation/); + assert.match(domain, /export const grantAudioAccess = mutation/); + assert.match(domain, /export const recordPaymentEvent = mutation/); });