Forbid update checkpoint without msgID (#25694)

Signed-off-by: aoiasd <zhicheng.yue@zilliz.com>
This commit is contained in:
aoiasd 2023-07-25 10:43:04 +08:00 committed by GitHub
parent 150947a5f4
commit b533c68632
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 24 additions and 11 deletions

View File

@ -1253,7 +1253,7 @@ func (m *meta) GetCompactionTo(segmentID int64) *SegmentInfo {
// UpdateChannelCheckpoint updates and saves channel checkpoint.
func (m *meta) UpdateChannelCheckpoint(vChannel string, pos *msgpb.MsgPosition) error {
if pos == nil {
if pos == nil || pos.GetMsgID() == nil {
return fmt.Errorf("channelCP is nil, vChannel=%s", vChannel)
}
@ -1268,9 +1268,10 @@ func (m *meta) UpdateChannelCheckpoint(vChannel string, pos *msgpb.MsgPosition)
}
m.channelCPs[vChannel] = pos
ts, _ := tsoutil.ParseTS(pos.Timestamp)
log.Debug("UpdateChannelCheckpoint done",
log.Info("UpdateChannelCheckpoint done",
zap.String("vChannel", vChannel),
zap.Uint64("ts", pos.Timestamp),
zap.Uint64("ts", pos.GetTimestamp()),
zap.ByteString("msgID", pos.GetMsgID()),
zap.Time("time", ts))
}
return nil

View File

@ -905,7 +905,7 @@ func TestChannelCP(t *testing.T) {
pos := &msgpb.MsgPosition{
ChannelName: mockPChannel,
MsgID: []byte{},
MsgID: []byte{0, 0, 0, 0, 0, 0, 0, 0},
Timestamp: 1000,
}

View File

@ -662,7 +662,7 @@ func TestGetSegmentInfo(t *testing.T) {
pos := &msgpb.MsgPosition{
ChannelName: mockPChannel,
MsgID: []byte{},
MsgID: []byte{0, 0, 0, 0, 0, 0, 0, 0},
Timestamp: 1000,
}
@ -1780,6 +1780,7 @@ func TestGetChannelSeekPosition(t *testing.T) {
Data: []byte{4, 5, 6},
},
}
msgID := []byte{0, 0, 0, 0, 0, 0, 0, 0}
tests := []struct {
testName string
@ -1790,16 +1791,16 @@ func TestGetChannelSeekPosition(t *testing.T) {
expectedPos *msgpb.MsgPosition
}{
{"test-with-channelCP",
&msgpb.MsgPosition{ChannelName: "ch1", Timestamp: 100},
[]*msgpb.MsgPosition{{ChannelName: "ch1", Timestamp: 50}, {ChannelName: "ch1", Timestamp: 200}},
&msgpb.MsgPosition{ChannelName: "ch1", Timestamp: 100, MsgID: msgID},
[]*msgpb.MsgPosition{{ChannelName: "ch1", Timestamp: 50, MsgID: msgID}, {ChannelName: "ch1", Timestamp: 200, MsgID: msgID}},
startPos1,
"ch1", &msgpb.MsgPosition{ChannelName: "ch1", Timestamp: 100}},
"ch1", &msgpb.MsgPosition{ChannelName: "ch1", Timestamp: 100, MsgID: msgID}},
{"test-with-segmentDMLPos",
nil,
[]*msgpb.MsgPosition{{ChannelName: "ch1", Timestamp: 50}, {ChannelName: "ch1", Timestamp: 200}},
[]*msgpb.MsgPosition{{ChannelName: "ch1", Timestamp: 50, MsgID: msgID}, {ChannelName: "ch1", Timestamp: 200, MsgID: msgID}},
startPos1,
"ch1", &msgpb.MsgPosition{ChannelName: "ch1", Timestamp: 50}},
"ch1", &msgpb.MsgPosition{ChannelName: "ch1", Timestamp: 50, MsgID: msgID}},
{"test-with-collStartPos",
nil,
@ -2559,6 +2560,7 @@ func TestGetRecoveryInfo(t *testing.T) {
err := svr.meta.UpdateChannelCheckpoint("vchan1", &msgpb.MsgPosition{
ChannelName: "vchan1",
Timestamp: 10,
MsgID: []byte{0, 0, 0, 0, 0, 0, 0, 0},
})
assert.NoError(t, err)
@ -2664,6 +2666,7 @@ func TestGetRecoveryInfo(t *testing.T) {
err := svr.meta.UpdateChannelCheckpoint("vchan1", &msgpb.MsgPosition{
ChannelName: "vchan1",
Timestamp: 0,
MsgID: []byte{0, 0, 0, 0, 0, 0, 0, 0},
})
assert.NoError(t, err)
@ -2839,6 +2842,7 @@ func TestGetRecoveryInfo(t *testing.T) {
err := svr.meta.UpdateChannelCheckpoint("vchan1", &msgpb.MsgPosition{
ChannelName: "vchan1",
Timestamp: 0,
MsgID: []byte{0, 0, 0, 0, 0, 0, 0, 0},
})
assert.NoError(t, err)
@ -2880,6 +2884,7 @@ func TestGetRecoveryInfo(t *testing.T) {
err := svr.meta.UpdateChannelCheckpoint("vchan1", &msgpb.MsgPosition{
ChannelName: "vchan1",
Timestamp: 0,
MsgID: []byte{0, 0, 0, 0, 0, 0, 0, 0},
})
require.NoError(t, err)
@ -2920,6 +2925,7 @@ func TestGetRecoveryInfo(t *testing.T) {
err := svr.meta.UpdateChannelCheckpoint("vchan1", &msgpb.MsgPosition{
ChannelName: "vchan1",
Timestamp: 0,
MsgID: []byte{0, 0, 0, 0, 0, 0, 0, 0},
})
assert.NoError(t, err)
@ -3905,6 +3911,7 @@ func TestDataCoordServer_UpdateChannelCheckpoint(t *testing.T) {
Position: &msgpb.MsgPosition{
ChannelName: mockPChannel,
Timestamp: 1000,
MsgID: []byte{0, 0, 0, 0, 0, 0, 0, 0},
},
}

View File

@ -156,6 +156,7 @@ func TestGetRecoveryInfoV2(t *testing.T) {
err := svr.meta.UpdateChannelCheckpoint("vchan1", &msgpb.MsgPosition{
ChannelName: "vchan1",
Timestamp: 10,
MsgID: []byte{0, 0, 0, 0, 0, 0, 0, 0},
})
assert.NoError(t, err)
@ -260,6 +261,7 @@ func TestGetRecoveryInfoV2(t *testing.T) {
err := svr.meta.UpdateChannelCheckpoint("vchan1", &msgpb.MsgPosition{
ChannelName: "vchan1",
Timestamp: 0,
MsgID: []byte{0, 0, 0, 0, 0, 0, 0, 0},
})
assert.NoError(t, err)
@ -433,6 +435,7 @@ func TestGetRecoveryInfoV2(t *testing.T) {
err := svr.meta.UpdateChannelCheckpoint("vchan1", &msgpb.MsgPosition{
ChannelName: "vchan1",
Timestamp: 0,
MsgID: []byte{0, 0, 0, 0, 0, 0, 0, 0},
})
assert.NoError(t, err)
@ -473,6 +476,7 @@ func TestGetRecoveryInfoV2(t *testing.T) {
err := svr.meta.UpdateChannelCheckpoint("vchan1", &msgpb.MsgPosition{
ChannelName: "vchan1",
Timestamp: 0,
MsgID: []byte{0, 0, 0, 0, 0, 0, 0, 0},
})
require.NoError(t, err)
@ -512,6 +516,7 @@ func TestGetRecoveryInfoV2(t *testing.T) {
err := svr.meta.UpdateChannelCheckpoint("vchan1", &msgpb.MsgPosition{
ChannelName: "vchan1",
Timestamp: 0,
MsgID: []byte{0, 0, 0, 0, 0, 0, 0, 0},
})
assert.NoError(t, err)

View File

@ -486,7 +486,7 @@ func (sd *shardDelegator) readDeleteFromMsgstream(ctx context.Context, position
// Random the subname in case we trying to load same delta at the same time
subName := fmt.Sprintf("querynode-delta-loader-%d-%d-%d", paramtable.GetNodeID(), sd.collectionID, rand.Int())
log.Info("from dml check point load delete", zap.Any("position", position), zap.String("subName", subName), zap.Time("positionTs", ts))
log.Info("from dml check point load delete", zap.Any("position", position), zap.String("vChannel", vchannelName), zap.String("subName", subName), zap.Time("positionTs", ts))
stream.AsConsumer([]string{pChannelName}, subName, mqwrapper.SubscriptionPositionUnknown)
err = stream.Seek([]*msgpb.MsgPosition{position})