feat(convex): add full domain schema and snapshot sync bridge

This commit is contained in:
Codex
2026-02-18 15:06:53 +00:00
parent d1fbff481b
commit 489972c6dc
4 changed files with 970 additions and 2 deletions

819
convex/domain.ts Normal file
View File

@@ -0,0 +1,819 @@
import { mutation, query } from "./_generated/server";
import { v } from "convex/values";
const WalletTxType = v.union(v.literal("credit"), v.literal("debit"), v.literal("refund"));
const JobStatus = v.union(
v.literal("received"),
v.literal("validated"),
v.literal("priced"),
v.literal("charged"),
v.literal("synthesizing"),
v.literal("uploaded"),
v.literal("completed"),
v.literal("failed_refunded"),
v.literal("failed_not_refunded"),
);
function nowIso() {
return new Date().toISOString();
}
async function findWalletByUser(ctx: any, userId: any) {
return ctx.db
.query("wallets")
.withIndex("by_user_id", (q: any) => q.eq("user_id", userId))
.first();
}
export const upsertUser = mutation({
args: {
x_user_id: v.optional(v.string()),
username: v.optional(v.string()),
email: v.optional(v.string()),
},
handler: async (ctx, args) => {
let existing = null;
if (args.x_user_id) {
existing = await ctx.db
.query("users")
.withIndex("by_x_user_id", (q) => q.eq("x_user_id", args.x_user_id!))
.first();
}
if (!existing && args.email) {
existing = await ctx.db
.query("users")
.withIndex("by_email", (q) => q.eq("email", args.email!))
.first();
}
if (!existing && args.username) {
existing = await ctx.db
.query("users")
.withIndex("by_username", (q) => q.eq("username", args.username!))
.first();
}
const updated_at = nowIso();
if (existing) {
await ctx.db.patch(existing._id, {
x_user_id: args.x_user_id ?? existing.x_user_id,
username: args.username ?? existing.username,
email: args.email ?? existing.email,
updated_at,
});
return existing._id;
}
return ctx.db.insert("users", {
x_user_id: args.x_user_id,
username: args.username,
email: args.email,
created_at: updated_at,
updated_at,
});
},
});
export const getWallet = query({
args: { user_id: v.id("users") },
handler: async (ctx, args) => {
const wallet = await ctx.db
.query("wallets")
.withIndex("by_user_id", (q) => q.eq("user_id", args.user_id))
.first();
return wallet || {
user_id: args.user_id,
balance_credits: 0,
updated_at: null,
};
},
});
export const applyWalletTransaction = mutation({
args: {
user_id: v.id("users"),
type: WalletTxType,
amount: v.number(),
reason: v.optional(v.string()),
idempotency_key: v.string(),
},
handler: async (ctx, args) => {
if (!Number.isInteger(args.amount) || args.amount <= 0) {
throw new Error("invalid_amount");
}
const existing = await ctx.db
.query("wallet_transactions")
.withIndex("by_idempotency_key", (q) => q.eq("idempotency_key", args.idempotency_key))
.first();
if (existing) {
return {
deduped: true,
transaction: existing,
};
}
const wallet = await findWalletByUser(ctx, args.user_id);
const currentBalance = wallet ? wallet.balance_credits : 0;
if (args.type === "debit" && currentBalance < args.amount) {
throw new Error("insufficient_credits");
}
const delta = args.type === "debit" ? -args.amount : args.amount;
const balance_after = currentBalance + delta;
const updated_at = nowIso();
if (wallet) {
await ctx.db.patch(wallet._id, {
balance_credits: balance_after,
updated_at,
});
} else {
await ctx.db.insert("wallets", {
user_id: args.user_id,
balance_credits: balance_after,
updated_at,
});
}
const txId = await ctx.db.insert("wallet_transactions", {
user_id: args.user_id,
type: args.type,
amount: args.amount,
reason: args.reason,
idempotency_key: args.idempotency_key,
balance_after,
created_at: updated_at,
});
const tx = await ctx.db.get(txId);
return {
deduped: false,
transaction: tx,
};
},
});
export const recordMentionEvent = mutation({
args: {
mention_post_id: v.string(),
mention_author_id: v.string(),
parent_post_id: v.string(),
status: v.string(),
error_code: v.optional(v.string()),
},
handler: async (ctx, args) => {
const existing = await ctx.db
.query("mention_events")
.withIndex("by_mention_post_id", (q) => q.eq("mention_post_id", args.mention_post_id))
.first();
const updated_at = nowIso();
if (existing) {
await ctx.db.patch(existing._id, {
mention_author_id: args.mention_author_id,
parent_post_id: args.parent_post_id,
status: args.status,
error_code: args.error_code,
updated_at,
});
return { id: existing._id, deduped: true };
}
const id = await ctx.db.insert("mention_events", {
mention_post_id: args.mention_post_id,
mention_author_id: args.mention_author_id,
parent_post_id: args.parent_post_id,
status: args.status,
error_code: args.error_code,
created_at: updated_at,
updated_at,
});
return { id, deduped: false };
},
});
export const upsertArticle = mutation({
args: {
x_article_id: v.optional(v.string()),
parent_post_id: v.string(),
author_id: v.optional(v.string()),
title: v.string(),
raw_content: v.optional(v.string()),
char_count: v.optional(v.number()),
content_hash: v.optional(v.string()),
},
handler: async (ctx, args) => {
const normalizedCount = Number.isInteger(args.char_count)
? args.char_count
: (args.raw_content ? args.raw_content.length : 0);
if (!Number.isInteger(normalizedCount) || normalizedCount <= 0) {
throw new Error("invalid_char_count");
}
let existing = null;
if (args.x_article_id) {
existing = await ctx.db
.query("articles")
.withIndex("by_x_article_id", (q) => q.eq("x_article_id", args.x_article_id!))
.first();
}
if (!existing) {
existing = await ctx.db
.query("articles")
.withIndex("by_parent_post_id", (q) => q.eq("parent_post_id", args.parent_post_id))
.first();
}
const updated_at = nowIso();
if (existing) {
await ctx.db.patch(existing._id, {
x_article_id: args.x_article_id ?? existing.x_article_id,
parent_post_id: args.parent_post_id,
author_id: args.author_id,
title: args.title,
raw_content: args.raw_content,
char_count: normalizedCount,
content_hash: args.content_hash,
updated_at,
});
return existing._id;
}
return ctx.db.insert("articles", {
x_article_id: args.x_article_id,
parent_post_id: args.parent_post_id,
author_id: args.author_id,
title: args.title,
raw_content: args.raw_content,
char_count: normalizedCount,
content_hash: args.content_hash,
created_at: updated_at,
updated_at,
});
},
});
export const createAudioJob = mutation({
args: {
user_id: v.id("users"),
mention_event_id: v.optional(v.id("mention_events")),
article_id: v.id("articles"),
status: v.optional(JobStatus),
credits_charged: v.number(),
tts_provider: v.optional(v.string()),
tts_model: v.optional(v.string()),
error: v.optional(v.string()),
},
handler: async (ctx, args) => {
if (!Number.isInteger(args.credits_charged) || args.credits_charged < 0) {
throw new Error("invalid_credits_charged");
}
const timestamp = nowIso();
return ctx.db.insert("audio_jobs", {
user_id: args.user_id,
mention_event_id: args.mention_event_id,
article_id: args.article_id,
status: args.status || "received",
credits_charged: args.credits_charged,
tts_provider: args.tts_provider,
tts_model: args.tts_model,
error: args.error,
created_at: timestamp,
updated_at: timestamp,
});
},
});
export const updateAudioJob = mutation({
args: {
job_id: v.id("audio_jobs"),
status: v.optional(JobStatus),
credits_charged: v.optional(v.number()),
tts_provider: v.optional(v.string()),
tts_model: v.optional(v.string()),
error: v.optional(v.string()),
asset_id: v.optional(v.id("audio_assets")),
},
handler: async (ctx, args) => {
const job = await ctx.db.get(args.job_id);
if (!job) {
throw new Error("job_not_found");
}
await ctx.db.patch(args.job_id, {
status: args.status ?? job.status,
credits_charged: args.credits_charged ?? job.credits_charged,
tts_provider: args.tts_provider ?? job.tts_provider,
tts_model: args.tts_model ?? job.tts_model,
error: args.error,
asset_id: args.asset_id ?? job.asset_id,
updated_at: nowIso(),
});
return ctx.db.get(args.job_id);
},
});
export const createAudioAsset = mutation({
args: {
job_id: v.id("audio_jobs"),
storage_key: v.optional(v.string()),
duration_sec: v.optional(v.number()),
size_bytes: v.optional(v.number()),
codec: v.optional(v.string()),
public_url_ttl: v.optional(v.number()),
},
handler: async (ctx, args) => {
const job = await ctx.db.get(args.job_id);
if (!job) {
throw new Error("job_not_found");
}
const timestamp = nowIso();
const assetId = await ctx.db.insert("audio_assets", {
job_id: args.job_id,
storage_key: args.storage_key,
duration_sec: args.duration_sec,
size_bytes: args.size_bytes,
codec: args.codec,
public_url_ttl: args.public_url_ttl,
created_at: timestamp,
updated_at: timestamp,
});
await ctx.db.patch(args.job_id, {
asset_id: assetId,
updated_at: timestamp,
});
return assetId;
},
});
export const updateAudioAsset = mutation({
args: {
asset_id: v.id("audio_assets"),
storage_key: v.optional(v.string()),
duration_sec: v.optional(v.number()),
size_bytes: v.optional(v.number()),
codec: v.optional(v.string()),
public_url_ttl: v.optional(v.number()),
deleted_at: v.optional(v.string()),
last_played_at: v.optional(v.string()),
},
handler: async (ctx, args) => {
const asset = await ctx.db.get(args.asset_id);
if (!asset) {
throw new Error("asset_not_found");
}
await ctx.db.patch(args.asset_id, {
storage_key: args.storage_key ?? asset.storage_key,
duration_sec: args.duration_sec ?? asset.duration_sec,
size_bytes: args.size_bytes ?? asset.size_bytes,
codec: args.codec ?? asset.codec,
public_url_ttl: args.public_url_ttl ?? asset.public_url_ttl,
deleted_at: args.deleted_at,
last_played_at: args.last_played_at,
updated_at: nowIso(),
});
return ctx.db.get(args.asset_id);
},
});
export const grantAudioAccess = mutation({
args: {
audio_asset_id: v.id("audio_assets"),
user_id: v.id("users"),
granted_via: v.union(v.literal("owner"), v.literal("repurchase"), v.literal("admin")),
credits_paid: v.number(),
},
handler: async (ctx, args) => {
const existing = await ctx.db
.query("audio_access_grants")
.withIndex("by_asset_and_user", (q) => q.eq("audio_asset_id", args.audio_asset_id).eq("user_id", args.user_id))
.first();
if (existing) {
return { id: existing._id, deduped: true };
}
const id = await ctx.db.insert("audio_access_grants", {
audio_asset_id: args.audio_asset_id,
user_id: args.user_id,
granted_via: args.granted_via,
credits_paid: args.credits_paid,
created_at: nowIso(),
});
return { id, deduped: false };
},
});
export const hasAudioAccess = query({
args: {
audio_asset_id: v.id("audio_assets"),
user_id: v.optional(v.id("users")),
},
handler: async (ctx, args) => {
const asset = await ctx.db.get(args.audio_asset_id);
if (!asset) {
return { allowed: false, reason: "not_found" };
}
const job = await ctx.db.get(asset.job_id);
if (!job) {
return { allowed: false, reason: "job_not_found" };
}
if (!args.user_id) {
return { allowed: false, reason: "auth_required", credits_required: job.credits_charged };
}
if (job.user_id === args.user_id) {
return { allowed: true, reason: "owner", credits_required: 0 };
}
const grant = await ctx.db
.query("audio_access_grants")
.withIndex("by_asset_and_user", (q) => q.eq("audio_asset_id", args.audio_asset_id).eq("user_id", args.user_id!))
.first();
if (grant) {
return { allowed: true, reason: "grant", credits_required: 0 };
}
return {
allowed: false,
reason: "payment_required",
credits_required: job.credits_charged,
};
},
});
export const recordPaymentEvent = mutation({
args: {
provider: v.string(),
provider_event_id: v.string(),
status: v.string(),
payload_hash: v.optional(v.string()),
},
handler: async (ctx, args) => {
const existing = await ctx.db
.query("payment_events")
.withIndex("by_provider_and_event", (q) => q.eq("provider", args.provider).eq("provider_event_id", args.provider_event_id))
.first();
const updated_at = nowIso();
if (existing) {
await ctx.db.patch(existing._id, {
status: args.status,
payload_hash: args.payload_hash,
updated_at,
});
return { id: existing._id, deduped: true };
}
const id = await ctx.db.insert("payment_events", {
provider: args.provider,
provider_event_id: args.provider_event_id,
status: args.status,
payload_hash: args.payload_hash,
created_at: updated_at,
updated_at,
});
return { id, deduped: false };
},
});
export const getJob = query({
args: { job_id: v.id("audio_jobs") },
handler: async (ctx, args) => {
const job = await ctx.db.get(args.job_id);
if (!job) {
return null;
}
const article = await ctx.db.get(job.article_id);
const asset = job.asset_id ? await ctx.db.get(job.asset_id) : null;
return {
job,
article,
asset,
};
},
});
export const listJobsForUser = query({
args: { user_id: v.id("users") },
handler: async (ctx, args) => {
const jobs = await ctx.db
.query("audio_jobs")
.withIndex("by_user_id", (q) => q.eq("user_id", args.user_id))
.collect();
jobs.sort((a, b) => (a.created_at > b.created_at ? -1 : 1));
return jobs;
},
});
async function clearTable(ctx: any, tableName: string) {
const rows = await (ctx.db as any).query(tableName).collect();
for (const row of rows) {
await (ctx.db as any).delete(row._id);
}
}
function normalizeStatus(rawStatus: string | undefined) {
const allowed = new Set([
"received",
"validated",
"priced",
"charged",
"synthesizing",
"uploaded",
"completed",
"failed_refunded",
"failed_not_refunded",
]);
if (rawStatus && allowed.has(rawStatus)) {
return rawStatus;
}
return "received";
}
export async function syncFromEngineSnapshot(ctx: any, rawSnapshot: any) {
const engine = rawSnapshot && typeof rawSnapshot === "object" && rawSnapshot.engine
? rawSnapshot.engine
: rawSnapshot;
if (!engine || typeof engine !== "object") {
return { imported: false, reason: "missing_engine_snapshot" };
}
const walletsState = engine.wallets || {};
const balances = walletsState.balances && typeof walletsState.balances === "object"
? walletsState.balances
: {};
const walletTransactions = Array.isArray(walletsState.transactions)
? walletsState.transactions
: [];
const jobsById = engine.jobs && typeof engine.jobs === "object" ? engine.jobs : {};
const assetsById = engine.assets && typeof engine.assets === "object" ? engine.assets : {};
const mentionsByPostId = engine.mentions && typeof engine.mentions === "object" ? engine.mentions : {};
const grantsByAssetId = engine.access && engine.access.grants && typeof engine.access.grants === "object"
? engine.access.grants
: {};
await clearTable(ctx, "audio_access_grants");
await clearTable(ctx, "audio_assets");
await clearTable(ctx, "audio_jobs");
await clearTable(ctx, "articles");
await clearTable(ctx, "mention_events");
await clearTable(ctx, "wallet_transactions");
await clearTable(ctx, "wallets");
await clearTable(ctx, "payment_events");
await clearTable(ctx, "users");
const userKeys = new Set<string>();
for (const key of Object.keys(balances)) {
userKeys.add(String(key));
}
for (const tx of walletTransactions) {
if (tx && tx.userId) {
userKeys.add(String(tx.userId));
}
}
for (const job of Object.values(jobsById)) {
if (job && (job as any).callerUserId) {
userKeys.add(String((job as any).callerUserId));
}
}
for (const asset of Object.values(assetsById)) {
if (asset && (asset as any).ownerUserId) {
userKeys.add(String((asset as any).ownerUserId));
}
}
for (const users of Object.values(grantsByAssetId)) {
if (!Array.isArray(users)) {
continue;
}
for (const user of users) {
if (user) {
userKeys.add(String(user));
}
}
}
const userIdByLegacy = new Map<string, any>();
for (const userKey of Array.from(userKeys.values()).sort()) {
const ts = nowIso();
const userId = await (ctx.db as any).insert("users", {
username: userKey,
email: userKey.includes("@") ? userKey : undefined,
x_user_id: undefined,
created_at: ts,
updated_at: ts,
});
userIdByLegacy.set(userKey, userId);
}
for (const [legacyUser, userId] of userIdByLegacy.entries()) {
await (ctx.db as any).insert("wallets", {
user_id: userId,
balance_credits: Number.isInteger((balances as any)[legacyUser]) ? (balances as any)[legacyUser] : 0,
updated_at: nowIso(),
});
}
walletTransactions.sort((a: any, b: any) => {
const aTime = String(a && a.createdAt ? a.createdAt : "");
const bTime = String(b && b.createdAt ? b.createdAt : "");
return aTime.localeCompare(bTime);
});
for (const tx of walletTransactions) {
if (!tx || !tx.userId || !userIdByLegacy.has(String(tx.userId))) {
continue;
}
await (ctx.db as any).insert("wallet_transactions", {
user_id: userIdByLegacy.get(String(tx.userId)),
type: tx.type,
amount: Number.isInteger(tx.amount) ? tx.amount : 0,
reason: tx.reason || undefined,
idempotency_key: String(tx.idempotencyKey || `legacy-tx:${tx.id || nowIso()}`),
balance_after: Number.isInteger(tx.balanceAfter) ? tx.balanceAfter : 0,
created_at: String(tx.createdAt || nowIso()),
});
}
const mentionIdByPost = new Map<string, any>();
for (const [mentionPostId, legacyJobId] of Object.entries(mentionsByPostId)) {
const job = (jobsById as any)[legacyJobId];
const id = await (ctx.db as any).insert("mention_events", {
mention_post_id: mentionPostId,
mention_author_id: String((job && job.callerUserId) || "unknown"),
parent_post_id: String((job && job.article && job.article.parentPostId) || "unknown"),
status: normalizeStatus(job && job.status),
error_code: job && job.error ? String(job.error) : undefined,
created_at: String((job && job.createdAt) || nowIso()),
updated_at: String((job && job.updatedAt) || nowIso()),
});
mentionIdByPost.set(mentionPostId, id);
}
const articleIdByLegacyKey = new Map<string, any>();
const articleIdByJobId = new Map<string, any>();
for (const [legacyJobId, rawJob] of Object.entries(jobsById)) {
const job: any = rawJob || {};
const article = job.article || {};
const legacyArticleKey = String(article.xArticleId || article.parentPostId || `job:${legacyJobId}`);
if (!articleIdByLegacyKey.has(legacyArticleKey)) {
const id = await (ctx.db as any).insert("articles", {
x_article_id: article.xArticleId || undefined,
parent_post_id: String(article.parentPostId || `legacy-parent:${legacyJobId}`),
author_id: article.authorId || undefined,
title: String(article.title || "Untitled"),
char_count: Number.isInteger(article.charCount)
? article.charCount
: String(article.content || "").length,
content_hash: article.contentHash || undefined,
raw_content: typeof article.content === "string" ? article.content : undefined,
created_at: String(job.createdAt || nowIso()),
updated_at: String(job.updatedAt || nowIso()),
});
articleIdByLegacyKey.set(legacyArticleKey, id);
}
articleIdByJobId.set(legacyJobId, articleIdByLegacyKey.get(legacyArticleKey));
}
const legacyJobByAssetId = new Map<string, string>();
const jobIdByLegacy = new Map<string, any>();
const sortedJobs = Object.entries(jobsById).sort(([, aRaw], [, bRaw]) => {
const a = aRaw as any;
const b = bRaw as any;
const aTime = String(a && a.createdAt ? a.createdAt : "");
const bTime = String(b && b.createdAt ? b.createdAt : "");
return aTime.localeCompare(bTime);
});
for (const [legacyJobId, rawJob] of sortedJobs) {
const job: any = rawJob || {};
if (job.assetId) {
legacyJobByAssetId.set(String(job.assetId), legacyJobId);
}
const caller = String(job.callerUserId || "");
const userId = userIdByLegacy.get(caller);
if (!userId) {
continue;
}
const id = await (ctx.db as any).insert("audio_jobs", {
user_id: userId,
mention_event_id: mentionIdByPost.get(String(job.mentionPostId || "")) || undefined,
article_id: articleIdByJobId.get(legacyJobId),
status: normalizeStatus(job.status),
credits_charged: Number.isInteger(job.creditsCharged) ? job.creditsCharged : 0,
tts_provider: undefined,
tts_model: undefined,
error: job.error || undefined,
created_at: String(job.createdAt || nowIso()),
updated_at: String(job.updatedAt || nowIso()),
});
jobIdByLegacy.set(legacyJobId, id);
}
const assetIdByLegacy = new Map<string, any>();
for (const [legacyAssetId, rawAsset] of Object.entries(assetsById)) {
const asset: any = rawAsset || {};
const legacyJobId = legacyJobByAssetId.get(legacyAssetId);
const jobId = legacyJobId ? jobIdByLegacy.get(legacyJobId) : null;
if (!jobId) {
continue;
}
const id = await (ctx.db as any).insert("audio_assets", {
job_id: jobId,
storage_key: asset.storageKey || undefined,
duration_sec: Number.isInteger(asset.durationSec) ? asset.durationSec : undefined,
size_bytes: Number.isInteger(asset.sizeBytes) ? asset.sizeBytes : undefined,
codec: "mp3",
public_url_ttl: undefined,
deleted_at: asset.deletedAt || undefined,
last_played_at: asset.lastPlayedAt || undefined,
created_at: String(asset.createdAt || nowIso()),
updated_at: String(asset.updatedAt || asset.createdAt || nowIso()),
});
assetIdByLegacy.set(legacyAssetId, id);
await (ctx.db as any).patch(jobId, {
asset_id: id,
updated_at: nowIso(),
});
}
for (const [legacyAssetId, users] of Object.entries(grantsByAssetId)) {
if (!Array.isArray(users)) {
continue;
}
const assetId = assetIdByLegacy.get(legacyAssetId);
if (!assetId) {
continue;
}
const legacyJobId = legacyJobByAssetId.get(legacyAssetId);
const job = legacyJobId ? (jobsById as any)[legacyJobId] : null;
const owner = job && job.callerUserId ? String(job.callerUserId) : null;
const creditsCharged = job && Number.isInteger(job.creditsCharged) ? job.creditsCharged : 0;
for (const legacyUserId of users) {
const userId = userIdByLegacy.get(String(legacyUserId));
if (!userId) {
continue;
}
const isOwner = owner && owner === String(legacyUserId);
await (ctx.db as any).insert("audio_access_grants", {
audio_asset_id: assetId,
user_id: userId,
granted_via: isOwner ? "owner" : "repurchase",
credits_paid: isOwner ? 0 : creditsCharged,
created_at: nowIso(),
});
}
}
return {
imported: true,
users: userIdByLegacy.size,
wallets: userIdByLegacy.size,
wallet_transactions: walletTransactions.length,
mention_events: mentionIdByPost.size,
articles: articleIdByLegacyKey.size,
audio_jobs: jobIdByLegacy.size,
audio_assets: assetIdByLegacy.size,
};
}
export const syncFromEngineSnapshotMutation = mutation({
args: {
snapshot: v.any(),
},
handler: async (ctx, args) => syncFromEngineSnapshot(ctx, args.snapshot),
});

