mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-07 01:28:27 +08:00
fix: remove wrong start timetick to avoid filtering DML whose timetick is less than it. (#44691)
issue: #41611 - introduced by #44532 Signed-off-by: chyezh <chyezh@outlook.com>
This commit is contained in:
parent
e55e63288e
commit
97c987f313
@ -439,7 +439,12 @@ func (h *ServerHandler) getCollectionStartPos(channel RWChannel) *msgpb.MsgPosit
|
|||||||
// use collection start position when segment position is not found
|
// use collection start position when segment position is not found
|
||||||
collection, err := h.GetCollection(h.s.ctx, channel.GetCollectionID())
|
collection, err := h.GetCollection(h.s.ctx, channel.GetCollectionID())
|
||||||
if collection != nil && err == nil {
|
if collection != nil && err == nil {
|
||||||
startPosition := getCollectionStartPosition(channel.GetName(), collection)
|
startPosition := toMsgPosition(channel.GetName(), collection.StartPositions)
|
||||||
|
// We should not set the timestamp to collectionInfo.CreatedAt
|
||||||
|
// because after enabling streaming arch, every shard has its own timetick, no comparison can be applied cross shards timetick.
|
||||||
|
// because when using the collection start position, we don't perform any sync operation of data,
|
||||||
|
// so we can just use 0 here without introducing any repeated data to avoid filtering some DML whose timetick is less than collectionInfo.CreatedAt.
|
||||||
|
// And after enabling new DDL framework, the collection start position will have its own timestamp, so we can use it directly.
|
||||||
log.Info("NEITHER segment position or channel start position are found, setting channel seek position to collection start position",
|
log.Info("NEITHER segment position or channel start position are found, setting channel seek position to collection start position",
|
||||||
zap.Uint64("posTs", startPosition.GetTimestamp()),
|
zap.Uint64("posTs", startPosition.GetTimestamp()),
|
||||||
zap.Time("posTime", tsoutil.PhysicalTime(startPosition.GetTimestamp())),
|
zap.Time("posTime", tsoutil.PhysicalTime(startPosition.GetTimestamp())),
|
||||||
@ -485,14 +490,6 @@ func (h *ServerHandler) GetChannelSeekPosition(channel RWChannel, partitionIDs .
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func getCollectionStartPosition(channel string, collectionInfo *collectionInfo) *msgpb.MsgPosition {
|
|
||||||
position := toMsgPosition(channel, collectionInfo.StartPositions)
|
|
||||||
if position != nil {
|
|
||||||
position.Timestamp = collectionInfo.CreatedAt
|
|
||||||
}
|
|
||||||
return position
|
|
||||||
}
|
|
||||||
|
|
||||||
func toMsgPosition(channel string, startPositions []*commonpb.KeyDataPair) *msgpb.MsgPosition {
|
func toMsgPosition(channel string, startPositions []*commonpb.KeyDataPair) *msgpb.MsgPosition {
|
||||||
for _, sp := range startPositions {
|
for _, sp := range startPositions {
|
||||||
if sp.GetKey() != funcutil.ToPhysicalChannel(channel) {
|
if sp.GetKey() != funcutil.ToPhysicalChannel(channel) {
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user