import { randomUUID } from 'crypto';
import { and, desc, eq, or } from 'drizzle-orm';
import { Router } from 'express';
import type { Request, Response } from 'express';
import { z } from 'zod';
import { db } from '../db/client';
import { mediaMode, simpleStreamingEnabled, streamRecordingEnabled } from '../media/config';
import { commands, deviceLinks, devices, streamSessions } from '../db/schema';
import { createLiveMediaSession, mediaProvider } from '../media/service';
import { sfuService } from '../media/sfu/service';
import { requireDeviceAuth } from '../middleware/device-auth';
import { dispatchCommandById, sendRealtimeToDevice } from '../realtime/gateway';
import { writeAuditLog } from '../services/audit';
import { enqueuePushNotification } from '../services/push';
import {
  createStreamEndedPayload,
  createStreamRequestedPayload,
  createStreamStartedPayload,
  toSimpleStreamSessionResponse,
} from '../streaming/simple';
import { createRecordingForStream } from './recordings';

const router = Router();

// ---------------------------------------------------------------------------
// Request validation schemas
// ---------------------------------------------------------------------------

/** Body of `POST /request`. */
const requestStreamSchema = z.object({
  cameraDeviceId: z.string().uuid(),
  reason: z.enum(['on_demand', 'motion_follow_up']).default('on_demand'),
  metadata: z.record(z.string(), z.unknown()).optional(),
});

/** Body of `POST /:streamSessionId/accept`. */
const acceptStreamSchema = z.object({
  streamKey: z.string().trim().min(1).max(255).optional(),
  metadata: z.record(z.string(), z.unknown()).optional(),
});

/** Body of `POST /:streamSessionId/end`. */
const endStreamSchema = z.object({
  reason: z.enum(['completed', 'cancelled', 'failed']).default('completed'),
});

/** Route params shared by every `/:streamSessionId/...` endpoint. */
const streamParamSchema = z.object({
  streamSessionId: z.string().uuid(),
});

/** Body of the SFU transport endpoints; only validated, the parsed value is not read. */
const sfuTransportRequestSchema = z.object({
  role: z.enum(['camera', 'viewer']).optional(),
});

/** Query string of `GET /me/list`. */
const listSchema = z.object({
  status: z.string().optional(),
  limit: z.coerce.number().int().min(1).max(100).default(25),
});

// ---------------------------------------------------------------------------
// Shared helpers
// ---------------------------------------------------------------------------

/**
 * Returns the device auth context attached to the request, or answers
 * `401 Unauthorized` and returns `null` when it is missing.
 *
 * Callers must stop handling the request when `null` is returned because the
 * response has already been sent.
 */
const ensureDeviceAuth = (req: Request, res: Response) => {
  const deviceAuth = req.deviceAuth;
  if (!deviceAuth) {
    res.status(401).json({ message: 'Unauthorized' });
    return null;
  }
  return deviceAuth;
};

/**
 * Validates the `streamSessionId` route param.
 *
 * @returns the validated id, or `null` after answering `400` with the
 * flattened zod errors.
 */
const parseStreamSessionId = (req: Request, res: Response): string | null => {
  const parsedParams = streamParamSchema.safeParse(req.params);
  if (!parsedParams.success) {
    res.status(400).json({ message: 'Invalid streamSessionId', errors: parsedParams.error.flatten() });
    return null;
  }
  return parsedParams.data.streamSessionId;
};

/** Loads a stream session by id, restricted to rows owned by `ownerUserId`. */
const getOwnedStreamSession = async (streamSessionId: string, ownerUserId: string) =>
  await db.query.streamSessions.findFirst({
    where: and(eq(streamSessions.id, streamSessionId), eq(streamSessions.ownerUserId, ownerUserId)),
  });

/**
 * Writes an audit log entry on a best-effort basis.
 *
 * Every call site runs after the HTTP response has been sent, so a failure
 * can no longer be reported to the client. It is logged instead of being
 * allowed to reject the handler promise.
 */
const recordAudit = async (entry: Parameters<typeof writeAuditLog>[0]): Promise<void> => {
  try {
    await writeAuditLog(entry);
  } catch (error) {
    console.error('Failed writing audit log', error);
  }
};

// ---------------------------------------------------------------------------
// Routes
// ---------------------------------------------------------------------------

