mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-08 01:58:34 +08:00
Change genKey into JoinIDPath (#12959)
1. Simplify genKey(), make genKey a rpc call 2. Unify the usage of JoinIDPath Signed-off-by: yangxuan <xuan.yang@zilliz.com>
This commit is contained in:
parent
a9a332dbcf
commit
dbf757e08b
@ -31,7 +31,7 @@ import (
|
||||
type allocatorInterface interface {
|
||||
allocID() (UniqueID, error)
|
||||
allocIDBatch(count uint32) (UniqueID, uint32, error)
|
||||
genKey(alloc bool, ids ...UniqueID) (key string, err error)
|
||||
genKey(ids ...UniqueID) (key string, err error)
|
||||
}
|
||||
|
||||
type allocator struct {
|
||||
@ -93,16 +93,12 @@ func (alloc *allocator) allocIDBatch(count uint32) (UniqueID, uint32, error) {
|
||||
}
|
||||
|
||||
// genKey gives a valid key string for lists of UniqueIDs:
|
||||
// if alloc is true, the returned keys will have a generated-unique ID at the end.
|
||||
// if alloc is false, the returned keys will only consist of provided ids.
|
||||
func (alloc *allocator) genKey(isalloc bool, ids ...UniqueID) (string, error) {
|
||||
if isalloc {
|
||||
idx, err := alloc.allocID()
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
ids = append(ids, idx)
|
||||
func (alloc *allocator) genKey(ids ...UniqueID) (string, error) {
|
||||
idx, err := alloc.allocID()
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
ids = append(ids, idx)
|
||||
return JoinIDPath(ids...), nil
|
||||
}
|
||||
|
||||
|
||||
@ -52,75 +52,49 @@ func TestAllocator_Basic(t *testing.T) {
|
||||
t.Run("Test genKey", func(t *testing.T) {
|
||||
ms.setID(666)
|
||||
|
||||
type in struct {
|
||||
isalloc bool
|
||||
ids []UniqueID
|
||||
}
|
||||
|
||||
type out struct {
|
||||
key string
|
||||
err error
|
||||
}
|
||||
|
||||
type Test struct {
|
||||
in
|
||||
out
|
||||
inIDs []UniqueID
|
||||
outKey string
|
||||
|
||||
description string
|
||||
}
|
||||
|
||||
tests := []Test{
|
||||
{in{true, []UniqueID{}}, out{"666", nil}},
|
||||
{in{true, []UniqueID{1}}, out{"1/666", nil}},
|
||||
{in{true, make([]UniqueID, 0)}, out{"666", nil}},
|
||||
{in{false, []UniqueID{}}, out{"", nil}},
|
||||
{in{false, []UniqueID{1, 2, 3}}, out{"1/2/3", nil}},
|
||||
{in{false, []UniqueID{1}}, out{"1", nil}},
|
||||
{in{false, []UniqueID{2, 2, 2}}, out{"2/2/2", nil}},
|
||||
{[]UniqueID{}, "666", "genKey with empty input ids"},
|
||||
{[]UniqueID{1}, "1/666", "genKey with 1 input id"},
|
||||
{[]UniqueID{1, 2, 3}, "1/2/3/666", "genKey with input 3 ids"},
|
||||
{[]UniqueID{2, 2, 2}, "2/2/2/666", "genKey with input 3 ids"},
|
||||
}
|
||||
|
||||
for i, test := range tests {
|
||||
key, err := allocator.genKey(test.in.isalloc, test.in.ids...)
|
||||
|
||||
assert.Equalf(t, test.out.key, key, "#%d", i)
|
||||
assert.Equalf(t, test.out.err, err, "#%d", i)
|
||||
key, err := allocator.genKey(test.inIDs...)
|
||||
assert.NoError(t, err)
|
||||
assert.Equalf(t, test.outKey, key, "#%d", i)
|
||||
}
|
||||
|
||||
// Status.ErrorCode != Success
|
||||
ms.setID(0)
|
||||
tests = []Test{
|
||||
{in{true, []UniqueID{}}, out{}},
|
||||
{in{true, []UniqueID{1}}, out{}},
|
||||
{in{true, make([]UniqueID, 0)}, out{}},
|
||||
{[]UniqueID{}, "", "error rpc status"},
|
||||
{[]UniqueID{1}, "", "error rpc status"},
|
||||
}
|
||||
|
||||
for i, test := range tests {
|
||||
_, err := allocator.genKey(test.in.isalloc, test.in.ids...)
|
||||
assert.Errorf(t, err, "number: %d", i)
|
||||
for _, test := range tests {
|
||||
k, err := allocator.genKey(test.inIDs...)
|
||||
assert.Error(t, err)
|
||||
assert.Equal(t, test.outKey, k)
|
||||
}
|
||||
|
||||
// Grpc error
|
||||
ms.setID(-1)
|
||||
tests = []Test{
|
||||
{in{true, make([]UniqueID, 0)}, out{}},
|
||||
{in{true, []UniqueID{1}}, out{}},
|
||||
{in{true, make([]UniqueID, 0)}, out{}},
|
||||
{[]UniqueID{}, "", "error rpc"},
|
||||
{[]UniqueID{1}, "", "error rpc"},
|
||||
}
|
||||
|
||||
for i, test := range tests {
|
||||
_, err := allocator.genKey(test.in.isalloc, test.in.ids...)
|
||||
assert.Errorf(t, err, "number: %d", i)
|
||||
}
|
||||
|
||||
// RootCoord's unavailability doesn't affects genKey when alloc == false
|
||||
tests = []Test{
|
||||
{in{false, []UniqueID{1, 2, 3}}, out{"1/2/3", nil}},
|
||||
{in{false, []UniqueID{1}}, out{"1", nil}},
|
||||
{in{false, []UniqueID{2, 2, 2}}, out{"2/2/2", nil}},
|
||||
}
|
||||
|
||||
for i, test := range tests {
|
||||
key, err := allocator.genKey(test.in.isalloc, test.in.ids...)
|
||||
assert.Equalf(t, test.out.key, key, "#%d", i)
|
||||
assert.Equalf(t, test.out.err, err, "#%d", i)
|
||||
for _, test := range tests {
|
||||
k, err := allocator.genKey(test.inIDs...)
|
||||
assert.Error(t, err)
|
||||
assert.Equal(t, test.outKey, k)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
@ -195,7 +195,7 @@ func (b *binlogIO) genDeltaBlobs(data *DeleteData, collID, partID, segID UniqueI
|
||||
return "", nil, err
|
||||
}
|
||||
|
||||
k, err := b.genKey(true, collID, partID, segID)
|
||||
k, err := b.genKey(collID, partID, segID)
|
||||
if err != nil {
|
||||
return "", nil, err
|
||||
}
|
||||
|
||||
@ -295,7 +295,7 @@ func TestFlushSegment(t *testing.T) {
|
||||
assert.NotNil(t, fu.field2Path)
|
||||
assert.Equal(t, fu.segID, segmentID)
|
||||
|
||||
k, _ := idAllocMock.genKey(false, collectionID, partitionID, segmentID, 0)
|
||||
k := JoinIDPath(collectionID, partitionID, segmentID, 0)
|
||||
key := path.Join(Params.StatsBinlogRootPath, k)
|
||||
_, values, _ := mockMinIO.LoadWithPrefix(key)
|
||||
assert.Equal(t, len(values), 1)
|
||||
|
||||
@ -350,7 +350,7 @@ func (m *rendezvousFlushManager) flushBufferData(data *BufferData, segmentID Uni
|
||||
logidx := start + int64(idx)
|
||||
|
||||
// no error raise if alloc=false
|
||||
k, _ := m.genKey(false, collID, partID, segmentID, fieldID, logidx)
|
||||
k := JoinIDPath(collID, partID, segmentID, fieldID, logidx)
|
||||
|
||||
key := path.Join(Params.InsertBinlogRootPath, k)
|
||||
paths = append(paths, key)
|
||||
@ -371,7 +371,7 @@ func (m *rendezvousFlushManager) flushBufferData(data *BufferData, segmentID Uni
|
||||
logidx := field2Logidx[fieldID]
|
||||
|
||||
// no error raise if alloc=false
|
||||
k, _ := m.genKey(false, collID, partID, segmentID, fieldID, logidx)
|
||||
k := JoinIDPath(collID, partID, segmentID, fieldID, logidx)
|
||||
|
||||
key := path.Join(Params.StatsBinlogRootPath, k)
|
||||
kvs[key] = string(blob.Value[:])
|
||||
@ -422,7 +422,7 @@ func (m *rendezvousFlushManager) flushDelData(data *DelDataBuf, segmentID Unique
|
||||
return err
|
||||
}
|
||||
|
||||
blobKey, _ := m.genKey(false, collID, partID, segmentID, logID)
|
||||
blobKey := JoinIDPath(collID, partID, segmentID, logID)
|
||||
blobPath := path.Join(Params.DeleteBinlogRootPath, blobKey)
|
||||
kvs := map[string]string{blobPath: string(blob.Value[:])}
|
||||
data.fileSize = int64(len(blob.Value))
|
||||
|
||||
@ -23,8 +23,6 @@ import (
|
||||
"errors"
|
||||
"math"
|
||||
"math/rand"
|
||||
"path"
|
||||
"strconv"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
@ -596,22 +594,13 @@ func (alloc *AllocatorFactory) allocIDBatch(count uint32) (UniqueID, uint32, err
|
||||
return start, count, err
|
||||
}
|
||||
|
||||
func (alloc *AllocatorFactory) genKey(isalloc bool, ids ...UniqueID) (key string, err error) {
|
||||
if isalloc {
|
||||
idx, err := alloc.allocID()
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
ids = append(ids, idx)
|
||||
func (alloc *AllocatorFactory) genKey(ids ...UniqueID) (string, error) {
|
||||
idx, err := alloc.allocID()
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
idStr := make([]string, len(ids))
|
||||
for _, id := range ids {
|
||||
idStr = append(idStr, strconv.FormatInt(id, 10))
|
||||
}
|
||||
|
||||
key = path.Join(idStr...)
|
||||
return
|
||||
ids = append(ids, idx)
|
||||
return JoinIDPath(ids...), nil
|
||||
}
|
||||
|
||||
// If id == 0, AllocID will return not successful status
|
||||
|
||||
@ -21,7 +21,6 @@ import (
|
||||
"errors"
|
||||
"math"
|
||||
"math/rand"
|
||||
"path"
|
||||
"strconv"
|
||||
|
||||
"github.com/golang/protobuf/proto"
|
||||
@ -555,17 +554,6 @@ func genSimpleInsertData() (*storage.InsertData, error) {
|
||||
return genInsertData(defaultMsgLength, schema)
|
||||
}
|
||||
|
||||
func genKey(collectionID, partitionID, segmentID UniqueID, fieldID int64) string {
|
||||
ids := []string{
|
||||
defaultKVRootPath,
|
||||
strconv.FormatInt(collectionID, 10),
|
||||
strconv.FormatInt(partitionID, 10),
|
||||
strconv.FormatInt(segmentID, 10),
|
||||
strconv.FormatInt(fieldID, 10),
|
||||
}
|
||||
return path.Join(ids...)
|
||||
}
|
||||
|
||||
func genStorageBlob(collectionID UniqueID,
|
||||
partitionID UniqueID,
|
||||
segmentID UniqueID,
|
||||
@ -684,7 +672,7 @@ func saveBinLog(ctx context.Context,
|
||||
return nil, err
|
||||
}
|
||||
|
||||
key := genKey(collectionID, partitionID, segmentID, fieldID)
|
||||
key := JoinIDPath(collectionID, partitionID, segmentID, fieldID)
|
||||
kvs[key] = string(blob.Value[:])
|
||||
fieldBinlog = append(fieldBinlog, &datapb.FieldBinlog{
|
||||
FieldID: fieldID,
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user