Ver código fonte

同步优化

wangxx 1 semana atrás
pai
commit
21eb8d7528

+ 187 - 52
airport-ledger/src/main/java/com/sundot/airport/ledger/service/impl/LedgerSyncServiceImpl.java

@@ -11,6 +11,7 @@ import com.sundot.airport.common.utils.DateUtils;
11 11
 import com.sundot.airport.ledger.domain.*;
12 12
 import com.sundot.airport.ledger.mapper.*;
13 13
 import com.sundot.airport.ledger.service.ILedgerSyncService;
14
+import com.sundot.airport.ledger.service.IScoreEventService;
14 15
 import com.sundot.airport.system.domain.BasePosition;
15 16
 import com.sundot.airport.system.mapper.SysDeptMapper;
16 17
 import com.sundot.airport.system.mapper.SysUserMapper;
@@ -60,7 +61,7 @@ public class LedgerSyncServiceImpl implements ILedgerSyncService {
60 61
     @Autowired private LedgerExamScoreMapper examScoreMapper;
61 62
 
62 63
     // ── Score Mappers ─────────────────────────────────────────
63
-    @Autowired private ScoreEventMapper scoreEventMapper;
64
+    @Autowired private IScoreEventService scoreEventService;
64 65
     @Autowired private ScoreDimensionMapper dimensionMapper;
65 66
     @Autowired private ScoreIndicatorMapper indicatorMapper;
66 67
     @Autowired private SysDeptMapper sysDeptMapper;
@@ -199,11 +200,12 @@ public class LedgerSyncServiceImpl implements ILedgerSyncService {
199 200
         Long dimId = dimCache.get("安全防控能力");
200 201
         ScoreIndicator lv2 = getIndicator(IND_SUPERVISION_L2);
201 202
         int ins = 0, skip = 0;
203
+
204
+        // 优化:收集需要更新的ID,批量更新sync_flag,避免逐条更新导致锁等待超时
205
+        List<Long> idsToUpdate = new ArrayList<>();
206
+        List<ScoreEvent> eventsToInsert = new ArrayList<>();
207
+
202 208
         for (LedgerSupervisionProblem row : list) {
203
-            LambdaUpdateWrapper<LedgerSupervisionProblem> uw = new LambdaUpdateWrapper<>();
204
-            uw.eq(LedgerSupervisionProblem::getId, row.getId())
205
-                    .set(LedgerSupervisionProblem::getSyncFlag, LedgerSyncFlagEnum.ONE.getCode());
206
-            supervisionMapper.update(null, uw);
207 209
             String src = "supervision_problem:" + row.getId();
208 210
             if (existsBySrc(src)) { skip++; continue; }
209 211
             if (!hasName(row.getInspectedName())) { skip++; continue; }
@@ -221,9 +223,19 @@ public class LedgerSyncServiceImpl implements ILedgerSyncService {
221 223
                     row.getLocation(), sv, ZERO,
222 224
                     row.getProblemDesc(), src, row.getEvidenceFile(),
223 225
                     row.getChannelId(),row.getChannelNo(),row.getAreaId(),row.getPositionId(),row.getPosition());
224
-            scoreEventMapper.insert(e);
226
+            eventsToInsert.add(e);
227
+            idsToUpdate.add(row.getId());
225 228
             ins++;
226 229
         }
230
+
231
+        // 使用通用批量处理方法
232
+        batchProcess(eventsToInsert, idsToUpdate, subList -> {
233
+            LambdaUpdateWrapper<LedgerSupervisionProblem> uw = new LambdaUpdateWrapper<>();
234
+            uw.in(LedgerSupervisionProblem::getId, subList)
235
+                    .set(LedgerSupervisionProblem::getSyncFlag, LedgerSyncFlagEnum.ONE.getCode());
236
+            supervisionMapper.update(null, uw);
237
+        });
238
+
227 239
         return new SyncResult(ins, skip, "");
228 240
     }
229 241
 
@@ -235,11 +247,12 @@ public class LedgerSyncServiceImpl implements ILedgerSyncService {
235 247
         Long dimId = dimCache.get("安全防控能力");
236 248
         ScoreIndicator lv2 = getIndicator(IND_INTERCEPT_L2);
237 249
         int ins = 0, skip = 0;
250
+
251
+        // 优化:收集需要更新的ID,批量更新sync_flag
252
+        List<Long> idsToUpdate = new ArrayList<>();
253
+        List<ScoreEvent> eventsToInsert = new ArrayList<>();
254
+
238 255
         for (LedgerRealtimeInterception row : list) {
239
-            LambdaUpdateWrapper<LedgerRealtimeInterception> uw = new LambdaUpdateWrapper<>();
240
-            uw.eq(LedgerRealtimeInterception::getId, row.getId())
241
-                    .set(LedgerRealtimeInterception::getSyncFlag, LedgerSyncFlagEnum.ONE.getCode());
242
-            interceptionMapper.update(null, uw);
243 256
             String src = "realtime_interception:" + row.getId();
244 257
             if (existsBySrc(src)) { skip++; continue; }
245 258
             if (!hasName(row.getInspectorName())) { skip++; continue; }
@@ -263,9 +276,19 @@ public class LedgerSyncServiceImpl implements ILedgerSyncService {
263 276
                     row.getLocation(), sv, ZERO,
264 277
                     row.getItemName(), src, row.getEvidenceFile(),
265 278
                     row.getChannelId(),row.getChannelNo(),row.getAreaId(),row.getPositionId(),row.getPosition());
266
-            scoreEventMapper.insert(e);
279
+            eventsToInsert.add(e);
280
+            idsToUpdate.add(row.getId());
267 281
             ins++;
268 282
         }
283
+
284
+        // 使用通用批量处理方法
285
+        batchProcess(eventsToInsert, idsToUpdate, subList -> {
286
+            LambdaUpdateWrapper<LedgerRealtimeInterception> uw = new LambdaUpdateWrapper<>();
287
+            uw.in(LedgerRealtimeInterception::getId, subList)
288
+                    .set(LedgerRealtimeInterception::getSyncFlag, LedgerSyncFlagEnum.ONE.getCode());
289
+            interceptionMapper.update(null, uw);
290
+        });
291
+
269 292
         return new SyncResult(ins, skip, "");
270 293
     }
271 294
 
@@ -277,11 +300,12 @@ public class LedgerSyncServiceImpl implements ILedgerSyncService {
277 300
         Long dimId = dimCache.get("安全防控能力");
278 301
         ScoreIndicator lv2 = getIndicator(IND_SECTEST_L2);
279 302
         int ins = 0, skip = 0;
303
+        
304
+        // 优化:收集需要更新的ID,批量更新sync_flag
305
+        List<Long> idsToUpdate = new ArrayList<>();
306
+        List<ScoreEvent> eventsToInsert = new ArrayList<>();
307
+        
280 308
         for (LedgerSecurityTest row : list) {
281
-            LambdaUpdateWrapper<LedgerSecurityTest> uw = new LambdaUpdateWrapper<>();
282
-            uw.eq(LedgerSecurityTest::getId, row.getId())
283
-                    .set(LedgerSecurityTest::getSyncFlag, LedgerSyncFlagEnum.ONE.getCode());
284
-            securityTestMapper.update(null, uw);
285 309
             String src = "security_test:" + row.getId();
286 310
             if (existsBySrc(src)) { skip++; continue; }
287 311
             if (!"未通过".equals(row.getTestResult())) { skip++; continue; }
@@ -299,9 +323,19 @@ public class LedgerSyncServiceImpl implements ILedgerSyncService {
299 323
                     row.getRegion(), NEG_HALF, ZERO,
300 324
                     row.getTestItem(), src, row.getEvidenceFile(),
301 325
                     row.getChannelId(),row.getChannel(),row.getAreaId(),row.getPositionId(),row.getTestType());
302
-            scoreEventMapper.insert(e);
326
+            eventsToInsert.add(e);
327
+            idsToUpdate.add(row.getId());
303 328
             ins++;
304 329
         }
330
+        
331
+        // 使用通用批量处理方法
332
+        batchProcess(eventsToInsert, idsToUpdate, subList -> {
333
+            LambdaUpdateWrapper<LedgerSecurityTest> uw = new LambdaUpdateWrapper<>();
334
+            uw.in(LedgerSecurityTest::getId, subList)
335
+                    .set(LedgerSecurityTest::getSyncFlag, LedgerSyncFlagEnum.ONE.getCode());
336
+            securityTestMapper.update(null, uw);
337
+        });
338
+        
305 339
         return new SyncResult(ins, skip, "");
306 340
     }
307 341
 
@@ -313,11 +347,12 @@ public class LedgerSyncServiceImpl implements ILedgerSyncService {
313 347
         Long dimId = dimCache.get("安全防控能力");
314 348
         ScoreIndicator lv2 = getIndicator(IND_UNSAFE_L2);
315 349
         int ins = 0, skip = 0;
350
+        
351
+        // 优化:收集需要更新的ID,批量更新sync_flag
352
+        List<Long> idsToUpdate = new ArrayList<>();
353
+        List<ScoreEvent> eventsToInsert = new ArrayList<>();
354
+        
316 355
         for (LedgerUnsafeEvent row : list) {
317
-            LambdaUpdateWrapper<LedgerUnsafeEvent> uw = new LambdaUpdateWrapper<>();
318
-            uw.eq(LedgerUnsafeEvent::getId, row.getId())
319
-                    .set(LedgerUnsafeEvent::getSyncFlag, LedgerSyncFlagEnum.ONE.getCode());
320
-            unsafeEventMapper.update(null, uw);
321 356
             String src = "unsafe_event:" + row.getId();
322 357
             if (existsBySrc(src)) { skip++; continue; }
323 358
             if (!hasName(row.getResponsibleName())) { skip++; continue; }
@@ -335,9 +370,19 @@ public class LedgerSyncServiceImpl implements ILedgerSyncService {
335 370
                     row.getArea(), sv, ZERO,
336 371
                     row.getEventDesc(), src, row.getEvidenceFile(),
337 372
                     row.getChannelId(),row.getChannelNo(),row.getAreaId(),row.getPositionId(),row.getPosition());
338
-            scoreEventMapper.insert(e);
373
+            eventsToInsert.add(e);
374
+            idsToUpdate.add(row.getId());
339 375
             ins++;
340 376
         }
377
+        
378
+        // 使用通用批量处理方法
379
+        batchProcess(eventsToInsert, idsToUpdate, subList -> {
380
+            LambdaUpdateWrapper<LedgerUnsafeEvent> uw = new LambdaUpdateWrapper<>();
381
+            uw.in(LedgerUnsafeEvent::getId, subList)
382
+                    .set(LedgerUnsafeEvent::getSyncFlag, LedgerSyncFlagEnum.ONE.getCode());
383
+            unsafeEventMapper.update(null, uw);
384
+        });
385
+        
341 386
         return new SyncResult(ins, skip, "");
342 387
     }
343 388
 
@@ -349,11 +394,12 @@ public class LedgerSyncServiceImpl implements ILedgerSyncService {
349 394
         Long dimId = dimCache.get("安全防控能力");
350 395
         ScoreIndicator lv2 = getIndicator(IND_SEIZURE_L2);
351 396
         int ins = 0, skip = 0;
397
+        
398
+        // 优化:收集需要更新的ID,批量更新sync_flag
399
+        List<Long> idsToUpdate = new ArrayList<>();
400
+        List<ScoreEvent> eventsToInsert = new ArrayList<>();
401
+        
352 402
         for (LedgerSeizureStats row : list) {
353
-            LambdaUpdateWrapper<LedgerSeizureStats> uw = new LambdaUpdateWrapper<>();
354
-            uw.eq(LedgerSeizureStats::getId, row.getId())
355
-                    .set(LedgerSeizureStats::getSyncFlag, LedgerSyncFlagEnum.ONE.getCode());
356
-            seizureMapper.update(null, uw);
357 403
             String src = "seizure_stats:" + row.getId();
358 404
             if (existsBySrc(src)) { skip++; continue; }
359 405
             if (!hasName(row.getInspectorName())) { skip++; continue; }
@@ -368,9 +414,19 @@ public class LedgerSyncServiceImpl implements ILedgerSyncService {
368 414
                     row.getWorkArea(), sv, ZERO,
369 415
                     row.getItemName(), src, row.getEvidenceFile(),
370 416
                     null,null,row.getAreaId(),row.getPositionId(),row.getPosition());
371
-            scoreEventMapper.insert(e);
417
+            eventsToInsert.add(e);
418
+            idsToUpdate.add(row.getId());
372 419
             ins++;
373 420
         }
421
+        
422
+        // 使用通用批量处理方法
423
+        batchProcess(eventsToInsert, idsToUpdate, subList -> {
424
+            LambdaUpdateWrapper<LedgerSeizureStats> uw = new LambdaUpdateWrapper<>();
425
+            uw.in(LedgerSeizureStats::getId, subList)
426
+                    .set(LedgerSeizureStats::getSyncFlag, LedgerSyncFlagEnum.ONE.getCode());
427
+            seizureMapper.update(null, uw);
428
+        });
429
+        
374 430
         return new SyncResult(ins, skip, "");
375 431
     }
376 432
 
@@ -386,11 +442,12 @@ public class LedgerSyncServiceImpl implements ILedgerSyncService {
386 442
         Long dimId = dimCache.get("安全防控能力");
387 443
         ScoreIndicator lv2 = getIndicator(IND_SEIZURE_L2);
388 444
         int ins = 0, skip = 0;
445
+        
446
+        // 优化:收集需要更新的ID,批量更新sync_flag
447
+        List<Long> idsToUpdate = new ArrayList<>();
448
+        List<ScoreEvent> eventsToInsert = new ArrayList<>();
449
+        
389 450
         for (LedgerRewardApproval row : list) {
390
-            LambdaUpdateWrapper<LedgerRewardApproval> uw = new LambdaUpdateWrapper<>();
391
-            uw.eq(LedgerRewardApproval::getId, row.getId())
392
-                    .set(LedgerRewardApproval::getSyncFlag, LedgerSyncFlagEnum.ONE.getCode());
393
-            rewardApprovalMapper.update(null, uw);
394 451
             String src = "reward_approval:" + row.getId();
395 452
             if (existsBySrc(src)) { skip++; continue; }
396 453
             if (!hasName(row.getPersonName())) { skip++; continue; }
@@ -405,9 +462,19 @@ public class LedgerSyncServiceImpl implements ILedgerSyncService {
405 462
                     null, sv, ZERO,
406 463
                     row.getRewardReason(), src, null,
407 464
                     null,null,null,null,null);
408
-            scoreEventMapper.insert(e);
465
+            eventsToInsert.add(e);
466
+            idsToUpdate.add(row.getId());
409 467
             ins++;
410 468
         }
469
+        
470
+        // 使用通用批量处理方法
471
+        batchProcess(eventsToInsert, idsToUpdate, subList -> {
472
+            LambdaUpdateWrapper<LedgerRewardApproval> uw = new LambdaUpdateWrapper<>();
473
+            uw.in(LedgerRewardApproval::getId, subList)
474
+                    .set(LedgerRewardApproval::getSyncFlag, LedgerSyncFlagEnum.ONE.getCode());
475
+            rewardApprovalMapper.update(null, uw);
476
+        });
477
+        
411 478
         return new SyncResult(ins, skip, "");
412 479
     }
413 480
 
@@ -419,11 +486,12 @@ public class LedgerSyncServiceImpl implements ILedgerSyncService {
419 486
         Long dimId = dimCache.get("服务响应能力");
420 487
         ScoreIndicator lv2 = getIndicator(IND_SVCPATROL_L2);
421 488
         int ins = 0, skip = 0;
489
+        
490
+        // 优化:收集需要更新的ID,批量更新sync_flag
491
+        List<Long> idsToUpdate = new ArrayList<>();
492
+        List<ScoreEvent> eventsToInsert = new ArrayList<>();
493
+        
422 494
         for (LedgerServicePatrol row : list) {
423
-            LambdaUpdateWrapper<LedgerServicePatrol> uw = new LambdaUpdateWrapper<>();
424
-            uw.eq(LedgerServicePatrol::getId, row.getId())
425
-                    .set(LedgerServicePatrol::getSyncFlag, LedgerSyncFlagEnum.ONE.getCode());
426
-            servicePatrolMapper.update(null, uw);
427 495
             String src = "service_patrol:" + row.getId();
428 496
             if (existsBySrc(src)) { skip++; continue; }
429 497
             if (!hasName(row.getInspectedName())) { skip++; continue; }
@@ -440,9 +508,19 @@ public class LedgerSyncServiceImpl implements ILedgerSyncService {
440 508
                     row.getLocation(), sv, ZERO,
441 509
                     row.getProblemDesc(), src, row.getEvidenceFile(),
442 510
                     row.getChannelId(),row.getChannelNo(),row.getAreaId(),row.getPositionId(),row.getPosition());
443
-            scoreEventMapper.insert(e);
511
+            eventsToInsert.add(e);
512
+            idsToUpdate.add(row.getId());
444 513
             ins++;
445 514
         }
515
+        
516
+        // 使用通用批量处理方法
517
+        batchProcess(eventsToInsert, idsToUpdate, subList -> {
518
+            LambdaUpdateWrapper<LedgerServicePatrol> uw = new LambdaUpdateWrapper<>();
519
+            uw.in(LedgerServicePatrol::getId, subList)
520
+                    .set(LedgerServicePatrol::getSyncFlag, LedgerSyncFlagEnum.ONE.getCode());
521
+            servicePatrolMapper.update(null, uw);
522
+        });
523
+        
446 524
         return new SyncResult(ins, skip, "");
447 525
     }
448 526
 
@@ -454,11 +532,12 @@ public class LedgerSyncServiceImpl implements ILedgerSyncService {
454 532
         Long dimId = dimCache.get("服务响应能力");
455 533
         ScoreIndicator lv2 = getIndicator(IND_COMPLAINT_L2);
456 534
         int ins = 0, skip = 0;
535
+        
536
+        // 优化:收集需要更新的ID,批量更新sync_flag
537
+        List<Long> idsToUpdate = new ArrayList<>();
538
+        List<ScoreEvent> eventsToInsert = new ArrayList<>();
539
+        
457 540
         for (LedgerComplaint row : list) {
458
-            LambdaUpdateWrapper<LedgerComplaint> uw = new LambdaUpdateWrapper<>();
459
-            uw.eq(LedgerComplaint::getId, row.getId())
460
-                    .set(LedgerComplaint::getSyncFlag, LedgerSyncFlagEnum.ONE.getCode());
461
-            complaintMapper.update(null, uw);
462 541
             String src = "complaint:" + row.getId();
463 542
             if (existsBySrc(src)) { skip++; continue; }
464 543
             if (!hasName(row.getResponsibleName())) { skip++; continue; }
@@ -468,9 +547,19 @@ public class LedgerSyncServiceImpl implements ILedgerSyncService {
468 547
                     null, sv, ZERO,
469 548
                     row.getComplaintDesc(), src, row.getEvidenceFile(),
470 549
                     null,null,null,null,null);
471
-            scoreEventMapper.insert(e);
550
+            eventsToInsert.add(e);
551
+            idsToUpdate.add(row.getId());
472 552
             ins++;
473 553
         }
554
+        
555
+        // 使用通用批量处理方法
556
+        batchProcess(eventsToInsert, idsToUpdate, subList -> {
557
+            LambdaUpdateWrapper<LedgerComplaint> uw = new LambdaUpdateWrapper<>();
558
+            uw.in(LedgerComplaint::getId, subList)
559
+                    .set(LedgerComplaint::getSyncFlag, LedgerSyncFlagEnum.ONE.getCode());
560
+            complaintMapper.update(null, uw);
561
+        });
562
+        
474 563
         return new SyncResult(ins, skip, "");
475 564
     }
476 565
 
@@ -483,11 +572,12 @@ public class LedgerSyncServiceImpl implements ILedgerSyncService {
483 572
         ScoreIndicator lv2 = getIndicator(IND_TYPCASE_L2);
484 573
         ScoreIndicator lv3 = getIndicator(IND_TYPCASE_L3);
485 574
         int ins = 0, skip = 0;
575
+        
576
+        // 优化:收集需要更新的ID,批量更新sync_flag
577
+        List<Long> idsToUpdate = new ArrayList<>();
578
+        List<ScoreEvent> eventsToInsert = new ArrayList<>();
579
+        
486 580
         for (LedgerTerminalBonus row : list) {
487
-            LambdaUpdateWrapper<LedgerTerminalBonus> uw = new LambdaUpdateWrapper<>();
488
-            uw.eq(LedgerTerminalBonus::getId, row.getId())
489
-                    .set(LedgerTerminalBonus::getSyncFlag, LedgerSyncFlagEnum.ONE.getCode());
490
-            terminalBonusMapper.update(null, uw);
491 581
             String src = "terminal_bonus:" + row.getId();
492 582
             if (existsBySrc(src)) { skip++; continue; }
493 583
             if (!hasName(row.getPersonName())) { skip++; continue; }
@@ -497,9 +587,19 @@ public class LedgerSyncServiceImpl implements ILedgerSyncService {
497 587
                     null, sv, ZERO,
498 588
                     "航站楼加分", src, null,
499 589
                     null,null,null,null,null);
500
-            scoreEventMapper.insert(e);
590
+            eventsToInsert.add(e);
591
+            idsToUpdate.add(row.getId());
501 592
             ins++;
502 593
         }
594
+        
595
+        // 使用通用批量处理方法
596
+        batchProcess(eventsToInsert, idsToUpdate, subList -> {
597
+            LambdaUpdateWrapper<LedgerTerminalBonus> uw = new LambdaUpdateWrapper<>();
598
+            uw.in(LedgerTerminalBonus::getId, subList)
599
+                    .set(LedgerTerminalBonus::getSyncFlag, LedgerSyncFlagEnum.ONE.getCode());
600
+            terminalBonusMapper.update(null, uw);
601
+        });
602
+        
503 603
         return new SyncResult(ins, skip, "");
504 604
     }
505 605
 
@@ -511,11 +611,12 @@ public class LedgerSyncServiceImpl implements ILedgerSyncService {
511 611
         Long dimId = dimCache.get("业务实操能力");
512 612
         ScoreIndicator lv2 = getIndicator(IND_EXAM_L2);
513 613
         int ins = 0, skip = 0;
614
+        
615
+        // 优化:收集需要更新的ID,批量更新sync_flag
616
+        List<Long> idsToUpdate = new ArrayList<>();
617
+        List<ScoreEvent> eventsToInsert = new ArrayList<>();
618
+        
514 619
         for (LedgerExamScore row : list) {
515
-            LambdaUpdateWrapper<LedgerExamScore> uw = new LambdaUpdateWrapper<>();
516
-            uw.eq(LedgerExamScore::getId, row.getId())
517
-                    .set(LedgerExamScore::getSyncFlag, LedgerSyncFlagEnum.ONE.getCode());
518
-            examScoreMapper.update(null, uw);
519 620
             String src = "exam_score:" + row.getId();
520 621
             if (existsBySrc(src)) { skip++; continue; }
521 622
             if (!hasName(row.getPersonName())) { skip++; continue; }
@@ -533,9 +634,19 @@ public class LedgerSyncServiceImpl implements ILedgerSyncService {
533 634
                     null, POS_FIVE, ZERO,
534 635
                     row.getExamCategory() + " " + row.getExamPeriod(), src, null,
535 636
                     null,null,null,null,null);
536
-            scoreEventMapper.insert(e);
637
+            eventsToInsert.add(e);
638
+            idsToUpdate.add(row.getId());
537 639
             ins++;
538 640
         }
641
+        
642
+        // 使用通用批量处理方法
643
+        batchProcess(eventsToInsert, idsToUpdate, subList -> {
644
+            LambdaUpdateWrapper<LedgerExamScore> uw = new LambdaUpdateWrapper<>();
645
+            uw.in(LedgerExamScore::getId, subList)
646
+                    .set(LedgerExamScore::getSyncFlag, LedgerSyncFlagEnum.ONE.getCode());
647
+            examScoreMapper.update(null, uw);
648
+        });
649
+        
539 650
         return new SyncResult(ins, skip, "");
540 651
     }
541 652
 
@@ -543,6 +654,30 @@ public class LedgerSyncServiceImpl implements ILedgerSyncService {
543 654
     //  Helpers
544 655
     // ═════════════════════════════════════════════════════════
545 656
 
657
+    /**
658
+     * 通用批量处理方法:批量插入ScoreEvent并更新sync_flag
659
+     * 
660
+     * @param eventsToInsert 待插入的事件列表
661
+     * @param idsToUpdate 待更新的ID列表
662
+     * @param updateFunction 更新函数,接收ID子列表,返回更新的SQL执行结果
663
+     */
664
+    private void batchProcess(List<ScoreEvent> eventsToInsert, List<Long> idsToUpdate,
665
+                              java.util.function.Consumer<List<Long>> updateFunction) {
666
+        // 批量插入ScoreEvent(每批500条)
667
+        if (!eventsToInsert.isEmpty()) {
668
+            scoreEventService.saveBatch(eventsToInsert, 500);
669
+        }
670
+
671
+        // 批量更新sync_flag(每批500条),减少锁持有时间
672
+        if (!idsToUpdate.isEmpty()) {
673
+            for (int i = 0; i < idsToUpdate.size(); i += 500) {
674
+                int end = Math.min(i + 500, idsToUpdate.size());
675
+                List<Long> subList = idsToUpdate.subList(i, end);
676
+                updateFunction.accept(subList);
677
+            }
678
+        }
679
+    }
680
+
546 681
     private void loadDimCache() {
547 682
         if (!dimCache.isEmpty()) return;
548 683
         dimensionMapper.selectList(new ScoreDimension()).forEach(d -> dimCache.put(d.getName(), d.getId()));
@@ -667,7 +802,7 @@ public class LedgerSyncServiceImpl implements ILedgerSyncService {
667 802
     }
668 803
 
669 804
     private boolean existsBySrc(String sourceLedger) {
670
-        return scoreEventMapper.selectCount(
805
+        return scoreEventService.count(
671 806
                 new LambdaQueryWrapper<ScoreEvent>()
672 807
                         .eq(ScoreEvent::getSourceLedger, sourceLedger)) > 0;
673 808
     }