diff --git a/Backend/routes/recordings.ts b/Backend/routes/recordings.ts index 627ae28..4f15119 100644 --- a/Backend/routes/recordings.ts +++ b/Backend/routes/recordings.ts @@ -3,7 +3,7 @@ import { Router } from 'express'; import { z } from 'zod'; import { db } from '../db/client'; -import { recordings, streamSessions } from '../db/schema'; +import { recordings } from '../db/schema'; import { requireDeviceAuth } from '../middleware/device-auth'; import { writeAuditLog } from '../services/audit'; import { ensureMinioBucket, minioBucket, minioClient, minioPresignedExpirySeconds } from '../utils/minio'; @@ -120,27 +120,7 @@ router.post('/:recordingId/finalize', requireDeviceAuth, async (req, res) => { objectKey, }); } catch (error) { - if (objectKey.startsWith('sim/')) { - console.warn('[recording.finalize] creating simulator fallback object', { - recordingId: recording.id, - bucket, - objectKey, - error: error instanceof Error ? error.message : String(error), - }); - const placeholder = Buffer.from( - JSON.stringify({ - message: 'simulated recording placeholder', - recordingId: recording.id, - streamSessionId: recording.streamSessionId, - createdAt: now.toISOString(), - }), - 'utf8', - ); - - await minioClient.putObject(bucket, objectKey, placeholder, placeholder.byteLength, { - 'Content-Type': 'application/json', - }); - } else if (isMissingStorageObjectError(error)) { + if (isMissingStorageObjectError(error)) { console.warn('[recording.finalize] storage object missing', { recordingId: recording.id, streamSessionId: recording.streamSessionId, @@ -150,15 +130,15 @@ router.post('/:recordingId/finalize', requireDeviceAuth, async (req, res) => { }); res.status(409).json({ message: 'Recording object does not exist in storage yet' }); return; - } else { - console.error('[recording.finalize] storage verification failed', { - recordingId: recording.id, - bucket, - objectKey, - error: error instanceof Error ? error.message : String(error), - }); - throw error; } + + console.error('[recording.finalize] storage verification failed', { + recordingId: recording.id, + bucket, + objectKey, + error: error instanceof Error ? error.message : String(error), + }); + throw error; } const [updated] = await db @@ -272,27 +252,4 @@ router.get('/:recordingId/download-url', requireDeviceAuth, async (req, res) => }); }); -export const createRecordingForStream = async (streamSessionId: string): Promise => { - const stream = await db.query.streamSessions.findFirst({ where: eq(streamSessions.id, streamSessionId) }); - - if (!stream) { - return; - } - - const existing = await db.query.recordings.findFirst({ where: eq(recordings.streamSessionId, stream.id) }); - - if (existing) { - return; - } - - await db.insert(recordings).values({ - ownerUserId: stream.ownerUserId, - streamSessionId: stream.id, - cameraDeviceId: stream.cameraDeviceId, - requesterDeviceId: stream.requesterDeviceId, - status: 'awaiting_upload', - updatedAt: new Date(), - }); -}; - export default router; diff --git a/Backend/routes/streams.ts b/Backend/routes/streams.ts index 2e76b60..fb62af6 100644 --- a/Backend/routes/streams.ts +++ b/Backend/routes/streams.ts @@ -1,811 +1,19 @@ -import { randomUUID } from 'crypto'; - -import { and, desc, eq, or } from 'drizzle-orm'; import { Router } from 'express'; -import { z } from 'zod'; -import { db } from '../db/client'; -import { mediaMode, simpleStreamingEnabled, streamRecordingEnabled } from '../media/config'; -import { commands, deviceLinks, devices, streamSessions } from '../db/schema'; -import { createLiveMediaSession, mediaProvider } from '../media/service'; -import { sfuService } from '../media/sfu/service'; -import { requireDeviceAuth } from '../middleware/device-auth'; -import { dispatchCommandById, sendRealtimeToDevice } from '../realtime/gateway'; -import { writeAuditLog } from '../services/audit'; -import { enqueuePushNotification } from '../services/push'; -import { - createStreamEndedPayload, - createStreamRequestedPayload, - createStreamStartedPayload, - toSimpleStreamSessionResponse, -} from '../streaming/simple'; -import { createRecordingForStream } from './recordings'; +import acceptRouter from './streams/accept'; +import credentialsRouter from './streams/credentials'; +import endRouter from './streams/end'; +import listRouter from './streams/list'; +import requestRouter from './streams/request'; +import sfuRouter from './streams/sfu'; const router = Router(); -const requestStreamSchema = z.object({ - cameraDeviceId: z.string().uuid(), - reason: z.enum(['on_demand', 'motion_follow_up']).default('on_demand'), - metadata: z.record(z.string(), z.unknown()).optional(), -}); - -const acceptStreamSchema = z.object({ - streamKey: z.string().trim().min(1).max(255).optional(), - metadata: z.record(z.string(), z.unknown()).optional(), -}); - -const endStreamSchema = z.object({ - reason: z.enum(['completed', 'cancelled', 'failed']).default('completed'), -}); - -const streamParamSchema = z.object({ - streamSessionId: z.string().uuid(), -}); - -const sfuTransportRequestSchema = z.object({ - role: z.enum(['camera', 'viewer']).optional(), -}); - -const listSchema = z.object({ - status: z.string().optional(), - limit: z.coerce.number().int().min(1).max(100).default(25), -}); - -const shouldCreateRecordingPlaceholder = (): boolean => mediaMode === 'legacy' || streamRecordingEnabled; - -router.get('/me/list', requireDeviceAuth, async (req, res) => { - const parsed = listSchema.safeParse(req.query); - - if (!parsed.success) { - res.status(400).json({ message: 'Invalid query params', errors: parsed.error.flatten() }); - return; - } - - const deviceAuth = req.deviceAuth; - - if (!deviceAuth) { - res.status(401).json({ message: 'Unauthorized' }); - return; - } - - const sessions = await db.query.streamSessions.findMany({ - where: and( - eq(streamSessions.ownerUserId, deviceAuth.userId), - or(eq(streamSessions.requesterDeviceId, deviceAuth.deviceId), eq(streamSessions.cameraDeviceId, deviceAuth.deviceId)), - ), - orderBy: [desc(streamSessions.createdAt)], - limit: parsed.data.limit, - }); - - const filtered = parsed.data.status ? sessions.filter((session) => session.status === parsed.data.status) : sessions; - - res.json({ count: filtered.length, streamSessions: filtered }); -}); - -const ensureDeviceAuth = (req: Parameters[0], res: Parameters[1]) => { - const deviceAuth = req.deviceAuth; - if (!deviceAuth) { - res.status(401).json({ message: 'Unauthorized' }); - return null; - } - return deviceAuth; -}; - -const getOwnedStreamSession = async (streamSessionId: string, ownerUserId: string) => - await db.query.streamSessions.findFirst({ - where: and(eq(streamSessions.id, streamSessionId), eq(streamSessions.ownerUserId, ownerUserId)), - }); - -router.post('/request', requireDeviceAuth, async (req, res) => { - const parsed = requestStreamSchema.safeParse(req.body ?? {}); - - if (!parsed.success) { - res.status(400).json({ message: 'Invalid request body', errors: parsed.error.flatten() }); - return; - } - - const deviceAuth = ensureDeviceAuth(req, res); - if (!deviceAuth) return; - - const [sourceDevice, cameraDevice] = await Promise.all([ - db.query.devices.findFirst({ - where: and(eq(devices.id, deviceAuth.deviceId), eq(devices.userId, deviceAuth.userId)), - }), - db.query.devices.findFirst({ - where: and(eq(devices.id, parsed.data.cameraDeviceId), eq(devices.userId, deviceAuth.userId)), - }), - ]); - - if (!sourceDevice || !cameraDevice) { - res.status(404).json({ message: 'Source or camera device not found' }); - return; - } - - if (sourceDevice.role !== 'client') { - res.status(403).json({ message: 'Only client devices can request on-demand stream sessions' }); - return; - } - - if (cameraDevice.role !== 'camera') { - res.status(400).json({ message: 'cameraDeviceId must point to a camera device' }); - return; - } - - const link = await db.query.deviceLinks.findFirst({ - where: and( - eq(deviceLinks.ownerUserId, deviceAuth.userId), - eq(deviceLinks.cameraDeviceId, cameraDevice.id), - eq(deviceLinks.clientDeviceId, sourceDevice.id), - eq(deviceLinks.status, 'active'), - ), - }); - - if (!link) { - res.status(403).json({ message: 'No active link between requester and camera' }); - return; - } - - const now = new Date(); - - const [session] = await db - .insert(streamSessions) - .values({ - ownerUserId: deviceAuth.userId, - cameraDeviceId: cameraDevice.id, - requesterDeviceId: sourceDevice.id, - status: 'requested', - reason: parsed.data.reason, - metadata: parsed.data.metadata ?? null, - mediaProvider: mediaProvider.name, - updatedAt: now, - }) - .returning(); - - if (!session) { - res.status(500).json({ message: 'Failed creating stream session' }); - return; - } - - if (simpleStreamingEnabled) { - const requestPayload = createStreamRequestedPayload(session); - const deliveredToCamera = sendRealtimeToDevice(cameraDevice.id, 'stream:requested', requestPayload); - - console.info('[stream.request]', { - streamSessionId: session.id, - requesterDeviceId: sourceDevice.id, - cameraDeviceId: cameraDevice.id, - mode: 'simple', - }); - - sendRealtimeToDevice(sourceDevice.id, 'stream:requested', requestPayload); - - if (!deliveredToCamera) { - await enqueuePushNotification({ - ownerUserId: cameraDevice.userId, - recipientDeviceId: cameraDevice.id, - type: 'stream_requested', - payload: requestPayload, - }); - } - - res.status(201).json({ - message: 'Stream request sent', - streamSession: toSimpleStreamSessionResponse(session), - }); - - await writeAuditLog({ - ownerUserId: sourceDevice.userId, - actorDeviceId: sourceDevice.id, - action: 'stream.requested', - targetType: 'stream_session', - targetId: session.id, - metadata: { cameraDeviceId: cameraDevice.id, reason: session.reason, transport: 'webrtc' }, - ipAddress: req.ip, - }); - return; - } - - const [command] = await db - .insert(commands) - .values({ - ownerUserId: deviceAuth.userId, - sourceDeviceId: sourceDevice.id, - targetDeviceId: cameraDevice.id, - commandType: 'start_stream', - payload: { - streamSessionId: session.id, - reason: session.reason, - }, - status: 'queued', - retryCount: 0, - lastDispatchedAt: now, - updatedAt: now, - }) - .returning(); - - if (!command) { - res.status(500).json({ message: 'Failed creating stream command' }); - return; - } - - await dispatchCommandById(command.id); - console.info('[stream.request]', { - streamSessionId: session.id, - requesterDeviceId: sourceDevice.id, - cameraDeviceId: cameraDevice.id, - mode: 'legacy', - commandId: command.id, - }); - - const refreshedCommand = await db.query.commands.findFirst({ where: eq(commands.id, command.id) }); - - const deliveredToRequester = sendRealtimeToDevice(sourceDevice.id, 'stream:requested', { - streamSessionId: session.id, - cameraDeviceId: cameraDevice.id, - status: session.status, - reason: session.reason, - }); - - if (!deliveredToRequester) { - await enqueuePushNotification({ - ownerUserId: sourceDevice.userId, - recipientDeviceId: sourceDevice.id, - type: 'stream_requested', - payload: { - streamSessionId: session.id, - cameraDeviceId: cameraDevice.id, - }, - }); - } - - res.status(201).json({ - message: 'Stream request sent', - streamSession: session, - command: refreshedCommand ?? command, - }); - - await writeAuditLog({ - ownerUserId: sourceDevice.userId, - actorDeviceId: sourceDevice.id, - action: 'stream.requested', - targetType: 'stream_session', - targetId: session.id, - metadata: { cameraDeviceId: cameraDevice.id, reason: session.reason }, - ipAddress: req.ip, - }); -}); - -router.post('/:streamSessionId/accept', requireDeviceAuth, async (req, res) => { - const parsedParams = streamParamSchema.safeParse(req.params); - - if (!parsedParams.success) { - res.status(400).json({ message: 'Invalid streamSessionId', errors: parsedParams.error.flatten() }); - return; - } - - const parsed = acceptStreamSchema.safeParse(req.body ?? {}); - - if (!parsed.success) { - res.status(400).json({ message: 'Invalid request body', errors: parsed.error.flatten() }); - return; - } - - const deviceAuth = ensureDeviceAuth(req, res); - if (!deviceAuth) return; - - const session = await db.query.streamSessions.findFirst({ - where: and( - eq(streamSessions.id, parsedParams.data.streamSessionId), - eq(streamSessions.ownerUserId, deviceAuth.userId), - eq(streamSessions.cameraDeviceId, deviceAuth.deviceId), - ), - }); - - if (!session) { - res.status(404).json({ message: 'Stream session not found for this camera device' }); - return; - } - - if (session.status !== 'requested' && session.status !== 'starting') { - res.status(409).json({ message: `Stream session cannot be accepted from status ${session.status}` }); - return; - } - - const now = new Date(); - const streamKey = parsed.data.streamKey ?? `stream_${session.id}_${randomUUID()}`; - const mediaSession = await createLiveMediaSession({ - streamSessionId: session.id, - ownerUserId: session.ownerUserId, - cameraDeviceId: session.cameraDeviceId, - requesterDeviceId: session.requesterDeviceId, - }); - - const [updated] = await db - .update(streamSessions) - .set({ - status: 'streaming', - streamKey: mediaSession ? streamKey : null, - mediaProvider: mediaSession?.provider ?? 'simple', - mediaSessionId: mediaSession?.mediaSessionId ?? null, - publishEndpoint: mediaSession?.publishUrl ?? null, - subscribeEndpoint: mediaSession?.subscribeUrl ?? null, - metadata: parsed.data.metadata ?? session.metadata, - startedAt: now, - updatedAt: now, - }) - .where(eq(streamSessions.id, session.id)) - .returning(); - - if (!updated) { - res.status(500).json({ message: 'Failed to update stream session' }); - return; - } - - if (shouldCreateRecordingPlaceholder()) { - await createRecordingForStream(updated.id); - } - - if (sfuService) { - try { - await sfuService.startSession({ - streamSessionId: updated.id, - ownerUserId: updated.ownerUserId, - cameraDeviceId: updated.cameraDeviceId, - requesterDeviceId: updated.requesterDeviceId, - }); - await sfuService.setSessionState(updated.id, 'live'); - } catch (error) { - console.error('Failed starting SFU session', error); - res.status(500).json({ message: 'Failed to initialize SFU session' }); - return; - } - } - - const startedPayload = createStreamStartedPayload(updated); - console.info('[stream.accept]', { - streamSessionId: updated.id, - requesterDeviceId: updated.requesterDeviceId, - cameraDeviceId: updated.cameraDeviceId, - mode: mediaSession ? 'legacy' : 'simple', - }); - const deliveredToRequester = sendRealtimeToDevice(session.requesterDeviceId, 'stream:started', startedPayload); - - if (!deliveredToRequester) { - await enqueuePushNotification({ - ownerUserId: session.ownerUserId, - recipientDeviceId: session.requesterDeviceId, - type: 'stream_started', - payload: startedPayload, - }); - } - - res.json({ message: 'Stream accepted', streamSession: toSimpleStreamSessionResponse(updated) }); - - await writeAuditLog({ - ownerUserId: session.ownerUserId, - actorDeviceId: session.cameraDeviceId, - action: 'stream.accepted', - targetType: 'stream_session', - targetId: session.id, - metadata: mediaSession - ? { mediaSessionId: updated.mediaSessionId, mediaProvider: updated.mediaProvider } - : { transport: 'webrtc' }, - ipAddress: req.ip, - }); -}); - -router.get('/:streamSessionId/publish-credentials', requireDeviceAuth, async (req, res) => { - const parsedParams = streamParamSchema.safeParse(req.params); - - if (!parsedParams.success) { - res.status(400).json({ message: 'Invalid streamSessionId', errors: parsedParams.error.flatten() }); - return; - } - - const deviceAuth = ensureDeviceAuth(req, res); - if (!deviceAuth) return; - - if (simpleStreamingEnabled) { - res.status(409).json({ message: 'SIMPLE_STREAMING does not use publish credentials' }); - return; - } - - const session = await getOwnedStreamSession(parsedParams.data.streamSessionId, deviceAuth.userId); - - if (!session) { - res.status(404).json({ message: 'Stream session not found' }); - return; - } - - if (session.cameraDeviceId !== deviceAuth.deviceId) { - res.status(403).json({ message: 'Only camera device can request publish credentials' }); - return; - } - - if (!session.mediaSessionId || session.status !== 'streaming') { - res.status(409).json({ message: 'Stream session is not ready for publish credentials' }); - return; - } - - const credentials = await mediaProvider.issuePublishCredentials({ - mediaSessionId: session.mediaSessionId, - cameraDeviceId: session.cameraDeviceId, - ownerUserId: session.ownerUserId, - }); - - res.json(credentials); - - await writeAuditLog({ - ownerUserId: session.ownerUserId, - actorDeviceId: session.cameraDeviceId, - action: 'stream.publish_credentials_issued', - targetType: 'stream_session', - targetId: session.id, - metadata: { mediaProvider: credentials.provider }, - ipAddress: req.ip, - }); -}); - -router.get('/:streamSessionId/subscribe-credentials', requireDeviceAuth, async (req, res) => { - const parsedParams = streamParamSchema.safeParse(req.params); - - if (!parsedParams.success) { - res.status(400).json({ message: 'Invalid streamSessionId', errors: parsedParams.error.flatten() }); - return; - } - - const deviceAuth = ensureDeviceAuth(req, res); - if (!deviceAuth) return; - - if (simpleStreamingEnabled) { - res.status(409).json({ message: 'SIMPLE_STREAMING does not use subscribe credentials' }); - return; - } - - const session = await getOwnedStreamSession(parsedParams.data.streamSessionId, deviceAuth.userId); - - if (!session) { - res.status(404).json({ message: 'Stream session not found' }); - return; - } - - const isRequester = session.requesterDeviceId === deviceAuth.deviceId; - const isCamera = session.cameraDeviceId === deviceAuth.deviceId; - - if (!isRequester && !isCamera) { - res.status(403).json({ message: 'Device cannot request subscribe credentials for this stream' }); - return; - } - - if (!session.mediaSessionId || session.status !== 'streaming') { - res.status(409).json({ message: 'Stream is not active yet' }); - return; - } - - const credentials = await mediaProvider.issueSubscribeCredentials({ - mediaSessionId: session.mediaSessionId, - viewerDeviceId: deviceAuth.deviceId, - ownerUserId: session.ownerUserId, - }); - - res.json(credentials); - - await writeAuditLog({ - ownerUserId: session.ownerUserId, - actorDeviceId: deviceAuth.deviceId, - action: 'stream.subscribe_credentials_issued', - targetType: 'stream_session', - targetId: session.id, - metadata: { mediaProvider: credentials.provider }, - ipAddress: req.ip, - }); -}); - -router.get('/:streamSessionId/sfu/session', requireDeviceAuth, async (req, res) => { - const parsedParams = streamParamSchema.safeParse(req.params); - - if (!parsedParams.success) { - res.status(400).json({ message: 'Invalid streamSessionId', errors: parsedParams.error.flatten() }); - return; - } - - const deviceAuth = ensureDeviceAuth(req, res); - if (!deviceAuth) return; - - if (!sfuService) { - res.status(409).json({ message: `SFU service disabled (MEDIA_MODE=${mediaMode})` }); - return; - } - - const session = await getOwnedStreamSession(parsedParams.data.streamSessionId, deviceAuth.userId); - if (!session) { - res.status(404).json({ message: 'Stream session not found' }); - return; - } - - const isParticipant = session.requesterDeviceId === deviceAuth.deviceId || session.cameraDeviceId === deviceAuth.deviceId; - if (!isParticipant) { - res.status(403).json({ message: 'Device cannot access SFU session details for this stream' }); - return; - } - - const sfuSession = await sfuService.getSession(session.id); - res.json({ - streamSessionId: session.id, - mediaMode, - sfuSession, - }); -}); - -router.post('/:streamSessionId/sfu/publish-transport', requireDeviceAuth, async (req, res) => { - const parsedParams = streamParamSchema.safeParse(req.params); - if (!parsedParams.success) { - res.status(400).json({ message: 'Invalid streamSessionId', errors: parsedParams.error.flatten() }); - return; - } - - const parsedBody = sfuTransportRequestSchema.safeParse(req.body ?? {}); - if (!parsedBody.success) { - res.status(400).json({ message: 'Invalid request body', errors: parsedBody.error.flatten() }); - return; - } - - const deviceAuth = ensureDeviceAuth(req, res); - if (!deviceAuth) return; - - if (!sfuService) { - res.status(409).json({ message: `SFU service disabled (MEDIA_MODE=${mediaMode})` }); - return; - } - - const session = await getOwnedStreamSession(parsedParams.data.streamSessionId, deviceAuth.userId); - if (!session) { - res.status(404).json({ message: 'Stream session not found' }); - return; - } - - if (session.status !== 'streaming') { - res.status(409).json({ message: 'Stream must be active before creating publish transport' }); - return; - } - - if (session.cameraDeviceId !== deviceAuth.deviceId) { - res.status(403).json({ message: 'Only camera device can create publish transport' }); - return; - } - - const transport = await sfuService.createPublishTransport({ - streamSessionId: session.id, - cameraDeviceId: deviceAuth.deviceId, - }); - await sfuService.setSessionState(session.id, 'live'); - - res.json({ - streamSessionId: session.id, - mediaMode, - transport, - }); -}); - -router.post('/:streamSessionId/sfu/subscribe-transport', requireDeviceAuth, async (req, res) => { - const parsedParams = streamParamSchema.safeParse(req.params); - if (!parsedParams.success) { - res.status(400).json({ message: 'Invalid streamSessionId', errors: parsedParams.error.flatten() }); - return; - } - - const parsedBody = sfuTransportRequestSchema.safeParse(req.body ?? {}); - if (!parsedBody.success) { - res.status(400).json({ message: 'Invalid request body', errors: parsedBody.error.flatten() }); - return; - } - - const deviceAuth = ensureDeviceAuth(req, res); - if (!deviceAuth) return; - - if (!sfuService) { - res.status(409).json({ message: `SFU service disabled (MEDIA_MODE=${mediaMode})` }); - return; - } - - const session = await getOwnedStreamSession(parsedParams.data.streamSessionId, deviceAuth.userId); - if (!session) { - res.status(404).json({ message: 'Stream session not found' }); - return; - } - - if (session.status !== 'streaming') { - res.status(409).json({ message: 'Stream must be active before creating subscribe transport' }); - return; - } - - const isParticipant = session.requesterDeviceId === deviceAuth.deviceId || session.cameraDeviceId === deviceAuth.deviceId; - if (!isParticipant) { - res.status(403).json({ message: 'Device cannot create subscribe transport for this stream' }); - return; - } - - const transport = await sfuService.createSubscribeTransport({ - streamSessionId: session.id, - viewerDeviceId: deviceAuth.deviceId, - }); - await sfuService.setSessionState(session.id, 'live'); - - res.json({ - streamSessionId: session.id, - mediaMode, - transport, - }); -}); - -router.post('/:streamSessionId/end', requireDeviceAuth, async (req, res) => { - const parsedParams = streamParamSchema.safeParse(req.params); - - if (!parsedParams.success) { - res.status(400).json({ message: 'Invalid streamSessionId', errors: parsedParams.error.flatten() }); - return; - } - - const parsed = endStreamSchema.safeParse(req.body ?? {}); - - if (!parsed.success) { - res.status(400).json({ message: 'Invalid request body', errors: parsed.error.flatten() }); - return; - } - - const deviceAuth = ensureDeviceAuth(req, res); - if (!deviceAuth) return; - - const session = await getOwnedStreamSession(parsedParams.data.streamSessionId, deviceAuth.userId); - - if (!session) { - res.status(404).json({ message: 'Stream session not found' }); - return; - } - - const canEnd = session.cameraDeviceId === deviceAuth.deviceId || session.requesterDeviceId === deviceAuth.deviceId; - - if (!canEnd) { - res.status(403).json({ message: 'Only requester or camera device can end this stream' }); - return; - } - - const now = new Date(); - const nextStatus = simpleStreamingEnabled ? 'ended' : parsed.data.reason; - const nextMetadata = - simpleStreamingEnabled && parsed.data.reason !== 'completed' - ? { - ...(session.metadata ?? {}), - endReason: parsed.data.reason, - } - : session.metadata; - - const [updated] = await db - .update(streamSessions) - .set({ - status: nextStatus, - endedAt: now, - metadata: nextMetadata, - updatedAt: now, - }) - .where(eq(streamSessions.id, session.id)) - .returning(); - - if (!updated) { - res.status(500).json({ message: 'Failed to update stream session' }); - return; - } - - if (sfuService) { - try { - await sfuService.endSession(session.id); - } catch (error) { - console.error('Failed ending SFU session', error); - } - } - - if (shouldCreateRecordingPlaceholder()) { - await createRecordingForStream(session.id); - } - - const endedPayload = simpleStreamingEnabled - ? createStreamEndedPayload({ - streamSessionId: session.id, - cameraDeviceId: session.cameraDeviceId, - requesterDeviceId: session.requesterDeviceId, - endedAt: now, - reason: parsed.data.reason, - }) - : { - streamSessionId: session.id, - status: parsed.data.reason, - endedAt: now, - }; - - console.info('[stream.end]', { - streamSessionId: session.id, - requesterDeviceId: session.requesterDeviceId, - cameraDeviceId: session.cameraDeviceId, - reason: parsed.data.reason, - status: simpleStreamingEnabled ? 'ended' : parsed.data.reason, - }); - - const deliveredToRequester = sendRealtimeToDevice(session.requesterDeviceId, 'stream:ended', endedPayload); - - const deliveredToCamera = sendRealtimeToDevice(session.cameraDeviceId, 'stream:ended', endedPayload); - - if (!deliveredToRequester) { - await enqueuePushNotification({ - ownerUserId: session.ownerUserId, - recipientDeviceId: session.requesterDeviceId, - type: 'stream_ended', - payload: endedPayload, - }); - } - - if (!deliveredToCamera) { - await enqueuePushNotification({ - ownerUserId: session.ownerUserId, - recipientDeviceId: session.cameraDeviceId, - type: 'stream_ended', - payload: endedPayload, - }); - } - - res.json({ message: 'Stream ended', streamSession: toSimpleStreamSessionResponse(updated) }); -}); - -router.get('/:streamSessionId/playback-token', requireDeviceAuth, async (req, res) => { - const parsedParams = streamParamSchema.safeParse(req.params); - - if (!parsedParams.success) { - res.status(400).json({ message: 'Invalid streamSessionId', errors: parsedParams.error.flatten() }); - return; - } - - const deviceAuth = ensureDeviceAuth(req, res); - if (!deviceAuth) return; - - if (simpleStreamingEnabled) { - res.status(409).json({ message: 'SIMPLE_STREAMING does not issue playback tokens' }); - return; - } - - const session = await getOwnedStreamSession(parsedParams.data.streamSessionId, deviceAuth.userId); - - if (!session) { - res.status(404).json({ message: 'Stream session not found' }); - return; - } - - const isRequester = session.requesterDeviceId === deviceAuth.deviceId; - const isCamera = session.cameraDeviceId === deviceAuth.deviceId; - - if (!isRequester && !isCamera) { - res.status(403).json({ message: 'Device cannot request playback token for this stream' }); - return; - } - - if (!session.streamKey || !session.mediaSessionId || session.status !== 'streaming') { - res.status(409).json({ message: 'Stream is not active yet' }); - return; - } - - const credentials = await mediaProvider.issueSubscribeCredentials({ - mediaSessionId: session.mediaSessionId, - viewerDeviceId: deviceAuth.deviceId, - ownerUserId: deviceAuth.userId, - }); - - res.json({ - streamSessionId: session.id, - streamKey: session.streamKey, - status: session.status, - playbackToken: credentials.subscribeToken, - subscribeUrl: credentials.subscribeUrl, - mediaProvider: credentials.provider, - expiresInSeconds: credentials.expiresInSeconds, - }); -}); +router.use(listRouter); +router.use(requestRouter); +router.use(acceptRouter); +router.use(credentialsRouter); +router.use(sfuRouter); +router.use(endRouter); export default router; diff --git a/Backend/routes/streams/accept.ts b/Backend/routes/streams/accept.ts new file mode 100644 index 0000000..d84a01b --- /dev/null +++ b/Backend/routes/streams/accept.ts @@ -0,0 +1,135 @@ +import { randomUUID } from 'crypto'; +import { eq } from 'drizzle-orm'; +import { Router } from 'express'; + +import { db } from '../../db/client'; +import { streamSessions } from '../../db/schema'; +import { createLiveMediaSession } from '../../media/service'; +import { sfuService } from '../../media/sfu/service'; +import { requireDeviceAuth } from '../../middleware/device-auth'; +import { sendRealtimeToDevice } from '../../realtime/gateway'; +import { createRecordingForStream } from '../../services/recordings'; +import { writeAuditLog } from '../../services/audit'; +import { enqueuePushNotification } from '../../services/push'; +import { createStreamStartedPayload, toSimpleStreamSessionResponse } from '../../streaming/simple'; +import { acceptStreamSchema, streamParamSchema } from './schemas'; +import { ensureStreamDeviceAuth, shouldCreateRecordingPlaceholder } from './shared'; + +const router = Router(); + +router.post('/:streamSessionId/accept', requireDeviceAuth, async (req, res) => { + const parsedParams = streamParamSchema.safeParse(req.params); + + if (!parsedParams.success) { + res.status(400).json({ message: 'Invalid streamSessionId', errors: parsedParams.error.flatten() }); + return; + } + + const parsed = acceptStreamSchema.safeParse(req.body ?? {}); + + if (!parsed.success) { + res.status(400).json({ message: 'Invalid request body', errors: parsed.error.flatten() }); + return; + } + + const deviceAuth = ensureStreamDeviceAuth(req, res); + if (!deviceAuth) return; + + const session = await db.query.streamSessions.findFirst({ + where: eq(streamSessions.id, parsedParams.data.streamSessionId), + }); + + if (!session || session.ownerUserId !== deviceAuth.userId || session.cameraDeviceId !== deviceAuth.deviceId) { + res.status(404).json({ message: 'Stream session not found for this camera device' }); + return; + } + + if (session.status !== 'requested' && session.status !== 'starting') { + res.status(409).json({ message: `Stream session cannot be accepted from status ${session.status}` }); + return; + } + + const now = new Date(); + const streamKey = parsed.data.streamKey ?? `stream_${session.id}_${randomUUID()}`; + const mediaSession = await createLiveMediaSession({ + streamSessionId: session.id, + ownerUserId: session.ownerUserId, + cameraDeviceId: session.cameraDeviceId, + requesterDeviceId: session.requesterDeviceId, + }); + + const [updated] = await db + .update(streamSessions) + .set({ + status: 'streaming', + streamKey: mediaSession ? streamKey : null, + mediaProvider: mediaSession?.provider ?? 'simple', + mediaSessionId: mediaSession?.mediaSessionId ?? null, + publishEndpoint: mediaSession?.publishUrl ?? null, + subscribeEndpoint: mediaSession?.subscribeUrl ?? null, + metadata: parsed.data.metadata ?? session.metadata, + startedAt: now, + updatedAt: now, + }) + .where(eq(streamSessions.id, session.id)) + .returning(); + + if (!updated) { + res.status(500).json({ message: 'Failed to update stream session' }); + return; + } + + if (shouldCreateRecordingPlaceholder()) { + await createRecordingForStream(updated.id); + } + + if (sfuService) { + try { + await sfuService.startSession({ + streamSessionId: updated.id, + ownerUserId: updated.ownerUserId, + cameraDeviceId: updated.cameraDeviceId, + requesterDeviceId: updated.requesterDeviceId, + }); + await sfuService.setSessionState(updated.id, 'live'); + } catch (error) { + console.error('Failed starting SFU session', error); + res.status(500).json({ message: 'Failed to initialize SFU session' }); + return; + } + } + + const startedPayload = createStreamStartedPayload(updated); + console.info('[stream.accept]', { + streamSessionId: updated.id, + requesterDeviceId: updated.requesterDeviceId, + cameraDeviceId: updated.cameraDeviceId, + mode: mediaSession ? 'legacy' : 'simple', + }); + const deliveredToRequester = sendRealtimeToDevice(session.requesterDeviceId, 'stream:started', startedPayload); + + if (!deliveredToRequester) { + await enqueuePushNotification({ + ownerUserId: session.ownerUserId, + recipientDeviceId: session.requesterDeviceId, + type: 'stream_started', + payload: startedPayload, + }); + } + + res.json({ message: 'Stream accepted', streamSession: toSimpleStreamSessionResponse(updated) }); + + await writeAuditLog({ + ownerUserId: session.ownerUserId, + actorDeviceId: session.cameraDeviceId, + action: 'stream.accepted', + targetType: 'stream_session', + targetId: session.id, + metadata: mediaSession + ? { mediaSessionId: updated.mediaSessionId, mediaProvider: updated.mediaProvider } + : { transport: 'webrtc' }, + ipAddress: req.ip, + }); +}); + +export default router; diff --git a/Backend/routes/streams/credentials.ts b/Backend/routes/streams/credentials.ts new file mode 100644 index 0000000..145b840 --- /dev/null +++ b/Backend/routes/streams/credentials.ts @@ -0,0 +1,166 @@ +import { Router } from 'express'; + +import { mediaProvider } from '../../media/service'; +import { simpleStreamingEnabled } from '../../media/config'; +import { requireDeviceAuth } from '../../middleware/device-auth'; +import { writeAuditLog } from '../../services/audit'; +import { streamParamSchema } from './schemas'; +import { ensureStreamDeviceAuth, getOwnedStreamSession, isStreamParticipant } from './shared'; + +const router = Router(); + +router.get('/:streamSessionId/publish-credentials', requireDeviceAuth, async (req, res) => { + const parsedParams = streamParamSchema.safeParse(req.params); + + if (!parsedParams.success) { + res.status(400).json({ message: 'Invalid streamSessionId', errors: parsedParams.error.flatten() }); + return; + } + + const deviceAuth = ensureStreamDeviceAuth(req, res); + if (!deviceAuth) return; + + if (simpleStreamingEnabled) { + res.status(409).json({ message: 'SIMPLE_STREAMING does not use publish credentials' }); + return; + } + + const session = await getOwnedStreamSession(parsedParams.data.streamSessionId, deviceAuth.userId); + + if (!session) { + res.status(404).json({ message: 'Stream session not found' }); + return; + } + + if (session.cameraDeviceId !== deviceAuth.deviceId) { + res.status(403).json({ message: 'Only camera device can request publish credentials' }); + return; + } + + if (!session.mediaSessionId || session.status !== 'streaming') { + res.status(409).json({ message: 'Stream session is not ready for publish credentials' }); + return; + } + + const credentials = await mediaProvider.issuePublishCredentials({ + mediaSessionId: session.mediaSessionId, + cameraDeviceId: session.cameraDeviceId, + ownerUserId: session.ownerUserId, + }); + + res.json(credentials); + + await writeAuditLog({ + ownerUserId: session.ownerUserId, + actorDeviceId: session.cameraDeviceId, + action: 'stream.publish_credentials_issued', + targetType: 'stream_session', + targetId: session.id, + metadata: { mediaProvider: credentials.provider }, + ipAddress: req.ip, + }); +}); + +router.get('/:streamSessionId/subscribe-credentials', requireDeviceAuth, async (req, res) => { + const parsedParams = streamParamSchema.safeParse(req.params); + + if (!parsedParams.success) { + res.status(400).json({ message: 'Invalid streamSessionId', errors: parsedParams.error.flatten() }); + return; + } + + const deviceAuth = ensureStreamDeviceAuth(req, res); + if (!deviceAuth) return; + + if (simpleStreamingEnabled) { + res.status(409).json({ message: 'SIMPLE_STREAMING does not use subscribe credentials' }); + return; + } + + const session = await getOwnedStreamSession(parsedParams.data.streamSessionId, deviceAuth.userId); + + if (!session) { + res.status(404).json({ message: 'Stream session not found' }); + return; + } + + if (!isStreamParticipant(session, deviceAuth.deviceId)) { + res.status(403).json({ message: 'Device cannot request subscribe credentials for this stream' }); + return; + } + + if (!session.mediaSessionId || session.status !== 'streaming') { + res.status(409).json({ message: 'Stream is not active yet' }); + return; + } + + const credentials = await mediaProvider.issueSubscribeCredentials({ + mediaSessionId: session.mediaSessionId, + viewerDeviceId: deviceAuth.deviceId, + ownerUserId: session.ownerUserId, + }); + + res.json(credentials); + + await writeAuditLog({ + ownerUserId: session.ownerUserId, + actorDeviceId: deviceAuth.deviceId, + action: 'stream.subscribe_credentials_issued', + targetType: 'stream_session', + targetId: session.id, + metadata: { mediaProvider: credentials.provider }, + ipAddress: req.ip, + }); +}); + +router.get('/:streamSessionId/playback-token', requireDeviceAuth, async (req, res) => { + const parsedParams = streamParamSchema.safeParse(req.params); + + if (!parsedParams.success) { + res.status(400).json({ message: 'Invalid streamSessionId', errors: parsedParams.error.flatten() }); + return; + } + + const deviceAuth = ensureStreamDeviceAuth(req, res); + if (!deviceAuth) return; + + if (simpleStreamingEnabled) { + res.status(409).json({ message: 'SIMPLE_STREAMING does not issue playback tokens' }); + return; + } + + const session = await getOwnedStreamSession(parsedParams.data.streamSessionId, deviceAuth.userId); + + if (!session) { + res.status(404).json({ message: 'Stream session not found' }); + return; + } + + if (!isStreamParticipant(session, deviceAuth.deviceId)) { + res.status(403).json({ message: 'Device cannot request playback token for this stream' }); + return; + } + + if (!session.streamKey || !session.mediaSessionId || session.status !== 'streaming') { + res.status(409).json({ message: 'Stream is not active yet' }); + return; + } + + const credentials = await mediaProvider.issueSubscribeCredentials({ + mediaSessionId: session.mediaSessionId, + viewerDeviceId: deviceAuth.deviceId, + ownerUserId: deviceAuth.userId, + }); + + res.json({ + streamSessionId: session.id, + streamKey: session.streamKey, + status: session.status, + playbackToken: credentials.subscribeToken, + subscribeUrl: credentials.subscribeUrl, + mediaProvider: credentials.provider, + expiresInSeconds: credentials.expiresInSeconds, + }); +}); + +export default router; diff --git a/Backend/routes/streams/end.ts b/Backend/routes/streams/end.ts new file mode 100644 index 0000000..91231df --- /dev/null +++ b/Backend/routes/streams/end.ts @@ -0,0 +1,132 @@ +import { eq } from 'drizzle-orm'; +import { Router } from 'express'; + +import { db } from '../../db/client'; +import { streamSessions } from '../../db/schema'; +import { simpleStreamingEnabled } from '../../media/config'; +import { sfuService } from '../../media/sfu/service'; +import { requireDeviceAuth } from '../../middleware/device-auth'; +import { sendRealtimeToDevice } from '../../realtime/gateway'; +import { createRecordingForStream } from '../../services/recordings'; +import { enqueuePushNotification } from '../../services/push'; +import { createStreamEndedPayload, toSimpleStreamSessionResponse } from '../../streaming/simple'; +import { endStreamSchema, streamParamSchema } from './schemas'; +import { ensureStreamDeviceAuth, getOwnedStreamSession, isStreamParticipant, shouldCreateRecordingPlaceholder } from './shared'; + +const router = Router(); + +router.post('/:streamSessionId/end', requireDeviceAuth, async (req, res) => { + const parsedParams = streamParamSchema.safeParse(req.params); + + if (!parsedParams.success) { + res.status(400).json({ message: 'Invalid streamSessionId', errors: parsedParams.error.flatten() }); + return; + } + + const parsed = endStreamSchema.safeParse(req.body ?? {}); + + if (!parsed.success) { + res.status(400).json({ message: 'Invalid request body', errors: parsed.error.flatten() }); + return; + } + + const deviceAuth = ensureStreamDeviceAuth(req, res); + if (!deviceAuth) return; + + const session = await getOwnedStreamSession(parsedParams.data.streamSessionId, deviceAuth.userId); + + if (!session) { + res.status(404).json({ message: 'Stream session not found' }); + return; + } + + if (!isStreamParticipant(session, deviceAuth.deviceId)) { + res.status(403).json({ message: 'Only requester or camera device can end this stream' }); + return; + } + + const now = new Date(); + const nextStatus = simpleStreamingEnabled ? 'ended' : parsed.data.reason; + const nextMetadata = + simpleStreamingEnabled && parsed.data.reason !== 'completed' + ? { + ...(session.metadata ?? {}), + endReason: parsed.data.reason, + } + : session.metadata; + + const [updated] = await db + .update(streamSessions) + .set({ + status: nextStatus, + endedAt: now, + metadata: nextMetadata, + updatedAt: now, + }) + .where(eq(streamSessions.id, session.id)) + .returning(); + + if (!updated) { + res.status(500).json({ message: 'Failed to update stream session' }); + return; + } + + if (sfuService) { + try { + await sfuService.endSession(session.id); + } catch (error) { + console.error('Failed ending SFU session', error); + } + } + + if (shouldCreateRecordingPlaceholder()) { + await createRecordingForStream(session.id); + } + + const endedPayload = simpleStreamingEnabled + ? createStreamEndedPayload({ + streamSessionId: session.id, + cameraDeviceId: session.cameraDeviceId, + requesterDeviceId: session.requesterDeviceId, + endedAt: now, + reason: parsed.data.reason, + }) + : { + streamSessionId: session.id, + status: parsed.data.reason, + endedAt: now, + }; + + console.info('[stream.end]', { + streamSessionId: session.id, + requesterDeviceId: session.requesterDeviceId, + cameraDeviceId: session.cameraDeviceId, + reason: parsed.data.reason, + status: simpleStreamingEnabled ? 'ended' : parsed.data.reason, + }); + + const deliveredToRequester = sendRealtimeToDevice(session.requesterDeviceId, 'stream:ended', endedPayload); + const deliveredToCamera = sendRealtimeToDevice(session.cameraDeviceId, 'stream:ended', endedPayload); + + if (!deliveredToRequester) { + await enqueuePushNotification({ + ownerUserId: session.ownerUserId, + recipientDeviceId: session.requesterDeviceId, + type: 'stream_ended', + payload: endedPayload, + }); + } + + if (!deliveredToCamera) { + await enqueuePushNotification({ + ownerUserId: session.ownerUserId, + recipientDeviceId: session.cameraDeviceId, + type: 'stream_ended', + payload: endedPayload, + }); + } + + res.json({ message: 'Stream ended', streamSession: toSimpleStreamSessionResponse(updated) }); +}); + +export default router; diff --git a/Backend/routes/streams/list.ts b/Backend/routes/streams/list.ts new file mode 100644 index 0000000..79e9d76 --- /dev/null +++ b/Backend/routes/streams/list.ts @@ -0,0 +1,37 @@ +import { and, desc, eq, or } from 'drizzle-orm'; +import { Router } from 'express'; + +import { db } from '../../db/client'; +import { streamSessions } from '../../db/schema'; +import { requireDeviceAuth } from '../../middleware/device-auth'; +import { listSchema } from './schemas'; +import { ensureStreamDeviceAuth } from './shared'; + +const router = Router(); + +router.get('/me/list', requireDeviceAuth, async (req, res) => { + const parsed = listSchema.safeParse(req.query); + + if (!parsed.success) { + res.status(400).json({ message: 'Invalid query params', errors: parsed.error.flatten() }); + return; + } + + const deviceAuth = ensureStreamDeviceAuth(req, res); + if (!deviceAuth) return; + + const sessions = await db.query.streamSessions.findMany({ + where: and( + eq(streamSessions.ownerUserId, deviceAuth.userId), + or(eq(streamSessions.requesterDeviceId, deviceAuth.deviceId), eq(streamSessions.cameraDeviceId, deviceAuth.deviceId)), + ), + orderBy: [desc(streamSessions.createdAt)], + limit: parsed.data.limit, + }); + + const filtered = parsed.data.status ? sessions.filter((session) => session.status === parsed.data.status) : sessions; + + res.json({ count: filtered.length, streamSessions: filtered }); +}); + +export default router; diff --git a/Backend/routes/streams/request.ts b/Backend/routes/streams/request.ts new file mode 100644 index 0000000..a16f81b --- /dev/null +++ b/Backend/routes/streams/request.ts @@ -0,0 +1,197 @@ +import { and, eq } from 'drizzle-orm'; +import { Router } from 'express'; + +import { db } from '../../db/client'; +import { commands, deviceLinks, devices, streamSessions } from '../../db/schema'; +import { simpleStreamingEnabled } from '../../media/config'; +import { mediaProvider } from '../../media/service'; +import { requireDeviceAuth } from '../../middleware/device-auth'; +import { dispatchCommandById, sendRealtimeToDevice } from '../../realtime/gateway'; +import { writeAuditLog } from '../../services/audit'; +import { enqueuePushNotification } from '../../services/push'; +import { createStreamRequestedPayload, toSimpleStreamSessionResponse } from '../../streaming/simple'; +import { requestStreamSchema } from './schemas'; +import { ensureStreamDeviceAuth } from './shared'; + +const router = Router(); + +router.post('/request', requireDeviceAuth, async (req, res) => { + const parsed = requestStreamSchema.safeParse(req.body ?? {}); + + if (!parsed.success) { + res.status(400).json({ message: 'Invalid request body', errors: parsed.error.flatten() }); + return; + } + + const deviceAuth = ensureStreamDeviceAuth(req, res); + if (!deviceAuth) return; + + const [sourceDevice, cameraDevice] = await Promise.all([ + db.query.devices.findFirst({ + where: and(eq(devices.id, deviceAuth.deviceId), eq(devices.userId, deviceAuth.userId)), + }), + db.query.devices.findFirst({ + where: and(eq(devices.id, parsed.data.cameraDeviceId), eq(devices.userId, deviceAuth.userId)), + }), + ]); + + if (!sourceDevice || !cameraDevice) { + res.status(404).json({ message: 'Source or camera device not found' }); + return; + } + + if (sourceDevice.role !== 'client') { + res.status(403).json({ message: 'Only client devices can request on-demand stream sessions' }); + return; + } + + if (cameraDevice.role !== 'camera') { + res.status(400).json({ message: 'cameraDeviceId must point to a camera device' }); + return; + } + + const link = await db.query.deviceLinks.findFirst({ + where: and( + eq(deviceLinks.ownerUserId, deviceAuth.userId), + eq(deviceLinks.cameraDeviceId, cameraDevice.id), + eq(deviceLinks.clientDeviceId, sourceDevice.id), + eq(deviceLinks.status, 'active'), + ), + }); + + if (!link) { + res.status(403).json({ message: 'No active link between requester and camera' }); + return; + } + + const now = new Date(); + + const [session] = await db + .insert(streamSessions) + .values({ + ownerUserId: deviceAuth.userId, + cameraDeviceId: cameraDevice.id, + requesterDeviceId: sourceDevice.id, + status: 'requested', + reason: parsed.data.reason, + metadata: parsed.data.metadata ?? null, + mediaProvider: mediaProvider.name, + updatedAt: now, + }) + .returning(); + + if (!session) { + res.status(500).json({ message: 'Failed creating stream session' }); + return; + } + + if (simpleStreamingEnabled) { + const requestPayload = createStreamRequestedPayload(session); + const deliveredToCamera = sendRealtimeToDevice(cameraDevice.id, 'stream:requested', requestPayload); + + console.info('[stream.request]', { + streamSessionId: session.id, + requesterDeviceId: sourceDevice.id, + cameraDeviceId: cameraDevice.id, + mode: 'simple', + }); + + sendRealtimeToDevice(sourceDevice.id, 'stream:requested', requestPayload); + + if (!deliveredToCamera) { + await enqueuePushNotification({ + ownerUserId: cameraDevice.userId, + recipientDeviceId: cameraDevice.id, + type: 'stream_requested', + payload: requestPayload, + }); + } + + res.status(201).json({ + message: 'Stream request sent', + streamSession: toSimpleStreamSessionResponse(session), + }); + + await writeAuditLog({ + ownerUserId: sourceDevice.userId, + actorDeviceId: sourceDevice.id, + action: 'stream.requested', + targetType: 'stream_session', + targetId: session.id, + metadata: { cameraDeviceId: cameraDevice.id, reason: session.reason, transport: 'webrtc' }, + ipAddress: req.ip, + }); + return; + } + + const [command] = await db + .insert(commands) + .values({ + ownerUserId: deviceAuth.userId, + sourceDeviceId: sourceDevice.id, + targetDeviceId: cameraDevice.id, + commandType: 'start_stream', + payload: { + streamSessionId: session.id, + reason: session.reason, + }, + status: 'queued', + retryCount: 0, + lastDispatchedAt: now, + updatedAt: now, + }) + .returning(); + + if (!command) { + res.status(500).json({ message: 'Failed creating stream command' }); + return; + } + + await dispatchCommandById(command.id); + console.info('[stream.request]', { + streamSessionId: session.id, + requesterDeviceId: sourceDevice.id, + cameraDeviceId: cameraDevice.id, + mode: 'legacy', + commandId: command.id, + }); + + const refreshedCommand = await db.query.commands.findFirst({ where: eq(commands.id, command.id) }); + + const deliveredToRequester = sendRealtimeToDevice(sourceDevice.id, 'stream:requested', { + streamSessionId: session.id, + cameraDeviceId: cameraDevice.id, + status: session.status, + reason: session.reason, + }); + + if (!deliveredToRequester) { + await enqueuePushNotification({ + ownerUserId: sourceDevice.userId, + recipientDeviceId: sourceDevice.id, + type: 'stream_requested', + payload: { + streamSessionId: session.id, + cameraDeviceId: cameraDevice.id, + }, + }); + } + + res.status(201).json({ + message: 'Stream request sent', + streamSession: session, + command: refreshedCommand ?? command, + }); + + await writeAuditLog({ + ownerUserId: sourceDevice.userId, + actorDeviceId: sourceDevice.id, + action: 'stream.requested', + targetType: 'stream_session', + targetId: session.id, + metadata: { cameraDeviceId: cameraDevice.id, reason: session.reason }, + ipAddress: req.ip, + }); +}); + +export default router; diff --git a/Backend/routes/streams/schemas.ts b/Backend/routes/streams/schemas.ts new file mode 100644 index 0000000..16efad8 --- /dev/null +++ b/Backend/routes/streams/schemas.ts @@ -0,0 +1,29 @@ +import { z } from 'zod'; + +export const requestStreamSchema = z.object({ + cameraDeviceId: z.string().uuid(), + reason: z.enum(['on_demand', 'motion_follow_up']).default('on_demand'), + metadata: z.record(z.string(), z.unknown()).optional(), +}); + +export const acceptStreamSchema = z.object({ + streamKey: z.string().trim().min(1).max(255).optional(), + metadata: z.record(z.string(), z.unknown()).optional(), +}); + +export const endStreamSchema = z.object({ + reason: z.enum(['completed', 'cancelled', 'failed']).default('completed'), +}); + +export const streamParamSchema = z.object({ + streamSessionId: z.string().uuid(), +}); + +export const sfuTransportRequestSchema = z.object({ + role: z.enum(['camera', 'viewer']).optional(), +}); + +export const listSchema = z.object({ + status: z.string().optional(), + limit: z.coerce.number().int().min(1).max(100).default(25), +}); diff --git a/Backend/routes/streams/sfu.ts b/Backend/routes/streams/sfu.ts new file mode 100644 index 0000000..8aa524b --- /dev/null +++ b/Backend/routes/streams/sfu.ts @@ -0,0 +1,146 @@ +import { Router } from 'express'; + +import { mediaMode } from '../../media/config'; +import { sfuService } from '../../media/sfu/service'; +import { requireDeviceAuth } from '../../middleware/device-auth'; +import { sfuTransportRequestSchema, streamParamSchema } from './schemas'; +import { ensureStreamDeviceAuth, getOwnedStreamSession, isStreamParticipant } from './shared'; + +const router = Router(); + +router.get('/:streamSessionId/sfu/session', requireDeviceAuth, async (req, res) => { + const parsedParams = streamParamSchema.safeParse(req.params); + + if (!parsedParams.success) { + res.status(400).json({ message: 'Invalid streamSessionId', errors: parsedParams.error.flatten() }); + return; + } + + const deviceAuth = ensureStreamDeviceAuth(req, res); + if (!deviceAuth) return; + + if (!sfuService) { + res.status(409).json({ message: `SFU service disabled (MEDIA_MODE=${mediaMode})` }); + return; + } + + const session = await getOwnedStreamSession(parsedParams.data.streamSessionId, deviceAuth.userId); + if (!session) { + res.status(404).json({ message: 'Stream session not found' }); + return; + } + + if (!isStreamParticipant(session, deviceAuth.deviceId)) { + res.status(403).json({ message: 'Device cannot access SFU session details for this stream' }); + return; + } + + const sfuSession = await sfuService.getSession(session.id); + res.json({ + streamSessionId: session.id, + mediaMode, + sfuSession, + }); +}); + +router.post('/:streamSessionId/sfu/publish-transport', requireDeviceAuth, async (req, res) => { + const parsedParams = streamParamSchema.safeParse(req.params); + if (!parsedParams.success) { + res.status(400).json({ message: 'Invalid streamSessionId', errors: parsedParams.error.flatten() }); + return; + } + + const parsedBody = sfuTransportRequestSchema.safeParse(req.body ?? {}); + if (!parsedBody.success) { + res.status(400).json({ message: 'Invalid request body', errors: parsedBody.error.flatten() }); + return; + } + + const deviceAuth = ensureStreamDeviceAuth(req, res); + if (!deviceAuth) return; + + if (!sfuService) { + res.status(409).json({ message: `SFU service disabled (MEDIA_MODE=${mediaMode})` }); + return; + } + + const session = await getOwnedStreamSession(parsedParams.data.streamSessionId, deviceAuth.userId); + if (!session) { + res.status(404).json({ message: 'Stream session not found' }); + return; + } + + if (session.status !== 'streaming') { + res.status(409).json({ message: 'Stream must be active before creating publish transport' }); + return; + } + + if (session.cameraDeviceId !== deviceAuth.deviceId) { + res.status(403).json({ message: 'Only camera device can create publish transport' }); + return; + } + + const transport = await sfuService.createPublishTransport({ + streamSessionId: session.id, + cameraDeviceId: deviceAuth.deviceId, + }); + await sfuService.setSessionState(session.id, 'live'); + + res.json({ + streamSessionId: session.id, + mediaMode, + transport, + }); +}); + +router.post('/:streamSessionId/sfu/subscribe-transport', requireDeviceAuth, async (req, res) => { + const parsedParams = streamParamSchema.safeParse(req.params); + if (!parsedParams.success) { + res.status(400).json({ message: 'Invalid streamSessionId', errors: parsedParams.error.flatten() }); + return; + } + + const parsedBody = sfuTransportRequestSchema.safeParse(req.body ?? {}); + if (!parsedBody.success) { + res.status(400).json({ message: 'Invalid request body', errors: parsedBody.error.flatten() }); + return; + } + + const deviceAuth = ensureStreamDeviceAuth(req, res); + if (!deviceAuth) return; + + if (!sfuService) { + res.status(409).json({ message: `SFU service disabled (MEDIA_MODE=${mediaMode})` }); + return; + } + + const session = await getOwnedStreamSession(parsedParams.data.streamSessionId, deviceAuth.userId); + if (!session) { + res.status(404).json({ message: 'Stream session not found' }); + return; + } + + if (session.status !== 'streaming') { + res.status(409).json({ message: 'Stream must be active before creating subscribe transport' }); + return; + } + + if (!isStreamParticipant(session, deviceAuth.deviceId)) { + res.status(403).json({ message: 'Device cannot create subscribe transport for this stream' }); + return; + } + + const transport = await sfuService.createSubscribeTransport({ + streamSessionId: session.id, + viewerDeviceId: deviceAuth.deviceId, + }); + await sfuService.setSessionState(session.id, 'live'); + + res.json({ + streamSessionId: session.id, + mediaMode, + transport, + }); +}); + +export default router; diff --git a/Backend/routes/streams/shared.ts b/Backend/routes/streams/shared.ts new file mode 100644 index 0000000..b6e4dbf --- /dev/null +++ b/Backend/routes/streams/shared.ts @@ -0,0 +1,29 @@ +import type { Request, Response } from 'express'; +import { and, eq } from 'drizzle-orm'; + +import { db } from '../../db/client'; +import { streamSessions } from '../../db/schema'; +import { mediaMode, streamRecordingEnabled } from '../../media/config'; + +export const ensureStreamDeviceAuth = (req: Request, res: Response) => { + const deviceAuth = req.deviceAuth; + + if (!deviceAuth) { + res.status(401).json({ message: 'Unauthorized' }); + return null; + } + + return deviceAuth; +}; + +export const getOwnedStreamSession = async (streamSessionId: string, ownerUserId: string) => + await db.query.streamSessions.findFirst({ + where: and(eq(streamSessions.id, streamSessionId), eq(streamSessions.ownerUserId, ownerUserId)), + }); + +export const isStreamParticipant = ( + session: { requesterDeviceId: string; cameraDeviceId: string }, + deviceId: string, +): boolean => session.requesterDeviceId === deviceId || session.cameraDeviceId === deviceId; + +export const shouldCreateRecordingPlaceholder = (): boolean => mediaMode === 'legacy' || streamRecordingEnabled; diff --git a/Backend/services/recordings.ts b/Backend/services/recordings.ts new file mode 100644 index 0000000..293fc84 --- /dev/null +++ b/Backend/services/recordings.ts @@ -0,0 +1,27 @@ +import { eq } from 'drizzle-orm'; + +import { db } from '../db/client'; +import { recordings, streamSessions } from '../db/schema'; + +export const createRecordingForStream = async (streamSessionId: string): Promise => { + const stream = await db.query.streamSessions.findFirst({ where: eq(streamSessions.id, streamSessionId) }); + + if (!stream) { + return; + } + + const existing = await db.query.recordings.findFirst({ where: eq(recordings.streamSessionId, stream.id) }); + + if (existing) { + return; + } + + await db.insert(recordings).values({ + ownerUserId: stream.ownerUserId, + streamSessionId: stream.id, + cameraDeviceId: stream.cameraDeviceId, + requesterDeviceId: stream.requesterDeviceId, + status: 'awaiting_upload', + updatedAt: new Date(), + }); +};