import { randomUUID } from 'crypto';
import { networkInterfaces } from 'os';

import { mediaConfig } from '../config';
import type {
  SfuConnectTransportInput,
  SfuConsumeInput,
  SfuConsumerDescriptor,
  SfuProduceInput,
  SfuProducerDescriptor,
  SfuPublishTransportRequest,
  SfuPublishTransportResult,
  SfuService,
  SfuSessionDescriptor,
  SfuSessionStartInput,
  SfuSubscribeTransportRequest,
  SfuSubscribeTransportResult,
  SfuTransportDescriptor,
  SfuTransportOptions,
} from './types';

/**
 * The part of the dynamically imported `mediasoup` module that this file calls.
 *
 * The package is loaded at runtime (see `getMediasoup`), so no static typings
 * are available here and mediasoup handles (worker, router, transport,
 * producer, consumer) are deliberately typed as `any`.
 */
type DynamicMediasoup = {
  createWorker: (opts?: Record<string, unknown>) => Promise<any>;
};

/**
 * Everything held in memory for one stream session: the public session
 * descriptor, the router created for it, and — keyed by id — both the live
 * mediasoup handles and the serialisable descriptors reported to callers.
 */
type MediaSessionRuntime = {
  session: SfuSessionDescriptor;
  router: any;
  transports: Map<string, any>;
  transportDescriptors: Map<string, SfuTransportDescriptor>;
  producers: Map<string, any>;
  producerDescriptors: Map<string, SfuProducerDescriptor>;
  consumers: Map<string, any>;
  consumerDescriptors: Map<string, SfuConsumerDescriptor>;
};

const MIN_PORT = 1;
const MAX_PORT = 65535;

/**
 * Parses a port number taken from an environment variable.
 *
 * @param value - Raw value; may be undefined, empty or malformed.
 * @param fallback - Returned whenever `value` is not a usable port.
 * @returns `value` as an integer in 1..65535, otherwise `fallback`.
 */
const parsePort = (value: string | undefined, fallback: number): number => {
  const trimmed = value?.trim();
  // `Number('')` is 0, so blank input has to be rejected before converting.
  if (!trimmed) return fallback;

  const parsed = Number(trimmed);
  if (!Number.isInteger(parsed) || parsed < MIN_PORT || parsed > MAX_PORT) {
    return fallback;
  }
  return parsed;
};

/**
 * Returns the first non-internal IPv4 address reported by
 * `os.networkInterfaces()`, or `null` when there is none.
 */
const pickHostIpv4 = (): string | null => {
  for (const addresses of Object.values(networkInterfaces())) {
    if (!addresses) continue;
    const external = addresses.find(
      (address) => address.family === 'IPv4' && !address.internal,
    );
    if (external) return external.address;
  }
  return null;
};

/** Ensures the wildcard-listen-IP warning is logged once, not once per transport. */
let warnedAboutWildcardListenIp = false;

/**
 * Decides which IP WebRTC transports listen on and, optionally, which address
 * is announced in ICE candidates.
 *
 * - A concrete `MEDIA_WEBRTC_LISTEN_IP` (anything except `0.0.0.0`) is used as is.
 * - Otherwise the first external IPv4 address of the host is used, falling
 *   back to `127.0.0.1`.
 * - `MEDIA_WEBRTC_ANNOUNCED_IP`, when set, is passed through as `announcedAddress`.
 */
const resolveListenAddress = (): { ip: string; announcedAddress?: string } => {
  const configuredListenIp = (process.env.MEDIA_WEBRTC_LISTEN_IP ?? '').trim();
  const configuredAnnounced = process.env.MEDIA_WEBRTC_ANNOUNCED_IP?.trim();

  const withAnnounced = (ip: string): { ip: string; announcedAddress?: string } => ({
    ip,
    ...(configuredAnnounced ? { announcedAddress: configuredAnnounced } : {}),
  });

  if (configuredListenIp && configuredListenIp !== '0.0.0.0') {
    return withAnnounced(configuredListenIp);
  }

  const discoveredIp = pickHostIpv4();
  if (!configuredAnnounced && configuredListenIp === '0.0.0.0' && !warnedAboutWildcardListenIp) {
    warnedAboutWildcardListenIp = true;
    console.warn(
      `[sfu] MEDIA_WEBRTC_LISTEN_IP is 0.0.0.0 without MEDIA_WEBRTC_ANNOUNCED_IP. ` +
        `Using ${discoveredIp ?? '127.0.0.1'} for ICE candidates. Configure both env vars for production.`,
    );
  }

  return withAnnounced(discoveredIp ?? '127.0.0.1');
};

