feat: restore/persist app state in server runtime with queued writes
This commit is contained in:
@@ -3,6 +3,7 @@
|
|||||||
const http = require("node:http");
|
const http = require("node:http");
|
||||||
const { buildApp } = require("./app");
|
const { buildApp } = require("./app");
|
||||||
const { config } = require("./config");
|
const { config } = require("./config");
|
||||||
|
const { JsonFileStateStore } = require("./lib/state-store");
|
||||||
|
|
||||||
function readBody(req) {
|
function readBody(req) {
|
||||||
return new Promise((resolve, reject) => {
|
return new Promise((resolve, reject) => {
|
||||||
@@ -59,10 +60,72 @@ function createHttpServer({ app }) {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
function start() {
|
function createMutationPersister({ stateStore, logger = console }) {
|
||||||
const app = buildApp({ config });
|
let queue = Promise.resolve();
|
||||||
|
|
||||||
|
return {
|
||||||
|
enqueue(state) {
|
||||||
|
queue = queue
|
||||||
|
.then(() => stateStore.save(state))
|
||||||
|
.catch((error) => {
|
||||||
|
logger.error("failed to persist state", error);
|
||||||
|
});
|
||||||
|
|
||||||
|
return queue;
|
||||||
|
},
|
||||||
|
flush() {
|
||||||
|
return queue;
|
||||||
|
},
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
async function createRuntime({ runtimeConfig = config, logger = console } = {}) {
|
||||||
|
const stateStore = new JsonFileStateStore(runtimeConfig.stateFilePath);
|
||||||
|
const initialState = await stateStore.load();
|
||||||
|
const persister = createMutationPersister({ stateStore, logger });
|
||||||
|
|
||||||
|
const app = buildApp({
|
||||||
|
config: runtimeConfig,
|
||||||
|
initialState,
|
||||||
|
onMutation(state) {
|
||||||
|
void persister.enqueue(state);
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
const server = createHttpServer({ app });
|
const server = createHttpServer({ app });
|
||||||
|
|
||||||
|
return {
|
||||||
|
app,
|
||||||
|
server,
|
||||||
|
persister,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
async function start() {
|
||||||
|
const runtime = await createRuntime({ runtimeConfig: config });
|
||||||
|
const { server, persister } = runtime;
|
||||||
|
|
||||||
|
let shuttingDown = false;
|
||||||
|
async function shutdown(signal) {
|
||||||
|
if (shuttingDown) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
shuttingDown = true;
|
||||||
|
// eslint-disable-next-line no-console
|
||||||
|
console.log(`received ${signal}, shutting down`);
|
||||||
|
|
||||||
|
server.close();
|
||||||
|
await persister.flush();
|
||||||
|
}
|
||||||
|
|
||||||
|
process.on("SIGTERM", () => {
|
||||||
|
void shutdown("SIGTERM");
|
||||||
|
});
|
||||||
|
process.on("SIGINT", () => {
|
||||||
|
void shutdown("SIGINT");
|
||||||
|
});
|
||||||
|
|
||||||
server.listen(config.port, () => {
|
server.listen(config.port, () => {
|
||||||
// eslint-disable-next-line no-console
|
// eslint-disable-next-line no-console
|
||||||
console.log(`xartaudio server listening on :${config.port}`);
|
console.log(`xartaudio server listening on :${config.port}`);
|
||||||
@@ -70,12 +133,18 @@ function start() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (require.main === module) {
|
if (require.main === module) {
|
||||||
start();
|
start().catch((error) => {
|
||||||
|
// eslint-disable-next-line no-console
|
||||||
|
console.error("failed to start server", error);
|
||||||
|
process.exitCode = 1;
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
module.exports = {
|
module.exports = {
|
||||||
mapToAppRequest,
|
mapToAppRequest,
|
||||||
normalizeHeaders,
|
normalizeHeaders,
|
||||||
createHttpServer,
|
createHttpServer,
|
||||||
|
createMutationPersister,
|
||||||
|
createRuntime,
|
||||||
start,
|
start,
|
||||||
};
|
};
|
||||||
|
|||||||
@@ -2,7 +2,7 @@
|
|||||||
|
|
||||||
const test = require("node:test");
|
const test = require("node:test");
|
||||||
const assert = require("node:assert/strict");
|
const assert = require("node:assert/strict");
|
||||||
const { mapToAppRequest, normalizeHeaders } = require("../src/server");
|
const { mapToAppRequest, normalizeHeaders, createMutationPersister } = require("../src/server");
|
||||||
|
|
||||||
test("normalizeHeaders lowercases and joins array values", () => {
|
test("normalizeHeaders lowercases and joins array values", () => {
|
||||||
const headers = normalizeHeaders({
|
const headers = normalizeHeaders({
|
||||||
@@ -31,3 +31,21 @@ test("mapToAppRequest extracts method/path/headers/body correctly", () => {
|
|||||||
assert.equal(request.headers["x-signature"], "sha256=abc");
|
assert.equal(request.headers["x-signature"], "sha256=abc");
|
||||||
assert.equal(request.rawBody, "{\"ok\":true}");
|
assert.equal(request.rawBody, "{\"ok\":true}");
|
||||||
});
|
});
|
||||||
|
|
||||||
|
test("createMutationPersister writes sequentially and flush waits", async () => {
|
||||||
|
const saved = [];
|
||||||
|
const stateStore = {
|
||||||
|
async save(state) {
|
||||||
|
await new Promise((resolve) => setTimeout(resolve, 5));
|
||||||
|
saved.push(state.id);
|
||||||
|
},
|
||||||
|
};
|
||||||
|
|
||||||
|
const persister = createMutationPersister({ stateStore, logger: { error() {} } });
|
||||||
|
|
||||||
|
persister.enqueue({ id: "s1" });
|
||||||
|
persister.enqueue({ id: "s2" });
|
||||||
|
await persister.flush();
|
||||||
|
|
||||||
|
assert.deepEqual(saved, ["s1", "s2"]);
|
||||||
|
});
|
||||||
|
|||||||
Reference in New Issue
Block a user