diff --git a/internal/datanode/importv2/pool_test.go b/internal/datanode/importv2/pool_test.go index 06873c6d31..4449a5031c 100644 --- a/internal/datanode/importv2/pool_test.go +++ b/internal/datanode/importv2/pool_test.go @@ -20,9 +20,10 @@ import ( "fmt" "testing" + "github.com/stretchr/testify/assert" + "github.com/milvus-io/milvus/pkg/config" "github.com/milvus-io/milvus/pkg/util/paramtable" - "github.com/stretchr/testify/assert" ) func TestResizePools(t *testing.T) { diff --git a/internal/metastore/mocks/mock_rootcoord_catalog.go b/internal/metastore/mocks/mock_rootcoord_catalog.go index 646eb849ae..8c35d288c1 100644 --- a/internal/metastore/mocks/mock_rootcoord_catalog.go +++ b/internal/metastore/mocks/mock_rootcoord_catalog.go @@ -1879,7 +1879,7 @@ func (_c *RootCoordCatalog_GetPrivilegeGroup_Call) Return(_a0 *milvuspb.Privileg return _c } -func (_c *RootCoordCatalog_GetPrivilegeGroup_Call) RunAndReturn(run func(context.Context, string) (*milvuspb.PrivilegeGroupInfo,error)) *RootCoordCatalog_GetPrivilegeGroup_Call { +func (_c *RootCoordCatalog_GetPrivilegeGroup_Call) RunAndReturn(run func(context.Context, string) (*milvuspb.PrivilegeGroupInfo, error)) *RootCoordCatalog_GetPrivilegeGroup_Call { _c.Call.Return(run) return _c } diff --git a/internal/querycoordv2/job/job_load.go b/internal/querycoordv2/job/job_load.go index 4f438a9da1..f7c75da2fc 100644 --- a/internal/querycoordv2/job/job_load.go +++ b/internal/querycoordv2/job/job_load.go @@ -27,11 +27,13 @@ import ( "go.opentelemetry.io/otel/trace" "go.uber.org/zap" + "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus/internal/proto/querypb" "github.com/milvus-io/milvus/internal/querycoordv2/meta" "github.com/milvus-io/milvus/internal/querycoordv2/observers" "github.com/milvus-io/milvus/internal/querycoordv2/session" "github.com/milvus-io/milvus/internal/querycoordv2/utils" + "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/eventlog" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/metrics" @@ -109,8 +111,15 @@ func (job *LoadCollectionJob) PreExecute() error { return merr.WrapErrParameterInvalid(collection.GetReplicaNumber(), req.GetReplicaNumber(), "can't change the replica number for loaded collection") } + // handle legacy proxy load request + if len(req.GetLoadFields()) == 0 { + req.LoadFields = lo.FilterMap(req.GetSchema().GetFields(), func(field *schemapb.FieldSchema, _ int) (int64, bool) { + return field.GetFieldID(), field.GetFieldID() >= common.StartOfUserFieldID + }) + } + if !funcutil.SliceSetEqual(collection.GetLoadFields(), req.GetLoadFields()) { - log.Warn("collection with different load field list exists, release this collection first before chaning its replica number", + log.Warn("collection with different load field list exists, release this collection first before chaning its load fields", zap.Int64s("loadedFieldIDs", collection.GetLoadFields()), zap.Int64s("reqFieldIDs", req.GetLoadFields()), ) @@ -314,8 +323,15 @@ func (job *LoadPartitionJob) PreExecute() error { return merr.WrapErrParameterInvalid(collection.GetReplicaNumber(), req.GetReplicaNumber(), "can't change the replica number for loaded partitions") } + // handle legacy proxy load request + if len(req.GetLoadFields()) == 0 { + req.LoadFields = lo.FilterMap(req.GetSchema().GetFields(), func(field *schemapb.FieldSchema, _ int) (int64, bool) { + return field.GetFieldID(), field.GetFieldID() >= common.StartOfUserFieldID + }) + } + if !funcutil.SliceSetEqual(collection.GetLoadFields(), req.GetLoadFields()) { - log.Warn("collection with different load field list exists, release this collection first before chaning its replica number", + log.Warn("collection with different load field list exists, release this collection first before chaning its load fields", zap.Int64s("loadedFieldIDs", collection.GetLoadFields()), zap.Int64s("reqFieldIDs", req.GetLoadFields()), ) diff --git a/internal/querycoordv2/job/job_test.go b/internal/querycoordv2/job/job_test.go index e919d28b7a..b3d0f58d62 100644 --- a/internal/querycoordv2/job/job_test.go +++ b/internal/querycoordv2/job/job_test.go @@ -27,6 +27,7 @@ import ( "github.com/stretchr/testify/suite" "github.com/milvus-io/milvus-proto/go-api/v2/rgpb" + "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" etcdkv "github.com/milvus-io/milvus/internal/kv/etcd" "github.com/milvus-io/milvus/internal/metastore" "github.com/milvus-io/milvus/internal/metastore/kv/querycoord" @@ -218,7 +219,7 @@ func (suite *JobSuite) BeforeTest(suiteName, testName string) { for collection, partitions := range suite.partitions { suite.broker.EXPECT(). GetPartitions(mock.Anything, collection). - Return(partitions, nil) + Return(partitions, nil).Maybe() } } @@ -307,32 +308,6 @@ func (suite *JobSuite) TestLoadCollection() { suite.ErrorIs(err, merr.ErrParameterInvalid) } - // Test load existed collection with different load fields - for _, collection := range suite.collections { - if suite.loadTypes[collection] != querypb.LoadType_LoadCollection { - continue - } - req := &querypb.LoadCollectionRequest{ - CollectionID: collection, - LoadFields: []int64{100, 101}, - } - job := NewLoadCollectionJob( - ctx, - req, - suite.dist, - suite.meta, - suite.broker, - suite.cluster, - suite.targetMgr, - suite.targetObserver, - suite.collectionObserver, - suite.nodeMgr, - ) - suite.scheduler.Add(job) - err := job.Wait() - suite.ErrorIs(err, merr.ErrParameterInvalid) - } - // Test load partition while collection exists for _, collection := range suite.collections { if suite.loadTypes[collection] != querypb.LoadType_LoadCollection { @@ -450,6 +425,131 @@ func (suite *JobSuite) TestLoadCollectionWithReplicas() { } } +func (suite *JobSuite) TestLoadCollectionWithLoadFields() { + ctx := context.Background() + + suite.Run("init_load", func() { + // Test load collection + for _, collection := range suite.collections { + if suite.loadTypes[collection] != querypb.LoadType_LoadCollection { + continue + } + // Load with 1 replica + req := &querypb.LoadCollectionRequest{ + CollectionID: collection, + LoadFields: []int64{100, 101, 102}, + } + job := NewLoadCollectionJob( + ctx, + req, + suite.dist, + suite.meta, + suite.broker, + suite.cluster, + suite.targetMgr, + suite.targetObserver, + suite.collectionObserver, + suite.nodeMgr, + ) + suite.scheduler.Add(job) + err := job.Wait() + suite.NoError(err) + suite.EqualValues(1, suite.meta.GetReplicaNumber(collection)) + suite.targetMgr.UpdateCollectionCurrentTarget(collection) + suite.assertCollectionLoaded(collection) + } + }) + + suite.Run("load_again_same_fields", func() { + for _, collection := range suite.collections { + if suite.loadTypes[collection] != querypb.LoadType_LoadCollection { + continue + } + req := &querypb.LoadCollectionRequest{ + CollectionID: collection, + LoadFields: []int64{102, 101, 100}, // field id order shall not matter + } + job := NewLoadCollectionJob( + ctx, + req, + suite.dist, + suite.meta, + suite.broker, + suite.cluster, + suite.targetMgr, + suite.targetObserver, + suite.collectionObserver, + suite.nodeMgr, + ) + suite.scheduler.Add(job) + err := job.Wait() + suite.NoError(err) + } + }) + + suite.Run("load_again_diff_fields", func() { + // Test load existed collection with different load fields + for _, collection := range suite.collections { + if suite.loadTypes[collection] != querypb.LoadType_LoadCollection { + continue + } + req := &querypb.LoadCollectionRequest{ + CollectionID: collection, + LoadFields: []int64{100, 101}, + } + job := NewLoadCollectionJob( + ctx, + req, + suite.dist, + suite.meta, + suite.broker, + suite.cluster, + suite.targetMgr, + suite.targetObserver, + suite.collectionObserver, + suite.nodeMgr, + ) + suite.scheduler.Add(job) + err := job.Wait() + suite.ErrorIs(err, merr.ErrParameterInvalid) + } + }) + + suite.Run("load_from_legacy_proxy", func() { + // Test load again with legacy proxy + for _, collection := range suite.collections { + if suite.loadTypes[collection] != querypb.LoadType_LoadCollection { + continue + } + req := &querypb.LoadCollectionRequest{ + CollectionID: collection, + Schema: &schemapb.CollectionSchema{ + Fields: []*schemapb.FieldSchema{ + {FieldID: 100}, + {FieldID: 101}, + {FieldID: 102}, + }, + }, + } + job := NewLoadCollectionJob( + ctx, + req, + suite.dist, + suite.meta, + suite.broker, + suite.cluster, + suite.targetMgr, + suite.targetObserver, + suite.collectionObserver, + suite.nodeMgr, + ) + suite.scheduler.Add(job) + err := job.Wait() + suite.NoError(err) + } + }) +} + func (suite *JobSuite) TestLoadPartition() { ctx := context.Background() @@ -540,34 +640,6 @@ func (suite *JobSuite) TestLoadPartition() { suite.ErrorIs(err, merr.ErrParameterInvalid) } - // Test load partition with different load fields - for _, collection := range suite.collections { - if suite.loadTypes[collection] != querypb.LoadType_LoadPartition { - continue - } - - req := &querypb.LoadPartitionsRequest{ - CollectionID: collection, - PartitionIDs: suite.partitions[collection], - LoadFields: []int64{100, 101}, - } - job := NewLoadPartitionJob( - ctx, - req, - suite.dist, - suite.meta, - suite.broker, - suite.cluster, - suite.targetMgr, - suite.targetObserver, - suite.collectionObserver, - suite.nodeMgr, - ) - suite.scheduler.Add(job) - err := job.Wait() - suite.ErrorIs(err, merr.ErrParameterInvalid) - } - // Test load partition with more partition for _, collection := range suite.collections { if suite.loadTypes[collection] != querypb.LoadType_LoadPartition { @@ -682,6 +754,140 @@ func (suite *JobSuite) TestLoadPartition() { suite.ErrorIs(err, merr.ErrResourceGroupNodeNotEnough) } +func (suite *JobSuite) TestLoadPartitionWithLoadFields() { + ctx := context.Background() + + suite.Run("init_load", func() { + // Test load partition + for _, collection := range suite.collections { + if suite.loadTypes[collection] != querypb.LoadType_LoadPartition { + continue + } + // Load with 1 replica + req := &querypb.LoadPartitionsRequest{ + CollectionID: collection, + PartitionIDs: suite.partitions[collection], + ReplicaNumber: 1, + LoadFields: []int64{100, 101, 102}, + } + job := NewLoadPartitionJob( + ctx, + req, + suite.dist, + suite.meta, + suite.broker, + suite.cluster, + suite.targetMgr, + suite.targetObserver, + suite.collectionObserver, + suite.nodeMgr, + ) + suite.scheduler.Add(job) + err := job.Wait() + suite.NoError(err) + suite.EqualValues(1, suite.meta.GetReplicaNumber(collection)) + suite.targetMgr.UpdateCollectionCurrentTarget(collection) + suite.assertCollectionLoaded(collection) + } + }) + + suite.Run("load_with_same_load_fields", func() { + for _, collection := range suite.collections { + if suite.loadTypes[collection] != querypb.LoadType_LoadPartition { + continue + } + // Load with 1 replica + req := &querypb.LoadPartitionsRequest{ + CollectionID: collection, + PartitionIDs: suite.partitions[collection], + ReplicaNumber: 1, + LoadFields: []int64{102, 101, 100}, + } + job := NewLoadPartitionJob( + ctx, + req, + suite.dist, + suite.meta, + suite.broker, + suite.cluster, + suite.targetMgr, + suite.targetObserver, + suite.collectionObserver, + suite.nodeMgr, + ) + suite.scheduler.Add(job) + err := job.Wait() + suite.NoError(err) + } + }) + + suite.Run("load_with_diff_load_fields", func() { + // Test load partition with different load fields + for _, collection := range suite.collections { + if suite.loadTypes[collection] != querypb.LoadType_LoadPartition { + continue + } + + req := &querypb.LoadPartitionsRequest{ + CollectionID: collection, + PartitionIDs: suite.partitions[collection], + LoadFields: []int64{100, 101}, + } + job := NewLoadPartitionJob( + ctx, + req, + suite.dist, + suite.meta, + suite.broker, + suite.cluster, + suite.targetMgr, + suite.targetObserver, + suite.collectionObserver, + suite.nodeMgr, + ) + suite.scheduler.Add(job) + err := job.Wait() + suite.ErrorIs(err, merr.ErrParameterInvalid) + } + }) + + suite.Run("load_legacy_proxy", func() { + for _, collection := range suite.collections { + if suite.loadTypes[collection] != querypb.LoadType_LoadPartition { + continue + } + // Load with 1 replica + req := &querypb.LoadPartitionsRequest{ + CollectionID: collection, + PartitionIDs: suite.partitions[collection], + ReplicaNumber: 1, + Schema: &schemapb.CollectionSchema{ + Fields: []*schemapb.FieldSchema{ + {FieldID: 100}, + {FieldID: 101}, + {FieldID: 102}, + }, + }, + } + job := NewLoadPartitionJob( + ctx, + req, + suite.dist, + suite.meta, + suite.broker, + suite.cluster, + suite.targetMgr, + suite.targetObserver, + suite.collectionObserver, + suite.nodeMgr, + ) + suite.scheduler.Add(job) + err := job.Wait() + suite.NoError(err) + } + }) +} + func (suite *JobSuite) TestDynamicLoad() { ctx := context.Background()