refactor(backend): simplify media schema and recording metadata

This commit is contained in:
2026-03-11 17:15:00 +00:00
parent 662d8d7b90
commit c6919d8174
18 changed files with 223 additions and 113 deletions

View File

@@ -12,6 +12,9 @@ MINIO_SECRET_KEY=minioadmin
MINIO_BUCKET=videos
MINIO_REGION=us-east-1
MINIO_PRESIGNED_EXPIRY_SECONDS=600
MINIO_CA_CERT_PATH=
MINIO_TLS_REJECT_UNAUTHORIZED=true
MINIO_INSECURE_SKIP_TLS_VERIFY=false
MEDIA_MODE=legacy
MEDIA_PROVIDER=mock
TURN_URLS=

View File

@@ -41,6 +41,9 @@ Required env vars:
| `MEDIA_RECORDINGS_DIR` | Local output directory for server-side recording workers (planned in SFU mode) |
| `MEDIA_MAX_PUBLISHERS` / `MEDIA_MAX_SUBSCRIBERS_PER_ROOM` | Soft concurrency limits for single-server media mode (planned) |
| `MINIO_*` | Connection settings for the MinIO/S3 endpoint |
| `MINIO_CA_CERT_PATH` | Optional path to a PEM CA bundle used to trust a private/self-managed MinIO certificate |
| `MINIO_TLS_REJECT_UNAUTHORIZED` | TLS verification toggle for MinIO HTTPS requests (`true` by default) |
| `MINIO_INSECURE_SKIP_TLS_VERIFY` | Dev-only escape hatch to skip MinIO TLS certificate verification |
| `ADMIN_USERNAME` / `ADMIN_PASSWORD` | Basic auth for `/admin` dashboard |
`BETTER_AUTH_URL` is still accepted as a legacy fallback, but `BETTER_AUTH_BASE_URL` is preferred.
@@ -53,6 +56,7 @@ bun run dev
```
- Server boots after ensuring the configured MinIO bucket exists.
- If MinIO uses a private or incomplete certificate chain, prefer setting `MINIO_CA_CERT_PATH` to a trusted PEM bundle. Only use `MINIO_INSECURE_SKIP_TLS_VERIFY=true` for local development or temporary debugging.
## Database (Drizzle ORM)
- Generate a migration:

View File

@@ -42,7 +42,7 @@ export const deviceLinks = pgTable(
}),
);
export const deviceCommands = pgTable('device_commands', {
export const commands = pgTable('commands', {
id: uuid('id').defaultRandom().primaryKey(),
ownerUserId: uuid('owner_user_id').notNull().references(() => users.id),
sourceDeviceId: uuid('source_device_id').notNull().references(() => devices.id),
@@ -65,8 +65,6 @@ export const streamSessions = pgTable('stream_sessions', {
requesterDeviceId: uuid('requester_device_id').notNull().references(() => devices.id),
status: varchar('status', { length: 32 }).default('requested').notNull(),
reason: varchar('reason', { length: 32 }).default('on_demand').notNull(),
// Legacy provider-backed fields are retained for compatibility with older sessions.
// SIMPLE_STREAMING relies on direct WebRTC signaling and does not populate them.
mediaProvider: varchar('media_provider', { length: 32 }).default('mock').notNull(),
mediaSessionId: varchar('media_session_id', { length: 255 }),
publishEndpoint: text('publish_endpoint'),
@@ -82,9 +80,9 @@ export const streamSessions = pgTable('stream_sessions', {
export const recordings = pgTable('recordings', {
id: uuid('id').defaultRandom().primaryKey(),
ownerUserId: uuid('owner_user_id').notNull().references(() => users.id),
streamSessionId: uuid('stream_session_id').notNull().references(() => streamSessions.id),
streamSessionId: uuid('stream_session_id').references(() => streamSessions.id),
cameraDeviceId: uuid('camera_device_id').notNull().references(() => devices.id),
requesterDeviceId: uuid('requester_device_id').notNull().references(() => devices.id),
requesterDeviceId: uuid('requester_device_id').references(() => devices.id),
eventId: uuid('event_id').references(() => events.id),
objectKey: varchar('object_key', { length: 1024 }),
bucket: varchar('bucket', { length: 255 }),
@@ -112,21 +110,6 @@ export const events = pgTable('events', {
updatedAt: timestamp('updated_at', { withTimezone: true }).defaultNow().notNull(),
});
export const videos = pgTable('videos', {
id: uuid('id').defaultRandom().primaryKey(),
eventId: uuid('event_id').references(() => events.id),
userId: uuid('user_id').notNull().references(() => users.id),
deviceId: uuid('device_id').notNull().references(() => devices.id),
objectKey: varchar('object_key', { length: 1024 }).notNull().unique(),
bucket: varchar('bucket', { length: 255 }).notNull(),
uploadUrl: text('upload_url').notNull(),
downloadUrl: text('download_url'),
status: varchar('status', { length: 32 }).notNull().default('pending'),
expiresAt: timestamp('expires_at', { withTimezone: true }),
createdAt: timestamp('created_at', { withTimezone: true }).defaultNow().notNull(),
updatedAt: timestamp('updated_at', { withTimezone: true }).defaultNow().notNull(),
});
export const notifications = pgTable('notifications', {
id: uuid('id').defaultRandom().primaryKey(),
eventId: uuid('event_id').references(() => events.id).notNull(),
@@ -137,7 +120,7 @@ export const notifications = pgTable('notifications', {
isRead: boolean('is_read').default(false).notNull(),
});
export const pushNotifications = pgTable('push_notifications', {
export const notificationDeliveries = pgTable('notification_deliveries', {
id: uuid('id').defaultRandom().primaryKey(),
ownerUserId: uuid('owner_user_id').notNull().references(() => users.id),
recipientDeviceId: uuid('recipient_device_id').notNull().references(() => devices.id),
@@ -204,13 +187,12 @@ export const schema = {
users,
devices,
deviceLinks,
deviceCommands,
commands,
streamSessions,
recordings,
events,
videos,
notifications,
pushNotifications,
notificationDeliveries,
auditLogs,
accounts,
sessions,

View File

@@ -32,6 +32,8 @@ const VideoUploadUrlRequestSchema = registry.register(
fileName: z.string().min(1).max(255),
deviceId: z.string().uuid(),
prefix: z.string().optional(),
recordingId: z.string().uuid().optional(),
eventId: z.string().uuid().optional(),
}),
);
@@ -178,7 +180,7 @@ registry.registerPath({
registry.registerPath({
method: 'post',
path: '/videos/upload-url',
summary: 'Create a presigned upload URL',
summary: 'Create a presigned upload URL and attach it to a recording',
tags: ['Videos'],
security: [{ cookieAuth: [] }],
request: {

View File

@@ -1 +1 @@
ALTER TABLE "users" ADD COLUMN "password_hash" varchar(255) NOT NULL;
ALTER TABLE "users" ADD COLUMN IF NOT EXISTS "password_hash" varchar(255) NOT NULL;

View File

@@ -0,0 +1,39 @@
-- 0014_schema_simplification
-- Renames the command/push tables to their new names, relaxes two
-- recordings constraints, then folds the legacy "videos" table into
-- "recordings" and drops it. Order matters: renames first, then the
-- NOT NULL drops (the backfill inserts NULLs), then the data copy.
ALTER TABLE "device_commands" RENAME TO "commands";--> statement-breakpoint
ALTER TABLE "push_notifications" RENAME TO "notification_deliveries";--> statement-breakpoint
-- Standalone/motion uploads have no originating stream session or
-- requester device, so these columns must become nullable before the
-- legacy rows (which insert NULL for both) are copied in.
ALTER TABLE "recordings" ALTER COLUMN "stream_session_id" DROP NOT NULL;--> statement-breakpoint
ALTER TABLE "recordings" ALTER COLUMN "requester_device_id" DROP NOT NULL;--> statement-breakpoint
-- Backfill: one recordings row per legacy videos row, preserving ids
-- and timestamps so existing object keys stay addressable.
INSERT INTO "recordings" (
"id",
"owner_user_id",
"stream_session_id",
"camera_device_id",
"requester_device_id",
"event_id",
"object_key",
"bucket",
"duration_seconds",
"size_bytes",
"status",
"available_at",
"error",
"created_at",
"updated_at"
)
SELECT
"id",
"user_id",
NULL,
"device_id",
NULL,
"event_id",
"object_key",
"bucket",
NULL,
NULL,
-- A populated download_url is the only signal a legacy upload completed:
-- treat those as 'ready'; everything else is still awaiting its upload.
CASE WHEN "download_url" IS NOT NULL THEN 'ready' ELSE 'awaiting_upload' END,
-- Best available approximation of when the clip became ready.
CASE WHEN "download_url" IS NOT NULL THEN "updated_at" ELSE NULL END,
NULL,
"created_at",
"updated_at"
FROM "videos";--> statement-breakpoint
DROP TABLE "videos";

View File

@@ -99,6 +99,13 @@
"when": 1770800000000,
"tag": "0013_users_password_hash_nullable",
"breakpoints": true
},
{
"idx": 14,
"version": "7",
"when": 1770900000000,
"tag": "0014_schema_simplification",
"breakpoints": true
}
]
}

View File

@@ -42,7 +42,7 @@
"test": "bun test",
"load:smoke": "bun run scripts/load-smoke.ts",
"db:generate": "drizzle-kit generate",
"db:migrate": "drizzle-kit migrate",
"db:migrate": "bun run scripts/migrate.ts",
"db:push": "drizzle-kit push",
"db:studio": "drizzle-kit studio",
"auth:migrate": "bun run scripts/migrate-better-auth.ts"

View File

@@ -948,6 +948,7 @@ const finalizeRecordingForStream = async (streamSessionId, captureResult) => {
fileName: `stream-${streamSessionId}.webm`,
deviceId: currentDevice.id,
prefix: 'recordings',
recordingId: recording.id,
}),
});
@@ -1010,6 +1011,7 @@ const uploadStandaloneMotionRecording = async (captureResult) => {
fileName: `motion-${Date.now()}.webm`,
deviceId: currentDevice.id,
prefix: 'recordings',
eventId: lastMotionEventId,
}),
});
@@ -1023,6 +1025,13 @@ const uploadStandaloneMotionRecording = async (captureResult) => {
throw new Error(`Upload failed with status ${uploadResponse.status}`);
}
await API.events.finalizeRecording(uploadMeta.video.id, {
objectKey: uploadMeta.objectKey,
bucket: uploadMeta.bucket,
durationSeconds: captureResult.durationSeconds,
sizeBytes: compressedBlob.size,
});
addActivity('Recording', `Motion clip uploaded (${uploadMeta.objectKey})`);
return true;
} catch (error) {

View File

@@ -5,7 +5,7 @@ import { z } from 'zod';
import { db } from '../db/client';
import { simpleStreamingEnabled } from '../media/config';
import { deviceCommands, devices, streamSessions } from '../db/schema';
import { commands, devices, streamSessions } from '../db/schema';
import { canRelayWebrtcSignal } from '../streaming/simple';
import { hasRequiredTables } from '../utils/db-schema';
import { verifyDeviceToken } from '../utils/device-token';
@@ -97,8 +97,8 @@ const emitCommand = (command: {
};
export const dispatchCommandById = async (commandId: string): Promise<void> => {
const command = await db.query.deviceCommands.findFirst({
where: eq(deviceCommands.id, commandId),
const command = await db.query.commands.findFirst({
where: eq(commands.id, commandId),
});
if (!command) {
@@ -109,13 +109,13 @@ export const dispatchCommandById = async (commandId: string): Promise<void> => {
if (simpleStreamingEnabled && command.commandType === 'start_stream') {
await db
.update(deviceCommands)
.update(commands)
.set({
status: 'failed',
updatedAt: now,
error: 'start_stream command delivery disabled by SIMPLE_STREAMING',
})
.where(eq(deviceCommands.id, command.id));
.where(eq(commands.id, command.id));
return;
}
@@ -129,7 +129,7 @@ export const dispatchCommandById = async (commandId: string): Promise<void> => {
});
await db
.update(deviceCommands)
.update(commands)
.set({
status: delivered ? 'sent' : 'queued',
lastDispatchedAt: now,
@@ -137,14 +137,14 @@ export const dispatchCommandById = async (commandId: string): Promise<void> => {
updatedAt: now,
error: delivered ? null : 'target device offline',
})
.where(eq(deviceCommands.id, command.id));
.where(eq(commands.id, command.id));
};
const retryPendingCommands = async () => {
const threshold = new Date(Date.now() - RETRY_DISPATCH_DELAY_MS);
const pending = await db.query.deviceCommands.findMany({
where: and(eq(deviceCommands.status, 'sent'), lt(deviceCommands.lastDispatchedAt, threshold)),
const pending = await db.query.commands.findMany({
where: and(eq(commands.status, 'sent'), lt(commands.lastDispatchedAt, threshold)),
limit: 100,
orderBy: (fields, operators) => [operators.asc(fields.createdAt)],
});
@@ -154,13 +154,13 @@ const retryPendingCommands = async () => {
if (simpleStreamingEnabled && command.commandType === 'start_stream') {
await db
.update(deviceCommands)
.update(commands)
.set({
status: 'failed',
updatedAt: now,
error: 'start_stream retries disabled by SIMPLE_STREAMING',
})
.where(eq(deviceCommands.id, command.id));
.where(eq(commands.id, command.id));
continue;
}
@@ -168,13 +168,13 @@ const retryPendingCommands = async () => {
if (nextRetryCount > MAX_RETRIES) {
await db
.update(deviceCommands)
.update(commands)
.set({
status: 'failed',
updatedAt: now,
error: 'max retries exceeded',
})
.where(eq(deviceCommands.id, command.id));
.where(eq(commands.id, command.id));
continue;
}
@@ -188,7 +188,7 @@ const retryPendingCommands = async () => {
});
await db
.update(deviceCommands)
.update(commands)
.set({
status: delivered ? 'sent' : 'queued',
lastDispatchedAt: now,
@@ -196,7 +196,7 @@ const retryPendingCommands = async () => {
updatedAt: now,
error: delivered ? null : 'target device offline',
})
.where(eq(deviceCommands.id, command.id));
.where(eq(commands.id, command.id));
}
};
@@ -286,8 +286,8 @@ export const setupRealtimeGateway = (server: HttpServer): SocketIOServer => {
return;
}
const command = await db.query.deviceCommands.findFirst({
where: eq(deviceCommands.id, parsed.data.commandId),
const command = await db.query.commands.findFirst({
where: eq(commands.id, parsed.data.commandId),
});
if (!command) {
@@ -303,14 +303,14 @@ export const setupRealtimeGateway = (server: HttpServer): SocketIOServer => {
const now = new Date();
await db
.update(deviceCommands)
.update(commands)
.set({
status: parsed.data.status,
acknowledgedAt: now,
updatedAt: now,
error: parsed.data.status === 'rejected' ? parsed.data.error ?? 'rejected' : null,
})
.where(eq(deviceCommands.id, command.id));
.where(eq(commands.id, command.id));
io?.to(roomForDevice(command.sourceDeviceId)).emit('command:status', {
commandId: command.id,
@@ -371,7 +371,7 @@ export const setupRealtimeGateway = (server: HttpServer): SocketIOServer => {
});
if (!retryTimer) {
const requiredTables = ['device_commands'];
const requiredTables = ['commands'];
void (async () => {
const ready = await hasRequiredTables(requiredTables);

View File

@@ -3,7 +3,7 @@ import { Router } from 'express';
import { z } from 'zod';
import { db } from '../db/client';
import { deviceCommands, deviceLinks, devices } from '../db/schema';
import { commands, deviceLinks, devices } from '../db/schema';
import { simpleStreamingEnabled } from '../media/config';
import { requireAuth } from '../middleware/auth';
import { requireDeviceAuth } from '../middleware/device-auth';
@@ -98,7 +98,7 @@ router.post('/', requireAuth, async (req, res) => {
const now = new Date();
const [command] = await db
.insert(deviceCommands)
.insert(commands)
.values({
ownerUserId: authSession.user.id,
sourceDeviceId: sourceDevice.id,
@@ -118,7 +118,7 @@ router.post('/', requireAuth, async (req, res) => {
await dispatchCommandById(command.id);
const refreshed = await db.query.deviceCommands.findFirst({ where: eq(deviceCommands.id, command.id) });
const refreshed = await db.query.commands.findFirst({ where: eq(commands.id, command.id) });
res.status(201).json({
message: 'Command queued',
@@ -141,13 +141,13 @@ router.get('/', requireAuth, async (req, res) => {
return;
}
const commands = await db.query.deviceCommands.findMany({
where: eq(deviceCommands.ownerUserId, authSession.user.id),
orderBy: [desc(deviceCommands.createdAt)],
const commandResults = await db.query.commands.findMany({
where: eq(commands.ownerUserId, authSession.user.id),
orderBy: [desc(commands.createdAt)],
limit: parsed.data.limit,
});
const filtered = commands.filter((command) => {
const filtered = commandResults.filter((command) => {
if (parsed.data.sourceDeviceId && command.sourceDeviceId !== parsed.data.sourceDeviceId) {
return false;
}
@@ -187,8 +187,8 @@ router.post('/:commandId/ack', requireDeviceAuth, async (req, res) => {
return;
}
const command = await db.query.deviceCommands.findFirst({
where: eq(deviceCommands.id, parsedParams.data.commandId),
const command = await db.query.commands.findFirst({
where: eq(commands.id, parsedParams.data.commandId),
});
if (!command) {
@@ -204,14 +204,14 @@ router.post('/:commandId/ack', requireDeviceAuth, async (req, res) => {
const now = new Date();
const [updated] = await db
.update(deviceCommands)
.update(commands)
.set({
status: parsed.data.status,
acknowledgedAt: now,
updatedAt: now,
error: parsed.data.status === 'rejected' ? parsed.data.error ?? 'rejected' : null,
})
.where(eq(deviceCommands.id, command.id))
.where(eq(commands.id, command.id))
.returning();
res.json({ message: 'Command acknowledged', command: updated });

View File

@@ -3,7 +3,7 @@ import { Router } from 'express';
import { z } from 'zod';
import { db } from '../db/client';
import { pushNotifications } from '../db/schema';
import { notificationDeliveries } from '../db/schema';
import { requireDeviceAuth } from '../middleware/device-auth';
import { dispatchPushQueueOnce } from '../services/push';
@@ -33,12 +33,12 @@ router.get('/me', requireDeviceAuth, async (req, res) => {
return;
}
const result = await db.query.pushNotifications.findMany({
const result = await db.query.notificationDeliveries.findMany({
where: and(
eq(pushNotifications.ownerUserId, deviceAuth.userId),
eq(pushNotifications.recipientDeviceId, deviceAuth.deviceId),
eq(notificationDeliveries.ownerUserId, deviceAuth.userId),
eq(notificationDeliveries.recipientDeviceId, deviceAuth.deviceId),
),
orderBy: [desc(pushNotifications.createdAt)],
orderBy: [desc(notificationDeliveries.createdAt)],
limit: parsed.data.limit,
});
@@ -63,16 +63,16 @@ router.post('/:notificationId/read', requireDeviceAuth, async (req, res) => {
}
const [updated] = await db
.update(pushNotifications)
.update(notificationDeliveries)
.set({
status: 'read',
updatedAt: new Date(),
})
.where(
and(
eq(pushNotifications.id, parsedParams.data.notificationId),
eq(pushNotifications.ownerUserId, deviceAuth.userId),
eq(pushNotifications.recipientDeviceId, deviceAuth.deviceId),
eq(notificationDeliveries.id, parsedParams.data.notificationId),
eq(notificationDeliveries.ownerUserId, deviceAuth.userId),
eq(notificationDeliveries.recipientDeviceId, deviceAuth.deviceId),
),
)
.returning();

View File

@@ -180,13 +180,6 @@ router.get('/:recordingId/download-url', requireDeviceAuth, async (req, res) =>
return;
}
const canAccess = recording.requesterDeviceId === deviceAuth.deviceId || recording.cameraDeviceId === deviceAuth.deviceId;
if (!canAccess) {
res.status(403).json({ message: 'Device cannot access this recording' });
return;
}
if (recording.status !== 'ready' || !recording.objectKey || !recording.bucket) {
res.status(409).json({ message: 'Recording is not available yet' });
return;
@@ -228,7 +221,6 @@ router.get('/:recordingId/download-url', requireDeviceAuth, async (req, res) =>
});
});
// Internal helper used by stream lifecycle to create recording placeholder rows.
export const createRecordingForStream = async (streamSessionId: string): Promise<void> => {
const stream = await db.query.streamSessions.findFirst({ where: eq(streamSessions.id, streamSessionId) });

View File

@@ -6,7 +6,7 @@ import { z } from 'zod';
import { db } from '../db/client';
import { mediaMode, simpleStreamingEnabled, streamRecordingEnabled } from '../media/config';
import { deviceCommands, deviceLinks, devices, streamSessions } from '../db/schema';
import { commands, deviceLinks, devices, streamSessions } from '../db/schema';
import { createLiveMediaSession, mediaProvider } from '../media/service';
import { sfuService } from '../media/sfu/service';
import { requireDeviceAuth } from '../middleware/device-auth';
@@ -204,7 +204,7 @@ router.post('/request', requireDeviceAuth, async (req, res) => {
}
const [command] = await db
.insert(deviceCommands)
.insert(commands)
.values({
ownerUserId: deviceAuth.userId,
sourceDeviceId: sourceDevice.id,
@@ -235,7 +235,7 @@ router.post('/request', requireDeviceAuth, async (req, res) => {
commandId: command.id,
});
const refreshedCommand = await db.query.deviceCommands.findFirst({ where: eq(deviceCommands.id, command.id) });
const refreshedCommand = await db.query.commands.findFirst({ where: eq(commands.id, command.id) });
const deliveredToRequester = sendRealtimeToDevice(sourceDevice.id, 'stream:requested', {
streamSessionId: session.id,

View File

@@ -3,7 +3,7 @@ import { and, eq } from 'drizzle-orm';
import { z } from 'zod';
import { db } from '../db/client';
import { devices, videos } from '../db/schema';
import { devices, events, recordings } from '../db/schema';
import { requireAuth } from '../middleware/auth';
import {
ensureMinioBucket,
@@ -18,6 +18,8 @@ const uploadUrlSchema = z.object({
fileName: z.string().trim().min(1).max(255),
deviceId: z.string().uuid(),
prefix: z.string().trim().optional(),
recordingId: z.string().uuid().optional(),
eventId: z.string().uuid().optional(),
});
const downloadUrlSchema = z.object({
@@ -67,44 +69,81 @@ router.post('/upload-url', async (req, res) => {
return;
}
if (parsed.data.eventId) {
const event = await db.query.events.findFirst({
where: and(eq(events.id, parsed.data.eventId), eq(events.userId, authSession.user.id)),
columns: { id: true },
});
if (!event) {
res.status(400).json({ message: 'Invalid eventId for this user' });
return;
}
}
const objectKey = buildObjectKey(authSession.user.id, parsed.data.fileName, parsed.data.prefix);
const uploadUrl = await minioClient.presignedPutObject(minioBucket, objectKey, minioPresignedExpirySeconds);
const now = new Date();
const expiresAt = new Date(now.getTime() + minioPresignedExpirySeconds * 1000);
const [videoRecord] = await db
.insert(videos)
.values({
userId: authSession.user.id,
deviceId: parsed.data.deviceId,
objectKey,
bucket: minioBucket,
uploadUrl,
status: 'upload_link_sent',
expiresAt,
updatedAt: now,
})
.returning({
id: videos.id,
objectKey: videos.objectKey,
bucket: videos.bucket,
status: videos.status,
createdAt: videos.createdAt,
expiresAt: videos.expiresAt,
let persistedRecording;
if (parsed.data.recordingId) {
const existingRecording = await db.query.recordings.findFirst({
where: and(eq(recordings.id, parsed.data.recordingId), eq(recordings.ownerUserId, authSession.user.id)),
});
if (!videoRecord) {
res.status(500).json({ message: 'Unable to persist video metadata' });
if (!existingRecording) {
res.status(404).json({ message: 'Recording not found' });
return;
}
[persistedRecording] = await db
.update(recordings)
.set({
objectKey,
bucket: minioBucket,
status: 'awaiting_upload',
updatedAt: now,
error: null,
})
.where(eq(recordings.id, existingRecording.id))
.returning();
} else {
[persistedRecording] = await db
.insert(recordings)
.values({
ownerUserId: authSession.user.id,
cameraDeviceId: parsed.data.deviceId,
requesterDeviceId: null,
eventId: parsed.data.eventId ?? null,
objectKey,
bucket: minioBucket,
status: 'awaiting_upload',
updatedAt: now,
})
.returning();
}
if (!persistedRecording) {
res.status(500).json({ message: 'Unable to persist recording metadata' });
return;
}
res.status(201).json({
message: 'Dummy upload URL generated',
message: 'Upload URL generated',
bucket: minioBucket,
objectKey,
uploadUrl,
expiresInSeconds: minioPresignedExpirySeconds,
video: videoRecord,
video: {
id: persistedRecording.id,
objectKey: persistedRecording.objectKey,
bucket: persistedRecording.bucket,
status: persistedRecording.status,
createdAt: persistedRecording.createdAt,
expiresAt,
},
});
});

View File

@@ -0,0 +1,18 @@
import 'dotenv/config';
import { migrate } from 'drizzle-orm/node-postgres/migrator';
import { db, pool } from '../db/client';

// One-shot CLI runner: applies all pending Drizzle migrations from ./drizzle,
// closes the connection pool, and exits 0 on success / 1 on failure.
void (async (): Promise<void> => {
  try {
    await migrate(db, { migrationsFolder: './drizzle' });
    await pool.end();
    console.log('Database migrations applied.');
    process.exit(0);
  } catch (error) {
    console.error('Failed to apply database migrations', error);
    // Best-effort pool shutdown; never let it mask the original failure.
    await pool.end().catch(() => undefined);
    process.exit(1);
  }
})();

View File

@@ -1,7 +1,7 @@
import { and, eq, lte } from 'drizzle-orm';
import { db } from '../db/client';
import { devices, pushNotifications } from '../db/schema';
import { devices, notificationDeliveries } from '../db/schema';
import { hasRequiredTables } from '../utils/db-schema';
const MAX_ATTEMPTS = Number(process.env.PUSH_MAX_ATTEMPTS ?? 5);
@@ -12,7 +12,7 @@ export const enqueuePushNotification = async (input: {
type: string;
payload?: Record<string, unknown>;
}): Promise<void> => {
await db.insert(pushNotifications).values({
await db.insert(notificationDeliveries).values({
ownerUserId: input.ownerUserId,
recipientDeviceId: input.recipientDeviceId,
type: input.type,
@@ -25,7 +25,9 @@ export const enqueuePushNotification = async (input: {
};
const deliverPush = async (notificationId: string): Promise<void> => {
const notification = await db.query.pushNotifications.findFirst({ where: eq(pushNotifications.id, notificationId) });
const notification = await db.query.notificationDeliveries.findFirst({
where: eq(notificationDeliveries.id, notificationId),
});
if (!notification || notification.status === 'delivered' || notification.status === 'failed') {
return;
@@ -41,7 +43,7 @@ const deliverPush = async (notificationId: string): Promise<void> => {
const shouldFail = attempts >= MAX_ATTEMPTS;
await db
.update(pushNotifications)
.update(notificationDeliveries)
.set({
attempts,
status: shouldFail ? 'failed' : 'queued',
@@ -49,14 +51,13 @@ const deliverPush = async (notificationId: string): Promise<void> => {
nextAttemptAt: new Date(now.getTime() + nextDelaySeconds * 1000),
updatedAt: now,
})
.where(eq(pushNotifications.id, notification.id));
.where(eq(notificationDeliveries.id, notification.id));
return;
}
// Mock push provider: consider "delivered" when token exists.
await db
.update(pushNotifications)
.update(notificationDeliveries)
.set({
attempts,
status: 'delivered',
@@ -64,14 +65,14 @@ const deliverPush = async (notificationId: string): Promise<void> => {
lastError: null,
updatedAt: now,
})
.where(eq(pushNotifications.id, notification.id));
.where(eq(notificationDeliveries.id, notification.id));
};
export const dispatchPushQueueOnce = async (): Promise<number> => {
const now = new Date();
const queued = await db.query.pushNotifications.findMany({
where: and(eq(pushNotifications.status, 'queued'), lte(pushNotifications.nextAttemptAt, now)),
const queued = await db.query.notificationDeliveries.findMany({
where: and(eq(notificationDeliveries.status, 'queued'), lte(notificationDeliveries.nextAttemptAt, now)),
limit: 100,
});
@@ -84,7 +85,7 @@ export const dispatchPushQueueOnce = async (): Promise<number> => {
export const startPushWorker = (): void => {
const intervalMs = Number(process.env.PUSH_WORKER_INTERVAL_MS ?? 10_000);
const requiredTables = ['push_notifications', 'devices'];
const requiredTables = ['notification_deliveries', 'devices'];
void (async () => {
const ready = await hasRequiredTables(requiredTables);

View File

@@ -1,3 +1,5 @@
import { readFileSync } from 'node:fs';
import { Agent as HttpsAgent } from 'node:https';
import { Client } from 'minio';
const endpoint = process.env.MINIO_ENDPOINT ?? 'localhost';
@@ -5,6 +7,9 @@ const port = Number(process.env.MINIO_PORT ?? 9000);
const useSSL = (process.env.MINIO_USE_SSL ?? 'false').toLowerCase() === 'true';
const accessKey = process.env.MINIO_ACCESS_KEY;
const secretKey = process.env.MINIO_SECRET_KEY;
const insecureSkipTlsVerify = (process.env.MINIO_INSECURE_SKIP_TLS_VERIFY ?? 'false').toLowerCase() === 'true';
const tlsRejectUnauthorized = (process.env.MINIO_TLS_REJECT_UNAUTHORIZED ?? 'true').toLowerCase() !== 'false';
const minioCaCertPath = process.env.MINIO_CA_CERT_PATH?.trim();
if (!accessKey || !secretKey) {
throw new Error('MINIO_ACCESS_KEY and MINIO_SECRET_KEY must be set');
@@ -12,6 +17,14 @@ if (!accessKey || !secretKey) {
export const minioBucket = process.env.MINIO_BUCKET ?? 'videos';
export const minioPresignedExpirySeconds = Number(process.env.MINIO_PRESIGNED_EXPIRY_SECONDS ?? 60 * 10);
const customCa = minioCaCertPath ? readFileSync(minioCaCertPath) : undefined;
const transportAgent = useSSL
? new HttpsAgent({
keepAlive: true,
ca: customCa,
rejectUnauthorized: insecureSkipTlsVerify ? false : tlsRejectUnauthorized,
})
: undefined;
export const minioClient = new Client({
endPoint: endpoint,
@@ -19,6 +32,7 @@ export const minioClient = new Client({
useSSL,
accessKey,
secretKey,
transportAgent,
});
let ensureBucketPromise: Promise<void> | null = null;