diff --git a/internal/querynode/data_sync_service.go b/internal/querynode/data_sync_service.go
index 9168abd909..dc226ea91b 100644
--- a/internal/querynode/data_sync_service.go
+++ b/internal/querynode/data_sync_service.go
@@ -38,9 +38,10 @@ type dataSyncService struct {
     collectionFlowGraphs map[UniqueID]map[Channel]*queryNodeFlowGraph // map[collectionID]flowGraphs
     partitionFlowGraphs  map[UniqueID]map[Channel]*queryNodeFlowGraph // map[partitionID]flowGraphs

-    streamingReplica ReplicaInterface
-    tSafeReplica     TSafeReplicaInterface
-    msFactory        msgstream.Factory
+    streamingReplica  ReplicaInterface
+    historicalReplica ReplicaInterface
+    tSafeReplica      TSafeReplicaInterface
+    msFactory         msgstream.Factory
 }

 // collection flow graph
@@ -60,6 +61,7 @@ func (dsService *dataSyncService) addCollectionFlowGraph(collectionID UniqueID,
         collectionID,
         partitionID,
         dsService.streamingReplica,
+        dsService.historicalReplica,
         dsService.tSafeReplica,
         vChannel,
         dsService.msFactory)
@@ -133,6 +135,7 @@ func (dsService *dataSyncService) addPartitionFlowGraph(collectionID UniqueID, p
         collectionID,
         partitionID,
         dsService.streamingReplica,
+        dsService.historicalReplica,
         dsService.tSafeReplica,
         vChannel,
         dsService.msFactory)
@@ -198,6 +201,7 @@ func (dsService *dataSyncService) removePartitionFlowGraph(partitionID UniqueID)
 // newDataSyncService returns a new dataSyncService
 func newDataSyncService(ctx context.Context,
     streamingReplica ReplicaInterface,
+    historicalReplica ReplicaInterface,
     tSafeReplica TSafeReplicaInterface,
     factory msgstream.Factory) *dataSyncService {

@@ -206,6 +210,7 @@ func newDataSyncService(ctx context.Context,
         collectionFlowGraphs: make(map[UniqueID]map[Channel]*queryNodeFlowGraph),
         partitionFlowGraphs:  make(map[UniqueID]map[Channel]*queryNodeFlowGraph),
         streamingReplica:     streamingReplica,
+        historicalReplica:    historicalReplica,
         tSafeReplica:         tSafeReplica,
         msFactory:            factory,
     }
diff --git a/internal/querynode/data_sync_service_test.go b/internal/querynode/data_sync_service_test.go
index a64ebf5457..f546ce4dd3 100644
--- a/internal/querynode/data_sync_service_test.go
+++ b/internal/querynode/data_sync_service_test.go
@@ -135,10 +135,13 @@ func TestDataSyncService_collectionFlowGraphs(t *testing.T) {
     streaming, err := genSimpleStreaming(ctx)
     assert.NoError(t, err)

+    historicalReplica, err := genSimpleReplica()
+    assert.NoError(t, err)
+
     fac, err := genFactory()
     assert.NoError(t, err)

-    dataSyncService := newDataSyncService(ctx, streaming.replica, streaming.tSafeReplica, fac)
+    dataSyncService := newDataSyncService(ctx, streaming.replica, historicalReplica, streaming.tSafeReplica, fac)
     assert.NotNil(t, dataSyncService)

     dataSyncService.addCollectionFlowGraph(defaultCollectionID, []Channel{defaultVChannel})
@@ -178,10 +181,13 @@ func TestDataSyncService_partitionFlowGraphs(t *testing.T) {
     streaming, err := genSimpleStreaming(ctx)
     assert.NoError(t, err)

+    historicalReplica, err := genSimpleReplica()
+    assert.NoError(t, err)
+
     fac, err := genFactory()
     assert.NoError(t, err)

-    dataSyncService := newDataSyncService(ctx, streaming.replica, streaming.tSafeReplica, fac)
+    dataSyncService := newDataSyncService(ctx, streaming.replica, historicalReplica, streaming.tSafeReplica, fac)
     assert.NotNil(t, dataSyncService)

     dataSyncService.addPartitionFlowGraph(defaultPartitionID, defaultPartitionID, []Channel{defaultVChannel})
@@ -222,10 +228,13 @@ func TestDataSyncService_removePartitionFlowGraphs(t *testing.T) {
     streaming, err := genSimpleStreaming(ctx)
     assert.NoError(t, err)

+    historicalReplica, err := genSimpleReplica()
+    assert.NoError(t, err)
+
     fac, err := genFactory()
     assert.NoError(t, err)

-    dataSyncService := newDataSyncService(ctx, streaming.replica, streaming.tSafeReplica, fac)
+    dataSyncService := newDataSyncService(ctx, streaming.replica, historicalReplica, streaming.tSafeReplica, fac)
     assert.NotNil(t, dataSyncService)

     dataSyncService.addPartitionFlowGraph(defaultPartitionID, defaultPartitionID, []Channel{defaultVChannel})
diff --git a/internal/querynode/flow_graph_insert_node.go b/internal/querynode/flow_graph_insert_node.go
index 46fa23ee30..3ae865b851 100644
--- a/internal/querynode/flow_graph_insert_node.go
+++ b/internal/querynode/flow_graph_insert_node.go
@@ -32,7 +32,8 @@ import (

 type insertNode struct {
     baseNode
-    replica ReplicaInterface
+    streamingReplica  ReplicaInterface
+    historicalReplica ReplicaInterface
 }

 type insertData struct {
@@ -89,8 +90,8 @@ func (iNode *insertNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
     // 1. hash insertMessages to insertData
     for _, task := range iMsg.insertMessages {
         // check if partition exists, if not, create partition
-        if hasPartition := iNode.replica.hasPartition(task.PartitionID); !hasPartition {
-            err := iNode.replica.addPartition(task.CollectionID, task.PartitionID)
+        if hasPartition := iNode.streamingReplica.hasPartition(task.PartitionID); !hasPartition {
+            err := iNode.streamingReplica.addPartition(task.CollectionID, task.PartitionID)
             if err != nil {
                 log.Warn(err.Error())
                 continue
@@ -98,8 +99,8 @@ func (iNode *insertNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
         }

         // check if segment exists, if not, create this segment
-        if !iNode.replica.hasSegment(task.SegmentID) {
-            err := iNode.replica.addSegment(task.SegmentID, task.PartitionID, task.CollectionID, task.ShardName, segmentTypeGrowing, true)
+        if !iNode.streamingReplica.hasSegment(task.SegmentID) {
+            err := iNode.streamingReplica.addSegment(task.SegmentID, task.PartitionID, task.CollectionID, task.ShardName, segmentTypeGrowing, true)
             if err != nil {
                 log.Warn(err.Error())
                 continue
@@ -114,7 +115,7 @@ func (iNode *insertNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {

     // 2. do preInsert
     for segmentID := range iData.insertRecords {
-        var targetSegment, err = iNode.replica.getSegmentByID(segmentID)
+        var targetSegment, err = iNode.streamingReplica.getSegmentByID(segmentID)
         if err != nil {
             log.Warn(err.Error())
         }
@@ -146,48 +147,11 @@ func (iNode *insertNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
     }

     // 1. filter segment by bloom filter
     for _, delMsg := range iMsg.deleteMessages {
-        var partitionIDs []UniqueID
-        var err error
-        if delMsg.PartitionID != -1 {
-            partitionIDs = []UniqueID{delMsg.PartitionID}
-        } else {
-            partitionIDs, err = iNode.replica.getPartitionIDs(delMsg.CollectionID)
-            if err != nil {
-                log.Warn(err.Error())
-                continue
-            }
+        if iNode.streamingReplica != nil {
+            processDeleteMessages(iNode.streamingReplica, delMsg, delData)
         }
-        resultSegmentIDs := make([]UniqueID, 0)
-        for _, partitionID := range partitionIDs {
-            segmentIDs, err := iNode.replica.getSegmentIDs(partitionID)
-            if err != nil {
-                log.Warn(err.Error())
-                continue
-            }
-            resultSegmentIDs = append(resultSegmentIDs, segmentIDs...)
-        }
-        for _, segmentID := range resultSegmentIDs {
-            segment, err := iNode.replica.getSegmentByID(segmentID)
-            if err != nil {
-                log.Warn(err.Error())
-                continue
-            }
-            pks, err := filterSegmentsByPKs(delMsg.PrimaryKeys, segment)
-            if err != nil {
-                log.Warn(err.Error())
-                continue
-            }
-            if len(pks) > 0 {
-                offset := segment.segmentPreDelete(len(pks))
-                if err != nil {
-                    log.Warn(err.Error())
-                    continue
-                }
-                delData.deleteIDs[segmentID] = append(delData.deleteIDs[segmentID], pks...)
-                // TODO(yukun) get offset of pks
-                delData.deleteTimestamps[segmentID] = append(delData.deleteTimestamps[segmentID], delMsg.Timestamps[:len(pks)]...)
-                delData.deleteOffset[segmentID] = offset
-            }
+        if iNode.historicalReplica != nil {
+            processDeleteMessages(iNode.historicalReplica, delMsg, delData)
         }
     }

@@ -208,6 +172,52 @@ func (iNode *insertNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
     return []Msg{res}
 }

+func processDeleteMessages(replica ReplicaInterface, msg *msgstream.DeleteMsg, delData *deleteData) {
+    var partitionIDs []UniqueID
+    var err error
+    if msg.PartitionID != -1 {
+        partitionIDs = []UniqueID{msg.PartitionID}
+    } else {
+        partitionIDs, err = replica.getPartitionIDs(msg.CollectionID)
+        if err != nil {
+            log.Warn(err.Error())
+            return
+        }
+    }
+    resultSegmentIDs := make([]UniqueID, 0)
+    for _, partitionID := range partitionIDs {
+        segmentIDs, err := replica.getSegmentIDs(partitionID)
+        if err != nil {
+            log.Warn(err.Error())
+            continue
+        }
+        resultSegmentIDs = append(resultSegmentIDs, segmentIDs...)
+    }
+    for _, segmentID := range resultSegmentIDs {
+        segment, err := replica.getSegmentByID(segmentID)
+        if err != nil {
+            log.Warn(err.Error())
+            continue
+        }
+        pks, err := filterSegmentsByPKs(msg.PrimaryKeys, segment)
+        if err != nil {
+            log.Warn(err.Error())
+            continue
+        }
+        if len(pks) > 0 {
+            offset := segment.segmentPreDelete(len(pks))
+            if err != nil {
+                log.Warn(err.Error())
+                continue
+            }
+            delData.deleteIDs[segmentID] = append(delData.deleteIDs[segmentID], pks...)
+            // TODO(yukun) get offset of pks
+            delData.deleteTimestamps[segmentID] = append(delData.deleteTimestamps[segmentID], msg.Timestamps[:len(pks)]...)
+            delData.deleteOffset[segmentID] = offset
+        }
+    }
+}
+
 func filterSegmentsByPKs(pks []int64, segment *Segment) ([]int64, error) {
     if pks == nil {
         return nil, fmt.Errorf("pks is nil when getSegmentsByPKs")
@@ -218,19 +228,19 @@ func filterSegmentsByPKs(pks []int64, segment *Segment) ([]int64, error) {
     buf := make([]byte, 8)
     res := make([]int64, 0)
     for _, pk := range pks {
-        binary.BigEndian.PutUint64(buf, uint64(pk))
+        binary.LittleEndian.PutUint64(buf, uint64(pk))
         exist := segment.pkFilter.Test(buf)
         if exist {
             res = append(res, pk)
         }
     }
-    log.Debug("In filterSegmentsByPKs", zap.Any("pk", res), zap.Any("segment", segment.segmentID))
+    log.Debug("In filterSegmentsByPKs", zap.Any("pk len", len(res)), zap.Any("segment", segment.segmentID))
     return res, nil
 }

 func (iNode *insertNode) insert(iData *insertData, segmentID UniqueID, wg *sync.WaitGroup) {
     log.Debug("QueryNode::iNode::insert", zap.Any("SegmentID", segmentID))
-    var targetSegment, err = iNode.replica.getSegmentByID(segmentID)
+    var targetSegment, err = iNode.streamingReplica.getSegmentByID(segmentID)
     if err != nil {
         log.Warn("cannot find segment:", zap.Int64("segmentID", segmentID))
         // TODO: add error handling
@@ -263,13 +273,9 @@ func (iNode *insertNode) insert(iData *insertData, segmentID UniqueID, wg *sync.
 func (iNode *insertNode) delete(deleteData *deleteData, segmentID UniqueID, wg *sync.WaitGroup) {
     defer wg.Done()
     log.Debug("QueryNode::iNode::delete", zap.Any("SegmentID", segmentID))
-    var targetSegment, err = iNode.replica.getSegmentByID(segmentID)
-    if err != nil {
-        log.Warn("Cannot find segment:", zap.Int64("segmentID", segmentID))
-        return
-    }
-
-    if targetSegment.segmentType != segmentTypeGrowing {
+    targetSegment := iNode.getSegmentInReplica(segmentID)
+    if targetSegment == nil {
+        log.Warn("targetSegment is nil")
         return
     }

@@ -277,7 +283,7 @@ func (iNode *insertNode) delete(deleteData *deleteData, segmentID UniqueID, wg *
     timestamps := deleteData.deleteTimestamps[segmentID]
     offset := deleteData.deleteOffset[segmentID]

-    err = targetSegment.segmentDelete(offset, &ids, &timestamps)
+    err := targetSegment.segmentDelete(offset, &ids, &timestamps)
     if err != nil {
         log.Warn("QueryNode: targetSegmentDelete failed", zap.Error(err))
         return
@@ -286,6 +292,40 @@ func (iNode *insertNode) delete(deleteData *deleteData, segmentID UniqueID, wg *
     log.Debug("Do delete done", zap.Int("len", len(deleteData.deleteIDs[segmentID])), zap.Int64("segmentID", segmentID))
 }

+func (iNode *insertNode) getSegmentInReplica(segmentID int64) *Segment {
+    streamingSegment, err := iNode.streamingReplica.getSegmentByID(segmentID)
+    if err != nil {
+        log.Warn("Cannot find segment in streaming replica:", zap.Int64("segmentID", segmentID))
+    } else {
+        return streamingSegment
+    }
+    historicalSegment, err := iNode.historicalReplica.getSegmentByID(segmentID)
+    if err != nil {
+        log.Warn("Cannot find segment in historical replica:", zap.Int64("segmentID", segmentID))
+    } else {
+        return historicalSegment
+    }
+    log.Warn("Cannot find segment in both streaming and historical replica:", zap.Int64("segmentID", segmentID))
+    return nil
+}
+
+func (iNode *insertNode) getCollectionInReplica(segmentID int64) *Collection {
+    streamingCollection, err := iNode.streamingReplica.getCollectionByID(segmentID)
+    if err != nil {
+        log.Warn("Cannot find collection in streaming replica:", zap.Int64("collectionID", segmentID))
+    } else {
+        return streamingCollection
+    }
+    historicalCollection, err := iNode.historicalReplica.getCollectionByID(segmentID)
+    if err != nil {
+        log.Warn("Cannot find collection in historical replica:", zap.Int64("collectionID", segmentID))
+    } else {
+        return historicalCollection
+    }
+    log.Warn("Cannot find collection in both streaming and historical replica:", zap.Int64("collectionID", segmentID))
+    return nil
+}
+
 func (iNode *insertNode) getPrimaryKeys(msg *msgstream.InsertMsg) []int64 {
     if len(msg.RowIDs) != len(msg.Timestamps) || len(msg.RowIDs) != len(msg.RowData) {
         log.Warn("misaligned messages detected")
@@ -293,12 +333,11 @@ func (iNode *insertNode) getPrimaryKeys(msg *msgstream.InsertMsg) []int64 {
     }
     collectionID := msg.GetCollectionID()

-    collection, err := iNode.replica.getCollectionByID(collectionID)
-    if err != nil {
-        log.Warn("collection cannot be found")
+    collection := iNode.getCollectionInReplica(collectionID)
+    if collection == nil {
+        log.Warn("collection is nil")
         return nil
     }
-
     offset := 0
     for _, field := range collection.schema.Fields {
         if field.IsPrimaryKey {
@@ -332,10 +371,9 @@ func (iNode *insertNode) getPrimaryKeys(msg *msgstream.InsertMsg) []int64 {
             }
         }
     case schemapb.DataType_BinaryVector:
-        var dim int
         for _, t := range field.TypeParams {
             if t.Key == "dim" {
-                dim, err = strconv.Atoi(t.Value)
+                dim, err := strconv.Atoi(t.Value)
                 if err != nil {
                     log.Error("strconv wrong on get dim", zap.Error(err))
                     return nil
@@ -354,7 +392,7 @@ func (iNode *insertNode) getPrimaryKeys(msg *msgstream.InsertMsg) []int64 {

     pks := make([]int64, len(blobReaders))
     for i, reader := range blobReaders {
-        err = binary.Read(reader, binary.LittleEndian, &pks[i])
+        err := binary.Read(reader, binary.LittleEndian, &pks[i])
         if err != nil {
             log.Warn("binary read blob value failed", zap.Error(err))
         }
@@ -362,7 +400,7 @@ func (iNode *insertNode) getPrimaryKeys(msg *msgstream.InsertMsg) []int64 {
     return pks
 }

-func newInsertNode(replica ReplicaInterface) *insertNode {
+func newInsertNode(streamingReplica ReplicaInterface, historicalReplica ReplicaInterface) *insertNode {
     maxQueueLength := Params.FlowGraphMaxQueueLength
     maxParallelism := Params.FlowGraphMaxParallelism

@@ -371,7 +409,8 @@ func newInsertNode(replica ReplicaInterface) *insertNode {
     baseNode.SetMaxParallelism(maxParallelism)

     return &insertNode{
-        baseNode: baseNode,
-        replica:  replica,
+        baseNode:          baseNode,
+        streamingReplica:  streamingReplica,
+        historicalReplica: historicalReplica,
     }
 }
diff --git a/internal/querynode/flow_graph_insert_node_test.go b/internal/querynode/flow_graph_insert_node_test.go
index 7e643cc854..2d81fbe0d6 100644
--- a/internal/querynode/flow_graph_insert_node_test.go
+++ b/internal/querynode/flow_graph_insert_node_test.go
@@ -69,11 +69,13 @@ func genFlowGraphDeleteData() (*deleteData, error) {

 func TestFlowGraphInsertNode_insert(t *testing.T) {
     t.Run("test insert", func(t *testing.T) {
-        replica, err := genSimpleReplica()
+        streaming, err := genSimpleReplica()
         assert.NoError(t, err)
-        insertNode := newInsertNode(replica)
+        historical, err := genSimpleReplica()
+        assert.NoError(t, err)
+        insertNode := newInsertNode(streaming, historical)

-        err = replica.addSegment(defaultSegmentID,
+        err = streaming.addSegment(defaultSegmentID,
             defaultPartitionID,
             defaultCollectionID,
             defaultVChannel,
@@ -90,11 +92,13 @@ func TestFlowGraphInsertNode_insert(t *testing.T) {
     })

     t.Run("test segment insert error", func(t *testing.T) {
-        replica, err := genSimpleReplica()
+        streaming, err := genSimpleReplica()
         assert.NoError(t, err)
-        insertNode := newInsertNode(replica)
+        historical, err := genSimpleReplica()
+        assert.NoError(t, err)
+        insertNode := newInsertNode(streaming, historical)

-        err = replica.addSegment(defaultSegmentID,
+        err = streaming.addSegment(defaultSegmentID,
             defaultPartitionID,
             defaultCollectionID,
             defaultVChannel,
@@ -112,20 +116,24 @@ func TestFlowGraphInsertNode_insert(t *testing.T) {
     })

     t.Run("test no target segment", func(t *testing.T) {
-        replica, err := genSimpleReplica()
+        streaming, err := genSimpleReplica()
         assert.NoError(t, err)
-        insertNode := newInsertNode(replica)
+        historical, err := genSimpleReplica()
+        assert.NoError(t, err)
+        insertNode := newInsertNode(streaming, historical)
         wg := &sync.WaitGroup{}
         wg.Add(1)
         insertNode.insert(nil, defaultSegmentID, wg)
     })

     t.Run("test invalid segmentType", func(t *testing.T) {
-        replica, err := genSimpleReplica()
+        streaming, err := genSimpleReplica()
         assert.NoError(t, err)
-        insertNode := newInsertNode(replica)
+        historical, err := genSimpleReplica()
+        assert.NoError(t, err)
+        insertNode := newInsertNode(streaming, historical)

-        err = replica.addSegment(defaultSegmentID,
+        err = streaming.addSegment(defaultSegmentID,
             defaultPartitionID,
             defaultCollectionID,
             defaultVChannel,
@@ -141,11 +149,13 @@ func TestFlowGraphInsertNode_insert(t *testing.T) {

 func TestFlowGraphInsertNode_delete(t *testing.T) {
     t.Run("test insert and delete", func(t *testing.T) {
-        replica, err := genSimpleReplica()
+        streaming, err := genSimpleReplica()
         assert.NoError(t, err)
-        insertNode := newInsertNode(replica)
+        historical, err := genSimpleReplica()
+        assert.NoError(t, err)
+        insertNode := newInsertNode(streaming, historical)

-        err = replica.addSegment(defaultSegmentID,
+        err = streaming.addSegment(defaultSegmentID,
             defaultPartitionID,
             defaultCollectionID,
             defaultVChannel,
@@ -167,11 +177,13 @@ func TestFlowGraphInsertNode_delete(t *testing.T) {
     })

     t.Run("test only delete", func(t *testing.T) {
-        replica, err := genSimpleReplica()
+        streaming, err := genSimpleReplica()
         assert.NoError(t, err)
-        insertNode := newInsertNode(replica)
+        historical, err := genSimpleReplica()
+        assert.NoError(t, err)
+        insertNode := newInsertNode(streaming, historical)

-        err = replica.addSegment(defaultSegmentID,
+        err = streaming.addSegment(defaultSegmentID,
             defaultPartitionID,
             defaultCollectionID,
             defaultVChannel,
@@ -187,11 +199,13 @@ func TestFlowGraphInsertNode_delete(t *testing.T) {
     })

     t.Run("test segment delete error", func(t *testing.T) {
-        replica, err := genSimpleReplica()
+        streaming, err := genSimpleReplica()
         assert.NoError(t, err)
-        insertNode := newInsertNode(replica)
+        historical, err := genSimpleReplica()
+        assert.NoError(t, err)
+        insertNode := newInsertNode(streaming, historical)

-        err = replica.addSegment(defaultSegmentID,
+        err = streaming.addSegment(defaultSegmentID,
             defaultPartitionID,
             defaultCollectionID,
             defaultVChannel,
@@ -208,9 +222,11 @@ func TestFlowGraphInsertNode_delete(t *testing.T) {
     })

     t.Run("test no target segment", func(t *testing.T) {
-        replica, err := genSimpleReplica()
+        streaming, err := genSimpleReplica()
         assert.NoError(t, err)
-        insertNode := newInsertNode(replica)
+        historical, err := genSimpleReplica()
+        assert.NoError(t, err)
+        insertNode := newInsertNode(streaming, historical)
         wg := &sync.WaitGroup{}
         wg.Add(1)
         insertNode.delete(nil, defaultSegmentID, wg)
@@ -219,11 +235,13 @@ func TestFlowGraphInsertNode_delete(t *testing.T) {

 func TestFlowGraphInsertNode_operate(t *testing.T) {
     t.Run("test operate", func(t *testing.T) {
-        replica, err := genSimpleReplica()
+        streaming, err := genSimpleReplica()
         assert.NoError(t, err)
-        insertNode := newInsertNode(replica)
+        historical, err := genSimpleReplica()
+        assert.NoError(t, err)
+        insertNode := newInsertNode(streaming, historical)

-        err = replica.addSegment(defaultSegmentID,
+        err = streaming.addSegment(defaultSegmentID,
             defaultPartitionID,
             defaultCollectionID,
             defaultVChannel,
@@ -245,7 +263,7 @@ func TestFlowGraphInsertNode_operate(t *testing.T) {
         }
         msg := []flowgraph.Msg{&iMsg}
         insertNode.Operate(msg)
-        s, err := replica.getSegmentByID(defaultSegmentID)
+        s, err := streaming.getSegmentByID(defaultSegmentID)
         assert.Nil(t, err)
         buf := make([]byte, 8)
         for i := 0; i < defaultMsgLength; i++ {
@@ -256,11 +274,13 @@ func TestFlowGraphInsertNode_operate(t *testing.T) {
     })

     t.Run("test invalid partitionID", func(t *testing.T) {
-        replica, err := genSimpleReplica()
+        streaming, err := genSimpleReplica()
         assert.NoError(t, err)
-        insertNode := newInsertNode(replica)
+        historical, err := genSimpleReplica()
+        assert.NoError(t, err)
+        insertNode := newInsertNode(streaming, historical)

-        err = replica.addSegment(defaultSegmentID,
+        err = streaming.addSegment(defaultSegmentID,
             defaultPartitionID,
             defaultCollectionID,
             defaultVChannel,
@@ -282,11 +302,13 @@ func TestFlowGraphInsertNode_operate(t *testing.T) {
     })

     t.Run("test collection partition not exist", func(t *testing.T) {
-        replica, err := genSimpleReplica()
+        streaming, err := genSimpleReplica()
         assert.NoError(t, err)
-        insertNode := newInsertNode(replica)
+        historical, err := genSimpleReplica()
+        assert.NoError(t, err)
+        insertNode := newInsertNode(streaming, historical)

-        err = replica.addSegment(defaultSegmentID,
+        err = streaming.addSegment(defaultSegmentID,
             defaultPartitionID,
             defaultCollectionID,
             defaultVChannel,
@@ -308,11 +330,13 @@ func TestFlowGraphInsertNode_operate(t *testing.T) {
     })

     t.Run("test partition not exist", func(t *testing.T) {
-        replica, err := genSimpleReplica()
+        streaming, err := genSimpleReplica()
         assert.NoError(t, err)
-        insertNode := newInsertNode(replica)
+        historical, err := genSimpleReplica()
+        assert.NoError(t, err)
+        insertNode := newInsertNode(streaming, historical)

-        err = replica.addSegment(defaultSegmentID,
+        err = streaming.addSegment(defaultSegmentID,
             defaultPartitionID,
             defaultCollectionID,
             defaultVChannel,
@@ -333,11 +357,13 @@ func TestFlowGraphInsertNode_operate(t *testing.T) {
     })

     t.Run("test invalid input length", func(t *testing.T) {
-        replica, err := genSimpleReplica()
+        streaming, err := genSimpleReplica()
         assert.NoError(t, err)
-        insertNode := newInsertNode(replica)
+        historical, err := genSimpleReplica()
+        assert.NoError(t, err)
+        insertNode := newInsertNode(streaming, historical)

-        err = replica.addSegment(defaultSegmentID,
+        err = streaming.addSegment(defaultSegmentID,
             defaultPartitionID,
             defaultCollectionID,
             defaultVChannel,
@@ -375,7 +401,7 @@ func TestGetSegmentsByPKs(t *testing.T) {
     }
     pks, err := filterSegmentsByPKs([]int64{0, 1, 2, 3, 4}, segment)
     assert.Nil(t, err)
-    assert.Equal(t, len(pks), 3)
+    assert.Equal(t, len(pks), 1)

     pks, err = filterSegmentsByPKs([]int64{}, segment)
     assert.Nil(t, err)
diff --git a/internal/querynode/flow_graph_query_node.go b/internal/querynode/flow_graph_query_node.go
index 7b87261c04..48fcf599fa 100644
--- a/internal/querynode/flow_graph_query_node.go
+++ b/internal/querynode/flow_graph_query_node.go
@@ -39,6 +39,7 @@ func newQueryNodeFlowGraph(ctx context.Context,
     collectionID UniqueID,
     partitionID UniqueID,
     streamingReplica ReplicaInterface,
+    historicalReplica ReplicaInterface,
     tSafeReplica TSafeReplicaInterface,
     channel Channel,
     factory msgstream.Factory) *queryNodeFlowGraph {
@@ -56,7 +57,7 @@ func newQueryNodeFlowGraph(ctx context.Context,
     var dmStreamNode node = q.newDmInputNode(ctx1, factory)
     var filterDmNode node = newFilteredDmNode(streamingReplica, loadType, collectionID, partitionID)
-    var insertNode node = newInsertNode(streamingReplica)
+    var insertNode node = newInsertNode(streamingReplica, historicalReplica)
     var serviceTimeNode node = newServiceTimeNode(ctx1, tSafeReplica, loadType, collectionID, partitionID, channel, factory)

     q.flowGraph.AddNode(dmStreamNode)
diff --git a/internal/querynode/flow_graph_query_node_test.go b/internal/querynode/flow_graph_query_node_test.go
index 5ab9fb81db..e393da152e 100644
--- a/internal/querynode/flow_graph_query_node_test.go
+++ b/internal/querynode/flow_graph_query_node_test.go
@@ -27,6 +27,9 @@ func TestQueryNodeFlowGraph_consumerFlowGraph(t *testing.T) {
     streaming, err := genSimpleStreaming(ctx)
     assert.NoError(t, err)

+    historicalReplica, err := genSimpleReplica()
+    assert.NoError(t, err)
+
     fac, err := genFactory()
     assert.NoError(t, err)

@@ -35,6 +38,7 @@ func TestQueryNodeFlowGraph_consumerFlowGraph(t *testing.T) {
         defaultCollectionID,
         defaultPartitionID,
         streaming.replica,
+        historicalReplica,
         streaming.tSafeReplica,
         defaultVChannel,
         fac)
@@ -50,6 +54,9 @@ func TestQueryNodeFlowGraph_seekQueryNodeFlowGraph(t *testing.T) {
     streaming, err := genSimpleStreaming(ctx)
     assert.NoError(t, err)

+    historicalReplica, err := genSimpleReplica()
+    assert.NoError(t, err)
+
     fac, err := genFactory()
     assert.NoError(t, err)

@@ -58,6 +65,7 @@ func TestQueryNodeFlowGraph_seekQueryNodeFlowGraph(t *testing.T) {
         defaultCollectionID,
         defaultPartitionID,
         streaming.replica,
+        historicalReplica,
         streaming.tSafeReplica,
         defaultVChannel,
         fac)
diff --git a/internal/querynode/mock_test.go b/internal/querynode/mock_test.go
index 2e637cdf65..be78ef0fd2 100644
--- a/internal/querynode/mock_test.go
+++ b/internal/querynode/mock_test.go
@@ -923,7 +923,11 @@ func genSimpleStreaming(ctx context.Context) (*streaming, error) {
     if err != nil {
         return nil, err
     }
-    s := newStreaming(ctx, fac, kv)
+    historicalReplica, err := genSimpleReplica()
+    if err != nil {
+        return nil, err
+    }
+    s := newStreaming(ctx, fac, kv, historicalReplica)
     r, err := genSimpleReplica()
     if err != nil {
         return nil, err
diff --git a/internal/querynode/query_collection_test.go b/internal/querynode/query_collection_test.go
index 09fbff61b8..e4bede5d23 100644
--- a/internal/querynode/query_collection_test.go
+++ b/internal/querynode/query_collection_test.go
@@ -97,6 +97,7 @@ func updateTSafe(queryCollection *queryCollection, timestamp Timestamp) {
 }

 func TestQueryCollection_withoutVChannel(t *testing.T) {
+    ctx := context.Background()
     m := map[string]interface{}{
         "PulsarAddress":  Params.PulsarAddress,
         "ReceiveBufSize": 1024,
@@ -134,7 +135,7 @@ func TestQueryCollection_withoutVChannel(t *testing.T) {
     assert.Nil(t, err)

     //create a streaming
-    streaming := newStreaming(context.Background(), factory, etcdKV)
+    streaming := newStreaming(ctx, factory, etcdKV, historical.replica)
     err = streaming.replica.addCollection(0, schema)
     assert.Nil(t, err)
     err = streaming.replica.addPartition(0, 1)
diff --git a/internal/querynode/query_node.go b/internal/querynode/query_node.go
index f7ab52d4ed..13cbefd1ec 100644
--- a/internal/querynode/query_node.go
+++ b/internal/querynode/query_node.go
@@ -180,7 +180,7 @@ func (node *QueryNode) Init() error {
         node.indexCoord,
         node.msFactory,
         node.etcdKV)
-    node.streaming = newStreaming(node.queryNodeLoopCtx, node.msFactory, node.etcdKV)
+    node.streaming = newStreaming(node.queryNodeLoopCtx, node.msFactory, node.etcdKV, node.historical.replica)

     node.InitSegcore()

diff --git a/internal/querynode/query_node_test.go b/internal/querynode/query_node_test.go
index 0199ef9ffd..9a099b0cf6 100644
--- a/internal/querynode/query_node_test.go
+++ b/internal/querynode/query_node_test.go
@@ -193,7 +193,7 @@ func newQueryNodeMock() *QueryNode {
     }
     svr := NewQueryNode(ctx, msFactory)
     svr.historical = newHistorical(svr.queryNodeLoopCtx, nil, nil, svr.msFactory, etcdKV)
-    svr.streaming = newStreaming(ctx, msFactory, etcdKV)
+    svr.streaming = newStreaming(ctx, msFactory, etcdKV, svr.historical.replica)
     svr.etcdKV = etcdKV

     return svr
diff --git a/internal/querynode/streaming.go b/internal/querynode/streaming.go
index c81224b6d2..e94aed6a3a 100644
--- a/internal/querynode/streaming.go
+++ b/internal/querynode/streaming.go
@@ -28,17 +28,18 @@ import (
 type streaming struct {
     ctx context.Context

-    replica      ReplicaInterface
-    tSafeReplica TSafeReplicaInterface
+    replica           ReplicaInterface
+    historicalReplica ReplicaInterface
+    tSafeReplica      TSafeReplicaInterface

     dataSyncService *dataSyncService
     msFactory       msgstream.Factory
 }

-func newStreaming(ctx context.Context, factory msgstream.Factory, etcdKV *etcdkv.EtcdKV) *streaming {
+func newStreaming(ctx context.Context, factory msgstream.Factory, etcdKV *etcdkv.EtcdKV, historicalReplica ReplicaInterface) *streaming {
     replica := newCollectionReplica(etcdKV)
     tReplica := newTSafeReplica()
-    newDS := newDataSyncService(ctx, replica, tReplica, factory)
+    newDS := newDataSyncService(ctx, replica, historicalReplica, tReplica, factory)

     return &streaming{
         replica:      replica,
diff --git a/tests/python_client/testcases/test_delete_20.py b/tests/python_client/testcases/test_delete_20.py
index 54ac14af1d..1575cbea65 100644
--- a/tests/python_client/testcases/test_delete_20.py
+++ b/tests/python_client/testcases/test_delete_20.py
@@ -24,7 +24,6 @@ class TestDeleteParams(TestcaseBase):
     Only the `in` operator is supported in the expr
     """

-    @pytest.mark.xfail(reason="Issues #10431")
     @pytest.mark.tags(CaseLabel.L0)
     @pytest.mark.parametrize('is_binary', [False, True])
     def test_delete_entities(self, is_binary):
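
Note on the bloom-filter change in flow_graph_insert_node.go: filterSegmentsByPKs now serializes primary keys with binary.LittleEndian before probing segment.pkFilter, matching the little-endian decoding used in getPrimaryKeys, which is why TestGetSegmentsByPKs now expects a different hit count. The standalone sketch below only illustrates that probe path; it is not code from this patch, and it assumes a bloom-filter package with the same Add/Test API as the one backing pkFilter (github.com/bits-and-blooms/bloom/v3 is used here as a stand-in).

package main

import (
    "encoding/binary"
    "fmt"

    "github.com/bits-and-blooms/bloom/v3" // assumed stand-in for the filter behind segment.pkFilter
)

// filterByPKs mirrors the shape of the patched filterSegmentsByPKs:
// each primary key is serialized little-endian and tested against the
// per-segment bloom filter; only candidate hits are returned.
func filterByPKs(pks []int64, filter *bloom.BloomFilter) []int64 {
    buf := make([]byte, 8)
    hits := make([]int64, 0)
    for _, pk := range pks {
        binary.LittleEndian.PutUint64(buf, uint64(pk))
        if filter.Test(buf) {
            hits = append(hits, pk)
        }
    }
    return hits
}

func main() {
    filter := bloom.NewWithEstimates(1000, 0.01)

    // Index pk=3 the same way the insert path would (little-endian).
    buf := make([]byte, 8)
    binary.LittleEndian.PutUint64(buf, 3)
    filter.Add(buf)

    // Barring false positives, only the indexed key passes the filter.
    fmt.Println(filterByPKs([]int64{0, 1, 2, 3, 4}, filter))
}

The point of the change is that the encode direction must be identical on the path that populates the filter and the path that probes it; if one side writes big-endian and the other little-endian, lookups for keys that were actually inserted will typically miss and deletes will silently skip segments.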