feat(backend): add SIMPLE_STREAMING WebRTC control-path streaming

This commit is contained in:
2026-03-05 13:30:00 +00:00
parent c458857f0a
commit 19baf76169
14 changed files with 448 additions and 189 deletions

View File

@@ -130,21 +130,23 @@ Motion realtime events:
- Linked clients receive `motion:detected` as soon as camera starts event.
- Linked clients receive `motion:ended` when camera ends event.
### On-Demand Streams + Media Credentials (Phase 4 + 5)
### On-Demand Streams + WebRTC Control Plane
| Endpoint | Purpose |
| --- | --- |
| `POST /streams/request` | Client device requests a linked camera to start a live stream |
| `POST /streams/:streamSessionId/accept` | Camera device accepts and transitions stream session to `streaming` |
| `GET /streams/:streamSessionId/publish-credentials` | Camera fetches media ingest credentials for the active stream session |
| `GET /streams/:streamSessionId/subscribe-credentials` | Viewer fetches media subscribe credentials for the active stream session |
| `POST /streams/:streamSessionId/end` | Requester/camera ends an existing stream session |
| `GET /streams/:streamSessionId/playback-token` | Obtain short-lived playback token for active stream |
| `GET /streams/me/list` | List stream sessions for the current device |
Stream realtime events:
- Client receives `stream:requested` after request creation.
- Camera receives `stream:requested` when `SIMPLE_STREAMING=true`.
- Client receives `stream:started` when camera accepts.
- Both devices receive `stream:ended` when session is closed.
- Both participants exchange `webrtc:signal` payloads through Socket.IO for offer/answer/candidate/hangup relay.
Legacy compatibility when `SIMPLE_STREAMING=false`:
- `start_stream` device commands remain active for camera wake-up.
- Media-provider credential endpoints (`publish-credentials`, `subscribe-credentials`, `playback-token`) remain available for older simulator/mobile flows.
Experimental SFU scaffolding endpoints (`MEDIA_MODE=single_server_sfu`):
- `GET /streams/:streamSessionId/sfu/session` fetch in-memory SFU session state for participant devices
@@ -153,8 +155,9 @@ Experimental SFU scaffolding endpoints (`MEDIA_MODE=single_server_sfu`):
#### Streaming Scale Tradeoffs (Current Prototype)
- The current implementation is **not production-grade at scale**.
- Video quality and reliability currently depend on direct browser-to-browser WebRTC success, with a low-fps frame relay fallback in the simulator.
- This backend currently acts as a control plane (commands, session state, credentials, events), not a full media plane/SFU.
- The preferred path is direct browser-to-browser WebRTC, with the backend acting as auth/session/signaling control plane.
- Native mobile is not yet on the WebRTC path; `SIMPLE_STREAMING` defaults to `false` until a supported RN WebRTC stack is added.
- This backend currently acts as a control plane (commands, session state, signaling, events), not a full media plane/SFU.
- Running live transport + fan-out + recording on the same web server is possible for small loads but introduces significant CPU, RAM, and network egress pressure under concurrency.
- For larger deployments, use a dedicated media plane (managed or self-hosted SFU + recorder) and keep this service focused on auth/session/control APIs.
- For a pragmatic prototype path that keeps media on the current server, see `docs/streaming-on-web-server-plan.md`.
@@ -185,8 +188,8 @@ Architecture reference page:
All simulator pages support the same flow:
- Register as `camera` or `client`
- Connect Socket.IO with bearer device token
- Camera: process incoming `start_stream` commands, fetch publish credentials, start/end motion events
- Client: create links, request streams, fetch subscribe credentials, and fetch playback tokens
- Camera: process incoming stream requests, negotiate WebRTC, start/end motion events
- Client: create links, request streams, and negotiate WebRTC viewing
### Admin Dashboard
Access `/admin` with Basic auth to:

View File

