diff --git a/internal/datacoord/meta.go b/internal/datacoord/meta.go index 6906af5dba..693a904971 100644 --- a/internal/datacoord/meta.go +++ b/internal/datacoord/meta.go @@ -1253,7 +1253,7 @@ func (m *meta) GetCompactionTo(segmentID int64) *SegmentInfo { // UpdateChannelCheckpoint updates and saves channel checkpoint. func (m *meta) UpdateChannelCheckpoint(vChannel string, pos *msgpb.MsgPosition) error { - if pos == nil { + if pos == nil || pos.GetMsgID() == nil { return fmt.Errorf("channelCP is nil, vChannel=%s", vChannel) } @@ -1268,9 +1268,10 @@ func (m *meta) UpdateChannelCheckpoint(vChannel string, pos *msgpb.MsgPosition) } m.channelCPs[vChannel] = pos ts, _ := tsoutil.ParseTS(pos.Timestamp) - log.Debug("UpdateChannelCheckpoint done", + log.Info("UpdateChannelCheckpoint done", zap.String("vChannel", vChannel), - zap.Uint64("ts", pos.Timestamp), + zap.Uint64("ts", pos.GetTimestamp()), + zap.ByteString("msgID", pos.GetMsgID()), zap.Time("time", ts)) } return nil diff --git a/internal/datacoord/meta_test.go b/internal/datacoord/meta_test.go index 3f564ca342..46907ad6ff 100644 --- a/internal/datacoord/meta_test.go +++ b/internal/datacoord/meta_test.go @@ -905,7 +905,7 @@ func TestChannelCP(t *testing.T) { pos := &msgpb.MsgPosition{ ChannelName: mockPChannel, - MsgID: []byte{}, + MsgID: []byte{0, 0, 0, 0, 0, 0, 0, 0}, Timestamp: 1000, } diff --git a/internal/datacoord/server_test.go b/internal/datacoord/server_test.go index b99ff71e77..cef7f2167a 100644 --- a/internal/datacoord/server_test.go +++ b/internal/datacoord/server_test.go @@ -662,7 +662,7 @@ func TestGetSegmentInfo(t *testing.T) { pos := &msgpb.MsgPosition{ ChannelName: mockPChannel, - MsgID: []byte{}, + MsgID: []byte{0, 0, 0, 0, 0, 0, 0, 0}, Timestamp: 1000, } @@ -1780,6 +1780,7 @@ func TestGetChannelSeekPosition(t *testing.T) { Data: []byte{4, 5, 6}, }, } + msgID := []byte{0, 0, 0, 0, 0, 0, 0, 0} tests := []struct { testName string @@ -1790,16 +1791,16 @@ func TestGetChannelSeekPosition(t *testing.T) { expectedPos *msgpb.MsgPosition }{ {"test-with-channelCP", - &msgpb.MsgPosition{ChannelName: "ch1", Timestamp: 100}, - []*msgpb.MsgPosition{{ChannelName: "ch1", Timestamp: 50}, {ChannelName: "ch1", Timestamp: 200}}, + &msgpb.MsgPosition{ChannelName: "ch1", Timestamp: 100, MsgID: msgID}, + []*msgpb.MsgPosition{{ChannelName: "ch1", Timestamp: 50, MsgID: msgID}, {ChannelName: "ch1", Timestamp: 200, MsgID: msgID}}, startPos1, - "ch1", &msgpb.MsgPosition{ChannelName: "ch1", Timestamp: 100}}, + "ch1", &msgpb.MsgPosition{ChannelName: "ch1", Timestamp: 100, MsgID: msgID}}, {"test-with-segmentDMLPos", nil, - []*msgpb.MsgPosition{{ChannelName: "ch1", Timestamp: 50}, {ChannelName: "ch1", Timestamp: 200}}, + []*msgpb.MsgPosition{{ChannelName: "ch1", Timestamp: 50, MsgID: msgID}, {ChannelName: "ch1", Timestamp: 200, MsgID: msgID}}, startPos1, - "ch1", &msgpb.MsgPosition{ChannelName: "ch1", Timestamp: 50}}, + "ch1", &msgpb.MsgPosition{ChannelName: "ch1", Timestamp: 50, MsgID: msgID}}, {"test-with-collStartPos", nil, @@ -2559,6 +2560,7 @@ func TestGetRecoveryInfo(t *testing.T) { err := svr.meta.UpdateChannelCheckpoint("vchan1", &msgpb.MsgPosition{ ChannelName: "vchan1", Timestamp: 10, + MsgID: []byte{0, 0, 0, 0, 0, 0, 0, 0}, }) assert.NoError(t, err) @@ -2664,6 +2666,7 @@ func TestGetRecoveryInfo(t *testing.T) { err := svr.meta.UpdateChannelCheckpoint("vchan1", &msgpb.MsgPosition{ ChannelName: "vchan1", Timestamp: 0, + MsgID: []byte{0, 0, 0, 0, 0, 0, 0, 0}, }) assert.NoError(t, err) @@ -2839,6 +2842,7 @@ func TestGetRecoveryInfo(t *testing.T) { err := svr.meta.UpdateChannelCheckpoint("vchan1", &msgpb.MsgPosition{ ChannelName: "vchan1", Timestamp: 0, + MsgID: []byte{0, 0, 0, 0, 0, 0, 0, 0}, }) assert.NoError(t, err) @@ -2880,6 +2884,7 @@ func TestGetRecoveryInfo(t *testing.T) { err := svr.meta.UpdateChannelCheckpoint("vchan1", &msgpb.MsgPosition{ ChannelName: "vchan1", Timestamp: 0, + MsgID: []byte{0, 0, 0, 0, 0, 0, 0, 0}, }) require.NoError(t, err) @@ -2920,6 +2925,7 @@ func TestGetRecoveryInfo(t *testing.T) { err := svr.meta.UpdateChannelCheckpoint("vchan1", &msgpb.MsgPosition{ ChannelName: "vchan1", Timestamp: 0, + MsgID: []byte{0, 0, 0, 0, 0, 0, 0, 0}, }) assert.NoError(t, err) @@ -3905,6 +3911,7 @@ func TestDataCoordServer_UpdateChannelCheckpoint(t *testing.T) { Position: &msgpb.MsgPosition{ ChannelName: mockPChannel, Timestamp: 1000, + MsgID: []byte{0, 0, 0, 0, 0, 0, 0, 0}, }, } diff --git a/internal/datacoord/services_test.go b/internal/datacoord/services_test.go index 8c96959a89..05b7031806 100644 --- a/internal/datacoord/services_test.go +++ b/internal/datacoord/services_test.go @@ -156,6 +156,7 @@ func TestGetRecoveryInfoV2(t *testing.T) { err := svr.meta.UpdateChannelCheckpoint("vchan1", &msgpb.MsgPosition{ ChannelName: "vchan1", Timestamp: 10, + MsgID: []byte{0, 0, 0, 0, 0, 0, 0, 0}, }) assert.NoError(t, err) @@ -260,6 +261,7 @@ func TestGetRecoveryInfoV2(t *testing.T) { err := svr.meta.UpdateChannelCheckpoint("vchan1", &msgpb.MsgPosition{ ChannelName: "vchan1", Timestamp: 0, + MsgID: []byte{0, 0, 0, 0, 0, 0, 0, 0}, }) assert.NoError(t, err) @@ -433,6 +435,7 @@ func TestGetRecoveryInfoV2(t *testing.T) { err := svr.meta.UpdateChannelCheckpoint("vchan1", &msgpb.MsgPosition{ ChannelName: "vchan1", Timestamp: 0, + MsgID: []byte{0, 0, 0, 0, 0, 0, 0, 0}, }) assert.NoError(t, err) @@ -473,6 +476,7 @@ func TestGetRecoveryInfoV2(t *testing.T) { err := svr.meta.UpdateChannelCheckpoint("vchan1", &msgpb.MsgPosition{ ChannelName: "vchan1", Timestamp: 0, + MsgID: []byte{0, 0, 0, 0, 0, 0, 0, 0}, }) require.NoError(t, err) @@ -512,6 +516,7 @@ func TestGetRecoveryInfoV2(t *testing.T) { err := svr.meta.UpdateChannelCheckpoint("vchan1", &msgpb.MsgPosition{ ChannelName: "vchan1", Timestamp: 0, + MsgID: []byte{0, 0, 0, 0, 0, 0, 0, 0}, }) assert.NoError(t, err) diff --git a/internal/querynodev2/delegator/delegator_data.go b/internal/querynodev2/delegator/delegator_data.go index fd00fe7b46..a17dfd2692 100644 --- a/internal/querynodev2/delegator/delegator_data.go +++ b/internal/querynodev2/delegator/delegator_data.go @@ -486,7 +486,7 @@ func (sd *shardDelegator) readDeleteFromMsgstream(ctx context.Context, position // Random the subname in case we trying to load same delta at the same time subName := fmt.Sprintf("querynode-delta-loader-%d-%d-%d", paramtable.GetNodeID(), sd.collectionID, rand.Int()) - log.Info("from dml check point load delete", zap.Any("position", position), zap.String("subName", subName), zap.Time("positionTs", ts)) + log.Info("from dml check point load delete", zap.Any("position", position), zap.String("vChannel", vchannelName), zap.String("subName", subName), zap.Time("positionTs", ts)) stream.AsConsumer([]string{pChannelName}, subName, mqwrapper.SubscriptionPositionUnknown) err = stream.Seek([]*msgpb.MsgPosition{position})