diff --git a/internal/querynode/task.go b/internal/querynode/task.go index d45f5e9739..26a8daf035 100644 --- a/internal/querynode/task.go +++ b/internal/querynode/task.go @@ -266,11 +266,11 @@ func (w *watchDmChannelsTask) Execute(ctx context.Context) (err error) { } } w.node.metaReplica.addExcludedSegments(collectionID, unFlushedCheckPointInfos) - unflushedSegmentIDs := make([]UniqueID, 0) - for i := 0; i < len(unFlushedCheckPointInfos); i++ { - unflushedSegmentIDs = append(unflushedSegmentIDs, unFlushedCheckPointInfos[i].GetID()) + unflushedSegmentIDs := make([]UniqueID, len(unFlushedCheckPointInfos)) + for i, segInfo := range unFlushedCheckPointInfos { + unflushedSegmentIDs[i] = segInfo.GetID() } - log.Info("watchDMChannel, add check points info for unFlushed segments done", + log.Info("watchDMChannel, add check points info for unflushed segments done", zap.Int64("collectionID", collectionID), zap.Any("unflushedSegmentIDs", unflushedSegmentIDs), ) @@ -291,9 +291,9 @@ func (w *watchDmChannelsTask) Execute(ctx context.Context) (err error) { } } w.node.metaReplica.addExcludedSegments(collectionID, flushedCheckPointInfos) - flushedSegmentIDs := make([]UniqueID, 0) - for i := 0; i < len(flushedCheckPointInfos); i++ { - flushedSegmentIDs = append(flushedSegmentIDs, flushedCheckPointInfos[i].GetID()) + flushedSegmentIDs := make([]UniqueID, len(flushedCheckPointInfos)) + for i, segInfo := range flushedCheckPointInfos { + flushedSegmentIDs[i] = segInfo.GetID() } log.Info("watchDMChannel, add check points info for flushed segments done", zap.Int64("collectionID", collectionID), @@ -316,9 +316,9 @@ func (w *watchDmChannelsTask) Execute(ctx context.Context) (err error) { } } w.node.metaReplica.addExcludedSegments(collectionID, droppedCheckPointInfos) - droppedSegmentIDs := make([]UniqueID, 0) - for i := 0; i < len(droppedCheckPointInfos); i++ { - droppedSegmentIDs = append(droppedSegmentIDs, droppedCheckPointInfos[i].GetID()) + droppedSegmentIDs := make([]UniqueID, len(droppedCheckPointInfos)) + for i, segInfo := range droppedCheckPointInfos { + droppedSegmentIDs[i] = segInfo.GetID() } log.Info("watchDMChannel, add check points info for dropped segments done", zap.Int64("collectionID", collectionID), diff --git a/internal/querynode/task_test.go b/internal/querynode/task_test.go index be1afe9276..c7df0d5760 100644 --- a/internal/querynode/task_test.go +++ b/internal/querynode/task_test.go @@ -45,6 +45,11 @@ func TestTask_watchDmChannelsTask(t *testing.T) { CollectionID: defaultCollectionID, PartitionIDs: []UniqueID{defaultPartitionID}, Schema: schema, + Infos: []*datapb.VchannelInfo{ + { + ChannelName: defaultDMLChannel, + }, + }, } return req } @@ -134,30 +139,60 @@ func TestTask_watchDmChannelsTask(t *testing.T) { assert.NoError(t, err) }) - //t.Run("test execute seek error", func(t *testing.T) { - // - // node, err := genSimpleQueryNode(ctx) - // assert.NoError(t, err) - // - // task := watchDmChannelsTask{ - // req: genWatchDMChannelsRequest(), - // node: node, - // } - // task.req.Infos = []*datapb.VchannelInfo{ - // { - // CollectionID: defaultCollectionID, - // ChannelName: defaultDMLChannel, - // SeekPosition: &msgstream.MsgPosition{ - // ChannelName: defaultDMLChannel, - // MsgID: []byte{1, 2, 3}, - // MsgGroup: defaultSubName, - // Timestamp: 0, - // }, - // }, - // } - // err = task.Execute(ctx) - // assert.Error(t, err) - //}) + t.Run("test execute seek error", func(t *testing.T) { + node, err := genSimpleQueryNode(ctx) + assert.NoError(t, err) + + task := watchDmChannelsTask{ + req: genWatchDMChannelsRequest(), + node: node, + } + task.req.LoadMeta = &querypb.LoadMetaInfo{ + LoadType: querypb.LoadType_LoadPartition, + CollectionID: defaultCollectionID, + PartitionIDs: []UniqueID{defaultPartitionID}, + } + task.req.Infos = []*datapb.VchannelInfo{ + { + CollectionID: defaultCollectionID, + ChannelName: defaultDMLChannel, + UnflushedSegmentIds: []int64{100}, + FlushedSegmentIds: []int64{101}, + DroppedSegmentIds: []int64{102}, + SeekPosition: &internalpb.MsgPosition{ + ChannelName: defaultDMLChannel, + MsgID: []byte{235, 50, 164, 248, 255, 255, 255, 255}, + Timestamp: Timestamp(999), + }, + }, + } + task.req.SegmentInfos = map[int64]*datapb.SegmentInfo{ + 100: { + ID: 100, + DmlPosition: &internalpb.MsgPosition{ + ChannelName: defaultDMLChannel, + Timestamp: Timestamp(1000), + }, + }, + 101: { + ID: 101, + DmlPosition: &internalpb.MsgPosition{ + ChannelName: defaultDMLChannel, + Timestamp: Timestamp(1001), + }, + }, + 102: { + ID: 102, + DmlPosition: &internalpb.MsgPosition{ + ChannelName: defaultDMLChannel, + Timestamp: Timestamp(1002), + }, + }, + } + err = task.Execute(ctx) + // ["Failed to seek"] [error="topic name = xxx not exist"] + assert.Error(t, err) + }) t.Run("test add excluded segment for flushed segment", func(t *testing.T) {