diff --git a/configs/milvus.yaml b/configs/milvus.yaml index f15a5db9f5..c76cbd8ce7 100644 --- a/configs/milvus.yaml +++ b/configs/milvus.yaml @@ -11,7 +11,7 @@ nodeID: # will be deprecated after v0.2 - proxyIDList: [1] + proxyIDList: [0] queryNodeIDList: [2] writeNodeIDList: [3] diff --git a/internal/master/param_table_test.go b/internal/master/param_table_test.go index c35213e626..339ecfb6d4 100644 --- a/internal/master/param_table_test.go +++ b/internal/master/param_table_test.go @@ -86,14 +86,14 @@ func TestParamTable_ProxyIDList(t *testing.T) { Params.Init() ids := Params.ProxyIDList assert.Equal(t, len(ids), 1) - assert.Equal(t, ids[0], int64(1)) + assert.Equal(t, ids[0], int64(0)) } func TestParamTable_ProxyTimeTickChannelNames(t *testing.T) { Params.Init() names := Params.ProxyTimeTickChannelNames assert.Equal(t, len(names), 1) - assert.Equal(t, names[0], "proxyTimeTick-1") + assert.Equal(t, names[0], "proxyTimeTick-0") } func TestParamTable_MsgChannelSubName(t *testing.T) { diff --git a/internal/proxy/proxy.go b/internal/proxy/proxy.go index 4f89642eaf..82cc0660a4 100644 --- a/internal/proxy/proxy.go +++ b/internal/proxy/proxy.go @@ -94,6 +94,8 @@ func CreateProxy(ctx context.Context) (*Proxy, error) { return nil, err } + p.tick = newTimeTick(p.proxyLoopCtx, p.tsoAllocator, time.Millisecond*200, p.sched.TaskDoneTest) + return p, nil } @@ -114,6 +116,7 @@ func (p *Proxy) startProxy() error { p.idAllocator.Start() p.tsoAllocator.Start() p.segAssigner.Start() + p.tick.Start() // Start callbacks for _, cb := range p.startCallbacks { @@ -184,6 +187,8 @@ func (p *Proxy) stopProxyLoop() { p.queryMsgStream.Close() + p.tick.Close() + p.proxyLoopWg.Wait() } diff --git a/internal/proxy/timetick.go b/internal/proxy/timetick.go index c44286934a..721c3a276c 100644 --- a/internal/proxy/timetick.go +++ b/internal/proxy/timetick.go @@ -70,6 +70,9 @@ func (tt *timeTick) tick() error { } msgPack := msgstream.MsgPack{} timeTickMsg := &msgstream.TimeTickMsg{ + BaseMsg: msgstream.BaseMsg{ + HashValues: []int32{int32(Params.ProxyID())}, + }, TimeTickMsg: internalpb.TimeTickMsg{ MsgType: internalpb.MsgType_kTimeTick, PeerID: tt.peerID, @@ -77,7 +80,12 @@ func (tt *timeTick) tick() error { }, } msgPack.Msgs = append(msgPack.Msgs, timeTickMsg) - tt.tickMsgStream.Produce(&msgPack) + err := tt.tickMsgStream.Produce(&msgPack) + if err != nil { + log.Printf("proxy send time tick error: %v", err) + } else { + log.Printf("proxy send time tick message") + } tt.lastTick = tt.currentTick return nil }