From 6d6c77f77ecd5a9a5f5c5a1c39000dd9751e9512 Mon Sep 17 00:00:00 2001 From: Matiss Jurevics Date: Sat, 24 Jan 2026 15:20:00 +0000 Subject: [PATCH] feat(push): add phase7 offline push queue, worker, APIs, and simulator inbox --- Backend/db/schema.ts | 16 +++ .../drizzle/0010_push_notifications_queue.sql | 17 +++ Backend/drizzle/meta/_journal.json | 7 ++ Backend/index.ts | 4 + Backend/public/mobile-sim.html | 75 ++++++++++++- Backend/routes/events.ts | 30 +++++- Backend/routes/push-notifications.ts | 100 ++++++++++++++++++ Backend/routes/streams.ts | 57 +++++++++- Backend/services/push.ts | 92 ++++++++++++++++ 9 files changed, 392 insertions(+), 6 deletions(-) create mode 100644 Backend/drizzle/0010_push_notifications_queue.sql create mode 100644 Backend/routes/push-notifications.ts create mode 100644 Backend/services/push.ts diff --git a/Backend/db/schema.ts b/Backend/db/schema.ts index f8c6698..0216851 100644 --- a/Backend/db/schema.ts +++ b/Backend/db/schema.ts @@ -135,6 +135,21 @@ export const notifications = pgTable('notifications', { isRead: boolean('is_read').default(false).notNull(), }); +export const pushNotifications = pgTable('push_notifications', { + id: uuid('id').defaultRandom().primaryKey(), + ownerUserId: uuid('owner_user_id').notNull().references(() => users.id), + recipientDeviceId: uuid('recipient_device_id').notNull().references(() => devices.id), + type: varchar('type', { length: 64 }).notNull(), + payload: jsonb('payload').$type | null>().default(null), + status: varchar('status', { length: 32 }).default('queued').notNull(), + attempts: integer('attempts').default(0).notNull(), + lastError: text('last_error'), + sentAt: timestamp('sent_at', { withTimezone: true }), + nextAttemptAt: timestamp('next_attempt_at', { withTimezone: true }).defaultNow().notNull(), + createdAt: timestamp('created_at', { withTimezone: true }).defaultNow().notNull(), + updatedAt: timestamp('updated_at', { withTimezone: true }).defaultNow().notNull(), +}); + export const accounts = pgTable('account', { id: uuid('id').defaultRandom().primaryKey(), userId: uuid('user_id').notNull().references(() => users.id), @@ -181,6 +196,7 @@ export const schema = { events, videos, notifications, + pushNotifications, accounts, sessions, verifications, diff --git a/Backend/drizzle/0010_push_notifications_queue.sql b/Backend/drizzle/0010_push_notifications_queue.sql new file mode 100644 index 0000000..34dd3c8 --- /dev/null +++ b/Backend/drizzle/0010_push_notifications_queue.sql @@ -0,0 +1,17 @@ +CREATE TABLE "push_notifications" ( + "id" uuid PRIMARY KEY DEFAULT gen_random_uuid() NOT NULL, + "owner_user_id" uuid NOT NULL, + "recipient_device_id" uuid NOT NULL, + "type" varchar(64) NOT NULL, + "payload" jsonb DEFAULT 'null'::jsonb, + "status" varchar(32) DEFAULT 'queued' NOT NULL, + "attempts" integer DEFAULT 0 NOT NULL, + "last_error" text, + "sent_at" timestamp with time zone, + "next_attempt_at" timestamp with time zone DEFAULT now() NOT NULL, + "created_at" timestamp with time zone DEFAULT now() NOT NULL, + "updated_at" timestamp with time zone DEFAULT now() NOT NULL +); +--> statement-breakpoint +ALTER TABLE "push_notifications" ADD CONSTRAINT "push_notifications_owner_user_id_users_id_fk" FOREIGN KEY ("owner_user_id") REFERENCES "public"."users"("id") ON DELETE no action ON UPDATE no action;--> statement-breakpoint +ALTER TABLE "push_notifications" ADD CONSTRAINT "push_notifications_recipient_device_id_devices_id_fk" FOREIGN KEY ("recipient_device_id") REFERENCES "public"."devices"("id") ON DELETE no action ON UPDATE no action; diff --git a/Backend/drizzle/meta/_journal.json b/Backend/drizzle/meta/_journal.json index 4bb9cd9..d851cfa 100644 --- a/Backend/drizzle/meta/_journal.json +++ b/Backend/drizzle/meta/_journal.json @@ -71,6 +71,13 @@ "when": 1770416956419, "tag": "0009_recordings_pipeline", "breakpoints": true + }, + { + "idx": 10, + "version": "7", + "when": 1770417956419, + "tag": "0010_push_notifications_queue", + "breakpoints": true } ] } diff --git a/Backend/index.ts b/Backend/index.ts index 27075d6..f7edd61 100644 --- a/Backend/index.ts +++ b/Backend/index.ts @@ -13,9 +13,11 @@ import commandsRoutes from './routes/commands'; import eventsRoutes from './routes/events'; import streamsRoutes from './routes/streams'; import recordingsRoutes from './routes/recordings'; +import pushNotificationsRoutes from './routes/push-notifications'; import { setupRealtimeGateway } from './realtime/gateway'; import { ensureMinioBucket } from './utils/minio'; import { startRecordingsWorker } from './workers/recordings'; +import { startPushWorker } from './services/push'; const app = express(); const openApiDocument = buildOpenApiDocument(); @@ -42,6 +44,7 @@ app.use('/commands', commandsRoutes); app.use('/events', eventsRoutes); app.use('/streams', streamsRoutes); app.use('/recordings', recordingsRoutes); +app.use('/push-notifications', pushNotificationsRoutes); app.use((err: unknown, _req: express.Request, res: express.Response, _next: express.NextFunction) => { console.error(err); @@ -61,6 +64,7 @@ const start = async () => { setupRealtimeGateway(server); startRecordingsWorker(); + startPushWorker(); server.listen(port, () => { console.log(`Server is running on port ${port}`); diff --git a/Backend/public/mobile-sim.html b/Backend/public/mobile-sim.html index b123096..d9b429f 100644 --- a/Backend/public/mobile-sim.html +++ b/Backend/public/mobile-sim.html @@ -134,6 +134,9 @@ + + +
@@ -189,6 +192,7 @@ lastMotionEventId: null, lastStreamSessionId: null, lastRecordingId: null, + latestPushNotificationId: null, }; const $ = (id) => document.getElementById(id); @@ -208,7 +212,11 @@ $('deviceState').textContent = JSON.stringify({ device: state.device, hasToken: Boolean(state.deviceToken) }, null, 2); $('clientState').textContent = JSON.stringify({ lastStreamSessionId: state.lastStreamSessionId }, null, 2); $('cameraState').textContent = JSON.stringify( - { lastMotionEventId: state.lastMotionEventId, lastRecordingId: state.lastRecordingId }, + { + lastMotionEventId: state.lastMotionEventId, + lastRecordingId: state.lastRecordingId, + latestPushNotificationId: state.latestPushNotificationId, + }, null, 2, ); @@ -338,6 +346,7 @@ name: name || undefined, platform: 'web', appVersion: 'sim-1', + pushToken: $('pushToken').value.trim() || undefined, }), }); @@ -361,6 +370,21 @@ log('loaded saved device', parsed); }); + $('loadSavedBtn').addEventListener('click', async () => { + try { + if (!state.device?.id) return; + const token = $('pushToken').value.trim(); + if (!token) return; + await authFetch(`/devices/${state.device.id}`, { + method: 'PATCH', + body: JSON.stringify({ pushToken: token }), + }); + log('push token updated', { deviceId: state.device.id }); + } catch (error) { + log('push token update failed', { error: error.message }); + } + }); + $('connectBtn').addEventListener('click', () => { try { connectSocket(); @@ -534,6 +558,55 @@ } }); + const pushPanel = document.createElement('section'); + pushPanel.className = 'panel'; + pushPanel.style.marginTop = '16px'; + pushPanel.innerHTML = ` +

Push Inbox (Offline Fallback)

+
+ + +
+ + `; + document.querySelector('.page').appendChild(pushPanel); + + $('dispatchPushWorkerBtn').addEventListener('click', async () => { + try { + const payload = await deviceFetch('/push-notifications/worker/dispatch', { method: 'POST', body: JSON.stringify({}) }); + log('push worker dispatch', payload); + } catch (error) { + log('push worker dispatch failed', { error: error.message }); + } + }); + + $('pollPushInboxBtn').addEventListener('click', async () => { + try { + const payload = await deviceFetch('/push-notifications/me'); + const latest = payload.notifications?.[0]; + if (latest) { + state.latestPushNotificationId = latest.id; + } + render(); + log('push inbox', payload); + } catch (error) { + log('poll push inbox failed', { error: error.message }); + } + }); + + $('markLatestPushReadBtn').addEventListener('click', async () => { + try { + if (!state.latestPushNotificationId) throw new Error('No push notification selected'); + const payload = await deviceFetch(`/push-notifications/${state.latestPushNotificationId}/read`, { + method: 'POST', + body: JSON.stringify({}), + }); + log('push marked read', payload); + } catch (error) { + log('mark push read failed', { error: error.message }); + } + }); + render(); diff --git a/Backend/routes/events.ts b/Backend/routes/events.ts index c994d00..8c2d08a 100644 --- a/Backend/routes/events.ts +++ b/Backend/routes/events.ts @@ -7,6 +7,7 @@ import { deviceLinks, devices, events, notifications } from '../db/schema'; import { requireAuth } from '../middleware/auth'; import { requireDeviceAuth } from '../middleware/device-auth'; import { sendRealtimeToDevice } from '../realtime/gateway'; +import { enqueuePushNotification } from '../services/push'; const router = Router(); @@ -105,6 +106,19 @@ router.post('/motion/start', requireDeviceAuth, async (req, res) => { isRead: false, sentAt: now, }); + + if (!delivered) { + await enqueuePushNotification({ + ownerUserId: deviceAuth.userId, + recipientDeviceId: link.clientDeviceId, + type: 'motion_detected', + payload: { + eventId: event.id, + cameraDeviceId: cameraDevice.id, + startedAt: event.startedAt.toISOString(), + }, + }); + } } res.status(201).json({ @@ -172,13 +186,27 @@ router.post('/:eventId/motion/end', requireDeviceAuth, async (req, res) => { }); for (const link of activeLinks) { - sendRealtimeToDevice(link.clientDeviceId, 'motion:ended', { + const delivered = sendRealtimeToDevice(link.clientDeviceId, 'motion:ended', { eventId: event.id, cameraDeviceId: deviceAuth.deviceId, status: parsed.data.status, endedAt: now, videoUrl: parsed.data.videoUrl ?? event.videoUrl, }); + + if (!delivered) { + await enqueuePushNotification({ + ownerUserId: deviceAuth.userId, + recipientDeviceId: link.clientDeviceId, + type: 'motion_ended', + payload: { + eventId: event.id, + cameraDeviceId: deviceAuth.deviceId, + status: parsed.data.status, + endedAt: now.toISOString(), + }, + }); + } } res.json({ message: 'Motion event ended', event: updated, notifiedClients: activeLinks.length }); diff --git a/Backend/routes/push-notifications.ts b/Backend/routes/push-notifications.ts new file mode 100644 index 0000000..af6cc95 --- /dev/null +++ b/Backend/routes/push-notifications.ts @@ -0,0 +1,100 @@ +import { and, desc, eq } from 'drizzle-orm'; +import { Router } from 'express'; +import { z } from 'zod'; + +import { db } from '../db/client'; +import { pushNotifications } from '../db/schema'; +import { requireDeviceAuth } from '../middleware/device-auth'; +import { dispatchPushQueueOnce } from '../services/push'; + +const router = Router(); + +const listSchema = z.object({ + status: z.string().optional(), + limit: z.coerce.number().int().min(1).max(100).default(25), +}); + +const notificationParamSchema = z.object({ + notificationId: z.string().uuid(), +}); + +router.get('/me', requireDeviceAuth, async (req, res) => { + const parsed = listSchema.safeParse(req.query); + + if (!parsed.success) { + res.status(400).json({ message: 'Invalid query params', errors: parsed.error.flatten() }); + return; + } + + const deviceAuth = req.deviceAuth; + + if (!deviceAuth) { + res.status(401).json({ message: 'Unauthorized' }); + return; + } + + const result = await db.query.pushNotifications.findMany({ + where: and( + eq(pushNotifications.ownerUserId, deviceAuth.userId), + eq(pushNotifications.recipientDeviceId, deviceAuth.deviceId), + ), + orderBy: [desc(pushNotifications.createdAt)], + limit: parsed.data.limit, + }); + + const filtered = parsed.data.status ? result.filter((item) => item.status === parsed.data.status) : result; + + res.json({ count: filtered.length, notifications: filtered }); +}); + +router.post('/:notificationId/read', requireDeviceAuth, async (req, res) => { + const parsedParams = notificationParamSchema.safeParse(req.params); + + if (!parsedParams.success) { + res.status(400).json({ message: 'Invalid notificationId', errors: parsedParams.error.flatten() }); + return; + } + + const deviceAuth = req.deviceAuth; + + if (!deviceAuth) { + res.status(401).json({ message: 'Unauthorized' }); + return; + } + + const [updated] = await db + .update(pushNotifications) + .set({ + status: 'read', + updatedAt: new Date(), + }) + .where( + and( + eq(pushNotifications.id, parsedParams.data.notificationId), + eq(pushNotifications.ownerUserId, deviceAuth.userId), + eq(pushNotifications.recipientDeviceId, deviceAuth.deviceId), + ), + ) + .returning(); + + if (!updated) { + res.status(404).json({ message: 'Notification not found' }); + return; + } + + res.json({ message: 'Notification marked as read', notification: updated }); +}); + +router.post('/worker/dispatch', requireDeviceAuth, async (req, res) => { + const deviceAuth = req.deviceAuth; + + if (!deviceAuth) { + res.status(401).json({ message: 'Unauthorized' }); + return; + } + + const processed = await dispatchPushQueueOnce(); + res.json({ message: 'Push queue dispatch completed', processed }); +}); + +export default router; diff --git a/Backend/routes/streams.ts b/Backend/routes/streams.ts index 1f5d524..2280880 100644 --- a/Backend/routes/streams.ts +++ b/Backend/routes/streams.ts @@ -9,6 +9,7 @@ import { deviceCommands, deviceLinks, devices, streamSessions } from '../db/sche import { mediaProvider } from '../media/service'; import { requireDeviceAuth } from '../middleware/device-auth'; import { dispatchCommandById, sendRealtimeToDevice } from '../realtime/gateway'; +import { enqueuePushNotification } from '../services/push'; import { createRecordingForStream } from './recordings'; const router = Router(); @@ -167,13 +168,25 @@ router.post('/request', requireDeviceAuth, async (req, res) => { const refreshedCommand = await db.query.deviceCommands.findFirst({ where: eq(deviceCommands.id, command.id) }); - sendRealtimeToDevice(sourceDevice.id, 'stream:requested', { + const deliveredToRequester = sendRealtimeToDevice(sourceDevice.id, 'stream:requested', { streamSessionId: session.id, cameraDeviceId: cameraDevice.id, status: session.status, reason: session.reason, }); + if (!deliveredToRequester) { + await enqueuePushNotification({ + ownerUserId: sourceDevice.userId, + recipientDeviceId: sourceDevice.id, + type: 'stream_requested', + payload: { + streamSessionId: session.id, + cameraDeviceId: cameraDevice.id, + }, + }); + } + res.status(201).json({ message: 'Stream request sent', streamSession: session, @@ -251,7 +264,7 @@ router.post('/:streamSessionId/accept', requireDeviceAuth, async (req, res) => { return; } - sendRealtimeToDevice(session.requesterDeviceId, 'stream:started', { + const deliveredToRequester = sendRealtimeToDevice(session.requesterDeviceId, 'stream:started', { streamSessionId: updated.id, cameraDeviceId: updated.cameraDeviceId, status: updated.status, @@ -261,6 +274,18 @@ router.post('/:streamSessionId/accept', requireDeviceAuth, async (req, res) => { subscribeEndpoint: updated.subscribeEndpoint, }); + if (!deliveredToRequester) { + await enqueuePushNotification({ + ownerUserId: session.ownerUserId, + recipientDeviceId: session.requesterDeviceId, + type: 'stream_started', + payload: { + streamSessionId: updated.id, + cameraDeviceId: updated.cameraDeviceId, + }, + }); + } + res.json({ message: 'Stream accepted', streamSession: updated }); }); @@ -405,18 +430,42 @@ router.post('/:streamSessionId/end', requireDeviceAuth, async (req, res) => { await createRecordingForStream(session.id); - sendRealtimeToDevice(session.requesterDeviceId, 'stream:ended', { + const deliveredToRequester = sendRealtimeToDevice(session.requesterDeviceId, 'stream:ended', { streamSessionId: session.id, status: parsed.data.reason, endedAt: now, }); - sendRealtimeToDevice(session.cameraDeviceId, 'stream:ended', { + const deliveredToCamera = sendRealtimeToDevice(session.cameraDeviceId, 'stream:ended', { streamSessionId: session.id, status: parsed.data.reason, endedAt: now, }); + if (!deliveredToRequester) { + await enqueuePushNotification({ + ownerUserId: session.ownerUserId, + recipientDeviceId: session.requesterDeviceId, + type: 'stream_ended', + payload: { + streamSessionId: session.id, + status: parsed.data.reason, + }, + }); + } + + if (!deliveredToCamera) { + await enqueuePushNotification({ + ownerUserId: session.ownerUserId, + recipientDeviceId: session.cameraDeviceId, + type: 'stream_ended', + payload: { + streamSessionId: session.id, + status: parsed.data.reason, + }, + }); + } + res.json({ message: 'Stream ended', streamSession: updated }); }); diff --git a/Backend/services/push.ts b/Backend/services/push.ts new file mode 100644 index 0000000..67f03e0 --- /dev/null +++ b/Backend/services/push.ts @@ -0,0 +1,92 @@ +import { and, eq, lte } from 'drizzle-orm'; + +import { db } from '../db/client'; +import { devices, pushNotifications } from '../db/schema'; + +const MAX_ATTEMPTS = Number(process.env.PUSH_MAX_ATTEMPTS ?? 5); + +export const enqueuePushNotification = async (input: { + ownerUserId: string; + recipientDeviceId: string; + type: string; + payload?: Record; +}): Promise => { + await db.insert(pushNotifications).values({ + ownerUserId: input.ownerUserId, + recipientDeviceId: input.recipientDeviceId, + type: input.type, + payload: input.payload ?? null, + status: 'queued', + attempts: 0, + nextAttemptAt: new Date(), + updatedAt: new Date(), + }); +}; + +const deliverPush = async (notificationId: string): Promise => { + const notification = await db.query.pushNotifications.findFirst({ where: eq(pushNotifications.id, notificationId) }); + + if (!notification || notification.status === 'delivered' || notification.status === 'failed') { + return; + } + + const recipientDevice = await db.query.devices.findFirst({ where: eq(devices.id, notification.recipientDeviceId) }); + + const now = new Date(); + const attempts = notification.attempts + 1; + + if (!recipientDevice || !recipientDevice.pushToken) { + const nextDelaySeconds = Math.min(60 * attempts, 60 * 10); + const shouldFail = attempts >= MAX_ATTEMPTS; + + await db + .update(pushNotifications) + .set({ + attempts, + status: shouldFail ? 'failed' : 'queued', + lastError: 'missing push token', + nextAttemptAt: new Date(now.getTime() + nextDelaySeconds * 1000), + updatedAt: now, + }) + .where(eq(pushNotifications.id, notification.id)); + + return; + } + + // Mock push provider: consider "delivered" when token exists. + await db + .update(pushNotifications) + .set({ + attempts, + status: 'delivered', + sentAt: now, + lastError: null, + updatedAt: now, + }) + .where(eq(pushNotifications.id, notification.id)); +}; + +export const dispatchPushQueueOnce = async (): Promise => { + const now = new Date(); + + const queued = await db.query.pushNotifications.findMany({ + where: and(eq(pushNotifications.status, 'queued'), lte(pushNotifications.nextAttemptAt, now)), + limit: 100, + }); + + for (const item of queued) { + await deliverPush(item.id); + } + + return queued.length; +}; + +export const startPushWorker = (): void => { + const intervalMs = Number(process.env.PUSH_WORKER_INTERVAL_MS ?? 10_000); + + setInterval(() => { + dispatchPushQueueOnce().catch((error) => { + console.error('push worker failed', error); + }); + }, intervalMs); +};