import { and, desc, eq } from 'drizzle-orm';
import { Router } from 'express';
import { z } from 'zod';

import { db } from '../db/client';
import { recordings, streamSessions } from '../db/schema';
import { requireDeviceAuth } from '../middleware/device-auth';
import { writeAuditLog } from '../services/audit';
import { ensureMinioBucket, minioBucket, minioClient, minioPresignedExpirySeconds } from '../utils/minio';

const router = Router();

/** Query parameters accepted by `GET /me/list`. */
const listSchema = z.object({
  status: z.string().optional(),
  limit: z.coerce.number().int().min(1).max(100).default(25),
});

/** Request body accepted by `POST /:recordingId/finalize`. */
const finalizeSchema = z.object({
  objectKey: z.string().trim().min(1),
  bucket: z.string().trim().min(1).default(minioBucket),
  durationSeconds: z.coerce.number().int().nonnegative().optional(),
  sizeBytes: z.coerce.number().int().nonnegative().optional(),
});

/** Route parameters shared by the `/:recordingId/*` endpoints. */
const recordingParamSchema = z.object({
  recordingId: z.string().uuid(),
});

/**
 * Reports whether a storage-client error means "the object does not exist".
 *
 * @param error - Value caught from a storage client call.
 * @returns `true` when `error` is an object whose `code` stringifies to
 *   `NoSuchKey` or `NotFound`; `false` for anything else.
 */
const isMissingStorageObjectError = (error: unknown): boolean => {
  if (!error || typeof error !== 'object') {
    return false;
  }

  const code = 'code' in error ? String((error as { code?: unknown }).code) : '';
  return code === 'NoSuchKey' || code === 'NotFound';
};

/**
 * GET /me/list
 *
 * Lists the authenticated user's recordings, newest first, optionally
 * narrowed to a single `status`.
 */
router.get('/me/list', requireDeviceAuth, async (req, res) => {
  const parsed = listSchema.safeParse(req.query);
  if (!parsed.success) {
    res.status(400).json({ message: 'Invalid query params', errors: parsed.error.flatten() });
    return;
  }

  const deviceAuth = req.deviceAuth;
  if (!deviceAuth) {
    res.status(401).json({ message: 'Unauthorized' });
    return;
  }

  const result = await db.query.recordings.findMany({
    where: eq(recordings.ownerUserId, deviceAuth.userId),
    orderBy: [desc(recordings.createdAt)],
    limit: parsed.data.limit,
  });

  // NOTE(review): the status filter runs in memory AFTER the SQL `limit`, so a
  // filtered request can return fewer than `limit` rows even when more matching
  // rows exist. Moving the filter into `where` would fix that, but depends on
  // the column type of `recordings.status` (not visible here) — confirm first.
  const { status } = parsed.data;
  const filtered = status ? result.filter((recording) => recording.status === status) : result;

  res.json({ count: filtered.length, recordings: filtered });
});

/**
 * POST /:recordingId/finalize
 *
 * Marks a recording as `ready` once its object is present in storage. Only the
 * camera device attached to the recording may finalize it. For object keys
 * under `sim/`, a JSON placeholder object is uploaded when none exists.
 *
 * Responses: 400 invalid input, 401 unauthenticated, 403 wrong device,
 * 404 unknown recording, 409 object missing from storage, 200 on success.
 */
router.post('/:recordingId/finalize', requireDeviceAuth, async (req, res) => {
  const parsedParams = recordingParamSchema.safeParse(req.params);
  if (!parsedParams.success) {
    res.status(400).json({ message: 'Invalid recordingId', errors: parsedParams.error.flatten() });
    return;
  }

  const parsed = finalizeSchema.safeParse(req.body ?? {});
  if (!parsed.success) {
    res.status(400).json({ message: 'Invalid request body', errors: parsed.error.flatten() });
    return;
  }

  const deviceAuth = req.deviceAuth;
  if (!deviceAuth) {
    res.status(401).json({ message: 'Unauthorized' });
    return;
  }

  const recording = await db.query.recordings.findFirst({
    where: and(eq(recordings.id, parsedParams.data.recordingId), eq(recordings.ownerUserId, deviceAuth.userId)),
  });
  if (!recording) {
    res.status(404).json({ message: 'Recording not found' });
    return;
  }

  if (recording.cameraDeviceId !== deviceAuth.deviceId) {
    res.status(403).json({ message: 'Only camera device can finalize this recording' });
    return;
  }

  const now = new Date();
  // NOTE(review): `bucket` and `objectKey` come straight from the request body
  // and are not checked against this recording or its owner; the download-url
  // endpoint later presigns whatever is stored here. Consider constraining them
  // to a server-derived prefix — confirm against the upload flow.
  const { bucket, objectKey } = parsed.data;

  await ensureMinioBucket();

  try {
    await minioClient.statObject(bucket, objectKey);
  } catch (error) {
    // Only a "missing object" answer is handled here. Any other storage failure
    // (network, permissions, ...) must propagate instead of being papered over
    // by a placeholder write that could clobber an existing object.
    if (!isMissingStorageObjectError(error)) {
      throw error;
    }

    if (!objectKey.startsWith('sim/')) {
      res.status(409).json({ message: 'Recording object does not exist in storage yet' });
      return;
    }

    // Keys under `sim/` get a small JSON stand-in so the recording can still be
    // finalized and downloaded without a real upload.
    const placeholder = Buffer.from(
      JSON.stringify({
        message: 'simulated recording placeholder',
        recordingId: recording.id,
        streamSessionId: recording.streamSessionId,
        createdAt: now.toISOString(),
      }),
      'utf8',
    );
    await minioClient.putObject(bucket, objectKey, placeholder, placeholder.byteLength, {
      'Content-Type': 'application/json',
    });
  }

  const [updated] = await db
    .update(recordings)
    .set({
      objectKey,
      bucket,
      durationSeconds: parsed.data.durationSeconds,
      sizeBytes: parsed.data.sizeBytes,
      status: 'ready',
      availableAt: now,
      updatedAt: now,
      error: null,
    })
    .where(eq(recordings.id, recording.id))
    .returning();

  // The row can disappear between the lookup above and this update; do not
  // report success with an undefined recording in that case.
  if (!updated) {
    res.status(404).json({ message: 'Recording not found' });
    return;
  }

  res.json({ message: 'Recording finalized', recording: updated });

  // The response has already been sent, so an audit failure can no longer be
  // reported to the client; log it rather than rejecting after headers are out.
  try {
    await writeAuditLog({
      ownerUserId: recording.ownerUserId,
      actorDeviceId: recording.cameraDeviceId,
      action: 'recording.finalized',
      targetType: 'recording',
      targetId: recording.id,
      metadata: { objectKey, bucket },
      ipAddress: req.ip,
    });
  } catch (auditError) {
    console.error('Failed to write audit log for recording.finalized', auditError);
  }
});

