diff --git a/go.mod b/go.mod index fea7099f84..fb12c41466 100644 --- a/go.mod +++ b/go.mod @@ -63,6 +63,7 @@ require ( github.com/bytedance/sonic v1.12.2 github.com/cenkalti/backoff/v4 v4.2.1 github.com/cockroachdb/redact v1.1.3 + github.com/goccy/go-json v0.10.3 github.com/greatroar/blobloom v0.0.0-00010101000000-000000000000 github.com/hashicorp/golang-lru/v2 v2.0.7 github.com/jolestar/go-commons-pool/v2 v2.1.2 @@ -131,7 +132,6 @@ require ( github.com/go-ole/go-ole v1.2.6 // indirect github.com/go-playground/locales v0.14.1 // indirect github.com/go-playground/universal-translator v0.18.1 // indirect - github.com/goccy/go-json v0.10.3 // indirect github.com/godbus/dbus v0.0.0-20190726142602-4481cbc300e2 // indirect github.com/godbus/dbus/v5 v5.0.4 // indirect github.com/gogo/protobuf v1.3.2 // indirect diff --git a/internal/datacoord/compaction_task_l0_test.go b/internal/datacoord/compaction_task_l0_test.go index 89f4787711..116d1f2597 100644 --- a/internal/datacoord/compaction_task_l0_test.go +++ b/internal/datacoord/compaction_task_l0_test.go @@ -50,7 +50,7 @@ func (s *L0CompactionTaskSuite) SetupTest() { s.mockMeta = NewMockCompactionMeta(s.T()) s.mockSessMgr = session.NewMockDataNodeManager(s.T()) s.mockAlloc = allocator.NewMockAllocator(s.T()) - //s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(nil) + // s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(nil) } func (s *L0CompactionTaskSuite) SetupSubTest() { diff --git a/internal/datacoord/compaction_task_meta.go b/internal/datacoord/compaction_task_meta.go index cdc536eb1d..b055186dd4 100644 --- a/internal/datacoord/compaction_task_meta.go +++ b/internal/datacoord/compaction_task_meta.go @@ -18,23 +18,43 @@ package datacoord import ( "context" + "encoding/json" "sync" + "time" + "github.com/hashicorp/golang-lru/v2/expirable" "go.uber.org/zap" "google.golang.org/protobuf/proto" "github.com/milvus-io/milvus/internal/metastore" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/util/metricsinfo" "github.com/milvus-io/milvus/pkg/util/timerecord" ) +func newCompactionTaskStats(task *datapb.CompactionTask) *metricsinfo.CompactionTask { + return &metricsinfo.CompactionTask{ + PlanID: task.PlanID, + CollectionID: task.CollectionID, + Type: task.Type.String(), + State: task.State.String(), + FailReason: task.FailReason, + StartTime: task.StartTime, + EndTime: task.EndTime, + TotalRows: task.TotalRows, + InputSegments: task.InputSegments, + ResultSegments: task.ResultSegments, + } +} + type compactionTaskMeta struct { sync.RWMutex ctx context.Context catalog metastore.DataCoordCatalog // currently only clustering compaction task is stored in persist meta compactionTasks map[int64]map[int64]*datapb.CompactionTask // triggerID -> planID + taskStats *expirable.LRU[UniqueID, *metricsinfo.CompactionTask] } func newCompactionTaskMeta(ctx context.Context, catalog metastore.DataCoordCatalog) (*compactionTaskMeta, error) { @@ -43,6 +63,7 @@ func newCompactionTaskMeta(ctx context.Context, catalog metastore.DataCoordCatal ctx: ctx, catalog: catalog, compactionTasks: make(map[int64]map[int64]*datapb.CompactionTask, 0), + taskStats: expirable.NewLRU[UniqueID, *metricsinfo.CompactionTask](1024, nil, time.Minute*60), } if err := csm.reloadFromKV(); err != nil { return nil, err @@ -125,16 +146,17 @@ func (csm *compactionTaskMeta) SaveCompactionTask(task *datapb.CompactionTask) e log.Error("meta update: update compaction task fail", zap.Error(err)) return err } 
- return csm.saveCompactionTaskMemory(task) + csm.saveCompactionTaskMemory(task) + return nil } -func (csm *compactionTaskMeta) saveCompactionTaskMemory(task *datapb.CompactionTask) error { +func (csm *compactionTaskMeta) saveCompactionTaskMemory(task *datapb.CompactionTask) { _, triggerIDExist := csm.compactionTasks[task.TriggerID] if !triggerIDExist { csm.compactionTasks[task.TriggerID] = make(map[int64]*datapb.CompactionTask, 0) } csm.compactionTasks[task.TriggerID][task.PlanID] = task - return nil + csm.taskStats.Add(task.PlanID, newCompactionTaskStats(task)) } func (csm *compactionTaskMeta) DropCompactionTask(task *datapb.CompactionTask) error { @@ -153,3 +175,16 @@ func (csm *compactionTaskMeta) DropCompactionTask(task *datapb.CompactionTask) e } return nil } + +func (csm *compactionTaskMeta) TaskStatsJSON() string { + tasks := csm.taskStats.Values() + if len(tasks) == 0 { + return "" + } + + ret, err := json.Marshal(tasks) + if err != nil { + return "" + } + return string(ret) +} diff --git a/internal/datacoord/compaction_task_meta_test.go b/internal/datacoord/compaction_task_meta_test.go index 90ee04882f..48050fb640 100644 --- a/internal/datacoord/compaction_task_meta_test.go +++ b/internal/datacoord/compaction_task_meta_test.go @@ -18,13 +18,16 @@ package datacoord import ( "context" + "encoding/json" "testing" + "time" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/suite" "github.com/milvus-io/milvus/internal/metastore/mocks" "github.com/milvus-io/milvus/internal/proto/datapb" + "github.com/milvus-io/milvus/pkg/util/metricsinfo" ) func TestCompactionTaskMetaSuite(t *testing.T) { @@ -79,3 +82,49 @@ func (suite *CompactionTaskMetaSuite) TestGetCompactionTasksByCollectionAbnormal res := suite.meta.GetCompactionTasksByCollection(101) suite.Equal(1, len(res)) } + +func (suite *CompactionTaskMetaSuite) TestTaskStatsJSON() { + task1 := &datapb.CompactionTask{ + PlanID: 1, + CollectionID: 100, + Type: datapb.CompactionType_MergeCompaction, + State: datapb.CompactionTaskState_completed, + FailReason: "", + StartTime: time.Now().Unix(), + EndTime: time.Now().Add(time.Hour).Unix(), + TotalRows: 1000, + InputSegments: []int64{1, 2}, + ResultSegments: []int64{3}, + } + task2 := &datapb.CompactionTask{ + PlanID: 2, + CollectionID: 101, + Type: datapb.CompactionType_MergeCompaction, + State: datapb.CompactionTaskState_completed, + FailReason: "", + StartTime: time.Now().Unix(), + EndTime: time.Now().Add(time.Hour).Unix(), + TotalRows: 2000, + InputSegments: []int64{4, 5}, + ResultSegments: []int64{6}, + } + + // testing return empty string + actualJSON := suite.meta.TaskStatsJSON() + suite.Equal("", actualJSON) + + err := suite.meta.SaveCompactionTask(task1) + suite.NoError(err) + err = suite.meta.SaveCompactionTask(task2) + suite.NoError(err) + + expectedTasks := []*metricsinfo.CompactionTask{ + newCompactionTaskStats(task1), + newCompactionTaskStats(task2), + } + expectedJSON, err := json.Marshal(expectedTasks) + suite.NoError(err) + + actualJSON = suite.meta.TaskStatsJSON() + suite.JSONEq(string(expectedJSON), actualJSON) +} diff --git a/internal/datacoord/compaction_test.go b/internal/datacoord/compaction_test.go index 009cf85517..ec345b4b75 100644 --- a/internal/datacoord/compaction_test.go +++ b/internal/datacoord/compaction_test.go @@ -133,7 +133,8 @@ func (s *CompactionPlanHandlerSuite) TestScheduleNodeWith1ParallelTask() { plans []*datapb.CompactionPlan expectedOut []UniqueID // planID }{ - {"with L0 tasks diff channel", + { + "with L0 tasks diff channel", 
[]CompactionTask{ newL0CompactionTask(&datapb.CompactionTask{ PlanID: 10, @@ -156,7 +157,8 @@ func (s *CompactionPlanHandlerSuite) TestScheduleNodeWith1ParallelTask() { }, []UniqueID{10, 11}, }, - {"with L0 tasks same channel", + { + "with L0 tasks same channel", []CompactionTask{ newMixCompactionTask(&datapb.CompactionTask{ PlanID: 11, @@ -179,7 +181,8 @@ func (s *CompactionPlanHandlerSuite) TestScheduleNodeWith1ParallelTask() { }, []UniqueID{10}, }, - {"without L0 tasks", + { + "without L0 tasks", []CompactionTask{ newMixCompactionTask(&datapb.CompactionTask{ PlanID: 14, @@ -202,10 +205,12 @@ func (s *CompactionPlanHandlerSuite) TestScheduleNodeWith1ParallelTask() { }, []UniqueID{13, 14}, }, - {"empty tasks", + { + "empty tasks", []CompactionTask{}, []*datapb.CompactionPlan{}, - []UniqueID{}}, + []UniqueID{}, + }, } for _, test := range tests { @@ -235,7 +240,8 @@ func (s *CompactionPlanHandlerSuite) TestScheduleNodeWithL0Executing() { plans []*datapb.CompactionPlan expectedOut []UniqueID // planID }{ - {"with L0 tasks diff channel", + { + "with L0 tasks diff channel", []CompactionTask{ newL0CompactionTask(&datapb.CompactionTask{ PlanID: 10, @@ -255,29 +261,31 @@ func (s *CompactionPlanHandlerSuite) TestScheduleNodeWithL0Executing() { []*datapb.CompactionPlan{{}, {}}, []UniqueID{10, 11}, }, - {"with L0 tasks same channel", []CompactionTask{ - newL0CompactionTask(&datapb.CompactionTask{ - PlanID: 10, - Type: datapb.CompactionType_Level0DeleteCompaction, - State: datapb.CompactionTaskState_pipelining, - Channel: "ch-11", - NodeID: 102, - }, nil, s.mockMeta, s.mockSessMgr), - newMixCompactionTask(&datapb.CompactionTask{ - PlanID: 11, - Type: datapb.CompactionType_MixCompaction, - State: datapb.CompactionTaskState_pipelining, - Channel: "ch-11", - NodeID: 102, - }, nil, s.mockMeta, s.mockSessMgr), - newMixCompactionTask(&datapb.CompactionTask{ - PlanID: 13, - Type: datapb.CompactionType_MixCompaction, - State: datapb.CompactionTaskState_pipelining, - Channel: "ch-3", - NodeID: 102, - }, nil, s.mockMeta, s.mockSessMgr), - }, + { + "with L0 tasks same channel", + []CompactionTask{ + newL0CompactionTask(&datapb.CompactionTask{ + PlanID: 10, + Type: datapb.CompactionType_Level0DeleteCompaction, + State: datapb.CompactionTaskState_pipelining, + Channel: "ch-11", + NodeID: 102, + }, nil, s.mockMeta, s.mockSessMgr), + newMixCompactionTask(&datapb.CompactionTask{ + PlanID: 11, + Type: datapb.CompactionType_MixCompaction, + State: datapb.CompactionTaskState_pipelining, + Channel: "ch-11", + NodeID: 102, + }, nil, s.mockMeta, s.mockSessMgr), + newMixCompactionTask(&datapb.CompactionTask{ + PlanID: 13, + Type: datapb.CompactionType_MixCompaction, + State: datapb.CompactionTaskState_pipelining, + Channel: "ch-3", + NodeID: 102, + }, nil, s.mockMeta, s.mockSessMgr), + }, []*datapb.CompactionPlan{ {PlanID: 10, Channel: "ch-3", Type: datapb.CompactionType_Level0DeleteCompaction}, {PlanID: 11, Channel: "ch-11", Type: datapb.CompactionType_MixCompaction}, @@ -285,29 +293,31 @@ func (s *CompactionPlanHandlerSuite) TestScheduleNodeWithL0Executing() { }, []UniqueID{10, 13}, }, - {"with multiple L0 tasks same channel", []CompactionTask{ - newL0CompactionTask(&datapb.CompactionTask{ - PlanID: 10, - Type: datapb.CompactionType_Level0DeleteCompaction, - State: datapb.CompactionTaskState_pipelining, - Channel: "ch-11", - NodeID: 102, - }, nil, s.mockMeta, s.mockSessMgr), - newL0CompactionTask(&datapb.CompactionTask{ - PlanID: 11, - Type: datapb.CompactionType_Level0DeleteCompaction, - State: 
datapb.CompactionTaskState_pipelining, - Channel: "ch-11", - NodeID: 102, - }, nil, s.mockMeta, s.mockSessMgr), - newL0CompactionTask(&datapb.CompactionTask{ - PlanID: 12, - Type: datapb.CompactionType_Level0DeleteCompaction, - State: datapb.CompactionTaskState_pipelining, - Channel: "ch-11", - NodeID: 102, - }, nil, s.mockMeta, s.mockSessMgr), - }, + { + "with multiple L0 tasks same channel", + []CompactionTask{ + newL0CompactionTask(&datapb.CompactionTask{ + PlanID: 10, + Type: datapb.CompactionType_Level0DeleteCompaction, + State: datapb.CompactionTaskState_pipelining, + Channel: "ch-11", + NodeID: 102, + }, nil, s.mockMeta, s.mockSessMgr), + newL0CompactionTask(&datapb.CompactionTask{ + PlanID: 11, + Type: datapb.CompactionType_Level0DeleteCompaction, + State: datapb.CompactionTaskState_pipelining, + Channel: "ch-11", + NodeID: 102, + }, nil, s.mockMeta, s.mockSessMgr), + newL0CompactionTask(&datapb.CompactionTask{ + PlanID: 12, + Type: datapb.CompactionType_Level0DeleteCompaction, + State: datapb.CompactionTaskState_pipelining, + Channel: "ch-11", + NodeID: 102, + }, nil, s.mockMeta, s.mockSessMgr), + }, []*datapb.CompactionPlan{ {PlanID: 10, Channel: "ch-3", Type: datapb.CompactionType_Level0DeleteCompaction}, {PlanID: 11, Channel: "ch-3", Type: datapb.CompactionType_Level0DeleteCompaction}, @@ -315,25 +325,28 @@ func (s *CompactionPlanHandlerSuite) TestScheduleNodeWithL0Executing() { }, []UniqueID{10, 11, 12}, }, - {"without L0 tasks", []CompactionTask{ - newMixCompactionTask(&datapb.CompactionTask{ - PlanID: 14, - Type: datapb.CompactionType_MixCompaction, - Channel: "ch-3", - NodeID: 102, - }, nil, s.mockMeta, s.mockSessMgr), - newMixCompactionTask(&datapb.CompactionTask{ - PlanID: 13, - Type: datapb.CompactionType_MixCompaction, - Channel: "ch-11", - NodeID: 102, - }, nil, s.mockMeta, s.mockSessMgr), - }, + { + "without L0 tasks", + []CompactionTask{ + newMixCompactionTask(&datapb.CompactionTask{ + PlanID: 14, + Type: datapb.CompactionType_MixCompaction, + Channel: "ch-3", + NodeID: 102, + }, nil, s.mockMeta, s.mockSessMgr), + newMixCompactionTask(&datapb.CompactionTask{ + PlanID: 13, + Type: datapb.CompactionType_MixCompaction, + Channel: "ch-11", + NodeID: 102, + }, nil, s.mockMeta, s.mockSessMgr), + }, []*datapb.CompactionPlan{ {PlanID: 14, Channel: "ch-3", Type: datapb.CompactionType_MixCompaction}, {}, }, - []UniqueID{13, 14}}, + []UniqueID{13, 14}, + }, {"empty tasks", []CompactionTask{}, []*datapb.CompactionPlan{}, []UniqueID{}}, } diff --git a/internal/datacoord/garbage_collector_test.go b/internal/datacoord/garbage_collector_test.go index 8e7d98ab36..bd444bd682 100644 --- a/internal/datacoord/garbage_collector_test.go +++ b/internal/datacoord/garbage_collector_test.go @@ -416,7 +416,7 @@ func createMetaForRecycleUnusedIndexes(catalog metastore.DataCoordCatalog) *meta }, }, }, - buildID2SegmentIndex: nil, + segmentBuildInfo: newSegmentIndexBuildInfo(), }, } } @@ -524,47 +524,47 @@ func createMetaForRecycleUnusedSegIndexes(catalog metastore.DataCoordCatalog) *m }, }, }, - indexes: map[UniqueID]map[UniqueID]*model.Index{}, - buildID2SegmentIndex: map[UniqueID]*model.SegmentIndex{ - buildID: { - SegmentID: segID, - CollectionID: collID, - PartitionID: partID, - NumRows: 1026, - IndexID: indexID, - BuildID: buildID, - NodeID: 1, - IndexVersion: 1, - IndexState: commonpb.IndexState_Finished, - FailReason: "", - IsDeleted: false, - CreateTime: 10, - IndexFileKeys: []string{"file1", "file2"}, - IndexSize: 0, - WriteHandoff: false, - }, - buildID + 1: { - SegmentID: segID + 1, - 
CollectionID: collID, - PartitionID: partID, - NumRows: 1026, - IndexID: indexID, - BuildID: buildID + 1, - NodeID: 1, - IndexVersion: 1, - IndexState: commonpb.IndexState_Finished, - FailReason: "", - IsDeleted: false, - CreateTime: 10, - IndexFileKeys: []string{"file1", "file2"}, - IndexSize: 0, - WriteHandoff: false, - }, - }, + indexes: map[UniqueID]map[UniqueID]*model.Index{}, + segmentBuildInfo: newSegmentIndexBuildInfo(), }, channelCPs: nil, chunkManager: nil, } + + meta.indexMeta.segmentBuildInfo.Add(&model.SegmentIndex{ + SegmentID: segID, + CollectionID: collID, + PartitionID: partID, + NumRows: 1026, + IndexID: indexID, + BuildID: buildID, + NodeID: 1, + IndexVersion: 1, + IndexState: commonpb.IndexState_Finished, + FailReason: "", + IsDeleted: false, + CreateTime: 10, + IndexFileKeys: []string{"file1", "file2"}, + IndexSize: 0, + }) + + meta.indexMeta.segmentBuildInfo.Add(&model.SegmentIndex{ + SegmentID: segID + 1, + CollectionID: collID, + PartitionID: partID, + NumRows: 1026, + IndexID: indexID, + BuildID: buildID + 1, + NodeID: 1, + IndexVersion: 1, + IndexState: commonpb.IndexState_Finished, + FailReason: "", + IsDeleted: false, + CreateTime: 10, + IndexFileKeys: []string{"file1", "file2"}, + IndexSize: 0, + }) + for id, segment := range segments { meta.segments.SetSegment(id, segment) } @@ -706,45 +706,43 @@ func createMetaTableForRecycleUnusedIndexFiles(catalog *datacoord.Catalog) *meta }, }, }, - buildID2SegmentIndex: map[UniqueID]*model.SegmentIndex{ - buildID: { - SegmentID: segID, - CollectionID: collID, - PartitionID: partID, - NumRows: 1026, - IndexID: indexID, - BuildID: buildID, - NodeID: 1, - IndexVersion: 1, - IndexState: commonpb.IndexState_Finished, - FailReason: "", - IsDeleted: false, - CreateTime: 10, - IndexFileKeys: []string{"file1", "file2"}, - IndexSize: 0, - WriteHandoff: false, - }, - buildID + 1: { - SegmentID: segID + 1, - CollectionID: collID, - PartitionID: partID, - NumRows: 1026, - IndexID: indexID, - BuildID: buildID + 1, - NodeID: 1, - IndexVersion: 1, - IndexState: commonpb.IndexState_InProgress, - FailReason: "", - IsDeleted: false, - CreateTime: 10, - IndexFileKeys: nil, - IndexSize: 0, - WriteHandoff: false, - }, - }, + segmentBuildInfo: newSegmentIndexBuildInfo(), }, } - + meta.indexMeta.segmentBuildInfo.Add(&model.SegmentIndex{ + SegmentID: segID, + CollectionID: collID, + PartitionID: partID, + NumRows: 1026, + IndexID: indexID, + BuildID: buildID, + NodeID: 1, + IndexVersion: 1, + IndexState: commonpb.IndexState_Finished, + FailReason: "", + IsDeleted: false, + CreateTime: 10, + IndexFileKeys: []string{"file1", "file2"}, + IndexSize: 0, + WriteHandoff: false, + }) + meta.indexMeta.segmentBuildInfo.Add(&model.SegmentIndex{ + SegmentID: segID + 1, + CollectionID: collID, + PartitionID: partID, + NumRows: 1026, + IndexID: indexID, + BuildID: buildID + 1, + NodeID: 1, + IndexVersion: 1, + IndexState: commonpb.IndexState_InProgress, + FailReason: "", + IsDeleted: false, + CreateTime: 10, + IndexFileKeys: nil, + IndexSize: 0, + WriteHandoff: false, + }) for id, segment := range segments { meta.segments.SetSegment(id, segment) } @@ -1091,43 +1089,7 @@ func TestGarbageCollector_clearETCD(t *testing.T) { }, }, }, - - buildID2SegmentIndex: map[UniqueID]*model.SegmentIndex{ - buildID: { - SegmentID: segID, - CollectionID: collID, - PartitionID: partID, - NumRows: 5000, - IndexID: indexID, - BuildID: buildID, - NodeID: 0, - IndexVersion: 1, - IndexState: commonpb.IndexState_Finished, - FailReason: "", - IsDeleted: false, - CreateTime: 0, - 
IndexFileKeys: []string{"file1", "file2"}, - IndexSize: 1024, - WriteHandoff: false, - }, - buildID + 1: { - SegmentID: segID + 1, - CollectionID: collID, - PartitionID: partID, - NumRows: 5000, - IndexID: indexID, - BuildID: buildID + 1, - NodeID: 0, - IndexVersion: 1, - IndexState: commonpb.IndexState_Finished, - FailReason: "", - IsDeleted: false, - CreateTime: 0, - IndexFileKeys: []string{"file3", "file4"}, - IndexSize: 1024, - WriteHandoff: false, - }, - }, + segmentBuildInfo: newSegmentIndexBuildInfo(), indexes: map[UniqueID]map[UniqueID]*model.Index{ collID: { indexID: { @@ -1175,6 +1137,42 @@ func TestGarbageCollector_clearETCD(t *testing.T) { }, } + m.indexMeta.segmentBuildInfo.Add(&model.SegmentIndex{ + SegmentID: segID, + CollectionID: collID, + PartitionID: partID, + NumRows: 5000, + IndexID: indexID, + BuildID: buildID, + NodeID: 0, + IndexVersion: 1, + IndexState: commonpb.IndexState_Finished, + FailReason: "", + IsDeleted: false, + CreateTime: 0, + IndexFileKeys: []string{"file1", "file2"}, + IndexSize: 1024, + WriteHandoff: false, + }) + + m.indexMeta.segmentBuildInfo.Add(&model.SegmentIndex{ + SegmentID: segID + 1, + CollectionID: collID, + PartitionID: partID, + NumRows: 5000, + IndexID: indexID, + BuildID: buildID + 1, + NodeID: 0, + IndexVersion: 1, + IndexState: commonpb.IndexState_Finished, + FailReason: "", + IsDeleted: false, + CreateTime: 0, + IndexFileKeys: []string{"file3", "file4"}, + IndexSize: 1024, + WriteHandoff: false, + }) + for id, segment := range segments { m.segments.SetSegment(id, segment) } diff --git a/internal/datacoord/import_meta.go b/internal/datacoord/import_meta.go index 606a33a9fa..da142157b5 100644 --- a/internal/datacoord/import_meta.go +++ b/internal/datacoord/import_meta.go @@ -17,6 +17,12 @@ package datacoord import ( + "encoding/json" + "time" + + "github.com/hashicorp/golang-lru/v2/expirable" + "golang.org/x/exp/maps" + "github.com/milvus-io/milvus/internal/metastore" "github.com/milvus-io/milvus/pkg/util/lock" "github.com/milvus-io/milvus/pkg/util/timerecord" @@ -35,13 +41,54 @@ type ImportMeta interface { GetTask(taskID int64) ImportTask GetTaskBy(filters ...ImportTaskFilter) []ImportTask RemoveTask(taskID int64) error + TaskStatsJSON() string +} + +type importTasks struct { + tasks map[int64]ImportTask + taskStats *expirable.LRU[int64, ImportTask] +} + +func newImportTasks() *importTasks { + return &importTasks{ + tasks: make(map[int64]ImportTask), + taskStats: expirable.NewLRU[UniqueID, ImportTask](4096, nil, time.Minute*60), + } +} + +func (t *importTasks) get(taskID int64) ImportTask { + ret, ok := t.tasks[taskID] + if !ok { + return nil + } + return ret +} + +func (t *importTasks) add(task ImportTask) { + t.tasks[task.GetTaskID()] = task + t.taskStats.Add(task.GetTaskID(), task) +} + +func (t *importTasks) remove(taskID int64) { + task, ok := t.tasks[taskID] + if ok { + delete(t.tasks, taskID) + t.taskStats.Add(task.GetTaskID(), task) + } +} + +func (t *importTasks) listTasks() []ImportTask { + return maps.Values(t.tasks) +} + +func (t *importTasks) listTaskStats() []ImportTask { + return t.taskStats.Values() } type importMeta struct { - mu lock.RWMutex // guards jobs and tasks - jobs map[int64]ImportJob - tasks map[int64]ImportTask - + mu lock.RWMutex // guards jobs and tasks + jobs map[int64]ImportJob + tasks *importTasks catalog metastore.DataCoordCatalog } @@ -59,18 +106,19 @@ func NewImportMeta(catalog metastore.DataCoordCatalog) (ImportMeta, error) { return nil, err } - tasks := make(map[int64]ImportTask) + tasks := 
newImportTasks() + for _, task := range restoredPreImportTasks { - tasks[task.GetTaskID()] = &preImportTask{ + tasks.add(&preImportTask{ PreImportTask: task, tr: timerecord.NewTimeRecorder("preimport task"), - } + }) } for _, task := range restoredImportTasks { - tasks[task.GetTaskID()] = &importTask{ + tasks.add(&importTask{ ImportTaskV2: task, tr: timerecord.NewTimeRecorder("import task"), - } + }) } jobs := make(map[int64]ImportJob) @@ -170,13 +218,13 @@ func (m *importMeta) AddTask(task ImportTask) error { if err != nil { return err } - m.tasks[task.GetTaskID()] = task + m.tasks.add(task) case ImportTaskType: err := m.catalog.SaveImportTask(task.(*importTask).ImportTaskV2) if err != nil { return err } - m.tasks[task.GetTaskID()] = task + m.tasks.add(task) } return nil } @@ -184,7 +232,7 @@ func (m *importMeta) AddTask(task ImportTask) error { func (m *importMeta) UpdateTask(taskID int64, actions ...UpdateAction) error { m.mu.Lock() defer m.mu.Unlock() - if task, ok := m.tasks[taskID]; ok { + if task := m.tasks.get(taskID); task != nil { updatedTask := task.Clone() for _, action := range actions { action(updatedTask) @@ -195,13 +243,13 @@ func (m *importMeta) UpdateTask(taskID int64, actions ...UpdateAction) error { if err != nil { return err } - m.tasks[updatedTask.GetTaskID()] = updatedTask + m.tasks.add(updatedTask) case ImportTaskType: err := m.catalog.SaveImportTask(updatedTask.(*importTask).ImportTaskV2) if err != nil { return err } - m.tasks[updatedTask.GetTaskID()] = updatedTask + m.tasks.add(updatedTask) } } @@ -211,7 +259,7 @@ func (m *importMeta) UpdateTask(taskID int64, actions ...UpdateAction) error { func (m *importMeta) GetTask(taskID int64) ImportTask { m.mu.RLock() defer m.mu.RUnlock() - return m.tasks[taskID] + return m.tasks.get(taskID) } func (m *importMeta) GetTaskBy(filters ...ImportTaskFilter) []ImportTask { @@ -219,7 +267,7 @@ func (m *importMeta) GetTaskBy(filters ...ImportTaskFilter) []ImportTask { defer m.mu.RUnlock() ret := make([]ImportTask, 0) OUTER: - for _, task := range m.tasks { + for _, task := range m.tasks.listTasks() { for _, f := range filters { if !f(task) { continue OUTER @@ -233,7 +281,7 @@ OUTER: func (m *importMeta) RemoveTask(taskID int64) error { m.mu.Lock() defer m.mu.Unlock() - if task, ok := m.tasks[taskID]; ok { + if task := m.tasks.get(taskID); task != nil { switch task.GetType() { case PreImportTaskType: err := m.catalog.DropPreImportTask(taskID) @@ -246,7 +294,20 @@ func (m *importMeta) RemoveTask(taskID int64) error { return err } } - delete(m.tasks, taskID) + m.tasks.remove(taskID) } return nil } + +func (m *importMeta) TaskStatsJSON() string { + tasks := m.tasks.listTaskStats() + if len(tasks) == 0 { + return "" + } + + ret, err := json.Marshal(tasks) + if err != nil { + return "" + } + return string(ret) +} diff --git a/internal/datacoord/import_meta_test.go b/internal/datacoord/import_meta_test.go index 667a9ab1ab..c61abbf69e 100644 --- a/internal/datacoord/import_meta_test.go +++ b/internal/datacoord/import_meta_test.go @@ -17,6 +17,7 @@ package datacoord import ( + "encoding/json" "fmt" "math/rand" "testing" @@ -28,6 +29,7 @@ import ( "github.com/milvus-io/milvus/internal/metastore/mocks" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/internalpb" + "github.com/milvus-io/milvus/pkg/util/metricsinfo" ) func TestImportMeta_Restore(t *testing.T) { @@ -231,9 +233,54 @@ func TestImportMeta_Task_Failed(t *testing.T) { err = im.AddTask(task) assert.Error(t, err) - 
im.(*importMeta).tasks[task.GetTaskID()] = task + im.(*importMeta).tasks.add(task) err = im.UpdateTask(task.GetTaskID(), UpdateNodeID(9)) assert.Error(t, err) err = im.RemoveTask(task.GetTaskID()) assert.Error(t, err) } + +func TestTaskStatsJSON(t *testing.T) { + catalog := mocks.NewDataCoordCatalog(t) + catalog.EXPECT().ListImportJobs().Return(nil, nil) + catalog.EXPECT().ListPreImportTasks().Return(nil, nil) + catalog.EXPECT().ListImportTasks().Return(nil, nil) + catalog.EXPECT().SaveImportTask(mock.Anything).Return(nil) + + im, err := NewImportMeta(catalog) + assert.NoError(t, err) + + statsJSON := im.TaskStatsJSON() + assert.Equal(t, "", statsJSON) + + task1 := &importTask{ + ImportTaskV2: &datapb.ImportTaskV2{ + TaskID: 1, + }, + } + err = im.AddTask(task1) + assert.NoError(t, err) + + task2 := &importTask{ + ImportTaskV2: &datapb.ImportTaskV2{ + TaskID: 2, + }, + } + err = im.AddTask(task2) + assert.NoError(t, err) + + err = im.UpdateTask(1, UpdateState(datapb.ImportTaskStateV2_Completed)) + assert.NoError(t, err) + + statsJSON = im.TaskStatsJSON() + var tasks []*metricsinfo.ImportTask + err = json.Unmarshal([]byte(statsJSON), &tasks) + assert.NoError(t, err) + assert.Equal(t, 2, len(tasks)) + + taskMeta := im.(*importMeta).tasks + taskMeta.remove(1) + assert.Nil(t, taskMeta.get(1)) + assert.NotNil(t, taskMeta.get(2)) + assert.Equal(t, 2, len(taskMeta.listTaskStats())) +} diff --git a/internal/datacoord/import_task.go b/internal/datacoord/import_task.go index b615e635ba..fb2e59422a 100644 --- a/internal/datacoord/import_task.go +++ b/internal/datacoord/import_task.go @@ -17,10 +17,13 @@ package datacoord import ( + "encoding/json" + "google.golang.org/protobuf/proto" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/pkg/util/funcutil" + "github.com/milvus-io/milvus/pkg/util/metricsinfo" "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/timerecord" ) @@ -92,8 +95,11 @@ func UpdateReason(reason string) UpdateAction { func UpdateCompleteTime(completeTime string) UpdateAction { return func(t ImportTask) { - if task, ok := t.(*importTask); ok { - task.ImportTaskV2.CompleteTime = completeTime + switch t.GetType() { + case PreImportTaskType: + t.(*preImportTask).PreImportTask.CompleteTime = completeTime + case ImportTaskType: + t.(*importTask).ImportTaskV2.CompleteTime = completeTime } } } @@ -171,6 +177,21 @@ func (p *preImportTask) Clone() ImportTask { } } +func (p *preImportTask) MarshalJSON() ([]byte, error) { + importTask := metricsinfo.ImportTask{ + JobID: p.GetJobID(), + TaskID: p.GetTaskID(), + CollectionID: p.GetCollectionID(), + NodeID: p.GetNodeID(), + State: p.GetState().String(), + Reason: p.GetReason(), + TaskType: "PreImportTask", + CreatedTime: p.GetCreatedTime(), + CompleteTime: p.GetCompleteTime(), + } + return json.Marshal(importTask) +} + type importTask struct { *datapb.ImportTaskV2 tr *timerecord.TimeRecorder @@ -201,3 +222,18 @@ func (t *importTask) Clone() ImportTask { tr: t.tr, } } + +func (t *importTask) MarshalJSON() ([]byte, error) { + importTask := metricsinfo.ImportTask{ + JobID: t.GetJobID(), + TaskID: t.GetTaskID(), + CollectionID: t.GetCollectionID(), + NodeID: t.GetNodeID(), + State: t.GetState().String(), + Reason: t.GetReason(), + TaskType: "ImportTask", + CreatedTime: t.GetCreatedTime(), + CompleteTime: t.GetCompleteTime(), + } + return json.Marshal(importTask) +} diff --git a/internal/datacoord/import_util.go b/internal/datacoord/import_util.go index 1c6a1c83d5..37990205a5 100644 --- 
a/internal/datacoord/import_util.go +++ b/internal/datacoord/import_util.go @@ -76,6 +76,7 @@ func NewPreImportTasks(fileGroups [][]*internalpb.ImportFile, CollectionID: job.GetCollectionID(), State: datapb.ImportTaskStateV2_Pending, FileStats: fileStats, + CreatedTime: time.Now().Format("2006-01-02T15:04:05Z07:00"), }, tr: timerecord.NewTimeRecorder("preimport task"), } @@ -101,6 +102,7 @@ func NewImportTasks(fileGroups [][]*datapb.ImportFileStats, NodeID: NullNodeID, State: datapb.ImportTaskStateV2_Pending, FileStats: group, + CreatedTime: time.Now().Format("2006-01-02T15:04:05Z07:00"), }, tr: timerecord.NewTimeRecorder("import task"), } diff --git a/internal/datacoord/import_util_test.go b/internal/datacoord/import_util_test.go index ecd6a53d4b..51ed9ddfcc 100644 --- a/internal/datacoord/import_util_test.go +++ b/internal/datacoord/import_util_test.go @@ -18,10 +18,12 @@ package datacoord import ( "context" + "encoding/json" "fmt" "math/rand" "path" "testing" + "time" "github.com/cockroachdb/errors" "github.com/samber/lo" @@ -39,7 +41,9 @@ import ( "github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/internal/util/importutilv2" "github.com/milvus-io/milvus/pkg/util/merr" + "github.com/milvus-io/milvus/pkg/util/metricsinfo" "github.com/milvus-io/milvus/pkg/util/paramtable" + "github.com/milvus-io/milvus/pkg/util/timerecord" ) func TestImportUtil_NewPreImportTasks(t *testing.T) { @@ -645,3 +649,69 @@ func TestImportUtil_GetImportProgress(t *testing.T) { assert.Equal(t, internalpb.ImportJobState_Completed, state) assert.Equal(t, "", reason) } + +func TestPreImportTask_MarshalJSON(t *testing.T) { + task := &preImportTask{ + PreImportTask: &datapb.PreImportTask{ + JobID: 1, + TaskID: 2, + CollectionID: 3, + NodeID: 4, + State: datapb.ImportTaskStateV2_Pending, + Reason: "test reason", + CreatedTime: time.Now().Format(time.RFC3339), + CompleteTime: time.Now().Add(time.Hour).Format(time.RFC3339), + }, + tr: timerecord.NewTimeRecorder("test"), + } + + jsonData, err := task.MarshalJSON() + assert.NoError(t, err) + + var importTask metricsinfo.ImportTask + err = json.Unmarshal(jsonData, &importTask) + assert.NoError(t, err) + + assert.Equal(t, task.GetJobID(), importTask.JobID) + assert.Equal(t, task.GetTaskID(), importTask.TaskID) + assert.Equal(t, task.GetCollectionID(), importTask.CollectionID) + assert.Equal(t, task.GetNodeID(), importTask.NodeID) + assert.Equal(t, task.GetState().String(), importTask.State) + assert.Equal(t, task.GetReason(), importTask.Reason) + assert.Equal(t, "PreImportTask", importTask.TaskType) + assert.Equal(t, task.GetCreatedTime(), importTask.CreatedTime) + assert.Equal(t, task.GetCompleteTime(), importTask.CompleteTime) +} + +func TestImportTask_MarshalJSON(t *testing.T) { + task := &importTask{ + ImportTaskV2: &datapb.ImportTaskV2{ + JobID: 1, + TaskID: 2, + CollectionID: 3, + NodeID: 4, + State: datapb.ImportTaskStateV2_Pending, + Reason: "test reason", + CreatedTime: time.Now().Format(time.RFC3339), + CompleteTime: time.Now().Add(time.Hour).Format(time.RFC3339), + }, + tr: timerecord.NewTimeRecorder("test"), + } + + jsonData, err := task.MarshalJSON() + assert.NoError(t, err) + + var importTask metricsinfo.ImportTask + err = json.Unmarshal(jsonData, &importTask) + assert.NoError(t, err) + + assert.Equal(t, task.GetJobID(), importTask.JobID) + assert.Equal(t, task.GetTaskID(), importTask.TaskID) + assert.Equal(t, task.GetCollectionID(), importTask.CollectionID) + assert.Equal(t, task.GetNodeID(), importTask.NodeID) + assert.Equal(t, 
task.GetState().String(), importTask.State) + assert.Equal(t, task.GetReason(), importTask.Reason) + assert.Equal(t, "ImportTask", importTask.TaskType) + assert.Equal(t, task.GetCreatedTime(), importTask.CreatedTime) + assert.Equal(t, task.GetCompleteTime(), importTask.CompleteTime) +} diff --git a/internal/datacoord/index_meta.go b/internal/datacoord/index_meta.go index 6cf65b6fda..cf8eb7d42d 100644 --- a/internal/datacoord/index_meta.go +++ b/internal/datacoord/index_meta.go @@ -19,11 +19,14 @@ package datacoord import ( "context" + "encoding/json" "fmt" "strconv" "sync" + "time" "github.com/golang/protobuf/proto" + "github.com/hashicorp/golang-lru/v2/expirable" "github.com/prometheus/client_golang/prometheus" "github.com/samber/lo" "go.uber.org/zap" @@ -52,22 +55,87 @@ type indexMeta struct { // collectionIndexes records which indexes are on the collection // collID -> indexID -> index indexes map[UniqueID]map[UniqueID]*model.Index - // buildID2Meta records the meta information of the segment - // buildID -> segmentIndex - buildID2SegmentIndex map[UniqueID]*model.SegmentIndex + + // buildID2Meta records building index meta information of the segment + segmentBuildInfo *segmentBuildInfo // segmentID -> indexID -> segmentIndex segmentIndexes map[UniqueID]map[UniqueID]*model.SegmentIndex } +type indexTaskStats struct { + IndexID UniqueID `json:"index_id,omitempty"` + CollectionID UniqueID `json:"collection_id,omitempty"` + SegmentID UniqueID `json:"segment_id,omitempty"` + BuildID UniqueID `json:"build_id,omitempty"` + IndexState string `json:"index_state,omitempty"` + FailReason string `json:"fail_reason,omitempty"` + IndexSize uint64 `json:"index_size,omitempty"` + IndexVersion int64 `json:"index_version,omitempty"` + CreateTime uint64 `json:"create_time,omitempty"` +} + +func newIndexTaskStats(s *model.SegmentIndex) *indexTaskStats { + return &indexTaskStats{ + IndexID: s.IndexID, + CollectionID: s.CollectionID, + SegmentID: s.SegmentID, + BuildID: s.BuildID, + IndexState: s.IndexState.String(), + FailReason: s.FailReason, + IndexSize: s.IndexSize, + IndexVersion: s.IndexVersion, + CreateTime: s.CreateTime, + } +} + +type segmentBuildInfo struct { + // buildID2Meta records the meta information of the segment + // buildID -> segmentIndex + buildID2SegmentIndex map[UniqueID]*model.SegmentIndex + // taskStats records the task stats of the segment + taskStats *expirable.LRU[UniqueID, *indexTaskStats] +} + +func newSegmentIndexBuildInfo() *segmentBuildInfo { + return &segmentBuildInfo{ + // build ID -> segment index + buildID2SegmentIndex: make(map[UniqueID]*model.SegmentIndex), + // build ID -> task stats + taskStats: expirable.NewLRU[UniqueID, *indexTaskStats](1024, nil, time.Minute*60), + } +} + +func (m *segmentBuildInfo) Add(segIdx *model.SegmentIndex) { + m.buildID2SegmentIndex[segIdx.BuildID] = segIdx + m.taskStats.Add(segIdx.BuildID, newIndexTaskStats(segIdx)) +} + +func (m *segmentBuildInfo) Get(key UniqueID) (*model.SegmentIndex, bool) { + value, exists := m.buildID2SegmentIndex[key] + return value, exists +} + +func (m *segmentBuildInfo) Remove(key UniqueID) { + delete(m.buildID2SegmentIndex, key) +} + +func (m *segmentBuildInfo) List() map[UniqueID]*model.SegmentIndex { + return m.buildID2SegmentIndex +} + +func (m *segmentBuildInfo) GetTaskStats() []*indexTaskStats { + return m.taskStats.Values() +} + // NewMeta creates meta from provided `kv.TxnKV` func newIndexMeta(ctx context.Context, catalog metastore.DataCoordCatalog) (*indexMeta, error) { mt := &indexMeta{ - ctx: ctx, - 
catalog: catalog, - indexes: make(map[UniqueID]map[UniqueID]*model.Index), - buildID2SegmentIndex: make(map[UniqueID]*model.SegmentIndex), - segmentIndexes: make(map[UniqueID]map[UniqueID]*model.SegmentIndex), + ctx: ctx, + catalog: catalog, + indexes: make(map[UniqueID]map[UniqueID]*model.Index), + segmentBuildInfo: newSegmentIndexBuildInfo(), + segmentIndexes: make(map[UniqueID]map[UniqueID]*model.SegmentIndex), } err := mt.reloadFromKV() if err != nil { @@ -116,7 +184,7 @@ func (m *indexMeta) updateSegmentIndex(segIdx *model.SegmentIndex) { m.segmentIndexes[segIdx.SegmentID] = make(map[UniqueID]*model.SegmentIndex) m.segmentIndexes[segIdx.SegmentID][segIdx.IndexID] = segIdx } - m.buildID2SegmentIndex[segIdx.BuildID] = segIdx + m.segmentBuildInfo.Add(segIdx) } func (m *indexMeta) alterSegmentIndexes(segIdxes []*model.SegmentIndex) error { @@ -142,7 +210,7 @@ func (m *indexMeta) updateSegIndexMeta(segIdx *model.SegmentIndex, updateFunc fu func (m *indexMeta) updateIndexTasksMetrics() { taskMetrics := make(map[UniqueID]map[commonpb.IndexState]int) - for _, segIdx := range m.buildID2SegmentIndex { + for _, segIdx := range m.segmentBuildInfo.List() { if segIdx.IsDeleted { continue } @@ -674,7 +742,7 @@ func (m *indexMeta) GetIndexJob(buildID UniqueID) (*model.SegmentIndex, bool) { m.RLock() defer m.RUnlock() - segIdx, ok := m.buildID2SegmentIndex[buildID] + segIdx, ok := m.segmentBuildInfo.Get(buildID) if ok { return model.CloneSegmentIndex(segIdx), true } @@ -703,7 +771,7 @@ func (m *indexMeta) UpdateVersion(buildID, nodeID UniqueID) error { defer m.Unlock() log.Info("IndexCoord metaTable UpdateVersion receive", zap.Int64("buildID", buildID), zap.Int64("nodeID", nodeID)) - segIdx, ok := m.buildID2SegmentIndex[buildID] + segIdx, ok := m.segmentBuildInfo.Get(buildID) if !ok { return fmt.Errorf("there is no index with buildID: %d", buildID) } @@ -721,7 +789,7 @@ func (m *indexMeta) FinishTask(taskInfo *workerpb.IndexTaskInfo) error { m.Lock() defer m.Unlock() - segIdx, ok := m.buildID2SegmentIndex[taskInfo.GetBuildID()] + segIdx, ok := m.segmentBuildInfo.Get(taskInfo.GetBuildID()) if !ok { log.Warn("there is no index with buildID", zap.Int64("buildID", taskInfo.GetBuildID())) return nil @@ -752,7 +820,7 @@ func (m *indexMeta) DeleteTask(buildID int64) error { m.Lock() defer m.Unlock() - segIdx, ok := m.buildID2SegmentIndex[buildID] + segIdx, ok := m.segmentBuildInfo.Get(buildID) if !ok { log.Warn("there is no index with buildID", zap.Int64("buildID", buildID)) return nil @@ -777,7 +845,7 @@ func (m *indexMeta) BuildIndex(buildID UniqueID) error { m.Lock() defer m.Unlock() - segIdx, ok := m.buildID2SegmentIndex[buildID] + segIdx, ok := m.segmentBuildInfo.Get(buildID) if !ok { return fmt.Errorf("there is no index with buildID: %d", buildID) } @@ -806,8 +874,9 @@ func (m *indexMeta) GetAllSegIndexes() map[int64]*model.SegmentIndex { m.RLock() defer m.RUnlock() - segIndexes := make(map[int64]*model.SegmentIndex, len(m.buildID2SegmentIndex)) - for buildID, segIndex := range m.buildID2SegmentIndex { + tasks := m.segmentBuildInfo.List() + segIndexes := make(map[int64]*model.SegmentIndex, len(tasks)) + for buildID, segIndex := range tasks { segIndexes[buildID] = model.CloneSegmentIndex(segIndex) } return segIndexes @@ -821,7 +890,7 @@ func (m *indexMeta) SetStoredIndexFileSizeMetric(collections map[UniqueID]*colle var total uint64 metrics.DataCoordStoredIndexFilesSize.Reset() - for _, segmentIdx := range m.buildID2SegmentIndex { + for _, segmentIdx := range m.segmentBuildInfo.List() { coll, ok := 
collections[segmentIdx.CollectionID] if ok { metrics.DataCoordStoredIndexFilesSize.WithLabelValues(coll.DatabaseName, coll.Schema.GetName(), @@ -849,7 +918,7 @@ func (m *indexMeta) RemoveSegmentIndex(collID, partID, segID, indexID, buildID U delete(m.segmentIndexes, segID) } - delete(m.buildID2SegmentIndex, buildID) + m.segmentBuildInfo.Remove(buildID) m.updateIndexTasksMetrics() return nil } @@ -896,7 +965,7 @@ func (m *indexMeta) CheckCleanSegmentIndex(buildID UniqueID) (bool, *model.Segme m.RLock() defer m.RUnlock() - if segIndex, ok := m.buildID2SegmentIndex[buildID]; ok { + if segIndex, ok := m.segmentBuildInfo.Get(buildID); ok { if segIndex.IndexState == commonpb.IndexState_Finished { return true, model.CloneSegmentIndex(segIndex) } @@ -910,7 +979,7 @@ func (m *indexMeta) GetMetasByNodeID(nodeID UniqueID) []*model.SegmentIndex { defer m.RUnlock() metas := make([]*model.SegmentIndex, 0) - for _, segIndex := range m.buildID2SegmentIndex { + for _, segIndex := range m.segmentBuildInfo.List() { if segIndex.IsDeleted { continue } @@ -1003,3 +1072,16 @@ func (m *indexMeta) HasIndex(collectionID int64) bool { } return false } + +func (m *indexMeta) TaskStatsJSON() string { + tasks := m.segmentBuildInfo.GetTaskStats() + if len(tasks) == 0 { + return "" + } + + ret, err := json.Marshal(tasks) + if err != nil { + return "" + } + return string(ret) +} diff --git a/internal/datacoord/index_meta_test.go b/internal/datacoord/index_meta_test.go index 29219ffeda..c5b2fa9c0c 100644 --- a/internal/datacoord/index_meta_test.go +++ b/internal/datacoord/index_meta_test.go @@ -19,8 +19,10 @@ package datacoord import ( "context" + "encoding/json" "sync" "testing" + "time" "github.com/cockroachdb/errors" "github.com/stretchr/testify/assert" @@ -435,12 +437,12 @@ func TestMeta_HasSameReq(t *testing.T) { func newSegmentIndexMeta(catalog metastore.DataCoordCatalog) *indexMeta { return &indexMeta{ - RWMutex: sync.RWMutex{}, - ctx: context.Background(), - catalog: catalog, - indexes: make(map[UniqueID]map[UniqueID]*model.Index), - buildID2SegmentIndex: make(map[UniqueID]*model.SegmentIndex), - segmentIndexes: make(map[UniqueID]map[UniqueID]*model.SegmentIndex), + RWMutex: sync.RWMutex{}, + ctx: context.Background(), + catalog: catalog, + indexes: make(map[UniqueID]map[UniqueID]*model.Index), + segmentBuildInfo: newSegmentIndexBuildInfo(), + segmentIndexes: make(map[UniqueID]map[UniqueID]*model.SegmentIndex), } } @@ -765,24 +767,23 @@ func TestMeta_GetIndexedSegment(t *testing.T) { }, }, } - m.buildID2SegmentIndex = map[UniqueID]*model.SegmentIndex{ - buildID: { - SegmentID: segID, - CollectionID: collID, - PartitionID: partID, - NumRows: 1025, - IndexID: indexID, - BuildID: buildID, - NodeID: nodeID, - IndexVersion: 1, - IndexState: commonpb.IndexState_Finished, - FailReason: "", - IsDeleted: false, - CreateTime: 10, - IndexFileKeys: nil, - IndexSize: 0, - }, - } + + m.segmentBuildInfo.Add(&model.SegmentIndex{ + SegmentID: segID, + CollectionID: collID, + PartitionID: partID, + NumRows: 1025, + IndexID: indexID, + BuildID: buildID, + NodeID: nodeID, + IndexVersion: 1, + IndexState: commonpb.IndexState_Finished, + FailReason: "", + IsDeleted: false, + CreateTime: 10, + IndexFileKeys: nil, + IndexSize: 0, + }) t.Run("success", func(t *testing.T) { segments := m.GetIndexedSegments(collID, []int64{segID}, []int64{fieldID}) @@ -1089,24 +1090,22 @@ func TestMeta_GetIndexParams(t *testing.T) { func TestMeta_GetIndexJob(t *testing.T) { m := newSegmentIndexMeta(nil) - m.buildID2SegmentIndex = 
map[UniqueID]*model.SegmentIndex{ - buildID: { - SegmentID: segID, - CollectionID: collID, - PartitionID: partID, - NumRows: 1025, - IndexID: indexID, - BuildID: buildID, - NodeID: 1, - IndexVersion: 1, - IndexState: commonpb.IndexState_Unissued, - FailReason: "", - IsDeleted: false, - CreateTime: 0, - IndexFileKeys: nil, - IndexSize: 0, - }, - } + m.segmentBuildInfo.Add(&model.SegmentIndex{ + SegmentID: segID, + CollectionID: collID, + PartitionID: partID, + NumRows: 1025, + IndexID: indexID, + BuildID: buildID, + NodeID: 1, + IndexVersion: 1, + IndexState: commonpb.IndexState_Unissued, + FailReason: "", + IsDeleted: false, + CreateTime: 0, + IndexFileKeys: nil, + IndexSize: 0, + }) t.Run("exist", func(t *testing.T) { segIndex, exist := m.GetIndexJob(buildID) @@ -1178,6 +1177,24 @@ func updateSegmentIndexMeta(t *testing.T) *indexMeta { mock.Anything, ).Return(nil) + indexBuildInfo := newSegmentIndexBuildInfo() + indexBuildInfo.Add(&model.SegmentIndex{ + SegmentID: segID, + CollectionID: collID, + PartitionID: partID, + NumRows: 1025, + IndexID: indexID, + BuildID: buildID, + NodeID: 0, + IndexVersion: 0, + IndexState: commonpb.IndexState_Unissued, + FailReason: "", + IsDeleted: false, + CreateTime: 0, + IndexFileKeys: nil, + IndexSize: 0, + }) + return &indexMeta{ catalog: sc, segmentIndexes: map[UniqueID]map[UniqueID]*model.SegmentIndex{ @@ -1217,24 +1234,7 @@ func updateSegmentIndexMeta(t *testing.T) *indexMeta { }, }, }, - buildID2SegmentIndex: map[UniqueID]*model.SegmentIndex{ - buildID: { - SegmentID: segID, - CollectionID: collID, - PartitionID: partID, - NumRows: 1025, - IndexID: indexID, - BuildID: buildID, - NodeID: 0, - IndexVersion: 0, - IndexState: commonpb.IndexState_Unissued, - FailReason: "", - IsDeleted: false, - CreateTime: 0, - IndexFileKeys: nil, - IndexSize: 0, - }, - }, + segmentBuildInfo: indexBuildInfo, } } @@ -1363,11 +1363,12 @@ func TestMeta_DeleteTask_Error(t *testing.T) { ).Return(errors.New("fail")) m.catalog = ec - m.buildID2SegmentIndex[buildID] = &model.SegmentIndex{ + m.segmentBuildInfo.Add(&model.SegmentIndex{ + BuildID: buildID, SegmentID: segID, PartitionID: partID, CollectionID: collID, - } + }) err := m.DeleteTask(buildID) assert.Error(t, err) @@ -1488,16 +1489,14 @@ func TestRemoveSegmentIndex(t *testing.T) { indexID: &model.SegmentIndex{}, }, }, - buildID2SegmentIndex: map[int64]*model.SegmentIndex{ - buildID: {}, - }, + segmentBuildInfo: newSegmentIndexBuildInfo(), } err := m.RemoveSegmentIndex(collID, partID, segID, indexID, buildID) assert.NoError(t, err) assert.Equal(t, len(m.segmentIndexes), 0) - assert.Equal(t, len(m.buildID2SegmentIndex), 0) + assert.Equal(t, len(m.segmentBuildInfo.List()), 0) }) } @@ -1517,3 +1516,53 @@ func TestIndexMeta_GetUnindexedSegments(t *testing.T) { unindexed = m.indexMeta.GetUnindexedSegments(collID+1, segmentIDs) assert.Equal(t, 0, len(unindexed)) } + +func TestBuildIndexTaskStatsJSON(t *testing.T) { + im := &indexMeta{segmentBuildInfo: newSegmentIndexBuildInfo()} + si1 := &model.SegmentIndex{ + BuildID: 1, + CollectionID: 100, + SegmentID: 1000, + IndexID: 10, + IndexState: commonpb.IndexState_Finished, + FailReason: "", + IndexSize: 1024, + IndexVersion: 1, + CreateTime: uint64(time.Now().Unix()), + } + si2 := &model.SegmentIndex{ + BuildID: 2, + CollectionID: 101, + SegmentID: 1001, + IndexID: 11, + IndexState: commonpb.IndexState_Finished, + FailReason: "", + IndexSize: 2048, + IndexVersion: 1, + CreateTime: uint64(time.Now().Unix()), + } + + actualJSON := im.TaskStatsJSON() + assert.Equal(t, "", actualJSON) + 
+ im.segmentBuildInfo.Add(si1) + im.segmentBuildInfo.Add(si2) + + assert.Equal(t, 2, len(im.segmentBuildInfo.List())) + ret1, ok := im.segmentBuildInfo.Get(si1.BuildID) + assert.True(t, ok) + assert.EqualValues(t, si1, ret1) + + expectedTasks := []*indexTaskStats{ + newIndexTaskStats(si1), + newIndexTaskStats(si2), + } + expectedJSON, err := json.Marshal(expectedTasks) + assert.NoError(t, err) + + actualJSON = im.TaskStatsJSON() + assert.JSONEq(t, string(expectedJSON), actualJSON) + + im.segmentBuildInfo.Remove(si1.BuildID) + assert.Equal(t, 1, len(im.segmentBuildInfo.List())) +} diff --git a/internal/datacoord/index_service_test.go b/internal/datacoord/index_service_test.go index 9ffbdc7e6d..435a54dbbc 100644 --- a/internal/datacoord/index_service_test.go +++ b/internal/datacoord/index_service_test.go @@ -2341,8 +2341,8 @@ func TestMeta_GetHasUnindexTaskSegments(t *testing.T) { m := &meta{ segments: NewSegmentsInfo(), indexMeta: &indexMeta{ - buildID2SegmentIndex: make(map[UniqueID]*model.SegmentIndex), - segmentIndexes: map[UniqueID]map[UniqueID]*model.SegmentIndex{}, + segmentBuildInfo: newSegmentIndexBuildInfo(), + segmentIndexes: map[UniqueID]map[UniqueID]*model.SegmentIndex{}, indexes: map[UniqueID]map[UniqueID]*model.Index{ collID: { indexID: { diff --git a/internal/datacoord/job_manager_test.go b/internal/datacoord/job_manager_test.go index 7eb3ac024d..5a02a8017d 100644 --- a/internal/datacoord/job_manager_test.go +++ b/internal/datacoord/job_manager_test.go @@ -6,6 +6,7 @@ import ( "testing" "time" + "github.com/hashicorp/golang-lru/v2/expirable" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/suite" @@ -103,6 +104,7 @@ func (s *jobManagerSuite) TestJobManager_triggerStatsTaskLoop() { allocator: alloc, tasks: make(map[int64]Task), meta: mt, + taskStats: expirable.NewLRU[UniqueID, Task](1024, nil, time.Minute*5), }, allocator: alloc, } diff --git a/internal/datacoord/metrics_info.go b/internal/datacoord/metrics_info.go index 03f0b9e552..1728aee08d 100644 --- a/internal/datacoord/metrics_info.go +++ b/internal/datacoord/metrics_info.go @@ -18,9 +18,11 @@ package datacoord import ( "context" + "encoding/json" "github.com/cockroachdb/errors" "go.uber.org/zap" + "golang.org/x/sync/errgroup" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" @@ -80,11 +82,81 @@ func (s *Server) getCollectionMetrics(ctx context.Context) *metricsinfo.DataCoor return ret } +// GetSyncTaskMetrics retrieves and aggregates the sync task metrics of the datanode. 
+func (s *Server) GetSyncTaskMetrics( + ctx context.Context, + req *milvuspb.GetMetricsRequest, +) (string, error) { + resp, err := s.requestDataNodeGetMetrics(ctx, req) + if err != nil { + return "", err + } + + tasks := make(map[string][]*metricsinfo.SyncTask, resp.Len()) + resp.Range(func(key string, value *milvuspb.GetMetricsResponse) bool { + if value.Response != "" { + var sts []*metricsinfo.SyncTask + if err1 := json.Unmarshal([]byte(value.Response), &sts); err1 != nil { + log.Warn("failed to unmarshal sync task metrics") + err = err1 + return false + } + tasks[key] = sts + } + return true + }) + + if err != nil { + return "", err + } + + if len(tasks) == 0 { + return "", nil + } + + bs, err := json.Marshal(tasks) + if err != nil { + return "", err + } + return (string)(bs), nil +} + +func (s *Server) requestDataNodeGetMetrics( + ctx context.Context, + req *milvuspb.GetMetricsRequest, +) (*typeutil.ConcurrentMap[string, *milvuspb.GetMetricsResponse], error) { + nodes := s.cluster.GetSessions() + + rets := typeutil.NewConcurrentMap[string, *milvuspb.GetMetricsResponse]() + wg, ctx := errgroup.WithContext(ctx) + for _, node := range nodes { + wg.Go(func() error { + cli, err := node.GetOrCreateClient(ctx) + if err != nil { + return err + } + ret, err := cli.GetMetrics(ctx, req) + if err != nil { + return err + } + key := metricsinfo.ConstructComponentName(typeutil.DataNodeRole, node.NodeID()) + rets.Insert(key, ret) + return nil + }) + } + + err := wg.Wait() + if err != nil { + return nil, err + } + return rets, nil +} + // getSystemInfoMetrics composes data cluster metrics func (s *Server) getSystemInfoMetrics( ctx context.Context, req *milvuspb.GetMetricsRequest, -) (*milvuspb.GetMetricsResponse, error) { +) (string, error) { // TODO(dragondriver): add more detail metrics // get datacoord info @@ -125,18 +197,11 @@ func (s *Server) getSystemInfoMetrics( }, } - resp := &milvuspb.GetMetricsResponse{ - Status: merr.Success(), - ComponentName: metricsinfo.ConstructComponentName(typeutil.DataCoordRole, paramtable.GetNodeID()), - } - var err error - resp.Response, err = metricsinfo.MarshalTopology(coordTopology) + ret, err := metricsinfo.MarshalTopology(coordTopology) if err != nil { - resp.Status = merr.Status(err) - return resp, nil + return "", err } - - return resp, nil + return ret, nil } // getDataCoordMetrics composes datacoord infos diff --git a/internal/datacoord/metrics_info_test.go b/internal/datacoord/metrics_info_test.go index 79d01991e4..f85d43998e 100644 --- a/internal/datacoord/metrics_info_test.go +++ b/internal/datacoord/metrics_info_test.go @@ -199,3 +199,116 @@ func TestGetIndexNodeMetrics(t *testing.T) { assert.False(t, info.HasError) assert.Equal(t, metricsinfo.ConstructComponentName(typeutil.IndexNodeRole, 100), info.BaseComponentInfos.Name) } + +func TestGetSyncTaskMetrics(t *testing.T) { + svr := Server{} + t.Run("ReturnsCorrectJSON", func(t *testing.T) { + req := &milvuspb.GetMetricsRequest{} + ctx := context.Background() + + task := `[{"segment_id": 1, "batch_rows": 100, "segment_level": "L0", "ts_from": 1000, "ts_to": 2000,"delta_row_count": 10, "flush_size": 1024, "running_time": 2000000000}]` + mockResp := &milvuspb.GetMetricsResponse{ + Status: merr.Success(), + Response: task, + } + + mockClient := &mockMetricDataNodeClient{ + mock: func() (*milvuspb.GetMetricsResponse, error) { + return mockResp, nil + }, + } + + dataNodeCreator := func(ctx context.Context, addr string, nodeID int64) (types.DataNodeClient, error) { + return mockClient, nil + } + + 
mockCluster := NewMockCluster(t) + mockCluster.EXPECT().GetSessions().Return([]*session.Session{session.NewSession(&session.NodeInfo{NodeID: 1}, dataNodeCreator)}) + svr.cluster = mockCluster + + actualJSON, err := svr.GetSyncTaskMetrics(ctx, req) + assert.NoError(t, err) + expectedJSON := `{"datanode1":[{"segment_id":1,"batch_rows":100,"segment_level":"L0","ts_from":1000,"ts_to":2000,"delta_row_count":10,"flush_size":1024,"running_time":2000000000}]}` + assert.Equal(t, expectedJSON, actualJSON) + }) + + t.Run("ReturnsErrorOnRequestFailure", func(t *testing.T) { + req := &milvuspb.GetMetricsRequest{} + ctx := context.Background() + + mockClient := &mockMetricDataNodeClient{ + mock: func() (*milvuspb.GetMetricsResponse, error) { + return nil, errors.New("request failed") + }, + } + + dataNodeCreator := func(ctx context.Context, addr string, nodeID int64) (types.DataNodeClient, error) { + return mockClient, nil + } + + mockCluster := NewMockCluster(t) + mockCluster.EXPECT().GetSessions().Return([]*session.Session{session.NewSession(&session.NodeInfo{NodeID: 1}, dataNodeCreator)}) + svr.cluster = mockCluster + + actualJSON, err := svr.GetSyncTaskMetrics(ctx, req) + assert.Error(t, err) + assert.Equal(t, "", actualJSON) + }) + + t.Run("ReturnsErrorOnUnmarshalFailure", func(t *testing.T) { + req := &milvuspb.GetMetricsRequest{} + ctx := context.Background() + + mockResp := &milvuspb.GetMetricsResponse{ + Status: merr.Success(), + Response: `invalid json`, + } + + mockClient := &mockMetricDataNodeClient{ + mock: func() (*milvuspb.GetMetricsResponse, error) { + return mockResp, nil + }, + } + + dataNodeCreator := func(ctx context.Context, addr string, nodeID int64) (types.DataNodeClient, error) { + return mockClient, nil + } + + mockCluster := NewMockCluster(t) + mockCluster.EXPECT().GetSessions().Return([]*session.Session{session.NewSession(&session.NodeInfo{NodeID: 1}, dataNodeCreator)}) + svr.cluster = mockCluster + + actualJSON, err := svr.GetSyncTaskMetrics(ctx, req) + assert.Error(t, err) + assert.Equal(t, "", actualJSON) + }) + + t.Run("ReturnsEmptyJSONWhenNoTasks", func(t *testing.T) { + req := &milvuspb.GetMetricsRequest{} + ctx := context.Background() + + mockResp := &milvuspb.GetMetricsResponse{ + Status: merr.Success(), + Response: "", + } + + mockClient := &mockMetricDataNodeClient{ + mock: func() (*milvuspb.GetMetricsResponse, error) { + return mockResp, nil + }, + } + + dataNodeCreator := func(ctx context.Context, addr string, nodeID int64) (types.DataNodeClient, error) { + return mockClient, nil + } + + mockCluster := NewMockCluster(t) + mockCluster.EXPECT().GetSessions().Return([]*session.Session{session.NewSession(&session.NodeInfo{NodeID: 1}, dataNodeCreator)}) + svr.cluster = mockCluster + + expectedJSON := "" + actualJSON, err := svr.GetSyncTaskMetrics(ctx, req) + assert.NoError(t, err) + assert.Equal(t, expectedJSON, actualJSON) + }) +} diff --git a/internal/datacoord/server.go b/internal/datacoord/server.go index 2cdf364cb5..3046183c2d 100644 --- a/internal/datacoord/server.go +++ b/internal/datacoord/server.go @@ -29,12 +29,14 @@ import ( "github.com/blang/semver/v4" "github.com/cockroachdb/errors" "github.com/samber/lo" + "github.com/tidwall/gjson" "github.com/tikv/client-go/v2/txnkv" clientv3 "go.etcd.io/etcd/client/v3" "go.uber.org/zap" "google.golang.org/grpc" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" + "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" globalIDAllocator "github.com/milvus-io/milvus/internal/allocator" 
"github.com/milvus-io/milvus/internal/datacoord/allocator" "github.com/milvus-io/milvus/internal/datacoord/broker" @@ -162,6 +164,8 @@ type Server struct { // streamingcoord server is embedding in datacoord now. streamingCoord *streamingcoord.Server + + metricsRequest *metricsinfo.MetricsRequest } type CollectionNameInfo struct { @@ -214,6 +218,7 @@ func CreateServer(ctx context.Context, factory dependency.Factory, opts ...Optio rootCoordClientCreator: defaultRootCoordCreatorFunc, metricsCacheManager: metricsinfo.NewMetricsCacheManager(), enableActiveStandBy: Params.DataCoordCfg.EnableActiveStandby.GetAsBool(), + metricsRequest: metricsinfo.NewMetricsRequest(), } for _, opt := range opts { @@ -296,6 +301,7 @@ func (s *Server) initSession() error { // Init change server state to Initializing func (s *Server) Init() error { var err error + s.registerMetricsRequest() s.factory.Init(Params) if err = s.initSession(); err != nil { return err @@ -1125,20 +1131,33 @@ func (s *Server) stopServerLoop() { s.serverLoopWg.Wait() } -// func (s *Server) validateAllocRequest(collID UniqueID, partID UniqueID, channelName string) error { -// if !s.meta.HasCollection(collID) { -// return fmt.Errorf("can not find collection %d", collID) -// } -// if !s.meta.HasPartition(collID, partID) { -// return fmt.Errorf("can not find partition %d", partID) -// } -// for _, name := range s.insertChannels { -// if name == channelName { -// return nil -// } -// } -// return fmt.Errorf("can not find channel %s", channelName) -// } +func (s *Server) registerMetricsRequest() { + s.metricsRequest.RegisterMetricsRequest(metricsinfo.SystemInfoMetrics, + func(ctx context.Context, req *milvuspb.GetMetricsRequest, jsonReq gjson.Result) (string, error) { + return s.getSystemInfoMetrics(ctx, req) + }) + + s.metricsRequest.RegisterMetricsRequest(metricsinfo.ImportTasks, + func(ctx context.Context, req *milvuspb.GetMetricsRequest, jsonReq gjson.Result) (string, error) { + return s.importMeta.TaskStatsJSON(), nil + }) + + s.metricsRequest.RegisterMetricsRequest(metricsinfo.CompactionTasks, + func(ctx context.Context, req *milvuspb.GetMetricsRequest, jsonReq gjson.Result) (string, error) { + return s.meta.compactionTaskMeta.TaskStatsJSON(), nil + }) + + s.metricsRequest.RegisterMetricsRequest(metricsinfo.BuildIndexTasks, + func(ctx context.Context, req *milvuspb.GetMetricsRequest, jsonReq gjson.Result) (string, error) { + return s.meta.indexMeta.TaskStatsJSON(), nil + }) + + s.metricsRequest.RegisterMetricsRequest(metricsinfo.SyncTasks, + func(ctx context.Context, req *milvuspb.GetMetricsRequest, jsonReq gjson.Result) (string, error) { + return s.GetSyncTaskMetrics(ctx, req) + }) + log.Info("register metrics actions finished") +} // loadCollectionFromRootCoord communicates with RootCoord and asks for collection information. // collection information will be added to server meta info. 
diff --git a/internal/datacoord/server_test.go b/internal/datacoord/server_test.go index 6a9ab3edd4..1c39070793 100644 --- a/internal/datacoord/server_test.go +++ b/internal/datacoord/server_test.go @@ -804,14 +804,11 @@ func TestServer_getSystemInfoMetrics(t *testing.T) { req, err := metricsinfo.ConstructRequestByMetricType(metricsinfo.SystemInfoMetrics) assert.NoError(t, err) - resp, err := svr.getSystemInfoMetrics(svr.ctx, req) + ret, err := svr.getSystemInfoMetrics(svr.ctx, req) assert.NoError(t, err) - log.Info("TestServer_getSystemInfoMetrics", - zap.String("name", resp.ComponentName), - zap.String("response", resp.Response)) var coordTopology metricsinfo.DataCoordTopology - err = metricsinfo.UnmarshalTopology(resp.Response, &coordTopology) + err = metricsinfo.UnmarshalTopology(ret, &coordTopology) assert.NoError(t, err) assert.Equal(t, len(svr.cluster.GetSessions()), len(coordTopology.Cluster.ConnectedDataNodes)) for _, nodeMetrics := range coordTopology.Cluster.ConnectedDataNodes { diff --git a/internal/datacoord/services.go b/internal/datacoord/services.go index 74c5b37797..2fd524fddd 100644 --- a/internal/datacoord/services.go +++ b/internal/datacoord/services.go @@ -25,7 +25,6 @@ import ( "github.com/cockroachdb/errors" "github.com/samber/lo" - "github.com/tidwall/gjson" "go.opentelemetry.io/otel" "go.uber.org/zap" @@ -1073,54 +1072,27 @@ func (s *Server) ShowConfigurations(ctx context.Context, req *internalpb.ShowCon func (s *Server) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) { log := log.Ctx(ctx) if err := merr.CheckHealthyStandby(s.GetStateCode()); err != nil { + msg := "failed to get metrics" + log.Warn(msg, zap.Error(err)) return &milvuspb.GetMetricsResponse{ - Status: merr.Status(err), + Status: merr.Status(errors.Wrap(err, msg)), }, nil } - ret := gjson.Parse(req.GetRequest()) - metricType, err := metricsinfo.ParseMetricRequestType(ret) + resp := &milvuspb.GetMetricsResponse{ + Status: merr.Success(), + ComponentName: metricsinfo.ConstructComponentName(typeutil.DataCoordRole, + paramtable.GetNodeID()), + } + + ret, err := s.metricsRequest.ExecuteMetricsRequest(ctx, req) if err != nil { - log.Warn("DataCoord.GetMetrics failed to parse metric type", - zap.Int64("nodeID", paramtable.GetNodeID()), - zap.String("req", req.Request), - zap.Error(err), - ) - - return &milvuspb.GetMetricsResponse{ - ComponentName: metricsinfo.ConstructComponentName(typeutil.DataCoordRole, paramtable.GetNodeID()), - Status: merr.Status(err), - }, nil + resp.Status = merr.Status(err) + return resp, nil } - if metricType == metricsinfo.SystemInfoMetrics { - metrics, err := s.getSystemInfoMetrics(ctx, req) - if err != nil { - log.Warn("DataCoord GetMetrics failed", zap.Int64("nodeID", paramtable.GetNodeID()), zap.Error(err)) - return &milvuspb.GetMetricsResponse{ - Status: merr.Status(err), - }, nil - } - - log.RatedDebug(60, "DataCoord.GetMetrics", - zap.Int64("nodeID", paramtable.GetNodeID()), - zap.String("req", req.Request), - zap.String("metricType", metricType), - zap.Any("metrics", metrics), // TODO(dragondriver): necessary? 
may be very large - zap.Error(err)) - - return metrics, nil - } - - log.RatedWarn(60.0, "DataCoord.GetMetrics failed, request metric type is not implemented yet", - zap.Int64("nodeID", paramtable.GetNodeID()), - zap.String("req", req.Request), - zap.String("metricType", metricType)) - - return &milvuspb.GetMetricsResponse{ - ComponentName: metricsinfo.ConstructComponentName(typeutil.DataCoordRole, paramtable.GetNodeID()), - Status: merr.Status(merr.WrapErrMetricNotFound(metricType)), - }, nil + resp.Response = ret + return resp, nil } // ManualCompaction triggers a compaction for a collection diff --git a/internal/datacoord/task_scheduler.go b/internal/datacoord/task_scheduler.go index 21fd6b7791..d176326ed0 100644 --- a/internal/datacoord/task_scheduler.go +++ b/internal/datacoord/task_scheduler.go @@ -21,6 +21,7 @@ import ( "sync" "time" + "github.com/hashicorp/golang-lru/v2/expirable" "go.uber.org/zap" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" @@ -61,6 +62,8 @@ type taskScheduler struct { indexEngineVersionManager IndexEngineVersionManager handler Handler allocator allocator.Allocator + + taskStats *expirable.LRU[UniqueID, Task] } func newTaskScheduler( @@ -88,6 +91,7 @@ func newTaskScheduler( handler: handler, indexEngineVersionManager: indexEngineVersionManager, allocator: allocator, + taskStats: expirable.NewLRU[UniqueID, Task](1024, nil, time.Minute*5), } ts.reloadFromMeta() return ts @@ -112,7 +116,7 @@ func (s *taskScheduler) reloadFromMeta() { continue } if segIndex.IndexState != commonpb.IndexState_Finished && segIndex.IndexState != commonpb.IndexState_Failed { - s.tasks[segIndex.BuildID] = &indexBuildTask{ + s.enqueue(&indexBuildTask{ taskID: segIndex.BuildID, nodeID: segIndex.NodeID, taskInfo: &workerpb.IndexTaskInfo{ @@ -123,7 +127,7 @@ func (s *taskScheduler) reloadFromMeta() { queueTime: time.Now(), startTime: time.Now(), endTime: time.Now(), - } + }) } } } @@ -131,7 +135,7 @@ func (s *taskScheduler) reloadFromMeta() { allAnalyzeTasks := s.meta.analyzeMeta.GetAllTasks() for taskID, t := range allAnalyzeTasks { if t.State != indexpb.JobState_JobStateFinished && t.State != indexpb.JobState_JobStateFailed { - s.tasks[taskID] = &analyzeTask{ + s.enqueue(&analyzeTask{ taskID: taskID, nodeID: t.NodeID, taskInfo: &workerpb.AnalyzeResult{ @@ -142,14 +146,14 @@ func (s *taskScheduler) reloadFromMeta() { queueTime: time.Now(), startTime: time.Now(), endTime: time.Now(), - } + }) } } allStatsTasks := s.meta.statsTaskMeta.GetAllTasks() for taskID, t := range allStatsTasks { if t.GetState() != indexpb.JobState_JobStateFinished && t.GetState() != indexpb.JobState_JobStateFailed { - s.tasks[taskID] = &statsTask{ + s.enqueue(&statsTask{ taskID: taskID, segmentID: t.GetSegmentID(), targetSegmentID: t.GetTargetSegmentID(), @@ -163,7 +167,7 @@ func (s *taskScheduler) reloadFromMeta() { startTime: time.Now(), endTime: time.Now(), subJobType: t.GetSubJobType(), - } + }) } } } @@ -184,6 +188,7 @@ func (s *taskScheduler) enqueue(task Task) { taskID := task.GetTaskID() if _, ok := s.tasks[taskID]; !ok { s.tasks[taskID] = task + s.taskStats.Add(taskID, task) task.SetQueueTime(time.Now()) log.Info("taskScheduler enqueue task", zap.Int64("taskID", taskID)) } diff --git a/internal/datacoord/task_scheduler_test.go b/internal/datacoord/task_scheduler_test.go index 6999b66bd8..b1c6b1404c 100644 --- a/internal/datacoord/task_scheduler_test.go +++ b/internal/datacoord/task_scheduler_test.go @@ -56,6 +56,194 @@ var ( ) func createIndexMeta(catalog metastore.DataCoordCatalog) *indexMeta { + 
indexBuildInfo := newSegmentIndexBuildInfo() + indexBuildInfo.Add(&model.SegmentIndex{ + SegmentID: segID, + CollectionID: collID, + PartitionID: partID, + NumRows: 1025, + IndexID: indexID, + BuildID: buildID, + NodeID: 0, + IndexVersion: 0, + IndexState: commonpb.IndexState_Unissued, + FailReason: "", + IsDeleted: false, + CreateTime: 0, + IndexFileKeys: nil, + IndexSize: 1, + }) + + indexBuildInfo.Add(&model.SegmentIndex{ + SegmentID: segID + 1, + CollectionID: collID, + PartitionID: partID, + NumRows: 1026, + IndexID: indexID, + BuildID: buildID + 1, + NodeID: nodeID, + IndexVersion: 1, + IndexState: commonpb.IndexState_InProgress, + FailReason: "", + IsDeleted: false, + CreateTime: 1111, + IndexFileKeys: nil, + IndexSize: 1, + }) + + indexBuildInfo.Add(&model.SegmentIndex{ + SegmentID: segID + 2, + CollectionID: collID, + PartitionID: partID, + NumRows: 1026, + IndexID: indexID, + BuildID: buildID + 2, + NodeID: nodeID, + IndexVersion: 1, + IndexState: commonpb.IndexState_InProgress, + FailReason: "", + IsDeleted: true, + CreateTime: 1111, + IndexFileKeys: nil, + IndexSize: 1, + }) + + indexBuildInfo.Add(&model.SegmentIndex{ + SegmentID: segID + 3, + CollectionID: collID, + PartitionID: partID, + NumRows: 500, + IndexID: indexID, + BuildID: buildID + 3, + NodeID: 0, + IndexVersion: 0, + IndexState: commonpb.IndexState_Unissued, + FailReason: "", + IsDeleted: false, + CreateTime: 1111, + IndexFileKeys: nil, + IndexSize: 1, + }) + + indexBuildInfo.Add(&model.SegmentIndex{ + SegmentID: segID + 4, + CollectionID: collID, + PartitionID: partID, + NumRows: 1026, + IndexID: indexID, + BuildID: buildID + 4, + NodeID: nodeID, + IndexVersion: 1, + IndexState: commonpb.IndexState_Finished, + FailReason: "", + IsDeleted: false, + CreateTime: 1111, + IndexFileKeys: nil, + IndexSize: 1, + }) + + indexBuildInfo.Add(&model.SegmentIndex{ + SegmentID: segID + 5, + CollectionID: collID, + PartitionID: partID, + NumRows: 1026, + IndexID: indexID, + BuildID: buildID + 5, + NodeID: 0, + IndexVersion: 1, + IndexState: commonpb.IndexState_Finished, + FailReason: "", + IsDeleted: false, + CreateTime: 1111, + IndexFileKeys: nil, + IndexSize: 1, + }) + + indexBuildInfo.Add(&model.SegmentIndex{ + SegmentID: segID + 6, + CollectionID: collID, + PartitionID: partID, + NumRows: 1026, + IndexID: indexID, + BuildID: buildID + 6, + NodeID: 0, + IndexVersion: 1, + IndexState: commonpb.IndexState_Finished, + FailReason: "", + IsDeleted: false, + CreateTime: 1111, + IndexFileKeys: nil, + IndexSize: 1, + }) + + indexBuildInfo.Add(&model.SegmentIndex{ + SegmentID: segID + 7, + CollectionID: collID, + PartitionID: partID, + NumRows: 1026, + IndexID: indexID, + BuildID: buildID + 7, + NodeID: 0, + IndexVersion: 1, + IndexState: commonpb.IndexState_Failed, + FailReason: "error", + IsDeleted: false, + CreateTime: 1111, + IndexFileKeys: nil, + IndexSize: 1, + }) + + indexBuildInfo.Add(&model.SegmentIndex{ + SegmentID: segID + 8, + CollectionID: collID, + PartitionID: partID, + NumRows: 1026, + IndexID: indexID, + BuildID: buildID + 8, + NodeID: nodeID + 1, + IndexVersion: 1, + IndexState: commonpb.IndexState_InProgress, + FailReason: "", + IsDeleted: false, + CreateTime: 1111, + IndexFileKeys: nil, + IndexSize: 1, + }) + + indexBuildInfo.Add(&model.SegmentIndex{ + SegmentID: segID + 9, + CollectionID: collID, + PartitionID: partID, + NumRows: 500, + IndexID: indexID, + BuildID: buildID + 9, + NodeID: 0, + IndexVersion: 0, + IndexState: commonpb.IndexState_Unissued, + FailReason: "", + IsDeleted: false, + CreateTime: 1111, + 
IndexFileKeys: nil, + IndexSize: 1, + }) + + indexBuildInfo.Add(&model.SegmentIndex{ + SegmentID: segID + 10, + CollectionID: collID, + PartitionID: partID, + NumRows: 500, + IndexID: indexID, + BuildID: buildID + 10, + NodeID: nodeID, + IndexVersion: 0, + IndexState: commonpb.IndexState_Unissued, + FailReason: "", + IsDeleted: false, + CreateTime: 1111, + IndexFileKeys: nil, + IndexSize: 1, + }) + return &indexMeta{ catalog: catalog, indexes: map[UniqueID]map[UniqueID]*model.Index{ @@ -287,184 +475,7 @@ func createIndexMeta(catalog metastore.DataCoordCatalog) *indexMeta { }, }, }, - buildID2SegmentIndex: map[UniqueID]*model.SegmentIndex{ - buildID: { - SegmentID: segID, - CollectionID: collID, - PartitionID: partID, - NumRows: 1025, - IndexID: indexID, - BuildID: buildID, - NodeID: 0, - IndexVersion: 0, - IndexState: commonpb.IndexState_Unissued, - FailReason: "", - IsDeleted: false, - CreateTime: 0, - IndexFileKeys: nil, - IndexSize: 1, - }, - buildID + 1: { - SegmentID: segID + 1, - CollectionID: collID, - PartitionID: partID, - NumRows: 1026, - IndexID: indexID, - BuildID: buildID + 1, - NodeID: nodeID, - IndexVersion: 1, - IndexState: commonpb.IndexState_InProgress, - FailReason: "", - IsDeleted: false, - CreateTime: 1111, - IndexFileKeys: nil, - IndexSize: 1, - }, - buildID + 2: { - SegmentID: segID + 2, - CollectionID: collID, - PartitionID: partID, - NumRows: 1026, - IndexID: indexID, - BuildID: buildID + 2, - NodeID: nodeID, - IndexVersion: 1, - IndexState: commonpb.IndexState_InProgress, - FailReason: "", - IsDeleted: true, - CreateTime: 1111, - IndexFileKeys: nil, - IndexSize: 1, - }, - buildID + 3: { - SegmentID: segID + 3, - CollectionID: collID, - PartitionID: partID, - NumRows: 500, - IndexID: indexID, - BuildID: buildID + 3, - NodeID: 0, - IndexVersion: 0, - IndexState: commonpb.IndexState_Unissued, - FailReason: "", - IsDeleted: false, - CreateTime: 1111, - IndexFileKeys: nil, - IndexSize: 1, - }, - buildID + 4: { - SegmentID: segID + 4, - CollectionID: collID, - PartitionID: partID, - NumRows: 1026, - IndexID: indexID, - BuildID: buildID + 4, - NodeID: nodeID, - IndexVersion: 1, - IndexState: commonpb.IndexState_Finished, - FailReason: "", - IsDeleted: false, - CreateTime: 1111, - IndexFileKeys: nil, - IndexSize: 1, - }, - buildID + 5: { - SegmentID: segID + 5, - CollectionID: collID, - PartitionID: partID, - NumRows: 1026, - IndexID: indexID, - BuildID: buildID + 5, - NodeID: 0, - IndexVersion: 1, - IndexState: commonpb.IndexState_Finished, - FailReason: "", - IsDeleted: false, - CreateTime: 1111, - IndexFileKeys: nil, - IndexSize: 1, - }, - buildID + 6: { - SegmentID: segID + 6, - CollectionID: collID, - PartitionID: partID, - NumRows: 1026, - IndexID: indexID, - BuildID: buildID + 6, - NodeID: 0, - IndexVersion: 1, - IndexState: commonpb.IndexState_Finished, - FailReason: "", - IsDeleted: false, - CreateTime: 1111, - IndexFileKeys: nil, - IndexSize: 1, - }, - buildID + 7: { - SegmentID: segID + 7, - CollectionID: collID, - PartitionID: partID, - NumRows: 1026, - IndexID: indexID, - BuildID: buildID + 7, - NodeID: 0, - IndexVersion: 1, - IndexState: commonpb.IndexState_Failed, - FailReason: "error", - IsDeleted: false, - CreateTime: 1111, - IndexFileKeys: nil, - IndexSize: 1, - }, - buildID + 8: { - SegmentID: segID + 8, - CollectionID: collID, - PartitionID: partID, - NumRows: 1026, - IndexID: indexID, - BuildID: buildID + 8, - NodeID: nodeID + 1, - IndexVersion: 1, - IndexState: commonpb.IndexState_InProgress, - FailReason: "", - IsDeleted: false, - CreateTime: 1111, 
- IndexFileKeys: nil, - IndexSize: 1, - }, - buildID + 9: { - SegmentID: segID + 9, - CollectionID: collID, - PartitionID: partID, - NumRows: 500, - IndexID: indexID, - BuildID: buildID + 9, - NodeID: 0, - IndexVersion: 0, - IndexState: commonpb.IndexState_Unissued, - FailReason: "", - IsDeleted: false, - CreateTime: 1111, - IndexFileKeys: nil, - IndexSize: 1, - }, - buildID + 10: { - SegmentID: segID + 10, - CollectionID: collID, - PartitionID: partID, - NumRows: 500, - IndexID: indexID, - BuildID: buildID + 10, - NodeID: nodeID, - IndexVersion: 0, - IndexState: commonpb.IndexState_Unissued, - FailReason: "", - IsDeleted: false, - CreateTime: 1111, - IndexFileKeys: nil, - IndexSize: 1, - }, - }, + segmentBuildInfo: indexBuildInfo, } } @@ -1296,17 +1307,7 @@ func (s *taskSchedulerSuite) Test_indexTaskFailCase() { }, }, }, - buildID2SegmentIndex: map[UniqueID]*model.SegmentIndex{ - buildID: { - SegmentID: segID, - CollectionID: s.collectionID, - PartitionID: s.partitionID, - NumRows: 1025, - IndexID: indexID, - BuildID: buildID, - IndexState: commonpb.IndexState_Unissued, - }, - }, + segmentBuildInfo: newSegmentIndexBuildInfo(), segmentIndexes: map[UniqueID]map[UniqueID]*model.SegmentIndex{ segID: { buildID: { @@ -1326,6 +1327,15 @@ func (s *taskSchedulerSuite) Test_indexTaskFailCase() { catalog: catalog, })) + mt.indexMeta.segmentBuildInfo.Add(&model.SegmentIndex{ + SegmentID: segID, + CollectionID: s.collectionID, + PartitionID: s.partitionID, + NumRows: 1025, + IndexID: indexID, + BuildID: buildID, + IndexState: commonpb.IndexState_Unissued, + }) cm := mocks.NewChunkManager(s.T()) cm.EXPECT().RootPath().Return("ut-index") @@ -1548,24 +1558,7 @@ func (s *taskSchedulerSuite) Test_indexTaskWithMvOptionalScalarField() { }, }, }, - buildID2SegmentIndex: map[UniqueID]*model.SegmentIndex{ - buildID: { - SegmentID: segID, - CollectionID: collID, - PartitionID: partID, - NumRows: minNumberOfRowsToBuild, - IndexID: indexID, - BuildID: buildID, - NodeID: 0, - IndexVersion: 0, - IndexState: commonpb.IndexState_Unissued, - FailReason: "", - IsDeleted: false, - CreateTime: 0, - IndexFileKeys: nil, - IndexSize: 0, - }, - }, + segmentBuildInfo: newSegmentIndexBuildInfo(), }, segments: &SegmentsInfo{ segments: map[UniqueID]*SegmentInfo{ @@ -1589,6 +1582,22 @@ func (s *taskSchedulerSuite) Test_indexTaskWithMvOptionalScalarField() { }, } + mt.indexMeta.segmentBuildInfo.Add(&model.SegmentIndex{ + SegmentID: segID, + CollectionID: collID, + PartitionID: partID, + NumRows: minNumberOfRowsToBuild, + IndexID: indexID, + BuildID: buildID, + NodeID: 0, + IndexVersion: 0, + IndexState: commonpb.IndexState_Unissued, + FailReason: "", + IsDeleted: false, + CreateTime: 0, + IndexFileKeys: nil, + IndexSize: 0, + }) cm := mocks.NewChunkManager(s.T()) cm.EXPECT().RootPath().Return("ut-index") @@ -1621,7 +1630,9 @@ func (s *taskSchedulerSuite) Test_indexTaskWithMvOptionalScalarField() { } resetMetaFunc := func() { - mt.indexMeta.buildID2SegmentIndex[buildID].IndexState = commonpb.IndexState_Unissued + t, ok := mt.indexMeta.segmentBuildInfo.Get(buildID) + s.True(ok) + t.IndexState = commonpb.IndexState_Unissued mt.indexMeta.segmentIndexes[segID][indexID].IndexState = commonpb.IndexState_Unissued mt.indexMeta.indexes[collID][indexID].IndexParams[1].Value = "HNSW" mt.collections[collID].Schema.Fields[0].DataType = schemapb.DataType_FloatVector diff --git a/internal/datanode/data_node.go b/internal/datanode/data_node.go index b374c113ae..eccba38a15 100644 --- a/internal/datanode/data_node.go +++ 
b/internal/datanode/data_node.go @@ -30,10 +30,12 @@ import ( "time" "github.com/cockroachdb/errors" + "github.com/tidwall/gjson" clientv3 "go.etcd.io/etcd/client/v3" "go.uber.org/zap" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" + "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" "github.com/milvus-io/milvus/internal/datanode/allocator" "github.com/milvus-io/milvus/internal/datanode/channel" "github.com/milvus-io/milvus/internal/datanode/compaction" @@ -57,6 +59,7 @@ import ( "github.com/milvus-io/milvus/pkg/util/conc" "github.com/milvus-io/milvus/pkg/util/expr" "github.com/milvus-io/milvus/pkg/util/logutil" + "github.com/milvus-io/milvus/pkg/util/metricsinfo" "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/retry" "github.com/milvus-io/milvus/pkg/util/typeutil" @@ -127,6 +130,8 @@ type DataNode struct { reportImportRetryTimes uint // unitest set this value to 1 to save time, default is 10 pool *conc.Pool[any] + + metricsRequest *metricsinfo.MetricsRequest } // NewDataNode will return a DataNode with abnormal state. @@ -144,6 +149,7 @@ func NewDataNode(ctx context.Context, factory dependency.Factory) *DataNode { segmentCache: util.NewCache(), compactionExecutor: compaction.NewExecutor(), reportImportRetryTimes: 10, + metricsRequest: metricsinfo.NewMetricsRequest(), } node.UpdateStateCode(commonpb.StateCode_Abnormal) expr.Register("datanode", node) @@ -221,6 +227,7 @@ func (node *DataNode) GetNodeID() int64 { func (node *DataNode) Init() error { var initError error node.initOnce.Do(func() { + node.registerMetricsRequest() logutil.Logger(node.ctx).Info("DataNode server initializing", zap.String("TimeTickChannelName", Params.CommonCfg.DataCoordTimeTick.GetValue()), ) @@ -272,6 +279,18 @@ func (node *DataNode) Init() error { return initError } +func (node *DataNode) registerMetricsRequest() { + node.metricsRequest.RegisterMetricsRequest(metricsinfo.SystemInfoMetrics, + func(ctx context.Context, req *milvuspb.GetMetricsRequest, jsonReq gjson.Result) (string, error) { + return node.getSystemInfoMetrics(ctx, req) + }) + node.metricsRequest.RegisterMetricsRequest(metricsinfo.SyncTasks, + func(ctx context.Context, req *milvuspb.GetMetricsRequest, jsonReq gjson.Result) (string, error) { + return node.syncMgr.TaskStatsJSON(), nil + }) + log.Info("register metrics actions finished") +} + // tryToReleaseFlowgraph tries to release a flowgraph func (node *DataNode) tryToReleaseFlowgraph(channel string) { log.Info("try to release flowgraph", zap.String("channel", channel)) diff --git a/internal/datanode/data_node_test.go b/internal/datanode/data_node_test.go index d642f90c44..9994d1e2e9 100644 --- a/internal/datanode/data_node_test.go +++ b/internal/datanode/data_node_test.go @@ -29,7 +29,6 @@ import ( "github.com/stretchr/testify/mock" "go.uber.org/zap" - "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus/internal/flushcommon/broker" "github.com/milvus-io/milvus/internal/flushcommon/pipeline" @@ -187,10 +186,7 @@ func TestDataNode(t *testing.T) { assert.NoError(t, err) resp, err := emptyNode.getSystemInfoMetrics(context.TODO(), req) assert.NoError(t, err) - assert.Equal(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) - log.Info("Test DataNode.getSystemInfoMetrics", - zap.String("name", resp.ComponentName), - zap.String("response", resp.Response)) + assert.NotEmpty(t, resp) }) t.Run("Test getSystemInfoMetrics with quotaMetric error", func(t 
*testing.T) { @@ -202,8 +198,8 @@ func TestDataNode(t *testing.T) { assert.NoError(t, err) util2.DeregisterRateCollector(metricsinfo.InsertConsumeThroughput) resp, err := emptyNode.getSystemInfoMetrics(context.TODO(), req) - assert.NoError(t, err) - assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) + assert.Error(t, err) + assert.Empty(t, resp) util2.RegisterRateCollector(metricsinfo.InsertConsumeThroughput) }) } diff --git a/internal/datanode/importv2/util.go b/internal/datanode/importv2/util.go index 96b2b7bdf8..eb6c592f85 100644 --- a/internal/datanode/importv2/util.go +++ b/internal/datanode/importv2/util.go @@ -95,7 +95,7 @@ func NewSyncTask(ctx context.Context, WithTimeRange(ts, ts). WithLevel(segmentLevel). WithDataSource(metrics.BulkinsertDataSourceLabel). - WithBatchSize(int64(insertData.GetRowNum())) + WithBatchRows(int64(insertData.GetRowNum())) if bm25Stats != nil { syncPack.WithBM25Stats(bm25Stats) } diff --git a/internal/datanode/metrics_info.go b/internal/datanode/metrics_info.go index 8ae42196c2..7e59297c0a 100644 --- a/internal/datanode/metrics_info.go +++ b/internal/datanode/metrics_info.go @@ -22,7 +22,6 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" "github.com/milvus-io/milvus/internal/flushcommon/util" "github.com/milvus-io/milvus/pkg/util/hardware" - "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/metricsinfo" "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/ratelimitutil" @@ -66,17 +65,14 @@ func (node *DataNode) getQuotaMetrics() (*metricsinfo.DataNodeQuotaMetrics, erro }, nil } -func (node *DataNode) getSystemInfoMetrics(_ context.Context, _ *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) { +func (node *DataNode) getSystemInfoMetrics(_ context.Context, _ *milvuspb.GetMetricsRequest) (string, error) { // TODO(dragondriver): add more metrics usedMem := hardware.GetUsedMemoryCount() totalMem := hardware.GetMemoryCount() quotaMetrics, err := node.getQuotaMetrics() if err != nil { - return &milvuspb.GetMetricsResponse{ - Status: merr.Status(err), - ComponentName: metricsinfo.ConstructComponentName(typeutil.DataNodeRole, paramtable.GetNodeID()), - }, nil + return "", err } hardwareMetrics := metricsinfo.HardwareMetrics{ IP: node.session.Address, @@ -106,19 +102,5 @@ func (node *DataNode) getSystemInfoMetrics(_ context.Context, _ *milvuspb.GetMet } metricsinfo.FillDeployMetricsWithEnv(&nodeInfos.SystemInfo) - - resp, err := metricsinfo.MarshalComponentInfos(nodeInfos) - if err != nil { - return &milvuspb.GetMetricsResponse{ - Status: merr.Status(err), - Response: "", - ComponentName: metricsinfo.ConstructComponentName(typeutil.DataNodeRole, paramtable.GetNodeID()), - }, nil - } - - return &milvuspb.GetMetricsResponse{ - Status: merr.Success(), - Response: resp, - ComponentName: metricsinfo.ConstructComponentName(typeutil.DataNodeRole, paramtable.GetNodeID()), - }, nil + return metricsinfo.MarshalComponentInfos(nodeInfos) } diff --git a/internal/datanode/services.go b/internal/datanode/services.go index 6ddfa95667..fa14b7d418 100644 --- a/internal/datanode/services.go +++ b/internal/datanode/services.go @@ -24,7 +24,6 @@ import ( "fmt" "github.com/samber/lo" - "github.com/tidwall/gjson" "go.uber.org/zap" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" @@ -45,7 +44,9 @@ import ( "github.com/milvus-io/milvus/pkg/util/conc" "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/metricsinfo" + 
"github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/tsoutil" + "github.com/milvus-io/milvus/pkg/util/typeutil" ) // WatchDmChannels is not in use @@ -166,39 +167,20 @@ func (node *DataNode) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRe }, nil } - ret := gjson.Parse(req.GetRequest()) - metricType, err := metricsinfo.ParseMetricRequestType(ret) + resp := &milvuspb.GetMetricsResponse{ + Status: merr.Success(), + ComponentName: metricsinfo.ConstructComponentName(typeutil.DataNodeRole, + paramtable.GetNodeID()), + } + + ret, err := node.metricsRequest.ExecuteMetricsRequest(ctx, req) if err != nil { - log.Warn("DataNode.GetMetrics failed to parse metric type", - zap.Int64("nodeID", node.GetNodeID()), - zap.String("req", req.Request), - zap.Error(err)) - - return &milvuspb.GetMetricsResponse{ - Status: merr.Status(err), - }, nil + resp.Status = merr.Status(err) + return resp, nil } - if metricType == metricsinfo.SystemInfoMetrics { - systemInfoMetrics, err := node.getSystemInfoMetrics(ctx, req) - if err != nil { - log.Warn("DataNode GetMetrics failed", zap.Int64("nodeID", node.GetNodeID()), zap.Error(err)) - return &milvuspb.GetMetricsResponse{ - Status: merr.Status(err), - }, nil - } - - return systemInfoMetrics, nil - } - - log.RatedWarn(60, "DataNode.GetMetrics failed, request metric type is not implemented yet", - zap.Int64("nodeID", node.GetNodeID()), - zap.String("req", req.Request), - zap.String("metric_type", metricType)) - - return &milvuspb.GetMetricsResponse{ - Status: merr.Status(merr.WrapErrMetricNotFound(metricType)), - }, nil + resp.Response = ret + return resp, nil } // CompactionV2 handles compaction request from DataCoord diff --git a/internal/datanode/services_test.go b/internal/datanode/services_test.go index 13f65fb99d..616354a164 100644 --- a/internal/datanode/services_test.go +++ b/internal/datanode/services_test.go @@ -481,7 +481,8 @@ func (s *DataNodeServicesSuite) TestShowConfigurations() { } func (s *DataNodeServicesSuite) TestGetMetrics() { - node := &DataNode{} + node := NewDataNode(context.TODO(), nil) + node.registerMetricsRequest() node.SetSession(&sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: 1}}) node.flowgraphManager = pipeline.NewFlowgraphManager() // server is closed diff --git a/internal/distributed/proxy/service.go b/internal/distributed/proxy/service.go index c497ae82a3..8fb4a5ab0f 100644 --- a/internal/distributed/proxy/service.go +++ b/internal/distributed/proxy/service.go @@ -22,6 +22,7 @@ import ( "crypto/x509" "fmt" "io" + "net" "net/http" "os" "strings" @@ -94,6 +95,9 @@ type Server struct { ctx context.Context wg sync.WaitGroup proxy types.ProxyComponent + httpListener net.Listener + grpcListener net.Listener + tcpServer cmux.CMux httpServer *http.Server grpcInternalServer *grpc.Server grpcExternalServer *grpc.Server diff --git a/internal/flushcommon/broker/mock_broker.go b/internal/flushcommon/broker/mock_broker.go index ae735bff96..f5dc156dae 100644 --- a/internal/flushcommon/broker/mock_broker.go +++ b/internal/flushcommon/broker/mock_broker.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.32.4. DO NOT EDIT. +// Code generated by mockery v2.46.0. DO NOT EDIT. package broker @@ -35,6 +35,10 @@ func (_m *MockBroker) AssignSegmentID(ctx context.Context, reqs ...*datapb.Segme _ca = append(_ca, _va...) ret := _m.Called(_ca...) 
+ if len(ret) == 0 { + panic("no return value specified for AssignSegmentID") + } + var r0 []int64 var r1 error if rf, ok := ret.Get(0).(func(context.Context, ...*datapb.SegmentIDRequest) ([]int64, error)); ok { @@ -97,6 +101,10 @@ func (_c *MockBroker_AssignSegmentID_Call) RunAndReturn(run func(context.Context func (_m *MockBroker) DropVirtualChannel(ctx context.Context, req *datapb.DropVirtualChannelRequest) (*datapb.DropVirtualChannelResponse, error) { ret := _m.Called(ctx, req) + if len(ret) == 0 { + panic("no return value specified for DropVirtualChannel") + } + var r0 *datapb.DropVirtualChannelResponse var r1 error if rf, ok := ret.Get(0).(func(context.Context, *datapb.DropVirtualChannelRequest) (*datapb.DropVirtualChannelResponse, error)); ok { @@ -152,6 +160,10 @@ func (_c *MockBroker_DropVirtualChannel_Call) RunAndReturn(run func(context.Cont func (_m *MockBroker) GetSegmentInfo(ctx context.Context, segmentIDs []int64) ([]*datapb.SegmentInfo, error) { ret := _m.Called(ctx, segmentIDs) + if len(ret) == 0 { + panic("no return value specified for GetSegmentInfo") + } + var r0 []*datapb.SegmentInfo var r1 error if rf, ok := ret.Get(0).(func(context.Context, []int64) ([]*datapb.SegmentInfo, error)); ok { @@ -207,6 +219,10 @@ func (_c *MockBroker_GetSegmentInfo_Call) RunAndReturn(run func(context.Context, func (_m *MockBroker) ReportTimeTick(ctx context.Context, msgs []*msgpb.DataNodeTtMsg) error { ret := _m.Called(ctx, msgs) + if len(ret) == 0 { + panic("no return value specified for ReportTimeTick") + } + var r0 error if rf, ok := ret.Get(0).(func(context.Context, []*msgpb.DataNodeTtMsg) error); ok { r0 = rf(ctx, msgs) @@ -250,6 +266,10 @@ func (_c *MockBroker_ReportTimeTick_Call) RunAndReturn(run func(context.Context, func (_m *MockBroker) SaveBinlogPaths(ctx context.Context, req *datapb.SaveBinlogPathsRequest) error { ret := _m.Called(ctx, req) + if len(ret) == 0 { + panic("no return value specified for SaveBinlogPaths") + } + var r0 error if rf, ok := ret.Get(0).(func(context.Context, *datapb.SaveBinlogPathsRequest) error); ok { r0 = rf(ctx, req) @@ -293,6 +313,10 @@ func (_c *MockBroker_SaveBinlogPaths_Call) RunAndReturn(run func(context.Context func (_m *MockBroker) UpdateChannelCheckpoint(ctx context.Context, channelCPs []*msgpb.MsgPosition) error { ret := _m.Called(ctx, channelCPs) + if len(ret) == 0 { + panic("no return value specified for UpdateChannelCheckpoint") + } + var r0 error if rf, ok := ret.Get(0).(func(context.Context, []*msgpb.MsgPosition) error); ok { r0 = rf(ctx, channelCPs) @@ -336,6 +360,10 @@ func (_c *MockBroker_UpdateChannelCheckpoint_Call) RunAndReturn(run func(context func (_m *MockBroker) UpdateSegmentStatistics(ctx context.Context, req *datapb.UpdateSegmentStatisticsRequest) error { ret := _m.Called(ctx, req) + if len(ret) == 0 { + panic("no return value specified for UpdateSegmentStatistics") + } + var r0 error if rf, ok := ret.Get(0).(func(context.Context, *datapb.UpdateSegmentStatisticsRequest) error); ok { r0 = rf(ctx, req) diff --git a/internal/flushcommon/metacache/mock_meta_cache.go b/internal/flushcommon/metacache/mock_meta_cache.go index e88b6ae8c4..2476ce4443 100644 --- a/internal/flushcommon/metacache/mock_meta_cache.go +++ b/internal/flushcommon/metacache/mock_meta_cache.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.32.4. DO NOT EDIT. +// Code generated by mockery v2.46.0. DO NOT EDIT. 
package metacache @@ -80,6 +80,10 @@ func (_c *MockMetaCache_AddSegment_Call) RunAndReturn(run func(*datapb.SegmentIn func (_m *MockMetaCache) Collection() int64 { ret := _m.Called() + if len(ret) == 0 { + panic("no return value specified for Collection") + } + var r0 int64 if rf, ok := ret.Get(0).(func() int64); ok { r0 = rf() @@ -121,6 +125,10 @@ func (_c *MockMetaCache_Collection_Call) RunAndReturn(run func() int64) *MockMet func (_m *MockMetaCache) DetectMissingSegments(segments map[int64]struct{}) []int64 { ret := _m.Called(segments) + if len(ret) == 0 { + panic("no return value specified for DetectMissingSegments") + } + var r0 []int64 if rf, ok := ret.Get(0).(func(map[int64]struct{}) []int64); ok { r0 = rf(segments) @@ -172,6 +180,10 @@ func (_m *MockMetaCache) GetSegmentByID(id int64, filters ...SegmentFilter) (*Se _ca = append(_ca, _va...) ret := _m.Called(_ca...) + if len(ret) == 0 { + panic("no return value specified for GetSegmentByID") + } + var r0 *SegmentInfo var r1 bool if rf, ok := ret.Get(0).(func(int64, ...SegmentFilter) (*SegmentInfo, bool)); ok { @@ -240,6 +252,10 @@ func (_m *MockMetaCache) GetSegmentIDsBy(filters ...SegmentFilter) []int64 { _ca = append(_ca, _va...) ret := _m.Called(_ca...) + if len(ret) == 0 { + panic("no return value specified for GetSegmentIDsBy") + } + var r0 []int64 if rf, ok := ret.Get(0).(func(...SegmentFilter) []int64); ok { r0 = rf(filters...) @@ -297,6 +313,10 @@ func (_m *MockMetaCache) GetSegmentsBy(filters ...SegmentFilter) []*SegmentInfo _ca = append(_ca, _va...) ret := _m.Called(_ca...) + if len(ret) == 0 { + panic("no return value specified for GetSegmentsBy") + } + var r0 []*SegmentInfo if rf, ok := ret.Get(0).(func(...SegmentFilter) []*SegmentInfo); ok { r0 = rf(filters...) @@ -355,6 +375,10 @@ func (_m *MockMetaCache) PredictSegments(pk storage.PrimaryKey, filters ...Segme _ca = append(_ca, _va...) ret := _m.Called(_ca...) + if len(ret) == 0 { + panic("no return value specified for PredictSegments") + } + var r0 []int64 var r1 bool if rf, ok := ret.Get(0).(func(storage.PrimaryKey, ...SegmentFilter) ([]int64, bool)); ok { @@ -423,6 +447,10 @@ func (_m *MockMetaCache) RemoveSegments(filters ...SegmentFilter) []int64 { _ca = append(_ca, _va...) ret := _m.Called(_ca...) + if len(ret) == 0 { + panic("no return value specified for RemoveSegments") + } + var r0 []int64 if rf, ok := ret.Get(0).(func(...SegmentFilter) []int64); ok { r0 = rf(filters...) 
@@ -474,6 +502,10 @@ func (_c *MockMetaCache_RemoveSegments_Call) RunAndReturn(run func(...SegmentFil func (_m *MockMetaCache) Schema() *schemapb.CollectionSchema { ret := _m.Called() + if len(ret) == 0 { + panic("no return value specified for Schema") + } + var r0 *schemapb.CollectionSchema if rf, ok := ret.Get(0).(func() *schemapb.CollectionSchema); ok { r0 = rf() diff --git a/internal/flushcommon/syncmgr/meta_writer.go b/internal/flushcommon/syncmgr/meta_writer.go index 7ffb04de33..3f6773c37a 100644 --- a/internal/flushcommon/syncmgr/meta_writer.go +++ b/internal/flushcommon/syncmgr/meta_writer.go @@ -60,7 +60,7 @@ func (b *brokerMetaWriter) UpdateSync(ctx context.Context, pack *SyncTask) error } checkPoints = append(checkPoints, &datapb.CheckPoint{ SegmentID: pack.segmentID, - NumOfRows: segment.FlushedRows() + pack.batchSize, + NumOfRows: segment.FlushedRows() + pack.batchRows, Position: pack.checkpoint, }) diff --git a/internal/flushcommon/syncmgr/mock_sync_manager.go b/internal/flushcommon/syncmgr/mock_sync_manager.go index 37baca41fa..dc57aaee95 100644 --- a/internal/flushcommon/syncmgr/mock_sync_manager.go +++ b/internal/flushcommon/syncmgr/mock_sync_manager.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.32.4. DO NOT EDIT. +// Code generated by mockery v2.46.0. DO NOT EDIT. package syncmgr @@ -34,6 +34,10 @@ func (_m *MockSyncManager) SyncData(ctx context.Context, task Task, callbacks .. _ca = append(_ca, _va...) ret := _m.Called(_ca...) + if len(ret) == 0 { + panic("no return value specified for SyncData") + } + var r0 *conc.Future[struct{}] if rf, ok := ret.Get(0).(func(context.Context, Task, ...func(error) error) *conc.Future[struct{}]); ok { r0 = rf(ctx, task, callbacks...) @@ -83,6 +87,51 @@ func (_c *MockSyncManager_SyncData_Call) RunAndReturn(run func(context.Context, return _c } +// TaskStatsJSON provides a mock function with given fields: +func (_m *MockSyncManager) TaskStatsJSON() string { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for TaskStatsJSON") + } + + var r0 string + if rf, ok := ret.Get(0).(func() string); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(string) + } + + return r0 +} + +// MockSyncManager_TaskStatsJSON_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'TaskStatsJSON' +type MockSyncManager_TaskStatsJSON_Call struct { + *mock.Call +} + +// TaskStatsJSON is a helper method to define mock.On call +func (_e *MockSyncManager_Expecter) TaskStatsJSON() *MockSyncManager_TaskStatsJSON_Call { + return &MockSyncManager_TaskStatsJSON_Call{Call: _e.mock.On("TaskStatsJSON")} +} + +func (_c *MockSyncManager_TaskStatsJSON_Call) Run(run func()) *MockSyncManager_TaskStatsJSON_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockSyncManager_TaskStatsJSON_Call) Return(_a0 string) *MockSyncManager_TaskStatsJSON_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockSyncManager_TaskStatsJSON_Call) RunAndReturn(run func() string) *MockSyncManager_TaskStatsJSON_Call { + _c.Call.Return(run) + return _c +} + // NewMockSyncManager creates a new instance of MockSyncManager. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. // The first argument is typically a *testing.T value. 
func NewMockSyncManager(t interface { diff --git a/internal/flushcommon/syncmgr/options.go b/internal/flushcommon/syncmgr/options.go index 3696d9cda2..7c84d7099e 100644 --- a/internal/flushcommon/syncmgr/options.go +++ b/internal/flushcommon/syncmgr/options.go @@ -108,8 +108,8 @@ func (t *SyncTask) WithFailureCallback(callback func(error)) *SyncTask { return t } -func (t *SyncTask) WithBatchSize(batchSize int64) *SyncTask { - t.batchSize = batchSize +func (t *SyncTask) WithBatchRows(batchRows int64) *SyncTask { + t.batchRows = batchRows return t } diff --git a/internal/flushcommon/syncmgr/serializer.go b/internal/flushcommon/syncmgr/serializer.go index 90b621e21b..25dfd8103f 100644 --- a/internal/flushcommon/syncmgr/serializer.go +++ b/internal/flushcommon/syncmgr/serializer.go @@ -48,7 +48,7 @@ type SyncPack struct { tsTo typeutil.Timestamp startPosition *msgpb.MsgPosition checkpoint *msgpb.MsgPosition - batchSize int64 // batchSize is the row number of this sync task,not the total num of rows of segemnt + batchRows int64 // batchRows is the row number of this sync task,not the total num of rows of segment dataSource string isFlush bool isDrop bool @@ -124,8 +124,8 @@ func (p *SyncPack) WithDrop() *SyncPack { return p } -func (p *SyncPack) WithBatchSize(batchSize int64) *SyncPack { - p.batchSize = batchSize +func (p *SyncPack) WithBatchRows(batchRows int64) *SyncPack { + p.batchRows = batchRows return p } diff --git a/internal/flushcommon/syncmgr/storage_serializer.go b/internal/flushcommon/syncmgr/storage_serializer.go index bbe2b62adc..6f4606c1c7 100644 --- a/internal/flushcommon/syncmgr/storage_serializer.go +++ b/internal/flushcommon/syncmgr/storage_serializer.go @@ -169,7 +169,7 @@ func (s *storageV1Serializer) setTaskMeta(task *SyncTask, pack *SyncPack) { WithPartitionID(pack.partitionID). WithChannelName(pack.channelName). WithSegmentID(pack.segmentID). - WithBatchSize(pack.batchSize). + WithBatchRows(pack.batchRows). WithSchema(s.metacache.Schema()). WithStartPosition(pack.startPosition). WithCheckpoint(pack.checkpoint). 
@@ -235,7 +235,7 @@ func (s *storageV1Serializer) serializeStatslog(pack *SyncPack) (*storage.Primar stats.UpdateByMsgs(chunkPkData) } - blob, err := s.inCodec.SerializePkStats(stats, pack.batchSize) + blob, err := s.inCodec.SerializePkStats(stats, pack.batchRows) if err != nil { return nil, nil, err } diff --git a/internal/flushcommon/syncmgr/storage_serializer_test.go b/internal/flushcommon/syncmgr/storage_serializer_test.go index 2be0b0788e..b6d94d574c 100644 --- a/internal/flushcommon/syncmgr/storage_serializer_test.go +++ b/internal/flushcommon/syncmgr/storage_serializer_test.go @@ -198,7 +198,7 @@ func (s *StorageV1SerializerSuite) TestSerializeInsert() { s.Run("with_empty_data", func() { pack := s.getBasicPack() pack.WithTimeRange(50, 100) - pack.WithInsertData([]*storage.InsertData{s.getEmptyInsertBuffer()}).WithBatchSize(0) + pack.WithInsertData([]*storage.InsertData{s.getEmptyInsertBuffer()}).WithBatchRows(0) _, err := s.serializer.EncodeBuffer(ctx, pack) s.Error(err) @@ -207,7 +207,7 @@ func (s *StorageV1SerializerSuite) TestSerializeInsert() { s.Run("with_normal_data", func() { pack := s.getBasicPack() pack.WithTimeRange(50, 100) - pack.WithInsertData([]*storage.InsertData{s.getInsertBuffer()}).WithBatchSize(10) + pack.WithInsertData([]*storage.InsertData{s.getInsertBuffer()}).WithBatchRows(10) s.mockCache.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Return().Once() @@ -241,7 +241,7 @@ func (s *StorageV1SerializerSuite) TestSerializeInsert() { s.Run("with_flush", func() { pack := s.getBasicPack() pack.WithTimeRange(50, 100) - pack.WithInsertData([]*storage.InsertData{s.getInsertBuffer()}).WithBatchSize(10) + pack.WithInsertData([]*storage.InsertData{s.getInsertBuffer()}).WithBatchRows(10) pack.WithFlush() bfs := s.getBfs() diff --git a/internal/flushcommon/syncmgr/sync_manager.go b/internal/flushcommon/syncmgr/sync_manager.go index 329e354e36..aa52b04a1d 100644 --- a/internal/flushcommon/syncmgr/sync_manager.go +++ b/internal/flushcommon/syncmgr/sync_manager.go @@ -2,9 +2,12 @@ package syncmgr import ( "context" + "encoding/json" "fmt" "strconv" + "time" + "github.com/hashicorp/golang-lru/v2/expirable" "go.uber.org/zap" "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" @@ -45,13 +48,16 @@ type SyncMeta struct { type SyncManager interface { // SyncData is the method to submit sync task. 
SyncData(ctx context.Context, task Task, callbacks ...func(error) error) *conc.Future[struct{}] + + TaskStatsJSON() string } type syncManager struct { *keyLockDispatcher[int64] chunkManager storage.ChunkManager - tasks *typeutil.ConcurrentMap[string, Task] + tasks *typeutil.ConcurrentMap[string, Task] + taskStats *expirable.LRU[string, Task] } func NewSyncManager(chunkManager storage.ChunkManager) SyncManager { @@ -64,6 +70,7 @@ func NewSyncManager(chunkManager storage.ChunkManager) SyncManager { keyLockDispatcher: dispatcher, chunkManager: chunkManager, tasks: typeutil.NewConcurrentMap[string, Task](), + taskStats: expirable.NewLRU[string, Task](512, nil, time.Minute*15), } // setup config update watcher params.Watch(params.DataNodeCfg.MaxParallelSyncMgrTasks.Key, config.NewHandler("datanode.syncmgr.poolsize", syncMgr.resizeHandler)) @@ -103,6 +110,7 @@ func (mgr *syncManager) SyncData(ctx context.Context, task Task, callbacks ...fu func (mgr *syncManager) safeSubmitTask(ctx context.Context, task Task, callbacks ...func(error) error) *conc.Future[struct{}] { taskKey := fmt.Sprintf("%d-%d", task.SegmentID(), task.Checkpoint().GetTimestamp()) mgr.tasks.Insert(taskKey, task) + mgr.taskStats.Add(taskKey, task) key := task.SegmentID() return mgr.submit(ctx, key, task, callbacks...) @@ -120,3 +128,17 @@ func (mgr *syncManager) submit(ctx context.Context, key int64, task Task, callba log.Info("sync mgr sumbit task with key", zap.Int64("key", key)) return mgr.Submit(ctx, key, task, callbacks...) } + +func (mgr *syncManager) TaskStatsJSON() string { + tasks := mgr.taskStats.Values() + if len(tasks) == 0 { + return "" + } + + ret, err := json.Marshal(tasks) + if err != nil { + log.Warn("failed to marshal sync task stats", zap.Error(err)) + return "" + } + return string(ret) +} diff --git a/internal/flushcommon/syncmgr/sync_manager_test.go b/internal/flushcommon/syncmgr/sync_manager_test.go index f05ccf554e..d8c19c7a68 100644 --- a/internal/flushcommon/syncmgr/sync_manager_test.go +++ b/internal/flushcommon/syncmgr/sync_manager_test.go @@ -2,6 +2,7 @@ package syncmgr import ( "context" + "encoding/json" "math/rand" "strconv" "testing" @@ -9,6 +10,7 @@ import ( "github.com/cockroachdb/errors" "github.com/samber/lo" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/suite" "go.uber.org/atomic" @@ -271,6 +273,44 @@ func (s *SyncManagerSuite) TestTargetUpdateSameID() { s.Error(err) } +func (s *SyncManagerSuite) TestSyncManager_TaskStatsJSON() { + manager := NewSyncManager(s.chunkManager) + syncMgr, ok := manager.(*syncManager) + assert.True(s.T(), ok) + + task1 := &SyncTask{ + segmentID: 12345, + collectionID: 1, + partitionID: 1, + channelName: "channel1", + schema: &schemapb.CollectionSchema{}, + checkpoint: &msgpb.MsgPosition{}, + tsFrom: 1000, + tsTo: 2000, + } + + task2 := &SyncTask{ + segmentID: 67890, + collectionID: 2, + partitionID: 2, + channelName: "channel2", + schema: &schemapb.CollectionSchema{}, + checkpoint: &msgpb.MsgPosition{}, + tsFrom: 3000, + tsTo: 4000, + } + + syncMgr.taskStats.Add("12345-1000", task1) + syncMgr.taskStats.Add("67890-3000", task2) + + expectedTasks := []SyncTask{*task1, *task2} + expectedJSON, err := json.Marshal(expectedTasks) + s.NoError(err) + + actualJSON := syncMgr.TaskStatsJSON() + s.JSONEq(string(expectedJSON), actualJSON) +} + func TestSyncManager(t *testing.T) { suite.Run(t, new(SyncManagerSuite)) } diff --git a/internal/flushcommon/syncmgr/task.go b/internal/flushcommon/syncmgr/task.go index 
de9025faf7..86ae63df67 100644 --- a/internal/flushcommon/syncmgr/task.go +++ b/internal/flushcommon/syncmgr/task.go @@ -18,8 +18,10 @@ package syncmgr import ( "context" + "encoding/json" "fmt" "path" + "time" "github.com/samber/lo" "go.uber.org/zap" @@ -36,6 +38,7 @@ import ( "github.com/milvus-io/milvus/pkg/metrics" "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/metautil" + "github.com/milvus-io/milvus/pkg/util/metricsinfo" "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/retry" "github.com/milvus-io/milvus/pkg/util/timerecord" @@ -56,9 +59,9 @@ type SyncTask struct { startPosition *msgpb.MsgPosition checkpoint *msgpb.MsgPosition dataSource string - // batchSize is the row number of this sync task, + // batchRows is the row number of this sync task, // not the total num of rows of segemnt - batchSize int64 + batchRows int64 level datapb.SegmentLevel tsFrom typeutil.Timestamp @@ -97,6 +100,9 @@ type SyncTask struct { failureCallback func(err error) tr *timerecord.TimeRecorder + + flushedSize int64 + execTime time.Duration } func (t *SyncTask) getLogger() *log.MLogger { @@ -162,16 +168,17 @@ func (t *SyncTask) Run(ctx context.Context) (err error) { return err } - var totalSize float64 - totalSize += lo.SumBy(lo.Values(t.binlogMemsize), func(fieldSize int64) float64 { - return float64(fieldSize) - }) - if t.deltaBlob != nil { - totalSize += float64(len(t.deltaBlob.Value)) + var totalSize int64 + for _, size := range t.binlogMemsize { + totalSize += size } + if t.deltaBlob != nil { + totalSize += int64(len(t.deltaBlob.Value)) + } + t.flushedSize = totalSize - metrics.DataNodeFlushedSize.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), t.dataSource, t.level.String()).Add(totalSize) - metrics.DataNodeFlushedRows.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), t.dataSource).Add(float64(t.batchSize)) + metrics.DataNodeFlushedSize.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), t.dataSource, t.level.String()).Add(float64(t.flushedSize)) + metrics.DataNodeFlushedRows.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), t.dataSource).Add(float64(t.batchRows)) metrics.DataNodeSave2StorageLatency.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), t.level.String()).Observe(float64(t.tr.RecordSpan().Milliseconds())) @@ -183,7 +190,7 @@ func (t *SyncTask) Run(ctx context.Context) (err error) { } } - actions := []metacache.SegmentAction{metacache.FinishSyncing(t.batchSize)} + actions := []metacache.SegmentAction{metacache.FinishSyncing(t.batchRows)} if t.isFlush { actions = append(actions, metacache.UpdateState(commonpb.SegmentState_Flushed)) } @@ -194,7 +201,8 @@ func (t *SyncTask) Run(ctx context.Context) (err error) { log.Info("segment removed", zap.Int64("segmentID", t.segment.SegmentID()), zap.String("channel", t.channelName)) } - log.Info("task done", zap.Float64("flushedSize", totalSize), zap.Duration("interval", t.tr.RecordSpan())) + t.execTime = t.tr.RecordSpan() + log.Info("task done", zap.Int64("flushedSize", totalSize), zap.Duration("timeTaken", t.execTime)) if !t.isFlush { metrics.DataNodeAutoFlushBufferCount.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.SuccessLabel, t.level.String()).Inc() @@ -288,7 +296,7 @@ func (t *SyncTask) processBM25StastBlob() { func (t *SyncTask) processStatsBlob() { if t.batchStatsBlob != nil { - t.convertBlob2StatsBinlog(t.batchStatsBlob, t.pkField.GetFieldID(), t.nextID(), t.batchSize) + t.convertBlob2StatsBinlog(t.batchStatsBlob, t.pkField.GetFieldID(), 
t.nextID(), t.batchRows) } if t.mergedStatsBlob != nil { totalRowNum := t.segment.NumOfRows() @@ -408,3 +416,16 @@ func (t *SyncTask) IsFlush() bool { func (t *SyncTask) Binlogs() (map[int64]*datapb.FieldBinlog, map[int64]*datapb.FieldBinlog, *datapb.FieldBinlog) { return t.insertBinlogs, t.statsBinlogs, t.deltaBinlog } + +func (t *SyncTask) MarshalJSON() ([]byte, error) { + return json.Marshal(&metricsinfo.SyncTask{ + SegmentID: t.segmentID, + BatchRows: t.batchRows, + SegmentLevel: t.level.String(), + TsFrom: t.tsFrom, + TsTo: t.tsTo, + DeltaRowCount: t.deltaRowCount, + FlushSize: t.flushedSize, + RunningTime: t.execTime, + }) +} diff --git a/internal/flushcommon/syncmgr/task_test.go b/internal/flushcommon/syncmgr/task_test.go index b4fc64ae7e..e132cdb963 100644 --- a/internal/flushcommon/syncmgr/task_test.go +++ b/internal/flushcommon/syncmgr/task_test.go @@ -382,6 +382,34 @@ func (s *SyncTaskSuite) TestNextID() { }) } +func (s *SyncTaskSuite) TestSyncTask_MarshalJSON() { + task := &SyncTask{ + segmentID: 12345, + batchRows: 100, + level: datapb.SegmentLevel_L0, + tsFrom: 1000, + tsTo: 2000, + deltaRowCount: 10, + flushedSize: 1024, + execTime: 2 * time.Second, + } + + expectedJSON := `{ + "segment_id": 12345, + "batch_rows": 100, + "segment_level": "L0", + "ts_from": 1000, + "ts_to": 2000, + "delta_row_count": 10, + "flush_size": 1024, + "running_time": 2000000000 + }` + + data, err := task.MarshalJSON() + s.NoError(err) + s.JSONEq(expectedJSON, string(data)) +} + func TestSyncTask(t *testing.T) { suite.Run(t, new(SyncTaskSuite)) } diff --git a/internal/flushcommon/writebuffer/write_buffer.go b/internal/flushcommon/writebuffer/write_buffer.go index 4cb0c32437..1ad6fb6636 100644 --- a/internal/flushcommon/writebuffer/write_buffer.go +++ b/internal/flushcommon/writebuffer/write_buffer.go @@ -604,7 +604,7 @@ func (wb *writeBufferBase) getSyncTask(ctx context.Context, segmentID int64) (sy WithLevel(segmentInfo.Level()). WithDataSource(metrics.StreamingDataSourceLabel). WithCheckpoint(wb.checkpoint). - WithBatchSize(batchSize). + WithBatchRows(batchSize). WithErrorHandler(wb.errHandler) if len(bm25) != 0 { diff --git a/internal/http/router.go b/internal/http/router.go index 1830d6423e..4d6acec3f7 100644 --- a/internal/http/router.go +++ b/internal/http/router.go @@ -70,7 +70,14 @@ const ( ClusterClientsPath = "/_cluster/clients" ClusterDependenciesPath = "/_cluster/dependencies" HookConfigsPath = "/_hook/configs" - QcoordSegmentsPath = "/_qcoord/segments" - QcoordChannelsPath = "/_qcoord/channels" - QcoordTasksPath = "/_qcoord/tasks" + QCoordSegmentsPath = "/_qcoord/segments" + QCoordChannelsPath = "/_qcoord/channels" + QCoordAllTasksPath = "/_qcoord/tasks/all" + + DCoordAllTasksPath = "/_dcoord/tasks/all" + DCoordImportTasksPath = "/_dcoord/tasks/import" + DCoordCompactionTasksPath = "/_dcoord/tasks/compaction" + DCoordBuildIndexTasksPath = "/_dcoord/tasks/build_index" + + DNodeSyncTasksPath = "/_dnode/tasks/sync" ) diff --git a/internal/http/webui/5xx.html b/internal/http/webui/5xx.html index 3977cf767d..aafdacc75b 100644 --- a/internal/http/webui/5xx.html +++ b/internal/http/webui/5xx.html @@ -8,32 +8,21 @@ - - - - + + +
-Please try again later.
+Service Unavailable, please try again later.
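Taken together, the syncmgr changes mean the new /_dnode/tasks/sync route added in router.go is presumably backed by nothing more than the marshaled contents of an expirable LRU of recently submitted sync tasks (SyncManager.TaskStatsJSON). Below is a small self-contained sketch of that flow, assuming the hashicorp expirable LRU used above; the syncTaskStats struct is an illustrative stand-in for metricsinfo.SyncTask, but the JSON keys and the nanosecond running_time match the expectations asserted in the tests.

package main

import (
	"encoding/json"
	"fmt"
	"time"

	"github.com/hashicorp/golang-lru/v2/expirable"
)

// syncTaskStats mirrors the JSON shape asserted in the tests above
// (metricsinfo.SyncTask); the struct itself is a simplified stand-in.
type syncTaskStats struct {
	SegmentID     int64         `json:"segment_id"`
	BatchRows     int64         `json:"batch_rows"`
	SegmentLevel  string        `json:"segment_level"`
	TsFrom        uint64        `json:"ts_from"`
	TsTo          uint64        `json:"ts_to"`
	DeltaRowCount int64         `json:"delta_row_count"`
	FlushSize     int64         `json:"flush_size"`
	RunningTime   time.Duration `json:"running_time"`
}

func main() {
	// Recently submitted tasks are kept in a size- and time-bounded LRU,
	// as in the sync manager change (512 entries, 15 minute TTL).
	stats := expirable.NewLRU[string, *syncTaskStats](512, nil, 15*time.Minute)
	stats.Add("12345-1000", &syncTaskStats{
		SegmentID:     12345,
		BatchRows:     100,
		SegmentLevel:  "L0",
		TsFrom:        1000,
		TsTo:          2000,
		DeltaRowCount: 10,
		FlushSize:     1024,
		RunningTime:   2 * time.Second,
	})

	// TaskStatsJSON simply marshals the LRU values; an empty LRU yields "".
	if vals := stats.Values(); len(vals) > 0 {
		out, _ := json.Marshal(vals)
		fmt.Println(string(out)) // running_time serializes as nanoseconds (2000000000)
	}
}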