From 66b9684fe5fa0a93725f3ff4b630fe57d96fc548 Mon Sep 17 00:00:00 2001 From: Cai Yudong Date: Tue, 2 Nov 2021 15:50:30 +0800 Subject: [PATCH] Update ListCollectionPhysicalChannels and ListCollectionVirtualChannels (#11007) Signed-off-by: yudong.cai --- internal/rootcoord/meta_table.go | 20 ++++++++++---------- internal/rootcoord/root_coord.go | 8 +++++--- internal/rootcoord/root_coord_test.go | 17 +++++++++++------ 3 files changed, 26 insertions(+), 19 deletions(-) diff --git a/internal/rootcoord/meta_table.go b/internal/rootcoord/meta_table.go index 2d5d7e276d..2d7630318c 100644 --- a/internal/rootcoord/meta_table.go +++ b/internal/rootcoord/meta_table.go @@ -535,27 +535,27 @@ func (mt *MetaTable) ListAliases(collID typeutil.UniqueID) []string { } // ListCollectionVirtualChannels list virtual channels of all collections -func (mt *MetaTable) ListCollectionVirtualChannels() []string { +func (mt *MetaTable) ListCollectionVirtualChannels() map[typeutil.UniqueID][]string { mt.ddLock.RLock() defer mt.ddLock.RUnlock() - vlist := []string{} + chanMap := make(map[typeutil.UniqueID][]string) - for _, c := range mt.collID2Meta { - vlist = append(vlist, c.VirtualChannelNames...) + for id, collInfo := range mt.collID2Meta { + chanMap[id] = collInfo.VirtualChannelNames } - return vlist + return chanMap } // ListCollectionPhysicalChannels list physical channels of all collections -func (mt *MetaTable) ListCollectionPhysicalChannels() []string { +func (mt *MetaTable) ListCollectionPhysicalChannels() map[typeutil.UniqueID][]string { mt.ddLock.RLock() defer mt.ddLock.RUnlock() - plist := []string{} + chanMap := make(map[typeutil.UniqueID][]string) - for _, c := range mt.collID2Meta { - plist = append(plist, c.PhysicalChannelNames...) + for id, collInfo := range mt.collID2Meta { + chanMap[id] = collInfo.PhysicalChannelNames } - return plist + return chanMap } // AddPartition add partition diff --git a/internal/rootcoord/root_coord.go b/internal/rootcoord/root_coord.go index 6d5aefeeb1..bc5b5c8687 100644 --- a/internal/rootcoord/root_coord.go +++ b/internal/rootcoord/root_coord.go @@ -928,9 +928,11 @@ func (c *Core) Init() error { c.dmlChannels = newDmlChannels(c, Params.DmlChannelName, Params.DmlChannelNum) // recover physical channels for all collections - pc := c.MetaTable.ListCollectionPhysicalChannels() - c.dmlChannels.AddProducerChannels(pc...) - log.Debug("recover all physical channels", zap.Any("chanNames", pc)) + chanMap := c.MetaTable.ListCollectionPhysicalChannels() + for collID, chanNames := range chanMap { + c.dmlChannels.AddProducerChannels(chanNames...) + log.Debug("recover physical channels", zap.Int64("collID", collID), zap.Any("chanNames", chanNames)) + } c.chanTimeTick = newTimeTickSync(c) c.chanTimeTick.AddProxy(c.session) diff --git a/internal/rootcoord/root_coord_test.go b/internal/rootcoord/root_coord_test.go index 0b6a619e21..df82450b12 100644 --- a/internal/rootcoord/root_coord_test.go +++ b/internal/rootcoord/root_coord_test.go @@ -674,17 +674,21 @@ func TestRootCoord(t *testing.T) { assert.Equal(t, shardsNum, int32(core.dmlChannels.GetNumChannels())) - pChan := core.MetaTable.ListCollectionPhysicalChannels() - dmlStream.AsConsumer([]string{pChan[0]}, Params.MsgChannelSubName) + createMeta, err := core.MetaTable.GetCollectionByName(collName, 0) + assert.Nil(t, err) + dmlStream.AsConsumer([]string{createMeta.PhysicalChannelNames[0]}, Params.MsgChannelSubName) dmlStream.Start() + pChanMap := core.MetaTable.ListCollectionPhysicalChannels() + assert.Greater(t, len(pChanMap[createMeta.ID]), 0) + vChanMap := core.MetaTable.ListCollectionVirtualChannels() + assert.Greater(t, len(vChanMap[createMeta.ID]), 0) + // get CreateCollectionMsg msgs := getNotTtMsg(ctx, 1, dmlStream.Chan()) assert.Equal(t, 1, len(msgs)) createMsg, ok := (msgs[0]).(*msgstream.CreateCollectionMsg) assert.True(t, ok) - createMeta, err := core.MetaTable.GetCollectionByName(collName, 0) - assert.Nil(t, err) assert.Equal(t, createMeta.ID, createMsg.CollectionID) assert.Equal(t, 1, len(createMeta.PartitionIDs)) assert.Equal(t, createMeta.PartitionIDs[0], createMsg.PartitionID) @@ -2272,9 +2276,10 @@ func TestRootCoord2(t *testing.T) { assert.Nil(t, err) assert.Equal(t, commonpb.ErrorCode_Success, status.ErrorCode) - pChan := core.MetaTable.ListCollectionPhysicalChannels() + collInfo, err := core.MetaTable.GetCollectionByName(collName, 0) + assert.Nil(t, err) dmlStream, _ := msFactory.NewMsgStream(ctx) - dmlStream.AsConsumer([]string{pChan[0]}, Params.MsgChannelSubName) + dmlStream.AsConsumer([]string{collInfo.PhysicalChannelNames[0]}, Params.MsgChannelSubName) dmlStream.Start() msgs := getNotTtMsg(ctx, 1, dmlStream.Chan())