From f6e23458aade18939aaecbb613edded249866774 Mon Sep 17 00:00:00 2001 From: Jiquan Long Date: Wed, 22 Dec 2021 10:23:00 +0800 Subject: [PATCH] Refine log of ready to reduce (#13448) Signed-off-by: dragondriver --- internal/proxy/task_scheduler.go | 50 ++++++++++-------------------- internal/util/funcutil/set.go | 11 +++++++ internal/util/funcutil/set_test.go | 18 +++++++++++ 3 files changed, 45 insertions(+), 34 deletions(-) diff --git a/internal/proxy/task_scheduler.go b/internal/proxy/task_scheduler.go index 010c55cac7..1f886e3d14 100644 --- a/internal/proxy/task_scheduler.go +++ b/internal/proxy/task_scheduler.go @@ -24,6 +24,8 @@ import ( "strconv" "sync" + "github.com/milvus-io/milvus/internal/util/typeutil" + "github.com/milvus-io/milvus/internal/util/funcutil" "go.uber.org/zap" @@ -523,6 +525,7 @@ func (sched *taskScheduler) queryLoop() { } type resultBufHeader struct { + msgID UniqueID usedVChans map[interface{}]struct{} // set of vChan receivedVChansSet map[interface{}]struct{} // set of vChan receivedSealedSegmentIDsSet map[interface{}]struct{} // set of UniqueID @@ -540,7 +543,7 @@ type queryResultBuf struct { resultBuf []*internalpb.RetrieveResults } -func newSearchResultBuf() *searchResultBuf { +func newSearchResultBuf(msgID UniqueID) *searchResultBuf { return &searchResultBuf{ resultBufHeader: resultBufHeader{ usedVChans: make(map[interface{}]struct{}), @@ -553,7 +556,7 @@ func newSearchResultBuf() *searchResultBuf { } } -func newQueryResultBuf() *queryResultBuf { +func newQueryResultBuf(msgID UniqueID) *queryResultBuf { return &queryResultBuf{ resultBufHeader: resultBufHeader{ usedVChans: make(map[interface{}]struct{}), @@ -572,41 +575,20 @@ func (sr *resultBufHeader) readyToReduce() bool { return true } - receivedVChansSetStrMap := make(map[string]int) - - for x := range sr.receivedVChansSet { - receivedVChansSetStrMap[x.(string)] = 1 - } - - usedVChansSetStrMap := make(map[string]int) - for x := range sr.usedVChans { - usedVChansSetStrMap[x.(string)] = 1 - } - - sealedSegmentIDsStrMap := make(map[int64]int) - - for x := range sr.receivedSealedSegmentIDsSet { - sealedSegmentIDsStrMap[x.(int64)] = 1 - } - - sealedGlobalSegmentIDsStrMap := make(map[int64]int) - for x := range sr.receivedGlobalSegmentIDsSet { - sealedGlobalSegmentIDsStrMap[x.(int64)] = 1 - } + log.Debug("check if result buf is ready to reduce", + zap.String("role", typeutil.ProxyRole), + zap.Int64("MsgID", sr.msgID), + zap.Any("receivedVChansSet", funcutil.SetToSlice(sr.receivedVChansSet)), + zap.Any("usedVChans", funcutil.SetToSlice(sr.usedVChans)), + zap.Any("receivedSealedSegmentIDsSet", funcutil.SetToSlice(sr.receivedSealedSegmentIDsSet)), + zap.Any("receivedGlobalSegmentIDsSet", funcutil.SetToSlice(sr.receivedGlobalSegmentIDsSet))) ret1 := funcutil.SetContain(sr.receivedVChansSet, sr.usedVChans) - log.Debug("Proxy searchResultBuf readyToReduce", zap.Any("receivedVChansSet", receivedVChansSetStrMap), - zap.Any("usedVChans", usedVChansSetStrMap), - zap.Any("receivedSealedSegmentIDsSet", sealedSegmentIDsStrMap), - zap.Any("receivedGlobalSegmentIDsSet", sealedGlobalSegmentIDsStrMap), - zap.Any("ret1", ret1), - ) if !ret1 { return false } - ret := funcutil.SetContain(sr.receivedSealedSegmentIDsSet, sr.receivedGlobalSegmentIDsSet) - log.Debug("Proxy searchResultBuf readyToReduce", zap.Any("ret", ret)) - return ret + + return funcutil.SetContain(sr.receivedSealedSegmentIDsSet, sr.receivedGlobalSegmentIDsSet) } func (sr *resultBufHeader) addPartialResult(vchans []vChan, searchSegIDs, globalSegIDs []UniqueID) { @@ -706,7 +688,7 @@ func (sched *taskScheduler) collectResultLoop() { resultBuf, ok := searchResultBufs[reqID] if !ok { - resultBuf = newSearchResultBuf() + resultBuf = newSearchResultBuf(reqID) vchans, err := st.getVChannels() log.Debug("Proxy collectResultLoop, first receive", zap.Any("reqID", reqID), zap.Any("vchans", vchans), zap.Error(err)) @@ -805,7 +787,7 @@ func (sched *taskScheduler) collectResultLoop() { resultBuf, ok := queryResultBufs[reqID] if !ok { - resultBuf = newQueryResultBuf() + resultBuf = newQueryResultBuf(reqID) vchans, err := st.getVChannels() log.Debug("Proxy collectResultLoop, first receive", zap.Any("reqID", reqID), zap.Any("vchans", vchans), zap.Error(err)) diff --git a/internal/util/funcutil/set.go b/internal/util/funcutil/set.go index 344194d189..39df9e211c 100644 --- a/internal/util/funcutil/set.go +++ b/internal/util/funcutil/set.go @@ -26,3 +26,14 @@ func SetContain(m1, m2 map[interface{}]struct{}) bool { return true } + +// SetToSlice transform the set to a slice. +func SetToSlice(m map[interface{}]struct{}) []interface{} { + ret := make([]interface{}, 0, len(m)) + + for k := range m { + ret = append(ret, k) + } + + return ret +} diff --git a/internal/util/funcutil/set_test.go b/internal/util/funcutil/set_test.go index c8ddea3874..4d99d846f4 100644 --- a/internal/util/funcutil/set_test.go +++ b/internal/util/funcutil/set_test.go @@ -38,3 +38,21 @@ func Test_SetContain(t *testing.T) { m1[key2] = struct{}{} assert.True(t, SetContain(m1, m2)) } + +func TestSetToSlice(t *testing.T) { + m := map[interface{}]struct{}{ + 1: {}, + 2.1: {}, + "string": {}, + } + + histogram := make(map[interface{}]struct{}) + s := SetToSlice(m) + for _, k := range s { + _, exist := m[k] + assert.True(t, exist) + _, twice := histogram[k] + assert.False(t, twice) + histogram[k] = struct{}{} + } +}