diff --git a/internal/core/src/common/Array.h b/internal/core/src/common/Array.h index 77b4d16cd4..4cf311f0ac 100644 --- a/internal/core/src/common/Array.h +++ b/internal/core/src/common/Array.h @@ -130,10 +130,10 @@ class Array { size_t size, DataType element_type, std::vector&& element_offsets) - : size_(size), - element_type_(element_type), + : length_(element_offsets.size()), + size_(size), offsets_(std::move(element_offsets)), - length_(element_offsets.size()) { + element_type_(element_type) { delete[] data_; data_ = new char[size]; std::copy(data, data + size, data_); diff --git a/internal/core/src/segcore/ConcurrentVector.h b/internal/core/src/segcore/ConcurrentVector.h index 6ccc2a879b..22dc50e08b 100644 --- a/internal/core/src/segcore/ConcurrentVector.h +++ b/internal/core/src/segcore/ConcurrentVector.h @@ -381,13 +381,10 @@ class ConcurrentVector : public ConcurrentVectorImpl { public: explicit ConcurrentVector(int64_t dim, int64_t size_per_chunk) - : binary_dim_(dim), ConcurrentVectorImpl(dim / 8, size_per_chunk) { + : ConcurrentVectorImpl(dim / 8, size_per_chunk) { AssertInfo(dim % 8 == 0, fmt::format("dim is not a multiple of 8, dim={}", dim)); } - - private: - int64_t binary_dim_; }; template <> diff --git a/internal/proxy/impl.go b/internal/proxy/impl.go index 57d03538c0..e1222c855c 100644 --- a/internal/proxy/impl.go +++ b/internal/proxy/impl.go @@ -2279,8 +2279,7 @@ func (node *Proxy) Insert(ctx context.Context, request *milvuspb.InsertRequest) // InsertCnt always equals to the number of entities in the request it.result.InsertCnt = int64(request.NumRows) - receiveSize := proto.Size(it.insertMsg) - rateCol.Add(internalpb.RateType_DMLInsert.String(), float64(receiveSize)) + rateCol.Add(internalpb.RateType_DMLInsert.String(), float64(it.insertMsg.Size())) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel).Inc() @@ -2469,10 +2468,7 @@ func (node *Proxy) Upsert(ctx context.Context, request *milvuspb.UpsertRequest) setErrorIndex() } - insertReceiveSize := proto.Size(it.upsertMsg.InsertMsg) - deleteReceiveSize := proto.Size(it.upsertMsg.DeleteMsg) - - rateCol.Add(internalpb.RateType_DMLUpsert.String(), float64(insertReceiveSize+deleteReceiveSize)) + rateCol.Add(internalpb.RateType_DMLUpsert.String(), float64(it.upsertMsg.DeleteMsg.Size()+it.upsertMsg.DeleteMsg.Size())) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel).Inc() diff --git a/internal/querynodev2/pipeline/filter_node.go b/internal/querynodev2/pipeline/filter_node.go index ca77022d10..8e4205cb66 100644 --- a/internal/querynodev2/pipeline/filter_node.go +++ b/internal/querynodev2/pipeline/filter_node.go @@ -20,7 +20,6 @@ import ( "fmt" "reflect" - "github.com/golang/protobuf/proto" "go.uber.org/zap" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" @@ -108,7 +107,7 @@ func (fNode *filterNode) filtrate(c *Collection, msg msgstream.TsMsg) error { switch msg.Type() { case commonpb.MsgType_Insert: insertMsg := msg.(*msgstream.InsertMsg) - metrics.QueryNodeConsumeCounter.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.InsertLabel).Add(float64(proto.Size(insertMsg))) + metrics.QueryNodeConsumeCounter.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.InsertLabel).Add(float64(insertMsg.Size())) for _, policy := range fNode.InsertMsgPolicys { err := policy(fNode, c, insertMsg) if err != nil { @@ -118,7 +117,7 @@ func (fNode *filterNode) filtrate(c *Collection, msg msgstream.TsMsg) error { case commonpb.MsgType_Delete: deleteMsg := msg.(*msgstream.DeleteMsg) - metrics.QueryNodeConsumeCounter.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.InsertLabel).Add(float64(proto.Size(deleteMsg))) + metrics.QueryNodeConsumeCounter.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.InsertLabel).Add(float64(deleteMsg.Size())) for _, policy := range fNode.DeleteMsgPolicys { err := policy(fNode, c, deleteMsg) if err != nil { diff --git a/internal/querynodev2/pipeline/message.go b/internal/querynodev2/pipeline/message.go index c5d0f6781a..fd5f3acda7 100644 --- a/internal/querynodev2/pipeline/message.go +++ b/internal/querynodev2/pipeline/message.go @@ -17,8 +17,6 @@ package pipeline import ( - "github.com/golang/protobuf/proto" - "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus/internal/querynodev2/collector" "github.com/milvus-io/milvus/pkg/mq/msgstream" @@ -42,11 +40,11 @@ func (msg *insertNodeMsg) append(taskMsg msgstream.TsMsg) error { case commonpb.MsgType_Insert: insertMsg := taskMsg.(*InsertMsg) msg.insertMsgs = append(msg.insertMsgs, insertMsg) - collector.Rate.Add(metricsinfo.InsertConsumeThroughput, float64(proto.Size(&insertMsg.InsertRequest))) + collector.Rate.Add(metricsinfo.InsertConsumeThroughput, float64(insertMsg.Size())) case commonpb.MsgType_Delete: deleteMsg := taskMsg.(*DeleteMsg) msg.deleteMsgs = append(msg.deleteMsgs, deleteMsg) - collector.Rate.Add(metricsinfo.DeleteConsumeThroughput, float64(proto.Size(&deleteMsg.DeleteRequest))) + collector.Rate.Add(metricsinfo.DeleteConsumeThroughput, float64(deleteMsg.Size())) default: return merr.WrapErrParameterInvalid("msgType is Insert or Delete", "not") }