mirror of
https://gitee.com/milvus-io/milvus.git
synced 2026-02-02 01:06:41 +08:00
Cherry-pick from master pr: #33198 See also #31506 #31508 --------- Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
This commit is contained in:
parent
2f260cd33b
commit
5370c39a23
@ -1374,6 +1374,28 @@ func (m *meta) UpdateChannelCheckpoint(vChannel string, pos *msgpb.MsgPosition)
|
||||
return nil
|
||||
}
|
||||
|
||||
// MarkChannelCheckpointDropped set channel checkpoint to MaxUint64 preventing future update
|
||||
// and remove the metrics for channel checkpoint lag.
|
||||
func (m *meta) MarkChannelCheckpointDropped(ctx context.Context, channel string) error {
|
||||
m.channelCPs.Lock()
|
||||
defer m.channelCPs.Unlock()
|
||||
|
||||
cp := &msgpb.MsgPosition{
|
||||
ChannelName: channel,
|
||||
Timestamp: math.MaxUint64,
|
||||
}
|
||||
|
||||
err := m.catalog.SaveChannelCheckpoints(ctx, []*msgpb.MsgPosition{cp})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
m.channelCPs.checkpoints[channel] = cp
|
||||
|
||||
metrics.DataCoordCheckpointUnixSeconds.DeleteLabelValues(fmt.Sprint(paramtable.GetNodeID()), channel)
|
||||
return nil
|
||||
}
|
||||
|
||||
// UpdateChannelCheckpoints updates and saves channel checkpoints.
|
||||
func (m *meta) UpdateChannelCheckpoints(positions []*msgpb.MsgPosition) error {
|
||||
m.channelCPs.Lock()
|
||||
|
||||
@ -618,6 +618,8 @@ func (s *Server) DropVirtualChannel(ctx context.Context, req *datapb.DropVirtual
|
||||
s.segmentManager.DropSegmentsOfChannel(ctx, channel)
|
||||
s.compactionHandler.removeTasksByChannel(channel)
|
||||
metrics.DataCoordCheckpointUnixSeconds.DeleteLabelValues(fmt.Sprint(paramtable.GetNodeID()), channel)
|
||||
s.meta.MarkChannelCheckpointDropped(ctx, channel)
|
||||
|
||||
// no compaction triggered in Drop procedure
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user