diff --git a/internal/datacoord/channel_manager.go b/internal/datacoord/channel_manager.go
index 6116579d9c..06440d7fc3 100644
--- a/internal/datacoord/channel_manager.go
+++ b/internal/datacoord/channel_manager.go
@@ -317,3 +317,33 @@ func (c *ChannelManager) FindWatcher(channel string) (int64, error) {
 	}
 	return 0, errChannelNotWatched
 }
+
+// RemoveChannel removes the channel from channel manager
+func (c *ChannelManager) RemoveChannel(channelName string) error {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+
+	nodeID, ch := c.findChannel(channelName)
+	if ch == nil {
+		return nil
+	}
+
+	var op ChannelOpSet
+	op.Delete(nodeID, []*channel{ch})
+	if err := c.store.Update(op); err != nil {
+		return err
+	}
+	return nil
+}
+
+func (c *ChannelManager) findChannel(channelName string) (int64, *channel) {
+	infos := c.store.GetNodesChannels()
+	for _, info := range infos {
+		for _, channelInfo := range info.Channels {
+			if channelInfo.Name == channelName {
+				return info.NodeID, channelInfo
+			}
+		}
+	}
+	return 0, nil
+}
diff --git a/internal/datacoord/channel_manager_test.go b/internal/datacoord/channel_manager_test.go
index 316eba9dc0..e882d2e801 100644
--- a/internal/datacoord/channel_manager_test.go
+++ b/internal/datacoord/channel_manager_test.go
@@ -45,3 +45,50 @@ func TestReload(t *testing.T) {
 		assert.True(t, cm2.Match(3, "channel2"))
 	})
 }
+
+func TestChannelManager_RemoveChannel(t *testing.T) {
+	type fields struct {
+		store RWChannelStore
+	}
+	type args struct {
+		channelName string
+	}
+	tests := []struct {
+		name    string
+		fields  fields
+		args    args
+		wantErr bool
+	}{
+		{
+			"test remove existed channel",
+			fields{
+				store: &ChannelStore{
+					store: memkv.NewMemoryKV(),
+					channelsInfo: map[int64]*NodeChannelInfo{
+						1: {
+							NodeID: 1,
+							Channels: []*channel{
+								{"ch1", 1},
+							},
+						},
+					},
+				},
+			},
+			args{
+				"ch1",
+			},
+			false,
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			c := &ChannelManager{
+				store: tt.fields.store,
+			}
+			err := c.RemoveChannel(tt.args.channelName)
+			assert.Equal(t, tt.wantErr, err != nil)
+			_, ch := c.findChannel(tt.args.channelName)
+			assert.Nil(t, ch)
+		})
+	}
+}
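The table above covers only removal of a registered channel. A second case worth noting (a sketch under the same fixture assumptions, not part of this patch): calling RemoveChannel on a channel that was never registered is a no-op, because findChannel returns nil and the method returns early without touching the store.

	t.Run("test remove non-existed channel", func(t *testing.T) {
		c := &ChannelManager{store: &ChannelStore{
			store:        memkv.NewMemoryKV(),
			channelsInfo: map[int64]*NodeChannelInfo{},
		}}
		// No store update should happen; the early return yields nil.
		assert.Nil(t, c.RemoveChannel("ch_not_registered"))
	})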
diff --git a/internal/datacoord/compaction_trigger.go b/internal/datacoord/compaction_trigger.go
index 82da1fbce0..bdd291736a 100644
--- a/internal/datacoord/compaction_trigger.go
+++ b/internal/datacoord/compaction_trigger.go
@@ -294,6 +294,7 @@ func (t *compactionTrigger) globalMergeCompaction(signal *compactionSignal, isFo
 	m := t.meta.GetSegmentsChanPart(func(segment *SegmentInfo) bool {
 		_, has := colls[segment.GetCollectionID()]
 		return (has || len(collections) == 0) && // if filters collection
+			isSegmentHealthy(segment) &&
 			segment.State == commonpb.SegmentState_Flushed && // flushed only
 			!segment.isCompacting // not compacting now
 	}) // m is list of chanPartSegments, which is channel-partition organized segments
diff --git a/internal/datacoord/meta.go b/internal/datacoord/meta.go
index b45f12fc08..8bd93b61e5 100644
--- a/internal/datacoord/meta.go
+++ b/internal/datacoord/meta.go
@@ -73,9 +73,6 @@ func (m *meta) reloadFromKV() error {
 		if err != nil {
 			return fmt.Errorf("DataCoord reloadFromKV UnMarshal datapb.SegmentInfo err:%w", err)
 		}
-		if segmentInfo.State == commonpb.SegmentState_NotExist {
-			continue
-		}
 		m.segments.SetSegment(segmentInfo.GetID(), NewSegmentInfo(segmentInfo))
 	}
 
@@ -145,7 +142,7 @@ func (m *meta) GetNumRowsOfCollection(collectionID UniqueID) int64 {
 	var ret int64 = 0
 	segments := m.segments.GetSegments()
 	for _, segment := range segments {
-		if segment.GetCollectionID() == collectionID {
+		if isSegmentHealthy(segment) && segment.GetCollectionID() == collectionID {
 			ret += segment.GetNumOfRows()
 		}
 	}
@@ -163,6 +160,7 @@ func (m *meta) AddSegment(segment *SegmentInfo) error {
 	return nil
 }
 
+// Deprecated
 // DropSegment remove segment with provided id, etcd persistence also removed
 func (m *meta) DropSegment(segmentID UniqueID) error {
 	m.Lock()
@@ -183,7 +181,11 @@ func (m *meta) DropSegment(segmentID UniqueID) error {
 func (m *meta) GetSegment(segID UniqueID) *SegmentInfo {
 	m.RLock()
 	defer m.RUnlock()
-	return m.segments.GetSegment(segID)
+	segment := m.segments.GetSegment(segID)
+	if segment != nil && isSegmentHealthy(segment) {
+		return segment
+	}
+	return nil
 }
 
 // SetState setting segment with provided ID state
@@ -191,7 +193,7 @@ func (m *meta) SetState(segmentID UniqueID, state commonpb.SegmentState) error {
 	m.Lock()
 	defer m.Unlock()
 	m.segments.SetState(segmentID, state)
-	if segInfo := m.segments.GetSegment(segmentID); segInfo != nil {
+	if segInfo := m.segments.GetSegment(segmentID); segInfo != nil && isSegmentHealthy(segInfo) {
 		return m.saveSegmentInfo(segInfo)
 	}
 	return nil
@@ -200,14 +202,20 @@ func (m *meta) SetState(segmentID UniqueID, state commonpb.SegmentState) error {
 // UpdateFlushSegmentsInfo update segment partial/completed flush info
 // `flushed` parameter indicating whether segment is flushed completely or partially
 // `binlogs`, `checkpoints` and `statPositions` are persistence data for segment
-func (m *meta) UpdateFlushSegmentsInfo(segmentID UniqueID, flushed bool,
-	binlogs, statslogs []*datapb.FieldBinlog, deltalogs []*datapb.DeltaLogInfo, checkpoints []*datapb.CheckPoint,
-	startPositions []*datapb.SegmentStartPosition) error {
+func (m *meta) UpdateFlushSegmentsInfo(
+	segmentID UniqueID,
+	flushed bool,
+	dropped bool,
+	binlogs, statslogs []*datapb.FieldBinlog,
+	deltalogs []*datapb.DeltaLogInfo,
+	checkpoints []*datapb.CheckPoint,
+	startPositions []*datapb.SegmentStartPosition,
+) error {
 	m.Lock()
 	defer m.Unlock()
 
 	segment := m.segments.GetSegment(segmentID)
-	if segment == nil {
+	if segment == nil || !isSegmentHealthy(segment) {
 		return nil
 	}
 
@@ -221,6 +229,11 @@ func (m *meta) UpdateFlushSegmentsInfo(segmentID UniqueID, flushed bool,
 		modSegments[segmentID] = clonedSegment
 	}
 
+	if dropped {
+		clonedSegment.State = commonpb.SegmentState_Dropped
+		modSegments[segmentID] = clonedSegment
+	}
+
 	currBinlogs := clonedSegment.GetBinlogs()
 
 	var getFieldBinlogs = func(id UniqueID, binlogs []*datapb.FieldBinlog) *datapb.FieldBinlog {
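The widened signature threads the new dropped flag through every call site; the test updates later in this patch show the mechanical change. For orientation, a hedged sketch of a call that persists final binlogs while marking the segment Dropped in the same update (IDs and paths are illustrative):

	// Parameter order after this patch: segmentID, flushed, dropped,
	// binlogs, statslogs, deltalogs, checkpoints, startPositions.
	err := meta.UpdateFlushSegmentsInfo(1, false, true,
		[]*datapb.FieldBinlog{{FieldID: 1, Binlogs: []string{"binlog1"}}},
		nil, nil, nil, nil)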
@@ -261,7 +274,7 @@ func (m *meta) UpdateFlushSegmentsInfo(segmentID UniqueID, flushed bool,
 		if s, ok := modSegments[segmentID]; ok {
 			return s
 		}
-		if s := m.segments.GetSegment(segmentID); s != nil {
+		if s := m.segments.GetSegment(segmentID); s != nil && isSegmentHealthy(s) {
 			return s.Clone()
 		}
 		return nil
@@ -320,20 +333,6 @@ func (m *meta) UpdateFlushSegmentsInfo(segmentID UniqueID, flushed bool,
 	return nil
 }
 
-// ListSegmentIDs list all segment ids stored in meta (no collection filter)
-func (m *meta) ListSegmentIDs() []UniqueID {
-	m.RLock()
-	defer m.RUnlock()
-
-	infos := make([]UniqueID, 0)
-	segments := m.segments.GetSegments()
-	for _, segment := range segments {
-		infos = append(infos, segment.GetID())
-	}
-
-	return infos
-}
-
 // ListSegmentFiles lists all segment related file paths in valid & dropped list
 func (m *meta) ListSegmentFiles() ([]string, []string) {
 	m.RLock()
@@ -378,7 +377,7 @@ func (m *meta) GetSegmentsByChannel(dmlCh string) []*SegmentInfo {
 	infos := make([]*SegmentInfo, 0)
 	segments := m.segments.GetSegments()
 	for _, segment := range segments {
-		if segment.InsertChannel != dmlCh {
+		if !isSegmentHealthy(segment) || segment.InsertChannel != dmlCh {
 			continue
 		}
 		infos = append(infos, segment)
@@ -394,7 +393,7 @@ func (m *meta) GetSegmentsOfCollection(collectionID UniqueID) []*SegmentInfo {
 	ret := make([]*SegmentInfo, 0)
 	segments := m.segments.GetSegments()
 	for _, segment := range segments {
-		if segment.GetCollectionID() == collectionID {
+		if isSegmentHealthy(segment) && segment.GetCollectionID() == collectionID {
 			ret = append(ret, segment)
 		}
 	}
@@ -407,9 +406,9 @@ func (m *meta) GetSegmentsIDOfCollection(collectionID UniqueID) []UniqueID {
 	defer m.RUnlock()
 	ret := make([]UniqueID, 0)
 	segments := m.segments.GetSegments()
-	for _, info := range segments {
-		if info.CollectionID == collectionID {
-			ret = append(ret, info.ID)
+	for _, segment := range segments {
+		if isSegmentHealthy(segment) && segment.CollectionID == collectionID {
+			ret = append(ret, segment.ID)
 		}
 	}
 	return ret
@@ -421,9 +420,9 @@ func (m *meta) GetSegmentsIDOfPartition(collectionID, partitionID UniqueID) []Un
 	defer m.RUnlock()
 	ret := make([]UniqueID, 0)
 	segments := m.segments.GetSegments()
-	for _, info := range segments {
-		if info.CollectionID == collectionID && info.PartitionID == partitionID {
-			ret = append(ret, info.ID)
+	for _, segment := range segments {
+		if isSegmentHealthy(segment) && segment.CollectionID == collectionID && segment.PartitionID == partitionID {
+			ret = append(ret, segment.ID)
 		}
 	}
 	return ret
@@ -435,9 +434,9 @@ func (m *meta) GetNumRowsOfPartition(collectionID UniqueID, partitionID UniqueID
 	defer m.RUnlock()
 	var ret int64 = 0
 	segments := m.segments.GetSegments()
-	for _, info := range segments {
-		if info.CollectionID == collectionID && info.PartitionID == partitionID {
-			ret += info.NumOfRows
+	for _, segment := range segments {
+		if isSegmentHealthy(segment) && segment.CollectionID == collectionID && segment.PartitionID == partitionID {
+			ret += segment.NumOfRows
 		}
 	}
 	return ret
@@ -449,9 +448,9 @@ func (m *meta) GetUnFlushedSegments() []*SegmentInfo {
 	defer m.RUnlock()
 	ret := make([]*SegmentInfo, 0)
 	segments := m.segments.GetSegments()
-	for _, info := range segments {
-		if info.State != commonpb.SegmentState_Flushing && info.State != commonpb.SegmentState_Flushed {
-			ret = append(ret, info)
+	for _, segment := range segments {
+		if segment.State == commonpb.SegmentState_Growing || segment.State == commonpb.SegmentState_Sealed {
+			ret = append(ret, segment)
 		}
 	}
 	return ret
@@ -536,7 +535,7 @@ func (m *meta) CompleteMergeCompaction(compactionLogs []*datapb.CompactionSegmen
 	for _, cl := range compactionLogs {
 		if segment := m.segments.GetSegment(cl.GetSegmentID()); segment != nil {
 			cloned := segment.Clone()
-			cloned.State = commonpb.SegmentState_NotExist
+			cloned.State = commonpb.SegmentState_Dropped
 			segments = append(segments, cloned)
 		}
 	}
@@ -549,12 +548,12 @@ func (m *meta) CompleteMergeCompaction(compactionLogs []*datapb.CompactionSegmen
 	}
 
 	// find new added delta logs when executing compaction
-	originDeltalogs := make([]*datapb.DeltaLogInfo, 0)
+	var originDeltalogs []*datapb.DeltaLogInfo
 	for _, s := range segments {
 		originDeltalogs = append(originDeltalogs, s.GetDeltalogs()...)
 	}
 
-	deletedDeltalogs := make([]*datapb.DeltaLogInfo, 0)
+	var deletedDeltalogs []*datapb.DeltaLogInfo
 	for _, l := range compactionLogs {
 		deletedDeltalogs = append(deletedDeltalogs, l.GetDeltalogs()...)
 	}
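A note on the last hunk: replacing make([]*datapb.DeltaLogInfo, 0) with a plain var declaration is safe because append treats a nil slice as empty, and the var form skips the allocation entirely when no deltalogs are collected. A minimal illustration (logs stands in for any []*datapb.DeltaLogInfo value):

	var deltalogs []*datapb.DeltaLogInfo   // nil slice, no allocation yet
	deltalogs = append(deltalogs, logs...) // append accepts a nil slice and
	                                       // allocates only when needed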
@@ -775,3 +774,8 @@ func buildSegment(collectionID UniqueID, partitionID UniqueID, segmentID UniqueI
 	}
 	return NewSegmentInfo(info)
 }
+
+func isSegmentHealthy(segment *SegmentInfo) bool {
+	return segment.GetState() != commonpb.SegmentState_NotExist &&
+		segment.GetState() != commonpb.SegmentState_Dropped
+}
diff --git a/internal/datacoord/meta_test.go b/internal/datacoord/meta_test.go
index 6ebdf95f01..8eff7302d2 100644
--- a/internal/datacoord/meta_test.go
+++ b/internal/datacoord/meta_test.go
@@ -241,7 +241,7 @@ func TestUpdateFlushSegmentsInfo(t *testing.T) {
 		err = meta.AddSegment(segment1)
 		assert.Nil(t, err)
 
-		err = meta.UpdateFlushSegmentsInfo(1, true, []*datapb.FieldBinlog{{FieldID: 1, Binlogs: []string{"binlog1"}}},
+		err = meta.UpdateFlushSegmentsInfo(1, true, false, []*datapb.FieldBinlog{{FieldID: 1, Binlogs: []string{"binlog1"}}},
 			[]*datapb.FieldBinlog{{FieldID: 1, Binlogs: []string{"statslog1"}}},
 			[]*datapb.DeltaLogInfo{{RecordEntries: 1, TimestampFrom: 100, TimestampTo: 200, DeltaLogSize: 1000}},
 			[]*datapb.CheckPoint{{SegmentID: 1, NumOfRows: 10}},
 			[]*datapb.SegmentStartPosition{{SegmentID: 1, StartPosition: &internalpb.MsgPosition{MsgID: []byte{1, 2, 3}}}})
@@ -262,7 +262,7 @@ func TestUpdateFlushSegmentsInfo(t *testing.T) {
 		meta, err := newMeta(memkv.NewMemoryKV())
 		assert.Nil(t, err)
 
-		err = meta.UpdateFlushSegmentsInfo(1, false, nil, nil, nil, nil, nil)
+		err = meta.UpdateFlushSegmentsInfo(1, false, false, nil, nil, nil, nil, nil)
 		assert.Nil(t, err)
 	})
 
@@ -274,7 +274,7 @@ func TestUpdateFlushSegmentsInfo(t *testing.T) {
 		err = meta.AddSegment(segment1)
 		assert.Nil(t, err)
 
-		err = meta.UpdateFlushSegmentsInfo(1, false, nil, nil, nil, []*datapb.CheckPoint{{SegmentID: 2, NumOfRows: 10}},
+		err = meta.UpdateFlushSegmentsInfo(1, false, false, nil, nil, nil, []*datapb.CheckPoint{{SegmentID: 2, NumOfRows: 10}},
 			[]*datapb.SegmentStartPosition{{SegmentID: 2, StartPosition: &internalpb.MsgPosition{MsgID: []byte{1, 2, 3}}}})
 		assert.Nil(t, err)
 
@@ -296,7 +296,7 @@ func TestUpdateFlushSegmentsInfo(t *testing.T) {
 		}
 		meta.segments.SetSegment(1, segmentInfo)
 
-		err = meta.UpdateFlushSegmentsInfo(1, true, []*datapb.FieldBinlog{{FieldID: 1, Binlogs: []string{"binlog"}}},
+		err = meta.UpdateFlushSegmentsInfo(1, true, false, []*datapb.FieldBinlog{{FieldID: 1, Binlogs: []string{"binlog"}}},
 			[]*datapb.FieldBinlog{{FieldID: 1, Binlogs: []string{"statslog"}}},
 			[]*datapb.DeltaLogInfo{{RecordEntries: 1, TimestampFrom: 100, TimestampTo: 200, DeltaLogSize: 1000}},
 			[]*datapb.CheckPoint{{SegmentID: 1, NumOfRows: 10}},
 			[]*datapb.SegmentStartPosition{{SegmentID: 1, StartPosition: &internalpb.MsgPosition{MsgID: []byte{1, 2, 3}}}})
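isSegmentHealthy itself gets no dedicated test in this patch. A table-driven sketch in the style of the surrounding tests (not part of the patch; the states are the commonpb values used above):

	func TestIsSegmentHealthy(t *testing.T) {
		cases := []struct {
			state commonpb.SegmentState
			want  bool
		}{
			{commonpb.SegmentState_Growing, true},
			{commonpb.SegmentState_Sealed, true},
			{commonpb.SegmentState_Flushed, true},
			{commonpb.SegmentState_NotExist, false},
			{commonpb.SegmentState_Dropped, false},
		}
		for _, c := range cases {
			segment := &SegmentInfo{SegmentInfo: &datapb.SegmentInfo{State: c.state}}
			assert.Equal(t, c.want, isSegmentHealthy(segment), c.state.String())
		}
	}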
diff --git a/internal/datacoord/segment_manager.go b/internal/datacoord/segment_manager.go
index 1b97c3b1b9..4199091b88 100644
--- a/internal/datacoord/segment_manager.go
+++ b/internal/datacoord/segment_manager.go
@@ -83,6 +83,8 @@ type Manager interface {
 	GetFlushableSegments(ctx context.Context, channel string, ts Timestamp) ([]UniqueID, error)
 	// ExpireAllocations notifies segment status to expire old allocations
 	ExpireAllocations(channel string, ts Timestamp) error
+	// DropSegmentsOfChannel drops all segments in a channel
+	DropSegmentsOfChannel(ctx context.Context, channel string)
 }
 
 // Allocation records the allocation info
@@ -482,3 +484,29 @@ func (s *SegmentManager) tryToSealSegment(ts Timestamp, channel string) error {
 	}
 	return nil
 }
+
+// DropSegmentsOfChannel drops all segments in a channel
+func (s *SegmentManager) DropSegmentsOfChannel(ctx context.Context, channel string) {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+
+	validSegments := make([]int64, 0, len(s.segments))
+	for _, sid := range s.segments {
+		segment := s.meta.GetSegment(sid)
+		if segment == nil {
+			continue
+		}
+		if segment.GetInsertChannel() != channel {
+			// keep segments that live on other channels
+			validSegments = append(validSegments, sid)
+			continue
+		}
+		// recycle the allocations of the dropped segment
+		s.meta.SetAllocations(sid, nil)
+		for _, allocation := range segment.allocations {
+			putAllocation(allocation)
+		}
+	}
+
+	s.segments = validSegments
+}
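putAllocation hands each allocation back to the package's allocation pool rather than leaving it for the garbage collector, which reduces GC pressure on the allocation path; the existing TestAllocationPool exercises that pool. A sketch of the round-trip, assuming the getAllocation/putAllocation pool helpers in this file (putAllocation is called above; getAllocation and its signature are an assumption):

	a := getAllocation(100) // assumed helper: take a reset *Allocation for 100 rows from the pool
	putAllocation(a)        // return it so a later allocation can reuse the object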
diff --git a/internal/datacoord/segment_manager_test.go b/internal/datacoord/segment_manager_test.go
index d8264f42b8..844c8d1b49 100644
--- a/internal/datacoord/segment_manager_test.go
+++ b/internal/datacoord/segment_manager_test.go
@@ -27,7 +27,6 @@ import (
 	"github.com/milvus-io/milvus/internal/proto/commonpb"
 	"github.com/milvus-io/milvus/internal/proto/datapb"
 	"github.com/milvus-io/milvus/internal/proto/schemapb"
-
 	"github.com/stretchr/testify/assert"
 )
 
@@ -496,3 +495,58 @@ func TestAllocationPool(t *testing.T) {
 
 	})
 }
+
+func TestSegmentManager_DropSegmentsOfChannel(t *testing.T) {
+	type fields struct {
+		meta     *meta
+		segments []UniqueID
+	}
+	type args struct {
+		channel string
+	}
+	tests := []struct {
+		name   string
+		fields fields
+		args   args
+		want   []UniqueID
+	}{
+		{
+			"test drop segments",
+			fields{
+				meta: &meta{
+					segments: &SegmentsInfo{
+						segments: map[int64]*SegmentInfo{
+							1: {
+								SegmentInfo: &datapb.SegmentInfo{
+									ID:            1,
+									InsertChannel: "ch1",
+								},
+							},
+							2: {
+								SegmentInfo: &datapb.SegmentInfo{
+									ID:            2,
+									InsertChannel: "ch2",
+								},
+							},
+						},
+					},
+				},
+				segments: []UniqueID{1, 2},
+			},
+			args{
+				"ch1",
+			},
+			[]UniqueID{2},
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			s := &SegmentManager{
+				meta:     tt.fields.meta,
+				segments: tt.fields.segments,
+			}
+			s.DropSegmentsOfChannel(context.TODO(), tt.args.channel)
+			assert.ElementsMatch(t, tt.want, s.segments)
+		})
+	}
+}
diff --git a/internal/datacoord/server.go b/internal/datacoord/server.go
index 19c7757397..c60be88385 100644
--- a/internal/datacoord/server.go
+++ b/internal/datacoord/server.go
@@ -167,6 +167,13 @@ func SetDataNodeCreator(creator dataNodeCreatorFunc) Option {
 	}
 }
 
+// SetSegmentManager returns an Option to set SegmentManager
+func SetSegmentManager(manager Manager) Option {
+	return func(svr *Server) {
+		svr.segmentManager = manager
+	}
+}
+
 // CreateServer create `Server` instance
 func CreateServer(ctx context.Context, factory msgstream.Factory, opts ...Option) (*Server, error) {
 	rand.Seed(time.Now().UnixNano())
@@ -359,7 +366,9 @@ func (s *Server) initServiceDiscovery() error {
 }
 
 func (s *Server) startSegmentManager() {
-	s.segmentManager = newSegmentManager(s.meta, s.allocator)
+	if s.segmentManager == nil {
+		s.segmentManager = newSegmentManager(s.meta, s.allocator)
+	}
 }
 
 func (s *Server) initMeta() error {
@@ -499,7 +508,8 @@ func (s *Server) startDataNodeTtLoop(ctx context.Context) {
 		}
 
 		staleSegments := s.meta.SelectSegments(func(info *SegmentInfo) bool {
-			return info.GetInsertChannel() == ch &&
+			return isSegmentHealthy(info) &&
+				info.GetInsertChannel() == ch &&
 				!info.lastFlushTime.IsZero() &&
 				time.Since(info.lastFlushTime).Minutes() >= segmentTimedFlushDuration
 		})
diff --git a/internal/datacoord/server_test.go b/internal/datacoord/server_test.go
index eba973dacc..c5acd04bb2 100644
--- a/internal/datacoord/server_test.go
+++ b/internal/datacoord/server_test.go
@@ -734,53 +734,75 @@ func TestChannel(t *testing.T) {
 	})
 }
 
+type spySegmentManager struct {
+	spyCh chan struct{}
+}
+
+// AllocSegment allocates rows and records the allocation.
+func (s *spySegmentManager) AllocSegment(ctx context.Context, collectionID UniqueID, partitionID UniqueID, channelName string, requestRows int64) ([]*Allocation, error) {
+	panic("not implemented") // TODO: Implement
+}
+
+// DropSegment drops the segment from manager.
+func (s *spySegmentManager) DropSegment(ctx context.Context, segmentID UniqueID) {
+	panic("not implemented") // TODO: Implement
+}
+
+// SealAllSegments seals all segments of collection with collectionID and return sealed segments
+func (s *spySegmentManager) SealAllSegments(ctx context.Context, collectionID UniqueID) ([]UniqueID, error) {
+	panic("not implemented") // TODO: Implement
+}
+
+// GetFlushableSegments returns flushable segment ids
+func (s *spySegmentManager) GetFlushableSegments(ctx context.Context, channel string, ts Timestamp) ([]UniqueID, error) {
+	panic("not implemented") // TODO: Implement
+}
+
+// ExpireAllocations notifies segment status to expire old allocations
+func (s *spySegmentManager) ExpireAllocations(channel string, ts Timestamp) error {
+	panic("not implemented") // TODO: Implement
+}
+
+// DropSegmentsOfChannel drops all segments in a channel
+func (s *spySegmentManager) DropSegmentsOfChannel(ctx context.Context, channel string) {
+	s.spyCh <- struct{}{}
+}
+
 func TestSaveBinlogPaths(t *testing.T) {
 	t.Run("Normal SaveRequest", func(t *testing.T) {
 		svr := newTestServer(t, nil)
 		defer closeTestServer(t, svr)
 
-		collections := []struct {
-			ID         UniqueID
-			Partitions []int64
-		}{
-			{0, []int64{0, 1}},
-			{1, []int64{0, 1}},
-		}
-
-		for _, collection := range collections {
-			svr.meta.AddCollection(&datapb.CollectionInfo{
-				ID:         collection.ID,
-				Schema:     nil,
-				Partitions: collection.Partitions,
-			})
-		}
+		svr.meta.AddCollection(&datapb.CollectionInfo{ID: 0})
 
 		segments := []struct {
 			id           UniqueID
 			collectionID UniqueID
-			partitionID  UniqueID
 		}{
-			{0, 0, 0},
-			{1, 0, 0},
-			{2, 0, 1},
-			{3, 1, 1},
+			{0, 0},
+			{1, 0},
 		}
 		for _, segment := range segments {
 			s := &datapb.SegmentInfo{
-				ID:           segment.id,
-				CollectionID: segment.collectionID,
-				PartitionID:  segment.partitionID,
+				ID:            segment.id,
+				CollectionID:  segment.collectionID,
+				InsertChannel: "ch1",
 			}
 			err := svr.meta.AddSegment(NewSegmentInfo(s))
 			assert.Nil(t, err)
 		}
 
+		err := svr.channelManager.AddNode(0)
+		assert.Nil(t, err)
+		err = svr.channelManager.Watch(&channel{"ch1", 0})
+		assert.Nil(t, err)
+
 		ctx := context.Background()
 		resp, err := svr.SaveBinlogPaths(ctx, &datapb.SaveBinlogPathsRequest{
 			Base: &commonpb.MsgBase{
 				Timestamp: uint64(time.Now().Unix()),
 			},
-			SegmentID:    2,
+			SegmentID:    1,
 			CollectionID: 0,
 			Field2BinlogPaths: []*datapb.FieldBinlog{
 				{
@@ -808,7 +830,7 @@
 		assert.Nil(t, err)
 		assert.EqualValues(t, resp.ErrorCode, commonpb.ErrorCode_Success)
 
-		segment := svr.meta.GetSegment(2)
+		segment := svr.meta.GetSegment(1)
 		assert.NotNil(t, segment)
 		binlogs := segment.GetBinlogs()
 		assert.EqualValues(t, 1, len(binlogs))
@@ -834,6 +856,34 @@
 		assert.Equal(t, commonpb.ErrorCode_UnexpectedError, resp.GetErrorCode())
 		assert.Equal(t, serverNotServingErrMsg, resp.GetReason())
 	})
+
+	t.Run("test save dropped segment and remove channel", func(t *testing.T) {
+		spyCh := make(chan struct{}, 1)
+		svr := newTestServer(t, nil, SetSegmentManager(&spySegmentManager{spyCh: spyCh}))
+		defer closeTestServer(t, svr)
+
+		svr.meta.AddCollection(&datapb.CollectionInfo{ID: 1})
+		err := svr.meta.AddSegment(&SegmentInfo{
+			SegmentInfo: &datapb.SegmentInfo{
+				ID:            1,
+				CollectionID:  1,
+				InsertChannel: "ch1",
+			},
+		})
+		assert.Nil(t, err)
+
+		err = svr.channelManager.AddNode(0)
+		assert.Nil(t, err)
+		err = svr.channelManager.Watch(&channel{"ch1", 1})
+		assert.Nil(t, err)
+
+		_, err = svr.SaveBinlogPaths(context.TODO(), &datapb.SaveBinlogPathsRequest{
+			SegmentID: 1,
+			Dropped:   true,
+		})
+		assert.Nil(t, err)
+		<-spyCh
+	})
 }
 
 func TestDataNodeTtChannel(t *testing.T) {
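One caveat on the final <-spyCh: the bare receive blocks forever if the Dropped path regresses and DropSegmentsOfChannel is never invoked, so the run would only die on the global test timeout. A guarded receive (standard select idiom, sketch only) fails faster and with a clearer message:

	select {
	case <-spyCh: // DropSegmentsOfChannel was called as expected
	case <-time.After(time.Second):
		t.Fatal("expected SaveBinlogPaths to drop segments of the channel")
	}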
"ch1", + }, + }) + assert.Nil(t, err) + + err = svr.channelManager.AddNode(0) + assert.Nil(t, err) + err = svr.channelManager.Watch(&channel{"ch1", 1}) + assert.Nil(t, err) + + _, err = svr.SaveBinlogPaths(context.TODO(), &datapb.SaveBinlogPathsRequest{ + SegmentID: 1, + Dropped: true, + }) + assert.Nil(t, err) + <-spyCh + }) } func TestDataNodeTtChannel(t *testing.T) { @@ -1276,6 +1326,12 @@ func TestGetRecoveryInfo(t *testing.T) { segment := createSegment(0, 0, 0, 100, 10, "ch1", commonpb.SegmentState_Flushed) err := svr.meta.AddSegment(NewSegmentInfo(segment)) assert.Nil(t, err) + + err = svr.channelManager.AddNode(0) + assert.Nil(t, err) + err = svr.channelManager.Watch(&channel{"ch1", 0}) + assert.Nil(t, err) + sResp, err := svr.SaveBinlogPaths(context.TODO(), binlogReq) assert.Nil(t, err) assert.EqualValues(t, commonpb.ErrorCode_Success, sResp.ErrorCode) diff --git a/internal/datacoord/services.go b/internal/datacoord/services.go index 90cab67e96..e65f165a9b 100644 --- a/internal/datacoord/services.go +++ b/internal/datacoord/services.go @@ -289,9 +289,7 @@ func (s *Server) GetSegmentInfo(ctx context.Context, req *datapb.GetSegmentInfoR // SaveBinlogPaths update segment related binlog path // works for Checkpoints and Flush func (s *Server) SaveBinlogPaths(ctx context.Context, req *datapb.SaveBinlogPathsRequest) (*commonpb.Status, error) { - resp := &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - } + resp := &commonpb.Status{ErrorCode: commonpb.ErrorCode_UnexpectedError} if s.isClosed() { resp.Reason = serverNotServingErrMsg @@ -319,12 +317,19 @@ func (s *Server) SaveBinlogPaths(ctx context.Context, req *datapb.SaveBinlogPath if !s.channelManager.Match(nodeID, channel) { FailResponse(resp, fmt.Sprintf("channel %s is not watched on node %d", channel, nodeID)) log.Warn("node is not matched with channel", zap.String("channel", channel), zap.Int64("nodeID", nodeID)) + return resp, nil } // set segment to SegmentState_Flushing and save binlogs and checkpoints - err := s.meta.UpdateFlushSegmentsInfo(req.GetSegmentID(), req.GetFlushed(), - req.GetField2BinlogPaths(), req.GetField2StatslogPaths(), req.GetDeltalogs(), - req.GetCheckPoints(), req.GetStartPositions()) + err := s.meta.UpdateFlushSegmentsInfo( + req.GetSegmentID(), + req.GetFlushed(), + req.GetDropped(), + req.GetField2BinlogPaths(), + req.GetField2StatslogPaths(), + req.GetDeltalogs(), + req.GetCheckPoints(), + req.GetStartPositions()) if err != nil { log.Error("save binlog and checkpoints failed", zap.Int64("segmentID", req.GetSegmentID()), @@ -336,7 +341,15 @@ func (s *Server) SaveBinlogPaths(ctx context.Context, req *datapb.SaveBinlogPath log.Debug("flush segment with meta", zap.Int64("id", req.SegmentID), zap.Any("meta", req.GetField2BinlogPaths())) - if req.Flushed { + if req.GetDropped() && s.checkShouldDropChannel(channel) { + err = s.channelManager.RemoveChannel(channel) + if err != nil { + log.Warn("failed to remove channel", zap.String("channel", channel), zap.Error(err)) + } + s.segmentManager.DropSegmentsOfChannel(ctx, channel) + } + + if req.GetFlushed() { s.segmentManager.DropSegment(ctx, req.SegmentID) s.flushCh <- req.SegmentID @@ -357,6 +370,21 @@ func (s *Server) SaveBinlogPaths(ctx context.Context, req *datapb.SaveBinlogPath return resp, nil } +func (s *Server) checkShouldDropChannel(channel string) bool { + segments := s.meta.GetSegmentsByChannel(channel) + for _, segment := range segments { + if segment.GetStartPosition() != nil && // fitler empty segment + // FIXME: we filter compaction 
@@ -357,6 +370,21 @@
 	return resp, nil
 }
+
+func (s *Server) checkShouldDropChannel(channel string) bool {
+	segments := s.meta.GetSegmentsByChannel(channel)
+	for _, segment := range segments {
+		if segment.GetStartPosition() != nil && // filter empty segment
+			// FIXME: we filter compaction generated segments
+			// because datanode may not know the segment due to the network lag or
+			// datacoord crash when handling CompleteCompaction.
+			len(segment.CompactionFrom) == 0 &&
+			segment.GetState() != commonpb.SegmentState_Dropped {
+			return false
+		}
+	}
+	return true
+}
 
 // GetComponentStates returns DataCoord's current state
 func (s *Server) GetComponentStates(ctx context.Context) (*internalpb.ComponentStates, error) {
 	resp := &internalpb.ComponentStates{