mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-07 01:28:27 +08:00
fix: Check exclude segment before add new growing segment (#31803)
issue: #31479 #31797 milvus will add released segment to excluded info, and filter out it's stream data in filter_node. but for data buffered in insert_node's channel, if it belongs to growing segment which already be released, then it will all the growing segment back again. This PR maintain `excluded segments` in delegator, and check excluded segment before new growing segment. --------- Signed-off-by: Wei Liu <wei.liu@zilliz.com>
This commit is contained in:
parent
f3f2a5a7e9
commit
df208d538c
@ -79,6 +79,11 @@ type ShardDelegator interface {
|
|||||||
SyncTargetVersion(newVersion int64, growingInTarget []int64, sealedInTarget []int64, droppedInTarget []int64, checkpoint *msgpb.MsgPosition)
|
SyncTargetVersion(newVersion int64, growingInTarget []int64, sealedInTarget []int64, droppedInTarget []int64, checkpoint *msgpb.MsgPosition)
|
||||||
GetTargetVersion() int64
|
GetTargetVersion() int64
|
||||||
|
|
||||||
|
// manage exclude segments
|
||||||
|
AddExcludedSegments(excludeInfo map[int64]uint64)
|
||||||
|
VerifyExcludedSegments(segmentID int64, ts uint64) bool
|
||||||
|
TryCleanExcludedSegments(ts uint64)
|
||||||
|
|
||||||
// control
|
// control
|
||||||
Serviceable() bool
|
Serviceable() bool
|
||||||
Start()
|
Start()
|
||||||
@ -121,6 +126,11 @@ type shardDelegator struct {
|
|||||||
queryHook optimizers.QueryHook
|
queryHook optimizers.QueryHook
|
||||||
partitionStats map[UniqueID]*storage.PartitionStatsSnapshot
|
partitionStats map[UniqueID]*storage.PartitionStatsSnapshot
|
||||||
chunkManager storage.ChunkManager
|
chunkManager storage.ChunkManager
|
||||||
|
|
||||||
|
excludedSegments *ExcludedSegments
|
||||||
|
// cause growing segment meta has been stored in segmentManager/distribution/pkOracle/excludeSegments
|
||||||
|
// in order to make add/remove growing be atomic, need lock before modify these meta info
|
||||||
|
growingSegmentLock sync.RWMutex
|
||||||
}
|
}
|
||||||
|
|
||||||
// getLogger returns the zap logger with pre-defined shard attributes.
|
// getLogger returns the zap logger with pre-defined shard attributes.
|
||||||
@ -836,6 +846,8 @@ func NewShardDelegator(ctx context.Context, collectionID UniqueID, replicaID Uni
|
|||||||
sizePerBlock := paramtable.Get().QueryNodeCfg.DeleteBufferBlockSize.GetAsInt64()
|
sizePerBlock := paramtable.Get().QueryNodeCfg.DeleteBufferBlockSize.GetAsInt64()
|
||||||
log.Info("Init delete cache with list delete buffer", zap.Int64("sizePerBlock", sizePerBlock), zap.Time("startTime", tsoutil.PhysicalTime(startTs)))
|
log.Info("Init delete cache with list delete buffer", zap.Int64("sizePerBlock", sizePerBlock), zap.Time("startTime", tsoutil.PhysicalTime(startTs)))
|
||||||
|
|
||||||
|
excludedSegments := NewExcludedSegments(paramtable.Get().QueryNodeCfg.CleanExcludeSegInterval.GetAsDuration(time.Second))
|
||||||
|
|
||||||
sd := &shardDelegator{
|
sd := &shardDelegator{
|
||||||
collectionID: collectionID,
|
collectionID: collectionID,
|
||||||
replicaID: replicaID,
|
replicaID: replicaID,
|
||||||
@ -856,6 +868,7 @@ func NewShardDelegator(ctx context.Context, collectionID UniqueID, replicaID Uni
|
|||||||
queryHook: queryHook,
|
queryHook: queryHook,
|
||||||
chunkManager: chunkManager,
|
chunkManager: chunkManager,
|
||||||
partitionStats: make(map[UniqueID]*storage.PartitionStatsSnapshot),
|
partitionStats: make(map[UniqueID]*storage.PartitionStatsSnapshot),
|
||||||
|
excludedSegments: excludedSegments,
|
||||||
}
|
}
|
||||||
m := sync.Mutex{}
|
m := sync.Mutex{}
|
||||||
sd.tsCond = sync.NewCond(&m)
|
sd.tsCond = sync.NewCond(&m)
|
||||||
|
|||||||
@ -86,6 +86,7 @@ func (sd *shardDelegator) ProcessInsert(insertRecords map[int64]*InsertData) {
|
|||||||
log := sd.getLogger(context.Background())
|
log := sd.getLogger(context.Background())
|
||||||
for segmentID, insertData := range insertRecords {
|
for segmentID, insertData := range insertRecords {
|
||||||
growing := sd.segmentManager.GetGrowing(segmentID)
|
growing := sd.segmentManager.GetGrowing(segmentID)
|
||||||
|
newGrowingSegment := false
|
||||||
if growing == nil {
|
if growing == nil {
|
||||||
var err error
|
var err error
|
||||||
// TODO: It's a wired implementation that growing segment have load info.
|
// TODO: It's a wired implementation that growing segment have load info.
|
||||||
@ -111,6 +112,7 @@ func (sd *shardDelegator) ProcessInsert(insertRecords map[int64]*InsertData) {
|
|||||||
zap.Error(err))
|
zap.Error(err))
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
|
newGrowingSegment = true
|
||||||
}
|
}
|
||||||
|
|
||||||
err := growing.Insert(context.Background(), insertData.RowIDs, insertData.Timestamps, insertData.InsertRecord)
|
err := growing.Insert(context.Background(), insertData.RowIDs, insertData.Timestamps, insertData.InsertRecord)
|
||||||
@ -136,6 +138,16 @@ func (sd *shardDelegator) ProcessInsert(insertRecords map[int64]*InsertData) {
|
|||||||
).Add(float64(len(insertData.RowIDs)))
|
).Add(float64(len(insertData.RowIDs)))
|
||||||
growing.UpdateBloomFilter(insertData.PrimaryKeys)
|
growing.UpdateBloomFilter(insertData.PrimaryKeys)
|
||||||
|
|
||||||
|
if newGrowingSegment {
|
||||||
|
sd.growingSegmentLock.Lock()
|
||||||
|
// check whether segment has been excluded
|
||||||
|
if ok := sd.VerifyExcludedSegments(segmentID, typeutil.MaxTimestamp); !ok {
|
||||||
|
log.Warn("try to insert data into released segment, skip it", zap.Int64("segmentID", segmentID))
|
||||||
|
sd.growingSegmentLock.Unlock()
|
||||||
|
growing.Release()
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
if !sd.pkOracle.Exists(growing, paramtable.GetNodeID()) {
|
if !sd.pkOracle.Exists(growing, paramtable.GetNodeID()) {
|
||||||
// register created growing segment after insert, avoid to add empty growing to delegator
|
// register created growing segment after insert, avoid to add empty growing to delegator
|
||||||
sd.pkOracle.Register(growing, paramtable.GetNodeID())
|
sd.pkOracle.Register(growing, paramtable.GetNodeID())
|
||||||
@ -148,6 +160,8 @@ func (sd *shardDelegator) ProcessInsert(insertRecords map[int64]*InsertData) {
|
|||||||
TargetVersion: initialTargetVersion,
|
TargetVersion: initialTargetVersion,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
sd.growingSegmentLock.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
log.Debug("insert into growing segment",
|
log.Debug("insert into growing segment",
|
||||||
zap.Int64("collectionID", growing.Collection()),
|
zap.Int64("collectionID", growing.Collection()),
|
||||||
@ -819,6 +833,16 @@ func (sd *shardDelegator) ReleaseSegments(ctx context.Context, req *querypb.Rele
|
|||||||
sealed = lo.Map(req.GetSegmentIDs(), convertSealed)
|
sealed = lo.Map(req.GetSegmentIDs(), convertSealed)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if len(growing) > 0 {
|
||||||
|
sd.growingSegmentLock.Lock()
|
||||||
|
}
|
||||||
|
// when we try to release a segment, add it to pipeline's exclude list first
|
||||||
|
// in case of consumed it's growing segment again
|
||||||
|
droppedInfos := lo.SliceToMap(req.GetSegmentIDs(), func(id int64) (int64, uint64) {
|
||||||
|
return id, typeutil.MaxTimestamp
|
||||||
|
})
|
||||||
|
sd.AddExcludedSegments(droppedInfos)
|
||||||
|
|
||||||
signal := sd.distribution.RemoveDistributions(sealed, growing)
|
signal := sd.distribution.RemoveDistributions(sealed, growing)
|
||||||
// wait cleared signal
|
// wait cleared signal
|
||||||
<-signal
|
<-signal
|
||||||
@ -836,22 +860,26 @@ func (sd *shardDelegator) ReleaseSegments(ctx context.Context, req *querypb.Rele
|
|||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var releaseErr error
|
||||||
if !force {
|
if !force {
|
||||||
worker, err := sd.workerManager.GetWorker(ctx, targetNodeID)
|
worker, err := sd.workerManager.GetWorker(ctx, targetNodeID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Warn("delegator failed to find worker",
|
log.Warn("delegator failed to find worker", zap.Error(err))
|
||||||
zap.Error(err),
|
releaseErr = err
|
||||||
)
|
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
req.Base.TargetID = targetNodeID
|
req.Base.TargetID = targetNodeID
|
||||||
err = worker.ReleaseSegments(ctx, req)
|
err = worker.ReleaseSegments(ctx, req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Warn("worker failed to release segments",
|
log.Warn("worker failed to release segments", zap.Error(err))
|
||||||
zap.Error(err),
|
releaseErr = err
|
||||||
)
|
|
||||||
}
|
}
|
||||||
return err
|
}
|
||||||
|
if len(growing) > 0 {
|
||||||
|
sd.growingSegmentLock.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
if releaseErr != nil {
|
||||||
|
return releaseErr
|
||||||
}
|
}
|
||||||
|
|
||||||
if hasLevel0 {
|
if hasLevel0 {
|
||||||
@ -907,3 +935,17 @@ func (sd *shardDelegator) SyncTargetVersion(newVersion int64, growingInTarget []
|
|||||||
func (sd *shardDelegator) GetTargetVersion() int64 {
|
func (sd *shardDelegator) GetTargetVersion() int64 {
|
||||||
return sd.distribution.getTargetVersion()
|
return sd.distribution.getTargetVersion()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (sd *shardDelegator) AddExcludedSegments(excludeInfo map[int64]uint64) {
|
||||||
|
sd.excludedSegments.Insert(excludeInfo)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (sd *shardDelegator) VerifyExcludedSegments(segmentID int64, ts uint64) bool {
|
||||||
|
return sd.excludedSegments.Verify(segmentID, ts)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (sd *shardDelegator) TryCleanExcludedSegments(ts uint64) {
|
||||||
|
if sd.excludedSegments.ShouldClean() {
|
||||||
|
sd.excludedSegments.CleanInvalid(ts)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
@ -21,6 +21,7 @@ import (
|
|||||||
"path"
|
"path"
|
||||||
"strconv"
|
"strconv"
|
||||||
"testing"
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
bloom "github.com/bits-and-blooms/bloom/v3"
|
bloom "github.com/bits-and-blooms/bloom/v3"
|
||||||
"github.com/cockroachdb/errors"
|
"github.com/cockroachdb/errors"
|
||||||
@ -69,6 +70,11 @@ type DelegatorDataSuite struct {
|
|||||||
func (s *DelegatorDataSuite) SetupSuite() {
|
func (s *DelegatorDataSuite) SetupSuite() {
|
||||||
paramtable.Init()
|
paramtable.Init()
|
||||||
paramtable.SetNodeID(1)
|
paramtable.SetNodeID(1)
|
||||||
|
paramtable.Get().Save(paramtable.Get().QueryNodeCfg.CleanExcludeSegInterval.Key, "1")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *DelegatorDataSuite) TearDownSuite() {
|
||||||
|
paramtable.Get().Reset(paramtable.Get().QueryNodeCfg.CleanExcludeSegInterval.Key)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *DelegatorDataSuite) SetupTest() {
|
func (s *DelegatorDataSuite) SetupTest() {
|
||||||
@ -1147,6 +1153,20 @@ func (s *DelegatorDataSuite) TestReadDeleteFromMsgstream() {
|
|||||||
s.Equal(2, len(result.Pks))
|
s.Equal(2, len(result.Pks))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *DelegatorDataSuite) TestDelegatorData_ExcludeSegments() {
|
||||||
|
s.delegator.AddExcludedSegments(map[int64]uint64{
|
||||||
|
1: 3,
|
||||||
|
})
|
||||||
|
|
||||||
|
s.False(s.delegator.VerifyExcludedSegments(1, 1))
|
||||||
|
s.True(s.delegator.VerifyExcludedSegments(1, 5))
|
||||||
|
|
||||||
|
time.Sleep(time.Second * 1)
|
||||||
|
s.delegator.TryCleanExcludedSegments(4)
|
||||||
|
s.True(s.delegator.VerifyExcludedSegments(1, 1))
|
||||||
|
s.True(s.delegator.VerifyExcludedSegments(1, 5))
|
||||||
|
}
|
||||||
|
|
||||||
func TestDelegatorDataSuite(t *testing.T) {
|
func TestDelegatorDataSuite(t *testing.T) {
|
||||||
suite.Run(t, new(DelegatorDataSuite))
|
suite.Run(t, new(DelegatorDataSuite))
|
||||||
}
|
}
|
||||||
|
|||||||
88
internal/querynodev2/delegator/exclude_info.go
Normal file
88
internal/querynodev2/delegator/exclude_info.go
Normal file
@ -0,0 +1,88 @@
|
|||||||
|
// Licensed to the LF AI & Data foundation under one
|
||||||
|
// or more contributor license agreements. See the NOTICE file
|
||||||
|
// distributed with this work for additional information
|
||||||
|
// regarding copyright ownership. The ASF licenses this file
|
||||||
|
// to you under the Apache License, Version 2.0 (the
|
||||||
|
// "License"); you may not use this file except in compliance
|
||||||
|
// with the License. You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package delegator
|
||||||
|
|
||||||
|
import (
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"go.uber.org/atomic"
|
||||||
|
"go.uber.org/zap"
|
||||||
|
|
||||||
|
"github.com/milvus-io/milvus/pkg/log"
|
||||||
|
)
|
||||||
|
|
||||||
|
type ExcludedSegments struct {
|
||||||
|
mu sync.RWMutex
|
||||||
|
segments map[int64]uint64 // segmentID -> Excluded TS
|
||||||
|
lastClean atomic.Time
|
||||||
|
cleanInterval time.Duration
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewExcludedSegments(cleanInterval time.Duration) *ExcludedSegments {
|
||||||
|
return &ExcludedSegments{
|
||||||
|
segments: make(map[int64]uint64),
|
||||||
|
cleanInterval: cleanInterval,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *ExcludedSegments) Insert(excludeInfo map[int64]uint64) {
|
||||||
|
s.mu.Lock()
|
||||||
|
defer s.mu.Unlock()
|
||||||
|
|
||||||
|
for segmentID, ts := range excludeInfo {
|
||||||
|
log.Debug("add exclude info",
|
||||||
|
zap.Int64("segmentID", segmentID),
|
||||||
|
zap.Uint64("ts", ts),
|
||||||
|
)
|
||||||
|
s.segments[segmentID] = ts
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// return false if segment has been excluded
|
||||||
|
func (s *ExcludedSegments) Verify(segmentID int64, ts uint64) bool {
|
||||||
|
s.mu.RLock()
|
||||||
|
defer s.mu.RUnlock()
|
||||||
|
if excludeTs, ok := s.segments[segmentID]; ok && ts <= excludeTs {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *ExcludedSegments) CleanInvalid(ts uint64) {
|
||||||
|
s.mu.Lock()
|
||||||
|
defer s.mu.Unlock()
|
||||||
|
|
||||||
|
invalidExcludedInfos := []int64{}
|
||||||
|
for segmentsID, excludeTs := range s.segments {
|
||||||
|
if excludeTs < ts {
|
||||||
|
invalidExcludedInfos = append(invalidExcludedInfos, segmentsID)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, segmentID := range invalidExcludedInfos {
|
||||||
|
delete(s.segments, segmentID)
|
||||||
|
log.Info("remove segment from exclude info", zap.Int64("segmentID", segmentID))
|
||||||
|
}
|
||||||
|
s.lastClean.Store(time.Now())
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *ExcludedSegments) ShouldClean() bool {
|
||||||
|
s.mu.RLock()
|
||||||
|
defer s.mu.RUnlock()
|
||||||
|
return time.Since(s.lastClean.Load()) > s.cleanInterval
|
||||||
|
}
|
||||||
56
internal/querynodev2/delegator/exclude_info_test.go
Normal file
56
internal/querynodev2/delegator/exclude_info_test.go
Normal file
@ -0,0 +1,56 @@
|
|||||||
|
// Licensed to the LF AI & Data foundation under one
|
||||||
|
// or more contributor license agreements. See the NOTICE file
|
||||||
|
// distributed with this work for additional information
|
||||||
|
// regarding copyright ownership. The ASF licenses this file
|
||||||
|
// to you under the Apache License, Version 2.0 (the
|
||||||
|
// "License"); you may not use this file except in compliance
|
||||||
|
// with the License. You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package delegator
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/suite"
|
||||||
|
)
|
||||||
|
|
||||||
|
type ExcludedInfoSuite struct {
|
||||||
|
suite.Suite
|
||||||
|
|
||||||
|
excludedSegments ExcludedSegments
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *ExcludedInfoSuite) SetupSuite() {
|
||||||
|
s.excludedSegments = *NewExcludedSegments(1 * time.Second)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *ExcludedInfoSuite) TestBasic() {
|
||||||
|
s.excludedSegments.Insert(map[int64]uint64{
|
||||||
|
1: 3,
|
||||||
|
})
|
||||||
|
|
||||||
|
s.False(s.excludedSegments.Verify(1, 1))
|
||||||
|
s.True(s.excludedSegments.Verify(1, 4))
|
||||||
|
|
||||||
|
time.Sleep(1 * time.Second)
|
||||||
|
|
||||||
|
s.True(s.excludedSegments.ShouldClean())
|
||||||
|
s.excludedSegments.CleanInvalid(5)
|
||||||
|
s.Len(s.excludedSegments.segments, 0)
|
||||||
|
|
||||||
|
s.True(s.excludedSegments.Verify(1, 1))
|
||||||
|
s.True(s.excludedSegments.Verify(1, 4))
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestExcludedInfoSuite(t *testing.T) {
|
||||||
|
suite.Run(t, new(ExcludedInfoSuite))
|
||||||
|
}
|
||||||
@ -253,6 +253,39 @@ func (_c *MockShardDelegator_GetTargetVersion_Call) RunAndReturn(run func() int6
|
|||||||
return _c
|
return _c
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// AddExcludedSegments provides a mock function with given fields: excludeInfo
|
||||||
|
func (_m *MockShardDelegator) AddExcludedSegments(excludeInfo map[int64]uint64) {
|
||||||
|
_m.Called(excludeInfo)
|
||||||
|
}
|
||||||
|
|
||||||
|
// MockShardDelegator_AddExcludedSegments_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'AddExcludedSegments'
|
||||||
|
type MockShardDelegator_AddExcludedSegments_Call struct {
|
||||||
|
*mock.Call
|
||||||
|
}
|
||||||
|
|
||||||
|
// AddExcludedSegments is a helper method to define mock.On call
|
||||||
|
// - excludeInfo map[int64]uint64
|
||||||
|
func (_e *MockShardDelegator_Expecter) AddExcludedSegments(excludeInfo interface{}) *MockShardDelegator_AddExcludedSegments_Call {
|
||||||
|
return &MockShardDelegator_AddExcludedSegments_Call{Call: _e.mock.On("AddExcludedSegments", excludeInfo)}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (_c *MockShardDelegator_AddExcludedSegments_Call) Run(run func(excludeInfo map[int64]uint64)) *MockShardDelegator_AddExcludedSegments_Call {
|
||||||
|
_c.Call.Run(func(args mock.Arguments) {
|
||||||
|
run(args[0].(map[int64]uint64))
|
||||||
|
})
|
||||||
|
return _c
|
||||||
|
}
|
||||||
|
|
||||||
|
func (_c *MockShardDelegator_AddExcludedSegments_Call) Return() *MockShardDelegator_AddExcludedSegments_Call {
|
||||||
|
_c.Call.Return()
|
||||||
|
return _c
|
||||||
|
}
|
||||||
|
|
||||||
|
func (_c *MockShardDelegator_AddExcludedSegments_Call) RunAndReturn(run func(map[int64]uint64)) *MockShardDelegator_AddExcludedSegments_Call {
|
||||||
|
_c.Call.Return(run)
|
||||||
|
return _c
|
||||||
|
}
|
||||||
|
|
||||||
// LoadGrowing provides a mock function with given fields: ctx, infos, version
|
// LoadGrowing provides a mock function with given fields: ctx, infos, version
|
||||||
func (_m *MockShardDelegator) LoadGrowing(ctx context.Context, infos []*querypb.SegmentLoadInfo, version int64) error {
|
func (_m *MockShardDelegator) LoadGrowing(ctx context.Context, infos []*querypb.SegmentLoadInfo, version int64) error {
|
||||||
ret := _m.Called(ctx, infos, version)
|
ret := _m.Called(ctx, infos, version)
|
||||||
@ -763,6 +796,82 @@ func (_c *MockShardDelegator_SyncTargetVersion_Call) RunAndReturn(run func(int64
|
|||||||
return _c
|
return _c
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TryCleanExcludedSegments provides a mock function with given fields: ts
|
||||||
|
func (_m *MockShardDelegator) TryCleanExcludedSegments(ts uint64) {
|
||||||
|
_m.Called(ts)
|
||||||
|
}
|
||||||
|
|
||||||
|
// MockShardDelegator_TryCleanExcludedSegments_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'TryCleanExcludedSegments'
|
||||||
|
type MockShardDelegator_TryCleanExcludedSegments_Call struct {
|
||||||
|
*mock.Call
|
||||||
|
}
|
||||||
|
|
||||||
|
// TryCleanExcludedSegments is a helper method to define mock.On call
|
||||||
|
// - ts uint64
|
||||||
|
func (_e *MockShardDelegator_Expecter) TryCleanExcludedSegments(ts interface{}) *MockShardDelegator_TryCleanExcludedSegments_Call {
|
||||||
|
return &MockShardDelegator_TryCleanExcludedSegments_Call{Call: _e.mock.On("TryCleanExcludedSegments", ts)}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (_c *MockShardDelegator_TryCleanExcludedSegments_Call) Run(run func(ts uint64)) *MockShardDelegator_TryCleanExcludedSegments_Call {
|
||||||
|
_c.Call.Run(func(args mock.Arguments) {
|
||||||
|
run(args[0].(uint64))
|
||||||
|
})
|
||||||
|
return _c
|
||||||
|
}
|
||||||
|
|
||||||
|
func (_c *MockShardDelegator_TryCleanExcludedSegments_Call) Return() *MockShardDelegator_TryCleanExcludedSegments_Call {
|
||||||
|
_c.Call.Return()
|
||||||
|
return _c
|
||||||
|
}
|
||||||
|
|
||||||
|
func (_c *MockShardDelegator_TryCleanExcludedSegments_Call) RunAndReturn(run func(uint64)) *MockShardDelegator_TryCleanExcludedSegments_Call {
|
||||||
|
_c.Call.Return(run)
|
||||||
|
return _c
|
||||||
|
}
|
||||||
|
|
||||||
|
// VerifyExcludedSegments provides a mock function with given fields: segmentID, ts
|
||||||
|
func (_m *MockShardDelegator) VerifyExcludedSegments(segmentID int64, ts uint64) bool {
|
||||||
|
ret := _m.Called(segmentID, ts)
|
||||||
|
|
||||||
|
var r0 bool
|
||||||
|
if rf, ok := ret.Get(0).(func(int64, uint64) bool); ok {
|
||||||
|
r0 = rf(segmentID, ts)
|
||||||
|
} else {
|
||||||
|
r0 = ret.Get(0).(bool)
|
||||||
|
}
|
||||||
|
|
||||||
|
return r0
|
||||||
|
}
|
||||||
|
|
||||||
|
// MockShardDelegator_VerifyExcludedSegments_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'VerifyExcludedSegments'
|
||||||
|
type MockShardDelegator_VerifyExcludedSegments_Call struct {
|
||||||
|
*mock.Call
|
||||||
|
}
|
||||||
|
|
||||||
|
// VerifyExcludedSegments is a helper method to define mock.On call
|
||||||
|
// - segmentID int64
|
||||||
|
// - ts uint64
|
||||||
|
func (_e *MockShardDelegator_Expecter) VerifyExcludedSegments(segmentID interface{}, ts interface{}) *MockShardDelegator_VerifyExcludedSegments_Call {
|
||||||
|
return &MockShardDelegator_VerifyExcludedSegments_Call{Call: _e.mock.On("VerifyExcludedSegments", segmentID, ts)}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (_c *MockShardDelegator_VerifyExcludedSegments_Call) Run(run func(segmentID int64, ts uint64)) *MockShardDelegator_VerifyExcludedSegments_Call {
|
||||||
|
_c.Call.Run(func(args mock.Arguments) {
|
||||||
|
run(args[0].(int64), args[1].(uint64))
|
||||||
|
})
|
||||||
|
return _c
|
||||||
|
}
|
||||||
|
|
||||||
|
func (_c *MockShardDelegator_VerifyExcludedSegments_Call) Return(_a0 bool) *MockShardDelegator_VerifyExcludedSegments_Call {
|
||||||
|
_c.Call.Return(_a0)
|
||||||
|
return _c
|
||||||
|
}
|
||||||
|
|
||||||
|
func (_c *MockShardDelegator_VerifyExcludedSegments_Call) RunAndReturn(run func(int64, uint64) bool) *MockShardDelegator_VerifyExcludedSegments_Call {
|
||||||
|
_c.Call.Return(run)
|
||||||
|
return _c
|
||||||
|
}
|
||||||
|
|
||||||
// Version provides a mock function with given fields:
|
// Version provides a mock function with given fields:
|
||||||
func (_m *MockShardDelegator) Version() int64 {
|
func (_m *MockShardDelegator) Version() int64 {
|
||||||
ret := _m.Called()
|
ret := _m.Called()
|
||||||
|
|||||||
@ -23,6 +23,7 @@ import (
|
|||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
|
|
||||||
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
|
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
|
||||||
|
"github.com/milvus-io/milvus/internal/querynodev2/delegator"
|
||||||
base "github.com/milvus-io/milvus/internal/util/pipeline"
|
base "github.com/milvus-io/milvus/internal/util/pipeline"
|
||||||
"github.com/milvus-io/milvus/pkg/log"
|
"github.com/milvus-io/milvus/pkg/log"
|
||||||
"github.com/milvus-io/milvus/pkg/metrics"
|
"github.com/milvus-io/milvus/pkg/metrics"
|
||||||
@ -37,10 +38,11 @@ type filterNode struct {
|
|||||||
*BaseNode
|
*BaseNode
|
||||||
collectionID UniqueID
|
collectionID UniqueID
|
||||||
manager *DataManager
|
manager *DataManager
|
||||||
excludedSegments *ExcludedSegments
|
|
||||||
channel string
|
channel string
|
||||||
InsertMsgPolicys []InsertMsgFilter
|
InsertMsgPolicys []InsertMsgFilter
|
||||||
DeleteMsgPolicys []DeleteMsgFilter
|
DeleteMsgPolicys []DeleteMsgFilter
|
||||||
|
|
||||||
|
delegator delegator.ShardDelegator
|
||||||
}
|
}
|
||||||
|
|
||||||
func (fNode *filterNode) Operate(in Msg) Msg {
|
func (fNode *filterNode) Operate(in Msg) Msg {
|
||||||
@ -95,9 +97,7 @@ func (fNode *filterNode) Operate(in Msg) Msg {
|
|||||||
out.append(msg)
|
out.append(msg)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if fNode.excludedSegments.ShouldClean() {
|
fNode.delegator.TryCleanExcludedSegments(streamMsgPack.EndTs)
|
||||||
fNode.excludedSegments.CleanInvalid(streamMsgPack.EndTs)
|
|
||||||
}
|
|
||||||
metrics.QueryNodeWaitProcessingMsgCount.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.InsertLabel).Inc()
|
metrics.QueryNodeWaitProcessingMsgCount.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.InsertLabel).Inc()
|
||||||
return out
|
return out
|
||||||
}
|
}
|
||||||
@ -115,6 +115,14 @@ func (fNode *filterNode) filtrate(c *Collection, msg msgstream.TsMsg) error {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// check segment whether excluded
|
||||||
|
ok := fNode.delegator.VerifyExcludedSegments(insertMsg.SegmentID, insertMsg.EndTimestamp)
|
||||||
|
if !ok {
|
||||||
|
m := fmt.Sprintf("Segment excluded, id: %d", insertMsg.GetSegmentID())
|
||||||
|
return merr.WrapErrSegmentLack(insertMsg.GetSegmentID(), m)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
|
||||||
case commonpb.MsgType_Delete:
|
case commonpb.MsgType_Delete:
|
||||||
deleteMsg := msg.(*msgstream.DeleteMsg)
|
deleteMsg := msg.(*msgstream.DeleteMsg)
|
||||||
metrics.QueryNodeConsumeCounter.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.InsertLabel).Add(float64(deleteMsg.Size()))
|
metrics.QueryNodeConsumeCounter.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.InsertLabel).Add(float64(deleteMsg.Size()))
|
||||||
@ -134,7 +142,7 @@ func newFilterNode(
|
|||||||
collectionID int64,
|
collectionID int64,
|
||||||
channel string,
|
channel string,
|
||||||
manager *DataManager,
|
manager *DataManager,
|
||||||
excludedSegments *ExcludedSegments,
|
delegator delegator.ShardDelegator,
|
||||||
maxQueueLength int32,
|
maxQueueLength int32,
|
||||||
) *filterNode {
|
) *filterNode {
|
||||||
return &filterNode{
|
return &filterNode{
|
||||||
@ -142,12 +150,11 @@ func newFilterNode(
|
|||||||
collectionID: collectionID,
|
collectionID: collectionID,
|
||||||
manager: manager,
|
manager: manager,
|
||||||
channel: channel,
|
channel: channel,
|
||||||
excludedSegments: excludedSegments,
|
delegator: delegator,
|
||||||
InsertMsgPolicys: []InsertMsgFilter{
|
InsertMsgPolicys: []InsertMsgFilter{
|
||||||
InsertNotAligned,
|
InsertNotAligned,
|
||||||
InsertEmpty,
|
InsertEmpty,
|
||||||
InsertOutOfTarget,
|
InsertOutOfTarget,
|
||||||
InsertExcluded,
|
|
||||||
},
|
},
|
||||||
DeleteMsgPolicys: []DeleteMsgFilter{
|
DeleteMsgPolicys: []DeleteMsgFilter{
|
||||||
DeleteNotAligned,
|
DeleteNotAligned,
|
||||||
|
|||||||
@ -18,12 +18,13 @@ package pipeline
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/samber/lo"
|
"github.com/samber/lo"
|
||||||
|
"github.com/stretchr/testify/mock"
|
||||||
"github.com/stretchr/testify/suite"
|
"github.com/stretchr/testify/suite"
|
||||||
|
|
||||||
"github.com/milvus-io/milvus/internal/proto/querypb"
|
"github.com/milvus-io/milvus/internal/proto/querypb"
|
||||||
|
"github.com/milvus-io/milvus/internal/querynodev2/delegator"
|
||||||
"github.com/milvus-io/milvus/internal/querynodev2/segments"
|
"github.com/milvus-io/milvus/internal/querynodev2/segments"
|
||||||
"github.com/milvus-io/milvus/pkg/mq/msgstream"
|
"github.com/milvus-io/milvus/pkg/mq/msgstream"
|
||||||
"github.com/milvus-io/milvus/pkg/util/paramtable"
|
"github.com/milvus-io/milvus/pkg/util/paramtable"
|
||||||
@ -38,7 +39,6 @@ type FilterNodeSuite struct {
|
|||||||
channel string
|
channel string
|
||||||
|
|
||||||
validSegmentIDs []int64
|
validSegmentIDs []int64
|
||||||
excludedSegments *ExcludedSegments
|
|
||||||
excludedSegmentIDs []int64
|
excludedSegmentIDs []int64
|
||||||
insertSegmentIDs []int64
|
insertSegmentIDs []int64
|
||||||
deleteSegmentSum int
|
deleteSegmentSum int
|
||||||
@ -47,6 +47,8 @@ type FilterNodeSuite struct {
|
|||||||
|
|
||||||
// mocks
|
// mocks
|
||||||
manager *segments.Manager
|
manager *segments.Manager
|
||||||
|
|
||||||
|
delegator *delegator.MockShardDelegator
|
||||||
}
|
}
|
||||||
|
|
||||||
func (suite *FilterNodeSuite) SetupSuite() {
|
func (suite *FilterNodeSuite) SetupSuite() {
|
||||||
@ -61,13 +63,7 @@ func (suite *FilterNodeSuite) SetupSuite() {
|
|||||||
suite.deleteSegmentSum = 4
|
suite.deleteSegmentSum = 4
|
||||||
suite.errSegmentID = 7
|
suite.errSegmentID = 7
|
||||||
|
|
||||||
// init excludedSegment
|
suite.delegator = delegator.NewMockShardDelegator(suite.T())
|
||||||
suite.excludedSegments = NewExcludedSegments(0 * time.Second)
|
|
||||||
excludeInfo := map[int64]uint64{}
|
|
||||||
for _, id := range suite.excludedSegmentIDs {
|
|
||||||
excludeInfo[id] = 1
|
|
||||||
}
|
|
||||||
suite.excludedSegments.Insert(excludeInfo)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// test filter node with collection load collection
|
// test filter node with collection load collection
|
||||||
@ -91,7 +87,11 @@ func (suite *FilterNodeSuite) TestWithLoadCollection() {
|
|||||||
Segment: mockSegmentManager,
|
Segment: mockSegmentManager,
|
||||||
}
|
}
|
||||||
|
|
||||||
node := newFilterNode(suite.collectionID, suite.channel, suite.manager, suite.excludedSegments, 8)
|
suite.delegator.EXPECT().VerifyExcludedSegments(mock.Anything, mock.Anything).RunAndReturn(func(segmentID int64, ts uint64) bool {
|
||||||
|
return !(lo.Contains(suite.excludedSegmentIDs, segmentID) && ts <= 1)
|
||||||
|
})
|
||||||
|
suite.delegator.EXPECT().TryCleanExcludedSegments(mock.Anything)
|
||||||
|
node := newFilterNode(suite.collectionID, suite.channel, suite.manager, suite.delegator, 8)
|
||||||
in := suite.buildMsgPack()
|
in := suite.buildMsgPack()
|
||||||
out := node.Operate(in)
|
out := node.Operate(in)
|
||||||
|
|
||||||
@ -124,7 +124,11 @@ func (suite *FilterNodeSuite) TestWithLoadPartation() {
|
|||||||
Segment: mockSegmentManager,
|
Segment: mockSegmentManager,
|
||||||
}
|
}
|
||||||
|
|
||||||
node := newFilterNode(suite.collectionID, suite.channel, suite.manager, suite.excludedSegments, 8)
|
suite.delegator.EXPECT().VerifyExcludedSegments(mock.Anything, mock.Anything).RunAndReturn(func(segmentID int64, ts uint64) bool {
|
||||||
|
return !(lo.Contains(suite.excludedSegmentIDs, segmentID) && ts <= 1)
|
||||||
|
})
|
||||||
|
suite.delegator.EXPECT().TryCleanExcludedSegments(mock.Anything)
|
||||||
|
node := newFilterNode(suite.collectionID, suite.channel, suite.manager, suite.delegator, 8)
|
||||||
in := suite.buildMsgPack()
|
in := suite.buildMsgPack()
|
||||||
out := node.Operate(in)
|
out := node.Operate(in)
|
||||||
|
|
||||||
|
|||||||
@ -17,14 +17,6 @@
|
|||||||
package pipeline
|
package pipeline
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
|
||||||
"sync"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"go.uber.org/atomic"
|
|
||||||
"go.uber.org/zap"
|
|
||||||
|
|
||||||
"github.com/milvus-io/milvus/pkg/log"
|
|
||||||
"github.com/milvus-io/milvus/pkg/util/merr"
|
"github.com/milvus-io/milvus/pkg/util/merr"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -59,15 +51,6 @@ func InsertOutOfTarget(n *filterNode, c *Collection, msg *InsertMsg) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func InsertExcluded(n *filterNode, c *Collection, msg *InsertMsg) error {
|
|
||||||
ok := n.excludedSegments.Verify(msg.SegmentID, msg.EndTimestamp)
|
|
||||||
if !ok {
|
|
||||||
m := fmt.Sprintf("Segment excluded, id: %d", msg.GetSegmentID())
|
|
||||||
return merr.WrapErrSegmentLack(msg.GetSegmentID(), m)
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func DeleteNotAligned(n *filterNode, c *Collection, msg *DeleteMsg) error {
|
func DeleteNotAligned(n *filterNode, c *Collection, msg *DeleteMsg) error {
|
||||||
err := msg.CheckAligned()
|
err := msg.CheckAligned()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -91,60 +74,3 @@ func DeleteOutOfTarget(n *filterNode, c *Collection, msg *DeleteMsg) error {
|
|||||||
// all growing will be in-memory to support dynamic partition load/release
|
// all growing will be in-memory to support dynamic partition load/release
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
type ExcludedSegments struct {
|
|
||||||
mu sync.RWMutex
|
|
||||||
segments map[int64]uint64 // segmentID -> Excluded TS
|
|
||||||
lastClean atomic.Time
|
|
||||||
cleanInterval time.Duration
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewExcludedSegments(cleanInterval time.Duration) *ExcludedSegments {
|
|
||||||
return &ExcludedSegments{
|
|
||||||
segments: make(map[int64]uint64),
|
|
||||||
cleanInterval: cleanInterval,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *ExcludedSegments) Insert(excludeInfo map[int64]uint64) {
|
|
||||||
s.mu.Lock()
|
|
||||||
defer s.mu.Unlock()
|
|
||||||
|
|
||||||
for segmentID, ts := range excludeInfo {
|
|
||||||
log.Debug("add exclude info",
|
|
||||||
zap.Int64("segmentID", segmentID),
|
|
||||||
zap.Uint64("ts", ts),
|
|
||||||
)
|
|
||||||
s.segments[segmentID] = ts
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *ExcludedSegments) Verify(segmentID int64, ts uint64) bool {
|
|
||||||
s.mu.RLock()
|
|
||||||
defer s.mu.RUnlock()
|
|
||||||
if excludeTs, ok := s.segments[segmentID]; ok && ts <= excludeTs {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *ExcludedSegments) CleanInvalid(ts uint64) {
|
|
||||||
s.mu.Lock()
|
|
||||||
defer s.mu.Unlock()
|
|
||||||
|
|
||||||
invalidExcludedInfos := []int64{}
|
|
||||||
for segmentsID, excludeTs := range s.segments {
|
|
||||||
if excludeTs < ts {
|
|
||||||
invalidExcludedInfos = append(invalidExcludedInfos, segmentsID)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, segmentID := range invalidExcludedInfos {
|
|
||||||
delete(s.segments, segmentID)
|
|
||||||
}
|
|
||||||
s.lastClean.Store(time.Now())
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *ExcludedSegments) ShouldClean() bool {
|
|
||||||
return time.Since(s.lastClean.Load()) > s.cleanInterval
|
|
||||||
}
|
|
||||||
|
|||||||
@ -17,8 +17,6 @@
|
|||||||
package pipeline
|
package pipeline
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/milvus-io/milvus/internal/querynodev2/delegator"
|
"github.com/milvus-io/milvus/internal/querynodev2/delegator"
|
||||||
base "github.com/milvus-io/milvus/internal/util/pipeline"
|
base "github.com/milvus-io/milvus/internal/util/pipeline"
|
||||||
"github.com/milvus-io/milvus/pkg/metrics"
|
"github.com/milvus-io/milvus/pkg/metrics"
|
||||||
@ -29,20 +27,14 @@ import (
|
|||||||
// pipeline used for querynode
|
// pipeline used for querynode
|
||||||
type Pipeline interface {
|
type Pipeline interface {
|
||||||
base.StreamPipeline
|
base.StreamPipeline
|
||||||
ExcludedSegments(info map[int64]uint64)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
type pipeline struct {
|
type pipeline struct {
|
||||||
base.StreamPipeline
|
base.StreamPipeline
|
||||||
|
|
||||||
excludedSegments *ExcludedSegments
|
|
||||||
collectionID UniqueID
|
collectionID UniqueID
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *pipeline) ExcludedSegments(excludeInfo map[int64]uint64) { //(segInfos ...*datapb.SegmentInfo) {
|
|
||||||
p.excludedSegments.Insert(excludeInfo)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p *pipeline) Close() {
|
func (p *pipeline) Close() {
|
||||||
p.StreamPipeline.Close()
|
p.StreamPipeline.Close()
|
||||||
metrics.CleanupQueryNodeCollectionMetrics(paramtable.GetNodeID(), p.collectionID)
|
metrics.CleanupQueryNodeCollectionMetrics(paramtable.GetNodeID(), p.collectionID)
|
||||||
@ -57,15 +49,13 @@ func NewPipeLine(
|
|||||||
delegator delegator.ShardDelegator,
|
delegator delegator.ShardDelegator,
|
||||||
) (Pipeline, error) {
|
) (Pipeline, error) {
|
||||||
pipelineQueueLength := paramtable.Get().QueryNodeCfg.FlowGraphMaxQueueLength.GetAsInt32()
|
pipelineQueueLength := paramtable.Get().QueryNodeCfg.FlowGraphMaxQueueLength.GetAsInt32()
|
||||||
excludedSegments := NewExcludedSegments(paramtable.Get().QueryNodeCfg.CleanExcludeSegInterval.GetAsDuration(time.Second))
|
|
||||||
|
|
||||||
p := &pipeline{
|
p := &pipeline{
|
||||||
collectionID: collectionID,
|
collectionID: collectionID,
|
||||||
excludedSegments: excludedSegments,
|
|
||||||
StreamPipeline: base.NewPipelineWithStream(dispatcher, nodeCtxTtInterval, enableTtChecker, channel),
|
StreamPipeline: base.NewPipelineWithStream(dispatcher, nodeCtxTtInterval, enableTtChecker, channel),
|
||||||
}
|
}
|
||||||
|
|
||||||
filterNode := newFilterNode(collectionID, channel, manager, excludedSegments, pipelineQueueLength)
|
filterNode := newFilterNode(collectionID, channel, manager, delegator, pipelineQueueLength)
|
||||||
insertNode := newInsertNode(collectionID, channel, manager, delegator, pipelineQueueLength)
|
insertNode := newInsertNode(collectionID, channel, manager, delegator, pipelineQueueLength)
|
||||||
deleteNode := newDeleteNode(collectionID, channel, manager, tSafeManager, delegator, pipelineQueueLength)
|
deleteNode := newDeleteNode(collectionID, channel, manager, tSafeManager, delegator, pipelineQueueLength)
|
||||||
p.Add(filterNode, insertNode, deleteNode)
|
p.Add(filterNode, insertNode, deleteNode)
|
||||||
|
|||||||
@ -119,6 +119,10 @@ func (suite *PipelineTestSuite) TestBasic() {
|
|||||||
suite.msgDispatcher.EXPECT().Deregister(suite.channel)
|
suite.msgDispatcher.EXPECT().Deregister(suite.channel)
|
||||||
|
|
||||||
// mock delegator
|
// mock delegator
|
||||||
|
suite.delegator.EXPECT().AddExcludedSegments(mock.Anything).Maybe()
|
||||||
|
suite.delegator.EXPECT().VerifyExcludedSegments(mock.Anything, mock.Anything).Return(true).Maybe()
|
||||||
|
suite.delegator.EXPECT().TryCleanExcludedSegments(mock.Anything).Maybe()
|
||||||
|
|
||||||
suite.delegator.EXPECT().ProcessInsert(mock.Anything).Run(
|
suite.delegator.EXPECT().ProcessInsert(mock.Anything).Run(
|
||||||
func(insertRecords map[int64]*delegator.InsertData) {
|
func(insertRecords map[int64]*delegator.InsertData) {
|
||||||
for segmentID := range insertRecords {
|
for segmentID := range insertRecords {
|
||||||
|
|||||||
@ -291,17 +291,17 @@ func (node *QueryNode) WatchDmChannels(ctx context.Context, req *querypb.WatchDm
|
|||||||
info := req.GetSegmentInfos()[id]
|
info := req.GetSegmentInfos()[id]
|
||||||
return id, info.GetDmlPosition().GetTimestamp()
|
return id, info.GetDmlPosition().GetTimestamp()
|
||||||
})
|
})
|
||||||
pipeline.ExcludedSegments(growingInfo)
|
delegator.AddExcludedSegments(growingInfo)
|
||||||
|
|
||||||
flushedInfo := lo.SliceToMap(channel.GetFlushedSegmentIds(), func(id int64) (int64, uint64) {
|
flushedInfo := lo.SliceToMap(channel.GetFlushedSegmentIds(), func(id int64) (int64, uint64) {
|
||||||
return id, typeutil.MaxTimestamp
|
return id, typeutil.MaxTimestamp
|
||||||
})
|
})
|
||||||
pipeline.ExcludedSegments(flushedInfo)
|
delegator.AddExcludedSegments(flushedInfo)
|
||||||
for _, channelInfo := range req.GetInfos() {
|
for _, channelInfo := range req.GetInfos() {
|
||||||
droppedInfos := lo.SliceToMap(channelInfo.GetDroppedSegmentIds(), func(id int64) (int64, uint64) {
|
droppedInfos := lo.SliceToMap(channelInfo.GetDroppedSegmentIds(), func(id int64) (int64, uint64) {
|
||||||
return id, typeutil.MaxTimestamp
|
return id, typeutil.MaxTimestamp
|
||||||
})
|
})
|
||||||
pipeline.ExcludedSegments(droppedInfos)
|
delegator.AddExcludedSegments(droppedInfos)
|
||||||
}
|
}
|
||||||
|
|
||||||
err = loadL0Segments(ctx, delegator, req)
|
err = loadL0Segments(ctx, delegator, req)
|
||||||
@ -545,16 +545,6 @@ func (node *QueryNode) ReleaseSegments(ctx context.Context, req *querypb.Release
|
|||||||
return merr.Status(err), nil
|
return merr.Status(err), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// when we try to release a segment, add it to pipeline's exclude list first
|
|
||||||
// in case of consumed it's growing segment again
|
|
||||||
pipeline := node.pipelineManager.Get(req.GetShard())
|
|
||||||
if pipeline != nil {
|
|
||||||
droppedInfos := lo.SliceToMap(req.GetSegmentIDs(), func(id int64) (int64, uint64) {
|
|
||||||
return id, typeutil.MaxTimestamp
|
|
||||||
})
|
|
||||||
pipeline.ExcludedSegments(droppedInfos)
|
|
||||||
}
|
|
||||||
|
|
||||||
req.NeedTransfer = false
|
req.NeedTransfer = false
|
||||||
err := delegator.ReleaseSegments(ctx, req, false)
|
err := delegator.ReleaseSegments(ctx, req, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -1320,14 +1310,10 @@ func (node *QueryNode) SyncDistribution(ctx context.Context, req *querypb.SyncDi
|
|||||||
})
|
})
|
||||||
case querypb.SyncType_UpdateVersion:
|
case querypb.SyncType_UpdateVersion:
|
||||||
log.Info("sync action", zap.Int64("TargetVersion", action.GetTargetVersion()))
|
log.Info("sync action", zap.Int64("TargetVersion", action.GetTargetVersion()))
|
||||||
pipeline := node.pipelineManager.Get(req.GetChannel())
|
|
||||||
if pipeline != nil {
|
|
||||||
droppedInfos := lo.SliceToMap(action.GetDroppedInTarget(), func(id int64) (int64, uint64) {
|
droppedInfos := lo.SliceToMap(action.GetDroppedInTarget(), func(id int64) (int64, uint64) {
|
||||||
return id, typeutil.MaxTimestamp
|
return id, typeutil.MaxTimestamp
|
||||||
})
|
})
|
||||||
|
shardDelegator.AddExcludedSegments(droppedInfos)
|
||||||
pipeline.ExcludedSegments(droppedInfos)
|
|
||||||
}
|
|
||||||
shardDelegator.SyncTargetVersion(action.GetTargetVersion(), action.GetGrowingInTarget(),
|
shardDelegator.SyncTargetVersion(action.GetTargetVersion(), action.GetGrowingInTarget(),
|
||||||
action.GetSealedInTarget(), action.GetDroppedInTarget(), action.GetCheckpoint())
|
action.GetSealedInTarget(), action.GetDroppedInTarget(), action.GetCheckpoint())
|
||||||
default:
|
default:
|
||||||
|
|||||||
@ -883,8 +883,10 @@ func (suite *ServiceSuite) TestLoadSegments_Transfer() {
|
|||||||
suite.node.delegators.Insert(suite.vchannel, delegator)
|
suite.node.delegators.Insert(suite.vchannel, delegator)
|
||||||
defer suite.node.delegators.GetAndRemove(suite.vchannel)
|
defer suite.node.delegators.GetAndRemove(suite.vchannel)
|
||||||
|
|
||||||
delegator.EXPECT().LoadSegments(mock.Anything, mock.AnythingOfType("*querypb.LoadSegmentsRequest")).
|
delegator.EXPECT().AddExcludedSegments(mock.Anything).Maybe()
|
||||||
Return(nil)
|
delegator.EXPECT().VerifyExcludedSegments(mock.Anything, mock.Anything).Return(true).Maybe()
|
||||||
|
delegator.EXPECT().TryCleanExcludedSegments(mock.Anything).Maybe()
|
||||||
|
delegator.EXPECT().LoadSegments(mock.Anything, mock.AnythingOfType("*querypb.LoadSegmentsRequest")).Return(nil)
|
||||||
// data
|
// data
|
||||||
schema := segments.GenTestCollectionSchema(suite.collectionName, schemapb.DataType_Int64, false)
|
schema := segments.GenTestCollectionSchema(suite.collectionName, schemapb.DataType_Int64, false)
|
||||||
req := &querypb.LoadSegmentsRequest{
|
req := &querypb.LoadSegmentsRequest{
|
||||||
@ -932,6 +934,9 @@ func (suite *ServiceSuite) TestLoadSegments_Transfer() {
|
|||||||
delegator := &delegator.MockShardDelegator{}
|
delegator := &delegator.MockShardDelegator{}
|
||||||
suite.node.delegators.Insert(suite.vchannel, delegator)
|
suite.node.delegators.Insert(suite.vchannel, delegator)
|
||||||
defer suite.node.delegators.GetAndRemove(suite.vchannel)
|
defer suite.node.delegators.GetAndRemove(suite.vchannel)
|
||||||
|
delegator.EXPECT().AddExcludedSegments(mock.Anything).Maybe()
|
||||||
|
delegator.EXPECT().VerifyExcludedSegments(mock.Anything, mock.Anything).Return(true).Maybe()
|
||||||
|
delegator.EXPECT().TryCleanExcludedSegments(mock.Anything).Maybe()
|
||||||
delegator.EXPECT().LoadSegments(mock.Anything, mock.AnythingOfType("*querypb.LoadSegmentsRequest")).
|
delegator.EXPECT().LoadSegments(mock.Anything, mock.AnythingOfType("*querypb.LoadSegmentsRequest")).
|
||||||
Return(errors.New("mocked error"))
|
Return(errors.New("mocked error"))
|
||||||
// data
|
// data
|
||||||
@ -1091,6 +1096,9 @@ func (suite *ServiceSuite) TestReleaseSegments_Transfer() {
|
|||||||
suite.node.delegators.Insert(suite.vchannel, delegator)
|
suite.node.delegators.Insert(suite.vchannel, delegator)
|
||||||
defer suite.node.delegators.GetAndRemove(suite.vchannel)
|
defer suite.node.delegators.GetAndRemove(suite.vchannel)
|
||||||
|
|
||||||
|
delegator.EXPECT().AddExcludedSegments(mock.Anything).Maybe()
|
||||||
|
delegator.EXPECT().VerifyExcludedSegments(mock.Anything, mock.Anything).Return(true).Maybe()
|
||||||
|
delegator.EXPECT().TryCleanExcludedSegments(mock.Anything).Maybe()
|
||||||
delegator.EXPECT().ReleaseSegments(mock.Anything, mock.AnythingOfType("*querypb.ReleaseSegmentsRequest"), false).
|
delegator.EXPECT().ReleaseSegments(mock.Anything, mock.AnythingOfType("*querypb.ReleaseSegmentsRequest"), false).
|
||||||
Return(errors.New("mocked error"))
|
Return(errors.New("mocked error"))
|
||||||
|
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user