fix: Save lite WatchInfo into etcd in DataNode (#29687)

Signed-off-by: yangxuan <xuan.yang@zilliz.com>
This commit is contained in:
XuanYang-cn 2024-01-10 21:18:49 +08:00 committed by GitHub
parent a040692129
commit 9c8fd5e51d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -184,9 +184,10 @@ func (node *DataNode) handlePutEvent(watchInfo *datapb.ChannelWatchInfo, version
watchInfo.State = datapb.ChannelWatchState_ReleaseSuccess
}
v, err := proto.Marshal(watchInfo)
liteInfo := GetLiteChannelWatchInfo(watchInfo)
v, err := proto.Marshal(liteInfo)
if err != nil {
return fmt.Errorf("fail to marshal watchInfo with state, vChanName: %s, state: %s ,err: %w", vChanName, watchInfo.State.String(), err)
return fmt.Errorf("fail to marshal watchInfo with state, vChanName: %s, state: %s ,err: %w", vChanName, liteInfo.State.String(), err)
}
success, err := node.watchKv.CompareVersionAndSwap(key, tickler.version, string(v))
@ -369,14 +370,31 @@ func (t *etcdTickler) stop() {
}
func newEtcdTickler(version int64, path string, watchInfo *datapb.ChannelWatchInfo, kv kv.WatchKV, interval time.Duration) *etcdTickler {
liteWatchInfo := GetLiteChannelWatchInfo(watchInfo)
return &etcdTickler{
progress: atomic.NewInt32(0),
path: path,
kv: kv,
watchInfo: watchInfo,
watchInfo: liteWatchInfo,
version: version,
interval: interval,
closeCh: make(chan struct{}),
isWatchFailed: atomic.NewBool(false),
}
}
// GetLiteChannelWatchInfo clones watchInfo without segmentIDs to reduce the size of the message
func GetLiteChannelWatchInfo(watchInfo *datapb.ChannelWatchInfo) *datapb.ChannelWatchInfo {
return &datapb.ChannelWatchInfo{
Vchan: &datapb.VchannelInfo{
CollectionID: watchInfo.GetVchan().GetCollectionID(),
ChannelName: watchInfo.GetVchan().GetChannelName(),
SeekPosition: watchInfo.GetVchan().GetSeekPosition(),
},
StartTs: watchInfo.GetStartTs(),
State: watchInfo.GetState(),
TimeoutTs: watchInfo.GetTimeoutTs(),
Schema: watchInfo.GetSchema(),
Progress: watchInfo.GetProgress(),
}
}