import { mutation, query } from "./_generated/server";
import { v } from "convex/values";

/** Validator for the kinds of entries accepted by `applyWalletTransaction`. */
const WalletTxType = v.union(v.literal("credit"), v.literal("debit"), v.literal("refund"));

/** Validator for the status values an audio job may carry. */
const JobStatus = v.union(
  v.literal("received"),
  v.literal("validated"),
  v.literal("priced"),
  v.literal("charged"),
  v.literal("synthesizing"),
  v.literal("uploaded"),
  v.literal("completed"),
  v.literal("failed_refunded"),
  v.literal("failed_not_refunded"),
);

/**
 * Plain-string mirror of the literals in `JobStatus`, used by `normalizeStatus`
 * to vet untyped snapshot data. Keep in sync with `JobStatus`.
 */
const KNOWN_JOB_STATUSES: ReadonlySet<string> = new Set([
  "received",
  "validated",
  "priced",
  "charged",
  "synthesizing",
  "uploaded",
  "completed",
  "failed_refunded",
  "failed_not_refunded",
]);

/** Tables wiped by `syncFromEngineSnapshot`, in the order they are cleared. */
const SNAPSHOT_TABLES_IN_CLEAR_ORDER = [
  "audio_access_grants",
  "audio_assets",
  "audio_jobs",
  "articles",
  "mention_events",
  "wallet_transactions",
  "wallets",
  "payment_events",
  "users",
];

/** Returns the current time as an ISO-8601 string. */
function nowIso() {
  return new Date().toISOString();
}

/**
 * Looks up the first wallet row for a user via the `by_user_id` index.
 *
 * @param ctx - Convex function context (only `ctx.db` is used).
 * @param userId - Value matched against the wallet's `user_id` field.
 * @returns The wallet document, or `null` when none exists.
 */
async function findWalletByUser(ctx: any, userId: any) {
  return ctx.db
    .query("wallets")
    .withIndex("by_user_id", (q: any) => q.eq("user_id", userId))
    .first();
}

/**
 * Creates a user or updates the first existing match.
 *
 * Matching is attempted in order: `x_user_id`, then `email`, then `username`;
 * each lookup is only made when the corresponding argument is truthy and no
 * earlier lookup matched. On update, omitted arguments keep their stored value.
 *
 * @returns The id of the matched or newly inserted user.
 */
export const upsertUser = mutation({
  args: {
    x_user_id: v.optional(v.string()),
    username: v.optional(v.string()),
    email: v.optional(v.string()),
  },
  handler: async (ctx, args) => {
    const { x_user_id, username, email } = args;

    let existing = null;
    if (x_user_id) {
      existing = await ctx.db
        .query("users")
        .withIndex("by_x_user_id", (q) => q.eq("x_user_id", x_user_id))
        .first();
    }
    if (!existing && email) {
      existing = await ctx.db
        .query("users")
        .withIndex("by_email", (q) => q.eq("email", email))
        .first();
    }
    if (!existing && username) {
      existing = await ctx.db
        .query("users")
        .withIndex("by_username", (q) => q.eq("username", username))
        .first();
    }

    const updated_at = nowIso();

    if (existing) {
      await ctx.db.patch(existing._id, {
        x_user_id: x_user_id ?? existing.x_user_id,
        username: username ?? existing.username,
        email: email ?? existing.email,
        updated_at,
      });
      return existing._id;
    }

    return ctx.db.insert("users", {
      x_user_id,
      username,
      email,
      created_at: updated_at,
      updated_at,
    });
  },
});

/**
 * Returns the user's wallet, or a zero-balance placeholder object
 * (`updated_at: null`, no `_id`) when no wallet row exists. Nothing is written.
 */
export const getWallet = query({
  args: { user_id: v.id("users") },
  handler: async (ctx, args) => {
    const wallet = await ctx.db
      .query("wallets")
      .withIndex("by_user_id", (q) => q.eq("user_id", args.user_id))
      .first();

    return (
      wallet ?? {
        user_id: args.user_id,
        balance_credits: 0,
        updated_at: null,
      }
    );
  },
});

