From b800baefb2aec51cf55254504e1aa0c5dacbe7c7 Mon Sep 17 00:00:00 2001 From: Matiss Jurevics Date: Mon, 19 Jan 2026 16:55:00 +0000 Subject: [PATCH] feat(media): add phase5 media provider abstraction and stream credentials APIs --- Backend/db/schema.ts | 4 + Backend/drizzle/0008_media_plane_columns.sql | 4 + Backend/drizzle/meta/_journal.json | 7 + Backend/media/providers/mock.ts | 96 ++++++++++ Backend/media/service.ts | 15 ++ Backend/media/types.ts | 44 +++++ Backend/routes/streams.ts | 177 +++++++++++++++---- Backend/utils/stream-token.ts | 31 ---- 8 files changed, 311 insertions(+), 67 deletions(-) create mode 100644 Backend/drizzle/0008_media_plane_columns.sql create mode 100644 Backend/media/providers/mock.ts create mode 100644 Backend/media/service.ts create mode 100644 Backend/media/types.ts delete mode 100644 Backend/utils/stream-token.ts diff --git a/Backend/db/schema.ts b/Backend/db/schema.ts index cae365d..0b878d7 100644 --- a/Backend/db/schema.ts +++ b/Backend/db/schema.ts @@ -65,6 +65,10 @@ export const streamSessions = pgTable('stream_sessions', { requesterDeviceId: uuid('requester_device_id').notNull().references(() => devices.id), status: varchar('status', { length: 32 }).default('requested').notNull(), reason: varchar('reason', { length: 32 }).default('on_demand').notNull(), + mediaProvider: varchar('media_provider', { length: 32 }).default('mock').notNull(), + mediaSessionId: varchar('media_session_id', { length: 255 }), + publishEndpoint: text('publish_endpoint'), + subscribeEndpoint: text('subscribe_endpoint'), streamKey: varchar('stream_key', { length: 255 }), startedAt: timestamp('started_at', { withTimezone: true }), endedAt: timestamp('ended_at', { withTimezone: true }), diff --git a/Backend/drizzle/0008_media_plane_columns.sql b/Backend/drizzle/0008_media_plane_columns.sql new file mode 100644 index 0000000..240f66e --- /dev/null +++ b/Backend/drizzle/0008_media_plane_columns.sql @@ -0,0 +1,4 @@ +ALTER TABLE "stream_sessions" ADD COLUMN "media_provider" varchar(32) DEFAULT 'mock' NOT NULL;--> statement-breakpoint +ALTER TABLE "stream_sessions" ADD COLUMN "media_session_id" varchar(255);--> statement-breakpoint +ALTER TABLE "stream_sessions" ADD COLUMN "publish_endpoint" text;--> statement-breakpoint +ALTER TABLE "stream_sessions" ADD COLUMN "subscribe_endpoint" text; diff --git a/Backend/drizzle/meta/_journal.json b/Backend/drizzle/meta/_journal.json index 118fe79..81506ec 100644 --- a/Backend/drizzle/meta/_journal.json +++ b/Backend/drizzle/meta/_journal.json @@ -57,6 +57,13 @@ "when": 1770414956419, "tag": "0007_live_stream_sessions", "breakpoints": true + }, + { + "idx": 8, + "version": "7", + "when": 1770415956419, + "tag": "0008_media_plane_columns", + "breakpoints": true } ] } diff --git a/Backend/media/providers/mock.ts b/Backend/media/providers/mock.ts new file mode 100644 index 0000000..357c47f --- /dev/null +++ b/Backend/media/providers/mock.ts @@ -0,0 +1,96 @@ +import { createHmac } from 'crypto'; + +import type { + MediaProvider, + MediaPublishCredentials, + MediaSessionCreateInput, + MediaSessionCreateResult, + MediaSubscribeCredentials, +} from '../types'; + +const secret = process.env.BETTER_AUTH_SECRET; + +if (!secret) { + throw new Error('BETTER_AUTH_SECRET is required for mock media provider token signing'); +} + +const DEFAULT_TTL_SECONDS = 60 * 10; + +const signToken = (payload: Record, ttlSeconds = DEFAULT_TTL_SECONDS): { token: string; expiresInSeconds: number } => { + const body = { + ...payload, + exp: Math.floor(Date.now() / 1000) + ttlSeconds, + }; + + const encoded = Buffer.from(JSON.stringify(body), 'utf8').toString('base64url'); + const signature = createHmac('sha256', secret).update(encoded).digest('base64url'); + + return { + token: `${encoded}.${signature}`, + expiresInSeconds: ttlSeconds, + }; +}; + +const getBaseUrl = (): string => process.env.MEDIA_MOCK_BASE_URL ?? process.env.BETTER_AUTH_URL ?? 'http://localhost:3000'; + +export class MockMediaProvider implements MediaProvider { + name = 'mock'; + + async createSession(input: MediaSessionCreateInput): Promise { + const mediaSessionId = `mock_${input.streamSessionId}`; + const baseUrl = getBaseUrl(); + + return { + provider: this.name, + mediaSessionId, + publishUrl: `${baseUrl}/media/mock/publish/${mediaSessionId}`, + subscribeUrl: `${baseUrl}/media/mock/subscribe/${mediaSessionId}`, + }; + } + + async issuePublishCredentials(input: { + mediaSessionId: string; + cameraDeviceId: string; + ownerUserId: string; + }): Promise { + const baseUrl = getBaseUrl(); + const { token, expiresInSeconds } = signToken({ + typ: 'publish', + provider: this.name, + mediaSessionId: input.mediaSessionId, + cameraDeviceId: input.cameraDeviceId, + ownerUserId: input.ownerUserId, + }); + + return { + provider: this.name, + mediaSessionId: input.mediaSessionId, + publishToken: token, + publishUrl: `${baseUrl}/media/mock/publish/${input.mediaSessionId}`, + expiresInSeconds, + }; + } + + async issueSubscribeCredentials(input: { + mediaSessionId: string; + viewerDeviceId: string; + ownerUserId: string; + }): Promise { + const baseUrl = getBaseUrl(); + const { token, expiresInSeconds } = signToken({ + typ: 'subscribe', + provider: this.name, + mediaSessionId: input.mediaSessionId, + viewerDeviceId: input.viewerDeviceId, + ownerUserId: input.ownerUserId, + }); + + return { + provider: this.name, + mediaSessionId: input.mediaSessionId, + subscribeToken: token, + subscribeUrl: `${baseUrl}/media/mock/subscribe/${input.mediaSessionId}`, + expiresInSeconds, + }; + } +} diff --git a/Backend/media/service.ts b/Backend/media/service.ts new file mode 100644 index 0000000..82e7178 --- /dev/null +++ b/Backend/media/service.ts @@ -0,0 +1,15 @@ +import { MockMediaProvider } from './providers/mock'; +import type { MediaProvider } from './types'; + +const providerName = (process.env.MEDIA_PROVIDER ?? 'mock').toLowerCase(); + +const createProvider = (): MediaProvider => { + switch (providerName) { + case 'mock': + return new MockMediaProvider(); + default: + throw new Error(`Unsupported MEDIA_PROVIDER: ${providerName}`); + } +}; + +export const mediaProvider = createProvider(); diff --git a/Backend/media/types.ts b/Backend/media/types.ts new file mode 100644 index 0000000..aab7123 --- /dev/null +++ b/Backend/media/types.ts @@ -0,0 +1,44 @@ +export type MediaSessionCreateInput = { + streamSessionId: string; + ownerUserId: string; + cameraDeviceId: string; + requesterDeviceId: string; +}; + +export type MediaPublishCredentials = { + provider: string; + mediaSessionId: string; + publishToken: string; + publishUrl: string; + expiresInSeconds: number; +}; + +export type MediaSubscribeCredentials = { + provider: string; + mediaSessionId: string; + subscribeToken: string; + subscribeUrl: string; + expiresInSeconds: number; +}; + +export type MediaSessionCreateResult = { + provider: string; + mediaSessionId: string; + publishUrl: string; + subscribeUrl: string; +}; + +export interface MediaProvider { + name: string; + createSession(input: MediaSessionCreateInput): Promise; + issuePublishCredentials(input: { + mediaSessionId: string; + cameraDeviceId: string; + ownerUserId: string; + }): Promise; + issueSubscribeCredentials(input: { + mediaSessionId: string; + viewerDeviceId: string; + ownerUserId: string; + }): Promise; +} diff --git a/Backend/routes/streams.ts b/Backend/routes/streams.ts index 00691e9..fae2fd8 100644 --- a/Backend/routes/streams.ts +++ b/Backend/routes/streams.ts @@ -6,9 +6,9 @@ import { z } from 'zod'; import { db } from '../db/client'; import { deviceCommands, deviceLinks, devices, streamSessions } from '../db/schema'; +import { mediaProvider } from '../media/service'; import { requireDeviceAuth } from '../middleware/device-auth'; import { dispatchCommandById, sendRealtimeToDevice } from '../realtime/gateway'; -import { createStreamPlaybackToken } from '../utils/stream-token'; const router = Router(); @@ -36,6 +36,35 @@ const listSchema = z.object({ limit: z.coerce.number().int().min(1).max(100).default(25), }); +router.get('/me/list', requireDeviceAuth, async (req, res) => { + const parsed = listSchema.safeParse(req.query); + + if (!parsed.success) { + res.status(400).json({ message: 'Invalid query params', errors: parsed.error.flatten() }); + return; + } + + const deviceAuth = req.deviceAuth; + + if (!deviceAuth) { + res.status(401).json({ message: 'Unauthorized' }); + return; + } + + const sessions = await db.query.streamSessions.findMany({ + where: and( + eq(streamSessions.ownerUserId, deviceAuth.userId), + or(eq(streamSessions.requesterDeviceId, deviceAuth.deviceId), eq(streamSessions.cameraDeviceId, deviceAuth.deviceId)), + ), + orderBy: [desc(streamSessions.createdAt)], + limit: parsed.data.limit, + }); + + const filtered = parsed.data.status ? sessions.filter((session) => session.status === parsed.data.status) : sessions; + + res.json({ count: filtered.length, streamSessions: filtered }); +}); + router.post('/request', requireDeviceAuth, async (req, res) => { const parsed = requestStreamSchema.safeParse(req.body ?? {}); @@ -100,6 +129,7 @@ router.post('/request', requireDeviceAuth, async (req, res) => { status: 'requested', reason: parsed.data.reason, metadata: parsed.data.metadata ?? null, + mediaProvider: mediaProvider.name, updatedAt: now, }) .returning(); @@ -192,12 +222,22 @@ router.post('/:streamSessionId/accept', requireDeviceAuth, async (req, res) => { const now = new Date(); const streamKey = parsed.data.streamKey ?? `stream_${session.id}_${randomUUID()}`; + const mediaSession = await mediaProvider.createSession({ + streamSessionId: session.id, + ownerUserId: session.ownerUserId, + cameraDeviceId: session.cameraDeviceId, + requesterDeviceId: session.requesterDeviceId, + }); const [updated] = await db .update(streamSessions) .set({ status: 'streaming', streamKey, + mediaProvider: mediaSession.provider, + mediaSessionId: mediaSession.mediaSessionId, + publishEndpoint: mediaSession.publishUrl, + subscribeEndpoint: mediaSession.subscribeUrl, metadata: parsed.data.metadata ?? session.metadata, startedAt: now, updatedAt: now, @@ -215,11 +255,103 @@ router.post('/:streamSessionId/accept', requireDeviceAuth, async (req, res) => { cameraDeviceId: updated.cameraDeviceId, status: updated.status, startedAt: updated.startedAt, + mediaProvider: updated.mediaProvider, + mediaSessionId: updated.mediaSessionId, + subscribeEndpoint: updated.subscribeEndpoint, }); res.json({ message: 'Stream accepted', streamSession: updated }); }); +router.get('/:streamSessionId/publish-credentials', requireDeviceAuth, async (req, res) => { + const parsedParams = streamParamSchema.safeParse(req.params); + + if (!parsedParams.success) { + res.status(400).json({ message: 'Invalid streamSessionId', errors: parsedParams.error.flatten() }); + return; + } + + const deviceAuth = req.deviceAuth; + + if (!deviceAuth) { + res.status(401).json({ message: 'Unauthorized' }); + return; + } + + const session = await db.query.streamSessions.findFirst({ + where: and(eq(streamSessions.id, parsedParams.data.streamSessionId), eq(streamSessions.ownerUserId, deviceAuth.userId)), + }); + + if (!session) { + res.status(404).json({ message: 'Stream session not found' }); + return; + } + + if (session.cameraDeviceId !== deviceAuth.deviceId) { + res.status(403).json({ message: 'Only camera device can request publish credentials' }); + return; + } + + if (!session.mediaSessionId || session.status !== 'streaming') { + res.status(409).json({ message: 'Stream session is not ready for publish credentials' }); + return; + } + + const credentials = await mediaProvider.issuePublishCredentials({ + mediaSessionId: session.mediaSessionId, + cameraDeviceId: session.cameraDeviceId, + ownerUserId: session.ownerUserId, + }); + + res.json(credentials); +}); + +router.get('/:streamSessionId/subscribe-credentials', requireDeviceAuth, async (req, res) => { + const parsedParams = streamParamSchema.safeParse(req.params); + + if (!parsedParams.success) { + res.status(400).json({ message: 'Invalid streamSessionId', errors: parsedParams.error.flatten() }); + return; + } + + const deviceAuth = req.deviceAuth; + + if (!deviceAuth) { + res.status(401).json({ message: 'Unauthorized' }); + return; + } + + const session = await db.query.streamSessions.findFirst({ + where: and(eq(streamSessions.id, parsedParams.data.streamSessionId), eq(streamSessions.ownerUserId, deviceAuth.userId)), + }); + + if (!session) { + res.status(404).json({ message: 'Stream session not found' }); + return; + } + + const isRequester = session.requesterDeviceId === deviceAuth.deviceId; + const isCamera = session.cameraDeviceId === deviceAuth.deviceId; + + if (!isRequester && !isCamera) { + res.status(403).json({ message: 'Device cannot request subscribe credentials for this stream' }); + return; + } + + if (!session.mediaSessionId || session.status !== 'streaming') { + res.status(409).json({ message: 'Stream is not active yet' }); + return; + } + + const credentials = await mediaProvider.issueSubscribeCredentials({ + mediaSessionId: session.mediaSessionId, + viewerDeviceId: deviceAuth.deviceId, + ownerUserId: session.ownerUserId, + }); + + res.json(credentials); +}); + router.post('/:streamSessionId/end', requireDeviceAuth, async (req, res) => { const parsedParams = streamParamSchema.safeParse(req.params); @@ -317,53 +449,26 @@ router.get('/:streamSessionId/playback-token', requireDeviceAuth, async (req, re return; } - if (!session.streamKey || session.status !== 'streaming') { + if (!session.streamKey || !session.mediaSessionId || session.status !== 'streaming') { res.status(409).json({ message: 'Stream is not active yet' }); return; } - const playbackToken = createStreamPlaybackToken({ - sessionId: session.id, + const credentials = await mediaProvider.issueSubscribeCredentials({ + mediaSessionId: session.mediaSessionId, viewerDeviceId: deviceAuth.deviceId, - userId: deviceAuth.userId, + ownerUserId: deviceAuth.userId, }); res.json({ streamSessionId: session.id, streamKey: session.streamKey, status: session.status, - playbackToken, - expiresInSeconds: 60 * 15, + playbackToken: credentials.subscribeToken, + subscribeUrl: credentials.subscribeUrl, + mediaProvider: credentials.provider, + expiresInSeconds: credentials.expiresInSeconds, }); }); -router.get('/me/list', requireDeviceAuth, async (req, res) => { - const parsed = listSchema.safeParse(req.query); - - if (!parsed.success) { - res.status(400).json({ message: 'Invalid query params', errors: parsed.error.flatten() }); - return; - } - - const deviceAuth = req.deviceAuth; - - if (!deviceAuth) { - res.status(401).json({ message: 'Unauthorized' }); - return; - } - - const sessions = await db.query.streamSessions.findMany({ - where: and( - eq(streamSessions.ownerUserId, deviceAuth.userId), - or(eq(streamSessions.requesterDeviceId, deviceAuth.deviceId), eq(streamSessions.cameraDeviceId, deviceAuth.deviceId)), - ), - orderBy: [desc(streamSessions.createdAt)], - limit: parsed.data.limit, - }); - - const filtered = parsed.data.status ? sessions.filter((session) => session.status === parsed.data.status) : sessions; - - res.json({ count: filtered.length, streamSessions: filtered }); -}); - export default router; diff --git a/Backend/utils/stream-token.ts b/Backend/utils/stream-token.ts deleted file mode 100644 index 7469313..0000000 --- a/Backend/utils/stream-token.ts +++ /dev/null @@ -1,31 +0,0 @@ -import { createHmac } from 'crypto'; - -type StreamPlaybackPayload = { - sessionId: string; - viewerDeviceId: string; - userId: string; - exp: number; -}; - -const secret = process.env.BETTER_AUTH_SECRET; - -if (!secret) { - throw new Error('BETTER_AUTH_SECRET is required for stream playback token signing'); -} - -const sign = (data: string): string => createHmac('sha256', secret).update(data).digest('base64url'); - -export const createStreamPlaybackToken = ( - payload: Omit, - ttlSeconds = 60 * 15, -): string => { - const body: StreamPlaybackPayload = { - ...payload, - exp: Math.floor(Date.now() / 1000) + ttlSeconds, - }; - - const encodedPayload = Buffer.from(JSON.stringify(body), 'utf8').toString('base64url'); - const signature = sign(encodedPayload); - - return `${encodedPayload}.${signature}`; -};