Files

136 lines
4.7 KiB
TypeScript

import { randomUUID } from 'crypto';
import { eq } from 'drizzle-orm';
import { Router } from 'express';
import { db } from '../../db/client';
import { streamSessions } from '../../db/schema';
import { createLiveMediaSession } from '../../media/service';
import { sfuService } from '../../media/sfu/service';
import { requireDeviceAuth } from '../../middleware/device-auth';
import { sendRealtimeToDevice } from '../../realtime/gateway';
import { createRecordingForStream } from '../../services/recordings';
import { writeAuditLog } from '../../services/audit';
import { enqueuePushNotification } from '../../services/push';
import { createStreamStartedPayload, toSimpleStreamSessionResponse } from '../../streaming/simple';
import { acceptStreamSchema, streamParamSchema } from './schemas';
import { ensureStreamDeviceAuth, shouldCreateRecordingPlaceholder } from './shared';
const router = Router();
router.post('/:streamSessionId/accept', requireDeviceAuth, async (req, res) => {
const parsedParams = streamParamSchema.safeParse(req.params);
if (!parsedParams.success) {
res.status(400).json({ message: 'Invalid streamSessionId', errors: parsedParams.error.flatten() });
return;
}
const parsed = acceptStreamSchema.safeParse(req.body ?? {});
if (!parsed.success) {
res.status(400).json({ message: 'Invalid request body', errors: parsed.error.flatten() });
return;
}
const deviceAuth = ensureStreamDeviceAuth(req, res);
if (!deviceAuth) return;
const session = await db.query.streamSessions.findFirst({
where: eq(streamSessions.id, parsedParams.data.streamSessionId),
});
if (!session || session.ownerUserId !== deviceAuth.userId || session.cameraDeviceId !== deviceAuth.deviceId) {
res.status(404).json({ message: 'Stream session not found for this camera device' });
return;
}
if (session.status !== 'requested' && session.status !== 'starting') {
res.status(409).json({ message: `Stream session cannot be accepted from status ${session.status}` });
return;
}
const now = new Date();
const streamKey = parsed.data.streamKey ?? `stream_${session.id}_${randomUUID()}`;
const mediaSession = await createLiveMediaSession({
streamSessionId: session.id,
ownerUserId: session.ownerUserId,
cameraDeviceId: session.cameraDeviceId,
requesterDeviceId: session.requesterDeviceId,
});
const [updated] = await db
.update(streamSessions)
.set({
status: 'streaming',
streamKey: mediaSession ? streamKey : null,
mediaProvider: mediaSession?.provider ?? 'simple',
mediaSessionId: mediaSession?.mediaSessionId ?? null,
publishEndpoint: mediaSession?.publishUrl ?? null,
subscribeEndpoint: mediaSession?.subscribeUrl ?? null,
metadata: parsed.data.metadata ?? session.metadata,
startedAt: now,
updatedAt: now,
})
.where(eq(streamSessions.id, session.id))
.returning();
if (!updated) {
res.status(500).json({ message: 'Failed to update stream session' });
return;
}
if (shouldCreateRecordingPlaceholder()) {
await createRecordingForStream(updated.id);
}
if (sfuService) {
try {
await sfuService.startSession({
streamSessionId: updated.id,
ownerUserId: updated.ownerUserId,
cameraDeviceId: updated.cameraDeviceId,
requesterDeviceId: updated.requesterDeviceId,
});
await sfuService.setSessionState(updated.id, 'live');
} catch (error) {
console.error('Failed starting SFU session', error);
res.status(500).json({ message: 'Failed to initialize SFU session' });
return;
}
}
const startedPayload = createStreamStartedPayload(updated);
console.info('[stream.accept]', {
streamSessionId: updated.id,
requesterDeviceId: updated.requesterDeviceId,
cameraDeviceId: updated.cameraDeviceId,
mode: mediaSession ? 'legacy' : 'simple',
});
const deliveredToRequester = sendRealtimeToDevice(session.requesterDeviceId, 'stream:started', startedPayload);
if (!deliveredToRequester) {
await enqueuePushNotification({
ownerUserId: session.ownerUserId,
recipientDeviceId: session.requesterDeviceId,
type: 'stream_started',
payload: startedPayload,
});
}
res.json({ message: 'Stream accepted', streamSession: toSimpleStreamSessionResponse(updated) });
await writeAuditLog({
ownerUserId: session.ownerUserId,
actorDeviceId: session.cameraDeviceId,
action: 'stream.accepted',
targetType: 'stream_session',
targetId: session.id,
metadata: mediaSession
? { mediaSessionId: updated.mediaSessionId, mediaProvider: updated.mediaProvider }
: { transport: 'webrtc' },
ipAddress: req.ip,
});
});
export default router;