/**
 * GET /me/list
 *
 * Lists the newest stream sessions of the authenticated user in which the
 * calling device is either the requester or the camera.
 */
router.get('/me/list', requireDeviceAuth, async (req, res) => {
  const parsed = listSchema.safeParse(req.query);
  if (!parsed.success) {
    res.status(400).json({ message: 'Invalid query params', errors: parsed.error.flatten() });
    return;
  }

  const deviceAuth = ensureDeviceAuth(req, res);
  if (!deviceAuth) return;

  const sessions = await db.query.streamSessions.findMany({
    where: and(
      eq(streamSessions.ownerUserId, deviceAuth.userId),
      or(
        eq(streamSessions.requesterDeviceId, deviceAuth.deviceId),
        eq(streamSessions.cameraDeviceId, deviceAuth.deviceId),
      ),
    ),
    orderBy: [desc(streamSessions.createdAt)],
    limit: parsed.data.limit,
  });

  // NOTE(review): the status filter runs in memory AFTER `limit` was applied,
  // so a filtered request can return fewer rows than `limit` even when more
  // matching sessions exist. Moving it into the query needs the column type
  // of `streamSessions.status` - confirm against the schema.
  const filtered = parsed.data.status
    ? sessions.filter((session) => session.status === parsed.data.status)
    : sessions;

  res.json({ count: filtered.length, streamSessions: filtered });
});

/**
 * POST /request
 *
 * A client device asks one of the user's linked camera devices to start
 * streaming. Creates a `requested` stream session, then either notifies both
 * devices directly (simple streaming) or queues and dispatches a
 * `start_stream` command (legacy mode).
 */
