diff --git a/Makefile b/Makefile index d35c1efeba..f1c7fa4d91 100644 --- a/Makefile +++ b/Makefile @@ -348,6 +348,8 @@ generate-mockery: getdeps # internal/querynodev2 $(PWD)/bin/mockery --name=Manager --dir=$(PWD)/internal/querynodev2/cluster --output=$(PWD)/internal/querynodev2/cluster --filename=mock_manager.go --with-expecter --outpkg=cluster --structname=MockManager --inpackage $(PWD)/bin/mockery --name=Loader --dir=$(PWD)/internal/querynodev2/segments --output=$(PWD)/internal/querynodev2/segments --filename=mock_loader.go --with-expecter --outpkg=segments --structname=MockLoader --inpackage + $(PWD)/bin/mockery --name=SegmentManager --dir=$(PWD)/internal/querynodev2/segments --output=$(PWD)/internal/querynodev2/segments --filename=mock_segment_manager.go --with-expecter --outpkg=segments --structname=MockSegmentManager --inpackage + $(PWD)/bin/mockery --name=Segment --dir=$(PWD)/internal/querynodev2/segments --output=$(PWD)/internal/querynodev2/segments --filename=mock_segment.go --with-expecter --outpkg=segments --structname=MockSegment --inpackage $(PWD)/bin/mockery --name=Worker --dir=$(PWD)/internal/querynodev2/cluster --output=$(PWD)/internal/querynodev2/cluster --filename=mock_worker.go --with-expecter --outpkg=worker --structname=MockWorker --inpackage $(PWD)/bin/mockery --name=ShardDelegator --dir=$(PWD)/internal/querynodev2/delegator/ --output=$(PWD)/internal/querynodev2/delegator/ --filename=mock_delegator.go --with-expecter --outpkg=delegator --structname=MockShardDelegator --inpackage # internal/datacoord diff --git a/internal/querycoordv2/observers/leader_observer.go b/internal/querycoordv2/observers/leader_observer.go index 5913775d77..a259ae3333 100644 --- a/internal/querycoordv2/observers/leader_observer.go +++ b/internal/querycoordv2/observers/leader_observer.go @@ -184,6 +184,12 @@ func (o *LeaderObserver) findNeedLoadedSegments(leaderView *meta.LeaderView, dis } loadInfo := utils.PackSegmentLoadInfo(resp, nil) + log.Debug("leader observer 
append a segment to set", + zap.Int64("collectionID", leaderView.CollectionID), + zap.String("channel", leaderView.Channel), + zap.Int64("leaderViewID", leaderView.ID), + zap.Int64("segmentID", s.GetID()), + zap.Int64("nodeID", s.Node)) ret = append(ret, &querypb.SyncAction{ Type: querypb.SyncType_Set, PartitionID: s.GetPartitionID(), @@ -210,11 +216,12 @@ func (o *LeaderObserver) findNeedRemovedSegments(leaderView *meta.LeaderView, di if ok || existInCurrentTarget || existInNextTarget { continue } - log.Debug("leader observer append a segment to remove:", zap.Int64("collectionID", leaderView.CollectionID), - zap.String("Channel", leaderView.Channel), zap.Int64("leaderViewID", leaderView.ID), - zap.Int64("segmentID", sid), zap.Bool("distMap_exist", ok), - zap.Bool("existInCurrentTarget", existInCurrentTarget), - zap.Bool("existInNextTarget", existInNextTarget)) + log.Debug("leader observer append a segment to remove", + zap.Int64("collectionID", leaderView.CollectionID), + zap.String("channel", leaderView.Channel), + zap.Int64("leaderViewID", leaderView.ID), + zap.Int64("segmentID", sid), + zap.Int64("nodeID", s.NodeID)) ret = append(ret, &querypb.SyncAction{ Type: querypb.SyncType_Remove, SegmentID: sid, diff --git a/internal/querycoordv2/services.go b/internal/querycoordv2/services.go index 432ce5ab72..9c535f1e11 100644 --- a/internal/querycoordv2/services.go +++ b/internal/querycoordv2/services.go @@ -914,11 +914,13 @@ func (s *Server) GetShardLeaders(ctx context.Context, req *querypb.GetShardLeade } // Check whether QueryNodes are online and available isAvailable := true - for _, version := range leader.Segments { + for id, version := range leader.Segments { info := s.nodeMgr.Get(version.GetNodeID()) err = checkNodeAvailable(version.GetNodeID(), info) if err != nil { - log.Info("leader is not available due to QueryNode unavailable", zap.Error(err)) + log.Info("leader is not available due to QueryNode unavailable", + zap.Int64("segmentID", id), + zap.Error(err)) 
isAvailable = false multierr.AppendInto(&channelErr, err) break diff --git a/internal/querynodev2/cluster/mock_manager.go b/internal/querynodev2/cluster/mock_manager.go index e57bc724b0..496382b0a3 100644 --- a/internal/querynodev2/cluster/mock_manager.go +++ b/internal/querynodev2/cluster/mock_manager.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.16.0. DO NOT EDIT. +// Code generated by mockery v2.21.1. DO NOT EDIT. package cluster @@ -22,6 +22,10 @@ func (_m *MockManager) GetWorker(nodeID int64) (Worker, error) { ret := _m.Called(nodeID) var r0 Worker + var r1 error + if rf, ok := ret.Get(0).(func(int64) (Worker, error)); ok { + return rf(nodeID) + } if rf, ok := ret.Get(0).(func(int64) Worker); ok { r0 = rf(nodeID) } else { @@ -30,7 +34,6 @@ func (_m *MockManager) GetWorker(nodeID int64) (Worker, error) { } } - var r1 error if rf, ok := ret.Get(1).(func(int64) error); ok { r1 = rf(nodeID) } else { @@ -46,7 +49,7 @@ type MockManager_GetWorker_Call struct { } // GetWorker is a helper method to define mock.On call -// - nodeID int64 +// - nodeID int64 func (_e *MockManager_Expecter) GetWorker(nodeID interface{}) *MockManager_GetWorker_Call { return &MockManager_GetWorker_Call{Call: _e.mock.On("GetWorker", nodeID)} } @@ -63,6 +66,11 @@ func (_c *MockManager_GetWorker_Call) Return(_a0 Worker, _a1 error) *MockManager return _c } +func (_c *MockManager_GetWorker_Call) RunAndReturn(run func(int64) (Worker, error)) *MockManager_GetWorker_Call { + _c.Call.Return(run) + return _c +} + type mockConstructorTestingTNewMockManager interface { mock.TestingT Cleanup(func()) diff --git a/internal/querynodev2/delegator/distribution.go b/internal/querynodev2/delegator/distribution.go index 175e031490..8a933891d9 100644 --- a/internal/querynodev2/delegator/distribution.go +++ b/internal/querynodev2/delegator/distribution.go @@ -171,9 +171,21 @@ func (d *distribution) AddDistributions(entries ...SegmentEntry) { defer d.mut.Unlock() for _, entry := range entries { - if s, ok := 
d.sealedSegments[entry.SegmentID]; ok { + oldEntry, ok := d.sealedSegments[entry.SegmentID] + if ok && oldEntry.Version >= entry.Version { + log.Warn("Invalid segment distribution changed, skip it", + zap.Int64("segmentID", entry.SegmentID), + zap.Int64("oldVersion", oldEntry.Version), + zap.Int64("oldNode", oldEntry.NodeID), + zap.Int64("newVersion", entry.Version), + zap.Int64("newNode", entry.NodeID), + ) + continue + } + + if ok { // remain the target version for already loaded segment to void skipping this segment when executing search - entry.TargetVersion = s.TargetVersion + entry.TargetVersion = oldEntry.TargetVersion } d.sealedSegments[entry.SegmentID] = entry d.offlines.Remove(entry.SegmentID) diff --git a/internal/querynodev2/delegator/distribution_test.go b/internal/querynodev2/delegator/distribution_test.go index f37d6bedae..f22534b709 100644 --- a/internal/querynodev2/delegator/distribution_test.go +++ b/internal/querynodev2/delegator/distribution_test.go @@ -76,6 +76,31 @@ func (s *DistributionSuite) TestAddDistribution() { }, expectedSignalClosed: true, }, + { + tag: "duplicate segment", + input: []SegmentEntry{ + { + NodeID: 1, + SegmentID: 1, + }, + { + NodeID: 1, + SegmentID: 1, + }, + }, + expected: []SnapshotItem{ + { + NodeID: 1, + Segments: []SegmentEntry{ + { + NodeID: 1, + SegmentID: 1, + }, + }, + }, + }, + expectedSignalClosed: true, + }, { tag: "multiple_nodes", input: []SegmentEntry{ diff --git a/internal/querynodev2/handlers.go b/internal/querynodev2/handlers.go index 4245893433..55f1a9e6f8 100644 --- a/internal/querynodev2/handlers.go +++ b/internal/querynodev2/handlers.go @@ -126,7 +126,7 @@ func (node *QueryNode) loadIndex(ctx context.Context, req *querypb.LoadSegmentsR continue } - err := node.loader.LoadIndex(ctx, localSegment, info) + err := node.loader.LoadIndex(ctx, localSegment, info, req.Version) if err != nil { log.Warn("failed to load index", zap.Error(err)) status = merr.Status(err) diff --git 
a/internal/querynodev2/segments/manager.go b/internal/querynodev2/segments/manager.go index f1e4a8dd6c..88b404fa7e 100644 --- a/internal/querynodev2/segments/manager.go +++ b/internal/querynodev2/segments/manager.go @@ -30,9 +30,11 @@ import ( "github.com/milvus-io/milvus/internal/proto/querypb" "github.com/milvus-io/milvus/pkg/eventlog" + "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/metrics" "github.com/milvus-io/milvus/pkg/util/paramtable" . "github.com/milvus-io/milvus/pkg/util/typeutil" + "go.uber.org/zap" ) type SegmentFilter func(segment Segment) bool @@ -98,6 +100,8 @@ type SegmentManager interface { Remove(segmentID UniqueID, scope querypb.DataScope) (int, int) RemoveBy(filters ...SegmentFilter) (int, int) Clear() + + UpdateSegmentVersion(segmentType SegmentType, segmentID int64, newVersion int64) } var _ SegmentManager = (*segmentManager)(nil) @@ -132,31 +136,76 @@ func (mgr *segmentManager) Put(segmentType SegmentType, segments ...Segment) { } for _, segment := range segments { - if _, ok := targetMap[segment.ID()]; ok { + oldSegment, ok := targetMap[segment.ID()] + + if ok && oldSegment.Version() >= segment.Version() { + log.Warn("Invalid segment distribution changed, skip it", + zap.Int64("segmentID", segment.ID()), + zap.Int64("oldVersion", oldSegment.Version()), + zap.Int64("newVersion", segment.Version()), + ) continue } - eventlog.Record(eventlog.NewRawEvt(eventlog.Level_Info, fmt.Sprintf("Segment %d[%d] loaded", segment.ID(), segment.Collection()))) targetMap[segment.ID()] = segment - metrics.QueryNodeNumSegments.WithLabelValues( - fmt.Sprint(paramtable.GetNodeID()), - fmt.Sprint(segment.Collection()), - fmt.Sprint(segment.Partition()), - segment.Type().String(), - fmt.Sprint(len(segment.Indexes())), - ).Inc() - if segment.RowNum() > 0 { - metrics.QueryNodeNumEntities.WithLabelValues( + + if !ok { + eventlog.Record(eventlog.NewRawEvt(eventlog.Level_Info, fmt.Sprintf("Segment %d[%d] loaded", segment.ID(), 
segment.Collection()))) + metrics.QueryNodeNumSegments.WithLabelValues( fmt.Sprint(paramtable.GetNodeID()), fmt.Sprint(segment.Collection()), fmt.Sprint(segment.Partition()), segment.Type().String(), fmt.Sprint(len(segment.Indexes())), - ).Add(float64(segment.RowNum())) + ).Inc() + if segment.RowNum() > 0 { + metrics.QueryNodeNumEntities.WithLabelValues( + fmt.Sprint(paramtable.GetNodeID()), + fmt.Sprint(segment.Collection()), + fmt.Sprint(segment.Partition()), + segment.Type().String(), + fmt.Sprint(len(segment.Indexes())), + ).Add(float64(segment.RowNum())) + } } } mgr.updateMetric() } +func (mgr *segmentManager) UpdateSegmentVersion(segmentType SegmentType, segmentID int64, newVersion int64) { + mgr.mu.Lock() + defer mgr.mu.Unlock() + + targetMap := mgr.growingSegments + switch segmentType { + case SegmentTypeGrowing: + targetMap = mgr.growingSegments + case SegmentTypeSealed: + targetMap = mgr.sealedSegments + default: + panic("unexpected segment type") + } + + segment, ok := targetMap[segmentID] + if !ok { + log.Warn("segment not exist, skip segment version change", + zap.Int64("segmentID", segmentID), + zap.Int64("newVersion", newVersion), + ) + return + } + + if segment.Version() >= newVersion { + log.Warn("Invalid segment version changed, skip it", + zap.Int64("segmentID", segment.ID()), + zap.Int64("oldVersion", segment.Version()), + zap.Int64("newVersion", newVersion)) + return + } + + segment.UpdateVersion(newVersion) + targetMap[segmentID] = segment +} + func (mgr *segmentManager) Get(segmentID UniqueID) Segment { mgr.mu.RLock() defer mgr.mu.RUnlock() diff --git a/internal/querynodev2/segments/mock_loader.go b/internal/querynodev2/segments/mock_loader.go index 2801447ef7..d6f5382c51 100644 --- a/internal/querynodev2/segments/mock_loader.go +++ b/internal/querynodev2/segments/mock_loader.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.16.0. DO NOT EDIT. +// Code generated by mockery v2.21.1. DO NOT EDIT. 
package segments @@ -41,6 +41,10 @@ func (_m *MockLoader) Load(ctx context.Context, collectionID int64, segmentType ret := _m.Called(_ca...) var r0 []Segment + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, int64, commonpb.SegmentState, int64, ...*querypb.SegmentLoadInfo) ([]Segment, error)); ok { + return rf(ctx, collectionID, segmentType, version, segments...) + } if rf, ok := ret.Get(0).(func(context.Context, int64, commonpb.SegmentState, int64, ...*querypb.SegmentLoadInfo) []Segment); ok { r0 = rf(ctx, collectionID, segmentType, version, segments...) } else { @@ -49,7 +53,6 @@ func (_m *MockLoader) Load(ctx context.Context, collectionID int64, segmentType } } - var r1 error if rf, ok := ret.Get(1).(func(context.Context, int64, commonpb.SegmentState, int64, ...*querypb.SegmentLoadInfo) error); ok { r1 = rf(ctx, collectionID, segmentType, version, segments...) } else { @@ -65,11 +68,11 @@ type MockLoader_Load_Call struct { } // Load is a helper method to define mock.On call -// - ctx context.Context -// - collectionID int64 -// - segmentType commonpb.SegmentState -// - version int64 -// - segments ...*querypb.SegmentLoadInfo +// - ctx context.Context +// - collectionID int64 +// - segmentType commonpb.SegmentState +// - version int64 +// - segments ...*querypb.SegmentLoadInfo func (_e *MockLoader_Expecter) Load(ctx interface{}, collectionID interface{}, segmentType interface{}, version interface{}, segments ...interface{}) *MockLoader_Load_Call { return &MockLoader_Load_Call{Call: _e.mock.On("Load", append([]interface{}{ctx, collectionID, segmentType, version}, segments...)...)} @@ -93,6 +96,11 @@ func (_c *MockLoader_Load_Call) Return(_a0 []Segment, _a1 error) *MockLoader_Loa return _c } +func (_c *MockLoader_Load_Call) RunAndReturn(run func(context.Context, int64, commonpb.SegmentState, int64, ...*querypb.SegmentLoadInfo) ([]Segment, error)) *MockLoader_Load_Call { + _c.Call.Return(run) + return _c +} + // LoadBloomFilterSet provides a mock 
function with given fields: ctx, collectionID, version, infos func (_m *MockLoader) LoadBloomFilterSet(ctx context.Context, collectionID int64, version int64, infos ...*querypb.SegmentLoadInfo) ([]*pkoracle.BloomFilterSet, error) { _va := make([]interface{}, len(infos)) @@ -105,6 +113,10 @@ func (_m *MockLoader) LoadBloomFilterSet(ctx context.Context, collectionID int64 ret := _m.Called(_ca...) var r0 []*pkoracle.BloomFilterSet + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, int64, int64, ...*querypb.SegmentLoadInfo) ([]*pkoracle.BloomFilterSet, error)); ok { + return rf(ctx, collectionID, version, infos...) + } if rf, ok := ret.Get(0).(func(context.Context, int64, int64, ...*querypb.SegmentLoadInfo) []*pkoracle.BloomFilterSet); ok { r0 = rf(ctx, collectionID, version, infos...) } else { @@ -113,7 +125,6 @@ func (_m *MockLoader) LoadBloomFilterSet(ctx context.Context, collectionID int64 } } - var r1 error if rf, ok := ret.Get(1).(func(context.Context, int64, int64, ...*querypb.SegmentLoadInfo) error); ok { r1 = rf(ctx, collectionID, version, infos...) 
} else { @@ -129,10 +140,10 @@ type MockLoader_LoadBloomFilterSet_Call struct { } // LoadBloomFilterSet is a helper method to define mock.On call -// - ctx context.Context -// - collectionID int64 -// - version int64 -// - infos ...*querypb.SegmentLoadInfo +// - ctx context.Context +// - collectionID int64 +// - version int64 +// - infos ...*querypb.SegmentLoadInfo func (_e *MockLoader_Expecter) LoadBloomFilterSet(ctx interface{}, collectionID interface{}, version interface{}, infos ...interface{}) *MockLoader_LoadBloomFilterSet_Call { return &MockLoader_LoadBloomFilterSet_Call{Call: _e.mock.On("LoadBloomFilterSet", append([]interface{}{ctx, collectionID, version}, infos...)...)} @@ -156,6 +167,11 @@ func (_c *MockLoader_LoadBloomFilterSet_Call) Return(_a0 []*pkoracle.BloomFilter return _c } +func (_c *MockLoader_LoadBloomFilterSet_Call) RunAndReturn(run func(context.Context, int64, int64, ...*querypb.SegmentLoadInfo) ([]*pkoracle.BloomFilterSet, error)) *MockLoader_LoadBloomFilterSet_Call { + _c.Call.Return(run) + return _c +} + // LoadDeltaLogs provides a mock function with given fields: ctx, segment, deltaLogs func (_m *MockLoader) LoadDeltaLogs(ctx context.Context, segment *LocalSegment, deltaLogs []*datapb.FieldBinlog) error { ret := _m.Called(ctx, segment, deltaLogs) @@ -176,9 +192,9 @@ type MockLoader_LoadDeltaLogs_Call struct { } // LoadDeltaLogs is a helper method to define mock.On call -// - ctx context.Context -// - segment *LocalSegment -// - deltaLogs []*datapb.FieldBinlog +// - ctx context.Context +// - segment *LocalSegment +// - deltaLogs []*datapb.FieldBinlog func (_e *MockLoader_Expecter) LoadDeltaLogs(ctx interface{}, segment interface{}, deltaLogs interface{}) *MockLoader_LoadDeltaLogs_Call { return &MockLoader_LoadDeltaLogs_Call{Call: _e.mock.On("LoadDeltaLogs", ctx, segment, deltaLogs)} } @@ -195,13 +211,18 @@ func (_c *MockLoader_LoadDeltaLogs_Call) Return(_a0 error) *MockLoader_LoadDelta return _c } -// LoadIndex provides a mock function 
with given fields: ctx, segment, info -func (_m *MockLoader) LoadIndex(ctx context.Context, segment *LocalSegment, info *querypb.SegmentLoadInfo) error { - ret := _m.Called(ctx, segment, info) +func (_c *MockLoader_LoadDeltaLogs_Call) RunAndReturn(run func(context.Context, *LocalSegment, []*datapb.FieldBinlog) error) *MockLoader_LoadDeltaLogs_Call { + _c.Call.Return(run) + return _c +} + +// LoadIndex provides a mock function with given fields: ctx, segment, info, version +func (_m *MockLoader) LoadIndex(ctx context.Context, segment *LocalSegment, info *querypb.SegmentLoadInfo, version int64) error { + ret := _m.Called(ctx, segment, info, version) var r0 error - if rf, ok := ret.Get(0).(func(context.Context, *LocalSegment, *querypb.SegmentLoadInfo) error); ok { - r0 = rf(ctx, segment, info) + if rf, ok := ret.Get(0).(func(context.Context, *LocalSegment, *querypb.SegmentLoadInfo, int64) error); ok { + r0 = rf(ctx, segment, info, version) } else { r0 = ret.Error(0) } @@ -215,16 +236,17 @@ type MockLoader_LoadIndex_Call struct { } // LoadIndex is a helper method to define mock.On call -// - ctx context.Context -// - segment *LocalSegment -// - info *querypb.SegmentLoadInfo -func (_e *MockLoader_Expecter) LoadIndex(ctx interface{}, segment interface{}, info interface{}) *MockLoader_LoadIndex_Call { - return &MockLoader_LoadIndex_Call{Call: _e.mock.On("LoadIndex", ctx, segment, info)} +// - ctx context.Context +// - segment *LocalSegment +// - info *querypb.SegmentLoadInfo +// - version int64 +func (_e *MockLoader_Expecter) LoadIndex(ctx interface{}, segment interface{}, info interface{}, version interface{}) *MockLoader_LoadIndex_Call { + return &MockLoader_LoadIndex_Call{Call: _e.mock.On("LoadIndex", ctx, segment, info, version)} } -func (_c *MockLoader_LoadIndex_Call) Run(run func(ctx context.Context, segment *LocalSegment, info *querypb.SegmentLoadInfo)) *MockLoader_LoadIndex_Call { +func (_c *MockLoader_LoadIndex_Call) Run(run func(ctx context.Context, segment 
*LocalSegment, info *querypb.SegmentLoadInfo, version int64)) *MockLoader_LoadIndex_Call { _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(*LocalSegment), args[2].(*querypb.SegmentLoadInfo)) + run(args[0].(context.Context), args[1].(*LocalSegment), args[2].(*querypb.SegmentLoadInfo), args[3].(int64)) }) return _c } @@ -234,6 +256,11 @@ func (_c *MockLoader_LoadIndex_Call) Return(_a0 error) *MockLoader_LoadIndex_Cal return _c } +func (_c *MockLoader_LoadIndex_Call) RunAndReturn(run func(context.Context, *LocalSegment, *querypb.SegmentLoadInfo, int64) error) *MockLoader_LoadIndex_Call { + _c.Call.Return(run) + return _c +} + type mockConstructorTestingTNewMockLoader interface { mock.TestingT Cleanup(func()) diff --git a/internal/querynodev2/segments/mock_segment.go b/internal/querynodev2/segments/mock_segment.go index 7b28081b30..7a978c9a0b 100644 --- a/internal/querynodev2/segments/mock_segment.go +++ b/internal/querynodev2/segments/mock_segment.go @@ -1,13 +1,13 @@ -// Code generated by mockery v2.16.0. DO NOT EDIT. +// Code generated by mockery v2.21.1. DO NOT EDIT. 
package segments import ( commonpb "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" - "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" - mock "github.com/stretchr/testify/mock" + msgpb "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" + segcorepb "github.com/milvus-io/milvus/internal/proto/segcorepb" storage "github.com/milvus-io/milvus/internal/storage" @@ -55,6 +55,11 @@ func (_c *MockSegment_AddIndex_Call) Return() *MockSegment_AddIndex_Call { return _c } +func (_c *MockSegment_AddIndex_Call) RunAndReturn(run func(int64, *IndexedFieldInfo)) *MockSegment_AddIndex_Call { + _c.Call.Return(run) + return _c +} + // Collection provides a mock function with given fields: func (_m *MockSegment) Collection() int64 { ret := _m.Called() @@ -91,6 +96,11 @@ func (_c *MockSegment_Collection_Call) Return(_a0 int64) *MockSegment_Collection return _c } +func (_c *MockSegment_Collection_Call) RunAndReturn(run func() int64) *MockSegment_Collection_Call { + _c.Call.Return(run) + return _c +} + // Delete provides a mock function with given fields: primaryKeys, timestamps func (_m *MockSegment) Delete(primaryKeys []storage.PrimaryKey, timestamps []uint64) error { ret := _m.Called(primaryKeys, timestamps) @@ -129,6 +139,11 @@ func (_c *MockSegment_Delete_Call) Return(_a0 error) *MockSegment_Delete_Call { return _c } +func (_c *MockSegment_Delete_Call) RunAndReturn(run func([]storage.PrimaryKey, []uint64) error) *MockSegment_Delete_Call { + _c.Call.Return(run) + return _c +} + // ExistIndex provides a mock function with given fields: fieldID func (_m *MockSegment) ExistIndex(fieldID int64) bool { ret := _m.Called(fieldID) @@ -166,6 +181,11 @@ func (_c *MockSegment_ExistIndex_Call) Return(_a0 bool) *MockSegment_ExistIndex_ return _c } +func (_c *MockSegment_ExistIndex_Call) RunAndReturn(run func(int64) bool) *MockSegment_ExistIndex_Call { + _c.Call.Return(run) + return _c +} + // GetIndex provides a mock function with given fields: fieldID func (_m *MockSegment) 
GetIndex(fieldID int64) *IndexedFieldInfo { ret := _m.Called(fieldID) @@ -205,6 +225,11 @@ func (_c *MockSegment_GetIndex_Call) Return(_a0 *IndexedFieldInfo) *MockSegment_ return _c } +func (_c *MockSegment_GetIndex_Call) RunAndReturn(run func(int64) *IndexedFieldInfo) *MockSegment_GetIndex_Call { + _c.Call.Return(run) + return _c +} + // ID provides a mock function with given fields: func (_m *MockSegment) ID() int64 { ret := _m.Called() @@ -241,6 +266,11 @@ func (_c *MockSegment_ID_Call) Return(_a0 int64) *MockSegment_ID_Call { return _c } +func (_c *MockSegment_ID_Call) RunAndReturn(run func() int64) *MockSegment_ID_Call { + _c.Call.Return(run) + return _c +} + // Indexes provides a mock function with given fields: func (_m *MockSegment) Indexes() []*IndexedFieldInfo { ret := _m.Called() @@ -279,6 +309,11 @@ func (_c *MockSegment_Indexes_Call) Return(_a0 []*IndexedFieldInfo) *MockSegment return _c } +func (_c *MockSegment_Indexes_Call) RunAndReturn(run func() []*IndexedFieldInfo) *MockSegment_Indexes_Call { + _c.Call.Return(run) + return _c +} + // Insert provides a mock function with given fields: rowIDs, timestamps, record func (_m *MockSegment) Insert(rowIDs []int64, timestamps []uint64, record *segcorepb.InsertRecord) error { ret := _m.Called(rowIDs, timestamps, record) @@ -318,6 +353,11 @@ func (_c *MockSegment_Insert_Call) Return(_a0 error) *MockSegment_Insert_Call { return _c } +func (_c *MockSegment_Insert_Call) RunAndReturn(run func([]int64, []uint64, *segcorepb.InsertRecord) error) *MockSegment_Insert_Call { + _c.Call.Return(run) + return _c +} + // InsertCount provides a mock function with given fields: func (_m *MockSegment) InsertCount() int64 { ret := _m.Called() @@ -354,6 +394,11 @@ func (_c *MockSegment_InsertCount_Call) Return(_a0 int64) *MockSegment_InsertCou return _c } +func (_c *MockSegment_InsertCount_Call) RunAndReturn(run func() int64) *MockSegment_InsertCount_Call { + _c.Call.Return(run) + return _c +} + // LastDeltaTimestamp provides a 
mock function with given fields: func (_m *MockSegment) LastDeltaTimestamp() uint64 { ret := _m.Called() @@ -390,6 +435,11 @@ func (_c *MockSegment_LastDeltaTimestamp_Call) Return(_a0 uint64) *MockSegment_L return _c } +func (_c *MockSegment_LastDeltaTimestamp_Call) RunAndReturn(run func() uint64) *MockSegment_LastDeltaTimestamp_Call { + _c.Call.Return(run) + return _c +} + // MayPkExist provides a mock function with given fields: pk func (_m *MockSegment) MayPkExist(pk storage.PrimaryKey) bool { ret := _m.Called(pk) @@ -427,6 +477,11 @@ func (_c *MockSegment_MayPkExist_Call) Return(_a0 bool) *MockSegment_MayPkExist_ return _c } +func (_c *MockSegment_MayPkExist_Call) RunAndReturn(run func(storage.PrimaryKey) bool) *MockSegment_MayPkExist_Call { + _c.Call.Return(run) + return _c +} + // MemSize provides a mock function with given fields: func (_m *MockSegment) MemSize() int64 { ret := _m.Called() @@ -463,6 +518,11 @@ func (_c *MockSegment_MemSize_Call) Return(_a0 int64) *MockSegment_MemSize_Call return _c } +func (_c *MockSegment_MemSize_Call) RunAndReturn(run func() int64) *MockSegment_MemSize_Call { + _c.Call.Return(run) + return _c +} + // Partition provides a mock function with given fields: func (_m *MockSegment) Partition() int64 { ret := _m.Called() @@ -499,6 +559,11 @@ func (_c *MockSegment_Partition_Call) Return(_a0 int64) *MockSegment_Partition_C return _c } +func (_c *MockSegment_Partition_Call) RunAndReturn(run func() int64) *MockSegment_Partition_Call { + _c.Call.Return(run) + return _c +} + // RowNum provides a mock function with given fields: func (_m *MockSegment) RowNum() int64 { ret := _m.Called() @@ -535,6 +600,11 @@ func (_c *MockSegment_RowNum_Call) Return(_a0 int64) *MockSegment_RowNum_Call { return _c } +func (_c *MockSegment_RowNum_Call) RunAndReturn(run func() int64) *MockSegment_RowNum_Call { + _c.Call.Return(run) + return _c +} + // Shard provides a mock function with given fields: func (_m *MockSegment) Shard() string { ret := 
_m.Called() @@ -571,6 +641,11 @@ func (_c *MockSegment_Shard_Call) Return(_a0 string) *MockSegment_Shard_Call { return _c } +func (_c *MockSegment_Shard_Call) RunAndReturn(run func() string) *MockSegment_Shard_Call { + _c.Call.Return(run) + return _c +} + // StartPosition provides a mock function with given fields: func (_m *MockSegment) StartPosition() *msgpb.MsgPosition { ret := _m.Called() @@ -609,6 +684,11 @@ func (_c *MockSegment_StartPosition_Call) Return(_a0 *msgpb.MsgPosition) *MockSe return _c } +func (_c *MockSegment_StartPosition_Call) RunAndReturn(run func() *msgpb.MsgPosition) *MockSegment_StartPosition_Call { + _c.Call.Return(run) + return _c +} + // Type provides a mock function with given fields: func (_m *MockSegment) Type() commonpb.SegmentState { ret := _m.Called() @@ -645,6 +725,11 @@ func (_c *MockSegment_Type_Call) Return(_a0 commonpb.SegmentState) *MockSegment_ return _c } +func (_c *MockSegment_Type_Call) RunAndReturn(run func() commonpb.SegmentState) *MockSegment_Type_Call { + _c.Call.Return(run) + return _c +} + // UpdateBloomFilter provides a mock function with given fields: pks func (_m *MockSegment) UpdateBloomFilter(pks []storage.PrimaryKey) { _m.Called(pks) @@ -673,6 +758,44 @@ func (_c *MockSegment_UpdateBloomFilter_Call) Return() *MockSegment_UpdateBloomF return _c } +func (_c *MockSegment_UpdateBloomFilter_Call) RunAndReturn(run func([]storage.PrimaryKey)) *MockSegment_UpdateBloomFilter_Call { + _c.Call.Return(run) + return _c +} + +// UpdateVersion provides a mock function with given fields: version +func (_m *MockSegment) UpdateVersion(version int64) { + _m.Called(version) +} + +// MockSegment_UpdateVersion_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'UpdateVersion' +type MockSegment_UpdateVersion_Call struct { + *mock.Call +} + +// UpdateVersion is a helper method to define mock.On call +// - version int64 +func (_e *MockSegment_Expecter) UpdateVersion(version interface{}) 
*MockSegment_UpdateVersion_Call { + return &MockSegment_UpdateVersion_Call{Call: _e.mock.On("UpdateVersion", version)} +} + +func (_c *MockSegment_UpdateVersion_Call) Run(run func(version int64)) *MockSegment_UpdateVersion_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(int64)) + }) + return _c +} + +func (_c *MockSegment_UpdateVersion_Call) Return() *MockSegment_UpdateVersion_Call { + _c.Call.Return() + return _c +} + +func (_c *MockSegment_UpdateVersion_Call) RunAndReturn(run func(int64)) *MockSegment_UpdateVersion_Call { + _c.Call.Return(run) + return _c +} + // Version provides a mock function with given fields: func (_m *MockSegment) Version() int64 { ret := _m.Called() @@ -709,6 +832,11 @@ func (_c *MockSegment_Version_Call) Return(_a0 int64) *MockSegment_Version_Call return _c } +func (_c *MockSegment_Version_Call) RunAndReturn(run func() int64) *MockSegment_Version_Call { + _c.Call.Return(run) + return _c +} + type mockConstructorTestingTNewMockSegment interface { mock.TestingT Cleanup(func()) diff --git a/internal/querynodev2/segments/mock_segment_manager.go b/internal/querynodev2/segments/mock_segment_manager.go index 127c432469..c13cfcc1ff 100644 --- a/internal/querynodev2/segments/mock_segment_manager.go +++ b/internal/querynodev2/segments/mock_segment_manager.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.30.16. DO NOT EDIT. +// Code generated by mockery v2.21.1. DO NOT EDIT. package segments @@ -495,12 +495,48 @@ func (_c *MockSegmentManager_RemoveBy_Call) RunAndReturn(run func(...SegmentFilt return _c } -// NewMockSegmentManager creates a new instance of MockSegmentManager. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. -// The first argument is typically a *testing.T value. 
-func NewMockSegmentManager(t interface { +// UpdateSegmentVersion provides a mock function with given fields: segmentType, segmentID, newVersion +func (_m *MockSegmentManager) UpdateSegmentVersion(segmentType commonpb.SegmentState, segmentID int64, newVersion int64) { + _m.Called(segmentType, segmentID, newVersion) +} + +// MockSegmentManager_UpdateSegmentVersion_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'UpdateSegmentVersion' +type MockSegmentManager_UpdateSegmentVersion_Call struct { + *mock.Call +} + +// UpdateSegmentVersion is a helper method to define mock.On call +// - segmentType commonpb.SegmentState +// - segmentID int64 +// - newVersion int64 +func (_e *MockSegmentManager_Expecter) UpdateSegmentVersion(segmentType interface{}, segmentID interface{}, newVersion interface{}) *MockSegmentManager_UpdateSegmentVersion_Call { + return &MockSegmentManager_UpdateSegmentVersion_Call{Call: _e.mock.On("UpdateSegmentVersion", segmentType, segmentID, newVersion)} +} + +func (_c *MockSegmentManager_UpdateSegmentVersion_Call) Run(run func(segmentType commonpb.SegmentState, segmentID int64, newVersion int64)) *MockSegmentManager_UpdateSegmentVersion_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(commonpb.SegmentState), args[1].(int64), args[2].(int64)) + }) + return _c +} + +func (_c *MockSegmentManager_UpdateSegmentVersion_Call) Return() *MockSegmentManager_UpdateSegmentVersion_Call { + _c.Call.Return() + return _c +} + +func (_c *MockSegmentManager_UpdateSegmentVersion_Call) RunAndReturn(run func(commonpb.SegmentState, int64, int64)) *MockSegmentManager_UpdateSegmentVersion_Call { + _c.Call.Return(run) + return _c +} + +type mockConstructorTestingTNewMockSegmentManager interface { mock.TestingT Cleanup(func()) -}) *MockSegmentManager { +} + +// NewMockSegmentManager creates a new instance of MockSegmentManager. 
It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +func NewMockSegmentManager(t mockConstructorTestingTNewMockSegmentManager) *MockSegmentManager { mock := &MockSegmentManager{} mock.Mock.Test(t) diff --git a/internal/querynodev2/segments/segment.go b/internal/querynodev2/segments/segment.go index 261616f69b..9db5d1f881 100644 --- a/internal/querynodev2/segments/segment.go +++ b/internal/querynodev2/segments/segment.go @@ -73,39 +73,6 @@ type IndexedFieldInfo struct { IndexInfo *querypb.FieldIndexInfo } -type Segment interface { - // Properties - ID() int64 - Collection() int64 - Partition() int64 - Shard() string - Version() int64 - StartPosition() *msgpb.MsgPosition - Type() SegmentType - - // Stats related - // InsertCount returns the number of inserted rows, not effected by deletion - InsertCount() int64 - // RowNum returns the number of rows, it's slow, so DO NOT call it in a loop - RowNum() int64 - MemSize() int64 - - // Index related - AddIndex(fieldID int64, index *IndexedFieldInfo) - GetIndex(fieldID int64) *IndexedFieldInfo - ExistIndex(fieldID int64) bool - Indexes() []*IndexedFieldInfo - - // Modification related - Insert(rowIDs []int64, timestamps []typeutil.Timestamp, record *segcorepb.InsertRecord) error - Delete(primaryKeys []storage.PrimaryKey, timestamps []typeutil.Timestamp) error - LastDeltaTimestamp() uint64 - - // Bloom filter related - UpdateBloomFilter(pks []storage.PrimaryKey) - MayPkExist(pk storage.PrimaryKey) bool -} - type baseSegment struct { segmentID int64 partitionID int64 @@ -159,6 +126,10 @@ func (s *baseSegment) Version() int64 { return s.version } +func (s *baseSegment) UpdateVersion(version int64) { + s.version = version +} + func (s *baseSegment) UpdateBloomFilter(pks []storage.PrimaryKey) { s.bloomFilterSet.UpdateBloomFilter(pks) } diff --git a/internal/querynodev2/segments/segment_interface.go b/internal/querynodev2/segments/segment_interface.go new file mode 100644 
index 0000000000..6008066920 --- /dev/null +++ b/internal/querynodev2/segments/segment_interface.go @@ -0,0 +1,58 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package segments + +import ( + "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" + "github.com/milvus-io/milvus/internal/proto/segcorepb" + storage "github.com/milvus-io/milvus/internal/storage" + "github.com/milvus-io/milvus/pkg/util/typeutil" +) + +type Segment interface { + // Properties + ID() int64 + Collection() int64 + Partition() int64 + Shard() string + Version() int64 + StartPosition() *msgpb.MsgPosition + Type() SegmentType + + // Stats related + // InsertCount returns the number of inserted rows, not affected by deletion + InsertCount() int64 + // RowNum returns the number of rows, it's slow, so DO NOT call it in a loop + RowNum() int64 + MemSize() int64 + + // Index related + AddIndex(fieldID int64, index *IndexedFieldInfo) + GetIndex(fieldID int64) *IndexedFieldInfo + ExistIndex(fieldID int64) bool + Indexes() []*IndexedFieldInfo + + // Modification related + Insert(rowIDs []int64, timestamps []typeutil.Timestamp, record *segcorepb.InsertRecord) error + Delete(primaryKeys []storage.PrimaryKey, timestamps []typeutil.Timestamp) error +
LastDeltaTimestamp() uint64 + + // Bloom filter related + UpdateBloomFilter(pks []storage.PrimaryKey) + MayPkExist(pk storage.PrimaryKey) bool + UpdateVersion(version int64) +} diff --git a/internal/querynodev2/segments/segment_loader.go b/internal/querynodev2/segments/segment_loader.go index df3c80d3b2..2c08e01e0d 100644 --- a/internal/querynodev2/segments/segment_loader.go +++ b/internal/querynodev2/segments/segment_loader.go @@ -68,7 +68,7 @@ type Loader interface { LoadBloomFilterSet(ctx context.Context, collectionID int64, version int64, infos ...*querypb.SegmentLoadInfo) ([]*pkoracle.BloomFilterSet, error) // LoadIndex append index for segment and remove vector binlogs. - LoadIndex(ctx context.Context, segment *LocalSegment, info *querypb.SegmentLoadInfo) error + LoadIndex(ctx context.Context, segment *LocalSegment, info *querypb.SegmentLoadInfo, version int64) error } type LoadResource struct { @@ -148,7 +148,7 @@ func (loader *segmentLoader) Load(ctx context.Context, return nil, nil } // Filter out loaded & loading segments - infos := loader.prepare(segmentType, segments...) + infos := loader.prepare(segmentType, version, segments...) defer loader.unregister(infos...) 
// continue to wait other task done @@ -246,7 +246,7 @@ func (loader *segmentLoader) Load(ctx context.Context, return loaded, nil } -func (loader *segmentLoader) prepare(segmentType SegmentType, segments ...*querypb.SegmentLoadInfo) []*querypb.SegmentLoadInfo { +func (loader *segmentLoader) prepare(segmentType SegmentType, version int64, segments ...*querypb.SegmentLoadInfo) []*querypb.SegmentLoadInfo { loader.mut.Lock() defer loader.mut.Unlock() @@ -259,6 +259,8 @@ func (loader *segmentLoader) prepare(segmentType SegmentType, segments ...*query infos = append(infos, segment) loader.loadingSegments.Insert(segment.GetSegmentID(), make(chan struct{})) } else { + // try to update the segment version before skipping the load operation + loader.manager.Segment.UpdateSegmentVersion(segmentType, segment.SegmentID, version) log.Info("skip loaded/loading segment", zap.Int64("segmentID", segment.GetSegmentID()), zap.Bool("isLoaded", len(loader.manager.Segment.GetBy(WithType(segmentType), WithID(segment.GetSegmentID()))) > 0), zap.Bool("isLoading", loader.loadingSegments.Contain(segment.GetSegmentID())), @@ -934,7 +936,7 @@ func (loader *segmentLoader) getFieldType(collectionID, fieldID int64) (schemapb return 0, merr.WrapErrFieldNotFound(fieldID) } -func (loader *segmentLoader) LoadIndex(ctx context.Context, segment *LocalSegment, loadInfo *querypb.SegmentLoadInfo) error { +func (loader *segmentLoader) LoadIndex(ctx context.Context, segment *LocalSegment, loadInfo *querypb.SegmentLoadInfo, version int64) error { log := log.Ctx(ctx).With( zap.Int64("collection", segment.Collection()), zap.Int64("segment", segment.ID()), @@ -942,7 +944,7 @@ func (loader *segmentLoader) LoadIndex(ctx context.Context, segment *LocalSegmen // Filter out LOADING segments only // use None to avoid loaded check - infos := loader.prepare(commonpb.SegmentState_SegmentStateNone, loadInfo) + infos := loader.prepare(commonpb.SegmentState_SegmentStateNone, version, loadInfo) defer loader.unregister(infos...)
indexInfo := lo.Map(infos, func(info *querypb.SegmentLoadInfo, _ int) *querypb.SegmentLoadInfo { diff --git a/internal/querynodev2/segments/segment_loader_test.go b/internal/querynodev2/segments/segment_loader_test.go index 6e276088a6..19230c2076 100644 --- a/internal/querynodev2/segments/segment_loader_test.go +++ b/internal/querynodev2/segments/segment_loader_test.go @@ -367,7 +367,7 @@ func (suite *SegmentLoaderSuite) TestLoadIndex() { }, } - err := suite.loader.LoadIndex(ctx, segment, loadInfo) + err := suite.loader.LoadIndex(ctx, segment, loadInfo, 0) suite.ErrorIs(err, merr.ErrIndexNotFound) } diff --git a/internal/querynodev2/services_test.go b/internal/querynodev2/services_test.go index 15cef644c5..b6cbf70d1c 100644 --- a/internal/querynodev2/services_test.go +++ b/internal/querynodev2/services_test.go @@ -721,7 +721,7 @@ func (suite *ServiceSuite) TestLoadIndex_Failed() { suite.node.loader = loader }() - mockLoader.EXPECT().LoadIndex(mock.Anything, mock.Anything, mock.Anything).Return(errors.New("mocked error")) + mockLoader.EXPECT().LoadIndex(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(errors.New("mocked error")) infos := suite.genSegmentLoadInfos(schema) req := &querypb.LoadSegmentsRequest{