diff --git a/go.mod b/go.mod index cd0495b7cc..611d60687d 100644 --- a/go.mod +++ b/go.mod @@ -69,7 +69,6 @@ require ( github.com/milvus-io/milvus/pkg v0.0.0-00010101000000-000000000000 github.com/pkg/errors v0.9.1 github.com/valyala/fastjson v1.6.4 - google.golang.org/protobuf v1.33.0 gopkg.in/yaml.v3 v3.0.1 ) @@ -235,6 +234,7 @@ require ( google.golang.org/genproto v0.0.0-20230706204954-ccb25ca9f130 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20230629202037-9506855d4529 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240401170217-c3f982113cda // indirect + google.golang.org/protobuf v1.33.0 // indirect gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/ini.v1 v1.67.0 // indirect gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect diff --git a/internal/datacoord/analyze_meta.go b/internal/datacoord/analyze_meta.go index 3e543e2b9c..402e5ee6a3 100644 --- a/internal/datacoord/analyze_meta.go +++ b/internal/datacoord/analyze_meta.go @@ -19,7 +19,6 @@ package datacoord import ( "context" "fmt" - "sync" "github.com/golang/protobuf/proto" "go.uber.org/zap" @@ -27,11 +26,12 @@ import ( "github.com/milvus-io/milvus/internal/metastore" "github.com/milvus-io/milvus/internal/proto/indexpb" "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/util/lock" "github.com/milvus-io/milvus/pkg/util/timerecord" ) type analyzeMeta struct { - sync.RWMutex + lock.RWMutex ctx context.Context catalog metastore.DataCoordCatalog diff --git a/internal/datacoord/channel_manager.go b/internal/datacoord/channel_manager.go index 1ba48aae45..42eda97490 100644 --- a/internal/datacoord/channel_manager.go +++ b/internal/datacoord/channel_manager.go @@ -19,7 +19,6 @@ package datacoord import ( "context" "fmt" - "sync" "time" "github.com/samber/lo" @@ -33,13 +32,14 @@ import ( "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/mq/msgstream" "github.com/milvus-io/milvus/pkg/util/funcutil" + "github.com/milvus-io/milvus/pkg/util/lock" "github.com/milvus-io/milvus/pkg/util/logutil" ) // ChannelManagerImpl manages the allocation and the balance between channels and data nodes. type ChannelManagerImpl struct { ctx context.Context - mu sync.RWMutex + mu lock.RWMutex h Handler store RWChannelStore factory ChannelPolicyFactory diff --git a/internal/datacoord/channel_manager_v2.go b/internal/datacoord/channel_manager_v2.go index 6cec89ad30..63e146cb1a 100644 --- a/internal/datacoord/channel_manager_v2.go +++ b/internal/datacoord/channel_manager_v2.go @@ -31,6 +31,7 @@ import ( "github.com/milvus-io/milvus/pkg/kv" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/conc" + "github.com/milvus-io/milvus/pkg/util/lock" "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -62,7 +63,7 @@ type SubCluster interface { type ChannelManagerImplV2 struct { cancel context.CancelFunc - mu sync.RWMutex + mu lock.RWMutex wg sync.WaitGroup h Handler diff --git a/internal/datacoord/cluster.go b/internal/datacoord/cluster.go index b47142b108..6f593ad39e 100644 --- a/internal/datacoord/cluster.go +++ b/internal/datacoord/cluster.go @@ -28,6 +28,7 @@ import ( "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/commonpbutil" + "github.com/milvus-io/milvus/pkg/util/lock" "github.com/milvus-io/milvus/pkg/util/paramtable" ) @@ -182,7 +183,7 @@ func (c *ClusterImpl) DropImport(nodeID int64, in *datapb.DropImportRequest) err func (c *ClusterImpl) QuerySlots() map[int64]int64 { nodeIDs := c.sessionManager.GetSessionIDs() nodeSlots := make(map[int64]int64) - mu := &sync.Mutex{} + mu := &lock.Mutex{} wg := &sync.WaitGroup{} for _, nodeID := range nodeIDs { wg.Add(1) diff --git a/internal/datacoord/compaction_task_meta.go b/internal/datacoord/compaction_task_meta.go index 71b58824c5..8047fd3850 100644 --- a/internal/datacoord/compaction_task_meta.go +++ b/internal/datacoord/compaction_task_meta.go @@ -18,7 +18,6 @@ package datacoord import ( "context" - "sync" "github.com/golang/protobuf/proto" "go.uber.org/zap" @@ -26,11 +25,12 @@ import ( "github.com/milvus-io/milvus/internal/metastore" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/util/lock" "github.com/milvus-io/milvus/pkg/util/timerecord" ) type compactionTaskMeta struct { - sync.RWMutex + lock.RWMutex ctx context.Context catalog metastore.DataCoordCatalog // currently only clustering compaction task is stored in persist meta @@ -39,7 +39,7 @@ type compactionTaskMeta struct { func newCompactionTaskMeta(ctx context.Context, catalog metastore.DataCoordCatalog) (*compactionTaskMeta, error) { csm := &compactionTaskMeta{ - RWMutex: sync.RWMutex{}, + RWMutex: lock.RWMutex{}, ctx: ctx, catalog: catalog, compactionTasks: make(map[int64]map[int64]*datapb.CompactionTask, 0), diff --git a/internal/datacoord/compaction_trigger.go b/internal/datacoord/compaction_trigger.go index 46e975160c..430ad0f93b 100644 --- a/internal/datacoord/compaction_trigger.go +++ b/internal/datacoord/compaction_trigger.go @@ -33,6 +33,7 @@ import ( "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/indexparamcheck" + "github.com/milvus-io/milvus/pkg/util/lock" "github.com/milvus-io/milvus/pkg/util/logutil" "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/tsoutil" @@ -75,7 +76,7 @@ type compactionTrigger struct { signals chan *compactionSignal compactionHandler compactionPlanContext globalTrigger *time.Ticker - forceMu sync.Mutex + forceMu lock.Mutex quit chan struct{} wg sync.WaitGroup diff --git a/internal/datacoord/garbage_collector_test.go b/internal/datacoord/garbage_collector_test.go index 2b36991889..93a96f7e37 100644 --- a/internal/datacoord/garbage_collector_test.go +++ b/internal/datacoord/garbage_collector_test.go @@ -25,7 +25,6 @@ import ( "path" "strconv" "strings" - "sync" "testing" "time" @@ -51,6 +50,7 @@ import ( "github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/util/funcutil" + "github.com/milvus-io/milvus/pkg/util/lock" "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/paramtable" ) @@ -361,7 +361,7 @@ func createMetaForRecycleUnusedIndexes(catalog metastore.DataCoordCatalog) *meta indexID = UniqueID(400) ) return &meta{ - RWMutex: sync.RWMutex{}, + RWMutex: lock.RWMutex{}, ctx: ctx, catalog: catalog, collections: nil, @@ -476,7 +476,7 @@ func createMetaForRecycleUnusedSegIndexes(catalog metastore.DataCoordCatalog) *m }, } meta := &meta{ - RWMutex: sync.RWMutex{}, + RWMutex: lock.RWMutex{}, ctx: ctx, catalog: catalog, collections: nil, @@ -641,7 +641,7 @@ func createMetaTableForRecycleUnusedIndexFiles(catalog *datacoord.Catalog) *meta }, } meta := &meta{ - RWMutex: sync.RWMutex{}, + RWMutex: lock.RWMutex{}, ctx: ctx, catalog: catalog, collections: nil, diff --git a/internal/datacoord/import_meta.go b/internal/datacoord/import_meta.go index 35483b22ee..debf5509e8 100644 --- a/internal/datacoord/import_meta.go +++ b/internal/datacoord/import_meta.go @@ -17,9 +17,8 @@ package datacoord import ( - "sync" - "github.com/milvus-io/milvus/internal/metastore" + "github.com/milvus-io/milvus/pkg/util/lock" ) type ImportMeta interface { @@ -37,7 +36,7 @@ type ImportMeta interface { } type importMeta struct { - mu sync.RWMutex // guards jobs and tasks + mu lock.RWMutex // guards jobs and tasks jobs map[int64]ImportJob tasks map[int64]ImportTask diff --git a/internal/datacoord/import_scheduler.go b/internal/datacoord/import_scheduler.go index 9d6b9d4f24..453c4bd761 100644 --- a/internal/datacoord/import_scheduler.go +++ b/internal/datacoord/import_scheduler.go @@ -31,6 +31,7 @@ import ( "github.com/milvus-io/milvus/internal/proto/internalpb" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/metrics" + "github.com/milvus-io/milvus/pkg/util/lock" ) const ( @@ -147,7 +148,7 @@ func (s *importScheduler) peekSlots() map[int64]int64 { return s.info.NodeID }) nodeSlots := make(map[int64]int64) - mu := &sync.Mutex{} + mu := &lock.Mutex{} wg := &sync.WaitGroup{} for _, nodeID := range nodeIDs { wg.Add(1) diff --git a/internal/datacoord/index_engine_version_manager.go b/internal/datacoord/index_engine_version_manager.go index 3c5d4d25ae..f8b39b51f5 100644 --- a/internal/datacoord/index_engine_version_manager.go +++ b/internal/datacoord/index_engine_version_manager.go @@ -2,12 +2,12 @@ package datacoord import ( "math" - "sync" "go.uber.org/zap" "github.com/milvus-io/milvus/internal/util/sessionutil" "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/util/lock" ) type IndexEngineVersionManager interface { @@ -21,7 +21,7 @@ type IndexEngineVersionManager interface { } type versionManagerImpl struct { - mu sync.Mutex + mu lock.Mutex versions map[int64]sessionutil.IndexEngineVersion } diff --git a/internal/datacoord/index_meta.go b/internal/datacoord/index_meta.go index f257f472b6..0a2f978b80 100644 --- a/internal/datacoord/index_meta.go +++ b/internal/datacoord/index_meta.go @@ -21,7 +21,6 @@ import ( "context" "fmt" "strconv" - "sync" "github.com/golang/protobuf/proto" "github.com/prometheus/client_golang/prometheus" @@ -35,12 +34,13 @@ import ( "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/metrics" + "github.com/milvus-io/milvus/pkg/util/lock" "github.com/milvus-io/milvus/pkg/util/timerecord" "github.com/milvus-io/milvus/pkg/util/typeutil" ) type indexMeta struct { - sync.RWMutex + lock.RWMutex ctx context.Context catalog metastore.DataCoordCatalog diff --git a/internal/datacoord/index_meta_test.go b/internal/datacoord/index_meta_test.go index 196071abd9..6165a55cd6 100644 --- a/internal/datacoord/index_meta_test.go +++ b/internal/datacoord/index_meta_test.go @@ -19,7 +19,6 @@ package datacoord import ( "context" - "sync" "testing" "github.com/cockroachdb/errors" @@ -34,6 +33,7 @@ import ( "github.com/milvus-io/milvus/internal/metastore/model" "github.com/milvus-io/milvus/internal/proto/indexpb" "github.com/milvus-io/milvus/pkg/common" + "github.com/milvus-io/milvus/pkg/util/lock" ) func TestReloadFromKV(t *testing.T) { @@ -264,7 +264,7 @@ func TestMeta_HasSameReq(t *testing.T) { func newSegmentIndexMeta(catalog metastore.DataCoordCatalog) *indexMeta { return &indexMeta{ - RWMutex: sync.RWMutex{}, + RWMutex: lock.RWMutex{}, ctx: context.Background(), catalog: catalog, indexes: make(map[UniqueID]map[UniqueID]*model.Index), diff --git a/internal/datacoord/indexnode_manager.go b/internal/datacoord/indexnode_manager.go index 711551012e..ab032e0639 100644 --- a/internal/datacoord/indexnode_manager.go +++ b/internal/datacoord/indexnode_manager.go @@ -28,6 +28,7 @@ import ( "github.com/milvus-io/milvus/internal/types" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/metrics" + "github.com/milvus-io/milvus/pkg/util/lock" "github.com/milvus-io/milvus/pkg/util/merr" ) @@ -45,7 +46,7 @@ type WorkerManager interface { type IndexNodeManager struct { nodeClients map[UniqueID]types.IndexNodeClient stoppingNodes map[UniqueID]struct{} - lock sync.RWMutex + lock lock.RWMutex ctx context.Context indexNodeCreator indexNodeCreatorFunc } @@ -55,7 +56,7 @@ func NewNodeManager(ctx context.Context, indexNodeCreator indexNodeCreatorFunc) return &IndexNodeManager{ nodeClients: make(map[UniqueID]types.IndexNodeClient), stoppingNodes: make(map[UniqueID]struct{}), - lock: sync.RWMutex{}, + lock: lock.RWMutex{}, ctx: ctx, indexNodeCreator: indexNodeCreator, } @@ -114,7 +115,7 @@ func (nm *IndexNodeManager) PickClient() (UniqueID, types.IndexNodeClient) { ctx, cancel := context.WithCancel(nm.ctx) var ( pickNodeID = UniqueID(0) - nodeMutex = sync.Mutex{} + nodeMutex = lock.Mutex{} wg = sync.WaitGroup{} ) @@ -170,7 +171,7 @@ func (nm *IndexNodeManager) ClientSupportDisk() bool { ctx, cancel := context.WithCancel(nm.ctx) var ( enableDisk = false - nodeMutex = sync.Mutex{} + nodeMutex = lock.Mutex{} wg = sync.WaitGroup{} ) diff --git a/internal/datacoord/indexnode_manager_test.go b/internal/datacoord/indexnode_manager_test.go index fcacdee2fd..360953fea2 100644 --- a/internal/datacoord/indexnode_manager_test.go +++ b/internal/datacoord/indexnode_manager_test.go @@ -18,7 +18,6 @@ package datacoord import ( "context" - "sync" "testing" "github.com/cockroachdb/errors" @@ -28,6 +27,7 @@ import ( "github.com/milvus-io/milvus/internal/mocks" "github.com/milvus-io/milvus/internal/proto/indexpb" "github.com/milvus-io/milvus/internal/types" + "github.com/milvus-io/milvus/pkg/util/lock" "github.com/milvus-io/milvus/pkg/util/merr" ) @@ -108,7 +108,7 @@ func TestIndexNodeManager_ClientSupportDisk(t *testing.T) { t.Run("support", func(t *testing.T) { nm := &IndexNodeManager{ ctx: context.Background(), - lock: sync.RWMutex{}, + lock: lock.RWMutex{}, nodeClients: map[UniqueID]types.IndexNodeClient{ 1: getMockedGetJobStatsClient(&indexpb.GetJobStatsResponse{ Status: merr.Success(), @@ -126,7 +126,7 @@ func TestIndexNodeManager_ClientSupportDisk(t *testing.T) { t.Run("not support", func(t *testing.T) { nm := &IndexNodeManager{ ctx: context.Background(), - lock: sync.RWMutex{}, + lock: lock.RWMutex{}, nodeClients: map[UniqueID]types.IndexNodeClient{ 1: getMockedGetJobStatsClient(&indexpb.GetJobStatsResponse{ Status: merr.Success(), @@ -144,7 +144,7 @@ func TestIndexNodeManager_ClientSupportDisk(t *testing.T) { t.Run("no indexnode", func(t *testing.T) { nm := &IndexNodeManager{ ctx: context.Background(), - lock: sync.RWMutex{}, + lock: lock.RWMutex{}, nodeClients: map[UniqueID]types.IndexNodeClient{}, } @@ -155,7 +155,7 @@ func TestIndexNodeManager_ClientSupportDisk(t *testing.T) { t.Run("error", func(t *testing.T) { nm := &IndexNodeManager{ ctx: context.Background(), - lock: sync.RWMutex{}, + lock: lock.RWMutex{}, nodeClients: map[UniqueID]types.IndexNodeClient{ 1: getMockedGetJobStatsClient(nil, err), }, @@ -168,7 +168,7 @@ func TestIndexNodeManager_ClientSupportDisk(t *testing.T) { t.Run("fail reason", func(t *testing.T) { nm := &IndexNodeManager{ ctx: context.Background(), - lock: sync.RWMutex{}, + lock: lock.RWMutex{}, nodeClients: map[UniqueID]types.IndexNodeClient{ 1: getMockedGetJobStatsClient(&indexpb.GetJobStatsResponse{ Status: merr.Status(err), diff --git a/internal/datacoord/segment_manager.go b/internal/datacoord/segment_manager.go index aebf140a5a..6c7cfe0ef5 100644 --- a/internal/datacoord/segment_manager.go +++ b/internal/datacoord/segment_manager.go @@ -32,6 +32,7 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/util/lock" "github.com/milvus-io/milvus/pkg/util/retry" "github.com/milvus-io/milvus/pkg/util/tsoutil" "github.com/milvus-io/milvus/pkg/util/typeutil" @@ -109,7 +110,7 @@ var _ Manager = (*SegmentManager)(nil) // SegmentManager handles L1 segment related logic type SegmentManager struct { meta *meta - mu sync.RWMutex + mu lock.RWMutex allocator allocator helper allocHelper segments []UniqueID diff --git a/internal/datacoord/server_test.go b/internal/datacoord/server_test.go index fd82d324ac..bc8303bac6 100644 --- a/internal/datacoord/server_test.go +++ b/internal/datacoord/server_test.go @@ -56,6 +56,7 @@ import ( "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/etcd" "github.com/milvus-io/milvus/pkg/util/funcutil" + "github.com/milvus-io/milvus/pkg/util/lock" "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/metricsinfo" "github.com/milvus-io/milvus/pkg/util/paramtable" @@ -3152,7 +3153,7 @@ func Test_CheckHealth(t *testing.T) { sm := NewSessionManagerImpl() sm.sessions = struct { - sync.RWMutex + lock.RWMutex data map[int64]*Session }{data: map[int64]*Session{1: { client: client, diff --git a/internal/datacoord/session.go b/internal/datacoord/session.go index e3393281df..f77e1d28f6 100644 --- a/internal/datacoord/session.go +++ b/internal/datacoord/session.go @@ -19,11 +19,11 @@ package datacoord import ( "context" "fmt" - "sync" "github.com/cockroachdb/errors" "github.com/milvus-io/milvus/internal/types" + "github.com/milvus-io/milvus/pkg/util/lock" ) var errDisposed = errors.New("client is disposed") @@ -37,7 +37,7 @@ type NodeInfo struct { // Session contains session info of a node type Session struct { - sync.Mutex + lock.Mutex info *NodeInfo client types.DataNodeClient clientCreator dataNodeCreatorFunc diff --git a/internal/datacoord/session_manager.go b/internal/datacoord/session_manager.go index 3af66bc4de..399fe0232a 100644 --- a/internal/datacoord/session_manager.go +++ b/internal/datacoord/session_manager.go @@ -19,7 +19,6 @@ package datacoord import ( "context" "fmt" - "sync" "time" "github.com/cockroachdb/errors" @@ -35,6 +34,7 @@ import ( "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/metrics" "github.com/milvus-io/milvus/pkg/util/commonpbutil" + "github.com/milvus-io/milvus/pkg/util/lock" "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/retry" @@ -80,7 +80,7 @@ var _ SessionManager = (*SessionManagerImpl)(nil) // SessionManagerImpl provides the grpc interfaces of cluster type SessionManagerImpl struct { sessions struct { - sync.RWMutex + lock.RWMutex data map[int64]*Session } sessionCreator dataNodeCreatorFunc @@ -103,7 +103,7 @@ func defaultSessionCreator() dataNodeCreatorFunc { func NewSessionManagerImpl(options ...SessionOpt) *SessionManagerImpl { m := &SessionManagerImpl{ sessions: struct { - sync.RWMutex + lock.RWMutex data map[int64]*Session }{data: make(map[int64]*Session)}, sessionCreator: defaultSessionCreator(), diff --git a/internal/datacoord/sync_segments_scheduler_test.go b/internal/datacoord/sync_segments_scheduler_test.go index 1030a55e82..1cf982ba6a 100644 --- a/internal/datacoord/sync_segments_scheduler_test.go +++ b/internal/datacoord/sync_segments_scheduler_test.go @@ -17,7 +17,6 @@ package datacoord import ( - "sync" "sync/atomic" "testing" @@ -29,6 +28,7 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus/internal/proto/datapb" + "github.com/milvus-io/milvus/pkg/util/lock" ) type SyncSegmentsSchedulerSuite struct { @@ -259,7 +259,7 @@ func (s *SyncSegmentsSchedulerSuite) initParams() { }, } s.m = &meta{ - RWMutex: sync.RWMutex{}, + RWMutex: lock.RWMutex{}, collections: map[UniqueID]*collectionInfo{ 1: { ID: 1, diff --git a/internal/datacoord/task_scheduler.go b/internal/datacoord/task_scheduler.go index 6b07689551..2a1a6494de 100644 --- a/internal/datacoord/task_scheduler.go +++ b/internal/datacoord/task_scheduler.go @@ -27,6 +27,7 @@ import ( "github.com/milvus-io/milvus/internal/proto/indexpb" "github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/util/lock" ) const ( @@ -34,7 +35,7 @@ const ( ) type taskScheduler struct { - sync.RWMutex + lock.RWMutex ctx context.Context cancel context.CancelFunc diff --git a/internal/datacoord/task_scheduler_test.go b/internal/datacoord/task_scheduler_test.go index fef5fb8fe8..b2ddaa2408 100644 --- a/internal/datacoord/task_scheduler_test.go +++ b/internal/datacoord/task_scheduler_test.go @@ -19,7 +19,6 @@ package datacoord import ( "context" "fmt" - "sync" "testing" "time" @@ -38,6 +37,7 @@ import ( "github.com/milvus-io/milvus/internal/proto/indexpb" "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/util/indexparamcheck" + "github.com/milvus-io/milvus/pkg/util/lock" "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/paramtable" ) @@ -946,7 +946,7 @@ func (s *taskSchedulerSuite) Test_analyzeTaskFailCase() { }, }, &indexMeta{ - RWMutex: sync.RWMutex{}, + RWMutex: lock.RWMutex{}, ctx: ctx, catalog: catalog, }) @@ -990,7 +990,7 @@ func (s *taskSchedulerSuite) Test_analyzeTaskFailCase() { workerManager := NewMockWorkerManager(s.T()) mt := createMeta(catalog, s.createAnalyzeMeta(catalog), &indexMeta{ - RWMutex: sync.RWMutex{}, + RWMutex: lock.RWMutex{}, ctx: ctx, catalog: catalog, }) @@ -1229,7 +1229,7 @@ func (s *taskSchedulerSuite) Test_indexTaskFailCase() { catalog: catalog, }, &indexMeta{ - RWMutex: sync.RWMutex{}, + RWMutex: lock.RWMutex{}, ctx: ctx, catalog: catalog, indexes: map[UniqueID]map[UniqueID]*model.Index{ diff --git a/pkg/go.mod b/pkg/go.mod index 49e9b30a90..addfed19ca 100644 --- a/pkg/go.mod +++ b/pkg/go.mod @@ -21,6 +21,7 @@ require ( github.com/prometheus/client_golang v1.14.0 github.com/quasilyte/go-ruleguard/dsl v0.3.22 github.com/samber/lo v1.27.0 + github.com/sasha-s/go-deadlock v0.3.1 github.com/shirou/gopsutil/v3 v3.22.9 github.com/sirupsen/logrus v1.9.0 github.com/spaolacci/murmur3 v1.1.0 @@ -123,6 +124,7 @@ require ( github.com/opencontainers/runtime-spec v1.0.2 // indirect github.com/opentracing/opentracing-go v1.2.0 // indirect github.com/pelletier/go-toml v1.9.3 // indirect + github.com/petermattis/goid v0.0.0-20180202154549-b0b1615b78e5 // indirect github.com/pierrec/lz4 v2.5.2+incompatible // indirect github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c // indirect github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 // indirect diff --git a/pkg/go.sum b/pkg/go.sum index beff409b3c..c4a0a72efd 100644 --- a/pkg/go.sum +++ b/pkg/go.sum @@ -576,6 +576,8 @@ github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FI github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic= github.com/pelletier/go-toml v1.9.3 h1:zeC5b1GviRUyKYd6OJPvBU/mcVDVoL1OhT17FCt5dSQ= github.com/pelletier/go-toml v1.9.3/go.mod h1:u1nR/EPcESfeI/szUZKdtJ0xRNbUoANCkoOuaOx1Y+c= +github.com/petermattis/goid v0.0.0-20180202154549-b0b1615b78e5 h1:q2e307iGHPdTGp0hoxKjt1H5pDo6utceo3dQVK3I5XQ= +github.com/petermattis/goid v0.0.0-20180202154549-b0b1615b78e5/go.mod h1:jvVRKCrJTQWu0XVbaOlby/2lO20uSCHEMzzplHXte1o= github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= github.com/pierrec/lz4 v2.5.2+incompatible h1:WCjObylUIOlKy/+7Abdn34TLIkXiA4UWUMhxq9m9ZXI= github.com/pierrec/lz4 v2.5.2+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= @@ -655,6 +657,8 @@ github.com/ryanuber/columnize v2.1.0+incompatible/go.mod h1:sm1tb6uqfes/u+d4ooFo github.com/samber/lo v1.27.0 h1:GOyDWxsblvqYobqsmUuMddPa2/mMzkKyojlXol4+LaQ= github.com/samber/lo v1.27.0/go.mod h1:it33p9UtPMS7z72fP4gw/EIfQB2eI8ke7GR2wc6+Rhg= github.com/santhosh-tekuri/jsonschema/v5 v5.0.0/go.mod h1:FKdcjfQW6rpZSnxxUvEA5H/cDPdvJ/SZJQLWWXWGrZ0= +github.com/sasha-s/go-deadlock v0.3.1 h1:sqv7fDNShgjcaxkO0JNcOAlr8B9+cV5Ey/OB71efZx0= +github.com/sasha-s/go-deadlock v0.3.1/go.mod h1:F73l+cr82YSh10GxyRI6qZiCgK64VaZjwesgfQ1/iLM= github.com/schollz/closestmatch v2.1.0+incompatible/go.mod h1:RtP1ddjLong6gTkbtmuhtR2uUrrJOpYzYRvbcPAid+g= github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc= github.com/sergi/go-diff v1.0.0/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo= diff --git a/scripts/run_go_codecov.sh b/scripts/run_go_codecov.sh index 177894ed8c..174e730fa2 100755 --- a/scripts/run_go_codecov.sh +++ b/scripts/run_go_codecov.sh @@ -36,14 +36,14 @@ fi # starting the timer beginTime=`date +%s` for d in $(go list ./internal/... | grep -v -e vendor -e kafka -e planparserv2/generated -e mocks); do - $TEST_CMD -race -tags dynamic -v -coverpkg=./... -coverprofile=profile.out -covermode=atomic "$d" + $TEST_CMD -race -tags dynamic,test -v -coverpkg=./... -coverprofile=profile.out -covermode=atomic "$d" if [ -f profile.out ]; then grep -v kafka profile.out | grep -v planparserv2/generated | grep -v mocks | sed '1d' >> ${FILE_COVERAGE_INFO} rm profile.out fi done for d in $(go list ./cmd/tools/... | grep -v -e vendor -e kafka -e planparserv2/generated -e mocks); do - $TEST_CMD -race -tags dynamic -v -coverpkg=./... -coverprofile=profile.out -covermode=atomic "$d" + $TEST_CMD -race -tags dynamic,test -v -coverpkg=./... -coverprofile=profile.out -covermode=atomic "$d" if [ -f profile.out ]; then grep -v kafka profile.out | grep -v planparserv2/generated | grep -v mocks | sed '1d' >> ../${FILE_COVERAGE_INFO} rm profile.out @@ -51,7 +51,7 @@ for d in $(go list ./cmd/tools/... | grep -v -e vendor -e kafka -e planparserv2/ done pushd pkg for d in $(go list ./... | grep -v -e vendor -e kafka -e planparserv2/generated -e mocks); do - $TEST_CMD -race -tags dynamic -v -coverpkg=./... -coverprofile=profile.out -covermode=atomic "$d" + $TEST_CMD -race -tags dynamic,test -v -coverpkg=./... -coverprofile=profile.out -covermode=atomic "$d" if [ -f profile.out ]; then grep -v kafka profile.out | grep -v planparserv2/generated | grep -v mocks | sed '1d' >> ../${FILE_COVERAGE_INFO} rm profile.out @@ -61,7 +61,7 @@ popd # milvusclient pushd client for d in $(go list ./... | grep -v -e vendor -e kafka -e planparserv2/generated -e mocks); do - $TEST_CMD -race -tags dynamic -v -coverpkg=./... -coverprofile=profile.out -covermode=atomic "$d" + $TEST_CMD -race -tags dynamic,test -v -coverpkg=./... -coverprofile=profile.out -covermode=atomic "$d" if [ -f profile.out ]; then grep -v kafka profile.out | grep -v planparserv2/generated | grep -v mocks | sed '1d' >> ../${FILE_COVERAGE_INFO} rm profile.out diff --git a/scripts/run_go_unittest.sh b/scripts/run_go_unittest.sh index 0aaad491ae..0f542d3c91 100755 --- a/scripts/run_go_unittest.sh +++ b/scripts/run_go_unittest.sh @@ -60,111 +60,111 @@ done function test_proxy() { -go test -race -cover -tags dynamic "${MILVUS_DIR}/proxy/..." -failfast -count=1 -ldflags="-r ${RPATH}" -go test -race -cover -tags dynamic "${MILVUS_DIR}/distributed/proxy/..." -failfast -count=1 -ldflags="-r ${RPATH}" +go test -race -cover -tags dynamic,test "${MILVUS_DIR}/proxy/..." -failfast -count=1 -ldflags="-r ${RPATH}" +go test -race -cover -tags dynamic,test "${MILVUS_DIR}/distributed/proxy/..." -failfast -count=1 -ldflags="-r ${RPATH}" } function test_querynode() { -go test -race -cover -tags dynamic "${MILVUS_DIR}/querynodev2/..." -failfast -count=1 -ldflags="-r ${RPATH}" -go test -race -cover -tags dynamic "${MILVUS_DIR}/distributed/querynode/..." -failfast -count=1 -ldflags="-r ${RPATH}" +go test -race -cover -tags dynamic,test "${MILVUS_DIR}/querynodev2/..." -failfast -count=1 -ldflags="-r ${RPATH}" +go test -race -cover -tags dynamic,test "${MILVUS_DIR}/distributed/querynode/..." -failfast -count=1 -ldflags="-r ${RPATH}" } function test_kv() { -go test -race -cover -tags dynamic "${MILVUS_DIR}/kv/..." -failfast -count=1 -ldflags="-r ${RPATH}" +go test -race -cover -tags dynamic,test "${MILVUS_DIR}/kv/..." -failfast -count=1 -ldflags="-r ${RPATH}" } function test_mq() { -go test -race -cover -tags dynamic $(go list "${MILVUS_DIR}/mq/..." | grep -v kafka) -failfast -count=1 -ldflags="-r ${RPATH}" +go test -race -cover -tags dynamic,test $(go list "${MILVUS_DIR}/mq/..." | grep -v kafka) -failfast -count=1 -ldflags="-r ${RPATH}" } function test_storage() { -go test -race -cover -tags dynamic "${MILVUS_DIR}/storage" -failfast -count=1 -ldflags="-r ${RPATH}" +go test -race -cover -tags dynamic,test "${MILVUS_DIR}/storage" -failfast -count=1 -ldflags="-r ${RPATH}" } function test_allocator() { -go test -race -cover -tags dynamic "${MILVUS_DIR}/allocator/..." -failfast -count=1 -ldflags="-r ${RPATH}" +go test -race -cover -tags dynamic,test "${MILVUS_DIR}/allocator/..." -failfast -count=1 -ldflags="-r ${RPATH}" } function test_tso() { -go test -race -cover -tags dynamic "${MILVUS_DIR}/tso/..." -failfast -count=1 -ldflags="-r ${RPATH}" +go test -race -cover -tags dynamic,test "${MILVUS_DIR}/tso/..." -failfast -count=1 -ldflags="-r ${RPATH}" } function test_util() { -go test -race -cover -tags dynamic "${MILVUS_DIR}/util/funcutil/..." -failfast -count=1 -ldflags="-r ${RPATH}" +go test -race -cover -tags dynamic,test "${MILVUS_DIR}/util/funcutil/..." -failfast -count=1 -ldflags="-r ${RPATH}" pushd pkg -go test -race -cover -tags dynamic "${PKG_DIR}/util/retry/..." -failfast -count=1 -ldflags="-r ${RPATH}" +go test -race -cover -tags dynamic,test "${PKG_DIR}/util/retry/..." -failfast -count=1 -ldflags="-r ${RPATH}" popd -go test -race -cover -tags dynamic "${MILVUS_DIR}/util/sessionutil/..." -failfast -count=1 -ldflags="-r ${RPATH}" -go test -race -cover -tags dynamic "${MILVUS_DIR}/util/typeutil/..." -failfast -count=1 -ldflags="-r ${RPATH}" -go test -race -cover -tags dynamic "${MILVUS_DIR}/util/importutilv2/..." -failfast -count=1 -ldflags="-r ${RPATH}" -go test -race -cover -tags dynamic "${MILVUS_DIR}/util/proxyutil/..." -failfast -count=1 -ldflags="-r ${RPATH}" -go test -race -cover -tags dynamic "${MILVUS_DIR}/util/initcore/..." -failfast -count=1 -ldflags="-r ${RPATH}" +go test -race -cover -tags dynamic,test "${MILVUS_DIR}/util/sessionutil/..." -failfast -count=1 -ldflags="-r ${RPATH}" +go test -race -cover -tags dynamic,test "${MILVUS_DIR}/util/typeutil/..." -failfast -count=1 -ldflags="-r ${RPATH}" +go test -race -cover -tags dynamic,test "${MILVUS_DIR}/util/importutilv2/..." -failfast -count=1 -ldflags="-r ${RPATH}" +go test -race -cover -tags dynamic,test "${MILVUS_DIR}/util/proxyutil/..." -failfast -count=1 -ldflags="-r ${RPATH}" +go test -race -cover -tags dynamic,test "${MILVUS_DIR}/util/initcore/..." -failfast -count=1 -ldflags="-r ${RPATH}" } function test_pkg() { pushd pkg -go test -race -cover -tags dynamic "${PKG_DIR}/common/..." -failfast -count=1 -ldflags="-r ${RPATH}" -go test -race -cover -tags dynamic "${PKG_DIR}/config/..." -failfast -count=1 -ldflags="-r ${RPATH}" -go test -race -cover -tags dynamic "${PKG_DIR}/log/..." -failfast -count=1 -ldflags="-r ${RPATH}" -go test -race -cover -tags dynamic "${PKG_DIR}/mq/..." -failfast -count=1 -ldflags="-r ${RPATH}" -go test -race -cover -tags dynamic "${PKG_DIR}/tracer/..." -failfast -count=1 -ldflags="-r ${RPATH}" -go test -race -cover -tags dynamic "${PKG_DIR}/util/..." -failfast -count=1 -ldflags="-r ${RPATH}" +go test -race -cover -tags dynamic,test "${PKG_DIR}/common/..." -failfast -count=1 -ldflags="-r ${RPATH}" +go test -race -cover -tags dynamic,test "${PKG_DIR}/config/..." -failfast -count=1 -ldflags="-r ${RPATH}" +go test -race -cover -tags dynamic,test "${PKG_DIR}/log/..." -failfast -count=1 -ldflags="-r ${RPATH}" +go test -race -cover -tags dynamic,test "${PKG_DIR}/mq/..." -failfast -count=1 -ldflags="-r ${RPATH}" +go test -race -cover -tags dynamic,test "${PKG_DIR}/tracer/..." -failfast -count=1 -ldflags="-r ${RPATH}" +go test -race -cover -tags dynamic,test "${PKG_DIR}/util/..." -failfast -count=1 -ldflags="-r ${RPATH}" popd } function test_datanode { -go test -race -cover -tags dynamic "${MILVUS_DIR}/datanode/..." -failfast -count=1 -ldflags="-r ${RPATH}" -go test -race -cover -tags dynamic "${MILVUS_DIR}/distributed/datanode/..." -failfast -count=1 -ldflags="-r ${RPATH}" +go test -race -cover -tags dynamic,test "${MILVUS_DIR}/datanode/..." -failfast -count=1 -ldflags="-r ${RPATH}" +go test -race -cover -tags dynamic,test "${MILVUS_DIR}/distributed/datanode/..." -failfast -count=1 -ldflags="-r ${RPATH}" } function test_indexnode() { -go test -race -cover -tags dynamic "${MILVUS_DIR}/indexnode/..." -failfast -count=1 -ldflags="-r ${RPATH}" +go test -race -cover -tags dynamic,test "${MILVUS_DIR}/indexnode/..." -failfast -count=1 -ldflags="-r ${RPATH}" } function test_rootcoord() { -go test -race -cover -tags dynamic "${MILVUS_DIR}/distributed/rootcoord/..." -failfast -count=1 -ldflags="-r ${RPATH}" -go test -race -cover -tags dynamic "${MILVUS_DIR}/rootcoord" -failfast -ldflags="-r ${RPATH}" +go test -race -cover -tags dynamic,test "${MILVUS_DIR}/distributed/rootcoord/..." -failfast -count=1 -ldflags="-r ${RPATH}" +go test -race -cover -tags dynamic,test "${MILVUS_DIR}/rootcoord" -failfast -ldflags="-r ${RPATH}" } function test_datacoord() { -go test -race -cover -tags dynamic "${MILVUS_DIR}/distributed/datacoord/..." -failfast -count=1 -ldflags="-r ${RPATH}" -go test -race -cover -tags dynamic "${MILVUS_DIR}/datacoord/..." -failfast -count=1 -ldflags="-r ${RPATH}" +go test -race -cover -tags dynamic,test "${MILVUS_DIR}/distributed/datacoord/..." -failfast -count=1 -ldflags="-r ${RPATH}" +go test -race -cover -tags dynamic,test "${MILVUS_DIR}/datacoord/..." -failfast -count=1 -ldflags="-r ${RPATH}" } function test_querycoord() { -go test -race -cover -tags dynamic "${MILVUS_DIR}/distributed/querycoord/..." -failfast -count=1 -ldflags="-r ${RPATH}" -go test -race -cover -tags dynamic "${MILVUS_DIR}/querycoordv2/..." -failfast -count=1 -ldflags="-r ${RPATH}" +go test -race -cover -tags dynamic,test "${MILVUS_DIR}/distributed/querycoord/..." -failfast -count=1 -ldflags="-r ${RPATH}" +go test -race -cover -tags dynamic,test "${MILVUS_DIR}/querycoordv2/..." -failfast -count=1 -ldflags="-r ${RPATH}" } #function test_indexcoord() #{ -#go test -race -cover -tags dynamic "${MILVUS_DIR}/indexcoord/..." -failfast +#go test -race -cover -tags dynamic,test "${MILVUS_DIR}/indexcoord/..." -failfast #} function test_metastore() { -go test -race -cover -tags dynamic "${MILVUS_DIR}/metastore/..." -failfast -count=1 -ldflags="-r ${RPATH}" +go test -race -cover -tags dynamic,test "${MILVUS_DIR}/metastore/..." -failfast -count=1 -ldflags="-r ${RPATH}" } function test_cmd() { -go test -race -cover -tags dynamic "${ROOT_DIR}/cmd/tools/..." -failfast -count=1 -ldflags="-r ${RPATH}" +go test -race -cover -tags dynamic,test "${ROOT_DIR}/cmd/tools/..." -failfast -count=1 -ldflags="-r ${RPATH}" } function test_all()