View File

@@ -2,6 +2,127 @@ import { defineSchema, defineTable } from "convex/server";
import { v } from "convex/values"; import { v } from "convex/values";
export default defineSchema({ export default defineSchema({
users: defineTable({
x_user_id: v.optional(v.string()),
username: v.optional(v.string()),
email: v.optional(v.string()),
created_at: v.string(),
updated_at: v.string(),
})
.index("by_x_user_id", ["x_user_id"])
.index("by_username", ["username"])
.index("by_email", ["email"]),
wallets: defineTable({
user_id: v.id("users"),
balance_credits: v.number(),
updated_at: v.string(),
}).index("by_user_id", ["user_id"]),
wallet_transactions: defineTable({
user_id: v.id("users"),
type: v.union(v.literal("credit"), v.literal("debit"), v.literal("refund")),
amount: v.number(),
reason: v.optional(v.string()),
idempotency_key: v.string(),
balance_after: v.number(),
created_at: v.string(),
})
.index("by_user_id", ["user_id"])
.index("by_idempotency_key", ["idempotency_key"]),
mention_events: defineTable({
mention_post_id: v.string(),
mention_author_id: v.string(),
parent_post_id: v.string(),
status: v.string(),
error_code: v.optional(v.string()),
created_at: v.string(),
updated_at: v.string(),
})
.index("by_mention_post_id", ["mention_post_id"])
.index("by_parent_post_id", ["parent_post_id"])
.index("by_mention_author_id", ["mention_author_id"]),
articles: defineTable({
x_article_id: v.optional(v.string()),
parent_post_id: v.string(),
author_id: v.optional(v.string()),
title: v.string(),
char_count: v.number(),
content_hash: v.optional(v.string()),
raw_content: v.optional(v.string()),
created_at: v.string(),
updated_at: v.string(),
})
.index("by_x_article_id", ["x_article_id"])
.index("by_parent_post_id", ["parent_post_id"])
.index("by_content_hash", ["content_hash"]),
audio_jobs: defineTable({
user_id: v.id("users"),
mention_event_id: v.optional(v.id("mention_events")),
article_id: v.id("articles"),
status: v.union(
v.literal("received"),
v.literal("validated"),
v.literal("priced"),
v.literal("charged"),
v.literal("synthesizing"),
v.literal("uploaded"),
v.literal("completed"),
v.literal("failed_refunded"),
v.literal("failed_not_refunded"),
),
credits_charged: v.number(),
tts_provider: v.optional(v.string()),
tts_model: v.optional(v.string()),
error: v.optional(v.string()),
asset_id: v.optional(v.id("audio_assets")),
created_at: v.string(),
updated_at: v.string(),
})
.index("by_user_id", ["user_id"])
.index("by_mention_event_id", ["mention_event_id"])
.index("by_article_id", ["article_id"])
.index("by_status", ["status"]),
audio_assets: defineTable({
job_id: v.id("audio_jobs"),
storage_key: v.optional(v.string()),
duration_sec: v.optional(v.number()),
size_bytes: v.optional(v.number()),
codec: v.optional(v.string()),
public_url_ttl: v.optional(v.number()),
deleted_at: v.optional(v.string()),
last_played_at: v.optional(v.string()),
created_at: v.string(),
updated_at: v.string(),
}).index("by_job_id", ["job_id"]),
audio_access_grants: defineTable({
audio_asset_id: v.id("audio_assets"),
user_id: v.id("users"),
granted_via: v.union(v.literal("owner"), v.literal("repurchase"), v.literal("admin")),
credits_paid: v.number(),
created_at: v.string(),
})
.index("by_audio_asset_id", ["audio_asset_id"])
.index("by_user_id", ["user_id"])
.index("by_asset_and_user", ["audio_asset_id", "user_id"]),
payment_events: defineTable({
provider: v.string(),
provider_event_id: v.string(),
status: v.string(),
payload_hash: v.optional(v.string()),
created_at: v.string(),
updated_at: v.string(),
})
.index("by_provider_event_id", ["provider_event_id"])
.index("by_provider_and_event", ["provider", "provider_event_id"]),
// Legacy compatibility table while runtime is still transitioning away from snapshot persistence.
state_snapshots: defineTable({ state_snapshots: defineTable({
snapshot: v.any(), snapshot: v.any(),
updatedAt: v.string(), updatedAt: v.string(),

View File

@@ -1,5 +1,6 @@
import { mutation, query } from "./_generated/server"; import { mutation, query } from "./_generated/server";
import { v } from "convex/values"; import { v } from "convex/values";
import { syncFromEngineSnapshot } from "./domain";
export const getLatestSnapshot = query({ export const getLatestSnapshot = query({
args: {}, args: {},
@@ -29,17 +30,26 @@ export const saveSnapshot = mutation({
.order("desc") .order("desc")
.first(); .first();
const syncSummary = await syncFromEngineSnapshot(ctx, args.snapshot);
if (latest) { if (latest) {
await ctx.db.patch(latest._id, { await ctx.db.patch(latest._id, {
snapshot: args.snapshot, snapshot: args.snapshot,
updatedAt: args.updatedAt, updatedAt: args.updatedAt,
}); });
return latest._id; return {
snapshotId: latest._id,
syncSummary,
};
} }
return ctx.db.insert("state_snapshots", { const snapshotId = await ctx.db.insert("state_snapshots", {
snapshot: args.snapshot, snapshot: args.snapshot,
updatedAt: args.updatedAt, updatedAt: args.updatedAt,
}); });
return {
snapshotId,
syncSummary,
};
}, },
}); });

View File

@@ -7,8 +7,26 @@ const fs = require("node:fs");
test("convex state functions are present for configured function names", () => { test("convex state functions are present for configured function names", () => {
const schema = fs.readFileSync("convex/schema.ts", "utf8"); const schema = fs.readFileSync("convex/schema.ts", "utf8");
const state = fs.readFileSync("convex/state.ts", "utf8"); const state = fs.readFileSync("convex/state.ts", "utf8");
const domain = fs.readFileSync("convex/domain.ts", "utf8");
assert.match(schema, /users: defineTable/);
assert.match(schema, /wallets: defineTable/);
assert.match(schema, /wallet_transactions: defineTable/);
assert.match(schema, /mention_events: defineTable/);
assert.match(schema, /articles: defineTable/);
assert.match(schema, /audio_jobs: defineTable/);
assert.match(schema, /audio_assets: defineTable/);
assert.match(schema, /audio_access_grants: defineTable/);
assert.match(schema, /payment_events: defineTable/);
assert.match(schema, /state_snapshots/); assert.match(schema, /state_snapshots/);
assert.match(state, /export const getLatestSnapshot = query/); assert.match(state, /export const getLatestSnapshot = query/);
assert.match(state, /export const saveSnapshot = mutation/); assert.match(state, /export const saveSnapshot = mutation/);
assert.match(domain, /export const upsertUser = mutation/);
assert.match(domain, /export const applyWalletTransaction = mutation/);
assert.match(domain, /export const recordMentionEvent = mutation/);
assert.match(domain, /export const upsertArticle = mutation/);
assert.match(domain, /export const createAudioJob = mutation/);
assert.match(domain, /export const createAudioAsset = mutation/);
assert.match(domain, /export const grantAudioAccess = mutation/);
assert.match(domain, /export const recordPaymentEvent = mutation/);
}); });