diff --git a/internal/metastore/kv/datacoord/kv_catalog.go b/internal/metastore/kv/datacoord/kv_catalog.go index fed30511b1..2233029ee3 100644 --- a/internal/metastore/kv/datacoord/kv_catalog.go +++ b/internal/metastore/kv/datacoord/kv_catalog.go @@ -469,23 +469,23 @@ func (kc *Catalog) DropChannel(ctx context.Context, channel string) error { } func (kc *Catalog) ListChannelCheckpoint(ctx context.Context) (map[string]*msgpb.MsgPosition, error) { - keys, values, err := kc.MetaKv.LoadWithPrefix(ChannelCheckpointPrefix) - if err != nil { - return nil, err - } - channelCPs := make(map[string]*msgpb.MsgPosition) - for i, key := range keys { - value := values[i] + applyFn := func(key []byte, value []byte) error { channelCP := &msgpb.MsgPosition{} - err = proto.Unmarshal([]byte(value), channelCP) + err := proto.Unmarshal(value, channelCP) if err != nil { log.Error("unmarshal channelCP failed when ListChannelCheckpoint", zap.Error(err)) - return nil, err + return err } - ss := strings.Split(key, "/") + ss := strings.Split(string(key), "/") vChannel := ss[len(ss)-1] channelCPs[vChannel] = channelCP + return nil + } + + err := kc.MetaKv.WalkWithPrefix(ChannelCheckpointPrefix, paginationSize, applyFn) + if err != nil { + return nil, err } return channelCPs, nil @@ -555,24 +555,23 @@ func (kc *Catalog) CreateIndex(ctx context.Context, index *model.Index) error { } func (kc *Catalog) ListIndexes(ctx context.Context) ([]*model.Index, error) { - _, values, err := kc.MetaKv.LoadWithPrefix(util.FieldIndexPrefix) - if err != nil { - log.Error("list index meta fail", zap.String("prefix", util.FieldIndexPrefix), zap.Error(err)) - return nil, err - } - indexes := make([]*model.Index, 0) - for _, value := range values { + applyFn := func(key []byte, value []byte) error { meta := &indexpb.FieldIndex{} - err = proto.Unmarshal([]byte(value), meta) + err := proto.Unmarshal(value, meta) if err != nil { log.Warn("unmarshal index info failed", zap.Error(err)) - return nil, err + return err } indexes = append(indexes, model.UnmarshalIndexModel(meta)) + return nil } + err := kc.MetaKv.WalkWithPrefix(util.FieldIndexPrefix, paginationSize, applyFn) + if err != nil { + return nil, err + } return indexes, nil } @@ -632,22 +631,22 @@ func (kc *Catalog) CreateSegmentIndex(ctx context.Context, segIdx *model.Segment } func (kc *Catalog) ListSegmentIndexes(ctx context.Context) ([]*model.SegmentIndex, error) { - _, values, err := kc.MetaKv.LoadWithPrefix(util.SegmentIndexPrefix) - if err != nil { - log.Error("list segment index meta fail", zap.String("prefix", util.SegmentIndexPrefix), zap.Error(err)) - return nil, err - } - segIndexes := make([]*model.SegmentIndex, 0) - for _, value := range values { + applyFn := func(key []byte, value []byte) error { segmentIndexInfo := &indexpb.SegmentIndex{} - err = proto.Unmarshal([]byte(value), segmentIndexInfo) + err := proto.Unmarshal(value, segmentIndexInfo) if err != nil { log.Warn("unmarshal segment index info failed", zap.Error(err)) - return segIndexes, err + return err } segIndexes = append(segIndexes, model.UnmarshalSegmentIndexModel(segmentIndexInfo)) + return nil + } + + err := kc.MetaKv.WalkWithPrefix(util.SegmentIndexPrefix, paginationSize, applyFn) + if err != nil { + return nil, err } return segIndexes, nil @@ -689,17 +688,19 @@ func (kc *Catalog) SaveImportJob(ctx context.Context, job *datapb.ImportJob) err func (kc *Catalog) ListImportJobs(ctx context.Context) ([]*datapb.ImportJob, error) { jobs := make([]*datapb.ImportJob, 0) - _, values, err := kc.MetaKv.LoadWithPrefix(ImportJobPrefix) - if err != nil { - return nil, err - } - for _, value := range values { + applyFn := func(key []byte, value []byte) error { job := &datapb.ImportJob{} - err = proto.Unmarshal([]byte(value), job) + err := proto.Unmarshal(value, job) if err != nil { - return nil, err + return err } jobs = append(jobs, job) + return nil + } + + err := kc.MetaKv.WalkWithPrefix(ImportJobPrefix, paginationSize, applyFn) + if err != nil { + return nil, err } return jobs, nil } @@ -721,19 +722,20 @@ func (kc *Catalog) SavePreImportTask(ctx context.Context, task *datapb.PreImport func (kc *Catalog) ListPreImportTasks(ctx context.Context) ([]*datapb.PreImportTask, error) { tasks := make([]*datapb.PreImportTask, 0) - _, values, err := kc.MetaKv.LoadWithPrefix(PreImportTaskPrefix) + applyFn := func(key []byte, value []byte) error { + task := &datapb.PreImportTask{} + err := proto.Unmarshal(value, task) + if err != nil { + return err + } + tasks = append(tasks, task) + return nil + } + + err := kc.MetaKv.WalkWithPrefix(PreImportTaskPrefix, paginationSize, applyFn) if err != nil { return nil, err } - for _, value := range values { - task := &datapb.PreImportTask{} - err = proto.Unmarshal([]byte(value), task) - if err != nil { - return nil, err - } - tasks = append(tasks, task) - } - return tasks, nil } @@ -754,17 +756,19 @@ func (kc *Catalog) SaveImportTask(ctx context.Context, task *datapb.ImportTaskV2 func (kc *Catalog) ListImportTasks(ctx context.Context) ([]*datapb.ImportTaskV2, error) { tasks := make([]*datapb.ImportTaskV2, 0) - _, values, err := kc.MetaKv.LoadWithPrefix(ImportTaskPrefix) - if err != nil { - return nil, err - } - for _, value := range values { + applyFn := func(key []byte, value []byte) error { task := &datapb.ImportTaskV2{} - err = proto.Unmarshal([]byte(value), task) + err := proto.Unmarshal(value, task) if err != nil { - return nil, err + return err } tasks = append(tasks, task) + return nil + } + + err := kc.MetaKv.WalkWithPrefix(ImportTaskPrefix, paginationSize, applyFn) + if err != nil { + return nil, err } return tasks, nil } @@ -792,17 +796,19 @@ func (kc *Catalog) GcConfirm(ctx context.Context, collectionID, partitionID type func (kc *Catalog) ListCompactionTask(ctx context.Context) ([]*datapb.CompactionTask, error) { tasks := make([]*datapb.CompactionTask, 0) - _, values, err := kc.MetaKv.LoadWithPrefix(CompactionTaskPrefix) - if err != nil { - return nil, err - } - for _, value := range values { + applyFn := func(key []byte, value []byte) error { info := &datapb.CompactionTask{} - err = proto.Unmarshal([]byte(value), info) + err := proto.Unmarshal(value, info) if err != nil { - return nil, err + return err } tasks = append(tasks, info) + return nil + } + + err := kc.MetaKv.WalkWithPrefix(CompactionTaskPrefix, paginationSize, applyFn) + if err != nil { + return nil, err } return tasks, nil } @@ -829,17 +835,19 @@ func (kc *Catalog) DropCompactionTask(ctx context.Context, task *datapb.Compacti func (kc *Catalog) ListAnalyzeTasks(ctx context.Context) ([]*indexpb.AnalyzeTask, error) { tasks := make([]*indexpb.AnalyzeTask, 0) - _, values, err := kc.MetaKv.LoadWithPrefix(AnalyzeTaskPrefix) - if err != nil { - return nil, err - } - for _, value := range values { + applyFn := func(key []byte, value []byte) error { task := &indexpb.AnalyzeTask{} - err = proto.Unmarshal([]byte(value), task) + err := proto.Unmarshal(value, task) if err != nil { - return nil, err + return err } tasks = append(tasks, task) + return nil + } + + err := kc.MetaKv.WalkWithPrefix(AnalyzeTaskPrefix, paginationSize, applyFn) + if err != nil { + return nil, err } return tasks, nil } @@ -867,17 +875,19 @@ func (kc *Catalog) DropAnalyzeTask(ctx context.Context, taskID typeutil.UniqueID func (kc *Catalog) ListPartitionStatsInfos(ctx context.Context) ([]*datapb.PartitionStatsInfo, error) { infos := make([]*datapb.PartitionStatsInfo, 0) - _, values, err := kc.MetaKv.LoadWithPrefix(PartitionStatsInfoPrefix) - if err != nil { - return nil, err - } - for _, value := range values { + applyFn := func(key []byte, value []byte) error { info := &datapb.PartitionStatsInfo{} - err = proto.Unmarshal([]byte(value), info) + err := proto.Unmarshal(value, info) if err != nil { - return nil, err + return err } infos = append(infos, info) + return nil + } + + err := kc.MetaKv.WalkWithPrefix(PartitionStatsInfoPrefix, paginationSize, applyFn) + if err != nil { + return nil, err } return infos, nil } @@ -927,18 +937,20 @@ func (kc *Catalog) DropCurrentPartitionStatsVersion(ctx context.Context, collID, func (kc *Catalog) ListStatsTasks(ctx context.Context) ([]*indexpb.StatsTask, error) { tasks := make([]*indexpb.StatsTask, 0) - _, values, err := kc.MetaKv.LoadWithPrefix(StatsTaskPrefix) - if err != nil { - return nil, err - } - for _, value := range values { + applyFn := func(key []byte, value []byte) error { task := &indexpb.StatsTask{} - err = proto.Unmarshal([]byte(value), task) + err := proto.Unmarshal(value, task) if err != nil { - return nil, err + return err } tasks = append(tasks, task) + return nil + } + + err := kc.MetaKv.WalkWithPrefix(StatsTaskPrefix, paginationSize, applyFn) + if err != nil { + return nil, err } return tasks, nil } diff --git a/internal/metastore/kv/datacoord/kv_catalog_test.go b/internal/metastore/kv/datacoord/kv_catalog_test.go index 6804ff5641..baeb8bedd3 100644 --- a/internal/metastore/kv/datacoord/kv_catalog_test.go +++ b/internal/metastore/kv/datacoord/kv_catalog_test.go @@ -637,7 +637,9 @@ func TestChannelCP(t *testing.T) { err := catalog.SaveChannelCheckpoint(context.TODO(), mockVChannel, pos) assert.NoError(t, err) - txn.EXPECT().LoadWithPrefix(mock.Anything).Return([]string{k}, []string{string(v)}, nil) + txn.EXPECT().WalkWithPrefix(mock.Anything, mock.Anything, mock.Anything).RunAndReturn(func(_ string, _ int, f func([]byte, []byte) error) error { + return f([]byte(k), v) + }) res, err := catalog.ListChannelCheckpoint(context.TODO()) assert.NoError(t, err) assert.True(t, len(res) > 0) @@ -646,7 +648,7 @@ func TestChannelCP(t *testing.T) { t.Run("ListChannelCheckpoint failed", func(t *testing.T) { txn := mocks.NewMetaKv(t) catalog := NewCatalog(txn, rootPath, "") - txn.EXPECT().LoadWithPrefix(mock.Anything).Return(nil, nil, errors.New("mock error")) + txn.EXPECT().WalkWithPrefix(mock.Anything, mock.Anything, mock.Anything).Return(errors.New("mock error")) _, err = catalog.ListChannelCheckpoint(context.TODO()) assert.Error(t, err) }) @@ -691,7 +693,7 @@ func TestChannelCP(t *testing.T) { assert.NoError(t, err) txn.EXPECT().Remove(mock.Anything).Return(nil) - txn.EXPECT().LoadWithPrefix(mock.Anything).Return(nil, nil, nil) + txn.EXPECT().WalkWithPrefix(mock.Anything, mock.Anything, mock.Anything).Return(nil) err = catalog.DropChannelCheckpoint(context.TODO(), mockVChannel) assert.NoError(t, err) res, err := catalog.ListChannelCheckpoint(context.TODO()) @@ -878,7 +880,7 @@ func TestCatalog_CreateIndex(t *testing.T) { func TestCatalog_ListIndexes(t *testing.T) { t.Run("success", func(t *testing.T) { metakv := mocks.NewMetaKv(t) - metakv.EXPECT().LoadWithPrefix(mock.Anything).RunAndReturn(func(s string) ([]string, []string, error) { + metakv.EXPECT().WalkWithPrefix(mock.Anything, mock.Anything, mock.Anything).RunAndReturn(func(_ string, _ int, f func([]byte, []byte) error) error { i := &indexpb.FieldIndex{ IndexInfo: &indexpb.IndexInfo{ CollectionID: 0, @@ -893,7 +895,7 @@ func TestCatalog_ListIndexes(t *testing.T) { } v, err := proto.Marshal(i) assert.NoError(t, err) - return []string{"1"}, []string{string(v)}, nil + return f([]byte("1"), v) }) catalog := &Catalog{ @@ -906,7 +908,7 @@ func TestCatalog_ListIndexes(t *testing.T) { t.Run("failed", func(t *testing.T) { txn := mocks.NewMetaKv(t) - txn.EXPECT().LoadWithPrefix(mock.Anything).Return(nil, nil, errors.New("error")) + txn.EXPECT().WalkWithPrefix(mock.Anything, mock.Anything, mock.Anything).Return(errors.New("error")) catalog := &Catalog{ MetaKv: txn, } @@ -916,7 +918,9 @@ func TestCatalog_ListIndexes(t *testing.T) { t.Run("unmarshal failed", func(t *testing.T) { txn := mocks.NewMetaKv(t) - txn.EXPECT().LoadWithPrefix(mock.Anything).Return([]string{"1"}, []string{"invalid"}, nil) + txn.EXPECT().WalkWithPrefix(mock.Anything, mock.Anything, mock.Anything).RunAndReturn(func(_ string, _ int, f func([]byte, []byte) error) error { + return f([]byte("1"), []byte("invalid")) + }) catalog := &Catalog{ MetaKv: txn, @@ -1070,7 +1074,9 @@ func TestCatalog_ListSegmentIndexes(t *testing.T) { assert.NoError(t, err) metakv := mocks.NewMetaKv(t) - metakv.EXPECT().LoadWithPrefix(mock.Anything).Return([]string{"key"}, []string{string(v)}, nil) + metakv.EXPECT().WalkWithPrefix(mock.Anything, mock.Anything, mock.Anything).RunAndReturn(func(_ string, _ int, f func([]byte, []byte) error) error { + return f([]byte("key"), v) + }) catalog := &Catalog{ MetaKv: metakv, } @@ -1082,7 +1088,7 @@ func TestCatalog_ListSegmentIndexes(t *testing.T) { t.Run("failed", func(t *testing.T) { metakv := mocks.NewMetaKv(t) - metakv.EXPECT().LoadWithPrefix(mock.Anything).Return([]string{}, []string{}, errors.New("error")) + metakv.EXPECT().WalkWithPrefix(mock.Anything, mock.Anything, mock.Anything).Return(errors.New("error")) catalog := &Catalog{ MetaKv: metakv, } @@ -1093,7 +1099,9 @@ func TestCatalog_ListSegmentIndexes(t *testing.T) { t.Run("unmarshal failed", func(t *testing.T) { metakv := mocks.NewMetaKv(t) - metakv.EXPECT().LoadWithPrefix(mock.Anything).Return([]string{"key"}, []string{"invalid"}, nil) + metakv.EXPECT().WalkWithPrefix(mock.Anything, mock.Anything, mock.Anything).RunAndReturn(func(_ string, _ int, f func([]byte, []byte) error) error { + return f([]byte("key"), []byte("invalid")) + }) catalog := &Catalog{ MetaKv: metakv, } @@ -1376,20 +1384,24 @@ func TestCatalog_Import(t *testing.T) { txn := mocks.NewMetaKv(t) value, err := proto.Marshal(job) assert.NoError(t, err) - txn.EXPECT().LoadWithPrefix(mock.Anything).Return(nil, []string{string(value)}, nil) + txn.EXPECT().WalkWithPrefix(mock.Anything, mock.Anything, mock.Anything).RunAndReturn(func(_ string, _ int, f func([]byte, []byte) error) error { + return f(nil, value) + }) kc.MetaKv = txn jobs, err := kc.ListImportJobs(context.TODO()) assert.NoError(t, err) assert.Equal(t, 1, len(jobs)) txn = mocks.NewMetaKv(t) - txn.EXPECT().LoadWithPrefix(mock.Anything).Return(nil, []string{"@#%#^#"}, nil) + txn.EXPECT().WalkWithPrefix(mock.Anything, mock.Anything, mock.Anything).RunAndReturn(func(_ string, _ int, f func([]byte, []byte) error) error { + return f(nil, []byte("@#%#^#")) + }) kc.MetaKv = txn _, err = kc.ListImportJobs(context.TODO()) assert.Error(t, err) txn = mocks.NewMetaKv(t) - txn.EXPECT().LoadWithPrefix(mock.Anything).Return(nil, nil, mockErr) + txn.EXPECT().WalkWithPrefix(mock.Anything, mock.Anything, mock.Anything).Return(mockErr) kc.MetaKv = txn _, err = kc.ListImportJobs(context.TODO()) assert.Error(t, err) @@ -1430,20 +1442,24 @@ func TestCatalog_Import(t *testing.T) { txn := mocks.NewMetaKv(t) value, err := proto.Marshal(pit) assert.NoError(t, err) - txn.EXPECT().LoadWithPrefix(mock.Anything).Return(nil, []string{string(value)}, nil) + txn.EXPECT().WalkWithPrefix(mock.Anything, mock.Anything, mock.Anything).RunAndReturn(func(_ string, _ int, f func([]byte, []byte) error) error { + return f(nil, value) + }) kc.MetaKv = txn tasks, err := kc.ListPreImportTasks(context.TODO()) assert.NoError(t, err) assert.Equal(t, 1, len(tasks)) txn = mocks.NewMetaKv(t) - txn.EXPECT().LoadWithPrefix(mock.Anything).Return(nil, []string{"@#%#^#"}, nil) + txn.EXPECT().WalkWithPrefix(mock.Anything, mock.Anything, mock.Anything).RunAndReturn(func(_ string, _ int, f func([]byte, []byte) error) error { + return f(nil, []byte("@#%#^#")) + }) kc.MetaKv = txn _, err = kc.ListPreImportTasks(context.TODO()) assert.Error(t, err) txn = mocks.NewMetaKv(t) - txn.EXPECT().LoadWithPrefix(mock.Anything).Return(nil, nil, mockErr) + txn.EXPECT().WalkWithPrefix(mock.Anything, mock.Anything, mock.Anything).Return(mockErr) kc.MetaKv = txn _, err = kc.ListPreImportTasks(context.TODO()) assert.Error(t, err) @@ -1484,20 +1500,24 @@ func TestCatalog_Import(t *testing.T) { txn := mocks.NewMetaKv(t) value, err := proto.Marshal(it) assert.NoError(t, err) - txn.EXPECT().LoadWithPrefix(mock.Anything).Return(nil, []string{string(value)}, nil) + txn.EXPECT().WalkWithPrefix(mock.Anything, mock.Anything, mock.Anything).RunAndReturn(func(_ string, _ int, f func([]byte, []byte) error) error { + return f(nil, value) + }) kc.MetaKv = txn tasks, err := kc.ListImportTasks(context.TODO()) assert.NoError(t, err) assert.Equal(t, 1, len(tasks)) txn = mocks.NewMetaKv(t) - txn.EXPECT().LoadWithPrefix(mock.Anything).Return(nil, []string{"@#%#^#"}, nil) + txn.EXPECT().WalkWithPrefix(mock.Anything, mock.Anything, mock.Anything).RunAndReturn(func(_ string, _ int, f func([]byte, []byte) error) error { + return f(nil, []byte("@#%#^#")) + }) kc.MetaKv = txn _, err = kc.ListImportTasks(context.TODO()) assert.Error(t, err) txn = mocks.NewMetaKv(t) - txn.EXPECT().LoadWithPrefix(mock.Anything).Return(nil, nil, mockErr) + txn.EXPECT().WalkWithPrefix(mock.Anything, mock.Anything, mock.Anything).Return(mockErr) kc.MetaKv = txn _, err = kc.ListImportTasks(context.TODO()) assert.Error(t, err) @@ -1524,7 +1544,7 @@ func TestCatalog_AnalyzeTask(t *testing.T) { t.Run("ListAnalyzeTasks", func(t *testing.T) { txn := mocks.NewMetaKv(t) - txn.EXPECT().LoadWithPrefix(mock.Anything).Return(nil, nil, mockErr) + txn.EXPECT().WalkWithPrefix(mock.Anything, mock.Anything, mock.Anything).Return(mockErr) kc.MetaKv = txn tasks, err := kc.ListAnalyzeTasks(context.Background()) @@ -1550,9 +1570,9 @@ func TestCatalog_AnalyzeTask(t *testing.T) { assert.NoError(t, err) txn = mocks.NewMetaKv(t) - txn.EXPECT().LoadWithPrefix(mock.Anything).Return([]string{"key1"}, []string{ - string(value), - }, nil) + txn.EXPECT().WalkWithPrefix(mock.Anything, mock.Anything, mock.Anything).RunAndReturn(func(_ string, _ int, f func([]byte, []byte) error) error { + return f([]byte("key1"), value) + }) kc.MetaKv = txn tasks, err = kc.ListAnalyzeTasks(context.Background()) @@ -1560,7 +1580,9 @@ func TestCatalog_AnalyzeTask(t *testing.T) { assert.Equal(t, 1, len(tasks)) txn = mocks.NewMetaKv(t) - txn.EXPECT().LoadWithPrefix(mock.Anything).Return([]string{"key1"}, []string{"1234"}, nil) + txn.EXPECT().WalkWithPrefix(mock.Anything, mock.Anything, mock.Anything).RunAndReturn(func(_ string, _ int, f func([]byte, []byte) error) error { + return f([]byte("key1"), []byte("1234")) + }) kc.MetaKv = txn tasks, err = kc.ListAnalyzeTasks(context.Background()) @@ -1623,7 +1645,7 @@ func Test_PartitionStatsInfo(t *testing.T) { t.Run("ListPartitionStatsInfo", func(t *testing.T) { txn := mocks.NewMetaKv(t) - txn.EXPECT().LoadWithPrefix(mock.Anything).Return(nil, nil, mockErr) + txn.EXPECT().WalkWithPrefix(mock.Anything, mock.Anything, mock.Anything).Return(mockErr) kc.MetaKv = txn infos, err := kc.ListPartitionStatsInfos(context.Background()) @@ -1643,7 +1665,9 @@ func Test_PartitionStatsInfo(t *testing.T) { assert.NoError(t, err) txn = mocks.NewMetaKv(t) - txn.EXPECT().LoadWithPrefix(mock.Anything).Return([]string{"key1"}, []string{string(value)}, nil) + txn.EXPECT().WalkWithPrefix(mock.Anything, mock.Anything, mock.Anything).RunAndReturn(func(_ string, _ int, f func([]byte, []byte) error) error { + return f([]byte("key1"), value) + }) kc.MetaKv = txn infos, err = kc.ListPartitionStatsInfos(context.Background()) @@ -1651,7 +1675,9 @@ func Test_PartitionStatsInfo(t *testing.T) { assert.Equal(t, 1, len(infos)) txn = mocks.NewMetaKv(t) - txn.EXPECT().LoadWithPrefix(mock.Anything).Return([]string{"key1"}, []string{"1234"}, nil) + txn.EXPECT().WalkWithPrefix(mock.Anything, mock.Anything, mock.Anything).RunAndReturn(func(_ string, _ int, f func([]byte, []byte) error) error { + return f([]byte("key1"), []byte("1234")) + }) kc.MetaKv = txn infos, err = kc.ListPartitionStatsInfos(context.Background()) @@ -1777,7 +1803,7 @@ func Test_StatsTasks(t *testing.T) { t.Run("ListStatsTasks", func(t *testing.T) { txn := mocks.NewMetaKv(t) - txn.EXPECT().LoadWithPrefix(mock.Anything).Return(nil, nil, mockErr) + txn.EXPECT().WalkWithPrefix(mock.Anything, mock.Anything, mock.Anything).Return(mockErr) kc.MetaKv = txn tasks, err := kc.ListStatsTasks(context.Background()) @@ -1799,7 +1825,9 @@ func Test_StatsTasks(t *testing.T) { assert.NoError(t, err) txn = mocks.NewMetaKv(t) - txn.EXPECT().LoadWithPrefix(mock.Anything).Return([]string{"key1"}, []string{string(value)}, nil) + txn.EXPECT().WalkWithPrefix(mock.Anything, mock.Anything, mock.Anything).RunAndReturn(func(_ string, _ int, f func([]byte, []byte) error) error { + return f([]byte("key1"), value) + }) kc.MetaKv = txn tasks, err = kc.ListStatsTasks(context.Background()) @@ -1807,7 +1835,9 @@ func Test_StatsTasks(t *testing.T) { assert.Equal(t, 1, len(tasks)) txn = mocks.NewMetaKv(t) - txn.EXPECT().LoadWithPrefix(mock.Anything).Return([]string{"key1"}, []string{"1234"}, nil) + txn.EXPECT().WalkWithPrefix(mock.Anything, mock.Anything, mock.Anything).RunAndReturn(func(_ string, _ int, f func([]byte, []byte) error) error { + return f([]byte("key1"), []byte("1234")) + }) kc.MetaKv = txn tasks, err = kc.ListStatsTasks(context.Background()) diff --git a/internal/metastore/kv/querycoord/kv_catalog.go b/internal/metastore/kv/querycoord/kv_catalog.go index 2c7d5edc38..f121462e47 100644 --- a/internal/metastore/kv/querycoord/kv_catalog.go +++ b/internal/metastore/kv/querycoord/kv_catalog.go @@ -19,6 +19,8 @@ import ( "github.com/milvus-io/milvus/pkg/util/compressor" ) +var paginationSize = 2000 + var ErrInvalidKey = errors.New("invalid load info key") const ( @@ -105,51 +107,57 @@ func (s Catalog) RemoveResourceGroup(ctx context.Context, rgName string) error { } func (s Catalog) GetCollections(ctx context.Context) ([]*querypb.CollectionLoadInfo, error) { - _, values, err := s.cli.LoadWithPrefix(CollectionLoadInfoPrefix) - if err != nil { - return nil, err - } - ret := make([]*querypb.CollectionLoadInfo, 0, len(values)) - for _, v := range values { + ret := make([]*querypb.CollectionLoadInfo, 0) + applyFn := func(key []byte, value []byte) error { info := querypb.CollectionLoadInfo{} - if err := proto.Unmarshal([]byte(v), &info); err != nil { - return nil, err + if err := proto.Unmarshal(value, &info); err != nil { + return err } ret = append(ret, &info) + return nil + } + + err := s.cli.WalkWithPrefix(CollectionLoadInfoPrefix, paginationSize, applyFn) + if err != nil { + return nil, err } return ret, nil } func (s Catalog) GetPartitions(ctx context.Context) (map[int64][]*querypb.PartitionLoadInfo, error) { - _, values, err := s.cli.LoadWithPrefix(PartitionLoadInfoPrefix) - if err != nil { - return nil, err - } ret := make(map[int64][]*querypb.PartitionLoadInfo) - for _, v := range values { + applyFn := func(key []byte, value []byte) error { info := querypb.PartitionLoadInfo{} - if err := proto.Unmarshal([]byte(v), &info); err != nil { - return nil, err + if err := proto.Unmarshal(value, &info); err != nil { + return err } ret[info.GetCollectionID()] = append(ret[info.GetCollectionID()], &info) + return nil + } + + err := s.cli.WalkWithPrefix(PartitionLoadInfoPrefix, paginationSize, applyFn) + if err != nil { + return nil, err } return ret, nil } func (s Catalog) GetReplicas(ctx context.Context) ([]*querypb.Replica, error) { - _, values, err := s.cli.LoadWithPrefix(ReplicaPrefix) - if err != nil { - return nil, err - } - ret := make([]*querypb.Replica, 0, len(values)) - for _, v := range values { + ret := make([]*querypb.Replica, 0) + applyFn := func(key []byte, value []byte) error { info := querypb.Replica{} - if err := proto.Unmarshal([]byte(v), &info); err != nil { - return nil, err + if err := proto.Unmarshal(value, &info); err != nil { + return err } ret = append(ret, &info) + return nil + } + + err := s.cli.WalkWithPrefix(ReplicaPrefix, paginationSize, applyFn) + if err != nil { + return nil, err } replicasV1, err := s.getReplicasFromV1(ctx) @@ -290,21 +298,23 @@ func (s Catalog) RemoveCollectionTarget(ctx context.Context, collectionID int64) } func (s Catalog) GetCollectionTargets(ctx context.Context) (map[int64]*querypb.CollectionTarget, error) { - keys, values, err := s.cli.LoadWithPrefix(CollectionTargetPrefix) - if err != nil { - return nil, err - } ret := make(map[int64]*querypb.CollectionTarget) - for i, v := range values { + applyFn := func(key []byte, value []byte) error { var decompressed bytes.Buffer - compressor.ZstdDecompress(bytes.NewReader([]byte(v)), io.Writer(&decompressed)) + compressor.ZstdDecompress(bytes.NewReader(value), io.Writer(&decompressed)) target := &querypb.CollectionTarget{} if err := proto.Unmarshal(decompressed.Bytes(), target); err != nil { // recover target from meta is a optimize policy, skip when failure happens - log.Warn("failed to unmarshal collection target", zap.String("key", keys[i]), zap.Error(err)) - continue + log.Warn("failed to unmarshal collection target", zap.String("key", string(key)), zap.Error(err)) + return nil } ret[target.GetCollectionID()] = target + return nil + } + + err := s.cli.WalkWithPrefix(CollectionTargetPrefix, paginationSize, applyFn) + if err != nil { + return nil, err } return ret, nil diff --git a/internal/metastore/kv/querycoord/kv_catalog_test.go b/internal/metastore/kv/querycoord/kv_catalog_test.go index 9bde8491a6..638ee61b52 100644 --- a/internal/metastore/kv/querycoord/kv_catalog_test.go +++ b/internal/metastore/kv/querycoord/kv_catalog_test.go @@ -239,7 +239,7 @@ func (suite *CatalogTestSuite) TestCollectionTarget() { mockStore := mocks.NewMetaKv(suite.T()) mockErr := errors.New("failed to access etcd") mockStore.EXPECT().MultiSave(mock.Anything).Return(mockErr) - mockStore.EXPECT().LoadWithPrefix(mock.Anything).Return(nil, nil, mockErr) + mockStore.EXPECT().WalkWithPrefix(mock.Anything, mock.Anything, mock.Anything).Return(mockErr) suite.catalog.cli = mockStore err = suite.catalog.SaveCollectionTargets(ctx, &querypb.CollectionTarget{})