diff --git a/manager.py b/manager.py new file mode 100644 index 0000000000..0a2acad26f --- /dev/null +++ b/manager.py @@ -0,0 +1,14 @@ +import fire +from mishards import db + +class DBHandler: + @classmethod + def create_all(cls): + db.create_all() + + @classmethod + def drop_all(cls): + db.drop_all() + +if __name__ == '__main__': + fire.Fire(DBHandler) diff --git a/mishards/__init__.py b/mishards/__init__.py index b3a14cf7e3..c799e42fa4 100644 --- a/mishards/__init__.py +++ b/mishards/__init__.py @@ -1,8 +1,13 @@ -import settings -from connections import ConnectionMgr +from mishards import settings + +from mishards.db_base import DB +db = DB() +db.init_db(uri=settings.SQLALCHEMY_DATABASE_URI) + +from mishards.connections import ConnectionMgr connect_mgr = ConnectionMgr() -from service_founder import ServiceFounder +from mishards.service_founder import ServiceFounder discover = ServiceFounder(namespace=settings.SD_NAMESPACE, conn_mgr=connect_mgr, pod_patt=settings.SD_ROSERVER_POD_PATT, @@ -10,5 +15,5 @@ discover = ServiceFounder(namespace=settings.SD_NAMESPACE, in_cluster=settings.SD_IN_CLUSTER, poll_interval=settings.SD_POLL_INTERVAL) -from server import Server +from mishards.server import Server grpc_server = Server(conn_mgr=connect_mgr) diff --git a/mishards/connections.py b/mishards/connections.py index 82dd082eac..9201ea2b08 100644 --- a/mishards/connections.py +++ b/mishards/connections.py @@ -4,9 +4,8 @@ from functools import wraps from contextlib import contextmanager from milvus import Milvus -import settings -import exceptions -from utils import singleton +from mishards import (settings, exceptions) +from mishards.utils import singleton logger = logging.getLogger(__name__) diff --git a/mishards/db_base.py b/mishards/db_base.py new file mode 100644 index 0000000000..702c9e57e9 --- /dev/null +++ b/mishards/db_base.py @@ -0,0 +1,27 @@ +from sqlalchemy import create_engine +from sqlalchemy.ext.declarative import declarative_base +from sqlalchemy.orm import sessionmaker, scoped_session + +class DB: + Model = declarative_base() + def __init__(self, uri=None): + uri and self.init_db(uri) + + def init_db(self, uri): + self.engine = create_engine(uri, pool_size=100, pool_recycle=5, pool_timeout=30, + pool_pre_ping=True, + max_overflow=0) + self.uri = uri + session = sessionmaker() + session.configure(bind=self.engine) + self.db_session = session() + + @property + def Session(self): + return self.db_session + + def drop_all(self): + self.Model.metadata.drop_all(self.engine) + + def create_all(self): + self.Model.metadata.create_all(self.engine) diff --git a/mishards/exceptions.py b/mishards/exceptions.py index 1445d18769..0f89ecb52d 100644 --- a/mishards/exceptions.py +++ b/mishards/exceptions.py @@ -1,4 +1,4 @@ -import exception_codes as codes +import mishards.exception_codes as codes class BaseException(Exception): code = codes.INVALID_CODE diff --git a/mishards/main.py b/mishards/main.py index 0526f87ff8..5d96d8b499 100644 --- a/mishards/main.py +++ b/mishards/main.py @@ -1,21 +1,17 @@ -import sys -import os +import os, sys sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) -import settings -from mishards import (connect_mgr, +from mishards import ( + settings, + db, connect_mgr, discover, grpc_server as server) def main(): - try: - discover.start() - connect_mgr.register('WOSERVER', settings.WOSERVER if not settings.TESTING else settings.TESTING_WOSERVER) - server.run(port=settings.SERVER_PORT) - return 0 - except Exception as e: - logger.error(e) - return 1 + discover.start() + connect_mgr.register('WOSERVER', settings.WOSERVER if not settings.TESTING else settings.TESTING_WOSERVER) + server.run(port=settings.SERVER_PORT) + return 0 if __name__ == '__main__': sys.exit(main()) diff --git a/mishards/models.py b/mishards/models.py new file mode 100644 index 0000000000..c699f490dd --- /dev/null +++ b/mishards/models.py @@ -0,0 +1,75 @@ +import logging +from sqlalchemy import (Integer, Boolean, Text, + String, BigInteger, func, and_, or_, + Column) +from sqlalchemy.orm import relationship, backref + +from mishards import db + +logger = logging.getLogger(__name__) + +class TableFiles(db.Model): + FILE_TYPE_NEW = 0 + FILE_TYPE_RAW = 1 + FILE_TYPE_TO_INDEX = 2 + FILE_TYPE_INDEX = 3 + FILE_TYPE_TO_DELETE = 4 + FILE_TYPE_NEW_MERGE = 5 + FILE_TYPE_NEW_INDEX = 6 + FILE_TYPE_BACKUP = 7 + + __tablename__ = 'TableFiles' + + id = Column(BigInteger, primary_key=True, autoincrement=True) + table_id = Column(String(50)) + engine_type = Column(Integer) + file_id = Column(String(50)) + file_type = Column(Integer) + file_size = Column(Integer, default=0) + row_count = Column(Integer, default=0) + updated_time = Column(BigInteger) + created_on = Column(BigInteger) + date = Column(Integer) + + table = relationship( + 'Table', + primaryjoin='and_(foreign(TableFile.table_id) == Table.table_id)', + backref=backref('files', uselist=True, lazy='dynamic') + ) + + +class Tables(db.Model): + TO_DELETE = 1 + NORMAL = 0 + + __tablename__ = 'Tables' + + id = Column(BigInteger, primary_key=True, autoincrement=True) + table_id = Column(String(50), unique=True) + state = Column(Integer) + dimension = Column(Integer) + created_on = Column(Integer) + flag = Column(Integer, default=0) + index_file_size = Column(Integer) + engine_type = Column(Integer) + nlist = Column(Integer) + metric_type = Column(Integer) + + def files_to_search(self, date_range=None): + cond = or_( + TableFile.file_type==TableFile.FILE_TYPE_RAW, + TableFile.file_type==TableFile.FILE_TYPE_TO_INDEX, + TableFile.file_type==TableFile.FILE_TYPE_INDEX, + ) + if date_range: + cond = and_( + cond, + or_( + and_(TableFile.date>=d[0], TableFile.date