/**
 * Applies a credit, debit or refund to a user's wallet and records it.
 *
 * - `amount` must be a positive integer, otherwise `invalid_amount` is thrown.
 * - If a transaction with the same `idempotency_key` already exists it is
 *   returned with `deduped: true` and nothing is changed.
 * - A debit larger than the current balance throws `insufficient_credits`.
 * - "credit" and "refund" both increase the balance; "debit" decreases it.
 * - The wallet row is created on first use.
 *
 * @returns `{ deduped, transaction }`.
 */
export const applyWalletTransaction = mutation({
  args: {
    user_id: v.id("users"),
    type: WalletTxType,
    amount: v.number(),
    reason: v.optional(v.string()),
    idempotency_key: v.string(),
  },
  handler: async (ctx, args) => {
    if (!Number.isInteger(args.amount) || args.amount <= 0) {
      throw new Error("invalid_amount");
    }

    const existing = await ctx.db
      .query("wallet_transactions")
      .withIndex("by_idempotency_key", (q) => q.eq("idempotency_key", args.idempotency_key))
      .first();
    if (existing) {
      return {
        deduped: true,
        transaction: existing,
      };
    }

    const wallet = await findWalletByUser(ctx, args.user_id);
    const currentBalance = wallet ? wallet.balance_credits : 0;

    if (args.type === "debit" && currentBalance < args.amount) {
      throw new Error("insufficient_credits");
    }

    const delta = args.type === "debit" ? -args.amount : args.amount;
    const balance_after = currentBalance + delta;
    const updated_at = nowIso();

    if (wallet) {
      await ctx.db.patch(wallet._id, {
        balance_credits: balance_after,
        updated_at,
      });
    } else {
      await ctx.db.insert("wallets", {
        user_id: args.user_id,
        balance_credits: balance_after,
        updated_at,
      });
    }

    const txId = await ctx.db.insert("wallet_transactions", {
      user_id: args.user_id,
      type: args.type,
      amount: args.amount,
      reason: args.reason,
      idempotency_key: args.idempotency_key,
      balance_after,
      created_at: updated_at,
    });

    const tx = await ctx.db.get(txId);
    return {
      deduped: false,
      transaction: tx,
    };
  },
});

/**
 * Inserts a mention event, or overwrites the existing one that has the same
 * `mention_post_id`.
 *
 * @returns `{ id, deduped }` where `deduped` is true when a row already existed.
 */
export const recordMentionEvent = mutation({
  args: {
    mention_post_id: v.string(),
    mention_author_id: v.string(),
    parent_post_id: v.string(),
    status: v.string(),
    error_code: v.optional(v.string()),
  },
  handler: async (ctx, args) => {
    const existing = await ctx.db
      .query("mention_events")
      .withIndex("by_mention_post_id", (q) => q.eq("mention_post_id", args.mention_post_id))
      .first();

    const updated_at = nowIso();

    if (existing) {
      await ctx.db.patch(existing._id, {
        mention_author_id: args.mention_author_id,
        parent_post_id: args.parent_post_id,
        status: args.status,
        error_code: args.error_code,
        updated_at,
      });
      return { id: existing._id, deduped: true };
    }

    const id = await ctx.db.insert("mention_events", {
      mention_post_id: args.mention_post_id,
      mention_author_id: args.mention_author_id,
      parent_post_id: args.parent_post_id,
      status: args.status,
      error_code: args.error_code,
      created_at: updated_at,
      updated_at,
    });
    return { id, deduped: false };
  },
});

/**
 * Inserts an article, or updates the first existing match found by
 * `x_article_id` (when given) and otherwise by `parent_post_id`.
 *
 * `char_count` is used when it is an integer; otherwise the length of
 * `raw_content` (in UTF-16 code units) is used. A resulting count that is not
 * a positive integer throws `invalid_char_count`.
 *
 * @returns The id of the matched or newly inserted article.
 */
