mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-07 09:38:39 +08:00
Merge branch 'jinhai' into 'develop'
Update for more function See merge request jinhai/vecwise_engine!6
This commit is contained in:
commit
09bf6f3fbc
@ -10,7 +10,7 @@ app.config.from_object('engine.settings')
|
||||
print ("Create database instance")
|
||||
db = SQLAlchemy(app)
|
||||
|
||||
from engine.model.GroupTable import GroupTable
|
||||
from engine.model.FileTable import FileTable
|
||||
from engine.model.group_table import GroupTable
|
||||
from engine.model.file_table import FileTable
|
||||
|
||||
from engine.controller import IndexManage
|
||||
from engine.controller import index_manager
|
||||
|
||||
@ -1,24 +0,0 @@
|
||||
import os, shutil
|
||||
|
||||
class GroupHandler(object):
|
||||
|
||||
@staticmethod
|
||||
def CreateGroupDirectory(group_id):
|
||||
path = GetGroupDirectory(group_id)
|
||||
path = path.strip()
|
||||
path=path.rstrip("\\")
|
||||
if not os.path.exists():
|
||||
os.makedirs(path)
|
||||
|
||||
|
||||
@staticmethod
|
||||
def DeleteGroupDirectory(group_id):
|
||||
path = GetGroupDirectory(group_id)
|
||||
path = path.strip()
|
||||
path=path.rstrip("\\")
|
||||
if os.path.exists():
|
||||
shutil.rmtree(path)
|
||||
|
||||
@staticmethod
|
||||
def GetGroupDirectory(group_id):
|
||||
return DATABASE_DIRECTORY + '/' + group_id
|
||||
@ -1,114 +0,0 @@
|
||||
from engine.model.GroupTable import GroupTable
|
||||
from engine.model.FileTable import FileTable
|
||||
from engine.controller.RawFileHandler import RawFileHandler
|
||||
from engine.controller.GroupHandler import GroupHandler
|
||||
from flask import jsonify
|
||||
from engine import db
|
||||
import sys, os
|
||||
|
||||
class VectorEngine(object):
|
||||
|
||||
@staticmethod
|
||||
def AddGroup(group_id):
|
||||
group = GroupTable.query.filter(GroupTable.group_name==group_id).first()
|
||||
if group:
|
||||
return jsonify({'code': 1, 'group_name': group_id, 'file_number': group.file_number})
|
||||
else:
|
||||
new_group = GroupTable(group_id)
|
||||
db.session.add(new_group)
|
||||
db.session.commit()
|
||||
GroupHandler.CreateGroupDirectory(group_id)
|
||||
return jsonify({'code': 0, 'group_name': group_id, 'file_number': 0})
|
||||
|
||||
@staticmethod
|
||||
def GetGroup(group_id):
|
||||
group = GroupTable.query.filter(GroupTable.group_name==group_id).first()
|
||||
if group:
|
||||
return jsonify({'code': 0, 'group_name': group_id, 'file_number': group.file_number})
|
||||
else:
|
||||
return jsonify({'code': 1, 'group_name': group_id, 'file_number': 0}) # not found
|
||||
|
||||
|
||||
@staticmethod
|
||||
def DeleteGroup(group_id):
|
||||
group = GroupTable.query.filter(GroupTable.group_name==group_id).first()
|
||||
if(group):
|
||||
# old_group = GroupTable(group_id)
|
||||
db.session.delete(group)
|
||||
db.session.commit()
|
||||
GroupHandler.DeleteGroupDirectory(group_id)
|
||||
return jsonify({'code': 0, 'group_name': group_id, 'file_number': group.file_number})
|
||||
else:
|
||||
return jsonify({'code': 0, 'group_name': group_id, 'file_number': 0})
|
||||
|
||||
@staticmethod
|
||||
def GetGroupList():
|
||||
group = GroupTable.query.all()
|
||||
group_list = []
|
||||
for group_tuple in group:
|
||||
group_item = {}
|
||||
group_item['group_name'] = group_tuple.group_name
|
||||
group_item['file_number'] = group_tuple.file_number
|
||||
group_list.append(group_item)
|
||||
|
||||
print(group_list)
|
||||
return jsonify(results = group_list)
|
||||
|
||||
@staticmethod
|
||||
def AddVector(group_id, vector):
|
||||
print(group_id, vector)
|
||||
file = FileTable.query.filter(and_(FileTable.group_name == group_id, FileTable.type == 'raw')).first()
|
||||
if file:
|
||||
if file.row_number >= ROW_LIMIT:
|
||||
# create index
|
||||
index_filename = file.filename + "_index"
|
||||
CreateIndex(group_id, index_filename)
|
||||
|
||||
# create another raw file
|
||||
raw_filename = file.seq_no
|
||||
InsertVectorIntoRawFile(group_id, raw_filename, vector)
|
||||
# insert a record into database
|
||||
db.session.add(FileTable(group_id, raw_filename, 'raw', 1))
|
||||
db.session.commit()
|
||||
else:
|
||||
# we still can insert into exist raw file
|
||||
InsertVectorIntoRawFile(file.filename, vector)
|
||||
# update database
|
||||
# FileTable.query.filter_by(FileTable.group_name == group_id).filter_by(FileTable.type == 'raw').update('row_number':file.row_number + 1)
|
||||
else:
|
||||
# first raw file
|
||||
raw_filename = group_id + '_0'
|
||||
# create and insert vector into raw file
|
||||
InsertVectorIntoRawFile(raw_filename, vector)
|
||||
# insert a record into database
|
||||
db.session.add(FileTable(group_id, raw_filename, 'raw', 1))
|
||||
db.session.commit()
|
||||
|
||||
return jsonify({'code': 0})
|
||||
|
||||
@staticmethod
|
||||
def SearchVector(group_id, vector, limit):
|
||||
# find all files
|
||||
# according to difference files get topk of each
|
||||
# reduce the topk from them
|
||||
# construct response and send back
|
||||
return jsonify({'code': 0})
|
||||
|
||||
@staticmethod
|
||||
def CreateIndex(group_id, filename):
|
||||
path = GroupHandler.GetGroupDirectory(group_id) + '/' + filename
|
||||
print(group_id, path)
|
||||
return jsonify({'code': 0})
|
||||
|
||||
@staticmethod
|
||||
def InsertVectorIntoRawFile(group_id, filename, vector):
|
||||
print(sys._getframe().f_code.co_name)
|
||||
path = GroupHandler.GetGroupDirectory(group_id) + '/' + filename
|
||||
|
||||
# if filename exist
|
||||
# append
|
||||
# if filename not exist
|
||||
# create file
|
||||
# append
|
||||
return filename
|
||||
|
||||
29
pyengine/engine/controller/group_handler.py
Normal file
29
pyengine/engine/controller/group_handler.py
Normal file
@ -0,0 +1,29 @@
|
||||
import os, shutil
|
||||
from engine.settings import DATABASE_DIRECTORY
|
||||
|
||||
class GroupHandler(object):
|
||||
|
||||
@staticmethod
|
||||
def CreateGroupDirectory(group_id):
|
||||
path = GroupHandler.GetGroupDirectory(group_id)
|
||||
path = path.strip()
|
||||
path=path.rstrip("\\")
|
||||
if not os.path.exists(path):
|
||||
os.makedirs(path)
|
||||
print("CreateGroupDirectory, Path: ", path)
|
||||
|
||||
|
||||
@staticmethod
|
||||
def DeleteGroupDirectory(group_id):
|
||||
path = GroupHandler.GetGroupDirectory(group_id)
|
||||
path = path.strip()
|
||||
path=path.rstrip("\\")
|
||||
if os.path.exists(path):
|
||||
shutil.rmtree(path)
|
||||
print("DeleteGroupDirectory, Path: ", path)
|
||||
|
||||
@staticmethod
|
||||
def GetGroupDirectory(group_id):
|
||||
print("GetGroupDirectory, Path: ", DATABASE_DIRECTORY + '/' + group_id)
|
||||
return DATABASE_DIRECTORY + '/' + group_id
|
||||
|
||||
@ -1,5 +1,6 @@
|
||||
|
||||
class RawFileHandler(object):
|
||||
class IndexFileHandler(object):
|
||||
|
||||
@staticmethod
|
||||
def Create(filename, type):
|
||||
# type means: csv, parquet
|
||||
@ -1,8 +1,8 @@
|
||||
from flask import Flask, jsonify, request
|
||||
from flask_restful import Resource, Api
|
||||
from engine import app, db
|
||||
from engine.model.GroupTable import GroupTable
|
||||
from engine.controller.VectorEngine import VectorEngine
|
||||
from engine.model.group_table import GroupTable
|
||||
from engine.controller.vector_engine import VectorEngine
|
||||
|
||||
# app = Flask(__name__)
|
||||
api = Api(app)
|
||||
@ -25,12 +25,13 @@ class VectorSearch(Resource):
|
||||
def __init__(self):
|
||||
self.__parser = reqparse.RequestParser()
|
||||
self.__parser.add_argument('vector', type=float, action='append', location=['json'])
|
||||
self.__parser.add_argument('limit', type=int, action='append', location=['json'])
|
||||
|
||||
def post(self, group_id):
|
||||
args = self.__parser.parse_args()
|
||||
print('vector: ', args['vector'])
|
||||
# go to search every thing
|
||||
return "vectorSearch post"
|
||||
return VectorEngine.SearchVector(group_id, args['vector'], args['limit'])
|
||||
|
||||
|
||||
class Index(Resource):
|
||||
@ -46,9 +47,12 @@ class Group(Resource):
|
||||
def __init__(self):
|
||||
self.__parser = reqparse.RequestParser()
|
||||
self.__parser.add_argument('group_id', type=str)
|
||||
self.__parser.add_argument('dimension', type=int, action='append', location=['json'])
|
||||
|
||||
def post(self, group_id):
|
||||
return VectorEngine.AddGroup(group_id)
|
||||
args = self.__parser.parse_args()
|
||||
dimension = args['dimension']
|
||||
return VectorEngine.AddGroup(group_id, dimension)
|
||||
|
||||
def get(self, group_id):
|
||||
return VectorEngine.GetGroup(group_id)
|
||||
18
pyengine/engine/controller/raw_file_handler.py
Normal file
18
pyengine/engine/controller/raw_file_handler.py
Normal file
@ -0,0 +1,18 @@
|
||||
|
||||
class RawFileHandler(object):
|
||||
@staticmethod
|
||||
def Create(filename, type):
|
||||
# type means: csv, parquet
|
||||
pass
|
||||
|
||||
@staticmethod
|
||||
def Read(filename, type):
|
||||
pass
|
||||
|
||||
@staticmethod
|
||||
def Append(filename, type, record):
|
||||
pass
|
||||
|
||||
@staticmethod
|
||||
def GetRawFilename(group_id):
|
||||
return group_id + '.raw'
|
||||
176
pyengine/engine/controller/vector_engine.py
Normal file
176
pyengine/engine/controller/vector_engine.py
Normal file
@ -0,0 +1,176 @@
|
||||
from engine.model.group_table import GroupTable
|
||||
from engine.model.file_table import FileTable
|
||||
from engine.controller.raw_file_handler import RawFileHandler
|
||||
from engine.controller.group_handler import GroupHandler
|
||||
from engine.controller.index_file_handler import IndexFileHandler
|
||||
from engine.settings import ROW_LIMIT
|
||||
from flask import jsonify
|
||||
from engine import db
|
||||
import sys, os
|
||||
|
||||
class VectorEngine(object):
|
||||
group_dict = None
|
||||
|
||||
@staticmethod
|
||||
def AddGroup(group_id, dimension):
|
||||
group = GroupTable.query.filter(GroupTable.group_name==group_id).first()
|
||||
if group:
|
||||
print('Already create the group: ', group_id)
|
||||
return jsonify({'code': 1, 'group_name': group_id, 'file_number': group.file_number})
|
||||
else:
|
||||
print('To create the group: ', group_id)
|
||||
new_group = GroupTable(group_id, dimension)
|
||||
GroupHandler.CreateGroupDirectory(group_id)
|
||||
|
||||
# add into database
|
||||
db.session.add(new_group)
|
||||
db.session.commit()
|
||||
return jsonify({'code': 0, 'group_name': group_id, 'file_number': 0})
|
||||
|
||||
|
||||
@staticmethod
|
||||
def GetGroup(group_id):
|
||||
group = GroupTable.query.filter(GroupTable.group_name==group_id).first()
|
||||
if group:
|
||||
print('Found the group: ', group_id)
|
||||
return jsonify({'code': 0, 'group_name': group_id, 'file_number': group.file_number})
|
||||
else:
|
||||
print('Not found the group: ', group_id)
|
||||
return jsonify({'code': 1, 'group_name': group_id, 'file_number': 0}) # not found
|
||||
|
||||
|
||||
@staticmethod
|
||||
def DeleteGroup(group_id):
|
||||
group = GroupTable.query.filter(GroupTable.group_name==group_id).first()
|
||||
if(group):
|
||||
# old_group = GroupTable(group_id)
|
||||
db.session.delete(group)
|
||||
db.session.commit()
|
||||
GroupHandler.DeleteGroupDirectory(group_id)
|
||||
|
||||
records = FileTable.query.filter(FileTable.group_name == group_id).all()
|
||||
for record in records:
|
||||
print("record.group_name: ", record.group_name)
|
||||
db.session.delete(record)
|
||||
db.session.commit()
|
||||
|
||||
return jsonify({'code': 0, 'group_name': group_id, 'file_number': group.file_number})
|
||||
else:
|
||||
return jsonify({'code': 0, 'group_name': group_id, 'file_number': 0})
|
||||
|
||||
|
||||
@staticmethod
|
||||
def GetGroupList():
|
||||
group = GroupTable.query.all()
|
||||
group_list = []
|
||||
for group_tuple in group:
|
||||
group_item = {}
|
||||
group_item['group_name'] = group_tuple.group_name
|
||||
group_item['file_number'] = group_tuple.file_number
|
||||
group_list.append(group_item)
|
||||
|
||||
print(group_list)
|
||||
return jsonify(results = group_list)
|
||||
|
||||
|
||||
@staticmethod
|
||||
def AddVector(group_id, vector):
|
||||
print(group_id, vector)
|
||||
file = FileTable.query.filter(FileTable.group_name == group_id).filter(FileTable.type == 'raw').first()
|
||||
if file:
|
||||
print('insert into exist file')
|
||||
# insert into raw file
|
||||
VectorEngine.InsertVectorIntoRawFile(group_id, file.filename, vector)
|
||||
|
||||
# check if the file can be indexed
|
||||
if file.row_number + 1 >= ROW_LIMIT:
|
||||
# read data from raw file
|
||||
data = GetVectorsFromRawFile()
|
||||
|
||||
# create index
|
||||
index_filename = file.filename + '_index'
|
||||
CreateIndex(group_id, index_filename, data)
|
||||
|
||||
# update record into database
|
||||
FileTable.query.filter(FileTable.group_name == group_id).filter(FileTable.type == 'raw').update({'row_number':file.row_number + 1, 'type': 'index'})
|
||||
pass
|
||||
|
||||
else:
|
||||
# we still can insert into exist raw file, update database
|
||||
FileTable.query.filter(FileTable.group_name == group_id).filter(FileTable.type == 'raw').update({'row_number':file.row_number + 1})
|
||||
db.session.commit()
|
||||
print('Update db for raw file insertion')
|
||||
pass
|
||||
|
||||
else:
|
||||
print('add a new raw file')
|
||||
# first raw file
|
||||
raw_filename = group_id + '.raw'
|
||||
# create and insert vector into raw file
|
||||
VectorEngine.InsertVectorIntoRawFile(group_id, raw_filename, vector)
|
||||
# insert a record into database
|
||||
db.session.add(FileTable(group_id, raw_filename, 'raw', 1))
|
||||
db.session.commit()
|
||||
|
||||
return jsonify({'code': 0})
|
||||
|
||||
|
||||
@staticmethod
|
||||
def SearchVector(group_id, vector, limit):
|
||||
# find all files
|
||||
files = FileTable.query.filter(FileTable.group_name == group_id).all()
|
||||
|
||||
for file in files:
|
||||
if(file.type == 'raw'):
|
||||
# create index
|
||||
# add vector list
|
||||
# train
|
||||
# get topk
|
||||
print('search in raw file: ', file.filename)
|
||||
pass
|
||||
else:
|
||||
# get topk
|
||||
print('search in index file: ', file.filename)
|
||||
data = IndexFileHandler.Read(file.filename, file.type)
|
||||
pass
|
||||
|
||||
# according to difference files get topk of each
|
||||
# reduce the topk from them
|
||||
# construct response and send back
|
||||
return jsonify({'code': 0})
|
||||
|
||||
|
||||
@staticmethod
|
||||
def CreateIndex(group_id):
|
||||
# create index
|
||||
file = FileTable.query.filter(FileTable.group_name == group_id).filter(FileTable.type == 'raw').first()
|
||||
path = GroupHandler.GetGroupDirectory(group_id) + '/' + file.filename
|
||||
print('Going to create index for: ', path)
|
||||
return jsonify({'code': 0})
|
||||
|
||||
|
||||
@staticmethod
|
||||
def InsertVectorIntoRawFile(group_id, filename, vector):
|
||||
# print(sys._getframe().f_code.co_name, group_id, vector)
|
||||
# path = GroupHandler.GetGroupDirectory(group_id) + '/' + filename
|
||||
if VectorEngine.group_dict is None:
|
||||
# print("VectorEngine.group_dict is None")
|
||||
VectorEngine.group_dict = dict()
|
||||
VectorEngine.group_dict[group_id] = []
|
||||
|
||||
VectorEngine.group_dict[group_id].append(vector)
|
||||
|
||||
print('InsertVectorIntoRawFile: ', VectorEngine.group_dict[group_id])
|
||||
|
||||
# if filename exist
|
||||
# append
|
||||
# if filename not exist
|
||||
# create file
|
||||
# append
|
||||
return filename
|
||||
|
||||
|
||||
@staticmethod
|
||||
def GetVectorListFromRawFile(group_id, filename):
|
||||
return VectorEngine.group_dict[group_id]
|
||||
|
||||
@ -9,6 +9,7 @@ class FileTable(db.Model):
|
||||
row_number = db.Column(db.Integer)
|
||||
seq_no = db.Column(db.Integer)
|
||||
|
||||
|
||||
def __init__(self, group_name, filename, type, row_number):
|
||||
self.group_name = group_name
|
||||
self.filename = filename
|
||||
@ -17,5 +18,7 @@ class FileTable(db.Model):
|
||||
self.type = type
|
||||
self.seq_no = 0
|
||||
|
||||
|
||||
def __repr__(self):
|
||||
return '<FileTable $r>' % self.tablename
|
||||
return '<FileTable $r>' % self.tablename
|
||||
|
||||
@ -5,10 +5,15 @@ class GroupTable(db.Model):
|
||||
id = db.Column(db.Integer, primary_key=True)
|
||||
group_name = db.Column(db.String(100))
|
||||
file_number = db.Column(db.Integer)
|
||||
dimension = db.Column(db.Integer)
|
||||
|
||||
def __init__(self, group_name):
|
||||
|
||||
def __init__(self, group_name, dimension):
|
||||
self.group_name = group_name
|
||||
self.dimension = dimension
|
||||
self.file_number = 0
|
||||
self.dimension = 0
|
||||
|
||||
|
||||
def __repr__(self):
|
||||
return '<GroupTable $s>' % self.group_name
|
||||
@ -6,4 +6,4 @@ SQLALCHEMY_TRACK_MODIFICATIONS = False
|
||||
SQLALCHEMY_DATABASE_URI = "mysql+pymysql://vecwise@127.0.0.1:3306/vecdata"
|
||||
|
||||
ROW_LIMIT = 10000000
|
||||
DATABASE_DIRECTORY = '/home/jinhai/Document/development/vecwise_engine/db'
|
||||
DATABASE_DIRECTORY = '/home/jinhai/disk0/vecwise/db'
|
||||
Loading…
x
Reference in New Issue
Block a user