未验证 提交 7ed6edc5 编写于 作者: C Cai Yudong 提交者: GitHub

Caiyd 1883 fix rw (#1926)

* #1883 use DiskIO
Signed-off-by: yudong.cai <yudong.cai@zilliz.com>

* fix logic error
Signed-off-by: yudong.cai <yudong.cai@zilliz.com>

* update changelog
Signed-off-by: yudong.cai <yudong.cai@zilliz.com>

* retry CI
Signed-off-by: yudong.cai <yudong.cai@zilliz.com>

* Update CHANGELOG
Signed-off-by: JinHai-CN <hai.jin@zilliz.com>

* update changelog
Signed-off-by: yudong.cai <yudong.cai@zilliz.com>
Co-authored-by: JinHai-CN <hai.jin@zilliz.com>
上级 c60babeb
......@@ -2,7 +2,7 @@
Please mark all change in change log and use the issue from GitHub
# Milvus 0.8.0 (TBD)
# Milvus 0.8.0 (2020-04-14)
## Bug
- \#1276 SQLite throw exception after create 50000+ partitions in a table
......@@ -11,6 +11,7 @@ Please mark all change in change log and use the issue from GitHub
- \#1832 Fix crash in tracing module
- \#1873 Fix index file serialize to incorrect path
- \#1881 Fix bad alloc when index files lost
- \#1883 Fix inserted vectors becomes all zero when index_file_size >= 2GB
- \#1901 Search failed with flat index
- \#1903 Fix invalid annoy result
- \#1910 C++ SDK GetIDsInSegment could not work for large dataset
......@@ -36,7 +37,6 @@ Please mark all change in change log and use the issue from GitHub
## Task
# Milvus 0.7.1 (2020-03-29)
## Bug
......@@ -707,7 +707,7 @@ Please mark all change in change log and use the issue from GitHub
- MS-37 Add query, cache usage, disk write speed and file data size metrics
- MS-30 Use faiss v1.5.2
- MS-54 cmake: Change Thrift third party URL to github.com
- MS-69 prometheus: add all proposed metrics
- MS-69 Prometheus: add all proposed metrics
## Task
......
......@@ -31,74 +31,44 @@ namespace milvus {
namespace codec {
void
DefaultVectorsFormat::read_vectors_internal(const std::string& file_path, off_t offset, size_t num,
std::vector<uint8_t>& raw_vectors) {
int rv_fd = open(file_path.c_str(), O_RDONLY, 00664);
if (rv_fd == -1) {
DefaultVectorsFormat::read_vectors_internal(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path,
off_t offset, size_t num, std::vector<uint8_t>& raw_vectors) {
if (!fs_ptr->reader_ptr_->open(file_path.c_str())) {
std::string err_msg = "Failed to open file: " + file_path + ", error: " + std::strerror(errno);
ENGINE_LOG_ERROR << err_msg;
throw Exception(SERVER_CANNOT_CREATE_FILE, err_msg);
throw Exception(SERVER_CANNOT_OPEN_FILE, err_msg);
}
size_t num_bytes;
if (::read(rv_fd, &num_bytes, sizeof(size_t)) == -1) {
std::string err_msg = "Failed to read from file: " + file_path + ", error: " + std::strerror(errno);
ENGINE_LOG_ERROR << err_msg;
throw Exception(SERVER_WRITE_ERROR, err_msg);
}
fs_ptr->reader_ptr_->read(&num_bytes, sizeof(size_t));
num = std::min(num, num_bytes - offset);
offset += sizeof(size_t); // Beginning of file is num_bytes
int off = lseek(rv_fd, offset, SEEK_SET);
if (off == -1) {
std::string err_msg = "Failed to seek file: " + file_path + ", error: " + std::strerror(errno);
ENGINE_LOG_ERROR << err_msg;
throw Exception(SERVER_WRITE_ERROR, err_msg);
}
fs_ptr->reader_ptr_->seekg(offset);
raw_vectors.resize(num / sizeof(uint8_t));
if (::read(rv_fd, raw_vectors.data(), num) == -1) {
std::string err_msg = "Failed to read from file: " + file_path + ", error: " + std::strerror(errno);
ENGINE_LOG_ERROR << err_msg;
throw Exception(SERVER_WRITE_ERROR, err_msg);
}
fs_ptr->reader_ptr_->read(raw_vectors.data(), num);
if (::close(rv_fd) == -1) {
std::string err_msg = "Failed to close file: " + file_path + ", error: " + std::strerror(errno);
ENGINE_LOG_ERROR << err_msg;
throw Exception(SERVER_WRITE_ERROR, err_msg);
}
fs_ptr->reader_ptr_->close();
}
void
DefaultVectorsFormat::read_uids_internal(const std::string& file_path, std::vector<segment::doc_id_t>& uids) {
int uid_fd = open(file_path.c_str(), O_RDONLY, 00664);
if (uid_fd == -1) {
DefaultVectorsFormat::read_uids_internal(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path,
std::vector<segment::doc_id_t>& uids) {
if (!fs_ptr->reader_ptr_->open(file_path.c_str())) {
std::string err_msg = "Failed to open file: " + file_path + ", error: " + std::strerror(errno);
ENGINE_LOG_ERROR << err_msg;
throw Exception(SERVER_CANNOT_CREATE_FILE, err_msg);
throw Exception(SERVER_CANNOT_OPEN_FILE, err_msg);
}
size_t num_bytes;
if (::read(uid_fd, &num_bytes, sizeof(size_t)) == -1) {
std::string err_msg = "Failed to read from file: " + file_path + ", error: " + std::strerror(errno);
ENGINE_LOG_ERROR << err_msg;
throw Exception(SERVER_WRITE_ERROR, err_msg);
}
fs_ptr->reader_ptr_->read(&num_bytes, sizeof(size_t));
uids.resize(num_bytes / sizeof(segment::doc_id_t));
if (::read(uid_fd, uids.data(), num_bytes) == -1) {
std::string err_msg = "Failed to read from file: " + file_path + ", error: " + std::strerror(errno);
ENGINE_LOG_ERROR << err_msg;
throw Exception(SERVER_WRITE_ERROR, err_msg);
}
fs_ptr->reader_ptr_->read(uids.data(), num_bytes);
if (::close(uid_fd) == -1) {
std::string err_msg = "Failed to close file: " + file_path + ", error: " + std::strerror(errno);
ENGINE_LOG_ERROR << err_msg;
throw Exception(SERVER_WRITE_ERROR, err_msg);
}
fs_ptr->reader_ptr_->close();
}
void
......@@ -121,13 +91,13 @@ DefaultVectorsFormat::read(const storage::FSHandlerPtr& fs_ptr, segment::Vectors
const auto& path = it->path();
if (path.extension().string() == raw_vector_extension_) {
std::vector<uint8_t> vector_list;
read_vectors_internal(path.string(), 0, INT64_MAX, vector_list);
read_vectors_internal(fs_ptr, path.string(), 0, INT64_MAX, vector_list);
vectors_read->AddData(vector_list);
vectors_read->SetName(path.stem().string());
}
if (path.extension().string() == user_id_extension_) {
std::vector<segment::doc_id_t> uids;
read_uids_internal(path.string(), uids);
read_uids_internal(fs_ptr, path.string(), uids);
vectors_read->AddUids(uids);
}
}
......@@ -144,54 +114,28 @@ DefaultVectorsFormat::write(const storage::FSHandlerPtr& fs_ptr, const segment::
TimeRecorder rc("write vectors");
int rv_fd = open(rv_file_path.c_str(), O_WRONLY | O_TRUNC | O_CREAT, 00664);
if (rv_fd == -1) {
if (!fs_ptr->writer_ptr_->open(rv_file_path.c_str())) {
std::string err_msg = "Failed to open file: " + rv_file_path + ", error: " + std::strerror(errno);
ENGINE_LOG_ERROR << err_msg;
throw Exception(SERVER_CANNOT_CREATE_FILE, err_msg);
}
size_t rv_num_bytes = vectors->GetData().size() * sizeof(uint8_t);
if (::write(rv_fd, &rv_num_bytes, sizeof(size_t)) == -1) {
std::string err_msg = "Failed to write to file: " + rv_file_path + ", error: " + std::strerror(errno);
ENGINE_LOG_ERROR << err_msg;
throw Exception(SERVER_WRITE_ERROR, err_msg);
}
if (::write(rv_fd, vectors->GetData().data(), rv_num_bytes) == -1) {
std::string err_msg = "Failed to write to file: " + rv_file_path + ", error: " + std::strerror(errno);
ENGINE_LOG_ERROR << err_msg;
throw Exception(SERVER_WRITE_ERROR, err_msg);
}
if (::close(rv_fd) == -1) {
std::string err_msg = "Failed to close file: " + rv_file_path + ", error: " + std::strerror(errno);
ENGINE_LOG_ERROR << err_msg;
throw Exception(SERVER_WRITE_ERROR, err_msg);
}
fs_ptr->writer_ptr_->write(&rv_num_bytes, sizeof(size_t));
fs_ptr->writer_ptr_->write((void*)vectors->GetData().data(), rv_num_bytes);
fs_ptr->writer_ptr_->close();
rc.RecordSection("write rv done");
int uid_fd = open(uid_file_path.c_str(), O_WRONLY | O_TRUNC | O_CREAT, 00664);
if (uid_fd == -1) {
if (!fs_ptr->writer_ptr_->open(uid_file_path.c_str())) {
std::string err_msg = "Failed to open file: " + uid_file_path + ", error: " + std::strerror(errno);
ENGINE_LOG_ERROR << err_msg;
throw Exception(SERVER_CANNOT_CREATE_FILE, err_msg);
}
size_t uid_num_bytes = vectors->GetUids().size() * sizeof(segment::doc_id_t);
if (::write(uid_fd, &uid_num_bytes, sizeof(size_t)) == -1) {
std::string err_msg = "Failed to write to file" + rv_file_path + ", error: " + std::strerror(errno);
ENGINE_LOG_ERROR << err_msg;
throw Exception(SERVER_WRITE_ERROR, err_msg);
}
if (::write(uid_fd, vectors->GetUids().data(), uid_num_bytes) == -1) {
std::string err_msg = "Failed to write to file" + uid_file_path + ", error: " + std::strerror(errno);
ENGINE_LOG_ERROR << err_msg;
throw Exception(SERVER_WRITE_ERROR, err_msg);
}
if (::close(uid_fd) == -1) {
std::string err_msg = "Failed to close file: " + uid_file_path + ", error: " + std::strerror(errno);
ENGINE_LOG_ERROR << err_msg;
throw Exception(SERVER_WRITE_ERROR, err_msg);
}
fs_ptr->writer_ptr_->write(&uid_num_bytes, sizeof(size_t));
fs_ptr->writer_ptr_->write((void*)vectors->GetUids().data(), uid_num_bytes);
fs_ptr->writer_ptr_->close();
rc.RecordSection("write uids done");
}
......@@ -215,7 +159,7 @@ DefaultVectorsFormat::read_uids(const storage::FSHandlerPtr& fs_ptr, std::vector
for (; it != it_end; ++it) {
const auto& path = it->path();
if (path.extension().string() == user_id_extension_) {
read_uids_internal(path.string(), uids);
read_uids_internal(fs_ptr, path.string(), uids);
}
}
}
......@@ -240,7 +184,7 @@ DefaultVectorsFormat::read_vectors(const storage::FSHandlerPtr& fs_ptr, off_t of
for (; it != it_end; ++it) {
const auto& path = it->path();
if (path.extension().string() == raw_vector_extension_) {
read_vectors_internal(path.string(), offset, num_bytes, raw_vectors);
read_vectors_internal(fs_ptr, path.string(), offset, num_bytes, raw_vectors);
}
}
}
......
......@@ -55,10 +55,12 @@ class DefaultVectorsFormat : public VectorsFormat {
private:
void
read_vectors_internal(const std::string&, off_t, size_t, std::vector<uint8_t>&);
read_vectors_internal(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path, off_t offset, size_t num,
std::vector<uint8_t>& raw_vectors);
void
read_uids_internal(const std::string&, std::vector<segment::doc_id_t>&);
read_uids_internal(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path,
std::vector<segment::doc_id_t>& uids);
private:
std::mutex mutex_;
......
......@@ -63,6 +63,7 @@ constexpr ErrorCode SERVER_CANNOT_CREATE_FILE = ToServerErrorCode(9);
constexpr ErrorCode SERVER_CANNOT_DELETE_FOLDER = ToServerErrorCode(10);
constexpr ErrorCode SERVER_CANNOT_DELETE_FILE = ToServerErrorCode(11);
constexpr ErrorCode SERVER_BUILD_INDEX_ERROR = ToServerErrorCode(12);
constexpr ErrorCode SERVER_CANNOT_OPEN_FILE = ToServerErrorCode(13);
constexpr ErrorCode SERVER_COLLECTION_NOT_EXIST = ToServerErrorCode(100);
constexpr ErrorCode SERVER_INVALID_COLLECTION_NAME = ToServerErrorCode(101);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册