From dffcd974fffae6ea4916f517a3af2323c34ee06a Mon Sep 17 00:00:00 2001 From: SimFG Date: Fri, 18 Nov 2022 15:35:09 +0800 Subject: [PATCH] Make sure that the segment path matches the segment id (#20677) Signed-off-by: SimFG Signed-off-by: SimFG --- internal/datacoord/compaction_test.go | 75 +++++++++++-------- internal/datacoord/garbage_collector_test.go | 4 +- internal/datacoord/meta_test.go | 16 ++-- internal/datacoord/server_test.go | 8 +- internal/indexcoord/index_coord_test.go | 20 ++++- internal/metastore/kv/datacoord/kv_catalog.go | 26 +++++++ .../metastore/kv/datacoord/kv_catalog_test.go | 35 +++++++-- internal/util/metautil/binlog.go | 29 +++++++ 8 files changed, 158 insertions(+), 55 deletions(-) diff --git a/internal/datacoord/compaction_test.go b/internal/datacoord/compaction_test.go index a30c56416a..99c8824cff 100644 --- a/internal/datacoord/compaction_test.go +++ b/internal/datacoord/compaction_test.go @@ -22,6 +22,9 @@ import ( "testing" "time" + "github.com/milvus-io/milvus/internal/util/metautil" + "github.com/milvus-io/milvus/internal/util/typeutil" + "github.com/milvus-io/milvus-proto/go-api/commonpb" memkv "github.com/milvus-io/milvus/internal/kv/mem" "github.com/milvus-io/milvus/internal/log" @@ -219,6 +222,18 @@ func Test_compactionPlanHandler_execWithParallels(t *testing.T) { assert.Less(t, uint64(2), max-min) } +func getInsertLogPath(rootPath string, segmentID typeutil.UniqueID) string { + return metautil.BuildInsertLogPath(rootPath, 10, 100, segmentID, 1000, 10000) +} + +func getStatsLogPath(rootPath string, segmentID typeutil.UniqueID) string { + return metautil.BuildStatsLogPath(rootPath, 10, 100, segmentID, 1000, 10000) +} + +func getDeltaLogPath(rootPath string, segmentID typeutil.UniqueID) string { + return metautil.BuildDeltaLogPath(rootPath, 10, 100, segmentID, 10000) +} + func TestCompactionPlanHandler_handleMergeCompactionResult(t *testing.T) { mockDataNode := &mocks.DataNode{} mockDataNode.EXPECT().SyncSegments(mock.Anything, mock.Anything).Run(func(ctx context.Context, req *datapb.SyncSegmentsRequest) {}).Return(&commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}, nil) @@ -227,16 +242,16 @@ func TestCompactionPlanHandler_handleMergeCompactionResult(t *testing.T) { seg1 := &datapb.SegmentInfo{ ID: 1, - Binlogs: []*datapb.FieldBinlog{getFieldBinlogPaths(101, "log1")}, - Statslogs: []*datapb.FieldBinlog{getFieldBinlogPaths(101, "log2")}, - Deltalogs: []*datapb.FieldBinlog{getFieldBinlogPaths(101, "log3")}, + Binlogs: []*datapb.FieldBinlog{getFieldBinlogPaths(101, getInsertLogPath("log1", 1))}, + Statslogs: []*datapb.FieldBinlog{getFieldBinlogPaths(101, getStatsLogPath("log2", 1))}, + Deltalogs: []*datapb.FieldBinlog{getFieldBinlogPaths(101, getDeltaLogPath("log3", 1))}, } seg2 := &datapb.SegmentInfo{ ID: 2, - Binlogs: []*datapb.FieldBinlog{getFieldBinlogPaths(101, "log4")}, - Statslogs: []*datapb.FieldBinlog{getFieldBinlogPaths(101, "log5")}, - Deltalogs: []*datapb.FieldBinlog{getFieldBinlogPaths(101, "log6")}, + Binlogs: []*datapb.FieldBinlog{getFieldBinlogPaths(101, getInsertLogPath("log4", 2))}, + Statslogs: []*datapb.FieldBinlog{getFieldBinlogPaths(101, getStatsLogPath("log5", 2))}, + Deltalogs: []*datapb.FieldBinlog{getFieldBinlogPaths(101, getDeltaLogPath("log6", 2))}, } plan := &datapb.CompactionPlan{ @@ -319,18 +334,18 @@ func TestCompactionPlanHandler_handleMergeCompactionResult(t *testing.T) { PlanID: 1, SegmentID: 3, NumOfRows: 15, - InsertLogs: []*datapb.FieldBinlog{getFieldBinlogPaths(101, "log301")}, - Field2StatslogPaths: []*datapb.FieldBinlog{getFieldBinlogPaths(101, "log302")}, - Deltalogs: []*datapb.FieldBinlog{getFieldBinlogPaths(101, "log303")}, + InsertLogs: []*datapb.FieldBinlog{getFieldBinlogPaths(101, getInsertLogPath("log301", 3))}, + Field2StatslogPaths: []*datapb.FieldBinlog{getFieldBinlogPaths(101, getStatsLogPath("log302", 3))}, + Deltalogs: []*datapb.FieldBinlog{getFieldBinlogPaths(101, getDeltaLogPath("log303", 3))}, } compactionResult2 := &datapb.CompactionResult{ PlanID: 1, SegmentID: 3, NumOfRows: 0, - InsertLogs: []*datapb.FieldBinlog{getFieldBinlogPaths(101, "log301")}, - Field2StatslogPaths: []*datapb.FieldBinlog{getFieldBinlogPaths(101, "log302")}, - Deltalogs: []*datapb.FieldBinlog{getFieldBinlogPaths(101, "log303")}, + InsertLogs: []*datapb.FieldBinlog{getFieldBinlogPaths(101, getInsertLogPath("log301", 3))}, + Field2StatslogPaths: []*datapb.FieldBinlog{getFieldBinlogPaths(101, getStatsLogPath("log302", 3))}, + Deltalogs: []*datapb.FieldBinlog{getFieldBinlogPaths(101, getDeltaLogPath("log303", 3))}, } has, err := c.meta.HasSegments([]UniqueID{1, 2}) @@ -385,16 +400,16 @@ func TestCompactionPlanHandler_completeCompaction(t *testing.T) { seg1 := &datapb.SegmentInfo{ ID: 1, - Binlogs: []*datapb.FieldBinlog{getFieldBinlogPaths(101, "log1")}, - Statslogs: []*datapb.FieldBinlog{getFieldBinlogPaths(101, "log2")}, - Deltalogs: []*datapb.FieldBinlog{getFieldBinlogPaths(101, "log3")}, + Binlogs: []*datapb.FieldBinlog{getFieldBinlogPaths(101, getInsertLogPath("log1", 1))}, + Statslogs: []*datapb.FieldBinlog{getFieldBinlogPaths(101, getStatsLogPath("log2", 1))}, + Deltalogs: []*datapb.FieldBinlog{getFieldBinlogPaths(101, getDeltaLogPath("log3", 1))}, } seg2 := &datapb.SegmentInfo{ ID: 2, - Binlogs: []*datapb.FieldBinlog{getFieldBinlogPaths(101, "log4")}, - Statslogs: []*datapb.FieldBinlog{getFieldBinlogPaths(101, "log5")}, - Deltalogs: []*datapb.FieldBinlog{getFieldBinlogPaths(101, "log6")}, + Binlogs: []*datapb.FieldBinlog{getFieldBinlogPaths(101, getInsertLogPath("log4", 2))}, + Statslogs: []*datapb.FieldBinlog{getFieldBinlogPaths(101, getStatsLogPath("log5", 2))}, + Deltalogs: []*datapb.FieldBinlog{getFieldBinlogPaths(101, getDeltaLogPath("log6", 2))}, } plan := &datapb.CompactionPlan{ @@ -448,9 +463,9 @@ func TestCompactionPlanHandler_completeCompaction(t *testing.T) { PlanID: 1, SegmentID: 3, NumOfRows: 15, - InsertLogs: []*datapb.FieldBinlog{getFieldBinlogPaths(101, "log301")}, - Field2StatslogPaths: []*datapb.FieldBinlog{getFieldBinlogPaths(101, "log302")}, - Deltalogs: []*datapb.FieldBinlog{getFieldBinlogPaths(101, "log303")}, + InsertLogs: []*datapb.FieldBinlog{getFieldBinlogPaths(101, getInsertLogPath("log301", 3))}, + Field2StatslogPaths: []*datapb.FieldBinlog{getFieldBinlogPaths(101, getStatsLogPath("log302", 3))}, + Deltalogs: []*datapb.FieldBinlog{getFieldBinlogPaths(101, getDeltaLogPath("log303", 3))}, } flushCh := make(chan UniqueID, 1) @@ -480,16 +495,16 @@ func TestCompactionPlanHandler_completeCompaction(t *testing.T) { seg1 := &datapb.SegmentInfo{ ID: 1, - Binlogs: []*datapb.FieldBinlog{getFieldBinlogPaths(101, "log1")}, - Statslogs: []*datapb.FieldBinlog{getFieldBinlogPaths(101, "log2")}, - Deltalogs: []*datapb.FieldBinlog{getFieldBinlogPaths(101, "log3")}, + Binlogs: []*datapb.FieldBinlog{getFieldBinlogPaths(101, getInsertLogPath("log1", 1))}, + Statslogs: []*datapb.FieldBinlog{getFieldBinlogPaths(101, getStatsLogPath("log2", 1))}, + Deltalogs: []*datapb.FieldBinlog{getFieldBinlogPaths(101, getDeltaLogPath("log3", 1))}, } seg2 := &datapb.SegmentInfo{ ID: 2, - Binlogs: []*datapb.FieldBinlog{getFieldBinlogPaths(101, "log4")}, - Statslogs: []*datapb.FieldBinlog{getFieldBinlogPaths(101, "log5")}, - Deltalogs: []*datapb.FieldBinlog{getFieldBinlogPaths(101, "log6")}, + Binlogs: []*datapb.FieldBinlog{getFieldBinlogPaths(101, getInsertLogPath("log4", 2))}, + Statslogs: []*datapb.FieldBinlog{getFieldBinlogPaths(101, getStatsLogPath("log5", 2))}, + Deltalogs: []*datapb.FieldBinlog{getFieldBinlogPaths(101, getDeltaLogPath("log6", 2))}, } plan := &datapb.CompactionPlan{ @@ -549,9 +564,9 @@ func TestCompactionPlanHandler_completeCompaction(t *testing.T) { PlanID: 1, SegmentID: 3, NumOfRows: 0, - InsertLogs: []*datapb.FieldBinlog{getFieldBinlogPaths(101, "log301")}, - Field2StatslogPaths: []*datapb.FieldBinlog{getFieldBinlogPaths(101, "log302")}, - Deltalogs: []*datapb.FieldBinlog{getFieldBinlogPaths(101, "log303")}, + InsertLogs: []*datapb.FieldBinlog{getFieldBinlogPaths(101, getInsertLogPath("log301", 3))}, + Field2StatslogPaths: []*datapb.FieldBinlog{getFieldBinlogPaths(101, getStatsLogPath("log302", 3))}, + Deltalogs: []*datapb.FieldBinlog{getFieldBinlogPaths(101, getDeltaLogPath("log303", 3))}, } flushCh := make(chan UniqueID, 1) diff --git a/internal/datacoord/garbage_collector_test.go b/internal/datacoord/garbage_collector_test.go index cff6574db8..90e38dc212 100644 --- a/internal/datacoord/garbage_collector_test.go +++ b/internal/datacoord/garbage_collector_test.go @@ -308,7 +308,7 @@ func initUtOSSEnv(bucket, root string, n int) (mcm *storage.MinioChunkManager, i if i == 1 { token = path.Join(strconv.Itoa(i), strconv.Itoa(i), "error-seg-id", funcutil.RandomString(8), funcutil.RandomString(8)) } else { - token = path.Join(strconv.Itoa(i), strconv.Itoa(i), strconv.Itoa(i), funcutil.RandomString(8), funcutil.RandomString(8)) + token = path.Join(strconv.Itoa(1+i), strconv.Itoa(10+i), strconv.Itoa(100+i), funcutil.RandomString(8), funcutil.RandomString(8)) } // insert filePath := path.Join(root, insertLogPrefix, token) @@ -329,7 +329,7 @@ func initUtOSSEnv(bucket, root string, n int) (mcm *storage.MinioChunkManager, i if i == 1 { token = path.Join(strconv.Itoa(i), strconv.Itoa(i), "error-seg-id", funcutil.RandomString(8)) } else { - token = path.Join(strconv.Itoa(i), strconv.Itoa(i), strconv.Itoa(i), funcutil.RandomString(8)) + token = path.Join(strconv.Itoa(1+i), strconv.Itoa(10+i), strconv.Itoa(100+i), funcutil.RandomString(8)) } filePath = path.Join(root, deltaLogPrefix, token) info, err = cli.PutObject(context.TODO(), bucket, filePath, reader, int64(len(content)), minio.PutObjectOptions{}) diff --git a/internal/datacoord/meta_test.go b/internal/datacoord/meta_test.go index e68ec607c2..4f4fd01651 100644 --- a/internal/datacoord/meta_test.go +++ b/internal/datacoord/meta_test.go @@ -526,14 +526,14 @@ func TestUpdateFlushSegmentsInfo(t *testing.T) { meta, err := newMeta(context.TODO(), memkv.NewMemoryKV(), "", nil) assert.Nil(t, err) - segment1 := &SegmentInfo{SegmentInfo: &datapb.SegmentInfo{ID: 1, State: commonpb.SegmentState_Growing, Binlogs: []*datapb.FieldBinlog{getFieldBinlogPaths(1, "binlog0")}, - Statslogs: []*datapb.FieldBinlog{getFieldBinlogPaths(1, "statslog0")}}} + segment1 := &SegmentInfo{SegmentInfo: &datapb.SegmentInfo{ID: 1, State: commonpb.SegmentState_Growing, Binlogs: []*datapb.FieldBinlog{getFieldBinlogPaths(1, getInsertLogPath("binlog0", 1))}, + Statslogs: []*datapb.FieldBinlog{getFieldBinlogPaths(1, getStatsLogPath("statslog0", 1))}}} err = meta.AddSegment(segment1) assert.Nil(t, err) - err = meta.UpdateFlushSegmentsInfo(1, true, false, true, []*datapb.FieldBinlog{getFieldBinlogPaths(1, "binlog1")}, - []*datapb.FieldBinlog{getFieldBinlogPaths(1, "statslog1")}, - []*datapb.FieldBinlog{{Binlogs: []*datapb.Binlog{{EntriesNum: 1, TimestampFrom: 100, TimestampTo: 200, LogSize: 1000}}}}, + err = meta.UpdateFlushSegmentsInfo(1, true, false, true, []*datapb.FieldBinlog{getFieldBinlogPaths(1, getInsertLogPath("binlog1", 1))}, + []*datapb.FieldBinlog{getFieldBinlogPaths(1, getStatsLogPath("statslog1", 1))}, + []*datapb.FieldBinlog{{Binlogs: []*datapb.Binlog{{EntriesNum: 1, TimestampFrom: 100, TimestampTo: 200, LogSize: 1000, LogPath: getDeltaLogPath("deltalog1", 1)}}}}, []*datapb.CheckPoint{{SegmentID: 1, NumOfRows: 10}}, []*datapb.SegmentStartPosition{{SegmentID: 1, StartPosition: &internalpb.MsgPosition{MsgID: []byte{1, 2, 3}}}}) assert.Nil(t, err) @@ -598,9 +598,9 @@ func TestUpdateFlushSegmentsInfo(t *testing.T) { } meta.segments.SetSegment(1, segmentInfo) - err = meta.UpdateFlushSegmentsInfo(1, true, false, false, []*datapb.FieldBinlog{getFieldBinlogPaths(1, "binlog")}, - []*datapb.FieldBinlog{getFieldBinlogPaths(1, "statslog")}, - []*datapb.FieldBinlog{{Binlogs: []*datapb.Binlog{{EntriesNum: 1, TimestampFrom: 100, TimestampTo: 200, LogSize: 1000}}}}, + err = meta.UpdateFlushSegmentsInfo(1, true, false, false, []*datapb.FieldBinlog{getFieldBinlogPaths(1, getInsertLogPath("binlog", 1))}, + []*datapb.FieldBinlog{getFieldBinlogPaths(1, getInsertLogPath("statslog", 1))}, + []*datapb.FieldBinlog{{Binlogs: []*datapb.Binlog{{EntriesNum: 1, TimestampFrom: 100, TimestampTo: 200, LogSize: 1000, LogPath: getDeltaLogPath("deltalog", 1)}}}}, []*datapb.CheckPoint{{SegmentID: 1, NumOfRows: 10}}, []*datapb.SegmentStartPosition{{SegmentID: 1, StartPosition: &internalpb.MsgPosition{MsgID: []byte{1, 2, 3}}}}) assert.NotNil(t, err) assert.Equal(t, "mocked fail", err.Error()) diff --git a/internal/datacoord/server_test.go b/internal/datacoord/server_test.go index 7fe96b9058..41e1e215f1 100644 --- a/internal/datacoord/server_test.go +++ b/internal/datacoord/server_test.go @@ -1150,10 +1150,10 @@ func TestSaveBinlogPaths(t *testing.T) { FieldID: 1, Binlogs: []*datapb.Binlog{ { - LogPath: "/by-dev/test/0/1/2/1/Allo1", + LogPath: "/by-dev/test/0/1/1/1/Allo1", }, { - LogPath: "/by-dev/test/0/1/2/1/Allo2", + LogPath: "/by-dev/test/0/1/1/1/Allo2", }, }, }, @@ -1183,8 +1183,8 @@ func TestSaveBinlogPaths(t *testing.T) { assert.NotNil(t, fieldBinlogs) assert.EqualValues(t, 2, len(fieldBinlogs.GetBinlogs())) assert.EqualValues(t, 1, fieldBinlogs.GetFieldID()) - assert.EqualValues(t, "/by-dev/test/0/1/2/1/Allo1", fieldBinlogs.GetBinlogs()[0].GetLogPath()) - assert.EqualValues(t, "/by-dev/test/0/1/2/1/Allo2", fieldBinlogs.GetBinlogs()[1].GetLogPath()) + assert.EqualValues(t, "/by-dev/test/0/1/1/1/Allo1", fieldBinlogs.GetBinlogs()[0].GetLogPath()) + assert.EqualValues(t, "/by-dev/test/0/1/1/1/Allo2", fieldBinlogs.GetBinlogs()[1].GetLogPath()) segmentInfo := svr.meta.GetSegment(0) assert.NotNil(t, segmentInfo) diff --git a/internal/indexcoord/index_coord_test.go b/internal/indexcoord/index_coord_test.go index e3db8324a7..9f2a6aa83f 100644 --- a/internal/indexcoord/index_coord_test.go +++ b/internal/indexcoord/index_coord_test.go @@ -260,7 +260,6 @@ func testIndexCoord(t *testing.T) { indexs := ic.metaTable.collectionIndexes ic.metaTable.collectionIndexes = make(map[UniqueID]map[UniqueID]*model.Index) defer func() { - fmt.Println("simfg fubang") ic.metaTable.collectionIndexes = indexs }() @@ -278,7 +277,19 @@ func testIndexCoord(t *testing.T) { break } - indexs := ic.metaTable.segmentIndexes + updateSegmentIndexes := func(i map[UniqueID]map[UniqueID]*model.SegmentIndex) { + ic.metaTable.indexLock.Lock() + ic.metaTable.segmentIndexes = i + ic.metaTable.indexLock.Unlock() + } + + getSegmentIndexes := func() map[UniqueID]map[UniqueID]*model.SegmentIndex { + ic.metaTable.indexLock.RLock() + defer ic.metaTable.indexLock.RUnlock() + return ic.metaTable.segmentIndexes + } + + indexs := getSegmentIndexes() mockIndexs := make(map[UniqueID]map[UniqueID]*model.SegmentIndex) progressIndex := &model.SegmentIndex{ IndexState: commonpb.IndexState_InProgress, @@ -292,9 +303,10 @@ func testIndexCoord(t *testing.T) { IndexState: commonpb.IndexState_Finished, NumRows: 2048, } - ic.metaTable.segmentIndexes = mockIndexs + + updateSegmentIndexes(mockIndexs) defer func() { - ic.metaTable.segmentIndexes = indexs + updateSegmentIndexes(indexs) }() mockIndexs[111] = make(map[UniqueID]*model.SegmentIndex) diff --git a/internal/metastore/kv/datacoord/kv_catalog.go b/internal/metastore/kv/datacoord/kv_catalog.go index c3909e4b40..55db96db4d 100644 --- a/internal/metastore/kv/datacoord/kv_catalog.go +++ b/internal/metastore/kv/datacoord/kv_catalog.go @@ -465,9 +465,35 @@ func buildLogPath(chunkManagerRootPath string, binlogType storage.BinlogType, co } } +func checkBinlogs(binlogType storage.BinlogType, segmentID typeutil.UniqueID, logs []*datapb.FieldBinlog) { + check := func(getSegmentID func(logPath string) typeutil.UniqueID) { + for _, fieldBinlog := range logs { + for _, binlog := range fieldBinlog.Binlogs { + if segmentID != getSegmentID(binlog.LogPath) { + log.Panic("the segment path doesn't match the segment id", zap.Int64("segment_id", segmentID), zap.String("path", binlog.LogPath)) + } + } + } + } + switch binlogType { + case storage.InsertBinlog: + check(metautil.GetSegmentIDFromInsertLogPath) + case storage.DeleteBinlog: + check(metautil.GetSegmentIDFromDeltaLogPath) + case storage.StatsBinlog: + check(metautil.GetSegmentIDFromStatsLogPath) + default: + log.Panic("invalid binlog type") + } +} + func buildBinlogKvsWithLogID(collectionID, partitionID, segmentID typeutil.UniqueID, binlogs, deltalogs, statslogs []*datapb.FieldBinlog) (map[string]string, error) { + checkBinlogs(storage.InsertBinlog, segmentID, binlogs) + checkBinlogs(storage.DeleteBinlog, segmentID, deltalogs) + checkBinlogs(storage.StatsBinlog, segmentID, statslogs) + fillLogIDByLogPath(binlogs, deltalogs, statslogs) kvs, err := buildBinlogKvs(collectionID, partitionID, segmentID, binlogs, deltalogs, statslogs) if err != nil { diff --git a/internal/metastore/kv/datacoord/kv_catalog_test.go b/internal/metastore/kv/datacoord/kv_catalog_test.go index ddf3ad65e4..60b5724e7a 100644 --- a/internal/metastore/kv/datacoord/kv_catalog_test.go +++ b/internal/metastore/kv/datacoord/kv_catalog_test.go @@ -39,6 +39,10 @@ var ( deltalogPath = metautil.BuildDeltaLogPath("a", collectionID, partitionID, segmentID, logID) statslogPath = metautil.BuildStatsLogPath("a", collectionID, partitionID, segmentID, fieldID, logID) + binlogPath2 = metautil.BuildInsertLogPath("a", collectionID, partitionID, segmentID2, fieldID, logID) + deltalogPath2 = metautil.BuildDeltaLogPath("a", collectionID, partitionID, segmentID2, logID) + statslogPath2 = metautil.BuildStatsLogPath("a", collectionID, partitionID, segmentID2, fieldID, logID) + k1 = buildFieldBinlogPath(collectionID, partitionID, segmentID, fieldID) k2 = buildFieldDeltalogPath(collectionID, partitionID, segmentID, fieldID) k3 = buildFieldStatslogPath(collectionID, partitionID, segmentID, fieldID) @@ -114,6 +118,20 @@ var ( }, } + getlogs = func(logpath string) []*datapb.FieldBinlog { + return []*datapb.FieldBinlog{ + { + FieldID: 1, + Binlogs: []*datapb.Binlog{ + { + EntriesNum: 5, + LogPath: logpath, + }, + }, + }, + } + } + segment1 = &datapb.SegmentInfo{ ID: segmentID, CollectionID: collectionID, @@ -131,9 +149,9 @@ var ( PartitionID: partitionID, NumOfRows: 100, State: commonpb.SegmentState_Dropped, - Binlogs: binlogs, - Deltalogs: deltalogs, - Statslogs: statslogs, + Binlogs: getlogs(binlogPath2), + Deltalogs: getlogs(deltalogPath2), + Statslogs: getlogs(statslogPath2), } ) @@ -257,8 +275,9 @@ func Test_AddSegments(t *testing.T) { } catalog := &Catalog{txn, "a"} - err := catalog.AddSegment(context.TODO(), invalidSegment) - assert.Error(t, err) + assert.Panics(t, func() { + catalog.AddSegment(context.TODO(), invalidSegment) + }) }) t.Run("save error", func(t *testing.T) { @@ -299,8 +318,9 @@ func Test_AlterSegments(t *testing.T) { } catalog := &Catalog{txn, "a"} - err := catalog.AlterSegments(context.TODO(), []*datapb.SegmentInfo{invalidSegment}) - assert.Error(t, err) + assert.Panics(t, func() { + catalog.AlterSegments(context.TODO(), []*datapb.SegmentInfo{invalidSegment}) + }) }) t.Run("save error", func(t *testing.T) { @@ -420,6 +440,7 @@ func Test_AlterSegmentsAndAddNewSegment(t *testing.T) { return []string{}, []string{}, nil } + // TODO fubang catalog := &Catalog{txn, "a"} err := catalog.AlterSegmentsAndAddNewSegment(context.TODO(), []*datapb.SegmentInfo{droppedSegment}, segment1) assert.NoError(t, err) diff --git a/internal/util/metautil/binlog.go b/internal/util/metautil/binlog.go index 0a1a3c7d3e..ac9cf20de7 100644 --- a/internal/util/metautil/binlog.go +++ b/internal/util/metautil/binlog.go @@ -3,27 +3,56 @@ package metautil import ( "path" "strconv" + "strings" "github.com/milvus-io/milvus/internal/util/typeutil" "github.com/milvus-io/milvus/internal/common" ) +const pathSep = "/" + func BuildInsertLogPath(rootPath string, collectionID, partitionID, segmentID, fieldID, logID typeutil.UniqueID) string { k := JoinIDPath(collectionID, partitionID, segmentID, fieldID, logID) return path.Join(rootPath, common.SegmentInsertLogPath, k) } +func GetSegmentIDFromInsertLogPath(logPath string) typeutil.UniqueID { + return getSegmentIDFromPath(logPath, 3) +} + func BuildStatsLogPath(rootPath string, collectionID, partitionID, segmentID, fieldID, logID typeutil.UniqueID) string { k := JoinIDPath(collectionID, partitionID, segmentID, fieldID, logID) return path.Join(rootPath, common.SegmentStatslogPath, k) } +func GetSegmentIDFromStatsLogPath(logPath string) typeutil.UniqueID { + return getSegmentIDFromPath(logPath, 3) +} + func BuildDeltaLogPath(rootPath string, collectionID, partitionID, segmentID, logID typeutil.UniqueID) string { k := JoinIDPath(collectionID, partitionID, segmentID, logID) return path.Join(rootPath, common.SegmentDeltaLogPath, k) } +func GetSegmentIDFromDeltaLogPath(logPath string) typeutil.UniqueID { + return getSegmentIDFromPath(logPath, 2) +} + +func getSegmentIDFromPath(logPath string, segmentIndex int) typeutil.UniqueID { + infos := strings.Split(logPath, pathSep) + l := len(infos) + if l < segmentIndex { + return 0 + } + + v, err := strconv.ParseInt(infos[l-segmentIndex], 10, 64) + if err != nil { + return 0 + } + return v +} + // JoinIDPath joins ids to path format. func JoinIDPath(ids ...typeutil.UniqueID) string { idStr := make([]string, 0, len(ids))