diff --git a/internal/datacoord/compaction.go b/internal/datacoord/compaction.go
index d505d346f6..d08d63f6a3 100644
--- a/internal/datacoord/compaction.go
+++ b/internal/datacoord/compaction.go
@@ -441,7 +441,7 @@ func (c *compactionPlanHandler) handleMergeCompactionResult(plan *datapb.Compact
 	}
 	if err := c.meta.alterMetaStoreAfterCompaction(newSegment, modSegments); err != nil {
-		log.Warn("fail to alert meta store", zap.Error(err))
+		log.Warn("fail to alter meta store", zap.Error(err))
 		return err
 	}
diff --git a/internal/datacoord/compaction_test.go b/internal/datacoord/compaction_test.go
index 5058888015..2c5dc13756 100644
--- a/internal/datacoord/compaction_test.go
+++ b/internal/datacoord/compaction_test.go
@@ -133,7 +133,7 @@ func (s *CompactionPlanHandlerSuite) TestHandleL0CompactionResults() {
 		s.Equal(7, len(operators))
 	}).Return(nil).Once()
-	deltalogs := []*datapb.FieldBinlog{getFieldBinlogPaths(101, getDeltaLogPath("log3", 1))}
+	deltalogs := []*datapb.FieldBinlog{getFieldBinlogIDs(101, 3)}
 	// 2 l0 segments, 3 sealed segments
 	plan := &datapb.CompactionPlan{
 		PlanID: 1,
@@ -219,7 +219,7 @@ func (s *CompactionPlanHandlerSuite) TestRefreshL0Plan() {
 		},
 	)
-	deltalogs := []*datapb.FieldBinlog{getFieldBinlogPaths(101, getDeltaLogPath("log3", 1))}
+	deltalogs := []*datapb.FieldBinlog{getFieldBinlogIDs(101, 3)}
 	// 2 l0 segments
 	plan := &datapb.CompactionPlan{
 		PlanID: 1,
@@ -437,16 +437,16 @@ func (s *CompactionPlanHandlerSuite) TestCompleteCompaction() {
 	seg1 := &datapb.SegmentInfo{
 		ID: 1,
-		Binlogs:   []*datapb.FieldBinlog{getFieldBinlogPaths(101, getInsertLogPath("log1", 1))},
-		Statslogs: []*datapb.FieldBinlog{getFieldBinlogPaths(101, getStatsLogPath("log2", 1))},
-		Deltalogs: []*datapb.FieldBinlog{getFieldBinlogPaths(101, getDeltaLogPath("log3", 1))},
+		Binlogs:   []*datapb.FieldBinlog{getFieldBinlogIDs(101, 1)},
+		Statslogs: []*datapb.FieldBinlog{getFieldBinlogIDs(101, 2)},
+		Deltalogs: []*datapb.FieldBinlog{getFieldBinlogIDs(101, 3)},
 	}
 	seg2 := &datapb.SegmentInfo{
 		ID: 2,
-		Binlogs:   []*datapb.FieldBinlog{getFieldBinlogPaths(101, getInsertLogPath("log4", 2))},
-		Statslogs: []*datapb.FieldBinlog{getFieldBinlogPaths(101, getStatsLogPath("log5", 2))},
-		Deltalogs: []*datapb.FieldBinlog{getFieldBinlogPaths(101, getDeltaLogPath("log6", 2))},
+		Binlogs:   []*datapb.FieldBinlog{getFieldBinlogIDs(101, 4)},
+		Statslogs: []*datapb.FieldBinlog{getFieldBinlogIDs(101, 5)},
+		Deltalogs: []*datapb.FieldBinlog{getFieldBinlogIDs(101, 6)},
 	}
 	plan := &datapb.CompactionPlan{
@@ -483,9 +483,9 @@ func (s *CompactionPlanHandlerSuite) TestCompleteCompaction() {
 			{
 				SegmentID: 3,
 				NumOfRows: 15,
-				InsertLogs:          []*datapb.FieldBinlog{getFieldBinlogPaths(101, getInsertLogPath("log301", 3))},
-				Field2StatslogPaths: []*datapb.FieldBinlog{getFieldBinlogPaths(101, getStatsLogPath("log302", 3))},
-				Deltalogs:           []*datapb.FieldBinlog{getFieldBinlogPaths(101, getDeltaLogPath("log303", 3))},
+				InsertLogs:          []*datapb.FieldBinlog{getFieldBinlogIDs(101, 301)},
+				Field2StatslogPaths: []*datapb.FieldBinlog{getFieldBinlogIDs(101, 302)},
+				Deltalogs:           []*datapb.FieldBinlog{getFieldBinlogIDs(101, 303)},
 			},
 		},
 	}
@@ -584,6 +584,17 @@ func (s *CompactionPlanHandlerSuite) TestUpdateCompaction() {
 	s.Equal(failed, task.state)
 }
+func getFieldBinlogIDs(id int64, logIDs ...int64) *datapb.FieldBinlog {
+	l := &datapb.FieldBinlog{
+		FieldID: id,
+		Binlogs: make([]*datapb.Binlog, 0, len(logIDs)),
+	}
+	for _, id := range logIDs {
+		l.Binlogs = append(l.Binlogs, &datapb.Binlog{LogID: id})
+	}
+	return l
+}
+
 func getFieldBinlogPaths(id int64, paths ...string) *datapb.FieldBinlog {
 	l := &datapb.FieldBinlog{
 		FieldID: id,
@@ -595,13 +606,13 @@ func getFieldBinlogPaths(id int64, paths ...string) *datapb.FieldBinlog {
 	return l
 }
-func getFieldBinlogPathsWithEntry(id int64, entry int64, paths ...string) *datapb.FieldBinlog {
+func getFieldBinlogIDsWithEntry(id int64, entry int64, logIDs ...int64) *datapb.FieldBinlog {
 	l := &datapb.FieldBinlog{
 		FieldID: id,
-		Binlogs: make([]*datapb.Binlog, 0, len(paths)),
+		Binlogs: make([]*datapb.Binlog, 0, len(logIDs)),
 	}
-	for _, path := range paths {
-		l.Binlogs = append(l.Binlogs, &datapb.Binlog{LogPath: path, EntriesNum: entry})
+	for _, id := range logIDs {
+		l.Binlogs = append(l.Binlogs, &datapb.Binlog{LogID: id, EntriesNum: entry})
 	}
 	return l
 }
diff --git a/internal/datacoord/compaction_trigger.go b/internal/datacoord/compaction_trigger.go
index 700cac5e1f..4da01b024e 100644
--- a/internal/datacoord/compaction_trigger.go
+++ b/internal/datacoord/compaction_trigger.go
@@ -753,6 +753,8 @@ func segmentsToPlan(segments []*SegmentInfo, compactTime *compactTime) *datapb.C
 			FieldBinlogs:        s.GetBinlogs(),
 			Field2StatslogPaths: s.GetStatslogs(),
 			Deltalogs:           s.GetDeltalogs(),
+			CollectionID:        s.GetCollectionID(),
+			PartitionID:         s.GetPartitionID(),
 		}
 		plan.TotalRows += s.GetNumOfRows()
 		plan.SegmentBinlogs = append(plan.SegmentBinlogs, segmentBinlogs)
diff --git a/internal/datacoord/compaction_trigger_test.go b/internal/datacoord/compaction_trigger_test.go
index 0d1e6e271d..4d14186f83 100644
--- a/internal/datacoord/compaction_trigger_test.go
+++ b/internal/datacoord/compaction_trigger_test.go
@@ -122,14 +122,14 @@ func Test_compactionTrigger_force(t *testing.T) {
 					Binlogs: []*datapb.FieldBinlog{
 						{
 							Binlogs: []*datapb.Binlog{
-								{EntriesNum: 5, LogPath: "log1"},
+								{EntriesNum: 5, LogID: 1},
 							},
 						},
 					},
 					Deltalogs: []*datapb.FieldBinlog{
 						{
 							Binlogs: []*datapb.Binlog{
-								{EntriesNum: 5, LogPath: "deltalog1"},
+								{EntriesNum: 5, LogID: 1},
 							},
 						},
 					},
@@ -167,14 +167,14 @@ func Test_compactionTrigger_force(t *testing.T) {
 					Binlogs: []*datapb.FieldBinlog{
 						{
 							Binlogs: []*datapb.Binlog{
-								{EntriesNum: 5, LogPath: "log2"},
+								{EntriesNum: 5, LogID: 2},
 							},
 						},
 					},
 					Deltalogs: []*datapb.FieldBinlog{
 						{
 							Binlogs: []*datapb.Binlog{
-								{EntriesNum: 5, LogPath: "deltalog2"},
+								{EntriesNum: 5, LogID: 2},
 							},
 						},
 					},
@@ -412,7 +412,7 @@ func Test_compactionTrigger_force(t *testing.T) {
 						FieldBinlogs: []*datapb.FieldBinlog{
 							{
 								Binlogs: []*datapb.Binlog{
-									{EntriesNum: 5, LogPath: "log1"},
+									{EntriesNum: 5, LogID: 1},
 								},
 							},
 						},
@@ -420,17 +420,19 @@ func Test_compactionTrigger_force(t *testing.T) {
 						Deltalogs: []*datapb.FieldBinlog{
 							{
 								Binlogs: []*datapb.Binlog{
-									{EntriesNum: 5, LogPath: "deltalog1"},
+									{EntriesNum: 5, LogID: 1},
 								},
 							},
 						},
+						CollectionID: 2,
+						PartitionID:  1,
 					},
 					{
 						SegmentID: 2,
 						FieldBinlogs: []*datapb.FieldBinlog{
 							{
 								Binlogs: []*datapb.Binlog{
-									{EntriesNum: 5, LogPath: "log2"},
+									{EntriesNum: 5, LogID: 2},
 								},
 							},
 						},
@@ -438,10 +440,12 @@ func Test_compactionTrigger_force(t *testing.T) {
 						Deltalogs: []*datapb.FieldBinlog{
 							{
 								Binlogs: []*datapb.Binlog{
-									{EntriesNum: 5, LogPath: "deltalog2"},
+									{EntriesNum: 5, LogID: 2},
 								},
 							},
 						},
+						CollectionID: 2,
+						PartitionID:  1,
 					},
 				},
 				StartTime: 0,
diff --git a/internal/datacoord/compaction_trigger_v2.go b/internal/datacoord/compaction_trigger_v2.go
index 4888fb1669..cfc525da80 100644
--- a/internal/datacoord/compaction_trigger_v2.go
+++ b/internal/datacoord/compaction_trigger_v2.go
@@ -92,9 +92,11 @@ func (m *CompactionTriggerManager) BuildLevelZeroCompactionPlan(view CompactionV
 	levelZeroSegs := lo.Map(view.GetSegmentsView(), func(v *SegmentView, _ int) *datapb.CompactionSegmentBinlogs {
 		s := m.meta.GetSegment(v.ID)
 		return &datapb.CompactionSegmentBinlogs{
-			SegmentID: s.GetID(),
-			Deltalogs: s.GetDeltalogs(),
-			Level:     datapb.SegmentLevel_L0,
+			SegmentID:    s.GetID(),
+			Deltalogs:    s.GetDeltalogs(),
+			Level:        datapb.SegmentLevel_L0,
+			CollectionID: s.GetCollectionID(),
+			PartitionID:  s.GetPartitionID(),
 		}
 	})
 	segmentBinlogs = append(segmentBinlogs, levelZeroSegs...)
diff --git a/internal/datacoord/garbage_collector.go b/internal/datacoord/garbage_collector.go
index 64a4bffe71..89e150a685 100644
--- a/internal/datacoord/garbage_collector.go
+++ b/internal/datacoord/garbage_collector.go
@@ -31,6 +31,7 @@ import (
 	"go.uber.org/zap"
 
 	"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
+	"github.com/milvus-io/milvus/internal/metastore/kv/binlog"
 	"github.com/milvus-io/milvus/internal/proto/datapb"
 	"github.com/milvus-io/milvus/internal/storage"
 	"github.com/milvus-io/milvus/pkg/common"
@@ -201,8 +202,10 @@ func (gc *garbageCollector) scan() {
 	filesMap := typeutil.NewSet[string]()
 	segments := gc.meta.GetAllSegmentsUnsafe()
 	for _, segment := range segments {
+		cloned := segment.Clone()
+		binlog.DecompressBinLogs(cloned.SegmentInfo)
 		segmentMap.Insert(segment.GetID())
-		for _, log := range getLogs(segment) {
+		for _, log := range getLogs(cloned) {
 			filesMap.Insert(log.GetLogPath())
 		}
 	}
@@ -323,14 +326,16 @@ func (gc *garbageCollector) clearEtcd() {
 	compactTo := make(map[int64]*SegmentInfo)
 	channels := typeutil.NewSet[string]()
 	for _, segment := range all {
-		if segment.GetState() == commonpb.SegmentState_Dropped {
-			drops[segment.GetID()] = segment
-			channels.Insert(segment.GetInsertChannel())
+		cloned := segment.Clone()
+		binlog.DecompressBinLogs(cloned.SegmentInfo)
+		if cloned.GetState() == commonpb.SegmentState_Dropped {
+			drops[cloned.GetID()] = cloned
+			channels.Insert(cloned.GetInsertChannel())
 			// continue
 			// A(indexed), B(indexed) -> C(no indexed), D(no indexed) -> E(no indexed), A, B can not be GC
 		}
-		for _, from := range segment.GetCompactionFrom() {
-			compactTo[from] = segment
+		for _, from := range cloned.GetCompactionFrom() {
+			compactTo[from] = cloned
 		}
 	}
diff --git a/internal/datacoord/garbage_collector_test.go b/internal/datacoord/garbage_collector_test.go
index 75d25a80c0..621a99ae37 100644
--- a/internal/datacoord/garbage_collector_test.go
+++ b/internal/datacoord/garbage_collector_test.go
@@ -19,6 +19,8 @@ package datacoord
 import (
 	"bytes"
 	"context"
+	"fmt"
+	"math/rand"
 	"path"
 	"strconv"
 	"strings"
@@ -107,7 +109,7 @@ func validateMinioPrefixElements(t *testing.T, cli *minio.Client, bucketName str
 func Test_garbageCollector_scan(t *testing.T) {
 	bucketName := `datacoord-ut` + strings.ToLower(funcutil.RandomString(8))
-	rootPath := `gc` + funcutil.RandomString(8)
+	rootPath := paramtable.Get().MinioCfg.RootPath.GetValue() // TODO change to Params
 	cli, inserts, stats, delta, others, err := initUtOSSEnv(bucketName, rootPath, 4)
 	require.NoError(t, err)
@@ -278,9 +280,9 @@ func initUtOSSEnv(bucket, root string, n int) (mcm *storage.MinioChunkManager, i
 			var token string
 			if i == 1 {
-				token = path.Join(strconv.Itoa(i), strconv.Itoa(i), "error-seg-id", funcutil.RandomString(8), funcutil.RandomString(8))
+				token = path.Join(strconv.Itoa(i), strconv.Itoa(i), "error-seg-id", strconv.Itoa(i), fmt.Sprint(rand.Int63()))
 			} else {
-				token = path.Join(strconv.Itoa(1+i), strconv.Itoa(10+i), strconv.Itoa(100+i), funcutil.RandomString(8), funcutil.RandomString(8))
+				token = path.Join(strconv.Itoa(1+i), strconv.Itoa(10+i), strconv.Itoa(100+i), strconv.Itoa(i), fmt.Sprint(rand.Int63()))
 			}
 			// insert
 			filePath := path.Join(root, common.SegmentInsertLogPath, token)
@@ -299,9 +301,9 @@ func initUtOSSEnv(bucket, root string, n int) (mcm *storage.MinioChunkManager, i
 			// delta
 			if i == 1 {
-				token = path.Join(strconv.Itoa(i), strconv.Itoa(i), "error-seg-id", funcutil.RandomString(8))
+				token = path.Join(strconv.Itoa(i), strconv.Itoa(i), "error-seg-id", fmt.Sprint(rand.Int63()))
 			} else {
-				token = path.Join(strconv.Itoa(1+i), strconv.Itoa(10+i), strconv.Itoa(100+i), funcutil.RandomString(8))
+				token = path.Join(strconv.Itoa(1+i), strconv.Itoa(10+i), strconv.Itoa(100+i), fmt.Sprint(rand.Int63()))
 			}
 			filePath = path.Join(root, common.SegmentDeltaLogPath, token)
 			info, err = cli.PutObject(context.TODO(), bucket, filePath, reader, int64(len(content)), minio.PutObjectOptions{})
diff --git a/internal/datacoord/index_builder.go b/internal/datacoord/index_builder.go
index 374806ef3a..bf7780df54 100644
--- a/internal/datacoord/index_builder.go
+++ b/internal/datacoord/index_builder.go
@@ -269,12 +269,12 @@ func (ib *indexBuilder) process(buildID UniqueID) bool {
 			return false
 		}
-		binLogs := make([]string, 0)
+		binlogIDs := make([]int64, 0)
 		fieldID := ib.meta.GetFieldIDByIndexID(meta.CollectionID, meta.IndexID)
 		for _, fieldBinLog := range segment.GetBinlogs() {
 			if fieldBinLog.GetFieldID() == fieldID {
 				for _, binLog := range fieldBinLog.GetBinlogs() {
-					binLogs = append(binLogs, binLog.LogPath)
+					binlogIDs = append(binlogIDs, binLog.GetLogID())
 				}
 				break
 			}
 		}
@@ -344,7 +344,6 @@ func (ib *indexBuilder) process(buildID UniqueID) bool {
 				ClusterID:       Params.CommonCfg.ClusterPrefix.GetValue(),
 				IndexFilePrefix: path.Join(ib.chunkManager.RootPath(), common.SegmentIndexPath),
 				BuildID:         buildID,
-				DataPaths:       binLogs,
 				IndexVersion:    meta.IndexVersion + 1,
 				StorageConfig:   storageConfig,
 				IndexParams:     indexParams,
@@ -361,19 +360,24 @@ func (ib *indexBuilder) process(buildID UniqueID) bool {
 				IndexStorePath:      indexStorePath,
 				Dim:                 int64(dim),
 				CurrentIndexVersion: ib.indexEngineVersionManager.GetCurrentIndexEngineVersion(),
+				DataIds:             binlogIDs,
 			}
 		} else {
 			req = &indexpb.CreateJobRequest{
 				ClusterID:       Params.CommonCfg.ClusterPrefix.GetValue(),
 				IndexFilePrefix: path.Join(ib.chunkManager.RootPath(), common.SegmentIndexPath),
 				BuildID:         buildID,
-				DataPaths:       binLogs,
 				IndexVersion:    meta.IndexVersion + 1,
 				StorageConfig:   storageConfig,
 				IndexParams:     indexParams,
 				TypeParams:      typeParams,
 				NumRows:         meta.NumRows,
 				CurrentIndexVersion: ib.indexEngineVersionManager.GetCurrentIndexEngineVersion(),
+				DataIds:             binlogIDs,
+				CollectionID:        segment.GetCollectionID(),
+				PartitionID:         segment.GetPartitionID(),
+				SegmentID:           segment.GetID(),
+				FieldID:             fieldID,
 			}
 		}
diff --git a/internal/datacoord/meta.go b/internal/datacoord/meta.go
index 90a586cfd6..d101790da6 100644
--- a/internal/datacoord/meta.go
+++ b/internal/datacoord/meta.go
@@ -35,6 +35,7 @@ import (
 	"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
 	"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
 	"github.com/milvus-io/milvus/internal/metastore"
+	"github.com/milvus-io/milvus/internal/metastore/kv/binlog"
 	"github.com/milvus-io/milvus/internal/metastore/model"
 	"github.com/milvus-io/milvus/internal/proto/datapb"
 	"github.com/milvus-io/milvus/internal/storage"
@@ -115,6 +116,7 @@ func (m *meta) reloadFromKV() error {
 	metrics.DataCoordNumSegments.Reset()
 	numStoredRows := int64(0)
 	for _, segment := range segments {
+		// segments from catalog.ListSegments will not have logPath
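+		// paths are rebuilt from the logID on demand via binlog.DecompressBinLogs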
 		m.segments.SetSegment(segment.ID, NewSegmentInfo(segment))
 		metrics.DataCoordNumSegments.WithLabelValues(segment.GetState().String(), segment.GetLevel().String()).Inc()
 		if segment.State == commonpb.SegmentState_Flushed {
@@ -303,6 +305,7 @@ func (m *meta) AddSegment(ctx context.Context, segment *SegmentInfo) error {
 		return err
 	}
 	m.segments.SetSegment(segment.GetID(), segment)
+	metrics.DataCoordNumSegments.WithLabelValues(segment.GetState().String(), segment.GetLevel().String()).Inc()
 
 	log.Info("meta update: adding segment - complete", zap.Int64("segmentID", segment.GetID()))
 	return nil
@@ -481,7 +484,6 @@ func CreateL0Operator(collectionID, partitionID, segmentID int64, channel string
 					Level:         datapb.SegmentLevel_L0,
 				},
 			}
-			modPack.metricMutation.addNewSeg(commonpb.SegmentState_Growing, datapb.SegmentLevel_L0, 0)
 		}
 		return true
 	}
@@ -993,6 +995,10 @@ func (m *meta) PrepareCompleteCompactionMutation(plan *datapb.CompactionPlan,
 	for _, cl := range compactionLogs {
 		if segment := m.segments.GetSegment(cl.GetSegmentID()); segment != nil {
 			cloned := segment.Clone()
+			err := binlog.DecompressBinLog(storage.DeleteBinlog, cloned.GetCollectionID(), cloned.GetPartitionID(), cloned.GetID(), cloned.GetDeltalogs())
+			if err != nil {
+				return nil, nil, nil, err
+			}
 			updateSegStateAndPrepareMetrics(cloned, commonpb.SegmentState_Dropped, metricMutation)
 			cloned.DroppedAt = uint64(time.Now().UnixNano())
 			cloned.Compacted = true
@@ -1027,7 +1033,7 @@ func (m *meta) PrepareCompleteCompactionMutation(plan *datapb.CompactionPlan,
 	// MixCompaction / MergeCompaction will generates one and only one segment
 	compactToSegment := result.GetSegments()[0]
 
-	newAddedDeltalogs := updateDeltalogs(originDeltalogs, deletedDeltalogs, nil)
+	newAddedDeltalogs := updateDeltalogs(originDeltalogs, deletedDeltalogs)
 	copiedDeltalogs, err := m.copyDeltaFiles(newAddedDeltalogs, modSegments[0].CollectionID, modSegments[0].PartitionID, compactToSegment.GetSegmentID())
 	if err != nil {
 		return nil, nil, nil, err
 	}
@@ -1085,7 +1091,6 @@ func (m *meta) copyDeltaFiles(binlogs []*datapb.FieldBinlog, collectionID, parti
 			if err != nil {
 				return nil, err
 			}
-			binlog.LogPath = blobPath
 		}
 		ret = append(ret, fieldBinlog)
 	}
@@ -1137,66 +1142,17 @@ func (m *meta) alterMetaStoreAfterCompaction(segmentCompactTo *SegmentInfo, segm
 	return nil
 }
 
-func (m *meta) updateBinlogs(origin []*datapb.FieldBinlog, removes []*datapb.FieldBinlog, adds []*datapb.FieldBinlog) []*datapb.FieldBinlog {
-	fieldBinlogs := make(map[int64]map[string]*datapb.Binlog)
-	for _, f := range origin {
-		fid := f.GetFieldID()
-		if _, ok := fieldBinlogs[fid]; !ok {
-			fieldBinlogs[fid] = make(map[string]*datapb.Binlog)
-		}
-		for _, p := range f.GetBinlogs() {
-			fieldBinlogs[fid][p.GetLogPath()] = p
-		}
-	}
-
-	for _, f := range removes {
-		fid := f.GetFieldID()
-		if _, ok := fieldBinlogs[fid]; !ok {
-			continue
-		}
-		for _, p := range f.GetBinlogs() {
-			delete(fieldBinlogs[fid], p.GetLogPath())
-		}
-	}
-
-	for _, f := range adds {
-		fid := f.GetFieldID()
-		if _, ok := fieldBinlogs[fid]; !ok {
-			fieldBinlogs[fid] = make(map[string]*datapb.Binlog)
-		}
-		for _, p := range f.GetBinlogs() {
-			fieldBinlogs[fid][p.GetLogPath()] = p
-		}
-	}
-
-	res := make([]*datapb.FieldBinlog, 0, len(fieldBinlogs))
-	for fid, logs := range fieldBinlogs {
-		if len(logs) == 0 {
-			continue
-		}
-
-		binlogs := make([]*datapb.Binlog, 0, len(logs))
-		for _, log := range logs {
-			binlogs = append(binlogs, log)
-		}
-
-		field := &datapb.FieldBinlog{FieldID: fid, Binlogs: binlogs}
-		res = append(res, field)
-	}
-	return res
-}
-
-func updateDeltalogs(origin []*datapb.FieldBinlog, removes []*datapb.FieldBinlog, adds []*datapb.FieldBinlog) []*datapb.FieldBinlog {
+func updateDeltalogs(origin []*datapb.FieldBinlog, removes []*datapb.FieldBinlog) []*datapb.FieldBinlog {
 	res := make([]*datapb.FieldBinlog, 0, len(origin))
 	for _, fbl := range origin {
-		logs := make(map[string]*datapb.Binlog)
+		logs := make(map[int64]*datapb.Binlog)
 		for _, d := range fbl.GetBinlogs() {
-			logs[d.GetLogPath()] = d
+			logs[d.GetLogID()] = d
 		}
 		for _, remove := range removes {
 			if remove.GetFieldID() == fbl.GetFieldID() {
 				for _, r := range remove.GetBinlogs() {
-					delete(logs, r.GetLogPath())
+					delete(logs, r.GetLogID())
 				}
 			}
 		}
diff --git a/internal/datacoord/meta_test.go b/internal/datacoord/meta_test.go
index 7fafa5ebdc..3745b5b37b 100644
--- a/internal/datacoord/meta_test.go
+++ b/internal/datacoord/meta_test.go
@@ -496,8 +496,8 @@ func TestUpdateSegmentsInfo(t *testing.T) {
 		segment1 := &SegmentInfo{SegmentInfo: &datapb.SegmentInfo{
 			ID: 1, State: commonpb.SegmentState_Growing,
-			Binlogs:   []*datapb.FieldBinlog{getFieldBinlogPaths(1, getInsertLogPath("binlog0", 1))},
-			Statslogs: []*datapb.FieldBinlog{getFieldBinlogPaths(1, getStatsLogPath("statslog0", 1))},
+			Binlogs:   []*datapb.FieldBinlog{getFieldBinlogIDs(1, 0)},
+			Statslogs: []*datapb.FieldBinlog{getFieldBinlogIDs(1, 0)},
 		}}
 		err = meta.AddSegment(context.TODO(), segment1)
 		assert.NoError(t, err)
@@ -505,8 +505,8 @@ func TestUpdateSegmentsInfo(t *testing.T) {
 		err = meta.UpdateSegmentsInfo(
 			UpdateStatusOperator(1, commonpb.SegmentState_Flushing),
 			UpdateBinlogsOperator(1,
-				[]*datapb.FieldBinlog{getFieldBinlogPathsWithEntry(1, 10, getInsertLogPath("binlog1", 1))},
-				[]*datapb.FieldBinlog{getFieldBinlogPaths(1, getStatsLogPath("statslog1", 1))},
+				[]*datapb.FieldBinlog{getFieldBinlogIDsWithEntry(1, 10, 1)},
+				[]*datapb.FieldBinlog{getFieldBinlogIDs(1, 1)},
 				[]*datapb.FieldBinlog{{Binlogs: []*datapb.Binlog{{EntriesNum: 1, TimestampFrom: 100, TimestampTo: 200, LogSize: 1000, LogPath: getDeltaLogPath("deltalog1", 1)}}}},
 			),
 			UpdateStartPosition([]*datapb.SegmentStartPosition{{SegmentID: 1, StartPosition: &msgpb.MsgPosition{MsgID: []byte{1, 2, 3}}}}),
@@ -518,8 +518,8 @@ func TestUpdateSegmentsInfo(t *testing.T) {
 		expected := &SegmentInfo{SegmentInfo: &datapb.SegmentInfo{
 			ID: 1, State: commonpb.SegmentState_Flushing, NumOfRows: 10, StartPosition: &msgpb.MsgPosition{MsgID: []byte{1, 2, 3}},
-			Binlogs:   []*datapb.FieldBinlog{getFieldBinlogPaths(1, "binlog0", "binlog1")},
-			Statslogs: []*datapb.FieldBinlog{getFieldBinlogPaths(1, "statslog0", "statslog1")},
+			Binlogs:   []*datapb.FieldBinlog{getFieldBinlogIDs(1, 0, 1)},
+			Statslogs: []*datapb.FieldBinlog{getFieldBinlogIDs(1, 0, 1)},
 			Deltalogs: []*datapb.FieldBinlog{{Binlogs: []*datapb.Binlog{{EntriesNum: 1, TimestampFrom: 100, TimestampTo: 200, LogSize: 1000}}}},
 		}}
@@ -547,8 +547,8 @@ func TestUpdateSegmentsInfo(t *testing.T) {
 		// normal
 		segment1 := &SegmentInfo{SegmentInfo: &datapb.SegmentInfo{
 			ID: 1, State: commonpb.SegmentState_Flushed,
-			Binlogs:   []*datapb.FieldBinlog{getFieldBinlogPaths(1, getInsertLogPath("binlog0", 1))},
-			Statslogs: []*datapb.FieldBinlog{getFieldBinlogPaths(1, getStatsLogPath("statslog0", 1))},
+			Binlogs:   []*datapb.FieldBinlog{getFieldBinlogIDs(1, 0)},
+			Statslogs: []*datapb.FieldBinlog{getFieldBinlogIDs(1, 0)},
 		}}
 		err = meta.AddSegment(context.TODO(), segment1)
 		assert.NoError(t, err)
@@ -621,8 +621,8 @@ func TestUpdateSegmentsInfo(t *testing.T) {
 		err = meta.UpdateSegmentsInfo(
 			UpdateStatusOperator(1, commonpb.SegmentState_Flushing),
 			UpdateBinlogsOperator(1,
-				[]*datapb.FieldBinlog{getFieldBinlogPaths(1, getInsertLogPath("binlog", 1))},
-				[]*datapb.FieldBinlog{getFieldBinlogPaths(1, getInsertLogPath("statslog", 1))},
+				[]*datapb.FieldBinlog{getFieldBinlogIDs(1, 0)},
+				[]*datapb.FieldBinlog{getFieldBinlogIDs(1, 0)},
 				[]*datapb.FieldBinlog{{Binlogs: []*datapb.Binlog{{EntriesNum: 1, TimestampFrom: 100, TimestampTo: 200, LogSize: 1000, LogPath: getDeltaLogPath("deltalog", 1)}}}},
 			),
 			UpdateStartPosition([]*datapb.SegmentStartPosition{{SegmentID: 1, StartPosition: &msgpb.MsgPosition{MsgID: []byte{1, 2, 3}}}}),
@@ -679,9 +679,9 @@ func TestMeta_alterMetaStore(t *testing.T) {
 		segments: &SegmentsInfo{map[int64]*SegmentInfo{
 			1: {SegmentInfo: &datapb.SegmentInfo{
 				ID: 1,
-				Binlogs:   []*datapb.FieldBinlog{getFieldBinlogPaths(1, "log1", "log2")},
-				Statslogs: []*datapb.FieldBinlog{getFieldBinlogPaths(1, "statlog1", "statlog2")},
-				Deltalogs: []*datapb.FieldBinlog{getFieldBinlogPaths(0, "deltalog1", "deltalog2")},
+				Binlogs:   []*datapb.FieldBinlog{getFieldBinlogIDs(1, 1, 2)},
+				Statslogs: []*datapb.FieldBinlog{getFieldBinlogIDs(1, 1, 2)},
+				Deltalogs: []*datapb.FieldBinlog{getFieldBinlogIDs(1, 1, 2)},
 			}},
 		}},
 	}
@@ -700,9 +700,9 @@ func TestMeta_PrepareCompleteCompactionMutation(t *testing.T) {
 				CollectionID: 100,
 				PartitionID:  10,
 				State:        commonpb.SegmentState_Flushed,
-				Binlogs:   []*datapb.FieldBinlog{getFieldBinlogPaths(1, "log1", "log2")},
-				Statslogs: []*datapb.FieldBinlog{getFieldBinlogPaths(1, "statlog1", "statlog2")},
-				Deltalogs: []*datapb.FieldBinlog{getFieldBinlogPaths(0, "deltalog1", "deltalog2")},
+				Binlogs:   []*datapb.FieldBinlog{getFieldBinlogIDs(1, 1, 2)},
+				Statslogs: []*datapb.FieldBinlog{getFieldBinlogIDs(1, 1, 2)},
+				Deltalogs: []*datapb.FieldBinlog{getFieldBinlogIDs(0, 1, 2)},
 				NumOfRows: 1,
 			}},
 			2: {SegmentInfo: &datapb.SegmentInfo{
@@ -710,9 +710,9 @@ func TestMeta_PrepareCompleteCompactionMutation(t *testing.T) {
 				CollectionID: 100,
 				PartitionID:  10,
 				State:        commonpb.SegmentState_Flushed,
-				Binlogs:   []*datapb.FieldBinlog{getFieldBinlogPaths(1, "log3", "log4")},
-				Statslogs: []*datapb.FieldBinlog{getFieldBinlogPaths(1, "statlog3", "statlog4")},
-				Deltalogs: []*datapb.FieldBinlog{getFieldBinlogPaths(0, "deltalog3", "deltalog4")},
+				Binlogs:   []*datapb.FieldBinlog{getFieldBinlogIDs(1, 3, 4)},
+				Statslogs: []*datapb.FieldBinlog{getFieldBinlogIDs(1, 3, 4)},
+				Deltalogs: []*datapb.FieldBinlog{getFieldBinlogIDs(0, 3, 4)},
 				NumOfRows: 1,
 			}},
 		},
@@ -727,15 +727,15 @@ func TestMeta_PrepareCompleteCompactionMutation(t *testing.T) {
 		SegmentBinlogs: []*datapb.CompactionSegmentBinlogs{
 			{
 				SegmentID: 1,
-				FieldBinlogs:        []*datapb.FieldBinlog{getFieldBinlogPaths(1, "log1", "log2")},
-				Field2StatslogPaths: []*datapb.FieldBinlog{getFieldBinlogPaths(1, "statlog1", "statlog2")},
-				Deltalogs:           []*datapb.FieldBinlog{getFieldBinlogPaths(0, "deltalog1", "deltalog2")},
+				FieldBinlogs:        []*datapb.FieldBinlog{getFieldBinlogIDs(1, 1, 2)},
+				Field2StatslogPaths: []*datapb.FieldBinlog{getFieldBinlogIDs(1, 1, 2)},
+				Deltalogs:           []*datapb.FieldBinlog{getFieldBinlogIDs(0, 1, 2)},
 			},
 			{
 				SegmentID: 2,
-				FieldBinlogs:        []*datapb.FieldBinlog{getFieldBinlogPaths(1, "log3", "log4")},
-				Field2StatslogPaths: []*datapb.FieldBinlog{getFieldBinlogPaths(1, "statlog3", "statlog4")},
-				Deltalogs:           []*datapb.FieldBinlog{getFieldBinlogPaths(0, "deltalog3", "deltalog4")},
+				FieldBinlogs:        []*datapb.FieldBinlog{getFieldBinlogIDs(1, 3, 4)},
+				Field2StatslogPaths: []*datapb.FieldBinlog{getFieldBinlogIDs(1, 3, 4)},
+				Deltalogs:           []*datapb.FieldBinlog{getFieldBinlogIDs(0, 3, 4)},
 			},
 		},
 		StartTime: 15,
@@ -743,9 +743,9 @@ func TestMeta_PrepareCompleteCompactionMutation(t *testing.T) {
 	inSegment := &datapb.CompactionSegment{
 		SegmentID: 3,
-		InsertLogs:          []*datapb.FieldBinlog{getFieldBinlogPaths(1, "log5")},
-		Field2StatslogPaths: []*datapb.FieldBinlog{getFieldBinlogPaths(1, "statlog5")},
-		Deltalogs:           []*datapb.FieldBinlog{getFieldBinlogPaths(0, "deltalog5")},
+		InsertLogs:          []*datapb.FieldBinlog{getFieldBinlogIDs(1, 5)},
+		Field2StatslogPaths: []*datapb.FieldBinlog{getFieldBinlogIDs(1, 5)},
+		Deltalogs:           []*datapb.FieldBinlog{getFieldBinlogIDs(0, 5)},
 		NumOfRows: 2,
 	}
 	inCompactionResult := &datapb.CompactionPlanResult{
diff --git a/internal/datacoord/segment_info.go b/internal/datacoord/segment_info.go
index 6d630e5127..4cdd56b819 100644
--- a/internal/datacoord/segment_info.go
+++ b/internal/datacoord/segment_info.go
@@ -71,6 +71,7 @@ func NewSegmentsInfo() *SegmentsInfo {
 }
 
 // GetSegment returns SegmentInfo
+// the logPath in meta is empty
 func (s *SegmentsInfo) GetSegment(segmentID UniqueID) *SegmentInfo {
 	segment, ok := s.segments[segmentID]
 	if !ok {
@@ -81,6 +82,7 @@ func (s *SegmentsInfo) GetSegment(segmentID UniqueID) *SegmentInfo {
 
 // GetSegments iterates internal map and returns all SegmentInfo in a slice
 // no deep copy applied
+// the logPath in meta is empty
 func (s *SegmentsInfo) GetSegments() []*SegmentInfo {
 	segments := make([]*SegmentInfo, 0, len(s.segments))
 	for _, segment := range s.segments {
@@ -96,6 +98,8 @@ func (s *SegmentsInfo) DropSegment(segmentID UniqueID) {
 }
 
 // SetSegment sets SegmentInfo with segmentID, perform overwrite if already exists
+// the logPath of the segment in meta is kept empty, to save space
+// if the segment has a logPath, make it empty
 func (s *SegmentsInfo) SetSegment(segmentID UniqueID, segment *SegmentInfo) {
 	s.segments[segmentID] = segment
 }
@@ -190,15 +194,6 @@ func (s *SegmentsInfo) SetCurrentRows(segmentID UniqueID, rows int64) {
 	}
 }
 
-// SetBinlogs sets binlog paths for segment
-// if the segment is not found, do nothing
-// uses `Clone` since internal SegmentInfo's Binlogs is changed
-func (s *SegmentsInfo) SetBinlogs(segmentID UniqueID, binlogs []*datapb.FieldBinlog) {
-	if segment, ok := s.segments[segmentID]; ok {
-		s.segments[segmentID] = segment.Clone(SetBinlogs(binlogs))
-	}
-}
-
 // SetFlushTime sets flush time for segment
 // if the segment is not found, do nothing
 // uses `ShadowClone` since internal SegmentInfo is not changed
@@ -208,15 +203,6 @@ func (s *SegmentsInfo) SetFlushTime(segmentID UniqueID, t time.Time) {
 	}
 }
 
-// AddSegmentBinlogs adds binlogs for segment
-// if the segment is not found, do nothing
-// uses `Clone` since internal SegmentInfo's Binlogs is changed
-func (s *SegmentsInfo) AddSegmentBinlogs(segmentID UniqueID, field2Binlogs map[UniqueID][]*datapb.Binlog) {
-	if segment, ok := s.segments[segmentID]; ok {
-		s.segments[segmentID] = segment.Clone(addSegmentBinlogs(field2Binlogs))
-	}
-}
-
 // SetIsCompacting sets compaction status for segment
 func (s *SegmentsInfo) SetIsCompacting(segmentID UniqueID, isCompacting bool) {
 	if segment, ok := s.segments[segmentID]; ok {
@@ -338,13 +324,6 @@ func SetCurrentRows(rows int64) SegmentInfoOption {
 	}
 }
 
-// SetBinlogs is the option to set binlogs for segment info
-func SetBinlogs(binlogs []*datapb.FieldBinlog) SegmentInfoOption {
-	return func(segment *SegmentInfo) {
-		segment.Binlogs = binlogs
-	}
-}
-
 // SetFlushTime is the option to set flush time for segment info
 func SetFlushTime(t time.Time) SegmentInfoOption {
 	return func(segment *SegmentInfo) {
@@ -359,29 +338,6 @@ func SetIsCompacting(isCompacting bool) SegmentInfoOption {
 	}
 }
 
-func addSegmentBinlogs(field2Binlogs map[UniqueID][]*datapb.Binlog) SegmentInfoOption {
-	return func(segment *SegmentInfo) {
-		for fieldID, binlogPaths := range field2Binlogs {
-			found := false
-			for _, binlog := range segment.Binlogs {
-				if binlog.FieldID != fieldID {
-					continue
-				}
-				binlog.Binlogs = append(binlog.Binlogs, binlogPaths...)
-				found = true
-				break
-			}
-			if !found {
-				// if no field matched
-				segment.Binlogs = append(segment.Binlogs, &datapb.FieldBinlog{
-					FieldID: fieldID,
-					Binlogs: binlogPaths,
-				})
-			}
-		}
-	}
-}
-
 func (s *SegmentInfo) getSegmentSize() int64 {
 	if s.size.Load() <= 0 {
 		var size int64
diff --git a/internal/datacoord/server_test.go b/internal/datacoord/server_test.go
index 907dab0067..77711d98a6 100644
--- a/internal/datacoord/server_test.go
+++ b/internal/datacoord/server_test.go
@@ -18,7 +18,6 @@ package datacoord
 
 import (
 	"context"
-	"fmt"
 	"math/rand"
 	"os"
 	"os/signal"
@@ -238,10 +237,10 @@ func TestGetInsertBinlogPaths(t *testing.T) {
 				FieldID: 1,
 				Binlogs: []*datapb.Binlog{
 					{
-						LogPath: "dev/datacoord/testsegment/1/part1",
+						LogID: 1,
 					},
 					{
-						LogPath: "dev/datacoord/testsegment/1/part2",
+						LogID: 2,
 					},
 				},
 			},
@@ -269,10 +268,10 @@ func TestGetInsertBinlogPaths(t *testing.T) {
 				FieldID: 1,
 				Binlogs: []*datapb.Binlog{
 					{
-						LogPath: "dev/datacoord/testsegment/1/part1",
+						LogID: 1,
 					},
 					{
-						LogPath: "dev/datacoord/testsegment/1/part2",
+						LogID: 2,
 					},
 				},
 			},
@@ -2149,10 +2148,10 @@ func TestGetRecoveryInfo(t *testing.T) {
 					FieldID: 1,
 					Binlogs: []*datapb.Binlog{
 						{
-							LogPath: "/binlog/file1",
+							LogPath: "/binlog/1",
 						},
 						{
-							LogPath: "/binlog/file2",
+							LogPath: "/binlog/2",
 						},
 					},
 				},
@@ -2162,10 +2161,10 @@ func TestGetRecoveryInfo(t *testing.T) {
 					FieldID: 1,
 					Binlogs: []*datapb.Binlog{
 						{
-							LogPath: "/stats_log/file1",
+							LogPath: "/stats_log/1",
 						},
 						{
-							LogPath: "/stats_log/file2",
+							LogPath: "/stats_log/2",
 						},
 					},
 				},
@@ -2176,7 +2175,7 @@ func TestGetRecoveryInfo(t *testing.T) {
 					{
 						TimestampFrom: 0,
 						TimestampTo:   1,
-						LogPath:       "/stats_log/file1",
+						LogPath:       "/stats_log/1",
 						LogSize:       1,
 					},
 				},
@@ -2226,8 +2225,11 @@ func TestGetRecoveryInfo(t *testing.T) {
 		assert.EqualValues(t, 0, resp.GetBinlogs()[0].GetSegmentID())
 		assert.EqualValues(t, 1, len(resp.GetBinlogs()[0].GetFieldBinlogs()))
 		assert.EqualValues(t, 1, resp.GetBinlogs()[0].GetFieldBinlogs()[0].GetFieldID())
+		for _, binlog := range resp.GetBinlogs()[0].GetFieldBinlogs()[0].GetBinlogs() {
+			assert.Equal(t, "", binlog.GetLogPath())
+		}
 		for i, binlog := range resp.GetBinlogs()[0].GetFieldBinlogs()[0].GetBinlogs() {
-			assert.Equal(t, fmt.Sprintf("/binlog/file%d", i+1), binlog.GetLogPath())
+			assert.Equal(t, int64(i+1), binlog.GetLogID())
 		}
 	})
 	t.Run("with dropped segments", func(t *testing.T) {
@@ -3129,9 +3131,9 @@ func TestDataCoord_SegmentStatistics(t *testing.T) {
 		seg1 := &datapb.SegmentInfo{
 			ID: 100,
-			Binlogs:   []*datapb.FieldBinlog{getFieldBinlogPathsWithEntry(101, 1, getInsertLogPath("log1", 100))},
-			Statslogs: []*datapb.FieldBinlog{getFieldBinlogPaths(101, getStatsLogPath("log2", 100))},
-			Deltalogs: []*datapb.FieldBinlog{getFieldBinlogPaths(101, getDeltaLogPath("log3", 100))},
+			Binlogs:   []*datapb.FieldBinlog{getFieldBinlogIDsWithEntry(101, 1, 1)},
+			Statslogs: []*datapb.FieldBinlog{getFieldBinlogIDs(1, 2)},
+			Deltalogs: []*datapb.FieldBinlog{getFieldBinlogIDs(1, 3)},
 			State: commonpb.SegmentState_Importing,
 		}
@@ -3156,9 +3158,9 @@ func TestDataCoord_SegmentStatistics(t *testing.T) {
 		seg1 := &datapb.SegmentInfo{
 			ID: 100,
-			Binlogs:   []*datapb.FieldBinlog{getFieldBinlogPathsWithEntry(101, 1, getInsertLogPath("log1", 100))},
-			Statslogs: []*datapb.FieldBinlog{getFieldBinlogPaths(101, getStatsLogPath("log2", 100))},
-			Deltalogs: []*datapb.FieldBinlog{getFieldBinlogPaths(101, getDeltaLogPath("log3", 100))},
+			Binlogs:   []*datapb.FieldBinlog{getFieldBinlogIDsWithEntry(101, 1, 1)},
+			Statslogs: []*datapb.FieldBinlog{getFieldBinlogIDs(1, 2)},
+			Deltalogs: []*datapb.FieldBinlog{getFieldBinlogIDs(1, 3)},
 			State: commonpb.SegmentState_Flushed,
 		}
diff --git a/internal/datacoord/services.go b/internal/datacoord/services.go
index f3151e5ad7..1e973702b1 100644
--- a/internal/datacoord/services.go
+++ b/internal/datacoord/services.go
@@ -31,8 +31,10 @@ import (
 	"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
 	"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
 	"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
+	"github.com/milvus-io/milvus/internal/metastore/kv/binlog"
 	"github.com/milvus-io/milvus/internal/proto/datapb"
 	"github.com/milvus-io/milvus/internal/proto/internalpb"
+	"github.com/milvus-io/milvus/internal/storage"
 	"github.com/milvus-io/milvus/internal/util/segmentutil"
 	"github.com/milvus-io/milvus/pkg/common"
 	"github.com/milvus-io/milvus/pkg/log"
@@ -268,6 +270,7 @@ func (s *Server) GetInsertBinlogPaths(ctx context.Context, req *datapb.GetInsert
 			Status: merr.Status(err),
 		}, nil
 	}
+
 	segment := s.meta.GetHealthySegment(req.GetSegmentID())
 	if segment == nil {
 		return &datapb.GetInsertBinlogPathsResponse{
@@ -275,6 +278,12 @@ func (s *Server) GetInsertBinlogPaths(ctx context.Context, req *datapb.GetInsert
 		}, nil
 	}
+	err := binlog.DecompressBinLog(storage.InsertBinlog, segment.GetCollectionID(), segment.GetPartitionID(), segment.GetID(), segment.GetBinlogs())
+	if err != nil {
+		return &datapb.GetInsertBinlogPathsResponse{
+			Status: merr.Status(err),
+		}, nil
+	}
 	resp := &datapb.GetInsertBinlogPathsResponse{
 		Status: merr.Success(),
 	}
@@ -442,6 +451,13 @@ func (s *Server) SaveBinlogPaths(ctx context.Context, req *datapb.SaveBinlogPath
 			return merr.Status(err), nil
 		}
 	}
+	// for compatibility: before 2.3.4, SaveBinlogPaths carries only the logPath;
+	// try to parse the path and fill in the logID
+	err := binlog.CompressSaveBinlogPaths(req)
+	if err != nil {
+		log.Warn("fail to CompressSaveBinlogPaths", zap.String("channel", channelName), zap.Error(err))
+		return merr.Status(err), nil
+	}
 
 	// validate
 	segmentID := req.GetSegmentID()
@@ -493,7 +509,7 @@ func (s *Server) SaveBinlogPaths(ctx context.Context, req *datapb.SaveBinlogPath
 		operators = append(operators, UpdateStorageVersionOperator(segmentID, req.GetStorageVersion()))
 	}
 	// run all operator and update new segment info
-	err := s.meta.UpdateSegmentsInfo(operators...)
+	err = s.meta.UpdateSegmentsInfo(operators...)
 	if err != nil {
 		log.Error("save binlog and checkpoints failed", zap.Error(err))
 		return merr.Status(err), nil
diff --git a/internal/datacoord/services_test.go b/internal/datacoord/services_test.go
index 45a14f73c5..9b76bbb4fb 100644
--- a/internal/datacoord/services_test.go
+++ b/internal/datacoord/services_test.go
@@ -258,11 +258,11 @@ func (s *ServerSuite) TestSaveBinlogPath_L0Segment() {
 				FieldID: 1,
 				Binlogs: []*datapb.Binlog{
 					{
-						LogPath:    "/by-dev/test/0/1/1/1/Allo1",
+						LogPath:    "/by-dev/test/0/1/1/1/1",
 						EntriesNum: 5,
 					},
 					{
-						LogPath:    "/by-dev/test/0/1/1/1/Allo2",
+						LogPath:    "/by-dev/test/0/1/1/1/2",
 						EntriesNum: 5,
 					},
 				},
@@ -323,11 +323,11 @@ func (s *ServerSuite) TestSaveBinlogPath_NormalCase() {
 				FieldID: 1,
 				Binlogs: []*datapb.Binlog{
 					{
-						LogPath:    "/by-dev/test/0/1/1/1/Allo1",
+						LogPath:    "/by-dev/test/0/1/1/1/1",
 						EntriesNum: 5,
 					},
 					{
-						LogPath:    "/by-dev/test/0/1/1/1/Allo2",
+						LogPath:    "/by-dev/test/0/1/1/1/2",
 						EntriesNum: 5,
 					},
 				},
@@ -338,11 +338,11 @@ func (s *ServerSuite) TestSaveBinlogPath_NormalCase() {
 				FieldID: 1,
 				Binlogs: []*datapb.Binlog{
 					{
-						LogPath:    "/by-dev/test_stats/0/1/1/1/Allo1",
+						LogPath:    "/by-dev/test_stats/0/1/1/1/1",
 						EntriesNum: 5,
 					},
 					{
-						LogPath:    "/by-dev/test_stats/0/1/1/1/Allo2",
+						LogPath:    "/by-dev/test_stats/0/1/1/1/2",
 						EntriesNum: 5,
 					},
 				},
@@ -373,8 +373,10 @@ func (s *ServerSuite) TestSaveBinlogPath_NormalCase() {
 	s.NotNil(fieldBinlogs)
 	s.EqualValues(2, len(fieldBinlogs.GetBinlogs()))
 	s.EqualValues(1, fieldBinlogs.GetFieldID())
-	s.EqualValues("/by-dev/test/0/1/1/1/Allo1", fieldBinlogs.GetBinlogs()[0].GetLogPath())
-	s.EqualValues("/by-dev/test/0/1/1/1/Allo2", fieldBinlogs.GetBinlogs()[1].GetLogPath())
+	s.EqualValues("", fieldBinlogs.GetBinlogs()[0].GetLogPath())
+	s.EqualValues(int64(1), fieldBinlogs.GetBinlogs()[0].GetLogID())
+	s.EqualValues("", fieldBinlogs.GetBinlogs()[1].GetLogPath())
+	s.EqualValues(int64(2), fieldBinlogs.GetBinlogs()[1].GetLogID())
 
 	s.EqualValues(segment.DmlPosition.ChannelName, "ch1")
 	s.EqualValues(segment.DmlPosition.MsgID, []byte{1, 2, 3})
diff --git a/internal/datacoord/session_manager.go b/internal/datacoord/session_manager.go
index 34a80fe7e6..3400c84602 100644
--- a/internal/datacoord/session_manager.go
+++ b/internal/datacoord/session_manager.go
@@ -28,6 +28,7 @@ import (
 	"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
 	"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
 	grpcdatanodeclient "github.com/milvus-io/milvus/internal/distributed/datanode/client"
+	"github.com/milvus-io/milvus/internal/metastore/kv/binlog"
 	"github.com/milvus-io/milvus/internal/proto/datapb"
 	"github.com/milvus-io/milvus/internal/types"
 	"github.com/milvus-io/milvus/pkg/log"
@@ -286,6 +287,12 @@ func (c *SessionManagerImpl) GetCompactionPlansResults() map[int64]*datapb.Compa
 			),
 		})
 
+		// for compatibility: before 2.3.4, resp carries only the logPath;
+		// try to parse the path and fill in the logID
+		for _, result := range resp.Results {
+			binlog.CompressCompactionBinlogs(result.GetSegments())
+		}
+
 		if err := merr.CheckRPCCall(resp, err); err != nil {
 			log.Info("Get State failed", zap.Error(err))
 			return
diff --git a/internal/datanode/broker/datacoord.go b/internal/datanode/broker/datacoord.go
index 6081ee8285..e3e57cd839 100644
--- a/internal/datanode/broker/datacoord.go
+++ b/internal/datanode/broker/datacoord.go
@@ -8,6 +8,7 @@ import (
 
 	"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
 	"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
+	"github.com/milvus-io/milvus/internal/metastore/kv/binlog"
 	"github.com/milvus-io/milvus/internal/proto/datapb"
 	"github.com/milvus-io/milvus/internal/types"
 	"github.com/milvus-io/milvus/pkg/log"
@@ -77,6 +78,11 @@ func (dc *dataCoordBroker) GetSegmentInfo(ctx context.Context, segmentIDs []int6
 		log.Warn("Fail to get SegmentInfo by ids from datacoord", zap.Error(err))
 		return nil, err
 	}
+	err = binlog.DecompressMultiBinLogs(infoResp.GetInfos())
+	if err != nil {
+		log.Warn("Fail to DecompressMultiBinLogs", zap.Error(err))
+		return nil, err
+	}
 
 	return infoResp.Infos, nil
 }
diff --git a/internal/datanode/compactor.go b/internal/datanode/compactor.go
index fb6a259444..63a7d3eb2f 100644
--- a/internal/datanode/compactor.go
+++ b/internal/datanode/compactor.go
@@ -29,6 +29,7 @@ import (
 	"github.com/milvus-io/milvus/internal/datanode/allocator"
 	"github.com/milvus-io/milvus/internal/datanode/metacache"
 	"github.com/milvus-io/milvus/internal/datanode/syncmgr"
+	"github.com/milvus-io/milvus/internal/metastore/kv/binlog"
 	"github.com/milvus-io/milvus/internal/proto/datapb"
 	"github.com/milvus-io/milvus/internal/proto/etcdpb"
 	"github.com/milvus-io/milvus/internal/storage"
@@ -457,6 +458,11 @@ func (t *compactionTask) compact() (*datapb.CompactionPlanResult, error) {
 	}
 	log.Info("compact start", zap.Int32("timeout in seconds", t.plan.GetTimeoutInSeconds()))
+	err = binlog.DecompressCompactionBinlogs(t.plan.GetSegmentBinlogs())
+	if err != nil {
+		log.Warn("DecompressCompactionBinlogs fails", zap.Error(err))
+		return nil, err
+	}
 	segIDs := make([]UniqueID, 0, len(t.plan.GetSegmentBinlogs()))
 	for _, s := range t.plan.GetSegmentBinlogs() {
 		segIDs = append(segIDs, s.GetSegmentID())
diff --git a/internal/datanode/data_sync_service.go b/internal/datanode/data_sync_service.go
index 590d554011..968ddb877a 100644
--- a/internal/datanode/data_sync_service.go
+++ b/internal/datanode/data_sync_service.go
@@ -138,8 +138,6 @@ func getMetaCacheWithEtcdTickler(initCtx context.Context, node *DataNode, info *
 }
 
 func initMetaCache(initCtx context.Context, storageV2Cache *metacache.StorageV2Cache, chunkManager storage.ChunkManager, info *datapb.ChannelWatchInfo, tickler interface{ inc() }, unflushed, flushed []*datapb.SegmentInfo) (metacache.MetaCache, error) {
-	recoverTs := info.GetVchan().GetSeekPosition().GetTimestamp()
-
 	// tickler will update addSegment progress to watchInfo
 	futures := make([]*conc.Future[any], 0, len(unflushed)+len(flushed))
 	segmentPks := typeutil.NewConcurrentMap[int64, []*storage.PkStatistics]()
@@ -160,7 +158,7 @@ func initMetaCache(initCtx context.Context, storageV2Cache *metacache.StorageV2C
 			if params.Params.CommonCfg.EnableStorageV2.GetAsBool() {
 				stats, err = loadStatsV2(storageV2Cache, segment, info.GetSchema())
 			} else {
-				stats, err = loadStats(initCtx, chunkManager, info.GetSchema(), segment.GetID(), segment.GetCollectionID(), segment.GetStatslogs(), recoverTs)
+				stats, err = loadStats(initCtx, chunkManager, info.GetSchema(), segment.GetID(), segment.GetStatslogs())
 			}
 			if err != nil {
 				return nil, err
@@ -246,7 +244,7 @@ func loadStatsV2(storageCache *metacache.StorageV2Cache, segment *datapb.Segment
 	return getResult(stats), nil
 }
 
-func loadStats(ctx context.Context, chunkManager storage.ChunkManager, schema *schemapb.CollectionSchema, segmentID int64, collectionID int64, statsBinlogs []*datapb.FieldBinlog, ts Timestamp) ([]*storage.PkStatistics, error) {
+func loadStats(ctx context.Context, chunkManager storage.ChunkManager, schema *schemapb.CollectionSchema, segmentID int64, statsBinlogs []*datapb.FieldBinlog) ([]*storage.PkStatistics, error) {
 	startTs := time.Now()
 	log := log.With(zap.Int64("segmentID", segmentID))
 	log.Info("begin to init pk bloom filter", zap.Int("statsBinLogsLen", len(statsBinlogs)))
diff --git a/internal/datanode/services.go b/internal/datanode/services.go
index 7ccfaf614f..28de7669d9 100644
--- a/internal/datanode/services.go
+++ b/internal/datanode/services.go
@@ -36,6 +36,7 @@ import (
 	"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
 	"github.com/milvus-io/milvus/internal/datanode/io"
 	"github.com/milvus-io/milvus/internal/datanode/metacache"
+	"github.com/milvus-io/milvus/internal/metastore/kv/binlog"
 	"github.com/milvus-io/milvus/internal/proto/datapb"
 	"github.com/milvus-io/milvus/internal/proto/etcdpb"
 	"github.com/milvus-io/milvus/internal/proto/internalpb"
@@ -347,8 +348,12 @@ func (node *DataNode) SyncSegments(ctx context.Context, req *datapb.SyncSegments
 		log.Warn("failed to sync segments", zap.Error(err))
 		return merr.Status(err), nil
 	}
-
-	pks, err := loadStats(ctx, node.chunkManager, ds.metacache.Schema(), req.GetCompactedTo(), req.GetCollectionId(), req.GetStatsLogs(), 0)
+	err := binlog.DecompressBinLog(storage.StatsBinlog, req.GetCollectionId(), req.GetPartitionId(), req.GetCompactedTo(), req.GetStatsLogs())
+	if err != nil {
+		log.Warn("failed to DecompressBinLog", zap.Error(err))
+		return merr.Status(err), nil
+	}
+	pks, err := loadStats(ctx, node.chunkManager, ds.metacache.Schema(), req.GetCompactedTo(), req.GetStatsLogs())
 	if err != nil {
 		log.Warn("failed to load segment statslog", zap.Error(err))
 		return merr.Status(err), nil
@@ -581,7 +586,15 @@ func (node *DataNode) AddImportSegment(ctx context.Context, req *datapb.AddImpor
 	// Add the new segment to the channel.
 	if len(ds.metacache.GetSegmentIDsBy(metacache.WithSegmentIDs(req.GetSegmentId()), metacache.WithSegmentState(commonpb.SegmentState_Flushed))) == 0 {
 		log.Info("adding a new segment to channel", logFields...)
-		pks, err := loadStats(ctx, node.chunkManager, ds.metacache.Schema(), req.GetSegmentId(), req.GetCollectionId(), req.GetStatsLog(), req.GetBase().GetTimestamp())
+		// no error will be thrown
+		err := binlog.DecompressBinLog(storage.StatsBinlog, req.GetCollectionId(), req.GetPartitionId(), req.GetSegmentId(), req.GetStatsLog())
+		if err != nil {
+			log.Warn("failed to DecompressBinLog", zap.Error(err))
+			return &datapb.AddImportSegmentResponse{
+				Status: merr.Status(err),
+			}, nil
+		}
+		pks, err := loadStats(ctx, node.chunkManager, ds.metacache.Schema(), req.GetSegmentId(), req.GetStatsLog())
 		if err != nil {
 			log.Warn("failed to get segment pk stats", zap.Error(err))
 			return &datapb.AddImportSegmentResponse{
diff --git a/internal/indexnode/task.go b/internal/indexnode/task.go
index a29a35eff7..fd5ebf4aca 100644
--- a/internal/indexnode/task.go
+++ b/internal/indexnode/task.go
@@ -41,6 +41,7 @@ import (
 	"github.com/milvus-io/milvus/pkg/util/indexparamcheck"
 	"github.com/milvus-io/milvus/pkg/util/indexparams"
 	"github.com/milvus-io/milvus/pkg/util/merr"
+	"github.com/milvus-io/milvus/pkg/util/metautil"
 	"github.com/milvus-io/milvus/pkg/util/paramtable"
 	"github.com/milvus-io/milvus/pkg/util/timerecord"
 )
@@ -318,6 +319,12 @@ func (it *indexBuildTask) Prepare(ctx context.Context) error {
 	typeParams := make(map[string]string)
 	indexParams := make(map[string]string)
 
+	if len(it.req.DataPaths) == 0 {
+		for _, id := range it.req.GetDataIds() {
+			path := metautil.BuildInsertLogPath(it.req.GetStorageConfig().RootPath, it.req.GetCollectionID(), it.req.GetPartitionID(), it.req.GetSegmentID(), it.req.GetFieldID(), id)
+			it.req.DataPaths = append(it.req.DataPaths, path)
+		}
+	}
 	// type params can be removed
 	for _, kvPair := range it.req.GetTypeParams() {
 		key, value := kvPair.GetKey(), kvPair.GetValue()
diff --git a/internal/metastore/kv/binlog/binlog.go b/internal/metastore/kv/binlog/binlog.go
new file mode 100644
index 0000000000..a61beec438
--- /dev/null
+++ b/internal/metastore/kv/binlog/binlog.go
@@ -0,0 +1,189 @@
+// Licensed to the LF AI & Data foundation under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
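+
+// Package binlog compresses the binlog paths kept in segment metadata down to
+// log IDs, and rebuilds the full storage paths from the collection, partition,
+// segment, and field IDs plus the log ID when a consumer needs them.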
+
+package binlog
+
+import (
+	"fmt"
+	"strconv"
+	"strings"
+
+	"github.com/milvus-io/milvus/internal/proto/datapb"
+	"github.com/milvus-io/milvus/internal/storage"
+	"github.com/milvus-io/milvus/pkg/util/merr"
+	"github.com/milvus-io/milvus/pkg/util/metautil"
+	"github.com/milvus-io/milvus/pkg/util/paramtable"
+	"github.com/milvus-io/milvus/pkg/util/typeutil"
+)
+
+func CompressSaveBinlogPaths(req *datapb.SaveBinlogPathsRequest) error {
+	err := CompressFieldBinlogs(req.GetDeltalogs())
+	if err != nil {
+		return err
+	}
+	err = CompressFieldBinlogs(req.GetField2BinlogPaths())
+	if err != nil {
+		return err
+	}
+	err = CompressFieldBinlogs(req.GetField2StatslogPaths())
+	if err != nil {
+		return err
+	}
+	return nil
+}
+
+func CompressCompactionBinlogs(binlogs []*datapb.CompactionSegment) error {
+	for _, binlog := range binlogs {
+		err := CompressFieldBinlogs(binlog.GetInsertLogs())
+		if err != nil {
+			return err
+		}
+		err = CompressFieldBinlogs(binlog.GetDeltalogs())
+		if err != nil {
+			return err
+		}
+		err = CompressFieldBinlogs(binlog.GetField2StatslogPaths())
+		if err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+func CompressBinLogs(s *datapb.SegmentInfo) error {
+	err := CompressFieldBinlogs(s.GetBinlogs())
+	if err != nil {
+		return err
+	}
+	err = CompressFieldBinlogs(s.GetDeltalogs())
+	if err != nil {
+		return err
+	}
+	err = CompressFieldBinlogs(s.GetStatslogs())
+	if err != nil {
+		return err
+	}
+	return nil
+}
+
+func CompressFieldBinlogs(fieldBinlogs []*datapb.FieldBinlog) error {
+	for _, fieldBinlog := range fieldBinlogs {
+		for _, binlog := range fieldBinlog.Binlogs {
+			logPath := binlog.GetLogPath()
+			if len(logPath) != 0 {
+				var logID int64
+				idx := strings.LastIndex(logPath, "/")
+				if idx == -1 {
+					return merr.WrapErrParameterInvalidMsg(fmt.Sprintf("invalid binlog path: %s", logPath))
+				}
+				var err error
+				logPathStr := logPath[(idx + 1):]
+				logID, err = strconv.ParseInt(logPathStr, 10, 64)
+				if err != nil {
+					return err
+				}
+				binlog.LogID = logID
+				binlog.LogPath = ""
+			}
+			// remove timestamp since it's not necessary
+			binlog.TimestampFrom = 0
+			binlog.TimestampTo = 0
+		}
+	}
+	return nil
+}
+
+func DecompressMultiBinLogs(infos []*datapb.SegmentInfo) error {
+	for _, info := range infos {
+		err := DecompressBinLogs(info)
+		if err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+func DecompressCompactionBinlogs(binlogs []*datapb.CompactionSegmentBinlogs) error {
+	for _, binlog := range binlogs {
+		collectionID, partitionID, segmentID := binlog.GetCollectionID(), binlog.GetPartitionID(), binlog.GetSegmentID()
+		err := DecompressBinLog(storage.InsertBinlog, collectionID, partitionID, segmentID, binlog.GetFieldBinlogs())
+		if err != nil {
+			return err
+		}
+		err = DecompressBinLog(storage.DeleteBinlog, collectionID, partitionID, segmentID, binlog.GetDeltalogs())
+		if err != nil {
+			return err
+		}
+		err = DecompressBinLog(storage.StatsBinlog, collectionID, partitionID, segmentID, binlog.GetField2StatslogPaths())
+		if err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+func DecompressBinLogs(s *datapb.SegmentInfo) error {
+	collectionID, partitionID, segmentID := s.GetCollectionID(), s.GetPartitionID(), s.ID
+	err := DecompressBinLog(storage.InsertBinlog, collectionID, partitionID, segmentID, s.GetBinlogs())
+	if err != nil {
+		return err
+	}
+	err = DecompressBinLog(storage.DeleteBinlog, collectionID, partitionID, segmentID, s.GetDeltalogs())
+	if err != nil {
+		return err
+	}
+	err = DecompressBinLog(storage.StatsBinlog, collectionID, partitionID, segmentID, s.GetStatslogs())
+	if err != nil {
+		return err
+	}
+	return nil
+}
+
+func DecompressBinLog(binlogType storage.BinlogType, collectionID, partitionID,
+	segmentID typeutil.UniqueID, fieldBinlogs []*datapb.FieldBinlog,
+) error {
+	for _, fieldBinlog := range fieldBinlogs {
+		for _, binlog := range fieldBinlog.Binlogs {
+			if binlog.GetLogPath() == "" {
+				path, err := buildLogPath(binlogType, collectionID, partitionID,
+					segmentID, fieldBinlog.GetFieldID(), binlog.GetLogID())
+				if err != nil {
+					return err
+				}
+				binlog.LogPath = path
+			}
+		}
+	}
+	return nil
+}
+
+// buildLogPath builds the binlog path in storage from segment metadata
+func buildLogPath(binlogType storage.BinlogType, collectionID, partitionID, segmentID, fieldID, logID typeutil.UniqueID) (string, error) {
+	chunkManagerRootPath := paramtable.Get().MinioCfg.RootPath.GetValue()
+	if paramtable.Get().CommonCfg.StorageType.GetValue() == "local" {
+		chunkManagerRootPath = paramtable.Get().LocalStorageCfg.Path.GetValue()
+	}
+	switch binlogType {
+	case storage.InsertBinlog:
+		return metautil.BuildInsertLogPath(chunkManagerRootPath, collectionID, partitionID, segmentID, fieldID, logID), nil
+	case storage.DeleteBinlog:
+		return metautil.BuildDeltaLogPath(chunkManagerRootPath, collectionID, partitionID, segmentID, logID), nil
+	case storage.StatsBinlog:
+		return metautil.BuildStatsLogPath(chunkManagerRootPath, collectionID, partitionID, segmentID, fieldID, logID), nil
+	}
+	// should not happen
+	return "", merr.WrapErrParameterInvalidMsg("invalid binlog type")
+}
diff --git a/internal/metastore/kv/binlog/binlog_test.go b/internal/metastore/kv/binlog/binlog_test.go
new file mode 100644
index 0000000000..ed4f6d5f13
--- /dev/null
+++ b/internal/metastore/kv/binlog/binlog_test.go
@@ -0,0 +1,290 @@
+// Licensed to the LF AI & Data foundation under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
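+
+// The tests below round-trip segment metadata through CompressBinLogs and
+// DecompressBinLogs, and exercise the error paths for malformed log paths
+// and unknown binlog types.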
+
+package binlog
+
+import (
+	"math/rand"
+	"testing"
+
+	"github.com/golang/protobuf/proto"
+	"github.com/stretchr/testify/assert"
+
+	"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
+	"github.com/milvus-io/milvus/internal/proto/datapb"
+	"github.com/milvus-io/milvus/internal/storage"
+	"github.com/milvus-io/milvus/pkg/util/merr"
+	"github.com/milvus-io/milvus/pkg/util/metautil"
+	"github.com/milvus-io/milvus/pkg/util/paramtable"
+)
+
+var (
+	logID        = int64(99)
+	collectionID = int64(2)
+	partitionID  = int64(1)
+	segmentID    = int64(1)
+	segmentID2   = int64(11)
+	fieldID      = int64(1)
+	rootPath     = "a"
+
+	binlogPath   = metautil.BuildInsertLogPath(rootPath, collectionID, partitionID, segmentID, fieldID, logID)
+	deltalogPath = metautil.BuildDeltaLogPath(rootPath, collectionID, partitionID, segmentID, logID)
+	statslogPath = metautil.BuildStatsLogPath(rootPath, collectionID, partitionID, segmentID, fieldID, logID)
+
+	binlogPath2   = metautil.BuildInsertLogPath(rootPath, collectionID, partitionID, segmentID2, fieldID, logID)
+	deltalogPath2 = metautil.BuildDeltaLogPath(rootPath, collectionID, partitionID, segmentID2, logID)
+	statslogPath2 = metautil.BuildStatsLogPath(rootPath, collectionID, partitionID, segmentID2, fieldID, logID)
+
+	invalidSegment = &datapb.SegmentInfo{
+		ID:           segmentID,
+		CollectionID: collectionID,
+		PartitionID:  partitionID,
+		NumOfRows:    100,
+		State:        commonpb.SegmentState_Flushed,
+		Binlogs: []*datapb.FieldBinlog{
+			{
+				FieldID: 1,
+				Binlogs: []*datapb.Binlog{
+					{
+						EntriesNum: 5,
+						LogPath:    "badpath",
+					},
+				},
+			},
+		},
+	}
+
+	binlogs = []*datapb.FieldBinlog{
+		{
+			FieldID: 1,
+			Binlogs: []*datapb.Binlog{
+				{
+					EntriesNum: 5,
+					LogPath:    binlogPath,
+				},
+			},
+		},
+	}
+
+	deltalogs = []*datapb.FieldBinlog{
+		{
+			FieldID: 1,
+			Binlogs: []*datapb.Binlog{
+				{
+					EntriesNum: 5,
+					LogPath:    deltalogPath,
+				},
+			},
+		},
+	}
+
+	statslogs = []*datapb.FieldBinlog{
+		{
+			FieldID: 1,
+			Binlogs: []*datapb.Binlog{
+				{
+					EntriesNum: 5,
+					LogPath:    statslogPath,
+				},
+			},
+		},
+	}
+
+	getlogs = func(logpath string) []*datapb.FieldBinlog {
+		return []*datapb.FieldBinlog{
+			{
+				FieldID: 1,
+				Binlogs: []*datapb.Binlog{
+					{
+						EntriesNum: 5,
+						LogPath:    logpath,
+					},
+				},
+			},
+		}
+	}
+
+	segment1 = &datapb.SegmentInfo{
+		ID:           segmentID,
+		CollectionID: collectionID,
+		PartitionID:  partitionID,
+		NumOfRows:    100,
+		State:        commonpb.SegmentState_Flushed,
+		Binlogs:      binlogs,
+		Deltalogs:    deltalogs,
+		Statslogs:    statslogs,
+	}
+
+	droppedSegment = &datapb.SegmentInfo{
+		ID:           segmentID2,
+		CollectionID: collectionID,
+		PartitionID:  partitionID,
+		NumOfRows:    100,
+		State:        commonpb.SegmentState_Dropped,
+		Binlogs:      getlogs(binlogPath2),
+		Deltalogs:    getlogs(deltalogPath2),
+		Statslogs:    getlogs(statslogPath2),
+	}
+)
+
+func getSegment(rootPath string, collectionID, partitionID, segmentID, fieldID int64, binlogNum int) *datapb.SegmentInfo {
+	binLogPaths := make([]*datapb.Binlog, binlogNum)
+	for i := 0; i < binlogNum; i++ {
+		binLogPaths[i] = &datapb.Binlog{
+			EntriesNum: 10000,
+			LogPath:    metautil.BuildInsertLogPath(rootPath, collectionID, partitionID, segmentID, fieldID, int64(i)),
+		}
+	}
+	binlogs = []*datapb.FieldBinlog{
+		{
+			FieldID: fieldID,
+			Binlogs: binLogPaths,
+		},
+	}
+
+	deltalogs = []*datapb.FieldBinlog{
+		{
+			FieldID: fieldID,
+			Binlogs: []*datapb.Binlog{
+				{
+					EntriesNum: 5,
+					LogPath:    metautil.BuildDeltaLogPath(rootPath, collectionID, partitionID, segmentID, int64(rand.Int())),
+				},
+			},
+		},
+	}
+
+	statslogs = []*datapb.FieldBinlog{
+		{
+			FieldID: 1,
+			Binlogs: []*datapb.Binlog{
{ + EntriesNum: 5, + LogPath: metautil.BuildStatsLogPath(rootPath, collectionID, partitionID, segmentID, fieldID, int64(rand.Int())), + }, + }, + }, + } + + return &datapb.SegmentInfo{ + ID: segmentID, + CollectionID: collectionID, + PartitionID: partitionID, + NumOfRows: 10000, + State: commonpb.SegmentState_Flushed, + Binlogs: binlogs, + Deltalogs: deltalogs, + Statslogs: statslogs, + } +} + +func TestBinlog_Compress(t *testing.T) { + paramtable.Init() + rootPath := paramtable.Get().MinioCfg.RootPath.GetValue() + segmentInfo := getSegment(rootPath, 0, 1, 2, 3, 10) + val, err := proto.Marshal(segmentInfo) + assert.NoError(t, err) + + compressedSegmentInfo := proto.Clone(segmentInfo).(*datapb.SegmentInfo) + err = CompressBinLogs(compressedSegmentInfo) + assert.NoError(t, err) + + valCompressed, err := proto.Marshal(compressedSegmentInfo) + assert.NoError(t, err) + + assert.True(t, len(valCompressed) < len(val)) + + // make sure the compact + unmarshaledSegmentInfo := &datapb.SegmentInfo{} + proto.Unmarshal(val, unmarshaledSegmentInfo) + + unmarshaledSegmentInfoCompressed := &datapb.SegmentInfo{} + proto.Unmarshal(valCompressed, unmarshaledSegmentInfoCompressed) + DecompressBinLogs(unmarshaledSegmentInfoCompressed) + + assert.Equal(t, len(unmarshaledSegmentInfo.GetBinlogs()), len(unmarshaledSegmentInfoCompressed.GetBinlogs())) + for i := 0; i < 10; i++ { + assert.Equal(t, unmarshaledSegmentInfo.GetBinlogs()[0].Binlogs[i].LogPath, unmarshaledSegmentInfoCompressed.GetBinlogs()[0].Binlogs[i].LogPath) + } + + // test compress erorr path + fakeBinlogs := make([]*datapb.Binlog, 1) + fakeBinlogs[0] = &datapb.Binlog{ + EntriesNum: 10000, + LogPath: "test", + } + fieldBinLogs := make([]*datapb.FieldBinlog, 1) + fieldBinLogs[0] = &datapb.FieldBinlog{ + FieldID: 106, + Binlogs: fakeBinlogs, + } + segmentInfo1 := &datapb.SegmentInfo{ + Binlogs: fieldBinLogs, + } + err = CompressBinLogs(segmentInfo1) + assert.ErrorIs(t, err, merr.ErrParameterInvalid) + + fakeDeltalogs := make([]*datapb.Binlog, 1) + fakeDeltalogs[0] = &datapb.Binlog{ + EntriesNum: 10000, + LogPath: "test", + } + fieldDeltaLogs := make([]*datapb.FieldBinlog, 1) + fieldDeltaLogs[0] = &datapb.FieldBinlog{ + FieldID: 106, + Binlogs: fakeBinlogs, + } + segmentInfo2 := &datapb.SegmentInfo{ + Deltalogs: fieldDeltaLogs, + } + err = CompressBinLogs(segmentInfo2) + assert.ErrorIs(t, err, merr.ErrParameterInvalid) + + fakeStatslogs := make([]*datapb.Binlog, 1) + fakeStatslogs[0] = &datapb.Binlog{ + EntriesNum: 10000, + LogPath: "test", + } + fieldStatsLogs := make([]*datapb.FieldBinlog, 1) + fieldStatsLogs[0] = &datapb.FieldBinlog{ + FieldID: 106, + Binlogs: fakeBinlogs, + } + segmentInfo3 := &datapb.SegmentInfo{ + Statslogs: fieldDeltaLogs, + } + err = CompressBinLogs(segmentInfo3) + assert.ErrorIs(t, err, merr.ErrParameterInvalid) + + // test decompress error invalid Type + // should not happen + fakeBinlogs = make([]*datapb.Binlog, 1) + fakeBinlogs[0] = &datapb.Binlog{ + EntriesNum: 10000, + LogPath: "", + LogID: 1, + } + fieldBinLogs = make([]*datapb.FieldBinlog, 1) + fieldBinLogs[0] = &datapb.FieldBinlog{ + FieldID: 106, + Binlogs: fakeBinlogs, + } + segmentInfo = &datapb.SegmentInfo{ + Binlogs: fieldBinLogs, + } + invaildType := storage.BinlogType(100) + err = DecompressBinLog(invaildType, 1, 1, 1, segmentInfo.Binlogs) + assert.ErrorIs(t, err, merr.ErrParameterInvalid) +} diff --git a/internal/metastore/kv/datacoord/kv_catalog.go b/internal/metastore/kv/datacoord/kv_catalog.go index e0f67c783b..6e263e7884 100644 --- 
+++ b/internal/metastore/kv/datacoord/kv_catalog.go
@@ -32,6 +32,7 @@ import (
 	"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
 	"github.com/milvus-io/milvus/internal/kv"
 	"github.com/milvus-io/milvus/internal/metastore"
+	"github.com/milvus-io/milvus/internal/metastore/kv/binlog"
 	"github.com/milvus-io/milvus/internal/metastore/model"
 	"github.com/milvus-io/milvus/internal/proto/datapb"
 	"github.com/milvus-io/milvus/internal/proto/indexpb"
@@ -95,7 +96,10 @@ func (kc *Catalog) ListSegments(ctx context.Context) ([]*datapb.SegmentInfo, err
 		return nil, err
 	}

-	kc.applyBinlogInfo(segments, insertLogs, deltaLogs, statsLogs)
+	err = kc.applyBinlogInfo(segments, insertLogs, deltaLogs, statsLogs)
+	if err != nil {
+		return nil, err
+	}
 	return segments, nil
 }

@@ -184,20 +188,12 @@ func (kc *Catalog) listBinlogs(binlogType storage.BinlogType) (map[typeutil.Uniq
 			return fmt.Errorf("failed to unmarshal datapb.FieldBinlog: %d, err:%w", fieldBinlog.FieldID, err)
 		}

-		collectionID, partitionID, segmentID, err := kc.parseBinlogKey(string(key), prefixIdx)
+		_, _, segmentID, err := kc.parseBinlogKey(string(key), prefixIdx)
 		if err != nil {
 			return fmt.Errorf("prefix:%s, %w", path.Join(kc.metaRootpath, logPathPrefix), err)
 		}

-		switch binlogType {
-		case storage.InsertBinlog:
-			fillLogPathByLogID(kc.ChunkManagerRootPath, storage.InsertBinlog, collectionID, partitionID, segmentID, fieldBinlog)
-		case storage.DeleteBinlog:
-			fillLogPathByLogID(kc.ChunkManagerRootPath, storage.DeleteBinlog, collectionID, partitionID, segmentID, fieldBinlog)
-		case storage.StatsBinlog:
-			fillLogPathByLogID(kc.ChunkManagerRootPath, storage.StatsBinlog, collectionID, partitionID, segmentID, fieldBinlog)
-		}
-
+		// no need to fill the log path; only the log ID is stored
 		ret[segmentID] = append(ret[segmentID], fieldBinlog)
 		return nil
 	}
@@ -211,20 +207,37 @@ func (kc *Catalog) listBinlogs(binlogType storage.BinlogType) (map[typeutil.Uniq
 func (kc *Catalog) applyBinlogInfo(segments []*datapb.SegmentInfo, insertLogs, deltaLogs,
 	statsLogs map[typeutil.UniqueID][]*datapb.FieldBinlog,
-) {
+) error {
+	var err error
 	for _, segmentInfo := range segments {
 		if len(segmentInfo.Binlogs) == 0 {
 			segmentInfo.Binlogs = insertLogs[segmentInfo.ID]
+		} else {
+			err = binlog.CompressFieldBinlogs(segmentInfo.Binlogs)
+			if err != nil {
+				return err
+			}
 		}

 		if len(segmentInfo.Deltalogs) == 0 {
 			segmentInfo.Deltalogs = deltaLogs[segmentInfo.ID]
+		} else {
+			err = binlog.CompressFieldBinlogs(segmentInfo.Deltalogs)
+			if err != nil {
+				return err
+			}
 		}

 		if len(segmentInfo.Statslogs) == 0 {
 			segmentInfo.Statslogs = statsLogs[segmentInfo.ID]
+		} else {
+			err = binlog.CompressFieldBinlogs(segmentInfo.Statslogs)
+			if err != nil {
+				return err
+			}
 		}
 	}
+	return nil
 }

 func (kc *Catalog) AddSegment(ctx context.Context, segment *datapb.SegmentInfo) error {
@@ -289,15 +302,12 @@ func (kc *Catalog) AlterSegments(ctx context.Context, segments []*datapb.Segment
 	for _, b := range binlogs {
 		segment := b.Segment
-		if err := ValidateSegment(segment); err != nil {
-			return err
-		}
-
 		binlogKvs, err := buildBinlogKvsWithLogID(segment.GetCollectionID(), segment.GetPartitionID(), segment.GetID(),
 			cloneLogs(segment.GetBinlogs()), cloneLogs(segment.GetDeltalogs()), cloneLogs(segment.GetStatslogs()))
 		if err != nil {
 			return err
 		}
+
 		maps.Copy(kvs, binlogKvs)
 	}

@@ -538,31 +548,9 @@ func (kc *Catalog) getBinlogsWithPrefix(binlogType storage.BinlogType, collectio
 	if err != nil {
 		return nil, nil, err
 	}
-
 	return keys, values, nil
 }

-// unmarshal binlog/deltalog/statslog
-func (kc *Catalog) unmarshalBinlog(binlogType storage.BinlogType, collectionID, partitionID, segmentID typeutil.UniqueID) ([]*datapb.FieldBinlog, error) {
-	_, values, err := kc.getBinlogsWithPrefix(binlogType, collectionID, partitionID, segmentID)
-	if err != nil {
-		return nil, err
-	}
-
-	result := make([]*datapb.FieldBinlog, len(values))
-	for i, value := range values {
-		fieldBinlog := &datapb.FieldBinlog{}
-		err = proto.Unmarshal([]byte(value), fieldBinlog)
-		if err != nil {
-			return nil, fmt.Errorf("failed to unmarshal datapb.FieldBinlog: %d, err:%w", fieldBinlog.FieldID, err)
-		}
-
-		fillLogPathByLogID(kc.ChunkManagerRootPath, binlogType, collectionID, partitionID, segmentID, fieldBinlog)
-		result[i] = fieldBinlog
-	}
-	return result, nil
-}
-
 func (kc *Catalog) CreateIndex(ctx context.Context, index *model.Index) error {
 	key := BuildIndexKey(index.CollectionID, index.IndexID)

@@ -718,13 +706,3 @@ func (kc *Catalog) GcConfirm(ctx context.Context, collectionID, partitionID type
 	}
 	return len(keys) == 0 && len(values) == 0
 }
-
-func fillLogPathByLogID(chunkManagerRootPath string, binlogType storage.BinlogType, collectionID, partitionID,
-	segmentID typeutil.UniqueID, fieldBinlog *datapb.FieldBinlog,
-) {
-	for _, binlog := range fieldBinlog.Binlogs {
-		path := buildLogPath(chunkManagerRootPath, binlogType, collectionID, partitionID,
-			segmentID, fieldBinlog.GetFieldID(), binlog.GetLogID())
-		binlog.LogPath = path
-	}
-}
diff --git a/internal/metastore/kv/datacoord/kv_catalog_test.go b/internal/metastore/kv/datacoord/kv_catalog_test.go
index 35bb11946c..cabebe6327 100644
--- a/internal/metastore/kv/datacoord/kv_catalog_test.go
+++ b/internal/metastore/kv/datacoord/kv_catalog_test.go
@@ -55,14 +55,6 @@ var (
 	fieldID      = int64(1)
 	rootPath     = "a"

-	binlogPath   = metautil.BuildInsertLogPath(rootPath, collectionID, partitionID, segmentID, fieldID, logID)
-	deltalogPath = metautil.BuildDeltaLogPath(rootPath, collectionID, partitionID, segmentID, logID)
-	statslogPath = metautil.BuildStatsLogPath(rootPath, collectionID, partitionID, segmentID, fieldID, logID)
-
-	binlogPath2   = metautil.BuildInsertLogPath(rootPath, collectionID, partitionID, segmentID2, fieldID, logID)
-	deltalogPath2 = metautil.BuildDeltaLogPath(rootPath, collectionID, partitionID, segmentID2, logID)
-	statslogPath2 = metautil.BuildStatsLogPath(rootPath, collectionID, partitionID, segmentID2, fieldID, logID)
-
 	k1 = buildFieldBinlogPath(collectionID, partitionID, segmentID, fieldID)
 	k2 = buildFieldDeltalogPath(collectionID, partitionID, segmentID, fieldID)
 	k3 = buildFieldStatslogPath(collectionID, partitionID, segmentID, fieldID)
@@ -108,7 +100,7 @@ var (
 			Binlogs: []*datapb.Binlog{
 				{
 					EntriesNum: 5,
-					LogPath:    binlogPath,
+					LogID:      logID,
 				},
 			},
 		},
@@ -120,7 +112,7 @@ var (
 			Binlogs: []*datapb.Binlog{
 				{
 					EntriesNum: 5,
-					LogPath:    deltalogPath,
+					LogID:      logID,
 				},
 			},
 		},
@@ -131,20 +123,20 @@ var (
 			Binlogs: []*datapb.Binlog{
 				{
 					EntriesNum: 5,
-					LogPath:    statslogPath,
+					LogID:      logID,
 				},
 			},
 		},
 	}

-	getlogs = func(logpath string) []*datapb.FieldBinlog {
+	getlogs = func(id int64) []*datapb.FieldBinlog {
 		return []*datapb.FieldBinlog{
 			{
 				FieldID: 1,
 				Binlogs: []*datapb.Binlog{
 					{
 						EntriesNum: 5,
-						LogPath:    logpath,
+						LogID:      id,
 					},
 				},
 			},
@@ -168,9 +160,9 @@ var (
 		PartitionID: partitionID,
 		NumOfRows:   100,
 		State:       commonpb.SegmentState_Dropped,
-		Binlogs:     getlogs(binlogPath2),
-		Deltalogs:   getlogs(deltalogPath2),
-		Statslogs:   getlogs(statslogPath2),
+		Binlogs:     getlogs(logID),
+		Deltalogs:   getlogs(logID),
+		Statslogs:   getlogs(logID),
 	}
 )

@@ -196,19 +188,22 @@ func Test_ListSegments(t *testing.T) {
 		assert.Equal(t, fieldID, segment.Binlogs[0].FieldID)
 		assert.Equal(t, 1, len(segment.Binlogs[0].Binlogs))
 		assert.Equal(t, logID, segment.Binlogs[0].Binlogs[0].LogID)
-		assert.Equal(t, binlogPath, segment.Binlogs[0].Binlogs[0].LogPath)
+		// the log path is left empty; only the log ID is stored
+		assert.Equal(t, "", segment.Binlogs[0].Binlogs[0].LogPath)

 		assert.Equal(t, 1, len(segment.Deltalogs))
 		assert.Equal(t, fieldID, segment.Deltalogs[0].FieldID)
 		assert.Equal(t, 1, len(segment.Deltalogs[0].Binlogs))
 		assert.Equal(t, logID, segment.Deltalogs[0].Binlogs[0].LogID)
-		assert.Equal(t, deltalogPath, segment.Deltalogs[0].Binlogs[0].LogPath)
+		// the log path is left empty; only the log ID is stored
+		assert.Equal(t, "", segment.Deltalogs[0].Binlogs[0].LogPath)

 		assert.Equal(t, 1, len(segment.Statslogs))
 		assert.Equal(t, fieldID, segment.Statslogs[0].FieldID)
 		assert.Equal(t, 1, len(segment.Statslogs[0].Binlogs))
 		assert.Equal(t, logID, segment.Statslogs[0].Binlogs[0].LogID)
-		assert.Equal(t, statslogPath, segment.Statslogs[0].Binlogs[0].LogPath)
+		// the log path is left empty; only the log ID is stored
+		assert.Equal(t, "", segment.Statslogs[0].Binlogs[0].LogPath)
 	}

 	t.Run("test compatibility", func(t *testing.T) {
@@ -228,7 +223,7 @@ func Test_ListSegments(t *testing.T) {
 		assert.NotNil(t, ret)
 		assert.NoError(t, err)

-		verifySegments(t, int64(0), ret)
+		verifySegments(t, logID, ret)
 	})

 	t.Run("list successfully", func(t *testing.T) {
@@ -392,7 +387,7 @@ func Test_AlterSegments(t *testing.T) {
 			Binlogs: []*datapb.Binlog{
 				{
 					EntriesNum: 5,
-					LogPath:    binlogPath,
+					LogID:      logID,
 				},
 			},
 		})
@@ -1062,54 +1057,6 @@ func TestCatalog_DropSegmentIndex(t *testing.T) {
 	})
 }

-func TestCatalog_Compress(t *testing.T) {
-	segmentInfo := getSegment(rootPath, 0, 1, 2, 3, 10000)
-	val, err := proto.Marshal(segmentInfo)
-	assert.NoError(t, err)
-
-	compressedSegmentInfo := proto.Clone(segmentInfo).(*datapb.SegmentInfo)
-	compressedSegmentInfo.Binlogs, err = CompressBinLog(compressedSegmentInfo.Binlogs)
-	assert.NoError(t, err)
-	compressedSegmentInfo.Deltalogs, err = CompressBinLog(compressedSegmentInfo.Deltalogs)
-	assert.NoError(t, err)
-	compressedSegmentInfo.Statslogs, err = CompressBinLog(compressedSegmentInfo.Statslogs)
-	assert.NoError(t, err)
-
-	valCompressed, err := proto.Marshal(compressedSegmentInfo)
-	assert.NoError(t, err)
-
-	assert.True(t, len(valCompressed) < len(val))
-
-	// make sure the compact
-	unmarshaledSegmentInfo := &datapb.SegmentInfo{}
-	proto.Unmarshal(val, unmarshaledSegmentInfo)
-
-	unmarshaledSegmentInfoCompressed := &datapb.SegmentInfo{}
-	proto.Unmarshal(valCompressed, unmarshaledSegmentInfoCompressed)
-	DecompressBinLog(rootPath, unmarshaledSegmentInfoCompressed)
-
-	assert.Equal(t, len(unmarshaledSegmentInfo.GetBinlogs()), len(unmarshaledSegmentInfoCompressed.GetBinlogs()))
-	for i := 0; i < 1000; i++ {
-		assert.Equal(t, unmarshaledSegmentInfo.GetBinlogs()[0].Binlogs[i].LogPath, unmarshaledSegmentInfoCompressed.GetBinlogs()[0].Binlogs[i].LogPath)
-	}
-
-	// test compress erorr path
-	fakeBinlogs := make([]*datapb.Binlog, 1)
-	fakeBinlogs[0] = &datapb.Binlog{
-		EntriesNum: 10000,
-		LogPath:    "test",
-	}
-	fieldBinLogs := make([]*datapb.FieldBinlog, 1)
-	fieldBinLogs[0] = &datapb.FieldBinlog{
-		FieldID: 106,
-		Binlogs: fakeBinlogs,
-	}
-	compressedSegmentInfo.Binlogs, err = CompressBinLog(fieldBinLogs)
-	assert.Error(t, err)
-
-	// test decompress error path
-}
-
 func BenchmarkCatalog_List1000Segments(b *testing.B) {
 	paramtable.Init()
 	etcdCli, err := etcd.GetEtcdClient(
diff --git a/internal/metastore/kv/datacoord/util.go b/internal/metastore/kv/datacoord/util.go
index fa290fa812..4ace6aa1cd 100644
--- a/internal/metastore/kv/datacoord/util.go
+++ b/internal/metastore/kv/datacoord/util.go
@@ -18,9 +18,6 @@ package datacoord

 import (
 	"fmt"
-	"path"
-	"strconv"
-	"strings"

 	"github.com/golang/protobuf/proto"
 	"go.uber.org/zap"
@@ -30,71 +27,14 @@ import (
 	"github.com/milvus-io/milvus/internal/util/segmentutil"
 	"github.com/milvus-io/milvus/pkg/log"
 	"github.com/milvus-io/milvus/pkg/util"
-	"github.com/milvus-io/milvus/pkg/util/metautil"
 	"github.com/milvus-io/milvus/pkg/util/typeutil"
 )

-func CompressBinLog(fieldBinLogs []*datapb.FieldBinlog) ([]*datapb.FieldBinlog, error) {
-	compressedFieldBinLogs := make([]*datapb.FieldBinlog, 0)
-	for _, fieldBinLog := range fieldBinLogs {
-		compressedFieldBinLog := &datapb.FieldBinlog{}
-		compressedFieldBinLog.FieldID = fieldBinLog.FieldID
-		for _, binlog := range fieldBinLog.Binlogs {
-			logPath := binlog.LogPath
-			idx := strings.LastIndex(logPath, "/")
-			if idx == -1 {
-				return nil, fmt.Errorf("invailed binlog path: %s", logPath)
-			}
-			logPathStr := logPath[(idx + 1):]
-			logID, err := strconv.ParseInt(logPathStr, 10, 64)
-			if err != nil {
-				return nil, err
-			}
-			binlog := &datapb.Binlog{
-				EntriesNum: binlog.EntriesNum,
-				// remove timestamp since it's not necessary
-				LogSize: binlog.LogSize,
-				LogID:   logID,
-			}
-			compressedFieldBinLog.Binlogs = append(compressedFieldBinLog.Binlogs, binlog)
-		}
-		compressedFieldBinLogs = append(compressedFieldBinLogs, compressedFieldBinLog)
-	}
-	return compressedFieldBinLogs, nil
-}
-
-func DecompressBinLog(path string, info *datapb.SegmentInfo) error {
-	for _, fieldBinLogs := range info.GetBinlogs() {
-		fillLogPathByLogID(path, storage.InsertBinlog, info.CollectionID, info.PartitionID, info.ID, fieldBinLogs)
-	}
-
-	for _, deltaLogs := range info.GetDeltalogs() {
-		fillLogPathByLogID(path, storage.DeleteBinlog, info.CollectionID, info.PartitionID, info.ID, deltaLogs)
-	}
-
-	for _, statsLogs := range info.GetStatslogs() {
-		fillLogPathByLogID(path, storage.StatsBinlog, info.CollectionID, info.PartitionID, info.ID, statsLogs)
-	}
-	return nil
-}
-
 func ValidateSegment(segment *datapb.SegmentInfo) error {
 	log := log.With(
 		zap.Int64("collection", segment.GetCollectionID()),
 		zap.Int64("partition", segment.GetPartitionID()),
 		zap.Int64("segment", segment.GetID()))

-	err := checkBinlogs(storage.InsertBinlog, segment.GetID(), segment.GetBinlogs())
-	if err != nil {
-		return err
-	}
-	checkBinlogs(storage.DeleteBinlog, segment.GetID(), segment.GetDeltalogs())
-	if err != nil {
-		return err
-	}
-	checkBinlogs(storage.StatsBinlog, segment.GetID(), segment.GetStatslogs())
-	if err != nil {
-		return err
-	}
 	// check stats log and bin log size match

 	// check L0 Segment
@@ -142,47 +82,9 @@ func ValidateSegment(segment *datapb.SegmentInfo) error {
 	return nil
 }

-// build a binlog path on the storage by metadata
-func buildLogPath(chunkManagerRootPath string, binlogType storage.BinlogType, collectionID, partitionID, segmentID, filedID, logID typeutil.UniqueID) string {
-	switch binlogType {
-	case storage.InsertBinlog:
-		return metautil.BuildInsertLogPath(chunkManagerRootPath, collectionID, partitionID, segmentID, filedID, logID)
-	case storage.DeleteBinlog:
-		return metautil.BuildDeltaLogPath(chunkManagerRootPath, collectionID, partitionID, segmentID, logID)
-	case storage.StatsBinlog:
-		return metautil.BuildStatsLogPath(chunkManagerRootPath, collectionID, partitionID, segmentID, filedID, logID)
-	}
-	// should not happen
-	log.Panic("invalid binlog type", zap.Any("type", binlogType))
-	return ""
-}
-
-func checkBinlogs(binlogType storage.BinlogType, segmentID typeutil.UniqueID, logs []*datapb.FieldBinlog) error {
-	check := func(getSegmentID func(logPath string) typeutil.UniqueID) error {
-		for _, fieldBinlog := range logs {
-			for _, binlog := range fieldBinlog.Binlogs {
-				if segmentID != getSegmentID(binlog.LogPath) {
-					return fmt.Errorf("the segment path doesn't match the segmentID, segmentID %d, path %s", segmentID, binlog.LogPath)
-				}
-			}
-		}
-		return nil
-	}
-	switch binlogType {
-	case storage.InsertBinlog:
-		return check(metautil.GetSegmentIDFromInsertLogPath)
-	case storage.DeleteBinlog:
-		return check(metautil.GetSegmentIDFromDeltaLogPath)
-	case storage.StatsBinlog:
-		return check(metautil.GetSegmentIDFromStatsLogPath)
-	default:
-		return fmt.Errorf("the segment path doesn't match the segmentID, segmentID %d, type %d", segmentID, binlogType)
-	}
-}
-
 func hasSpecialStatslog(segment *datapb.SegmentInfo) bool {
 	for _, statslog := range segment.GetStatslogs()[0].GetBinlogs() {
-		_, logidx := path.Split(statslog.LogPath)
+		logidx := fmt.Sprint(statslog.LogID)
 		if logidx == storage.CompoundStatsType.LogIdx() {
 			return true
 		}
@@ -193,7 +95,7 @@ func hasSpecialStatslog(segment *datapb.SegmentInfo) bool {
 func buildBinlogKvsWithLogID(collectionID, partitionID, segmentID typeutil.UniqueID,
 	binlogs, deltalogs, statslogs []*datapb.FieldBinlog,
 ) (map[string]string, error) {
-	fillLogIDByLogPath(binlogs, deltalogs, statslogs)
+	// every FieldBinlog is expected to carry only log IDs at this point
 	kvs, err := buildBinlogKvs(collectionID, partitionID, segmentID, binlogs, deltalogs, statslogs)
 	if err != nil {
 		return nil, err
@@ -259,30 +161,6 @@ func cloneLogs(binlogs []*datapb.FieldBinlog) []*datapb.FieldBinlog {
 	return res
 }

-func fillLogIDByLogPath(multiFieldBinlogs ...[]*datapb.FieldBinlog) error {
-	for _, fieldBinlogs := range multiFieldBinlogs {
-		for _, fieldBinlog := range fieldBinlogs {
-			for _, binlog := range fieldBinlog.Binlogs {
-				logPath := binlog.LogPath
-				idx := strings.LastIndex(logPath, "/")
-				if idx == -1 {
-					return fmt.Errorf("invailed binlog path: %s", logPath)
-				}
-				logPathStr := logPath[(idx + 1):]
-				logID, err := strconv.ParseInt(logPathStr, 10, 64)
-				if err != nil {
-					return err
-				}
-
-				// set log path to empty and only store log id
-				binlog.LogPath = ""
-				binlog.LogID = logID
-			}
-		}
-	}
-	return nil
-}
-
 func buildBinlogKvs(collectionID, partitionID, segmentID typeutil.UniqueID, binlogs, deltalogs, statslogs []*datapb.FieldBinlog) (map[string]string, error) {
 	kv := make(map[string]string)
diff --git a/internal/proto/data_coord.proto b/internal/proto/data_coord.proto
index d78a942290..1567249448 100644
--- a/internal/proto/data_coord.proto
+++ b/internal/proto/data_coord.proto
@@ -509,6 +509,8 @@ message CompactionSegmentBinlogs {
   repeated FieldBinlog deltalogs = 4;
   string insert_channel = 5;
   SegmentLevel level = 6;
+  int64 collectionID = 7;
+  int64 partitionID = 8;
 }

 message CompactionPlan {
diff --git a/internal/proto/index_coord.proto b/internal/proto/index_coord.proto
index 2dce6870b4..b27250d2ee 100644
--- a/internal/proto/index_coord.proto
+++ b/internal/proto/index_coord.proto
@@ -265,6 +265,7 @@ message CreateJobRequest {
   int64 store_version = 20;
   string index_store_path = 21;
   int64 dim = 22;
+  repeated int64 data_ids = 23;
 }

 message QueryJobsRequest {
diff --git a/internal/querycoordv2/meta/coordinator_broker.go b/internal/querycoordv2/meta/coordinator_broker.go
index cd18da3e13..afef07449a 100644
--- a/internal/querycoordv2/meta/coordinator_broker.go
+++ b/internal/querycoordv2/meta/coordinator_broker.go
@@ -25,6 +25,7 @@ import (

 	"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
 	"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
+	"github.com/milvus-io/milvus/internal/metastore/kv/binlog"
 	"github.com/milvus-io/milvus/internal/proto/datapb"
 	"github.com/milvus-io/milvus/internal/proto/indexpb"
 	"github.com/milvus-io/milvus/internal/proto/querypb"
@@ -171,6 +172,12 @@ func (broker *CoordinatorBroker) GetSegmentInfo(ctx context.Context, ids ...Uniq
 		return nil, fmt.Errorf("no such segment in DataCoord")
 	}

+	err = binlog.DecompressMultiBinLogs(resp.GetInfos())
+	if err != nil {
+		log.Warn("failed to DecompressMultiBinLogs", zap.Error(err))
+		return nil, err
+	}
+
 	return resp, nil
 }

diff --git a/internal/querynodev2/segments/segment_loader.go b/internal/querynodev2/segments/segment_loader.go
index 60a043aee7..ff7c0bb1f2 100644
--- a/internal/querynodev2/segments/segment_loader.go
+++ b/internal/querynodev2/segments/segment_loader.go
@@ -1265,6 +1265,7 @@ func (loader *segmentLoader) patchEntryNumber(ctx context.Context, segment *Loca
 	counts := make([]int64, 0, len(rowIDField.GetBinlogs()))
 	for _, binlog := range rowIDField.GetBinlogs() {
+		// binlog.LogPath has already been filled
 		bs, err := loader.cm.Read(ctx, binlog.LogPath)
 		if err != nil {
 			return err
diff --git a/pkg/eventlog/mock_logger.go b/pkg/eventlog/mock_logger.go
index 566126521a..8d5c8c3306 100644
--- a/pkg/eventlog/mock_logger.go
+++ b/pkg/eventlog/mock_logger.go
@@ -130,8 +130,7 @@ func (_c *MockLogger_RecordFunc_Call) RunAndReturn(run func(Level, func() Evt))
 func NewMockLogger(t interface {
 	mock.TestingT
 	Cleanup(func())
-},
-) *MockLogger {
+}) *MockLogger {
 	mock := &MockLogger{}
 	mock.Mock.Test(t)

diff --git a/pkg/mq/msgdispatcher/mock_client.go b/pkg/mq/msgdispatcher/mock_client.go
index 4b99b5e8f4..f2ecaf4341 100644
--- a/pkg/mq/msgdispatcher/mock_client.go
+++ b/pkg/mq/msgdispatcher/mock_client.go
@@ -153,8 +153,7 @@ func (_c *MockClient_Register_Call) RunAndReturn(run func(context.Context, strin
 func NewMockClient(t interface {
 	mock.TestingT
 	Cleanup(func())
-},
-) *MockClient {
+}) *MockClient {
 	mock := &MockClient{}
 	mock.Mock.Test(t)

diff --git a/pkg/mq/msgstream/mock_msgstream.go b/pkg/mq/msgstream/mock_msgstream.go
index 18be8faa46..e97b0e30d9 100644
--- a/pkg/mq/msgstream/mock_msgstream.go
+++ b/pkg/mq/msgstream/mock_msgstream.go
@@ -526,8 +526,7 @@ func (_c *MockMsgStream_SetRepackFunc_Call) RunAndReturn(run func(RepackFunc)) *
 func NewMockMsgStream(t interface {
 	mock.TestingT
 	Cleanup(func())
-},
-) *MockMsgStream {
+}) *MockMsgStream {
 	mock := &MockMsgStream{}
 	mock.Mock.Test(t)

diff --git a/tests/integration/bulkinsert/bulkinsert_test.go b/tests/integration/bulkinsert/bulkinsert_test.go
index 4fd5e2be9a..c9622a796a 100644
--- a/tests/integration/bulkinsert/bulkinsert_test.go
+++ b/tests/integration/bulkinsert/bulkinsert_test.go
@@ -95,6 +95,9 @@ func (s *BulkInsertSuite) TestBulkInsert() {
 	s.Equal(showCollectionsResp.GetStatus().GetErrorCode(), commonpb.ErrorCode_Success)
 	log.Info("ShowCollections result", zap.Any("showCollectionsResp", showCollectionsResp))

+	err = os.MkdirAll(c.ChunkManager.RootPath(), os.ModePerm)
+	s.NoError(err)
+
 	err = GenerateNumpyFile(c.ChunkManager.RootPath()+"/"+"embeddings.npy", 100, schemapb.DataType_FloatVector, []*commonpb.KeyValuePair{
 		{
 			Key: common.DimKey,
diff --git a/tests/integration/minicluster_v2.go b/tests/integration/minicluster_v2.go
index 29a37faf61..b039550168 100644
--- a/tests/integration/minicluster_v2.go
+++ b/tests/integration/minicluster_v2.go
@@ -163,7 +163,7 @@ func StartMiniClusterV2(ctx context.Context, opts ...OptionV2) (*MiniClusterV2,
 	}

 	// setup servers
-	cluster.factory = dependency.NewDefaultFactory(true)
+	cluster.factory = dependency.MockDefaultFactory(true, params)
 	chunkManager, err := cluster.factory.NewPersistentStorageChunkManager(cluster.ctx)
 	if err != nil {
 		return nil, err
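
The patch above persists only numeric log IDs in the catalog and rebuilds full object-storage paths on load. A minimal standalone Go sketch of that round trip, assuming a simplified Binlog struct and insert_log path layout in place of the actual datapb.Binlog and metautil.BuildInsertLogPath definitions:

package main

import (
	"fmt"
	"path"
	"strconv"
	"strings"
)

// Binlog mirrors the two fields of datapb.Binlog that matter here (assumption:
// real Binlog also carries EntriesNum, LogSize, timestamps, etc.).
type Binlog struct {
	LogID   int64
	LogPath string
}

// compress drops the redundant path, keeping only the trailing log ID,
// analogous to what CompressBinLogs does before metadata is written to etcd.
func compress(b *Binlog) error {
	idx := strings.LastIndex(b.LogPath, "/")
	if idx == -1 {
		return fmt.Errorf("invalid binlog path: %s", b.LogPath)
	}
	id, err := strconv.ParseInt(b.LogPath[idx+1:], 10, 64)
	if err != nil {
		return err
	}
	b.LogID = id
	b.LogPath = "" // only the log ID is persisted
	return nil
}

// decompress rebuilds the full path from the stored IDs, the inverse of
// compress; the insert_log layout here is an illustrative assumption.
func decompress(b *Binlog, rootPath string, collectionID, partitionID, segmentID, fieldID int64) {
	b.LogPath = path.Join(rootPath, "insert_log",
		fmt.Sprint(collectionID), fmt.Sprint(partitionID),
		fmt.Sprint(segmentID), fmt.Sprint(fieldID), fmt.Sprint(b.LogID))
}

func main() {
	b := &Binlog{LogPath: "a/insert_log/2/1/1/1/99"}
	if err := compress(b); err != nil {
		panic(err)
	}
	fmt.Printf("compressed:   %+v\n", b) // only LogID=99 survives
	decompress(b, "a", 2, 1, 1, 1)
	fmt.Printf("decompressed: %+v\n", b) // path restored from metadata
}

The payoff is visible in TestBinlog_Compress above: the marshaled SegmentInfo shrinks because each repeated path prefix collapses to a single varint-encoded ID, while consumers that still need paths (e.g. the query coordinator broker) decompress on demand.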