mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-06 17:18:35 +08:00
fix: Fix empty import task result (#38316)
Ensure the idempotency of import tasks to prevent duplicate tasks in DataNode. issue: https://github.com/milvus-io/milvus/issues/38313 Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
This commit is contained in:
parent
37a52286b1
commit
43e0e2b7ed
@ -156,7 +156,7 @@ func AssignSegments(job ImportJob, task ImportTask, alloc allocator.Allocator, m
|
||||
defer cancel()
|
||||
for size > 0 {
|
||||
segmentInfo, err := AllocImportSegment(ctx, alloc, meta,
|
||||
task.GetTaskID(), task.GetCollectionID(), partitionID, vchannel, segmentLevel)
|
||||
task.GetJobID(), task.GetTaskID(), task.GetCollectionID(), partitionID, vchannel, segmentLevel)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -180,8 +180,8 @@ func AssignSegments(job ImportJob, task ImportTask, alloc allocator.Allocator, m
|
||||
func AllocImportSegment(ctx context.Context,
|
||||
alloc allocator.Allocator,
|
||||
meta *meta,
|
||||
taskID int64, collectionID UniqueID,
|
||||
partitionID UniqueID,
|
||||
jobID int64, taskID int64,
|
||||
collectionID UniqueID, partitionID UniqueID,
|
||||
channelName string,
|
||||
level datapb.SegmentLevel,
|
||||
) (*SegmentInfo, error) {
|
||||
@ -221,6 +221,7 @@ func AllocImportSegment(ctx context.Context,
|
||||
return nil, err
|
||||
}
|
||||
log.Info("add import segment done",
|
||||
zap.Int64("jobID", jobID),
|
||||
zap.Int64("taskID", taskID),
|
||||
zap.Int64("collectionID", segmentInfo.CollectionID),
|
||||
zap.Int64("segmentID", segmentInfo.ID),
|
||||
|
||||
@ -18,6 +18,8 @@ package importv2
|
||||
|
||||
import (
|
||||
"sync"
|
||||
|
||||
"github.com/milvus-io/milvus/pkg/log"
|
||||
)
|
||||
|
||||
type TaskManager interface {
|
||||
@ -42,6 +44,10 @@ func NewTaskManager() TaskManager {
|
||||
func (m *taskManager) Add(task Task) {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
if _, ok := m.tasks[task.GetTaskID()]; ok {
|
||||
log.Warn("duplicated task", WrapLogFields(task)...)
|
||||
return
|
||||
}
|
||||
m.tasks[task.GetTaskID()] = task
|
||||
}
|
||||
|
||||
|
||||
@ -65,6 +65,13 @@ func TestImportManager(t *testing.T) {
|
||||
assert.Equal(t, 1, len(tasks))
|
||||
assert.Equal(t, task2.GetTaskID(), tasks[0].GetTaskID())
|
||||
|
||||
// check idempotency
|
||||
manager.Add(task2)
|
||||
tasks = manager.GetBy(WithStates(datapb.ImportTaskStateV2_Completed))
|
||||
assert.Equal(t, 1, len(tasks))
|
||||
assert.Equal(t, task2.GetTaskID(), tasks[0].GetTaskID())
|
||||
assert.True(t, task2 == tasks[0])
|
||||
|
||||
manager.Update(task1.GetTaskID(), UpdateState(datapb.ImportTaskStateV2_Failed))
|
||||
task := manager.Get(task1.GetTaskID())
|
||||
assert.Equal(t, datapb.ImportTaskStateV2_Failed, task.GetState())
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user