issue: #42942

This PR includes the following changes:
1. Added checks to the index checker in querycoord to generate drop index tasks.
2. Added a drop index interface to querynode.
3. To avoid search failures after dropping an index, querynode allows lazy mode (warmup=disable) to load raw data even when the index contains raw data.
4. In segcore, loading an index no longer deletes raw data; instead, it evicts it.
5. In expr, the index is pinned to prevent concurrency errors.

---------

Signed-off-by: sunby <sunbingyi1992@gmail.com>
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package delegator

import (
    "context"
    "fmt"
    "runtime"
    "time"

    "github.com/cockroachdb/errors"
    "github.com/samber/lo"
    "go.uber.org/zap"
    "golang.org/x/sync/errgroup"
    "google.golang.org/protobuf/proto"

    "github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
    "github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
    "github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
    "github.com/milvus-io/milvus/internal/querynodev2/cluster"
    "github.com/milvus-io/milvus/internal/querynodev2/delegator/deletebuffer"
    "github.com/milvus-io/milvus/internal/querynodev2/pkoracle"
    "github.com/milvus-io/milvus/internal/querynodev2/segments"
    "github.com/milvus-io/milvus/internal/storage"
    "github.com/milvus-io/milvus/pkg/v2/common"
    "github.com/milvus-io/milvus/pkg/v2/log"
    "github.com/milvus-io/milvus/pkg/v2/metrics"
    "github.com/milvus-io/milvus/pkg/v2/proto/datapb"
    "github.com/milvus-io/milvus/pkg/v2/proto/internalpb"
    "github.com/milvus-io/milvus/pkg/v2/proto/querypb"
    "github.com/milvus-io/milvus/pkg/v2/proto/segcorepb"
    "github.com/milvus-io/milvus/pkg/v2/util/commonpbutil"
    "github.com/milvus-io/milvus/pkg/v2/util/conc"
    "github.com/milvus-io/milvus/pkg/v2/util/funcutil"
    "github.com/milvus-io/milvus/pkg/v2/util/merr"
    "github.com/milvus-io/milvus/pkg/v2/util/paramtable"
    "github.com/milvus-io/milvus/pkg/v2/util/retry"
    "github.com/milvus-io/milvus/pkg/v2/util/timerecord"
    "github.com/milvus-io/milvus/pkg/v2/util/tsoutil"
    "github.com/milvus-io/milvus/pkg/v2/util/typeutil"
)

// delegator data related part

// InsertData groups the row IDs, primary keys, timestamps, insert record and
// BM25 statistics of one insert batch targeting a single growing segment.
type InsertData struct {
    RowIDs       []int64
    PrimaryKeys  []storage.PrimaryKey
    Timestamps   []uint64
    InsertRecord *segcorepb.InsertRecord
    BM25Stats    map[int64]*storage.BM25Stats

    StartPosition *msgpb.MsgPosition
    PartitionID   int64
}

// DeleteData groups the delete primary keys and timestamps of a single partition.
type DeleteData struct {
    PartitionID int64
    PrimaryKeys []storage.PrimaryKey
    Timestamps  []uint64
    RowCount    int64
}

// Append appends another delete data into this one.
func (d *DeleteData) Append(ad DeleteData) {
    d.PrimaryKeys = append(d.PrimaryKeys, ad.PrimaryKeys...)
    d.Timestamps = append(d.Timestamps, ad.Timestamps...)
    d.RowCount += ad.RowCount
}

// ProcessInsert handles insert data in delegator.
func (sd *shardDelegator) ProcessInsert(insertRecords map[int64]*InsertData) {
    method := "ProcessInsert"
    tr := timerecord.NewTimeRecorder(method)
    log := sd.getLogger(context.Background())
    for segmentID, insertData := range insertRecords {
        growing := sd.segmentManager.GetGrowing(segmentID)
        newGrowingSegment := false
        if growing == nil {
            var err error
            // TODO: It's a weird implementation that growing segments have load info.
            // We should separate growing segments and sealed segments by the type system.
            growing, err = segments.NewSegment(
                context.Background(),
                sd.collection,
                sd.segmentManager,
                segments.SegmentTypeGrowing,
                0,
                &querypb.SegmentLoadInfo{
                    SegmentID:     segmentID,
                    PartitionID:   insertData.PartitionID,
                    CollectionID:  sd.collectionID,
                    InsertChannel: sd.vchannelName,
                    StartPosition: insertData.StartPosition,
                    DeltaPosition: insertData.StartPosition,
                    Level:         datapb.SegmentLevel_L1,
                },
            )
            if err != nil {
                log.Error("failed to create new segment",
                    zap.Int64("segmentID", segmentID),
                    zap.Error(err))
                panic(err)
            }
            newGrowingSegment = true
        }

        err := growing.Insert(context.Background(), insertData.RowIDs, insertData.Timestamps, insertData.InsertRecord)
        if err != nil {
            log.Error("failed to insert data into growing segment",
                zap.Int64("segmentID", segmentID),
                zap.Error(err),
            )
            if errors.IsAny(err, merr.ErrSegmentNotLoaded, merr.ErrSegmentNotFound) {
                log.Warn("try to insert data into released segment, skip it", zap.Error(err))
                continue
            }
            // panic here, insert failure
            panic(err)
        }
        growing.UpdateBloomFilter(insertData.PrimaryKeys)

        if newGrowingSegment {
            sd.growingSegmentLock.Lock()
            // Check whether the segment has been excluded: segments in the excluded set
            // must not be added again. Do not check the excluded ts here, because a
            // dropped segment in the excluded set may carry a wrong excluded ts (the
            // checkpoint ts); when checkpoint_ts < segment_end_ts the excluded data is
            // not filtered out at the filter node, so it must be excluded here.
            if ok := sd.VerifyExcludedSegments(segmentID, 0); !ok {
                log.Warn("try to insert data into released segment, skip it", zap.Int64("segmentID", segmentID))
                sd.growingSegmentLock.Unlock()
                growing.Release(context.Background())
                continue
            }

            if !sd.pkOracle.Exists(growing, paramtable.GetNodeID()) {
                // Register the created growing segment only after insert, to avoid adding
                // an empty growing segment to the delegator.
                sd.pkOracle.Register(growing, paramtable.GetNodeID())
                if sd.idfOracle != nil {
                    sd.idfOracle.Register(segmentID, insertData.BM25Stats, segments.SegmentTypeGrowing)
                }
                sd.segmentManager.Put(context.Background(), segments.SegmentTypeGrowing, growing)
                sd.addGrowing(SegmentEntry{
                    NodeID:        paramtable.GetNodeID(),
                    SegmentID:     segmentID,
                    PartitionID:   insertData.PartitionID,
                    Version:       0,
                    TargetVersion: initialTargetVersion,
                })
            }

            sd.growingSegmentLock.Unlock()
        } else if sd.idfOracle != nil {
            sd.idfOracle.UpdateGrowing(growing.ID(), insertData.BM25Stats)
        }
        log.Info("insert into growing segment",
            zap.Int64("collectionID", growing.Collection()),
            zap.Int64("segmentID", segmentID),
            zap.Int("rowCount", len(insertData.RowIDs)),
            zap.Uint64("maxTimestamp", insertData.Timestamps[len(insertData.Timestamps)-1]),
        )
    }
    metrics.QueryNodeProcessCost.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.InsertLabel).
        Observe(float64(tr.ElapseSpan().Milliseconds()))
}

// ProcessDelete handles delete data in delegator.
// The delegator puts deleteData into the buffer first,
// then dispatches the data to segments according to the result of pkOracle.
func (sd *shardDelegator) ProcessDelete(deleteData []*DeleteData, ts uint64) {
    method := "ProcessDelete"
    tr := timerecord.NewTimeRecorder(method)
    // block load segment handle delete buffer
    sd.deleteMut.Lock()
    defer sd.deleteMut.Unlock()

    log := sd.getLogger(context.Background())

    log.Debug("start to process delete", zap.Uint64("ts", ts))
    // add deleteData into buffer.
    cacheItems := make([]deletebuffer.BufferItem, 0, len(deleteData))
    for _, entry := range deleteData {
        cacheItems = append(cacheItems, deletebuffer.BufferItem{
            PartitionID: entry.PartitionID,
            DeleteData: storage.DeleteData{
                Pks:      entry.PrimaryKeys,
                Tss:      entry.Timestamps,
                RowCount: entry.RowCount,
            },
        })
    }

    sd.deleteBuffer.Put(&deletebuffer.Item{
        Ts:   ts,
        Data: cacheItems,
    })

    sd.forwardStreamingDeletion(context.Background(), deleteData)

    metrics.QueryNodeProcessCost.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.DeleteLabel).
        Observe(float64(tr.ElapseSpan().Milliseconds()))
}

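// BatchApplyRet carries the bloom-filter probe result of one primary-key batch:
// which DeleteData entry it came from, the batch start offset within that entry,
// and the per-segment hit flags returned by the pk oracle.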
type BatchApplyRet = struct {
    DeleteDataIdx int
    StartIdx      int
    Segment2Hits  map[int64][]bool
}

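// applyBFInParallel splits the primary keys of each DeleteData into batches and probes
// the pk oracle (bloom filters) concurrently in the given pool, returning the per-batch
// hit maps keyed by submission order.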
func (sd *shardDelegator) applyBFInParallel(deleteDatas []*DeleteData, pool *conc.Pool[any]) *typeutil.ConcurrentMap[int, *BatchApplyRet] {
    retIdx := 0
    retMap := typeutil.NewConcurrentMap[int, *BatchApplyRet]()
    batchSize := paramtable.Get().CommonCfg.BloomFilterApplyBatchSize.GetAsInt()

    var futures []*conc.Future[any]
    for didx, data := range deleteDatas {
        pks := data.PrimaryKeys
        for idx := 0; idx < len(pks); idx += batchSize {
            startIdx := idx
            endIdx := startIdx + batchSize
            if endIdx > len(pks) {
                endIdx = len(pks)
            }

            retIdx += 1
            tmpRetIndex := retIdx
            deleteDataId := didx
            partitionID := data.PartitionID
            future := pool.Submit(func() (any, error) {
                ret := sd.pkOracle.BatchGet(pks[startIdx:endIdx], pkoracle.WithPartitionID(partitionID))
                retMap.Insert(tmpRetIndex, &BatchApplyRet{
                    DeleteDataIdx: deleteDataId,
                    StartIdx:      startIdx,
                    Segment2Hits:  ret,
                })
                return nil, nil
            })
            futures = append(futures, future)
        }
    }
    conc.AwaitAll(futures...)

    return retMap
}

// applyDelete handles delete records and applies them to the corresponding workers.
func (sd *shardDelegator) applyDelete(ctx context.Context,
    nodeID int64,
    worker cluster.Worker,
    delRecords func(segmentID int64) (DeleteData, bool),
    entries []SegmentEntry,
    scope querypb.DataScope,
) []int64 {
    offlineSegments := typeutil.NewConcurrentSet[int64]()
    log := sd.getLogger(ctx)

    ctx, cancel := context.WithCancel(ctx)
    defer cancel()

    pool := conc.NewPool[struct{}](runtime.GOMAXPROCS(0) * 4)
    defer pool.Release()

    var futures []*conc.Future[struct{}]
    for _, segmentEntry := range entries {
        segmentEntry := segmentEntry
        delRecord, ok := delRecords(segmentEntry.SegmentID)
        log := log.With(
            zap.Int64("segmentID", segmentEntry.SegmentID),
            zap.Int64("workerID", nodeID),
            zap.Int("forwardRowCount", len(delRecord.PrimaryKeys)),
        )
        if ok {
            future := pool.Submit(func() (struct{}, error) {
                log.Debug("delegator plan to applyDelete via worker")
                err := retry.Handle(ctx, func() (bool, error) {
                    if sd.Stopped() {
                        return false, merr.WrapErrChannelNotAvailable(sd.vchannelName, "channel is unsubscribing")
                    }

                    err := worker.Delete(ctx, &querypb.DeleteRequest{
                        Base:         commonpbutil.NewMsgBase(commonpbutil.WithTargetID(nodeID)),
                        CollectionId: sd.collectionID,
                        PartitionId:  segmentEntry.PartitionID,
                        VchannelName: sd.vchannelName,
                        SegmentId:    segmentEntry.SegmentID,
                        PrimaryKeys:  storage.ParsePrimaryKeys2IDs(delRecord.PrimaryKeys),
                        Timestamps:   delRecord.Timestamps,
                        Scope:        scope,
                    })
                    if errors.Is(err, merr.ErrNodeNotFound) {
                        log.Warn("try to delete data on non-exist node")
                        // cancel the other requests
                        cancel()
                        return false, err
                    } else if errors.IsAny(err, merr.ErrSegmentNotFound, merr.ErrSegmentNotLoaded) {
                        log.Warn("try to delete data of released segment")
                        return false, nil
                    } else if err != nil {
                        log.Warn("worker failed to delete on segment", zap.Error(err))
                        return true, err
                    }
                    return false, nil
                }, retry.Attempts(10))
                if err != nil {
                    log.Warn("apply delete for segment failed, marking it offline")
                    offlineSegments.Insert(segmentEntry.SegmentID)
                }
                return struct{}{}, err
            })
            futures = append(futures, future)
        }
    }
    conc.AwaitAll(futures...)
    return offlineSegments.Collect()
}

// markSegmentOffline marks segments offline and waits for QueryCoord to fix them.
func (sd *shardDelegator) markSegmentOffline(segmentIDs ...int64) {
    sd.distribution.MarkOfflineSegments(segmentIDs...)
}

// addGrowing adds growing segment records to the delegator.
func (sd *shardDelegator) addGrowing(entries ...SegmentEntry) {
    log := sd.getLogger(context.Background())
    log.Info("add growing segments to delegator", zap.Int64s("segmentIDs", lo.Map(entries, func(entry SegmentEntry, _ int) int64 {
        return entry.SegmentID
    })))
    sd.distribution.AddGrowing(entries...)
}

// LoadGrowing loads growing segments locally.
func (sd *shardDelegator) LoadGrowing(ctx context.Context, infos []*querypb.SegmentLoadInfo, version int64) error {
    log := sd.getLogger(ctx)

    segmentIDs := lo.Map(infos, func(info *querypb.SegmentLoadInfo, _ int) int64 { return info.GetSegmentID() })
    log.Info("loading growing segments...", zap.Int64s("segmentIDs", segmentIDs))
    loaded, err := sd.loader.Load(ctx, sd.collectionID, segments.SegmentTypeGrowing, version, infos...)
    if err != nil {
        log.Warn("failed to load growing segment", zap.Error(err))
        return err
    }

    for _, segment := range loaded {
        err = sd.addL0ForGrowing(ctx, segment)
        if err != nil {
            log.Warn("failed to forward L0 deletions to growing segment",
                zap.Error(err),
            )

            // clear loaded growing segments
            for _, segment := range loaded {
                segment.Release(ctx)
            }
            return err
        }
    }

    segmentIDs = lo.Map(loaded, func(segment segments.Segment, _ int) int64 { return segment.ID() })
    log.Info("load growing segments done", zap.Int64s("segmentIDs", segmentIDs))

    for _, segment := range loaded {
        sd.pkOracle.Register(segment, paramtable.GetNodeID())
        if sd.idfOracle != nil {
            sd.idfOracle.Register(segment.ID(), segment.GetBM25Stats(), segments.SegmentTypeGrowing)
        }
    }
    sd.addGrowing(lo.Map(loaded, func(segment segments.Segment, _ int) SegmentEntry {
        return SegmentEntry{
            NodeID:        paramtable.GetNodeID(),
            SegmentID:     segment.ID(),
            PartitionID:   segment.Partition(),
            Version:       version,
            TargetVersion: sd.distribution.getTargetVersion(),
        }
    })...)
    return nil
}

// LoadSegments loads segments locally or remotely depending on the target node.
func (sd *shardDelegator) LoadSegments(ctx context.Context, req *querypb.LoadSegmentsRequest) error {
    if len(req.GetInfos()) == 0 {
        return nil
    }

    log := sd.getLogger(ctx)

    targetNodeID := req.GetDstNodeID()
    // add common log fields
    log = log.With(
        zap.Int64("workID", targetNodeID),
        zap.Int64s("segments", lo.Map(req.GetInfos(), func(info *querypb.SegmentLoadInfo, _ int) int64 { return info.GetSegmentID() })),
    )

    if req.GetInfos()[0].GetLevel() == datapb.SegmentLevel_L0 {
        return merr.WrapErrServiceInternal("load L0 segment is not supported, l0 segment should only be loaded by watchChannel")
    }

    // Pin all segments to prevent the delete buffer from being cleaned up while workers load segments.
    // Note: pinned delete records skip cleanup during SyncTargetVersion, which means that after the
    // segments are loaded the delete buffer is cleaned up by the next SyncTargetVersion call.
    for _, info := range req.GetInfos() {
        sd.deleteBuffer.Pin(info.GetStartPosition().GetTimestamp(), info.GetSegmentID())
    }
    defer func() {
        for _, info := range req.GetInfos() {
            sd.deleteBuffer.Unpin(info.GetStartPosition().GetTimestamp(), info.GetSegmentID())
        }
    }()

    worker, err := sd.workerManager.GetWorker(ctx, targetNodeID)
    if err != nil {
        log.Warn("delegator failed to find worker", zap.Error(err))
        return err
    }

    req.Base.TargetID = targetNodeID
    log.Debug("worker loads segments...")

    sLoad := func(ctx context.Context, req *querypb.LoadSegmentsRequest) error {
        segmentID := req.GetInfos()[0].GetSegmentID()
        nodeID := req.GetDstNodeID()
        _, err, _ := sd.sf.Do(fmt.Sprintf("%d-%d", nodeID, segmentID), func() (struct{}, error) {
            err := worker.LoadSegments(ctx, req)
            return struct{}{}, err
        })
        return err
    }

    // separate infos into different load tasks
    if len(req.GetInfos()) > 1 {
        var reqs []*querypb.LoadSegmentsRequest
        for _, info := range req.GetInfos() {
            newReq := typeutil.Clone(req)
            newReq.Infos = []*querypb.SegmentLoadInfo{info}
            reqs = append(reqs, newReq)
        }

        group, ctx := errgroup.WithContext(ctx)
        for _, req := range reqs {
            req := req
            group.Go(func() error {
                return sLoad(ctx, req)
            })
        }
        err = group.Wait()
    } else {
        err = sLoad(ctx, req)
    }

    if err != nil {
        log.Warn("worker failed to load segments", zap.Error(err))
        return err
    }
    log.Debug("worker loads segments done")

    // loading an index segment needs no stream delete or distribution change
    if req.GetLoadScope() == querypb.LoadScope_Index {
        return nil
    }

    entries := lo.Map(req.GetInfos(), func(info *querypb.SegmentLoadInfo, _ int) SegmentEntry {
        return SegmentEntry{
            SegmentID:   info.GetSegmentID(),
            PartitionID: info.GetPartitionID(),
            NodeID:      req.GetDstNodeID(),
            Version:     req.GetVersion(),
            Level:       info.GetLevel(),
        }
    })
    // load bloom filter only when the candidate does not exist
    infos := lo.Filter(req.GetInfos(), func(info *querypb.SegmentLoadInfo, _ int) bool {
        return !sd.pkOracle.Exists(pkoracle.NewCandidateKey(info.GetSegmentID(), info.GetPartitionID(), commonpb.SegmentState_Sealed), targetNodeID)
    })

    var bm25Stats *typeutil.ConcurrentMap[int64, map[int64]*storage.BM25Stats]
    if sd.idfOracle != nil {
        bm25Stats, err = sd.loader.LoadBM25Stats(ctx, req.GetCollectionID(), infos...)
        if err != nil {
            log.Warn("failed to load bm25 stats for segment", zap.Error(err))
            return err
        }
    }

    candidates, err := sd.loader.LoadBloomFilterSet(ctx, req.GetCollectionID(), req.GetVersion(), infos...)
    if err != nil {
        log.Warn("failed to load bloom filter set for segment", zap.Error(err))
        return err
    }

    log.Debug("load delete...")
    err = sd.loadStreamDelete(ctx, candidates, bm25Stats, infos, req, targetNodeID, worker)
    if err != nil {
        log.Warn("load stream delete failed", zap.Error(err))
        return err
    }

    return sd.addDistributionIfVersionOK(req.GetLoadMeta().GetSchemaVersion(), entries...)
}

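// addDistributionIfVersionOK registers the segment entries into the distribution only if
// the given schema version is not older than the delegator's current schema version.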
func (sd *shardDelegator) addDistributionIfVersionOK(version uint64, entries ...SegmentEntry) error {
    sd.schemaChangeMutex.Lock()
    defer sd.schemaChangeMutex.Unlock()
    if version < sd.schemaVersion {
        return merr.WrapErrServiceInternal("schema version changed")
    }

    // alter distribution
    sd.distribution.AddDistributions(entries...)
    return nil
}

// LoadL0 loads L0 segments, either locally or as remote-load placeholders depending on
// the configured L0 forward policy.
func (sd *shardDelegator) LoadL0(ctx context.Context, infos []*querypb.SegmentLoadInfo, version int64) error {
    log := sd.getLogger(ctx)

    segmentIDs := lo.Map(infos, func(info *querypb.SegmentLoadInfo, _ int) int64 { return info.GetSegmentID() })
    log.Info("loading l0 segments...", zap.Int64s("segmentIDs", segmentIDs))

    loaded := make([]segments.Segment, 0)
    if sd.l0ForwardPolicy == L0ForwardPolicyRemoteLoad {
        for _, info := range infos {
            l0Seg, err := segments.NewL0Segment(sd.collection, segments.SegmentTypeSealed, version, info)
            if err != nil {
                return err
            }
            loaded = append(loaded, l0Seg)
        }
    } else {
        var err error
        loaded, err = sd.loader.Load(ctx, sd.collectionID, segments.SegmentTypeSealed, version, infos...)
        if err != nil {
            log.Warn("failed to load l0 segment", zap.Error(err))
            return err
        }
    }

    segmentIDs = lo.Map(loaded, func(segment segments.Segment, _ int) int64 { return segment.ID() })
    log.Info("load l0 segments done", zap.Int64s("segmentIDs", segmentIDs))

    // register l0 segments
    sd.deleteBuffer.RegisterL0(loaded...)
    sd.RefreshLevel0DeletionStats()
    return nil
}

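// rangeHitL0Deletions iterates over the buffered L0 segments of the given partition (or of
// all partitions), probes their delete records against the candidate's bloom filter in
// batches, and invokes fn for every primary key / timestamp pair that hits.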
func (sd *shardDelegator) rangeHitL0Deletions(partitionID int64, candidate pkoracle.Candidate, fn func(pk storage.PrimaryKey, ts uint64) error) error {
    level0Segments := sd.deleteBuffer.ListL0()

    if len(level0Segments) == 0 {
        return nil
    }

    log := sd.getLogger(context.Background())
    start := time.Now()
    totalL0Rows := 0
    totalBfHitRows := int64(0)
    processedL0Count := 0

    for _, segment := range level0Segments {
        segment := segment.(*segments.L0Segment)
        if segment.Partition() == partitionID || segment.Partition() == common.AllPartitionsID {
            segmentPks, segmentTss := segment.DeleteRecords()
            totalL0Rows += len(segmentPks)
            processedL0Count++
            batchSize := paramtable.Get().CommonCfg.BloomFilterApplyBatchSize.GetAsInt()

            for idx := 0; idx < len(segmentPks); idx += batchSize {
                endIdx := idx + batchSize
                if endIdx > len(segmentPks) {
                    endIdx = len(segmentPks)
                }

                lc := storage.NewBatchLocationsCache(segmentPks[idx:endIdx])
                hits := candidate.BatchPkExist(lc)
                for i, hit := range hits {
                    if hit {
                        totalBfHitRows += 1
                        if err := fn(segmentPks[idx+i], segmentTss[idx+i]); err != nil {
                            return err
                        }
                    }
                }
            }
        }
    }

    log.Info("forward delete from L0 segments to worker",
        zap.Int64("targetSegmentID", candidate.ID()),
        zap.String("channel", sd.vchannelName),
        zap.Int("l0SegmentCount", processedL0Count),
        zap.Int("totalDeleteRowsInL0", totalL0Rows),
        zap.Int64("totalBfHitRows", totalBfHitRows),
        zap.Int64("totalCost", time.Since(start).Milliseconds()),
    )

    return nil
}

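// GetLevel0Deletions collects all buffered L0 delete records that hit the candidate for the
// given partition and returns them as parallel slices of primary keys and timestamps.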
func (sd *shardDelegator) GetLevel0Deletions(partitionID int64, candidate pkoracle.Candidate) (storage.PrimaryKeys, []storage.Timestamp) {
    deltaData := storage.NewDeltaData(0)

    sd.rangeHitL0Deletions(partitionID, candidate, func(pk storage.PrimaryKey, ts uint64) error {
        deltaData.Append(pk, ts)
        return nil
    })

    return deltaData.DeletePks(), deltaData.DeleteTimestamps()
}

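// StreamForwardLevel0Deletions streams the hitting L0 delete records into the buffered
// forwarder and flushes it once all records have been buffered.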
func (sd *shardDelegator) StreamForwardLevel0Deletions(bufferedForwarder *BufferForwarder, partitionID int64, candidate pkoracle.Candidate) error {
    err := sd.rangeHitL0Deletions(partitionID, candidate, func(pk storage.PrimaryKey, ts uint64) error {
        return bufferedForwarder.Buffer(pk, ts)
    })
    if err != nil {
        return err
    }
    return bufferedForwarder.Flush()
}

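// RefreshLevel0DeletionStats recomputes the number of buffered L0 segments and the total
// size of their delete records, and publishes both as query node metrics.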
func (sd *shardDelegator) RefreshLevel0DeletionStats() {
    level0Segments := sd.deleteBuffer.ListL0()
    totalSize := int64(0)
    for _, segment := range level0Segments {
        segment := segment.(*segments.L0Segment)
        pks, tss := segment.DeleteRecords()
        totalSize += lo.SumBy(pks, func(pk storage.PrimaryKey) int64 { return pk.Size() }) + int64(len(tss)*8)
    }

    metrics.QueryNodeNumSegments.WithLabelValues(
        fmt.Sprint(paramtable.GetNodeID()),
        fmt.Sprint(sd.Collection()),
        commonpb.SegmentState_Sealed.String(),
        datapb.SegmentLevel_L0.String(),
    ).Set(float64(len(level0Segments)))

    metrics.QueryNodeLevelZeroSize.WithLabelValues(
        fmt.Sprint(paramtable.GetNodeID()),
        fmt.Sprint(sd.collectionID),
        sd.vchannelName,
    ).Set(float64(totalSize))
}

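// loadStreamDelete forwards L0 deletions and the buffered streaming deletions to the worker
// for each newly loaded segment, then registers the bloom filter candidates (and BM25 stats,
// when present) so the delegator can route future deletes to those segments.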
func (sd *shardDelegator) loadStreamDelete(ctx context.Context,
    candidates []*pkoracle.BloomFilterSet,
    bm25Stats *typeutil.ConcurrentMap[int64, map[int64]*storage.BM25Stats],
    infos []*querypb.SegmentLoadInfo,
    req *querypb.LoadSegmentsRequest,
    targetNodeID int64,
    worker cluster.Worker,
) error {
    log := sd.getLogger(ctx)

    idCandidates := lo.SliceToMap(candidates, func(candidate *pkoracle.BloomFilterSet) (int64, *pkoracle.BloomFilterSet) {
        return candidate.ID(), candidate
    })
    for _, info := range infos {
        candidate := idCandidates[info.GetSegmentID()]
        // forward l0 deletion
        err := sd.forwardL0Deletion(ctx, info, req, candidate, targetNodeID, worker)
        if err != nil {
            return err
        }
    }

    sd.deleteMut.RLock()
    defer sd.deleteMut.RUnlock()
    // Apply buffered deletes for the new segments.
    // No goroutines here since qnv2 has no load merging logic.
    for _, info := range infos {
        candidate := idCandidates[info.GetSegmentID()]
        // Since the L0 segment feature, growing segments should load streamed deletes as well.
        deleteScope := querypb.DataScope_All
        switch candidate.Type() {
        case commonpb.SegmentState_Sealed:
            deleteScope = querypb.DataScope_Historical
        case commonpb.SegmentState_Growing:
            deleteScope = querypb.DataScope_Streaming
        }

        bufferedForwarder := NewBufferedForwarder(paramtable.Get().QueryNodeCfg.ForwardBatchSize.GetAsInt64(),
            deleteViaWorker(ctx, worker, targetNodeID, info, deleteScope))

        // list buffered deletes
        deleteRecords := sd.deleteBuffer.ListAfter(info.GetStartPosition().GetTimestamp())
        tsHitDeleteRows := int64(0)
        bfHitDeleteRows := int64(0)
        start := time.Now()
        for _, entry := range deleteRecords {
            for _, record := range entry.Data {
                tsHitDeleteRows += int64(len(record.DeleteData.Pks))
                if record.PartitionID != common.AllPartitionsID && candidate.Partition() != record.PartitionID {
                    continue
                }
                pks := record.DeleteData.Pks
                batchSize := paramtable.Get().CommonCfg.BloomFilterApplyBatchSize.GetAsInt()
                for idx := 0; idx < len(pks); idx += batchSize {
                    endIdx := idx + batchSize
                    if endIdx > len(pks) {
                        endIdx = len(pks)
                    }

                    lc := storage.NewBatchLocationsCache(pks[idx:endIdx])
                    hits := candidate.BatchPkExist(lc)
                    for i, hit := range hits {
                        if hit {
                            bfHitDeleteRows += 1
                            err := bufferedForwarder.Buffer(pks[idx+i], record.DeleteData.Tss[idx+i])
                            if err != nil {
                                return err
                            }
                        }
                    }
                }
            }
        }
        log.Info("forward delete to worker...",
            zap.String("channel", info.InsertChannel),
            zap.Int64("segmentID", info.GetSegmentID()),
            zap.Time("startPosition", tsoutil.PhysicalTime(info.GetStartPosition().GetTimestamp())),
            zap.Int64("tsHitDeleteRowNum", tsHitDeleteRows),
            zap.Int64("bfHitDeleteRowNum", bfHitDeleteRows),
            zap.Int64("bfCost", time.Since(start).Milliseconds()),
        )
        err := bufferedForwarder.Flush()
        if err != nil {
            return err
        }
    }

    // add candidates after load success
    for _, candidate := range candidates {
        log.Info("register sealed segment bfs into pko candidates",
            zap.Int64("segmentID", candidate.ID()),
        )
        sd.pkOracle.Register(candidate, targetNodeID)
    }

    if sd.idfOracle != nil && bm25Stats != nil {
        bm25Stats.Range(func(segmentID int64, stats map[int64]*storage.BM25Stats) bool {
            log.Info("register sealed segment bm25 stats into idforacle",
                zap.Int64("segmentID", segmentID),
            )
            sd.idfOracle.Register(segmentID, stats, segments.SegmentTypeSealed)
            return false
        })
    }

    log.Info("load delete done")

    return nil
}

// ReleaseSegments releases segments locally or remotely depending on the target node.
func (sd *shardDelegator) ReleaseSegments(ctx context.Context, req *querypb.ReleaseSegmentsRequest, force bool) error {
    log := sd.getLogger(ctx)

    targetNodeID := req.GetNodeID()
    level0Segments := typeutil.NewSet(lo.Map(sd.deleteBuffer.ListL0(), func(segment segments.Segment, _ int) int64 {
        return segment.ID()
    })...)
    hasLevel0 := false
    for _, segmentID := range req.GetSegmentIDs() {
        hasLevel0 = level0Segments.Contain(segmentID)
        if hasLevel0 {
            return merr.WrapErrServiceInternal("release L0 segment is not supported, l0 segment should only be released by unSubChannel/SyncDataDistribution")
        }
    }

    // add common log fields
    log = log.With(
        zap.Int64s("segmentIDs", req.GetSegmentIDs()),
        zap.Int64("nodeID", req.GetNodeID()),
        zap.String("scope", req.GetScope().String()),
        zap.Bool("force", force))

    log.Info("delegator start to release segments")
    // alter distribution first
    var sealed, growing []SegmentEntry
    convertSealed := func(segmentID int64, _ int) SegmentEntry {
        return SegmentEntry{
            SegmentID: segmentID,
            NodeID:    targetNodeID,
        }
    }
    convertGrowing := func(segmentID int64, _ int) SegmentEntry {
        return SegmentEntry{
            SegmentID: segmentID,
        }
    }
    switch req.GetScope() {
    case querypb.DataScope_All:
        sealed = lo.Map(req.GetSegmentIDs(), convertSealed)
        growing = lo.Map(req.GetSegmentIDs(), convertGrowing)
    case querypb.DataScope_Streaming:
        growing = lo.Map(req.GetSegmentIDs(), convertGrowing)
    case querypb.DataScope_Historical:
        sealed = lo.Map(req.GetSegmentIDs(), convertSealed)
    }
    signal := sd.distribution.RemoveDistributions(sealed, growing)
    // wait for the cleared signal
    <-signal

    if len(growing) > 0 {
        sd.growingSegmentLock.Lock()
    }
    // When releasing a segment, add it to the pipeline's exclude list first,
    // in case its growing data gets consumed again.
    droppedInfos := lo.SliceToMap(req.GetSegmentIDs(), func(id int64) (int64, uint64) {
        if req.GetCheckpoint() == nil {
            return id, typeutil.MaxTimestamp
        }

        return id, req.GetCheckpoint().GetTimestamp()
    })
    sd.AddExcludedSegments(droppedInfos)

    if len(sealed) > 0 {
        sd.pkOracle.Remove(
            pkoracle.WithSegmentIDs(lo.Map(sealed, func(entry SegmentEntry, _ int) int64 { return entry.SegmentID })...),
            pkoracle.WithSegmentType(commonpb.SegmentState_Sealed),
            pkoracle.WithWorkerID(targetNodeID),
        )
    }
    if len(growing) > 0 {
        sd.pkOracle.Remove(
            pkoracle.WithSegmentIDs(lo.Map(growing, func(entry SegmentEntry, _ int) int64 { return entry.SegmentID })...),
            pkoracle.WithSegmentType(commonpb.SegmentState_Growing),
        )
    }

    var releaseErr error
    if !force {
        worker, err := sd.workerManager.GetWorker(ctx, targetNodeID)
        if err != nil {
            log.Warn("delegator failed to find worker", zap.Error(err))
            releaseErr = err
        }
        req.Base.TargetID = targetNodeID
        err = worker.ReleaseSegments(ctx, req)
        if err != nil {
            log.Warn("worker failed to release segments", zap.Error(err))
            releaseErr = err
        }
    }
    if len(growing) > 0 {
        sd.growingSegmentLock.Unlock()
    }

    if releaseErr != nil {
        return releaseErr
    }
    return nil
}

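// SyncTargetVersion applies the sync action to the distribution and, once the query view
// becomes serviceable, cleans up delete buffer entries and L0 segments up to the delete
// checkpoint and refreshes the L0 deletion stats.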
func (sd *shardDelegator) SyncTargetVersion(action *querypb.SyncAction, partitions []int64) {
    sd.distribution.SyncTargetVersion(action, partitions)
    // clean delete buffer after the distribution becomes serviceable
    if sd.distribution.queryView.Serviceable() {
        checkpoint := action.GetCheckpoint()
        deleteSeekPos := action.GetDeleteCP()
        if deleteSeekPos == nil {
            // for compatibility with 2.4, use the checkpoint as deleteCP when deleteCP is nil
            deleteSeekPos = checkpoint
            log.Info("use checkpoint as deleteCP",
                zap.String("channelName", sd.vchannelName),
                zap.Time("deleteSeekPos", tsoutil.PhysicalTime(action.GetCheckpoint().GetTimestamp())))
        }

        start := time.Now()
        sizeBeforeClean, _ := sd.deleteBuffer.Size()
        l0NumBeforeClean := len(sd.deleteBuffer.ListL0())
        sd.deleteBuffer.UnRegister(deleteSeekPos.GetTimestamp())
        sizeAfterClean, _ := sd.deleteBuffer.Size()
        l0NumAfterClean := len(sd.deleteBuffer.ListL0())

        if sizeAfterClean < sizeBeforeClean || l0NumAfterClean < l0NumBeforeClean {
            log.Info("clean delete buffer",
                zap.String("channel", sd.vchannelName),
                zap.Time("deleteSeekPos", tsoutil.PhysicalTime(deleteSeekPos.GetTimestamp())),
                zap.Time("channelCP", tsoutil.PhysicalTime(checkpoint.GetTimestamp())),
                zap.Int64("sizeBeforeClean", sizeBeforeClean),
                zap.Int64("sizeAfterClean", sizeAfterClean),
                zap.Int("l0NumBeforeClean", l0NumBeforeClean),
                zap.Int("l0NumAfterClean", l0NumAfterClean),
                zap.Duration("cost", time.Since(start)),
            )
        }
        sd.RefreshLevel0DeletionStats()
    }
}

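// GetChannelQueryView returns the current query view of the channel distribution.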
func (sd *shardDelegator) GetChannelQueryView() *channelQueryView {
    return sd.distribution.GetQueryView()
}

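// AddExcludedSegments adds the given segmentID-to-exclude-timestamp pairs to the excluded
// segment set, so their data will not be consumed again.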
func (sd *shardDelegator) AddExcludedSegments(excludeInfo map[int64]uint64) {
    sd.excludedSegments.Insert(excludeInfo)
}

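// VerifyExcludedSegments reports whether data of the segment at the given timestamp may
// still be consumed; it returns false when the segment has been excluded.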
func (sd *shardDelegator) VerifyExcludedSegments(segmentID int64, ts uint64) bool {
    return sd.excludedSegments.Verify(segmentID, ts)
}

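// TryCleanExcludedSegments removes invalidated entries from the excluded segment set
// when a cleanup is due.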
func (sd *shardDelegator) TryCleanExcludedSegments(ts uint64) {
    if sd.excludedSegments.ShouldClean() {
        sd.excludedSegments.CleanInvalid(ts)
    }
}

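// buildBM25IDF runs the BM25 function on the search text, builds the IDF sparse vector via
// the idf oracle, rewrites the request's placeholder group with that vector, and returns the
// average document length (avgdl) used as a BM25 parameter.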
func (sd *shardDelegator) buildBM25IDF(req *internalpb.SearchRequest) (float64, error) {
    pb := &commonpb.PlaceholderGroup{}
    proto.Unmarshal(req.GetPlaceholderGroup(), pb)

    if len(pb.Placeholders) != 1 || len(pb.Placeholders[0].Values) == 0 {
        return 0, merr.WrapErrParameterInvalidMsg("please provide varchar/text for BM25 Function based search")
    }

    holder := pb.Placeholders[0]
    if holder.Type != commonpb.PlaceholderType_VarChar {
        return 0, merr.WrapErrParameterInvalidMsg(fmt.Sprintf("please provide varchar/text for BM25 Function based search, got %s", holder.Type.String()))
    }

    texts := funcutil.GetVarCharFromPlaceholder(holder)
    datas := []any{texts}
    functionRunner, ok := sd.functionRunners[req.GetFieldId()]
    if !ok {
        return 0, fmt.Errorf("functionRunner not found for field: %d", req.GetFieldId())
    }

    if len(functionRunner.GetInputFields()) == 2 {
        analyzerName := "default"
        if name := req.GetAnalyzerName(); name != "" {
            // use user provided analyzer name
            analyzerName = name
        }

        analyzers := make([]string, len(texts))
        for i := range texts {
            analyzers[i] = analyzerName
        }
        datas = append(datas, analyzers)
    }

    // get search text term frequency
    output, err := functionRunner.BatchRun(datas...)
    if err != nil {
        return 0, err
    }

    tfArray, ok := output[0].(*schemapb.SparseFloatArray)
    if !ok {
        return 0, errors.New("functionRunner return unknown data")
    }

    idfSparseVector, avgdl, err := sd.idfOracle.BuildIDF(req.GetFieldId(), tfArray)
    if err != nil {
        return 0, err
    }

    if avgdl <= 0 {
        return 0, nil
    }

    for _, idf := range idfSparseVector {
        metrics.QueryNodeSearchFTSNumTokens.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), fmt.Sprint(sd.collectionID), fmt.Sprint(req.GetFieldId())).Observe(float64(typeutil.SparseFloatRowElementCount(idf)))
    }

    err = SetBM25Params(req, avgdl)
    if err != nil {
        return 0, err
    }

    req.PlaceholderGroup = funcutil.SparseVectorDataToPlaceholderGroupBytes(idfSparseVector)
    return avgdl, nil
}

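// DropIndex fans the drop-index request out to all workers managed by the delegator and
// returns on the first error.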
func (sd *shardDelegator) DropIndex(ctx context.Context, req *querypb.DropIndexRequest) error {
    workers := sd.workerManager.GetAllWorkers()
    for _, worker := range workers {
        if err := worker.DropIndex(ctx, req); err != nil {
            return err
        }
    }
    return nil
}