router.post('/request', requireDeviceAuth, async (req, res) => {
  const parsed = requestStreamSchema.safeParse(req.body ?? {});
  if (!parsed.success) {
    res.status(400).json({ message: 'Invalid request body', errors: parsed.error.flatten() });
    return;
  }

  const deviceAuth = ensureDeviceAuth(req, res);
  if (!deviceAuth) return;

  // Both devices must belong to the authenticated user.
  const [sourceDevice, cameraDevice] = await Promise.all([
    db.query.devices.findFirst({
      where: and(eq(devices.id, deviceAuth.deviceId), eq(devices.userId, deviceAuth.userId)),
    }),
    db.query.devices.findFirst({
      where: and(eq(devices.id, parsed.data.cameraDeviceId), eq(devices.userId, deviceAuth.userId)),
    }),
  ]);

  if (!sourceDevice || !cameraDevice) {
    res.status(404).json({ message: 'Source or camera device not found' });
    return;
  }

  if (sourceDevice.role !== 'client') {
    res.status(403).json({ message: 'Only client devices can request on-demand stream sessions' });
    return;
  }

  if (cameraDevice.role !== 'camera') {
    res.status(400).json({ message: 'cameraDeviceId must point to a camera device' });
    return;
  }

  const link = await db.query.deviceLinks.findFirst({
    where: and(
      eq(deviceLinks.ownerUserId, deviceAuth.userId),
      eq(deviceLinks.cameraDeviceId, cameraDevice.id),
      eq(deviceLinks.clientDeviceId, sourceDevice.id),
      eq(deviceLinks.status, 'active'),
    ),
  });

  if (!link) {
    res.status(403).json({ message: 'No active link between requester and camera' });
    return;
  }

  const now = new Date();

  const [session] = await db
    .insert(streamSessions)
    .values({
      ownerUserId: deviceAuth.userId,
      cameraDeviceId: cameraDevice.id,
      requesterDeviceId: sourceDevice.id,
      status: 'requested',
      reason: parsed.data.reason,
      metadata: parsed.data.metadata ?? null,
      mediaProvider: mediaProvider.name,
      updatedAt: now,
    })
    .returning();

  if (!session) {
    res.status(500).json({ message: 'Failed creating stream session' });
    return;
  }

  if (simpleStreamingEnabled) {
    const requestPayload = createStreamRequestedPayload(session);
    const deliveredToCamera = sendRealtimeToDevice(cameraDevice.id, 'stream:requested', requestPayload);

    console.info('[stream.request]', {
      streamSessionId: session.id,
      requesterDeviceId: sourceDevice.id,
      cameraDeviceId: cameraDevice.id,
      mode: 'simple',
    });

    sendRealtimeToDevice(sourceDevice.id, 'stream:requested', requestPayload);

    // Fall back to a push notification when the camera was not reached in realtime.
    if (!deliveredToCamera) {
      await enqueuePushNotification({
        ownerUserId: cameraDevice.userId,
        recipientDeviceId: cameraDevice.id,
        type: 'stream_requested',
        payload: requestPayload,
      });
    }

    res.status(201).json({
      message: 'Stream request sent',
      streamSession: toSimpleStreamSessionResponse(session),
    });

    await recordAudit({
      ownerUserId: sourceDevice.userId,
      actorDeviceId: sourceDevice.id,
      action: 'stream.requested',
      targetType: 'stream_session',
      targetId: session.id,
      metadata: { cameraDeviceId: cameraDevice.id, reason: session.reason, transport: 'webrtc' },
      ipAddress: req.ip,
    });
    return;
  }

  // Legacy mode: the camera is reached through a queued `start_stream` command.
  const [command] = await db
    .insert(commands)
    .values({
      ownerUserId: deviceAuth.userId,
      sourceDeviceId: sourceDevice.id,
      targetDeviceId: cameraDevice.id,
      commandType: 'start_stream',
      payload: {
        streamSessionId: session.id,
        reason: session.reason,
      },
      status: 'queued',
      retryCount: 0,
      lastDispatchedAt: now,
      updatedAt: now,
    })
    .returning();

  if (!command) {
    res.status(500).json({ message: 'Failed creating stream command' });
    return;
  }

  await dispatchCommandById(command.id);

  console.info('[stream.request]', {
    streamSessionId: session.id,
    requesterDeviceId: sourceDevice.id,
    cameraDeviceId: cameraDevice.id,
    mode: 'legacy',
    commandId: command.id,
  });

  // Re-read the command after dispatching; the response prefers this fresh row.
  const refreshedCommand = await db.query.commands.findFirst({ where: eq(commands.id, command.id) });

  const deliveredToRequester = sendRealtimeToDevice(sourceDevice.id, 'stream:requested', {
    streamSessionId: session.id,
    cameraDeviceId: cameraDevice.id,
    status: session.status,
    reason: session.reason,
  });

  if (!deliveredToRequester) {
    await enqueuePushNotification({
      ownerUserId: sourceDevice.userId,
      recipientDeviceId: sourceDevice.id,
      type: 'stream_requested',
      payload: {
        streamSessionId: session.id,
        cameraDeviceId: cameraDevice.id,
      },
    });
  }

  res.status(201).json({
    message: 'Stream request sent',
    streamSession: session,
    command: refreshedCommand ?? command,
  });

  await recordAudit({
    ownerUserId: sourceDevice.userId,
    actorDeviceId: sourceDevice.id,
    action: 'stream.requested',
    targetType: 'stream_session',
    targetId: session.id,
    metadata: { cameraDeviceId: cameraDevice.id, reason: session.reason },
    ipAddress: req.ip,
  });
});

/**
 * POST /:streamSessionId/accept
 *
 * The camera device accepts a `requested`/`starting` session. Marks it as
 * `streaming`, stores the media session details, starts the SFU session when
 * the SFU service is enabled and notifies the requester.
 */