/**
 * Copies the connection parameters of a transport handle into the shape sent
 * to clients. Missing fields default to empty values; `sctpParameters` is only
 * included when the transport has them.
 */
const toTransportOptions = (transport: any): SfuTransportOptions => ({
  id: transport.id,
  iceParameters: transport.iceParameters ?? {},
  iceCandidates: transport.iceCandidates ?? [],
  dtlsParameters: transport.dtlsParameters ?? {},
  ...(transport.sctpParameters ? { sctpParameters: transport.sctpParameters } : {}),
});

/** Codecs passed as `mediaCodecs` to every router created by this service. */
const mediasoupCodecs = [
  {
    kind: 'audio',
    mimeType: 'audio/opus',
    clockRate: 48000,
    channels: 2,
  },
  {
    kind: 'video',
    mimeType: 'video/VP8',
    clockRate: 90000,
    parameters: {},
  },
];

/**
 * `SfuService` implementation backed by one lazily created mediasoup worker.
 *
 * Each stream session gets its own router. All state lives in process memory
 * in `sessions`; nothing is persisted.
 */
export class MediasoupSfuService implements SfuService {
  mode: 'single_server_sfu' = 'single_server_sfu';

  private mediasoupPromise: Promise<DynamicMediasoup> | null = null;
  private workerPromise: Promise<any> | null = null;
  private readonly sessions = new Map<string, MediaSessionRuntime>();

  /**
   * Loads the `mediasoup` package on first use and caches the import promise.
   *
   * The import is issued through `new Function` — presumably so the compiler
   * or bundler does not rewrite it into `require()`; confirm before changing.
   *
   * @throws Error when the package cannot be loaded.
   */
  private async getMediasoup(): Promise<DynamicMediasoup> {
    if (!this.mediasoupPromise) {
      this.mediasoupPromise = (
        new Function('return import("mediasoup")')() as Promise<DynamicMediasoup>
      ).catch((error) => {
        throw new Error(
          `mediasoup package is required for MEDIA_SFU_ENGINE=mediasoup: ${error instanceof Error ? error.message : 'load failed'}`,
        );
      });
    }
    return await this.mediasoupPromise;
  }

  /**
   * Returns the shared worker, creating it on first use.
   *
   * The cached handle is dropped when the worker emits `died` or when its
   * creation fails, so that a later call starts a fresh worker instead of
   * receiving the same failure forever.
   */
  private async getWorker(): Promise<any> {
    if (!this.workerPromise) {
      const pending: Promise<any> = (async () => {
        const mediasoup = await this.getMediasoup();
        const worker = await mediasoup.createWorker({
          logLevel: (process.env.MEDIA_SFU_LOG_LEVEL ?? 'warn') as 'debug' | 'warn' | 'error' | 'none',
          rtcMinPort: parsePort(process.env.MEDIA_RTC_MIN_PORT, 40000),
          rtcMaxPort: parsePort(process.env.MEDIA_RTC_MAX_PORT, 49999),
        });
        worker.on?.('died', () => {
          console.error('mediasoup worker died; clearing worker handle');
          // Only clear the handle if it still belongs to this worker.
          if (this.workerPromise === pending) this.workerPromise = null;
        });
        return worker;
      })();

      this.workerPromise = pending;
      // Forget a failed creation so the next call retries. The rejection itself
      // still reaches the caller through the `await` below.
      void pending.catch(() => {
        if (this.workerPromise === pending) this.workerPromise = null;
      });
    }
    return await this.workerPromise;
  }

  /**
   * Looks up the runtime of a session.
   *
   * @throws Error when no session with that id has been started.
   */
  private getRuntime(streamSessionId: string): MediaSessionRuntime {
    const runtime = this.sessions.get(streamSessionId);
    if (!runtime) {
      throw new Error('SFU session not initialized');
    }
    return runtime;
  }

