From 02df310fa52addd8c5a503873aa9230fc45c3be3 Mon Sep 17 00:00:00 2001
From: "zhenshan.cao"
Date: Sat, 9 Jan 2021 12:30:22 +0800
Subject: [PATCH] Fix data race

Signed-off-by: zhenshan.cao
---
 internal/master/client.go                |  6 ++++++
 internal/master/segment_manager_test.go  |  2 ++
 internal/master/timesync.go              |  2 +-
 internal/msgstream/msgstream_test.go     |  2 --
 internal/proxy/proxy_test.go             |  1 +
 internal/proxy/task_scheduler.go         | 12 +++++++-----
 internal/proxy/timetick.go               | 16 ++++++++++------
 internal/querynode/collection_replica.go |  4 ++--
 internal/querynode/segment.go            | 16 +++++++++++++++-
 internal/querynode/tsafe.go              |  5 ++++-
 internal/writenode/meta_table.go         | 12 ++++++++++--
 scripts/run_go_unittest.sh               |  3 ++-
 12 files changed, 60 insertions(+), 21 deletions(-)

diff --git a/internal/master/client.go b/internal/master/client.go
index 88e44d8f70..a351517676 100644
--- a/internal/master/client.go
+++ b/internal/master/client.go
@@ -1,6 +1,7 @@
 package master

 import (
+	"sync"
 	"time"

 	buildindexclient "github.com/zilliztech/milvus-distributed/internal/indexbuilder/client"
@@ -20,9 +21,12 @@ type MockWriteNodeClient struct {
 	partitionTag string
 	timestamp    Timestamp
 	collectionID UniqueID
+	lock         sync.RWMutex
 }

 func (m *MockWriteNodeClient) FlushSegment(segmentID UniqueID, collectionID UniqueID, partitionTag string, timestamp Timestamp) error {
+	m.lock.Lock()
+	defer m.lock.Unlock()
 	m.flushTime = time.Now()
 	m.segmentID = segmentID
 	m.collectionID = collectionID
@@ -33,6 +37,8 @@ func (m *MockWriteNodeClient) DescribeSegment(segmentID UniqueID) (*writerclient.SegmentDescription, error) {
 	now := time.Now()
+	m.lock.RLock()
+	defer m.lock.RUnlock()
 	if now.Sub(m.flushTime).Seconds() > 2 {
 		return &writerclient.SegmentDescription{
 			SegmentID: segmentID,
diff --git a/internal/master/segment_manager_test.go b/internal/master/segment_manager_test.go
index 847ef6b29a..69078054d5 100644
--- a/internal/master/segment_manager_test.go
+++ b/internal/master/segment_manager_test.go
@@ -272,6 +272,8 @@ func TestSegmentManager_SycnWritenode(t *testing.T) {
 	syncWriteChan <- tsMsg
 	time.Sleep(300 * time.Millisecond)

+	segManager.mu.RLock()
+	defer segManager.mu.RUnlock()
 	status := segManager.collStatus[collID]
 	assert.Empty(t, status.segments)
 }
diff --git a/internal/master/timesync.go b/internal/master/timesync.go
index 49c388a8dd..79863f7ac8 100644
--- a/internal/master/timesync.go
+++ b/internal/master/timesync.go
@@ -81,7 +81,7 @@ func (ttBarrier *softTimeTickBarrier) Start() error {
 				// get a legal Timestamp
 				ts := ttBarrier.minTimestamp()
 				lastTt := atomic.LoadInt64(&(ttBarrier.lastTt))
-				if ttBarrier.lastTt != 0 && ttBarrier.minTtInterval > ts-Timestamp(lastTt) {
+				if lastTt != 0 && ttBarrier.minTtInterval > ts-Timestamp(lastTt) {
 					continue
 				}
 				ttBarrier.outTt <- ts
diff --git a/internal/msgstream/msgstream_test.go b/internal/msgstream/msgstream_test.go
index c3b694c3c8..55c95cebaf 100644
--- a/internal/msgstream/msgstream_test.go
+++ b/internal/msgstream/msgstream_test.go
@@ -526,8 +526,6 @@ func TestStream_PulsarTtMsgStream_Insert(t *testing.T) {
 		log.Fatalf("broadcast error = %v", err)
 	}
 	receiveMsg(outputStream, len(msgPack1.Msgs))
-	outputTtStream := (*outputStream).(*PulsarTtMsgStream)
-	fmt.Printf("timestamp = %v", outputTtStream.lastTimeStamp)
 	(*inputStream).Close()
 	(*outputStream).Close()
 }
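The timesync.go hunk above is the subtle fix in this patch: lastTt was already loaded via atomic.LoadInt64, but the comparison on the next line re-read ttBarrier.lastTt directly, racing with whichever goroutine stores the field. The fix is the load-once-then-use-the-copy idiom. A minimal, runnable sketch of that idiom follows; the barrier type, goroutines, and loop bounds are invented for illustration, and only the atomic load/store pairing mirrors the patch.

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// barrier is an invented stand-in; only the int64 lastTt field mirrors the patch.
type barrier struct {
	lastTt int64
}

func main() {
	b := &barrier{}
	var wg sync.WaitGroup
	wg.Add(2)

	// Writer: stores lastTt atomically from another goroutine.
	go func() {
		defer wg.Done()
		for i := int64(1); i <= 1000; i++ {
			atomic.StoreInt64(&b.lastTt, i)
		}
	}()

	// Reader: load once, then use only the local copy. Re-reading
	// b.lastTt inside the if (the pre-patch code) is the data race.
	go func() {
		defer wg.Done()
		for i := 0; i < 1000; i++ {
			lastTt := atomic.LoadInt64(&b.lastTt)
			if lastTt != 0 {
				_ = lastTt // act on a consistent snapshot
			}
		}
	}()

	wg.Wait()
	fmt.Println("final lastTt:", atomic.LoadInt64(&b.lastTt))
}

Running the sketch under go run -race stays quiet; swap the local lastTt in the if for a direct b.lastTt read and the detector flags the mixed atomic/non-atomic access.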
diff --git a/internal/proxy/proxy_test.go b/internal/proxy/proxy_test.go
index 38ad83b7e3..486e5a23fb 100644
--- a/internal/proxy/proxy_test.go
+++ b/internal/proxy/proxy_test.go
@@ -210,6 +210,7 @@ func TestProxy_CreateCollection(t *testing.T) {
 		wg.Add(1)
 		go func(group *sync.WaitGroup) {
 			defer group.Done()
+			println("collectionName:", collectionName)
 			createCollection(t, collectionName)
 			dropCollection(t, collectionName)
 		}(&wg)
diff --git a/internal/proxy/task_scheduler.go b/internal/proxy/task_scheduler.go
index b0c7debb29..c529e22991 100644
--- a/internal/proxy/task_scheduler.go
+++ b/internal/proxy/task_scheduler.go
@@ -14,7 +14,7 @@ import (

 type TaskQueue interface {
 	utChan() <-chan int
-	utEmpty() bool
+	UTEmpty() bool
 	utFull() bool
 	addUnissuedTask(t task) error
 	FrontUnissuedTask() task
@@ -44,7 +44,9 @@ func (queue *BaseTaskQueue) utChan() <-chan int {
 	return queue.utBufChan
 }

-func (queue *BaseTaskQueue) utEmpty() bool {
+func (queue *BaseTaskQueue) UTEmpty() bool {
+	queue.utLock.Lock()
+	defer queue.utLock.Unlock()
 	return queue.unissuedTasks.Len() == 0
 }

@@ -316,7 +318,7 @@ func (sched *TaskScheduler) definitionLoop() {
 		case <-sched.ctx.Done():
 			return
 		case <-sched.DdQueue.utChan():
-			if !sched.DdQueue.utEmpty() {
+			if !sched.DdQueue.UTEmpty() {
 				t := sched.scheduleDdTask()
 				sched.processTask(t, sched.DdQueue)
 			}
@@ -331,7 +333,7 @@ func (sched *TaskScheduler) manipulationLoop() {
 		case <-sched.ctx.Done():
 			return
 		case <-sched.DmQueue.utChan():
-			if !sched.DmQueue.utEmpty() {
+			if !sched.DmQueue.UTEmpty() {
 				t := sched.scheduleDmTask()
 				go sched.processTask(t, sched.DmQueue)
 			}
@@ -348,7 +350,7 @@ func (sched *TaskScheduler) queryLoop() {
 			return
 		case <-sched.DqQueue.utChan():
 			log.Print("scheduler receive query request ...")
-			if !sched.DqQueue.utEmpty() {
+			if !sched.DqQueue.UTEmpty() {
 				t := sched.scheduleDqTask()
 				go sched.processTask(t, sched.DqQueue)
 			} else {
diff --git a/internal/proxy/timetick.go b/internal/proxy/timetick.go
index 34b79ec26c..f47960e3af 100644
--- a/internal/proxy/timetick.go
+++ b/internal/proxy/timetick.go
@@ -24,12 +24,12 @@ type timeTick struct {
 	tsoAllocator  *allocator.TimestampAllocator
 	tickMsgStream *msgstream.PulsarMsgStream

-	peerID UniqueID
-	wg     sync.WaitGroup
-	ctx    context.Context
-	cancel func()
-	timer  *time.Ticker
-
+	peerID   UniqueID
+	wg       sync.WaitGroup
+	ctx      context.Context
+	cancel   func()
+	timer    *time.Ticker
+	tickLock sync.RWMutex
 	checkFunc tickCheckFunc
 }
@@ -85,6 +85,8 @@ func (tt *timeTick) tick() error {
 	} else {
 		//log.Printf("proxy send time tick message")
 	}
+	tt.tickLock.Lock()
+	defer tt.tickLock.Unlock()
 	tt.lastTick = tt.currentTick
 	return nil
 }
@@ -105,6 +107,8 @@ func (tt *timeTick) tickLoop() {
 }

 func (tt *timeTick) LastTick() Timestamp {
+	tt.tickLock.RLock()
+	defer tt.tickLock.RUnlock()
 	return tt.lastTick
 }
diff --git a/internal/querynode/collection_replica.go b/internal/querynode/collection_replica.go
index 430fd55dd7..d36c55aaf9 100644
--- a/internal/querynode/collection_replica.go
+++ b/internal/querynode/collection_replica.go
@@ -332,11 +332,11 @@ func (colReplica *collectionReplicaImpl) getSegmentStatistics() []*internalpb.Se
 			SegmentID:        segmentID,
 			MemorySize:       currentMemSize,
 			NumRows:          segmentNumOfRows,
-			RecentlyModified: segment.recentlyModified,
+			RecentlyModified: segment.GetRecentlyModified(),
 		}

 		statisticData = append(statisticData, &stat)
-		segment.recentlyModified = false
+		segment.SetRecentlyModified(false)
 	}

 	return statisticData
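Both timetick.go hunks above apply one pattern: the ticker goroutine writes lastTick under tickLock.Lock() while LastTick() callers read it under tickLock.RLock(). Below is a stripped-down sketch of that pattern, assuming nothing about the proxy beyond what the diff shows; the type is a stand-in, not the real timeTick.

package main

import (
	"fmt"
	"sync"
)

type Timestamp = uint64

// timeTick here is a stand-in, not the proxy's real type.
type timeTick struct {
	tickLock    sync.RWMutex
	lastTick    Timestamp
	currentTick Timestamp // touched by the ticker goroutine only
}

func (tt *timeTick) tick() {
	tt.currentTick++
	tt.tickLock.Lock()
	defer tt.tickLock.Unlock()
	tt.lastTick = tt.currentTick
}

func (tt *timeTick) LastTick() Timestamp {
	tt.tickLock.RLock()
	defer tt.tickLock.RUnlock()
	return tt.lastTick
}

func main() {
	tt := &timeTick{}
	var wg sync.WaitGroup
	wg.Add(2)
	go func() { // plays the role of tickLoop
		defer wg.Done()
		for i := 0; i < 1000; i++ {
			tt.tick()
		}
	}()
	go func() { // a concurrent caller of LastTick
		defer wg.Done()
		for i := 0; i < 1000; i++ {
			_ = tt.LastTick()
		}
	}()
	wg.Wait()
	fmt.Println("last tick:", tt.LastTick())
}

An RWMutex fits here because LastTick() is read-mostly: many readers can hold the read lock at once, and only tick() needs exclusive access.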
"strconv" + "sync" "unsafe" "github.com/stretchr/testify/assert" @@ -28,6 +29,7 @@ type Segment struct { collectionID UniqueID lastMemSize int64 lastRowCount int64 + mu sync.Mutex recentlyModified bool } @@ -35,6 +37,18 @@ func (s *Segment) ID() UniqueID { return s.segmentID } +func (s *Segment) SetRecentlyModified(modify bool) { + s.mu.Lock() + defer s.mu.Unlock() + s.recentlyModified = modify +} + +func (s *Segment) GetRecentlyModified() bool { + s.mu.Lock() + defer s.mu.Unlock() + return s.recentlyModified +} + //-------------------------------------------------------------------------------------- constructor and destructor func newSegment(collection *Collection, segmentID int64, partitionTag string, collectionID UniqueID) *Segment { /* @@ -161,7 +175,7 @@ func (s *Segment) segmentInsert(offset int64, entityIDs *[]UniqueID, timestamps return errors.New("Insert failed, C runtime error detected, error code = " + strconv.Itoa(int(errorCode)) + ", error msg = " + errorMsg) } - s.recentlyModified = true + s.SetRecentlyModified(true) return nil } diff --git a/internal/querynode/tsafe.go b/internal/querynode/tsafe.go index 27a1b64004..8673359d65 100644 --- a/internal/querynode/tsafe.go +++ b/internal/querynode/tsafe.go @@ -44,6 +44,8 @@ func newTSafe() tSafe { } func (ts *tSafeImpl) registerTSafeWatcher(t *tSafeWatcher) { + ts.tSafeMu.Lock() + defer ts.tSafeMu.Unlock() ts.watcherList = append(ts.watcherList, t) } @@ -55,8 +57,9 @@ func (ts *tSafeImpl) get() Timestamp { func (ts *tSafeImpl) set(t Timestamp) { ts.tSafeMu.Lock() + defer ts.tSafeMu.Unlock() + ts.tSafe = t - ts.tSafeMu.Unlock() for _, watcher := range ts.watcherList { watcher.notify() } diff --git a/internal/writenode/meta_table.go b/internal/writenode/meta_table.go index 329c2132fb..ea7828874f 100644 --- a/internal/writenode/meta_table.go +++ b/internal/writenode/meta_table.go @@ -171,6 +171,8 @@ func (mt *metaTable) addSegmentFlush(segmentID UniqueID, timestamp Timestamp) er } func (mt *metaTable) getFlushCloseTime(segmentID UniqueID) (Timestamp, error) { + mt.lock.RLock() + defer mt.lock.RUnlock() meta, ok := mt.segID2FlushMeta[segmentID] if !ok { return typeutil.ZeroTimestamp, errors.Errorf("segment not exists with ID = " + strconv.FormatInt(segmentID, 10)) @@ -179,6 +181,8 @@ func (mt *metaTable) getFlushCloseTime(segmentID UniqueID) (Timestamp, error) { } func (mt *metaTable) getFlushOpenTime(segmentID UniqueID) (Timestamp, error) { + mt.lock.RLock() + defer mt.lock.RUnlock() meta, ok := mt.segID2FlushMeta[segmentID] if !ok { return typeutil.ZeroTimestamp, errors.Errorf("segment not exists with ID = " + strconv.FormatInt(segmentID, 10)) @@ -187,6 +191,8 @@ func (mt *metaTable) getFlushOpenTime(segmentID UniqueID) (Timestamp, error) { } func (mt *metaTable) checkFlushComplete(segmentID UniqueID) (bool, error) { + mt.lock.RLock() + defer mt.lock.RUnlock() meta, ok := mt.segID2FlushMeta[segmentID] if !ok { return false, errors.Errorf("segment not exists with ID = " + strconv.FormatInt(segmentID, 10)) @@ -195,7 +201,8 @@ func (mt *metaTable) checkFlushComplete(segmentID UniqueID) (bool, error) { } func (mt *metaTable) getSegBinlogPaths(segmentID UniqueID) (map[int64][]string, error) { - + mt.lock.RLock() + defer mt.lock.RUnlock() meta, ok := mt.segID2FlushMeta[segmentID] if !ok { return nil, errors.Errorf("segment not exists with ID = " + strconv.FormatInt(segmentID, 10)) @@ -208,7 +215,8 @@ func (mt *metaTable) getSegBinlogPaths(segmentID UniqueID) (map[int64][]string, } func (mt *metaTable) getDDLBinlogPaths(collID UniqueID) 
diff --git a/internal/writenode/meta_table.go b/internal/writenode/meta_table.go
index 329c2132fb..ea7828874f 100644
--- a/internal/writenode/meta_table.go
+++ b/internal/writenode/meta_table.go
@@ -171,6 +171,8 @@ func (mt *metaTable) addSegmentFlush(segmentID UniqueID, timestamp Timestamp) er
 }

 func (mt *metaTable) getFlushCloseTime(segmentID UniqueID) (Timestamp, error) {
+	mt.lock.RLock()
+	defer mt.lock.RUnlock()
 	meta, ok := mt.segID2FlushMeta[segmentID]
 	if !ok {
 		return typeutil.ZeroTimestamp, errors.Errorf("segment not exists with ID = " + strconv.FormatInt(segmentID, 10))
@@ -179,6 +181,8 @@ func (mt *metaTable) getFlushCloseTime(segmentID UniqueID) (Timestamp, error) {
 }

 func (mt *metaTable) getFlushOpenTime(segmentID UniqueID) (Timestamp, error) {
+	mt.lock.RLock()
+	defer mt.lock.RUnlock()
 	meta, ok := mt.segID2FlushMeta[segmentID]
 	if !ok {
 		return typeutil.ZeroTimestamp, errors.Errorf("segment not exists with ID = " + strconv.FormatInt(segmentID, 10))
@@ -187,6 +191,8 @@ func (mt *metaTable) getFlushOpenTime(segmentID UniqueID) (Timestamp, error) {
 }

 func (mt *metaTable) checkFlushComplete(segmentID UniqueID) (bool, error) {
+	mt.lock.RLock()
+	defer mt.lock.RUnlock()
 	meta, ok := mt.segID2FlushMeta[segmentID]
 	if !ok {
 		return false, errors.Errorf("segment not exists with ID = " + strconv.FormatInt(segmentID, 10))
@@ -195,7 +201,8 @@ func (mt *metaTable) checkFlushComplete(segmentID UniqueID) (bool, error) {
 }

 func (mt *metaTable) getSegBinlogPaths(segmentID UniqueID) (map[int64][]string, error) {
-
+	mt.lock.RLock()
+	defer mt.lock.RUnlock()
 	meta, ok := mt.segID2FlushMeta[segmentID]
 	if !ok {
 		return nil, errors.Errorf("segment not exists with ID = " + strconv.FormatInt(segmentID, 10))
@@ -208,7 +215,8 @@ func (mt *metaTable) getSegBinlogPaths(segmentID UniqueID) (map[int64][]string,
 }

 func (mt *metaTable) getDDLBinlogPaths(collID UniqueID) (map[UniqueID][]string, error) {
-
+	mt.lock.RLock()
+	defer mt.lock.RUnlock()
 	meta, ok := mt.collID2DdlMeta[collID]
 	if !ok {
 		return nil, errors.Errorf("collection not exists with ID = " + strconv.FormatInt(collID, 10))
diff --git a/scripts/run_go_unittest.sh b/scripts/run_go_unittest.sh
index f4b4bac81f..dc7f59d341 100755
--- a/scripts/run_go_unittest.sh
+++ b/scripts/run_go_unittest.sh
@@ -16,5 +16,6 @@ echo $MILVUS_DIR
 go test -cover "${MILVUS_DIR}/kv/..." -failfast
 go test -cover "${MILVUS_DIR}/proxy/..." -failfast
 go test -cover "${MILVUS_DIR}/writenode/..." -failfast
-go test -cover "${MILVUS_DIR}/msgstream/..." "${MILVUS_DIR}/master/..." "${MILVUS_DIR}/querynode/..." "${MILVUS_DIR}/storage" "${MILVUS_DIR}/util/..." -failfast
+go test -cover "${MILVUS_DIR}/master/..." -failfast
+go test -cover "${MILVUS_DIR}/msgstream/..." "${MILVUS_DIR}/querynode/..." "${MILVUS_DIR}/storage" "${MILVUS_DIR}/util/..." -failfast
 #go test -cover "${MILVUS_DIR}/kv/..." "${MILVUS_DIR}/msgstream/..." "${MILVUS_DIR}/master/..." "${MILVUS_DIR}/querynode/..." -failfast
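Every meta_table.go getter above takes mt.lock.RLock() before indexing segID2FlushMeta or collID2DdlMeta, because a writer such as addSegmentFlush can mutate the maps concurrently, and unsynchronized map read/write in Go is not merely a race but can crash the runtime. A condensed sketch of the pattern with invented stand-in types (flushMeta here is not the real struct):

package main

import (
	"errors"
	"fmt"
	"sync"
)

type UniqueID = int64
type Timestamp = uint64

// flushMeta is an invented stand-in for the real flush metadata.
type flushMeta struct {
	openTime Timestamp
}

type metaTable struct {
	lock            sync.RWMutex
	segID2FlushMeta map[UniqueID]flushMeta
}

// Writer path: mutates the map under the write lock.
func (mt *metaTable) addSegmentFlush(segmentID UniqueID, ts Timestamp) {
	mt.lock.Lock()
	defer mt.lock.Unlock()
	mt.segID2FlushMeta[segmentID] = flushMeta{openTime: ts}
}

// Reader path: the patch's pattern, read lock before indexing the map.
func (mt *metaTable) getFlushOpenTime(segmentID UniqueID) (Timestamp, error) {
	mt.lock.RLock()
	defer mt.lock.RUnlock()
	meta, ok := mt.segID2FlushMeta[segmentID]
	if !ok {
		return 0, errors.New("segment not exists")
	}
	return meta.openTime, nil
}

func main() {
	mt := &metaTable{segID2FlushMeta: make(map[UniqueID]flushMeta)}
	var wg sync.WaitGroup
	wg.Add(2)
	go func() {
		defer wg.Done()
		for i := UniqueID(0); i < 100; i++ {
			mt.addSegmentFlush(i, Timestamp(i))
		}
	}()
	go func() {
		defer wg.Done()
		for i := UniqueID(0); i < 100; i++ {
			_, _ = mt.getFlushOpenTime(i)
		}
	}()
	wg.Wait()
	ts, err := mt.getFlushOpenTime(42)
	fmt.Println("openTime(42):", ts, err)
}

The run_go_unittest.sh change gives the master tests their own go test invocation instead of bundling them with msgstream, querynode, storage, and util; the fixes in this patch can be exercised by adding Go's race detector, e.g. go test -race, on the affected packages.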