router.post('/:streamSessionId/accept', requireDeviceAuth, async (req, res) => {
  const streamSessionId = parseStreamSessionId(req, res);
  if (!streamSessionId) return;

  const parsed = acceptStreamSchema.safeParse(req.body ?? {});
  if (!parsed.success) {
    res.status(400).json({ message: 'Invalid request body', errors: parsed.error.flatten() });
    return;
  }

  const deviceAuth = ensureDeviceAuth(req, res);
  if (!deviceAuth) return;

  // Only the camera device of the session may accept it.
  const session = await db.query.streamSessions.findFirst({
    where: and(
      eq(streamSessions.id, streamSessionId),
      eq(streamSessions.ownerUserId, deviceAuth.userId),
      eq(streamSessions.cameraDeviceId, deviceAuth.deviceId),
    ),
  });

  if (!session) {
    res.status(404).json({ message: 'Stream session not found for this camera device' });
    return;
  }

  if (session.status !== 'requested' && session.status !== 'starting') {
    res.status(409).json({ message: `Stream session cannot be accepted from status ${session.status}` });
    return;
  }

  const now = new Date();
  const streamKey = parsed.data.streamKey ?? `stream_${session.id}_${randomUUID()}`;

  const mediaSession = await createLiveMediaSession({
    streamSessionId: session.id,
    ownerUserId: session.ownerUserId,
    cameraDeviceId: session.cameraDeviceId,
    requesterDeviceId: session.requesterDeviceId,
  });

  // Without a media session the media-specific columns are cleared and the
  // provider is recorded as 'simple'.
  const [updated] = await db
    .update(streamSessions)
    .set({
      status: 'streaming',
      streamKey: mediaSession ? streamKey : null,
      mediaProvider: mediaSession?.provider ?? 'simple',
      mediaSessionId: mediaSession?.mediaSessionId ?? null,
      publishEndpoint: mediaSession?.publishUrl ?? null,
      subscribeEndpoint: mediaSession?.subscribeUrl ?? null,
      metadata: parsed.data.metadata ?? session.metadata,
      startedAt: now,
      updatedAt: now,
    })
    .where(eq(streamSessions.id, session.id))
    .returning();

  if (!updated) {
    res.status(500).json({ message: 'Failed to update stream session' });
    return;
  }

  if (sfuService) {
    try {
      await sfuService.startSession({
        streamSessionId: updated.id,
        ownerUserId: updated.ownerUserId,
        cameraDeviceId: updated.cameraDeviceId,
        requesterDeviceId: updated.requesterDeviceId,
      });
      await sfuService.setSessionState(updated.id, 'live');
    } catch (error) {
      // NOTE(review): the row was already switched to 'streaming' above and is
      // not reverted on this path - confirm whether that is intended.
      console.error('Failed starting SFU session', error);
      res.status(500).json({ message: 'Failed to initialize SFU session' });
      return;
    }
  }

  const startedPayload = createStreamStartedPayload(updated);

  console.info('[stream.accept]', {
    streamSessionId: updated.id,
    requesterDeviceId: updated.requesterDeviceId,
    cameraDeviceId: updated.cameraDeviceId,
    mode: mediaSession ? 'legacy' : 'simple',
  });

  const deliveredToRequester = sendRealtimeToDevice(session.requesterDeviceId, 'stream:started', startedPayload);
  if (!deliveredToRequester) {
    await enqueuePushNotification({
      ownerUserId: session.ownerUserId,
      recipientDeviceId: session.requesterDeviceId,
      type: 'stream_started',
      payload: startedPayload,
    });
  }

  res.json({ message: 'Stream accepted', streamSession: toSimpleStreamSessionResponse(updated) });

  await recordAudit({
    ownerUserId: session.ownerUserId,
    actorDeviceId: session.cameraDeviceId,
    action: 'stream.accepted',
    targetType: 'stream_session',
    targetId: session.id,
    metadata: mediaSession
      ? { mediaSessionId: updated.mediaSessionId, mediaProvider: updated.mediaProvider }
      : { transport: 'webrtc' },
    ipAddress: req.ip,
  });
});

/**
 * GET /:streamSessionId/publish-credentials
 *
 * Issues media-provider publish credentials to the camera device of a
 * `streaming` session. Answers 409 when simple streaming is enabled.
 */
router.get('/:streamSessionId/publish-credentials', requireDeviceAuth, async (req, res) => {
  const streamSessionId = parseStreamSessionId(req, res);
  if (!streamSessionId) return;

  const deviceAuth = ensureDeviceAuth(req, res);
  if (!deviceAuth) return;

  if (simpleStreamingEnabled) {
    res.status(409).json({ message: 'SIMPLE_STREAMING does not use publish credentials' });
    return;
  }

  const session = await getOwnedStreamSession(streamSessionId, deviceAuth.userId);
  if (!session) {
    res.status(404).json({ message: 'Stream session not found' });
    return;
  }

  if (session.cameraDeviceId !== deviceAuth.deviceId) {
    res.status(403).json({ message: 'Only camera device can request publish credentials' });
    return;
  }

  if (!session.mediaSessionId || session.status !== 'streaming') {
    res.status(409).json({ message: 'Stream session is not ready for publish credentials' });
    return;
  }

  const credentials = await mediaProvider.issuePublishCredentials({
    mediaSessionId: session.mediaSessionId,
    cameraDeviceId: session.cameraDeviceId,
    ownerUserId: session.ownerUserId,
  });

  res.json(credentials);

  await recordAudit({
    ownerUserId: session.ownerUserId,
    actorDeviceId: session.cameraDeviceId,
    action: 'stream.publish_credentials_issued',
    targetType: 'stream_session',
    targetId: session.id,
    metadata: { mediaProvider: credentials.provider },
    ipAddress: req.ip,
  });
});

