From 8b89744c6b61a6aeeff45e18715a2e1b1fac1b2f Mon Sep 17 00:00:00 2001 From: neza2017 Date: Sat, 10 Jul 2021 10:21:52 +0800 Subject: [PATCH] search with exp (#6409) * process float vector and binary vector field on the search result Signed-off-by: yefu.chen * add more debug info on etcd Signed-off-by: yefu.chen * add debug info when grpc failed Signed-off-by: yefu.chen --- .../distributed/datacoord/client/client.go | 1 + .../distributed/datanode/client/client.go | 1 + .../distributed/indexcoord/client/client.go | 1 + .../distributed/indexnode/client/client.go | 1 + internal/distributed/proxy/client/client.go | 1 + .../distributed/querycoord/client/client.go | 1 + .../distributed/querynode/client/client.go | 1 + .../distributed/rootcoord/client/client.go | 1 + internal/proxy/task.go | 13 ++++- internal/querynode/query_collection.go | 57 ++++++++++++++++++- internal/rootcoord/root_coord.go | 1 + internal/util/sessionutil/session_util.go | 4 ++ internal/util/typeutil/schema.go | 22 ++++++- 13 files changed, 100 insertions(+), 5 deletions(-) diff --git a/internal/distributed/datacoord/client/client.go b/internal/distributed/datacoord/client/client.go index 88d163bead..e23276c8e8 100644 --- a/internal/distributed/datacoord/client/client.go +++ b/internal/distributed/datacoord/client/client.go @@ -131,6 +131,7 @@ func (c *Client) recall(caller func() (interface{}, error)) (interface{}, error) if err == nil { return ret, nil } + log.Debug("DataCoord Client grpc error", zap.Error(err)) err = c.connect() if err != nil { return ret, errors.New("Connect to datacoord failed with error:\n" + err.Error()) diff --git a/internal/distributed/datanode/client/client.go b/internal/distributed/datanode/client/client.go index d4cf48663e..3b277bcc8c 100644 --- a/internal/distributed/datanode/client/client.go +++ b/internal/distributed/datanode/client/client.go @@ -112,6 +112,7 @@ func (c *Client) recall(caller func() (interface{}, error)) (interface{}, error) if err == nil { return ret, nil } + log.Debug("DataNode Client grpc error", zap.Error(err)) err = c.connect() if err != nil { return ret, errors.New("Connect to datanode failed with error:\n" + err.Error()) diff --git a/internal/distributed/indexcoord/client/client.go b/internal/distributed/indexcoord/client/client.go index 21b5f6d204..2a29f75207 100644 --- a/internal/distributed/indexcoord/client/client.go +++ b/internal/distributed/indexcoord/client/client.go @@ -126,6 +126,7 @@ func (c *Client) recall(caller func() (interface{}, error)) (interface{}, error) if err == nil { return ret, nil } + log.Debug("IndexCoord Client grpc error", zap.Error(err)) err = c.connect() if err != nil { return ret, errors.New("Connect to indexcoord failed with error:\n" + err.Error()) diff --git a/internal/distributed/indexnode/client/client.go b/internal/distributed/indexnode/client/client.go index e60abcf680..9f4dc7ee66 100644 --- a/internal/distributed/indexnode/client/client.go +++ b/internal/distributed/indexnode/client/client.go @@ -107,6 +107,7 @@ func (c *Client) recall(caller func() (interface{}, error)) (interface{}, error) if err == nil { return ret, nil } + log.Debug("IndexNode Client grpc error", zap.Error(err)) err = c.connect() if err != nil { return ret, errors.New("Connect to indexnode failed with error:\n" + err.Error()) diff --git a/internal/distributed/proxy/client/client.go b/internal/distributed/proxy/client/client.go index 4365cef113..2508d8614e 100644 --- a/internal/distributed/proxy/client/client.go +++ b/internal/distributed/proxy/client/client.go @@ -106,6 +106,7 @@ func (c *Client) recall(caller func() (interface{}, error)) (interface{}, error) if err == nil { return ret, nil } + log.Debug("Proxy Client grpc error", zap.Error(err)) err = c.connect() if err != nil { return ret, errors.New("Connect to proxy failed with error:\n" + err.Error()) diff --git a/internal/distributed/querycoord/client/client.go b/internal/distributed/querycoord/client/client.go index 1ccd294e2d..fbf0483eaf 100644 --- a/internal/distributed/querycoord/client/client.go +++ b/internal/distributed/querycoord/client/client.go @@ -134,6 +134,7 @@ func (c *Client) recall(caller func() (interface{}, error)) (interface{}, error) if err == nil { return ret, nil } + log.Debug("QueryCoord Client grpc error", zap.Error(err)) err = c.connect() if err != nil { return ret, errors.New("Connect to querycoord failed with error:\n" + err.Error()) diff --git a/internal/distributed/querynode/client/client.go b/internal/distributed/querynode/client/client.go index adfaad949e..f5b4b0d005 100644 --- a/internal/distributed/querynode/client/client.go +++ b/internal/distributed/querynode/client/client.go @@ -106,6 +106,7 @@ func (c *Client) recall(caller func() (interface{}, error)) (interface{}, error) if err == nil { return ret, nil } + log.Debug("QueryNode Client grpc error", zap.Error(err)) err = c.connect() if err != nil { return ret, errors.New("Connect to querynode failed with error:\n" + err.Error()) diff --git a/internal/distributed/rootcoord/client/client.go b/internal/distributed/rootcoord/client/client.go index 7e8759d7d5..ee3d6a64af 100644 --- a/internal/distributed/rootcoord/client/client.go +++ b/internal/distributed/rootcoord/client/client.go @@ -155,6 +155,7 @@ func (c *GrpcClient) recall(caller func() (interface{}, error)) (interface{}, er if err == nil { return ret, nil } + log.Debug("RootCoord Client grpc error", zap.Error(err)) err = c.connect() if err != nil { return ret, errors.New("Connect to rootcoord failed with error:\n" + err.Error()) diff --git a/internal/proxy/task.go b/internal/proxy/task.go index 921ff53bb1..9528d417c9 100644 --- a/internal/proxy/task.go +++ b/internal/proxy/task.go @@ -1722,13 +1722,21 @@ func reduceSearchResultDataParallel(searchResultData []*schemapb.SearchResultDat switch vectorType := fieldType.Vectors.Data.(type) { case *schemapb.VectorField_BinaryVector: if ret.Results.FieldsData[k].GetVectors().GetBinaryVector() == nil { - ret.Results.FieldsData[k].GetVectors().Data.(*schemapb.VectorField_BinaryVector).BinaryVector = []byte{vectorType.BinaryVector[curIdx*int((dim/8))]} + bvec := &schemapb.VectorField_BinaryVector{ + BinaryVector: vectorType.BinaryVector[curIdx*int((dim/8)) : (curIdx+1)*int((dim/8))], + } + ret.Results.FieldsData[k].GetVectors().Data = bvec } else { ret.Results.FieldsData[k].GetVectors().Data.(*schemapb.VectorField_BinaryVector).BinaryVector = append(ret.Results.FieldsData[k].GetVectors().Data.(*schemapb.VectorField_BinaryVector).BinaryVector, vectorType.BinaryVector[curIdx*int((dim/8)):(curIdx+1)*int((dim/8))]...) } case *schemapb.VectorField_FloatVector: if ret.Results.FieldsData[k].GetVectors().GetFloatVector() == nil { - ret.Results.FieldsData[k].GetVectors().GetFloatVector().Data = []float32{vectorType.FloatVector.Data[curIdx*int(dim)]} + fvec := &schemapb.VectorField_FloatVector{ + FloatVector: &schemapb.FloatArray{ + Data: vectorType.FloatVector.Data[curIdx*int(dim) : (curIdx+1)*int(dim)], + }, + } + ret.Results.FieldsData[k].GetVectors().Data = fvec } else { ret.Results.FieldsData[k].GetVectors().GetFloatVector().Data = append(ret.Results.FieldsData[k].GetVectors().GetFloatVector().Data, vectorType.FloatVector.Data[curIdx*int(dim):(curIdx+1)*int(dim)]...) } @@ -1902,7 +1910,6 @@ func (st *SearchTask) PostExecute(ctx context.Context) error { } } } - log.Debug("Proxy Search PostExecute Done") return nil } diff --git a/internal/querynode/query_collection.go b/internal/querynode/query_collection.go index 4a17a48d32..4135438471 100644 --- a/internal/querynode/query_collection.go +++ b/internal/querynode/query_collection.go @@ -18,6 +18,7 @@ import ( "math" "reflect" "sync" + "unsafe" oplog "github.com/opentracing/opentracing-go/log" "go.uber.org/zap" @@ -697,11 +698,65 @@ func translateHits(schema *typeutil.SchemaHelper, fieldIDs []int64, rawHits [][] finalResult.FieldsData = append(finalResult.FieldsData, newCol) blobOffset += blobLen case schemapb.DataType_FloatVector: + dim, err := schema.GetVectorDimFromID(fieldID) + if err != nil { + return nil, err + } + blobLen := dim * 4 + var colData []float32 + for _, hit := range hits { + for _, row := range hit.RowData { + dataBlob := row[blobOffset : blobOffset+blobLen] + //ref https://github.com/golang/go/wiki/cgo#turning-c-arrays-into-go-slices + ptr := unsafe.Pointer(&dataBlob[0]) + farray := (*[1 << 28]float32)(ptr) + colData = append(colData, farray[:dim:dim]...) + } + } + newCol := &schemapb.FieldData{ + Field: &schemapb.FieldData_Vectors{ + Vectors: &schemapb.VectorField{ + Dim: int64(dim), + Data: &schemapb.VectorField_FloatVector{ + FloatVector: &schemapb.FloatArray{ + Data: colData, + }, + }, + }, + }, + } + finalResult.FieldsData = append(finalResult.FieldsData, newCol) + blobOffset += blobLen case schemapb.DataType_BinaryVector: - return nil, fmt.Errorf("unsupported") + dim, err := schema.GetVectorDimFromID(fieldID) + if err != nil { + return nil, err + } + blobLen := dim / 8 + var colData []byte + for _, hit := range hits { + for _, row := range hit.RowData { + dataBlob := row[blobOffset : blobOffset+blobLen] + colData = append(colData, dataBlob...) + } + } + newCol := &schemapb.FieldData{ + Field: &schemapb.FieldData_Vectors{ + Vectors: &schemapb.VectorField{ + Dim: int64(dim), + Data: &schemapb.VectorField_BinaryVector{ + BinaryVector: colData, + }, + }, + }, + } + finalResult.FieldsData = append(finalResult.FieldsData, newCol) + blobOffset += blobLen default: + return nil, fmt.Errorf("unsupport data type %s", schemapb.DataType_name[int32(fieldMeta.DataType)]) } } + return finalResult, nil } diff --git a/internal/rootcoord/root_coord.go b/internal/rootcoord/root_coord.go index a4d347d43c..3a2b24e7b5 100644 --- a/internal/rootcoord/root_coord.go +++ b/internal/rootcoord/root_coord.go @@ -275,6 +275,7 @@ func (c *Core) sessionLoop() { time.Sleep(time.Second) os.Exit(-1) }() + return } } } diff --git a/internal/util/sessionutil/session_util.go b/internal/util/sessionutil/session_util.go index d8b71fabe5..46c09bfaba 100644 --- a/internal/util/sessionutil/session_util.go +++ b/internal/util/sessionutil/session_util.go @@ -169,6 +169,7 @@ func (s *Session) getServerIDWithKey(key string, retryTimes uint) (int64, error) // it is false. Otherwise, set it to true. func (s *Session) registerService() (<-chan *clientv3.LeaseKeepAliveResponse, error) { var ch <-chan *clientv3.LeaseKeepAliveResponse + log.Debug("Session Register Begin") registerFn := func() error { resp, err := s.etcdCli.Grant(s.ctx, DefaultTTL) if err != nil { @@ -207,6 +208,7 @@ func (s *Session) registerService() (<-chan *clientv3.LeaseKeepAliveResponse, er fmt.Printf("keep alive error %s\n", err) return err } + log.Debug("Session Register End", zap.Int64("ServerID", s.ServerID)) return nil } err := retry.Do(s.ctx, registerFn, retry.Attempts(DefaultRetryTimes), retry.Sleep(500*time.Millisecond)) @@ -228,10 +230,12 @@ func (s *Session) processKeepAliveResponse(ch <-chan *clientv3.LeaseKeepAliveRes return case resp, ok := <-ch: if !ok { + log.Debug("session keepalive channel closed") close(failCh) return } if resp == nil { + log.Debug("session keepalive response failed") close(failCh) return } diff --git a/internal/util/typeutil/schema.go b/internal/util/typeutil/schema.go index 0acb0be58d..083636b6b2 100644 --- a/internal/util/typeutil/schema.go +++ b/internal/util/typeutil/schema.go @@ -109,11 +109,31 @@ func (helper *SchemaHelper) GetFieldFromName(fieldName string) (*schemapb.FieldS func (helper *SchemaHelper) GetFieldFromID(fieldID int64) (*schemapb.FieldSchema, error) { offset, ok := helper.idOffset[fieldID] if !ok { - return nil, fmt.Errorf("fieldName(%d) not found", fieldID) + return nil, fmt.Errorf("fieldID(%d) not found", fieldID) } return helper.schema.Fields[offset], nil } +func (helper *SchemaHelper) GetVectorDimFromID(filedID int64) (int, error) { + sch, err := helper.GetFieldFromID(filedID) + if err != nil { + return 0, err + } + if !IsVectorType(sch.DataType) { + return 0, fmt.Errorf("field type = %s not has dim", schemapb.DataType_name[int32(sch.DataType)]) + } + for _, kv := range sch.TypeParams { + if kv.Key == "dim" { + dim, err := strconv.Atoi(kv.Value) + if err != nil { + return 0, err + } + return dim, nil + } + } + return 0, fmt.Errorf("fieldID(%d) not has dim", filedID) +} + func IsVectorType(dataType schemapb.DataType) bool { switch dataType { case schemapb.DataType_FloatVector, schemapb.DataType_BinaryVector: