milvus/internal/datacoord/compaction_policy_clustering_test.go

// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package datacoord

import (
	"context"
	"testing"
	"time"

	"github.com/cockroachdb/errors"
	"github.com/stretchr/testify/mock"
	"github.com/stretchr/testify/suite"
	"go.uber.org/atomic"

	"github.com/milvus-io/milvus/internal/metastore/mocks"
	"github.com/milvus-io/milvus/internal/proto/datapb"
)

func TestClusteringCompactionPolicySuite(t *testing.T) {
	suite.Run(t, new(ClusteringCompactionPolicySuite))
}

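// ClusteringCompactionPolicySuite exercises clusteringCompactionPolicy with
// mocked dependencies standing in for the allocator, handler, catalog, and
// compaction plan context.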
type ClusteringCompactionPolicySuite struct {
	suite.Suite

	mockAlloc                  *NMockAllocator
	mockTriggerManager         *MockTriggerManager
	testLabel                  *CompactionGroupLabel
	handler                    *NMockHandler
	mockPlanContext            *MockCompactionPlanContext
	catalog                    *mocks.DataCoordCatalog
	clusteringCompactionPolicy *clusteringCompactionPolicy
}

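// SetupTest builds a fresh fixture for every test: a mocked catalog whose
// persistence calls succeed, an in-memory meta pre-populated with generated
// segments, and a clusteringCompactionPolicy wired to a mock allocator and
// handler.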
func (s *ClusteringCompactionPolicySuite) SetupTest() {
	s.testLabel = &CompactionGroupLabel{
		CollectionID: 1,
		PartitionID:  10,
		Channel:      "ch-1",
	}

	catalog := mocks.NewDataCoordCatalog(s.T())
	catalog.EXPECT().SavePartitionStatsInfo(mock.Anything, mock.Anything).Return(nil).Maybe()
	catalog.EXPECT().ListPartitionStatsInfos(mock.Anything).Return(nil, nil).Maybe()
	s.catalog = catalog

	segments := genSegmentsForMeta(s.testLabel)
	meta := &meta{segments: NewSegmentsInfo()}
	for id, segment := range segments {
		meta.segments.SetSegment(id, segment)
	}

	mockAllocator := newMockAllocator()
	mockHandler := NewNMockHandler(s.T())
	s.handler = mockHandler
	s.clusteringCompactionPolicy = newClusteringCompactionPolicy(meta, mockAllocator, mockHandler)
}

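// TestTrigger checks that Trigger reports a clustering entry even when there
// is nothing to compact: the TriggerTypeClustering key must be present and
// map to an empty, non-nil view list.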
func (s *ClusteringCompactionPolicySuite) TestTrigger() {
	events, err := s.clusteringCompactionPolicy.Trigger()
	s.NoError(err)
	gotViews, ok := events[TriggerTypeClustering]
	s.True(ok)
	s.NotNil(gotViews)
	s.Equal(0, len(gotViews))
}

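// TestTriggerOneCollectionAbnormal covers the two abnormal paths of
// triggerOneCollection: an error from handler.GetCollection propagates to the
// caller, while a missing collection (nil, nil) is skipped without error.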
func (s *ClusteringCompactionPolicySuite) TestTriggerOneCollectionAbnormal() {
	// mock error in handler.GetCollection
	s.handler.EXPECT().GetCollection(mock.Anything, mock.Anything).Return(&collectionInfo{}, errors.New("mock Error")).Once()
	views, triggerID, err := s.clusteringCompactionPolicy.triggerOneCollection(context.TODO(), 1, false)
	s.Error(err)
	s.Nil(views)
	s.Equal(int64(0), triggerID)

	// mock "collection not exist" in handler.GetCollection
	s.handler.EXPECT().GetCollection(mock.Anything, mock.Anything).Return(nil, nil).Once()
	views2, triggerID2, err2 := s.clusteringCompactionPolicy.triggerOneCollection(context.TODO(), 1, false)
	s.NoError(err2)
	s.Nil(views2)
	s.Equal(int64(0), triggerID2)
}

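// TestTriggerOneCollectionNoClusteringKeySchema uses a schema without a
// clustering key; the collection is skipped (nil views, trigger ID 0) even
// though an executing clustering compaction task is on record for it.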
func (s *ClusteringCompactionPolicySuite) TestTriggerOneCollectionNoClusteringKeySchema() {
	coll := &collectionInfo{
		ID:     100,
		Schema: newTestSchema(),
	}
	s.handler.EXPECT().GetCollection(mock.Anything, mock.Anything).Return(coll, nil)

	compactionTaskMeta := newTestCompactionTaskMeta(s.T())
	s.clusteringCompactionPolicy.meta = &meta{
		compactionTaskMeta: compactionTaskMeta,
	}
	compactionTaskMeta.SaveCompactionTask(&datapb.CompactionTask{
		TriggerID:    1,
		PlanID:       10,
		CollectionID: 100,
		State:        datapb.CompactionTaskState_executing,
	})

	views, triggerID, err := s.clusteringCompactionPolicy.triggerOneCollection(context.TODO(), 100, false)
	s.NoError(err)
	s.Nil(views)
	s.Equal(int64(0), triggerID)
}

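// TestTriggerOneCollectionCompacting uses a scalar clustering-key schema plus
// an executing compaction task; the trigger returns the existing trigger ID
// rather than producing new views.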
func (s *ClusteringCompactionPolicySuite) TestTriggerOneCollectionCompacting() {
	coll := &collectionInfo{
		ID:     100,
		Schema: newTestScalarClusteringKeySchema(),
	}
	s.handler.EXPECT().GetCollection(mock.Anything, mock.Anything).Return(coll, nil)

	compactionTaskMeta := newTestCompactionTaskMeta(s.T())
	s.clusteringCompactionPolicy.meta = &meta{
		compactionTaskMeta: compactionTaskMeta,
	}
	compactionTaskMeta.SaveCompactionTask(&datapb.CompactionTask{
		TriggerID:    1,
		PlanID:       10,
		CollectionID: 100,
		State:        datapb.CompactionTaskState_executing,
	})

	views, triggerID, err := s.clusteringCompactionPolicy.triggerOneCollection(context.TODO(), 100, false)
	s.NoError(err)
	s.Nil(views)
	s.Equal(int64(1), triggerID)
}

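// TestCollectionIsClusteringCompacting walks every CompactionTaskState and
// asserts which states count as "still compacting": pipelining, executing,
// analyzing, indexing, and meta_saved do; completed, failed, timeout, and
// cleaned do not.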
func (s *ClusteringCompactionPolicySuite) TestCollectionIsClusteringCompacting() {
	s.Run("no collection is compacting", func() {
		collID := int64(19530)
		compactionTaskMeta := newTestCompactionTaskMeta(s.T())
		s.clusteringCompactionPolicy.meta = &meta{
			compactionTaskMeta: compactionTaskMeta,
		}

		compacting, triggerID := s.clusteringCompactionPolicy.collectionIsClusteringCompacting(collID)
		s.False(compacting)
		s.Equal(int64(0), triggerID)
	})

	s.Run("collection is compacting, different state", func() {
		tests := []struct {
			state        datapb.CompactionTaskState
			isCompacting bool
			triggerID    int64
		}{
			{datapb.CompactionTaskState_pipelining, true, 1},
			{datapb.CompactionTaskState_executing, true, 1},
			{datapb.CompactionTaskState_completed, false, 1},
			{datapb.CompactionTaskState_failed, false, 1},
			{datapb.CompactionTaskState_timeout, false, 1},
			{datapb.CompactionTaskState_analyzing, true, 1},
			{datapb.CompactionTaskState_indexing, true, 1},
			{datapb.CompactionTaskState_cleaned, false, 1},
			{datapb.CompactionTaskState_meta_saved, true, 1},
		}
		for _, test := range tests {
			s.Run(test.state.String(), func() {
				collID := int64(19530)
				compactionTaskMeta := newTestCompactionTaskMeta(s.T())
				s.clusteringCompactionPolicy.meta = &meta{
					compactionTaskMeta: compactionTaskMeta,
				}
				compactionTaskMeta.SaveCompactionTask(&datapb.CompactionTask{
					TriggerID:    1,
					PlanID:       10,
					CollectionID: collID,
					State:        test.state,
				})

				compacting, triggerID := s.clusteringCompactionPolicy.collectionIsClusteringCompacting(collID)
				s.Equal(test.isCompacting, compacting)
				s.Equal(test.triggerID, triggerID)
			})
		}
	})
}

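// TestGetExpectedSegmentSize is an empty placeholder; the expected-segment-size
// calculation is not exercised here yet.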
func (s *ClusteringCompactionPolicySuite) TestGetExpectedSegmentSize() {
}

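// TestTimeIntervalLogic is a table-driven test of
// triggerClusteringCompactionPolicy: whether clustering compaction fires
// depends on how stale the latest partition stats are (via CommitTime) and on
// how much new data has arrived since the last build.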
func (s *ClusteringCompactionPolicySuite) TestTimeIntervalLogic() {
	ctx := context.TODO()
	collectionID := int64(100)
	partitionID := int64(101)
	channel := "ch1"

	tests := []struct {
		description    string
		partitionStats []*datapb.PartitionStatsInfo
		currentVersion int64
		segments       []*SegmentInfo
		succeed        bool
	}{
		{"no partition stats and not enough new data", []*datapb.PartitionStatsInfo{}, emptyPartitionStatsVersion, []*SegmentInfo{}, false},
		{"no partition stats and enough new data", []*datapb.PartitionStatsInfo{}, emptyPartitionStatsVersion, []*SegmentInfo{
			{
				size: *atomic.NewInt64(1024 * 1024 * 1024 * 10),
			},
		}, true},
		{
			"very recent partition stats and enough new data",
			[]*datapb.PartitionStatsInfo{
				{
					CollectionID: collectionID,
					PartitionID:  partitionID,
					VChannel:     channel,
					CommitTime:   time.Now().Unix(),
					Version:      100,
				},
			},
			100,
			[]*SegmentInfo{
				{
					size: *atomic.NewInt64(1024 * 1024 * 1024 * 10),
				},
			},
			false,
		},
		{
			"very old partition stats and not enough new data",
			[]*datapb.PartitionStatsInfo{
				{
					CollectionID: collectionID,
					PartitionID:  partitionID,
					VChannel:     channel,
					CommitTime:   time.Unix(1704038400, 0).Unix(), // 2024-01-01 00:00:00 +08:00
					Version:      100,
				},
			},
			100,
			[]*SegmentInfo{
				{
					size: *atomic.NewInt64(1024),
				},
			},
			true,
		},
		{
			"partition stats and enough new data",
			[]*datapb.PartitionStatsInfo{
				{
					CollectionID: collectionID,
					PartitionID:  partitionID,
					VChannel:     channel,
					CommitTime:   time.Now().Add(-3 * time.Hour).Unix(),
					SegmentIDs:   []int64{100000},
					Version:      100,
				},
			},
			100,
			[]*SegmentInfo{
				{
					SegmentInfo: &datapb.SegmentInfo{ID: 9999},
					size:        *atomic.NewInt64(1024 * 1024 * 1024 * 10),
				},
			},
			true,
		},
		{
			"partition stats and not enough new data",
			[]*datapb.PartitionStatsInfo{
				{
					CollectionID: collectionID,
					PartitionID:  partitionID,
					VChannel:     channel,
					CommitTime:   time.Now().Add(-3 * time.Hour).Unix(),
					SegmentIDs:   []int64{100000},
					Version:      100,
				},
			},
			100,
			[]*SegmentInfo{
				{
					SegmentInfo: &datapb.SegmentInfo{ID: 9999},
					size:        *atomic.NewInt64(1024),
				},
			},
			false,
		},
	}

	for _, test := range tests {
		s.Run(test.description, func() {
			partitionStatsMeta, err := newPartitionStatsMeta(ctx, s.catalog)
			s.NoError(err)
			for _, partitionStats := range test.partitionStats {
				partitionStatsMeta.SavePartitionStatsInfo(partitionStats)
			}
			if test.currentVersion != 0 {
				partitionStatsMeta.partitionStatsInfos[channel][partitionID].currentVersion = test.currentVersion
			}
			meta := &meta{
				partitionStatsMeta: partitionStatsMeta,
			}

			succeed, err := triggerClusteringCompactionPolicy(ctx, meta, collectionID, partitionID, channel, test.segments)
			s.NoError(err)
			s.Equal(test.succeed, succeed)
		})
	}
}