93 lines
2.6 KiB
TypeScript
93 lines
2.6 KiB
TypeScript
import { and, eq, lte } from 'drizzle-orm';
|
|
|
|
import { db } from '../db/client';
|
|
import { devices, pushNotifications } from '../db/schema';
|
|
|
|
const MAX_ATTEMPTS = Number(process.env.PUSH_MAX_ATTEMPTS ?? 5);
|
|
|
|
export const enqueuePushNotification = async (input: {
|
|
ownerUserId: string;
|
|
recipientDeviceId: string;
|
|
type: string;
|
|
payload?: Record<string, unknown>;
|
|
}): Promise<void> => {
|
|
await db.insert(pushNotifications).values({
|
|
ownerUserId: input.ownerUserId,
|
|
recipientDeviceId: input.recipientDeviceId,
|
|
type: input.type,
|
|
payload: input.payload ?? null,
|
|
status: 'queued',
|
|
attempts: 0,
|
|
nextAttemptAt: new Date(),
|
|
updatedAt: new Date(),
|
|
});
|
|
};
|
|
|
|
const deliverPush = async (notificationId: string): Promise<void> => {
|
|
const notification = await db.query.pushNotifications.findFirst({ where: eq(pushNotifications.id, notificationId) });
|
|
|
|
if (!notification || notification.status === 'delivered' || notification.status === 'failed') {
|
|
return;
|
|
}
|
|
|
|
const recipientDevice = await db.query.devices.findFirst({ where: eq(devices.id, notification.recipientDeviceId) });
|
|
|
|
const now = new Date();
|
|
const attempts = notification.attempts + 1;
|
|
|
|
if (!recipientDevice || !recipientDevice.pushToken) {
|
|
const nextDelaySeconds = Math.min(60 * attempts, 60 * 10);
|
|
const shouldFail = attempts >= MAX_ATTEMPTS;
|
|
|
|
await db
|
|
.update(pushNotifications)
|
|
.set({
|
|
attempts,
|
|
status: shouldFail ? 'failed' : 'queued',
|
|
lastError: 'missing push token',
|
|
nextAttemptAt: new Date(now.getTime() + nextDelaySeconds * 1000),
|
|
updatedAt: now,
|
|
})
|
|
.where(eq(pushNotifications.id, notification.id));
|
|
|
|
return;
|
|
}
|
|
|
|
// Mock push provider: consider "delivered" when token exists.
|
|
await db
|
|
.update(pushNotifications)
|
|
.set({
|
|
attempts,
|
|
status: 'delivered',
|
|
sentAt: now,
|
|
lastError: null,
|
|
updatedAt: now,
|
|
})
|
|
.where(eq(pushNotifications.id, notification.id));
|
|
};
|
|
|
|
export const dispatchPushQueueOnce = async (): Promise<number> => {
|
|
const now = new Date();
|
|
|
|
const queued = await db.query.pushNotifications.findMany({
|
|
where: and(eq(pushNotifications.status, 'queued'), lte(pushNotifications.nextAttemptAt, now)),
|
|
limit: 100,
|
|
});
|
|
|
|
for (const item of queued) {
|
|
await deliverPush(item.id);
|
|
}
|
|
|
|
return queued.length;
|
|
};
|
|
|
|
export const startPushWorker = (): void => {
|
|
const intervalMs = Number(process.env.PUSH_WORKER_INTERVAL_MS ?? 10_000);
|
|
|
|
setInterval(() => {
|
|
dispatchPushQueueOnce().catch((error) => {
|
|
console.error('push worker failed', error);
|
|
});
|
|
}, intervalMs);
|
|
};
|