Simplify chunk manager in QueryShard (#16976)

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
This commit is contained in:
congqixia 2022-05-13 19:55:54 +08:00 committed by GitHub
parent a382133a8a
commit ec66ac69ac
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 93 additions and 30 deletions

View File

@ -62,8 +62,6 @@ type queryShard struct {
startTickerOnce sync.Once
ticker *time.Ticker // timed ticker for trigger timeout check
localChunkManager storage.ChunkManager
remoteChunkManager storage.ChunkManager
vectorChunkManager *storage.VectorChunkManager
localCacheEnabled bool
localCacheSize int64
@ -80,7 +78,27 @@ func newQueryShard(
localChunkManager storage.ChunkManager,
remoteChunkManager storage.ChunkManager,
localCacheEnabled bool,
) *queryShard {
) (*queryShard, error) {
collection, err := streaming.replica.getCollectionByID(collectionID)
if err != nil {
return nil, err
}
if localChunkManager == nil {
return nil, fmt.Errorf("can not create vector chunk manager for local chunk manager is nil")
}
if remoteChunkManager == nil {
return nil, fmt.Errorf("can not create vector chunk manager for remote chunk manager is nil")
}
vectorChunkManager, err := storage.NewVectorChunkManager(localChunkManager, remoteChunkManager,
&etcdpb.CollectionMeta{
ID: collectionID,
Schema: collection.schema,
}, Params.QueryNodeCfg.CacheMemoryLimit, localCacheEnabled)
if err != nil {
return nil, err
}
ctx, cancel := context.WithCancel(ctx)
qs := &queryShard{
ctx: ctx,
@ -91,10 +109,7 @@ func newQueryShard(
clusterService: clusterService,
historical: historical,
streaming: streaming,
localChunkManager: localChunkManager,
remoteChunkManager: remoteChunkManager,
localCacheEnabled: localCacheEnabled,
localCacheSize: Params.QueryNodeCfg.CacheMemoryLimit,
vectorChunkManager: vectorChunkManager,
watcherCond: sync.NewCond(&sync.Mutex{}),
}
@ -104,7 +119,7 @@ func newQueryShard(
}
qs.deltaChannel = deltaChannel
return qs
return qs, nil
}
// Close cleans query shard
@ -722,24 +737,6 @@ func (q *queryShard) query(ctx context.Context, req *querypb.QueryRequest) (*int
}
defer plan.delete()
// TODO: init vector chunk manager at most once
if q.vectorChunkManager == nil {
if q.localChunkManager == nil {
return nil, fmt.Errorf("can not create vector chunk manager for local chunk manager is nil")
}
if q.remoteChunkManager == nil {
return nil, fmt.Errorf("can not create vector chunk manager for remote chunk manager is nil")
}
q.vectorChunkManager, err = storage.NewVectorChunkManager(q.localChunkManager, q.remoteChunkManager,
&etcdpb.CollectionMeta{
ID: collection.id,
Schema: collection.schema,
}, q.localCacheSize, q.localCacheEnabled)
if err != nil {
return nil, err
}
}
if req.IsShardLeader {
cluster, ok := q.clusterService.getShardCluster(req.GetDmlChannel())
if !ok {

View File

@ -81,7 +81,7 @@ func (q *queryShardService) addQueryShard(collectionID UniqueID, channel Channel
if _, ok := q.queryShards[channel]; ok {
return errors.New(fmt.Sprintln("query shard(channel) ", channel, " already exists"))
}
qs := newQueryShard(
qs, err := newQueryShard(
q.ctx,
collectionID,
channel,
@ -93,6 +93,9 @@ func (q *queryShardService) addQueryShard(collectionID UniqueID, channel Channel
q.remoteChunkManager,
q.localCacheEnabled,
)
if err != nil {
return err
}
q.queryShards[channel] = qs
log.Info("Successfully add query shard", zap.Int64("collection", collectionID), zap.Int64("replica", replicaID), zap.String("channel", channel))
return nil
@ -155,14 +158,15 @@ func (q *queryShardService) releaseCollection(collectionID int64) {
qc, ok := q.queryChannels[collectionID]
if ok && qc != nil {
qc.Stop()
delete(q.queryChannels, collectionID)
}
q.queryChannelMu.Unlock()
q.queryShardsMu.Lock()
for _, queryShard := range q.queryShards {
for channel, queryShard := range q.queryShards {
if queryShard.collectionID == collectionID {
queryShard.Close()
delete(q.queryShards, queryShard.channel)
delete(q.queryShards, channel)
}
}
q.queryShardsMu.Unlock()

View File

@ -45,3 +45,26 @@ func TestQueryShardService(t *testing.T) {
err = qss.removeQueryShard("vchan2")
assert.Error(t, err)
}
// TestQueryShardService_InvalidChunkManager checks that addQueryShard returns
// an error when the service's local or remote chunk manager is nil.
func TestQueryShardService_InvalidChunkManager(t *testing.T) {
	node, err := genSimpleQueryNode(context.Background())
	require.NoError(t, err)

	service := newQueryShardService(context.Background(), node.historical, node.streaming, node.ShardClusterService, node.factory)

	// Case 1: drop the local chunk manager, expect a failure, then restore it
	// so the second case only has the remote manager missing.
	savedLocal := service.localChunkManager
	service.localChunkManager = nil
	assert.Error(t, service.addQueryShard(0, "vchan", 0))
	service.localChunkManager = savedLocal

	// Case 2: drop the remote chunk manager, expect a failure, then restore it.
	savedRemote := service.remoteChunkManager
	service.remoteChunkManager = nil
	assert.Error(t, service.addQueryShard(0, "vchan", 0))
	service.remoteChunkManager = savedRemote
}

View File

@ -22,6 +22,7 @@ import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/milvus-io/milvus/internal/common"
"github.com/milvus-io/milvus/internal/proto/internalpb"
@ -58,8 +59,11 @@ func genSimpleQueryShard(ctx context.Context) (*queryShard, error) {
}
shardClusterService.clusters.Store(defaultDMLChannel, shardCluster)
qs := newQueryShard(ctx, defaultCollectionID, defaultDMLChannel, defaultReplicaID, shardClusterService,
qs, err := newQueryShard(ctx, defaultCollectionID, defaultDMLChannel, defaultReplicaID, shardClusterService,
historical, streaming, localCM, remoteCM, false)
if err != nil {
return nil, err
}
qs.deltaChannel = defaultDeltaChannel
err = qs.watchDMLTSafe()
@ -82,6 +86,41 @@ func updateQueryShardTSafe(qs *queryShard, timestamp Timestamp) error {
return qs.historical.tSafeReplica.setTSafe(defaultDeltaChannel, timestamp)
}
// TestNewQueryShard_IllegalCases verifies that newQueryShard returns an error
// for each of three bad inputs: a different collection ID, a nil local chunk
// manager, and a nil remote chunk manager.
func TestNewQueryShard_IllegalCases(t *testing.T) {
	ctx := context.Background()
	tSafeReplica := newTSafeReplica()

	// Build the collaborators newQueryShard needs; any setup failure aborts the test.
	hist, err := genSimpleHistorical(ctx, tSafeReplica)
	require.NoError(t, err)
	stream, err := genSimpleStreaming(ctx, tSafeReplica)
	require.NoError(t, err)
	local, err := genLocalChunkManager()
	require.NoError(t, err)
	remote, err := genRemoteChunkManager(ctx)
	require.NoError(t, err)

	cluster := NewShardCluster(defaultCollectionID, defaultReplicaID, defaultDMLChannel,
		&mockNodeDetector{}, &mockSegmentDetector{}, buildMockQueryNode)
	clusterService := &ShardClusterService{
		clusters: sync.Map{},
	}
	clusterService.clusters.Store(defaultDMLChannel, cluster)

	// Collection ID shifted by one — presumably not registered in the streaming
	// replica, so the collection lookup should fail (verify against the fixture).
	_, err = newQueryShard(ctx, defaultCollectionID-1, defaultDMLChannel, defaultReplicaID, clusterService,
		hist, stream, local, remote, false)
	assert.Error(t, err)

	// Nil local chunk manager.
	_, err = newQueryShard(ctx, defaultCollectionID, defaultDMLChannel, defaultReplicaID, clusterService,
		hist, stream, nil, remote, false)
	assert.Error(t, err)

	// Nil remote chunk manager.
	_, err = newQueryShard(ctx, defaultCollectionID, defaultDMLChannel, defaultReplicaID, clusterService,
		hist, stream, local, nil, false)
	assert.Error(t, err)
}
func TestQueryShard_Close(t *testing.T) {
qs, err := genSimpleQueryShard(context.Background())
assert.NoError(t, err)