add models

peng.xu 2019-09-18 15:43:42 +08:00
parent deb4a5fb62
commit 099317edee
10 changed files with 144 additions and 26 deletions

manager.py (new file)

@@ -0,0 +1,14 @@
+import fire
+from mishards import db
+
+class DBHandler:
+    @classmethod
+    def create_all(cls):
+        db.create_all()
+
+    @classmethod
+    def drop_all(cls):
+        db.drop_all()
+
+if __name__ == '__main__':
+    fire.Fire(DBHandler)
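
For context: python-fire exposes each classmethod of DBHandler as a CLI subcommand, so the schema can be managed from the shell. A minimal usage sketch, assuming the mishards package is importable and SQLALCHEMY_DATABASE_URI is exported:

    $ python manager.py create_all    # runs DBHandler.create_all() -> db.create_all()
    $ python manager.py drop_all      # runs DBHandler.drop_all()  -> db.drop_all()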

mishards/__init__.py

@@ -1,8 +1,13 @@
-import settings
-from connections import ConnectionMgr
+from mishards import settings
+from mishards.db_base import DB
+
+db = DB()
+db.init_db(uri=settings.SQLALCHEMY_DATABASE_URI)
+
+from mishards.connections import ConnectionMgr
 connect_mgr = ConnectionMgr()
-from service_founder import ServiceFounder
+from mishards.service_founder import ServiceFounder
 discover = ServiceFounder(namespace=settings.SD_NAMESPACE,
                           conn_mgr=connect_mgr,
                           pod_patt=settings.SD_ROSERVER_POD_PATT,
@@ -10,5 +15,5 @@ discover = ServiceFounder(namespace=settings.SD_NAMESPACE,
                           in_cluster=settings.SD_IN_CLUSTER,
                           poll_interval=settings.SD_POLL_INTERVAL)
-from server import Server
+from mishards.server import Server
 grpc_server = Server(conn_mgr=connect_mgr)
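
For context: because this module body runs at import time, any consumer that imports mishards (manager.py above, models.py below) receives an already-initialized db and connect_mgr. A minimal sketch of the resulting import contract, illustrative only:

    from mishards import db, connect_mgr   # package import runs db.init_db(settings.SQLALCHEMY_DATABASE_URI)
    from mishards.models import Tables     # safe afterwards: db.Model is already bound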

mishards/connections.py

@@ -4,9 +4,8 @@ from functools import wraps
 from contextlib import contextmanager
 from milvus import Milvus
-import settings
-import exceptions
-from utils import singleton
+from mishards import (settings, exceptions)
+from mishards.utils import singleton
 logger = logging.getLogger(__name__)

mishards/db_base.py (new file)

@@ -0,0 +1,27 @@
+from sqlalchemy import create_engine
+from sqlalchemy.ext.declarative import declarative_base
+from sqlalchemy.orm import sessionmaker, scoped_session
+
+class DB:
+    Model = declarative_base()
+
+    def __init__(self, uri=None):
+        uri and self.init_db(uri)
+
+    def init_db(self, uri):
+        self.engine = create_engine(uri, pool_size=100, pool_recycle=5, pool_timeout=30,
+                                    pool_pre_ping=True,
+                                    max_overflow=0)
+        self.uri = uri
+        session = sessionmaker()
+        session.configure(bind=self.engine)
+        self.db_session = session()
+
+    @property
+    def Session(self):
+        return self.db_session
+
+    def drop_all(self):
+        self.Model.metadata.drop_all(self.engine)
+    def create_all(self):
+        self.Model.metadata.create_all(self.engine)
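
For context, a sketch of how this wrapper is meant to be used. The MySQL URI is an illustrative assumption, and note that the hard-coded pool_timeout/max_overflow arguments are QueuePool-specific, so a URI whose dialect defaults to another pool class (SQLite, for instance) would likely be rejected by create_engine:

    from sqlalchemy import Column, BigInteger
    from mishards.db_base import DB

    db = DB(uri='mysql+pymysql://root:root@127.0.0.1/milvus')  # assumed URI

    class Demo(db.Model):          # every model subclasses DB.Model, a single
        __tablename__ = 'Demo'     # declarative_base() built at class-definition time
        id = Column(BigInteger, primary_key=True)

    db.create_all()                # emits CREATE TABLE for all registered models
    session = db.Session           # hands back the one shared session

One design caveat worth noting: despite the scoped_session import, init_db produces a single sessionmaker() session, so every caller shares one session rather than getting a thread-local one.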

mishards/exceptions.py

@@ -1,4 +1,4 @@
-import exception_codes as codes
+import mishards.exception_codes as codes
 
 class BaseException(Exception):
     code = codes.INVALID_CODE

mishards/main.py

@@ -1,21 +1,17 @@
-import sys
-import os
+import os, sys
 sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
-import settings
-from mishards import (connect_mgr,
+from mishards import (
+    settings,
+    db, connect_mgr,
     discover,
     grpc_server as server)
 
 def main():
-    try:
-        discover.start()
-        connect_mgr.register('WOSERVER', settings.WOSERVER if not settings.TESTING else settings.TESTING_WOSERVER)
-        server.run(port=settings.SERVER_PORT)
-        return 0
-    except Exception as e:
-        logger.error(e)
-        return 1
+    discover.start()
+    connect_mgr.register('WOSERVER', settings.WOSERVER if not settings.TESTING else settings.TESTING_WOSERVER)
+    server.run(port=settings.SERVER_PORT)
+    return 0
 
 if __name__ == '__main__':
     sys.exit(main())
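
The removed handler is worth a note: it referenced a logger that is not defined anywhere in the visible file, so any startup failure would have raised a NameError inside the except block. A hedged sketch of an equivalent guarded entrypoint, with the logger actually defined:

    import logging
    logger = logging.getLogger(__name__)

    def main():
        try:
            discover.start()
            connect_mgr.register('WOSERVER', settings.WOSERVER if not settings.TESTING else settings.TESTING_WOSERVER)
            server.run(port=settings.SERVER_PORT)
            return 0
        except Exception as e:
            logger.exception(e)
            return 1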

mishards/models.py (new file)

@@ -0,0 +1,75 @@
+import logging
+
+from sqlalchemy import (Integer, Boolean, Text,
+                        String, BigInteger, func, and_, or_,
+                        Column)
+from sqlalchemy.orm import relationship, backref
+
+from mishards import db
+
+logger = logging.getLogger(__name__)
+
+
+class TableFiles(db.Model):
+    FILE_TYPE_NEW = 0
+    FILE_TYPE_RAW = 1
+    FILE_TYPE_TO_INDEX = 2
+    FILE_TYPE_INDEX = 3
+    FILE_TYPE_TO_DELETE = 4
+    FILE_TYPE_NEW_MERGE = 5
+    FILE_TYPE_NEW_INDEX = 6
+    FILE_TYPE_BACKUP = 7
+
+    __tablename__ = 'TableFiles'
+
+    id = Column(BigInteger, primary_key=True, autoincrement=True)
+    table_id = Column(String(50))
+    engine_type = Column(Integer)
+    file_id = Column(String(50))
+    file_type = Column(Integer)
+    file_size = Column(Integer, default=0)
+    row_count = Column(Integer, default=0)
+    updated_time = Column(BigInteger)
+    created_on = Column(BigInteger)
+    date = Column(Integer)
+
+    table = relationship(
+        'Tables',
+        primaryjoin='and_(foreign(TableFiles.table_id) == Tables.table_id)',
+        backref=backref('files', uselist=True, lazy='dynamic')
+    )
+
+
+class Tables(db.Model):
+    TO_DELETE = 1
+    NORMAL = 0
+
+    __tablename__ = 'Tables'
+
+    id = Column(BigInteger, primary_key=True, autoincrement=True)
+    table_id = Column(String(50), unique=True)
+    state = Column(Integer)
+    dimension = Column(Integer)
+    created_on = Column(Integer)
+    flag = Column(Integer, default=0)
+    index_file_size = Column(Integer)
+    engine_type = Column(Integer)
+    nlist = Column(Integer)
+    metric_type = Column(Integer)
+
+    def files_to_search(self, date_range=None):
+        cond = or_(
+            TableFiles.file_type == TableFiles.FILE_TYPE_RAW,
+            TableFiles.file_type == TableFiles.FILE_TYPE_TO_INDEX,
+            TableFiles.file_type == TableFiles.FILE_TYPE_INDEX,
+        )
+        if date_range:
+            cond = and_(
+                cond,
+                or_(*[and_(TableFiles.date >= d[0], TableFiles.date < d[1])
+                      for d in date_range])
+            )
+        files = self.files.filter(cond)
+        logger.debug('DATE_RANGE: {}'.format(date_range))
+        return files
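
A usage sketch for files_to_search; the table_id value and the integer date encoding (e.g. 20190918) are illustrative assumptions:

    from mishards import db
    from mishards.models import Tables

    table = db.Session.query(Tables).filter_by(table_id='demo_table').first()
    all_files = table.files_to_search()                      # every RAW / TO_INDEX / INDEX file
    recent = table.files_to_search(date_range=[(20190901, 20190910),
                                               (20190915, 20190920)])
    for f in recent:
        print(f.file_id, f.file_type, f.date)

The half-open comparison (date >= start, date < end) means adjacent windows can be chained without double-counting files on a boundary date.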

mishards/server.py

@@ -4,8 +4,8 @@ import time
 from concurrent import futures
 from grpc._cython import cygrpc
 from milvus.grpc_gen.milvus_pb2_grpc import add_MilvusServiceServicer_to_server
-from service_handler import ServiceHandler
-import settings
+from mishards.service_handler import ServiceHandler
+from mishards import settings
 logger = logging.getLogger(__name__)

mishards/service_handler.py

@@ -9,9 +9,9 @@ from milvus.grpc_gen import milvus_pb2, milvus_pb2_grpc, status_pb2
 from milvus.grpc_gen.milvus_pb2 import TopKQueryResult
 from milvus.client import types
-import settings
-from grpc_utils.grpc_args_parser import GrpcArgsParser as Parser
-import exceptions
+from mishards import (settings, exceptions)
+from mishards.grpc_utils.grpc_args_parser import GrpcArgsParser as Parser
+from mishards.models import Tables, TableFiles
 logger = logging.getLogger(__name__)

mishards/settings.py

@@ -15,9 +15,11 @@ LOG_PATH = env.str('LOG_PATH', '/tmp/mishards')
 LOG_NAME = env.str('LOG_NAME', 'logfile')
 TIMEZONE = env.str('TIMEZONE', 'UTC')
-from utils.logger_helper import config
+from mishards.utils.logger_helper import config
 config(LOG_LEVEL, LOG_PATH, LOG_NAME, TIMEZONE)
+
+SQLALCHEMY_DATABASE_URI = env.str('SQLALCHEMY_DATABASE_URI')
 TIMEOUT = env.int('TIMEOUT', 60)
 MAX_RETRY = env.int('MAX_RETRY', 3)
 SEARCH_WORKER_SIZE = env.int('SEARCH_WORKER_SIZE', 10)
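
One behavioral detail of this change: env.str('SQLALCHEMY_DATABASE_URI') is given no default, so environs treats the variable as required and raises EnvError at import time when it is unset. A minimal sketch of the two lookup modes (values are illustrative):

    from environs import Env

    env = Env()
    env.str('SQLALCHEMY_DATABASE_URI')   # required: raises environs.EnvError if unset
    env.int('TIMEOUT', 60)               # optional: falls back to the default 60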