diff --git a/internal/querynode/data_sync_service.go b/internal/querynode/data_sync_service.go
index dc226ea91b..1eedbdc19b 100644
--- a/internal/querynode/data_sync_service.go
+++ b/internal/querynode/data_sync_service.go
@@ -34,9 +34,10 @@ const (
 type dataSyncService struct {
 	ctx context.Context
-	mu                   sync.Mutex                                   // guards FlowGraphs
-	collectionFlowGraphs map[UniqueID]map[Channel]*queryNodeFlowGraph // map[collectionID]flowGraphs
-	partitionFlowGraphs  map[UniqueID]map[Channel]*queryNodeFlowGraph // map[partitionID]flowGraphs
+	mu                        sync.Mutex                                   // guards FlowGraphs
+	collectionFlowGraphs      map[UniqueID]map[Channel]*queryNodeFlowGraph // map[collectionID]flowGraphs
+	collectionDeltaFlowGraphs map[UniqueID]map[Channel]*queryNodeFlowGraph
+	partitionFlowGraphs       map[UniqueID]map[Channel]*queryNodeFlowGraph // map[partitionID]flowGraphs
 	streamingReplica  ReplicaInterface
 	historicalReplica ReplicaInterface
@@ -72,6 +73,33 @@ func (dsService *dataSyncService) addCollectionFlowGraph(collectionID UniqueID,
 	}
 }
+// collection delta flow graph
+// addCollectionDeltaFlowGraph adds a collection delta flowGraph to collectionDeltaFlowGraphs
+func (dsService *dataSyncService) addCollectionDeltaFlowGraph(collectionID UniqueID, vChannels []string) {
+	dsService.mu.Lock()
+	defer dsService.mu.Unlock()
+
+	if _, ok := dsService.collectionDeltaFlowGraphs[collectionID]; !ok {
+		dsService.collectionDeltaFlowGraphs[collectionID] = make(map[Channel]*queryNodeFlowGraph)
+	}
+	for _, vChannel := range vChannels {
+		// collection flow graph doesn't need partition id
+		partitionID := UniqueID(0)
+		newFlowGraph := newQueryNodeDeltaFlowGraph(dsService.ctx,
+			loadTypeCollection,
+			collectionID,
+			partitionID,
+			dsService.historicalReplica,
+			dsService.tSafeReplica,
+			vChannel,
+			dsService.msFactory)
+		dsService.collectionDeltaFlowGraphs[collectionID][vChannel] = newFlowGraph
+		log.Debug("add collection delta flow graph",
+			zap.Any("collectionID", collectionID),
+			zap.Any("channel", vChannel))
+	}
+}
+
 func (dsService *dataSyncService) getCollectionFlowGraphs(collectionID UniqueID, vChannels []string) (map[Channel]*queryNodeFlowGraph, error) {
 	dsService.mu.Lock()
 	defer dsService.mu.Unlock()
@@ -90,6 +118,24 @@ func (dsService *dataSyncService) getCollectionFlowGraphs(collectionID UniqueID,
 	return tmpFGs, nil
 }
+func (dsService *dataSyncService) getCollectionDeltaFlowGraphs(collectionID UniqueID, vChannels []string) (map[Channel]*queryNodeFlowGraph, error) {
+	dsService.mu.Lock()
+	defer dsService.mu.Unlock()
+
+	if _, ok := dsService.collectionDeltaFlowGraphs[collectionID]; !ok {
+		return nil, errors.New("collection delta flow graph doesn't exist, collectionID = " + fmt.Sprintln(collectionID))
+	}
+
+	tmpFGs := make(map[Channel]*queryNodeFlowGraph)
+	for _, channel := range vChannels {
+		if _, ok := dsService.collectionDeltaFlowGraphs[collectionID][channel]; ok {
+			tmpFGs[channel] = dsService.collectionDeltaFlowGraphs[collectionID][channel]
+		}
+	}
+
+	return tmpFGs, nil
+}
+
 func (dsService *dataSyncService) startCollectionFlowGraph(collectionID UniqueID, vChannels []string) error {
 	dsService.mu.Lock()
 	defer dsService.mu.Unlock()
@@ -107,6 +153,23 @@ func (dsService *dataSyncService) startCollectionFlowGraph(collectionID UniqueID
 	return nil
 }
+func (dsService *dataSyncService) startCollectionDeltaFlowGraph(collectionID UniqueID, vChannels []string) error {
+	dsService.mu.Lock()
+	defer dsService.mu.Unlock()
+
+	if _, ok := dsService.collectionDeltaFlowGraphs[collectionID]; !ok {
+		return errors.New("collection delta flow graph doesn't exist, collectionID = "
+ fmt.Sprintln(collectionID)) + } + for _, channel := range vChannels { + if _, ok := dsService.collectionDeltaFlowGraphs[collectionID][channel]; ok { + // start flow graph + log.Debug("start collection flow graph", zap.Any("channel", channel)) + dsService.collectionDeltaFlowGraphs[collectionID][channel].flowGraph.Start() + } + } + return nil +} + func (dsService *dataSyncService) removeCollectionFlowGraph(collectionID UniqueID) { dsService.mu.Lock() defer dsService.mu.Unlock() @@ -121,6 +184,20 @@ func (dsService *dataSyncService) removeCollectionFlowGraph(collectionID UniqueI delete(dsService.collectionFlowGraphs, collectionID) } +func (dsService *dataSyncService) removeCollectionDeltaFlowGraph(collectionID UniqueID) { + dsService.mu.Lock() + defer dsService.mu.Unlock() + + if _, ok := dsService.collectionDeltaFlowGraphs[collectionID]; ok { + for _, nodeFG := range dsService.collectionDeltaFlowGraphs[collectionID] { + // close flow graph + nodeFG.close() + } + dsService.collectionDeltaFlowGraphs[collectionID] = nil + } + delete(dsService.collectionDeltaFlowGraphs, collectionID) +} + // partition flow graph func (dsService *dataSyncService) addPartitionFlowGraph(collectionID UniqueID, partitionID UniqueID, vChannels []string) { dsService.mu.Lock() @@ -206,13 +283,14 @@ func newDataSyncService(ctx context.Context, factory msgstream.Factory) *dataSyncService { return &dataSyncService{ - ctx: ctx, - collectionFlowGraphs: make(map[UniqueID]map[Channel]*queryNodeFlowGraph), - partitionFlowGraphs: make(map[UniqueID]map[Channel]*queryNodeFlowGraph), - streamingReplica: streamingReplica, - historicalReplica: historicalReplica, - tSafeReplica: tSafeReplica, - msFactory: factory, + ctx: ctx, + collectionFlowGraphs: make(map[UniqueID]map[Channel]*queryNodeFlowGraph), + collectionDeltaFlowGraphs: map[int64]map[string]*queryNodeFlowGraph{}, + partitionFlowGraphs: make(map[UniqueID]map[Channel]*queryNodeFlowGraph), + streamingReplica: streamingReplica, + historicalReplica: historicalReplica, + tSafeReplica: tSafeReplica, + msFactory: factory, } } diff --git a/internal/querynode/data_sync_service_test.go b/internal/querynode/data_sync_service_test.go index 8ff1bf6c96..aa1eacbad6 100644 --- a/internal/querynode/data_sync_service_test.go +++ b/internal/querynode/data_sync_service_test.go @@ -117,12 +117,12 @@ func TestDataSyncService_Start(t *testing.T) { assert.Nil(t, err) channels := []Channel{"0"} - node.streaming.dataSyncService.addCollectionFlowGraph(collectionID, channels) - err = node.streaming.dataSyncService.startCollectionFlowGraph(collectionID, channels) + node.dataSyncService.addCollectionFlowGraph(collectionID, channels) + err = node.dataSyncService.startCollectionFlowGraph(collectionID, channels) assert.NoError(t, err) <-node.queryNodeLoopCtx.Done() - node.streaming.dataSyncService.close() + node.dataSyncService.close() err = node.Stop() assert.NoError(t, err) @@ -132,7 +132,7 @@ func TestDataSyncService_collectionFlowGraphs(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - streaming, err := genSimpleStreaming(ctx) + streaming, err := genSimpleReplica() assert.NoError(t, err) historicalReplica, err := genSimpleReplica() @@ -141,7 +141,8 @@ func TestDataSyncService_collectionFlowGraphs(t *testing.T) { fac, err := genFactory() assert.NoError(t, err) - dataSyncService := newDataSyncService(ctx, streaming.replica, historicalReplica, streaming.tSafeReplica, fac) + tSafe := newTSafeReplica() + dataSyncService := newDataSyncService(ctx, streaming, 
historicalReplica, tSafe, fac) assert.NotNil(t, dataSyncService) dataSyncService.addCollectionFlowGraph(defaultCollectionID, []Channel{defaultVChannel}) @@ -178,7 +179,7 @@ func TestDataSyncService_partitionFlowGraphs(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - streaming, err := genSimpleStreaming(ctx) + streaming, err := genSimpleReplica() assert.NoError(t, err) historicalReplica, err := genSimpleReplica() @@ -187,7 +188,9 @@ func TestDataSyncService_partitionFlowGraphs(t *testing.T) { fac, err := genFactory() assert.NoError(t, err) - dataSyncService := newDataSyncService(ctx, streaming.replica, historicalReplica, streaming.tSafeReplica, fac) + tSafe := newTSafeReplica() + + dataSyncService := newDataSyncService(ctx, streaming, historicalReplica, tSafe, fac) assert.NotNil(t, dataSyncService) dataSyncService.addPartitionFlowGraph(defaultPartitionID, defaultPartitionID, []Channel{defaultVChannel}) @@ -225,7 +228,7 @@ func TestDataSyncService_removePartitionFlowGraphs(t *testing.T) { defer cancel() t.Run("test no tSafe", func(t *testing.T) { - streaming, err := genSimpleStreaming(ctx) + streaming, err := genSimpleReplica() assert.NoError(t, err) historicalReplica, err := genSimpleReplica() @@ -234,7 +237,10 @@ func TestDataSyncService_removePartitionFlowGraphs(t *testing.T) { fac, err := genFactory() assert.NoError(t, err) - dataSyncService := newDataSyncService(ctx, streaming.replica, historicalReplica, streaming.tSafeReplica, fac) + tSafe := newTSafeReplica() + tSafe.addTSafe(defaultVChannel) + + dataSyncService := newDataSyncService(ctx, streaming, historicalReplica, tSafe, fac) assert.NotNil(t, dataSyncService) dataSyncService.addPartitionFlowGraph(defaultPartitionID, defaultPartitionID, []Channel{defaultVChannel}) diff --git a/internal/querynode/flow_graph_delete_node.go b/internal/querynode/flow_graph_delete_node.go new file mode 100644 index 0000000000..b35da0bc8b --- /dev/null +++ b/internal/querynode/flow_graph_delete_node.go @@ -0,0 +1,133 @@ +package querynode + +import ( + "sync" + + "github.com/milvus-io/milvus/internal/log" + "github.com/milvus-io/milvus/internal/util/flowgraph" + "github.com/milvus-io/milvus/internal/util/trace" + "github.com/opentracing/opentracing-go" + "go.uber.org/zap" +) + +type deleteNode struct { + baseNode + replica ReplicaInterface // historical +} + +func (dNode *deleteNode) Name() string { + return "dNode" +} + +func (dNode *deleteNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { + if len(in) != 1 { + log.Error("Invalid operate message input in deleteNode", zap.Int("input length", len(in))) + // TODO: add error handling + } + + dMsg, ok := in[0].(*deleteMsg) + if !ok { + log.Warn("type assertion failed for deleteMsg") + // TODO: add error handling + } + + delData := &deleteData{ + deleteIDs: map[UniqueID][]int64{}, + deleteTimestamps: map[UniqueID][]Timestamp{}, + deleteOffset: map[UniqueID]int64{}, + } + + if dMsg == nil { + return []Msg{} + } + + var spans []opentracing.Span + for _, msg := range dMsg.deleteMessages { + sp, ctx := trace.StartSpanFromContext(msg.TraceCtx()) + spans = append(spans, sp) + msg.SetTraceCtx(ctx) + } + + // 1. filter segment by bloom filter + for _, delMsg := range dMsg.deleteMessages { + if dNode.replica.getSegmentNum() != 0 { + processDeleteMessages(dNode.replica, delMsg, delData) + } + } + + // 2. 
do preDelete
+	for segmentID, pks := range delData.deleteIDs {
+		segment, err := dNode.replica.getSegmentByID(segmentID)
+		if err != nil {
+			log.Warn("Cannot find segment in historical replica", zap.Int64("segmentID", segmentID))
+			continue
+		}
+		offset := segment.segmentPreDelete(len(pks))
+		delData.deleteOffset[segmentID] = offset
+	}
+
+	// 3. do delete
+	wg := sync.WaitGroup{}
+	for segmentID := range delData.deleteIDs {
+		wg.Add(1)
+		go dNode.delete(delData, segmentID, &wg)
+	}
+	wg.Wait()
+
+	var res Msg = &serviceTimeMsg{
+		timeRange: dMsg.timeRange,
+	}
+	for _, sp := range spans {
+		sp.Finish()
+	}
+
+	return []Msg{res}
+}
+
+func (dNode *deleteNode) delete(deleteData *deleteData, segmentID UniqueID, wg *sync.WaitGroup) {
+	defer wg.Done()
+	log.Debug("QueryNode::dNode::delete", zap.Any("SegmentID", segmentID))
+	targetSegment := dNode.getSegmentInReplica(segmentID)
+	if targetSegment == nil {
+		log.Warn("targetSegment is nil")
+		return
+	}
+
+	if targetSegment.segmentType != segmentTypeSealed {
+		return
+	}
+
+	ids := deleteData.deleteIDs[segmentID]
+	timestamps := deleteData.deleteTimestamps[segmentID]
+	offset := deleteData.deleteOffset[segmentID]
+
+	err := targetSegment.segmentDelete(offset, &ids, &timestamps)
+	if err != nil {
+		log.Warn("QueryNode: targetSegmentDelete failed", zap.Error(err))
+		return
+	}
+
+	log.Debug("Do delete done", zap.Int("len", len(deleteData.deleteIDs[segmentID])), zap.Int64("segmentID", segmentID))
+}
+
+func (dNode *deleteNode) getSegmentInReplica(segmentID int64) *Segment {
+	segment, err := dNode.replica.getSegmentByID(segmentID)
+	if err != nil {
+		return nil
+	}
+	return segment
+}
+
+func newDeleteNode(historicalReplica ReplicaInterface) *deleteNode {
+	maxQueueLength := Params.FlowGraphMaxQueueLength
+	maxParallelism := Params.FlowGraphMaxParallelism
+
+	baseNode := baseNode{}
+	baseNode.SetMaxQueueLength(maxQueueLength)
+	baseNode.SetMaxParallelism(maxParallelism)
+
+	return &deleteNode{
+		baseNode: baseNode,
+		replica:  historicalReplica,
+	}
+}
diff --git a/internal/querynode/flow_graph_delete_node_test.go b/internal/querynode/flow_graph_delete_node_test.go
new file mode 100644
index 0000000000..2c88dfc53b
--- /dev/null
+++ b/internal/querynode/flow_graph_delete_node_test.go
@@ -0,0 +1,235 @@
+// Copyright (C) 2019-2020 Zilliz. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software distributed under the License
+// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+// or implied. See the License for the specific language governing permissions and limitations under the License.
+ +package querynode + +import ( + "sync" + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/milvus-io/milvus/internal/common" + "github.com/milvus-io/milvus/internal/msgstream" + "github.com/milvus-io/milvus/internal/util/flowgraph" +) + +func TestFlowGraphDeleteNode_delete(t *testing.T) { + t.Run("test delete", func(t *testing.T) { + historical, err := genSimpleReplica() + assert.NoError(t, err) + deleteNode := newDeleteNode(historical) + + err = historical.addSegment(defaultSegmentID, + defaultPartitionID, + defaultCollectionID, + defaultVChannel, + segmentTypeSealed, + true) + assert.NoError(t, err) + + deleteData, err := genFlowGraphDeleteData() + assert.NoError(t, err) + + wg := &sync.WaitGroup{} + wg.Add(1) + deleteNode.delete(deleteData, defaultSegmentID, wg) + }) + + t.Run("test segment delete error", func(t *testing.T) { + historical, err := genSimpleReplica() + assert.NoError(t, err) + deleteNode := newDeleteNode(historical) + + err = historical.addSegment(defaultSegmentID, + defaultPartitionID, + defaultCollectionID, + defaultVChannel, + segmentTypeSealed, + true) + assert.NoError(t, err) + + deleteData, err := genFlowGraphDeleteData() + assert.NoError(t, err) + + wg := &sync.WaitGroup{} + wg.Add(1) + deleteData.deleteTimestamps[defaultSegmentID] = deleteData.deleteTimestamps[defaultSegmentID][:len(deleteData.deleteTimestamps)/2] + deleteNode.delete(deleteData, defaultSegmentID, wg) + }) + + t.Run("test no target segment", func(t *testing.T) { + historical, err := genSimpleReplica() + assert.NoError(t, err) + deleteNode := newDeleteNode(historical) + wg := &sync.WaitGroup{} + wg.Add(1) + deleteNode.delete(nil, defaultSegmentID, wg) + }) + + t.Run("test invalid segmentType", func(t *testing.T) { + historical, err := genSimpleReplica() + assert.NoError(t, err) + deleteNode := newDeleteNode(historical) + + err = historical.addSegment(defaultSegmentID, + defaultPartitionID, + defaultCollectionID, + defaultVChannel, + segmentTypeGrowing, + true) + assert.NoError(t, err) + + wg := &sync.WaitGroup{} + wg.Add(1) + deleteNode.delete(&deleteData{}, defaultSegmentID, wg) + }) +} + +func TestFlowGraphDeleteNode_operate(t *testing.T) { + t.Run("test operate", func(t *testing.T) { + historical, err := genSimpleReplica() + assert.NoError(t, err) + deleteNode := newDeleteNode(historical) + + err = historical.addSegment(defaultSegmentID, + defaultPartitionID, + defaultCollectionID, + defaultVChannel, + segmentTypeSealed, + true) + assert.NoError(t, err) + + msgDeleteMsg, err := genSimpleDeleteMsg() + assert.NoError(t, err) + dMsg := deleteMsg{ + deleteMessages: []*msgstream.DeleteMsg{ + msgDeleteMsg, + }, + } + msg := []flowgraph.Msg{&dMsg} + deleteNode.Operate(msg) + s, err := historical.getSegmentByID(defaultSegmentID) + pks := make([]int64, defaultMsgLength) + for i := 0; i < defaultMsgLength; i++ { + pks[i] = int64(i) + } + s.updateBloomFilter(pks) + assert.Nil(t, err) + buf := make([]byte, 8) + for i := 0; i < defaultMsgLength; i++ { + common.Endian.PutUint64(buf, uint64(i)) + assert.True(t, s.pkFilter.Test(buf)) + } + + }) + + t.Run("test invalid partitionID", func(t *testing.T) { + historical, err := genSimpleReplica() + assert.NoError(t, err) + deleteNode := newDeleteNode(historical) + + err = historical.addSegment(defaultSegmentID, + defaultPartitionID, + defaultCollectionID, + defaultVChannel, + segmentTypeSealed, + true) + assert.NoError(t, err) + + msgDeleteMsg, err := genSimpleDeleteMsg() + assert.NoError(t, err) + msgDeleteMsg.PartitionID = 
common.InvalidPartitionID + assert.NoError(t, err) + dMsg := deleteMsg{ + deleteMessages: []*msgstream.DeleteMsg{ + msgDeleteMsg, + }, + } + msg := []flowgraph.Msg{&dMsg} + deleteNode.Operate(msg) + }) + + t.Run("test collection partition not exist", func(t *testing.T) { + historical, err := genSimpleReplica() + assert.NoError(t, err) + deleteNode := newDeleteNode(historical) + + err = historical.addSegment(defaultSegmentID, + defaultPartitionID, + defaultCollectionID, + defaultVChannel, + segmentTypeSealed, + true) + assert.NoError(t, err) + + msgDeleteMsg, err := genSimpleDeleteMsg() + msgDeleteMsg.CollectionID = 9999 + msgDeleteMsg.PartitionID = -1 + assert.NoError(t, err) + dMsg := deleteMsg{ + deleteMessages: []*msgstream.DeleteMsg{ + msgDeleteMsg, + }, + } + msg := []flowgraph.Msg{&dMsg} + deleteNode.Operate(msg) + }) + + t.Run("test partition not exist", func(t *testing.T) { + historical, err := genSimpleReplica() + assert.NoError(t, err) + deleteNode := newDeleteNode(historical) + + err = historical.addSegment(defaultSegmentID, + defaultPartitionID, + defaultCollectionID, + defaultVChannel, + segmentTypeSealed, + true) + assert.NoError(t, err) + + msgDeleteMsg, err := genSimpleDeleteMsg() + msgDeleteMsg.PartitionID = 9999 + assert.NoError(t, err) + dMsg := deleteMsg{ + deleteMessages: []*msgstream.DeleteMsg{ + msgDeleteMsg, + }, + } + msg := []flowgraph.Msg{&dMsg} + deleteNode.Operate(msg) + }) + + t.Run("test invalid input length", func(t *testing.T) { + historical, err := genSimpleReplica() + assert.NoError(t, err) + deleteNode := newDeleteNode(historical) + + err = historical.addSegment(defaultSegmentID, + defaultPartitionID, + defaultCollectionID, + defaultVChannel, + segmentTypeSealed, + true) + assert.NoError(t, err) + + msgDeleteMsg, err := genSimpleDeleteMsg() + assert.NoError(t, err) + dMsg := deleteMsg{ + deleteMessages: []*msgstream.DeleteMsg{ + msgDeleteMsg, + }, + } + msg := []flowgraph.Msg{&dMsg, &dMsg} + deleteNode.Operate(msg) + }) +} diff --git a/internal/querynode/flow_graph_filter_delete_node.go b/internal/querynode/flow_graph_filter_delete_node.go new file mode 100644 index 0000000000..aa46655348 --- /dev/null +++ b/internal/querynode/flow_graph_filter_delete_node.go @@ -0,0 +1,113 @@ +package querynode + +import ( + "github.com/milvus-io/milvus/internal/log" + "github.com/milvus-io/milvus/internal/msgstream" + "github.com/milvus-io/milvus/internal/proto/commonpb" + "github.com/milvus-io/milvus/internal/util/flowgraph" + "github.com/milvus-io/milvus/internal/util/trace" + "github.com/opentracing/opentracing-go" + "go.uber.org/zap" +) + +type filterDeleteNode struct { + baseNode + collectionID UniqueID + partitionID UniqueID + replica ReplicaInterface +} + +func (fddNode *filterDeleteNode) Name() string { + return "fddNode" +} + +func (fddNode *filterDeleteNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { + if len(in) != 1 { + log.Error("Invalid operate message input in filterDDNode", zap.Int("input length", len(in))) + // TODO: add error handling + } + + msgStreamMsg, ok := in[0].(*MsgStreamMsg) + if !ok { + log.Warn("type assertion failed for MsgStreamMsg") + // TODO: add error handling + } + + if msgStreamMsg == nil { + return []Msg{} + } + + var spans []opentracing.Span + for _, msg := range msgStreamMsg.TsMessages() { + sp, ctx := trace.StartSpanFromContext(msg.TraceCtx()) + spans = append(spans, sp) + msg.SetTraceCtx(ctx) + } + + var dMsg = deleteMsg{ + deleteMessages: make([]*msgstream.DeleteMsg, 0), + timeRange: TimeRange{ + timestampMin: 
msgStreamMsg.TimestampMin(), + timestampMax: msgStreamMsg.TimestampMax(), + }, + } + + for _, msg := range msgStreamMsg.TsMessages() { + switch msg.Type() { + case commonpb.MsgType_Delete: + resMsg := fddNode.filterInvalidDeleteMessage(msg.(*msgstream.DeleteMsg)) + if resMsg != nil { + dMsg.deleteMessages = append(dMsg.deleteMessages, resMsg) + } + default: + log.Warn("Non supporting", zap.Int32("message type", int32(msg.Type()))) + } + } + var res Msg = &dMsg + for _, sp := range spans { + sp.Finish() + } + return []Msg{res} +} + +func (fddNode *filterDeleteNode) filterInvalidDeleteMessage(msg *msgstream.DeleteMsg) *msgstream.DeleteMsg { + sp, ctx := trace.StartSpanFromContext(msg.TraceCtx()) + msg.SetTraceCtx(ctx) + defer sp.Finish() + + if msg.CollectionID != fddNode.collectionID { + return nil + } + + if len(msg.PrimaryKeys) != len(msg.Timestamps) { + log.Warn("Error, misaligned messages detected") + return nil + } + + if len(msg.Timestamps) <= 0 { + log.Debug("filter invalid delete message, no message", + zap.Any("collectionID", msg.CollectionID), + zap.Any("partitionID", msg.PartitionID)) + return nil + } + return msg +} + +func newFilteredDeleteNode(replica ReplicaInterface, + collectionID UniqueID, + partitionID UniqueID) *filterDeleteNode { + + maxQueueLength := Params.FlowGraphMaxQueueLength + maxParallelism := Params.FlowGraphMaxParallelism + + baseNode := baseNode{} + baseNode.SetMaxQueueLength(maxQueueLength) + baseNode.SetMaxParallelism(maxParallelism) + + return &filterDeleteNode{ + baseNode: baseNode, + collectionID: collectionID, + partitionID: partitionID, + replica: replica, + } +} diff --git a/internal/querynode/flow_graph_filter_delete_node_test.go b/internal/querynode/flow_graph_filter_delete_node_test.go new file mode 100644 index 0000000000..945e440127 --- /dev/null +++ b/internal/querynode/flow_graph_filter_delete_node_test.go @@ -0,0 +1,115 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License. 
+ +package querynode + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/milvus-io/milvus/internal/msgstream" + "github.com/milvus-io/milvus/internal/util/flowgraph" +) + +func getFilterDeleteNode(ctx context.Context) (*filterDeleteNode, error) { + historical, err := genSimpleReplica() + if err != nil { + return nil, err + } + + historical.addExcludedSegments(defaultCollectionID, nil) + return newFilteredDeleteNode(historical, defaultCollectionID, defaultPartitionID), nil +} + +func TestFlowGraphFilterDeleteNode_filterDeleteNode(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + fg, err := getFilterDeleteNode(ctx) + assert.NoError(t, err) + fg.Name() +} + +func TestFlowGraphFilterDeleteNode_filterInvalidDeleteMessage(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + t.Run("delete valid test", func(t *testing.T) { + msg, err := genSimpleDeleteMsg() + assert.NoError(t, err) + fg, err := getFilterDeleteNode(ctx) + assert.NoError(t, err) + res := fg.filterInvalidDeleteMessage(msg) + assert.NotNil(t, res) + }) + + t.Run("test delete no collection", func(t *testing.T) { + msg, err := genSimpleDeleteMsg() + assert.NoError(t, err) + msg.CollectionID = UniqueID(1003) + fg, err := getFilterDeleteNode(ctx) + assert.NoError(t, err) + res := fg.filterInvalidDeleteMessage(msg) + assert.Nil(t, res) + }) + + t.Run("test delete not target collection", func(t *testing.T) { + msg, err := genSimpleDeleteMsg() + assert.NoError(t, err) + fg, err := getFilterDeleteNode(ctx) + assert.NoError(t, err) + fg.collectionID = UniqueID(1000) + res := fg.filterInvalidDeleteMessage(msg) + assert.Nil(t, res) + }) + + t.Run("test delete no data", func(t *testing.T) { + msg, err := genSimpleDeleteMsg() + assert.NoError(t, err) + fg, err := getFilterDeleteNode(ctx) + assert.NoError(t, err) + msg.Timestamps = make([]Timestamp, 0) + msg.PrimaryKeys = make([]IntPrimaryKey, 0) + res := fg.filterInvalidDeleteMessage(msg) + assert.Nil(t, res) + }) +} + +func TestFlowGraphFilterDeleteNode_Operate(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + genFilterDeleteMsg := func() []flowgraph.Msg { + dMsg, err := genSimpleDeleteMsg() + assert.NoError(t, err) + msg := flowgraph.GenerateMsgStreamMsg([]msgstream.TsMsg{dMsg}, 0, 1000, nil, nil) + return []flowgraph.Msg{msg} + } + + t.Run("valid test", func(t *testing.T) { + msg := genFilterDeleteMsg() + fg, err := getFilterDeleteNode(ctx) + assert.NoError(t, err) + res := fg.Operate(msg) + assert.NotNil(t, res) + }) + + t.Run("invalid input length", func(t *testing.T) { + msg := genFilterDeleteMsg() + fg, err := getFilterDeleteNode(ctx) + assert.NoError(t, err) + var m flowgraph.Msg + msg = append(msg, m) + res := fg.Operate(msg) + assert.NotNil(t, res) + }) +} diff --git a/internal/querynode/flow_graph_filter_dm_node_test.go b/internal/querynode/flow_graph_filter_dm_node_test.go index 896cb8108d..26a3eaba26 100644 --- a/internal/querynode/flow_graph_filter_dm_node_test.go +++ b/internal/querynode/flow_graph_filter_dm_node_test.go @@ -25,13 +25,13 @@ import ( ) func getFilterDMNode(ctx context.Context) (*filterDmNode, error) { - streaming, err := genSimpleStreaming(ctx) + streaming, err := genSimpleReplica() if err != nil { return nil, err } - streaming.replica.addExcludedSegments(defaultCollectionID, nil) - return newFilteredDmNode(streaming.replica, loadTypeCollection, defaultCollectionID, defaultPartitionID), nil + 
streaming.addExcludedSegments(defaultCollectionID, nil) + return newFilteredDmNode(streaming, loadTypeCollection, defaultCollectionID, defaultPartitionID), nil } func TestFlowGraphFilterDmNode_filterDmNode(t *testing.T) { @@ -183,7 +183,7 @@ func TestFlowGraphFilterDmNode_filterInvalidDeleteMessage(t *testing.T) { t.Run("test delete no collection", func(t *testing.T) { msg, err := genSimpleDeleteMsg() assert.NoError(t, err) - msg.CollectionID = UniqueID(1000) + msg.CollectionID = UniqueID(1003) fg, err := getFilterDMNode(ctx) assert.NoError(t, err) res := fg.filterInvalidDeleteMessage(msg) diff --git a/internal/querynode/flow_graph_message.go b/internal/querynode/flow_graph_message.go index 2f6f7550a3..28ab83a32c 100644 --- a/internal/querynode/flow_graph_message.go +++ b/internal/querynode/flow_graph_message.go @@ -28,6 +28,11 @@ type insertMsg struct { timeRange TimeRange } +type deleteMsg struct { + deleteMessages []*msgstream.DeleteMsg + timeRange TimeRange +} + type serviceTimeMsg struct { timeRange TimeRange } @@ -36,6 +41,10 @@ func (iMsg *insertMsg) TimeTick() Timestamp { return iMsg.timeRange.timestampMax } +func (dMsg *deleteMsg) TimeTick() Timestamp { + return dMsg.timeRange.timestampMax +} + func (stMsg *serviceTimeMsg) TimeTick() Timestamp { return stMsg.timeRange.timestampMax } diff --git a/internal/querynode/flow_graph_query_node.go b/internal/querynode/flow_graph_query_node.go index 48fcf599fa..5f4aa0c039 100644 --- a/internal/querynode/flow_graph_query_node.go +++ b/internal/querynode/flow_graph_query_node.go @@ -104,6 +104,75 @@ func newQueryNodeFlowGraph(ctx context.Context, return q } +func newQueryNodeDeltaFlowGraph(ctx context.Context, + loadType loadType, + collectionID UniqueID, + partitionID UniqueID, + historicalReplica ReplicaInterface, + tSafeReplica TSafeReplicaInterface, + channel Channel, + factory msgstream.Factory) *queryNodeFlowGraph { + + ctx1, cancel := context.WithCancel(ctx) + + q := &queryNodeFlowGraph{ + ctx: ctx1, + cancel: cancel, + collectionID: collectionID, + partitionID: partitionID, + channel: channel, + flowGraph: flowgraph.NewTimeTickedFlowGraph(ctx1), + } + + var dmStreamNode node = q.newDmInputNode(ctx1, factory) + var filterDeleteNode node = newFilteredDeleteNode(historicalReplica, collectionID, partitionID) + var deleteNode node = newDeleteNode(historicalReplica) + var serviceTimeNode node = newServiceTimeNode(ctx1, tSafeReplica, loadTypeCollection, collectionID, partitionID, channel, factory) + + q.flowGraph.AddNode(dmStreamNode) + q.flowGraph.AddNode(filterDeleteNode) + q.flowGraph.AddNode(deleteNode) + q.flowGraph.AddNode(serviceTimeNode) + + // dmStreamNode + var err = q.flowGraph.SetEdges(dmStreamNode.Name(), + []string{}, + []string{filterDeleteNode.Name()}, + ) + if err != nil { + log.Error("set edges failed in node:", zap.String("node name", dmStreamNode.Name())) + } + + // filterDmNode + err = q.flowGraph.SetEdges(filterDeleteNode.Name(), + []string{dmStreamNode.Name()}, + []string{deleteNode.Name()}, + ) + if err != nil { + log.Error("set edges failed in node:", zap.String("node name", filterDeleteNode.Name())) + } + + // insertNode + err = q.flowGraph.SetEdges(deleteNode.Name(), + []string{filterDeleteNode.Name()}, + []string{serviceTimeNode.Name()}, + ) + if err != nil { + log.Error("set edges failed in node:", zap.String("node name", deleteNode.Name())) + } + + // serviceTimeNode + err = q.flowGraph.SetEdges(serviceTimeNode.Name(), + []string{deleteNode.Name()}, + []string{}, + ) + if err != nil { + log.Error("set edges 
failed in node:", zap.String("node name", serviceTimeNode.Name())) + } + + return q +} + func (q *queryNodeFlowGraph) newDmInputNode(ctx context.Context, factory msgstream.Factory) *flowgraph.InputNode { insertStream, err := factory.NewTtMsgStream(ctx) if err != nil { diff --git a/internal/querynode/flow_graph_query_node_test.go b/internal/querynode/flow_graph_query_node_test.go index e393da152e..fad7ff8186 100644 --- a/internal/querynode/flow_graph_query_node_test.go +++ b/internal/querynode/flow_graph_query_node_test.go @@ -24,7 +24,9 @@ func TestQueryNodeFlowGraph_consumerFlowGraph(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - streaming, err := genSimpleStreaming(ctx) + tSafe := newTSafeReplica() + + streamingReplica, err := genSimpleReplica() assert.NoError(t, err) historicalReplica, err := genSimpleReplica() @@ -37,9 +39,9 @@ func TestQueryNodeFlowGraph_consumerFlowGraph(t *testing.T) { loadTypeCollection, defaultCollectionID, defaultPartitionID, - streaming.replica, + streamingReplica, historicalReplica, - streaming.tSafeReplica, + tSafe, defaultVChannel, fac) @@ -51,7 +53,7 @@ func TestQueryNodeFlowGraph_seekQueryNodeFlowGraph(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - streaming, err := genSimpleStreaming(ctx) + streamingReplica, err := genSimpleReplica() assert.NoError(t, err) historicalReplica, err := genSimpleReplica() @@ -60,13 +62,15 @@ func TestQueryNodeFlowGraph_seekQueryNodeFlowGraph(t *testing.T) { fac, err := genFactory() assert.NoError(t, err) + tSafe := newTSafeReplica() + fg := newQueryNodeFlowGraph(ctx, loadTypeCollection, defaultCollectionID, defaultPartitionID, - streaming.replica, + streamingReplica, historicalReplica, - streaming.tSafeReplica, + tSafe, defaultVChannel, fac) diff --git a/internal/querynode/flow_graph_service_time_node_test.go b/internal/querynode/flow_graph_service_time_node_test.go index 4d5dd5772b..c602fe90cf 100644 --- a/internal/querynode/flow_graph_service_time_node_test.go +++ b/internal/querynode/flow_graph_service_time_node_test.go @@ -25,14 +25,14 @@ func TestServiceTimeNode_Operate(t *testing.T) { defer cancel() genServiceTimeNode := func() *serviceTimeNode { - streaming, err := genSimpleStreaming(ctx) - assert.NoError(t, err) + tSafe := newTSafeReplica() + tSafe.addTSafe(defaultVChannel) fac, err := genFactory() assert.NoError(t, err) node := newServiceTimeNode(ctx, - streaming.tSafeReplica, + tSafe, loadTypeCollection, defaultCollectionID, defaultPartitionID, diff --git a/internal/querynode/historical.go b/internal/querynode/historical.go index b359f5cead..62c660fe77 100644 --- a/internal/querynode/historical.go +++ b/internal/querynode/historical.go @@ -43,6 +43,7 @@ type historical struct { replica ReplicaInterface loader *segmentLoader statsService *statsService + tSafeReplica TSafeReplicaInterface mu sync.Mutex // guards globalSealedSegments globalSealedSegments map[UniqueID]*querypb.SegmentInfo @@ -52,11 +53,12 @@ type historical struct { // newHistorical returns a new historical func newHistorical(ctx context.Context, + replica ReplicaInterface, rootCoord types.RootCoord, indexCoord types.IndexCoord, factory msgstream.Factory, - etcdKV *etcdkv.EtcdKV) *historical { - replica := newCollectionReplica(etcdKV) + etcdKV *etcdkv.EtcdKV, + tSafeReplica TSafeReplicaInterface) *historical { loader := newSegmentLoader(ctx, rootCoord, indexCoord, replica, etcdKV) ss := newStatsService(ctx, replica, loader.indexLoader.fieldStatsChan, factory) @@ -67,6 
+69,7 @@ func newHistorical(ctx context.Context, statsService: ss, globalSealedSegments: make(map[UniqueID]*querypb.SegmentInfo), etcdKV: etcdKV, + tSafeReplica: tSafeReplica, } } diff --git a/internal/querynode/historical_test.go b/internal/querynode/historical_test.go index 84b21bedc6..cf3110cee4 100644 --- a/internal/querynode/historical_test.go +++ b/internal/querynode/historical_test.go @@ -95,7 +95,8 @@ func TestHistorical_Search(t *testing.T) { defer cancel() t.Run("test search", func(t *testing.T) { - his, err := genSimpleHistorical(ctx) + tSafe := newTSafeReplica() + his, err := genSimpleHistorical(ctx, tSafe) assert.NoError(t, err) plan, searchReqs, err := genSimpleSearchPlanAndRequests() @@ -106,7 +107,8 @@ func TestHistorical_Search(t *testing.T) { }) t.Run("test no collection - search partitions", func(t *testing.T) { - his, err := genSimpleHistorical(ctx) + tSafe := newTSafeReplica() + his, err := genSimpleHistorical(ctx, tSafe) assert.NoError(t, err) plan, searchReqs, err := genSimpleSearchPlanAndRequests() @@ -120,7 +122,8 @@ func TestHistorical_Search(t *testing.T) { }) t.Run("test no collection - search all collection", func(t *testing.T) { - his, err := genSimpleHistorical(ctx) + tSafe := newTSafeReplica() + his, err := genSimpleHistorical(ctx, tSafe) assert.NoError(t, err) plan, searchReqs, err := genSimpleSearchPlanAndRequests() @@ -134,7 +137,8 @@ func TestHistorical_Search(t *testing.T) { }) t.Run("test load partition and partition has been released", func(t *testing.T) { - his, err := genSimpleHistorical(ctx) + tSafe := newTSafeReplica() + his, err := genSimpleHistorical(ctx, tSafe) assert.NoError(t, err) plan, searchReqs, err := genSimpleSearchPlanAndRequests() @@ -152,7 +156,8 @@ func TestHistorical_Search(t *testing.T) { }) t.Run("test no partition in collection", func(t *testing.T) { - his, err := genSimpleHistorical(ctx) + tSafe := newTSafeReplica() + his, err := genSimpleHistorical(ctx, tSafe) assert.NoError(t, err) plan, searchReqs, err := genSimpleSearchPlanAndRequests() @@ -168,7 +173,8 @@ func TestHistorical_Search(t *testing.T) { }) t.Run("test load collection partition released in collection", func(t *testing.T) { - his, err := genSimpleHistorical(ctx) + tSafe := newTSafeReplica() + his, err := genSimpleHistorical(ctx, tSafe) assert.NoError(t, err) plan, searchReqs, err := genSimpleSearchPlanAndRequests() diff --git a/internal/querynode/impl_test.go b/internal/querynode/impl_test.go index 6eb20d111d..a82e71d6e7 100644 --- a/internal/querynode/impl_test.go +++ b/internal/querynode/impl_test.go @@ -241,7 +241,7 @@ func TestImpl_WatchDmChannels(t *testing.T) { req := &queryPb.WatchDmChannelsRequest{ Base: &commonpb.MsgBase{ - MsgType: commonpb.MsgType_WatchQueryChannels, + MsgType: commonpb.MsgType_WatchDmChannels, MsgID: rand.Int63(), }, NodeID: 0, diff --git a/internal/querynode/index_loader_test.go b/internal/querynode/index_loader_test.go index 4bd83825ef..8e08ddf3ac 100644 --- a/internal/querynode/index_loader_test.go +++ b/internal/querynode/index_loader_test.go @@ -25,7 +25,8 @@ func TestIndexLoader_setIndexInfo(t *testing.T) { defer cancel() t.Run("test setIndexInfo", func(t *testing.T) { - historical, err := genSimpleHistorical(ctx) + tSafe := newTSafeReplica() + historical, err := genSimpleHistorical(ctx, tSafe) assert.NoError(t, err) segment, err := genSimpleSealedSegment() @@ -39,7 +40,8 @@ func TestIndexLoader_setIndexInfo(t *testing.T) { }) t.Run("test nil root and index", func(t *testing.T) { - historical, err := genSimpleHistorical(ctx) + 
tSafe := newTSafeReplica() + historical, err := genSimpleHistorical(ctx, tSafe) assert.NoError(t, err) segment, err := genSimpleSealedSegment() @@ -55,7 +57,8 @@ func TestIndexLoader_getIndexBinlog(t *testing.T) { defer cancel() t.Run("test getIndexBinlog", func(t *testing.T) { - historical, err := genSimpleHistorical(ctx) + tSafe := newTSafeReplica() + historical, err := genSimpleHistorical(ctx, tSafe) assert.NoError(t, err) paths, err := generateIndex(defaultSegmentID) @@ -66,7 +69,8 @@ func TestIndexLoader_getIndexBinlog(t *testing.T) { }) t.Run("test invalid path", func(t *testing.T) { - historical, err := genSimpleHistorical(ctx) + tSafe := newTSafeReplica() + historical, err := genSimpleHistorical(ctx, tSafe) assert.NoError(t, err) _, _, _, err = historical.loader.indexLoader.getIndexBinlog([]string{""}) @@ -78,7 +82,8 @@ func TestIndexLoader_printIndexParams(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - historical, err := genSimpleHistorical(ctx) + tSafe := newTSafeReplica() + historical, err := genSimpleHistorical(ctx, tSafe) assert.NoError(t, err) indexKV := []*commonpb.KeyValuePair{ @@ -95,7 +100,8 @@ func TestIndexLoader_loadIndex(t *testing.T) { defer cancel() t.Run("test loadIndex", func(t *testing.T) { - historical, err := genSimpleHistorical(ctx) + tSafe := newTSafeReplica() + historical, err := genSimpleHistorical(ctx, tSafe) assert.NoError(t, err) segment, err := genSimpleSealedSegment() @@ -112,7 +118,8 @@ func TestIndexLoader_loadIndex(t *testing.T) { }) t.Run("test set indexinfo with empty indexFilePath", func(t *testing.T) { - historical, err := genSimpleHistorical(ctx) + tSafe := newTSafeReplica() + historical, err := genSimpleHistorical(ctx, tSafe) assert.NoError(t, err) segment, err := genSimpleSealedSegment() @@ -144,7 +151,8 @@ func TestIndexLoader_loadIndex(t *testing.T) { //}) t.Run("test checkIndexReady failed", func(t *testing.T) { - historical, err := genSimpleHistorical(ctx) + tSafe := newTSafeReplica() + historical, err := genSimpleHistorical(ctx, tSafe) assert.NoError(t, err) segment, err := genSimpleSealedSegment() diff --git a/internal/querynode/mock_test.go b/internal/querynode/mock_test.go index 6bdb36a7ad..b75b856f11 100644 --- a/internal/querynode/mock_test.go +++ b/internal/querynode/mock_test.go @@ -58,8 +58,9 @@ const ( defaultNProb = 10 defaultMetricType = "JACCARD" - defaultKVRootPath = "query-node-unittest" - defaultVChannel = "query-node-unittest-channel-0" + defaultKVRootPath = "query-node-unittest" + defaultVChannel = "query-node-unittest-channel-0" + defaultHistoricalVChannel = "query-node-unittest-historical-channel-0" //defaultQueryChannel = "query-node-unittest-query-channel-0" //defaultQueryResultChannel = "query-node-unittest-query-result-channel-0" defaultSubName = "query-node-unittest-sub-name-0" @@ -881,7 +882,7 @@ func genSimpleReplica() (ReplicaInterface, error) { return r, err } -func genSimpleHistorical(ctx context.Context) (*historical, error) { +func genSimpleHistorical(ctx context.Context, tSafeReplica TSafeReplicaInterface) (*historical, error) { fac, err := genFactory() if err != nil { return nil, err @@ -890,7 +891,11 @@ func genSimpleHistorical(ctx context.Context) (*historical, error) { if err != nil { return nil, err } - h := newHistorical(ctx, newMockRootCoord(), newMockIndexCoord(), fac, kv) + replica, err := genSimpleReplica() + if err != nil { + return nil, err + } + h := newHistorical(ctx, replica, newMockRootCoord(), newMockIndexCoord(), fac, kv, tSafeReplica) r, err := 
genSimpleReplica() if err != nil { return nil, err @@ -910,12 +915,14 @@ func genSimpleHistorical(ctx context.Context) (*historical, error) { return nil, err } col.addVChannels([]Channel{ + // defaultHistoricalVChannel, defaultVChannel, }) + // h.tSafeReplica.addTSafe(defaultHistoricalVChannel) return h, nil } -func genSimpleStreaming(ctx context.Context) (*streaming, error) { +func genSimpleStreaming(ctx context.Context, tSafeReplica TSafeReplicaInterface) (*streaming, error) { kv, err := genEtcdKV() if err != nil { return nil, err @@ -924,11 +931,11 @@ func genSimpleStreaming(ctx context.Context) (*streaming, error) { if err != nil { return nil, err } - historicalReplica, err := genSimpleReplica() + replica, err := genSimpleReplica() if err != nil { return nil, err } - s := newStreaming(ctx, fac, kv, historicalReplica) + s := newStreaming(ctx, replica, fac, kv, tSafeReplica) r, err := genSimpleReplica() if err != nil { return nil, err @@ -1304,15 +1311,18 @@ func genSimpleQueryNode(ctx context.Context) (*QueryNode, error) { node.etcdKV = etcdKV - streaming, err := genSimpleStreaming(ctx) + node.tSafeReplica = newTSafeReplica() + + streaming, err := genSimpleStreaming(ctx, node.tSafeReplica) if err != nil { return nil, err } - historical, err := genSimpleHistorical(ctx) + historical, err := genSimpleHistorical(ctx, node.tSafeReplica) if err != nil { return nil, err } + node.dataSyncService = newDataSyncService(node.queryNodeLoopCtx, streaming.replica, historical.replica, node.tSafeReplica, node.msFactory) node.streaming = streaming node.historical = historical diff --git a/internal/querynode/plan_test.go b/internal/querynode/plan_test.go index a626b2a9f4..06da3982b6 100644 --- a/internal/querynode/plan_test.go +++ b/internal/querynode/plan_test.go @@ -47,7 +47,8 @@ func TestPlan_createSearchPlanByExpr(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - historical, err := genSimpleHistorical(ctx) + tSafe := newTSafeReplica() + historical, err := genSimpleHistorical(ctx, tSafe) assert.NoError(t, err) col, err := historical.replica.getCollectionByID(defaultCollectionID) diff --git a/internal/querynode/query_collection.go b/internal/querynode/query_collection.go index 6ee64f1e2a..755edf8d5f 100644 --- a/internal/querynode/query_collection.go +++ b/internal/querynode/query_collection.go @@ -147,10 +147,15 @@ func (q *queryCollection) registerCollectionTSafe() error { if err != nil { return err } + // historicalCollection, err := q.historical.replica.getCollectionByID(q.collectionID) + // if err != nil { + // return err + // } log.Debug("register tSafe watcher and init watcher select case", zap.Any("collectionID", collection.ID()), zap.Any("dml channels", collection.getVChannels()), + // zap.Any("delta channels", collection.getVChannels()), ) for _, channel := range collection.getVChannels() { err = q.addTSafeWatcher(channel) @@ -158,6 +163,12 @@ func (q *queryCollection) registerCollectionTSafe() error { return err } } + // for _, channel := range historicalCollection.getVChannels() { + // err := q.addTSafeWatcher(channel) + // if err != nil { + // return err + // } + // } return nil } @@ -1216,7 +1227,7 @@ func mergeRetrieveResults(retrieveResults []*segcorepb.RetrieveResults) (*segcor // merge results and remove duplicates for _, rr := range retrieveResults { // skip empty result, it will break merge result - if rr == nil || rr.Offset == nil || len(rr.Offset) == 0 { + if rr == nil || len(rr.Offset) == 0 { continue } @@ -1234,7 +1245,7 @@ func 
mergeRetrieveResults(retrieveResults []*segcorepb.RetrieveResults) (*segcor } if len(ret.FieldsData) != len(rr.FieldsData) { - return nil, fmt.Errorf("mismatch FieldData in querynode RetrieveResults, expect %d get %d", len(ret.FieldsData), len(rr.FieldsData)) + return nil, fmt.Errorf("mismatch FieldData in RetrieveResults") } dstIds := ret.Ids.GetIntId() diff --git a/internal/querynode/query_collection_test.go b/internal/querynode/query_collection_test.go index dccc86442e..172819c677 100644 --- a/internal/querynode/query_collection_test.go +++ b/internal/querynode/query_collection_test.go @@ -40,12 +40,13 @@ import ( ) func genSimpleQueryCollection(ctx context.Context, cancel context.CancelFunc) (*queryCollection, error) { - historical, err := genSimpleHistorical(ctx) + tSafe := newTSafeReplica() + historical, err := genSimpleHistorical(ctx, tSafe) if err != nil { return nil, err } - streaming, err := genSimpleStreaming(ctx) + streaming, err := genSimpleStreaming(ctx, tSafe) if err != nil { return nil, err } @@ -127,7 +128,10 @@ func TestQueryCollection_withoutVChannel(t *testing.T) { assert.Nil(t, err) schema := genTestCollectionSchema(0, false, 2) - historical := newHistorical(context.Background(), nil, nil, factory, etcdKV) + historicalReplica := newCollectionReplica(etcdKV) + tsReplica := newTSafeReplica() + streamingReplica := newCollectionReplica(etcdKV) + historical := newHistorical(context.Background(), historicalReplica, nil, nil, factory, etcdKV, tsReplica) //add a segment to historical data err = historical.replica.addCollection(0, schema) @@ -153,7 +157,7 @@ func TestQueryCollection_withoutVChannel(t *testing.T) { assert.Nil(t, err) //create a streaming - streaming := newStreaming(ctx, factory, etcdKV, historical.replica) + streaming := newStreaming(ctx, streamingReplica, factory, etcdKV, tsReplica) err = streaming.replica.addCollection(0, schema) assert.Nil(t, err) err = streaming.replica.addPartition(0, 1) diff --git a/internal/querynode/query_node.go b/internal/querynode/query_node.go index 39b2a3836b..0d6dd1d51e 100644 --- a/internal/querynode/query_node.go +++ b/internal/querynode/query_node.go @@ -79,6 +79,12 @@ type QueryNode struct { historical *historical streaming *streaming + // tSafeReplica + tSafeReplica TSafeReplicaInterface + + // dataSyncService + dataSyncService *dataSyncService + // internal services queryService *queryService @@ -179,13 +185,27 @@ func (node *QueryNode) Init() error { zap.Any("EtcdEndpoints", Params.EtcdEndpoints), zap.Any("MetaRootPath", Params.MetaRootPath), ) + node.tSafeReplica = newTSafeReplica() + + streamingReplica := newCollectionReplica(node.etcdKV) + historicalReplica := newCollectionReplica(node.etcdKV) node.historical = newHistorical(node.queryNodeLoopCtx, + historicalReplica, node.rootCoord, node.indexCoord, node.msFactory, - node.etcdKV) - node.streaming = newStreaming(node.queryNodeLoopCtx, node.msFactory, node.etcdKV, node.historical.replica) + node.etcdKV, + node.tSafeReplica, + ) + node.streaming = newStreaming(node.queryNodeLoopCtx, + streamingReplica, + node.msFactory, + node.etcdKV, + node.tSafeReplica, + ) + + node.dataSyncService = newDataSyncService(node.queryNodeLoopCtx, streamingReplica, historicalReplica, node.tSafeReplica, node.msFactory) node.InitSegcore() @@ -240,6 +260,9 @@ func (node *QueryNode) Stop() error { node.queryNodeLoopCancel() // close services + if node.dataSyncService != nil { + node.dataSyncService.close() + } if node.historical != nil { node.historical.close() } diff --git 
a/internal/querynode/query_node_test.go b/internal/querynode/query_node_test.go index b92889c82f..5b54182e9d 100644 --- a/internal/querynode/query_node_test.go +++ b/internal/querynode/query_node_test.go @@ -192,8 +192,12 @@ func newQueryNodeMock() *QueryNode { panic(err) } svr := NewQueryNode(ctx, msFactory) - svr.historical = newHistorical(svr.queryNodeLoopCtx, nil, nil, svr.msFactory, etcdKV) - svr.streaming = newStreaming(ctx, msFactory, etcdKV, svr.historical.replica) + tsReplica := newTSafeReplica() + streamingReplica := newCollectionReplica(etcdKV) + historicalReplica := newCollectionReplica(etcdKV) + svr.historical = newHistorical(svr.queryNodeLoopCtx, historicalReplica, nil, nil, svr.msFactory, etcdKV, tsReplica) + svr.streaming = newStreaming(ctx, streamingReplica, msFactory, etcdKV, tsReplica) + svr.dataSyncService = newDataSyncService(ctx, svr.streaming.replica, svr.historical.replica, tsReplica, msFactory) svr.etcdKV = etcdKV return svr diff --git a/internal/querynode/query_service_test.go b/internal/querynode/query_service_test.go index 2085713eae..666160ed88 100644 --- a/internal/querynode/query_service_test.go +++ b/internal/querynode/query_service_test.go @@ -216,10 +216,11 @@ func TestQueryService_addQueryCollection(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - his, err := genSimpleHistorical(ctx) + tSafe := newTSafeReplica() + his, err := genSimpleHistorical(ctx, tSafe) assert.NoError(t, err) - str, err := genSimpleStreaming(ctx) + str, err := genSimpleStreaming(ctx, tSafe) assert.NoError(t, err) fac, err := genFactory() diff --git a/internal/querynode/segment_loader_test.go b/internal/querynode/segment_loader_test.go index 8ab050fc12..a0810c3289 100644 --- a/internal/querynode/segment_loader_test.go +++ b/internal/querynode/segment_loader_test.go @@ -37,7 +37,8 @@ func TestSegmentLoader_loadSegment(t *testing.T) { assert.NoError(t, err) t.Run("test load segment", func(t *testing.T) { - historical, err := genSimpleHistorical(ctx) + tSafe := newTSafeReplica() + historical, err := genSimpleHistorical(ctx, tSafe) assert.NoError(t, err) err = historical.replica.removeSegment(defaultSegmentID) @@ -68,7 +69,8 @@ func TestSegmentLoader_loadSegment(t *testing.T) { }) t.Run("test set segment error", func(t *testing.T) { - historical, err := genSimpleHistorical(ctx) + tSafe := newTSafeReplica() + historical, err := genSimpleHistorical(ctx, tSafe) assert.NoError(t, err) err = historical.replica.removePartition(defaultPartitionID) @@ -104,7 +106,8 @@ func TestSegmentLoader_loadSegmentFieldsData(t *testing.T) { defer cancel() runLoadSegmentFieldData := func(dataType schemapb.DataType) { - historical, err := genSimpleHistorical(ctx) + tSafe := newTSafeReplica() + historical, err := genSimpleHistorical(ctx, tSafe) assert.NoError(t, err) fieldUID := genConstantField(uidField) @@ -185,7 +188,8 @@ func TestSegmentLoader_invalid(t *testing.T) { defer cancel() t.Run("test no collection", func(t *testing.T) { - historical, err := genSimpleHistorical(ctx) + tSafe := newTSafeReplica() + historical, err := genSimpleHistorical(ctx, tSafe) assert.NoError(t, err) err = historical.replica.removeCollection(defaultCollectionID) @@ -247,7 +251,8 @@ func TestSegmentLoader_invalid(t *testing.T) { //}) t.Run("test no vec field 2", func(t *testing.T) { - historical, err := genSimpleHistorical(ctx) + tSafe := newTSafeReplica() + historical, err := genSimpleHistorical(ctx, tSafe) assert.NoError(t, err) err = historical.replica.removeCollection(defaultCollectionID) 
@@ -291,7 +296,8 @@ func TestSegmentLoader_checkSegmentSize(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - historical, err := genSimpleHistorical(ctx) + tSafe := newTSafeReplica() + historical, err := genSimpleHistorical(ctx, tSafe) assert.NoError(t, err) err = historical.loader.checkSegmentSize(defaultSegmentID, map[UniqueID]int64{defaultSegmentID: 1024}) @@ -307,7 +313,8 @@ func TestSegmentLoader_estimateSegmentSize(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - historical, err := genSimpleHistorical(ctx) + tSafe := newTSafeReplica() + historical, err := genSimpleHistorical(ctx, tSafe) assert.NoError(t, err) seg, err := historical.replica.getSegmentByID(defaultSegmentID) diff --git a/internal/querynode/segment_test.go b/internal/querynode/segment_test.go index aed29f2120..c67a2a7da5 100644 --- a/internal/querynode/segment_test.go +++ b/internal/querynode/segment_test.go @@ -886,7 +886,8 @@ func TestSegment_indexInfoTest(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - h, err := genSimpleHistorical(ctx) + tSafe := newTSafeReplica() + h, err := genSimpleHistorical(ctx, tSafe) assert.NoError(t, err) seg, err := h.replica.getSegmentByID(defaultSegmentID) @@ -939,7 +940,8 @@ func TestSegment_indexInfoTest(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - h, err := genSimpleHistorical(ctx) + tSafe := newTSafeReplica() + h, err := genSimpleHistorical(ctx, tSafe) assert.NoError(t, err) seg, err := h.replica.getSegmentByID(defaultSegmentID) diff --git a/internal/querynode/streaming.go b/internal/querynode/streaming.go index e94aed6a3a..f1b259ff94 100644 --- a/internal/querynode/streaming.go +++ b/internal/querynode/streaming.go @@ -28,23 +28,17 @@ import ( type streaming struct { ctx context.Context - replica ReplicaInterface - historicalReplica ReplicaInterface - tSafeReplica TSafeReplicaInterface + replica ReplicaInterface + tSafeReplica TSafeReplicaInterface - dataSyncService *dataSyncService - msFactory msgstream.Factory + msFactory msgstream.Factory } -func newStreaming(ctx context.Context, factory msgstream.Factory, etcdKV *etcdkv.EtcdKV, historicalReplica ReplicaInterface) *streaming { - replica := newCollectionReplica(etcdKV) - tReplica := newTSafeReplica() - newDS := newDataSyncService(ctx, replica, historicalReplica, tReplica, factory) +func newStreaming(ctx context.Context, replica ReplicaInterface, factory msgstream.Factory, etcdKV *etcdkv.EtcdKV, tSafeReplica TSafeReplicaInterface) *streaming { return &streaming{ - replica: replica, - tSafeReplica: tReplica, - dataSyncService: newDS, + replica: replica, + tSafeReplica: tSafeReplica, } } @@ -55,10 +49,6 @@ func (s *streaming) start() { func (s *streaming) close() { // TODO: stop stats - if s.dataSyncService != nil { - s.dataSyncService.close() - } - // free collectionReplica s.replica.freeAll() } diff --git a/internal/querynode/streaming_test.go b/internal/querynode/streaming_test.go index 1963366c61..b34a9e7a06 100644 --- a/internal/querynode/streaming_test.go +++ b/internal/querynode/streaming_test.go @@ -22,7 +22,8 @@ func TestStreaming_streaming(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - streaming, err := genSimpleStreaming(ctx) + tSafe := newTSafeReplica() + streaming, err := genSimpleStreaming(ctx, tSafe) assert.NoError(t, err) defer streaming.close() @@ -34,7 +35,8 @@ func TestStreaming_search(t *testing.T) { defer 
cancel() t.Run("test search", func(t *testing.T) { - streaming, err := genSimpleStreaming(ctx) + tSafe := newTSafeReplica() + streaming, err := genSimpleStreaming(ctx, tSafe) assert.NoError(t, err) defer streaming.close() @@ -52,7 +54,8 @@ func TestStreaming_search(t *testing.T) { }) t.Run("test run empty partition", func(t *testing.T) { - streaming, err := genSimpleStreaming(ctx) + tSafe := newTSafeReplica() + streaming, err := genSimpleStreaming(ctx, tSafe) assert.NoError(t, err) defer streaming.close() @@ -70,7 +73,8 @@ func TestStreaming_search(t *testing.T) { }) t.Run("test run empty partition and loadCollection", func(t *testing.T) { - streaming, err := genSimpleStreaming(ctx) + tSafe := newTSafeReplica() + streaming, err := genSimpleStreaming(ctx, tSafe) assert.NoError(t, err) defer streaming.close() @@ -95,7 +99,8 @@ func TestStreaming_search(t *testing.T) { }) t.Run("test run empty partition and loadPartition", func(t *testing.T) { - streaming, err := genSimpleStreaming(ctx) + tSafe := newTSafeReplica() + streaming, err := genSimpleStreaming(ctx, tSafe) assert.NoError(t, err) defer streaming.close() @@ -119,7 +124,8 @@ func TestStreaming_search(t *testing.T) { }) t.Run("test no partitions in collection", func(t *testing.T) { - streaming, err := genSimpleStreaming(ctx) + tSafe := newTSafeReplica() + streaming, err := genSimpleStreaming(ctx, tSafe) assert.NoError(t, err) defer streaming.close() @@ -140,7 +146,8 @@ func TestStreaming_search(t *testing.T) { }) t.Run("test search failed", func(t *testing.T) { - streaming, err := genSimpleStreaming(ctx) + tSafe := newTSafeReplica() + streaming, err := genSimpleStreaming(ctx, tSafe) assert.NoError(t, err) defer streaming.close() @@ -166,7 +173,8 @@ func TestStreaming_retrieve(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - streaming, err := genSimpleStreaming(ctx) + tSafe := newTSafeReplica() + streaming, err := genSimpleStreaming(ctx, tSafe) assert.NoError(t, err) defer streaming.close() diff --git a/internal/querynode/task.go b/internal/querynode/task.go index 5b4cb7ac5f..55b47b176f 100644 --- a/internal/querynode/task.go +++ b/internal/querynode/task.go @@ -220,15 +220,15 @@ func (w *watchDmChannelsTask) Execute(ctx context.Context) error { // create tSafe for _, channel := range vChannels { - w.node.streaming.tSafeReplica.addTSafe(channel) + w.node.tSafeReplica.addTSafe(channel) } // add flow graph if loadPartition { - w.node.streaming.dataSyncService.addPartitionFlowGraph(collectionID, partitionID, vChannels) + w.node.dataSyncService.addPartitionFlowGraph(collectionID, partitionID, vChannels) log.Debug("Query node add partition flow graphs", zap.Any("channels", vChannels)) } else { - w.node.streaming.dataSyncService.addCollectionFlowGraph(collectionID, vChannels) + w.node.dataSyncService.addCollectionFlowGraph(collectionID, vChannels) log.Debug("Query node add collection flow graphs", zap.Any("channels", vChannels)) } @@ -247,12 +247,12 @@ func (w *watchDmChannelsTask) Execute(ctx context.Context) error { // channels as consumer var nodeFGs map[Channel]*queryNodeFlowGraph if loadPartition { - nodeFGs, err = w.node.streaming.dataSyncService.getPartitionFlowGraphs(partitionID, vChannels) + nodeFGs, err = w.node.dataSyncService.getPartitionFlowGraphs(partitionID, vChannels) if err != nil { return err } } else { - nodeFGs, err = w.node.streaming.dataSyncService.getCollectionFlowGraphs(collectionID, vChannels) + nodeFGs, err = w.node.dataSyncService.getCollectionFlowGraphs(collectionID, vChannels) 
if err != nil { return err } @@ -296,12 +296,12 @@ func (w *watchDmChannelsTask) Execute(ctx context.Context) error { // start flow graphs if loadPartition { - err = w.node.streaming.dataSyncService.startPartitionFlowGraph(partitionID, vChannels) + err = w.node.dataSyncService.startPartitionFlowGraph(partitionID, vChannels) if err != nil { return err } } else { - err = w.node.streaming.dataSyncService.startCollectionFlowGraph(collectionID, vChannels) + err = w.node.dataSyncService.startCollectionFlowGraph(collectionID, vChannels) if err != nil { return err } @@ -447,7 +447,7 @@ func (r *releaseCollectionTask) Execute(ctx context.Context) error { ) // remove collection flow graph - r.node.streaming.dataSyncService.removeCollectionFlowGraph(r.req.CollectionID) + r.node.dataSyncService.removeCollectionFlowGraph(r.req.CollectionID) // remove partition flow graphs which partitions belong to the target collection partitionIDs, err := r.node.streaming.replica.getPartitionIDs(r.req.CollectionID) @@ -456,7 +456,7 @@ func (r *releaseCollectionTask) Execute(ctx context.Context) error { return err } for _, partitionID := range partitionIDs { - r.node.streaming.dataSyncService.removePartitionFlowGraph(partitionID) + r.node.dataSyncService.removePartitionFlowGraph(partitionID) } // remove all tSafes of the target collection @@ -466,7 +466,7 @@ func (r *releaseCollectionTask) Execute(ctx context.Context) error { zap.Any("vChannel", channel), ) // no tSafe in tSafeReplica, don't return error - err = r.node.streaming.tSafeReplica.removeTSafe(channel) + err = r.node.tSafeReplica.removeTSafe(channel) if err != nil { log.Warn(err.Error()) } @@ -554,8 +554,8 @@ func (r *releasePartitionsTask) Execute(ctx context.Context) error { // release partitions vChannels := sCol.getVChannels() for _, id := range r.req.PartitionIDs { - if _, err = r.node.streaming.dataSyncService.getPartitionFlowGraphs(id, vChannels); err == nil { - r.node.streaming.dataSyncService.removePartitionFlowGraph(id) + if _, err = r.node.dataSyncService.getPartitionFlowGraphs(id, vChannels); err == nil { + r.node.dataSyncService.removePartitionFlowGraph(id) // remove all tSafes of the target partition for _, channel := range vChannels { log.Debug("Releasing tSafe in releasePartitionTask...", @@ -564,7 +564,7 @@ func (r *releasePartitionsTask) Execute(ctx context.Context) error { zap.Any("vChannel", channel), ) // no tSafe in tSafeReplica, don't return error - err = r.node.streaming.tSafeReplica.removeTSafe(channel) + err = r.node.tSafeReplica.removeTSafe(channel) if err != nil { log.Warn(err.Error()) } diff --git a/internal/querynode/task_test.go b/internal/querynode/task_test.go index 9662aecce6..8a5ec099ca 100644 --- a/internal/querynode/task_test.go +++ b/internal/querynode/task_test.go @@ -392,7 +392,7 @@ func TestTask_releasePartitionTask(t *testing.T) { req: genReleasePartitionsRequest(), node: node, } - task.node.streaming.dataSyncService.addPartitionFlowGraph(defaultCollectionID, + task.node.dataSyncService.addPartitionFlowGraph(defaultCollectionID, defaultPartitionID, []Channel{defaultVChannel}) err = task.Execute(ctx)
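
Usage note: this patch adds the delta flow graph plumbing (addCollectionDeltaFlowGraph / startCollectionDeltaFlowGraph / removeCollectionDeltaFlowGraph, plus the dmStreamNode -> filterDeleteNode -> deleteNode -> serviceTimeNode graph) but does not yet call it from a task. The sketch below only illustrates how a caller inside package querynode might drive the new API, mirroring how watchDmChannelsTask drives the existing collection flow graphs. The helper name, the deltaChannels argument, and the teardown comment are assumptions, not part of this change; it also assumes Channel is a string alias, as the existing tests imply, and omits channel subscription/seek.

// Hypothetical helper inside package querynode (not part of this patch).
func watchDeltaChannelsSketch(node *QueryNode, collectionID UniqueID, deltaChannels []Channel) error {
	// Register tSafe first: the delta graph's serviceTimeNode publishes into tSafeReplica.
	for _, channel := range deltaChannels {
		node.tSafeReplica.addTSafe(channel)
	}

	// Build one delta flow graph per virtual channel:
	// dmStreamNode -> filterDeleteNode -> deleteNode -> serviceTimeNode.
	node.dataSyncService.addCollectionDeltaFlowGraph(collectionID, deltaChannels)

	// Start consuming delete messages on the registered channels.
	return node.dataSyncService.startCollectionDeltaFlowGraph(collectionID, deltaChannels)
}

// On release, the graphs would be torn down with:
//   node.dataSyncService.removeCollectionDeltaFlowGraph(collectionID)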