From 6dc938e6f4a0815033622727de23cd468f78788c Mon Sep 17 00:00:00 2001 From: sunby Date: Thu, 4 Mar 2021 18:58:16 +0800 Subject: [PATCH] Refactor code 1. add error as return value 2. check assertion success Signed-off-by: sunby --- internal/proxynode/segment.go | 29 +++++++++++++++++------------ 1 file changed, 17 insertions(+), 12 deletions(-) diff --git a/internal/proxynode/segment.go b/internal/proxynode/segment.go index c05cfc3cde..a7d37fc8e5 100644 --- a/internal/proxynode/segment.go +++ b/internal/proxynode/segment.go @@ -76,7 +76,11 @@ func (info *segInfo) Assign(ts Timestamp, count uint32) uint32 { func (info *assignInfo) RemoveExpired(ts Timestamp) { for e := info.segInfos.Front(); e != nil; e = e.Next() { - segInfo := e.Value.(*segInfo) + segInfo, ok := e.Value.(*segInfo) + if !ok { + log.Printf("can not cast to segInfo") + continue + } if segInfo.IsExpired(ts) { info.segInfos.Remove(e) } @@ -190,8 +194,8 @@ func (sa *SegIDAssigner) pickCanDoFunc() { } records[collID][partitionID][channelName] += segRequest.count - assign := sa.getAssign(segRequest.collID, segRequest.partitionID, segRequest.channelName) - if assign == nil || assign.Capacity(segRequest.timestamp) < records[collID][partitionID][channelName] { + assign, err := sa.getAssign(segRequest.collID, segRequest.partitionID, segRequest.channelName) + if err != nil || assign.Capacity(segRequest.timestamp) < records[collID][partitionID][channelName] { sa.segReqs = append(sa.segReqs, &datapb.SegIDRequest{ ChannelName: channelName, Count: segRequest.count, @@ -206,10 +210,10 @@ func (sa *SegIDAssigner) pickCanDoFunc() { sa.ToDoReqs = newTodoReqs } -func (sa *SegIDAssigner) getAssign(collID UniqueID, partitionID UniqueID, channelName string) *assignInfo { +func (sa *SegIDAssigner) getAssign(collID UniqueID, partitionID UniqueID, channelName string) (*assignInfo, error) { assignInfos, ok := sa.assignInfos[collID] if !ok { - return nil + return nil, fmt.Errorf("can not find collection %d", collID) } for e := assignInfos.Front(); e != nil; e = e.Next() { @@ -217,9 +221,10 @@ func (sa *SegIDAssigner) getAssign(collID UniqueID, partitionID UniqueID, channe if info.partitionID != partitionID || info.channelName != channelName { continue } - return info + return info, nil } - return nil + return nil, fmt.Errorf("can not find assign info with collID %d, partitionID %d, channelName %s", + collID, partitionID, channelName) } func (sa *SegIDAssigner) checkSyncFunc(timeout bool) bool { @@ -296,13 +301,13 @@ func (sa *SegIDAssigner) syncSegments() bool { log.Println("SyncSegment Error:", info.Status.Reason) continue } - assign := sa.getAssign(info.CollectionID, info.PartitionID, info.ChannelName) + assign, err := sa.getAssign(info.CollectionID, info.PartitionID, info.ChannelName) segInfo := &segInfo{ segID: info.SegID, count: info.Count, expireTime: info.ExpireTime, } - if assign == nil { + if err != nil { colInfos, ok := sa.assignInfos[info.CollectionID] if !ok { colInfos = list.New() @@ -329,9 +334,9 @@ func (sa *SegIDAssigner) syncSegments() bool { func (sa *SegIDAssigner) processFunc(req allocator.Request) error { segRequest := req.(*segRequest) - assign := sa.getAssign(segRequest.collID, segRequest.partitionID, segRequest.channelName) - if assign == nil { - return errors.New("Failed to GetSegmentID") + assign, err := sa.getAssign(segRequest.collID, segRequest.partitionID, segRequest.channelName) + if err != nil { + return err } result, err := assign.Assign(segRequest.timestamp, segRequest.count) segRequest.segInfo = result