mirror of
https://gitee.com/milvus-io/milvus.git
synced 2026-01-07 19:31:51 +08:00
Signed-off-by: Letian Jiang <letian.jiang@zilliz.com>
This commit is contained in:
parent
3f34bc1ab7
commit
2be217bcf4
@ -170,7 +170,8 @@ func (dsService *dataSyncService) initNodes(vchanInfo *datapb.VchannelInfo) erro
|
||||
pos: *us.GetDmlPosition(),
|
||||
}
|
||||
}
|
||||
if err := dsService.replica.addNormalSegment(us.GetID(), us.CollectionID, us.PartitionID, us.GetInsertChannel(), us.GetNumOfRows(), us.Statslogs, cp); err != nil {
|
||||
if err := dsService.replica.addNormalSegment(us.GetID(), us.CollectionID, us.PartitionID, us.GetInsertChannel(),
|
||||
us.GetNumOfRows(), us.Statslogs, cp, vchanInfo.GetSeekPosition().GetTimestamp()); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
@ -192,8 +193,8 @@ func (dsService *dataSyncService) initNodes(vchanInfo *datapb.VchannelInfo) erro
|
||||
zap.Int64("SegmentID", fs.GetID()),
|
||||
zap.Int64("NumOfRows", fs.GetNumOfRows()),
|
||||
)
|
||||
if err := dsService.replica.addFlushedSegment(fs.GetID(), fs.CollectionID,
|
||||
fs.PartitionID, fs.GetInsertChannel(), fs.GetNumOfRows(), fs.Statslogs); err != nil {
|
||||
if err := dsService.replica.addFlushedSegment(fs.GetID(), fs.CollectionID, fs.PartitionID, fs.GetInsertChannel(),
|
||||
fs.GetNumOfRows(), fs.Statslogs, vchanInfo.GetSeekPosition().GetTimestamp()); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
@ -52,9 +52,9 @@ type Replica interface {
|
||||
|
||||
listAllSegmentIDs() []UniqueID
|
||||
addNewSegment(segID, collID, partitionID UniqueID, channelName string, startPos, endPos *internalpb.MsgPosition) error
|
||||
addNormalSegment(segID, collID, partitionID UniqueID, channelName string, numOfRows int64, statsBinlog []*datapb.FieldBinlog, cp *segmentCheckPoint) error
|
||||
addNormalSegment(segID, collID, partitionID UniqueID, channelName string, numOfRows int64, statsBinlog []*datapb.FieldBinlog, cp *segmentCheckPoint, recoverTs Timestamp) error
|
||||
filterSegments(channelName string, partitionID UniqueID) []*Segment
|
||||
addFlushedSegment(segID, collID, partitionID UniqueID, channelName string, numOfRows int64, statsBinlog []*datapb.FieldBinlog) error
|
||||
addFlushedSegment(segID, collID, partitionID UniqueID, channelName string, numOfRows int64, statsBinlog []*datapb.FieldBinlog, recoverTs Timestamp) error
|
||||
listNewSegmentsStartPositions() []*datapb.SegmentStartPosition
|
||||
listSegmentsCheckPoints() map[UniqueID]segmentCheckPoint
|
||||
updateSegmentEndPosition(segID UniqueID, endPos *internalpb.MsgPosition)
|
||||
@ -322,7 +322,7 @@ func (replica *SegmentReplica) filterSegments(channelName string, partitionID Un
|
||||
|
||||
// addNormalSegment adds a *NotNew* and *NotFlushed* segment. Before add, please make sure there's no
|
||||
// such segment by `hasSegment`
|
||||
func (replica *SegmentReplica) addNormalSegment(segID, collID, partitionID UniqueID, channelName string, numOfRows int64, statsBinlogs []*datapb.FieldBinlog, cp *segmentCheckPoint) error {
|
||||
func (replica *SegmentReplica) addNormalSegment(segID, collID, partitionID UniqueID, channelName string, numOfRows int64, statsBinlogs []*datapb.FieldBinlog, cp *segmentCheckPoint, recoverTs Timestamp) error {
|
||||
if collID != replica.collectionID {
|
||||
log.Warn("Mismatch collection",
|
||||
zap.Int64("input ID", collID),
|
||||
@ -352,7 +352,7 @@ func (replica *SegmentReplica) addNormalSegment(segID, collID, partitionID Uniqu
|
||||
seg.checkPoint = *cp
|
||||
seg.endPos = &cp.pos
|
||||
}
|
||||
err := replica.initPKBloomFilter(seg, statsBinlogs)
|
||||
err := replica.initPKBloomFilter(seg, statsBinlogs, recoverTs)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -370,7 +370,7 @@ func (replica *SegmentReplica) addNormalSegment(segID, collID, partitionID Uniqu
|
||||
|
||||
// addFlushedSegment adds a *Flushed* segment. Before add, please make sure there's no
|
||||
// such segment by `hasSegment`
|
||||
func (replica *SegmentReplica) addFlushedSegment(segID, collID, partitionID UniqueID, channelName string, numOfRows int64, statsBinlogs []*datapb.FieldBinlog) error {
|
||||
func (replica *SegmentReplica) addFlushedSegment(segID, collID, partitionID UniqueID, channelName string, numOfRows int64, statsBinlogs []*datapb.FieldBinlog, recoverTs Timestamp) error {
|
||||
|
||||
if collID != replica.collectionID {
|
||||
log.Warn("Mismatch collection",
|
||||
@ -399,7 +399,7 @@ func (replica *SegmentReplica) addFlushedSegment(segID, collID, partitionID Uniq
|
||||
maxPK: math.MinInt64, // use min value represents no value
|
||||
}
|
||||
|
||||
err := replica.initPKBloomFilter(seg, statsBinlogs)
|
||||
err := replica.initPKBloomFilter(seg, statsBinlogs, recoverTs)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -414,9 +414,9 @@ func (replica *SegmentReplica) addFlushedSegment(segID, collID, partitionID Uniq
|
||||
return nil
|
||||
}
|
||||
|
||||
func (replica *SegmentReplica) initPKBloomFilter(s *Segment, statsBinlogs []*datapb.FieldBinlog) error {
|
||||
func (replica *SegmentReplica) initPKBloomFilter(s *Segment, statsBinlogs []*datapb.FieldBinlog, ts Timestamp) error {
|
||||
log.Info("begin to init pk bloom filter", zap.Int("stats bin logs", len(statsBinlogs)))
|
||||
schema, err := replica.getCollectionSchema(s.collectionID, 0)
|
||||
schema, err := replica.getCollectionSchema(s.collectionID, ts)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@ -340,7 +340,7 @@ func TestSegmentReplica_InterfaceMethod(t *testing.T) {
|
||||
sr.minIOKV = &mockMinioKV{}
|
||||
assert.Nil(t, err)
|
||||
require.False(t, sr.hasSegment(test.inSegID, true))
|
||||
err = sr.addNormalSegment(test.inSegID, test.inCollID, 1, "", 0, []*datapb.FieldBinlog{getSimpleFieldBinlog()}, &segmentCheckPoint{})
|
||||
err = sr.addNormalSegment(test.inSegID, test.inCollID, 1, "", 0, []*datapb.FieldBinlog{getSimpleFieldBinlog()}, &segmentCheckPoint{}, 0)
|
||||
if test.isValidCase {
|
||||
assert.NoError(t, err)
|
||||
assert.True(t, sr.hasSegment(test.inSegID, true))
|
||||
@ -361,7 +361,7 @@ func TestSegmentReplica_InterfaceMethod(t *testing.T) {
|
||||
segID := int64(101)
|
||||
require.False(t, sr.hasSegment(segID, true))
|
||||
assert.NotPanics(t, func() {
|
||||
err = sr.addNormalSegment(segID, 1, 10, "empty_dml_chan", 0, []*datapb.FieldBinlog{}, nil)
|
||||
err = sr.addNormalSegment(segID, 1, 10, "empty_dml_chan", 0, []*datapb.FieldBinlog{}, nil, 0)
|
||||
assert.NoError(t, err)
|
||||
})
|
||||
})
|
||||
@ -587,9 +587,9 @@ func TestSegmentReplica_InterfaceMethod(t *testing.T) {
|
||||
|
||||
cpPos := &internalpb.MsgPosition{ChannelName: "insert-01", Timestamp: Timestamp(10)}
|
||||
cp := &segmentCheckPoint{int64(10), *cpPos}
|
||||
err = sr.addNormalSegment(1, 1, 2, "insert-01", int64(10), []*datapb.FieldBinlog{getSimpleFieldBinlog()}, cp)
|
||||
err = sr.addNormalSegment(1, 1, 2, "insert-01", int64(10), []*datapb.FieldBinlog{getSimpleFieldBinlog()}, cp, 0)
|
||||
assert.NotNil(t, err)
|
||||
err = sr.addFlushedSegment(1, 1, 2, "insert-01", int64(0), []*datapb.FieldBinlog{getSimpleFieldBinlog()})
|
||||
err = sr.addFlushedSegment(1, 1, 2, "insert-01", int64(0), []*datapb.FieldBinlog{getSimpleFieldBinlog()}, 0)
|
||||
assert.NotNil(t, err)
|
||||
})
|
||||
|
||||
@ -600,9 +600,9 @@ func TestSegmentReplica_InterfaceMethod(t *testing.T) {
|
||||
|
||||
cpPos := &internalpb.MsgPosition{ChannelName: "insert-01", Timestamp: Timestamp(10)}
|
||||
cp := &segmentCheckPoint{int64(10), *cpPos}
|
||||
err = sr.addNormalSegment(1, 1, 2, "insert-01", int64(10), []*datapb.FieldBinlog{getSimpleFieldBinlog()}, cp)
|
||||
err = sr.addNormalSegment(1, 1, 2, "insert-01", int64(10), []*datapb.FieldBinlog{getSimpleFieldBinlog()}, cp, 0)
|
||||
assert.NotNil(t, err)
|
||||
err = sr.addFlushedSegment(1, 1, 2, "insert-01", int64(0), []*datapb.FieldBinlog{getSimpleFieldBinlog()})
|
||||
err = sr.addFlushedSegment(1, 1, 2, "insert-01", int64(0), []*datapb.FieldBinlog{getSimpleFieldBinlog()}, 0)
|
||||
assert.NotNil(t, err)
|
||||
})
|
||||
|
||||
@ -613,9 +613,9 @@ func TestSegmentReplica_InterfaceMethod(t *testing.T) {
|
||||
|
||||
cpPos := &internalpb.MsgPosition{ChannelName: "insert-01", Timestamp: Timestamp(10)}
|
||||
cp := &segmentCheckPoint{int64(10), *cpPos}
|
||||
err = sr.addNormalSegment(1, 1, 2, "insert-01", int64(10), []*datapb.FieldBinlog{getSimpleFieldBinlog()}, cp)
|
||||
err = sr.addNormalSegment(1, 1, 2, "insert-01", int64(10), []*datapb.FieldBinlog{getSimpleFieldBinlog()}, cp, 0)
|
||||
assert.NotNil(t, err)
|
||||
err = sr.addFlushedSegment(1, 1, 2, "insert-01", int64(0), []*datapb.FieldBinlog{getSimpleFieldBinlog()})
|
||||
err = sr.addFlushedSegment(1, 1, 2, "insert-01", int64(0), []*datapb.FieldBinlog{getSimpleFieldBinlog()}, 0)
|
||||
assert.NotNil(t, err)
|
||||
})
|
||||
|
||||
@ -678,7 +678,7 @@ func TestInnerFunctionSegment(t *testing.T) {
|
||||
|
||||
cpPos := &internalpb.MsgPosition{ChannelName: "insert-01", Timestamp: Timestamp(10)}
|
||||
cp := &segmentCheckPoint{int64(10), *cpPos}
|
||||
err = replica.addNormalSegment(1, 1, 2, "insert-01", int64(10), []*datapb.FieldBinlog{getSimpleFieldBinlog()}, cp)
|
||||
err = replica.addNormalSegment(1, 1, 2, "insert-01", int64(10), []*datapb.FieldBinlog{getSimpleFieldBinlog()}, cp, 0)
|
||||
assert.NoError(t, err)
|
||||
assert.True(t, replica.hasSegment(1, true))
|
||||
assert.Equal(t, 1, len(replica.normalSegments))
|
||||
@ -695,7 +695,7 @@ func TestInnerFunctionSegment(t *testing.T) {
|
||||
assert.False(t, seg.isNew.Load().(bool))
|
||||
assert.False(t, seg.isFlushed.Load().(bool))
|
||||
|
||||
err = replica.addNormalSegment(1, 100000, 2, "invalid", int64(0), []*datapb.FieldBinlog{getSimpleFieldBinlog()}, &segmentCheckPoint{})
|
||||
err = replica.addNormalSegment(1, 100000, 2, "invalid", int64(0), []*datapb.FieldBinlog{getSimpleFieldBinlog()}, &segmentCheckPoint{}, 0)
|
||||
assert.Error(t, err)
|
||||
|
||||
replica.updateStatistics(1, 10)
|
||||
@ -730,7 +730,7 @@ func TestInnerFunctionSegment(t *testing.T) {
|
||||
replica.updateSegmentCheckPoint(1)
|
||||
assert.Equal(t, int64(20), replica.normalSegments[UniqueID(1)].checkPoint.numRows)
|
||||
|
||||
err = replica.addFlushedSegment(1, 1, 2, "insert-01", int64(0), []*datapb.FieldBinlog{getSimpleFieldBinlog()})
|
||||
err = replica.addFlushedSegment(1, 1, 2, "insert-01", int64(0), []*datapb.FieldBinlog{getSimpleFieldBinlog()}, 0)
|
||||
assert.Nil(t, err)
|
||||
|
||||
totalSegments := replica.filterSegments("insert-01", common.InvalidPartitionID)
|
||||
@ -776,7 +776,7 @@ func TestReplica_UpdatePKRange(t *testing.T) {
|
||||
|
||||
err = replica.addNewSegment(1, collID, partID, chanName, startPos, endPos)
|
||||
assert.Nil(t, err)
|
||||
err = replica.addNormalSegment(2, collID, partID, chanName, 100, []*datapb.FieldBinlog{getSimpleFieldBinlog()}, cp)
|
||||
err = replica.addNormalSegment(2, collID, partID, chanName, 100, []*datapb.FieldBinlog{getSimpleFieldBinlog()}, cp, 0)
|
||||
assert.Nil(t, err)
|
||||
|
||||
segNew := replica.newSegments[1]
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user