From f30c22626efa3abd57a94ab2c39dba556e87b65a Mon Sep 17 00:00:00 2001 From: congqixia Date: Thu, 25 Apr 2024 11:13:25 +0800 Subject: [PATCH] enhance: Pre-cache result for frequent filters (#32580) See also #32165 Add segment dist and leader view filter criterion struct to store frequent filter conditions. Add collection/channel filter results for these two meta --------- Signed-off-by: Congqi Xia --- .../querycoordv2/meta/leader_view_manager.go | 147 ++++++++++-------- .../querycoordv2/meta/segment_dist_manager.go | 115 ++++++++++---- 2 files changed, 163 insertions(+), 99 deletions(-) diff --git a/internal/querycoordv2/meta/leader_view_manager.go b/internal/querycoordv2/meta/leader_view_manager.go index 24672e342e..0dc0c46932 100644 --- a/internal/querycoordv2/meta/leader_view_manager.go +++ b/internal/querycoordv2/meta/leader_view_manager.go @@ -24,10 +24,16 @@ import ( "github.com/milvus-io/milvus/internal/proto/querypb" ) +type lvCriterion struct { + nodeID int64 + channelName string + collectionID int64 + hasOtherFilter bool +} + type LeaderViewFilter interface { Match(*LeaderView) bool - Node() (int64, bool) - ChannelName() (string, bool) + AddFilter(*lvCriterion) } type lvFilterFunc func(view *LeaderView) bool @@ -36,12 +42,8 @@ func (f lvFilterFunc) Match(view *LeaderView) bool { return f(view) } -func (f lvFilterFunc) Node() (int64, bool) { - return -1, false -} - -func (f lvFilterFunc) ChannelName() (string, bool) { - return "", false +func (f lvFilterFunc) AddFilter(c *lvCriterion) { + c.hasOtherFilter = true } type lvChannelNameFilter string @@ -50,12 +52,8 @@ func (f lvChannelNameFilter) Match(v *LeaderView) bool { return v.Channel == string(f) } -func (f lvChannelNameFilter) Node() (int64, bool) { - return -1, false -} - -func (f lvChannelNameFilter) ChannelName() (string, bool) { - return string(f), true +func (f lvChannelNameFilter) AddFilter(c *lvCriterion) { + c.channelName = string(f) } type lvNodeFilter int64 @@ -64,12 +62,18 @@ func (f lvNodeFilter) Match(v *LeaderView) bool { return v.ID == int64(f) } -func (f lvNodeFilter) Node() (int64, bool) { - return int64(f), true +func (f lvNodeFilter) AddFilter(c *lvCriterion) { + c.nodeID = int64(f) } -func (f lvNodeFilter) ChannelName() (string, bool) { - return "", false +type lvCollectionFilter int64 + +func (f lvCollectionFilter) Match(v *LeaderView) bool { + return v.CollectionID == int64(f) +} + +func (f lvCollectionFilter) AddFilter(c *lvCriterion) { + c.collectionID = int64(f) } func WithNodeID2LeaderView(nodeID int64) LeaderViewFilter { @@ -81,9 +85,7 @@ func WithChannelName2LeaderView(channelName string) LeaderViewFilter { } func WithCollectionID2LeaderView(collectionID int64) LeaderViewFilter { - return lvFilterFunc(func(view *LeaderView) bool { - return view.CollectionID == collectionID - }) + return lvCollectionFilter(collectionID) } func WithReplica2LeaderView(replica *Replica) LeaderViewFilter { @@ -140,16 +142,64 @@ func (view *LeaderView) Clone() *LeaderView { } } -type channelViews map[string]*LeaderView +type nodeViews struct { + views []*LeaderView + // channel name => LeaderView + channelView map[string]*LeaderView + // collection id => leader views + collectionViews map[int64][]*LeaderView +} + +func (v nodeViews) Filter(criterion *lvCriterion, filters ...LeaderViewFilter) []*LeaderView { + mergedFilter := func(view *LeaderView) bool { + for _, filter := range filters { + if !filter.Match(view) { + return false + } + } + return true + } + + var views []*LeaderView + switch { + case criterion.channelName != "": + if view, ok := v.channelView[criterion.channelName]; ok { + views = append(views, view) + } + case criterion.collectionID != 0: + views = v.collectionViews[criterion.collectionID] + default: + views = v.views + } + + if criterion.hasOtherFilter { + views = lo.Filter(views, func(view *LeaderView, _ int) bool { + return mergedFilter(view) + }) + } + return views +} + +func composeNodeViews(views ...*LeaderView) nodeViews { + return nodeViews{ + views: views, + channelView: lo.SliceToMap(views, func(view *LeaderView) (string, *LeaderView) { + return view.Channel, view + }), + collectionViews: lo.GroupBy(views, func(view *LeaderView) int64 { + return view.CollectionID + }), + } +} type LeaderViewManager struct { rwmutex sync.RWMutex - views map[int64]channelViews // LeaderID -> Views (one per shard) + views map[int64]nodeViews // LeaderID -> Views (one per shard) } func NewLeaderViewManager() *LeaderViewManager { return &LeaderViewManager{ - views: make(map[int64]channelViews), + views: make(map[int64]nodeViews), } } @@ -157,17 +207,14 @@ func NewLeaderViewManager() *LeaderViewManager { func (mgr *LeaderViewManager) Update(leaderID int64, views ...*LeaderView) { mgr.rwmutex.Lock() defer mgr.rwmutex.Unlock() - mgr.views[leaderID] = make(channelViews, len(views)) - for _, view := range views { - mgr.views[leaderID][view.Channel] = view - } + mgr.views[leaderID] = composeNodeViews(views...) } func (mgr *LeaderViewManager) GetLeaderShardView(id int64, shard string) *LeaderView { mgr.rwmutex.RLock() defer mgr.rwmutex.RUnlock() - return mgr.views[id][shard] + return mgr.views[id].channelView[shard] } func (mgr *LeaderViewManager) GetByFilter(filters ...LeaderViewFilter) []*LeaderView { @@ -178,34 +225,14 @@ func (mgr *LeaderViewManager) GetByFilter(filters ...LeaderViewFilter) []*Leader } func (mgr *LeaderViewManager) getByFilter(filters ...LeaderViewFilter) []*LeaderView { - otherFilters := make([]LeaderViewFilter, 0, len(filters)) - var nodeID int64 - var channelName string - var hasNodeID, hasChannelName bool - + criterion := &lvCriterion{} for _, filter := range filters { - if node, ok := filter.Node(); ok { - nodeID, hasNodeID = node, true - continue - } - if channel, ok := filter.ChannelName(); ok { - channelName, hasChannelName = channel, true - continue - } - otherFilters = append(otherFilters, filter) - } - mergedFilter := func(view *LeaderView) bool { - for _, filter := range otherFilters { - if !filter.Match(view) { - return false - } - } - return true + filter.AddFilter(criterion) } - var candidates []channelViews - if hasNodeID { - nodeView, ok := mgr.views[nodeID] + var candidates []nodeViews + if criterion.nodeID > 0 { + nodeView, ok := mgr.views[criterion.nodeID] if ok { candidates = append(candidates, nodeView) } @@ -215,17 +242,7 @@ func (mgr *LeaderViewManager) getByFilter(filters ...LeaderViewFilter) []*Leader var result []*LeaderView for _, candidate := range candidates { - if hasChannelName { - if view, ok := candidate[channelName]; ok && mergedFilter(view) { - result = append(result, view) - } - } else { - for _, view := range candidate { - if mergedFilter(view) { - result = append(result, view) - } - } - } + result = append(result, candidate.Filter(criterion, filters...)...) } return result } diff --git a/internal/querycoordv2/meta/segment_dist_manager.go b/internal/querycoordv2/meta/segment_dist_manager.go index 1cf7b46f19..f8c37054df 100644 --- a/internal/querycoordv2/meta/segment_dist_manager.go +++ b/internal/querycoordv2/meta/segment_dist_manager.go @@ -27,9 +27,16 @@ import ( "github.com/milvus-io/milvus/pkg/util/typeutil" ) +type segDistCriterion struct { + nodes []int64 + collectionID int64 + channel string + hasOtherFilter bool +} + type SegmentDistFilter interface { Match(s *Segment) bool - NodeIDs() ([]int64, bool) + AddFilter(*segDistCriterion) } type SegmentDistFilterFunc func(s *Segment) bool @@ -38,8 +45,8 @@ func (f SegmentDistFilterFunc) Match(s *Segment) bool { return f(s) } -func (f SegmentDistFilterFunc) NodeIDs() ([]int64, bool) { - return nil, false +func (f SegmentDistFilterFunc) AddFilter(filter *segDistCriterion) { + filter.hasOtherFilter = true } type ReplicaSegDistFilter struct { @@ -50,8 +57,9 @@ func (f *ReplicaSegDistFilter) Match(s *Segment) bool { return f.GetCollectionID() == s.GetCollectionID() && f.Contains(s.Node) } -func (f *ReplicaSegDistFilter) NodeIDs() ([]int64, bool) { - return f.GetNodes(), true +func (f ReplicaSegDistFilter) AddFilter(filter *segDistCriterion) { + filter.nodes = f.GetNodes() + filter.collectionID = f.GetCollectionID() } func WithReplica(replica *Replica) SegmentDistFilter { @@ -66,8 +74,8 @@ func (f NodeSegDistFilter) Match(s *Segment) bool { return s.Node == int64(f) } -func (f NodeSegDistFilter) NodeIDs() ([]int64, bool) { - return []int64{int64(f)}, true +func (f NodeSegDistFilter) AddFilter(filter *segDistCriterion) { + filter.nodes = []int64{int64(f)} } func WithNodeID(nodeID int64) SegmentDistFilter { @@ -80,16 +88,32 @@ func WithSegmentID(segmentID int64) SegmentDistFilter { }) } +type CollectionSegDistFilter int64 + +func (f CollectionSegDistFilter) Match(s *Segment) bool { + return s.GetCollectionID() == int64(f) +} + +func (f CollectionSegDistFilter) AddFilter(filter *segDistCriterion) { + filter.collectionID = int64(f) +} + func WithCollectionID(collectionID typeutil.UniqueID) SegmentDistFilter { - return SegmentDistFilterFunc(func(s *Segment) bool { - return s.CollectionID == collectionID - }) + return CollectionSegDistFilter(collectionID) +} + +type ChannelSegDistFilter string + +func (f ChannelSegDistFilter) Match(s *Segment) bool { + return s.GetInsertChannel() == string(f) +} + +func (f ChannelSegDistFilter) AddFilter(filter *segDistCriterion) { + filter.channel = string(f) } func WithChannel(channelName string) SegmentDistFilter { - return SegmentDistFilterFunc(func(s *Segment) bool { - return s.GetInsertChannel() == channelName - }) + return ChannelSegDistFilter(channelName) } type Segment struct { @@ -118,12 +142,44 @@ type SegmentDistManager struct { rwmutex sync.RWMutex // nodeID -> []*Segment - segments map[typeutil.UniqueID][]*Segment + segments map[typeutil.UniqueID]nodeSegments +} + +type nodeSegments struct { + segments []*Segment + collSegments map[int64][]*Segment + channelSegments map[string][]*Segment +} + +func (s nodeSegments) Filter(criterion *segDistCriterion, filter func(*Segment) bool) []*Segment { + var segments []*Segment + switch { + case criterion.channel != "": + segments = s.channelSegments[criterion.channel] + case criterion.collectionID != 0: + segments = s.collSegments[criterion.collectionID] + default: + segments = s.segments + } + if criterion.hasOtherFilter { + segments = lo.Filter(segments, func(segment *Segment, _ int) bool { + return filter(segment) + }) + } + return segments +} + +func composeNodeSegments(segments []*Segment) nodeSegments { + return nodeSegments{ + segments: segments, + collSegments: lo.GroupBy(segments, func(segment *Segment) int64 { return segment.GetCollectionID() }), + channelSegments: lo.GroupBy(segments, func(segment *Segment) string { return segment.GetInsertChannel() }), + } } func NewSegmentDistManager() *SegmentDistManager { return &SegmentDistManager{ - segments: make(map[typeutil.UniqueID][]*Segment), + segments: make(map[typeutil.UniqueID]nodeSegments), } } @@ -134,7 +190,7 @@ func (m *SegmentDistManager) Update(nodeID typeutil.UniqueID, segments ...*Segme for _, segment := range segments { segment.Node = nodeID } - m.segments[nodeID] = segments + m.segments[nodeID] = composeNodeSegments(segments) } // GetByFilter return segment list which match all given filters @@ -142,14 +198,9 @@ func (m *SegmentDistManager) GetByFilter(filters ...SegmentDistFilter) []*Segmen m.rwmutex.RLock() defer m.rwmutex.RUnlock() - nodes := make(typeutil.Set[int64]) - var hasNodeIDs bool - + criterion := &segDistCriterion{} for _, filter := range filters { - if ids, ok := filter.NodeIDs(); ok { - nodes.Insert(ids...) - hasNodeIDs = true - } + filter.AddFilter(criterion) } mergedFilters := func(s *Segment) bool { @@ -161,22 +212,18 @@ func (m *SegmentDistManager) GetByFilter(filters ...SegmentDistFilter) []*Segmen return true } - var candidates [][]*Segment - if hasNodeIDs { - candidates = lo.Map(nodes.Collect(), func(nodeID int64, _ int) []*Segment { + var candidates []nodeSegments + if criterion.nodes != nil { + candidates = lo.Map(criterion.nodes, func(nodeID int64, _ int) nodeSegments { return m.segments[nodeID] }) } else { candidates = lo.Values(m.segments) } - ret := make([]*Segment, 0) - for _, segments := range candidates { - for _, segment := range segments { - if mergedFilters(segment) { - ret = append(ret, segment) - } - } + var ret []*Segment + for _, nodeSegments := range candidates { + ret = append(ret, nodeSegments.Filter(criterion, mergedFilters)...) } return ret } @@ -188,7 +235,7 @@ func (m *SegmentDistManager) GetSegmentDist(segmentID int64) []int64 { ret := make([]int64, 0) for nodeID, segments := range m.segments { - for _, segment := range segments { + for _, segment := range segments.segments { if segment.GetID() == segmentID { ret = append(ret, nodeID) break