feat(media): add phase5 media provider abstraction and stream credentials APIs

This commit is contained in:
2026-01-19 16:55:00 +00:00
parent f66b5ad15d
commit b800baefb2
8 changed files with 311 additions and 67 deletions

View File

@@ -65,6 +65,10 @@ export const streamSessions = pgTable('stream_sessions', {
requesterDeviceId: uuid('requester_device_id').notNull().references(() => devices.id), requesterDeviceId: uuid('requester_device_id').notNull().references(() => devices.id),
status: varchar('status', { length: 32 }).default('requested').notNull(), status: varchar('status', { length: 32 }).default('requested').notNull(),
reason: varchar('reason', { length: 32 }).default('on_demand').notNull(), reason: varchar('reason', { length: 32 }).default('on_demand').notNull(),
mediaProvider: varchar('media_provider', { length: 32 }).default('mock').notNull(),
mediaSessionId: varchar('media_session_id', { length: 255 }),
publishEndpoint: text('publish_endpoint'),
subscribeEndpoint: text('subscribe_endpoint'),
streamKey: varchar('stream_key', { length: 255 }), streamKey: varchar('stream_key', { length: 255 }),
startedAt: timestamp('started_at', { withTimezone: true }), startedAt: timestamp('started_at', { withTimezone: true }),
endedAt: timestamp('ended_at', { withTimezone: true }), endedAt: timestamp('ended_at', { withTimezone: true }),

View File

@@ -0,0 +1,4 @@
ALTER TABLE "stream_sessions" ADD COLUMN "media_provider" varchar(32) DEFAULT 'mock' NOT NULL;--> statement-breakpoint
ALTER TABLE "stream_sessions" ADD COLUMN "media_session_id" varchar(255);--> statement-breakpoint
ALTER TABLE "stream_sessions" ADD COLUMN "publish_endpoint" text;--> statement-breakpoint
ALTER TABLE "stream_sessions" ADD COLUMN "subscribe_endpoint" text;

View File

@@ -57,6 +57,13 @@
"when": 1770414956419, "when": 1770414956419,
"tag": "0007_live_stream_sessions", "tag": "0007_live_stream_sessions",
"breakpoints": true "breakpoints": true
},
{
"idx": 8,
"version": "7",
"when": 1770415956419,
"tag": "0008_media_plane_columns",
"breakpoints": true
} }
] ]
} }

View File

@@ -0,0 +1,96 @@
import { createHmac } from 'crypto';
import type {
MediaProvider,
MediaPublishCredentials,
MediaSessionCreateInput,
MediaSessionCreateResult,
MediaSubscribeCredentials,
} from '../types';
const secret = process.env.BETTER_AUTH_SECRET;
if (!secret) {
throw new Error('BETTER_AUTH_SECRET is required for mock media provider token signing');
}
const DEFAULT_TTL_SECONDS = 60 * 10;
const signToken = (payload: Record<string, unknown>, ttlSeconds = DEFAULT_TTL_SECONDS): { token: string; expiresInSeconds: number } => {
const body = {
...payload,
exp: Math.floor(Date.now() / 1000) + ttlSeconds,
};
const encoded = Buffer.from(JSON.stringify(body), 'utf8').toString('base64url');
const signature = createHmac('sha256', secret).update(encoded).digest('base64url');
return {
token: `${encoded}.${signature}`,
expiresInSeconds: ttlSeconds,
};
};
const getBaseUrl = (): string => process.env.MEDIA_MOCK_BASE_URL ?? process.env.BETTER_AUTH_URL ?? 'http://localhost:3000';
export class MockMediaProvider implements MediaProvider {
name = 'mock';
async createSession(input: MediaSessionCreateInput): Promise<MediaSessionCreateResult> {
const mediaSessionId = `mock_${input.streamSessionId}`;
const baseUrl = getBaseUrl();
return {
provider: this.name,
mediaSessionId,
publishUrl: `${baseUrl}/media/mock/publish/${mediaSessionId}`,
subscribeUrl: `${baseUrl}/media/mock/subscribe/${mediaSessionId}`,
};
}
async issuePublishCredentials(input: {
mediaSessionId: string;
cameraDeviceId: string;
ownerUserId: string;
}): Promise<MediaPublishCredentials> {
const baseUrl = getBaseUrl();
const { token, expiresInSeconds } = signToken({
typ: 'publish',
provider: this.name,
mediaSessionId: input.mediaSessionId,
cameraDeviceId: input.cameraDeviceId,
ownerUserId: input.ownerUserId,
});
return {
provider: this.name,
mediaSessionId: input.mediaSessionId,
publishToken: token,
publishUrl: `${baseUrl}/media/mock/publish/${input.mediaSessionId}`,
expiresInSeconds,
};
}
async issueSubscribeCredentials(input: {
mediaSessionId: string;
viewerDeviceId: string;
ownerUserId: string;
}): Promise<MediaSubscribeCredentials> {
const baseUrl = getBaseUrl();
const { token, expiresInSeconds } = signToken({
typ: 'subscribe',
provider: this.name,
mediaSessionId: input.mediaSessionId,
viewerDeviceId: input.viewerDeviceId,
ownerUserId: input.ownerUserId,
});
return {
provider: this.name,
mediaSessionId: input.mediaSessionId,
subscribeToken: token,
subscribeUrl: `${baseUrl}/media/mock/subscribe/${input.mediaSessionId}`,
expiresInSeconds,
};
}
}

