From 2e434e44536d7b6ebe15b6b4d85b82b2d284f4a9 Mon Sep 17 00:00:00 2001
From: "cai.zhang"
Date: Wed, 3 Jul 2024 17:52:09 +0800
Subject: [PATCH] fix: Only load or release Flushed segment in datanode meta
 (#34390)

issue: #34376 , #34379 , #34375

---------

Signed-off-by: Cai Zhang
---
 internal/datanode/services.go      | 45 ++++++++++++---------
 internal/datanode/services_test.go | 64 ++++++++++++++++++++++++++++++
 2 files changed, 91 insertions(+), 18 deletions(-)

diff --git a/internal/datanode/services.go b/internal/datanode/services.go
index f2046ff27c..ee235f2de2 100644
--- a/internal/datanode/services.go
+++ b/internal/datanode/services.go
@@ -311,26 +311,35 @@ func (node *DataNode) SyncSegments(ctx context.Context, req *datapb.SyncSegments
 	newSegments := make([]*datapb.SyncSegmentInfo, 0, len(missingSegments))
 	futures := make([]*conc.Future[any], 0, len(missingSegments))
 	for _, segID := range missingSegments {
-		segID := segID
 		newSeg := req.GetSegmentInfos()[segID]
-		newSegments = append(newSegments, newSeg)
-		future := io.GetOrCreateStatsPool().Submit(func() (any, error) {
-			var val *metacache.BloomFilterSet
-			var err error
-			err = binlog.DecompressBinLog(storage.StatsBinlog, req.GetCollectionId(), req.GetPartitionId(), newSeg.GetSegmentId(), []*datapb.FieldBinlog{newSeg.GetPkStatsLog()})
-			if err != nil {
-				log.Warn("failed to DecompressBinLog", zap.Error(err))
-				return val, err
+		switch newSeg.GetLevel() {
+		case datapb.SegmentLevel_L0:
+			log.Warn("segment level is L0, may be the channel has not been successfully watched yet", zap.Int64("segmentID", segID))
+		case datapb.SegmentLevel_Legacy:
+			log.Warn("segment level is legacy, please check", zap.Int64("segmentID", segID))
+		default:
+			if newSeg.GetState() == commonpb.SegmentState_Flushed {
+				log.Info("segment loading PKs", zap.Int64("segmentID", segID))
+				newSegments = append(newSegments, newSeg)
+				future := io.GetOrCreateStatsPool().Submit(func() (any, error) {
+					var val *metacache.BloomFilterSet
+					var err error
+					err = binlog.DecompressBinLog(storage.StatsBinlog, req.GetCollectionId(), req.GetPartitionId(), newSeg.GetSegmentId(), []*datapb.FieldBinlog{newSeg.GetPkStatsLog()})
+					if err != nil {
+						log.Warn("failed to DecompressBinLog", zap.Error(err))
+						return val, err
+					}
+					pks, err := compaction.LoadStats(ctx, node.chunkManager, ds.GetMetaCache().Schema(), newSeg.GetSegmentId(), []*datapb.FieldBinlog{newSeg.GetPkStatsLog()})
+					if err != nil {
+						log.Warn("failed to load segment stats log", zap.Error(err))
+						return val, err
+					}
+					val = metacache.NewBloomFilterSet(pks...)
+					return val, nil
+				})
+				futures = append(futures, future)
 			}
-			pks, err := compaction.LoadStats(ctx, node.chunkManager, ds.GetMetaCache().Schema(), newSeg.GetSegmentId(), []*datapb.FieldBinlog{newSeg.GetPkStatsLog()})
-			if err != nil {
-				log.Warn("failed to load segment stats log", zap.Error(err))
-				return val, err
-			}
-			val = metacache.NewBloomFilterSet(pks...)
-			return val, nil
-		})
-		futures = append(futures, future)
+		}
 	}
 
 	err := conc.AwaitAll(futures...)
diff --git a/internal/datanode/services_test.go b/internal/datanode/services_test.go
index 9bb70dff49..05ee57b474 100644
--- a/internal/datanode/services_test.go
+++ b/internal/datanode/services_test.go
@@ -1027,6 +1027,70 @@ func (s *DataNodeServicesSuite) TestSyncSegments() {
 		s.True(exist)
 		s.NotNil(info)
 	})
+
+	s.Run("dc growing/flushing dn dropped", func() {
+		s.SetupTest()
+		cache := metacache.NewMetaCache(&datapb.ChannelWatchInfo{
+			Schema: &schemapb.CollectionSchema{
+				Fields: []*schemapb.FieldSchema{
+					{
+						FieldID:      100,
+						Name:         "pk",
+						IsPrimaryKey: true,
+						Description:  "",
+						DataType:     schemapb.DataType_Int64,
+					},
+				},
+			},
+			Vchan: &datapb.VchannelInfo{},
+		}, func(*datapb.SegmentInfo) *metacache.BloomFilterSet {
+			return metacache.NewBloomFilterSet()
+		})
+		mockFlowgraphManager := pipeline.NewMockFlowgraphManager(s.T())
+		mockFlowgraphManager.EXPECT().GetFlowgraphService(mock.Anything).
+			Return(pipeline.NewDataSyncServiceWithMetaCache(cache), true)
+		s.node.flowgraphManager = mockFlowgraphManager
+		ctx := context.Background()
+		req := &datapb.SyncSegmentsRequest{
+			ChannelName:  "channel1",
+			PartitionId:  2,
+			CollectionId: 1,
+			SegmentInfos: map[int64]*datapb.SyncSegmentInfo{
+				100: {
+					SegmentId: 100,
+					PkStatsLog: &datapb.FieldBinlog{
+						FieldID: 100,
+						Binlogs: nil,
+					},
+					State:     commonpb.SegmentState_Growing,
+					Level:     datapb.SegmentLevel_L1,
+					NumOfRows: 1024,
+				},
+				101: {
+					SegmentId: 101,
+					PkStatsLog: &datapb.FieldBinlog{
+						FieldID: 100,
+						Binlogs: nil,
+					},
+					State:     commonpb.SegmentState_Flushing,
+					Level:     datapb.SegmentLevel_L1,
+					NumOfRows: 1024,
+				},
+			},
+		}
+
+		status, err := s.node.SyncSegments(ctx, req)
+		s.NoError(err)
+		s.True(merr.Ok(status))
+
+		info, exist := cache.GetSegmentByID(100)
+		s.False(exist)
+		s.Nil(info)
+
+		info, exist = cache.GetSegmentByID(101)
+		s.False(exist)
+		s.Nil(info)
+	})
 }
 
 func (s *DataNodeServicesSuite) TestDropCompactionPlan() {
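Note: the behavioral core of this patch is the new switch in SyncSegments.
The sketch below isolates that gating rule as a minimal, self-contained Go
program for illustration only. The helper shouldLoadPkStats and the stand-in
SegmentLevel/SegmentState types are hypothetical; the real code switches on
datapb.SegmentLevel and checks commonpb.SegmentState inline, as in the diff.

	package main

	import "fmt"

	// Stand-ins for datapb.SegmentLevel and commonpb.SegmentState (assumed shape).
	type SegmentLevel string
	type SegmentState string

	const (
		LevelLegacy SegmentLevel = "Legacy"
		LevelL0     SegmentLevel = "L0"
		LevelL1     SegmentLevel = "L1"
	)

	const (
		StateGrowing  SegmentState = "Growing"
		StateFlushing SegmentState = "Flushing"
		StateFlushed  SegmentState = "Flushed"
	)

	// shouldLoadPkStats mirrors the gate added in services.go: a segment that
	// DataCoord reports but the datanode meta is missing only gets its PK stats
	// (bloom filters) loaded when it is a non-L0, non-Legacy segment that has
	// already reached the Flushed state. L0 and Legacy levels are skipped (the
	// patch logs a warning for them), and Growing/Flushing segments are left
	// alone because they are still owned by the channel's flowgraph.
	func shouldLoadPkStats(level SegmentLevel, state SegmentState) bool {
		switch level {
		case LevelL0, LevelLegacy:
			return false
		default:
			return state == StateFlushed
		}
	}

	func main() {
		fmt.Println(shouldLoadPkStats(LevelL1, StateFlushed)) // true: PKs are loaded
		fmt.Println(shouldLoadPkStats(LevelL1, StateGrowing)) // false: skipped
		fmt.Println(shouldLoadPkStats(LevelL0, StateFlushed)) // false: skipped
	}

This is exactly what the new test case exercises: segments 100 (Growing) and
101 (Flushing) arrive in the SyncSegmentsRequest but must not be inserted into
the datanode metacache, so GetSegmentByID reports both as absent.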