Add the consistency check tool (#17136)

Signed-off-by: SimFG <bang.fu@zilliz.com>
Author: SimFG, 2022-06-21 10:28:12 +08:00 (committed by GitHub)
Parent: 32a3ed0791
Commit: eb1f0bc805
7 changed files with 1191 additions and 307 deletions

cmd/milvus/help.go (new file)

@@ -0,0 +1,66 @@
package milvus
import (
"fmt"
"strings"
"github.com/milvus-io/milvus/internal/util/typeutil"
)
var (
usageLine = fmt.Sprintf("Usage:\n"+
"%s\n%s\n%s\n%s\n", runLine, stopLine, mckLine, serverTypeLine)
serverTypeLine = `
[server type]
` + strings.Join(typeutil.ServerTypeList(), "\n\t") + `
mixture
`
runLine = `
milvus run [server type] [flags]
Start a Milvus Server.
Tips: The flags below take effect only when the server type is 'mixture'.
[flags]
-rootcoord 'true'
Start the rootcoord server.
-querycoord 'true'
Start the querycoord server.
-indexcoord 'true'
Start the indexcoord server.
-datacoord 'true'
Start the datacoord server.
-alias ''
Set alias
`
stopLine = `
milvus stop [server type] [flags]
Stop a Milvus Server.
[flags]
-alias ''
Set alias
`
mckLine = `
milvus mck run [flags]
Milvus data consistency check.
Tips: The flags are optional.
[flags]
-etcdIp ''
The IP address used to connect to the etcd server.
-etcdRootPath ''
The etcd root path under which the data is stored.
-minioAddress ''
The address used to connect to the minio server.
-minioUsername ''
The username used to log in to the minio server.
-minioPassword ''
The password used to log in to the minio server.
-minioUseSSL 'false'
Whether to use SSL when connecting to the minio server.
-minioBucketName ''
The bucket in which the data is stored.
milvus mck cleanTrash [flags]
Clean up the backed-up inconsistent data
Tips: The flags are the same as those of 'milvus mck run [flags]'
`
)
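For reference, typical invocations of the new tool (the endpoint values are made-up examples) look like:

milvus mck run -etcdIp 127.0.0.1:2379 -minioAddress 127.0.0.1:9000
milvus mck cleanTrash -etcdIp 127.0.0.1:2379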

cmd/milvus/mck.go (new file)

@@ -0,0 +1,708 @@
package milvus
import (
"bytes"
"context"
"errors"
"flag"
"fmt"
"os"
"path/filepath"
"strconv"
"strings"
"time"
"github.com/milvus-io/milvus/internal/storage"
clientv3 "go.etcd.io/etcd/client/v3"
"github.com/milvus-io/milvus/internal/util/logutil"
"github.com/milvus-io/milvus/internal/util/paramtable"
pb "github.com/milvus-io/milvus/internal/proto/etcdpb"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/golang/protobuf/proto"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/util/etcd"
"go.uber.org/zap"
)
const (
MckCmd = "mck"
MckTypeRun = "run"
MckTypeClean = "cleanTrash"
segmentPrefix = "datacoord-meta/s"
collectionPrefix = "snapshots/root-coord/collection"
triggerTaskPrefix = "queryCoord-triggerTask"
activeTaskPrefix = "queryCoord-activeTask"
taskInfoPrefix = "queryCoord-taskInfo"
MckTrash = "mck-trash"
)
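// Illustrative note (not part of the original commit): the prefixes above are
// the etcd namespaces the tool scans; a concrete key appends the id, so a
// made-up trigger task 430 lives at "queryCoord-triggerTask/430" and, once
// backed up, at "mck-trash/LoadSegments/queryCoord-triggerTask/430" (see
// getTrashKey below).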
type mck struct {
params paramtable.GrpcServerConfig
taskKeyMap map[int64]string
taskNameMap map[int64]string
allTaskInfo map[string]string
partitionIDToTasks map[int64][]int64
segmentIDToTasks map[int64][]int64
taskIDToInvalidPath map[int64][]string
segmentIDMap map[int64]*datapb.SegmentInfo
partitionIDMap map[int64]struct{}
etcdKV *etcdkv.EtcdKV
minioChunkManager storage.ChunkManager
etcdIP string
etcdRootPath string
minioAddress string
minioUsername string
minioPassword string
minioUseSSL string
minioBucketName string
flagStartIndex int
}
func (c *mck) execute(args []string, flags *flag.FlagSet) {
if len(args) < 3 {
fmt.Fprintln(os.Stderr, mckLine)
return
}
c.initParam()
flags.Usage = func() {
fmt.Fprintln(os.Stderr, mckLine)
}
logutil.SetupLogger(&log.Config{
Level: "info",
File: log.FileLogConfig{
Filename: fmt.Sprintf("mck-%s.log", time.Now().Format("20060102150405.99")),
},
})
mckType := args[2]
// The documented forms are `milvus mck run [flags]` and
// `milvus mck cleanTrash [flags]`, so the flags start at args[3].
c.flagStartIndex = 3
c.formatFlags(args, flags)
c.connectEtcd()
switch mckType {
case MckTypeRun:
c.run()
case MckTypeClean:
c.cleanTrash()
return
default:
fmt.Fprintln(os.Stderr, mckLine)
return
}
}
func (c *mck) run() {
c.connectMinio()
_, values, err := c.etcdKV.LoadWithPrefix(segmentPrefix)
if err != nil {
log.Fatal("failed to list the segment info", zap.String("key", segmentPrefix), zap.Error(err))
}
for _, value := range values {
info := &datapb.SegmentInfo{}
err = proto.Unmarshal([]byte(value), info)
if err != nil {
log.Warn("fail to unmarshal the segment info", zap.Error(err))
continue
}
c.segmentIDMap[info.ID] = info
}
_, values, err = c.etcdKV.LoadWithPrefix(collectionPrefix)
if err != nil {
log.Fatal("failed to list the collection info", zap.String("key", collectionPrefix), zap.Error(err))
}
for _, value := range values {
collInfo := pb.CollectionInfo{}
err = proto.Unmarshal([]byte(value), &collInfo)
if err != nil {
log.Warn("fail to unmarshal the collection info", zap.Error(err))
continue
}
for _, id := range collInfo.PartitionIDs {
c.partitionIDMap[id] = struct{}{}
}
}
// log Segment ids and partition ids
var ids []int64
for id := range c.segmentIDMap {
ids = append(ids, id)
}
log.Info("segment ids", zap.Int64s("ids", ids))
ids = []int64{}
for id := range c.partitionIDMap {
ids = append(ids, id)
}
log.Info("partition ids", zap.Int64s("ids", ids))
keys, values, err := c.etcdKV.LoadWithPrefix(triggerTaskPrefix)
if err != nil {
log.Fatal("failed to list the trigger task info", zap.Error(err))
}
c.extractTask(triggerTaskPrefix, keys, values)
keys, values, err = c.etcdKV.LoadWithPrefix(activeTaskPrefix)
if err != nil {
log.Fatal("failed to list the active task info", zap.Error(err))
}
c.extractTask(activeTaskPrefix, keys, values)
// log all tasks
if len(c.taskNameMap) > 0 {
log.Info("all tasks")
for taskID, taskName := range c.taskNameMap {
log.Info("task info", zap.String("name", taskName), zap.Int64("id", taskID))
}
}
// collect invalid tasks
invalidTasks := c.collectInvalidTask()
if len(invalidTasks) > 0 {
line()
fmt.Println("All invalid tasks")
for _, invalidTask := range invalidTasks {
line2()
fmt.Printf("TaskID: %d\t%s\n", invalidTask, c.taskNameMap[invalidTask])
}
}
c.handleUserEnter(invalidTasks)
}
func (c *mck) initParam() {
c.taskKeyMap = make(map[int64]string)
c.taskNameMap = make(map[int64]string)
c.allTaskInfo = make(map[string]string)
c.partitionIDToTasks = make(map[int64][]int64)
c.segmentIDToTasks = make(map[int64][]int64)
c.taskIDToInvalidPath = make(map[int64][]string)
c.segmentIDMap = make(map[int64]*datapb.SegmentInfo)
c.partitionIDMap = make(map[int64]struct{})
}
func (c *mck) formatFlags(args []string, flags *flag.FlagSet) {
flags.StringVar(&c.etcdIP, "etcdIp", "", "Etcd endpoint to connect")
flags.StringVar(&c.etcdRootPath, "etcdRootPath", "", "Etcd root path")
flags.StringVar(&c.minioAddress, "minioAddress", "", "Minio endpoint to connect")
flags.StringVar(&c.minioUsername, "minioUsername", "", "Minio username")
flags.StringVar(&c.minioPassword, "minioPassword", "", "Minio password")
flags.StringVar(&c.minioUseSSL, "minioUseSSL", "", "Minio to use ssl")
flags.StringVar(&c.minioBucketName, "minioBucketName", "", "Minio bucket name")
if err := flags.Parse(args[c.flagStartIndex:]); err != nil {
log.Fatal("failed to parse flags", zap.Error(err))
}
log.Info("args", zap.Strings("args", args))
}
func (c *mck) connectEtcd() {
c.params.Init()
var etcdCli *clientv3.Client
var err error
if c.etcdIP != "" {
etcdCli, err = etcd.GetRemoteEtcdClient([]string{c.etcdIP})
} else {
etcdCli, err = etcd.GetEtcdClient(&c.params.EtcdCfg)
}
if err != nil {
log.Fatal("failed to connect to etcd", zap.Error(err))
}
rootPath := getConfigValue(c.etcdRootPath, c.params.EtcdCfg.MetaRootPath, "etcd_root_path")
c.etcdKV = etcdkv.NewEtcdKV(etcdCli, rootPath)
log.Info("Etcd root path", zap.String("root_path", rootPath))
}
func (c *mck) connectMinio() {
useSSL := c.params.MinioCfg.UseSSL
if c.minioUseSSL == "true" || c.minioUseSSL == "false" {
minioUseSSL, err := strconv.ParseBool(c.minioUseSSL)
if err != nil {
log.Panic("fail to parse the 'minioUseSSL' string to the bool value", zap.String("minioUseSSL", c.minioUseSSL), zap.Error(err))
}
useSSL = minioUseSSL
}
chunkManagerFactory := storage.NewChunkManagerFactory("local", "minio",
storage.RootPath(c.params.LocalStorageCfg.Path),
storage.Address(getConfigValue(c.minioAddress, c.params.MinioCfg.Address, "minio_address")),
storage.AccessKeyID(getConfigValue(c.minioUsername, c.params.MinioCfg.AccessKeyID, "minio_username")),
storage.SecretAccessKeyID(getConfigValue(c.minioPassword, c.params.MinioCfg.SecretAccessKey, "minio_password")),
storage.UseSSL(useSSL),
storage.BucketName(getConfigValue(c.minioBucketName, c.params.MinioCfg.BucketName, "minio_bucket_name")),
storage.CreateBucket(true))
var err error
c.minioChunkManager, err = chunkManagerFactory.NewVectorStorageChunkManager(context.Background())
if err != nil {
log.Fatal("failed to connect to etcd", zap.Error(err))
}
}
func getConfigValue(a string, b string, name string) string {
if a != "" {
return a
}
if b != "" {
return b
}
log.Panic(fmt.Sprintf("the config '%s' is empty", name))
return ""
}
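// For example, getConfigValue(c.minioAddress, c.params.MinioCfg.Address,
// "minio_address") prefers the command-line flag, falls back to the loaded
// config, and panics only when both are empty.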
func (c *mck) cleanTrash() {
keys, _, err := c.etcdKV.LoadWithPrefix(MckTrash)
if err != nil {
log.Error("failed to load backup info", zap.Error(err))
return
}
if len(keys) == 0 {
fmt.Println("Empty backup task info")
return
}
fmt.Println(strings.Join(keys, "\n"))
fmt.Print("Delete all backup infos, [Y/n]:")
deleteAll := ""
fmt.Scanln(&deleteAll)
if deleteAll == "Y" {
err = c.etcdKV.RemoveWithPrefix(MckTrash)
if err != nil {
log.Error("failed to remove backup infos", zap.String("key", MckTrash), zap.Error(err))
return
}
}
}
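// Note: only an exact "Y" confirms the deletion above; any other input,
// including a lowercase "y", leaves the backups in place.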
func (c *mck) collectInvalidTask() []int64 {
var invalidTasks []int64
invalidTasks = append(invalidTasks, c.collectInvalidTaskForPartition()...)
invalidTasks = append(invalidTasks, c.collectInvalidTaskForSegment()...)
invalidTasks = append(invalidTasks, c.collectInvalidTaskForMinioPath()...)
invalidTasks = removeRepeatElement(invalidTasks)
return invalidTasks
}
// collect invalid tasks related to invalid partitions
func (c *mck) collectInvalidTaskForPartition() []int64 {
var invalidTasksOfPartition []int64
var buffer bytes.Buffer
for id, tasks := range c.partitionIDToTasks {
if _, ok := c.partitionIDMap[id]; !ok {
invalidTasksOfPartition = append(invalidTasksOfPartition, tasks...)
buffer.WriteString(fmt.Sprintf("Partition ID: %d\n", id))
buffer.WriteString(fmt.Sprintf("Tasks: %v\n", tasks))
}
}
invalidTasksOfPartition = removeRepeatElement(invalidTasksOfPartition)
if len(invalidTasksOfPartition) > 0 {
line()
fmt.Println("Invalid Partitions")
fmt.Println(buffer.String())
}
return invalidTasksOfPartition
}
// collect invalid tasks related to invalid segments
func (c *mck) collectInvalidTaskForSegment() []int64 {
var invalidTasksOfSegment []int64
var buffer bytes.Buffer
if len(c.segmentIDToTasks) > 0 {
for id, tasks := range c.segmentIDToTasks {
if _, ok := c.segmentIDMap[id]; !ok {
invalidTasksOfSegment = append(invalidTasksOfSegment, tasks...)
buffer.WriteString(fmt.Sprintf("Segment ID: %d\n", id))
buffer.WriteString(fmt.Sprintf("Tasks: %v\n", tasks))
}
}
invalidTasksOfSegment = removeRepeatElement(invalidTasksOfSegment)
}
if len(invalidTasksOfSegment) > 0 {
line()
fmt.Println("Invalid Segments")
fmt.Println(buffer.String())
}
return invalidTasksOfSegment
}
// collect invalid tasks related to incorrect file paths in minio
func (c *mck) collectInvalidTaskForMinioPath() []int64 {
var invalidTasksOfPath []int64
if len(c.taskIDToInvalidPath) > 0 {
line()
fmt.Println("Invalid Paths")
for id, paths := range c.taskIDToInvalidPath {
fmt.Printf("Task ID: %d\n", id)
for _, path := range paths {
fmt.Printf("\t%s\n", path)
}
invalidTasksOfPath = append(invalidTasksOfPath, id)
}
invalidTasksOfPath = removeRepeatElement(invalidTasksOfPath)
}
return invalidTasksOfPath
}
func redPrint(msg string) {
fmt.Printf("\033[0;31;40m%s\033[0m", msg)
}
func line() {
fmt.Println("================================================================================")
}
func line2() {
fmt.Println("--------------------------------------------------------------------------------")
}
func getTrashKey(taskType, key string) string {
return fmt.Sprintf("%s/%s/%s", MckTrash, taskType, key)
}
func (c *mck) extractTask(prefix string, keys []string, values []string) {
for i := range keys {
taskID, err := strconv.ParseInt(filepath.Base(keys[i]), 10, 64)
if err != nil {
log.Warn("failed to parse int", zap.String("key", filepath.Base(keys[i])), zap.String("tasks", filepath.Base(keys[i])))
continue
}
taskName, pids, sids, err := c.unmarshalTask(taskID, values[i])
if err != nil {
log.Warn("failed to unmarshal task", zap.Int64("task_id", taskID))
continue
}
for _, pid := range pids {
c.partitionIDToTasks[pid] = append(c.partitionIDToTasks[pid], taskID)
}
for _, sid := range sids {
c.segmentIDToTasks[sid] = append(c.segmentIDToTasks[sid], taskID)
}
c.taskKeyMap[taskID] = fmt.Sprintf("%s/%d", prefix, taskID)
c.allTaskInfo[c.taskKeyMap[taskID]] = values[i]
c.taskNameMap[taskID] = taskName
}
}
func (c *mck) removeTask(invalidTask int64) bool {
taskType := c.taskNameMap[invalidTask]
key := c.taskKeyMap[invalidTask]
err := c.etcdKV.Save(getTrashKey(taskType, key), c.allTaskInfo[key])
if err != nil {
log.Warn("failed to backup task", zap.String("key", getTrashKey(taskType, key)), zap.Int64("task_id", invalidTask), zap.Error(err))
return false
}
fmt.Printf("Back up task successfully, back path: %s\n", getTrashKey(taskType, key))
err = c.etcdKV.Remove(key)
if err != nil {
log.Warn("failed to remove task", zap.Int64("task_id", invalidTask), zap.Error(err))
return false
}
key = fmt.Sprintf("%s/%d", taskInfoPrefix, invalidTask)
taskInfo, err := c.etcdKV.Load(key)
if err != nil {
log.Warn("failed to load task info", zap.Int64("task_id", invalidTask), zap.Error(err))
return false
}
err = c.etcdKV.Save(getTrashKey(taskType, key), taskInfo)
if err != nil {
log.Warn("failed to backup task info", zap.Int64("task_id", invalidTask), zap.Error(err))
return false
}
fmt.Printf("Back up task info successfully, back path: %s\n", getTrashKey(taskType, key))
err = c.etcdKV.Remove(key)
if err != nil {
log.Warn("failed to remove task info", zap.Int64("task_id", invalidTask), zap.Error(err))
}
return true
}
func emptyInt64() []int64 {
return []int64{}
}
func errReturn(taskID int64, pbName string, err error) (string, []int64, []int64, error) {
return "", emptyInt64(), emptyInt64(), fmt.Errorf("task id: %d, failed to unmarshal %s, err %s ", taskID, pbName, err.Error())
}
func int64Map(ids []int64) map[int64]interface{} {
idMap := make(map[int64]interface{})
for _, id := range ids {
idMap[id] = nil
}
return idMap
}
func removeRepeatElement(ids []int64) []int64 {
idMap := int64Map(ids)
elements := make([]int64, 0, len(idMap))
for k := range idMap {
elements = append(elements, k)
}
return elements
}
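// Behavior sketch (not part of the original commit): removeRepeatElement
// deduplicates through a map, so the output order is not stable across runs;
// removeRepeatElement([]int64{7, 7, 3}) yields {3, 7} in either order. The
// callers here only log or iterate the ids, so ordering does not matter.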
func (c *mck) extractDataSegmentInfos(taskID int64, infos []*datapb.SegmentInfo) ([]int64, []int64) {
var partitionIDs []int64
var segmentIDs []int64
for _, info := range infos {
partitionIDs = append(partitionIDs, info.PartitionID)
segmentIDs = append(segmentIDs, info.ID)
c.extractFieldBinlog(taskID, info.Binlogs)
c.extractFieldBinlog(taskID, info.Statslogs)
c.extractFieldBinlog(taskID, info.Deltalogs)
}
return partitionIDs, segmentIDs
}
func (c *mck) extractQuerySegmentInfos(taskID int64, infos []*querypb.SegmentInfo) ([]int64, []int64) {
var partitionIDs []int64
var segmentIDs []int64
for _, info := range infos {
partitionIDs = append(partitionIDs, info.PartitionID)
segmentIDs = append(segmentIDs, info.SegmentID)
c.extractVecFieldIndexInfo(taskID, info.IndexInfos)
}
return partitionIDs, segmentIDs
}
func (c *mck) extractVchannelInfo(taskID int64, infos []*datapb.VchannelInfo) ([]int64, []int64) {
var partitionIDs []int64
var segmentIDs []int64
for _, info := range infos {
pids, sids := c.extractDataSegmentInfos(taskID, info.DroppedSegments)
partitionIDs = append(partitionIDs, pids...)
segmentIDs = append(segmentIDs, sids...)
pids, sids = c.extractDataSegmentInfos(taskID, info.FlushedSegments)
partitionIDs = append(partitionIDs, pids...)
segmentIDs = append(segmentIDs, sids...)
pids, sids = c.extractDataSegmentInfos(taskID, info.UnflushedSegments)
partitionIDs = append(partitionIDs, pids...)
segmentIDs = append(segmentIDs, sids...)
}
return partitionIDs, segmentIDs
}
func (c *mck) extractFieldBinlog(taskID int64, fieldBinlogList []*datapb.FieldBinlog) {
for _, fieldBinlog := range fieldBinlogList {
for _, binlog := range fieldBinlog.Binlogs {
ok, _ := c.minioChunkManager.Exist(binlog.LogPath)
if !ok {
c.taskIDToInvalidPath[taskID] = append(c.taskIDToInvalidPath[taskID], binlog.LogPath)
}
}
}
}
func (c *mck) extractVecFieldIndexInfo(taskID int64, infos []*querypb.FieldIndexInfo) {
for _, info := range infos {
for _, indexPath := range info.IndexFilePaths {
ok, _ := c.minioChunkManager.Exist(indexPath)
if !ok {
c.taskIDToInvalidPath[taskID] = append(c.taskIDToInvalidPath[taskID], indexPath)
}
}
}
}
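// Note: both extract helpers above discard the error returned by Exist, so a
// transient minio failure is recorded the same way as a genuinely missing
// path and surfaces as an "invalid path" for the task.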
// returns the task name, partition ids, segment ids, and an error
func (c *mck) unmarshalTask(taskID int64, t string) (string, []int64, []int64, error) {
header := commonpb.MsgHeader{}
err := proto.Unmarshal([]byte(t), &header)
if err != nil {
return errReturn(taskID, "MsgHeader", err)
}
switch header.Base.MsgType {
case commonpb.MsgType_LoadCollection:
loadReq := querypb.LoadCollectionRequest{}
err = proto.Unmarshal([]byte(t), &loadReq)
if err != nil {
return errReturn(taskID, "LoadCollectionRequest", err)
}
log.Info("LoadCollection", zap.String("detail", fmt.Sprintf("+%v", loadReq)))
return "LoadCollection", emptyInt64(), emptyInt64(), nil
case commonpb.MsgType_LoadPartitions:
loadReq := querypb.LoadPartitionsRequest{}
err = proto.Unmarshal([]byte(t), &loadReq)
if err != nil {
return errReturn(taskID, "LoadPartitionsRequest", err)
}
log.Info("LoadPartitions", zap.String("detail", fmt.Sprintf("+%v", loadReq)))
return "LoadPartitions", loadReq.PartitionIDs, emptyInt64(), nil
case commonpb.MsgType_ReleaseCollection:
loadReq := querypb.ReleaseCollectionRequest{}
err = proto.Unmarshal([]byte(t), &loadReq)
if err != nil {
return errReturn(taskID, "ReleaseCollectionRequest", err)
}
log.Info("ReleaseCollection", zap.String("detail", fmt.Sprintf("+%v", loadReq)))
return "ReleaseCollection", emptyInt64(), emptyInt64(), nil
case commonpb.MsgType_ReleasePartitions:
loadReq := querypb.ReleasePartitionsRequest{}
err = proto.Unmarshal([]byte(t), &loadReq)
if err != nil {
return errReturn(taskID, "ReleasePartitionsRequest", err)
}
log.Info("ReleasePartitions", zap.String("detail", fmt.Sprintf("+%v", loadReq)))
return "ReleasePartitions", loadReq.PartitionIDs, emptyInt64(), nil
case commonpb.MsgType_LoadSegments:
loadReq := querypb.LoadSegmentsRequest{}
err = proto.Unmarshal([]byte(t), &loadReq)
if err != nil {
return errReturn(taskID, "LoadSegmentsRequest", err)
}
fmt.Printf("LoadSegments, task-id: %d, %+v\n", taskID, loadReq)
var partitionIDs []int64
var segmentIDs []int64
if loadReq.LoadMeta != nil {
partitionIDs = append(partitionIDs, loadReq.LoadMeta.PartitionIDs...)
}
for _, info := range loadReq.Infos {
partitionIDs = append(partitionIDs, info.PartitionID)
segmentIDs = append(segmentIDs, info.SegmentID)
c.extractFieldBinlog(taskID, info.BinlogPaths)
c.extractFieldBinlog(taskID, info.Statslogs)
c.extractFieldBinlog(taskID, info.Deltalogs)
c.extractVecFieldIndexInfo(taskID, info.IndexInfos)
}
log.Info("LoadSegments", zap.String("detail", fmt.Sprintf("+%v", loadReq)))
return "LoadSegments", removeRepeatElement(partitionIDs), removeRepeatElement(segmentIDs), nil
case commonpb.MsgType_ReleaseSegments:
loadReq := querypb.ReleaseSegmentsRequest{}
err = proto.Unmarshal([]byte(t), &loadReq)
if err != nil {
return errReturn(taskID, "ReleaseSegmentsRequest", err)
}
log.Info("ReleaseSegments", zap.String("detail", fmt.Sprintf("+%v", loadReq)))
return "ReleaseSegments", loadReq.PartitionIDs, loadReq.SegmentIDs, nil
case commonpb.MsgType_WatchDmChannels:
loadReq := querypb.WatchDmChannelsRequest{}
err = proto.Unmarshal([]byte(t), &loadReq)
if err != nil {
return errReturn(taskID, "WatchDmChannelsRequest", err)
}
var partitionIDs []int64
var segmentIDs []int64
if loadReq.LoadMeta != nil {
partitionIDs = append(partitionIDs, loadReq.LoadMeta.PartitionIDs...)
}
pids, sids := c.extractVchannelInfo(taskID, loadReq.Infos)
partitionIDs = append(partitionIDs, pids...)
segmentIDs = append(segmentIDs, sids...)
pids, sids = c.extractDataSegmentInfos(taskID, loadReq.ExcludeInfos)
partitionIDs = append(partitionIDs, pids...)
segmentIDs = append(segmentIDs, sids...)
log.Info("WatchDmChannels", zap.String("detail", fmt.Sprintf("+%v", loadReq)))
return "WatchDmChannels", removeRepeatElement(partitionIDs), removeRepeatElement(segmentIDs), nil
case commonpb.MsgType_WatchDeltaChannels:
loadReq := querypb.WatchDeltaChannelsRequest{}
err = proto.Unmarshal([]byte(t), &loadReq)
if err != nil {
return errReturn(taskID, "WatchDeltaChannelsRequest", err)
}
var partitionIDs []int64
var segmentIDs []int64
if loadReq.LoadMeta != nil {
partitionIDs = append(partitionIDs, loadReq.LoadMeta.PartitionIDs...)
}
pids, sids := c.extractVchannelInfo(taskID, loadReq.Infos)
partitionIDs = append(partitionIDs, pids...)
segmentIDs = append(segmentIDs, sids...)
log.Info("WatchDeltaChannels", zap.String("detail", fmt.Sprintf("+%v", loadReq)))
return "WatchDeltaChannels", removeRepeatElement(partitionIDs), removeRepeatElement(segmentIDs), nil
case commonpb.MsgType_WatchQueryChannels:
log.Warn("legacy WatchQueryChannels type found, ignore")
return "WatchQueryChannels", emptyInt64(), emptyInt64(), nil
case commonpb.MsgType_LoadBalanceSegments:
loadReq := querypb.LoadBalanceRequest{}
err = proto.Unmarshal([]byte(t), &loadReq)
if err != nil {
return errReturn(taskID, "LoadBalanceRequest", err)
}
log.Info("LoadBalanceSegments", zap.String("detail", fmt.Sprintf("+%v", loadReq)))
return "LoadBalanceSegments", emptyInt64(), loadReq.SealedSegmentIDs, nil
case commonpb.MsgType_HandoffSegments:
handoffReq := querypb.HandoffSegmentsRequest{}
err = proto.Unmarshal([]byte(t), &handoffReq)
if err != nil {
return errReturn(taskID, "HandoffSegmentsRequest", err)
}
pids, sids := c.extractQuerySegmentInfos(taskID, handoffReq.SegmentInfos)
log.Info("HandoffSegments", zap.String("detail", fmt.Sprintf("+%v", handoffReq)))
return "HandoffSegments", pids, sids, nil
default:
err = errors.New("invalid msg type when unmarshaling the task")
log.Error("invalid message type", zap.Int("type", int(header.Base.MsgType)), zap.Error(err))
return "", emptyInt64(), emptyInt64(), err
}
}
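// Note on the decoding pattern above (an observation, not part of the
// commit): each stored value is first unmarshaled into commonpb.MsgHeader,
// which shares the leading `base` field with every request type, to discover
// Base.MsgType; the same bytes are then unmarshaled a second time into the
// matching concrete request. The first, partial decode works because protobuf
// ignores fields it does not know about.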
func (c *mck) handleUserEnter(invalidTasks []int64) {
if len(invalidTasks) > 0 {
fmt.Print("Delete all invalid tasks, [Y/n]:")
deleteAll := ""
fmt.Scanln(&deleteAll)
if deleteAll == "Y" {
for _, invalidTask := range invalidTasks {
if !c.removeTask(invalidTask) {
continue
}
fmt.Println("Delete task id: ", invalidTask)
}
} else {
deleteTask := ""
idMap := int64Map(invalidTasks)
fmt.Println("Enter 'Exit' to end")
for len(idMap) > 0 {
fmt.Print("Enter a invalid task id to delete, [Exit]:")
fmt.Scanln(&deleteTask)
if deleteTask == "Exit" {
return
}
invalidTask, err := strconv.ParseInt(deleteTask, 10, 64)
if err != nil {
fmt.Println("Invalid task id.")
continue
}
if _, ok := idMap[invalidTask]; !ok {
fmt.Println("This is not a invalid task id.")
continue
}
if !c.removeTask(invalidTask) {
continue
}
delete(idMap, invalidTask)
fmt.Println("Delete task id: ", invalidTask)
}
}
}
}

(modified file)

@@ -3,33 +3,13 @@ package milvus
import (
	"flag"
	"fmt"
	"os"
)
(The remaining imports — io, io/ioutil, syslog (log), path, runtime, strings, syscall, gofrs/flock, cmd/roles, paramtable, typeutil, automaxprocs, zap, internal/log, and metricsinfo — moved to the new files below.)
const (
	DryRunCmd = "dry-run"
)
(roleMixture moved to cmd/milvus/run.go.)
// inject variable at build-time
var (
	BuildTags = "unknown"
	BuildTime = "unknown"
@@ -37,307 +17,45 @@ var (
	GoVersion = "unknown"
)
(printBanner and injectVariablesToEnv moved to cmd/milvus/run.go; stopPid moved to cmd/milvus/stop.go; getPidFileName, createPidFile, closePidFile, removePidFile, makeRuntimeDir, and createRuntimeDir moved to cmd/milvus/util.go; the flag-usage printer printUsage was removed outright. In their place, every subcommand now implements a small command interface:)

type command interface {
	execute(args []string, flags *flag.FlagSet)
}

type dryRun struct{}

func (c *dryRun) execute(args []string, flags *flag.FlagSet) {}

type defaultCommand struct{}

func (c *defaultCommand) execute(args []string, flags *flag.FlagSet) {
	fmt.Fprintf(os.Stderr, "unknown command : %s\n", args[1])
	fmt.Fprintln(os.Stdout, usageLine)
}
(RunMilvus previously parsed flags, selected roles, set up logging, and managed pid files inline; all of that moved into the run and stop commands below, leaving only the dispatch:)

func RunMilvus(args []string) {
	if len(args) < 2 {
		fmt.Fprintln(os.Stderr, usageLine)
		return
	}
	cmd := args[1]
	flags := flag.NewFlagSet(args[0], flag.ExitOnError)

	var c command
	switch cmd {
	case RunCmd:
		c = &run{}
	case StopCmd:
		c = &stop{}
	case DryRunCmd:
		c = &dryRun{}
	case MckCmd:
		c = &mck{}
	default:
		c = &defaultCommand{}
	}
	c.execute(args, flags)
}
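With this structure, adding a subcommand later is just another command implementation plus a case in the switch above. A hypothetical sketch (no version command exists in this commit):

type version struct{}

func (c *version) execute(args []string, flags *flag.FlagSet) {
	fmt.Fprintf(os.Stdout, "Milvus %s (built %s, commit %s)\n", BuildTags, BuildTime, GitCommit)
}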

cmd/milvus/run.go (new file)

@@ -0,0 +1,171 @@
package milvus
import (
"flag"
"fmt"
"io"
"os"
"github.com/milvus-io/milvus/internal/util/metricsinfo"
"github.com/milvus-io/milvus/internal/util/paramtable"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/cmd/roles"
"github.com/milvus-io/milvus/internal/util/typeutil"
)
const (
RunCmd = "run"
roleMixture = "mixture"
)
type run struct {
serverType string
// flags
svrAlias string
enableRootCoord, enableQueryCoord, enableIndexCoord, enableDataCoord bool
}
func (c *run) getHelp() string {
return runLine + "\n" + serverTypeLine
}
func (c *run) execute(args []string, flags *flag.FlagSet) {
if len(args) < 3 {
fmt.Fprintln(os.Stderr, c.getHelp())
return
}
flags.Usage = func() {
fmt.Fprintln(os.Stderr, c.getHelp())
}
c.serverType = args[2]
c.formatFlags(args, flags)
var local = false
role := roles.MilvusRoles{}
switch c.serverType {
case typeutil.RootCoordRole:
role.EnableRootCoord = true
case typeutil.ProxyRole:
role.EnableProxy = true
case typeutil.QueryCoordRole:
role.EnableQueryCoord = true
case typeutil.QueryNodeRole:
role.EnableQueryNode = true
case typeutil.DataCoordRole:
role.EnableDataCoord = true
case typeutil.DataNodeRole:
role.EnableDataNode = true
case typeutil.IndexCoordRole:
role.EnableIndexCoord = true
case typeutil.IndexNodeRole:
role.EnableIndexNode = true
case typeutil.StandaloneRole, typeutil.EmbeddedRole:
role.HasMultipleRoles = true
role.EnableRootCoord = true
role.EnableProxy = true
role.EnableQueryCoord = true
role.EnableQueryNode = true
role.EnableDataCoord = true
role.EnableDataNode = true
role.EnableIndexCoord = true
role.EnableIndexNode = true
local = true
case roleMixture:
role.HasMultipleRoles = true
role.EnableRootCoord = c.enableRootCoord
role.EnableQueryCoord = c.enableQueryCoord
role.EnableDataCoord = c.enableDataCoord
role.EnableIndexCoord = c.enableIndexCoord
default:
fmt.Fprintf(os.Stderr, "Unknown server type = %s\n%s", c.serverType, c.getHelp())
os.Exit(-1)
}
// Setup logger in advance for standalone and embedded Milvus.
// Any log from this point on is under control.
if c.serverType == typeutil.StandaloneRole || c.serverType == typeutil.EmbeddedRole {
var params paramtable.BaseTable
if c.serverType == typeutil.EmbeddedRole {
params.GlobalInitWithYaml("embedded-milvus.yaml")
} else {
params.Init()
}
params.SetLogConfig()
params.RoleName = c.serverType
params.SetLogger(0)
}
runtimeDir := createRuntimeDir(c.serverType)
filename := getPidFileName(c.serverType, c.svrAlias)
c.printBanner(flags.Output())
c.injectVariablesToEnv()
lock, err := createPidFile(flags.Output(), filename, runtimeDir)
if err != nil {
panic(err)
}
defer removePidFile(lock)
role.Run(local, c.svrAlias)
}
func (c *run) formatFlags(args []string, flags *flag.FlagSet) {
flags.StringVar(&c.svrAlias, "alias", "", "set alias")
flags.BoolVar(&c.enableRootCoord, typeutil.RootCoordRole, false, "enable root coordinator")
flags.BoolVar(&c.enableQueryCoord, typeutil.QueryCoordRole, false, "enable query coordinator")
flags.BoolVar(&c.enableIndexCoord, typeutil.IndexCoordRole, false, "enable index coordinator")
flags.BoolVar(&c.enableDataCoord, typeutil.DataCoordRole, false, "enable data coordinator")
initMaxprocs(c.serverType, flags)
if err := flags.Parse(args[3:]); err != nil {
os.Exit(-1)
}
}
func (c *run) printBanner(w io.Writer) {
fmt.Fprintln(w)
fmt.Fprintln(w, " __ _________ _ ____ ______ ")
fmt.Fprintln(w, " / |/ / _/ /| | / / / / / __/ ")
fmt.Fprintln(w, " / /|_/ // // /_| |/ / /_/ /\\ \\ ")
fmt.Fprintln(w, " /_/ /_/___/____/___/\\____/___/ ")
fmt.Fprintln(w)
fmt.Fprintln(w, "Welcome to use Milvus!")
fmt.Fprintln(w, "Version: "+BuildTags)
fmt.Fprintln(w, "Built: "+BuildTime)
fmt.Fprintln(w, "GitCommit: "+GitCommit)
fmt.Fprintln(w, "GoVersion: "+GoVersion)
fmt.Fprintln(w)
}
func (c *run) injectVariablesToEnv() {
// inject build-time variables into the environment as needed
var err error
err = os.Setenv(metricsinfo.GitCommitEnvKey, GitCommit)
if err != nil {
log.Warn(fmt.Sprintf("failed to inject %s to environment variable", metricsinfo.GitCommitEnvKey),
zap.Error(err))
}
err = os.Setenv(metricsinfo.GitBuildTagsEnvKey, BuildTags)
if err != nil {
log.Warn(fmt.Sprintf("failed to inject %s to environment variable", metricsinfo.GitBuildTagsEnvKey),
zap.Error(err))
}
err = os.Setenv(metricsinfo.MilvusBuildTimeEnvKey, BuildTime)
if err != nil {
log.Warn(fmt.Sprintf("failed to inject %s to environment variable", metricsinfo.MilvusBuildTimeEnvKey),
zap.Error(err))
}
err = os.Setenv(metricsinfo.MilvusUsedGoVersion, GoVersion)
if err != nil {
log.Warn(fmt.Sprintf("failed to inject %s to environment variable", metricsinfo.MilvusUsedGoVersion),
zap.Error(err))
}
}

cmd/milvus/stop.go (new file)

@@ -0,0 +1,73 @@
package milvus
import (
"flag"
"fmt"
"os"
"path"
"syscall"
"github.com/milvus-io/milvus/internal/util/typeutil"
)
const (
StopCmd = "stop"
)
type stop struct {
serverType string
svrAlias string
}
func (c *stop) getHelp() string {
return stopLine + "\n" + serverTypeLine
}
func (c *stop) execute(args []string, flags *flag.FlagSet) {
if len(args) < 3 {
fmt.Fprintln(os.Stderr, c.getHelp())
return
}
flags.Usage = func() {
fmt.Fprintln(os.Stderr, c.getHelp())
}
c.serverType = args[2]
if _, ok := typeutil.ServerTypeMap()[c.serverType]; !ok {
fmt.Fprintf(os.Stderr, "Unknown server type = %s\n", c.serverType)
os.Exit(-1)
}
c.formatFlags(args, flags)
runtimeDir := createRuntimeDir(c.serverType)
filename := getPidFileName(c.serverType, c.svrAlias)
if err := c.stopPid(filename, runtimeDir); err != nil {
fmt.Fprintf(os.Stderr, "%s\n\n", err.Error())
}
}
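// Note: ServerTypeMap has no "mixture" entry, so `milvus stop mixture` is
// rejected here even though `milvus run mixture` is accepted.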
func (c *stop) formatFlags(args []string, flags *flag.FlagSet) {
flags.StringVar(&(c.svrAlias), "alias", "", "set alias")
initMaxprocs(c.serverType, flags)
if err := flags.Parse(args[3:]); err != nil {
os.Exit(-1)
}
}
func (c *stop) stopPid(filename string, runtimeDir string) error {
var pid int
fd, err := os.OpenFile(path.Join(runtimeDir, filename), os.O_RDONLY, 0664)
if err != nil {
return err
}
defer closePidFile(fd)
if _, err = fmt.Fscanf(fd, "%d", &pid); err != nil {
return err
}
if process, err := os.FindProcess(pid); err == nil {
return process.Signal(syscall.SIGTERM)
}
return nil
}
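// Note: Signal(syscall.SIGTERM) is not supported on Windows and returns an
// error there, which execute prints to stderr; the pid-file based stop is
// effectively POSIX-only.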

cmd/milvus/util.go (new file)

@@ -0,0 +1,124 @@
package milvus
import (
"flag"
"fmt"
"io"
"io/ioutil"
syslog "log"
"os"
"path"
"runtime"
"github.com/gofrs/flock"
"go.uber.org/automaxprocs/maxprocs"
"github.com/milvus-io/milvus/internal/util/typeutil"
)
func makeRuntimeDir(dir string) error {
perm := os.FileMode(0755)
// os.MkdirAll is equivalent to `mkdir -p`
err := os.MkdirAll(dir, perm)
if err != nil {
// err will be raised only when dir exists and dir is a file instead of a directory.
return fmt.Errorf("create runtime dir %s failed, err: %s", dir, err.Error())
}
tmpFile, err := ioutil.TempFile(dir, "tmp")
if err != nil {
return err
}
fileName := tmpFile.Name()
tmpFile.Close()
os.Remove(fileName)
return nil
}
// create runtime folder
func createRuntimeDir(sType string) string {
var writer io.Writer
if sType == typeutil.EmbeddedRole {
writer = io.Discard
} else {
writer = os.Stderr
}
runtimeDir := "/run/milvus"
if runtime.GOOS == "windows" {
runtimeDir = "run"
if err := makeRuntimeDir(runtimeDir); err != nil {
fmt.Fprintf(writer, "Create runtime directory at %s failed\n", runtimeDir)
os.Exit(-1)
}
} else {
if err := makeRuntimeDir(runtimeDir); err != nil {
fmt.Fprintf(writer, "Set runtime dir at %s failed, set it to /tmp/milvus directory\n", runtimeDir)
runtimeDir = "/tmp/milvus"
if err = makeRuntimeDir(runtimeDir); err != nil {
fmt.Fprintf(writer, "Create runtime directory at %s failed\n", runtimeDir)
os.Exit(-1)
}
}
}
return runtimeDir
}
// Initialize maxprocs
func initMaxprocs(serverType string, flags *flag.FlagSet) {
if serverType == typeutil.EmbeddedRole {
flags.SetOutput(io.Discard)
// Initialize maxprocs while discarding log.
maxprocs.Set(maxprocs.Logger(nil))
} else {
// Initialize maxprocs.
maxprocs.Set(maxprocs.Logger(syslog.Printf))
}
}
func createPidFile(w io.Writer, filename string, runtimeDir string) (*flock.Flock, error) {
fileFullName := path.Join(runtimeDir, filename)
fd, err := os.OpenFile(fileFullName, os.O_CREATE|os.O_RDWR, 0664)
if err != nil {
return nil, fmt.Errorf("file %s is locked, error = %w", filename, err)
}
fmt.Fprintln(w, "open pid file:", fileFullName)
defer fd.Close()
fd.Truncate(0)
_, err = fd.WriteString(fmt.Sprintf("%d", os.Getpid()))
if err != nil {
return nil, fmt.Errorf("file %s write fail, error = %w", filename, err)
}
lock := flock.New(fileFullName)
locked, err := lock.TryLock()
if err != nil {
	return nil, fmt.Errorf("try to lock file %s failed, error = %w", filename, err)
}
if !locked {
	return nil, fmt.Errorf("file %s is locked by another process", filename)
}
fmt.Fprintln(w, "lock pid file:", fileFullName)
return lock, nil
}
func getPidFileName(serverType string, alias string) string {
var filename string
if len(alias) != 0 {
filename = fmt.Sprintf("%s-%s.pid", serverType, alias)
} else {
filename = serverType + ".pid"
}
return filename
}
func closePidFile(fd *os.File) {
fd.Close()
}
func removePidFile(lock *flock.Flock) {
filename := lock.Path()
lock.Close()
os.Remove(filename)
}

(modified file)

@@ -47,3 +47,27 @@ const (
// DataNodeRole is a constant representing DataNode
DataNodeRole = "datanode"
)
func ServerTypeMap() map[string]interface{} {
return map[string]interface{}{
EmbeddedRole: nil,
StandaloneRole: nil,
RootCoordRole: nil,
ProxyRole: nil,
QueryCoordRole: nil,
QueryNodeRole: nil,
IndexCoordRole: nil,
IndexNodeRole: nil,
DataCoordRole: nil,
DataNodeRole: nil,
}
}
func ServerTypeList() []string {
serverTypeMap := ServerTypeMap()
types := make([]string, 0, len(serverTypeMap))
for key := range serverTypeMap {
types = append(types, key)
}
return types
}
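One caveat (an observation, not a change in this commit): ServerTypeList is built by iterating a map, so its order differs from run to run, and the [server type] list that serverTypeLine prints in cmd/milvus/help.go is therefore unordered. A sorted variant, sketched below, would make the help output stable (requires importing "sort"):

func SortedServerTypeList() []string {
	types := ServerTypeList()
	sort.Strings(types)
	return types
}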