diff --git a/internal/datacoord/server.go b/internal/datacoord/server.go index 2eae2673e9..af96c95bfb 100644 --- a/internal/datacoord/server.go +++ b/internal/datacoord/server.go @@ -307,7 +307,10 @@ func (s *Server) startServerLoop() { go s.startWatchService(s.serverLoopCtx) go s.startFlushLoop(s.serverLoopCtx) go s.session.LivenessCheck(s.serverLoopCtx, s.liveCh, func() { - s.Stop() + err := s.Stop() + if err != nil { + log.Error("server stop fail", zap.Error(err)) + } }) } @@ -394,7 +397,11 @@ func (s *Server) startDataNodeTtLoop(ctx context.Context) { ch := ttMsg.ChannelName ts := ttMsg.Timestamp - s.segmentManager.ExpireAllocations(ch, ts) + err = s.segmentManager.ExpireAllocations(ch, ts) + if err != nil { + log.Warn("expire allocations failed", zap.Error(err)) + continue + } segments, err := s.segmentManager.GetFlushableSegments(ctx, ch, ts) if err != nil { log.Warn("get flushable segments failed", zap.Error(err)) diff --git a/internal/datacoord/server_test.go b/internal/datacoord/server_test.go index d90834e0a6..dd554a7bca 100644 --- a/internal/datacoord/server_test.go +++ b/internal/datacoord/server_test.go @@ -648,7 +648,8 @@ func TestChannel(t *testing.T) { segInfo := &datapb.SegmentInfo{ ID: segID, } - svr.meta.AddSegment(NewSegmentInfo(segInfo)) + err := svr.meta.AddSegment(NewSegmentInfo(segInfo)) + assert.Nil(t, err) stats := &internalpb.SegmentStatisticsUpdates{ SegmentID: segID, @@ -680,7 +681,7 @@ func TestChannel(t *testing.T) { msgPack.Msgs = append(msgPack.Msgs, genMsg(commonpb.MsgType_SegmentStatistics, 123)) msgPack.Msgs = append(msgPack.Msgs, genMsg(commonpb.MsgType_SegmentInfo, 234)) msgPack.Msgs = append(msgPack.Msgs, genMsg(commonpb.MsgType_SegmentStatistics, 345)) - err := statsStream.Produce(&msgPack) + err = statsStream.Produce(&msgPack) assert.Nil(t, err) }) } @@ -870,7 +871,8 @@ func TestDataNodeTtChannel(t *testing.T) { msgPack := msgstream.MsgPack{} msg := genMsg(commonpb.MsgType_DataNodeTt, "ch-1", assign.ExpireTime) msgPack.Msgs = append(msgPack.Msgs, msg) - ttMsgStream.Produce(&msgPack) + err = ttMsgStream.Produce(&msgPack) + assert.Nil(t, err) flushMsg := <-ch flushReq := flushMsg.(*datapb.FlushSegmentsRequest) @@ -955,7 +957,8 @@ func TestDataNodeTtChannel(t *testing.T) { msgPack := msgstream.MsgPack{} msg := genMsg(commonpb.MsgType_DataNodeTt, "ch-1", assign.ExpireTime) msgPack.Msgs = append(msgPack.Msgs, msg) - ttMsgStream.Produce(&msgPack) + err = ttMsgStream.Produce(&msgPack) + assert.Nil(t, err) flushMsg := <-ch flushReq := flushMsg.(*datapb.FlushSegmentsRequest) assert.EqualValues(t, 1, len(flushReq.SegmentIDs)) @@ -1019,7 +1022,8 @@ func TestDataNodeTtChannel(t *testing.T) { msgPack := msgstream.MsgPack{} msg := genMsg(commonpb.MsgType_DataNodeTt, "ch-1", resp.SegIDAssignments[0].ExpireTime) msgPack.Msgs = append(msgPack.Msgs, msg) - ttMsgStream.Produce(&msgPack) + err = ttMsgStream.Produce(&msgPack) + assert.Nil(t, err) <-ch segment = svr.meta.GetSegment(assignedSegmentID)