enhance: [2.4] Handle legacy proxy load fields request (#37565) (#37569)

Cherry-pick from master
pr: #37565
Related to #35415

In rolling upgrade, legacy proxy may dispatch load request wit empty
load field list. The upgraded querycoord may report error by mistake
that load field list is changed.

This PR:

- Auto field empty load field list with all user field ids
- Refine the error messag when load field list updates
- Refine load job unit test with service cases

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
This commit is contained in:
congqixia 2024-11-11 14:06:29 +08:00 committed by GitHub
parent 4f4261157d
commit 2fbb157dc8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 279 additions and 57 deletions

View File

@ -27,11 +27,13 @@ import (
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/querycoordv2/meta"
"github.com/milvus-io/milvus/internal/querycoordv2/observers"
"github.com/milvus-io/milvus/internal/querycoordv2/session"
"github.com/milvus-io/milvus/internal/querycoordv2/utils"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/eventlog"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
@ -109,8 +111,15 @@ func (job *LoadCollectionJob) PreExecute() error {
return merr.WrapErrParameterInvalid(collection.GetReplicaNumber(), req.GetReplicaNumber(), "can't change the replica number for loaded collection")
}
// handle legacy proxy load request
if len(req.GetLoadFields()) == 0 {
req.LoadFields = lo.FilterMap(req.GetSchema().GetFields(), func(field *schemapb.FieldSchema, _ int) (int64, bool) {
return field.GetFieldID(), field.GetFieldID() >= common.StartOfUserFieldID
})
}
if !funcutil.SliceSetEqual(collection.GetLoadFields(), req.GetLoadFields()) {
log.Warn("collection with different load field list exists, release this collection first before chaning its replica number",
log.Warn("collection with different load field list exists, release this collection first before chaning its load fields",
zap.Int64s("loadedFieldIDs", collection.GetLoadFields()),
zap.Int64s("reqFieldIDs", req.GetLoadFields()),
)
@ -314,8 +323,15 @@ func (job *LoadPartitionJob) PreExecute() error {
return merr.WrapErrParameterInvalid(collection.GetReplicaNumber(), req.GetReplicaNumber(), "can't change the replica number for loaded partitions")
}
// handle legacy proxy load request
if len(req.GetLoadFields()) == 0 {
req.LoadFields = lo.FilterMap(req.GetSchema().GetFields(), func(field *schemapb.FieldSchema, _ int) (int64, bool) {
return field.GetFieldID(), field.GetFieldID() >= common.StartOfUserFieldID
})
}
if !funcutil.SliceSetEqual(collection.GetLoadFields(), req.GetLoadFields()) {
log.Warn("collection with different load field list exists, release this collection first before chaning its replica number",
log.Warn("collection with different load field list exists, release this collection first before chaning its load fields",
zap.Int64s("loadedFieldIDs", collection.GetLoadFields()),
zap.Int64s("reqFieldIDs", req.GetLoadFields()),
)

View File

@ -27,6 +27,7 @@ import (
"github.com/stretchr/testify/suite"
"github.com/milvus-io/milvus-proto/go-api/v2/rgpb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/metastore"
"github.com/milvus-io/milvus/internal/metastore/kv/querycoord"
@ -218,7 +219,7 @@ func (suite *JobSuite) BeforeTest(suiteName, testName string) {
for collection, partitions := range suite.partitions {
suite.broker.EXPECT().
GetPartitions(mock.Anything, collection).
Return(partitions, nil)
Return(partitions, nil).Maybe()
}
}
@ -307,32 +308,6 @@ func (suite *JobSuite) TestLoadCollection() {
suite.ErrorIs(err, merr.ErrParameterInvalid)
}
// Test load existed collection with different load fields
for _, collection := range suite.collections {
if suite.loadTypes[collection] != querypb.LoadType_LoadCollection {
continue
}
req := &querypb.LoadCollectionRequest{
CollectionID: collection,
LoadFields: []int64{100, 101},
}
job := NewLoadCollectionJob(
ctx,
req,
suite.dist,
suite.meta,
suite.broker,
suite.cluster,
suite.targetMgr,
suite.targetObserver,
suite.collectionObserver,
suite.nodeMgr,
)
suite.scheduler.Add(job)
err := job.Wait()
suite.ErrorIs(err, merr.ErrParameterInvalid)
}
// Test load partition while collection exists
for _, collection := range suite.collections {
if suite.loadTypes[collection] != querypb.LoadType_LoadCollection {
@ -450,6 +425,131 @@ func (suite *JobSuite) TestLoadCollectionWithReplicas() {
}
}
func (suite *JobSuite) TestLoadCollectionWithLoadFields() {
ctx := context.Background()
suite.Run("init_load", func() {
// Test load collection
for _, collection := range suite.collections {
if suite.loadTypes[collection] != querypb.LoadType_LoadCollection {
continue
}
// Load with 1 replica
req := &querypb.LoadCollectionRequest{
CollectionID: collection,
LoadFields: []int64{100, 101, 102},
}
job := NewLoadCollectionJob(
ctx,
req,
suite.dist,
suite.meta,
suite.broker,
suite.cluster,
suite.targetMgr,
suite.targetObserver,
suite.collectionObserver,
suite.nodeMgr,
)
suite.scheduler.Add(job)
err := job.Wait()
suite.NoError(err)
suite.EqualValues(1, suite.meta.GetReplicaNumber(collection))
suite.targetMgr.UpdateCollectionCurrentTarget(collection)
suite.assertCollectionLoaded(collection)
}
})
suite.Run("load_again_same_fields", func() {
for _, collection := range suite.collections {
if suite.loadTypes[collection] != querypb.LoadType_LoadCollection {
continue
}
req := &querypb.LoadCollectionRequest{
CollectionID: collection,
LoadFields: []int64{102, 101, 100}, // field id order shall not matter
}
job := NewLoadCollectionJob(
ctx,
req,
suite.dist,
suite.meta,
suite.broker,
suite.cluster,
suite.targetMgr,
suite.targetObserver,
suite.collectionObserver,
suite.nodeMgr,
)
suite.scheduler.Add(job)
err := job.Wait()
suite.NoError(err)
}
})
suite.Run("load_again_diff_fields", func() {
// Test load existed collection with different load fields
for _, collection := range suite.collections {
if suite.loadTypes[collection] != querypb.LoadType_LoadCollection {
continue
}
req := &querypb.LoadCollectionRequest{
CollectionID: collection,
LoadFields: []int64{100, 101},
}
job := NewLoadCollectionJob(
ctx,
req,
suite.dist,
suite.meta,
suite.broker,
suite.cluster,
suite.targetMgr,
suite.targetObserver,
suite.collectionObserver,
suite.nodeMgr,
)
suite.scheduler.Add(job)
err := job.Wait()
suite.ErrorIs(err, merr.ErrParameterInvalid)
}
})
suite.Run("load_from_legacy_proxy", func() {
// Test load again with legacy proxy
for _, collection := range suite.collections {
if suite.loadTypes[collection] != querypb.LoadType_LoadCollection {
continue
}
req := &querypb.LoadCollectionRequest{
CollectionID: collection,
Schema: &schemapb.CollectionSchema{
Fields: []*schemapb.FieldSchema{
{FieldID: 100},
{FieldID: 101},
{FieldID: 102},
},
},
}
job := NewLoadCollectionJob(
ctx,
req,
suite.dist,
suite.meta,
suite.broker,
suite.cluster,
suite.targetMgr,
suite.targetObserver,
suite.collectionObserver,
suite.nodeMgr,
)
suite.scheduler.Add(job)
err := job.Wait()
suite.NoError(err)
}
})
}
func (suite *JobSuite) TestLoadPartition() {
ctx := context.Background()
@ -540,34 +640,6 @@ func (suite *JobSuite) TestLoadPartition() {
suite.ErrorIs(err, merr.ErrParameterInvalid)
}
// Test load partition with different load fields
for _, collection := range suite.collections {
if suite.loadTypes[collection] != querypb.LoadType_LoadPartition {
continue
}
req := &querypb.LoadPartitionsRequest{
CollectionID: collection,
PartitionIDs: suite.partitions[collection],
LoadFields: []int64{100, 101},
}
job := NewLoadPartitionJob(
ctx,
req,
suite.dist,
suite.meta,
suite.broker,
suite.cluster,
suite.targetMgr,
suite.targetObserver,
suite.collectionObserver,
suite.nodeMgr,
)
suite.scheduler.Add(job)
err := job.Wait()
suite.ErrorIs(err, merr.ErrParameterInvalid)
}
// Test load partition with more partition
for _, collection := range suite.collections {
if suite.loadTypes[collection] != querypb.LoadType_LoadPartition {
@ -682,6 +754,140 @@ func (suite *JobSuite) TestLoadPartition() {
suite.ErrorIs(err, merr.ErrResourceGroupNodeNotEnough)
}
func (suite *JobSuite) TestLoadPartitionWithLoadFields() {
ctx := context.Background()
suite.Run("init_load", func() {
// Test load partition
for _, collection := range suite.collections {
if suite.loadTypes[collection] != querypb.LoadType_LoadPartition {
continue
}
// Load with 1 replica
req := &querypb.LoadPartitionsRequest{
CollectionID: collection,
PartitionIDs: suite.partitions[collection],
ReplicaNumber: 1,
LoadFields: []int64{100, 101, 102},
}
job := NewLoadPartitionJob(
ctx,
req,
suite.dist,
suite.meta,
suite.broker,
suite.cluster,
suite.targetMgr,
suite.targetObserver,
suite.collectionObserver,
suite.nodeMgr,
)
suite.scheduler.Add(job)
err := job.Wait()
suite.NoError(err)
suite.EqualValues(1, suite.meta.GetReplicaNumber(collection))
suite.targetMgr.UpdateCollectionCurrentTarget(collection)
suite.assertCollectionLoaded(collection)
}
})
suite.Run("load_with_same_load_fields", func() {
for _, collection := range suite.collections {
if suite.loadTypes[collection] != querypb.LoadType_LoadPartition {
continue
}
// Load with 1 replica
req := &querypb.LoadPartitionsRequest{
CollectionID: collection,
PartitionIDs: suite.partitions[collection],
ReplicaNumber: 1,
LoadFields: []int64{102, 101, 100},
}
job := NewLoadPartitionJob(
ctx,
req,
suite.dist,
suite.meta,
suite.broker,
suite.cluster,
suite.targetMgr,
suite.targetObserver,
suite.collectionObserver,
suite.nodeMgr,
)
suite.scheduler.Add(job)
err := job.Wait()
suite.NoError(err)
}
})
suite.Run("load_with_diff_load_fields", func() {
// Test load partition with different load fields
for _, collection := range suite.collections {
if suite.loadTypes[collection] != querypb.LoadType_LoadPartition {
continue
}
req := &querypb.LoadPartitionsRequest{
CollectionID: collection,
PartitionIDs: suite.partitions[collection],
LoadFields: []int64{100, 101},
}
job := NewLoadPartitionJob(
ctx,
req,
suite.dist,
suite.meta,
suite.broker,
suite.cluster,
suite.targetMgr,
suite.targetObserver,
suite.collectionObserver,
suite.nodeMgr,
)
suite.scheduler.Add(job)
err := job.Wait()
suite.ErrorIs(err, merr.ErrParameterInvalid)
}
})
suite.Run("load_legacy_proxy", func() {
for _, collection := range suite.collections {
if suite.loadTypes[collection] != querypb.LoadType_LoadPartition {
continue
}
// Load with 1 replica
req := &querypb.LoadPartitionsRequest{
CollectionID: collection,
PartitionIDs: suite.partitions[collection],
ReplicaNumber: 1,
Schema: &schemapb.CollectionSchema{
Fields: []*schemapb.FieldSchema{
{FieldID: 100},
{FieldID: 101},
{FieldID: 102},
},
},
}
job := NewLoadPartitionJob(
ctx,
req,
suite.dist,
suite.meta,
suite.broker,
suite.cluster,
suite.targetMgr,
suite.targetObserver,
suite.collectionObserver,
suite.nodeMgr,
)
suite.scheduler.Add(job)
err := job.Wait()
suite.NoError(err)
}
})
}
func (suite *JobSuite) TestDynamicLoad() {
ctx := context.Background()