diff --git a/internal/datanode/event_manager.go b/internal/datanode/event_manager.go index 01b6d215b9..7656f871df 100644 --- a/internal/datanode/event_manager.go +++ b/internal/datanode/event_manager.go @@ -184,9 +184,10 @@ func (node *DataNode) handlePutEvent(watchInfo *datapb.ChannelWatchInfo, version watchInfo.State = datapb.ChannelWatchState_ReleaseSuccess } - v, err := proto.Marshal(watchInfo) + liteInfo := GetLiteChannelWatchInfo(watchInfo) + v, err := proto.Marshal(liteInfo) if err != nil { - return fmt.Errorf("fail to marshal watchInfo with state, vChanName: %s, state: %s ,err: %w", vChanName, watchInfo.State.String(), err) + return fmt.Errorf("fail to marshal watchInfo with state, vChanName: %s, state: %s ,err: %w", vChanName, liteInfo.State.String(), err) } success, err := node.watchKv.CompareVersionAndSwap(key, tickler.version, string(v)) @@ -369,14 +370,31 @@ func (t *etcdTickler) stop() { } func newEtcdTickler(version int64, path string, watchInfo *datapb.ChannelWatchInfo, kv kv.WatchKV, interval time.Duration) *etcdTickler { + liteWatchInfo := GetLiteChannelWatchInfo(watchInfo) return &etcdTickler{ progress: atomic.NewInt32(0), path: path, kv: kv, - watchInfo: watchInfo, + watchInfo: liteWatchInfo, version: version, interval: interval, closeCh: make(chan struct{}), isWatchFailed: atomic.NewBool(false), } } + +// GetLiteChannelWatchInfo clones watchInfo without segmentIDs to reduce the size of the message +func GetLiteChannelWatchInfo(watchInfo *datapb.ChannelWatchInfo) *datapb.ChannelWatchInfo { + return &datapb.ChannelWatchInfo{ + Vchan: &datapb.VchannelInfo{ + CollectionID: watchInfo.GetVchan().GetCollectionID(), + ChannelName: watchInfo.GetVchan().GetChannelName(), + SeekPosition: watchInfo.GetVchan().GetSeekPosition(), + }, + StartTs: watchInfo.GetStartTs(), + State: watchInfo.GetState(), + TimeoutTs: watchInfo.GetTimeoutTs(), + Schema: watchInfo.GetSchema(), + Progress: watchInfo.GetProgress(), + } +}