mirror of
https://gitee.com/milvus-io/milvus.git
synced 2026-01-07 19:31:51 +08:00
enhance: Enhance logs for proxy and rootcoord meta table (#46652)
issue: https://github.com/milvus-io/milvus/issues/46651 <!-- This is an auto-generated comment: release notes by coderabbit.ai --> ## Enhancement: Add Context-Aware Logging for Proxy and RootCoord Meta Table Operations **Core Invariant**: All changes maintain existing cache behavior and state transition logic by purely enhancing observability through context-aware logging without modifying control flow, return values, or data structures. **Logic Simplified Without Regression**: - Removed internal helper method `getFullCollectionInfo` from MetaCache by inlining its logic directly into GetCollectionInfo, eliminating an unnecessary abstraction layer while preserving the exact same cache-hit/miss and fetch-or-update paths - This consolidation has no impact on behavior because the helper was only called from one location and the inlined logic executes identically **Enhanced Logging for Observability (No Behavior Changes)**: - Added context-aware logging (log.Ctx(ctx)) to cache miss scenarios and timestamp comparisons in proxy MetaCache, enabling request tracing without altering cache lookup logic - Expanded RootCoord MetaTable's internal helper method signatures to propagate context for contextual logging across collection lifecycle events (begin truncate, update state, remove names/aliases, delete from collections map), while keeping all call sites and state transitions unchanged - Enhanced DescribeCollection logging in proxy to capture request scope (role, database, collection IDs, timestamp) and response schema at operation boundaries **Type**: Enhancement focused on improved observability. All modifications are strictly additive logging; no data structures, caching strategies, or core logic paths were altered. <!-- end of auto-generated comment: release notes by coderabbit.ai --> Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
This commit is contained in:
parent
e3a85be435
commit
b7761d67a3
@ -449,7 +449,7 @@ func (m *MetaCache) update(ctx context.Context, database, collectionName string,
|
||||
collectionName = collection.Schema.GetName()
|
||||
}
|
||||
if database == "" {
|
||||
log.Warn("database is empty, use default database name", zap.String("collectionName", collectionName), zap.Stack("stack"))
|
||||
log.Ctx(ctx).Warn("database is empty, use default database name", zap.String("collectionName", collectionName), zap.Stack("stack"))
|
||||
}
|
||||
isolation, err := common.IsPartitionKeyIsolationKvEnabled(collection.Properties...)
|
||||
if err != nil {
|
||||
@ -463,7 +463,7 @@ func (m *MetaCache) update(ctx context.Context, database, collectionName string,
|
||||
curVersion := m.collectionCacheVersion[collection.GetCollectionID()]
|
||||
// Compatibility logic: if the rootcoord version is lower(requestTime = 0), update the cache directly.
|
||||
if collection.GetRequestTime() < curVersion && collection.GetRequestTime() != 0 {
|
||||
log.Debug("describe collection timestamp less than version, don't update cache",
|
||||
log.Ctx(ctx).Debug("describe collection timestamp less than version, don't update cache",
|
||||
zap.String("collectionName", collectionName),
|
||||
zap.Uint64("version", collection.GetRequestTime()), zap.Uint64("cache version", curVersion))
|
||||
return &collectionInfo{
|
||||
@ -617,31 +617,6 @@ func (m *MetaCache) GetCollectionInfo(ctx context.Context, database string, coll
|
||||
return collInfo, nil
|
||||
}
|
||||
|
||||
// GetCollectionInfo returns the collection information related to provided collection name
|
||||
// If the information is not found, proxy will try to fetch information for other source (RootCoord for now)
|
||||
// TODO: may cause data race of this implementation, should be refactored in future.
|
||||
func (m *MetaCache) getFullCollectionInfo(ctx context.Context, database, collectionName string, collectionID int64) (*collectionInfo, error) {
|
||||
collInfo, ok := m.getCollection(database, collectionName, collectionID)
|
||||
|
||||
method := "GetCollectionInfo"
|
||||
// if collInfo.collID != collectionID, means that the cache is not trustable
|
||||
// try to get collection according to collectionID
|
||||
if !ok || collInfo.collID != collectionID {
|
||||
tr := timerecord.NewTimeRecorder("UpdateCache")
|
||||
metrics.ProxyCacheStatsCounter.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), method, metrics.CacheMissLabel).Inc()
|
||||
|
||||
collInfo, err := m.UpdateByID(ctx, database, collectionID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
metrics.ProxyUpdateCacheLatency.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
|
||||
return collInfo, nil
|
||||
}
|
||||
|
||||
metrics.ProxyCacheStatsCounter.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), method, metrics.CacheHitLabel).Inc()
|
||||
return collInfo, nil
|
||||
}
|
||||
|
||||
func (m *MetaCache) GetCollectionSchema(ctx context.Context, database, collectionName string) (*schemaInfo, error) {
|
||||
collInfo, ok := m.getCollection(database, collectionName, 0)
|
||||
|
||||
|
||||
@ -140,6 +140,16 @@ func cloneStructArrayFields(fields []*schemapb.StructArrayFieldSchema) []*schema
|
||||
func (node *CachedProxyServiceProvider) DescribeCollection(ctx context.Context,
|
||||
request *milvuspb.DescribeCollectionRequest,
|
||||
) (resp *milvuspb.DescribeCollectionResponse, err error) {
|
||||
log := log.Ctx(ctx).With(
|
||||
zap.String("role", typeutil.ProxyRole),
|
||||
zap.String("db", request.GetDbName()),
|
||||
zap.String("collection", request.GetCollectionName()),
|
||||
zap.Int64("collectionID", request.GetCollectionID()),
|
||||
zap.Uint64("timestamp", request.GetTimeStamp()),
|
||||
)
|
||||
|
||||
log.Debug("DescribeCollection received")
|
||||
|
||||
resp = &milvuspb.DescribeCollectionResponse{
|
||||
Status: merr.Success(),
|
||||
CollectionName: request.CollectionName,
|
||||
@ -227,6 +237,11 @@ func (node *CachedProxyServiceProvider) DescribeCollection(ctx context.Context,
|
||||
resp.Aliases = c.aliases
|
||||
resp.Properties = c.properties
|
||||
|
||||
log.Debug("DescribeCollection done",
|
||||
zap.Int64("collectionID", resp.GetCollectionID()),
|
||||
zap.Any("schema", resp.GetSchema()),
|
||||
)
|
||||
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
@ -280,8 +295,6 @@ func (node *RemoteProxyServiceProvider) DescribeCollection(ctx context.Context,
|
||||
log.Debug("DescribeCollection done",
|
||||
zap.Uint64("BeginTS", dct.BeginTs()),
|
||||
zap.Uint64("EndTS", dct.EndTs()),
|
||||
zap.String("db", request.DbName),
|
||||
zap.String("collection", request.CollectionName),
|
||||
)
|
||||
|
||||
return dct.result, nil
|
||||
|
||||
@ -562,6 +562,10 @@ func (mt *MetaTable) DropCollection(ctx context.Context, collectionID UniqueID,
|
||||
return err
|
||||
}
|
||||
mt.collID2Meta[collectionID] = clone
|
||||
log.Ctx(ctx).Info("update coll state to dropping",
|
||||
zap.Int64("collectionID", collectionID),
|
||||
zap.String("state", clone.State.String()),
|
||||
)
|
||||
|
||||
db, err := mt.getDatabaseByIDInternal(ctx, coll.DBID, typeutil.MaxTimestamp)
|
||||
if err != nil {
|
||||
@ -580,31 +584,50 @@ func (mt *MetaTable) DropCollection(ctx context.Context, collectionID UniqueID,
|
||||
return nil
|
||||
}
|
||||
|
||||
func (mt *MetaTable) removeIfNameMatchedInternal(collectionID UniqueID, name string) {
|
||||
func (mt *MetaTable) removeIfNameMatchedInternal(ctx context.Context, collectionID UniqueID, name string) {
|
||||
mt.names.removeIf(func(db string, collection string, id UniqueID) bool {
|
||||
return collectionID == id
|
||||
if collectionID == id {
|
||||
log.Ctx(ctx).Info("remove from names",
|
||||
zap.String("dbName", db),
|
||||
zap.String("collectionName", collection),
|
||||
zap.Int64("collectionID", id),
|
||||
)
|
||||
return true
|
||||
}
|
||||
return false
|
||||
})
|
||||
}
|
||||
|
||||
func (mt *MetaTable) removeIfAliasMatchedInternal(collectionID UniqueID, alias string) {
|
||||
func (mt *MetaTable) removeIfAliasMatchedInternal(ctx context.Context, collectionID UniqueID, alias string) {
|
||||
mt.aliases.removeIf(func(db string, collection string, id UniqueID) bool {
|
||||
return collectionID == id
|
||||
if collectionID == id {
|
||||
log.Ctx(ctx).Info("remove from aliases",
|
||||
zap.String("dbName", db),
|
||||
zap.String("alias", collection),
|
||||
zap.Int64("collectionID", id),
|
||||
)
|
||||
return true
|
||||
}
|
||||
return false
|
||||
})
|
||||
}
|
||||
|
||||
func (mt *MetaTable) removeIfMatchedInternal(collectionID UniqueID, name string) {
|
||||
mt.removeIfNameMatchedInternal(collectionID, name)
|
||||
mt.removeIfAliasMatchedInternal(collectionID, name)
|
||||
func (mt *MetaTable) removeIfMatchedInternal(ctx context.Context, collectionID UniqueID, name string) {
|
||||
mt.removeIfNameMatchedInternal(ctx, collectionID, name)
|
||||
mt.removeIfAliasMatchedInternal(ctx, collectionID, name)
|
||||
}
|
||||
|
||||
func (mt *MetaTable) removeAllNamesIfMatchedInternal(collectionID UniqueID, names []string) {
|
||||
func (mt *MetaTable) removeAllNamesIfMatchedInternal(ctx context.Context, collectionID UniqueID, names []string) {
|
||||
for _, name := range names {
|
||||
mt.removeIfMatchedInternal(collectionID, name)
|
||||
mt.removeIfMatchedInternal(ctx, collectionID, name)
|
||||
}
|
||||
}
|
||||
|
||||
func (mt *MetaTable) removeCollectionByIDInternal(collectionID UniqueID) {
|
||||
func (mt *MetaTable) removeCollectionByIDInternal(ctx context.Context, collectionID UniqueID) {
|
||||
delete(mt.collID2Meta, collectionID)
|
||||
log.Ctx(ctx).Info("delete from collID2Meta",
|
||||
zap.Int64("collectionID", collectionID),
|
||||
)
|
||||
}
|
||||
|
||||
func (mt *MetaTable) RemoveCollection(ctx context.Context, collectionID UniqueID, ts Timestamp) error {
|
||||
@ -640,8 +663,8 @@ func (mt *MetaTable) RemoveCollection(ctx context.Context, collectionID UniqueID
|
||||
allNames = append(allNames, coll.Name)
|
||||
|
||||
// We cannot delete the name directly, since newly collection with same name may be created.
|
||||
mt.removeAllNamesIfMatchedInternal(collectionID, allNames)
|
||||
mt.removeCollectionByIDInternal(collectionID)
|
||||
mt.removeAllNamesIfMatchedInternal(ctx, collectionID, allNames)
|
||||
mt.removeCollectionByIDInternal(ctx, collectionID)
|
||||
|
||||
log.Ctx(ctx).Info("remove collection",
|
||||
zap.Int64("dbID", coll.DBID),
|
||||
@ -975,7 +998,17 @@ func (mt *MetaTable) AlterCollection(ctx context.Context, result message.Broadca
|
||||
mt.names.remove(oldColl.DBName, oldColl.Name)
|
||||
mt.names.insert(newColl.DBName, newColl.Name, newColl.CollectionID)
|
||||
mt.collID2Meta[header.CollectionId] = newColl
|
||||
log.Ctx(ctx).Info("alter collection finished", zap.Bool("dbChanged", dbChanged), zap.Int64("collectionID", oldColl.CollectionID), zap.Uint64("ts", newColl.UpdateTimestamp))
|
||||
log.Ctx(ctx).Info("alter collection finished",
|
||||
zap.String("oldDBName", oldColl.DBName),
|
||||
zap.String("newDBName", newColl.DBName),
|
||||
zap.String("oldCollectionName", oldColl.Name),
|
||||
zap.String("newCollectionName", newColl.Name),
|
||||
zap.Int64("headerCollectionID", header.CollectionId),
|
||||
zap.Int64("newCollectionID", newColl.CollectionID),
|
||||
zap.Int64("oldCollectionID", oldColl.CollectionID),
|
||||
zap.Bool("dbChanged", dbChanged),
|
||||
zap.Uint64("ts", newColl.UpdateTimestamp),
|
||||
)
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -1004,6 +1037,9 @@ func (mt *MetaTable) BeginTruncateCollection(ctx context.Context, collectionID U
|
||||
return err
|
||||
}
|
||||
mt.collID2Meta[coll.CollectionID] = newColl
|
||||
log.Ctx(ctx).Info("update collID2Meta for begin truncate collection",
|
||||
zap.Int64("collectionID", coll.CollectionID),
|
||||
)
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -1032,6 +1068,9 @@ func (mt *MetaTable) TruncateCollection(ctx context.Context, result message.Broa
|
||||
return err
|
||||
}
|
||||
mt.collID2Meta[coll.CollectionID] = newColl
|
||||
log.Ctx(ctx).Info("update collID2Meta for truncate collection",
|
||||
zap.Int64("collectionID", coll.CollectionID),
|
||||
)
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -1347,7 +1386,8 @@ func (mt *MetaTable) AlterAlias(ctx context.Context, result message.BroadcastRes
|
||||
log.Ctx(ctx).Info("alter alias",
|
||||
zap.String("db", header.DbName),
|
||||
zap.String("alias", header.Alias),
|
||||
zap.String("collection", header.CollectionName),
|
||||
zap.String("collectionName", header.CollectionName),
|
||||
zap.Int64("collectionID", header.CollectionId),
|
||||
zap.Uint64("ts", result.GetControlChannelResult().TimeTick),
|
||||
)
|
||||
return nil
|
||||
@ -1585,7 +1625,7 @@ func (mt *MetaTable) AlterCredential(ctx context.Context, result message.Broadca
|
||||
}
|
||||
// if the credential already exists and the version is not greater than the current timetick.
|
||||
if existsCredential != nil && existsCredential.TimeTick >= result.GetControlChannelResult().TimeTick {
|
||||
log.Info("credential already exists and the version is not greater than the current timetick",
|
||||
log.Ctx(ctx).Info("credential already exists and the version is not greater than the current timetick",
|
||||
zap.String("username", body.CredentialInfo.Username),
|
||||
zap.Uint64("incoming", result.GetControlChannelResult().TimeTick),
|
||||
zap.Uint64("current", existsCredential.TimeTick),
|
||||
@ -1637,7 +1677,7 @@ func (mt *MetaTable) DeleteCredential(ctx context.Context, result message.Broadc
|
||||
}
|
||||
// if the credential already exists and the version is not greater than the current timetick.
|
||||
if existsCredential != nil && existsCredential.TimeTick >= result.GetControlChannelResult().TimeTick {
|
||||
log.Info("credential already exists and the version is not greater than the current timetick",
|
||||
log.Ctx(ctx).Info("credential already exists and the version is not greater than the current timetick",
|
||||
zap.String("username", result.Message.Header().UserName),
|
||||
zap.Uint64("incoming", result.GetControlChannelResult().TimeTick),
|
||||
zap.Uint64("current", existsCredential.TimeTick),
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user