提交 c3f140ed 编写于 作者: C chen qingxiang 提交者: Wang Xiangyu

solve conflicts

Signed-off-by: NWang Xiangyu <xy.wang@zilliz.com>
上级 7821ead8
......@@ -21,10 +21,10 @@
#include <algorithm>
#include <boost/filesystem.hpp>
#include <memory>
#include <unordered_map>
#include <utility>
#include "codecs/ExtraFileInfo.h"
#include "db/Utils.h"
#include "storage/ExtraFileInfo.h"
#include "utils/Exception.h"
#include "utils/Log.h"
......@@ -33,18 +33,18 @@ namespace codec {
Status
BlockFormat::Read(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path, engine::BinaryDataPtr& raw) {
CHECK_MAGIC_VALID(fs_ptr, file_path);
CHECK_SUM_VALID(fs_ptr, file_path);
if (!fs_ptr->reader_ptr_->Open(file_path)) {
return Status(SERVER_CANNOT_OPEN_FILE, "Fail to open file: " + file_path);
}
CHECK_MAGIC_VALID(fs_ptr);
CHECK_SUM_VALID(fs_ptr);
fs_ptr->reader_ptr_->Seekg(MAGIC_SIZE + HEADER_SIZE);
size_t num_bytes;
fs_ptr->reader_ptr_->Read(&num_bytes, sizeof(size_t));
HeaderMap map = ReadHeaderValues(fs_ptr);
size_t num_bytes = stol(map.at("size"));
raw = std::make_shared<engine::BinaryData>();
raw->data_.resize(num_bytes);
fs_ptr->reader_ptr_->Seekg(MAGIC_SIZE + HEADER_SIZE);
fs_ptr->reader_ptr_->Read(raw->data_.data(), num_bytes);
fs_ptr->reader_ptr_->Close();
......@@ -54,8 +54,6 @@ BlockFormat::Read(const storage::FSHandlerPtr& fs_ptr, const std::string& file_p
Status
BlockFormat::Read(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path, int64_t offset, int64_t num_bytes,
engine::BinaryDataPtr& raw) {
CHECK_MAGIC_VALID(fs_ptr, file_path);
CHECK_SUM_VALID(fs_ptr, file_path);
if (offset < 0 || num_bytes <= 0) {
return Status(SERVER_INVALID_ARGUMENT, "Invalid input to read: " + file_path);
}
......@@ -63,20 +61,20 @@ BlockFormat::Read(const storage::FSHandlerPtr& fs_ptr, const std::string& file_p
if (!fs_ptr->reader_ptr_->Open(file_path)) {
return Status(SERVER_CANNOT_OPEN_FILE, "Fail to open file: " + file_path);
}
CHECK_MAGIC_VALID(fs_ptr);
CHECK_SUM_VALID(fs_ptr);
fs_ptr->reader_ptr_->Seekg(MAGIC_SIZE + HEADER_SIZE);
size_t total_num_bytes;
fs_ptr->reader_ptr_->Read(&total_num_bytes, sizeof(size_t));
HeaderMap map = ReadHeaderValues(fs_ptr);
size_t total_num_bytes = stol(map.at("size"));
offset += MAGIC_SIZE + HEADER_SIZE + sizeof(size_t); // Beginning of file is num_bytes
if (offset + num_bytes > total_num_bytes) {
return Status(SERVER_INVALID_ARGUMENT, "Invalid argument to read: " + file_path);
}
raw = std::make_shared<engine::BinaryData>();
raw->data_.resize(num_bytes);
fs_ptr->reader_ptr_->Seekg(offset);
fs_ptr->reader_ptr_->Seekg(offset + MAGIC_SIZE + HEADER_SIZE);
fs_ptr->reader_ptr_->Read(raw->data_.data(), num_bytes);
fs_ptr->reader_ptr_->Close();
......@@ -86,8 +84,6 @@ BlockFormat::Read(const storage::FSHandlerPtr& fs_ptr, const std::string& file_p
Status
BlockFormat::Read(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path, const ReadRanges& read_ranges,
engine::BinaryDataPtr& raw) {
CHECK_MAGIC_VALID(fs_ptr, file_path);
CHECK_SUM_VALID(fs_ptr, file_path);
if (read_ranges.empty()) {
return Status::OK();
}
......@@ -95,11 +91,13 @@ BlockFormat::Read(const storage::FSHandlerPtr& fs_ptr, const std::string& file_p
if (!fs_ptr->reader_ptr_->Open(file_path)) {
return Status(SERVER_CANNOT_OPEN_FILE, "Fail to open file: " + file_path);
}
CHECK_MAGIC_VALID(fs_ptr);
CHECK_SUM_VALID(fs_ptr);
fs_ptr->reader_ptr_->Seekg(MAGIC_SIZE + HEADER_SIZE);
size_t total_num_bytes;
fs_ptr->reader_ptr_->Read(&total_num_bytes, sizeof(size_t));
HeaderMap map = ReadHeaderValues(fs_ptr);
size_t total_num_bytes = stol(map.at("size"));
fs_ptr->reader_ptr_->Seekg(MAGIC_SIZE + HEADER_SIZE);
int64_t total_bytes = 0;
for (auto& range : read_ranges) {
if (range.offset_ > total_num_bytes) {
......@@ -112,7 +110,7 @@ BlockFormat::Read(const storage::FSHandlerPtr& fs_ptr, const std::string& file_p
raw->data_.resize(total_bytes);
int64_t poz = 0;
for (auto& range : read_ranges) {
int64_t offset = MAGIC_SIZE + HEADER_SIZE + sizeof(size_t) + range.offset_;
int64_t offset = MAGIC_SIZE + HEADER_SIZE + range.offset_;
fs_ptr->reader_ptr_->Seekg(offset);
fs_ptr->reader_ptr_->Read(raw->data_.data() + poz, range.num_bytes_);
poz += range.num_bytes_;
......@@ -128,24 +126,27 @@ BlockFormat::Write(const storage::FSHandlerPtr& fs_ptr, const std::string& file_
if (raw == nullptr) {
return Status::OK();
}
// TODO: add extra info
std::unordered_map<std::string, std::string> maps;
WRITE_MAGIC(fs_ptr, file_path);
WRITE_HEADER(fs_ptr, file_path, maps);
if (!fs_ptr->writer_ptr_->InOpen(file_path)) {
if (!fs_ptr->writer_ptr_->Open(file_path)) {
return Status(SERVER_CANNOT_CREATE_FILE, "Fail to open file: " + file_path);
}
try {
fs_ptr->writer_ptr_->Seekp(MAGIC_SIZE + HEADER_SIZE);
// TODO: add extra info
WRITE_MAGIC(fs_ptr);
size_t num_bytes = raw->data_.size();
fs_ptr->writer_ptr_->Write(&num_bytes, sizeof(size_t));
HeaderMap maps;
maps.insert(std::make_pair("size", std::to_string(num_bytes)));
std::string header = HeaderWrapper(maps);
WRITE_HEADER(fs_ptr, header);
fs_ptr->writer_ptr_->Write(raw->data_.data(), num_bytes);
fs_ptr->writer_ptr_->Close();
WRITE_SUM(fs_ptr, file_path);
WRITE_SUM(fs_ptr, header, reinterpret_cast<char*>(raw->data_.data()), num_bytes);
fs_ptr->writer_ptr_->Close();
} catch (std::exception& ex) {
std::string err_msg = "Failed to write block data: " + std::string(ex.what());
LOG_ENGINE_ERROR_ << err_msg;
......
......@@ -10,7 +10,40 @@
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
# or implied. See the License for the specific language governing permissions and limitations under the License.
#-------------------------------------------------------------------------------
aux_source_directory( ${MILVUS_ENGINE_SRC}/codecs CODECS_FILES )
set(CODECS_FILES
BlockFormat.cpp
Codec.cpp
DeletedDocsFormat.cpp
ExtraFileInfo.cpp
IdBloomFilterFormat.cpp
StructuredIndexFormat.cpp
VectorCompressFormat.cpp
VectorIndexFormat.cpp
)
add_library( codecs STATIC )
target_sources( codecs PRIVATE ${CODECS_FILES} )
target_link_libraries( codecs PRIVATE fiu )
set(LINK_LIBRARY
log
storage
crc32c
libstdc++fs.a
fiu )
target_link_libraries( codecs PUBLIC ${LINK_LIBRARY} )
if ( BUILD_UNIT_TEST )
add_executable( ExtraFileInfoTest )
target_sources( ExtraFileInfoTest PRIVATE ExtraFileInfoTest.cpp)
target_link_libraries( ExtraFileInfoTest
PRIVATE
codecs
gtest
gtest_main
gmock
gmock_main
)
add_test ( NAME ExtraFileInfoTest
COMMAND $<TARGET_FILE:ExtraFileInfoTest>
)
endif()
......@@ -18,16 +18,15 @@
#include "codecs/DeletedDocsFormat.h"
#include <unistd.h>
#include <experimental/filesystem>
#include <utility>
#include <memory>
#include <string>
#include <unordered_map>
#include <vector>
#include "codecs/ExtraFileInfo.h"
#include "db/Utils.h"
#include "storage/ExtraFileInfo.h"
#include "utils/Exception.h"
#include "utils/Log.h"
......@@ -47,20 +46,20 @@ DeletedDocsFormat::Read(const storage::FSHandlerPtr& fs_ptr, const std::string&
segment::DeletedDocsPtr& deleted_docs) {
const std::string full_file_path = file_path + DELETED_DOCS_POSTFIX;
CHECK_MAGIC_VALID(fs_ptr, full_file_path);
CHECK_SUM_VALID(fs_ptr, full_file_path);
if (!fs_ptr->reader_ptr_->Open(full_file_path)) {
return Status(SERVER_CANNOT_OPEN_FILE, "Fail to open deleted docs file: " + full_file_path);
}
CHECK_MAGIC_VALID(fs_ptr);
CHECK_SUM_VALID(fs_ptr);
fs_ptr->reader_ptr_->Seekg(MAGIC_SIZE + HEADER_SIZE);
size_t num_bytes;
fs_ptr->reader_ptr_->Read(&num_bytes, sizeof(size_t));
HeaderMap map = ReadHeaderValues(fs_ptr);
size_t num_bytes = stol(map.at("size"));
auto deleted_docs_size = num_bytes / sizeof(engine::offset_t);
std::vector<engine::offset_t> deleted_docs_list;
deleted_docs_list.resize(deleted_docs_size);
fs_ptr->reader_ptr_->Seekg(MAGIC_SIZE + HEADER_SIZE);
fs_ptr->reader_ptr_->Read(deleted_docs_list.data(), num_bytes);
fs_ptr->reader_ptr_->Close();
......@@ -74,49 +73,26 @@ DeletedDocsFormat::Write(const storage::FSHandlerPtr& fs_ptr, const std::string&
const segment::DeletedDocsPtr& deleted_docs) {
const std::string full_file_path = file_path + DELETED_DOCS_POSTFIX;
// Create a temporary file from the existing file
const std::string temp_path = file_path + ".temp_del";
bool exists = std::experimental::filesystem::exists(full_file_path);
if (exists) {
std::experimental::filesystem::copy_file(full_file_path, temp_path,
std::experimental::filesystem::copy_options::none);
}
auto deleted_docs_list = deleted_docs->GetDeletedDocs();
size_t num_bytes = sizeof(engine::offset_t) * deleted_docs->GetCount();
// Write to the temp file, in order to avoid possible race condition with search (concurrent read and write)
size_t old_num_bytes;
std::vector<engine::offset_t> delete_ids;
if (exists) {
if (!fs_ptr->reader_ptr_->Open(temp_path)) {
return Status(SERVER_CANNOT_OPEN_FILE, "Fail to open tmp deleted docs file: " + temp_path);
}
fs_ptr->reader_ptr_->Read(&old_num_bytes, sizeof(size_t));
delete_ids.resize(old_num_bytes / sizeof(engine::offset_t));
fs_ptr->reader_ptr_->Read(delete_ids.data(), old_num_bytes);
fs_ptr->reader_ptr_->Close();
} else {
old_num_bytes = 0;
if (!fs_ptr->writer_ptr_->Open(full_file_path)) {
return Status(SERVER_CANNOT_CREATE_FILE, "Fail to write file: " + full_file_path);
}
try {
// TODO: add extra info
WRITE_MAGIC(fs_ptr);
HeaderMap maps;
maps.insert(std::make_pair("size", std::to_string(num_bytes)));
std::string header = HeaderWrapper(maps);
WRITE_HEADER(fs_ptr, header);
auto deleted_docs_list = deleted_docs->GetDeletedDocs();
size_t new_num_bytes = old_num_bytes + sizeof(engine::offset_t) * deleted_docs->GetCount();
if (!deleted_docs_list.empty()) {
delete_ids.insert(delete_ids.end(), deleted_docs_list.begin(), deleted_docs_list.end());
}
// TODO: add extra info
std::unordered_map<std::string, std::string> maps;
WRITE_MAGIC(fs_ptr, temp_path)
WRITE_HEADER(fs_ptr, temp_path, maps);
fs_ptr->writer_ptr_->Write(deleted_docs_list.data(), num_bytes);
if (!fs_ptr->writer_ptr_->InOpen(temp_path)) {
return Status(SERVER_CANNOT_CREATE_FILE, "Fail to write file: " + temp_path);
}
WRITE_SUM(fs_ptr, header, reinterpret_cast<char*>(deleted_docs_list.data()), num_bytes);
try {
fs_ptr->writer_ptr_->Seekp(MAGIC_SIZE + HEADER_SIZE);
fs_ptr->writer_ptr_->Write(&new_num_bytes, sizeof(size_t));
fs_ptr->writer_ptr_->Write(delete_ids.data(), new_num_bytes);
fs_ptr->writer_ptr_->Close();
WRITE_SUM(fs_ptr, temp_path);
// WRITE_SUM(fs_ptr, full_file_path);
} catch (std::exception& ex) {
std::string err_msg = "Failed to write delete doc: " + std::string(ex.what());
LOG_ENGINE_ERROR_ << err_msg;
......@@ -125,30 +101,20 @@ DeletedDocsFormat::Write(const storage::FSHandlerPtr& fs_ptr, const std::string&
return Status(SERVER_WRITE_ERROR, err_msg);
}
// Move temp file to delete file
try {
std::experimental::filesystem::rename(temp_path, full_file_path);
} catch (std::exception& ex) {
std::string msg = "Failed to rename file [" + temp_path + "] to [" + full_file_path + "]";
LOG_SERVER_ERROR_ << msg;
return Status(SERVER_UNEXPECTED_ERROR, msg);
}
return Status::OK();
}
Status
DeletedDocsFormat::ReadSize(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path, size_t& size) {
const std::string full_file_path = file_path + DELETED_DOCS_POSTFIX;
CHECK_MAGIC_VALID(fs_ptr, full_file_path);
CHECK_SUM_VALID(fs_ptr, full_file_path);
if (!fs_ptr->writer_ptr_->Open(full_file_path)) {
return Status(SERVER_CANNOT_CREATE_FILE, "Fail to open deleted docs file: " + full_file_path);
}
CHECK_MAGIC_VALID(fs_ptr);
CHECK_SUM_VALID(fs_ptr);
fs_ptr->reader_ptr_->Seekg(MAGIC_SIZE + HEADER_SIZE);
size_t num_bytes;
fs_ptr->reader_ptr_->Read(&num_bytes, sizeof(size_t));
HeaderMap map = ReadHeaderValues(fs_ptr);
size_t num_bytes = stol(map.at("size"));
size = num_bytes / sizeof(engine::offset_t);
fs_ptr->reader_ptr_->Close();
......
......@@ -9,18 +9,17 @@
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
#include <iostream>
#include <regex>
#include <utility>
#include <vector>
#include "codecs/ExtraFileInfo.h"
#include "crc32c/crc32c.h"
#include "storage/ExtraFileInfo.h"
const char* MAGIC = "Milvus";
const int64_t MAGIC_SIZE = 6;
const int64_t HEADER_SIZE = 4090;
const int64_t SUM_SIZE = 16;
const int64_t SUM_SIZE = sizeof(uint32_t);
bool
validate(std::string s) {
......@@ -29,145 +28,108 @@ validate(std::string s) {
}
namespace milvus {
namespace storage {
namespace codec {
bool
CheckMagic(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path) {
if (!fs_ptr->reader_ptr_->Open(file_path.c_str())) {
std::string err_msg = "Failed to open file: " + file_path + ", error: " + std::strerror(errno);
LOG_ENGINE_ERROR_ << err_msg;
throw Exception(SERVER_WRITE_ERROR, err_msg);
}
char* ch = static_cast<char*>(malloc(MAGIC_SIZE));
fs_ptr->reader_ptr_->Read(ch, MAGIC_SIZE);
bool result = !strcmp(ch, MAGIC);
fs_ptr->reader_ptr_->Close();
free(ch);
return result;
void
WriteMagic(const storage::FSHandlerPtr& fs_ptr) {
fs_ptr->writer_ptr_->Write(MAGIC, MAGIC_SIZE);
}
void
WriteMagic(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path) {
if (!fs_ptr->writer_ptr_->Open(file_path.c_str())) {
std::string err_msg = "Failed to open file: " + file_path + ", error: " + std::strerror(errno);
LOG_ENGINE_ERROR_ << err_msg;
throw Exception(SERVER_WRITE_ERROR, err_msg);
}
fs_ptr->writer_ptr_->Write(const_cast<char*>(MAGIC), MAGIC_SIZE);
fs_ptr->writer_ptr_->Close();
bool
CheckMagic(const storage::FSHandlerPtr& fs_ptr) {
std::vector<char> magic;
magic.resize(MAGIC_SIZE);
fs_ptr->reader_ptr_->Read(magic.data(), MAGIC_SIZE);
return !strcmp(magic.data(), MAGIC);
}
std::unordered_map<std::string, std::string>
ReadHeaderValues(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path) {
if (!fs_ptr->reader_ptr_->Open(file_path.c_str())) {
std::string err_msg = "Failed to open file: " + file_path + ", error: " + std::strerror(errno);
LOG_ENGINE_ERROR_ << err_msg;
throw Exception(SERVER_WRITE_ERROR, err_msg);
}
ReadHeaderValues(const storage::FSHandlerPtr& fs_ptr) {
fs_ptr->reader_ptr_->Seekg(MAGIC_SIZE);
char* ch = static_cast<char*>(malloc(HEADER_SIZE));
fs_ptr->reader_ptr_->Read(ch, HEADER_SIZE);
std::string data(ch);
std::vector<char> data;
data.resize(HEADER_SIZE);
fs_ptr->reader_ptr_->Read(data.data(), HEADER_SIZE);
std::string header(data.begin(), data.end());
auto result = std::unordered_map<std::string, std::string>();
std::regex semicolon(";");
std::vector<std::string> maps(std::sregex_token_iterator(data.begin(), data.end(), semicolon, -1),
std::vector<std::string> maps(std::sregex_token_iterator(header.begin(), header.end(), semicolon, -1),
std::sregex_token_iterator());
std::regex equal("=");
for (auto& item : maps) {
std::vector<std::string> pair(std::sregex_token_iterator(item.begin(), item.end(), equal, -1),
std::sregex_token_iterator());
result.insert(std::make_pair(pair[0], pair[1]));
if (pair.size() == 2) {
result.insert(std::make_pair(pair[0], pair[1]));
}
}
fs_ptr->reader_ptr_->Close();
free(ch);
return result;
}
std::string
ReadHeaderValue(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path, const std::string& key) {
auto kv = ReadHeaderValues(fs_ptr, file_path);
ReadHeaderValue(const storage::FSHandlerPtr& fs_ptr, const std::string& key) {
auto kv = ReadHeaderValues(fs_ptr);
return kv.at(key);
}
std::uint32_t
CalculateSum(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path, bool written) {
if (!fs_ptr->reader_ptr_->Open(file_path.c_str())) {
std::string err_msg = "Failed to open file: " + file_path + ", error: " + std::strerror(errno);
LOG_ENGINE_ERROR_ << err_msg;
throw Exception(SERVER_WRITE_ERROR, err_msg);
}
int size = fs_ptr->reader_ptr_->Length();
CalculateSum(const storage::FSHandlerPtr& fs_ptr, bool written) {
auto size = fs_ptr->reader_ptr_->Length();
if (written) {
size -= SUM_SIZE;
}
char* ch = static_cast<char*>(malloc(size));
fs_ptr->reader_ptr_->Read(ch, size);
std::uint32_t result = crc32c::Crc32c(ch, size);
fs_ptr->reader_ptr_->Close();
free(ch);
fs_ptr->reader_ptr_->Seekg(0);
std::vector<char> data;
data.resize(size);
fs_ptr->reader_ptr_->Read(data.data(), size);
std::uint32_t result = crc32c::Crc32c(data.data(), size);
return result;
}
void
WriteSum(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path, uint32_t result, bool written) {
if (!fs_ptr->writer_ptr_->InOpen(file_path.c_str())) {
std::string err_msg = "Failed to open file: " + file_path + ", error: " + std::strerror(errno);
LOG_ENGINE_ERROR_ << err_msg;
throw Exception(SERVER_WRITE_ERROR, err_msg);
}
if (written) {
fs_ptr->writer_ptr_->Seekp(-SUM_SIZE, std::ios_base::end);
} else {
fs_ptr->writer_ptr_->Seekp(0, std::ios_base::end);
}
std::uint32_t
CalculateSum(char* data, size_t size) {
std::uint32_t result = crc32c::Crc32c(data, size);
return result;
}
fs_ptr->writer_ptr_->Write(&result, SUM_SIZE);
fs_ptr->writer_ptr_->Close();
void
WriteSum(const storage::FSHandlerPtr& fs_ptr, std::string header, char* data, size_t data_size) {
std::vector<char> total;
total.resize(MAGIC_SIZE + HEADER_SIZE + data_size);
memcpy(total.data(), MAGIC, MAGIC_SIZE);
memcpy(total.data() + MAGIC_SIZE, header.data(), HEADER_SIZE);
memcpy(total.data() + MAGIC_SIZE + HEADER_SIZE, data, data_size);
auto result_sum = CalculateSum(total.data(), MAGIC_SIZE + HEADER_SIZE + data_size);
fs_ptr->writer_ptr_->Write(&result_sum, SUM_SIZE);
}
bool
CheckSum(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path) {
uint32_t result = CalculateSum(fs_ptr, file_path, true);
if (!fs_ptr->reader_ptr_->Open(file_path.c_str())) {
std::string err_msg = "Failed to open file: " + file_path + ", error: " + std::strerror(errno);
LOG_ENGINE_ERROR_ << err_msg;
throw Exception(SERVER_WRITE_ERROR, err_msg);
}
fs_ptr->reader_ptr_->Seekg(-SUM_SIZE, std::ios_base::end);
CheckSum(const storage::FSHandlerPtr& fs_ptr) {
auto length = fs_ptr->reader_ptr_->Length();
fs_ptr->reader_ptr_->Seekg(length - SUM_SIZE);
uint32_t record;
fs_ptr->reader_ptr_->Read(&record, SUM_SIZE);
fs_ptr->reader_ptr_->Close();
uint32_t result = CalculateSum(fs_ptr, true);
return record == result;
}
bool
WriteHeaderValue(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path, const std::string& key,
const std::string& value) {
auto record = ReadHeaderValues(fs_ptr, file_path);
record.insert(std::make_pair(key, value));
WriteHeaderValues(fs_ptr, file_path, record);
WriteHeaderValues(const storage::FSHandlerPtr& fs_ptr, const std::string& kv) {
fs_ptr->writer_ptr_->Write(kv.data(), HEADER_SIZE);
return true;
}
bool
WriteHeaderValues(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path,
const std::unordered_map<std::string, std::string>& maps) {
if (!fs_ptr->writer_ptr_->InOpen(file_path.c_str())) {
std::string err_msg = "Failed to open file: " + file_path + ", error: " + std::strerror(errno);
LOG_ENGINE_ERROR_ << err_msg;
throw Exception(SERVER_WRITE_ERROR, err_msg);
}
fs_ptr->writer_ptr_->Seekp(MAGIC_SIZE);
std::string
HeaderWrapper(const std::unordered_map<std::string, std::string>& maps) {
std::string kv;
for (auto& map : maps) {
if (validate(map.first) && validate(map.second)) {
......@@ -179,12 +141,11 @@ WriteHeaderValues(const storage::FSHandlerPtr& fs_ptr, const std::string& file_p
if (kv.size() > HEADER_SIZE) {
throw "Exceeded the limit of header data size";
}
kv.resize(HEADER_SIZE, ' ');
fs_ptr->writer_ptr_->Write(kv.data(), HEADER_SIZE);
fs_ptr->writer_ptr_->Close();
return true;
return kv;
}
} // namespace storage
} // namespace codec
} // namespace milvus
......@@ -22,11 +22,9 @@
#include <string>
#include <unordered_map>
#include <src/log/Log.h>
#include <src/utils/Error.h>
#include <src/utils/Exception.h>
#include "storage/FSHandler.h"
#include "utils/Error.h"
#include "utils/Exception.h"
extern const char* MAGIC;
extern const int64_t MAGIC_SIZE;
......@@ -34,67 +32,73 @@ extern const int64_t HEADER_SIZE;
extern const int64_t SUM_SIZE;
namespace milvus {
namespace storage {
namespace codec {
#define CHECK_MAGIC_VALID(PTR, FILE_PATH) \
if (!CheckMagic(PTR, FILE_PATH)) { \
throw Exception(SERVER_FILE_MAGIC_BYTES_ERROR, "wrong magic bytes"); \
#define CHECK_MAGIC_VALID(PTR) \
if (!CheckMagic(PTR)) { \
LOG_ENGINE_DEBUG_ << "Wrong Magic bytes"; \
throw Exception(SERVER_FILE_MAGIC_BYTES_ERROR, "Wrong magic bytes"); \
}
#define CHECK_SUM_VALID(PTR, FILE_PATH) \
if (!CheckSum(PTR, FILE_PATH)) { \
throw Exception(SERVER_FILE_SUM_BYTES_ERROR, "wrong sum bytes,file may be changed"); \
#define CHECK_SUM_VALID(PTR) \
if (!CheckSum(PTR)) { \
LOG_ENGINE_DEBUG_ << "Wrong sum bytes, file has been changed"; \
throw Exception(SERVER_FILE_SUM_BYTES_ERROR, "Wrong sum bytes, file has been changed"); \
}
#define WRITE_MAGIC(PTR, FILE_PATH) \
try { \
WriteMagic(PTR, FILE_PATH); \
} catch (...) { \
throw "write magic failed"; \
#define WRITE_MAGIC(PTR) \
try { \
WriteMagic(PTR); \
} catch (...) { \
LOG_ENGINE_DEBUG_ << "Write Magic failed"; \
throw "Write Magic failed"; \
}
#define WRITE_HEADER(PTR, FILE_PATH, KV) \
try { \
WriteHeaderValues(PTR, FILE_PATH, KV); \
} catch (...) { \
throw "write sum failed"; \
#define WRITE_HEADER(PTR, KV) \
try { \
WriteHeaderValues(PTR, KV); \
} catch (...) { \
LOG_ENGINE_DEBUG_ << "Write header failed"; \
throw "Write header failed"; \
}
#define WRITE_SUM(PTR, FILE_PATH) \
try { \
int result = CalculateSum(PTR, FILE_PATH); \
WriteSum(PTR, FILE_PATH, result); \
} catch (...) { \
throw "write sum failed"; \
#define WRITE_SUM(PTR, HEADER, NUM_BYTES, DATA) \
try { \
WriteSum(PTR, HEADER, NUM_BYTES, DATA); \
} catch (...) { \
LOG_ENGINE_DEBUG_ << "Write sum failed"; \
throw "Write sum failed"; \
}
void
WriteMagic(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path);
WriteMagic(const storage::FSHandlerPtr& fs_ptr);
bool
CheckMagic(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path);
CheckMagic(const storage::FSHandlerPtr& fs_ptr);
bool
CheckSum(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path);
CheckSum(const storage::FSHandlerPtr& fs_ptr);
void
WriteSum(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path, uint32_t result, bool written = false);
WriteSum(const storage::FSHandlerPtr& fs_ptr, std::string header, char* data, size_t data_size);
std::uint32_t
CalculateSum(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path, bool written = false);
CalculateSum(const storage::FSHandlerPtr& fs_ptr, bool written = false);
std::uint32_t
CalculateSum(char* data, size_t size);
std::string
ReadHeaderValue(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path, const std::string& key);
ReadHeaderValue(const storage::FSHandlerPtr& fs_ptr, const std::string& key);
std::unordered_map<std::string, std::string>
ReadHeaderValues(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path);
ReadHeaderValues(const storage::FSHandlerPtr& fs_ptr);
bool
WriteHeaderValue(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path, const std::string& key,
const std::string& value);
WriteHeaderValues(const storage::FSHandlerPtr& fs_ptr, const std::string& kv);
bool
WriteHeaderValues(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path,
const std::unordered_map<std::string, std::string>& maps);
std::string
HeaderWrapper(const std::unordered_map<std::string, std::string>& maps);
} // namespace storage
using HeaderMap = std::unordered_map<std::string, std::string>;
} // namespace codec
} // namespace milvus
......@@ -12,11 +12,11 @@
#include <cstring>
#include <unordered_map>
#include "easyloggingpp/easylogging++.h"
#include "codecs/ExtraFileInfo.h"
#include "crc32c/crc32c.h"
#include "gtest/gtest.h"
#include "utils/Log.h"
#include "ExtraFileInfo.h"
#include "crc32c/crc32c.h"
#include "storage/disk/DiskIOReader.h"
#include "storage/disk/DiskIOWriter.h"
#include "storage/disk/DiskOperation.h"
......@@ -24,7 +24,7 @@
INITIALIZE_EASYLOGGINGPP
namespace milvus {
namespace storage {
namespace codec {
/* ExtraFileInfoTest */
class ExtraFileInfoTest : public testing::Test {
......@@ -43,32 +43,37 @@ TEST_F(ExtraFileInfoTest, WriteFileTest) {
auto record = std::unordered_map<std::string, std::string>();
record.insert(std::make_pair("test", "test"));
WriteMagic(fs_ptr, file_path);
WriteHeaderValues(fs_ptr, file_path, record);
if (!fs_ptr->writer_ptr_->InOpen(file_path.c_str())) {
record.insert(std::make_pair("github", "github"));
if (!fs_ptr->writer_ptr_->Open(file_path.c_str())) {
std::string err_msg = "Failed to open file: " + file_path + ", error: " + std::strerror(errno);
}
fs_ptr->writer_ptr_->Seekp(0, std::ios_base::end);
WRITE_MAGIC(fs_ptr);
size_t num_bytes = raw.size();
fs_ptr->writer_ptr_->Write(&num_bytes, sizeof(size_t));
record.insert(std::make_pair("size", std::to_string(num_bytes)));
std::string header = HeaderWrapper(record);
WriteHeaderValues(fs_ptr, header);
fs_ptr->writer_ptr_->Write(raw.data(), num_bytes);
WRITE_SUM(fs_ptr, header, raw.data(), num_bytes);
fs_ptr->writer_ptr_->Close();
int result_sum = CalculateSum(fs_ptr, file_path);
WriteSum(fs_ptr, file_path, result_sum);
if (!fs_ptr->reader_ptr_->Open(file_path.c_str())) {
std::string err_msg = "Failed to open file: " + file_path + ", error: " + std::strerror(errno);
}
ASSERT_TRUE(CheckMagic(fs_ptr));
std::unordered_map<std::string, std::string> headers = ReadHeaderValues(fs_ptr);
ASSERT_EQ(headers.at("test"), "test");
ASSERT_EQ(headers.at("github"), "github");
ASSERT_EQ(stol(headers.at("size")), num_bytes);
ASSERT_TRUE(CheckSum(fs_ptr, file_path));
ASSERT_EQ(ReadHeaderValue(fs_ptr, file_path, "test"), "test");
fs_ptr->reader_ptr_->Read(raw.data(), num_bytes);
ASSERT_TRUE(WriteHeaderValue(fs_ptr, file_path, "github", "github"));
ASSERT_EQ(ReadHeaderValue(fs_ptr, file_path, "github"), "github");
result_sum = CalculateSum(fs_ptr, file_path, true);
WriteSum(fs_ptr, file_path, result_sum, true);
ASSERT_TRUE(CheckMagic(fs_ptr, file_path));
ASSERT_TRUE(CheckSum(fs_ptr, file_path));
ASSERT_TRUE(CheckSum(fs_ptr));
}
} // namespace storage
} // namespace codec
} // namespace milvus
......@@ -21,13 +21,13 @@
#include <algorithm>
#include <boost/filesystem.hpp>
#include <memory>
#include <unordered_map>
#include <utility>
#include "db/Types.h"
#include "db/Utils.h"
#include "knowhere/index/structured_index/StructuredIndexSort.h"
#include "storage/ExtraFileInfo.h"
#include "codecs/ExtraFileInfo.h"
#include "utils/Exception.h"
#include "utils/Log.h"
#include "utils/TimeRecorder.h"
......@@ -86,22 +86,22 @@ StructuredIndexFormat::Read(const milvus::storage::FSHandlerPtr& fs_ptr, const s
knowhere::BinarySet load_data_list;
std::string full_file_path = file_path + STRUCTURED_INDEX_POSTFIX;
CHECK_MAGIC_VALID(fs_ptr, full_file_path);
CHECK_SUM_VALID(fs_ptr, full_file_path);
if (!fs_ptr->reader_ptr_->Open(full_file_path)) {
return Status(SERVER_CANNOT_OPEN_FILE, "Fail to open structured index: " + full_file_path);
}
CHECK_MAGIC_VALID(fs_ptr);
CHECK_SUM_VALID(fs_ptr);
int64_t length = fs_ptr->reader_ptr_->Length() - SUM_SIZE;
if (length <= 0) {
return Status(SERVER_UNEXPECTED_ERROR, "Invalid structured index length: " + full_file_path);
}
size_t rp = MAGIC_SIZE + HEADER_SIZE;
fs_ptr->reader_ptr_->Seekg(rp);
HeaderMap map = ReadHeaderValues(fs_ptr);
int32_t data_type = stol(map.at("type"));
int32_t data_type = 0;
fs_ptr->reader_ptr_->Read(&data_type, sizeof(data_type));
rp += sizeof(data_type) + MAGIC_SIZE + HEADER_SIZE;
size_t rp = MAGIC_SIZE + HEADER_SIZE;
fs_ptr->reader_ptr_->Seekg(rp);
LOG_ENGINE_DEBUG_ << "Start to read_index(" << full_file_path << ") length: " << length << " bytes";
......@@ -149,35 +149,41 @@ StructuredIndexFormat::Write(const milvus::storage::FSHandlerPtr& fs_ptr, const
milvus::TimeRecorder recorder("StructuredIndexFormat::Write");
std::string full_file_path = file_path + STRUCTURED_INDEX_POSTFIX;
// TODO: add extra info
std::unordered_map<std::string, std::string> maps;
WRITE_MAGIC(fs_ptr, full_file_path);
WRITE_HEADER(fs_ptr, full_file_path, maps);
auto binaryset = index->Serialize(knowhere::Config());
if (!fs_ptr->writer_ptr_->InOpen(full_file_path)) {
if (!fs_ptr->writer_ptr_->Open(full_file_path)) {
return Status(SERVER_CANNOT_OPEN_FILE, "Fail to open structured index: " + full_file_path);
}
try {
fs_ptr->writer_ptr_->Seekp(MAGIC_SIZE + HEADER_SIZE);
fs_ptr->writer_ptr_->Write(&data_type, sizeof(data_type));
WRITE_MAGIC(fs_ptr);
// TODO: add extra info
HeaderMap maps;
maps.insert(std::make_pair("type", std::to_string(static_cast<int32_t>(data_type))));
std::string header = HeaderWrapper(maps);
WRITE_HEADER(fs_ptr, header);
std::vector<char> data;
int64_t offset = 0;
for (auto& iter : binaryset.binary_map_) {
auto meta = iter.first.c_str();
size_t meta_length = iter.first.length();
fs_ptr->writer_ptr_->Write(&meta_length, sizeof(meta_length));
fs_ptr->writer_ptr_->Write(meta, meta_length);
data.resize(data.size() + sizeof(meta_length) + meta_length);
memcpy(data.data() + offset, &meta_length, sizeof(meta_length));
memcpy(data.data() + offset + sizeof(meta_length), meta, meta_length);
offset += sizeof(meta_length) + meta_length;
auto binary = iter.second;
int64_t binary_length = binary->size;
fs_ptr->writer_ptr_->Write(&binary_length, sizeof(binary_length));
fs_ptr->writer_ptr_->Write(binary->data.get(), binary_length);
data.resize(data.size() + sizeof(binary_length) + binary_length);
memcpy(data.data() + offset, &binary_length, sizeof(binary_length));
memcpy(data.data() + offset + sizeof(binary_length), binary->data.get(), binary_length);
offset += sizeof(binary_length) + binary_length;
}
fs_ptr->writer_ptr_->Write(data.data(), data.size());
WRITE_SUM(fs_ptr, header, reinterpret_cast<char*>(data.data()), data.size());
fs_ptr->writer_ptr_->Close();
WRITE_SUM(fs_ptr, full_file_path);
double span = recorder.RecordSection("End");
double rate = fs_ptr->writer_ptr_->Length() * 1000000.0 / span / 1024 / 1024;
......
......@@ -19,10 +19,10 @@
#include <memory>
#include <unordered_map>
#include "codecs/ExtraFileInfo.h"
#include "codecs/VectorCompressFormat.h"
#include "db/Utils.h"
#include "knowhere/common/BinarySet.h"
#include "storage/ExtraFileInfo.h"
#include "utils/Exception.h"
#include "utils/Log.h"
#include "utils/TimeRecorder.h"
......@@ -44,11 +44,11 @@ VectorCompressFormat::Read(const storage::FSHandlerPtr& fs_ptr, const std::strin
milvus::TimeRecorder recorder("VectorCompressFormat::Read");
const std::string full_file_path = file_path + VECTOR_COMPRESS_POSTFIX;
CHECK_MAGIC_VALID(fs_ptr, full_file_path);
CHECK_SUM_VALID(fs_ptr, full_file_path);
if (!fs_ptr->reader_ptr_->Open(full_file_path)) {
return Status(SERVER_CANNOT_OPEN_FILE, "Fail to open vector compress file: " + full_file_path);
}
CHECK_MAGIC_VALID(fs_ptr);
CHECK_SUM_VALID(fs_ptr);
int64_t length = fs_ptr->reader_ptr_->Length() - MAGIC_SIZE - HEADER_SIZE - SUM_SIZE;
if (length <= 0) {
......@@ -76,19 +76,23 @@ VectorCompressFormat::Write(const storage::FSHandlerPtr& fs_ptr, const std::stri
milvus::TimeRecorder recorder("VectorCompressFormat::Write");
const std::string full_file_path = file_path + VECTOR_COMPRESS_POSTFIX;
// TODO: add extra info
std::unordered_map<std::string, std::string> maps;
WRITE_MAGIC(fs_ptr, full_file_path);
WRITE_HEADER(fs_ptr, full_file_path, maps);
if (!fs_ptr->writer_ptr_->InOpen(full_file_path)) {
if (!fs_ptr->writer_ptr_->Open(full_file_path)) {
return Status(SERVER_CANNOT_OPEN_FILE, "Fail to open vector compress: " + full_file_path);
}
try {
fs_ptr->writer_ptr_->Seekp(MAGIC_SIZE + HEADER_SIZE);
// TODO: add extra info
WRITE_MAGIC(fs_ptr);
HeaderMap maps;
std::string header = HeaderWrapper(maps);
WRITE_HEADER(fs_ptr, header);
fs_ptr->writer_ptr_->Write(compress->data.get(), compress->size);
WRITE_SUM(fs_ptr, header, reinterpret_cast<char*>(compress->data.get()), compress->size);
fs_ptr->writer_ptr_->Close();
WRITE_SUM(fs_ptr, full_file_path);
double span = recorder.RecordSection("End");
double rate = compress->size * 1000000.0 / span / 1024 / 1024;
......
......@@ -17,15 +17,14 @@
#include <boost/filesystem.hpp>
#include <memory>
#include <unordered_map>
#include "codecs/Codec.h"
#include "codecs/ExtraFileInfo.h"
#include "codecs/VectorIndexFormat.h"
#include "db/Utils.h"
#include "knowhere/common/BinarySet.h"
#include "knowhere/index/vector_index/VecIndex.h"
#include "knowhere/index/vector_index/VecIndexFactory.h"
#include "storage/ExtraFileInfo.h"
#include "utils/Exception.h"
#include "utils/Log.h"
#include "utils/TimeRecorder.h"
......@@ -45,23 +44,21 @@ Status
VectorIndexFormat::ReadRaw(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path,
knowhere::BinaryPtr& data) {
milvus::TimeRecorder recorder("VectorIndexFormat::ReadRaw");
CHECK_MAGIC_VALID(fs_ptr, file_path);
CHECK_SUM_VALID(fs_ptr, file_path);
if (!fs_ptr->reader_ptr_->Open(file_path)) {
return Status(SERVER_CANNOT_OPEN_FILE, "Fail to open raw file: " + file_path);
}
CHECK_MAGIC_VALID(fs_ptr);
CHECK_SUM_VALID(fs_ptr);
fs_ptr->reader_ptr_->Seekg(MAGIC_SIZE + HEADER_SIZE);
size_t num_bytes;
fs_ptr->reader_ptr_->Read(&num_bytes, sizeof(size_t));
HeaderMap map = ReadHeaderValues(fs_ptr);
size_t num_bytes = stol(map.at("size"));
data = std::make_shared<knowhere::Binary>();
data->size = num_bytes;
data->data = std::shared_ptr<uint8_t[]>(new uint8_t[num_bytes]);
// Beginning of file is num_bytes
fs_ptr->reader_ptr_->Seekg(MAGIC_SIZE + HEADER_SIZE + sizeof(size_t));
fs_ptr->reader_ptr_->Seekg(MAGIC_SIZE + HEADER_SIZE);
fs_ptr->reader_ptr_->Read(data->data.get(), num_bytes);
fs_ptr->reader_ptr_->Close();
......@@ -78,11 +75,11 @@ VectorIndexFormat::ReadIndex(const storage::FSHandlerPtr& fs_ptr, const std::str
milvus::TimeRecorder recorder("VectorIndexFormat::ReadIndex");
std::string full_file_path = file_path + VECTOR_INDEX_POSTFIX;
CHECK_MAGIC_VALID(fs_ptr, full_file_path);
CHECK_SUM_VALID(fs_ptr, full_file_path);
if (!fs_ptr->reader_ptr_->Open(full_file_path)) {
return Status(SERVER_CANNOT_OPEN_FILE, "Fail to open vector index: " + full_file_path);
}
CHECK_MAGIC_VALID(fs_ptr);
CHECK_SUM_VALID(fs_ptr);
int64_t length = fs_ptr->reader_ptr_->Length() - SUM_SIZE;
if (length <= 0) {
......@@ -188,18 +185,23 @@ VectorIndexFormat::WriteIndex(const storage::FSHandlerPtr& fs_ptr, const std::st
milvus::TimeRecorder recorder("SVectorIndexFormat::WriteIndex");
std::string full_file_path = file_path + VECTOR_INDEX_POSTFIX;
// TODO: add extra info
std::unordered_map<std::string, std::string> maps;
WRITE_MAGIC(fs_ptr, full_file_path);
WRITE_HEADER(fs_ptr, full_file_path, maps);
auto binaryset = index->Serialize(knowhere::Config());
if (!fs_ptr->writer_ptr_->InOpen(full_file_path)) {
if (!fs_ptr->writer_ptr_->Open(full_file_path)) {
return Status(SERVER_CANNOT_OPEN_FILE, "Fail to open vector index: " + full_file_path);
}
try {
fs_ptr->writer_ptr_->Seekp(MAGIC_SIZE + HEADER_SIZE);
WRITE_MAGIC(fs_ptr);
// TODO: add extra info
HeaderMap maps;
std::string header = HeaderWrapper(maps);
WRITE_HEADER(fs_ptr, header);
std::vector<char> data;
int64_t offset = 0;
for (auto& iter : binaryset.binary_map_) {
if (iter.first == RAW_DATA || iter.first == QUANTIZATION_DATA) {
continue; // the two kinds of data will be written into another file
......@@ -207,17 +209,23 @@ VectorIndexFormat::WriteIndex(const storage::FSHandlerPtr& fs_ptr, const std::st
auto meta = iter.first.c_str();
size_t meta_length = iter.first.length();
fs_ptr->writer_ptr_->Write(&meta_length, sizeof(meta_length));
fs_ptr->writer_ptr_->Write(meta, meta_length);
data.resize(data.size() + sizeof(meta_length) + meta_length);
memcpy(data.data() + offset, &meta_length, sizeof(meta_length));
memcpy(data.data() + offset + sizeof(meta_length), meta, meta_length);
offset += sizeof(meta_length) + meta_length;
auto binary = iter.second;
int64_t binary_length = binary->size;
fs_ptr->writer_ptr_->Write(&binary_length, sizeof(binary_length));
fs_ptr->writer_ptr_->Write(binary->data.get(), binary_length);
data.resize(data.size() + sizeof(binary_length) + binary_length);
memcpy(data.data() + offset, &binary_length, sizeof(binary_length));
memcpy(data.data() + offset + sizeof(binary_length), binary->data.get(), binary_length);
offset += sizeof(binary_length) + binary_length;
}
fs_ptr->writer_ptr_->Close();
WRITE_SUM(fs_ptr, full_file_path);
fs_ptr->writer_ptr_->Write(data.data(), data.size());
WRITE_SUM(fs_ptr, header, reinterpret_cast<char*>(data.data()), data.size());
fs_ptr->writer_ptr_->Close();
double span = recorder.RecordSection("End");
double rate = fs_ptr->writer_ptr_->Length() * 1000000.0 / span / 1024 / 1024;
......
......@@ -178,6 +178,7 @@ TaskTable::PickToLoad(uint64_t limit) {
return std::vector<uint64_t>();
}
} else if (table_[index]->state == TaskTableItemState::START) {
cross = true;
auto task = table_[index]->task;
// if task is a build index task, limit it
......@@ -187,7 +188,6 @@ TaskTable::PickToLoad(uint64_t limit) {
continue;
}
}
cross = true;
indexes.push_back(index);
++pick_count;
} else {
......
......@@ -44,8 +44,6 @@ CmdReq::OnExecute() {
result_ = MILVUS_VERSION;
} else if (cmd_ == "status") {
result_ = "OK";
} else if (cmd_ == "tasktable") {
result_ = scheduler::ResMgrInst::GetInstance()->DumpTaskTables();
} else if (cmd_ == "mode") {
#ifdef MILVUS_GPU_VERSION
result_ = "GPU";
......
......@@ -10,58 +10,13 @@
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
# or implied. See the License for the specific language governing permissions and limitations under the License.
#-------------------------------------------------------------------------------
set( STORAGE_SRCS
ExtraFileInfo.cpp
ExtraFileInfo.h
FSHandler.h
IOReader.h
IOWriter.h
Operation.h
disk/DiskIOReader.h
disk/DiskIOWriter.h
disk/DiskOperation.h
disk/DiskIOReader.cpp
disk/DiskIOWriter.cpp
disk/DiskOperation.cpp
# s3/S3ClientMock.h
# s3/S3ClientWrapper.h
# s3/S3ClientWrapper.cpp
# s3/S3IOReader.h
# s3/S3IOReader.cpp
# s3/S3IOWriter.h
# s3/S3IOWriter.cpp
)
#aux_source_directory( ${MILVUS_ENGINE_SRC}/storage STORAGE_MAIN_FILES )
#aux_source_directory( ${MILVUS_ENGINE_SRC}/storage/disk STORAGE_DISK_FILES )
aux_source_directory( ${MILVUS_ENGINE_SRC}/storage STORAGE_MAIN_FILES )
aux_source_directory( ${MILVUS_ENGINE_SRC}/storage/disk STORAGE_DISK_FILES )
# aux_source_directory( ${MILVUS_ENGINE_SRC}/storage/s3 STORAGE_S3_FILES )
#set( STORAGE_FILES ${STORAGE_MAIN_FILES}
# ${STORAGE_DISK_FILES}
set( STORAGE_FILES ${STORAGE_MAIN_FILES}
${STORAGE_DISK_FILES}
# ${STORAGE_S3_FILES}
# )
)
add_library( storage STATIC )
target_sources( storage PRIVATE ${STORAGE_SRCS} )
add_dependencies( storage fiu )
target_link_libraries( storage
log
crc32c
fiu
libboost_filesystem.a
libstdc++fs.a
)
if ( BUILD_UNIT_TEST )
add_executable( ExtraFileInfoTest )
target_sources( ExtraFileInfoTest PRIVATE ExtraFileInfoTest.cpp)
target_link_libraries( ExtraFileInfoTest
storage
gtest
gtest_main
gmock
gmock_main )
add_test ( NAME ExtraFileInfoTest
COMMAND $<TARGET_FILE:ExtraFileInfoTest>
)
endif()
target_sources( storage PRIVATE ${STORAGE_FILES} )
target_link_libraries( storage PRIVATE fiu log )
......@@ -28,9 +28,6 @@ class IOReader {
virtual void
Seekg(int64_t pos) = 0;
virtual void
Seekg(int64_t pos, std::ios_base::seekdir seekdir) = 0;
virtual int64_t
Length() = 0;
......
......@@ -22,18 +22,9 @@ class IOWriter {
virtual bool
Open(const std::string& name) = 0;
virtual bool
InOpen(const std::string& name) = 0;
virtual void
Write(const void* ptr, int64_t size) = 0;
virtual void
Seekp(int64_t pos) = 0;
virtual void
Seekp(int64_t pos, std::ios_base::seekdir seekdir) = 0;
virtual int64_t
Length() = 0;
......
......@@ -30,10 +30,6 @@ void
DiskIOReader::Seekg(int64_t pos) {
fs_.seekg(pos);
}
void
DiskIOReader::Seekg(int64_t pos, std::ios_base::seekdir seekdir) {
fs_.seekg(pos, seekdir);
}
int64_t
DiskIOReader::Length() {
......
......@@ -42,9 +42,6 @@ class DiskIOReader : public IOReader {
void
Seekg(int64_t pos) override;
void
Seekg(int64_t pos, std::ios_base::seekdir seekdir) override;
int64_t
Length() override;
......
......@@ -21,13 +21,6 @@ DiskIOWriter::Open(const std::string& name) {
fs_ = std::fstream(name_, std::ios::out | std::ios::binary);
return fs_.good();
}
bool
DiskIOWriter::InOpen(const std::string& name) {
name_ = name;
len_ = 0;
fs_ = std::fstream(name_, std::ios::out | std::ios::binary | std::ios::in);
return fs_.good();
}
void
DiskIOWriter::Write(const void* ptr, int64_t size) {
......@@ -44,14 +37,6 @@ void
DiskIOWriter::Close() {
fs_.close();
}
void
DiskIOWriter::Seekp(int64_t pos) {
fs_.seekp(pos);
}
void
DiskIOWriter::Seekp(int64_t pos, std::ios_base::seekdir seekdir) {
fs_.seekp(pos, seekdir);
}
} // namespace storage
} // namespace milvus
......@@ -36,18 +36,9 @@ class DiskIOWriter : public IOWriter {
bool
Open(const std::string& name) override;
bool
InOpen(const std::string& name) override;
void
Write(const void* ptr, int64_t size) override;
void
Seekp(int64_t pos) override;
void
Seekp(int64_t pos, std::ios_base::seekdir seekdir) override;
int64_t
Length() override;
......
......@@ -10,6 +10,7 @@
// or implied. See the License for the specific language governing permissions and limitations under the License.
#include "utils/Status.h"
#include "memory"
#include <cstring>
......@@ -22,16 +23,14 @@ Status::Status(StatusCode code, const std::string& msg) {
// 4 bytes store message length
// the left bytes store message string
auto length = static_cast<uint32_t>(msg.size());
auto result = new char[length + sizeof(length) + CODE_WIDTH];
std::memcpy(result, &code, CODE_WIDTH);
std::memcpy(result + CODE_WIDTH, &length, sizeof(length));
memcpy(result + sizeof(length) + CODE_WIDTH, msg.data(), length);
state_ = result;
// auto result = new char[length + sizeof(length) + CODE_WIDTH];
state_.resize(length + sizeof(length) + CODE_WIDTH);
std::memcpy(state_.data(), &code, CODE_WIDTH);
std::memcpy(state_.data() + CODE_WIDTH, &length, sizeof(length));
memcpy(state_.data() + sizeof(length) + CODE_WIDTH, msg.data(), length);
}
Status::~Status() {
delete state_;
}
Status::Status(const Status& s) {
......@@ -56,37 +55,35 @@ Status::operator=(Status&& s) noexcept {
void
Status::CopyFrom(const Status& s) {
delete state_;
state_ = nullptr;
if (s.state_ == nullptr) {
state_.clear();
if (s.state_.empty()) {
return;
}
uint32_t length = 0;
memcpy(&length, s.state_ + CODE_WIDTH, sizeof(length));
memcpy(&length, s.state_.data() + CODE_WIDTH, sizeof(length));
int buff_len = length + sizeof(length) + CODE_WIDTH;
state_ = new char[buff_len];
memcpy(state_, s.state_, buff_len);
state_.resize(buff_len);
memcpy(state_.data(), s.state_.data(), buff_len);
}
void
Status::MoveFrom(Status& s) {
delete state_;
state_ = s.state_;
s.state_ = nullptr;
s.state_.clear();
}
std::string
Status::message() const {
if (state_ == nullptr) {
if (state_.empty()) {
return "OK";
}
std::string msg;
uint32_t length = 0;
memcpy(&length, state_ + CODE_WIDTH, sizeof(length));
memcpy(&length, state_.data() + CODE_WIDTH, sizeof(length));
if (length > 0) {
msg.append(state_ + sizeof(length) + CODE_WIDTH, length);
msg.append(state_.data() + sizeof(length) + CODE_WIDTH, length);
}
return msg;
......@@ -94,7 +91,7 @@ Status::message() const {
std::string
Status::ToString() const {
if (state_ == nullptr) {
if (state_.empty()) {
return "OK";
}
......
......@@ -51,12 +51,12 @@ class Status {
bool
ok() const {
return state_ == nullptr || code() == 0;
return state_.empty() || code() == 0;
}
StatusCode
code() const {
return (state_ == nullptr) ? 0 : *(StatusCode*)(state_);
return (state_.empty()) ? 0 : *(StatusCode*)(state_.data());
}
std::string
......@@ -73,7 +73,7 @@ class Status {
MoveFrom(Status& s);
private:
char* state_ = nullptr;
std::string state_;
}; // Status
} // namespace milvus
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册