From eb1f0bc8052b16720baea47880d009db8d7be470 Mon Sep 17 00:00:00 2001
From: SimFG
Date: Tue, 21 Jun 2022 10:28:12 +0800
Subject: [PATCH] Add the consistency check tool (#17136)

Signed-off-by: SimFG
---
 cmd/milvus/help.go             |  66 +++
 cmd/milvus/mck.go              | 708 +++++++++++++++++++++++++++++++++
 cmd/milvus/milvus.go           | 332 ++--------------
 cmd/milvus/run.go              | 171 ++++++++
 cmd/milvus/stop.go             |  73 ++++
 cmd/milvus/util.go             | 124 ++++++
 internal/util/typeutil/type.go |  24 ++
 7 files changed, 1191 insertions(+), 307 deletions(-)
 create mode 100644 cmd/milvus/help.go
 create mode 100644 cmd/milvus/mck.go
 create mode 100644 cmd/milvus/run.go
 create mode 100644 cmd/milvus/stop.go
 create mode 100644 cmd/milvus/util.go

diff --git a/cmd/milvus/help.go b/cmd/milvus/help.go
new file mode 100644
index 0000000000..38687cdadc
--- /dev/null
+++ b/cmd/milvus/help.go
@@ -0,0 +1,66 @@
+package milvus
+
+import (
+	"fmt"
+	"strings"
+
+	"github.com/milvus-io/milvus/internal/util/typeutil"
+)
+
+var (
+	usageLine = fmt.Sprintf("Usage:\n"+
+		"%s\n%s\n%s\n%s\n", runLine, stopLine, mckLine, serverTypeLine)
+
+	serverTypeLine = `
+[server type]
+	` + strings.Join(typeutil.ServerTypeList(), "\n\t") + `
+	mixture
+`
+	runLine = `
+milvus run [server type] [flags]
+	Start a Milvus server.
+	Tips: The flags below take effect only when the server type is 'mixture'.
+[flags]
+	-rootcoord 'true'
+		Start the rootcoord server.
+	-querycoord 'true'
+		Start the querycoord server.
+	-indexcoord 'true'
+		Start the indexcoord server.
+	-datacoord 'true'
+		Start the datacoord server.
+	-alias ''
+		Set the server alias.
+`
+	stopLine = `
+milvus stop [server type] [flags]
+	Stop a Milvus server.
+[flags]
+	-alias ''
+		Set the server alias.
+`
+	mckLine = `
+milvus mck run [flags]
+	Milvus data consistency check.
+	Tips: The flags are optional.
+[flags]
+	-etcdIp ''
+		IP address of the etcd server to connect to.
+	-etcdRootPath ''
+		The root path of the etcd data to operate on.
+	-minioAddress ''
+		Address of the minio server to connect to.
+	-minioUsername ''
+		The username used to log in to the minio server.
+	-minioPassword ''
+		The password used to log in to the minio server.
+	-minioUseSSL 'false'
+		Whether to use SSL to connect to the minio server.
+	-minioBucketName ''
+		The minio bucket in which to operate on the data.
+
+milvus mck cleanTrash [flags]
+	Clean up the backed-up inconsistent data.
+	Tips: The flags are the same as those of 'milvus mck run [flags]'.
+`
+)
diff --git a/cmd/milvus/mck.go b/cmd/milvus/mck.go
new file mode 100644
index 0000000000..bda3019c26
--- /dev/null
+++ b/cmd/milvus/mck.go
@@ -0,0 +1,708 @@
+package milvus
+
+import (
+	"bytes"
+	"context"
+	"errors"
+	"flag"
+	"fmt"
+	"os"
+	"path/filepath"
+	"strconv"
+	"strings"
+	"time"
+
+	"github.com/milvus-io/milvus/internal/storage"
+
+	clientv3 "go.etcd.io/etcd/client/v3"
+
+	"github.com/milvus-io/milvus/internal/util/logutil"
+
+	"github.com/milvus-io/milvus/internal/util/paramtable"
+
+	pb "github.com/milvus-io/milvus/internal/proto/etcdpb"
+
+	"github.com/milvus-io/milvus/internal/proto/commonpb"
+	"github.com/milvus-io/milvus/internal/proto/querypb"
+
+	"github.com/golang/protobuf/proto"
+	etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
+	"github.com/milvus-io/milvus/internal/log"
+	"github.com/milvus-io/milvus/internal/proto/datapb"
+	"github.com/milvus-io/milvus/internal/util/etcd"
+	"go.uber.org/zap"
+)
+
+const (
+	MckCmd       = "mck"
+	MckTypeRun   = "run"
+	MckTypeClean = "cleanTrash"
+
+	segmentPrefix     = "datacoord-meta/s"
+	collectionPrefix  = "snapshots/root-coord/collection"
+	triggerTaskPrefix = "queryCoord-triggerTask"
+	activeTaskPrefix  = "queryCoord-activeTask"
+	taskInfoPrefix    = "queryCoord-taskInfo"
+	MckTrash          = "mck-trash"
+)
+
+type mck struct {
+	params              paramtable.GrpcServerConfig
+	taskKeyMap          map[int64]string
+	taskNameMap         map[int64]string
+	allTaskInfo         map[string]string
+	partitionIDToTasks  map[int64][]int64
+	segmentIDToTasks    map[int64][]int64
+	taskIDToInvalidPath map[int64][]string
+	segmentIDMap        map[int64]*datapb.SegmentInfo
+	partitionIDMap      map[int64]struct{}
+	etcdKV              *etcdkv.EtcdKV
+	minioChunkManager   storage.ChunkManager
+
+	etcdIP          string
+	etcdRootPath    string
+	minioAddress    string
+	minioUsername   string
+	minioPassword   string
+	minioUseSSL     string
+	minioBucketName string
+
+	flagStartIndex int
+}
+
+func (c *mck) execute(args []string, flags *flag.FlagSet) {
+	if len(args) < 3 {
+		fmt.Fprintln(os.Stderr, mckLine)
+		return
+	}
+	c.initParam()
+	flags.Usage = func() {
+		fmt.Fprintln(os.Stderr, mckLine)
+	}
+
+	logutil.SetupLogger(&log.Config{
+		Level: "info",
+		File: log.FileLogConfig{
+			Filename: fmt.Sprintf("mck-%s.log", time.Now().Format("20060102150405.99")),
+		},
+	})
+
+	mckType := args[2]
+	c.flagStartIndex = 2
+	isClean := mckType == MckTypeClean
+	if isClean {
+		c.flagStartIndex = 3
+	}
+	c.formatFlags(args, flags)
+	c.connectEtcd()
+
+	switch mckType {
+	case MckTypeRun:
+		c.run()
+	case MckTypeClean:
+		c.cleanTrash()
+		return
+	default:
+		fmt.Fprintln(os.Stderr, mckLine)
+		return
+	}
+}
+
+func (c *mck) run() {
+	c.connectMinio()
+
+	_, values, err := c.etcdKV.LoadWithPrefix(segmentPrefix)
+	if err != nil {
+		log.Fatal("failed to list the segment info", zap.String("key", segmentPrefix), zap.Error(err))
+	}
+	for _, value := range values {
+		info := &datapb.SegmentInfo{}
+		err = proto.Unmarshal([]byte(value), info)
+		if err != nil {
+			log.Warn("failed to unmarshal the segment info", zap.Error(err))
+			continue
+		}
+		c.segmentIDMap[info.ID] = info
+	}
+
+	_, values, err = c.etcdKV.LoadWithPrefix(collectionPrefix)
+	if err != nil {
+		log.Fatal("failed to list the collection info", zap.String("key", collectionPrefix), zap.Error(err))
+	}
+	for _, value := range values {
+		collInfo := pb.CollectionInfo{}
+		err = proto.Unmarshal([]byte(value), &collInfo)
+		if err != nil {
+			log.Warn("failed to unmarshal the collection info", zap.Error(err))
+			continue
+		}
+		for _, id := range collInfo.PartitionIDs {
+			c.partitionIDMap[id] = struct{}{}
+		}
+	}
+
+	// log segment IDs and partition IDs
+	var ids []int64
+	for id := range c.segmentIDMap {
+		ids = append(ids, id)
+	}
+	log.Info("segment ids", zap.Int64s("ids", ids))
+
+	ids = []int64{}
+	for id := range c.partitionIDMap {
+		ids = append(ids, id)
+	}
+	log.Info("partition ids", zap.Int64s("ids", ids))
+
+	keys, values, err := c.etcdKV.LoadWithPrefix(triggerTaskPrefix)
+	if err != nil {
+		log.Fatal("failed to list the trigger task info", zap.Error(err))
+	}
+	c.extractTask(triggerTaskPrefix, keys, values)
+
+	keys, values, err = c.etcdKV.LoadWithPrefix(activeTaskPrefix)
+	if err != nil {
+		log.Fatal("failed to list the active task info", zap.Error(err))
+	}
+	c.extractTask(activeTaskPrefix, keys, values)
+
+	// log all tasks
+	if len(c.taskNameMap) > 0 {
+		log.Info("all tasks")
+		for taskID, taskName := range c.taskNameMap {
+			log.Info("task info", zap.String("name", taskName), zap.Int64("id", taskID))
+		}
+	}
+
+	// collect invalid tasks
+	invalidTasks := c.collectInvalidTask()
+	if len(invalidTasks) > 0 {
+		line()
+		fmt.Println("All invalid tasks")
+		for _, invalidTask := range invalidTasks {
+			line2()
+			fmt.Printf("TaskID: %d\t%s\n", invalidTask, c.taskNameMap[invalidTask])
+		}
+	}
+	c.handleUserEnter(invalidTasks)
+}
+
+func (c *mck) initParam() {
+	c.taskKeyMap = make(map[int64]string)
+	c.taskNameMap = make(map[int64]string)
+	c.allTaskInfo = make(map[string]string)
+	c.partitionIDToTasks = make(map[int64][]int64)
+	c.segmentIDToTasks = make(map[int64][]int64)
+	c.taskIDToInvalidPath = make(map[int64][]string)
+	c.segmentIDMap = make(map[int64]*datapb.SegmentInfo)
+	c.partitionIDMap = make(map[int64]struct{})
+}
+
+func (c *mck) formatFlags(args []string, flags *flag.FlagSet) {
+	flags.StringVar(&c.etcdIP, "etcdIp", "", "The etcd endpoint to connect to")
+	flags.StringVar(&c.etcdRootPath, "etcdRootPath", "", "The etcd root path")
+	flags.StringVar(&c.minioAddress, "minioAddress", "", "The minio endpoint to connect to")
+	flags.StringVar(&c.minioUsername, "minioUsername", "", "The minio username")
+	flags.StringVar(&c.minioPassword, "minioPassword", "", "The minio password")
+	flags.StringVar(&c.minioUseSSL, "minioUseSSL", "", "Whether to use SSL for minio ('true'/'false')")
+	flags.StringVar(&c.minioBucketName, "minioBucketName", "", "The minio bucket name")
+
+	if err := flags.Parse(os.Args[2:]); err != nil {
+		log.Fatal("failed to parse flags", zap.Error(err))
+	}
+	log.Info("args", zap.Strings("args", args))
+}
+
+func (c *mck) connectEtcd() {
+	c.params.Init()
+	var etcdCli *clientv3.Client
+	var err error
+	if c.etcdIP != "" {
+		etcdCli, err = etcd.GetRemoteEtcdClient([]string{c.etcdIP})
+	} else {
+		etcdCli, err = etcd.GetEtcdClient(&c.params.EtcdCfg)
+	}
+	if err != nil {
+		log.Fatal("failed to connect to etcd", zap.Error(err))
+	}
+
+	rootPath := getConfigValue(c.etcdRootPath, c.params.EtcdCfg.MetaRootPath, "etcd_root_path")
+	c.etcdKV = etcdkv.NewEtcdKV(etcdCli, rootPath)
+	log.Info("Etcd root path", zap.String("root_path", rootPath))
+}
+
+func (c *mck) connectMinio() {
+	useSSL := c.params.MinioCfg.UseSSL
+	if c.minioUseSSL == "true" || c.minioUseSSL == "false" {
+		minioUseSSL, err := strconv.ParseBool(c.minioUseSSL)
+		if err != nil {
+			log.Panic("failed to parse the 'minioUseSSL' string to a bool value", zap.String("minioUseSSL", c.minioUseSSL), zap.Error(err))
+		}
+		useSSL = minioUseSSL
+	}
+	chunkManagerFactory := 
storage.NewChunkManagerFactory("local", "minio",
+		storage.RootPath(c.params.LocalStorageCfg.Path),
+		storage.Address(getConfigValue(c.minioAddress, c.params.MinioCfg.Address, "minio_address")),
+		storage.AccessKeyID(getConfigValue(c.minioUsername, c.params.MinioCfg.AccessKeyID, "minio_username")),
+		storage.SecretAccessKeyID(getConfigValue(c.minioPassword, c.params.MinioCfg.SecretAccessKey, "minio_password")),
+		storage.UseSSL(useSSL),
+		storage.BucketName(getConfigValue(c.minioBucketName, c.params.MinioCfg.BucketName, "minio_bucket_name")),
+		storage.CreateBucket(true))
+
+	var err error
+	c.minioChunkManager, err = chunkManagerFactory.NewVectorStorageChunkManager(context.Background())
+	if err != nil {
+		log.Fatal("failed to connect to minio", zap.Error(err))
+	}
+}
+
+func getConfigValue(a string, b string, name string) string {
+	if a != "" {
+		return a
+	}
+	if b != "" {
+		return b
+	}
+	log.Panic(fmt.Sprintf("the config '%s' is empty", name))
+	return ""
+}
+
+func (c *mck) cleanTrash() {
+	keys, _, err := c.etcdKV.LoadWithPrefix(MckTrash)
+	if err != nil {
+		log.Error("failed to load backup info", zap.Error(err))
+		return
+	}
+	if len(keys) == 0 {
+		fmt.Println("Empty backup task info")
+		return
+	}
+	fmt.Println(strings.Join(keys, "\n"))
+	fmt.Print("Delete all backup infos, [Y/n]:")
+	deleteAll := ""
+	fmt.Scanln(&deleteAll)
+	if deleteAll == "Y" {
+		err = c.etcdKV.RemoveWithPrefix(MckTrash)
+		if err != nil {
+			log.Error("failed to remove backup infos", zap.String("key", MckTrash), zap.Error(err))
+			return
+		}
+	}
+}
+
+func (c *mck) collectInvalidTask() []int64 {
+	var invalidTasks []int64
+	invalidTasks = append(invalidTasks, c.collectInvalidTaskForPartition()...)
+	invalidTasks = append(invalidTasks, c.collectInvalidTaskForSegment()...)
+	invalidTasks = append(invalidTasks, c.collectInvalidTaskForMinioPath()...)
+	invalidTasks = removeRepeatElement(invalidTasks)
+	return invalidTasks
+}
+
+// collect invalid tasks related to invalid partitions
+func (c *mck) collectInvalidTaskForPartition() []int64 {
+	var invalidTasksOfPartition []int64
+	var buffer bytes.Buffer
+	for id, tasks := range c.partitionIDToTasks {
+		if _, ok := c.partitionIDMap[id]; !ok {
+			invalidTasksOfPartition = append(invalidTasksOfPartition, tasks...)
+			buffer.WriteString(fmt.Sprintf("Partition ID: %d\n", id))
+			buffer.WriteString(fmt.Sprintf("Tasks: %v\n", tasks))
+		}
+	}
+	invalidTasksOfPartition = removeRepeatElement(invalidTasksOfPartition)
+	if len(invalidTasksOfPartition) > 0 {
+		line()
+		fmt.Println("Invalid Partitions")
+		fmt.Println(buffer.String())
+	}
+	return invalidTasksOfPartition
+}
+
+// collect invalid tasks related to invalid segments
+func (c *mck) collectInvalidTaskForSegment() []int64 {
+	var invalidTasksOfSegment []int64
+	var buffer bytes.Buffer
+	if len(c.segmentIDToTasks) > 0 {
+		for id, tasks := range c.segmentIDToTasks {
+			if _, ok := c.segmentIDMap[id]; !ok {
+				invalidTasksOfSegment = append(invalidTasksOfSegment, tasks...)
+				buffer.WriteString(fmt.Sprintf("Segment ID: %d\n", id))
+				buffer.WriteString(fmt.Sprintf("Tasks: %v\n", tasks))
+			}
+		}
+		invalidTasksOfSegment = removeRepeatElement(invalidTasksOfSegment)
+	}
+	if len(invalidTasksOfSegment) > 0 {
+		line()
+		fmt.Println("Invalid Segments")
+		fmt.Println(buffer.String())
+	}
+	return invalidTasksOfSegment
+}
+
+// collect invalid tasks related to incorrect file paths in minio
+func (c *mck) collectInvalidTaskForMinioPath() []int64 {
+	var invalidTasksOfPath []int64
+	if len(c.taskIDToInvalidPath) > 0 {
+		line()
+		fmt.Println("Invalid Paths")
+		for id, paths := range c.taskIDToInvalidPath {
+			fmt.Printf("Task ID: %d\n", id)
+			for _, path := range paths {
+				fmt.Printf("\t%s\n", path)
+			}
+			invalidTasksOfPath = append(invalidTasksOfPath, id)
+		}
+		invalidTasksOfPath = removeRepeatElement(invalidTasksOfPath)
+	}
+	return invalidTasksOfPath
+}
+
+func redPrint(msg string) {
+	fmt.Printf("\033[0;31;40m%s\033[0m", msg)
+}
+
+func line() {
+	fmt.Println("================================================================================")
+}
+
+func line2() {
+	fmt.Println("--------------------------------------------------------------------------------")
+}
+
+func getTrashKey(taskType, key string) string {
+	return fmt.Sprintf("%s/%s/%s", MckTrash, taskType, key)
+}
+
+func (c *mck) extractTask(prefix string, keys []string, values []string) {
+
+	for i := range keys {
+		taskID, err := strconv.ParseInt(filepath.Base(keys[i]), 10, 64)
+		if err != nil {
+			log.Warn("failed to parse the task id", zap.String("key", filepath.Base(keys[i])), zap.Error(err))
+			continue
+		}
+
+		taskName, pids, sids, err := c.unmarshalTask(taskID, values[i])
+		if err != nil {
+			log.Warn("failed to unmarshal the task", zap.Int64("task_id", taskID), zap.Error(err))
+			continue
+		}
+		for _, pid := range pids {
+			c.partitionIDToTasks[pid] = append(c.partitionIDToTasks[pid], taskID)
+		}
+		for _, sid := range sids {
+			c.segmentIDToTasks[sid] = append(c.segmentIDToTasks[sid], taskID)
+		}
+
+		c.taskKeyMap[taskID] = fmt.Sprintf("%s/%d", prefix, taskID)
+		c.allTaskInfo[c.taskKeyMap[taskID]] = values[i]
+		c.taskNameMap[taskID] = taskName
+	}
+}
+
+func (c *mck) removeTask(invalidTask int64) bool {
+	taskType := c.taskNameMap[invalidTask]
+	key := c.taskKeyMap[invalidTask]
+	err := c.etcdKV.Save(getTrashKey(taskType, key), c.allTaskInfo[key])
+	if err != nil {
+		log.Warn("failed to back up the task", zap.String("key", getTrashKey(taskType, key)), zap.Int64("task_id", invalidTask), zap.Error(err))
+		return false
+	}
+	fmt.Printf("Backed up the task successfully, backup path: %s\n", getTrashKey(taskType, key))
+	err = c.etcdKV.Remove(key)
+	if err != nil {
+		log.Warn("failed to remove the task", zap.Int64("task_id", invalidTask), zap.Error(err))
+		return false
+	}
+
+	key = fmt.Sprintf("%s/%d", taskInfoPrefix, invalidTask)
+	taskInfo, err := c.etcdKV.Load(key)
+	if err != nil {
+		log.Warn("failed to load the task info", zap.Int64("task_id", invalidTask), zap.Error(err))
+		return false
+	}
+	err = c.etcdKV.Save(getTrashKey(taskType, key), taskInfo)
+	if err != nil {
+		log.Warn("failed to back up the task info", zap.Int64("task_id", invalidTask), zap.Error(err))
+		return false
+	}
+	fmt.Printf("Backed up the task info successfully, backup path: %s\n", getTrashKey(taskType, key))
+	err = c.etcdKV.Remove(key)
+	if err != nil {
+		log.Warn("failed to remove the task info", zap.Int64("task_id", invalidTask), zap.Error(err))
+	}
+	return true
+}
+
+func emptyInt64() []int64 {
+	return []int64{}
+}
+
+func errReturn(taskID int64, pbName string, err 
error) (string, []int64, []int64, error) { + return "", emptyInt64(), emptyInt64(), fmt.Errorf("task id: %d, failed to unmarshal %s, err %s ", taskID, pbName, err.Error()) +} + +func int64Map(ids []int64) map[int64]interface{} { + idMap := make(map[int64]interface{}) + for _, id := range ids { + idMap[id] = nil + } + return idMap +} + +func removeRepeatElement(ids []int64) []int64 { + idMap := int64Map(ids) + elements := make([]int64, 0, len(idMap)) + for k := range idMap { + elements = append(elements, k) + } + return elements +} + +func (c *mck) extractDataSegmentInfos(taskID int64, infos []*datapb.SegmentInfo) ([]int64, []int64) { + var partitionIDs []int64 + var segmentIDs []int64 + for _, info := range infos { + partitionIDs = append(partitionIDs, info.PartitionID) + segmentIDs = append(segmentIDs, info.ID) + c.extractFieldBinlog(taskID, info.Binlogs) + c.extractFieldBinlog(taskID, info.Statslogs) + c.extractFieldBinlog(taskID, info.Deltalogs) + } + + return partitionIDs, segmentIDs +} + +func (c *mck) extractQuerySegmentInfos(taskID int64, infos []*querypb.SegmentInfo) ([]int64, []int64) { + var partitionIDs []int64 + var segmentIDs []int64 + for _, info := range infos { + partitionIDs = append(partitionIDs, info.PartitionID) + segmentIDs = append(segmentIDs, info.SegmentID) + c.extractVecFieldIndexInfo(taskID, info.IndexInfos) + } + + return partitionIDs, segmentIDs +} + +func (c *mck) extractVchannelInfo(taskID int64, infos []*datapb.VchannelInfo) ([]int64, []int64) { + var partitionIDs []int64 + var segmentIDs []int64 + for _, info := range infos { + pids, sids := c.extractDataSegmentInfos(taskID, info.DroppedSegments) + partitionIDs = append(partitionIDs, pids...) + segmentIDs = append(segmentIDs, sids...) + + pids, sids = c.extractDataSegmentInfos(taskID, info.FlushedSegments) + partitionIDs = append(partitionIDs, pids...) + segmentIDs = append(segmentIDs, sids...) + + pids, sids = c.extractDataSegmentInfos(taskID, info.UnflushedSegments) + partitionIDs = append(partitionIDs, pids...) + segmentIDs = append(segmentIDs, sids...) 
+	}
+	return partitionIDs, segmentIDs
+}
+
+func (c *mck) extractFieldBinlog(taskID int64, fieldBinlogList []*datapb.FieldBinlog) {
+	for _, fieldBinlog := range fieldBinlogList {
+		for _, binlog := range fieldBinlog.Binlogs {
+			ok, _ := c.minioChunkManager.Exist(binlog.LogPath)
+			if !ok {
+				c.taskIDToInvalidPath[taskID] = append(c.taskIDToInvalidPath[taskID], binlog.LogPath)
+			}
+		}
+	}
+}
+
+func (c *mck) extractVecFieldIndexInfo(taskID int64, infos []*querypb.FieldIndexInfo) {
+	for _, info := range infos {
+		for _, indexPath := range info.IndexFilePaths {
+			ok, _ := c.minioChunkManager.Exist(indexPath)
+			if !ok {
+				c.taskIDToInvalidPath[taskID] = append(c.taskIDToInvalidPath[taskID], indexPath)
+			}
+		}
+	}
+}
+
+// returns the task name, partition IDs, segment IDs, and an error
+func (c *mck) unmarshalTask(taskID int64, t string) (string, []int64, []int64, error) {
+	header := commonpb.MsgHeader{}
+	err := proto.Unmarshal([]byte(t), &header)
+
+	if err != nil {
+		return errReturn(taskID, "MsgHeader", err)
+	}
+
+	switch header.Base.MsgType {
+	case commonpb.MsgType_LoadCollection:
+		loadReq := querypb.LoadCollectionRequest{}
+		err = proto.Unmarshal([]byte(t), &loadReq)
+		if err != nil {
+			return errReturn(taskID, "LoadCollectionRequest", err)
+		}
+		log.Info("LoadCollection", zap.String("detail", fmt.Sprintf("%+v", loadReq)))
+		return "LoadCollection", emptyInt64(), emptyInt64(), nil
+	case commonpb.MsgType_LoadPartitions:
+		loadReq := querypb.LoadPartitionsRequest{}
+		err = proto.Unmarshal([]byte(t), &loadReq)
+		if err != nil {
+			return errReturn(taskID, "LoadPartitionsRequest", err)
+		}
+		log.Info("LoadPartitions", zap.String("detail", fmt.Sprintf("%+v", loadReq)))
+		return "LoadPartitions", loadReq.PartitionIDs, emptyInt64(), nil
+	case commonpb.MsgType_ReleaseCollection:
+		loadReq := querypb.ReleaseCollectionRequest{}
+		err = proto.Unmarshal([]byte(t), &loadReq)
+		if err != nil {
+			return errReturn(taskID, "ReleaseCollectionRequest", err)
+		}
+		log.Info("ReleaseCollection", zap.String("detail", fmt.Sprintf("%+v", loadReq)))
+		return "ReleaseCollection", emptyInt64(), emptyInt64(), nil
+	case commonpb.MsgType_ReleasePartitions:
+		loadReq := querypb.ReleasePartitionsRequest{}
+		err = proto.Unmarshal([]byte(t), &loadReq)
+		if err != nil {
+			return errReturn(taskID, "ReleasePartitionsRequest", err)
+		}
+		log.Info("ReleasePartitions", zap.String("detail", fmt.Sprintf("%+v", loadReq)))
+		return "ReleasePartitions", loadReq.PartitionIDs, emptyInt64(), nil
+	case commonpb.MsgType_LoadSegments:
+		loadReq := querypb.LoadSegmentsRequest{}
+		err = proto.Unmarshal([]byte(t), &loadReq)
+		if err != nil {
+			return errReturn(taskID, "LoadSegmentsRequest", err)
+		}
+
+		fmt.Printf("LoadSegments, task-id: %d, %+v\n", taskID, loadReq)
+
+		var partitionIDs []int64
+		var segmentIDs []int64
+		if loadReq.LoadMeta != nil {
+			partitionIDs = append(partitionIDs, loadReq.LoadMeta.PartitionIDs...)
+		}
+		for _, info := range loadReq.Infos {
+			partitionIDs = append(partitionIDs, info.PartitionID)
+			segmentIDs = append(segmentIDs, info.SegmentID)
+			c.extractFieldBinlog(taskID, info.BinlogPaths)
+			c.extractFieldBinlog(taskID, info.Statslogs)
+			c.extractFieldBinlog(taskID, info.Deltalogs)
+			c.extractVecFieldIndexInfo(taskID, info.IndexInfos)
+		}
+		log.Info("LoadSegments", zap.String("detail", fmt.Sprintf("%+v", loadReq)))
+		return "LoadSegments", removeRepeatElement(partitionIDs), removeRepeatElement(segmentIDs), nil
+	case commonpb.MsgType_ReleaseSegments:
+		loadReq := querypb.ReleaseSegmentsRequest{}
+		err = proto.Unmarshal([]byte(t), &loadReq)
+		if err != nil {
+			return errReturn(taskID, "ReleaseSegmentsRequest", err)
+		}
+		log.Info("ReleaseSegments", zap.String("detail", fmt.Sprintf("%+v", loadReq)))
+		return "ReleaseSegments", loadReq.PartitionIDs, loadReq.SegmentIDs, nil
+	case commonpb.MsgType_WatchDmChannels:
+		loadReq := querypb.WatchDmChannelsRequest{}
+		err = proto.Unmarshal([]byte(t), &loadReq)
+		if err != nil {
+			return errReturn(taskID, "WatchDmChannelsRequest", err)
+		}
+
+		var partitionIDs []int64
+		var segmentIDs []int64
+		if loadReq.LoadMeta != nil {
+			partitionIDs = append(partitionIDs, loadReq.LoadMeta.PartitionIDs...)
+		}
+
+		pids, sids := c.extractVchannelInfo(taskID, loadReq.Infos)
+		partitionIDs = append(partitionIDs, pids...)
+		segmentIDs = append(segmentIDs, sids...)
+		pids, sids = c.extractDataSegmentInfos(taskID, loadReq.ExcludeInfos)
+		partitionIDs = append(partitionIDs, pids...)
+		segmentIDs = append(segmentIDs, sids...)
+		log.Info("WatchDmChannels", zap.String("detail", fmt.Sprintf("%+v", loadReq)))
+		return "WatchDmChannels", removeRepeatElement(partitionIDs), removeRepeatElement(segmentIDs), nil
+	case commonpb.MsgType_WatchDeltaChannels:
+		loadReq := querypb.WatchDeltaChannelsRequest{}
+		err = proto.Unmarshal([]byte(t), &loadReq)
+		if err != nil {
+			return errReturn(taskID, "WatchDeltaChannelsRequest", err)
+		}
+		var partitionIDs []int64
+		var segmentIDs []int64
+		if loadReq.LoadMeta != nil {
+			partitionIDs = append(partitionIDs, loadReq.LoadMeta.PartitionIDs...)
+		}
+		pids, sids := c.extractVchannelInfo(taskID, loadReq.Infos)
+		partitionIDs = append(partitionIDs, pids...)
+		segmentIDs = append(segmentIDs, sids...)
+		log.Info("WatchDeltaChannels", zap.String("detail", fmt.Sprintf("%+v", loadReq)))
+		return "WatchDeltaChannels", removeRepeatElement(partitionIDs), removeRepeatElement(segmentIDs), nil
+	case commonpb.MsgType_WatchQueryChannels:
+		log.Warn("legacy WatchQueryChannels type found, ignore")
+		return "WatchQueryChannels", emptyInt64(), emptyInt64(), nil
+	case commonpb.MsgType_LoadBalanceSegments:
+		loadReq := querypb.LoadBalanceRequest{}
+		err = proto.Unmarshal([]byte(t), &loadReq)
+		if err != nil {
+			return errReturn(taskID, "LoadBalanceRequest", err)
+		}
+		log.Info("LoadBalanceSegments", zap.String("detail", fmt.Sprintf("%+v", loadReq)))
+		return "LoadBalanceSegments", emptyInt64(), loadReq.SealedSegmentIDs, nil
+	case commonpb.MsgType_HandoffSegments:
+		handoffReq := querypb.HandoffSegmentsRequest{}
+		err = proto.Unmarshal([]byte(t), &handoffReq)
+		if err != nil {
+			return errReturn(taskID, "HandoffSegmentsRequest", err)
+		}
+		pids, sids := c.extractQuerySegmentInfos(taskID, handoffReq.SegmentInfos)
+		log.Info("HandoffSegments", zap.String("detail", fmt.Sprintf("%+v", handoffReq)))
+		return "HandoffSegments", pids, sids, nil
+	default:
+		err = errors.New("invalid msg type when unmarshaling the task")
+		log.Error("invalid task message type", zap.Int("type", int(header.Base.MsgType)), zap.Error(err))
+		return "", emptyInt64(), emptyInt64(), err
+	}
+}
+
+func (c *mck) handleUserEnter(invalidTasks []int64) {
+	if len(invalidTasks) > 0 {
+		fmt.Print("Delete all invalid tasks, [Y/n]:")
+		deleteAll := ""
+		fmt.Scanln(&deleteAll)
+		if deleteAll == "Y" {
+			for _, invalidTask := range invalidTasks {
+				if !c.removeTask(invalidTask) {
+					continue
+				}
+				fmt.Println("Delete task id: ", invalidTask)
+			}
+		} else {
+			deleteTask := ""
+			idMap := int64Map(invalidTasks)
+			fmt.Println("Enter 'Exit' to end")
+			for len(idMap) > 0 {
+				fmt.Print("Enter an invalid task id to delete, [Exit]:")
+				fmt.Scanln(&deleteTask)
+				if deleteTask == "Exit" {
+					return
+				}
+				invalidTask, err := strconv.ParseInt(deleteTask, 10, 64)
+				if err != nil {
+					fmt.Println("Invalid task id.")
+					continue
+				}
+				if _, ok := idMap[invalidTask]; !ok {
+					fmt.Println("This is not an invalid task id.")
+					continue
+				}
+				if !c.removeTask(invalidTask) {
+					continue
+				}
+				delete(idMap, invalidTask)
+				fmt.Println("Delete task id: ", invalidTask)
+			}
+		}
+	}
+}
diff --git a/cmd/milvus/milvus.go b/cmd/milvus/milvus.go
index 58d8677c6f..8d3cfa1d8d 100644
--- a/cmd/milvus/milvus.go
+++ b/cmd/milvus/milvus.go
@@ -3,33 +3,13 @@ package milvus
 import (
 	"flag"
 	"fmt"
-	"io"
-	"io/ioutil"
-	syslog "log"
 	"os"
-	"path"
-	"runtime"
-	"strings"
-	"syscall"
-
-	"github.com/gofrs/flock"
-	"github.com/milvus-io/milvus/cmd/roles"
-	"github.com/milvus-io/milvus/internal/util/paramtable"
-	"github.com/milvus-io/milvus/internal/util/typeutil"
-	"go.uber.org/automaxprocs/maxprocs"
-
-	// use auto max procs to set container CPU quota
-	"go.uber.org/zap"
-
-	"github.com/milvus-io/milvus/internal/log"
-	"github.com/milvus-io/milvus/internal/util/metricsinfo"
 )
 
 const (
-	roleMixture = "mixture"
+	DryRunCmd = "dry-run"
 )
 
-// inject variable at build-time
 var (
 	BuildTags = "unknown"
 	BuildTime = "unknown"
@@ -37,307 +17,45 @@ var (
 	GoVersion = "unknown"
 )
 
-func printBanner(w io.Writer) {
-	fmt.Fprintln(w)
-	fmt.Fprintln(w, " __ _________ _ ____ ______ ")
-	fmt.Fprintln(w, " / |/ / _/ /| | / / / / / __/ ")
-	fmt.Fprintln(w, " / /|_/ // // /_| |/ / /_/ /\\ \\ ")
-	fmt.Fprintln(w, " /_/ /_/___/____/___/\\____/___/ ")
-	fmt.Fprintln(w)
-	fmt.Fprintln(w, "Welcome to use Milvus!")
-	fmt.Fprintln(w, 
"Version: "+BuildTags) - fmt.Fprintln(w, "Built: "+BuildTime) - fmt.Fprintln(w, "GitCommit: "+GitCommit) - fmt.Fprintln(w, "GoVersion: "+GoVersion) - fmt.Fprintln(w) +type command interface { + execute(args []string, flags *flag.FlagSet) } -func injectVariablesToEnv() { - // inject in need +type dryRun struct{} - var err error +func (c *dryRun) execute(args []string, flags *flag.FlagSet) {} - err = os.Setenv(metricsinfo.GitCommitEnvKey, GitCommit) - if err != nil { - log.Warn(fmt.Sprintf("failed to inject %s to environment variable", metricsinfo.GitCommitEnvKey), - zap.Error(err)) - } +type defaultCommand struct{} - err = os.Setenv(metricsinfo.GitBuildTagsEnvKey, BuildTags) - if err != nil { - log.Warn(fmt.Sprintf("failed to inject %s to environment variable", metricsinfo.GitBuildTagsEnvKey), - zap.Error(err)) - } - - err = os.Setenv(metricsinfo.MilvusBuildTimeEnvKey, BuildTime) - if err != nil { - log.Warn(fmt.Sprintf("failed to inject %s to environment variable", metricsinfo.MilvusBuildTimeEnvKey), - zap.Error(err)) - } - - err = os.Setenv(metricsinfo.MilvusUsedGoVersion, GoVersion) - if err != nil { - log.Warn(fmt.Sprintf("failed to inject %s to environment variable", metricsinfo.MilvusUsedGoVersion), - zap.Error(err)) - } -} - -func getPidFileName(serverType string, alias string) string { - var filename string - if len(alias) != 0 { - filename = fmt.Sprintf("%s-%s.pid", serverType, alias) - } else { - filename = serverType + ".pid" - } - return filename -} - -func createPidFile(w io.Writer, filename string, runtimeDir string) (*flock.Flock, error) { - fileFullName := path.Join(runtimeDir, filename) - - fd, err := os.OpenFile(fileFullName, os.O_CREATE|os.O_RDWR, 0664) - if err != nil { - return nil, fmt.Errorf("file %s is locked, error = %w", filename, err) - } - fmt.Fprintln(w, "open pid file:", fileFullName) - - defer fd.Close() - - fd.Truncate(0) - _, err = fd.WriteString(fmt.Sprintf("%d", os.Getpid())) - if err != nil { - return nil, fmt.Errorf("file %s write fail, error = %w", filename, err) - } - - lock := flock.New(fileFullName) - _, err = lock.TryLock() - if err != nil { - return nil, fmt.Errorf("file %s is locked, error = %w", filename, err) - } - - fmt.Fprintln(w, "lock pid file:", fileFullName) - return lock, nil -} - -func closePidFile(fd *os.File) { - fd.Close() -} - -func removePidFile(lock *flock.Flock) { - filename := lock.Path() - lock.Close() - os.Remove(filename) -} - -func stopPid(filename string, runtimeDir string) error { - var pid int - - fd, err := os.OpenFile(path.Join(runtimeDir, filename), os.O_RDONLY, 0664) - if err != nil { - return err - } - defer closePidFile(fd) - - if _, err = fmt.Fscanf(fd, "%d", &pid); err != nil { - return err - } - - if process, err := os.FindProcess(pid); err == nil { - return process.Signal(syscall.SIGTERM) - } - return nil -} - -func makeRuntimeDir(dir string) error { - perm := os.FileMode(0755) - // os.MkdirAll equal to `mkdir -p` - err := os.MkdirAll(dir, perm) - if err != nil { - // err will be raised only when dir exists and dir is a file instead of a directory. - return fmt.Errorf("create runtime dir %s failed, err: %s", dir, err.Error()) - } - - tmpFile, err := ioutil.TempFile(dir, "tmp") - if err != nil { - return err - } - fileName := tmpFile.Name() - tmpFile.Close() - os.Remove(fileName) - return nil -} - -// simplified print from flag package -func printUsage(w io.Writer, f *flag.Flag) { - s := fmt.Sprintf(" -%s", f.Name) // Two spaces before -; see next two comments. 
- name, usage := flag.UnquoteUsage(f) - if len(name) > 0 { - s += " " + name - } - // Boolean flags of one ASCII letter are so common we - // treat them specially, putting their usage on the same line. - if len(s) <= 4 { // space, space, '-', 'x'. - s += "\t" - } else { - // Four spaces before the tab triggers good alignment - // for both 4- and 8-space tab stops. - s += "\n \t" - } - s += strings.ReplaceAll(usage, "\n", "\n \t") - - fmt.Fprint(w, s, "\n") -} - -// create runtime folder -func createRuntimeDir(sType string) string { - var writer io.Writer - if sType == typeutil.EmbeddedRole { - writer = io.Discard - } else { - writer = os.Stderr - } - runtimeDir := "/run/milvus" - if runtime.GOOS == "windows" { - runtimeDir = "run" - if err := makeRuntimeDir(runtimeDir); err != nil { - fmt.Fprintf(writer, "Create runtime directory at %s failed\n", runtimeDir) - os.Exit(-1) - } - } else { - if err := makeRuntimeDir(runtimeDir); err != nil { - fmt.Fprintf(writer, "Set runtime dir at %s failed, set it to /tmp/milvus directory\n", runtimeDir) - runtimeDir = "/tmp/milvus" - if err = makeRuntimeDir(runtimeDir); err != nil { - fmt.Fprintf(writer, "Create runtime directory at %s failed\n", runtimeDir) - os.Exit(-1) - } - } - } - return runtimeDir +func (c *defaultCommand) execute(args []string, flags *flag.FlagSet) { + fmt.Fprintf(os.Stderr, "unknown command : %s\n", args[1]) + fmt.Fprintln(os.Stdout, usageLine) } func RunMilvus(args []string) { - if len(args) < 3 { - _, _ = fmt.Fprint(os.Stderr, "usage: milvus [command] [server type] [flags]\n") + if len(args) < 2 { + fmt.Fprintln(os.Stderr, usageLine) return } - command := args[1] - serverType := args[2] + cmd := args[1] flags := flag.NewFlagSet(args[0], flag.ExitOnError) - - var svrAlias string - flags.StringVar(&svrAlias, "alias", "", "set alias") - - var enableRootCoord, enableQueryCoord, enableIndexCoord, enableDataCoord bool - flags.BoolVar(&enableRootCoord, typeutil.RootCoordRole, false, "enable root coordinator") - flags.BoolVar(&enableQueryCoord, typeutil.QueryCoordRole, false, "enable query coordinator") - flags.BoolVar(&enableIndexCoord, typeutil.IndexCoordRole, false, "enable index coordinator") - flags.BoolVar(&enableDataCoord, typeutil.DataCoordRole, false, "enable data coordinator") - - // Discard Milvus welcome logs, init logs and maxprocs logs in embedded Milvus. - if serverType == typeutil.EmbeddedRole { - flags.SetOutput(io.Discard) - // Initialize maxprocs while discarding log. - maxprocs.Set(maxprocs.Logger(nil)) - } else { - // Initialize maxprocs. 
- maxprocs.Set(maxprocs.Logger(syslog.Printf)) - } flags.Usage = func() { - fmt.Fprintf(flags.Output(), "Usage of %s:\n", args[0]) - switch { - case serverType == roleMixture: - flags.VisitAll(func(f *flag.Flag) { - printUsage(flags.Output(), f) - }) - default: - flags.VisitAll(func(f *flag.Flag) { - if f.Name != "alias" { - return - } - printUsage(flags.Output(), f) - }) - } + fmt.Fprintln(os.Stderr, usageLine) } - if err := flags.Parse(args[3:]); err != nil { - os.Exit(-1) - } - - var local = false - role := roles.MilvusRoles{} - switch serverType { - case typeutil.RootCoordRole: - role.EnableRootCoord = true - case typeutil.ProxyRole: - role.EnableProxy = true - case typeutil.QueryCoordRole: - role.EnableQueryCoord = true - case typeutil.QueryNodeRole: - role.EnableQueryNode = true - case typeutil.DataCoordRole: - role.EnableDataCoord = true - case typeutil.DataNodeRole: - role.EnableDataNode = true - case typeutil.IndexCoordRole: - role.EnableIndexCoord = true - case typeutil.IndexNodeRole: - role.EnableIndexNode = true - case typeutil.StandaloneRole, typeutil.EmbeddedRole: - role.HasMultipleRoles = true - role.EnableRootCoord = true - role.EnableProxy = true - role.EnableQueryCoord = true - role.EnableQueryNode = true - role.EnableDataCoord = true - role.EnableDataNode = true - role.EnableIndexCoord = true - role.EnableIndexNode = true - local = true - case roleMixture: - role.HasMultipleRoles = true - role.EnableRootCoord = enableRootCoord - role.EnableQueryCoord = enableQueryCoord - role.EnableDataCoord = enableDataCoord - role.EnableIndexCoord = enableIndexCoord + var c command + switch cmd { + case RunCmd: + c = &run{} + case StopCmd: + c = &stop{} + case DryRunCmd: + c = &dryRun{} + case MckCmd: + c = &mck{} default: - fmt.Fprintf(os.Stderr, "Unknown server type = %s\n", serverType) - os.Exit(-1) + c = &defaultCommand{} } - // Setup logger in advance for standalone and embedded Milvus. - // Any log from this point on is under control. - if serverType == typeutil.StandaloneRole || serverType == typeutil.EmbeddedRole { - var params paramtable.BaseTable - if serverType == typeutil.EmbeddedRole { - params.GlobalInitWithYaml("embedded-milvus.yaml") - } else { - params.Init() - } - params.SetLogConfig() - params.RoleName = serverType - params.SetLogger(0) - } - - runtimeDir := createRuntimeDir(serverType) - filename := getPidFileName(serverType, svrAlias) - switch command { - case "run": - printBanner(flags.Output()) - injectVariablesToEnv() - lock, err := createPidFile(flags.Output(), filename, runtimeDir) - if err != nil { - panic(err) - } - defer removePidFile(lock) - role.Run(local, svrAlias) - case "stop": - if err := stopPid(filename, runtimeDir); err != nil { - fmt.Fprintf(os.Stderr, "%s\n\n", err.Error()) - } - case "dry-run": - // A dry run does not actually bring up the Milvus instance. 
- default: - fmt.Fprintf(os.Stderr, "unknown command : %s\n", command) - } + c.execute(args, flags) } diff --git a/cmd/milvus/run.go b/cmd/milvus/run.go new file mode 100644 index 0000000000..b54545f63d --- /dev/null +++ b/cmd/milvus/run.go @@ -0,0 +1,171 @@ +package milvus + +import ( + "flag" + "fmt" + "io" + "os" + + "github.com/milvus-io/milvus/internal/util/metricsinfo" + "github.com/milvus-io/milvus/internal/util/paramtable" + "go.uber.org/zap" + + "github.com/milvus-io/milvus/internal/log" + + "github.com/milvus-io/milvus/cmd/roles" + "github.com/milvus-io/milvus/internal/util/typeutil" +) + +const ( + RunCmd = "run" + roleMixture = "mixture" +) + +type run struct { + serverType string + // flags + svrAlias string + enableRootCoord, enableQueryCoord, enableIndexCoord, enableDataCoord bool +} + +func (c *run) getHelp() string { + return runLine + "\n" + serverTypeLine +} + +func (c *run) execute(args []string, flags *flag.FlagSet) { + if len(args) < 3 { + fmt.Fprintln(os.Stderr, c.getHelp()) + return + } + flags.Usage = func() { + fmt.Fprintln(os.Stderr, c.getHelp()) + } + c.serverType = args[2] + c.formatFlags(args, flags) + + var local = false + role := roles.MilvusRoles{} + switch c.serverType { + case typeutil.RootCoordRole: + role.EnableRootCoord = true + case typeutil.ProxyRole: + role.EnableProxy = true + case typeutil.QueryCoordRole: + role.EnableQueryCoord = true + case typeutil.QueryNodeRole: + role.EnableQueryNode = true + case typeutil.DataCoordRole: + role.EnableDataCoord = true + case typeutil.DataNodeRole: + role.EnableDataNode = true + case typeutil.IndexCoordRole: + role.EnableIndexCoord = true + case typeutil.IndexNodeRole: + role.EnableIndexNode = true + case typeutil.StandaloneRole, typeutil.EmbeddedRole: + role.HasMultipleRoles = true + role.EnableRootCoord = true + role.EnableProxy = true + role.EnableQueryCoord = true + role.EnableQueryNode = true + role.EnableDataCoord = true + role.EnableDataNode = true + role.EnableIndexCoord = true + role.EnableIndexNode = true + local = true + case roleMixture: + role.HasMultipleRoles = true + role.EnableRootCoord = c.enableRootCoord + role.EnableQueryCoord = c.enableQueryCoord + role.EnableDataCoord = c.enableDataCoord + role.EnableIndexCoord = c.enableIndexCoord + default: + fmt.Fprintf(os.Stderr, "Unknown server type = %s\n%s", c.serverType, c.getHelp()) + os.Exit(-1) + } + + // Setup logger in advance for standalone and embedded Milvus. + // Any log from this point on is under control. 
+ if c.serverType == typeutil.StandaloneRole || c.serverType == typeutil.EmbeddedRole { + var params paramtable.BaseTable + if c.serverType == typeutil.EmbeddedRole { + params.GlobalInitWithYaml("embedded-milvus.yaml") + } else { + params.Init() + } + params.SetLogConfig() + params.RoleName = c.serverType + params.SetLogger(0) + } + + runtimeDir := createRuntimeDir(c.serverType) + filename := getPidFileName(c.serverType, c.svrAlias) + + c.printBanner(flags.Output()) + c.injectVariablesToEnv() + lock, err := createPidFile(flags.Output(), filename, runtimeDir) + if err != nil { + panic(err) + } + defer removePidFile(lock) + role.Run(local, c.svrAlias) +} + +func (c *run) formatFlags(args []string, flags *flag.FlagSet) { + flags.StringVar(&c.svrAlias, "alias", "", "set alias") + + flags.BoolVar(&c.enableRootCoord, typeutil.RootCoordRole, false, "enable root coordinator") + flags.BoolVar(&c.enableQueryCoord, typeutil.QueryCoordRole, false, "enable query coordinator") + flags.BoolVar(&c.enableIndexCoord, typeutil.IndexCoordRole, false, "enable index coordinator") + flags.BoolVar(&c.enableDataCoord, typeutil.DataCoordRole, false, "enable data coordinator") + + initMaxprocs(c.serverType, flags) + if err := flags.Parse(args[3:]); err != nil { + os.Exit(-1) + } +} + +func (c *run) printBanner(w io.Writer) { + fmt.Fprintln(w) + fmt.Fprintln(w, " __ _________ _ ____ ______ ") + fmt.Fprintln(w, " / |/ / _/ /| | / / / / / __/ ") + fmt.Fprintln(w, " / /|_/ // // /_| |/ / /_/ /\\ \\ ") + fmt.Fprintln(w, " /_/ /_/___/____/___/\\____/___/ ") + fmt.Fprintln(w) + fmt.Fprintln(w, "Welcome to use Milvus!") + fmt.Fprintln(w, "Version: "+BuildTags) + fmt.Fprintln(w, "Built: "+BuildTime) + fmt.Fprintln(w, "GitCommit: "+GitCommit) + fmt.Fprintln(w, "GoVersion: "+GoVersion) + fmt.Fprintln(w) +} + +func (c *run) injectVariablesToEnv() { + // inject in need + + var err error + + err = os.Setenv(metricsinfo.GitCommitEnvKey, GitCommit) + if err != nil { + log.Warn(fmt.Sprintf("failed to inject %s to environment variable", metricsinfo.GitCommitEnvKey), + zap.Error(err)) + } + + err = os.Setenv(metricsinfo.GitBuildTagsEnvKey, BuildTags) + if err != nil { + log.Warn(fmt.Sprintf("failed to inject %s to environment variable", metricsinfo.GitBuildTagsEnvKey), + zap.Error(err)) + } + + err = os.Setenv(metricsinfo.MilvusBuildTimeEnvKey, BuildTime) + if err != nil { + log.Warn(fmt.Sprintf("failed to inject %s to environment variable", metricsinfo.MilvusBuildTimeEnvKey), + zap.Error(err)) + } + + err = os.Setenv(metricsinfo.MilvusUsedGoVersion, GoVersion) + if err != nil { + log.Warn(fmt.Sprintf("failed to inject %s to environment variable", metricsinfo.MilvusUsedGoVersion), + zap.Error(err)) + } +} diff --git a/cmd/milvus/stop.go b/cmd/milvus/stop.go new file mode 100644 index 0000000000..e025b4a3cd --- /dev/null +++ b/cmd/milvus/stop.go @@ -0,0 +1,73 @@ +package milvus + +import ( + "flag" + "fmt" + "os" + "path" + "syscall" + + "github.com/milvus-io/milvus/internal/util/typeutil" +) + +const ( + StopCmd = "stop" +) + +type stop struct { + serverType string + svrAlias string +} + +func (c *stop) getHelp() string { + return stopLine + "\n" + serverTypeLine +} + +func (c *stop) execute(args []string, flags *flag.FlagSet) { + if len(args) < 3 { + fmt.Fprintln(os.Stderr, c.getHelp()) + return + } + flags.Usage = func() { + fmt.Fprintln(os.Stderr, c.getHelp()) + } + c.serverType = args[2] + if _, ok := typeutil.ServerTypeMap()[c.serverType]; !ok { + fmt.Fprintf(os.Stderr, "Unknown server type = %s\n", c.serverType) + os.Exit(-1) + 
} + c.formatFlags(args, flags) + + runtimeDir := createRuntimeDir(c.serverType) + filename := getPidFileName(c.serverType, c.svrAlias) + if err := c.stopPid(filename, runtimeDir); err != nil { + fmt.Fprintf(os.Stderr, "%s\n\n", err.Error()) + } +} + +func (c *stop) formatFlags(args []string, flags *flag.FlagSet) { + flags.StringVar(&(c.svrAlias), "alias", "", "set alias") + initMaxprocs(c.serverType, flags) + if err := flags.Parse(args[3:]); err != nil { + os.Exit(-1) + } +} + +func (c *stop) stopPid(filename string, runtimeDir string) error { + var pid int + + fd, err := os.OpenFile(path.Join(runtimeDir, filename), os.O_RDONLY, 0664) + if err != nil { + return err + } + defer closePidFile(fd) + + if _, err = fmt.Fscanf(fd, "%d", &pid); err != nil { + return err + } + + if process, err := os.FindProcess(pid); err == nil { + return process.Signal(syscall.SIGTERM) + } + return nil +} diff --git a/cmd/milvus/util.go b/cmd/milvus/util.go new file mode 100644 index 0000000000..a7a901e392 --- /dev/null +++ b/cmd/milvus/util.go @@ -0,0 +1,124 @@ +package milvus + +import ( + "flag" + "fmt" + "io" + "io/ioutil" + syslog "log" + "os" + "path" + "runtime" + + "github.com/gofrs/flock" + + "go.uber.org/automaxprocs/maxprocs" + + "github.com/milvus-io/milvus/internal/util/typeutil" +) + +func makeRuntimeDir(dir string) error { + perm := os.FileMode(0755) + // os.MkdirAll equal to `mkdir -p` + err := os.MkdirAll(dir, perm) + if err != nil { + // err will be raised only when dir exists and dir is a file instead of a directory. + return fmt.Errorf("create runtime dir %s failed, err: %s", dir, err.Error()) + } + + tmpFile, err := ioutil.TempFile(dir, "tmp") + if err != nil { + return err + } + fileName := tmpFile.Name() + tmpFile.Close() + os.Remove(fileName) + return nil +} + +// create runtime folder +func createRuntimeDir(sType string) string { + var writer io.Writer + if sType == typeutil.EmbeddedRole { + writer = io.Discard + } else { + writer = os.Stderr + } + runtimeDir := "/run/milvus" + if runtime.GOOS == "windows" { + runtimeDir = "run" + if err := makeRuntimeDir(runtimeDir); err != nil { + fmt.Fprintf(writer, "Create runtime directory at %s failed\n", runtimeDir) + os.Exit(-1) + } + } else { + if err := makeRuntimeDir(runtimeDir); err != nil { + fmt.Fprintf(writer, "Set runtime dir at %s failed, set it to /tmp/milvus directory\n", runtimeDir) + runtimeDir = "/tmp/milvus" + if err = makeRuntimeDir(runtimeDir); err != nil { + fmt.Fprintf(writer, "Create runtime directory at %s failed\n", runtimeDir) + os.Exit(-1) + } + } + } + return runtimeDir +} + +// Initialize maxprocs +func initMaxprocs(serverType string, flags *flag.FlagSet) { + if serverType == typeutil.EmbeddedRole { + flags.SetOutput(io.Discard) + // Initialize maxprocs while discarding log. + maxprocs.Set(maxprocs.Logger(nil)) + } else { + // Initialize maxprocs. 
+ maxprocs.Set(maxprocs.Logger(syslog.Printf)) + } +} + +func createPidFile(w io.Writer, filename string, runtimeDir string) (*flock.Flock, error) { + fileFullName := path.Join(runtimeDir, filename) + + fd, err := os.OpenFile(fileFullName, os.O_CREATE|os.O_RDWR, 0664) + if err != nil { + return nil, fmt.Errorf("file %s is locked, error = %w", filename, err) + } + fmt.Fprintln(w, "open pid file:", fileFullName) + + defer fd.Close() + + fd.Truncate(0) + _, err = fd.WriteString(fmt.Sprintf("%d", os.Getpid())) + if err != nil { + return nil, fmt.Errorf("file %s write fail, error = %w", filename, err) + } + + lock := flock.New(fileFullName) + _, err = lock.TryLock() + if err != nil { + return nil, fmt.Errorf("file %s is locked, error = %w", filename, err) + } + + fmt.Fprintln(w, "lock pid file:", fileFullName) + return lock, nil +} + +func getPidFileName(serverType string, alias string) string { + var filename string + if len(alias) != 0 { + filename = fmt.Sprintf("%s-%s.pid", serverType, alias) + } else { + filename = serverType + ".pid" + } + return filename +} + +func closePidFile(fd *os.File) { + fd.Close() +} + +func removePidFile(lock *flock.Flock) { + filename := lock.Path() + lock.Close() + os.Remove(filename) +} diff --git a/internal/util/typeutil/type.go b/internal/util/typeutil/type.go index 1c4348a543..2b6d9a5ee7 100644 --- a/internal/util/typeutil/type.go +++ b/internal/util/typeutil/type.go @@ -47,3 +47,27 @@ const ( // DataNodeRole is a constant represent DataNode DataNodeRole = "datanode" ) + +func ServerTypeMap() map[string]interface{} { + return map[string]interface{}{ + EmbeddedRole: nil, + StandaloneRole: nil, + RootCoordRole: nil, + ProxyRole: nil, + QueryCoordRole: nil, + QueryNodeRole: nil, + IndexCoordRole: nil, + IndexNodeRole: nil, + DataCoordRole: nil, + DataNodeRole: nil, + } +} + +func ServerTypeList() []string { + serverTypeMap := ServerTypeMap() + types := make([]string, 0, len(serverTypeMap)) + for key := range serverTypeMap { + types = append(types, key) + } + return types +}
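
Reviewer note: a usage sketch for the new subcommands (illustrative only; the
endpoint, root path, and server values below are placeholders, not defaults
introduced by this patch):

	# check task/segment/partition consistency against a running deployment
	milvus mck run -etcdIp=127.0.0.1:2379 -etcdRootPath=by-dev -minioAddress=127.0.0.1:9000

	# list and optionally delete the backups that mck wrote under the
	# 'mck-trash/<task type>/<original etcd key>' prefix (see getTrashKey)
	milvus mck cleanTrash -etcdIp=127.0.0.1:2379 -etcdRootPath=by-dev

	# start a mixture server with all four coordinators enabled
	milvus run mixture -rootcoord=true -querycoord=true -datacoord=true -indexcoord=true

	# stop a previously started standalone server
	milvus stop standalone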
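For context, the refactored cmd/milvus package is driven by a tiny main. A
minimal sketch of the entry point (this mirrors the existing cmd/main.go
pattern; it is shown only as an illustration and is not part of the patch):

	package main

	import (
		"os"

		"github.com/milvus-io/milvus/cmd/milvus"
	)

	func main() {
		// RunMilvus dispatches on os.Args[1] to the run, stop,
		// mck, or dry-run command implementations added above.
		milvus.RunMilvus(os.Args)
	}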