diff --git a/Backend/media/sfu/noop.ts b/Backend/media/sfu/noop.ts index 4f216a7..ff449a5 100644 --- a/Backend/media/sfu/noop.ts +++ b/Backend/media/sfu/noop.ts @@ -1,6 +1,7 @@ import { randomUUID } from 'crypto'; import { mediaConfig } from '../config'; +import { SfuSessionRegistry } from './registry'; import type { SfuPublishTransportRequest, SfuPublishTransportResult, @@ -25,11 +26,11 @@ const toIceServers = (): Array<{ urls: string; username?: string; credential?: s export class NoopSfuService implements SfuService { mode: 'single_server_sfu' = 'single_server_sfu'; - private readonly sessions = new Map(); + private readonly registry = new SfuSessionRegistry(); async startSession(input: SfuSessionStartInput): Promise { const now = new Date().toISOString(); - const existing = this.sessions.get(input.streamSessionId); + const existing = this.registry.get(input.streamSessionId); if (existing) return existing; const descriptor: SfuSessionDescriptor = { @@ -40,18 +41,24 @@ export class NoopSfuService implements SfuService { state: 'starting', createdAt: now, }; - this.sessions.set(input.streamSessionId, descriptor); - return descriptor; + return this.registry.set(descriptor); + } + + async setSessionState(streamSessionId: string, state: SfuSessionDescriptor['state']): Promise { + this.registry.updateState(streamSessionId, state); } async endSession(streamSessionId: string): Promise { - const existing = this.sessions.get(streamSessionId); - if (!existing) return; - this.sessions.set(streamSessionId, { ...existing, state: 'ended' }); + this.registry.updateState(streamSessionId, 'ending'); + this.registry.updateState(streamSessionId, 'ended'); } async getSession(streamSessionId: string): Promise { - return this.sessions.get(streamSessionId) ?? null; + return this.registry.get(streamSessionId); + } + + async listSessions(): Promise { + return this.registry.list(); } async createPublishTransport(_input: SfuPublishTransportRequest): Promise { @@ -68,4 +75,3 @@ export class NoopSfuService implements SfuService { }; } } - diff --git a/Backend/media/sfu/registry.ts b/Backend/media/sfu/registry.ts new file mode 100644 index 0000000..776477c --- /dev/null +++ b/Backend/media/sfu/registry.ts @@ -0,0 +1,41 @@ +import type { SfuSessionDescriptor, SfuSessionState } from './types'; + +type StoredSfuSession = SfuSessionDescriptor & { + updatedAt: string; +}; + +export class SfuSessionRegistry { + private readonly sessions = new Map(); + + get(streamSessionId: string): SfuSessionDescriptor | null { + const found = this.sessions.get(streamSessionId); + if (!found) return null; + const { updatedAt: _updatedAt, ...descriptor } = found; + return descriptor; + } + + set(session: SfuSessionDescriptor): SfuSessionDescriptor { + const now = new Date().toISOString(); + this.sessions.set(session.streamSessionId, { ...session, updatedAt: now }); + return session; + } + + updateState(streamSessionId: string, state: SfuSessionState): SfuSessionDescriptor | null { + const existing = this.sessions.get(streamSessionId); + if (!existing) return null; + + const next: StoredSfuSession = { + ...existing, + state, + updatedAt: new Date().toISOString(), + }; + this.sessions.set(streamSessionId, next); + const { updatedAt: _updatedAt, ...descriptor } = next; + return descriptor; + } + + list(): SfuSessionDescriptor[] { + return Array.from(this.sessions.values()).map(({ updatedAt: _updatedAt, ...descriptor }) => descriptor); + } +} + diff --git a/Backend/media/sfu/types.ts b/Backend/media/sfu/types.ts index fa13385..dfb1986 100644 --- a/Backend/media/sfu/types.ts +++ b/Backend/media/sfu/types.ts @@ -39,9 +39,10 @@ export type SfuSubscribeTransportResult = { export interface SfuService { mode: 'single_server_sfu'; startSession(input: SfuSessionStartInput): Promise; + setSessionState(streamSessionId: string, state: SfuSessionState): Promise; endSession(streamSessionId: string): Promise; getSession(streamSessionId: string): Promise; + listSessions(): Promise; createPublishTransport(input: SfuPublishTransportRequest): Promise; createSubscribeTransport(input: SfuSubscribeTransportRequest): Promise; } - diff --git a/Backend/routes/ops.ts b/Backend/routes/ops.ts index a7ad64e..1320e9b 100644 --- a/Backend/routes/ops.ts +++ b/Backend/routes/ops.ts @@ -17,6 +17,7 @@ router.get('/ready', async (_req, res) => { try { await db.execute('select 1'); await minioClient.bucketExists(minioBucket); + const sfuSessions = sfuService ? await sfuService.listSessions() : []; res.json({ status: 'ready', @@ -26,6 +27,7 @@ router.get('/ready', async (_req, res) => { mediaMode: mediaConfig.mode, mediaProvider: mediaProvider.name, sfuService: sfuService ? sfuService.mode : 'disabled', + sfuActiveSessions: sfuSessions.filter((session) => session.state !== 'ended').length, }, timestamp: new Date().toISOString(), }); diff --git a/Backend/routes/streams.ts b/Backend/routes/streams.ts index 07f9e52..17775b1 100644 --- a/Backend/routes/streams.ts +++ b/Backend/routes/streams.ts @@ -5,8 +5,10 @@ import { Router } from 'express'; import { z } from 'zod'; import { db } from '../db/client'; +import { mediaMode } from '../media/config'; import { deviceCommands, deviceLinks, devices, streamSessions } from '../db/schema'; import { mediaProvider } from '../media/service'; +import { sfuService } from '../media/sfu/service'; import { requireDeviceAuth } from '../middleware/device-auth'; import { dispatchCommandById, sendRealtimeToDevice } from '../realtime/gateway'; import { writeAuditLog } from '../services/audit'; @@ -34,6 +36,10 @@ const streamParamSchema = z.object({ streamSessionId: z.string().uuid(), }); +const sfuTransportRequestSchema = z.object({ + role: z.enum(['camera', 'viewer']).optional(), +}); + const listSchema = z.object({ status: z.string().optional(), limit: z.coerce.number().int().min(1).max(100).default(25), @@ -68,6 +74,20 @@ router.get('/me/list', requireDeviceAuth, async (req, res) => { res.json({ count: filtered.length, streamSessions: filtered }); }); +const ensureDeviceAuth = (req: Parameters[0], res: Parameters[1]) => { + const deviceAuth = req.deviceAuth; + if (!deviceAuth) { + res.status(401).json({ message: 'Unauthorized' }); + return null; + } + return deviceAuth; +}; + +const getOwnedStreamSession = async (streamSessionId: string, ownerUserId: string) => + await db.query.streamSessions.findFirst({ + where: and(eq(streamSessions.id, streamSessionId), eq(streamSessions.ownerUserId, ownerUserId)), + }); + router.post('/request', requireDeviceAuth, async (req, res) => { const parsed = requestStreamSchema.safeParse(req.body ?? {}); @@ -76,12 +96,8 @@ router.post('/request', requireDeviceAuth, async (req, res) => { return; } - const deviceAuth = req.deviceAuth; - - if (!deviceAuth) { - res.status(401).json({ message: 'Unauthorized' }); - return; - } + const deviceAuth = ensureDeviceAuth(req, res); + if (!deviceAuth) return; const [sourceDevice, cameraDevice] = await Promise.all([ db.query.devices.findFirst({ @@ -220,12 +236,8 @@ router.post('/:streamSessionId/accept', requireDeviceAuth, async (req, res) => { return; } - const deviceAuth = req.deviceAuth; - - if (!deviceAuth) { - res.status(401).json({ message: 'Unauthorized' }); - return; - } + const deviceAuth = ensureDeviceAuth(req, res); + if (!deviceAuth) return; const session = await db.query.streamSessions.findFirst({ where: and( @@ -275,6 +287,22 @@ router.post('/:streamSessionId/accept', requireDeviceAuth, async (req, res) => { return; } + if (sfuService) { + try { + await sfuService.startSession({ + streamSessionId: updated.id, + ownerUserId: updated.ownerUserId, + cameraDeviceId: updated.cameraDeviceId, + requesterDeviceId: updated.requesterDeviceId, + }); + await sfuService.setSessionState(updated.id, 'live'); + } catch (error) { + console.error('Failed starting SFU session', error); + res.status(500).json({ message: 'Failed to initialize SFU session' }); + return; + } + } + const deliveredToRequester = sendRealtimeToDevice(session.requesterDeviceId, 'stream:started', { streamSessionId: updated.id, cameraDeviceId: updated.cameraDeviceId, @@ -318,16 +346,10 @@ router.get('/:streamSessionId/publish-credentials', requireDeviceAuth, async (re return; } - const deviceAuth = req.deviceAuth; + const deviceAuth = ensureDeviceAuth(req, res); + if (!deviceAuth) return; - if (!deviceAuth) { - res.status(401).json({ message: 'Unauthorized' }); - return; - } - - const session = await db.query.streamSessions.findFirst({ - where: and(eq(streamSessions.id, parsedParams.data.streamSessionId), eq(streamSessions.ownerUserId, deviceAuth.userId)), - }); + const session = await getOwnedStreamSession(parsedParams.data.streamSessionId, deviceAuth.userId); if (!session) { res.status(404).json({ message: 'Stream session not found' }); @@ -371,16 +393,10 @@ router.get('/:streamSessionId/subscribe-credentials', requireDeviceAuth, async ( return; } - const deviceAuth = req.deviceAuth; + const deviceAuth = ensureDeviceAuth(req, res); + if (!deviceAuth) return; - if (!deviceAuth) { - res.status(401).json({ message: 'Unauthorized' }); - return; - } - - const session = await db.query.streamSessions.findFirst({ - where: and(eq(streamSessions.id, parsedParams.data.streamSessionId), eq(streamSessions.ownerUserId, deviceAuth.userId)), - }); + const session = await getOwnedStreamSession(parsedParams.data.streamSessionId, deviceAuth.userId); if (!session) { res.status(404).json({ message: 'Stream session not found' }); @@ -419,6 +435,143 @@ router.get('/:streamSessionId/subscribe-credentials', requireDeviceAuth, async ( }); }); +router.get('/:streamSessionId/sfu/session', requireDeviceAuth, async (req, res) => { + const parsedParams = streamParamSchema.safeParse(req.params); + + if (!parsedParams.success) { + res.status(400).json({ message: 'Invalid streamSessionId', errors: parsedParams.error.flatten() }); + return; + } + + const deviceAuth = ensureDeviceAuth(req, res); + if (!deviceAuth) return; + + if (!sfuService) { + res.status(409).json({ message: `SFU service disabled (MEDIA_MODE=${mediaMode})` }); + return; + } + + const session = await getOwnedStreamSession(parsedParams.data.streamSessionId, deviceAuth.userId); + if (!session) { + res.status(404).json({ message: 'Stream session not found' }); + return; + } + + const isParticipant = session.requesterDeviceId === deviceAuth.deviceId || session.cameraDeviceId === deviceAuth.deviceId; + if (!isParticipant) { + res.status(403).json({ message: 'Device cannot access SFU session details for this stream' }); + return; + } + + const sfuSession = await sfuService.getSession(session.id); + res.json({ + streamSessionId: session.id, + mediaMode, + sfuSession, + }); +}); + +router.post('/:streamSessionId/sfu/publish-transport', requireDeviceAuth, async (req, res) => { + const parsedParams = streamParamSchema.safeParse(req.params); + if (!parsedParams.success) { + res.status(400).json({ message: 'Invalid streamSessionId', errors: parsedParams.error.flatten() }); + return; + } + + const parsedBody = sfuTransportRequestSchema.safeParse(req.body ?? {}); + if (!parsedBody.success) { + res.status(400).json({ message: 'Invalid request body', errors: parsedBody.error.flatten() }); + return; + } + + const deviceAuth = ensureDeviceAuth(req, res); + if (!deviceAuth) return; + + if (!sfuService) { + res.status(409).json({ message: `SFU service disabled (MEDIA_MODE=${mediaMode})` }); + return; + } + + const session = await getOwnedStreamSession(parsedParams.data.streamSessionId, deviceAuth.userId); + if (!session) { + res.status(404).json({ message: 'Stream session not found' }); + return; + } + + if (session.status !== 'streaming') { + res.status(409).json({ message: 'Stream must be active before creating publish transport' }); + return; + } + + if (session.cameraDeviceId !== deviceAuth.deviceId) { + res.status(403).json({ message: 'Only camera device can create publish transport' }); + return; + } + + const transport = await sfuService.createPublishTransport({ + streamSessionId: session.id, + cameraDeviceId: deviceAuth.deviceId, + }); + await sfuService.setSessionState(session.id, 'live'); + + res.json({ + streamSessionId: session.id, + mediaMode, + transport, + }); +}); + +router.post('/:streamSessionId/sfu/subscribe-transport', requireDeviceAuth, async (req, res) => { + const parsedParams = streamParamSchema.safeParse(req.params); + if (!parsedParams.success) { + res.status(400).json({ message: 'Invalid streamSessionId', errors: parsedParams.error.flatten() }); + return; + } + + const parsedBody = sfuTransportRequestSchema.safeParse(req.body ?? {}); + if (!parsedBody.success) { + res.status(400).json({ message: 'Invalid request body', errors: parsedBody.error.flatten() }); + return; + } + + const deviceAuth = ensureDeviceAuth(req, res); + if (!deviceAuth) return; + + if (!sfuService) { + res.status(409).json({ message: `SFU service disabled (MEDIA_MODE=${mediaMode})` }); + return; + } + + const session = await getOwnedStreamSession(parsedParams.data.streamSessionId, deviceAuth.userId); + if (!session) { + res.status(404).json({ message: 'Stream session not found' }); + return; + } + + if (session.status !== 'streaming') { + res.status(409).json({ message: 'Stream must be active before creating subscribe transport' }); + return; + } + + const isParticipant = session.requesterDeviceId === deviceAuth.deviceId || session.cameraDeviceId === deviceAuth.deviceId; + if (!isParticipant) { + res.status(403).json({ message: 'Device cannot create subscribe transport for this stream' }); + return; + } + + const transport = await sfuService.createSubscribeTransport({ + streamSessionId: session.id, + viewerDeviceId: deviceAuth.deviceId, + }); + await sfuService.setSessionState(session.id, 'live'); + + res.json({ + streamSessionId: session.id, + mediaMode, + transport, + }); +}); + router.post('/:streamSessionId/end', requireDeviceAuth, async (req, res) => { const parsedParams = streamParamSchema.safeParse(req.params); @@ -434,16 +587,10 @@ router.post('/:streamSessionId/end', requireDeviceAuth, async (req, res) => { return; } - const deviceAuth = req.deviceAuth; + const deviceAuth = ensureDeviceAuth(req, res); + if (!deviceAuth) return; - if (!deviceAuth) { - res.status(401).json({ message: 'Unauthorized' }); - return; - } - - const session = await db.query.streamSessions.findFirst({ - where: and(eq(streamSessions.id, parsedParams.data.streamSessionId), eq(streamSessions.ownerUserId, deviceAuth.userId)), - }); + const session = await getOwnedStreamSession(parsedParams.data.streamSessionId, deviceAuth.userId); if (!session) { res.status(404).json({ message: 'Stream session not found' }); @@ -469,6 +616,14 @@ router.post('/:streamSessionId/end', requireDeviceAuth, async (req, res) => { .where(eq(streamSessions.id, session.id)) .returning(); + if (sfuService) { + try { + await sfuService.endSession(session.id); + } catch (error) { + console.error('Failed ending SFU session', error); + } + } + await createRecordingForStream(session.id); const deliveredToRequester = sendRealtimeToDevice(session.requesterDeviceId, 'stream:ended', { @@ -518,16 +673,10 @@ router.get('/:streamSessionId/playback-token', requireDeviceAuth, async (req, re return; } - const deviceAuth = req.deviceAuth; + const deviceAuth = ensureDeviceAuth(req, res); + if (!deviceAuth) return; - if (!deviceAuth) { - res.status(401).json({ message: 'Unauthorized' }); - return; - } - - const session = await db.query.streamSessions.findFirst({ - where: and(eq(streamSessions.id, parsedParams.data.streamSessionId), eq(streamSessions.ownerUserId, deviceAuth.userId)), - }); + const session = await getOwnedStreamSession(parsedParams.data.streamSessionId, deviceAuth.userId); if (!session) { res.status(404).json({ message: 'Stream session not found' });