export const upsertArticle = mutation({
  args: {
    x_article_id: v.optional(v.string()),
    parent_post_id: v.string(),
    author_id: v.optional(v.string()),
    title: v.string(),
    raw_content: v.optional(v.string()),
    char_count: v.optional(v.number()),
    content_hash: v.optional(v.string()),
  },
  handler: async (ctx, args) => {
    const { x_article_id, char_count } = args;

    // Narrow explicitly so the comparison below is made on a definite number.
    const normalizedCount =
      typeof char_count === "number" && Number.isInteger(char_count)
        ? char_count
        : args.raw_content?.length ?? 0;

    if (!Number.isInteger(normalizedCount) || normalizedCount <= 0) {
      throw new Error("invalid_char_count");
    }

    let existing = null;
    if (x_article_id) {
      existing = await ctx.db
        .query("articles")
        .withIndex("by_x_article_id", (q) => q.eq("x_article_id", x_article_id))
        .first();
    }
    if (!existing) {
      existing = await ctx.db
        .query("articles")
        .withIndex("by_parent_post_id", (q) => q.eq("parent_post_id", args.parent_post_id))
        .first();
    }

    const updated_at = nowIso();

    if (existing) {
      // NOTE(review): author_id, raw_content and content_hash are passed through
      // even when omitted; per the Convex docs a patch value of `undefined`
      // removes the field — confirm that clearing on omission is intended.
      await ctx.db.patch(existing._id, {
        x_article_id: x_article_id ?? existing.x_article_id,
        parent_post_id: args.parent_post_id,
        author_id: args.author_id,
        title: args.title,
        raw_content: args.raw_content,
        char_count: normalizedCount,
        content_hash: args.content_hash,
        updated_at,
      });
      return existing._id;
    }

    return ctx.db.insert("articles", {
      x_article_id,
      parent_post_id: args.parent_post_id,
      author_id: args.author_id,
      title: args.title,
      raw_content: args.raw_content,
      char_count: normalizedCount,
      content_hash: args.content_hash,
      created_at: updated_at,
      updated_at,
    });
  },
});

/**
 * Inserts a new audio job. `status` defaults to "received".
 *
 * @throws `invalid_credits_charged` when `credits_charged` is not a
 *   non-negative integer.
 * @returns The id of the inserted job.
 */
export const createAudioJob = mutation({
  args: {
    user_id: v.id("users"),
    mention_event_id: v.optional(v.id("mention_events")),
    article_id: v.id("articles"),
    status: v.optional(JobStatus),
    credits_charged: v.number(),
    tts_provider: v.optional(v.string()),
    tts_model: v.optional(v.string()),
    error: v.optional(v.string()),
  },
  handler: async (ctx, args) => {
    if (!Number.isInteger(args.credits_charged) || args.credits_charged < 0) {
      throw new Error("invalid_credits_charged");
    }

    const timestamp = nowIso();
    return ctx.db.insert("audio_jobs", {
      user_id: args.user_id,
      mention_event_id: args.mention_event_id,
      article_id: args.article_id,
      status: args.status ?? "received",
      credits_charged: args.credits_charged,
      tts_provider: args.tts_provider,
      tts_model: args.tts_model,
      error: args.error,
      created_at: timestamp,
      updated_at: timestamp,
    });
  },
});

/**
 * Patches an audio job. Omitted arguments keep the stored value, except
 * `error`, which is always written as given.
 *
 * @throws `job_not_found` when the job does not exist.
 * @returns The job document re-read after the patch.
 */
export const updateAudioJob = mutation({
  args: {
    job_id: v.id("audio_jobs"),
    status: v.optional(JobStatus),
    credits_charged: v.optional(v.number()),
    tts_provider: v.optional(v.string()),
    tts_model: v.optional(v.string()),
    error: v.optional(v.string()),
    asset_id: v.optional(v.id("audio_assets")),
  },
  handler: async (ctx, args) => {
    const job = await ctx.db.get(args.job_id);
    if (!job) {
      throw new Error("job_not_found");
    }

    await ctx.db.patch(args.job_id, {
      status: args.status ?? job.status,
      credits_charged: args.credits_charged ?? job.credits_charged,
      tts_provider: args.tts_provider ?? job.tts_provider,
      tts_model: args.tts_model ?? job.tts_model,
      // NOTE(review): unlike the fields above there is no fallback to the stored
      // value, so an update that omits `error` passes `undefined` (which, per the
      // Convex docs, removes the field) — presumably deliberate; confirm.
      error: args.error,
      asset_id: args.asset_id ?? job.asset_id,
      updated_at: nowIso(),
    });

    return ctx.db.get(args.job_id);
  },
});

