diff --git a/internal/datanode/data_node_test.go b/internal/datanode/data_node_test.go index 04a50c73f6..3621da1456 100644 --- a/internal/datanode/data_node_test.go +++ b/internal/datanode/data_node_test.go @@ -40,6 +40,7 @@ func TestMain(t *testing.M) { func TestDataNode(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) node := newIDLEDataNodeMock(ctx) + node.Init() node.Start() node.Register() @@ -228,6 +229,52 @@ func TestDataNode(t *testing.T) { }) + t.Run("Test GetChannelName", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + node := newIDLEDataNodeMock(ctx) + + testCollIDs := []UniqueID{0, 1, 2, 1} + testSegIDs := []UniqueID{10, 11, 12, 13} + testchanNames := []string{"a", "b", "c", "d"} + + node.chanMut.Lock() + for i, name := range testchanNames { + replica := &SegmentReplica{ + collectionID: testCollIDs[i], + newSegments: make(map[UniqueID]*Segment), + } + + replica.addNewSegment(testSegIDs[i], testCollIDs[i], 0, name, &internalpb.MsgPosition{}, nil) + node.vchan2SyncService[name] = &dataSyncService{collectionID: testCollIDs[i], replica: replica} + } + node.chanMut.Unlock() + + type Test struct { + inCollID UniqueID + expectedChannels []string + + inSegID UniqueID + expectedChannel string + } + tests := []Test{ + {0, []string{"a"}, 10, "a"}, + {1, []string{"b", "d"}, 11, "b"}, + {2, []string{"c"}, 12, "c"}, + {3, []string{}, 13, "d"}, + {3, []string{}, 100, ""}, + } + + for _, test := range tests { + actualChannels := node.getChannelNamesbyCollectionID(test.inCollID) + assert.ElementsMatch(t, test.expectedChannels, actualChannels) + + actualChannel := node.getChannelNamebySegmentID(test.inSegID) + assert.Equal(t, test.expectedChannel, actualChannel) + } + + cancel() + }) + t.Run("Test BackGroundGC", func(t *testing.T) { t.Skipf("Skip for data race") collIDCh := make(chan UniqueID) diff --git a/internal/datanode/data_sync_service.go b/internal/datanode/data_sync_service.go index 3526bd930c..aa1cfa1b3f 100644 --- a/internal/datanode/data_sync_service.go +++ b/internal/datanode/data_sync_service.go @@ -122,7 +122,6 @@ func (dsService *dataSyncService) initNodes(vchanInfo *datapb.VchannelInfo) erro zap.Int64("SegmentID", fu.segID), zap.Int64("CollectionID", fu.collID), zap.Int("Length of Field2BinlogPaths", len(id2path)), - zap.Any("Start Positions", fu.startPositions), ) req := &datapb.SaveBinlogPathsRequest{ diff --git a/internal/datanode/flow_graph_dd_node.go b/internal/datanode/flow_graph_dd_node.go index af34d42119..3310be3b9e 100644 --- a/internal/datanode/flow_graph_dd_node.go +++ b/internal/datanode/flow_graph_dd_node.go @@ -57,6 +57,7 @@ func (ddn *ddNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { msMsg, ok := in[0].(*MsgStreamMsg) if !ok { log.Error("type assertion failed for MsgStreamMsg") + return []flowgraph.Msg{} // TODO: add error handling }