issue: #45881

Add persistent task management for external collections with automatic detection of external_source and external_spec changes. When the source changes, the system aborts running tasks and creates new ones, ensuring only one active task per collection. Tasks validate their source on completion to prevent superseded tasks from committing results.

Release notes:

- Core invariant: at most one active UpdateExternalCollection task exists per collection. Tasks are serialized by collectionID (collection-level locking), and any change to external_source or external_spec aborts superseded tasks and triggers creation of a new task (externalCollectionManager and the external_collection_task_meta collection-based locks enforce this).
- What was simplified/removed: per-task fine-grained locking and acceptance of multiple concurrent tasks per collection were replaced by collection-level synchronization (external_collection_task_meta.go) and a single persistent task lifecycle in the DataCoord/Index task code; redundant concurrent update paths were removed by checking for an existing task in AddTask/LoadOrStore and aborting/overwriting via the Drop/Cancel flows.
- Why this does not cause data loss or regress behavior: task state transitions and commits are validated against the current external source/spec before changes are applied. UpdateStateWithMeta and SetJobInfo verify task metadata and persist via the catalog only when the collection state matches; the DataNode externalCollectionManager keeps task results in its in-memory manager and exposes Query/Drop flows (services.go) without modifying existing segment data unless a task finishes successfully and SetJobInfo atomically updates segments via meta/catalog calls, preventing superseded tasks from committing stale results.
- New capability added: an end-to-end external collection update workflow. The DataCoord Index task, Cluster RPC helpers, DataNode external task runner, and ExternalCollectionManager enable creating, querying, cancelling, and applying external collection updates (fragment-to-segment balancing, kept/updated segment handling, allocator integration); accompanying unit tests cover success, failure, cancellation, allocator errors, and balancing logic.

Signed-off-by: sunby <sunbingyi1992@gmail.com>
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package external

import (
	"context"
	"fmt"
	"sort"

	"go.uber.org/zap"

	"github.com/milvus-io/milvus/pkg/v2/log"
	"github.com/milvus-io/milvus/pkg/v2/proto/datapb"
	"github.com/milvus-io/milvus/pkg/v2/proto/indexpb"
	"github.com/milvus-io/milvus/pkg/v2/util/timerecord"
)

// ensureContext returns an error if the given context has already been
// cancelled or has expired; a nil context is treated as always valid.
func ensureContext(ctx context.Context) error {
	if ctx == nil {
		return nil
	}
	select {
	case <-ctx.Done():
		return ctx.Err()
	default:
		return nil
	}
}

// Fragment represents a data fragment from external source
type Fragment struct {
	FragmentID int64
	RowCount   int64
}

// FragmentRowRange represents the row index range for a fragment within a segment
type FragmentRowRange struct {
	FragmentID int64
	StartRow   int64 // inclusive
	EndRow     int64 // exclusive
}

// SegmentRowMapping holds the row index mapping for all fragments in a segment
type SegmentRowMapping struct {
	SegmentID int64
	TotalRows int64
	Ranges    []FragmentRowRange
	Fragments []Fragment
}

// NewSegmentRowMapping creates a row mapping from fragments
// Fragments are mapped sequentially: fragment1 gets [0, rowCount1), fragment2 gets [rowCount1, rowCount1+rowCount2), etc.
func NewSegmentRowMapping(segmentID int64, fragments []Fragment) *SegmentRowMapping {
	mapping := &SegmentRowMapping{
		SegmentID: segmentID,
		Fragments: fragments,
		Ranges:    make([]FragmentRowRange, len(fragments)),
	}

	var offset int64
	for i, f := range fragments {
		mapping.Ranges[i] = FragmentRowRange{
			FragmentID: f.FragmentID,
			StartRow:   offset,
			EndRow:     offset + f.RowCount,
		}
		offset += f.RowCount
	}
	mapping.TotalRows = offset

	return mapping
}
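
// Illustrative sketch (hypothetical values, not part of the task flow): with two
// fragments, the mapping is laid out sequentially as described above.
//
//	fragments := []Fragment{
//		{FragmentID: 10, RowCount: 100},
//		{FragmentID: 20, RowCount: 50},
//	}
//	m := NewSegmentRowMapping(1, fragments)
//	// m.Ranges[0] == FragmentRowRange{FragmentID: 10, StartRow: 0, EndRow: 100}
//	// m.Ranges[1] == FragmentRowRange{FragmentID: 20, StartRow: 100, EndRow: 150}
//	// m.TotalRows == 150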

// GetFragmentByRowIndex returns the fragment range that contains the given row index
// Returns nil if rowIndex is out of range
// To get local index within fragment: rowIndex - range.StartRow
func (m *SegmentRowMapping) GetFragmentByRowIndex(rowIndex int64) *FragmentRowRange {
	if rowIndex < 0 || rowIndex >= m.TotalRows {
		return nil
	}

	// Binary search for efficiency
	left, right := 0, len(m.Ranges)-1
	for left <= right {
		mid := (left + right) / 2
		r := &m.Ranges[mid]
		if rowIndex >= r.StartRow && rowIndex < r.EndRow {
			return r
		} else if rowIndex < r.StartRow {
			right = mid - 1
		} else {
			left = mid + 1
		}
	}
	return nil
}
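
// Illustrative sketch (hypothetical values, continuing the mapping from the
// example above): locating a row and converting it to a fragment-local index.
//
//	r := m.GetFragmentByRowIndex(120)   // 120 falls in [100, 150), so r.FragmentID == 20
//	localIdx := int64(120) - r.StartRow // 20: the row's index within fragment 20
//	_ = localIdx
//	// m.GetFragmentByRowIndex(150) returns nil: 150 is outside [0, TotalRows)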

// SegmentFragments maps segment ID to its fragments
type SegmentFragments map[int64][]Fragment

// SegmentResult holds a segment and its fragment row mapping
type SegmentResult struct {
	Segment    *datapb.SegmentInfo
	RowMapping *SegmentRowMapping
}

// UpdateExternalTask handles updating external collection segments
type UpdateExternalTask struct {
	ctx    context.Context
	cancel context.CancelFunc

	req *datapb.UpdateExternalCollectionRequest
	tr  *timerecord.TimeRecorder

	state      indexpb.JobState
	failReason string

	// Result after execution
	updatedSegments []*datapb.SegmentInfo
	segmentMappings map[int64]*SegmentRowMapping // segmentID -> row mapping
}

// NewUpdateExternalTask creates a new update external task
func NewUpdateExternalTask(
	ctx context.Context,
	cancel context.CancelFunc,
	req *datapb.UpdateExternalCollectionRequest,
) *UpdateExternalTask {
	return &UpdateExternalTask{
		ctx:             ctx,
		cancel:          cancel,
		req:             req,
		tr:              timerecord.NewTimeRecorder(fmt.Sprintf("UpdateExternalTask: %d", req.GetTaskID())),
		state:           indexpb.JobState_JobStateInit,
		segmentMappings: make(map[int64]*SegmentRowMapping),
	}
}
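
// Illustrative sketch (hypothetical request values; field names inferred from the
// getters used in this file): the caller owns the cancellable context and passes
// both the context and its cancel func to the task.
//
//	ctx, cancel := context.WithCancel(context.Background())
//	task := NewUpdateExternalTask(ctx, cancel, &datapb.UpdateExternalCollectionRequest{
//		TaskID:       1001,
//		CollectionID: 2002,
//	})
//	_ = task // typically handed to a scheduler, which drives the task lifecycle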

func (t *UpdateExternalTask) Ctx() context.Context {
	return t.ctx
}

func (t *UpdateExternalTask) Name() string {
	return fmt.Sprintf("UpdateExternalTask-%d", t.req.GetTaskID())
}

func (t *UpdateExternalTask) OnEnqueue(ctx context.Context) error {
	t.tr.RecordSpan()
	log.Ctx(ctx).Info("UpdateExternalTask enqueued",
		zap.Int64("taskID", t.req.GetTaskID()),
		zap.Int64("collectionID", t.req.GetCollectionID()))
	return nil
}

func (t *UpdateExternalTask) SetState(state indexpb.JobState, failReason string) {
	t.state = state
	t.failReason = failReason
}

func (t *UpdateExternalTask) GetState() indexpb.JobState {
	return t.state
}

func (t *UpdateExternalTask) GetSlot() int64 {
	return 1
}

func (t *UpdateExternalTask) Reset() {
	t.ctx = nil
	t.cancel = nil
	t.req = nil
	t.tr = nil
	t.updatedSegments = nil
	t.segmentMappings = nil
}

func (t *UpdateExternalTask) PreExecute(ctx context.Context) error {
	if err := ensureContext(ctx); err != nil {
		return err
	}
	log.Ctx(ctx).Info("UpdateExternalTask PreExecute",
		zap.Int64("taskID", t.req.GetTaskID()),
		zap.Int64("collectionID", t.req.GetCollectionID()))

	if t.req == nil {
		return fmt.Errorf("request is nil")
	}
	return nil
}

func (t *UpdateExternalTask) Execute(ctx context.Context) error {
	if err := ensureContext(ctx); err != nil {
		return err
	}
	log.Ctx(ctx).Info("UpdateExternalTask Execute",
		zap.Int64("taskID", t.req.GetTaskID()),
		zap.Int64("collectionID", t.req.GetCollectionID()))

	// TODO: Fetch fragments from external source
	// newFragments := fetchFragmentsFromExternalSource(t.req.GetExternalSource(), t.req.GetExternalSpec())
	var newFragments []Fragment

	// Build current segment -> fragments mapping
	// TODO: This mapping should come from metadata or be stored in SegmentInfo
	currentSegmentFragments := t.buildCurrentSegmentFragments()

	// Compare and organize segments
	updatedSegments, err := t.organizeSegments(ctx, currentSegmentFragments, newFragments)
	if err != nil {
		return err
	}

	t.updatedSegments = updatedSegments
	return nil
}

func (t *UpdateExternalTask) PostExecute(ctx context.Context) error {
	if err := ensureContext(ctx); err != nil {
		return err
	}
	log.Ctx(ctx).Info("UpdateExternalTask PostExecute",
		zap.Int64("taskID", t.req.GetTaskID()),
		zap.Int64("collectionID", t.req.GetCollectionID()),
		zap.Int("updatedSegments", len(t.updatedSegments)))
	return nil
}
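
// Illustrative sketch (assumed scheduler behavior, not defined in this file, and
// assuming the usual JobStateFinished/JobStateFailed values from indexpb): a
// runner would typically call OnEnqueue first and then drive the task through
// PreExecute, Execute, and PostExecute, recording the failure reason on any error.
//
//	if err := task.PreExecute(ctx); err != nil {
//		task.SetState(indexpb.JobState_JobStateFailed, err.Error())
//	} else if err := task.Execute(ctx); err != nil {
//		task.SetState(indexpb.JobState_JobStateFailed, err.Error())
//	} else if err := task.PostExecute(ctx); err != nil {
//		task.SetState(indexpb.JobState_JobStateFailed, err.Error())
//	} else {
//		task.SetState(indexpb.JobState_JobStateFinished, "")
//	}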

// GetUpdatedSegments returns the result segments after execution
func (t *UpdateExternalTask) GetUpdatedSegments() []*datapb.SegmentInfo {
	return t.updatedSegments
}

// GetSegmentMappings returns the row mappings for all segments (segmentID -> mapping)
func (t *UpdateExternalTask) GetSegmentMappings() map[int64]*SegmentRowMapping {
	return t.segmentMappings
}

// buildCurrentSegmentFragments builds segment to fragments mapping from current segments
func (t *UpdateExternalTask) buildCurrentSegmentFragments() SegmentFragments {
	result := make(SegmentFragments)
	// TODO: Extract fragment information from segment metadata
	// For now, this is a placeholder - fragment info should be stored in segment metadata
	for _, seg := range t.req.GetCurrentSegments() {
		// Placeholder: each segment has its own "virtual" fragment
		result[seg.GetID()] = []Fragment{
			{FragmentID: seg.GetID(), RowCount: seg.GetNumOfRows()},
		}
	}
	return result
}
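
// Illustrative sketch (hypothetical segments): with the placeholder logic above,
// each current segment maps to a single virtual fragment that reuses the segment
// ID and row count.
//
//	// req.CurrentSegments = [{ID: 7, NumOfRows: 300}, {ID: 8, NumOfRows: 120}]
//	// buildCurrentSegmentFragments() =>
//	//   7: [{FragmentID: 7, RowCount: 300}]
//	//   8: [{FragmentID: 8, RowCount: 120}]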

// organizeSegments compares fragments and organizes them into segments
func (t *UpdateExternalTask) organizeSegments(
	ctx context.Context,
	currentSegmentFragments SegmentFragments,
	newFragments []Fragment,
) ([]*datapb.SegmentInfo, error) {
	if err := ensureContext(ctx); err != nil {
		return nil, err
	}
	log := log.Ctx(ctx)

	// Build new fragment map for quick lookup
	newFragmentMap := make(map[int64]Fragment)
	for _, f := range newFragments {
		newFragmentMap[f.FragmentID] = f
	}

	// Track which fragments are used by kept segments
	usedFragments := make(map[int64]bool)
	var keptSegments []*datapb.SegmentInfo

	// Check each current segment
	for _, seg := range t.req.GetCurrentSegments() {
		if err := ensureContext(ctx); err != nil {
			return nil, err
		}
		fragments := currentSegmentFragments[seg.GetID()]
		allFragmentsExist := true

		// Check if all fragments of this segment still exist
		for _, f := range fragments {
			if _, exists := newFragmentMap[f.FragmentID]; !exists {
				allFragmentsExist = false
				log.Info("Fragment removed from segment",
					zap.Int64("segmentID", seg.GetID()),
					zap.Int64("fragmentID", f.FragmentID))
				break
			}
		}

		if allFragmentsExist {
			// Keep this segment unchanged
			keptSegments = append(keptSegments, seg)
			for _, f := range fragments {
				if err := ensureContext(ctx); err != nil {
					return nil, err
				}
				usedFragments[f.FragmentID] = true
			}
			// Compute row mapping for kept segment
			t.segmentMappings[seg.GetID()] = NewSegmentRowMapping(seg.GetID(), fragments)
			log.Debug("Segment kept unchanged",
				zap.Int64("segmentID", seg.GetID()))
		} else {
			// Segment invalidated - its remaining fragments become orphans
			log.Info("Segment invalidated due to removed fragments",
				zap.Int64("segmentID", seg.GetID()))
		}
	}

	// Collect orphan fragments (new + from invalidated segments)
	var orphanFragments []Fragment
	for _, f := range newFragments {
		if err := ensureContext(ctx); err != nil {
			return nil, err
		}
		if !usedFragments[f.FragmentID] {
			orphanFragments = append(orphanFragments, f)
		}
	}

	// Organize orphan fragments into new segments with balanced row counts
	newSegments, err := t.balanceFragmentsToSegments(ctx, orphanFragments)
	if err != nil {
		return nil, err
	}

	// Combine kept and new segments
	result := append(keptSegments, newSegments...)

	log.Info("Segment organization complete",
		zap.Int("keptSegments", len(keptSegments)),
		zap.Int("newSegments", len(newSegments)),
		zap.Int("totalSegments", len(result)))

	return result, nil
}
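
// Illustrative walkthrough (hypothetical IDs): suppose segment 7 was built from
// fragments {1, 2} and segment 8 from fragment {3}, and the external source now
// reports fragments {1, 2, 4}.
//
//	// - Segment 7: fragments 1 and 2 still exist -> kept unchanged; fragments 1 and 2 are marked used.
//	// - Segment 8: fragment 3 was removed        -> invalidated and dropped from the result.
//	// - Fragment 4 is not used by a kept segment -> orphan; it is passed to balanceFragmentsToSegments
//	//   together with any other unused fragments, and the resulting new segments are appended to the result.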

// balanceFragmentsToSegments organizes fragments into segments with balanced row counts
func (t *UpdateExternalTask) balanceFragmentsToSegments(ctx context.Context, fragments []Fragment) ([]*datapb.SegmentInfo, error) {
	if len(fragments) == 0 {
		return nil, nil
	}
	if err := ensureContext(ctx); err != nil {
		return nil, err
	}

	log := log.Ctx(ctx)

	// Calculate total rows
	var totalRows int64
	for _, f := range fragments {
		if err := ensureContext(ctx); err != nil {
			return nil, err
		}
		totalRows += f.RowCount
	}

	// Determine target rows per segment (use a reasonable default)
	// TODO: Make this configurable or based on segment size limits
	targetRowsPerSegment := int64(1000000) // 1M rows per segment as default
	if totalRows < targetRowsPerSegment {
		targetRowsPerSegment = totalRows
	}

	numSegments := (totalRows + targetRowsPerSegment - 1) / targetRowsPerSegment
	if numSegments == 0 {
		numSegments = 1
	}

	avgRowsPerSegment := totalRows / numSegments

	log.Info("Balancing fragments to segments",
		zap.Int("numFragments", len(fragments)),
		zap.Int64("totalRows", totalRows),
		zap.Int64("numSegments", numSegments),
		zap.Int64("avgRowsPerSegment", avgRowsPerSegment))

	// Sort fragments by row count descending for better bin-packing
	sortedFragments := make([]Fragment, len(fragments))
	copy(sortedFragments, fragments)
	sort.Slice(sortedFragments, func(i, j int) bool {
		return sortedFragments[i].RowCount > sortedFragments[j].RowCount
	})

	// Initialize segment bins
	type segmentBin struct {
		fragments []Fragment
		rowCount  int64
	}
	bins := make([]segmentBin, numSegments)

	// Greedy bin-packing: assign each fragment to the bin with lowest current row count
	for _, f := range sortedFragments {
		if err := ensureContext(ctx); err != nil {
			return nil, err
		}
		// Find bin with minimum row count
		minIdx := 0
		for i := 1; i < len(bins); i++ {
			if bins[i].rowCount < bins[minIdx].rowCount {
				minIdx = i
			}
		}
		bins[minIdx].fragments = append(bins[minIdx].fragments, f)
		bins[minIdx].rowCount += f.RowCount
	}

	// Convert bins to SegmentInfo
	var result []*datapb.SegmentInfo
	for i, bin := range bins {
		if err := ensureContext(ctx); err != nil {
			return nil, err
		}
		if len(bin.fragments) == 0 {
			continue
		}

		// TODO: Generate column groups
		// Just a placeholder here; the ID will be assigned by the coordinator.
		segmentID := int64(i + 1)

		seg := &datapb.SegmentInfo{
			ID:           segmentID,
			CollectionID: t.req.GetCollectionID(),
			NumOfRows:    bin.rowCount,
			// TODO: Fill other required fields
		}
		result = append(result, seg)

		// Compute and store row mapping for new segment
		t.segmentMappings[segmentID] = NewSegmentRowMapping(segmentID, bin.fragments)

		log.Debug("Created new segment from fragments",
			zap.Int64("segmentID placeholder", segmentID),
			zap.Int64("rowCount", bin.rowCount),
			zap.Int("numFragments", len(bin.fragments)))
	}

	return result, nil
}
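
// Illustrative sketch (hypothetical row counts): greedy bin-packing with the 1M-row
// target. Fragments totalling 2.5M rows yield ceil(2.5M / 1M) = 3 bins; each
// fragment, largest first, goes to the currently lightest bin.
//
//	// fragments (rows): A=900k, B=800k, C=500k, D=300k
//	// sorted desc:      A, B, C, D
//	// A -> bin0 (900k), B -> bin1 (800k), C -> bin2 (500k), D -> bin2 (800k)
//	// result: three segments with 900k, 800k, and 800k rows, each with its own row mapping.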