From 596357dc852fe3b70f80b8fef17296d08b9d9ca6 Mon Sep 17 00:00:00 2001 From: dragondriver Date: Thu, 9 Sep 2021 19:02:08 +0800 Subject: [PATCH] Add more data definition test cases (#7623) Signed-off-by: dragondriver --- internal/proxy/channels_mgr_test.go | 20 +- internal/proxy/impl.go | 40 +- internal/proxy/{mock.go => mock_test.go} | 110 +++++ internal/proxy/msgstream_mock.go | 127 ------ internal/proxy/proxy_test.go | 532 ++++++++++++++++++++++- internal/proxy/task.go | 400 ++++++++--------- internal/proxy/task_scheduler_test.go | 2 +- internal/util/flowgraph/node.go | 9 +- 8 files changed, 879 insertions(+), 361 deletions(-) rename internal/proxy/{mock.go => mock_test.go} (68%) delete mode 100644 internal/proxy/msgstream_mock.go diff --git a/internal/proxy/channels_mgr_test.go b/internal/proxy/channels_mgr_test.go index 5accf90a6c..bb2da0bde4 100644 --- a/internal/proxy/channels_mgr_test.go +++ b/internal/proxy/channels_mgr_test.go @@ -22,7 +22,7 @@ import ( func TestChannelsMgrImpl_getChannels(t *testing.T) { master := newMockGetChannelsService() query := newMockGetChannelsService() - factory := NewSimpleMsgStreamFactory() + factory := newSimpleMockMsgStreamFactory() mgr := newChannelsMgrImpl(master.GetChannels, nil, query.GetChannels, nil, factory) defer mgr.removeAllDMLStream() @@ -40,7 +40,7 @@ func TestChannelsMgrImpl_getChannels(t *testing.T) { func TestChannelsMgrImpl_getVChannels(t *testing.T) { master := newMockGetChannelsService() query := newMockGetChannelsService() - factory := NewSimpleMsgStreamFactory() + factory := newSimpleMockMsgStreamFactory() mgr := newChannelsMgrImpl(master.GetChannels, nil, query.GetChannels, nil, factory) defer mgr.removeAllDMLStream() @@ -58,7 +58,7 @@ func TestChannelsMgrImpl_getVChannels(t *testing.T) { func TestChannelsMgrImpl_createDMLMsgStream(t *testing.T) { master := newMockGetChannelsService() query := newMockGetChannelsService() - factory := NewSimpleMsgStreamFactory() + factory := newSimpleMockMsgStreamFactory() mgr := newChannelsMgrImpl(master.GetChannels, nil, query.GetChannels, nil, factory) defer mgr.removeAllDMLStream() @@ -80,7 +80,7 @@ func TestChannelsMgrImpl_createDMLMsgStream(t *testing.T) { func TestChannelsMgrImpl_getDMLMsgStream(t *testing.T) { master := newMockGetChannelsService() query := newMockGetChannelsService() - factory := NewSimpleMsgStreamFactory() + factory := newSimpleMockMsgStreamFactory() mgr := newChannelsMgrImpl(master.GetChannels, nil, query.GetChannels, nil, factory) defer mgr.removeAllDMLStream() @@ -98,7 +98,7 @@ func TestChannelsMgrImpl_getDMLMsgStream(t *testing.T) { func TestChannelsMgrImpl_removeDMLMsgStream(t *testing.T) { master := newMockGetChannelsService() query := newMockGetChannelsService() - factory := NewSimpleMsgStreamFactory() + factory := newSimpleMockMsgStreamFactory() mgr := newChannelsMgrImpl(master.GetChannels, nil, query.GetChannels, nil, factory) defer mgr.removeAllDMLStream() @@ -125,7 +125,7 @@ func TestChannelsMgrImpl_removeDMLMsgStream(t *testing.T) { func TestChannelsMgrImpl_removeAllDMLMsgStream(t *testing.T) { master := newMockGetChannelsService() query := newMockGetChannelsService() - factory := NewSimpleMsgStreamFactory() + factory := newSimpleMockMsgStreamFactory() mgr := newChannelsMgrImpl(master.GetChannels, nil, query.GetChannels, nil, factory) defer mgr.removeAllDMLStream() @@ -140,7 +140,7 @@ func TestChannelsMgrImpl_removeAllDMLMsgStream(t *testing.T) { func TestChannelsMgrImpl_createDQLMsgStream(t *testing.T) { master := newMockGetChannelsService() query := newMockGetChannelsService() - factory := NewSimpleMsgStreamFactory() + factory := newSimpleMockMsgStreamFactory() mgr := newChannelsMgrImpl(master.GetChannels, nil, query.GetChannels, nil, factory) defer mgr.removeAllDMLStream() @@ -153,7 +153,7 @@ func TestChannelsMgrImpl_createDQLMsgStream(t *testing.T) { func TestChannelsMgrImpl_getDQLMsgStream(t *testing.T) { master := newMockGetChannelsService() query := newMockGetChannelsService() - factory := NewSimpleMsgStreamFactory() + factory := newSimpleMockMsgStreamFactory() mgr := newChannelsMgrImpl(master.GetChannels, nil, query.GetChannels, nil, factory) defer mgr.removeAllDMLStream() @@ -171,7 +171,7 @@ func TestChannelsMgrImpl_getDQLMsgStream(t *testing.T) { func TestChannelsMgrImpl_removeDQLMsgStream(t *testing.T) { master := newMockGetChannelsService() query := newMockGetChannelsService() - factory := NewSimpleMsgStreamFactory() + factory := newSimpleMockMsgStreamFactory() mgr := newChannelsMgrImpl(master.GetChannels, nil, query.GetChannels, nil, factory) defer mgr.removeAllDMLStream() @@ -198,7 +198,7 @@ func TestChannelsMgrImpl_removeDQLMsgStream(t *testing.T) { func TestChannelsMgrImpl_removeAllDQLMsgStream(t *testing.T) { master := newMockGetChannelsService() query := newMockGetChannelsService() - factory := NewSimpleMsgStreamFactory() + factory := newSimpleMockMsgStreamFactory() mgr := newChannelsMgrImpl(master.GetChannels, nil, query.GetChannels, nil, factory) defer mgr.removeAllDMLStream() diff --git a/internal/proxy/impl.go b/internal/proxy/impl.go index 2263397914..444c940a16 100644 --- a/internal/proxy/impl.go +++ b/internal/proxy/impl.go @@ -121,7 +121,7 @@ func (node *Proxy) CreateCollection(ctx context.Context, request *milvuspb.Creat if !node.checkHealthy() { return unhealthyStatus(), nil } - cct := &CreateCollectionTask{ + cct := &createCollectionTask{ ctx: ctx, Condition: NewTaskCondition(ctx), CreateCollectionRequest: request, @@ -174,7 +174,7 @@ func (node *Proxy) DropCollection(ctx context.Context, request *milvuspb.DropCol if !node.checkHealthy() { return unhealthyStatus(), nil } - dct := &DropCollectionTask{ + dct := &dropCollectionTask{ ctx: ctx, Condition: NewTaskCondition(ctx), DropCollectionRequest: request, @@ -228,7 +228,7 @@ func (node *Proxy) HasCollection(ctx context.Context, request *milvuspb.HasColle Status: unhealthyStatus(), }, nil } - hct := &HasCollectionTask{ + hct := &hasCollectionTask{ ctx: ctx, Condition: NewTaskCondition(ctx), HasCollectionRequest: request, @@ -282,7 +282,7 @@ func (node *Proxy) LoadCollection(ctx context.Context, request *milvuspb.LoadCol if !node.checkHealthy() { return unhealthyStatus(), nil } - lct := &LoadCollectionTask{ + lct := &loadCollectionTask{ ctx: ctx, Condition: NewTaskCondition(ctx), LoadCollectionRequest: request, @@ -332,7 +332,7 @@ func (node *Proxy) ReleaseCollection(ctx context.Context, request *milvuspb.Rele if !node.checkHealthy() { return unhealthyStatus(), nil } - rct := &ReleaseCollectionTask{ + rct := &releaseCollectionTask{ ctx: ctx, Condition: NewTaskCondition(ctx), ReleaseCollectionRequest: request, @@ -385,7 +385,7 @@ func (node *Proxy) DescribeCollection(ctx context.Context, request *milvuspb.Des Status: unhealthyStatus(), }, nil } - dct := &DescribeCollectionTask{ + dct := &describeCollectionTask{ ctx: ctx, Condition: NewTaskCondition(ctx), DescribeCollectionRequest: request, @@ -441,7 +441,7 @@ func (node *Proxy) GetCollectionStatistics(ctx context.Context, request *milvusp Status: unhealthyStatus(), }, nil } - g := &GetCollectionStatisticsTask{ + g := &getCollectionStatisticsTask{ ctx: ctx, Condition: NewTaskCondition(ctx), GetCollectionStatisticsRequest: request, @@ -497,7 +497,7 @@ func (node *Proxy) ShowCollections(ctx context.Context, request *milvuspb.ShowCo Status: unhealthyStatus(), }, nil } - sct := &ShowCollectionsTask{ + sct := &showCollectionsTask{ ctx: ctx, Condition: NewTaskCondition(ctx), ShowCollectionsRequest: request, @@ -520,7 +520,10 @@ func (node *Proxy) ShowCollections(ctx context.Context, request *milvuspb.ShowCo log.Debug("ShowCollections", zap.String("role", Params.RoleName), - zap.Any("request", request)) + zap.String("DbName", sct.ShowCollectionsRequest.DbName), + zap.String("ShowType", sct.ShowCollectionsRequest.Type.String()), + zap.Any("CollectionNames", sct.ShowCollectionsRequest.CollectionNames), + ) defer func() { log.Debug("ShowCollections Done", zap.Error(err), @@ -546,7 +549,7 @@ func (node *Proxy) CreatePartition(ctx context.Context, request *milvuspb.Create if !node.checkHealthy() { return unhealthyStatus(), nil } - cpt := &CreatePartitionTask{ + cpt := &createPartitionTask{ ctx: ctx, Condition: NewTaskCondition(ctx), CreatePartitionRequest: request, @@ -599,7 +602,7 @@ func (node *Proxy) DropPartition(ctx context.Context, request *milvuspb.DropPart if !node.checkHealthy() { return unhealthyStatus(), nil } - dpt := &DropPartitionTask{ + dpt := &dropPartitionTask{ ctx: ctx, Condition: NewTaskCondition(ctx), DropPartitionRequest: request, @@ -654,7 +657,7 @@ func (node *Proxy) HasPartition(ctx context.Context, request *milvuspb.HasPartit Status: unhealthyStatus(), }, nil } - hpt := &HasPartitionTask{ + hpt := &hasPartitionTask{ ctx: ctx, Condition: NewTaskCondition(ctx), HasPartitionRequest: request, @@ -713,7 +716,7 @@ func (node *Proxy) LoadPartitions(ctx context.Context, request *milvuspb.LoadPar if !node.checkHealthy() { return unhealthyStatus(), nil } - lpt := &LoadPartitionTask{ + lpt := &loadPartitionsTask{ ctx: ctx, Condition: NewTaskCondition(ctx), LoadPartitionsRequest: request, @@ -766,7 +769,7 @@ func (node *Proxy) ReleasePartitions(ctx context.Context, request *milvuspb.Rele if !node.checkHealthy() { return unhealthyStatus(), nil } - rpt := &ReleasePartitionTask{ + rpt := &releasePartitionsTask{ ctx: ctx, Condition: NewTaskCondition(ctx), ReleasePartitionsRequest: request, @@ -821,7 +824,7 @@ func (node *Proxy) GetPartitionStatistics(ctx context.Context, request *milvuspb Status: unhealthyStatus(), }, nil } - g := &GetPartitionStatisticsTask{ + g := &getPartitionStatisticsTask{ ctx: ctx, Condition: NewTaskCondition(ctx), GetPartitionStatisticsRequest: request, @@ -880,7 +883,7 @@ func (node *Proxy) ShowPartitions(ctx context.Context, request *milvuspb.ShowPar Status: unhealthyStatus(), }, nil } - spt := &ShowPartitionsTask{ + spt := &showPartitionsTask{ ctx: ctx, Condition: NewTaskCondition(ctx), ShowPartitionsRequest: request, @@ -904,7 +907,10 @@ func (node *Proxy) ShowPartitions(ctx context.Context, request *milvuspb.ShowPar log.Debug("ShowPartitions", zap.String("role", Params.RoleName), - zap.Any("request", request)) + zap.String("db", spt.ShowPartitionsRequest.DbName), + zap.String("collection", spt.ShowPartitionsRequest.CollectionName), + zap.Any("partitions", spt.ShowPartitionsRequest.PartitionNames), + ) defer func() { log.Debug("ShowPartitions Done", zap.Error(err), diff --git a/internal/proxy/mock.go b/internal/proxy/mock_test.go similarity index 68% rename from internal/proxy/mock.go rename to internal/proxy/mock_test.go index 03f67f0693..89997b12a0 100644 --- a/internal/proxy/mock.go +++ b/internal/proxy/mock_test.go @@ -17,6 +17,8 @@ import ( "sync" "time" + "github.com/milvus-io/milvus/internal/msgstream" + "github.com/milvus-io/milvus/internal/util/funcutil" "github.com/milvus-io/milvus/internal/util/uniquegenerator" @@ -243,3 +245,111 @@ func newMockDqlTask(ctx context.Context) *mockDqlTask { func newDefaultMockDqlTask() *mockDqlTask { return newMockDqlTask(context.Background()) } + +type simpleMockMsgStream struct { + msgChan chan *msgstream.MsgPack + + msgCount int + msgCountMtx sync.RWMutex +} + +func (ms *simpleMockMsgStream) Start() { +} + +func (ms *simpleMockMsgStream) Close() { +} + +func (ms *simpleMockMsgStream) Chan() <-chan *msgstream.MsgPack { + return ms.msgChan +} + +func (ms *simpleMockMsgStream) AsProducer(channels []string) { +} + +func (ms *simpleMockMsgStream) AsConsumer(channels []string, subName string) { +} + +func (ms *simpleMockMsgStream) ComputeProduceChannelIndexes(tsMsgs []msgstream.TsMsg) [][]int32 { + return nil +} + +func (ms *simpleMockMsgStream) SetRepackFunc(repackFunc msgstream.RepackFunc) { +} + +func (ms *simpleMockMsgStream) getMsgCount() int { + ms.msgCountMtx.RLock() + defer ms.msgCountMtx.RUnlock() + + return ms.msgCount +} + +func (ms *simpleMockMsgStream) increaseMsgCount(delta int) { + ms.msgCountMtx.Lock() + defer ms.msgCountMtx.Unlock() + + ms.msgCount += delta +} + +func (ms *simpleMockMsgStream) decreaseMsgCount(delta int) { + ms.increaseMsgCount(-delta) +} + +func (ms *simpleMockMsgStream) Produce(pack *msgstream.MsgPack) error { + defer ms.increaseMsgCount(1) + + ms.msgChan <- pack + + return nil +} + +func (ms *simpleMockMsgStream) Broadcast(pack *msgstream.MsgPack) error { + return nil +} + +func (ms *simpleMockMsgStream) GetProduceChannels() []string { + return nil +} + +func (ms *simpleMockMsgStream) Consume() *msgstream.MsgPack { + if ms.getMsgCount() <= 0 { + return nil + } + + defer ms.decreaseMsgCount(1) + + return <-ms.msgChan +} + +func (ms *simpleMockMsgStream) Seek(offset []*msgstream.MsgPosition) error { + return nil +} + +func newSimpleMockMsgStream() *simpleMockMsgStream { + return &simpleMockMsgStream{ + msgChan: make(chan *msgstream.MsgPack, 1024), + msgCount: 0, + } +} + +type simpleMockMsgStreamFactory struct { +} + +func (factory *simpleMockMsgStreamFactory) SetParams(params map[string]interface{}) error { + return nil +} + +func (factory *simpleMockMsgStreamFactory) NewMsgStream(ctx context.Context) (msgstream.MsgStream, error) { + return newSimpleMockMsgStream(), nil +} + +func (factory *simpleMockMsgStreamFactory) NewTtMsgStream(ctx context.Context) (msgstream.MsgStream, error) { + return newSimpleMockMsgStream(), nil +} + +func (factory *simpleMockMsgStreamFactory) NewQueryMsgStream(ctx context.Context) (msgstream.MsgStream, error) { + return newSimpleMockMsgStream(), nil +} + +func newSimpleMockMsgStreamFactory() *simpleMockMsgStreamFactory { + return &simpleMockMsgStreamFactory{} +} diff --git a/internal/proxy/msgstream_mock.go b/internal/proxy/msgstream_mock.go deleted file mode 100644 index 845a8fb76f..0000000000 --- a/internal/proxy/msgstream_mock.go +++ /dev/null @@ -1,127 +0,0 @@ -// Copyright (C) 2019-2020 Zilliz. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software distributed under the License -// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express -// or implied. See the License for the specific language governing permissions and limitations under the License. - -package proxy - -import ( - "context" - "sync" - - "github.com/milvus-io/milvus/internal/msgstream" -) - -type SimpleMsgStream struct { - msgChan chan *msgstream.MsgPack - - msgCount int - msgCountMtx sync.RWMutex -} - -func (ms *SimpleMsgStream) Start() { -} - -func (ms *SimpleMsgStream) Close() { -} - -func (ms *SimpleMsgStream) Chan() <-chan *msgstream.MsgPack { - return ms.msgChan -} - -func (ms *SimpleMsgStream) AsProducer(channels []string) { -} - -func (ms *SimpleMsgStream) AsConsumer(channels []string, subName string) { -} - -func (ms *SimpleMsgStream) ComputeProduceChannelIndexes(tsMsgs []msgstream.TsMsg) [][]int32 { - return nil -} - -func (ms *SimpleMsgStream) SetRepackFunc(repackFunc msgstream.RepackFunc) { -} - -func (ms *SimpleMsgStream) getMsgCount() int { - ms.msgCountMtx.RLock() - defer ms.msgCountMtx.RUnlock() - - return ms.msgCount -} - -func (ms *SimpleMsgStream) increaseMsgCount(delta int) { - ms.msgCountMtx.Lock() - defer ms.msgCountMtx.Unlock() - - ms.msgCount += delta -} - -func (ms *SimpleMsgStream) decreaseMsgCount(delta int) { - ms.increaseMsgCount(-delta) -} - -func (ms *SimpleMsgStream) Produce(pack *msgstream.MsgPack) error { - defer ms.increaseMsgCount(1) - - ms.msgChan <- pack - - return nil -} - -func (ms *SimpleMsgStream) Broadcast(pack *msgstream.MsgPack) error { - return nil -} - -func (ms *SimpleMsgStream) GetProduceChannels() []string { - return nil -} - -func (ms *SimpleMsgStream) Consume() *msgstream.MsgPack { - if ms.getMsgCount() <= 0 { - return nil - } - - defer ms.decreaseMsgCount(1) - - return <-ms.msgChan -} - -func (ms *SimpleMsgStream) Seek(offset []*msgstream.MsgPosition) error { - return nil -} - -func NewSimpleMsgStream() *SimpleMsgStream { - return &SimpleMsgStream{ - msgChan: make(chan *msgstream.MsgPack, 1024), - msgCount: 0, - } -} - -type SimpleMsgStreamFactory struct { -} - -func (factory *SimpleMsgStreamFactory) SetParams(params map[string]interface{}) error { - return nil -} - -func (factory *SimpleMsgStreamFactory) NewMsgStream(ctx context.Context) (msgstream.MsgStream, error) { - return NewSimpleMsgStream(), nil -} - -func (factory *SimpleMsgStreamFactory) NewTtMsgStream(ctx context.Context) (msgstream.MsgStream, error) { - return NewSimpleMsgStream(), nil -} - -func (factory *SimpleMsgStreamFactory) NewQueryMsgStream(ctx context.Context) (msgstream.MsgStream, error) { - return NewSimpleMsgStream(), nil -} - -func NewSimpleMsgStreamFactory() *SimpleMsgStreamFactory { - return &SimpleMsgStreamFactory{} -} diff --git a/internal/proxy/proxy_test.go b/internal/proxy/proxy_test.go index cf19773f36..f85042bc61 100644 --- a/internal/proxy/proxy_test.go +++ b/internal/proxy/proxy_test.go @@ -459,6 +459,9 @@ func TestProxy(t *testing.T) { prefix := "test_proxy_" dbName := "" collectionName := prefix + funcutil.GenRandomStr() + otherCollectionName := collectionName + funcutil.GenRandomStr() + partitionName := prefix + funcutil.GenRandomStr() + otherPartitionName := partitionName + funcutil.GenRandomStr() shardsNum := int32(2) int64Field := "int64" floatVecField := "fVec" @@ -501,9 +504,9 @@ func TestProxy(t *testing.T) { }, } } + schema := constructCollectionSchema() constructCreateCollectionRequest := func() *milvuspb.CreateCollectionRequest { - schema := constructCollectionSchema() bs, err := proto.Marshal(schema) assert.NoError(t, err) return &milvuspb.CreateCollectionRequest{ @@ -514,12 +517,512 @@ func TestProxy(t *testing.T) { ShardsNum: shardsNum, } } + createCollectionReq := constructCreateCollectionRequest() t.Run("create collection", func(t *testing.T) { - req := constructCreateCollectionRequest() + req := createCollectionReq resp, err := proxy.CreateCollection(ctx, req) assert.NoError(t, err) assert.Equal(t, commonpb.ErrorCode_Success, resp.ErrorCode) + + // recreate -> fail + req2 := constructCreateCollectionRequest() + resp, err = proxy.CreateCollection(ctx, req2) + assert.NoError(t, err) + assert.NotEqual(t, commonpb.ErrorCode_Success, resp.ErrorCode) + }) + + t.Run("has collection", func(t *testing.T) { + resp, err := proxy.HasCollection(ctx, &milvuspb.HasCollectionRequest{ + Base: nil, + DbName: dbName, + CollectionName: collectionName, + TimeStamp: 0, + }) + assert.NoError(t, err) + assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) + assert.True(t, resp.Value) + + // has other collection: false + resp, err = proxy.HasCollection(ctx, &milvuspb.HasCollectionRequest{ + Base: nil, + DbName: dbName, + CollectionName: otherCollectionName, + TimeStamp: 0, + }) + assert.NoError(t, err) + assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) + assert.False(t, resp.Value) + }) + + t.Run("load collection", func(t *testing.T) { + resp, err := proxy.LoadCollection(ctx, &milvuspb.LoadCollectionRequest{ + Base: nil, + DbName: dbName, + CollectionName: collectionName, + }) + assert.NoError(t, err) + assert.Equal(t, commonpb.ErrorCode_Success, resp.ErrorCode) + + // load other collection -> fail + resp, err = proxy.LoadCollection(ctx, &milvuspb.LoadCollectionRequest{ + Base: nil, + DbName: dbName, + CollectionName: otherCollectionName, + }) + assert.NoError(t, err) + assert.NotEqual(t, commonpb.ErrorCode_Success, resp.ErrorCode) + }) + + t.Run("show in-memory collections", func(t *testing.T) { + resp, err := proxy.ShowCollections(ctx, &milvuspb.ShowCollectionsRequest{ + Base: nil, + DbName: dbName, + TimeStamp: 0, + Type: milvuspb.ShowType_InMemory, + CollectionNames: nil, + }) + assert.NoError(t, err) + assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) + assert.Equal(t, 1, len(resp.CollectionNames)) + + // get in-memory percentage + resp, err = proxy.ShowCollections(ctx, &milvuspb.ShowCollectionsRequest{ + Base: nil, + DbName: dbName, + TimeStamp: 0, + Type: milvuspb.ShowType_InMemory, + CollectionNames: []string{collectionName}, + }) + assert.NoError(t, err) + assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) + assert.Equal(t, 1, len(resp.CollectionNames)) + assert.Equal(t, 1, len(resp.InMemoryPercentages)) + + // get in-memory percentage of not loaded collection -> fail + resp, err = proxy.ShowCollections(ctx, &milvuspb.ShowCollectionsRequest{ + Base: nil, + DbName: dbName, + TimeStamp: 0, + Type: milvuspb.ShowType_InMemory, + CollectionNames: []string{otherCollectionName}, + }) + assert.NoError(t, err) + assert.NotEqual(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) + }) + + t.Run("describe collection", func(t *testing.T) { + collectionID, err := globalMetaCache.GetCollectionID(ctx, collectionName) + assert.NoError(t, err) + + resp, err := proxy.DescribeCollection(ctx, &milvuspb.DescribeCollectionRequest{ + Base: nil, + DbName: dbName, + CollectionName: collectionName, + CollectionID: collectionID, + TimeStamp: 0, + }) + assert.NoError(t, err) + assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) + assert.Equal(t, collectionID, resp.CollectionID) + // TODO(dragondriver): shards num + assert.Equal(t, len(schema.Fields), len(resp.Schema.Fields)) + // TODO(dragondriver): compare fields schema, not sure the order of fields + + // describe other collection -> fail + resp, err = proxy.DescribeCollection(ctx, &milvuspb.DescribeCollectionRequest{ + Base: nil, + DbName: dbName, + CollectionName: otherCollectionName, + CollectionID: collectionID, + TimeStamp: 0, + }) + assert.NoError(t, err) + assert.NotEqual(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) + }) + + t.Run("get collection statistics", func(t *testing.T) { + resp, err := proxy.GetCollectionStatistics(ctx, &milvuspb.GetCollectionStatisticsRequest{ + Base: nil, + DbName: dbName, + CollectionName: collectionName, + }) + assert.NoError(t, err) + assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) + // TODO(dragondriver): check num rows + + // get statistics of other collection -> fail + resp, err = proxy.GetCollectionStatistics(ctx, &milvuspb.GetCollectionStatisticsRequest{ + Base: nil, + DbName: dbName, + CollectionName: otherCollectionName, + }) + assert.NoError(t, err) + assert.NotEqual(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) + }) + + t.Run("show collections", func(t *testing.T) { + resp, err := proxy.ShowCollections(ctx, &milvuspb.ShowCollectionsRequest{ + Base: nil, + DbName: dbName, + TimeStamp: 0, + Type: milvuspb.ShowType_All, + CollectionNames: nil, + }) + assert.NoError(t, err) + assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) + assert.Equal(t, 1, len(resp.CollectionNames)) + }) + + t.Run("create partition", func(t *testing.T) { + resp, err := proxy.CreatePartition(ctx, &milvuspb.CreatePartitionRequest{ + Base: nil, + DbName: dbName, + CollectionName: collectionName, + PartitionName: partitionName, + }) + assert.NoError(t, err) + assert.Equal(t, commonpb.ErrorCode_Success, resp.ErrorCode) + + // recreate -> fail + resp, err = proxy.CreatePartition(ctx, &milvuspb.CreatePartitionRequest{ + Base: nil, + DbName: dbName, + CollectionName: collectionName, + PartitionName: partitionName, + }) + assert.NoError(t, err) + assert.NotEqual(t, commonpb.ErrorCode_Success, resp.ErrorCode) + + // create partition with non-exist collection -> fail + resp, err = proxy.CreatePartition(ctx, &milvuspb.CreatePartitionRequest{ + Base: nil, + DbName: dbName, + CollectionName: otherCollectionName, + PartitionName: partitionName, + }) + assert.NoError(t, err) + assert.NotEqual(t, commonpb.ErrorCode_Success, resp.ErrorCode) + }) + + t.Run("has partition", func(t *testing.T) { + resp, err := proxy.HasPartition(ctx, &milvuspb.HasPartitionRequest{ + Base: nil, + DbName: dbName, + CollectionName: collectionName, + PartitionName: partitionName, + }) + assert.NoError(t, err) + assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) + assert.True(t, resp.Value) + + resp, err = proxy.HasPartition(ctx, &milvuspb.HasPartitionRequest{ + Base: nil, + DbName: dbName, + CollectionName: collectionName, + PartitionName: otherPartitionName, + }) + assert.NoError(t, err) + assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) + assert.False(t, resp.Value) + + // non-exist collection -> fail + resp, err = proxy.HasPartition(ctx, &milvuspb.HasPartitionRequest{ + Base: nil, + DbName: dbName, + CollectionName: otherCollectionName, + PartitionName: partitionName, + }) + assert.NoError(t, err) + assert.NotEqual(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) + }) + + t.Run("load partitions", func(t *testing.T) { + resp, err := proxy.LoadPartitions(ctx, &milvuspb.LoadPartitionsRequest{ + Base: nil, + DbName: dbName, + CollectionName: collectionName, + PartitionNames: []string{partitionName}, + }) + assert.NoError(t, err) + assert.Equal(t, commonpb.ErrorCode_Success, resp.ErrorCode) + + // non-exist partition -> fail + resp, err = proxy.LoadPartitions(ctx, &milvuspb.LoadPartitionsRequest{ + Base: nil, + DbName: dbName, + CollectionName: collectionName, + PartitionNames: []string{otherPartitionName}, + }) + assert.NoError(t, err) + assert.NotEqual(t, commonpb.ErrorCode_Success, resp.ErrorCode) + + // non-exist collection-> fail + resp, err = proxy.LoadPartitions(ctx, &milvuspb.LoadPartitionsRequest{ + Base: nil, + DbName: dbName, + CollectionName: otherCollectionName, + PartitionNames: []string{partitionName}, + }) + assert.NoError(t, err) + assert.NotEqual(t, commonpb.ErrorCode_Success, resp.ErrorCode) + }) + + t.Run("show in-memory partitions", func(t *testing.T) { + collectionID, err := globalMetaCache.GetCollectionID(ctx, collectionName) + assert.NoError(t, err) + + resp, err := proxy.ShowPartitions(ctx, &milvuspb.ShowPartitionsRequest{ + Base: nil, + DbName: dbName, + CollectionName: collectionName, + CollectionID: collectionID, + PartitionNames: nil, + Type: milvuspb.ShowType_InMemory, + }) + assert.NoError(t, err) + assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) + // default partition? + assert.Equal(t, 1, len(resp.PartitionNames)) + + // show partition not in-memory -> fail + resp, err = proxy.ShowPartitions(ctx, &milvuspb.ShowPartitionsRequest{ + Base: nil, + DbName: dbName, + CollectionName: collectionName, + CollectionID: collectionID, + PartitionNames: []string{otherPartitionName}, + Type: milvuspb.ShowType_InMemory, + }) + assert.NoError(t, err) + assert.NotEqual(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) + + // non-exist collection -> fail + resp, err = proxy.ShowPartitions(ctx, &milvuspb.ShowPartitionsRequest{ + Base: nil, + DbName: dbName, + CollectionName: otherCollectionName, + CollectionID: collectionID, + PartitionNames: []string{partitionName}, + Type: milvuspb.ShowType_InMemory, + }) + assert.NoError(t, err) + assert.NotEqual(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) + }) + + t.Run("get partition statistics", func(t *testing.T) { + resp, err := proxy.GetPartitionStatistics(ctx, &milvuspb.GetPartitionStatisticsRequest{ + Base: nil, + DbName: dbName, + CollectionName: collectionName, + PartitionName: partitionName, + }) + assert.NoError(t, err) + assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) + + // non-exist partition -> fail + resp, err = proxy.GetPartitionStatistics(ctx, &milvuspb.GetPartitionStatisticsRequest{ + Base: nil, + DbName: dbName, + CollectionName: collectionName, + PartitionName: otherPartitionName, + }) + assert.NoError(t, err) + assert.NotEqual(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) + + // non-exist collection -> fail + resp, err = proxy.GetPartitionStatistics(ctx, &milvuspb.GetPartitionStatisticsRequest{ + Base: nil, + DbName: dbName, + CollectionName: otherCollectionName, + PartitionName: partitionName, + }) + assert.NoError(t, err) + assert.NotEqual(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) + }) + + t.Run("show partitions", func(t *testing.T) { + collectionID, err := globalMetaCache.GetCollectionID(ctx, collectionName) + assert.NoError(t, err) + + resp, err := proxy.ShowPartitions(ctx, &milvuspb.ShowPartitionsRequest{ + Base: nil, + DbName: dbName, + CollectionName: collectionName, + CollectionID: collectionID, + PartitionNames: nil, + Type: milvuspb.ShowType_All, + }) + assert.NoError(t, err) + assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) + // default partition + assert.Equal(t, 2, len(resp.PartitionNames)) + + // non-exist collection -> fail + resp, err = proxy.ShowPartitions(ctx, &milvuspb.ShowPartitionsRequest{ + Base: nil, + DbName: dbName, + CollectionName: otherCollectionName, + CollectionID: collectionID + 1, + PartitionNames: nil, + Type: milvuspb.ShowType_All, + }) + assert.NoError(t, err) + assert.NotEqual(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) + }) + + t.Run("release partition", func(t *testing.T) { + resp, err := proxy.ReleasePartitions(ctx, &milvuspb.ReleasePartitionsRequest{ + Base: nil, + DbName: dbName, + CollectionName: collectionName, + PartitionNames: []string{partitionName}, + }) + assert.NoError(t, err) + assert.Equal(t, commonpb.ErrorCode_Success, resp.ErrorCode) + }) + + t.Run("show in-memory partitions after release partition", func(t *testing.T) { + collectionID, err := globalMetaCache.GetCollectionID(ctx, collectionName) + assert.NoError(t, err) + + resp, err := proxy.ShowPartitions(ctx, &milvuspb.ShowPartitionsRequest{ + Base: nil, + DbName: dbName, + CollectionName: collectionName, + CollectionID: collectionID, + PartitionNames: nil, + Type: milvuspb.ShowType_InMemory, + }) + assert.NoError(t, err) + assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) + // default partition + assert.Equal(t, 1, len(resp.PartitionNames)) + + resp, err = proxy.ShowPartitions(ctx, &milvuspb.ShowPartitionsRequest{ + Base: nil, + DbName: dbName, + CollectionName: collectionName, + CollectionID: collectionID, + PartitionNames: []string{partitionName}, // released + Type: milvuspb.ShowType_InMemory, + }) + assert.NoError(t, err) + assert.NotEqual(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) + }) + + t.Run("drop partition", func(t *testing.T) { + resp, err := proxy.DropPartition(ctx, &milvuspb.DropPartitionRequest{ + Base: nil, + DbName: dbName, + CollectionName: collectionName, + PartitionName: partitionName, + }) + assert.NoError(t, err) + assert.Equal(t, commonpb.ErrorCode_Success, resp.ErrorCode) + + // invalidate meta cache + resp, err = proxy.InvalidateCollectionMetaCache(ctx, &proxypb.InvalidateCollMetaCacheRequest{ + Base: nil, + DbName: dbName, + CollectionName: collectionName, + }) + assert.NoError(t, err) + assert.Equal(t, commonpb.ErrorCode_Success, resp.ErrorCode) + + // drop non-exist partition -> fail + + resp, err = proxy.DropPartition(ctx, &milvuspb.DropPartitionRequest{ + Base: nil, + DbName: dbName, + CollectionName: collectionName, + PartitionName: partitionName, + }) + assert.NoError(t, err) + assert.NotEqual(t, commonpb.ErrorCode_Success, resp.ErrorCode) + + resp, err = proxy.DropPartition(ctx, &milvuspb.DropPartitionRequest{ + Base: nil, + DbName: dbName, + CollectionName: collectionName, + PartitionName: otherCollectionName, + }) + assert.NoError(t, err) + assert.NotEqual(t, commonpb.ErrorCode_Success, resp.ErrorCode) + + resp, err = proxy.DropPartition(ctx, &milvuspb.DropPartitionRequest{ + Base: nil, + DbName: dbName, + CollectionName: otherCollectionName, + PartitionName: partitionName, + }) + assert.NoError(t, err) + assert.NotEqual(t, commonpb.ErrorCode_Success, resp.ErrorCode) + }) + + t.Run("has partition after drop partition", func(t *testing.T) { + resp, err := proxy.HasPartition(ctx, &milvuspb.HasPartitionRequest{ + Base: nil, + DbName: dbName, + CollectionName: collectionName, + PartitionName: partitionName, + }) + assert.NoError(t, err) + assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) + assert.False(t, resp.Value) + }) + + t.Run("show partitions after drop partition", func(t *testing.T) { + collectionID, err := globalMetaCache.GetCollectionID(ctx, collectionName) + assert.NoError(t, err) + + resp, err := proxy.ShowPartitions(ctx, &milvuspb.ShowPartitionsRequest{ + Base: nil, + DbName: dbName, + CollectionName: collectionName, + CollectionID: collectionID, + PartitionNames: nil, + Type: milvuspb.ShowType_All, + }) + assert.NoError(t, err) + assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) + // default partition + assert.Equal(t, 1, len(resp.PartitionNames)) + }) + + t.Run("release collection", func(t *testing.T) { + collectionID, err := globalMetaCache.GetCollectionID(ctx, collectionName) + assert.NoError(t, err) + + resp, err := proxy.ReleaseCollection(ctx, &milvuspb.ReleaseCollectionRequest{ + Base: nil, + DbName: dbName, + CollectionName: collectionName, + }) + assert.NoError(t, err) + assert.Equal(t, commonpb.ErrorCode_Success, resp.ErrorCode) + + // release dql message stream + resp, err = proxy.ReleaseDQLMessageStream(ctx, &proxypb.ReleaseDQLMessageStreamRequest{ + Base: nil, + DbID: 0, + CollectionID: collectionID, + }) + assert.NoError(t, err) + assert.Equal(t, commonpb.ErrorCode_Success, resp.ErrorCode) + }) + + t.Run("show in-memory collections after release", func(t *testing.T) { + resp, err := proxy.ShowCollections(ctx, &milvuspb.ShowCollectionsRequest{ + Base: nil, + DbName: dbName, + TimeStamp: 0, + Type: milvuspb.ShowType_InMemory, + CollectionNames: nil, + }) + assert.NoError(t, err) + assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) + assert.Equal(t, 0, len(resp.CollectionNames)) }) t.Run("drop collection", func(t *testing.T) { @@ -552,5 +1055,30 @@ func TestProxy(t *testing.T) { assert.Equal(t, commonpb.ErrorCode_Success, resp.ErrorCode) }) + t.Run("has collection after drop collection", func(t *testing.T) { + resp, err := proxy.HasCollection(ctx, &milvuspb.HasCollectionRequest{ + Base: nil, + DbName: dbName, + CollectionName: collectionName, + TimeStamp: 0, + }) + assert.NoError(t, err) + assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) + assert.False(t, resp.Value) + }) + + t.Run("show all collections after drop collection", func(t *testing.T) { + resp, err := proxy.ShowCollections(ctx, &milvuspb.ShowCollectionsRequest{ + Base: nil, + DbName: dbName, + TimeStamp: 0, + Type: milvuspb.ShowType_All, + CollectionNames: nil, + }) + assert.NoError(t, err) + assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) + assert.Equal(t, 0, len(resp.CollectionNames)) + }) + cancel() } diff --git a/internal/proxy/task.go b/internal/proxy/task.go index 5b95f21ce7..ef6470d790 100644 --- a/internal/proxy/task.go +++ b/internal/proxy/task.go @@ -75,8 +75,8 @@ const ( FlushTaskName = "FlushTask" LoadCollectionTaskName = "LoadCollectionTask" ReleaseCollectionTaskName = "ReleaseCollectionTask" - LoadPartitionTaskName = "LoadPartitionTask" - ReleasePartitionTaskName = "ReleasePartitionTask" + LoadPartitionTaskName = "LoadPartitionsTask" + ReleasePartitionTaskName = "ReleasePartitionsTask" ) type task interface { @@ -1069,7 +1069,7 @@ func (it *InsertTask) PostExecute(ctx context.Context) error { return nil } -type CreateCollectionTask struct { +type createCollectionTask struct { Condition *milvuspb.CreateCollectionRequest ctx context.Context @@ -1079,44 +1079,44 @@ type CreateCollectionTask struct { schema *schemapb.CollectionSchema } -func (cct *CreateCollectionTask) TraceCtx() context.Context { +func (cct *createCollectionTask) TraceCtx() context.Context { return cct.ctx } -func (cct *CreateCollectionTask) ID() UniqueID { +func (cct *createCollectionTask) ID() UniqueID { return cct.Base.MsgID } -func (cct *CreateCollectionTask) SetID(uid UniqueID) { +func (cct *createCollectionTask) SetID(uid UniqueID) { cct.Base.MsgID = uid } -func (cct *CreateCollectionTask) Name() string { +func (cct *createCollectionTask) Name() string { return CreateCollectionTaskName } -func (cct *CreateCollectionTask) Type() commonpb.MsgType { +func (cct *createCollectionTask) Type() commonpb.MsgType { return cct.Base.MsgType } -func (cct *CreateCollectionTask) BeginTs() Timestamp { +func (cct *createCollectionTask) BeginTs() Timestamp { return cct.Base.Timestamp } -func (cct *CreateCollectionTask) EndTs() Timestamp { +func (cct *createCollectionTask) EndTs() Timestamp { return cct.Base.Timestamp } -func (cct *CreateCollectionTask) SetTs(ts Timestamp) { +func (cct *createCollectionTask) SetTs(ts Timestamp) { cct.Base.Timestamp = ts } -func (cct *CreateCollectionTask) OnEnqueue() error { +func (cct *createCollectionTask) OnEnqueue() error { cct.Base = &commonpb.MsgBase{} return nil } -func (cct *CreateCollectionTask) PreExecute(ctx context.Context) error { +func (cct *createCollectionTask) PreExecute(ctx context.Context) error { cct.Base.MsgType = commonpb.MsgType_CreateCollection cct.Base.SourceID = Params.ProxyID @@ -1190,17 +1190,17 @@ func (cct *CreateCollectionTask) PreExecute(ctx context.Context) error { return nil } -func (cct *CreateCollectionTask) Execute(ctx context.Context) error { +func (cct *createCollectionTask) Execute(ctx context.Context) error { var err error cct.result, err = cct.rootCoord.CreateCollection(ctx, cct.CreateCollectionRequest) return err } -func (cct *CreateCollectionTask) PostExecute(ctx context.Context) error { +func (cct *createCollectionTask) PostExecute(ctx context.Context) error { return nil } -type DropCollectionTask struct { +type dropCollectionTask struct { Condition *milvuspb.DropCollectionRequest ctx context.Context @@ -1210,44 +1210,44 @@ type DropCollectionTask struct { chTicker channelsTimeTicker } -func (dct *DropCollectionTask) TraceCtx() context.Context { +func (dct *dropCollectionTask) TraceCtx() context.Context { return dct.ctx } -func (dct *DropCollectionTask) ID() UniqueID { +func (dct *dropCollectionTask) ID() UniqueID { return dct.Base.MsgID } -func (dct *DropCollectionTask) SetID(uid UniqueID) { +func (dct *dropCollectionTask) SetID(uid UniqueID) { dct.Base.MsgID = uid } -func (dct *DropCollectionTask) Name() string { +func (dct *dropCollectionTask) Name() string { return DropCollectionTaskName } -func (dct *DropCollectionTask) Type() commonpb.MsgType { +func (dct *dropCollectionTask) Type() commonpb.MsgType { return dct.Base.MsgType } -func (dct *DropCollectionTask) BeginTs() Timestamp { +func (dct *dropCollectionTask) BeginTs() Timestamp { return dct.Base.Timestamp } -func (dct *DropCollectionTask) EndTs() Timestamp { +func (dct *dropCollectionTask) EndTs() Timestamp { return dct.Base.Timestamp } -func (dct *DropCollectionTask) SetTs(ts Timestamp) { +func (dct *dropCollectionTask) SetTs(ts Timestamp) { dct.Base.Timestamp = ts } -func (dct *DropCollectionTask) OnEnqueue() error { +func (dct *dropCollectionTask) OnEnqueue() error { dct.Base = &commonpb.MsgBase{} return nil } -func (dct *DropCollectionTask) PreExecute(ctx context.Context) error { +func (dct *dropCollectionTask) PreExecute(ctx context.Context) error { dct.Base.MsgType = commonpb.MsgType_DropCollection dct.Base.SourceID = Params.ProxyID @@ -1257,7 +1257,7 @@ func (dct *DropCollectionTask) PreExecute(ctx context.Context) error { return nil } -func (dct *DropCollectionTask) Execute(ctx context.Context) error { +func (dct *dropCollectionTask) Execute(ctx context.Context) error { collID, err := globalMetaCache.GetCollectionID(ctx, dct.CollectionName) if err != nil { return err @@ -1279,7 +1279,7 @@ func (dct *DropCollectionTask) Execute(ctx context.Context) error { return nil } -func (dct *DropCollectionTask) PostExecute(ctx context.Context) error { +func (dct *dropCollectionTask) PostExecute(ctx context.Context) error { globalMetaCache.RemoveCollection(ctx, dct.CollectionName) return nil } @@ -2521,7 +2521,7 @@ func (qt *QueryTask) PostExecute(ctx context.Context) error { return nil } -type HasCollectionTask struct { +type hasCollectionTask struct { Condition *milvuspb.HasCollectionRequest ctx context.Context @@ -2529,44 +2529,44 @@ type HasCollectionTask struct { result *milvuspb.BoolResponse } -func (hct *HasCollectionTask) TraceCtx() context.Context { +func (hct *hasCollectionTask) TraceCtx() context.Context { return hct.ctx } -func (hct *HasCollectionTask) ID() UniqueID { +func (hct *hasCollectionTask) ID() UniqueID { return hct.Base.MsgID } -func (hct *HasCollectionTask) SetID(uid UniqueID) { +func (hct *hasCollectionTask) SetID(uid UniqueID) { hct.Base.MsgID = uid } -func (hct *HasCollectionTask) Name() string { +func (hct *hasCollectionTask) Name() string { return HasCollectionTaskName } -func (hct *HasCollectionTask) Type() commonpb.MsgType { +func (hct *hasCollectionTask) Type() commonpb.MsgType { return hct.Base.MsgType } -func (hct *HasCollectionTask) BeginTs() Timestamp { +func (hct *hasCollectionTask) BeginTs() Timestamp { return hct.Base.Timestamp } -func (hct *HasCollectionTask) EndTs() Timestamp { +func (hct *hasCollectionTask) EndTs() Timestamp { return hct.Base.Timestamp } -func (hct *HasCollectionTask) SetTs(ts Timestamp) { +func (hct *hasCollectionTask) SetTs(ts Timestamp) { hct.Base.Timestamp = ts } -func (hct *HasCollectionTask) OnEnqueue() error { +func (hct *hasCollectionTask) OnEnqueue() error { hct.Base = &commonpb.MsgBase{} return nil } -func (hct *HasCollectionTask) PreExecute(ctx context.Context) error { +func (hct *hasCollectionTask) PreExecute(ctx context.Context) error { hct.Base.MsgType = commonpb.MsgType_HasCollection hct.Base.SourceID = Params.ProxyID @@ -2576,7 +2576,7 @@ func (hct *HasCollectionTask) PreExecute(ctx context.Context) error { return nil } -func (hct *HasCollectionTask) Execute(ctx context.Context) error { +func (hct *hasCollectionTask) Execute(ctx context.Context) error { var err error hct.result, err = hct.rootCoord.HasCollection(ctx, hct.HasCollectionRequest) if hct.result == nil { @@ -2588,11 +2588,11 @@ func (hct *HasCollectionTask) Execute(ctx context.Context) error { return err } -func (hct *HasCollectionTask) PostExecute(ctx context.Context) error { +func (hct *hasCollectionTask) PostExecute(ctx context.Context) error { return nil } -type DescribeCollectionTask struct { +type describeCollectionTask struct { Condition *milvuspb.DescribeCollectionRequest ctx context.Context @@ -2600,44 +2600,44 @@ type DescribeCollectionTask struct { result *milvuspb.DescribeCollectionResponse } -func (dct *DescribeCollectionTask) TraceCtx() context.Context { +func (dct *describeCollectionTask) TraceCtx() context.Context { return dct.ctx } -func (dct *DescribeCollectionTask) ID() UniqueID { +func (dct *describeCollectionTask) ID() UniqueID { return dct.Base.MsgID } -func (dct *DescribeCollectionTask) SetID(uid UniqueID) { +func (dct *describeCollectionTask) SetID(uid UniqueID) { dct.Base.MsgID = uid } -func (dct *DescribeCollectionTask) Name() string { +func (dct *describeCollectionTask) Name() string { return DescribeCollectionTaskName } -func (dct *DescribeCollectionTask) Type() commonpb.MsgType { +func (dct *describeCollectionTask) Type() commonpb.MsgType { return dct.Base.MsgType } -func (dct *DescribeCollectionTask) BeginTs() Timestamp { +func (dct *describeCollectionTask) BeginTs() Timestamp { return dct.Base.Timestamp } -func (dct *DescribeCollectionTask) EndTs() Timestamp { +func (dct *describeCollectionTask) EndTs() Timestamp { return dct.Base.Timestamp } -func (dct *DescribeCollectionTask) SetTs(ts Timestamp) { +func (dct *describeCollectionTask) SetTs(ts Timestamp) { dct.Base.Timestamp = ts } -func (dct *DescribeCollectionTask) OnEnqueue() error { +func (dct *describeCollectionTask) OnEnqueue() error { dct.Base = &commonpb.MsgBase{} return nil } -func (dct *DescribeCollectionTask) PreExecute(ctx context.Context) error { +func (dct *describeCollectionTask) PreExecute(ctx context.Context) error { dct.Base.MsgType = commonpb.MsgType_DescribeCollection dct.Base.SourceID = Params.ProxyID @@ -2647,7 +2647,7 @@ func (dct *DescribeCollectionTask) PreExecute(ctx context.Context) error { return nil } -func (dct *DescribeCollectionTask) Execute(ctx context.Context) error { +func (dct *describeCollectionTask) Execute(ctx context.Context) error { var err error dct.result = &milvuspb.DescribeCollectionResponse{ Status: &commonpb.Status{ @@ -2700,11 +2700,11 @@ func (dct *DescribeCollectionTask) Execute(ctx context.Context) error { return nil } -func (dct *DescribeCollectionTask) PostExecute(ctx context.Context) error { +func (dct *describeCollectionTask) PostExecute(ctx context.Context) error { return nil } -type GetCollectionStatisticsTask struct { +type getCollectionStatisticsTask struct { Condition *milvuspb.GetCollectionStatisticsRequest ctx context.Context @@ -2712,50 +2712,50 @@ type GetCollectionStatisticsTask struct { result *milvuspb.GetCollectionStatisticsResponse } -func (g *GetCollectionStatisticsTask) TraceCtx() context.Context { +func (g *getCollectionStatisticsTask) TraceCtx() context.Context { return g.ctx } -func (g *GetCollectionStatisticsTask) ID() UniqueID { +func (g *getCollectionStatisticsTask) ID() UniqueID { return g.Base.MsgID } -func (g *GetCollectionStatisticsTask) SetID(uid UniqueID) { +func (g *getCollectionStatisticsTask) SetID(uid UniqueID) { g.Base.MsgID = uid } -func (g *GetCollectionStatisticsTask) Name() string { +func (g *getCollectionStatisticsTask) Name() string { return GetCollectionStatisticsTaskName } -func (g *GetCollectionStatisticsTask) Type() commonpb.MsgType { +func (g *getCollectionStatisticsTask) Type() commonpb.MsgType { return g.Base.MsgType } -func (g *GetCollectionStatisticsTask) BeginTs() Timestamp { +func (g *getCollectionStatisticsTask) BeginTs() Timestamp { return g.Base.Timestamp } -func (g *GetCollectionStatisticsTask) EndTs() Timestamp { +func (g *getCollectionStatisticsTask) EndTs() Timestamp { return g.Base.Timestamp } -func (g *GetCollectionStatisticsTask) SetTs(ts Timestamp) { +func (g *getCollectionStatisticsTask) SetTs(ts Timestamp) { g.Base.Timestamp = ts } -func (g *GetCollectionStatisticsTask) OnEnqueue() error { +func (g *getCollectionStatisticsTask) OnEnqueue() error { g.Base = &commonpb.MsgBase{} return nil } -func (g *GetCollectionStatisticsTask) PreExecute(ctx context.Context) error { +func (g *getCollectionStatisticsTask) PreExecute(ctx context.Context) error { g.Base.MsgType = commonpb.MsgType_GetCollectionStatistics g.Base.SourceID = Params.ProxyID return nil } -func (g *GetCollectionStatisticsTask) Execute(ctx context.Context) error { +func (g *getCollectionStatisticsTask) Execute(ctx context.Context) error { collID, err := globalMetaCache.GetCollectionID(ctx, g.CollectionName) if err != nil { return err @@ -2787,11 +2787,11 @@ func (g *GetCollectionStatisticsTask) Execute(ctx context.Context) error { return nil } -func (g *GetCollectionStatisticsTask) PostExecute(ctx context.Context) error { +func (g *getCollectionStatisticsTask) PostExecute(ctx context.Context) error { return nil } -type GetPartitionStatisticsTask struct { +type getPartitionStatisticsTask struct { Condition *milvuspb.GetPartitionStatisticsRequest ctx context.Context @@ -2799,50 +2799,50 @@ type GetPartitionStatisticsTask struct { result *milvuspb.GetPartitionStatisticsResponse } -func (g *GetPartitionStatisticsTask) TraceCtx() context.Context { +func (g *getPartitionStatisticsTask) TraceCtx() context.Context { return g.ctx } -func (g *GetPartitionStatisticsTask) ID() UniqueID { +func (g *getPartitionStatisticsTask) ID() UniqueID { return g.Base.MsgID } -func (g *GetPartitionStatisticsTask) SetID(uid UniqueID) { +func (g *getPartitionStatisticsTask) SetID(uid UniqueID) { g.Base.MsgID = uid } -func (g *GetPartitionStatisticsTask) Name() string { +func (g *getPartitionStatisticsTask) Name() string { return GetPartitionStatisticsTaskName } -func (g *GetPartitionStatisticsTask) Type() commonpb.MsgType { +func (g *getPartitionStatisticsTask) Type() commonpb.MsgType { return g.Base.MsgType } -func (g *GetPartitionStatisticsTask) BeginTs() Timestamp { +func (g *getPartitionStatisticsTask) BeginTs() Timestamp { return g.Base.Timestamp } -func (g *GetPartitionStatisticsTask) EndTs() Timestamp { +func (g *getPartitionStatisticsTask) EndTs() Timestamp { return g.Base.Timestamp } -func (g *GetPartitionStatisticsTask) SetTs(ts Timestamp) { +func (g *getPartitionStatisticsTask) SetTs(ts Timestamp) { g.Base.Timestamp = ts } -func (g *GetPartitionStatisticsTask) OnEnqueue() error { +func (g *getPartitionStatisticsTask) OnEnqueue() error { g.Base = &commonpb.MsgBase{} return nil } -func (g *GetPartitionStatisticsTask) PreExecute(ctx context.Context) error { +func (g *getPartitionStatisticsTask) PreExecute(ctx context.Context) error { g.Base.MsgType = commonpb.MsgType_GetPartitionStatistics g.Base.SourceID = Params.ProxyID return nil } -func (g *GetPartitionStatisticsTask) Execute(ctx context.Context) error { +func (g *getPartitionStatisticsTask) Execute(ctx context.Context) error { collID, err := globalMetaCache.GetCollectionID(ctx, g.CollectionName) if err != nil { return err @@ -2879,11 +2879,11 @@ func (g *GetPartitionStatisticsTask) Execute(ctx context.Context) error { return nil } -func (g *GetPartitionStatisticsTask) PostExecute(ctx context.Context) error { +func (g *getPartitionStatisticsTask) PostExecute(ctx context.Context) error { return nil } -type ShowCollectionsTask struct { +type showCollectionsTask struct { Condition *milvuspb.ShowCollectionsRequest ctx context.Context @@ -2892,44 +2892,44 @@ type ShowCollectionsTask struct { result *milvuspb.ShowCollectionsResponse } -func (sct *ShowCollectionsTask) TraceCtx() context.Context { +func (sct *showCollectionsTask) TraceCtx() context.Context { return sct.ctx } -func (sct *ShowCollectionsTask) ID() UniqueID { +func (sct *showCollectionsTask) ID() UniqueID { return sct.Base.MsgID } -func (sct *ShowCollectionsTask) SetID(uid UniqueID) { +func (sct *showCollectionsTask) SetID(uid UniqueID) { sct.Base.MsgID = uid } -func (sct *ShowCollectionsTask) Name() string { +func (sct *showCollectionsTask) Name() string { return ShowCollectionTaskName } -func (sct *ShowCollectionsTask) Type() commonpb.MsgType { +func (sct *showCollectionsTask) Type() commonpb.MsgType { return sct.Base.MsgType } -func (sct *ShowCollectionsTask) BeginTs() Timestamp { +func (sct *showCollectionsTask) BeginTs() Timestamp { return sct.Base.Timestamp } -func (sct *ShowCollectionsTask) EndTs() Timestamp { +func (sct *showCollectionsTask) EndTs() Timestamp { return sct.Base.Timestamp } -func (sct *ShowCollectionsTask) SetTs(ts Timestamp) { +func (sct *showCollectionsTask) SetTs(ts Timestamp) { sct.Base.Timestamp = ts } -func (sct *ShowCollectionsTask) OnEnqueue() error { +func (sct *showCollectionsTask) OnEnqueue() error { sct.Base = &commonpb.MsgBase{} return nil } -func (sct *ShowCollectionsTask) PreExecute(ctx context.Context) error { +func (sct *showCollectionsTask) PreExecute(ctx context.Context) error { sct.Base.MsgType = commonpb.MsgType_ShowCollections sct.Base.SourceID = Params.ProxyID if sct.GetType() == milvuspb.ShowType_InMemory { @@ -2943,7 +2943,7 @@ func (sct *ShowCollectionsTask) PreExecute(ctx context.Context) error { return nil } -func (sct *ShowCollectionsTask) Execute(ctx context.Context) error { +func (sct *showCollectionsTask) Execute(ctx context.Context) error { respFromRootCoord, err := sct.rootCoord.ShowCollections(ctx, sct.ShowCollectionsRequest) if err != nil { @@ -3034,11 +3034,11 @@ func (sct *ShowCollectionsTask) Execute(ctx context.Context) error { return nil } -func (sct *ShowCollectionsTask) PostExecute(ctx context.Context) error { +func (sct *showCollectionsTask) PostExecute(ctx context.Context) error { return nil } -type CreatePartitionTask struct { +type createPartitionTask struct { Condition *milvuspb.CreatePartitionRequest ctx context.Context @@ -3046,44 +3046,44 @@ type CreatePartitionTask struct { result *commonpb.Status } -func (cpt *CreatePartitionTask) TraceCtx() context.Context { +func (cpt *createPartitionTask) TraceCtx() context.Context { return cpt.ctx } -func (cpt *CreatePartitionTask) ID() UniqueID { +func (cpt *createPartitionTask) ID() UniqueID { return cpt.Base.MsgID } -func (cpt *CreatePartitionTask) SetID(uid UniqueID) { +func (cpt *createPartitionTask) SetID(uid UniqueID) { cpt.Base.MsgID = uid } -func (cpt *CreatePartitionTask) Name() string { +func (cpt *createPartitionTask) Name() string { return CreatePartitionTaskName } -func (cpt *CreatePartitionTask) Type() commonpb.MsgType { +func (cpt *createPartitionTask) Type() commonpb.MsgType { return cpt.Base.MsgType } -func (cpt *CreatePartitionTask) BeginTs() Timestamp { +func (cpt *createPartitionTask) BeginTs() Timestamp { return cpt.Base.Timestamp } -func (cpt *CreatePartitionTask) EndTs() Timestamp { +func (cpt *createPartitionTask) EndTs() Timestamp { return cpt.Base.Timestamp } -func (cpt *CreatePartitionTask) SetTs(ts Timestamp) { +func (cpt *createPartitionTask) SetTs(ts Timestamp) { cpt.Base.Timestamp = ts } -func (cpt *CreatePartitionTask) OnEnqueue() error { +func (cpt *createPartitionTask) OnEnqueue() error { cpt.Base = &commonpb.MsgBase{} return nil } -func (cpt *CreatePartitionTask) PreExecute(ctx context.Context) error { +func (cpt *createPartitionTask) PreExecute(ctx context.Context) error { cpt.Base.MsgType = commonpb.MsgType_CreatePartition cpt.Base.SourceID = Params.ProxyID @@ -3100,7 +3100,7 @@ func (cpt *CreatePartitionTask) PreExecute(ctx context.Context) error { return nil } -func (cpt *CreatePartitionTask) Execute(ctx context.Context) (err error) { +func (cpt *createPartitionTask) Execute(ctx context.Context) (err error) { cpt.result, err = cpt.rootCoord.CreatePartition(ctx, cpt.CreatePartitionRequest) if cpt.result == nil { return errors.New("get collection statistics resp is nil") @@ -3111,11 +3111,11 @@ func (cpt *CreatePartitionTask) Execute(ctx context.Context) (err error) { return err } -func (cpt *CreatePartitionTask) PostExecute(ctx context.Context) error { +func (cpt *createPartitionTask) PostExecute(ctx context.Context) error { return nil } -type DropPartitionTask struct { +type dropPartitionTask struct { Condition *milvuspb.DropPartitionRequest ctx context.Context @@ -3123,44 +3123,44 @@ type DropPartitionTask struct { result *commonpb.Status } -func (dpt *DropPartitionTask) TraceCtx() context.Context { +func (dpt *dropPartitionTask) TraceCtx() context.Context { return dpt.ctx } -func (dpt *DropPartitionTask) ID() UniqueID { +func (dpt *dropPartitionTask) ID() UniqueID { return dpt.Base.MsgID } -func (dpt *DropPartitionTask) SetID(uid UniqueID) { +func (dpt *dropPartitionTask) SetID(uid UniqueID) { dpt.Base.MsgID = uid } -func (dpt *DropPartitionTask) Name() string { +func (dpt *dropPartitionTask) Name() string { return DropPartitionTaskName } -func (dpt *DropPartitionTask) Type() commonpb.MsgType { +func (dpt *dropPartitionTask) Type() commonpb.MsgType { return dpt.Base.MsgType } -func (dpt *DropPartitionTask) BeginTs() Timestamp { +func (dpt *dropPartitionTask) BeginTs() Timestamp { return dpt.Base.Timestamp } -func (dpt *DropPartitionTask) EndTs() Timestamp { +func (dpt *dropPartitionTask) EndTs() Timestamp { return dpt.Base.Timestamp } -func (dpt *DropPartitionTask) SetTs(ts Timestamp) { +func (dpt *dropPartitionTask) SetTs(ts Timestamp) { dpt.Base.Timestamp = ts } -func (dpt *DropPartitionTask) OnEnqueue() error { +func (dpt *dropPartitionTask) OnEnqueue() error { dpt.Base = &commonpb.MsgBase{} return nil } -func (dpt *DropPartitionTask) PreExecute(ctx context.Context) error { +func (dpt *dropPartitionTask) PreExecute(ctx context.Context) error { dpt.Base.MsgType = commonpb.MsgType_DropPartition dpt.Base.SourceID = Params.ProxyID @@ -3177,7 +3177,7 @@ func (dpt *DropPartitionTask) PreExecute(ctx context.Context) error { return nil } -func (dpt *DropPartitionTask) Execute(ctx context.Context) (err error) { +func (dpt *dropPartitionTask) Execute(ctx context.Context) (err error) { dpt.result, err = dpt.rootCoord.DropPartition(ctx, dpt.DropPartitionRequest) if dpt.result == nil { return errors.New("get collection statistics resp is nil") @@ -3188,11 +3188,11 @@ func (dpt *DropPartitionTask) Execute(ctx context.Context) (err error) { return err } -func (dpt *DropPartitionTask) PostExecute(ctx context.Context) error { +func (dpt *dropPartitionTask) PostExecute(ctx context.Context) error { return nil } -type HasPartitionTask struct { +type hasPartitionTask struct { Condition *milvuspb.HasPartitionRequest ctx context.Context @@ -3200,44 +3200,44 @@ type HasPartitionTask struct { result *milvuspb.BoolResponse } -func (hpt *HasPartitionTask) TraceCtx() context.Context { +func (hpt *hasPartitionTask) TraceCtx() context.Context { return hpt.ctx } -func (hpt *HasPartitionTask) ID() UniqueID { +func (hpt *hasPartitionTask) ID() UniqueID { return hpt.Base.MsgID } -func (hpt *HasPartitionTask) SetID(uid UniqueID) { +func (hpt *hasPartitionTask) SetID(uid UniqueID) { hpt.Base.MsgID = uid } -func (hpt *HasPartitionTask) Name() string { +func (hpt *hasPartitionTask) Name() string { return HasPartitionTaskName } -func (hpt *HasPartitionTask) Type() commonpb.MsgType { +func (hpt *hasPartitionTask) Type() commonpb.MsgType { return hpt.Base.MsgType } -func (hpt *HasPartitionTask) BeginTs() Timestamp { +func (hpt *hasPartitionTask) BeginTs() Timestamp { return hpt.Base.Timestamp } -func (hpt *HasPartitionTask) EndTs() Timestamp { +func (hpt *hasPartitionTask) EndTs() Timestamp { return hpt.Base.Timestamp } -func (hpt *HasPartitionTask) SetTs(ts Timestamp) { +func (hpt *hasPartitionTask) SetTs(ts Timestamp) { hpt.Base.Timestamp = ts } -func (hpt *HasPartitionTask) OnEnqueue() error { +func (hpt *hasPartitionTask) OnEnqueue() error { hpt.Base = &commonpb.MsgBase{} return nil } -func (hpt *HasPartitionTask) PreExecute(ctx context.Context) error { +func (hpt *hasPartitionTask) PreExecute(ctx context.Context) error { hpt.Base.MsgType = commonpb.MsgType_HasPartition hpt.Base.SourceID = Params.ProxyID @@ -3253,7 +3253,7 @@ func (hpt *HasPartitionTask) PreExecute(ctx context.Context) error { return nil } -func (hpt *HasPartitionTask) Execute(ctx context.Context) (err error) { +func (hpt *hasPartitionTask) Execute(ctx context.Context) (err error) { hpt.result, err = hpt.rootCoord.HasPartition(ctx, hpt.HasPartitionRequest) if hpt.result == nil { return errors.New("get collection statistics resp is nil") @@ -3264,11 +3264,11 @@ func (hpt *HasPartitionTask) Execute(ctx context.Context) (err error) { return err } -func (hpt *HasPartitionTask) PostExecute(ctx context.Context) error { +func (hpt *hasPartitionTask) PostExecute(ctx context.Context) error { return nil } -type ShowPartitionsTask struct { +type showPartitionsTask struct { Condition *milvuspb.ShowPartitionsRequest ctx context.Context @@ -3277,44 +3277,44 @@ type ShowPartitionsTask struct { result *milvuspb.ShowPartitionsResponse } -func (spt *ShowPartitionsTask) TraceCtx() context.Context { +func (spt *showPartitionsTask) TraceCtx() context.Context { return spt.ctx } -func (spt *ShowPartitionsTask) ID() UniqueID { +func (spt *showPartitionsTask) ID() UniqueID { return spt.Base.MsgID } -func (spt *ShowPartitionsTask) SetID(uid UniqueID) { +func (spt *showPartitionsTask) SetID(uid UniqueID) { spt.Base.MsgID = uid } -func (spt *ShowPartitionsTask) Name() string { +func (spt *showPartitionsTask) Name() string { return ShowPartitionTaskName } -func (spt *ShowPartitionsTask) Type() commonpb.MsgType { +func (spt *showPartitionsTask) Type() commonpb.MsgType { return spt.Base.MsgType } -func (spt *ShowPartitionsTask) BeginTs() Timestamp { +func (spt *showPartitionsTask) BeginTs() Timestamp { return spt.Base.Timestamp } -func (spt *ShowPartitionsTask) EndTs() Timestamp { +func (spt *showPartitionsTask) EndTs() Timestamp { return spt.Base.Timestamp } -func (spt *ShowPartitionsTask) SetTs(ts Timestamp) { +func (spt *showPartitionsTask) SetTs(ts Timestamp) { spt.Base.Timestamp = ts } -func (spt *ShowPartitionsTask) OnEnqueue() error { +func (spt *showPartitionsTask) OnEnqueue() error { spt.Base = &commonpb.MsgBase{} return nil } -func (spt *ShowPartitionsTask) PreExecute(ctx context.Context) error { +func (spt *showPartitionsTask) PreExecute(ctx context.Context) error { spt.Base.MsgType = commonpb.MsgType_ShowPartitions spt.Base.SourceID = Params.ProxyID @@ -3333,7 +3333,7 @@ func (spt *ShowPartitionsTask) PreExecute(ctx context.Context) error { return nil } -func (spt *ShowPartitionsTask) Execute(ctx context.Context) error { +func (spt *showPartitionsTask) Execute(ctx context.Context) error { respFromRootCoord, err := spt.rootCoord.ShowPartitions(ctx, spt.ShowPartitionsRequest) if err != nil { return err @@ -3429,7 +3429,7 @@ func (spt *ShowPartitionsTask) Execute(ctx context.Context) error { return nil } -func (spt *ShowPartitionsTask) PostExecute(ctx context.Context) error { +func (spt *showPartitionsTask) PostExecute(ctx context.Context) error { return nil } @@ -4227,7 +4227,7 @@ func (ft *FlushTask) PostExecute(ctx context.Context) error { return nil } -type LoadCollectionTask struct { +type loadCollectionTask struct { Condition *milvuspb.LoadCollectionRequest ctx context.Context @@ -4235,45 +4235,45 @@ type LoadCollectionTask struct { result *commonpb.Status } -func (lct *LoadCollectionTask) TraceCtx() context.Context { +func (lct *loadCollectionTask) TraceCtx() context.Context { return lct.ctx } -func (lct *LoadCollectionTask) ID() UniqueID { +func (lct *loadCollectionTask) ID() UniqueID { return lct.Base.MsgID } -func (lct *LoadCollectionTask) SetID(uid UniqueID) { +func (lct *loadCollectionTask) SetID(uid UniqueID) { lct.Base.MsgID = uid } -func (lct *LoadCollectionTask) Name() string { +func (lct *loadCollectionTask) Name() string { return LoadCollectionTaskName } -func (lct *LoadCollectionTask) Type() commonpb.MsgType { +func (lct *loadCollectionTask) Type() commonpb.MsgType { return lct.Base.MsgType } -func (lct *LoadCollectionTask) BeginTs() Timestamp { +func (lct *loadCollectionTask) BeginTs() Timestamp { return lct.Base.Timestamp } -func (lct *LoadCollectionTask) EndTs() Timestamp { +func (lct *loadCollectionTask) EndTs() Timestamp { return lct.Base.Timestamp } -func (lct *LoadCollectionTask) SetTs(ts Timestamp) { +func (lct *loadCollectionTask) SetTs(ts Timestamp) { lct.Base.Timestamp = ts } -func (lct *LoadCollectionTask) OnEnqueue() error { +func (lct *loadCollectionTask) OnEnqueue() error { lct.Base = &commonpb.MsgBase{} return nil } -func (lct *LoadCollectionTask) PreExecute(ctx context.Context) error { - log.Debug("LoadCollectionTask PreExecute", zap.String("role", Params.RoleName), zap.Int64("msgID", lct.Base.MsgID)) +func (lct *loadCollectionTask) PreExecute(ctx context.Context) error { + log.Debug("loadCollectionTask PreExecute", zap.String("role", Params.RoleName), zap.Int64("msgID", lct.Base.MsgID)) lct.Base.MsgType = commonpb.MsgType_LoadCollection lct.Base.SourceID = Params.ProxyID @@ -4286,8 +4286,8 @@ func (lct *LoadCollectionTask) PreExecute(ctx context.Context) error { return nil } -func (lct *LoadCollectionTask) Execute(ctx context.Context) (err error) { - log.Debug("LoadCollectionTask Execute", zap.String("role", Params.RoleName), zap.Int64("msgID", lct.Base.MsgID)) +func (lct *loadCollectionTask) Execute(ctx context.Context) (err error) { + log.Debug("loadCollectionTask Execute", zap.String("role", Params.RoleName), zap.Int64("msgID", lct.Base.MsgID)) collID, err := globalMetaCache.GetCollectionID(ctx, lct.CollectionName) if err != nil { return err @@ -4317,12 +4317,12 @@ func (lct *LoadCollectionTask) Execute(ctx context.Context) (err error) { return nil } -func (lct *LoadCollectionTask) PostExecute(ctx context.Context) error { - log.Debug("LoadCollectionTask PostExecute", zap.String("role", Params.RoleName), zap.Int64("msgID", lct.Base.MsgID)) +func (lct *loadCollectionTask) PostExecute(ctx context.Context) error { + log.Debug("loadCollectionTask PostExecute", zap.String("role", Params.RoleName), zap.Int64("msgID", lct.Base.MsgID)) return nil } -type ReleaseCollectionTask struct { +type releaseCollectionTask struct { Condition *milvuspb.ReleaseCollectionRequest ctx context.Context @@ -4331,44 +4331,44 @@ type ReleaseCollectionTask struct { chMgr channelsMgr } -func (rct *ReleaseCollectionTask) TraceCtx() context.Context { +func (rct *releaseCollectionTask) TraceCtx() context.Context { return rct.ctx } -func (rct *ReleaseCollectionTask) ID() UniqueID { +func (rct *releaseCollectionTask) ID() UniqueID { return rct.Base.MsgID } -func (rct *ReleaseCollectionTask) SetID(uid UniqueID) { +func (rct *releaseCollectionTask) SetID(uid UniqueID) { rct.Base.MsgID = uid } -func (rct *ReleaseCollectionTask) Name() string { +func (rct *releaseCollectionTask) Name() string { return ReleaseCollectionTaskName } -func (rct *ReleaseCollectionTask) Type() commonpb.MsgType { +func (rct *releaseCollectionTask) Type() commonpb.MsgType { return rct.Base.MsgType } -func (rct *ReleaseCollectionTask) BeginTs() Timestamp { +func (rct *releaseCollectionTask) BeginTs() Timestamp { return rct.Base.Timestamp } -func (rct *ReleaseCollectionTask) EndTs() Timestamp { +func (rct *releaseCollectionTask) EndTs() Timestamp { return rct.Base.Timestamp } -func (rct *ReleaseCollectionTask) SetTs(ts Timestamp) { +func (rct *releaseCollectionTask) SetTs(ts Timestamp) { rct.Base.Timestamp = ts } -func (rct *ReleaseCollectionTask) OnEnqueue() error { +func (rct *releaseCollectionTask) OnEnqueue() error { rct.Base = &commonpb.MsgBase{} return nil } -func (rct *ReleaseCollectionTask) PreExecute(ctx context.Context) error { +func (rct *releaseCollectionTask) PreExecute(ctx context.Context) error { rct.Base.MsgType = commonpb.MsgType_ReleaseCollection rct.Base.SourceID = Params.ProxyID @@ -4381,7 +4381,7 @@ func (rct *ReleaseCollectionTask) PreExecute(ctx context.Context) error { return nil } -func (rct *ReleaseCollectionTask) Execute(ctx context.Context) (err error) { +func (rct *releaseCollectionTask) Execute(ctx context.Context) (err error) { collID, err := globalMetaCache.GetCollectionID(ctx, rct.CollectionName) if err != nil { return err @@ -4404,11 +4404,11 @@ func (rct *ReleaseCollectionTask) Execute(ctx context.Context) (err error) { return err } -func (rct *ReleaseCollectionTask) PostExecute(ctx context.Context) error { +func (rct *releaseCollectionTask) PostExecute(ctx context.Context) error { return nil } -type LoadPartitionTask struct { +type loadPartitionsTask struct { Condition *milvuspb.LoadPartitionsRequest ctx context.Context @@ -4416,44 +4416,44 @@ type LoadPartitionTask struct { result *commonpb.Status } -func (lpt *LoadPartitionTask) TraceCtx() context.Context { +func (lpt *loadPartitionsTask) TraceCtx() context.Context { return lpt.ctx } -func (lpt *LoadPartitionTask) ID() UniqueID { +func (lpt *loadPartitionsTask) ID() UniqueID { return lpt.Base.MsgID } -func (lpt *LoadPartitionTask) SetID(uid UniqueID) { +func (lpt *loadPartitionsTask) SetID(uid UniqueID) { lpt.Base.MsgID = uid } -func (lpt *LoadPartitionTask) Name() string { +func (lpt *loadPartitionsTask) Name() string { return LoadPartitionTaskName } -func (lpt *LoadPartitionTask) Type() commonpb.MsgType { +func (lpt *loadPartitionsTask) Type() commonpb.MsgType { return lpt.Base.MsgType } -func (lpt *LoadPartitionTask) BeginTs() Timestamp { +func (lpt *loadPartitionsTask) BeginTs() Timestamp { return lpt.Base.Timestamp } -func (lpt *LoadPartitionTask) EndTs() Timestamp { +func (lpt *loadPartitionsTask) EndTs() Timestamp { return lpt.Base.Timestamp } -func (lpt *LoadPartitionTask) SetTs(ts Timestamp) { +func (lpt *loadPartitionsTask) SetTs(ts Timestamp) { lpt.Base.Timestamp = ts } -func (lpt *LoadPartitionTask) OnEnqueue() error { +func (lpt *loadPartitionsTask) OnEnqueue() error { lpt.Base = &commonpb.MsgBase{} return nil } -func (lpt *LoadPartitionTask) PreExecute(ctx context.Context) error { +func (lpt *loadPartitionsTask) PreExecute(ctx context.Context) error { lpt.Base.MsgType = commonpb.MsgType_LoadPartitions lpt.Base.SourceID = Params.ProxyID @@ -4466,7 +4466,7 @@ func (lpt *LoadPartitionTask) PreExecute(ctx context.Context) error { return nil } -func (lpt *LoadPartitionTask) Execute(ctx context.Context) error { +func (lpt *loadPartitionsTask) Execute(ctx context.Context) error { var partitionIDs []int64 collID, err := globalMetaCache.GetCollectionID(ctx, lpt.CollectionName) if err != nil { @@ -4499,11 +4499,11 @@ func (lpt *LoadPartitionTask) Execute(ctx context.Context) error { return err } -func (lpt *LoadPartitionTask) PostExecute(ctx context.Context) error { +func (lpt *loadPartitionsTask) PostExecute(ctx context.Context) error { return nil } -type ReleasePartitionTask struct { +type releasePartitionsTask struct { Condition *milvuspb.ReleasePartitionsRequest ctx context.Context @@ -4511,44 +4511,44 @@ type ReleasePartitionTask struct { result *commonpb.Status } -func (rpt *ReleasePartitionTask) TraceCtx() context.Context { +func (rpt *releasePartitionsTask) TraceCtx() context.Context { return rpt.ctx } -func (rpt *ReleasePartitionTask) ID() UniqueID { +func (rpt *releasePartitionsTask) ID() UniqueID { return rpt.Base.MsgID } -func (rpt *ReleasePartitionTask) SetID(uid UniqueID) { +func (rpt *releasePartitionsTask) SetID(uid UniqueID) { rpt.Base.MsgID = uid } -func (rpt *ReleasePartitionTask) Type() commonpb.MsgType { +func (rpt *releasePartitionsTask) Type() commonpb.MsgType { return rpt.Base.MsgType } -func (rpt *ReleasePartitionTask) Name() string { +func (rpt *releasePartitionsTask) Name() string { return ReleasePartitionTaskName } -func (rpt *ReleasePartitionTask) BeginTs() Timestamp { +func (rpt *releasePartitionsTask) BeginTs() Timestamp { return rpt.Base.Timestamp } -func (rpt *ReleasePartitionTask) EndTs() Timestamp { +func (rpt *releasePartitionsTask) EndTs() Timestamp { return rpt.Base.Timestamp } -func (rpt *ReleasePartitionTask) SetTs(ts Timestamp) { +func (rpt *releasePartitionsTask) SetTs(ts Timestamp) { rpt.Base.Timestamp = ts } -func (rpt *ReleasePartitionTask) OnEnqueue() error { +func (rpt *releasePartitionsTask) OnEnqueue() error { rpt.Base = &commonpb.MsgBase{} return nil } -func (rpt *ReleasePartitionTask) PreExecute(ctx context.Context) error { +func (rpt *releasePartitionsTask) PreExecute(ctx context.Context) error { rpt.Base.MsgType = commonpb.MsgType_ReleasePartitions rpt.Base.SourceID = Params.ProxyID @@ -4561,7 +4561,7 @@ func (rpt *ReleasePartitionTask) PreExecute(ctx context.Context) error { return nil } -func (rpt *ReleasePartitionTask) Execute(ctx context.Context) (err error) { +func (rpt *releasePartitionsTask) Execute(ctx context.Context) (err error) { var partitionIDs []int64 collID, err := globalMetaCache.GetCollectionID(ctx, rpt.CollectionName) if err != nil { @@ -4589,7 +4589,7 @@ func (rpt *ReleasePartitionTask) Execute(ctx context.Context) (err error) { return err } -func (rpt *ReleasePartitionTask) PostExecute(ctx context.Context) error { +func (rpt *releasePartitionsTask) PostExecute(ctx context.Context) error { return nil } diff --git a/internal/proxy/task_scheduler_test.go b/internal/proxy/task_scheduler_test.go index 416e2f22d5..af38b506c6 100644 --- a/internal/proxy/task_scheduler_test.go +++ b/internal/proxy/task_scheduler_test.go @@ -455,7 +455,7 @@ func TestTaskScheduler(t *testing.T) { ctx := context.Background() tsoAllocatorIns := newMockTsoAllocator() idAllocatorIns := newMockIDAllocatorInterface() - factory := NewSimpleMsgStreamFactory() + factory := newSimpleMockMsgStreamFactory() sched, err := newTaskScheduler(ctx, idAllocatorIns, tsoAllocatorIns, factory) assert.NoError(t, err) diff --git a/internal/util/flowgraph/node.go b/internal/util/flowgraph/node.go index 2c11b79746..1ed87e5b71 100644 --- a/internal/util/flowgraph/node.go +++ b/internal/util/flowgraph/node.go @@ -103,10 +103,11 @@ func (nodeCtx *nodeCtx) Start(ctx context.Context, wg *sync.WaitGroup) { } func (nodeCtx *nodeCtx) Close() { - for _, channel := range nodeCtx.inputChannels { - close(channel) - log.Warn("close inputChannel") - } + // data race with nodeCtx.ReceiveMsg { nodeCtx.inputChannels[inputChanIdx] <- msg } + //for _, channel := range nodeCtx.inputChannels { + // close(channel) + // log.Warn("close inputChannel") + //} nodeCtx.node.Close() }