diff --git a/internal/allocator/id_allocator.go b/internal/allocator/id_allocator.go index 6ea7f8fca3..5cc5211b46 100644 --- a/internal/allocator/id_allocator.go +++ b/internal/allocator/id_allocator.go @@ -93,7 +93,6 @@ func (ia *IDAllocator) syncID() (bool, error) { req := &rootcoordpb.AllocIDRequest{ Base: commonpbutil.NewMsgBase( commonpbutil.WithMsgType(commonpb.MsgType_RequestID), - commonpbutil.WithMsgID(0), commonpbutil.WithSourceID(ia.PeerID), ), Count: need, diff --git a/internal/datacoord/allocator.go b/internal/datacoord/allocator.go index d4c687bd72..c9fc434dbc 100644 --- a/internal/datacoord/allocator.go +++ b/internal/datacoord/allocator.go @@ -53,7 +53,6 @@ func (alloc *rootCoordAllocator) allocTimestamp(ctx context.Context) (Timestamp, resp, err := alloc.AllocTimestamp(ctx, &rootcoordpb.AllocTimestampRequest{ Base: commonpbutil.NewMsgBase( commonpbutil.WithMsgType(commonpb.MsgType_RequestTSO), - commonpbutil.WithMsgID(0), commonpbutil.WithSourceID(paramtable.GetNodeID()), ), Count: 1, @@ -69,7 +68,6 @@ func (alloc *rootCoordAllocator) allocID(ctx context.Context) (UniqueID, error) resp, err := alloc.AllocID(ctx, &rootcoordpb.AllocIDRequest{ Base: commonpbutil.NewMsgBase( commonpbutil.WithMsgType(commonpb.MsgType_RequestID), - commonpbutil.WithMsgID(0), commonpbutil.WithSourceID(paramtable.GetNodeID()), ), Count: 1, diff --git a/internal/datacoord/coordinator_broker.go b/internal/datacoord/coordinator_broker.go index f686f3c176..65805621d6 100644 --- a/internal/datacoord/coordinator_broker.go +++ b/internal/datacoord/coordinator_broker.go @@ -76,7 +76,6 @@ func (b *CoordinatorBroker) ShowPartitionsInternal(ctx context.Context, collecti resp, err := b.rootCoord.ShowPartitionsInternal(ctx, &milvuspb.ShowPartitionsRequest{ Base: commonpbutil.NewMsgBase( commonpbutil.WithMsgType(commonpb.MsgType_ShowPartitions), - commonpbutil.WithMsgID(0), commonpbutil.WithSourceID(paramtable.GetNodeID()), ), // please do not specify the collection name alone after database feature. diff --git a/internal/datanode/broker/datacoord.go b/internal/datanode/broker/datacoord.go index 4290b543e3..6081ee8285 100644 --- a/internal/datanode/broker/datacoord.go +++ b/internal/datanode/broker/datacoord.go @@ -68,7 +68,6 @@ func (dc *dataCoordBroker) GetSegmentInfo(ctx context.Context, segmentIDs []int6 infoResp, err := dc.client.GetSegmentInfo(ctx, &datapb.GetSegmentInfoRequest{ Base: commonpbutil.NewMsgBase( commonpbutil.WithMsgType(commonpb.MsgType_SegmentInfo), - commonpbutil.WithMsgID(0), commonpbutil.WithSourceID(paramtable.GetNodeID()), ), SegmentIDs: segmentIDs, diff --git a/internal/datanode/services.go b/internal/datanode/services.go index 942e50f02b..c27e0cbdfc 100644 --- a/internal/datanode/services.go +++ b/internal/datanode/services.go @@ -712,8 +712,6 @@ func saveSegmentFunc(node *DataNode, req *datapb.ImportTaskRequest, res *rootcoo RowNum: rowCount, SaveBinlogPathReq: &datapb.SaveBinlogPathsRequest{ Base: commonpbutil.NewMsgBase( - commonpbutil.WithMsgType(0), - commonpbutil.WithMsgID(0), commonpbutil.WithTimeStamp(ts), commonpbutil.WithSourceID(paramtable.GetNodeID()), ), diff --git a/internal/datanode/stats_updater.go b/internal/datanode/stats_updater.go index efea9ba13a..cc44fff208 100644 --- a/internal/datanode/stats_updater.go +++ b/internal/datanode/stats_updater.go @@ -62,7 +62,6 @@ func (u *mqStatsUpdater) send(ts Timestamp, segmentIDs []int64) error { DataNodeTtMsg: msgpb.DataNodeTtMsg{ Base: commonpbutil.NewMsgBase( commonpbutil.WithMsgType(commonpb.MsgType_DataNodeTt), - commonpbutil.WithMsgID(0), commonpbutil.WithTimeStamp(ts), commonpbutil.WithSourceID(paramtable.GetNodeID()), ), diff --git a/internal/datanode/syncmgr/meta_writer.go b/internal/datanode/syncmgr/meta_writer.go index 9acb4fd726..0f7c3f8617 100644 --- a/internal/datanode/syncmgr/meta_writer.go +++ b/internal/datanode/syncmgr/meta_writer.go @@ -167,8 +167,6 @@ func (b *brokerMetaWriter) UpdateSyncV2(pack *SyncTaskV2) error { req := &datapb.SaveBinlogPathsRequest{ Base: commonpbutil.NewMsgBase( - commonpbutil.WithMsgType(0), - commonpbutil.WithMsgID(0), commonpbutil.WithSourceID(paramtable.GetNodeID()), ), SegmentID: pack.segmentID, @@ -218,8 +216,6 @@ func (b *brokerMetaWriter) DropChannel(channelName string) error { err := retry.Do(context.Background(), func() error { status, err := b.broker.DropVirtualChannel(context.Background(), &datapb.DropVirtualChannelRequest{ Base: commonpbutil.NewMsgBase( - commonpbutil.WithMsgType(0), // TODO msg type - commonpbutil.WithMsgID(0), // TODO msg id commonpbutil.WithSourceID(paramtable.GetNodeID()), ), ChannelName: channelName, diff --git a/internal/proxy/impl.go b/internal/proxy/impl.go index eb7a526be2..40937d459a 100644 --- a/internal/proxy/impl.go +++ b/internal/proxy/impl.go @@ -1608,7 +1608,6 @@ func (node *Proxy) GetLoadingProgress(ctx context.Context, request *milvuspb.Get msgBase := commonpbutil.NewMsgBase( commonpbutil.WithMsgType(commonpb.MsgType_SystemInfo), - commonpbutil.WithMsgID(0), commonpbutil.WithSourceID(paramtable.GetNodeID()), ) if request.Base == nil { @@ -1695,7 +1694,6 @@ func (node *Proxy) GetLoadState(ctx context.Context, request *milvuspb.GetLoadSt msgBase := commonpbutil.NewMsgBase( commonpbutil.WithMsgType(commonpb.MsgType_SystemInfo), - commonpbutil.WithMsgID(0), commonpbutil.WithSourceID(paramtable.GetNodeID()), ) if request.Base == nil { @@ -2223,7 +2221,6 @@ func (node *Proxy) Insert(ctx context.Context, request *milvuspb.InsertRequest) InsertRequest: msgpb.InsertRequest{ Base: commonpbutil.NewMsgBase( commonpbutil.WithMsgType(commonpb.MsgType_Insert), - commonpbutil.WithMsgID(0), commonpbutil.WithSourceID(paramtable.GetNodeID()), ), DbName: request.GetDbName(), @@ -3283,7 +3280,6 @@ func (node *Proxy) GetPersistentSegmentInfo(ctx context.Context, req *milvuspb.G infoResp, err := node.dataCoord.GetSegmentInfo(ctx, &datapb.GetSegmentInfoRequest{ Base: commonpbutil.NewMsgBase( commonpbutil.WithMsgType(commonpb.MsgType_SegmentInfo), - commonpbutil.WithMsgID(0), commonpbutil.WithSourceID(paramtable.GetNodeID()), ), SegmentIDs: getSegmentsByStatesResponse.Segments, @@ -3357,7 +3353,6 @@ func (node *Proxy) GetQuerySegmentInfo(ctx context.Context, req *milvuspb.GetQue infoResp, err := node.queryCoord.GetSegmentInfo(ctx, &querypb.GetSegmentInfoRequest{ Base: commonpbutil.NewMsgBase( commonpbutil.WithMsgType(commonpb.MsgType_SegmentInfo), - commonpbutil.WithMsgID(0), commonpbutil.WithSourceID(paramtable.GetNodeID()), ), CollectionID: collID, @@ -3508,7 +3503,6 @@ func (node *Proxy) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsReque req.Base = commonpbutil.NewMsgBase( commonpbutil.WithMsgType(commonpb.MsgType_SystemInfo), - commonpbutil.WithMsgID(0), commonpbutil.WithSourceID(paramtable.GetNodeID()), ) if metricType == metricsinfo.SystemInfoMetrics { @@ -3570,7 +3564,6 @@ func (node *Proxy) GetProxyMetrics(ctx context.Context, req *milvuspb.GetMetrics req.Base = commonpbutil.NewMsgBase( commonpbutil.WithMsgType(commonpb.MsgType_SystemInfo), - commonpbutil.WithMsgID(0), commonpbutil.WithSourceID(paramtable.GetNodeID()), ) @@ -3627,7 +3620,6 @@ func (node *Proxy) LoadBalance(ctx context.Context, req *milvuspb.LoadBalanceReq infoResp, err := node.queryCoord.LoadBalance(ctx, &querypb.LoadBalanceRequest{ Base: commonpbutil.NewMsgBase( commonpbutil.WithMsgType(commonpb.MsgType_LoadBalanceSegments), - commonpbutil.WithMsgID(0), commonpbutil.WithSourceID(paramtable.GetNodeID()), ), SourceNodeIDs: []int64{req.SrcNodeID}, @@ -4578,7 +4570,6 @@ func (node *Proxy) RenameCollection(ctx context.Context, req *milvuspb.RenameCol req.Base = commonpbutil.NewMsgBase( commonpbutil.WithMsgType(commonpb.MsgType_RenameCollection), - commonpbutil.WithMsgID(0), commonpbutil.WithSourceID(paramtable.GetNodeID()), ) resp, err := node.rootCoord.RenameCollection(ctx, req) diff --git a/internal/proxy/impl_test.go b/internal/proxy/impl_test.go index ffab642dbb..8dcb771bce 100644 --- a/internal/proxy/impl_test.go +++ b/internal/proxy/impl_test.go @@ -1241,7 +1241,6 @@ func TestProxy_ReplicateMessage(t *testing.T) { timeTickResult := msgpb.TimeTickMsg{ Base: commonpbutil.NewMsgBase( commonpbutil.WithMsgType(commonpb.MsgType(-1)), - commonpbutil.WithMsgID(0), commonpbutil.WithTimeStamp(10), commonpbutil.WithSourceID(-1), ), diff --git a/internal/proxy/proxy.go b/internal/proxy/proxy.go index e804dfaf2b..4c80889d7a 100644 --- a/internal/proxy/proxy.go +++ b/internal/proxy/proxy.go @@ -338,8 +338,7 @@ func (node *Proxy) sendChannelsTimeTickLoop() { req := &internalpb.ChannelTimeTickMsg{ Base: commonpbutil.NewMsgBase( - commonpbutil.WithMsgType(commonpb.MsgType_TimeTick), // todo - commonpbutil.WithMsgID(0), // todo + commonpbutil.WithMsgType(commonpb.MsgType_TimeTick), commonpbutil.WithSourceID(node.session.ServerID), ), ChannelNames: channels, diff --git a/internal/proxy/timestamp.go b/internal/proxy/timestamp.go index 7fe95cd627..7078b809c6 100644 --- a/internal/proxy/timestamp.go +++ b/internal/proxy/timestamp.go @@ -52,7 +52,6 @@ func (ta *timestampAllocator) alloc(ctx context.Context, count uint32) ([]Timest req := &rootcoordpb.AllocTimestampRequest{ Base: commonpbutil.NewMsgBase( commonpbutil.WithMsgType(commonpb.MsgType_RequestTSO), - commonpbutil.WithMsgID(0), commonpbutil.WithSourceID(ta.peerID), ), Count: count, diff --git a/internal/rootcoord/expire_cache.go b/internal/rootcoord/expire_cache.go index eccfa6e7c2..df21a36fe8 100644 --- a/internal/rootcoord/expire_cache.go +++ b/internal/rootcoord/expire_cache.go @@ -70,8 +70,6 @@ func (c *Core) ExpireMetaCache(ctx context.Context, dbName string, collNames []s for _, collName := range collNames { req := proxypb.InvalidateCollMetaCacheRequest{ Base: commonpbutil.NewMsgBase( - commonpbutil.WithMsgType(0), // TODO, msg type - commonpbutil.WithMsgID(0), // TODO, msg id commonpbutil.WithTimeStamp(ts), commonpbutil.WithSourceID(c.session.ServerID), ), diff --git a/internal/rootcoord/root_coord.go b/internal/rootcoord/root_coord.go index c07b945f12..045d7c6a0f 100644 --- a/internal/rootcoord/root_coord.go +++ b/internal/rootcoord/root_coord.go @@ -2031,8 +2031,6 @@ func (c *Core) ReportImport(ctx context.Context, ir *rootcoordpb.ImportResult) ( func (c *Core) ExpireCredCache(ctx context.Context, username string) error { req := proxypb.InvalidateCredCacheRequest{ Base: commonpbutil.NewMsgBase( - commonpbutil.WithMsgType(0), // TODO, msg type - commonpbutil.WithMsgID(0), // TODO, msg id commonpbutil.WithSourceID(c.session.ServerID), ), Username: username, @@ -2044,8 +2042,6 @@ func (c *Core) ExpireCredCache(ctx context.Context, username string) error { func (c *Core) UpdateCredCache(ctx context.Context, credInfo *internalpb.CredentialInfo) error { req := proxypb.UpdateCredCacheRequest{ Base: commonpbutil.NewMsgBase( - commonpbutil.WithMsgType(0), // TODO, msg type - commonpbutil.WithMsgID(0), // TODO, msg id commonpbutil.WithSourceID(c.session.ServerID), ), Username: credInfo.Username, diff --git a/internal/rootcoord/timeticksync.go b/internal/rootcoord/timeticksync.go index 208d634e69..bc906519cc 100644 --- a/internal/rootcoord/timeticksync.go +++ b/internal/rootcoord/timeticksync.go @@ -330,7 +330,6 @@ func (t *timetickSync) sendTimeTickToChannel(chanNames []string, ts typeutil.Tim timeTickResult := msgpb.TimeTickMsg{ Base: commonpbutil.NewMsgBase( commonpbutil.WithMsgType(commonpb.MsgType_TimeTick), - commonpbutil.WithMsgID(0), commonpbutil.WithTimeStamp(ts), commonpbutil.WithSourceID(t.sourceID), ),