Fix collection nil when add query collection (#7311)

Signed-off-by: godchen <qingxiang.chen@zilliz.com>
This commit is contained in:
godchen 2021-08-27 17:51:56 +08:00 committed by GitHub
parent 70ea73fab5
commit 2dc2cb1a28
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 37 additions and 23 deletions

View File

@ -27,6 +27,7 @@ import (
"github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/msgstream" "github.com/milvus-io/milvus/internal/msgstream"
"github.com/milvus-io/milvus/internal/proto/commonpb" "github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/etcdpb"
"github.com/milvus-io/milvus/internal/proto/internalpb" "github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/milvuspb" "github.com/milvus-io/milvus/internal/proto/milvuspb"
"github.com/milvus-io/milvus/internal/proto/schemapb" "github.com/milvus-io/milvus/internal/proto/schemapb"
@ -65,7 +66,10 @@ type queryCollection struct {
queryMsgStream msgstream.MsgStream queryMsgStream msgstream.MsgStream
queryResultMsgStream msgstream.MsgStream queryResultMsgStream msgstream.MsgStream
vcm storage.ChunkManager localChunkManager storage.ChunkManager
remoteChunkManager storage.ChunkManager
vectorChunkManager storage.ChunkManager
localCacheEnabled bool
} }
type ResultEntityIds []UniqueID type ResultEntityIds []UniqueID
@ -76,7 +80,9 @@ func newQueryCollection(releaseCtx context.Context,
historical *historical, historical *historical,
streaming *streaming, streaming *streaming,
factory msgstream.Factory, factory msgstream.Factory,
vcm storage.ChunkManager, localChunkManager storage.ChunkManager,
remoteChunkManager storage.ChunkManager,
localCacheEnabled bool,
) *queryCollection { ) *queryCollection {
unsolvedMsg := make([]queryMsg, 0) unsolvedMsg := make([]queryMsg, 0)
@ -102,7 +108,9 @@ func newQueryCollection(releaseCtx context.Context,
queryMsgStream: queryStream, queryMsgStream: queryStream,
queryResultMsgStream: queryResultStream, queryResultMsgStream: queryResultStream,
vcm: vcm, localChunkManager: localChunkManager,
remoteChunkManager: remoteChunkManager,
localCacheEnabled: localCacheEnabled,
} }
qc.register() qc.register()
@ -1059,8 +1067,21 @@ func (q *queryCollection) retrieve(msg queryMsg) error {
var mergeList []*segcorepb.RetrieveResults var mergeList []*segcorepb.RetrieveResults
if q.vectorChunkManager == nil {
if q.localChunkManager == nil {
return fmt.Errorf("can not create vector chunk manager for local chunk manager is nil")
}
if q.remoteChunkManager == nil {
return fmt.Errorf("can not create vector chunk manager for remote chunk manager is nil")
}
q.vectorChunkManager = storage.NewVectorChunkManager(q.localChunkManager, q.remoteChunkManager,
&etcdpb.CollectionMeta{
ID: collection.id,
Schema: collection.schema,
}, q.localCacheEnabled)
}
// historical retrieve // historical retrieve
hisRetrieveResults, sealedSegmentRetrieved, err1 := q.historical.retrieve(collectionID, retrieveMsg.PartitionIDs, q.vcm, plan) hisRetrieveResults, sealedSegmentRetrieved, err1 := q.historical.retrieve(collectionID, retrieveMsg.PartitionIDs, q.vectorChunkManager, plan)
if err1 != nil { if err1 != nil {
log.Warn(err1.Error()) log.Warn(err1.Error())
return err1 return err1

View File

@ -62,7 +62,7 @@ func TestQueryCollection_withoutVChannel(t *testing.T) {
assert.Nil(t, err) assert.Nil(t, err)
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
queryCollection := newQueryCollection(ctx, cancel, 0, historical, streaming, factory, nil) queryCollection := newQueryCollection(ctx, cancel, 0, historical, streaming, factory, nil, nil, false)
producerChannels := []string{"testResultChannel"} producerChannels := []string{"testResultChannel"}
queryCollection.queryResultMsgStream.AsProducer(producerChannels) queryCollection.queryResultMsgStream.AsProducer(producerChannels)

View File

@ -21,7 +21,6 @@ import (
miniokv "github.com/milvus-io/milvus/internal/kv/minio" miniokv "github.com/milvus-io/milvus/internal/kv/minio"
"github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/msgstream" "github.com/milvus-io/milvus/internal/msgstream"
"github.com/milvus-io/milvus/internal/proto/etcdpb"
"github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/internal/storage"
) )
@ -36,9 +35,9 @@ type queryService struct {
factory msgstream.Factory factory msgstream.Factory
lcm storage.ChunkManager localChunkManager storage.ChunkManager
rcm storage.ChunkManager remoteChunkManager storage.ChunkManager
localCacheEnabled bool localCacheEnabled bool
} }
func newQueryService(ctx context.Context, func newQueryService(ctx context.Context,
@ -56,7 +55,7 @@ func newQueryService(ctx context.Context,
enabled, _ := Params.Load("localStorage.enabled") enabled, _ := Params.Load("localStorage.enabled")
localCacheEnabled, _ := strconv.ParseBool(enabled) localCacheEnabled, _ := strconv.ParseBool(enabled)
lcm := storage.NewLocalChunkManager(path) localChunkManager := storage.NewLocalChunkManager(path)
option := &miniokv.Option{ option := &miniokv.Option{
Address: Params.MinioEndPoint, Address: Params.MinioEndPoint,
@ -71,7 +70,7 @@ func newQueryService(ctx context.Context,
if err != nil { if err != nil {
panic(err) panic(err)
} }
rcm := storage.NewMinioChunkManager(client) remoteChunkManager := storage.NewMinioChunkManager(client)
return &queryService{ return &queryService{
ctx: queryServiceCtx, ctx: queryServiceCtx,
@ -84,9 +83,9 @@ func newQueryService(ctx context.Context,
factory: factory, factory: factory,
lcm: lcm, localChunkManager: localChunkManager,
rcm: rcm, remoteChunkManager: remoteChunkManager,
localCacheEnabled: localCacheEnabled, localCacheEnabled: localCacheEnabled,
} }
} }
@ -104,14 +103,6 @@ func (q *queryService) addQueryCollection(collectionID UniqueID) {
log.Warn("query collection already exists", zap.Any("collectionID", collectionID)) log.Warn("query collection already exists", zap.Any("collectionID", collectionID))
return return
} }
collection, _ := q.historical.replica.getCollectionByID(collectionID)
vcm := storage.NewVectorChunkManager(q.lcm, q.rcm,
&etcdpb.CollectionMeta{
ID: collection.id,
Schema: collection.schema,
}, q.localCacheEnabled)
ctx1, cancel := context.WithCancel(q.ctx) ctx1, cancel := context.WithCancel(q.ctx)
qc := newQueryCollection(ctx1, qc := newQueryCollection(ctx1,
cancel, cancel,
@ -119,7 +110,9 @@ func (q *queryService) addQueryCollection(collectionID UniqueID) {
q.historical, q.historical,
q.streaming, q.streaming,
q.factory, q.factory,
vcm, q.localChunkManager,
q.remoteChunkManager,
q.localCacheEnabled,
) )
q.queryCollections[collectionID] = qc q.queryCollections[collectionID] = qc
} }