mirror of
https://gitee.com/milvus-io/milvus.git
synced 2026-02-02 01:06:41 +08:00
Fix segment DmChannel in querycoord for compatibility to 2.0.2 (#18373)
Signed-off-by: zhenshan.cao <zhenshan.cao@zilliz.com>
This commit is contained in:
parent
4844219e5a
commit
6521f6da44
@ -102,6 +102,8 @@ type Meta interface {
|
||||
updateShardLeader(replicaID UniqueID, dmChannel string, leaderID UniqueID, leaderAddr string) error
|
||||
}
|
||||
|
||||
var _ Meta = (*MetaReplica)(nil)
|
||||
|
||||
// MetaReplica records the current load information on all querynodes
|
||||
type MetaReplica struct {
|
||||
ctx context.Context
|
||||
@ -156,6 +158,46 @@ func newMeta(ctx context.Context, kv kv.MetaKv, factory dependency.Factory, idAl
|
||||
return m, nil
|
||||
}
|
||||
|
||||
func (m *MetaReplica) fixSegmentInfoDMChannel() error {
|
||||
var segmentIDs []UniqueID
|
||||
for id, info := range m.segmentsInfo.segmentIDMap {
|
||||
if info.GetDmChannel() == "" {
|
||||
segmentIDs = append(segmentIDs, id)
|
||||
}
|
||||
}
|
||||
|
||||
if len(segmentIDs) == 0 {
|
||||
log.Info("QueryCoord MetaReplica no need to fix SegmentInfo DmChannel")
|
||||
return nil
|
||||
}
|
||||
|
||||
//var segmentInfos []*datapb.SegmentInfo
|
||||
infoResp, err := m.dataCoord.GetSegmentInfo(m.ctx, &datapb.GetSegmentInfoRequest{
|
||||
Base: &commonpb.MsgBase{
|
||||
MsgType: commonpb.MsgType_SegmentInfo,
|
||||
},
|
||||
SegmentIDs: segmentIDs,
|
||||
IncludeUnHealthy: true,
|
||||
})
|
||||
if err != nil {
|
||||
log.Error("Fail to get datapb.SegmentInfo by ids from datacoord", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
if infoResp.GetStatus().ErrorCode != commonpb.ErrorCode_Success {
|
||||
err = errors.New(infoResp.GetStatus().Reason)
|
||||
log.Error("Fail to get datapb.SegmentInfo by ids from datacoord", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
|
||||
for _, newInfo := range infoResp.Infos {
|
||||
curInfo, ok := m.segmentsInfo.segmentIDMap[newInfo.GetID()]
|
||||
if ok {
|
||||
curInfo.DmChannel = newInfo.GetInsertChannel()
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *MetaReplica) reloadFromKV() error {
|
||||
log.Info("recovery collections...")
|
||||
collectionKeys, collectionValues, err := m.getKvClient().LoadWithPrefix(collectionMetaPrefix)
|
||||
@ -290,7 +332,6 @@ func (m *MetaReplica) reloadFromKV() error {
|
||||
|
||||
//TODO::update partition states
|
||||
log.Info("reload from kv finished")
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
@ -163,6 +163,16 @@ func (qc *QueryCoord) Init() error {
|
||||
log.Error("query coordinator init meta failed", zap.Error(initError))
|
||||
return
|
||||
}
|
||||
meta, ok := qc.meta.(*MetaReplica)
|
||||
if !ok {
|
||||
panic("QueryCoord qc.meta assertion of MetaReplica error")
|
||||
}
|
||||
|
||||
meta.dataCoord = qc.dataCoordClient
|
||||
fixErr := meta.fixSegmentInfoDMChannel()
|
||||
if fixErr != nil {
|
||||
log.Error("QueryCoord newMeta fixSegmentInfoDMChannel failed", zap.Error(fixErr))
|
||||
}
|
||||
|
||||
// init channelUnsubscribeHandler
|
||||
qc.handler, initError = newChannelUnsubscribeHandler(qc.loopCtx, qc.kvClient, qc.factory)
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user