mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-28 22:45:26 +08:00
issue: #46358 This PR implements segment reopening functionality on query nodes, enabling the application of data or schema changes to already-loaded segments without requiring a full reload. ### Core (C++) **New SegmentLoadInfo class** (`internal/core/src/segcore/SegmentLoadInfo.h/cpp`): - Encapsulates segment load configuration with structured access - Implements `ComputeDiff()` to calculate differences between old and new load states - Tracks indexes, binlogs, and column groups that need to be loaded or dropped - Provides `ConvertFieldIndexInfoToLoadIndexInfo()` for index loading **ChunkedSegmentSealedImpl modifications**: - Added `Reopen(const SegmentLoadInfo&)` method to apply incremental changes based on computed diff - Refactored `LoadColumnGroups()` and `LoadColumnGroup()` to support selective loading via field ID map - Extracted `LoadBatchIndexes()` and `LoadBatchFieldData()` for reusable batch loading logic - Added `LoadManifest()` for manifest-based loading path - Updated all methods to use `SegmentLoadInfo` wrapper instead of direct proto access **SegmentGrowingImpl modifications**: - Added `Reopen()` stub method for interface compliance **C API additions** (`segment_c.h/cpp`): - Added `ReopenSegment()` function exposing reopen to Go layer ### Go Side **QueryNode handlers** (`internal/querynodev2/`): - Added `HandleReopen()` in handlers.go - Added `ReopenSegments()` RPC in services.go **Segment interface** (`internal/querynodev2/segments/`): - Extended `Segment` interface with `Reopen()` method - Implemented `Reopen()` in LocalSegment - Added `Reopen()` to segment loader **Segcore wrapper** (`internal/util/segcore/`): - Added `Reopen()` method in segment.go - Added `ReopenSegmentRequest` in requests.go ### Proto - Added new fields to support reopen in `query_coord.proto` --------- Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
124 lines
3.7 KiB
Go
124 lines
3.7 KiB
Go
package segcore
|
|
|
|
/*
|
|
#cgo pkg-config: milvus_core
|
|
|
|
#include "common/type_c.h"
|
|
#include "segcore/load_field_data_c.h"
|
|
*/
|
|
import "C"
|
|
|
|
import (
|
|
"unsafe"
|
|
|
|
"github.com/cockroachdb/errors"
|
|
|
|
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
|
|
"github.com/milvus-io/milvus/internal/storage"
|
|
"github.com/milvus-io/milvus/internal/util/initcore"
|
|
"github.com/milvus-io/milvus/pkg/v2/proto/datapb"
|
|
"github.com/milvus-io/milvus/pkg/v2/proto/querypb"
|
|
"github.com/milvus-io/milvus/pkg/v2/proto/segcorepb"
|
|
"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
|
|
)
|
|
|
|
type RetrievePlanWithOffsets struct {
|
|
*RetrievePlan
|
|
Offsets []int64
|
|
}
|
|
|
|
type InsertRequest struct {
|
|
RowIDs []int64
|
|
Timestamps []typeutil.Timestamp
|
|
Record *segcorepb.InsertRecord
|
|
}
|
|
|
|
type DeleteRequest struct {
|
|
PrimaryKeys storage.PrimaryKeys
|
|
Timestamps []typeutil.Timestamp
|
|
}
|
|
|
|
type LoadFieldDataRequest struct {
|
|
Fields []LoadFieldDataInfo
|
|
MMapDir string
|
|
RowCount int64
|
|
StorageVersion int64
|
|
LoadPriority commonpb.LoadPriority
|
|
WarmupPolicy string
|
|
}
|
|
|
|
type LoadFieldDataInfo struct {
|
|
Field *datapb.FieldBinlog
|
|
EnableMMap bool
|
|
}
|
|
|
|
func (req *LoadFieldDataRequest) getCLoadFieldDataRequest() (result *cLoadFieldDataRequest, err error) {
|
|
var cLoadFieldDataInfo C.CLoadFieldDataInfo
|
|
status := C.NewLoadFieldDataInfo(&cLoadFieldDataInfo, C.int64_t(req.StorageVersion))
|
|
if err := ConsumeCStatusIntoError(&status); err != nil {
|
|
return nil, errors.Wrap(err, "NewLoadFieldDataInfo failed")
|
|
}
|
|
defer func() {
|
|
if err != nil {
|
|
C.DeleteLoadFieldDataInfo(cLoadFieldDataInfo)
|
|
}
|
|
}()
|
|
rowCount := C.int64_t(req.RowCount)
|
|
|
|
for _, field := range req.Fields {
|
|
cFieldID := C.int64_t(field.Field.GetFieldID())
|
|
status = C.AppendLoadFieldInfo(cLoadFieldDataInfo, cFieldID, rowCount)
|
|
if err := ConsumeCStatusIntoError(&status); err != nil {
|
|
return nil, errors.Wrapf(err, "AppendLoadFieldInfo failed at fieldID, %d", field.Field.GetFieldID())
|
|
}
|
|
for _, binlog := range field.Field.Binlogs {
|
|
cEntriesNum := C.int64_t(binlog.GetEntriesNum())
|
|
cMemorySize := C.int64_t(binlog.GetMemorySize())
|
|
cFile := C.CString(binlog.GetLogPath())
|
|
defer C.free(unsafe.Pointer(cFile))
|
|
|
|
status = C.AppendLoadFieldDataPath(cLoadFieldDataInfo, cFieldID, cEntriesNum, cMemorySize, cFile)
|
|
if err := ConsumeCStatusIntoError(&status); err != nil {
|
|
return nil, errors.Wrapf(err, "AppendLoadFieldDataPath failed at binlog, %d, %s", field.Field.GetFieldID(), binlog.GetLogPath())
|
|
}
|
|
}
|
|
|
|
childField := field.Field.GetChildFields()
|
|
if len(childField) > 0 {
|
|
status = C.SetLoadFieldInfoChildFields(cLoadFieldDataInfo, cFieldID, (*C.int64_t)(unsafe.Pointer(&childField[0])), C.int64_t(len(childField)))
|
|
if err := ConsumeCStatusIntoError(&status); err != nil {
|
|
return nil, errors.Wrapf(err, "SetLoadFieldInfoChildFields failed at binlog, %d", field.Field.GetFieldID())
|
|
}
|
|
}
|
|
|
|
C.EnableMmap(cLoadFieldDataInfo, cFieldID, C.bool(field.EnableMMap))
|
|
}
|
|
C.SetLoadPriority(cLoadFieldDataInfo, C.int32_t(req.LoadPriority))
|
|
if len(req.WarmupPolicy) > 0 {
|
|
warmupPolicy, err := initcore.ConvertCacheWarmupPolicy(req.WarmupPolicy)
|
|
if err != nil {
|
|
return nil, errors.Wrapf(err, "ConvertCacheWarmupPolicy failed at warmupPolicy, %s", req.WarmupPolicy)
|
|
}
|
|
C.AppendWarmupPolicy(cLoadFieldDataInfo, C.CacheWarmupPolicy(warmupPolicy))
|
|
}
|
|
return &cLoadFieldDataRequest{
|
|
cLoadFieldDataInfo: cLoadFieldDataInfo,
|
|
}, nil
|
|
}
|
|
|
|
type cLoadFieldDataRequest struct {
|
|
cLoadFieldDataInfo C.CLoadFieldDataInfo
|
|
}
|
|
|
|
func (req *cLoadFieldDataRequest) Release() {
|
|
C.DeleteLoadFieldDataInfo(req.cLoadFieldDataInfo)
|
|
}
|
|
|
|
type AddFieldDataInfoRequest = LoadFieldDataRequest
|
|
|
|
type AddFieldDataInfoResult struct{}
|
|
|
|
type ReopenRequest struct {
|
|
LoadInfo *querypb.SegmentLoadInfo
|
|
}
|