mirror of
https://gitee.com/milvus-io/milvus.git
synced 2026-01-07 19:31:51 +08:00
Modify logic of time tick
Signed-off-by: neza2017 <yefu.chen@zilliz.com>
This commit is contained in:
parent
9a9bc4ecd5
commit
1bd7938d72
6
Makefile
6
Makefile
@ -36,9 +36,9 @@ fmt:
|
||||
lint:
|
||||
@echo "Running $@ check"
|
||||
@GO111MODULE=on ${GOPATH}/bin/golangci-lint cache clean
|
||||
@GO111MODULE=on ${GOPATH}/bin/golangci-lint run --timeout=1m --config ./.golangci.yml ./internal/ || true
|
||||
@GO111MODULE=on ${GOPATH}/bin/golangci-lint run --timeout=1m --config ./.golangci.yml ./cmd/ || true
|
||||
@GO111MODULE=on ${GOPATH}/bin/golangci-lint run --timeout=1m --config ./.golangci.yml ./test/ || true
|
||||
@GO111MODULE=on ${GOPATH}/bin/golangci-lint run --timeout=1m --config ./.golangci.yml ./internal/... || true
|
||||
@GO111MODULE=on ${GOPATH}/bin/golangci-lint run --timeout=1m --config ./.golangci.yml ./cmd/... || true
|
||||
@GO111MODULE=on ${GOPATH}/bin/golangci-lint run --timeout=1m --config ./.golangci.yml ./test/... || true
|
||||
|
||||
ruleguard:
|
||||
@echo "Running $@ check"
|
||||
|
||||
@ -33,8 +33,8 @@ func GetMarshaler(MsgType MsgType) *TsMsgMarshaler {
|
||||
searchResultMarshler := &SearchResultMarshaler{}
|
||||
var tsMsgMarshaller TsMsgMarshaler = searchResultMarshler
|
||||
return &tsMsgMarshaller
|
||||
case KTimeSync:
|
||||
timeSyncMarshaler := &TimeSyncMarshaler{}
|
||||
case KTimeTick:
|
||||
timeSyncMarshaler := &TimeTickMarshaler{}
|
||||
var tsMsgMarshaller TsMsgMarshaler = timeSyncMarshaler
|
||||
return &tsMsgMarshaller
|
||||
default:
|
||||
@ -145,10 +145,10 @@ func (srm *SearchResultMarshaler) Unmarshal(input []byte) (*TsMsg, commonPb.Stat
|
||||
|
||||
/////////////////////////////////////TimeSync///////////////////////////////////////////////
|
||||
|
||||
type TimeSyncMarshaler struct{}
|
||||
type TimeTickMarshaler struct{}
|
||||
|
||||
func (tm *TimeSyncMarshaler) Marshal(input *TsMsg) ([]byte, commonPb.Status) {
|
||||
timeSyncTask := (*input).(TimeSyncTask)
|
||||
func (tm *TimeTickMarshaler) Marshal(input *TsMsg) ([]byte, commonPb.Status) {
|
||||
timeSyncTask := (*input).(TimeTickMsg)
|
||||
timeSyncMsg := &timeSyncTask.TimeTickMsg
|
||||
mb, err := proto.Marshal(timeSyncMsg)
|
||||
if err != nil {
|
||||
@ -157,10 +157,10 @@ func (tm *TimeSyncMarshaler) Marshal(input *TsMsg) ([]byte, commonPb.Status) {
|
||||
return mb, commonPb.Status{ErrorCode: commonPb.ErrorCode_SUCCESS}
|
||||
}
|
||||
|
||||
func (tm *TimeSyncMarshaler) Unmarshal(input []byte) (*TsMsg, commonPb.Status) {
|
||||
func (tm *TimeTickMarshaler) Unmarshal(input []byte) (*TsMsg, commonPb.Status) {
|
||||
timeSyncMsg := internalPb.TimeTickMsg{}
|
||||
err := proto.Unmarshal(input, &timeSyncMsg)
|
||||
timeSyncTask := TimeSyncTask{TimeTickMsg: timeSyncMsg}
|
||||
timeSyncTask := TimeTickMsg{TimeTickMsg: timeSyncMsg}
|
||||
if err != nil {
|
||||
return nil, commonPb.Status{ErrorCode: commonPb.ErrorCode_UNEXPECTED_ERROR}
|
||||
}
|
||||
|
||||
@ -85,17 +85,17 @@ func getTsMsg(msgType MsgType, reqId int64, hashValue int32) *TsMsg {
|
||||
SearchResult: searchResult,
|
||||
}
|
||||
tsMsg = searchResultMsg
|
||||
case kTimeSync:
|
||||
case KTimeSync:
|
||||
timeSyncResult := internalPb.TimeTickMsg{
|
||||
PeerId: reqId,
|
||||
Timestamp: 1,
|
||||
}
|
||||
timeSyncMsg := TimeSyncTask{
|
||||
timeSyncMsg := TimeTickMsg{
|
||||
HashValues: []int32{hashValue},
|
||||
TimeTickMsg: timeSyncResult,
|
||||
}
|
||||
tsMsg = timeSyncMsg
|
||||
case kTimeTick:
|
||||
case KTimeTick:
|
||||
insertRequest := internalPb.InsertRequest{
|
||||
ReqType: internalPb.ReqType_kTimeTick,
|
||||
ReqId: reqId,
|
||||
@ -236,9 +236,9 @@ func TestStream_BroadCast(t *testing.T) {
|
||||
consumerSubName := "subInsert"
|
||||
|
||||
msgPack := MsgPack{}
|
||||
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(kTimeTick, 0, 0))
|
||||
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(kTimeTick, 3, 3))
|
||||
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(KTimeTick, 0, 0))
|
||||
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(KTimeTick, 3, 3))
|
||||
|
||||
//run stream
|
||||
initStream(pulsarAddress, producerChannels, consumerChannels, consumerSubName, &msgPack, kInsert, kInsert)
|
||||
initStream(pulsarAddress, producerChannels, consumerChannels, consumerSubName, &msgPack, KInsert, KInsert)
|
||||
}
|
||||
|
||||
@ -184,28 +184,28 @@ func (srt SearchResultTask) HashKeys() []int32 {
|
||||
}
|
||||
|
||||
/////////////////////////////////////////TimeSync//////////////////////////////////////////
|
||||
type TimeSyncTask struct {
|
||||
type TimeTickMsg struct {
|
||||
HashValues []int32
|
||||
internalPb.TimeTickMsg
|
||||
}
|
||||
|
||||
func (tst TimeSyncTask) SetTs(ts Timestamp) {
|
||||
func (tst TimeTickMsg) SetTs(ts Timestamp) {
|
||||
tst.Timestamp = uint64(ts)
|
||||
}
|
||||
|
||||
func (tst TimeSyncTask) BeginTs() Timestamp {
|
||||
func (tst TimeTickMsg) BeginTs() Timestamp {
|
||||
return Timestamp(tst.Timestamp)
|
||||
}
|
||||
|
||||
func (tst TimeSyncTask) EndTs() Timestamp {
|
||||
func (tst TimeTickMsg) EndTs() Timestamp {
|
||||
return Timestamp(tst.Timestamp)
|
||||
}
|
||||
|
||||
func (tst TimeSyncTask) Type() MsgType {
|
||||
func (tst TimeTickMsg) Type() MsgType {
|
||||
return KTimeSync
|
||||
}
|
||||
|
||||
func (tst TimeSyncTask) HashKeys() []int32 {
|
||||
func (tst TimeTickMsg) HashKeys() []int32 {
|
||||
return tst.HashValues
|
||||
}
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user