@@ -65,6 +65,8 @@ export const streamSessions = pgTable('stream_sessions', {
requesterDeviceId: uuid('requester_device_id').notNull().references(() => devices.id),
status: varchar('status', { length: 32 }).default('requested').notNull(),
reason: varchar('reason', { length: 32 }).default('on_demand').notNull(),
// Legacy provider-backed fields are retained for compatibility with older sessions.
// SIMPLE_STREAMING relies on direct WebRTC signaling and does not populate them.
mediaProvider: varchar('media_provider', { length: 32 }).default('mock').notNull(),
mediaSessionId: varchar('media_session_id', { length: 255 }),
publishEndpoint: text('publish_endpoint'),

View File

@@ -768,8 +768,12 @@ registry.registerPath({
requesterDeviceId: z.string().uuid(),
status: z.string(),
reason: z.string(),
metadata: z.record(z.string(), z.unknown()).nullable().optional(),
startedAt: z.string().datetime().nullable().optional(),
endedAt: z.string().datetime().nullable().optional(),
createdAt: z.string().datetime().optional(),
updatedAt: z.string().datetime().optional(),
}),
command: DeviceCommandSchema,
}),
},
},
@@ -805,13 +809,16 @@ registry.registerPath({
message: z.string(),
streamSession: z.object({
id: z.string().uuid(),
ownerUserId: z.string().uuid(),
cameraDeviceId: z.string().uuid(),
requesterDeviceId: z.string().uuid(),
status: z.string(),
streamKey: z.string().nullable(),
mediaProvider: z.string(),
mediaSessionId: z.string().nullable(),
publishEndpoint: z.string().nullable(),
subscribeEndpoint: z.string().nullable(),
reason: z.string(),
metadata: z.record(z.string(), z.unknown()).nullable().optional(),
startedAt: z.string().datetime().nullable(),
endedAt: z.string().datetime().nullable().optional(),
createdAt: z.string().datetime().optional(),
updatedAt: z.string().datetime().optional(),
}),
}),
},
@@ -857,89 +864,6 @@ registry.registerPath({
},
});
registry.registerPath({
method: 'get',
path: '/streams/{streamSessionId}/publish-credentials',
summary: 'Get publish credentials for camera ingest to media provider',
tags: ['Streams'],
security: [{ bearerDeviceToken: [] }],
request: {
params: z.object({ streamSessionId: z.string().uuid() }),
},
responses: {
200: {
description: 'Publish credentials',
content: {
'application/json': {
schema: z.object({
provider: z.string(),
mediaSessionId: z.string(),
publishToken: z.string(),
publishUrl: z.string(),
expiresInSeconds: z.number().int(),
}),
},
},
},
},
});
registry.registerPath({
method: 'get',
path: '/streams/{streamSessionId}/subscribe-credentials',
summary: 'Get subscribe credentials for viewing stream from media provider',
tags: ['Streams'],
security: [{ bearerDeviceToken: [] }],
request: {
params: z.object({ streamSessionId: z.string().uuid() }),
},
responses: {
200: {
description: 'Subscribe credentials',
content: {
'application/json': {
schema: z.object({
provider: z.string(),
mediaSessionId: z.string(),
subscribeToken: z.string(),
subscribeUrl: z.string(),
expiresInSeconds: z.number().int(),
}),
},
},
},
},
});
registry.registerPath({
method: 'get',
path: '/streams/{streamSessionId}/playback-token',
summary: 'Get short-lived playback token for active stream session',
tags: ['Streams'],
security: [{ bearerDeviceToken: [] }],
request: {
params: z.object({ streamSessionId: z.string().uuid() }),
},
responses: {
200: {
description: 'Playback token response',
content: {
'application/json': {
schema: z.object({
streamSessionId: z.string().uuid(),
streamKey: z.string(),
status: z.string(),
playbackToken: z.string(),
subscribeUrl: z.string(),
mediaProvider: z.string(),
expiresInSeconds: z.number().int(),
}),
},
},
},
},
});
registry.registerPath({
method: 'get',
path: '/streams/me/list',

View File

@@ -25,10 +25,29 @@ const parsePositiveNumber = (value: string | undefined): number | null => {
return parsed;
};
export const parseFeatureFlag = (value: string | undefined, defaultValue: boolean): boolean => {
if (value === undefined) {
return defaultValue;
}
const normalized = value.trim().toLowerCase();
if (['1', 'true', 'yes', 'on'].includes(normalized)) {
return true;
}
if (['0', 'false', 'no', 'off'].includes(normalized)) {
return false;
}
return defaultValue;
};
export const mediaMode: MediaMode = parseMediaMode(process.env.MEDIA_MODE);
export const simpleStreamingEnabled = parseFeatureFlag(process.env.SIMPLE_STREAMING, false);
export const streamRecordingEnabled = parseFeatureFlag(process.env.STREAM_RECORDINGS_ENABLED, false);
export const mediaConfig = {
mode: mediaMode,
simpleStreamingEnabled,
streamRecordingEnabled,
turn: {
urls: parseCsv(process.env.TURN_URLS),
username: process.env.TURN_USERNAME ?? '',
@@ -40,4 +59,3 @@ export const mediaConfig = {
maxSubscribersPerRoom: parsePositiveNumber(process.env.MEDIA_MAX_SUBSCRIBERS_PER_ROOM),
},
};

View File

@@ -37,6 +37,8 @@ export class MockMediaProvider implements MediaProvider {
name = 'mock';
async createSession(input: MediaSessionCreateInput): Promise<MediaSessionCreateResult> {
// SIMPLE_STREAMING bypasses provider-backed transport at runtime. This metadata
// path is kept only for legacy endpoints and backwards compatibility.
const mediaSessionId = `mock_${input.streamSessionId}`;
const baseUrl = getBaseUrl();

View File

@@ -1,5 +1,6 @@
import { simpleStreamingEnabled } from './config';
import { MockMediaProvider } from './providers/mock';
import type { MediaProvider } from './types';
import type { MediaProvider, MediaSessionCreateInput, MediaSessionCreateResult } from './types';
const providerName = (process.env.MEDIA_PROVIDER ?? 'mock').toLowerCase();
@@ -13,3 +14,14 @@ const createProvider = (): MediaProvider => {
};
export const mediaProvider = createProvider();
export const mediaProviderRuntimeEnabled = !simpleStreamingEnabled;
export const createLiveMediaSession = async (
input: MediaSessionCreateInput,
): Promise<MediaSessionCreateResult | null> => {
if (!mediaProviderRuntimeEnabled) {
return null;
}
return mediaProvider.createSession(input);
};

View File

@@ -1,8 +1,12 @@
import { mediaMode } from '../config';
import { mediaMode, simpleStreamingEnabled } from '../config';
import { NoopSfuService } from './noop';
import type { SfuService } from './types';
const createSfuService = (): SfuService | null => {
if (simpleStreamingEnabled) {
return null;
}
if (mediaMode !== 'single_server_sfu') {
return null;
}
@@ -11,4 +15,3 @@ const createSfuService = (): SfuService | null => {
};
export const sfuService = createSfuService();

View File

@@ -495,9 +495,9 @@
<ul>
<li><span class="mono">command:received</span> delivery to target room.</li>
<li><span class="mono">command:ack</span> validation + DB update + source notification.</li>
<li><span class="mono">webrtc:signal</span> relay with same-owner target validation.</li>
<li><span class="mono">stream:frame</span> relay fallback (base64 image snapshots).</li>
<li>Retry worker for stale sent commands every 5s, max 3 retries.</li>
<li><span class="mono">webrtc:signal</span> relay with stream-participant validation.</li>
<li><span class="mono">stream:requested</span>, <span class="mono">stream:started</span>, and <span class="mono">stream:ended</span> lifecycle fan-out.</li>
<li>Legacy command retries remain only for non-stream commands while <span class="mono">SIMPLE_STREAMING</span> is enabled.</li>
</ul>
</div>
</div>

View File

@@ -4,7 +4,9 @@ import { Server as SocketIOServer } from 'socket.io';
import { z } from 'zod';
import { db } from '../db/client';
import { deviceCommands, devices } from '../db/schema';
import { simpleStreamingEnabled } from '../media/config';
import { deviceCommands, devices, streamSessions } from '../db/schema';
import { canRelayWebrtcSignal } from '../streaming/simple';
import { hasRequiredTables } from '../utils/db-schema';
import { verifyDeviceToken } from '../utils/device-token';
@@ -26,13 +28,6 @@ const webrtcSignalSchema = z.object({
data: z.record(z.string(), z.unknown()).nullable().optional(),
});
const streamFrameSchema = z.object({
toDeviceId: z.string().uuid(),
streamSessionId: z.string().uuid(),
frame: z.string().min(32),
capturedAt: z.string().optional(),
});
const roomForDevice = (deviceId: string): string => `device:${deviceId}`;
let io: SocketIOServer | null = null;
@@ -112,6 +107,18 @@ export const dispatchCommandById = async (commandId: string): Promise<void> => {
const now = new Date();
if (simpleStreamingEnabled && command.commandType === 'start_stream') {
await db
.update(deviceCommands)
.set({
status: 'failed',
updatedAt: now,
error: 'start_stream command delivery disabled by SIMPLE_STREAMING',
})
.where(eq(deviceCommands.id, command.id));
return;
}
const delivered = emitCommand({
id: command.id,
sourceDeviceId: command.sourceDeviceId,
@@ -144,6 +151,19 @@ const retryPendingCommands = async () => {
for (const command of pending) {
const now = new Date();
if (simpleStreamingEnabled && command.commandType === 'start_stream') {
await db
.update(deviceCommands)
.set({
status: 'failed',
updatedAt: now,
error: 'start_stream retries disabled by SIMPLE_STREAMING',
})
.where(eq(deviceCommands.id, command.id));
continue;
}
const nextRetryCount = command.retryCount + 1;
if (nextRetryCount > MAX_RETRIES) {
@@ -240,7 +260,6 @@ export const setupRealtimeGateway = (server: HttpServer): SocketIOServer => {
io.on('connection', async (socket) => {
const auth = socket.data.deviceAuth as { userId: string; deviceId: string; role: 'camera' | 'client' };
const deviceRoom = roomForDevice(auth.deviceId);
const verifiedRelayTargets = new Set<string>();
socket.join(deviceRoom);
await markDevicePresence(auth.deviceId, 'online');
@@ -312,15 +331,27 @@ export const setupRealtimeGateway = (server: HttpServer): SocketIOServer => {
return;
}
const targetDevice = await db.query.devices.findFirst({
where: and(eq(devices.id, parsed.data.toDeviceId), eq(devices.userId, auth.userId)),
const session = await db.query.streamSessions.findFirst({
where: and(eq(streamSessions.id, parsed.data.streamSessionId), eq(streamSessions.ownerUserId, auth.userId)),
});
if (!targetDevice) {
socket.emit('error:webrtc_signal', { message: 'Target device not found for this account' });
if (!session) {
socket.emit('error:webrtc_signal', { message: 'Stream session not found for this account' });
return;
}
if (!canRelayWebrtcSignal(session, auth.deviceId, parsed.data.toDeviceId)) {
socket.emit('error:webrtc_signal', { message: 'Signal target is not a participant in this stream session' });
return;
}
console.info('[stream.signal]', {
streamSessionId: parsed.data.streamSessionId,
fromDeviceId: auth.deviceId,
toDeviceId: parsed.data.toDeviceId,
signalType: parsed.data.signalType,
});
io?.to(roomForDevice(parsed.data.toDeviceId)).emit('webrtc:signal', {
fromDeviceId: auth.deviceId,
streamSessionId: parsed.data.streamSessionId,
@@ -329,38 +360,6 @@ export const setupRealtimeGateway = (server: HttpServer): SocketIOServer => {
});
});
socket.on('stream:frame', async (input) => {
const parsed = streamFrameSchema.safeParse(input);
if (!parsed.success) {
socket.emit('error:stream_frame', {
message: 'Invalid stream frame payload',
errors: parsed.error.flatten(),
});
return;
}
if (!verifiedRelayTargets.has(parsed.data.toDeviceId)) {
const targetDevice = await db.query.devices.findFirst({
where: and(eq(devices.id, parsed.data.toDeviceId), eq(devices.userId, auth.userId)),
});
if (!targetDevice) {
socket.emit('error:stream_frame', { message: 'Target device not found for this account' });
return;
}
verifiedRelayTargets.add(parsed.data.toDeviceId);
}
io?.to(roomForDevice(parsed.data.toDeviceId)).emit('stream:frame', {
fromDeviceId: auth.deviceId,
streamSessionId: parsed.data.streamSessionId,
frame: parsed.data.frame,
capturedAt: parsed.data.capturedAt ?? new Date().toISOString(),
});
});
socket.on('disconnect', async () => {
// Small delay allows fast reconnects to reuse presence without flapping.
setTimeout(async () => {

View File

@@ -4,6 +4,7 @@ import { z } from 'zod';
import { db } from '../db/client';
import { deviceCommands, deviceLinks, devices } from '../db/schema';
import { simpleStreamingEnabled } from '../media/config';
import { requireAuth } from '../middleware/auth';
import { requireDeviceAuth } from '../middleware/device-auth';
import { dispatchCommandById } from '../realtime/gateway';
@@ -47,6 +48,13 @@ router.post('/', requireAuth, async (req, res) => {
return;
}
if (simpleStreamingEnabled && parsed.data.commandType === 'start_stream') {
res.status(409).json({
message: 'start_stream commands are disabled while SIMPLE_STREAMING is enabled; use /streams/request instead',
});
return;
}
if (parsed.data.sourceDeviceId === parsed.data.targetDeviceId) {
res.status(400).json({ message: 'sourceDeviceId and targetDeviceId must differ' });
return;

View File

@@ -5,14 +5,20 @@ import { Router } from 'express';
import { z } from 'zod';
import { db } from '../db/client';
import { mediaMode } from '../media/config';
import { mediaMode, simpleStreamingEnabled, streamRecordingEnabled } from '../media/config';
import { deviceCommands, deviceLinks, devices, streamSessions } from '../db/schema';
import { mediaProvider } from '../media/service';
import { createLiveMediaSession, mediaProvider } from '../media/service';
import { sfuService } from '../media/sfu/service';
import { requireDeviceAuth } from '../middleware/device-auth';
import { dispatchCommandById, sendRealtimeToDevice } from '../realtime/gateway';
import { writeAuditLog } from '../services/audit';
import { enqueuePushNotification } from '../services/push';
import {
createStreamEndedPayload,
createStreamRequestedPayload,
createStreamStartedPayload,
toSimpleStreamSessionResponse,
} from '../streaming/simple';
import { createRecordingForStream } from './recordings';
const router = Router();
@@ -158,6 +164,45 @@ router.post('/request', requireDeviceAuth, async (req, res) => {
return;
}
if (simpleStreamingEnabled) {
const requestPayload = createStreamRequestedPayload(session);
const deliveredToCamera = sendRealtimeToDevice(cameraDevice.id, 'stream:requested', requestPayload);
console.info('[stream.request]', {
streamSessionId: session.id,
requesterDeviceId: sourceDevice.id,
cameraDeviceId: cameraDevice.id,
mode: 'simple',
});
sendRealtimeToDevice(sourceDevice.id, 'stream:requested', requestPayload);
if (!deliveredToCamera) {
await enqueuePushNotification({
ownerUserId: cameraDevice.userId,
recipientDeviceId: cameraDevice.id,
type: 'stream_requested',
payload: requestPayload,
});
}
res.status(201).json({
message: 'Stream request sent',
streamSession: toSimpleStreamSessionResponse(session),
});
await writeAuditLog({
ownerUserId: sourceDevice.userId,
actorDeviceId: sourceDevice.id,
action: 'stream.requested',
targetType: 'stream_session',
targetId: session.id,
metadata: { cameraDeviceId: cameraDevice.id, reason: session.reason, transport: 'webrtc' },
ipAddress: req.ip,
});
return;
}
const [command] = await db
.insert(deviceCommands)
.values({
@@ -182,6 +227,13 @@ router.post('/request', requireDeviceAuth, async (req, res) => {
}
await dispatchCommandById(command.id);
console.info('[stream.request]', {
streamSessionId: session.id,
requesterDeviceId: sourceDevice.id,
cameraDeviceId: cameraDevice.id,
mode: 'legacy',
commandId: command.id,
});
const refreshedCommand = await db.query.deviceCommands.findFirst({ where: eq(deviceCommands.id, command.id) });
@@ -259,7 +311,7 @@ router.post('/:streamSessionId/accept', requireDeviceAuth, async (req, res) => {
const now = new Date();
const streamKey = parsed.data.streamKey ?? `stream_${session.id}_${randomUUID()}`;
const mediaSession = await mediaProvider.createSession({
const mediaSession = await createLiveMediaSession({
streamSessionId: session.id,
ownerUserId: session.ownerUserId,
cameraDeviceId: session.cameraDeviceId,
@@ -270,11 +322,11 @@ router.post('/:streamSessionId/accept', requireDeviceAuth, async (req, res) => {
.update(streamSessions)
.set({
status: 'streaming',
streamKey,
mediaProvider: mediaSession.provider,
mediaSessionId: mediaSession.mediaSessionId,
publishEndpoint: mediaSession.publishUrl,
subscribeEndpoint: mediaSession.subscribeUrl,
streamKey: mediaSession ? streamKey : null,
mediaProvider: mediaSession?.provider ?? 'simple',
mediaSessionId: mediaSession?.mediaSessionId ?? null,
publishEndpoint: mediaSession?.publishUrl ?? null,
subscribeEndpoint: mediaSession?.subscribeUrl ?? null,
metadata: parsed.data.metadata ?? session.metadata,
startedAt: now,
updatedAt: now,
@@ -303,29 +355,25 @@ router.post('/:streamSessionId/accept', requireDeviceAuth, async (req, res) => {
}
}
const deliveredToRequester = sendRealtimeToDevice(session.requesterDeviceId, 'stream:started', {
const startedPayload = createStreamStartedPayload(updated);
console.info('[stream.accept]', {
streamSessionId: updated.id,
requesterDeviceId: updated.requesterDeviceId,
cameraDeviceId: updated.cameraDeviceId,
status: updated.status,
startedAt: updated.startedAt,
mediaProvider: updated.mediaProvider,
mediaSessionId: updated.mediaSessionId,
subscribeEndpoint: updated.subscribeEndpoint,
mode: mediaSession ? 'legacy' : 'simple',
});
const deliveredToRequester = sendRealtimeToDevice(session.requesterDeviceId, 'stream:started', startedPayload);
if (!deliveredToRequester) {
await enqueuePushNotification({
ownerUserId: session.ownerUserId,
recipientDeviceId: session.requesterDeviceId,
type: 'stream_started',
payload: {
streamSessionId: updated.id,
cameraDeviceId: updated.cameraDeviceId,
},
payload: startedPayload,
});
}
res.json({ message: 'Stream accepted', streamSession: updated });
res.json({ message: 'Stream accepted', streamSession: toSimpleStreamSessionResponse(updated) });
await writeAuditLog({
ownerUserId: session.ownerUserId,
@@ -333,7 +381,9 @@ router.post('/:streamSessionId/accept', requireDeviceAuth, async (req, res) => {
action: 'stream.accepted',
targetType: 'stream_session',
targetId: session.id,
metadata: { mediaSessionId: updated.mediaSessionId, mediaProvider: updated.mediaProvider },
metadata: mediaSession
? { mediaSessionId: updated.mediaSessionId, mediaProvider: updated.mediaProvider }
: { transport: 'webrtc' },
ipAddress: req.ip,
});
});
@@ -349,6 +399,11 @@ router.get('/:streamSessionId/publish-credentials', requireDeviceAuth, async (re
const deviceAuth = ensureDeviceAuth(req, res);
if (!deviceAuth) return;
if (simpleStreamingEnabled) {
res.status(409).json({ message: 'SIMPLE_STREAMING does not use publish credentials' });
return;
}
const session = await getOwnedStreamSession(parsedParams.data.streamSessionId, deviceAuth.userId);
if (!session) {
@@ -396,6 +451,11 @@ router.get('/:streamSessionId/subscribe-credentials', requireDeviceAuth, async (
const deviceAuth = ensureDeviceAuth(req, res);
if (!deviceAuth) return;
if (simpleStreamingEnabled) {
res.status(409).json({ message: 'SIMPLE_STREAMING does not use subscribe credentials' });
return;
}
const session = await getOwnedStreamSession(parsedParams.data.streamSessionId, deviceAuth.userId);
if (!session) {
@@ -605,17 +665,31 @@ router.post('/:streamSessionId/end', requireDeviceAuth, async (req, res) => {
}
const now = new Date();
const nextStatus = simpleStreamingEnabled ? 'ended' : parsed.data.reason;
const nextMetadata =
simpleStreamingEnabled && parsed.data.reason !== 'completed'
? {
...(session.metadata ?? {}),
endReason: parsed.data.reason,
}
: session.metadata;
const [updated] = await db
.update(streamSessions)
.set({
status: parsed.data.reason,
status: nextStatus,
endedAt: now,
metadata: nextMetadata,
updatedAt: now,
})
.where(eq(streamSessions.id, session.id))
.returning();
if (!updated) {
res.status(500).json({ message: 'Failed to update stream session' });
return;
}
if (sfuService) {
try {
await sfuService.endSession(session.id);
@@ -624,29 +698,42 @@ router.post('/:streamSessionId/end', requireDeviceAuth, async (req, res) => {
}
}
await createRecordingForStream(session.id);
if (streamRecordingEnabled) {
await createRecordingForStream(session.id);
}
const deliveredToRequester = sendRealtimeToDevice(session.requesterDeviceId, 'stream:ended', {
const endedPayload = simpleStreamingEnabled
? createStreamEndedPayload({
streamSessionId: session.id,
cameraDeviceId: session.cameraDeviceId,
requesterDeviceId: session.requesterDeviceId,
endedAt: now,
reason: parsed.data.reason,
})
: {
streamSessionId: session.id,
status: parsed.data.reason,
endedAt: now,
};
console.info('[stream.end]', {
streamSessionId: session.id,
status: parsed.data.reason,
endedAt: now,
requesterDeviceId: session.requesterDeviceId,
cameraDeviceId: session.cameraDeviceId,
reason: parsed.data.reason,
status: simpleStreamingEnabled ? 'ended' : parsed.data.reason,
});
const deliveredToCamera = sendRealtimeToDevice(session.cameraDeviceId, 'stream:ended', {
streamSessionId: session.id,
status: parsed.data.reason,
endedAt: now,
});
const deliveredToRequester = sendRealtimeToDevice(session.requesterDeviceId, 'stream:ended', endedPayload);
const deliveredToCamera = sendRealtimeToDevice(session.cameraDeviceId, 'stream:ended', endedPayload);
if (!deliveredToRequester) {
await enqueuePushNotification({
ownerUserId: session.ownerUserId,
recipientDeviceId: session.requesterDeviceId,
type: 'stream_ended',
payload: {
streamSessionId: session.id,
status: parsed.data.reason,
},
payload: endedPayload,
});
}
@@ -655,14 +742,11 @@ router.post('/:streamSessionId/end', requireDeviceAuth, async (req, res) => {
ownerUserId: session.ownerUserId,
recipientDeviceId: session.cameraDeviceId,
type: 'stream_ended',
payload: {
streamSessionId: session.id,
status: parsed.data.reason,
},
payload: endedPayload,
});
}
res.json({ message: 'Stream ended', streamSession: updated });
res.json({ message: 'Stream ended', streamSession: toSimpleStreamSessionResponse(updated) });
});
router.get('/:streamSessionId/playback-token', requireDeviceAuth, async (req, res) => {
@@ -676,6 +760,11 @@ router.get('/:streamSessionId/playback-token', requireDeviceAuth, async (req, re
const deviceAuth = ensureDeviceAuth(req, res);
if (!deviceAuth) return;
if (simpleStreamingEnabled) {
res.status(409).json({ message: 'SIMPLE_STREAMING does not issue playback tokens' });
return;
}
const session = await getOwnedStreamSession(parsedParams.data.streamSessionId, deviceAuth.userId);
if (!session) {

View File

@@ -0,0 +1,80 @@
type StreamSessionLike = {
id: string;
ownerUserId: string;
cameraDeviceId: string;
requesterDeviceId: string;
status: string;
reason: string;
metadata: Record<string, unknown> | null;
startedAt: Date | null;
endedAt: Date | null;
createdAt: Date;
updatedAt: Date;
};
type StreamEndedPayloadInput = {
streamSessionId: string;
cameraDeviceId: string;
requesterDeviceId: string;
endedAt: Date;
reason: 'completed' | 'cancelled' | 'failed';
};
export const isStreamParticipant = (session: Pick<StreamSessionLike, 'cameraDeviceId' | 'requesterDeviceId'>, deviceId: string): boolean =>
session.cameraDeviceId === deviceId || session.requesterDeviceId === deviceId;
export const canRelayWebrtcSignal = (
session: Pick<StreamSessionLike, 'cameraDeviceId' | 'requesterDeviceId'>,
fromDeviceId: string,
toDeviceId: string,
): boolean => {
if (fromDeviceId === toDeviceId) {
return false;
}
return isStreamParticipant(session, fromDeviceId) && isStreamParticipant(session, toDeviceId);
};
export const createStreamRequestedPayload = (
session: Pick<StreamSessionLike, 'id' | 'cameraDeviceId' | 'requesterDeviceId' | 'status' | 'reason'>,
) => ({
streamSessionId: session.id,
cameraDeviceId: session.cameraDeviceId,
requesterDeviceId: session.requesterDeviceId,
status: session.status,
reason: session.reason,
});
export const createStreamStartedPayload = (
session: Pick<StreamSessionLike, 'id' | 'cameraDeviceId' | 'requesterDeviceId' | 'status' | 'startedAt'>,
) => ({
streamSessionId: session.id,
cameraDeviceId: session.cameraDeviceId,
requesterDeviceId: session.requesterDeviceId,
status: session.status,
startedAt: session.startedAt,
transport: 'webrtc',
});
export const createStreamEndedPayload = (input: StreamEndedPayloadInput) => ({
streamSessionId: input.streamSessionId,
cameraDeviceId: input.cameraDeviceId,
requesterDeviceId: input.requesterDeviceId,
status: 'ended',
endedAt: input.endedAt,
reason: input.reason,
});
export const toSimpleStreamSessionResponse = (session: StreamSessionLike) => ({
id: session.id,
ownerUserId: session.ownerUserId,
cameraDeviceId: session.cameraDeviceId,
requesterDeviceId: session.requesterDeviceId,
status: session.status,
reason: session.reason,
metadata: session.metadata,
startedAt: session.startedAt,
endedAt: session.endedAt,
createdAt: session.createdAt,
updatedAt: session.updatedAt,
});

View File

@@ -0,0 +1,22 @@
import { describe, expect, test } from 'bun:test';
import { parseFeatureFlag } from '../media/config';
describe('media config feature flags', () => {
test('parses enabled values', () => {
expect(parseFeatureFlag('true', false)).toBe(true);
expect(parseFeatureFlag('1', false)).toBe(true);
expect(parseFeatureFlag('yes', false)).toBe(true);
});
test('parses disabled values', () => {
expect(parseFeatureFlag('false', true)).toBe(false);
expect(parseFeatureFlag('0', true)).toBe(false);
expect(parseFeatureFlag('off', true)).toBe(false);
});
test('falls back to default value for unknown input', () => {
expect(parseFeatureFlag(undefined, true)).toBe(true);
expect(parseFeatureFlag('maybe', false)).toBe(false);
});
});

View File

@@ -0,0 +1,97 @@
import { describe, expect, test } from 'bun:test';
import {
canRelayWebrtcSignal,
createStreamEndedPayload,
createStreamRequestedPayload,
createStreamStartedPayload,
toSimpleStreamSessionResponse,
} from '../streaming/simple';
const buildSession = () => ({
id: 'stream-1',
ownerUserId: 'user-1',
cameraDeviceId: 'camera-1',
requesterDeviceId: 'client-1',
status: 'streaming',
reason: 'on_demand',
metadata: { quality: 'standard' },
startedAt: new Date('2026-04-06T10:00:00.000Z'),
endedAt: null,
createdAt: new Date('2026-04-06T09:59:00.000Z'),
updatedAt: new Date('2026-04-06T10:00:00.000Z'),
});
describe('simple streaming helpers', () => {
test('only relays WebRTC signals between stream participants', () => {
const session = buildSession();
expect(canRelayWebrtcSignal(session, 'camera-1', 'client-1')).toBe(true);
expect(canRelayWebrtcSignal(session, 'client-1', 'camera-1')).toBe(true);
expect(canRelayWebrtcSignal(session, 'camera-1', 'camera-1')).toBe(false);
expect(canRelayWebrtcSignal(session, 'camera-1', 'intruder-1')).toBe(false);
});
test('builds deterministic requested and started payloads', () => {
const session = buildSession();
expect(createStreamRequestedPayload(session)).toEqual({
streamSessionId: 'stream-1',
cameraDeviceId: 'camera-1',
requesterDeviceId: 'client-1',
status: 'streaming',
reason: 'on_demand',
});
expect(createStreamStartedPayload(session)).toEqual({
streamSessionId: 'stream-1',
cameraDeviceId: 'camera-1',
requesterDeviceId: 'client-1',
status: 'streaming',
startedAt: session.startedAt,
transport: 'webrtc',
});
});
test('normalizes ended payload and strips provider fields from API response', () => {
const session = {
...buildSession(),
mediaProvider: 'mock',
mediaSessionId: 'mock_stream-1',
streamKey: 'stream-key',
publishEndpoint: 'https://example.test/publish',
subscribeEndpoint: 'https://example.test/subscribe',
};
expect(
createStreamEndedPayload({
streamSessionId: session.id,
cameraDeviceId: session.cameraDeviceId,
requesterDeviceId: session.requesterDeviceId,
endedAt: new Date('2026-04-06T10:05:00.000Z'),
reason: 'completed',
}),
).toEqual({
streamSessionId: 'stream-1',
cameraDeviceId: 'camera-1',
requesterDeviceId: 'client-1',
status: 'ended',
endedAt: new Date('2026-04-06T10:05:00.000Z'),
reason: 'completed',
});
expect(toSimpleStreamSessionResponse(session)).toEqual({
id: 'stream-1',
ownerUserId: 'user-1',
cameraDeviceId: 'camera-1',
requesterDeviceId: 'client-1',
status: 'streaming',
reason: 'on_demand',
metadata: { quality: 'standard' },
startedAt: new Date('2026-04-06T10:00:00.000Z'),
endedAt: null,
createdAt: new Date('2026-04-06T09:59:00.000Z'),
updatedAt: new Date('2026-04-06T10:00:00.000Z'),
});
});
});