From d3a18a66b56ac74e4bb668631eee310c7cecf71f Mon Sep 17 00:00:00 2001 From: bigsheeper Date: Wed, 8 Sep 2021 20:50:00 +0800 Subject: [PATCH] Add tSafeReplica and task scheduler unittests (#7587) Signed-off-by: bigsheeper --- .../querynode/flow_graph_query_node_test.go | 73 ++++++++++++++++++ internal/querynode/task_queue_test.go | 59 +++++++++++++++ internal/querynode/task_scheduler_test.go | 75 +++++++++++++++++++ internal/querynode/tsafe_replica_test.go | 53 +++++++++++++ 4 files changed, 260 insertions(+) create mode 100644 internal/querynode/flow_graph_query_node_test.go create mode 100644 internal/querynode/task_queue_test.go create mode 100644 internal/querynode/task_scheduler_test.go create mode 100644 internal/querynode/tsafe_replica_test.go diff --git a/internal/querynode/flow_graph_query_node_test.go b/internal/querynode/flow_graph_query_node_test.go new file mode 100644 index 0000000000..5ab9fb81db --- /dev/null +++ b/internal/querynode/flow_graph_query_node_test.go @@ -0,0 +1,73 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License. + +package querynode + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/milvus-io/milvus/internal/proto/internalpb" +) + +func TestQueryNodeFlowGraph_consumerFlowGraph(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + streaming, err := genSimpleStreaming(ctx) + assert.NoError(t, err) + + fac, err := genFactory() + assert.NoError(t, err) + + fg := newQueryNodeFlowGraph(ctx, + loadTypeCollection, + defaultCollectionID, + defaultPartitionID, + streaming.replica, + streaming.tSafeReplica, + defaultVChannel, + fac) + + err = fg.consumerFlowGraph(defaultVChannel, defaultSubName) + assert.NoError(t, err) +} + +func TestQueryNodeFlowGraph_seekQueryNodeFlowGraph(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + streaming, err := genSimpleStreaming(ctx) + assert.NoError(t, err) + + fac, err := genFactory() + assert.NoError(t, err) + + fg := newQueryNodeFlowGraph(ctx, + loadTypeCollection, + defaultCollectionID, + defaultPartitionID, + streaming.replica, + streaming.tSafeReplica, + defaultVChannel, + fac) + + position := &internalpb.MsgPosition{ + ChannelName: defaultVChannel, + MsgID: []byte{}, + MsgGroup: defaultSubName, + Timestamp: 0, + } + err = fg.seekQueryNodeFlowGraph(position) + assert.Error(t, err) +} diff --git a/internal/querynode/task_queue_test.go b/internal/querynode/task_queue_test.go new file mode 100644 index 0000000000..f41ae763e8 --- /dev/null +++ b/internal/querynode/task_queue_test.go @@ -0,0 +1,59 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License. + +package querynode + +import ( + "context" + "fmt" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestBaseTaskQueue_addUnissuedTask(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + s := &taskScheduler{ + ctx: ctx, + cancel: cancel, + } + + t.Run("test full", func(t *testing.T) { + taskQueue := newLoadAndReleaseTaskQueue(s) + task := &mockTask{} + for i := 0; i < maxTaskNum; i++ { + err := taskQueue.addUnissuedTask(task) + assert.NoError(t, err) + } + err := taskQueue.addUnissuedTask(task) + assert.Error(t, err) + }) + + t.Run("add task to front", func(t *testing.T) { + taskQueue := newLoadAndReleaseTaskQueue(s) + mt := &mockTask{ + timestamp: 1000, + } + err := taskQueue.addUnissuedTask(mt) + fmt.Println(taskQueue.unissuedTasks.Back().Value.(task).Timestamp()) + assert.NoError(t, err) + err = taskQueue.addUnissuedTask(mt) + fmt.Println(taskQueue.unissuedTasks.Back().Value.(task).Timestamp()) + assert.NoError(t, err) + mt2 := &mockTask{ + timestamp: 0, + } + err = taskQueue.addUnissuedTask(mt2) + assert.NoError(t, err) + }) +} diff --git a/internal/querynode/task_scheduler_test.go b/internal/querynode/task_scheduler_test.go new file mode 100644 index 0000000000..64d0badb3a --- /dev/null +++ b/internal/querynode/task_scheduler_test.go @@ -0,0 +1,75 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License. + +package querynode + +import ( + "context" + "errors" + "testing" +) + +type mockTask struct { + baseTask + preExecuteError bool + executeError bool + timestamp Timestamp +} + +func (m *mockTask) Timestamp() Timestamp { + return m.timestamp +} + +func (m *mockTask) OnEnqueue() error { + return nil +} + +func (m *mockTask) PreExecute(ctx context.Context) error { + if m.preExecuteError { + return errors.New("test error") + } + return nil +} + +func (m *mockTask) Execute(ctx context.Context) error { + if m.executeError { + return errors.New("test error") + } + return nil +} + +func (m *mockTask) PostExecute(ctx context.Context) error { + return nil +} + +func TestTaskScheduler(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + ts := newTaskScheduler(ctx) + ts.Start() + + task := &mockTask{ + baseTask: baseTask{ + ctx: ctx, + done: make(chan error, 1024), + }, + preExecuteError: true, + executeError: false, + } + ts.processTask(task, ts.queue) + + task.preExecuteError = false + task.executeError = true + ts.processTask(task, ts.queue) + + ts.Close() +} diff --git a/internal/querynode/tsafe_replica_test.go b/internal/querynode/tsafe_replica_test.go new file mode 100644 index 0000000000..281dadb154 --- /dev/null +++ b/internal/querynode/tsafe_replica_test.go @@ -0,0 +1,53 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License. + +package querynode + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestTSafeReplica_valid(t *testing.T) { + replica := newTSafeReplica() + replica.addTSafe(defaultVChannel) + + watcher := newTSafeWatcher() + replica.registerTSafeWatcher(defaultVChannel, watcher) + + timestamp := Timestamp(1000) + replica.setTSafe(defaultVChannel, defaultCollectionID, timestamp) + time.Sleep(20 * time.Millisecond) + resT := replica.getTSafe(defaultVChannel) + assert.Equal(t, timestamp, resT) + + replica.removeTSafe(defaultVChannel) +} + +func TestTSafeReplica_invalid(t *testing.T) { + replica := newTSafeReplica() + + watcher := newTSafeWatcher() + replica.registerTSafeWatcher(defaultVChannel, watcher) + + timestamp := Timestamp(1000) + replica.setTSafe(defaultVChannel, defaultCollectionID, timestamp) + time.Sleep(20 * time.Millisecond) + resT := replica.getTSafe(defaultVChannel) + assert.Equal(t, Timestamp(0), resT) + + replica.removeTSafe(defaultVChannel) + + replica.addTSafe(defaultVChannel) + replica.addTSafe(defaultVChannel) +}