diff --git a/internal/datacoord/meta.go b/internal/datacoord/meta.go index 1b536c6295..8375043787 100644 --- a/internal/datacoord/meta.go +++ b/internal/datacoord/meta.go @@ -865,7 +865,7 @@ func (m *meta) CompleteMergeCompaction(compactionLogs []*datapb.CompactionSegmen } for _, s := range segments { - m.segments.DropSegment(s.GetID()) + m.segments.SetSegment(s.GetID(), s) } // Handle empty segment generated by merge-compaction diff --git a/internal/rootcoord/root_coord.go b/internal/rootcoord/root_coord.go index ebec2e1f2e..dab6e7506f 100644 --- a/internal/rootcoord/root_coord.go +++ b/internal/rootcoord/root_coord.go @@ -902,7 +902,7 @@ func (c *Core) SetIndexCoord(s types.IndexCoord) error { c.CallRemoveIndexService = func(ctx context.Context, buildIDs []UniqueID) (retErr error) { defer func() { if err := recover(); err != nil { - retErr = fmt.Errorf("get index state from index service panic, msg = %v", err) + retErr = fmt.Errorf("remove index from index service panic, msg = %v", err) } }() <-initCh @@ -2242,7 +2242,7 @@ func (c *Core) InvalidateCollectionMetaCache(ctx context.Context, in *proxypb.In } // SegmentFlushCompleted check whether segment flush has completed -func (c *Core) SegmentFlushCompleted(ctx context.Context, in *datapb.SegmentFlushCompletedMsg) (*commonpb.Status, error) { +func (c *Core) SegmentFlushCompleted(ctx context.Context, in *datapb.SegmentFlushCompletedMsg) (status *commonpb.Status, err error) { if code, ok := c.checkHealthy(); !ok { return failStatus(commonpb.ErrorCode_UnexpectedError, "StateCode="+internalpb.StateCode_name[int32(code)]), nil } @@ -2252,7 +2252,23 @@ func (c *Core) SegmentFlushCompleted(ctx context.Context, in *datapb.SegmentFlus log.Info("SegmentFlushCompleted received", zap.Int64("msgID", in.Base.MsgID), zap.Int64("collID", in.Segment.CollectionID), zap.Int64("partID", in.Segment.PartitionID), zap.Int64("segID", in.Segment.ID), zap.Int64s("compactFrom", in.Segment.CompactionFrom)) - err := c.createIndexForSegment(ctx, in.Segment.CollectionID, in.Segment.PartitionID, in.Segment.ID, in.Segment.NumOfRows, in.Segment.Binlogs) + // acquire reference lock before building index + if in.Segment.CreatedByCompaction { + log.Debug("try to acquire segment reference lock", zap.Int64("task id", in.Base.MsgID), zap.Int64s("segmentIDs", in.Segment.CompactionFrom)) + if err := c.CallAddSegRefLock(ctx, in.Base.MsgID, in.Segment.CompactionFrom); err != nil { + log.Warn("acquire segment reference lock failed", zap.Int64("task id", in.Base.MsgID), zap.Int64s("segmentIDs", in.Segment.CompactionFrom)) + return failStatus(commonpb.ErrorCode_UnexpectedError, "AcquireSegRefLock failed: "+err.Error()), nil + } + defer func() { + if err := c.CallReleaseSegRefLock(ctx, in.Base.MsgID, in.Segment.CompactionFrom); err != nil { + log.Warn("release segment reference lock failed", zap.Int64("task id", in.Base.MsgID), zap.Int64s("segmentIDs", in.Segment.CompactionFrom)) + // panic to let ref manager detect release failure + panic(err) + } + }() + } + + err = c.createIndexForSegment(ctx, in.Segment.CollectionID, in.Segment.PartitionID, in.Segment.ID, in.Segment.NumOfRows, in.Segment.Binlogs) if err != nil { log.Error("createIndexForSegment", zap.Int64("msgID", in.Base.MsgID), zap.Int64("collID", in.Segment.CollectionID), zap.Int64("partID", in.Segment.PartitionID), zap.Int64("segID", in.Segment.ID), zap.Error(err)) @@ -2275,6 +2291,7 @@ func (c *Core) SegmentFlushCompleted(ctx context.Context, in *datapb.SegmentFlus zap.Int64s("compactFrom", in.Segment.CompactionFrom), zap.Error(err)) return failStatus(commonpb.ErrorCode_UnexpectedError, err.Error()), nil } + log.Debug("SegmentFlushCompleted success", zap.String("role", typeutil.RootCoordRole), zap.Int64("collection id", in.Segment.CollectionID), zap.Int64("partition id", in.Segment.PartitionID), zap.Int64("segment id", in.Segment.ID), zap.Int64("msgID", in.Base.MsgID)) diff --git a/internal/rootcoord/root_coord_test.go b/internal/rootcoord/root_coord_test.go index 5594aab23b..5dec5b02f0 100644 --- a/internal/rootcoord/root_coord_test.go +++ b/internal/rootcoord/root_coord_test.go @@ -381,6 +381,15 @@ func (idx *indexMock) DropIndex(ctx context.Context, req *indexpb.DropIndexReque }, nil } +func (idx *indexMock) RemoveIndex(ctx context.Context, req *indexpb.RemoveIndexRequest) (*commonpb.Status, error) { + idx.mutex.Lock() + defer idx.mutex.Unlock() + return &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_Success, + Reason: "", + }, nil +} + func (idx *indexMock) getFileArray() []string { idx.mutex.Lock() defer idx.mutex.Unlock() @@ -1443,6 +1452,46 @@ func TestRootCoord_Base(t *testing.T) { assert.Equal(t, Params.CommonCfg.DefaultIndexName, rsp.IndexDescriptions[0].IndexName) }) + t.Run("flush segment from compaction", func(t *testing.T) { + coll, err := core.MetaTable.GetCollectionByName(collName, 0) + assert.NoError(t, err) + partID := coll.PartitionIDs[1] + + flushMsg := datapb.SegmentFlushCompletedMsg{ + Base: &commonpb.MsgBase{ + MsgType: commonpb.MsgType_SegmentFlushDone, + }, + Segment: &datapb.SegmentInfo{ + ID: segID + 1, + CollectionID: coll.ID, + PartitionID: partID, + CompactionFrom: []int64{segID}, + CreatedByCompaction: true, + }, + } + st, err := core.SegmentFlushCompleted(ctx, &flushMsg) + assert.NoError(t, err) + assert.Equal(t, st.ErrorCode, commonpb.ErrorCode_Success) + + req := &milvuspb.DescribeIndexRequest{ + Base: &commonpb.MsgBase{ + MsgType: commonpb.MsgType_DescribeIndex, + MsgID: 210, + Timestamp: 210, + SourceID: 210, + }, + DbName: "", + CollectionName: collName, + FieldName: "vector", + IndexName: "", + } + rsp, err := core.DescribeIndex(ctx, req) + assert.NoError(t, err) + assert.Equal(t, commonpb.ErrorCode_Success, rsp.Status.ErrorCode) + assert.Equal(t, 1, len(rsp.IndexDescriptions)) + assert.Equal(t, Params.CommonCfg.DefaultIndexName, rsp.IndexDescriptions[0].IndexName) + }) + wg.Add(1) t.Run("import", func(t *testing.T) { defer wg.Done()