未验证 提交 5a83ba4e 编写于 作者: B BossZou 提交者: GitHub

Optimize behavior to get file ids from metadata in mishards (#1754) (#1755)

* update mishards get file ids
Signed-off-by: NYhz <yinghao.zou@zilliz.com>

* update behaviour for getting file ids
Signed-off-by: NYhz <yinghao.zou@zilliz.com>

* check default partition
Signed-off-by: Nyhz <413554850@qq.com>

* remove exception when file id not found
Signed-off-by: Nyhz <413554850@qq.com>

* Optimize behavior to get file ids from metadata in mishards (fix #1754)
Signed-off-by: NYhz <yinghao.zou@zilliz.com>

* [skip ci] Update changelog
Signed-off-by: NYhz <yinghao.zou@zilliz.com>
上级 d34bd505
......@@ -38,6 +38,7 @@ Please mark all change in change log and use the issue from GitHub
- \#1678 Remove CUSTOMIZATION macro
- \#1698 Upgrade mishards to v0.7.0
- \#1719 Improve Milvus log
- \#1754 Optimize behavior to get file ids from metadata in mishards
## Task
......
import logging
from sqlalchemy import exc as sqlalchemy_exc
from sqlalchemy import and_, or_
from mishards.models import Tables
from mishards.models import Tables, TableFiles
from mishards.router import RouterMixin
from mishards import exceptions, db
from mishards.hash_ring import HashRing
......@@ -14,7 +14,7 @@ class Factory(RouterMixin):
def __init__(self, writable_topo, readonly_topo, **kwargs):
super(Factory, self).__init__(writable_topo=writable_topo,
readonly_topo=readonly_topo)
readonly_topo=readonly_topo)
def routing(self, table_name, partition_tags=None, metadata=None, **kwargs):
range_array = kwargs.pop('range_array', None)
......@@ -26,24 +26,43 @@ class Factory(RouterMixin):
if not partition_tags:
cond = and_(
or_(Tables.table_id == table_name, Tables.owner_table == table_name),
Tables.state != Tables.TO_DELETE)
or_(Tables.table_id == table_name, Tables.owner_table == table_name),
Tables.state != Tables.TO_DELETE)
else:
# TODO: collection default partition is '_default'
cond = and_(Tables.state != Tables.TO_DELETE,
Tables.owner_table == table_name,
Tables.partition_tag.in_(partition_tags))
if '_default' in partition_tags:
default_par_cond = and_(Tables.table_id == table_name, Tables.state != Tables.TO_DELETE)
cond = or_(cond, default_par_cond)
try:
tables = db.Session.query(Tables).filter(cond).all()
except sqlalchemy_exc.SQLAlchemyError as e:
raise exceptions.DBError(message=str(e), metadata=metadata)
if not tables:
logger.error("Cannot find table {} / {} in metadata".format(table_name, partition_tags))
raise exceptions.TableNotFoundError('{}:{}'.format(table_name, partition_tags), metadata=metadata)
total_files = []
for table in tables:
files = table.files_to_search(range_array)
total_files.append(files)
table_list = [str(table.table_id) for table in tables]
file_type_cond = or_(
TableFiles.file_type == TableFiles.FILE_TYPE_RAW,
TableFiles.file_type == TableFiles.FILE_TYPE_TO_INDEX,
TableFiles.file_type == TableFiles.FILE_TYPE_INDEX,
)
file_cond = and_(file_type_cond, TableFiles.table_id.in_(table_list))
try:
files = db.Session.query(TableFiles).filter(file_cond).all()
except sqlalchemy_exc.SQLAlchemyError as e:
raise exceptions.DBError(message=str(e), metadata=metadata)
if not files:
logger.warning("Table file is empty. {}".format(table_list))
# logger.error("Cannot find table file id {} / {} in metadata".format(table_name, partition_tags))
# raise exceptions.TableNotFoundError('Table file id not found. {}:{}'.format(table_name, partition_tags),
# metadata=metadata)
db.remove_session()
......@@ -54,18 +73,13 @@ class Factory(RouterMixin):
routing = {}
for files in total_files:
for f in files:
target_host = ring.get_node(str(f.id))
sub = routing.get(target_host, None)
if not sub:
sub = {}
routing[target_host] = sub
kv = sub.get(f.table_id, None)
if not kv:
kv = []
sub[f.table_id] = kv
sub[f.table_id].append(str(f.id))
for f in files:
target_host = ring.get_node(str(f.id))
sub = routing.get(target_host, None)
if not sub:
sub = []
routing[target_host] = sub
routing[target_host].append(str(f.id))
return routing
......
......@@ -62,6 +62,9 @@ class ServiceHandler(milvus_pb2_grpc.MilvusServiceServicer):
status, _ = files_collection
return status, [], []
if files_collection.status.error_code != 0:
return files_collection.status, [], []
row_num = files_collection.row_num
# row_num is equal to 0, result is empty
if not row_num:
......@@ -111,16 +114,12 @@ class ServiceHandler(milvus_pb2_grpc.MilvusServiceServicer):
partition_tags=None,
**kwargs):
metadata = kwargs.get('metadata', None)
# range_array = [
# utilities.range_to_date(r, metadata=metadata) for r in range_array
# ] if range_array else None
routing = {}
p_span = None if self.tracer.empty else context.get_active_span(
).context
with self.tracer.start_span('get_routing', child_of=p_span):
routing = self.router.routing(table_id,
# range_array=range_array,
partition_tags=partition_tags,
metadata=metadata)
logger.info('Routing: {}'.format(routing))
......@@ -149,7 +148,7 @@ class ServiceHandler(milvus_pb2_grpc.MilvusServiceServicer):
top_k=topk,
params=params)
if ret.status.error_code != 0:
logger.error(ret.status)
logger.error("Search fail {}".format(ret.status))
end = time.time()
......@@ -157,17 +156,16 @@ class ServiceHandler(milvus_pb2_grpc.MilvusServiceServicer):
with self.tracer.start_span('do_search', child_of=p_span) as span:
with ThreadPoolExecutor(max_workers=self.max_workers) as pool:
for addr, params in routing.items():
for sub_table_id, file_ids in params.items():
res = pool.submit(search,
addr,
table_id,
file_ids,
vectors,
topk,
search_params,
span=span)
rs.append(res)
for addr, file_ids in routing.items():
res = pool.submit(search,
addr,
table_id,
file_ids,
vectors,
topk,
search_params,
span=span)
rs.append(res)
for res in rs:
res.result()
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册