fix: make savebinlogpath idempotent at binlog level (#43615)

issue: #43574

- update the full binlog set on every SaveBinlogPaths call (see the sketch below).
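In other words, a SaveBinlogPaths retry now replaces the segment's recorded binlog set instead of appending to it. A minimal standalone sketch of that property (hypothetical types, not the Milvus API):

package main

import "fmt"

type segmentMeta struct{ binlogIDs []int64 }

// applyFullReport overwrites the stored set, so applying the same report
// twice leaves the meta unchanged (idempotent), unlike append-style merging.
func applyFullReport(m *segmentMeta, reported []int64) {
	m.binlogIDs = append([]int64(nil), reported...)
}

func main() {
	m := &segmentMeta{binlogIDs: []int64{1}}
	applyFullReport(m, []int64{1, 2})
	applyFullReport(m, []int64{1, 2}) // retried RPC: no duplicates
	fmt.Println(m.binlogIDs)          // [1 2]
}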

---------

Signed-off-by: chyezh <chyezh@outlook.com>
Zhen Ye 2025-07-29 19:47:36 +08:00 committed by GitHub
parent d57890449f
commit cd38d65417
15 changed files with 2169 additions and 1628 deletions


@@ -769,7 +769,35 @@ type updateSegmentPack struct {
// for update etcd binlog paths
increments map[int64]metastore.BinlogsIncrement
// for update segment metric after alter segments
metricMutation *segMetricMutation
fromSaveBinlogPathSegmentID int64 // if non-zero, the update operators come from SaveBinlogPaths for this segment
}
func (p *updateSegmentPack) Validate() error {
if p.fromSaveBinlogPathSegmentID != 0 {
segment, ok := p.segments[p.fromSaveBinlogPathSegmentID]
if !ok {
panic(fmt.Sprintf("segment %d not found when validating save binlog paths", p.fromSaveBinlogPathSegmentID))
}
if segment.Level == datapb.SegmentLevel_L0 {
return nil
}
segmentInMeta := p.meta.segments.GetSegment(segment.ID)
if segmentInMeta.State == commonpb.SegmentState_Flushed && segment.State != commonpb.SegmentState_Dropped {
// if the segment is flushed, we should not update the segment meta, ignore the operation directly.
return errors.Wrapf(ErrIgnoredSegmentMetaOperation,
"segment is flushed, segmentID: %d",
segment.ID)
}
if segment.GetDmlPosition().GetTimestamp() < segmentInMeta.GetDmlPosition().GetTimestamp() {
return errors.Wrapf(ErrIgnoredSegmentMetaOperation,
"dml time tick is less than the segment meta, segmentID: %d, new incoming time tick: %d, existing time tick: %d",
segment.ID,
segment.GetDmlPosition().GetTimestamp(),
segmentInMeta.GetDmlPosition().GetTimestamp())
}
}
return nil
}
func (p *updateSegmentPack) Get(segmentID int64) *SegmentInfo {
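The Validate hook above returns a sentinel that callers are expected to treat as a successful no-op. A small self-contained sketch of that contract, assuming only the github.com/cockroachdb/errors package this diff imports:

package main

import (
	"fmt"

	"github.com/cockroachdb/errors"
)

var errIgnored = errors.New("ignored segment meta operation")

func validate(staleTimeTick bool) error {
	if staleTimeTick {
		// Wrapf preserves the sentinel, so errors.Is still matches below.
		return errors.Wrapf(errIgnored, "segmentID: %d", 1)
	}
	return nil
}

func main() {
	err := validate(true)
	fmt.Println(errors.Is(err, errIgnored)) // true: the caller treats it as a no-op
}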
@@ -1012,6 +1040,30 @@ func UpdateBinlogsOperator(segmentID int64, binlogs, statslogs, deltalogs, bm25l
}
}
func UpdateBinlogsFromSaveBinlogPathsOperator(segmentID int64, binlogs, statslogs, deltalogs, bm25logs []*datapb.FieldBinlog) UpdateOperator {
return func(modPack *updateSegmentPack) bool {
modPack.fromSaveBinlogPathSegmentID = segmentID
segment := modPack.Get(segmentID)
if segment == nil {
log.Ctx(context.TODO()).Warn("meta update: update binlog failed - segment not found",
zap.Int64("segmentID", segmentID))
return false
}
segment.Binlogs = mergeFieldBinlogs(nil, binlogs)
segment.Statslogs = mergeFieldBinlogs(nil, statslogs)
segment.Deltalogs = mergeFieldBinlogs(nil, deltalogs)
if len(deltalogs) > 0 {
segment.deltaRowcount.Store(-1)
}
segment.Bm25Statslogs = mergeFieldBinlogs(nil, bm25logs)
modPack.increments[segmentID] = metastore.BinlogsIncrement{
Segment: segment.SegmentInfo,
}
return true
}
}
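One detail worth noting above: the operator rebuilds every log set from the report alone (mergeFieldBinlogs(nil, ...)) and invalidates the cached delta row count by storing -1. A standalone sketch of that stale-cache convention (hypothetical type, not the real SegmentInfo):

package main

import (
	"fmt"
	"sync/atomic"
)

type seg struct {
	deltaRowcount atomic.Int64 // cached delta row count; -1 means "recompute"
	deltalogRows  int64
}

func (s *seg) getDeltaCount() int64 {
	if v := s.deltaRowcount.Load(); v >= 0 {
		return v
	}
	v := s.deltalogRows // recompute from the delta logs
	s.deltaRowcount.Store(v)
	return v
}

func main() {
	s := &seg{deltalogRows: 1}
	s.deltaRowcount.Store(-1)      // invalidate after deltalogs change
	fmt.Println(s.getDeltaCount()) // 1
}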
// update startPosition
func UpdateStartPosition(startPositions []*datapb.SegmentStartPosition) UpdateOperator {
return func(modPack *updateSegmentPack) bool {
@@ -1051,7 +1103,7 @@ func UpdateDmlPosition(segmentID int64, dmlPosition *msgpb.MsgPosition) UpdateOp
}
// UpdateCheckPointOperator updates segment checkpoint and num rows
-func UpdateCheckPointOperator(segmentID int64, checkpoints []*datapb.CheckPoint) UpdateOperator {
func UpdateCheckPointOperator(segmentID int64, checkpoints []*datapb.CheckPoint, skipDmlPositionCheck ...bool) UpdateOperator {
return func(modPack *updateSegmentPack) bool {
segment := modPack.Get(segmentID)
if segment == nil {
@@ -1070,7 +1122,9 @@ func UpdateCheckPointOperator(segmentID int64, checkpoints []*datapb.CheckPoint)
continue
}
-if segment.DmlPosition != nil && segment.DmlPosition.Timestamp >= cp.Position.Timestamp {
// skipDmlPositionCheck skips this check; it is instead performed in updateSegmentPack's Validate(), which fails the whole meta operation
// rather than only filtering out the checkpoint update.
if segment.DmlPosition != nil && segment.DmlPosition.Timestamp >= cp.Position.Timestamp && (len(skipDmlPositionCheck) == 0 || !skipDmlPositionCheck[0]) {
log.Ctx(context.TODO()).Warn("checkpoint in segment is larger than reported", zap.Any("current", segment.GetDmlPosition()), zap.Any("reported", cp.GetPosition()))
// segment position in etcd is larger than checkpoint, then dont change it
continue
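The new skipDmlPositionCheck parameter relies on Go's variadic idiom for an optional flag. A minimal sketch of the call-site semantics (hypothetical function, same condition shape as above):

package main

import "fmt"

func updateCheckpoint(newTs, metaTs uint64, skipDmlPositionCheck ...bool) string {
	skip := len(skipDmlPositionCheck) > 0 && skipDmlPositionCheck[0]
	if !skip && newTs <= metaTs {
		return "filtered: stale checkpoint"
	}
	return "applied"
}

func main() {
	fmt.Println(updateCheckpoint(100, 100))       // filtered: stale checkpoint
	fmt.Println(updateCheckpoint(100, 100, true)) // applied; Validate() rejects instead
}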
@@ -1164,6 +1218,11 @@ func (m *meta) UpdateSegmentsInfo(ctx context.Context, operators ...UpdateOperat
return nil
}
// Validate the update pack.
if err := updatePack.Validate(); err != nil {
return err
}
segments := lo.MapToSlice(updatePack.segments, func(_ int64, segment *SegmentInfo) *datapb.SegmentInfo { return segment.SegmentInfo })
increments := lo.Values(updatePack.increments)


@@ -940,6 +940,117 @@ func TestUpdateSegmentsInfo(t *testing.T) {
assert.Equal(t, updated.NumOfRows, expected.NumOfRows)
})
t.Run("update binlogs from save binlog paths", func(t *testing.T) {
meta, err := newMemoryMeta(t)
assert.NoError(t, err)
segment1 := NewSegmentInfo(&datapb.SegmentInfo{
ID: 1, State: commonpb.SegmentState_Growing,
Binlogs: []*datapb.FieldBinlog{},
Statslogs: []*datapb.FieldBinlog{},
})
err = meta.AddSegment(context.TODO(), segment1)
assert.NoError(t, err)
require.EqualValues(t, -1, segment1.deltaRowcount.Load())
assert.EqualValues(t, 0, segment1.getDeltaCount())
err = meta.UpdateSegmentsInfo(
context.TODO(),
UpdateStatusOperator(1, commonpb.SegmentState_Growing),
UpdateBinlogsFromSaveBinlogPathsOperator(1,
[]*datapb.FieldBinlog{getFieldBinlogIDsWithEntry(1, 10, 333)},
[]*datapb.FieldBinlog{getFieldBinlogIDs(1, 334)},
[]*datapb.FieldBinlog{{Binlogs: []*datapb.Binlog{{EntriesNum: 1, TimestampFrom: 100, TimestampTo: 200, LogSize: 1000, LogID: 335}}}},
[]*datapb.FieldBinlog{{Binlogs: []*datapb.Binlog{{EntriesNum: 1, TimestampFrom: 100, TimestampTo: 200, LogSize: 1000, LogID: 335}}}},
),
UpdateStartPosition([]*datapb.SegmentStartPosition{{SegmentID: 1, StartPosition: &msgpb.MsgPosition{MsgID: []byte{1, 2, 3}}}}),
UpdateCheckPointOperator(1, []*datapb.CheckPoint{{SegmentID: 1, NumOfRows: 10, Position: &msgpb.MsgPosition{MsgID: []byte{1, 2, 3}, Timestamp: 100}}}, true),
)
assert.NoError(t, err)
updated := meta.GetHealthySegment(context.TODO(), 1)
assert.EqualValues(t, -1, updated.deltaRowcount.Load())
assert.EqualValues(t, 1, updated.getDeltaCount())
assert.Equal(t, updated.StartPosition, &msgpb.MsgPosition{MsgID: []byte{1, 2, 3}})
assert.Equal(t, updated.DmlPosition, &msgpb.MsgPosition{MsgID: []byte{1, 2, 3}, Timestamp: 100})
assert.Equal(t, len(updated.Binlogs[0].Binlogs), 1)
assert.Equal(t, len(updated.Statslogs[0].Binlogs), 1)
assert.Equal(t, len(updated.Deltalogs[0].Binlogs), 1)
assert.Equal(t, len(updated.Bm25Statslogs[0].Binlogs), 1)
assert.Equal(t, updated.State, commonpb.SegmentState_Growing)
assert.Equal(t, updated.NumOfRows, int64(10))
err = meta.UpdateSegmentsInfo(
context.TODO(),
UpdateStatusOperator(1, commonpb.SegmentState_Growing),
UpdateBinlogsFromSaveBinlogPathsOperator(1, nil, nil, nil, nil),
UpdateStartPosition([]*datapb.SegmentStartPosition{{SegmentID: 1, StartPosition: &msgpb.MsgPosition{MsgID: []byte{1, 2, 3}}}}),
UpdateCheckPointOperator(1, []*datapb.CheckPoint{{SegmentID: 1, NumOfRows: 10, Position: &msgpb.MsgPosition{MsgID: []byte{1, 2, 3}, Timestamp: 99}}}, true),
)
assert.True(t, errors.Is(err, ErrIgnoredSegmentMetaOperation))
err = meta.UpdateSegmentsInfo(
context.TODO(),
UpdateStatusOperator(1, commonpb.SegmentState_Growing),
UpdateBinlogsFromSaveBinlogPathsOperator(1,
[]*datapb.FieldBinlog{getFieldBinlogIDsWithEntry(1, 10, 335, 337)},
[]*datapb.FieldBinlog{getFieldBinlogIDs(1, 336)},
[]*datapb.FieldBinlog{},
[]*datapb.FieldBinlog{},
),
UpdateStatusOperator(1, commonpb.SegmentState_Flushed),
UpdateStartPosition([]*datapb.SegmentStartPosition{{SegmentID: 1, StartPosition: &msgpb.MsgPosition{MsgID: []byte{1, 2, 3}}}}),
UpdateCheckPointOperator(1, []*datapb.CheckPoint{{SegmentID: 1, NumOfRows: 12, Position: &msgpb.MsgPosition{MsgID: []byte{1, 2, 3}, Timestamp: 101}}}, true),
)
assert.NoError(t, err)
updated = meta.GetHealthySegment(context.TODO(), 1)
assert.Equal(t, updated.NumOfRows, int64(20))
assert.Equal(t, updated.DmlPosition, &msgpb.MsgPosition{MsgID: []byte{1, 2, 3}, Timestamp: 101})
assert.Equal(t, len(updated.Binlogs[0].Binlogs), 2)
assert.Equal(t, len(updated.Statslogs[0].Binlogs), 1)
assert.Equal(t, len(updated.Deltalogs), 0)
assert.Equal(t, len(updated.Bm25Statslogs), 0)
assert.Equal(t, updated.State, commonpb.SegmentState_Flushed)
err = meta.UpdateSegmentsInfo(
context.TODO(),
UpdateStatusOperator(1, commonpb.SegmentState_Flushed),
UpdateBinlogsFromSaveBinlogPathsOperator(1,
[]*datapb.FieldBinlog{},
[]*datapb.FieldBinlog{},
[]*datapb.FieldBinlog{},
[]*datapb.FieldBinlog{}),
UpdateCheckPointOperator(1, []*datapb.CheckPoint{{SegmentID: 1, NumOfRows: 12, Position: &msgpb.MsgPosition{MsgID: []byte{1, 2, 3}, Timestamp: 101}}}, true),
)
assert.True(t, errors.Is(err, ErrIgnoredSegmentMetaOperation))
updated = meta.GetHealthySegment(context.TODO(), 1)
assert.Equal(t, updated.NumOfRows, int64(20))
assert.Equal(t, updated.DmlPosition, &msgpb.MsgPosition{MsgID: []byte{1, 2, 3}, Timestamp: 101})
assert.Equal(t, len(updated.Binlogs[0].Binlogs), 2)
assert.Equal(t, len(updated.Statslogs[0].Binlogs), 1)
assert.Equal(t, len(updated.Deltalogs), 0)
assert.Equal(t, len(updated.Bm25Statslogs), 0)
assert.Equal(t, updated.State, commonpb.SegmentState_Flushed)
err = meta.UpdateSegmentsInfo(
context.TODO(),
UpdateStatusOperator(1, commonpb.SegmentState_Dropped),
UpdateBinlogsFromSaveBinlogPathsOperator(1,
[]*datapb.FieldBinlog{},
[]*datapb.FieldBinlog{},
[]*datapb.FieldBinlog{},
[]*datapb.FieldBinlog{}),
UpdateCheckPointOperator(1, []*datapb.CheckPoint{{SegmentID: 1, NumOfRows: 12, Position: &msgpb.MsgPosition{MsgID: []byte{1, 2, 3}, Timestamp: 101}}}, true),
)
assert.NoError(t, err)
updated = meta.GetSegment(context.TODO(), 1)
assert.Equal(t, updated.State, commonpb.SegmentState_Dropped)
})
t.Run("update compacted segment", func(t *testing.T) { t.Run("update compacted segment", func(t *testing.T) {
meta, err := newMemoryMeta(t) meta, err := newMemoryMeta(t)
assert.NoError(t, err) assert.NoError(t, err)


@@ -16,7 +16,13 @@
package datacoord
-import "github.com/milvus-io/milvus/pkg/v2/proto/datapb"
import (
"github.com/cockroachdb/errors"
"github.com/milvus-io/milvus/pkg/v2/proto/datapb"
)
var ErrIgnoredSegmentMetaOperation = errors.New("ignored segment meta operation")
// reviseVChannelInfo will revise the datapb.VchannelInfo for upgrade compatibility from 2.0.2
func reviseVChannelInfo(vChannel *datapb.VchannelInfo) {


@@ -519,6 +519,7 @@ func (s *Server) SaveBinlogPaths(ctx context.Context, req *datapb.SaveBinlogPath
zap.Int64("collectionID", req.GetCollectionID()),
zap.Int64("segmentID", req.GetSegmentID()),
zap.String("level", req.GetSegLevel().String()),
zap.Bool("withFullBinlogs", req.GetWithFullBinlogs()),
)
log.Info("receive SaveBinlogPaths request",
@@ -569,11 +570,6 @@ func (s *Server) SaveBinlogPaths(ctx context.Context, req *datapb.SaveBinlogPath
return merr.Status(err), nil
}
-if segment.State == commonpb.SegmentState_Flushed && !req.Dropped {
-log.Info("save to flushed segment, ignore this request")
-return merr.Success(), nil
-}
if segment.State == commonpb.SegmentState_Dropped {
log.Info("save to dropped segment, ignore this request")
return merr.Success(), nil
@@ -603,25 +599,42 @@ func (s *Server) SaveBinlogPaths(ctx context.Context, req *datapb.SaveBinlogPath
}
}
if req.GetWithFullBinlogs() {
// the checkpoint check is executed during updateSegmentPack validation, so illegal checkpoint updates are ignored there.
operators = append(operators, UpdateBinlogsFromSaveBinlogPathsOperator(
req.GetSegmentID(),
req.GetField2BinlogPaths(),
req.GetField2StatslogPaths(),
req.GetDeltalogs(),
req.GetField2Bm25LogPaths(),
), UpdateCheckPointOperator(req.GetSegmentID(), req.GetCheckPoints(), true))
} else {
operators = append(operators, AddBinlogsOperator(req.GetSegmentID(), req.GetField2BinlogPaths(), req.GetField2StatslogPaths(), req.GetDeltalogs(), req.GetField2Bm25LogPaths()),
UpdateCheckPointOperator(req.GetSegmentID(), req.GetCheckPoints()))
}
// save binlogs, start positions and checkpoints
operators = append(operators,
-AddBinlogsOperator(req.GetSegmentID(), req.GetField2BinlogPaths(), req.GetField2StatslogPaths(), req.GetDeltalogs(), req.GetField2Bm25LogPaths()),
UpdateStartPosition(req.GetStartPositions()),
-UpdateCheckPointOperator(req.GetSegmentID(), req.GetCheckPoints()),
UpdateAsDroppedIfEmptyWhenFlushing(req.GetSegmentID()),
)
// Update segment info in memory and meta.
if err := s.meta.UpdateSegmentsInfo(ctx, operators...); err != nil {
-log.Error("save binlog and checkpoints failed", zap.Error(err))
-return merr.Status(err), nil
if !errors.Is(err, ErrIgnoredSegmentMetaOperation) {
log.Error("save binlog and checkpoints failed", zap.Error(err))
return merr.Status(err), nil
}
log.Info("save binlog and checkpoints failed with ignorable error", zap.Error(err))
}
s.meta.SetLastWrittenTime(req.GetSegmentID())
log.Info("SaveBinlogPaths sync segment with meta",
-zap.Any("binlogs", req.GetField2BinlogPaths()),
-zap.Any("deltalogs", req.GetDeltalogs()),
-zap.Any("statslogs", req.GetField2StatslogPaths()),
zap.Any("checkpoints", req.GetCheckPoints()),
zap.Strings("binlogs", stringifyBinlogs(req.GetField2BinlogPaths())),
zap.Strings("deltalogs", stringifyBinlogs(req.GetDeltalogs())),
zap.Strings("statslogs", stringifyBinlogs(req.GetField2StatslogPaths())),
zap.Strings("bm25logs", stringifyBinlogs(req.GetField2Bm25LogPaths())),
)
if req.GetSegLevel() == datapb.SegmentLevel_L0 {
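Taken together, the handler now routes a report in one of three ways; a condensed sketch of my reading of this diff, with plain bools standing in for request and meta state:

package main

import "fmt"

func route(withFullBinlogs, flushedInMeta, staleTimeTick bool) string {
	if !withFullBinlogs {
		return "append binlogs; silently filter stale checkpoints (legacy path)"
	}
	if flushedInMeta || staleTimeTick {
		return "ignore whole update via ErrIgnoredSegmentMetaOperation, reply success"
	}
	return "replace the full binlog set and checkpoint"
}

func main() {
	fmt.Println(route(true, false, true))  // stale retry is a no-op
	fmt.Println(route(true, false, false)) // fresh report replaces the set
}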


@@ -373,6 +373,7 @@ func (s *ServerSuite) TestSaveBinlogPath_NormalCase() {
0: 0,
1: 0,
2: 0,
3: 0,
}
for segID, collID := range segments {
info := &datapb.SegmentInfo{
@@ -475,6 +476,194 @@ func (s *ServerSuite) TestSaveBinlogPath_NormalCase() {
segment = s.testServer.meta.GetSegment(context.TODO(), 2)
s.NotNil(segment)
s.Equal(commonpb.SegmentState_Dropped, segment.GetState())
resp, err = s.testServer.SaveBinlogPaths(ctx, &datapb.SaveBinlogPathsRequest{
Base: &commonpb.MsgBase{
Timestamp: uint64(time.Now().Unix()),
},
SegmentID: 3,
CollectionID: 0,
Channel: "ch1",
Field2BinlogPaths: []*datapb.FieldBinlog{
{
FieldID: 1,
Binlogs: []*datapb.Binlog{
{
LogPath: "/by-dev/test/0/1/1/1/1",
EntriesNum: 5,
},
{
LogPath: "/by-dev/test/0/1/1/1/2",
EntriesNum: 5,
},
},
},
},
Field2StatslogPaths: []*datapb.FieldBinlog{
{
FieldID: 1,
Binlogs: []*datapb.Binlog{
{
LogPath: "/by-dev/test_stats/0/1/1/1/1",
EntriesNum: 5,
},
{
LogPath: "/by-dev/test_stats/0/1/1/1/2",
EntriesNum: 5,
},
},
},
},
CheckPoints: []*datapb.CheckPoint{
{
SegmentID: 3,
Position: &msgpb.MsgPosition{
ChannelName: "ch1",
MsgID: []byte{1, 2, 3},
MsgGroup: "",
Timestamp: 0,
},
NumOfRows: 12,
},
},
Flushed: false,
WithFullBinlogs: true,
})
s.NoError(err)
s.EqualValues(resp.ErrorCode, commonpb.ErrorCode_Success)
segment = s.testServer.meta.GetHealthySegment(context.TODO(), 3)
s.NotNil(segment)
binlogs = segment.GetBinlogs()
s.EqualValues(1, len(binlogs))
fieldBinlogs = binlogs[0]
s.NotNil(fieldBinlogs)
s.EqualValues(2, len(fieldBinlogs.GetBinlogs()))
s.EqualValues(1, fieldBinlogs.GetFieldID())
s.EqualValues("", fieldBinlogs.GetBinlogs()[0].GetLogPath())
s.EqualValues(int64(1), fieldBinlogs.GetBinlogs()[0].GetLogID())
s.EqualValues("", fieldBinlogs.GetBinlogs()[1].GetLogPath())
s.EqualValues(int64(2), fieldBinlogs.GetBinlogs()[1].GetLogID())
resp, err = s.testServer.SaveBinlogPaths(ctx, &datapb.SaveBinlogPathsRequest{
Base: &commonpb.MsgBase{
Timestamp: uint64(time.Now().Unix()),
},
SegmentID: 3,
CollectionID: 0,
Channel: "ch1",
Field2BinlogPaths: []*datapb.FieldBinlog{
{
FieldID: 1,
Binlogs: []*datapb.Binlog{
{
LogPath: "/by-dev/test/0/1/1/1/1",
EntriesNum: 5,
},
{
LogPath: "/by-dev/test/0/1/1/1/2",
EntriesNum: 5,
},
{
LogPath: "/by-dev/test/0/1/1/1/3",
EntriesNum: 5,
},
},
},
},
Field2StatslogPaths: []*datapb.FieldBinlog{
{
FieldID: 1,
Binlogs: []*datapb.Binlog{
{
LogPath: "/by-dev/test_stats/0/1/1/1/1",
EntriesNum: 5,
},
{
LogPath: "/by-dev/test_stats/0/1/1/1/2",
EntriesNum: 5,
},
{
LogPath: "/by-dev/test_stats/0/1/1/1/3",
EntriesNum: 5,
},
},
},
},
CheckPoints: []*datapb.CheckPoint{
{
SegmentID: 3,
Position: &msgpb.MsgPosition{
ChannelName: "ch1",
MsgID: []byte{1, 2, 3},
MsgGroup: "",
Timestamp: 1,
},
NumOfRows: 12,
},
},
Flushed: false,
WithFullBinlogs: true,
})
s.NoError(err)
s.EqualValues(resp.ErrorCode, commonpb.ErrorCode_Success)
segment = s.testServer.meta.GetHealthySegment(context.TODO(), 3)
s.NotNil(segment)
binlogs = segment.GetBinlogs()
s.EqualValues(1, len(binlogs))
fieldBinlogs = binlogs[0]
s.NotNil(fieldBinlogs)
s.EqualValues(3, len(fieldBinlogs.GetBinlogs()))
s.EqualValues(1, fieldBinlogs.GetFieldID())
s.EqualValues("", fieldBinlogs.GetBinlogs()[0].GetLogPath())
s.EqualValues(int64(1), fieldBinlogs.GetBinlogs()[0].GetLogID())
s.EqualValues("", fieldBinlogs.GetBinlogs()[1].GetLogPath())
s.EqualValues(int64(2), fieldBinlogs.GetBinlogs()[1].GetLogID())
s.EqualValues("", fieldBinlogs.GetBinlogs()[2].GetLogPath())
s.EqualValues(int64(3), fieldBinlogs.GetBinlogs()[2].GetLogID())
resp, err = s.testServer.SaveBinlogPaths(ctx, &datapb.SaveBinlogPathsRequest{
Base: &commonpb.MsgBase{
Timestamp: uint64(time.Now().Unix()),
},
SegmentID: 3,
CollectionID: 0,
Channel: "ch1",
Field2BinlogPaths: []*datapb.FieldBinlog{},
Field2StatslogPaths: []*datapb.FieldBinlog{},
CheckPoints: []*datapb.CheckPoint{
{
SegmentID: 3,
Position: &msgpb.MsgPosition{
ChannelName: "ch1",
MsgID: []byte{1, 2, 3},
MsgGroup: "",
Timestamp: 0,
},
NumOfRows: 12,
},
},
Flushed: false,
WithFullBinlogs: true,
})
s.NoError(err)
s.EqualValues(resp.ErrorCode, commonpb.ErrorCode_Success)
segment = s.testServer.meta.GetHealthySegment(context.TODO(), 3)
s.NotNil(segment)
binlogs = segment.GetBinlogs()
s.EqualValues(1, len(binlogs))
fieldBinlogs = binlogs[0]
s.NotNil(fieldBinlogs)
s.EqualValues(3, len(fieldBinlogs.GetBinlogs()))
s.EqualValues(1, fieldBinlogs.GetFieldID())
s.EqualValues("", fieldBinlogs.GetBinlogs()[0].GetLogPath())
s.EqualValues(int64(1), fieldBinlogs.GetBinlogs()[0].GetLogID())
s.EqualValues("", fieldBinlogs.GetBinlogs()[1].GetLogPath())
s.EqualValues(int64(2), fieldBinlogs.GetBinlogs()[1].GetLogID())
s.EqualValues("", fieldBinlogs.GetBinlogs()[2].GetLogPath())
s.EqualValues(int64(3), fieldBinlogs.GetBinlogs()[2].GetLogID())
}
func (s *ServerSuite) TestFlush_NormalCase() {


@@ -410,3 +410,23 @@ func calculateStatsTaskSlot(segmentSize int64) int64 {
func enableSortCompaction() bool {
return paramtable.Get().DataCoordCfg.EnableSortCompaction.GetAsBool() && paramtable.Get().DataCoordCfg.EnableCompaction.GetAsBool()
}
// stringifyBinlogs renders binlogs in a compact form for logging; it is not used for anything else.
func stringifyBinlogs(binlogs []*datapb.FieldBinlog) []string {
strs := make([]string, 0, len(binlogs))
byIDs := lo.GroupBy(binlogs, func(binlog *datapb.FieldBinlog) int64 {
return binlog.GetFieldID()
})
for _, binlogs := range byIDs {
fieldsStrs := make([]string, 0, len(binlogs))
for _, binlog := range binlogs {
for _, b := range binlog.GetBinlogs() {
fieldsStrs = append(fieldsStrs,
fmt.Sprintf("l%d(e%d,m%d,t%d-%d)", b.LogID, b.EntriesNum, b.MemorySize, b.TimestampFrom, b.TimestampTo),
)
}
}
strs = append(strs, fmt.Sprintf("f%d:%s", binlogs[0].GetFieldID(), strings.Join(fieldsStrs, "|")))
}
return strs
}
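For reference, here is a standalone rendering of the compact format emitted above, with a hypothetical flat struct standing in for datapb.FieldBinlog:

package main

import (
	"fmt"
	"strings"
)

type binlog struct{ logID, entries, memSize, tsFrom, tsTo int64 }

func render(fieldID int64, logs []binlog) string {
	parts := make([]string, 0, len(logs))
	for _, b := range logs {
		parts = append(parts, fmt.Sprintf("l%d(e%d,m%d,t%d-%d)",
			b.logID, b.entries, b.memSize, b.tsFrom, b.tsTo))
	}
	return fmt.Sprintf("f%d:%s", fieldID, strings.Join(parts, "|"))
}

func main() {
	fmt.Println(render(1, []binlog{{335, 10, 1024, 100, 200}}))
	// Output: f1:l335(e10,m1024,t100-200)
}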


@@ -137,6 +137,30 @@ func SegmentActions(actions ...SegmentAction) SegmentAction {
}
}
func UpdateBinlogs(binlogs []*datapb.FieldBinlog) SegmentAction {
return func(info *SegmentInfo) {
info.binlogs = binlogs
}
}
func UpdateStatslogs(statslogs []*datapb.FieldBinlog) SegmentAction {
return func(info *SegmentInfo) {
info.statslogs = statslogs
}
}
func UpdateDeltalogs(deltalogs []*datapb.FieldBinlog) SegmentAction {
return func(info *SegmentInfo) {
info.deltalogs = deltalogs
}
}
func UpdateBm25logs(bm25logs []*datapb.FieldBinlog) SegmentAction {
return func(info *SegmentInfo) {
info.bm25logs = bm25logs
}
}
func UpdateState(state commonpb.SegmentState) SegmentAction {
return func(info *SegmentInfo) {
info.state = state


@@ -23,6 +23,7 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus/pkg/v2/proto/datapb"
)
type SegmentFilterSuite struct {
@@ -95,11 +96,40 @@ func (s *SegmentActionSuite) TestActions() {
s.Equal(numOfRows, info.NumOfRows())
info = &SegmentInfo{}
-actions := SegmentActions(UpdateState(state), UpdateCheckpoint(cp), UpdateNumOfRows(numOfRows))
actions := SegmentActions(UpdateState(state), UpdateCheckpoint(cp), UpdateNumOfRows(numOfRows),
UpdateBinlogs([]*datapb.FieldBinlog{
{
FieldID: 1,
Binlogs: []*datapb.Binlog{{LogID: 1, LogPath: "test"}},
},
}),
UpdateStatslogs([]*datapb.FieldBinlog{
{
FieldID: 1,
Binlogs: []*datapb.Binlog{{LogID: 1, LogPath: "test"}},
},
}),
UpdateDeltalogs([]*datapb.FieldBinlog{
{
FieldID: 1,
Binlogs: []*datapb.Binlog{{LogID: 1, LogPath: "test"}},
},
}),
UpdateBm25logs([]*datapb.FieldBinlog{
{
FieldID: 1,
Binlogs: []*datapb.Binlog{{LogID: 1, LogPath: "test"}},
},
}),
)
actions(info)
s.Equal(state, info.State())
s.Equal(cp, info.Checkpoint())
s.Equal(numOfRows, info.NumOfRows())
s.Equal(1, len(info.Binlogs()))
s.Equal(1, len(info.Statslogs()))
s.Equal(1, len(info.Deltalogs()))
s.Equal(1, len(info.Bm25logs()))
}
func (s *SegmentActionSuite) TestMergeActions() {


@@ -39,6 +39,10 @@ type SegmentInfo struct {
level datapb.SegmentLevel
syncingTasks int32
storageVersion int64
binlogs []*datapb.FieldBinlog
statslogs []*datapb.FieldBinlog
deltalogs []*datapb.FieldBinlog
bm25logs []*datapb.FieldBinlog
}
func (s *SegmentInfo) SegmentID() int64 {
@@ -100,6 +104,22 @@ func (s *SegmentInfo) GetStorageVersion() int64 {
return s.storageVersion
}
func (s *SegmentInfo) Binlogs() []*datapb.FieldBinlog {
return s.binlogs
}
func (s *SegmentInfo) Statslogs() []*datapb.FieldBinlog {
return s.statslogs
}
func (s *SegmentInfo) Deltalogs() []*datapb.FieldBinlog {
return s.deltalogs
}
func (s *SegmentInfo) Bm25logs() []*datapb.FieldBinlog {
return s.bm25logs
}
func (s *SegmentInfo) Clone() *SegmentInfo {
return &SegmentInfo{
segmentID: s.segmentID,
@@ -116,6 +136,10 @@ func (s *SegmentInfo) Clone() *SegmentInfo {
syncingTasks: s.syncingTasks,
bm25stats: s.bm25stats,
storageVersion: s.storageVersion,
binlogs: s.binlogs,
statslogs: s.statslogs,
deltalogs: s.deltalogs,
bm25logs: s.bm25logs,
}
}
@@ -136,5 +160,9 @@ func NewSegmentInfo(info *datapb.SegmentInfo, bfs pkoracle.PkStat, bm25Stats *Se
bfs: bfs,
bm25stats: bm25Stats,
storageVersion: info.GetStorageVersion(),
binlogs: info.GetBinlogs(),
statslogs: info.GetStatslogs(),
deltalogs: info.GetDeltalogs(),
bm25logs: info.GetBm25Statslogs(),
}
}
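Clone above copies the slice headers rather than deep-copying the FieldBinlog elements, which stays safe as long as updates replace whole slices, as the new UpdateBinlogs-style actions do. A minimal sketch:

package main

import "fmt"

type info struct{ binlogs []string }

func (s *info) clone() *info { return &info{binlogs: s.binlogs} }

func main() {
	a := &info{binlogs: []string{"l1"}}
	b := a.clone()
	a.binlogs = []string{"l1", "l2"} // replacement, not in-place mutation
	fmt.Println(b.binlogs)           // [l1]: the clone is unaffected
}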


@@ -56,6 +56,11 @@ func (s *SegmentSuite) TestClone() {
s.Equal(segment.Checkpoint(), cloned.Checkpoint())
s.Equal(segment.GetHistory(), cloned.GetHistory())
s.Equal(segment.startPosRecorded, cloned.startPosRecorded)
s.Equal(segment.Binlogs(), cloned.Binlogs())
s.Equal(segment.Statslogs(), cloned.Statslogs())
s.Equal(segment.Deltalogs(), cloned.Deltalogs())
s.Equal(segment.Bm25logs(), cloned.Bm25logs())
s.Equal(segment.GetBM25Stats(), cloned.GetBM25Stats())
}
func TestSegment(t *testing.T) {


@@ -38,26 +38,26 @@ func BrokerMetaWriter(broker broker.Broker, serverID int64, opts ...retry.Option
}
func (b *brokerMetaWriter) UpdateSync(ctx context.Context, pack *SyncTask) error {
-var (
-checkPoints = []*datapb.CheckPoint{}
-deltaFieldBinlogs = []*datapb.FieldBinlog{}
-deltaBm25StatsBinlogs []*datapb.FieldBinlog = nil
-)
-insertFieldBinlogs := lo.MapToSlice(pack.insertBinlogs, func(_ int64, fieldBinlog *datapb.FieldBinlog) *datapb.FieldBinlog { return fieldBinlog })
-statsFieldBinlogs := lo.MapToSlice(pack.statsBinlogs, func(_ int64, fieldBinlog *datapb.FieldBinlog) *datapb.FieldBinlog { return fieldBinlog })
-if pack.deltaBinlog != nil && len(pack.deltaBinlog.Binlogs) > 0 {
-deltaFieldBinlogs = append(deltaFieldBinlogs, pack.deltaBinlog)
-}
-if len(pack.bm25Binlogs) > 0 {
-deltaBm25StatsBinlogs = lo.MapToSlice(pack.bm25Binlogs, func(_ int64, fieldBinlog *datapb.FieldBinlog) *datapb.FieldBinlog { return fieldBinlog })
-}
checkPoints := []*datapb.CheckPoint{}
// only current segment checkpoint info
segment, ok := pack.metacache.GetSegmentByID(pack.segmentID)
if !ok {
return merr.WrapErrSegmentNotFound(pack.segmentID)
}
insertFieldBinlogs := append(segment.Binlogs(), lo.MapToSlice(pack.insertBinlogs, func(_ int64, fieldBinlog *datapb.FieldBinlog) *datapb.FieldBinlog { return fieldBinlog })...)
statsFieldBinlogs := append(segment.Statslogs(), lo.MapToSlice(pack.statsBinlogs, func(_ int64, fieldBinlog *datapb.FieldBinlog) *datapb.FieldBinlog { return fieldBinlog })...)
deltaFieldBinlogs := segment.Deltalogs()
if pack.deltaBinlog != nil && len(pack.deltaBinlog.Binlogs) > 0 {
deltaFieldBinlogs = append(deltaFieldBinlogs, pack.deltaBinlog)
}
deltaBm25StatsBinlogs := segment.Bm25logs()
if len(pack.bm25Binlogs) > 0 {
deltaBm25StatsBinlogs = append(segment.Bm25logs(), lo.MapToSlice(pack.bm25Binlogs, func(_ int64, fieldBinlog *datapb.FieldBinlog) *datapb.FieldBinlog { return fieldBinlog })...)
}
checkPoints = append(checkPoints, &datapb.CheckPoint{
SegmentID: pack.segmentID,
NumOfRows: segment.FlushedRows() + pack.batchRows,
@@ -110,12 +110,13 @@ func (b *brokerMetaWriter) UpdateSync(ctx context.Context, pack *SyncTask) error
CheckPoints: checkPoints,
StartPositions: startPos,
Flushed: pack.pack.isFlush,
Dropped: pack.pack.isDrop,
Channel: pack.channelName,
SegLevel: pack.level,
StorageVersion: segment.GetStorageVersion(),
WithFullBinlogs: true,
}
err := retry.Handle(ctx, func() (bool, error) {
err := b.broker.SaveBinlogPaths(ctx, req)
@@ -149,7 +150,12 @@ func (b *brokerMetaWriter) UpdateSync(ctx context.Context, pack *SyncTask) error
}
pack.metacache.UpdateSegments(metacache.SetStartPosRecorded(true), metacache.WithSegmentIDs(lo.Map(startPos, func(pos *datapb.SegmentStartPosition, _ int) int64 { return pos.GetSegmentID() })...))
pack.metacache.UpdateSegments(metacache.MergeSegmentAction(
metacache.UpdateBinlogs(insertFieldBinlogs),
metacache.UpdateStatslogs(statsFieldBinlogs),
metacache.UpdateDeltalogs(deltaFieldBinlogs),
metacache.UpdateBm25logs(deltaBm25StatsBinlogs),
), metacache.WithSegmentIDs(pack.segmentID))
return nil
}
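The net effect of this hunk: the writer reports the cached logs plus the new batch and, only on success, stores the union back into the metacache, so every report carries the full set. A sketch with hypothetical types:

package main

import "fmt"

type segCache struct{ binlogs []string }

func reportFull(c *segCache, batch []string, save func([]string) error) error {
	full := append(append([]string(nil), c.binlogs...), batch...)
	if err := save(full); err != nil {
		return err // cache unchanged: the retry resends the same full set
	}
	c.binlogs = full
	return nil
}

func main() {
	c := &segCache{binlogs: []string{"l1"}}
	_ = reportFull(c, []string{"l2"}, func(full []string) error {
		fmt.Println(full) // [l1 l2]
		return nil
	})
}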


@@ -38,15 +38,49 @@ func (s *MetaWriterSuite) SetupTest() {
func (s *MetaWriterSuite) TestNormalSave() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
-s.broker.EXPECT().SaveBinlogPaths(mock.Anything, mock.Anything).Return(nil)
bfs := pkoracle.NewBloomFilterSet()
-seg := metacache.NewSegmentInfo(&datapb.SegmentInfo{}, bfs, nil)
seg := metacache.NewSegmentInfo(&datapb.SegmentInfo{
ID: 1,
Binlogs: []*datapb.FieldBinlog{
{
FieldID: 1,
Binlogs: []*datapb.Binlog{{LogID: 1, LogPath: "test"}},
},
},
Statslogs: []*datapb.FieldBinlog{
{
FieldID: 1,
Binlogs: []*datapb.Binlog{{LogID: 1, LogPath: "test"}},
},
},
Deltalogs: []*datapb.FieldBinlog{
{
FieldID: 1,
Binlogs: []*datapb.Binlog{{LogID: 1, LogPath: "test"}},
},
},
Bm25Statslogs: []*datapb.FieldBinlog{
{
FieldID: 1,
Binlogs: []*datapb.Binlog{{LogID: 1, LogPath: "test"}},
},
},
}, bfs, nil)
metacache.UpdateNumOfRows(1000)(seg)
s.metacache.EXPECT().GetSegmentsBy(mock.Anything, mock.Anything, mock.Anything).Return([]*metacache.SegmentInfo{seg})
s.metacache.EXPECT().GetSegmentByID(mock.Anything).Return(seg, true)
s.metacache.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Return()
task := NewSyncTask().WithMetaCache(s.metacache).WithSyncPack(new(SyncPack))
s.broker.EXPECT().SaveBinlogPaths(mock.Anything, mock.Anything).RunAndReturn(
func(ctx context.Context, req *datapb.SaveBinlogPathsRequest) error {
s.Equal(1, len(req.Field2BinlogPaths))
s.Equal(1, len(req.Field2Bm25LogPaths))
s.Equal(1, len(req.Field2StatslogPaths))
s.Equal(1, len(req.Deltalogs))
return nil
})
err := s.writer.UpdateSync(ctx, task)
s.NoError(err)
}


@@ -195,6 +195,8 @@ func (bw *BulkPackWriter) writeInserts(ctx context.Context, pack *SyncPack) (map
func (bw *BulkPackWriter) writeStats(ctx context.Context, pack *SyncPack) (map[int64]*datapb.FieldBinlog, error) {
if len(pack.insertData) == 0 {
// TODO: we should not skip here; if the flush operation doesn't carry any insert data,
// the merge stats operation is skipped, which is a bad case.
return make(map[int64]*datapb.FieldBinlog), nil
}
serializer, err := NewStorageSerializer(bw.metaCache, bw.schema)
@@ -242,6 +244,8 @@ func (bw *BulkPackWriter) writeBM25Stasts(ctx context.Context, pack *SyncPack) (map[i
func (bw *BulkPackWriter) writeBM25Stasts(ctx context.Context, pack *SyncPack) (map[int64]*datapb.FieldBinlog, error) {
if len(pack.bm25Stats) == 0 {
// TODO: we should not skip here; if the flush operation doesn't carry any insert data,
// the merge stats operation is skipped, which is a bad case.
return make(map[int64]*datapb.FieldBinlog), nil
}


@ -397,6 +397,7 @@ message SaveBinlogPathsRequest {
int64 partitionID =14; // report partitionID for create L0 segment int64 partitionID =14; // report partitionID for create L0 segment
int64 storageVersion = 15; int64 storageVersion = 15;
repeated FieldBinlog field2Bm25logPaths = 16; repeated FieldBinlog field2Bm25logPaths = 16;
bool with_full_binlogs = 17; // report with full data for verification.
}
message CheckPoint {
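A compatibility note (my inference, not stated in the diff): proto3 bools default to false, so an older sender that never sets with_full_binlogs keeps the legacy append behavior. A standalone sketch of that default:

package main

import "fmt"

type saveBinlogPathsRequest struct{ WithFullBinlogs bool } // stand-in for the generated type

func main() {
	var req saveBinlogPathsRequest   // field left unset, as by an old sender
	fmt.Println(req.WithFullBinlogs) // false -> legacy append path
}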

File diff suppressed because it is too large.