From 70825a35cfc9f18f684f16032ce67bcea8fb6dff Mon Sep 17 00:00:00 2001 From: Letian Jiang Date: Fri, 20 May 2022 09:15:57 +0800 Subject: [PATCH] Make querynode LoadSegments interface idempotent (#17109) This PR makes the following changes: * separate LoadSegmentsTask into two phases: PreExecute and Execute * filters out segments that are already loaded in PreExecute phase Signed-off-by: Letian Jiang --- internal/querynode/task.go | 30 +++++++++++++++++--------- internal/querynode/task_test.go | 38 +++++++++++++++++++++++++++++++++ 2 files changed, 58 insertions(+), 10 deletions(-) diff --git a/internal/querynode/task.go b/internal/querynode/task.go index bb076681d9..8d4a07f847 100644 --- a/internal/querynode/task.go +++ b/internal/querynode/task.go @@ -633,14 +633,8 @@ func (l *loadSegmentsTask) OnEnqueue() error { } func (l *loadSegmentsTask) PreExecute(ctx context.Context) error { - return nil -} - -func (l *loadSegmentsTask) Execute(ctx context.Context) error { - // TODO: support db - log.Info("LoadSegment start", zap.Int64("msgID", l.req.Base.MsgID)) + log.Info("LoadSegmentTask PreExecute start", zap.Int64("msgID", l.req.Base.MsgID)) var err error - // init meta collectionID := l.req.GetCollectionID() l.node.historical.replica.addCollection(collectionID, l.req.GetSchema()) @@ -656,13 +650,29 @@ func (l *loadSegmentsTask) Execute(ctx context.Context) error { } } - err = l.node.loader.loadSegment(l.req, segmentTypeSealed) + // filter segments that are already loaded in this querynode + var filteredInfos []*queryPb.SegmentLoadInfo + for _, info := range l.req.Infos { + if !l.node.historical.replica.hasSegment(info.SegmentID) { + filteredInfos = append(filteredInfos, info) + } else { + log.Debug("ignore segment that is already loaded", zap.Int64("segmentID", info.SegmentID)) + } + } + l.req.Infos = filteredInfos + log.Info("LoadSegmentTask PreExecute done", zap.Int64("msgID", l.req.Base.MsgID)) + return nil +} + +func (l *loadSegmentsTask) Execute(ctx context.Context) error { + // TODO: support db + log.Info("LoadSegmentTask Execute start", zap.Int64("msgID", l.req.Base.MsgID)) + err := l.node.loader.loadSegment(l.req, segmentTypeSealed) if err != nil { log.Warn(err.Error()) return err } - - log.Info("LoadSegments done", zap.Int64("msgID", l.req.Base.MsgID)) + log.Info("LoadSegmentTask Execute done", zap.Int64("msgID", l.req.Base.MsgID)) return nil } diff --git a/internal/querynode/task_test.go b/internal/querynode/task_test.go index f8628bf125..8bc2268082 100644 --- a/internal/querynode/task_test.go +++ b/internal/querynode/task_test.go @@ -574,6 +574,44 @@ func TestTask_loadSegmentsTask(t *testing.T) { assert.NoError(t, err) }) + t.Run("test repeated load", func(t *testing.T) { + node, err := genSimpleQueryNode(ctx) + assert.NoError(t, err) + + fieldBinlog, err := saveBinLog(ctx, defaultCollectionID, defaultPartitionID, defaultSegmentID, defaultMsgLength, schema) + assert.NoError(t, err) + + req := &querypb.LoadSegmentsRequest{ + Base: genCommonMsgBase(commonpb.MsgType_LoadSegments), + Schema: schema, + Infos: []*querypb.SegmentLoadInfo{ + { + SegmentID: defaultSegmentID, + PartitionID: defaultPartitionID, + CollectionID: defaultCollectionID, + BinlogPaths: fieldBinlog, + }, + }, + } + + task := loadSegmentsTask{ + req: req, + node: node, + } + // execute loadSegmentsTask twice + err = task.PreExecute(ctx) + assert.NoError(t, err) + err = task.Execute(ctx) + assert.NoError(t, err) + err = task.PreExecute(ctx) + assert.NoError(t, err) + err = task.Execute(ctx) + assert.NoError(t, err) + // expected only one segment in replica + num := node.historical.replica.getSegmentNum() + assert.Equal(t, 1, num) + }) + t.Run("test execute grpc error", func(t *testing.T) { node, err := genSimpleQueryNode(ctx) assert.NoError(t, err)