Fix Alloc timestamp failure blocking compaction queue forever (#22039)

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
This commit is contained in:
congqixia 2023-02-07 19:02:31 +08:00 committed by GitHub
parent 39377fde4e
commit 7adabe09f1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 182 additions and 5 deletions

View File

@ -35,6 +35,7 @@ import (
// Tunables and sentinel values for the compaction plan handler.
const (
// maxParallelCompactionTaskNum is presumably the cap on concurrently running
// compaction tasks — NOTE(review): usage is not visible in this hunk, confirm.
maxParallelCompactionTaskNum = 100
// rpcCompactionTimeout is a 10-second duration; by its name it bounds a
// compaction RPC — NOTE(review): the call site is not visible here, confirm.
rpcCompactionTimeout = 10 * time.Second
// tsTimeout is a sentinel start timestamp. execCompactionPlan stores it as the
// plan's StartTime when allocating a real timestamp fails ("TIMEOUT ts").
tsTimeout = uint64(1)
)
type compactionPlanContext interface {
@ -183,6 +184,12 @@ func (c *compactionPlanHandler) execCompactionPlan(signal *compactionSignal, pla
ts, err := c.allocator.allocTimestamp(context.TODO())
if err != nil {
log.Warn("Alloc start time for CompactionPlan failed", zap.Int64("planID", plan.GetPlanID()))
// update plan ts to TIMEOUT ts
c.mu.Lock()
c.plans[plan.PlanID] = c.plans[plan.PlanID].shadowClone(func(task *compactionTask) {
task.plan.StartTime = tsTimeout
})
c.mu.Unlock()
return
}

View File

@ -18,6 +18,7 @@ package datacoord
import (
"context"
"errors"
"sync"
"testing"
"time"
@ -41,9 +42,10 @@ import (
func Test_compactionPlanHandler_execCompactionPlan(t *testing.T) {
ch := make(chan interface{}, 1)
type fields struct {
plans map[int64]*compactionTask
sessions *SessionManager
chManager *ChannelManager
plans map[int64]*compactionTask
sessions *SessionManager
chManager *ChannelManager
allocatorFactory func() allocator
}
type args struct {
signal *compactionSignal
@ -77,6 +79,7 @@ func Test_compactionPlanHandler_execCompactionPlan(t *testing.T) {
},
},
},
allocatorFactory: func() allocator { return newMockAllocator() },
},
args{
signal: &compactionSignal{id: 100},
@ -106,6 +109,49 @@ func Test_compactionPlanHandler_execCompactionPlan(t *testing.T) {
},
},
},
allocatorFactory: func() allocator { return newMockAllocator() },
},
args{
signal: &compactionSignal{id: 100},
plan: &datapb.CompactionPlan{PlanID: 1, Channel: "ch1", Type: datapb.CompactionType_MergeCompaction},
},
true,
nil,
},
{
"test_allocate_ts_failed",
fields{
plans: map[int64]*compactionTask{},
sessions: &SessionManager{
sessions: struct {
sync.RWMutex
data map[int64]*Session
}{
data: map[int64]*Session{
1: {client: &mockDataNodeClient{ch: ch, compactionResp: &commonpb.Status{ErrorCode: commonpb.ErrorCode_CacheFailed}}},
},
},
},
chManager: &ChannelManager{
store: &ChannelStore{
channelsInfo: map[int64]*NodeChannelInfo{
1: {NodeID: 1, Channels: []*channel{{Name: "ch1"}}},
},
},
},
allocatorFactory: func() allocator {
a := &NMockAllocator{}
start := time.Now()
a.EXPECT().allocTimestamp(mock.Anything).Call.Return(func(_ context.Context) Timestamp {
return tsoutil.ComposeTSByTime(time.Now(), 0)
}, func(_ context.Context) error {
if time.Since(start) > time.Second*2 {
return nil
}
return errors.New("mocked")
})
return a
},
},
args{
signal: &compactionSignal{id: 100},
@ -122,14 +168,13 @@ func Test_compactionPlanHandler_execCompactionPlan(t *testing.T) {
sessions: tt.fields.sessions,
chManager: tt.fields.chManager,
parallelCh: make(map[int64]chan struct{}),
allocator: newMockAllocator(),
allocator: tt.fields.allocatorFactory(),
}
Params.Save(Params.DataCoordCfg.CompactionCheckIntervalInSeconds.Key, "1")
c.start()
err := c.execCompactionPlan(tt.args.signal, tt.args.plan)
assert.Equal(t, tt.err, err)
if err == nil {
<-ch
task := c.getCompaction(tt.args.plan.PlanID)
if !tt.wantErr {
assert.Equal(t, tt.args.plan, task.plan)

View File

@ -0,0 +1,125 @@
// Code generated by mockery v2.16.0. DO NOT EDIT.
package datacoord
import (
context "context"
mock "github.com/stretchr/testify/mock"
)
// NMockAllocator is an autogenerated mock type for the allocator type.
// It embeds mock.Mock; its allocID and allocTimestamp methods route every
// invocation through the embedded mock's Called method to obtain the
// configured return values.
type NMockAllocator struct {
mock.Mock
}
// NMockAllocator_Expecter holds a pointer to a mock.Mock and exposes helper
// methods (allocID, allocTimestamp) that register expectations on it via On.
// Instances are obtained from NMockAllocator.EXPECT.
type NMockAllocator_Expecter struct {
// mock is the mock.Mock on which expectations are registered.
mock *mock.Mock
}
// EXPECT returns a new expecter that points at this mock's embedded mock.Mock,
// so expectations registered through it apply to this NMockAllocator.
func (_m *NMockAllocator) EXPECT() *NMockAllocator_Expecter {
	expecter := new(NMockAllocator_Expecter)
	expecter.mock = &_m.Mock
	return expecter
}
// allocID provides a mock function with given fields: _a0.
//
// The call is recorded on the embedded mock and the configured return values
// are looked up. Each return value may be either a plain value or a function
// taking the context; a function is invoked with _a0 to produce the result.
func (_m *NMockAllocator) allocID(_a0 context.Context) (int64, error) {
	results := _m.Called(_a0)

	// First return value: computed by a provider function or used verbatim.
	var id int64
	switch v := results.Get(0).(type) {
	case func(context.Context) int64:
		id = v(_a0)
	default:
		id = v.(int64)
	}

	// Second return value: same scheme, for the error.
	if errFn, ok := results.Get(1).(func(context.Context) error); ok {
		return id, errFn(_a0)
	}
	return id, results.Error(1)
}
// NMockAllocator_allocID_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'allocID'.
// The embedded *mock.Call remains reachable, so its untyped methods can still be used directly.
type NMockAllocator_allocID_Call struct {
*mock.Call
}
// allocID is a helper method to define mock.On call
//   - _a0 context.Context
//
// It registers an expectation for the "allocID" method with the given
// argument matcher and returns it wrapped in the typed call helper.
func (_e *NMockAllocator_Expecter) allocID(_a0 interface{}) *NMockAllocator_allocID_Call {
	registered := _e.mock.On("allocID", _a0)
	return &NMockAllocator_allocID_Call{Call: registered}
}
// Run installs a typed callback on the underlying call. The first recorded
// argument is asserted to context.Context and handed to run. Returns the
// receiver so calls can be chained.
func (_c *NMockAllocator_allocID_Call) Run(run func(_a0 context.Context)) *NMockAllocator_allocID_Call {
	adapter := func(args mock.Arguments) {
		ctx := args[0].(context.Context)
		run(ctx)
	}
	_c.Call.Run(adapter)
	return _c
}
// Return sets the values the mocked allocID yields for this expectation:
// _a0 as the ID and _a1 as the error. Returns the receiver for chaining.
func (_c *NMockAllocator_allocID_Call) Return(_a0 int64, _a1 error) *NMockAllocator_allocID_Call {
	values := []interface{}{_a0, _a1}
	_c.Call.Return(values...)
	return _c
}
// allocTimestamp provides a mock function with given fields: _a0.
//
// The call is recorded on the embedded mock and the configured return values
// are looked up. Each return value may be either a plain value or a function
// taking the context; a function is invoked with _a0 to produce the result.
func (_m *NMockAllocator) allocTimestamp(_a0 context.Context) (uint64, error) {
	results := _m.Called(_a0)

	// First return value: computed by a provider function or used verbatim.
	var ts uint64
	switch v := results.Get(0).(type) {
	case func(context.Context) uint64:
		ts = v(_a0)
	default:
		ts = v.(uint64)
	}

	// Second return value: same scheme, for the error.
	if errFn, ok := results.Get(1).(func(context.Context) error); ok {
		return ts, errFn(_a0)
	}
	return ts, results.Error(1)
}
// NMockAllocator_allocTimestamp_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'allocTimestamp'.
// The embedded *mock.Call remains reachable, so its untyped methods can still be used directly.
type NMockAllocator_allocTimestamp_Call struct {
*mock.Call
}
// allocTimestamp is a helper method to define mock.On call
//   - _a0 context.Context
//
// It registers an expectation for the "allocTimestamp" method with the given
// argument matcher and returns it wrapped in the typed call helper.
func (_e *NMockAllocator_Expecter) allocTimestamp(_a0 interface{}) *NMockAllocator_allocTimestamp_Call {
	registered := _e.mock.On("allocTimestamp", _a0)
	return &NMockAllocator_allocTimestamp_Call{Call: registered}
}
// Run installs a typed callback on the underlying call. The first recorded
// argument is asserted to context.Context and handed to run. Returns the
// receiver so calls can be chained.
func (_c *NMockAllocator_allocTimestamp_Call) Run(run func(_a0 context.Context)) *NMockAllocator_allocTimestamp_Call {
	adapter := func(args mock.Arguments) {
		ctx := args[0].(context.Context)
		run(ctx)
	}
	_c.Call.Run(adapter)
	return _c
}
// Return sets the values the mocked allocTimestamp yields for this
// expectation: _a0 as the timestamp and _a1 as the error. Returns the
// receiver for chaining.
func (_c *NMockAllocator_allocTimestamp_Call) Return(_a0 uint64, _a1 error) *NMockAllocator_allocTimestamp_Call {
	values := []interface{}{_a0, _a1}
	_c.Call.Return(values...)
	return _c
}
// mockConstructorTestingTNewNMockAllocator is the testing handle accepted by
// NewNMockAllocator: mock.TestingT plus a Cleanup method, which the
// constructor uses to register a callback.
type mockConstructorTestingTNewNMockAllocator interface {
mock.TestingT
Cleanup(func())
}
// NewNMockAllocator creates a new instance of NMockAllocator. It also
// registers the testing interface on the mock and a cleanup function that
// asserts the mock's expectations.
func NewNMockAllocator(t mockConstructorTestingTNewNMockAllocator) *NMockAllocator {
	m := new(NMockAllocator)
	m.Mock.Test(t)

	// Verify all registered expectations when the cleanup callback runs.
	t.Cleanup(func() {
		m.AssertExpectations(t)
	})
	return m
}