/**
 * Inserts an audio asset for a job and points the job's `asset_id` at it.
 *
 * @throws `job_not_found` when the job does not exist.
 * @returns The id of the inserted asset.
 */
export const createAudioAsset = mutation({
  args: {
    job_id: v.id("audio_jobs"),
    storage_key: v.optional(v.string()),
    duration_sec: v.optional(v.number()),
    size_bytes: v.optional(v.number()),
    codec: v.optional(v.string()),
    public_url_ttl: v.optional(v.number()),
  },
  handler: async (ctx, args) => {
    const job = await ctx.db.get(args.job_id);
    if (!job) {
      throw new Error("job_not_found");
    }

    const timestamp = nowIso();
    const assetId = await ctx.db.insert("audio_assets", {
      job_id: args.job_id,
      storage_key: args.storage_key,
      duration_sec: args.duration_sec,
      size_bytes: args.size_bytes,
      codec: args.codec,
      public_url_ttl: args.public_url_ttl,
      created_at: timestamp,
      updated_at: timestamp,
    });

    await ctx.db.patch(args.job_id, {
      asset_id: assetId,
      updated_at: timestamp,
    });

    return assetId;
  },
});

/**
 * Patches an audio asset. Omitted arguments keep the stored value, except
 * `deleted_at` and `last_played_at`, which are always written as given.
 *
 * @throws `asset_not_found` when the asset does not exist.
 * @returns The asset document re-read after the patch.
 */
export const updateAudioAsset = mutation({
  args: {
    asset_id: v.id("audio_assets"),
    storage_key: v.optional(v.string()),
    duration_sec: v.optional(v.number()),
    size_bytes: v.optional(v.number()),
    codec: v.optional(v.string()),
    public_url_ttl: v.optional(v.number()),
    deleted_at: v.optional(v.string()),
    last_played_at: v.optional(v.string()),
  },
  handler: async (ctx, args) => {
    const asset = await ctx.db.get(args.asset_id);
    if (!asset) {
      throw new Error("asset_not_found");
    }

    await ctx.db.patch(args.asset_id, {
      storage_key: args.storage_key ?? asset.storage_key,
      duration_sec: args.duration_sec ?? asset.duration_sec,
      size_bytes: args.size_bytes ?? asset.size_bytes,
      codec: args.codec ?? asset.codec,
      public_url_ttl: args.public_url_ttl ?? asset.public_url_ttl,
      // NOTE(review): no fallback to the stored value for the next two fields, so
      // e.g. an update carrying only `last_played_at` passes `deleted_at:
      // undefined` (which, per the Convex docs, removes the field). Confirm this
      // is intended before relying on either field surviving partial updates.
      deleted_at: args.deleted_at,
      last_played_at: args.last_played_at,
      updated_at: nowIso(),
    });

    return ctx.db.get(args.asset_id);
  },
});

/**
 * Records that a user may access an audio asset. If a grant for the same
 * asset/user pair already exists it is left untouched.
 *
 * @returns `{ id, deduped }` where `deduped` is true when a grant already existed.
 */
export const grantAudioAccess = mutation({
  args: {
    audio_asset_id: v.id("audio_assets"),
    user_id: v.id("users"),
    granted_via: v.union(v.literal("owner"), v.literal("repurchase"), v.literal("admin")),
    credits_paid: v.number(),
  },
  handler: async (ctx, args) => {
    const existing = await ctx.db
      .query("audio_access_grants")
      .withIndex("by_asset_and_user", (q) =>
        q.eq("audio_asset_id", args.audio_asset_id).eq("user_id", args.user_id),
      )
      .first();
    if (existing) {
      return { id: existing._id, deduped: true };
    }

    const id = await ctx.db.insert("audio_access_grants", {
      audio_asset_id: args.audio_asset_id,
      user_id: args.user_id,
      granted_via: args.granted_via,
      credits_paid: args.credits_paid,
      created_at: nowIso(),
    });
    return { id, deduped: false };
  },
});

/**
 * Decides whether a user may access an audio asset.
 *
 * Possible `reason` values: "not_found" (no asset), "job_not_found",
 * "auth_required" (no `user_id` given), "owner" (user owns the job),
 * "grant" (an access grant exists) and "payment_required".
 * `credits_required` is the job's `credits_charged` when access is denied to
 * an identifiable asset, and 0 when access is allowed.
 */
