From ed94ecf847fbd8a32ec7970146640f9a9bb4f026 Mon Sep 17 00:00:00 2001 From: "cai.zhang" Date: Mon, 20 Jun 2022 21:56:12 +0800 Subject: [PATCH] Segments can't be compacted when they have reference lock (#17649) Signed-off-by: Cai.Zhang --- internal/datacoord/compaction.go | 14 ++- internal/datacoord/compaction_test.go | 88 ++++++++++++++++++- internal/datacoord/compaction_trigger.go | 10 ++- internal/datacoord/compaction_trigger_test.go | 16 +++- internal/datacoord/meta.go | 7 +- internal/datacoord/meta_test.go | 5 +- internal/datacoord/server.go | 13 +-- 7 files changed, 134 insertions(+), 19 deletions(-) diff --git a/internal/datacoord/compaction.go b/internal/datacoord/compaction.go index 576bee751e..7d43beae44 100644 --- a/internal/datacoord/compaction.go +++ b/internal/datacoord/compaction.go @@ -101,10 +101,11 @@ type compactionPlanHandler struct { quit chan struct{} wg sync.WaitGroup flushCh chan UniqueID + segRefer *SegmentReferenceManager } func newCompactionPlanHandler(sessions *SessionManager, cm *ChannelManager, meta *meta, - allocator allocator, flush chan UniqueID) *compactionPlanHandler { + allocator allocator, flush chan UniqueID, segRefer *SegmentReferenceManager) *compactionPlanHandler { return &compactionPlanHandler{ plans: make(map[int64]*compactionTask), chManager: cm, @@ -112,6 +113,7 @@ func newCompactionPlanHandler(sessions *SessionManager, cm *ChannelManager, meta sessions: sessions, allocator: allocator, flushCh: flush, + segRefer: segRefer, } } @@ -201,7 +203,9 @@ func (c *compactionPlanHandler) completeCompaction(result *datapb.CompactionResu return err } case datapb.CompactionType_MergeCompaction, datapb.CompactionType_MixCompaction: - if err := c.handleMergeCompactionResult(plan, result); err != nil { + if err := c.handleMergeCompactionResult(plan, result, func(segment *datapb.CompactionSegmentBinlogs) bool { + return !c.segRefer.HasSegmentLock(segment.SegmentID) + }); err != nil { return err } default: @@ -219,11 +223,13 @@ func (c *compactionPlanHandler) completeCompaction(result *datapb.CompactionResu } func (c *compactionPlanHandler) handleInnerCompactionResult(plan *datapb.CompactionPlan, result *datapb.CompactionResult) error { + //TODO @xiaocai2333: Can reference locks be ignored? return c.meta.CompleteInnerCompaction(plan.GetSegmentBinlogs()[0], result) } -func (c *compactionPlanHandler) handleMergeCompactionResult(plan *datapb.CompactionPlan, result *datapb.CompactionResult) error { - return c.meta.CompleteMergeCompaction(plan.GetSegmentBinlogs(), result) +func (c *compactionPlanHandler) handleMergeCompactionResult(plan *datapb.CompactionPlan, result *datapb.CompactionResult, + canCompaction func(segment *datapb.CompactionSegmentBinlogs) bool) error { + return c.meta.CompleteMergeCompaction(plan.GetSegmentBinlogs(), result, canCompaction) } // getCompaction return compaction task. If planId does not exist, return nil. diff --git a/internal/datacoord/compaction_test.go b/internal/datacoord/compaction_test.go index 945c983e6e..3796e2b80e 100644 --- a/internal/datacoord/compaction_test.go +++ b/internal/datacoord/compaction_test.go @@ -242,6 +242,89 @@ func Test_compactionPlanHandler_completeCompaction(t *testing.T) { sessions: tt.fields.sessions, meta: tt.fields.meta, flushCh: tt.fields.flushCh, + segRefer: &SegmentReferenceManager{ + segmentsLock: map[UniqueID]map[UniqueID]*datapb.SegmentReferenceLock{}, + }, + } + err := c.completeCompaction(tt.args.result) + assert.Equal(t, tt.wantErr, err != nil) + }) + } +} + +func Test_compactionPlanHandler_segment_is_referenced(t *testing.T) { + type fields struct { + plans map[int64]*compactionTask + sessions *SessionManager + meta *meta + flushCh chan UniqueID + } + type args struct { + result *datapb.CompactionResult + } + tests := []struct { + name string + fields fields + args args + wantErr bool + want *compactionTask + }{ + { + "test compaction segment is referenced", + fields{ + map[int64]*compactionTask{ + 1: { + triggerInfo: &compactionSignal{id: 1}, + state: executing, + plan: &datapb.CompactionPlan{ + PlanID: 1, + SegmentBinlogs: []*datapb.CompactionSegmentBinlogs{ + + {SegmentID: 1, FieldBinlogs: []*datapb.FieldBinlog{getFieldBinlogPaths(1, "log1")}}, + {SegmentID: 2, FieldBinlogs: []*datapb.FieldBinlog{getFieldBinlogPaths(1, "log2")}}, + }, + Type: datapb.CompactionType_MergeCompaction, + }, + }, + }, + nil, + &meta{ + client: memkv.NewMemoryKV(), + segments: &SegmentsInfo{ + map[int64]*SegmentInfo{ + + 1: {SegmentInfo: &datapb.SegmentInfo{ID: 1, Binlogs: []*datapb.FieldBinlog{getFieldBinlogPaths(1, "log1")}}}, + 2: {SegmentInfo: &datapb.SegmentInfo{ID: 2, Binlogs: []*datapb.FieldBinlog{getFieldBinlogPaths(1, "log2")}}}, + }, + }, + }, + make(chan UniqueID, 1), + }, + args{ + result: &datapb.CompactionResult{ + PlanID: 1, + SegmentID: 3, + InsertLogs: []*datapb.FieldBinlog{getFieldBinlogPaths(1, "log3")}, + }, + }, + true, + nil, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + c := &compactionPlanHandler{ + plans: tt.fields.plans, + sessions: tt.fields.sessions, + meta: tt.fields.meta, + flushCh: tt.fields.flushCh, + segRefer: &SegmentReferenceManager{ + segmentsLock: map[UniqueID]map[UniqueID]*datapb.SegmentReferenceLock{}, + segmentReferCnt: map[UniqueID]int{ + 1: 1, + }, + }, } err := c.completeCompaction(tt.args.result) assert.Equal(t, tt.wantErr, err != nil) @@ -382,6 +465,7 @@ func Test_newCompactionPlanHandler(t *testing.T) { meta *meta allocator allocator flush chan UniqueID + segRefer *SegmentReferenceManager } tests := []struct { name string @@ -396,6 +480,7 @@ func Test_newCompactionPlanHandler(t *testing.T) { &meta{}, newMockAllocator(), nil, + &SegmentReferenceManager{segmentsLock: map[UniqueID]map[UniqueID]*datapb.SegmentReferenceLock{}}, }, &compactionPlanHandler{ plans: map[int64]*compactionTask{}, @@ -404,12 +489,13 @@ func Test_newCompactionPlanHandler(t *testing.T) { meta: &meta{}, allocator: newMockAllocator(), flushCh: nil, + segRefer: &SegmentReferenceManager{segmentsLock: map[UniqueID]map[UniqueID]*datapb.SegmentReferenceLock{}}, }, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - got := newCompactionPlanHandler(tt.args.sessions, tt.args.cm, tt.args.meta, tt.args.allocator, tt.args.flush) + got := newCompactionPlanHandler(tt.args.sessions, tt.args.cm, tt.args.meta, tt.args.allocator, tt.args.flush, tt.args.segRefer) assert.EqualValues(t, tt.want, got) }) } diff --git a/internal/datacoord/compaction_trigger.go b/internal/datacoord/compaction_trigger.go index 7e680eba76..51755cf318 100644 --- a/internal/datacoord/compaction_trigger.go +++ b/internal/datacoord/compaction_trigger.go @@ -67,14 +67,17 @@ type compactionTrigger struct { forceMu sync.Mutex quit chan struct{} wg sync.WaitGroup + segRefer *SegmentReferenceManager } -func newCompactionTrigger(meta *meta, compactionHandler compactionPlanContext, allocator allocator) *compactionTrigger { +func newCompactionTrigger(meta *meta, compactionHandler compactionPlanContext, allocator allocator, + segRefer *SegmentReferenceManager) *compactionTrigger { return &compactionTrigger{ meta: meta, allocator: allocator, signals: make(chan *compactionSignal, 100), compactionHandler: compactionHandler, + segRefer: segRefer, } } @@ -225,7 +228,8 @@ func (t *compactionTrigger) handleGlobalSignal(signal *compactionSignal) { return (signal.collectionID == 0 || segment.CollectionID == signal.collectionID) && isSegmentHealthy(segment) && isFlush(segment) && - !segment.isCompacting // not compacting now + !segment.isCompacting && // not compacting now + !t.segRefer.HasSegmentLock(segment.ID) // not reference }) // m is list of chanPartSegments, which is channel-partition organized segments for _, group := range m { if !signal.isForce && t.compactionHandler.isFull() { @@ -434,7 +438,7 @@ func (t *compactionTrigger) getCandidateSegments(channel string, partitionID Uni var res []*SegmentInfo for _, s := range segments { if !isFlush(s) || s.GetInsertChannel() != channel || - s.GetPartitionID() != partitionID || s.isCompacting { + s.GetPartitionID() != partitionID || s.isCompacting || t.segRefer.HasSegmentLock(s.ID) { continue } res = append(res, s) diff --git a/internal/datacoord/compaction_trigger_test.go b/internal/datacoord/compaction_trigger_test.go index 76b912887b..0bee6dbdf6 100644 --- a/internal/datacoord/compaction_trigger_test.go +++ b/internal/datacoord/compaction_trigger_test.go @@ -76,6 +76,7 @@ func Test_compactionTrigger_force(t *testing.T) { signals chan *compactionSignal compactionHandler compactionPlanContext globalTrigger *time.Ticker + segRefer *SegmentReferenceManager } type args struct { collectionID int64 @@ -154,6 +155,7 @@ func Test_compactionTrigger_force(t *testing.T) { nil, &spyCompactionHandler{spyChan: make(chan *datapb.CompactionPlan, 1)}, nil, + &SegmentReferenceManager{segmentsLock: map[UniqueID]map[UniqueID]*datapb.SegmentReferenceLock{}}, }, args{ 2, @@ -218,6 +220,7 @@ func Test_compactionTrigger_force(t *testing.T) { signals: tt.fields.signals, compactionHandler: tt.fields.compactionHandler, globalTrigger: tt.fields.globalTrigger, + segRefer: tt.fields.segRefer, } _, err := tr.forceTriggerCompaction(tt.args.collectionID, tt.args.compactTime) assert.Equal(t, tt.wantErr, err != nil) @@ -358,6 +361,7 @@ func Test_compactionTrigger_force_maxSegmentLimit(t *testing.T) { signals: tt.fields.signals, compactionHandler: tt.fields.compactionHandler, globalTrigger: tt.fields.globalTrigger, + segRefer: &SegmentReferenceManager{segmentsLock: map[UniqueID]map[UniqueID]*datapb.SegmentReferenceLock{}}, } _, err := tr.forceTriggerCompaction(tt.args.collectionID, tt.args.compactTime) assert.Equal(t, tt.wantErr, err != nil) @@ -535,6 +539,7 @@ func Test_compactionTrigger_noplan(t *testing.T) { signals: tt.fields.signals, compactionHandler: tt.fields.compactionHandler, globalTrigger: tt.fields.globalTrigger, + segRefer: &SegmentReferenceManager{segmentsLock: map[UniqueID]map[UniqueID]*datapb.SegmentReferenceLock{}}, } tr.start() defer tr.stop() @@ -708,6 +713,7 @@ func Test_compactionTrigger_smallfiles(t *testing.T) { signals: tt.fields.signals, compactionHandler: tt.fields.compactionHandler, globalTrigger: tt.fields.globalTrigger, + segRefer: &SegmentReferenceManager{segmentsLock: map[UniqueID]map[UniqueID]*datapb.SegmentReferenceLock{}}, } tr.start() defer tr.stop() @@ -811,6 +817,7 @@ func Test_compactionTrigger_noplan_random_size(t *testing.T) { signals: tt.fields.signals, compactionHandler: tt.fields.compactionHandler, globalTrigger: tt.fields.globalTrigger, + segRefer: &SegmentReferenceManager{segmentsLock: map[UniqueID]map[UniqueID]*datapb.SegmentReferenceLock{}}, } tr.start() defer tr.stop() @@ -853,7 +860,8 @@ func Test_compactionTrigger_noplan_random_size(t *testing.T) { func Test_compactionTrigger_shouldDoSingleCompaction(t *testing.T) { Params.Init() - trigger := newCompactionTrigger(&meta{}, &compactionPlanHandler{}, newMockAllocator()) + trigger := newCompactionTrigger(&meta{}, &compactionPlanHandler{}, newMockAllocator(), + &SegmentReferenceManager{segmentsLock: map[UniqueID]map[UniqueID]*datapb.SegmentReferenceLock{}}) // Test too many files. var binlogs []*datapb.FieldBinlog @@ -985,7 +993,8 @@ func Test_newCompactionTrigger(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - got := newCompactionTrigger(tt.args.meta, tt.args.compactionHandler, tt.args.allocator) + got := newCompactionTrigger(tt.args.meta, tt.args.compactionHandler, tt.args.allocator, + &SegmentReferenceManager{segmentsLock: map[UniqueID]map[UniqueID]*datapb.SegmentReferenceLock{}}) assert.Equal(t, tt.args.meta, got.meta) assert.Equal(t, tt.args.compactionHandler, got.compactionHandler) assert.Equal(t, tt.args.allocator, got.allocator) @@ -995,7 +1004,8 @@ func Test_newCompactionTrigger(t *testing.T) { func Test_handleSignal(t *testing.T) { - got := newCompactionTrigger(&meta{segments: NewSegmentsInfo()}, &compactionPlanHandler{}, newMockAllocator()) + got := newCompactionTrigger(&meta{segments: NewSegmentsInfo()}, &compactionPlanHandler{}, newMockAllocator(), + &SegmentReferenceManager{segmentsLock: map[UniqueID]map[UniqueID]*datapb.SegmentReferenceLock{}}) signal := &compactionSignal{ segmentID: 1, } diff --git a/internal/datacoord/meta.go b/internal/datacoord/meta.go index 41d10b2378..f2b508cd7f 100644 --- a/internal/datacoord/meta.go +++ b/internal/datacoord/meta.go @@ -779,12 +779,17 @@ func (m *meta) SetSegmentCompacting(segmentID UniqueID, compacting bool) { m.segments.SetIsCompacting(segmentID, compacting) } -func (m *meta) CompleteMergeCompaction(compactionLogs []*datapb.CompactionSegmentBinlogs, result *datapb.CompactionResult) error { +func (m *meta) CompleteMergeCompaction(compactionLogs []*datapb.CompactionSegmentBinlogs, result *datapb.CompactionResult, + canCompaction func(segment *datapb.CompactionSegmentBinlogs) bool) error { m.Lock() defer m.Unlock() segments := make([]*SegmentInfo, 0, len(compactionLogs)) for _, cl := range compactionLogs { + if !canCompaction(cl) { + log.Warn("can not be compacted, segment has reference lock", zap.Int64("segmentID", cl.SegmentID)) + return fmt.Errorf("can not be compacted, segment with ID %d has reference lock", cl.SegmentID) + } if segment := m.segments.GetSegment(cl.GetSegmentID()); segment != nil { cloned := segment.Clone() cloned.State = commonpb.SegmentState_Dropped diff --git a/internal/datacoord/meta_test.go b/internal/datacoord/meta_test.go index b1445e7814..3944b35ae1 100644 --- a/internal/datacoord/meta_test.go +++ b/internal/datacoord/meta_test.go @@ -447,7 +447,10 @@ func Test_meta_CompleteMergeCompaction(t *testing.T) { collections: tt.fields.collections, segments: tt.fields.segments, } - err := m.CompleteMergeCompaction(tt.args.compactionLogs, tt.args.result) + canCompaction := func(segment *datapb.CompactionSegmentBinlogs) bool { + return true + } + err := m.CompleteMergeCompaction(tt.args.compactionLogs, tt.args.result, canCompaction) assert.Equal(t, tt.wantErr, err != nil) if err == nil { for _, l := range tt.args.compactionLogs { diff --git a/internal/datacoord/server.go b/internal/datacoord/server.go index a7b1c5c100..e72abe5b8e 100644 --- a/internal/datacoord/server.go +++ b/internal/datacoord/server.go @@ -282,15 +282,16 @@ func (s *Server) Start() error { } s.allocator = newRootCoordAllocator(s.rootCoordClient) + + if err = s.initServiceDiscovery(); err != nil { + return err + } + if Params.DataCoordCfg.EnableCompaction { s.createCompactionHandler() s.createCompactionTrigger() } - s.startSegmentManager() - if err = s.initServiceDiscovery(); err != nil { - return err - } if err = s.initGarbageCollection(); err != nil { return err @@ -333,7 +334,7 @@ func (s *Server) SetEtcdClient(client *clientv3.Client) { } func (s *Server) createCompactionHandler() { - s.compactionHandler = newCompactionPlanHandler(s.sessionManager, s.channelManager, s.meta, s.allocator, s.flushCh) + s.compactionHandler = newCompactionPlanHandler(s.sessionManager, s.channelManager, s.meta, s.allocator, s.flushCh, s.segReferManager) s.compactionHandler.start() } @@ -342,7 +343,7 @@ func (s *Server) stopCompactionHandler() { } func (s *Server) createCompactionTrigger() { - s.compactionTrigger = newCompactionTrigger(s.meta, s.compactionHandler, s.allocator) + s.compactionTrigger = newCompactionTrigger(s.meta, s.compactionHandler, s.allocator, s.segReferManager) s.compactionTrigger.start() }