From 6387403639d5301bbc035fef44aff36df3bfa92a Mon Sep 17 00:00:00 2001 From: congqixia Date: Fri, 1 Mar 2024 13:59:02 +0800 Subject: [PATCH] fix: Prevent clone when selecting segments from meta (#30928) See also #30538 Previously the `SelectSegments` changed to clone all return value preventing possible update to returned info. Since meta is implemented following COW rules, this shall not happen and any update on segment shall have copy before it. This PR: - Remove clone for read-only Get segment info - Add Segment Operator abstraction for changing segment - Implement COW for updating MaxRowNum --------- Signed-off-by: Congqi Xia --- internal/datacoord/compaction_trigger.go | 41 ++++++--- internal/datacoord/compaction_trigger_test.go | 15 +++- internal/datacoord/meta.go | 43 ++++++++- internal/datacoord/meta_test.go | 88 +++++++++++++++++++ internal/datacoord/segment_operator.go | 30 +++++++ internal/datacoord/segment_operator_test.go | 49 +++++++++++ internal/datacoord/services.go | 2 + 7 files changed, 251 insertions(+), 17 deletions(-) create mode 100644 internal/datacoord/segment_operator.go create mode 100644 internal/datacoord/segment_operator_test.go diff --git a/internal/datacoord/compaction_trigger.go b/internal/datacoord/compaction_trigger.go index 378e5fa76d..d7aa23ae7f 100644 --- a/internal/datacoord/compaction_trigger.go +++ b/internal/datacoord/compaction_trigger.go @@ -23,6 +23,7 @@ import ( "sync" "time" + "github.com/cockroachdb/errors" "github.com/samber/lo" "go.uber.org/zap" @@ -34,6 +35,7 @@ import ( "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/indexparamcheck" "github.com/milvus-io/milvus/pkg/util/logutil" + "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/tsoutil" "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -335,6 +337,25 @@ func (t *compactionTrigger) updateSegmentMaxSize(segments []*SegmentInfo) (bool, return false }) + updateSegments := func(segments []*SegmentInfo, 
newMaxRows int64, isDiskAnn bool) error { + for idx, segmentInfo := range segments { + if segmentInfo.GetMaxRowNum() != newMaxRows { + log.Info("segment max row recalculated", + zap.Int64("segmentID", segmentInfo.GetID()), + zap.Int64("old max rows", segmentInfo.GetMaxRowNum()), + zap.Int64("new max rows", newMaxRows), + zap.Bool("isDiskANN", isDiskAnn), + ) + err := t.meta.UpdateSegment(segmentInfo.GetID(), SetMaxRowCount(newMaxRows)) + if err != nil && !errors.Is(err, merr.ErrSegmentNotFound) { + return err + } + segments[idx] = t.meta.GetSegment(segmentInfo.GetID()) + } + } + return nil + } + allDiskIndex := len(vectorFields) == len(vectorFieldsWithDiskIndex) if allDiskIndex { // Only if all vector fields index type are DiskANN, recalc segment max size here. @@ -342,13 +363,9 @@ func (t *compactionTrigger) updateSegmentMaxSize(segments []*SegmentInfo) (bool, if err != nil { return false, err } - if len(segments) > 0 && int64(newMaxRows) != segments[0].GetMaxRowNum() { - log.Info("segment max rows recalculated for DiskANN collection", - zap.Int64("old max rows", segments[0].GetMaxRowNum()), - zap.Int64("new max rows", int64(newMaxRows))) - for _, segment := range segments { - segment.MaxRowNum = int64(newMaxRows) - } + err = updateSegments(segments, int64(newMaxRows), true) + if err != nil { + return false, err } } // If some vector fields index type are not DiskANN, recalc segment max size using default policy. 
@@ -357,13 +374,9 @@ func (t *compactionTrigger) updateSegmentMaxSize(segments []*SegmentInfo) (bool, if err != nil { return allDiskIndex, err } - if len(segments) > 0 && int64(newMaxRows) != segments[0].GetMaxRowNum() { - log.Info("segment max rows recalculated for non-DiskANN collection", - zap.Int64("old max rows", segments[0].GetMaxRowNum()), - zap.Int64("new max rows", int64(newMaxRows))) - for _, segment := range segments { - segment.MaxRowNum = int64(newMaxRows) - } + err = updateSegments(segments, int64(newMaxRows), false) + if err != nil { + return false, err } } return allDiskIndex, nil diff --git a/internal/datacoord/compaction_trigger_test.go b/internal/datacoord/compaction_trigger_test.go index 6faef07a0f..dc946a903e 100644 --- a/internal/datacoord/compaction_trigger_test.go +++ b/internal/datacoord/compaction_trigger_test.go @@ -32,6 +32,7 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus/internal/metastore/model" + "github.com/milvus-io/milvus/internal/mocks" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/util/indexparamcheck" @@ -96,6 +97,10 @@ func Test_compactionTrigger_force(t *testing.T) { globalTrigger *time.Ticker } + catalog := mocks.NewDataCoordCatalog(t) + catalog.EXPECT().AlterSegment(mock.Anything, mock.Anything, mock.Anything).Return(nil).Maybe() + catalog.EXPECT().AlterSegments(mock.Anything, mock.Anything).Return(nil).Maybe() + vecFieldID := int64(201) indexID := int64(1001) tests := []struct { @@ -109,6 +114,7 @@ func Test_compactionTrigger_force(t *testing.T) { "test force compaction", fields{ &meta{ + catalog: catalog, segments: &SegmentsInfo{ map[int64]*SegmentInfo{ 1: { @@ -500,7 +506,7 @@ func Test_compactionTrigger_force(t *testing.T) { _, err := tr.forceTriggerCompaction(tt.collectionID) assert.Equal(t, tt.wantErr, err != nil) // expect max row num = 
2048*1024*1024/(128*4) = 4194304 - assert.EqualValues(t, 300, tt.fields.meta.segments.GetSegments()[0].MaxRowNum) + assert.EqualValues(t, 4194304, tt.fields.meta.segments.GetSegments()[0].MaxRowNum) spy := (tt.fields.compactionHandler).(*spyCompactionHandler) <-spy.spyChan }) @@ -2509,6 +2515,10 @@ func Test_compactionTrigger_updateSegmentMaxSize(t *testing.T) { }, } + catalog := mocks.NewDataCoordCatalog(t) + catalog.EXPECT().AlterSegment(mock.Anything, mock.Anything, mock.Anything).Return(nil).Maybe() + catalog.EXPECT().AlterSegments(mock.Anything, mock.Anything).Return(nil).Maybe() + tests := []struct { name string fields fields @@ -2519,6 +2529,7 @@ func Test_compactionTrigger_updateSegmentMaxSize(t *testing.T) { "all mem index", fields{ &meta{ + catalog: catalog, segments: segmentsInfo, collections: map[int64]*collectionInfo{ collectionID: info, @@ -2579,6 +2590,7 @@ func Test_compactionTrigger_updateSegmentMaxSize(t *testing.T) { "all disk index", fields{ &meta{ + catalog: catalog, segments: segmentsInfo, collections: map[int64]*collectionInfo{ collectionID: info, @@ -2639,6 +2651,7 @@ func Test_compactionTrigger_updateSegmentMaxSize(t *testing.T) { "some mme index", fields{ &meta{ + catalog: catalog, segments: segmentsInfo, collections: map[int64]*collectionInfo{ collectionID: info, diff --git a/internal/datacoord/meta.go b/internal/datacoord/meta.go index 3d88ada594..b67d08b2ff 100644 --- a/internal/datacoord/meta.go +++ b/internal/datacoord/meta.go @@ -344,7 +344,7 @@ func (m *meta) GetHealthySegment(segID UniqueID) *SegmentInfo { defer m.RUnlock() segment := m.segments.GetSegment(segID) if segment != nil && isSegmentHealthy(segment) { - return segment.Clone() + return segment } return nil } @@ -409,6 +409,45 @@ func (m *meta) SetState(segmentID UniqueID, targetState commonpb.SegmentState) e return nil } +func (m *meta) UpdateSegment(segmentID int64, operators ...SegmentOperator) error { + m.Lock() + defer m.Unlock() + info := 
m.segments.GetSegment(segmentID) + if info == nil { + log.Warn("meta update: UpdateSegment - segment not found", + zap.Int64("segmentID", segmentID)) + + return merr.WrapErrSegmentNotFound(segmentID) + } + // Persist segment updates first. + cloned := info.Clone() + + var updated bool + for _, operator := range operators { + updated = operator(cloned) || updated + } + + if !updated { + log.Warn("meta update:UpdateSegmnt skipped, no update", + zap.Int64("segmentID", segmentID), + ) + return nil + } + + if err := m.catalog.AlterSegments(m.ctx, []*datapb.SegmentInfo{cloned.SegmentInfo}); err != nil { + log.Warn("meta update: update segment - failed to alter segments", + zap.Int64("segmentID", segmentID), + zap.Error(err)) + return err + } + // Update in-memory meta. + m.segments.SetSegment(segmentID, cloned) + + log.Info("meta update: update segment - complete", + zap.Int64("segmentID", segmentID)) + return nil +} + // UnsetIsImporting removes the `isImporting` flag of a segment. func (m *meta) UnsetIsImporting(segmentID UniqueID) error { log.Debug("meta update: unsetting isImport state of segment", @@ -909,7 +948,7 @@ func (m *meta) SelectSegments(selector SegmentInfoSelector) []*SegmentInfo { segments := m.segments.GetSegments() for _, info := range segments { if selector(info) { - ret = append(ret, info.Clone()) + ret = append(ret, info) } } return ret diff --git a/internal/datacoord/meta_test.go b/internal/datacoord/meta_test.go index 7a9bbd621c..a079bbf256 100644 --- a/internal/datacoord/meta_test.go +++ b/internal/datacoord/meta_test.go @@ -36,6 +36,7 @@ import ( "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/metrics" + "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/testutils" ) @@ -321,6 +322,93 @@ func (suite *MetaBasicSuite) TestCompleteCompactionMutation() { suite.EqualValues(2, 
mutation.rowCountAccChange) } +func (suite *MetaBasicSuite) TestSetSegment() { + meta := suite.meta + catalog := mocks.NewDataCoordCatalog(suite.T()) + meta.catalog = catalog + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + suite.Run("normal", func() { + segmentID := int64(1000) + catalog.EXPECT().AddSegment(mock.Anything, mock.Anything).Return(nil).Once() + segment := NewSegmentInfo(&datapb.SegmentInfo{ + ID: segmentID, + MaxRowNum: 30000, + CollectionID: suite.collID, + InsertChannel: suite.channelName, + State: commonpb.SegmentState_Flushed, + }) + err := meta.AddSegment(ctx, segment) + suite.Require().NoError(err) + + noOp := func(segment *SegmentInfo) bool { + return true + } + + catalog.EXPECT().AlterSegments(mock.Anything, mock.Anything).Return(nil).Once() + + err = meta.UpdateSegment(segmentID, noOp) + suite.NoError(err) + }) + + suite.Run("not_updated", func() { + segmentID := int64(1001) + catalog.EXPECT().AddSegment(mock.Anything, mock.Anything).Return(nil).Once() + segment := NewSegmentInfo(&datapb.SegmentInfo{ + ID: segmentID, + MaxRowNum: 30000, + CollectionID: suite.collID, + InsertChannel: suite.channelName, + State: commonpb.SegmentState_Flushed, + }) + err := meta.AddSegment(ctx, segment) + suite.Require().NoError(err) + + noOp := func(segment *SegmentInfo) bool { + return false + } + + err = meta.UpdateSegment(segmentID, noOp) + suite.NoError(err) + }) + + suite.Run("catalog_error", func() { + segmentID := int64(1002) + catalog.EXPECT().AddSegment(mock.Anything, mock.Anything).Return(nil).Once() + segment := NewSegmentInfo(&datapb.SegmentInfo{ + ID: segmentID, + MaxRowNum: 30000, + CollectionID: suite.collID, + InsertChannel: suite.channelName, + State: commonpb.SegmentState_Flushed, + }) + err := meta.AddSegment(ctx, segment) + suite.Require().NoError(err) + + noOp := func(segment *SegmentInfo) bool { + return true + } + + catalog.EXPECT().AlterSegments(mock.Anything, 
mock.Anything).Return(errors.New("mocked")).Once() + + err = meta.UpdateSegment(segmentID, noOp) + suite.Error(err) + }) + + suite.Run("segment_not_found", func() { + segmentID := int64(1003) + + noOp := func(segment *SegmentInfo) bool { + return true + } + + err := meta.UpdateSegment(segmentID, noOp) + suite.Error(err) + suite.ErrorIs(err, merr.ErrSegmentNotFound) + }) +} + func TestMeta(t *testing.T) { suite.Run(t, new(MetaBasicSuite)) suite.Run(t, new(MetaReloadSuite)) diff --git a/internal/datacoord/segment_operator.go b/internal/datacoord/segment_operator.go new file mode 100644 index 0000000000..1e2c1fe4e7 --- /dev/null +++ b/internal/datacoord/segment_operator.go @@ -0,0 +1,30 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package datacoord + +// SegmentOperator is function type to update segment info. 
+type SegmentOperator func(segment *SegmentInfo) bool + +func SetMaxRowCount(maxRow int64) SegmentOperator { + return func(segment *SegmentInfo) bool { + if segment.MaxRowNum == maxRow { + return false + } + segment.MaxRowNum = maxRow + return true + } +} diff --git a/internal/datacoord/segment_operator_test.go b/internal/datacoord/segment_operator_test.go new file mode 100644 index 0000000000..7b837f45a2 --- /dev/null +++ b/internal/datacoord/segment_operator_test.go @@ -0,0 +1,49 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package datacoord + +import ( + "testing" + + "github.com/stretchr/testify/suite" + + "github.com/milvus-io/milvus/internal/proto/datapb" +) + +type TestSegmentOperatorSuite struct { + suite.Suite +} + +func (s *TestSegmentOperatorSuite) TestSetMaxRowCount() { + segment := &SegmentInfo{ + SegmentInfo: &datapb.SegmentInfo{ + MaxRowNum: 300, + }, + } + + ops := SetMaxRowCount(20000) + updated := ops(segment) + s.Require().True(updated) + s.EqualValues(20000, segment.GetMaxRowNum()) + + updated = ops(segment) + s.False(updated) +} + +func TestSegmentOperators(t *testing.T) { + suite.Run(t, new(TestSegmentOperatorSuite)) +} diff --git a/internal/datacoord/services.go b/internal/datacoord/services.go index 0921d767d5..1a9a5c578e 100644 --- a/internal/datacoord/services.go +++ b/internal/datacoord/services.go @@ -304,6 +304,8 @@ func (s *Server) GetInsertBinlogPaths(ctx context.Context, req *datapb.GetInsert }, nil } + segment = segment.Clone() + err := binlog.DecompressBinLog(storage.InsertBinlog, segment.GetCollectionID(), segment.GetPartitionID(), segment.GetID(), segment.GetBinlogs()) if err != nil { return &datapb.GetInsertBinlogPathsResponse{