diff --git a/Backend/media/sfu/noop.ts b/Backend/media/sfu/noop.ts index ff449a5..71eb696 100644 --- a/Backend/media/sfu/noop.ts +++ b/Backend/media/sfu/noop.ts @@ -3,6 +3,12 @@ import { randomUUID } from 'crypto'; import { mediaConfig } from '../config'; import { SfuSessionRegistry } from './registry'; import type { + SfuConnectTransportInput, + SfuConsumeInput, + SfuConsumerDescriptor, + SfuIceServer, + SfuProduceInput, + SfuProducerDescriptor, SfuPublishTransportRequest, SfuPublishTransportResult, SfuService, @@ -10,9 +16,10 @@ import type { SfuSessionStartInput, SfuSubscribeTransportRequest, SfuSubscribeTransportResult, + SfuTransportDescriptor, } from './types'; -const toIceServers = (): Array<{ urls: string; username?: string; credential?: string }> => { +const toIceServers = (): SfuIceServer[] => { if (mediaConfig.turn.urls.length === 0) { return []; } @@ -61,17 +68,115 @@ export class NoopSfuService implements SfuService { return this.registry.list(); } - async createPublishTransport(_input: SfuPublishTransportRequest): Promise { + async listTransports(streamSessionId: string): Promise { + return this.registry.listTransports(streamSessionId); + } + + async listProducers(streamSessionId: string): Promise { + return this.registry.listProducers(streamSessionId); + } + + async listConsumers(streamSessionId: string): Promise { + return this.registry.listConsumers(streamSessionId); + } + + async createPublishTransport(input: SfuPublishTransportRequest): Promise { + const transportId = `pub_${randomUUID()}`; + this.registry.addTransport({ + transportId, + streamSessionId: input.streamSessionId, + ownerDeviceId: input.cameraDeviceId, + direction: 'publish', + }); return { - transportId: `pub_${randomUUID()}`, + transportId, iceServers: toIceServers(), }; } - async createSubscribeTransport(_input: SfuSubscribeTransportRequest): Promise { + async createSubscribeTransport(input: SfuSubscribeTransportRequest): Promise { + const transportId = `sub_${randomUUID()}`; + this.registry.addTransport({ + transportId, + streamSessionId: input.streamSessionId, + ownerDeviceId: input.viewerDeviceId, + direction: 'subscribe', + }); return { - transportId: `sub_${randomUUID()}`, + transportId, iceServers: toIceServers(), }; } + + async connectPublishTransport(input: SfuConnectTransportInput): Promise { + const transport = this.registry.getTransport(input.transportId); + if (!transport) throw new Error('Publish transport not found'); + if (transport.streamSessionId !== input.streamSessionId) throw new Error('Transport does not belong to stream'); + if (transport.direction !== 'publish') throw new Error('Transport is not a publish transport'); + if (transport.ownerDeviceId !== input.deviceId) throw new Error('Device does not own this publish transport'); + + const connected = this.registry.connectTransport(input.transportId); + if (!connected) throw new Error('Publish transport connect failed'); + return connected; + } + + async connectSubscribeTransport(input: SfuConnectTransportInput): Promise { + const transport = this.registry.getTransport(input.transportId); + if (!transport) throw new Error('Subscribe transport not found'); + if (transport.streamSessionId !== input.streamSessionId) throw new Error('Transport does not belong to stream'); + if (transport.direction !== 'subscribe') throw new Error('Transport is not a subscribe transport'); + if (transport.ownerDeviceId !== input.deviceId) throw new Error('Device does not own this subscribe transport'); + + const connected = this.registry.connectTransport(input.transportId); + if (!connected) throw new Error('Subscribe transport connect failed'); + return connected; + } + + async produce(input: SfuProduceInput): Promise { + const transport = this.registry.getTransport(input.transportId); + if (!transport) throw new Error('Publish transport not found'); + if (transport.streamSessionId !== input.streamSessionId) throw new Error('Transport does not belong to stream'); + if (transport.direction !== 'publish') throw new Error('Transport is not a publish transport'); + if (transport.ownerDeviceId !== input.cameraDeviceId) throw new Error('Device does not own this publish transport'); + if (transport.state !== 'connected') throw new Error('Publish transport must be connected before producing'); + + return this.registry.addProducer({ + producerId: `prod_${randomUUID()}`, + streamSessionId: input.streamSessionId, + transportId: input.transportId, + cameraDeviceId: input.cameraDeviceId, + kind: input.kind, + rtpParameters: input.rtpParameters, + }); + } + + async consume(input: SfuConsumeInput): Promise { + const transport = this.registry.getTransport(input.transportId); + if (!transport) throw new Error('Subscribe transport not found'); + if (transport.streamSessionId !== input.streamSessionId) throw new Error('Transport does not belong to stream'); + if (transport.direction !== 'subscribe') throw new Error('Transport is not a subscribe transport'); + if (transport.ownerDeviceId !== input.viewerDeviceId) throw new Error('Device does not own this subscribe transport'); + if (transport.state !== 'connected') throw new Error('Subscribe transport must be connected before consuming'); + + const selectedProducer = + (input.producerId ? this.registry.getProducer(input.producerId) : null) ?? + this.registry + .listProducers(input.streamSessionId) + .slice() + .reverse() + .find((producer) => producer.kind === 'video'); + + if (!selectedProducer) throw new Error('No producer available for consume'); + if (selectedProducer.streamSessionId !== input.streamSessionId) throw new Error('Producer does not belong to stream'); + + return this.registry.addConsumer({ + consumerId: `cons_${randomUUID()}`, + streamSessionId: input.streamSessionId, + transportId: input.transportId, + viewerDeviceId: input.viewerDeviceId, + producerId: selectedProducer.producerId, + kind: selectedProducer.kind, + rtpParameters: selectedProducer.rtpParameters, + }); + } } diff --git a/Backend/media/sfu/registry.ts b/Backend/media/sfu/registry.ts index 776477c..86a570d 100644 --- a/Backend/media/sfu/registry.ts +++ b/Backend/media/sfu/registry.ts @@ -1,11 +1,24 @@ -import type { SfuSessionDescriptor, SfuSessionState } from './types'; +import type { + SfuConsumerDescriptor, + SfuMediaKind, + SfuProducerDescriptor, + SfuSessionDescriptor, + SfuSessionState, + SfuTransportDescriptor, + SfuTransportDirection, +} from './types'; type StoredSfuSession = SfuSessionDescriptor & { updatedAt: string; }; +const nowIso = (): string => new Date().toISOString(); + export class SfuSessionRegistry { private readonly sessions = new Map(); + private readonly transports = new Map(); + private readonly producers = new Map(); + private readonly consumers = new Map(); get(streamSessionId: string): SfuSessionDescriptor | null { const found = this.sessions.get(streamSessionId); @@ -15,8 +28,7 @@ export class SfuSessionRegistry { } set(session: SfuSessionDescriptor): SfuSessionDescriptor { - const now = new Date().toISOString(); - this.sessions.set(session.streamSessionId, { ...session, updatedAt: now }); + this.sessions.set(session.streamSessionId, { ...session, updatedAt: nowIso() }); return session; } @@ -24,11 +36,7 @@ export class SfuSessionRegistry { const existing = this.sessions.get(streamSessionId); if (!existing) return null; - const next: StoredSfuSession = { - ...existing, - state, - updatedAt: new Date().toISOString(), - }; + const next: StoredSfuSession = { ...existing, state, updatedAt: nowIso() }; this.sessions.set(streamSessionId, next); const { updatedAt: _updatedAt, ...descriptor } = next; return descriptor; @@ -37,5 +45,97 @@ export class SfuSessionRegistry { list(): SfuSessionDescriptor[] { return Array.from(this.sessions.values()).map(({ updatedAt: _updatedAt, ...descriptor }) => descriptor); } + + addTransport(input: { + transportId: string; + streamSessionId: string; + ownerDeviceId: string; + direction: SfuTransportDirection; + }): SfuTransportDescriptor { + const descriptor: SfuTransportDescriptor = { + transportId: input.transportId, + streamSessionId: input.streamSessionId, + ownerDeviceId: input.ownerDeviceId, + direction: input.direction, + state: 'new', + createdAt: nowIso(), + }; + this.transports.set(descriptor.transportId, descriptor); + return descriptor; + } + + getTransport(transportId: string): SfuTransportDescriptor | null { + return this.transports.get(transportId) ?? null; + } + + listTransports(streamSessionId: string): SfuTransportDescriptor[] { + return Array.from(this.transports.values()).filter((transport) => transport.streamSessionId === streamSessionId); + } + + connectTransport(transportId: string): SfuTransportDescriptor | null { + const existing = this.transports.get(transportId); + if (!existing) return null; + const next: SfuTransportDescriptor = { ...existing, state: 'connected' }; + this.transports.set(transportId, next); + return next; + } + + addProducer(input: { + producerId: string; + streamSessionId: string; + transportId: string; + cameraDeviceId: string; + kind: SfuMediaKind; + rtpParameters: Record; + }): SfuProducerDescriptor { + const descriptor: SfuProducerDescriptor = { + producerId: input.producerId, + streamSessionId: input.streamSessionId, + transportId: input.transportId, + cameraDeviceId: input.cameraDeviceId, + kind: input.kind, + rtpParameters: input.rtpParameters, + createdAt: nowIso(), + }; + this.producers.set(descriptor.producerId, descriptor); + return descriptor; + } + + getProducer(producerId: string): SfuProducerDescriptor | null { + return this.producers.get(producerId) ?? null; + } + + listProducers(streamSessionId: string): SfuProducerDescriptor[] { + return Array.from(this.producers.values()) + .filter((producer) => producer.streamSessionId === streamSessionId) + .sort((left, right) => left.createdAt.localeCompare(right.createdAt)); + } + + addConsumer(input: { + consumerId: string; + streamSessionId: string; + transportId: string; + viewerDeviceId: string; + producerId: string; + kind: SfuMediaKind; + rtpParameters: Record; + }): SfuConsumerDescriptor { + const descriptor: SfuConsumerDescriptor = { + consumerId: input.consumerId, + streamSessionId: input.streamSessionId, + transportId: input.transportId, + viewerDeviceId: input.viewerDeviceId, + producerId: input.producerId, + kind: input.kind, + rtpParameters: input.rtpParameters, + createdAt: nowIso(), + }; + this.consumers.set(descriptor.consumerId, descriptor); + return descriptor; + } + + listConsumers(streamSessionId: string): SfuConsumerDescriptor[] { + return Array.from(this.consumers.values()).filter((consumer) => consumer.streamSessionId === streamSessionId); + } } diff --git a/Backend/media/sfu/types.ts b/Backend/media/sfu/types.ts index dfb1986..74fa5f6 100644 --- a/Backend/media/sfu/types.ts +++ b/Backend/media/sfu/types.ts @@ -1,4 +1,13 @@ export type SfuSessionState = 'idle' | 'starting' | 'live' | 'ending' | 'ended'; +export type SfuTransportDirection = 'publish' | 'subscribe'; +export type SfuTransportState = 'new' | 'connected'; +export type SfuMediaKind = 'audio' | 'video'; + +export type SfuIceServer = { + urls: string; + username?: string; + credential?: string; +}; export type SfuSessionDescriptor = { streamSessionId: string; @@ -23,7 +32,7 @@ export type SfuPublishTransportRequest = { export type SfuPublishTransportResult = { transportId: string; - iceServers: Array<{ urls: string; username?: string; credential?: string }>; + iceServers: SfuIceServer[]; }; export type SfuSubscribeTransportRequest = { @@ -33,7 +42,60 @@ export type SfuSubscribeTransportRequest = { export type SfuSubscribeTransportResult = { transportId: string; - iceServers: Array<{ urls: string; username?: string; credential?: string }>; + iceServers: SfuIceServer[]; +}; + +export type SfuTransportDescriptor = { + transportId: string; + streamSessionId: string; + ownerDeviceId: string; + direction: SfuTransportDirection; + state: SfuTransportState; + createdAt: string; +}; + +export type SfuProducerDescriptor = { + producerId: string; + streamSessionId: string; + transportId: string; + cameraDeviceId: string; + kind: SfuMediaKind; + rtpParameters: Record; + createdAt: string; +}; + +export type SfuConsumerDescriptor = { + consumerId: string; + streamSessionId: string; + transportId: string; + viewerDeviceId: string; + producerId: string; + kind: SfuMediaKind; + rtpParameters: Record; + createdAt: string; +}; + +export type SfuConnectTransportInput = { + streamSessionId: string; + transportId: string; + deviceId: string; + dtlsParameters: Record; +}; + +export type SfuProduceInput = { + streamSessionId: string; + transportId: string; + cameraDeviceId: string; + kind: SfuMediaKind; + rtpParameters: Record; +}; + +export type SfuConsumeInput = { + streamSessionId: string; + transportId: string; + viewerDeviceId: string; + producerId?: string; + rtpCapabilities?: Record; }; export interface SfuService { @@ -43,6 +105,13 @@ export interface SfuService { endSession(streamSessionId: string): Promise; getSession(streamSessionId: string): Promise; listSessions(): Promise; + listTransports(streamSessionId: string): Promise; + listProducers(streamSessionId: string): Promise; + listConsumers(streamSessionId: string): Promise; createPublishTransport(input: SfuPublishTransportRequest): Promise; createSubscribeTransport(input: SfuSubscribeTransportRequest): Promise; + connectPublishTransport(input: SfuConnectTransportInput): Promise; + connectSubscribeTransport(input: SfuConnectTransportInput): Promise; + produce(input: SfuProduceInput): Promise; + consume(input: SfuConsumeInput): Promise; } diff --git a/Backend/routes/ops.ts b/Backend/routes/ops.ts index 1320e9b..bdbf7ed 100644 --- a/Backend/routes/ops.ts +++ b/Backend/routes/ops.ts @@ -17,7 +17,18 @@ router.get('/ready', async (_req, res) => { try { await db.execute('select 1'); await minioClient.bucketExists(minioBucket); - const sfuSessions = sfuService ? await sfuService.listSessions() : []; + const sfu = sfuService; + const sfuSessions = sfu ? await sfu.listSessions() : []; + const sfuSessionIds = sfuSessions.map((session) => session.streamSessionId); + const sfuTransports = sfu + ? (await Promise.all(sfuSessionIds.map(async (streamSessionId) => await sfu.listTransports(streamSessionId)))).flat() + : []; + const sfuProducers = sfu + ? (await Promise.all(sfuSessionIds.map(async (streamSessionId) => await sfu.listProducers(streamSessionId)))).flat() + : []; + const sfuConsumers = sfu + ? (await Promise.all(sfuSessionIds.map(async (streamSessionId) => await sfu.listConsumers(streamSessionId)))).flat() + : []; res.json({ status: 'ready', @@ -28,6 +39,9 @@ router.get('/ready', async (_req, res) => { mediaProvider: mediaProvider.name, sfuService: sfuService ? sfuService.mode : 'disabled', sfuActiveSessions: sfuSessions.filter((session) => session.state !== 'ended').length, + sfuTransports: sfuTransports.length, + sfuProducers: sfuProducers.length, + sfuConsumers: sfuConsumers.length, }, timestamp: new Date().toISOString(), }); diff --git a/Backend/routes/streams.ts b/Backend/routes/streams.ts index 17775b1..a196ae2 100644 --- a/Backend/routes/streams.ts +++ b/Backend/routes/streams.ts @@ -40,6 +40,23 @@ const sfuTransportRequestSchema = z.object({ role: z.enum(['camera', 'viewer']).optional(), }); +const sfuTransportConnectSchema = z.object({ + transportId: z.string().min(1), + dtlsParameters: z.record(z.string(), z.unknown()).default({}), +}); + +const sfuProduceSchema = z.object({ + transportId: z.string().min(1), + kind: z.enum(['audio', 'video']).default('video'), + rtpParameters: z.record(z.string(), z.unknown()).default({}), +}); + +const sfuConsumeSchema = z.object({ + transportId: z.string().min(1), + producerId: z.string().min(1).optional(), + rtpCapabilities: z.record(z.string(), z.unknown()).optional(), +}); + const listSchema = z.object({ status: z.string().optional(), limit: z.coerce.number().int().min(1).max(100).default(25), @@ -88,6 +105,14 @@ const getOwnedStreamSession = async (streamSessionId: string, ownerUserId: strin where: and(eq(streamSessions.id, streamSessionId), eq(streamSessions.ownerUserId, ownerUserId)), }); +const ensureSfuEnabled = (res: Parameters[1]) => { + if (!sfuService) { + res.status(409).json({ message: `SFU service disabled (MEDIA_MODE=${mediaMode})` }); + return null; + } + return sfuService; +}; + router.post('/request', requireDeviceAuth, async (req, res) => { const parsed = requestStreamSchema.safeParse(req.body ?? {}); @@ -446,10 +471,8 @@ router.get('/:streamSessionId/sfu/session', requireDeviceAuth, async (req, res) const deviceAuth = ensureDeviceAuth(req, res); if (!deviceAuth) return; - if (!sfuService) { - res.status(409).json({ message: `SFU service disabled (MEDIA_MODE=${mediaMode})` }); - return; - } + const sfu = ensureSfuEnabled(res); + if (!sfu) return; const session = await getOwnedStreamSession(parsedParams.data.streamSessionId, deviceAuth.userId); if (!session) { @@ -463,11 +486,20 @@ router.get('/:streamSessionId/sfu/session', requireDeviceAuth, async (req, res) return; } - const sfuSession = await sfuService.getSession(session.id); + const [sfuSession, transports, producers, consumers] = await Promise.all([ + sfu.getSession(session.id), + sfu.listTransports(session.id), + sfu.listProducers(session.id), + sfu.listConsumers(session.id), + ]); + res.json({ streamSessionId: session.id, mediaMode, sfuSession, + transports, + producers, + consumers, }); }); @@ -487,10 +519,8 @@ router.post('/:streamSessionId/sfu/publish-transport', requireDeviceAuth, async const deviceAuth = ensureDeviceAuth(req, res); if (!deviceAuth) return; - if (!sfuService) { - res.status(409).json({ message: `SFU service disabled (MEDIA_MODE=${mediaMode})` }); - return; - } + const sfu = ensureSfuEnabled(res); + if (!sfu) return; const session = await getOwnedStreamSession(parsedParams.data.streamSessionId, deviceAuth.userId); if (!session) { @@ -508,11 +538,11 @@ router.post('/:streamSessionId/sfu/publish-transport', requireDeviceAuth, async return; } - const transport = await sfuService.createPublishTransport({ + const transport = await sfu.createPublishTransport({ streamSessionId: session.id, cameraDeviceId: deviceAuth.deviceId, }); - await sfuService.setSessionState(session.id, 'live'); + await sfu.setSessionState(session.id, 'live'); res.json({ streamSessionId: session.id, @@ -537,10 +567,8 @@ router.post('/:streamSessionId/sfu/subscribe-transport', requireDeviceAuth, asyn const deviceAuth = ensureDeviceAuth(req, res); if (!deviceAuth) return; - if (!sfuService) { - res.status(409).json({ message: `SFU service disabled (MEDIA_MODE=${mediaMode})` }); - return; - } + const sfu = ensureSfuEnabled(res); + if (!sfu) return; const session = await getOwnedStreamSession(parsedParams.data.streamSessionId, deviceAuth.userId); if (!session) { @@ -559,11 +587,11 @@ router.post('/:streamSessionId/sfu/subscribe-transport', requireDeviceAuth, asyn return; } - const transport = await sfuService.createSubscribeTransport({ + const transport = await sfu.createSubscribeTransport({ streamSessionId: session.id, viewerDeviceId: deviceAuth.deviceId, }); - await sfuService.setSessionState(session.id, 'live'); + await sfu.setSessionState(session.id, 'live'); res.json({ streamSessionId: session.id, @@ -572,6 +600,186 @@ router.post('/:streamSessionId/sfu/subscribe-transport', requireDeviceAuth, asyn }); }); +router.post('/:streamSessionId/sfu/publish-transport/connect', requireDeviceAuth, async (req, res) => { + const parsedParams = streamParamSchema.safeParse(req.params); + if (!parsedParams.success) { + res.status(400).json({ message: 'Invalid streamSessionId', errors: parsedParams.error.flatten() }); + return; + } + + const parsedBody = sfuTransportConnectSchema.safeParse(req.body ?? {}); + if (!parsedBody.success) { + res.status(400).json({ message: 'Invalid request body', errors: parsedBody.error.flatten() }); + return; + } + + const deviceAuth = ensureDeviceAuth(req, res); + if (!deviceAuth) return; + + const sfu = ensureSfuEnabled(res); + if (!sfu) return; + + const session = await getOwnedStreamSession(parsedParams.data.streamSessionId, deviceAuth.userId); + if (!session) { + res.status(404).json({ message: 'Stream session not found' }); + return; + } + + if (session.cameraDeviceId !== deviceAuth.deviceId) { + res.status(403).json({ message: 'Only camera device can connect publish transport' }); + return; + } + + try { + const transport = await sfu.connectPublishTransport({ + streamSessionId: session.id, + transportId: parsedBody.data.transportId, + deviceId: deviceAuth.deviceId, + dtlsParameters: parsedBody.data.dtlsParameters, + }); + await sfu.setSessionState(session.id, 'live'); + res.json({ streamSessionId: session.id, mediaMode, transport }); + } catch (error) { + res.status(409).json({ message: error instanceof Error ? error.message : 'Unable to connect publish transport' }); + } +}); + +router.post('/:streamSessionId/sfu/subscribe-transport/connect', requireDeviceAuth, async (req, res) => { + const parsedParams = streamParamSchema.safeParse(req.params); + if (!parsedParams.success) { + res.status(400).json({ message: 'Invalid streamSessionId', errors: parsedParams.error.flatten() }); + return; + } + + const parsedBody = sfuTransportConnectSchema.safeParse(req.body ?? {}); + if (!parsedBody.success) { + res.status(400).json({ message: 'Invalid request body', errors: parsedBody.error.flatten() }); + return; + } + + const deviceAuth = ensureDeviceAuth(req, res); + if (!deviceAuth) return; + + const sfu = ensureSfuEnabled(res); + if (!sfu) return; + + const session = await getOwnedStreamSession(parsedParams.data.streamSessionId, deviceAuth.userId); + if (!session) { + res.status(404).json({ message: 'Stream session not found' }); + return; + } + + const isParticipant = session.requesterDeviceId === deviceAuth.deviceId || session.cameraDeviceId === deviceAuth.deviceId; + if (!isParticipant) { + res.status(403).json({ message: 'Device cannot connect subscribe transport for this stream' }); + return; + } + + try { + const transport = await sfu.connectSubscribeTransport({ + streamSessionId: session.id, + transportId: parsedBody.data.transportId, + deviceId: deviceAuth.deviceId, + dtlsParameters: parsedBody.data.dtlsParameters, + }); + await sfu.setSessionState(session.id, 'live'); + res.json({ streamSessionId: session.id, mediaMode, transport }); + } catch (error) { + res.status(409).json({ message: error instanceof Error ? error.message : 'Unable to connect subscribe transport' }); + } +}); + +router.post('/:streamSessionId/sfu/produce', requireDeviceAuth, async (req, res) => { + const parsedParams = streamParamSchema.safeParse(req.params); + if (!parsedParams.success) { + res.status(400).json({ message: 'Invalid streamSessionId', errors: parsedParams.error.flatten() }); + return; + } + + const parsedBody = sfuProduceSchema.safeParse(req.body ?? {}); + if (!parsedBody.success) { + res.status(400).json({ message: 'Invalid request body', errors: parsedBody.error.flatten() }); + return; + } + + const deviceAuth = ensureDeviceAuth(req, res); + if (!deviceAuth) return; + + const sfu = ensureSfuEnabled(res); + if (!sfu) return; + + const session = await getOwnedStreamSession(parsedParams.data.streamSessionId, deviceAuth.userId); + if (!session) { + res.status(404).json({ message: 'Stream session not found' }); + return; + } + + if (session.cameraDeviceId !== deviceAuth.deviceId) { + res.status(403).json({ message: 'Only camera device can publish media' }); + return; + } + + try { + const producer = await sfu.produce({ + streamSessionId: session.id, + transportId: parsedBody.data.transportId, + cameraDeviceId: deviceAuth.deviceId, + kind: parsedBody.data.kind, + rtpParameters: parsedBody.data.rtpParameters, + }); + await sfu.setSessionState(session.id, 'live'); + res.json({ streamSessionId: session.id, mediaMode, producer }); + } catch (error) { + res.status(409).json({ message: error instanceof Error ? error.message : 'Unable to produce media' }); + } +}); + +router.post('/:streamSessionId/sfu/consume', requireDeviceAuth, async (req, res) => { + const parsedParams = streamParamSchema.safeParse(req.params); + if (!parsedParams.success) { + res.status(400).json({ message: 'Invalid streamSessionId', errors: parsedParams.error.flatten() }); + return; + } + + const parsedBody = sfuConsumeSchema.safeParse(req.body ?? {}); + if (!parsedBody.success) { + res.status(400).json({ message: 'Invalid request body', errors: parsedBody.error.flatten() }); + return; + } + + const deviceAuth = ensureDeviceAuth(req, res); + if (!deviceAuth) return; + + const sfu = ensureSfuEnabled(res); + if (!sfu) return; + + const session = await getOwnedStreamSession(parsedParams.data.streamSessionId, deviceAuth.userId); + if (!session) { + res.status(404).json({ message: 'Stream session not found' }); + return; + } + + const isParticipant = session.requesterDeviceId === deviceAuth.deviceId || session.cameraDeviceId === deviceAuth.deviceId; + if (!isParticipant) { + res.status(403).json({ message: 'Device cannot consume media for this stream' }); + return; + } + + try { + const consumer = await sfu.consume({ + streamSessionId: session.id, + transportId: parsedBody.data.transportId, + viewerDeviceId: deviceAuth.deviceId, + producerId: parsedBody.data.producerId, + rtpCapabilities: parsedBody.data.rtpCapabilities, + }); + await sfu.setSessionState(session.id, 'live'); + res.json({ streamSessionId: session.id, mediaMode, consumer }); + } catch (error) { + res.status(409).json({ message: error instanceof Error ? error.message : 'Unable to consume media' }); + } +}); + router.post('/:streamSessionId/end', requireDeviceAuth, async (req, res) => { const parsedParams = streamParamSchema.safeParse(req.params);