export const hasAudioAccess = query({
  args: {
    audio_asset_id: v.id("audio_assets"),
    user_id: v.optional(v.id("users")),
  },
  handler: async (ctx, args) => {
    const asset = await ctx.db.get(args.audio_asset_id);
    if (!asset) {
      return { allowed: false, reason: "not_found" };
    }

    const job = await ctx.db.get(asset.job_id);
    if (!job) {
      return { allowed: false, reason: "job_not_found" };
    }

    const userId = args.user_id;
    if (!userId) {
      return { allowed: false, reason: "auth_required", credits_required: job.credits_charged };
    }

    if (job.user_id === userId) {
      return { allowed: true, reason: "owner", credits_required: 0 };
    }

    const grant = await ctx.db
      .query("audio_access_grants")
      .withIndex("by_asset_and_user", (q) =>
        q.eq("audio_asset_id", args.audio_asset_id).eq("user_id", userId),
      )
      .first();
    if (grant) {
      return { allowed: true, reason: "grant", credits_required: 0 };
    }

    return {
      allowed: false,
      reason: "payment_required",
      credits_required: job.credits_charged,
    };
  },
});

/**
 * Inserts a payment event, or updates `status`/`payload_hash` of the existing
 * one with the same `provider` + `provider_event_id`.
 *
 * @returns `{ id, deduped }` where `deduped` is true when a row already existed.
 */
export const recordPaymentEvent = mutation({
  args: {
    provider: v.string(),
    provider_event_id: v.string(),
    status: v.string(),
    payload_hash: v.optional(v.string()),
  },
  handler: async (ctx, args) => {
    const existing = await ctx.db
      .query("payment_events")
      .withIndex("by_provider_and_event", (q) =>
        q.eq("provider", args.provider).eq("provider_event_id", args.provider_event_id),
      )
      .first();

    const updated_at = nowIso();

    if (existing) {
      await ctx.db.patch(existing._id, {
        status: args.status,
        payload_hash: args.payload_hash,
        updated_at,
      });
      return { id: existing._id, deduped: true };
    }

    const id = await ctx.db.insert("payment_events", {
      provider: args.provider,
      provider_event_id: args.provider_event_id,
      status: args.status,
      payload_hash: args.payload_hash,
      created_at: updated_at,
      updated_at,
    });
    return { id, deduped: false };
  },
});

/**
 * Loads a job together with its article and (if linked) its asset.
 *
 * @returns `null` when the job does not exist, else `{ job, article, asset }`
 *   where `asset` is `null` if the job has no `asset_id`.
 */
export const getJob = query({
  args: { job_id: v.id("audio_jobs") },
  handler: async (ctx, args) => {
    const job = await ctx.db.get(args.job_id);
    if (!job) {
      return null;
    }

    const article = await ctx.db.get(job.article_id);
    const asset = job.asset_id ? await ctx.db.get(job.asset_id) : null;

    return {
      job,
      article,
      asset,
    };
  },
});

/** Lists all audio jobs of a user, newest `created_at` first. */
export const listJobsForUser = query({
  args: { user_id: v.id("users") },
  handler: async (ctx, args) => {
    const jobs = await ctx.db
      .query("audio_jobs")
      .withIndex("by_user_id", (q) => q.eq("user_id", args.user_id))
      .collect();

    // A comparator must return 0 for equal keys to be a consistent ordering.
    jobs.sort((a, b) => {
      if (a.created_at === b.created_at) {
        return 0;
      }
      return a.created_at > b.created_at ? -1 : 1;
    });
    return jobs;
  },
});

/** Deletes every row of `tableName`, one document at a time. */
async function clearTable(ctx: any, tableName: string) {
  const rows = await (ctx.db as any).query(tableName).collect();
  for (const row of rows) {
    await (ctx.db as any).delete(row._id);
  }
}

/** Returns `rawStatus` when it is a known job status, otherwise "received". */
function normalizeStatus(rawStatus: string | undefined) {
  if (rawStatus && KNOWN_JOB_STATUSES.has(rawStatus)) {
    return rawStatus;
  }
  return "received";
}

