feat(realtime): add websocket gateway and command ack/retry flow
This commit is contained in:
@@ -1,4 +1,5 @@
|
||||
import express from 'express';
|
||||
import { createServer } from 'http';
|
||||
import { toNodeHandler } from 'better-auth/node';
|
||||
import swaggerUi from 'swagger-ui-express';
|
||||
|
||||
@@ -8,6 +9,8 @@ import videosRoutes from './routes/videos';
|
||||
import adminRoutes from './routes/admin';
|
||||
import devicesRoutes from './routes/devices';
|
||||
import deviceLinksRoutes from './routes/device-links';
|
||||
import commandsRoutes from './routes/commands';
|
||||
import { setupRealtimeGateway } from './realtime/gateway';
|
||||
import { ensureMinioBucket } from './utils/minio';
|
||||
|
||||
const app = express();
|
||||
@@ -30,6 +33,7 @@ app.use('/videos', videosRoutes);
|
||||
app.use('/admin', adminRoutes);
|
||||
app.use('/devices', devicesRoutes);
|
||||
app.use('/device-links', deviceLinksRoutes);
|
||||
app.use('/commands', commandsRoutes);
|
||||
|
||||
app.use((err: unknown, _req: express.Request, res: express.Response, _next: express.NextFunction) => {
|
||||
console.error(err);
|
||||
@@ -37,6 +41,7 @@ app.use((err: unknown, _req: express.Request, res: express.Response, _next: expr
|
||||
});
|
||||
|
||||
const port = Number(process.env.PORT ?? 3000);
|
||||
const server = createServer(app);
|
||||
|
||||
const start = async () => {
|
||||
try {
|
||||
@@ -46,7 +51,9 @@ const start = async () => {
|
||||
process.exit(1);
|
||||
}
|
||||
|
||||
app.listen(port, () => {
|
||||
setupRealtimeGateway(server);
|
||||
|
||||
server.listen(port, () => {
|
||||
console.log(`Server is running on port ${port}`);
|
||||
});
|
||||
};
|
||||
|
||||
292
Backend/realtime/gateway.ts
Normal file
292
Backend/realtime/gateway.ts
Normal file
@@ -0,0 +1,292 @@
|
||||
import { and, eq, lt } from 'drizzle-orm';
|
||||
import { Server as HttpServer } from 'http';
|
||||
import { Server as SocketIOServer } from 'socket.io';
|
||||
import { z } from 'zod';
|
||||
|
||||
import { db } from '../db/client';
|
||||
import { deviceCommands, devices } from '../db/schema';
|
||||
import { verifyDeviceToken } from '../utils/device-token';
|
||||
|
||||
const HEARTBEAT_INTERVAL_MS = 15_000;
|
||||
const RETRY_INTERVAL_MS = 5_000;
|
||||
const RETRY_DISPATCH_DELAY_MS = 10_000;
|
||||
const MAX_RETRIES = 3;
|
||||
|
||||
const commandAckSchema = z.object({
|
||||
commandId: z.string().uuid(),
|
||||
status: z.enum(['acknowledged', 'rejected']),
|
||||
error: z.string().optional(),
|
||||
});
|
||||
|
||||
const roomForDevice = (deviceId: string): string => `device:${deviceId}`;
|
||||
|
||||
let io: SocketIOServer | null = null;
|
||||
let retryTimer: ReturnType<typeof setInterval> | null = null;
|
||||
|
||||
const countSocketsForDevice = (deviceId: string): number => {
|
||||
if (!io) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
return io.sockets.adapter.rooms.get(roomForDevice(deviceId))?.size ?? 0;
|
||||
};
|
||||
|
||||
const markDevicePresence = async (deviceId: string, status: 'online' | 'offline') => {
|
||||
const now = new Date();
|
||||
|
||||
await db
|
||||
.update(devices)
|
||||
.set({
|
||||
status,
|
||||
lastSeenAt: now,
|
||||
updatedAt: now,
|
||||
})
|
||||
.where(eq(devices.id, deviceId));
|
||||
};
|
||||
|
||||
const emitCommand = (command: {
|
||||
id: string;
|
||||
sourceDeviceId: string;
|
||||
targetDeviceId: string;
|
||||
commandType: string;
|
||||
payload: Record<string, unknown> | null;
|
||||
createdAt: Date;
|
||||
}): boolean => {
|
||||
if (!io) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (countSocketsForDevice(command.targetDeviceId) === 0) {
|
||||
return false;
|
||||
}
|
||||
|
||||
io.to(roomForDevice(command.targetDeviceId)).emit('command:received', {
|
||||
commandId: command.id,
|
||||
sourceDeviceId: command.sourceDeviceId,
|
||||
commandType: command.commandType,
|
||||
payload: command.payload,
|
||||
createdAt: command.createdAt,
|
||||
});
|
||||
|
||||
return true;
|
||||
};
|
||||
|
||||
export const dispatchCommandById = async (commandId: string): Promise<void> => {
|
||||
const command = await db.query.deviceCommands.findFirst({
|
||||
where: eq(deviceCommands.id, commandId),
|
||||
});
|
||||
|
||||
if (!command) {
|
||||
return;
|
||||
}
|
||||
|
||||
const now = new Date();
|
||||
|
||||
const delivered = emitCommand({
|
||||
id: command.id,
|
||||
sourceDeviceId: command.sourceDeviceId,
|
||||
targetDeviceId: command.targetDeviceId,
|
||||
commandType: command.commandType,
|
||||
payload: command.payload,
|
||||
createdAt: command.createdAt,
|
||||
});
|
||||
|
||||
await db
|
||||
.update(deviceCommands)
|
||||
.set({
|
||||
status: delivered ? 'sent' : 'queued',
|
||||
lastDispatchedAt: now,
|
||||
retryCount: delivered ? command.retryCount : command.retryCount,
|
||||
updatedAt: now,
|
||||
error: delivered ? null : 'target device offline',
|
||||
})
|
||||
.where(eq(deviceCommands.id, command.id));
|
||||
};
|
||||
|
||||
const retryPendingCommands = async () => {
|
||||
const threshold = new Date(Date.now() - RETRY_DISPATCH_DELAY_MS);
|
||||
|
||||
const pending = await db.query.deviceCommands.findMany({
|
||||
where: and(eq(deviceCommands.status, 'sent'), lt(deviceCommands.lastDispatchedAt, threshold)),
|
||||
limit: 100,
|
||||
orderBy: (fields, operators) => [operators.asc(fields.createdAt)],
|
||||
});
|
||||
|
||||
for (const command of pending) {
|
||||
const now = new Date();
|
||||
const nextRetryCount = command.retryCount + 1;
|
||||
|
||||
if (nextRetryCount > MAX_RETRIES) {
|
||||
await db
|
||||
.update(deviceCommands)
|
||||
.set({
|
||||
status: 'failed',
|
||||
updatedAt: now,
|
||||
error: 'max retries exceeded',
|
||||
})
|
||||
.where(eq(deviceCommands.id, command.id));
|
||||
continue;
|
||||
}
|
||||
|
||||
const delivered = emitCommand({
|
||||
id: command.id,
|
||||
sourceDeviceId: command.sourceDeviceId,
|
||||
targetDeviceId: command.targetDeviceId,
|
||||
commandType: command.commandType,
|
||||
payload: command.payload,
|
||||
createdAt: command.createdAt,
|
||||
});
|
||||
|
||||
await db
|
||||
.update(deviceCommands)
|
||||
.set({
|
||||
status: delivered ? 'sent' : 'queued',
|
||||
lastDispatchedAt: now,
|
||||
retryCount: nextRetryCount,
|
||||
updatedAt: now,
|
||||
error: delivered ? null : 'target device offline',
|
||||
})
|
||||
.where(eq(deviceCommands.id, command.id));
|
||||
}
|
||||
};
|
||||
|
||||
export const setupRealtimeGateway = (server: HttpServer): SocketIOServer => {
|
||||
if (io) {
|
||||
return io;
|
||||
}
|
||||
|
||||
io = new SocketIOServer(server, {
|
||||
cors: {
|
||||
origin: process.env.BETTER_AUTH_TRUSTED_ORIGINS?.split(',').map((origin) => origin.trim()).filter(Boolean) ?? true,
|
||||
credentials: true,
|
||||
},
|
||||
pingInterval: HEARTBEAT_INTERVAL_MS,
|
||||
});
|
||||
|
||||
io.use(async (socket, next) => {
|
||||
try {
|
||||
const tokenFromAuth = typeof socket.handshake.auth?.token === 'string' ? socket.handshake.auth.token : null;
|
||||
const authorization = socket.handshake.headers.authorization;
|
||||
const tokenFromHeader =
|
||||
typeof authorization === 'string' && authorization.startsWith('Bearer ')
|
||||
? authorization.slice('Bearer '.length).trim()
|
||||
: null;
|
||||
|
||||
const token = tokenFromAuth ?? tokenFromHeader;
|
||||
|
||||
if (!token) {
|
||||
next(new Error('Missing device token'));
|
||||
return;
|
||||
}
|
||||
|
||||
const payload = verifyDeviceToken(token);
|
||||
|
||||
if (!payload) {
|
||||
next(new Error('Invalid device token'));
|
||||
return;
|
||||
}
|
||||
|
||||
const device = await db.query.devices.findFirst({
|
||||
where: and(eq(devices.id, payload.deviceId), eq(devices.userId, payload.userId)),
|
||||
});
|
||||
|
||||
if (!device) {
|
||||
next(new Error('Device not found'));
|
||||
return;
|
||||
}
|
||||
|
||||
if (device.role !== payload.role) {
|
||||
next(new Error('Token role does not match device role'));
|
||||
return;
|
||||
}
|
||||
|
||||
socket.data.deviceAuth = payload;
|
||||
next();
|
||||
} catch (error) {
|
||||
next(error instanceof Error ? error : new Error('Authentication failed'));
|
||||
}
|
||||
});
|
||||
|
||||
io.on('connection', async (socket) => {
|
||||
const auth = socket.data.deviceAuth as { userId: string; deviceId: string; role: 'camera' | 'client' };
|
||||
const deviceRoom = roomForDevice(auth.deviceId);
|
||||
|
||||
socket.join(deviceRoom);
|
||||
await markDevicePresence(auth.deviceId, 'online');
|
||||
|
||||
socket.emit('connected', {
|
||||
deviceId: auth.deviceId,
|
||||
role: auth.role,
|
||||
serverTime: new Date().toISOString(),
|
||||
});
|
||||
|
||||
socket.on('heartbeat', async () => {
|
||||
await markDevicePresence(auth.deviceId, 'online');
|
||||
socket.emit('heartbeat:ack', { at: new Date().toISOString() });
|
||||
});
|
||||
|
||||
socket.on('command:ack', async (input) => {
|
||||
const parsed = commandAckSchema.safeParse(input);
|
||||
|
||||
if (!parsed.success) {
|
||||
socket.emit('error:command_ack', {
|
||||
message: 'Invalid command ack payload',
|
||||
errors: parsed.error.flatten(),
|
||||
});
|
||||
return;
|
||||
}
|
||||
|
||||
const command = await db.query.deviceCommands.findFirst({
|
||||
where: eq(deviceCommands.id, parsed.data.commandId),
|
||||
});
|
||||
|
||||
if (!command) {
|
||||
socket.emit('error:command_ack', { message: 'Command not found' });
|
||||
return;
|
||||
}
|
||||
|
||||
if (command.targetDeviceId !== auth.deviceId) {
|
||||
socket.emit('error:command_ack', { message: 'Not authorized to ack this command' });
|
||||
return;
|
||||
}
|
||||
|
||||
const now = new Date();
|
||||
|
||||
await db
|
||||
.update(deviceCommands)
|
||||
.set({
|
||||
status: parsed.data.status,
|
||||
acknowledgedAt: now,
|
||||
updatedAt: now,
|
||||
error: parsed.data.status === 'rejected' ? parsed.data.error ?? 'rejected' : null,
|
||||
})
|
||||
.where(eq(deviceCommands.id, command.id));
|
||||
|
||||
io?.to(roomForDevice(command.sourceDeviceId)).emit('command:status', {
|
||||
commandId: command.id,
|
||||
status: parsed.data.status,
|
||||
acknowledgedAt: now.toISOString(),
|
||||
error: parsed.data.error,
|
||||
});
|
||||
});
|
||||
|
||||
socket.on('disconnect', async () => {
|
||||
// Small delay allows fast reconnects to reuse presence without flapping.
|
||||
setTimeout(async () => {
|
||||
if (countSocketsForDevice(auth.deviceId) === 0) {
|
||||
await markDevicePresence(auth.deviceId, 'offline');
|
||||
}
|
||||
}, 500);
|
||||
});
|
||||
});
|
||||
|
||||
if (!retryTimer) {
|
||||
retryTimer = setInterval(() => {
|
||||
retryPendingCommands().catch((error) => {
|
||||
console.error('Failed retrying pending commands', error);
|
||||
});
|
||||
}, RETRY_INTERVAL_MS);
|
||||
}
|
||||
|
||||
return io;
|
||||
};
|
||||
212
Backend/routes/commands.ts
Normal file
212
Backend/routes/commands.ts
Normal file
@@ -0,0 +1,212 @@
|
||||
import { and, desc, eq } from 'drizzle-orm';
|
||||
import { Router } from 'express';
|
||||
import { z } from 'zod';
|
||||
|
||||
import { db } from '../db/client';
|
||||
import { deviceCommands, deviceLinks, devices } from '../db/schema';
|
||||
import { requireAuth } from '../middleware/auth';
|
||||
import { requireDeviceAuth } from '../middleware/device-auth';
|
||||
import { dispatchCommandById } from '../realtime/gateway';
|
||||
|
||||
const router = Router();
|
||||
|
||||
const commandCreateSchema = z.object({
|
||||
sourceDeviceId: z.string().uuid(),
|
||||
targetDeviceId: z.string().uuid(),
|
||||
commandType: z.enum(['start_stream', 'stop_stream', 'ping', 'update_settings']),
|
||||
payload: z.record(z.string(), z.unknown()).optional(),
|
||||
});
|
||||
|
||||
const ackSchema = z.object({
|
||||
status: z.enum(['acknowledged', 'rejected']),
|
||||
error: z.string().optional(),
|
||||
});
|
||||
const commandParamSchema = z.object({
|
||||
commandId: z.string().uuid(),
|
||||
});
|
||||
|
||||
const querySchema = z.object({
|
||||
sourceDeviceId: z.string().uuid().optional(),
|
||||
targetDeviceId: z.string().uuid().optional(),
|
||||
status: z.string().optional(),
|
||||
limit: z.coerce.number().int().min(1).max(100).default(25),
|
||||
});
|
||||
|
||||
router.post('/', requireAuth, async (req, res) => {
|
||||
const parsed = commandCreateSchema.safeParse(req.body);
|
||||
|
||||
if (!parsed.success) {
|
||||
res.status(400).json({ message: 'Invalid request body', errors: parsed.error.flatten() });
|
||||
return;
|
||||
}
|
||||
|
||||
const authSession = req.auth;
|
||||
|
||||
if (!authSession?.user?.id) {
|
||||
res.status(401).json({ message: 'Unauthorized' });
|
||||
return;
|
||||
}
|
||||
|
||||
if (parsed.data.sourceDeviceId === parsed.data.targetDeviceId) {
|
||||
res.status(400).json({ message: 'sourceDeviceId and targetDeviceId must differ' });
|
||||
return;
|
||||
}
|
||||
|
||||
const [sourceDevice, targetDevice] = await Promise.all([
|
||||
db.query.devices.findFirst({
|
||||
where: and(eq(devices.id, parsed.data.sourceDeviceId), eq(devices.userId, authSession.user.id)),
|
||||
}),
|
||||
db.query.devices.findFirst({
|
||||
where: and(eq(devices.id, parsed.data.targetDeviceId), eq(devices.userId, authSession.user.id)),
|
||||
}),
|
||||
]);
|
||||
|
||||
if (!sourceDevice || !targetDevice) {
|
||||
res.status(400).json({ message: 'Both source and target devices must belong to the authenticated user' });
|
||||
return;
|
||||
}
|
||||
|
||||
const validRoleDirection = sourceDevice.role === 'client' && targetDevice.role === 'camera';
|
||||
|
||||
if (!validRoleDirection) {
|
||||
res.status(400).json({ message: 'Commands are currently allowed only from client -> camera devices' });
|
||||
return;
|
||||
}
|
||||
|
||||
const link = await db.query.deviceLinks.findFirst({
|
||||
where: and(
|
||||
eq(deviceLinks.ownerUserId, authSession.user.id),
|
||||
eq(deviceLinks.cameraDeviceId, targetDevice.id),
|
||||
eq(deviceLinks.clientDeviceId, sourceDevice.id),
|
||||
eq(deviceLinks.status, 'active'),
|
||||
),
|
||||
});
|
||||
|
||||
if (!link) {
|
||||
res.status(403).json({ message: 'No active camera-client link found for command routing' });
|
||||
return;
|
||||
}
|
||||
|
||||
const now = new Date();
|
||||
|
||||
const [command] = await db
|
||||
.insert(deviceCommands)
|
||||
.values({
|
||||
ownerUserId: authSession.user.id,
|
||||
sourceDeviceId: sourceDevice.id,
|
||||
targetDeviceId: targetDevice.id,
|
||||
commandType: parsed.data.commandType,
|
||||
payload: parsed.data.payload ?? null,
|
||||
status: 'queued',
|
||||
lastDispatchedAt: now,
|
||||
updatedAt: now,
|
||||
})
|
||||
.returning();
|
||||
|
||||
if (!command) {
|
||||
res.status(500).json({ message: 'Failed creating command' });
|
||||
return;
|
||||
}
|
||||
|
||||
await dispatchCommandById(command.id);
|
||||
|
||||
const refreshed = await db.query.deviceCommands.findFirst({ where: eq(deviceCommands.id, command.id) });
|
||||
|
||||
res.status(201).json({
|
||||
message: 'Command queued',
|
||||
command: refreshed ?? command,
|
||||
});
|
||||
});
|
||||
|
||||
router.get('/', requireAuth, async (req, res) => {
|
||||
const parsed = querySchema.safeParse(req.query);
|
||||
|
||||
if (!parsed.success) {
|
||||
res.status(400).json({ message: 'Invalid query params', errors: parsed.error.flatten() });
|
||||
return;
|
||||
}
|
||||
|
||||
const authSession = req.auth;
|
||||
|
||||
if (!authSession?.user?.id) {
|
||||
res.status(401).json({ message: 'Unauthorized' });
|
||||
return;
|
||||
}
|
||||
|
||||
const commands = await db.query.deviceCommands.findMany({
|
||||
where: eq(deviceCommands.ownerUserId, authSession.user.id),
|
||||
orderBy: [desc(deviceCommands.createdAt)],
|
||||
limit: parsed.data.limit,
|
||||
});
|
||||
|
||||
const filtered = commands.filter((command) => {
|
||||
if (parsed.data.sourceDeviceId && command.sourceDeviceId !== parsed.data.sourceDeviceId) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (parsed.data.targetDeviceId && command.targetDeviceId !== parsed.data.targetDeviceId) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (parsed.data.status && command.status !== parsed.data.status) {
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
});
|
||||
|
||||
res.json({ count: filtered.length, commands: filtered });
|
||||
});
|
||||
|
||||
router.post('/:commandId/ack', requireDeviceAuth, async (req, res) => {
|
||||
const parsedParams = commandParamSchema.safeParse(req.params);
|
||||
if (!parsedParams.success) {
|
||||
res.status(400).json({ message: 'Invalid commandId', errors: parsedParams.error.flatten() });
|
||||
return;
|
||||
}
|
||||
|
||||
const parsed = ackSchema.safeParse(req.body);
|
||||
|
||||
if (!parsed.success) {
|
||||
res.status(400).json({ message: 'Invalid request body', errors: parsed.error.flatten() });
|
||||
return;
|
||||
}
|
||||
|
||||
const deviceAuth = req.deviceAuth;
|
||||
|
||||
if (!deviceAuth) {
|
||||
res.status(401).json({ message: 'Unauthorized' });
|
||||
return;
|
||||
}
|
||||
|
||||
const command = await db.query.deviceCommands.findFirst({
|
||||
where: eq(deviceCommands.id, parsedParams.data.commandId),
|
||||
});
|
||||
|
||||
if (!command) {
|
||||
res.status(404).json({ message: 'Command not found' });
|
||||
return;
|
||||
}
|
||||
|
||||
if (command.targetDeviceId !== deviceAuth.deviceId) {
|
||||
res.status(403).json({ message: 'Only target device can acknowledge this command' });
|
||||
return;
|
||||
}
|
||||
|
||||
const now = new Date();
|
||||
|
||||
const [updated] = await db
|
||||
.update(deviceCommands)
|
||||
.set({
|
||||
status: parsed.data.status,
|
||||
acknowledgedAt: now,
|
||||
updatedAt: now,
|
||||
error: parsed.data.status === 'rejected' ? parsed.data.error ?? 'rejected' : null,
|
||||
})
|
||||
.where(eq(deviceCommands.id, command.id))
|
||||
.returning();
|
||||
|
||||
res.json({ message: 'Command acknowledged', command: updated });
|
||||
});
|
||||
|
||||
export default router;
|
||||
Reference in New Issue
Block a user