mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-07 09:38:39 +08:00
Load should base on the accurate loaded number (#20999)
Signed-off-by: lixinguo <xinguo.li@zilliz.com> Signed-off-by: lixinguo <xinguo.li@zilliz.com> Co-authored-by: lixinguo <xinguo.li@zilliz.com>
This commit is contained in:
parent
2f5b03fef7
commit
c49f20ea94
@ -34,9 +34,11 @@ import (
|
|||||||
type CollectionObserver struct {
|
type CollectionObserver struct {
|
||||||
stopCh chan struct{}
|
stopCh chan struct{}
|
||||||
|
|
||||||
dist *meta.DistributionManager
|
dist *meta.DistributionManager
|
||||||
meta *meta.Meta
|
meta *meta.Meta
|
||||||
targetMgr *meta.TargetManager
|
targetMgr *meta.TargetManager
|
||||||
|
collectionLoadedCount map[int64]int
|
||||||
|
partitionLoadedCount map[int64]int
|
||||||
|
|
||||||
stopOnce sync.Once
|
stopOnce sync.Once
|
||||||
}
|
}
|
||||||
@ -47,10 +49,12 @@ func NewCollectionObserver(
|
|||||||
targetMgr *meta.TargetManager,
|
targetMgr *meta.TargetManager,
|
||||||
) *CollectionObserver {
|
) *CollectionObserver {
|
||||||
return &CollectionObserver{
|
return &CollectionObserver{
|
||||||
stopCh: make(chan struct{}),
|
stopCh: make(chan struct{}),
|
||||||
dist: dist,
|
dist: dist,
|
||||||
meta: meta,
|
meta: meta,
|
||||||
targetMgr: targetMgr,
|
targetMgr: targetMgr,
|
||||||
|
collectionLoadedCount: make(map[int64]int),
|
||||||
|
partitionLoadedCount: make(map[int64]int),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -134,6 +138,7 @@ func (ob *CollectionObserver) observeLoadStatus() {
|
|||||||
collections := ob.meta.CollectionManager.GetAllCollections()
|
collections := ob.meta.CollectionManager.GetAllCollections()
|
||||||
for _, collection := range collections {
|
for _, collection := range collections {
|
||||||
if collection.LoadPercentage == 100 {
|
if collection.LoadPercentage == 100 {
|
||||||
|
delete(ob.collectionLoadedCount, collection.GetCollectionID())
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
ob.observeCollectionLoadStatus(collection)
|
ob.observeCollectionLoadStatus(collection)
|
||||||
@ -145,6 +150,7 @@ func (ob *CollectionObserver) observeLoadStatus() {
|
|||||||
}
|
}
|
||||||
for _, partition := range partitions {
|
for _, partition := range partitions {
|
||||||
if partition.LoadPercentage == 100 {
|
if partition.LoadPercentage == 100 {
|
||||||
|
delete(ob.partitionLoadedCount, partition.GetPartitionID())
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
ob.observePartitionLoadStatus(partition)
|
ob.observePartitionLoadStatus(partition)
|
||||||
@ -195,10 +201,10 @@ func (ob *CollectionObserver) observeCollectionLoadStatus(collection *meta.Colle
|
|||||||
updated.LoadPercentage = int32(loadedCount * 100 / targetNum)
|
updated.LoadPercentage = int32(loadedCount * 100 / targetNum)
|
||||||
}
|
}
|
||||||
|
|
||||||
if updated.LoadPercentage <= collection.LoadPercentage {
|
if loadedCount <= ob.collectionLoadedCount[collection.GetCollectionID()] && updated.LoadPercentage != 100 {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
ob.collectionLoadedCount[collection.GetCollectionID()] = loadedCount
|
||||||
if loadedCount >= targetNum {
|
if loadedCount >= targetNum {
|
||||||
ob.targetMgr.UpdateCollectionCurrentTarget(updated.CollectionID)
|
ob.targetMgr.UpdateCollectionCurrentTarget(updated.CollectionID)
|
||||||
updated.Status = querypb.LoadStatus_Loaded
|
updated.Status = querypb.LoadStatus_Loaded
|
||||||
@ -260,10 +266,10 @@ func (ob *CollectionObserver) observePartitionLoadStatus(partition *meta.Partiti
|
|||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if updated.LoadPercentage <= partition.LoadPercentage {
|
if loadedCount <= ob.partitionLoadedCount[partition.GetPartitionID()] && updated.LoadPercentage != 100 {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
ob.partitionLoadedCount[partition.GetPartitionID()] = loadedCount
|
||||||
if loadedCount >= targetNum {
|
if loadedCount >= targetNum {
|
||||||
ob.targetMgr.UpdateCollectionCurrentTarget(partition.GetCollectionID(), partition.GetPartitionID())
|
ob.targetMgr.UpdateCollectionCurrentTarget(partition.GetCollectionID(), partition.GetPartitionID())
|
||||||
updated.Status = querypb.LoadStatus_Loaded
|
updated.Status = querypb.LoadStatus_Loaded
|
||||||
|
|||||||
@ -67,10 +67,11 @@ type CollectionObserverSuite struct {
|
|||||||
func (suite *CollectionObserverSuite) SetupSuite() {
|
func (suite *CollectionObserverSuite) SetupSuite() {
|
||||||
Params.Init()
|
Params.Init()
|
||||||
|
|
||||||
suite.collections = []int64{100, 101}
|
suite.collections = []int64{100, 101, 102}
|
||||||
suite.partitions = map[int64][]int64{
|
suite.partitions = map[int64][]int64{
|
||||||
100: {10},
|
100: {10},
|
||||||
101: {11, 12},
|
101: {11, 12},
|
||||||
|
102: {13},
|
||||||
}
|
}
|
||||||
suite.channels = map[int64][]*meta.DmChannel{
|
suite.channels = map[int64][]*meta.DmChannel{
|
||||||
100: {
|
100: {
|
||||||
@ -93,6 +94,12 @@ func (suite *CollectionObserverSuite) SetupSuite() {
|
|||||||
ChannelName: "101-dmc1",
|
ChannelName: "101-dmc1",
|
||||||
}),
|
}),
|
||||||
},
|
},
|
||||||
|
102: {
|
||||||
|
meta.DmChannelFromVChannel(&datapb.VchannelInfo{
|
||||||
|
CollectionID: 102,
|
||||||
|
ChannelName: "102-dmc0",
|
||||||
|
}),
|
||||||
|
},
|
||||||
}
|
}
|
||||||
suite.segments = map[int64][]*datapb.SegmentInfo{
|
suite.segments = map[int64][]*datapb.SegmentInfo{
|
||||||
100: {
|
100: {
|
||||||
@ -123,18 +130,22 @@ func (suite *CollectionObserverSuite) SetupSuite() {
|
|||||||
InsertChannel: "101-dmc1",
|
InsertChannel: "101-dmc1",
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
102: genSegmentsInfo(999, 5, 102, 13, "102-dmc0"),
|
||||||
}
|
}
|
||||||
suite.loadTypes = map[int64]querypb.LoadType{
|
suite.loadTypes = map[int64]querypb.LoadType{
|
||||||
100: querypb.LoadType_LoadCollection,
|
100: querypb.LoadType_LoadCollection,
|
||||||
101: querypb.LoadType_LoadPartition,
|
101: querypb.LoadType_LoadPartition,
|
||||||
|
102: querypb.LoadType_LoadCollection,
|
||||||
}
|
}
|
||||||
suite.replicaNumber = map[int64]int32{
|
suite.replicaNumber = map[int64]int32{
|
||||||
100: 1,
|
100: 1,
|
||||||
101: 1,
|
101: 1,
|
||||||
|
102: 1,
|
||||||
}
|
}
|
||||||
suite.loadPercentage = map[int64]int32{
|
suite.loadPercentage = map[int64]int32{
|
||||||
100: 0,
|
100: 0,
|
||||||
101: 50,
|
101: 50,
|
||||||
|
102: 0,
|
||||||
}
|
}
|
||||||
suite.nodes = []int64{1, 2, 3}
|
suite.nodes = []int64{1, 2, 3}
|
||||||
}
|
}
|
||||||
@ -184,6 +195,8 @@ func (suite *CollectionObserverSuite) TestObserve() {
|
|||||||
const (
|
const (
|
||||||
timeout = 2 * time.Second
|
timeout = 2 * time.Second
|
||||||
)
|
)
|
||||||
|
// time before load
|
||||||
|
time := suite.meta.GetCollection(suite.collections[2]).UpdatedAt
|
||||||
// Not timeout
|
// Not timeout
|
||||||
Params.QueryCoordCfg.LoadTimeoutSeconds = timeout
|
Params.QueryCoordCfg.LoadTimeoutSeconds = timeout
|
||||||
suite.ob.Start(context.Background())
|
suite.ob.Start(context.Background())
|
||||||
@ -202,10 +215,20 @@ func (suite *CollectionObserverSuite) TestObserve() {
|
|||||||
Channel: "100-dmc1",
|
Channel: "100-dmc1",
|
||||||
Segments: map[int64]*querypb.SegmentDist{2: {NodeID: 2, Version: 0}},
|
Segments: map[int64]*querypb.SegmentDist{2: {NodeID: 2, Version: 0}},
|
||||||
})
|
})
|
||||||
|
suite.dist.LeaderViewManager.Update(3, &meta.LeaderView{
|
||||||
|
ID: 3,
|
||||||
|
CollectionID: 102,
|
||||||
|
Channel: "102-dmc0",
|
||||||
|
Segments: map[int64]*querypb.SegmentDist{2: {NodeID: 5, Version: 0}},
|
||||||
|
})
|
||||||
suite.Eventually(func() bool {
|
suite.Eventually(func() bool {
|
||||||
return suite.isCollectionLoaded(suite.collections[0])
|
return suite.isCollectionLoaded(suite.collections[0])
|
||||||
}, timeout*2, timeout/10)
|
}, timeout*2, timeout/10)
|
||||||
|
|
||||||
|
suite.Eventually(func() bool {
|
||||||
|
return suite.isCollectionLoadedContinue(suite.collections[2], time)
|
||||||
|
}, timeout*2, timeout/10)
|
||||||
|
|
||||||
suite.Eventually(func() bool {
|
suite.Eventually(func() bool {
|
||||||
return suite.isCollectionTimeout(suite.collections[1])
|
return suite.isCollectionTimeout(suite.collections[1])
|
||||||
}, timeout*2, timeout/10)
|
}, timeout*2, timeout/10)
|
||||||
@ -232,18 +255,23 @@ func (suite *CollectionObserverSuite) isCollectionTimeout(collection int64) bool
|
|||||||
replicas := suite.meta.ReplicaManager.GetByCollection(collection)
|
replicas := suite.meta.ReplicaManager.GetByCollection(collection)
|
||||||
channels := suite.targetMgr.GetDmChannelsByCollection(collection, meta.CurrentTarget)
|
channels := suite.targetMgr.GetDmChannelsByCollection(collection, meta.CurrentTarget)
|
||||||
segments := suite.targetMgr.GetHistoricalSegmentsByCollection(collection, meta.CurrentTarget)
|
segments := suite.targetMgr.GetHistoricalSegmentsByCollection(collection, meta.CurrentTarget)
|
||||||
|
|
||||||
return !(exist ||
|
return !(exist ||
|
||||||
len(replicas) > 0 ||
|
len(replicas) > 0 ||
|
||||||
len(channels) > 0 ||
|
len(channels) > 0 ||
|
||||||
len(segments) > 0)
|
len(segments) > 0)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (suite *CollectionObserverSuite) isCollectionLoadedContinue(collection int64, beforeTime time.Time) bool {
|
||||||
|
return suite.meta.GetCollection(collection).UpdatedAt.After(beforeTime)
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
func (suite *CollectionObserverSuite) loadAll() {
|
func (suite *CollectionObserverSuite) loadAll() {
|
||||||
for _, collection := range suite.collections {
|
for _, collection := range suite.collections {
|
||||||
suite.load(collection)
|
suite.load(collection)
|
||||||
}
|
}
|
||||||
suite.targetMgr.UpdateCollectionCurrentTarget(suite.collections[0])
|
suite.targetMgr.UpdateCollectionCurrentTarget(suite.collections[0])
|
||||||
|
suite.targetMgr.UpdateCollectionCurrentTarget(suite.collections[2])
|
||||||
}
|
}
|
||||||
|
|
||||||
func (suite *CollectionObserverSuite) load(collection int64) {
|
func (suite *CollectionObserverSuite) load(collection int64) {
|
||||||
@ -304,3 +332,17 @@ func (suite *CollectionObserverSuite) load(collection int64) {
|
|||||||
func TestCollectionObserver(t *testing.T) {
|
func TestCollectionObserver(t *testing.T) {
|
||||||
suite.Run(t, new(CollectionObserverSuite))
|
suite.Run(t, new(CollectionObserverSuite))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func genSegmentsInfo(count int, start int, collID int64, partitionID int64, insertChannel string) []*datapb.SegmentInfo {
|
||||||
|
ret := make([]*datapb.SegmentInfo, 0, count)
|
||||||
|
for i := 0; i < count; i++ {
|
||||||
|
segment := &datapb.SegmentInfo{
|
||||||
|
ID: int64(start + i),
|
||||||
|
CollectionID: collID,
|
||||||
|
PartitionID: partitionID,
|
||||||
|
InsertChannel: insertChannel,
|
||||||
|
}
|
||||||
|
ret = append(ret, segment)
|
||||||
|
}
|
||||||
|
return ret
|
||||||
|
}
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user