mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-07 17:48:29 +08:00
enhance: Reduce bloom filter lock contention between insert and delete in query coord (#32643)
issue: #32530 cause ProcessDelete need to check whether pk exist in bloom filter, and ProcessInsert need to update pk to bloom filter, when execute ProcessInsert and ProcessDelete in parallel, it will cause race condition in segment's bloom filter This PR execute ProcessInsert and ProcessDelete in serial to avoid block each other Signed-off-by: Wei Liu <wei.liu@zilliz.com>
This commit is contained in:
parent
310bfe71c2
commit
39f56678a0
@ -461,7 +461,7 @@ func (node *QueryNode) Stop() error {
|
|||||||
case <-time.After(time.Second):
|
case <-time.After(time.Second):
|
||||||
metrics.StoppingBalanceSegmentNum.WithLabelValues(fmt.Sprint(node.GetNodeID())).Set(float64(len(sealedSegments)))
|
metrics.StoppingBalanceSegmentNum.WithLabelValues(fmt.Sprint(node.GetNodeID())).Set(float64(len(sealedSegments)))
|
||||||
metrics.StoppingBalanceChannelNum.WithLabelValues(fmt.Sprint(node.GetNodeID())).Set(float64(channelNum))
|
metrics.StoppingBalanceChannelNum.WithLabelValues(fmt.Sprint(node.GetNodeID())).Set(float64(channelNum))
|
||||||
log.Info("migrate data...", zap.Int64("ServerID", paramtable.GetNodeID()),
|
log.Info("migrate data...", zap.Int64("ServerID", node.GetNodeID()),
|
||||||
zap.Int64s("sealedSegments", lo.Map(sealedSegments, func(s segments.Segment, i int) int64 {
|
zap.Int64s("sealedSegments", lo.Map(sealedSegments, func(s segments.Segment, i int) int64 {
|
||||||
return s.ID()
|
return s.ID()
|
||||||
})),
|
})),
|
||||||
|
|||||||
@ -17,12 +17,6 @@
|
|||||||
package pipeline
|
package pipeline
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
|
||||||
"sync"
|
|
||||||
|
|
||||||
"go.uber.org/zap"
|
|
||||||
|
|
||||||
"github.com/milvus-io/milvus/pkg/log"
|
|
||||||
"github.com/milvus-io/milvus/pkg/util/timerecord"
|
"github.com/milvus-io/milvus/pkg/util/timerecord"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -35,63 +29,16 @@ type Node interface {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type nodeCtx struct {
|
type nodeCtx struct {
|
||||||
node Node
|
node Node
|
||||||
|
|
||||||
inputChannel chan Msg
|
inputChannel chan Msg
|
||||||
|
next *nodeCtx
|
||||||
next *nodeCtx
|
checker *timerecord.GroupChecker
|
||||||
checker *timerecord.GroupChecker
|
|
||||||
|
|
||||||
closeCh chan struct{} // notify work to exit
|
|
||||||
closeWg sync.WaitGroup
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *nodeCtx) Start() {
|
|
||||||
c.closeWg.Add(1)
|
|
||||||
c.node.Start()
|
|
||||||
go c.work()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *nodeCtx) Close() {
|
|
||||||
close(c.closeCh)
|
|
||||||
c.closeWg.Wait()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *nodeCtx) work() {
|
|
||||||
defer c.closeWg.Done()
|
|
||||||
name := fmt.Sprintf("nodeCtxTtChecker-%s", c.node.Name())
|
|
||||||
if c.checker != nil {
|
|
||||||
c.checker.Check(name)
|
|
||||||
defer c.checker.Remove(name)
|
|
||||||
}
|
|
||||||
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
// close
|
|
||||||
case <-c.closeCh:
|
|
||||||
c.node.Close()
|
|
||||||
close(c.inputChannel)
|
|
||||||
log.Debug("pipeline node closed", zap.String("nodeName", c.node.Name()))
|
|
||||||
return
|
|
||||||
case input := <-c.inputChannel:
|
|
||||||
var output Msg
|
|
||||||
output = c.node.Operate(input)
|
|
||||||
if c.checker != nil {
|
|
||||||
c.checker.Check(name)
|
|
||||||
}
|
|
||||||
if c.next != nil && output != nil {
|
|
||||||
c.next.inputChannel <- output
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func newNodeCtx(node Node) *nodeCtx {
|
func newNodeCtx(node Node) *nodeCtx {
|
||||||
return &nodeCtx{
|
return &nodeCtx{
|
||||||
node: node,
|
node: node,
|
||||||
inputChannel: make(chan Msg, node.MaxQueueLength()),
|
inputChannel: make(chan Msg, node.MaxQueueLength()),
|
||||||
closeCh: make(chan struct{}),
|
|
||||||
closeWg: sync.WaitGroup{},
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -17,6 +17,7 @@
|
|||||||
package pipeline
|
package pipeline
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"fmt"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
@ -36,6 +37,8 @@ type pipeline struct {
|
|||||||
inputChannel chan Msg
|
inputChannel chan Msg
|
||||||
nodeTtInterval time.Duration
|
nodeTtInterval time.Duration
|
||||||
enableTtChecker bool
|
enableTtChecker bool
|
||||||
|
|
||||||
|
checkerNames map[string]string
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *pipeline) Add(nodes ...Node) {
|
func (p *pipeline) Add(nodes ...Node) {
|
||||||
@ -50,6 +53,10 @@ func (p *pipeline) addNode(node Node) {
|
|||||||
nodeCtx.checker = timerecord.GetGroupChecker("fgNode", p.nodeTtInterval, func(list []string) {
|
nodeCtx.checker = timerecord.GetGroupChecker("fgNode", p.nodeTtInterval, func(list []string) {
|
||||||
log.Warn("some node(s) haven't received input", zap.Strings("list", list), zap.Duration("duration ", p.nodeTtInterval))
|
log.Warn("some node(s) haven't received input", zap.Strings("list", list), zap.Duration("duration ", p.nodeTtInterval))
|
||||||
})
|
})
|
||||||
|
if p.checkerNames == nil {
|
||||||
|
p.checkerNames = make(map[string]string)
|
||||||
|
}
|
||||||
|
p.checkerNames[nodeCtx.node.Name()] = fmt.Sprintf("nodeCtxTtChecker-%s", nodeCtx.node.Name())
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(p.nodes) != 0 {
|
if len(p.nodes) != 0 {
|
||||||
@ -62,17 +69,31 @@ func (p *pipeline) addNode(node Node) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (p *pipeline) Start() error {
|
func (p *pipeline) Start() error {
|
||||||
if len(p.nodes) == 0 {
|
|
||||||
return ErrEmptyPipeline
|
|
||||||
}
|
|
||||||
for _, node := range p.nodes {
|
|
||||||
node.Start()
|
|
||||||
}
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *pipeline) Close() {
|
func (p *pipeline) Close() {
|
||||||
for _, node := range p.nodes {
|
}
|
||||||
node.Close()
|
|
||||||
|
func (p *pipeline) process() {
|
||||||
|
if len(p.nodes) == 0 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
curNode := p.nodes[0]
|
||||||
|
for curNode != nil {
|
||||||
|
if len(curNode.inputChannel) == 0 {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
input := <-curNode.inputChannel
|
||||||
|
output := curNode.node.Operate(input)
|
||||||
|
if _, ok := p.checkerNames[curNode.node.Name()]; ok {
|
||||||
|
curNode.checker.Check(p.checkerNames[curNode.node.Name()])
|
||||||
|
}
|
||||||
|
if curNode.next != nil && output != nil {
|
||||||
|
curNode.next.inputChannel <- output
|
||||||
|
}
|
||||||
|
curNode = curNode.next
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -31,8 +31,9 @@ type testNode struct {
|
|||||||
|
|
||||||
func (t *testNode) Operate(in Msg) Msg {
|
func (t *testNode) Operate(in Msg) Msg {
|
||||||
msg := in.(*msgstream.MsgPack)
|
msg := in.(*msgstream.MsgPack)
|
||||||
msg.BeginTs++
|
if t.outChannel != nil {
|
||||||
t.outChannel <- msg.BeginTs
|
t.outChannel <- msg.BeginTs
|
||||||
|
}
|
||||||
return msg
|
return msg
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -43,7 +44,7 @@ type PipelineSuite struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (suite *PipelineSuite) SetupTest() {
|
func (suite *PipelineSuite) SetupTest() {
|
||||||
suite.outChannel = make(chan msgstream.Timestamp)
|
suite.outChannel = make(chan msgstream.Timestamp, 1)
|
||||||
suite.pipeline = &pipeline{
|
suite.pipeline = &pipeline{
|
||||||
nodes: []*nodeCtx{},
|
nodes: []*nodeCtx{},
|
||||||
nodeTtInterval: 0,
|
nodeTtInterval: 0,
|
||||||
@ -52,7 +53,21 @@ func (suite *PipelineSuite) SetupTest() {
|
|||||||
|
|
||||||
suite.pipeline.addNode(&testNode{
|
suite.pipeline.addNode(&testNode{
|
||||||
BaseNode: &BaseNode{
|
BaseNode: &BaseNode{
|
||||||
name: "test-node",
|
name: "test-node1",
|
||||||
|
maxQueueLength: 8,
|
||||||
|
},
|
||||||
|
})
|
||||||
|
|
||||||
|
suite.pipeline.addNode(&testNode{
|
||||||
|
BaseNode: &BaseNode{
|
||||||
|
name: "test-node2",
|
||||||
|
maxQueueLength: 8,
|
||||||
|
},
|
||||||
|
})
|
||||||
|
|
||||||
|
suite.pipeline.addNode(&testNode{
|
||||||
|
BaseNode: &BaseNode{
|
||||||
|
name: "test-node3",
|
||||||
maxQueueLength: 8,
|
maxQueueLength: 8,
|
||||||
},
|
},
|
||||||
outChannel: suite.outChannel,
|
outChannel: suite.outChannel,
|
||||||
@ -62,10 +77,13 @@ func (suite *PipelineSuite) SetupTest() {
|
|||||||
func (suite *PipelineSuite) TestBasic() {
|
func (suite *PipelineSuite) TestBasic() {
|
||||||
suite.pipeline.Start()
|
suite.pipeline.Start()
|
||||||
defer suite.pipeline.Close()
|
defer suite.pipeline.Close()
|
||||||
suite.pipeline.inputChannel <- &msgstream.MsgPack{}
|
|
||||||
|
|
||||||
output := <-suite.outChannel
|
for i := 0; i < 100; i++ {
|
||||||
suite.Equal(msgstream.Timestamp(1), output)
|
suite.pipeline.inputChannel <- &msgstream.MsgPack{BeginTs: msgstream.Timestamp(i)}
|
||||||
|
suite.pipeline.process()
|
||||||
|
output := <-suite.outChannel
|
||||||
|
suite.Equal(i, int(output))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestPipeline(t *testing.T) {
|
func TestPipeline(t *testing.T) {
|
||||||
|
|||||||
@ -37,7 +37,7 @@ type StreamPipeline interface {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type streamPipeline struct {
|
type streamPipeline struct {
|
||||||
*pipeline
|
pipeline *pipeline
|
||||||
input <-chan *msgstream.MsgPack
|
input <-chan *msgstream.MsgPack
|
||||||
dispatcher msgdispatcher.Client
|
dispatcher msgdispatcher.Client
|
||||||
startOnce sync.Once
|
startOnce sync.Once
|
||||||
@ -57,7 +57,8 @@ func (p *streamPipeline) work() {
|
|||||||
return
|
return
|
||||||
case msg := <-p.input:
|
case msg := <-p.input:
|
||||||
log.RatedDebug(10, "stream pipeline fetch msg", zap.Int("sum", len(msg.Msgs)))
|
log.RatedDebug(10, "stream pipeline fetch msg", zap.Int("sum", len(msg.Msgs)))
|
||||||
p.nodes[0].inputChannel <- msg
|
p.pipeline.inputChannel <- msg
|
||||||
|
p.pipeline.process()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -86,6 +87,10 @@ func (p *streamPipeline) ConsumeMsgStream(position *msgpb.MsgPosition) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (p *streamPipeline) Add(nodes ...Node) {
|
||||||
|
p.pipeline.Add(nodes...)
|
||||||
|
}
|
||||||
|
|
||||||
func (p *streamPipeline) Start() error {
|
func (p *streamPipeline) Start() error {
|
||||||
var err error
|
var err error
|
||||||
p.startOnce.Do(func() {
|
p.startOnce.Do(func() {
|
||||||
|
|||||||
@ -68,11 +68,11 @@ func (suite *StreamPipelineSuite) TestBasic() {
|
|||||||
|
|
||||||
suite.pipeline.Start()
|
suite.pipeline.Start()
|
||||||
defer suite.pipeline.Close()
|
defer suite.pipeline.Close()
|
||||||
suite.inChannel <- &msgstream.MsgPack{}
|
suite.inChannel <- &msgstream.MsgPack{BeginTs: 1001}
|
||||||
|
|
||||||
for i := 1; i <= suite.length; i++ {
|
for i := 1; i <= suite.length; i++ {
|
||||||
output := <-suite.outChannel
|
output := <-suite.outChannel
|
||||||
suite.Equal(msgstream.Timestamp(i), output)
|
suite.Equal(int64(1001), int64(output))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user