|
10 | 10 | import { db } from '@sim/db' |
11 | 11 | import { userTableDefinitions, userTableRows } from '@sim/db/schema' |
12 | 12 | import { createLogger } from '@sim/logger' |
13 | | -import { and, count, eq, gt, gte, sql } from 'drizzle-orm' |
| 13 | +import { and, count, eq, gt, gte, inArray, sql } from 'drizzle-orm' |
14 | 14 | import { COLUMN_TYPES, NAME_PATTERN, TABLE_LIMITS, USER_TABLE_ROWS_SQL_NAME } from './constants' |
15 | 15 | import { buildFilterClause, buildSortClause } from './sql' |
16 | 16 | import type { |
17 | 17 | BatchInsertData, |
| 18 | + BatchUpdateByIdData, |
18 | 19 | BulkDeleteByIdsData, |
19 | 20 | BulkDeleteByIdsResult, |
20 | 21 | BulkDeleteData, |
@@ -1092,6 +1093,96 @@ export async function updateRowsByFilter( |
1092 | 1093 | } |
1093 | 1094 | } |
1094 | 1095 |
|
| 1096 | +/** |
| 1097 | + * Updates multiple rows with per-row data in a single transaction. |
| 1098 | + * Avoids the race condition of parallel update_row calls overwriting each other. |
| 1099 | + */ |
| 1100 | +export async function batchUpdateRows( |
| 1101 | + data: BatchUpdateByIdData, |
| 1102 | + table: TableDefinition, |
| 1103 | + requestId: string |
| 1104 | +): Promise<BulkOperationResult> { |
| 1105 | + if (data.updates.length === 0) { |
| 1106 | + return { affectedCount: 0, affectedRowIds: [] } |
| 1107 | + } |
| 1108 | + |
| 1109 | + const rowIds = data.updates.map((u) => u.rowId) |
| 1110 | + const existingRows = await db |
| 1111 | + .select({ id: userTableRows.id, data: userTableRows.data }) |
| 1112 | + .from(userTableRows) |
| 1113 | + .where( |
| 1114 | + and( |
| 1115 | + eq(userTableRows.tableId, data.tableId), |
| 1116 | + eq(userTableRows.workspaceId, data.workspaceId), |
| 1117 | + inArray(userTableRows.id, rowIds) |
| 1118 | + ) |
| 1119 | + ) |
| 1120 | + |
| 1121 | + const existingMap = new Map(existingRows.map((r) => [r.id, r.data as RowData])) |
| 1122 | + |
| 1123 | + const missing = rowIds.filter((id) => !existingMap.has(id)) |
| 1124 | + if (missing.length > 0) { |
| 1125 | + throw new Error(`Rows not found: ${missing.join(', ')}`) |
| 1126 | + } |
| 1127 | + |
| 1128 | + const mergedUpdates: Array<{ rowId: string; mergedData: RowData }> = [] |
| 1129 | + for (const update of data.updates) { |
| 1130 | + const existing = existingMap.get(update.rowId)! |
| 1131 | + const merged = { ...existing, ...update.data } |
| 1132 | + |
| 1133 | + const sizeValidation = validateRowSize(merged) |
| 1134 | + if (!sizeValidation.valid) { |
| 1135 | + throw new Error(`Row ${update.rowId}: ${sizeValidation.errors.join(', ')}`) |
| 1136 | + } |
| 1137 | + |
| 1138 | + const schemaValidation = validateRowAgainstSchema(merged, table.schema) |
| 1139 | + if (!schemaValidation.valid) { |
| 1140 | + throw new Error(`Row ${update.rowId}: ${schemaValidation.errors.join(', ')}`) |
| 1141 | + } |
| 1142 | + |
| 1143 | + mergedUpdates.push({ rowId: update.rowId, mergedData: merged }) |
| 1144 | + } |
| 1145 | + |
| 1146 | + const uniqueColumns = getUniqueColumns(table.schema) |
| 1147 | + if (uniqueColumns.length > 0) { |
| 1148 | + for (const { rowId, mergedData } of mergedUpdates) { |
| 1149 | + const uniqueValidation = await checkUniqueConstraintsDb( |
| 1150 | + data.tableId, |
| 1151 | + mergedData, |
| 1152 | + table.schema, |
| 1153 | + rowId |
| 1154 | + ) |
| 1155 | + if (!uniqueValidation.valid) { |
| 1156 | + throw new Error(`Row ${rowId}: ${uniqueValidation.errors.join(', ')}`) |
| 1157 | + } |
| 1158 | + } |
| 1159 | + } |
| 1160 | + |
| 1161 | + const now = new Date() |
| 1162 | + |
| 1163 | + await db.transaction(async (trx) => { |
| 1164 | + for (let i = 0; i < mergedUpdates.length; i += TABLE_LIMITS.UPDATE_BATCH_SIZE) { |
| 1165 | + const batch = mergedUpdates.slice(i, i + TABLE_LIMITS.UPDATE_BATCH_SIZE) |
| 1166 | + const updatePromises = batch.map(({ rowId, mergedData }) => |
| 1167 | + trx |
| 1168 | + .update(userTableRows) |
| 1169 | + .set({ data: mergedData, updatedAt: now }) |
| 1170 | + .where(eq(userTableRows.id, rowId)) |
| 1171 | + ) |
| 1172 | + await Promise.all(updatePromises) |
| 1173 | + } |
| 1174 | + }) |
| 1175 | + |
| 1176 | + logger.info( |
| 1177 | + `[${requestId}] Batch updated ${mergedUpdates.length} rows in table ${data.tableId}` |
| 1178 | + ) |
| 1179 | + |
| 1180 | + return { |
| 1181 | + affectedCount: mergedUpdates.length, |
| 1182 | + affectedRowIds: mergedUpdates.map((u) => u.rowId), |
| 1183 | + } |
| 1184 | +} |
| 1185 | + |
1095 | 1186 | type DbTransaction = Parameters<Parameters<typeof db.transaction>[0]>[0] |
1096 | 1187 |
|
1097 | 1188 | /** |
|
0 commit comments