From ec66ac69ac7c3765cbf2f07f0ca18cafaa29b695 Mon Sep 17 00:00:00 2001 From: congqixia Date: Fri, 13 May 2022 19:55:54 +0800 Subject: [PATCH] Simplify chunk manager in QueryShard (#16976) Signed-off-by: Congqi Xia --- internal/querynode/query_shard.go | 49 +++++++++---------- internal/querynode/query_shard_service.go | 10 ++-- .../querynode/query_shard_service_test.go | 23 +++++++++ internal/querynode/query_shard_test.go | 41 +++++++++++++++- 4 files changed, 93 insertions(+), 30 deletions(-) diff --git a/internal/querynode/query_shard.go b/internal/querynode/query_shard.go index dcfd204bb7..ca6cec128f 100644 --- a/internal/querynode/query_shard.go +++ b/internal/querynode/query_shard.go @@ -62,8 +62,6 @@ type queryShard struct { startTickerOnce sync.Once ticker *time.Ticker // timed ticker for trigger timeout check - localChunkManager storage.ChunkManager - remoteChunkManager storage.ChunkManager vectorChunkManager *storage.VectorChunkManager localCacheEnabled bool localCacheSize int64 @@ -80,7 +78,27 @@ func newQueryShard( localChunkManager storage.ChunkManager, remoteChunkManager storage.ChunkManager, localCacheEnabled bool, -) *queryShard { +) (*queryShard, error) { + + collection, err := streaming.replica.getCollectionByID(collectionID) + if err != nil { + return nil, err + } + if localChunkManager == nil { + return nil, fmt.Errorf("can not create vector chunk manager for local chunk manager is nil") + } + if remoteChunkManager == nil { + return nil, fmt.Errorf("can not create vector chunk manager for remote chunk manager is nil") + } + vectorChunkManager, err := storage.NewVectorChunkManager(localChunkManager, remoteChunkManager, + &etcdpb.CollectionMeta{ + ID: collectionID, + Schema: collection.schema, + }, Params.QueryNodeCfg.CacheMemoryLimit, localCacheEnabled) + if err != nil { + return nil, err + } + ctx, cancel := context.WithCancel(ctx) qs := &queryShard{ ctx: ctx, @@ -91,10 +109,7 @@ func newQueryShard( clusterService: clusterService, historical: historical, streaming: streaming, - localChunkManager: localChunkManager, - remoteChunkManager: remoteChunkManager, - localCacheEnabled: localCacheEnabled, - localCacheSize: Params.QueryNodeCfg.CacheMemoryLimit, + vectorChunkManager: vectorChunkManager, watcherCond: sync.NewCond(&sync.Mutex{}), } @@ -104,7 +119,7 @@ func newQueryShard( } qs.deltaChannel = deltaChannel - return qs + return qs, nil } // Close cleans query shard @@ -722,24 +737,6 @@ func (q *queryShard) query(ctx context.Context, req *querypb.QueryRequest) (*int } defer plan.delete() - // TODO: init vector chunk manager at most once - if q.vectorChunkManager == nil { - if q.localChunkManager == nil { - return nil, fmt.Errorf("can not create vector chunk manager for local chunk manager is nil") - } - if q.remoteChunkManager == nil { - return nil, fmt.Errorf("can not create vector chunk manager for remote chunk manager is nil") - } - q.vectorChunkManager, err = storage.NewVectorChunkManager(q.localChunkManager, q.remoteChunkManager, - &etcdpb.CollectionMeta{ - ID: collection.id, - Schema: collection.schema, - }, q.localCacheSize, q.localCacheEnabled) - if err != nil { - return nil, err - } - } - if req.IsShardLeader { cluster, ok := q.clusterService.getShardCluster(req.GetDmlChannel()) if !ok { diff --git a/internal/querynode/query_shard_service.go b/internal/querynode/query_shard_service.go index 0037b27527..b497809c5f 100644 --- a/internal/querynode/query_shard_service.go +++ b/internal/querynode/query_shard_service.go @@ -81,7 +81,7 @@ func (q *queryShardService) addQueryShard(collectionID UniqueID, channel Channel if _, ok := q.queryShards[channel]; ok { return errors.New(fmt.Sprintln("query shard(channel) ", channel, " already exists")) } - qs := newQueryShard( + qs, err := newQueryShard( q.ctx, collectionID, channel, @@ -93,6 +93,9 @@ func (q *queryShardService) addQueryShard(collectionID UniqueID, channel Channel q.remoteChunkManager, q.localCacheEnabled, ) + if err != nil { + return err + } q.queryShards[channel] = qs log.Info("Successfully add query shard", zap.Int64("collection", collectionID), zap.Int64("replica", replicaID), zap.String("channel", channel)) return nil @@ -155,14 +158,15 @@ func (q *queryShardService) releaseCollection(collectionID int64) { qc, ok := q.queryChannels[collectionID] if ok && qc != nil { qc.Stop() + delete(q.queryChannels, collectionID) } q.queryChannelMu.Unlock() q.queryShardsMu.Lock() - for _, queryShard := range q.queryShards { + for channel, queryShard := range q.queryShards { if queryShard.collectionID == collectionID { queryShard.Close() - delete(q.queryShards, queryShard.channel) + delete(q.queryShards, channel) } } q.queryShardsMu.Unlock() diff --git a/internal/querynode/query_shard_service_test.go b/internal/querynode/query_shard_service_test.go index 76ecc3bc42..ca2f44c0d6 100644 --- a/internal/querynode/query_shard_service_test.go +++ b/internal/querynode/query_shard_service_test.go @@ -45,3 +45,26 @@ func TestQueryShardService(t *testing.T) { err = qss.removeQueryShard("vchan2") assert.Error(t, err) } + +func TestQueryShardService_InvalidChunkManager(t *testing.T) { + qn, err := genSimpleQueryNode(context.Background()) + require.NoError(t, err) + + qss := newQueryShardService(context.Background(), qn.historical, qn.streaming, qn.ShardClusterService, qn.factory) + + lcm := qss.localChunkManager + qss.localChunkManager = nil + + err = qss.addQueryShard(0, "vchan", 0) + assert.Error(t, err) + + qss.localChunkManager = lcm + + rcm := qss.remoteChunkManager + qss.remoteChunkManager = nil + + err = qss.addQueryShard(0, "vchan", 0) + assert.Error(t, err) + + qss.remoteChunkManager = rcm +} diff --git a/internal/querynode/query_shard_test.go b/internal/querynode/query_shard_test.go index 6df441f3d4..b49e818515 100644 --- a/internal/querynode/query_shard_test.go +++ b/internal/querynode/query_shard_test.go @@ -22,6 +22,7 @@ import ( "testing" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "github.com/milvus-io/milvus/internal/common" "github.com/milvus-io/milvus/internal/proto/internalpb" @@ -58,8 +59,11 @@ func genSimpleQueryShard(ctx context.Context) (*queryShard, error) { } shardClusterService.clusters.Store(defaultDMLChannel, shardCluster) - qs := newQueryShard(ctx, defaultCollectionID, defaultDMLChannel, defaultReplicaID, shardClusterService, + qs, err := newQueryShard(ctx, defaultCollectionID, defaultDMLChannel, defaultReplicaID, shardClusterService, historical, streaming, localCM, remoteCM, false) + if err != nil { + return nil, err + } qs.deltaChannel = defaultDeltaChannel err = qs.watchDMLTSafe() @@ -82,6 +86,41 @@ func updateQueryShardTSafe(qs *queryShard, timestamp Timestamp) error { return qs.historical.tSafeReplica.setTSafe(defaultDeltaChannel, timestamp) } +func TestNewQueryShard_IllegalCases(t *testing.T) { + ctx := context.Background() + tSafe := newTSafeReplica() + historical, err := genSimpleHistorical(ctx, tSafe) + require.NoError(t, err) + + streaming, err := genSimpleStreaming(ctx, tSafe) + require.NoError(t, err) + + localCM, err := genLocalChunkManager() + require.NoError(t, err) + + remoteCM, err := genRemoteChunkManager(ctx) + require.NoError(t, err) + + shardCluster := NewShardCluster(defaultCollectionID, defaultReplicaID, defaultDMLChannel, + &mockNodeDetector{}, &mockSegmentDetector{}, buildMockQueryNode) + shardClusterService := &ShardClusterService{ + clusters: sync.Map{}, + } + shardClusterService.clusters.Store(defaultDMLChannel, shardCluster) + + _, err = newQueryShard(ctx, defaultCollectionID-1, defaultDMLChannel, defaultReplicaID, shardClusterService, + historical, streaming, localCM, remoteCM, false) + assert.Error(t, err) + + _, err = newQueryShard(ctx, defaultCollectionID, defaultDMLChannel, defaultReplicaID, shardClusterService, + historical, streaming, nil, remoteCM, false) + assert.Error(t, err) + + _, err = newQueryShard(ctx, defaultCollectionID, defaultDMLChannel, defaultReplicaID, shardClusterService, + historical, streaming, localCM, nil, false) + assert.Error(t, err) +} + func TestQueryShard_Close(t *testing.T) { qs, err := genSimpleQueryShard(context.Background()) assert.NoError(t, err)