  /** Creates a WebRTC transport on `router` that listens on UDP and TCP, preferring UDP. */
  private async createWebRtcTransport(router: any): Promise<any> {
    const listenAddress = resolveListenAddress();
    const listenInfoFor = (protocol: 'udp' | 'tcp') => ({
      protocol,
      ip: listenAddress.ip,
      ...(listenAddress.announcedAddress ? { announcedAddress: listenAddress.announcedAddress } : {}),
    });

    return await router.createWebRtcTransport({
      listenInfos: [listenInfoFor('udp'), listenInfoFor('tcp')],
      enableUdp: true,
      enableTcp: true,
      preferUdp: true,
    });
  }

  /**
   * Creates a transport for a session, records it together with a descriptor
   * in state `new`, and returns the data a client needs to connect to it.
   *
   * @throws Error when the session does not exist.
   */
  private async openTransport(
    streamSessionId: string,
    ownerDeviceId: string,
    direction: SfuTransportDescriptor['direction'],
  ) {
    const runtime = this.getRuntime(streamSessionId);
    const transport = await this.createWebRtcTransport(runtime.router);

    const descriptor: SfuTransportDescriptor = {
      transportId: transport.id,
      streamSessionId,
      ownerDeviceId,
      direction,
      state: 'new',
      createdAt: new Date().toISOString(),
    };
    runtime.transports.set(transport.id, transport);
    runtime.transportDescriptors.set(transport.id, descriptor);

    return {
      transportId: transport.id,
      iceServers: mediaConfig.turn.urls.map((urls) => ({
        urls,
        ...(mediaConfig.turn.username ? { username: mediaConfig.turn.username } : {}),
        ...(mediaConfig.turn.credential ? { credential: mediaConfig.turn.credential } : {}),
      })),
      transportOptions: toTransportOptions(transport),
    };
  }

  /**
   * Resolves a transport and checks, in this order, that it exists, has the
   * expected direction and is owned by `deviceId`.
   *
   * @throws Error when the session is missing or any of the checks fails.
   */
  private getOwnedTransport(
    streamSessionId: string,
    transportId: string,
    deviceId: string,
    direction: SfuTransportDescriptor['direction'],
  ): { runtime: MediaSessionRuntime; transport: any; descriptor: SfuTransportDescriptor } {
    const runtime = this.getRuntime(streamSessionId);
    const transport = runtime.transports.get(transportId);
    const descriptor = runtime.transportDescriptors.get(transportId);
    const label = direction === 'publish' ? 'Publish' : 'Subscribe';

    if (!transport || !descriptor) throw new Error(`${label} transport not found`);
    if (descriptor.direction !== direction) throw new Error(`Transport is not a ${direction} transport`);
    if (descriptor.ownerDeviceId !== deviceId) throw new Error(`Device does not own this ${direction} transport`);

    return { runtime, transport, descriptor };
  }

  /** Connects a validated transport with the client's DTLS parameters and marks it `connected`. */
  private async connectTransport(
    input: SfuConnectTransportInput,
    direction: SfuTransportDescriptor['direction'],
  ): Promise<SfuTransportDescriptor> {
    const { runtime, transport, descriptor } = this.getOwnedTransport(
      input.streamSessionId,
      input.transportId,
      input.deviceId,
      direction,
    );

    await transport.connect({ dtlsParameters: input.dtlsParameters });

    const next = { ...descriptor, state: 'connected' as const };
    runtime.transportDescriptors.set(descriptor.transportId, next);
    return next;
  }

  /**
   * Starts a session and creates its router. Calling it again with an id that
   * is already registered returns the existing descriptor unchanged.
   */
  async startSession(input: SfuSessionStartInput): Promise<SfuSessionDescriptor> {
    const existing = this.sessions.get(input.streamSessionId);
    if (existing) {
      return existing.session;
    }

    const worker = await this.getWorker();
    const router = await worker.createRouter({ mediaCodecs: mediasoupCodecs });

    // Another call for the same id may have registered the session while this
    // one was awaiting. Keep the registered one and release the surplus router.
    const raced = this.sessions.get(input.streamSessionId);
    if (raced) {
      router.close?.();
      return raced.session;
    }

    const session: SfuSessionDescriptor = {
      streamSessionId: input.streamSessionId,
      ownerUserId: input.ownerUserId,
      cameraDeviceId: input.cameraDeviceId,
      requesterDeviceId: input.requesterDeviceId,
      state: 'starting',
      createdAt: new Date().toISOString(),
    };
    this.sessions.set(input.streamSessionId, {
      session,
      router,
      transports: new Map(),
      transportDescriptors: new Map(),
      producers: new Map(),
      producerDescriptors: new Map(),
      consumers: new Map(),
      consumerDescriptors: new Map(),
    });
    return session;
  }

