From bccc049fc3e00640ea6c1b0b86f1a7be7f4c8875 Mon Sep 17 00:00:00 2001 From: Matiss Jurevics Date: Sat, 24 Jan 2026 12:35:00 +0000 Subject: [PATCH] feat(recordings): add phase6 recording finalization pipeline and simulator support --- Backend/db/schema.ts | 19 ++ Backend/drizzle/0009_recordings_pipeline.sql | 23 +++ Backend/drizzle/meta/_journal.json | 7 + Backend/index.ts | 4 + Backend/public/mobile-sim.html | 61 +++++- Backend/routes/recordings.ts | 185 +++++++++++++++++++ Backend/routes/streams.ts | 3 + Backend/workers/recordings.ts | 36 ++++ 8 files changed, 337 insertions(+), 1 deletion(-) create mode 100644 Backend/drizzle/0009_recordings_pipeline.sql create mode 100644 Backend/routes/recordings.ts create mode 100644 Backend/workers/recordings.ts diff --git a/Backend/db/schema.ts b/Backend/db/schema.ts index 0b878d7..f8c6698 100644 --- a/Backend/db/schema.ts +++ b/Backend/db/schema.ts @@ -77,6 +77,24 @@ export const streamSessions = pgTable('stream_sessions', { updatedAt: timestamp('updated_at', { withTimezone: true }).defaultNow().notNull(), }); +export const recordings = pgTable('recordings', { + id: uuid('id').defaultRandom().primaryKey(), + ownerUserId: uuid('owner_user_id').notNull().references(() => users.id), + streamSessionId: uuid('stream_session_id').notNull().references(() => streamSessions.id), + cameraDeviceId: uuid('camera_device_id').notNull().references(() => devices.id), + requesterDeviceId: uuid('requester_device_id').notNull().references(() => devices.id), + eventId: uuid('event_id').references(() => events.id), + objectKey: varchar('object_key', { length: 1024 }), + bucket: varchar('bucket', { length: 255 }), + durationSeconds: integer('duration_seconds'), + sizeBytes: integer('size_bytes'), + status: varchar('status', { length: 32 }).default('awaiting_upload').notNull(), + availableAt: timestamp('available_at', { withTimezone: true }), + error: text('error'), + createdAt: timestamp('created_at', { withTimezone: true }).defaultNow().notNull(), + updatedAt: timestamp('updated_at', { withTimezone: true }).defaultNow().notNull(), +}); + export const events = pgTable('events', { id: uuid('id').defaultRandom().primaryKey(), userId: uuid('user_id').notNull().references(() => users.id), @@ -159,6 +177,7 @@ export const schema = { deviceLinks, deviceCommands, streamSessions, + recordings, events, videos, notifications, diff --git a/Backend/drizzle/0009_recordings_pipeline.sql b/Backend/drizzle/0009_recordings_pipeline.sql new file mode 100644 index 0000000..598eacb --- /dev/null +++ b/Backend/drizzle/0009_recordings_pipeline.sql @@ -0,0 +1,23 @@ +CREATE TABLE "recordings" ( + "id" uuid PRIMARY KEY DEFAULT gen_random_uuid() NOT NULL, + "owner_user_id" uuid NOT NULL, + "stream_session_id" uuid NOT NULL, + "camera_device_id" uuid NOT NULL, + "requester_device_id" uuid NOT NULL, + "event_id" uuid, + "object_key" varchar(1024), + "bucket" varchar(255), + "duration_seconds" integer, + "size_bytes" integer, + "status" varchar(32) DEFAULT 'awaiting_upload' NOT NULL, + "available_at" timestamp with time zone, + "error" text, + "created_at" timestamp with time zone DEFAULT now() NOT NULL, + "updated_at" timestamp with time zone DEFAULT now() NOT NULL +); +--> statement-breakpoint +ALTER TABLE "recordings" ADD CONSTRAINT "recordings_owner_user_id_users_id_fk" FOREIGN KEY ("owner_user_id") REFERENCES "public"."users"("id") ON DELETE no action ON UPDATE no action;--> statement-breakpoint +ALTER TABLE "recordings" ADD CONSTRAINT "recordings_stream_session_id_stream_sessions_id_fk" FOREIGN KEY ("stream_session_id") REFERENCES "public"."stream_sessions"("id") ON DELETE no action ON UPDATE no action;--> statement-breakpoint +ALTER TABLE "recordings" ADD CONSTRAINT "recordings_camera_device_id_devices_id_fk" FOREIGN KEY ("camera_device_id") REFERENCES "public"."devices"("id") ON DELETE no action ON UPDATE no action;--> statement-breakpoint +ALTER TABLE "recordings" ADD CONSTRAINT "recordings_requester_device_id_devices_id_fk" FOREIGN KEY ("requester_device_id") REFERENCES "public"."devices"("id") ON DELETE no action ON UPDATE no action;--> statement-breakpoint +ALTER TABLE "recordings" ADD CONSTRAINT "recordings_event_id_events_id_fk" FOREIGN KEY ("event_id") REFERENCES "public"."events"("id") ON DELETE no action ON UPDATE no action; diff --git a/Backend/drizzle/meta/_journal.json b/Backend/drizzle/meta/_journal.json index 81506ec..4bb9cd9 100644 --- a/Backend/drizzle/meta/_journal.json +++ b/Backend/drizzle/meta/_journal.json @@ -64,6 +64,13 @@ "when": 1770415956419, "tag": "0008_media_plane_columns", "breakpoints": true + }, + { + "idx": 9, + "version": "7", + "when": 1770416956419, + "tag": "0009_recordings_pipeline", + "breakpoints": true } ] } diff --git a/Backend/index.ts b/Backend/index.ts index 499c238..27075d6 100644 --- a/Backend/index.ts +++ b/Backend/index.ts @@ -12,8 +12,10 @@ import deviceLinksRoutes from './routes/device-links'; import commandsRoutes from './routes/commands'; import eventsRoutes from './routes/events'; import streamsRoutes from './routes/streams'; +import recordingsRoutes from './routes/recordings'; import { setupRealtimeGateway } from './realtime/gateway'; import { ensureMinioBucket } from './utils/minio'; +import { startRecordingsWorker } from './workers/recordings'; const app = express(); const openApiDocument = buildOpenApiDocument(); @@ -39,6 +41,7 @@ app.use('/device-links', deviceLinksRoutes); app.use('/commands', commandsRoutes); app.use('/events', eventsRoutes); app.use('/streams', streamsRoutes); +app.use('/recordings', recordingsRoutes); app.use((err: unknown, _req: express.Request, res: express.Response, _next: express.NextFunction) => { console.error(err); @@ -57,6 +60,7 @@ const start = async () => { } setupRealtimeGateway(server); + startRecordingsWorker(); server.listen(port, () => { console.log(`Server is running on port ${port}`); diff --git a/Backend/public/mobile-sim.html b/Backend/public/mobile-sim.html index 46f93ae..b123096 100644 --- a/Backend/public/mobile-sim.html +++ b/Backend/public/mobile-sim.html @@ -159,6 +159,8 @@ + +

         
 
@@ -167,6 +169,7 @@
           
           
           
+          
           

         
       
@@ -185,6 +188,7 @@
         socket: null,
         lastMotionEventId: null,
         lastStreamSessionId: null,
+        lastRecordingId: null,
       };
 
       const $ = (id) => document.getElementById(id);
@@ -203,7 +207,11 @@
       const render = () => {
         $('deviceState').textContent = JSON.stringify({ device: state.device, hasToken: Boolean(state.deviceToken) }, null, 2);
         $('clientState').textContent = JSON.stringify({ lastStreamSessionId: state.lastStreamSessionId }, null, 2);
-        $('cameraState').textContent = JSON.stringify({ lastMotionEventId: state.lastMotionEventId }, null, 2);
+        $('cameraState').textContent = JSON.stringify(
+          { lastMotionEventId: state.lastMotionEventId, lastRecordingId: state.lastRecordingId },
+          null,
+          2,
+        );
       };
 
       const authFetch = async (url, options = {}) => {
@@ -425,6 +433,29 @@
         }
       });
 
+      $('listRecordingsBtn').addEventListener('click', async () => {
+        try {
+          const payload = await deviceFetch('/recordings/me/list');
+          if (payload.recordings?.length > 0) {
+            state.lastRecordingId = payload.recordings[0].id;
+            render();
+          }
+          log('recordings', payload);
+        } catch (error) {
+          log('list recordings failed', { error: error.message });
+        }
+      });
+
+      $('downloadLatestRecordingBtn').addEventListener('click', async () => {
+        try {
+          if (!state.lastRecordingId) throw new Error('No recording id available');
+          const payload = await deviceFetch(`/recordings/${state.lastRecordingId}/download-url`);
+          log('recording download url', payload);
+        } catch (error) {
+          log('recording download failed', { error: error.message });
+        }
+      });
+
       $('fetchSubscribeBtn').addEventListener('click', async () => {
         try {
           if (!state.lastStreamSessionId) throw new Error('No known stream session');
@@ -475,6 +506,34 @@
         }
       });
 
+      $('finalizeRecordingBtn').addEventListener('click', async () => {
+        try {
+          if (!state.lastStreamSessionId) throw new Error('No stream session yet');
+
+          const listPayload = await deviceFetch('/recordings/me/list');
+          const target = listPayload.recordings?.find((r) => r.streamSessionId === state.lastStreamSessionId) ?? listPayload.recordings?.[0];
+
+          if (!target) throw new Error('No recording placeholder exists');
+
+          state.lastRecordingId = target.id;
+          render();
+
+          const payload = await deviceFetch(`/recordings/${target.id}/finalize`, {
+            method: 'POST',
+            body: JSON.stringify({
+              objectKey: `recordings/${target.id}.mp4`,
+              bucket: 'videos',
+              durationSeconds: 12,
+              sizeBytes: 1024 * 1024 * 8,
+            }),
+          });
+
+          log('recording finalized', payload);
+        } catch (error) {
+          log('finalize recording failed', { error: error.message });
+        }
+      });
+
       render();
     
   
diff --git a/Backend/routes/recordings.ts b/Backend/routes/recordings.ts
new file mode 100644
index 0000000..af3a7d2
--- /dev/null
+++ b/Backend/routes/recordings.ts
@@ -0,0 +1,185 @@
+import { and, desc, eq } from 'drizzle-orm';
+import { Router } from 'express';
+import { z } from 'zod';
+
+import { db } from '../db/client';
+import { recordings, streamSessions } from '../db/schema';
+import { requireDeviceAuth } from '../middleware/device-auth';
+import { minioBucket, minioClient, minioPresignedExpirySeconds } from '../utils/minio';
+
+const router = Router();
+
+const listSchema = z.object({
+  status: z.string().optional(),
+  limit: z.coerce.number().int().min(1).max(100).default(25),
+});
+
+const finalizeSchema = z.object({
+  objectKey: z.string().trim().min(1),
+  bucket: z.string().trim().min(1).default(minioBucket),
+  durationSeconds: z.coerce.number().int().nonnegative().optional(),
+  sizeBytes: z.coerce.number().int().nonnegative().optional(),
+});
+
+const recordingParamSchema = z.object({
+  recordingId: z.string().uuid(),
+});
+
+router.get('/me/list', requireDeviceAuth, async (req, res) => {
+  const parsed = listSchema.safeParse(req.query);
+
+  if (!parsed.success) {
+    res.status(400).json({ message: 'Invalid query params', errors: parsed.error.flatten() });
+    return;
+  }
+
+  const deviceAuth = req.deviceAuth;
+
+  if (!deviceAuth) {
+    res.status(401).json({ message: 'Unauthorized' });
+    return;
+  }
+
+  const result = await db.query.recordings.findMany({
+    where: eq(recordings.ownerUserId, deviceAuth.userId),
+    orderBy: [desc(recordings.createdAt)],
+    limit: parsed.data.limit,
+  });
+
+  const filtered = parsed.data.status ? result.filter((recording) => recording.status === parsed.data.status) : result;
+
+  res.json({ count: filtered.length, recordings: filtered });
+});
+
+router.post('/:recordingId/finalize', requireDeviceAuth, async (req, res) => {
+  const parsedParams = recordingParamSchema.safeParse(req.params);
+
+  if (!parsedParams.success) {
+    res.status(400).json({ message: 'Invalid recordingId', errors: parsedParams.error.flatten() });
+    return;
+  }
+
+  const parsed = finalizeSchema.safeParse(req.body ?? {});
+
+  if (!parsed.success) {
+    res.status(400).json({ message: 'Invalid request body', errors: parsed.error.flatten() });
+    return;
+  }
+
+  const deviceAuth = req.deviceAuth;
+
+  if (!deviceAuth) {
+    res.status(401).json({ message: 'Unauthorized' });
+    return;
+  }
+
+  const recording = await db.query.recordings.findFirst({
+    where: and(eq(recordings.id, parsedParams.data.recordingId), eq(recordings.ownerUserId, deviceAuth.userId)),
+  });
+
+  if (!recording) {
+    res.status(404).json({ message: 'Recording not found' });
+    return;
+  }
+
+  if (recording.cameraDeviceId !== deviceAuth.deviceId) {
+    res.status(403).json({ message: 'Only camera device can finalize this recording' });
+    return;
+  }
+
+  const now = new Date();
+
+  const [updated] = await db
+    .update(recordings)
+    .set({
+      objectKey: parsed.data.objectKey,
+      bucket: parsed.data.bucket,
+      durationSeconds: parsed.data.durationSeconds,
+      sizeBytes: parsed.data.sizeBytes,
+      status: 'ready',
+      availableAt: now,
+      updatedAt: now,
+      error: null,
+    })
+    .where(eq(recordings.id, recording.id))
+    .returning();
+
+  res.json({ message: 'Recording finalized', recording: updated });
+});
+
+router.get('/:recordingId/download-url', requireDeviceAuth, async (req, res) => {
+  const parsedParams = recordingParamSchema.safeParse(req.params);
+
+  if (!parsedParams.success) {
+    res.status(400).json({ message: 'Invalid recordingId', errors: parsedParams.error.flatten() });
+    return;
+  }
+
+  const deviceAuth = req.deviceAuth;
+
+  if (!deviceAuth) {
+    res.status(401).json({ message: 'Unauthorized' });
+    return;
+  }
+
+  const recording = await db.query.recordings.findFirst({
+    where: and(eq(recordings.id, parsedParams.data.recordingId), eq(recordings.ownerUserId, deviceAuth.userId)),
+  });
+
+  if (!recording) {
+    res.status(404).json({ message: 'Recording not found' });
+    return;
+  }
+
+  const canAccess = recording.requesterDeviceId === deviceAuth.deviceId || recording.cameraDeviceId === deviceAuth.deviceId;
+
+  if (!canAccess) {
+    res.status(403).json({ message: 'Device cannot access this recording' });
+    return;
+  }
+
+  if (recording.status !== 'ready' || !recording.objectKey || !recording.bucket) {
+    res.status(409).json({ message: 'Recording is not available yet' });
+    return;
+  }
+
+  const downloadUrl = await minioClient.presignedGetObject(
+    recording.bucket,
+    recording.objectKey,
+    minioPresignedExpirySeconds,
+  );
+
+  res.json({
+    recordingId: recording.id,
+    objectKey: recording.objectKey,
+    bucket: recording.bucket,
+    downloadUrl,
+    expiresInSeconds: minioPresignedExpirySeconds,
+  });
+});
+
+// Internal helper used by stream lifecycle to create recording placeholder rows.
+export const createRecordingForStream = async (streamSessionId: string): Promise => {
+  const stream = await db.query.streamSessions.findFirst({ where: eq(streamSessions.id, streamSessionId) });
+
+  if (!stream) {
+    return;
+  }
+
+  const existing = await db.query.recordings.findFirst({ where: eq(recordings.streamSessionId, stream.id) });
+
+  if (existing) {
+    return;
+  }
+
+  await db.insert(recordings).values({
+    ownerUserId: stream.ownerUserId,
+    streamSessionId: stream.id,
+    cameraDeviceId: stream.cameraDeviceId,
+    requesterDeviceId: stream.requesterDeviceId,
+    status: 'awaiting_upload',
+    updatedAt: new Date(),
+  });
+};
+
+export default router;
diff --git a/Backend/routes/streams.ts b/Backend/routes/streams.ts
index fae2fd8..1f5d524 100644
--- a/Backend/routes/streams.ts
+++ b/Backend/routes/streams.ts
@@ -9,6 +9,7 @@ import { deviceCommands, deviceLinks, devices, streamSessions } from '../db/sche
 import { mediaProvider } from '../media/service';
 import { requireDeviceAuth } from '../middleware/device-auth';
 import { dispatchCommandById, sendRealtimeToDevice } from '../realtime/gateway';
+import { createRecordingForStream } from './recordings';
 
 const router = Router();
 
@@ -402,6 +403,8 @@ router.post('/:streamSessionId/end', requireDeviceAuth, async (req, res) => {
     .where(eq(streamSessions.id, session.id))
     .returning();
 
+  await createRecordingForStream(session.id);
+
   sendRealtimeToDevice(session.requesterDeviceId, 'stream:ended', {
     streamSessionId: session.id,
     status: parsed.data.reason,
diff --git a/Backend/workers/recordings.ts b/Backend/workers/recordings.ts
new file mode 100644
index 0000000..a4abc3d
--- /dev/null
+++ b/Backend/workers/recordings.ts
@@ -0,0 +1,36 @@
+import { and, eq, lt } from 'drizzle-orm';
+
+import { db } from '../db/client';
+import { recordings } from '../db/schema';
+
+const STALE_RECORDING_SECONDS = Number(process.env.RECORDING_STALE_SECONDS ?? 60 * 30);
+
+export const startRecordingsWorker = (): void => {
+  const intervalMs = Number(process.env.RECORDING_WORKER_INTERVAL_MS ?? 30_000);
+
+  setInterval(() => {
+    reconcileStaleRecordings().catch((error) => {
+      console.error('recordings worker failed', error);
+    });
+  }, intervalMs);
+};
+
+const reconcileStaleRecordings = async (): Promise => {
+  const staleBefore = new Date(Date.now() - STALE_RECORDING_SECONDS * 1000);
+
+  const stale = await db.query.recordings.findMany({
+    where: and(eq(recordings.status, 'awaiting_upload'), lt(recordings.createdAt, staleBefore)),
+    limit: 100,
+  });
+
+  for (const recording of stale) {
+    await db
+      .update(recordings)
+      .set({
+        status: 'failed',
+        error: 'recording upload timeout',
+        updatedAt: new Date(),
+      })
+      .where(eq(recordings.id, recording.id));
+  }
+};