From cbe8c03224062fe62dcc0fefc2ace41b6ae58fca Mon Sep 17 00:00:00 2001 From: Xiangyu Wang Date: Sun, 26 Sep 2021 17:36:07 +0800 Subject: [PATCH] Fix golint warnings in msg.go (#8590) Signed-off-by: Xiangyu Wang --- internal/msgstream/mq_msgstream.go | 4 +- internal/msgstream/msg.go | 169 ++++++++++++++++++++++++--- internal/msgstream/msg_test.go | 6 +- internal/msgstream/unmarshal_test.go | 2 +- 4 files changed, 158 insertions(+), 23 deletions(-) diff --git a/internal/msgstream/mq_msgstream.go b/internal/msgstream/mq_msgstream.go index 7fb2d43a99..a24e2a2815 100644 --- a/internal/msgstream/mq_msgstream.go +++ b/internal/msgstream/mq_msgstream.go @@ -236,7 +236,7 @@ func (ms *mqMsgStream) Produce(msgPack *MsgPack) error { return err } - m, err := ConvertToByteArray(mb) + m, err := convertToByteArray(mb) if err != nil { return err } @@ -275,7 +275,7 @@ func (ms *mqMsgStream) Broadcast(msgPack *MsgPack) error { return err } - m, err := ConvertToByteArray(mb) + m, err := convertToByteArray(mb) if err != nil { return err } diff --git a/internal/msgstream/msg.go b/internal/msgstream/msg.go index b7207e5ff4..79565881c5 100644 --- a/internal/msgstream/msg.go +++ b/internal/msgstream/msg.go @@ -22,9 +22,13 @@ import ( "github.com/milvus-io/milvus/internal/proto/internalpb" ) +// MsgType is an alias ofo commonpb.MsgType type MsgType = commonpb.MsgType + +// MarshalType is an empty interface type MarshalType = interface{} +// TsMsg provides methods to get begin timestamp and end timestamp of a message pack type TsMsg interface { TraceCtx() context.Context SetTraceCtx(ctx context.Context) @@ -40,6 +44,7 @@ type TsMsg interface { SetPosition(*MsgPosition) } +// BaseMsg is a basic structure that contains begin timestamp, end timestamp and the position of msgstream type BaseMsg struct { Ctx context.Context BeginTimestamp Timestamp @@ -48,35 +53,42 @@ type BaseMsg struct { MsgPosition *MsgPosition } +// TraceCtx returns the context of opentracing func (bm *BaseMsg) TraceCtx() context.Context { return bm.Ctx } +// SetTraceCtx is used to set context for opentracing func (bm *BaseMsg) SetTraceCtx(ctx context.Context) { bm.Ctx = ctx } +// BeginTs returns the begin timestamp of this message pack func (bm *BaseMsg) BeginTs() Timestamp { return bm.BeginTimestamp } +// EndTs returns the end timestamp of this message pack func (bm *BaseMsg) EndTs() Timestamp { return bm.EndTimestamp } +// HashKeys returns the end timestamp of this message pack func (bm *BaseMsg) HashKeys() []uint32 { return bm.HashValues } +// Position returns the position of this message pack in msgstream func (bm *BaseMsg) Position() *MsgPosition { return bm.MsgPosition } +// SetPosition is used to set position of this message in msgstream func (bm *BaseMsg) SetPosition(position *MsgPosition) { bm.MsgPosition = position } -func ConvertToByteArray(input interface{}) ([]byte, error) { +func convertToByteArray(input interface{}) ([]byte, error) { switch output := input.(type) { case []byte: return output, nil @@ -86,6 +98,8 @@ func ConvertToByteArray(input interface{}) ([]byte, error) { } /////////////////////////////////////////Insert////////////////////////////////////////// + +// InsertMsg is a message pack that contains insert request type InsertMsg struct { BaseMsg internalpb.InsertRequest @@ -94,18 +108,22 @@ type InsertMsg struct { // interface implementation validation var _ TsMsg = &InsertMsg{} +// ID returns the ID of this message pack func (it *InsertMsg) ID() UniqueID { return it.Base.MsgID } +// Type returns the type of this message pack func (it *InsertMsg) Type() MsgType { return it.Base.MsgType } +// SourceID indicated which component generated this message func (it *InsertMsg) SourceID() int64 { return it.Base.SourceID } +// Marshal is used to serializing a message pack to byte array func (it *InsertMsg) Marshal(input TsMsg) (MarshalType, error) { insertMsg := input.(*InsertMsg) insertRequest := &insertMsg.InsertRequest @@ -116,9 +134,10 @@ func (it *InsertMsg) Marshal(input TsMsg) (MarshalType, error) { return mb, nil } +// Unmarshal is used to deserializing a message pack from byte array func (it *InsertMsg) Unmarshal(input MarshalType) (TsMsg, error) { insertRequest := internalpb.InsertRequest{} - in, err := ConvertToByteArray(input) + in, err := convertToByteArray(input) if err != nil { return nil, err } @@ -145,6 +164,8 @@ func (it *InsertMsg) Unmarshal(input MarshalType) (TsMsg, error) { } /////////////////////////////////////////Delete////////////////////////////////////////// + +// DeleteMsg is a message pack that contains delete request type DeleteMsg struct { BaseMsg internalpb.DeleteRequest @@ -153,18 +174,22 @@ type DeleteMsg struct { // interface implementation validation var _ TsMsg = &DeleteMsg{} +// ID returns the ID of this message pack func (dt *DeleteMsg) ID() UniqueID { return dt.Base.MsgID } +// Type returns the type of this message pack func (dt *DeleteMsg) Type() MsgType { return dt.Base.MsgType } +// SourceID indicated which component generated this message func (dt *DeleteMsg) SourceID() int64 { return dt.Base.SourceID } +// Marshal is used to serializing a message pack to byte array func (dt *DeleteMsg) Marshal(input TsMsg) (MarshalType, error) { deleteMsg := input.(*DeleteMsg) deleteRequest := &deleteMsg.DeleteRequest @@ -176,9 +201,10 @@ func (dt *DeleteMsg) Marshal(input TsMsg) (MarshalType, error) { return mb, nil } +// Unmarshal is used to deserializing a message pack from byte array func (dt *DeleteMsg) Unmarshal(input MarshalType) (TsMsg, error) { deleteRequest := internalpb.DeleteRequest{} - in, err := ConvertToByteArray(input) + in, err := convertToByteArray(input) if err != nil { return nil, err } @@ -194,6 +220,8 @@ func (dt *DeleteMsg) Unmarshal(input MarshalType) (TsMsg, error) { } /////////////////////////////////////////Search////////////////////////////////////////// + +// SearchMsg is a message pack that contains search request type SearchMsg struct { BaseMsg internalpb.SearchRequest @@ -202,26 +230,34 @@ type SearchMsg struct { // interface implementation validation var _ TsMsg = &SearchMsg{} +// ID returns the ID of this message pack func (st *SearchMsg) ID() UniqueID { return st.Base.MsgID } +// Type returns the type of this message pack func (st *SearchMsg) Type() MsgType { return st.Base.MsgType } +// SourceID indicated which component generated this message func (st *SearchMsg) SourceID() int64 { return st.Base.SourceID } +// GuaranteeTs returns the guarantee timestamp that querynode can perform this search request. This timestamp +// filled in client(e.g. pymilvus). The timestamp will be 0 if client never execute any insert, otherwise equals +// the timestamp from last insert response. func (st *SearchMsg) GuaranteeTs() Timestamp { return st.GetGuaranteeTimestamp() } +// TravelTs returns the timestamp of a time travel search request func (st *SearchMsg) TravelTs() Timestamp { return st.GetTravelTimestamp() } +// Marshal is used to serializing a message pack to byte array func (st *SearchMsg) Marshal(input TsMsg) (MarshalType, error) { searchTask := input.(*SearchMsg) searchRequest := &searchTask.SearchRequest @@ -232,9 +268,10 @@ func (st *SearchMsg) Marshal(input TsMsg) (MarshalType, error) { return mb, nil } +// Unmarshal is used to deserializing a message pack from byte array func (st *SearchMsg) Unmarshal(input MarshalType) (TsMsg, error) { searchRequest := internalpb.SearchRequest{} - in, err := ConvertToByteArray(input) + in, err := convertToByteArray(input) if err != nil { return nil, err } @@ -250,6 +287,8 @@ func (st *SearchMsg) Unmarshal(input MarshalType) (TsMsg, error) { } /////////////////////////////////////////SearchResult////////////////////////////////////////// + +// SearchResultMsg is a message pack that contains the result of search request type SearchResultMsg struct { BaseMsg internalpb.SearchResults @@ -258,18 +297,22 @@ type SearchResultMsg struct { // interface implementation validation var _ TsMsg = &SearchResultMsg{} +// ID returns the ID of this message pack func (srt *SearchResultMsg) ID() UniqueID { return srt.Base.MsgID } +// Type returns the type of this message pack func (srt *SearchResultMsg) Type() MsgType { return srt.Base.MsgType } +// SourceID indicated which component generated this message func (srt *SearchResultMsg) SourceID() int64 { return srt.Base.SourceID } +// Marshal is used to serializing a message pack to byte array func (srt *SearchResultMsg) Marshal(input TsMsg) (MarshalType, error) { searchResultTask := input.(*SearchResultMsg) searchResultRequest := &searchResultTask.SearchResults @@ -280,9 +323,10 @@ func (srt *SearchResultMsg) Marshal(input TsMsg) (MarshalType, error) { return mb, nil } +// Unmarshal is used to deserializing a message pack from byte array func (srt *SearchResultMsg) Unmarshal(input MarshalType) (TsMsg, error) { searchResultRequest := internalpb.SearchResults{} - in, err := ConvertToByteArray(input) + in, err := convertToByteArray(input) if err != nil { return nil, err } @@ -298,6 +342,8 @@ func (srt *SearchResultMsg) Unmarshal(input MarshalType) (TsMsg, error) { } ////////////////////////////////////////Retrieve///////////////////////////////////////// + +// RetrieveMsg is a message pack that contains retrieve request type RetrieveMsg struct { BaseMsg internalpb.RetrieveRequest @@ -306,26 +352,34 @@ type RetrieveMsg struct { // interface implementation validation var _ TsMsg = &RetrieveMsg{} +// ID returns the ID of this message pack func (rm *RetrieveMsg) ID() UniqueID { return rm.Base.MsgID } +// Type returns the type of this message pack func (rm *RetrieveMsg) Type() MsgType { return rm.Base.MsgType } +// SourceID indicated which component generated this message func (rm *RetrieveMsg) SourceID() int64 { return rm.Base.SourceID } +// GuaranteeTs returns the guarantee timestamp that querynode can perform this query request. This timestamp +// filled in client(e.g. pymilvus). The timestamp will be 0 if client never execute any insert, otherwise equals +// the timestamp from last insert response. func (rm *RetrieveMsg) GuaranteeTs() Timestamp { return rm.GetGuaranteeTimestamp() } +// TravelTs returns the timestamp of a time travel query request func (rm *RetrieveMsg) TravelTs() Timestamp { return rm.GetTravelTimestamp() } +// Marshal is used to serializing a message pack to byte array func (rm *RetrieveMsg) Marshal(input TsMsg) (MarshalType, error) { retrieveTask := input.(*RetrieveMsg) retrieveRequest := &retrieveTask.RetrieveRequest @@ -336,9 +390,10 @@ func (rm *RetrieveMsg) Marshal(input TsMsg) (MarshalType, error) { return mb, nil } +// Unmarshal is used to deserializing a message pack from byte array func (rm *RetrieveMsg) Unmarshal(input MarshalType) (TsMsg, error) { retrieveRequest := internalpb.RetrieveRequest{} - in, err := ConvertToByteArray(input) + in, err := convertToByteArray(input) if err != nil { return nil, err } @@ -354,6 +409,8 @@ func (rm *RetrieveMsg) Unmarshal(input MarshalType) (TsMsg, error) { } //////////////////////////////////////RetrieveResult/////////////////////////////////////// + +// RetrieveResultMsg is a message pack that contains the result of query request type RetrieveResultMsg struct { BaseMsg internalpb.RetrieveResults @@ -362,18 +419,22 @@ type RetrieveResultMsg struct { // interface implementation validation var _ TsMsg = &RetrieveResultMsg{} +// ID returns the ID of this message pack func (rrm *RetrieveResultMsg) ID() UniqueID { return rrm.Base.MsgID } +// Type returns the type of this message pack func (rrm *RetrieveResultMsg) Type() MsgType { return rrm.Base.MsgType } +// SourceID indicated which component generated this message func (rrm *RetrieveResultMsg) SourceID() int64 { return rrm.Base.SourceID } +// Marshal is used to serializing a message pack to byte array func (rrm *RetrieveResultMsg) Marshal(input TsMsg) (MarshalType, error) { retrieveResultTask := input.(*RetrieveResultMsg) retrieveResultRequest := &retrieveResultTask.RetrieveResults @@ -384,9 +445,10 @@ func (rrm *RetrieveResultMsg) Marshal(input TsMsg) (MarshalType, error) { return mb, nil } +// Unmarshal is used to deserializing a message pack from byte array func (rrm *RetrieveResultMsg) Unmarshal(input MarshalType) (TsMsg, error) { retrieveResultRequest := internalpb.RetrieveResults{} - in, err := ConvertToByteArray(input) + in, err := convertToByteArray(input) if err != nil { return nil, err } @@ -402,6 +464,8 @@ func (rrm *RetrieveResultMsg) Unmarshal(input MarshalType) (TsMsg, error) { } /////////////////////////////////////////TimeTick////////////////////////////////////////// + +// TimeTickMsg is a message pack that contains time tick only type TimeTickMsg struct { BaseMsg internalpb.TimeTickMsg @@ -410,18 +474,22 @@ type TimeTickMsg struct { // interface implementation validation var _ TsMsg = &TimeTickMsg{} +// ID returns the ID of this message pack func (tst *TimeTickMsg) ID() UniqueID { return tst.Base.MsgID } +// Type returns the type of this message pack func (tst *TimeTickMsg) Type() MsgType { return tst.Base.MsgType } +// SourceID indicated which component generated this message func (tst *TimeTickMsg) SourceID() int64 { return tst.Base.SourceID } +// Marshal is used to serializing a message pack to byte array func (tst *TimeTickMsg) Marshal(input TsMsg) (MarshalType, error) { timeTickTask := input.(*TimeTickMsg) timeTick := &timeTickTask.TimeTickMsg @@ -432,9 +500,10 @@ func (tst *TimeTickMsg) Marshal(input TsMsg) (MarshalType, error) { return mb, nil } +// Unmarshal is used to deserializing a message pack from byte array func (tst *TimeTickMsg) Unmarshal(input MarshalType) (TsMsg, error) { timeTickMsg := internalpb.TimeTickMsg{} - in, err := ConvertToByteArray(input) + in, err := convertToByteArray(input) if err != nil { return nil, err } @@ -450,6 +519,8 @@ func (tst *TimeTickMsg) Unmarshal(input MarshalType) (TsMsg, error) { } /////////////////////////////////////////QueryNodeStats////////////////////////////////////////// + +// QueryNodeStatsMsg is a message pack that contains statistic from querynode // GOOSE TODO: remove QueryNodeStats type QueryNodeStatsMsg struct { BaseMsg @@ -459,26 +530,32 @@ type QueryNodeStatsMsg struct { // interface implementation validation var _ TsMsg = &QueryNodeStatsMsg{} +// TraceCtx returns the context of opentracing func (qs *QueryNodeStatsMsg) TraceCtx() context.Context { return qs.BaseMsg.Ctx } +// SetTraceCtx is used to set context for opentracing func (qs *QueryNodeStatsMsg) SetTraceCtx(ctx context.Context) { qs.BaseMsg.Ctx = ctx } +// ID returns the ID of this message pack func (qs *QueryNodeStatsMsg) ID() UniqueID { return qs.Base.MsgID } +// Type returns the type of this message pack func (qs *QueryNodeStatsMsg) Type() MsgType { return qs.Base.MsgType } +// SourceID indicated which component generated this message func (qs *QueryNodeStatsMsg) SourceID() int64 { return qs.Base.SourceID } +// Marshal is used to serializing a message pack to byte array func (qs *QueryNodeStatsMsg) Marshal(input TsMsg) (MarshalType, error) { queryNodeSegStatsTask := input.(*QueryNodeStatsMsg) queryNodeSegStats := &queryNodeSegStatsTask.QueryNodeStats @@ -489,9 +566,10 @@ func (qs *QueryNodeStatsMsg) Marshal(input TsMsg) (MarshalType, error) { return mb, nil } +// Unmarshal is used to deserializing a message pack from byte array func (qs *QueryNodeStatsMsg) Unmarshal(input MarshalType) (TsMsg, error) { queryNodeSegStats := internalpb.QueryNodeStats{} - in, err := ConvertToByteArray(input) + in, err := convertToByteArray(input) if err != nil { return nil, err } @@ -505,6 +583,8 @@ func (qs *QueryNodeStatsMsg) Unmarshal(input MarshalType) (TsMsg, error) { } /////////////////////////////////////////SegmentStatisticsMsg////////////////////////////////////////// + +// SegmentStatisticsMsg is a message pack that contains segment statistic type SegmentStatisticsMsg struct { BaseMsg internalpb.SegmentStatistics @@ -513,26 +593,32 @@ type SegmentStatisticsMsg struct { // interface implementation validation var _ TsMsg = &SegmentStatisticsMsg{} +// TraceCtx returns the context of opentracing func (ss *SegmentStatisticsMsg) TraceCtx() context.Context { return ss.BaseMsg.Ctx } +// SetTraceCtx is used to set context for opentracing func (ss *SegmentStatisticsMsg) SetTraceCtx(ctx context.Context) { ss.BaseMsg.Ctx = ctx } +// ID returns the ID of this message pack func (ss *SegmentStatisticsMsg) ID() UniqueID { return ss.Base.MsgID } +// Type returns the type of this message pack func (ss *SegmentStatisticsMsg) Type() MsgType { return ss.Base.MsgType } +// SourceID indicated which component generated this message func (ss *SegmentStatisticsMsg) SourceID() int64 { return ss.Base.SourceID } +// Marshal is used to serializing a message pack to byte array func (ss *SegmentStatisticsMsg) Marshal(input TsMsg) (MarshalType, error) { segStatsTask := input.(*SegmentStatisticsMsg) segStats := &segStatsTask.SegmentStatistics @@ -543,9 +629,10 @@ func (ss *SegmentStatisticsMsg) Marshal(input TsMsg) (MarshalType, error) { return mb, nil } +// Unmarshal is used to deserializing a message pack from byte array func (ss *SegmentStatisticsMsg) Unmarshal(input MarshalType) (TsMsg, error) { segStats := internalpb.SegmentStatistics{} - in, err := ConvertToByteArray(input) + in, err := convertToByteArray(input) if err != nil { return nil, err } @@ -559,6 +646,8 @@ func (ss *SegmentStatisticsMsg) Unmarshal(input MarshalType) (TsMsg, error) { } /////////////////////////////////////////CreateCollection////////////////////////////////////////// + +// CreateCollectionMsg is a message pack that contains create collection request type CreateCollectionMsg struct { BaseMsg internalpb.CreateCollectionRequest @@ -567,18 +656,22 @@ type CreateCollectionMsg struct { // interface implementation validation var _ TsMsg = &CreateCollectionMsg{} +// ID returns the ID of this message pack func (cc *CreateCollectionMsg) ID() UniqueID { return cc.Base.MsgID } +// Type returns the type of this message pack func (cc *CreateCollectionMsg) Type() MsgType { return cc.Base.MsgType } +// SourceID indicated which component generated this message func (cc *CreateCollectionMsg) SourceID() int64 { return cc.Base.SourceID } +// Marshal is used to serializing a message pack to byte array func (cc *CreateCollectionMsg) Marshal(input TsMsg) (MarshalType, error) { createCollectionMsg := input.(*CreateCollectionMsg) createCollectionRequest := &createCollectionMsg.CreateCollectionRequest @@ -589,9 +682,10 @@ func (cc *CreateCollectionMsg) Marshal(input TsMsg) (MarshalType, error) { return mb, nil } +// Unmarshal is used to deserializing a message pack from byte array func (cc *CreateCollectionMsg) Unmarshal(input MarshalType) (TsMsg, error) { createCollectionRequest := internalpb.CreateCollectionRequest{} - in, err := ConvertToByteArray(input) + in, err := convertToByteArray(input) if err != nil { return nil, err } @@ -607,6 +701,8 @@ func (cc *CreateCollectionMsg) Unmarshal(input MarshalType) (TsMsg, error) { } /////////////////////////////////////////DropCollection////////////////////////////////////////// + +// DropCollectionMsg is a message pack that contains drop collection request type DropCollectionMsg struct { BaseMsg internalpb.DropCollectionRequest @@ -615,18 +711,22 @@ type DropCollectionMsg struct { // interface implementation validation var _ TsMsg = &DropCollectionMsg{} +// ID returns the ID of this message pack func (dc *DropCollectionMsg) ID() UniqueID { return dc.Base.MsgID } +// Type returns the type of this message pack func (dc *DropCollectionMsg) Type() MsgType { return dc.Base.MsgType } +// SourceID indicated which component generated this message func (dc *DropCollectionMsg) SourceID() int64 { return dc.Base.SourceID } +// Marshal is used to serializing a message pack to byte array func (dc *DropCollectionMsg) Marshal(input TsMsg) (MarshalType, error) { dropCollectionMsg := input.(*DropCollectionMsg) dropCollectionRequest := &dropCollectionMsg.DropCollectionRequest @@ -637,9 +737,10 @@ func (dc *DropCollectionMsg) Marshal(input TsMsg) (MarshalType, error) { return mb, nil } +// Unmarshal is used to deserializing a message pack from byte array func (dc *DropCollectionMsg) Unmarshal(input MarshalType) (TsMsg, error) { dropCollectionRequest := internalpb.DropCollectionRequest{} - in, err := ConvertToByteArray(input) + in, err := convertToByteArray(input) if err != nil { return nil, err } @@ -655,6 +756,8 @@ func (dc *DropCollectionMsg) Unmarshal(input MarshalType) (TsMsg, error) { } /////////////////////////////////////////CreatePartition////////////////////////////////////////// + +// CreatePartitionMsg is a message pack that contains create partition request type CreatePartitionMsg struct { BaseMsg internalpb.CreatePartitionRequest @@ -663,18 +766,22 @@ type CreatePartitionMsg struct { // interface implementation validation var _ TsMsg = &CreatePartitionMsg{} +// ID returns the ID of this message pack func (cp *CreatePartitionMsg) ID() UniqueID { return cp.Base.MsgID } +// Type returns the type of this message pack func (cp *CreatePartitionMsg) Type() MsgType { return cp.Base.MsgType } +// SourceID indicated which component generated this message func (cp *CreatePartitionMsg) SourceID() int64 { return cp.Base.SourceID } +// Marshal is used to serializing a message pack to byte array func (cp *CreatePartitionMsg) Marshal(input TsMsg) (MarshalType, error) { createPartitionMsg := input.(*CreatePartitionMsg) createPartitionRequest := &createPartitionMsg.CreatePartitionRequest @@ -685,9 +792,10 @@ func (cp *CreatePartitionMsg) Marshal(input TsMsg) (MarshalType, error) { return mb, nil } +// Unmarshal is used to deserializing a message pack from byte array func (cp *CreatePartitionMsg) Unmarshal(input MarshalType) (TsMsg, error) { createPartitionRequest := internalpb.CreatePartitionRequest{} - in, err := ConvertToByteArray(input) + in, err := convertToByteArray(input) if err != nil { return nil, err } @@ -703,6 +811,8 @@ func (cp *CreatePartitionMsg) Unmarshal(input MarshalType) (TsMsg, error) { } /////////////////////////////////////////DropPartition////////////////////////////////////////// + +// DropPartitionMsg is a message pack that contains drop partition request type DropPartitionMsg struct { BaseMsg internalpb.DropPartitionRequest @@ -711,18 +821,22 @@ type DropPartitionMsg struct { // interface implementation validation var _ TsMsg = &DropPartitionMsg{} +// ID returns the ID of this message pack func (dp *DropPartitionMsg) ID() UniqueID { return dp.Base.MsgID } +// Type returns the type of this message pack func (dp *DropPartitionMsg) Type() MsgType { return dp.Base.MsgType } +// SourceID indicated which component generated this message func (dp *DropPartitionMsg) SourceID() int64 { return dp.Base.SourceID } +// Marshal is used to serializing a message pack to byte array func (dp *DropPartitionMsg) Marshal(input TsMsg) (MarshalType, error) { dropPartitionMsg := input.(*DropPartitionMsg) dropPartitionRequest := &dropPartitionMsg.DropPartitionRequest @@ -733,9 +847,10 @@ func (dp *DropPartitionMsg) Marshal(input TsMsg) (MarshalType, error) { return mb, nil } +// Unmarshal is used to deserializing a message pack from byte array func (dp *DropPartitionMsg) Unmarshal(input MarshalType) (TsMsg, error) { dropPartitionRequest := internalpb.DropPartitionRequest{} - in, err := ConvertToByteArray(input) + in, err := convertToByteArray(input) if err != nil { return nil, err } @@ -758,26 +873,32 @@ type LoadIndexMsg struct { internalpb.LoadIndex } +// TraceCtx returns the context of opentracing func (lim *LoadIndexMsg) TraceCtx() context.Context { return lim.BaseMsg.Ctx } +// SetTraceCtx is used to set context for opentracing func (lim *LoadIndexMsg) SetTraceCtx(ctx context.Context) { lim.BaseMsg.Ctx = ctx } +// ID returns the ID of this message pack func (lim *LoadIndexMsg) ID() UniqueID { return lim.Base.MsgID } +// Type returns the type of this message pack func (lim *LoadIndexMsg) Type() MsgType { return lim.Base.MsgType } +// SourceID indicated which component generated this message func (lim *LoadIndexMsg) SourceID() int64 { return lim.Base.SourceID } +// Marshal is used to serializing a message pack to byte array func (lim *LoadIndexMsg) Marshal(input TsMsg) (MarshalType, error) { loadIndexMsg := input.(*LoadIndexMsg) loadIndexRequest := &loadIndexMsg.LoadIndex @@ -788,9 +909,10 @@ func (lim *LoadIndexMsg) Marshal(input TsMsg) (MarshalType, error) { return mb, nil } +// Unmarshal is used to deserializing a message pack from byte array func (lim *LoadIndexMsg) Unmarshal(input MarshalType) (TsMsg, error) { loadIndexRequest := internalpb.LoadIndex{} - in, err := ConvertToByteArray(input) + in, err := convertToByteArray(input) if err != nil { return nil, err } @@ -805,6 +927,8 @@ func (lim *LoadIndexMsg) Unmarshal(input MarshalType) (TsMsg, error) { */ /////////////////////////////////////////LoadBalanceSegments////////////////////////////////////////// + +// LoadBalanceSegmentsMsg is a message pack that contains load balance segments request type LoadBalanceSegmentsMsg struct { BaseMsg internalpb.LoadBalanceSegmentsRequest @@ -813,18 +937,22 @@ type LoadBalanceSegmentsMsg struct { // interface implementation validation var _ TsMsg = &LoadBalanceSegmentsMsg{} +// ID returns the ID of this message pack func (l *LoadBalanceSegmentsMsg) ID() UniqueID { return l.Base.MsgID } +// Type returns the type of this message pack func (l *LoadBalanceSegmentsMsg) Type() MsgType { return l.Base.MsgType } +// SourceID indicated which component generated this message func (l *LoadBalanceSegmentsMsg) SourceID() int64 { return l.Base.SourceID } +// Marshal is used to serializing a message pack to byte array func (l *LoadBalanceSegmentsMsg) Marshal(input TsMsg) (MarshalType, error) { load := input.(*LoadBalanceSegmentsMsg) loadReq := &load.LoadBalanceSegmentsRequest @@ -835,9 +963,10 @@ func (l *LoadBalanceSegmentsMsg) Marshal(input TsMsg) (MarshalType, error) { return mb, nil } +// Unmarshal is used to deserializing a message pack from byte array func (l *LoadBalanceSegmentsMsg) Unmarshal(input MarshalType) (TsMsg, error) { loadReq := internalpb.LoadBalanceSegmentsRequest{} - in, err := ConvertToByteArray(input) + in, err := convertToByteArray(input) if err != nil { return nil, err } @@ -852,6 +981,7 @@ func (l *LoadBalanceSegmentsMsg) Unmarshal(input MarshalType) (TsMsg, error) { return loadMsg, nil } +// DataNodeTtMsg is a message pack that contains datanode time tick type DataNodeTtMsg struct { BaseMsg datapb.DataNodeTtMsg @@ -860,18 +990,22 @@ type DataNodeTtMsg struct { // interface implementation validation var _ TsMsg = &DataNodeTtMsg{} +// ID returns the ID of this message pack func (m *DataNodeTtMsg) ID() UniqueID { return m.Base.MsgID } +// Type returns the type of this message pack func (m *DataNodeTtMsg) Type() MsgType { return m.Base.MsgType } +// SourceID indicated which component generated this message func (m *DataNodeTtMsg) SourceID() int64 { return m.Base.SourceID } +// Marshal is used to serializing a message pack to byte array func (m *DataNodeTtMsg) Marshal(input TsMsg) (MarshalType, error) { msg := input.(*DataNodeTtMsg) t, err := proto.Marshal(&msg.DataNodeTtMsg) @@ -881,9 +1015,10 @@ func (m *DataNodeTtMsg) Marshal(input TsMsg) (MarshalType, error) { return t, nil } +// Unmarshal is used to deserializing a message pack from byte array func (m *DataNodeTtMsg) Unmarshal(input MarshalType) (TsMsg, error) { msg := datapb.DataNodeTtMsg{} - in, err := ConvertToByteArray(input) + in, err := convertToByteArray(input) if err != nil { return nil, err } diff --git a/internal/msgstream/msg_test.go b/internal/msgstream/msg_test.go index 7d6fe5b534..90f8cfbfb4 100644 --- a/internal/msgstream/msg_test.go +++ b/internal/msgstream/msg_test.go @@ -49,17 +49,17 @@ func TestBaseMsg(t *testing.T) { assert.Equal(t, position, baseMsg.Position()) } -func Test_ConvertToByteArray(t *testing.T) { +func Test_convertToByteArray(t *testing.T) { { bytes := []byte{1, 2, 3} - byteArray, err := ConvertToByteArray(bytes) + byteArray, err := convertToByteArray(bytes) assert.Equal(t, bytes, byteArray) assert.Nil(t, err) } { bytes := 4 - byteArray, err := ConvertToByteArray(bytes) + byteArray, err := convertToByteArray(bytes) assert.Equal(t, ([]byte)(nil), byteArray) assert.NotNil(t, err) } diff --git a/internal/msgstream/unmarshal_test.go b/internal/msgstream/unmarshal_test.go index ae9c079b51..f261acbf7b 100644 --- a/internal/msgstream/unmarshal_test.go +++ b/internal/msgstream/unmarshal_test.go @@ -70,7 +70,7 @@ func Test_ProtoUnmarshalDispatcher(t *testing.T) { headerMsg := commonpb.MsgHeader{} payload, err := v.Marshal(v) assert.Nil(t, err) - p, err := ConvertToByteArray(payload) + p, err := convertToByteArray(payload) assert.Nil(t, err) err = proto.Unmarshal(p, &headerMsg) assert.Nil(t, err)