diff --git a/internal/datacoord/compaction.go b/internal/datacoord/compaction.go index f86ea46f3f..86c39e826d 100644 --- a/internal/datacoord/compaction.go +++ b/internal/datacoord/compaction.go @@ -255,24 +255,9 @@ func (c *compactionPlanHandler) handleMergeCompactionResult(plan *datapb.Compact } log.Info("handleCompactionResult: altering metastore after compaction") - if newSegment.GetNumOfRows() > 0 { - if err := c.meta.alterMetaStoreAfterCompaction(modInfos, newSegment.SegmentInfo); err != nil { - log.Warn("handleCompactionResult: fail to alter metastore after compaction", zap.Error(err)) - return fmt.Errorf("fail to alter metastore after compaction, err=%w", err) - } - } else { - log.Warn("compaction produced an empty segment", zap.Int64("segmentID", newSegment.GetID())) - fakedSegment := &datapb.SegmentInfo{ - ID: newSegment.GetID(), - CollectionID: newSegment.GetCollectionID(), - PartitionID: newSegment.GetPartitionID(), - CompactionFrom: newSegment.GetCompactionFrom(), - CreatedByCompaction: true, - IsFake: true, - } - if err := c.meta.AddFakedSegment(fakedSegment); err != nil { - return fmt.Errorf("fail to save fake segment after compaction, err=%w", err) - } + if err := c.meta.alterMetaStoreAfterCompaction(modInfos, newSegment.SegmentInfo); err != nil { + log.Warn("handleCompactionResult: fail to alter metastore after compaction", zap.Error(err)) + return fmt.Errorf("fail to alter metastore after compaction, err=%w", err) } var nodeID = c.plans[plan.GetPlanID()].dataNodeID diff --git a/internal/datacoord/compaction_test.go b/internal/datacoord/compaction_test.go index b8ad0bfc3d..a30c56416a 100644 --- a/internal/datacoord/compaction_test.go +++ b/internal/datacoord/compaction_test.go @@ -471,6 +471,114 @@ func TestCompactionPlanHandler_completeCompaction(t *testing.T) { assert.Equal(t, compactionResult.GetSegmentID(), segID) assert.NoError(t, err) }) + + t.Run("test empty result merge compaction task", func(t *testing.T) { + mockDataNode := 
&mocks.DataNode{} + mockDataNode.EXPECT().SyncSegments(mock.Anything, mock.Anything).Run(func(ctx context.Context, req *datapb.SyncSegmentsRequest) {}).Return(&commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}, nil) + + dataNodeID := UniqueID(111) + + seg1 := &datapb.SegmentInfo{ + ID: 1, + Binlogs: []*datapb.FieldBinlog{getFieldBinlogPaths(101, "log1")}, + Statslogs: []*datapb.FieldBinlog{getFieldBinlogPaths(101, "log2")}, + Deltalogs: []*datapb.FieldBinlog{getFieldBinlogPaths(101, "log3")}, + } + + seg2 := &datapb.SegmentInfo{ + ID: 2, + Binlogs: []*datapb.FieldBinlog{getFieldBinlogPaths(101, "log4")}, + Statslogs: []*datapb.FieldBinlog{getFieldBinlogPaths(101, "log5")}, + Deltalogs: []*datapb.FieldBinlog{getFieldBinlogPaths(101, "log6")}, + } + + plan := &datapb.CompactionPlan{ + PlanID: 1, + SegmentBinlogs: []*datapb.CompactionSegmentBinlogs{ + { + SegmentID: seg1.ID, + FieldBinlogs: seg1.GetBinlogs(), + Field2StatslogPaths: seg1.GetStatslogs(), + Deltalogs: seg1.GetDeltalogs(), + }, + { + SegmentID: seg2.ID, + FieldBinlogs: seg2.GetBinlogs(), + Field2StatslogPaths: seg2.GetStatslogs(), + Deltalogs: seg2.GetDeltalogs(), + }, + }, + Type: datapb.CompactionType_MergeCompaction, + } + + sessions := &SessionManager{ + sessions: struct { + sync.RWMutex + data map[int64]*Session + }{ + data: map[int64]*Session{ + dataNodeID: {client: mockDataNode}}, + }, + } + + task := &compactionTask{ + triggerInfo: &compactionSignal{id: 1}, + state: executing, + plan: plan, + dataNodeID: dataNodeID, + } + + plans := map[int64]*compactionTask{1: task} + + meta := &meta{ + catalog: &datacoord.Catalog{Txn: memkv.NewMemoryKV()}, + segments: &SegmentsInfo{ + map[int64]*SegmentInfo{ + seg1.ID: {SegmentInfo: seg1}, + seg2.ID: {SegmentInfo: seg2}, + }, + }, + } + + meta.AddSegment(NewSegmentInfo(seg1)) + meta.AddSegment(NewSegmentInfo(seg2)) + + segments := meta.GetAllSegmentsUnsafe() + assert.Equal(t, len(segments), 2) + compactionResult := datapb.CompactionResult{ + PlanID: 1, + 
SegmentID: 3, + NumOfRows: 0, + InsertLogs: []*datapb.FieldBinlog{getFieldBinlogPaths(101, "log301")}, + Field2StatslogPaths: []*datapb.FieldBinlog{getFieldBinlogPaths(101, "log302")}, + Deltalogs: []*datapb.FieldBinlog{getFieldBinlogPaths(101, "log303")}, + } + + flushCh := make(chan UniqueID, 1) + c := &compactionPlanHandler{ + plans: plans, + sessions: sessions, + meta: meta, + flushCh: flushCh, + segRefer: &SegmentReferenceManager{ + segmentsLock: map[UniqueID]map[UniqueID]*datapb.SegmentReferenceLock{}, + }, + } + + err := c.completeCompaction(&compactionResult) + + segID, ok := <-flushCh + assert.True(t, ok) + assert.Equal(t, compactionResult.GetSegmentID(), segID) + assert.NoError(t, err) + + segments = meta.GetAllSegmentsUnsafe() + assert.Equal(t, len(segments), 2) + + for _, segment := range segments { + assert.True(t, segment.State == commonpb.SegmentState_Dropped) + } + }) } func Test_compactionPlanHandler_getCompaction(t *testing.T) { diff --git a/internal/datacoord/meta.go b/internal/datacoord/meta.go index 0763dbf153..b51d59c564 100644 --- a/internal/datacoord/meta.go +++ b/internal/datacoord/meta.go @@ -228,20 +228,6 @@ func (m *meta) AddSegment(segment *SegmentInfo) error { return nil } -// AddFakedSegment persist a faked segment into meta -func (m *meta) AddFakedSegment(fakedSegment *datapb.SegmentInfo) error { - m.Lock() - defer m.Unlock() - if err := m.catalog.AddFakedSegment(m.ctx, fakedSegment); err != nil { - log.Error("adding faked segment failed", - zap.Any("segment", fakedSegment), - zap.Error(err)) - return err - } - log.Debug("add faked segment finished", zap.Any("segment", fakedSegment)) - return nil -} - // DropSegment remove segment with provided id, etcd persistence also removed func (m *meta) DropSegment(segmentID UniqueID) error { log.Info("meta update: dropping segment", diff --git a/internal/datacoord/meta_test.go b/internal/datacoord/meta_test.go index aca6390644..f59ef9690e 100644 --- a/internal/datacoord/meta_test.go +++ 
b/internal/datacoord/meta_test.go @@ -359,8 +359,6 @@ func TestMeta_Basic(t *testing.T) { assert.NotNil(t, info1_1) assert.Equal(t, false, info1_1.GetIsImporting()) - err = meta.AddFakedSegment(segInfo1_1.SegmentInfo) - assert.NoError(t, err) }) t.Run("Test segment with kv fails", func(t *testing.T) { @@ -388,8 +386,6 @@ func TestMeta_Basic(t *testing.T) { meta, err = newMeta(context.TODO(), fkv, "") assert.Nil(t, err) - err = meta.AddFakedSegment(&datapb.SegmentInfo{IsFake: true}) - assert.NotNil(t, err) }) t.Run("Test GetCount", func(t *testing.T) { diff --git a/internal/datacoord/server.go b/internal/datacoord/server.go index 843e4e5cd4..b77cf780a0 100644 --- a/internal/datacoord/server.go +++ b/internal/datacoord/server.go @@ -805,8 +805,7 @@ func (s *Server) startFlushLoop(ctx context.Context) { func (s *Server) postFlush(ctx context.Context, segmentID UniqueID) error { segment := s.meta.GetSegment(segmentID) if segment == nil { - log.Warn("failed to get flused segment", zap.Int64("id", segmentID)) - return errors.New("segment not found") + return errors.New("segment not found, might be a faked segment, ignore post flush") } // set segment to SegmentState_Flushed if err := s.meta.SetState(segmentID, commonpb.SegmentState_Flushed); err != nil { diff --git a/internal/datacoord/server_test.go b/internal/datacoord/server_test.go index b2bce7e0b1..9b062c2f65 100644 --- a/internal/datacoord/server_test.go +++ b/internal/datacoord/server_test.go @@ -2693,7 +2693,7 @@ func TestPostFlush(t *testing.T) { defer closeTestServer(t, svr) err := svr.postFlush(context.Background(), 1) - assert.EqualValues(t, errors.New("segment not found"), err) + assert.EqualValues(t, errors.New("segment not found, might be a faked segment, ignore post flush"), err) }) t.Run("success post flush", func(t *testing.T) { diff --git a/internal/metastore/catalog.go b/internal/metastore/catalog.go index f0fc7c2b37..4de579d22c 100644 --- a/internal/metastore/catalog.go +++ 
b/internal/metastore/catalog.go @@ -76,8 +76,6 @@ type DataCoordCatalog interface { AlterSegments(ctx context.Context, newSegments []*datapb.SegmentInfo) error // AlterSegmentsAndAddNewSegment for transaction AlterSegmentsAndAddNewSegment(ctx context.Context, segments []*datapb.SegmentInfo, newSegment *datapb.SegmentInfo) error - // TODO Remove this later, only a hack - AddFakedSegment(ctx context.Context, segment *datapb.SegmentInfo) error AlterSegment(ctx context.Context, newSegment *datapb.SegmentInfo, oldSegment *datapb.SegmentInfo) error SaveDroppedSegmentsInBatch(ctx context.Context, segments []*datapb.SegmentInfo) error DropSegment(ctx context.Context, segment *datapb.SegmentInfo) error diff --git a/internal/metastore/kv/datacoord/kv_catalog.go b/internal/metastore/kv/datacoord/kv_catalog.go index 23c5c0e875..2e44460dc5 100644 --- a/internal/metastore/kv/datacoord/kv_catalog.go +++ b/internal/metastore/kv/datacoord/kv_catalog.go @@ -91,23 +91,6 @@ func (kc *Catalog) ListSegments(ctx context.Context) ([]*datapb.SegmentInfo, err return segments, nil } -func (kc *Catalog) AddFakedSegment(ctx context.Context, segment *datapb.SegmentInfo) error { - if !segment.IsFake { - return nil - } - // The fake segment will not be saved into meta, it only needs process handoff case - kvs := make(map[string]string) - flushSegKey := buildFlushedSegmentPath(segment.GetCollectionID(), segment.GetPartitionID(), segment.GetID()) - clonedSegment := proto.Clone(segment).(*datapb.SegmentInfo) - segBytes, err := marshalSegmentInfo(clonedSegment) - if err != nil { - return err - } - - kvs[flushSegKey] = segBytes - return kc.Txn.MultiSave(kvs) -} - func (kc *Catalog) AddSegment(ctx context.Context, segment *datapb.SegmentInfo) error { kvs, err := buildSegmentAndBinlogsKvs(segment) if err != nil { @@ -209,14 +192,23 @@ func (kc *Catalog) AlterSegmentsAndAddNewSegment(ctx context.Context, segments [ } if newSegment != nil { - segmentKvs, err := buildSegmentAndBinlogsKvs(newSegment) - if 
err != nil { - return err + if newSegment.GetNumOfRows() > 0 { + segmentKvs, err := buildSegmentAndBinlogsKvs(newSegment) + if err != nil { + return err + } + maps.Copy(kvs, segmentKvs) + } else { + // should be a faked segment, we create flush path directly here + flushSegKey := buildFlushedSegmentPath(newSegment.GetCollectionID(), newSegment.GetPartitionID(), newSegment.GetID()) + clonedSegment := proto.Clone(newSegment).(*datapb.SegmentInfo) + segBytes, err := marshalSegmentInfo(clonedSegment) + if err != nil { + return err + } + kvs[flushSegKey] = segBytes } - - maps.Copy(kvs, segmentKvs) } - return kc.Txn.MultiSave(kvs) }