feat(recordings): add phase6 recording finalization pipeline and simulator support
This commit is contained in:
@@ -77,6 +77,24 @@ export const streamSessions = pgTable('stream_sessions', {
|
|||||||
updatedAt: timestamp('updated_at', { withTimezone: true }).defaultNow().notNull(),
|
updatedAt: timestamp('updated_at', { withTimezone: true }).defaultNow().notNull(),
|
||||||
});
|
});
|
||||||
|
|
||||||
|
export const recordings = pgTable('recordings', {
|
||||||
|
id: uuid('id').defaultRandom().primaryKey(),
|
||||||
|
ownerUserId: uuid('owner_user_id').notNull().references(() => users.id),
|
||||||
|
streamSessionId: uuid('stream_session_id').notNull().references(() => streamSessions.id),
|
||||||
|
cameraDeviceId: uuid('camera_device_id').notNull().references(() => devices.id),
|
||||||
|
requesterDeviceId: uuid('requester_device_id').notNull().references(() => devices.id),
|
||||||
|
eventId: uuid('event_id').references(() => events.id),
|
||||||
|
objectKey: varchar('object_key', { length: 1024 }),
|
||||||
|
bucket: varchar('bucket', { length: 255 }),
|
||||||
|
durationSeconds: integer('duration_seconds'),
|
||||||
|
sizeBytes: integer('size_bytes'),
|
||||||
|
status: varchar('status', { length: 32 }).default('awaiting_upload').notNull(),
|
||||||
|
availableAt: timestamp('available_at', { withTimezone: true }),
|
||||||
|
error: text('error'),
|
||||||
|
createdAt: timestamp('created_at', { withTimezone: true }).defaultNow().notNull(),
|
||||||
|
updatedAt: timestamp('updated_at', { withTimezone: true }).defaultNow().notNull(),
|
||||||
|
});
|
||||||
|
|
||||||
export const events = pgTable('events', {
|
export const events = pgTable('events', {
|
||||||
id: uuid('id').defaultRandom().primaryKey(),
|
id: uuid('id').defaultRandom().primaryKey(),
|
||||||
userId: uuid('user_id').notNull().references(() => users.id),
|
userId: uuid('user_id').notNull().references(() => users.id),
|
||||||
@@ -159,6 +177,7 @@ export const schema = {
|
|||||||
deviceLinks,
|
deviceLinks,
|
||||||
deviceCommands,
|
deviceCommands,
|
||||||
streamSessions,
|
streamSessions,
|
||||||
|
recordings,
|
||||||
events,
|
events,
|
||||||
videos,
|
videos,
|
||||||
notifications,
|
notifications,
|
||||||
|
|||||||
23
Backend/drizzle/0009_recordings_pipeline.sql
Normal file
23
Backend/drizzle/0009_recordings_pipeline.sql
Normal file
@@ -0,0 +1,23 @@
|
|||||||
|
CREATE TABLE "recordings" (
|
||||||
|
"id" uuid PRIMARY KEY DEFAULT gen_random_uuid() NOT NULL,
|
||||||
|
"owner_user_id" uuid NOT NULL,
|
||||||
|
"stream_session_id" uuid NOT NULL,
|
||||||
|
"camera_device_id" uuid NOT NULL,
|
||||||
|
"requester_device_id" uuid NOT NULL,
|
||||||
|
"event_id" uuid,
|
||||||
|
"object_key" varchar(1024),
|
||||||
|
"bucket" varchar(255),
|
||||||
|
"duration_seconds" integer,
|
||||||
|
"size_bytes" integer,
|
||||||
|
"status" varchar(32) DEFAULT 'awaiting_upload' NOT NULL,
|
||||||
|
"available_at" timestamp with time zone,
|
||||||
|
"error" text,
|
||||||
|
"created_at" timestamp with time zone DEFAULT now() NOT NULL,
|
||||||
|
"updated_at" timestamp with time zone DEFAULT now() NOT NULL
|
||||||
|
);
|
||||||
|
--> statement-breakpoint
|
||||||
|
ALTER TABLE "recordings" ADD CONSTRAINT "recordings_owner_user_id_users_id_fk" FOREIGN KEY ("owner_user_id") REFERENCES "public"."users"("id") ON DELETE no action ON UPDATE no action;--> statement-breakpoint
|
||||||
|
ALTER TABLE "recordings" ADD CONSTRAINT "recordings_stream_session_id_stream_sessions_id_fk" FOREIGN KEY ("stream_session_id") REFERENCES "public"."stream_sessions"("id") ON DELETE no action ON UPDATE no action;--> statement-breakpoint
|
||||||
|
ALTER TABLE "recordings" ADD CONSTRAINT "recordings_camera_device_id_devices_id_fk" FOREIGN KEY ("camera_device_id") REFERENCES "public"."devices"("id") ON DELETE no action ON UPDATE no action;--> statement-breakpoint
|
||||||
|
ALTER TABLE "recordings" ADD CONSTRAINT "recordings_requester_device_id_devices_id_fk" FOREIGN KEY ("requester_device_id") REFERENCES "public"."devices"("id") ON DELETE no action ON UPDATE no action;--> statement-breakpoint
|
||||||
|
ALTER TABLE "recordings" ADD CONSTRAINT "recordings_event_id_events_id_fk" FOREIGN KEY ("event_id") REFERENCES "public"."events"("id") ON DELETE no action ON UPDATE no action;
|
||||||
@@ -64,6 +64,13 @@
|
|||||||
"when": 1770415956419,
|
"when": 1770415956419,
|
||||||
"tag": "0008_media_plane_columns",
|
"tag": "0008_media_plane_columns",
|
||||||
"breakpoints": true
|
"breakpoints": true
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"idx": 9,
|
||||||
|
"version": "7",
|
||||||
|
"when": 1770416956419,
|
||||||
|
"tag": "0009_recordings_pipeline",
|
||||||
|
"breakpoints": true
|
||||||
}
|
}
|
||||||
]
|
]
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -12,8 +12,10 @@ import deviceLinksRoutes from './routes/device-links';
|
|||||||
import commandsRoutes from './routes/commands';
|
import commandsRoutes from './routes/commands';
|
||||||
import eventsRoutes from './routes/events';
|
import eventsRoutes from './routes/events';
|
||||||
import streamsRoutes from './routes/streams';
|
import streamsRoutes from './routes/streams';
|
||||||
|
import recordingsRoutes from './routes/recordings';
|
||||||
import { setupRealtimeGateway } from './realtime/gateway';
|
import { setupRealtimeGateway } from './realtime/gateway';
|
||||||
import { ensureMinioBucket } from './utils/minio';
|
import { ensureMinioBucket } from './utils/minio';
|
||||||
|
import { startRecordingsWorker } from './workers/recordings';
|
||||||
|
|
||||||
const app = express();
|
const app = express();
|
||||||
const openApiDocument = buildOpenApiDocument();
|
const openApiDocument = buildOpenApiDocument();
|
||||||
@@ -39,6 +41,7 @@ app.use('/device-links', deviceLinksRoutes);
|
|||||||
app.use('/commands', commandsRoutes);
|
app.use('/commands', commandsRoutes);
|
||||||
app.use('/events', eventsRoutes);
|
app.use('/events', eventsRoutes);
|
||||||
app.use('/streams', streamsRoutes);
|
app.use('/streams', streamsRoutes);
|
||||||
|
app.use('/recordings', recordingsRoutes);
|
||||||
|
|
||||||
app.use((err: unknown, _req: express.Request, res: express.Response, _next: express.NextFunction) => {
|
app.use((err: unknown, _req: express.Request, res: express.Response, _next: express.NextFunction) => {
|
||||||
console.error(err);
|
console.error(err);
|
||||||
@@ -57,6 +60,7 @@ const start = async () => {
|
|||||||
}
|
}
|
||||||
|
|
||||||
setupRealtimeGateway(server);
|
setupRealtimeGateway(server);
|
||||||
|
startRecordingsWorker();
|
||||||
|
|
||||||
server.listen(port, () => {
|
server.listen(port, () => {
|
||||||
console.log(`Server is running on port ${port}`);
|
console.log(`Server is running on port ${port}`);
|
||||||
|
|||||||
@@ -159,6 +159,8 @@
|
|||||||
<button id="requestStreamBtn">Request On-Demand Stream</button>
|
<button id="requestStreamBtn">Request On-Demand Stream</button>
|
||||||
<button id="fetchPlaybackBtn" class="alt">Fetch Playback Token (Latest)</button>
|
<button id="fetchPlaybackBtn" class="alt">Fetch Playback Token (Latest)</button>
|
||||||
<button id="fetchSubscribeBtn" class="alt">Fetch Subscribe Credentials</button>
|
<button id="fetchSubscribeBtn" class="alt">Fetch Subscribe Credentials</button>
|
||||||
|
<button id="listRecordingsBtn" class="alt">List Recordings</button>
|
||||||
|
<button id="downloadLatestRecordingBtn" class="alt">Download URL (Latest)</button>
|
||||||
<pre id="clientState"></pre>
|
<pre id="clientState"></pre>
|
||||||
</section>
|
</section>
|
||||||
|
|
||||||
@@ -167,6 +169,7 @@
|
|||||||
<button id="startMotionBtn" class="warn">Start Motion Event</button>
|
<button id="startMotionBtn" class="warn">Start Motion Event</button>
|
||||||
<button id="endMotionBtn" class="danger">End Last Motion Event</button>
|
<button id="endMotionBtn" class="danger">End Last Motion Event</button>
|
||||||
<button id="fetchPublishBtn" class="alt">Fetch Publish Credentials</button>
|
<button id="fetchPublishBtn" class="alt">Fetch Publish Credentials</button>
|
||||||
|
<button id="finalizeRecordingBtn" class="alt">Finalize Recording (Latest)</button>
|
||||||
<pre id="cameraState"></pre>
|
<pre id="cameraState"></pre>
|
||||||
</section>
|
</section>
|
||||||
</div>
|
</div>
|
||||||
@@ -185,6 +188,7 @@
|
|||||||
socket: null,
|
socket: null,
|
||||||
lastMotionEventId: null,
|
lastMotionEventId: null,
|
||||||
lastStreamSessionId: null,
|
lastStreamSessionId: null,
|
||||||
|
lastRecordingId: null,
|
||||||
};
|
};
|
||||||
|
|
||||||
const $ = (id) => document.getElementById(id);
|
const $ = (id) => document.getElementById(id);
|
||||||
@@ -203,7 +207,11 @@
|
|||||||
const render = () => {
|
const render = () => {
|
||||||
$('deviceState').textContent = JSON.stringify({ device: state.device, hasToken: Boolean(state.deviceToken) }, null, 2);
|
$('deviceState').textContent = JSON.stringify({ device: state.device, hasToken: Boolean(state.deviceToken) }, null, 2);
|
||||||
$('clientState').textContent = JSON.stringify({ lastStreamSessionId: state.lastStreamSessionId }, null, 2);
|
$('clientState').textContent = JSON.stringify({ lastStreamSessionId: state.lastStreamSessionId }, null, 2);
|
||||||
$('cameraState').textContent = JSON.stringify({ lastMotionEventId: state.lastMotionEventId }, null, 2);
|
$('cameraState').textContent = JSON.stringify(
|
||||||
|
{ lastMotionEventId: state.lastMotionEventId, lastRecordingId: state.lastRecordingId },
|
||||||
|
null,
|
||||||
|
2,
|
||||||
|
);
|
||||||
};
|
};
|
||||||
|
|
||||||
const authFetch = async (url, options = {}) => {
|
const authFetch = async (url, options = {}) => {
|
||||||
@@ -425,6 +433,29 @@
|
|||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
$('listRecordingsBtn').addEventListener('click', async () => {
|
||||||
|
try {
|
||||||
|
const payload = await deviceFetch('/recordings/me/list');
|
||||||
|
if (payload.recordings?.length > 0) {
|
||||||
|
state.lastRecordingId = payload.recordings[0].id;
|
||||||
|
render();
|
||||||
|
}
|
||||||
|
log('recordings', payload);
|
||||||
|
} catch (error) {
|
||||||
|
log('list recordings failed', { error: error.message });
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
$('downloadLatestRecordingBtn').addEventListener('click', async () => {
|
||||||
|
try {
|
||||||
|
if (!state.lastRecordingId) throw new Error('No recording id available');
|
||||||
|
const payload = await deviceFetch(`/recordings/${state.lastRecordingId}/download-url`);
|
||||||
|
log('recording download url', payload);
|
||||||
|
} catch (error) {
|
||||||
|
log('recording download failed', { error: error.message });
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
$('fetchSubscribeBtn').addEventListener('click', async () => {
|
$('fetchSubscribeBtn').addEventListener('click', async () => {
|
||||||
try {
|
try {
|
||||||
if (!state.lastStreamSessionId) throw new Error('No known stream session');
|
if (!state.lastStreamSessionId) throw new Error('No known stream session');
|
||||||
@@ -475,6 +506,34 @@
|
|||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
$('finalizeRecordingBtn').addEventListener('click', async () => {
|
||||||
|
try {
|
||||||
|
if (!state.lastStreamSessionId) throw new Error('No stream session yet');
|
||||||
|
|
||||||
|
const listPayload = await deviceFetch('/recordings/me/list');
|
||||||
|
const target = listPayload.recordings?.find((r) => r.streamSessionId === state.lastStreamSessionId) ?? listPayload.recordings?.[0];
|
||||||
|
|
||||||
|
if (!target) throw new Error('No recording placeholder exists');
|
||||||
|
|
||||||
|
state.lastRecordingId = target.id;
|
||||||
|
render();
|
||||||
|
|
||||||
|
const payload = await deviceFetch(`/recordings/${target.id}/finalize`, {
|
||||||
|
method: 'POST',
|
||||||
|
body: JSON.stringify({
|
||||||
|
objectKey: `recordings/${target.id}.mp4`,
|
||||||
|
bucket: 'videos',
|
||||||
|
durationSeconds: 12,
|
||||||
|
sizeBytes: 1024 * 1024 * 8,
|
||||||
|
}),
|
||||||
|
});
|
||||||
|
|
||||||
|
log('recording finalized', payload);
|
||||||
|
} catch (error) {
|
||||||
|
log('finalize recording failed', { error: error.message });
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
render();
|
render();
|
||||||
</script>
|
</script>
|
||||||
</body>
|
</body>
|
||||||
|
|||||||
185
Backend/routes/recordings.ts
Normal file
185
Backend/routes/recordings.ts
Normal file
@@ -0,0 +1,185 @@
|
|||||||
|
import { and, desc, eq } from 'drizzle-orm';
|
||||||
|
import { Router } from 'express';
|
||||||
|
import { z } from 'zod';
|
||||||
|
|
||||||
|
import { db } from '../db/client';
|
||||||
|
import { recordings, streamSessions } from '../db/schema';
|
||||||
|
import { requireDeviceAuth } from '../middleware/device-auth';
|
||||||
|
import { minioBucket, minioClient, minioPresignedExpirySeconds } from '../utils/minio';
|
||||||
|
|
||||||
|
const router = Router();
|
||||||
|
|
||||||
|
const listSchema = z.object({
|
||||||
|
status: z.string().optional(),
|
||||||
|
limit: z.coerce.number().int().min(1).max(100).default(25),
|
||||||
|
});
|
||||||
|
|
||||||
|
const finalizeSchema = z.object({
|
||||||
|
objectKey: z.string().trim().min(1),
|
||||||
|
bucket: z.string().trim().min(1).default(minioBucket),
|
||||||
|
durationSeconds: z.coerce.number().int().nonnegative().optional(),
|
||||||
|
sizeBytes: z.coerce.number().int().nonnegative().optional(),
|
||||||
|
});
|
||||||
|
|
||||||
|
const recordingParamSchema = z.object({
|
||||||
|
recordingId: z.string().uuid(),
|
||||||
|
});
|
||||||
|
|
||||||
|
router.get('/me/list', requireDeviceAuth, async (req, res) => {
|
||||||
|
const parsed = listSchema.safeParse(req.query);
|
||||||
|
|
||||||
|
if (!parsed.success) {
|
||||||
|
res.status(400).json({ message: 'Invalid query params', errors: parsed.error.flatten() });
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
const deviceAuth = req.deviceAuth;
|
||||||
|
|
||||||
|
if (!deviceAuth) {
|
||||||
|
res.status(401).json({ message: 'Unauthorized' });
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
const result = await db.query.recordings.findMany({
|
||||||
|
where: eq(recordings.ownerUserId, deviceAuth.userId),
|
||||||
|
orderBy: [desc(recordings.createdAt)],
|
||||||
|
limit: parsed.data.limit,
|
||||||
|
});
|
||||||
|
|
||||||
|
const filtered = parsed.data.status ? result.filter((recording) => recording.status === parsed.data.status) : result;
|
||||||
|
|
||||||
|
res.json({ count: filtered.length, recordings: filtered });
|
||||||
|
});
|
||||||
|
|
||||||
|
router.post('/:recordingId/finalize', requireDeviceAuth, async (req, res) => {
|
||||||
|
const parsedParams = recordingParamSchema.safeParse(req.params);
|
||||||
|
|
||||||
|
if (!parsedParams.success) {
|
||||||
|
res.status(400).json({ message: 'Invalid recordingId', errors: parsedParams.error.flatten() });
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
const parsed = finalizeSchema.safeParse(req.body ?? {});
|
||||||
|
|
||||||
|
if (!parsed.success) {
|
||||||
|
res.status(400).json({ message: 'Invalid request body', errors: parsed.error.flatten() });
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
const deviceAuth = req.deviceAuth;
|
||||||
|
|
||||||
|
if (!deviceAuth) {
|
||||||
|
res.status(401).json({ message: 'Unauthorized' });
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
const recording = await db.query.recordings.findFirst({
|
||||||
|
where: and(eq(recordings.id, parsedParams.data.recordingId), eq(recordings.ownerUserId, deviceAuth.userId)),
|
||||||
|
});
|
||||||
|
|
||||||
|
if (!recording) {
|
||||||
|
res.status(404).json({ message: 'Recording not found' });
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (recording.cameraDeviceId !== deviceAuth.deviceId) {
|
||||||
|
res.status(403).json({ message: 'Only camera device can finalize this recording' });
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
const now = new Date();
|
||||||
|
|
||||||
|
const [updated] = await db
|
||||||
|
.update(recordings)
|
||||||
|
.set({
|
||||||
|
objectKey: parsed.data.objectKey,
|
||||||
|
bucket: parsed.data.bucket,
|
||||||
|
durationSeconds: parsed.data.durationSeconds,
|
||||||
|
sizeBytes: parsed.data.sizeBytes,
|
||||||
|
status: 'ready',
|
||||||
|
availableAt: now,
|
||||||
|
updatedAt: now,
|
||||||
|
error: null,
|
||||||
|
})
|
||||||
|
.where(eq(recordings.id, recording.id))
|
||||||
|
.returning();
|
||||||
|
|
||||||
|
res.json({ message: 'Recording finalized', recording: updated });
|
||||||
|
});
|
||||||
|
|
||||||
|
router.get('/:recordingId/download-url', requireDeviceAuth, async (req, res) => {
|
||||||
|
const parsedParams = recordingParamSchema.safeParse(req.params);
|
||||||
|
|
||||||
|
if (!parsedParams.success) {
|
||||||
|
res.status(400).json({ message: 'Invalid recordingId', errors: parsedParams.error.flatten() });
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
const deviceAuth = req.deviceAuth;
|
||||||
|
|
||||||
|
if (!deviceAuth) {
|
||||||
|
res.status(401).json({ message: 'Unauthorized' });
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
const recording = await db.query.recordings.findFirst({
|
||||||
|
where: and(eq(recordings.id, parsedParams.data.recordingId), eq(recordings.ownerUserId, deviceAuth.userId)),
|
||||||
|
});
|
||||||
|
|
||||||
|
if (!recording) {
|
||||||
|
res.status(404).json({ message: 'Recording not found' });
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
const canAccess = recording.requesterDeviceId === deviceAuth.deviceId || recording.cameraDeviceId === deviceAuth.deviceId;
|
||||||
|
|
||||||
|
if (!canAccess) {
|
||||||
|
res.status(403).json({ message: 'Device cannot access this recording' });
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (recording.status !== 'ready' || !recording.objectKey || !recording.bucket) {
|
||||||
|
res.status(409).json({ message: 'Recording is not available yet' });
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
const downloadUrl = await minioClient.presignedGetObject(
|
||||||
|
recording.bucket,
|
||||||
|
recording.objectKey,
|
||||||
|
minioPresignedExpirySeconds,
|
||||||
|
);
|
||||||
|
|
||||||
|
res.json({
|
||||||
|
recordingId: recording.id,
|
||||||
|
objectKey: recording.objectKey,
|
||||||
|
bucket: recording.bucket,
|
||||||
|
downloadUrl,
|
||||||
|
expiresInSeconds: minioPresignedExpirySeconds,
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
// Internal helper used by stream lifecycle to create recording placeholder rows.
|
||||||
|
export const createRecordingForStream = async (streamSessionId: string): Promise<void> => {
|
||||||
|
const stream = await db.query.streamSessions.findFirst({ where: eq(streamSessions.id, streamSessionId) });
|
||||||
|
|
||||||
|
if (!stream) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
const existing = await db.query.recordings.findFirst({ where: eq(recordings.streamSessionId, stream.id) });
|
||||||
|
|
||||||
|
if (existing) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
await db.insert(recordings).values({
|
||||||
|
ownerUserId: stream.ownerUserId,
|
||||||
|
streamSessionId: stream.id,
|
||||||
|
cameraDeviceId: stream.cameraDeviceId,
|
||||||
|
requesterDeviceId: stream.requesterDeviceId,
|
||||||
|
status: 'awaiting_upload',
|
||||||
|
updatedAt: new Date(),
|
||||||
|
});
|
||||||
|
};
|
||||||
|
|
||||||
|
export default router;
|
||||||
@@ -9,6 +9,7 @@ import { deviceCommands, deviceLinks, devices, streamSessions } from '../db/sche
|
|||||||
import { mediaProvider } from '../media/service';
|
import { mediaProvider } from '../media/service';
|
||||||
import { requireDeviceAuth } from '../middleware/device-auth';
|
import { requireDeviceAuth } from '../middleware/device-auth';
|
||||||
import { dispatchCommandById, sendRealtimeToDevice } from '../realtime/gateway';
|
import { dispatchCommandById, sendRealtimeToDevice } from '../realtime/gateway';
|
||||||
|
import { createRecordingForStream } from './recordings';
|
||||||
|
|
||||||
const router = Router();
|
const router = Router();
|
||||||
|
|
||||||
@@ -402,6 +403,8 @@ router.post('/:streamSessionId/end', requireDeviceAuth, async (req, res) => {
|
|||||||
.where(eq(streamSessions.id, session.id))
|
.where(eq(streamSessions.id, session.id))
|
||||||
.returning();
|
.returning();
|
||||||
|
|
||||||
|
await createRecordingForStream(session.id);
|
||||||
|
|
||||||
sendRealtimeToDevice(session.requesterDeviceId, 'stream:ended', {
|
sendRealtimeToDevice(session.requesterDeviceId, 'stream:ended', {
|
||||||
streamSessionId: session.id,
|
streamSessionId: session.id,
|
||||||
status: parsed.data.reason,
|
status: parsed.data.reason,
|
||||||
|
|||||||
36
Backend/workers/recordings.ts
Normal file
36
Backend/workers/recordings.ts
Normal file
@@ -0,0 +1,36 @@
|
|||||||
|
import { and, eq, lt } from 'drizzle-orm';
|
||||||
|
|
||||||
|
import { db } from '../db/client';
|
||||||
|
import { recordings } from '../db/schema';
|
||||||
|
|
||||||
|
const STALE_RECORDING_SECONDS = Number(process.env.RECORDING_STALE_SECONDS ?? 60 * 30);
|
||||||
|
|
||||||
|
export const startRecordingsWorker = (): void => {
|
||||||
|
const intervalMs = Number(process.env.RECORDING_WORKER_INTERVAL_MS ?? 30_000);
|
||||||
|
|
||||||
|
setInterval(() => {
|
||||||
|
reconcileStaleRecordings().catch((error) => {
|
||||||
|
console.error('recordings worker failed', error);
|
||||||
|
});
|
||||||
|
}, intervalMs);
|
||||||
|
};
|
||||||
|
|
||||||
|
const reconcileStaleRecordings = async (): Promise<void> => {
|
||||||
|
const staleBefore = new Date(Date.now() - STALE_RECORDING_SECONDS * 1000);
|
||||||
|
|
||||||
|
const stale = await db.query.recordings.findMany({
|
||||||
|
where: and(eq(recordings.status, 'awaiting_upload'), lt(recordings.createdAt, staleBefore)),
|
||||||
|
limit: 100,
|
||||||
|
});
|
||||||
|
|
||||||
|
for (const recording of stale) {
|
||||||
|
await db
|
||||||
|
.update(recordings)
|
||||||
|
.set({
|
||||||
|
status: 'failed',
|
||||||
|
error: 'recording upload timeout',
|
||||||
|
updatedAt: new Date(),
|
||||||
|
})
|
||||||
|
.where(eq(recordings.id, recording.id));
|
||||||
|
}
|
||||||
|
};
|
||||||
Reference in New Issue
Block a user