From 21b0e5ca9dd568adffe4da818c53051687342ef3 Mon Sep 17 00:00:00 2001 From: "cai.zhang" Date: Mon, 22 Dec 2025 20:55:19 +0800 Subject: [PATCH] enhance: Don't seal segments when only alter collection properties (#46488) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ### **PR Type** Enhancement ___ ### **Description** - Only flush and fence segments for schema-changing alter collection messages - Skip segment sealing for collection property-only alterations - Add conditional check using messageutil.IsSchemaChange utility function ___ ### Diagram Walkthrough ```mermaid flowchart LR A["Alter Collection Message"] --> B{"Is Schema Change?"} B -->|Yes| C["Flush and Fence Segments"] B -->|No| D["Skip Segment Operations"] C --> E["Set Flushed Segment IDs"] D --> E E --> F["Append Operation"] ```

File Walkthrough

Relevant files
Enhancement
shard_interceptor.go
Conditional segment sealing based on schema changes           

internal/streamingnode/server/wal/interceptors/shard/shard_interceptor.go
  • Added import for messageutil package to access schema change detection
    utility
  • Modified handleAlterCollection to conditionally flush and fence
    segments only for schema-changing messages
  • Wrapped segment flushing logic in if
    messageutil.IsSchemaChange(header) check
  • Skips unnecessary segment sealing when only collection properties are
    altered
+9/-3     
___ ## Summary by CodeRabbit * **Bug Fixes** * Optimized collection schema alteration to conditionally perform segment allocation operations only when schema changes are detected, reducing unnecessary overhead in unmodified collection scenarios. ✏️ Tip: You can customize this high-level summary in your review settings. Signed-off-by: Cai Zhang --- .../wal/interceptors/shard/shard_interceptor.go | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/internal/streamingnode/server/wal/interceptors/shard/shard_interceptor.go b/internal/streamingnode/server/wal/interceptors/shard/shard_interceptor.go index 01ce0f3277..3b9fe9de7a 100644 --- a/internal/streamingnode/server/wal/interceptors/shard/shard_interceptor.go +++ b/internal/streamingnode/server/wal/interceptors/shard/shard_interceptor.go @@ -16,6 +16,7 @@ import ( "github.com/milvus-io/milvus/pkg/v2/log" "github.com/milvus-io/milvus/pkg/v2/proto/messagespb" "github.com/milvus-io/milvus/pkg/v2/streaming/util/message" + "github.com/milvus-io/milvus/pkg/v2/streaming/util/message/messageutil" "github.com/milvus-io/milvus/pkg/v2/util/funcutil" ) @@ -251,10 +252,15 @@ func (impl *shardInterceptor) handleSchemaChange(ctx context.Context, msg messag func (impl *shardInterceptor) handleAlterCollection(ctx context.Context, msg message.MutableMessage, appendOp interceptors.Append) (message.MessageID, error) { putCollectionMsg := message.MustAsMutableAlterCollectionMessageV2(msg) header := putCollectionMsg.Header() - segmentIDs, err := impl.shardManager.FlushAndFenceSegmentAllocUntil(header.GetCollectionId(), msg.TimeTick()) - if err != nil { - return nil, status.NewUnrecoverableError(err.Error()) + var segmentIDs []int64 + var err error + if messageutil.IsSchemaChange(header) { + segmentIDs, err = impl.shardManager.FlushAndFenceSegmentAllocUntil(header.GetCollectionId(), msg.TimeTick()) + if err != nil { + return nil, status.NewUnrecoverableError(err.Error()) + } } + header.FlushedSegmentIds = segmentIDs putCollectionMsg.OverwriteHeader(header) return appendOp(ctx, msg)