diff --git a/internal/proxy/grpc_service.go b/internal/proxy/grpc_service.go index ed877cd452..877136a72b 100644 --- a/internal/proxy/grpc_service.go +++ b/internal/proxy/grpc_service.go @@ -121,38 +121,129 @@ func (p *Proxy) Search(ctx context.Context, req *servicepb.Query) (*servicepb.Qu } func (p *Proxy) DropCollection(ctx context.Context, req *servicepb.CollectionName) (*commonpb.Status, error) { - return &commonpb.Status{ - ErrorCode: 0, - Reason: "", - }, nil + dct := &DropCollectionTask{ + DropCollectionRequest: internalpb.DropCollectionRequest{ + MsgType: internalpb.MsgType_kDropCollection, + // TODO: req_id, timestamp, proxy_id + CollectionName: req, + }, + masterClient: p.masterClient, + done: make(chan error), + resultChan: make(chan *commonpb.Status), + } + dct.ctx, dct.cancel = context.WithCancel(ctx) + defer dct.cancel() + + var t task = dct + p.taskSch.DdQueue.Enqueue(&t) + for { + select { + case <-ctx.Done(): + log.Print("create collection timeout!") + return &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, + Reason: "create collection timeout!", + }, errors.New("create collection timeout!") + case result := <-dct.resultChan: + return result, nil + } + } } func (p *Proxy) HasCollection(ctx context.Context, req *servicepb.CollectionName) (*servicepb.BoolResponse, error) { - return &servicepb.BoolResponse{ - Status: &commonpb.Status{ - ErrorCode: 0, - Reason: "", + hct := &HasCollectionTask{ + HasCollectionRequest: internalpb.HasCollectionRequest{ + MsgType: internalpb.MsgType_kHasCollection, + // TODO: req_id, timestamp, proxy_id + CollectionName: req, }, - Value: true, - }, nil + masterClient: p.masterClient, + done: make(chan error), + resultChan: make(chan *servicepb.BoolResponse), + } + hct.ctx, hct.cancel = context.WithCancel(ctx) + defer hct.cancel() + + var t task = hct + p.taskSch.DqQueue.Enqueue(&t) + for { + select { + case <-ctx.Done(): + log.Print("has collection timeout!") + return &servicepb.BoolResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, + Reason: "has collection timeout!", + }, + Value: false, + }, errors.New("has collection timeout!") + case result := <-hct.resultChan: + return result, nil + } + } } func (p *Proxy) DescribeCollection(ctx context.Context, req *servicepb.CollectionName) (*servicepb.CollectionDescription, error) { - return &servicepb.CollectionDescription{ - Status: &commonpb.Status{ - ErrorCode: 0, - Reason: "", + dct := &DescribeCollectionTask{ + DescribeCollectionRequest: internalpb.DescribeCollectionRequest{ + MsgType: internalpb.MsgType_kDescribeCollection, + // TODO: req_id, timestamp, proxy_id + CollectionName: req, }, - }, nil + masterClient: p.masterClient, + done: make(chan error), + resultChan: make(chan *servicepb.CollectionDescription), + } + dct.ctx, dct.cancel = context.WithCancel(ctx) + defer dct.cancel() + + var t task = dct + p.taskSch.DqQueue.Enqueue(&t) + for { + select { + case <-ctx.Done(): + log.Print("has collection timeout!") + return &servicepb.CollectionDescription{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, + Reason: "describe collection timeout!", + }, + }, errors.New("describe collection timeout!") + case result := <-dct.resultChan: + return result, nil + } + } } func (p *Proxy) ShowCollections(ctx context.Context, req *commonpb.Empty) (*servicepb.StringListResponse, error) { - return &servicepb.StringListResponse{ - Status: &commonpb.Status{ - ErrorCode: 0, - Reason: "", + sct := &ShowCollectionsTask{ + ShowCollectionRequest: internalpb.ShowCollectionRequest{ + MsgType: internalpb.MsgType_kDescribeCollection, + // TODO: req_id, timestamp, proxy_id }, - }, nil + masterClient: p.masterClient, + done: make(chan error), + resultChan: make(chan *servicepb.StringListResponse), + } + sct.ctx, sct.cancel = context.WithCancel(ctx) + defer sct.cancel() + + var t task = sct + p.taskSch.DqQueue.Enqueue(&t) + for { + select { + case <-ctx.Done(): + log.Print("show collections timeout!") + return &servicepb.StringListResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, + Reason: "show collections timeout!", + }, + }, errors.New("show collections timeout!") + case result := <-sct.resultChan: + return result, nil + } + } } func (p *Proxy) CreatePartition(ctx context.Context, in *servicepb.PartitionName) (*commonpb.Status, error) { diff --git a/internal/proxy/task.go b/internal/proxy/task.go index 40df3a0627..8c6339abcd 100644 --- a/internal/proxy/task.go +++ b/internal/proxy/task.go @@ -162,6 +162,74 @@ func (cct *CreateCollectionTask) Notify(err error) { cct.done <- err } +type DropCollectionTask struct { + internalpb.DropCollectionRequest + masterClient masterpb.MasterClient + done chan error + resultChan chan *commonpb.Status + ctx context.Context + cancel context.CancelFunc +} + +func (dct *DropCollectionTask) Id() UniqueID { + return dct.ReqId +} + +func (dct *DropCollectionTask) Type() internalpb.MsgType { + return dct.MsgType +} + +func (dct *DropCollectionTask) BeginTs() Timestamp { + return dct.Timestamp +} + +func (dct *DropCollectionTask) EndTs() Timestamp { + return dct.Timestamp +} + +func (dct *DropCollectionTask) SetTs(ts Timestamp) { + dct.Timestamp = ts +} + +func (dct *DropCollectionTask) PreExecute() error { + return nil +} + +func (dct *DropCollectionTask) Execute() error { + resp, err := dct.masterClient.DropCollection(dct.ctx, &dct.DropCollectionRequest) + if err != nil { + log.Printf("drop collection failed, error= %v", err) + dct.resultChan <- &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, + Reason: err.Error(), + } + } else { + dct.resultChan <- resp + } + return err +} + +func (dct *DropCollectionTask) PostExecute() error { + return nil +} + +func (dct *DropCollectionTask) WaitToFinish() error { + defer dct.cancel() + for { + select { + case err := <-dct.done: + return err + case <-dct.ctx.Done(): + log.Print("wait to finish failed, timeout!") + return errors.New("wait to finish failed, timeout!") + } + } +} + +func (dct *DropCollectionTask) Notify(err error) { + dct.done <- err +} + type QueryTask struct { internalpb.SearchRequest queryMsgStream *msgstream.PulsarMsgStream @@ -290,3 +358,214 @@ func (qt *QueryTask) Notify(err error) { } } } + +type HasCollectionTask struct { + internalpb.HasCollectionRequest + masterClient masterpb.MasterClient + done chan error + resultChan chan *servicepb.BoolResponse + ctx context.Context + cancel context.CancelFunc +} + +func (hct *HasCollectionTask) Id() UniqueID { + return hct.ReqId +} + +func (hct *HasCollectionTask) Type() internalpb.MsgType { + return hct.MsgType +} + +func (hct *HasCollectionTask) BeginTs() Timestamp { + return hct.Timestamp +} + +func (hct *HasCollectionTask) EndTs() Timestamp { + return hct.Timestamp +} + +func (hct *HasCollectionTask) SetTs(ts Timestamp) { + hct.Timestamp = ts +} + +func (hct *HasCollectionTask) PreExecute() error { + return nil +} + +func (hct *HasCollectionTask) Execute() error { + resp, err := hct.masterClient.HasCollection(hct.ctx, &hct.HasCollectionRequest) + if err != nil { + log.Printf("has collection failed, error= %v", err) + hct.resultChan <- &servicepb.BoolResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, + Reason: "internal error", + }, + Value: false, + } + } else { + hct.resultChan <- resp + } + return err +} + +func (hct *HasCollectionTask) PostExecute() error { + return nil +} + +func (hct *HasCollectionTask) WaitToFinish() error { + defer hct.cancel() + for { + select { + case err := <-hct.done: + return err + case <-hct.ctx.Done(): + log.Print("wait to finish failed, timeout!") + return errors.New("wait to finish failed, timeout!") + } + } +} + +func (hct *HasCollectionTask) Notify(err error) { + hct.done <- err +} + +type DescribeCollectionTask struct { + internalpb.DescribeCollectionRequest + masterClient masterpb.MasterClient + done chan error + resultChan chan *servicepb.CollectionDescription + ctx context.Context + cancel context.CancelFunc +} + +func (dct *DescribeCollectionTask) Id() UniqueID { + return dct.ReqId +} + +func (dct *DescribeCollectionTask) Type() internalpb.MsgType { + return dct.MsgType +} + +func (dct *DescribeCollectionTask) BeginTs() Timestamp { + return dct.Timestamp +} + +func (dct *DescribeCollectionTask) EndTs() Timestamp { + return dct.Timestamp +} + +func (dct *DescribeCollectionTask) SetTs(ts Timestamp) { + dct.Timestamp = ts +} + +func (dct *DescribeCollectionTask) PreExecute() error { + return nil +} + +func (dct *DescribeCollectionTask) Execute() error { + resp, err := dct.masterClient.DescribeCollection(dct.ctx, &dct.DescribeCollectionRequest) + if err != nil { + log.Printf("describe collection failed, error= %v", err) + dct.resultChan <- &servicepb.CollectionDescription{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, + Reason: "internal error", + }, + } + } else { + dct.resultChan <- resp + } + return err +} + +func (dct *DescribeCollectionTask) PostExecute() error { + return nil +} + +func (dct *DescribeCollectionTask) WaitToFinish() error { + defer dct.cancel() + for { + select { + case err := <-dct.done: + return err + case <-dct.ctx.Done(): + log.Print("wait to finish failed, timeout!") + return errors.New("wait to finish failed, timeout!") + } + } +} + +func (dct *DescribeCollectionTask) Notify(err error) { + dct.done <- err +} + +type ShowCollectionsTask struct { + internalpb.ShowCollectionRequest + masterClient masterpb.MasterClient + done chan error + resultChan chan *servicepb.StringListResponse + ctx context.Context + cancel context.CancelFunc +} + +func (sct *ShowCollectionsTask) Id() UniqueID { + return sct.ReqId +} + +func (sct *ShowCollectionsTask) Type() internalpb.MsgType { + return sct.MsgType +} + +func (sct *ShowCollectionsTask) BeginTs() Timestamp { + return sct.Timestamp +} + +func (sct *ShowCollectionsTask) EndTs() Timestamp { + return sct.Timestamp +} + +func (sct *ShowCollectionsTask) SetTs(ts Timestamp) { + sct.Timestamp = ts +} + +func (sct *ShowCollectionsTask) PreExecute() error { + return nil +} + +func (sct *ShowCollectionsTask) Execute() error { + resp, err := sct.masterClient.ShowCollections(sct.ctx, &sct.ShowCollectionRequest) + if err != nil { + log.Printf("show collections failed, error= %v", err) + sct.resultChan <- &servicepb.StringListResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, + Reason: "internal error", + }, + } + } else { + sct.resultChan <- resp + } + return err +} + +func (sct *ShowCollectionsTask) PostExecute() error { + return nil +} + +func (sct *ShowCollectionsTask) WaitToFinish() error { + defer sct.cancel() + for { + select { + case err := <-sct.done: + return err + case <-sct.ctx.Done(): + log.Print("wait to finish failed, timeout!") + return errors.New("wait to finish failed, timeout!") + } + } +} + +func (sct *ShowCollectionsTask) Notify(err error) { + sct.done <- err +}