mirror of
https://gitee.com/milvus-io/milvus.git
synced 2026-01-07 19:31:51 +08:00
fix: https://github.com/milvus-io/milvus/issues/34743 pr: https://github.com/milvus-io/milvus/pull/34744 --------- Signed-off-by: longjiquan <jiquan.long@zilliz.com>
This commit is contained in:
parent
3d9ec2ab23
commit
08c6ab8cfb
@ -96,6 +96,7 @@ func newMockIDAllocatorInterface() allocator.Interface {
|
||||
}
|
||||
|
||||
type mockTask struct {
|
||||
baseTask
|
||||
*TaskCondition
|
||||
id UniqueID
|
||||
name string
|
||||
|
||||
@ -20,6 +20,7 @@ import (
|
||||
"context"
|
||||
"fmt"
|
||||
"math"
|
||||
"time"
|
||||
|
||||
"github.com/cockroachdb/errors"
|
||||
"github.com/golang/protobuf/proto"
|
||||
@ -107,6 +108,20 @@ type task interface {
|
||||
PostExecute(ctx context.Context) error
|
||||
WaitToFinish() error
|
||||
Notify(err error)
|
||||
SetOnEnqueueTime()
|
||||
GetDurationInQueue() time.Duration
|
||||
}
|
||||
|
||||
type baseTask struct {
|
||||
onEnqueueTime time.Time
|
||||
}
|
||||
|
||||
func (bt *baseTask) SetOnEnqueueTime() {
|
||||
bt.onEnqueueTime = time.Now()
|
||||
}
|
||||
|
||||
func (bt *baseTask) GetDurationInQueue() time.Duration {
|
||||
return time.Since(bt.onEnqueueTime)
|
||||
}
|
||||
|
||||
type dmlTask interface {
|
||||
@ -118,6 +133,7 @@ type dmlTask interface {
|
||||
type BaseInsertTask = msgstream.InsertMsg
|
||||
|
||||
type createCollectionTask struct {
|
||||
baseTask
|
||||
Condition
|
||||
*milvuspb.CreateCollectionRequest
|
||||
ctx context.Context
|
||||
@ -317,6 +333,7 @@ func (t *createCollectionTask) PostExecute(ctx context.Context) error {
|
||||
}
|
||||
|
||||
type dropCollectionTask struct {
|
||||
baseTask
|
||||
Condition
|
||||
*milvuspb.DropCollectionRequest
|
||||
ctx context.Context
|
||||
@ -362,12 +379,12 @@ func (t *dropCollectionTask) OnEnqueue() error {
|
||||
if t.Base == nil {
|
||||
t.Base = commonpbutil.NewMsgBase()
|
||||
}
|
||||
t.Base.MsgType = commonpb.MsgType_DropCollection
|
||||
t.Base.SourceID = paramtable.GetNodeID()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t *dropCollectionTask) PreExecute(ctx context.Context) error {
|
||||
t.Base.MsgType = commonpb.MsgType_DropCollection
|
||||
t.Base.SourceID = paramtable.GetNodeID()
|
||||
|
||||
if err := validateCollectionName(t.CollectionName); err != nil {
|
||||
return err
|
||||
@ -386,6 +403,7 @@ func (t *dropCollectionTask) PostExecute(ctx context.Context) error {
|
||||
}
|
||||
|
||||
type hasCollectionTask struct {
|
||||
baseTask
|
||||
Condition
|
||||
*milvuspb.HasCollectionRequest
|
||||
ctx context.Context
|
||||
@ -426,13 +444,15 @@ func (t *hasCollectionTask) SetTs(ts Timestamp) {
|
||||
}
|
||||
|
||||
func (t *hasCollectionTask) OnEnqueue() error {
|
||||
t.Base = commonpbutil.NewMsgBase()
|
||||
if t.Base == nil {
|
||||
t.Base = commonpbutil.NewMsgBase()
|
||||
}
|
||||
t.Base.MsgType = commonpb.MsgType_HasCollection
|
||||
t.Base.SourceID = paramtable.GetNodeID()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t *hasCollectionTask) PreExecute(ctx context.Context) error {
|
||||
t.Base.MsgType = commonpb.MsgType_HasCollection
|
||||
t.Base.SourceID = paramtable.GetNodeID()
|
||||
|
||||
if err := validateCollectionName(t.CollectionName); err != nil {
|
||||
return err
|
||||
@ -460,6 +480,7 @@ func (t *hasCollectionTask) PostExecute(ctx context.Context) error {
|
||||
}
|
||||
|
||||
type describeCollectionTask struct {
|
||||
baseTask
|
||||
Condition
|
||||
*milvuspb.DescribeCollectionRequest
|
||||
ctx context.Context
|
||||
@ -500,13 +521,15 @@ func (t *describeCollectionTask) SetTs(ts Timestamp) {
|
||||
}
|
||||
|
||||
func (t *describeCollectionTask) OnEnqueue() error {
|
||||
t.Base = commonpbutil.NewMsgBase()
|
||||
if t.Base == nil {
|
||||
t.Base = commonpbutil.NewMsgBase()
|
||||
}
|
||||
t.Base.MsgType = commonpb.MsgType_DescribeCollection
|
||||
t.Base.SourceID = paramtable.GetNodeID()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t *describeCollectionTask) PreExecute(ctx context.Context) error {
|
||||
t.Base.MsgType = commonpb.MsgType_DescribeCollection
|
||||
t.Base.SourceID = paramtable.GetNodeID()
|
||||
|
||||
if t.CollectionID != 0 && len(t.CollectionName) == 0 {
|
||||
return nil
|
||||
@ -594,6 +617,7 @@ func (t *describeCollectionTask) PostExecute(ctx context.Context) error {
|
||||
}
|
||||
|
||||
type showCollectionsTask struct {
|
||||
baseTask
|
||||
Condition
|
||||
*milvuspb.ShowCollectionsRequest
|
||||
ctx context.Context
|
||||
@ -636,12 +660,12 @@ func (t *showCollectionsTask) SetTs(ts Timestamp) {
|
||||
|
||||
func (t *showCollectionsTask) OnEnqueue() error {
|
||||
t.Base = commonpbutil.NewMsgBase()
|
||||
t.Base.MsgType = commonpb.MsgType_ShowCollections
|
||||
t.Base.SourceID = paramtable.GetNodeID()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t *showCollectionsTask) PreExecute(ctx context.Context) error {
|
||||
t.Base.MsgType = commonpb.MsgType_ShowCollections
|
||||
t.Base.SourceID = paramtable.GetNodeID()
|
||||
if t.GetType() == milvuspb.ShowType_InMemory {
|
||||
for _, collectionName := range t.CollectionNames {
|
||||
if err := validateCollectionName(collectionName); err != nil {
|
||||
@ -753,6 +777,7 @@ func (t *showCollectionsTask) PostExecute(ctx context.Context) error {
|
||||
}
|
||||
|
||||
type alterCollectionTask struct {
|
||||
baseTask
|
||||
Condition
|
||||
*milvuspb.AlterCollectionRequest
|
||||
ctx context.Context
|
||||
@ -796,12 +821,12 @@ func (t *alterCollectionTask) OnEnqueue() error {
|
||||
if t.Base == nil {
|
||||
t.Base = commonpbutil.NewMsgBase()
|
||||
}
|
||||
t.Base.MsgType = commonpb.MsgType_AlterCollection
|
||||
t.Base.SourceID = paramtable.GetNodeID()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t *alterCollectionTask) PreExecute(ctx context.Context) error {
|
||||
t.Base.MsgType = commonpb.MsgType_AlterCollection
|
||||
t.Base.SourceID = paramtable.GetNodeID()
|
||||
|
||||
return nil
|
||||
}
|
||||
@ -817,6 +842,7 @@ func (t *alterCollectionTask) PostExecute(ctx context.Context) error {
|
||||
}
|
||||
|
||||
type createPartitionTask struct {
|
||||
baseTask
|
||||
Condition
|
||||
*milvuspb.CreatePartitionRequest
|
||||
ctx context.Context
|
||||
@ -860,12 +886,12 @@ func (t *createPartitionTask) OnEnqueue() error {
|
||||
if t.Base == nil {
|
||||
t.Base = commonpbutil.NewMsgBase()
|
||||
}
|
||||
t.Base.MsgType = commonpb.MsgType_CreatePartition
|
||||
t.Base.SourceID = paramtable.GetNodeID()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t *createPartitionTask) PreExecute(ctx context.Context) error {
|
||||
t.Base.MsgType = commonpb.MsgType_CreatePartition
|
||||
t.Base.SourceID = paramtable.GetNodeID()
|
||||
|
||||
collName, partitionTag := t.CollectionName, t.PartitionName
|
||||
|
||||
@ -904,6 +930,7 @@ func (t *createPartitionTask) PostExecute(ctx context.Context) error {
|
||||
}
|
||||
|
||||
type dropPartitionTask struct {
|
||||
baseTask
|
||||
Condition
|
||||
*milvuspb.DropPartitionRequest
|
||||
ctx context.Context
|
||||
@ -948,12 +975,12 @@ func (t *dropPartitionTask) OnEnqueue() error {
|
||||
if t.Base == nil {
|
||||
t.Base = commonpbutil.NewMsgBase()
|
||||
}
|
||||
t.Base.MsgType = commonpb.MsgType_DropPartition
|
||||
t.Base.SourceID = paramtable.GetNodeID()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t *dropPartitionTask) PreExecute(ctx context.Context) error {
|
||||
t.Base.MsgType = commonpb.MsgType_DropPartition
|
||||
t.Base.SourceID = paramtable.GetNodeID()
|
||||
|
||||
collName, partitionTag := t.CollectionName, t.PartitionName
|
||||
|
||||
@ -1018,6 +1045,7 @@ func (t *dropPartitionTask) PostExecute(ctx context.Context) error {
|
||||
}
|
||||
|
||||
type hasPartitionTask struct {
|
||||
baseTask
|
||||
Condition
|
||||
*milvuspb.HasPartitionRequest
|
||||
ctx context.Context
|
||||
@ -1058,13 +1086,15 @@ func (t *hasPartitionTask) SetTs(ts Timestamp) {
|
||||
}
|
||||
|
||||
func (t *hasPartitionTask) OnEnqueue() error {
|
||||
t.Base = commonpbutil.NewMsgBase()
|
||||
if t.Base == nil {
|
||||
t.Base = commonpbutil.NewMsgBase()
|
||||
}
|
||||
t.Base.MsgType = commonpb.MsgType_HasPartition
|
||||
t.Base.SourceID = paramtable.GetNodeID()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t *hasPartitionTask) PreExecute(ctx context.Context) error {
|
||||
t.Base.MsgType = commonpb.MsgType_HasPartition
|
||||
t.Base.SourceID = paramtable.GetNodeID()
|
||||
|
||||
collName, partitionTag := t.CollectionName, t.PartitionName
|
||||
|
||||
@ -1094,6 +1124,7 @@ func (t *hasPartitionTask) PostExecute(ctx context.Context) error {
|
||||
}
|
||||
|
||||
type showPartitionsTask struct {
|
||||
baseTask
|
||||
Condition
|
||||
*milvuspb.ShowPartitionsRequest
|
||||
ctx context.Context
|
||||
@ -1135,13 +1166,15 @@ func (t *showPartitionsTask) SetTs(ts Timestamp) {
|
||||
}
|
||||
|
||||
func (t *showPartitionsTask) OnEnqueue() error {
|
||||
t.Base = commonpbutil.NewMsgBase()
|
||||
if t.Base == nil {
|
||||
t.Base = commonpbutil.NewMsgBase()
|
||||
}
|
||||
t.Base.MsgType = commonpb.MsgType_ShowPartitions
|
||||
t.Base.SourceID = paramtable.GetNodeID()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t *showPartitionsTask) PreExecute(ctx context.Context) error {
|
||||
t.Base.MsgType = commonpb.MsgType_ShowPartitions
|
||||
t.Base.SourceID = paramtable.GetNodeID()
|
||||
|
||||
if err := validateCollectionName(t.CollectionName); err != nil {
|
||||
return err
|
||||
@ -1256,6 +1289,7 @@ func (t *showPartitionsTask) PostExecute(ctx context.Context) error {
|
||||
}
|
||||
|
||||
type flushTask struct {
|
||||
baseTask
|
||||
Condition
|
||||
*milvuspb.FlushRequest
|
||||
ctx context.Context
|
||||
@ -1301,12 +1335,12 @@ func (t *flushTask) OnEnqueue() error {
|
||||
if t.Base == nil {
|
||||
t.Base = commonpbutil.NewMsgBase()
|
||||
}
|
||||
t.Base.MsgType = commonpb.MsgType_Flush
|
||||
t.Base.SourceID = paramtable.GetNodeID()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t *flushTask) PreExecute(ctx context.Context) error {
|
||||
t.Base.MsgType = commonpb.MsgType_Flush
|
||||
t.Base.SourceID = paramtable.GetNodeID()
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -1360,6 +1394,7 @@ func (t *flushTask) PostExecute(ctx context.Context) error {
|
||||
}
|
||||
|
||||
type loadCollectionTask struct {
|
||||
baseTask
|
||||
Condition
|
||||
*milvuspb.LoadCollectionRequest
|
||||
ctx context.Context
|
||||
@ -1407,14 +1442,14 @@ func (t *loadCollectionTask) OnEnqueue() error {
|
||||
if t.Base == nil {
|
||||
t.Base = commonpbutil.NewMsgBase()
|
||||
}
|
||||
t.Base.MsgType = commonpb.MsgType_LoadCollection
|
||||
t.Base.SourceID = paramtable.GetNodeID()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t *loadCollectionTask) PreExecute(ctx context.Context) error {
|
||||
log.Ctx(ctx).Debug("loadCollectionTask PreExecute",
|
||||
zap.String("role", typeutil.ProxyRole))
|
||||
t.Base.MsgType = commonpb.MsgType_LoadCollection
|
||||
t.Base.SourceID = paramtable.GetNodeID()
|
||||
|
||||
collName := t.CollectionName
|
||||
|
||||
@ -1512,6 +1547,7 @@ func (t *loadCollectionTask) PostExecute(ctx context.Context) error {
|
||||
}
|
||||
|
||||
type releaseCollectionTask struct {
|
||||
baseTask
|
||||
Condition
|
||||
*milvuspb.ReleaseCollectionRequest
|
||||
ctx context.Context
|
||||
@ -1558,12 +1594,12 @@ func (t *releaseCollectionTask) OnEnqueue() error {
|
||||
if t.Base == nil {
|
||||
t.Base = commonpbutil.NewMsgBase()
|
||||
}
|
||||
t.Base.MsgType = commonpb.MsgType_ReleaseCollection
|
||||
t.Base.SourceID = paramtable.GetNodeID()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t *releaseCollectionTask) PreExecute(ctx context.Context) error {
|
||||
t.Base.MsgType = commonpb.MsgType_ReleaseCollection
|
||||
t.Base.SourceID = paramtable.GetNodeID()
|
||||
|
||||
collName := t.CollectionName
|
||||
|
||||
@ -1605,6 +1641,7 @@ func (t *releaseCollectionTask) PostExecute(ctx context.Context) error {
|
||||
}
|
||||
|
||||
type loadPartitionsTask struct {
|
||||
baseTask
|
||||
Condition
|
||||
*milvuspb.LoadPartitionsRequest
|
||||
ctx context.Context
|
||||
@ -1652,12 +1689,12 @@ func (t *loadPartitionsTask) OnEnqueue() error {
|
||||
if t.Base == nil {
|
||||
t.Base = commonpbutil.NewMsgBase()
|
||||
}
|
||||
t.Base.MsgType = commonpb.MsgType_LoadPartitions
|
||||
t.Base.SourceID = paramtable.GetNodeID()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t *loadPartitionsTask) PreExecute(ctx context.Context) error {
|
||||
t.Base.MsgType = commonpb.MsgType_LoadPartitions
|
||||
t.Base.SourceID = paramtable.GetNodeID()
|
||||
|
||||
collName := t.CollectionName
|
||||
|
||||
@ -1755,6 +1792,7 @@ func (t *loadPartitionsTask) PostExecute(ctx context.Context) error {
|
||||
}
|
||||
|
||||
type releasePartitionsTask struct {
|
||||
baseTask
|
||||
Condition
|
||||
*milvuspb.ReleasePartitionsRequest
|
||||
ctx context.Context
|
||||
@ -1801,12 +1839,12 @@ func (t *releasePartitionsTask) OnEnqueue() error {
|
||||
if t.Base == nil {
|
||||
t.Base = commonpbutil.NewMsgBase()
|
||||
}
|
||||
t.Base.MsgType = commonpb.MsgType_ReleasePartitions
|
||||
t.Base.SourceID = paramtable.GetNodeID()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t *releasePartitionsTask) PreExecute(ctx context.Context) error {
|
||||
t.Base.MsgType = commonpb.MsgType_ReleasePartitions
|
||||
t.Base.SourceID = paramtable.GetNodeID()
|
||||
|
||||
collName := t.CollectionName
|
||||
|
||||
@ -1862,6 +1900,7 @@ func (t *releasePartitionsTask) PostExecute(ctx context.Context) error {
|
||||
}
|
||||
|
||||
type CreateResourceGroupTask struct {
|
||||
baseTask
|
||||
Condition
|
||||
*milvuspb.CreateResourceGroupRequest
|
||||
ctx context.Context
|
||||
@ -1905,12 +1944,12 @@ func (t *CreateResourceGroupTask) OnEnqueue() error {
|
||||
if t.Base == nil {
|
||||
t.Base = commonpbutil.NewMsgBase()
|
||||
}
|
||||
t.Base.MsgType = commonpb.MsgType_CreateResourceGroup
|
||||
t.Base.SourceID = paramtable.GetNodeID()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t *CreateResourceGroupTask) PreExecute(ctx context.Context) error {
|
||||
t.Base.MsgType = commonpb.MsgType_CreateResourceGroup
|
||||
t.Base.SourceID = paramtable.GetNodeID()
|
||||
|
||||
return nil
|
||||
}
|
||||
@ -1926,6 +1965,7 @@ func (t *CreateResourceGroupTask) PostExecute(ctx context.Context) error {
|
||||
}
|
||||
|
||||
type DropResourceGroupTask struct {
|
||||
baseTask
|
||||
Condition
|
||||
*milvuspb.DropResourceGroupRequest
|
||||
ctx context.Context
|
||||
@ -1969,12 +2009,12 @@ func (t *DropResourceGroupTask) OnEnqueue() error {
|
||||
if t.Base == nil {
|
||||
t.Base = commonpbutil.NewMsgBase()
|
||||
}
|
||||
t.Base.MsgType = commonpb.MsgType_DropResourceGroup
|
||||
t.Base.SourceID = paramtable.GetNodeID()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t *DropResourceGroupTask) PreExecute(ctx context.Context) error {
|
||||
t.Base.MsgType = commonpb.MsgType_DropResourceGroup
|
||||
t.Base.SourceID = paramtable.GetNodeID()
|
||||
|
||||
return nil
|
||||
}
|
||||
@ -1990,6 +2030,7 @@ func (t *DropResourceGroupTask) PostExecute(ctx context.Context) error {
|
||||
}
|
||||
|
||||
type DescribeResourceGroupTask struct {
|
||||
baseTask
|
||||
Condition
|
||||
*milvuspb.DescribeResourceGroupRequest
|
||||
ctx context.Context
|
||||
@ -2030,13 +2071,15 @@ func (t *DescribeResourceGroupTask) SetTs(ts Timestamp) {
|
||||
}
|
||||
|
||||
func (t *DescribeResourceGroupTask) OnEnqueue() error {
|
||||
t.Base = commonpbutil.NewMsgBase()
|
||||
if t.Base == nil {
|
||||
t.Base = commonpbutil.NewMsgBase()
|
||||
}
|
||||
t.Base.MsgType = commonpb.MsgType_DescribeResourceGroup
|
||||
t.Base.SourceID = paramtable.GetNodeID()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t *DescribeResourceGroupTask) PreExecute(ctx context.Context) error {
|
||||
t.Base.MsgType = commonpb.MsgType_DescribeResourceGroup
|
||||
t.Base.SourceID = paramtable.GetNodeID()
|
||||
|
||||
return nil
|
||||
}
|
||||
@ -2111,6 +2154,7 @@ func (t *DescribeResourceGroupTask) PostExecute(ctx context.Context) error {
|
||||
}
|
||||
|
||||
type TransferNodeTask struct {
|
||||
baseTask
|
||||
Condition
|
||||
*milvuspb.TransferNodeRequest
|
||||
ctx context.Context
|
||||
@ -2154,12 +2198,12 @@ func (t *TransferNodeTask) OnEnqueue() error {
|
||||
if t.Base == nil {
|
||||
t.Base = commonpbutil.NewMsgBase()
|
||||
}
|
||||
t.Base.MsgType = commonpb.MsgType_TransferNode
|
||||
t.Base.SourceID = paramtable.GetNodeID()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t *TransferNodeTask) PreExecute(ctx context.Context) error {
|
||||
t.Base.MsgType = commonpb.MsgType_TransferNode
|
||||
t.Base.SourceID = paramtable.GetNodeID()
|
||||
|
||||
return nil
|
||||
}
|
||||
@ -2175,6 +2219,7 @@ func (t *TransferNodeTask) PostExecute(ctx context.Context) error {
|
||||
}
|
||||
|
||||
type TransferReplicaTask struct {
|
||||
baseTask
|
||||
Condition
|
||||
*milvuspb.TransferReplicaRequest
|
||||
ctx context.Context
|
||||
@ -2218,12 +2263,12 @@ func (t *TransferReplicaTask) OnEnqueue() error {
|
||||
if t.Base == nil {
|
||||
t.Base = commonpbutil.NewMsgBase()
|
||||
}
|
||||
t.Base.MsgType = commonpb.MsgType_TransferReplica
|
||||
t.Base.SourceID = paramtable.GetNodeID()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t *TransferReplicaTask) PreExecute(ctx context.Context) error {
|
||||
t.Base.MsgType = commonpb.MsgType_TransferReplica
|
||||
t.Base.SourceID = paramtable.GetNodeID()
|
||||
|
||||
return nil
|
||||
}
|
||||
@ -2248,6 +2293,7 @@ func (t *TransferReplicaTask) PostExecute(ctx context.Context) error {
|
||||
}
|
||||
|
||||
type ListResourceGroupsTask struct {
|
||||
baseTask
|
||||
Condition
|
||||
*milvuspb.ListResourceGroupsRequest
|
||||
ctx context.Context
|
||||
@ -2288,13 +2334,15 @@ func (t *ListResourceGroupsTask) SetTs(ts Timestamp) {
|
||||
}
|
||||
|
||||
func (t *ListResourceGroupsTask) OnEnqueue() error {
|
||||
t.Base = commonpbutil.NewMsgBase()
|
||||
if t.Base == nil {
|
||||
t.Base = commonpbutil.NewMsgBase()
|
||||
}
|
||||
t.Base.MsgType = commonpb.MsgType_ListResourceGroups
|
||||
t.Base.SourceID = paramtable.GetNodeID()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t *ListResourceGroupsTask) PreExecute(ctx context.Context) error {
|
||||
t.Base.MsgType = commonpb.MsgType_ListResourceGroups
|
||||
t.Base.SourceID = paramtable.GetNodeID()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -28,6 +28,7 @@ import (
|
||||
|
||||
// CreateAliasTask contains task information of CreateAlias
|
||||
type CreateAliasTask struct {
|
||||
baseTask
|
||||
Condition
|
||||
*milvuspb.CreateAliasRequest
|
||||
ctx context.Context
|
||||
@ -80,13 +81,13 @@ func (t *CreateAliasTask) OnEnqueue() error {
|
||||
if t.Base == nil {
|
||||
t.Base = commonpbutil.NewMsgBase()
|
||||
}
|
||||
t.Base.MsgType = commonpb.MsgType_CreateAlias
|
||||
t.Base.SourceID = paramtable.GetNodeID()
|
||||
return nil
|
||||
}
|
||||
|
||||
// PreExecute defines the tion before task execution
|
||||
func (t *CreateAliasTask) PreExecute(ctx context.Context) error {
|
||||
t.Base.MsgType = commonpb.MsgType_CreateAlias
|
||||
t.Base.SourceID = paramtable.GetNodeID()
|
||||
|
||||
collAlias := t.Alias
|
||||
// collection alias uses the same format as collection name
|
||||
@ -115,6 +116,7 @@ func (t *CreateAliasTask) PostExecute(ctx context.Context) error {
|
||||
|
||||
// DropAliasTask is the task to drop alias
|
||||
type DropAliasTask struct {
|
||||
baseTask
|
||||
Condition
|
||||
*milvuspb.DropAliasRequest
|
||||
ctx context.Context
|
||||
@ -162,12 +164,12 @@ func (t *DropAliasTask) OnEnqueue() error {
|
||||
if t.Base == nil {
|
||||
t.Base = commonpbutil.NewMsgBase()
|
||||
}
|
||||
t.Base.MsgType = commonpb.MsgType_DropAlias
|
||||
t.Base.SourceID = paramtable.GetNodeID()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t *DropAliasTask) PreExecute(ctx context.Context) error {
|
||||
t.Base.MsgType = commonpb.MsgType_DropAlias
|
||||
t.Base.SourceID = paramtable.GetNodeID()
|
||||
collAlias := t.Alias
|
||||
if err := ValidateCollectionAlias(collAlias); err != nil {
|
||||
return err
|
||||
@ -187,6 +189,7 @@ func (t *DropAliasTask) PostExecute(ctx context.Context) error {
|
||||
|
||||
// AlterAliasTask is the task to alter alias
|
||||
type AlterAliasTask struct {
|
||||
baseTask
|
||||
Condition
|
||||
*milvuspb.AlterAliasRequest
|
||||
ctx context.Context
|
||||
@ -230,12 +233,12 @@ func (t *AlterAliasTask) OnEnqueue() error {
|
||||
if t.Base == nil {
|
||||
t.Base = commonpbutil.NewMsgBase()
|
||||
}
|
||||
t.Base.MsgType = commonpb.MsgType_AlterAlias
|
||||
t.Base.SourceID = paramtable.GetNodeID()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t *AlterAliasTask) PreExecute(ctx context.Context) error {
|
||||
t.Base.MsgType = commonpb.MsgType_AlterAlias
|
||||
t.Base.SourceID = paramtable.GetNodeID()
|
||||
|
||||
collAlias := t.Alias
|
||||
// collection alias uses the same format as collection name
|
||||
@ -263,6 +266,7 @@ func (t *AlterAliasTask) PostExecute(ctx context.Context) error {
|
||||
|
||||
// DescribeAliasTask is the task to describe alias
|
||||
type DescribeAliasTask struct {
|
||||
baseTask
|
||||
Condition
|
||||
nodeID UniqueID
|
||||
*milvuspb.DescribeAliasRequest
|
||||
@ -305,12 +309,12 @@ func (a *DescribeAliasTask) SetTs(ts Timestamp) {
|
||||
|
||||
func (a *DescribeAliasTask) OnEnqueue() error {
|
||||
a.Base = commonpbutil.NewMsgBase()
|
||||
a.Base.MsgType = commonpb.MsgType_DescribeAlias
|
||||
a.Base.SourceID = a.nodeID
|
||||
return nil
|
||||
}
|
||||
|
||||
func (a *DescribeAliasTask) PreExecute(ctx context.Context) error {
|
||||
a.Base.MsgType = commonpb.MsgType_DescribeAlias
|
||||
a.Base.SourceID = a.nodeID
|
||||
// collection alias uses the same format as collection name
|
||||
if err := ValidateCollectionAlias(a.GetAlias()); err != nil {
|
||||
return err
|
||||
@ -330,6 +334,7 @@ func (a *DescribeAliasTask) PostExecute(ctx context.Context) error {
|
||||
|
||||
// ListAliasesTask is the task to list aliases
|
||||
type ListAliasesTask struct {
|
||||
baseTask
|
||||
Condition
|
||||
nodeID UniqueID
|
||||
*milvuspb.ListAliasesRequest
|
||||
@ -372,12 +377,12 @@ func (a *ListAliasesTask) SetTs(ts Timestamp) {
|
||||
|
||||
func (a *ListAliasesTask) OnEnqueue() error {
|
||||
a.Base = commonpbutil.NewMsgBase()
|
||||
a.Base.MsgType = commonpb.MsgType_ListAliases
|
||||
a.Base.SourceID = a.nodeID
|
||||
return nil
|
||||
}
|
||||
|
||||
func (a *ListAliasesTask) PreExecute(ctx context.Context) error {
|
||||
a.Base.MsgType = commonpb.MsgType_ListAliases
|
||||
a.Base.SourceID = a.nodeID
|
||||
|
||||
if len(a.GetCollectionName()) > 0 {
|
||||
if err := validateCollectionName(a.GetCollectionName()); err != nil {
|
||||
|
||||
@ -12,6 +12,7 @@ import (
|
||||
)
|
||||
|
||||
type createDatabaseTask struct {
|
||||
baseTask
|
||||
Condition
|
||||
*milvuspb.CreateDatabaseRequest
|
||||
ctx context.Context
|
||||
@ -80,6 +81,7 @@ func (cdt *createDatabaseTask) PostExecute(ctx context.Context) error {
|
||||
}
|
||||
|
||||
type dropDatabaseTask struct {
|
||||
baseTask
|
||||
Condition
|
||||
*milvuspb.DropDatabaseRequest
|
||||
ctx context.Context
|
||||
@ -150,6 +152,7 @@ func (ddt *dropDatabaseTask) PostExecute(ctx context.Context) error {
|
||||
}
|
||||
|
||||
type listDatabaseTask struct {
|
||||
baseTask
|
||||
Condition
|
||||
*milvuspb.ListDatabasesRequest
|
||||
ctx context.Context
|
||||
|
||||
@ -34,6 +34,7 @@ import (
|
||||
type BaseDeleteTask = msgstream.DeleteMsg
|
||||
|
||||
type deleteTask struct {
|
||||
baseTask
|
||||
Condition
|
||||
ctx context.Context
|
||||
tr *timerecord.TimeRecorder
|
||||
@ -95,6 +96,11 @@ func (dt *deleteTask) SetTs(ts Timestamp) {
|
||||
}
|
||||
|
||||
func (dt *deleteTask) OnEnqueue() error {
|
||||
if dt.req.Base == nil {
|
||||
dt.req.Base = commonpbutil.NewMsgBase()
|
||||
}
|
||||
dt.req.Base.MsgType = commonpb.MsgType_Delete
|
||||
dt.req.Base.SourceID = paramtable.GetNodeID()
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
@ -52,6 +52,7 @@ const (
|
||||
)
|
||||
|
||||
type createIndexTask struct {
|
||||
baseTask
|
||||
Condition
|
||||
req *milvuspb.CreateIndexRequest
|
||||
ctx context.Context
|
||||
@ -106,6 +107,8 @@ func (cit *createIndexTask) OnEnqueue() error {
|
||||
if cit.req.Base == nil {
|
||||
cit.req.Base = commonpbutil.NewMsgBase()
|
||||
}
|
||||
cit.req.Base.MsgType = commonpb.MsgType_CreateIndex
|
||||
cit.req.Base.SourceID = paramtable.GetNodeID()
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -359,8 +362,6 @@ func checkTrain(field *schemapb.FieldSchema, indexParams map[string]string) erro
|
||||
}
|
||||
|
||||
func (cit *createIndexTask) PreExecute(ctx context.Context) error {
|
||||
cit.req.Base.MsgType = commonpb.MsgType_CreateIndex
|
||||
cit.req.Base.SourceID = paramtable.GetNodeID()
|
||||
|
||||
collName := cit.req.GetCollectionName()
|
||||
|
||||
@ -422,6 +423,7 @@ func (cit *createIndexTask) PostExecute(ctx context.Context) error {
|
||||
}
|
||||
|
||||
type describeIndexTask struct {
|
||||
baseTask
|
||||
Condition
|
||||
*milvuspb.DescribeIndexRequest
|
||||
ctx context.Context
|
||||
@ -465,12 +467,12 @@ func (dit *describeIndexTask) SetTs(ts Timestamp) {
|
||||
|
||||
func (dit *describeIndexTask) OnEnqueue() error {
|
||||
dit.Base = commonpbutil.NewMsgBase()
|
||||
dit.Base.MsgType = commonpb.MsgType_DescribeIndex
|
||||
dit.Base.SourceID = paramtable.GetNodeID()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (dit *describeIndexTask) PreExecute(ctx context.Context) error {
|
||||
dit.Base.MsgType = commonpb.MsgType_DescribeIndex
|
||||
dit.Base.SourceID = paramtable.GetNodeID()
|
||||
|
||||
if err := validateCollectionName(dit.CollectionName); err != nil {
|
||||
return err
|
||||
@ -545,6 +547,7 @@ func (dit *describeIndexTask) PostExecute(ctx context.Context) error {
|
||||
}
|
||||
|
||||
type getIndexStatisticsTask struct {
|
||||
baseTask
|
||||
Condition
|
||||
*milvuspb.GetIndexStatisticsRequest
|
||||
ctx context.Context
|
||||
@ -589,12 +592,12 @@ func (dit *getIndexStatisticsTask) SetTs(ts Timestamp) {
|
||||
|
||||
func (dit *getIndexStatisticsTask) OnEnqueue() error {
|
||||
dit.Base = commonpbutil.NewMsgBase()
|
||||
dit.Base.MsgType = commonpb.MsgType_GetIndexStatistics
|
||||
dit.Base.SourceID = paramtable.GetNodeID()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (dit *getIndexStatisticsTask) PreExecute(ctx context.Context) error {
|
||||
dit.Base.MsgType = commonpb.MsgType_GetIndexStatistics
|
||||
dit.Base.SourceID = dit.nodeID
|
||||
|
||||
if err := validateCollectionName(dit.CollectionName); err != nil {
|
||||
return err
|
||||
@ -661,6 +664,7 @@ func (dit *getIndexStatisticsTask) PostExecute(ctx context.Context) error {
|
||||
}
|
||||
|
||||
type dropIndexTask struct {
|
||||
baseTask
|
||||
Condition
|
||||
ctx context.Context
|
||||
*milvuspb.DropIndexRequest
|
||||
@ -709,12 +713,12 @@ func (dit *dropIndexTask) OnEnqueue() error {
|
||||
if dit.Base == nil {
|
||||
dit.Base = commonpbutil.NewMsgBase()
|
||||
}
|
||||
dit.Base.MsgType = commonpb.MsgType_DropIndex
|
||||
dit.Base.SourceID = paramtable.GetNodeID()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (dit *dropIndexTask) PreExecute(ctx context.Context) error {
|
||||
dit.Base.MsgType = commonpb.MsgType_DropIndex
|
||||
dit.Base.SourceID = paramtable.GetNodeID()
|
||||
|
||||
collName, fieldName := dit.CollectionName, dit.FieldName
|
||||
|
||||
@ -781,6 +785,7 @@ func (dit *dropIndexTask) PostExecute(ctx context.Context) error {
|
||||
|
||||
// Deprecated: use describeIndexTask instead
|
||||
type getIndexBuildProgressTask struct {
|
||||
baseTask
|
||||
Condition
|
||||
*milvuspb.GetIndexBuildProgressRequest
|
||||
ctx context.Context
|
||||
@ -825,12 +830,12 @@ func (gibpt *getIndexBuildProgressTask) SetTs(ts Timestamp) {
|
||||
|
||||
func (gibpt *getIndexBuildProgressTask) OnEnqueue() error {
|
||||
gibpt.Base = commonpbutil.NewMsgBase()
|
||||
gibpt.Base.MsgType = commonpb.MsgType_GetIndexBuildProgress
|
||||
gibpt.Base.SourceID = paramtable.GetNodeID()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (gibpt *getIndexBuildProgressTask) PreExecute(ctx context.Context) error {
|
||||
gibpt.Base.MsgType = commonpb.MsgType_GetIndexBuildProgress
|
||||
gibpt.Base.SourceID = paramtable.GetNodeID()
|
||||
|
||||
if err := validateCollectionName(gibpt.CollectionName); err != nil {
|
||||
return err
|
||||
@ -870,6 +875,7 @@ func (gibpt *getIndexBuildProgressTask) PostExecute(ctx context.Context) error {
|
||||
|
||||
// Deprecated: use describeIndexTask instead
|
||||
type getIndexStateTask struct {
|
||||
baseTask
|
||||
Condition
|
||||
*milvuspb.GetIndexStateRequest
|
||||
ctx context.Context
|
||||
@ -914,12 +920,12 @@ func (gist *getIndexStateTask) SetTs(ts Timestamp) {
|
||||
|
||||
func (gist *getIndexStateTask) OnEnqueue() error {
|
||||
gist.Base = commonpbutil.NewMsgBase()
|
||||
gist.Base.MsgType = commonpb.MsgType_GetIndexState
|
||||
gist.Base.SourceID = paramtable.GetNodeID()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (gist *getIndexStateTask) PreExecute(ctx context.Context) error {
|
||||
gist.Base.MsgType = commonpb.MsgType_GetIndexState
|
||||
gist.Base.SourceID = paramtable.GetNodeID()
|
||||
|
||||
if err := validateCollectionName(gist.CollectionName); err != nil {
|
||||
return err
|
||||
|
||||
@ -15,6 +15,7 @@ import (
|
||||
"github.com/milvus-io/milvus/pkg/log"
|
||||
"github.com/milvus-io/milvus/pkg/metrics"
|
||||
"github.com/milvus-io/milvus/pkg/mq/msgstream"
|
||||
"github.com/milvus-io/milvus/pkg/util/commonpbutil"
|
||||
"github.com/milvus-io/milvus/pkg/util/merr"
|
||||
"github.com/milvus-io/milvus/pkg/util/paramtable"
|
||||
"github.com/milvus-io/milvus/pkg/util/timerecord"
|
||||
@ -22,6 +23,7 @@ import (
|
||||
)
|
||||
|
||||
type insertTask struct {
|
||||
baseTask
|
||||
// req *milvuspb.InsertRequest
|
||||
Condition
|
||||
insertMsg *BaseInsertTask
|
||||
@ -90,6 +92,11 @@ func (it *insertTask) getChannels() []pChan {
|
||||
}
|
||||
|
||||
func (it *insertTask) OnEnqueue() error {
|
||||
if it.insertMsg.Base == nil {
|
||||
it.insertMsg.Base = commonpbutil.NewMsgBase()
|
||||
}
|
||||
it.insertMsg.Base.MsgType = commonpb.MsgType_Insert
|
||||
it.insertMsg.Base.SourceID = paramtable.GetNodeID()
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
@ -23,6 +23,7 @@ import (
|
||||
"github.com/milvus-io/milvus/pkg/common"
|
||||
"github.com/milvus-io/milvus/pkg/log"
|
||||
"github.com/milvus-io/milvus/pkg/metrics"
|
||||
"github.com/milvus-io/milvus/pkg/util/commonpbutil"
|
||||
"github.com/milvus-io/milvus/pkg/util/funcutil"
|
||||
"github.com/milvus-io/milvus/pkg/util/merr"
|
||||
"github.com/milvus-io/milvus/pkg/util/paramtable"
|
||||
@ -42,6 +43,7 @@ const (
|
||||
)
|
||||
|
||||
type queryTask struct {
|
||||
baseTask
|
||||
Condition
|
||||
*internalpb.RetrieveRequest
|
||||
|
||||
@ -666,6 +668,10 @@ func (t *queryTask) SetTs(ts Timestamp) {
|
||||
}
|
||||
|
||||
func (t *queryTask) OnEnqueue() error {
|
||||
if t.Base == nil {
|
||||
t.Base = commonpbutil.NewMsgBase()
|
||||
}
|
||||
t.Base.MsgType = commonpb.MsgType_Retrieve
|
||||
t.Base.SourceID = paramtable.GetNodeID()
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -19,6 +19,7 @@ package proxy
|
||||
import (
|
||||
"container/list"
|
||||
"context"
|
||||
"strconv"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
@ -26,6 +27,7 @@ import (
|
||||
"go.uber.org/zap"
|
||||
|
||||
"github.com/milvus-io/milvus/pkg/log"
|
||||
"github.com/milvus-io/milvus/pkg/metrics"
|
||||
"github.com/milvus-io/milvus/pkg/mq/msgstream"
|
||||
"github.com/milvus-io/milvus/pkg/util/conc"
|
||||
"github.com/milvus-io/milvus/pkg/util/merr"
|
||||
@ -179,6 +181,7 @@ func (queue *baseTaskQueue) Enqueue(t task) error {
|
||||
// we always use same msg id and ts for now.
|
||||
t.SetID(UniqueID(ts))
|
||||
|
||||
t.SetOnEnqueueTime()
|
||||
return queue.addUnissuedTask(t)
|
||||
}
|
||||
|
||||
@ -440,6 +443,11 @@ func (sched *taskScheduler) processTask(t task, q taskQueue) {
|
||||
}()
|
||||
span.AddEvent("scheduler process PreExecute")
|
||||
|
||||
waitDuration := t.GetDurationInQueue()
|
||||
metrics.ProxyReqInQueueLatency.
|
||||
WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), t.Type().String()).
|
||||
Observe(float64(waitDuration.Milliseconds()))
|
||||
|
||||
err := t.PreExecute(ctx)
|
||||
|
||||
defer func() {
|
||||
|
||||
@ -46,6 +46,7 @@ const (
|
||||
)
|
||||
|
||||
type searchTask struct {
|
||||
baseTask
|
||||
Condition
|
||||
*internalpb.SearchRequest
|
||||
ctx context.Context
|
||||
|
||||
@ -32,6 +32,7 @@ const (
|
||||
)
|
||||
|
||||
type getStatisticsTask struct {
|
||||
baseTask
|
||||
request *milvuspb.GetStatisticsRequest
|
||||
result *milvuspb.GetStatisticsResponse
|
||||
Condition
|
||||
@ -93,6 +94,9 @@ func (g *getStatisticsTask) OnEnqueue() error {
|
||||
g.GetStatisticsRequest = &internalpb.GetStatisticsRequest{
|
||||
Base: commonpbutil.NewMsgBase(),
|
||||
}
|
||||
|
||||
g.Base.MsgType = commonpb.MsgType_GetPartitionStatistics
|
||||
g.Base.SourceID = paramtable.GetNodeID()
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -106,10 +110,6 @@ func (g *getStatisticsTask) PreExecute(ctx context.Context) error {
|
||||
ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-GetStatistics-PreExecute")
|
||||
defer sp.End()
|
||||
|
||||
// TODO: Maybe we should create a new MsgType: GetStatistics?
|
||||
g.Base.MsgType = commonpb.MsgType_GetPartitionStatistics
|
||||
g.Base.SourceID = paramtable.GetNodeID()
|
||||
|
||||
collID, err := globalMetaCache.GetCollectionID(ctx, g.request.GetDbName(), g.collectionName)
|
||||
if err != nil { // err is not nil if collection not exists
|
||||
return err
|
||||
@ -589,6 +589,7 @@ func reduceStatisticResponse(results []map[string]string) ([]*commonpb.KeyValueP
|
||||
// old version of get statistics
|
||||
// please remove it after getStatisticsTask below is stable
|
||||
type getCollectionStatisticsTask struct {
|
||||
baseTask
|
||||
Condition
|
||||
*milvuspb.GetCollectionStatisticsRequest
|
||||
ctx context.Context
|
||||
@ -632,12 +633,12 @@ func (g *getCollectionStatisticsTask) SetTs(ts Timestamp) {
|
||||
|
||||
func (g *getCollectionStatisticsTask) OnEnqueue() error {
|
||||
g.Base = commonpbutil.NewMsgBase()
|
||||
g.Base.MsgType = commonpb.MsgType_GetCollectionStatistics
|
||||
g.Base.SourceID = paramtable.GetNodeID()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (g *getCollectionStatisticsTask) PreExecute(ctx context.Context) error {
|
||||
g.Base.MsgType = commonpb.MsgType_GetCollectionStatistics
|
||||
g.Base.SourceID = paramtable.GetNodeID()
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -674,6 +675,7 @@ func (g *getCollectionStatisticsTask) PostExecute(ctx context.Context) error {
|
||||
}
|
||||
|
||||
type getPartitionStatisticsTask struct {
|
||||
baseTask
|
||||
Condition
|
||||
*milvuspb.GetPartitionStatisticsRequest
|
||||
ctx context.Context
|
||||
@ -717,12 +719,12 @@ func (g *getPartitionStatisticsTask) SetTs(ts Timestamp) {
|
||||
|
||||
func (g *getPartitionStatisticsTask) OnEnqueue() error {
|
||||
g.Base = commonpbutil.NewMsgBase()
|
||||
g.Base.MsgType = commonpb.MsgType_GetPartitionStatistics
|
||||
g.Base.SourceID = paramtable.GetNodeID()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (g *getPartitionStatisticsTask) PreExecute(ctx context.Context) error {
|
||||
g.Base.MsgType = commonpb.MsgType_GetPartitionStatistics
|
||||
g.Base.SourceID = paramtable.GetNodeID()
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
@ -914,6 +914,7 @@ func TestHasCollectionTask(t *testing.T) {
|
||||
rootCoord: rc,
|
||||
result: nil,
|
||||
}
|
||||
task.OnEnqueue()
|
||||
task.PreExecute(ctx)
|
||||
|
||||
assert.Equal(t, commonpb.MsgType_HasCollection, task.Type())
|
||||
@ -976,6 +977,7 @@ func TestDescribeCollectionTask(t *testing.T) {
|
||||
rootCoord: rc,
|
||||
result: nil,
|
||||
}
|
||||
task.OnEnqueue()
|
||||
task.PreExecute(ctx)
|
||||
|
||||
assert.Equal(t, commonpb.MsgType_DescribeCollection, task.Type())
|
||||
@ -1224,6 +1226,7 @@ func TestCreatePartitionTask(t *testing.T) {
|
||||
rootCoord: rc,
|
||||
result: nil,
|
||||
}
|
||||
task.OnEnqueue()
|
||||
task.PreExecute(ctx)
|
||||
|
||||
assert.Equal(t, commonpb.MsgType_CreatePartition, task.Type())
|
||||
@ -1299,6 +1302,7 @@ func TestDropPartitionTask(t *testing.T) {
|
||||
queryCoord: qc,
|
||||
result: nil,
|
||||
}
|
||||
task.OnEnqueue()
|
||||
task.PreExecute(ctx)
|
||||
|
||||
assert.Equal(t, commonpb.MsgType_DropPartition, task.Type())
|
||||
@ -1416,6 +1420,7 @@ func TestHasPartitionTask(t *testing.T) {
|
||||
rootCoord: rc,
|
||||
result: nil,
|
||||
}
|
||||
task.OnEnqueue()
|
||||
task.PreExecute(ctx)
|
||||
|
||||
assert.Equal(t, commonpb.MsgType_HasPartition, task.Type())
|
||||
@ -1463,6 +1468,7 @@ func TestShowPartitionsTask(t *testing.T) {
|
||||
rootCoord: rc,
|
||||
result: nil,
|
||||
}
|
||||
task.OnEnqueue()
|
||||
task.PreExecute(ctx)
|
||||
|
||||
assert.Equal(t, commonpb.MsgType_ShowPartitions, task.Type())
|
||||
@ -2549,6 +2555,7 @@ func TestCreateResourceGroupTask(t *testing.T) {
|
||||
ctx: ctx,
|
||||
queryCoord: qc,
|
||||
}
|
||||
task.OnEnqueue()
|
||||
task.PreExecute(ctx)
|
||||
|
||||
assert.Equal(t, commonpb.MsgType_CreateResourceGroup, task.Type())
|
||||
@ -2588,6 +2595,7 @@ func TestDropResourceGroupTask(t *testing.T) {
|
||||
ctx: ctx,
|
||||
queryCoord: qc,
|
||||
}
|
||||
task.OnEnqueue()
|
||||
task.PreExecute(ctx)
|
||||
|
||||
assert.Equal(t, commonpb.MsgType_DropResourceGroup, task.Type())
|
||||
@ -2629,6 +2637,7 @@ func TestTransferNodeTask(t *testing.T) {
|
||||
ctx: ctx,
|
||||
queryCoord: qc,
|
||||
}
|
||||
task.OnEnqueue()
|
||||
task.PreExecute(ctx)
|
||||
|
||||
assert.Equal(t, commonpb.MsgType_TransferNode, task.Type())
|
||||
@ -2671,6 +2680,7 @@ func TestTransferReplicaTask(t *testing.T) {
|
||||
ctx: ctx,
|
||||
queryCoord: qc,
|
||||
}
|
||||
task.OnEnqueue()
|
||||
task.PreExecute(ctx)
|
||||
|
||||
assert.Equal(t, commonpb.MsgType_TransferReplica, task.Type())
|
||||
@ -2710,6 +2720,7 @@ func TestListResourceGroupsTask(t *testing.T) {
|
||||
ctx: ctx,
|
||||
queryCoord: qc,
|
||||
}
|
||||
task.OnEnqueue()
|
||||
task.PreExecute(ctx)
|
||||
|
||||
assert.Equal(t, commonpb.MsgType_ListResourceGroups, task.Type())
|
||||
@ -2762,6 +2773,7 @@ func TestDescribeResourceGroupTask(t *testing.T) {
|
||||
ctx: ctx,
|
||||
queryCoord: qc,
|
||||
}
|
||||
task.OnEnqueue()
|
||||
task.PreExecute(ctx)
|
||||
|
||||
assert.Equal(t, commonpb.MsgType_DescribeResourceGroup, task.Type())
|
||||
@ -2808,6 +2820,7 @@ func TestDescribeResourceGroupTaskFailed(t *testing.T) {
|
||||
ctx: ctx,
|
||||
queryCoord: qc,
|
||||
}
|
||||
task.OnEnqueue()
|
||||
task.PreExecute(ctx)
|
||||
|
||||
assert.Equal(t, commonpb.MsgType_DescribeResourceGroup, task.Type())
|
||||
|
||||
@ -41,6 +41,7 @@ import (
|
||||
)
|
||||
|
||||
type upsertTask struct {
|
||||
baseTask
|
||||
Condition
|
||||
|
||||
upsertMsg *msgstream.UpsertMsg
|
||||
@ -133,6 +134,11 @@ func (it *upsertTask) getChannels() []pChan {
|
||||
}
|
||||
|
||||
func (it *upsertTask) OnEnqueue() error {
|
||||
if it.req.Base == nil {
|
||||
it.req.Base = commonpbutil.NewMsgBase()
|
||||
}
|
||||
it.req.Base.MsgType = commonpb.MsgType_Upsert
|
||||
it.req.Base.SourceID = paramtable.GetNodeID()
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
@ -322,6 +322,16 @@ var (
|
||||
Name: "slow_query_count",
|
||||
Help: "count of slow query executed",
|
||||
}, []string{nodeIDLabelName, msgTypeLabelName})
|
||||
|
||||
// ProxyReqInQueueLatency records the latency that requests wait in the queue, like "CreateCollection".
|
||||
ProxyReqInQueueLatency = prometheus.NewHistogramVec(
|
||||
prometheus.HistogramOpts{
|
||||
Namespace: milvusNamespace,
|
||||
Subsystem: typeutil.ProxyRole,
|
||||
Name: "req_in_queue_latency",
|
||||
Help: "latency which request waits in the queue",
|
||||
Buckets: buckets, // unit: ms
|
||||
}, []string{nodeIDLabelName, functionLabelName})
|
||||
)
|
||||
|
||||
// RegisterProxy registers Proxy metrics
|
||||
@ -370,6 +380,7 @@ func RegisterProxy(registry *prometheus.Registry) {
|
||||
registry.MustRegister(ProxyRateLimitReqCount)
|
||||
|
||||
registry.MustRegister(ProxySlowQueryCount)
|
||||
registry.MustRegister(ProxyReqInQueueLatency)
|
||||
}
|
||||
|
||||
func CleanupCollectionMetrics(nodeID int64, collection string) {
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user