refactor(backend): split stream routes into focused modules
This commit is contained in:
135
Backend/routes/streams/accept.ts
Normal file
135
Backend/routes/streams/accept.ts
Normal file
@@ -0,0 +1,135 @@
|
||||
import { randomUUID } from 'crypto';
|
||||
import { eq } from 'drizzle-orm';
|
||||
import { Router } from 'express';
|
||||
|
||||
import { db } from '../../db/client';
|
||||
import { streamSessions } from '../../db/schema';
|
||||
import { createLiveMediaSession } from '../../media/service';
|
||||
import { sfuService } from '../../media/sfu/service';
|
||||
import { requireDeviceAuth } from '../../middleware/device-auth';
|
||||
import { sendRealtimeToDevice } from '../../realtime/gateway';
|
||||
import { createRecordingForStream } from '../../services/recordings';
|
||||
import { writeAuditLog } from '../../services/audit';
|
||||
import { enqueuePushNotification } from '../../services/push';
|
||||
import { createStreamStartedPayload, toSimpleStreamSessionResponse } from '../../streaming/simple';
|
||||
import { acceptStreamSchema, streamParamSchema } from './schemas';
|
||||
import { ensureStreamDeviceAuth, shouldCreateRecordingPlaceholder } from './shared';
|
||||
|
||||
const router = Router();
|
||||
|
||||
router.post('/:streamSessionId/accept', requireDeviceAuth, async (req, res) => {
|
||||
const parsedParams = streamParamSchema.safeParse(req.params);
|
||||
|
||||
if (!parsedParams.success) {
|
||||
res.status(400).json({ message: 'Invalid streamSessionId', errors: parsedParams.error.flatten() });
|
||||
return;
|
||||
}
|
||||
|
||||
const parsed = acceptStreamSchema.safeParse(req.body ?? {});
|
||||
|
||||
if (!parsed.success) {
|
||||
res.status(400).json({ message: 'Invalid request body', errors: parsed.error.flatten() });
|
||||
return;
|
||||
}
|
||||
|
||||
const deviceAuth = ensureStreamDeviceAuth(req, res);
|
||||
if (!deviceAuth) return;
|
||||
|
||||
const session = await db.query.streamSessions.findFirst({
|
||||
where: eq(streamSessions.id, parsedParams.data.streamSessionId),
|
||||
});
|
||||
|
||||
if (!session || session.ownerUserId !== deviceAuth.userId || session.cameraDeviceId !== deviceAuth.deviceId) {
|
||||
res.status(404).json({ message: 'Stream session not found for this camera device' });
|
||||
return;
|
||||
}
|
||||
|
||||
if (session.status !== 'requested' && session.status !== 'starting') {
|
||||
res.status(409).json({ message: `Stream session cannot be accepted from status ${session.status}` });
|
||||
return;
|
||||
}
|
||||
|
||||
const now = new Date();
|
||||
const streamKey = parsed.data.streamKey ?? `stream_${session.id}_${randomUUID()}`;
|
||||
const mediaSession = await createLiveMediaSession({
|
||||
streamSessionId: session.id,
|
||||
ownerUserId: session.ownerUserId,
|
||||
cameraDeviceId: session.cameraDeviceId,
|
||||
requesterDeviceId: session.requesterDeviceId,
|
||||
});
|
||||
|
||||
const [updated] = await db
|
||||
.update(streamSessions)
|
||||
.set({
|
||||
status: 'streaming',
|
||||
streamKey: mediaSession ? streamKey : null,
|
||||
mediaProvider: mediaSession?.provider ?? 'simple',
|
||||
mediaSessionId: mediaSession?.mediaSessionId ?? null,
|
||||
publishEndpoint: mediaSession?.publishUrl ?? null,
|
||||
subscribeEndpoint: mediaSession?.subscribeUrl ?? null,
|
||||
metadata: parsed.data.metadata ?? session.metadata,
|
||||
startedAt: now,
|
||||
updatedAt: now,
|
||||
})
|
||||
.where(eq(streamSessions.id, session.id))
|
||||
.returning();
|
||||
|
||||
if (!updated) {
|
||||
res.status(500).json({ message: 'Failed to update stream session' });
|
||||
return;
|
||||
}
|
||||
|
||||
if (shouldCreateRecordingPlaceholder()) {
|
||||
await createRecordingForStream(updated.id);
|
||||
}
|
||||
|
||||
if (sfuService) {
|
||||
try {
|
||||
await sfuService.startSession({
|
||||
streamSessionId: updated.id,
|
||||
ownerUserId: updated.ownerUserId,
|
||||
cameraDeviceId: updated.cameraDeviceId,
|
||||
requesterDeviceId: updated.requesterDeviceId,
|
||||
});
|
||||
await sfuService.setSessionState(updated.id, 'live');
|
||||
} catch (error) {
|
||||
console.error('Failed starting SFU session', error);
|
||||
res.status(500).json({ message: 'Failed to initialize SFU session' });
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
const startedPayload = createStreamStartedPayload(updated);
|
||||
console.info('[stream.accept]', {
|
||||
streamSessionId: updated.id,
|
||||
requesterDeviceId: updated.requesterDeviceId,
|
||||
cameraDeviceId: updated.cameraDeviceId,
|
||||
mode: mediaSession ? 'legacy' : 'simple',
|
||||
});
|
||||
const deliveredToRequester = sendRealtimeToDevice(session.requesterDeviceId, 'stream:started', startedPayload);
|
||||
|
||||
if (!deliveredToRequester) {
|
||||
await enqueuePushNotification({
|
||||
ownerUserId: session.ownerUserId,
|
||||
recipientDeviceId: session.requesterDeviceId,
|
||||
type: 'stream_started',
|
||||
payload: startedPayload,
|
||||
});
|
||||
}
|
||||
|
||||
res.json({ message: 'Stream accepted', streamSession: toSimpleStreamSessionResponse(updated) });
|
||||
|
||||
await writeAuditLog({
|
||||
ownerUserId: session.ownerUserId,
|
||||
actorDeviceId: session.cameraDeviceId,
|
||||
action: 'stream.accepted',
|
||||
targetType: 'stream_session',
|
||||
targetId: session.id,
|
||||
metadata: mediaSession
|
||||
? { mediaSessionId: updated.mediaSessionId, mediaProvider: updated.mediaProvider }
|
||||
: { transport: 'webrtc' },
|
||||
ipAddress: req.ip,
|
||||
});
|
||||
});
|
||||
|
||||
export default router;
|
||||
Reference in New Issue
Block a user