milvus/internal/querynode/flow_graph_delete_node.go
Xiaofan 2273e2a45a
Limit the go routine numbers (#25171)
Signed-off-by: xiaofan-luan <xiaofan.luan@zilliz.com>
2023-06-28 10:54:46 +08:00

233 lines
7.1 KiB
Go

// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package querynode
import (
"errors"
"fmt"
"reflect"
"runtime"
"sync"
"github.com/opentracing/opentracing-go"
"github.com/panjf2000/ants/v2"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/util/concurrency"
"github.com/milvus-io/milvus/internal/util/flowgraph"
"github.com/milvus-io/milvus/internal/util/funcutil"
"github.com/milvus-io/milvus/internal/util/trace"
)
// Package-local aliases for the storage primary-key types, so code in this
// package can use them without the storage qualifier.
type (
	primaryKey        = storage.PrimaryKey
	int64PrimaryKey   = storage.Int64PrimaryKey
	varCharPrimaryKey = storage.VarCharPrimaryKey
)

// Package-local shortcuts for the storage primary-key constructors.
var (
	newInt64PrimaryKey   = storage.NewInt64PrimaryKey
	newVarCharPrimaryKey = storage.NewVarCharPrimaryKey
)
// deletePool is the package-level worker pool on which segment deletions are
// executed. It is created lazily, exactly once, by getOrCreateDeletePool and is
// sized to runtime.GOMAXPROCS(0).
var (
	deletePool         *concurrency.Pool
	deletePoolInitOnce sync.Once
)

// initDeletePool creates deletePool. Pool creation is not expected to fail; if
// it does, this panics.
func initDeletePool() {
	p, err := concurrency.NewPool(runtime.GOMAXPROCS(0), ants.WithPreAlloc(false))
	if err != nil {
		// Should not happen: treat a failure here as a programming error.
		panic(err)
	}
	deletePool = p
}

// getOrCreateDeletePool returns the shared delete pool, initializing it on the
// first call (guarded by deletePoolInitOnce).
func getOrCreateDeletePool() *concurrency.Pool {
	deletePoolInitOnce.Do(initDeletePool)
	return deletePool
}
// deleteNode is the delta flow-graph node that applies delete messages to the
// sealed (historical) segments held in metaReplica.
type deleteNode struct {
	baseNode

	collectionID UniqueID
	// metaReplica is the historical replica whose sealed segments receive deletions.
	metaReplica ReplicaInterface
	// deltaVchannel is the delta channel this node was built for; dmlVchannel is
	// the DML channel name derived from it in newDeleteNode.
	deltaVchannel Channel
	dmlVchannel   Channel
	// pool runs the per-segment segmentDelete calls.
	pool *concurrency.Pool
}
// Name returns this node's identifier, "dNode-" followed by its delta vchannel.
func (dNode *deleteNode) Name() string {
	name := fmt.Sprintf("dNode-%s", dNode.deltaVchannel)
	return name
}
// IsValidInMsg reports whether in is acceptable input for Operate. It must pass
// the base-node checks, and its first message must be a *deleteMsg.
func (dNode *deleteNode) IsValidInMsg(in []Msg) bool {
	if !dNode.baseNode.IsValidInMsg(in) {
		return false
	}
	if _, ok := in[0].(*deleteMsg); ok {
		return true
	}
	// reflect.Type.Name() is "" for pointer types such as *deleteMsg, and
	// reflect.TypeOf(nil) returns a nil Type whose methods panic. Log the full
	// type string instead, and guard against a nil message.
	msgType := "<nil>"
	if in[0] != nil {
		msgType = reflect.TypeOf(in[0]).String()
	}
	log.Warn("type assertion failed for deleteMsg", zap.String("msgType", msgType), zap.String("name", dNode.Name()))
	return false
}
// Operate applies the deletions carried by a deleteMsg to the sealed segments in
// metaReplica and forwards a serviceTimeMsg with the same time range.
//
// A trace span is started for every delete message and finished on return. A
// close message short-circuits and yields a closing serviceTimeMsg.
//
// Processing runs in two phases:
//  1. processDeleteMessages routes each message's primary keys and timestamps
//     into delData, keyed by sealed segment (the phase is described as bloom
//     filter filtering). It is skipped when there are no sealed segments. Any
//     error there is unexpected and panics.
//  2. Each affected segment is handled by dNode.delete in its own goroutine,
//     and Operate waits for all of them. Errors wrapping ErrSegmentNotFound or
//     ErrSegmentUnhealthy are logged and ignored. Any other error panics.
func (dNode *deleteNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
	dMsg, ok := in[0].(*deleteMsg)
	if !ok {
		// reflect.Type.Name() is "" for pointer types and panics on a nil
		// interface, so log the full type string and guard against nil.
		msgType := "<nil>"
		if in[0] != nil {
			msgType = reflect.TypeOf(in[0]).String()
		}
		log.Warn("type assertion failed for deleteMsg", zap.String("msgType", msgType), zap.String("name", dNode.Name()))
		return []Msg{}
	}

	spans := make([]opentracing.Span, 0, len(dMsg.deleteMessages))
	for _, msg := range dMsg.deleteMessages {
		sp, ctx := trace.StartSpanFromContext(msg.TraceCtx())
		spans = append(spans, sp)
		msg.SetTraceCtx(ctx)
	}
	defer func() {
		for _, sp := range spans {
			sp.Finish()
		}
	}()

	if dMsg.IsCloseMsg() {
		return []Msg{
			&serviceTimeMsg{BaseMsg: flowgraph.NewBaseMsg(true)},
		}
	}

	delData := &deleteData{
		deleteIDs:        map[UniqueID][]primaryKey{},
		deleteTimestamps: map[UniqueID][]Timestamp{},
	}

	// 1. filter segment by bloom filter
	for i, delMsg := range dMsg.deleteMessages {
		// Read the sealed-segment count once, so the logged value and the branch
		// below agree.
		sealedNum := dNode.metaReplica.getSegmentNum(segmentTypeSealed)
		traceID, _, _ := trace.InfoFromSpan(spans[i])
		log.Debug("delete in historical replica",
			zap.String("vchannel", dNode.deltaVchannel),
			zap.Int64("collectionID", delMsg.CollectionID),
			zap.String("collectionName", delMsg.CollectionName),
			zap.Int64("numPKs", delMsg.NumRows),
			zap.Int("numTS", len(delMsg.Timestamps)),
			zap.Uint64("timestampBegin", delMsg.BeginTs()),
			zap.Uint64("timestampEnd", delMsg.EndTs()),
			zap.Int("segmentNum", sealedNum),
			zap.String("traceID", traceID),
		)

		if sealedNum != 0 {
			err := processDeleteMessages(dNode.metaReplica, segmentTypeSealed, delMsg, delData, dNode.dmlVchannel)
			if err != nil {
				// error occurs when missing meta info or unexpected pk type, should not happen
				err = fmt.Errorf("deleteNode processDeleteMessages failed, collectionID = %d, err = %w, channel = %s", delMsg.CollectionID, err, dNode.deltaVchannel)
				log.Error(err.Error())
				panic(err)
			}
		}
	}

	// 2. do delete
	// delData is only read from here on, so the goroutines can share it.
	var wg sync.WaitGroup
	for segmentID := range delData.deleteIDs {
		segmentID := segmentID
		wg.Add(1)
		go func() {
			defer wg.Done()
			err := dNode.delete(delData, segmentID)
			if err != nil {
				// error occurs when segment cannot be found, calling cgo function delete failed and etc...
				log.Warn("failed to apply deletions to segment",
					zap.Int64("segmentID", segmentID),
					zap.Error(err),
				)
				// For cases: segment compacted, not loaded yet, or just released,
				// to ignore the error,
				// panic otherwise.
				if !errors.Is(err, ErrSegmentNotFound) && !errors.Is(err, ErrSegmentUnhealthy) {
					panic(err)
				}
			}
		}()
	}
	wg.Wait()

	var res Msg = &serviceTimeMsg{
		timeRange: dMsg.timeRange,
	}
	return []Msg{res}
}
// delete applies the primary keys and timestamps recorded for segmentID in
// deleteData to that sealed segment. The segmentDelete call is submitted to
// dNode.pool, and delete blocks until it finishes.
//
// Returned errors:
//   - WrapSegmentNotFound(segmentID) when metaReplica has no such sealed segment
//     (the lookup error itself is dropped);
//   - an error when the found segment is not sealed;
//   - a %w-wrapped error when segmentDelete fails.
func (dNode *deleteNode) delete(deleteData *deleteData, segmentID UniqueID) error {
	seg, lookupErr := dNode.metaReplica.getSegmentByID(segmentID, segmentTypeSealed)
	if lookupErr != nil {
		return WrapSegmentNotFound(segmentID)
	}
	if seg.getType() != segmentTypeSealed {
		return fmt.Errorf("unexpected segmentType when delete, segmentID = %d, segmentType = %s", segmentID, seg.segmentType.String())
	}

	ids, tss := deleteData.deleteIDs[segmentID], deleteData.deleteTimestamps[segmentID]
	task := dNode.pool.Submit(func() (any, error) {
		return nil, seg.segmentDelete(ids, tss)
	})
	if _, deleteErr := task.Await(); deleteErr != nil {
		return fmt.Errorf("segmentDelete failed, segmentID = %d, err=%w", segmentID, deleteErr)
	}

	log.Debug("Do delete done", zap.Int("len", len(ids)),
		zap.Int64("segmentID", segmentID),
		zap.String("SegmentType", seg.getType().String()),
		zap.String("vchannel", dNode.deltaVchannel))
	return nil
}
// newDeleteNode constructs a deleteNode for deltaVchannel.
//
// The DML vchannel name is derived from deltaVchannel with
// funcutil.ConvertChannelName (RootCoordDelta -> RootCoordDml naming), and the
// constructor fails only when that conversion fails. The embedded base node
// takes its queue length and parallelism from Params.QueryNodeCfg, and the node
// shares the package-level delete pool.
func newDeleteNode(metaReplica ReplicaInterface, collectionID UniqueID, deltaVchannel Channel) (*deleteNode, error) {
	dmlVChannel, err := funcutil.ConvertChannelName(deltaVchannel, Params.CommonCfg.RootCoordDelta, Params.CommonCfg.RootCoordDml)
	if err != nil {
		log.Error("failed to convert deltaVChannel to dmlVChannel", zap.String("deltaVChannel", deltaVchannel), zap.Error(err))
		return nil, err
	}

	var bn baseNode
	bn.SetMaxQueueLength(Params.QueryNodeCfg.FlowGraphMaxQueueLength)
	bn.SetMaxParallelism(Params.QueryNodeCfg.FlowGraphMaxParallelism)

	node := &deleteNode{
		baseNode:      bn,
		collectionID:  collectionID,
		metaReplica:   metaReplica,
		deltaVchannel: deltaVchannel,
		dmlVchannel:   dmlVChannel,
		pool:          getOrCreateDeletePool(),
	}
	return node, nil
}