/**
 * GET /:streamSessionId/subscribe-credentials
 *
 * Issues media-provider subscribe credentials to the requester or camera
 * device of a `streaming` session. Answers 409 when simple streaming is
 * enabled.
 */
router.get('/:streamSessionId/subscribe-credentials', requireDeviceAuth, async (req, res) => {
  const streamSessionId = parseStreamSessionId(req, res);
  if (!streamSessionId) return;

  const deviceAuth = ensureDeviceAuth(req, res);
  if (!deviceAuth) return;

  if (simpleStreamingEnabled) {
    res.status(409).json({ message: 'SIMPLE_STREAMING does not use subscribe credentials' });
    return;
  }

  const session = await getOwnedStreamSession(streamSessionId, deviceAuth.userId);
  if (!session) {
    res.status(404).json({ message: 'Stream session not found' });
    return;
  }

  const isRequester = session.requesterDeviceId === deviceAuth.deviceId;
  const isCamera = session.cameraDeviceId === deviceAuth.deviceId;
  if (!isRequester && !isCamera) {
    res.status(403).json({ message: 'Device cannot request subscribe credentials for this stream' });
    return;
  }

  if (!session.mediaSessionId || session.status !== 'streaming') {
    res.status(409).json({ message: 'Stream is not active yet' });
    return;
  }

  const credentials = await mediaProvider.issueSubscribeCredentials({
    mediaSessionId: session.mediaSessionId,
    viewerDeviceId: deviceAuth.deviceId,
    ownerUserId: session.ownerUserId,
  });

  res.json(credentials);

  await recordAudit({
    ownerUserId: session.ownerUserId,
    actorDeviceId: deviceAuth.deviceId,
    action: 'stream.subscribe_credentials_issued',
    targetType: 'stream_session',
    targetId: session.id,
    metadata: { mediaProvider: credentials.provider },
    ipAddress: req.ip,
  });
});

/**
 * GET /:streamSessionId/sfu/session
 *
 * Returns the SFU session details of a stream to one of its participants.
 * Answers 409 when the SFU service is disabled.
 */
router.get('/:streamSessionId/sfu/session', requireDeviceAuth, async (req, res) => {
  const streamSessionId = parseStreamSessionId(req, res);
  if (!streamSessionId) return;

  const deviceAuth = ensureDeviceAuth(req, res);
  if (!deviceAuth) return;

  if (!sfuService) {
    res.status(409).json({ message: `SFU service disabled (MEDIA_MODE=${mediaMode})` });
    return;
  }

  const session = await getOwnedStreamSession(streamSessionId, deviceAuth.userId);
  if (!session) {
    res.status(404).json({ message: 'Stream session not found' });
    return;
  }

  const isParticipant =
    session.requesterDeviceId === deviceAuth.deviceId || session.cameraDeviceId === deviceAuth.deviceId;
  if (!isParticipant) {
    res.status(403).json({ message: 'Device cannot access SFU session details for this stream' });
    return;
  }

  const sfuSession = await sfuService.getSession(session.id);

  res.json({
    streamSessionId: session.id,
    mediaMode,
    sfuSession,
  });
});

/**
 * POST /:streamSessionId/sfu/publish-transport
 *
 * Creates an SFU publish transport for the camera device of a `streaming`
 * session and sets the SFU session state to `live`.
 */