15
Backend/media/service.ts Normal file
View File

@@ -0,0 +1,15 @@
import { MockMediaProvider } from './providers/mock';
import type { MediaProvider } from './types';
const providerName = (process.env.MEDIA_PROVIDER ?? 'mock').toLowerCase();
const createProvider = (): MediaProvider => {
switch (providerName) {
case 'mock':
return new MockMediaProvider();
default:
throw new Error(`Unsupported MEDIA_PROVIDER: ${providerName}`);
}
};
export const mediaProvider = createProvider();

44
Backend/media/types.ts Normal file
View File

@@ -0,0 +1,44 @@
export type MediaSessionCreateInput = {
streamSessionId: string;
ownerUserId: string;
cameraDeviceId: string;
requesterDeviceId: string;
};
export type MediaPublishCredentials = {
provider: string;
mediaSessionId: string;
publishToken: string;
publishUrl: string;
expiresInSeconds: number;
};
export type MediaSubscribeCredentials = {
provider: string;
mediaSessionId: string;
subscribeToken: string;
subscribeUrl: string;
expiresInSeconds: number;
};
export type MediaSessionCreateResult = {
provider: string;
mediaSessionId: string;
publishUrl: string;
subscribeUrl: string;
};
export interface MediaProvider {
name: string;
createSession(input: MediaSessionCreateInput): Promise<MediaSessionCreateResult>;
issuePublishCredentials(input: {
mediaSessionId: string;
cameraDeviceId: string;
ownerUserId: string;
}): Promise<MediaPublishCredentials>;
issueSubscribeCredentials(input: {
mediaSessionId: string;
viewerDeviceId: string;
ownerUserId: string;
}): Promise<MediaSubscribeCredentials>;
}

View File

