From 8cf2cf5c94592fc165ff1285e9ef57566b4d751b Mon Sep 17 00:00:00 2001 From: congqixia Date: Wed, 15 May 2024 16:33:34 +0800 Subject: [PATCH] enhance: Add `go-deadlock` as unittest only dependency (#33063) See also #33062 This PR: - Add `lock.RWMutex` & `lock.Mutex` aliases that switch implementation based on build tags - When the build tags include `test`, use `go-deadlock` to detect possible deadlocks - Replace all `sync.RWMutex` & `sync.Mutex` in the datacoord package Signed-off-by: Congqi Xia Signed-off-by: Congqi Xia --- go.mod | 2 + go.sum | 4 + internal/datacoord/channel_manager.go | 4 +- internal/datacoord/channel_manager_v2.go | 4 +- internal/datacoord/compaction.go | 3 +- internal/datacoord/compaction_scheduler.go | 4 +- internal/datacoord/compaction_trigger.go | 3 +- internal/datacoord/compaction_view_manager.go | 3 +- internal/datacoord/garbage_collector_test.go | 8 +- internal/datacoord/import_meta.go | 5 +- internal/datacoord/import_scheduler.go | 3 +- internal/datacoord/index_builder.go | 3 +- .../datacoord/index_engine_version_manager.go | 4 +- internal/datacoord/indexnode_manager.go | 9 ++- internal/datacoord/indexnode_manager_test.go | 12 +-- internal/datacoord/meta.go | 6 +- internal/datacoord/segment_manager.go | 3 +- internal/datacoord/server.go | 2 + internal/datacoord/server_test.go | 5 +- internal/datacoord/session.go | 4 +- internal/datacoord/session_manager.go | 6 +- pkg/go.mod | 2 + pkg/go.sum | 4 + pkg/util/lock/mutex.go | 27 +++++++ pkg/util/lock/mutex_deadlock.go | 29 +++++++ scripts/run_go_codecov.sh | 6 +- scripts/run_go_unittest.sh | 75 ++++++++----------- 27 files changed, 154 insertions(+), 86 deletions(-) create mode 100644 pkg/util/lock/mutex.go create mode 100644 pkg/util/lock/mutex_deadlock.go diff --git a/go.mod b/go.mod index b52871cdd3..b4132f9065 100644 --- a/go.mod +++ b/go.mod @@ -170,6 +170,7 @@ require ( github.com/panjf2000/ants/v2 v2.7.2 // indirect github.com/pelletier/go-toml v1.9.3 // indirect github.com/pelletier/go-toml/v2 
v2.0.8 // indirect + github.com/petermattis/goid v0.0.0-20180202154549-b0b1615b78e5 // indirect github.com/pierrec/lz4 v2.5.2+incompatible // indirect github.com/pierrec/lz4/v4 v4.1.18 // indirect github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c // indirect @@ -183,6 +184,7 @@ require ( github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect github.com/rogpeppe/go-internal v1.10.0 // indirect github.com/rs/xid v1.5.0 // indirect + github.com/sasha-s/go-deadlock v0.3.1 // indirect github.com/shirou/gopsutil/v3 v3.22.9 // indirect github.com/sirupsen/logrus v1.9.3 // indirect github.com/spaolacci/murmur3 v1.1.0 // indirect diff --git a/go.sum b/go.sum index 4ad66c38ad..4d6a99aa25 100644 --- a/go.sum +++ b/go.sum @@ -676,6 +676,8 @@ github.com/pelletier/go-toml v1.9.3 h1:zeC5b1GviRUyKYd6OJPvBU/mcVDVoL1OhT17FCt5d github.com/pelletier/go-toml v1.9.3/go.mod h1:u1nR/EPcESfeI/szUZKdtJ0xRNbUoANCkoOuaOx1Y+c= github.com/pelletier/go-toml/v2 v2.0.8 h1:0ctb6s9mE31h0/lhu+J6OPmVeDxJn+kYnJc2jZR9tGQ= github.com/pelletier/go-toml/v2 v2.0.8/go.mod h1:vuYfssBdrU2XDZ9bYydBu6t+6a6PYNcZljzZR9VXg+4= +github.com/petermattis/goid v0.0.0-20180202154549-b0b1615b78e5 h1:q2e307iGHPdTGp0hoxKjt1H5pDo6utceo3dQVK3I5XQ= +github.com/petermattis/goid v0.0.0-20180202154549-b0b1615b78e5/go.mod h1:jvVRKCrJTQWu0XVbaOlby/2lO20uSCHEMzzplHXte1o= github.com/phpdave11/gofpdf v1.4.2/go.mod h1:zpO6xFn9yxo3YLyMvW8HcKWVdbNqgIfOOp2dXMnm1mY= github.com/phpdave11/gofpdi v1.0.12/go.mod h1:vBmVV0Do6hSBHC8uKUQ71JGW+ZGQq74llk/7bXwjDoI= github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= @@ -764,6 +766,8 @@ github.com/ryanuber/columnize v2.1.0+incompatible/go.mod h1:sm1tb6uqfes/u+d4ooFo github.com/samber/lo v1.27.0 h1:GOyDWxsblvqYobqsmUuMddPa2/mMzkKyojlXol4+LaQ= github.com/samber/lo v1.27.0/go.mod h1:it33p9UtPMS7z72fP4gw/EIfQB2eI8ke7GR2wc6+Rhg= github.com/santhosh-tekuri/jsonschema/v5 v5.0.0/go.mod 
h1:FKdcjfQW6rpZSnxxUvEA5H/cDPdvJ/SZJQLWWXWGrZ0= +github.com/sasha-s/go-deadlock v0.3.1 h1:sqv7fDNShgjcaxkO0JNcOAlr8B9+cV5Ey/OB71efZx0= +github.com/sasha-s/go-deadlock v0.3.1/go.mod h1:F73l+cr82YSh10GxyRI6qZiCgK64VaZjwesgfQ1/iLM= github.com/sbinet/npyio v0.6.0 h1:IyqqQIzRjDym9xnIXsToCKei/qCzxDP+Y74KoMlMgXo= github.com/sbinet/npyio v0.6.0/go.mod h1:/q3BNr6dJOy+t6h7RZchTJ0nwRJO52mivaem29WE1j8= github.com/schollz/closestmatch v2.1.0+incompatible/go.mod h1:RtP1ddjLong6gTkbtmuhtR2uUrrJOpYzYRvbcPAid+g= diff --git a/internal/datacoord/channel_manager.go b/internal/datacoord/channel_manager.go index 396fcd6eaf..f7ce6ea949 100644 --- a/internal/datacoord/channel_manager.go +++ b/internal/datacoord/channel_manager.go @@ -19,7 +19,6 @@ package datacoord import ( "context" "fmt" - "sync" "time" "github.com/samber/lo" @@ -33,13 +32,14 @@ import ( "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/mq/msgstream" "github.com/milvus-io/milvus/pkg/util/funcutil" + "github.com/milvus-io/milvus/pkg/util/lock" "github.com/milvus-io/milvus/pkg/util/logutil" ) // ChannelManagerImpl manages the allocation and the balance between channels and data nodes. 
type ChannelManagerImpl struct { ctx context.Context - mu sync.RWMutex + mu lock.RWMutex h Handler store RWChannelStore factory ChannelPolicyFactory diff --git a/internal/datacoord/channel_manager_v2.go b/internal/datacoord/channel_manager_v2.go index 2d63596f1f..402b09ebb8 100644 --- a/internal/datacoord/channel_manager_v2.go +++ b/internal/datacoord/channel_manager_v2.go @@ -19,7 +19,6 @@ package datacoord import ( "context" "fmt" - "sync" "time" "github.com/cockroachdb/errors" @@ -30,6 +29,7 @@ import ( "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/conc" + "github.com/milvus-io/milvus/pkg/util/lock" "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -61,7 +61,7 @@ type SubCluster interface { type ChannelManagerImplV2 struct { ctx context.Context cancel context.CancelFunc - mu sync.RWMutex + mu lock.RWMutex h Handler store RWChannelStore diff --git a/internal/datacoord/compaction.go b/internal/datacoord/compaction.go index dff05b00aa..fef788b659 100644 --- a/internal/datacoord/compaction.go +++ b/internal/datacoord/compaction.go @@ -33,6 +33,7 @@ import ( "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/tracer" "github.com/milvus-io/milvus/pkg/util/conc" + "github.com/milvus-io/milvus/pkg/util/lock" "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/tsoutil" "github.com/milvus-io/milvus/pkg/util/typeutil" @@ -112,7 +113,7 @@ func (t *compactionTask) shadowClone(opts ...compactionTaskOpt) *compactionTask var _ compactionPlanContext = (*compactionPlanHandler)(nil) type compactionPlanHandler struct { - mu sync.RWMutex + mu lock.RWMutex plans map[int64]*compactionTask // planID -> task meta CompactionMeta diff --git a/internal/datacoord/compaction_scheduler.go b/internal/datacoord/compaction_scheduler.go index e2ab976ba9..89b8308dee 100644 --- a/internal/datacoord/compaction_scheduler.go +++ 
b/internal/datacoord/compaction_scheduler.go @@ -2,7 +2,6 @@ package datacoord import ( "fmt" - "sync" "github.com/samber/lo" "go.uber.org/atomic" @@ -11,6 +10,7 @@ import ( "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/metrics" + "github.com/milvus-io/milvus/pkg/util/lock" "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -32,7 +32,7 @@ type CompactionScheduler struct { queuingTasks []*compactionTask parallelTasks map[int64][]*compactionTask // parallel by nodeID - taskGuard sync.RWMutex + taskGuard lock.RWMutex planHandler *compactionPlanHandler } diff --git a/internal/datacoord/compaction_trigger.go b/internal/datacoord/compaction_trigger.go index 0c26db1126..b6ff595887 100644 --- a/internal/datacoord/compaction_trigger.go +++ b/internal/datacoord/compaction_trigger.go @@ -33,6 +33,7 @@ import ( "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/indexparamcheck" + "github.com/milvus-io/milvus/pkg/util/lock" "github.com/milvus-io/milvus/pkg/util/logutil" "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/tsoutil" @@ -75,7 +76,7 @@ type compactionTrigger struct { signals chan *compactionSignal compactionHandler compactionPlanContext globalTrigger *time.Ticker - forceMu sync.Mutex + forceMu lock.Mutex quit chan struct{} wg sync.WaitGroup diff --git a/internal/datacoord/compaction_view_manager.go b/internal/datacoord/compaction_view_manager.go index 373044f77e..733fe18af2 100644 --- a/internal/datacoord/compaction_view_manager.go +++ b/internal/datacoord/compaction_view_manager.go @@ -11,13 +11,14 @@ import ( "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/util/lock" "github.com/milvus-io/milvus/pkg/util/logutil" "github.com/milvus-io/milvus/pkg/util/typeutil" ) type CompactionViewManager struct { 
view *FullViews - viewGuard sync.RWMutex + viewGuard lock.RWMutex meta *meta trigger TriggerManager diff --git a/internal/datacoord/garbage_collector_test.go b/internal/datacoord/garbage_collector_test.go index a81003cda7..66f7873b4b 100644 --- a/internal/datacoord/garbage_collector_test.go +++ b/internal/datacoord/garbage_collector_test.go @@ -25,7 +25,6 @@ import ( "path" "strconv" "strings" - "sync" "testing" "time" @@ -51,6 +50,7 @@ import ( "github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/util/funcutil" + "github.com/milvus-io/milvus/pkg/util/lock" "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/paramtable" ) @@ -361,7 +361,7 @@ func createMetaForRecycleUnusedIndexes(catalog metastore.DataCoordCatalog) *meta indexID = UniqueID(400) ) return &meta{ - RWMutex: sync.RWMutex{}, + RWMutex: lock.RWMutex{}, ctx: ctx, catalog: catalog, collections: nil, @@ -469,7 +469,7 @@ func createMetaForRecycleUnusedSegIndexes(catalog metastore.DataCoordCatalog) *m }, } meta := &meta{ - RWMutex: sync.RWMutex{}, + RWMutex: lock.RWMutex{}, ctx: ctx, catalog: catalog, collections: nil, @@ -634,7 +634,7 @@ func createMetaTableForRecycleUnusedIndexFiles(catalog *datacoord.Catalog) *meta }, } meta := &meta{ - RWMutex: sync.RWMutex{}, + RWMutex: lock.RWMutex{}, ctx: ctx, catalog: catalog, collections: nil, diff --git a/internal/datacoord/import_meta.go b/internal/datacoord/import_meta.go index 35483b22ee..debf5509e8 100644 --- a/internal/datacoord/import_meta.go +++ b/internal/datacoord/import_meta.go @@ -17,9 +17,8 @@ package datacoord import ( - "sync" - "github.com/milvus-io/milvus/internal/metastore" + "github.com/milvus-io/milvus/pkg/util/lock" ) type ImportMeta interface { @@ -37,7 +36,7 @@ type ImportMeta interface { } type importMeta struct { - mu sync.RWMutex // guards jobs and tasks + mu lock.RWMutex // guards jobs and tasks jobs map[int64]ImportJob tasks 
map[int64]ImportTask diff --git a/internal/datacoord/import_scheduler.go b/internal/datacoord/import_scheduler.go index bfe48e041d..f1cf30003c 100644 --- a/internal/datacoord/import_scheduler.go +++ b/internal/datacoord/import_scheduler.go @@ -31,6 +31,7 @@ import ( "github.com/milvus-io/milvus/internal/proto/internalpb" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/metrics" + "github.com/milvus-io/milvus/pkg/util/lock" ) const ( @@ -147,7 +148,7 @@ func (s *importScheduler) peekSlots() map[int64]int64 { return s.info.NodeID }) nodeSlots := make(map[int64]int64) - mu := &sync.Mutex{} + mu := &lock.Mutex{} wg := &sync.WaitGroup{} for _, nodeID := range nodeIDs { wg.Add(1) diff --git a/internal/datacoord/index_builder.go b/internal/datacoord/index_builder.go index be03a613ef..c565545613 100644 --- a/internal/datacoord/index_builder.go +++ b/internal/datacoord/index_builder.go @@ -34,6 +34,7 @@ import ( "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/indexparams" + "github.com/milvus-io/milvus/pkg/util/lock" "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -73,7 +74,7 @@ type indexBuilder struct { cancel context.CancelFunc wg sync.WaitGroup - taskMutex sync.RWMutex + taskMutex lock.RWMutex scheduleDuration time.Duration // TODO @xiaocai2333: use priority queue diff --git a/internal/datacoord/index_engine_version_manager.go b/internal/datacoord/index_engine_version_manager.go index 3c5d4d25ae..f8b39b51f5 100644 --- a/internal/datacoord/index_engine_version_manager.go +++ b/internal/datacoord/index_engine_version_manager.go @@ -2,12 +2,12 @@ package datacoord import ( "math" - "sync" "go.uber.org/zap" "github.com/milvus-io/milvus/internal/util/sessionutil" "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/util/lock" ) type IndexEngineVersionManager interface { @@ -21,7 +21,7 @@ type IndexEngineVersionManager 
interface { } type versionManagerImpl struct { - mu sync.Mutex + mu lock.Mutex versions map[int64]sessionutil.IndexEngineVersion } diff --git a/internal/datacoord/indexnode_manager.go b/internal/datacoord/indexnode_manager.go index d454743790..4cd8560fba 100644 --- a/internal/datacoord/indexnode_manager.go +++ b/internal/datacoord/indexnode_manager.go @@ -29,6 +29,7 @@ import ( "github.com/milvus-io/milvus/internal/types" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/metrics" + "github.com/milvus-io/milvus/pkg/util/lock" "github.com/milvus-io/milvus/pkg/util/merr" ) @@ -36,7 +37,7 @@ import ( type IndexNodeManager struct { nodeClients map[UniqueID]types.IndexNodeClient stoppingNodes map[UniqueID]struct{} - lock sync.RWMutex + lock lock.RWMutex ctx context.Context indexNodeCreator indexNodeCreatorFunc } @@ -46,7 +47,7 @@ func NewNodeManager(ctx context.Context, indexNodeCreator indexNodeCreatorFunc) return &IndexNodeManager{ nodeClients: make(map[UniqueID]types.IndexNodeClient), stoppingNodes: make(map[UniqueID]struct{}), - lock: sync.RWMutex{}, + lock: lock.RWMutex{}, ctx: ctx, indexNodeCreator: indexNodeCreator, } @@ -109,7 +110,7 @@ func (nm *IndexNodeManager) PeekClient(meta *model.SegmentIndex) (UniqueID, type ctx, cancel := context.WithCancel(nm.ctx) var ( peekNodeID = UniqueID(0) - nodeMutex = sync.Mutex{} + nodeMutex = lock.Mutex{} wg = sync.WaitGroup{} ) @@ -165,7 +166,7 @@ func (nm *IndexNodeManager) ClientSupportDisk() bool { ctx, cancel := context.WithCancel(nm.ctx) var ( enableDisk = false - nodeMutex = sync.Mutex{} + nodeMutex = lock.Mutex{} wg = sync.WaitGroup{} ) diff --git a/internal/datacoord/indexnode_manager_test.go b/internal/datacoord/indexnode_manager_test.go index 6abad8c191..c3948a040f 100644 --- a/internal/datacoord/indexnode_manager_test.go +++ b/internal/datacoord/indexnode_manager_test.go @@ -18,7 +18,6 @@ package datacoord import ( "context" - "sync" "testing" "github.com/cockroachdb/errors" @@ -29,6 +28,7 @@ 
import ( "github.com/milvus-io/milvus/internal/mocks" "github.com/milvus-io/milvus/internal/proto/indexpb" "github.com/milvus-io/milvus/internal/types" + "github.com/milvus-io/milvus/pkg/util/lock" "github.com/milvus-io/milvus/pkg/util/merr" ) @@ -112,7 +112,7 @@ func TestIndexNodeManager_ClientSupportDisk(t *testing.T) { t.Run("support", func(t *testing.T) { nm := &IndexNodeManager{ ctx: context.Background(), - lock: sync.RWMutex{}, + lock: lock.RWMutex{}, nodeClients: map[UniqueID]types.IndexNodeClient{ 1: getMockedGetJobStatsClient(&indexpb.GetJobStatsResponse{ Status: merr.Success(), @@ -130,7 +130,7 @@ func TestIndexNodeManager_ClientSupportDisk(t *testing.T) { t.Run("not support", func(t *testing.T) { nm := &IndexNodeManager{ ctx: context.Background(), - lock: sync.RWMutex{}, + lock: lock.RWMutex{}, nodeClients: map[UniqueID]types.IndexNodeClient{ 1: getMockedGetJobStatsClient(&indexpb.GetJobStatsResponse{ Status: merr.Success(), @@ -148,7 +148,7 @@ func TestIndexNodeManager_ClientSupportDisk(t *testing.T) { t.Run("no indexnode", func(t *testing.T) { nm := &IndexNodeManager{ ctx: context.Background(), - lock: sync.RWMutex{}, + lock: lock.RWMutex{}, nodeClients: map[UniqueID]types.IndexNodeClient{}, } @@ -159,7 +159,7 @@ func TestIndexNodeManager_ClientSupportDisk(t *testing.T) { t.Run("error", func(t *testing.T) { nm := &IndexNodeManager{ ctx: context.Background(), - lock: sync.RWMutex{}, + lock: lock.RWMutex{}, nodeClients: map[UniqueID]types.IndexNodeClient{ 1: getMockedGetJobStatsClient(nil, err), }, @@ -172,7 +172,7 @@ func TestIndexNodeManager_ClientSupportDisk(t *testing.T) { t.Run("fail reason", func(t *testing.T) { nm := &IndexNodeManager{ ctx: context.Background(), - lock: sync.RWMutex{}, + lock: lock.RWMutex{}, nodeClients: map[UniqueID]types.IndexNodeClient{ 1: getMockedGetJobStatsClient(&indexpb.GetJobStatsResponse{ Status: merr.Status(err), diff --git a/internal/datacoord/meta.go b/internal/datacoord/meta.go index 0708d78f98..892166fc4b 100644 
--- a/internal/datacoord/meta.go +++ b/internal/datacoord/meta.go @@ -21,7 +21,6 @@ import ( "context" "fmt" "math" - "sync" "time" "github.com/cockroachdb/errors" @@ -40,6 +39,7 @@ import ( "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/metrics" + "github.com/milvus-io/milvus/pkg/util/lock" "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/metautil" "github.com/milvus-io/milvus/pkg/util/paramtable" @@ -48,7 +48,7 @@ import ( ) type meta struct { - sync.RWMutex + lock.RWMutex ctx context.Context catalog metastore.DataCoordCatalog collections map[UniqueID]*collectionInfo // collection id to collection info @@ -60,7 +60,7 @@ type meta struct { } type channelCPs struct { - sync.RWMutex + lock.RWMutex checkpoints map[string]*msgpb.MsgPosition } diff --git a/internal/datacoord/segment_manager.go b/internal/datacoord/segment_manager.go index 5106c8a489..a113552e07 100644 --- a/internal/datacoord/segment_manager.go +++ b/internal/datacoord/segment_manager.go @@ -32,6 +32,7 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/util/lock" "github.com/milvus-io/milvus/pkg/util/retry" "github.com/milvus-io/milvus/pkg/util/tsoutil" "github.com/milvus-io/milvus/pkg/util/typeutil" @@ -109,7 +110,7 @@ var _ Manager = (*SegmentManager)(nil) // SegmentManager handles L1 segment related logic type SegmentManager struct { meta *meta - mu sync.RWMutex + mu lock.RWMutex allocator allocator helper allocHelper segments []UniqueID diff --git a/internal/datacoord/server.go b/internal/datacoord/server.go index abe346ceee..5662b47a1b 100644 --- a/internal/datacoord/server.go +++ b/internal/datacoord/server.go @@ -901,6 +901,7 @@ func (s *Server) handleSessionEvent(ctx context.Context, role string, event *ses if event == nil { return nil } + log := log.Ctx(ctx) 
switch role { case typeutil.DataNodeRole: info := &datapb.DataNodeInfo{ @@ -1019,6 +1020,7 @@ func (s *Server) startFlushLoop(ctx context.Context) { // 2. notify RootCoord segment is flushed // 3. change segment state to `Flushed` in meta func (s *Server) postFlush(ctx context.Context, segmentID UniqueID) error { + log := log.Ctx(ctx) segment := s.meta.GetHealthySegment(segmentID) if segment == nil { return merr.WrapErrSegmentNotFound(segmentID, "segment not found, might be a faked segment, ignore post flush") diff --git a/internal/datacoord/server_test.go b/internal/datacoord/server_test.go index 349c498617..c32dc62303 100644 --- a/internal/datacoord/server_test.go +++ b/internal/datacoord/server_test.go @@ -55,6 +55,7 @@ import ( "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/etcd" "github.com/milvus-io/milvus/pkg/util/funcutil" + "github.com/milvus-io/milvus/pkg/util/lock" "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/metautil" "github.com/milvus-io/milvus/pkg/util/metricsinfo" @@ -3113,7 +3114,7 @@ func Test_CheckHealth(t *testing.T) { } sm := NewSessionManagerImpl() sm.sessions = struct { - sync.RWMutex + lock.RWMutex data map[int64]*Session }{data: map[int64]*Session{1: { client: healthClient, @@ -3139,7 +3140,7 @@ func Test_CheckHealth(t *testing.T) { } sm := NewSessionManagerImpl() sm.sessions = struct { - sync.RWMutex + lock.RWMutex data map[int64]*Session }{data: map[int64]*Session{1: { client: unhealthClient, diff --git a/internal/datacoord/session.go b/internal/datacoord/session.go index e3393281df..f77e1d28f6 100644 --- a/internal/datacoord/session.go +++ b/internal/datacoord/session.go @@ -19,11 +19,11 @@ package datacoord import ( "context" "fmt" - "sync" "github.com/cockroachdb/errors" "github.com/milvus-io/milvus/internal/types" + "github.com/milvus-io/milvus/pkg/util/lock" ) var errDisposed = errors.New("client is disposed") @@ -37,7 +37,7 @@ type NodeInfo struct { // Session 
contains session info of a node type Session struct { - sync.Mutex + lock.Mutex info *NodeInfo client types.DataNodeClient clientCreator dataNodeCreatorFunc diff --git a/internal/datacoord/session_manager.go b/internal/datacoord/session_manager.go index 2f7c6d1152..7b3e018920 100644 --- a/internal/datacoord/session_manager.go +++ b/internal/datacoord/session_manager.go @@ -19,7 +19,6 @@ package datacoord import ( "context" "fmt" - "sync" "time" "go.uber.org/zap" @@ -34,6 +33,7 @@ import ( "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/metrics" "github.com/milvus-io/milvus/pkg/util/commonpbutil" + "github.com/milvus-io/milvus/pkg/util/lock" "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/retry" @@ -73,7 +73,7 @@ var _ SessionManager = (*SessionManagerImpl)(nil) // SessionManagerImpl provides the grpc interfaces of cluster type SessionManagerImpl struct { sessions struct { - sync.RWMutex + lock.RWMutex data map[int64]*Session } sessionCreator dataNodeCreatorFunc @@ -96,7 +96,7 @@ func defaultSessionCreator() dataNodeCreatorFunc { func NewSessionManagerImpl(options ...SessionOpt) *SessionManagerImpl { m := &SessionManagerImpl{ sessions: struct { - sync.RWMutex + lock.RWMutex data map[int64]*Session }{data: make(map[int64]*Session)}, sessionCreator: defaultSessionCreator(), diff --git a/pkg/go.mod b/pkg/go.mod index 2c69da0d0a..79f2f56ef8 100644 --- a/pkg/go.mod +++ b/pkg/go.mod @@ -21,6 +21,7 @@ require ( github.com/prometheus/client_golang v1.14.0 github.com/quasilyte/go-ruleguard/dsl v0.3.22 github.com/samber/lo v1.27.0 + github.com/sasha-s/go-deadlock v0.3.1 github.com/shirou/gopsutil/v3 v3.22.9 github.com/sirupsen/logrus v1.9.0 github.com/spaolacci/murmur3 v1.1.0 @@ -119,6 +120,7 @@ require ( github.com/opencontainers/runtime-spec v1.0.2 // indirect github.com/opentracing/opentracing-go v1.2.0 // indirect github.com/pelletier/go-toml v1.9.3 // indirect + 
github.com/petermattis/goid v0.0.0-20180202154549-b0b1615b78e5 // indirect github.com/pierrec/lz4 v2.5.2+incompatible // indirect github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c // indirect github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 // indirect diff --git a/pkg/go.sum b/pkg/go.sum index 7a169c4ad5..16b829e7a0 100644 --- a/pkg/go.sum +++ b/pkg/go.sum @@ -557,6 +557,8 @@ github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FI github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic= github.com/pelletier/go-toml v1.9.3 h1:zeC5b1GviRUyKYd6OJPvBU/mcVDVoL1OhT17FCt5dSQ= github.com/pelletier/go-toml v1.9.3/go.mod h1:u1nR/EPcESfeI/szUZKdtJ0xRNbUoANCkoOuaOx1Y+c= +github.com/petermattis/goid v0.0.0-20180202154549-b0b1615b78e5 h1:q2e307iGHPdTGp0hoxKjt1H5pDo6utceo3dQVK3I5XQ= +github.com/petermattis/goid v0.0.0-20180202154549-b0b1615b78e5/go.mod h1:jvVRKCrJTQWu0XVbaOlby/2lO20uSCHEMzzplHXte1o= github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= github.com/pierrec/lz4 v2.5.2+incompatible h1:WCjObylUIOlKy/+7Abdn34TLIkXiA4UWUMhxq9m9ZXI= github.com/pierrec/lz4 v2.5.2+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= @@ -635,6 +637,8 @@ github.com/ryanuber/columnize v2.1.0+incompatible/go.mod h1:sm1tb6uqfes/u+d4ooFo github.com/samber/lo v1.27.0 h1:GOyDWxsblvqYobqsmUuMddPa2/mMzkKyojlXol4+LaQ= github.com/samber/lo v1.27.0/go.mod h1:it33p9UtPMS7z72fP4gw/EIfQB2eI8ke7GR2wc6+Rhg= github.com/santhosh-tekuri/jsonschema/v5 v5.0.0/go.mod h1:FKdcjfQW6rpZSnxxUvEA5H/cDPdvJ/SZJQLWWXWGrZ0= +github.com/sasha-s/go-deadlock v0.3.1 h1:sqv7fDNShgjcaxkO0JNcOAlr8B9+cV5Ey/OB71efZx0= +github.com/sasha-s/go-deadlock v0.3.1/go.mod h1:F73l+cr82YSh10GxyRI6qZiCgK64VaZjwesgfQ1/iLM= github.com/schollz/closestmatch v2.1.0+incompatible/go.mod h1:RtP1ddjLong6gTkbtmuhtR2uUrrJOpYzYRvbcPAid+g= github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod 
h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc= github.com/sergi/go-diff v1.0.0/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo= diff --git a/pkg/util/lock/mutex.go b/pkg/util/lock/mutex.go new file mode 100644 index 0000000000..dc7f149997 --- /dev/null +++ b/pkg/util/lock/mutex.go @@ -0,0 +1,27 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build !test + +package lock + +import "sync" + +// Mutex is an alias of `sync.Mutex`; this file is compiled when the `test` build tag is not set (production builds). +type Mutex = sync.Mutex + +// RWMutex is an alias of `sync.RWMutex`; this file is compiled when the `test` build tag is not set (production builds). +type RWMutex = sync.RWMutex diff --git a/pkg/util/lock/mutex_deadlock.go b/pkg/util/lock/mutex_deadlock.go new file mode 100644 index 0000000000..783481f80a --- /dev/null +++ b/pkg/util/lock/mutex_deadlock.go @@ -0,0 +1,29 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build test + +package lock + +import ( + "github.com/sasha-s/go-deadlock" +) + +// Mutex is an alias of `deadlock.Mutex`; this file is compiled only when the `test` build tag is set. +type Mutex = deadlock.Mutex + +// RWMutex is an alias of `deadlock.RWMutex`; this file is compiled only when the `test` build tag is set. +type RWMutex = deadlock.RWMutex diff --git a/scripts/run_go_codecov.sh b/scripts/run_go_codecov.sh index 177894ed8c..edac723fd8 100755 --- a/scripts/run_go_codecov.sh +++ b/scripts/run_go_codecov.sh @@ -36,14 +36,14 @@ fi # starting the timer beginTime=`date +%s` for d in $(go list ./internal/... | grep -v -e vendor -e kafka -e planparserv2/generated -e mocks); do - $TEST_CMD -race -tags dynamic -v -coverpkg=./... -coverprofile=profile.out -covermode=atomic "$d" + $TEST_CMD -race -tags dynamic,test -v -coverpkg=./... -coverprofile=profile.out -covermode=atomic "$d" if [ -f profile.out ]; then grep -v kafka profile.out | grep -v planparserv2/generated | grep -v mocks | sed '1d' >> ${FILE_COVERAGE_INFO} rm profile.out fi done for d in $(go list ./cmd/tools/... | grep -v -e vendor -e kafka -e planparserv2/generated -e mocks); do - $TEST_CMD -race -tags dynamic -v -coverpkg=./... -coverprofile=profile.out -covermode=atomic "$d" + $TEST_CMD -race -tags dynamic,test -v -coverpkg=./... -coverprofile=profile.out -covermode=atomic "$d" if [ -f profile.out ]; then grep -v kafka profile.out | grep -v planparserv2/generated | grep -v mocks | sed '1d' >> ../${FILE_COVERAGE_INFO} rm profile.out @@ -51,7 +51,7 @@ for d in $(go list ./cmd/tools/... | grep -v -e vendor -e kafka -e planparserv2/ done pushd pkg for d in $(go list ./... 
| grep -v -e vendor -e kafka -e planparserv2/generated -e mocks); do - $TEST_CMD -race -tags dynamic -v -coverpkg=./... -coverprofile=profile.out -covermode=atomic "$d" + $TEST_CMD -race -tags dynamic,test -v -coverpkg=./... -coverprofile=profile.out -covermode=atomic "$d" if [ -f profile.out ]; then grep -v kafka profile.out | grep -v planparserv2/generated | grep -v mocks | sed '1d' >> ../${FILE_COVERAGE_INFO} rm profile.out diff --git a/scripts/run_go_unittest.sh b/scripts/run_go_unittest.sh index 0aaad491ae..919d000fe3 100755 --- a/scripts/run_go_unittest.sh +++ b/scripts/run_go_unittest.sh @@ -60,111 +60,106 @@ done function test_proxy() { -go test -race -cover -tags dynamic "${MILVUS_DIR}/proxy/..." -failfast -count=1 -ldflags="-r ${RPATH}" -go test -race -cover -tags dynamic "${MILVUS_DIR}/distributed/proxy/..." -failfast -count=1 -ldflags="-r ${RPATH}" +go test -race -cover -tags dynamic,test "${MILVUS_DIR}/proxy/..." -failfast -count=1 -ldflags="-r ${RPATH}" +go test -race -cover -tags dynamic,test "${MILVUS_DIR}/distributed/proxy/..." -failfast -count=1 -ldflags="-r ${RPATH}" } function test_querynode() { -go test -race -cover -tags dynamic "${MILVUS_DIR}/querynodev2/..." -failfast -count=1 -ldflags="-r ${RPATH}" -go test -race -cover -tags dynamic "${MILVUS_DIR}/distributed/querynode/..." -failfast -count=1 -ldflags="-r ${RPATH}" +go test -race -cover -tags dynamic,test "${MILVUS_DIR}/querynodev2/..." -failfast -count=1 -ldflags="-r ${RPATH}" +go test -race -cover -tags dynamic,test "${MILVUS_DIR}/distributed/querynode/..." -failfast -count=1 -ldflags="-r ${RPATH}" } function test_kv() { -go test -race -cover -tags dynamic "${MILVUS_DIR}/kv/..." -failfast -count=1 -ldflags="-r ${RPATH}" +go test -race -cover -tags dynamic,test "${MILVUS_DIR}/kv/..." -failfast -count=1 -ldflags="-r ${RPATH}" } function test_mq() { -go test -race -cover -tags dynamic $(go list "${MILVUS_DIR}/mq/..." 
| grep -v kafka) -failfast -count=1 -ldflags="-r ${RPATH}" +go test -race -cover -tags dynamic,test $(go list "${MILVUS_DIR}/mq/..." | grep -v kafka) -failfast -count=1 -ldflags="-r ${RPATH}" } function test_storage() { -go test -race -cover -tags dynamic "${MILVUS_DIR}/storage" -failfast -count=1 -ldflags="-r ${RPATH}" +go test -race -cover -tags dynamic,test "${MILVUS_DIR}/storage" -failfast -count=1 -ldflags="-r ${RPATH}" } function test_allocator() { -go test -race -cover -tags dynamic "${MILVUS_DIR}/allocator/..." -failfast -count=1 -ldflags="-r ${RPATH}" +go test -race -cover -tags dynamic,test "${MILVUS_DIR}/allocator/..." -failfast -count=1 -ldflags="-r ${RPATH}" } function test_tso() { -go test -race -cover -tags dynamic "${MILVUS_DIR}/tso/..." -failfast -count=1 -ldflags="-r ${RPATH}" +go test -race -cover -tags dynamic,test "${MILVUS_DIR}/tso/..." -failfast -count=1 -ldflags="-r ${RPATH}" } function test_util() { -go test -race -cover -tags dynamic "${MILVUS_DIR}/util/funcutil/..." -failfast -count=1 -ldflags="-r ${RPATH}" +go test -race -cover -tags dynamic,test "${MILVUS_DIR}/util/funcutil/..." -failfast -count=1 -ldflags="-r ${RPATH}" pushd pkg -go test -race -cover -tags dynamic "${PKG_DIR}/util/retry/..." -failfast -count=1 -ldflags="-r ${RPATH}" +go test -race -cover -tags dynamic,test "${PKG_DIR}/util/retry/..." -failfast -count=1 -ldflags="-r ${RPATH}" popd -go test -race -cover -tags dynamic "${MILVUS_DIR}/util/sessionutil/..." -failfast -count=1 -ldflags="-r ${RPATH}" -go test -race -cover -tags dynamic "${MILVUS_DIR}/util/typeutil/..." -failfast -count=1 -ldflags="-r ${RPATH}" -go test -race -cover -tags dynamic "${MILVUS_DIR}/util/importutilv2/..." -failfast -count=1 -ldflags="-r ${RPATH}" -go test -race -cover -tags dynamic "${MILVUS_DIR}/util/proxyutil/..." -failfast -count=1 -ldflags="-r ${RPATH}" -go test -race -cover -tags dynamic "${MILVUS_DIR}/util/initcore/..." 
-failfast -count=1 -ldflags="-r ${RPATH}" +go test -race -cover -tags dynamic,test "${MILVUS_DIR}/util/sessionutil/..." -failfast -count=1 -ldflags="-r ${RPATH}" +go test -race -cover -tags dynamic,test "${MILVUS_DIR}/util/typeutil/..." -failfast -count=1 -ldflags="-r ${RPATH}" +go test -race -cover -tags dynamic,test "${MILVUS_DIR}/util/importutilv2/..." -failfast -count=1 -ldflags="-r ${RPATH}" +go test -race -cover -tags dynamic,test "${MILVUS_DIR}/util/proxyutil/..." -failfast -count=1 -ldflags="-r ${RPATH}" +go test -race -cover -tags dynamic,test "${MILVUS_DIR}/util/initcore/..." -failfast -count=1 -ldflags="-r ${RPATH}" } function test_pkg() { pushd pkg -go test -race -cover -tags dynamic "${PKG_DIR}/common/..." -failfast -count=1 -ldflags="-r ${RPATH}" -go test -race -cover -tags dynamic "${PKG_DIR}/config/..." -failfast -count=1 -ldflags="-r ${RPATH}" -go test -race -cover -tags dynamic "${PKG_DIR}/log/..." -failfast -count=1 -ldflags="-r ${RPATH}" -go test -race -cover -tags dynamic "${PKG_DIR}/mq/..." -failfast -count=1 -ldflags="-r ${RPATH}" -go test -race -cover -tags dynamic "${PKG_DIR}/tracer/..." -failfast -count=1 -ldflags="-r ${RPATH}" -go test -race -cover -tags dynamic "${PKG_DIR}/util/..." -failfast -count=1 -ldflags="-r ${RPATH}" +go test -race -cover -tags dynamic,test "${PKG_DIR}/common/..." -failfast -count=1 -ldflags="-r ${RPATH}" +go test -race -cover -tags dynamic,test "${PKG_DIR}/config/..." -failfast -count=1 -ldflags="-r ${RPATH}" +go test -race -cover -tags dynamic,test "${PKG_DIR}/log/..." -failfast -count=1 -ldflags="-r ${RPATH}" +go test -race -cover -tags dynamic,test "${PKG_DIR}/mq/..." -failfast -count=1 -ldflags="-r ${RPATH}" +go test -race -cover -tags dynamic,test "${PKG_DIR}/tracer/..." -failfast -count=1 -ldflags="-r ${RPATH}" +go test -race -cover -tags dynamic,test "${PKG_DIR}/util/..." 
-failfast -count=1 -ldflags="-r ${RPATH}" popd } function test_datanode { -go test -race -cover -tags dynamic "${MILVUS_DIR}/datanode/..." -failfast -count=1 -ldflags="-r ${RPATH}" -go test -race -cover -tags dynamic "${MILVUS_DIR}/distributed/datanode/..." -failfast -count=1 -ldflags="-r ${RPATH}" +go test -race -cover -tags dynamic,test "${MILVUS_DIR}/datanode/..." -failfast -count=1 -ldflags="-r ${RPATH}" +go test -race -cover -tags dynamic,test "${MILVUS_DIR}/distributed/datanode/..." -failfast -count=1 -ldflags="-r ${RPATH}" } function test_indexnode() { -go test -race -cover -tags dynamic "${MILVUS_DIR}/indexnode/..." -failfast -count=1 -ldflags="-r ${RPATH}" +go test -race -cover -tags dynamic,test "${MILVUS_DIR}/indexnode/..." -failfast -count=1 -ldflags="-r ${RPATH}" } function test_rootcoord() { -go test -race -cover -tags dynamic "${MILVUS_DIR}/distributed/rootcoord/..." -failfast -count=1 -ldflags="-r ${RPATH}" -go test -race -cover -tags dynamic "${MILVUS_DIR}/rootcoord" -failfast -ldflags="-r ${RPATH}" +go test -race -cover -tags dynamic,test "${MILVUS_DIR}/distributed/rootcoord/..." -failfast -count=1 -ldflags="-r ${RPATH}" +go test -race -cover -tags dynamic,test "${MILVUS_DIR}/rootcoord" -failfast -ldflags="-r ${RPATH}" } function test_datacoord() { -go test -race -cover -tags dynamic "${MILVUS_DIR}/distributed/datacoord/..." -failfast -count=1 -ldflags="-r ${RPATH}" -go test -race -cover -tags dynamic "${MILVUS_DIR}/datacoord/..." -failfast -count=1 -ldflags="-r ${RPATH}" +go test -race -cover -tags dynamic,test "${MILVUS_DIR}/distributed/datacoord/..." -failfast -count=1 -ldflags="-r ${RPATH}" +go test -race -cover -tags dynamic,test "${MILVUS_DIR}/datacoord/..." -failfast -count=1 -ldflags="-r ${RPATH}" } function test_querycoord() { -go test -race -cover -tags dynamic "${MILVUS_DIR}/distributed/querycoord/..." -failfast -count=1 -ldflags="-r ${RPATH}" -go test -race -cover -tags dynamic "${MILVUS_DIR}/querycoordv2/..." 
-failfast -count=1 -ldflags="-r ${RPATH}" +go test -race -cover -tags dynamic,test "${MILVUS_DIR}/distributed/querycoord/..." -failfast -count=1 -ldflags="-r ${RPATH}" +go test -race -cover -tags dynamic,test "${MILVUS_DIR}/querycoordv2/..." -failfast -count=1 -ldflags="-r ${RPATH}" } -#function test_indexcoord() -#{ -#go test -race -cover -tags dynamic "${MILVUS_DIR}/indexcoord/..." -failfast -#} - function test_metastore() { -go test -race -cover -tags dynamic "${MILVUS_DIR}/metastore/..." -failfast -count=1 -ldflags="-r ${RPATH}" +go test -race -cover -tags dynamic,test "${MILVUS_DIR}/metastore/..." -failfast -count=1 -ldflags="-r ${RPATH}" } function test_cmd() { -go test -race -cover -tags dynamic "${ROOT_DIR}/cmd/tools/..." -failfast -count=1 -ldflags="-r ${RPATH}" +go test -race -cover -tags dynamic,test "${ROOT_DIR}/cmd/tools/..." -failfast -count=1 -ldflags="-r ${RPATH}" } function test_all() @@ -176,7 +171,6 @@ test_indexnode test_rootcoord test_querycoord test_datacoord -#test_indexcoord test_kv test_mq test_storage @@ -212,9 +206,6 @@ case "${TEST_TAG}" in datacoord) test_datacoord ;; -# indexcoord) -# test_indexcoord -# ;; kv) test_kv ;;