diff --git a/internal/datacoord/server.go b/internal/datacoord/server.go index bb9f4b10b1..19c7757397 100644 --- a/internal/datacoord/server.go +++ b/internal/datacoord/server.go @@ -804,16 +804,9 @@ func (s *Server) GetVChanPositions(channel string, collectionID UniqueID, seekFr } // use collection start position when segment position is not found if seekPosition == nil { - coll := s.meta.GetCollection(collectionID) - if coll != nil { - for _, sp := range coll.GetStartPositions() { - if sp.GetKey() == rootcoord.ToPhysicalChannel(channel) { - seekPosition = &internalpb.MsgPosition{ - ChannelName: channel, - MsgID: sp.GetData(), - } - } - } + collection := s.GetCollection(s.ctx, collectionID) + if collection != nil { + seekPosition = getCollectionStartPosition(channel, collection) } } @@ -826,6 +819,19 @@ func (s *Server) GetVChanPositions(channel string, collectionID UniqueID, seekFr } } +func getCollectionStartPosition(channel string, collectionInfo *datapb.CollectionInfo) *internalpb.MsgPosition { + for _, sp := range collectionInfo.GetStartPositions() { + if sp.GetKey() != rootcoord.ToPhysicalChannel(channel) { + continue + } + return &internalpb.MsgPosition{ + ChannelName: channel, + MsgID: sp.GetData(), + } + } + return nil +} + // trimSegmentInfo returns a shallow copy of datapb.SegmentInfo and sets ALL binlog info to nil func trimSegmentInfo(info *datapb.SegmentInfo) *datapb.SegmentInfo { return &datapb.SegmentInfo{ @@ -841,3 +847,16 @@ func trimSegmentInfo(info *datapb.SegmentInfo) *datapb.SegmentInfo { DmlPosition: info.DmlPosition, } } + +func (s *Server) GetCollection(ctx context.Context, collectionID UniqueID) *datapb.CollectionInfo { + coll := s.meta.GetCollection(collectionID) + if coll != nil { + return coll + } + err := s.loadCollectionFromRootCoord(ctx, collectionID) + if err != nil { + log.Warn("failed to load collection from RootCoord", zap.Int64("collectionID", collectionID), zap.Error(err)) + } + + return s.meta.GetCollection(collectionID) +} diff --git a/internal/datacoord/services.go b/internal/datacoord/services.go index 0a484b432d..90cab67e96 100644 --- a/internal/datacoord/services.go +++ b/internal/datacoord/services.go @@ -114,13 +114,8 @@ func (s *Server) AssignSegmentID(ctx context.Context, req *datapb.AssignSegmentI zap.String("channelName", r.GetChannelName()), zap.Uint32("count", r.GetCount())) - if coll := s.meta.GetCollection(r.CollectionID); coll == nil { - if err := s.loadCollectionFromRootCoord(ctx, r.CollectionID); err != nil { - log.Error("load collection from rootcoord error", - zap.Int64("collectionID", r.CollectionID), - zap.Error(err)) - continue - } + if coll := s.GetCollection(ctx, r.CollectionID); coll == nil { + continue } s.cluster.Watch(r.ChannelName, r.CollectionID)