/**
 * GET /:recordingId/download-url
 *
 * Issues a presigned GET URL for a `ready` recording. Only the requester or
 * camera device attached to the recording may obtain one.
 *
 * Responses: 400 invalid id, 401 unauthenticated, 403 wrong device,
 * 404 unknown recording, 409 not ready / object missing, 200 on success.
 */
router.get('/:recordingId/download-url', requireDeviceAuth, async (req, res) => {
  const parsedParams = recordingParamSchema.safeParse(req.params);
  if (!parsedParams.success) {
    res.status(400).json({ message: 'Invalid recordingId', errors: parsedParams.error.flatten() });
    return;
  }

  const deviceAuth = req.deviceAuth;
  if (!deviceAuth) {
    res.status(401).json({ message: 'Unauthorized' });
    return;
  }

  const recording = await db.query.recordings.findFirst({
    where: and(eq(recordings.id, parsedParams.data.recordingId), eq(recordings.ownerUserId, deviceAuth.userId)),
  });
  if (!recording) {
    res.status(404).json({ message: 'Recording not found' });
    return;
  }

  const canAccess =
    recording.requesterDeviceId === deviceAuth.deviceId || recording.cameraDeviceId === deviceAuth.deviceId;
  if (!canAccess) {
    res.status(403).json({ message: 'Device cannot access this recording' });
    return;
  }

  if (recording.status !== 'ready' || !recording.objectKey || !recording.bucket) {
    res.status(409).json({ message: 'Recording is not available yet' });
    return;
  }

  // Verify the object still exists so the client is not handed a URL that
  // would fail on first use.
  try {
    await minioClient.statObject(recording.bucket, recording.objectKey);
  } catch (error) {
    if (isMissingStorageObjectError(error)) {
      res.status(409).json({ message: 'Recording file is missing from storage' });
      return;
    }
    throw error;
  }

  const downloadUrl = await minioClient.presignedGetObject(
    recording.bucket,
    recording.objectKey,
    minioPresignedExpirySeconds,
  );

  res.json({
    recordingId: recording.id,
    objectKey: recording.objectKey,
    bucket: recording.bucket,
    downloadUrl,
    expiresInSeconds: minioPresignedExpirySeconds,
  });

  // The response has already been sent; an audit failure is logged instead of
  // surfacing as a handler rejection after headers are out.
  try {
    await writeAuditLog({
      ownerUserId: recording.ownerUserId,
      actorDeviceId: deviceAuth.deviceId,
      action: 'recording.download_url_issued',
      targetType: 'recording',
      targetId: recording.id,
      metadata: { objectKey: recording.objectKey, bucket: recording.bucket },
      ipAddress: req.ip,
    });
  } catch (auditError) {
    console.error('Failed to write audit log for recording.download_url_issued', auditError);
  }
});

/**
 * Internal helper used by stream lifecycle to create recording placeholder rows.
 *
 * Does nothing when the stream session does not exist or when a recording row
 * for it is already present; otherwise inserts a row in `awaiting_upload`
 * status copied from the stream's owner and device ids.
 *
 * @param streamSessionId - Id of the stream session to create a recording for.
 */
export const createRecordingForStream = async (streamSessionId: string): Promise<void> => {
  const stream = await db.query.streamSessions.findFirst({ where: eq(streamSessions.id, streamSessionId) });
  if (!stream) {
    return;
  }

  // NOTE(review): this check-then-insert is not atomic; two concurrent calls
  // could both insert unless `recordings.streamSessionId` has a unique
  // constraint — confirm against the schema.
  const existing = await db.query.recordings.findFirst({ where: eq(recordings.streamSessionId, stream.id) });
  if (existing) {
    return;
  }

  await db.insert(recordings).values({
    ownerUserId: stream.ownerUserId,
    streamSessionId: stream.id,
    cameraDeviceId: stream.cameraDeviceId,
    requesterDeviceId: stream.requesterDeviceId,
    status: 'awaiting_upload',
    updatedAt: new Date(),
  });
};

export default router;