From caf9cbfcd430e224e069c67c70f1aa9af6ad8807 Mon Sep 17 00:00:00 2001 From: bigsheeper Date: Mon, 14 Mar 2022 23:18:02 +0800 Subject: [PATCH] Fix load failure and remove parition release related code (#16038) Signed-off-by: bigsheeper --- internal/querynode/collection.go | 48 ++--------------- internal/querynode/collection_test.go | 15 ------ .../querynode/flow_graph_filter_dm_node.go | 21 -------- .../flow_graph_filter_dm_node_test.go | 24 --------- internal/querynode/historical.go | 3 -- internal/querynode/historical_test.go | 21 -------- internal/querynode/segment.go | 1 - internal/querynode/streaming.go | 3 -- internal/querynode/task.go | 54 +++++++------------ internal/querynode/task_test.go | 40 +++++++++++++- internal/querynode/type_def.go | 7 +-- 11 files changed, 64 insertions(+), 173 deletions(-) diff --git a/internal/querynode/collection.go b/internal/querynode/collection.go index 4ace4287a8..7ca238c042 100644 --- a/internal/querynode/collection.go +++ b/internal/querynode/collection.go @@ -29,7 +29,6 @@ package querynode */ import "C" import ( - "errors" "fmt" "github.com/milvus-io/milvus/internal/metrics" "math" @@ -79,9 +78,10 @@ func (c *Collection) addPartitionID(partitionID UniqueID) { c.releaseMu.Lock() defer c.releaseMu.Unlock() - log.Debug("queryNode collection add a partition", zap.Int64("collection", c.id), zap.Int64("partitionID", partitionID)) c.partitionIDs = append(c.partitionIDs, partitionID) - log.Debug("queryNode collection info after add a partition", zap.Int64("collectionID", c.id), zap.Int64s("partitions", c.partitionIDs), zap.Any("releasePartitions", c.releasedPartitions)) + log.Info("queryNode collection info after add a partition", + zap.Int64("partitionID", partitionID), zap.Int64("collectionID", c.id), + zap.Int64s("partitions", c.partitionIDs)) } // removePartitionID removes the partition id from partition id list of collection @@ -277,48 +277,6 @@ func (c *Collection) getReleaseTime() Timestamp { return c.releaseTime } -// addReleasedPartition records the partition to indicate that this partition has been released -func (c *Collection) addReleasedPartition(partitionID UniqueID) { - c.releaseMu.Lock() - defer c.releaseMu.Unlock() - - log.Debug("queryNode collection release a partition", zap.Int64("collectionID", c.id), zap.Int64("partition", partitionID)) - c.releasedPartitions[partitionID] = struct{}{} - partitions := make([]UniqueID, 0) - for _, id := range c.partitionIDs { - if id != partitionID { - partitions = append(partitions, id) - } - } - c.partitionIDs = partitions - log.Debug("queryNode collection info after release a partition", zap.Int64("collectionID", c.id), zap.Int64s("partitions", c.partitionIDs), zap.Any("releasePartitions", c.releasedPartitions)) -} - -// deleteReleasedPartition remove the released partition record from collection -func (c *Collection) deleteReleasedPartition(partitionID UniqueID) { - c.releaseMu.Lock() - defer c.releaseMu.Unlock() - - log.Debug("queryNode collection reload a released partition", zap.Int64("collectionID", c.id), zap.Int64("partition", partitionID)) - delete(c.releasedPartitions, partitionID) - log.Debug("queryNode collection info after reload a released partition", zap.Int64("collectionID", c.id), zap.Int64s("partitions", c.partitionIDs), zap.Any("releasePartitions", c.releasedPartitions)) -} - -// checkReleasedPartitions returns error if any partition has been released -func (c *Collection) checkReleasedPartitions(partitionIDs []UniqueID) error { - c.releaseMu.RLock() - defer c.releaseMu.RUnlock() - for _, id := range partitionIDs { - if _, ok := c.releasedPartitions[id]; ok { - return errors.New("partition has been released" + - ", collectionID = " + fmt.Sprintln(c.ID()) + - ", partitionID = " + fmt.Sprintln(id)) - } - } - - return nil -} - // setLoadType set the loading type of collection, which is loadTypeCollection or loadTypePartition func (c *Collection) setLoadType(l loadType) { c.loadType = l diff --git a/internal/querynode/collection_test.go b/internal/querynode/collection_test.go index e4c60ddebc..e4d335e546 100644 --- a/internal/querynode/collection_test.go +++ b/internal/querynode/collection_test.go @@ -121,21 +121,6 @@ func TestCollection_releaseTime(t *testing.T) { assert.Equal(t, t0, t1) } -func TestCollection_releasePartition(t *testing.T) { - collectionID := UniqueID(0) - collectionMeta := genTestCollectionMeta(collectionID, false) - - collection := newCollection(collectionMeta.ID, collectionMeta.Schema) - collection.addReleasedPartition(defaultPartitionID) - assert.Equal(t, 1, len(collection.releasedPartitions)) - err := collection.checkReleasedPartitions([]UniqueID{defaultPartitionID}) - assert.Error(t, err) - err = collection.checkReleasedPartitions([]UniqueID{UniqueID(1000)}) - assert.NoError(t, err) - collection.deleteReleasedPartition(defaultPartitionID) - assert.Equal(t, 0, len(collection.releasedPartitions)) -} - func TestCollection_loadType(t *testing.T) { collectionID := UniqueID(0) collectionMeta := genTestCollectionMeta(collectionID, false) diff --git a/internal/querynode/flow_graph_filter_dm_node.go b/internal/querynode/flow_graph_filter_dm_node.go index ad755b6ba3..95421543e8 100644 --- a/internal/querynode/flow_graph_filter_dm_node.go +++ b/internal/querynode/flow_graph_filter_dm_node.go @@ -129,19 +129,6 @@ func (fdmNode *filterDmNode) filterInvalidDeleteMessage(msg *msgstream.DeleteMsg } } - // check if partition has been released - if col.getLoadType() == loadTypeCollection { - col, err := fdmNode.replica.getCollectionByID(msg.CollectionID) - if err != nil { - log.Warn(err.Error()) - return nil - } - if err = col.checkReleasedPartitions([]UniqueID{msg.PartitionID}); err != nil { - log.Warn(err.Error()) - return nil - } - } - if len(msg.PrimaryKeys) != len(msg.Timestamps) { log.Warn("Error, misaligned messages detected") return nil @@ -193,14 +180,6 @@ func (fdmNode *filterDmNode) filterInvalidInsertMessage(msg *msgstream.InsertMsg } } - // check if partition has been released - if col.getLoadType() == loadTypeCollection { - if err = col.checkReleasedPartitions([]UniqueID{msg.PartitionID}); err != nil { - log.Warn(err.Error()) - return nil - } - } - // Check if the segment is in excluded segments, // messages after seekPosition may contain the redundant data from flushed slice of segment, // so we need to compare the endTimestamp of received messages and position's timestamp. diff --git a/internal/querynode/flow_graph_filter_dm_node_test.go b/internal/querynode/flow_graph_filter_dm_node_test.go index ee69d1a261..552d5ef361 100644 --- a/internal/querynode/flow_graph_filter_dm_node_test.go +++ b/internal/querynode/flow_graph_filter_dm_node_test.go @@ -95,18 +95,6 @@ func TestFlowGraphFilterDmNode_filterInvalidInsertMessage(t *testing.T) { assert.Nil(t, res) }) - t.Run("test released partition", func(t *testing.T) { - msg, err := genSimpleInsertMsg() - assert.NoError(t, err) - fg, err := getFilterDMNode(ctx) - assert.NoError(t, err) - col, err := fg.replica.getCollectionByID(defaultCollectionID) - assert.NoError(t, err) - col.addReleasedPartition(defaultPartitionID) - res := fg.filterInvalidInsertMessage(msg) - assert.Nil(t, res) - }) - t.Run("test no exclude segment", func(t *testing.T) { msg, err := genSimpleInsertMsg() assert.NoError(t, err) @@ -207,18 +195,6 @@ func TestFlowGraphFilterDmNode_filterInvalidDeleteMessage(t *testing.T) { assert.Nil(t, res) }) - t.Run("test delete released partition", func(t *testing.T) { - msg, err := genSimpleDeleteMsg() - assert.NoError(t, err) - fg, err := getFilterDMNode(ctx) - assert.NoError(t, err) - col, err := fg.replica.getCollectionByID(defaultCollectionID) - assert.NoError(t, err) - col.addReleasedPartition(defaultPartitionID) - res := fg.filterInvalidDeleteMessage(msg) - assert.Nil(t, res) - }) - t.Run("test delete misaligned messages", func(t *testing.T) { msg, err := genSimpleDeleteMsg() assert.NoError(t, err) diff --git a/internal/querynode/historical.go b/internal/querynode/historical.go index fe1807e85c..de50f56313 100644 --- a/internal/querynode/historical.go +++ b/internal/querynode/historical.go @@ -151,9 +151,6 @@ func (h *historical) search(searchReqs []*searchRequest, collID UniqueID, partID } if len(searchPartIDs) == 0 && col.getLoadType() == loadTypeCollection { - if err = col.checkReleasedPartitions(partIDs); err != nil { - return searchResults, searchSegmentIDs, searchPartIDs, err - } return searchResults, searchSegmentIDs, searchPartIDs, nil } diff --git a/internal/querynode/historical_test.go b/internal/querynode/historical_test.go index 0e5f3de3ee..31b5fa8942 100644 --- a/internal/querynode/historical_test.go +++ b/internal/querynode/historical_test.go @@ -104,25 +104,4 @@ func TestHistorical_Search(t *testing.T) { assert.Equal(t, 0, len(ids)) assert.NoError(t, err) }) - - t.Run("test load collection partition released in collection", func(t *testing.T) { - tSafe := newTSafeReplica() - his, err := genSimpleHistorical(ctx, tSafe) - assert.NoError(t, err) - - plan, searchReqs, err := genSimpleSearchPlanAndRequests(IndexFaissIDMap) - assert.NoError(t, err) - - col, err := his.replica.getCollectionByID(defaultCollectionID) - assert.NoError(t, err) - col.addReleasedPartition(defaultPartitionID) - - err = his.replica.removePartition(defaultPartitionID) - assert.NoError(t, err) - - res, ids, _, err := his.search(searchReqs, defaultCollectionID, []UniqueID{defaultPartitionID}, plan, Timestamp(0)) - assert.Equal(t, 0, len(res)) - assert.Equal(t, 0, len(ids)) - assert.Error(t, err) - }) } diff --git a/internal/querynode/segment.go b/internal/querynode/segment.go index 6c538c1a7b..0eb76a0650 100644 --- a/internal/querynode/segment.go +++ b/internal/querynode/segment.go @@ -505,7 +505,6 @@ func (s *Segment) segmentInsert(offset int64, entityIDs *[]UniqueID, timestamps var cTimestampsPtr = (*C.uint64_t)(&(*timestamps)[0]) var cSizeofPerRow = C.int(sizeofPerRow) var cRawDataVoidPtr = unsafe.Pointer(&rawData[0]) - log.Debug("QueryNode::Segment::InsertBegin", zap.Any("cNumOfRows", cNumOfRows)) status := C.Insert(s.segmentPtr, cOffset, cNumOfRows, diff --git a/internal/querynode/streaming.go b/internal/querynode/streaming.go index 8d2208109c..99c55d77ad 100644 --- a/internal/querynode/streaming.go +++ b/internal/querynode/streaming.go @@ -154,9 +154,6 @@ func (s *streaming) search(searchReqs []*searchRequest, collID UniqueID, partIDs } if len(searchPartIDs) == 0 && col.getLoadType() == loadTypeCollection { - if err = col.checkReleasedPartitions(partIDs); err != nil { - return searchResults, searchSegmentIDs, searchPartIDs, err - } return searchResults, searchSegmentIDs, searchPartIDs, nil } diff --git a/internal/querynode/task.go b/internal/querynode/task.go index ba3acfadbe..d755fda60a 100644 --- a/internal/querynode/task.go +++ b/internal/querynode/task.go @@ -226,19 +226,13 @@ func (w *watchDmChannelsTask) Execute(ctx context.Context) error { collectionID := w.req.CollectionID partitionIDs := w.req.GetPartitionIDs() - var lType loadType - - switch w.req.GetLoadMeta().GetLoadType() { - case queryPb.LoadType_LoadCollection: - lType = loadTypeCollection - case queryPb.LoadType_LoadPartition: - lType = loadTypePartition - default: + lType := w.req.GetLoadMeta().GetLoadType() + if lType == queryPb.LoadType_UnKnownType { // if no partitionID is specified, load type is load collection if len(partitionIDs) != 0 { - lType = loadTypePartition + lType = queryPb.LoadType_LoadPartition } else { - lType = loadTypeCollection + lType = queryPb.LoadType_LoadCollection } } @@ -261,6 +255,7 @@ func (w *watchDmChannelsTask) Execute(ctx context.Context) error { log.Debug("Starting WatchDmChannels ...", zap.String("collectionName", w.req.Schema.Name), zap.Int64("collectionID", collectionID), + zap.Any("load type", lType), zap.Strings("vChannels", vChannels), zap.Strings("pChannels", pChannels), ) @@ -299,6 +294,17 @@ func (w *watchDmChannelsTask) Execute(ctx context.Context) error { Schema: w.req.GetSchema(), LoadMeta: w.req.GetLoadMeta(), } + + // update partition info from unFlushedSegments and loadMeta + for _, info := range req.Infos { + w.node.streaming.replica.addPartition(collectionID, info.PartitionID) + w.node.historical.replica.addPartition(collectionID, info.PartitionID) + } + for _, partitionID := range req.GetLoadMeta().GetPartitionIDs() { + w.node.historical.replica.addPartition(collectionID, partitionID) + w.node.streaming.replica.addPartition(collectionID, partitionID) + } + log.Debug("loading growing segments in WatchDmChannels...", zap.Int64("collectionID", collectionID), zap.Int64s("unFlushedSegmentIDs", unFlushedSegmentIDs), @@ -441,12 +447,6 @@ func (w *watchDmChannelsTask) Execute(ctx context.Context) error { hCol.addVChannels(vChannels) hCol.addPChannels(pChannels) hCol.setLoadType(lType) - for _, partitionID := range w.req.GetLoadMeta().GetPartitionIDs() { - sCol.deleteReleasedPartition(partitionID) - hCol.deleteReleasedPartition(partitionID) - w.node.streaming.replica.addPartition(collectionID, partitionID) - w.node.historical.replica.addPartition(collectionID, partitionID) - } log.Debug("watchDMChannel, init replica done", zap.Int64("collectionID", collectionID), zap.Strings("vChannels", vChannels)) // create tSafe @@ -665,21 +665,6 @@ func (l *loadSegmentsTask) Execute(ctx context.Context) error { return err } - for _, info := range l.req.Infos { - collectionID := info.CollectionID - partitionID := info.PartitionID - sCol, err := l.node.streaming.replica.getCollectionByID(collectionID) - if err != nil { - return err - } - sCol.deleteReleasedPartition(partitionID) - hCol, err := l.node.historical.replica.getCollectionByID(collectionID) - if err != nil { - return err - } - hCol.deleteReleasedPartition(partitionID) - } - log.Debug("LoadSegments done", zap.String("SegmentLoadInfos", fmt.Sprintln(l.req.Infos))) return nil } @@ -825,11 +810,11 @@ func (r *releasePartitionsTask) Execute(ctx context.Context) error { time.Sleep(gracefulReleaseTime * time.Second) // get collection from streaming and historical - hCol, err := r.node.historical.replica.getCollectionByID(r.req.CollectionID) + _, err := r.node.historical.replica.getCollectionByID(r.req.CollectionID) if err != nil { return fmt.Errorf("release partitions failed, collectionID = %d, err = %s", r.req.CollectionID, err) } - sCol, err := r.node.streaming.replica.getCollectionByID(r.req.CollectionID) + _, err = r.node.streaming.replica.getCollectionByID(r.req.CollectionID) if err != nil { return fmt.Errorf("release partitions failed, collectionID = %d, err = %s", r.req.CollectionID, err) } @@ -853,9 +838,6 @@ func (r *releasePartitionsTask) Execute(ctx context.Context) error { log.Warn(err.Error()) } } - - hCol.addReleasedPartition(id) - sCol.addReleasedPartition(id) } log.Debug("Release partition task done", diff --git a/internal/querynode/task_test.go b/internal/querynode/task_test.go index 1021c4143a..5ef9ddef43 100644 --- a/internal/querynode/task_test.go +++ b/internal/querynode/task_test.go @@ -258,6 +258,11 @@ func TestTask_watchDmChannelsTask(t *testing.T) { req: genWatchDMChannelsRequest(), node: node, } + task.req.LoadMeta = &querypb.LoadMetaInfo{ + LoadType: querypb.LoadType_LoadPartition, + CollectionID: defaultCollectionID, + PartitionIDs: []UniqueID{defaultPartitionID}, + } task.req.Infos = []*datapb.VchannelInfo{ { CollectionID: defaultCollectionID, @@ -377,6 +382,39 @@ func TestTask_watchDmChannelsTask(t *testing.T) { err = task.Execute(ctx) assert.Error(t, err) }) + + t.Run("test load growing segment", func(t *testing.T) { + node, err := genSimpleQueryNode(ctx) + assert.NoError(t, err) + + task := watchDmChannelsTask{ + req: genWatchDMChannelsRequest(), + node: node, + } + + fieldBinlog, err := saveSimpleBinLog(ctx) + assert.NoError(t, err) + + task.req.Infos = []*datapb.VchannelInfo{ + { + CollectionID: defaultCollectionID, + ChannelName: defaultDMLChannel, + UnflushedSegments: []*datapb.SegmentInfo{ + { + CollectionID: defaultCollectionID, + PartitionID: defaultPartitionID + 1, // load a new partition + DmlPosition: &internalpb.MsgPosition{ + ChannelName: defaultDMLChannel, + Timestamp: typeutil.MaxTimestamp, + }, + Binlogs: fieldBinlog, + }, + }, + }, + } + err = task.Execute(ctx) + assert.NoError(t, err) + }) } func TestTask_watchDeltaChannelsTask(t *testing.T) { @@ -762,7 +800,7 @@ func TestTask_releasePartitionTask(t *testing.T) { assert.Error(t, err) }) - t.Run("test execute, remove deltaVChannel", func(t *testing.T) { + t.Run("test execute remove deltaVChannel", func(t *testing.T) { node, err := genSimpleQueryNode(ctx) assert.NoError(t, err) diff --git a/internal/querynode/type_def.go b/internal/querynode/type_def.go index 900963fde7..3f817aa527 100644 --- a/internal/querynode/type_def.go +++ b/internal/querynode/type_def.go @@ -17,6 +17,7 @@ package querynode import ( + "github.com/milvus-io/milvus/internal/proto/querypb" "github.com/milvus-io/milvus/internal/util/typeutil" ) @@ -49,9 +50,9 @@ type TimeRange struct { } // loadType is load collection or load partition -type loadType = int32 +type loadType = querypb.LoadType const ( - loadTypeCollection loadType = 0 - loadTypePartition loadType = 1 + loadTypeCollection = querypb.LoadType_LoadCollection + loadTypePartition = querypb.LoadType_LoadPartition )