From 141d7b42a87cc56669403b313809a5279ccc61ae Mon Sep 17 00:00:00 2001 From: Codex Date: Wed, 18 Feb 2026 14:19:21 +0000 Subject: [PATCH] feat: add job lifecycle controls abuse policies and retention operations --- src/app.js | 316 +++++++++++++++++++++++++++++++-- src/config.js | 12 ++ src/lib/engine.js | 201 ++++++++++++++++++++- test/app.test.js | 419 ++++++++++++++++++++++++++++++++++++++++++++ test/config.test.js | 30 +++- test/engine.test.js | 103 ++++++++++- 6 files changed, 1055 insertions(+), 26 deletions(-) diff --git a/src/app.js b/src/app.js index 30966f8..c192f41 100644 --- a/src/app.js +++ b/src/app.js @@ -1,6 +1,7 @@ "use strict"; const { randomUUID } = require("node:crypto"); +const { createHmac } = require("node:crypto"); const fs = require("node:fs/promises"); const pathLib = require("node:path"); const { XArtAudioEngine } = require("./lib/engine"); @@ -22,7 +23,6 @@ const { } = require("./lib/http"); const { FixedWindowRateLimiter } = require("./lib/rate-limit"); const { - XWebhookPayloadSchema, PolarWebhookPayloadSchema, LoginFormSchema, TopUpFormSchema, @@ -71,6 +71,7 @@ function buildApp({ initialState: initialState && initialState.engine ? initialState.engine : null, }); const rateLimits = config.rateLimits || {}; + const abusePolicy = config.abuse || {}; const polar = polarAdapter || createPolarAdapter({ accessToken: config.polarAccessToken, server: config.polarServer, @@ -169,23 +170,47 @@ function buildApp({ function scheduleAudioGeneration(job) { if (!generationService || !generationService.isConfigured()) { + try { + engine.completeJob(job.id); + persistMutation(); + } catch (error) { + logger.error({ err: error, jobId: job.id }, "failed to mark job as completed without generation worker"); + } + return; + } + + try { + engine.startJob(job.id); + persistMutation(); + } catch (error) { + logger.error({ err: error, jobId: job.id }, "failed to start audio generation job"); return; } generationService.enqueueJob({ + jobId: job.id, assetId: job.assetId, text: job.article.content, onCompleted: (audioMeta) => { try { - engine.updateAsset(job.assetId, audioMeta); + engine.completeJob(job.id, audioMeta); persistMutation(); - logger.info({ assetId: job.assetId }, "audio generation completed"); + logger.info({ assetId: job.assetId, jobId: job.id }, "audio generation completed"); } catch (error) { logger.error({ err: error, assetId: job.assetId }, "failed to apply generated audio metadata"); } }, onFailed: (error) => { - logger.error({ err: error, assetId: job.assetId }, "audio generation job failed"); + try { + engine.failJob(job.id, { + error: error && error.message ? error.message : "audio_generation_failed", + refund: true, + }); + persistMutation(); + } catch (failureError) { + logger.error({ err: failureError, jobId: job.id }, "failed to mark generation failure"); + } + logger.error({ err: error, assetId: job.assetId, jobId: job.id }, "audio generation job failed"); }, }).catch((error) => { logger.error({ err: error, assetId: job.assetId }, "audio generation scheduling failed"); @@ -203,7 +228,138 @@ function buildApp({ })); } - function handleXWebhook(headers, rawBody) { + function ensureInternalAuth(headers) { + if (!config.internalApiToken) { + return json(503, { error: "internal_api_disabled" }); + } + + const token = headers["x-internal-token"]; + if (!token || token !== config.internalApiToken) { + return json(401, { error: "invalid_internal_token" }); + } + + return null; + } + + function getAbuseDecision(callerUserId) { + if (!callerUserId) { + return { allowed: true }; + } + + const denyList = Array.isArray(abusePolicy.denyUserIds) ? abusePolicy.denyUserIds : []; + if (denyList.includes(callerUserId)) { + return { allowed: false, code: "user_denied" }; + } + + const nowMs = Date.now(); + const jobs = engine.listJobsForUser(callerUserId); + + const maxJobsPerDay = Number.isInteger(abusePolicy.maxJobsPerUserPerDay) + ? abusePolicy.maxJobsPerUserPerDay + : 0; + if (maxJobsPerDay > 0) { + const oneDayAgo = nowMs - 24 * 60 * 60 * 1000; + const recentCount = jobs.filter((job) => new Date(job.createdAt).getTime() >= oneDayAgo).length; + if (recentCount >= maxJobsPerDay) { + return { allowed: false, code: "daily_limit_exceeded" }; + } + } + + const cooldownSec = Number.isInteger(abusePolicy.cooldownSec) ? abusePolicy.cooldownSec : 0; + if (cooldownSec > 0) { + const latestJob = jobs[0] || null; + if (latestJob) { + const latestMs = new Date(latestJob.createdAt).getTime(); + const elapsedSec = Math.max(0, Math.floor((nowMs - latestMs) / 1000)); + if (elapsedSec < cooldownSec) { + return { + allowed: false, + code: "cooldown_active", + retryAfterSec: cooldownSec - elapsedSec, + }; + } + } + } + + return { allowed: true }; + } + + async function normalizeXWebhookPayload(parsedPayload) { + if (parsedPayload && parsedPayload.mentionPostId && parsedPayload.callerUserId) { + return { + mentionPostId: String(parsedPayload.mentionPostId), + callerUserId: String(parsedPayload.callerUserId), + parentPost: parsedPayload.parentPost, + }; + } + + if (parsedPayload && parsedPayload.mentionTweetId && parsedPayload.callerUserId) { + const mentionPostId = String(parsedPayload.mentionTweetId); + const callerUserId = String(parsedPayload.callerUserId); + const parentPost = parsedPayload.parentPost + || (x.isConfigured() && typeof x.fetchParentPostFromMention === "function" + ? await x.fetchParentPostFromMention(mentionPostId) + : null); + + if (!parentPost) { + throw new Error("parent_post_not_found"); + } + + return { + mentionPostId, + callerUserId, + parentPost, + }; + } + + const events = parsedPayload && Array.isArray(parsedPayload.tweet_create_events) + ? parsedPayload.tweet_create_events + : []; + if (events.length > 0) { + const mention = events[0]; + const mentionPostId = String(mention.id_str || mention.id || ""); + const callerUserId = String( + (mention.user && (mention.user.id_str || mention.user.id)) + || parsedPayload.for_user_id + || "", + ); + if (!mentionPostId || !callerUserId) { + throw new Error("invalid_x_webhook_payload"); + } + + if (!x.isConfigured() || typeof x.fetchParentPostFromMention !== "function") { + throw new Error("x_api_not_configured_for_parent_fetch"); + } + const parentPost = await x.fetchParentPostFromMention(mentionPostId); + if (!parentPost) { + throw new Error("parent_post_not_found"); + } + + return { + mentionPostId, + callerUserId, + parentPost, + }; + } + + throw new Error("invalid_x_webhook_payload"); + } + + async function replyToMentionIfPossible(mentionPostId, message) { + if (!x.isConfigured() || typeof x.replyToMention !== "function") { + return false; + } + + try { + await x.replyToMention({ mentionTweetId: mentionPostId, text: message }); + return true; + } catch (error) { + logger.warn({ err: error, mentionPostId }, "failed replying to x mention"); + return false; + } + } + + async function handleXWebhook(headers, rawBody) { const signature = headers["x-signature"]; const isValid = verifySignature({ payload: rawBody, @@ -215,13 +371,10 @@ function buildApp({ return json(401, { error: "invalid_signature" }); } - const payload = parseOrThrow( - XWebhookPayloadSchema, - parseJSON(rawBody), - "invalid_x_webhook_payload", - ); + const parsedPayload = parseJSON(rawBody); try { + const payload = await normalizeXWebhookPayload(parsedPayload); const result = engine.processMention({ mentionPostId: payload.mentionPostId, callerUserId: payload.callerUserId, @@ -229,14 +382,20 @@ function buildApp({ }); if (!result.ok && result.status === "not_article") { + const replied = await replyToMentionIfPossible(payload.mentionPostId, "This parent post is not an X Article."); return json(200, { status: "not_article", message: "This parent post is not an X Article.", + replied, }); } persistMutation(); scheduleAudioGeneration(result.job); + const replyMessage = result.reply + ? result.reply.message + : `Your audiobook is ready: /audio/${result.job.assetId}`; + const replied = await replyToMentionIfPossible(payload.mentionPostId, replyMessage); return json(200, { status: "completed", @@ -244,6 +403,7 @@ function buildApp({ jobId: result.job.id, creditsCharged: result.job.creditsCharged, publicLink: result.reply ? result.reply.publicLink : `/audio/${result.job.assetId}`, + replied, }); } catch (error) { logger.warn({ err: error }, "x webhook request failed"); @@ -451,6 +611,16 @@ function buildApp({ } try { + const abuseDecision = getAbuseDecision(userId); + if (!abuseDecision.allowed) { + const message = abuseDecision.code === "cooldown_active" + ? `Cooldown active. Retry in ${abuseDecision.retryAfterSec}s` + : abuseDecision.code === "daily_limit_exceeded" + ? "Daily generation limit reached" + : "This user is blocked from generating audio"; + return redirect(withQuery("/app", { flash: message })); + } + const result = engine.processMention({ mentionPostId: `manual:${userId}:${randomUUID()}`, callerUserId: userId, @@ -506,6 +676,9 @@ function buildApp({ const accessDecision = audio ? engine.checkAudioAccess(assetId, userId) : { allowed: false, reason: "not_found" }; + if (audio && accessDecision.allowed) { + engine.recordPlayback(assetId); + } let playbackUrl = null; if (audio && accessDecision.allowed && storage.isConfigured()) { try { @@ -522,9 +695,45 @@ function buildApp({ if (rateLimited) { return rateLimited; } + const payload = parseJSON(rawBody || "{}"); + let callerUserId = payload.callerUserId || null; + if (!callerUserId && Array.isArray(payload.tweet_create_events) && payload.tweet_create_events[0]) { + callerUserId = payload.tweet_create_events[0].user + ? payload.tweet_create_events[0].user.id_str || payload.tweet_create_events[0].user.id + : null; + } + if (!callerUserId && payload.mentionTweetId) { + callerUserId = payload.callerUserId || null; + } + + const abuseDecision = getAbuseDecision(callerUserId); + if (!abuseDecision.allowed) { + const response = { + error: abuseDecision.code, + }; + if (abuseDecision.retryAfterSec) { + response.retryAfterSec = abuseDecision.retryAfterSec; + } + return json(429, response); + } + return handleXWebhook(safeHeaders, rawBody); } + if (method === "GET" && path === "/api/webhooks/x") { + const crcToken = safeQuery.crc_token; + if (!crcToken) { + return json(400, { error: "crc_token_required" }); + } + + const digest = createHmac("sha256", config.xWebhookSecret) + .update(String(crcToken)) + .digest("base64"); + return json(200, { + response_token: `sha256=${digest}`, + }); + } + if (method === "GET" && path === "/api/x/mentions") { if (!x.isConfigured()) { return json(503, { error: "x_api_not_configured" }); @@ -539,6 +748,21 @@ function buildApp({ } } + if (method === "POST" && path === "/internal/retention/run") { + const authResponse = ensureInternalAuth(safeHeaders); + if (authResponse) { + return authResponse; + } + + const payload = parseJSON(rawBody || "{}"); + const summary = engine.applyRetention({ + rawArticleHours: Number.isFinite(payload.rawArticleHours) ? payload.rawArticleHours : 24, + audioDays: Number.isFinite(payload.audioDays) ? payload.audioDays : 90, + }); + persistMutation(); + return json(200, { status: "ok", summary }); + } + if (method === "POST" && path === "/api/webhooks/polar") { const rateLimited = enforceJsonRateLimit(webhookLimiter, `webhook:${clientAddress}`); if (rateLimited) { @@ -616,6 +840,78 @@ function buildApp({ } } + if (method === "DELETE" && path.startsWith("/api/audio/")) { + if (!userId) { + return json(401, { error: "auth_required" }); + } + + const assetId = path.slice("/api/audio/".length); + try { + const deleted = engine.takedownAudio(assetId, userId); + persistMutation(); + return json(200, { status: "deleted", assetId: deleted.id }); + } catch (error) { + const status = error.message === "forbidden" ? 403 : 400; + return json(status, { error: error.message }); + } + } + + if (method === "POST" && path.startsWith("/internal/jobs/") && path.endsWith("/start")) { + const authResponse = ensureInternalAuth(safeHeaders); + if (authResponse) { + return authResponse; + } + + const jobId = path.slice("/internal/jobs/".length, -"/start".length); + try { + const job = engine.startJob(jobId); + persistMutation(); + return json(200, { job }); + } catch (error) { + return json(400, { error: error.message }); + } + } + + if (method === "POST" && path.startsWith("/internal/jobs/") && path.endsWith("/complete")) { + const authResponse = ensureInternalAuth(safeHeaders); + if (authResponse) { + return authResponse; + } + + const jobId = path.slice("/internal/jobs/".length, -"/complete".length); + const payload = parseJSON(rawBody || "{}"); + + try { + const job = engine.completeJob(jobId, payload.asset || {}); + persistMutation(); + return json(200, { job }); + } catch (error) { + return json(400, { error: error.message }); + } + } + + if (method === "POST" && path.startsWith("/internal/jobs/") && path.endsWith("/fail")) { + const authResponse = ensureInternalAuth(safeHeaders); + if (authResponse) { + return authResponse; + } + + const jobId = path.slice("/internal/jobs/".length, -"/fail".length); + const payload = parseJSON(rawBody || "{}"); + const shouldRefund = payload.refund !== false; + + try { + const job = engine.failJob(jobId, { + error: payload.error || "generation_failed", + refund: shouldRefund, + }); + persistMutation(); + return json(200, { job }); + } catch (error) { + return json(400, { error: error.message }); + } + } + return json(404, { error: "not_found" }); } diff --git a/src/config.js b/src/config.js index af9229c..1414738 100644 --- a/src/config.js +++ b/src/config.js @@ -54,6 +54,7 @@ const parsed = { betterAuthSecret: strFromEnv("BETTER_AUTH_SECRET", "dev-better-auth-secret"), betterAuthBasePath: strFromEnv("BETTER_AUTH_BASE_PATH", "/api/auth"), betterAuthDevPassword: strFromEnv("BETTER_AUTH_DEV_PASSWORD", "xartaudio-dev-password"), + internalApiToken: strFromEnv("INTERNAL_API_TOKEN", ""), convexDeploymentUrl: strFromEnv("CONVEX_DEPLOYMENT_URL", ""), convexAuthToken: strFromEnv("CONVEX_AUTH_TOKEN", ""), convexStateQuery: strFromEnv("CONVEX_STATE_QUERY", "state:getLatestSnapshot"), @@ -83,6 +84,11 @@ const parsed = { authPerMinute: intFromEnv("AUTH_RPM", 30), actionPerMinute: intFromEnv("ACTION_RPM", 60), }, + abuse: { + maxJobsPerUserPerDay: intFromEnv("ABUSE_MAX_JOBS_PER_USER_PER_DAY", 0), + cooldownSec: intFromEnv("ABUSE_COOLDOWN_SEC", 0), + denyUserIds: listFromEnv("ABUSE_DENY_USER_IDS", []), + }, credit: { baseCredits: intFromEnv("BASE_CREDITS", 1), includedChars: intFromEnv("INCLUDED_CHARS", 25000), @@ -99,6 +105,7 @@ const ConfigSchema = z.object({ betterAuthSecret: z.string().min(1), betterAuthBasePath: z.string().min(1), betterAuthDevPassword: z.string().min(8), + internalApiToken: z.string(), convexDeploymentUrl: z.string(), convexAuthToken: z.string(), convexStateQuery: z.string().min(1), @@ -128,6 +135,11 @@ const ConfigSchema = z.object({ authPerMinute: z.number().int().positive(), actionPerMinute: z.number().int().positive(), }), + abuse: z.object({ + maxJobsPerUserPerDay: z.number().int().nonnegative(), + cooldownSec: z.number().int().nonnegative(), + denyUserIds: z.array(z.string().min(1)), + }), credit: z.object({ baseCredits: z.number().int().positive(), includedChars: z.number().int().positive(), diff --git a/src/lib/engine.js b/src/lib/engine.js index b722d41..a960ee2 100644 --- a/src/lib/engine.js +++ b/src/lib/engine.js @@ -1,5 +1,6 @@ "use strict"; +const { createHash } = require("node:crypto"); const { WalletStore } = require("./wallet"); const { AudioAccessStore } = require("./access"); const { calculateCredits } = require("./credits"); @@ -92,11 +93,13 @@ class XArtAudioEngine { id: jobId, mentionPostId, callerUserId, - status: "completed", + status: "charged", creditsCharged: creditsNeeded, article: articleResult.article, assetId, createdAt: new Date().toISOString(), + updatedAt: new Date().toISOString(), + error: null, }; this.assets.set(assetId, asset); @@ -110,12 +113,93 @@ class XArtAudioEngine { deduped: false, job, reply: { - message: `Your audiobook is ready: /audio/${assetId}`, + message: `Your audiobook is processing: /audio/${assetId}`, publicLink: `/audio/${assetId}`, }, }; } + startJob(jobId) { + const key = String(jobId); + const job = this.jobs.get(key); + if (!job) { + throw new Error("job_not_found"); + } + + if (job.status === "completed") { + return job; + } + + if (job.status !== "charged" && job.status !== "synthesizing") { + throw new Error("job_cannot_start"); + } + + const next = { + ...job, + status: "synthesizing", + updatedAt: new Date().toISOString(), + }; + this.jobs.set(key, next); + return next; + } + + completeJob(jobId, assetPatch) { + const key = String(jobId); + const job = this.jobs.get(key); + if (!job) { + throw new Error("job_not_found"); + } + + if (job.status === "failed_refunded" || job.status === "failed_not_refunded") { + throw new Error("job_already_failed"); + } + + if (assetPatch && Object.keys(assetPatch).length > 0) { + this.updateAsset(job.assetId, assetPatch); + } + + const next = { + ...job, + status: "completed", + error: null, + updatedAt: new Date().toISOString(), + }; + this.jobs.set(key, next); + return next; + } + + failJob(jobId, { error = "generation_failed", refund = true } = {}) { + const key = String(jobId); + const job = this.jobs.get(key); + if (!job) { + throw new Error("job_not_found"); + } + + if (job.status === "completed") { + throw new Error("job_already_completed"); + } + + const shouldRefund = Boolean(refund); + if (shouldRefund) { + this.wallets.applyTransaction({ + userId: job.callerUserId, + type: "refund", + amount: job.creditsCharged, + reason: "audio_generation_failed_refund", + idempotencyKey: `job:${job.id}:refund`, + }); + } + + const next = { + ...job, + status: shouldRefund ? "failed_refunded" : "failed_not_refunded", + error: error || "generation_failed", + updatedAt: new Date().toISOString(), + }; + this.jobs.set(key, next); + return next; + } + getWalletBalance(userId) { return this.wallets.getBalance(userId); } @@ -147,8 +231,18 @@ class XArtAudioEngine { }; } - getAsset(assetId) { - return this.assets.get(String(assetId)) || null; + getAsset(assetId, options = {}) { + const includeDeleted = Boolean(options.includeDeleted); + const asset = this.assets.get(String(assetId)) || null; + if (!asset) { + return null; + } + + if (asset.deletedAt && !includeDeleted) { + return null; + } + + return asset; } updateAsset(assetId, patch) { @@ -186,6 +280,105 @@ class XArtAudioEngine { }); } + takedownAudio(assetId, requestedByUserId) { + const key = String(assetId); + const current = this.assets.get(key); + if (!current) { + throw new Error("audio_not_found"); + } + + if (current.ownerUserId !== requestedByUserId) { + throw new Error("forbidden"); + } + + const next = { + ...current, + deletedAt: new Date().toISOString(), + storageKey: null, + durationSec: 0, + sizeBytes: 0, + }; + this.assets.set(key, next); + return next; + } + + recordPlayback(assetId) { + const key = String(assetId); + const current = this.assets.get(key); + if (!current || current.deletedAt) { + return null; + } + + const next = { + ...current, + lastPlayedAt: new Date().toISOString(), + }; + this.assets.set(key, next); + return next; + } + + applyRetention({ + rawArticleHours = 24, + audioDays = 90, + now = new Date(), + } = {}) { + const nowMs = now instanceof Date ? now.getTime() : new Date(now).getTime(); + const rawArticleCutoffMs = nowMs - (Math.max(0, rawArticleHours) * 60 * 60 * 1000); + const audioCutoffMs = nowMs - (Math.max(0, audioDays) * 24 * 60 * 60 * 1000); + + let prunedArticleBodies = 0; + let deletedAssets = 0; + + for (const [jobId, job] of this.jobs.entries()) { + const createdAtMs = new Date(job.createdAt).getTime(); + const hasContent = Boolean(job.article && typeof job.article.content === "string" && job.article.content.length > 0); + if (!hasContent || Number.isNaN(createdAtMs) || createdAtMs > rawArticleCutoffMs) { + continue; + } + + const contentHash = createHash("sha256") + .update(job.article.content, "utf8") + .digest("hex"); + const nextJob = { + ...job, + article: { + ...job.article, + content: "", + contentHash, + }, + updatedAt: new Date().toISOString(), + }; + this.jobs.set(jobId, nextJob); + prunedArticleBodies += 1; + } + + for (const [assetId, asset] of this.assets.entries()) { + if (asset.deletedAt) { + continue; + } + + const referenceMs = new Date(asset.lastPlayedAt || asset.createdAt).getTime(); + if (Number.isNaN(referenceMs) || referenceMs > audioCutoffMs) { + continue; + } + + const nextAsset = { + ...asset, + deletedAt: new Date().toISOString(), + storageKey: null, + durationSec: 0, + sizeBytes: 0, + }; + this.assets.set(assetId, nextAsset); + deletedAssets += 1; + } + + return { + prunedArticleBodies, + deletedAssets, + }; + } + exportState() { return { wallets: this.wallets.exportState(), diff --git a/test/app.test.js b/test/app.test.js index b3aae7b..4072f22 100644 --- a/test/app.test.js +++ b/test/app.test.js @@ -18,6 +18,7 @@ function createApp(options = {}) { betterAuthSecret: "test-better-auth-secret", betterAuthBasePath: "/api/auth", betterAuthDevPassword: "xartaudio-dev-password", + internalApiToken: "", convexDeploymentUrl: "", convexAuthToken: "", convexStateQuery: "state:getLatestSnapshot", @@ -40,6 +41,11 @@ function createApp(options = {}) { authPerMinute: 30, actionPerMinute: 60, }, + abuse: { + maxJobsPerUserPerDay: 0, + cooldownSec: 0, + denyUserIds: [], + }, credit: { baseCredits: 1, includedChars: 25000, @@ -57,6 +63,10 @@ function createApp(options = {}) { ...baseConfig.rateLimits, ...(overrideConfig.rateLimits || {}), }, + abuse: { + ...baseConfig.abuse, + ...(overrideConfig.abuse || {}), + }, credit: { ...baseConfig.credit, ...(overrideConfig.credit || {}), @@ -218,6 +228,46 @@ test("audio flow requires auth for unlock and supports permanent unlock", async assert.equal(walletData.balance, 4); }); +test("owner can delete audio while non owner is forbidden", async () => { + const app = createApp(); + await call(app, { + method: "POST", + path: "/app/actions/topup", + headers: { cookie: "xartaudio_user=owner-delete" }, + body: "amount=5", + }); + + const generated = await call(app, { + method: "POST", + path: "/app/actions/simulate-mention", + headers: { cookie: "xartaudio_user=owner-delete" }, + body: "title=Delete+Me&body=Body", + }); + const assetId = generated.headers.location.split("?")[0].replace("/audio/", ""); + + const forbidden = await call(app, { + method: "DELETE", + path: `/api/audio/${assetId}`, + headers: { cookie: "xartaudio_user=someone-else" }, + }); + assert.equal(forbidden.status, 403); + + const deleted = await call(app, { + method: "DELETE", + path: `/api/audio/${assetId}`, + headers: { cookie: "xartaudio_user=owner-delete" }, + }); + assert.equal(deleted.status, 200); + assert.equal(JSON.parse(deleted.body).status, "deleted"); + + const pageAfterDelete = await call(app, { + method: "GET", + path: `/audio/${assetId}`, + headers: { cookie: "xartaudio_user=owner-delete" }, + }); + assert.match(pageAfterDelete.body, /Audio not found/); +}); + test("audio page uses signed storage URL when storage adapter is configured", async () => { const app = createApp({ storageAdapter: { @@ -312,6 +362,166 @@ test("simulate mention schedules background audio generation when service is con assert.equal(queued[0].text, "hello world"); }); +test("failed background generation refunds charged credits", async () => { + const app = createApp({ + audioGenerationService: { + isConfigured() { + return true; + }, + async enqueueJob(payload) { + payload.onFailed(new Error("tts_outage")); + }, + }, + }); + + await call(app, { + method: "POST", + path: "/app/actions/topup", + headers: { cookie: "xartaudio_user=alice" }, + body: "amount=3", + }); + + await call(app, { + method: "POST", + path: "/app/actions/simulate-mention", + headers: { cookie: "xartaudio_user=alice" }, + body: "title=T&body=hello+world", + }); + + const wallet = await call(app, { + method: "GET", + path: "/api/me/wallet", + headers: { cookie: "xartaudio_user=alice" }, + }); + assert.equal(JSON.parse(wallet.body).balance, 3); +}); + +test("internal worker endpoints require token and can complete jobs", async () => { + const queued = []; + const app = createApp({ + config: { + internalApiToken: "internal-token", + }, + audioGenerationService: { + isConfigured() { + return true; + }, + async enqueueJob(payload) { + queued.push(payload); + }, + }, + }); + + await call(app, { + method: "POST", + path: "/app/actions/topup", + headers: { cookie: "xartaudio_user=alice" }, + body: "amount=2", + }); + await call(app, { + method: "POST", + path: "/app/actions/simulate-mention", + headers: { cookie: "xartaudio_user=alice" }, + body: "title=Queued&body=hello", + }); + + const job = app.engine.listJobsForUser("alice")[0]; + assert.equal(job.status, "synthesizing"); + + const denied = await call(app, { + method: "POST", + path: `/internal/jobs/${job.id}/complete`, + body: JSON.stringify({}), + }); + assert.equal(denied.status, 401); + + const completed = await call(app, { + method: "POST", + path: `/internal/jobs/${job.id}/complete`, + headers: { "x-internal-token": "internal-token" }, + body: JSON.stringify({ + asset: { + storageKey: "audio/worker.mp3", + sizeBytes: 999, + }, + }), + }); + + assert.equal(completed.status, 200); + const completedBody = JSON.parse(completed.body); + assert.equal(completedBody.job.status, "completed"); + + const readJob = await call(app, { + method: "GET", + path: `/api/jobs/${job.id}`, + headers: { cookie: "xartaudio_user=alice" }, + }); + assert.equal(readJob.status, 200); + assert.equal(JSON.parse(readJob.body).job.status, "completed"); +}); + +test("internal retention endpoint prunes stale content and assets", async () => { + const app = createApp({ + config: { + internalApiToken: "internal-token", + }, + }); + + await call(app, { + method: "POST", + path: "/app/actions/topup", + headers: { cookie: "xartaudio_user=retention-owner" }, + body: "amount=2", + }); + await call(app, { + method: "POST", + path: "/app/actions/simulate-mention", + headers: { cookie: "xartaudio_user=retention-owner" }, + body: "title=Retention&body=Body", + }); + + const job = app.engine.listJobsForUser("retention-owner")[0]; + const asset = app.engine.getAsset(job.assetId, { includeDeleted: true }); + job.createdAt = "2020-01-01T00:00:00.000Z"; + asset.createdAt = "2020-01-01T00:00:00.000Z"; + + const denied = await call(app, { + method: "POST", + path: "/internal/retention/run", + body: JSON.stringify({ rawArticleHours: 1, audioDays: 1 }), + }); + assert.equal(denied.status, 401); + + const run = await call(app, { + method: "POST", + path: "/internal/retention/run", + headers: { "x-internal-token": "internal-token" }, + body: JSON.stringify({ rawArticleHours: 1, audioDays: 1 }), + }); + assert.equal(run.status, 200); + const summary = JSON.parse(run.body).summary; + assert.equal(summary.prunedArticleBodies >= 1, true); + assert.equal(summary.deletedAssets >= 1, true); + + const page = await call(app, { + method: "GET", + path: `/audio/${job.assetId}`, + headers: { cookie: "xartaudio_user=retention-owner" }, + }); + assert.match(page.body, /Audio not found/); +}); + +test("internal endpoints are disabled when no token configured", async () => { + const app = createApp(); + const response = await call(app, { + method: "POST", + path: "/internal/retention/run", + body: "{}", + }); + assert.equal(response.status, 503); + assert.equal(JSON.parse(response.body).error, "internal_api_disabled"); +}); + test("/api/payments/create-checkout returns 503 when Polar is not configured", async () => { const app = createApp(); @@ -391,6 +601,125 @@ test("X webhook valid flow processes article", async () => { assert.equal(body.creditsCharged, 1); }); +test("X webhook supports CRC challenge response", async () => { + const app = createApp(); + const response = await call(app, { + method: "GET", + path: "/api/webhooks/x", + query: { crc_token: "token-123" }, + }); + + assert.equal(response.status, 200); + const body = JSON.parse(response.body); + assert.match(body.response_token, /^sha256=/); +}); + +test("X webhook can normalize mentionTweetId payload and reply via adapter", async () => { + const replies = []; + const app = createApp({ + xAdapter: { + isConfigured() { + return true; + }, + async listMentions() { + return []; + }, + async fetchParentPostFromMention() { + return { + id: "parent-1", + authorId: "author-1", + article: { id: "article-1", title: "From X", body: "Article body" }, + }; + }, + async replyToMention(payload) { + replies.push(payload); + }, + }, + }); + + await postJSONWebhook(app, "/api/webhooks/polar", { userId: "u10", credits: 5, eventId: "evt10" }, "polar-secret"); + const response = await postJSONWebhook(app, "/api/webhooks/x", { + mentionTweetId: "mention-10", + callerUserId: "u10", + }, "x-secret"); + + assert.equal(response.status, 200); + const body = JSON.parse(response.body); + assert.equal(body.status, "completed"); + assert.equal(body.replied, true); + assert.equal(replies.length, 1); + assert.equal(replies[0].mentionTweetId, "mention-10"); + assert.match(replies[0].text, /audiobook/i); +}); + +test("X webhook replies with not article message when parent is not article", async () => { + const replies = []; + const app = createApp({ + xAdapter: { + isConfigured() { + return true; + }, + async listMentions() { + return []; + }, + async fetchParentPostFromMention() { + return { id: "parent-2", text: "not article" }; + }, + async replyToMention(payload) { + replies.push(payload); + }, + }, + }); + + await postJSONWebhook(app, "/api/webhooks/polar", { userId: "u11", credits: 5, eventId: "evt11" }, "polar-secret"); + const response = await postJSONWebhook(app, "/api/webhooks/x", { + mentionTweetId: "mention-11", + callerUserId: "u11", + }, "x-secret"); + + assert.equal(response.status, 200); + const body = JSON.parse(response.body); + assert.equal(body.status, "not_article"); + assert.equal(body.replied, true); + assert.equal(replies.length, 1); + assert.match(replies[0].text, /not an X Article/); +}); + +test("X webhook can normalize tweet_create_events payload", async () => { + const app = createApp({ + xAdapter: { + isConfigured() { + return true; + }, + async listMentions() { + return []; + }, + async fetchParentPostFromMention(mentionTweetId) { + assert.equal(mentionTweetId, "mention-evt-1"); + return { + id: "parent-evt-1", + authorId: "author-evt-1", + article: { id: "article-evt-1", title: "Evt", body: "Body" }, + }; + }, + async replyToMention() {}, + }, + }); + + await postJSONWebhook(app, "/api/webhooks/polar", { userId: "u-evt", credits: 4, eventId: "evt-seed" }, "polar-secret"); + const response = await postJSONWebhook(app, "/api/webhooks/x", { + tweet_create_events: [ + { + id_str: "mention-evt-1", + user: { id_str: "u-evt" }, + }, + ], + }, "x-secret"); + + assert.equal(response.status, 200); + assert.equal(JSON.parse(response.body).status, "completed"); +}); + test("Polar webhook uses adapter parsing for standard webhook headers", async () => { const app = createApp({ polarAdapter: { @@ -524,6 +853,96 @@ test("rate limits repeated webhook calls", async () => { assert.equal(second.status, 429); }); +test("anti abuse deny list blocks webhook generation", async () => { + const app = createApp({ + config: { + abuse: { + denyUserIds: ["blocked-user"], + }, + }, + }); + + const response = await postJSONWebhook(app, "/api/webhooks/x", { + mentionPostId: "m-deny", + callerUserId: "blocked-user", + parentPost: { + id: "p-deny", + article: { id: "a-deny", title: "T", body: "hello" }, + }, + }, "x-secret"); + + assert.equal(response.status, 429); + assert.equal(JSON.parse(response.body).error, "user_denied"); +}); + +test("anti abuse daily limit blocks second generated job", async () => { + const app = createApp({ + config: { + abuse: { + maxJobsPerUserPerDay: 1, + }, + }, + }); + + await postJSONWebhook(app, "/api/webhooks/polar", { userId: "u-limit", credits: 4, eventId: "evt-limit" }, "polar-secret"); + const first = await postJSONWebhook(app, "/api/webhooks/x", { + mentionPostId: "m-limit-1", + callerUserId: "u-limit", + parentPost: { + id: "p-limit-1", + article: { id: "a-limit-1", title: "T1", body: "hello" }, + }, + }, "x-secret"); + + const second = await postJSONWebhook(app, "/api/webhooks/x", { + mentionPostId: "m-limit-2", + callerUserId: "u-limit", + parentPost: { + id: "p-limit-2", + article: { id: "a-limit-2", title: "T2", body: "hello" }, + }, + }, "x-secret"); + + assert.equal(first.status, 200); + assert.equal(second.status, 429); + assert.equal(JSON.parse(second.body).error, "daily_limit_exceeded"); +}); + +test("anti abuse cooldown reports retry delay", async () => { + const app = createApp({ + config: { + abuse: { + cooldownSec: 60, + }, + }, + }); + + await postJSONWebhook(app, "/api/webhooks/polar", { userId: "u-cool", credits: 4, eventId: "evt-cool" }, "polar-secret"); + const first = await postJSONWebhook(app, "/api/webhooks/x", { + mentionPostId: "m-cool-1", + callerUserId: "u-cool", + parentPost: { + id: "p-cool-1", + article: { id: "a-cool-1", title: "T1", body: "hello" }, + }, + }, "x-secret"); + + const second = await postJSONWebhook(app, "/api/webhooks/x", { + mentionPostId: "m-cool-2", + callerUserId: "u-cool", + parentPost: { + id: "p-cool-2", + article: { id: "a-cool-2", title: "T2", body: "hello" }, + }, + }, "x-secret"); + + assert.equal(first.status, 200); + assert.equal(second.status, 429); + const body = JSON.parse(second.body); + assert.equal(body.error, "cooldown_active"); + assert.equal(typeof body.retryAfterSec, "number"); +}); + test("rate limits repeated login attempts from same IP", async () => { const app = createApp({ config: { diff --git a/test/config.test.js b/test/config.test.js index 1d09a79..71d54b6 100644 --- a/test/config.test.js +++ b/test/config.test.js @@ -31,25 +31,29 @@ function withTempEnv(patch, run) { test("config uses defaults when env is missing", () => { withTempEnv({ - PORT: undefined, - LOG_LEVEL: undefined, - APP_BASE_URL: undefined, - BETTER_AUTH_SECRET: undefined, - BETTER_AUTH_BASE_PATH: undefined, - QWEN_TTS_MODEL: undefined, - MINIO_SIGNED_URL_TTL_SEC: undefined, - MINIO_USE_SSL: undefined, - WEBHOOK_RPM: undefined, + PORT: "", + LOG_LEVEL: "", + APP_BASE_URL: "", + BETTER_AUTH_SECRET: "", + BETTER_AUTH_BASE_PATH: "", + QWEN_TTS_MODEL: "", + MINIO_SIGNED_URL_TTL_SEC: "", + MINIO_USE_SSL: "", + WEBHOOK_RPM: "", }, () => { const { config } = require("../src/config"); assert.equal(config.port, 3000); assert.equal(config.logLevel, "info"); assert.equal(config.appBaseUrl, "http://localhost:3000"); assert.equal(config.betterAuthBasePath, "/api/auth"); + assert.equal(config.internalApiToken, ""); assert.equal(config.qwenTtsModel, "qwen-tts-latest"); assert.equal(config.minioSignedUrlTtlSec, 3600); assert.equal(config.minioUseSSL, true); assert.equal(config.rateLimits.webhookPerMinute, 120); + assert.equal(config.abuse.maxJobsPerUserPerDay, 0); + assert.equal(config.abuse.cooldownSec, 0); + assert.deepEqual(config.abuse.denyUserIds, []); }); }); @@ -61,6 +65,7 @@ test("config reads convex/qwen/minio overrides", () => { BETTER_AUTH_SECRET: "prod-secret", BETTER_AUTH_BASE_PATH: "/api/auth", BETTER_AUTH_DEV_PASSWORD: "xartaudio-dev-password", + INTERNAL_API_TOKEN: "internal-token", CONVEX_DEPLOYMENT_URL: "https://example.convex.cloud", CONVEX_AUTH_TOKEN: "convex-token", CONVEX_STATE_QUERY: "state:get", @@ -72,12 +77,16 @@ test("config reads convex/qwen/minio overrides", () => { MINIO_BUCKET: "audio", MINIO_SIGNED_URL_TTL_SEC: "7200", WEBHOOK_RPM: "77", + ABUSE_MAX_JOBS_PER_USER_PER_DAY: "5", + ABUSE_COOLDOWN_SEC: "120", + ABUSE_DENY_USER_IDS: "u1,u2", }, () => { const { config } = require("../src/config"); assert.equal(config.port, 8080); assert.equal(config.logLevel, "debug"); assert.equal(config.appBaseUrl, "https://xartaudio.app"); assert.equal(config.betterAuthSecret, "prod-secret"); + assert.equal(config.internalApiToken, "internal-token"); assert.equal(config.convexDeploymentUrl, "https://example.convex.cloud"); assert.equal(config.convexAuthToken, "convex-token"); assert.equal(config.convexStateQuery, "state:get"); @@ -89,5 +98,8 @@ test("config reads convex/qwen/minio overrides", () => { assert.equal(config.minioBucket, "audio"); assert.equal(config.minioSignedUrlTtlSec, 7200); assert.equal(config.rateLimits.webhookPerMinute, 77); + assert.equal(config.abuse.maxJobsPerUserPerDay, 5); + assert.equal(config.abuse.cooldownSec, 120); + assert.deepEqual(config.abuse.denyUserIds, ["u1", "u2"]); }); }); diff --git a/test/engine.test.js b/test/engine.test.js index 294b43e..38664ba 100644 --- a/test/engine.test.js +++ b/test/engine.test.js @@ -31,7 +31,7 @@ test("returns not_article and does not charge caller", () => { assert.equal(engine.getWalletBalance("u1"), 5); }); -test("charges credits and creates completed job for valid article", () => { +test("charges credits and creates charged job for valid article", () => { const engine = createEngine(); engine.topUpCredits("u1", 5, "topup-2"); @@ -50,7 +50,7 @@ test("charges credits and creates completed job for valid article", () => { }); assert.equal(result.ok, true); - assert.equal(result.job.status, "completed"); + assert.equal(result.job.status, "charged"); assert.equal(result.job.creditsCharged, 1); assert.equal(engine.getWalletBalance("u1"), 4); @@ -150,11 +150,60 @@ test("lists jobs for user newest first and provides summary", () => { const summary = engine.getUserSummary("u1"); assert.equal(summary.totalJobs, 2); - assert.equal(summary.completedJobs, 2); + assert.equal(summary.completedJobs, 0); assert.equal(summary.totalCreditsSpent, 2); assert.equal(summary.balance, 8); }); +test("job can transition through start and completion states", () => { + const engine = createEngine(); + engine.topUpCredits("u1", 5, "topup-transition"); + + const created = engine.processMention({ + mentionPostId: "m-transition", + callerUserId: "u1", + parentPost: { + id: "p-transition", + article: { id: "a-transition", title: "T", body: "hello world" }, + }, + }); + + const started = engine.startJob(created.job.id); + assert.equal(started.status, "synthesizing"); + + const completed = engine.completeJob(created.job.id, { + storageKey: "audio/final.mp3", + sizeBytes: 42, + }); + assert.equal(completed.status, "completed"); + assert.equal(engine.getAsset(created.job.assetId).storageKey, "audio/final.mp3"); + assert.equal(engine.getAsset(created.job.assetId).sizeBytes, 42); +}); + +test("failed generation can refund caller credits once", () => { + const engine = createEngine(); + engine.topUpCredits("u1", 5, "topup-fail-refund"); + + const created = engine.processMention({ + mentionPostId: "m-fail-refund", + callerUserId: "u1", + parentPost: { + id: "p-fail-refund", + article: { id: "a-fail-refund", title: "T", body: "hello world" }, + }, + }); + + assert.equal(engine.getWalletBalance("u1"), 4); + engine.startJob(created.job.id); + const failed = engine.failJob(created.job.id, { error: "tts_down", refund: true }); + assert.equal(failed.status, "failed_refunded"); + assert.equal(engine.getWalletBalance("u1"), 5); + + const second = engine.failJob(created.job.id, { error: "tts_down", refund: true }); + assert.equal(second.status, "failed_refunded"); + assert.equal(engine.getWalletBalance("u1"), 5); +}); + test("round-trips state snapshot across engine restart", () => { const engine1 = createEngine(); engine1.topUpCredits("u1", 5, "topup-snapshot"); @@ -202,3 +251,51 @@ test("updateAsset patches stored asset metadata", () => { assert.equal(updated.storageKey, "audio/real-file.mp3"); assert.equal(updated.sizeBytes, 12345); }); + +test("owner can takedown audio and hide it from access checks", () => { + const engine = createEngine(); + engine.topUpCredits("owner", 5, "topup-takedown"); + const created = engine.processMention({ + mentionPostId: "m-takedown", + callerUserId: "owner", + parentPost: { + id: "p-takedown", + article: { id: "a-takedown", title: "T", body: "hello" }, + }, + }); + + engine.takedownAudio(created.job.assetId, "owner"); + assert.equal(engine.getAsset(created.job.assetId), null); + assert.equal(engine.getAsset(created.job.assetId, { includeDeleted: true }).deletedAt !== null, true); + assert.equal(engine.checkAudioAccess(created.job.assetId, "owner").reason, "not_found"); +}); + +test("retention prunes old article content and deletes stale assets", () => { + const engine = createEngine(); + engine.topUpCredits("owner", 5, "topup-retention"); + const created = engine.processMention({ + mentionPostId: "m-retention", + callerUserId: "owner", + parentPost: { + id: "p-retention", + article: { id: "a-retention", title: "T", body: "hello retention" }, + }, + }); + + const job = engine.getJob(created.job.id); + const asset = engine.getAsset(created.job.assetId, { includeDeleted: true }); + job.createdAt = "2020-01-01T00:00:00.000Z"; + asset.createdAt = "2020-01-01T00:00:00.000Z"; + + const summary = engine.applyRetention({ + rawArticleHours: 1, + audioDays: 1, + now: new Date("2020-01-03T00:00:00.000Z"), + }); + + assert.equal(summary.prunedArticleBodies, 1); + assert.equal(summary.deletedAssets, 1); + assert.equal(engine.getJob(created.job.id).article.content, ""); + assert.equal(typeof engine.getJob(created.job.id).article.contentHash, "string"); + assert.equal(engine.getAsset(created.job.assetId), null); +});