diff --git a/cmd/tools/datameta/main.go b/cmd/tools/datameta/main.go new file mode 100644 index 0000000000..479d5dfc84 --- /dev/null +++ b/cmd/tools/datameta/main.go @@ -0,0 +1,121 @@ +package main + +import ( + "flag" + "fmt" + "sort" + "strings" + + "github.com/golang/protobuf/proto" + etcdkv "github.com/milvus-io/milvus/internal/kv/etcd" + "github.com/milvus-io/milvus/internal/log" + "github.com/milvus-io/milvus/internal/proto/datapb" + "github.com/milvus-io/milvus/internal/util/tsoutil" + "go.uber.org/zap" +) + +var ( + etcdAddr = flag.String("etcd", "127.0.0.1:2379", "Etcd Endpoint to connect") + rootPath = flag.String("rootPath", "by-dev/meta/datacoord-meta/s", "Datacoord Segment root path to iterate") + + collectionID = flag.Int64("collection", 0, "Collection ID to filter with") + partitionID = flag.Int64("partition", 0, "Partition ID to filter with") + segmentID = flag.Int64("segment", 0, "Segment ID to filter with") + channel = flag.String("channel", "", "Channel name to filter with") + detailBinlogs = flag.Bool("detail", false, "Display detail binlog path content") +) + +func main() { + flag.Parse() + etcdkv, err := etcdkv.NewEtcdKV([]string{*etcdAddr}, *rootPath) + if err != nil { + log.Fatal("failed to connect to ected", zap.Error(err)) + } + + keys, values, err := etcdkv.LoadWithPrefix("/") + if err != nil { + log.Fatal("failed to list ", zap.Error(err)) + } + for i := range keys { + info := &datapb.SegmentInfo{} + err = proto.Unmarshal([]byte(values[i]), info) + if err != nil { + continue + } + if *collectionID > 0 && info.CollectionID != *collectionID { + continue + } + + if *partitionID > 0 && info.PartitionID != *partitionID { + continue + } + + if *segmentID > 0 && info.ID != *segmentID { + continue + } + + if len(*channel) > 0 && !strings.Contains(info.InsertChannel, *channel) { + continue + } + + printSegmentInfo(info) + } +} + +const ( + tsPrintFormat = "2006-01-02 15:04:05.999 -0700" +) + +func printSegmentInfo(info *datapb.SegmentInfo) { + fmt.Println("================================================================================") + fmt.Printf("Segment ID: %d\n", info.ID) + fmt.Printf("Segment State:%v\n", info.State) + fmt.Printf("Collection ID: %d\t\tPartitionID: %d\n", info.CollectionID, info.PartitionID) + fmt.Printf("Insert Channel:%s\n", info.InsertChannel) + fmt.Printf("Num of Rows: %d\t\tMax Row Num: %d\n", info.NumOfRows, info.MaxRowNum) + lastExpireTime, _ := tsoutil.ParseTS(info.LastExpireTime) + fmt.Printf("Last Expire Time: %s\n", lastExpireTime.Format(tsPrintFormat)) + if info.StartPosition != nil { + startTime, _ := tsoutil.ParseTS(info.StartPosition.Timestamp) + fmt.Printf("Start Position ID: %v, time: %s\n", info.StartPosition.MsgID, startTime.Format(tsPrintFormat)) + } else { + fmt.Println("Start Position: nil") + } + if info.DmlPosition != nil { + dmlTime, _ := tsoutil.ParseTS(info.DmlPosition.Timestamp) + fmt.Printf("Dml Position ID: %v, time: %s\n", info.StartPosition.MsgID, dmlTime.Format(tsPrintFormat)) + } else { + fmt.Println("Dml Position: nil") + } + fmt.Printf("Binlog Nums %d\tStatsLog Nums: %d\tDeltaLog Nums:%d\n", + len(info.Binlogs), len(info.Statslogs), len(info.Deltalogs)) + + if *detailBinlogs { + fmt.Println("**************************************") + fmt.Println("Binlogs:") + sort.Slice(info.Binlogs, func(i, j int) bool { + return info.Binlogs[i].FieldID < info.Binlogs[j].FieldID + }) + for _, log := range info.Binlogs { + fmt.Printf("Field %d: %v\n", log.FieldID, log.Binlogs) + } + + fmt.Println("**************************************") + fmt.Println("Statslogs:") + sort.Slice(info.Statslogs, func(i, j int) bool { + return info.Statslogs[i].FieldID < info.Statslogs[j].FieldID + }) + for _, log := range info.Statslogs { + fmt.Printf("Field %d: %v\n", log.FieldID, log.Binlogs) + } + + fmt.Println("**************************************") + fmt.Println("Delta Logs:") + for _, log := range info.GetDeltalogs() { + fmt.Printf("Entries: %d From: %v - To: %v\n", log.RecordEntries, log.TimestampFrom, log.TimestampTo) + fmt.Printf("Path: %v\n", log.DeltaLogPath) + } + } + + fmt.Println("================================================================================") +}