fix: use new path for streamingnode recovery info (#38516)

issue: #38399

Signed-off-by: chyezh <chyezh@outlook.com>
This commit is contained in:
Zhen Ye 2024-12-17 17:10:45 +08:00 committed by GitHub
parent e19a4f764a
commit 8916fbf122
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 44 additions and 10 deletions

View File

@ -214,7 +214,12 @@ type StreamingCoordCataLog interface {
// StreamingNodeCataLog is the interface for streamingnode catalog // StreamingNodeCataLog is the interface for streamingnode catalog
type StreamingNodeCataLog interface { type StreamingNodeCataLog interface {
// WAL select the wal related recovery infos.
// Which must give the pchannel name.
// ListSegmentAssignment list all segment assignments for the wal.
ListSegmentAssignment(ctx context.Context, pChannelName string) ([]*streamingpb.SegmentAssignmentMeta, error) ListSegmentAssignment(ctx context.Context, pChannelName string) ([]*streamingpb.SegmentAssignmentMeta, error)
// SaveSegmentAssignments save the segment assignments for the wal.
SaveSegmentAssignments(ctx context.Context, pChannelName string, infos []*streamingpb.SegmentAssignmentMeta) error SaveSegmentAssignments(ctx context.Context, pChannelName string, infos []*streamingpb.SegmentAssignmentMeta) error
} }

View File

@ -1,7 +1,8 @@
package streamingnode package streamingnode
const ( const (
MetaPrefix = "streamingnode-meta" MetaPrefix = "streamingnode-meta"
SegmentAssignMeta = MetaPrefix + "/segment-assign"
SegmentAssignSubFolder = "s" DirectoryWAL = "wal"
DirectorySegmentAssign = "segment-assign"
) )

View File

@ -15,7 +15,22 @@ import (
"github.com/milvus-io/milvus/pkg/util/etcd" "github.com/milvus-io/milvus/pkg/util/etcd"
) )
// NewCataLog creates a new catalog instance // NewCataLog creates a new streaming-node catalog instance.
// It's used to persist the recovery info for a streaming node and wal.
// The catalog is shown as following:
// streamingnode-meta
// └── wal
//
// ├── pchannel-1
// │   └── segment-assign
// │   ├── 456398247934
// │   ├── 456398247936
// │   └── 456398247939
// └── pchannel-2
// └── segment-assign
// ├── 456398247934
// ├── 456398247935
// └── 456398247938
func NewCataLog(metaKV kv.MetaKv) metastore.StreamingNodeCataLog { func NewCataLog(metaKV kv.MetaKv) metastore.StreamingNodeCataLog {
return &catalog{ return &catalog{
metaKV: metaKV, metaKV: metaKV,
@ -27,6 +42,7 @@ type catalog struct {
metaKV kv.MetaKv metaKV kv.MetaKv
} }
// ListSegmentAssignment lists the segment assignment info of the pchannel.
func (c *catalog) ListSegmentAssignment(ctx context.Context, pChannelName string) ([]*streamingpb.SegmentAssignmentMeta, error) { func (c *catalog) ListSegmentAssignment(ctx context.Context, pChannelName string) ([]*streamingpb.SegmentAssignmentMeta, error) {
prefix := buildSegmentAssignmentMetaPath(pChannelName) prefix := buildSegmentAssignmentMetaPath(pChannelName)
keys, values, err := c.metaKV.LoadWithPrefix(ctx, prefix) keys, values, err := c.metaKV.LoadWithPrefix(ctx, prefix)
@ -81,15 +97,16 @@ func (c *catalog) SaveSegmentAssignments(ctx context.Context, pChannelName strin
} }
// buildSegmentAssignmentMetaPath builds the path for segment assignment // buildSegmentAssignmentMetaPath builds the path for segment assignment
// streamingnode-meta/segment-assign/${pChannelName}
func buildSegmentAssignmentMetaPath(pChannelName string) string { func buildSegmentAssignmentMetaPath(pChannelName string) string {
// !!! bad implementation here, but we can't make compatibility for underlying meta kv. return path.Join(buildWALDirectory(pChannelName), DirectorySegmentAssign) + "/"
// underlying meta kv will remove the last '/' of the path, cause the pchannel lost.
// So we add a special sub path to avoid this.
return path.Join(SegmentAssignMeta, pChannelName, SegmentAssignSubFolder) + "/"
} }
// buildSegmentAssignmentMetaPathOfSegment builds the path for segment assignment // buildSegmentAssignmentMetaPathOfSegment builds the path for segment assignment
func buildSegmentAssignmentMetaPathOfSegment(pChannelName string, segmentID int64) string { func buildSegmentAssignmentMetaPathOfSegment(pChannelName string, segmentID int64) string {
return path.Join(SegmentAssignMeta, pChannelName, SegmentAssignSubFolder, strconv.FormatInt(segmentID, 10)) return path.Join(buildWALDirectory(pChannelName), DirectorySegmentAssign, strconv.FormatInt(segmentID, 10))
}
// buildWALDirectory builds the path for wal directory
func buildWALDirectory(pchannelName string) string {
return path.Join(MetaPrefix, DirectoryWAL, pchannelName) + "/"
} }

View File

@ -41,3 +41,14 @@ func TestCatalog(t *testing.T) {
}) })
assert.NoError(t, err) assert.NoError(t, err)
} }
func TestBuildDirectory(t *testing.T) {
assert.Equal(t, "streamingnode-meta/wal/p1/", buildWALDirectory("p1"))
assert.Equal(t, "streamingnode-meta/wal/p2/", buildWALDirectory("p2"))
assert.Equal(t, "streamingnode-meta/wal/p1/segment-assign/", buildSegmentAssignmentMetaPath("p1"))
assert.Equal(t, "streamingnode-meta/wal/p2/segment-assign/", buildSegmentAssignmentMetaPath("p2"))
assert.Equal(t, "streamingnode-meta/wal/p1/segment-assign/1", buildSegmentAssignmentMetaPathOfSegment("p1", 1))
assert.Equal(t, "streamingnode-meta/wal/p2/segment-assign/2", buildSegmentAssignmentMetaPathOfSegment("p2", 2))
}