fix: [2.5] Set deltalogs for stats task after set segment stating (#39502)

issue: #39333

---------

Signed-off-by: Cai Zhang <cai.zhang@zilliz.com>
This commit is contained in:
cai.zhang 2025-01-22 16:29:06 +08:00 committed by GitHub
parent e46c8ba7fb
commit cbf1161177
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 46 additions and 9 deletions

View File

@ -227,7 +227,7 @@ func (at *analyzeTask) PreCheck(ctx context.Context, dependency *taskScheduler)
return true
}
func (at *analyzeTask) AssignTask(ctx context.Context, client types.IndexNodeClient) bool {
func (at *analyzeTask) AssignTask(ctx context.Context, client types.IndexNodeClient, meta *meta) bool {
ctx, cancel := context.WithTimeout(context.Background(), reqTimeoutInterval)
defer cancel()
resp, err := client.CreateJobV2(ctx, &workerpb.CreateJobV2Request{

View File

@ -266,7 +266,7 @@ func (it *indexBuildTask) PreCheck(ctx context.Context, dependency *taskSchedule
return true
}
func (it *indexBuildTask) AssignTask(ctx context.Context, client types.IndexNodeClient) bool {
func (it *indexBuildTask) AssignTask(ctx context.Context, client types.IndexNodeClient, meta *meta) bool {
ctx, cancel := context.WithTimeout(context.Background(), reqTimeoutInterval)
defer cancel()
resp, err := client.CreateJobV2(ctx, &workerpb.CreateJobV2Request{

View File

@ -426,7 +426,7 @@ func (s *taskScheduler) processInit(task Task) bool {
log.Ctx(s.ctx).Info("update task version success", zap.Int64("taskID", task.GetTaskID()))
// 3. assign task to indexNode
success := task.AssignTask(s.ctx, client)
success := task.AssignTask(s.ctx, client, s.meta)
if !success {
log.Ctx(s.ctx).Warn("assign task to client failed", zap.Int64("taskID", task.GetTaskID()),
zap.String("new state", task.GetState().String()), zap.String("fail reason", task.GetFailReason()))

View File

@ -206,8 +206,6 @@ func (st *statsTask) PreCheck(ctx context.Context, dependency *taskScheduler) bo
PartitionID: segment.GetPartitionID(),
InsertChannel: segment.GetInsertChannel(),
SegmentID: segment.GetID(),
InsertLogs: segment.GetBinlogs(),
DeltaLogs: segment.GetDeltalogs(),
StorageConfig: createStorageConfig(),
Schema: collInfo.Schema,
SubJobType: st.subJobType,
@ -223,7 +221,19 @@ func (st *statsTask) PreCheck(ctx context.Context, dependency *taskScheduler) bo
return true
}
func (st *statsTask) AssignTask(ctx context.Context, client types.IndexNodeClient) bool {
func (st *statsTask) AssignTask(ctx context.Context, client types.IndexNodeClient, meta *meta) bool {
segment := meta.GetHealthySegment(ctx, st.segmentID)
if segment == nil {
log.Ctx(ctx).Warn("segment is node healthy, skip stats")
// need to set retry and reset compacting
st.SetState(indexpb.JobState_JobStateRetry, "segment is not healthy")
return false
}
// Set InsertLogs and DeltaLogs before execution, and wait for the L0 compaction containing the segment to complete
st.req.InsertLogs = segment.GetBinlogs()
st.req.DeltaLogs = segment.GetDeltalogs()
ctx, cancel := context.WithTimeout(ctx, reqTimeoutInterval)
defer cancel()
resp, err := client.CreateJobV2(ctx, &workerpb.CreateJobV2Request{
@ -242,6 +252,7 @@ func (st *statsTask) AssignTask(ctx context.Context, client types.IndexNodeClien
}
log.Ctx(ctx).Info("assign stats task success", zap.Int64("taskID", st.taskID), zap.Int64("segmentID", st.segmentID))
log.Ctx(ctx).Debug("assign stats task success, print request", zap.Any("req", st.req))
st.SetState(indexpb.JobState_JobStateInProgress, "")
return true
}

View File

@ -377,7 +377,20 @@ func (s *statsTaskSuite) TestTaskStats_PreCheck() {
Reason: "mock error",
}, nil)
s.False(st.AssignTask(context.Background(), in))
mt := &meta{
segments: &SegmentsInfo{
segments: map[int64]*SegmentInfo{
st.segmentID: {
SegmentInfo: &datapb.SegmentInfo{
ID: st.segmentID,
State: commonpb.SegmentState_Flushed,
},
},
},
},
}
s.False(st.AssignTask(context.Background(), in, mt))
})
s.Run("assign success", func() {
@ -387,7 +400,20 @@ func (s *statsTaskSuite) TestTaskStats_PreCheck() {
Reason: "",
}, nil)
s.True(st.AssignTask(context.Background(), in))
mt := &meta{
segments: &SegmentsInfo{
segments: map[int64]*SegmentInfo{
st.segmentID: {
SegmentInfo: &datapb.SegmentInfo{
ID: st.segmentID,
State: commonpb.SegmentState_Flushed,
},
},
},
},
}
s.True(st.AssignTask(context.Background(), in, mt))
})
})

View File

@ -35,7 +35,7 @@ type Task interface {
GetFailReason() string
UpdateVersion(ctx context.Context, nodeID int64, meta *meta, compactionHandler compactionPlanContext) error
UpdateMetaBuildingState(meta *meta) error
AssignTask(ctx context.Context, client types.IndexNodeClient) bool
AssignTask(ctx context.Context, client types.IndexNodeClient, meta *meta) bool
QueryResult(ctx context.Context, client types.IndexNodeClient)
DropTaskOnWorker(ctx context.Context, client types.IndexNodeClient) bool
SetJobInfo(meta *meta) error