mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-07 17:48:29 +08:00
Re-calculate segment entry num when binlog field is zero (#23883)
Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
This commit is contained in:
parent
8ffa8a28fa
commit
884f7904a1
@ -375,6 +375,11 @@ func (loader *segmentLoader) loadSegment(ctx context.Context,
|
||||
if err := loader.loadSealedSegmentFields(ctx, segment, fieldBinlogs, loadInfo); err != nil {
|
||||
return err
|
||||
}
|
||||
// https://github.com/milvus-io/milvus/23654
|
||||
// legacy entry num = 0
|
||||
if err := loader.patchEntryNumber(ctx, segment, loadInfo); err != nil {
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
if err := loader.loadGrowingSegmentFields(ctx, segment, loadInfo.BinlogPaths); err != nil {
|
||||
return err
|
||||
@ -779,6 +784,71 @@ func (loader *segmentLoader) LoadDeltaLogs(ctx context.Context, segment *LocalSe
|
||||
return nil
|
||||
}
|
||||
|
||||
func (loader *segmentLoader) patchEntryNumber(ctx context.Context, segment *LocalSegment, loadInfo *querypb.SegmentLoadInfo) error {
|
||||
var needReset bool
|
||||
|
||||
segment.fieldIndexes.Range(func(fieldID int64, info *IndexedFieldInfo) bool {
|
||||
for _, info := range info.FieldBinlog.GetBinlogs() {
|
||||
if info.GetEntriesNum() == 0 {
|
||||
needReset = true
|
||||
return false
|
||||
}
|
||||
}
|
||||
return true
|
||||
})
|
||||
if !needReset {
|
||||
return nil
|
||||
}
|
||||
|
||||
log.Warn("legacy segment binlog found, start to patch entry num", zap.Int64("segmentID", segment.segmentID))
|
||||
rowIDField := lo.FindOrElse(loadInfo.BinlogPaths, nil, func(binlog *datapb.FieldBinlog) bool {
|
||||
return binlog.GetFieldID() == common.RowIDField
|
||||
})
|
||||
|
||||
if rowIDField == nil {
|
||||
return errors.New("rowID field binlog not found")
|
||||
}
|
||||
|
||||
counts := make([]int64, 0, len(rowIDField.GetBinlogs()))
|
||||
for _, binlog := range rowIDField.GetBinlogs() {
|
||||
bs, err := loader.cm.Read(ctx, binlog.LogPath)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// get binlog entry num from rowID field
|
||||
// since header does not store entry numb, we have to read all data here
|
||||
|
||||
reader, err := storage.NewBinlogReader(bs)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
er, err := reader.NextEventReader()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
rowIDs, err := er.GetInt64FromPayload()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
counts = append(counts, int64(len(rowIDs)))
|
||||
}
|
||||
|
||||
var err error
|
||||
segment.fieldIndexes.Range(func(fieldID int64, info *IndexedFieldInfo) bool {
|
||||
if len(info.FieldBinlog.GetBinlogs()) != len(counts) {
|
||||
err = errors.New("rowID & index binlog number not matched")
|
||||
return false
|
||||
}
|
||||
for i, binlog := range info.FieldBinlog.GetBinlogs() {
|
||||
binlog.EntriesNum = counts[i]
|
||||
}
|
||||
return true
|
||||
})
|
||||
return err
|
||||
}
|
||||
|
||||
// JoinIDPath joins ids to path format.
|
||||
func JoinIDPath(ids ...UniqueID) string {
|
||||
idStr := make([]string, 0, len(ids))
|
||||
|
||||
@ -353,6 +353,61 @@ func (suite *SegmentLoaderSuite) TestLoadWithMmap() {
|
||||
suite.NoError(err)
|
||||
}
|
||||
|
||||
func (suite *SegmentLoaderSuite) TestPatchEntryNum() {
|
||||
ctx := context.Background()
|
||||
|
||||
segmentID := suite.segmentID
|
||||
binlogs, statsLogs, err := SaveBinLog(ctx,
|
||||
suite.collectionID,
|
||||
suite.partitionID,
|
||||
segmentID,
|
||||
100,
|
||||
suite.schema,
|
||||
suite.chunkManager,
|
||||
)
|
||||
suite.NoError(err)
|
||||
|
||||
vecFields := funcutil.GetVecFieldIDs(suite.schema)
|
||||
indexInfo, err := GenAndSaveIndex(
|
||||
suite.collectionID,
|
||||
suite.partitionID,
|
||||
segmentID,
|
||||
vecFields[0],
|
||||
100,
|
||||
IndexFaissIVFFlat,
|
||||
L2,
|
||||
suite.chunkManager,
|
||||
)
|
||||
suite.NoError(err)
|
||||
loadInfo := &querypb.SegmentLoadInfo{
|
||||
SegmentID: segmentID,
|
||||
PartitionID: suite.partitionID,
|
||||
CollectionID: suite.collectionID,
|
||||
BinlogPaths: binlogs,
|
||||
Statslogs: statsLogs,
|
||||
IndexInfos: []*querypb.FieldIndexInfo{indexInfo},
|
||||
}
|
||||
|
||||
// mock legacy binlog entry num is zero case
|
||||
for _, fieldLog := range binlogs {
|
||||
for _, binlog := range fieldLog.GetBinlogs() {
|
||||
binlog.EntriesNum = 0
|
||||
}
|
||||
}
|
||||
|
||||
segments, err := suite.loader.Load(ctx, suite.collectionID, SegmentTypeSealed, 0, loadInfo)
|
||||
suite.Require().NoError(err)
|
||||
suite.Require().Equal(1, len(segments))
|
||||
|
||||
segment := segments[0]
|
||||
info := segment.GetIndex(vecFields[0])
|
||||
suite.Require().NotNil(info)
|
||||
|
||||
for _, binlog := range info.FieldBinlog.GetBinlogs() {
|
||||
suite.Greater(binlog.EntriesNum, int64(0))
|
||||
}
|
||||
}
|
||||
|
||||
func TestSegmentLoader(t *testing.T) {
|
||||
suite.Run(t, &SegmentLoaderSuite{})
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user