router.post('/:streamSessionId/sfu/publish-transport', requireDeviceAuth, async (req, res) => {
  const streamSessionId = parseStreamSessionId(req, res);
  if (!streamSessionId) return;

  const parsedBody = sfuTransportRequestSchema.safeParse(req.body ?? {});
  if (!parsedBody.success) {
    res.status(400).json({ message: 'Invalid request body', errors: parsedBody.error.flatten() });
    return;
  }

  const deviceAuth = ensureDeviceAuth(req, res);
  if (!deviceAuth) return;

  if (!sfuService) {
    res.status(409).json({ message: `SFU service disabled (MEDIA_MODE=${mediaMode})` });
    return;
  }

  const session = await getOwnedStreamSession(streamSessionId, deviceAuth.userId);
  if (!session) {
    res.status(404).json({ message: 'Stream session not found' });
    return;
  }

  if (session.status !== 'streaming') {
    res.status(409).json({ message: 'Stream must be active before creating publish transport' });
    return;
  }

  if (session.cameraDeviceId !== deviceAuth.deviceId) {
    res.status(403).json({ message: 'Only camera device can create publish transport' });
    return;
  }

  const transport = await sfuService.createPublishTransport({
    streamSessionId: session.id,
    cameraDeviceId: deviceAuth.deviceId,
  });
  await sfuService.setSessionState(session.id, 'live');

  res.json({
    streamSessionId: session.id,
    mediaMode,
    transport,
  });
});

/**
 * POST /:streamSessionId/sfu/subscribe-transport
 *
 * Creates an SFU subscribe transport for a participant of a `streaming`
 * session and sets the SFU session state to `live`.
 */
router.post('/:streamSessionId/sfu/subscribe-transport', requireDeviceAuth, async (req, res) => {
  const streamSessionId = parseStreamSessionId(req, res);
  if (!streamSessionId) return;

  const parsedBody = sfuTransportRequestSchema.safeParse(req.body ?? {});
  if (!parsedBody.success) {
    res.status(400).json({ message: 'Invalid request body', errors: parsedBody.error.flatten() });
    return;
  }

  const deviceAuth = ensureDeviceAuth(req, res);
  if (!deviceAuth) return;

  if (!sfuService) {
    res.status(409).json({ message: `SFU service disabled (MEDIA_MODE=${mediaMode})` });
    return;
  }

  const session = await getOwnedStreamSession(streamSessionId, deviceAuth.userId);
  if (!session) {
    res.status(404).json({ message: 'Stream session not found' });
    return;
  }

  if (session.status !== 'streaming') {
    res.status(409).json({ message: 'Stream must be active before creating subscribe transport' });
    return;
  }

  const isParticipant =
    session.requesterDeviceId === deviceAuth.deviceId || session.cameraDeviceId === deviceAuth.deviceId;
  if (!isParticipant) {
    res.status(403).json({ message: 'Device cannot create subscribe transport for this stream' });
    return;
  }

  const transport = await sfuService.createSubscribeTransport({
    streamSessionId: session.id,
    viewerDeviceId: deviceAuth.deviceId,
  });
  await sfuService.setSessionState(session.id, 'live');

  res.json({
    streamSessionId: session.id,
    mediaMode,
    transport,
  });
});

/**
 * POST /:streamSessionId/end
 *
 * Ends a stream on behalf of its requester or camera device. Persists the
 * final status, ends the SFU session (best effort), optionally creates a
 * recording and notifies both devices.
 */