  /**
   * Replaces the session descriptor with a copy carrying the new state.
   *
   * @throws Error when the session does not exist.
   */
  async setSessionState(streamSessionId: string, state: SfuSessionDescriptor['state']): Promise<void> {
    const runtime = this.getRuntime(streamSessionId);
    runtime.session = { ...runtime.session, state };
  }

  /**
   * Closes the session's consumers, producers, transports and router (in that
   * order) and removes the session. Does nothing for an unknown id.
   */
  async endSession(streamSessionId: string): Promise<void> {
    const runtime = this.sessions.get(streamSessionId);
    if (!runtime) return;

    runtime.session = { ...runtime.session, state: 'ending' };
    for (const consumer of runtime.consumers.values()) {
      consumer.close?.();
    }
    for (const producer of runtime.producers.values()) {
      producer.close?.();
    }
    for (const transport of runtime.transports.values()) {
      transport.close?.();
    }
    runtime.router.close?.();
    runtime.session = { ...runtime.session, state: 'ended' };
    this.sessions.delete(streamSessionId);
  }

  /** Returns the session descriptor, or `null` for an unknown id. */
  async getSession(streamSessionId: string): Promise<SfuSessionDescriptor | null> {
    return this.sessions.get(streamSessionId)?.session ?? null;
  }

  /** Returns the router's `rtpCapabilities`, or `null` when the session or the value is missing. */
  async getRouterRtpCapabilities(streamSessionId: string): Promise<Record<string, unknown> | null> {
    const runtime = this.sessions.get(streamSessionId);
    if (!runtime) return null;
    return runtime.router.rtpCapabilities ?? null;
  }

  /** Lists the descriptors of all sessions currently held in memory. */
  async listSessions(): Promise<SfuSessionDescriptor[]> {
    return Array.from(this.sessions.values()).map((runtime) => runtime.session);
  }

  /** Lists the transport descriptors of a session; empty for an unknown id. */
  async listTransports(streamSessionId: string): Promise<SfuTransportDescriptor[]> {
    const runtime = this.sessions.get(streamSessionId);
    if (!runtime) return [];
    return Array.from(runtime.transportDescriptors.values());
  }

  /** Lists the producer descriptors of a session; empty for an unknown id. */
  async listProducers(streamSessionId: string): Promise<SfuProducerDescriptor[]> {
    const runtime = this.sessions.get(streamSessionId);
    if (!runtime) return [];
    return Array.from(runtime.producerDescriptors.values());
  }

  /** Lists the consumer descriptors of a session; empty for an unknown id. */
  async listConsumers(streamSessionId: string): Promise<SfuConsumerDescriptor[]> {
    const runtime = this.sessions.get(streamSessionId);
    if (!runtime) return [];
    return Array.from(runtime.consumerDescriptors.values());
  }

  /**
   * Creates the transport a camera device publishes on.
   *
   * @throws Error when the session does not exist.
   */
  async createPublishTransport(input: SfuPublishTransportRequest): Promise<SfuPublishTransportResult> {
    return await this.openTransport(input.streamSessionId, input.cameraDeviceId, 'publish');
  }

  /**
   * Creates the transport a viewer device subscribes on.
   *
   * @throws Error when the session does not exist.
   */
  async createSubscribeTransport(input: SfuSubscribeTransportRequest): Promise<SfuSubscribeTransportResult> {
    return await this.openTransport(input.streamSessionId, input.viewerDeviceId, 'subscribe');
  }

  /**
   * Connects a publish transport.
   *
   * @throws Error when the transport is unknown, is not a publish transport,
   *   or is not owned by `input.deviceId`.
   */
  async connectPublishTransport(input: SfuConnectTransportInput): Promise<SfuTransportDescriptor> {
    return await this.connectTransport(input, 'publish');
  }

