diff --git a/internal/querycoord/cluster.go b/internal/querycoord/cluster.go index 15705e9055..c2b33a77d0 100644 --- a/internal/querycoord/cluster.go +++ b/internal/querycoord/cluster.go @@ -53,6 +53,7 @@ type Cluster interface { getNumDmChannels(nodeID int64) (int, error) hasWatchedQueryChannel(ctx context.Context, nodeID int64, collectionID UniqueID) bool + hasWatchedDeltaChannel(ctx context.Context, nodeID int64, collectionID UniqueID) bool getCollectionInfosByID(ctx context.Context, nodeID int64) []*querypb.CollectionInfo addQueryChannel(ctx context.Context, nodeID int64, in *querypb.AddQueryChannelRequest) error removeQueryChannel(ctx context.Context, nodeID int64, in *querypb.RemoveQueryChannelRequest) error @@ -295,11 +296,24 @@ func (c *queryNodeCluster) watchDeltaChannels(ctx context.Context, nodeID int64, log.Debug("WatchDeltaChannels: queryNode watch dm channel error", zap.String("error", err.Error())) return err } + err = c.clusterMeta.setDeltaChannel(in.CollectionID, in.Infos) + if err != nil { + log.Debug("WatchDeltaChannels: queryNode watch delta channel error", zap.String("error", err.Error())) + return err + } + return nil } return errors.New("WatchDeltaChannels: Can't find query node by nodeID ") } +func (c *queryNodeCluster) hasWatchedDeltaChannel(ctx context.Context, nodeID int64, collectionID UniqueID) bool { + c.Lock() + defer c.Unlock() + + return c.nodes[nodeID].hasWatchedDeltaChannel(collectionID) +} + func (c *queryNodeCluster) hasWatchedQueryChannel(ctx context.Context, nodeID int64, collectionID UniqueID) bool { c.Lock() defer c.Unlock() diff --git a/internal/querycoord/meta.go b/internal/querycoord/meta.go index a8d0892ba7..d2dbdf0b25 100644 --- a/internal/querycoord/meta.go +++ b/internal/querycoord/meta.go @@ -26,6 +26,7 @@ import ( "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/msgstream" "github.com/milvus-io/milvus/internal/proto/commonpb" + "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/internalpb" "github.com/milvus-io/milvus/internal/proto/querypb" "github.com/milvus-io/milvus/internal/proto/schemapb" @@ -36,6 +37,7 @@ const ( collectionMetaPrefix = "queryCoord-collectionMeta" segmentMetaPrefix = "queryCoord-segmentMeta" queryChannelMetaPrefix = "queryCoord-queryChannel" + deltaChannelMetaPrefix = "queryCoord-deltaChannel" sealedSegmentChangeInfoPrefix = "queryCoord-sealedSegmentChangeInfo" globalQuerySeekPositionPrefix = "queryCoord-globalQuerySeekPosition" ) @@ -72,6 +74,9 @@ type Meta interface { addDmChannel(collectionID UniqueID, nodeID int64, channels []string) error removeDmChannel(collectionID UniqueID, nodeID int64, channels []string) error + getDeltaChannelsByCollectionID(collectionID UniqueID) ([]*datapb.VchannelInfo, error) + setDeltaChannel(collectionID UniqueID, info []*datapb.VchannelInfo) error + getQueryChannelInfoByID(collectionID UniqueID) (*querypb.QueryChannelInfo, error) getQueryStreamByID(collectionID UniqueID) (msgstream.MsgStream, error) @@ -99,6 +104,8 @@ type MetaReplica struct { segmentMu sync.RWMutex queryChannelInfos map[UniqueID]*querypb.QueryChannelInfo channelMu sync.RWMutex + deltaChannelInfos map[UniqueID][]*datapb.VchannelInfo + deltaChannelMu sync.RWMutex queryStreams map[UniqueID]msgstream.MsgStream streamMu sync.RWMutex @@ -111,6 +118,7 @@ func newMeta(ctx context.Context, kv kv.MetaKv, factory msgstream.Factory, idAll collectionInfos := make(map[UniqueID]*querypb.CollectionInfo) segmentInfos := make(map[UniqueID]*querypb.SegmentInfo) queryChannelInfos := make(map[UniqueID]*querypb.QueryChannelInfo) + deltaChannelInfos := make(map[UniqueID][]*datapb.VchannelInfo) queryMsgStream := make(map[UniqueID]msgstream.MsgStream) position := &internalpb.MsgPosition{} @@ -124,6 +132,7 @@ func newMeta(ctx context.Context, kv kv.MetaKv, factory msgstream.Factory, idAll collectionInfos: collectionInfos, segmentInfos: segmentInfos, queryChannelInfos: queryChannelInfos, + deltaChannelInfos: deltaChannelInfos, queryStreams: queryMsgStream, globalSeekPosition: position, } @@ -187,6 +196,25 @@ func (m *MetaReplica) reloadFromKV() error { } m.queryChannelInfos[collectionID] = queryChannelInfo } + + deltaChannelKeys, deltaChannelValues, err := m.client.LoadWithPrefix(deltaChannelMetaPrefix) + if err != nil { + return nil + } + for index, value := range deltaChannelValues { + collectionIDString, _ := filepath.Split(deltaChannelKeys[index]) + collectionID, err := strconv.ParseInt(collectionIDString, 10, 64) + if err != nil { + return err + } + deltaChannelInfo := &datapb.VchannelInfo{} + err = proto.Unmarshal([]byte(value), deltaChannelInfo) + if err != nil { + return err + } + m.deltaChannelInfos[collectionID] = append(m.deltaChannelInfos[collectionID], deltaChannelInfo) + } + globalSeekPosValue, err := m.client.Load(globalQuerySeekPositionPrefix) if err == nil { position := &internalpb.MsgPosition{} @@ -950,6 +978,33 @@ func createQueryChannel(collectionID UniqueID) *querypb.QueryChannelInfo { return info } +// Get delta channel info for collection, so far all the collection share the same query channel 0 +func (m *MetaReplica) getDeltaChannelsByCollectionID(collectionID UniqueID) ([]*datapb.VchannelInfo, error) { + m.deltaChannelMu.RLock() + defer m.deltaChannelMu.RUnlock() + if infos, ok := m.deltaChannelInfos[collectionID]; ok { + return infos, nil + } + + return nil, fmt.Errorf("delta channel not exist in meta") +} + +func (m *MetaReplica) setDeltaChannel(collectionID UniqueID, infos []*datapb.VchannelInfo) error { + m.deltaChannelMu.Lock() + defer m.deltaChannelMu.Unlock() + _, ok := m.deltaChannelInfos[collectionID] + if ok { + return nil + } + + err := saveDeltaChannelInfo(collectionID, infos, m.client) + if err != nil { + return err + } + m.deltaChannelInfos[collectionID] = infos + return nil +} + // Get Query channel info for collection, so far all the collection share the same query channel 0 func (m *MetaReplica) getQueryChannelInfoByID(collectionID UniqueID) (*querypb.QueryChannelInfo, error) { m.channelMu.Lock() @@ -1149,3 +1204,17 @@ func saveQueryChannelInfo(collectionID UniqueID, info *querypb.QueryChannelInfo, key := fmt.Sprintf("%s/%d", queryChannelMetaPrefix, collectionID) return kv.Save(key, string(infoBytes)) } + +func saveDeltaChannelInfo(collectionID UniqueID, infos []*datapb.VchannelInfo, kv kv.MetaKv) error { + kvs := make(map[string]string) + for _, info := range infos { + infoBytes, err := proto.Marshal(info) + if err != nil { + return err + } + + key := fmt.Sprintf("%s/%d/%s", deltaChannelMetaPrefix, collectionID, info.ChannelName) + kvs[key] = string(infoBytes) + } + return kv.MultiSave(kvs) +} diff --git a/internal/querycoord/mock_querynode_client_test.go b/internal/querycoord/mock_querynode_client_test.go index 516a063078..391c2e2e93 100644 --- a/internal/querycoord/mock_querynode_client_test.go +++ b/internal/querycoord/mock_querynode_client_test.go @@ -20,6 +20,7 @@ import ( etcdkv "github.com/milvus-io/milvus/internal/kv/etcd" "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/proto/commonpb" + "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/internalpb" "github.com/milvus-io/milvus/internal/proto/milvuspb" "github.com/milvus-io/milvus/internal/proto/querypb" @@ -38,6 +39,7 @@ type queryNodeClientMock struct { func newQueryNodeTest(ctx context.Context, address string, id UniqueID, kv *etcdkv.EtcdKV) (Node, error) { collectionInfo := make(map[UniqueID]*querypb.CollectionInfo) watchedChannels := make(map[UniqueID]*querypb.QueryChannelInfo) + watchedDeltaChannels := make(map[UniqueID][]*datapb.VchannelInfo) childCtx, cancel := context.WithCancel(ctx) client, err := newQueryNodeClientMock(childCtx, address) if err != nil { @@ -53,6 +55,7 @@ func newQueryNodeTest(ctx context.Context, address string, id UniqueID, kv *etcd kvClient: kv, collectionInfos: collectionInfo, watchedQueryChannels: watchedChannels, + watchedDeltaChannels: watchedDeltaChannels, } return node, nil diff --git a/internal/querycoord/querynode.go b/internal/querycoord/querynode.go index 984be9d49c..0383a0488d 100644 --- a/internal/querycoord/querynode.go +++ b/internal/querycoord/querynode.go @@ -24,6 +24,7 @@ import ( etcdkv "github.com/milvus-io/milvus/internal/kv/etcd" "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/proto/commonpb" + "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/internalpb" "github.com/milvus-io/milvus/internal/proto/milvuspb" "github.com/milvus-io/milvus/internal/proto/querypb" @@ -51,6 +52,7 @@ type Node interface { watchDeltaChannels(ctx context.Context, in *querypb.WatchDeltaChannelsRequest) error //removeDmChannel(collectionID UniqueID, channels []string) error + hasWatchedDeltaChannel(collectionID UniqueID) bool hasWatchedQueryChannel(collectionID UniqueID) bool //showWatchedQueryChannels() []*querypb.QueryChannelInfo addQueryChannel(ctx context.Context, in *querypb.AddQueryChannelRequest) error @@ -80,6 +82,7 @@ type queryNode struct { sync.RWMutex collectionInfos map[UniqueID]*querypb.CollectionInfo watchedQueryChannels map[UniqueID]*querypb.QueryChannelInfo + watchedDeltaChannels map[UniqueID][]*datapb.VchannelInfo state nodeState stateLock sync.RWMutex @@ -92,6 +95,7 @@ type queryNode struct { func newQueryNode(ctx context.Context, address string, id UniqueID, kv *etcdkv.EtcdKV) (Node, error) { collectionInfo := make(map[UniqueID]*querypb.CollectionInfo) watchedChannels := make(map[UniqueID]*querypb.QueryChannelInfo) + watchedDeltaChannels := make(map[UniqueID][]*datapb.VchannelInfo) childCtx, cancel := context.WithCancel(ctx) client, err := nodeclient.NewClient(childCtx, address) if err != nil { @@ -107,6 +111,7 @@ func newQueryNode(ctx context.Context, address string, id UniqueID, kv *etcdkv.E kvClient: kv, collectionInfos: collectionInfo, watchedQueryChannels: watchedChannels, + watchedDeltaChannels: watchedDeltaChannels, state: disConnect, } @@ -329,6 +334,14 @@ func (qn *queryNode) hasWatchedQueryChannel(collectionID UniqueID) bool { return false } +func (qn *queryNode) hasWatchedDeltaChannel(collectionID UniqueID) bool { + qn.RLock() + defer qn.RUnlock() + + _, ok := qn.watchedDeltaChannels[collectionID] + return ok +} + //func (qn *queryNode) showWatchedQueryChannels() []*querypb.QueryChannelInfo { // qn.RLock() // defer qn.RUnlock() @@ -341,6 +354,13 @@ func (qn *queryNode) hasWatchedQueryChannel(collectionID UniqueID) bool { // return results //} +func (qn *queryNode) setDeltaChannelInfo(collectionID int64, infos []*datapb.VchannelInfo) { + qn.Lock() + defer qn.Unlock() + + qn.watchedDeltaChannels[collectionID] = infos +} + func (qn *queryNode) setQueryChannelInfo(info *querypb.QueryChannelInfo) { qn.Lock() defer qn.Unlock() @@ -433,6 +453,7 @@ func (qn *queryNode) watchDeltaChannels(ctx context.Context, in *querypb.WatchDe if status.ErrorCode != commonpb.ErrorCode_Success { return errors.New(status.Reason) } + qn.setDeltaChannelInfo(in.CollectionID, in.Infos) return err } diff --git a/internal/querycoord/task.go b/internal/querycoord/task.go index 56d760f85d..22965effdf 100644 --- a/internal/querycoord/task.go +++ b/internal/querycoord/task.go @@ -351,7 +351,7 @@ func (lct *loadCollectionTask) execute(ctx context.Context) error { watchDmChannelReqs := make([]*querypb.WatchDmChannelsRequest, 0) channelsToWatch := make([]string, 0) segmentsToLoad := make([]UniqueID, 0) - watchDeltaChannelReqs := make([]*querypb.WatchDeltaChannelsRequest, 0) + var watchDeltaChannels []*datapb.VchannelInfo for _, partitionID := range toLoadPartitionIDs { getRecoveryInfoRequest := &datapb.GetRecoveryInfoRequest{ Base: lct.Base, @@ -390,25 +390,16 @@ func (lct *loadCollectionTask) execute(ctx context.Context) error { loadSegmentReqs = append(loadSegmentReqs, loadSegmentReq) } - // init delta channels for sealed segments. - if len(loadSegmentReqs) != 0 && len(watchDeltaChannelReqs) != len(recoveryInfo.Channels) { + if len(watchDeltaChannels) != len(recoveryInfo.Channels) { for _, info := range recoveryInfo.Channels { - deltaChannel, err := rootcoord.ConvertChannelName(info.ChannelName, Params.DmlChannelPrefix, Params.DeltaChannelPrefix) + deltaChannelName, err := rootcoord.ConvertChannelName(info.ChannelName, Params.DmlChannelPrefix, Params.DeltaChannelPrefix) if err != nil { return err } - deltaInfo := proto.Clone(info).(*datapb.VchannelInfo) - deltaInfo.ChannelName = deltaChannel - msgBase := proto.Clone(lct.Base).(*commonpb.MsgBase) - msgBase.MsgType = commonpb.MsgType_WatchDeltaChannels - watchDeltaRequest := &querypb.WatchDeltaChannelsRequest{ - Base: msgBase, - CollectionID: collectionID, - Infos: []*datapb.VchannelInfo{deltaInfo}, - } - watchDeltaChannelReqs = append(watchDeltaChannelReqs, watchDeltaRequest) + deltaChannel := proto.Clone(info).(*datapb.VchannelInfo) + deltaChannel.ChannelName = deltaChannelName + watchDeltaChannels = append(watchDeltaChannels, deltaChannel) } - } for _, info := range recoveryInfo.Channels { @@ -452,8 +443,17 @@ func (lct *loadCollectionTask) execute(ctx context.Context) error { } } + msgBase := proto.Clone(lct.Base).(*commonpb.MsgBase) + msgBase.MsgType = commonpb.MsgType_WatchDeltaChannels + watchDeltaChannelReq := &querypb.WatchDeltaChannelsRequest{ + Base: msgBase, + CollectionID: collectionID, + Infos: watchDeltaChannels, + } + // If meta is not updated here, deltaChannel meta will not be available when loadSegment reschedule + lct.meta.setDeltaChannel(watchDeltaChannelReq.CollectionID, watchDeltaChannelReq.Infos) - internalTasks, err := assignInternalTask(ctx, collectionID, lct, lct.meta, lct.cluster, loadSegmentReqs, watchDmChannelReqs, watchDeltaChannelReqs, false, nil, nil) + internalTasks, err := assignInternalTask(ctx, collectionID, lct, lct.meta, lct.cluster, loadSegmentReqs, watchDmChannelReqs, watchDeltaChannelReq, false, nil, nil) if err != nil { log.Warn("loadCollectionTask: assign child task failed", zap.Int64("collectionID", collectionID)) lct.setResultInfo(err) @@ -707,7 +707,7 @@ func (lpt *loadPartitionTask) execute(ctx context.Context) error { loadSegmentReqs := make([]*querypb.LoadSegmentsRequest, 0) channelsToWatch := make([]string, 0) watchDmReqs := make([]*querypb.WatchDmChannelsRequest, 0) - watchDeltaReqs := make([]*querypb.WatchDeltaChannelsRequest, 0) + var watchDeltaChannels []*datapb.VchannelInfo for _, partitionID := range partitionIDs { getRecoveryInfoRequest := &datapb.GetRecoveryInfoRequest{ Base: lpt.Base, @@ -745,23 +745,15 @@ func (lpt *loadPartitionTask) execute(ctx context.Context) error { loadSegmentReqs = append(loadSegmentReqs, loadSegmentReq) } - // init delta channels for sealed segments. - if len(loadSegmentReqs) != 0 && len(watchDeltaReqs) != len(recoveryInfo.Channels) { + if len(watchDeltaChannels) != len(recoveryInfo.Channels) { for _, info := range recoveryInfo.Channels { - deltaChannel, err := rootcoord.ConvertChannelName(info.ChannelName, Params.DmlChannelPrefix, Params.DeltaChannelPrefix) + deltaChannelName, err := rootcoord.ConvertChannelName(info.ChannelName, Params.DmlChannelPrefix, Params.DeltaChannelPrefix) if err != nil { return err } - deltaInfo := proto.Clone(info).(*datapb.VchannelInfo) - deltaInfo.ChannelName = deltaChannel - msgBase := proto.Clone(lpt.Base).(*commonpb.MsgBase) - msgBase.MsgType = commonpb.MsgType_WatchDeltaChannels - watchDeltaRequest := &querypb.WatchDeltaChannelsRequest{ - Base: msgBase, - CollectionID: collectionID, - Infos: []*datapb.VchannelInfo{deltaInfo}, - } - watchDeltaReqs = append(watchDeltaReqs, watchDeltaRequest) + deltaChannel := proto.Clone(info).(*datapb.VchannelInfo) + deltaChannel.ChannelName = deltaChannelName + watchDeltaChannels = append(watchDeltaChannels, deltaChannel) } } @@ -783,7 +775,16 @@ func (lpt *loadPartitionTask) execute(ctx context.Context) error { } } - internalTasks, err := assignInternalTask(ctx, collectionID, lpt, lpt.meta, lpt.cluster, loadSegmentReqs, watchDmReqs, watchDeltaReqs, false, nil, nil) + msgBase := proto.Clone(lpt.Base).(*commonpb.MsgBase) + msgBase.MsgType = commonpb.MsgType_WatchDeltaChannels + watchDeltaChannelReq := &querypb.WatchDeltaChannelsRequest{ + Base: msgBase, + CollectionID: collectionID, + Infos: watchDeltaChannels, + } + // If meta is not updated here, deltaChannel meta will not be available when loadSegment reschedule + lpt.meta.setDeltaChannel(watchDeltaChannelReq.CollectionID, watchDeltaChannelReq.Infos) + internalTasks, err := assignInternalTask(ctx, collectionID, lpt, lpt.meta, lpt.cluster, loadSegmentReqs, watchDmReqs, watchDeltaChannelReq, false, nil, nil) if err != nil { log.Warn("loadPartitionTask: assign child task failed", zap.Int64("collectionID", collectionID), zap.Int64s("partitionIDs", partitionIDs)) lpt.setResultInfo(err) @@ -1081,6 +1082,19 @@ func (lst *loadSegmentTask) reschedule(ctx context.Context) ([]task, error) { lst.excludeNodeIDs = []int64{} } lst.excludeNodeIDs = append(lst.excludeNodeIDs, lst.DstNodeID) + + deltaChannelInfos, err := lst.meta.getDeltaChannelsByCollectionID(collectionID) + if err != nil { + return nil, err + } + msgBase := proto.Clone(lst.Base).(*commonpb.MsgBase) + msgBase.MsgType = commonpb.MsgType_WatchDeltaChannels + watchDeltaRequest := &querypb.WatchDeltaChannelsRequest{ + Base: msgBase, + CollectionID: collectionID, + Infos: deltaChannelInfos, + } + log.Debug("assignInternalTask: add a watchDeltaChannelTask childTask", zap.Any("task", watchDeltaRequest)) //TODO:: wait or not according msgType reScheduledTasks, err := assignInternalTask(ctx, collectionID, lst.getParentTask(), lst.meta, lst.cluster, loadSegmentReqs, nil, nil, false, lst.excludeNodeIDs, nil) if err != nil { @@ -1509,7 +1523,7 @@ func (ht *handoffTask) execute(ctx context.Context) error { findBinlog := false var loadSegmentReq *querypb.LoadSegmentsRequest - var watchDeltaChannelReqs []*querypb.WatchDeltaChannelsRequest + var watchDeltaChannels []*datapb.VchannelInfo for _, segmentBinlogs := range recoveryInfo.Binlogs { if segmentBinlogs.SegmentID == segmentID { findBinlog = true @@ -1532,23 +1546,15 @@ func (ht *handoffTask) execute(ctx context.Context) error { } } } - // init delta channels for sealed segments. - if loadSegmentReq != nil && len(watchDeltaChannelReqs) != len(recoveryInfo.Channels) { + if len(watchDeltaChannels) != len(recoveryInfo.Channels) { for _, info := range recoveryInfo.Channels { - deltaChannel, err := rootcoord.ConvertChannelName(info.ChannelName, Params.DmlChannelPrefix, Params.DeltaChannelPrefix) + deltaChannelName, err := rootcoord.ConvertChannelName(info.ChannelName, Params.DmlChannelPrefix, Params.DeltaChannelPrefix) if err != nil { return err } - deltaInfo := proto.Clone(info).(*datapb.VchannelInfo) - deltaInfo.ChannelName = deltaChannel - msgBase := proto.Clone(ht.Base).(*commonpb.MsgBase) - msgBase.MsgType = commonpb.MsgType_WatchDeltaChannels - watchDeltaChannelsRequest := &querypb.WatchDeltaChannelsRequest{ - Base: msgBase, - CollectionID: collectionID, - Infos: []*datapb.VchannelInfo{deltaInfo}, - } - watchDeltaChannelReqs = append(watchDeltaChannelReqs, watchDeltaChannelsRequest) + deltaChannel := proto.Clone(info).(*datapb.VchannelInfo) + deltaChannel.ChannelName = deltaChannelName + watchDeltaChannels = append(watchDeltaChannels, deltaChannel) } } @@ -1557,7 +1563,16 @@ func (ht *handoffTask) execute(ctx context.Context) error { ht.setResultInfo(err) return err } - internalTasks, err := assignInternalTask(ctx, collectionID, ht, ht.meta, ht.cluster, []*querypb.LoadSegmentsRequest{loadSegmentReq}, nil, watchDeltaChannelReqs, true, nil, nil) + msgBase := proto.Clone(ht.Base).(*commonpb.MsgBase) + msgBase.MsgType = commonpb.MsgType_WatchDeltaChannels + watchDeltaChannelReq := &querypb.WatchDeltaChannelsRequest{ + Base: msgBase, + CollectionID: collectionID, + Infos: watchDeltaChannels, + } + // If meta is not updated here, deltaChannel meta will not be available when loadSegment reschedule + ht.meta.setDeltaChannel(watchDeltaChannelReq.CollectionID, watchDeltaChannelReq.Infos) + internalTasks, err := assignInternalTask(ctx, collectionID, ht, ht.meta, ht.cluster, []*querypb.LoadSegmentsRequest{loadSegmentReq}, nil, watchDeltaChannelReq, true, nil, nil) if err != nil { log.Error("handoffTask: assign child task failed", zap.Any("segmentInfo", segmentInfo)) ht.setResultInfo(err) @@ -1664,7 +1679,7 @@ func (lbt *loadBalanceTask) execute(ctx context.Context) error { loadSegmentReqs := make([]*querypb.LoadSegmentsRequest, 0) channelsToWatch := make([]string, 0) watchDmChannelReqs := make([]*querypb.WatchDmChannelsRequest, 0) - watchDeltaChannelReqs := make([]*querypb.WatchDeltaChannelsRequest, 0) + var watchDeltaChannels []*datapb.VchannelInfo dmChannels, err := lbt.meta.getDmChannelsByNodeID(collectionID, nodeID) if err != nil { @@ -1712,20 +1727,15 @@ func (lbt *loadBalanceTask) execute(ctx context.Context) error { loadSegmentReqs = append(loadSegmentReqs, loadSegmentReq) } - // init delta channels for sealed segments. - if len(loadSegmentReqs) != 0 && len(watchDeltaChannelReqs) != len(recoveryInfo.Channels) { + if len(watchDeltaChannels) != len(recoveryInfo.Channels) { for _, info := range recoveryInfo.Channels { - deltaChannel := info.ChannelName - deltaInfo := proto.Clone(info).(*datapb.VchannelInfo) - deltaInfo.ChannelName = deltaChannel - msgBase := proto.Clone(lbt.Base).(*commonpb.MsgBase) - msgBase.MsgType = commonpb.MsgType_WatchDeltaChannels - watchDeltaChannelsRequest := &querypb.WatchDeltaChannelsRequest{ - Base: msgBase, - CollectionID: collectionID, - Infos: []*datapb.VchannelInfo{info}, + deltaChannelName, err := rootcoord.ConvertChannelName(info.ChannelName, Params.DmlChannelPrefix, Params.DeltaChannelPrefix) + if err != nil { + return err } - watchDeltaChannelReqs = append(watchDeltaChannelReqs, watchDeltaChannelsRequest) + deltaChannel := proto.Clone(info).(*datapb.VchannelInfo) + deltaChannel.ChannelName = deltaChannelName + watchDeltaChannels = append(watchDeltaChannels, deltaChannel) } } @@ -1773,8 +1783,17 @@ func (lbt *loadBalanceTask) execute(ctx context.Context) error { } } } + msgBase := proto.Clone(lbt.Base).(*commonpb.MsgBase) + msgBase.MsgType = commonpb.MsgType_WatchDeltaChannels + watchDeltaChannelReq := &querypb.WatchDeltaChannelsRequest{ + Base: msgBase, + CollectionID: collectionID, + Infos: watchDeltaChannels, + } + // If meta is not updated here, deltaChannel meta will not be available when loadSegment reschedule + lbt.meta.setDeltaChannel(watchDeltaChannelReq.CollectionID, watchDeltaChannelReq.Infos) - internalTasks, err := assignInternalTask(ctx, collectionID, lbt, lbt.meta, lbt.cluster, loadSegmentReqs, watchDmChannelReqs, watchDeltaChannelReqs, true, lbt.SourceNodeIDs, lbt.DstNodeIDs) + internalTasks, err := assignInternalTask(ctx, collectionID, lbt, lbt.meta, lbt.cluster, loadSegmentReqs, watchDmChannelReqs, watchDeltaChannelReq, true, lbt.SourceNodeIDs, lbt.DstNodeIDs) if err != nil { log.Warn("loadBalanceTask: assign child task failed", zap.Int64("collectionID", collectionID), zap.Int64s("partitionIDs", partitionIDs)) lbt.setResultInfo(err) @@ -1849,7 +1868,7 @@ func (lbt *loadBalanceTask) execute(ctx context.Context) error { for collectionID, partitionIDs := range col2PartitionIDs { segmentsToLoad := make([]UniqueID, 0) loadSegmentReqs := make([]*querypb.LoadSegmentsRequest, 0) - watchDeltaChannelReqs := make([]*querypb.WatchDeltaChannelsRequest, 0) + var watchDeltaChannels []*datapb.VchannelInfo collectionInfo, err := lbt.meta.getCollectionInfoByID(collectionID) if err != nil { log.Error("loadBalanceTask: can't find collectionID in meta", zap.Int64("collectionID", collectionID), zap.Error(err)) @@ -1903,29 +1922,30 @@ func (lbt *loadBalanceTask) execute(ctx context.Context) error { loadSegmentReqs = append(loadSegmentReqs, loadSegmentReq) } - // init delta channels for sealed segments. - if len(loadSegmentReqs) != 0 && len(watchDeltaChannelReqs) != len(recoveryInfo.Channels) { + if len(watchDeltaChannels) != len(recoveryInfo.Channels) { for _, info := range recoveryInfo.Channels { - deltaChannel, err := rootcoord.ConvertChannelName(info.ChannelName, Params.DmlChannelPrefix, Params.DeltaChannelPrefix) + deltaChannelName, err := rootcoord.ConvertChannelName(info.ChannelName, Params.DmlChannelPrefix, Params.DeltaChannelPrefix) if err != nil { return err } - deltaInfo := proto.Clone(info).(*datapb.VchannelInfo) - deltaInfo.ChannelName = deltaChannel - msgBase := proto.Clone(lbt.Base).(*commonpb.MsgBase) - msgBase.MsgType = commonpb.MsgType_WatchDeltaChannels - watchDeltaRequest := &querypb.WatchDeltaChannelsRequest{ - Base: msgBase, - CollectionID: collectionID, - Infos: []*datapb.VchannelInfo{deltaInfo}, - } - watchDeltaChannelReqs = append(watchDeltaChannelReqs, watchDeltaRequest) + deltaChannel := proto.Clone(info).(*datapb.VchannelInfo) + deltaChannel.ChannelName = deltaChannelName + watchDeltaChannels = append(watchDeltaChannels, deltaChannel) } } } + msgBase := proto.Clone(lbt.Base).(*commonpb.MsgBase) + msgBase.MsgType = commonpb.MsgType_WatchDeltaChannels + watchDeltaChannelReq := &querypb.WatchDeltaChannelsRequest{ + Base: msgBase, + CollectionID: collectionID, + Infos: watchDeltaChannels, + } + // If meta is not updated here, deltaChannel meta will not be available when loadSegment reschedule + lbt.meta.setDeltaChannel(watchDeltaChannelReq.CollectionID, watchDeltaChannelReq.Infos) // TODO:: assignInternalTask with multi collection - internalTasks, err := assignInternalTask(ctx, collectionID, lbt, lbt.meta, lbt.cluster, loadSegmentReqs, nil, watchDeltaChannelReqs, false, lbt.SourceNodeIDs, lbt.DstNodeIDs) + internalTasks, err := assignInternalTask(ctx, collectionID, lbt, lbt.meta, lbt.cluster, loadSegmentReqs, nil, watchDeltaChannelReq, false, lbt.SourceNodeIDs, lbt.DstNodeIDs) if err != nil { log.Warn("loadBalanceTask: assign child task failed", zap.Int64("collectionID", collectionID), zap.Int64s("partitionIDs", partitionIDs)) lbt.setResultInfo(err) @@ -2005,7 +2025,7 @@ func assignInternalTask(ctx context.Context, collectionID UniqueID, parentTask task, meta Meta, cluster Cluster, loadSegmentRequests []*querypb.LoadSegmentsRequest, watchDmChannelRequests []*querypb.WatchDmChannelsRequest, - watchDeltaChannelRequests []*querypb.WatchDeltaChannelsRequest, + watchDeltaChannelRequest *querypb.WatchDeltaChannelsRequest, wait bool, excludeNodeIDs []int64, includeNodeIDs []int64) ([]task, error) { sp, _ := trace.StartSpanFromContext(ctx) defer sp.Finish() @@ -2067,9 +2087,9 @@ func assignInternalTask(ctx context.Context, internalTasks = append(internalTasks, loadSegmentTask) } - for _, req := range watchDeltaChannelRequests { + if watchDeltaChannelRequest != nil { ctx = opentracing.ContextWithSpan(context.Background(), sp) - watchDeltaRequest := proto.Clone(req).(*querypb.WatchDeltaChannelsRequest) + watchDeltaRequest := proto.Clone(watchDeltaChannelRequest).(*querypb.WatchDeltaChannelsRequest) watchDeltaRequest.NodeID = nodeID baseTask := newBaseTask(ctx, parentTask.getTriggerCondition()) baseTask.setParentTask(parentTask) diff --git a/internal/querycoord/task_test.go b/internal/querycoord/task_test.go index e31b2cfa4e..12066c8813 100644 --- a/internal/querycoord/task_test.go +++ b/internal/querycoord/task_test.go @@ -181,9 +181,10 @@ func genLoadSegmentTask(ctx context.Context, queryCoord *QueryCoord, nodeID int6 Base: &commonpb.MsgBase{ MsgType: commonpb.MsgType_LoadSegments, }, - DstNodeID: nodeID, - Schema: schema, - Infos: []*querypb.SegmentLoadInfo{segmentInfo}, + DstNodeID: nodeID, + Schema: schema, + Infos: []*querypb.SegmentLoadInfo{segmentInfo}, + CollectionID: defaultCollectionID, } baseTask := newBaseTask(ctx, querypb.TriggerCondition_grpcRequest) baseTask.taskID = 100 @@ -594,6 +595,7 @@ func Test_RescheduleSegmentWithWatchQueryChannel(t *testing.T) { node1.loadSegment = returnFailedResult loadSegmentTask := genLoadSegmentTask(ctx, queryCoord, node1.queryNodeID) + loadSegmentTask.meta.setDeltaChannel(defaultCollectionID, []*datapb.VchannelInfo{}) loadCollectionTask := loadSegmentTask.parentTask queryCoord.scheduler.triggerTaskQueue.addTask(loadCollectionTask) diff --git a/internal/querynode/task.go b/internal/querynode/task.go index fceadf3bf8..5cad74d329 100644 --- a/internal/querynode/task.go +++ b/internal/querynode/task.go @@ -412,6 +412,16 @@ func (w *watchDeltaChannelsTask) Execute(ctx context.Context) error { if err != nil { return err } + + // Check if the same deltaChannel has been watched + for _, dstChan := range vDeltaChannels { + for _, srcChan := range hCol.vDeltaChannels { + if dstChan == srcChan { + return nil + } + } + } + hCol.addVDeltaChannels(vDeltaChannels) hCol.addPDeltaChannels(pDeltaChannels)