From 4814342156168f60102164a0686cb636f705dc60 Mon Sep 17 00:00:00 2001 From: Codex Date: Wed, 18 Feb 2026 15:24:49 +0000 Subject: [PATCH] harden state durability and disable destructive snapshot sync --- convex/state.ts | 6 ++++- src/app.js | 64 +++++++++++++++++++++++++-------------------- src/config.js | 8 ++++++ src/server.js | 24 +++++++++++++---- test/config.test.js | 18 +++++++++++++ test/server.test.js | 38 +++++++++++++++++++++++++++ 6 files changed, 123 insertions(+), 35 deletions(-) diff --git a/convex/state.ts b/convex/state.ts index 5a543fb..0cc5bc0 100644 --- a/convex/state.ts +++ b/convex/state.ts @@ -23,6 +23,7 @@ export const saveSnapshot = mutation({ args: { snapshot: v.any(), updatedAt: v.string(), + syncToDomain: v.optional(v.boolean()), }, handler: async (ctx, args) => { const latest = await ctx.db @@ -30,7 +31,10 @@ export const saveSnapshot = mutation({ .order("desc") .first(); - const syncSummary = await syncFromEngineSnapshot(ctx, args.snapshot); + const shouldSync = Boolean(args.syncToDomain); + const syncSummary = shouldSync + ? await syncFromEngineSnapshot(ctx, args.snapshot) + : null; if (latest) { await ctx.db.patch(latest._id, { diff --git a/src/app.js b/src/app.js index 981a26f..f59ec10 100644 --- a/src/app.js +++ b/src/app.js @@ -126,20 +126,15 @@ function buildApp({ windowMs: 60_000, }); - function persistMutation() { + async function persistMutation() { if (!onMutation) { return; } - - try { - onMutation({ - version: 1, - updatedAt: new Date().toISOString(), - engine: engine.exportState(), - }); - } catch (error) { - logger.error({ err: error }, "failed to persist mutation"); - } + await onMutation({ + version: 1, + updatedAt: new Date().toISOString(), + engine: engine.exportState(), + }); } function clientAddressFromHeaders(headers) { @@ -174,7 +169,9 @@ function buildApp({ if (!generationService || !generationService.isConfigured()) { try { engine.completeJob(job.id); - persistMutation(); + void persistMutation().catch((error) => { + logger.error({ err: error, jobId: job.id }, "failed to persist completion without generation worker"); + }); } catch (error) { logger.error({ err: error, jobId: job.id }, "failed to mark job as completed without generation worker"); } @@ -183,7 +180,9 @@ function buildApp({ try { engine.startJob(job.id); - persistMutation(); + void persistMutation().catch((error) => { + logger.error({ err: error, jobId: job.id }, "failed to persist job start"); + }); } catch (error) { logger.error({ err: error, jobId: job.id }, "failed to start audio generation job"); return; @@ -196,8 +195,13 @@ function buildApp({ onCompleted: (audioMeta) => { try { engine.completeJob(job.id, audioMeta); - persistMutation(); - logger.info({ assetId: job.assetId, jobId: job.id }, "audio generation completed"); + void persistMutation() + .then(() => { + logger.info({ assetId: job.assetId, jobId: job.id }, "audio generation completed"); + }) + .catch((error) => { + logger.error({ err: error, assetId: job.assetId, jobId: job.id }, "failed to persist completed job"); + }); } catch (error) { logger.error({ err: error, assetId: job.assetId }, "failed to apply generated audio metadata"); } @@ -208,7 +212,9 @@ function buildApp({ error: error && error.message ? error.message : "audio_generation_failed", refund: true, }); - persistMutation(); + void persistMutation().catch((persistError) => { + logger.error({ err: persistError, jobId: job.id }, "failed to persist failed job state"); + }); } catch (failureError) { logger.error({ err: failureError, jobId: job.id }, "failed to mark generation failure"); } @@ -392,7 +398,7 @@ function buildApp({ }); } - persistMutation(); + await persistMutation(); scheduleAudioGeneration(result.job); const replyMessage = result.reply ? result.reply.message @@ -413,7 +419,7 @@ function buildApp({ } } - function handlePolarWebhook(headers, rawBody) { + async function handlePolarWebhook(headers, rawBody) { try { let payload; @@ -441,7 +447,7 @@ function buildApp({ } engine.topUpCredits(payload.userId, payload.credits, `polar:${payload.eventId}`); - persistMutation(); + await persistMutation(); return json(200, { status: "credited" }); } catch (error) { logger.warn({ err: error }, "polar webhook request failed"); @@ -645,7 +651,7 @@ function buildApp({ } engine.topUpCredits(userId, amount, `app-topup:${userId}:${randomUUID()}`); - persistMutation(); + await persistMutation(); return redirect(withQuery("/app", { flash: `Added ${amount} credits` })); } @@ -701,7 +707,7 @@ function buildApp({ return redirect(withQuery("/app", { flash: "Parent post is not an article" })); } - persistMutation(); + await persistMutation(); scheduleAudioGeneration(result.job); return redirect(withQuery(`/audio/${result.job.assetId}`, { flash: "Audiobook generated", @@ -725,7 +731,7 @@ function buildApp({ try { engine.unlockAudio(assetId, userId); - persistMutation(); + await persistMutation(); return redirect(withQuery(`/audio/${assetId}`, { flash: "Unlocked" })); } catch (error) { return redirect(withQuery(`/audio/${assetId}`, { flash: `Unlock failed: ${error.message}` })); @@ -821,7 +827,7 @@ function buildApp({ rawArticleHours: Number.isFinite(payload.rawArticleHours) ? payload.rawArticleHours : 24, audioDays: Number.isFinite(payload.audioDays) ? payload.audioDays : 90, }); - persistMutation(); + await persistMutation(); return json(200, { status: "ok", summary }); } @@ -830,7 +836,7 @@ function buildApp({ if (rateLimited) { return rateLimited; } - return handlePolarWebhook(safeHeaders, rawBody); + return await handlePolarWebhook(safeHeaders, rawBody); } if (method === "POST" && path === "/api/payments/create-checkout") { @@ -895,7 +901,7 @@ function buildApp({ const assetId = path.slice("/api/audio/".length, -"/unlock".length); try { const result = engine.unlockAudio(assetId, userId); - persistMutation(); + await persistMutation(); return json(200, result); } catch (error) { return json(400, { error: error.message }); @@ -910,7 +916,7 @@ function buildApp({ const assetId = path.slice("/api/audio/".length); try { const deleted = engine.takedownAudio(assetId, userId); - persistMutation(); + await persistMutation(); return json(200, { status: "deleted", assetId: deleted.id }); } catch (error) { const status = error.message === "forbidden" ? 403 : 400; @@ -927,7 +933,7 @@ function buildApp({ const jobId = path.slice("/internal/jobs/".length, -"/start".length); try { const job = engine.startJob(jobId); - persistMutation(); + await persistMutation(); return json(200, { job }); } catch (error) { return json(400, { error: error.message }); @@ -945,7 +951,7 @@ function buildApp({ try { const job = engine.completeJob(jobId, payload.asset || {}); - persistMutation(); + await persistMutation(); return json(200, { job }); } catch (error) { return json(400, { error: error.message }); @@ -967,7 +973,7 @@ function buildApp({ error: payload.error || "generation_failed", refund: shouldRefund, }); - persistMutation(); + await persistMutation(); return json(200, { job }); } catch (error) { return json(400, { error: error.message }); diff --git a/src/config.js b/src/config.js index ee61991..f9d35d4 100644 --- a/src/config.js +++ b/src/config.js @@ -49,6 +49,7 @@ function boolFromEnv(name, fallback) { } const parsed = { + nodeEnv: strFromEnv("NODE_ENV", "development"), port: intFromEnv("PORT", 3000), logLevel: strFromEnv("LOG_LEVEL", "info"), appBaseUrl: strFromEnv("APP_BASE_URL", "http://localhost:3000"), @@ -100,7 +101,13 @@ const parsed = { }, }; +parsed.allowInMemoryStateFallback = boolFromEnv( + "ALLOW_IN_MEMORY_STATE_FALLBACK", + parsed.nodeEnv !== "production", +); + const ConfigSchema = z.object({ + nodeEnv: z.string().min(1), port: z.number().int().positive(), logLevel: z.enum(["fatal", "error", "warn", "info", "debug", "trace", "silent"]), appBaseUrl: z.string().min(1), @@ -150,6 +157,7 @@ const ConfigSchema = z.object({ stepCredits: z.number().int().positive(), maxCharsPerArticle: z.number().int().positive(), }), + allowInMemoryStateFallback: z.boolean(), }); const config = ConfigSchema.parse(parsed); diff --git a/src/server.js b/src/server.js index 288d364..87e3804 100644 --- a/src/server.js +++ b/src/server.js @@ -64,13 +64,19 @@ function createHttpServer({ app }) { function createMutationPersister({ stateStore, logger = console }) { let queue = Promise.resolve(); + let lastError = null; return { enqueue(state) { queue = queue - .then(() => stateStore.save(state)) + .then( + () => stateStore.save(state), + () => stateStore.save(state), + ) .catch((error) => { + lastError = error; logger.error({ err: error }, "failed to persist state"); + throw error; }); return queue; @@ -78,6 +84,9 @@ function createMutationPersister({ stateStore, logger = console }) { flush() { return queue; }, + getLastError() { + return lastError; + }, }; } @@ -92,10 +101,15 @@ async function createRuntime({ runtimeConfig = config, logger = console, stateSt try { initialState = await effectiveStateStore.load(); } catch (error) { - logger.warn( - { err: error }, - "failed to initialize configured state store; falling back to in-memory state", - ); + const allowFallback = runtimeConfig.allowInMemoryStateFallback !== undefined + ? Boolean(runtimeConfig.allowInMemoryStateFallback) + : true; + + if (!allowFallback) { + throw new Error("state_store_unavailable_without_fallback", { cause: error }); + } + + logger.warn({ err: error }, "failed to initialize configured state store; falling back to in-memory state"); effectiveStateStore = new InMemoryStateStore(); initialState = await effectiveStateStore.load(); } diff --git a/test/config.test.js b/test/config.test.js index a7d5932..b0b8828 100644 --- a/test/config.test.js +++ b/test/config.test.js @@ -31,6 +31,7 @@ function withTempEnv(patch, run) { test("config uses defaults when env is missing", () => { withTempEnv({ + NODE_ENV: "", PORT: "", LOG_LEVEL: "", APP_BASE_URL: "", @@ -42,8 +43,10 @@ test("config uses defaults when env is missing", () => { MINIO_SIGNED_URL_TTL_SEC: "", MINIO_USE_SSL: "", WEBHOOK_RPM: "", + ALLOW_IN_MEMORY_STATE_FALLBACK: "", }, () => { const { config } = require("../src/config"); + assert.equal(config.nodeEnv, "development"); assert.equal(config.port, 3000); assert.equal(config.logLevel, "info"); assert.equal(config.appBaseUrl, "http://localhost:3000"); @@ -55,6 +58,7 @@ test("config uses defaults when env is missing", () => { assert.equal(config.minioSignedUrlTtlSec, 3600); assert.equal(config.minioUseSSL, true); assert.equal(config.rateLimits.webhookPerMinute, 120); + assert.equal(config.allowInMemoryStateFallback, true); assert.equal(config.abuse.maxJobsPerUserPerDay, 0); assert.equal(config.abuse.cooldownSec, 0); assert.deepEqual(config.abuse.denyUserIds, []); @@ -63,6 +67,7 @@ test("config uses defaults when env is missing", () => { test("config reads convex/qwen/minio overrides", () => { withTempEnv({ + NODE_ENV: "production", PORT: "8080", LOG_LEVEL: "debug", APP_BASE_URL: "https://xartaudio.app", @@ -86,8 +91,10 @@ test("config reads convex/qwen/minio overrides", () => { ABUSE_MAX_JOBS_PER_USER_PER_DAY: "5", ABUSE_COOLDOWN_SEC: "120", ABUSE_DENY_USER_IDS: "u1,u2", + ALLOW_IN_MEMORY_STATE_FALLBACK: "", }, () => { const { config } = require("../src/config"); + assert.equal(config.nodeEnv, "production"); assert.equal(config.port, 8080); assert.equal(config.logLevel, "debug"); assert.equal(config.appBaseUrl, "https://xartaudio.app"); @@ -109,6 +116,17 @@ test("config reads convex/qwen/minio overrides", () => { assert.equal(config.abuse.maxJobsPerUserPerDay, 5); assert.equal(config.abuse.cooldownSec, 120); assert.deepEqual(config.abuse.denyUserIds, ["u1", "u2"]); + assert.equal(config.allowInMemoryStateFallback, false); + }); +}); + +test("allow in-memory fallback can be explicitly enabled in production", () => { + withTempEnv({ + NODE_ENV: "production", + ALLOW_IN_MEMORY_STATE_FALLBACK: "true", + }, () => { + const { config } = require("../src/config"); + assert.equal(config.allowInMemoryStateFallback, true); }); }); diff --git a/test/server.test.js b/test/server.test.js index e8e3058..15ec2c2 100644 --- a/test/server.test.js +++ b/test/server.test.js @@ -11,6 +11,7 @@ const { function createRuntimeConfig() { return { + nodeEnv: "test", port: 3000, logLevel: "info", appBaseUrl: "http://localhost:3000", @@ -60,6 +61,7 @@ function createRuntimeConfig() { stepCredits: 1, maxCharsPerArticle: 120000, }, + allowInMemoryStateFallback: true, }; } @@ -153,3 +155,39 @@ test("createRuntime falls back to in-memory state when initial load fails", asyn assert.equal(response.status, 303); await runtime.persister.flush(); }); + +test("createRuntime fails startup when fallback is disabled", async () => { + const runtimeConfig = createRuntimeConfig(); + runtimeConfig.allowInMemoryStateFallback = false; + + await assert.rejects( + createRuntime({ + runtimeConfig, + logger: { info() {}, warn() {}, error() {} }, + stateStore: { + async load() { + throw new Error("state_load_failed"); + }, + async save() {}, + }, + }), + /state_store_unavailable_without_fallback/, + ); +}); + +test("createMutationPersister surfaces save errors", async () => { + const persister = createMutationPersister({ + stateStore: { + async save() { + throw new Error("persist_failed"); + }, + }, + logger: { error() {} }, + }); + + await assert.rejects( + persister.enqueue({}), + /persist_failed/, + ); + assert.equal(persister.getLastError()?.message, "persist_failed"); +});