enhance: remove not inused util/* in datanode (#41177)

See also: #41229

---------

Signed-off-by: yangxuan <xuan.yang@zilliz.com>
This commit is contained in:
XuanYang-cn 2025-04-11 10:34:29 +08:00 committed by GitHub
parent 8c679161f8
commit e7a53da025
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 0 additions and 191 deletions

View File

@ -41,7 +41,6 @@ import (
"github.com/milvus-io/milvus/internal/datanode/importv2"
"github.com/milvus-io/milvus/internal/datanode/index"
"github.com/milvus-io/milvus/internal/datanode/msghandlerimpl"
"github.com/milvus-io/milvus/internal/datanode/util"
"github.com/milvus-io/milvus/internal/flushcommon/broker"
"github.com/milvus-io/milvus/internal/flushcommon/pipeline"
"github.com/milvus-io/milvus/internal/flushcommon/syncmgr"
@ -87,9 +86,6 @@ var Params *paramtable.ComponentParam = paramtable.Get()
// `rootCoord` is a grpc client of root coordinator.
// `dataCoord` is a grpc client of data service.
// `stateCode` is current statement of this data node, indicating whether it's healthy.
//
// `clearSignal` is a signal channel for releasing the flowgraph resources.
// `segmentCache` stores all flushing and flushed segments.
type DataNode struct {
ctx context.Context
cancel context.CancelFunc
@ -109,7 +105,6 @@ type DataNode struct {
taskScheduler *index.TaskScheduler
taskManager *index.TaskManager
segmentCache *util.Cache
compactionExecutor compactor.Executor
timeTickSender *util2.TimeTickSender
channelCheckpointUpdater *util2.ChannelCheckpointUpdater
@ -156,7 +151,6 @@ func NewDataNode(ctx context.Context, factory dependency.Factory) *DataNode {
rootCoord: nil,
dataCoord: nil,
factory: factory,
segmentCache: util.NewCache(),
compactionExecutor: compactor.NewExecutor(),
reportImportRetryTimes: 10,
metricsRequest: metricsinfo.NewMetricsRequest(),

View File

@ -1,60 +0,0 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package util
import (
"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
)
// Cache stores flushing segments' ids to prevent flushing the same segment again and again.
//
// Once a segment is flushed, its id will be removed from the cache.
//
// A segment not in cache will be added into the cache when `FlushSegments` is called.
// After the flush procedure, whether the segment successfully flushed or not,
// it'll be removed from the cache. So if flush failed, the secondary flush can be triggered.
type Cache struct {
*typeutil.ConcurrentSet[typeutil.UniqueID]
}
// NewCache returns a new Cache
func NewCache() *Cache {
return &Cache{
ConcurrentSet: typeutil.NewConcurrentSet[typeutil.UniqueID](),
}
}
// checkIfCached returns whether unique id is in cache
func (c *Cache) checkIfCached(key typeutil.UniqueID) bool {
return c.Contain(key)
}
// Cache caches a specific ID into the cache
func (c *Cache) Cache(ID typeutil.UniqueID) {
c.Insert(ID)
}
// checkOrCache returns true if `key` is present.
// Otherwise, it returns false and stores `key` into cache.
func (c *Cache) checkOrCache(key typeutil.UniqueID) bool {
return !c.Insert(key)
}
// Remove removes a set of IDs from the cache
func (c *Cache) Remove(IDs ...typeutil.UniqueID) {
c.ConcurrentSet.Remove(IDs...)
}

View File

@ -1,49 +0,0 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package util
import (
"os"
"testing"
"github.com/stretchr/testify/assert"
"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
)
func TestMain(t *testing.M) {
paramtable.Init()
code := t.Run()
os.Exit(code)
}
func TestSegmentCache(t *testing.T) {
segCache := NewCache()
assert.False(t, segCache.checkIfCached(0))
segCache.Cache(typeutil.UniqueID(0))
assert.True(t, segCache.checkIfCached(0))
assert.False(t, segCache.checkOrCache(typeutil.UniqueID(1)))
assert.True(t, segCache.checkIfCached(1))
assert.True(t, segCache.checkOrCache(typeutil.UniqueID(1)))
segCache.Remove(typeutil.UniqueID(0))
assert.False(t, segCache.checkIfCached(0))
}

View File

@ -1,76 +0,0 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package util
import (
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/pkg/v2/proto/datapb"
"github.com/milvus-io/milvus/pkg/v2/proto/etcdpb"
)
// ReviseVChannelInfo will revise the datapb.VchannelInfo for upgrade compatibility from 2.0.2
func ReviseVChannelInfo(vChannel *datapb.VchannelInfo) {
removeDuplicateSegmentIDFn := func(ids []int64) []int64 {
result := make([]int64, 0, len(ids))
existDict := make(map[int64]bool)
for _, id := range ids {
if _, ok := existDict[id]; !ok {
existDict[id] = true
result = append(result, id)
}
}
return result
}
if vChannel == nil {
return
}
// if the segment infos is not nil(generated by 2.0.2), append the corresponding IDs to segmentIDs
// and remove the segment infos, remove deplicate ids in case there are some mixed situations
if vChannel.FlushedSegments != nil && len(vChannel.FlushedSegments) > 0 {
for _, segment := range vChannel.FlushedSegments {
vChannel.FlushedSegmentIds = append(vChannel.GetFlushedSegmentIds(), segment.GetID())
}
vChannel.FlushedSegments = []*datapb.SegmentInfo{}
}
vChannel.FlushedSegmentIds = removeDuplicateSegmentIDFn(vChannel.GetFlushedSegmentIds())
if vChannel.UnflushedSegments != nil && len(vChannel.UnflushedSegments) > 0 {
for _, segment := range vChannel.UnflushedSegments {
vChannel.UnflushedSegmentIds = append(vChannel.GetUnflushedSegmentIds(), segment.GetID())
}
vChannel.UnflushedSegments = []*datapb.SegmentInfo{}
}
vChannel.UnflushedSegmentIds = removeDuplicateSegmentIDFn(vChannel.GetUnflushedSegmentIds())
if vChannel.DroppedSegments != nil && len(vChannel.DroppedSegments) > 0 {
for _, segment := range vChannel.DroppedSegments {
vChannel.DroppedSegmentIds = append(vChannel.GetDroppedSegmentIds(), segment.GetID())
}
vChannel.DroppedSegments = []*datapb.SegmentInfo{}
}
vChannel.DroppedSegmentIds = removeDuplicateSegmentIDFn(vChannel.GetDroppedSegmentIds())
}
func getPKField(meta *etcdpb.CollectionMeta) *schemapb.FieldSchema {
for _, field := range meta.Schema.Fields {
if field.GetIsPrimaryKey() {
return field
}
}
return nil
}