Commit 273e3f44 authored by Yu Yang

Remove unnecessary functionality from Parameter

Parent a0be0ed6
@@ -758,7 +758,7 @@ public:
        T p3);  // decayRate
   /// apply L1/L2 to *this*
-  void applyL1(T learningRate, T decayRate);
+  virtual void applyL1(T learningRate, T decayRate);
   void applyL1(BaseMatrixT& lr, T learningRate, T decayRate);
   void applyL2(T learningRate, T decayRate);
   void applyL2(BaseMatrixT& lr, T learningRate, T decayRate);
...
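For context, the L1 decay these overloads declare is conventionally soft-thresholding: w <- sign(w) * max(|w| - learningRate * decayRate, 0). A minimal standalone sketch assuming that textbook rule (the actual BaseMatrixT kernel is not shown in this diff):

```cpp
#include <algorithm>
#include <cmath>
#include <cstddef>
#include <cstdio>

// Soft-thresholding: w <- sign(w) * max(|w| - learningRate * decayRate, 0).
void applyL1(float* w, size_t len, float learningRate, float decayRate) {
  const float lambda = learningRate * decayRate;
  for (size_t i = 0; i < len; ++i) {
    w[i] = std::copysign(std::max(std::fabs(w[i]) - lambda, 0.0f), w[i]);
  }
}

int main() {
  float w[] = {0.5f, -0.02f, 0.1f};
  applyL1(w, 3, 1.0f, 0.05f);
  for (float v : w) std::printf("%g ", v);  // 0.45 -0 0.05
}
```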
@@ -54,7 +54,7 @@ void SparseRowCpuMatrix::zeroMem() {
   clearRows();
 }

-void SparseRowCpuMatrix::applyL1Decay(real learningRate, real decayRate) {
+void SparseRowCpuMatrix::applyL1(real learningRate, real decayRate) {
   apply([=](real* buf, size_t len) {
     CpuVector value(0, nullptr);
     value.subVecFrom(buf, 0, len);
...
@@ -94,7 +94,7 @@ public:
   /**
    * Apply L1 to all sparse rows; should be applied after the indices are ready.
    */
-  void applyL1Decay(real learningRate, real decayRate);
+  virtual void applyL1(real learningRate, real decayRate);
   void clearIndices() { clearRows(); }
   void zeroMemThread(size_t tid, size_t numThreads);
...
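Renaming the sparse applyL1Decay to applyL1 and making the base declaration virtual means a caller holding a BaseMatrixT reference now dispatches to the sparse override automatically. A toy illustration with stand-in types, not the Paddle classes:

```cpp
#include <cstdio>

struct DenseBuf {  // stands in for BaseMatrixT<real>
  virtual ~DenseBuf() = default;
  virtual void applyL1(float learningRate, float decayRate) {
    std::printf("dense L1 shrink: lr=%g decay=%g\n", learningRate, decayRate);
  }
};

struct SparseRows : DenseBuf {  // stands in for SparseRowCpuMatrix
  // Overrides the base; only touches the rows that were actually fetched.
  void applyL1(float learningRate, float decayRate) override {
    std::printf("sparse L1 shrink on cached rows: lr=%g decay=%g\n",
                learningRate, decayRate);
  }
};

int main() {
  SparseRows m;
  DenseBuf& asBase = m;
  asBase.applyL1(1.0f, 0.01f);  // dispatches to SparseRows::applyL1
}
```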
@@ -20,6 +20,7 @@ limitations under the License. */
 #include "OptimizerFunctions.h"
 #include "OptimizerWithRegularizer.h"
 #include "ParameterUpdateFunctions.h"
+#include "ThreadLocalBuffer.h"
 #include "hl_gpu.h"
 #include "paddle/math/CpuSparseMatrix.h"
 #include "paddle/math/MathUtils.h"
@@ -262,15 +263,6 @@ void Parameter::setMat(ParameterType pType, int matType) {
   }
 }

-SparsePrefetchRowCpuMatrix* Parameter::getPrefetchMatrix() {
-  MatrixPtr mat = mats_[PARAMETER_VALUE];
-  if (mat) {
-    return dynamic_cast<SparsePrefetchRowCpuMatrix*>(mat.get());
-  }
-  return nullptr;
-}
-
 void Parameter::incUpdate(const UpdateCallback& callback) {
   // Static parameter is fixed, and does not need to be updated
   if (isStatic()) {
@@ -422,37 +414,4 @@ bool Parameter::load(std::istream& s) {
   return true;
 }

-ThreadLocal<std::vector<VectorPtr>> Parameter::tlsTempBufs_;
-
-VectorPtr* Parameter::getTlsTempBufs() {
-  std::vector<VectorPtr>& bufs = *tlsTempBufs_;
-  if (bufs.empty()) {
-    bufs.resize(NUM_PARAMETER_TYPES);
-    for (auto& vec : bufs) {
-      vec.reset(new CpuVector(0, nullptr));
-    }
-  }
-  return bufs.data();
-}
-
-void Parameter::exec(ExecFunc func) {
-  auto execFunc = [this, func](int tid, size_t numThreads) {
-    if (numThreads == 1) {  // single thread
-      func(this->getBufs());
-    } else {  // multi thread
-      VectorPtr* vecs = Parameter::getTlsTempBufs();
-      auto interval = calcSplitArrayInterval(
-          this->getSize(), (size_t)tid, numThreads, 8LU /*for avx*/);
-      for (size_t i = 0; i < (size_t)NUM_PARAMETER_TYPES; ++i) {
-        if (bufs_[i]) {
-          vecs[i]->subVecFrom(*bufs_[i], interval);
-        }
-      }
-      func(vecs);
-    }
-  };
-  getBuf(PARAMETER_VALUE)->exec(execFunc);
-}
-
 }  // namespace paddle
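The deleted Parameter::exec partitioned each buffer across threads with calcSplitArrayInterval, using an alignment of 8 elements so every thread's slice starts on an AVX-register boundary. A hypothetical re-implementation of that splitting step (splitInterval is an illustrative stand-in, not the Paddle helper):

```cpp
#include <algorithm>
#include <cstdio>
#include <utility>

// Divide [0, size) into numThreads chunks whose boundaries are rounded up to
// a multiple of `align` elements (8 floats = one AVX register).
std::pair<size_t, size_t> splitInterval(size_t size, size_t tid,
                                        size_t numThreads, size_t align = 8) {
  size_t chunk = (size + numThreads - 1) / numThreads;
  chunk = (chunk + align - 1) / align * align;  // round chunk up to align
  size_t begin = std::min(tid * chunk, size);
  size_t end = std::min(begin + chunk, size);
  return {begin, end};
}

int main() {
  for (size_t tid = 0; tid < 4; ++tid) {
    auto [b, e] = splitInterval(100, tid, 4);
    std::printf("thread %zu -> [%zu, %zu)\n", tid, b, e);
  }
}
```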
@@ -209,14 +209,6 @@ public:
     intBufs_[pType] = iVec;
   }

-  SparsePrefetchRowCpuMatrix* getPrefetchMatrix();
-
   float getLearnRate() const { return config_.learning_rate(); }
   float getInitMean() const { return config_.initial_mean(); }
   float getInitStandardDeviation() const { return config_.initial_std(); }
   void setValueUpdated() { updated_ = true; }
   void clearValueUpdated() { updated_ = false; }
@@ -356,8 +348,6 @@ protected:
   bool updated_;
   SparseFormat format_;

-  static ThreadLocal<std::vector<VectorPtr>> tlsTempBufs_;
-
   std::vector<std::shared_ptr<IParameterUpdaterHook>> updaterHooks_;

 public:
@@ -371,15 +361,6 @@ public:
   static const std::string kMissParameterFail;
   static const std::string kMissParameterRand;
   static const std::string kMissParameterZero;
-
-  static VectorPtr* getTlsTempBufs();
-
-  /**
-   * exec a func in single/multi thread.
-   * vecs is bufs_ of Parameter, as input of ExecFunc.
-   */
-  typedef std::function<void(const VectorPtr vecs[])> ExecFunc;
-  void exec(ExecFunc func);
 };

 typedef std::map<std::string, ParameterPtr> ParameterMap;
...
New file: paddle/parameter/ThreadLocalBuffer.cpp (path inferred from the #include directives elsewhere in this commit)

+/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserve.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License. */
+
+#include "ThreadLocalBuffer.h"
+#include "Parameter.h"
+
+namespace paddle {
+namespace parameter {
+
+static ThreadLocal<std::vector<VectorPtr>> tlsTempBufs_;
+
+VectorPtr* getThreadLocalBuffer() {
+  std::vector<VectorPtr>& bufs = *tlsTempBufs_;
+  if (bufs.empty()) {
+    bufs.resize(NUM_PARAMETER_TYPES);
+    for (auto& vec : bufs) {
+      vec.reset(new CpuVector(0, nullptr));
+    }
+  }
+  return bufs.data();
+}
+
+}  // namespace parameter
+}  // namespace paddle
New file: paddle/parameter/ThreadLocalBuffer.h

+/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserve.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License. */
+
+#pragma once
+
+#include "paddle/math/Vector.h"
+
+namespace paddle {
+namespace parameter {
+extern VectorPtr* getThreadLocalBuffer();
+}  // namespace parameter
+}  // namespace paddle
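These two new files move the thread-local scratch buffers out of Parameter and behind a free function. A self-contained sketch of the same lazy per-thread initialization, using plain C++ thread_local in place of Paddle's ThreadLocal<> wrapper and a stand-in Buffer type:

```cpp
#include <cstdio>
#include <memory>
#include <thread>
#include <vector>

struct Buffer {};  // stands in for CpuVector
constexpr size_t kNumParameterTypes = 4;  // stands in for NUM_PARAMETER_TYPES

std::vector<std::unique_ptr<Buffer>>* getThreadLocalBuffer() {
  thread_local std::vector<std::unique_ptr<Buffer>> bufs;
  if (bufs.empty()) {  // first call on this thread: allocate once, then reuse
    bufs.resize(kNumParameterTypes);
    for (auto& b : bufs) b = std::make_unique<Buffer>();
  }
  return &bufs;
}

int main() {
  std::thread t([] {
    auto* a = getThreadLocalBuffer();
    auto* b = getThreadLocalBuffer();
    std::printf("same object on one thread: %d\n", a == b);  // prints 1
  });
  t.join();
}
```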
@@ -243,7 +243,8 @@ void ParameterClient2::prepareSendData(
   CHECK_GE(blockSize, 1LU) << "blockSize should > 0 " << blockSize;
   const auto paraSize = parameter->getSize();
   if (sparseUpdate) {
-    const auto prefetchMat = parameter->getPrefetchMatrix();
+    auto prefetchMat = std::dynamic_pointer_cast<SparsePrefetchRowCpuMatrix>(
+        parameter->getMat(PARAMETER_VALUE));
     CHECK(prefetchMat != nullptr) << "prefetchMat is nullptr";
     auto sendMat = dynamic_cast<SparseRowCpuMatrix*>(
         parameter->getMat(parameterType).get());
...
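The new call site downcasts the shared_ptr returned by getMat() rather than going through the removed accessor. A toy example (stand-in types, not the Paddle classes) of why std::dynamic_pointer_cast fits here: it preserves shared ownership and comes back empty on a type mismatch, which the CHECK then catches:

```cpp
#include <cstdio>
#include <memory>

struct Matrix { virtual ~Matrix() = default; };
struct SparsePrefetchRowCpuMatrix : Matrix {};

int main() {
  std::shared_ptr<Matrix> mat = std::make_shared<SparsePrefetchRowCpuMatrix>();

  // Yields a shared_ptr sharing ownership with `mat`; it is empty if the
  // runtime type does not match, instead of a dangling raw pointer.
  auto prefetchMat = std::dynamic_pointer_cast<SparsePrefetchRowCpuMatrix>(mat);
  std::printf("cast ok: %d, use_count: %ld\n",
              prefetchMat != nullptr, mat.use_count());  // 1, 2
}
```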
@@ -18,7 +18,6 @@ limitations under the License. */
 #include <fstream>
 #include "paddle/math/SIMDFunctions.h"
 #include "paddle/parameter/AverageOptimizer.h"
-#include "paddle/parameter/FirstOrderOptimizer.h"
 #include "paddle/parameter/OptimizerFunctions.h"
@@ -26,6 +25,7 @@ limitations under the License. */
 #include "paddle/parameter/ParameterOptimizer.h"
 #include "paddle/parameter/ParameterUpdateFunctions.h"
 #include "paddle/parameter/Regularizer.h"
+#include "paddle/parameter/ThreadLocalBuffer.h"
 #include "paddle/utils/Flags.h"
 #include "paddle/utils/GlobalConstants.h"
 #include "paddle/utils/Stat.h"
@@ -618,7 +618,7 @@ void ParameterServer2::asyncSGD(const SendParameterRequest& request,
   bool commitGradient = asyncGrdientCommitCheckAndStat(request);

-  VectorPtr* vecs = Parameter::getTlsTempBufs();
+  VectorPtr* vecs = parameter::getThreadLocalBuffer();
   size_t bufferIndex = 0;
   for (const auto& block : request.blocks()) {
     int64_t offset = getBlockOffset(block);
...
@@ -1051,15 +1051,15 @@ void ParameterServer2::clearUnusedSegments(CpuVector* vec) {
 }

 void ParameterServer2::parallelExecForEachBlock(ExecFunc func) {
-  SyncThreadPool::execHelper(syncThreadPool_.get(),
-                             [&](int tid, size_t numThreads) {
-                               int64_t numBlocks = blockIdMap_.size();
-                               VectorPtr* vecs = Parameter::getTlsTempBufs();
-                               for (int64_t blockId = tid; blockId < numBlocks;
-                                    blockId += numThreads) {
-                                 func(blockId, vecs);
-                               }
-                             });
+  SyncThreadPool::execHelper(
+      syncThreadPool_.get(), [&](int tid, size_t numThreads) {
+        int64_t numBlocks = blockIdMap_.size();
+        VectorPtr* vecs = parameter::getThreadLocalBuffer();
+        for (int64_t blockId = tid; blockId < numBlocks;
+             blockId += numThreads) {
+          func(blockId, vecs);
+        }
+      });
 }

 void ParameterServer2::blockTraverse(
...
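The lambda above stripes blocks across threads round-robin: thread tid handles blocks tid, tid + numThreads, tid + 2 * numThreads, and so on. The same striding in isolation:

```cpp
#include <cstdint>
#include <cstdio>

int main() {
  const int64_t numBlocks = 10;
  const size_t numThreads = 3;
  for (size_t tid = 0; tid < numThreads; ++tid) {
    std::printf("thread %zu:", tid);
    // Each thread starts at its own id and strides by the thread count.
    for (int64_t blockId = tid; blockId < numBlocks;
         blockId += numThreads) {
      std::printf(" %lld", static_cast<long long>(blockId));
    }
    std::printf("\n");
  }
}
```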
@@ -747,28 +747,32 @@ void SparseRemoteParameterUpdater::getParametersRemote(bool fullSize,
                                                        bool apply) {
   ParameterType sendBackParameterType =
       (useApplyInPserver_ && apply) ? PARAMETER_APPLY : PARAMETER_VALUE;
+  std::function<void()> getParams;
+  std::function<void(Parameter&, real)> applyL1;
   if (fullSize) {
-    parameterClient_->getParameter(
-        /* recvParameterType= */ PARAMETER_VALUE, sendBackParameterType);
-    if (config_.shrink_parameter_value() > 0) {
-      for (auto& para : parameters_) {
-        if (para->getConfig().decay_rate_l1() > 0) {
-          para->getBuf(PARAMETER_VALUE)
-              ->applyL1(1.0f,                               // learningRate
-                        config_.shrink_parameter_value());  // decayRate
-        }
-      }
-    }
+    getParams = [&] {
+      parameterClient_->getParameter(
+          /* recvParameterType= */ PARAMETER_VALUE, sendBackParameterType);
+    };
+    applyL1 = [](Parameter& para, real decayRate) {
+      para.getBuf(PARAMETER_VALUE)->applyL1(/*lr=*/1.0f, decayRate);
+    };
   } else {
-    REGISTER_TIMER("getParamSparse");
-    parameterClient_->getParameterSparse(
-        /* recvParameterType= */ PARAMETER_VALUE, sendBackParameterType);
+    getParams = [&] {
+      parameterClient_->getParameterSparse(
+          /* recvParameterType= */ PARAMETER_VALUE, sendBackParameterType);
+    };
+    applyL1 = [](Parameter& para, real decayRate) {
+      para.getMat(PARAMETER_VALUE)->applyL1(/*lr=*/1.0f, decayRate);
+    };
   }
+  {
+    REGISTER_TIMER("getParamDenseAndSparse");
+    getParams();
     if (config_.shrink_parameter_value() > 0) {
       for (auto& para : parameters_) {
         if (para->getConfig().decay_rate_l1() > 0) {
-          para->getPrefetchMatrix()->applyL1Decay(
-              1.0f,                               // learningRate
-              config_.shrink_parameter_value());  // decayRate
+          applyL1(*para, config_.shrink_parameter_value());
         }
       }
     }
...
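The reshaped getParametersRemote above selects its dense or sparse behavior once, as two std::function strategies, so the timer scope and the shrink loop are written a single time. A compilable miniature of the same pattern, with toy stand-ins for Parameter and the client calls:

```cpp
#include <cstdio>
#include <functional>
#include <vector>

struct Parameter { const char* name; float decayRateL1; };

void getParametersRemote(bool fullSize, std::vector<Parameter>& params,
                         float shrinkValue) {
  std::function<void()> getParams;
  std::function<void(Parameter&, float)> applyL1;
  if (fullSize) {  // pick the dense strategies
    getParams = [] { std::printf("fetch dense parameters\n"); };
    applyL1 = [](Parameter& p, float decay) {
      std::printf("dense L1 on %s, decay=%g\n", p.name, decay);
    };
  } else {  // pick the sparse strategies
    getParams = [] { std::printf("fetch sparse parameters\n"); };
    applyL1 = [](Parameter& p, float decay) {
      std::printf("sparse L1 on %s, decay=%g\n", p.name, decay);
    };
  }
  // Shared tail: fetch once, then shrink whatever is configured for L1 decay.
  getParams();
  if (shrinkValue > 0) {
    for (auto& p : params) {
      if (p.decayRateL1 > 0) applyL1(p, shrinkValue);
    }
  }
}

int main() {
  std::vector<Parameter> params = {{"w", 0.1f}, {"b", 0.0f}};
  getParametersRemote(/*fullSize=*/true, params, 0.01f);
  getParametersRemote(/*fullSize=*/false, params, 0.01f);
}
```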
@@ -17,6 +17,7 @@ limitations under the License. */
 #include "paddle/utils/Logging.h"

 #include "paddle/math/SparseRowMatrix.h"
+#include "paddle/parameter/ThreadLocalBuffer.h"
 #include "paddle/utils/Thread.h"

 DECLARE_int32(trainer_count);
@@ -98,7 +99,7 @@ void SgdThreadUpdater::threadTraverse(
     int tid,
     size_t numThreads,
     Parameter* para) {
-  VectorPtr* vecs = Parameter::getTlsTempBufs();
+  VectorPtr* vecs = parameter::getThreadLocalBuffer();
   if (para->isGradSparseUpdate()) {
     size_t height = para->getConfig().dims(0);
     size_t width = para->getConfig().dims(1);
@@ -214,7 +215,7 @@ void SgdThreadUpdater::threadUpdateSparse(int tid,
                                           Parameter* para) {
   int pid = para->getID();
   ParameterOptimizer* optimizer = optimizers_[pid].get();
-  VectorPtr* vecs = Parameter::getTlsTempBufs();
+  VectorPtr* vecs = parameter::getThreadLocalBuffer();
   size_t height = para->getConfig().dims(0);
   size_t width = para->getConfig().dims(1);
@@ -286,7 +287,7 @@ void SgdThreadUpdater::threadUpdateDense(int tid,
                                          Parameter* para) {
   int pid = para->getID();
   ParameterOptimizer* optimizer = optimizers_[pid].get();
-  VectorPtr* vecs = Parameter::getTlsTempBufs();
+  VectorPtr* vecs = parameter::getThreadLocalBuffer();
   auto interval = calcSplitArrayInterval(
       para->getSize(), (size_t)tid, numThreads, 8LU /*for avx*/);
...