@@ -6,9 +6,9 @@ import { z } from 'zod';
import { db } from '../db/client'; import { db } from '../db/client';
import { deviceCommands, deviceLinks, devices, streamSessions } from '../db/schema'; import { deviceCommands, deviceLinks, devices, streamSessions } from '../db/schema';
import { mediaProvider } from '../media/service';
import { requireDeviceAuth } from '../middleware/device-auth'; import { requireDeviceAuth } from '../middleware/device-auth';
import { dispatchCommandById, sendRealtimeToDevice } from '../realtime/gateway'; import { dispatchCommandById, sendRealtimeToDevice } from '../realtime/gateway';
import { createStreamPlaybackToken } from '../utils/stream-token';
const router = Router(); const router = Router();
@@ -36,6 +36,35 @@ const listSchema = z.object({
limit: z.coerce.number().int().min(1).max(100).default(25), limit: z.coerce.number().int().min(1).max(100).default(25),
}); });
router.get('/me/list', requireDeviceAuth, async (req, res) => {
const parsed = listSchema.safeParse(req.query);
if (!parsed.success) {
res.status(400).json({ message: 'Invalid query params', errors: parsed.error.flatten() });
return;
}
const deviceAuth = req.deviceAuth;
if (!deviceAuth) {
res.status(401).json({ message: 'Unauthorized' });
return;
}
const sessions = await db.query.streamSessions.findMany({
where: and(
eq(streamSessions.ownerUserId, deviceAuth.userId),
or(eq(streamSessions.requesterDeviceId, deviceAuth.deviceId), eq(streamSessions.cameraDeviceId, deviceAuth.deviceId)),
),
orderBy: [desc(streamSessions.createdAt)],
limit: parsed.data.limit,
});
const filtered = parsed.data.status ? sessions.filter((session) => session.status === parsed.data.status) : sessions;
res.json({ count: filtered.length, streamSessions: filtered });
});
router.post('/request', requireDeviceAuth, async (req, res) => { router.post('/request', requireDeviceAuth, async (req, res) => {
const parsed = requestStreamSchema.safeParse(req.body ?? {}); const parsed = requestStreamSchema.safeParse(req.body ?? {});
@@ -100,6 +129,7 @@ router.post('/request', requireDeviceAuth, async (req, res) => {
status: 'requested', status: 'requested',
reason: parsed.data.reason, reason: parsed.data.reason,
metadata: parsed.data.metadata ?? null, metadata: parsed.data.metadata ?? null,
mediaProvider: mediaProvider.name,
updatedAt: now, updatedAt: now,
}) })
.returning(); .returning();
@@ -192,12 +222,22 @@ router.post('/:streamSessionId/accept', requireDeviceAuth, async (req, res) => {
const now = new Date(); const now = new Date();
const streamKey = parsed.data.streamKey ?? `stream_${session.id}_${randomUUID()}`; const streamKey = parsed.data.streamKey ?? `stream_${session.id}_${randomUUID()}`;
const mediaSession = await mediaProvider.createSession({
streamSessionId: session.id,
ownerUserId: session.ownerUserId,
cameraDeviceId: session.cameraDeviceId,
requesterDeviceId: session.requesterDeviceId,
});
const [updated] = await db const [updated] = await db
.update(streamSessions) .update(streamSessions)
.set({ .set({
status: 'streaming', status: 'streaming',
streamKey, streamKey,
mediaProvider: mediaSession.provider,
mediaSessionId: mediaSession.mediaSessionId,
publishEndpoint: mediaSession.publishUrl,
subscribeEndpoint: mediaSession.subscribeUrl,
metadata: parsed.data.metadata ?? session.metadata, metadata: parsed.data.metadata ?? session.metadata,
startedAt: now, startedAt: now,
updatedAt: now, updatedAt: now,
@@ -215,11 +255,103 @@ router.post('/:streamSessionId/accept', requireDeviceAuth, async (req, res) => {
cameraDeviceId: updated.cameraDeviceId, cameraDeviceId: updated.cameraDeviceId,
status: updated.status, status: updated.status,
startedAt: updated.startedAt, startedAt: updated.startedAt,
mediaProvider: updated.mediaProvider,
mediaSessionId: updated.mediaSessionId,
subscribeEndpoint: updated.subscribeEndpoint,
}); });
res.json({ message: 'Stream accepted', streamSession: updated }); res.json({ message: 'Stream accepted', streamSession: updated });
}); });
router.get('/:streamSessionId/publish-credentials', requireDeviceAuth, async (req, res) => {
const parsedParams = streamParamSchema.safeParse(req.params);
if (!parsedParams.success) {
res.status(400).json({ message: 'Invalid streamSessionId', errors: parsedParams.error.flatten() });
return;
}
const deviceAuth = req.deviceAuth;
if (!deviceAuth) {
res.status(401).json({ message: 'Unauthorized' });
return;
}
const session = await db.query.streamSessions.findFirst({
where: and(eq(streamSessions.id, parsedParams.data.streamSessionId), eq(streamSessions.ownerUserId, deviceAuth.userId)),
});
if (!session) {
res.status(404).json({ message: 'Stream session not found' });
return;
}
if (session.cameraDeviceId !== deviceAuth.deviceId) {
res.status(403).json({ message: 'Only camera device can request publish credentials' });
return;
}
if (!session.mediaSessionId || session.status !== 'streaming') {
res.status(409).json({ message: 'Stream session is not ready for publish credentials' });
return;
}
const credentials = await mediaProvider.issuePublishCredentials({
mediaSessionId: session.mediaSessionId,
cameraDeviceId: session.cameraDeviceId,
ownerUserId: session.ownerUserId,
});
res.json(credentials);
});
router.get('/:streamSessionId/subscribe-credentials', requireDeviceAuth, async (req, res) => {
const parsedParams = streamParamSchema.safeParse(req.params);
if (!parsedParams.success) {
res.status(400).json({ message: 'Invalid streamSessionId', errors: parsedParams.error.flatten() });
return;
}
const deviceAuth = req.deviceAuth;
if (!deviceAuth) {
res.status(401).json({ message: 'Unauthorized' });
return;
}
const session = await db.query.streamSessions.findFirst({
where: and(eq(streamSessions.id, parsedParams.data.streamSessionId), eq(streamSessions.ownerUserId, deviceAuth.userId)),
});
if (!session) {
res.status(404).json({ message: 'Stream session not found' });
return;
}
const isRequester = session.requesterDeviceId === deviceAuth.deviceId;
const isCamera = session.cameraDeviceId === deviceAuth.deviceId;
if (!isRequester && !isCamera) {
res.status(403).json({ message: 'Device cannot request subscribe credentials for this stream' });
return;
}
if (!session.mediaSessionId || session.status !== 'streaming') {
res.status(409).json({ message: 'Stream is not active yet' });
return;
}
const credentials = await mediaProvider.issueSubscribeCredentials({
mediaSessionId: session.mediaSessionId,
viewerDeviceId: deviceAuth.deviceId,
ownerUserId: session.ownerUserId,
});
res.json(credentials);
});
router.post('/:streamSessionId/end', requireDeviceAuth, async (req, res) => { router.post('/:streamSessionId/end', requireDeviceAuth, async (req, res) => {
const parsedParams = streamParamSchema.safeParse(req.params); const parsedParams = streamParamSchema.safeParse(req.params);
@@ -317,53 +449,26 @@ router.get('/:streamSessionId/playback-token', requireDeviceAuth, async (req, re
return; return;
} }
if (!session.streamKey || session.status !== 'streaming') { if (!session.streamKey || !session.mediaSessionId || session.status !== 'streaming') {
res.status(409).json({ message: 'Stream is not active yet' }); res.status(409).json({ message: 'Stream is not active yet' });
return; return;
} }
const playbackToken = createStreamPlaybackToken({ const credentials = await mediaProvider.issueSubscribeCredentials({
sessionId: session.id, mediaSessionId: session.mediaSessionId,
viewerDeviceId: deviceAuth.deviceId, viewerDeviceId: deviceAuth.deviceId,
userId: deviceAuth.userId, ownerUserId: deviceAuth.userId,
}); });
res.json({ res.json({
streamSessionId: session.id, streamSessionId: session.id,
streamKey: session.streamKey, streamKey: session.streamKey,
status: session.status, status: session.status,
playbackToken, playbackToken: credentials.subscribeToken,
expiresInSeconds: 60 * 15, subscribeUrl: credentials.subscribeUrl,
mediaProvider: credentials.provider,
expiresInSeconds: credentials.expiresInSeconds,
}); });
}); });
router.get('/me/list', requireDeviceAuth, async (req, res) => {
const parsed = listSchema.safeParse(req.query);
if (!parsed.success) {
res.status(400).json({ message: 'Invalid query params', errors: parsed.error.flatten() });
return;
}
const deviceAuth = req.deviceAuth;
if (!deviceAuth) {
res.status(401).json({ message: 'Unauthorized' });
return;
}
const sessions = await db.query.streamSessions.findMany({
where: and(
eq(streamSessions.ownerUserId, deviceAuth.userId),
or(eq(streamSessions.requesterDeviceId, deviceAuth.deviceId), eq(streamSessions.cameraDeviceId, deviceAuth.deviceId)),
),
orderBy: [desc(streamSessions.createdAt)],
limit: parsed.data.limit,
});
const filtered = parsed.data.status ? sessions.filter((session) => session.status === parsed.data.status) : sessions;
res.json({ count: filtered.length, streamSessions: filtered });
});
export default router; export default router;

View File

@@ -1,31 +0,0 @@
import { createHmac } from 'crypto';
type StreamPlaybackPayload = {
sessionId: string;
viewerDeviceId: string;
userId: string;
exp: number;
};
const secret = process.env.BETTER_AUTH_SECRET;
if (!secret) {
throw new Error('BETTER_AUTH_SECRET is required for stream playback token signing');
}
const sign = (data: string): string => createHmac('sha256', secret).update(data).digest('base64url');
export const createStreamPlaybackToken = (
payload: Omit<StreamPlaybackPayload, 'exp'>,
ttlSeconds = 60 * 15,
): string => {
const body: StreamPlaybackPayload = {
...payload,
exp: Math.floor(Date.now() / 1000) + ttlSeconds,
};
const encodedPayload = Buffer.from(JSON.stringify(body), 'utf8').toString('base64url');
const signature = sign(encodedPayload);
return `${encodedPayload}.${signature}`;
};