diff --git a/go.mod b/go.mod index f8cb3e468e..8da99987a9 100644 --- a/go.mod +++ b/go.mod @@ -26,7 +26,7 @@ require ( github.com/pingcap/log v0.0.0-20200828042413-fce0951f1463 // indirect github.com/pivotal-golang/bytefmt v0.0.0-20200131002437-cf55d5288a48 github.com/prometheus/client_golang v1.5.1 // indirect - github.com/prometheus/common v0.10.0 // indirect + github.com/prometheus/common v0.10.0 github.com/prometheus/procfs v0.1.3 // indirect github.com/sirupsen/logrus v1.6.0 // indirect github.com/spaolacci/murmur3 v1.1.0 diff --git a/go.sum b/go.sum index 249aa05523..3f443fa53e 100644 --- a/go.sum +++ b/go.sum @@ -17,8 +17,10 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03 github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= +github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 h1:JYp7IbQjafoB+tBA3gMyHYHrpOtNuDiK/uB5uXxq5wM= github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= +github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4 h1:Hs82Z41s6SdL1CELW+XaDYmOH4hkBN4/N9og/AsOv7E= github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= github.com/apache/pulsar-client-go v0.1.1 h1:v/kU+2ZCC6yFIcbZrFtWa9/nvVzVr18L+xYJUvZSxEQ= github.com/apache/pulsar-client-go v0.1.1/go.mod h1:mlxC65KL1BLhGO2bnT9zWMttVzR2czVPb27D477YpyU= @@ -564,6 +566,7 @@ google.golang.org/genproto v0.0.0-20200122232147-0452cf42e150 h1:VPpdpQkGvFicX9y google.golang.org/genproto v0.0.0-20200122232147-0452cf42e150/go.mod h1:n3cpQtvxv34hfy77yVDNjmbRyujviMdxYliBSkLhpCc= google.golang.org/grpc v1.26.0 h1:2dTRdpdFEEhJYQD8EMLB61nnrzSCTbG38PhqdhvOltg= google.golang.org/grpc v1.26.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk= +gopkg.in/alecthomas/kingpin.v2 v2.2.6 h1:jMFz6MfLP0/4fUyZle81rXUoxOBFi19VUFKVDOQfozc= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/internal/allocator/allocator.go b/internal/allocator/allocator.go index c17606a64d..4232e20cba 100644 --- a/internal/allocator/allocator.go +++ b/internal/allocator/allocator.go @@ -13,7 +13,7 @@ import ( ) const ( - maxMergeRequests = 10000 + maxConcurrentRequests = 10000 ) type request interface { @@ -46,18 +46,23 @@ type idRequest struct { count uint32 } -func (req *idRequest) Wait() { - req.baseRequest.Wait() -} - type tsoRequest struct { baseRequest timestamp Timestamp count uint32 } -func (req *tsoRequest) Wait() { - req.baseRequest.Wait() +type segRequest struct { + baseRequest + count uint32 + colName string + partition string + segID UniqueID + channelID int32 +} + +type syncRequest struct { + baseRequest } type tickerChan interface { @@ -117,9 +122,16 @@ type Allocator struct { masterClient masterpb.MasterClient countPerRPC uint32 - tChan tickerChan + toDoReqs []request + + syncReqs []request + + tChan tickerChan + forceSyncChan chan request + syncFunc func() - processFunc func(req request) + processFunc func(req request) error + checkFunc func(timeout bool) bool } func (ta *Allocator) Start() error { @@ -148,25 +160,40 @@ func (ta *Allocator) connectMaster() error { return nil } +func (ta *Allocator) init() { + ta.forceSyncChan = make(chan request, maxConcurrentRequests) +} + func (ta *Allocator) mainLoop() { defer ta.wg.Done() loopCtx, loopCancel := context.WithCancel(ta.ctx) defer loopCancel() - defaultSize := maxMergeRequests + 1 - reqs := make([]request, defaultSize) for { select { - case <-ta.tChan.Chan(): - ta.sync() - case first := <-ta.reqs: - pendingPlus1 := len(ta.reqs) + 1 - reqs[0] = first - for i := 1; i < pendingPlus1; i++ { - reqs[i] = <-ta.reqs + + case first := <-ta.forceSyncChan: + ta.syncReqs = append(ta.syncReqs, first) + pending := len(ta.forceSyncChan) + for i := 0; i < pending; i++ { + ta.syncReqs = append(ta.syncReqs, <-ta.forceSyncChan) } - ta.finishRequest(reqs[:pendingPlus1]) + ta.sync(true) + ta.finishSyncRequest() + + case <-ta.tChan.Chan(): + ta.sync(true) + + case first := <-ta.reqs: + ta.toDoReqs = append(ta.toDoReqs, first) + pending := len(ta.reqs) + for i := 0; i < pending; i++ { + ta.toDoReqs = append(ta.toDoReqs, <-ta.reqs) + } + ta.sync(false) + + ta.finishRequest() case <-loopCtx.Done(): return @@ -175,21 +202,39 @@ func (ta *Allocator) mainLoop() { } } -func (ta *Allocator) sync() { - if ta.syncFunc != nil { - ta.syncFunc() - ta.tChan.Reset() - fmt.Println("synced") +func (ta *Allocator) sync(timeout bool) { + if ta.syncFunc == nil { + return } + if ta.checkFunc == nil || !ta.checkFunc(timeout) { + return + } + + ta.syncFunc() + + if !timeout { + ta.tChan.Reset() + } + fmt.Println("synced") } -func (ta *Allocator) finishRequest(reqs []request) { - for i := 0; i < len(reqs); i++ { - ta.processFunc(reqs[i]) - if reqs[i] != nil { - reqs[i].Notify(nil) +func (ta *Allocator) finishSyncRequest() { + for _, req := range ta.syncReqs { + if req != nil { + req.Notify(nil) } } + ta.syncReqs = ta.syncReqs[0:0] +} + +func (ta *Allocator) finishRequest() { + for _, req := range ta.toDoReqs { + if req != nil { + err := ta.processFunc(req) + req.Notify(err) + } + } + ta.toDoReqs = ta.toDoReqs[0:0] } func (ta *Allocator) revokeRequest(err error) { @@ -206,3 +251,9 @@ func (ta *Allocator) Close() { ta.tChan.Close() ta.revokeRequest(errors.New("closing")) } + +func (ta *Allocator) CleanCache() { + req := &syncRequest{baseRequest: baseRequest{done: make(chan error), valid: false}} + ta.forceSyncChan <- req + req.Wait() +} diff --git a/internal/allocator/id_allocator.go b/internal/allocator/id.go similarity index 59% rename from internal/allocator/id_allocator.go rename to internal/allocator/id.go index e52f7721d6..742fc60658 100644 --- a/internal/allocator/id_allocator.go +++ b/internal/allocator/id.go @@ -10,6 +10,10 @@ import ( "github.com/zilliztech/milvus-distributed/internal/util/typeutil" ) +const ( + IDCountPerRPC = 200000 +) + type UniqueID = typeutil.UniqueID type IDAllocator struct { @@ -23,58 +27,73 @@ func NewIDAllocator(ctx context.Context, masterAddr string) (*IDAllocator, error ctx1, cancel := context.WithCancel(ctx) a := &IDAllocator{ - Allocator: Allocator{reqs: make(chan request, maxMergeRequests), + Allocator: Allocator{reqs: make(chan request, maxConcurrentRequests), ctx: ctx1, cancel: cancel, masterAddress: masterAddr, - countPerRPC: maxMergeRequests, + countPerRPC: IDCountPerRPC, }, } a.tChan = &emptyTicker{} a.Allocator.syncFunc = a.syncID a.Allocator.processFunc = a.processFunc + a.Allocator.checkFunc = a.checkFunc + a.init() return a, nil } -func (ta *IDAllocator) syncID() { +func (ia *IDAllocator) syncID() { fmt.Println("syncID") ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) req := &internalpb.IDRequest{ PeerID: 1, Role: internalpb.PeerRole_Proxy, - Count: ta.countPerRPC, + Count: ia.countPerRPC, } - resp, err := ta.masterClient.AllocID(ctx, req) + resp, err := ia.masterClient.AllocID(ctx, req) cancel() if err != nil { log.Panic("syncID Failed!!!!!") return } - ta.idStart = resp.GetID() - ta.idEnd = ta.idStart + int64(resp.GetCount()) - + ia.idStart = resp.GetID() + ia.idEnd = ia.idStart + int64(resp.GetCount()) } -func (ta *IDAllocator) processFunc(req request) { +func (ia *IDAllocator) checkFunc(timeout bool) bool { + if timeout { + return timeout + } + need := uint32(0) + for _, req := range ia.toDoReqs { + iReq := req.(*idRequest) + need += iReq.count + } + return ia.idStart+int64(need) >= ia.idEnd +} + +func (ia *IDAllocator) processFunc(req request) error { idRequest := req.(*idRequest) - idRequest.id = 1 + idRequest.id = ia.idStart + ia.idStart++ fmt.Println("process ID") + return nil } -func (ta *IDAllocator) AllocOne() (UniqueID, error) { - ret, _, err := ta.Alloc(1) +func (ia *IDAllocator) AllocOne() (UniqueID, error) { + ret, _, err := ia.Alloc(1) if err != nil { return 0, err } return ret, nil } -func (ta *IDAllocator) Alloc(count uint32) (UniqueID, UniqueID, error) { +func (ia *IDAllocator) Alloc(count uint32) (UniqueID, UniqueID, error) { req := &idRequest{baseRequest: baseRequest{done: make(chan error), valid: false}} req.count = count - ta.reqs <- req + ia.reqs <- req req.Wait() if !req.IsValid() { diff --git a/internal/allocator/segment.go b/internal/allocator/segment.go new file mode 100644 index 0000000000..11aefe2736 --- /dev/null +++ b/internal/allocator/segment.go @@ -0,0 +1,205 @@ +package allocator + +import ( + "context" + "fmt" + "log" + "time" + + "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" + + "github.com/zilliztech/milvus-distributed/internal/errors" + "github.com/zilliztech/milvus-distributed/internal/proto/internalpb" +) + +const ( + SegCountPerRPC = 20000 + ActiveTimeDuration = 100 // Second +) + +type assignInfo struct { + internalpb.SegIDAssignment + expireTime time.Time + lastInsertTime time.Time +} + +func (info *assignInfo) IsExpired(now time.Time) bool { + return now.Sub(info.expireTime) >= 0 +} + +func (info *assignInfo) IsActive(now time.Time) bool { + return now.Sub(info.lastInsertTime) <= ActiveTimeDuration*time.Second +} + +func (info *assignInfo) IsEnough(count uint32) bool { + return info.Count >= count +} + +type SegIDAssigner struct { + Allocator + assignInfos map[string][]*assignInfo // collectionName -> [] *assignInfo + segReqs []*internalpb.SegIDRequest + canDoReqs []request +} + +func NewSegIDAssigner(ctx context.Context, masterAddr string) (*SegIDAssigner, error) { + ctx1, cancel := context.WithCancel(ctx) + sa := &SegIDAssigner{ + Allocator: Allocator{reqs: make(chan request, maxConcurrentRequests), + ctx: ctx1, + cancel: cancel, + masterAddress: masterAddr, + countPerRPC: SegCountPerRPC, + //toDoReqs: []request, + }, + assignInfos: make(map[string][]*assignInfo), + //segReqs: make([]*internalpb.SegIDRequest, maxConcurrentRequests), + //canDoReqs: make([]request, maxConcurrentRequests), + } + sa.tChan = &ticker{ + updateInterval: time.Second, + } + sa.Allocator.syncFunc = sa.syncSegments + sa.Allocator.processFunc = sa.processFunc + sa.Allocator.checkFunc = sa.checkFunc + return sa, nil +} + +func (sa *SegIDAssigner) collectExpired() { + now := time.Now() + for _, colInfos := range sa.assignInfos { + for _, assign := range colInfos { + if !assign.IsActive(now) || !assign.IsExpired(now) { + continue + } + sa.segReqs = append(sa.segReqs, &internalpb.SegIDRequest{ + ChannelID: assign.ChannelID, + Count: sa.countPerRPC, + CollName: assign.CollName, + PartitionTag: assign.PartitionTag, + }) + } + } +} + +func (sa *SegIDAssigner) checkToDoReqs() { + if sa.toDoReqs == nil { + return + } + now := time.Now() + for _, req := range sa.toDoReqs { + fmt.Println("DDDDD????", req) + segRequest := req.(*segRequest) + assign := sa.getAssign(segRequest.colName, segRequest.partition, segRequest.channelID) + if assign == nil || assign.IsExpired(now) || !assign.IsEnough(segRequest.count) { + sa.segReqs = append(sa.segReqs, &internalpb.SegIDRequest{ + ChannelID: segRequest.channelID, + Count: segRequest.count, + CollName: segRequest.colName, + PartitionTag: segRequest.partition, + }) + } + } +} + +func (sa *SegIDAssigner) getAssign(colName, partition string, channelID int32) *assignInfo { + colInfos, ok := sa.assignInfos[colName] + if !ok { + return nil + } + for _, info := range colInfos { + if info.PartitionTag != partition || info.ChannelID != channelID { + continue + } + return info + } + return nil +} + +func (sa *SegIDAssigner) checkFunc(timeout bool) bool { + if timeout { + sa.collectExpired() + } else { + sa.checkToDoReqs() + } + + return len(sa.segReqs) != 0 +} + +func (sa *SegIDAssigner) syncSegments() { + if len(sa.segReqs) == 0 { + return + } + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + req := &internalpb.AssignSegIDRequest{ + PeerID: 1, + Role: internalpb.PeerRole_Proxy, + PerChannelReq: sa.segReqs, + } + + sa.segReqs = sa.segReqs[0:0] + fmt.Println("OOOOO", req.PerChannelReq) + resp, err := sa.masterClient.AssignSegmentID(ctx, req) + log.Printf("resp: %v", resp) + + if resp.Status.GetErrorCode() != commonpb.ErrorCode_SUCCESS { + log.Panic("GRPC AssignSegmentID Failed") + return + } + + now := time.Now() + expiredTime := now.Add(time.Millisecond * time.Duration(resp.ExpireDuration)) + for _, info := range resp.PerChannelAssignment { + assign := sa.getAssign(info.CollName, info.PartitionTag, info.ChannelID) + if assign == nil { + colInfos := sa.assignInfos[info.CollName] + newAssign := &assignInfo{ + SegIDAssignment: *info, + expireTime: expiredTime, + lastInsertTime: now, + } + colInfos = append(colInfos, newAssign) + sa.assignInfos[info.CollName] = colInfos + } else { + assign.SegIDAssignment = *info + assign.expireTime = expiredTime + assign.lastInsertTime = now + } + } + + if err != nil { + log.Panic("syncID Failed!!!!!") + return + } +} + +func (sa *SegIDAssigner) processFunc(req request) error { + segRequest := req.(*segRequest) + assign := sa.getAssign(segRequest.colName, segRequest.partition, segRequest.channelID) + if assign == nil { + return errors.New("Failed to GetSegmentID") + } + segRequest.segID = assign.SegID + assign.Count -= segRequest.count + fmt.Println("process segmentID") + return nil +} + +func (sa *SegIDAssigner) GetSegmentID(colName, partition string, channelID int32, count uint32) (UniqueID, error) { + req := &segRequest{ + baseRequest: baseRequest{done: make(chan error), valid: false}, + colName: colName, + partition: partition, + channelID: channelID, + count: count, + } + sa.reqs <- req + req.Wait() + + if !req.IsValid() { + return 0, errors.New("GetSegmentID Failed") + } + return req.segID, nil +} diff --git a/internal/allocator/timestamp_allocator.go b/internal/allocator/timestamp.go similarity index 79% rename from internal/allocator/timestamp_allocator.go rename to internal/allocator/timestamp.go index 033f085431..6421ad71b3 100644 --- a/internal/allocator/timestamp_allocator.go +++ b/internal/allocator/timestamp.go @@ -13,8 +13,7 @@ import ( type Timestamp = typeutil.Timestamp const ( - tsCountPerRPC = 2 << 18 * 10 - defaultUpdateInterval = 1000 * time.Millisecond + tsCountPerRPC = 2 << 18 * 10 ) type TimestampAllocator struct { @@ -26,11 +25,11 @@ type TimestampAllocator struct { func NewTimestampAllocator(ctx context.Context, masterAddr string) (*TimestampAllocator, error) { ctx1, cancel := context.WithCancel(ctx) a := &TimestampAllocator{ - Allocator: Allocator{reqs: make(chan request, maxMergeRequests), + Allocator: Allocator{reqs: make(chan request, maxConcurrentRequests), ctx: ctx1, cancel: cancel, masterAddress: masterAddr, - countPerRPC: maxMergeRequests, + countPerRPC: tsCountPerRPC, }, } a.tChan = &ticker{ @@ -41,6 +40,18 @@ func NewTimestampAllocator(ctx context.Context, masterAddr string) (*TimestampAl return a, nil } +func (ta *TimestampAllocator) checkFunc(timeout bool) bool { + if timeout { + return true + } + need := uint32(0) + for _, req := range ta.toDoReqs { + iReq := req.(*tsoRequest) + need += iReq.count + } + return ta.lastTsBegin+Timestamp(need) >= ta.lastTsEnd +} + func (ta *TimestampAllocator) syncTs() { fmt.Println("sync TS") ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) @@ -61,15 +72,12 @@ func (ta *TimestampAllocator) syncTs() { ta.lastTsEnd = ta.lastTsBegin + uint64(resp.GetCount()) } -func (ta *TimestampAllocator) processFunc(req request) { - if req == nil { - fmt.Println("Occur nil!!!!") - return - } +func (ta *TimestampAllocator) processFunc(req request) error { tsoRequest := req.(*tsoRequest) tsoRequest.timestamp = ta.lastTsBegin ta.lastTsBegin++ fmt.Println("process tso") + return nil } func (ta *TimestampAllocator) AllocOne() (Timestamp, error) { @@ -81,7 +89,6 @@ func (ta *TimestampAllocator) AllocOne() (Timestamp, error) { } func (ta *TimestampAllocator) Alloc(count uint32) ([]Timestamp, error) { - //req := tsoReqPool.Get().(*tsoRequest) req := &tsoRequest{ baseRequest: baseRequest{done: make(chan error), valid: false}, } @@ -101,3 +108,7 @@ func (ta *TimestampAllocator) Alloc(count uint32) ([]Timestamp, error) { } return ret, nil } + +func (ta *TimestampAllocator) ClearCache() { + +} diff --git a/internal/core/src/pb/common.pb.cc b/internal/core/src/pb/common.pb.cc index 5c53a82975..7f6f5ce4b8 100644 --- a/internal/core/src/pb/common.pb.cc +++ b/internal/core/src/pb/common.pb.cc @@ -171,7 +171,7 @@ const char descriptor_table_protodef_common_2eproto[] PROTOBUF_SECTION_VARIABLE( "vus.proto.common.ErrorCode\022\016\n\006reason\030\002 \001" "(\t\"*\n\014KeyValuePair\022\013\n\003key\030\001 \001(\t\022\r\n\005value" "\030\002 \001(\t\"\025\n\004Blob\022\r\n\005value\030\001 \001(\014\"#\n\007Address" - "\022\n\n\002ip\030\001 \001(\t\022\014\n\004port\030\002 \001(\003*\242\004\n\tErrorCode" + "\022\n\n\002ip\030\001 \001(\t\022\014\n\004port\030\002 \001(\003*\270\004\n\tErrorCode" "\022\013\n\007SUCCESS\020\000\022\024\n\020UNEXPECTED_ERROR\020\001\022\022\n\016C" "ONNECT_FAILED\020\002\022\025\n\021PERMISSION_DENIED\020\003\022\031" "\n\025COLLECTION_NOT_EXISTS\020\004\022\024\n\020ILLEGAL_ARG" @@ -185,9 +185,9 @@ const char descriptor_table_protodef_common_2eproto[] PROTOBUF_SECTION_VARIABLE( "\030\n\024CANNOT_DELETE_FOLDER\020\023\022\026\n\022CANNOT_DELE" "TE_FILE\020\024\022\025\n\021BUILD_INDEX_ERROR\020\025\022\021\n\rILLE" "GAL_NLIST\020\026\022\027\n\023ILLEGAL_METRIC_TYPE\020\027\022\021\n\r" - "OUT_OF_MEMORY\020\030BBZ@github.com/zilliztech" - "/milvus-distributed/internal/proto/commo" - "npbb\006proto3" + "OUT_OF_MEMORY\020\030\022\024\n\017DD_REQUEST_RACE\020\350\007BBZ" + "@github.com/zilliztech/milvus-distribute" + "d/internal/proto/commonpbb\006proto3" ; static const ::PROTOBUF_NAMESPACE_ID::internal::DescriptorTable*const descriptor_table_common_2eproto_deps[1] = { }; @@ -201,7 +201,7 @@ static ::PROTOBUF_NAMESPACE_ID::internal::SCCInfoBase*const descriptor_table_com static ::PROTOBUF_NAMESPACE_ID::internal::once_flag descriptor_table_common_2eproto_once; static bool descriptor_table_common_2eproto_initialized = false; const ::PROTOBUF_NAMESPACE_ID::internal::DescriptorTable descriptor_table_common_2eproto = { - &descriptor_table_common_2eproto_initialized, descriptor_table_protodef_common_2eproto, "common.proto", 851, + &descriptor_table_common_2eproto_initialized, descriptor_table_protodef_common_2eproto, "common.proto", 873, &descriptor_table_common_2eproto_once, descriptor_table_common_2eproto_sccs, descriptor_table_common_2eproto_deps, 5, 0, schemas, file_default_instances, TableStruct_common_2eproto::offsets, file_level_metadata_common_2eproto, 5, file_level_enum_descriptors_common_2eproto, file_level_service_descriptors_common_2eproto, @@ -242,6 +242,7 @@ bool ErrorCode_IsValid(int value) { case 22: case 23: case 24: + case 1000: return true; default: return false; diff --git a/internal/core/src/pb/common.pb.h b/internal/core/src/pb/common.pb.h index 5a0c230a75..1b8ffeddb3 100644 --- a/internal/core/src/pb/common.pb.h +++ b/internal/core/src/pb/common.pb.h @@ -112,12 +112,13 @@ enum ErrorCode : int { ILLEGAL_NLIST = 22, ILLEGAL_METRIC_TYPE = 23, OUT_OF_MEMORY = 24, + DD_REQUEST_RACE = 1000, ErrorCode_INT_MIN_SENTINEL_DO_NOT_USE_ = std::numeric_limits<::PROTOBUF_NAMESPACE_ID::int32>::min(), ErrorCode_INT_MAX_SENTINEL_DO_NOT_USE_ = std::numeric_limits<::PROTOBUF_NAMESPACE_ID::int32>::max() }; bool ErrorCode_IsValid(int value); constexpr ErrorCode ErrorCode_MIN = SUCCESS; -constexpr ErrorCode ErrorCode_MAX = OUT_OF_MEMORY; +constexpr ErrorCode ErrorCode_MAX = DD_REQUEST_RACE; constexpr int ErrorCode_ARRAYSIZE = ErrorCode_MAX + 1; const ::PROTOBUF_NAMESPACE_ID::EnumDescriptor* ErrorCode_descriptor(); diff --git a/internal/master/master.go b/internal/master/master.go index 8ccdc7cfa5..5a02affbe0 100644 --- a/internal/master/master.go +++ b/internal/master/master.go @@ -329,7 +329,6 @@ func (s *Master) grpcLoop(grpcPort int64) { if err := s.grpcServer.Serve(lis); err != nil { s.grpcErr <- err } - } func (s *Master) tsLoop() { diff --git a/internal/master/meta_table.go b/internal/master/meta_table.go index 00b26ba2eb..4e4cd06aae 100644 --- a/internal/master/meta_table.go +++ b/internal/master/meta_table.go @@ -222,8 +222,9 @@ func (mt *metaTable) AddCollection(coll *pb.CollectionMeta) error { if len(coll.SegmentIDs) != 0 { return errors.Errorf("segment should be empty when creating collection") } - if len(coll.PartitionTags) != 0 { - return errors.Errorf("segment should be empty when creating collection") + + if len(coll.PartitionTags) == 0 { + coll.PartitionTags = append(coll.PartitionTags, "default") } _, ok := mt.collName2ID[coll.Schema.Name] if ok { diff --git a/internal/master/meta_table_test.go b/internal/master/meta_table_test.go index 9d6c7b65e1..94eb77fe0b 100644 --- a/internal/master/meta_table_test.go +++ b/internal/master/meta_table_test.go @@ -214,7 +214,7 @@ func TestMetaTable_DeletePartition(t *testing.T) { assert.Nil(t, err) afterCollMeta, err := meta.GetCollectionByName("coll1") assert.Nil(t, err) - assert.Equal(t, 2, len(afterCollMeta.PartitionTags)) + assert.Equal(t, 3, len(afterCollMeta.PartitionTags)) assert.Equal(t, 3, len(afterCollMeta.SegmentIDs)) err = meta.DeletePartition(100, "p1") assert.Nil(t, err) @@ -222,7 +222,7 @@ func TestMetaTable_DeletePartition(t *testing.T) { assert.NotNil(t, err) afterCollMeta, err = meta.GetCollectionByName("coll1") assert.Nil(t, err) - assert.Equal(t, 1, len(afterCollMeta.PartitionTags)) + assert.Equal(t, 2, len(afterCollMeta.PartitionTags)) assert.Equal(t, 1, len(afterCollMeta.SegmentIDs)) hasPartition := meta.HasPartition(colMeta.ID, "p1") assert.False(t, hasPartition) diff --git a/internal/master/partition_task_test.go b/internal/master/partition_task_test.go index 015e6b1fbc..e0fbb1be55 100644 --- a/internal/master/partition_task_test.go +++ b/internal/master/partition_task_test.go @@ -206,8 +206,9 @@ func TestMaster_Partition(t *testing.T) { assert.Equal(t, collMeta.Schema.Fields[1].IndexParams[0].Value, "col1_f2_iv1") assert.Equal(t, collMeta.Schema.Fields[1].IndexParams[1].Value, "col1_f2_iv2") - assert.Equal(t, collMeta.PartitionTags[0], "partition1") - assert.Equal(t, collMeta.PartitionTags[1], "partition2") + //assert.Equal(t, collMeta.PartitionTags[0], "partition1") + //assert.Equal(t, collMeta.PartitionTags[1], "partition2") + assert.ElementsMatch(t, []string{"default", "partition1", "partition2"}, collMeta.PartitionTags) showPartitionReq := internalpb.ShowPartitionRequest{ MsgType: internalpb.MsgType_kShowPartitions, @@ -219,7 +220,7 @@ func TestMaster_Partition(t *testing.T) { stringList, err := cli.ShowPartitions(ctx, &showPartitionReq) assert.Nil(t, err) - assert.ElementsMatch(t, []string{"partition1", "partition2"}, stringList.Values) + assert.ElementsMatch(t, []string{"default", "partition1", "partition2"}, stringList.Values) showPartitionReq = internalpb.ShowPartitionRequest{ MsgType: internalpb.MsgType_kShowPartitions, diff --git a/internal/proto/common.proto b/internal/proto/common.proto index b9b272dace..cd28bbb4f2 100644 --- a/internal/proto/common.proto +++ b/internal/proto/common.proto @@ -28,8 +28,12 @@ enum ErrorCode { ILLEGAL_NLIST = 22; ILLEGAL_METRIC_TYPE = 23; OUT_OF_MEMORY = 24; + + // internal error code. + DD_REQUEST_RACE = 1000; } + message Empty{} diff --git a/internal/proto/commonpb/common.pb.go b/internal/proto/commonpb/common.pb.go index 48a09b4707..07dc12e626 100644 --- a/internal/proto/commonpb/common.pb.go +++ b/internal/proto/commonpb/common.pb.go @@ -47,33 +47,36 @@ const ( ErrorCode_ILLEGAL_NLIST ErrorCode = 22 ErrorCode_ILLEGAL_METRIC_TYPE ErrorCode = 23 ErrorCode_OUT_OF_MEMORY ErrorCode = 24 + // internal error code. + ErrorCode_DD_REQUEST_RACE ErrorCode = 1000 ) var ErrorCode_name = map[int32]string{ - 0: "SUCCESS", - 1: "UNEXPECTED_ERROR", - 2: "CONNECT_FAILED", - 3: "PERMISSION_DENIED", - 4: "COLLECTION_NOT_EXISTS", - 5: "ILLEGAL_ARGUMENT", - 7: "ILLEGAL_DIMENSION", - 8: "ILLEGAL_INDEX_TYPE", - 9: "ILLEGAL_COLLECTION_NAME", - 10: "ILLEGAL_TOPK", - 11: "ILLEGAL_ROWRECORD", - 12: "ILLEGAL_VECTOR_ID", - 13: "ILLEGAL_SEARCH_RESULT", - 14: "FILE_NOT_FOUND", - 15: "META_FAILED", - 16: "CACHE_FAILED", - 17: "CANNOT_CREATE_FOLDER", - 18: "CANNOT_CREATE_FILE", - 19: "CANNOT_DELETE_FOLDER", - 20: "CANNOT_DELETE_FILE", - 21: "BUILD_INDEX_ERROR", - 22: "ILLEGAL_NLIST", - 23: "ILLEGAL_METRIC_TYPE", - 24: "OUT_OF_MEMORY", + 0: "SUCCESS", + 1: "UNEXPECTED_ERROR", + 2: "CONNECT_FAILED", + 3: "PERMISSION_DENIED", + 4: "COLLECTION_NOT_EXISTS", + 5: "ILLEGAL_ARGUMENT", + 7: "ILLEGAL_DIMENSION", + 8: "ILLEGAL_INDEX_TYPE", + 9: "ILLEGAL_COLLECTION_NAME", + 10: "ILLEGAL_TOPK", + 11: "ILLEGAL_ROWRECORD", + 12: "ILLEGAL_VECTOR_ID", + 13: "ILLEGAL_SEARCH_RESULT", + 14: "FILE_NOT_FOUND", + 15: "META_FAILED", + 16: "CACHE_FAILED", + 17: "CANNOT_CREATE_FOLDER", + 18: "CANNOT_CREATE_FILE", + 19: "CANNOT_DELETE_FOLDER", + 20: "CANNOT_DELETE_FILE", + 21: "BUILD_INDEX_ERROR", + 22: "ILLEGAL_NLIST", + 23: "ILLEGAL_METRIC_TYPE", + 24: "OUT_OF_MEMORY", + 1000: "DD_REQUEST_RACE", } var ErrorCode_value = map[string]int32{ @@ -101,6 +104,7 @@ var ErrorCode_value = map[string]int32{ "ILLEGAL_NLIST": 22, "ILLEGAL_METRIC_TYPE": 23, "OUT_OF_MEMORY": 24, + "DD_REQUEST_RACE": 1000, } func (x ErrorCode) String() string { @@ -334,42 +338,43 @@ func init() { func init() { proto.RegisterFile("common.proto", fileDescriptor_555bd8c177793206) } var fileDescriptor_555bd8c177793206 = []byte{ - // 577 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x6c, 0x53, 0x51, 0x4f, 0xdb, 0x3c, - 0x14, 0xfd, 0x5a, 0x0a, 0xfd, 0x7a, 0x29, 0xc5, 0x98, 0x02, 0x9d, 0x36, 0x4d, 0xa8, 0x4f, 0x68, - 0x12, 0xad, 0xb4, 0x49, 0x7b, 0x9b, 0xb4, 0xd4, 0xb9, 0x05, 0x0b, 0x27, 0xae, 0x1c, 0x87, 0xc1, - 0x5e, 0xac, 0x96, 0x46, 0x23, 0x5a, 0xdb, 0x54, 0x69, 0x8a, 0x04, 0x3f, 0x67, 0xbf, 0x74, 0x72, - 0xd2, 0x8c, 0x6a, 0xda, 0xdb, 0xbd, 0xe7, 0xf8, 0x1c, 0xfb, 0x9e, 0xe4, 0x42, 0xf3, 0x21, 0x99, - 0xcf, 0x93, 0x45, 0x6f, 0x99, 0x26, 0x59, 0x42, 0x8f, 0xe7, 0xf1, 0xec, 0x69, 0xbd, 0x2a, 0xba, - 0x5e, 0x41, 0x75, 0xeb, 0xb0, 0x8b, 0xf3, 0x65, 0xf6, 0xdc, 0x35, 0xb0, 0x17, 0x64, 0xe3, 0x6c, - 0xbd, 0xa2, 0x5f, 0x00, 0xa2, 0x34, 0x4d, 0x52, 0xf3, 0x90, 0x4c, 0xa3, 0x4e, 0xe5, 0xbc, 0x72, - 0xd1, 0xfa, 0xf8, 0xbe, 0xf7, 0x0f, 0x71, 0x0f, 0xed, 0x31, 0x96, 0x4c, 0x23, 0xd5, 0x88, 0xca, - 0x92, 0x9e, 0xc2, 0x5e, 0x1a, 0x8d, 0x57, 0xc9, 0xa2, 0x53, 0x3d, 0xaf, 0x5c, 0x34, 0xd4, 0xa6, - 0xeb, 0x7e, 0x86, 0xe6, 0x4d, 0xf4, 0x7c, 0x3b, 0x9e, 0xad, 0xa3, 0xd1, 0x38, 0x4e, 0x29, 0x81, - 0x9d, 0x9f, 0xd1, 0x73, 0xee, 0xdf, 0x50, 0xb6, 0xa4, 0x6d, 0xd8, 0x7d, 0xb2, 0xf4, 0x46, 0x58, - 0x34, 0xdd, 0x77, 0x50, 0x1b, 0xcc, 0x92, 0xc9, 0x2b, 0x6b, 0x15, 0xcd, 0x92, 0xbd, 0x84, 0xba, - 0x33, 0x9d, 0xa6, 0xd1, 0x6a, 0x45, 0x5b, 0x50, 0x8d, 0x97, 0x1b, 0xbf, 0x6a, 0xbc, 0xa4, 0x14, - 0x6a, 0xcb, 0x24, 0xcd, 0x72, 0xb7, 0x1d, 0x95, 0xd7, 0x1f, 0x7e, 0xd5, 0xa0, 0xf1, 0xe7, 0xd5, - 0x74, 0x1f, 0xea, 0x41, 0xc8, 0x18, 0x06, 0x01, 0xf9, 0x8f, 0xb6, 0x81, 0x84, 0x3e, 0xde, 0x8d, - 0x90, 0x69, 0x74, 0x0d, 0x2a, 0x25, 0x15, 0xa9, 0x50, 0x0a, 0x2d, 0x26, 0x7d, 0x1f, 0x99, 0x36, - 0x43, 0x87, 0x0b, 0x74, 0x49, 0x95, 0x9e, 0xc0, 0xd1, 0x08, 0x95, 0xc7, 0x83, 0x80, 0x4b, 0xdf, - 0xb8, 0xe8, 0x73, 0x74, 0xc9, 0x0e, 0x7d, 0x03, 0x27, 0x4c, 0x0a, 0x81, 0x4c, 0x5b, 0xd8, 0x97, - 0xda, 0xe0, 0x1d, 0x0f, 0x74, 0x40, 0x6a, 0xd6, 0x9b, 0x0b, 0x81, 0x57, 0x8e, 0x30, 0x8e, 0xba, - 0x0a, 0x3d, 0xf4, 0x35, 0xd9, 0xb5, 0x3e, 0x25, 0xea, 0x72, 0x0f, 0x7d, 0x6b, 0x47, 0xea, 0xf4, - 0x14, 0x68, 0x09, 0x73, 0xdf, 0xc5, 0x3b, 0xa3, 0xef, 0x47, 0x48, 0xfe, 0xa7, 0x6f, 0xe1, 0xac, - 0xc4, 0xb7, 0xef, 0x71, 0x3c, 0x24, 0x0d, 0x4a, 0xa0, 0x59, 0x92, 0x5a, 0x8e, 0x6e, 0x08, 0x6c, - 0xbb, 0x2b, 0xf9, 0x4d, 0x21, 0x93, 0xca, 0x25, 0xfb, 0xdb, 0xf0, 0x2d, 0x32, 0x2d, 0x95, 0xe1, - 0x2e, 0x69, 0xda, 0xc7, 0x97, 0x70, 0x80, 0x8e, 0x62, 0xd7, 0x46, 0x61, 0x10, 0x0a, 0x4d, 0x0e, - 0x6c, 0x04, 0x43, 0x2e, 0x30, 0x9f, 0x68, 0x28, 0x43, 0xdf, 0x25, 0x2d, 0x7a, 0x08, 0xfb, 0x1e, - 0x6a, 0xa7, 0xcc, 0xe4, 0xd0, 0xde, 0xcf, 0x1c, 0x76, 0x8d, 0x25, 0x42, 0x68, 0x07, 0xda, 0xcc, - 0xf1, 0xad, 0x88, 0x29, 0x74, 0x34, 0x9a, 0xa1, 0x14, 0x2e, 0x2a, 0x72, 0x64, 0x07, 0xfc, 0x8b, - 0xe1, 0x02, 0x09, 0xdd, 0x52, 0xb8, 0x28, 0xf0, 0x55, 0x71, 0xbc, 0xa5, 0x28, 0x19, 0xab, 0x68, - 0xdb, 0x61, 0x06, 0x21, 0x17, 0xee, 0x26, 0xa8, 0xe2, 0xa3, 0x9d, 0xd0, 0x23, 0x38, 0x28, 0x87, - 0xf1, 0x05, 0x0f, 0x34, 0x39, 0xa5, 0x67, 0x70, 0x5c, 0x42, 0x1e, 0x6a, 0xc5, 0x59, 0x91, 0xea, - 0x99, 0x3d, 0x2b, 0x43, 0x6d, 0xe4, 0xd0, 0x78, 0xe8, 0x49, 0x75, 0x4f, 0x3a, 0x83, 0xc1, 0xf7, - 0xaf, 0x3f, 0xe2, 0xec, 0x71, 0x3d, 0xb1, 0xff, 0x79, 0xff, 0x25, 0x9e, 0xcd, 0xe2, 0x97, 0x2c, - 0x7a, 0x78, 0xec, 0x17, 0x3b, 0x70, 0x39, 0x8d, 0x57, 0x59, 0x1a, 0x4f, 0xd6, 0x59, 0x34, 0xed, - 0xc7, 0x8b, 0x2c, 0x4a, 0x17, 0xe3, 0x59, 0x3f, 0x5f, 0x8c, 0x7e, 0xb1, 0x18, 0xcb, 0xc9, 0x64, - 0x2f, 0xef, 0x3f, 0xfd, 0x0e, 0x00, 0x00, 0xff, 0xff, 0x4f, 0x8c, 0xe4, 0x07, 0x83, 0x03, 0x00, - 0x00, + // 595 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x6c, 0x53, 0x51, 0x6f, 0xda, 0x3c, + 0x14, 0xfd, 0xa0, 0xb4, 0x7c, 0xdc, 0x52, 0xea, 0xba, 0xb4, 0x65, 0xda, 0x34, 0x55, 0x3c, 0x55, + 0x93, 0x0a, 0xd2, 0x26, 0xed, 0x6d, 0xd2, 0x82, 0x7d, 0x69, 0xad, 0x26, 0x31, 0x73, 0x9c, 0xae, + 0xdd, 0x8b, 0x05, 0x25, 0x5a, 0xa3, 0x01, 0x41, 0x21, 0x54, 0xa2, 0xbf, 0x6c, 0x3f, 0x69, 0x3f, + 0x63, 0x32, 0x21, 0x2b, 0x9a, 0xf6, 0x76, 0xef, 0x39, 0x3e, 0xc7, 0xbe, 0x27, 0xb9, 0x50, 0x7f, + 0x48, 0xa6, 0xd3, 0x64, 0xd6, 0x99, 0xa7, 0x49, 0x96, 0xd0, 0xe3, 0x69, 0x3c, 0x79, 0x5a, 0x2e, + 0xf2, 0xae, 0x93, 0x53, 0xed, 0x2a, 0xec, 0xe2, 0x74, 0x9e, 0xad, 0xda, 0x06, 0xf6, 0x82, 0x6c, + 0x98, 0x2d, 0x17, 0xf4, 0x13, 0x40, 0x94, 0xa6, 0x49, 0x6a, 0x1e, 0x92, 0x71, 0xd4, 0x2a, 0x9d, + 0x97, 0x2e, 0x1a, 0xef, 0xdf, 0x76, 0xfe, 0x21, 0xee, 0xa0, 0x3d, 0xc6, 0x92, 0x71, 0xa4, 0x6a, + 0x51, 0x51, 0xd2, 0x53, 0xd8, 0x4b, 0xa3, 0xe1, 0x22, 0x99, 0xb5, 0xca, 0xe7, 0xa5, 0x8b, 0x9a, + 0xda, 0x74, 0xed, 0x8f, 0x50, 0xbf, 0x89, 0x56, 0xb7, 0xc3, 0xc9, 0x32, 0x1a, 0x0c, 0xe3, 0x94, + 0x12, 0xd8, 0xf9, 0x11, 0xad, 0xd6, 0xfe, 0x35, 0x65, 0x4b, 0xda, 0x84, 0xdd, 0x27, 0x4b, 0x6f, + 0x84, 0x79, 0xd3, 0x7e, 0x03, 0x95, 0xde, 0x24, 0x19, 0xbd, 0xb0, 0x56, 0x51, 0x2f, 0xd8, 0x4b, + 0xa8, 0x3a, 0xe3, 0x71, 0x1a, 0x2d, 0x16, 0xb4, 0x01, 0xe5, 0x78, 0xbe, 0xf1, 0x2b, 0xc7, 0x73, + 0x4a, 0xa1, 0x32, 0x4f, 0xd2, 0x6c, 0xed, 0xb6, 0xa3, 0xd6, 0xf5, 0xbb, 0x9f, 0x15, 0xa8, 0xfd, + 0x79, 0x35, 0xdd, 0x87, 0x6a, 0x10, 0x32, 0x86, 0x41, 0x40, 0xfe, 0xa3, 0x4d, 0x20, 0xa1, 0x8f, + 0x77, 0x03, 0x64, 0x1a, 0xb9, 0x41, 0xa5, 0xa4, 0x22, 0x25, 0x4a, 0xa1, 0xc1, 0xa4, 0xef, 0x23, + 0xd3, 0xa6, 0xef, 0x08, 0x17, 0x39, 0x29, 0xd3, 0x13, 0x38, 0x1a, 0xa0, 0xf2, 0x44, 0x10, 0x08, + 0xe9, 0x1b, 0x8e, 0xbe, 0x40, 0x4e, 0x76, 0xe8, 0x2b, 0x38, 0x61, 0xd2, 0x75, 0x91, 0x69, 0x0b, + 0xfb, 0x52, 0x1b, 0xbc, 0x13, 0x81, 0x0e, 0x48, 0xc5, 0x7a, 0x0b, 0xd7, 0xc5, 0x2b, 0xc7, 0x35, + 0x8e, 0xba, 0x0a, 0x3d, 0xf4, 0x35, 0xd9, 0xb5, 0x3e, 0x05, 0xca, 0x85, 0x87, 0xbe, 0xb5, 0x23, + 0x55, 0x7a, 0x0a, 0xb4, 0x80, 0x85, 0xcf, 0xf1, 0xce, 0xe8, 0xfb, 0x01, 0x92, 0xff, 0xe9, 0x6b, + 0x38, 0x2b, 0xf0, 0xed, 0x7b, 0x1c, 0x0f, 0x49, 0x8d, 0x12, 0xa8, 0x17, 0xa4, 0x96, 0x83, 0x1b, + 0x02, 0xdb, 0xee, 0x4a, 0x7e, 0x55, 0xc8, 0xa4, 0xe2, 0x64, 0x7f, 0x1b, 0xbe, 0x45, 0xa6, 0xa5, + 0x32, 0x82, 0x93, 0xba, 0x7d, 0x7c, 0x01, 0x07, 0xe8, 0x28, 0x76, 0x6d, 0x14, 0x06, 0xa1, 0xab, + 0xc9, 0x81, 0x8d, 0xa0, 0x2f, 0x5c, 0x5c, 0x4f, 0xd4, 0x97, 0xa1, 0xcf, 0x49, 0x83, 0x1e, 0xc2, + 0xbe, 0x87, 0xda, 0x29, 0x32, 0x39, 0xb4, 0xf7, 0x33, 0x87, 0x5d, 0x63, 0x81, 0x10, 0xda, 0x82, + 0x26, 0x73, 0x7c, 0x2b, 0x62, 0x0a, 0x1d, 0x8d, 0xa6, 0x2f, 0x5d, 0x8e, 0x8a, 0x1c, 0xd9, 0x01, + 0xff, 0x62, 0x84, 0x8b, 0x84, 0x6e, 0x29, 0x38, 0xba, 0xf8, 0xa2, 0x38, 0xde, 0x52, 0x14, 0x8c, + 0x55, 0x34, 0xed, 0x30, 0xbd, 0x50, 0xb8, 0x7c, 0x13, 0x54, 0xfe, 0xd1, 0x4e, 0xe8, 0x11, 0x1c, + 0x14, 0xc3, 0xf8, 0xae, 0x08, 0x34, 0x39, 0xa5, 0x67, 0x70, 0x5c, 0x40, 0x1e, 0x6a, 0x25, 0x58, + 0x9e, 0xea, 0x99, 0x3d, 0x2b, 0x43, 0x6d, 0x64, 0xdf, 0x78, 0xe8, 0x49, 0x75, 0x4f, 0x5a, 0xb4, + 0x09, 0x87, 0x9c, 0x1b, 0x85, 0x5f, 0x42, 0x0c, 0xb4, 0x51, 0x0e, 0x43, 0xf2, 0xab, 0xda, 0xeb, + 0x7d, 0xfb, 0xfc, 0x3d, 0xce, 0x1e, 0x97, 0x23, 0xfb, 0xf7, 0x77, 0x9f, 0xe3, 0xc9, 0x24, 0x7e, + 0xce, 0xa2, 0x87, 0xc7, 0x6e, 0xbe, 0x19, 0x97, 0xe3, 0x78, 0x91, 0xa5, 0xf1, 0x68, 0x99, 0x45, + 0xe3, 0x6e, 0x3c, 0xcb, 0xa2, 0x74, 0x36, 0x9c, 0x74, 0xd7, 0xeb, 0xd2, 0xcd, 0xd7, 0x65, 0x3e, + 0x1a, 0xed, 0xad, 0xfb, 0x0f, 0xbf, 0x03, 0x00, 0x00, 0xff, 0xff, 0xb8, 0x48, 0x19, 0x69, 0x99, + 0x03, 0x00, 0x00, } diff --git a/internal/proxy/mock/master_tso.go b/internal/proxy/mock/master_tso.go deleted file mode 100644 index b2af512275..0000000000 --- a/internal/proxy/mock/master_tso.go +++ /dev/null @@ -1,26 +0,0 @@ -package mock - -import ( - "context" - "sync" - "time" - - "github.com/zilliztech/milvus-distributed/internal/util/typeutil" -) - -const timeWindow = time.Second - -type Timestamp = typeutil.Timestamp - -type TSOClient struct { - lastTs Timestamp - mux sync.Mutex -} - -func (c *TSOClient) GetTimeStamp(ctx context.Context, n Timestamp) (ts Timestamp, count uint64, window time.Duration, err error) { - c.mux.Lock() - defer c.mux.Unlock() - ts = c.lastTs - c.lastTs += n - return ts, n, timeWindow, nil -} diff --git a/internal/proxy/proxy.go b/internal/proxy/proxy.go index f0134835cd..d4cdf5ff14 100644 --- a/internal/proxy/proxy.go +++ b/internal/proxy/proxy.go @@ -34,6 +34,7 @@ type Proxy struct { idAllocator *allocator.IDAllocator tsoAllocator *allocator.TimestampAllocator + segAssigner *allocator.SegIDAssigner manipulationMsgStream *msgstream.PulsarMsgStream queryMsgStream *msgstream.PulsarMsgStream @@ -97,6 +98,12 @@ func CreateProxy(ctx context.Context) (*Proxy, error) { } p.tsoAllocator = tsoAllocator + segAssigner, err := allocator.NewSegIDAssigner(p.proxyLoopCtx, masterAddr) + if err != nil { + panic(err) + } + p.segAssigner = segAssigner + p.taskSch, err = NewTaskScheduler(p.proxyLoopCtx, p.idAllocator, p.tsoAllocator) if err != nil { return nil, err @@ -121,6 +128,7 @@ func (p *Proxy) startProxy() error { p.taskSch.Start() p.idAllocator.Start() p.tsoAllocator.Start() + p.segAssigner.Start() // Run callbacks for _, cb := range p.startCallbacks { @@ -241,6 +249,8 @@ func (p *Proxy) stopProxyLoop() { p.idAllocator.Close() + p.segAssigner.Close() + p.taskSch.Close() p.manipulationMsgStream.Close() diff --git a/internal/proxy/proxy_test.go b/internal/proxy/proxy_test.go index 55cf118312..8acd44c284 100644 --- a/internal/proxy/proxy_test.go +++ b/internal/proxy/proxy_test.go @@ -131,44 +131,56 @@ func shutdown() { proxyServer.Close() } +func hasCollection(t *testing.T, name string) bool { + resp, err := proxyClient.HasCollection(ctx, &servicepb.CollectionName{CollectionName: name}) + msg := "Has Collection " + name + " should succeed!" + assert.Nil(t, err, msg) + return resp.Value +} + +func createCollection(t *testing.T, name string) { + has := hasCollection(t, name) + if has { + dropCollection(t, name) + } + + req := &schemapb.CollectionSchema{ + Name: name, + Description: "no description", + AutoID: true, + Fields: make([]*schemapb.FieldSchema, 1), + } + fieldName := "Field1" + req.Fields[0] = &schemapb.FieldSchema{ + Name: fieldName, + Description: "no description", + DataType: schemapb.DataType_INT32, + } + resp, err := proxyClient.CreateCollection(ctx, req) + assert.Nil(t, err) + msg := "Create Collection " + name + " should succeed!" + assert.Equal(t, resp.ErrorCode, commonpb.ErrorCode_SUCCESS, msg) +} + +func dropCollection(t *testing.T, name string) { + req := &servicepb.CollectionName{ + CollectionName: name, + } + resp, err := proxyClient.DropCollection(ctx, req) + assert.Nil(t, err) + msg := "Drop Collection " + name + " should succeed!" + assert.Equal(t, resp.ErrorCode, commonpb.ErrorCode_SUCCESS, msg) +} + func TestProxy_CreateCollection(t *testing.T) { var wg sync.WaitGroup for i := 0; i < testNum; i++ { i := i collectionName := "CreateCollection" + strconv.FormatInt(int64(i), 10) - req := &schemapb.CollectionSchema{ - Name: collectionName, - Description: "no description", - AutoID: true, - Fields: make([]*schemapb.FieldSchema, 1), - } - fieldName := "Field" + strconv.FormatInt(int64(i), 10) - req.Fields[0] = &schemapb.FieldSchema{ - Name: fieldName, - Description: "no description", - DataType: schemapb.DataType_INT32, - } - wg.Add(1) go func(group *sync.WaitGroup) { defer group.Done() - - bool, err := proxyClient.HasCollection(ctx, &servicepb.CollectionName{CollectionName: collectionName}) - if err != nil { - t.Error(err) - } - msg := "Has Collection " + strconv.Itoa(i) + " should succeed!" - assert.Equal(t, bool.Status.ErrorCode, commonpb.ErrorCode_SUCCESS, msg) - - if !bool.Value { - resp, err := proxyClient.CreateCollection(ctx, req) - if err != nil { - t.Error(err) - } - t.Logf("create collection response: %v", resp) - msg := "Create Collection " + strconv.Itoa(i) + " should succeed!" - assert.Equal(t, resp.ErrorCode, commonpb.ErrorCode_SUCCESS, msg) - } + createCollection(t, collectionName) }(&wg) } wg.Wait() @@ -179,18 +191,12 @@ func TestProxy_HasCollection(t *testing.T) { for i := 0; i < testNum; i++ { i := i collectionName := "CreateCollection" + strconv.FormatInt(int64(i), 10) - wg.Add(1) go func(group *sync.WaitGroup) { defer group.Done() - - bool, err := proxyClient.HasCollection(ctx, &servicepb.CollectionName{CollectionName: collectionName}) - if err != nil { - t.Error(err) - } - msg := "Has Collection " + strconv.Itoa(i) + " should succeed!" - assert.Equal(t, bool.Status.ErrorCode, commonpb.ErrorCode_SUCCESS, msg) - t.Logf("Has Collection %v: %v", i, bool) + has := hasCollection(t, collectionName) + msg := "Should has Collection " + collectionName + assert.Equal(t, has, true, msg) }(&wg) } wg.Wait() @@ -205,15 +211,8 @@ func TestProxy_DescribeCollection(t *testing.T) { wg.Add(1) go func(group *sync.WaitGroup) { defer group.Done() - - bool, err := proxyClient.HasCollection(ctx, &servicepb.CollectionName{CollectionName: collectionName}) - if err != nil { - t.Error(err) - } - msg := "Has Collection " + strconv.Itoa(i) + " should succeed!" - assert.Equal(t, bool.Status.ErrorCode, commonpb.ErrorCode_SUCCESS, msg) - - if bool.Value { + has := hasCollection(t, collectionName) + if has { resp, err := proxyClient.DescribeCollection(ctx, &servicepb.CollectionName{CollectionName: collectionName}) if err != nil { t.Error(err) @@ -236,15 +235,8 @@ func TestProxy_ShowCollections(t *testing.T) { wg.Add(1) go func(group *sync.WaitGroup) { defer group.Done() - - bool, err := proxyClient.HasCollection(ctx, &servicepb.CollectionName{CollectionName: collectionName}) - if err != nil { - t.Error(err) - } - msg := "Has Collection " + strconv.Itoa(i) + " should succeed!" - assert.Equal(t, bool.Status.ErrorCode, commonpb.ErrorCode_SUCCESS, msg) - - if bool.Value { + has := hasCollection(t, collectionName) + if has { resp, err := proxyClient.ShowCollections(ctx, &commonpb.Empty{}) if err != nil { t.Error(err) @@ -274,14 +266,8 @@ func TestProxy_Insert(t *testing.T) { wg.Add(1) go func(group *sync.WaitGroup) { defer group.Done() - bool, err := proxyClient.HasCollection(ctx, &servicepb.CollectionName{CollectionName: collectionName}) - if err != nil { - t.Error(err) - } - msg := "Has Collection " + strconv.Itoa(i) + " should succeed!" - assert.Equal(t, bool.Status.ErrorCode, commonpb.ErrorCode_SUCCESS, msg) - - if bool.Value { + has := hasCollection(t, collectionName) + if has { resp, err := proxyClient.Insert(ctx, req) if err != nil { t.Error(err) @@ -343,53 +329,20 @@ func TestProxy_Search(t *testing.T) { for i := 0; i < testNum; i++ { i := i - collectionName := "CreateCollection" + strconv.FormatInt(int64(i), 10) req := &servicepb.Query{ CollectionName: collectionName, } - queryWg.Add(1) go func(group *sync.WaitGroup) { defer group.Done() - bool, err := proxyClient.HasCollection(ctx, &servicepb.CollectionName{CollectionName: collectionName}) - if err != nil { - t.Error(err) - } - msg := "Has Collection " + strconv.Itoa(i) + " should succeed!" - assert.Equal(t, bool.Status.ErrorCode, commonpb.ErrorCode_SUCCESS, msg) - - if !bool.Value { - req := &schemapb.CollectionSchema{ - Name: collectionName, - Description: "no description", - AutoID: true, - Fields: make([]*schemapb.FieldSchema, 1), - } - fieldName := "Field" + strconv.FormatInt(int64(i), 10) - req.Fields[0] = &schemapb.FieldSchema{ - Name: fieldName, - Description: "no description", - DataType: schemapb.DataType_INT32, - } - resp, err := proxyClient.CreateCollection(ctx, req) - if err != nil { - t.Error(err) - } - t.Logf("create collection response: %v", resp) - msg := "Create Collection " + strconv.Itoa(i) + " should succeed!" - assert.Equal(t, resp.ErrorCode, commonpb.ErrorCode_SUCCESS, msg) - } - fn := func() error { - log.Printf("Search: %v", collectionName) - resp, err := proxyClient.Search(ctx, req) - t.Logf("response of search collection %v: %v", i, resp) - return err - } - err = fn() - if err != nil { - t.Error(err) + has := hasCollection(t, collectionName) + if !has { + createCollection(t, collectionName) } + resp, err := proxyClient.Search(ctx, req) + t.Logf("response of search collection %v: %v", i, resp) + assert.Nil(t, err) }(&queryWg) } @@ -400,34 +353,29 @@ func TestProxy_Search(t *testing.T) { sendWg.Wait() } +func TestProxy_AssignSegID(t *testing.T) { + collectionName := "CreateCollection1" + createCollection(t, collectionName) + testNum := 4 + for i := 0; i < testNum; i++ { + segID, err := proxyServer.segAssigner.GetSegmentID(collectionName, "default", int32(i), 200000) + assert.Nil(t, err) + fmt.Println("segID", segID) + } + +} + func TestProxy_DropCollection(t *testing.T) { var wg sync.WaitGroup for i := 0; i < testNum; i++ { i := i - collectionName := "CreateCollection" + strconv.FormatInt(int64(i), 10) - req := &servicepb.CollectionName{ - CollectionName: collectionName, - } - wg.Add(1) go func(group *sync.WaitGroup) { defer group.Done() - bool, err := proxyClient.HasCollection(ctx, req) - if err != nil { - t.Error(err) - } - msg := "Has Collection " + strconv.Itoa(i) + " should succeed!" - assert.Equal(t, bool.Status.ErrorCode, commonpb.ErrorCode_SUCCESS, msg) - - if bool.Value { - resp, err := proxyClient.DropCollection(ctx, req) - if err != nil { - t.Error(err) - } - msg := "Drop Collection " + strconv.Itoa(i) + " should succeed!" - assert.Equal(t, resp.ErrorCode, commonpb.ErrorCode_SUCCESS, msg) - t.Logf("response of insert collection %v: %v", i, resp) + has := hasCollection(t, collectionName) + if has { + dropCollection(t, collectionName) } }(&wg) } @@ -484,8 +432,8 @@ func TestProxy_PartitionGRPC(t *testing.T) { sts, err := proxyClient.ShowPartitions(ctx, &servicepb.CollectionName{CollectionName: collName}) assert.Nil(t, err) assert.Equal(t, sts.Status.ErrorCode, commonpb.ErrorCode_SUCCESS) - assert.True(t, len(sts.Values) >= 1) - assert.True(t, len(sts.Values) <= testNum) + assert.True(t, len(sts.Values) >= 2) + assert.True(t, len(sts.Values) <= testNum+1) st, err = proxyClient.DropPartition(ctx, preq) assert.Nil(t, err)