  /**
   * Connects a subscribe transport.
   *
   * @throws Error when the transport is unknown, is not a subscribe transport,
   *   or is not owned by `input.deviceId`.
   */
  async connectSubscribeTransport(input: SfuConnectTransportInput): Promise<SfuTransportDescriptor> {
    return await this.connectTransport(input, 'subscribe');
  }

  /**
   * Creates a producer on a connected publish transport and records it.
   *
   * @throws Error when the transport fails validation or is not yet connected.
   */
  async produce(input: SfuProduceInput): Promise<SfuProducerDescriptor> {
    const { runtime, transport, descriptor } = this.getOwnedTransport(
      input.streamSessionId,
      input.transportId,
      input.cameraDeviceId,
      'publish',
    );
    if (descriptor.state !== 'connected') {
      throw new Error('Publish transport must be connected before producing');
    }

    const producer = await transport.produce({
      kind: input.kind,
      rtpParameters: input.rtpParameters,
      appData: { cameraDeviceId: input.cameraDeviceId },
    });

    // Fall back to a generated id if the handle does not expose one.
    const producerId = producer.id ?? `prod_${randomUUID()}`;
    const producerDescriptor: SfuProducerDescriptor = {
      producerId,
      streamSessionId: input.streamSessionId,
      transportId: input.transportId,
      cameraDeviceId: input.cameraDeviceId,
      kind: input.kind,
      rtpParameters: input.rtpParameters,
      createdAt: new Date().toISOString(),
    };
    runtime.producers.set(producerId, producer);
    runtime.producerDescriptors.set(producerId, producerDescriptor);
    return producerDescriptor;
  }

  /**
   * Creates a consumer on a connected subscribe transport and records it.
   *
   * When `input.producerId` is not given, the most recently registered video
   * producer of the session is consumed.
   *
   * @throws Error when the transport fails validation or is not connected,
   *   when no producer can be resolved, or when the router reports that it
   *   cannot consume with the supplied RTP capabilities.
   */
  async consume(input: SfuConsumeInput): Promise<SfuConsumerDescriptor> {
    const { runtime, transport, descriptor } = this.getOwnedTransport(
      input.streamSessionId,
      input.transportId,
      input.viewerDeviceId,
      'subscribe',
    );
    if (descriptor.state !== 'connected') {
      throw new Error('Subscribe transport must be connected before consuming');
    }

    // `Array.from` already yields a fresh array, so reversing it in place is safe.
    const producerId =
      input.producerId ??
      Array.from(runtime.producerDescriptors.values())
        .reverse()
        .find((candidate) => candidate.kind === 'video')?.producerId;
    if (!producerId) throw new Error('No producer available for consume');

    const producer = runtime.producers.get(producerId);
    const producerDescriptor = runtime.producerDescriptors.get(producerId);
    if (!producer || !producerDescriptor) throw new Error('Producer not found');

    const routerProducerId = producer.id ?? producerId;
    const rtpCapabilities = input.rtpCapabilities ?? {};
    if (!runtime.router.canConsume({ producerId: routerProducerId, rtpCapabilities })) {
      throw new Error('Router cannot consume with provided RTP capabilities');
    }

    const consumer = await transport.consume({
      producerId: routerProducerId,
      rtpCapabilities,
      paused: false,
      appData: { viewerDeviceId: input.viewerDeviceId },
    });

    // Fall back to a generated id if the handle does not expose one.
    const consumerId = consumer.id ?? `cons_${randomUUID()}`;
    const consumerDescriptor: SfuConsumerDescriptor = {
      consumerId,
      streamSessionId: input.streamSessionId,
      transportId: input.transportId,
      viewerDeviceId: input.viewerDeviceId,
      producerId,
      kind: producerDescriptor.kind,
      rtpParameters: consumer.rtpParameters ?? producerDescriptor.rtpParameters,
      createdAt: new Date().toISOString(),
    };
    runtime.consumers.set(consumerId, consumer);
    runtime.consumerDescriptors.set(consumerId, consumerDescriptor);
    return consumerDescriptor;
  }
}