fix: load collection gets stuck if compaction/gc happens (#39701)

issue: #39680
If compaction/gc happens, load collection may get stuck due to
SegmentNotFound. We should trigger UpdateNextTarget to get a new data
view for executing the loading operation.

Signed-off-by: Wei Liu <wei.liu@zilliz.com>
wei liu 2025-02-11 15:48:50 +08:00 committed by GitHub
parent 85c9f92ff4
commit ff5c680c99
5 changed files with 146 additions and 30 deletions
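
The core of the fix is the error handling in taskScheduler.remove shown in the diffs below: when a task fails with SegmentNotFound, the scheduler forces a next-target update using its own long-lived context rather than the task's context, which may already be canceled. The following is a minimal, self-contained Go sketch of that pattern, not the actual Milvus code; errSegmentNotFound, targetManager, fakeTargetManager, and scheduler are simplified stand-ins for merr.ErrSegmentNotFound, meta.TargetManager, and the real taskScheduler.

package main

import (
    "context"
    "errors"
    "fmt"
)

// errSegmentNotFound stands in for merr.ErrSegmentNotFound (illustration only).
var errSegmentNotFound = errors.New("segment not found")

// targetManager stands in for the subset of meta.TargetManager used here.
type targetManager interface {
    UpdateCollectionNextTarget(ctx context.Context, collectionID int64) error
}

type fakeTargetManager struct{}

func (fakeTargetManager) UpdateCollectionNextTarget(_ context.Context, collectionID int64) error {
    fmt.Println("refreshing next target for collection", collectionID)
    return nil
}

type scheduler struct {
    ctx       context.Context // long-lived scheduler context, not the task's own context
    targetMgr targetManager
}

// remove mirrors the idea of taskScheduler.remove in this commit: if a task
// failed because its segment was compacted or GC'ed away, force a next-target
// update so the loading operation is replanned against a fresh data view.
func (s *scheduler) remove(taskErr error, collectionID int64) {
    if errors.Is(taskErr, errSegmentNotFound) {
        // The task's context may already be canceled at this point,
        // so the update uses the scheduler's context instead.
        _ = s.targetMgr.UpdateCollectionNextTarget(s.ctx, collectionID)
    }
}

func main() {
    s := &scheduler{ctx: context.Background(), targetMgr: fakeTargetManager{}}
    s.remove(fmt.Errorf("load segment failed: %w", errSegmentNotFound), 1001)
}

Using the scheduler's context is the key detail: per the comment added in the diff, the task's context may be canceled before remove is called, so an update issued with it could be canceled as well.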


@@ -175,13 +175,19 @@ func (ob *TargetObserver) schedule(ctx context.Context) {
         case <-ticker.C:
             ob.clean()
-            loaded := lo.FilterMap(ob.meta.GetAllCollections(ctx), func(collection *meta.Collection, _ int) (int64, bool) {
-                if collection.GetStatus() == querypb.LoadStatus_Loaded {
-                    return collection.GetCollectionID(), true
+            collections := ob.meta.GetAllCollections(ctx)
+            var loadedIDs, loadingIDs []int64
+            for _, c := range collections {
+                if c.GetStatus() == querypb.LoadStatus_Loaded {
+                    loadedIDs = append(loadedIDs, c.GetCollectionID())
+                } else {
+                    loadingIDs = append(loadingIDs, c.GetCollectionID())
                 }
-                return 0, false
-            })
-            ob.loadedDispatcher.AddTask(loaded...)
+            }
+            ob.loadedDispatcher.AddTask(loadedIDs...)
+            ob.loadingDispatcher.AddTask(loadingIDs...)
         case req := <-ob.updateChan:
             log.Info("manually trigger update target",


@@ -922,7 +922,8 @@ func (scheduler *taskScheduler) remove(task Task) {
     if errors.Is(task.Err(), merr.ErrSegmentNotFound) {
         log.Info("segment in target has been cleaned, trigger force update next target", zap.Int64("collectionID", task.CollectionID()))
-        scheduler.targetMgr.UpdateCollectionNextTarget(task.Context(), task.CollectionID())
+        // Avoid using task.Ctx as it may be canceled before remove is called.
+        scheduler.targetMgr.UpdateCollectionNextTarget(scheduler.ctx, task.CollectionID())
     }
     task.Cancel(nil)


@@ -1908,6 +1908,39 @@ func (suite *TaskSuite) TestCalculateTaskDelta() {
     suite.Equal(0, scheduler.GetChannelTaskDelta(nodeID2, coll2))
 }
 
+func (suite *TaskSuite) TestRemoveTaskWithError() {
+    ctx := context.Background()
+    scheduler := suite.newScheduler()
+
+    mockTarget := meta.NewMockTargetManager(suite.T())
+    mockTarget.EXPECT().GetSealedSegment(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(&datapb.SegmentInfo{
+        NumOfRows: 100,
+    })
+    mockTarget.EXPECT().UpdateCollectionNextTarget(mock.Anything, mock.Anything).Return(nil)
+    scheduler.targetMgr = mockTarget
+
+    coll := int64(1001)
+    nodeID := int64(1)
+    // add segment task for collection
+    task1, err := NewSegmentTask(
+        ctx,
+        10*time.Second,
+        WrapIDSource(0),
+        coll,
+        suite.replica,
+        NewSegmentActionWithScope(nodeID, ActionTypeGrow, "", 1, querypb.DataScope_Historical),
+    )
+    suite.NoError(err)
+
+    err = scheduler.Add(task1)
+    suite.NoError(err)
+
+    task1.Fail(merr.ErrSegmentNotFound)
+    // when try to remove task with ErrSegmentNotFound, should trigger UpdateNextTarget
+    scheduler.remove(task1)
+    mockTarget.AssertExpectations(suite.T())
+}
+
 func TestTask(t *testing.T) {
     suite.Run(t, new(TaskSuite))
 }


@@ -971,6 +971,61 @@ func (s *LoadTestSuite) TestDynamicUpdateLoadConfigs_OnLoadingCollection() {
     s.releaseCollection(dbName, collectionName)
 }
 
+func (s *LoadTestSuite) TestLoadWithCompact() {
+    ctx := context.Background()
+    collName := "test_load_with_compact"
+
+    // Create collection with configuration
+    s.CreateCollectionWithConfiguration(ctx, &integration.CreateCollectionConfig{
+        DBName:           dbName,
+        Dim:              dim,
+        CollectionName:   collName,
+        ChannelNum:       1,
+        SegmentNum:       3,
+        RowNumPerSegment: 2000,
+    })
+    s.releaseCollection(dbName, collName)
+
+    stopInsertCh := make(chan struct{}, 1)
+    // Start a goroutine to continuously insert data and trigger compaction
+    go func() {
+        for {
+            select {
+            case <-stopInsertCh:
+                return
+            default:
+                s.InsertAndFlush(ctx, dbName, collName, 2000, dim)
+                _, err := s.Cluster.Proxy.ManualCompaction(ctx, &milvuspb.ManualCompactionRequest{
+                    CollectionName: collName,
+                })
+                s.NoError(err)
+                time.Sleep(time.Second)
+            }
+        }
+    }()
+
+    time.Sleep(10 * time.Second)
+
+    // Load the collection while data is being inserted and compacted
+    s.loadCollection(collName, dbName, 1, nil)
+
+    // Verify the collection is loaded
+    s.Eventually(func() bool {
+        resp, err := s.Cluster.Proxy.ShowCollections(ctx, &milvuspb.ShowCollectionsRequest{
+            CollectionNames: []string{collName},
+            Type:            milvuspb.ShowType_InMemory,
+        })
+        s.NoError(err)
+        return len(resp.InMemoryPercentages) == 1 && resp.InMemoryPercentages[0] == 100
+    }, 30*time.Second, 1*time.Second)
+
+    // Clean up
+    close(stopInsertCh)
+    s.releaseCollection(dbName, collName)
+}
+
 func TestReplicas(t *testing.T) {
     suite.Run(t, new(LoadTestSuite))
 }


@@ -28,6 +28,49 @@ type CreateCollectionConfig struct {
     ResourceGroups []string
 }
 
+func (s *MiniClusterSuite) InsertAndFlush(ctx context.Context, dbName, collectionName string, rowNum, dim int) error {
+    fVecColumn := NewFloatVectorFieldData(FloatVecField, rowNum, dim)
+    hashKeys := GenerateHashKeys(rowNum)
+    insertResult, err := s.Cluster.Proxy.Insert(ctx, &milvuspb.InsertRequest{
+        DbName:         dbName,
+        CollectionName: collectionName,
+        FieldsData:     []*schemapb.FieldData{fVecColumn},
+        HashKeys:       hashKeys,
+        NumRows:        uint32(rowNum),
+    })
+    if err != nil {
+        return err
+    }
+    if !merr.Ok(insertResult.Status) {
+        return merr.Error(insertResult.Status)
+    }
+
+    flushResp, err := s.Cluster.Proxy.Flush(ctx, &milvuspb.FlushRequest{
+        DbName:          dbName,
+        CollectionNames: []string{collectionName},
+    })
+    if err != nil {
+        return err
+    }
+    segmentIDs, has := flushResp.GetCollSegIDs()[collectionName]
+    if !has || segmentIDs == nil {
+        return merr.Error(&commonpb.Status{
+            ErrorCode: commonpb.ErrorCode_IllegalArgument,
+            Reason:    "failed to get segment IDs",
+        })
+    }
+    ids := segmentIDs.GetData()
+    flushTs, has := flushResp.GetCollFlushTs()[collectionName]
+    if !has {
+        return merr.Error(&commonpb.Status{
+            ErrorCode: commonpb.ErrorCode_IllegalArgument,
+            Reason:    "failed to get flush timestamp",
+        })
+    }
+    s.WaitForFlush(ctx, ids, flushTs, dbName, collectionName)
+    return nil
+}
+
 func (s *MiniClusterSuite) CreateCollectionWithConfiguration(ctx context.Context, cfg *CreateCollectionConfig) {
     schema := ConstructSchema(cfg.CollectionName, cfg.Dim, true)
     marshaledSchema, err := proto.Marshal(schema)
@@ -60,30 +103,8 @@ func (s *MiniClusterSuite) CreateCollectionWithConfiguration(ctx context.Context
     log.Info("ShowCollections result", zap.Any("showCollectionsResp", showCollectionsResp))
 
     for i := 0; i < cfg.SegmentNum; i++ {
-        fVecColumn := NewFloatVectorFieldData(FloatVecField, cfg.RowNumPerSegment, cfg.Dim)
-        hashKeys := GenerateHashKeys(cfg.RowNumPerSegment)
-        insertResult, err := s.Cluster.Proxy.Insert(ctx, &milvuspb.InsertRequest{
-            DbName:         cfg.DBName,
-            CollectionName: cfg.CollectionName,
-            FieldsData:     []*schemapb.FieldData{fVecColumn},
-            HashKeys:       hashKeys,
-            NumRows:        uint32(cfg.RowNumPerSegment),
-        })
+        err = s.InsertAndFlush(ctx, cfg.DBName, cfg.CollectionName, cfg.RowNumPerSegment, cfg.Dim)
         s.NoError(err)
-        s.True(merr.Ok(insertResult.Status))
-        flushResp, err := s.Cluster.Proxy.Flush(ctx, &milvuspb.FlushRequest{
-            DbName:          cfg.DBName,
-            CollectionNames: []string{cfg.CollectionName},
-        })
-        s.NoError(err)
-        segmentIDs, has := flushResp.GetCollSegIDs()[cfg.CollectionName]
-        ids := segmentIDs.GetData()
-        s.Require().NotEmpty(segmentIDs)
-        s.Require().True(has)
-        flushTs, has := flushResp.GetCollFlushTs()[cfg.CollectionName]
-        s.True(has)
-        s.WaitForFlush(ctx, ids, flushTs, cfg.DBName, cfg.CollectionName)
     }
 
     // create index