diff --git a/internal/datacoord/handler.go b/internal/datacoord/handler.go index d3251bcfa7..fd105d51b0 100644 --- a/internal/datacoord/handler.go +++ b/internal/datacoord/handler.go @@ -439,7 +439,12 @@ func (h *ServerHandler) getCollectionStartPos(channel RWChannel) *msgpb.MsgPosit // use collection start position when segment position is not found collection, err := h.GetCollection(h.s.ctx, channel.GetCollectionID()) if collection != nil && err == nil { - startPosition := getCollectionStartPosition(channel.GetName(), collection) + startPosition := toMsgPosition(channel.GetName(), collection.StartPositions) + // We should not set the timestamp to collectionInfo.CreatedAt + // because after enabling streaming arch, every shard has its own timetick, no comparison can be applied cross shards timetick. + // because when using the collection start position, we don't perform any sync operation of data, + // so we can just use 0 here without introducing any repeated data to avoid filtering some DML whose timetick is less than collectionInfo.CreatedAt. + // And after enabling new DDL framework, the collection start position will have its own timestamp, so we can use it directly. log.Info("NEITHER segment position or channel start position are found, setting channel seek position to collection start position", zap.Uint64("posTs", startPosition.GetTimestamp()), zap.Time("posTime", tsoutil.PhysicalTime(startPosition.GetTimestamp())), @@ -485,14 +490,6 @@ func (h *ServerHandler) GetChannelSeekPosition(channel RWChannel, partitionIDs . return nil } -func getCollectionStartPosition(channel string, collectionInfo *collectionInfo) *msgpb.MsgPosition { - position := toMsgPosition(channel, collectionInfo.StartPositions) - if position != nil { - position.Timestamp = collectionInfo.CreatedAt - } - return position -} - func toMsgPosition(channel string, startPositions []*commonpb.KeyDataPair) *msgpb.MsgPosition { for _, sp := range startPositions { if sp.GetKey() != funcutil.ToPhysicalChannel(channel) {