diff --git a/Backend/index.ts b/Backend/index.ts index cc1962e..f50b0cf 100644 --- a/Backend/index.ts +++ b/Backend/index.ts @@ -1,4 +1,5 @@ import express from 'express'; +import { createServer } from 'http'; import { toNodeHandler } from 'better-auth/node'; import swaggerUi from 'swagger-ui-express'; @@ -8,6 +9,8 @@ import videosRoutes from './routes/videos'; import adminRoutes from './routes/admin'; import devicesRoutes from './routes/devices'; import deviceLinksRoutes from './routes/device-links'; +import commandsRoutes from './routes/commands'; +import { setupRealtimeGateway } from './realtime/gateway'; import { ensureMinioBucket } from './utils/minio'; const app = express(); @@ -30,6 +33,7 @@ app.use('/videos', videosRoutes); app.use('/admin', adminRoutes); app.use('/devices', devicesRoutes); app.use('/device-links', deviceLinksRoutes); +app.use('/commands', commandsRoutes); app.use((err: unknown, _req: express.Request, res: express.Response, _next: express.NextFunction) => { console.error(err); @@ -37,6 +41,7 @@ app.use((err: unknown, _req: express.Request, res: express.Response, _next: expr }); const port = Number(process.env.PORT ?? 3000); +const server = createServer(app); const start = async () => { try { @@ -46,7 +51,9 @@ const start = async () => { process.exit(1); } - app.listen(port, () => { + setupRealtimeGateway(server); + + server.listen(port, () => { console.log(`Server is running on port ${port}`); }); }; diff --git a/Backend/realtime/gateway.ts b/Backend/realtime/gateway.ts new file mode 100644 index 0000000..4047eca --- /dev/null +++ b/Backend/realtime/gateway.ts @@ -0,0 +1,292 @@ +import { and, eq, lt } from 'drizzle-orm'; +import { Server as HttpServer } from 'http'; +import { Server as SocketIOServer } from 'socket.io'; +import { z } from 'zod'; + +import { db } from '../db/client'; +import { deviceCommands, devices } from '../db/schema'; +import { verifyDeviceToken } from '../utils/device-token'; + +const HEARTBEAT_INTERVAL_MS = 15_000; +const RETRY_INTERVAL_MS = 5_000; +const RETRY_DISPATCH_DELAY_MS = 10_000; +const MAX_RETRIES = 3; + +const commandAckSchema = z.object({ + commandId: z.string().uuid(), + status: z.enum(['acknowledged', 'rejected']), + error: z.string().optional(), +}); + +const roomForDevice = (deviceId: string): string => `device:${deviceId}`; + +let io: SocketIOServer | null = null; +let retryTimer: ReturnType | null = null; + +const countSocketsForDevice = (deviceId: string): number => { + if (!io) { + return 0; + } + + return io.sockets.adapter.rooms.get(roomForDevice(deviceId))?.size ?? 0; +}; + +const markDevicePresence = async (deviceId: string, status: 'online' | 'offline') => { + const now = new Date(); + + await db + .update(devices) + .set({ + status, + lastSeenAt: now, + updatedAt: now, + }) + .where(eq(devices.id, deviceId)); +}; + +const emitCommand = (command: { + id: string; + sourceDeviceId: string; + targetDeviceId: string; + commandType: string; + payload: Record | null; + createdAt: Date; +}): boolean => { + if (!io) { + return false; + } + + if (countSocketsForDevice(command.targetDeviceId) === 0) { + return false; + } + + io.to(roomForDevice(command.targetDeviceId)).emit('command:received', { + commandId: command.id, + sourceDeviceId: command.sourceDeviceId, + commandType: command.commandType, + payload: command.payload, + createdAt: command.createdAt, + }); + + return true; +}; + +export const dispatchCommandById = async (commandId: string): Promise => { + const command = await db.query.deviceCommands.findFirst({ + where: eq(deviceCommands.id, commandId), + }); + + if (!command) { + return; + } + + const now = new Date(); + + const delivered = emitCommand({ + id: command.id, + sourceDeviceId: command.sourceDeviceId, + targetDeviceId: command.targetDeviceId, + commandType: command.commandType, + payload: command.payload, + createdAt: command.createdAt, + }); + + await db + .update(deviceCommands) + .set({ + status: delivered ? 'sent' : 'queued', + lastDispatchedAt: now, + retryCount: delivered ? command.retryCount : command.retryCount, + updatedAt: now, + error: delivered ? null : 'target device offline', + }) + .where(eq(deviceCommands.id, command.id)); +}; + +const retryPendingCommands = async () => { + const threshold = new Date(Date.now() - RETRY_DISPATCH_DELAY_MS); + + const pending = await db.query.deviceCommands.findMany({ + where: and(eq(deviceCommands.status, 'sent'), lt(deviceCommands.lastDispatchedAt, threshold)), + limit: 100, + orderBy: (fields, operators) => [operators.asc(fields.createdAt)], + }); + + for (const command of pending) { + const now = new Date(); + const nextRetryCount = command.retryCount + 1; + + if (nextRetryCount > MAX_RETRIES) { + await db + .update(deviceCommands) + .set({ + status: 'failed', + updatedAt: now, + error: 'max retries exceeded', + }) + .where(eq(deviceCommands.id, command.id)); + continue; + } + + const delivered = emitCommand({ + id: command.id, + sourceDeviceId: command.sourceDeviceId, + targetDeviceId: command.targetDeviceId, + commandType: command.commandType, + payload: command.payload, + createdAt: command.createdAt, + }); + + await db + .update(deviceCommands) + .set({ + status: delivered ? 'sent' : 'queued', + lastDispatchedAt: now, + retryCount: nextRetryCount, + updatedAt: now, + error: delivered ? null : 'target device offline', + }) + .where(eq(deviceCommands.id, command.id)); + } +}; + +export const setupRealtimeGateway = (server: HttpServer): SocketIOServer => { + if (io) { + return io; + } + + io = new SocketIOServer(server, { + cors: { + origin: process.env.BETTER_AUTH_TRUSTED_ORIGINS?.split(',').map((origin) => origin.trim()).filter(Boolean) ?? true, + credentials: true, + }, + pingInterval: HEARTBEAT_INTERVAL_MS, + }); + + io.use(async (socket, next) => { + try { + const tokenFromAuth = typeof socket.handshake.auth?.token === 'string' ? socket.handshake.auth.token : null; + const authorization = socket.handshake.headers.authorization; + const tokenFromHeader = + typeof authorization === 'string' && authorization.startsWith('Bearer ') + ? authorization.slice('Bearer '.length).trim() + : null; + + const token = tokenFromAuth ?? tokenFromHeader; + + if (!token) { + next(new Error('Missing device token')); + return; + } + + const payload = verifyDeviceToken(token); + + if (!payload) { + next(new Error('Invalid device token')); + return; + } + + const device = await db.query.devices.findFirst({ + where: and(eq(devices.id, payload.deviceId), eq(devices.userId, payload.userId)), + }); + + if (!device) { + next(new Error('Device not found')); + return; + } + + if (device.role !== payload.role) { + next(new Error('Token role does not match device role')); + return; + } + + socket.data.deviceAuth = payload; + next(); + } catch (error) { + next(error instanceof Error ? error : new Error('Authentication failed')); + } + }); + + io.on('connection', async (socket) => { + const auth = socket.data.deviceAuth as { userId: string; deviceId: string; role: 'camera' | 'client' }; + const deviceRoom = roomForDevice(auth.deviceId); + + socket.join(deviceRoom); + await markDevicePresence(auth.deviceId, 'online'); + + socket.emit('connected', { + deviceId: auth.deviceId, + role: auth.role, + serverTime: new Date().toISOString(), + }); + + socket.on('heartbeat', async () => { + await markDevicePresence(auth.deviceId, 'online'); + socket.emit('heartbeat:ack', { at: new Date().toISOString() }); + }); + + socket.on('command:ack', async (input) => { + const parsed = commandAckSchema.safeParse(input); + + if (!parsed.success) { + socket.emit('error:command_ack', { + message: 'Invalid command ack payload', + errors: parsed.error.flatten(), + }); + return; + } + + const command = await db.query.deviceCommands.findFirst({ + where: eq(deviceCommands.id, parsed.data.commandId), + }); + + if (!command) { + socket.emit('error:command_ack', { message: 'Command not found' }); + return; + } + + if (command.targetDeviceId !== auth.deviceId) { + socket.emit('error:command_ack', { message: 'Not authorized to ack this command' }); + return; + } + + const now = new Date(); + + await db + .update(deviceCommands) + .set({ + status: parsed.data.status, + acknowledgedAt: now, + updatedAt: now, + error: parsed.data.status === 'rejected' ? parsed.data.error ?? 'rejected' : null, + }) + .where(eq(deviceCommands.id, command.id)); + + io?.to(roomForDevice(command.sourceDeviceId)).emit('command:status', { + commandId: command.id, + status: parsed.data.status, + acknowledgedAt: now.toISOString(), + error: parsed.data.error, + }); + }); + + socket.on('disconnect', async () => { + // Small delay allows fast reconnects to reuse presence without flapping. + setTimeout(async () => { + if (countSocketsForDevice(auth.deviceId) === 0) { + await markDevicePresence(auth.deviceId, 'offline'); + } + }, 500); + }); + }); + + if (!retryTimer) { + retryTimer = setInterval(() => { + retryPendingCommands().catch((error) => { + console.error('Failed retrying pending commands', error); + }); + }, RETRY_INTERVAL_MS); + } + + return io; +}; diff --git a/Backend/routes/commands.ts b/Backend/routes/commands.ts new file mode 100644 index 0000000..26f2327 --- /dev/null +++ b/Backend/routes/commands.ts @@ -0,0 +1,212 @@ +import { and, desc, eq } from 'drizzle-orm'; +import { Router } from 'express'; +import { z } from 'zod'; + +import { db } from '../db/client'; +import { deviceCommands, deviceLinks, devices } from '../db/schema'; +import { requireAuth } from '../middleware/auth'; +import { requireDeviceAuth } from '../middleware/device-auth'; +import { dispatchCommandById } from '../realtime/gateway'; + +const router = Router(); + +const commandCreateSchema = z.object({ + sourceDeviceId: z.string().uuid(), + targetDeviceId: z.string().uuid(), + commandType: z.enum(['start_stream', 'stop_stream', 'ping', 'update_settings']), + payload: z.record(z.string(), z.unknown()).optional(), +}); + +const ackSchema = z.object({ + status: z.enum(['acknowledged', 'rejected']), + error: z.string().optional(), +}); +const commandParamSchema = z.object({ + commandId: z.string().uuid(), +}); + +const querySchema = z.object({ + sourceDeviceId: z.string().uuid().optional(), + targetDeviceId: z.string().uuid().optional(), + status: z.string().optional(), + limit: z.coerce.number().int().min(1).max(100).default(25), +}); + +router.post('/', requireAuth, async (req, res) => { + const parsed = commandCreateSchema.safeParse(req.body); + + if (!parsed.success) { + res.status(400).json({ message: 'Invalid request body', errors: parsed.error.flatten() }); + return; + } + + const authSession = req.auth; + + if (!authSession?.user?.id) { + res.status(401).json({ message: 'Unauthorized' }); + return; + } + + if (parsed.data.sourceDeviceId === parsed.data.targetDeviceId) { + res.status(400).json({ message: 'sourceDeviceId and targetDeviceId must differ' }); + return; + } + + const [sourceDevice, targetDevice] = await Promise.all([ + db.query.devices.findFirst({ + where: and(eq(devices.id, parsed.data.sourceDeviceId), eq(devices.userId, authSession.user.id)), + }), + db.query.devices.findFirst({ + where: and(eq(devices.id, parsed.data.targetDeviceId), eq(devices.userId, authSession.user.id)), + }), + ]); + + if (!sourceDevice || !targetDevice) { + res.status(400).json({ message: 'Both source and target devices must belong to the authenticated user' }); + return; + } + + const validRoleDirection = sourceDevice.role === 'client' && targetDevice.role === 'camera'; + + if (!validRoleDirection) { + res.status(400).json({ message: 'Commands are currently allowed only from client -> camera devices' }); + return; + } + + const link = await db.query.deviceLinks.findFirst({ + where: and( + eq(deviceLinks.ownerUserId, authSession.user.id), + eq(deviceLinks.cameraDeviceId, targetDevice.id), + eq(deviceLinks.clientDeviceId, sourceDevice.id), + eq(deviceLinks.status, 'active'), + ), + }); + + if (!link) { + res.status(403).json({ message: 'No active camera-client link found for command routing' }); + return; + } + + const now = new Date(); + + const [command] = await db + .insert(deviceCommands) + .values({ + ownerUserId: authSession.user.id, + sourceDeviceId: sourceDevice.id, + targetDeviceId: targetDevice.id, + commandType: parsed.data.commandType, + payload: parsed.data.payload ?? null, + status: 'queued', + lastDispatchedAt: now, + updatedAt: now, + }) + .returning(); + + if (!command) { + res.status(500).json({ message: 'Failed creating command' }); + return; + } + + await dispatchCommandById(command.id); + + const refreshed = await db.query.deviceCommands.findFirst({ where: eq(deviceCommands.id, command.id) }); + + res.status(201).json({ + message: 'Command queued', + command: refreshed ?? command, + }); +}); + +router.get('/', requireAuth, async (req, res) => { + const parsed = querySchema.safeParse(req.query); + + if (!parsed.success) { + res.status(400).json({ message: 'Invalid query params', errors: parsed.error.flatten() }); + return; + } + + const authSession = req.auth; + + if (!authSession?.user?.id) { + res.status(401).json({ message: 'Unauthorized' }); + return; + } + + const commands = await db.query.deviceCommands.findMany({ + where: eq(deviceCommands.ownerUserId, authSession.user.id), + orderBy: [desc(deviceCommands.createdAt)], + limit: parsed.data.limit, + }); + + const filtered = commands.filter((command) => { + if (parsed.data.sourceDeviceId && command.sourceDeviceId !== parsed.data.sourceDeviceId) { + return false; + } + + if (parsed.data.targetDeviceId && command.targetDeviceId !== parsed.data.targetDeviceId) { + return false; + } + + if (parsed.data.status && command.status !== parsed.data.status) { + return false; + } + + return true; + }); + + res.json({ count: filtered.length, commands: filtered }); +}); + +router.post('/:commandId/ack', requireDeviceAuth, async (req, res) => { + const parsedParams = commandParamSchema.safeParse(req.params); + if (!parsedParams.success) { + res.status(400).json({ message: 'Invalid commandId', errors: parsedParams.error.flatten() }); + return; + } + + const parsed = ackSchema.safeParse(req.body); + + if (!parsed.success) { + res.status(400).json({ message: 'Invalid request body', errors: parsed.error.flatten() }); + return; + } + + const deviceAuth = req.deviceAuth; + + if (!deviceAuth) { + res.status(401).json({ message: 'Unauthorized' }); + return; + } + + const command = await db.query.deviceCommands.findFirst({ + where: eq(deviceCommands.id, parsedParams.data.commandId), + }); + + if (!command) { + res.status(404).json({ message: 'Command not found' }); + return; + } + + if (command.targetDeviceId !== deviceAuth.deviceId) { + res.status(403).json({ message: 'Only target device can acknowledge this command' }); + return; + } + + const now = new Date(); + + const [updated] = await db + .update(deviceCommands) + .set({ + status: parsed.data.status, + acknowledgedAt: now, + updatedAt: now, + error: parsed.data.status === 'rejected' ? parsed.data.error ?? 'rejected' : null, + }) + .where(eq(deviceCommands.id, command.id)) + .returning(); + + res.json({ message: 'Command acknowledged', command: updated }); +}); + +export default router;