From a988e7cabc6f4b1a580bbbaca2513fe90d5bc8b4 Mon Sep 17 00:00:00 2001 From: wei liu Date: Wed, 22 May 2024 20:53:40 +0800 Subject: [PATCH] enhance: Reduce bloom filter lock contention between insert and delete in query coord (#32643) (#33284) issue: #32530 pr: #32643 cause ProcessDelete need to check whether pk exist in bloom filter, and ProcessInsert need to update pk to bloom filter, when execute ProcessInsert and ProcessDelete in parallel, it will cause race condition in segment's bloom filter This PR execute ProcessInsert and ProcessDelete in serial to avoid block each other Signed-off-by: Wei Liu --- internal/querynodev2/server.go | 2 +- internal/util/pipeline/node.go | 59 +------------------ internal/util/pipeline/pipeline.go | 37 +++++++++--- internal/util/pipeline/pipeline_test.go | 32 +++++++--- internal/util/pipeline/stream_pipeline.go | 9 ++- .../util/pipeline/stream_pipeline_test.go | 4 +- 6 files changed, 67 insertions(+), 76 deletions(-) diff --git a/internal/querynodev2/server.go b/internal/querynodev2/server.go index 1f26770f46..57f059f533 100644 --- a/internal/querynodev2/server.go +++ b/internal/querynodev2/server.go @@ -456,7 +456,7 @@ func (node *QueryNode) Stop() error { case <-time.After(time.Second): metrics.StoppingBalanceSegmentNum.WithLabelValues(fmt.Sprint(node.GetNodeID())).Set(float64(len(sealedSegments))) metrics.StoppingBalanceChannelNum.WithLabelValues(fmt.Sprint(node.GetNodeID())).Set(float64(channelNum)) - log.Info("migrate data...", zap.Int64("ServerID", paramtable.GetNodeID()), + log.Info("migrate data...", zap.Int64("ServerID", node.GetNodeID()), zap.Int64s("sealedSegments", lo.Map(sealedSegments, func(s segments.Segment, i int) int64 { return s.ID() })), diff --git a/internal/util/pipeline/node.go b/internal/util/pipeline/node.go index ad42e6318f..fe16397dce 100644 --- a/internal/util/pipeline/node.go +++ b/internal/util/pipeline/node.go @@ -17,12 +17,6 @@ package pipeline import ( - "fmt" - "sync" - - "go.uber.org/zap" - - "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/timerecord" ) @@ -35,63 +29,16 @@ type Node interface { } type nodeCtx struct { - node Node - + node Node inputChannel chan Msg - - next *nodeCtx - checker *timerecord.GroupChecker - - closeCh chan struct{} // notify work to exit - closeWg sync.WaitGroup -} - -func (c *nodeCtx) Start() { - c.closeWg.Add(1) - c.node.Start() - go c.work() -} - -func (c *nodeCtx) Close() { - close(c.closeCh) - c.closeWg.Wait() -} - -func (c *nodeCtx) work() { - defer c.closeWg.Done() - name := fmt.Sprintf("nodeCtxTtChecker-%s", c.node.Name()) - if c.checker != nil { - c.checker.Check(name) - defer c.checker.Remove(name) - } - - for { - select { - // close - case <-c.closeCh: - c.node.Close() - close(c.inputChannel) - log.Debug("pipeline node closed", zap.String("nodeName", c.node.Name())) - return - case input := <-c.inputChannel: - var output Msg - output = c.node.Operate(input) - if c.checker != nil { - c.checker.Check(name) - } - if c.next != nil && output != nil { - c.next.inputChannel <- output - } - } - } + next *nodeCtx + checker *timerecord.GroupChecker } func newNodeCtx(node Node) *nodeCtx { return &nodeCtx{ node: node, inputChannel: make(chan Msg, node.MaxQueueLength()), - closeCh: make(chan struct{}), - closeWg: sync.WaitGroup{}, } } diff --git a/internal/util/pipeline/pipeline.go b/internal/util/pipeline/pipeline.go index 047bf65f48..61212f4581 100644 --- a/internal/util/pipeline/pipeline.go +++ b/internal/util/pipeline/pipeline.go @@ -17,6 +17,7 @@ package pipeline import ( + "fmt" "time" "go.uber.org/zap" @@ -36,6 +37,8 @@ type pipeline struct { inputChannel chan Msg nodeTtInterval time.Duration enableTtChecker bool + + checkerNames map[string]string } func (p *pipeline) Add(nodes ...Node) { @@ -50,6 +53,10 @@ func (p *pipeline) addNode(node Node) { nodeCtx.checker = timerecord.GetGroupChecker("fgNode", p.nodeTtInterval, func(list []string) { log.Warn("some node(s) haven't received input", zap.Strings("list", list), zap.Duration("duration ", p.nodeTtInterval)) }) + if p.checkerNames == nil { + p.checkerNames = make(map[string]string) + } + p.checkerNames[nodeCtx.node.Name()] = fmt.Sprintf("nodeCtxTtChecker-%s", nodeCtx.node.Name()) } if len(p.nodes) != 0 { @@ -62,17 +69,31 @@ func (p *pipeline) addNode(node Node) { } func (p *pipeline) Start() error { - if len(p.nodes) == 0 { - return ErrEmptyPipeline - } - for _, node := range p.nodes { - node.Start() - } return nil } func (p *pipeline) Close() { - for _, node := range p.nodes { - node.Close() +} + +func (p *pipeline) process() { + if len(p.nodes) == 0 { + return + } + + curNode := p.nodes[0] + for curNode != nil { + if len(curNode.inputChannel) == 0 { + break + } + + input := <-curNode.inputChannel + output := curNode.node.Operate(input) + if _, ok := p.checkerNames[curNode.node.Name()]; ok { + curNode.checker.Check(p.checkerNames[curNode.node.Name()]) + } + if curNode.next != nil && output != nil { + curNode.next.inputChannel <- output + } + curNode = curNode.next } } diff --git a/internal/util/pipeline/pipeline_test.go b/internal/util/pipeline/pipeline_test.go index 8ddeb9c355..909893d458 100644 --- a/internal/util/pipeline/pipeline_test.go +++ b/internal/util/pipeline/pipeline_test.go @@ -31,8 +31,9 @@ type testNode struct { func (t *testNode) Operate(in Msg) Msg { msg := in.(*msgstream.MsgPack) - msg.BeginTs++ - t.outChannel <- msg.BeginTs + if t.outChannel != nil { + t.outChannel <- msg.BeginTs + } return msg } @@ -43,7 +44,7 @@ type PipelineSuite struct { } func (suite *PipelineSuite) SetupTest() { - suite.outChannel = make(chan msgstream.Timestamp) + suite.outChannel = make(chan msgstream.Timestamp, 1) suite.pipeline = &pipeline{ nodes: []*nodeCtx{}, nodeTtInterval: 0, @@ -52,7 +53,21 @@ func (suite *PipelineSuite) SetupTest() { suite.pipeline.addNode(&testNode{ BaseNode: &BaseNode{ - name: "test-node", + name: "test-node1", + maxQueueLength: 8, + }, + }) + + suite.pipeline.addNode(&testNode{ + BaseNode: &BaseNode{ + name: "test-node2", + maxQueueLength: 8, + }, + }) + + suite.pipeline.addNode(&testNode{ + BaseNode: &BaseNode{ + name: "test-node3", maxQueueLength: 8, }, outChannel: suite.outChannel, @@ -62,10 +77,13 @@ func (suite *PipelineSuite) SetupTest() { func (suite *PipelineSuite) TestBasic() { suite.pipeline.Start() defer suite.pipeline.Close() - suite.pipeline.inputChannel <- &msgstream.MsgPack{} - output := <-suite.outChannel - suite.Equal(msgstream.Timestamp(1), output) + for i := 0; i < 100; i++ { + suite.pipeline.inputChannel <- &msgstream.MsgPack{BeginTs: msgstream.Timestamp(i)} + suite.pipeline.process() + output := <-suite.outChannel + suite.Equal(i, int(output)) + } } func TestPipeline(t *testing.T) { diff --git a/internal/util/pipeline/stream_pipeline.go b/internal/util/pipeline/stream_pipeline.go index 6cb6b6900e..3c22c6e99f 100644 --- a/internal/util/pipeline/stream_pipeline.go +++ b/internal/util/pipeline/stream_pipeline.go @@ -37,7 +37,7 @@ type StreamPipeline interface { } type streamPipeline struct { - *pipeline + pipeline *pipeline input <-chan *msgstream.MsgPack dispatcher msgdispatcher.Client startOnce sync.Once @@ -57,7 +57,8 @@ func (p *streamPipeline) work() { return case msg := <-p.input: log.RatedDebug(10, "stream pipeline fetch msg", zap.Int("sum", len(msg.Msgs))) - p.nodes[0].inputChannel <- msg + p.pipeline.inputChannel <- msg + p.pipeline.process() } } } @@ -86,6 +87,10 @@ func (p *streamPipeline) ConsumeMsgStream(position *msgpb.MsgPosition) error { return nil } +func (p *streamPipeline) Add(nodes ...Node) { + p.pipeline.Add(nodes...) +} + func (p *streamPipeline) Start() error { var err error p.startOnce.Do(func() { diff --git a/internal/util/pipeline/stream_pipeline_test.go b/internal/util/pipeline/stream_pipeline_test.go index 7bf28a5a0c..a731a18ff3 100644 --- a/internal/util/pipeline/stream_pipeline_test.go +++ b/internal/util/pipeline/stream_pipeline_test.go @@ -68,11 +68,11 @@ func (suite *StreamPipelineSuite) TestBasic() { suite.pipeline.Start() defer suite.pipeline.Close() - suite.inChannel <- &msgstream.MsgPack{} + suite.inChannel <- &msgstream.MsgPack{BeginTs: 1001} for i := 1; i <= suite.length; i++ { output := <-suite.outChannel - suite.Equal(msgstream.Timestamp(i), output) + suite.Equal(int64(1001), int64(output)) } }