harden state durability and disable destructive snapshot sync

This commit is contained in:
Codex
2026-02-18 15:24:49 +00:00
parent 331b66506a
commit 4814342156
6 changed files with 123 additions and 35 deletions

View File

@@ -23,6 +23,7 @@ export const saveSnapshot = mutation({
args: {
snapshot: v.any(),
updatedAt: v.string(),
syncToDomain: v.optional(v.boolean()),
},
handler: async (ctx, args) => {
const latest = await ctx.db
@@ -30,7 +31,10 @@ export const saveSnapshot = mutation({
.order("desc")
.first();
const syncSummary = await syncFromEngineSnapshot(ctx, args.snapshot);
const shouldSync = Boolean(args.syncToDomain);
const syncSummary = shouldSync
? await syncFromEngineSnapshot(ctx, args.snapshot)
: null;
if (latest) {
await ctx.db.patch(latest._id, {

View File

@@ -126,20 +126,15 @@ function buildApp({
windowMs: 60_000,
});
function persistMutation() {
async function persistMutation() {
if (!onMutation) {
return;
}
try {
onMutation({
version: 1,
updatedAt: new Date().toISOString(),
engine: engine.exportState(),
});
} catch (error) {
logger.error({ err: error }, "failed to persist mutation");
}
await onMutation({
version: 1,
updatedAt: new Date().toISOString(),
engine: engine.exportState(),
});
}
function clientAddressFromHeaders(headers) {
@@ -174,7 +169,9 @@ function buildApp({
if (!generationService || !generationService.isConfigured()) {
try {
engine.completeJob(job.id);
persistMutation();
void persistMutation().catch((error) => {
logger.error({ err: error, jobId: job.id }, "failed to persist completion without generation worker");
});
} catch (error) {
logger.error({ err: error, jobId: job.id }, "failed to mark job as completed without generation worker");
}
@@ -183,7 +180,9 @@ function buildApp({
try {
engine.startJob(job.id);
persistMutation();
void persistMutation().catch((error) => {
logger.error({ err: error, jobId: job.id }, "failed to persist job start");
});
} catch (error) {
logger.error({ err: error, jobId: job.id }, "failed to start audio generation job");
return;
@@ -196,8 +195,13 @@ function buildApp({
onCompleted: (audioMeta) => {
try {
engine.completeJob(job.id, audioMeta);
persistMutation();
logger.info({ assetId: job.assetId, jobId: job.id }, "audio generation completed");
void persistMutation()
.then(() => {
logger.info({ assetId: job.assetId, jobId: job.id }, "audio generation completed");
})
.catch((error) => {
logger.error({ err: error, assetId: job.assetId, jobId: job.id }, "failed to persist completed job");
});
} catch (error) {
logger.error({ err: error, assetId: job.assetId }, "failed to apply generated audio metadata");
}
@@ -208,7 +212,9 @@ function buildApp({
error: error && error.message ? error.message : "audio_generation_failed",
refund: true,
});
persistMutation();
void persistMutation().catch((persistError) => {
logger.error({ err: persistError, jobId: job.id }, "failed to persist failed job state");
});
} catch (failureError) {
logger.error({ err: failureError, jobId: job.id }, "failed to mark generation failure");
}
@@ -392,7 +398,7 @@ function buildApp({
});
}
persistMutation();
await persistMutation();
scheduleAudioGeneration(result.job);
const replyMessage = result.reply
? result.reply.message
@@ -413,7 +419,7 @@ function buildApp({
}
}
function handlePolarWebhook(headers, rawBody) {
async function handlePolarWebhook(headers, rawBody) {
try {
let payload;
@@ -441,7 +447,7 @@ function buildApp({
}
engine.topUpCredits(payload.userId, payload.credits, `polar:${payload.eventId}`);
persistMutation();
await persistMutation();
return json(200, { status: "credited" });
} catch (error) {
logger.warn({ err: error }, "polar webhook request failed");
@@ -645,7 +651,7 @@ function buildApp({
}
engine.topUpCredits(userId, amount, `app-topup:${userId}:${randomUUID()}`);
persistMutation();
await persistMutation();
return redirect(withQuery("/app", { flash: `Added ${amount} credits` }));
}
@@ -701,7 +707,7 @@ function buildApp({
return redirect(withQuery("/app", { flash: "Parent post is not an article" }));
}
persistMutation();
await persistMutation();
scheduleAudioGeneration(result.job);
return redirect(withQuery(`/audio/${result.job.assetId}`, {
flash: "Audiobook generated",
@@ -725,7 +731,7 @@ function buildApp({
try {
engine.unlockAudio(assetId, userId);
persistMutation();
await persistMutation();
return redirect(withQuery(`/audio/${assetId}`, { flash: "Unlocked" }));
} catch (error) {
return redirect(withQuery(`/audio/${assetId}`, { flash: `Unlock failed: ${error.message}` }));
@@ -821,7 +827,7 @@ function buildApp({
rawArticleHours: Number.isFinite(payload.rawArticleHours) ? payload.rawArticleHours : 24,
audioDays: Number.isFinite(payload.audioDays) ? payload.audioDays : 90,
});
persistMutation();
await persistMutation();
return json(200, { status: "ok", summary });
}
@@ -830,7 +836,7 @@ function buildApp({
if (rateLimited) {
return rateLimited;
}
return handlePolarWebhook(safeHeaders, rawBody);
return await handlePolarWebhook(safeHeaders, rawBody);
}
if (method === "POST" && path === "/api/payments/create-checkout") {
@@ -895,7 +901,7 @@ function buildApp({
const assetId = path.slice("/api/audio/".length, -"/unlock".length);
try {
const result = engine.unlockAudio(assetId, userId);
persistMutation();
await persistMutation();
return json(200, result);
} catch (error) {
return json(400, { error: error.message });
@@ -910,7 +916,7 @@ function buildApp({
const assetId = path.slice("/api/audio/".length);
try {
const deleted = engine.takedownAudio(assetId, userId);
persistMutation();
await persistMutation();
return json(200, { status: "deleted", assetId: deleted.id });
} catch (error) {
const status = error.message === "forbidden" ? 403 : 400;
@@ -927,7 +933,7 @@ function buildApp({
const jobId = path.slice("/internal/jobs/".length, -"/start".length);
try {
const job = engine.startJob(jobId);
persistMutation();
await persistMutation();
return json(200, { job });
} catch (error) {
return json(400, { error: error.message });
@@ -945,7 +951,7 @@ function buildApp({
try {
const job = engine.completeJob(jobId, payload.asset || {});
persistMutation();
await persistMutation();
return json(200, { job });
} catch (error) {
return json(400, { error: error.message });
@@ -967,7 +973,7 @@ function buildApp({
error: payload.error || "generation_failed",
refund: shouldRefund,
});
persistMutation();
await persistMutation();
return json(200, { job });
} catch (error) {
return json(400, { error: error.message });

View File

@@ -49,6 +49,7 @@ function boolFromEnv(name, fallback) {
}
const parsed = {
nodeEnv: strFromEnv("NODE_ENV", "development"),
port: intFromEnv("PORT", 3000),
logLevel: strFromEnv("LOG_LEVEL", "info"),
appBaseUrl: strFromEnv("APP_BASE_URL", "http://localhost:3000"),
@@ -100,7 +101,13 @@ const parsed = {
},
};
parsed.allowInMemoryStateFallback = boolFromEnv(
"ALLOW_IN_MEMORY_STATE_FALLBACK",
parsed.nodeEnv !== "production",
);
const ConfigSchema = z.object({
nodeEnv: z.string().min(1),
port: z.number().int().positive(),
logLevel: z.enum(["fatal", "error", "warn", "info", "debug", "trace", "silent"]),
appBaseUrl: z.string().min(1),
@@ -150,6 +157,7 @@ const ConfigSchema = z.object({
stepCredits: z.number().int().positive(),
maxCharsPerArticle: z.number().int().positive(),
}),
allowInMemoryStateFallback: z.boolean(),
});
const config = ConfigSchema.parse(parsed);

View File

@@ -64,13 +64,19 @@ function createHttpServer({ app }) {
function createMutationPersister({ stateStore, logger = console }) {
let queue = Promise.resolve();
let lastError = null;
return {
enqueue(state) {
queue = queue
.then(() => stateStore.save(state))
.then(
() => stateStore.save(state),
() => stateStore.save(state),
)
.catch((error) => {
lastError = error;
logger.error({ err: error }, "failed to persist state");
throw error;
});
return queue;
@@ -78,6 +84,9 @@ function createMutationPersister({ stateStore, logger = console }) {
flush() {
return queue;
},
getLastError() {
return lastError;
},
};
}
@@ -92,10 +101,15 @@ async function createRuntime({ runtimeConfig = config, logger = console, stateSt
try {
initialState = await effectiveStateStore.load();
} catch (error) {
logger.warn(
{ err: error },
"failed to initialize configured state store; falling back to in-memory state",
);
const allowFallback = runtimeConfig.allowInMemoryStateFallback !== undefined
? Boolean(runtimeConfig.allowInMemoryStateFallback)
: true;
if (!allowFallback) {
throw new Error("state_store_unavailable_without_fallback", { cause: error });
}
logger.warn({ err: error }, "failed to initialize configured state store; falling back to in-memory state");
effectiveStateStore = new InMemoryStateStore();
initialState = await effectiveStateStore.load();
}

View File

@@ -31,6 +31,7 @@ function withTempEnv(patch, run) {
test("config uses defaults when env is missing", () => {
withTempEnv({
NODE_ENV: "",
PORT: "",
LOG_LEVEL: "",
APP_BASE_URL: "",
@@ -42,8 +43,10 @@ test("config uses defaults when env is missing", () => {
MINIO_SIGNED_URL_TTL_SEC: "",
MINIO_USE_SSL: "",
WEBHOOK_RPM: "",
ALLOW_IN_MEMORY_STATE_FALLBACK: "",
}, () => {
const { config } = require("../src/config");
assert.equal(config.nodeEnv, "development");
assert.equal(config.port, 3000);
assert.equal(config.logLevel, "info");
assert.equal(config.appBaseUrl, "http://localhost:3000");
@@ -55,6 +58,7 @@ test("config uses defaults when env is missing", () => {
assert.equal(config.minioSignedUrlTtlSec, 3600);
assert.equal(config.minioUseSSL, true);
assert.equal(config.rateLimits.webhookPerMinute, 120);
assert.equal(config.allowInMemoryStateFallback, true);
assert.equal(config.abuse.maxJobsPerUserPerDay, 0);
assert.equal(config.abuse.cooldownSec, 0);
assert.deepEqual(config.abuse.denyUserIds, []);
@@ -63,6 +67,7 @@ test("config uses defaults when env is missing", () => {
test("config reads convex/qwen/minio overrides", () => {
withTempEnv({
NODE_ENV: "production",
PORT: "8080",
LOG_LEVEL: "debug",
APP_BASE_URL: "https://xartaudio.app",
@@ -86,8 +91,10 @@ test("config reads convex/qwen/minio overrides", () => {
ABUSE_MAX_JOBS_PER_USER_PER_DAY: "5",
ABUSE_COOLDOWN_SEC: "120",
ABUSE_DENY_USER_IDS: "u1,u2",
ALLOW_IN_MEMORY_STATE_FALLBACK: "",
}, () => {
const { config } = require("../src/config");
assert.equal(config.nodeEnv, "production");
assert.equal(config.port, 8080);
assert.equal(config.logLevel, "debug");
assert.equal(config.appBaseUrl, "https://xartaudio.app");
@@ -109,6 +116,17 @@ test("config reads convex/qwen/minio overrides", () => {
assert.equal(config.abuse.maxJobsPerUserPerDay, 5);
assert.equal(config.abuse.cooldownSec, 120);
assert.deepEqual(config.abuse.denyUserIds, ["u1", "u2"]);
assert.equal(config.allowInMemoryStateFallback, false);
});
});
test("allow in-memory fallback can be explicitly enabled in production", () => {
withTempEnv({
NODE_ENV: "production",
ALLOW_IN_MEMORY_STATE_FALLBACK: "true",
}, () => {
const { config } = require("../src/config");
assert.equal(config.allowInMemoryStateFallback, true);
});
});

View File

@@ -11,6 +11,7 @@ const {
function createRuntimeConfig() {
return {
nodeEnv: "test",
port: 3000,
logLevel: "info",
appBaseUrl: "http://localhost:3000",
@@ -60,6 +61,7 @@ function createRuntimeConfig() {
stepCredits: 1,
maxCharsPerArticle: 120000,
},
allowInMemoryStateFallback: true,
};
}
@@ -153,3 +155,39 @@ test("createRuntime falls back to in-memory state when initial load fails", asyn
assert.equal(response.status, 303);
await runtime.persister.flush();
});
test("createRuntime fails startup when fallback is disabled", async () => {
const runtimeConfig = createRuntimeConfig();
runtimeConfig.allowInMemoryStateFallback = false;
await assert.rejects(
createRuntime({
runtimeConfig,
logger: { info() {}, warn() {}, error() {} },
stateStore: {
async load() {
throw new Error("state_load_failed");
},
async save() {},
},
}),
/state_store_unavailable_without_fallback/,
);
});
test("createMutationPersister surfaces save errors", async () => {
const persister = createMutationPersister({
stateStore: {
async save() {
throw new Error("persist_failed");
},
},
logger: { error() {} },
});
await assert.rejects(
persister.enqueue({}),
/persist_failed/,
);
assert.equal(persister.getLastError()?.message, "persist_failed");
});