mirror of
https://gitee.com/milvus-io/milvus.git
synced 2026-02-02 01:06:41 +08:00
Cherry-pick from master pr: #33063 See also #33062 This PR: - Add lock.RWMutex & lock.Mutex alias to switch implementation based on build flags - When build flags has test in it, use go-deadlock to detect possible deadlocks - Replace all sync.RWMutex & sync.Mutex in datacoord pkg Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
This commit is contained in:
parent
99586066f5
commit
6b348e4e91
2
go.mod
2
go.mod
@ -69,7 +69,6 @@ require (
|
||||
github.com/milvus-io/milvus/pkg v0.0.0-00010101000000-000000000000
|
||||
github.com/pkg/errors v0.9.1
|
||||
github.com/valyala/fastjson v1.6.4
|
||||
google.golang.org/protobuf v1.33.0
|
||||
gopkg.in/yaml.v3 v3.0.1
|
||||
)
|
||||
|
||||
@ -235,6 +234,7 @@ require (
|
||||
google.golang.org/genproto v0.0.0-20230706204954-ccb25ca9f130 // indirect
|
||||
google.golang.org/genproto/googleapis/api v0.0.0-20230629202037-9506855d4529 // indirect
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20240401170217-c3f982113cda // indirect
|
||||
google.golang.org/protobuf v1.33.0 // indirect
|
||||
gopkg.in/inf.v0 v0.9.1 // indirect
|
||||
gopkg.in/ini.v1 v1.67.0 // indirect
|
||||
gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect
|
||||
|
||||
@ -19,7 +19,6 @@ package datacoord
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sync"
|
||||
|
||||
"github.com/golang/protobuf/proto"
|
||||
"go.uber.org/zap"
|
||||
@ -27,11 +26,12 @@ import (
|
||||
"github.com/milvus-io/milvus/internal/metastore"
|
||||
"github.com/milvus-io/milvus/internal/proto/indexpb"
|
||||
"github.com/milvus-io/milvus/pkg/log"
|
||||
"github.com/milvus-io/milvus/pkg/util/lock"
|
||||
"github.com/milvus-io/milvus/pkg/util/timerecord"
|
||||
)
|
||||
|
||||
type analyzeMeta struct {
|
||||
sync.RWMutex
|
||||
lock.RWMutex
|
||||
|
||||
ctx context.Context
|
||||
catalog metastore.DataCoordCatalog
|
||||
|
||||
@ -19,7 +19,6 @@ package datacoord
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/samber/lo"
|
||||
@ -33,13 +32,14 @@ import (
|
||||
"github.com/milvus-io/milvus/pkg/log"
|
||||
"github.com/milvus-io/milvus/pkg/mq/msgstream"
|
||||
"github.com/milvus-io/milvus/pkg/util/funcutil"
|
||||
"github.com/milvus-io/milvus/pkg/util/lock"
|
||||
"github.com/milvus-io/milvus/pkg/util/logutil"
|
||||
)
|
||||
|
||||
// ChannelManagerImpl manages the allocation and the balance between channels and data nodes.
|
||||
type ChannelManagerImpl struct {
|
||||
ctx context.Context
|
||||
mu sync.RWMutex
|
||||
mu lock.RWMutex
|
||||
h Handler
|
||||
store RWChannelStore
|
||||
factory ChannelPolicyFactory
|
||||
|
||||
@ -31,6 +31,7 @@ import (
|
||||
"github.com/milvus-io/milvus/pkg/kv"
|
||||
"github.com/milvus-io/milvus/pkg/log"
|
||||
"github.com/milvus-io/milvus/pkg/util/conc"
|
||||
"github.com/milvus-io/milvus/pkg/util/lock"
|
||||
"github.com/milvus-io/milvus/pkg/util/merr"
|
||||
"github.com/milvus-io/milvus/pkg/util/typeutil"
|
||||
)
|
||||
@ -62,7 +63,7 @@ type SubCluster interface {
|
||||
|
||||
type ChannelManagerImplV2 struct {
|
||||
cancel context.CancelFunc
|
||||
mu sync.RWMutex
|
||||
mu lock.RWMutex
|
||||
wg sync.WaitGroup
|
||||
|
||||
h Handler
|
||||
|
||||
@ -28,6 +28,7 @@ import (
|
||||
"github.com/milvus-io/milvus/internal/proto/datapb"
|
||||
"github.com/milvus-io/milvus/pkg/log"
|
||||
"github.com/milvus-io/milvus/pkg/util/commonpbutil"
|
||||
"github.com/milvus-io/milvus/pkg/util/lock"
|
||||
"github.com/milvus-io/milvus/pkg/util/paramtable"
|
||||
)
|
||||
|
||||
@ -182,7 +183,7 @@ func (c *ClusterImpl) DropImport(nodeID int64, in *datapb.DropImportRequest) err
|
||||
func (c *ClusterImpl) QuerySlots() map[int64]int64 {
|
||||
nodeIDs := c.sessionManager.GetSessionIDs()
|
||||
nodeSlots := make(map[int64]int64)
|
||||
mu := &sync.Mutex{}
|
||||
mu := &lock.Mutex{}
|
||||
wg := &sync.WaitGroup{}
|
||||
for _, nodeID := range nodeIDs {
|
||||
wg.Add(1)
|
||||
|
||||
@ -18,7 +18,6 @@ package datacoord
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
|
||||
"github.com/golang/protobuf/proto"
|
||||
"go.uber.org/zap"
|
||||
@ -26,11 +25,12 @@ import (
|
||||
"github.com/milvus-io/milvus/internal/metastore"
|
||||
"github.com/milvus-io/milvus/internal/proto/datapb"
|
||||
"github.com/milvus-io/milvus/pkg/log"
|
||||
"github.com/milvus-io/milvus/pkg/util/lock"
|
||||
"github.com/milvus-io/milvus/pkg/util/timerecord"
|
||||
)
|
||||
|
||||
type compactionTaskMeta struct {
|
||||
sync.RWMutex
|
||||
lock.RWMutex
|
||||
ctx context.Context
|
||||
catalog metastore.DataCoordCatalog
|
||||
// currently only clustering compaction task is stored in persist meta
|
||||
@ -39,7 +39,7 @@ type compactionTaskMeta struct {
|
||||
|
||||
func newCompactionTaskMeta(ctx context.Context, catalog metastore.DataCoordCatalog) (*compactionTaskMeta, error) {
|
||||
csm := &compactionTaskMeta{
|
||||
RWMutex: sync.RWMutex{},
|
||||
RWMutex: lock.RWMutex{},
|
||||
ctx: ctx,
|
||||
catalog: catalog,
|
||||
compactionTasks: make(map[int64]map[int64]*datapb.CompactionTask, 0),
|
||||
|
||||
@ -33,6 +33,7 @@ import (
|
||||
"github.com/milvus-io/milvus/internal/proto/datapb"
|
||||
"github.com/milvus-io/milvus/pkg/log"
|
||||
"github.com/milvus-io/milvus/pkg/util/indexparamcheck"
|
||||
"github.com/milvus-io/milvus/pkg/util/lock"
|
||||
"github.com/milvus-io/milvus/pkg/util/logutil"
|
||||
"github.com/milvus-io/milvus/pkg/util/paramtable"
|
||||
"github.com/milvus-io/milvus/pkg/util/tsoutil"
|
||||
@ -75,7 +76,7 @@ type compactionTrigger struct {
|
||||
signals chan *compactionSignal
|
||||
compactionHandler compactionPlanContext
|
||||
globalTrigger *time.Ticker
|
||||
forceMu sync.Mutex
|
||||
forceMu lock.Mutex
|
||||
quit chan struct{}
|
||||
wg sync.WaitGroup
|
||||
|
||||
|
||||
@ -25,7 +25,6 @@ import (
|
||||
"path"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
@ -51,6 +50,7 @@ import (
|
||||
"github.com/milvus-io/milvus/internal/storage"
|
||||
"github.com/milvus-io/milvus/pkg/common"
|
||||
"github.com/milvus-io/milvus/pkg/util/funcutil"
|
||||
"github.com/milvus-io/milvus/pkg/util/lock"
|
||||
"github.com/milvus-io/milvus/pkg/util/merr"
|
||||
"github.com/milvus-io/milvus/pkg/util/paramtable"
|
||||
)
|
||||
@ -361,7 +361,7 @@ func createMetaForRecycleUnusedIndexes(catalog metastore.DataCoordCatalog) *meta
|
||||
indexID = UniqueID(400)
|
||||
)
|
||||
return &meta{
|
||||
RWMutex: sync.RWMutex{},
|
||||
RWMutex: lock.RWMutex{},
|
||||
ctx: ctx,
|
||||
catalog: catalog,
|
||||
collections: nil,
|
||||
@ -476,7 +476,7 @@ func createMetaForRecycleUnusedSegIndexes(catalog metastore.DataCoordCatalog) *m
|
||||
},
|
||||
}
|
||||
meta := &meta{
|
||||
RWMutex: sync.RWMutex{},
|
||||
RWMutex: lock.RWMutex{},
|
||||
ctx: ctx,
|
||||
catalog: catalog,
|
||||
collections: nil,
|
||||
@ -641,7 +641,7 @@ func createMetaTableForRecycleUnusedIndexFiles(catalog *datacoord.Catalog) *meta
|
||||
},
|
||||
}
|
||||
meta := &meta{
|
||||
RWMutex: sync.RWMutex{},
|
||||
RWMutex: lock.RWMutex{},
|
||||
ctx: ctx,
|
||||
catalog: catalog,
|
||||
collections: nil,
|
||||
|
||||
@ -17,9 +17,8 @@
|
||||
package datacoord
|
||||
|
||||
import (
|
||||
"sync"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/metastore"
|
||||
"github.com/milvus-io/milvus/pkg/util/lock"
|
||||
)
|
||||
|
||||
type ImportMeta interface {
|
||||
@ -37,7 +36,7 @@ type ImportMeta interface {
|
||||
}
|
||||
|
||||
type importMeta struct {
|
||||
mu sync.RWMutex // guards jobs and tasks
|
||||
mu lock.RWMutex // guards jobs and tasks
|
||||
jobs map[int64]ImportJob
|
||||
tasks map[int64]ImportTask
|
||||
|
||||
|
||||
@ -31,6 +31,7 @@ import (
|
||||
"github.com/milvus-io/milvus/internal/proto/internalpb"
|
||||
"github.com/milvus-io/milvus/pkg/log"
|
||||
"github.com/milvus-io/milvus/pkg/metrics"
|
||||
"github.com/milvus-io/milvus/pkg/util/lock"
|
||||
)
|
||||
|
||||
const (
|
||||
@ -147,7 +148,7 @@ func (s *importScheduler) peekSlots() map[int64]int64 {
|
||||
return s.info.NodeID
|
||||
})
|
||||
nodeSlots := make(map[int64]int64)
|
||||
mu := &sync.Mutex{}
|
||||
mu := &lock.Mutex{}
|
||||
wg := &sync.WaitGroup{}
|
||||
for _, nodeID := range nodeIDs {
|
||||
wg.Add(1)
|
||||
|
||||
@ -2,12 +2,12 @@ package datacoord
|
||||
|
||||
import (
|
||||
"math"
|
||||
"sync"
|
||||
|
||||
"go.uber.org/zap"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/util/sessionutil"
|
||||
"github.com/milvus-io/milvus/pkg/log"
|
||||
"github.com/milvus-io/milvus/pkg/util/lock"
|
||||
)
|
||||
|
||||
type IndexEngineVersionManager interface {
|
||||
@ -21,7 +21,7 @@ type IndexEngineVersionManager interface {
|
||||
}
|
||||
|
||||
type versionManagerImpl struct {
|
||||
mu sync.Mutex
|
||||
mu lock.Mutex
|
||||
versions map[int64]sessionutil.IndexEngineVersion
|
||||
}
|
||||
|
||||
|
||||
@ -21,7 +21,6 @@ import (
|
||||
"context"
|
||||
"fmt"
|
||||
"strconv"
|
||||
"sync"
|
||||
|
||||
"github.com/golang/protobuf/proto"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
@ -35,12 +34,13 @@ import (
|
||||
"github.com/milvus-io/milvus/pkg/common"
|
||||
"github.com/milvus-io/milvus/pkg/log"
|
||||
"github.com/milvus-io/milvus/pkg/metrics"
|
||||
"github.com/milvus-io/milvus/pkg/util/lock"
|
||||
"github.com/milvus-io/milvus/pkg/util/timerecord"
|
||||
"github.com/milvus-io/milvus/pkg/util/typeutil"
|
||||
)
|
||||
|
||||
type indexMeta struct {
|
||||
sync.RWMutex
|
||||
lock.RWMutex
|
||||
ctx context.Context
|
||||
catalog metastore.DataCoordCatalog
|
||||
|
||||
|
||||
@ -19,7 +19,6 @@ package datacoord
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
"testing"
|
||||
|
||||
"github.com/cockroachdb/errors"
|
||||
@ -34,6 +33,7 @@ import (
|
||||
"github.com/milvus-io/milvus/internal/metastore/model"
|
||||
"github.com/milvus-io/milvus/internal/proto/indexpb"
|
||||
"github.com/milvus-io/milvus/pkg/common"
|
||||
"github.com/milvus-io/milvus/pkg/util/lock"
|
||||
)
|
||||
|
||||
func TestReloadFromKV(t *testing.T) {
|
||||
@ -264,7 +264,7 @@ func TestMeta_HasSameReq(t *testing.T) {
|
||||
|
||||
func newSegmentIndexMeta(catalog metastore.DataCoordCatalog) *indexMeta {
|
||||
return &indexMeta{
|
||||
RWMutex: sync.RWMutex{},
|
||||
RWMutex: lock.RWMutex{},
|
||||
ctx: context.Background(),
|
||||
catalog: catalog,
|
||||
indexes: make(map[UniqueID]map[UniqueID]*model.Index),
|
||||
|
||||
@ -28,6 +28,7 @@ import (
|
||||
"github.com/milvus-io/milvus/internal/types"
|
||||
"github.com/milvus-io/milvus/pkg/log"
|
||||
"github.com/milvus-io/milvus/pkg/metrics"
|
||||
"github.com/milvus-io/milvus/pkg/util/lock"
|
||||
"github.com/milvus-io/milvus/pkg/util/merr"
|
||||
)
|
||||
|
||||
@ -45,7 +46,7 @@ type WorkerManager interface {
|
||||
type IndexNodeManager struct {
|
||||
nodeClients map[UniqueID]types.IndexNodeClient
|
||||
stoppingNodes map[UniqueID]struct{}
|
||||
lock sync.RWMutex
|
||||
lock lock.RWMutex
|
||||
ctx context.Context
|
||||
indexNodeCreator indexNodeCreatorFunc
|
||||
}
|
||||
@ -55,7 +56,7 @@ func NewNodeManager(ctx context.Context, indexNodeCreator indexNodeCreatorFunc)
|
||||
return &IndexNodeManager{
|
||||
nodeClients: make(map[UniqueID]types.IndexNodeClient),
|
||||
stoppingNodes: make(map[UniqueID]struct{}),
|
||||
lock: sync.RWMutex{},
|
||||
lock: lock.RWMutex{},
|
||||
ctx: ctx,
|
||||
indexNodeCreator: indexNodeCreator,
|
||||
}
|
||||
@ -114,7 +115,7 @@ func (nm *IndexNodeManager) PickClient() (UniqueID, types.IndexNodeClient) {
|
||||
ctx, cancel := context.WithCancel(nm.ctx)
|
||||
var (
|
||||
pickNodeID = UniqueID(0)
|
||||
nodeMutex = sync.Mutex{}
|
||||
nodeMutex = lock.Mutex{}
|
||||
wg = sync.WaitGroup{}
|
||||
)
|
||||
|
||||
@ -170,7 +171,7 @@ func (nm *IndexNodeManager) ClientSupportDisk() bool {
|
||||
ctx, cancel := context.WithCancel(nm.ctx)
|
||||
var (
|
||||
enableDisk = false
|
||||
nodeMutex = sync.Mutex{}
|
||||
nodeMutex = lock.Mutex{}
|
||||
wg = sync.WaitGroup{}
|
||||
)
|
||||
|
||||
|
||||
@ -18,7 +18,6 @@ package datacoord
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
"testing"
|
||||
|
||||
"github.com/cockroachdb/errors"
|
||||
@ -28,6 +27,7 @@ import (
|
||||
"github.com/milvus-io/milvus/internal/mocks"
|
||||
"github.com/milvus-io/milvus/internal/proto/indexpb"
|
||||
"github.com/milvus-io/milvus/internal/types"
|
||||
"github.com/milvus-io/milvus/pkg/util/lock"
|
||||
"github.com/milvus-io/milvus/pkg/util/merr"
|
||||
)
|
||||
|
||||
@ -108,7 +108,7 @@ func TestIndexNodeManager_ClientSupportDisk(t *testing.T) {
|
||||
t.Run("support", func(t *testing.T) {
|
||||
nm := &IndexNodeManager{
|
||||
ctx: context.Background(),
|
||||
lock: sync.RWMutex{},
|
||||
lock: lock.RWMutex{},
|
||||
nodeClients: map[UniqueID]types.IndexNodeClient{
|
||||
1: getMockedGetJobStatsClient(&indexpb.GetJobStatsResponse{
|
||||
Status: merr.Success(),
|
||||
@ -126,7 +126,7 @@ func TestIndexNodeManager_ClientSupportDisk(t *testing.T) {
|
||||
t.Run("not support", func(t *testing.T) {
|
||||
nm := &IndexNodeManager{
|
||||
ctx: context.Background(),
|
||||
lock: sync.RWMutex{},
|
||||
lock: lock.RWMutex{},
|
||||
nodeClients: map[UniqueID]types.IndexNodeClient{
|
||||
1: getMockedGetJobStatsClient(&indexpb.GetJobStatsResponse{
|
||||
Status: merr.Success(),
|
||||
@ -144,7 +144,7 @@ func TestIndexNodeManager_ClientSupportDisk(t *testing.T) {
|
||||
t.Run("no indexnode", func(t *testing.T) {
|
||||
nm := &IndexNodeManager{
|
||||
ctx: context.Background(),
|
||||
lock: sync.RWMutex{},
|
||||
lock: lock.RWMutex{},
|
||||
nodeClients: map[UniqueID]types.IndexNodeClient{},
|
||||
}
|
||||
|
||||
@ -155,7 +155,7 @@ func TestIndexNodeManager_ClientSupportDisk(t *testing.T) {
|
||||
t.Run("error", func(t *testing.T) {
|
||||
nm := &IndexNodeManager{
|
||||
ctx: context.Background(),
|
||||
lock: sync.RWMutex{},
|
||||
lock: lock.RWMutex{},
|
||||
nodeClients: map[UniqueID]types.IndexNodeClient{
|
||||
1: getMockedGetJobStatsClient(nil, err),
|
||||
},
|
||||
@ -168,7 +168,7 @@ func TestIndexNodeManager_ClientSupportDisk(t *testing.T) {
|
||||
t.Run("fail reason", func(t *testing.T) {
|
||||
nm := &IndexNodeManager{
|
||||
ctx: context.Background(),
|
||||
lock: sync.RWMutex{},
|
||||
lock: lock.RWMutex{},
|
||||
nodeClients: map[UniqueID]types.IndexNodeClient{
|
||||
1: getMockedGetJobStatsClient(&indexpb.GetJobStatsResponse{
|
||||
Status: merr.Status(err),
|
||||
|
||||
@ -32,6 +32,7 @@ import (
|
||||
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
|
||||
"github.com/milvus-io/milvus/internal/proto/datapb"
|
||||
"github.com/milvus-io/milvus/pkg/log"
|
||||
"github.com/milvus-io/milvus/pkg/util/lock"
|
||||
"github.com/milvus-io/milvus/pkg/util/retry"
|
||||
"github.com/milvus-io/milvus/pkg/util/tsoutil"
|
||||
"github.com/milvus-io/milvus/pkg/util/typeutil"
|
||||
@ -109,7 +110,7 @@ var _ Manager = (*SegmentManager)(nil)
|
||||
// SegmentManager handles L1 segment related logic
|
||||
type SegmentManager struct {
|
||||
meta *meta
|
||||
mu sync.RWMutex
|
||||
mu lock.RWMutex
|
||||
allocator allocator
|
||||
helper allocHelper
|
||||
segments []UniqueID
|
||||
|
||||
@ -56,6 +56,7 @@ import (
|
||||
"github.com/milvus-io/milvus/pkg/log"
|
||||
"github.com/milvus-io/milvus/pkg/util/etcd"
|
||||
"github.com/milvus-io/milvus/pkg/util/funcutil"
|
||||
"github.com/milvus-io/milvus/pkg/util/lock"
|
||||
"github.com/milvus-io/milvus/pkg/util/merr"
|
||||
"github.com/milvus-io/milvus/pkg/util/metricsinfo"
|
||||
"github.com/milvus-io/milvus/pkg/util/paramtable"
|
||||
@ -3152,7 +3153,7 @@ func Test_CheckHealth(t *testing.T) {
|
||||
|
||||
sm := NewSessionManagerImpl()
|
||||
sm.sessions = struct {
|
||||
sync.RWMutex
|
||||
lock.RWMutex
|
||||
data map[int64]*Session
|
||||
}{data: map[int64]*Session{1: {
|
||||
client: client,
|
||||
|
||||
@ -19,11 +19,11 @@ package datacoord
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sync"
|
||||
|
||||
"github.com/cockroachdb/errors"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/types"
|
||||
"github.com/milvus-io/milvus/pkg/util/lock"
|
||||
)
|
||||
|
||||
var errDisposed = errors.New("client is disposed")
|
||||
@ -37,7 +37,7 @@ type NodeInfo struct {
|
||||
|
||||
// Session contains session info of a node
|
||||
type Session struct {
|
||||
sync.Mutex
|
||||
lock.Mutex
|
||||
info *NodeInfo
|
||||
client types.DataNodeClient
|
||||
clientCreator dataNodeCreatorFunc
|
||||
|
||||
@ -19,7 +19,6 @@ package datacoord
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/cockroachdb/errors"
|
||||
@ -35,6 +34,7 @@ import (
|
||||
"github.com/milvus-io/milvus/pkg/log"
|
||||
"github.com/milvus-io/milvus/pkg/metrics"
|
||||
"github.com/milvus-io/milvus/pkg/util/commonpbutil"
|
||||
"github.com/milvus-io/milvus/pkg/util/lock"
|
||||
"github.com/milvus-io/milvus/pkg/util/merr"
|
||||
"github.com/milvus-io/milvus/pkg/util/paramtable"
|
||||
"github.com/milvus-io/milvus/pkg/util/retry"
|
||||
@ -80,7 +80,7 @@ var _ SessionManager = (*SessionManagerImpl)(nil)
|
||||
// SessionManagerImpl provides the grpc interfaces of cluster
|
||||
type SessionManagerImpl struct {
|
||||
sessions struct {
|
||||
sync.RWMutex
|
||||
lock.RWMutex
|
||||
data map[int64]*Session
|
||||
}
|
||||
sessionCreator dataNodeCreatorFunc
|
||||
@ -103,7 +103,7 @@ func defaultSessionCreator() dataNodeCreatorFunc {
|
||||
func NewSessionManagerImpl(options ...SessionOpt) *SessionManagerImpl {
|
||||
m := &SessionManagerImpl{
|
||||
sessions: struct {
|
||||
sync.RWMutex
|
||||
lock.RWMutex
|
||||
data map[int64]*Session
|
||||
}{data: make(map[int64]*Session)},
|
||||
sessionCreator: defaultSessionCreator(),
|
||||
|
||||
@ -17,7 +17,6 @@
|
||||
package datacoord
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
|
||||
@ -29,6 +28,7 @@ import (
|
||||
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
|
||||
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
|
||||
"github.com/milvus-io/milvus/internal/proto/datapb"
|
||||
"github.com/milvus-io/milvus/pkg/util/lock"
|
||||
)
|
||||
|
||||
type SyncSegmentsSchedulerSuite struct {
|
||||
@ -259,7 +259,7 @@ func (s *SyncSegmentsSchedulerSuite) initParams() {
|
||||
},
|
||||
}
|
||||
s.m = &meta{
|
||||
RWMutex: sync.RWMutex{},
|
||||
RWMutex: lock.RWMutex{},
|
||||
collections: map[UniqueID]*collectionInfo{
|
||||
1: {
|
||||
ID: 1,
|
||||
|
||||
@ -27,6 +27,7 @@ import (
|
||||
"github.com/milvus-io/milvus/internal/proto/indexpb"
|
||||
"github.com/milvus-io/milvus/internal/storage"
|
||||
"github.com/milvus-io/milvus/pkg/log"
|
||||
"github.com/milvus-io/milvus/pkg/util/lock"
|
||||
)
|
||||
|
||||
const (
|
||||
@ -34,7 +35,7 @@ const (
|
||||
)
|
||||
|
||||
type taskScheduler struct {
|
||||
sync.RWMutex
|
||||
lock.RWMutex
|
||||
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
|
||||
@ -19,7 +19,6 @@ package datacoord
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
@ -38,6 +37,7 @@ import (
|
||||
"github.com/milvus-io/milvus/internal/proto/indexpb"
|
||||
"github.com/milvus-io/milvus/pkg/common"
|
||||
"github.com/milvus-io/milvus/pkg/util/indexparamcheck"
|
||||
"github.com/milvus-io/milvus/pkg/util/lock"
|
||||
"github.com/milvus-io/milvus/pkg/util/merr"
|
||||
"github.com/milvus-io/milvus/pkg/util/paramtable"
|
||||
)
|
||||
@ -946,7 +946,7 @@ func (s *taskSchedulerSuite) Test_analyzeTaskFailCase() {
|
||||
},
|
||||
},
|
||||
&indexMeta{
|
||||
RWMutex: sync.RWMutex{},
|
||||
RWMutex: lock.RWMutex{},
|
||||
ctx: ctx,
|
||||
catalog: catalog,
|
||||
})
|
||||
@ -990,7 +990,7 @@ func (s *taskSchedulerSuite) Test_analyzeTaskFailCase() {
|
||||
workerManager := NewMockWorkerManager(s.T())
|
||||
|
||||
mt := createMeta(catalog, s.createAnalyzeMeta(catalog), &indexMeta{
|
||||
RWMutex: sync.RWMutex{},
|
||||
RWMutex: lock.RWMutex{},
|
||||
ctx: ctx,
|
||||
catalog: catalog,
|
||||
})
|
||||
@ -1229,7 +1229,7 @@ func (s *taskSchedulerSuite) Test_indexTaskFailCase() {
|
||||
catalog: catalog,
|
||||
},
|
||||
&indexMeta{
|
||||
RWMutex: sync.RWMutex{},
|
||||
RWMutex: lock.RWMutex{},
|
||||
ctx: ctx,
|
||||
catalog: catalog,
|
||||
indexes: map[UniqueID]map[UniqueID]*model.Index{
|
||||
|
||||
@ -21,6 +21,7 @@ require (
|
||||
github.com/prometheus/client_golang v1.14.0
|
||||
github.com/quasilyte/go-ruleguard/dsl v0.3.22
|
||||
github.com/samber/lo v1.27.0
|
||||
github.com/sasha-s/go-deadlock v0.3.1
|
||||
github.com/shirou/gopsutil/v3 v3.22.9
|
||||
github.com/sirupsen/logrus v1.9.0
|
||||
github.com/spaolacci/murmur3 v1.1.0
|
||||
@ -123,6 +124,7 @@ require (
|
||||
github.com/opencontainers/runtime-spec v1.0.2 // indirect
|
||||
github.com/opentracing/opentracing-go v1.2.0 // indirect
|
||||
github.com/pelletier/go-toml v1.9.3 // indirect
|
||||
github.com/petermattis/goid v0.0.0-20180202154549-b0b1615b78e5 // indirect
|
||||
github.com/pierrec/lz4 v2.5.2+incompatible // indirect
|
||||
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c // indirect
|
||||
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 // indirect
|
||||
|
||||
@ -576,6 +576,8 @@ github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FI
|
||||
github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic=
|
||||
github.com/pelletier/go-toml v1.9.3 h1:zeC5b1GviRUyKYd6OJPvBU/mcVDVoL1OhT17FCt5dSQ=
|
||||
github.com/pelletier/go-toml v1.9.3/go.mod h1:u1nR/EPcESfeI/szUZKdtJ0xRNbUoANCkoOuaOx1Y+c=
|
||||
github.com/petermattis/goid v0.0.0-20180202154549-b0b1615b78e5 h1:q2e307iGHPdTGp0hoxKjt1H5pDo6utceo3dQVK3I5XQ=
|
||||
github.com/petermattis/goid v0.0.0-20180202154549-b0b1615b78e5/go.mod h1:jvVRKCrJTQWu0XVbaOlby/2lO20uSCHEMzzplHXte1o=
|
||||
github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
|
||||
github.com/pierrec/lz4 v2.5.2+incompatible h1:WCjObylUIOlKy/+7Abdn34TLIkXiA4UWUMhxq9m9ZXI=
|
||||
github.com/pierrec/lz4 v2.5.2+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
|
||||
@ -655,6 +657,8 @@ github.com/ryanuber/columnize v2.1.0+incompatible/go.mod h1:sm1tb6uqfes/u+d4ooFo
|
||||
github.com/samber/lo v1.27.0 h1:GOyDWxsblvqYobqsmUuMddPa2/mMzkKyojlXol4+LaQ=
|
||||
github.com/samber/lo v1.27.0/go.mod h1:it33p9UtPMS7z72fP4gw/EIfQB2eI8ke7GR2wc6+Rhg=
|
||||
github.com/santhosh-tekuri/jsonschema/v5 v5.0.0/go.mod h1:FKdcjfQW6rpZSnxxUvEA5H/cDPdvJ/SZJQLWWXWGrZ0=
|
||||
github.com/sasha-s/go-deadlock v0.3.1 h1:sqv7fDNShgjcaxkO0JNcOAlr8B9+cV5Ey/OB71efZx0=
|
||||
github.com/sasha-s/go-deadlock v0.3.1/go.mod h1:F73l+cr82YSh10GxyRI6qZiCgK64VaZjwesgfQ1/iLM=
|
||||
github.com/schollz/closestmatch v2.1.0+incompatible/go.mod h1:RtP1ddjLong6gTkbtmuhtR2uUrrJOpYzYRvbcPAid+g=
|
||||
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc=
|
||||
github.com/sergi/go-diff v1.0.0/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo=
|
||||
|
||||
@ -36,14 +36,14 @@ fi
|
||||
# starting the timer
|
||||
beginTime=`date +%s`
|
||||
for d in $(go list ./internal/... | grep -v -e vendor -e kafka -e planparserv2/generated -e mocks); do
|
||||
$TEST_CMD -race -tags dynamic -v -coverpkg=./... -coverprofile=profile.out -covermode=atomic "$d"
|
||||
$TEST_CMD -race -tags dynamic,test -v -coverpkg=./... -coverprofile=profile.out -covermode=atomic "$d"
|
||||
if [ -f profile.out ]; then
|
||||
grep -v kafka profile.out | grep -v planparserv2/generated | grep -v mocks | sed '1d' >> ${FILE_COVERAGE_INFO}
|
||||
rm profile.out
|
||||
fi
|
||||
done
|
||||
for d in $(go list ./cmd/tools/... | grep -v -e vendor -e kafka -e planparserv2/generated -e mocks); do
|
||||
$TEST_CMD -race -tags dynamic -v -coverpkg=./... -coverprofile=profile.out -covermode=atomic "$d"
|
||||
$TEST_CMD -race -tags dynamic,test -v -coverpkg=./... -coverprofile=profile.out -covermode=atomic "$d"
|
||||
if [ -f profile.out ]; then
|
||||
grep -v kafka profile.out | grep -v planparserv2/generated | grep -v mocks | sed '1d' >> ../${FILE_COVERAGE_INFO}
|
||||
rm profile.out
|
||||
@ -51,7 +51,7 @@ for d in $(go list ./cmd/tools/... | grep -v -e vendor -e kafka -e planparserv2/
|
||||
done
|
||||
pushd pkg
|
||||
for d in $(go list ./... | grep -v -e vendor -e kafka -e planparserv2/generated -e mocks); do
|
||||
$TEST_CMD -race -tags dynamic -v -coverpkg=./... -coverprofile=profile.out -covermode=atomic "$d"
|
||||
$TEST_CMD -race -tags dynamic,test -v -coverpkg=./... -coverprofile=profile.out -covermode=atomic "$d"
|
||||
if [ -f profile.out ]; then
|
||||
grep -v kafka profile.out | grep -v planparserv2/generated | grep -v mocks | sed '1d' >> ../${FILE_COVERAGE_INFO}
|
||||
rm profile.out
|
||||
@ -61,7 +61,7 @@ popd
|
||||
# milvusclient
|
||||
pushd client
|
||||
for d in $(go list ./... | grep -v -e vendor -e kafka -e planparserv2/generated -e mocks); do
|
||||
$TEST_CMD -race -tags dynamic -v -coverpkg=./... -coverprofile=profile.out -covermode=atomic "$d"
|
||||
$TEST_CMD -race -tags dynamic,test -v -coverpkg=./... -coverprofile=profile.out -covermode=atomic "$d"
|
||||
if [ -f profile.out ]; then
|
||||
grep -v kafka profile.out | grep -v planparserv2/generated | grep -v mocks | sed '1d' >> ../${FILE_COVERAGE_INFO}
|
||||
rm profile.out
|
||||
|
||||
@ -60,111 +60,111 @@ done
|
||||
|
||||
function test_proxy()
|
||||
{
|
||||
go test -race -cover -tags dynamic "${MILVUS_DIR}/proxy/..." -failfast -count=1 -ldflags="-r ${RPATH}"
|
||||
go test -race -cover -tags dynamic "${MILVUS_DIR}/distributed/proxy/..." -failfast -count=1 -ldflags="-r ${RPATH}"
|
||||
go test -race -cover -tags dynamic,test "${MILVUS_DIR}/proxy/..." -failfast -count=1 -ldflags="-r ${RPATH}"
|
||||
go test -race -cover -tags dynamic,test "${MILVUS_DIR}/distributed/proxy/..." -failfast -count=1 -ldflags="-r ${RPATH}"
|
||||
}
|
||||
|
||||
function test_querynode()
|
||||
{
|
||||
go test -race -cover -tags dynamic "${MILVUS_DIR}/querynodev2/..." -failfast -count=1 -ldflags="-r ${RPATH}"
|
||||
go test -race -cover -tags dynamic "${MILVUS_DIR}/distributed/querynode/..." -failfast -count=1 -ldflags="-r ${RPATH}"
|
||||
go test -race -cover -tags dynamic,test "${MILVUS_DIR}/querynodev2/..." -failfast -count=1 -ldflags="-r ${RPATH}"
|
||||
go test -race -cover -tags dynamic,test "${MILVUS_DIR}/distributed/querynode/..." -failfast -count=1 -ldflags="-r ${RPATH}"
|
||||
}
|
||||
|
||||
|
||||
function test_kv()
|
||||
{
|
||||
go test -race -cover -tags dynamic "${MILVUS_DIR}/kv/..." -failfast -count=1 -ldflags="-r ${RPATH}"
|
||||
go test -race -cover -tags dynamic,test "${MILVUS_DIR}/kv/..." -failfast -count=1 -ldflags="-r ${RPATH}"
|
||||
}
|
||||
|
||||
function test_mq()
|
||||
{
|
||||
go test -race -cover -tags dynamic $(go list "${MILVUS_DIR}/mq/..." | grep -v kafka) -failfast -count=1 -ldflags="-r ${RPATH}"
|
||||
go test -race -cover -tags dynamic,test $(go list "${MILVUS_DIR}/mq/..." | grep -v kafka) -failfast -count=1 -ldflags="-r ${RPATH}"
|
||||
}
|
||||
|
||||
function test_storage()
|
||||
{
|
||||
go test -race -cover -tags dynamic "${MILVUS_DIR}/storage" -failfast -count=1 -ldflags="-r ${RPATH}"
|
||||
go test -race -cover -tags dynamic,test "${MILVUS_DIR}/storage" -failfast -count=1 -ldflags="-r ${RPATH}"
|
||||
}
|
||||
|
||||
function test_allocator()
|
||||
{
|
||||
go test -race -cover -tags dynamic "${MILVUS_DIR}/allocator/..." -failfast -count=1 -ldflags="-r ${RPATH}"
|
||||
go test -race -cover -tags dynamic,test "${MILVUS_DIR}/allocator/..." -failfast -count=1 -ldflags="-r ${RPATH}"
|
||||
}
|
||||
|
||||
function test_tso()
|
||||
{
|
||||
go test -race -cover -tags dynamic "${MILVUS_DIR}/tso/..." -failfast -count=1 -ldflags="-r ${RPATH}"
|
||||
go test -race -cover -tags dynamic,test "${MILVUS_DIR}/tso/..." -failfast -count=1 -ldflags="-r ${RPATH}"
|
||||
}
|
||||
|
||||
function test_util()
|
||||
{
|
||||
go test -race -cover -tags dynamic "${MILVUS_DIR}/util/funcutil/..." -failfast -count=1 -ldflags="-r ${RPATH}"
|
||||
go test -race -cover -tags dynamic,test "${MILVUS_DIR}/util/funcutil/..." -failfast -count=1 -ldflags="-r ${RPATH}"
|
||||
pushd pkg
|
||||
go test -race -cover -tags dynamic "${PKG_DIR}/util/retry/..." -failfast -count=1 -ldflags="-r ${RPATH}"
|
||||
go test -race -cover -tags dynamic,test "${PKG_DIR}/util/retry/..." -failfast -count=1 -ldflags="-r ${RPATH}"
|
||||
popd
|
||||
go test -race -cover -tags dynamic "${MILVUS_DIR}/util/sessionutil/..." -failfast -count=1 -ldflags="-r ${RPATH}"
|
||||
go test -race -cover -tags dynamic "${MILVUS_DIR}/util/typeutil/..." -failfast -count=1 -ldflags="-r ${RPATH}"
|
||||
go test -race -cover -tags dynamic "${MILVUS_DIR}/util/importutilv2/..." -failfast -count=1 -ldflags="-r ${RPATH}"
|
||||
go test -race -cover -tags dynamic "${MILVUS_DIR}/util/proxyutil/..." -failfast -count=1 -ldflags="-r ${RPATH}"
|
||||
go test -race -cover -tags dynamic "${MILVUS_DIR}/util/initcore/..." -failfast -count=1 -ldflags="-r ${RPATH}"
|
||||
go test -race -cover -tags dynamic,test "${MILVUS_DIR}/util/sessionutil/..." -failfast -count=1 -ldflags="-r ${RPATH}"
|
||||
go test -race -cover -tags dynamic,test "${MILVUS_DIR}/util/typeutil/..." -failfast -count=1 -ldflags="-r ${RPATH}"
|
||||
go test -race -cover -tags dynamic,test "${MILVUS_DIR}/util/importutilv2/..." -failfast -count=1 -ldflags="-r ${RPATH}"
|
||||
go test -race -cover -tags dynamic,test "${MILVUS_DIR}/util/proxyutil/..." -failfast -count=1 -ldflags="-r ${RPATH}"
|
||||
go test -race -cover -tags dynamic,test "${MILVUS_DIR}/util/initcore/..." -failfast -count=1 -ldflags="-r ${RPATH}"
|
||||
}
|
||||
|
||||
function test_pkg()
|
||||
{
|
||||
pushd pkg
|
||||
go test -race -cover -tags dynamic "${PKG_DIR}/common/..." -failfast -count=1 -ldflags="-r ${RPATH}"
|
||||
go test -race -cover -tags dynamic "${PKG_DIR}/config/..." -failfast -count=1 -ldflags="-r ${RPATH}"
|
||||
go test -race -cover -tags dynamic "${PKG_DIR}/log/..." -failfast -count=1 -ldflags="-r ${RPATH}"
|
||||
go test -race -cover -tags dynamic "${PKG_DIR}/mq/..." -failfast -count=1 -ldflags="-r ${RPATH}"
|
||||
go test -race -cover -tags dynamic "${PKG_DIR}/tracer/..." -failfast -count=1 -ldflags="-r ${RPATH}"
|
||||
go test -race -cover -tags dynamic "${PKG_DIR}/util/..." -failfast -count=1 -ldflags="-r ${RPATH}"
|
||||
go test -race -cover -tags dynamic,test "${PKG_DIR}/common/..." -failfast -count=1 -ldflags="-r ${RPATH}"
|
||||
go test -race -cover -tags dynamic,test "${PKG_DIR}/config/..." -failfast -count=1 -ldflags="-r ${RPATH}"
|
||||
go test -race -cover -tags dynamic,test "${PKG_DIR}/log/..." -failfast -count=1 -ldflags="-r ${RPATH}"
|
||||
go test -race -cover -tags dynamic,test "${PKG_DIR}/mq/..." -failfast -count=1 -ldflags="-r ${RPATH}"
|
||||
go test -race -cover -tags dynamic,test "${PKG_DIR}/tracer/..." -failfast -count=1 -ldflags="-r ${RPATH}"
|
||||
go test -race -cover -tags dynamic,test "${PKG_DIR}/util/..." -failfast -count=1 -ldflags="-r ${RPATH}"
|
||||
popd
|
||||
}
|
||||
|
||||
function test_datanode
|
||||
{
|
||||
|
||||
go test -race -cover -tags dynamic "${MILVUS_DIR}/datanode/..." -failfast -count=1 -ldflags="-r ${RPATH}"
|
||||
go test -race -cover -tags dynamic "${MILVUS_DIR}/distributed/datanode/..." -failfast -count=1 -ldflags="-r ${RPATH}"
|
||||
go test -race -cover -tags dynamic,test "${MILVUS_DIR}/datanode/..." -failfast -count=1 -ldflags="-r ${RPATH}"
|
||||
go test -race -cover -tags dynamic,test "${MILVUS_DIR}/distributed/datanode/..." -failfast -count=1 -ldflags="-r ${RPATH}"
|
||||
|
||||
}
|
||||
|
||||
function test_indexnode()
|
||||
{
|
||||
go test -race -cover -tags dynamic "${MILVUS_DIR}/indexnode/..." -failfast -count=1 -ldflags="-r ${RPATH}"
|
||||
go test -race -cover -tags dynamic,test "${MILVUS_DIR}/indexnode/..." -failfast -count=1 -ldflags="-r ${RPATH}"
|
||||
}
|
||||
|
||||
function test_rootcoord()
|
||||
{
|
||||
go test -race -cover -tags dynamic "${MILVUS_DIR}/distributed/rootcoord/..." -failfast -count=1 -ldflags="-r ${RPATH}"
|
||||
go test -race -cover -tags dynamic "${MILVUS_DIR}/rootcoord" -failfast -ldflags="-r ${RPATH}"
|
||||
go test -race -cover -tags dynamic,test "${MILVUS_DIR}/distributed/rootcoord/..." -failfast -count=1 -ldflags="-r ${RPATH}"
|
||||
go test -race -cover -tags dynamic,test "${MILVUS_DIR}/rootcoord" -failfast -ldflags="-r ${RPATH}"
|
||||
}
|
||||
|
||||
function test_datacoord()
|
||||
{
|
||||
go test -race -cover -tags dynamic "${MILVUS_DIR}/distributed/datacoord/..." -failfast -count=1 -ldflags="-r ${RPATH}"
|
||||
go test -race -cover -tags dynamic "${MILVUS_DIR}/datacoord/..." -failfast -count=1 -ldflags="-r ${RPATH}"
|
||||
go test -race -cover -tags dynamic,test "${MILVUS_DIR}/distributed/datacoord/..." -failfast -count=1 -ldflags="-r ${RPATH}"
|
||||
go test -race -cover -tags dynamic,test "${MILVUS_DIR}/datacoord/..." -failfast -count=1 -ldflags="-r ${RPATH}"
|
||||
}
|
||||
|
||||
function test_querycoord()
|
||||
{
|
||||
go test -race -cover -tags dynamic "${MILVUS_DIR}/distributed/querycoord/..." -failfast -count=1 -ldflags="-r ${RPATH}"
|
||||
go test -race -cover -tags dynamic "${MILVUS_DIR}/querycoordv2/..." -failfast -count=1 -ldflags="-r ${RPATH}"
|
||||
go test -race -cover -tags dynamic,test "${MILVUS_DIR}/distributed/querycoord/..." -failfast -count=1 -ldflags="-r ${RPATH}"
|
||||
go test -race -cover -tags dynamic,test "${MILVUS_DIR}/querycoordv2/..." -failfast -count=1 -ldflags="-r ${RPATH}"
|
||||
}
|
||||
|
||||
#function test_indexcoord()
|
||||
#{
|
||||
#go test -race -cover -tags dynamic "${MILVUS_DIR}/indexcoord/..." -failfast
|
||||
#go test -race -cover -tags dynamic,test "${MILVUS_DIR}/indexcoord/..." -failfast
|
||||
#}
|
||||
|
||||
function test_metastore()
|
||||
{
|
||||
go test -race -cover -tags dynamic "${MILVUS_DIR}/metastore/..." -failfast -count=1 -ldflags="-r ${RPATH}"
|
||||
go test -race -cover -tags dynamic,test "${MILVUS_DIR}/metastore/..." -failfast -count=1 -ldflags="-r ${RPATH}"
|
||||
}
|
||||
|
||||
function test_cmd()
|
||||
{
|
||||
go test -race -cover -tags dynamic "${ROOT_DIR}/cmd/tools/..." -failfast -count=1 -ldflags="-r ${RPATH}"
|
||||
go test -race -cover -tags dynamic,test "${ROOT_DIR}/cmd/tools/..." -failfast -count=1 -ldflags="-r ${RPATH}"
|
||||
}
|
||||
|
||||
function test_all()
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user