From e7a53da0252a3b2eb51d77249fa1ddae151d8299 Mon Sep 17 00:00:00 2001 From: XuanYang-cn Date: Fri, 11 Apr 2025 10:34:29 +0800 Subject: [PATCH] enhance: remove not inused util/* in datanode (#41177) See also: #41229 --------- Signed-off-by: yangxuan --- internal/datanode/data_node.go | 6 --- internal/datanode/util/cache.go | 60 ---------------------- internal/datanode/util/cache_test.go | 49 ------------------ internal/datanode/util/meta_util.go | 76 ---------------------------- 4 files changed, 191 deletions(-) delete mode 100644 internal/datanode/util/cache.go delete mode 100644 internal/datanode/util/cache_test.go delete mode 100644 internal/datanode/util/meta_util.go diff --git a/internal/datanode/data_node.go b/internal/datanode/data_node.go index 53c25c9405..f171586d39 100644 --- a/internal/datanode/data_node.go +++ b/internal/datanode/data_node.go @@ -41,7 +41,6 @@ import ( "github.com/milvus-io/milvus/internal/datanode/importv2" "github.com/milvus-io/milvus/internal/datanode/index" "github.com/milvus-io/milvus/internal/datanode/msghandlerimpl" - "github.com/milvus-io/milvus/internal/datanode/util" "github.com/milvus-io/milvus/internal/flushcommon/broker" "github.com/milvus-io/milvus/internal/flushcommon/pipeline" "github.com/milvus-io/milvus/internal/flushcommon/syncmgr" @@ -87,9 +86,6 @@ var Params *paramtable.ComponentParam = paramtable.Get() // `rootCoord` is a grpc client of root coordinator. // `dataCoord` is a grpc client of data service. // `stateCode` is current statement of this data node, indicating whether it's healthy. -// -// `clearSignal` is a signal channel for releasing the flowgraph resources. -// `segmentCache` stores all flushing and flushed segments. type DataNode struct { ctx context.Context cancel context.CancelFunc @@ -109,7 +105,6 @@ type DataNode struct { taskScheduler *index.TaskScheduler taskManager *index.TaskManager - segmentCache *util.Cache compactionExecutor compactor.Executor timeTickSender *util2.TimeTickSender channelCheckpointUpdater *util2.ChannelCheckpointUpdater @@ -156,7 +151,6 @@ func NewDataNode(ctx context.Context, factory dependency.Factory) *DataNode { rootCoord: nil, dataCoord: nil, factory: factory, - segmentCache: util.NewCache(), compactionExecutor: compactor.NewExecutor(), reportImportRetryTimes: 10, metricsRequest: metricsinfo.NewMetricsRequest(), diff --git a/internal/datanode/util/cache.go b/internal/datanode/util/cache.go deleted file mode 100644 index 4b40b41b6a..0000000000 --- a/internal/datanode/util/cache.go +++ /dev/null @@ -1,60 +0,0 @@ -// Licensed to the LF AI & Data foundation under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package util - -import ( - "github.com/milvus-io/milvus/pkg/v2/util/typeutil" -) - -// Cache stores flushing segments' ids to prevent flushing the same segment again and again. -// -// Once a segment is flushed, its id will be removed from the cache. -// -// A segment not in cache will be added into the cache when `FlushSegments` is called. -// After the flush procedure, whether the segment successfully flushed or not, -// it'll be removed from the cache. So if flush failed, the secondary flush can be triggered. -type Cache struct { - *typeutil.ConcurrentSet[typeutil.UniqueID] -} - -// NewCache returns a new Cache -func NewCache() *Cache { - return &Cache{ - ConcurrentSet: typeutil.NewConcurrentSet[typeutil.UniqueID](), - } -} - -// checkIfCached returns whether unique id is in cache -func (c *Cache) checkIfCached(key typeutil.UniqueID) bool { - return c.Contain(key) -} - -// Cache caches a specific ID into the cache -func (c *Cache) Cache(ID typeutil.UniqueID) { - c.Insert(ID) -} - -// checkOrCache returns true if `key` is present. -// Otherwise, it returns false and stores `key` into cache. -func (c *Cache) checkOrCache(key typeutil.UniqueID) bool { - return !c.Insert(key) -} - -// Remove removes a set of IDs from the cache -func (c *Cache) Remove(IDs ...typeutil.UniqueID) { - c.ConcurrentSet.Remove(IDs...) -} diff --git a/internal/datanode/util/cache_test.go b/internal/datanode/util/cache_test.go deleted file mode 100644 index 9073bb49a9..0000000000 --- a/internal/datanode/util/cache_test.go +++ /dev/null @@ -1,49 +0,0 @@ -// Licensed to the LF AI & Data foundation under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package util - -import ( - "os" - "testing" - - "github.com/stretchr/testify/assert" - - "github.com/milvus-io/milvus/pkg/v2/util/paramtable" - "github.com/milvus-io/milvus/pkg/v2/util/typeutil" -) - -func TestMain(t *testing.M) { - paramtable.Init() - code := t.Run() - os.Exit(code) -} - -func TestSegmentCache(t *testing.T) { - segCache := NewCache() - - assert.False(t, segCache.checkIfCached(0)) - - segCache.Cache(typeutil.UniqueID(0)) - assert.True(t, segCache.checkIfCached(0)) - - assert.False(t, segCache.checkOrCache(typeutil.UniqueID(1))) - assert.True(t, segCache.checkIfCached(1)) - assert.True(t, segCache.checkOrCache(typeutil.UniqueID(1))) - - segCache.Remove(typeutil.UniqueID(0)) - assert.False(t, segCache.checkIfCached(0)) -} diff --git a/internal/datanode/util/meta_util.go b/internal/datanode/util/meta_util.go deleted file mode 100644 index a7f78d82f0..0000000000 --- a/internal/datanode/util/meta_util.go +++ /dev/null @@ -1,76 +0,0 @@ -// Licensed to the LF AI & Data foundation under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package util - -import ( - "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" - "github.com/milvus-io/milvus/pkg/v2/proto/datapb" - "github.com/milvus-io/milvus/pkg/v2/proto/etcdpb" -) - -// ReviseVChannelInfo will revise the datapb.VchannelInfo for upgrade compatibility from 2.0.2 -func ReviseVChannelInfo(vChannel *datapb.VchannelInfo) { - removeDuplicateSegmentIDFn := func(ids []int64) []int64 { - result := make([]int64, 0, len(ids)) - existDict := make(map[int64]bool) - for _, id := range ids { - if _, ok := existDict[id]; !ok { - existDict[id] = true - result = append(result, id) - } - } - return result - } - - if vChannel == nil { - return - } - // if the segment infos is not nil(generated by 2.0.2), append the corresponding IDs to segmentIDs - // and remove the segment infos, remove deplicate ids in case there are some mixed situations - if vChannel.FlushedSegments != nil && len(vChannel.FlushedSegments) > 0 { - for _, segment := range vChannel.FlushedSegments { - vChannel.FlushedSegmentIds = append(vChannel.GetFlushedSegmentIds(), segment.GetID()) - } - vChannel.FlushedSegments = []*datapb.SegmentInfo{} - } - vChannel.FlushedSegmentIds = removeDuplicateSegmentIDFn(vChannel.GetFlushedSegmentIds()) - - if vChannel.UnflushedSegments != nil && len(vChannel.UnflushedSegments) > 0 { - for _, segment := range vChannel.UnflushedSegments { - vChannel.UnflushedSegmentIds = append(vChannel.GetUnflushedSegmentIds(), segment.GetID()) - } - vChannel.UnflushedSegments = []*datapb.SegmentInfo{} - } - vChannel.UnflushedSegmentIds = removeDuplicateSegmentIDFn(vChannel.GetUnflushedSegmentIds()) - - if vChannel.DroppedSegments != nil && len(vChannel.DroppedSegments) > 0 { - for _, segment := range vChannel.DroppedSegments { - vChannel.DroppedSegmentIds = append(vChannel.GetDroppedSegmentIds(), segment.GetID()) - } - vChannel.DroppedSegments = []*datapb.SegmentInfo{} - } - vChannel.DroppedSegmentIds = removeDuplicateSegmentIDFn(vChannel.GetDroppedSegmentIds()) -} - -func getPKField(meta *etcdpb.CollectionMeta) *schemapb.FieldSchema { - for _, field := range meta.Schema.Fields { - if field.GetIsPrimaryKey() { - return field - } - } - return nil -}