diff --git a/configs/milvus.yaml b/configs/milvus.yaml
index 4e01ec8919..fdb747bf67 100644
--- a/configs/milvus.yaml
+++ b/configs/milvus.yaml
@@ -720,7 +720,7 @@ dataCoord:
     memoryLimitPerSlot: 160 # The memory limit (in MB) of buffer size per slot for pre-import/import task.
   gracefulStopTimeout: 5 # seconds. force stop node without graceful stop
   slot:
-    clusteringCompactionUsage: 65536 # slot usage of clustering compaction task, setting it to 65536 means it takes up a whole worker.
+    clusteringCompactionUsage: 65535 # slot usage of clustering compaction task, setting it to 65535 means it takes up a whole worker.
     mixCompactionUsage: 4 # slot usage of mix compaction task.
     l0DeleteCompactionUsage: 8 # slot usage of l0 compaction task.
     indexTaskSlotUsage: 64 # slot usage of index task per 512mb
diff --git a/internal/datanode/index_services.go b/internal/datanode/index_services.go
index 26c6a0bfcd..d5c0dd7211 100644
--- a/internal/datanode/index_services.go
+++ b/internal/datanode/index_services.go
@@ -256,7 +256,7 @@ func (node *DataNode) CreateJobV2(ctx context.Context, req *workerpb.CreateJobV2
 }
 
 func (node *DataNode) createIndexTask(ctx context.Context, req *workerpb.CreateJobRequest) (*commonpb.Status, error) {
-	log.Info("DataNode building index ...",
+	log.Ctx(ctx).Info("DataNode building index ...",
 		zap.Int64("collectionID", req.GetCollectionID()),
 		zap.Int64("partitionID", req.GetPartitionID()),
 		zap.Int64("segmentID", req.GetSegmentID()),
@@ -277,6 +277,10 @@ func (node *DataNode) createIndexTask(ctx context.Context, req *workerpb.CreateJ
 		zap.Int64("taskSlot", req.GetTaskSlot()),
 		zap.Int64("lackBinlogRows", req.GetLackBinlogRows()),
 	)
+	if req.GetTaskSlot() <= 0 {
+		log.Ctx(ctx).Warn("receive index task with invalid slot, set to 64", zap.Int64("taskSlot", req.GetTaskSlot()))
+		req.TaskSlot = 64
+	}
 	taskCtx, taskCancel := context.WithCancel(node.ctx)
 	if oldInfo := node.taskManager.LoadOrStoreIndexTask(req.GetClusterID(), req.GetBuildID(), &index.IndexTaskInfo{
 		Cancel: taskCancel,
@@ -314,7 +318,7 @@ func (node *DataNode) createIndexTask(ctx context.Context, req *workerpb.CreateJ
 }
 
 func (node *DataNode) createAnalyzeTask(ctx context.Context, req *workerpb.AnalyzeRequest) (*commonpb.Status, error) {
-	log.Info("receive analyze job", zap.Int64("collectionID", req.GetCollectionID()),
+	log.Ctx(ctx).Info("receive analyze job", zap.Int64("collectionID", req.GetCollectionID()),
 		zap.Int64("partitionID", req.GetPartitionID()),
 		zap.Int64("fieldID", req.GetFieldID()),
 		zap.String("fieldName", req.GetFieldName()),
@@ -326,6 +330,11 @@ func (node *DataNode) createAnalyzeTask(ctx context.Context, req *workerpb.Analy
 		zap.Int64("taskSlot", req.GetTaskSlot()),
 	)
 
+	if req.GetTaskSlot() <= 0 {
+		log.Ctx(ctx).Warn("receive analyze task with invalid slot, set to 65535", zap.Int64("taskSlot", req.GetTaskSlot()))
+		req.TaskSlot = 65535
+	}
+
 	taskCtx, taskCancel := context.WithCancel(node.ctx)
 	if oldInfo := node.taskManager.LoadOrStoreAnalyzeTask(req.GetClusterID(), req.GetTaskID(), &index.AnalyzeTaskInfo{
 		Cancel: taskCancel,
@@ -348,7 +357,7 @@ func (node *DataNode) createAnalyzeTask(ctx context.Context, req *workerpb.Analy
 }
 
 func (node *DataNode) createStatsTask(ctx context.Context, req *workerpb.CreateStatsRequest) (*commonpb.Status, error) {
-	log.Info("receive stats job", zap.Int64("collectionID", req.GetCollectionID()),
+	log.Ctx(ctx).Info("receive stats job", zap.Int64("collectionID", req.GetCollectionID()),
 		zap.Int64("partitionID", req.GetPartitionID()),
 		zap.Int64("segmentID", req.GetSegmentID()),
 		zap.Int64("numRows", req.GetNumRows()),
@@ -359,6 +368,11 @@ func (node *DataNode) createStatsTask(ctx context.Context, req *workerpb.CreateS
 		zap.Int64("taskSlot", req.GetTaskSlot()),
 	)
 
+	if req.GetTaskSlot() <= 0 {
+		log.Ctx(ctx).Warn("receive stats task with invalid slot, set to 64", zap.Int64("taskSlot", req.GetTaskSlot()))
+		req.TaskSlot = 64
+	}
+
 	taskCtx, taskCancel := context.WithCancel(node.ctx)
 	if oldInfo := node.taskManager.LoadOrStoreStatsTask(req.GetClusterID(), req.GetTaskID(), &index.StatsTaskInfo{
 		Cancel: taskCancel,
diff --git a/pkg/proto/worker.proto b/pkg/proto/worker.proto
index 89715a429c..2cf64c5dc9 100644
--- a/pkg/proto/worker.proto
+++ b/pkg/proto/worker.proto
@@ -99,9 +99,9 @@ message CreateJobRequest {
   schema.FieldSchema field = 25;
   bool partition_key_isolation = 26;
   int32 current_scalar_index_version = 27;
-  int64 storage_version = 28;
-  int64 lack_binlog_rows = 29;
-  int64 task_slot = 30;
+  int64 task_slot = 28;
+  int64 storage_version = 29;
+  int64 lack_binlog_rows = 30;
   repeated data.FieldBinlog insert_logs = 31;
 }
 
diff --git a/pkg/proto/workerpb/worker.pb.go b/pkg/proto/workerpb/worker.pb.go
index c11cfbc5cf..27f1d04771 100644
--- a/pkg/proto/workerpb/worker.pb.go
+++ b/pkg/proto/workerpb/worker.pb.go
@@ -286,9 +286,9 @@ type CreateJobRequest struct {
 	Field                     *schemapb.FieldSchema `protobuf:"bytes,25,opt,name=field,proto3" json:"field,omitempty"`
 	PartitionKeyIsolation     bool                  `protobuf:"varint,26,opt,name=partition_key_isolation,json=partitionKeyIsolation,proto3" json:"partition_key_isolation,omitempty"`
 	CurrentScalarIndexVersion int32                 `protobuf:"varint,27,opt,name=current_scalar_index_version,json=currentScalarIndexVersion,proto3" json:"current_scalar_index_version,omitempty"`
-	StorageVersion            int64                 `protobuf:"varint,28,opt,name=storage_version,json=storageVersion,proto3" json:"storage_version,omitempty"`
-	LackBinlogRows            int64                 `protobuf:"varint,29,opt,name=lack_binlog_rows,json=lackBinlogRows,proto3" json:"lack_binlog_rows,omitempty"`
-	TaskSlot                  int64                 `protobuf:"varint,30,opt,name=task_slot,json=taskSlot,proto3" json:"task_slot,omitempty"`
+	TaskSlot                  int64                 `protobuf:"varint,28,opt,name=task_slot,json=taskSlot,proto3" json:"task_slot,omitempty"`
+	StorageVersion            int64                 `protobuf:"varint,29,opt,name=storage_version,json=storageVersion,proto3" json:"storage_version,omitempty"`
+	LackBinlogRows            int64                 `protobuf:"varint,30,opt,name=lack_binlog_rows,json=lackBinlogRows,proto3" json:"lack_binlog_rows,omitempty"`
 	InsertLogs                []*datapb.FieldBinlog `protobuf:"bytes,31,rep,name=insert_logs,json=insertLogs,proto3" json:"insert_logs,omitempty"`
 }
 
@@ -513,6 +513,13 @@ func (x *CreateJobRequest) GetCurrentScalarIndexVersion() int32 {
 	return 0
 }
 
+func (x *CreateJobRequest) GetTaskSlot() int64 {
+	if x != nil {
+		return x.TaskSlot
+	}
+	return 0
+}
+
 func (x *CreateJobRequest) GetStorageVersion() int64 {
 	if x != nil {
 		return x.StorageVersion
@@ -527,13 +534,6 @@ func (x *CreateJobRequest) GetLackBinlogRows() int64 {
 	return 0
 }
 
-func (x *CreateJobRequest) GetTaskSlot() int64 {
-	if x != nil {
-		return x.TaskSlot
-	}
-	return 0
-}
-
 func (x *CreateJobRequest) GetInsertLogs() []*datapb.FieldBinlog {
 	if x != nil {
 		return x.InsertLogs
@@ -2245,14 +2245,14 @@ var file_worker_proto_rawDesc = []byte{
 	0x61, 0x6c, 0x61, 0x72, 0x5f, 0x69, 0x6e, 0x64, 0x65, 0x78, 0x5f, 0x76, 0x65, 0x72, 0x73, 0x69,
 	0x6f, 0x6e, 0x18, 0x1b, 0x20, 0x01, 0x28, 0x05, 0x52, 0x19, 0x63, 0x75, 0x72, 0x72, 0x65, 0x6e,
 	0x74, 0x53, 0x63, 0x61, 0x6c, 0x61, 0x72, 0x49, 0x6e, 0x64, 0x65, 0x78, 0x56, 0x65, 0x72, 0x73,
-	0x69, 0x6f, 0x6e, 0x12, 0x27, 0x0a, 0x0f, 0x73, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x5f, 0x76,
-	0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x1c, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0e, 0x73, 0x74,
-	0x6f, 0x72, 0x61, 0x67, 0x65, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x28, 0x0a, 0x10,
-	0x6c, 0x61, 0x63, 0x6b, 0x5f, 0x62, 0x69, 0x6e, 0x6c, 0x6f, 0x67, 0x5f, 0x72, 0x6f, 0x77, 0x73,
-	0x18, 0x1d, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0e, 0x6c, 0x61, 0x63, 0x6b, 0x42, 0x69, 0x6e, 0x6c,
-	0x6f, 0x67, 0x52, 0x6f, 0x77, 0x73, 0x12, 0x1b, 0x0a, 0x09, 0x74, 0x61, 0x73, 0x6b, 0x5f, 0x73,
-	0x6c, 0x6f, 0x74, 0x18, 0x1e, 0x20, 0x01, 0x28, 0x03, 0x52, 0x08, 0x74, 0x61, 0x73, 0x6b, 0x53,
-	0x6c, 0x6f, 0x74, 0x12, 0x3f, 0x0a, 0x0b, 0x69, 0x6e, 0x73, 0x65, 0x72, 0x74, 0x5f, 0x6c, 0x6f,
+	0x69, 0x6f, 0x6e, 0x12, 0x1b, 0x0a, 0x09, 0x74, 0x61, 0x73, 0x6b, 0x5f, 0x73, 0x6c, 0x6f, 0x74,
+	0x18, 0x1c, 0x20, 0x01, 0x28, 0x03, 0x52, 0x08, 0x74, 0x61, 0x73, 0x6b, 0x53, 0x6c, 0x6f, 0x74,
+	0x12, 0x27, 0x0a, 0x0f, 0x73, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x5f, 0x76, 0x65, 0x72, 0x73,
+	0x69, 0x6f, 0x6e, 0x18, 0x1d, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0e, 0x73, 0x74, 0x6f, 0x72, 0x61,
+	0x67, 0x65, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x28, 0x0a, 0x10, 0x6c, 0x61, 0x63,
+	0x6b, 0x5f, 0x62, 0x69, 0x6e, 0x6c, 0x6f, 0x67, 0x5f, 0x72, 0x6f, 0x77, 0x73, 0x18, 0x1e, 0x20,
+	0x01, 0x28, 0x03, 0x52, 0x0e, 0x6c, 0x61, 0x63, 0x6b, 0x42, 0x69, 0x6e, 0x6c, 0x6f, 0x67, 0x52,
+	0x6f, 0x77, 0x73, 0x12, 0x3f, 0x0a, 0x0b, 0x69, 0x6e, 0x73, 0x65, 0x72, 0x74, 0x5f, 0x6c, 0x6f,
 	0x67, 0x73, 0x18, 0x1f, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x1e, 0x2e, 0x6d, 0x69, 0x6c, 0x76, 0x75,
 	0x73, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x64, 0x61, 0x74, 0x61, 0x2e, 0x46, 0x69, 0x65,
 	0x6c, 0x64, 0x42, 0x69, 0x6e, 0x6c, 0x6f, 0x67, 0x52, 0x0a, 0x69, 0x6e, 0x73, 0x65, 0x72, 0x74,
diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go
index 338e9849b2..6f6792d088 100644
--- a/pkg/util/paramtable/component_param.go
+++ b/pkg/util/paramtable/component_param.go
@@ -5076,7 +5076,7 @@ if param targetVecIndexVersion is not set, the default value is -1, which means
 		Key:          "dataCoord.slot.clusteringCompactionUsage",
 		Version:      "2.4.6",
 		Doc:          "slot usage of clustering compaction task, setting it to 65536 means it takes up a whole worker.",
-		DefaultValue: "65536",
+		DefaultValue: "65535",
 		PanicIfEmpty: false,
 		Export:       true,
 		Formatter: func(value string) string {
@@ -5140,7 +5140,7 @@ if param targetVecIndexVersion is not set, the default value is -1, which means
 		Key:          "dataCoord.slot.analyzeTaskSlotUsage",
 		Version:      "2.5.8",
 		Doc:          "slot usage of analyze task",
-		DefaultValue: "65536",
+		DefaultValue: "65535",
 		PanicIfEmpty: false,
 		Export:       true,
 		Formatter: func(value string) string {
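
Note on the slot guards added in index_services.go above: proto3 leaves an unset int64 at 0, so a request from a sender that never populates task_slot arrives with task_slot == 0, and the new checks replace any non-positive value with a per-task-type default (64 for index and stats tasks, 65535 for analyze tasks, i.e. a whole worker per the config comment) rather than rejecting the request. Below is a minimal standalone sketch of that pattern; the clampTaskSlot helper is hypothetical, the fallback values are taken from the patch, and the standard library logger stands in for Milvus's zap-based log package.

package main

import "log"

// clampTaskSlot mirrors the guard inlined into createIndexTask,
// createAnalyzeTask, and createStatsTask: a non-positive slot count is
// replaced with a fallback chosen per task type, so the request stays
// schedulable instead of carrying an invalid slot value.
func clampTaskSlot(taskSlot, fallback int64) int64 {
	if taskSlot <= 0 {
		log.Printf("received task with invalid slot %d, set to %d", taskSlot, fallback)
		return fallback
	}
	return taskSlot
}

func main() {
	log.Println(clampTaskSlot(0, 64))    // index/stats task: falls back to 64
	log.Println(clampTaskSlot(0, 65535)) // analyze task: falls back to a whole worker
	log.Println(clampTaskSlot(8, 64))    // a valid slot count passes through unchanged
}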