diff --git a/internal/proxy/task_query.go b/internal/proxy/task_query.go index 076207a739..85e520c65b 100644 --- a/internal/proxy/task_query.go +++ b/internal/proxy/task_query.go @@ -216,7 +216,7 @@ func (t *queryTask) PreExecute(ctx context.Context) error { return err } log.Ctx(ctx).Debug("Get partitions in collection.", zap.Any("collectionName", collectionName), - zap.Int64("msgID", t.ID()), zap.Any("requestType", "query")) + zap.Int64("msgID", t.ID()), zap.Any("requestType", "query"), zap.Int64s("partitionIDs", t.RetrieveRequest.GetPartitionIDs())) queryParams, err := parseQueryParams(t.request.GetQueryParams()) if err != nil { diff --git a/internal/querynode/impl.go b/internal/querynode/impl.go index 47e37e6a27..2dad30a40d 100644 --- a/internal/querynode/impl.go +++ b/internal/querynode/impl.go @@ -1100,7 +1100,9 @@ func (node *QueryNode) Query(ctx context.Context, req *querypb.QueryRequest) (*i zap.Strings("vChannels", req.GetDmlChannels()), zap.Int64s("segmentIDs", req.GetSegmentIDs()), zap.Uint64("guaranteeTimestamp", req.Req.GetGuaranteeTimestamp()), - zap.Uint64("timeTravel", req.GetReq().GetTravelTimestamp())) + zap.Uint64("timeTravel", req.GetReq().GetTravelTimestamp()), + zap.Int64s("partitionIDs", req.GetReq().GetPartitionIDs()), + ) if req.GetReq().GetBase().GetTargetID() != node.session.ServerID { return &internalpb.RetrieveResults{ diff --git a/internal/querynode/meta_replica.go b/internal/querynode/meta_replica.go index 298b3b1754..ffecb95cfe 100644 --- a/internal/querynode/meta_replica.go +++ b/internal/querynode/meta_replica.go @@ -619,6 +619,7 @@ func (replica *metaReplica) addSegmentPrivate(segment *Segment) error { zap.String("segment type", segType.String()), zap.Int64("row count", rowCount), zap.Uint64("segment indexed fields", segment.indexedFieldInfos.Len()), + zap.String("vchannel", segment.vChannelID), ) metrics.QueryNodeNumSegments.WithLabelValues( fmt.Sprint(Params.QueryNodeCfg.GetNodeID()), diff --git a/internal/querynode/segment.go b/internal/querynode/segment.go index 3ca4f4c8f6..0864ffcef2 100644 --- a/internal/querynode/segment.go +++ b/internal/querynode/segment.go @@ -187,7 +187,9 @@ func newSegment(collection *Collection, zap.Int64("collectionID", collectionID), zap.Int64("partitionID", partitionID), zap.Int64("segmentID", segmentID), - zap.String("segmentType", segType.String())) + zap.String("segmentType", segType.String()), + zap.String("vchannel", vChannelID), + ) var segment = &Segment{ segmentPtr: segmentPtr, diff --git a/internal/querynode/shard_cluster.go b/internal/querynode/shard_cluster.go index afd1772b68..23f3bb5559 100644 --- a/internal/querynode/shard_cluster.go +++ b/internal/querynode/shard_cluster.go @@ -736,6 +736,9 @@ func (sc *ShardCluster) Search(ctx context.Context, req *querypb.SearchRequest, // dispatch request to followers for nodeID, segments := range segAllocs { + if len(segments) == 0 { + continue + } internalReq := typeutil.Clone(req.GetReq()) internalReq.GetBase().TargetID = nodeID nodeReq := &querypb.SearchRequest{ @@ -825,6 +828,9 @@ func (sc *ShardCluster) Query(ctx context.Context, req *querypb.QueryRequest, wi // dispatch request to followers for nodeID, segments := range segAllocs { + if len(segments) == 0 { + continue + } internalReq := typeutil.Clone(req.GetReq()) internalReq.GetBase().TargetID = nodeID nodeReq := &querypb.QueryRequest{ diff --git a/internal/querynode/shard_cluster_test.go b/internal/querynode/shard_cluster_test.go index 9fecd55567..04513043f0 100644 --- a/internal/querynode/shard_cluster_test.go +++ b/internal/querynode/shard_cluster_test.go @@ -1351,6 +1351,58 @@ func TestShardCluster_Search(t *testing.T) { }, streamingDoNothing) assert.Error(t, err) }) + + t.Run("search empty segments", func(t *testing.T) { + nodeEvents := []nodeEvent{ + { + nodeID: 1, + nodeAddr: "addr_1", + }, + { + nodeID: 2, + nodeAddr: "addr_2", + }, + } + + segmentEvents := []segmentEvent{ + { + segmentID: 1, + nodeIDs: []int64{1}, + partitionID: 1, + state: segmentStateLoaded, + }, + { + segmentID: 2, + nodeIDs: []int64{2}, + partitionID: 2, + state: segmentStateLoaded, + }, + } + + sc := NewShardCluster(collectionID, replicaID, vchannelName, version, + &mockNodeDetector{ + initNodes: nodeEvents, + }, &mockSegmentDetector{ + initSegments: segmentEvents, + }, buildMockQueryNode) + + defer sc.Close() + // setup first version + sc.SetupFirstVersion() + setupSegmentForShardCluster(sc, segmentEvents) + + require.True(t, sc.serviceable()) + + result, err := sc.Search(ctx, &querypb.SearchRequest{ + Req: &internalpb.SearchRequest{ + Base: &commonpb.MsgBase{}, + PartitionIDs: []int64{1}, + }, + DmlChannels: []string{vchannelName}, + }, streamingDoNothing) + assert.NoError(t, err) + assert.Equal(t, 1, len(result)) + }) } func TestShardCluster_Query(t *testing.T) { @@ -1650,6 +1702,55 @@ func TestShardCluster_Query(t *testing.T) { assert.Error(t, err) }) + t.Run("query empty segments", func(t *testing.T) { + nodeEvents := []nodeEvent{ + { + nodeID: 1, + nodeAddr: "addr_1", + }, + { + nodeID: 2, + nodeAddr: "addr_2", + }, + } + segmentEvents := []segmentEvent{ + { + segmentID: 1, + nodeIDs: []int64{1}, + partitionID: 1, + state: segmentStateLoaded, + }, + { + segmentID: 2, + partitionID: 2, + nodeIDs: []int64{2}, + state: segmentStateLoaded, + }, + } + + sc := NewShardCluster(collectionID, replicaID, vchannelName, version, + &mockNodeDetector{ + initNodes: nodeEvents, + }, &mockSegmentDetector{ + initSegments: segmentEvents, + }, buildMockQueryNode) + + defer sc.Close() + // setup first version + sc.SetupFirstVersion() + setupSegmentForShardCluster(sc, segmentEvents) + + require.EqualValues(t, available, sc.state.Load()) + result, err := sc.Query(ctx, &querypb.QueryRequest{ + Req: &internalpb.RetrieveRequest{ + Base: &commonpb.MsgBase{}, + PartitionIDs: []int64{1}, + }, + DmlChannels: []string{vchannelName}, + }, streamingDoNothing) + assert.NoError(t, err) + assert.Equal(t, 1, len(result)) + }) } func TestShardCluster_GetStatistics(t *testing.T) {