feat: Implement analysis job tracking with progress timeline and enhanced data source status management.

This commit is contained in:
2026-02-03 22:43:27 +00:00
parent c47614bc66
commit 358f2a42dd
22 changed files with 2251 additions and 219 deletions

128
convex/analysisJobs.ts Normal file
View File

@@ -0,0 +1,128 @@
import { mutation, query } from "./_generated/server";
import { v } from "convex/values";
import { getAuthUserId } from "@convex-dev/auth/server";
export const create = mutation({
args: {
projectId: v.id("projects"),
dataSourceId: v.optional(v.id("dataSources")),
},
handler: async (ctx, args) => {
const userId = await getAuthUserId(ctx);
if (!userId) throw new Error("Unauthorized");
const project = await ctx.db.get(args.projectId);
if (!project || project.userId !== userId) {
throw new Error("Project not found or unauthorized");
}
const now = Date.now();
return await ctx.db.insert("analysisJobs", {
projectId: args.projectId,
dataSourceId: args.dataSourceId,
status: "pending",
progress: 0,
stage: undefined,
timeline: [],
createdAt: now,
updatedAt: now,
});
},
});
export const update = mutation({
args: {
jobId: v.id("analysisJobs"),
status: v.union(
v.literal("pending"),
v.literal("running"),
v.literal("completed"),
v.literal("failed")
),
progress: v.optional(v.number()),
stage: v.optional(v.string()),
timeline: v.optional(v.array(v.object({
key: v.string(),
label: v.string(),
status: v.union(
v.literal("pending"),
v.literal("running"),
v.literal("completed"),
v.literal("failed")
),
detail: v.optional(v.string()),
}))),
error: v.optional(v.string()),
},
handler: async (ctx, args) => {
const userId = await getAuthUserId(ctx);
if (!userId) throw new Error("Unauthorized");
const job = await ctx.db.get(args.jobId);
if (!job) throw new Error("Job not found");
const project = await ctx.db.get(job.projectId);
if (!project || project.userId !== userId) {
throw new Error("Project not found or unauthorized");
}
const patch: Record<string, unknown> = {
status: args.status,
updatedAt: Date.now(),
};
if (args.progress !== undefined) patch.progress = args.progress;
if (args.error !== undefined) patch.error = args.error;
if (args.stage !== undefined) patch.stage = args.stage;
if (args.timeline !== undefined) patch.timeline = args.timeline;
await ctx.db.patch(args.jobId, patch);
},
});
export const getById = query({
args: {
jobId: v.id("analysisJobs"),
},
handler: async (ctx, args) => {
const userId = await getAuthUserId(ctx);
if (!userId) return null;
const job = await ctx.db.get(args.jobId);
if (!job) return null;
const project = await ctx.db.get(job.projectId);
if (!project || project.userId !== userId) return null;
return job;
},
});
export const listByProject = query({
args: {
projectId: v.id("projects"),
status: v.optional(v.string()),
},
handler: async (ctx, args) => {
const userId = await getAuthUserId(ctx);
if (!userId) return [];
const project = await ctx.db.get(args.projectId);
if (!project || project.userId !== userId) return [];
if (args.status) {
return await ctx.db
.query("analysisJobs")
.withIndex("by_project_status", (q) =>
q.eq("projectId", args.projectId).eq("status", args.status!)
)
.order("desc")
.collect();
}
return await ctx.db
.query("analysisJobs")
.withIndex("by_project_createdAt", (q) => q.eq("projectId", args.projectId))
.order("desc")
.collect();
},
});