mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-06 17:18:35 +08:00
Key changes: * Fix the unstable storage v2 compaction unit test by guaranteeing the order of paths during sync. * Bump the milvus-storage version to include https://github.com/milvus-io/milvus-storage/pull/222 https://github.com/milvus-io/milvus-storage/pull/223 https://github.com/milvus-io/milvus-storage/pull/224 https://github.com/milvus-io/milvus-storage/pull/225 https://github.com/milvus-io/milvus-storage/pull/226 * Also fix the related OOM issue below. related: https://github.com/milvus-io/milvus/issues/43310 Signed-off-by: shaoting-huang <shaoting.huang@zilliz.com>
81 lines
1.9 KiB
Go
81 lines
1.9 KiB
Go
package syncmgr
|
|
|
|
import (
|
|
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
|
|
"github.com/milvus-io/milvus/internal/allocator"
|
|
"github.com/milvus-io/milvus/internal/flushcommon/metacache"
|
|
"github.com/milvus-io/milvus/internal/storage"
|
|
"github.com/milvus-io/milvus/pkg/v2/proto/indexpb"
|
|
"github.com/milvus-io/milvus/pkg/v2/util/retry"
|
|
)
|
|
|
|
func NewSyncTask() *SyncTask {
|
|
return new(SyncTask)
|
|
}
|
|
|
|
func (t *SyncTask) WithSyncPack(pack *SyncPack) *SyncTask {
|
|
t.pack = pack
|
|
|
|
// legacy code, remove later
|
|
t.collectionID = t.pack.collectionID
|
|
t.partitionID = t.pack.partitionID
|
|
t.channelName = t.pack.channelName
|
|
t.segmentID = t.pack.segmentID
|
|
t.batchRows = t.pack.batchRows
|
|
// t.metacache = t.pack.metacache
|
|
// t.schema = t.metacache.Schema()
|
|
t.startPosition = t.pack.startPosition
|
|
t.checkpoint = t.pack.checkpoint
|
|
t.level = t.pack.level
|
|
t.dataSource = t.pack.dataSource
|
|
t.tsFrom = t.pack.tsFrom
|
|
t.tsTo = t.pack.tsTo
|
|
t.failureCallback = t.pack.errHandler
|
|
return t
|
|
}
|
|
|
|
func (t *SyncTask) WithChunkManager(cm storage.ChunkManager) *SyncTask {
|
|
t.chunkManager = cm
|
|
return t
|
|
}
|
|
|
|
func (t *SyncTask) WithStorageConfig(storageConfig *indexpb.StorageConfig) *SyncTask {
|
|
t.storageConfig = storageConfig
|
|
return t
|
|
}
|
|
|
|
func (t *SyncTask) WithAllocator(allocator allocator.Interface) *SyncTask {
|
|
t.allocator = allocator
|
|
return t
|
|
}
|
|
|
|
func (t *SyncTask) WithDrop() *SyncTask {
|
|
t.pack.isDrop = true
|
|
return t
|
|
}
|
|
|
|
func (t *SyncTask) WithMetaCache(metacache metacache.MetaCache) *SyncTask {
|
|
t.metacache = metacache
|
|
return t
|
|
}
|
|
|
|
func (t *SyncTask) WithSchema(schema *schemapb.CollectionSchema) *SyncTask {
|
|
t.schema = schema
|
|
return t
|
|
}
|
|
|
|
func (t *SyncTask) WithMetaWriter(metaWriter MetaWriter) *SyncTask {
|
|
t.metaWriter = metaWriter
|
|
return t
|
|
}
|
|
|
|
func (t *SyncTask) WithWriteRetryOptions(opts ...retry.Option) *SyncTask {
|
|
t.writeRetryOpts = opts
|
|
return t
|
|
}
|
|
|
|
func (t *SyncTask) WithFailureCallback(callback func(error)) *SyncTask {
|
|
t.failureCallback = callback
|
|
return t
|
|
}
|