/**
 * Replaces the contents of the database with data from an engine snapshot.
 *
 * DESTRUCTIVE: once a usable snapshot object is found, every row in the tables
 * listed in `SNAPSHOT_TABLES_IN_CLEAR_ORDER` is deleted before anything is
 * imported (`payment_events` is cleared but never re-populated here).
 *
 * The snapshot may be the engine state itself or an object wrapping it under
 * an `engine` key. Sections read: `wallets.balances`, `wallets.transactions`,
 * `jobs`, `assets`, `mentions`, `access.grants`. Missing or wrongly typed
 * sections are treated as empty. The caller's snapshot is not modified.
 *
 * @param ctx - Convex mutation context (only `ctx.db` is used).
 * @param rawSnapshot - Untyped snapshot payload.
 * @returns `{ imported: false, reason }` when no engine object is present,
 *   otherwise `{ imported: true, ...counts }` where each count is the number
 *   of rows inserted for that table.
 */
export async function syncFromEngineSnapshot(ctx: any, rawSnapshot: any) {
  const engine =
    rawSnapshot && typeof rawSnapshot === "object" && rawSnapshot.engine
      ? rawSnapshot.engine
      : rawSnapshot;
  if (!engine || typeof engine !== "object") {
    return { imported: false, reason: "missing_engine_snapshot" };
  }

  const walletsState = engine.wallets || {};
  const balances =
    walletsState.balances && typeof walletsState.balances === "object"
      ? walletsState.balances
      : {};
  // Copy before sorting below so the caller's snapshot array is left untouched.
  const walletTransactions: any[] = Array.isArray(walletsState.transactions)
    ? [...walletsState.transactions]
    : [];
  const jobsById = engine.jobs && typeof engine.jobs === "object" ? engine.jobs : {};
  const assetsById = engine.assets && typeof engine.assets === "object" ? engine.assets : {};
  const mentionsByPostId =
    engine.mentions && typeof engine.mentions === "object" ? engine.mentions : {};
  const grantsByAssetId =
    engine.access && engine.access.grants && typeof engine.access.grants === "object"
      ? engine.access.grants
      : {};

  for (const tableName of SNAPSHOT_TABLES_IN_CLEAR_ORDER) {
    await clearTable(ctx, tableName);
  }

  // --- Users: every legacy user key mentioned anywhere in the snapshot. ---
  const userKeys = new Set<string>();
  for (const key of Object.keys(balances)) {
    userKeys.add(String(key));
  }
  for (const tx of walletTransactions) {
    if (tx && tx.userId) {
      userKeys.add(String(tx.userId));
    }
  }
  for (const job of Object.values(jobsById)) {
    if (job && (job as any).callerUserId) {
      userKeys.add(String((job as any).callerUserId));
    }
  }
  for (const asset of Object.values(assetsById)) {
    if (asset && (asset as any).ownerUserId) {
      userKeys.add(String((asset as any).ownerUserId));
    }
  }
  for (const users of Object.values(grantsByAssetId)) {
    if (!Array.isArray(users)) {
      continue;
    }
    for (const user of users) {
      if (user) {
        userKeys.add(String(user));
      }
    }
  }

  const userIdByLegacy = new Map<string, any>();
  for (const userKey of Array.from(userKeys.values()).sort()) {
    const ts = nowIso();
    const userId = await (ctx.db as any).insert("users", {
      username: userKey,
      // Keys that look like an e-mail address double as the user's email.
      email: userKey.includes("@") ? userKey : undefined,
      x_user_id: undefined,
      created_at: ts,
      updated_at: ts,
    });
    userIdByLegacy.set(userKey, userId);
  }

  // --- Wallets: one per imported user; non-integer balances become 0. ---
  for (const [legacyUser, userId] of userIdByLegacy.entries()) {
    await (ctx.db as any).insert("wallets", {
      user_id: userId,
      balance_credits: Number.isInteger((balances as any)[legacyUser])
        ? (balances as any)[legacyUser]
        : 0,
      updated_at: nowIso(),
    });
  }

  // --- Wallet transactions, ordered by their `createdAt` string. ---
  walletTransactions.sort((a: any, b: any) => {
    const aTime = String(a && a.createdAt ? a.createdAt : "");
    const bTime = String(b && b.createdAt ? b.createdAt : "");
    return aTime.localeCompare(bTime);
  });

  let importedTransactions = 0;
  for (const [position, tx] of walletTransactions.entries()) {
    if (!tx || !tx.userId || !userIdByLegacy.has(String(tx.userId))) {
      continue;
    }
    // The timestamp alone is not unique for entries processed within the same
    // millisecond, so the position is appended to keep fallback keys distinct.
    const fallbackKey = `legacy-tx:${tx.id || `${nowIso()}#${position}`}`;
    await (ctx.db as any).insert("wallet_transactions", {
      user_id: userIdByLegacy.get(String(tx.userId)),
      type: tx.type,
      amount: Number.isInteger(tx.amount) ? tx.amount : 0,
      reason: tx.reason || undefined,
      idempotency_key: String(tx.idempotencyKey || fallbackKey),
      balance_after: Number.isInteger(tx.balanceAfter) ? tx.balanceAfter : 0,
      created_at: String(tx.createdAt || nowIso()),
    });
    importedTransactions += 1;
  }

  // --- Mention events: `mentions` maps a mention post id to a legacy job id. ---
  const mentionIdByPost = new Map<string, any>();
  for (const [mentionPostId, legacyJobId] of Object.entries(mentionsByPostId)) {
    const job = (jobsById as any)[legacyJobId as any];
    const id = await (ctx.db as any).insert("mention_events", {
      mention_post_id: mentionPostId,
      mention_author_id: String((job && job.callerUserId) || "unknown"),
      parent_post_id: String((job && job.article && job.article.parentPostId) || "unknown"),
      status: normalizeStatus(job && job.status),
      error_code: job && job.error ? String(job.error) : undefined,
      created_at: String((job && job.createdAt) || nowIso()),
      updated_at: String((job && job.updatedAt) || nowIso()),
    });
    mentionIdByPost.set(mentionPostId, id);
  }

  // --- Articles: de-duplicated across jobs by xArticleId, else parentPostId,
  // else a per-job synthetic key. ---
  const articleIdByLegacyKey = new Map<string, any>();
  const articleIdByJobId = new Map<string, any>();
  for (const [legacyJobId, rawJob] of Object.entries(jobsById)) {
    const job: any = rawJob || {};
    const article = job.article || {};
    const legacyArticleKey = String(
      article.xArticleId || article.parentPostId || `job:${legacyJobId}`,
    );

    if (!articleIdByLegacyKey.has(legacyArticleKey)) {
      const id = await (ctx.db as any).insert("articles", {
        x_article_id: article.xArticleId || undefined,
        parent_post_id: String(article.parentPostId || `legacy-parent:${legacyJobId}`),
        author_id: article.authorId || undefined,
        title: String(article.title || "Untitled"),
        char_count: Number.isInteger(article.charCount)
          ? article.charCount
          : String(article.content || "").length,
        content_hash: article.contentHash || undefined,
        raw_content: typeof article.content === "string" ? article.content : undefined,
        created_at: String(job.createdAt || nowIso()),
        updated_at: String(job.updatedAt || nowIso()),
      });
      articleIdByLegacyKey.set(legacyArticleKey, id);
    }

    articleIdByJobId.set(legacyJobId, articleIdByLegacyKey.get(legacyArticleKey));
  }

  // --- Audio jobs, ordered by `createdAt`; jobs without a known caller are
  // skipped (their asset link is still remembered). ---
  const legacyJobByAssetId = new Map<string, string>();
  const jobIdByLegacy = new Map<string, any>();

  const sortedJobs = Object.entries(jobsById).sort(([, aRaw], [, bRaw]) => {
    const a = aRaw as any;
    const b = bRaw as any;
    const aTime = String(a && a.createdAt ? a.createdAt : "");
    const bTime = String(b && b.createdAt ? b.createdAt : "");
    return aTime.localeCompare(bTime);
  });

  for (const [legacyJobId, rawJob] of sortedJobs) {
    const job: any = rawJob || {};
    if (job.assetId) {
      legacyJobByAssetId.set(String(job.assetId), legacyJobId);
    }

    const caller = String(job.callerUserId || "");
    const userId = userIdByLegacy.get(caller);
    if (!userId) {
      continue;
    }

    const id = await (ctx.db as any).insert("audio_jobs", {
      user_id: userId,
      mention_event_id: mentionIdByPost.get(String(job.mentionPostId || "")) || undefined,
      article_id: articleIdByJobId.get(legacyJobId),
      status: normalizeStatus(job.status),
      credits_charged: Number.isInteger(job.creditsCharged) ? job.creditsCharged : 0,
      tts_provider: undefined,
      tts_model: undefined,
      error: job.error || undefined,
      created_at: String(job.createdAt || nowIso()),
      updated_at: String(job.updatedAt || nowIso()),
    });
    jobIdByLegacy.set(legacyJobId, id);
  }

  // --- Audio assets: only those whose owning job was imported. ---
  const assetIdByLegacy = new Map<string, any>();
  for (const [legacyAssetId, rawAsset] of Object.entries(assetsById)) {
    const asset: any = rawAsset || {};
    const legacyJobId = legacyJobByAssetId.get(legacyAssetId);
    const jobId = legacyJobId ? jobIdByLegacy.get(legacyJobId) : null;
    if (!jobId) {
      continue;
    }

    const id = await (ctx.db as any).insert("audio_assets", {
      job_id: jobId,
      storage_key: asset.storageKey || undefined,
      duration_sec: Number.isInteger(asset.durationSec) ? asset.durationSec : undefined,
      size_bytes: Number.isInteger(asset.sizeBytes) ? asset.sizeBytes : undefined,
      codec: "mp3",
      public_url_ttl: undefined,
      deleted_at: asset.deletedAt || undefined,
      last_played_at: asset.lastPlayedAt || undefined,
      created_at: String(asset.createdAt || nowIso()),
      updated_at: String(asset.updatedAt || asset.createdAt || nowIso()),
    });

    assetIdByLegacy.set(legacyAssetId, id);
    await (ctx.db as any).patch(jobId, {
      asset_id: id,
      updated_at: nowIso(),
    });
  }

  // --- Access grants: `access.grants` maps a legacy asset id to user keys. ---
  for (const [legacyAssetId, users] of Object.entries(grantsByAssetId)) {
    if (!Array.isArray(users)) {
      continue;
    }

    const assetId = assetIdByLegacy.get(legacyAssetId);
    if (!assetId) {
      continue;
    }

    const legacyJobId = legacyJobByAssetId.get(legacyAssetId);
    const job = legacyJobId ? (jobsById as any)[legacyJobId] : null;
    const owner = job && job.callerUserId ? String(job.callerUserId) : null;
    const creditsCharged = job && Number.isInteger(job.creditsCharged) ? job.creditsCharged : 0;

    for (const legacyUserId of users) {
      const userId = userIdByLegacy.get(String(legacyUserId));
      if (!userId) {
        continue;
      }

      // The job's caller gets a free "owner" grant; everyone else is recorded
      // as having repurchased at the job's price.
      const isOwner = owner && owner === String(legacyUserId);
      await (ctx.db as any).insert("audio_access_grants", {
        audio_asset_id: assetId,
        user_id: userId,
        granted_via: isOwner ? "owner" : "repurchase",
        credits_paid: isOwner ? 0 : creditsCharged,
        created_at: nowIso(),
      });
    }
  }

  return {
    imported: true,
    users: userIdByLegacy.size,
    wallets: userIdByLegacy.size,
    wallet_transactions: importedTransactions,
    mention_events: mentionIdByPost.size,
    articles: articleIdByLegacyKey.size,
    audio_jobs: jobIdByLegacy.size,
    audio_assets: assetIdByLegacy.size,
  };
}

/**
 * Mutation wrapper around `syncFromEngineSnapshot`.
 *
 * NOTE(review): this is registered with `mutation`, and the handler deletes
 * all rows of nine tables based on an unvalidated (`v.any()`) payload. Confirm
 * it is meant to be callable this way, or restrict how it can be invoked.
 */
export const syncFromEngineSnapshotMutation = mutation({
  args: {
    snapshot: v.any(),
  },
  handler: async (ctx, args) => syncFromEngineSnapshot(ctx, args.snapshot),
});