mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-08 01:58:34 +08:00
enhance: do check when add not empty logpath (#33640)
meta only store logid Signed-off-by: lixinguo <xinguo.li@zilliz.com> Co-authored-by: lixinguo <xinguo.li@zilliz.com>
This commit is contained in:
parent
ecee7d90d4
commit
c61fb1eff5
@ -696,7 +696,7 @@ func TestUpdateSegmentsInfo(t *testing.T) {
|
|||||||
AddBinlogsOperator(1,
|
AddBinlogsOperator(1,
|
||||||
[]*datapb.FieldBinlog{getFieldBinlogIDsWithEntry(1, 10, 1)},
|
[]*datapb.FieldBinlog{getFieldBinlogIDsWithEntry(1, 10, 1)},
|
||||||
[]*datapb.FieldBinlog{getFieldBinlogIDs(1, 1)},
|
[]*datapb.FieldBinlog{getFieldBinlogIDs(1, 1)},
|
||||||
[]*datapb.FieldBinlog{{Binlogs: []*datapb.Binlog{{EntriesNum: 1, TimestampFrom: 100, TimestampTo: 200, LogSize: 1000, LogPath: getDeltaLogPath("deltalog1", 1), LogID: 2}}}},
|
[]*datapb.FieldBinlog{{Binlogs: []*datapb.Binlog{{EntriesNum: 1, TimestampFrom: 100, TimestampTo: 200, LogSize: 1000, LogPath: "", LogID: 2}}}},
|
||||||
),
|
),
|
||||||
UpdateStartPosition([]*datapb.SegmentStartPosition{{SegmentID: 1, StartPosition: &msgpb.MsgPosition{MsgID: []byte{1, 2, 3}}}}),
|
UpdateStartPosition([]*datapb.SegmentStartPosition{{SegmentID: 1, StartPosition: &msgpb.MsgPosition{MsgID: []byte{1, 2, 3}}}}),
|
||||||
UpdateCheckPointOperator(1, []*datapb.CheckPoint{{SegmentID: 1, NumOfRows: 10}}),
|
UpdateCheckPointOperator(1, []*datapb.CheckPoint{{SegmentID: 1, NumOfRows: 10}}),
|
||||||
@ -837,7 +837,7 @@ func TestUpdateSegmentsInfo(t *testing.T) {
|
|||||||
AddBinlogsOperator(1,
|
AddBinlogsOperator(1,
|
||||||
[]*datapb.FieldBinlog{getFieldBinlogIDs(1, 2)},
|
[]*datapb.FieldBinlog{getFieldBinlogIDs(1, 2)},
|
||||||
[]*datapb.FieldBinlog{getFieldBinlogIDs(1, 2)},
|
[]*datapb.FieldBinlog{getFieldBinlogIDs(1, 2)},
|
||||||
[]*datapb.FieldBinlog{{Binlogs: []*datapb.Binlog{{EntriesNum: 1, TimestampFrom: 100, TimestampTo: 200, LogSize: 1000, LogPath: getDeltaLogPath("deltalog", 1), LogID: 2}}}},
|
[]*datapb.FieldBinlog{{Binlogs: []*datapb.Binlog{{EntriesNum: 1, TimestampFrom: 100, TimestampTo: 200, LogSize: 1000, LogPath: "", LogID: 2}}}},
|
||||||
),
|
),
|
||||||
UpdateStartPosition([]*datapb.SegmentStartPosition{{SegmentID: 1, StartPosition: &msgpb.MsgPosition{MsgID: []byte{1, 2, 3}}}}),
|
UpdateStartPosition([]*datapb.SegmentStartPosition{{SegmentID: 1, StartPosition: &msgpb.MsgPosition{MsgID: []byte{1, 2, 3}}}}),
|
||||||
UpdateCheckPointOperator(1, []*datapb.CheckPoint{{SegmentID: 1, NumOfRows: 10}}),
|
UpdateCheckPointOperator(1, []*datapb.CheckPoint{{SegmentID: 1, NumOfRows: 10}}),
|
||||||
|
|||||||
@ -183,7 +183,7 @@ func (s *SegmentsInfo) DropSegment(segmentID UniqueID) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// SetSegment sets SegmentInfo with segmentID, perform overwrite if already exists
|
// SetSegment sets SegmentInfo with segmentID, perform overwrite if already exists
|
||||||
// set the logPath of segement in meta empty, to save space
|
// set the logPath of segment in meta empty, to save space
|
||||||
// if segment has logPath, make it empty
|
// if segment has logPath, make it empty
|
||||||
func (s *SegmentsInfo) SetSegment(segmentID UniqueID, segment *SegmentInfo) {
|
func (s *SegmentsInfo) SetSegment(segmentID UniqueID, segment *SegmentInfo) {
|
||||||
if segment, ok := s.segments[segmentID]; ok {
|
if segment, ok := s.segments[segmentID]; ok {
|
||||||
|
|||||||
@ -35,7 +35,6 @@ import (
|
|||||||
"github.com/milvus-io/milvus/internal/metastore/mocks"
|
"github.com/milvus-io/milvus/internal/metastore/mocks"
|
||||||
"github.com/milvus-io/milvus/internal/proto/datapb"
|
"github.com/milvus-io/milvus/internal/proto/datapb"
|
||||||
"github.com/milvus-io/milvus/pkg/util/etcd"
|
"github.com/milvus-io/milvus/pkg/util/etcd"
|
||||||
"github.com/milvus-io/milvus/pkg/util/metautil"
|
|
||||||
"github.com/milvus-io/milvus/pkg/util/paramtable"
|
"github.com/milvus-io/milvus/pkg/util/paramtable"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -649,7 +648,6 @@ func TestTryToSealSegment(t *testing.T) {
|
|||||||
{
|
{
|
||||||
EntriesNum: 10,
|
EntriesNum: 10,
|
||||||
LogID: 3,
|
LogID: 3,
|
||||||
LogPath: metautil.BuildInsertLogPath("", 1, 1, seg.ID, 2, 3),
|
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
@ -674,12 +672,10 @@ func TestTryToSealSegment(t *testing.T) {
|
|||||||
{
|
{
|
||||||
EntriesNum: 10,
|
EntriesNum: 10,
|
||||||
LogID: 1,
|
LogID: 1,
|
||||||
LogPath: metautil.BuildInsertLogPath("", 1, 1, seg.ID, 1, 3),
|
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
EntriesNum: 20,
|
EntriesNum: 20,
|
||||||
LogID: 2,
|
LogID: 2,
|
||||||
LogPath: metautil.BuildInsertLogPath("", 1, 1, seg.ID, 1, 2),
|
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
|||||||
@ -58,7 +58,6 @@ import (
|
|||||||
"github.com/milvus-io/milvus/pkg/util/funcutil"
|
"github.com/milvus-io/milvus/pkg/util/funcutil"
|
||||||
"github.com/milvus-io/milvus/pkg/util/lock"
|
"github.com/milvus-io/milvus/pkg/util/lock"
|
||||||
"github.com/milvus-io/milvus/pkg/util/merr"
|
"github.com/milvus-io/milvus/pkg/util/merr"
|
||||||
"github.com/milvus-io/milvus/pkg/util/metautil"
|
|
||||||
"github.com/milvus-io/milvus/pkg/util/metricsinfo"
|
"github.com/milvus-io/milvus/pkg/util/metricsinfo"
|
||||||
"github.com/milvus-io/milvus/pkg/util/paramtable"
|
"github.com/milvus-io/milvus/pkg/util/paramtable"
|
||||||
"github.com/milvus-io/milvus/pkg/util/tikv"
|
"github.com/milvus-io/milvus/pkg/util/tikv"
|
||||||
@ -320,17 +319,14 @@ func TestGetSegmentInfo(t *testing.T) {
|
|||||||
Binlogs: []*datapb.Binlog{
|
Binlogs: []*datapb.Binlog{
|
||||||
{
|
{
|
||||||
EntriesNum: 20,
|
EntriesNum: 20,
|
||||||
LogPath: metautil.BuildInsertLogPath("a", 0, 0, 0, 1, 801),
|
|
||||||
LogID: 801,
|
LogID: 801,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
EntriesNum: 20,
|
EntriesNum: 20,
|
||||||
LogPath: metautil.BuildInsertLogPath("a", 0, 0, 0, 1, 802),
|
|
||||||
LogID: 802,
|
LogID: 802,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
EntriesNum: 20,
|
EntriesNum: 20,
|
||||||
LogPath: metautil.BuildInsertLogPath("a", 0, 0, 0, 1, 803),
|
|
||||||
LogID: 803,
|
LogID: 803,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
@ -344,10 +340,10 @@ func TestGetSegmentInfo(t *testing.T) {
|
|||||||
SegmentIDs: []int64{0},
|
SegmentIDs: []int64{0},
|
||||||
}
|
}
|
||||||
resp, err := svr.GetSegmentInfo(svr.ctx, req)
|
resp, err := svr.GetSegmentInfo(svr.ctx, req)
|
||||||
|
assert.NoError(t, err)
|
||||||
assert.Equal(t, 1, len(resp.GetInfos()))
|
assert.Equal(t, 1, len(resp.GetInfos()))
|
||||||
// Check that # of rows is corrected from 100 to 60.
|
// Check that # of rows is corrected from 100 to 60.
|
||||||
assert.EqualValues(t, 60, resp.GetInfos()[0].GetNumOfRows())
|
assert.EqualValues(t, 60, resp.GetInfos()[0].GetNumOfRows())
|
||||||
assert.NoError(t, err)
|
|
||||||
assert.EqualValues(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
|
assert.EqualValues(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
|
||||||
})
|
})
|
||||||
t.Run("with wrong segmentID", func(t *testing.T) {
|
t.Run("with wrong segmentID", func(t *testing.T) {
|
||||||
@ -1824,17 +1820,14 @@ func TestGetRecoveryInfo(t *testing.T) {
|
|||||||
Binlogs: []*datapb.Binlog{
|
Binlogs: []*datapb.Binlog{
|
||||||
{
|
{
|
||||||
EntriesNum: 20,
|
EntriesNum: 20,
|
||||||
LogPath: metautil.BuildInsertLogPath("a", 0, 0, 0, 1, 901),
|
|
||||||
LogID: 901,
|
LogID: 901,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
EntriesNum: 20,
|
EntriesNum: 20,
|
||||||
LogPath: metautil.BuildInsertLogPath("a", 0, 0, 0, 1, 902),
|
|
||||||
LogID: 902,
|
LogID: 902,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
EntriesNum: 20,
|
EntriesNum: 20,
|
||||||
LogPath: metautil.BuildInsertLogPath("a", 0, 0, 0, 1, 903),
|
|
||||||
LogID: 903,
|
LogID: 903,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
@ -1847,12 +1840,10 @@ func TestGetRecoveryInfo(t *testing.T) {
|
|||||||
Binlogs: []*datapb.Binlog{
|
Binlogs: []*datapb.Binlog{
|
||||||
{
|
{
|
||||||
EntriesNum: 30,
|
EntriesNum: 30,
|
||||||
LogPath: metautil.BuildInsertLogPath("a", 0, 0, 1, 1, 801),
|
|
||||||
LogID: 801,
|
LogID: 801,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
EntriesNum: 70,
|
EntriesNum: 70,
|
||||||
LogPath: metautil.BuildInsertLogPath("a", 0, 0, 1, 1, 802),
|
|
||||||
LogID: 802,
|
LogID: 802,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
@ -1926,17 +1917,14 @@ func TestGetRecoveryInfo(t *testing.T) {
|
|||||||
Binlogs: []*datapb.Binlog{
|
Binlogs: []*datapb.Binlog{
|
||||||
{
|
{
|
||||||
EntriesNum: 20,
|
EntriesNum: 20,
|
||||||
LogPath: metautil.BuildInsertLogPath("a", 0, 0, 3, 1, 901),
|
|
||||||
LogID: 901,
|
LogID: 901,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
EntriesNum: 20,
|
EntriesNum: 20,
|
||||||
LogPath: metautil.BuildInsertLogPath("a", 0, 0, 3, 1, 902),
|
|
||||||
LogID: 902,
|
LogID: 902,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
EntriesNum: 20,
|
EntriesNum: 20,
|
||||||
LogPath: metautil.BuildInsertLogPath("a", 0, 0, 3, 1, 903),
|
|
||||||
LogID: 903,
|
LogID: 903,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
@ -1949,12 +1937,10 @@ func TestGetRecoveryInfo(t *testing.T) {
|
|||||||
Binlogs: []*datapb.Binlog{
|
Binlogs: []*datapb.Binlog{
|
||||||
{
|
{
|
||||||
EntriesNum: 30,
|
EntriesNum: 30,
|
||||||
LogPath: metautil.BuildInsertLogPath("a", 0, 0, 4, 1, 801),
|
|
||||||
LogID: 801,
|
LogID: 801,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
EntriesNum: 70,
|
EntriesNum: 70,
|
||||||
LogPath: metautil.BuildInsertLogPath("a", 0, 0, 4, 1, 802),
|
|
||||||
LogID: 802,
|
LogID: 802,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
|||||||
@ -1138,17 +1138,14 @@ func TestGetRecoveryInfoV2(t *testing.T) {
|
|||||||
Binlogs: []*datapb.Binlog{
|
Binlogs: []*datapb.Binlog{
|
||||||
{
|
{
|
||||||
EntriesNum: 20,
|
EntriesNum: 20,
|
||||||
LogPath: metautil.BuildInsertLogPath("a", 0, 0, 0, 1, 901),
|
|
||||||
LogID: 901,
|
LogID: 901,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
EntriesNum: 20,
|
EntriesNum: 20,
|
||||||
LogPath: metautil.BuildInsertLogPath("a", 0, 0, 0, 1, 902),
|
|
||||||
LogID: 902,
|
LogID: 902,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
EntriesNum: 20,
|
EntriesNum: 20,
|
||||||
LogPath: metautil.BuildInsertLogPath("a", 0, 0, 0, 1, 903),
|
|
||||||
LogID: 903,
|
LogID: 903,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
@ -1161,12 +1158,10 @@ func TestGetRecoveryInfoV2(t *testing.T) {
|
|||||||
Binlogs: []*datapb.Binlog{
|
Binlogs: []*datapb.Binlog{
|
||||||
{
|
{
|
||||||
EntriesNum: 30,
|
EntriesNum: 30,
|
||||||
LogPath: metautil.BuildInsertLogPath("a", 0, 0, 1, 1, 801),
|
|
||||||
LogID: 801,
|
LogID: 801,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
EntriesNum: 70,
|
EntriesNum: 70,
|
||||||
LogPath: metautil.BuildInsertLogPath("a", 0, 0, 1, 1, 802),
|
|
||||||
LogID: 802,
|
LogID: 802,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
@ -1243,17 +1238,14 @@ func TestGetRecoveryInfoV2(t *testing.T) {
|
|||||||
Binlogs: []*datapb.Binlog{
|
Binlogs: []*datapb.Binlog{
|
||||||
{
|
{
|
||||||
EntriesNum: 20,
|
EntriesNum: 20,
|
||||||
LogPath: metautil.BuildInsertLogPath("a", 0, 0, 3, 1, 901),
|
|
||||||
LogID: 901,
|
LogID: 901,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
EntriesNum: 20,
|
EntriesNum: 20,
|
||||||
LogPath: metautil.BuildInsertLogPath("a", 0, 0, 3, 1, 902),
|
|
||||||
LogID: 902,
|
LogID: 902,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
EntriesNum: 20,
|
EntriesNum: 20,
|
||||||
LogPath: metautil.BuildInsertLogPath("a", 0, 0, 3, 1, 903),
|
|
||||||
LogID: 903,
|
LogID: 903,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
@ -1266,12 +1258,10 @@ func TestGetRecoveryInfoV2(t *testing.T) {
|
|||||||
Binlogs: []*datapb.Binlog{
|
Binlogs: []*datapb.Binlog{
|
||||||
{
|
{
|
||||||
EntriesNum: 30,
|
EntriesNum: 30,
|
||||||
LogPath: metautil.BuildInsertLogPath("a", 0, 0, 4, 1, 801),
|
|
||||||
LogID: 801,
|
LogID: 801,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
EntriesNum: 70,
|
EntriesNum: 70,
|
||||||
LogPath: metautil.BuildInsertLogPath("a", 0, 0, 4, 1, 802),
|
|
||||||
LogID: 802,
|
LogID: 802,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
@ -1318,11 +1308,9 @@ func TestGetRecoveryInfoV2(t *testing.T) {
|
|||||||
FieldID: 1,
|
FieldID: 1,
|
||||||
Binlogs: []*datapb.Binlog{
|
Binlogs: []*datapb.Binlog{
|
||||||
{
|
{
|
||||||
LogPath: metautil.BuildInsertLogPath("a", 0, 100, 0, 1, 801),
|
|
||||||
LogID: 801,
|
LogID: 801,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
LogPath: metautil.BuildInsertLogPath("a", 0, 100, 0, 1, 801),
|
|
||||||
LogID: 801,
|
LogID: 801,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
@ -1333,11 +1321,9 @@ func TestGetRecoveryInfoV2(t *testing.T) {
|
|||||||
FieldID: 1,
|
FieldID: 1,
|
||||||
Binlogs: []*datapb.Binlog{
|
Binlogs: []*datapb.Binlog{
|
||||||
{
|
{
|
||||||
LogPath: metautil.BuildStatsLogPath("a", 0, 100, 0, 1000, 10000),
|
|
||||||
LogID: 10000,
|
LogID: 10000,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
LogPath: metautil.BuildStatsLogPath("a", 0, 100, 0, 1000, 10000),
|
|
||||||
LogID: 10000,
|
LogID: 10000,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
|||||||
@ -313,6 +313,53 @@ func Test_AddSegments(t *testing.T) {
|
|||||||
assert.Equal(t, 4, len(savedKvs))
|
assert.Equal(t, 4, len(savedKvs))
|
||||||
verifySavedKvsForSegment(t, savedKvs)
|
verifySavedKvsForSegment(t, savedKvs)
|
||||||
})
|
})
|
||||||
|
|
||||||
|
t.Run("no need to store log path", func(t *testing.T) {
|
||||||
|
metakv := mocks.NewMetaKv(t)
|
||||||
|
catalog := NewCatalog(metakv, rootPath, "")
|
||||||
|
|
||||||
|
validFieldBinlog := []*datapb.FieldBinlog{{
|
||||||
|
FieldID: 1,
|
||||||
|
Binlogs: []*datapb.Binlog{
|
||||||
|
{
|
||||||
|
LogID: 1,
|
||||||
|
LogPath: "",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}}
|
||||||
|
|
||||||
|
invalidFieldBinlog := []*datapb.FieldBinlog{{
|
||||||
|
FieldID: 1,
|
||||||
|
Binlogs: []*datapb.Binlog{
|
||||||
|
{
|
||||||
|
LogID: 1,
|
||||||
|
LogPath: "no need to store",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}}
|
||||||
|
|
||||||
|
segment := &datapb.SegmentInfo{
|
||||||
|
ID: segmentID,
|
||||||
|
CollectionID: collectionID,
|
||||||
|
PartitionID: partitionID,
|
||||||
|
NumOfRows: 100,
|
||||||
|
State: commonpb.SegmentState_Flushed,
|
||||||
|
}
|
||||||
|
|
||||||
|
segment.Statslogs = invalidFieldBinlog
|
||||||
|
err := catalog.AddSegment(context.TODO(), segment)
|
||||||
|
assert.Error(t, err)
|
||||||
|
segment.Statslogs = validFieldBinlog
|
||||||
|
|
||||||
|
segment.Binlogs = invalidFieldBinlog
|
||||||
|
err = catalog.AddSegment(context.TODO(), segment)
|
||||||
|
assert.Error(t, err)
|
||||||
|
segment.Binlogs = validFieldBinlog
|
||||||
|
|
||||||
|
segment.Deltalogs = invalidFieldBinlog
|
||||||
|
err = catalog.AddSegment(context.TODO(), segment)
|
||||||
|
assert.Error(t, err)
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func Test_AlterSegments(t *testing.T) {
|
func Test_AlterSegments(t *testing.T) {
|
||||||
|
|||||||
@ -169,6 +169,9 @@ func buildBinlogKvs(collectionID, partitionID, segmentID typeutil.UniqueID, binl
|
|||||||
if binlog.GetLogID() == 0 {
|
if binlog.GetLogID() == 0 {
|
||||||
return fmt.Errorf("invalid log id, binlog:%v", binlog)
|
return fmt.Errorf("invalid log id, binlog:%v", binlog)
|
||||||
}
|
}
|
||||||
|
if binlog.GetLogPath() != "" {
|
||||||
|
return fmt.Errorf("fieldBinlog no need to store logpath, binlog:%v", binlog)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user