diff --git a/internal/datacoord/task_index.go b/internal/datacoord/task_index.go index eb9c207204..bf3b87b80f 100644 --- a/internal/datacoord/task_index.go +++ b/internal/datacoord/task_index.go @@ -107,38 +107,6 @@ func (it *indexBuildTask) PreCheck(ctx context.Context, dependency *taskSchedule it.SetState(indexpb.JobState_JobStateFinished, "fake finished index success") return true } - // vector index build needs information of optional scalar fields data - optionalFields := make([]*indexpb.OptionalFieldInfo, 0) - partitionKeyIsolation := false - if Params.CommonCfg.EnableMaterializedView.GetAsBool() && isOptionalScalarFieldSupported(indexType) { - collInfo, err := dependency.handler.GetCollection(ctx, segIndex.CollectionID) - if err != nil || collInfo == nil { - log.Ctx(ctx).Warn("get collection failed", zap.Int64("collID", segIndex.CollectionID), zap.Error(err)) - it.SetState(indexpb.JobState_JobStateInit, err.Error()) - return true - } - colSchema := collInfo.Schema - partitionKeyField, err := typeutil.GetPartitionKeyFieldSchema(colSchema) - if partitionKeyField == nil || err != nil { - log.Ctx(ctx).Warn("index builder get partition key field failed", zap.Int64("taskID", it.taskID), zap.Error(err)) - } else { - if typeutil.IsFieldDataTypeSupportMaterializedView(partitionKeyField) { - optionalFields = append(optionalFields, &indexpb.OptionalFieldInfo{ - FieldID: partitionKeyField.FieldID, - FieldName: partitionKeyField.Name, - FieldType: int32(partitionKeyField.DataType), - DataIds: getBinLogIDs(segment, partitionKeyField.FieldID), - }) - iso, isoErr := common.IsPartitionKeyIsolationPropEnabled(collInfo.Properties) - if isoErr != nil { - log.Ctx(ctx).Warn("failed to parse partition key isolation", zap.Error(isoErr)) - } - if iso { - partitionKeyIsolation = true - } - } - } - } typeParams := dependency.meta.indexMeta.GetTypeParams(segIndex.CollectionID, segIndex.IndexID) @@ -202,6 +170,37 @@ func (it *indexBuildTask) PreCheck(ctx context.Context, dependency *taskSchedule // don't return, maybe field is scalar field or sparseFloatVector } + // vector index build needs information of optional scalar fields data + optionalFields := make([]*indexpb.OptionalFieldInfo, 0) + partitionKeyIsolation := false + if Params.CommonCfg.EnableMaterializedView.GetAsBool() && isOptionalScalarFieldSupported(indexType) && typeutil.IsDenseFloatVectorType(field.DataType) { + if collectionInfo == nil { + log.Ctx(ctx).Warn("get collection failed", zap.Int64("collID", segIndex.CollectionID), zap.Error(err)) + it.SetState(indexpb.JobState_JobStateInit, err.Error()) + return true + } + partitionKeyField, err := typeutil.GetPartitionKeyFieldSchema(schema) + if partitionKeyField == nil || err != nil { + log.Ctx(ctx).Warn("index builder get partition key field failed", zap.Int64("taskID", it.taskID), zap.Error(err)) + } else { + if typeutil.IsFieldDataTypeSupportMaterializedView(partitionKeyField) { + optionalFields = append(optionalFields, &indexpb.OptionalFieldInfo{ + FieldID: partitionKeyField.FieldID, + FieldName: partitionKeyField.Name, + FieldType: int32(partitionKeyField.DataType), + DataIds: getBinLogIDs(segment, partitionKeyField.FieldID), + }) + iso, isoErr := common.IsPartitionKeyIsolationPropEnabled(collectionInfo.Properties) + if isoErr != nil { + log.Ctx(ctx).Warn("failed to parse partition key isolation", zap.Error(isoErr)) + } + if iso { + partitionKeyIsolation = true + } + } + } + } + if Params.CommonCfg.EnableStorageV2.GetAsBool() { storePath, err := itypeutil.GetStorageURI(params.Params.CommonCfg.StorageScheme.GetValue(), params.Params.CommonCfg.StoragePathPrefix.GetValue(), segment.GetID()) if err != nil { diff --git a/internal/datacoord/task_scheduler_test.go b/internal/datacoord/task_scheduler_test.go index c92bba778a..8281023efd 100644 --- a/internal/datacoord/task_scheduler_test.go +++ b/internal/datacoord/task_scheduler_test.go @@ -1349,7 +1349,7 @@ func (s *taskSchedulerSuite) Test_indexTaskFailCase() { }, }, }, nil - }).Twice() + }).Once() in.EXPECT().CreateJobV2(mock.Anything, mock.Anything).Return(nil, errors.New("mock error")).Once() // retry --> init @@ -1366,7 +1366,7 @@ func (s *taskSchedulerSuite) Test_indexTaskFailCase() { {FieldID: s.fieldID, Name: "vec", TypeParams: []*commonpb.KeyValuePair{{Key: "dim", Value: "10"}}}, }, }, - }, nil).Twice() + }, nil).Once() in.EXPECT().CreateJobV2(mock.Anything, mock.Anything).Return(&commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}, nil).Once() // inProgress --> Finished @@ -1595,6 +1595,7 @@ func (s *taskSchedulerSuite) Test_indexTaskWithMvOptionalScalarField() { mt.indexMeta.buildID2SegmentIndex[buildID].IndexState = commonpb.IndexState_Unissued mt.indexMeta.segmentIndexes[segID][indexID].IndexState = commonpb.IndexState_Unissued mt.indexMeta.indexes[collID][indexID].IndexParams[1].Value = indexparamcheck.IndexHNSW + mt.collections[collID].Schema.Fields[0].DataType = schemapb.DataType_FloatVector mt.collections[collID].Schema.Fields[1].IsPartitionKey = true mt.collections[collID].Schema.Fields[1].DataType = schemapb.DataType_VarChar } @@ -1681,7 +1682,7 @@ func (s *taskSchedulerSuite) Test_indexTaskWithMvOptionalScalarField() { paramtable.Get().CommonCfg.EnableMaterializedView.SwapTempValue("false") in.EXPECT().CreateJobV2(mock.Anything, mock.Anything).RunAndReturn( func(ctx context.Context, in *indexpb.CreateJobV2Request, opts ...grpc.CallOption) (*commonpb.Status, error) { - s.Zero(len(in.GetIndexRequest().OptionalScalarFields), "optional scalar field should be set") + s.Zero(len(in.GetIndexRequest().OptionalScalarFields), "optional scalar field should not be set") return merr.Success(), nil }).Once() t := &indexBuildTask{ @@ -1698,6 +1699,33 @@ func (s *taskSchedulerSuite) Test_indexTaskWithMvOptionalScalarField() { resetMetaFunc() }) + s.Run("enqueue returns empty when vector type is not dense vector", func() { + paramtable.Get().CommonCfg.EnableMaterializedView.SwapTempValue("true") + for _, dataType := range []schemapb.DataType{ + schemapb.DataType_BinaryVector, + schemapb.DataType_SparseFloatVector, + } { + mt.collections[collID].Schema.Fields[0].DataType = dataType + in.EXPECT().CreateJobV2(mock.Anything, mock.Anything).RunAndReturn( + func(ctx context.Context, in *indexpb.CreateJobV2Request, opts ...grpc.CallOption) (*commonpb.Status, error) { + s.Zero(len(in.GetIndexRequest().OptionalScalarFields), "optional scalar field should not be set") + return merr.Success(), nil + }).Once() + t := &indexBuildTask{ + taskID: buildID, + nodeID: nodeID, + taskInfo: &indexpb.IndexTaskInfo{ + BuildID: buildID, + State: commonpb.IndexState_Unissued, + FailReason: "", + }, + } + scheduler.enqueue(t) + waitTaskDoneFunc(scheduler) + resetMetaFunc() + } + }) + s.Run("enqueue returns empty optional field when the data type is not STRING or VARCHAR or Integer", func() { paramtable.Get().CommonCfg.EnableMaterializedView.SwapTempValue("true") for _, dataType := range []schemapb.DataType{ @@ -1710,7 +1738,7 @@ func (s *taskSchedulerSuite) Test_indexTaskWithMvOptionalScalarField() { mt.collections[collID].Schema.Fields[1].DataType = dataType in.EXPECT().CreateJobV2(mock.Anything, mock.Anything).RunAndReturn( func(ctx context.Context, in *indexpb.CreateJobV2Request, opts ...grpc.CallOption) (*commonpb.Status, error) { - s.Zero(len(in.GetIndexRequest().OptionalScalarFields), "optional scalar field should be set") + s.Zero(len(in.GetIndexRequest().OptionalScalarFields), "optional scalar field should not be set") return merr.Success(), nil }).Once() t := &indexBuildTask{ @@ -1733,7 +1761,7 @@ func (s *taskSchedulerSuite) Test_indexTaskWithMvOptionalScalarField() { mt.collections[collID].Schema.Fields[1].IsPartitionKey = false in.EXPECT().CreateJobV2(mock.Anything, mock.Anything).RunAndReturn( func(ctx context.Context, in *indexpb.CreateJobV2Request, opts ...grpc.CallOption) (*commonpb.Status, error) { - s.Zero(len(in.GetIndexRequest().OptionalScalarFields), "optional scalar field should be set") + s.Zero(len(in.GetIndexRequest().OptionalScalarFields), "optional scalar field should not be set") return merr.Success(), nil }).Once() t := &indexBuildTask{