diff --git a/internal/datacoord/meta.go b/internal/datacoord/meta.go index ffb5e72797..e8b89000cf 100644 --- a/internal/datacoord/meta.go +++ b/internal/datacoord/meta.go @@ -503,48 +503,27 @@ func (m *meta) mergeDropSegment(seg2Drop *SegmentInfo) *SegmentInfo { // since the flag is not marked so DataNode can re-consume the drop collection msg // 2. when failure occurs between save meta and unwatch channel, the removal flag shall be check before let datanode watch this channel func (m *meta) batchSaveDropSegments(channel string, modSegments map[int64]*SegmentInfo) error { - - // the limitation of etcd operations number per transaction is 128, since segment number might be enormous so we shall split - // all save operations into batches - - // since the removal flag shall always be with the last batch, so the last batch shall be maxOperationNumber - 1 - for len(modSegments) > maxOperationsPerTxn-1 { - err := m.saveDropSegmentAndRemove(channel, modSegments, false) - if err != nil { - return err - } - } - - // removal flag should be saved with last batch - return m.saveDropSegmentAndRemove(channel, modSegments, true) -} - -func (m *meta) saveDropSegmentAndRemove(channel string, segments map[int64]*SegmentInfo, withFlag bool) error { - segmentMap := make(map[int64]*datapb.SegmentInfo) - for id, seg := range segments { - segmentMap[id] = seg.SegmentInfo - } - // TODO: RootCoord supports read-write prohibit when dropping collection // divides two api calls: save dropped segments & mark channel deleted - updateIDs, err := m.catalog.SaveDroppedSegmentsInBatch(m.ctx, segmentMap) + segments := make([]*datapb.SegmentInfo, 0) + for _, seg := range modSegments { + segments = append(segments, seg.SegmentInfo) + } + err := m.catalog.SaveDroppedSegmentsInBatch(m.ctx, segments) if err != nil { return err } - if withFlag { - err = m.catalog.MarkChannelDeleted(m.ctx, channel) - if err != nil { - return err - } + + if err = m.catalog.MarkChannelDeleted(m.ctx, channel); err != nil { + return err } // update memory info - for _, id := range updateIDs { - m.segments.SetSegment(id, segments[id]) - delete(segments, id) + for id, segment := range modSegments { + m.segments.SetSegment(id, segment) } - metrics.DataCoordNumSegments.WithLabelValues(metrics.DropedSegmentLabel).Add(float64(len(updateIDs))) + metrics.DataCoordNumSegments.WithLabelValues(metrics.DropedSegmentLabel).Add(float64(len(segments))) return nil } diff --git a/internal/metastore/catalog.go b/internal/metastore/catalog.go index 7d3cf9a831..9bc6854bf3 100644 --- a/internal/metastore/catalog.go +++ b/internal/metastore/catalog.go @@ -58,7 +58,7 @@ type DataCoordCatalog interface { AlterSegments(ctx context.Context, segments []*datapb.SegmentInfo) error // AlterSegmentsAndAddNewSegment for transaction AlterSegmentsAndAddNewSegment(ctx context.Context, segments []*datapb.SegmentInfo, newSegment *datapb.SegmentInfo) error - SaveDroppedSegmentsInBatch(ctx context.Context, modSegments map[int64]*datapb.SegmentInfo) ([]int64, error) + SaveDroppedSegmentsInBatch(ctx context.Context, segments []*datapb.SegmentInfo) error DropSegment(ctx context.Context, segment *datapb.SegmentInfo) error MarkChannelDeleted(ctx context.Context, channel string) error IsChannelDropped(ctx context.Context, channel string) bool diff --git a/internal/metastore/kv/datacoord/constant.go b/internal/metastore/kv/datacoord/constant.go index a3228339af..9395398269 100644 --- a/internal/metastore/kv/datacoord/constant.go +++ b/internal/metastore/kv/datacoord/constant.go @@ -25,6 +25,7 @@ const ( ChannelRemovePrefix = MetaPrefix + "/channel-removal" RemoveFlagTomestone = "removed" - MaxOperationsPerTxn = 64 - MaxBytesPerTxn = 1024 * 1024 + + maxOperationsPerTxn = 64 + maxBytesPerTxn = 1024 * 1024 ) diff --git a/internal/metastore/kv/datacoord/kv_catalog.go b/internal/metastore/kv/datacoord/kv_catalog.go index 3c76432ae8..16ad6ac27e 100644 --- a/internal/metastore/kv/datacoord/kv_catalog.go +++ b/internal/metastore/kv/datacoord/kv_catalog.go @@ -179,38 +179,54 @@ func (kc *Catalog) AlterSegmentsAndAddNewSegment(ctx context.Context, segments [ return nil } -func (kc *Catalog) SaveDroppedSegmentsInBatch(ctx context.Context, modSegments map[int64]*datapb.SegmentInfo) ([]int64, error) { +func (kc *Catalog) SaveDroppedSegmentsInBatch(ctx context.Context, segments []*datapb.SegmentInfo) error { kvs := make(map[string]string) - batchIDs := make([]int64, 0, MaxOperationsPerTxn) + batchIDs := make([]int64, 0, maxOperationsPerTxn) - size := 0 - for id, s := range modSegments { + multiSave := func() error { + if len(kvs) == 0 { + return nil + } + + if err := kc.Txn.MultiSave(kvs); err != nil { + log.Error("Failed to save segments in batch for DropChannel", + zap.Any("segmentIDs", batchIDs), + zap.Error(err)) + return err + } + return nil + } + + // the limitation of etcd operations number per transaction is 128, + // since segment number might be enormous, so we shall split all save operations into batches + splitCount := 0 + for _, s := range segments { key := buildSegmentPath(s.GetCollectionID(), s.GetPartitionID(), s.GetID()) segBytes, err := proto.Marshal(s) if err != nil { - return nil, fmt.Errorf("failed to marshal segment: %d, err: %w", s.GetID(), err) + return fmt.Errorf("failed to marshal segment: %d, err: %w", s.GetID(), err) } + + kvSize := len(key) + len(segBytes) + splitCount += kvSize + if len(kvs) == maxOperationsPerTxn || (len(kvs) > 0 && splitCount >= maxBytesPerTxn) { + if err := multiSave(); err != nil { + return err + } + + kvs = make(map[string]string) + batchIDs = make([]int64, 0, maxOperationsPerTxn) + + if splitCount >= maxBytesPerTxn { + splitCount = kvSize + } + } + kvs[key] = string(segBytes) batchIDs = append(batchIDs, s.ID) - size += len(key) + len(segBytes) - // remove record from map `modSegments` - delete(modSegments, id) - // batch stops when one of conditions matched: - // 1. number of records is equals MaxOperationsPerTxn - // 2. left number of modSegments is equals 1 - // 3. bytes size is greater than MaxBytesPerTxn - if len(kvs) == MaxOperationsPerTxn || len(modSegments) == 1 || size >= MaxBytesPerTxn { - break - } } - err := kc.Txn.MultiSave(kvs) - if err != nil { - log.Error("Failed to save segments in batch for DropChannel", zap.Any("segmentIDs", batchIDs), zap.Error(err)) - return nil, err - } - - return batchIDs, nil + return multiSave() } func (kc *Catalog) DropSegment(ctx context.Context, segment *datapb.SegmentInfo) error { diff --git a/internal/metastore/kv/datacoord/kv_catalog_test.go b/internal/metastore/kv/datacoord/kv_catalog_test.go index c43a56a6e1..224e77a0b2 100644 --- a/internal/metastore/kv/datacoord/kv_catalog_test.go +++ b/internal/metastore/kv/datacoord/kv_catalog_test.go @@ -3,6 +3,7 @@ package datacoord import ( "context" "errors" + "math/rand" "testing" "github.com/milvus-io/milvus/internal/kv" @@ -56,17 +57,97 @@ func Test_SaveDroppedSegmentsInBatch_SaveError(t *testing.T) { } catalog := &Catalog{txn} - segments := map[int64]*datapb.SegmentInfo{ - 1: { + segments := []*datapb.SegmentInfo{ + { ID: 1, CollectionID: 1000, }, } - ids, err := catalog.SaveDroppedSegmentsInBatch(context.TODO(), segments) - assert.Nil(t, ids) + err := catalog.SaveDroppedSegmentsInBatch(context.TODO(), segments) assert.Error(t, err) } +func Test_SaveDroppedSegmentsInBatch_MultiSave(t *testing.T) { + txn := &MockedTxnKV{} + count := 0 + kvSize := 0 + txn.multiSave = func(kvs map[string]string) error { + count++ + kvSize += len(kvs) + return nil + } + + catalog := &Catalog{txn} + + // testing for no splitting + { + segments1 := []*datapb.SegmentInfo{ + { + ID: 1, + CollectionID: 1000, + PartitionID: 100, + }, + } + + err := catalog.SaveDroppedSegmentsInBatch(context.TODO(), segments1) + assert.Nil(t, err) + assert.Equal(t, 1, count) + assert.Equal(t, 1, kvSize) + } + + // testing for reaching max operation + { + segments2 := make([]*datapb.SegmentInfo, 65) + for i := 0; i < 65; i++ { + segments2[i] = &datapb.SegmentInfo{ + ID: int64(i), + CollectionID: 1000, + PartitionID: 100, + } + } + + count = 0 + kvSize = 0 + err := catalog.SaveDroppedSegmentsInBatch(context.TODO(), segments2) + assert.Nil(t, err) + assert.Equal(t, 2, count) + assert.Equal(t, 65, kvSize) + } + + // testing for reaching max bytes size + { + segments3 := []*datapb.SegmentInfo{ + { + ID: int64(1), + CollectionID: 1000, + PartitionID: 100, + InsertChannel: randomString(1024 * 1024 * 2), + }, + { + ID: int64(2), + CollectionID: 1000, + PartitionID: 100, + InsertChannel: randomString(1024), + }, + } + + count = 0 + kvSize = 0 + err := catalog.SaveDroppedSegmentsInBatch(context.TODO(), segments3) + assert.Nil(t, err) + assert.Equal(t, 2, count) + assert.Equal(t, 2, kvSize) + } +} + +func randomString(len int) string { + bytes := make([]byte, len) + for i := 0; i < len; i++ { + bytes[i] = byte(65 + rand.Intn(25)) + } + return string(bytes) +} + func Test_MarkChannelDeleted_SaveError(t *testing.T) { txn := &MockedTxnKV{} txn.save = func(key, value string) error {