From 4f6122dd4a641d67c38d07175bb6ed8be500fc89 Mon Sep 17 00:00:00 2001 From: MrPresent-Han <116052805+MrPresent-Han@users.noreply.github.com> Date: Mon, 18 Sep 2023 09:53:22 +0800 Subject: [PATCH] enable ctx traceId for assignsegment on dc(#26972) (#27108) Signed-off-by: MrPresent-Han --- internal/datacoord/compaction_test.go | 4 +- internal/datacoord/garbage_collector_test.go | 4 +- internal/datacoord/meta.go | 10 +- internal/datacoord/meta_test.go | 36 ++++-- internal/datacoord/segment_manager.go | 13 +- internal/datacoord/segment_manager_test.go | 16 ++- internal/datacoord/server_test.go | 124 +++++++++---------- internal/datacoord/services_test.go | 28 ++--- 8 files changed, 132 insertions(+), 103 deletions(-) diff --git a/internal/datacoord/compaction_test.go b/internal/datacoord/compaction_test.go index f4cbc3a326..8e62389e51 100644 --- a/internal/datacoord/compaction_test.go +++ b/internal/datacoord/compaction_test.go @@ -600,8 +600,8 @@ func TestCompactionPlanHandler_completeCompaction(t *testing.T) { }, } - meta.AddSegment(NewSegmentInfo(seg1)) - meta.AddSegment(NewSegmentInfo(seg2)) + meta.AddSegment(context.TODO(), NewSegmentInfo(seg1)) + meta.AddSegment(context.TODO(), NewSegmentInfo(seg2)) segments := meta.GetAllSegmentsUnsafe() assert.Equal(t, len(segments), 2) diff --git a/internal/datacoord/garbage_collector_test.go b/internal/datacoord/garbage_collector_test.go index 97847bb190..eb62d6d597 100644 --- a/internal/datacoord/garbage_collector_test.go +++ b/internal/datacoord/garbage_collector_test.go @@ -152,7 +152,7 @@ func Test_garbageCollector_scan(t *testing.T) { segment.Binlogs = []*datapb.FieldBinlog{getFieldBinlogPaths(0, inserts[0])} segment.Statslogs = []*datapb.FieldBinlog{getFieldBinlogPaths(0, stats[0])} segment.Deltalogs = []*datapb.FieldBinlog{getFieldBinlogPaths(0, delta[0])} - err = meta.AddSegment(segment) + err = meta.AddSegment(context.TODO(), segment) require.NoError(t, err) gc := newGarbageCollector(meta, newMockHandler(), GcOption{ @@ -180,7 +180,7 @@ func Test_garbageCollector_scan(t *testing.T) { segment.Statslogs = []*datapb.FieldBinlog{getFieldBinlogPaths(0, stats[0])} segment.Deltalogs = []*datapb.FieldBinlog{getFieldBinlogPaths(0, delta[0])} - err = meta.AddSegment(segment) + err = meta.AddSegment(context.TODO(), segment) require.NoError(t, err) gc := newGarbageCollector(meta, newMockHandler(), GcOption{ diff --git a/internal/datacoord/meta.go b/internal/datacoord/meta.go index a75f5f78cf..7e5d6ff120 100644 --- a/internal/datacoord/meta.go +++ b/internal/datacoord/meta.go @@ -24,6 +24,7 @@ import ( "sync" "time" + "github.com/cockroachdb/errors" "github.com/golang/protobuf/proto" "github.com/milvus-io/milvus/pkg/util/tsoutil" "github.com/samber/lo" @@ -297,8 +298,9 @@ func (m *meta) GetCollectionBinlogSize() (int64, map[UniqueID]int64) { } // AddSegment records segment info, persisting info into kv store -func (m *meta) AddSegment(segment *SegmentInfo) error { - log.Debug("meta update: adding segment", zap.Int64("segmentID", segment.GetID())) +func (m *meta) AddSegment(ctx context.Context, segment *SegmentInfo) error { + log := log.Ctx(ctx) + log.Info("meta update: adding segment - Start", zap.Int64("segmentID", segment.GetID())) m.Lock() defer m.Unlock() if err := m.catalog.AddSegment(m.ctx, segment.SegmentInfo); err != nil { @@ -923,8 +925,8 @@ func (m *meta) AddAllocation(segmentID UniqueID, allocation *Allocation) error { curSegInfo := m.segments.GetSegment(segmentID) if curSegInfo == nil { // TODO: Error handling. - log.Warn("meta update: add allocation failed - segment not found", zap.Int64("segmentID", segmentID)) - return nil + log.Error("meta update: add allocation failed - segment not found", zap.Int64("segmentID", segmentID)) + return errors.New("meta update: add allocation failed - segment not found") } // As we use global segment lastExpire to guarantee data correctness after restart // there is no need to persist allocation to meta store, only update allocation in-memory meta. diff --git a/internal/datacoord/meta_test.go b/internal/datacoord/meta_test.go index 6f6b016333..636ac18a6f 100644 --- a/internal/datacoord/meta_test.go +++ b/internal/datacoord/meta_test.go @@ -243,11 +243,11 @@ func TestMeta_Basic(t *testing.T) { segInfo1_1 := buildSegment(collID, partID1, segID1_1, channelName, false) // check AddSegment - err = meta.AddSegment(segInfo0_0) + err = meta.AddSegment(context.TODO(), segInfo0_0) assert.NoError(t, err) - err = meta.AddSegment(segInfo1_0) + err = meta.AddSegment(context.TODO(), segInfo1_0) assert.NoError(t, err) - err = meta.AddSegment(segInfo1_1) + err = meta.AddSegment(context.TODO(), segInfo1_1) assert.NoError(t, err) // check GetSegment @@ -325,7 +325,7 @@ func TestMeta_Basic(t *testing.T) { meta, err := newMeta(context.TODO(), catalog, nil) assert.NoError(t, err) - err = meta.AddSegment(NewSegmentInfo(&datapb.SegmentInfo{})) + err = meta.AddSegment(context.TODO(), NewSegmentInfo(&datapb.SegmentInfo{})) assert.Error(t, err) metakv2 := mockkv.NewMetaKv(t) @@ -342,7 +342,7 @@ func TestMeta_Basic(t *testing.T) { err = meta.DropSegment(0) assert.NoError(t, err) // nil, since Save error not injected - err = meta.AddSegment(NewSegmentInfo(&datapb.SegmentInfo{})) + err = meta.AddSegment(context.TODO(), NewSegmentInfo(&datapb.SegmentInfo{})) assert.NoError(t, err) // error injected err = meta.DropSegment(0) @@ -366,7 +366,7 @@ func TestMeta_Basic(t *testing.T) { assert.NoError(t, err) segInfo0 := buildSegment(collID, partID0, segID0, channelName, false) segInfo0.NumOfRows = rowCount0 - err = meta.AddSegment(segInfo0) + err = meta.AddSegment(context.TODO(), segInfo0) assert.NoError(t, err) // add seg2 with 300 rows @@ -374,7 +374,7 @@ func TestMeta_Basic(t *testing.T) { assert.NoError(t, err) segInfo1 := buildSegment(collID, partID0, segID1, channelName, false) segInfo1.NumOfRows = rowCount1 - err = meta.AddSegment(segInfo1) + err = meta.AddSegment(context.TODO(), segInfo1) assert.NoError(t, err) // check partition/collection statistics @@ -432,7 +432,7 @@ func TestMeta_Basic(t *testing.T) { assert.NoError(t, err) segInfo0 := buildSegment(collID, partID0, segID0, channelName, false) segInfo0.size.Store(size0) - err = meta.AddSegment(segInfo0) + err = meta.AddSegment(context.TODO(), segInfo0) assert.NoError(t, err) // add seg1 with size1 @@ -440,7 +440,7 @@ func TestMeta_Basic(t *testing.T) { assert.NoError(t, err) segInfo1 := buildSegment(collID, partID0, segID1, channelName, false) segInfo1.size.Store(size1) - err = meta.AddSegment(segInfo1) + err = meta.AddSegment(context.TODO(), segInfo1) assert.NoError(t, err) // check TotalBinlogSize @@ -449,6 +449,16 @@ func TestMeta_Basic(t *testing.T) { assert.Equal(t, int64(size0+size1), collectionBinlogSize[collID]) assert.Equal(t, int64(size0+size1), total) }) + + t.Run("Test AddAllocation", func(t *testing.T) { + meta, _ := newMemoryMeta() + err := meta.AddAllocation(1, &Allocation{ + SegmentID: 1, + NumOfRows: 1, + ExpireTime: 0, + }) + assert.Error(t, err) + }) } func TestGetUnFlushedSegments(t *testing.T) { @@ -460,7 +470,7 @@ func TestGetUnFlushedSegments(t *testing.T) { PartitionID: 0, State: commonpb.SegmentState_Growing, } - err = meta.AddSegment(NewSegmentInfo(s1)) + err = meta.AddSegment(context.TODO(), NewSegmentInfo(s1)) assert.NoError(t, err) s2 := &datapb.SegmentInfo{ ID: 1, @@ -468,7 +478,7 @@ func TestGetUnFlushedSegments(t *testing.T) { PartitionID: 0, State: commonpb.SegmentState_Flushed, } - err = meta.AddSegment(NewSegmentInfo(s2)) + err = meta.AddSegment(context.TODO(), NewSegmentInfo(s2)) assert.NoError(t, err) segments := meta.GetUnFlushedSegments() @@ -486,7 +496,7 @@ func TestUpdateFlushSegmentsInfo(t *testing.T) { segment1 := &SegmentInfo{SegmentInfo: &datapb.SegmentInfo{ID: 1, State: commonpb.SegmentState_Growing, Binlogs: []*datapb.FieldBinlog{getFieldBinlogPaths(1, getInsertLogPath("binlog0", 1))}, Statslogs: []*datapb.FieldBinlog{getFieldBinlogPaths(1, getStatsLogPath("statslog0", 1))}}} - err = meta.AddSegment(segment1) + err = meta.AddSegment(context.TODO(), segment1) assert.NoError(t, err) err = meta.UpdateFlushSegmentsInfo(1, true, false, false, []*datapb.FieldBinlog{getFieldBinlogPathsWithEntry(1, 10, getInsertLogPath("binlog1", 1))}, @@ -529,7 +539,7 @@ func TestUpdateFlushSegmentsInfo(t *testing.T) { assert.NoError(t, err) segment1 := &SegmentInfo{SegmentInfo: &datapb.SegmentInfo{ID: 1, State: commonpb.SegmentState_Growing}} - err = meta.AddSegment(segment1) + err = meta.AddSegment(context.TODO(), segment1) assert.NoError(t, err) err = meta.UpdateFlushSegmentsInfo(1, false, false, false, nil, nil, nil, []*datapb.CheckPoint{{SegmentID: 2, NumOfRows: 10}}, diff --git a/internal/datacoord/segment_manager.go b/internal/datacoord/segment_manager.go index 0d87fc2113..90aeb21cdb 100644 --- a/internal/datacoord/segment_manager.go +++ b/internal/datacoord/segment_manager.go @@ -258,7 +258,11 @@ func (s *SegmentManager) maybeResetLastExpireForSegments() error { // AllocSegment allocate segment per request collcation, partication, channel and rows func (s *SegmentManager) AllocSegment(ctx context.Context, collectionID UniqueID, partitionID UniqueID, channelName string, requestRows int64) ([]*Allocation, error) { - + log := log.Ctx(ctx). + With(zap.Int64("collectionID", collectionID)). + With(zap.Int64("partitionID", partitionID)). + With(zap.String("channelName", channelName)). + With(zap.Int64("requestRows", requestRows)) _, sp := otel.Tracer(typeutil.DataCoordRole).Start(ctx, "Alloc-Segment") defer sp.End() s.mu.Lock() @@ -269,7 +273,7 @@ func (s *SegmentManager) AllocSegment(ctx context.Context, collectionID UniqueID for _, segmentID := range s.segments { segment := s.meta.GetHealthySegment(segmentID) if segment == nil { - log.Warn("Failed to get seginfo from meta", zap.Int64("id", segmentID)) + log.Warn("Failed to get segment info from meta", zap.Int64("id", segmentID)) continue } if !satisfy(segment, collectionID, partitionID, channelName) || !isGrowing(segment) { @@ -294,6 +298,7 @@ func (s *SegmentManager) AllocSegment(ctx context.Context, collectionID UniqueID for _, allocation := range newSegmentAllocations { segment, err := s.openNewSegment(ctx, collectionID, partitionID, channelName, commonpb.SegmentState_Growing) if err != nil { + log.Error("Failed to open new segment for segment allocation") return nil, err } allocation.ExpireTime = expireTs @@ -306,6 +311,7 @@ func (s *SegmentManager) AllocSegment(ctx context.Context, collectionID UniqueID for _, allocation := range existedSegmentAllocations { allocation.ExpireTime = expireTs if err := s.meta.AddAllocation(allocation.SegmentID, allocation); err != nil { + log.Error("Failed to add allocation to existed segment", zap.Int64("segmentID", allocation.SegmentID)) return nil, err } } @@ -370,6 +376,7 @@ func (s *SegmentManager) genExpireTs(ctx context.Context, isImported bool) (Time func (s *SegmentManager) openNewSegment(ctx context.Context, collectionID UniqueID, partitionID UniqueID, channelName string, segmentState commonpb.SegmentState) (*SegmentInfo, error) { + log := log.Ctx(ctx) ctx, sp := otel.Tracer(typeutil.DataCoordRole).Start(ctx, "open-Segment") defer sp.End() id, err := s.allocator.allocID(ctx) @@ -397,7 +404,7 @@ func (s *SegmentManager) openNewSegment(ctx context.Context, collectionID Unique segmentInfo.IsImporting = true } segment := NewSegmentInfo(segmentInfo) - if err := s.meta.AddSegment(segment); err != nil { + if err := s.meta.AddSegment(ctx, segment); err != nil { log.Error("failed to add segment to DataCoord", zap.Error(err)) return nil, err } diff --git a/internal/datacoord/segment_manager_test.go b/internal/datacoord/segment_manager_test.go index 592aa21ac6..ec5fc66859 100644 --- a/internal/datacoord/segment_manager_test.go +++ b/internal/datacoord/segment_manager_test.go @@ -170,10 +170,20 @@ func TestLastExpireReset(t *testing.T) { collID, err := mockAllocator.allocID(ctx) assert.Nil(t, err) meta.AddCollection(&collectionInfo{ID: collID, Schema: schema}) + initSegment := &SegmentInfo{ + SegmentInfo: &datapb.SegmentInfo{ + ID: 1, + InsertChannel: "ch1", + State: commonpb.SegmentState_Growing, + }, + } + meta.AddSegment(context.TODO(), initSegment) //assign segments, set max segment to only 1MB, equalling to 10485 rows var bigRows, smallRows int64 = 10000, 1000 segmentManager, _ := newSegmentManager(meta, mockAllocator) + initSegment.SegmentInfo.State = commonpb.SegmentState_Dropped + meta.segments.SetSegment(1, initSegment) allocs, _ := segmentManager.AllocSegment(context.Background(), collID, 0, channelName, bigRows) segmentID1, expire1 := allocs[0].SegmentID, allocs[0].ExpireTime time.Sleep(100 * time.Millisecond) @@ -308,11 +318,11 @@ func TestLoadSegmentsFromMeta(t *testing.T) { MaxRowNum: 100, LastExpireTime: 1000, } - err = meta.AddSegment(NewSegmentInfo(sealedSegment)) + err = meta.AddSegment(context.TODO(), NewSegmentInfo(sealedSegment)) assert.NoError(t, err) - err = meta.AddSegment(NewSegmentInfo(growingSegment)) + err = meta.AddSegment(context.TODO(), NewSegmentInfo(growingSegment)) assert.NoError(t, err) - err = meta.AddSegment(NewSegmentInfo(flushedSegment)) + err = meta.AddSegment(context.TODO(), NewSegmentInfo(flushedSegment)) assert.NoError(t, err) segmentManager, _ := newSegmentManager(meta, mockAllocator) diff --git a/internal/datacoord/server_test.go b/internal/datacoord/server_test.go index dc47fc7b25..623577c408 100644 --- a/internal/datacoord/server_test.go +++ b/internal/datacoord/server_test.go @@ -389,7 +389,7 @@ func TestGetSegmentStates(t *testing.T) { Timestamp: 0, }, } - err := svr.meta.AddSegment(NewSegmentInfo(segment)) + err := svr.meta.AddSegment(context.TODO(), NewSegmentInfo(segment)) assert.NoError(t, err) cases := []struct { @@ -463,7 +463,7 @@ func TestGetInsertBinlogPaths(t *testing.T) { }, State: commonpb.SegmentState_Growing, } - err := svr.meta.AddSegment(NewSegmentInfo(info)) + err := svr.meta.AddSegment(context.TODO(), NewSegmentInfo(info)) assert.NoError(t, err) req := &datapb.GetInsertBinlogPathsRequest{ SegmentID: 0, @@ -495,7 +495,7 @@ func TestGetInsertBinlogPaths(t *testing.T) { State: commonpb.SegmentState_Growing, } - err := svr.meta.AddSegment(NewSegmentInfo(info)) + err := svr.meta.AddSegment(context.TODO(), NewSegmentInfo(info)) assert.NoError(t, err) req := &datapb.GetInsertBinlogPathsRequest{ SegmentID: 1, @@ -595,7 +595,7 @@ func TestGetSegmentInfo(t *testing.T) { }, }, } - err := svr.meta.AddSegment(NewSegmentInfo(segInfo)) + err := svr.meta.AddSegment(context.TODO(), NewSegmentInfo(segInfo)) assert.NoError(t, err) req := &datapb.GetSegmentInfoRequest{ @@ -616,7 +616,7 @@ func TestGetSegmentInfo(t *testing.T) { ID: 0, State: commonpb.SegmentState_Flushed, } - err := svr.meta.AddSegment(NewSegmentInfo(segInfo)) + err := svr.meta.AddSegment(context.TODO(), NewSegmentInfo(segInfo)) assert.NoError(t, err) req := &datapb.GetSegmentInfoRequest{ @@ -644,7 +644,7 @@ func TestGetSegmentInfo(t *testing.T) { ID: 0, State: commonpb.SegmentState_Dropped, } - err := svr.meta.AddSegment(NewSegmentInfo(segInfo)) + err := svr.meta.AddSegment(context.TODO(), NewSegmentInfo(segInfo)) assert.NoError(t, err) req := &datapb.GetSegmentInfoRequest{ @@ -681,7 +681,7 @@ func TestGetSegmentInfo(t *testing.T) { ID: 0, State: commonpb.SegmentState_Flushed, } - err := svr.meta.AddSegment(NewSegmentInfo(segInfo)) + err := svr.meta.AddSegment(context.TODO(), NewSegmentInfo(segInfo)) assert.NoError(t, err) req := &datapb.GetSegmentInfoRequest{ @@ -705,7 +705,7 @@ func TestGetSegmentInfo(t *testing.T) { segInfo.InsertChannel = mockVChannel segInfo.ID = 2 req.SegmentIDs = []int64{2} - err = svr.meta.AddSegment(NewSegmentInfo(segInfo)) + err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(segInfo)) assert.NoError(t, err) resp, err = svr.GetSegmentInfo(svr.ctx, req) assert.NoError(t, err) @@ -798,7 +798,7 @@ func TestGetFlushedSegments(t *testing.T) { PartitionID: tc.partID, State: commonpb.SegmentState_Flushed, } - assert.Nil(t, svr.meta.AddSegment(NewSegmentInfo(segInfo))) + assert.Nil(t, svr.meta.AddSegment(context.TODO(), NewSegmentInfo(segInfo))) } for _, us := range tc.unflushedSegments { segInfo := &datapb.SegmentInfo{ @@ -807,7 +807,7 @@ func TestGetFlushedSegments(t *testing.T) { PartitionID: tc.partID, State: commonpb.SegmentState_Growing, } - assert.Nil(t, svr.meta.AddSegment(NewSegmentInfo(segInfo))) + assert.Nil(t, svr.meta.AddSegment(context.TODO(), NewSegmentInfo(segInfo))) } resp, err := svr.GetFlushedSegments(context.Background(), &datapb.GetFlushedSegmentsRequest{ @@ -893,7 +893,7 @@ func TestGetSegmentsByStates(t *testing.T) { PartitionID: tc.partID, State: commonpb.SegmentState_Flushed, } - assert.Nil(t, svr.meta.AddSegment(NewSegmentInfo(segInfo))) + assert.Nil(t, svr.meta.AddSegment(context.TODO(), NewSegmentInfo(segInfo))) } for _, us := range tc.sealedSegments { segInfo := &datapb.SegmentInfo{ @@ -902,7 +902,7 @@ func TestGetSegmentsByStates(t *testing.T) { PartitionID: tc.partID, State: commonpb.SegmentState_Sealed, } - assert.Nil(t, svr.meta.AddSegment(NewSegmentInfo(segInfo))) + assert.Nil(t, svr.meta.AddSegment(context.TODO(), NewSegmentInfo(segInfo))) } for _, us := range tc.growingSegments { segInfo := &datapb.SegmentInfo{ @@ -911,7 +911,7 @@ func TestGetSegmentsByStates(t *testing.T) { PartitionID: tc.partID, State: commonpb.SegmentState_Growing, } - assert.Nil(t, svr.meta.AddSegment(NewSegmentInfo(segInfo))) + assert.Nil(t, svr.meta.AddSegment(context.TODO(), NewSegmentInfo(segInfo))) } resp, err := svr.GetSegmentsByStates(context.Background(), &datapb.GetSegmentsByStatesRequest{ @@ -1276,7 +1276,7 @@ func TestSaveBinlogPaths(t *testing.T) { InsertChannel: "ch1", State: commonpb.SegmentState_Growing, } - err := svr.meta.AddSegment(NewSegmentInfo(s)) + err := svr.meta.AddSegment(context.TODO(), NewSegmentInfo(s)) assert.NoError(t, err) } @@ -1363,7 +1363,7 @@ func TestSaveBinlogPaths(t *testing.T) { InsertChannel: "ch1", State: commonpb.SegmentState_Dropped, } - err := svr.meta.AddSegment(NewSegmentInfo(s)) + err := svr.meta.AddSegment(context.TODO(), NewSegmentInfo(s)) assert.NoError(t, err) } @@ -1441,7 +1441,7 @@ func TestSaveBinlogPaths(t *testing.T) { InsertChannel: "ch1", State: commonpb.SegmentState_NotExist, } - err := svr.meta.AddSegment(NewSegmentInfo(s)) + err := svr.meta.AddSegment(context.TODO(), NewSegmentInfo(s)) assert.NoError(t, err) } @@ -1556,7 +1556,7 @@ func TestSaveBinlogPaths(t *testing.T) { InsertChannel: "ch2", State: commonpb.SegmentState_Growing, } - svr.meta.AddSegment(NewSegmentInfo(s)) + svr.meta.AddSegment(context.TODO(), NewSegmentInfo(s)) resp, err := svr.SaveBinlogPaths(context.Background(), &datapb.SaveBinlogPathsRequest{ SegmentID: 1, @@ -1651,7 +1651,7 @@ func TestDropVirtualChannel(t *testing.T) { {FieldID: 1}, } } - err := svr.meta.AddSegment(NewSegmentInfo(s)) + err := svr.meta.AddSegment(context.TODO(), NewSegmentInfo(s)) assert.NoError(t, err) } // add non matched segments @@ -1663,7 +1663,7 @@ func TestDropVirtualChannel(t *testing.T) { State: commonpb.SegmentState_Growing, } - svr.meta.AddSegment(NewSegmentInfo(os)) + svr.meta.AddSegment(context.TODO(), NewSegmentInfo(os)) err := svr.channelManager.AddNode(0) require.Nil(t, err) @@ -1848,7 +1848,7 @@ func TestGetChannelSeekPosition(t *testing.T) { DmlPosition: segPos, InsertChannel: "ch1", } - err := svr.meta.AddSegment(NewSegmentInfo(seg)) + err := svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg)) assert.NoError(t, err) } if test.channelCP != nil { @@ -1924,7 +1924,7 @@ func TestGetDataVChanPositions(t *testing.T) { MsgID: []byte{1, 2, 3}, }, } - err := svr.meta.AddSegment(NewSegmentInfo(s1)) + err := svr.meta.AddSegment(context.TODO(), NewSegmentInfo(s1)) require.Nil(t, err) s2 := &datapb.SegmentInfo{ ID: 2, @@ -1942,7 +1942,7 @@ func TestGetDataVChanPositions(t *testing.T) { Timestamp: 1, }, } - err = svr.meta.AddSegment(NewSegmentInfo(s2)) + err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(s2)) require.Nil(t, err) s3 := &datapb.SegmentInfo{ ID: 3, @@ -1960,7 +1960,7 @@ func TestGetDataVChanPositions(t *testing.T) { Timestamp: 2, }, } - err = svr.meta.AddSegment(NewSegmentInfo(s3)) + err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(s3)) require.Nil(t, err) t.Run("get unexisted channel", func(t *testing.T) { @@ -2051,7 +2051,7 @@ func TestGetQueryVChanPositions(t *testing.T) { }, NumOfRows: 2048, } - err = svr.meta.AddSegment(NewSegmentInfo(s1)) + err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(s1)) assert.NoError(t, err) err = svr.meta.AddSegmentIndex(&model.SegmentIndex{ SegmentID: 1, @@ -2082,7 +2082,7 @@ func TestGetQueryVChanPositions(t *testing.T) { Timestamp: 1, }, } - err = svr.meta.AddSegment(NewSegmentInfo(s2)) + err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(s2)) assert.NoError(t, err) s3 := &datapb.SegmentInfo{ ID: 3, @@ -2102,7 +2102,7 @@ func TestGetQueryVChanPositions(t *testing.T) { Timestamp: 2, }, } - err = svr.meta.AddSegment(NewSegmentInfo(s3)) + err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(s3)) assert.NoError(t, err) //mockResp := &indexpb.GetIndexInfoResponse{ // Status: &commonpb.Status{}, @@ -2191,7 +2191,7 @@ func TestGetQueryVChanPositions_Retrieve_unIndexed(t *testing.T) { }, CompactionFrom: []int64{99, 100}, // a, b which have been GC-ed } - err = svr.meta.AddSegment(NewSegmentInfo(c)) + err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(c)) assert.NoError(t, err) d := &datapb.SegmentInfo{ ID: 2, @@ -2206,7 +2206,7 @@ func TestGetQueryVChanPositions_Retrieve_unIndexed(t *testing.T) { Timestamp: 1, }, } - err = svr.meta.AddSegment(NewSegmentInfo(d)) + err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(d)) assert.NoError(t, err) e := &datapb.SegmentInfo{ ID: 3, @@ -2224,7 +2224,7 @@ func TestGetQueryVChanPositions_Retrieve_unIndexed(t *testing.T) { NumOfRows: 2048, } - err = svr.meta.AddSegment(NewSegmentInfo(e)) + err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(e)) assert.NoError(t, err) vchan := svr.handler.GetQueryVChanPositions(&channel{Name: "ch1", CollectionID: 0}, allPartitionID) assert.EqualValues(t, 2, len(vchan.FlushedSegmentIds)) @@ -2260,7 +2260,7 @@ func TestGetQueryVChanPositions_Retrieve_unIndexed(t *testing.T) { Timestamp: 1, }, } - err = svr.meta.AddSegment(NewSegmentInfo(a)) + err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(a)) assert.NoError(t, err) c := &datapb.SegmentInfo{ @@ -2277,7 +2277,7 @@ func TestGetQueryVChanPositions_Retrieve_unIndexed(t *testing.T) { }, CompactionFrom: []int64{99, 100}, // a, b which have been GC-ed } - err = svr.meta.AddSegment(NewSegmentInfo(c)) + err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(c)) assert.NoError(t, err) d := &datapb.SegmentInfo{ ID: 2, @@ -2292,7 +2292,7 @@ func TestGetQueryVChanPositions_Retrieve_unIndexed(t *testing.T) { Timestamp: 1, }, } - err = svr.meta.AddSegment(NewSegmentInfo(d)) + err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(d)) assert.NoError(t, err) e := &datapb.SegmentInfo{ ID: 3, @@ -2310,7 +2310,7 @@ func TestGetQueryVChanPositions_Retrieve_unIndexed(t *testing.T) { NumOfRows: 2048, } - err = svr.meta.AddSegment(NewSegmentInfo(e)) + err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(e)) assert.NoError(t, err) vchan := svr.handler.GetQueryVChanPositions(&channel{Name: "ch1", CollectionID: 0}, allPartitionID) assert.EqualValues(t, 2, len(vchan.FlushedSegmentIds)) @@ -2347,7 +2347,7 @@ func TestGetQueryVChanPositions_Retrieve_unIndexed(t *testing.T) { }, CompactionFrom: []int64{99, 100}, // a, b which have been GC-ed } - err = svr.meta.AddSegment(NewSegmentInfo(c)) + err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(c)) assert.NoError(t, err) d := &datapb.SegmentInfo{ ID: 2, @@ -2362,7 +2362,7 @@ func TestGetQueryVChanPositions_Retrieve_unIndexed(t *testing.T) { Timestamp: 1, }, } - err = svr.meta.AddSegment(NewSegmentInfo(d)) + err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(d)) assert.NoError(t, err) err = svr.meta.AddSegmentIndex(&model.SegmentIndex{ SegmentID: 2, @@ -2390,7 +2390,7 @@ func TestGetQueryVChanPositions_Retrieve_unIndexed(t *testing.T) { CompactionFrom: []int64{1, 2}, // c, d NumOfRows: 2048, } - err = svr.meta.AddSegment(NewSegmentInfo(e)) + err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(e)) assert.NoError(t, err) err = svr.meta.AddSegmentIndex(&model.SegmentIndex{ SegmentID: 3, @@ -2617,9 +2617,9 @@ func TestGetRecoveryInfo(t *testing.T) { }, }, } - err = svr.meta.AddSegment(NewSegmentInfo(seg1)) + err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg1)) assert.NoError(t, err) - err = svr.meta.AddSegment(NewSegmentInfo(seg2)) + err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg2)) assert.NoError(t, err) err = svr.meta.AddSegmentIndex(&model.SegmentIndex{ SegmentID: seg1.ID, @@ -2714,9 +2714,9 @@ func TestGetRecoveryInfo(t *testing.T) { }, }, } - err = svr.meta.AddSegment(NewSegmentInfo(seg1)) + err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg1)) assert.NoError(t, err) - err = svr.meta.AddSegment(NewSegmentInfo(seg2)) + err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg2)) assert.NoError(t, err) //svr.indexCoord.(*mocks.MockIndexCoord).EXPECT().GetIndexInfos(mock.Anything, mock.Anything).Return(nil, nil) @@ -2788,7 +2788,7 @@ func TestGetRecoveryInfo(t *testing.T) { }, } segment := createSegment(0, 0, 1, 100, 10, "vchan1", commonpb.SegmentState_Flushed) - err := svr.meta.AddSegment(NewSegmentInfo(segment)) + err := svr.meta.AddSegment(context.TODO(), NewSegmentInfo(segment)) assert.NoError(t, err) err = svr.meta.CreateIndex(&model.Index{ @@ -2856,9 +2856,9 @@ func TestGetRecoveryInfo(t *testing.T) { seg1 := createSegment(7, 0, 0, 100, 30, "vchan1", commonpb.SegmentState_Growing) seg2 := createSegment(8, 0, 0, 100, 40, "vchan1", commonpb.SegmentState_Dropped) - err = svr.meta.AddSegment(NewSegmentInfo(seg1)) + err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg1)) assert.NoError(t, err) - err = svr.meta.AddSegment(NewSegmentInfo(seg2)) + err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg2)) assert.NoError(t, err) req := &datapb.GetRecoveryInfoRequest{ @@ -2899,9 +2899,9 @@ func TestGetRecoveryInfo(t *testing.T) { seg1 := createSegment(7, 0, 0, 100, 30, "vchan1", commonpb.SegmentState_Growing) seg2 := createSegment(8, 0, 0, 100, 40, "vchan1", commonpb.SegmentState_Flushed) seg2.IsFake = true - err = svr.meta.AddSegment(NewSegmentInfo(seg1)) + err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg1)) assert.NoError(t, err) - err = svr.meta.AddSegment(NewSegmentInfo(seg2)) + err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg2)) assert.NoError(t, err) req := &datapb.GetRecoveryInfoRequest{ @@ -2944,15 +2944,15 @@ func TestGetRecoveryInfo(t *testing.T) { seg4 := createSegment(12, 0, 0, 2048, 40, "vchan1", commonpb.SegmentState_Dropped) seg5 := createSegment(13, 0, 0, 2048, 40, "vchan1", commonpb.SegmentState_Flushed) seg5.CompactionFrom = []int64{11, 12} - err = svr.meta.AddSegment(NewSegmentInfo(seg1)) + err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg1)) assert.NoError(t, err) - err = svr.meta.AddSegment(NewSegmentInfo(seg2)) + err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg2)) assert.NoError(t, err) - err = svr.meta.AddSegment(NewSegmentInfo(seg3)) + err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg3)) assert.NoError(t, err) - err = svr.meta.AddSegment(NewSegmentInfo(seg4)) + err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg4)) assert.NoError(t, err) - err = svr.meta.AddSegment(NewSegmentInfo(seg5)) + err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg5)) assert.NoError(t, err) err = svr.meta.CreateIndex(&model.Index{ TenantID: "", @@ -3359,7 +3359,7 @@ func TestPostFlush(t *testing.T) { defer closeTestServer(t, svr) svr.rootCoordClient = &rootCoordSegFlushComplete{flag: true} - err := svr.meta.AddSegment(NewSegmentInfo(&datapb.SegmentInfo{ + err := svr.meta.AddSegment(context.TODO(), NewSegmentInfo(&datapb.SegmentInfo{ ID: 1, CollectionID: 1, PartitionID: 1, @@ -3380,14 +3380,14 @@ func TestGetFlushState(t *testing.T) { svr := newTestServerWithMeta(t, nil, meta) defer closeTestServer(t, svr) - err = meta.AddSegment(&SegmentInfo{ + err = meta.AddSegment(context.TODO(), &SegmentInfo{ SegmentInfo: &datapb.SegmentInfo{ ID: 1, State: commonpb.SegmentState_Flushed, }, }) assert.NoError(t, err) - err = meta.AddSegment(&SegmentInfo{ + err = meta.AddSegment(context.TODO(), &SegmentInfo{ SegmentInfo: &datapb.SegmentInfo{ ID: 2, State: commonpb.SegmentState_Flushed, @@ -3428,14 +3428,14 @@ func TestGetFlushState(t *testing.T) { svr := newTestServerWithMeta(t, nil, meta) defer closeTestServer(t, svr) - err = meta.AddSegment(&SegmentInfo{ + err = meta.AddSegment(context.TODO(), &SegmentInfo{ SegmentInfo: &datapb.SegmentInfo{ ID: 1, State: commonpb.SegmentState_Flushed, }, }) assert.NoError(t, err) - err = meta.AddSegment(&SegmentInfo{ + err = meta.AddSegment(context.TODO(), &SegmentInfo{ SegmentInfo: &datapb.SegmentInfo{ ID: 2, State: commonpb.SegmentState_Sealed, @@ -3476,14 +3476,14 @@ func TestGetFlushState(t *testing.T) { svr := newTestServerWithMeta(t, nil, meta) defer closeTestServer(t, svr) - err = meta.AddSegment(&SegmentInfo{ + err = meta.AddSegment(context.TODO(), &SegmentInfo{ SegmentInfo: &datapb.SegmentInfo{ ID: 1, State: commonpb.SegmentState_Flushed, }, }) assert.NoError(t, err) - err = meta.AddSegment(&SegmentInfo{ + err = meta.AddSegment(context.TODO(), &SegmentInfo{ SegmentInfo: &datapb.SegmentInfo{ ID: 2, State: commonpb.SegmentState_Dropped, @@ -3804,7 +3804,7 @@ func TestDataCoordServer_SetSegmentState(t *testing.T) { Timestamp: 0, }, } - err := svr.meta.AddSegment(NewSegmentInfo(segment)) + err := svr.meta.AddSegment(context.TODO(), NewSegmentInfo(segment)) assert.NoError(t, err) // Set segment state. svr.SetSegmentState(context.TODO(), &datapb.SetSegmentStateRequest{ @@ -3983,7 +3983,7 @@ func TestDataCoord_SegmentStatistics(t *testing.T) { } info := NewSegmentInfo(seg1) - svr.meta.AddSegment(info) + svr.meta.AddSegment(context.TODO(), info) status, err := svr.UpdateSegmentStatistics(context.TODO(), &datapb.UpdateSegmentStatisticsRequest{ Stats: []*commonpb.SegmentStats{{ @@ -4010,7 +4010,7 @@ func TestDataCoord_SegmentStatistics(t *testing.T) { } info := NewSegmentInfo(seg1) - svr.meta.AddSegment(info) + svr.meta.AddSegment(context.TODO(), info) status, err := svr.UpdateSegmentStatistics(context.TODO(), &datapb.UpdateSegmentStatisticsRequest{ Stats: []*commonpb.SegmentStats{{ @@ -4034,7 +4034,7 @@ func TestDataCoord_SaveImportSegment(t *testing.T) { ID: 100, }) seg := buildSegment(100, 100, 100, "ch1", false) - svr.meta.AddSegment(seg) + svr.meta.AddSegment(context.TODO(), seg) svr.sessionManager.AddSession(&NodeInfo{ NodeID: 110, Address: "localhost:8080", @@ -4107,7 +4107,7 @@ func TestDataCoord_UnsetIsImportingState(t *testing.T) { svr := newTestServer(t, nil) defer closeTestServer(t, svr) seg := buildSegment(100, 100, 100, "ch1", false) - svr.meta.AddSegment(seg) + svr.meta.AddSegment(context.TODO(), seg) status, err := svr.UnsetIsImportingState(context.Background(), &datapb.UnsetIsImportingStateRequest{ SegmentIds: []int64{100}, diff --git a/internal/datacoord/services_test.go b/internal/datacoord/services_test.go index 3849ededf8..3ef562a23a 100644 --- a/internal/datacoord/services_test.go +++ b/internal/datacoord/services_test.go @@ -205,9 +205,9 @@ func TestGetRecoveryInfoV2(t *testing.T) { }, }, } - err = svr.meta.AddSegment(NewSegmentInfo(seg1)) + err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg1)) assert.NoError(t, err) - err = svr.meta.AddSegment(NewSegmentInfo(seg2)) + err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg2)) assert.NoError(t, err) err = svr.meta.AddSegmentIndex(&model.SegmentIndex{ SegmentID: seg1.ID, @@ -301,9 +301,9 @@ func TestGetRecoveryInfoV2(t *testing.T) { }, }, } - err = svr.meta.AddSegment(NewSegmentInfo(seg1)) + err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg1)) assert.NoError(t, err) - err = svr.meta.AddSegment(NewSegmentInfo(seg2)) + err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg2)) assert.NoError(t, err) req := &datapb.GetRecoveryInfoRequestV2{ @@ -373,7 +373,7 @@ func TestGetRecoveryInfoV2(t *testing.T) { }, } segment := createSegment(0, 0, 1, 100, 10, "vchan1", commonpb.SegmentState_Flushed) - err := svr.meta.AddSegment(NewSegmentInfo(segment)) + err := svr.meta.AddSegment(context.TODO(), NewSegmentInfo(segment)) assert.NoError(t, err) err = svr.meta.CreateIndex(&model.Index{ @@ -441,9 +441,9 @@ func TestGetRecoveryInfoV2(t *testing.T) { seg1 := createSegment(7, 0, 0, 100, 30, "vchan1", commonpb.SegmentState_Growing) seg2 := createSegment(8, 0, 0, 100, 40, "vchan1", commonpb.SegmentState_Dropped) - err = svr.meta.AddSegment(NewSegmentInfo(seg1)) + err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg1)) assert.NoError(t, err) - err = svr.meta.AddSegment(NewSegmentInfo(seg2)) + err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg2)) assert.NoError(t, err) req := &datapb.GetRecoveryInfoRequestV2{ @@ -483,9 +483,9 @@ func TestGetRecoveryInfoV2(t *testing.T) { seg1 := createSegment(7, 0, 0, 100, 30, "vchan1", commonpb.SegmentState_Growing) seg2 := createSegment(8, 0, 0, 100, 40, "vchan1", commonpb.SegmentState_Flushed) seg2.IsFake = true - err = svr.meta.AddSegment(NewSegmentInfo(seg1)) + err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg1)) assert.NoError(t, err) - err = svr.meta.AddSegment(NewSegmentInfo(seg2)) + err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg2)) assert.NoError(t, err) req := &datapb.GetRecoveryInfoRequestV2{ @@ -527,15 +527,15 @@ func TestGetRecoveryInfoV2(t *testing.T) { seg4 := createSegment(12, 0, 0, 2048, 40, "vchan1", commonpb.SegmentState_Dropped) seg5 := createSegment(13, 0, 0, 2048, 40, "vchan1", commonpb.SegmentState_Flushed) seg5.CompactionFrom = []int64{11, 12} - err = svr.meta.AddSegment(NewSegmentInfo(seg1)) + err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg1)) assert.NoError(t, err) - err = svr.meta.AddSegment(NewSegmentInfo(seg2)) + err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg2)) assert.NoError(t, err) - err = svr.meta.AddSegment(NewSegmentInfo(seg3)) + err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg3)) assert.NoError(t, err) - err = svr.meta.AddSegment(NewSegmentInfo(seg4)) + err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg4)) assert.NoError(t, err) - err = svr.meta.AddSegment(NewSegmentInfo(seg5)) + err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg5)) assert.NoError(t, err) err = svr.meta.CreateIndex(&model.Index{ TenantID: "",