diff --git a/internal/allocator/allocator.go b/internal/allocator/allocator.go index ff0f23afda..b811b47e9f 100644 --- a/internal/allocator/allocator.go +++ b/internal/allocator/allocator.go @@ -2,10 +2,13 @@ package allocator import ( "context" + "errors" + "fmt" "sync" "time" - "errors" + "github.com/zilliztech/milvus-distributed/internal/log" + "go.uber.org/zap" ) const ( @@ -106,11 +109,13 @@ type Allocator struct { TChan TickerChan ForceSyncChan chan Request - SyncFunc func() bool + SyncFunc func() (bool, error) ProcessFunc func(req Request) error CheckSyncFunc func(timeout bool) bool PickCanDoFunc func() + SyncErr error + Role string } func (ta *Allocator) Start() error { @@ -183,7 +188,7 @@ func (ta *Allocator) pickCanDo() { func (ta *Allocator) sync(timeout bool) bool { if ta.SyncFunc == nil || ta.CheckSyncFunc == nil { ta.CanDoReqs = ta.ToDoReqs - ta.ToDoReqs = ta.ToDoReqs[0:0] + ta.ToDoReqs = nil return true } if !timeout && len(ta.ToDoReqs) == 0 { @@ -193,7 +198,8 @@ func (ta *Allocator) sync(timeout bool) bool { return false } - ret := ta.SyncFunc() + var ret bool + ret, ta.SyncErr = ta.SyncFunc() if !timeout { ta.TChan.Reset() @@ -207,16 +213,28 @@ func (ta *Allocator) finishSyncRequest() { req.Notify(nil) } } - ta.SyncReqs = ta.SyncReqs[0:0] + ta.SyncReqs = nil } func (ta *Allocator) failRemainRequest() { + var err error + if ta.SyncErr != nil { + err = fmt.Errorf("%s failRemainRequest err:%w", ta.Role, ta.SyncErr) + } else { + errMsg := fmt.Sprintf("%s failRemainRequest unexpected error", ta.Role) + err = errors.New(errMsg) + } + if len(ta.ToDoReqs) > 0 { + log.Debug("Allocator has some reqs to fail", + zap.Any("Role", ta.Role), + zap.Any("reqLen", len(ta.ToDoReqs))) + } for _, req := range ta.ToDoReqs { if req != nil { - req.Notify(errors.New("failed: unexpected error")) + req.Notify(err) } } - ta.ToDoReqs = []Request{} + ta.ToDoReqs = nil } func (ta *Allocator) finishRequest() { @@ -241,7 +259,8 @@ func (ta *Allocator) Close() { ta.CancelFunc() ta.wg.Wait() ta.TChan.Close() - ta.revokeRequest(errors.New("closing")) + errMsg := fmt.Sprintf("%s is closing", ta.Role) + ta.revokeRequest(errors.New(errMsg)) } func (ta *Allocator) CleanCache() { diff --git a/internal/allocator/id.go b/internal/allocator/id.go index 5b681be0ba..9af3aac966 100644 --- a/internal/allocator/id.go +++ b/internal/allocator/id.go @@ -2,14 +2,16 @@ package allocator import ( "context" - "log" + "fmt" "time" - "github.com/zilliztech/milvus-distributed/internal/util/retry" + "go.uber.org/zap" "google.golang.org/grpc" + "github.com/zilliztech/milvus-distributed/internal/log" "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" "github.com/zilliztech/milvus-distributed/internal/proto/masterpb" + "github.com/zilliztech/milvus-distributed/internal/util/retry" "github.com/zilliztech/milvus-distributed/internal/util/typeutil" ) @@ -41,6 +43,7 @@ func NewIDAllocator(ctx context.Context, masterAddr string) (*IDAllocator, error Allocator: Allocator{ Ctx: ctx1, CancelFunc: cancel, + Role: "IDAllocator", }, countPerRPC: IDCountPerRPC, masterAddress: masterAddr, @@ -58,12 +61,11 @@ func (ia *IDAllocator) Start() error { connectMasterFn := func() error { return ia.connectMaster() } - err := retry.Retry(10, time.Millisecond*200, connectMasterFn) + err := retry.Retry(1000, time.Millisecond*200, connectMasterFn) if err != nil { panic("connect to master failed") } - ia.Allocator.Start() - return nil + return ia.Allocator.Start() } func (ia *IDAllocator) connectMaster() error { @@ -71,16 +73,31 @@ func (ia *IDAllocator) connectMaster() error { defer cancel() conn, err := grpc.DialContext(ctx, ia.masterAddress, grpc.WithInsecure(), grpc.WithBlock()) if err != nil { - log.Printf("Connect to master failed, error= %v", err) + log.Error("Connect to master failed", zap.Any("Role", ia.Role), zap.Error(err)) return err } - log.Printf("Connected to master, master_addr=%s", ia.masterAddress) + log.Debug("Connected to master", zap.Any("Role", ia.Role), zap.Any("masterAddress", ia.masterAddress)) ia.masterConn = conn ia.masterClient = masterpb.NewMasterServiceClient(conn) return nil } -func (ia *IDAllocator) syncID() bool { +func (ia *IDAllocator) gatherReqIDCount() uint32 { + need := uint32(0) + for _, req := range ia.ToDoReqs { + tReq := req.(*IDRequest) + need += tReq.count + } + return need +} + +func (ia *IDAllocator) syncID() (bool, error) { + + need := ia.gatherReqIDCount() + if need < ia.countPerRPC { + need = ia.countPerRPC + } + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) req := &masterpb.AllocIDRequest{ Base: &commonpb.MsgBase{ @@ -89,18 +106,17 @@ func (ia *IDAllocator) syncID() bool { Timestamp: 0, SourceID: ia.PeerID, }, - Count: ia.countPerRPC, + Count: need, } resp, err := ia.masterClient.AllocID(ctx, req) cancel() if err != nil { - log.Println("syncID Failed!!!!!") - return false + return false, fmt.Errorf("syncID Failed:%w", err) } ia.idStart = resp.GetID() ia.idEnd = ia.idStart + int64(resp.GetCount()) - return true + return true, nil } func (ia *IDAllocator) checkSyncFunc(timeout bool) bool { @@ -122,6 +138,10 @@ func (ia *IDAllocator) pickCanDoFunc() { } } ia.ToDoReqs = ia.ToDoReqs[idx:] + log.Debug("IDAllocator pickCanDoFunc", + zap.Any("need", need), + zap.Any("total", total), + zap.Any("remainReqCnt", len(ia.ToDoReqs))) } func (ia *IDAllocator) processFunc(req Request) error { diff --git a/internal/allocator/timestamp.go b/internal/allocator/timestamp.go index c62acd4281..1ce14e387f 100644 --- a/internal/allocator/timestamp.go +++ b/internal/allocator/timestamp.go @@ -3,9 +3,11 @@ package allocator import ( "context" "fmt" - "log" "time" + "go.uber.org/zap" + + "github.com/zilliztech/milvus-distributed/internal/log" "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" "github.com/zilliztech/milvus-distributed/internal/proto/masterpb" "github.com/zilliztech/milvus-distributed/internal/util/retry" @@ -38,6 +40,7 @@ func NewTimestampAllocator(ctx context.Context, masterAddr string) (*TimestampAl Allocator: Allocator{ Ctx: ctx1, CancelFunc: cancel, + Role: "TimestampAllocator", }, masterAddress: masterAddr, countPerRPC: tsCountPerRPC, @@ -57,7 +60,7 @@ func (ta *TimestampAllocator) Start() error { connectMasterFn := func() error { return ta.connectMaster() } - err := retry.Retry(10, time.Millisecond*200, connectMasterFn) + err := retry.Retry(1000, time.Millisecond*200, connectMasterFn) if err != nil { panic("Timestamp local allocator connect to master failed") } @@ -70,10 +73,10 @@ func (ta *TimestampAllocator) connectMaster() error { defer cancel() conn, err := grpc.DialContext(ctx, ta.masterAddress, grpc.WithInsecure(), grpc.WithBlock()) if err != nil { - log.Printf("Connect to master failed, error= %v", err) + log.Error("TimestampAllocator Connect to master failed", zap.Error(err)) return err } - log.Printf("Connected to master, master_addr=%s", ta.masterAddress) + log.Debug("TimestampAllocator connected to master", zap.Any("masterAddress", ta.masterAddress)) ta.masterConn = conn ta.masterClient = masterpb.NewMasterServiceClient(conn) return nil @@ -98,9 +101,26 @@ func (ta *TimestampAllocator) pickCanDoFunc() { } } ta.ToDoReqs = ta.ToDoReqs[idx:] + log.Debug("TimestampAllocator pickCanDoFunc", + zap.Any("need", need), + zap.Any("total", total), + zap.Any("remainReqCnt", len(ta.ToDoReqs))) } -func (ta *TimestampAllocator) syncTs() bool { +func (ta *TimestampAllocator) gatherReqTsCount() uint32 { + need := uint32(0) + for _, req := range ta.ToDoReqs { + tReq := req.(*TSORequest) + need += tReq.count + } + return need +} + +func (ta *TimestampAllocator) syncTs() (bool, error) { + need := ta.gatherReqTsCount() + if need < ta.countPerRPC { + need = ta.countPerRPC + } ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) req := &masterpb.AllocTimestampRequest{ Base: &commonpb.MsgBase{ @@ -109,18 +129,18 @@ func (ta *TimestampAllocator) syncTs() bool { Timestamp: 0, SourceID: ta.PeerID, }, - Count: ta.countPerRPC, + Count: need, } - resp, err := ta.masterClient.AllocTimestamp(ctx, req) - cancel() + resp, err := ta.masterClient.AllocTimestamp(ctx, req) + defer cancel() + if err != nil { - log.Println("syncTimestamp Failed!!!!!") - return false + return false, fmt.Errorf("syncTimestamp Failed:%w", err) } ta.lastTsBegin = resp.GetTimestamp() ta.lastTsEnd = ta.lastTsBegin + uint64(resp.GetCount()) - return true + return true, nil } func (ta *TimestampAllocator) processFunc(req Request) error { diff --git a/internal/proxynode/segment.go b/internal/proxynode/segment.go index 052bd09d54..a56077adf3 100644 --- a/internal/proxynode/segment.go +++ b/internal/proxynode/segment.go @@ -138,6 +138,7 @@ func NewSegIDAssigner(ctx context.Context, dataService types.DataService, getTic Allocator: Allocator{ Ctx: ctx1, CancelFunc: cancel, + Role: "SegmentIDAllocator", }, countPerRPC: SegCountPerRPC, dataService: dataService, @@ -275,9 +276,9 @@ func (sa *SegIDAssigner) reduceSegReqs() { sa.segReqs = newSegReqs } -func (sa *SegIDAssigner) syncSegments() bool { +func (sa *SegIDAssigner) syncSegments() (bool, error) { if len(sa.segReqs) == 0 { - return true + return true, nil } sa.reduceSegReqs() ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) @@ -292,8 +293,7 @@ func (sa *SegIDAssigner) syncSegments() bool { resp, err := sa.dataService.AssignSegmentID(ctx, req) if err != nil { - log.Debug("proxynode", zap.String("GRPC AssignSegmentID Failed", err.Error())) - return false + return false, fmt.Errorf("syncSegmentID Failed:%w", err) } now := time.Now() @@ -331,7 +331,7 @@ func (sa *SegIDAssigner) syncSegments() bool { assign.lastInsertTime = now success = true } - return success + return success, nil } func (sa *SegIDAssigner) processFunc(req allocator.Request) error {