router.post('/:streamSessionId/end', requireDeviceAuth, async (req, res) => {
  const streamSessionId = parseStreamSessionId(req, res);
  if (!streamSessionId) return;

  const parsed = endStreamSchema.safeParse(req.body ?? {});
  if (!parsed.success) {
    res.status(400).json({ message: 'Invalid request body', errors: parsed.error.flatten() });
    return;
  }

  const deviceAuth = ensureDeviceAuth(req, res);
  if (!deviceAuth) return;

  const session = await getOwnedStreamSession(streamSessionId, deviceAuth.userId);
  if (!session) {
    res.status(404).json({ message: 'Stream session not found' });
    return;
  }

  const canEnd =
    session.cameraDeviceId === deviceAuth.deviceId || session.requesterDeviceId === deviceAuth.deviceId;
  if (!canEnd) {
    res.status(403).json({ message: 'Only requester or camera device can end this stream' });
    return;
  }

  const now = new Date();

  // Simple streaming always stores 'ended' and keeps a non-default reason in
  // the metadata; legacy mode stores the reason itself as the status.
  const nextStatus = simpleStreamingEnabled ? 'ended' : parsed.data.reason;
  const nextMetadata =
    simpleStreamingEnabled && parsed.data.reason !== 'completed'
      ? {
          ...(session.metadata ?? {}),
          endReason: parsed.data.reason,
        }
      : session.metadata;

  const [updated] = await db
    .update(streamSessions)
    .set({
      status: nextStatus,
      endedAt: now,
      metadata: nextMetadata,
      updatedAt: now,
    })
    .where(eq(streamSessions.id, session.id))
    .returning();

  if (!updated) {
    res.status(500).json({ message: 'Failed to update stream session' });
    return;
  }

  if (sfuService) {
    try {
      await sfuService.endSession(session.id);
    } catch (error) {
      // Deliberately best effort: the stream is already marked as ended.
      console.error('Failed ending SFU session', error);
    }
  }

  if (streamRecordingEnabled) {
    await createRecordingForStream(session.id);
  }

  const endedPayload = simpleStreamingEnabled
    ? createStreamEndedPayload({
        streamSessionId: session.id,
        cameraDeviceId: session.cameraDeviceId,
        requesterDeviceId: session.requesterDeviceId,
        endedAt: now,
        reason: parsed.data.reason,
      })
    : {
        streamSessionId: session.id,
        status: parsed.data.reason,
        endedAt: now,
      };

  console.info('[stream.end]', {
    streamSessionId: session.id,
    requesterDeviceId: session.requesterDeviceId,
    cameraDeviceId: session.cameraDeviceId,
    reason: parsed.data.reason,
    status: nextStatus,
  });

  // Try realtime delivery to both devices first, then fall back to push for
  // whichever device was not reached.
  const deliveredToRequester = sendRealtimeToDevice(session.requesterDeviceId, 'stream:ended', endedPayload);
  const deliveredToCamera = sendRealtimeToDevice(session.cameraDeviceId, 'stream:ended', endedPayload);

  if (!deliveredToRequester) {
    await enqueuePushNotification({
      ownerUserId: session.ownerUserId,
      recipientDeviceId: session.requesterDeviceId,
      type: 'stream_ended',
      payload: endedPayload,
    });
  }

  if (!deliveredToCamera) {
    await enqueuePushNotification({
      ownerUserId: session.ownerUserId,
      recipientDeviceId: session.cameraDeviceId,
      type: 'stream_ended',
      payload: endedPayload,
    });
  }

  res.json({ message: 'Stream ended', streamSession: toSimpleStreamSessionResponse(updated) });
});

/**
 * GET /:streamSessionId/playback-token
 *
 * Returns a playback token (the provider's subscribe token) together with the
 * stream key for a participant of a `streaming` session. Answers 409 when
 * simple streaming is enabled.
 */
router.get('/:streamSessionId/playback-token', requireDeviceAuth, async (req, res) => {
  const streamSessionId = parseStreamSessionId(req, res);
  if (!streamSessionId) return;

  const deviceAuth = ensureDeviceAuth(req, res);
  if (!deviceAuth) return;

  if (simpleStreamingEnabled) {
    res.status(409).json({ message: 'SIMPLE_STREAMING does not issue playback tokens' });
    return;
  }

  const session = await getOwnedStreamSession(streamSessionId, deviceAuth.userId);
  if (!session) {
    res.status(404).json({ message: 'Stream session not found' });
    return;
  }

  const isRequester = session.requesterDeviceId === deviceAuth.deviceId;
  const isCamera = session.cameraDeviceId === deviceAuth.deviceId;
  if (!isRequester && !isCamera) {
    res.status(403).json({ message: 'Device cannot request playback token for this stream' });
    return;
  }

  if (!session.streamKey || !session.mediaSessionId || session.status !== 'streaming') {
    res.status(409).json({ message: 'Stream is not active yet' });
    return;
  }

  const credentials = await mediaProvider.issueSubscribeCredentials({
    mediaSessionId: session.mediaSessionId,
    viewerDeviceId: deviceAuth.deviceId,
    ownerUserId: deviceAuth.userId,
  });

  res.json({
    streamSessionId: session.id,
    streamKey: session.streamKey,
    status: session.status,
    playbackToken: credentials.subscribeToken,
    subscribeUrl: credentials.subscribeUrl,
    mediaProvider: credentials.provider,
    expiresInSeconds: credentials.expiresInSeconds,
  });
});

export default router;