diff --git a/Backend/README.md b/Backend/README.md index 34a3b04..c5fc1f9 100644 --- a/Backend/README.md +++ b/Backend/README.md @@ -130,21 +130,23 @@ Motion realtime events: - Linked clients receive `motion:detected` as soon as camera starts event. - Linked clients receive `motion:ended` when camera ends event. -### On-Demand Streams + Media Credentials (Phase 4 + 5) +### On-Demand Streams + WebRTC Control Plane | Endpoint | Purpose | | --- | --- | | `POST /streams/request` | Client device requests a linked camera to start a live stream | | `POST /streams/:streamSessionId/accept` | Camera device accepts and transitions stream session to `streaming` | -| `GET /streams/:streamSessionId/publish-credentials` | Camera fetches media ingest credentials for the active stream session | -| `GET /streams/:streamSessionId/subscribe-credentials` | Viewer fetches media subscribe credentials for the active stream session | | `POST /streams/:streamSessionId/end` | Requester/camera ends an existing stream session | -| `GET /streams/:streamSessionId/playback-token` | Obtain short-lived playback token for active stream | | `GET /streams/me/list` | List stream sessions for the current device | Stream realtime events: -- Client receives `stream:requested` after request creation. +- Camera receives `stream:requested` when `SIMPLE_STREAMING=true`. - Client receives `stream:started` when camera accepts. - Both devices receive `stream:ended` when session is closed. +- Both participants exchange `webrtc:signal` payloads through Socket.IO for offer/answer/candidate/hangup relay. + +Legacy compatibility when `SIMPLE_STREAMING=false`: +- `start_stream` device commands remain active for camera wake-up. +- Media-provider credential endpoints (`publish-credentials`, `subscribe-credentials`, `playback-token`) remain available for older simulator/mobile flows. Experimental SFU scaffolding endpoints (`MEDIA_MODE=single_server_sfu`): - `GET /streams/:streamSessionId/sfu/session` – fetch in-memory SFU session state for participant devices @@ -153,8 +155,9 @@ Experimental SFU scaffolding endpoints (`MEDIA_MODE=single_server_sfu`): #### Streaming Scale Tradeoffs (Current Prototype) - The current implementation is **not production-grade at scale**. -- Video quality and reliability currently depend on direct browser-to-browser WebRTC success, with a low-fps frame relay fallback in the simulator. -- This backend currently acts as a control plane (commands, session state, credentials, events), not a full media plane/SFU. +- The preferred path is direct browser-to-browser WebRTC, with the backend acting as auth/session/signaling control plane. +- Native mobile is not yet on the WebRTC path; `SIMPLE_STREAMING` defaults to `false` until a supported RN WebRTC stack is added. +- This backend currently acts as a control plane (commands, session state, signaling, events), not a full media plane/SFU. - Running live transport + fan-out + recording on the same web server is possible for small loads but introduces significant CPU, RAM, and network egress pressure under concurrency. - For larger deployments, use a dedicated media plane (managed or self-hosted SFU + recorder) and keep this service focused on auth/session/control APIs. - For a pragmatic prototype path that keeps media on the current server, see `docs/streaming-on-web-server-plan.md`. @@ -185,8 +188,8 @@ Architecture reference page: All simulator pages support the same flow: - Register as `camera` or `client` - Connect Socket.IO with bearer device token -- Camera: process incoming `start_stream` commands, fetch publish credentials, start/end motion events -- Client: create links, request streams, fetch subscribe credentials, and fetch playback tokens +- Camera: process incoming stream requests, negotiate WebRTC, start/end motion events +- Client: create links, request streams, and negotiate WebRTC viewing ### Admin Dashboard Access `/admin` with Basic auth to: diff --git a/Backend/db/schema.ts b/Backend/db/schema.ts index 52ebd5d..6024d5a 100644 --- a/Backend/db/schema.ts +++ b/Backend/db/schema.ts @@ -65,6 +65,8 @@ export const streamSessions = pgTable('stream_sessions', { requesterDeviceId: uuid('requester_device_id').notNull().references(() => devices.id), status: varchar('status', { length: 32 }).default('requested').notNull(), reason: varchar('reason', { length: 32 }).default('on_demand').notNull(), + // Legacy provider-backed fields are retained for compatibility with older sessions. + // SIMPLE_STREAMING relies on direct WebRTC signaling and does not populate them. mediaProvider: varchar('media_provider', { length: 32 }).default('mock').notNull(), mediaSessionId: varchar('media_session_id', { length: 255 }), publishEndpoint: text('publish_endpoint'), diff --git a/Backend/docs/openapi.ts b/Backend/docs/openapi.ts index 1e4a2f6..a2aabe4 100644 --- a/Backend/docs/openapi.ts +++ b/Backend/docs/openapi.ts @@ -768,8 +768,12 @@ registry.registerPath({ requesterDeviceId: z.string().uuid(), status: z.string(), reason: z.string(), + metadata: z.record(z.string(), z.unknown()).nullable().optional(), + startedAt: z.string().datetime().nullable().optional(), + endedAt: z.string().datetime().nullable().optional(), + createdAt: z.string().datetime().optional(), + updatedAt: z.string().datetime().optional(), }), - command: DeviceCommandSchema, }), }, }, @@ -805,13 +809,16 @@ registry.registerPath({ message: z.string(), streamSession: z.object({ id: z.string().uuid(), + ownerUserId: z.string().uuid(), + cameraDeviceId: z.string().uuid(), + requesterDeviceId: z.string().uuid(), status: z.string(), - streamKey: z.string().nullable(), - mediaProvider: z.string(), - mediaSessionId: z.string().nullable(), - publishEndpoint: z.string().nullable(), - subscribeEndpoint: z.string().nullable(), + reason: z.string(), + metadata: z.record(z.string(), z.unknown()).nullable().optional(), startedAt: z.string().datetime().nullable(), + endedAt: z.string().datetime().nullable().optional(), + createdAt: z.string().datetime().optional(), + updatedAt: z.string().datetime().optional(), }), }), }, @@ -857,89 +864,6 @@ registry.registerPath({ }, }); -registry.registerPath({ - method: 'get', - path: '/streams/{streamSessionId}/publish-credentials', - summary: 'Get publish credentials for camera ingest to media provider', - tags: ['Streams'], - security: [{ bearerDeviceToken: [] }], - request: { - params: z.object({ streamSessionId: z.string().uuid() }), - }, - responses: { - 200: { - description: 'Publish credentials', - content: { - 'application/json': { - schema: z.object({ - provider: z.string(), - mediaSessionId: z.string(), - publishToken: z.string(), - publishUrl: z.string(), - expiresInSeconds: z.number().int(), - }), - }, - }, - }, - }, -}); - -registry.registerPath({ - method: 'get', - path: '/streams/{streamSessionId}/subscribe-credentials', - summary: 'Get subscribe credentials for viewing stream from media provider', - tags: ['Streams'], - security: [{ bearerDeviceToken: [] }], - request: { - params: z.object({ streamSessionId: z.string().uuid() }), - }, - responses: { - 200: { - description: 'Subscribe credentials', - content: { - 'application/json': { - schema: z.object({ - provider: z.string(), - mediaSessionId: z.string(), - subscribeToken: z.string(), - subscribeUrl: z.string(), - expiresInSeconds: z.number().int(), - }), - }, - }, - }, - }, -}); - -registry.registerPath({ - method: 'get', - path: '/streams/{streamSessionId}/playback-token', - summary: 'Get short-lived playback token for active stream session', - tags: ['Streams'], - security: [{ bearerDeviceToken: [] }], - request: { - params: z.object({ streamSessionId: z.string().uuid() }), - }, - responses: { - 200: { - description: 'Playback token response', - content: { - 'application/json': { - schema: z.object({ - streamSessionId: z.string().uuid(), - streamKey: z.string(), - status: z.string(), - playbackToken: z.string(), - subscribeUrl: z.string(), - mediaProvider: z.string(), - expiresInSeconds: z.number().int(), - }), - }, - }, - }, - }, -}); - registry.registerPath({ method: 'get', path: '/streams/me/list', diff --git a/Backend/media/config.ts b/Backend/media/config.ts index 9632e3b..f890ff4 100644 --- a/Backend/media/config.ts +++ b/Backend/media/config.ts @@ -25,10 +25,29 @@ const parsePositiveNumber = (value: string | undefined): number | null => { return parsed; }; +export const parseFeatureFlag = (value: string | undefined, defaultValue: boolean): boolean => { + if (value === undefined) { + return defaultValue; + } + + const normalized = value.trim().toLowerCase(); + if (['1', 'true', 'yes', 'on'].includes(normalized)) { + return true; + } + if (['0', 'false', 'no', 'off'].includes(normalized)) { + return false; + } + return defaultValue; +}; + export const mediaMode: MediaMode = parseMediaMode(process.env.MEDIA_MODE); +export const simpleStreamingEnabled = parseFeatureFlag(process.env.SIMPLE_STREAMING, false); +export const streamRecordingEnabled = parseFeatureFlag(process.env.STREAM_RECORDINGS_ENABLED, false); export const mediaConfig = { mode: mediaMode, + simpleStreamingEnabled, + streamRecordingEnabled, turn: { urls: parseCsv(process.env.TURN_URLS), username: process.env.TURN_USERNAME ?? '', @@ -40,4 +59,3 @@ export const mediaConfig = { maxSubscribersPerRoom: parsePositiveNumber(process.env.MEDIA_MAX_SUBSCRIBERS_PER_ROOM), }, }; - diff --git a/Backend/media/providers/mock.ts b/Backend/media/providers/mock.ts index 357c47f..1134cbd 100644 --- a/Backend/media/providers/mock.ts +++ b/Backend/media/providers/mock.ts @@ -37,6 +37,8 @@ export class MockMediaProvider implements MediaProvider { name = 'mock'; async createSession(input: MediaSessionCreateInput): Promise { + // SIMPLE_STREAMING bypasses provider-backed transport at runtime. This metadata + // path is kept only for legacy endpoints and backwards compatibility. const mediaSessionId = `mock_${input.streamSessionId}`; const baseUrl = getBaseUrl(); diff --git a/Backend/media/service.ts b/Backend/media/service.ts index 82e7178..697f2fc 100644 --- a/Backend/media/service.ts +++ b/Backend/media/service.ts @@ -1,5 +1,6 @@ +import { simpleStreamingEnabled } from './config'; import { MockMediaProvider } from './providers/mock'; -import type { MediaProvider } from './types'; +import type { MediaProvider, MediaSessionCreateInput, MediaSessionCreateResult } from './types'; const providerName = (process.env.MEDIA_PROVIDER ?? 'mock').toLowerCase(); @@ -13,3 +14,14 @@ const createProvider = (): MediaProvider => { }; export const mediaProvider = createProvider(); +export const mediaProviderRuntimeEnabled = !simpleStreamingEnabled; + +export const createLiveMediaSession = async ( + input: MediaSessionCreateInput, +): Promise => { + if (!mediaProviderRuntimeEnabled) { + return null; + } + + return mediaProvider.createSession(input); +}; diff --git a/Backend/media/sfu/service.ts b/Backend/media/sfu/service.ts index 2dec3c6..651cf8b 100644 --- a/Backend/media/sfu/service.ts +++ b/Backend/media/sfu/service.ts @@ -1,8 +1,12 @@ -import { mediaMode } from '../config'; +import { mediaMode, simpleStreamingEnabled } from '../config'; import { NoopSfuService } from './noop'; import type { SfuService } from './types'; const createSfuService = (): SfuService | null => { + if (simpleStreamingEnabled) { + return null; + } + if (mediaMode !== 'single_server_sfu') { return null; } @@ -11,4 +15,3 @@ const createSfuService = (): SfuService | null => { }; export const sfuService = createSfuService(); - diff --git a/Backend/public/backend-architecture.html b/Backend/public/backend-architecture.html index b69712e..dc10789 100644 --- a/Backend/public/backend-architecture.html +++ b/Backend/public/backend-architecture.html @@ -495,9 +495,9 @@ diff --git a/Backend/realtime/gateway.ts b/Backend/realtime/gateway.ts index f648d6b..3df78eb 100644 --- a/Backend/realtime/gateway.ts +++ b/Backend/realtime/gateway.ts @@ -4,7 +4,9 @@ import { Server as SocketIOServer } from 'socket.io'; import { z } from 'zod'; import { db } from '../db/client'; -import { deviceCommands, devices } from '../db/schema'; +import { simpleStreamingEnabled } from '../media/config'; +import { deviceCommands, devices, streamSessions } from '../db/schema'; +import { canRelayWebrtcSignal } from '../streaming/simple'; import { hasRequiredTables } from '../utils/db-schema'; import { verifyDeviceToken } from '../utils/device-token'; @@ -26,13 +28,6 @@ const webrtcSignalSchema = z.object({ data: z.record(z.string(), z.unknown()).nullable().optional(), }); -const streamFrameSchema = z.object({ - toDeviceId: z.string().uuid(), - streamSessionId: z.string().uuid(), - frame: z.string().min(32), - capturedAt: z.string().optional(), -}); - const roomForDevice = (deviceId: string): string => `device:${deviceId}`; let io: SocketIOServer | null = null; @@ -112,6 +107,18 @@ export const dispatchCommandById = async (commandId: string): Promise => { const now = new Date(); + if (simpleStreamingEnabled && command.commandType === 'start_stream') { + await db + .update(deviceCommands) + .set({ + status: 'failed', + updatedAt: now, + error: 'start_stream command delivery disabled by SIMPLE_STREAMING', + }) + .where(eq(deviceCommands.id, command.id)); + return; + } + const delivered = emitCommand({ id: command.id, sourceDeviceId: command.sourceDeviceId, @@ -144,6 +151,19 @@ const retryPendingCommands = async () => { for (const command of pending) { const now = new Date(); + + if (simpleStreamingEnabled && command.commandType === 'start_stream') { + await db + .update(deviceCommands) + .set({ + status: 'failed', + updatedAt: now, + error: 'start_stream retries disabled by SIMPLE_STREAMING', + }) + .where(eq(deviceCommands.id, command.id)); + continue; + } + const nextRetryCount = command.retryCount + 1; if (nextRetryCount > MAX_RETRIES) { @@ -240,7 +260,6 @@ export const setupRealtimeGateway = (server: HttpServer): SocketIOServer => { io.on('connection', async (socket) => { const auth = socket.data.deviceAuth as { userId: string; deviceId: string; role: 'camera' | 'client' }; const deviceRoom = roomForDevice(auth.deviceId); - const verifiedRelayTargets = new Set(); socket.join(deviceRoom); await markDevicePresence(auth.deviceId, 'online'); @@ -312,15 +331,27 @@ export const setupRealtimeGateway = (server: HttpServer): SocketIOServer => { return; } - const targetDevice = await db.query.devices.findFirst({ - where: and(eq(devices.id, parsed.data.toDeviceId), eq(devices.userId, auth.userId)), + const session = await db.query.streamSessions.findFirst({ + where: and(eq(streamSessions.id, parsed.data.streamSessionId), eq(streamSessions.ownerUserId, auth.userId)), }); - if (!targetDevice) { - socket.emit('error:webrtc_signal', { message: 'Target device not found for this account' }); + if (!session) { + socket.emit('error:webrtc_signal', { message: 'Stream session not found for this account' }); return; } + if (!canRelayWebrtcSignal(session, auth.deviceId, parsed.data.toDeviceId)) { + socket.emit('error:webrtc_signal', { message: 'Signal target is not a participant in this stream session' }); + return; + } + + console.info('[stream.signal]', { + streamSessionId: parsed.data.streamSessionId, + fromDeviceId: auth.deviceId, + toDeviceId: parsed.data.toDeviceId, + signalType: parsed.data.signalType, + }); + io?.to(roomForDevice(parsed.data.toDeviceId)).emit('webrtc:signal', { fromDeviceId: auth.deviceId, streamSessionId: parsed.data.streamSessionId, @@ -329,38 +360,6 @@ export const setupRealtimeGateway = (server: HttpServer): SocketIOServer => { }); }); - socket.on('stream:frame', async (input) => { - const parsed = streamFrameSchema.safeParse(input); - - if (!parsed.success) { - socket.emit('error:stream_frame', { - message: 'Invalid stream frame payload', - errors: parsed.error.flatten(), - }); - return; - } - - if (!verifiedRelayTargets.has(parsed.data.toDeviceId)) { - const targetDevice = await db.query.devices.findFirst({ - where: and(eq(devices.id, parsed.data.toDeviceId), eq(devices.userId, auth.userId)), - }); - - if (!targetDevice) { - socket.emit('error:stream_frame', { message: 'Target device not found for this account' }); - return; - } - - verifiedRelayTargets.add(parsed.data.toDeviceId); - } - - io?.to(roomForDevice(parsed.data.toDeviceId)).emit('stream:frame', { - fromDeviceId: auth.deviceId, - streamSessionId: parsed.data.streamSessionId, - frame: parsed.data.frame, - capturedAt: parsed.data.capturedAt ?? new Date().toISOString(), - }); - }); - socket.on('disconnect', async () => { // Small delay allows fast reconnects to reuse presence without flapping. setTimeout(async () => { diff --git a/Backend/routes/commands.ts b/Backend/routes/commands.ts index 26f2327..614955c 100644 --- a/Backend/routes/commands.ts +++ b/Backend/routes/commands.ts @@ -4,6 +4,7 @@ import { z } from 'zod'; import { db } from '../db/client'; import { deviceCommands, deviceLinks, devices } from '../db/schema'; +import { simpleStreamingEnabled } from '../media/config'; import { requireAuth } from '../middleware/auth'; import { requireDeviceAuth } from '../middleware/device-auth'; import { dispatchCommandById } from '../realtime/gateway'; @@ -47,6 +48,13 @@ router.post('/', requireAuth, async (req, res) => { return; } + if (simpleStreamingEnabled && parsed.data.commandType === 'start_stream') { + res.status(409).json({ + message: 'start_stream commands are disabled while SIMPLE_STREAMING is enabled; use /streams/request instead', + }); + return; + } + if (parsed.data.sourceDeviceId === parsed.data.targetDeviceId) { res.status(400).json({ message: 'sourceDeviceId and targetDeviceId must differ' }); return; diff --git a/Backend/routes/streams.ts b/Backend/routes/streams.ts index 17775b1..5588a4c 100644 --- a/Backend/routes/streams.ts +++ b/Backend/routes/streams.ts @@ -5,14 +5,20 @@ import { Router } from 'express'; import { z } from 'zod'; import { db } from '../db/client'; -import { mediaMode } from '../media/config'; +import { mediaMode, simpleStreamingEnabled, streamRecordingEnabled } from '../media/config'; import { deviceCommands, deviceLinks, devices, streamSessions } from '../db/schema'; -import { mediaProvider } from '../media/service'; +import { createLiveMediaSession, mediaProvider } from '../media/service'; import { sfuService } from '../media/sfu/service'; import { requireDeviceAuth } from '../middleware/device-auth'; import { dispatchCommandById, sendRealtimeToDevice } from '../realtime/gateway'; import { writeAuditLog } from '../services/audit'; import { enqueuePushNotification } from '../services/push'; +import { + createStreamEndedPayload, + createStreamRequestedPayload, + createStreamStartedPayload, + toSimpleStreamSessionResponse, +} from '../streaming/simple'; import { createRecordingForStream } from './recordings'; const router = Router(); @@ -158,6 +164,45 @@ router.post('/request', requireDeviceAuth, async (req, res) => { return; } + if (simpleStreamingEnabled) { + const requestPayload = createStreamRequestedPayload(session); + const deliveredToCamera = sendRealtimeToDevice(cameraDevice.id, 'stream:requested', requestPayload); + + console.info('[stream.request]', { + streamSessionId: session.id, + requesterDeviceId: sourceDevice.id, + cameraDeviceId: cameraDevice.id, + mode: 'simple', + }); + + sendRealtimeToDevice(sourceDevice.id, 'stream:requested', requestPayload); + + if (!deliveredToCamera) { + await enqueuePushNotification({ + ownerUserId: cameraDevice.userId, + recipientDeviceId: cameraDevice.id, + type: 'stream_requested', + payload: requestPayload, + }); + } + + res.status(201).json({ + message: 'Stream request sent', + streamSession: toSimpleStreamSessionResponse(session), + }); + + await writeAuditLog({ + ownerUserId: sourceDevice.userId, + actorDeviceId: sourceDevice.id, + action: 'stream.requested', + targetType: 'stream_session', + targetId: session.id, + metadata: { cameraDeviceId: cameraDevice.id, reason: session.reason, transport: 'webrtc' }, + ipAddress: req.ip, + }); + return; + } + const [command] = await db .insert(deviceCommands) .values({ @@ -182,6 +227,13 @@ router.post('/request', requireDeviceAuth, async (req, res) => { } await dispatchCommandById(command.id); + console.info('[stream.request]', { + streamSessionId: session.id, + requesterDeviceId: sourceDevice.id, + cameraDeviceId: cameraDevice.id, + mode: 'legacy', + commandId: command.id, + }); const refreshedCommand = await db.query.deviceCommands.findFirst({ where: eq(deviceCommands.id, command.id) }); @@ -259,7 +311,7 @@ router.post('/:streamSessionId/accept', requireDeviceAuth, async (req, res) => { const now = new Date(); const streamKey = parsed.data.streamKey ?? `stream_${session.id}_${randomUUID()}`; - const mediaSession = await mediaProvider.createSession({ + const mediaSession = await createLiveMediaSession({ streamSessionId: session.id, ownerUserId: session.ownerUserId, cameraDeviceId: session.cameraDeviceId, @@ -270,11 +322,11 @@ router.post('/:streamSessionId/accept', requireDeviceAuth, async (req, res) => { .update(streamSessions) .set({ status: 'streaming', - streamKey, - mediaProvider: mediaSession.provider, - mediaSessionId: mediaSession.mediaSessionId, - publishEndpoint: mediaSession.publishUrl, - subscribeEndpoint: mediaSession.subscribeUrl, + streamKey: mediaSession ? streamKey : null, + mediaProvider: mediaSession?.provider ?? 'simple', + mediaSessionId: mediaSession?.mediaSessionId ?? null, + publishEndpoint: mediaSession?.publishUrl ?? null, + subscribeEndpoint: mediaSession?.subscribeUrl ?? null, metadata: parsed.data.metadata ?? session.metadata, startedAt: now, updatedAt: now, @@ -303,29 +355,25 @@ router.post('/:streamSessionId/accept', requireDeviceAuth, async (req, res) => { } } - const deliveredToRequester = sendRealtimeToDevice(session.requesterDeviceId, 'stream:started', { + const startedPayload = createStreamStartedPayload(updated); + console.info('[stream.accept]', { streamSessionId: updated.id, + requesterDeviceId: updated.requesterDeviceId, cameraDeviceId: updated.cameraDeviceId, - status: updated.status, - startedAt: updated.startedAt, - mediaProvider: updated.mediaProvider, - mediaSessionId: updated.mediaSessionId, - subscribeEndpoint: updated.subscribeEndpoint, + mode: mediaSession ? 'legacy' : 'simple', }); + const deliveredToRequester = sendRealtimeToDevice(session.requesterDeviceId, 'stream:started', startedPayload); if (!deliveredToRequester) { await enqueuePushNotification({ ownerUserId: session.ownerUserId, recipientDeviceId: session.requesterDeviceId, type: 'stream_started', - payload: { - streamSessionId: updated.id, - cameraDeviceId: updated.cameraDeviceId, - }, + payload: startedPayload, }); } - res.json({ message: 'Stream accepted', streamSession: updated }); + res.json({ message: 'Stream accepted', streamSession: toSimpleStreamSessionResponse(updated) }); await writeAuditLog({ ownerUserId: session.ownerUserId, @@ -333,7 +381,9 @@ router.post('/:streamSessionId/accept', requireDeviceAuth, async (req, res) => { action: 'stream.accepted', targetType: 'stream_session', targetId: session.id, - metadata: { mediaSessionId: updated.mediaSessionId, mediaProvider: updated.mediaProvider }, + metadata: mediaSession + ? { mediaSessionId: updated.mediaSessionId, mediaProvider: updated.mediaProvider } + : { transport: 'webrtc' }, ipAddress: req.ip, }); }); @@ -349,6 +399,11 @@ router.get('/:streamSessionId/publish-credentials', requireDeviceAuth, async (re const deviceAuth = ensureDeviceAuth(req, res); if (!deviceAuth) return; + if (simpleStreamingEnabled) { + res.status(409).json({ message: 'SIMPLE_STREAMING does not use publish credentials' }); + return; + } + const session = await getOwnedStreamSession(parsedParams.data.streamSessionId, deviceAuth.userId); if (!session) { @@ -396,6 +451,11 @@ router.get('/:streamSessionId/subscribe-credentials', requireDeviceAuth, async ( const deviceAuth = ensureDeviceAuth(req, res); if (!deviceAuth) return; + if (simpleStreamingEnabled) { + res.status(409).json({ message: 'SIMPLE_STREAMING does not use subscribe credentials' }); + return; + } + const session = await getOwnedStreamSession(parsedParams.data.streamSessionId, deviceAuth.userId); if (!session) { @@ -605,17 +665,31 @@ router.post('/:streamSessionId/end', requireDeviceAuth, async (req, res) => { } const now = new Date(); + const nextStatus = simpleStreamingEnabled ? 'ended' : parsed.data.reason; + const nextMetadata = + simpleStreamingEnabled && parsed.data.reason !== 'completed' + ? { + ...(session.metadata ?? {}), + endReason: parsed.data.reason, + } + : session.metadata; const [updated] = await db .update(streamSessions) .set({ - status: parsed.data.reason, + status: nextStatus, endedAt: now, + metadata: nextMetadata, updatedAt: now, }) .where(eq(streamSessions.id, session.id)) .returning(); + if (!updated) { + res.status(500).json({ message: 'Failed to update stream session' }); + return; + } + if (sfuService) { try { await sfuService.endSession(session.id); @@ -624,29 +698,42 @@ router.post('/:streamSessionId/end', requireDeviceAuth, async (req, res) => { } } - await createRecordingForStream(session.id); + if (streamRecordingEnabled) { + await createRecordingForStream(session.id); + } - const deliveredToRequester = sendRealtimeToDevice(session.requesterDeviceId, 'stream:ended', { + const endedPayload = simpleStreamingEnabled + ? createStreamEndedPayload({ + streamSessionId: session.id, + cameraDeviceId: session.cameraDeviceId, + requesterDeviceId: session.requesterDeviceId, + endedAt: now, + reason: parsed.data.reason, + }) + : { + streamSessionId: session.id, + status: parsed.data.reason, + endedAt: now, + }; + + console.info('[stream.end]', { streamSessionId: session.id, - status: parsed.data.reason, - endedAt: now, + requesterDeviceId: session.requesterDeviceId, + cameraDeviceId: session.cameraDeviceId, + reason: parsed.data.reason, + status: simpleStreamingEnabled ? 'ended' : parsed.data.reason, }); - const deliveredToCamera = sendRealtimeToDevice(session.cameraDeviceId, 'stream:ended', { - streamSessionId: session.id, - status: parsed.data.reason, - endedAt: now, - }); + const deliveredToRequester = sendRealtimeToDevice(session.requesterDeviceId, 'stream:ended', endedPayload); + + const deliveredToCamera = sendRealtimeToDevice(session.cameraDeviceId, 'stream:ended', endedPayload); if (!deliveredToRequester) { await enqueuePushNotification({ ownerUserId: session.ownerUserId, recipientDeviceId: session.requesterDeviceId, type: 'stream_ended', - payload: { - streamSessionId: session.id, - status: parsed.data.reason, - }, + payload: endedPayload, }); } @@ -655,14 +742,11 @@ router.post('/:streamSessionId/end', requireDeviceAuth, async (req, res) => { ownerUserId: session.ownerUserId, recipientDeviceId: session.cameraDeviceId, type: 'stream_ended', - payload: { - streamSessionId: session.id, - status: parsed.data.reason, - }, + payload: endedPayload, }); } - res.json({ message: 'Stream ended', streamSession: updated }); + res.json({ message: 'Stream ended', streamSession: toSimpleStreamSessionResponse(updated) }); }); router.get('/:streamSessionId/playback-token', requireDeviceAuth, async (req, res) => { @@ -676,6 +760,11 @@ router.get('/:streamSessionId/playback-token', requireDeviceAuth, async (req, re const deviceAuth = ensureDeviceAuth(req, res); if (!deviceAuth) return; + if (simpleStreamingEnabled) { + res.status(409).json({ message: 'SIMPLE_STREAMING does not issue playback tokens' }); + return; + } + const session = await getOwnedStreamSession(parsedParams.data.streamSessionId, deviceAuth.userId); if (!session) { diff --git a/Backend/streaming/simple.ts b/Backend/streaming/simple.ts new file mode 100644 index 0000000..c50bbee --- /dev/null +++ b/Backend/streaming/simple.ts @@ -0,0 +1,80 @@ +type StreamSessionLike = { + id: string; + ownerUserId: string; + cameraDeviceId: string; + requesterDeviceId: string; + status: string; + reason: string; + metadata: Record | null; + startedAt: Date | null; + endedAt: Date | null; + createdAt: Date; + updatedAt: Date; +}; + +type StreamEndedPayloadInput = { + streamSessionId: string; + cameraDeviceId: string; + requesterDeviceId: string; + endedAt: Date; + reason: 'completed' | 'cancelled' | 'failed'; +}; + +export const isStreamParticipant = (session: Pick, deviceId: string): boolean => + session.cameraDeviceId === deviceId || session.requesterDeviceId === deviceId; + +export const canRelayWebrtcSignal = ( + session: Pick, + fromDeviceId: string, + toDeviceId: string, +): boolean => { + if (fromDeviceId === toDeviceId) { + return false; + } + + return isStreamParticipant(session, fromDeviceId) && isStreamParticipant(session, toDeviceId); +}; + +export const createStreamRequestedPayload = ( + session: Pick, +) => ({ + streamSessionId: session.id, + cameraDeviceId: session.cameraDeviceId, + requesterDeviceId: session.requesterDeviceId, + status: session.status, + reason: session.reason, +}); + +export const createStreamStartedPayload = ( + session: Pick, +) => ({ + streamSessionId: session.id, + cameraDeviceId: session.cameraDeviceId, + requesterDeviceId: session.requesterDeviceId, + status: session.status, + startedAt: session.startedAt, + transport: 'webrtc', +}); + +export const createStreamEndedPayload = (input: StreamEndedPayloadInput) => ({ + streamSessionId: input.streamSessionId, + cameraDeviceId: input.cameraDeviceId, + requesterDeviceId: input.requesterDeviceId, + status: 'ended', + endedAt: input.endedAt, + reason: input.reason, +}); + +export const toSimpleStreamSessionResponse = (session: StreamSessionLike) => ({ + id: session.id, + ownerUserId: session.ownerUserId, + cameraDeviceId: session.cameraDeviceId, + requesterDeviceId: session.requesterDeviceId, + status: session.status, + reason: session.reason, + metadata: session.metadata, + startedAt: session.startedAt, + endedAt: session.endedAt, + createdAt: session.createdAt, + updatedAt: session.updatedAt, +}); diff --git a/Backend/tests/media-config.test.ts b/Backend/tests/media-config.test.ts new file mode 100644 index 0000000..5b083b8 --- /dev/null +++ b/Backend/tests/media-config.test.ts @@ -0,0 +1,22 @@ +import { describe, expect, test } from 'bun:test'; + +import { parseFeatureFlag } from '../media/config'; + +describe('media config feature flags', () => { + test('parses enabled values', () => { + expect(parseFeatureFlag('true', false)).toBe(true); + expect(parseFeatureFlag('1', false)).toBe(true); + expect(parseFeatureFlag('yes', false)).toBe(true); + }); + + test('parses disabled values', () => { + expect(parseFeatureFlag('false', true)).toBe(false); + expect(parseFeatureFlag('0', true)).toBe(false); + expect(parseFeatureFlag('off', true)).toBe(false); + }); + + test('falls back to default value for unknown input', () => { + expect(parseFeatureFlag(undefined, true)).toBe(true); + expect(parseFeatureFlag('maybe', false)).toBe(false); + }); +}); diff --git a/Backend/tests/streaming-simple.test.ts b/Backend/tests/streaming-simple.test.ts new file mode 100644 index 0000000..577b829 --- /dev/null +++ b/Backend/tests/streaming-simple.test.ts @@ -0,0 +1,97 @@ +import { describe, expect, test } from 'bun:test'; + +import { + canRelayWebrtcSignal, + createStreamEndedPayload, + createStreamRequestedPayload, + createStreamStartedPayload, + toSimpleStreamSessionResponse, +} from '../streaming/simple'; + +const buildSession = () => ({ + id: 'stream-1', + ownerUserId: 'user-1', + cameraDeviceId: 'camera-1', + requesterDeviceId: 'client-1', + status: 'streaming', + reason: 'on_demand', + metadata: { quality: 'standard' }, + startedAt: new Date('2026-04-06T10:00:00.000Z'), + endedAt: null, + createdAt: new Date('2026-04-06T09:59:00.000Z'), + updatedAt: new Date('2026-04-06T10:00:00.000Z'), +}); + +describe('simple streaming helpers', () => { + test('only relays WebRTC signals between stream participants', () => { + const session = buildSession(); + + expect(canRelayWebrtcSignal(session, 'camera-1', 'client-1')).toBe(true); + expect(canRelayWebrtcSignal(session, 'client-1', 'camera-1')).toBe(true); + expect(canRelayWebrtcSignal(session, 'camera-1', 'camera-1')).toBe(false); + expect(canRelayWebrtcSignal(session, 'camera-1', 'intruder-1')).toBe(false); + }); + + test('builds deterministic requested and started payloads', () => { + const session = buildSession(); + + expect(createStreamRequestedPayload(session)).toEqual({ + streamSessionId: 'stream-1', + cameraDeviceId: 'camera-1', + requesterDeviceId: 'client-1', + status: 'streaming', + reason: 'on_demand', + }); + + expect(createStreamStartedPayload(session)).toEqual({ + streamSessionId: 'stream-1', + cameraDeviceId: 'camera-1', + requesterDeviceId: 'client-1', + status: 'streaming', + startedAt: session.startedAt, + transport: 'webrtc', + }); + }); + + test('normalizes ended payload and strips provider fields from API response', () => { + const session = { + ...buildSession(), + mediaProvider: 'mock', + mediaSessionId: 'mock_stream-1', + streamKey: 'stream-key', + publishEndpoint: 'https://example.test/publish', + subscribeEndpoint: 'https://example.test/subscribe', + }; + + expect( + createStreamEndedPayload({ + streamSessionId: session.id, + cameraDeviceId: session.cameraDeviceId, + requesterDeviceId: session.requesterDeviceId, + endedAt: new Date('2026-04-06T10:05:00.000Z'), + reason: 'completed', + }), + ).toEqual({ + streamSessionId: 'stream-1', + cameraDeviceId: 'camera-1', + requesterDeviceId: 'client-1', + status: 'ended', + endedAt: new Date('2026-04-06T10:05:00.000Z'), + reason: 'completed', + }); + + expect(toSimpleStreamSessionResponse(session)).toEqual({ + id: 'stream-1', + ownerUserId: 'user-1', + cameraDeviceId: 'camera-1', + requesterDeviceId: 'client-1', + status: 'streaming', + reason: 'on_demand', + metadata: { quality: 'standard' }, + startedAt: new Date('2026-04-06T10:00:00.000Z'), + endedAt: null, + createdAt: new Date('2026-04-06T09:59:00.000Z'), + updatedAt: new Date('2026-04-06T10:00:00.000Z'), + }); + }); +});