diff --git a/internal/datanode/metacache/actions.go b/internal/datanode/metacache/actions.go new file mode 100644 index 0000000000..9522669a35 --- /dev/null +++ b/internal/datanode/metacache/actions.go @@ -0,0 +1,83 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package metacache + +import ( + "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" + "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" +) + +type SegmentFilter func(info *SegmentInfo) bool + +func WithPartitionID(partitionID int64) SegmentFilter { + return func(info *SegmentInfo) bool { + return info.partitionID == partitionID + } +} + +func WithSegmentID(segmentID int64) SegmentFilter { + return func(info *SegmentInfo) bool { + return info.segmentID == segmentID + } +} + +func WithSegmentState(state commonpb.SegmentState) SegmentFilter { + return func(info *SegmentInfo) bool { + return info.state == state + } +} + +func WithStartPosNotRecorded() SegmentFilter { + return func(info *SegmentInfo) bool { + return !info.startPosRecorded + } +} + +type SegmentAction func(info *SegmentInfo) + +func UpdateState(state commonpb.SegmentState) SegmentAction { + return func(info *SegmentInfo) { + info.state = state + } +} + +func UpdateCheckpoint(checkpoint *msgpb.MsgPosition) SegmentAction { + return func(info *SegmentInfo) { + info.checkpoint = checkpoint + } +} + +func UpdateNumOfRows(numOfRows int64) SegmentAction { + return func(info *SegmentInfo) { + info.numOfRows = numOfRows + } +} + +func RollStats() SegmentAction { + return func(info *SegmentInfo) { + info.bfs.Roll() + } +} + +// MergeSegmentAction is the util function to merge multiple SegmentActions into one. +func MergeSegmentAction(actions ...SegmentAction) SegmentAction { + return func(info *SegmentInfo) { + for _, action := range actions { + action(info) + } + } +} diff --git a/internal/datanode/metacache/actions_test.go b/internal/datanode/metacache/actions_test.go new file mode 100644 index 0000000000..bca462fdf0 --- /dev/null +++ b/internal/datanode/metacache/actions_test.go @@ -0,0 +1,121 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package metacache + +import ( + "testing" + + "github.com/stretchr/testify/suite" + + "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" + "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" +) + +type SegmentFilterSuite struct { + suite.Suite +} + +func (s *SegmentFilterSuite) TestFilters() { + info := &SegmentInfo{} + + partitionID := int64(1001) + filter := WithPartitionID(partitionID) + info.partitionID = partitionID + 1 + s.False(filter(info)) + info.partitionID = partitionID + s.True(filter(info)) + + segmentID := int64(10001) + filter = WithSegmentID(segmentID) + info.segmentID = segmentID + 1 + s.False(filter(info)) + info.segmentID = segmentID + s.True(filter(info)) + + state := commonpb.SegmentState_Growing + filter = WithSegmentState(state) + info.state = commonpb.SegmentState_Flushed + s.False(filter(info)) + info.state = state + s.True(filter(info)) + + filter = WithStartPosNotRecorded() + info.startPosRecorded = true + s.False(filter(info)) + info.startPosRecorded = false + s.True(filter(info)) +} + +func TestFilters(t *testing.T) { + suite.Run(t, new(SegmentFilterSuite)) +} + +type SegmentActionSuite struct { + suite.Suite +} + +func (s *SegmentActionSuite) TestActions() { + info := &SegmentInfo{} + + state := commonpb.SegmentState_Flushed + action := UpdateState(state) + action(info) + s.Equal(state, info.State()) + + cp := &msgpb.MsgPosition{ + MsgID: []byte{1, 2, 3, 4}, + ChannelName: "channel_1", + Timestamp: 20000, + } + action = UpdateCheckpoint(cp) + action(info) + s.Equal(cp, info.Checkpoint()) + + numOfRows := int64(2048) + action = UpdateNumOfRows(numOfRows) + action(info) + s.Equal(numOfRows, info.NumOfRows()) +} + +func (s *SegmentActionSuite) TestMergeActions() { + info := &SegmentInfo{} + + var actions []SegmentAction + state := commonpb.SegmentState_Flushed + actions = append(actions, UpdateState(state)) + + cp := &msgpb.MsgPosition{ + MsgID: []byte{1, 2, 3, 4}, + ChannelName: "channel_1", + Timestamp: 20000, + } + actions = append(actions, UpdateCheckpoint(cp)) + + numOfRows := int64(2048) + actions = append(actions, UpdateNumOfRows(numOfRows)) + + action := MergeSegmentAction(actions...) + action(info) + + s.Equal(state, info.State()) + s.Equal(numOfRows, info.NumOfRows()) + s.Equal(cp, info.Checkpoint()) +} + +func TestActions(t *testing.T) { + suite.Run(t, new(SegmentActionSuite)) +} diff --git a/internal/datanode/metacache/bloom_filter_set.go b/internal/datanode/metacache/bloom_filter_set.go new file mode 100644 index 0000000000..232d02283c --- /dev/null +++ b/internal/datanode/metacache/bloom_filter_set.go @@ -0,0 +1,80 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package metacache + +import ( + "sync" + + "github.com/bits-and-blooms/bloom/v3" + + "github.com/milvus-io/milvus/internal/storage" +) + +type BloomFilterSet struct { + mut sync.Mutex + current *storage.PkStatistics + history []*storage.PkStatistics +} + +func newBloomFilterSet() *BloomFilterSet { + return &BloomFilterSet{} +} + +func (bfs *BloomFilterSet) PkExists(pk storage.PrimaryKey) bool { + bfs.mut.Lock() + defer bfs.mut.Unlock() + if bfs.current != nil && bfs.current.PkExist(pk) { + return true + } + + for _, bf := range bfs.history { + if bf.PkExist(pk) { + return true + } + } + return false +} + +func (bfs *BloomFilterSet) UpdatePKRange(ids storage.FieldData) error { + bfs.mut.Lock() + defer bfs.mut.Unlock() + + if bfs.current == nil { + bfs.current = &storage.PkStatistics{ + PkFilter: bloom.NewWithEstimates(storage.BloomFilterSize, storage.MaxBloomFalsePositive), + } + } + + return bfs.current.UpdatePKRange(ids) +} + +func (bfs *BloomFilterSet) Roll() { + bfs.mut.Lock() + defer bfs.mut.Unlock() + + if bfs.current != nil { + bfs.history = append(bfs.history, bfs.current) + bfs.current = nil + } +} + +func (bfs *BloomFilterSet) GetHistory() []*storage.PkStatistics { + bfs.mut.Lock() + defer bfs.mut.Unlock() + + return bfs.history +} diff --git a/internal/datanode/metacache/bloom_filter_set_test.go b/internal/datanode/metacache/bloom_filter_set_test.go new file mode 100644 index 0000000000..3e0f911d16 --- /dev/null +++ b/internal/datanode/metacache/bloom_filter_set_test.go @@ -0,0 +1,93 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package metacache + +import ( + "testing" + + "github.com/stretchr/testify/suite" + + "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" + "github.com/milvus-io/milvus/internal/storage" +) + +type BloomFilterSetSuite struct { + suite.Suite + bfs *BloomFilterSet +} + +func (s *BloomFilterSetSuite) SetupTest() { + s.bfs = newBloomFilterSet() +} + +func (s *BloomFilterSetSuite) TearDownSuite() { + s.bfs = nil +} + +func (s *BloomFilterSetSuite) GetFieldData(ids []int64) storage.FieldData { + fd, err := storage.NewFieldData(schemapb.DataType_Int64, &schemapb.FieldSchema{ + FieldID: 101, + Name: "ID", + IsPrimaryKey: true, + DataType: schemapb.DataType_Int64, + }) + s.Require().NoError(err) + + for _, id := range ids { + err = fd.AppendRow(id) + s.Require().NoError(err) + } + return fd +} + +func (s *BloomFilterSetSuite) TestWriteRead() { + ids := []int64{1, 2, 3, 4, 5} + + for _, id := range ids { + s.False(s.bfs.PkExists(storage.NewInt64PrimaryKey(id)), "pk shall not exist before update") + } + + err := s.bfs.UpdatePKRange(s.GetFieldData(ids)) + s.NoError(err) + + for _, id := range ids { + s.True(s.bfs.PkExists(storage.NewInt64PrimaryKey(id)), "pk shall return exist after update") + } +} + +func (s *BloomFilterSetSuite) TestRoll() { + history := s.bfs.GetHistory() + + s.Equal(0, len(history), "history empty for new bfs") + + ids := []int64{1, 2, 3, 4, 5} + err := s.bfs.UpdatePKRange(s.GetFieldData(ids)) + s.NoError(err) + + s.bfs.Roll() + + history = s.bfs.GetHistory() + s.Equal(1, len(history), "history shall have one entry after roll with current data") + + s.bfs.Roll() + history = s.bfs.GetHistory() + s.Equal(1, len(history), "history shall have one entry after empty roll") +} + +func TestBloomFilterSet(t *testing.T) { + suite.Run(t, new(BloomFilterSetSuite)) +} diff --git a/internal/datanode/metacache/meta_cache.go b/internal/datanode/metacache/meta_cache.go index 792b6f1845..04ea5fff73 100644 --- a/internal/datanode/metacache/meta_cache.go +++ b/internal/datanode/metacache/meta_cache.go @@ -19,78 +19,69 @@ package metacache import ( "sync" - "github.com/pingcap/log" + "github.com/samber/lo" "go.uber.org/zap" + "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus/internal/proto/datapb" + "github.com/milvus-io/milvus/pkg/log" ) type MetaCache interface { NewSegment(segmentID, partitionID int64) - UpdateSegment(newSegmentID, partitionID int64, dropSegmentIDs ...int64) + UpdateSegments(action SegmentAction, filters ...SegmentFilter) + CompactSegments(newSegmentID, partitionID int64, oldSegmentIDs ...int64) + GetSegmentsBy(filters ...SegmentFilter) []*SegmentInfo GetSegmentIDsBy(filters ...SegmentFilter) []int64 } -type SegmentFilter func(info *SegmentInfo) bool +var _ MetaCache = (*metaCacheImpl)(nil) -type SegmentInfo struct { - segmentID int64 - partitionID int64 -} +type PkStatsFactory func(vchannel *datapb.SegmentInfo) *BloomFilterSet -func newSegmentInfo(segmentID, partitionID int64) *SegmentInfo { - return &SegmentInfo{ - segmentID: segmentID, - partitionID: partitionID, - } -} - -func WithPartitionID(partitionID int64) func(info *SegmentInfo) bool { - return func(info *SegmentInfo) bool { - return info.partitionID == partitionID - } -} - -var _ MetaCache = (*MetaCacheImpl)(nil) - -type MetaCacheImpl struct { +type metaCacheImpl struct { collectionID int64 vChannelName string segmentInfos map[int64]*SegmentInfo - mu sync.Mutex + mu sync.RWMutex } -func NewMetaCache(vchannel *datapb.VchannelInfo) MetaCache { - cache := &MetaCacheImpl{ +func NewMetaCache(vchannel *datapb.VchannelInfo, factory PkStatsFactory) MetaCache { + cache := &metaCacheImpl{ collectionID: vchannel.GetCollectionID(), vChannelName: vchannel.GetChannelName(), segmentInfos: make(map[int64]*SegmentInfo), } - cache.init(vchannel) + cache.init(vchannel, factory) return cache } -func (c *MetaCacheImpl) init(vchannel *datapb.VchannelInfo) { +func (c *metaCacheImpl) init(vchannel *datapb.VchannelInfo, factory PkStatsFactory) { for _, seg := range vchannel.FlushedSegments { - c.segmentInfos[seg.GetID()] = newSegmentInfo(seg.GetID(), seg.GetPartitionID()) + c.segmentInfos[seg.GetID()] = newSegmentInfo(seg, factory(seg)) } for _, seg := range vchannel.UnflushedSegments { - c.segmentInfos[seg.GetID()] = newSegmentInfo(seg.GetID(), seg.GetPartitionID()) + c.segmentInfos[seg.GetID()] = newSegmentInfo(seg, factory(seg)) } } -func (c *MetaCacheImpl) NewSegment(segmentID, partitionID int64) { +func (c *metaCacheImpl) NewSegment(segmentID, partitionID int64) { c.mu.Lock() defer c.mu.Unlock() if _, ok := c.segmentInfos[segmentID]; !ok { - c.segmentInfos[segmentID] = newSegmentInfo(segmentID, partitionID) + c.segmentInfos[segmentID] = &SegmentInfo{ + segmentID: segmentID, + partitionID: partitionID, + state: commonpb.SegmentState_Growing, + startPosRecorded: false, + } } } -func (c *MetaCacheImpl) UpdateSegment(newSegmentID, partitionID int64, dropSegmentIDs ...int64) { +func (c *metaCacheImpl) CompactSegments(newSegmentID, partitionID int64, dropSegmentIDs ...int64) { c.mu.Lock() defer c.mu.Unlock() @@ -106,15 +97,53 @@ func (c *MetaCacheImpl) UpdateSegment(newSegmentID, partitionID int64, dropSegme } if _, ok := c.segmentInfos[newSegmentID]; !ok { - c.segmentInfos[newSegmentID] = newSegmentInfo(newSegmentID, partitionID) + c.segmentInfos[newSegmentID] = &SegmentInfo{ + segmentID: newSegmentID, + partitionID: partitionID, + state: commonpb.SegmentState_Flushed, + startPosRecorded: true, + } } } -func (c *MetaCacheImpl) GetSegmentIDsBy(filters ...SegmentFilter) []int64 { +func (c *metaCacheImpl) GetSegmentsBy(filters ...SegmentFilter) []*SegmentInfo { + c.mu.RLock() + defer c.mu.RUnlock() + + filter := c.mergeFilters(filters...) + + var segments []*SegmentInfo + for _, info := range c.segmentInfos { + if filter(info) { + segments = append(segments, info) + } + } + return segments +} + +func (c *metaCacheImpl) GetSegmentIDsBy(filters ...SegmentFilter) []int64 { + segments := c.GetSegmentsBy(filters...) + return lo.Map(segments, func(info *SegmentInfo, _ int) int64 { return info.SegmentID() }) +} + +func (c *metaCacheImpl) UpdateSegments(action SegmentAction, filters ...SegmentFilter) { c.mu.Lock() defer c.mu.Unlock() - filter := func(info *SegmentInfo) bool { + filter := c.mergeFilters(filters...) + + for id, info := range c.segmentInfos { + if !filter(info) { + continue + } + nInfo := info.Clone() + action(nInfo) + c.segmentInfos[id] = nInfo + } +} + +func (c *metaCacheImpl) mergeFilters(filters ...SegmentFilter) SegmentFilter { + return func(info *SegmentInfo) bool { for _, filter := range filters { if !filter(info) { return false @@ -122,12 +151,4 @@ func (c *MetaCacheImpl) GetSegmentIDsBy(filters ...SegmentFilter) []int64 { } return true } - - segments := []int64{} - for _, info := range c.segmentInfos { - if filter(info) { - segments = append(segments, info.segmentID) - } - } - return segments } diff --git a/internal/datanode/metacache/meta_cache_test.go b/internal/datanode/metacache/meta_cache_test.go index 90e36dc965..987f07d4ed 100644 --- a/internal/datanode/metacache/meta_cache_test.go +++ b/internal/datanode/metacache/meta_cache_test.go @@ -22,6 +22,7 @@ import ( "github.com/samber/lo" "github.com/stretchr/testify/suite" + "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus/internal/proto/datapb" ) @@ -36,6 +37,8 @@ type MetaCacheSuite struct { growingSegments []int64 newSegments []int64 cache MetaCache + + bfsFactory PkStatsFactory } func (s *MetaCacheSuite) SetupSuite() { @@ -46,6 +49,9 @@ func (s *MetaCacheSuite) SetupSuite() { s.growingSegments = []int64{5, 6, 7, 8} s.newSegments = []int64{9, 10, 11, 12} s.invaliedSeg = 111 + s.bfsFactory = func(*datapb.SegmentInfo) *BloomFilterSet { + return newBloomFilterSet() + } } func (s *MetaCacheSuite) SetupTest() { @@ -53,6 +59,7 @@ func (s *MetaCacheSuite) SetupTest() { return &datapb.SegmentInfo{ ID: s.flushedSegments[i], PartitionID: s.partitionIDs[i], + State: commonpb.SegmentState_Flushed, } }) @@ -60,6 +67,7 @@ func (s *MetaCacheSuite) SetupTest() { return &datapb.SegmentInfo{ ID: s.growingSegments[i], PartitionID: s.partitionIDs[i], + State: commonpb.SegmentState_Growing, } }) @@ -68,7 +76,7 @@ func (s *MetaCacheSuite) SetupTest() { ChannelName: s.vchannel, FlushedSegments: flushSegmentInfos, UnflushedSegments: growingSegmentInfos, - }) + }, s.bfsFactory) } func (s *MetaCacheSuite) TestNewSegment() { @@ -86,10 +94,10 @@ func (s *MetaCacheSuite) TestNewSegment() { } } -func (s *MetaCacheSuite) TestUpdateSegment() { +func (s *MetaCacheSuite) TestCompactSegments() { for i, seg := range s.newSegments { // compaction from flushed[i], unflushed[i] and invalidSeg to new[i] - s.cache.UpdateSegment(seg, s.partitionIDs[i], s.flushedSegments[i], s.growingSegments[i], s.invaliedSeg) + s.cache.CompactSegments(seg, s.partitionIDs[i], s.flushedSegments[i], s.growingSegments[i], s.invaliedSeg) } for i, partitionID := range s.partitionIDs { @@ -101,6 +109,14 @@ func (s *MetaCacheSuite) TestUpdateSegment() { } } +func (s *MetaCacheSuite) TestUpdateSegments() { + s.cache.UpdateSegments(UpdateState(commonpb.SegmentState_Flushed), WithSegmentID(5)) + segments := s.cache.GetSegmentsBy(WithSegmentID(5)) + s.Require().Equal(1, len(segments)) + segment := segments[0] + s.Equal(commonpb.SegmentState_Flushed, segment.State()) +} + func TestMetaCacheSuite(t *testing.T) { suite.Run(t, new(MetaCacheSuite)) } diff --git a/internal/datanode/metacache/segment.go b/internal/datanode/metacache/segment.go new file mode 100644 index 0000000000..aad3e06e7c --- /dev/null +++ b/internal/datanode/metacache/segment.go @@ -0,0 +1,89 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package metacache + +import ( + "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" + "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" + "github.com/milvus-io/milvus/internal/proto/datapb" + "github.com/milvus-io/milvus/internal/storage" +) + +type SegmentInfo struct { + segmentID int64 + partitionID int64 + state commonpb.SegmentState + startPosition *msgpb.MsgPosition + checkpoint *msgpb.MsgPosition + startPosRecorded bool + numOfRows int64 + bfs *BloomFilterSet +} + +func (s *SegmentInfo) SegmentID() int64 { + return s.segmentID +} + +func (s *SegmentInfo) PartitionID() int64 { + return s.partitionID +} + +func (s *SegmentInfo) State() commonpb.SegmentState { + return s.state +} + +func (s *SegmentInfo) NumOfRows() int64 { + return s.numOfRows +} + +func (s *SegmentInfo) StartPosition() *msgpb.MsgPosition { + return s.startPosition +} + +func (s *SegmentInfo) Checkpoint() *msgpb.MsgPosition { + return s.checkpoint +} + +func (s *SegmentInfo) GetHistory() []*storage.PkStatistics { + return s.bfs.GetHistory() +} + +func (s *SegmentInfo) Clone() *SegmentInfo { + return &SegmentInfo{ + segmentID: s.segmentID, + partitionID: s.partitionID, + state: s.state, + startPosition: s.startPosition, + checkpoint: s.checkpoint, + startPosRecorded: s.startPosRecorded, + numOfRows: s.numOfRows, + bfs: s.bfs, + } +} + +func newSegmentInfo(info *datapb.SegmentInfo, bfs *BloomFilterSet) *SegmentInfo { + return &SegmentInfo{ + segmentID: info.GetID(), + partitionID: info.GetPartitionID(), + state: info.GetState(), + numOfRows: info.GetNumOfRows(), + startPosition: info.GetStartPosition(), + checkpoint: info.GetDmlPosition(), + startPosRecorded: true, + bfs: bfs, + } +} diff --git a/internal/datanode/metacache/segment_test.go b/internal/datanode/metacache/segment_test.go new file mode 100644 index 0000000000..72d775080e --- /dev/null +++ b/internal/datanode/metacache/segment_test.go @@ -0,0 +1,60 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package metacache + +import ( + "testing" + + "github.com/stretchr/testify/suite" + + "github.com/milvus-io/milvus/internal/proto/datapb" +) + +type SegmentSuite struct { + suite.Suite + + info *datapb.SegmentInfo +} + +func (s *SegmentSuite) TestBasic() { + bfs := newBloomFilterSet() + segment := newSegmentInfo(s.info, bfs) + s.Equal(s.info.GetID(), segment.SegmentID()) + s.Equal(s.info.GetPartitionID(), segment.PartitionID()) + s.Equal(s.info.GetNumOfRows(), segment.NumOfRows()) + s.Equal(s.info.GetStartPosition(), segment.StartPosition()) + s.Equal(s.info.GetDmlPosition(), segment.Checkpoint()) + s.Equal(bfs.GetHistory(), segment.GetHistory()) + s.True(segment.startPosRecorded) +} + +func (s *SegmentSuite) TestClone() { + bfs := newBloomFilterSet() + segment := newSegmentInfo(s.info, bfs) + cloned := segment.Clone() + s.Equal(segment.SegmentID(), cloned.SegmentID()) + s.Equal(segment.PartitionID(), cloned.PartitionID()) + s.Equal(segment.NumOfRows(), cloned.NumOfRows()) + s.Equal(segment.StartPosition(), cloned.StartPosition()) + s.Equal(segment.Checkpoint(), cloned.Checkpoint()) + s.Equal(segment.GetHistory(), cloned.GetHistory()) + s.Equal(segment.startPosRecorded, cloned.startPosRecorded) +} + +func TestSegment(t *testing.T) { + suite.Run(t, new(SegmentSuite)) +}