feat(streams): add phase-1 single-server SFU session and transport APIs

This commit is contained in:
2026-02-08 11:45:00 +00:00
parent 20373f411f
commit 1c8256bf57
5 changed files with 257 additions and 58 deletions

View File

@@ -1,6 +1,7 @@
import { randomUUID } from 'crypto'; import { randomUUID } from 'crypto';
import { mediaConfig } from '../config'; import { mediaConfig } from '../config';
import { SfuSessionRegistry } from './registry';
import type { import type {
SfuPublishTransportRequest, SfuPublishTransportRequest,
SfuPublishTransportResult, SfuPublishTransportResult,
@@ -25,11 +26,11 @@ const toIceServers = (): Array<{ urls: string; username?: string; credential?: s
export class NoopSfuService implements SfuService { export class NoopSfuService implements SfuService {
mode: 'single_server_sfu' = 'single_server_sfu'; mode: 'single_server_sfu' = 'single_server_sfu';
private readonly sessions = new Map<string, SfuSessionDescriptor>(); private readonly registry = new SfuSessionRegistry();
async startSession(input: SfuSessionStartInput): Promise<SfuSessionDescriptor> { async startSession(input: SfuSessionStartInput): Promise<SfuSessionDescriptor> {
const now = new Date().toISOString(); const now = new Date().toISOString();
const existing = this.sessions.get(input.streamSessionId); const existing = this.registry.get(input.streamSessionId);
if (existing) return existing; if (existing) return existing;
const descriptor: SfuSessionDescriptor = { const descriptor: SfuSessionDescriptor = {
@@ -40,18 +41,24 @@ export class NoopSfuService implements SfuService {
state: 'starting', state: 'starting',
createdAt: now, createdAt: now,
}; };
this.sessions.set(input.streamSessionId, descriptor); return this.registry.set(descriptor);
return descriptor; }
async setSessionState(streamSessionId: string, state: SfuSessionDescriptor['state']): Promise<void> {
this.registry.updateState(streamSessionId, state);
} }
async endSession(streamSessionId: string): Promise<void> { async endSession(streamSessionId: string): Promise<void> {
const existing = this.sessions.get(streamSessionId); this.registry.updateState(streamSessionId, 'ending');
if (!existing) return; this.registry.updateState(streamSessionId, 'ended');
this.sessions.set(streamSessionId, { ...existing, state: 'ended' });
} }
async getSession(streamSessionId: string): Promise<SfuSessionDescriptor | null> { async getSession(streamSessionId: string): Promise<SfuSessionDescriptor | null> {
return this.sessions.get(streamSessionId) ?? null; return this.registry.get(streamSessionId);
}
async listSessions(): Promise<SfuSessionDescriptor[]> {
return this.registry.list();
} }
async createPublishTransport(_input: SfuPublishTransportRequest): Promise<SfuPublishTransportResult> { async createPublishTransport(_input: SfuPublishTransportRequest): Promise<SfuPublishTransportResult> {
@@ -68,4 +75,3 @@ export class NoopSfuService implements SfuService {
}; };
} }
} }

View File

@@ -0,0 +1,41 @@
import type { SfuSessionDescriptor, SfuSessionState } from './types';
type StoredSfuSession = SfuSessionDescriptor & {
updatedAt: string;
};
export class SfuSessionRegistry {
private readonly sessions = new Map<string, StoredSfuSession>();
get(streamSessionId: string): SfuSessionDescriptor | null {
const found = this.sessions.get(streamSessionId);
if (!found) return null;
const { updatedAt: _updatedAt, ...descriptor } = found;
return descriptor;
}
set(session: SfuSessionDescriptor): SfuSessionDescriptor {
const now = new Date().toISOString();
this.sessions.set(session.streamSessionId, { ...session, updatedAt: now });
return session;
}
updateState(streamSessionId: string, state: SfuSessionState): SfuSessionDescriptor | null {
const existing = this.sessions.get(streamSessionId);
if (!existing) return null;
const next: StoredSfuSession = {
...existing,
state,
updatedAt: new Date().toISOString(),
};
this.sessions.set(streamSessionId, next);
const { updatedAt: _updatedAt, ...descriptor } = next;
return descriptor;
}
list(): SfuSessionDescriptor[] {
return Array.from(this.sessions.values()).map(({ updatedAt: _updatedAt, ...descriptor }) => descriptor);
}
}

View File

@@ -39,9 +39,10 @@ export type SfuSubscribeTransportResult = {
export interface SfuService { export interface SfuService {
mode: 'single_server_sfu'; mode: 'single_server_sfu';
startSession(input: SfuSessionStartInput): Promise<SfuSessionDescriptor>; startSession(input: SfuSessionStartInput): Promise<SfuSessionDescriptor>;
setSessionState(streamSessionId: string, state: SfuSessionState): Promise<void>;
endSession(streamSessionId: string): Promise<void>; endSession(streamSessionId: string): Promise<void>;
getSession(streamSessionId: string): Promise<SfuSessionDescriptor | null>; getSession(streamSessionId: string): Promise<SfuSessionDescriptor | null>;
listSessions(): Promise<SfuSessionDescriptor[]>;
createPublishTransport(input: SfuPublishTransportRequest): Promise<SfuPublishTransportResult>; createPublishTransport(input: SfuPublishTransportRequest): Promise<SfuPublishTransportResult>;
createSubscribeTransport(input: SfuSubscribeTransportRequest): Promise<SfuSubscribeTransportResult>; createSubscribeTransport(input: SfuSubscribeTransportRequest): Promise<SfuSubscribeTransportResult>;
} }

View File

@@ -17,6 +17,7 @@ router.get('/ready', async (_req, res) => {
try { try {
await db.execute('select 1'); await db.execute('select 1');
await minioClient.bucketExists(minioBucket); await minioClient.bucketExists(minioBucket);
const sfuSessions = sfuService ? await sfuService.listSessions() : [];
res.json({ res.json({
status: 'ready', status: 'ready',
@@ -26,6 +27,7 @@ router.get('/ready', async (_req, res) => {
mediaMode: mediaConfig.mode, mediaMode: mediaConfig.mode,
mediaProvider: mediaProvider.name, mediaProvider: mediaProvider.name,
sfuService: sfuService ? sfuService.mode : 'disabled', sfuService: sfuService ? sfuService.mode : 'disabled',
sfuActiveSessions: sfuSessions.filter((session) => session.state !== 'ended').length,
}, },
timestamp: new Date().toISOString(), timestamp: new Date().toISOString(),
}); });

View File

@@ -5,8 +5,10 @@ import { Router } from 'express';
import { z } from 'zod'; import { z } from 'zod';
import { db } from '../db/client'; import { db } from '../db/client';
import { mediaMode } from '../media/config';
import { deviceCommands, deviceLinks, devices, streamSessions } from '../db/schema'; import { deviceCommands, deviceLinks, devices, streamSessions } from '../db/schema';
import { mediaProvider } from '../media/service'; import { mediaProvider } from '../media/service';
import { sfuService } from '../media/sfu/service';
import { requireDeviceAuth } from '../middleware/device-auth'; import { requireDeviceAuth } from '../middleware/device-auth';
import { dispatchCommandById, sendRealtimeToDevice } from '../realtime/gateway'; import { dispatchCommandById, sendRealtimeToDevice } from '../realtime/gateway';
import { writeAuditLog } from '../services/audit'; import { writeAuditLog } from '../services/audit';
@@ -34,6 +36,10 @@ const streamParamSchema = z.object({
streamSessionId: z.string().uuid(), streamSessionId: z.string().uuid(),
}); });
const sfuTransportRequestSchema = z.object({
role: z.enum(['camera', 'viewer']).optional(),
});
const listSchema = z.object({ const listSchema = z.object({
status: z.string().optional(), status: z.string().optional(),
limit: z.coerce.number().int().min(1).max(100).default(25), limit: z.coerce.number().int().min(1).max(100).default(25),
@@ -68,6 +74,20 @@ router.get('/me/list', requireDeviceAuth, async (req, res) => {
res.json({ count: filtered.length, streamSessions: filtered }); res.json({ count: filtered.length, streamSessions: filtered });
}); });
const ensureDeviceAuth = (req: Parameters<typeof requireDeviceAuth>[0], res: Parameters<typeof requireDeviceAuth>[1]) => {
const deviceAuth = req.deviceAuth;
if (!deviceAuth) {
res.status(401).json({ message: 'Unauthorized' });
return null;
}
return deviceAuth;
};
const getOwnedStreamSession = async (streamSessionId: string, ownerUserId: string) =>
await db.query.streamSessions.findFirst({
where: and(eq(streamSessions.id, streamSessionId), eq(streamSessions.ownerUserId, ownerUserId)),
});
router.post('/request', requireDeviceAuth, async (req, res) => { router.post('/request', requireDeviceAuth, async (req, res) => {
const parsed = requestStreamSchema.safeParse(req.body ?? {}); const parsed = requestStreamSchema.safeParse(req.body ?? {});
@@ -76,12 +96,8 @@ router.post('/request', requireDeviceAuth, async (req, res) => {
return; return;
} }
const deviceAuth = req.deviceAuth; const deviceAuth = ensureDeviceAuth(req, res);
if (!deviceAuth) return;
if (!deviceAuth) {
res.status(401).json({ message: 'Unauthorized' });
return;
}
const [sourceDevice, cameraDevice] = await Promise.all([ const [sourceDevice, cameraDevice] = await Promise.all([
db.query.devices.findFirst({ db.query.devices.findFirst({
@@ -220,12 +236,8 @@ router.post('/:streamSessionId/accept', requireDeviceAuth, async (req, res) => {
return; return;
} }
const deviceAuth = req.deviceAuth; const deviceAuth = ensureDeviceAuth(req, res);
if (!deviceAuth) return;
if (!deviceAuth) {
res.status(401).json({ message: 'Unauthorized' });
return;
}
const session = await db.query.streamSessions.findFirst({ const session = await db.query.streamSessions.findFirst({
where: and( where: and(
@@ -275,6 +287,22 @@ router.post('/:streamSessionId/accept', requireDeviceAuth, async (req, res) => {
return; return;
} }
if (sfuService) {
try {
await sfuService.startSession({
streamSessionId: updated.id,
ownerUserId: updated.ownerUserId,
cameraDeviceId: updated.cameraDeviceId,
requesterDeviceId: updated.requesterDeviceId,
});
await sfuService.setSessionState(updated.id, 'live');
} catch (error) {
console.error('Failed starting SFU session', error);
res.status(500).json({ message: 'Failed to initialize SFU session' });
return;
}
}
const deliveredToRequester = sendRealtimeToDevice(session.requesterDeviceId, 'stream:started', { const deliveredToRequester = sendRealtimeToDevice(session.requesterDeviceId, 'stream:started', {
streamSessionId: updated.id, streamSessionId: updated.id,
cameraDeviceId: updated.cameraDeviceId, cameraDeviceId: updated.cameraDeviceId,
@@ -318,16 +346,10 @@ router.get('/:streamSessionId/publish-credentials', requireDeviceAuth, async (re
return; return;
} }
const deviceAuth = req.deviceAuth; const deviceAuth = ensureDeviceAuth(req, res);
if (!deviceAuth) return;
if (!deviceAuth) { const session = await getOwnedStreamSession(parsedParams.data.streamSessionId, deviceAuth.userId);
res.status(401).json({ message: 'Unauthorized' });
return;
}
const session = await db.query.streamSessions.findFirst({
where: and(eq(streamSessions.id, parsedParams.data.streamSessionId), eq(streamSessions.ownerUserId, deviceAuth.userId)),
});
if (!session) { if (!session) {
res.status(404).json({ message: 'Stream session not found' }); res.status(404).json({ message: 'Stream session not found' });
@@ -371,16 +393,10 @@ router.get('/:streamSessionId/subscribe-credentials', requireDeviceAuth, async (
return; return;
} }
const deviceAuth = req.deviceAuth; const deviceAuth = ensureDeviceAuth(req, res);
if (!deviceAuth) return;
if (!deviceAuth) { const session = await getOwnedStreamSession(parsedParams.data.streamSessionId, deviceAuth.userId);
res.status(401).json({ message: 'Unauthorized' });
return;
}
const session = await db.query.streamSessions.findFirst({
where: and(eq(streamSessions.id, parsedParams.data.streamSessionId), eq(streamSessions.ownerUserId, deviceAuth.userId)),
});
if (!session) { if (!session) {
res.status(404).json({ message: 'Stream session not found' }); res.status(404).json({ message: 'Stream session not found' });
@@ -419,6 +435,143 @@ router.get('/:streamSessionId/subscribe-credentials', requireDeviceAuth, async (
}); });
}); });
router.get('/:streamSessionId/sfu/session', requireDeviceAuth, async (req, res) => {
const parsedParams = streamParamSchema.safeParse(req.params);
if (!parsedParams.success) {
res.status(400).json({ message: 'Invalid streamSessionId', errors: parsedParams.error.flatten() });
return;
}
const deviceAuth = ensureDeviceAuth(req, res);
if (!deviceAuth) return;
if (!sfuService) {
res.status(409).json({ message: `SFU service disabled (MEDIA_MODE=${mediaMode})` });
return;
}
const session = await getOwnedStreamSession(parsedParams.data.streamSessionId, deviceAuth.userId);
if (!session) {
res.status(404).json({ message: 'Stream session not found' });
return;
}
const isParticipant = session.requesterDeviceId === deviceAuth.deviceId || session.cameraDeviceId === deviceAuth.deviceId;
if (!isParticipant) {
res.status(403).json({ message: 'Device cannot access SFU session details for this stream' });
return;
}
const sfuSession = await sfuService.getSession(session.id);
res.json({
streamSessionId: session.id,
mediaMode,
sfuSession,
});
});
router.post('/:streamSessionId/sfu/publish-transport', requireDeviceAuth, async (req, res) => {
const parsedParams = streamParamSchema.safeParse(req.params);
if (!parsedParams.success) {
res.status(400).json({ message: 'Invalid streamSessionId', errors: parsedParams.error.flatten() });
return;
}
const parsedBody = sfuTransportRequestSchema.safeParse(req.body ?? {});
if (!parsedBody.success) {
res.status(400).json({ message: 'Invalid request body', errors: parsedBody.error.flatten() });
return;
}
const deviceAuth = ensureDeviceAuth(req, res);
if (!deviceAuth) return;
if (!sfuService) {
res.status(409).json({ message: `SFU service disabled (MEDIA_MODE=${mediaMode})` });
return;
}
const session = await getOwnedStreamSession(parsedParams.data.streamSessionId, deviceAuth.userId);
if (!session) {
res.status(404).json({ message: 'Stream session not found' });
return;
}
if (session.status !== 'streaming') {
res.status(409).json({ message: 'Stream must be active before creating publish transport' });
return;
}
if (session.cameraDeviceId !== deviceAuth.deviceId) {
res.status(403).json({ message: 'Only camera device can create publish transport' });
return;
}
const transport = await sfuService.createPublishTransport({
streamSessionId: session.id,
cameraDeviceId: deviceAuth.deviceId,
});
await sfuService.setSessionState(session.id, 'live');
res.json({
streamSessionId: session.id,
mediaMode,
transport,
});
});
router.post('/:streamSessionId/sfu/subscribe-transport', requireDeviceAuth, async (req, res) => {
const parsedParams = streamParamSchema.safeParse(req.params);
if (!parsedParams.success) {
res.status(400).json({ message: 'Invalid streamSessionId', errors: parsedParams.error.flatten() });
return;
}
const parsedBody = sfuTransportRequestSchema.safeParse(req.body ?? {});
if (!parsedBody.success) {
res.status(400).json({ message: 'Invalid request body', errors: parsedBody.error.flatten() });
return;
}
const deviceAuth = ensureDeviceAuth(req, res);
if (!deviceAuth) return;
if (!sfuService) {
res.status(409).json({ message: `SFU service disabled (MEDIA_MODE=${mediaMode})` });
return;
}
const session = await getOwnedStreamSession(parsedParams.data.streamSessionId, deviceAuth.userId);
if (!session) {
res.status(404).json({ message: 'Stream session not found' });
return;
}
if (session.status !== 'streaming') {
res.status(409).json({ message: 'Stream must be active before creating subscribe transport' });
return;
}
const isParticipant = session.requesterDeviceId === deviceAuth.deviceId || session.cameraDeviceId === deviceAuth.deviceId;
if (!isParticipant) {
res.status(403).json({ message: 'Device cannot create subscribe transport for this stream' });
return;
}
const transport = await sfuService.createSubscribeTransport({
streamSessionId: session.id,
viewerDeviceId: deviceAuth.deviceId,
});
await sfuService.setSessionState(session.id, 'live');
res.json({
streamSessionId: session.id,
mediaMode,
transport,
});
});
router.post('/:streamSessionId/end', requireDeviceAuth, async (req, res) => { router.post('/:streamSessionId/end', requireDeviceAuth, async (req, res) => {
const parsedParams = streamParamSchema.safeParse(req.params); const parsedParams = streamParamSchema.safeParse(req.params);
@@ -434,16 +587,10 @@ router.post('/:streamSessionId/end', requireDeviceAuth, async (req, res) => {
return; return;
} }
const deviceAuth = req.deviceAuth; const deviceAuth = ensureDeviceAuth(req, res);
if (!deviceAuth) return;
if (!deviceAuth) { const session = await getOwnedStreamSession(parsedParams.data.streamSessionId, deviceAuth.userId);
res.status(401).json({ message: 'Unauthorized' });
return;
}
const session = await db.query.streamSessions.findFirst({
where: and(eq(streamSessions.id, parsedParams.data.streamSessionId), eq(streamSessions.ownerUserId, deviceAuth.userId)),
});
if (!session) { if (!session) {
res.status(404).json({ message: 'Stream session not found' }); res.status(404).json({ message: 'Stream session not found' });
@@ -469,6 +616,14 @@ router.post('/:streamSessionId/end', requireDeviceAuth, async (req, res) => {
.where(eq(streamSessions.id, session.id)) .where(eq(streamSessions.id, session.id))
.returning(); .returning();
if (sfuService) {
try {
await sfuService.endSession(session.id);
} catch (error) {
console.error('Failed ending SFU session', error);
}
}
await createRecordingForStream(session.id); await createRecordingForStream(session.id);
const deliveredToRequester = sendRealtimeToDevice(session.requesterDeviceId, 'stream:ended', { const deliveredToRequester = sendRealtimeToDevice(session.requesterDeviceId, 'stream:ended', {
@@ -518,16 +673,10 @@ router.get('/:streamSessionId/playback-token', requireDeviceAuth, async (req, re
return; return;
} }
const deviceAuth = req.deviceAuth; const deviceAuth = ensureDeviceAuth(req, res);
if (!deviceAuth) return;
if (!deviceAuth) { const session = await getOwnedStreamSession(parsedParams.data.streamSessionId, deviceAuth.userId);
res.status(401).json({ message: 'Unauthorized' });
return;
}
const session = await db.query.streamSessions.findFirst({
where: and(eq(streamSessions.id, parsedParams.data.streamSessionId), eq(streamSessions.ownerUserId, deviceAuth.userId)),
});
if (!session) { if (!session) {
res.status(404).json({ message: 'Stream session not found' }); res.status(404).json({ message: 'Stream session not found' });