Unverified commit ac443e37 authored by Cai Yudong, committed by GitHub

Caiyd 1689 gpu out of memory (#1776)

* #1689 update cache strategy
Signed-off-by: yudong.cai <yudong.cai@zilliz.com>

* #1689 add lock when change cache capacity
Signed-off-by: yudong.cai <yudong.cai@zilliz.com>

* retry CI
Signed-off-by: yudong.cai <yudong.cai@zilliz.com>

* #1689 update cache
Signed-off-by: yudong.cai <yudong.cai@zilliz.com>

* retry CI
Signed-off-by: yudong.cai <yudong.cai@zilliz.com>
Parent d92bcd2e
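The core of this change: before an index is copied to a GPU, the caller now takes the per-GPU cache lock and reserves enough cache capacity, so concurrent copies cannot oversubscribe GPU memory. A minimal sketch of the intended call sequence, using the GpuCacheMgr API introduced in this diff (index and location stand in for the engine's members; this is an illustration, not the committed code):

// Reserve-then-copy pattern (sketch): hold the per-GPU lock across the whole
// sequence so no other thread can copy an index to this GPU in between.
auto mgr = milvus::cache::GpuCacheMgr::GetInstance(device_id);
mgr->Lock();
if (mgr->Reserve(index->Size())) {  // evict LRU items until the index fits
    index = knowhere::cloner::CopyCpuToGpu(index, device_id, knowhere::Config());
    mgr->InsertItem(location, std::static_pointer_cast<cache::DataObj>(index));
}
mgr->Unlock();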
@@ -69,6 +69,9 @@ class Cache {
void
erase(const std::string& key);
bool
reserve(const int64_t size);
void
print();
@@ -77,7 +80,13 @@ class Cache {
private:
void
free_memory();
insert_internal(const std::string& key, const ItemObj& item);
void
erase_internal(const std::string& key);
void
free_memory_internal(const int64_t target_size);
private:
std::string header_;
......
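The header changes above introduce a common pattern: each public method acquires the cache mutex once, then delegates to an *_internal helper that assumes the lock is already held, so helpers can call each other without re-locking a non-recursive std::mutex. A self-contained sketch of the pattern (toy code, not the Milvus template):

// Toy illustration of the lock-wrapper / *_internal split used in Cache.h:
// public methods lock once; internal helpers run with the lock already held,
// so insert_internal() may call erase_internal() without self-deadlock.
#include <map>
#include <mutex>
#include <string>

class TinyCache {
 public:
    void insert(const std::string& key, int size) {
        std::lock_guard<std::mutex> lock(mutex_);
        insert_internal(key, size);
    }
    void erase(const std::string& key) {
        std::lock_guard<std::mutex> lock(mutex_);
        erase_internal(key);
    }

 private:
    void insert_internal(const std::string& key, int size) {
        items_[key] = size;
        if (items_.size() > 2) {
            std::string victim = items_.begin()->first;
            erase_internal(victim);  // safe: lock already held, no re-lock
        }
    }
    void erase_internal(const std::string& key) {
        items_.erase(key);
    }

    std::mutex mutex_;
    std::map<std::string, int> items_;
};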
@@ -13,8 +13,6 @@ namespace milvus {
namespace cache {
constexpr double DEFAULT_THRESHOLD_PERCENT = 0.7;
constexpr double WARNING_THRESHOLD_PERCENT = 0.9;
constexpr double BIG_ITEM_THRESHOLD_PERCENT = 0.1;
template <typename ItemObj>
Cache<ItemObj>::Cache(int64_t capacity, int64_t cache_max_count, const std::string& header)
@@ -28,9 +26,10 @@ Cache<ItemObj>::Cache(int64_t capacity, int64_t cache_max_count, const std::stri
template <typename ItemObj>
void
Cache<ItemObj>::set_capacity(int64_t capacity) {
std::lock_guard<std::mutex> lock(mutex_);
if (capacity > 0) {
capacity_ = capacity;
free_memory();
free_memory_internal(capacity);
}
}
@@ -55,56 +54,95 @@ Cache<ItemObj>::get(const std::string& key) {
if (!lru_.exists(key)) {
return nullptr;
}
return lru_.get(key);
}
template <typename ItemObj>
void
Cache<ItemObj>::insert(const std::string& key, const ItemObj& item) {
std::lock_guard<std::mutex> lock(mutex_);
insert_internal(key, item);
}
template <typename ItemObj>
void
Cache<ItemObj>::erase(const std::string& key) {
std::lock_guard<std::mutex> lock(mutex_);
erase_internal(key);
}
template <typename ItemObj>
bool
Cache<ItemObj>::reserve(const int64_t item_size) {
std::lock_guard<std::mutex> lock(mutex_);
if (item_size > capacity_) {
SERVER_LOG_ERROR << header_ << " item size " << (item_size >> 20) << "MB too big to insert into cache capacity"
<< (capacity_ >> 20) << "MB";
return false;
}
if (item_size > capacity_ - usage_) {
free_memory_internal(capacity_ - item_size);
}
return true;
}
template <typename ItemObj>
void
Cache<ItemObj>::clear() {
std::lock_guard<std::mutex> lock(mutex_);
lru_.clear();
usage_ = 0;
SERVER_LOG_DEBUG << header_ << " Clear cache !";
}
template <typename ItemObj>
void
Cache<ItemObj>::print() {
std::lock_guard<std::mutex> lock(mutex_);
size_t cache_count = lru_.size();
// for (auto it = lru_.begin(); it != lru_.end(); ++it) {
// SERVER_LOG_DEBUG << it->first;
// }
SERVER_LOG_DEBUG << header_ << " [item count]: " << cache_count << ", [usage] " << (usage_ >> 20)
<< "MB, [capacity] " << (capacity_ >> 20) << "MB";
}
template <typename ItemObj>
void
Cache<ItemObj>::insert_internal(const std::string& key, const ItemObj& item) {
if (item == nullptr) {
return;
}
size_t item_size = item->Size();
// calculate usage
{
std::lock_guard<std::mutex> lock(mutex_);
// if key already exist, subtract old item size
if (lru_.exists(key)) {
const ItemObj& old_item = lru_.get(key);
usage_ -= old_item->Size();
}
// plus new item size
usage_ += item_size;
// if key already exist, subtract old item size
if (lru_.exists(key)) {
const ItemObj& old_item = lru_.get(key);
usage_ -= old_item->Size();
}
// plus new item size
usage_ += item_size;
// if usage exceed capacity, free some items
if (usage_ > capacity_ ||
(item_size > (int64_t)(capacity_ * BIG_ITEM_THRESHOLD_PERCENT) &&
usage_ > (int64_t)(capacity_ * WARNING_THRESHOLD_PERCENT))) {
if (usage_ > capacity_) {
SERVER_LOG_DEBUG << header_ << " Current usage " << (usage_ >> 20) << "MB is too high for capacity "
<< (capacity_ >> 20) << "MB, start free memory";
free_memory();
free_memory_internal(capacity_);
}
// insert new item
{
std::lock_guard<std::mutex> lock(mutex_);
lru_.put(key, item);
SERVER_LOG_DEBUG << header_ << " Insert " << key << " size: " << (item_size >> 20) << "MB into cache";
SERVER_LOG_DEBUG << header_ << " Count: " << lru_.size() << ", Usage: " << (usage_ >> 20) << "MB, Capacity: "
<< (capacity_ >> 20) << "MB";
}
lru_.put(key, item);
SERVER_LOG_DEBUG << header_ << " Insert " << key << " size: " << (item_size >> 20) << "MB into cache";
SERVER_LOG_DEBUG << header_ << " Count: " << lru_.size() << ", Usage: " << (usage_ >> 20) << "MB, Capacity: "
<< (capacity_ >> 20) << "MB";
}
template <typename ItemObj>
void
Cache<ItemObj>::erase(const std::string& key) {
std::lock_guard<std::mutex> lock(mutex_);
Cache<ItemObj>::erase_internal(const std::string& key) {
if (!lru_.exists(key)) {
return;
}
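The new reserve() above implements the admission check: an item larger than the whole capacity is rejected outright; otherwise LRU items are evicted until at least item_size bytes are free. A standalone sketch of that decision logic (simplified; free_memory_internal here is a stand-in for the real LRU eviction):

// Standalone sketch of reserve()'s logic, not the Milvus template.
#include <cstdint>

struct ReserveLogic {
    int64_t capacity_;
    int64_t usage_;

    bool reserve(int64_t item_size) {
        if (item_size > capacity_) {
            return false;                          // can never fit
        }
        if (item_size > capacity_ - usage_) {
            // evict LRU items until at least item_size bytes are free
            free_memory_internal(capacity_ - item_size);
        }
        return true;
    }

    void free_memory_internal(int64_t target) {
        if (usage_ > target) usage_ = target;      // stand-in for LRU eviction
    }
};
// e.g. capacity_ = 4 GB, usage_ = 3.5 GB, item_size = 1 GB: reserve() evicts
// down to 3 GB of usage so the 1 GB item fits after insertion.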
@@ -122,22 +160,9 @@ Cache<ItemObj>::erase(const std::string& key) {
template <typename ItemObj>
void
Cache<ItemObj>::clear() {
std::lock_guard<std::mutex> lock(mutex_);
lru_.clear();
usage_ = 0;
SERVER_LOG_DEBUG << header_ << " Clear cache !";
}
/* free memory space when CACHE occupation exceed its capacity */
template <typename ItemObj>
void
Cache<ItemObj>::free_memory() {
// if (usage_ <= capacity_)
// return;
int64_t threshhold = capacity_ * freemem_percent_;
int64_t delta_size = usage_ - threshhold;
Cache<ItemObj>::free_memory_internal(const int64_t target_size) {
int64_t threshold = std::min((int64_t)(capacity_ * freemem_percent_), target_size);
int64_t delta_size = usage_ - threshold;
if (delta_size <= 0) {
delta_size = 1; // ensure at least one item erased
}
@@ -145,44 +170,22 @@ Cache<ItemObj>::free_memory() {
std::set<std::string> key_array;
int64_t released_size = 0;
{
std::lock_guard<std::mutex> lock(mutex_);
auto it = lru_.rbegin();
while (it != lru_.rend() && released_size < delta_size) {
auto& key = it->first;
auto& obj_ptr = it->second;
auto it = lru_.rbegin();
while (it != lru_.rend() && released_size < delta_size) {
auto& key = it->first;
auto& obj_ptr = it->second;
key_array.emplace(key);
released_size += obj_ptr->Size();
++it;
}
key_array.emplace(key);
released_size += obj_ptr->Size();
++it;
}
SERVER_LOG_DEBUG << header_ << " To be released memory size: " << (released_size >> 20) << "MB";
for (auto& key : key_array) {
erase(key);
erase_internal(key);
}
}
template <typename ItemObj>
void
Cache<ItemObj>::print() {
size_t cache_count = 0;
{
std::lock_guard<std::mutex> lock(mutex_);
cache_count = lru_.size();
#if 0
for (auto it = lru_.begin(); it != lru_.end(); ++it) {
SERVER_LOG_DEBUG << it->first;
}
#endif
}
SERVER_LOG_DEBUG << header_ << " [item count]: " << cache_count << ", [usage] " << (usage_ >> 20)
<< "MB, [capacity] " << (capacity_ >> 20) << "MB";
}
} // namespace cache
} // namespace milvus
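free_memory_internal() above picks its eviction target as the smaller of capacity_ * freemem_percent_ and the requested target_size, then walks the LRU list from the cold end collecting keys until enough bytes would be released. A standalone sketch of that victim-selection walk (assumed types; not the Milvus LRU implementation):

// Victim selection sketch: MRU items sit at the front of the list, LRU at the
// back; we collect keys from the back until 'delta' bytes would be released.
#include <algorithm>
#include <cstdint>
#include <list>
#include <set>
#include <string>
#include <utility>

using LruList = std::list<std::pair<std::string, int64_t>>;  // key -> size

std::set<std::string>
pick_victims(const LruList& lru, int64_t usage, int64_t capacity,
             double freemem_percent, int64_t target_size) {
    int64_t threshold =
        std::min(static_cast<int64_t>(capacity * freemem_percent), target_size);
    int64_t delta = usage - threshold;
    if (delta <= 0) {
        delta = 1;  // ensure at least one item is considered
    }

    std::set<std::string> victims;
    int64_t released = 0;
    for (auto it = lru.rbegin(); it != lru.rend() && released < delta; ++it) {
        victims.insert(it->first);
        released += it->second;
    }
    return victims;
}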
@@ -39,6 +39,9 @@ class CacheMgr {
virtual void
EraseItem(const std::string& key);
virtual bool
Reserve(const int64_t size);
virtual void
PrintInfo();
@@ -54,14 +57,20 @@ class CacheMgr {
void
SetCapacity(int64_t capacity);
void
Lock();
void
Unlock();
protected:
CacheMgr();
virtual ~CacheMgr();
protected:
using CachePtr = std::shared_ptr<Cache<ItemObj>>;
CachePtr cache_;
std::shared_ptr<Cache<ItemObj>> cache_;
std::mutex mutex_;
};
} // namespace cache
......
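Lock() and Unlock() expose the manager's mutex so a caller can hold it across a multi-step sequence such as reserve, copy, insert. Because the method names do not satisfy the standard BasicLockable requirements (lowercase lock/unlock), std::lock_guard cannot guard a CacheMgr directly; a small RAII adapter would restore scope-based unlocking. A hypothetical sketch, not part of this commit:

// Hypothetical RAII adapter: Lock()/Unlock() must otherwise be paired by hand,
// so an exception between them would leave the mutex held.
template <typename Mgr>
class MgrLockGuard {
 public:
    explicit MgrLockGuard(Mgr& mgr) : mgr_(mgr) { mgr_.Lock(); }
    ~MgrLockGuard() { mgr_.Unlock(); }
    MgrLockGuard(const MgrLockGuard&) = delete;
    MgrLockGuard& operator=(const MgrLockGuard&) = delete;

 private:
    Mgr& mgr_;
};
// Usage (hypothetical):
//   MgrLockGuard<GpuCacheMgr> guard(*mgr);  // Unlock() runs even on exceptions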
@@ -27,7 +27,6 @@ CacheMgr<ItemObj>::ItemCount() const {
SERVER_LOG_ERROR << "Cache doesn't exist";
return 0;
}
return (uint64_t)(cache_->size());
}
@@ -38,7 +37,6 @@ CacheMgr<ItemObj>::ItemExists(const std::string& key) {
SERVER_LOG_ERROR << "Cache doesn't exist";
return false;
}
return cache_->exists(key);
}
@@ -60,7 +58,6 @@ CacheMgr<ItemObj>::InsertItem(const std::string& key, const ItemObj& data) {
SERVER_LOG_ERROR << "Cache doesn't exist";
return;
}
cache_->insert(key, data);
server::Metrics::GetInstance().CacheAccessTotalIncrement();
}
@@ -72,11 +69,20 @@ CacheMgr<ItemObj>::EraseItem(const std::string& key) {
SERVER_LOG_ERROR << "Cache doesn't exist";
return;
}
cache_->erase(key);
server::Metrics::GetInstance().CacheAccessTotalIncrement();
}
template <typename ItemObj>
bool
CacheMgr<ItemObj>::Reserve(const int64_t size) {
if (cache_ == nullptr) {
SERVER_LOG_ERROR << "Cache doesn't exist";
return false;
}
return cache_->reserve(size);
}
template <typename ItemObj>
void
CacheMgr<ItemObj>::PrintInfo() {
@@ -84,7 +90,6 @@ CacheMgr<ItemObj>::PrintInfo() {
SERVER_LOG_ERROR << "Cache doesn't exist";
return;
}
cache_->print();
}
@@ -95,7 +100,6 @@ CacheMgr<ItemObj>::ClearCache() {
SERVER_LOG_ERROR << "Cache doesn't exist";
return;
}
cache_->clear();
}
@@ -106,7 +110,6 @@ CacheMgr<ItemObj>::CacheUsage() const {
SERVER_LOG_ERROR << "Cache doesn't exist";
return 0;
}
return cache_->usage();
}
@@ -117,7 +120,6 @@ CacheMgr<ItemObj>::CacheCapacity() const {
SERVER_LOG_ERROR << "Cache doesn't exist";
return 0;
}
return cache_->capacity();
}
@@ -131,5 +133,17 @@ CacheMgr<ItemObj>::SetCapacity(int64_t capacity) {
cache_->set_capacity(capacity);
}
template <typename ItemObj>
void
CacheMgr<ItemObj>::Lock() {
mutex_.lock();
}
template <typename ItemObj>
void
CacheMgr<ItemObj>::Unlock() {
mutex_.unlock();
}
} // namespace cache
} // namespace milvus
@@ -22,7 +22,7 @@ namespace cache {
#ifdef MILVUS_GPU_VERSION
std::mutex GpuCacheMgr::global_mutex_;
std::unordered_map<int64_t, std::pair<GpuCacheMgrPtr, MutexPtr>> GpuCacheMgr::instance_;
std::unordered_map<int64_t, GpuCacheMgrPtr> GpuCacheMgr::instance_;
namespace {
constexpr int64_t G_BYTE = 1024 * 1024 * 1024;
@@ -65,33 +65,28 @@ GpuCacheMgr::InsertItem(const std::string& key, const milvus::cache::DataObjPtr&
}
}
GpuCacheMgrPtr
GpuCacheMgr::GetInstance(int64_t gpu_id) {
if (instance_.find(gpu_id) == instance_.end()) {
std::lock_guard<std::mutex> lock(global_mutex_);
if (instance_.find(gpu_id) == instance_.end()) {
instance_[gpu_id] = std::make_pair(std::make_shared<GpuCacheMgr>(gpu_id), std::make_shared<std::mutex>());
}
}
return instance_[gpu_id].first;
bool
GpuCacheMgr::Reserve(const int64_t size) {
return CacheMgr<DataObjPtr>::Reserve(size);
}
MutexPtr
GpuCacheMgr::GetInstanceMutex(int64_t gpu_id) {
GpuCacheMgrPtr
GpuCacheMgr::GetInstance(int64_t gpu_id) {
if (instance_.find(gpu_id) == instance_.end()) {
std::lock_guard<std::mutex> lock(global_mutex_);
if (instance_.find(gpu_id) == instance_.end()) {
instance_[gpu_id] = std::make_pair(std::make_shared<GpuCacheMgr>(gpu_id), std::make_shared<std::mutex>());
instance_[gpu_id] = std::make_shared<GpuCacheMgr>(gpu_id);
}
}
return instance_[gpu_id].second;
return instance_[gpu_id];
}
void
GpuCacheMgr::OnGpuCacheCapacityChanged(int64_t capacity) {
for (auto& iter : instance_) {
std::lock_guard<std::mutex> lock(*(iter.second.second));
iter.second.first->SetCapacity(capacity * G_BYTE);
iter.second->Lock();
iter.second->SetCapacity(capacity * G_BYTE);
iter.second->Unlock();
}
}
......
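GetInstance() above uses double-checked locking: the first instance_.find() runs without global_mutex_, which is an unsynchronized read of a std::unordered_map that another thread may be inserting into concurrently. A conservative variant (an assumption, not the committed code) simply locks before the first lookup:

// Conservative GetInstance sketch: always take global_mutex_ first, avoiding
// the unlocked unordered_map read in the double-checked pattern above.
GpuCacheMgrPtr
GpuCacheMgr::GetInstance(int64_t gpu_id) {
    std::lock_guard<std::mutex> lock(global_mutex_);
    auto it = instance_.find(gpu_id);
    if (it == instance_.end()) {
        it = instance_.emplace(gpu_id, std::make_shared<GpuCacheMgr>(gpu_id)).first;
    }
    return it->second;
}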
@@ -39,12 +39,12 @@ class GpuCacheMgr : public CacheMgr<DataObjPtr>, public server::GpuResourceConfi
void
InsertItem(const std::string& key, const DataObjPtr& data);
bool
Reserve(const int64_t size);
static GpuCacheMgrPtr
GetInstance(int64_t gpu_id);
static MutexPtr
GetInstanceMutex(int64_t gpu_id);
protected:
void
OnGpuCacheCapacityChanged(int64_t capacity) override;
@@ -53,7 +53,7 @@ class GpuCacheMgr : public CacheMgr<DataObjPtr>, public server::GpuResourceConfi
bool gpu_enable_ = true;
int64_t gpu_id_;
static std::mutex global_mutex_;
static std::unordered_map<int64_t, std::pair<GpuCacheMgrPtr, MutexPtr>> instance_;
static std::unordered_map<int64_t, GpuCacheMgrPtr> instance_;
std::string identity_;
};
#endif
......
@@ -111,9 +111,6 @@ class ExecutionEngine {
virtual Status
Cache() = 0;
virtual Status
GpuCache(uint64_t gpu_id) = 0;
virtual Status
Init() = 0;
......
@@ -41,6 +41,7 @@
#include "metrics/Metrics.h"
#include "scheduler/Utils.h"
#include "utils/CommonUtil.h"
#include "utils/Error.h"
#include "utils/Exception.h"
#include "utils/Log.h"
#include "utils/Status.h"
@@ -545,13 +546,16 @@ ExecutionEngineImpl::CopyToGpu(uint64_t device_id, bool hybrid) {
try {
/* Index data is copied to GPU first, then added into GPU cache.
* We MUST add a lock here to avoid more than one INDEX are copied to one GPU card at same time,
* which will potentially cause GPU out of memory.
* Add a lock here to avoid multiple indexes being copied to the same GPU card at the same time,
* and reserve space to avoid GPU out-of-memory issues.
*/
std::lock_guard<std::mutex> lock(*(cache::GpuCacheMgr::GetInstanceMutex(device_id)));
ENGINE_LOG_DEBUG << "CPU to GPU" << device_id << " start";
auto gpu_cache_mgr = cache::GpuCacheMgr::GetInstance(device_id);
gpu_cache_mgr->Lock();
gpu_cache_mgr->Reserve(index_->Size());
index_ = knowhere::cloner::CopyCpuToGpu(index_, device_id, knowhere::Config());
GpuCache(device_id);
gpu_cache_mgr->InsertItem(location_, std::static_pointer_cast<cache::DataObj>(index_));
gpu_cache_mgr->Unlock();
ENGINE_LOG_DEBUG << "CPU to GPU" << device_id << " finished";
} catch (std::exception& e) {
ENGINE_LOG_ERROR << e.what();
@@ -568,10 +572,11 @@ ExecutionEngineImpl::CopyToIndexFileToGpu(uint64_t device_id) {
#ifdef MILVUS_GPU_VERSION
// the ToIndexData is only a placeholder, cpu-copy-to-gpu action is performed in
if (index_) {
auto gpu_cache_mgr = milvus::cache::GpuCacheMgr::GetInstance(device_id);
gpu_cache_mgr->Lock();
gpu_num_ = device_id;
auto to_index_data = std::make_shared<knowhere::ToIndexData>(index_->Size());
cache::DataObjPtr obj = std::static_pointer_cast<cache::DataObj>(to_index_data);
milvus::cache::GpuCacheMgr::GetInstance(device_id)->InsertItem(location_ + "_placeholder", obj);
gpu_cache_mgr->Reserve(index_->Size());
gpu_cache_mgr->Unlock();
}
#endif
return Status::OK();
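One caveat in the CopyToGpu hunk above: Lock() is taken inside the try block, but the catch handler does not call Unlock(), so an exception thrown by CopyCpuToGpu would leave the GPU cache mutex held. A defensive variant of the same sequence (a sketch, not the committed code) releases the lock on the error path and also honors Reserve()'s return value, which both call sites currently ignore:

// Defensive variant (sketch): release the GPU cache mutex even on failure,
// and skip the copy entirely when the reservation is refused.
auto gpu_cache_mgr = cache::GpuCacheMgr::GetInstance(device_id);
gpu_cache_mgr->Lock();
try {
    if (gpu_cache_mgr->Reserve(index_->Size())) {
        index_ = knowhere::cloner::CopyCpuToGpu(index_, device_id, knowhere::Config());
        gpu_cache_mgr->InsertItem(location_, std::static_pointer_cast<cache::DataObj>(index_));
    }
} catch (...) {
    gpu_cache_mgr->Unlock();  // release before the error propagates
    throw;
}
gpu_cache_mgr->Unlock();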
@@ -954,18 +959,11 @@ ExecutionEngineImpl::GetVectorByID(const int64_t& id, uint8_t* vector, bool hybr
Status
ExecutionEngineImpl::Cache() {
auto cpu_cache_mgr = milvus::cache::CpuCacheMgr::GetInstance();
cpu_cache_mgr->Lock();
cache::DataObjPtr obj = std::static_pointer_cast<cache::DataObj>(index_);
milvus::cache::CpuCacheMgr::GetInstance()->InsertItem(location_, obj);
return Status::OK();
}
Status
ExecutionEngineImpl::GpuCache(uint64_t gpu_id) {
#ifdef MILVUS_GPU_VERSION
cache::DataObjPtr obj = std::static_pointer_cast<cache::DataObj>(index_);
milvus::cache::GpuCacheMgr::GetInstance(gpu_id)->InsertItem(location_, obj);
#endif
cpu_cache_mgr->InsertItem(location_, obj);
cpu_cache_mgr->Unlock();
return Status::OK();
}
......
@@ -86,9 +86,6 @@ class ExecutionEngineImpl : public ExecutionEngine {
Status
Cache() override;
Status
GpuCache(uint64_t gpu_id) override;
Status
Init() override;
......
@@ -203,8 +203,6 @@ TEST_F(EngineTest, ENGINE_IMPL_TEST) {
auto status = engine_ptr->CopyToGpu(0, false);
ASSERT_TRUE(status.ok());
status = engine_ptr->GpuCache(0);
ASSERT_TRUE(status.ok());
status = engine_ptr->CopyToGpu(0, false);
ASSERT_TRUE(status.ok());
......
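With GpuCache() removed from the engine interface, the test now simply calls CopyToGpu() twice; the second call hits the GPU cache. A hypothetical follow-up assertion (not in this commit) could exercise the new reservation path directly:

// Hypothetical check: reserving more than the whole GPU cache must fail,
// while a small reservation should succeed (possibly after evicting items).
auto gpu_mgr = milvus::cache::GpuCacheMgr::GetInstance(0);
ASSERT_FALSE(gpu_mgr->Reserve(gpu_mgr->CacheCapacity() + 1));
ASSERT_TRUE(gpu_mgr->Reserve(1));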