feat: Storage v2 compaction (#40667)

- Feat: Support Mix compaction, with tests covering compatibility and
rollback (a usage sketch of the new storage-version options follows this
list).
  - Read v1 segments and compact into the v2 format.
  - Read a mix of v1 and v2 segments and compact into the v2 format.
  - Read v2 segments and compact into the v2 format.
  - Compact segments containing duplicate primary keys.
  - Compact BM25 segments.
  - Compact segments using merge sort.
  - Compact segments with no expired entities.
  - Compact segments with missing binlogs.
  - Compact segments with nullable fields.
- Feat: Support Clustering compaction, with tests covering compatibility and
rollback.
  - Read v1 segments and compact into the v2 format.
  - Read a mix of v1 and v2 segments and compact into the v2 format.
  - Read v2 segments and compact into the v2 format.
  - Compact BM25 segments into the v2 format.
  - Compact under a memory limit.
- Enhance: Use serdeMap-based serialization in the BuildRecord function to
support all Milvus data types.
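For reviewers, a minimal usage sketch (not part of the diff; `openCompactionReader` and `outputStorageVersion` are hypothetical helper names): the read side follows each input segment's recorded `StorageVersion`, so one plan can mix v1 and v2 segments, while the write side follows the `common.storage.enableV2` switch, which is what the rollback tests toggle.

```go
package compactor

import (
	"context"

	"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"

	"github.com/milvus-io/milvus/internal/storage"
	"github.com/milvus-io/milvus/pkg/v2/proto/datapb"
	"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
)

// openCompactionReader opens a binlog reader that honors the storage version
// recorded on the input segment, which is what lets a single compaction plan
// mix v1 and v2 segments. The option calls mirror the ones used by the
// Mix/Clustering compaction tasks in this PR.
func openCompactionReader(
	ctx context.Context,
	seg *datapb.CompactionSegmentBinlogs,
	schema *schemapb.CollectionSchema,
	download func(ctx context.Context, paths []string) ([][]byte, error),
) (storage.RecordReader, error) {
	rr, err := storage.NewBinlogRecordReader(ctx, seg.GetFieldBinlogs(), schema,
		storage.WithDownloader(download),
		storage.WithVersion(seg.GetStorageVersion()))
	if err != nil {
		return nil, err
	}
	return rr, nil
}

// outputStorageVersion mirrors the write-side decision in NewMultiSegmentWriter:
// the output format follows the common.storage.enableV2 switch, so disabling
// the flag rolls compaction output back to the v1 format.
func outputStorageVersion() int64 {
	if paramtable.Get().CommonCfg.EnableStorageV2.GetAsBool() {
		return storage.StorageV2
	}
	return storage.StorageV1
}
```

Flipping `common.storage.enableV2` off is enough to produce v1-format output again, as exercised by the `*_V2ToV1Format` tests below.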
related: #39173

Signed-off-by: shaoting-huang <shaoting.huang@zilliz.com>

View File

@@ -14,7 +14,7 @@
 # Update milvus-storage_VERSION for the first occurrence
 milvus_add_pkg_config("milvus-storage")
 set_property(DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR} PROPERTY INCLUDE_DIRECTORIES "")
-set( milvus-storage_VERSION 26992ec )
+set( milvus-storage_VERSION f21caea )
 set( GIT_REPOSITORY "https://github.com/milvus-io/milvus-storage.git")
 message(STATUS "milvus-storage repo: ${GIT_REPOSITORY}")
 message(STATUS "milvus-storage version: ${milvus-storage_VERSION}")

View File

@@ -326,7 +326,7 @@ func (t *clusteringCompactionTask) getScalarAnalyzeResult(ctx context.Context) e
 }
 alloc := NewCompactionAllocator(t.segIDAlloc, t.logIDAlloc)
-writer := NewMultiSegmentWriter(ctx, t.binlogIO, alloc, t.plan.GetMaxSize(), t.plan.GetSchema(), t.plan.MaxSegmentRows, t.partitionID, t.collectionID, t.plan.Channel, 100)
+writer := NewMultiSegmentWriter(ctx, t.binlogIO, alloc, t.plan.GetMaxSize(), t.plan.GetSchema(), t.plan.MaxSegmentRows, t.partitionID, t.collectionID, t.plan.Channel, 100, storage.WithBufferSize(t.memoryBufferSize))
 buffer := newClusterBuffer(id, writer, fieldStats)
 t.clusterBuffers = append(t.clusterBuffers, buffer)
@@ -342,7 +342,7 @@ func (t *clusteringCompactionTask) getScalarAnalyzeResult(ctx context.Context) e
 }
 alloc := NewCompactionAllocator(t.segIDAlloc, t.logIDAlloc)
-writer := NewMultiSegmentWriter(ctx, t.binlogIO, alloc, t.plan.GetMaxSize(), t.plan.GetSchema(), t.plan.MaxSegmentRows, t.partitionID, t.collectionID, t.plan.Channel, 100)
+writer := NewMultiSegmentWriter(ctx, t.binlogIO, alloc, t.plan.GetMaxSize(), t.plan.GetSchema(), t.plan.MaxSegmentRows, t.partitionID, t.collectionID, t.plan.Channel, 100, storage.WithBufferSize(t.memoryBufferSize))
 nullBuffer = newClusterBuffer(len(buckets), writer, fieldStats)
 t.clusterBuffers = append(t.clusterBuffers, nullBuffer)
@@ -395,7 +395,7 @@ func (t *clusteringCompactionTask) generatedVectorPlan(ctx context.Context, buff
 fieldStats.SetVectorCentroids(centroidValues...)
 alloc := NewCompactionAllocator(t.segIDAlloc, t.logIDAlloc)
-writer := NewMultiSegmentWriter(ctx, t.binlogIO, alloc, t.plan.GetMaxSize(), t.plan.GetSchema(), t.plan.MaxSegmentRows, t.partitionID, t.collectionID, t.plan.Channel, 100)
+writer := NewMultiSegmentWriter(ctx, t.binlogIO, alloc, t.plan.GetMaxSize(), t.plan.GetSchema(), t.plan.MaxSegmentRows, t.partitionID, t.collectionID, t.plan.Channel, 100, storage.WithBufferSize(t.memoryBufferSize))
 buffer := newClusterBuffer(id, writer, fieldStats)
 t.clusterBuffers = append(t.clusterBuffers, buffer)
@@ -461,6 +461,7 @@ func (t *clusteringCompactionTask) mapping(ctx context.Context,
 // only FieldBinlogs and deltalogs needed
 Deltalogs: segment.Deltalogs,
 FieldBinlogs: segment.FieldBinlogs,
+StorageVersion: segment.StorageVersion,
 }
 future := t.mappingPool.Submit(func() (any, error) {
 err := t.mappingSegment(ctx, segmentClone)
@@ -571,7 +572,7 @@ func (t *clusteringCompactionTask) mappingSegment(
 rr, err := storage.NewBinlogRecordReader(ctx, segment.GetFieldBinlogs(), t.plan.Schema, storage.WithDownloader(func(ctx context.Context, paths []string) ([][]byte, error) {
 return t.binlogIO.Download(ctx, paths)
-}))
+}), storage.WithVersion(segment.StorageVersion), storage.WithBufferSize(t.memoryBufferSize))
 if err != nil {
 log.Warn("new binlog record reader wrong", zap.Error(err))
 return err
@@ -805,17 +806,7 @@ func (t *clusteringCompactionTask) scalarAnalyzeSegment(
 ctx, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, fmt.Sprintf("scalarAnalyzeSegment-%d-%d", t.GetPlanID(), segment.GetSegmentID()))
 defer span.End()
 log := log.With(zap.Int64("planID", t.GetPlanID()), zap.Int64("segmentID", segment.GetSegmentID()))
-// vars
 processStart := time.Now()
-fieldBinlogPaths := make([][]string, 0)
-// initial timestampFrom, timestampTo = -1, -1 is an illegal value, only to mark initial state
-var (
-timestampTo int64 = -1
-timestampFrom int64 = -1
-remained int64 = 0
-analyzeResult map[interface{}]int64 = make(map[interface{}]int64, 0)
-)
 // Get the number of field binlog files from non-empty segment
 var binlogNum int
@@ -831,6 +822,7 @@ func (t *clusteringCompactionTask) scalarAnalyzeSegment(
 return nil, merr.WrapErrIllegalCompactionPlan("all segments' binlogs are empty")
 }
 log.Debug("binlogNum", zap.Int("binlogNum", binlogNum))
+fieldBinlogPaths := make([][]string, 0)
 for idx := 0; idx < binlogNum; idx++ {
 var ps []string
 for _, f := range segment.GetFieldBinlogs() {
@@ -840,22 +832,38 @@ func (t *clusteringCompactionTask) scalarAnalyzeSegment(
 }
 expiredFilter := compaction.NewEntityFilter(nil, t.plan.GetCollectionTtl(), t.currentTime)
-for _, paths := range fieldBinlogPaths {
-allValues, err := t.binlogIO.Download(ctx, paths)
+rr, err := storage.NewBinlogRecordReader(ctx, segment.GetFieldBinlogs(), t.plan.Schema, storage.WithDownloader(func(ctx context.Context, paths []string) ([][]byte, error) {
+return t.binlogIO.Download(ctx, paths)
+}), storage.WithVersion(segment.StorageVersion), storage.WithBufferSize(t.memoryBufferSize))
 if err != nil {
-log.Warn("compact wrong, fail to download insertLogs", zap.Error(err))
-return nil, err
+log.Warn("new binlog record reader wrong", zap.Error(err))
+return make(map[interface{}]int64), err
 }
-blobs := lo.Map(allValues, func(v []byte, i int) *storage.Blob {
-return &storage.Blob{Key: paths[i], Value: v}
+pkIter := storage.NewDeserializeReader(rr, func(r storage.Record, v []*storage.Value) error {
+return storage.ValueDeserializer(r, v, t.plan.Schema.Fields)
 })
-pkIter, err := storage.NewBinlogDeserializeReader(t.plan.Schema, storage.MakeBlobsReader(blobs))
+defer pkIter.Close()
+analyzeResult, remained, err := t.iterAndGetScalarAnalyzeResult(pkIter, expiredFilter)
 if err != nil {
-log.Warn("new insert binlogs Itr wrong", zap.Strings("path", paths), zap.Error(err))
 return nil, err
 }
+log.Info("analyze segment end",
+zap.Int64("remained entities", remained),
+zap.Int("expired entities", expiredFilter.GetExpiredCount()),
+zap.Duration("map elapse", time.Since(processStart)))
+return analyzeResult, nil
+}
+func (t *clusteringCompactionTask) iterAndGetScalarAnalyzeResult(pkIter *storage.DeserializeReaderImpl[*storage.Value], expiredFilter compaction.EntityFilter) (map[interface{}]int64, int64, error) {
+// initial timestampFrom, timestampTo = -1, -1 is an illegal value, only to mark initial state
+var (
+timestampTo int64 = -1
+timestampFrom int64 = -1
+remained int64 = 0
+analyzeResult map[interface{}]int64 = make(map[interface{}]int64, 0)
+)
 for {
 v, err := pkIter.NextValue()
 if err != nil {
@@ -864,7 +872,7 @@ func (t *clusteringCompactionTask) scalarAnalyzeSegment(
 break
 } else {
 log.Warn("compact wrong, failed to iter through data", zap.Error(err))
-return nil, err
+return nil, 0, err
 }
 }
@@ -883,8 +891,7 @@ func (t *clusteringCompactionTask) scalarAnalyzeSegment(
 // rowValue := vIter.GetData().(*iterators.InsertRow).GetValue()
 row, ok := (*v).Value.(map[typeutil.UniqueID]interface{})
 if !ok {
-log.Warn("transfer interface to map wrong", zap.Strings("path", paths))
-return nil, errors.New("unexpected error")
+return nil, 0, errors.New("unexpected error")
 }
 key := row[t.clusteringKeyField.GetFieldID()]
 if _, exist := analyzeResult[key]; exist {
@@ -894,13 +901,7 @@ func (t *clusteringCompactionTask) scalarAnalyzeSegment(
 }
 remained++
 }
-}
-log.Info("analyze segment end",
-zap.Int64("remained entities", remained),
-zap.Int("expired entities", expiredFilter.GetExpiredCount()),
-zap.Duration("map elapse", time.Since(processStart)))
-return analyzeResult, nil
+return analyzeResult, remained, nil
 }
 func (t *clusteringCompactionTask) generatedScalarPlan(maxRows, preferRows int64, keys []interface{}, dict map[interface{}]int64) [][]interface{} {

View File

@@ -0,0 +1,281 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package compactor
import (
"context"
"fmt"
"os"
"testing"
"time"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/flushcommon/metacache"
"github.com/milvus-io/milvus/internal/flushcommon/metacache/pkoracle"
"github.com/milvus-io/milvus/internal/flushcommon/syncmgr"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/storagev2/packed"
"github.com/milvus-io/milvus/internal/util/initcore"
"github.com/milvus-io/milvus/pkg/v2/objectstorage"
"github.com/milvus-io/milvus/pkg/v2/proto/datapb"
"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
"github.com/milvus-io/milvus/pkg/v2/util/tsoutil"
"github.com/samber/lo"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"
)
func TestClusteringCompactionTaskStorageV2Suite(t *testing.T) {
suite.Run(t, new(ClusteringCompactionTaskStorageV2Suite))
}
type ClusteringCompactionTaskStorageV2Suite struct {
ClusteringCompactionTaskSuite
}
func (s *ClusteringCompactionTaskStorageV2Suite) SetupTest() {
s.setupTest()
paramtable.Get().Save("common.storageType", "local")
paramtable.Get().Save("common.storage.enableV2", "true")
initcore.InitStorageV2FileSystem(paramtable.Get())
}
func (s *ClusteringCompactionTaskStorageV2Suite) TearDownTest() {
paramtable.Get().Reset(paramtable.Get().CommonCfg.EntityExpirationTTL.Key)
paramtable.Get().Reset("common.storageType")
paramtable.Get().Reset("common.storage.enableV2")
os.RemoveAll(paramtable.Get().LocalStorageCfg.Path.GetValue() + "insert_log")
os.RemoveAll(paramtable.Get().LocalStorageCfg.Path.GetValue() + "delta_log")
os.RemoveAll(paramtable.Get().LocalStorageCfg.Path.GetValue() + "stats_log")
}
func (s *ClusteringCompactionTaskStorageV2Suite) TestScalarCompactionNormal() {
s.preparScalarCompactionNormalTask()
compactionResult, err := s.task.Compact()
s.Require().NoError(err)
s.Equal(len(compactionResult.GetSegments()), len(compactionResult.GetSegments()))
for i := 0; i < len(compactionResult.GetSegments()); i++ {
seg := compactionResult.GetSegments()[i]
s.EqualValues(1, len(seg.InsertLogs))
}
s.EqualValues(10239,
lo.SumBy(compactionResult.GetSegments(), func(seg *datapb.CompactionSegment) int64 {
return seg.GetNumOfRows()
}),
)
}
func (s *ClusteringCompactionTaskStorageV2Suite) TestScalarCompactionNormal_V2ToV2Format() {
var segmentID int64 = 1001
fBinlogs, deltalogs, _, _, _, err := s.initStorageV2Segments(10240, segmentID)
s.NoError(err)
dblobs, err := getInt64DeltaBlobs(
1,
[]int64{100},
[]uint64{tsoutil.ComposeTSByTime(getMilvusBirthday().Add(time.Second), 0)},
)
s.Require().NoError(err)
s.mockBinlogIO.EXPECT().Download(mock.Anything, []string{deltalogs.GetBinlogs()[0].GetLogPath()}).
Return([][]byte{dblobs.GetValue()}, nil).Once()
s.task.plan.SegmentBinlogs = []*datapb.CompactionSegmentBinlogs{
{
SegmentID: segmentID,
FieldBinlogs: lo.Values(fBinlogs),
Deltalogs: []*datapb.FieldBinlog{deltalogs},
StorageVersion: storage.StorageV2,
},
}
s.task.plan.Schema = genCollectionSchema()
s.task.plan.ClusteringKeyField = 100
s.task.plan.PreferSegmentRows = 2048
s.task.plan.MaxSegmentRows = 2048
s.task.plan.MaxSize = 1024 * 1024 * 1024 // max segment size = 1GB, we won't touch this value
s.task.plan.PreAllocatedSegmentIDs = &datapb.IDRange{
Begin: 1,
End: 101,
}
compactionResultV2, err := s.task.Compact()
s.Require().NoError(err)
s.Equal(5, len(s.task.clusterBuffers))
s.Equal(5, len(compactionResultV2.GetSegments()))
totalRowNum := int64(0)
statsRowNum := int64(0)
for _, seg := range compactionResultV2.GetSegments() {
s.Equal(1, len(seg.GetInsertLogs()))
s.Equal(1, len(seg.GetField2StatslogPaths()))
totalRowNum += seg.GetNumOfRows()
statsRowNum += seg.GetField2StatslogPaths()[0].GetBinlogs()[0].GetEntriesNum()
}
s.Equal(totalRowNum, statsRowNum)
s.EqualValues(10239,
lo.SumBy(compactionResultV2.GetSegments(), func(seg *datapb.CompactionSegment) int64 {
return seg.GetNumOfRows()
}),
)
}
func (s *ClusteringCompactionTaskStorageV2Suite) TestScalarCompactionNormal_V2ToV1Format() {
paramtable.Get().Save("common.storage.enableV2", "false")
var segmentID int64 = 1001
fBinlogs, deltalogs, _, _, _, err := s.initStorageV2Segments(10240, segmentID)
s.NoError(err)
dblobs, err := getInt64DeltaBlobs(
1,
[]int64{100},
[]uint64{tsoutil.ComposeTSByTime(getMilvusBirthday().Add(time.Second), 0)},
)
s.Require().NoError(err)
s.mockBinlogIO.EXPECT().Download(mock.Anything, []string{deltalogs.GetBinlogs()[0].GetLogPath()}).
Return([][]byte{dblobs.GetValue()}, nil).Once()
s.task.plan.SegmentBinlogs = []*datapb.CompactionSegmentBinlogs{
{
SegmentID: segmentID,
FieldBinlogs: lo.Values(fBinlogs),
Deltalogs: []*datapb.FieldBinlog{deltalogs},
StorageVersion: storage.StorageV2,
},
}
s.task.plan.Schema = genCollectionSchema()
s.task.plan.ClusteringKeyField = 100
s.task.plan.PreferSegmentRows = 2048
s.task.plan.MaxSegmentRows = 2048
s.task.plan.MaxSize = 1024 * 1024 * 1024 // max segment size = 1GB, we won't touch this value
s.task.plan.PreAllocatedSegmentIDs = &datapb.IDRange{
Begin: 1,
End: 101,
}
compactionResult, err := s.task.Compact()
s.Require().NoError(err)
s.Equal(5, len(s.task.clusterBuffers))
s.Equal(5, len(compactionResult.GetSegments()))
totalBinlogNum := 0
totalRowNum := int64(0)
for _, fb := range compactionResult.GetSegments()[0].GetInsertLogs() {
for _, b := range fb.GetBinlogs() {
totalBinlogNum++
if fb.GetFieldID() == 100 {
totalRowNum += b.GetEntriesNum()
}
}
}
statsBinlogNum := 0
statsRowNum := int64(0)
for _, sb := range compactionResult.GetSegments()[0].GetField2StatslogPaths() {
for _, b := range sb.GetBinlogs() {
statsBinlogNum++
statsRowNum += b.GetEntriesNum()
}
}
s.Equal(1, totalBinlogNum/len(s.plan.Schema.GetFields()))
s.Equal(1, statsBinlogNum)
s.Equal(totalRowNum, statsRowNum)
s.EqualValues(10239,
lo.SumBy(compactionResult.GetSegments(), func(seg *datapb.CompactionSegment) int64 {
return seg.GetNumOfRows()
}),
)
}
func (s *ClusteringCompactionTaskStorageV2Suite) TestCompactionWithBM25Function() {
// 8 + 8 + 8 + 7 + 8 = 39
// 39*1024 = 39936
// plus buffer on null bitsets etc., let's make it 45000
// writer will automatically flush after 1024 rows.
paramtable.Get().Save(paramtable.Get().DataNodeCfg.BinLogMaxSize.Key, "45000")
defer paramtable.Get().Reset(paramtable.Get().DataNodeCfg.BinLogMaxSize.Key)
s.prepareCompactionWithBM25FunctionTask()
compactionResult, err := s.task.Compact()
s.Require().NoError(err)
s.Equal(5, len(compactionResult.GetSegments()))
for i := 0; i < len(compactionResult.GetSegments()); i++ {
seg := compactionResult.GetSegments()[i]
s.Equal(1, len(seg.InsertLogs))
s.Equal(1, len(seg.Bm25Logs))
}
}
func (s *ClusteringCompactionTaskStorageV2Suite) TestScalarCompactionNormalByMemoryLimit() {
s.prepareScalarCompactionNormalByMemoryLimit()
// 8+8+8+4+7+4*4=51
// 51*1024 = 52224
// writer will automatically flush after 1024 rows.
paramtable.Get().Save(paramtable.Get().DataNodeCfg.BinLogMaxSize.Key, "52223")
defer paramtable.Get().Reset(paramtable.Get().DataNodeCfg.BinLogMaxSize.Key)
paramtable.Get().Save(paramtable.Get().DataCoordCfg.ClusteringCompactionPreferSegmentSizeRatio.Key, "1")
defer paramtable.Get().Reset(paramtable.Get().DataCoordCfg.ClusteringCompactionPreferSegmentSizeRatio.Key)
compactionResult, err := s.task.Compact()
s.Require().NoError(err)
s.Equal(2, len(s.task.clusterBuffers))
s.Equal(2, len(compactionResult.GetSegments()))
segment := compactionResult.GetSegments()[0]
s.Equal(1, len(segment.InsertLogs))
s.Equal(1, len(segment.Field2StatslogPaths))
}
func (s *ClusteringCompactionTaskStorageV2Suite) initStorageV2Segments(rows int, segmentID int64) (
inserts map[int64]*datapb.FieldBinlog,
deltas *datapb.FieldBinlog,
stats map[int64]*datapb.FieldBinlog,
bm25Stats map[int64]*datapb.FieldBinlog,
size int64,
err error,
) {
rootPath := paramtable.Get().LocalStorageCfg.Path.GetValue()
cm := storage.NewLocalChunkManager(objectstorage.RootPath(rootPath))
bfs := pkoracle.NewBloomFilterSet()
seg := metacache.NewSegmentInfo(&datapb.SegmentInfo{}, bfs, nil)
metacache.UpdateNumOfRows(int64(rows))(seg)
mc := metacache.NewMockMetaCache(s.T())
mc.EXPECT().Collection().Return(CollectionID).Maybe()
mc.EXPECT().Schema().Return(genCollectionSchema()).Maybe()
mc.EXPECT().GetSegmentByID(segmentID).Return(seg, true).Maybe()
mc.EXPECT().GetSegmentsBy(mock.Anything, mock.Anything).Return([]*metacache.SegmentInfo{seg}).Maybe()
mc.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Run(func(action metacache.SegmentAction, filters ...metacache.SegmentFilter) {
action(seg)
}).Return().Maybe()
channelName := fmt.Sprintf("by-dev-rootcoord-dml_0_%dv0", CollectionID)
deleteData := storage.NewDeleteData([]storage.PrimaryKey{storage.NewInt64PrimaryKey(100)}, []uint64{tsoutil.ComposeTSByTime(getMilvusBirthday().Add(time.Second), 0)})
pack := new(syncmgr.SyncPack).WithCollectionID(CollectionID).WithPartitionID(PartitionID).WithSegmentID(segmentID).WithChannelName(channelName).WithInsertData(genInsertData(rows, segmentID, genCollectionSchema())).WithDeleteData(deleteData)
bw := syncmgr.NewBulkPackWriterV2(mc, cm, s.mockAlloc, packed.DefaultWriteBufferSize, 0)
return bw.Write(context.Background(), pack)
}
func genInsertData(size int, seed int64, schema *schemapb.CollectionSchema) []*storage.InsertData {
buf, _ := storage.NewInsertData(schema)
for i := 0; i < size; i++ {
buf.Append(genRow(int64(i)))
}
return []*storage.InsertData{buf}
}

View File

@@ -30,7 +30,7 @@ import (
 "github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
 "github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
-"github.com/milvus-io/milvus/internal/datanode/allocator"
+"github.com/milvus-io/milvus/internal/allocator"
 "github.com/milvus-io/milvus/internal/mocks/flushcommon/mock_util"
 "github.com/milvus-io/milvus/internal/storage"
 "github.com/milvus-io/milvus/pkg/v2/common"
@@ -61,7 +61,7 @@ func (s *ClusteringCompactionTaskSuite) SetupSuite() {
 paramtable.Get().Init(paramtable.NewBaseTable())
 }
-func (s *ClusteringCompactionTaskSuite) SetupTest() {
+func (s *ClusteringCompactionTaskSuite) setupTest() {
 s.mockBinlogIO = mock_util.NewMockBinlogIO(s.T())
 s.mockBinlogIO.EXPECT().Upload(mock.Anything, mock.Anything).Return(nil).Maybe()
@@ -96,6 +96,10 @@ func (s *ClusteringCompactionTaskSuite) SetupTest() {
 s.task.plan = s.plan
 }
+func (s *ClusteringCompactionTaskSuite) SetupTest() {
+s.setupTest()
+}
 func (s *ClusteringCompactionTaskSuite) SetupSubTest() {
 s.SetupTest()
 }
@@ -168,7 +172,7 @@ func (s *ClusteringCompactionTaskSuite) TestCompactionInit() {
 s.Equal(8, s.task.flushPool.Cap())
 }
-func (s *ClusteringCompactionTaskSuite) TestScalarCompactionNormal() {
+func (s *ClusteringCompactionTaskSuite) preparScalarCompactionNormalTask() {
 dblobs, err := getInt64DeltaBlobs(
 1,
 []int64{100},
@@ -194,6 +198,7 @@ func (s *ClusteringCompactionTaskSuite) TestScalarCompactionNormal() {
 segWriter.FlushAndIsFull()
 kvs, fBinlogs, err := serializeWrite(context.TODO(), s.mockAlloc, segWriter)
 s.NoError(err)
 s.mockBinlogIO.EXPECT().Download(mock.Anything, mock.Anything).Return(lo.Values(kvs), nil)
@@ -216,7 +221,10 @@ func (s *ClusteringCompactionTaskSuite) TestScalarCompactionNormal() {
 Begin: 1,
 End: 101,
 }
+}
+func (s *ClusteringCompactionTaskSuite) TestScalarCompactionNormal() {
+s.preparScalarCompactionNormalTask()
 // 8+8+8+4+7+4*4=51
 // 51*1024 = 52224
 // writer will automatically flush after 1024 rows.
@@ -245,7 +253,7 @@ func (s *ClusteringCompactionTaskSuite) TestScalarCompactionNormal() {
 statsRowNum += b.GetEntriesNum()
 }
 }
-s.Equal(2, totalBinlogNum/len(schema.GetFields()))
+s.Equal(2, totalBinlogNum/len(s.plan.Schema.GetFields()))
 s.Equal(1, statsBinlogNum)
 s.Equal(totalRowNum, statsRowNum)
@@ -256,7 +264,7 @@ func (s *ClusteringCompactionTaskSuite) TestScalarCompactionNormal() {
 )
 }
-func (s *ClusteringCompactionTaskSuite) TestScalarCompactionNormalByMemoryLimit() {
+func (s *ClusteringCompactionTaskSuite) prepareScalarCompactionNormalByMemoryLimit() {
 schema := genCollectionSchema()
 var segmentID int64 = 1001
 segWriter, err := NewSegmentWriter(schema, 1000, compactionBatchSize, segmentID, PartitionID, CollectionID, []int64{})
@@ -300,7 +308,10 @@ func (s *ClusteringCompactionTaskSuite) TestScalarCompactionNormalByMemoryLimit(
 Begin: 1,
 End: 1000,
 }
+}
+func (s *ClusteringCompactionTaskSuite) TestScalarCompactionNormalByMemoryLimit() {
+s.prepareScalarCompactionNormalByMemoryLimit()
 // 8+8+8+4+7+4*4=51
 // 51*1024 = 52224
 // writer will automatically flush after 1024 rows.
@@ -331,12 +342,13 @@ func (s *ClusteringCompactionTaskSuite) TestScalarCompactionNormalByMemoryLimit(
 statsRowNum += b.GetEntriesNum()
 }
 }
-s.Equal(5, totalBinlogNum/len(schema.GetFields()))
+s.Equal(5, totalBinlogNum/len(s.task.plan.Schema.GetFields()))
 s.Equal(1, statsBinlogNum)
 s.Equal(totalRowNum, statsRowNum)
 }
-func (s *ClusteringCompactionTaskSuite) TestCompactionWithBM25Function() {
+func (s *ClusteringCompactionTaskSuite) prepareCompactionWithBM25FunctionTask() {
+s.SetupTest()
 schema := genCollectionSchemaWithBM25()
 var segmentID int64 = 1001
 segWriter, err := NewSegmentWriter(schema, 1000, compactionBatchSize, segmentID, PartitionID, CollectionID, []int64{102})
@@ -374,13 +386,16 @@ func (s *ClusteringCompactionTaskSuite) TestCompactionWithBM25Function() {
 Begin: 1,
 End: 1000,
 }
+}
+func (s *ClusteringCompactionTaskSuite) TestCompactionWithBM25Function() {
 // 8 + 8 + 8 + 7 + 8 = 39
 // 39*1024 = 39936
 // plus buffer on null bitsets etc., let's make it 45000
 // writer will automatically flush after 1024 rows.
 paramtable.Get().Save(paramtable.Get().DataNodeCfg.BinLogMaxSize.Key, "45000")
 defer paramtable.Get().Reset(paramtable.Get().DataNodeCfg.BinLogMaxSize.Key)
+s.prepareCompactionWithBM25FunctionTask()
 compactionResult, err := s.task.Compact()
 s.Require().NoError(err)
@@ -404,7 +419,7 @@ func (s *ClusteringCompactionTaskSuite) TestCompactionWithBM25Function() {
 statsRowNum += b.GetEntriesNum()
 }
 }
-s.Equal(2, totalBinlogNum/len(schema.GetFields()))
+s.Equal(2, totalBinlogNum/len(s.task.plan.Schema.GetFields()))
 s.Equal(1, statsBinlogNum)
 s.Equal(totalRowNum, statsRowNum)

View File

@@ -53,7 +53,7 @@ func mergeSortMultipleSegments(ctx context.Context,
 segmentReaders := make([]storage.RecordReader, len(binlogs))
 segmentFilters := make([]compaction.EntityFilter, len(binlogs))
 for i, s := range binlogs {
-reader, err := storage.NewBinlogRecordReader(ctx, s.GetFieldBinlogs(), plan.GetSchema(), storage.WithDownloader(binlogIO.Download))
+reader, err := storage.NewBinlogRecordReader(ctx, s.GetFieldBinlogs(), plan.GetSchema(), storage.WithDownloader(binlogIO.Download), storage.WithVersion(s.StorageVersion))
 if err != nil {
 return nil, err
 }

View File

@@ -206,7 +206,7 @@ func (t *mixCompactionTask) writeSegment(ctx context.Context,
 }
 entityFilter := compaction.NewEntityFilter(delta, t.plan.GetCollectionTtl(), t.currentTime)
-reader, err := storage.NewBinlogRecordReader(ctx, seg.GetFieldBinlogs(), t.plan.GetSchema(), storage.WithDownloader(t.binlogIO.Download))
+reader, err := storage.NewBinlogRecordReader(ctx, seg.GetFieldBinlogs(), t.plan.GetSchema(), storage.WithDownloader(t.binlogIO.Download), storage.WithVersion(seg.GetStorageVersion()))
 if err != nil {
 log.Warn("compact wrong, failed to new insert binlogs reader", zap.Error(err))
 return
@@ -312,16 +312,12 @@ func (t *mixCompactionTask) Compact() (*datapb.CompactionPlanResult, error) {
 return nil, err
 }
 // Unable to deal with all empty segments cases, so return error
-isEmpty := func() bool {
-for _, seg := range t.plan.GetSegmentBinlogs() {
-for _, field := range seg.GetFieldBinlogs() {
-if len(field.GetBinlogs()) > 0 {
-return false
-}
-}
-}
-return true
-}()
+isEmpty := lo.EveryBy(lo.FlatMap(t.plan.GetSegmentBinlogs(), func(seg *datapb.CompactionSegmentBinlogs, _ int) []*datapb.FieldBinlog {
+return seg.GetFieldBinlogs()
+}), func(field *datapb.FieldBinlog) bool {
+return len(field.GetBinlogs()) == 0
+})
 if isEmpty {
 log.Warn("compact wrong, all segments' binlogs are empty")
 return nil, errors.New("illegal compaction plan")

View File

@@ -0,0 +1,363 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package compactor
import (
"context"
"fmt"
"math"
"os"
"testing"
"time"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/allocator"
"github.com/milvus-io/milvus/internal/flushcommon/metacache"
"github.com/milvus-io/milvus/internal/flushcommon/metacache/pkoracle"
"github.com/milvus-io/milvus/internal/flushcommon/syncmgr"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/storagev2/packed"
"github.com/milvus-io/milvus/internal/util/initcore"
"github.com/milvus-io/milvus/pkg/v2/common"
"github.com/milvus-io/milvus/pkg/v2/objectstorage"
"github.com/milvus-io/milvus/pkg/v2/proto/datapb"
"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
"github.com/milvus-io/milvus/pkg/v2/util/tsoutil"
"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
"github.com/samber/lo"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"
)
func TestMixCompactionTaskStorageV2Suite(t *testing.T) {
suite.Run(t, new(MixCompactionTaskStorageV2Suite))
}
type MixCompactionTaskStorageV2Suite struct {
MixCompactionTaskSuite
}
func (s *MixCompactionTaskStorageV2Suite) SetupTest() {
s.setupTest()
paramtable.Get().Save("common.storageType", "local")
paramtable.Get().Save("common.storage.enableV2", "true")
initcore.InitStorageV2FileSystem(paramtable.Get())
}
func (s *MixCompactionTaskStorageV2Suite) TearDownTest() {
paramtable.Get().Reset(paramtable.Get().CommonCfg.EntityExpirationTTL.Key)
paramtable.Get().Reset("common.storageType")
paramtable.Get().Reset("common.storage.enableV2")
os.RemoveAll(paramtable.Get().LocalStorageCfg.Path.GetValue() + "insert_log")
os.RemoveAll(paramtable.Get().LocalStorageCfg.Path.GetValue() + "delta_log")
os.RemoveAll(paramtable.Get().LocalStorageCfg.Path.GetValue() + "stats_log")
}
func (s *MixCompactionTaskStorageV2Suite) TestCompactDupPK() {
s.prepareCompactDupPKSegments()
result, err := s.task.Compact()
s.NoError(err)
s.NotNil(result)
s.Equal(s.task.plan.GetPlanID(), result.GetPlanID())
s.Equal(1, len(result.GetSegments()))
s.Equal(1, len(result.GetSegments()[0].GetInsertLogs()))
s.Equal(1, len(result.GetSegments()[0].GetField2StatslogPaths()))
}
func (s *MixCompactionTaskStorageV2Suite) TestCompactDupPK_MixToV2Format() {
segments := []int64{7, 8, 9}
dblobs, err := getInt64DeltaBlobs(
1,
[]int64{100},
[]uint64{tsoutil.ComposeTSByTime(getMilvusBirthday().Add(time.Second), 0)},
)
s.Require().NoError(err)
s.mockBinlogIO.EXPECT().Download(mock.Anything, []string{"1"}).
Return([][]byte{dblobs.GetValue()}, nil).Times(len(segments))
s.mockBinlogIO.EXPECT().Upload(mock.Anything, mock.Anything).Return(nil)
alloc := allocator.NewLocalAllocator(7777777, math.MaxInt64)
// Clear original segments
s.task.plan.SegmentBinlogs = make([]*datapb.CompactionSegmentBinlogs, 0)
for _, segID := range segments {
s.initSegBuffer(1, segID)
kvs, fBinlogs, err := serializeWrite(context.TODO(), alloc, s.segWriter)
s.Require().NoError(err)
s.mockBinlogIO.EXPECT().Download(mock.Anything, mock.MatchedBy(func(keys []string) bool {
left, right := lo.Difference(keys, lo.Keys(kvs))
return len(left) == 0 && len(right) == 0
})).Return(lo.Values(kvs), nil).Once()
s.task.plan.SegmentBinlogs = append(s.task.plan.SegmentBinlogs, &datapb.CompactionSegmentBinlogs{
SegmentID: segID,
FieldBinlogs: lo.Values(fBinlogs),
Deltalogs: []*datapb.FieldBinlog{
{Binlogs: []*datapb.Binlog{{LogID: 1, LogPath: "1"}}},
},
})
}
v2Segments := []int64{10, 11}
for _, segID := range v2Segments {
binlogs, _, _, _, _, err := s.initStorageV2Segments(1, segID, alloc)
s.NoError(err)
s.task.plan.SegmentBinlogs = append(s.task.plan.SegmentBinlogs, &datapb.CompactionSegmentBinlogs{
SegmentID: segID,
FieldBinlogs: lo.Values(binlogs),
Deltalogs: []*datapb.FieldBinlog{},
StorageVersion: storage.StorageV2,
})
}
result, err := s.task.Compact()
s.NoError(err)
s.NotNil(result)
s.Equal(s.task.plan.GetPlanID(), result.GetPlanID())
s.Equal(1, len(result.GetSegments()))
segment := result.GetSegments()[0]
s.EqualValues(19531, segment.GetSegmentID())
s.EqualValues(5, segment.GetNumOfRows())
s.NotEmpty(segment.InsertLogs)
s.NotEmpty(segment.Field2StatslogPaths)
s.Empty(segment.Deltalogs)
}
func (s *MixCompactionTaskStorageV2Suite) TestCompactDupPK_V2ToV2Format() {
s.mockBinlogIO.EXPECT().Upload(mock.Anything, mock.Anything).Return(nil)
alloc := allocator.NewLocalAllocator(7777777, math.MaxInt64)
// Clear original segments
s.task.plan.SegmentBinlogs = make([]*datapb.CompactionSegmentBinlogs, 0)
v2Segments := []int64{10, 11}
for _, segID := range v2Segments {
binlogs, _, _, _, _, err := s.initStorageV2Segments(1, segID, alloc)
s.NoError(err)
s.task.plan.SegmentBinlogs = append(s.task.plan.SegmentBinlogs, &datapb.CompactionSegmentBinlogs{
SegmentID: segID,
FieldBinlogs: lo.Values(binlogs),
Deltalogs: []*datapb.FieldBinlog{},
StorageVersion: storage.StorageV2,
})
}
result, err := s.task.Compact()
s.NoError(err)
s.NotNil(result)
s.Equal(s.task.plan.GetPlanID(), result.GetPlanID())
s.Equal(1, len(result.GetSegments()))
segment := result.GetSegments()[0]
s.EqualValues(19531, segment.GetSegmentID())
s.EqualValues(2, segment.GetNumOfRows())
s.NotEmpty(segment.InsertLogs)
s.NotEmpty(segment.Field2StatslogPaths)
s.Empty(segment.Deltalogs)
}
func (s *MixCompactionTaskStorageV2Suite) TestCompactDupPK_V2ToV1Format() {
paramtable.Get().Save("common.storage.enableV2", "false")
s.mockBinlogIO.EXPECT().Upload(mock.Anything, mock.Anything).Return(nil)
alloc := allocator.NewLocalAllocator(7777777, math.MaxInt64)
s.task.plan.SegmentBinlogs = make([]*datapb.CompactionSegmentBinlogs, 0)
v2Segments := []int64{10, 11}
for _, segID := range v2Segments {
binlogs, _, _, _, _, err := s.initStorageV2Segments(1, segID, alloc)
s.NoError(err)
s.task.plan.SegmentBinlogs = append(s.task.plan.SegmentBinlogs, &datapb.CompactionSegmentBinlogs{
SegmentID: segID,
FieldBinlogs: lo.Values(binlogs),
Deltalogs: []*datapb.FieldBinlog{},
StorageVersion: storage.StorageV2,
})
}
result, err := s.task.Compact()
s.NoError(err)
s.NotNil(result)
s.Equal(s.task.plan.GetPlanID(), result.GetPlanID())
s.Equal(1, len(result.GetSegments()))
segment := result.GetSegments()[0]
s.EqualValues(19531, segment.GetSegmentID())
s.EqualValues(2, segment.GetNumOfRows())
// each field has only one insert log for storage v1
s.EqualValues(len(s.task.plan.Schema.Fields), len(segment.GetInsertLogs()))
s.NotEmpty(segment.Field2StatslogPaths)
s.Empty(segment.Deltalogs)
}
func (s *MixCompactionTaskStorageV2Suite) TestCompactTwoToOne() {
s.prepareCompactTwoToOneSegments()
result, err := s.task.Compact()
s.NoError(err)
s.Equal(s.task.plan.GetPlanID(), result.GetPlanID())
s.Equal(1, len(result.GetSegments()))
s.Equal(1, len(result.GetSegments()[0].GetInsertLogs()))
s.Equal(1, len(result.GetSegments()[0].GetField2StatslogPaths()))
}
func (s *MixCompactionTaskStorageV2Suite) TestCompactTwoToOneWithBM25() {
s.prepareCompactTwoToOneWithBM25Segments()
result, err := s.task.Compact()
s.NoError(err)
s.Equal(s.task.plan.GetPlanID(), result.GetPlanID())
s.Equal(1, len(result.GetSegments()))
segment := result.GetSegments()[0]
s.EqualValues(19531, segment.GetSegmentID())
s.EqualValues(3, segment.GetNumOfRows())
s.Empty(segment.Deltalogs)
s.Equal(1, len(segment.InsertLogs))
s.Equal(1, len(segment.Bm25Logs))
s.Equal(1, len(segment.Field2StatslogPaths))
}
func (s *MixCompactionTaskStorageV2Suite) TestCompactSortedSegment() {
s.prepareCompactSortedSegment()
paramtable.Get().Save("dataNode.compaction.useMergeSort", "true")
defer paramtable.Get().Reset("dataNode.compaction.useMergeSort")
result, err := s.task.Compact()
s.NoError(err)
s.NotNil(result)
s.Equal(s.task.plan.GetPlanID(), result.GetPlanID())
s.Equal(1, len(result.GetSegments()))
s.True(result.GetSegments()[0].GetIsSorted())
segment := result.GetSegments()[0]
s.EqualValues(19531, segment.GetSegmentID())
s.EqualValues(291, segment.GetNumOfRows())
s.EqualValues(1, len(segment.InsertLogs))
s.EqualValues(1, len(segment.Field2StatslogPaths))
s.Empty(segment.Deltalogs)
}
func (s *MixCompactionTaskStorageV2Suite) TestMergeNoExpiration_V1ToV2Format() {
s.TestMergeNoExpiration()
}
func (s *MixCompactionTaskStorageV2Suite) TestCompactSortedSegmentLackBinlog() {
s.prepareCompactSortedSegmentLackBinlog()
paramtable.Get().Save("dataNode.compaction.useMergeSort", "true")
defer paramtable.Get().Reset("dataNode.compaction.useMergeSort")
result, err := s.task.Compact()
s.NoError(err)
s.NotNil(result)
s.Equal(s.task.plan.GetPlanID(), result.GetPlanID())
s.Equal(1, len(result.GetSegments()))
s.True(result.GetSegments()[0].GetIsSorted())
segment := result.GetSegments()[0]
s.EqualValues(19531, segment.GetSegmentID())
s.EqualValues(291, segment.GetNumOfRows())
s.EqualValues(1, len(segment.InsertLogs))
s.EqualValues(1, len(segment.Field2StatslogPaths))
s.Empty(segment.Deltalogs)
}
func (s *MixCompactionTaskStorageV2Suite) TestSplitMergeEntityExpired_V1ToV2Format() {
s.TestSplitMergeEntityExpired()
}
func (s *MixCompactionTaskStorageV2Suite) TestMergeNoExpirationLackBinlog_V1ToV2Format() {
s.TestMergeNoExpirationLackBinlog()
}
func (s *MixCompactionTaskStorageV2Suite) initStorageV2Segments(rows int, seed int64, alloc allocator.Interface) (
inserts map[int64]*datapb.FieldBinlog,
deltas *datapb.FieldBinlog,
stats map[int64]*datapb.FieldBinlog,
bm25Stats map[int64]*datapb.FieldBinlog,
size int64,
err error,
) {
rootPath := paramtable.Get().LocalStorageCfg.Path.GetValue()
cm := storage.NewLocalChunkManager(objectstorage.RootPath(rootPath))
bfs := pkoracle.NewBloomFilterSet()
seg := metacache.NewSegmentInfo(&datapb.SegmentInfo{}, bfs, nil)
metacache.UpdateNumOfRows(1000)(seg)
mc := metacache.NewMockMetaCache(s.T())
mc.EXPECT().Collection().Return(CollectionID).Maybe()
mc.EXPECT().Schema().Return(s.meta.Schema).Maybe()
mc.EXPECT().GetSegmentByID(seed).Return(seg, true).Maybe()
mc.EXPECT().GetSegmentsBy(mock.Anything, mock.Anything).Return([]*metacache.SegmentInfo{seg}).Maybe()
mc.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Run(func(action metacache.SegmentAction, filters ...metacache.SegmentFilter) {
action(seg)
}).Return().Maybe()
channelName := fmt.Sprintf("by-dev-rootcoord-dml_0_%dv0", CollectionID)
pack := new(syncmgr.SyncPack).WithCollectionID(CollectionID).WithPartitionID(PartitionID).WithSegmentID(seed).WithChannelName(channelName).WithInsertData(getInsertData(rows, seed, s.meta.GetSchema()))
bw := syncmgr.NewBulkPackWriterV2(mc, cm, alloc, packed.DefaultWriteBufferSize, 0)
return bw.Write(context.Background(), pack)
}
func getRowWithoutNil(magic int64) map[int64]interface{} {
ts := tsoutil.ComposeTSByTime(getMilvusBirthday(), 0)
return map[int64]interface{}{
common.RowIDField: magic,
common.TimeStampField: int64(ts), // should be int64 here
BoolField: true,
Int8Field: int8(magic),
Int16Field: int16(magic),
Int32Field: int32(magic),
Int64Field: magic,
FloatField: float32(magic),
DoubleField: float64(magic),
StringField: "str",
VarCharField: "varchar",
BinaryVectorField: []byte{0},
FloatVectorField: []float32{4, 5, 6, 7},
Float16VectorField: []byte{0, 0, 0, 0, 255, 255, 255, 255},
BFloat16VectorField: []byte{0, 0, 0, 0, 255, 255, 255, 255},
SparseFloatVectorField: typeutil.CreateSparseFloatRow([]uint32{0, 1, 2}, []float32{4, 5, 6}),
ArrayField: &schemapb.ScalarField{
Data: &schemapb.ScalarField_IntData{
IntData: &schemapb.IntArray{Data: []int32{1, 2, 3}},
},
},
JSONField: []byte(`{"batch":ok}`),
BoolFieldWithDefaultValue: true,
Int8FieldWithDefaultValue: int8(magic),
Int16FieldWithDefaultValue: int16(magic),
Int32FieldWithDefaultValue: int32(magic),
Int64FieldWithDefaultValue: magic,
FloatFieldWithDefaultValue: float32(magic),
DoubleFieldWithDefaultValue: float64(magic),
StringFieldWithDefaultValue: "str",
VarCharFieldWithDefaultValue: "varchar",
}
}
func getInsertData(size int, seed int64, schema *schemapb.CollectionSchema) []*storage.InsertData {
buf, _ := storage.NewInsertData(schema)
for i := 0; i < size; i++ {
buf.Append(getRowWithoutNil(seed))
}
return []*storage.InsertData{buf}
}

View File

@@ -65,7 +65,7 @@ func (s *MixCompactionTaskSuite) SetupSuite() {
 paramtable.Get().Init(paramtable.NewBaseTable())
 }
-func (s *MixCompactionTaskSuite) SetupTest() {
+func (s *MixCompactionTaskSuite) setupTest() {
 s.mockBinlogIO = mock_util.NewMockBinlogIO(s.T())
 s.meta = genTestCollectionMeta()
@@ -91,6 +91,10 @@ func (s *MixCompactionTaskSuite) SetupTest() {
 s.task = NewMixCompactionTask(context.Background(), s.mockBinlogIO, plan)
 }
+func (s *MixCompactionTaskSuite) SetupTest() {
+s.setupTest()
+}
 func (s *MixCompactionTaskSuite) SetupBM25() {
 s.mockBinlogIO = mock_util.NewMockBinlogIO(s.T())
 s.meta = genTestCollectionMetaWithBM25()
@@ -126,9 +130,7 @@ func getMilvusBirthday() time.Time {
 return time.Date(2019, time.Month(5), 30, 0, 0, 0, 0, time.UTC)
 }
-func (s *MixCompactionTaskSuite) TestCompactDupPK() {
-// Test merge compactions, two segments with the same pk, one deletion pk=1
-// The merged segment 19530 should remain 3 pk without pk=100
+func (s *MixCompactionTaskSuite) prepareCompactDupPKSegments() {
 segments := []int64{7, 8, 9}
 dblobs, err := getInt64DeltaBlobs(
 1,
@@ -138,24 +140,14 @@ func (s *MixCompactionTaskSuite) TestCompactDupPK() {
 s.Require().NoError(err)
 s.mockBinlogIO.EXPECT().Download(mock.Anything, []string{"1"}).
-Return([][]byte{dblobs.GetValue()}, nil).Times(3)
+Return([][]byte{dblobs.GetValue()}, nil).Times(len(segments))
 s.mockBinlogIO.EXPECT().Upload(mock.Anything, mock.Anything).Return(nil)
 alloc := allocator.NewLocalAllocator(7777777, math.MaxInt64)
-// clear origial segments
+// Clear original segments
 s.task.plan.SegmentBinlogs = make([]*datapb.CompactionSegmentBinlogs, 0)
 for _, segID := range segments {
 s.initSegBuffer(1, segID)
-row := getRow(100)
-v := &storage.Value{
-PK: storage.NewInt64PrimaryKey(100),
-Timestamp: int64(tsoutil.ComposeTSByTime(getMilvusBirthday(), 0)),
-Value: row,
-}
-err := s.segWriter.Write(v)
-s.segWriter.FlushAndIsFull()
-s.Require().NoError(err)
 kvs, fBinlogs, err := serializeWrite(context.TODO(), alloc, s.segWriter)
 s.Require().NoError(err)
 s.mockBinlogIO.EXPECT().Download(mock.Anything, mock.MatchedBy(func(keys []string) bool {
@@ -171,6 +163,10 @@ func (s *MixCompactionTaskSuite) TestCompactDupPK() {
 },
 })
 }
+}
+func (s *MixCompactionTaskSuite) TestCompactDupPK() {
+s.prepareCompactDupPKSegments()
 result, err := s.task.Compact()
 s.NoError(err)
 s.NotNil(result)
@@ -181,12 +177,12 @@ func (s *MixCompactionTaskSuite) TestCompactDupPK() {
 segment := result.GetSegments()[0]
 s.EqualValues(19531, segment.GetSegmentID())
 s.EqualValues(3, segment.GetNumOfRows())
-s.NotEmpty(segment.InsertLogs)
-s.NotEmpty(segment.Field2StatslogPaths)
+s.EqualValues(len(s.task.plan.Schema.Fields), len(segment.InsertLogs))
+s.EqualValues(1, len(segment.Field2StatslogPaths))
 s.Empty(segment.Deltalogs)
 }
-func (s *MixCompactionTaskSuite) TestCompactTwoToOne() {
+func (s *MixCompactionTaskSuite) prepareCompactTwoToOneSegments() {
 segments := []int64{5, 6, 7}
 alloc := allocator.NewLocalAllocator(7777777, math.MaxInt64)
 s.mockBinlogIO.EXPECT().Upload(mock.Anything, mock.Anything).Return(nil)
@@ -217,7 +213,10 @@ func (s *MixCompactionTaskSuite) TestCompactTwoToOne() {
 s.task.plan.SegmentBinlogs = append(s.task.plan.SegmentBinlogs, &datapb.CompactionSegmentBinlogs{
 SegmentID: seg.SegmentID(),
 })
+}
+func (s *MixCompactionTaskSuite) TestCompactTwoToOne() {
+s.prepareCompactTwoToOneSegments()
 result, err := s.task.Compact()
 s.Require().NoError(err)
 s.NotNil(result)
@@ -228,12 +227,12 @@ func (s *MixCompactionTaskSuite) TestCompactTwoToOne() {
 segment := result.GetSegments()[0]
 s.EqualValues(19531, segment.GetSegmentID())
 s.EqualValues(3, segment.GetNumOfRows())
-s.NotEmpty(segment.InsertLogs)
-s.NotEmpty(segment.Field2StatslogPaths)
+s.EqualValues(len(s.task.plan.Schema.Fields), len(segment.InsertLogs))
+s.EqualValues(1, len(segment.Field2StatslogPaths))
 s.Empty(segment.Deltalogs)
 }
-func (s *MixCompactionTaskSuite) TestCompactTwoToOneWithBM25() {
+func (s *MixCompactionTaskSuite) prepareCompactTwoToOneWithBM25Segments() {
 s.SetupBM25()
 segments := []int64{5, 6, 7}
 alloc := allocator.NewLocalAllocator(7777777, math.MaxInt64)
@@ -265,7 +264,10 @@ func (s *MixCompactionTaskSuite) TestCompactTwoToOneWithBM25() {
 s.task.plan.SegmentBinlogs = append(s.task.plan.SegmentBinlogs, &datapb.CompactionSegmentBinlogs{
 SegmentID: seg.SegmentID(),
 })
+}
+func (s *MixCompactionTaskSuite) TestCompactTwoToOneWithBM25() {
+s.prepareCompactTwoToOneWithBM25Segments()
 result, err := s.task.Compact()
 s.NoError(err)
 s.NotNil(result)
@@ -276,15 +278,13 @@ func (s *MixCompactionTaskSuite) TestCompactTwoToOneWithBM25() {
 segment := result.GetSegments()[0]
 s.EqualValues(19531, segment.GetSegmentID())
 s.EqualValues(3, segment.GetNumOfRows())
-s.NotEmpty(segment.InsertLogs)
-s.NotEmpty(segment.Bm25Logs)
-s.NotEmpty(segment.Field2StatslogPaths)
+s.EqualValues(len(s.task.plan.Schema.Fields), len(segment.InsertLogs))
+s.EqualValues(1, len(segment.Field2StatslogPaths))
+s.EqualValues(1, len(segment.Bm25Logs))
 s.Empty(segment.Deltalogs)
 }
-func (s *MixCompactionTaskSuite) TestCompactSortedSegment() {
-paramtable.Get().Save("dataNode.compaction.useMergeSort", "true")
-defer paramtable.Get().Reset("dataNode.compaction.useMergeSort")
+func (s *MixCompactionTaskSuite) prepareCompactSortedSegment() {
 segments := []int64{1001, 1002, 1003}
 alloc := allocator.NewLocalAllocator(100, math.MaxInt64)
 s.mockBinlogIO.EXPECT().Upload(mock.Anything, mock.Anything).Return(nil)
@@ -319,6 +319,12 @@ func (s *MixCompactionTaskSuite) TestCompactSortedSegment() {
 })
 }
+}
+func (s *MixCompactionTaskSuite) TestCompactSortedSegment() {
+s.prepareCompactSortedSegment()
+paramtable.Get().Save("dataNode.compaction.useMergeSort", "true")
+defer paramtable.Get().Reset("dataNode.compaction.useMergeSort")
 result, err := s.task.Compact()
 s.NoError(err)
@@ -331,15 +337,12 @@ func (s *MixCompactionTaskSuite) TestCompactSortedSegment() {
 segment := result.GetSegments()[0]
 s.EqualValues(19531, segment.GetSegmentID())
 s.EqualValues(291, segment.GetNumOfRows())
-s.NotEmpty(segment.InsertLogs)
-s.NotEmpty(segment.Field2StatslogPaths)
+s.EqualValues(len(s.task.plan.Schema.Fields), len(segment.InsertLogs))
+s.EqualValues(1, len(segment.Field2StatslogPaths))
 s.Empty(segment.Deltalogs)
 }
-func (s *MixCompactionTaskSuite) TestCompactSortedSegmentLackBinlog() {
-paramtable.Get().Save("dataNode.compaction.useMergeSort", "true")
-defer paramtable.Get().Reset("dataNode.compaction.useMergeSort")
+func (s *MixCompactionTaskSuite) prepareCompactSortedSegmentLackBinlog() {
 segments := []int64{1001, 1002, 1003}
 alloc := allocator.NewLocalAllocator(100, math.MaxInt64)
 s.mockBinlogIO.EXPECT().Upload(mock.Anything, mock.Anything).Return(nil)
@@ -395,6 +398,12 @@ func (s *MixCompactionTaskSuite) TestCompactSortedSegmentLackBinlog() {
 })
 }
+}
+func (s *MixCompactionTaskSuite) TestCompactSortedSegmentLackBinlog() {
+s.prepareCompactSortedSegmentLackBinlog()
+paramtable.Get().Save("dataNode.compaction.useMergeSort", "true")
+defer paramtable.Get().Reset("dataNode.compaction.useMergeSort")
 result, err := s.task.Compact()
 s.NoError(err)
@@ -407,13 +416,12 @@ func (s *MixCompactionTaskSuite) TestCompactSortedSegmentLackBinlog() {
 segment := result.GetSegments()[0]
 s.EqualValues(19531, segment.GetSegmentID())
 s.EqualValues(291, segment.GetNumOfRows())
-s.NotEmpty(segment.InsertLogs)
-s.NotEmpty(segment.Field2StatslogPaths)
+s.EqualValues(len(s.task.plan.Schema.Fields), len(segment.InsertLogs))
+s.EqualValues(1, len(segment.Field2StatslogPaths))
 s.Empty(segment.Deltalogs)
 }
-func (s *MixCompactionTaskSuite) TestSplitMergeEntityExpired() {
+func (s *MixCompactionTaskSuite) prepareSplitMergeEntityExpired() {
 s.initSegBuffer(1, 3)
 collTTL := 864000 // 10 days
 s.task.currentTime = getMilvusBirthday().Add(time.Second * (time.Duration(collTTL) + 1))
@@ -444,6 +452,10 @@ func (s *MixCompactionTaskSuite) TestSplitMergeEntityExpired() {
 })
 }
 s.task.plan.SegmentBinlogs[0].FieldBinlogs = fieldBinlogs
+}
+func (s *MixCompactionTaskSuite) TestSplitMergeEntityExpired() {
+s.prepareSplitMergeEntityExpired()
 compactionSegments, err := s.task.mergeSplit(s.task.ctx)
 s.NoError(err)
 s.Equal(1, len(compactionSegments))

View File

@ -32,6 +32,8 @@ import (
"github.com/milvus-io/milvus/internal/flushcommon/writebuffer" "github.com/milvus-io/milvus/internal/flushcommon/writebuffer"
"github.com/milvus-io/milvus/internal/metastore/kv/binlog" "github.com/milvus-io/milvus/internal/metastore/kv/binlog"
"github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/storagecommon"
"github.com/milvus-io/milvus/internal/storagev2/packed"
"github.com/milvus-io/milvus/pkg/v2/common" "github.com/milvus-io/milvus/pkg/v2/common"
"github.com/milvus-io/milvus/pkg/v2/log" "github.com/milvus-io/milvus/pkg/v2/log"
"github.com/milvus-io/milvus/pkg/v2/proto/datapb" "github.com/milvus-io/milvus/pkg/v2/proto/datapb"
@ -63,6 +65,9 @@ type MultiSegmentWriter struct {
res []*datapb.CompactionSegment res []*datapb.CompactionSegment
// DONOT leave it empty of all segments are deleted, just return a segment with zero meta for datacoord // DONOT leave it empty of all segments are deleted, just return a segment with zero meta for datacoord
storageVersion int64
rwOption []storage.RwOption
} }
type compactionAlloactor struct { type compactionAlloactor struct {
@ -83,8 +88,16 @@ func (alloc *compactionAlloactor) allocSegmentID() (typeutil.UniqueID, error) {
func NewMultiSegmentWriter(ctx context.Context, binlogIO io.BinlogIO, allocator *compactionAlloactor, segmentSize int64,
schema *schemapb.CollectionSchema,
maxRows int64, partitionID, collectionID int64, channel string, batchSize int, rwOption ...storage.RwOption,
) *MultiSegmentWriter {
storageVersion := storage.StorageV1
if paramtable.Get().CommonCfg.EnableStorageV2.GetAsBool() {
storageVersion = storage.StorageV2
}
rwOpts := rwOption
if len(rwOption) == 0 {
rwOpts = make([]storage.RwOption, 0)
}
return &MultiSegmentWriter{
ctx: ctx,
binlogIO: binlogIO,
@ -97,6 +110,8 @@ func NewMultiSegmentWriter(ctx context.Context, binlogIO io.BinlogIO, allocator
channel: channel,
batchSize: batchSize,
res: make([]*datapb.CompactionSegment, 0),
storageVersion: storageVersion,
rwOption: rwOpts,
}
}
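With the variadic `rwOption` parameter, callers can forward extra `storage.RwOption` values to the binlog record writer created on each rotation. A hedged usage sketch; the allocator, IO handle, plan values, and the buffer-size option are placeholders taken from the caller's context, not from this diff:

```go
// Sketch only: construct a MultiSegmentWriter and pass an extra RwOption.
// binlogIO, segIDAlloc, logIDAlloc and plan are assumed to exist in the caller.
alloc := NewCompactionAllocator(segIDAlloc, logIDAlloc)
writer := NewMultiSegmentWriter(ctx, binlogIO, alloc,
	plan.GetMaxSize(), plan.GetSchema(), plan.GetMaxSegmentRows(),
	partitionID, collectionID, plan.GetChannel(), 100,
	storage.WithBufferSize(32<<20), // forwarded to NewBinlogRecordWriter on rotation
)
```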
@ -115,6 +130,7 @@ func (w *MultiSegmentWriter) closeWriter() error {
NumOfRows: w.writer.GetRowNum(),
Channel: w.channel,
Bm25Logs: lo.Values(bm25Logs),
StorageVersion: w.storageVersion,
}
w.res = append(w.res, result)
@ -124,7 +140,8 @@ func (w *MultiSegmentWriter) closeWriter() error {
zap.String("channel", w.channel),
zap.Int64("totalRows", w.writer.GetRowNum()),
zap.Uint64("totalSize", w.writer.GetWrittenUncompressed()),
zap.Int64("expected segment size", w.segmentSize),
zap.Int64("storageVersion", w.storageVersion))
}
return nil
}
@ -143,11 +160,15 @@ func (w *MultiSegmentWriter) rotateWriter() error {
chunkSize := paramtable.Get().DataNodeCfg.BinLogMaxSize.GetAsUint64()
rootPath := binlog.GetRootPath()
w.rwOption = append(w.rwOption,
storage.WithUploader(func(ctx context.Context, kvs map[string][]byte) error {
return w.binlogIO.Upload(ctx, kvs)
}),
storage.WithVersion(w.storageVersion),
)
rw, err := storage.NewBinlogRecordWriter(w.ctx, w.collectionID, w.partitionID, newSegmentID,
w.schema, w.allocator.logIDAlloc, chunkSize, rootPath, w.maxRows, w.rwOption...,
)
if err != nil {
return err
}
@ -156,6 +177,25 @@ func (w *MultiSegmentWriter) rotateWriter() error {
return nil
}
func (w *MultiSegmentWriter) splitColumnByRecord(r storage.Record, splitThresHold int64) []storagecommon.ColumnGroup {
groups := make([]storagecommon.ColumnGroup, 0)
shortColumnGroup := storagecommon.ColumnGroup{Columns: make([]int, 0)}
for i, field := range w.schema.Fields {
arr := r.Column(field.FieldID)
size := storage.CalculateArraySize(arr)
rows := arr.Len()
if rows != 0 && int64(size/rows) >= splitThresHold {
groups = append(groups, storagecommon.ColumnGroup{Columns: []int{i}})
} else {
shortColumnGroup.Columns = append(shortColumnGroup.Columns, i)
}
}
if len(shortColumnGroup.Columns) > 0 {
groups = append(groups, shortColumnGroup)
}
return groups
}
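The rule above keys grouping on average bytes per row against `packed.ColumnGroupSizeThreshold`: a column at or above the threshold gets its own column group, everything else shares one group. A toy illustration with made-up per-row sizes; the threshold value here is a stand-in for the real constant:

```go
// Illustrative only: a wide embedding column is split into its own group,
// small scalar columns stay together. Sizes and threshold are made up.
perRowBytes := []struct {
	name string
	size int64
}{{"row_id", 8}, {"timestamp", 8}, {"pk", 8}, {"embedding", 3072}}

const threshold = 1024 // stand-in for packed.ColumnGroupSizeThreshold
var large, small []string
for _, c := range perRowBytes {
	if c.size >= threshold {
		large = append(large, c.name) // -> ["embedding"], one group per column
	} else {
		small = append(small, c.name) // -> ["row_id", "timestamp", "pk"], one shared group
	}
}
```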
func (w *MultiSegmentWriter) GetWrittenUncompressed() uint64 {
if w.writer == nil {
return 0
@ -176,16 +216,29 @@ func (w *MultiSegmentWriter) GetCompactionSegments() []*datapb.CompactionSegment
func (w *MultiSegmentWriter) Write(r storage.Record) error {
if w.writer == nil || w.writer.GetWrittenUncompressed() >= uint64(w.segmentSize) {
if w.storageVersion == storage.StorageV2 {
w.rwOption = append(w.rwOption,
storage.WithColumnGroups(w.splitColumnByRecord(r, packed.ColumnGroupSizeThreshold)),
)
}
if err := w.rotateWriter(); err != nil {
return err
}
}
return w.writer.Write(r)
}
func (w *MultiSegmentWriter) WriteValue(v *storage.Value) error {
if w.writer == nil || w.writer.GetWrittenUncompressed() >= uint64(w.segmentSize) {
if w.storageVersion == storage.StorageV2 {
r, err := storage.ValueSerializer([]*storage.Value{v}, w.schema.Fields)
if err != nil {
return err
}
w.rwOption = append(w.rwOption,
storage.WithColumnGroups(w.splitColumnByRecord(r, packed.ColumnGroupSizeThreshold)),
)
}
if err := w.rotateWriter(); err != nil {
return err
}

View File

@ -51,6 +51,7 @@ import (
"github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/types" "github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/dependency" "github.com/milvus-io/milvus/internal/util/dependency"
"github.com/milvus-io/milvus/internal/util/initcore"
"github.com/milvus-io/milvus/internal/util/sessionutil" "github.com/milvus-io/milvus/internal/util/sessionutil"
"github.com/milvus-io/milvus/internal/util/streamingutil" "github.com/milvus-io/milvus/internal/util/streamingutil"
"github.com/milvus-io/milvus/pkg/v2/kv" "github.com/milvus-io/milvus/pkg/v2/kv"
@ -287,6 +288,12 @@ func (node *DataNode) Init() error {
node.flowgraphManager = pipeline.NewFlowgraphManager()
index.InitSegcore()
// init storage v2 file system.
err = initcore.InitStorageV2FileSystem(paramtable.Get())
if err != nil {
initError = err
return
}
log.Info("init datanode done", zap.String("Address", node.address))
})

View File

@ -21,18 +21,17 @@ import (
"fmt" "fmt"
"math" "math"
"github.com/apache/arrow/go/v17/arrow"
"github.com/apache/arrow/go/v17/arrow/array" "github.com/apache/arrow/go/v17/arrow/array"
"github.com/apache/arrow/go/v17/arrow/memory" "github.com/apache/arrow/go/v17/arrow/memory"
"github.com/samber/lo" "github.com/samber/lo"
"go.uber.org/zap" "go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/allocator" "github.com/milvus-io/milvus/internal/allocator"
"github.com/milvus-io/milvus/internal/flushcommon/metacache" "github.com/milvus-io/milvus/internal/flushcommon/metacache"
"github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/storagecommon" "github.com/milvus-io/milvus/internal/storagecommon"
"github.com/milvus-io/milvus/internal/storagev2/packed" "github.com/milvus-io/milvus/internal/storagev2/packed"
iTypeutil "github.com/milvus-io/milvus/internal/util/typeutil"
"github.com/milvus-io/milvus/pkg/v2/common" "github.com/milvus-io/milvus/pkg/v2/common"
"github.com/milvus-io/milvus/pkg/v2/log" "github.com/milvus-io/milvus/pkg/v2/log"
"github.com/milvus-io/milvus/pkg/v2/proto/datapb" "github.com/milvus-io/milvus/pkg/v2/proto/datapb"
@ -43,7 +42,7 @@ import (
type BulkPackWriterV2 struct {
*BulkPackWriter
schema *schemapb.CollectionSchema
bufferSize int64
multiPartUploadSize int64
}
@ -51,10 +50,6 @@ type BulkPackWriterV2 struct {
func NewBulkPackWriterV2(metaCache metacache.MetaCache, chunkManager storage.ChunkManager,
allocator allocator.Interface, bufferSize, multiPartUploadSize int64, writeRetryOpts ...retry.Option,
) *BulkPackWriterV2 {
arrowSchema, err := storage.ConvertToArrowSchema(metaCache.Schema().Fields)
if err != nil {
return nil
}
return &BulkPackWriterV2{
BulkPackWriter: &BulkPackWriter{
metaCache: metaCache,
@ -62,7 +57,7 @@ func NewBulkPackWriterV2(metaCache metacache.MetaCache, chunkManager storage.Chu
allocator: allocator,
writeRetryOpts: writeRetryOpts,
},
schema: metaCache.Schema(),
bufferSize: bufferSize,
multiPartUploadSize: multiPartUploadSize,
}
@ -139,7 +134,7 @@ func (bw *BulkPackWriterV2) writeInserts(ctx context.Context, pack *SyncPack) (m
}
}
w, err := storage.NewPackedRecordWriter(paths, bw.schema, bw.bufferSize, bw.multiPartUploadSize, columnGroups)
if err != nil {
return nil, err
}
@ -208,11 +203,15 @@ func (bw *BulkPackWriterV2) serializeBinlog(ctx context.Context, pack *SyncPack)
if len(pack.insertData) == 0 {
return nil, nil
}
arrowSchema, err := storage.ConvertToArrowSchema(bw.schema.Fields)
if err != nil {
return nil, err
}
builder := array.NewRecordBuilder(memory.DefaultAllocator, arrowSchema)
defer builder.Release()
for _, chunk := range pack.insertData {
if err := storage.BuildRecord(builder, chunk, bw.metaCache.Schema().GetFields()); err != nil {
return nil, err
}
}

View File

@ -1,6 +1,8 @@
package storage
import (
"strconv"
"github.com/apache/arrow/go/v17/arrow"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
@ -13,26 +15,29 @@ func ConvertToArrowSchema(fields []*schemapb.FieldSchema) (*arrow.Schema, error)
if serdeMap[field.DataType].arrowType == nil {
return nil, merr.WrapErrParameterInvalidMsg("unknown field data type [%s] for field [%s]", field.DataType, field.GetName())
}
var dim int
switch field.DataType {
case schemapb.DataType_BinaryVector, schemapb.DataType_Float16Vector, schemapb.DataType_BFloat16Vector,
schemapb.DataType_Int8Vector, schemapb.DataType_FloatVector:
var err error
dim, err = GetDimFromParams(field.TypeParams)
if err != nil {
return nil, merr.WrapErrParameterInvalidMsg("dim not found in field [%s] params", field.GetName())
}
default:
dim = 0
}
arrowFields = append(arrowFields, ConvertToArrowField(field, serdeMap[field.DataType].arrowType(dim)))
}
return arrow.NewSchema(arrowFields, nil), nil
}
func ConvertToArrowField(field *schemapb.FieldSchema, dataType arrow.DataType) arrow.Field {
return arrow.Field{
Name: field.GetName(),
Type: dataType,
Metadata: arrow.NewMetadata([]string{"FieldID"}, []string{strconv.Itoa(int(field.GetFieldID()))}),
Nullable: field.GetNullable(),
}
}
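ConvertToArrowField carries the Milvus field ID in arrow metadata so downstream readers can map arrow columns back to schema fields. A small round-trip sketch in the same package; the field values are illustrative:

```go
// Build an arrow field for an Int64 primary key and read the FieldID back.
f := ConvertToArrowField(&schemapb.FieldSchema{
	FieldID:  100,
	Name:     "pk",
	DataType: schemapb.DataType_Int64,
}, arrow.PrimitiveTypes.Int64)

if idx := f.Metadata.FindKey("FieldID"); idx >= 0 {
	fieldID, _ := strconv.ParseInt(f.Metadata.Values()[idx], 10, 64)
	_ = fieldID // 100
}
```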

View File

@ -18,6 +18,7 @@ package storage
import (
"encoding/binary"
"fmt"
"io"
"math"
"strconv"
@ -31,6 +32,7 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/pkg/v2/common" "github.com/milvus-io/milvus/pkg/v2/common"
"github.com/milvus-io/milvus/pkg/v2/util/merr"
"github.com/milvus-io/milvus/pkg/v2/util/typeutil" "github.com/milvus-io/milvus/pkg/v2/util/typeutil"
) )
@ -594,7 +596,7 @@ func (r *selectiveRecord) Slice(start, end int) Record {
panic("not implemented") panic("not implemented")
} }
func calculateArraySize(a arrow.Array) int { func CalculateArraySize(a arrow.Array) int {
if a == nil || a.Data() == nil || a.Data().Buffers() == nil { if a == nil || a.Data() == nil || a.Data().Buffers() == nil {
return 0 return 0
} }
@ -715,7 +717,7 @@ func (sfw *singleFieldRecordWriter) Write(r Record) error {
sfw.numRows += r.Len()
a := r.Column(sfw.fieldId)
sfw.writtenUncompressed += uint64(CalculateArraySize(a))
rec := array.NewRecord(sfw.schema, []arrow.Array{a}, int64(r.Len()))
defer rec.Release()
return sfw.fw.WriteBuffered(rec)
@ -789,7 +791,7 @@ func (mfw *multiFieldRecordWriter) Write(r Record) error {
columns := make([]arrow.Array, len(mfw.fieldIds))
for i, fieldId := range mfw.fieldIds {
columns[i] = r.Column(fieldId)
mfw.writtenUncompressed += uint64(CalculateArraySize(columns[i]))
}
rec := array.NewRecord(mfw.schema, columns, int64(r.Len()))
defer rec.Release()
@ -923,3 +925,23 @@ func NewSimpleArrowRecord(r arrow.Record, field2Col map[FieldID]int) *simpleArro
field2Col: field2Col,
}
}
func BuildRecord(b *array.RecordBuilder, data *InsertData, fields []*schemapb.FieldSchema) error {
if data == nil {
return nil
}
for i, field := range fields {
fBuilder := b.Field(i)
typeEntry, ok := serdeMap[field.DataType]
if !ok {
panic("unknown type")
}
for j := 0; j < data.Data[field.FieldID].RowNum(); j++ {
ok = typeEntry.serialize(fBuilder, data.Data[field.FieldID].GetRow(j))
if !ok {
return merr.WrapErrServiceInternal(fmt.Sprintf("serialize error on type %s", field.DataType.String()))
}
}
}
return nil
}
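BuildRecord now routes every data type through serdeMap instead of a hand-written switch. A hedged usage sketch in the same package; the helper name and inputs are illustrative, not part of this diff:

```go
// Serialize buffered InsertData into an arrow.Record via the shared serde map.
func buildExampleRecord(schema *schemapb.CollectionSchema, data *InsertData) (arrow.Record, error) {
	arrowSchema, err := ConvertToArrowSchema(schema.Fields)
	if err != nil {
		return nil, err
	}
	builder := array.NewRecordBuilder(memory.DefaultAllocator, arrowSchema)
	defer builder.Release()
	if err := BuildRecord(builder, data, schema.Fields); err != nil {
		return nil, err
	}
	return builder.NewRecord(), nil
}
```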

View File

@ -443,11 +443,7 @@ func ValueSerializer(v []*Value, fieldSchema []*schemapb.FieldSchema) (Record, e
builder := builders[field.FieldID]
arrays[i] = builder.NewArray()
builder.Release()
fields[i] = ConvertToArrowField(field, arrays[i].DataType())
field2Col[field.FieldID] = i
}
return NewSimpleArrowRecord(array.NewRecord(arrow.NewSchema(fields, nil), arrays, int64(len(v))), field2Col), nil

View File

@ -124,65 +124,7 @@ func NewPackedDeserializeReader(paths [][]string, schema *schemapb.CollectionSch
}
return NewDeserializeReader(reader, func(r Record, v []*Value) error {
return ValueDeserializer(r, v, schema.Fields)
pkField := func() *schemapb.FieldSchema {
for _, field := range schema.Fields {
if field.GetIsPrimaryKey() {
return field
}
}
return nil
}()
if pkField == nil {
return merr.WrapErrServiceInternal("no primary key field found")
}
rec, ok := r.(*simpleArrowRecord)
if !ok {
return merr.WrapErrServiceInternal("can not cast to simple arrow record")
}
numFields := len(schema.Fields)
for i := 0; i < rec.Len(); i++ {
if v[i] == nil {
v[i] = &Value{
Value: make(map[FieldID]interface{}, numFields),
}
}
value := v[i]
m := value.Value.(map[FieldID]interface{})
for _, field := range schema.Fields {
fieldID := field.FieldID
column := r.Column(fieldID)
if column.IsNull(i) {
m[fieldID] = nil
} else {
d, ok := serdeMap[field.DataType].deserialize(column, i)
if ok {
m[fieldID] = d
} else {
return merr.WrapErrServiceInternal(fmt.Sprintf("can not deserialize field [%s]", field.Name))
}
}
}
rowID, ok := m[common.RowIDField].(int64)
if !ok {
return merr.WrapErrIoKeyNotFound("no row id column found")
}
value.ID = rowID
value.Timestamp = m[common.TimeStampField].(int64)
pkCol := rec.field2Col[pkField.FieldID]
pk, err := GenPrimaryKeyByRawData(m[pkField.FieldID], schema.Fields[pkCol].DataType)
if err != nil {
return err
}
value.PK = pk
value.IsDeleted = false
value.Value = m
}
return nil
}), nil
}
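The deserialize callback now hands the whole batch to ValueDeserializer instead of decoding rows inline. A hedged fragment showing what one callback invocation amounts to; the record `r` and destination slice are placeholders, and nil-slot handling is assumed to match the removed inline code:

```go
// Sketch only: decode one record batch into reusable *Value slots.
vals := make([]*Value, r.Len())
if err := ValueDeserializer(r, vals, collSchema.Fields); err != nil {
	return err
}
// Each value now carries ID, Timestamp, PK and the per-field map, which the
// removed inline code above used to populate by hand.
```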
@ -194,20 +136,28 @@ type packedRecordWriter struct {
multiPartUploadSize int64
columnGroups []storagecommon.ColumnGroup
paths []string
schema *schemapb.CollectionSchema
arrowSchema *arrow.Schema
rowNum int64
writtenUncompressed uint64
columnGroupUncompressed []uint64
}
func (pw *packedRecordWriter) Write(r Record) error {
var rec arrow.Record
sar, ok := r.(*simpleArrowRecord)
if !ok {
arrays := make([]arrow.Array, len(pw.schema.Fields))
for i, field := range pw.schema.Fields {
arrays[i] = r.Column(field.FieldID)
}
rec = array.NewRecord(pw.arrowSchema, arrays, int64(r.Len()))
} else {
rec = sar.r
}
pw.rowNum += int64(r.Len())
for col, arr := range rec.Columns() {
size := uint64(CalculateArraySize(arr))
pw.writtenUncompressed += size
for columnGroup, group := range pw.columnGroups {
if lo.Contains(group.Columns, col) {
@ -217,7 +167,7 @@ func (pw *packedRecordWriter) Write(r Record) error {
}
}
defer rec.Release()
return pw.writer.WriteRecordBatch(rec)
}
func (pw *packedRecordWriter) GetWrittenUncompressed() uint64 {
@ -243,8 +193,13 @@ func (pw *packedRecordWriter) Close() error {
return nil
}
func NewPackedRecordWriter(paths []string, schema *schemapb.CollectionSchema, bufferSize int64, multiPartUploadSize int64, columnGroups []storagecommon.ColumnGroup) (*packedRecordWriter, error) {
arrowSchema, err := ConvertToArrowSchema(schema.Fields)
if err != nil {
return nil, merr.WrapErrServiceInternal(
fmt.Sprintf("can not convert collection schema %s to arrow schema: %s", schema.Name, err.Error()))
}
writer, err := packed.NewPackedWriter(paths, arrowSchema, bufferSize, multiPartUploadSize, columnGroups)
if err != nil {
return nil, merr.WrapErrServiceInternal(
fmt.Sprintf("can not new packed record writer %s", err.Error()))
@ -253,6 +208,7 @@ func NewPackedRecordWriter(paths []string, schema *arrow.Schema, bufferSize int6
return &packedRecordWriter{
writer: writer,
schema: schema,
arrowSchema: arrowSchema,
bufferSize: bufferSize,
paths: paths,
columnGroups: columnGroups,
@ -263,12 +219,7 @@ func NewPackedRecordWriter(paths []string, schema *arrow.Schema, bufferSize int6
func NewPackedSerializeWriter(paths []string, schema *schemapb.CollectionSchema, bufferSize int64,
multiPartUploadSize int64, columnGroups []storagecommon.ColumnGroup, batchSize int,
) (*SerializeWriterImpl[*Value], error) {
PackedBinlogRecordWriter, err := NewPackedRecordWriter(paths, schema, bufferSize, multiPartUploadSize, columnGroups)
if err != nil {
return nil, merr.WrapErrServiceInternal(
fmt.Sprintf("can not new packed record writer %s", err.Error()))
@ -372,7 +323,7 @@ func (pw *PackedBinlogRecordWriter) initWriters() error {
paths = append(paths, path)
logIdStart++
}
pw.writer, err = NewPackedRecordWriter(paths, pw.schema, pw.bufferSize, pw.multiPartUploadSize, pw.columnGroups)
if err != nil {
return merr.WrapErrServiceInternal(fmt.Sprintf("can not new packed record writer %s", err.Error()))
}
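NewPackedRecordWriter now takes the collection schema directly and derives the arrow schema (with FieldID metadata) internally. A hedged usage sketch in the same package; the paths, sizes, and column groups are placeholders:

```go
// Sketch only: write one record into two column groups at placeholder paths.
func writePackedExample(collSchema *schemapb.CollectionSchema, rec Record) error {
	groups := []storagecommon.ColumnGroup{{Columns: []int{0, 1}}, {Columns: []int{2}}}
	paths := []string{"files/packed/100/0", "files/packed/100/1"} // one path per group (assumed layout)
	w, err := NewPackedRecordWriter(paths, collSchema, 16<<20, 8<<20, groups)
	if err != nil {
		return err
	}
	if err := w.Write(rec); err != nil { // accepts any storage.Record, not only *simpleArrowRecord
		w.Close()
		return err
	}
	return w.Close()
}
```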

View File

@ -223,7 +223,7 @@ func TestCalculateArraySize(t *testing.T) {
arr := tt.arrayBuilder()
defer arr.Release()
size := CalculateArraySize(arr)
if size != tt.expectedSize {
t.Errorf("Expected size %d, got %d", tt.expectedSize, size)
}
@ -245,7 +245,7 @@ func TestCalculateArraySizeWithOffset(t *testing.T) {
slicedArray := array.NewSlice(fullArray, 1, 4) // Offset = 1, End = 4
defer slicedArray.Release()
size := CalculateArraySize(slicedArray)
expectedSize := len("one") + len("two") + len("three") + 1 // "one", "two", "three", bitmap (1 byte)
if size != expectedSize {

View File

@ -285,12 +285,9 @@ func MergeSort(schema *schemapb.CollectionSchema, rr []RecordReader,
for c, builder := range builders {
arrays[c] = builder.NewArray()
builder.Release()
fid := schema.Fields[c].FieldID
fields[c] = arrow.Field{
Name: strconv.Itoa(int(fid)),
Type: arrays[c].DataType(),
Nullable: true, // No nullable check here.
}
fields[c] = ConvertToArrowField(schema.Fields[c], arrays[c].DataType())
field2Col[fid] = c
}

View File

@ -1,146 +0,0 @@
package typeutil
import (
"fmt"
"math"
"path"
"github.com/apache/arrow/go/v17/arrow"
"github.com/apache/arrow/go/v17/arrow/array"
"google.golang.org/protobuf/proto"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/pkg/v2/common"
"github.com/milvus-io/milvus/pkg/v2/log"
"github.com/milvus-io/milvus/pkg/v2/util/merr"
"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
)
func GetStorageURI(protocol, pathPrefix string, segmentID int64) (string, error) {
switch protocol {
case "s3":
var scheme string
if paramtable.Get().MinioCfg.UseSSL.GetAsBool() {
scheme = "https"
} else {
scheme = "http"
}
if pathPrefix != "" {
cleanPath := path.Clean(pathPrefix)
return fmt.Sprintf("s3://%s:%s@%s/%s/%d?scheme=%s&endpoint_override=%s&allow_bucket_creation=true", paramtable.Get().MinioCfg.AccessKeyID.GetValue(), paramtable.Get().MinioCfg.SecretAccessKey.GetValue(), paramtable.Get().MinioCfg.BucketName.GetValue(), cleanPath, segmentID, scheme, paramtable.Get().MinioCfg.Address.GetValue()), nil
} else {
return fmt.Sprintf("s3://%s:%s@%s/%d?scheme=%s&endpoint_override=%s&allow_bucket_creation=true", paramtable.Get().MinioCfg.AccessKeyID.GetValue(), paramtable.Get().MinioCfg.SecretAccessKey.GetValue(), paramtable.Get().MinioCfg.BucketName.GetValue(), segmentID, scheme, paramtable.Get().MinioCfg.Address.GetValue()), nil
}
case "file":
if pathPrefix != "" {
cleanPath := path.Clean(pathPrefix)
return fmt.Sprintf("file://%s/%d", cleanPath, segmentID), nil
} else {
return fmt.Sprintf("file://%d", segmentID), nil
}
default:
return "", merr.WrapErrParameterInvalidMsg("unsupported schema %s", protocol)
}
}
func BuildRecord(b *array.RecordBuilder, data *storage.InsertData, fields []*schemapb.FieldSchema) error {
if data == nil {
log.Info("no buffer data to flush")
return nil
}
for i, field := range fields {
fBuilder := b.Field(i)
switch field.DataType {
case schemapb.DataType_Bool:
fBuilder.(*array.BooleanBuilder).AppendValues(data.Data[field.FieldID].(*storage.BoolFieldData).Data, nil)
case schemapb.DataType_Int8:
fBuilder.(*array.Int8Builder).AppendValues(data.Data[field.FieldID].(*storage.Int8FieldData).Data, nil)
case schemapb.DataType_Int16:
fBuilder.(*array.Int16Builder).AppendValues(data.Data[field.FieldID].(*storage.Int16FieldData).Data, nil)
case schemapb.DataType_Int32:
fBuilder.(*array.Int32Builder).AppendValues(data.Data[field.FieldID].(*storage.Int32FieldData).Data, nil)
case schemapb.DataType_Int64:
fBuilder.(*array.Int64Builder).AppendValues(data.Data[field.FieldID].(*storage.Int64FieldData).Data, nil)
case schemapb.DataType_Float:
fBuilder.(*array.Float32Builder).AppendValues(data.Data[field.FieldID].(*storage.FloatFieldData).Data, nil)
case schemapb.DataType_Double:
fBuilder.(*array.Float64Builder).AppendValues(data.Data[field.FieldID].(*storage.DoubleFieldData).Data, nil)
case schemapb.DataType_VarChar, schemapb.DataType_String:
fBuilder.(*array.StringBuilder).AppendValues(data.Data[field.FieldID].(*storage.StringFieldData).Data, nil)
case schemapb.DataType_Array:
for _, data := range data.Data[field.FieldID].(*storage.ArrayFieldData).Data {
marsheled, err := proto.Marshal(data)
if err != nil {
return err
}
fBuilder.(*array.BinaryBuilder).Append(marsheled)
}
case schemapb.DataType_JSON:
fBuilder.(*array.BinaryBuilder).AppendValues(data.Data[field.FieldID].(*storage.JSONFieldData).Data, nil)
case schemapb.DataType_BinaryVector:
vecData := data.Data[field.FieldID].(*storage.BinaryVectorFieldData)
for i := 0; i < len(vecData.Data); i += vecData.Dim / 8 {
fBuilder.(*array.FixedSizeBinaryBuilder).Append(vecData.Data[i : i+vecData.Dim/8])
}
case schemapb.DataType_FloatVector:
vecData := data.Data[field.FieldID].(*storage.FloatVectorFieldData)
builder := fBuilder.(*array.FixedSizeBinaryBuilder)
dim := vecData.Dim
data := vecData.Data
byteLength := dim * 4
length := len(data) / dim
builder.Reserve(length)
bytesData := make([]byte, byteLength)
for i := 0; i < length; i++ {
vec := data[i*dim : (i+1)*dim]
for j := range vec {
bytes := math.Float32bits(vec[j])
common.Endian.PutUint32(bytesData[j*4:], bytes)
}
builder.Append(bytesData)
}
case schemapb.DataType_Float16Vector:
vecData := data.Data[field.FieldID].(*storage.Float16VectorFieldData)
builder := fBuilder.(*array.FixedSizeBinaryBuilder)
dim := vecData.Dim
data := vecData.Data
byteLength := dim * 2
length := len(data) / byteLength
builder.Reserve(length)
for i := 0; i < length; i++ {
builder.Append(data[i*byteLength : (i+1)*byteLength])
}
case schemapb.DataType_BFloat16Vector:
vecData := data.Data[field.FieldID].(*storage.BFloat16VectorFieldData)
builder := fBuilder.(*array.FixedSizeBinaryBuilder)
dim := vecData.Dim
data := vecData.Data
byteLength := dim * 2
length := len(data) / byteLength
builder.Reserve(length)
for i := 0; i < length; i++ {
builder.Append(data[i*byteLength : (i+1)*byteLength])
}
case schemapb.DataType_Int8Vector:
vecData := data.Data[field.FieldID].(*storage.Int8VectorFieldData)
builder := fBuilder.(*array.FixedSizeBinaryBuilder)
dim := vecData.Dim
data := vecData.Data
byteLength := dim
length := len(data) / byteLength
builder.Reserve(length)
for i := 0; i < length; i++ {
builder.Append(arrow.Int8Traits.CastToBytes(data[i*dim : (i+1)*dim]))
}
default:
return merr.WrapErrParameterInvalidMsg("unknown type %v", field.DataType.String())
}
}
return nil
}