diff --git a/internal/querynodev2/segments/manager.go b/internal/querynodev2/segments/manager.go index 4796a5f310..8ec23985e1 100644 --- a/internal/querynodev2/segments/manager.go +++ b/internal/querynodev2/segments/manager.go @@ -50,117 +50,6 @@ import ( // TODO maybe move to manager and change segment constructor var channelMapper = metautil.NewDynChannelMapper() -// SegmentFilter is the interface for segment selection criteria. -type SegmentFilter interface { - Filter(segment Segment) bool - SegmentType() (SegmentType, bool) - SegmentIDs() ([]int64, bool) -} - -// SegmentFilterFunc is a type wrapper for `func(Segment) bool` to SegmentFilter. -type SegmentFilterFunc func(segment Segment) bool - -func (f SegmentFilterFunc) Filter(segment Segment) bool { - return f(segment) -} - -func (f SegmentFilterFunc) SegmentType() (SegmentType, bool) { - return commonpb.SegmentState_SegmentStateNone, false -} - -func (s SegmentFilterFunc) SegmentIDs() ([]int64, bool) { - return nil, false -} - -// SegmentIDFilter is the specific segment filter for SegmentID only. -type SegmentIDFilter int64 - -func (f SegmentIDFilter) Filter(segment Segment) bool { - return segment.ID() == int64(f) -} - -func (f SegmentIDFilter) SegmentType() (SegmentType, bool) { - return commonpb.SegmentState_SegmentStateNone, false -} - -func (f SegmentIDFilter) SegmentIDs() ([]int64, bool) { - return []int64{int64(f)}, true -} - -type SegmentIDsFilter struct { - segmentIDs typeutil.Set[int64] -} - -func (f SegmentIDsFilter) Filter(segment Segment) bool { - return f.segmentIDs.Contain(segment.ID()) -} - -func (f SegmentIDsFilter) SegmentType() (SegmentType, bool) { - return commonpb.SegmentState_SegmentStateNone, false -} - -func (f SegmentIDsFilter) SegmentIDs() ([]int64, bool) { - return f.segmentIDs.Collect(), true -} - -type SegmentTypeFilter SegmentType - -func (f SegmentTypeFilter) Filter(segment Segment) bool { - return segment.Type() == SegmentType(f) -} - -func (f SegmentTypeFilter) SegmentType() (SegmentType, bool) { - return SegmentType(f), true -} - -func (f SegmentTypeFilter) SegmentIDs() ([]int64, bool) { - return nil, false -} - -func WithSkipEmpty() SegmentFilter { - return SegmentFilterFunc(func(segment Segment) bool { - return segment.InsertCount() > 0 - }) -} - -func WithPartition(partitionID typeutil.UniqueID) SegmentFilter { - return SegmentFilterFunc(func(segment Segment) bool { - return segment.Partition() == partitionID - }) -} - -func WithChannel(channel string) SegmentFilter { - ac, err := metautil.ParseChannel(channel, channelMapper) - if err != nil { - return SegmentFilterFunc(func(segment Segment) bool { - return false - }) - } - return SegmentFilterFunc(func(segment Segment) bool { - return segment.Shard().Equal(ac) - }) -} - -func WithType(typ SegmentType) SegmentFilter { - return SegmentTypeFilter(typ) -} - -func WithID(id int64) SegmentFilter { - return SegmentIDFilter(id) -} - -func WithIDs(ids ...int64) SegmentFilter { - return SegmentIDsFilter{ - segmentIDs: typeutil.NewSet(ids...), - } -} - -func WithLevel(level datapb.SegmentLevel) SegmentFilter { - return SegmentFilterFunc(func(segment Segment) bool { - return segment.Level() == level - }) -} - type SegmentAction func(segment Segment) bool func IncreaseVersion(version int64) SegmentAction { @@ -313,12 +202,156 @@ type SegmentManager interface { var _ SegmentManager = (*segmentManager)(nil) +type secondarySegmentIndex struct { + shardSegments map[metautil.Channel]segments +} + +func newSecondarySegmentIndex() secondarySegmentIndex { + return secondarySegmentIndex{ + shardSegments: make(map[metautil.Channel]segments), + } +} + +func (si secondarySegmentIndex) Put(ctx context.Context, segmentType SegmentType, segment Segment) { + shard := segment.Shard() + segments, ok := si.shardSegments[shard] + if !ok { + segments = newSegments() + si.shardSegments[shard] = segments + } + segments.Put(ctx, segmentType, segment) +} + +func (si secondarySegmentIndex) Remove(s Segment) { + shard := s.Shard() + segments, ok := si.shardSegments[shard] + if !ok { + return + } + segments.Remove(s) + if segments.Empty() { + delete(si.shardSegments, shard) + } +} + +type segments struct { + growingSegments map[typeutil.UniqueID]Segment + sealedSegments map[typeutil.UniqueID]Segment +} + +func (segments segments) Put(_ context.Context, segmentType SegmentType, segment Segment) { + switch segmentType { + case SegmentTypeGrowing: + segments.growingSegments[segment.ID()] = segment + case SegmentTypeSealed: + segments.sealedSegments[segment.ID()] = segment + } +} + +func (segments segments) Get(segmentID int64) (Segment, bool) { + if segment, ok := segments.growingSegments[segmentID]; ok { + return segment, true + } else if segment, ok = segments.sealedSegments[segmentID]; ok { + return segment, true + } + + return nil, false +} + +func (segments segments) GetWithType(segmentID int64, segmentType SegmentType) (Segment, bool) { + // var targetMap map[int64]Segment + var segment Segment + var ok bool + switch segmentType { + case SegmentTypeGrowing: + segment, ok = segments.growingSegments[segmentID] + case SegmentTypeSealed: + segment, ok = segments.sealedSegments[segmentID] + } + return segment, ok +} + +func (segments segments) RemoveWithType(segmentID int64, segmentType SegmentType) (Segment, bool) { + var segment Segment + var ok bool + switch segmentType { + case SegmentTypeGrowing: + segment, ok = segments.growingSegments[segmentID] + delete(segments.growingSegments, segmentID) + case SegmentTypeSealed: + segment, ok = segments.sealedSegments[segmentID] + delete(segments.sealedSegments, segmentID) + } + return segment, ok +} + +func (segments segments) Remove(segment Segment) { + switch segment.Type() { + case SegmentTypeGrowing: + delete(segments.growingSegments, segment.ID()) + case SegmentTypeSealed: + delete(segments.sealedSegments, segment.ID()) + } +} + +func (segments segments) RangeWithFilter(criterion *segmentCriterion, process func(id int64, segType SegmentType, segment Segment) bool) { + if criterion.segmentIDs != nil { + for id := range criterion.segmentIDs { + var segment Segment + var ok bool + if criterion.segmentType == commonpb.SegmentState_SegmentStateNone { + segment, ok = segments.Get(id) + } else { + segment, ok = segments.GetWithType(id, criterion.segmentType) + } + + if ok && criterion.Match(segment) { + if !process(id, segment.Type(), segment) { + return + } + } + } + return + } + + var candidates []map[int64]Segment + switch criterion.segmentType { + case SegmentTypeGrowing: + candidates = []map[int64]Segment{segments.growingSegments} + case SegmentTypeSealed: + candidates = []map[int64]Segment{segments.sealedSegments} + default: + candidates = []map[int64]Segment{segments.growingSegments, segments.sealedSegments} + } + + for _, candidate := range candidates { + for id, segment := range candidate { + if criterion.Match(segment) { + if !process(id, segment.Type(), segment) { + return + } + } + } + } +} + +func (segments segments) Empty() bool { + return len(segments.growingSegments) == 0 && len(segments.sealedSegments) == 0 +} + +func newSegments() segments { + return segments{ + growingSegments: make(map[int64]Segment), + sealedSegments: make(map[int64]Segment), + } +} + // Manager manages all collections and segments type segmentManager struct { mu sync.RWMutex // guards all - growingSegments map[typeutil.UniqueID]Segment - sealedSegments map[typeutil.UniqueID]Segment + globalSegments segments + secondaryIndex secondarySegmentIndex // releaseCallback is the callback function when a segment is released. releaseCallback func(s Segment) @@ -329,30 +362,27 @@ type segmentManager struct { func NewSegmentManager() *segmentManager { return &segmentManager{ - growingSegments: make(map[int64]Segment), - sealedSegments: make(map[int64]Segment), + globalSegments: newSegments(), + secondaryIndex: newSecondarySegmentIndex(), growingOnReleasingSegments: typeutil.NewUniqueSet(), sealedOnReleasingSegments: typeutil.NewUniqueSet(), } } +// put is the internal put method updating both global segments and secondary index. +func (mgr *segmentManager) put(ctx context.Context, segmentType SegmentType, segment Segment) { + mgr.globalSegments.Put(ctx, segmentType, segment) + mgr.secondaryIndex.Put(ctx, segmentType, segment) +} + func (mgr *segmentManager) Put(ctx context.Context, segmentType SegmentType, segments ...Segment) { var replacedSegment []Segment mgr.mu.Lock() defer mgr.mu.Unlock() - var targetMap map[int64]Segment - switch segmentType { - case SegmentTypeGrowing: - targetMap = mgr.growingSegments - case SegmentTypeSealed: - targetMap = mgr.sealedSegments - default: - panic("unexpected segment type") - } + log := log.Ctx(ctx) for _, segment := range segments { - oldSegment, ok := targetMap[segment.ID()] - + oldSegment, ok := mgr.globalSegments.GetWithType(segment.ID(), segmentType) if ok { if oldSegment.Version() >= segment.Version() { log.Warn("Invalid segment distribution changed, skip it", @@ -366,7 +396,8 @@ func (mgr *segmentManager) Put(ctx context.Context, segmentType SegmentType, seg } replacedSegment = append(replacedSegment, oldSegment) } - targetMap[segment.ID()] = segment + + mgr.put(ctx, segmentType, segment) eventlog.Record(eventlog.NewRawEvt(eventlog.Level_Info, fmt.Sprintf("Segment %d[%d] loaded", segment.ID(), segment.Collection()))) metrics.QueryNodeNumSegments.WithLabelValues( @@ -409,17 +440,18 @@ func (mgr *segmentManager) UpdateBy(action SegmentAction, filters ...SegmentFilt func (mgr *segmentManager) Exist(segmentID typeutil.UniqueID, typ SegmentType) bool { mgr.mu.RLock() defer mgr.mu.RUnlock() + + _, ok := mgr.globalSegments.GetWithType(segmentID, typ) + if ok { + return true + } switch typ { case SegmentTypeGrowing: - if _, ok := mgr.growingSegments[segmentID]; ok { - return true - } else if mgr.growingOnReleasingSegments.Contain(segmentID) { + if mgr.growingOnReleasingSegments.Contain(segmentID) { return true } case SegmentTypeSealed: - if _, ok := mgr.sealedSegments[segmentID]; ok { - return true - } else if mgr.sealedOnReleasingSegments.Contain(segmentID) { + if mgr.sealedOnReleasingSegments.Contain(segmentID) { return true } } @@ -431,27 +463,16 @@ func (mgr *segmentManager) Get(segmentID typeutil.UniqueID) Segment { mgr.mu.RLock() defer mgr.mu.RUnlock() - if segment, ok := mgr.growingSegments[segmentID]; ok { - return segment - } else if segment, ok = mgr.sealedSegments[segmentID]; ok { - return segment - } - - return nil + segment, _ := mgr.globalSegments.Get(segmentID) + return segment } func (mgr *segmentManager) GetWithType(segmentID typeutil.UniqueID, typ SegmentType) Segment { mgr.mu.RLock() defer mgr.mu.RUnlock() - switch typ { - case SegmentTypeSealed: - return mgr.sealedSegments[segmentID] - case SegmentTypeGrowing: - return mgr.growingSegments[segmentID] - default: - return nil - } + segment, _ := mgr.globalSegments.GetWithType(segmentID, typ) + return segment } func (mgr *segmentManager) GetBy(filters ...SegmentFilter) []Segment { @@ -512,36 +533,31 @@ func (mgr *segmentManager) GetAndPin(segments []int64, filters ...SegmentFilter) }() for _, id := range segments { - growing, growingExist := mgr.growingSegments[id] - sealed, sealedExist := mgr.sealedSegments[id] + // growing, growingExist := mgr.growingSegments[id] + // sealed, sealedExist := mgr.sealedSegments[id] + segment, ok := mgr.globalSegments.Get(id) - // L0 Segment should not be queryable. - if sealedExist && sealed.Level() == datapb.SegmentLevel_L0 { - continue - } - - growingExist = growingExist && filter(growing, filters...) - sealedExist = sealedExist && filter(sealed, filters...) - - if growingExist { - err = growing.PinIfNotReleased() - if err != nil { - return nil, err - } - lockedSegments = append(lockedSegments, growing) - } - if sealedExist { - err = sealed.PinIfNotReleased() - if err != nil { - return nil, err - } - lockedSegments = append(lockedSegments, sealed) - } - - if !growingExist && !sealedExist { + if !ok { err = merr.WrapErrSegmentNotLoaded(id, "segment not found") return nil, err } + + // L0 Segment should not be queryable. + if segment.Level() == datapb.SegmentLevel_L0 { + continue + } + + // growingExist = growingExist && filter(growing, filters...) + // sealedExist = sealedExist && filter(sealed, filters...) + if !filter(segment, filters...) { + continue + } + + err = segment.PinIfNotReleased() + if err != nil { + return nil, err + } + lockedSegments = append(lockedSegments, segment) } return lockedSegments, nil @@ -554,74 +570,26 @@ func (mgr *segmentManager) Unpin(segments []Segment) { } func (mgr *segmentManager) rangeWithFilter(process func(id int64, segType SegmentType, segment Segment) bool, filters ...SegmentFilter) { - var segType SegmentType - var hasSegType, hasSegIDs bool - segmentIDs := typeutil.NewSet[int64]() - - otherFilters := make([]SegmentFilter, 0, len(filters)) + criterion := &segmentCriterion{} for _, filter := range filters { - if sType, ok := filter.SegmentType(); ok { - segType = sType - hasSegType = true - continue - } - if segIDs, ok := filter.SegmentIDs(); ok { - hasSegIDs = true - segmentIDs.Insert(segIDs...) - continue - } - otherFilters = append(otherFilters, filter) + filter.AddFilter(criterion) } - mergedFilter := func(info Segment) bool { - for _, filter := range otherFilters { - if !filter.Filter(info) { - return false - } - } - return true - } - - var candidates map[SegmentType]map[int64]Segment - switch segType { - case SegmentTypeSealed: - candidates = map[SegmentType]map[int64]Segment{SegmentTypeSealed: mgr.sealedSegments} - case SegmentTypeGrowing: - candidates = map[SegmentType]map[int64]Segment{SegmentTypeGrowing: mgr.growingSegments} - default: - if !hasSegType { - candidates = map[SegmentType]map[int64]Segment{ - SegmentTypeSealed: mgr.sealedSegments, - SegmentTypeGrowing: mgr.growingSegments, - } + target := mgr.globalSegments + var ok bool + if !criterion.channel.IsZero() { + target, ok = mgr.secondaryIndex.shardSegments[criterion.channel] + if !ok { + return } } - for segType, candidate := range candidates { - if hasSegIDs { - for id := range segmentIDs { - segment, has := candidate[id] - if has && mergedFilter(segment) { - if !process(id, segType, segment) { - break - } - } - } - } else { - for id, segment := range candidate { - if mergedFilter(segment) { - if !process(id, segType, segment) { - break - } - } - } - } - } + target.RangeWithFilter(criterion, process) } func filter(segment Segment, filters ...SegmentFilter) bool { for _, filter := range filters { - if !filter.Filter(segment) { + if !filter.Match(segment) { return false } } @@ -629,32 +597,18 @@ func filter(segment Segment, filters ...SegmentFilter) bool { } func (mgr *segmentManager) GetSealed(segmentID typeutil.UniqueID) Segment { - mgr.mu.RLock() - defer mgr.mu.RUnlock() - - if segment, ok := mgr.sealedSegments[segmentID]; ok { - return segment - } - - return nil + return mgr.GetWithType(segmentID, SegmentTypeSealed) } func (mgr *segmentManager) GetGrowing(segmentID typeutil.UniqueID) Segment { - mgr.mu.RLock() - defer mgr.mu.RUnlock() - - if segment, ok := mgr.growingSegments[segmentID]; ok { - return segment - } - - return nil + return mgr.GetWithType(segmentID, SegmentTypeGrowing) } func (mgr *segmentManager) Empty() bool { mgr.mu.RLock() defer mgr.mu.RUnlock() - return len(mgr.growingSegments)+len(mgr.sealedSegments) == 0 + return len(mgr.globalSegments.growingSegments)+len(mgr.globalSegments.sealedSegments) == 0 } // returns true if the segment exists, @@ -703,27 +657,21 @@ func (mgr *segmentManager) Remove(ctx context.Context, segmentID typeutil.Unique } func (mgr *segmentManager) removeSegmentWithType(typ SegmentType, segmentID typeutil.UniqueID) Segment { - switch typ { - case SegmentTypeGrowing: - s, ok := mgr.growingSegments[segmentID] - if ok { - delete(mgr.growingSegments, segmentID) - mgr.growingOnReleasingSegments.Insert(segmentID) - return s - } - - case SegmentTypeSealed: - s, ok := mgr.sealedSegments[segmentID] - if ok { - delete(mgr.sealedSegments, segmentID) - mgr.sealedOnReleasingSegments.Insert(segmentID) - return s - } - default: + segment, ok := mgr.globalSegments.RemoveWithType(segmentID, typ) + if !ok { return nil } - return nil + switch typ { + case SegmentTypeGrowing: + mgr.growingOnReleasingSegments.Insert(segmentID) + case SegmentTypeSealed: + mgr.sealedOnReleasingSegments.Insert(segmentID) + } + + mgr.secondaryIndex.Remove(segment) + + return segment } func (mgr *segmentManager) RemoveBy(ctx context.Context, filters ...SegmentFilter) (int, int) { @@ -757,17 +705,17 @@ func (mgr *segmentManager) RemoveBy(ctx context.Context, filters ...SegmentFilte func (mgr *segmentManager) Clear(ctx context.Context) { mgr.mu.Lock() - for id := range mgr.growingSegments { + for id := range mgr.globalSegments.growingSegments { mgr.growingOnReleasingSegments.Insert(id) } - growingWaitForRelease := mgr.growingSegments - mgr.growingSegments = make(map[int64]Segment) + growingWaitForRelease := mgr.globalSegments.growingSegments - for id := range mgr.sealedSegments { + for id := range mgr.globalSegments.sealedSegments { mgr.sealedOnReleasingSegments.Insert(id) } - sealedWaitForRelease := mgr.sealedSegments - mgr.sealedSegments = make(map[int64]Segment) + sealedWaitForRelease := mgr.globalSegments.sealedSegments + mgr.globalSegments = newSegments() + mgr.secondaryIndex = newSecondarySegmentIndex() mgr.updateMetric() mgr.mu.Unlock() @@ -787,21 +735,21 @@ func (mgr *segmentManager) registerReleaseCallback(callback func(s Segment)) { func (mgr *segmentManager) updateMetric() { // update collection and partiation metric - collections, partiations := make(typeutil.Set[int64]), make(typeutil.Set[int64]) - for _, seg := range mgr.growingSegments { + collections, partitions := make(typeutil.Set[int64]), make(typeutil.Set[int64]) + for _, seg := range mgr.globalSegments.growingSegments { collections.Insert(seg.Collection()) if seg.Partition() != common.AllPartitionsID { - partiations.Insert(seg.Partition()) + partitions.Insert(seg.Partition()) } } - for _, seg := range mgr.sealedSegments { + for _, seg := range mgr.globalSegments.sealedSegments { collections.Insert(seg.Collection()) if seg.Partition() != common.AllPartitionsID { - partiations.Insert(seg.Partition()) + partitions.Insert(seg.Partition()) } } metrics.QueryNodeNumCollections.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Set(float64(collections.Len())) - metrics.QueryNodeNumPartitions.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Set(float64(partiations.Len())) + metrics.QueryNodeNumPartitions.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Set(float64(partitions.Len())) } func (mgr *segmentManager) release(ctx context.Context, segment Segment) { diff --git a/internal/querynodev2/segments/segment_filter.go b/internal/querynodev2/segments/segment_filter.go new file mode 100644 index 0000000000..51a334f116 --- /dev/null +++ b/internal/querynodev2/segments/segment_filter.go @@ -0,0 +1,142 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package segments + +import ( + "github.com/milvus-io/milvus/internal/proto/datapb" + "github.com/milvus-io/milvus/pkg/util/metautil" + "github.com/milvus-io/milvus/pkg/util/typeutil" +) + +// segmentCriterion is the segment filter criterion obj. +type segmentCriterion struct { + segmentIDs typeutil.Set[int64] + channel metautil.Channel + segmentType SegmentType + others []SegmentFilter +} + +func (c *segmentCriterion) Match(segment Segment) bool { + for _, filter := range c.others { + if !filter.Match(segment) { + return false + } + } + return true +} + +// SegmentFilter is the interface for segment selection criteria. +type SegmentFilter interface { + Match(segment Segment) bool + AddFilter(*segmentCriterion) +} + +// SegmentFilterFunc is a type wrapper for `func(Segment) bool` to SegmentFilter. +type SegmentFilterFunc func(segment Segment) bool + +func (f SegmentFilterFunc) Match(segment Segment) bool { + return f(segment) +} + +func (f SegmentFilterFunc) AddFilter(c *segmentCriterion) { + c.others = append(c.others, f) +} + +// SegmentIDFilter is the specific segment filter for SegmentID only. +type SegmentIDFilter int64 + +func (f SegmentIDFilter) Match(segment Segment) bool { + return segment.ID() == int64(f) +} + +func (f SegmentIDFilter) AddFilter(c *segmentCriterion) { + if c.segmentIDs == nil { + c.segmentIDs = typeutil.NewSet(int64(f)) + return + } + c.segmentIDs = c.segmentIDs.Intersection(typeutil.NewSet(int64(f))) +} + +type SegmentIDsFilter struct { + segmentIDs typeutil.Set[int64] +} + +func (f SegmentIDsFilter) Match(segment Segment) bool { + return f.segmentIDs.Contain(segment.ID()) +} + +func (f SegmentIDsFilter) AddFilter(c *segmentCriterion) { + if c.segmentIDs == nil { + c.segmentIDs = f.segmentIDs + return + } + c.segmentIDs = c.segmentIDs.Intersection(f.segmentIDs) +} + +type SegmentTypeFilter SegmentType + +func (f SegmentTypeFilter) Match(segment Segment) bool { + return segment.Type() == SegmentType(f) +} + +func (f SegmentTypeFilter) AddFilter(c *segmentCriterion) { + c.segmentType = SegmentType(f) +} + +func WithSkipEmpty() SegmentFilter { + return SegmentFilterFunc(func(segment Segment) bool { + return segment.InsertCount() > 0 + }) +} + +func WithPartition(partitionID typeutil.UniqueID) SegmentFilter { + return SegmentFilterFunc(func(segment Segment) bool { + return segment.Partition() == partitionID + }) +} + +func WithChannel(channel string) SegmentFilter { + ac, err := metautil.ParseChannel(channel, channelMapper) + if err != nil { + return SegmentFilterFunc(func(segment Segment) bool { + return false + }) + } + return SegmentFilterFunc(func(segment Segment) bool { + return segment.Shard().Equal(ac) + }) +} + +func WithType(typ SegmentType) SegmentFilter { + return SegmentTypeFilter(typ) +} + +func WithID(id int64) SegmentFilter { + return SegmentIDFilter(id) +} + +func WithIDs(ids ...int64) SegmentFilter { + return SegmentIDsFilter{ + segmentIDs: typeutil.NewSet(ids...), + } +} + +func WithLevel(level datapb.SegmentLevel) SegmentFilter { + return SegmentFilterFunc(func(segment Segment) bool { + return segment.Level() == level + }) +} diff --git a/pkg/util/metautil/channel.go b/pkg/util/metautil/channel.go index 303f8c599a..361b9ad638 100644 --- a/pkg/util/metautil/channel.go +++ b/pkg/util/metautil/channel.go @@ -118,6 +118,10 @@ func (c Channel) EqualString(str string) bool { return c.Equal(ac) } +func (c Channel) IsZero() bool { + return c.ChannelMapper == nil +} + func ParseChannel(virtualName string, mapper ChannelMapper) (Channel, error) { if !channelNameFormat.MatchString(virtualName) { return Channel{}, merr.WrapErrParameterInvalidMsg("virtual channel name(%s) is not valid", virtualName)