From e4fdf5e68eeeb285e98e3da634c2ef5099b4a1a7 Mon Sep 17 00:00:00 2001
From: congqixia
Date: Wed, 1 Nov 2023 20:04:24 +0800
Subject: [PATCH] Refine offline segments logic in shard delegator (#28073)

Signed-off-by: Congqi Xia
---
 internal/querynodev2/delegator/delegator.go  | 38 +++++++---
 .../querynodev2/delegator/delegator_data.go  |  4 +-
 .../querynodev2/delegator/delegator_test.go  | 69 +++++++++++++++++++
 .../querynodev2/delegator/distribution.go    | 51 +++++++++-----
 .../delegator/distribution_test.go           | 66 ++++++++++++++----
 5 files changed, 183 insertions(+), 45 deletions(-)

diff --git a/internal/querynodev2/delegator/delegator.go b/internal/querynodev2/delegator/delegator.go
index a1f8c1aa2f..45c6233706 100644
--- a/internal/querynodev2/delegator/delegator.go
+++ b/internal/querynodev2/delegator/delegator.go
@@ -206,8 +206,12 @@ func (sd *shardDelegator) Search(ctx context.Context, req *querypb.SearchRequest
         fmt.Sprint(paramtable.GetNodeID()), metrics.SearchLabel).
         Observe(float64(waitTr.ElapseSpan().Milliseconds()))
 
-    sealed, growing, version := sd.distribution.GetSegments(true, req.GetReq().GetPartitionIDs()...)
-    defer sd.distribution.FinishUsage(version)
+    sealed, growing, version, err := sd.distribution.PinReadableSegments(req.GetReq().GetPartitionIDs()...)
+    if err != nil {
+        log.Warn("delegator failed to search, current distribution is not serviceable")
+        return nil, merr.WrapErrChannelNotAvailable(sd.vchannelName, "distribution is not serviceable")
+    }
+    defer sd.distribution.Unpin(version)
     existPartitions := sd.collection.GetPartitions()
     growing = lo.Filter(growing, func(segment SegmentEntry, _ int) bool {
         return funcutil.SliceContain(existPartitions, segment.PartitionID)
@@ -270,8 +274,12 @@ func (sd *shardDelegator) QueryStream(ctx context.Context, req *querypb.QueryReq
         fmt.Sprint(paramtable.GetNodeID()), metrics.QueryLabel).
         Observe(float64(waitTr.ElapseSpan().Milliseconds()))
 
-    sealed, growing, version := sd.distribution.GetSegments(true, req.GetReq().GetPartitionIDs()...)
-    defer sd.distribution.FinishUsage(version)
+    sealed, growing, version, err := sd.distribution.PinReadableSegments(req.GetReq().GetPartitionIDs()...)
+    if err != nil {
+        log.Warn("delegator failed to query, current distribution is not serviceable")
+        return merr.WrapErrChannelNotAvailable(sd.vchannelName, "distribution is not serviceable")
+    }
+    defer sd.distribution.Unpin(version)
     existPartitions := sd.collection.GetPartitions()
     growing = lo.Filter(growing, func(segment SegmentEntry, _ int) bool {
         return funcutil.SliceContain(existPartitions, segment.PartitionID)
@@ -334,8 +342,12 @@ func (sd *shardDelegator) Query(ctx context.Context, req *querypb.QueryRequest)
         fmt.Sprint(paramtable.GetNodeID()), metrics.QueryLabel).
         Observe(float64(waitTr.ElapseSpan().Milliseconds()))
 
-    sealed, growing, version := sd.distribution.GetSegments(true, req.GetReq().GetPartitionIDs()...)
-    defer sd.distribution.FinishUsage(version)
+    sealed, growing, version, err := sd.distribution.PinReadableSegments(req.GetReq().GetPartitionIDs()...)
+    if err != nil {
+        log.Warn("delegator failed to query, current distribution is not serviceable")
+        return nil, merr.WrapErrChannelNotAvailable(sd.vchannelName, "distribution is not serviceable")
+    }
+    defer sd.distribution.Unpin(version)
     existPartitions := sd.collection.GetPartitions()
     growing = lo.Filter(growing, func(segment SegmentEntry, _ int) bool {
         return funcutil.SliceContain(existPartitions, segment.PartitionID)
@@ -377,21 +389,25 @@ func (sd *shardDelegator) GetStatistics(ctx context.Context, req *querypb.GetSta
     defer sd.lifetime.Done()
 
     if !funcutil.SliceContain(req.GetDmlChannels(), sd.vchannelName) {
-        log.Warn("deletgator received query request not belongs to it",
+        log.Warn("delegator received GetStatistics request that does not belong to it",
             zap.Strings("reqChannels", req.GetDmlChannels()),
         )
-        return nil, fmt.Errorf("dml channel not match, delegator channel %s, search channels %v", sd.vchannelName, req.GetDmlChannels())
+        return nil, fmt.Errorf("dml channel not match, delegator channel %s, GetStatistics channels %v", sd.vchannelName, req.GetDmlChannels())
     }
 
     // wait tsafe
     err := sd.waitTSafe(ctx, req.Req.GuaranteeTimestamp)
     if err != nil {
-        log.Warn("delegator query failed to wait tsafe", zap.Error(err))
+        log.Warn("delegator GetStatistics failed to wait tsafe", zap.Error(err))
         return nil, err
     }
 
-    sealed, growing, version := sd.distribution.GetSegments(true, req.Req.GetPartitionIDs()...)
-    defer sd.distribution.FinishUsage(version)
+    sealed, growing, version, err := sd.distribution.PinReadableSegments(req.Req.GetPartitionIDs()...)
+    if err != nil {
+        log.Warn("delegator failed to GetStatistics, current distribution is not serviceable")
+        return nil, merr.WrapErrChannelNotAvailable(sd.vchannelName, "distribution is not serviceable")
+    }
+    defer sd.distribution.Unpin(version)
 
     tasks, err := organizeSubTask(ctx, req, sealed, growing, sd, func(req *querypb.GetStatisticsRequest, scope querypb.DataScope, segmentIDs []int64, targetID int64) *querypb.GetStatisticsRequest {
         nodeReq := proto.Clone(req).(*querypb.GetStatisticsRequest)
diff --git a/internal/querynodev2/delegator/delegator_data.go b/internal/querynodev2/delegator/delegator_data.go
index ebce727bce..4cbedf96fc 100644
--- a/internal/querynodev2/delegator/delegator_data.go
+++ b/internal/querynodev2/delegator/delegator_data.go
@@ -186,7 +186,7 @@ func (sd *shardDelegator) ProcessDelete(deleteData []*DeleteData, ts uint64) {
 
     offlineSegments := typeutil.NewConcurrentSet[int64]()
 
-    sealed, growing, version := sd.distribution.GetSegments(false)
+    sealed, growing, version := sd.distribution.PinOnlineSegments()
 
     eg, ctx := errgroup.WithContext(context.Background())
     for _, entry := range sealed {
@@ -225,7 +225,7 @@ func (sd *shardDelegator) ProcessDelete(deleteData []*DeleteData, ts uint64) {
     // not error return in apply delete
     _ = eg.Wait()
 
-    sd.distribution.FinishUsage(version)
+    sd.distribution.Unpin(version)
     offlineSegIDs := offlineSegments.Collect()
     if len(offlineSegIDs) > 0 {
         log.Warn("failed to apply delete, mark segment offline", zap.Int64s("offlineSegments", offlineSegIDs))
diff --git a/internal/querynodev2/delegator/delegator_test.go b/internal/querynodev2/delegator/delegator_test.go
index ffe82e77c5..bedc51e44d 100644
--- a/internal/querynodev2/delegator/delegator_test.go
+++ b/internal/querynodev2/delegator/delegator_test.go
@@ -434,6 +434,24 @@ func (s *DelegatorSuite) TestSearch() {
         s.Error(err)
     })
 
+    s.Run("distribution_not_serviceable", func() {
+        ctx, cancel := context.WithCancel(context.Background())
+        defer cancel()
+
+        sd, ok := s.delegator.(*shardDelegator)
+        s.Require().True(ok)
+        sd.distribution.AddOfflines(1001)
+
+        _, err := s.delegator.Search(ctx, &querypb.SearchRequest{
+            Req: &internalpb.SearchRequest{
+                Base: commonpbutil.NewMsgBase(),
+            },
+            DmlChannels: []string{s.vchannelName},
+        })
+
+        s.Error(err)
+    })
+
     s.Run("cluster_not_serviceable", func() {
         s.delegator.Close()
@@ -603,6 +621,22 @@ func (s *DelegatorSuite) TestQuery() {
         s.Error(err)
     })
 
+    s.Run("distribution_not_serviceable", func() {
+        ctx, cancel := context.WithCancel(context.Background())
+        defer cancel()
+
+        sd, ok := s.delegator.(*shardDelegator)
+        s.Require().True(ok)
+        sd.distribution.AddOfflines(1001)
+
+        _, err := s.delegator.Query(ctx, &querypb.QueryRequest{
+            Req:         &internalpb.RetrieveRequest{Base: commonpbutil.NewMsgBase()},
+            DmlChannels: []string{s.vchannelName},
+        })
+
+        s.Error(err)
+    })
+
     s.Run("cluster_not_serviceable", func() {
         s.delegator.Close()
@@ -865,6 +899,25 @@ func (s *DelegatorSuite) TestQueryStream() {
         s.Error(err)
     })
 
+    s.Run("distribution_not_serviceable", func() {
+        ctx, cancel := context.WithCancel(context.Background())
+        defer cancel()
+
+        sd, ok := s.delegator.(*shardDelegator)
+        s.Require().True(ok)
+        sd.distribution.AddOfflines(1001)
+
+        client := streamrpc.NewLocalQueryClient(ctx)
+        server := client.CreateServer()
+
+        // run stream function
+        err := s.delegator.QueryStream(ctx, &querypb.QueryRequest{
+            Req:         &internalpb.RetrieveRequest{Base: commonpbutil.NewMsgBase()},
+            DmlChannels: []string{s.vchannelName},
+        }, server)
+        s.Error(err)
+    })
+
     s.Run("cluster_not_serviceable", func() {
         s.delegator.Close()
@@ -1023,6 +1076,22 @@ func (s *DelegatorSuite) TestGetStats() {
         s.Error(err)
     })
 
+    s.Run("distribution_not_serviceable", func() {
+        ctx, cancel := context.WithCancel(context.Background())
+        defer cancel()
+
+        sd, ok := s.delegator.(*shardDelegator)
+        s.Require().True(ok)
+        sd.distribution.AddOfflines(1001)
+
+        _, err := s.delegator.GetStatistics(ctx, &querypb.GetStatisticsRequest{
+            Req:         &internalpb.GetStatisticsRequest{Base: commonpbutil.NewMsgBase()},
+            DmlChannels: []string{s.vchannelName},
+        })
+
+        s.Error(err)
+    })
+
     s.Run("cluster_not_serviceable", func() {
         s.delegator.Close()
diff --git a/internal/querynodev2/delegator/distribution.go b/internal/querynodev2/delegator/distribution.go
index 795ee8b39b..5e37c047f0 100644
--- a/internal/querynodev2/delegator/distribution.go
+++ b/internal/querynodev2/delegator/distribution.go
@@ -24,6 +24,7 @@ import (
     "go.uber.org/zap"
 
     "github.com/milvus-io/milvus/pkg/log"
+    "github.com/milvus-io/milvus/pkg/util/merr"
     "github.com/milvus-io/milvus/pkg/util/typeutil"
 )
 
@@ -101,8 +102,26 @@ func NewDistribution() *distribution {
     return dist
 }
 
-// GetAllSegments returns segments in current snapshot, filter readable segment when readable is true
-func (d *distribution) GetSegments(readable bool, partitions ...int64) (sealed []SnapshotItem, growing []SegmentEntry, version int64) {
+func (d *distribution) PinReadableSegments(partitions ...int64) (sealed []SnapshotItem, growing []SegmentEntry, version int64, err error) {
+    d.mut.RLock()
+    defer d.mut.RUnlock()
+
+    if !d.Serviceable() {
+        return nil, nil, -1, merr.WrapErrServiceInternal("channel distribution is not serviceable")
+    }
+
+    current := d.current.Load()
+    sealed, growing = current.Get(partitions...)
+    version = current.version
+    targetVersion := current.GetTargetVersion()
+    filterReadable := func(entry SegmentEntry, _ int) bool {
+        return entry.TargetVersion == targetVersion || entry.TargetVersion == initialTargetVersion
+    }
+    sealed, growing = d.filterSegments(sealed, growing, filterReadable)
+    return
+}
+
+func (d *distribution) PinOnlineSegments(partitions ...int64) (sealed []SnapshotItem, growing []SegmentEntry, version int64) {
     d.mut.RLock()
     defer d.mut.RUnlock()
 
@@ -110,25 +129,20 @@
     current := d.current.Load()
     sealed, growing = current.Get(partitions...)
     version = current.version
 
-    if readable {
-        TargetVersion := current.GetTargetVersion()
-        sealed, growing = d.filterReadableSegments(sealed, growing, TargetVersion)
-        return
+    filterOnline := func(entry SegmentEntry, _ int) bool {
+        return !d.offlines.Contain(entry.SegmentID)
     }
+    sealed, growing = d.filterSegments(sealed, growing, filterOnline)
     return
 }
 
-func (d *distribution) filterReadableSegments(sealed []SnapshotItem, growing []SegmentEntry, targetVersion int64) ([]SnapshotItem, []SegmentEntry) {
-    filterReadable := func(entry SegmentEntry, _ int) bool {
-        return entry.TargetVersion == targetVersion || entry.TargetVersion == initialTargetVersion
-    }
-
-    growing = lo.Filter(growing, filterReadable)
+func (d *distribution) filterSegments(sealed []SnapshotItem, growing []SegmentEntry, filter func(SegmentEntry, int) bool) ([]SnapshotItem, []SegmentEntry) {
+    growing = lo.Filter(growing, filter)
     sealed = lo.Map(sealed, func(item SnapshotItem, _ int) SnapshotItem {
         return SnapshotItem{
             NodeID:   item.NodeID,
-            Segments: lo.Filter(item.Segments, filterReadable),
+            Segments: lo.Filter(item.Segments, filter),
         }
     })
@@ -142,16 +156,19 @@ func (d *distribution) PeekSegments(readable bool, partitions ...int64) (sealed
     sealed, growing = current.Peek(partitions...)
 
     if readable {
-        TargetVersion := current.GetTargetVersion()
-        sealed, growing = d.filterReadableSegments(sealed, growing, TargetVersion)
+        targetVersion := current.GetTargetVersion()
+        filterReadable := func(entry SegmentEntry, _ int) bool {
+            return entry.TargetVersion == targetVersion || entry.TargetVersion == initialTargetVersion
+        }
+        sealed, growing = d.filterSegments(sealed, growing, filterReadable)
         return
     }
 
     return
 }
 
-// FinishUsage notifies snapshot one reference is released.
-func (d *distribution) FinishUsage(version int64) {
+// Unpin notifies the snapshot that one reference has been released.
+func (d *distribution) Unpin(version int64) {
     snapshot, ok := d.snapshots.Get(version)
     if ok {
         snapshot.Done(d.getCleanup(snapshot.version))
diff --git a/internal/querynodev2/delegator/distribution_test.go b/internal/querynodev2/delegator/distribution_test.go
index d45f831965..a8d632d450 100644
--- a/internal/querynodev2/delegator/distribution_test.go
+++ b/internal/querynodev2/delegator/distribution_test.go
@@ -177,11 +177,12 @@ func (s *DistributionSuite) TestAddDistribution() {
             s.SetupTest()
             defer s.TearDownTest()
             s.dist.AddGrowing(tc.growing...)
-            _, _, version := s.dist.GetSegments(false)
+            _, _, version, err := s.dist.PinReadableSegments()
+            s.Require().NoError(err)
             s.dist.AddDistributions(tc.input...)
             sealed, _ := s.dist.PeekSegments(false)
             s.compareSnapshotItems(tc.expected, sealed)
-            s.dist.FinishUsage(version)
+            s.dist.Unpin(version)
         })
     }
 }
@@ -246,8 +247,9 @@ func (s *DistributionSuite) TestAddGrowing() {
             defer s.TearDownTest()
 
             s.dist.AddGrowing(tc.input...)
-            _, growing, version := s.dist.GetSegments(false)
-            defer s.dist.FinishUsage(version)
+            _, growing, version, err := s.dist.PinReadableSegments()
+            s.Require().NoError(err)
+            defer s.dist.Unpin(version)
 
             s.ElementsMatch(tc.expected, growing)
         })
@@ -433,7 +435,9 @@ func (s *DistributionSuite) TestRemoveDistribution() {
 
             var version int64
             if tc.withMockRead {
-                _, _, version = s.dist.GetSegments(false)
+                var err error
+                _, _, version, err = s.dist.PinReadableSegments()
+                s.Require().NoError(err)
             }
 
             ch := s.dist.RemoveDistributions(tc.removalSealed, tc.removalGrowing)
@@ -446,7 +450,7 @@ func (s *DistributionSuite) TestRemoveDistribution() {
                 default:
                 }
 
-                s.dist.FinishUsage(version)
+                s.dist.Unpin(version)
             }
             // check ch close very soon
             timeout := time.NewTimer(time.Second)
@@ -457,8 +461,8 @@ func (s *DistributionSuite) TestRemoveDistribution() {
             case <-ch:
             }
 
-            sealed, growing, version := s.dist.GetSegments(false)
-            defer s.dist.FinishUsage(version)
+            sealed, growing, version := s.dist.PinOnlineSegments()
+            defer s.dist.Unpin(version)
             s.compareSnapshotItems(tc.expectSealed, sealed)
             s.ElementsMatch(tc.expectGrowing, growing)
         })
@@ -468,13 +472,15 @@ func (s *DistributionSuite) TestPeek() {
     type testCase struct {
         tag      string
+        readable bool
         input    []SegmentEntry
         expected []SnapshotItem
     }
 
     cases := []testCase{
         {
-            tag: "one node",
+            tag:      "one_node",
+            readable: false,
             input: []SegmentEntry{
                 {
                     NodeID:    1,
@@ -504,7 +510,8 @@ func (s *DistributionSuite) TestPeek() {
             },
         },
         {
-            tag: "multiple nodes",
+            tag:      "multiple_nodes",
+            readable: false,
             input: []SegmentEntry{
                 {
                     NodeID:    1,
@@ -548,6 +555,34 @@ func (s *DistributionSuite) TestPeek() {
                 },
             },
         },
+        {
+            tag:      "peek_readable",
+            readable: true,
+            input: []SegmentEntry{
+                {
+                    NodeID:    1,
+                    SegmentID: 1,
+                },
+                {
+                    NodeID:    2,
+                    SegmentID: 2,
+                },
+                {
+                    NodeID:    1,
+                    SegmentID: 3,
+                },
+            },
+            expected: []SnapshotItem{
+                {
+                    NodeID:   1,
+                    Segments: []SegmentEntry{},
+                },
+                {
+                    NodeID:   2,
+                    Segments: []SegmentEntry{},
+                },
+            },
+        },
     }
 
     for _, tc := range cases {
@@ -558,7 +593,7 @@
             // peek during lock
             s.dist.AddDistributions(tc.input...)
             s.dist.mut.Lock()
-            sealed, _ := s.dist.PeekSegments(false)
+            sealed, _ := s.dist.PeekSegments(tc.readable)
             s.compareSnapshotItems(tc.expected, sealed)
             s.dist.mut.Unlock()
         })
     }
 }
@@ -668,11 +703,12 @@ func (s *DistributionSuite) Test_SyncTargetVersion() {
     s.dist.AddDistributions(sealed...)
     s.dist.SyncTargetVersion(2, []int64{2, 3}, []int64{6}, []int64{})
 
-    s1, s2, _ := s.dist.GetSegments(true)
+    s1, s2, _, err := s.dist.PinReadableSegments()
+    s.Require().NoError(err)
     s.Len(s1[0].Segments, 1)
     s.Len(s2, 2)
 
-    s1, s2, _ = s.dist.GetSegments(false)
+    s1, s2, _ = s.dist.PinOnlineSegments()
     s.Len(s1[0].Segments, 3)
     s.Len(s2, 3)
@@ -684,8 +720,8 @@
     s.False(s.dist.Serviceable())
 
     s.dist.SyncTargetVersion(2, []int64{}, []int64{333}, []int64{1, 2, 3})
-    _, segments, _ := s.dist.GetSegments(true)
-    s.Len(segments, 0)
+    _, _, _, err = s.dist.PinReadableSegments()
+    s.Error(err)
 }
 
 func TestDistributionSuite(t *testing.T) {
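
Note: every read path touched above (Search, Query, QueryStream, GetStatistics) now follows the same pin / check error / defer-unpin shape instead of the old GetSegments(true)/FinishUsage pair. The sketch below is a minimal, self-contained illustration of that calling convention only; the distribution type, its fields, and the error value here are hypothetical stand-ins for the real lock-protected implementation in distribution.go, and locking is deliberately omitted.

package main

import (
	"errors"
	"fmt"
)

// errNotServiceable stands in for merr.WrapErrServiceInternal in the patch.
var errNotServiceable = errors.New("channel distribution is not serviceable")

// distribution is a toy, single-threaded stand-in; the real type guards its
// state with an RWMutex and tracks per-version reference counts.
type distribution struct {
	serviceable bool
	segments    []int64
	version     int64
	pinned      map[int64]int // version -> active reader count
}

// PinReadableSegments mirrors the new API shape: fail fast when the
// distribution is unserviceable, otherwise pin the current version.
func (d *distribution) PinReadableSegments() ([]int64, int64, error) {
	if !d.serviceable {
		return nil, -1, errNotServiceable
	}
	d.pinned[d.version]++
	return d.segments, d.version, nil
}

// Unpin releases the reference taken by PinReadableSegments.
func (d *distribution) Unpin(version int64) {
	d.pinned[version]--
}

// search shows the caller-side pattern the patch applies to Search/Query/
// QueryStream/GetStatistics: pin, bail out on error, defer the unpin.
func search(d *distribution) error {
	segments, version, err := d.PinReadableSegments()
	if err != nil {
		return fmt.Errorf("channel not available: %w", err)
	}
	defer d.Unpin(version)

	fmt.Println("searching segments:", segments)
	return nil
}

func main() {
	d := &distribution{serviceable: true, segments: []int64{1, 2, 3}, pinned: map[int64]int{}}
	fmt.Println("serviceable:", search(d))

	d.serviceable = false
	fmt.Println("unserviceable:", search(d))
}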
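Note: the reason Unpin (formerly FinishUsage) must be paired with every pin is visible in TestRemoveDistribution above: the channel returned by RemoveDistributions may only close after the last reader of the removed snapshot has released it. The sketch below illustrates that version-pinned, reference-counted snapshot idea under stated assumptions; the snapshot/dist types and the Update method are simplified guesses at the general technique, not the actual distribution internals.

package main

import (
	"fmt"
	"sync"
)

// snapshot is a hypothetical versioned view: refs counts active readers and
// done is closed only once the snapshot is stale and has no readers left.
type snapshot struct {
	refs  int
	stale bool
	done  chan struct{}
}

type dist struct {
	mu        sync.Mutex
	version   int64
	snapshots map[int64]*snapshot
}

func newDist() *dist {
	return &dist{snapshots: map[int64]*snapshot{0: {done: make(chan struct{})}}}
}

// Pin registers a reader on the current snapshot and returns its version.
func (d *dist) Pin() int64 {
	d.mu.Lock()
	defer d.mu.Unlock()
	d.snapshots[d.version].refs++
	return d.version
}

// Unpin releases one reader; a stale snapshot with no readers is cleaned up.
func (d *dist) Unpin(version int64) {
	d.mu.Lock()
	defer d.mu.Unlock()
	s := d.snapshots[version]
	if s == nil {
		return // version already cleaned up
	}
	s.refs--
	if s.stale && s.refs == 0 {
		close(s.done) // cleanup fires only after the last reader leaves
		delete(d.snapshots, version)
	}
}

// Update swaps in a new version and returns a channel closed once every
// reader of the previous version has unpinned (compare the channel returned
// by RemoveDistributions in the patch).
func (d *dist) Update() <-chan struct{} {
	d.mu.Lock()
	defer d.mu.Unlock()
	old := d.snapshots[d.version]
	old.stale = true
	if old.refs == 0 {
		close(old.done)
		delete(d.snapshots, d.version)
	}
	d.version++
	d.snapshots[d.version] = &snapshot{done: make(chan struct{})}
	return old.done
}

func main() {
	d := newDist()
	v := d.Pin()     // a reader pins version 0
	ch := d.Update() // version 0 becomes stale, but a reader still holds it
	select {
	case <-ch:
		fmt.Println("released too early")
	default:
		fmt.Println("still pinned") // expected
	}
	d.Unpin(v) // last reader leaves; the old snapshot is released
	<-ch
	fmt.Println("old snapshot released")
}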