Revert "feat(streams): add phase-2 SFU transport handshake and produce/consume APIs"
This reverts commit 498a7f838b7747470b220701505c4bfbd3ea8cff.
This commit is contained in:
@@ -1,405 +0,0 @@
|
||||
import { randomUUID } from 'crypto';
|
||||
import { networkInterfaces } from 'os';
|
||||
|
||||
import { mediaConfig } from '../config';
|
||||
import type {
|
||||
SfuConnectTransportInput,
|
||||
SfuConsumeInput,
|
||||
SfuConsumerDescriptor,
|
||||
SfuProduceInput,
|
||||
SfuProducerDescriptor,
|
||||
SfuPublishTransportRequest,
|
||||
SfuPublishTransportResult,
|
||||
SfuService,
|
||||
SfuSessionDescriptor,
|
||||
SfuSessionStartInput,
|
||||
SfuSubscribeTransportRequest,
|
||||
SfuSubscribeTransportResult,
|
||||
SfuTransportDescriptor,
|
||||
SfuTransportOptions,
|
||||
} from './types';
|
||||
|
||||
type DynamicMediasoup = {
|
||||
createWorker: (opts?: Record<string, unknown>) => Promise<any>;
|
||||
};
|
||||
|
||||
type MediaSessionRuntime = {
|
||||
session: SfuSessionDescriptor;
|
||||
router: any;
|
||||
transports: Map<string, any>;
|
||||
transportDescriptors: Map<string, SfuTransportDescriptor>;
|
||||
producers: Map<string, any>;
|
||||
producerDescriptors: Map<string, SfuProducerDescriptor>;
|
||||
consumers: Map<string, any>;
|
||||
consumerDescriptors: Map<string, SfuConsumerDescriptor>;
|
||||
};
|
||||
|
||||
const parsePort = (value: string | undefined, fallback: number): number => {
|
||||
const parsed = Number(value);
|
||||
if (!Number.isFinite(parsed)) return fallback;
|
||||
return parsed;
|
||||
};
|
||||
|
||||
const pickHostIpv4 = (): string | null => {
|
||||
const interfaces = networkInterfaces();
|
||||
for (const addresses of Object.values(interfaces)) {
|
||||
if (!addresses) continue;
|
||||
for (const address of addresses) {
|
||||
if (address.family === 'IPv4' && !address.internal) {
|
||||
return address.address;
|
||||
}
|
||||
}
|
||||
}
|
||||
return null;
|
||||
};
|
||||
|
||||
const resolveListenAddress = (): { ip: string; announcedAddress?: string } => {
|
||||
const configuredListenIp = (process.env.MEDIA_WEBRTC_LISTEN_IP ?? '').trim();
|
||||
const configuredAnnounced = process.env.MEDIA_WEBRTC_ANNOUNCED_IP?.trim();
|
||||
|
||||
if (configuredListenIp && configuredListenIp !== '0.0.0.0') {
|
||||
return {
|
||||
ip: configuredListenIp,
|
||||
...(configuredAnnounced ? { announcedAddress: configuredAnnounced } : {}),
|
||||
};
|
||||
}
|
||||
|
||||
const discoveredIp = pickHostIpv4();
|
||||
if (!configuredAnnounced && configuredListenIp === '0.0.0.0') {
|
||||
console.warn(
|
||||
`[sfu] MEDIA_WEBRTC_LISTEN_IP is 0.0.0.0 without MEDIA_WEBRTC_ANNOUNCED_IP. ` +
|
||||
`Using ${discoveredIp ?? '127.0.0.1'} for ICE candidates. Configure both env vars for production.`,
|
||||
);
|
||||
}
|
||||
|
||||
const ip = discoveredIp ?? '127.0.0.1';
|
||||
return {
|
||||
ip,
|
||||
...(configuredAnnounced ? { announcedAddress: configuredAnnounced } : {}),
|
||||
};
|
||||
};
|
||||
|
||||
const toTransportOptions = (transport: any): SfuTransportOptions => ({
|
||||
id: transport.id,
|
||||
iceParameters: transport.iceParameters ?? {},
|
||||
iceCandidates: transport.iceCandidates ?? [],
|
||||
dtlsParameters: transport.dtlsParameters ?? {},
|
||||
...(transport.sctpParameters ? { sctpParameters: transport.sctpParameters } : {}),
|
||||
});
|
||||
|
||||
const mediasoupCodecs = [
|
||||
{
|
||||
kind: 'audio',
|
||||
mimeType: 'audio/opus',
|
||||
clockRate: 48000,
|
||||
channels: 2,
|
||||
},
|
||||
{
|
||||
kind: 'video',
|
||||
mimeType: 'video/VP8',
|
||||
clockRate: 90000,
|
||||
parameters: {},
|
||||
},
|
||||
];
|
||||
|
||||
export class MediasoupSfuService implements SfuService {
|
||||
mode: 'single_server_sfu' = 'single_server_sfu';
|
||||
private mediasoupPromise: Promise<DynamicMediasoup> | null = null;
|
||||
private workerPromise: Promise<any> | null = null;
|
||||
private readonly sessions = new Map<string, MediaSessionRuntime>();
|
||||
|
||||
private async getMediasoup(): Promise<DynamicMediasoup> {
|
||||
if (!this.mediasoupPromise) {
|
||||
this.mediasoupPromise = (new Function('return import("mediasoup")')() as Promise<DynamicMediasoup>).catch((error) => {
|
||||
throw new Error(`mediasoup package is required for MEDIA_SFU_ENGINE=mediasoup: ${error instanceof Error ? error.message : 'load failed'}`);
|
||||
});
|
||||
}
|
||||
return await this.mediasoupPromise;
|
||||
}
|
||||
|
||||
private async getWorker(): Promise<any> {
|
||||
if (!this.workerPromise) {
|
||||
this.workerPromise = (async () => {
|
||||
const mediasoup = await this.getMediasoup();
|
||||
const worker = await mediasoup.createWorker({
|
||||
logLevel: (process.env.MEDIA_SFU_LOG_LEVEL ?? 'warn') as 'debug' | 'warn' | 'error' | 'none',
|
||||
rtcMinPort: parsePort(process.env.MEDIA_RTC_MIN_PORT, 40000),
|
||||
rtcMaxPort: parsePort(process.env.MEDIA_RTC_MAX_PORT, 49999),
|
||||
});
|
||||
worker.on?.('died', () => {
|
||||
console.error('mediasoup worker died; clearing worker handle');
|
||||
this.workerPromise = null;
|
||||
});
|
||||
return worker;
|
||||
})();
|
||||
}
|
||||
return await this.workerPromise;
|
||||
}
|
||||
|
||||
private getRuntime(streamSessionId: string): MediaSessionRuntime {
|
||||
const runtime = this.sessions.get(streamSessionId);
|
||||
if (!runtime) {
|
||||
throw new Error('SFU session not initialized');
|
||||
}
|
||||
return runtime;
|
||||
}
|
||||
|
||||
private async createWebRtcTransport(router: any): Promise<any> {
|
||||
const listenAddress = resolveListenAddress();
|
||||
return await router.createWebRtcTransport({
|
||||
listenInfos: [
|
||||
{
|
||||
protocol: 'udp',
|
||||
ip: listenAddress.ip,
|
||||
...(listenAddress.announcedAddress ? { announcedAddress: listenAddress.announcedAddress } : {}),
|
||||
},
|
||||
{
|
||||
protocol: 'tcp',
|
||||
ip: listenAddress.ip,
|
||||
...(listenAddress.announcedAddress ? { announcedAddress: listenAddress.announcedAddress } : {}),
|
||||
},
|
||||
],
|
||||
enableUdp: true,
|
||||
enableTcp: true,
|
||||
preferUdp: true,
|
||||
});
|
||||
}
|
||||
|
||||
async startSession(input: SfuSessionStartInput): Promise<SfuSessionDescriptor> {
|
||||
const existing = this.sessions.get(input.streamSessionId);
|
||||
if (existing) {
|
||||
return existing.session;
|
||||
}
|
||||
|
||||
const worker = await this.getWorker();
|
||||
const router = await worker.createRouter({ mediaCodecs: mediasoupCodecs });
|
||||
const session: SfuSessionDescriptor = {
|
||||
streamSessionId: input.streamSessionId,
|
||||
ownerUserId: input.ownerUserId,
|
||||
cameraDeviceId: input.cameraDeviceId,
|
||||
requesterDeviceId: input.requesterDeviceId,
|
||||
state: 'starting',
|
||||
createdAt: new Date().toISOString(),
|
||||
};
|
||||
|
||||
this.sessions.set(input.streamSessionId, {
|
||||
session,
|
||||
router,
|
||||
transports: new Map(),
|
||||
transportDescriptors: new Map(),
|
||||
producers: new Map(),
|
||||
producerDescriptors: new Map(),
|
||||
consumers: new Map(),
|
||||
consumerDescriptors: new Map(),
|
||||
});
|
||||
return session;
|
||||
}
|
||||
|
||||
async setSessionState(streamSessionId: string, state: SfuSessionDescriptor['state']): Promise<void> {
|
||||
const runtime = this.getRuntime(streamSessionId);
|
||||
runtime.session = { ...runtime.session, state };
|
||||
}
|
||||
|
||||
async endSession(streamSessionId: string): Promise<void> {
|
||||
const runtime = this.sessions.get(streamSessionId);
|
||||
if (!runtime) return;
|
||||
|
||||
runtime.session = { ...runtime.session, state: 'ending' };
|
||||
for (const consumer of runtime.consumers.values()) {
|
||||
consumer.close?.();
|
||||
}
|
||||
for (const producer of runtime.producers.values()) {
|
||||
producer.close?.();
|
||||
}
|
||||
for (const transport of runtime.transports.values()) {
|
||||
transport.close?.();
|
||||
}
|
||||
runtime.router.close?.();
|
||||
runtime.session = { ...runtime.session, state: 'ended' };
|
||||
this.sessions.delete(streamSessionId);
|
||||
}
|
||||
|
||||
async getSession(streamSessionId: string): Promise<SfuSessionDescriptor | null> {
|
||||
return this.sessions.get(streamSessionId)?.session ?? null;
|
||||
}
|
||||
|
||||
async getRouterRtpCapabilities(streamSessionId: string): Promise<Record<string, unknown> | null> {
|
||||
const runtime = this.sessions.get(streamSessionId);
|
||||
if (!runtime) return null;
|
||||
return runtime.router.rtpCapabilities ?? null;
|
||||
}
|
||||
|
||||
async listSessions(): Promise<SfuSessionDescriptor[]> {
|
||||
return Array.from(this.sessions.values()).map((runtime) => runtime.session);
|
||||
}
|
||||
|
||||
async listTransports(streamSessionId: string): Promise<SfuTransportDescriptor[]> {
|
||||
const runtime = this.sessions.get(streamSessionId);
|
||||
if (!runtime) return [];
|
||||
return Array.from(runtime.transportDescriptors.values());
|
||||
}
|
||||
|
||||
async listProducers(streamSessionId: string): Promise<SfuProducerDescriptor[]> {
|
||||
const runtime = this.sessions.get(streamSessionId);
|
||||
if (!runtime) return [];
|
||||
return Array.from(runtime.producerDescriptors.values());
|
||||
}
|
||||
|
||||
async listConsumers(streamSessionId: string): Promise<SfuConsumerDescriptor[]> {
|
||||
const runtime = this.sessions.get(streamSessionId);
|
||||
if (!runtime) return [];
|
||||
return Array.from(runtime.consumerDescriptors.values());
|
||||
}
|
||||
|
||||
async createPublishTransport(input: SfuPublishTransportRequest): Promise<SfuPublishTransportResult> {
|
||||
const runtime = this.getRuntime(input.streamSessionId);
|
||||
const transport = await this.createWebRtcTransport(runtime.router);
|
||||
const descriptor: SfuTransportDescriptor = {
|
||||
transportId: transport.id,
|
||||
streamSessionId: input.streamSessionId,
|
||||
ownerDeviceId: input.cameraDeviceId,
|
||||
direction: 'publish',
|
||||
state: 'new',
|
||||
createdAt: new Date().toISOString(),
|
||||
};
|
||||
runtime.transports.set(transport.id, transport);
|
||||
runtime.transportDescriptors.set(transport.id, descriptor);
|
||||
return {
|
||||
transportId: transport.id,
|
||||
iceServers: mediaConfig.turn.urls.map((urls) => ({
|
||||
urls,
|
||||
...(mediaConfig.turn.username ? { username: mediaConfig.turn.username } : {}),
|
||||
...(mediaConfig.turn.credential ? { credential: mediaConfig.turn.credential } : {}),
|
||||
})),
|
||||
transportOptions: toTransportOptions(transport),
|
||||
};
|
||||
}
|
||||
|
||||
async createSubscribeTransport(input: SfuSubscribeTransportRequest): Promise<SfuSubscribeTransportResult> {
|
||||
const runtime = this.getRuntime(input.streamSessionId);
|
||||
const transport = await this.createWebRtcTransport(runtime.router);
|
||||
const descriptor: SfuTransportDescriptor = {
|
||||
transportId: transport.id,
|
||||
streamSessionId: input.streamSessionId,
|
||||
ownerDeviceId: input.viewerDeviceId,
|
||||
direction: 'subscribe',
|
||||
state: 'new',
|
||||
createdAt: new Date().toISOString(),
|
||||
};
|
||||
runtime.transports.set(transport.id, transport);
|
||||
runtime.transportDescriptors.set(transport.id, descriptor);
|
||||
return {
|
||||
transportId: transport.id,
|
||||
iceServers: mediaConfig.turn.urls.map((urls) => ({
|
||||
urls,
|
||||
...(mediaConfig.turn.username ? { username: mediaConfig.turn.username } : {}),
|
||||
...(mediaConfig.turn.credential ? { credential: mediaConfig.turn.credential } : {}),
|
||||
})),
|
||||
transportOptions: toTransportOptions(transport),
|
||||
};
|
||||
}
|
||||
|
||||
async connectPublishTransport(input: SfuConnectTransportInput): Promise<SfuTransportDescriptor> {
|
||||
const runtime = this.getRuntime(input.streamSessionId);
|
||||
const transport = runtime.transports.get(input.transportId);
|
||||
const descriptor = runtime.transportDescriptors.get(input.transportId);
|
||||
if (!transport || !descriptor) throw new Error('Publish transport not found');
|
||||
if (descriptor.direction !== 'publish') throw new Error('Transport is not a publish transport');
|
||||
if (descriptor.ownerDeviceId !== input.deviceId) throw new Error('Device does not own this publish transport');
|
||||
|
||||
await transport.connect({ dtlsParameters: input.dtlsParameters });
|
||||
const next = { ...descriptor, state: 'connected' as const };
|
||||
runtime.transportDescriptors.set(descriptor.transportId, next);
|
||||
return next;
|
||||
}
|
||||
|
||||
async connectSubscribeTransport(input: SfuConnectTransportInput): Promise<SfuTransportDescriptor> {
|
||||
const runtime = this.getRuntime(input.streamSessionId);
|
||||
const transport = runtime.transports.get(input.transportId);
|
||||
const descriptor = runtime.transportDescriptors.get(input.transportId);
|
||||
if (!transport || !descriptor) throw new Error('Subscribe transport not found');
|
||||
if (descriptor.direction !== 'subscribe') throw new Error('Transport is not a subscribe transport');
|
||||
if (descriptor.ownerDeviceId !== input.deviceId) throw new Error('Device does not own this subscribe transport');
|
||||
|
||||
await transport.connect({ dtlsParameters: input.dtlsParameters });
|
||||
const next = { ...descriptor, state: 'connected' as const };
|
||||
runtime.transportDescriptors.set(descriptor.transportId, next);
|
||||
return next;
|
||||
}
|
||||
|
||||
async produce(input: SfuProduceInput): Promise<SfuProducerDescriptor> {
|
||||
const runtime = this.getRuntime(input.streamSessionId);
|
||||
const transport = runtime.transports.get(input.transportId);
|
||||
const descriptor = runtime.transportDescriptors.get(input.transportId);
|
||||
if (!transport || !descriptor) throw new Error('Publish transport not found');
|
||||
if (descriptor.direction !== 'publish') throw new Error('Transport is not a publish transport');
|
||||
if (descriptor.ownerDeviceId !== input.cameraDeviceId) throw new Error('Device does not own this publish transport');
|
||||
if (descriptor.state !== 'connected') throw new Error('Publish transport must be connected before producing');
|
||||
|
||||
const producer = await transport.produce({
|
||||
kind: input.kind,
|
||||
rtpParameters: input.rtpParameters,
|
||||
appData: { cameraDeviceId: input.cameraDeviceId },
|
||||
});
|
||||
const producerId = producer.id ?? `prod_${randomUUID()}`;
|
||||
const producerDescriptor: SfuProducerDescriptor = {
|
||||
producerId,
|
||||
streamSessionId: input.streamSessionId,
|
||||
transportId: input.transportId,
|
||||
cameraDeviceId: input.cameraDeviceId,
|
||||
kind: input.kind,
|
||||
rtpParameters: input.rtpParameters,
|
||||
createdAt: new Date().toISOString(),
|
||||
};
|
||||
runtime.producers.set(producerId, producer);
|
||||
runtime.producerDescriptors.set(producerId, producerDescriptor);
|
||||
return producerDescriptor;
|
||||
}
|
||||
|
||||
async consume(input: SfuConsumeInput): Promise<SfuConsumerDescriptor> {
|
||||
const runtime = this.getRuntime(input.streamSessionId);
|
||||
const transport = runtime.transports.get(input.transportId);
|
||||
const descriptor = runtime.transportDescriptors.get(input.transportId);
|
||||
if (!transport || !descriptor) throw new Error('Subscribe transport not found');
|
||||
if (descriptor.direction !== 'subscribe') throw new Error('Transport is not a subscribe transport');
|
||||
if (descriptor.ownerDeviceId !== input.viewerDeviceId) throw new Error('Device does not own this subscribe transport');
|
||||
if (descriptor.state !== 'connected') throw new Error('Subscribe transport must be connected before consuming');
|
||||
|
||||
const producerId =
|
||||
input.producerId ??
|
||||
Array.from(runtime.producerDescriptors.values())
|
||||
.slice()
|
||||
.reverse()
|
||||
.find((producer) => producer.kind === 'video')?.producerId;
|
||||
if (!producerId) throw new Error('No producer available for consume');
|
||||
|
||||
const producer = runtime.producers.get(producerId);
|
||||
const producerDescriptor = runtime.producerDescriptors.get(producerId);
|
||||
if (!producer || !producerDescriptor) throw new Error('Producer not found');
|
||||
if (!runtime.router.canConsume({ producerId: producer.id ?? producerId, rtpCapabilities: input.rtpCapabilities ?? {} })) {
|
||||
throw new Error('Router cannot consume with provided RTP capabilities');
|
||||
}
|
||||
|
||||
const consumer = await transport.consume({
|
||||
producerId: producer.id ?? producerId,
|
||||
rtpCapabilities: input.rtpCapabilities ?? {},
|
||||
paused: false,
|
||||
appData: { viewerDeviceId: input.viewerDeviceId },
|
||||
});
|
||||
|
||||
const consumerId = consumer.id ?? `cons_${randomUUID()}`;
|
||||
const consumerDescriptor: SfuConsumerDescriptor = {
|
||||
consumerId,
|
||||
streamSessionId: input.streamSessionId,
|
||||
transportId: input.transportId,
|
||||
viewerDeviceId: input.viewerDeviceId,
|
||||
producerId,
|
||||
kind: producerDescriptor.kind,
|
||||
rtpParameters: consumer.rtpParameters ?? producerDescriptor.rtpParameters,
|
||||
createdAt: new Date().toISOString(),
|
||||
};
|
||||
runtime.consumers.set(consumerId, consumer);
|
||||
runtime.consumerDescriptors.set(consumerId, consumerDescriptor);
|
||||
return consumerDescriptor;
|
||||
}
|
||||
}
|
||||
@@ -3,12 +3,6 @@ import { randomUUID } from 'crypto';
|
||||
import { mediaConfig } from '../config';
|
||||
import { SfuSessionRegistry } from './registry';
|
||||
import type {
|
||||
SfuConnectTransportInput,
|
||||
SfuConsumeInput,
|
||||
SfuConsumerDescriptor,
|
||||
SfuIceServer,
|
||||
SfuProduceInput,
|
||||
SfuProducerDescriptor,
|
||||
SfuPublishTransportRequest,
|
||||
SfuPublishTransportResult,
|
||||
SfuService,
|
||||
@@ -16,10 +10,9 @@ import type {
|
||||
SfuSessionStartInput,
|
||||
SfuSubscribeTransportRequest,
|
||||
SfuSubscribeTransportResult,
|
||||
SfuTransportDescriptor,
|
||||
} from './types';
|
||||
|
||||
const toIceServers = (): SfuIceServer[] => {
|
||||
const toIceServers = (): Array<{ urls: string; username?: string; credential?: string }> => {
|
||||
if (mediaConfig.turn.urls.length === 0) {
|
||||
return [];
|
||||
}
|
||||
@@ -64,138 +57,21 @@ export class NoopSfuService implements SfuService {
|
||||
return this.registry.get(streamSessionId);
|
||||
}
|
||||
|
||||
async getRouterRtpCapabilities(_streamSessionId: string): Promise<Record<string, unknown> | null> {
|
||||
return {
|
||||
codecs: [{ mimeType: 'video/VP8', clockRate: 90000, kind: 'video' }],
|
||||
headerExtensions: [],
|
||||
};
|
||||
}
|
||||
|
||||
async listSessions(): Promise<SfuSessionDescriptor[]> {
|
||||
return this.registry.list();
|
||||
}
|
||||
|
||||
async listTransports(streamSessionId: string): Promise<SfuTransportDescriptor[]> {
|
||||
return this.registry.listTransports(streamSessionId);
|
||||
}
|
||||
|
||||
async listProducers(streamSessionId: string): Promise<SfuProducerDescriptor[]> {
|
||||
return this.registry.listProducers(streamSessionId);
|
||||
}
|
||||
|
||||
async listConsumers(streamSessionId: string): Promise<SfuConsumerDescriptor[]> {
|
||||
return this.registry.listConsumers(streamSessionId);
|
||||
}
|
||||
|
||||
async createPublishTransport(input: SfuPublishTransportRequest): Promise<SfuPublishTransportResult> {
|
||||
const transportId = `pub_${randomUUID()}`;
|
||||
this.registry.addTransport({
|
||||
transportId,
|
||||
streamSessionId: input.streamSessionId,
|
||||
ownerDeviceId: input.cameraDeviceId,
|
||||
direction: 'publish',
|
||||
});
|
||||
async createPublishTransport(_input: SfuPublishTransportRequest): Promise<SfuPublishTransportResult> {
|
||||
return {
|
||||
transportId,
|
||||
transportId: `pub_${randomUUID()}`,
|
||||
iceServers: toIceServers(),
|
||||
transportOptions: {
|
||||
id: transportId,
|
||||
iceParameters: {},
|
||||
iceCandidates: [],
|
||||
dtlsParameters: {},
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
async createSubscribeTransport(input: SfuSubscribeTransportRequest): Promise<SfuSubscribeTransportResult> {
|
||||
const transportId = `sub_${randomUUID()}`;
|
||||
this.registry.addTransport({
|
||||
transportId,
|
||||
streamSessionId: input.streamSessionId,
|
||||
ownerDeviceId: input.viewerDeviceId,
|
||||
direction: 'subscribe',
|
||||
});
|
||||
async createSubscribeTransport(_input: SfuSubscribeTransportRequest): Promise<SfuSubscribeTransportResult> {
|
||||
return {
|
||||
transportId,
|
||||
transportId: `sub_${randomUUID()}`,
|
||||
iceServers: toIceServers(),
|
||||
transportOptions: {
|
||||
id: transportId,
|
||||
iceParameters: {},
|
||||
iceCandidates: [],
|
||||
dtlsParameters: {},
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
async connectPublishTransport(input: SfuConnectTransportInput): Promise<SfuTransportDescriptor> {
|
||||
const transport = this.registry.getTransport(input.transportId);
|
||||
if (!transport) throw new Error('Publish transport not found');
|
||||
if (transport.streamSessionId !== input.streamSessionId) throw new Error('Transport does not belong to stream');
|
||||
if (transport.direction !== 'publish') throw new Error('Transport is not a publish transport');
|
||||
if (transport.ownerDeviceId !== input.deviceId) throw new Error('Device does not own this publish transport');
|
||||
|
||||
const connected = this.registry.connectTransport(input.transportId);
|
||||
if (!connected) throw new Error('Publish transport connect failed');
|
||||
return connected;
|
||||
}
|
||||
|
||||
async connectSubscribeTransport(input: SfuConnectTransportInput): Promise<SfuTransportDescriptor> {
|
||||
const transport = this.registry.getTransport(input.transportId);
|
||||
if (!transport) throw new Error('Subscribe transport not found');
|
||||
if (transport.streamSessionId !== input.streamSessionId) throw new Error('Transport does not belong to stream');
|
||||
if (transport.direction !== 'subscribe') throw new Error('Transport is not a subscribe transport');
|
||||
if (transport.ownerDeviceId !== input.deviceId) throw new Error('Device does not own this subscribe transport');
|
||||
|
||||
const connected = this.registry.connectTransport(input.transportId);
|
||||
if (!connected) throw new Error('Subscribe transport connect failed');
|
||||
return connected;
|
||||
}
|
||||
|
||||
async produce(input: SfuProduceInput): Promise<SfuProducerDescriptor> {
|
||||
const transport = this.registry.getTransport(input.transportId);
|
||||
if (!transport) throw new Error('Publish transport not found');
|
||||
if (transport.streamSessionId !== input.streamSessionId) throw new Error('Transport does not belong to stream');
|
||||
if (transport.direction !== 'publish') throw new Error('Transport is not a publish transport');
|
||||
if (transport.ownerDeviceId !== input.cameraDeviceId) throw new Error('Device does not own this publish transport');
|
||||
if (transport.state !== 'connected') throw new Error('Publish transport must be connected before producing');
|
||||
|
||||
return this.registry.addProducer({
|
||||
producerId: `prod_${randomUUID()}`,
|
||||
streamSessionId: input.streamSessionId,
|
||||
transportId: input.transportId,
|
||||
cameraDeviceId: input.cameraDeviceId,
|
||||
kind: input.kind,
|
||||
rtpParameters: input.rtpParameters,
|
||||
});
|
||||
}
|
||||
|
||||
async consume(input: SfuConsumeInput): Promise<SfuConsumerDescriptor> {
|
||||
const transport = this.registry.getTransport(input.transportId);
|
||||
if (!transport) throw new Error('Subscribe transport not found');
|
||||
if (transport.streamSessionId !== input.streamSessionId) throw new Error('Transport does not belong to stream');
|
||||
if (transport.direction !== 'subscribe') throw new Error('Transport is not a subscribe transport');
|
||||
if (transport.ownerDeviceId !== input.viewerDeviceId) throw new Error('Device does not own this subscribe transport');
|
||||
if (transport.state !== 'connected') throw new Error('Subscribe transport must be connected before consuming');
|
||||
|
||||
const selectedProducer =
|
||||
(input.producerId ? this.registry.getProducer(input.producerId) : null) ??
|
||||
this.registry
|
||||
.listProducers(input.streamSessionId)
|
||||
.slice()
|
||||
.reverse()
|
||||
.find((producer) => producer.kind === 'video');
|
||||
|
||||
if (!selectedProducer) throw new Error('No producer available for consume');
|
||||
if (selectedProducer.streamSessionId !== input.streamSessionId) throw new Error('Producer does not belong to stream');
|
||||
|
||||
return this.registry.addConsumer({
|
||||
consumerId: `cons_${randomUUID()}`,
|
||||
streamSessionId: input.streamSessionId,
|
||||
transportId: input.transportId,
|
||||
viewerDeviceId: input.viewerDeviceId,
|
||||
producerId: selectedProducer.producerId,
|
||||
kind: selectedProducer.kind,
|
||||
rtpParameters: selectedProducer.rtpParameters,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,24 +1,11 @@
|
||||
import type {
|
||||
SfuConsumerDescriptor,
|
||||
SfuMediaKind,
|
||||
SfuProducerDescriptor,
|
||||
SfuSessionDescriptor,
|
||||
SfuSessionState,
|
||||
SfuTransportDescriptor,
|
||||
SfuTransportDirection,
|
||||
} from './types';
|
||||
import type { SfuSessionDescriptor, SfuSessionState } from './types';
|
||||
|
||||
type StoredSfuSession = SfuSessionDescriptor & {
|
||||
updatedAt: string;
|
||||
};
|
||||
|
||||
const nowIso = (): string => new Date().toISOString();
|
||||
|
||||
export class SfuSessionRegistry {
|
||||
private readonly sessions = new Map<string, StoredSfuSession>();
|
||||
private readonly transports = new Map<string, SfuTransportDescriptor>();
|
||||
private readonly producers = new Map<string, SfuProducerDescriptor>();
|
||||
private readonly consumers = new Map<string, SfuConsumerDescriptor>();
|
||||
|
||||
get(streamSessionId: string): SfuSessionDescriptor | null {
|
||||
const found = this.sessions.get(streamSessionId);
|
||||
@@ -28,7 +15,8 @@ export class SfuSessionRegistry {
|
||||
}
|
||||
|
||||
set(session: SfuSessionDescriptor): SfuSessionDescriptor {
|
||||
this.sessions.set(session.streamSessionId, { ...session, updatedAt: nowIso() });
|
||||
const now = new Date().toISOString();
|
||||
this.sessions.set(session.streamSessionId, { ...session, updatedAt: now });
|
||||
return session;
|
||||
}
|
||||
|
||||
@@ -36,7 +24,11 @@ export class SfuSessionRegistry {
|
||||
const existing = this.sessions.get(streamSessionId);
|
||||
if (!existing) return null;
|
||||
|
||||
const next: StoredSfuSession = { ...existing, state, updatedAt: nowIso() };
|
||||
const next: StoredSfuSession = {
|
||||
...existing,
|
||||
state,
|
||||
updatedAt: new Date().toISOString(),
|
||||
};
|
||||
this.sessions.set(streamSessionId, next);
|
||||
const { updatedAt: _updatedAt, ...descriptor } = next;
|
||||
return descriptor;
|
||||
@@ -45,97 +37,5 @@ export class SfuSessionRegistry {
|
||||
list(): SfuSessionDescriptor[] {
|
||||
return Array.from(this.sessions.values()).map(({ updatedAt: _updatedAt, ...descriptor }) => descriptor);
|
||||
}
|
||||
|
||||
addTransport(input: {
|
||||
transportId: string;
|
||||
streamSessionId: string;
|
||||
ownerDeviceId: string;
|
||||
direction: SfuTransportDirection;
|
||||
}): SfuTransportDescriptor {
|
||||
const descriptor: SfuTransportDescriptor = {
|
||||
transportId: input.transportId,
|
||||
streamSessionId: input.streamSessionId,
|
||||
ownerDeviceId: input.ownerDeviceId,
|
||||
direction: input.direction,
|
||||
state: 'new',
|
||||
createdAt: nowIso(),
|
||||
};
|
||||
this.transports.set(descriptor.transportId, descriptor);
|
||||
return descriptor;
|
||||
}
|
||||
|
||||
getTransport(transportId: string): SfuTransportDescriptor | null {
|
||||
return this.transports.get(transportId) ?? null;
|
||||
}
|
||||
|
||||
listTransports(streamSessionId: string): SfuTransportDescriptor[] {
|
||||
return Array.from(this.transports.values()).filter((transport) => transport.streamSessionId === streamSessionId);
|
||||
}
|
||||
|
||||
connectTransport(transportId: string): SfuTransportDescriptor | null {
|
||||
const existing = this.transports.get(transportId);
|
||||
if (!existing) return null;
|
||||
const next: SfuTransportDescriptor = { ...existing, state: 'connected' };
|
||||
this.transports.set(transportId, next);
|
||||
return next;
|
||||
}
|
||||
|
||||
addProducer(input: {
|
||||
producerId: string;
|
||||
streamSessionId: string;
|
||||
transportId: string;
|
||||
cameraDeviceId: string;
|
||||
kind: SfuMediaKind;
|
||||
rtpParameters: Record<string, unknown>;
|
||||
}): SfuProducerDescriptor {
|
||||
const descriptor: SfuProducerDescriptor = {
|
||||
producerId: input.producerId,
|
||||
streamSessionId: input.streamSessionId,
|
||||
transportId: input.transportId,
|
||||
cameraDeviceId: input.cameraDeviceId,
|
||||
kind: input.kind,
|
||||
rtpParameters: input.rtpParameters,
|
||||
createdAt: nowIso(),
|
||||
};
|
||||
this.producers.set(descriptor.producerId, descriptor);
|
||||
return descriptor;
|
||||
}
|
||||
|
||||
getProducer(producerId: string): SfuProducerDescriptor | null {
|
||||
return this.producers.get(producerId) ?? null;
|
||||
}
|
||||
|
||||
listProducers(streamSessionId: string): SfuProducerDescriptor[] {
|
||||
return Array.from(this.producers.values())
|
||||
.filter((producer) => producer.streamSessionId === streamSessionId)
|
||||
.sort((left, right) => left.createdAt.localeCompare(right.createdAt));
|
||||
}
|
||||
|
||||
addConsumer(input: {
|
||||
consumerId: string;
|
||||
streamSessionId: string;
|
||||
transportId: string;
|
||||
viewerDeviceId: string;
|
||||
producerId: string;
|
||||
kind: SfuMediaKind;
|
||||
rtpParameters: Record<string, unknown>;
|
||||
}): SfuConsumerDescriptor {
|
||||
const descriptor: SfuConsumerDescriptor = {
|
||||
consumerId: input.consumerId,
|
||||
streamSessionId: input.streamSessionId,
|
||||
transportId: input.transportId,
|
||||
viewerDeviceId: input.viewerDeviceId,
|
||||
producerId: input.producerId,
|
||||
kind: input.kind,
|
||||
rtpParameters: input.rtpParameters,
|
||||
createdAt: nowIso(),
|
||||
};
|
||||
this.consumers.set(descriptor.consumerId, descriptor);
|
||||
return descriptor;
|
||||
}
|
||||
|
||||
listConsumers(streamSessionId: string): SfuConsumerDescriptor[] {
|
||||
return Array.from(this.consumers.values()).filter((consumer) => consumer.streamSessionId === streamSessionId);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -1,20 +1,14 @@
|
||||
import { mediaMode } from '../config';
|
||||
import { MediasoupSfuService } from './mediasoup';
|
||||
import { NoopSfuService } from './noop';
|
||||
import type { SfuService } from './types';
|
||||
|
||||
const sfuEngine = (process.env.MEDIA_SFU_ENGINE ?? 'mediasoup').trim().toLowerCase();
|
||||
|
||||
const createSfuService = (): SfuService | null => {
|
||||
if (mediaMode !== 'single_server_sfu') {
|
||||
return null;
|
||||
}
|
||||
|
||||
if (sfuEngine === 'noop') {
|
||||
return new NoopSfuService();
|
||||
}
|
||||
|
||||
return new MediasoupSfuService();
|
||||
return new NoopSfuService();
|
||||
};
|
||||
|
||||
export const sfuService = createSfuService();
|
||||
|
||||
|
||||
@@ -1,21 +1,4 @@
|
||||
export type SfuSessionState = 'idle' | 'starting' | 'live' | 'ending' | 'ended';
|
||||
export type SfuTransportDirection = 'publish' | 'subscribe';
|
||||
export type SfuTransportState = 'new' | 'connected';
|
||||
export type SfuMediaKind = 'audio' | 'video';
|
||||
|
||||
export type SfuIceServer = {
|
||||
urls: string;
|
||||
username?: string;
|
||||
credential?: string;
|
||||
};
|
||||
|
||||
export type SfuTransportOptions = {
|
||||
id: string;
|
||||
iceParameters: Record<string, unknown>;
|
||||
iceCandidates: Array<Record<string, unknown>>;
|
||||
dtlsParameters: Record<string, unknown>;
|
||||
sctpParameters?: Record<string, unknown>;
|
||||
};
|
||||
|
||||
export type SfuSessionDescriptor = {
|
||||
streamSessionId: string;
|
||||
@@ -40,8 +23,7 @@ export type SfuPublishTransportRequest = {
|
||||
|
||||
export type SfuPublishTransportResult = {
|
||||
transportId: string;
|
||||
iceServers: SfuIceServer[];
|
||||
transportOptions?: SfuTransportOptions;
|
||||
iceServers: Array<{ urls: string; username?: string; credential?: string }>;
|
||||
};
|
||||
|
||||
export type SfuSubscribeTransportRequest = {
|
||||
@@ -51,61 +33,7 @@ export type SfuSubscribeTransportRequest = {
|
||||
|
||||
export type SfuSubscribeTransportResult = {
|
||||
transportId: string;
|
||||
iceServers: SfuIceServer[];
|
||||
transportOptions?: SfuTransportOptions;
|
||||
};
|
||||
|
||||
export type SfuTransportDescriptor = {
|
||||
transportId: string;
|
||||
streamSessionId: string;
|
||||
ownerDeviceId: string;
|
||||
direction: SfuTransportDirection;
|
||||
state: SfuTransportState;
|
||||
createdAt: string;
|
||||
};
|
||||
|
||||
export type SfuProducerDescriptor = {
|
||||
producerId: string;
|
||||
streamSessionId: string;
|
||||
transportId: string;
|
||||
cameraDeviceId: string;
|
||||
kind: SfuMediaKind;
|
||||
rtpParameters: Record<string, unknown>;
|
||||
createdAt: string;
|
||||
};
|
||||
|
||||
export type SfuConsumerDescriptor = {
|
||||
consumerId: string;
|
||||
streamSessionId: string;
|
||||
transportId: string;
|
||||
viewerDeviceId: string;
|
||||
producerId: string;
|
||||
kind: SfuMediaKind;
|
||||
rtpParameters: Record<string, unknown>;
|
||||
createdAt: string;
|
||||
};
|
||||
|
||||
export type SfuConnectTransportInput = {
|
||||
streamSessionId: string;
|
||||
transportId: string;
|
||||
deviceId: string;
|
||||
dtlsParameters: Record<string, unknown>;
|
||||
};
|
||||
|
||||
export type SfuProduceInput = {
|
||||
streamSessionId: string;
|
||||
transportId: string;
|
||||
cameraDeviceId: string;
|
||||
kind: SfuMediaKind;
|
||||
rtpParameters: Record<string, unknown>;
|
||||
};
|
||||
|
||||
export type SfuConsumeInput = {
|
||||
streamSessionId: string;
|
||||
transportId: string;
|
||||
viewerDeviceId: string;
|
||||
producerId?: string;
|
||||
rtpCapabilities?: Record<string, unknown>;
|
||||
iceServers: Array<{ urls: string; username?: string; credential?: string }>;
|
||||
};
|
||||
|
||||
export interface SfuService {
|
||||
@@ -114,15 +42,7 @@ export interface SfuService {
|
||||
setSessionState(streamSessionId: string, state: SfuSessionState): Promise<void>;
|
||||
endSession(streamSessionId: string): Promise<void>;
|
||||
getSession(streamSessionId: string): Promise<SfuSessionDescriptor | null>;
|
||||
getRouterRtpCapabilities(streamSessionId: string): Promise<Record<string, unknown> | null>;
|
||||
listSessions(): Promise<SfuSessionDescriptor[]>;
|
||||
listTransports(streamSessionId: string): Promise<SfuTransportDescriptor[]>;
|
||||
listProducers(streamSessionId: string): Promise<SfuProducerDescriptor[]>;
|
||||
listConsumers(streamSessionId: string): Promise<SfuConsumerDescriptor[]>;
|
||||
createPublishTransport(input: SfuPublishTransportRequest): Promise<SfuPublishTransportResult>;
|
||||
createSubscribeTransport(input: SfuSubscribeTransportRequest): Promise<SfuSubscribeTransportResult>;
|
||||
connectPublishTransport(input: SfuConnectTransportInput): Promise<SfuTransportDescriptor>;
|
||||
connectSubscribeTransport(input: SfuConnectTransportInput): Promise<SfuTransportDescriptor>;
|
||||
produce(input: SfuProduceInput): Promise<SfuProducerDescriptor>;
|
||||
consume(input: SfuConsumeInput): Promise<SfuConsumerDescriptor>;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user