Add default value for the parameter roundDeciaml (#9626)

Signed-off-by: cai.zhang <cai.zhang@zilliz.com>
This commit is contained in:
cai.zhang 2021-10-26 18:24:23 +08:00 committed by GitHub
parent 1227b9ebae
commit 3734ff49ac
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 351 additions and 3 deletions

View File

@ -1517,18 +1517,22 @@ func (st *searchTask) PreExecute(ctx context.Context) error {
}
roundDecimalStr, err := funcutil.GetAttrByKeyFromRepeatedKV(RoundDecimalKey, st.query.SearchParams)
if err != nil {
return errors.New(RoundDecimalKey + "not found in search_params")
roundDecimalStr = "-1"
}
roundDeciaml, err := strconv.Atoi(roundDecimalStr)
roundDecimal, err := strconv.Atoi(roundDecimalStr)
if err != nil {
return errors.New(RoundDecimalKey + " " + roundDecimalStr + " is not invalid")
}
if roundDecimal != -1 && (roundDecimal > 6 || roundDecimal < 0) {
return errors.New(RoundDecimalKey + " " + roundDecimalStr + " is not invalid")
}
queryInfo := &planpb.QueryInfo{
Topk: int64(topK),
MetricType: metricType,
SearchParams: searchParams,
RoundDecimal: int64(roundDeciaml),
RoundDecimal: int64(roundDecimal),
}
log.Debug("create query plan",

View File

@ -1993,6 +1993,350 @@ func TestSearchTask_all(t *testing.T) {
wg.Wait()
}
func TestSearchTaskWithInvalidRoundDecimal(t *testing.T) {
var err error
Params.Init()
Params.SearchResultChannelNames = []string{funcutil.GenRandomStr()}
rc := NewRootCoordMock()
rc.Start()
defer rc.Stop()
ctx := context.Background()
err = InitMetaCache(rc)
assert.NoError(t, err)
shardsNum := int32(2)
prefix := "TestSearchTask_all"
dbName := ""
collectionName := prefix + funcutil.GenRandomStr()
boolField := "bool"
int32Field := "int32"
int64Field := "int64"
floatField := "float"
doubleField := "double"
floatVecField := "fvec"
binaryVecField := "bvec"
fieldsLen := len([]string{boolField, int32Field, int64Field, floatField, doubleField, floatVecField, binaryVecField})
dim := 128
expr := fmt.Sprintf("%s > 0", int64Field)
nq := 10
topk := 10
roundDecimal := 7
nprobe := 10
schema := constructCollectionSchemaWithAllType(
boolField, int32Field, int64Field, floatField, doubleField,
floatVecField, binaryVecField, dim, collectionName)
marshaledSchema, err := proto.Marshal(schema)
assert.NoError(t, err)
createColT := &createCollectionTask{
Condition: NewTaskCondition(ctx),
CreateCollectionRequest: &milvuspb.CreateCollectionRequest{
Base: nil,
DbName: dbName,
CollectionName: collectionName,
Schema: marshaledSchema,
ShardsNum: shardsNum,
},
ctx: ctx,
rootCoord: rc,
result: nil,
schema: nil,
}
assert.NoError(t, createColT.OnEnqueue())
assert.NoError(t, createColT.PreExecute(ctx))
assert.NoError(t, createColT.Execute(ctx))
assert.NoError(t, createColT.PostExecute(ctx))
dmlChannelsFunc := getDmlChannelsFunc(ctx, rc)
query := newMockGetChannelsService()
factory := newSimpleMockMsgStreamFactory()
chMgr := newChannelsMgrImpl(dmlChannelsFunc, nil, query.GetChannels, nil, factory)
defer chMgr.removeAllDMLStream()
defer chMgr.removeAllDQLStream()
collectionID, err := globalMetaCache.GetCollectionID(ctx, collectionName)
assert.NoError(t, err)
qc := NewQueryCoordMock()
qc.Start()
defer qc.Stop()
status, err := qc.LoadCollection(ctx, &querypb.LoadCollectionRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_LoadCollection,
MsgID: 0,
Timestamp: 0,
SourceID: Params.ProxyID,
},
DbID: 0,
CollectionID: collectionID,
Schema: nil,
})
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, status.ErrorCode)
req := constructSearchRequest(dbName, collectionName,
expr,
floatVecField,
nq, dim, nprobe, topk, roundDecimal)
task := &searchTask{
Condition: NewTaskCondition(ctx),
SearchRequest: &internalpb.SearchRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_Search,
MsgID: 0,
Timestamp: 0,
SourceID: Params.ProxyID,
},
ResultChannelID: strconv.FormatInt(Params.ProxyID, 10),
DbID: 0,
CollectionID: 0,
PartitionIDs: nil,
Dsl: "",
PlaceholderGroup: nil,
DslType: 0,
SerializedExprPlan: nil,
OutputFieldsId: nil,
TravelTimestamp: 0,
GuaranteeTimestamp: 0,
},
ctx: ctx,
resultBuf: make(chan []*internalpb.SearchResults),
result: nil,
query: req,
chMgr: chMgr,
qc: qc,
}
// simple mock for query node
// TODO(dragondriver): should we replace this mock using RocksMq or MemMsgStream?
err = chMgr.createDQLStream(collectionID)
assert.NoError(t, err)
stream, err := chMgr.getDQLStream(collectionID)
assert.NoError(t, err)
var wg sync.WaitGroup
wg.Add(1)
consumeCtx, cancel := context.WithCancel(ctx)
go func() {
defer wg.Done()
for {
select {
case <-consumeCtx.Done():
return
case pack := <-stream.Chan():
for _, msg := range pack.Msgs {
_, ok := msg.(*msgstream.SearchMsg)
assert.True(t, ok)
// TODO(dragondriver): construct result according to the request
constructSearchResulstData := func() *schemapb.SearchResultData {
resultData := &schemapb.SearchResultData{
NumQueries: int64(nq),
TopK: int64(topk),
FieldsData: make([]*schemapb.FieldData, fieldsLen),
Scores: make([]float32, nq*topk),
Ids: &schemapb.IDs{
IdField: &schemapb.IDs_IntId{
IntId: &schemapb.LongArray{
Data: make([]int64, nq*topk),
},
},
},
Topks: make([]int64, nq),
}
resultData.FieldsData[0] = &schemapb.FieldData{
Type: schemapb.DataType_Bool,
FieldName: boolField,
Field: &schemapb.FieldData_Scalars{
Scalars: &schemapb.ScalarField{
Data: &schemapb.ScalarField_BoolData{
BoolData: &schemapb.BoolArray{
Data: generateBoolArray(nq * topk),
},
},
},
},
FieldId: common.StartOfUserFieldID + 0,
}
resultData.FieldsData[1] = &schemapb.FieldData{
Type: schemapb.DataType_Int32,
FieldName: int32Field,
Field: &schemapb.FieldData_Scalars{
Scalars: &schemapb.ScalarField{
Data: &schemapb.ScalarField_IntData{
IntData: &schemapb.IntArray{
Data: generateInt32Array(nq * topk),
},
},
},
},
FieldId: common.StartOfUserFieldID + 1,
}
resultData.FieldsData[2] = &schemapb.FieldData{
Type: schemapb.DataType_Int64,
FieldName: int64Field,
Field: &schemapb.FieldData_Scalars{
Scalars: &schemapb.ScalarField{
Data: &schemapb.ScalarField_LongData{
LongData: &schemapb.LongArray{
Data: generateInt64Array(nq * topk),
},
},
},
},
FieldId: common.StartOfUserFieldID + 2,
}
resultData.FieldsData[3] = &schemapb.FieldData{
Type: schemapb.DataType_Float,
FieldName: floatField,
Field: &schemapb.FieldData_Scalars{
Scalars: &schemapb.ScalarField{
Data: &schemapb.ScalarField_FloatData{
FloatData: &schemapb.FloatArray{
Data: generateFloat32Array(nq * topk),
},
},
},
},
FieldId: common.StartOfUserFieldID + 3,
}
resultData.FieldsData[4] = &schemapb.FieldData{
Type: schemapb.DataType_Double,
FieldName: doubleField,
Field: &schemapb.FieldData_Scalars{
Scalars: &schemapb.ScalarField{
Data: &schemapb.ScalarField_DoubleData{
DoubleData: &schemapb.DoubleArray{
Data: generateFloat64Array(nq * topk),
},
},
},
},
FieldId: common.StartOfUserFieldID + 4,
}
resultData.FieldsData[5] = &schemapb.FieldData{
Type: schemapb.DataType_FloatVector,
FieldName: doubleField,
Field: &schemapb.FieldData_Vectors{
Vectors: &schemapb.VectorField{
Dim: int64(dim),
Data: &schemapb.VectorField_FloatVector{
FloatVector: &schemapb.FloatArray{
Data: generateFloatVectors(nq*topk, dim),
},
},
},
},
FieldId: common.StartOfUserFieldID + 5,
}
resultData.FieldsData[6] = &schemapb.FieldData{
Type: schemapb.DataType_BinaryVector,
FieldName: doubleField,
Field: &schemapb.FieldData_Vectors{
Vectors: &schemapb.VectorField{
Dim: int64(dim),
Data: &schemapb.VectorField_BinaryVector{
BinaryVector: generateBinaryVectors(nq*topk, dim),
},
},
},
FieldId: common.StartOfUserFieldID + 6,
}
for i := 0; i < nq; i++ {
for j := 0; j < topk; j++ {
offset := i*topk + j
score := float32(uniquegenerator.GetUniqueIntGeneratorIns().GetInt()) // increasingly
id := int64(uniquegenerator.GetUniqueIntGeneratorIns().GetInt())
resultData.Scores[offset] = score
resultData.Ids.IdField.(*schemapb.IDs_IntId).IntId.Data[offset] = id
}
resultData.Topks[i] = int64(topk)
}
return resultData
}
result1 := &internalpb.SearchResults{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_SearchResult,
MsgID: 0,
Timestamp: 0,
SourceID: 0,
},
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
Reason: "",
},
ResultChannelID: "",
MetricType: distance.L2,
NumQueries: int64(nq),
TopK: int64(topk),
SealedSegmentIDsSearched: nil,
ChannelIDsSearched: nil,
GlobalSealedSegmentIDs: nil,
SlicedBlob: nil,
SlicedNumCount: 1,
SlicedOffset: 0,
}
resultData := constructSearchResulstData()
sliceBlob, err := proto.Marshal(resultData)
assert.NoError(t, err)
result1.SlicedBlob = sliceBlob
// result2.SliceBlob = nil, will be skipped in decode stage
result2 := &internalpb.SearchResults{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_SearchResult,
MsgID: 0,
Timestamp: 0,
SourceID: 0,
},
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
Reason: "",
},
ResultChannelID: "",
MetricType: distance.L2,
NumQueries: int64(nq),
TopK: int64(topk),
SealedSegmentIDsSearched: nil,
ChannelIDsSearched: nil,
GlobalSealedSegmentIDs: nil,
SlicedBlob: nil,
SlicedNumCount: 1,
SlicedOffset: 0,
}
// send search result
task.resultBuf <- []*internalpb.SearchResults{result1, result2}
}
}
}
}()
assert.NoError(t, task.OnEnqueue())
assert.Error(t, task.PreExecute(ctx))
cancel()
wg.Wait()
}
func TestSearchTask_7803_reduce(t *testing.T) {
var err error