diff --git a/src/app.js b/src/app.js index 3e13ac7..935d965 100644 --- a/src/app.js +++ b/src/app.js @@ -32,6 +32,11 @@ const { SimulateMentionFormSchema, parseOrThrow, } = require("./lib/validation"); +const { createPolarAdapter, hasStandardWebhookHeaders } = require("./integrations/polar"); +const { createTTSAdapter } = require("./integrations/tts-client"); +const { createStorageAdapter } = require("./integrations/storage-client"); +const { createXAdapter } = require("./integrations/x-client"); +const { createAudioGenerationService } = require("./services/audio-generation"); function sanitizeReturnTo(value, fallback = "/app") { if (!value || typeof value !== "string") { @@ -54,12 +59,46 @@ function buildApp({ initialState = null, onMutation = null, logger = console, + polarAdapter = null, + xAdapter = null, + ttsAdapter = null, + storageAdapter = null, + audioGenerationService = null, }) { const engine = new XArtAudioEngine({ creditConfig: config.credit, initialState: initialState && initialState.engine ? initialState.engine : null, }); const rateLimits = config.rateLimits || {}; + const polar = polarAdapter || createPolarAdapter({ + accessToken: config.polarAccessToken, + server: config.polarServer, + productIds: config.polarProductIds, + webhookSecret: config.polarWebhookSecret, + }); + const x = xAdapter || createXAdapter({ + bearerToken: config.xBearerToken, + botUserId: config.xBotUserId, + }); + const tts = ttsAdapter || createTTSAdapter({ + apiKey: config.ttsApiKey, + baseURL: config.ttsBaseUrl || undefined, + model: config.ttsModel, + voice: config.ttsVoice, + }); + const storage = storageAdapter || createStorageAdapter({ + bucket: config.s3Bucket, + region: config.s3Region, + endpoint: config.s3Endpoint || undefined, + accessKeyId: config.s3AccessKeyId, + secretAccessKey: config.s3SecretAccessKey, + signedUrlTtlSec: config.s3SignedUrlTtlSec, + }); + const generationService = audioGenerationService || createAudioGenerationService({ + tts, + storage, + logger, + }); const webhookLimiter = new FixedWindowRateLimiter({ limit: rateLimits.webhookPerMinute || 120, windowMs: 60_000, @@ -117,6 +156,31 @@ function buildApp({ })); } + function scheduleAudioGeneration(job) { + if (!generationService || !generationService.isConfigured()) { + return; + } + + generationService.enqueueJob({ + assetId: job.assetId, + text: job.article.content, + onCompleted: (audioMeta) => { + try { + engine.updateAsset(job.assetId, audioMeta); + persistMutation(); + logger.info({ assetId: job.assetId }, "audio generation completed"); + } catch (error) { + logger.error({ err: error, assetId: job.assetId }, "failed to apply generated audio metadata"); + } + }, + onFailed: (error) => { + logger.error({ err: error, assetId: job.assetId }, "audio generation job failed"); + }, + }).catch((error) => { + logger.error({ err: error, assetId: job.assetId }, "audio generation scheduling failed"); + }); + } + function ensureAuth(userId, returnTo) { if (userId) { return null; @@ -161,6 +225,7 @@ function buildApp({ } persistMutation(); + scheduleAudioGeneration(result.job); return json(200, { status: "completed", @@ -176,24 +241,32 @@ function buildApp({ } function handlePolarWebhook(headers, rawBody) { - const signature = headers["x-signature"]; - const isValid = verifySignature({ - payload: rawBody, - secret: config.polarWebhookSecret, - signature, - }); - - if (!isValid) { - return json(401, { error: "invalid_signature" }); - } - - const payload = parseOrThrow( - PolarWebhookPayloadSchema, - parseJSON(rawBody), - "invalid_polar_webhook_payload", - ); - try { + let payload; + + if (hasStandardWebhookHeaders(headers)) { + payload = polar.extractTopUp(polar.parseWebhookEvent(rawBody, headers)); + } else { + const signature = headers["x-signature"]; + const isValid = verifySignature({ + payload: rawBody, + secret: config.polarWebhookSecret, + signature, + }); + + if (!isValid) { + return json(401, { error: "invalid_signature" }); + } + + payload = polar.extractTopUp( + parseOrThrow( + PolarWebhookPayloadSchema, + parseJSON(rawBody), + "invalid_polar_webhook_payload", + ), + ); + } + engine.topUpCredits(payload.userId, payload.credits, `polar:${payload.eventId}`); persistMutation(); return json(200, { status: "credited" }); @@ -203,7 +276,7 @@ function buildApp({ } } - function handleRequest({ method, path, headers, rawBody, query }) { + async function handleRequest({ method, path, headers, rawBody, query }) { const safeHeaders = headers || {}; const safeQuery = query || {}; const userId = getAuthenticatedUserId(safeHeaders); @@ -360,6 +433,7 @@ function buildApp({ } persistMutation(); + scheduleAudioGeneration(result.job); return redirect(withQuery(`/audio/${result.job.assetId}`, { flash: "Audiobook generated", })); @@ -395,7 +469,15 @@ function buildApp({ const accessDecision = audio ? engine.checkAudioAccess(assetId, userId) : { allowed: false, reason: "not_found" }; - return html(200, renderAudioPage({ audio, accessDecision, userId })); + let playbackUrl = null; + if (audio && accessDecision.allowed && storage.isConfigured()) { + try { + playbackUrl = await storage.getSignedDownloadUrl(audio.storageKey); + } catch (error) { + logger.warn({ err: error, assetId }, "failed to create signed playback url"); + } + } + return html(200, renderAudioPage({ audio, accessDecision, userId, playbackUrl })); } if (method === "POST" && path === "/api/webhooks/x") { @@ -406,6 +488,20 @@ function buildApp({ return handleXWebhook(safeHeaders, rawBody); } + if (method === "GET" && path === "/api/x/mentions") { + if (!x.isConfigured()) { + return json(503, { error: "x_api_not_configured" }); + } + + try { + const mentions = await x.listMentions({ sinceId: safeQuery.sinceId || undefined }); + return json(200, { mentions }); + } catch (error) { + logger.warn({ err: error }, "failed to fetch mentions from x api"); + return json(400, { error: error.message }); + } + } + if (method === "POST" && path === "/api/webhooks/polar") { const rateLimited = enforceJsonRateLimit(webhookLimiter, `webhook:${clientAddress}`); if (rateLimited) { @@ -414,6 +510,33 @@ function buildApp({ return handlePolarWebhook(safeHeaders, rawBody); } + if (method === "POST" && path === "/api/payments/create-checkout") { + if (!userId) { + return json(401, { error: "auth_required" }); + } + + if (!polar.isConfigured()) { + return json(503, { error: "polar_checkout_not_configured" }); + } + + try { + const checkout = await polar.createCheckoutSession({ + userId, + successUrl: `${config.appBaseUrl}/app?flash=Payment%20confirmed`, + returnUrl: `${config.appBaseUrl}/app`, + metadata: { + xartaudio_user_id: userId, + xartaudio_credits: "50", + }, + }); + + return json(200, { checkoutUrl: checkout.url, checkoutId: checkout.id }); + } catch (error) { + logger.warn({ err: error }, "failed to create polar checkout session"); + return json(400, { error: error.message }); + } + } + if (method === "GET" && path === "/api/me/wallet") { if (!userId) { return json(401, { error: "auth_required" }); diff --git a/src/lib/engine.js b/src/lib/engine.js index 13b13dc..b722d41 100644 --- a/src/lib/engine.js +++ b/src/lib/engine.js @@ -151,6 +151,21 @@ class XArtAudioEngine { return this.assets.get(String(assetId)) || null; } + updateAsset(assetId, patch) { + const key = String(assetId); + const current = this.assets.get(key); + if (!current) { + throw new Error("asset_not_found"); + } + + const next = { + ...current, + ...(patch || {}), + }; + this.assets.set(key, next); + return next; + } + checkAudioAccess(assetId, userId) { const audio = this.getAsset(assetId); if (!audio) { diff --git a/src/server.js b/src/server.js index a0a6a3a..323f556 100644 --- a/src/server.js +++ b/src/server.js @@ -46,7 +46,7 @@ function createHttpServer({ app }) { return http.createServer(async (req, res) => { try { const rawBody = await readBody(req); - const response = app.handleRequest(mapToAppRequest({ req, rawBody })); + const response = await app.handleRequest(mapToAppRequest({ req, rawBody })); res.statusCode = response.status; for (const [key, value] of Object.entries(response.headers || {})) { diff --git a/src/views/pages.js b/src/views/pages.js index eb6a7af..a8bf737 100644 --- a/src/views/pages.js +++ b/src/views/pages.js @@ -189,7 +189,7 @@ function renderAppPage({ userId, summary, jobs, flash = null }) { }); } -function renderAudioPage({ audio, accessDecision, userId }) { +function renderAudioPage({ audio, accessDecision, userId, playbackUrl = null }) { if (!audio) { return shell({ title: "Audio not found", @@ -218,7 +218,9 @@ function renderAudioPage({ audio, accessDecision, userId }) {

${escapeHtml(audio.articleTitle)}

Duration ~ ${audio.durationSec}s • Asset ${escapeHtml(audio.id)}
${action} - ${accessDecision.allowed ? `
stream://${escapeHtml(audio.storageKey)}
` : ""} + ${accessDecision.allowed + ? `
${escapeHtml(playbackUrl || `stream://${audio.storageKey}`)}
` + : ""} `, diff --git a/test/app.test.js b/test/app.test.js index ee3361b..ae3f9ee 100644 --- a/test/app.test.js +++ b/test/app.test.js @@ -9,6 +9,22 @@ function createApp(options = {}) { const baseConfig = { xWebhookSecret: "x-secret", polarWebhookSecret: "polar-secret", + xBearerToken: "", + xBotUserId: "", + polarAccessToken: "", + polarServer: "production", + polarProductIds: [], + appBaseUrl: "http://localhost:3000", + ttsApiKey: "", + ttsBaseUrl: "", + ttsModel: "gpt-4o-mini-tts", + ttsVoice: "alloy", + s3Bucket: "", + s3Region: "", + s3Endpoint: "", + s3AccessKeyId: "", + s3SecretAccessKey: "", + s3SignedUrlTtlSec: 3600, rateLimits: { webhookPerMinute: 120, authPerMinute: 30, @@ -46,7 +62,7 @@ function createApp(options = {}) { }); } -function call(app, { method, path, headers = {}, body = "", query = {} }) { +async function call(app, { method, path, headers = {}, body = "", query = {} }) { return app.handleRequest({ method, path, @@ -56,36 +72,36 @@ function call(app, { method, path, headers = {}, body = "", query = {} }) { }); } -function postJSONWebhook(app, path, payload, secret) { +async function postJSONWebhook(app, path, payload, secret, extraHeaders) { const rawBody = JSON.stringify(payload); const sig = hmacSHA256Hex(rawBody, secret); return call(app, { method: "POST", path, - headers: { "x-signature": `sha256=${sig}` }, + headers: { "x-signature": `sha256=${sig}`, ...(extraHeaders || {}) }, body: rawBody, }); } -test("GET / renders landing page", () => { +test("GET / renders landing page", async () => { const app = createApp(); - const response = call(app, { method: "GET", path: "/" }); + const response = await call(app, { method: "GET", path: "/" }); assert.equal(response.status, 200); assert.match(response.body, /From X Article to audiobook in one mention/); }); -test("unauthenticated /app redirects to /login with returnTo", () => { +test("unauthenticated /app redirects to /login with returnTo", async () => { const app = createApp(); - const response = call(app, { method: "GET", path: "/app" }); + const response = await call(app, { method: "GET", path: "/app" }); assert.equal(response.status, 303); assert.match(response.headers.location, /^\/login\?/); assert.match(response.headers.location, /returnTo=%2Fapp/); }); -test("POST /auth/dev-login sets cookie and redirects", () => { +test("POST /auth/dev-login sets cookie and redirects", async () => { const app = createApp(); - const response = call(app, { + const response = await call(app, { method: "POST", path: "/auth/dev-login", body: "userId=matiss&returnTo=%2Fapp", @@ -96,11 +112,11 @@ test("POST /auth/dev-login sets cookie and redirects", () => { assert.match(response.headers["set-cookie"], /^xartaudio_user=matiss/); }); -test("authenticated dashboard topup + simulate mention flow", () => { +test("authenticated dashboard topup + simulate mention flow", async () => { const app = createApp(); const cookieHeader = "xartaudio_user=alice"; - const topup = call(app, { + const topup = await call(app, { method: "POST", path: "/app/actions/topup", headers: { cookie: cookieHeader }, @@ -109,7 +125,7 @@ test("authenticated dashboard topup + simulate mention flow", () => { assert.equal(topup.status, 303); assert.match(topup.headers.location, /Added%208%20credits/); - const simulate = call(app, { + const simulate = await call(app, { method: "POST", path: "/app/actions/simulate-mention", headers: { cookie: cookieHeader }, @@ -118,7 +134,7 @@ test("authenticated dashboard topup + simulate mention flow", () => { assert.equal(simulate.status, 303); assert.match(simulate.headers.location, /^\/audio\//); - const dashboard = call(app, { + const dashboard = await call(app, { method: "GET", path: "/app", headers: { cookie: cookieHeader }, @@ -128,23 +144,23 @@ test("authenticated dashboard topup + simulate mention flow", () => { assert.match(dashboard.body, /Hello/); }); -test("audio flow requires auth for unlock and supports permanent unlock", () => { +test("audio flow requires auth for unlock and supports permanent unlock", async () => { const app = createApp(); - call(app, { + await call(app, { method: "POST", path: "/app/actions/topup", headers: { cookie: "xartaudio_user=owner" }, body: "amount=5", }); - call(app, { + await call(app, { method: "POST", path: "/app/actions/topup", headers: { cookie: "xartaudio_user=viewer" }, body: "amount=5", }); - const generated = call(app, { + const generated = await call(app, { method: "POST", path: "/app/actions/simulate-mention", headers: { cookie: "xartaudio_user=owner" }, @@ -154,28 +170,28 @@ test("audio flow requires auth for unlock and supports permanent unlock", () => const audioPath = generated.headers.location.split("?")[0]; const assetId = audioPath.replace("/audio/", ""); - const beforeUnlock = call(app, { + const beforeUnlock = await call(app, { method: "GET", path: audioPath, headers: { cookie: "xartaudio_user=viewer" }, }); assert.match(beforeUnlock.body, /Unlock required: 1 credits/); - const unlock = call(app, { + const unlock = await call(app, { method: "POST", path: `/audio/${assetId}/unlock`, headers: { cookie: "xartaudio_user=viewer" }, }); assert.equal(unlock.status, 303); - const afterUnlock = call(app, { + const afterUnlock = await call(app, { method: "GET", path: audioPath, headers: { cookie: "xartaudio_user=viewer" }, }); assert.match(afterUnlock.body, /Access granted/); - const wallet = call(app, { + const wallet = await call(app, { method: "GET", path: "/api/me/wallet", headers: { cookie: "xartaudio_user=viewer" }, @@ -184,9 +200,146 @@ test("audio flow requires auth for unlock and supports permanent unlock", () => assert.equal(walletData.balance, 4); }); -test("X webhook invalid signature is rejected", () => { +test("audio page uses signed storage URL when storage adapter is configured", async () => { + const app = createApp({ + storageAdapter: { + isConfigured() { + return true; + }, + async getSignedDownloadUrl(key) { + return `https://signed.local/${key}`; + }, + async uploadAudio() {}, + }, + }); + + await call(app, { + method: "POST", + path: "/app/actions/topup", + headers: { cookie: "xartaudio_user=owner" }, + body: "amount=2", + }); + + const generated = await call(app, { + method: "POST", + path: "/app/actions/simulate-mention", + headers: { cookie: "xartaudio_user=owner" }, + body: "title=Signed&body=Audio+Body", + }); + + const audioPath = generated.headers.location.split("?")[0]; + const page = await call(app, { + method: "GET", + path: audioPath, + headers: { cookie: "xartaudio_user=owner" }, + }); + + assert.match(page.body, /https:\/\/signed\.local\/audio\/1\.mp3/); +}); + +test("/api/x/mentions returns upstream mentions when configured", async () => { + const app = createApp({ + xAdapter: { + isConfigured() { + return true; + }, + async listMentions({ sinceId }) { + return [{ id: "m1", sinceId: sinceId || null }]; + }, + }, + }); + + const response = await call(app, { + method: "GET", + path: "/api/x/mentions", + query: { sinceId: "100" }, + }); + + assert.equal(response.status, 200); + const body = JSON.parse(response.body); + assert.equal(body.mentions.length, 1); + assert.equal(body.mentions[0].id, "m1"); +}); + +test("simulate mention schedules background audio generation when service is configured", async () => { + const queued = []; + const app = createApp({ + audioGenerationService: { + isConfigured() { + return true; + }, + async enqueueJob(payload) { + queued.push(payload); + }, + }, + }); + + await call(app, { + method: "POST", + path: "/app/actions/topup", + headers: { cookie: "xartaudio_user=alice" }, + body: "amount=3", + }); + + const simulate = await call(app, { + method: "POST", + path: "/app/actions/simulate-mention", + headers: { cookie: "xartaudio_user=alice" }, + body: "title=T&body=hello+world", + }); + + assert.equal(simulate.status, 303); + assert.equal(queued.length, 1); + assert.equal(typeof queued[0].assetId, "string"); + assert.equal(queued[0].text, "hello world"); +}); + +test("/api/payments/create-checkout returns 503 when Polar is not configured", async () => { const app = createApp(); - const response = call(app, { + + const response = await call(app, { + method: "POST", + path: "/api/payments/create-checkout", + headers: { cookie: "xartaudio_user=viewer" }, + }); + + assert.equal(response.status, 503); + assert.equal(JSON.parse(response.body).error, "polar_checkout_not_configured"); +}); + +test("/api/payments/create-checkout returns checkout URL when adapter is configured", async () => { + const app = createApp({ + polarAdapter: { + isConfigured() { + return true; + }, + async createCheckoutSession() { + return { id: "chk_1", url: "https://polar.sh/checkout/chk_1" }; + }, + parseWebhookEvent() { + return null; + }, + extractTopUp(payload) { + return payload; + }, + }, + }); + + const response = await call(app, { + method: "POST", + path: "/api/payments/create-checkout", + headers: { cookie: "xartaudio_user=buyer" }, + }); + + assert.equal(response.status, 200); + const body = JSON.parse(response.body); + assert.equal(body.checkoutId, "chk_1"); + assert.equal(body.checkoutUrl, "https://polar.sh/checkout/chk_1"); +}); + +test("X webhook invalid signature is rejected", async () => { + const app = createApp(); + const response = await call(app, { method: "POST", path: "/api/webhooks/x", headers: { "x-signature": "sha256=deadbeef" }, @@ -196,11 +349,11 @@ test("X webhook invalid signature is rejected", () => { assert.equal(response.status, 401); }); -test("X webhook valid flow processes article", () => { +test("X webhook valid flow processes article", async () => { const app = createApp(); - postJSONWebhook(app, "/api/webhooks/polar", { userId: "u1", credits: 4, eventId: "evt1" }, "polar-secret"); - const response = postJSONWebhook( + await postJSONWebhook(app, "/api/webhooks/polar", { userId: "u1", credits: 4, eventId: "evt1" }, "polar-secret"); + const response = await postJSONWebhook( app, "/api/webhooks/x", { @@ -220,7 +373,55 @@ test("X webhook valid flow processes article", () => { assert.equal(body.creditsCharged, 1); }); -test("emits persistence snapshots on mutating actions", () => { +test("Polar webhook uses adapter parsing for standard webhook headers", async () => { + const app = createApp({ + polarAdapter: { + isConfigured() { + return false; + }, + async createCheckoutSession() { + return null; + }, + parseWebhookEvent() { + return { + type: "order.paid", + data: { + id: "ord_1", + metadata: { xartaudio_user_id: "u9", xartaudio_credits: "7" }, + }, + }; + }, + extractTopUp(event) { + return { + userId: event.data.metadata.xartaudio_user_id, + credits: Number.parseInt(event.data.metadata.xartaudio_credits, 10), + eventId: event.data.id, + }; + }, + }, + }); + + const response = await call(app, { + method: "POST", + path: "/api/webhooks/polar", + headers: { + "webhook-id": "wh_1", + "webhook-timestamp": "1", + "webhook-signature": "sig", + }, + body: "{\"type\":\"order.paid\"}", + }); + + assert.equal(response.status, 200); + const wallet = await call(app, { + method: "GET", + path: "/api/me/wallet", + headers: { cookie: "xartaudio_user=u9" }, + }); + assert.equal(JSON.parse(wallet.body).balance, 7); +}); + +test("emits persistence snapshots on mutating actions", async () => { const snapshots = []; const app = createApp({ onMutation(state) { @@ -228,14 +429,14 @@ test("emits persistence snapshots on mutating actions", () => { }, }); - call(app, { + await call(app, { method: "POST", path: "/app/actions/topup", headers: { cookie: "xartaudio_user=alice" }, body: "amount=5", }); - call(app, { + await call(app, { method: "POST", path: "/app/actions/simulate-mention", headers: { cookie: "xartaudio_user=alice" }, @@ -249,7 +450,7 @@ test("emits persistence snapshots on mutating actions", () => { assert.equal(typeof latest.engine, "object"); }); -test("can boot app from previously persisted state snapshot", () => { +test("can boot app from previously persisted state snapshot", async () => { const snapshots = []; const app1 = createApp({ onMutation(state) { @@ -257,7 +458,7 @@ test("can boot app from previously persisted state snapshot", () => { }, }); - call(app1, { + await call(app1, { method: "POST", path: "/app/actions/topup", headers: { cookie: "xartaudio_user=restart-user" }, @@ -269,7 +470,7 @@ test("can boot app from previously persisted state snapshot", () => { initialState: persistedState, }); - const wallet = call(app2, { + const wallet = await call(app2, { method: "GET", path: "/api/me/wallet", headers: { cookie: "xartaudio_user=restart-user" }, @@ -279,7 +480,7 @@ test("can boot app from previously persisted state snapshot", () => { assert.equal(JSON.parse(wallet.body).balance, 6); }); -test("rate limits repeated webhook calls", () => { +test("rate limits repeated webhook calls", async () => { const app = createApp({ config: { rateLimits: { @@ -288,13 +489,13 @@ test("rate limits repeated webhook calls", () => { }, }); - const first = call(app, { + const first = await call(app, { method: "POST", path: "/api/webhooks/x", headers: { "x-forwarded-for": "1.2.3.4", "x-signature": "sha256=deadbeef" }, body: JSON.stringify({}), }); - const second = call(app, { + const second = await call(app, { method: "POST", path: "/api/webhooks/x", headers: { "x-forwarded-for": "1.2.3.4", "x-signature": "sha256=deadbeef" }, @@ -305,7 +506,7 @@ test("rate limits repeated webhook calls", () => { assert.equal(second.status, 429); }); -test("rate limits repeated login attempts from same IP", () => { +test("rate limits repeated login attempts from same IP", async () => { const app = createApp({ config: { rateLimits: { @@ -314,13 +515,13 @@ test("rate limits repeated login attempts from same IP", () => { }, }); - const first = call(app, { + const first = await call(app, { method: "POST", path: "/auth/dev-login", headers: { "x-forwarded-for": "5.5.5.5" }, body: "userId=alice&returnTo=%2Fapp", }); - const second = call(app, { + const second = await call(app, { method: "POST", path: "/auth/dev-login", headers: { "x-forwarded-for": "5.5.5.5" }, diff --git a/test/engine.test.js b/test/engine.test.js index 253ca1a..294b43e 100644 --- a/test/engine.test.js +++ b/test/engine.test.js @@ -180,3 +180,25 @@ test("round-trips state snapshot across engine restart", () => { assert.equal(engine2.getAsset(created.job.assetId).articleTitle, "Snap"); assert.equal(engine2.checkAudioAccess(created.job.assetId, "u1").allowed, true); }); + +test("updateAsset patches stored asset metadata", () => { + const engine = createEngine(); + engine.topUpCredits("u1", 5, "topup-update-asset"); + const created = engine.processMention({ + mentionPostId: "m-update-asset", + callerUserId: "u1", + parentPost: { + id: "p-update-asset", + authorId: "author", + article: { id: "a-update", title: "T", body: "hello" }, + }, + }); + + const updated = engine.updateAsset(created.job.assetId, { + storageKey: "audio/real-file.mp3", + sizeBytes: 12345, + }); + + assert.equal(updated.storageKey, "audio/real-file.mp3"); + assert.equal(updated.sizeBytes, 12345); +});