提交 28a0f1de 编写于 作者: Y yudong.cai

#1548 move store/Directory.cpp to storage

Signed-off-by: Nyudong.cai <yudong.cai@zilliz.com>
上级 4b9dc026
...@@ -125,8 +125,6 @@ aux_source_directory(${MILVUS_ENGINE_SRC}/codecs/default codecs_default_files) ...@@ -125,8 +125,6 @@ aux_source_directory(${MILVUS_ENGINE_SRC}/codecs/default codecs_default_files)
aux_source_directory(${MILVUS_ENGINE_SRC}/segment segment_files) aux_source_directory(${MILVUS_ENGINE_SRC}/segment segment_files)
aux_source_directory(${MILVUS_ENGINE_SRC}/store store_files)
set(engine_files set(engine_files
${CMAKE_CURRENT_SOURCE_DIR}/main.cpp ${CMAKE_CURRENT_SOURCE_DIR}/main.cpp
${cache_files} ${cache_files}
...@@ -143,7 +141,6 @@ set(engine_files ...@@ -143,7 +141,6 @@ set(engine_files
${codecs_files} ${codecs_files}
${codecs_default_files} ${codecs_default_files}
${segment_files} ${segment_files}
${store_files}
) )
if (MILVUS_WITH_PROMETHEUS) if (MILVUS_WITH_PROMETHEUS)
......
...@@ -20,7 +20,7 @@ ...@@ -20,7 +20,7 @@
#include <memory> #include <memory>
#include "segment/DeletedDocs.h" #include "segment/DeletedDocs.h"
#include "store/Directory.h" #include "src/storage/disk/DiskOperation.h"
namespace milvus { namespace milvus {
namespace codec { namespace codec {
...@@ -28,10 +28,10 @@ namespace codec { ...@@ -28,10 +28,10 @@ namespace codec {
class DeletedDocsFormat { class DeletedDocsFormat {
public: public:
virtual void virtual void
read(const store::DirectoryPtr& directory_ptr, segment::DeletedDocsPtr& deleted_docs) = 0; read(const storage::OperationPtr& directory_ptr, segment::DeletedDocsPtr& deleted_docs) = 0;
virtual void virtual void
write(const store::DirectoryPtr& directory_ptr, const segment::DeletedDocsPtr& deleted_docs) = 0; write(const storage::OperationPtr& directory_ptr, const segment::DeletedDocsPtr& deleted_docs) = 0;
}; };
using DeletedDocsFormatPtr = std::shared_ptr<DeletedDocsFormat>; using DeletedDocsFormatPtr = std::shared_ptr<DeletedDocsFormat>;
......
...@@ -20,7 +20,7 @@ ...@@ -20,7 +20,7 @@
#include <memory> #include <memory>
#include "segment/IdBloomFilter.h" #include "segment/IdBloomFilter.h"
#include "store/Directory.h" #include "src/storage/disk/DiskOperation.h"
namespace milvus { namespace milvus {
namespace codec { namespace codec {
...@@ -28,13 +28,13 @@ namespace codec { ...@@ -28,13 +28,13 @@ namespace codec {
class IdBloomFilterFormat { class IdBloomFilterFormat {
public: public:
virtual void virtual void
read(const store::DirectoryPtr& directory_ptr, segment::IdBloomFilterPtr& id_bloom_filter_ptr) = 0; read(const storage::OperationPtr& directory_ptr, segment::IdBloomFilterPtr& id_bloom_filter_ptr) = 0;
virtual void virtual void
write(const store::DirectoryPtr& directory_ptr, const segment::IdBloomFilterPtr& id_bloom_filter_ptr) = 0; write(const storage::OperationPtr& directory_ptr, const segment::IdBloomFilterPtr& id_bloom_filter_ptr) = 0;
virtual void virtual void
create(const store::DirectoryPtr& directory_ptr, segment::IdBloomFilterPtr& id_bloom_filter_ptr) = 0; create(const storage::OperationPtr& directory_ptr, segment::IdBloomFilterPtr& id_bloom_filter_ptr) = 0;
}; };
using IdBloomFilterFormatPtr = std::shared_ptr<IdBloomFilterFormat>; using IdBloomFilterFormatPtr = std::shared_ptr<IdBloomFilterFormat>;
......
...@@ -21,7 +21,7 @@ ...@@ -21,7 +21,7 @@
#include <vector> #include <vector>
#include "segment/Vectors.h" #include "segment/Vectors.h"
#include "store/Directory.h" #include "src/storage/disk/DiskOperation.h"
namespace milvus { namespace milvus {
namespace codec { namespace codec {
...@@ -29,16 +29,16 @@ namespace codec { ...@@ -29,16 +29,16 @@ namespace codec {
class VectorsFormat { class VectorsFormat {
public: public:
virtual void virtual void
read(const store::DirectoryPtr& directory_ptr, segment::VectorsPtr& vectors_read) = 0; read(const storage::OperationPtr& directory_ptr, segment::VectorsPtr& vectors_read) = 0;
virtual void virtual void
write(const store::DirectoryPtr& directory_ptr, const segment::VectorsPtr& vectors) = 0; write(const storage::OperationPtr& directory_ptr, const segment::VectorsPtr& vectors) = 0;
virtual void virtual void
read_uids(const store::DirectoryPtr& directory_ptr, std::vector<segment::doc_id_t>& uids) = 0; read_uids(const storage::OperationPtr& directory_ptr, std::vector<segment::doc_id_t>& uids) = 0;
virtual void virtual void
read_vectors(const store::DirectoryPtr& directory_ptr, off_t offset, size_t num_bytes, read_vectors(const storage::OperationPtr& directory_ptr, off_t offset, size_t num_bytes,
std::vector<uint8_t>& raw_vectors) = 0; std::vector<uint8_t>& raw_vectors) = 0;
}; };
......
...@@ -35,10 +35,10 @@ namespace milvus { ...@@ -35,10 +35,10 @@ namespace milvus {
namespace codec { namespace codec {
void void
DefaultDeletedDocsFormat::read(const store::DirectoryPtr& directory_ptr, segment::DeletedDocsPtr& deleted_docs) { DefaultDeletedDocsFormat::read(const storage::OperationPtr& directory_ptr, segment::DeletedDocsPtr& deleted_docs) {
const std::lock_guard<std::mutex> lock(mutex_); const std::lock_guard<std::mutex> lock(mutex_);
std::string dir_path = directory_ptr->GetDirPath(); std::string dir_path = directory_ptr->GetDirectory();
const std::string del_file_path = dir_path + "/" + deleted_docs_filename_; const std::string del_file_path = dir_path + "/" + deleted_docs_filename_;
int del_fd = open(del_file_path.c_str(), O_RDONLY, 00664); int del_fd = open(del_file_path.c_str(), O_RDONLY, 00664);
...@@ -75,10 +75,11 @@ DefaultDeletedDocsFormat::read(const store::DirectoryPtr& directory_ptr, segment ...@@ -75,10 +75,11 @@ DefaultDeletedDocsFormat::read(const store::DirectoryPtr& directory_ptr, segment
} }
void void
DefaultDeletedDocsFormat::write(const store::DirectoryPtr& directory_ptr, const segment::DeletedDocsPtr& deleted_docs) { DefaultDeletedDocsFormat::write(const storage::OperationPtr& directory_ptr,
const segment::DeletedDocsPtr& deleted_docs) {
const std::lock_guard<std::mutex> lock(mutex_); const std::lock_guard<std::mutex> lock(mutex_);
std::string dir_path = directory_ptr->GetDirPath(); std::string dir_path = directory_ptr->GetDirectory();
const std::string del_file_path = dir_path + "/" + deleted_docs_filename_; const std::string del_file_path = dir_path + "/" + deleted_docs_filename_;
// Create a temporary file from the existing file // Create a temporary file from the existing file
......
...@@ -30,10 +30,10 @@ class DefaultDeletedDocsFormat : public DeletedDocsFormat { ...@@ -30,10 +30,10 @@ class DefaultDeletedDocsFormat : public DeletedDocsFormat {
DefaultDeletedDocsFormat() = default; DefaultDeletedDocsFormat() = default;
void void
read(const store::DirectoryPtr& directory_ptr, segment::DeletedDocsPtr& deleted_docs) override; read(const storage::OperationPtr& directory_ptr, segment::DeletedDocsPtr& deleted_docs) override;
void void
write(const store::DirectoryPtr& directory_ptr, const segment::DeletedDocsPtr& deleted_docs) override; write(const storage::OperationPtr& directory_ptr, const segment::DeletedDocsPtr& deleted_docs) override;
// No copy and move // No copy and move
DefaultDeletedDocsFormat(const DefaultDeletedDocsFormat&) = delete; DefaultDeletedDocsFormat(const DefaultDeletedDocsFormat&) = delete;
......
...@@ -30,11 +30,11 @@ constexpr unsigned int bloom_filter_capacity = 500000; ...@@ -30,11 +30,11 @@ constexpr unsigned int bloom_filter_capacity = 500000;
constexpr double bloom_filter_error_rate = 0.01; constexpr double bloom_filter_error_rate = 0.01;
void void
DefaultIdBloomFilterFormat::read(const store::DirectoryPtr& directory_ptr, DefaultIdBloomFilterFormat::read(const storage::OperationPtr& directory_ptr,
segment::IdBloomFilterPtr& id_bloom_filter_ptr) { segment::IdBloomFilterPtr& id_bloom_filter_ptr) {
const std::lock_guard<std::mutex> lock(mutex_); const std::lock_guard<std::mutex> lock(mutex_);
std::string dir_path = directory_ptr->GetDirPath(); std::string dir_path = directory_ptr->GetDirectory();
const std::string bloom_filter_file_path = dir_path + "/" + bloom_filter_filename_; const std::string bloom_filter_file_path = dir_path + "/" + bloom_filter_filename_;
scaling_bloom_t* bloom_filter = scaling_bloom_t* bloom_filter =
new_scaling_bloom_from_file(bloom_filter_capacity, bloom_filter_error_rate, bloom_filter_file_path.c_str()); new_scaling_bloom_from_file(bloom_filter_capacity, bloom_filter_error_rate, bloom_filter_file_path.c_str());
...@@ -48,11 +48,11 @@ DefaultIdBloomFilterFormat::read(const store::DirectoryPtr& directory_ptr, ...@@ -48,11 +48,11 @@ DefaultIdBloomFilterFormat::read(const store::DirectoryPtr& directory_ptr,
} }
void void
DefaultIdBloomFilterFormat::write(const store::DirectoryPtr& directory_ptr, DefaultIdBloomFilterFormat::write(const storage::OperationPtr& directory_ptr,
const segment::IdBloomFilterPtr& id_bloom_filter_ptr) { const segment::IdBloomFilterPtr& id_bloom_filter_ptr) {
const std::lock_guard<std::mutex> lock(mutex_); const std::lock_guard<std::mutex> lock(mutex_);
std::string dir_path = directory_ptr->GetDirPath(); std::string dir_path = directory_ptr->GetDirectory();
const std::string bloom_filter_file_path = dir_path + "/" + bloom_filter_filename_; const std::string bloom_filter_file_path = dir_path + "/" + bloom_filter_filename_;
if (scaling_bloom_flush(id_bloom_filter_ptr->GetBloomFilter()) == -1) { if (scaling_bloom_flush(id_bloom_filter_ptr->GetBloomFilter()) == -1) {
std::string err_msg = std::string err_msg =
...@@ -63,9 +63,9 @@ DefaultIdBloomFilterFormat::write(const store::DirectoryPtr& directory_ptr, ...@@ -63,9 +63,9 @@ DefaultIdBloomFilterFormat::write(const store::DirectoryPtr& directory_ptr,
} }
void void
DefaultIdBloomFilterFormat::create(const store::DirectoryPtr& directory_ptr, DefaultIdBloomFilterFormat::create(const storage::OperationPtr& directory_ptr,
segment::IdBloomFilterPtr& id_bloom_filter_ptr) { segment::IdBloomFilterPtr& id_bloom_filter_ptr) {
std::string dir_path = directory_ptr->GetDirPath(); std::string dir_path = directory_ptr->GetDirectory();
const std::string bloom_filter_file_path = dir_path + "/" + bloom_filter_filename_; const std::string bloom_filter_file_path = dir_path + "/" + bloom_filter_filename_;
scaling_bloom_t* bloom_filter = scaling_bloom_t* bloom_filter =
new_scaling_bloom(bloom_filter_capacity, bloom_filter_error_rate, bloom_filter_file_path.c_str()); new_scaling_bloom(bloom_filter_capacity, bloom_filter_error_rate, bloom_filter_file_path.c_str());
......
...@@ -22,7 +22,7 @@ ...@@ -22,7 +22,7 @@
#include "codecs/IdBloomFilterFormat.h" #include "codecs/IdBloomFilterFormat.h"
#include "segment/IdBloomFilter.h" #include "segment/IdBloomFilter.h"
#include "store/Directory.h" #include "src/storage/disk/DiskOperation.h"
namespace milvus { namespace milvus {
namespace codec { namespace codec {
...@@ -32,13 +32,13 @@ class DefaultIdBloomFilterFormat : public IdBloomFilterFormat { ...@@ -32,13 +32,13 @@ class DefaultIdBloomFilterFormat : public IdBloomFilterFormat {
DefaultIdBloomFilterFormat() = default; DefaultIdBloomFilterFormat() = default;
void void
read(const store::DirectoryPtr& directory_ptr, segment::IdBloomFilterPtr& id_bloom_filter_ptr) override; read(const storage::OperationPtr& directory_ptr, segment::IdBloomFilterPtr& id_bloom_filter_ptr) override;
void void
write(const store::DirectoryPtr& directory_ptr, const segment::IdBloomFilterPtr& id_bloom_filter_ptr) override; write(const storage::OperationPtr& directory_ptr, const segment::IdBloomFilterPtr& id_bloom_filter_ptr) override;
void void
create(const store::DirectoryPtr& directory_ptr, segment::IdBloomFilterPtr& id_bloom_filter_ptr) override; create(const storage::OperationPtr& directory_ptr, segment::IdBloomFilterPtr& id_bloom_filter_ptr) override;
// No copy and move // No copy and move
DefaultIdBloomFilterFormat(const DefaultIdBloomFilterFormat&) = delete; DefaultIdBloomFilterFormat(const DefaultIdBloomFilterFormat&) = delete;
......
...@@ -102,10 +102,10 @@ DefaultVectorsFormat::read_uids_internal(const std::string& file_path, std::vect ...@@ -102,10 +102,10 @@ DefaultVectorsFormat::read_uids_internal(const std::string& file_path, std::vect
} }
void void
DefaultVectorsFormat::read(const store::DirectoryPtr& directory_ptr, segment::VectorsPtr& vectors_read) { DefaultVectorsFormat::read(const storage::OperationPtr& directory_ptr, segment::VectorsPtr& vectors_read) {
const std::lock_guard<std::mutex> lock(mutex_); const std::lock_guard<std::mutex> lock(mutex_);
std::string dir_path = directory_ptr->GetDirPath(); std::string dir_path = directory_ptr->GetDirectory();
if (!boost::filesystem::is_directory(dir_path)) { if (!boost::filesystem::is_directory(dir_path)) {
std::string err_msg = "Directory: " + dir_path + "does not exist"; std::string err_msg = "Directory: " + dir_path + "does not exist";
ENGINE_LOG_ERROR << err_msg; ENGINE_LOG_ERROR << err_msg;
...@@ -134,10 +134,10 @@ DefaultVectorsFormat::read(const store::DirectoryPtr& directory_ptr, segment::Ve ...@@ -134,10 +134,10 @@ DefaultVectorsFormat::read(const store::DirectoryPtr& directory_ptr, segment::Ve
} }
void void
DefaultVectorsFormat::write(const store::DirectoryPtr& directory_ptr, const segment::VectorsPtr& vectors) { DefaultVectorsFormat::write(const storage::OperationPtr& directory_ptr, const segment::VectorsPtr& vectors) {
const std::lock_guard<std::mutex> lock(mutex_); const std::lock_guard<std::mutex> lock(mutex_);
std::string dir_path = directory_ptr->GetDirPath(); std::string dir_path = directory_ptr->GetDirectory();
const std::string rv_file_path = dir_path + "/" + vectors->GetName() + raw_vector_extension_; const std::string rv_file_path = dir_path + "/" + vectors->GetName() + raw_vector_extension_;
const std::string uid_file_path = dir_path + "/" + vectors->GetName() + user_id_extension_; const std::string uid_file_path = dir_path + "/" + vectors->GetName() + user_id_extension_;
...@@ -197,10 +197,10 @@ DefaultVectorsFormat::write(const store::DirectoryPtr& directory_ptr, const segm ...@@ -197,10 +197,10 @@ DefaultVectorsFormat::write(const store::DirectoryPtr& directory_ptr, const segm
} }
void void
DefaultVectorsFormat::read_uids(const store::DirectoryPtr& directory_ptr, std::vector<segment::doc_id_t>& uids) { DefaultVectorsFormat::read_uids(const storage::OperationPtr& directory_ptr, std::vector<segment::doc_id_t>& uids) {
const std::lock_guard<std::mutex> lock(mutex_); const std::lock_guard<std::mutex> lock(mutex_);
std::string dir_path = directory_ptr->GetDirPath(); std::string dir_path = directory_ptr->GetDirectory();
if (!boost::filesystem::is_directory(dir_path)) { if (!boost::filesystem::is_directory(dir_path)) {
std::string err_msg = "Directory: " + dir_path + "does not exist"; std::string err_msg = "Directory: " + dir_path + "does not exist";
ENGINE_LOG_ERROR << err_msg; ENGINE_LOG_ERROR << err_msg;
...@@ -221,11 +221,11 @@ DefaultVectorsFormat::read_uids(const store::DirectoryPtr& directory_ptr, std::v ...@@ -221,11 +221,11 @@ DefaultVectorsFormat::read_uids(const store::DirectoryPtr& directory_ptr, std::v
} }
void void
DefaultVectorsFormat::read_vectors(const store::DirectoryPtr& directory_ptr, off_t offset, size_t num_bytes, DefaultVectorsFormat::read_vectors(const storage::OperationPtr& directory_ptr, off_t offset, size_t num_bytes,
std::vector<uint8_t>& raw_vectors) { std::vector<uint8_t>& raw_vectors) {
const std::lock_guard<std::mutex> lock(mutex_); const std::lock_guard<std::mutex> lock(mutex_);
std::string dir_path = directory_ptr->GetDirPath(); std::string dir_path = directory_ptr->GetDirectory();
if (!boost::filesystem::is_directory(dir_path)) { if (!boost::filesystem::is_directory(dir_path)) {
std::string err_msg = "Directory: " + dir_path + "does not exist"; std::string err_msg = "Directory: " + dir_path + "does not exist";
ENGINE_LOG_ERROR << err_msg; ENGINE_LOG_ERROR << err_msg;
......
...@@ -32,16 +32,17 @@ class DefaultVectorsFormat : public VectorsFormat { ...@@ -32,16 +32,17 @@ class DefaultVectorsFormat : public VectorsFormat {
DefaultVectorsFormat() = default; DefaultVectorsFormat() = default;
void void
read(const store::DirectoryPtr&, segment::VectorsPtr&) override; read(const storage::OperationPtr& directory_ptr, segment::VectorsPtr& vectors_read) override;
void void
write(const store::DirectoryPtr&, const segment::VectorsPtr&) override; write(const storage::OperationPtr& directory_ptr, const segment::VectorsPtr& vectors) override;
void void
read_vectors(const store::DirectoryPtr&, off_t, size_t, std::vector<uint8_t>&) override; read_uids(const storage::OperationPtr& directory_ptr, std::vector<segment::doc_id_t>& uids) override;
void void
read_uids(const store::DirectoryPtr&, std::vector<segment::doc_id_t>&) override; read_vectors(const storage::OperationPtr& directory_ptr, off_t offset, size_t num_bytes,
std::vector<uint8_t>& raw_vectors) override;
// No copy and move // No copy and move
DefaultVectorsFormat(const DefaultVectorsFormat&) = delete; DefaultVectorsFormat(const DefaultVectorsFormat&) = delete;
......
...@@ -21,14 +21,14 @@ ...@@ -21,14 +21,14 @@
#include "Vectors.h" #include "Vectors.h"
#include "codecs/default/DefaultCodec.h" #include "codecs/default/DefaultCodec.h"
#include "store/Directory.h" #include "src/storage/disk/DiskOperation.h"
#include "utils/Log.h" #include "utils/Log.h"
namespace milvus { namespace milvus {
namespace segment { namespace segment {
SegmentReader::SegmentReader(const std::string& directory) { SegmentReader::SegmentReader(const std::string& directory) {
directory_ptr_ = std::make_shared<store::Directory>(directory); directory_ptr_ = std::make_shared<storage::DiskOperation>(directory);
segment_ptr_ = std::make_shared<Segment>(); segment_ptr_ = std::make_shared<Segment>();
} }
...@@ -43,7 +43,7 @@ SegmentReader::Load() { ...@@ -43,7 +43,7 @@ SegmentReader::Load() {
// TODO(zhiru) // TODO(zhiru)
codec::DefaultCodec default_codec; codec::DefaultCodec default_codec;
try { try {
directory_ptr_->Create(); directory_ptr_->CreateDirectory();
default_codec.GetVectorsFormat()->read(directory_ptr_, segment_ptr_->vectors_ptr_); default_codec.GetVectorsFormat()->read(directory_ptr_, segment_ptr_->vectors_ptr_);
default_codec.GetDeletedDocsFormat()->read(directory_ptr_, segment_ptr_->deleted_docs_ptr_); default_codec.GetDeletedDocsFormat()->read(directory_ptr_, segment_ptr_->deleted_docs_ptr_);
} catch (std::exception& e) { } catch (std::exception& e) {
...@@ -56,7 +56,7 @@ Status ...@@ -56,7 +56,7 @@ Status
SegmentReader::LoadVectors(off_t offset, size_t num_bytes, std::vector<uint8_t>& raw_vectors) { SegmentReader::LoadVectors(off_t offset, size_t num_bytes, std::vector<uint8_t>& raw_vectors) {
codec::DefaultCodec default_codec; codec::DefaultCodec default_codec;
try { try {
directory_ptr_->Create(); directory_ptr_->CreateDirectory();
default_codec.GetVectorsFormat()->read_vectors(directory_ptr_, offset, num_bytes, raw_vectors); default_codec.GetVectorsFormat()->read_vectors(directory_ptr_, offset, num_bytes, raw_vectors);
} catch (std::exception& e) { } catch (std::exception& e) {
std::string err_msg = "Failed to load raw vectors: " + std::string(e.what()); std::string err_msg = "Failed to load raw vectors: " + std::string(e.what());
...@@ -70,7 +70,7 @@ Status ...@@ -70,7 +70,7 @@ Status
SegmentReader::LoadUids(std::vector<doc_id_t>& uids) { SegmentReader::LoadUids(std::vector<doc_id_t>& uids) {
codec::DefaultCodec default_codec; codec::DefaultCodec default_codec;
try { try {
directory_ptr_->Create(); directory_ptr_->CreateDirectory();
default_codec.GetVectorsFormat()->read_uids(directory_ptr_, uids); default_codec.GetVectorsFormat()->read_uids(directory_ptr_, uids);
} catch (std::exception& e) { } catch (std::exception& e) {
std::string err_msg = "Failed to load uids: " + std::string(e.what()); std::string err_msg = "Failed to load uids: " + std::string(e.what());
...@@ -90,7 +90,7 @@ Status ...@@ -90,7 +90,7 @@ Status
SegmentReader::LoadBloomFilter(segment::IdBloomFilterPtr& id_bloom_filter_ptr) { SegmentReader::LoadBloomFilter(segment::IdBloomFilterPtr& id_bloom_filter_ptr) {
codec::DefaultCodec default_codec; codec::DefaultCodec default_codec;
try { try {
directory_ptr_->Create(); directory_ptr_->CreateDirectory();
default_codec.GetIdBloomFilterFormat()->read(directory_ptr_, id_bloom_filter_ptr); default_codec.GetIdBloomFilterFormat()->read(directory_ptr_, id_bloom_filter_ptr);
} catch (std::exception& e) { } catch (std::exception& e) {
std::string err_msg = "Failed to load bloom filter: " + std::string(e.what()); std::string err_msg = "Failed to load bloom filter: " + std::string(e.what());
...@@ -104,7 +104,7 @@ Status ...@@ -104,7 +104,7 @@ Status
SegmentReader::LoadDeletedDocs(segment::DeletedDocsPtr& deleted_docs_ptr) { SegmentReader::LoadDeletedDocs(segment::DeletedDocsPtr& deleted_docs_ptr) {
codec::DefaultCodec default_codec; codec::DefaultCodec default_codec;
try { try {
directory_ptr_->Create(); directory_ptr_->CreateDirectory();
default_codec.GetDeletedDocsFormat()->read(directory_ptr_, deleted_docs_ptr); default_codec.GetDeletedDocsFormat()->read(directory_ptr_, deleted_docs_ptr);
} catch (std::exception& e) { } catch (std::exception& e) {
std::string err_msg = "Failed to load deleted docs: " + std::string(e.what()); std::string err_msg = "Failed to load deleted docs: " + std::string(e.what());
......
...@@ -22,7 +22,7 @@ ...@@ -22,7 +22,7 @@
#include <vector> #include <vector>
#include "segment/Types.h" #include "segment/Types.h"
#include "store/Directory.h" #include "src/storage/disk/DiskOperation.h"
#include "utils/Status.h" #include "utils/Status.h"
namespace milvus { namespace milvus {
...@@ -55,7 +55,7 @@ class SegmentReader { ...@@ -55,7 +55,7 @@ class SegmentReader {
GetSegment(SegmentPtr& segment_ptr); GetSegment(SegmentPtr& segment_ptr);
private: private:
store::DirectoryPtr directory_ptr_; storage::OperationPtr directory_ptr_;
SegmentPtr segment_ptr_; SegmentPtr segment_ptr_;
}; };
......
...@@ -23,14 +23,14 @@ ...@@ -23,14 +23,14 @@
#include "SegmentReader.h" #include "SegmentReader.h"
#include "Vectors.h" #include "Vectors.h"
#include "codecs/default/DefaultCodec.h" #include "codecs/default/DefaultCodec.h"
#include "store/Directory.h" #include "src/storage/disk/DiskOperation.h"
#include "utils/Log.h" #include "utils/Log.h"
namespace milvus { namespace milvus {
namespace segment { namespace segment {
SegmentWriter::SegmentWriter(const std::string& directory) { SegmentWriter::SegmentWriter(const std::string& directory) {
directory_ptr_ = std::make_shared<store::Directory>(directory); directory_ptr_ = std::make_shared<storage::DiskOperation>(directory);
segment_ptr_ = std::make_shared<Segment>(); segment_ptr_ = std::make_shared<Segment>();
} }
...@@ -84,7 +84,7 @@ Status ...@@ -84,7 +84,7 @@ Status
SegmentWriter::WriteVectors() { SegmentWriter::WriteVectors() {
codec::DefaultCodec default_codec; codec::DefaultCodec default_codec;
try { try {
directory_ptr_->Create(); directory_ptr_->CreateDirectory();
default_codec.GetVectorsFormat()->write(directory_ptr_, segment_ptr_->vectors_ptr_); default_codec.GetVectorsFormat()->write(directory_ptr_, segment_ptr_->vectors_ptr_);
} catch (std::exception& e) { } catch (std::exception& e) {
std::string err_msg = "Failed to write vectors: " + std::string(e.what()); std::string err_msg = "Failed to write vectors: " + std::string(e.what());
...@@ -98,7 +98,7 @@ Status ...@@ -98,7 +98,7 @@ Status
SegmentWriter::WriteBloomFilter() { SegmentWriter::WriteBloomFilter() {
codec::DefaultCodec default_codec; codec::DefaultCodec default_codec;
try { try {
directory_ptr_->Create(); directory_ptr_->CreateDirectory();
auto start = std::chrono::high_resolution_clock::now(); auto start = std::chrono::high_resolution_clock::now();
...@@ -138,7 +138,7 @@ Status ...@@ -138,7 +138,7 @@ Status
SegmentWriter::WriteDeletedDocs() { SegmentWriter::WriteDeletedDocs() {
codec::DefaultCodec default_codec; codec::DefaultCodec default_codec;
try { try {
directory_ptr_->Create(); directory_ptr_->CreateDirectory();
DeletedDocsPtr deleted_docs_ptr = std::make_shared<DeletedDocs>(); DeletedDocsPtr deleted_docs_ptr = std::make_shared<DeletedDocs>();
default_codec.GetDeletedDocsFormat()->write(directory_ptr_, deleted_docs_ptr); default_codec.GetDeletedDocsFormat()->write(directory_ptr_, deleted_docs_ptr);
} catch (std::exception& e) { } catch (std::exception& e) {
...@@ -153,7 +153,7 @@ Status ...@@ -153,7 +153,7 @@ Status
SegmentWriter::WriteDeletedDocs(const DeletedDocsPtr& deleted_docs) { SegmentWriter::WriteDeletedDocs(const DeletedDocsPtr& deleted_docs) {
codec::DefaultCodec default_codec; codec::DefaultCodec default_codec;
try { try {
directory_ptr_->Create(); directory_ptr_->CreateDirectory();
default_codec.GetDeletedDocsFormat()->write(directory_ptr_, deleted_docs); default_codec.GetDeletedDocsFormat()->write(directory_ptr_, deleted_docs);
} catch (std::exception& e) { } catch (std::exception& e) {
std::string err_msg = "Failed to write deleted docs: " + std::string(e.what()); std::string err_msg = "Failed to write deleted docs: " + std::string(e.what());
...@@ -167,7 +167,7 @@ Status ...@@ -167,7 +167,7 @@ Status
SegmentWriter::WriteBloomFilter(const IdBloomFilterPtr& id_bloom_filter_ptr) { SegmentWriter::WriteBloomFilter(const IdBloomFilterPtr& id_bloom_filter_ptr) {
codec::DefaultCodec default_codec; codec::DefaultCodec default_codec;
try { try {
directory_ptr_->Create(); directory_ptr_->CreateDirectory();
default_codec.GetIdBloomFilterFormat()->write(directory_ptr_, id_bloom_filter_ptr); default_codec.GetIdBloomFilterFormat()->write(directory_ptr_, id_bloom_filter_ptr);
} catch (std::exception& e) { } catch (std::exception& e) {
std::string err_msg = "Failed to write bloom filter: " + std::string(e.what()); std::string err_msg = "Failed to write bloom filter: " + std::string(e.what());
...@@ -191,11 +191,11 @@ SegmentWriter::GetSegment(SegmentPtr& segment_ptr) { ...@@ -191,11 +191,11 @@ SegmentWriter::GetSegment(SegmentPtr& segment_ptr) {
Status Status
SegmentWriter::Merge(const std::string& dir_to_merge, const std::string& name) { SegmentWriter::Merge(const std::string& dir_to_merge, const std::string& name) {
if (dir_to_merge == directory_ptr_->GetDirPath()) { if (dir_to_merge == directory_ptr_->GetDirectory()) {
return Status(DB_ERROR, "Cannot Merge Self"); return Status(DB_ERROR, "Cannot Merge Self");
} }
ENGINE_LOG_DEBUG << "Merging from " << dir_to_merge << " to " << directory_ptr_->GetDirPath(); ENGINE_LOG_DEBUG << "Merging from " << dir_to_merge << " to " << directory_ptr_->GetDirectory();
auto start = std::chrono::high_resolution_clock::now(); auto start = std::chrono::high_resolution_clock::now();
...@@ -234,7 +234,7 @@ SegmentWriter::Merge(const std::string& dir_to_merge, const std::string& name) { ...@@ -234,7 +234,7 @@ SegmentWriter::Merge(const std::string& dir_to_merge, const std::string& name) {
ENGINE_LOG_DEBUG << "Adding " << segment_to_merge->vectors_ptr_->GetCount() << " vectors and uids took " ENGINE_LOG_DEBUG << "Adding " << segment_to_merge->vectors_ptr_->GetCount() << " vectors and uids took "
<< diff.count() << " s"; << diff.count() << " s";
ENGINE_LOG_DEBUG << "Merging completed from " << dir_to_merge << " to " << directory_ptr_->GetDirPath(); ENGINE_LOG_DEBUG << "Merging completed from " << dir_to_merge << " to " << directory_ptr_->GetDirectory();
return Status::OK(); return Status::OK();
} }
......
...@@ -22,7 +22,7 @@ ...@@ -22,7 +22,7 @@
#include <vector> #include <vector>
#include "segment/Types.h" #include "segment/Types.h"
#include "store/Directory.h" #include "src/storage/disk/DiskOperation.h"
#include "utils/Status.h" #include "utils/Status.h"
namespace milvus { namespace milvus {
...@@ -70,7 +70,7 @@ class SegmentWriter { ...@@ -70,7 +70,7 @@ class SegmentWriter {
WriteDeletedDocs(); WriteDeletedDocs();
private: private:
store::DirectoryPtr directory_ptr_; storage::OperationPtr directory_ptr_;
SegmentPtr segment_ptr_; SegmentPtr segment_ptr_;
}; };
......
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <memory>
#include <string>
#include <vector>
namespace milvus {
namespace storage {
class Operation {
public:
virtual void
CreateDirectory() = 0;
virtual const std::string&
GetDirectory() const = 0;
virtual void
ListDirectory(std::vector<std::string>& file_paths) = 0;
virtual bool
DeleteFile(const std::string& file_path) = 0;
// TODO(zhiru):
// open(), sync(), close()
// function that opens a stream for reading file
// function that creates a new, empty file and returns an stream for appending data to this file
// function that creates a new, empty, temporary file and returns an stream for appending data to this file
};
using OperationPtr = std::shared_ptr<Operation>;
} // namespace storage
} // namespace milvus
...@@ -15,21 +15,20 @@ ...@@ -15,21 +15,20 @@
// specific language governing permissions and limitations // specific language governing permissions and limitations
// under the License. // under the License.
#include "store/Directory.h"
#include <boost/filesystem.hpp> #include <boost/filesystem.hpp>
#include "storage/disk/DiskOperation.h"
#include "utils/Exception.h" #include "utils/Exception.h"
#include "utils/Log.h" #include "utils/Log.h"
namespace milvus { namespace milvus {
namespace store { namespace storage {
Directory::Directory(const std::string& dir_path) : dir_path_(dir_path) { DiskOperation::DiskOperation(const std::string& dir_path) : dir_path_(dir_path) {
} }
void void
Directory::Create() { DiskOperation::CreateDirectory() {
if (!boost::filesystem::is_directory(dir_path_)) { if (!boost::filesystem::is_directory(dir_path_)) {
auto ret = boost::filesystem::create_directory(dir_path_); auto ret = boost::filesystem::create_directory(dir_path_);
if (!ret) { if (!ret) {
...@@ -40,8 +39,13 @@ Directory::Create() { ...@@ -40,8 +39,13 @@ Directory::Create() {
} }
} }
const std::string&
DiskOperation::GetDirectory() const {
return dir_path_;
}
void void
Directory::ListAll(std::vector<std::string>& file_paths) { DiskOperation::ListDirectory(std::vector<std::string>& file_paths) {
boost::filesystem::path target_path(dir_path_); boost::filesystem::path target_path(dir_path_);
typedef boost::filesystem::directory_iterator d_it; typedef boost::filesystem::directory_iterator d_it;
d_it it_end; d_it it_end;
...@@ -54,14 +58,9 @@ Directory::ListAll(std::vector<std::string>& file_paths) { ...@@ -54,14 +58,9 @@ Directory::ListAll(std::vector<std::string>& file_paths) {
} }
bool bool
Directory::DeleteFile(const std::string& file_path) { DiskOperation::DeleteFile(const std::string& file_path) {
return boost::filesystem::remove(file_path); return boost::filesystem::remove(file_path);
} }
const std::string& } // namespace storage
Directory::GetDirPath() const {
return dir_path_;
}
} // namespace store
} // namespace milvus } // namespace milvus
...@@ -21,25 +21,27 @@ ...@@ -21,25 +21,27 @@
#include <string> #include <string>
#include <vector> #include <vector>
#include "storage/Operation.h"
namespace milvus { namespace milvus {
namespace store { namespace storage {
class Directory { class DiskOperation : public Operation {
public: public:
explicit Directory(const std::string& dir_path); explicit DiskOperation(const std::string& dir_path);
void void
Create(); CreateDirectory();
const std::string&
GetDirectory() const;
void void
ListAll(std::vector<std::string>& file_paths); ListDirectory(std::vector<std::string>& file_paths);
bool bool
DeleteFile(const std::string& file_path); DeleteFile(const std::string& file_path);
const std::string&
GetDirPath() const;
// TODO(zhiru): // TODO(zhiru):
// open(), sync(), close() // open(), sync(), close()
// function that opens a stream for reading file // function that opens a stream for reading file
...@@ -50,7 +52,7 @@ class Directory { ...@@ -50,7 +52,7 @@ class Directory {
const std::string dir_path_; const std::string dir_path_;
}; };
using DirectoryPtr = std::shared_ptr<Directory>; using DiskOperationPtr = std::shared_ptr<DiskOperation>;
} // namespace store } // namespace storage
} // namespace milvus } // namespace milvus
...@@ -111,8 +111,6 @@ aux_source_directory(${MILVUS_ENGINE_SRC}/codecs/default codecs_default_files) ...@@ -111,8 +111,6 @@ aux_source_directory(${MILVUS_ENGINE_SRC}/codecs/default codecs_default_files)
aux_source_directory(${MILVUS_ENGINE_SRC}/segment segment_files) aux_source_directory(${MILVUS_ENGINE_SRC}/segment segment_files)
aux_source_directory(${MILVUS_ENGINE_SRC}/store store_files)
set(entry_file set(entry_file
${CMAKE_CURRENT_SOURCE_DIR}/main.cpp) ${CMAKE_CURRENT_SOURCE_DIR}/main.cpp)
...@@ -146,7 +144,6 @@ set(common_files ...@@ -146,7 +144,6 @@ set(common_files
${codecs_files} ${codecs_files}
${codecs_default_files} ${codecs_default_files}
${segment_files} ${segment_files}
${store_files}
) )
set(unittest_libs set(unittest_libs
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册