feat(streams): add phase-2 SFU transport handshake and produce/consume APIs

This commit is contained in:
2026-02-08 16:35:00 +00:00
parent 8eed0df577
commit 8fc7302a58
5 changed files with 529 additions and 33 deletions

View File

@@ -3,6 +3,12 @@ import { randomUUID } from 'crypto';
import { mediaConfig } from '../config';
import { SfuSessionRegistry } from './registry';
import type {
SfuConnectTransportInput,
SfuConsumeInput,
SfuConsumerDescriptor,
SfuIceServer,
SfuProduceInput,
SfuProducerDescriptor,
SfuPublishTransportRequest,
SfuPublishTransportResult,
SfuService,
@@ -10,9 +16,10 @@ import type {
SfuSessionStartInput,
SfuSubscribeTransportRequest,
SfuSubscribeTransportResult,
SfuTransportDescriptor,
} from './types';
const toIceServers = (): Array<{ urls: string; username?: string; credential?: string }> => {
const toIceServers = (): SfuIceServer[] => {
if (mediaConfig.turn.urls.length === 0) {
return [];
}
@@ -61,17 +68,115 @@ export class NoopSfuService implements SfuService {
return this.registry.list();
}
async createPublishTransport(_input: SfuPublishTransportRequest): Promise<SfuPublishTransportResult> {
async listTransports(streamSessionId: string): Promise<SfuTransportDescriptor[]> {
return this.registry.listTransports(streamSessionId);
}
async listProducers(streamSessionId: string): Promise<SfuProducerDescriptor[]> {
return this.registry.listProducers(streamSessionId);
}
async listConsumers(streamSessionId: string): Promise<SfuConsumerDescriptor[]> {
return this.registry.listConsumers(streamSessionId);
}
async createPublishTransport(input: SfuPublishTransportRequest): Promise<SfuPublishTransportResult> {
const transportId = `pub_${randomUUID()}`;
this.registry.addTransport({
transportId,
streamSessionId: input.streamSessionId,
ownerDeviceId: input.cameraDeviceId,
direction: 'publish',
});
return {
transportId: `pub_${randomUUID()}`,
transportId,
iceServers: toIceServers(),
};
}
async createSubscribeTransport(_input: SfuSubscribeTransportRequest): Promise<SfuSubscribeTransportResult> {
async createSubscribeTransport(input: SfuSubscribeTransportRequest): Promise<SfuSubscribeTransportResult> {
const transportId = `sub_${randomUUID()}`;
this.registry.addTransport({
transportId,
streamSessionId: input.streamSessionId,
ownerDeviceId: input.viewerDeviceId,
direction: 'subscribe',
});
return {
transportId: `sub_${randomUUID()}`,
transportId,
iceServers: toIceServers(),
};
}
async connectPublishTransport(input: SfuConnectTransportInput): Promise<SfuTransportDescriptor> {
const transport = this.registry.getTransport(input.transportId);
if (!transport) throw new Error('Publish transport not found');
if (transport.streamSessionId !== input.streamSessionId) throw new Error('Transport does not belong to stream');
if (transport.direction !== 'publish') throw new Error('Transport is not a publish transport');
if (transport.ownerDeviceId !== input.deviceId) throw new Error('Device does not own this publish transport');
const connected = this.registry.connectTransport(input.transportId);
if (!connected) throw new Error('Publish transport connect failed');
return connected;
}
async connectSubscribeTransport(input: SfuConnectTransportInput): Promise<SfuTransportDescriptor> {
const transport = this.registry.getTransport(input.transportId);
if (!transport) throw new Error('Subscribe transport not found');
if (transport.streamSessionId !== input.streamSessionId) throw new Error('Transport does not belong to stream');
if (transport.direction !== 'subscribe') throw new Error('Transport is not a subscribe transport');
if (transport.ownerDeviceId !== input.deviceId) throw new Error('Device does not own this subscribe transport');
const connected = this.registry.connectTransport(input.transportId);
if (!connected) throw new Error('Subscribe transport connect failed');
return connected;
}
async produce(input: SfuProduceInput): Promise<SfuProducerDescriptor> {
const transport = this.registry.getTransport(input.transportId);
if (!transport) throw new Error('Publish transport not found');
if (transport.streamSessionId !== input.streamSessionId) throw new Error('Transport does not belong to stream');
if (transport.direction !== 'publish') throw new Error('Transport is not a publish transport');
if (transport.ownerDeviceId !== input.cameraDeviceId) throw new Error('Device does not own this publish transport');
if (transport.state !== 'connected') throw new Error('Publish transport must be connected before producing');
return this.registry.addProducer({
producerId: `prod_${randomUUID()}`,
streamSessionId: input.streamSessionId,
transportId: input.transportId,
cameraDeviceId: input.cameraDeviceId,
kind: input.kind,
rtpParameters: input.rtpParameters,
});
}
async consume(input: SfuConsumeInput): Promise<SfuConsumerDescriptor> {
const transport = this.registry.getTransport(input.transportId);
if (!transport) throw new Error('Subscribe transport not found');
if (transport.streamSessionId !== input.streamSessionId) throw new Error('Transport does not belong to stream');
if (transport.direction !== 'subscribe') throw new Error('Transport is not a subscribe transport');
if (transport.ownerDeviceId !== input.viewerDeviceId) throw new Error('Device does not own this subscribe transport');
if (transport.state !== 'connected') throw new Error('Subscribe transport must be connected before consuming');
const selectedProducer =
(input.producerId ? this.registry.getProducer(input.producerId) : null) ??
this.registry
.listProducers(input.streamSessionId)
.slice()
.reverse()
.find((producer) => producer.kind === 'video');
if (!selectedProducer) throw new Error('No producer available for consume');
if (selectedProducer.streamSessionId !== input.streamSessionId) throw new Error('Producer does not belong to stream');
return this.registry.addConsumer({
consumerId: `cons_${randomUUID()}`,
streamSessionId: input.streamSessionId,
transportId: input.transportId,
viewerDeviceId: input.viewerDeviceId,
producerId: selectedProducer.producerId,
kind: selectedProducer.kind,
rtpParameters: selectedProducer.rtpParameters,
});
}
}

View File

@@ -1,11 +1,24 @@
import type { SfuSessionDescriptor, SfuSessionState } from './types';
import type {
SfuConsumerDescriptor,
SfuMediaKind,
SfuProducerDescriptor,
SfuSessionDescriptor,
SfuSessionState,
SfuTransportDescriptor,
SfuTransportDirection,
} from './types';
type StoredSfuSession = SfuSessionDescriptor & {
updatedAt: string;
};
const nowIso = (): string => new Date().toISOString();
export class SfuSessionRegistry {
private readonly sessions = new Map<string, StoredSfuSession>();
private readonly transports = new Map<string, SfuTransportDescriptor>();
private readonly producers = new Map<string, SfuProducerDescriptor>();
private readonly consumers = new Map<string, SfuConsumerDescriptor>();
get(streamSessionId: string): SfuSessionDescriptor | null {
const found = this.sessions.get(streamSessionId);
@@ -15,8 +28,7 @@ export class SfuSessionRegistry {
}
set(session: SfuSessionDescriptor): SfuSessionDescriptor {
const now = new Date().toISOString();
this.sessions.set(session.streamSessionId, { ...session, updatedAt: now });
this.sessions.set(session.streamSessionId, { ...session, updatedAt: nowIso() });
return session;
}
@@ -24,11 +36,7 @@ export class SfuSessionRegistry {
const existing = this.sessions.get(streamSessionId);
if (!existing) return null;
const next: StoredSfuSession = {
...existing,
state,
updatedAt: new Date().toISOString(),
};
const next: StoredSfuSession = { ...existing, state, updatedAt: nowIso() };
this.sessions.set(streamSessionId, next);
const { updatedAt: _updatedAt, ...descriptor } = next;
return descriptor;
@@ -37,5 +45,97 @@ export class SfuSessionRegistry {
list(): SfuSessionDescriptor[] {
return Array.from(this.sessions.values()).map(({ updatedAt: _updatedAt, ...descriptor }) => descriptor);
}
addTransport(input: {
transportId: string;
streamSessionId: string;
ownerDeviceId: string;
direction: SfuTransportDirection;
}): SfuTransportDescriptor {
const descriptor: SfuTransportDescriptor = {
transportId: input.transportId,
streamSessionId: input.streamSessionId,
ownerDeviceId: input.ownerDeviceId,
direction: input.direction,
state: 'new',
createdAt: nowIso(),
};
this.transports.set(descriptor.transportId, descriptor);
return descriptor;
}
getTransport(transportId: string): SfuTransportDescriptor | null {
return this.transports.get(transportId) ?? null;
}
listTransports(streamSessionId: string): SfuTransportDescriptor[] {
return Array.from(this.transports.values()).filter((transport) => transport.streamSessionId === streamSessionId);
}
connectTransport(transportId: string): SfuTransportDescriptor | null {
const existing = this.transports.get(transportId);
if (!existing) return null;
const next: SfuTransportDescriptor = { ...existing, state: 'connected' };
this.transports.set(transportId, next);
return next;
}
addProducer(input: {
producerId: string;
streamSessionId: string;
transportId: string;
cameraDeviceId: string;
kind: SfuMediaKind;
rtpParameters: Record<string, unknown>;
}): SfuProducerDescriptor {
const descriptor: SfuProducerDescriptor = {
producerId: input.producerId,
streamSessionId: input.streamSessionId,
transportId: input.transportId,
cameraDeviceId: input.cameraDeviceId,
kind: input.kind,
rtpParameters: input.rtpParameters,
createdAt: nowIso(),
};
this.producers.set(descriptor.producerId, descriptor);
return descriptor;
}
getProducer(producerId: string): SfuProducerDescriptor | null {
return this.producers.get(producerId) ?? null;
}
listProducers(streamSessionId: string): SfuProducerDescriptor[] {
return Array.from(this.producers.values())
.filter((producer) => producer.streamSessionId === streamSessionId)
.sort((left, right) => left.createdAt.localeCompare(right.createdAt));
}
addConsumer(input: {
consumerId: string;
streamSessionId: string;
transportId: string;
viewerDeviceId: string;
producerId: string;
kind: SfuMediaKind;
rtpParameters: Record<string, unknown>;
}): SfuConsumerDescriptor {
const descriptor: SfuConsumerDescriptor = {
consumerId: input.consumerId,
streamSessionId: input.streamSessionId,
transportId: input.transportId,
viewerDeviceId: input.viewerDeviceId,
producerId: input.producerId,
kind: input.kind,
rtpParameters: input.rtpParameters,
createdAt: nowIso(),
};
this.consumers.set(descriptor.consumerId, descriptor);
return descriptor;
}
listConsumers(streamSessionId: string): SfuConsumerDescriptor[] {
return Array.from(this.consumers.values()).filter((consumer) => consumer.streamSessionId === streamSessionId);
}
}

View File

@@ -1,4 +1,13 @@
export type SfuSessionState = 'idle' | 'starting' | 'live' | 'ending' | 'ended';
export type SfuTransportDirection = 'publish' | 'subscribe';
export type SfuTransportState = 'new' | 'connected';
export type SfuMediaKind = 'audio' | 'video';
export type SfuIceServer = {
urls: string;
username?: string;
credential?: string;
};
export type SfuSessionDescriptor = {
streamSessionId: string;
@@ -23,7 +32,7 @@ export type SfuPublishTransportRequest = {
export type SfuPublishTransportResult = {
transportId: string;
iceServers: Array<{ urls: string; username?: string; credential?: string }>;
iceServers: SfuIceServer[];
};
export type SfuSubscribeTransportRequest = {
@@ -33,7 +42,60 @@ export type SfuSubscribeTransportRequest = {
export type SfuSubscribeTransportResult = {
transportId: string;
iceServers: Array<{ urls: string; username?: string; credential?: string }>;
iceServers: SfuIceServer[];
};
export type SfuTransportDescriptor = {
transportId: string;
streamSessionId: string;
ownerDeviceId: string;
direction: SfuTransportDirection;
state: SfuTransportState;
createdAt: string;
};
export type SfuProducerDescriptor = {
producerId: string;
streamSessionId: string;
transportId: string;
cameraDeviceId: string;
kind: SfuMediaKind;
rtpParameters: Record<string, unknown>;
createdAt: string;
};
export type SfuConsumerDescriptor = {
consumerId: string;
streamSessionId: string;
transportId: string;
viewerDeviceId: string;
producerId: string;
kind: SfuMediaKind;
rtpParameters: Record<string, unknown>;
createdAt: string;
};
export type SfuConnectTransportInput = {
streamSessionId: string;
transportId: string;
deviceId: string;
dtlsParameters: Record<string, unknown>;
};
export type SfuProduceInput = {
streamSessionId: string;
transportId: string;
cameraDeviceId: string;
kind: SfuMediaKind;
rtpParameters: Record<string, unknown>;
};
export type SfuConsumeInput = {
streamSessionId: string;
transportId: string;
viewerDeviceId: string;
producerId?: string;
rtpCapabilities?: Record<string, unknown>;
};
export interface SfuService {
@@ -43,6 +105,13 @@ export interface SfuService {
endSession(streamSessionId: string): Promise<void>;
getSession(streamSessionId: string): Promise<SfuSessionDescriptor | null>;
listSessions(): Promise<SfuSessionDescriptor[]>;
listTransports(streamSessionId: string): Promise<SfuTransportDescriptor[]>;
listProducers(streamSessionId: string): Promise<SfuProducerDescriptor[]>;
listConsumers(streamSessionId: string): Promise<SfuConsumerDescriptor[]>;
createPublishTransport(input: SfuPublishTransportRequest): Promise<SfuPublishTransportResult>;
createSubscribeTransport(input: SfuSubscribeTransportRequest): Promise<SfuSubscribeTransportResult>;
connectPublishTransport(input: SfuConnectTransportInput): Promise<SfuTransportDescriptor>;
connectSubscribeTransport(input: SfuConnectTransportInput): Promise<SfuTransportDescriptor>;
produce(input: SfuProduceInput): Promise<SfuProducerDescriptor>;
consume(input: SfuConsumeInput): Promise<SfuConsumerDescriptor>;
}