diff --git a/core/src/cache/Cache.h b/core/src/cache/Cache.h index 5f72fa5060a123260d9fd3f5fd80f5b2fc5c4225..34594c45739974cdb253742c8d54182e6f1ccda4 100644 --- a/core/src/cache/Cache.h +++ b/core/src/cache/Cache.h @@ -69,6 +69,9 @@ class Cache { void erase(const std::string& key); + bool + reserve(const int64_t size); + void print(); @@ -77,7 +80,13 @@ class Cache { private: void - free_memory(); + insert_internal(const std::string& key, const ItemObj& item); + + void + erase_internal(const std::string& key); + + void + free_memory_internal(const int64_t target_size); private: std::string header_; diff --git a/core/src/cache/Cache.inl b/core/src/cache/Cache.inl index 147c344532a29a2482fab4232c61da02af314012..607976e4117e16ee4df65a65ec163df78d0ed17f 100644 --- a/core/src/cache/Cache.inl +++ b/core/src/cache/Cache.inl @@ -13,8 +13,6 @@ namespace milvus { namespace cache { constexpr double DEFAULT_THRESHOLD_PERCENT = 0.7; -constexpr double WARNING_THRESHOLD_PERCENT = 0.9; -constexpr double BIG_ITEM_THRESHOLD_PERCENT = 0.1; template Cache::Cache(int64_t capacity, int64_t cache_max_count, const std::string& header) @@ -28,9 +26,10 @@ Cache::Cache(int64_t capacity, int64_t cache_max_count, const std::stri template void Cache::set_capacity(int64_t capacity) { + std::lock_guard lock(mutex_); if (capacity > 0) { capacity_ = capacity; - free_memory(); + free_memory_internal(capacity); } } @@ -55,56 +54,95 @@ Cache::get(const std::string& key) { if (!lru_.exists(key)) { return nullptr; } - return lru_.get(key); } template void Cache::insert(const std::string& key, const ItemObj& item) { + std::lock_guard lock(mutex_); + insert_internal(key, item); +} + +template +void +Cache::erase(const std::string& key) { + std::lock_guard lock(mutex_); + erase_internal(key); +} + +template +bool +Cache::reserve(const int64_t item_size) { + std::lock_guard lock(mutex_); + if (item_size > capacity_) { + SERVER_LOG_ERROR << header_ << " item size " << (item_size >> 20) << "MB too big to insert into cache capacity" + << (capacity_ >> 20) << "MB"; + return false; + } + if (item_size > capacity_ - usage_) { + free_memory_internal(capacity_ - item_size); + } + return true; +} + +template +void +Cache::clear() { + std::lock_guard lock(mutex_); + lru_.clear(); + usage_ = 0; + SERVER_LOG_DEBUG << header_ << " Clear cache !"; +} + + +template +void +Cache::print() { + std::lock_guard lock(mutex_); + size_t cache_count = lru_.size(); + // for (auto it = lru_.begin(); it != lru_.end(); ++it) { + // SERVER_LOG_DEBUG << it->first; + // } + SERVER_LOG_DEBUG << header_ << " [item count]: " << cache_count << ", [usage] " << (usage_ >> 20) + << "MB, [capacity] " << (capacity_ >> 20) << "MB"; +} + +template +void +Cache::insert_internal(const std::string& key, const ItemObj& item) { if (item == nullptr) { return; } size_t item_size = item->Size(); - // calculate usage - { - std::lock_guard lock(mutex_); - - // if key already exist, subtract old item size - if (lru_.exists(key)) { - const ItemObj& old_item = lru_.get(key); - usage_ -= old_item->Size(); - } - - // plus new item size - usage_ += item_size; + + // if key already exist, subtract old item size + if (lru_.exists(key)) { + const ItemObj& old_item = lru_.get(key); + usage_ -= old_item->Size(); } + // plus new item size + usage_ += item_size; + // if usage exceed capacity, free some items - if (usage_ > capacity_ || - (item_size > (int64_t)(capacity_ * BIG_ITEM_THRESHOLD_PERCENT) && - usage_ > (int64_t)(capacity_ * WARNING_THRESHOLD_PERCENT))) { + if (usage_ > capacity_) { SERVER_LOG_DEBUG << header_ << " Current usage " << (usage_ >> 20) << "MB is too high for capacity " << (capacity_ >> 20) << "MB, start free memory"; - free_memory(); + free_memory_internal(capacity_); } // insert new item - { - std::lock_guard lock(mutex_); - - lru_.put(key, item); - SERVER_LOG_DEBUG << header_ << " Insert " << key << " size: " << (item_size >> 20) << "MB into cache"; - SERVER_LOG_DEBUG << header_ << " Count: " << lru_.size() << ", Usage: " << (usage_ >> 20) << "MB, Capacity: " - << (capacity_ >> 20) << "MB"; - } + lru_.put(key, item); + SERVER_LOG_DEBUG << header_ << " Insert " << key << " size: " << (item_size >> 20) << "MB into cache"; + SERVER_LOG_DEBUG << header_ << " Count: " << lru_.size() << ", Usage: " << (usage_ >> 20) << "MB, Capacity: " + << (capacity_ >> 20) << "MB"; } template void -Cache::erase(const std::string& key) { - std::lock_guard lock(mutex_); +Cache::erase_internal(const std::string& key) { if (!lru_.exists(key)) { return; } @@ -122,22 +160,9 @@ Cache::erase(const std::string& key) { template void -Cache::clear() { - std::lock_guard lock(mutex_); - lru_.clear(); - usage_ = 0; - SERVER_LOG_DEBUG << header_ << " Clear cache !"; -} - -/* free memory space when CACHE occupation exceed its capacity */ -template -void -Cache::free_memory() { - // if (usage_ <= capacity_) - // return; - - int64_t threshhold = capacity_ * freemem_percent_; - int64_t delta_size = usage_ - threshhold; +Cache::free_memory_internal(const int64_t target_size) { + int64_t threshold = std::min((int64_t)(capacity_ * freemem_percent_), target_size); + int64_t delta_size = usage_ - threshold; if (delta_size <= 0) { delta_size = 1; // ensure at least one item erased } @@ -145,44 +170,22 @@ Cache::free_memory() { std::set key_array; int64_t released_size = 0; - { - std::lock_guard lock(mutex_); + auto it = lru_.rbegin(); + while (it != lru_.rend() && released_size < delta_size) { + auto& key = it->first; + auto& obj_ptr = it->second; - auto it = lru_.rbegin(); - while (it != lru_.rend() && released_size < delta_size) { - auto& key = it->first; - auto& obj_ptr = it->second; - - key_array.emplace(key); - released_size += obj_ptr->Size(); - ++it; - } + key_array.emplace(key); + released_size += obj_ptr->Size(); + ++it; } SERVER_LOG_DEBUG << header_ << " To be released memory size: " << (released_size >> 20) << "MB"; for (auto& key : key_array) { - erase(key); + erase_internal(key); } } -template -void -Cache::print() { - size_t cache_count = 0; - { - std::lock_guard lock(mutex_); - cache_count = lru_.size(); -#if 0 - for (auto it = lru_.begin(); it != lru_.end(); ++it) { - SERVER_LOG_DEBUG << it->first; - } -#endif - } - - SERVER_LOG_DEBUG << header_ << " [item count]: " << cache_count << ", [usage] " << (usage_ >> 20) - << "MB, [capacity] " << (capacity_ >> 20) << "MB"; -} - } // namespace cache } // namespace milvus diff --git a/core/src/cache/CacheMgr.h b/core/src/cache/CacheMgr.h index 168c00b68eaeeaa059edb618afb5c267d677bf0f..c7ab9e26ab1d01faca63772d72de5155194f2902 100644 --- a/core/src/cache/CacheMgr.h +++ b/core/src/cache/CacheMgr.h @@ -39,6 +39,9 @@ class CacheMgr { virtual void EraseItem(const std::string& key); + virtual bool + Reserve(const int64_t size); + virtual void PrintInfo(); @@ -54,14 +57,20 @@ class CacheMgr { void SetCapacity(int64_t capacity); + void + Lock(); + + void + Unlock(); + protected: CacheMgr(); virtual ~CacheMgr(); protected: - using CachePtr = std::shared_ptr>; - CachePtr cache_; + std::shared_ptr> cache_; + std::mutex mutex_; }; } // namespace cache diff --git a/core/src/cache/CacheMgr.inl b/core/src/cache/CacheMgr.inl index f3eb6dc42eeefbc9546142c0916fd76d1864bc04..606510c66cace9ac3de6e699a4db387182a1035f 100644 --- a/core/src/cache/CacheMgr.inl +++ b/core/src/cache/CacheMgr.inl @@ -27,7 +27,6 @@ CacheMgr::ItemCount() const { SERVER_LOG_ERROR << "Cache doesn't exist"; return 0; } - return (uint64_t)(cache_->size()); } @@ -38,7 +37,6 @@ CacheMgr::ItemExists(const std::string& key) { SERVER_LOG_ERROR << "Cache doesn't exist"; return false; } - return cache_->exists(key); } @@ -60,7 +58,6 @@ CacheMgr::InsertItem(const std::string& key, const ItemObj& data) { SERVER_LOG_ERROR << "Cache doesn't exist"; return; } - cache_->insert(key, data); server::Metrics::GetInstance().CacheAccessTotalIncrement(); } @@ -72,11 +69,20 @@ CacheMgr::EraseItem(const std::string& key) { SERVER_LOG_ERROR << "Cache doesn't exist"; return; } - cache_->erase(key); server::Metrics::GetInstance().CacheAccessTotalIncrement(); } +template +bool +CacheMgr::Reserve(const int64_t size) { + if (cache_ == nullptr) { + SERVER_LOG_ERROR << "Cache doesn't exist"; + return false; + } + return cache_->reserve(size); +} + template void CacheMgr::PrintInfo() { @@ -84,7 +90,6 @@ CacheMgr::PrintInfo() { SERVER_LOG_ERROR << "Cache doesn't exist"; return; } - cache_->print(); } @@ -95,7 +100,6 @@ CacheMgr::ClearCache() { SERVER_LOG_ERROR << "Cache doesn't exist"; return; } - cache_->clear(); } @@ -106,7 +110,6 @@ CacheMgr::CacheUsage() const { SERVER_LOG_ERROR << "Cache doesn't exist"; return 0; } - return cache_->usage(); } @@ -117,7 +120,6 @@ CacheMgr::CacheCapacity() const { SERVER_LOG_ERROR << "Cache doesn't exist"; return 0; } - return cache_->capacity(); } @@ -131,5 +133,17 @@ CacheMgr::SetCapacity(int64_t capacity) { cache_->set_capacity(capacity); } +template +void +CacheMgr::Lock() { + mutex_.lock(); +} + +template +void +CacheMgr::Unlock() { + mutex_.unlock(); +} + } // namespace cache } // namespace milvus diff --git a/core/src/cache/GpuCacheMgr.cpp b/core/src/cache/GpuCacheMgr.cpp index 374d141fc0da93dacd7851b0e220510496835045..30e4ec4c5a6241be854f8f191ae185dc54cfd971 100644 --- a/core/src/cache/GpuCacheMgr.cpp +++ b/core/src/cache/GpuCacheMgr.cpp @@ -22,7 +22,7 @@ namespace cache { #ifdef MILVUS_GPU_VERSION std::mutex GpuCacheMgr::global_mutex_; -std::unordered_map> GpuCacheMgr::instance_; +std::unordered_map GpuCacheMgr::instance_; namespace { constexpr int64_t G_BYTE = 1024 * 1024 * 1024; @@ -65,33 +65,28 @@ GpuCacheMgr::InsertItem(const std::string& key, const milvus::cache::DataObjPtr& } } -GpuCacheMgrPtr -GpuCacheMgr::GetInstance(int64_t gpu_id) { - if (instance_.find(gpu_id) == instance_.end()) { - std::lock_guard lock(global_mutex_); - if (instance_.find(gpu_id) == instance_.end()) { - instance_[gpu_id] = std::make_pair(std::make_shared(gpu_id), std::make_shared()); - } - } - return instance_[gpu_id].first; +bool +GpuCacheMgr::Reserve(const int64_t size) { + return CacheMgr::Reserve(size); } -MutexPtr -GpuCacheMgr::GetInstanceMutex(int64_t gpu_id) { +GpuCacheMgrPtr +GpuCacheMgr::GetInstance(int64_t gpu_id) { if (instance_.find(gpu_id) == instance_.end()) { std::lock_guard lock(global_mutex_); if (instance_.find(gpu_id) == instance_.end()) { - instance_[gpu_id] = std::make_pair(std::make_shared(gpu_id), std::make_shared()); + instance_[gpu_id] = std::make_shared(gpu_id); } } - return instance_[gpu_id].second; + return instance_[gpu_id]; } void GpuCacheMgr::OnGpuCacheCapacityChanged(int64_t capacity) { for (auto& iter : instance_) { - std::lock_guard lock(*(iter.second.second)); - iter.second.first->SetCapacity(capacity * G_BYTE); + iter.second->Lock(); + iter.second->SetCapacity(capacity * G_BYTE); + iter.second->Unlock(); } } diff --git a/core/src/cache/GpuCacheMgr.h b/core/src/cache/GpuCacheMgr.h index d96d6f3c89c142b4e5e59949ea9776448af4ce58..17a92c1d8e438dc2ce17f5f2cd7a33ed5b87933f 100644 --- a/core/src/cache/GpuCacheMgr.h +++ b/core/src/cache/GpuCacheMgr.h @@ -39,12 +39,12 @@ class GpuCacheMgr : public CacheMgr, public server::GpuResourceConfi void InsertItem(const std::string& key, const DataObjPtr& data); + bool + Reserve(const int64_t size); + static GpuCacheMgrPtr GetInstance(int64_t gpu_id); - static MutexPtr - GetInstanceMutex(int64_t gpu_id); - protected: void OnGpuCacheCapacityChanged(int64_t capacity) override; @@ -53,7 +53,7 @@ class GpuCacheMgr : public CacheMgr, public server::GpuResourceConfi bool gpu_enable_ = true; int64_t gpu_id_; static std::mutex global_mutex_; - static std::unordered_map> instance_; + static std::unordered_map instance_; std::string identity_; }; #endif diff --git a/core/src/db/engine/ExecutionEngine.h b/core/src/db/engine/ExecutionEngine.h index e7739d4d53bb49acbe113f454a90e4420cc4336e..d8911bd08fa4c08ddea18cf6e88b14077abbdcde 100644 --- a/core/src/db/engine/ExecutionEngine.h +++ b/core/src/db/engine/ExecutionEngine.h @@ -111,9 +111,6 @@ class ExecutionEngine { virtual Status Cache() = 0; - virtual Status - GpuCache(uint64_t gpu_id) = 0; - virtual Status Init() = 0; diff --git a/core/src/db/engine/ExecutionEngineImpl.cpp b/core/src/db/engine/ExecutionEngineImpl.cpp index dbbee791b6ebaaa173cddd3fced31f27737d959f..ace870226c69271b8f741bedf4de2bb04c284bed 100644 --- a/core/src/db/engine/ExecutionEngineImpl.cpp +++ b/core/src/db/engine/ExecutionEngineImpl.cpp @@ -41,6 +41,7 @@ #include "metrics/Metrics.h" #include "scheduler/Utils.h" #include "utils/CommonUtil.h" +#include "utils/Error.h" #include "utils/Exception.h" #include "utils/Log.h" #include "utils/Status.h" @@ -545,13 +546,16 @@ ExecutionEngineImpl::CopyToGpu(uint64_t device_id, bool hybrid) { try { /* Index data is copied to GPU first, then added into GPU cache. - * We MUST add a lock here to avoid more than one INDEX are copied to one GPU card at same time, - * which will potentially cause GPU out of memory. + * Add lock here to avoid multiple INDEX are copied to one GPU card at same time. + * And reserve space to avoid GPU out of memory issue. */ - std::lock_guard lock(*(cache::GpuCacheMgr::GetInstanceMutex(device_id))); ENGINE_LOG_DEBUG << "CPU to GPU" << device_id << " start"; + auto gpu_cache_mgr = cache::GpuCacheMgr::GetInstance(device_id); + gpu_cache_mgr->Lock(); + gpu_cache_mgr->Reserve(index_->Size()); index_ = knowhere::cloner::CopyCpuToGpu(index_, device_id, knowhere::Config()); - GpuCache(device_id); + gpu_cache_mgr->InsertItem(location_, std::static_pointer_cast(index_)); + gpu_cache_mgr->Unlock(); ENGINE_LOG_DEBUG << "CPU to GPU" << device_id << " finished"; } catch (std::exception& e) { ENGINE_LOG_ERROR << e.what(); @@ -568,10 +572,11 @@ ExecutionEngineImpl::CopyToIndexFileToGpu(uint64_t device_id) { #ifdef MILVUS_GPU_VERSION // the ToIndexData is only a placeholder, cpu-copy-to-gpu action is performed in if (index_) { + auto gpu_cache_mgr = milvus::cache::GpuCacheMgr::GetInstance(device_id); + gpu_cache_mgr->Lock(); gpu_num_ = device_id; - auto to_index_data = std::make_shared(index_->Size()); - cache::DataObjPtr obj = std::static_pointer_cast(to_index_data); - milvus::cache::GpuCacheMgr::GetInstance(device_id)->InsertItem(location_ + "_placeholder", obj); + gpu_cache_mgr->Reserve(index_->Size()); + gpu_cache_mgr->Unlock(); } #endif return Status::OK(); @@ -954,18 +959,11 @@ ExecutionEngineImpl::GetVectorByID(const int64_t& id, uint8_t* vector, bool hybr Status ExecutionEngineImpl::Cache() { + auto cpu_cache_mgr = milvus::cache::CpuCacheMgr::GetInstance(); + cpu_cache_mgr->Lock(); cache::DataObjPtr obj = std::static_pointer_cast(index_); - milvus::cache::CpuCacheMgr::GetInstance()->InsertItem(location_, obj); - - return Status::OK(); -} - -Status -ExecutionEngineImpl::GpuCache(uint64_t gpu_id) { -#ifdef MILVUS_GPU_VERSION - cache::DataObjPtr obj = std::static_pointer_cast(index_); - milvus::cache::GpuCacheMgr::GetInstance(gpu_id)->InsertItem(location_, obj); -#endif + cpu_cache_mgr->InsertItem(location_, obj); + cpu_cache_mgr->Unlock(); return Status::OK(); } diff --git a/core/src/db/engine/ExecutionEngineImpl.h b/core/src/db/engine/ExecutionEngineImpl.h index d8ef30a6aa8482ebe320838155b0e2d551d574a2..35e0b9c217e6a16106b11fd862af9e48bb2e0b0f 100644 --- a/core/src/db/engine/ExecutionEngineImpl.h +++ b/core/src/db/engine/ExecutionEngineImpl.h @@ -86,9 +86,6 @@ class ExecutionEngineImpl : public ExecutionEngine { Status Cache() override; - Status - GpuCache(uint64_t gpu_id) override; - Status Init() override; diff --git a/core/unittest/db/test_engine.cpp b/core/unittest/db/test_engine.cpp index ac05c4d1e137ef1f80971182cededa2f0ccbf15f..aac6fcd8b38bf304eaf1ea314121710b2973b40b 100644 --- a/core/unittest/db/test_engine.cpp +++ b/core/unittest/db/test_engine.cpp @@ -203,8 +203,6 @@ TEST_F(EngineTest, ENGINE_IMPL_TEST) { auto status = engine_ptr->CopyToGpu(0, false); ASSERT_TRUE(status.ok()); - status = engine_ptr->GpuCache(0); - ASSERT_TRUE(status.ok()); status = engine_ptr->CopyToGpu(0, false); ASSERT_TRUE(status.ok());