Add released partitions and fix search error in empty partition (#5893)

* rename flowgraphType to loadType

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>

* add load type and released partitions

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>

* filter released partition

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
This commit is contained in:
bigsheeper 2021-06-19 18:38:07 +08:00 committed by GitHub
parent 82053fbbf7
commit bbd8a7e13a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 135 additions and 34 deletions

View File

@ -23,6 +23,8 @@ package querynode
*/
import "C"
import (
"errors"
"fmt"
"math"
"sync"
"unsafe"
@ -42,8 +44,11 @@ type Collection struct {
vChannels []Channel
pChannels []Channel
releaseMu sync.RWMutex // guards releaseTime
releaseTime Timestamp
loadType loadType
releaseMu sync.RWMutex // guards release
releasedPartitions map[UniqueID]struct{}
releaseTime Timestamp
}
func (c *Collection) ID() UniqueID {
@ -102,6 +107,34 @@ func (c *Collection) getReleaseTime() Timestamp {
return c.releaseTime
}
// addReleasedPartition marks partitionID as released on this collection by
// inserting it into releasedPartitions while holding releaseMu for writing.
// Adding an ID that is already present leaves the set unchanged.
func (c *Collection) addReleasedPartition(partitionID UniqueID) {
	var present struct{}

	c.releaseMu.Lock()
	defer c.releaseMu.Unlock()

	c.releasedPartitions[partitionID] = present
}
// checkReleasedPartitions reports whether any of the given partitions has
// been marked as released on this collection.
//
// It returns nil when none of partitionIDs is in releasedPartitions
// (including when partitionIDs is empty). Otherwise it returns an error
// naming the collection and the first released partition found, in the order
// given. The lookup holds releaseMu for reading.
func (c *Collection) checkReleasedPartitions(partitionIDs []UniqueID) error {
	c.releaseMu.RLock()
	defer c.releaseMu.RUnlock()

	for _, id := range partitionIDs {
		if _, ok := c.releasedPartitions[id]; ok {
			// fmt.Sprint rather than fmt.Sprintln: Sprintln appends "\n" after
			// each ID, which would embed line breaks in the error message.
			return errors.New("partition has been released" +
				", collectionID = " + fmt.Sprint(c.ID()) +
				", partitionID = " + fmt.Sprint(id))
		}
	}
	return nil
}
// setLoadType records l as the collection's load type.
//
// NOTE(review): the field is written without taking releaseMu; confirm that
// callers do not set it concurrently with getLoadType.
func (c *Collection) setLoadType(l loadType) {
	c.loadType = l
}
// getLoadType returns the load type currently stored on the collection.
// A collection on which setLoadType was never called yields the zero value.
func (c *Collection) getLoadType() loadType {
	current := c.loadType
	return current
}
func newCollection(collectionID UniqueID, schema *schemapb.CollectionSchema) *Collection {
/*
CCollection
@ -113,11 +146,12 @@ func newCollection(collectionID UniqueID, schema *schemapb.CollectionSchema) *Co
collection := C.NewCollection(cSchemaBlob)
var newCollection = &Collection{
collectionPtr: collection,
id: collectionID,
schema: schema,
vChannels: make([]Channel, 0),
pChannels: make([]Channel, 0),
collectionPtr: collection,
id: collectionID,
schema: schema,
vChannels: make([]Channel, 0),
pChannels: make([]Channel, 0),
releasedPartitions: make(map[UniqueID]struct{}),
}
C.free(unsafe.Pointer(cSchemaBlob))

View File

@ -92,6 +92,8 @@ type collectionReplica struct {
partitions map[UniqueID]*Partition
segments map[UniqueID]*Segment
loadType
excludedSegments map[UniqueID][]*datapb.SegmentInfo // map[collectionID]segmentIDs
etcdKV *etcdkv.EtcdKV

View File

@ -23,11 +23,11 @@ import (
"github.com/milvus-io/milvus/internal/msgstream"
)
type flowGraphType = int32
type loadType = int32
const (
flowGraphTypeCollection = 0
flowGraphTypePartition = 1
loadTypeCollection = 0
loadTypePartition = 1
)
type dataSyncService struct {
@ -54,7 +54,7 @@ func (dsService *dataSyncService) addCollectionFlowGraph(collectionID UniqueID,
// collection flow graph doesn't need partition id
partitionID := UniqueID(0)
newFlowGraph := newQueryNodeFlowGraph(dsService.ctx,
flowGraphTypeCollection,
loadTypeCollection,
collectionID,
partitionID,
dsService.streamingReplica,
@ -128,7 +128,7 @@ func (dsService *dataSyncService) addPartitionFlowGraph(collectionID UniqueID, p
}
for _, vChannel := range vChannels {
newFlowGraph := newQueryNodeFlowGraph(dsService.ctx,
flowGraphTypePartition,
loadTypePartition,
collectionID,
partitionID,
dsService.streamingReplica,

View File

@ -26,7 +26,7 @@ import (
type filterDmNode struct {
baseNode
graphType flowGraphType
loadType loadType
collectionID UniqueID
partitionID UniqueID
replica ReplicaInterface
@ -94,14 +94,14 @@ func (fdmNode *filterDmNode) filterInvalidInsertMessage(msg *msgstream.InsertMsg
// check if collection and partition exist
collection := fdmNode.replica.hasCollection(msg.CollectionID)
partition := fdmNode.replica.hasPartition(msg.PartitionID)
if fdmNode.graphType == flowGraphTypeCollection && !collection {
if fdmNode.loadType == loadTypeCollection && !collection {
log.Debug("filter invalid insert message, collection dose not exist",
zap.Any("collectionID", msg.CollectionID),
zap.Any("partitionID", msg.PartitionID))
return nil
}
if fdmNode.graphType == flowGraphTypePartition && !partition {
if fdmNode.loadType == loadTypePartition && !partition {
log.Debug("filter invalid insert message, partition dose not exist",
zap.Any("collectionID", msg.CollectionID),
zap.Any("partitionID", msg.PartitionID))
@ -117,13 +117,26 @@ func (fdmNode *filterDmNode) filterInvalidInsertMessage(msg *msgstream.InsertMsg
}
// if the flow graph type is partition, check if the partition is target partition
if fdmNode.graphType == flowGraphTypePartition && msg.PartitionID != fdmNode.partitionID {
if fdmNode.loadType == loadTypePartition && msg.PartitionID != fdmNode.partitionID {
log.Debug("filter invalid insert message, partition is not the target partition",
zap.Any("collectionID", msg.CollectionID),
zap.Any("partitionID", msg.PartitionID))
return nil
}
// check if partition has been released
if fdmNode.loadType == loadTypeCollection {
col, err := fdmNode.replica.getCollectionByID(msg.CollectionID)
if err != nil {
log.Error(err.Error())
return nil
}
if err = col.checkReleasedPartitions([]UniqueID{msg.PartitionID}); err != nil {
log.Warn(err.Error())
return nil
}
}
// Check if the segment is in excluded segments,
// messages after seekPosition may contain the redundant data from flushed slice of segment,
// so we need to compare the endTimestamp of received messages and position's timestamp.
@ -158,7 +171,7 @@ func (fdmNode *filterDmNode) filterInvalidInsertMessage(msg *msgstream.InsertMsg
}
func newFilteredDmNode(replica ReplicaInterface,
graphType flowGraphType,
loadType loadType,
collectionID UniqueID,
partitionID UniqueID) *filterDmNode {
@ -169,14 +182,14 @@ func newFilteredDmNode(replica ReplicaInterface,
baseNode.SetMaxQueueLength(maxQueueLength)
baseNode.SetMaxParallelism(maxParallelism)
if graphType != flowGraphTypeCollection && graphType != flowGraphTypePartition {
if loadType != loadTypeCollection && loadType != loadTypePartition {
err := errors.New("invalid flow graph type")
log.Error(err.Error())
}
return &filterDmNode{
baseNode: baseNode,
graphType: graphType,
loadType: loadType,
collectionID: collectionID,
partitionID: partitionID,
replica: replica,

View File

@ -34,7 +34,7 @@ type queryNodeFlowGraph struct {
}
func newQueryNodeFlowGraph(ctx context.Context,
flowGraphType flowGraphType,
loadType loadType,
collectionID UniqueID,
partitionID UniqueID,
streamingReplica ReplicaInterface,
@ -54,9 +54,9 @@ func newQueryNodeFlowGraph(ctx context.Context,
}
var dmStreamNode node = q.newDmInputNode(ctx1, factory)
var filterDmNode node = newFilteredDmNode(streamingReplica, flowGraphType, collectionID, partitionID)
var filterDmNode node = newFilteredDmNode(streamingReplica, loadType, collectionID, partitionID)
var insertNode node = newInsertNode(streamingReplica)
var serviceTimeNode node = newServiceTimeNode(ctx1, tSafeReplica, flowGraphType, collectionID, partitionID, channel, factory)
var serviceTimeNode node = newServiceTimeNode(ctx1, tSafeReplica, loadType, collectionID, partitionID, channel, factory)
q.flowGraph.AddNode(dmStreamNode)
q.flowGraph.AddNode(filterDmNode)

View File

@ -23,7 +23,7 @@ import (
type serviceTimeNode struct {
baseNode
graphType flowGraphType
loadType loadType
collectionID UniqueID
partitionID UniqueID
vChannel Channel
@ -59,7 +59,7 @@ func (stNode *serviceTimeNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
// update service time
var id UniqueID
if stNode.graphType == flowGraphTypePartition {
if stNode.loadType == loadTypePartition {
id = stNode.partitionID
} else {
id = stNode.collectionID
@ -106,7 +106,7 @@ func (stNode *serviceTimeNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
func newServiceTimeNode(ctx context.Context,
tSafeReplica TSafeReplicaInterface,
graphType flowGraphType,
loadType loadType,
collectionID UniqueID,
partitionID UniqueID,
channel Channel,
@ -131,7 +131,7 @@ func newServiceTimeNode(ctx context.Context,
return &serviceTimeNode{
baseNode: baseNode,
graphType: graphType,
loadType: loadType,
collectionID: collectionID,
partitionID: partitionID,
vChannel: channel,

View File

@ -99,14 +99,26 @@ func (h *historical) search(searchReqs []*searchRequest,
}
}
col, err := h.replica.getCollectionByID(collID)
if err != nil {
return nil, nil, err
}
// all partitions have been released
if len(searchPartIDs) == 0 {
if len(searchPartIDs) == 0 && col.getLoadType() == loadTypePartition {
return nil, nil, errors.New("partitions have been released , collectionID = " +
fmt.Sprintln(collID) +
"target partitionIDs = " +
fmt.Sprintln(partIDs))
}
if len(searchPartIDs) == 0 && col.getLoadType() == loadTypeCollection {
if err = col.checkReleasedPartitions(partIDs); err != nil {
return nil, nil, err
}
return nil, nil, nil
}
log.Debug("doing search in historical",
zap.Any("collectionID", collID),
zap.Any("reqPartitionIDs", partIDs),

View File

@ -101,14 +101,26 @@ func (s *streaming) search(searchReqs []*searchRequest,
}
}
col, err := s.replica.getCollectionByID(collID)
if err != nil {
return nil, nil, err
}
// all partitions have been released
if len(searchPartIDs) == 0 {
if len(searchPartIDs) == 0 && col.getLoadType() == loadTypePartition {
return nil, nil, errors.New("partitions have been released , collectionID = " +
fmt.Sprintln(collID) +
"target partitionIDs = " +
fmt.Sprintln(partIDs))
}
if len(searchPartIDs) == 0 && col.getLoadType() == loadTypeCollection {
if err = col.checkReleasedPartitions(partIDs); err != nil {
return nil, nil, err
}
return nil, nil, nil
}
log.Debug("doing search in streaming",
zap.Any("collectionID", collID),
zap.Any("vChannel", vChannel),

View File

@ -166,18 +166,32 @@ func (w *watchDmChannelsTask) Execute(ctx context.Context) error {
}
w.node.streaming.replica.initExcludedSegments(collectionID)
}
collection, err := w.node.streaming.replica.getCollectionByID(collectionID)
if err != nil {
return err
}
collection.addVChannels(vChannels)
collection.addPChannels(pChannels)
if hasCollectionInHistorical := w.node.historical.replica.hasCollection(collectionID); !hasCollectionInHistorical {
err := w.node.historical.replica.addCollection(collectionID, w.req.Schema)
if err != nil {
return err
}
}
var l loadType
if loadPartition {
l = loadTypePartition
} else {
l = loadTypeCollection
}
sCol, err := w.node.streaming.replica.getCollectionByID(collectionID)
if err != nil {
return err
}
sCol.addVChannels(vChannels)
sCol.addPChannels(pChannels)
sCol.setLoadType(l)
hCol, err := w.node.historical.replica.getCollectionByID(collectionID)
if err != nil {
return err
}
hCol.addVChannels(vChannels)
hCol.addPChannels(pChannels)
hCol.setLoadType(l)
if loadPartition {
if hasPartitionInStreaming := w.node.streaming.replica.hasPartition(partitionID); !hasPartitionInStreaming {
err := w.node.streaming.replica.addPartition(collectionID, partitionID)
@ -475,6 +489,18 @@ func (r *releasePartitionsTask) Execute(ctx context.Context) error {
zap.Any("collectionID", r.req.CollectionID),
zap.Any("partitionIDs", r.req.PartitionIDs))
hCol, err := r.node.historical.replica.getCollectionByID(r.req.CollectionID)
if err != nil {
log.Error(err.Error())
return err
}
sCol, err := r.node.streaming.replica.getCollectionByID(r.req.CollectionID)
if err != nil {
log.Error(err.Error())
return err
}
for _, id := range r.req.PartitionIDs {
r.node.streaming.dataSyncService.removePartitionFlowGraph(id)
hasPartitionInHistorical := r.node.historical.replica.hasPartition(id)
@ -485,6 +511,7 @@ func (r *releasePartitionsTask) Execute(ctx context.Context) error {
log.Error(err.Error())
}
}
hCol.addReleasedPartition(id)
hasPartitionInStreaming := r.node.streaming.replica.hasPartition(id)
if hasPartitionInStreaming {
@ -493,6 +520,7 @@ func (r *releasePartitionsTask) Execute(ctx context.Context) error {
log.Error(err.Error())
}
}
sCol.addReleasedPartition(id)
}
log.Debug("release partition task done",