368 lines
14 KiB
TypeScript
368 lines
14 KiB
TypeScript
import { randomUUID } from 'crypto';
|
|
|
|
import { mediaConfig } from '../config';
|
|
import type {
|
|
SfuConnectTransportInput,
|
|
SfuConsumeInput,
|
|
SfuConsumerDescriptor,
|
|
SfuProduceInput,
|
|
SfuProducerDescriptor,
|
|
SfuPublishTransportRequest,
|
|
SfuPublishTransportResult,
|
|
SfuService,
|
|
SfuSessionDescriptor,
|
|
SfuSessionStartInput,
|
|
SfuSubscribeTransportRequest,
|
|
SfuSubscribeTransportResult,
|
|
SfuTransportDescriptor,
|
|
SfuTransportOptions,
|
|
} from './types';
|
|
|
|
type DynamicMediasoup = {
|
|
createWorker: (opts?: Record<string, unknown>) => Promise<any>;
|
|
};
|
|
|
|
type MediaSessionRuntime = {
|
|
session: SfuSessionDescriptor;
|
|
router: any;
|
|
transports: Map<string, any>;
|
|
transportDescriptors: Map<string, SfuTransportDescriptor>;
|
|
producers: Map<string, any>;
|
|
producerDescriptors: Map<string, SfuProducerDescriptor>;
|
|
consumers: Map<string, any>;
|
|
consumerDescriptors: Map<string, SfuConsumerDescriptor>;
|
|
};
|
|
|
|
const parsePort = (value: string | undefined, fallback: number): number => {
|
|
const parsed = Number(value);
|
|
if (!Number.isFinite(parsed)) return fallback;
|
|
return parsed;
|
|
};
|
|
|
|
const toTransportOptions = (transport: any): SfuTransportOptions => ({
|
|
id: transport.id,
|
|
iceParameters: transport.iceParameters ?? {},
|
|
iceCandidates: transport.iceCandidates ?? [],
|
|
dtlsParameters: transport.dtlsParameters ?? {},
|
|
...(transport.sctpParameters ? { sctpParameters: transport.sctpParameters } : {}),
|
|
});
|
|
|
|
const mediasoupCodecs = [
|
|
{
|
|
kind: 'audio',
|
|
mimeType: 'audio/opus',
|
|
clockRate: 48000,
|
|
channels: 2,
|
|
},
|
|
{
|
|
kind: 'video',
|
|
mimeType: 'video/VP8',
|
|
clockRate: 90000,
|
|
parameters: {},
|
|
},
|
|
];
|
|
|
|
export class MediasoupSfuService implements SfuService {
|
|
mode: 'single_server_sfu' = 'single_server_sfu';
|
|
private mediasoupPromise: Promise<DynamicMediasoup> | null = null;
|
|
private workerPromise: Promise<any> | null = null;
|
|
private readonly sessions = new Map<string, MediaSessionRuntime>();
|
|
|
|
private async getMediasoup(): Promise<DynamicMediasoup> {
|
|
if (!this.mediasoupPromise) {
|
|
this.mediasoupPromise = (new Function('return import("mediasoup")')() as Promise<DynamicMediasoup>).catch((error) => {
|
|
throw new Error(`mediasoup package is required for MEDIA_SFU_ENGINE=mediasoup: ${error instanceof Error ? error.message : 'load failed'}`);
|
|
});
|
|
}
|
|
return await this.mediasoupPromise;
|
|
}
|
|
|
|
private async getWorker(): Promise<any> {
|
|
if (!this.workerPromise) {
|
|
this.workerPromise = (async () => {
|
|
const mediasoup = await this.getMediasoup();
|
|
const worker = await mediasoup.createWorker({
|
|
logLevel: (process.env.MEDIA_SFU_LOG_LEVEL ?? 'warn') as 'debug' | 'warn' | 'error' | 'none',
|
|
rtcMinPort: parsePort(process.env.MEDIA_RTC_MIN_PORT, 40000),
|
|
rtcMaxPort: parsePort(process.env.MEDIA_RTC_MAX_PORT, 49999),
|
|
});
|
|
worker.on?.('died', () => {
|
|
console.error('mediasoup worker died; clearing worker handle');
|
|
this.workerPromise = null;
|
|
});
|
|
return worker;
|
|
})();
|
|
}
|
|
return await this.workerPromise;
|
|
}
|
|
|
|
private getRuntime(streamSessionId: string): MediaSessionRuntime {
|
|
const runtime = this.sessions.get(streamSessionId);
|
|
if (!runtime) {
|
|
throw new Error('SFU session not initialized');
|
|
}
|
|
return runtime;
|
|
}
|
|
|
|
private async createWebRtcTransport(router: any): Promise<any> {
|
|
const listenIp = process.env.MEDIA_WEBRTC_LISTEN_IP ?? '0.0.0.0';
|
|
const announcedAddress = process.env.MEDIA_WEBRTC_ANNOUNCED_IP?.trim();
|
|
return await router.createWebRtcTransport({
|
|
listenInfos: [
|
|
{
|
|
protocol: 'udp',
|
|
ip: listenIp,
|
|
...(announcedAddress ? { announcedAddress } : {}),
|
|
},
|
|
{
|
|
protocol: 'tcp',
|
|
ip: listenIp,
|
|
...(announcedAddress ? { announcedAddress } : {}),
|
|
},
|
|
],
|
|
enableUdp: true,
|
|
enableTcp: true,
|
|
preferUdp: true,
|
|
});
|
|
}
|
|
|
|
async startSession(input: SfuSessionStartInput): Promise<SfuSessionDescriptor> {
|
|
const existing = this.sessions.get(input.streamSessionId);
|
|
if (existing) {
|
|
return existing.session;
|
|
}
|
|
|
|
const worker = await this.getWorker();
|
|
const router = await worker.createRouter({ mediaCodecs: mediasoupCodecs });
|
|
const session: SfuSessionDescriptor = {
|
|
streamSessionId: input.streamSessionId,
|
|
ownerUserId: input.ownerUserId,
|
|
cameraDeviceId: input.cameraDeviceId,
|
|
requesterDeviceId: input.requesterDeviceId,
|
|
state: 'starting',
|
|
createdAt: new Date().toISOString(),
|
|
};
|
|
|
|
this.sessions.set(input.streamSessionId, {
|
|
session,
|
|
router,
|
|
transports: new Map(),
|
|
transportDescriptors: new Map(),
|
|
producers: new Map(),
|
|
producerDescriptors: new Map(),
|
|
consumers: new Map(),
|
|
consumerDescriptors: new Map(),
|
|
});
|
|
return session;
|
|
}
|
|
|
|
async setSessionState(streamSessionId: string, state: SfuSessionDescriptor['state']): Promise<void> {
|
|
const runtime = this.getRuntime(streamSessionId);
|
|
runtime.session = { ...runtime.session, state };
|
|
}
|
|
|
|
async endSession(streamSessionId: string): Promise<void> {
|
|
const runtime = this.sessions.get(streamSessionId);
|
|
if (!runtime) return;
|
|
|
|
runtime.session = { ...runtime.session, state: 'ending' };
|
|
for (const consumer of runtime.consumers.values()) {
|
|
consumer.close?.();
|
|
}
|
|
for (const producer of runtime.producers.values()) {
|
|
producer.close?.();
|
|
}
|
|
for (const transport of runtime.transports.values()) {
|
|
transport.close?.();
|
|
}
|
|
runtime.router.close?.();
|
|
runtime.session = { ...runtime.session, state: 'ended' };
|
|
this.sessions.delete(streamSessionId);
|
|
}
|
|
|
|
async getSession(streamSessionId: string): Promise<SfuSessionDescriptor | null> {
|
|
return this.sessions.get(streamSessionId)?.session ?? null;
|
|
}
|
|
|
|
async getRouterRtpCapabilities(streamSessionId: string): Promise<Record<string, unknown> | null> {
|
|
const runtime = this.sessions.get(streamSessionId);
|
|
if (!runtime) return null;
|
|
return runtime.router.rtpCapabilities ?? null;
|
|
}
|
|
|
|
async listSessions(): Promise<SfuSessionDescriptor[]> {
|
|
return Array.from(this.sessions.values()).map((runtime) => runtime.session);
|
|
}
|
|
|
|
async listTransports(streamSessionId: string): Promise<SfuTransportDescriptor[]> {
|
|
const runtime = this.sessions.get(streamSessionId);
|
|
if (!runtime) return [];
|
|
return Array.from(runtime.transportDescriptors.values());
|
|
}
|
|
|
|
async listProducers(streamSessionId: string): Promise<SfuProducerDescriptor[]> {
|
|
const runtime = this.sessions.get(streamSessionId);
|
|
if (!runtime) return [];
|
|
return Array.from(runtime.producerDescriptors.values());
|
|
}
|
|
|
|
async listConsumers(streamSessionId: string): Promise<SfuConsumerDescriptor[]> {
|
|
const runtime = this.sessions.get(streamSessionId);
|
|
if (!runtime) return [];
|
|
return Array.from(runtime.consumerDescriptors.values());
|
|
}
|
|
|
|
async createPublishTransport(input: SfuPublishTransportRequest): Promise<SfuPublishTransportResult> {
|
|
const runtime = this.getRuntime(input.streamSessionId);
|
|
const transport = await this.createWebRtcTransport(runtime.router);
|
|
const descriptor: SfuTransportDescriptor = {
|
|
transportId: transport.id,
|
|
streamSessionId: input.streamSessionId,
|
|
ownerDeviceId: input.cameraDeviceId,
|
|
direction: 'publish',
|
|
state: 'new',
|
|
createdAt: new Date().toISOString(),
|
|
};
|
|
runtime.transports.set(transport.id, transport);
|
|
runtime.transportDescriptors.set(transport.id, descriptor);
|
|
return {
|
|
transportId: transport.id,
|
|
iceServers: mediaConfig.turn.urls.map((urls) => ({
|
|
urls,
|
|
...(mediaConfig.turn.username ? { username: mediaConfig.turn.username } : {}),
|
|
...(mediaConfig.turn.credential ? { credential: mediaConfig.turn.credential } : {}),
|
|
})),
|
|
transportOptions: toTransportOptions(transport),
|
|
};
|
|
}
|
|
|
|
async createSubscribeTransport(input: SfuSubscribeTransportRequest): Promise<SfuSubscribeTransportResult> {
|
|
const runtime = this.getRuntime(input.streamSessionId);
|
|
const transport = await this.createWebRtcTransport(runtime.router);
|
|
const descriptor: SfuTransportDescriptor = {
|
|
transportId: transport.id,
|
|
streamSessionId: input.streamSessionId,
|
|
ownerDeviceId: input.viewerDeviceId,
|
|
direction: 'subscribe',
|
|
state: 'new',
|
|
createdAt: new Date().toISOString(),
|
|
};
|
|
runtime.transports.set(transport.id, transport);
|
|
runtime.transportDescriptors.set(transport.id, descriptor);
|
|
return {
|
|
transportId: transport.id,
|
|
iceServers: mediaConfig.turn.urls.map((urls) => ({
|
|
urls,
|
|
...(mediaConfig.turn.username ? { username: mediaConfig.turn.username } : {}),
|
|
...(mediaConfig.turn.credential ? { credential: mediaConfig.turn.credential } : {}),
|
|
})),
|
|
transportOptions: toTransportOptions(transport),
|
|
};
|
|
}
|
|
|
|
async connectPublishTransport(input: SfuConnectTransportInput): Promise<SfuTransportDescriptor> {
|
|
const runtime = this.getRuntime(input.streamSessionId);
|
|
const transport = runtime.transports.get(input.transportId);
|
|
const descriptor = runtime.transportDescriptors.get(input.transportId);
|
|
if (!transport || !descriptor) throw new Error('Publish transport not found');
|
|
if (descriptor.direction !== 'publish') throw new Error('Transport is not a publish transport');
|
|
if (descriptor.ownerDeviceId !== input.deviceId) throw new Error('Device does not own this publish transport');
|
|
|
|
await transport.connect({ dtlsParameters: input.dtlsParameters });
|
|
const next = { ...descriptor, state: 'connected' as const };
|
|
runtime.transportDescriptors.set(descriptor.transportId, next);
|
|
return next;
|
|
}
|
|
|
|
async connectSubscribeTransport(input: SfuConnectTransportInput): Promise<SfuTransportDescriptor> {
|
|
const runtime = this.getRuntime(input.streamSessionId);
|
|
const transport = runtime.transports.get(input.transportId);
|
|
const descriptor = runtime.transportDescriptors.get(input.transportId);
|
|
if (!transport || !descriptor) throw new Error('Subscribe transport not found');
|
|
if (descriptor.direction !== 'subscribe') throw new Error('Transport is not a subscribe transport');
|
|
if (descriptor.ownerDeviceId !== input.deviceId) throw new Error('Device does not own this subscribe transport');
|
|
|
|
await transport.connect({ dtlsParameters: input.dtlsParameters });
|
|
const next = { ...descriptor, state: 'connected' as const };
|
|
runtime.transportDescriptors.set(descriptor.transportId, next);
|
|
return next;
|
|
}
|
|
|
|
async produce(input: SfuProduceInput): Promise<SfuProducerDescriptor> {
|
|
const runtime = this.getRuntime(input.streamSessionId);
|
|
const transport = runtime.transports.get(input.transportId);
|
|
const descriptor = runtime.transportDescriptors.get(input.transportId);
|
|
if (!transport || !descriptor) throw new Error('Publish transport not found');
|
|
if (descriptor.direction !== 'publish') throw new Error('Transport is not a publish transport');
|
|
if (descriptor.ownerDeviceId !== input.cameraDeviceId) throw new Error('Device does not own this publish transport');
|
|
if (descriptor.state !== 'connected') throw new Error('Publish transport must be connected before producing');
|
|
|
|
const producer = await transport.produce({
|
|
kind: input.kind,
|
|
rtpParameters: input.rtpParameters,
|
|
appData: { cameraDeviceId: input.cameraDeviceId },
|
|
});
|
|
const producerId = producer.id ?? `prod_${randomUUID()}`;
|
|
const producerDescriptor: SfuProducerDescriptor = {
|
|
producerId,
|
|
streamSessionId: input.streamSessionId,
|
|
transportId: input.transportId,
|
|
cameraDeviceId: input.cameraDeviceId,
|
|
kind: input.kind,
|
|
rtpParameters: input.rtpParameters,
|
|
createdAt: new Date().toISOString(),
|
|
};
|
|
runtime.producers.set(producerId, producer);
|
|
runtime.producerDescriptors.set(producerId, producerDescriptor);
|
|
return producerDescriptor;
|
|
}
|
|
|
|
async consume(input: SfuConsumeInput): Promise<SfuConsumerDescriptor> {
|
|
const runtime = this.getRuntime(input.streamSessionId);
|
|
const transport = runtime.transports.get(input.transportId);
|
|
const descriptor = runtime.transportDescriptors.get(input.transportId);
|
|
if (!transport || !descriptor) throw new Error('Subscribe transport not found');
|
|
if (descriptor.direction !== 'subscribe') throw new Error('Transport is not a subscribe transport');
|
|
if (descriptor.ownerDeviceId !== input.viewerDeviceId) throw new Error('Device does not own this subscribe transport');
|
|
if (descriptor.state !== 'connected') throw new Error('Subscribe transport must be connected before consuming');
|
|
|
|
const producerId =
|
|
input.producerId ??
|
|
Array.from(runtime.producerDescriptors.values())
|
|
.slice()
|
|
.reverse()
|
|
.find((producer) => producer.kind === 'video')?.producerId;
|
|
if (!producerId) throw new Error('No producer available for consume');
|
|
|
|
const producer = runtime.producers.get(producerId);
|
|
const producerDescriptor = runtime.producerDescriptors.get(producerId);
|
|
if (!producer || !producerDescriptor) throw new Error('Producer not found');
|
|
if (!runtime.router.canConsume({ producerId: producer.id ?? producerId, rtpCapabilities: input.rtpCapabilities ?? {} })) {
|
|
throw new Error('Router cannot consume with provided RTP capabilities');
|
|
}
|
|
|
|
const consumer = await transport.consume({
|
|
producerId: producer.id ?? producerId,
|
|
rtpCapabilities: input.rtpCapabilities ?? {},
|
|
paused: false,
|
|
appData: { viewerDeviceId: input.viewerDeviceId },
|
|
});
|
|
|
|
const consumerId = consumer.id ?? `cons_${randomUUID()}`;
|
|
const consumerDescriptor: SfuConsumerDescriptor = {
|
|
consumerId,
|
|
streamSessionId: input.streamSessionId,
|
|
transportId: input.transportId,
|
|
viewerDeviceId: input.viewerDeviceId,
|
|
producerId,
|
|
kind: producerDescriptor.kind,
|
|
rtpParameters: consumer.rtpParameters ?? producerDescriptor.rtpParameters,
|
|
createdAt: new Date().toISOString(),
|
|
};
|
|
runtime.consumers.set(consumerId, consumer);
|
|
runtime.consumerDescriptors.set(consumerId, consumerDescriptor);
|
|
return consumerDescriptor;
|
|
}
|
|
}
|
|
|