diff --git a/docs/developer_guides/developer_guides.md b/docs/developer_guides/developer_guides.md index b28d507181..c2b21bbf07 100644 --- a/docs/developer_guides/developer_guides.md +++ b/docs/developer_guides/developer_guides.md @@ -1169,14 +1169,15 @@ type softTimeTickBarrier struct { minTtInterval Timestamp lastTt Timestamp outTt chan Timestamp - ttStream *MsgStream + ttStream MsgStream ctx context.Context } func (ttBarrier *softTimeTickBarrier) GetTimeTick() (Timestamp,error) func (ttBarrier *softTimeTickBarrier) Start() error +func (ttBarrier *softTimeTickBarrier) Close() -func newSoftTimeTickBarrier(ctx context.Context, ttStream *MsgStream, peerIds []UniqueId, minTtInterval Timestamp) *softTimeTickBarrier +func NewSoftTimeTickBarrier(ctx context.Context, ttStream *MsgStream, peerIds []UniqueID, minTtInterval Timestamp) *softTimeTickBarrier ``` @@ -1189,14 +1190,15 @@ func newSoftTimeTickBarrier(ctx context.Context, ttStream *MsgStream, peerIds [] type hardTimeTickBarrier struct { peer2Tt map[UniqueId]List outTt chan Timestamp - ttStream *MsgStream + ttStream MsgStream ctx context.Context } func (ttBarrier *hardTimeTickBarrier) GetTimeTick() (Timestamp,error) func (ttBarrier *hardTimeTickBarrier) Start() error +func (ttBarrier *hardTimeTickBarrier) Close() -func newHardTimeTickBarrier(ctx context.Context, ttStream *MsgStream, peerIds []UniqueId) *softTimeTickBarrier +func NewHardTimeTickBarrier(ctx context.Context, ttStream *MsgStream, peerIds []UniqueID) *hardTimeTickBarrier ``` @@ -1210,6 +1212,7 @@ func newHardTimeTickBarrier(ctx context.Context, ttStream *MsgStream, peerIds [] type TimeTickBarrier interface { GetTimeTick() (Timestamp,error) Start() error + Close() } type timeSyncMsgProducer struct { diff --git a/internal/master/timesync/timesync.go b/internal/master/timesync/timesync.go index ce11cd1955..dbcabf5e5f 100644 --- a/internal/master/timesync/timesync.go +++ b/internal/master/timesync/timesync.go @@ -3,213 +3,261 @@ package timesync import ( "context" "log" - "sort" - "strconv" - "sync" - "time" + "math" - "github.com/zilliztech/milvus-distributed/internal/conf" - - "github.com/apache/pulsar-client-go/pulsar" - "github.com/golang/protobuf/proto" - "github.com/zilliztech/milvus-distributed/internal/proto/internalpb" + "github.com/zilliztech/milvus-distributed/internal/errors" + ms "github.com/zilliztech/milvus-distributed/internal/msgstream" ) -const stopReadFlagId int64 = -1 - -type TimeTickReader struct { - pulsarClient pulsar.Client - - timeTickConsumer pulsar.Consumer - readerProducer []pulsar.Producer - - interval int64 - proxyIdList []UniqueID - - timeTickPeerProxy map[UniqueID]Timestamp - ctx context.Context -} - -func (r *TimeTickReader) Start() { - go r.readTimeTick() - go r.timeSync() - -} - -func (r *TimeTickReader) Close() { - if r.timeTickConsumer != nil { - r.timeTickConsumer.Close() +type ( + softTimeTickBarrier struct { + peer2LastTt map[UniqueID]Timestamp + minTtInterval Timestamp + lastTt Timestamp + outTt chan Timestamp + ttStream ms.MsgStream + ctx context.Context + closeCh chan struct{} // close goroutinue in Start() + closed bool } - for i := 0; i < len(r.readerProducer); i++ { - if r.readerProducer[i] != nil { - r.readerProducer[i].Close() - } + hardTimeTickBarrier struct { + peer2Tt map[UniqueID]Timestamp + outTt chan Timestamp + ttStream ms.MsgStream + ctx context.Context + closeCh chan struct{} // close goroutinue in Start() + closed bool } - if r.pulsarClient != nil { - r.pulsarClient.Close() - } -} +) -func (r *TimeTickReader) timeSync() { - 
ctx := r.ctx +func (ttBarrier *softTimeTickBarrier) GetTimeTick() (Timestamp, error) { + isEmpty := true for { + + if ttBarrier.closed { + return 0, errors.Errorf("[GetTimeTick] closed.") + } + select { - case <-ctx.Done(): - return + case ts := <-ttBarrier.outTt: + isEmpty = false + ttBarrier.lastTt = ts + default: - time.Sleep(time.Millisecond * time.Duration(r.interval)) - var minTimeStamp Timestamp - for _, minTimeStamp = range r.timeTickPeerProxy { - break - } - for _, ts := range r.timeTickPeerProxy { - if ts < minTimeStamp { - minTimeStamp = ts - } - } - //send timestamp flag to reader channel - msg := internalpb.TimeTickMsg{ - Timestamp: minTimeStamp, - MsgType: internalpb.MsgType_kTimeTick, - } - payload, err := proto.Marshal(&msg) - if err != nil { - //TODO log error - log.Printf("Marshal InsertOrDeleteMsg flag error %v", err) - } else { - wg := sync.WaitGroup{} - wg.Add(len(r.readerProducer)) - for index := range r.readerProducer { - go r.sendEOFMsg(ctx, &pulsar.ProducerMessage{Payload: payload}, index, &wg) - } - wg.Wait() + if isEmpty { + continue } + return ttBarrier.lastTt, nil } } } -func (r *TimeTickReader) readTimeTick() { +func (ttBarrier *softTimeTickBarrier) Start() error { + ttBarrier.closeCh = make(chan struct{}) + go func() { + for { + select { + + case <-ttBarrier.closeCh: + log.Printf("[TtBarrierStart] closed\n") + return + + case <-ttBarrier.ctx.Done(): + log.Printf("[TtBarrierStart] %s\n", ttBarrier.ctx.Err()) + ttBarrier.closed = true + return + + case ttmsgs := <-ttBarrier.ttStream.Chan(): + if len(ttmsgs.Msgs) > 0 { + for _, timetickmsg := range ttmsgs.Msgs { + ttmsg := (*timetickmsg).(*ms.TimeTickMsg) + oldT, ok := ttBarrier.peer2LastTt[ttmsg.PeerId] + log.Printf("[softTimeTickBarrier] peer(%d)=%d\n", ttmsg.PeerId, ttmsg.Timestamp) + + if !ok { + log.Printf("[softTimeTickBarrier] Warning: peerId %d not exist\n", ttmsg.PeerId) + continue + } + + if ttmsg.Timestamp > oldT { + ttBarrier.peer2LastTt[ttmsg.PeerId] = ttmsg.Timestamp + + // get a legal Timestamp + ts := ttBarrier.minTimestamp() + + if ttBarrier.lastTt != 0 && ttBarrier.minTtInterval > ts-ttBarrier.lastTt { + continue + } + + ttBarrier.outTt <- ts + } + } + } + + default: + } + } + }() + return nil +} + +func NewSoftTimeTickBarrier(ctx context.Context, + ttStream *ms.MsgStream, + peerIds []UniqueID, + minTtInterval Timestamp) *softTimeTickBarrier { + + if len(peerIds) <= 0 { + log.Printf("[NewSoftTimeTickBarrier] Error: peerIds is emtpy!\n") + return nil + } + + sttbarrier := softTimeTickBarrier{} + sttbarrier.minTtInterval = minTtInterval + sttbarrier.ttStream = *ttStream + sttbarrier.outTt = make(chan Timestamp, 1024) + sttbarrier.ctx = ctx + sttbarrier.closed = false + + sttbarrier.peer2LastTt = make(map[UniqueID]Timestamp) + for _, id := range peerIds { + sttbarrier.peer2LastTt[id] = Timestamp(0) + } + if len(peerIds) != len(sttbarrier.peer2LastTt) { + log.Printf("[NewSoftTimeTickBarrier] Warning: there are duplicate peerIds!\n") + } + + return &sttbarrier +} + +func (ttBarrier *softTimeTickBarrier) Close() { + + if ttBarrier.closeCh != nil { + ttBarrier.closeCh <- struct{}{} + } + + ttBarrier.closed = true +} + +func (ttBarrier *softTimeTickBarrier) minTimestamp() Timestamp { + tempMin := Timestamp(math.MaxUint64) + for _, tt := range ttBarrier.peer2LastTt { + if tt < tempMin { + tempMin = tt + } + } + return tempMin +} + +func (ttBarrier *hardTimeTickBarrier) GetTimeTick() (Timestamp, error) { for { + + if ttBarrier.closed { + return 0, errors.Errorf("[GetTimeTick] closed.") + } + select { - 
case <-r.ctx.Done():
-			return
-		case cm, ok := <-r.timeTickConsumer.Chan():
-			if ok == false {
-				log.Printf("timesync consumer closed")
+		case ts := <-ttBarrier.outTt:
+			return ts, nil
+		default:
+		}
+	}
+}
+
+func (ttBarrier *hardTimeTickBarrier) Start() error {
+	ttBarrier.closeCh = make(chan struct{})
+
+	go func() {
+		// Last timestamp synchronized
+		state := Timestamp(0)
+		for {
+			select {
+
+			case <-ttBarrier.closeCh:
+				log.Printf("[TtBarrierStart] closed\n")
+				return
+
+			case <-ttBarrier.ctx.Done():
+				log.Printf("[TtBarrierStart] %s\n", ttBarrier.ctx.Err())
+				ttBarrier.closed = true
+				return
+
+			case ttmsgs := <-ttBarrier.ttStream.Chan():
+				if len(ttmsgs.Msgs) > 0 {
+					for _, timetickmsg := range ttmsgs.Msgs {
+
+						// Assume ttmsg.Timestamp from the stream is always larger than the previous one,
+						// i.e. ttmsg.Timestamp > oldT
+						ttmsg := (*timetickmsg).(*ms.TimeTickMsg)
+						log.Printf("[hardTimeTickBarrier] peer(%d)=%d\n", ttmsg.PeerId, ttmsg.Timestamp)
+
+						oldT, ok := ttBarrier.peer2Tt[ttmsg.PeerId]
+						if !ok {
+							log.Printf("[hardTimeTickBarrier] Warning: peerId %d not exist\n", ttmsg.PeerId)
+							continue
+						}
+
+						if oldT > state {
+							log.Printf("[hardTimeTickBarrier] Warning: peer(%d) timestamp(%d) ahead\n",
+								ttmsg.PeerId, ttmsg.Timestamp)
+						}
+
+						ttBarrier.peer2Tt[ttmsg.PeerId] = ttmsg.Timestamp
+
+						newState := ttBarrier.minTimestamp()
+						if newState > state {
+							ttBarrier.outTt <- newState
+							state = newState
+						}
+					}
+				}
+			default:
+			}
+		}
+	}()
+	return nil
+}
-
-			msg := cm.Message
-			var tsm internalpb.TimeTickMsg
-			if err := proto.Unmarshal(msg.Payload(), &tsm); err != nil {
-				log.Printf("UnMarshal timetick flag error %v", err)
-			}
-
-			r.timeTickPeerProxy[tsm.PeerId] = tsm.Timestamp
-			r.timeTickConsumer.AckID(msg.ID())
+func (ttBarrier *hardTimeTickBarrier) minTimestamp() Timestamp {
+	tempMin := Timestamp(math.MaxUint64)
+	for _, tt := range ttBarrier.peer2Tt {
+		if tt < tempMin {
+			tempMin = tt
 		}
 	}
+	return tempMin
 }
-func (r *TimeTickReader) sendEOFMsg(ctx context.Context, msg *pulsar.ProducerMessage, index int, wg *sync.WaitGroup) {
-	if _, err := r.readerProducer[index].Send(ctx, msg); err != nil {
-		log.Printf("Send timesync flag error %v", err)
+func NewHardTimeTickBarrier(ctx context.Context,
+	ttStream *ms.MsgStream,
+	peerIds []UniqueID) *hardTimeTickBarrier {
+
+	if len(peerIds) <= 0 {
+		log.Printf("[NewHardTimeTickBarrier] Error: peerIds is empty!")
+		return nil
 	}
-	wg.Done()
+
+	sttbarrier := hardTimeTickBarrier{}
+	sttbarrier.ttStream = *ttStream
+	sttbarrier.outTt = make(chan Timestamp, 1024)
+	sttbarrier.ctx = ctx
+	sttbarrier.closed = false
+
+	sttbarrier.peer2Tt = make(map[UniqueID]Timestamp)
+	for _, id := range peerIds {
+		sttbarrier.peer2Tt[id] = Timestamp(0)
+	}
+	if len(peerIds) != len(sttbarrier.peer2Tt) {
+		log.Printf("[NewHardTimeTickBarrier] Warning: there are duplicate peerIds!")
+	}
+
+	return &sttbarrier
 }
-func TimeTickService() {
-	timeTickTopic := "timeTick"
-	timeTickSubName := "master"
-	readTopics := make([]string, 0)
-	for i := conf.Config.Reader.TopicStart; i < conf.Config.Reader.TopicEnd; i++ {
-		str := "InsertOrDelete-"
-		str = str + strconv.Itoa(i)
-		readTopics = append(readTopics, str)
+func (ttBarrier *hardTimeTickBarrier) Close() {
+	if ttBarrier.closeCh != nil {
+		ttBarrier.closeCh <- struct{}{}
 	}
-
-	proxyIdList := conf.Config.Master.ProxyIdList
-	timeTickReader := newTimeTickReader(context.Background(), timeTickTopic, timeTickSubName, readTopics, proxyIdList)
-	timeTickReader.Start()
-}
-
-func newTimeTickReader(
-	ctx context.Context,
-	
timeTickTopic string, - timeTickSubName string, - readTopics []string, - proxyIdList []UniqueID, -) *TimeTickReader { - pulsarAddr := "pulsar://" - pulsarAddr += conf.Config.Pulsar.Address - pulsarAddr += ":" - pulsarAddr += strconv.FormatInt(int64(conf.Config.Pulsar.Port), 10) - interval := int64(conf.Config.Timesync.Interval) - - //check if proxyId has duplication - if len(proxyIdList) == 0 { - log.Printf("proxy id list is empty") - } - if len(proxyIdList) > 1 { - sort.Slice(proxyIdList, func(i int, j int) bool { return proxyIdList[i] < proxyIdList[j] }) - } - for i := 1; i < len(proxyIdList); i++ { - if proxyIdList[i] == proxyIdList[i-1] { - log.Printf("there are two proxies have the same id = %d", proxyIdList[i]) - } - } - r := TimeTickReader{} - r.interval = interval - r.proxyIdList = proxyIdList - readerQueueSize := conf.Config.Reader.ReaderQueueSize - - //check if read topic is empty - if len(readTopics) == 0 { - log.Printf("read topic is empyt") - } - //set default value - if readerQueueSize == 0 { - readerQueueSize = 1024 - } - - r.timeTickPeerProxy = make(map[UniqueID]Timestamp) - r.ctx = ctx - - var client pulsar.Client - var err error - if conf.Config.Pulsar.Authentication { - client, err = pulsar.NewClient(pulsar.ClientOptions{ - URL: pulsarAddr, - Authentication: pulsar.NewAuthenticationToken(conf.Config.Pulsar.Token), - }) - } else { - client, err = pulsar.NewClient(pulsar.ClientOptions{URL: pulsarAddr}) - } - - if err != nil { - log.Printf("connect pulsar failed, %v", err) - } - r.pulsarClient = client - - timeSyncChan := make(chan pulsar.ConsumerMessage, len(r.proxyIdList)) - if r.timeTickConsumer, err = r.pulsarClient.Subscribe(pulsar.ConsumerOptions{ - Topic: timeTickTopic, - SubscriptionName: timeTickSubName, - Type: pulsar.KeyShared, - SubscriptionInitialPosition: pulsar.SubscriptionPositionEarliest, - MessageChannel: timeSyncChan, - }); err != nil { - log.Printf("failed to subscribe topic %s, error = %v", timeTickTopic, err) - } - - r.readerProducer = make([]pulsar.Producer, 0, len(readTopics)) - for i := 0; i < len(readTopics); i++ { - rp, err := r.pulsarClient.CreateProducer(pulsar.ProducerOptions{Topic: readTopics[i]}) - if err != nil { - log.Printf("failed to create reader producer %s, error = %v", readTopics[i], err) - } - r.readerProducer = append(r.readerProducer, rp) - } - - return &r + ttBarrier.closed = true + return } diff --git a/internal/master/timesync/timesync_test.go b/internal/master/timesync/timesync_test.go new file mode 100644 index 0000000000..06ad10162a --- /dev/null +++ b/internal/master/timesync/timesync_test.go @@ -0,0 +1,426 @@ +package timesync + +import ( + "context" + "log" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + ms "github.com/zilliztech/milvus-distributed/internal/msgstream" + internalPb "github.com/zilliztech/milvus-distributed/internal/proto/internalpb" +) + +func getTtMsg(msgType internalPb.MsgType, peerId UniqueID, timeStamp uint64) *ms.TsMsg { + var tsMsg ms.TsMsg + baseMsg := ms.BaseMsg{ + HashValues: []int32{int32(peerId)}, + } + timeTickResult := internalPb.TimeTickMsg{ + MsgType: internalPb.MsgType_kTimeTick, + PeerId: peerId, + Timestamp: timeStamp, + } + timeTickMsg := &ms.TimeTickMsg{ + BaseMsg: baseMsg, + TimeTickMsg: timeTickResult, + } + tsMsg = timeTickMsg + return &tsMsg +} + +func initPulsarStream(pulsarAddress string, + producerChannels []string, + consumerChannels []string, + consumerSubName string) (*ms.MsgStream, *ms.MsgStream) { + + // set 
input stream + inputStream := ms.NewPulsarMsgStream(context.Background(), 100) + inputStream.SetPulsarCient(pulsarAddress) + inputStream.CreatePulsarProducers(producerChannels) + var input ms.MsgStream = inputStream + + // set output stream + outputStream := ms.NewPulsarMsgStream(context.Background(), 100) + outputStream.SetPulsarCient(pulsarAddress) + unmarshalDispatcher := ms.NewUnmarshalDispatcher() + outputStream.CreatePulsarConsumers(consumerChannels, consumerSubName, unmarshalDispatcher, 100) + outputStream.Start() + var output ms.MsgStream = outputStream + + return &input, &output +} + +func getMsgPack(ttmsgs [][2]int) *ms.MsgPack { + msgPack := ms.MsgPack{} + for _, vi := range ttmsgs { + msgPack.Msgs = append(msgPack.Msgs, getTtMsg(internalPb.MsgType_kTimeTick, UniqueID(vi[0]), Timestamp(vi[1]))) + } + return &msgPack +} + +func getEmptyMsgPack() *ms.MsgPack { + msgPack := ms.MsgPack{} + return &msgPack +} + +func producer(channels []string, ttmsgs [][2]int) (*ms.MsgStream, *ms.MsgStream) { + pulsarAddress := "pulsar://localhost:6650" + consumerSubName := "subTimetick" + producerChannels := channels + consumerChannels := channels + + inputStream, outputStream := initPulsarStream(pulsarAddress, producerChannels, consumerChannels, consumerSubName) + + msgPackAddr := getMsgPack(ttmsgs) + (*inputStream).Produce(msgPackAddr) + return inputStream, outputStream +} + +func TestTt_NewSoftTtBarrier(t *testing.T) { + channels := []string{"NewSoftTtBarrier"} + ttmsgs := [][2]int{ + {1, 10}, + {2, 20}, + {3, 30}, + {4, 40}, + {1, 30}, + {2, 30}, + } + + inStream, ttStream := producer(channels, ttmsgs) + defer func() { + (*inStream).Close() + (*ttStream).Close() + }() + + minTtInterval := Timestamp(10) + + validPeerIds := []UniqueID{1, 2, 3} + + sttbarrier := NewSoftTimeTickBarrier(context.TODO(), ttStream, validPeerIds, minTtInterval) + assert.NotNil(t, sttbarrier) + sttbarrier.Close() + + validPeerIds2 := []UniqueID{1, 1, 1} + sttbarrier = NewSoftTimeTickBarrier(context.TODO(), ttStream, validPeerIds2, minTtInterval) + assert.NotNil(t, sttbarrier) + sttbarrier.Close() + + // invalid peerIds + invalidPeerIds1 := make([]UniqueID, 0, 3) + sttbarrier = NewSoftTimeTickBarrier(context.TODO(), ttStream, invalidPeerIds1, minTtInterval) + assert.Nil(t, sttbarrier) + + invalidPeerIds2 := []UniqueID{} + sttbarrier = NewSoftTimeTickBarrier(context.TODO(), ttStream, invalidPeerIds2, minTtInterval) + assert.Nil(t, sttbarrier) +} + +func TestTt_NewHardTtBarrier(t *testing.T) { + channels := []string{"NewHardTtBarrier"} + ttmsgs := [][2]int{ + {1, 10}, + {2, 20}, + {3, 30}, + {4, 40}, + {1, 30}, + {2, 30}, + } + inStream, ttStream := producer(channels, ttmsgs) + defer func() { + (*inStream).Close() + (*ttStream).Close() + }() + + validPeerIds := []UniqueID{1, 2, 3} + + sttbarrier := NewHardTimeTickBarrier(context.TODO(), ttStream, validPeerIds) + assert.NotNil(t, sttbarrier) + sttbarrier.Close() + + validPeerIds2 := []UniqueID{1, 1, 1} + sttbarrier = NewHardTimeTickBarrier(context.TODO(), ttStream, validPeerIds2) + assert.NotNil(t, sttbarrier) + sttbarrier.Close() + + // invalid peerIds + invalidPeerIds1 := make([]UniqueID, 0, 3) + sttbarrier = NewHardTimeTickBarrier(context.TODO(), ttStream, invalidPeerIds1) + assert.Nil(t, sttbarrier) + + invalidPeerIds2 := []UniqueID{} + sttbarrier = NewHardTimeTickBarrier(context.TODO(), ttStream, invalidPeerIds2) + assert.Nil(t, sttbarrier) +} + +func TestTt_SoftTtBarrierStart(t *testing.T) { + channels := []string{"SoftTtBarrierStart"} + + ttmsgs := [][2]int{ + {1, 
10}, + {2, 20}, + {3, 30}, + {4, 40}, + {1, 30}, + {2, 30}, + } + inStream, ttStream := producer(channels, ttmsgs) + defer func() { + (*inStream).Close() + (*ttStream).Close() + }() + + minTtInterval := Timestamp(10) + peerIds := []UniqueID{1, 2, 3} + sttbarrier := NewSoftTimeTickBarrier(context.TODO(), ttStream, peerIds, minTtInterval) + require.NotNil(t, sttbarrier) + + sttbarrier.Start() + defer sttbarrier.Close() + + // Make sure all msgs in outputStream is consumed + time.Sleep(100 * time.Millisecond) + + ts, err := sttbarrier.GetTimeTick() + assert.Nil(t, err) + assert.Equal(t, Timestamp(30), ts) +} + +func TestTt_SoftTtBarrierGetTimeTickClose(t *testing.T) { + channels := []string{"SoftTtBarrierGetTimeTickClose"} + ttmsgs := [][2]int{ + {1, 10}, + {2, 20}, + {3, 30}, + {4, 40}, + {1, 30}, + {2, 30}, + } + inStream, ttStream := producer(channels, ttmsgs) + defer func() { + (*inStream).Close() + (*ttStream).Close() + }() + + minTtInterval := Timestamp(10) + validPeerIds := []UniqueID{1, 2, 3} + + sttbarrier := NewSoftTimeTickBarrier(context.TODO(), ttStream, validPeerIds, minTtInterval) + require.NotNil(t, sttbarrier) + + sttbarrier.Start() + + var wg sync.WaitGroup + wg.Add(1) + + go func() { + defer wg.Done() + sttbarrier.Close() + }() + wg.Wait() + + ts, err := sttbarrier.GetTimeTick() + assert.NotNil(t, err) + assert.Equal(t, Timestamp(0), ts) + + // Receive empty msgPacks + channels01 := []string{"GetTimeTick01"} + ttmsgs01 := [][2]int{} + inStream01, ttStream01 := producer(channels01, ttmsgs01) + defer func() { + (*inStream01).Close() + (*ttStream01).Close() + }() + + minTtInterval = Timestamp(10) + validPeerIds = []UniqueID{1, 2, 3} + + sttbarrier01 := NewSoftTimeTickBarrier(context.TODO(), ttStream01, validPeerIds, minTtInterval) + require.NotNil(t, sttbarrier01) + sttbarrier01.Start() + + var wg1 sync.WaitGroup + wg1.Add(1) + + go func() { + defer wg1.Done() + sttbarrier01.Close() + }() + + ts, err = sttbarrier01.GetTimeTick() + assert.NotNil(t, err) + assert.Equal(t, Timestamp(0), ts) +} + +func TestTt_SoftTtBarrierGetTimeTickCancel(t *testing.T) { + channels := []string{"SoftTtBarrierGetTimeTickCancel"} + ttmsgs := [][2]int{ + {1, 10}, + {2, 20}, + {3, 30}, + {4, 40}, + {1, 30}, + {2, 30}, + } + inStream, ttStream := producer(channels, ttmsgs) + defer func() { + (*inStream).Close() + (*ttStream).Close() + }() + + minTtInterval := Timestamp(10) + validPeerIds := []UniqueID{1, 2, 3} + + ctx, cancel := context.WithCancel(context.Background()) + sttbarrier := NewSoftTimeTickBarrier(ctx, ttStream, validPeerIds, minTtInterval) + require.NotNil(t, sttbarrier) + + sttbarrier.Start() + + go func() { + time.Sleep(10 * time.Millisecond) + cancel() + time.Sleep(10 * time.Millisecond) + sttbarrier.Close() + }() + + time.Sleep(20 * time.Millisecond) + + ts, err := sttbarrier.GetTimeTick() + assert.NotNil(t, err) + assert.Equal(t, Timestamp(0), ts) + log.Println(err) +} + +func TestTt_HardTtBarrierStart(t *testing.T) { + channels := []string{"HardTtBarrierStart"} + + ttmsgs := [][2]int{ + {1, 10}, + {2, 10}, + {3, 10}, + } + + inStream, ttStream := producer(channels, ttmsgs) + defer func() { + (*inStream).Close() + (*ttStream).Close() + }() + + peerIds := []UniqueID{1, 2, 3} + sttbarrier := NewHardTimeTickBarrier(context.TODO(), ttStream, peerIds) + require.NotNil(t, sttbarrier) + + sttbarrier.Start() + defer sttbarrier.Close() + + // Make sure all msgs in outputStream is consumed + time.Sleep(100 * time.Millisecond) + + ts, err := sttbarrier.GetTimeTick() + assert.Nil(t, err) + 
assert.Equal(t, Timestamp(10), ts) +} + +func TestTt_HardTtBarrierGetTimeTick(t *testing.T) { + + channels := []string{"HardTtBarrierGetTimeTick"} + + ttmsgs := [][2]int{ + {1, 10}, + {1, 20}, + {1, 30}, + {2, 10}, + {2, 20}, + {3, 10}, + {3, 20}, + } + + inStream, ttStream := producer(channels, ttmsgs) + defer func() { + (*inStream).Close() + (*ttStream).Close() + }() + + peerIds := []UniqueID{1, 2, 3} + sttbarrier := NewHardTimeTickBarrier(context.TODO(), ttStream, peerIds) + require.NotNil(t, sttbarrier) + + sttbarrier.Start() + defer sttbarrier.Close() + + // Make sure all msgs in outputStream is consumed + time.Sleep(100 * time.Millisecond) + + ts, err := sttbarrier.GetTimeTick() + assert.Nil(t, err) + assert.Equal(t, Timestamp(10), ts) + + ts, err = sttbarrier.GetTimeTick() + assert.Nil(t, err) + assert.Equal(t, Timestamp(20), ts) + + // ---------------------stuck-------------------------- + channelsStuck := []string{"HardTtBarrierGetTimeTickStuck"} + + ttmsgsStuck := [][2]int{ + {1, 10}, + {2, 10}, + } + + inStreamStuck, ttStreamStuck := producer(channelsStuck, ttmsgsStuck) + defer func() { + (*inStreamStuck).Close() + (*ttStreamStuck).Close() + }() + + peerIdsStuck := []UniqueID{1, 2, 3} + sttbarrierStuck := NewHardTimeTickBarrier(context.TODO(), ttStreamStuck, peerIdsStuck) + require.NotNil(t, sttbarrierStuck) + + sttbarrierStuck.Start() + go func() { + time.Sleep(1 * time.Second) + sttbarrierStuck.Close() + }() + + time.Sleep(100 * time.Millisecond) + + // This will stuck + ts, err = sttbarrierStuck.GetTimeTick() + + // ---------------------context cancel------------------------ + channelsCancel := []string{"HardTtBarrierGetTimeTickCancel"} + + ttmsgsCancel := [][2]int{ + {1, 10}, + {2, 10}, + } + + inStreamCancel, ttStreamCancel := producer(channelsCancel, ttmsgsCancel) + defer func() { + (*inStreamCancel).Close() + (*ttStreamCancel).Close() + }() + + peerIdsCancel := []UniqueID{1, 2, 3} + + ctx, cancel := context.WithCancel(context.Background()) + sttbarrierCancel := NewHardTimeTickBarrier(ctx, ttStreamCancel, peerIdsCancel) + require.NotNil(t, sttbarrierCancel) + + sttbarrierCancel.Start() + go func() { + time.Sleep(1 * time.Second) + cancel() + }() + + time.Sleep(100 * time.Millisecond) + + // This will stuck + ts, err = sttbarrierCancel.GetTimeTick() + +} diff --git a/internal/master/timesync/timetick.go b/internal/master/timesync/timetick.go index 68ec760ca5..715f78f4e9 100644 --- a/internal/master/timesync/timetick.go +++ b/internal/master/timesync/timetick.go @@ -2,10 +2,13 @@ package timesync import "github.com/zilliztech/milvus-distributed/internal/util/typeutil" -type UniqueID = typeutil.UniqueID -type Timestamp = typeutil.Timestamp +type ( + UniqueID = typeutil.UniqueID + Timestamp = typeutil.Timestamp +) type TimeTickBarrier interface { - GetTimeTick() (Timestamp,error) + GetTimeTick() (Timestamp, error) Start() error + Close() } diff --git a/internal/msgstream/msgstream.go b/internal/msgstream/msgstream.go index c964a709b3..376e6ee55d 100644 --- a/internal/msgstream/msgstream.go +++ b/internal/msgstream/msgstream.go @@ -42,18 +42,18 @@ type PulsarMsgStream struct { repackFunc RepackFunc unmarshal *UnmarshalDispatcher receiveBuf chan *MsgPack - receiveBufSize int64 wait *sync.WaitGroup streamCancel func() } func NewPulsarMsgStream(ctx context.Context, receiveBufSize int64) *PulsarMsgStream { streamCtx, streamCancel := context.WithCancel(ctx) - return &PulsarMsgStream{ + stream := &PulsarMsgStream{ ctx: streamCtx, streamCancel: streamCancel, - receiveBufSize: 
receiveBufSize, } + stream.receiveBuf = make(chan *MsgPack, receiveBufSize) + return stream } func (ms *PulsarMsgStream) SetPulsarCient(address string) { @@ -215,7 +215,6 @@ func (ms *PulsarMsgStream) Consume() *MsgPack { func (ms *PulsarMsgStream) bufMsgPackToChannel() { defer ms.wait.Done() - ms.receiveBuf = make(chan *MsgPack, ms.receiveBufSize) for { select { case <-ms.ctx.Done(): @@ -271,8 +270,8 @@ func NewPulsarTtMsgStream(ctx context.Context, receiveBufSize int64) *PulsarTtMs pulsarMsgStream := PulsarMsgStream{ ctx: streamCtx, streamCancel: streamCancel, - receiveBufSize: receiveBufSize, } + pulsarMsgStream.receiveBuf = make(chan *MsgPack, receiveBufSize) return &PulsarTtMsgStream{ PulsarMsgStream: pulsarMsgStream, } @@ -288,7 +287,6 @@ func (ms *PulsarTtMsgStream) Start() { func (ms *PulsarTtMsgStream) bufMsgPackToChannel() { defer ms.wait.Done() - ms.receiveBuf = make(chan *MsgPack, ms.receiveBufSize) ms.unsolvedBuf = make([]*TsMsg, 0) ms.inputBuf = make([]*TsMsg, 0) for { diff --git a/internal/proxy/grpc_service.go b/internal/proxy/grpc_service.go index 08d818d0db..877136a72b 100644 --- a/internal/proxy/grpc_service.go +++ b/internal/proxy/grpc_service.go @@ -35,18 +35,21 @@ func (p *Proxy) Insert(ctx context.Context, in *servicepb.RowBatch) (*servicepb. defer it.cancel() - p.taskSch.DmQueue.Enqueue(it) - select { - case <-ctx.Done(): - log.Print("insert timeout!") - return &servicepb.IntegerRangeResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, - Reason: "insert timeout!", - }, - }, errors.New("insert timeout!") - case result := <-it.resultChan: - return result, nil + var t task = it + p.taskSch.DmQueue.Enqueue(&t) + for { + select { + case <-ctx.Done(): + log.Print("insert timeout!") + return &servicepb.IntegerRangeResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, + Reason: "insert timeout!", + }, + }, errors.New("insert timeout!") + case result := <-it.resultChan: + return result, nil + } } } @@ -66,16 +69,19 @@ func (p *Proxy) CreateCollection(ctx context.Context, req *schemapb.CollectionSc cct.ctx, cct.cancel = context.WithCancel(ctx) defer cct.cancel() - p.taskSch.DdQueue.Enqueue(cct) - select { - case <-ctx.Done(): - log.Print("create collection timeout!") - return &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, - Reason: "create collection timeout!", - }, errors.New("create collection timeout!") - case result := <-cct.resultChan: - return result, nil + var t task = cct + p.taskSch.DdQueue.Enqueue(&t) + for { + select { + case <-ctx.Done(): + log.Print("create collection timeout!") + return &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, + Reason: "create collection timeout!", + }, errors.New("create collection timeout!") + case result := <-cct.resultChan: + return result, nil + } } } @@ -96,18 +102,21 @@ func (p *Proxy) Search(ctx context.Context, req *servicepb.Query) (*servicepb.Qu qt.SearchRequest.Query.Value = queryBytes defer qt.cancel() - p.taskSch.DqQueue.Enqueue(qt) - select { - case <-ctx.Done(): - log.Print("query timeout!") - return &servicepb.QueryResult{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, - Reason: "query timeout!", - }, - }, errors.New("query timeout!") - case result := <-qt.resultChan: - return result, nil + var t task = qt + p.taskSch.DqQueue.Enqueue(&t) + for { + select { + case <-ctx.Done(): + log.Print("query timeout!") + return &servicepb.QueryResult{ + Status: &commonpb.Status{ + ErrorCode: 
commonpb.ErrorCode_UNEXPECTED_ERROR, + Reason: "query timeout!", + }, + }, errors.New("query timeout!") + case result := <-qt.resultChan: + return result, nil + } } } @@ -125,16 +134,19 @@ func (p *Proxy) DropCollection(ctx context.Context, req *servicepb.CollectionNam dct.ctx, dct.cancel = context.WithCancel(ctx) defer dct.cancel() - p.taskSch.DdQueue.Enqueue(dct) - select { - case <-ctx.Done(): - log.Print("create collection timeout!") - return &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, - Reason: "create collection timeout!", - }, errors.New("create collection timeout!") - case result := <-dct.resultChan: - return result, nil + var t task = dct + p.taskSch.DdQueue.Enqueue(&t) + for { + select { + case <-ctx.Done(): + log.Print("create collection timeout!") + return &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, + Reason: "create collection timeout!", + }, errors.New("create collection timeout!") + case result := <-dct.resultChan: + return result, nil + } } } @@ -152,19 +164,22 @@ func (p *Proxy) HasCollection(ctx context.Context, req *servicepb.CollectionName hct.ctx, hct.cancel = context.WithCancel(ctx) defer hct.cancel() - p.taskSch.DqQueue.Enqueue(hct) - select { - case <-ctx.Done(): - log.Print("has collection timeout!") - return &servicepb.BoolResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, - Reason: "has collection timeout!", - }, - Value: false, - }, errors.New("has collection timeout!") - case result := <-hct.resultChan: - return result, nil + var t task = hct + p.taskSch.DqQueue.Enqueue(&t) + for { + select { + case <-ctx.Done(): + log.Print("has collection timeout!") + return &servicepb.BoolResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, + Reason: "has collection timeout!", + }, + Value: false, + }, errors.New("has collection timeout!") + case result := <-hct.resultChan: + return result, nil + } } } @@ -182,18 +197,21 @@ func (p *Proxy) DescribeCollection(ctx context.Context, req *servicepb.Collectio dct.ctx, dct.cancel = context.WithCancel(ctx) defer dct.cancel() - p.taskSch.DqQueue.Enqueue(dct) - select { - case <-ctx.Done(): - log.Print("has collection timeout!") - return &servicepb.CollectionDescription{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, - Reason: "describe collection timeout!", - }, - }, errors.New("describe collection timeout!") - case result := <-dct.resultChan: - return result, nil + var t task = dct + p.taskSch.DqQueue.Enqueue(&t) + for { + select { + case <-ctx.Done(): + log.Print("has collection timeout!") + return &servicepb.CollectionDescription{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, + Reason: "describe collection timeout!", + }, + }, errors.New("describe collection timeout!") + case result := <-dct.resultChan: + return result, nil + } } } @@ -210,18 +228,21 @@ func (p *Proxy) ShowCollections(ctx context.Context, req *commonpb.Empty) (*serv sct.ctx, sct.cancel = context.WithCancel(ctx) defer sct.cancel() - p.taskSch.DqQueue.Enqueue(sct) - select { - case <-ctx.Done(): - log.Print("show collections timeout!") - return &servicepb.StringListResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, - Reason: "show collections timeout!", - }, - }, errors.New("show collections timeout!") - case result := <-sct.resultChan: - return result, nil + var t task = sct + p.taskSch.DqQueue.Enqueue(&t) + for { + select { + case <-ctx.Done(): + 
log.Print("show collections timeout!") + return &servicepb.StringListResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, + Reason: "show collections timeout!", + }, + }, errors.New("show collections timeout!") + case result := <-sct.resultChan: + return result, nil + } } } diff --git a/internal/proxy/proxy.go b/internal/proxy/proxy.go index 9acc93a330..7ede1b8d2b 100644 --- a/internal/proxy/proxy.go +++ b/internal/proxy/proxy.go @@ -2,7 +2,6 @@ package proxy import ( "context" - "google.golang.org/grpc" "log" "math/rand" "net" @@ -15,6 +14,7 @@ import ( "github.com/zilliztech/milvus-distributed/internal/proto/masterpb" "github.com/zilliztech/milvus-distributed/internal/proto/servicepb" "github.com/zilliztech/milvus-distributed/internal/util/typeutil" + "google.golang.org/grpc" ) type UniqueID = typeutil.UniqueID @@ -157,7 +157,7 @@ func (p *Proxy) queryResultLoop() { if len(queryResultBuf[reqId]) == 4 { // TODO: use the number of query node instead t := p.taskSch.getTaskByReqId(reqId) - qt := t.(*QueryTask) + qt := (*t).(*QueryTask) qt.resultBuf <- queryResultBuf[reqId] delete(queryResultBuf, reqId) } diff --git a/internal/proxy/task_scheduler.go b/internal/proxy/task_scheduler.go index dcf1335adb..361733468c 100644 --- a/internal/proxy/task_scheduler.go +++ b/internal/proxy/task_scheduler.go @@ -11,7 +11,7 @@ import ( type BaseTaskQueue struct { unissuedTasks *list.List - activeTasks map[Timestamp]task + activeTasks map[Timestamp]*task utLock sync.Mutex atLock sync.Mutex } @@ -24,23 +24,23 @@ func (queue *BaseTaskQueue) Empty() bool { return queue.unissuedTasks.Len() <= 0 && len(queue.activeTasks) <= 0 } -func (queue *BaseTaskQueue) AddUnissuedTask(t task) { +func (queue *BaseTaskQueue) AddUnissuedTask(t *task) { queue.utLock.Lock() defer queue.utLock.Unlock() queue.unissuedTasks.PushBack(t) } -func (queue *BaseTaskQueue) FrontUnissuedTask() task { +func (queue *BaseTaskQueue) FrontUnissuedTask() *task { queue.utLock.Lock() defer queue.utLock.Unlock() if queue.unissuedTasks.Len() <= 0 { log.Fatal("sorry, but the unissued task list is empty!") return nil } - return queue.unissuedTasks.Front().Value.(task) + return queue.unissuedTasks.Front().Value.(*task) } -func (queue *BaseTaskQueue) PopUnissuedTask() task { +func (queue *BaseTaskQueue) PopUnissuedTask() *task { queue.utLock.Lock() defer queue.utLock.Unlock() if queue.unissuedTasks.Len() <= 0 { @@ -48,13 +48,13 @@ func (queue *BaseTaskQueue) PopUnissuedTask() task { return nil } ft := queue.unissuedTasks.Front() - return queue.unissuedTasks.Remove(ft).(task) + return queue.unissuedTasks.Remove(ft).(*task) } -func (queue *BaseTaskQueue) AddActiveTask(t task) { +func (queue *BaseTaskQueue) AddActiveTask(t *task) { queue.atLock.Lock() defer queue.atLock.Lock() - ts := t.EndTs() + ts := (*t).EndTs() _, ok := queue.activeTasks[ts] if ok { log.Fatalf("task with timestamp %v already in active task list!", ts) @@ -62,7 +62,7 @@ func (queue *BaseTaskQueue) AddActiveTask(t task) { queue.activeTasks[ts] = t } -func (queue *BaseTaskQueue) PopActiveTask(ts Timestamp) task { +func (queue *BaseTaskQueue) PopActiveTask(ts Timestamp) *task { queue.atLock.Lock() defer queue.atLock.Lock() t, ok := queue.activeTasks[ts] @@ -74,19 +74,19 @@ func (queue *BaseTaskQueue) PopActiveTask(ts Timestamp) task { return nil } -func (queue *BaseTaskQueue) getTaskByReqId(reqId UniqueID) task { +func (queue *BaseTaskQueue) getTaskByReqId(reqId UniqueID) *task { queue.utLock.Lock() defer queue.utLock.Lock() for e := 
queue.unissuedTasks.Front(); e != nil; e = e.Next() { - if e.Value.(task).Id() == reqId { - return e.Value.(task) + if (*(e.Value.(*task))).Id() == reqId { + return e.Value.(*task) } } queue.atLock.Lock() defer queue.atLock.Unlock() for ats := range queue.activeTasks { - if queue.activeTasks[ats].Id() == reqId { + if (*(queue.activeTasks[ats])).Id() == reqId { return queue.activeTasks[ats] } } @@ -98,7 +98,7 @@ func (queue *BaseTaskQueue) TaskDoneTest(ts Timestamp) bool { queue.utLock.Lock() defer queue.utLock.Unlock() for e := queue.unissuedTasks.Front(); e != nil; e = e.Next() { - if e.Value.(task).EndTs() >= ts { + if (*(e.Value.(*task))).EndTs() >= ts { return false } } @@ -114,20 +114,20 @@ func (queue *BaseTaskQueue) TaskDoneTest(ts Timestamp) bool { return true } -type DdTaskQueue struct { +type ddTaskQueue struct { BaseTaskQueue lock sync.Mutex } -type DmTaskQueue struct { +type dmTaskQueue struct { BaseTaskQueue } -type DqTaskQueue struct { +type dqTaskQueue struct { BaseTaskQueue } -func (queue *DdTaskQueue) Enqueue(t task) error { +func (queue *ddTaskQueue) Enqueue(t *task) error { queue.lock.Lock() defer queue.lock.Unlock() // TODO: set Ts, ReqId, ProxyId @@ -135,49 +135,22 @@ func (queue *DdTaskQueue) Enqueue(t task) error { return nil } -func (queue *DmTaskQueue) Enqueue(t task) error { +func (queue *dmTaskQueue) Enqueue(t *task) error { // TODO: set Ts, ReqId, ProxyId queue.AddUnissuedTask(t) return nil } -func (queue *DqTaskQueue) Enqueue(t task) error { +func (queue *dqTaskQueue) Enqueue(t *task) error { // TODO: set Ts, ReqId, ProxyId queue.AddUnissuedTask(t) return nil } -func NewDdTaskQueue() *DdTaskQueue { - return &DdTaskQueue{ - BaseTaskQueue: BaseTaskQueue{ - unissuedTasks: list.New(), - activeTasks: make(map[Timestamp]task), - }, - } -} - -func NewDmTaskQueue() *DmTaskQueue { - return &DmTaskQueue{ - BaseTaskQueue: BaseTaskQueue{ - unissuedTasks: list.New(), - activeTasks: make(map[Timestamp]task), - }, - } -} - -func NewDqTaskQueue() *DqTaskQueue { - return &DqTaskQueue{ - BaseTaskQueue: BaseTaskQueue{ - unissuedTasks: list.New(), - activeTasks: make(map[Timestamp]task), - }, - } -} - type TaskScheduler struct { - DdQueue *DdTaskQueue - DmQueue *DmTaskQueue - DqQueue *DqTaskQueue + DdQueue *ddTaskQueue + DmQueue *dmTaskQueue + DqQueue *dqTaskQueue idAllocator *allocator.IdAllocator tsoAllocator *allocator.TimestampAllocator @@ -192,9 +165,6 @@ func NewTaskScheduler(ctx context.Context, tsoAllocator *allocator.TimestampAllocator) (*TaskScheduler, error) { ctx1, cancel := context.WithCancel(ctx) s := &TaskScheduler{ - DdQueue: NewDdTaskQueue(), - DmQueue: NewDmTaskQueue(), - DqQueue: NewDqTaskQueue(), idAllocator: idAllocator, tsoAllocator: tsoAllocator, ctx: ctx1, @@ -204,19 +174,19 @@ func NewTaskScheduler(ctx context.Context, return s, nil } -func (sched *TaskScheduler) scheduleDdTask() task { +func (sched *TaskScheduler) scheduleDdTask() *task { return sched.DdQueue.PopUnissuedTask() } -func (sched *TaskScheduler) scheduleDmTask() task { +func (sched *TaskScheduler) scheduleDmTask() *task { return sched.DmQueue.PopUnissuedTask() } -func (sched *TaskScheduler) scheduleDqTask() task { +func (sched *TaskScheduler) scheduleDqTask() *task { return sched.DqQueue.PopUnissuedTask() } -func (sched *TaskScheduler) getTaskByReqId(reqId UniqueID) task { +func (sched *TaskScheduler) getTaskByReqId(reqId UniqueID) *task { if t := sched.DdQueue.getTaskByReqId(reqId); t != nil { return t } @@ -241,22 +211,22 @@ func (sched *TaskScheduler) definitionLoop() { 
//sched.DdQueue.atLock.Lock() t := sched.scheduleDdTask() - err := t.PreExecute() + err := (*t).PreExecute() if err != nil { return } - err = t.Execute() + err = (*t).Execute() if err != nil { log.Printf("execute definition task failed, error = %v", err) } - t.Notify(err) + (*t).Notify(err) sched.DdQueue.AddActiveTask(t) - t.WaitToFinish() - t.PostExecute() + (*t).WaitToFinish() + (*t).PostExecute() - sched.DdQueue.PopActiveTask(t.EndTs()) + sched.DdQueue.PopActiveTask((*t).EndTs()) } } @@ -272,27 +242,27 @@ func (sched *TaskScheduler) manipulationLoop() { sched.DmQueue.atLock.Lock() t := sched.scheduleDmTask() - if err := t.PreExecute(); err != nil { + if err := (*t).PreExecute(); err != nil { return } go func() { - err := t.Execute() + err := (*t).Execute() if err != nil { log.Printf("execute manipulation task failed, error = %v", err) } - t.Notify(err) + (*t).Notify(err) }() sched.DmQueue.AddActiveTask(t) sched.DmQueue.atLock.Unlock() go func() { - t.WaitToFinish() - t.PostExecute() + (*t).WaitToFinish() + (*t).PostExecute() // remove from active list - sched.DmQueue.PopActiveTask(t.EndTs()) + sched.DmQueue.PopActiveTask((*t).EndTs()) }() } } @@ -309,27 +279,27 @@ func (sched *TaskScheduler) queryLoop() { sched.DqQueue.atLock.Lock() t := sched.scheduleDqTask() - if err := t.PreExecute(); err != nil { + if err := (*t).PreExecute(); err != nil { return } go func() { - err := t.Execute() + err := (*t).Execute() if err != nil { log.Printf("execute query task failed, error = %v", err) } - t.Notify(err) + (*t).Notify(err) }() sched.DqQueue.AddActiveTask(t) sched.DqQueue.atLock.Unlock() go func() { - t.WaitToFinish() - t.PostExecute() + (*t).WaitToFinish() + (*t).PostExecute() // remove from active list - sched.DqQueue.PopActiveTask(t.EndTs()) + sched.DqQueue.PopActiveTask((*t).EndTs()) }() } } diff --git a/internal/proxy/timetick.go b/internal/proxy/timetick.go index 3778c25053..6269940b28 100644 --- a/internal/proxy/timetick.go +++ b/internal/proxy/timetick.go @@ -51,6 +51,7 @@ func newTimeTick(ctx context.Context, tsoAllocator *allocator.TimestampAllocator return t } + func (tt *timeTick) tick() error { if tt.lastTick == tt.currentTick { diff --git a/internal/proxy/timetick_test.go b/internal/proxy/timetick_test.go index e159188c70..edaa4bd5b1 100644 --- a/internal/proxy/timetick_test.go +++ b/internal/proxy/timetick_test.go @@ -33,7 +33,7 @@ func TestTimeTick(t *testing.T) { tt := timeTick{ interval: 200, pulsarProducer: producer, - peerID: 1, + peerID: 1, ctx: ctx, areRequestsDelivered: func(ts Timestamp) bool { return true }, } diff --git a/scripts/README.md b/scripts/README.md new file mode 100644 index 0000000000..5407d6bb3d --- /dev/null +++ b/scripts/README.md @@ -0,0 +1,67 @@ +# Compile and install milvus-dustributed + +## Environment + +``` + OS: Ubuntu 18.04 + go:1.15 + cmake: >=3.16 + gcc: 7.5 +``` + +### Install dependencies + +```shell script + sudo apt install -y g++ gcc make libssl-dev zlib1g-dev libboost-regex-dev \ + libboost-program-options-dev libboost-system-dev libboost-filesystem-dev \ + libboost-serialization-dev python3-dev libboost-python-dev libcurl4-openssl-dev gfortran libtbb-dev + + export GO111MODULE=on + go get github.com/golang/protobuf/protoc-gen-go@v1.3.2 +``` + +#### Install OpenBlas library + +```shell script + wget https://github.com/xianyi/OpenBLAS/archive/v0.3.9.tar.gz && \ + tar zxvf v0.3.9.tar.gz && cd OpenBLAS-0.3.9 && \ + make TARGET=CORE2 DYNAMIC_ARCH=1 DYNAMIC_OLDER=1 USE_THREAD=0 USE_OPENMP=0 FC=gfortran CC=gcc COMMON_OPT="-O3 -g -fPIC" 
FCOMMON_OPT="-O3 -g -fPIC -frecursive" NMAX="NUM_THREADS=128" LIBPREFIX="libopenblas" LAPACKE="NO_LAPACKE=1" INTERFACE64=0 NO_STATIC=1 && \
+    make PREFIX=/usr install
+```
+
+### Compile
+
+#### Generate the Go files from the proto files
+
+```shell script
+    cd milvus-distributed
+    pwd_dir=`pwd`
+    export PATH=$PATH:$(go env GOPATH)/bin
+    export protoc=${pwd_dir}/internal/core/cmake_build/thirdparty/protobuf/protobuf-build/protoc
+    ./ci/scripts/proto_gen_go.sh
+```
+
+#### Check the coding standards
+
+```shell script
+    make verifiers
+```
+
+#### Compile
+
+```shell script
+    make all
+```
+
+#### Start the services
+
+```shell script
+    cd deployments
+    docker-compose up -d
+```
+
+#### Run the unit tests
+
+```shell script
+    make unittest
+```
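+
+#### Run only the timesync tests
+
+A minimal sketch for iterating on the new time-tick barrier code (assuming the Go toolchain is on `PATH` and the Pulsar broker started by `docker-compose` above is reachable at `pulsar://localhost:6650`, the address used in `timesync_test.go`):
+
+```shell script
+    # run just the package that contains the soft/hard TimeTickBarrier tests
+    go test -v ./internal/master/timesync/...
+```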