未验证 提交 31fa2558 编写于 作者: 羽飞's avatar 羽飞 提交者: GitHub

thread-safe buffer pool and btree supported (#145)

Fix problem:
1. the buffer pool and the B+ tree are not thread-safe;
2. github/workflow/build does not work

### What is changed and how it works?
1. thread-safe buffer pool
- A mutex is added to the buffer pool and is held during buffer pool operations
such as allocating and disposing frames;
- The frame is locked while updating/reading the content of frame;
- The frame manager takes a lock when allocating/freeing pages.

2. thread-safe b+tree
- The latch crabbing protocol is used to support concurrent access.

3. github/workflow/build
- update the submodules;
- create a build script and run build.sh in build.yaml
上级 5b06a712
......@@ -48,14 +48,14 @@ TabWidth: 4
UseTab: Never
BreakBeforeBraces: Custom
BraceWrapping:
AfterClass: false
AfterClass: true
AfterControlStatement: false
AfterEnum: false
AfterEnum: true
AfterFunction: true
AfterNamespace: false
AfterObjCDeclaration: false
AfterStruct: false
AfterUnion: false
AfterStruct: true
AfterUnion: true
AfterExternBlock: false
BeforeCatch: false
BeforeElse: false
......
......@@ -20,30 +20,8 @@ jobs:
steps:
- name: Checkout repository and submodules
uses: actions/checkout@v2
with:
submodules: recursive
- name: build deps/googletest
run: cmake -DCMAKE_INSTALL_PREFIX=${{github.workspace}}/local -B ${{github.workspace}}/deps/googletest/build -S ${{github.workspace}}/deps/googletest && cmake --build ${{github.workspace}}/deps/googletest/build --config ${{env.BUILD_TYPE}} --target install
- name: build deps/jsoncpp
run: cmake -DJSONCPP_WITH_TESTS=OFF -DJSONCPP_WITH_POST_BUILD_UNITTEST=OFF -DCMAKE_INSTALL_PREFIX=${{github.workspace}}/local -B ${{github.workspace}}/deps/jsoncpp/build -S ${{github.workspace}}/deps/jsoncpp && cmake --build ${{github.workspace}}/deps/jsoncpp/build --config ${{env.BUILD_TYPE}} --target install
- name: build deps/libevent
run: cmake -DEVENT__DISABLE_OPENSSL=ON -DEVENT__DISABLE_MBEDTLS=ON -DCMAKE_INSTALL_PREFIX=${{github.workspace}}/local -B ${{github.workspace}}/deps/libevent/build -S ${{github.workspace}}/deps/libevent && cmake --build ${{github.workspace}}/deps/libevent/build --config ${{env.BUILD_TYPE}} --target install
- name: Configure CMake
# Configure CMake in a 'build' subdirectory. `CMAKE_BUILD_TYPE` is only required if you are using a single-configuration generator such as make.
# See https://cmake.org/cmake/help/latest/variable/CMAKE_BUILD_TYPE.html?highlight=cmake_build_type
run: cmake -DCMAKE_PREFIX_PATH=${{github.workspace}}/local -B ${{github.workspace}}/build -DENABLE_ASAN=ON
- name: Build
# Build your program with the given configuration
run: cmake -DCMAKE_PREFIX_PATH=${{github.workspace}}/local --build ${{github.workspace}}/build --config ${{env.BUILD_TYPE}}
- name: Test
working-directory: ${{github.workspace}}/build
# Execute tests defined by the CMake configuration.
# See https://cmake.org/cmake/help/latest/manual/ctest.1.html for more detail
run: ctest -C ${{env.BUILD_TYPE}}
shell: bash
run: sudo bash build.sh init && bash build.sh release --make -j4
./deps/3rd
./deps/libevent
./deps/googletest
./deps/jsoncpp
./deps/benchmark
build/*
build_*
cmake-build-*/*
.vscode/*
.DS_Store
......
[submodule "deps/libevent"]
path = deps/libevent
[submodule "deps/3rd/libevent"]
path = deps/3rd/libevent
url = https://github.com/libevent/libevent
[submodule "deps/googletest"]
path = deps/googletest
[submodule "deps/3rd/jsoncpp"]
path = deps/3rd/jsoncpp
url = https://github.com/open-source-parsers/jsoncpp
[submodule "deps/3rd/googletest"]
path = deps/3rd/googletest
url = https://github.com/google/googletest
[submodule "deps/jsoncpp"]
path = deps/jsoncpp
url = https://github.com/open-source-parsers/jsoncpp.git
[submodule "deps/3rd/benchmark"]
path = deps/3rd/benchmark
url = https://github.com/google/benchmark
......@@ -2,7 +2,7 @@
#INCLUDE(file1 [OPTIONAL])
cmake_minimum_required(VERSION 3.10)
set(CMAKE_CXX_STANDARD 14)
set(CMAKE_CXX_STANDARD 20)
#SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11")
project(minidb)
......@@ -16,14 +16,18 @@ MESSAGE(STATUS "This is PROJECT_BINARY_DIR dir " ${PROJECT_BINARY_DIR})
SET(EXECUTABLE_OUTPUT_PATH ${PROJECT_BINARY_DIR}/bin)
#SET(LIBRARY_OUTPUT_PATH <路径>)
OPTION(ENABLE_ASAN "Enable build with address sanitizer" OFF)
OPTION(WITH_UNIT_TESTS "Compile miniob with unit tests" ON)
OPTION(CONCURRENCY "Support concurrency operations" OFF)
MESSAGE(STATUS "HOME dir: $ENV{HOME}")
#SET(ENV{变量名} 值)
IF(WIN32)
MESSAGE(STATUS "This is windows.")
ADD_DEFINITIONS(-DWIN32)
ELSEIF(WIN64)
MESSAGE(STATUS "This is windows.")
ADD_DEFINITIONS(-DWIN64)
MESSAGE(STATUS "This is windows.")
ADD_DEFINITIONS(-DWIN64)
ELSEIF(APPLE)
MESSAGE(STATUS "This is apple")
# normally __MACH__ has already been defined
......@@ -40,7 +44,6 @@ ENDIF(WIN32)
SET(CMAKE_COMMON_FLAGS "${CMAKE_COMMON_FLAGS} -Wall -DCMAKE_EXPORT_COMPILE_COMMANDS=1")
IF(DEBUG)
MESSAGE("DEBUG has been set as TRUE ${DEBUG}")
#"${CMAKE_COMMON_FLAGS} -O0 -g " ${CMAKE_COMMON_FLAGS}最好在""以内,防止被cmake 增加了;
SET(CMAKE_COMMON_FLAGS "${CMAKE_COMMON_FLAGS} -O0 -g -DDEBUG ")
ADD_DEFINITIONS(-DENABLE_DEBUG)
ELSEIF(NOT DEFINED ENV{DEBUG})
......@@ -50,13 +53,17 @@ ELSE()
MESSAGE("Enable debug")
SET(CMAKE_COMMON_FLAGS "${CMAKE_COMMON_FLAGS} -O0 -g -DDEBUG")
ADD_DEFINITIONS(-DENABLE_DEBUG)
ENDIF()
ENDIF(DEBUG)
# When the CONCURRENCY option is enabled, expose it to the sources as the
# CONCURRENCY preprocessor symbol. The define is appended to the common flag
# string and also registered through add_definitions().
if(CONCURRENCY)
  message("CONCURRENCY is ON")
  set(CMAKE_COMMON_FLAGS "${CMAKE_COMMON_FLAGS} -DCONCURRENCY")
  add_definitions(-DCONCURRENCY)
endif()
SET(CMAKE_CXX_FLAGS ${CMAKE_COMMON_FLAGS})
SET(CMAKE_C_FLAGS ${CMAKE_COMMON_FLAGS})
MESSAGE("CMAKE_CXX_FLAGS is " ${CMAKE_CXX_FLAGS})
OPTION(ENABLE_ASAN "Enable build with address sanitizer" OFF)
IF (ENABLE_ASAN)
SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -fno-omit-frame-pointer -fsanitize=address")
SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fno-omit-frame-pointer -fsanitize=address")
......@@ -77,11 +84,10 @@ MESSAGE("Install target dir is " ${CMAKE_INSTALL_PREFIX})
ADD_SUBDIRECTORY(deps)
ADD_SUBDIRECTORY(src)
ADD_SUBDIRECTORY(test/perf)
OPTION(WITH_UNIT_TESTS "Compile miniob with unit tests" ON)
ADD_SUBDIRECTORY(benchmark)
IF(WITH_UNIT_TESTS)
ADD_SUBDIRECTORY(unitest)
ADD_SUBDIRECTORY(unittest)
ENDIF()
# install 准备安装的目录是cmakefile 的当前目录, 不是build 后生成的目录
......
# Build script for the benchmark programs: every *.cpp file found below this
# directory is compiled into its own executable named after the file.
project(benchmark)

message("Begin to build " ${PROJECT_NAME})
message(STATUS "This is PROJECT_BINARY_DIR dir " ${PROJECT_BINARY_DIR})
message(STATUS "This is PROJECT_SOURCE_DIR dir " ${PROJECT_SOURCE_DIR})

# Variables set by the parent CMakeLists.txt are visible here.
message("${CMAKE_COMMON_FLAGS}")

# include_directories([AFTER|BEFORE] [SYSTEM] dir1 dir2 ...)
# SYSTEM is only a keyword when it precedes the directories. The trailing
# "SYSTEM" that used to end this list was taken as a directory name, so it has
# been dropped.
include_directories(
  .
  ${PROJECT_SOURCE_DIR}/../deps
  ${PROJECT_SOURCE_DIR}/../src/observer
  /usr/local/include
)

# NOTE(review): the original comment here said that the parent's
# include_directories/link_directories are not propagated to this directory.
# Confirm before relying on that; the search paths are listed explicitly anyway.
#INCLUDE_DIRECTORIES(BEFORE ${CMAKE_INSTALL_PREFIX}/include)
link_directories(/usr/local/lib /usr/local/lib64 ${PROJECT_BINARY_DIR}/../lib)

if(DEFINED ENV{LD_LIBRARY_PATH})
  set(LD_LIBRARY_PATH_STR "$ENV{LD_LIBRARY_PATH}")
  # separate_arguments() only splits on whitespace, so the ':' separators are
  # converted to a CMake list by hand. The value is quoted so that a defined
  # but empty LD_LIBRARY_PATH does not leave string(REPLACE) short of arguments.
  string(REPLACE ":" ";" LD_LIBRARY_PATH_LIST "${LD_LIBRARY_PATH_STR}")
  message(" Add LD_LIBRARY_PATH to -L flags " ${LD_LIBRARY_PATH_LIST})
  if(LD_LIBRARY_PATH_LIST)
    link_directories(${LD_LIBRARY_PATH_LIST})
  endif()
else()
  link_directories(/usr/local/lib)
endif()

find_package(benchmark CONFIG REQUIRED)
find_package(Threads REQUIRED)

file(GLOB_RECURSE ALL_SRC *.cpp)
# aux_source_directory() offers similar functionality.
foreach(F ${ALL_SRC})
  get_filename_component(prjName ${F} NAME_WE)
  message("Build ${prjName} according to ${F}")
  add_executable(${prjName} ${F})
  # Link through imported targets so include paths and transitive flags come
  # from the packages instead of from hard-coded library names.
  target_link_libraries(${prjName}
    PRIVATE
      common
      Threads::Threads
      ${CMAKE_DL_LIBS}
      benchmark::benchmark
      observer_static
  )
endforeach()
/* Copyright (c) 2021 Xie Meiyi(xiemeiyi@hust.edu.cn) and OceanBase and/or its affiliates. All rights reserved.
miniob is licensed under Mulan PSL v2.
You can use this software according to the terms and conditions of the Mulan PSL v2.
You may obtain a copy of Mulan PSL v2 at:
http://license.coscl.org.cn/MulanPSL2
THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
See the Mulan PSL v2 for more details. */
//
// Created by Wangyunlai on 2023/03/14
//
#include <inttypes.h>
#include <random>
#include <stdexcept>
#include <benchmark/benchmark.h>
#include "storage/index/bplus_tree.h"
#include "storage/default/disk_buffer_pool.h"
#include "rc.h"
#include "common/log/log.h"
using namespace std;
using namespace common;
using namespace benchmark;
/**
 * Draws integers from a uniform distribution over the closed range [min, max].
 * Every call to next() samples the distribution using a random_device as the
 * source of randomness.
 */
class IntegerGenerator
{
public:
  IntegerGenerator(int min, int max) : distrib_(min, max)
  {}

  /// Returns the next random value.
  int next()
  {
    const int value = distrib_(rd_);
    return value;
  }

private:
  random_device rd_;
  uniform_int_distribution<> distrib_;
};
once_flag init_bpm_flag;
BufferPoolManager bpm{512};
/**
 * Per-thread operation counters collected while a benchmark runs.
 * All counters start at zero.
 */
struct Stat
{
  // insert outcomes
  int64_t insert_success_count{0};
  int64_t duplicate_count{0};
  int64_t insert_other_count{0};

  // delete outcomes
  int64_t delete_success_count{0};
  int64_t not_exist_count{0};
  int64_t delete_other_count{0};

  // scan outcomes
  int64_t scan_success_count{0};
  int64_t scan_open_failed_count{0};
  int64_t mismatch_count{0};
  int64_t scan_other_count{0};
};
/**
 * Shared fixture for the B+ tree benchmarks.
 *
 * Thread 0 of each benchmark run initialises logging, installs the global
 * buffer pool manager (once per process), and creates a fresh B+ tree file
 * named "<Name()>.btree". The other threads skip SetUp/TearDown and use the
 * same handler_. The helper methods perform one index operation each and
 * record the outcome in a caller-provided Stat.
 */
class BenchmarkBase : public Fixture
{
public:
  BenchmarkBase()
  {}

  virtual ~BenchmarkBase()
  {
    BufferPoolManager::set_instance(nullptr);
  }

  /// Name of the benchmark; used to derive the log and index file names.
  virtual string Name() const = 0;

  virtual void SetUp(const State &state)
  {
    // Only the first thread prepares the shared state.
    if (0 != state.thread_index()) {
      return;
    }

    string log_name       = this->Name() + ".log";
    string btree_filename = this->Name() + ".btree";
    LoggerFactory::init_default(log_name.c_str(), LOG_LEVEL_TRACE);

    std::call_once(init_bpm_flag, []() { BufferPoolManager::set_instance(&bpm); });

    // Start from an empty index file on every run.
    ::remove(btree_filename.c_str());

    const int internal_max_size = 200;
    const int leaf_max_size     = 200;

    RC rc = handler_.create(btree_filename.c_str(), INTS, sizeof(int32_t)/*attr_len*/,
        internal_max_size, leaf_max_size);
    if (rc != RC::SUCCESS) {
      throw runtime_error("failed to create btree handler");
    }
    LOG_INFO("test %s setup done. threads=%d, thread index=%d",
        this->Name().c_str(), state.threads(), state.thread_index());
  }

  virtual void TearDown(const State &state)
  {
    if (0 != state.thread_index()) {
      return;
    }

    handler_.close();
    LOG_INFO("test %s teardown done. threads=%d, thread index=%d",
        this->Name().c_str(), state.threads(), state.thread_index());
  }

  /// Inserts every key in [min, max) with RID(value, value).
  void FillUp(uint32_t min, uint32_t max)
  {
    for (uint32_t value = min; value < max; ++value) {
      const char *key = reinterpret_cast<const char *>(&value);
      RID rid(value, value);
      RC rc = handler_.insert_entry(key, &rid);
      ASSERT(rc == RC::SUCCESS, "failed to insert entry into btree. key=%" PRIu32, value);
    }
  }

  /**
   * Upper bound of the key range: three times the benchmark argument.
   * When that product is zero the bound falls back to INT32_MAX. The previous
   * fallback, 1 << 31, does not fit in the int parameters of IntegerGenerator,
   * which is where the benchmarks in this file pass the result.
   */
  uint32_t GetRangeMax(const State &state) const
  {
    uint32_t max = static_cast<uint32_t>(state.range(0) * 3);
    if (max == 0) {
      max = static_cast<uint32_t>(INT32_MAX);
    }
    return max;
  }

  /// Inserts one key and records whether it succeeded, was a duplicate, or failed otherwise.
  void Insert(uint32_t value, Stat &stat)
  {
    const char *key = reinterpret_cast<const char *>(&value);
    RID rid(value, value);

    RC rc = handler_.insert_entry(key, &rid);
    switch (rc) {
      case RC::SUCCESS: {
        stat.insert_success_count++;
      } break;
      case RC::RECORD_DUPLICATE_KEY: {
        stat.duplicate_count++;
      } break;
      default: {
        stat.insert_other_count++;
      } break;
    }
  }

  /// Deletes one key and records whether it succeeded, did not exist, or failed otherwise.
  void Delete(uint32_t value, Stat &stat)
  {
    const char *key = reinterpret_cast<const char *>(&value);
    RID rid(value, value);

    RC rc = handler_.delete_entry(key, &rid);
    switch (rc) {
      case RC::SUCCESS: {
        stat.delete_success_count++;
      } break;
      case RC::RECORD_RECORD_NOT_EXIST: {
        stat.not_exist_count++;
      } break;
      default: {
        stat.delete_other_count++;
      } break;
    }
  }

  /**
   * Scans the inclusive key range [begin, end] and classifies the result:
   * open failure, iteration error, entry-count mismatch, or success.
   */
  void Scan(uint32_t begin, uint32_t end, Stat &stat)
  {
    const char *begin_key = reinterpret_cast<const char *>(&begin);
    const char *end_key   = reinterpret_cast<const char *>(&end);

    BplusTreeScanner scanner(handler_);

    // Pass the size of the key value itself. sizeof(begin_key) would be the
    // size of the pointer, not of the 4-byte key it points to.
    RC rc = scanner.open(begin_key, sizeof(begin), true /*inclusive*/,
        end_key, sizeof(end), true /*inclusive*/);
    if (rc != RC::SUCCESS) {
      stat.scan_open_failed_count++;
      return;
    }

    RID      rid;
    uint32_t count = 0;
    // Stop on the first non-success code. Looping until RECORD_EOF would never
    // terminate on a persistent error and would count failures as entries.
    while (RC::SUCCESS == (rc = scanner.next_entry(rid))) {
      count++;
    }

    if (rc != RC::RECORD_EOF) {
      stat.scan_other_count++;
    } else if (count != (end - begin + 1)) {
      stat.mismatch_count++;
    } else {
      stat.scan_success_count++;
    }

    scanner.close();
  }

protected:
  BplusTreeHandler handler_;
};
////////////////////////////////////////////////////////////////////////////////
/// Benchmark that inserts random keys into an initially empty B+ tree.
struct InsertionBenchmark : public BenchmarkBase
{
  string Name() const override { return "insertion"; }
};

BENCHMARK_DEFINE_F(InsertionBenchmark, Insertion)(State &state)
{
  // IntegerGenerator takes int bounds. `1 << 31` does not fit in an int and
  // would give the distribution an upper bound below its lower bound, so the
  // largest representable value is used instead.
  IntegerGenerator generator(1, INT32_MAX);
  Stat stat;

  for (auto _ : state) {
    uint32_t value = static_cast<uint32_t>(generator.next());
    Insert(value, stat);
  }

  state.counters["success"]   = Counter(stat.insert_success_count, Counter::kIsRate);
  state.counters["duplicate"] = Counter(stat.duplicate_count, Counter::kIsRate);
  state.counters["other"]     = Counter(stat.insert_other_count, Counter::kIsRate);
}

BENCHMARK_REGISTER_F(InsertionBenchmark, Insertion)->Threads(10);
////////////////////////////////////////////////////////////////////////////////
/**
 * Benchmark that deletes random keys from a B+ tree which thread 0 has
 * pre-filled with the keys [0, GetRangeMax(state)).
 */
class DeletionBenchmark : public BenchmarkBase
{
public:
  string Name() const override { return "deletion"; }

  void SetUp(const State &state) override
  {
    // Only the first thread prepares and fills the index.
    if (state.thread_index() != 0) {
      return;
    }

    BenchmarkBase::SetUp(state);

    const uint32_t max = GetRangeMax(state);
    ASSERT(max > 0, "invalid argument count. %ld", state.range(0));
    FillUp(0, max);
  }
};

BENCHMARK_DEFINE_F(DeletionBenchmark, Deletion)(State &state)
{
  const uint32_t upper_bound = GetRangeMax(state);
  IntegerGenerator key_source(0, upper_bound);
  Stat counters;

  for (auto _ : state) {
    const uint32_t key = static_cast<uint32_t>(key_source.next());
    Delete(key, counters);
  }

  state.counters["success"]   = Counter(counters.delete_success_count, Counter::kIsRate);
  state.counters["not_exist"] = Counter(counters.not_exist_count, Counter::kIsRate);
  state.counters["other"]     = Counter(counters.delete_other_count, Counter::kIsRate);
}

BENCHMARK_REGISTER_F(DeletionBenchmark, Deletion)->Threads(10)->Arg(4* 10000);
////////////////////////////////////////////////////////////////////////////////
/**
 * Benchmark that runs short range scans over a B+ tree which thread 0 has
 * pre-filled with the keys [0, 3 * state.range(0)).
 */
class ScanBenchmark : public BenchmarkBase
{
public:
  string Name() const override { return "scan"; }

  void SetUp(const State &state) override
  {
    // Only the first thread prepares and fills the index.
    if (state.thread_index() != 0) {
      return;
    }

    BenchmarkBase::SetUp(state);

    const uint32_t max = static_cast<uint32_t>(state.range(0)) * 3;
    ASSERT(max > 0, "invalid argument count. %ld", state.range(0));
    FillUp(0, max);
  }
};

BENCHMARK_DEFINE_F(ScanBenchmark, Scan)(State &state)
{
  // Each scan starts at a random key and covers between 1 and 100 following keys.
  int max_range_size = 100;
  const uint32_t upper_bound = GetRangeMax(state);
  IntegerGenerator start_source(1, upper_bound - max_range_size);
  IntegerGenerator length_source(1, max_range_size);
  Stat counters;

  for (auto _ : state) {
    const uint32_t first = static_cast<uint32_t>(start_source.next());
    const uint32_t last  = first + static_cast<uint32_t>(length_source.next());
    Scan(first, last, counters);
  }

  state.counters["success"]               = Counter(counters.scan_success_count, Counter::kIsRate);
  state.counters["open_failed_count"]     = Counter(counters.scan_open_failed_count, Counter::kIsRate);
  state.counters["mismatch_number_count"] = Counter(counters.mismatch_count, Counter::kIsRate);
  state.counters["other"]                 = Counter(counters.scan_other_count, Counter::kIsRate);
}

BENCHMARK_REGISTER_F(ScanBenchmark, Scan)->Threads(10)->Arg(4 * 10000);
////////////////////////////////////////////////////////////////////////////////
/// Benchmark that randomly interleaves insert, delete and scan operations.
struct MixtureBenchmark : public BenchmarkBase
{
  string Name() const override { return "mixture"; }
};

BENCHMARK_DEFINE_F(MixtureBenchmark, Mixture)(State &state)
{
  pair<uint32_t, uint32_t> data_range{0, GetRangeMax(state)};
  pair<uint32_t, uint32_t> scan_range{1, 100};

  IntegerGenerator data_generator(data_range.first, data_range.second);
  IntegerGenerator scan_range_generator(scan_range.first, scan_range.second);
  IntegerGenerator operation_generator(0, 2);

  Stat stat;

  for (auto _ : state) {
    // 0 = insert, 1 = delete, 2 = scan
    const int64_t operation_type = operation_generator.next();

    if (operation_type == 0) {
      const uint32_t key = static_cast<uint32_t>(data_generator.next());
      Insert(key, stat);
    } else if (operation_type == 1) {
      const uint32_t key = static_cast<uint32_t>(data_generator.next());
      Delete(key, stat);
    } else if (operation_type == 2) {
      const uint32_t first = static_cast<uint32_t>(data_generator.next());
      const uint32_t last  = first + static_cast<uint32_t>(scan_range_generator.next());
      Scan(first, last, stat);
    } else {
      ASSERT(false, "should not happen. operation=%ld", operation_type);
    }
  }

  state.counters.insert({
      {"insert_success", Counter(stat.insert_success_count, Counter::kIsRate)},
      {"insert_other", Counter(stat.insert_other_count, Counter::kIsRate)},
      {"insert_duplicate", Counter(stat.duplicate_count, Counter::kIsRate)},

      {"delete_success", Counter(stat.delete_success_count, Counter::kIsRate)},
      {"delete_other", Counter(stat.delete_other_count, Counter::kIsRate)},
      {"delete_not_exist", Counter(stat.not_exist_count, Counter::kIsRate)},

      {"scan_success", Counter(stat.scan_success_count, Counter::kIsRate)},
      {"scan_other", Counter(stat.scan_other_count, Counter::kIsRate)},
      {"scan_mismatch", Counter(stat.mismatch_count, Counter::kIsRate)},
      {"scan_open_failed", Counter(stat.scan_open_failed_count, Counter::kIsRate)}
  });
}

BENCHMARK_REGISTER_F(MixtureBenchmark, Mixture)->Threads(10)->Arg(4 * 10000);
////////////////////////////////////////////////////////////////////////////////
BENCHMARK_MAIN();
#!/bin/bash
# Absolute path of the directory that contains this script.
TOPDIR=$(readlink -f "$(dirname "$0")")
BUILD_SH=$TOPDIR/build.sh

CMAKE_COMMAND="cmake -DCMAKE_EXPORT_COMPILE_COMMANDS=1"

# Default number of parallel make jobs. An exported CPU_CORES is honoured;
# otherwise the core count is detected, falling back to 1. Without a value the
# default would be a bare `make -j`, which places no limit on the job count.
CPU_CORES=${CPU_CORES:-$(nproc 2>/dev/null || getconf _NPROCESSORS_ONLN 2>/dev/null || echo 1)}

ALL_ARGS=("$@")
BUILD_ARGS=()
MAKE_ARGS=(-j "$CPU_CORES")
MAKE=make

ASAN_OPTION=ON

echo "$0 ${ALL_ARGS[@]}"
# Print the command line help text.
function usage
{
  cat <<'EOF'
Usage:
./build.sh -h
./build.sh init
./build.sh clean
./build.sh [BuildType] [--make [MakeOptions]]

OPTIONS:
BuildType => debug(default), release, debug_asan, release_asan
MakeOptions => Options to make command, default: -j N

Examples:
# Init.
./build.sh init

# Build by debug mode and make with -j24.
./build.sh debug --make -j24
EOF
}
# Split ALL_ARGS at the first "--make": arguments before it are appended to
# BUILD_ARGS, arguments after it to MAKE_ARGS. The "--make" marker itself is
# dropped.
function parse_args
{
  make_start=false
  for arg in "${ALL_ARGS[@]}"; do
    case "$arg" in
      --make)
        make_start=true
        ;;
      *)
        if [[ $make_start == true ]]; then
          MAKE_ARGS+=("$arg")
        else
          BUILD_ARGS+=("$arg")
        fi
        ;;
    esac
  done
}
# Run the make command with MAKE_ARGS, unless MAKE has been set to the literal
# string "false".
function try_make
{
  [[ $MAKE == false ]] && return
  $MAKE "${MAKE_ARGS[@]}"
}
# Create the build directory ${TOPDIR}/build_<type> and change into it.
# $1: build type used as the directory suffix.
# Paths are quoted so that a checkout located under a path with spaces works.
function prepare_build_dir
{
  TYPE=$1
  mkdir -p "${TOPDIR}/build_${TYPE}" && cd "${TOPDIR}/build_${TYPE}"
}
# Configure, build and install one third-party dependency in a subshell, so the
# caller's working directory is left untouched.
# $1: directory name below ${TOPDIR}/deps/3rd
# $2: git ref to check out first, or "" to keep the submodule's commit
# $3: parallelism flag for make (e.g. -j4), or "" for none
# remaining arguments: options passed to cmake
function build_dependency
{
  local name=$1 ref=$2 jobs=$3
  shift 3
  (
    cd "${TOPDIR}/deps/3rd/${name}" || exit
    if [[ -n "$ref" ]]; then
      git checkout "$ref" || exit
    fi
    # -p keeps a second `init` run from failing on an existing build directory.
    mkdir -p build && cd build || exit
    # ${jobs} is deliberately unquoted so that an empty value adds no argument.
    cmake .. "$@" && make ${jobs} && make install
  )
}

# Fetch the git submodules, then build and install libevent, googletest,
# google benchmark and jsoncpp. Every dependency is attempted even if an
# earlier one fails; the function returns non-zero if any of them failed.
function do_init
{
  git submodule update --init || return

  local status=0

  # build libevent
  build_dependency libevent release-2.1.12-stable -j4 \
    -DEVENT__DISABLE_OPENSSL=ON || status=1

  # build googletest
  build_dependency googletest "" -j4 || status=1

  # build google benchmark
  build_dependency benchmark "" -j4 \
    -DBENCHMARK_ENABLE_TESTING=OFF -DBENCHMARK_INSTALL_DOCS=OFF -DBENCHMARK_ENABLE_GTEST_TESTS=OFF -DBENCHMARK_USE_BUNDLED_GTEST=OFF -DBENCHMARK_ENABLE_ASSEMBLY_TESTS=OFF || status=1

  # build jsoncpp
  build_dependency jsoncpp "" "" \
    -DJSONCPP_WITH_TESTS=OFF -DJSONCPP_WITH_POST_BUILD_UNITTEST=OFF || status=1

  return $status
}
# Run cmake for one build type inside its build directory.
# $1: build type (selects the build_<type> directory)
# remaining arguments: options passed to cmake
# prepare_build_dir is already defined earlier in this script, so the second,
# identical definition that used to precede this function has been removed.
function do_build
{
  TYPE=$1; shift
  prepare_build_dir "$TYPE" || return
  echo "${CMAKE_COMMAND} ${TOPDIR} $@"
  # CMAKE_COMMAND is intentionally unquoted: it holds the command plus an option.
  ${CMAKE_COMMAND} "${TOPDIR}" "$@"
}
# Remove every build_* directory directly below TOPDIR. The build directories
# are created below TOPDIR, so that is where they are searched for, regardless
# of the directory the script was started from.
function do_clean
{
  echo "clean build_* dirs"
  # -exec passes the names straight to rm, so directory names containing
  # whitespace are handled correctly.
  find "${TOPDIR}" -maxdepth 1 -type d -name 'build_*' -exec rm -rf {} +
}
# Translate the build type in BUILD_ARGS[0] into cmake options and run
# do_build. When the first argument is not a known build type, "debug" is
# prepended to BUILD_ARGS and the function calls itself again.
function build
{
  set -- "${BUILD_ARGS[@]}"

  local -a type_flags
  case "$1" in
    release)
      type_flags=(-DCMAKE_BUILD_TYPE=RelWithDebInfo -DDEBUG=OFF)
      ;;
    release_asan)
      type_flags=(-DCMAKE_BUILD_TYPE=RelWithDebInfo -DDEBUG=OFF -DENABLE_ASAN=$ASAN_OPTION)
      ;;
    debug)
      type_flags=(-DCMAKE_BUILD_TYPE=Debug -DDEBUG=ON)
      ;;
    debug_asan)
      type_flags=(-DCMAKE_BUILD_TYPE=Debug -DDEBUG=ON -DENABLE_ASAN=$ASAN_OPTION)
      ;;
    *)
      BUILD_ARGS=(debug "${BUILD_ARGS[@]}")
      build
      return
      ;;
  esac

  do_build "$@" "${type_flags[@]}"
}
# Entry point: dispatch on the first command line argument.
#   -h     print usage
#   init   fetch and build the third-party dependencies
#   clean  remove the build directories
#   other  parse the arguments, run cmake and then make
function main
{
  case "$1" in
    -h)
      usage
      ;;
    init)
      do_init
      ;;
    clean)
      do_clean
      ;;
    *)
      parse_args
      # Run make only when the cmake step succeeded. If it failed, the script
      # may not even be inside the build directory, and make would run in the
      # wrong place and hide the real error.
      build && try_make
      ;;
  esac
}
main "$@"
Subproject commit f7547e29ccaed7b64ef4f7495ecfff1c9f6f3d03
Subproject commit 974e18ee6f146a2418f9cea83170c640e7d622d6
文件已移动
Subproject commit 6e1826dd7730330536e1838824bddd0d4d8adb0d
......@@ -258,4 +258,93 @@ void LockTrace::toString(std::string &result)
return;
}
// DebugMutex only performs real locking when DEBUG is defined; otherwise both
// operations compile to empty functions.
void DebugMutex::lock()
{
#if defined(DEBUG)
  lock_.lock();
#endif
}

void DebugMutex::unlock()
{
#if defined(DEBUG)
  lock_.unlock();
#endif
}
////////////////////////////////////////////////////////////////////////////////
// Mutex forwards to the wrapped lock_ when CONCURRENCY is defined. Without
// CONCURRENCY, lock/unlock do nothing and try_lock always reports success.
void Mutex::lock()
{
#if defined(CONCURRENCY)
  lock_.lock();
#endif
}

bool Mutex::try_lock()
{
#if defined(CONCURRENCY)
  const bool acquired = lock_.try_lock();
  return acquired;
#else
  return true;
#endif
}

void Mutex::unlock()
{
#if defined(CONCURRENCY)
  lock_.unlock();
#endif
}
////////////////////////////////////////////////////////////////////////////////
// SharedMutex forwards every operation to the wrapped lock_ when CONCURRENCY
// is defined. Without CONCURRENCY the lock/unlock operations do nothing and
// the try_* operations always report success.
void SharedMutex::lock()
{
#if defined(CONCURRENCY)
  lock_.lock();
#endif
}

bool SharedMutex::try_lock()
{
#if defined(CONCURRENCY)
  return lock_.try_lock();
#else
  return true;
#endif
}

void SharedMutex::unlock()  // unlock exclusive
{
#if defined(CONCURRENCY)
  lock_.unlock();
#endif
}

void SharedMutex::lock_shared()
{
#if defined(CONCURRENCY)
  lock_.lock_shared();
#endif
}

bool SharedMutex::try_lock_shared()
{
#if defined(CONCURRENCY)
  return lock_.try_lock_shared();
#else
  return true;
#endif
}

void SharedMutex::unlock_shared()
{
#if defined(CONCURRENCY)
  lock_.unlock_shared();
#endif
}
} // namespace common
\ No newline at end of file
......@@ -12,17 +12,18 @@ See the Mulan PSL v2 for more details. */
// Created by Longda on 2010
//
#ifndef __COMMON_LANG_MUTEX_H__
#define __COMMON_LANG_MUTEX_H__
#pragma once
#include <sys/types.h>
#include <errno.h>
#include <map>
#include <pthread.h>
#include <string.h>
#include <map>
#include <set>
#include <sstream>
#include <string.h>
#include <string>
#include <sys/types.h>
#include <mutex>
#include <shared_mutex>
#include "common/log/log.h"
......@@ -239,5 +240,54 @@ protected:
#endif // DEBUG_LOCK
} // namespace common
#endif // __COMMON_LANG_MUTEX_H__
/**
 * Mutex whose std::mutex member only exists when DEBUG is defined.
 * The object's layout therefore depends on whether DEBUG is defined for the
 * translation unit that includes this header.
 */
class DebugMutex final
{
public:
DebugMutex() = default;
~DebugMutex() = default;
void lock();
void unlock();
private:
#ifdef DEBUG
std::mutex lock_;
#endif
};
/**
 * Exclusive mutex whose std::mutex member only exists when CONCURRENCY is
 * defined. The object's layout therefore depends on whether CONCURRENCY is
 * defined for the translation unit that includes this header.
 */
class Mutex final
{
public:
Mutex() = default;
~Mutex() = default;
void lock();
bool try_lock();
void unlock();
private:
#ifdef CONCURRENCY
std::mutex lock_;
#endif
};
/**
 * Mutex with exclusive and shared lock operations. Its std::shared_mutex
 * member only exists when CONCURRENCY is defined, so the object's layout
 * depends on whether CONCURRENCY is defined for the including translation unit.
 */
class SharedMutex final
{
public:
SharedMutex() = default;
~SharedMutex() = default;
void lock(); // lock exclusive
bool try_lock();
void unlock(); // unlock exclusive
void lock_shared();
bool try_lock_shared();
void unlock_shared();
private:
#ifdef CONCURRENCY
std::shared_mutex lock_;
#endif
};
} // namespace common
\ No newline at end of file
......@@ -16,6 +16,7 @@ See the Mulan PSL v2 for more details. */
#include <exception>
#include <stdarg.h>
#include <stdio.h>
#include <execinfo.h>
#include "common/lang/string.h"
#include "common/log/log.h"
......@@ -339,11 +340,29 @@ int LoggerFactory::init_default(
const std::string &log_file, LOG_LEVEL log_level, LOG_LEVEL console_level, LOG_ROTATE rotate_type)
{
if (g_log != nullptr) {
LOG_WARN("Default logger has been initialized");
LOG_INFO("Default logger has been initialized");
return 0;
}
return init(log_file, &g_log, log_level, console_level, rotate_type);
}
/**
 * Formats the current call stack as space-separated hexadecimal return
 * addresses ("0x... 0x... ...").
 *
 * @return pointer to a thread-local buffer that is overwritten by the next
 *         call on the same thread. The string is empty when no frame could be
 *         captured.
 */
const char *lbt()
{
  constexpr int buffer_size = 100;
  void *buffer[buffer_size];

  constexpr int bt_buffer_size = 4096;
  thread_local char backtrace_buffer[bt_buffer_size];

  // Reset first, so that an empty backtrace does not return the text left
  // behind by the previous call on this thread.
  backtrace_buffer[0] = '\0';

  int size = backtrace(buffer, buffer_size);
  int offset = 0;
  // Stop as soon as the buffer is full: snprintf returns the length it wanted
  // to write, so offset must never be allowed to run past the buffer end.
  for (int i = 0; i < size && offset < bt_buffer_size - 1; i++) {
    const char *format = (0 == i) ? "0x%lx" : " 0x%lx";
    int written = snprintf(backtrace_buffer + offset, bt_buffer_size - offset, format,
        static_cast<unsigned long>(reinterpret_cast<intptr_t>(buffer[i])));
    if (written < 0) {
      break;
    }
    offset += written;
  }
  return backtrace_buffer;
}
} // namespace common
\ No newline at end of file
......@@ -12,14 +12,13 @@ See the Mulan PSL v2 for more details. */
// Created by Longda on 2010
//
#ifndef __COMMON_LOG_LOG_H__
#define __COMMON_LOG_LOG_H__
#pragma once
#include <sys/time.h>
#include <assert.h>
#include <errno.h>
#include <pthread.h>
#include <string.h>
#include <time.h>
#include <fstream>
#include <iostream>
......@@ -47,7 +46,8 @@ typedef enum {
typedef enum { LOG_ROTATE_BYDAY = 0, LOG_ROTATE_BYSIZE, LOG_ROTATE_LAST } LOG_ROTATE;
class Log {
class Log
{
public:
Log(const std::string &log_name, const LOG_LEVEL log_level = LOG_LEVEL_INFO,
const LOG_LEVEL console_level = LOG_LEVEL_WARN);
......@@ -161,31 +161,34 @@ extern Log *g_log;
/*
 * Builds the log line prefix into `prefix`:
 *   "[<date time.usec> pid:<pid> tid:<tid> <level> <file>:<line> <function>]>>"
 * and lets the logger rotate its file for the current date. Does nothing when
 * no global logger is installed.
 * localtime_r() with a local struct tm is used instead of localtime(), whose
 * shared static result can be overwritten by other threads that are logging
 * at the same time.
 */
#define LOG_HEAD(prefix, level)                                            \
  if (common::g_log) {                                                     \
    struct timeval tv;                                                     \
    gettimeofday(&tv, NULL);                                               \
    struct tm tm_buf;                                                      \
    struct tm *p = localtime_r(&tv.tv_sec, &tm_buf);                       \
    char sz_head[LOG_HEAD_SIZE] = {0};                                     \
    if (p) {                                                               \
      int usec = (int)tv.tv_usec;                                          \
      snprintf(sz_head, LOG_HEAD_SIZE,                                     \
          "%04d-%02d-%02d %02d:%02d:%02u.%06d pid:%u tid:%llx ",           \
          p->tm_year + 1900,                                               \
          p->tm_mon + 1,                                                   \
          p->tm_mday,                                                      \
          p->tm_hour,                                                      \
          p->tm_min,                                                       \
          p->tm_sec,                                                       \
          usec,                                                            \
          (u32_t)getpid(),                                                 \
          gettid());                                                       \
      common::g_log->rotate(p->tm_year + 1900, p->tm_mon + 1, p->tm_mday); \
    }                                                                      \
    snprintf(prefix,                                                       \
        sizeof(prefix),                                                    \
        "[%s %s %s:%u %s]>>",                                              \
        sz_head,                                                           \
        (common::g_log)->prefix_msg(level),                                \
        __FILE_NAME__,                                                     \
        (u32_t)__LINE__,                                                   \
        __FUNCTION__                                                       \
        );                                                                 \
  }
#define LOG_OUTPUT(level, fmt, ...) \
......@@ -287,20 +290,30 @@ int Log::out(const LOG_LEVEL console_level, const LOG_LEVEL log_level, T &msg)
}
// ASSERT(expression, description, ...):
// In DEBUG builds, when `expression` is false, the printf-style description is
// logged through LOG_PANIC (if a global logger exists) and assert(expression)
// is then invoked. Note that `expression` is evaluated a second time by that
// assert. In non-DEBUG builds the macro expands to nothing, so `expression` is
// not evaluated at all. The definition is skipped if ASSERT is already defined.
#ifndef ASSERT
#ifdef DEBUG
#define ASSERT(expression, description, ...) \
do { \
if (!(expression)) { \
if (common::g_log) { \
LOG_PANIC(description, ##__VA_ARGS__); \
LOG_PANIC("\n"); \
} \
assert(expression); \
} \
} while (0)
#else // DEBUG
#define ASSERT(expression, description, ...)
#endif // DEBUG
#endif // ASSERT
#define SYS_OUTPUT_FILE_POS ", File:" << __FILE__ << ", line:" << __LINE__ << ",function:" << __FUNCTION__
#define SYS_OUTPUT_ERROR ",error:" << errno << ":" << strerror(errno)
/**
* 获取当前函数调用栈
*/
const char *lbt();
} // namespace common
#endif //__COMMON_LOG_LOG_H__
// __CR__
// Copyright (c) 2021 LongdaFeng All Rights Reserved
//
// This software contains the intellectual property of LongdaFeng
// or is licensed to LongdaFeng from third parties. Use of this
// software and the intellectual property contained therein is
// expressly limited to the terms and conditions of the License Agreement
// under which it is provided by or on behalf of LongdaFeng.
// __CR__
//
// Created by Longda on 2010
//
#include "mm/debug_new.h"
#include <new>
#include <stdio.h>
#include <stdlib.h>
#ifdef _MSC_VER
#pragma warning(disable : 4073)
#pragma init_seg(lib)
#endif
// Number of buckets in the hash table that tracks live allocations.
#ifndef DEBUG_NEW_HASHTABLESIZE
#define DEBUG_NEW_HASHTABLESIZE 16384
#endif

// Maps a pointer to its bucket index. The pointer is converted to size_t
// rather than unsigned: on LP64 platforms a pointer does not fit in an
// unsigned int and the narrowing cast is rejected by the compiler.
#ifndef DEBUG_NEW_HASH
#define DEBUG_NEW_HASH(p) ((reinterpret_cast<size_t>(p) >> 8) % DEBUG_NEW_HASHTABLESIZE)
#endif
// The default behaviour now is to copy the file name, because we found
// that the exit leakage check cannot access the address of the file
// name sometimes (in our case, a core dump will occur when trying to
// access the file name in a shared library after a SIGINT).
#ifndef DEBUG_NEW_FILENAME_LEN
#define DEBUG_NEW_FILENAME_LEN 20
#endif
#if DEBUG_NEW_FILENAME_LEN == 0 && !defined(DEBUG_NEW_NO_FILENAME_COPY)
#define DEBUG_NEW_NO_FILENAME_COPY
#endif
#ifndef DEBUG_NEW_NO_FILENAME_COPY
#include <string.h>
#endif
// Bookkeeping header stored directly in front of every tracked allocation.
// Headers that hash to the same bucket are chained through `next`.
struct new_ptr_list_t {
// next header in the same hash bucket
new_ptr_list_t *next;
#ifdef DEBUG_NEW_NO_FILENAME_COPY
// source file of the allocation, stored as the caller's pointer
const char *file;
#else
// source file of the allocation, copied and truncated to fit
char file[DEBUG_NEW_FILENAME_LEN];
#endif
// source line of the allocation
int line;
// number of bytes requested by the caller (header not included)
size_t size;
};
static new_ptr_list_t *new_ptr_list[DEBUG_NEW_HASHTABLESIZE];
bool new_verbose_flag = false;
bool new_autocheck_flag = true;
bool check_leaks()
{
bool fLeaked = false;
for (int i = 0; i < DEBUG_NEW_HASHTABLESIZE; ++i) {
new_ptr_list_t *ptr = new_ptr_list[i];
if (ptr == NULL)
continue;
fLeaked = true;
while (ptr) {
printf("Leaked object at %p (size %llu, %s:%d)\n",
(char *)ptr + sizeof(new_ptr_list_t),
(unsigned long long)ptr->size,
ptr->file,
ptr->line);
ptr = ptr->next;
}
}
if (fLeaked)
return true;
else
return false;
}
// Tracking allocator: allocates `size` bytes preceded by a new_ptr_list_t
// header, records the requesting file/line in the header and links it into the
// hash bucket of the returned pointer. Aborts the process when malloc fails.
void *operator new(size_t size, const char *file, int line)
{
  size_t s = size + sizeof(new_ptr_list_t);
  new_ptr_list_t *ptr = (new_ptr_list_t *)malloc(s);
  if (ptr == NULL) {
    // %zu is the conversion for size_t; %u reads the wrong width on LP64.
    fprintf(stderr, "new: out of memory when allocating %zu bytes\n", size);
    abort();
  }
  // The caller receives the memory right behind the header.
  void *pointer = (char *)ptr + sizeof(new_ptr_list_t);
  size_t hash_index = DEBUG_NEW_HASH(pointer);
  ptr->next = new_ptr_list[hash_index];
#ifdef DEBUG_NEW_NO_FILENAME_COPY
  ptr->file = file;
#else
  strncpy(ptr->file, file, DEBUG_NEW_FILENAME_LEN - 1);
  ptr->file[DEBUG_NEW_FILENAME_LEN - 1] = '\0';
#endif
  ptr->line = line;
  ptr->size = size;
  new_ptr_list[hash_index] = ptr;
  if (new_verbose_flag)
    printf("new: allocated %p (size %zu, %s:%d)\n", pointer, size, file, line);
  return pointer;
}
// Array form of the tracking allocator; forwards to the scalar form.
void *operator new[](size_t size, const char *file, int line)
{
  return operator new(size, file, line);
}

// Replacement for the plain global operator new; the origin is recorded as
// "<Unknown>":0.
void *operator new(size_t size)
{
  return operator new(size, "<Unknown>", 0);
}

void *operator new[](size_t size)
{
  return operator new(size);
}

// Non-throwing forms. `noexcept` replaces the dynamic exception specification
// `throw()`, which was removed from the language in C++20.
// NOTE(review): these forward to the plain form, so an allocation failure
// still ends in abort() rather than a null return.
void *operator new(size_t size, const std::nothrow_t &) noexcept
{
  return operator new(size);
}

void *operator new[](size_t size, const std::nothrow_t &) noexcept
{
  return operator new[](size);
}
// Releases memory obtained from the tracking operator new: looks the pointer
// up in its hash bucket, unlinks the header and frees the whole block.
// A null pointer is ignored; a pointer that is not tracked aborts the process.
void operator delete(void *pointer)
{
  if (pointer == NULL)
    return;
  size_t hash_index = DEBUG_NEW_HASH(pointer);
  new_ptr_list_t *ptr = new_ptr_list[hash_index];
  new_ptr_list_t *ptr_last = NULL;
  while (ptr) {
    if ((char *)ptr + sizeof(new_ptr_list_t) == pointer) {
      if (new_verbose_flag)
        // %zu is the conversion for size_t; %u reads the wrong width on LP64.
        printf("delete: freeing %p (size %zu)\n", pointer, ptr->size);
      if (ptr_last == NULL)
        new_ptr_list[hash_index] = ptr->next;
      else
        ptr_last->next = ptr->next;
      free(ptr);
      return;
    }
    ptr_last = ptr;
    ptr = ptr->next;
  }
  fprintf(stderr, "delete: invalid pointer %p\n", pointer);
  abort();
}

// Array form; forwards to the scalar form.
void operator delete[](void *pointer)
{
  operator delete(pointer);
}
// Some older compilers like Borland C++ Compiler 5.5.1 and Digital Mars
// Compiler 8.29 do not support placement delete operators.
// NO_PLACEMENT_DELETE needs to be defined when using such compilers.
// Also note that in that case memory leakage will occur if an exception
// is thrown in the initialization (constructor) of a dynamically
// created object.
#ifndef NO_PLACEMENT_DELETE
// Placement form matching operator new(size, file, line). Optionally reports
// the location, then releases the memory through the plain operator delete.
void operator delete(void *pointer, const char *file, int line)
{
if (new_verbose_flag)
printf("info: exception thrown on initializing object at %p (%s:%d)\n", pointer, file, line);
operator delete(pointer);
}
// Array variant; forwards to the scalar placement form.
void operator delete[](void *pointer, const char *file, int line)
{
operator delete(pointer, file, line);
}
// nothrow variant; forwards with "<Unknown>":0 as the location.
void operator delete(void *pointer, const std::nothrow_t &)
{
operator delete(pointer, "<Unknown>", 0);
}
// nothrow array variant; forwards to the scalar nothrow form.
void operator delete[](void *pointer, const std::nothrow_t &)
{
operator delete(pointer, std::nothrow);
}
#endif // NO_PLACEMENT_DELETE
// Proxy class to automatically call check_leaks if new_autocheck_flag is set
// Helper whose destructor runs the leak check when new_autocheck_flag is set.
class new_check_t {
public:
  new_check_t()
  {}

  ~new_check_t()
  {
    // If leaks are reported, switch on verbose mode so that deletes performed
    // later, while other global/static objects are destroyed, are printed and
    // can be matched against possible false leak reports.
    if (new_autocheck_flag && check_leaks()) {
      new_verbose_flag = true;
    }
  }
};

static new_check_t new_check_object;
......@@ -132,6 +132,13 @@ void *MemPoolItem::alloc()
return buffer;
}
// Allocates one item via alloc() and wraps it in a unique_ptr whose deleter
// hands the item back to this pool through free().
MemPoolItem::unique_ptr MemPoolItem::alloc_unique_ptr()
{
  return MemPoolItem::unique_ptr(this->alloc(), [this](void *p) { this->free(p); });
}
void MemPoolItem::free(void *buf)
{
MUTEX_LOCK(&this->mutex);
......
......@@ -12,14 +12,15 @@ See the Mulan PSL v2 for more details. */
// Created by Longda on 2010
//
#ifndef __COMMON_MM_MPOOL_H__
#define __COMMON_MM_MPOOL_H__
#pragma once
#include <queue>
#include <list>
#include <set>
#include <string>
#include <sstream>
#include <functional>
#include <memory>
#include "common/lang/mutex.h"
#include "common/log/log.h"
......@@ -290,12 +291,15 @@ T *MemPoolSimple<T>::alloc()
used.insert(buffer);
MUTEX_UNLOCK(&this->mutex);
new (buffer) T();
return buffer;
}
template <class T>
void MemPoolSimple<T>::free(T *buf)
{
buf->~T();
MUTEX_LOCK(&this->mutex);
size_t num = used.erase(buf);
......@@ -327,6 +331,9 @@ std::string MemPoolSimple<T>::to_string()
}
class MemPoolItem {
public:
using unique_ptr = std::unique_ptr<void, std::function<void(void * const)>>;
public:
MemPoolItem(const char *tag) : name(tag)
{
......@@ -369,6 +376,7 @@ public:
* @return
*/
void *alloc();
unique_ptr alloc_unique_ptr();
/**
* Free one item, the resource will return to memory Pool
......@@ -446,4 +454,3 @@ protected:
};
} // namespace common
#endif /* __COMMON_MM_MPOOL_H__ */
Subproject commit 93f08be653c36ddc6943e9513fc14c7292b4d007
Subproject commit 117ee9a03fc74617e378f755c6b25005ac0e954f
/* Copyright (c) 2021 Xie Meiyi(xiemeiyi@hust.edu.cn) and OceanBase and/or its affiliates. All rights reserved.
miniob is licensed under Mulan PSL v2.
You can use this software according to the terms and conditions of the Mulan PSL v2.
You may obtain a copy of Mulan PSL v2 at:
http://license.coscl.org.cn/MulanPSL2
THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
See the Mulan PSL v2 for more details. */
//
// Created by Wangyunlai on 2023/03/07.
//
#include "session/thread_data.h"
#include "session/session.h"
// Out-of-class definition of the thread_local static member declared in session/thread_data.h.
// No initializer is given here; the value is changed through ThreadData::setup().
thread_local ThreadData * ThreadData::thread_data_;
/**
 * Transaction of the session bound to this thread data.
 * @return nullptr when no session is attached, otherwise the session's current_trx().
 */
Trx *ThreadData::trx() const
{
  if (session_ == nullptr) {
    return nullptr;
  }
  return session_->current_trx();
}
\ No newline at end of file
/* Copyright (c) 2021 Xie Meiyi(xiemeiyi@hust.edu.cn) and OceanBase and/or its affiliates. All rights reserved.
miniob is licensed under Mulan PSL v2.
You can use this software according to the terms and conditions of the Mulan PSL v2.
You may obtain a copy of Mulan PSL v2 at:
http://license.coscl.org.cn/MulanPSL2
THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
See the Mulan PSL v2 for more details. */
//
// Created by Wangyunlai on 2023/03/07.
//
#pragma once
class Trx;
class Session;
// Per-thread context object: holds the Session attached to a thread and exposes its transaction.
// A pointer to the current thread's instance is kept in a thread_local static member.
class ThreadData
{
public:
// Returns the pointer previously stored by setup() on the calling thread.
static ThreadData *current() { return thread_data_; }
// Stores `thread` as the calling thread's ThreadData pointer; ownership is not taken here.
static void setup(ThreadData *thread) { thread_data_ = thread; }
public:
ThreadData() = default;
~ThreadData() = default;
// Session attached via set_session(), or nullptr if none was set.
Session *session() const { return session_; }
// Defined in the .cpp file: nullptr without a session, otherwise session_->current_trx().
Trx * trx() const;
void set_session(Session *session) { session_ = session; }
private:
// One pointer per thread.
static thread_local ThreadData * thread_data_;
private:
Session *session_ = nullptr;
};
\ No newline at end of file
/* Copyright (c) 2021 Xie Meiyi(xiemeiyi@hust.edu.cn) and OceanBase and/or its affiliates. All rights reserved.
miniob is licensed under Mulan PSL v2.
You can use this software according to the terms and conditions of the Mulan PSL v2.
You may obtain a copy of Mulan PSL v2 at:
http://license.coscl.org.cn/MulanPSL2
THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
See the Mulan PSL v2 for more details. */
//
// Created by lianyu on 2022/10/29.
//
#include "storage/buffer/frame.h"
#include "session/thread_data.h"
/**
 * Build a frame identifier from a file descriptor and a page number.
 */
FrameId::FrameId(int file_desc, PageNum page_num)
    : file_desc_(file_desc),
      page_num_(page_num)
{}
/**
 * Two identifiers are equal when both the file descriptor and the page number match.
 */
bool FrameId::equal_to(const FrameId &other) const
{
  if (file_desc_ != other.file_desc_) {
    return false;
  }
  return page_num_ == other.page_num_;
}
/**
 * Equality operator; same result as equal_to().
 */
bool FrameId::operator==(const FrameId &other) const
{
  return equal_to(other);
}
/**
 * Hash value combining the file descriptor (upper 32 bits) and the page number (lower 32 bits).
 *
 * Both fields are first reinterpreted as unsigned 32-bit values. Without that, a negative
 * page number (for example -1) would be sign-extended when widened and overwrite the
 * file-descriptor bits. The shift is done in a 64-bit type so it stays well defined even
 * where size_t is narrower than 64 bits.
 * For non-negative inputs on a 64-bit size_t the result is the same as before.
 */
size_t FrameId::hash() const
{
  const uint64_t fd_bits   = static_cast<uint32_t>(file_desc_);
  const uint64_t page_bits = static_cast<uint32_t>(page_num_);
  return static_cast<size_t>((fd_bits << 32) | page_bits);
}
int FrameId::file_desc() const
{
return file_desc_;
}
/**
 * @return the page number part of this identifier.
 */
PageNum FrameId::page_num() const
{
  return this->page_num_;
}
/**
 * Render a FrameId as "fd:<file_desc>,page_num:<page_num>".
 */
std::string to_string(const FrameId &frame_id)
{
  std::ostringstream out;
  out << "fd:" << frame_id.file_desc();
  out << ",page_num:" << frame_id.page_num();
  return out.str();
}
////////////////////////////////////////////////////////////////////////////////
/**
 * Identifier used by the latch debugging code to tell lock holders apart.
 * Uses the address of the current ThreadData when one is set up for this thread,
 * otherwise falls back to a value derived from pthread_self().
 */
intptr_t get_default_debug_xid()
{
  ThreadData *thd = ThreadData::current();
  if (thd != nullptr) {
    return reinterpret_cast<intptr_t>(thd);
  }

  // pthread_self() returns a pthread_t, whose type differs between Linux and macOS:
  // an integer type on Linux, a pointer on macOS. Casting twice (first to void*, then
  // to intptr_t) lets the same code compile on both platforms.
  return reinterpret_cast<intptr_t>(reinterpret_cast<void *>(pthread_self()));
}
void Frame::write_latch()
{
write_latch(get_default_debug_xid());
}
// Acquire the frame latch for writing on behalf of `xid`.
// `xid` is only used for the debug bookkeeping (write_locker_ / read_lockers_) and for logging.
void Frame::write_latch(intptr_t xid)
{
{
// Sanity checks, performed while holding debug_lock_: the frame must be pinned, and `xid`
// must neither be the recorded writer nor be present in read_lockers_.
std::scoped_lock debug_lock(debug_lock_);
ASSERT(pin_count_.load() > 0,
"frame lock. write lock failed while pin count is invalid. "
"this=%p, pin=%d, pageNum=%d, fd=%d, xid=%lx, lbt=%s",
this, pin_count_.load(), page_.page_num, file_desc_, xid, lbt());
ASSERT(write_locker_ != xid,
"frame lock write twice."
"this=%p, pin=%d, pageNum=%d, fd=%d, xid=%lx, lbt=%s",
this, pin_count_.load(), page_.page_num, file_desc_, xid, lbt());
ASSERT(read_lockers_.find(xid) == read_lockers_.end(),
"frame lock write while holding the read lock."
"this=%p, pin=%d, pageNum=%d, fd=%d, xid=%lx, lbt=%s",
this, pin_count_.load(), page_.page_num, file_desc_, xid, lbt());
}
// debug_lock_ has been released before this point; the owner is recorded only after
// lock_ has actually been acquired.
lock_.lock();
write_locker_ = xid;
LOG_DEBUG("frame write lock success."
"this=%p, pin=%d, pageNum=%d, write locker=%lx, fd=%d, xid=%lx, lbt=%s",
this, pin_count_.load(), page_.page_num, write_locker_, file_desc_, xid, lbt());
// pthread_rwlock_wrlock(&rwlock_);
}
void Frame::write_unlatch()
{
write_unlatch(get_default_debug_xid());
}
// Release the write latch previously taken by `xid`.
void Frame::write_unlatch(intptr_t xid)
{
// The write lock is already held at this point and there is only one writer,
// so the checks below are done without taking debug_lock_.
ASSERT(pin_count_.load() > 0,
"frame lock. write unlock failed while pin count is invalid."
"this=%p, pin=%d, pageNum=%d, fd=%d, xid=%lx, lbt=%s",
this, pin_count_.load(), page_.page_num, file_desc_, xid, lbt());
ASSERT(write_locker_ == xid,
"frame unlock write while not the owner."
"write_locker=%lx, this=%p, pin=%d, pageNum=%d, fd=%d, xid=%lx, lbt=%s",
write_locker_, this, pin_count_.load(), page_.page_num, file_desc_, xid, lbt());
LOG_DEBUG("frame write unlock success. this=%p, pin=%d, pageNum=%d, fd=%d, xid=%lx, lbt=%s",
this, pin_count_.load(), page_.page_num, file_desc_, xid, lbt());
// Clear the recorded owner before lock_ is released.
write_locker_ = 0;
lock_.unlock();
// pthread_rwlock_unlock(&rwlock_);
}
void Frame::read_latch()
{
read_latch(get_default_debug_xid());
}
// Acquire the frame latch for reading on behalf of `xid`.
// NOTE(review): this calls lock_.lock(), the same call write_latch() uses, so whether readers
// can actually share the latch depends on common::Mutex — confirm.
void Frame::read_latch(intptr_t xid)
{
{
// Sanity checks under debug_lock_: the frame must be pinned, `xid` must not already be a
// recorded reader, and must not be the recorded writer.
std::scoped_lock debug_lock(debug_lock_);
ASSERT(pin_count_ > 0, "frame lock. read lock failed while pin count is invalid."
"this=%p, pin=%d, pageNum=%d, fd=%d, xid=%lx, lbt=%s",
this, pin_count_.load(), page_.page_num, file_desc_, xid, lbt());
ASSERT(read_lockers_.find(xid) == read_lockers_.end(),
"frame lock read double times."
"this=%p, pin=%d, pageNum=%d, fd=%d, xid=%lx, lbt=%s",
this, pin_count_.load(), page_.page_num, file_desc_, xid, lbt());
ASSERT(xid != write_locker_,
"frame lock read while holding the write lock."
"this=%p, pin=%d, pageNum=%d, fd=%d, xid=%lx, lbt=%s",
this, pin_count_.load(), page_.page_num, file_desc_, xid, lbt());
// The reader is recorded here, i.e. before lock_ is acquired below
// (try_read_latch() records it only after try_lock() succeeds).
read_lockers_.insert(xid);
}
lock_.lock();
LOG_DEBUG("frame read lock success."
"this=%p, pin=%d, pageNum=%d, fd=%d, xid=%lx, lbt=%s",
this, pin_count_.load(), page_.page_num, file_desc_, xid, lbt());
// pthread_rwlock_rdlock(&rwlock_);
}
/**
 * Try to take the read latch without blocking, using the calling thread's default debug id.
 *
 * The debug checks run under debug_lock_ first. lock_.try_lock() is then attempted, and only
 * on success is the caller recorded in read_lockers_.
 *
 * @return true if lock_ was acquired, false otherwise (no state is changed in that case).
 */
bool Frame::try_read_latch()
{
  intptr_t xid = get_default_debug_xid();
  {
    std::scoped_lock debug_lock(debug_lock_);
    ASSERT(pin_count_ > 0, "frame try lock. read lock failed while pin count is invalid."
           "this=%p, pin=%d, pageNum=%d, fd=%d, xid=%lx, lbt=%s",
           this, pin_count_.load(), page_.page_num, file_desc_, xid, lbt());
    ASSERT(read_lockers_.find(xid) == read_lockers_.end(),
           "frame try to lock read double times."
           "this=%p, pin=%d, pageNum=%d, fd=%d, xid=%lx, lbt=%s",
           this, pin_count_.load(), page_.page_num, file_desc_, xid, lbt());
    ASSERT(xid != write_locker_,
           "frame try to lock read while holding the write lock."
           "this=%p, pin=%d, pageNum=%d, fd=%d, xid=%lx, lbt=%s",
           this, pin_count_.load(), page_.page_num, file_desc_, xid, lbt());
  }

  bool ret = lock_.try_lock();
  if (ret) {
    // Use the same RAII guard as the rest of this file instead of manual lock()/unlock(),
    // so debug_lock_ is released on every exit path from this scope.
    std::scoped_lock debug_lock(debug_lock_);
    read_lockers_.insert(xid);
    LOG_DEBUG("frame read lock success."
              "this=%p, pin=%d, pageNum=%d, fd=%d, xid=%lx, lbt=%s",
              this, pin_count_.load(), page_.page_num, file_desc_, xid, lbt());
  }
  return ret;
}
void Frame::read_unlatch()
{
read_unlatch(get_default_debug_xid());
}
// Release the read latch previously taken by `xid`.
void Frame::read_unlatch(intptr_t xid)
{
{
// Under debug_lock_: the frame must still be pinned and `xid` must be a recorded reader;
// the record is then removed.
std::scoped_lock debug_lock(debug_lock_);
ASSERT(pin_count_.load() > 0,
"frame lock. read unlock failed while pin count is invalid."
"this=%p, pin=%d, pageNum=%d, fd=%d, xid=%lx, lbt=%s",
this, pin_count_.load(), page_.page_num, file_desc_, xid, lbt());
ASSERT(read_lockers_.find(xid) != read_lockers_.end(),
"frame unlock while not holding read lock."
"this=%p, pin=%d, pageNum=%d, fd=%d, xid=%lx, lbt=%s",
this, pin_count_.load(), page_.page_num, file_desc_, xid, lbt());
read_lockers_.erase(xid);
}
// The message is logged before lock_ is actually released on the next statement.
LOG_DEBUG("frame read unlock success."
"this=%p, pin=%d, pageNum=%d, fd=%d, xid=%lx, lbt=%s",
this, pin_count_.load(), page_.page_num, file_desc_, xid, lbt());
lock_.unlock();
// pthread_rwlock_unlock(&rwlock_);
}
// Increment the pin (reference) count of this frame and log the resulting state.
void Frame::pin()
{
// debug_lock_ is held for the whole function; it covers the read of read_lockers_ in the log call.
std::scoped_lock debug_lock(debug_lock_);
intptr_t xid = get_default_debug_xid();
// pin_count_ is a std::atomic<int>; the pre-increment yields the new value.
int pin_count = ++pin_count_;
LOG_DEBUG("after frame pin. this=%p, write locker=%lx, read locker has xid %d? pin=%d, fd=%d, pageNum=%d, xid=%lx, lbt=%s",
this, write_locker_, read_lockers_.find(xid) != read_lockers_.end(),
pin_count, file_desc_, page_.page_num, xid, lbt());
}
/**
 * Decrement the pin (reference) count of this frame.
 *
 * The count must be positive on entry. When it drops to zero, nobody may still be recorded
 * as holding the write latch or a read latch on this frame.
 *
 * @return the pin count after the decrement.
 */
int Frame::unpin()
{
  intptr_t xid = get_default_debug_xid();

  ASSERT(pin_count_.load() > 0,
         "try to unpin a frame that pin count <= 0."
         "this=%p, pin=%d, pageNum=%d, fd=%d, xid=%lx, lbt=%s",
         this, pin_count_.load(), page_.page_num, file_desc_, xid, lbt());

  std::scoped_lock debug_lock(debug_lock_);

  int pin_count = --pin_count_;
  LOG_DEBUG("after frame unpin. "
            "this=%p, write locker=%lx, read locker has xid? %d, pin=%d, fd=%d, pageNum=%d, xid=%lx, lbt=%s",
            this, write_locker_, read_lockers_.find(xid) != read_lockers_.end(),
            pin_count, file_desc_, page_.page_num, xid, lbt());

  if (0 == pin_count) {
    ASSERT(write_locker_ == 0,
           "frame unpin to 0 failed while someone hold the write lock. write locker=%lx, pageNum=%d, fd=%d, xid=%lx",
           write_locker_, page_.page_num, file_desc_, xid);
    // size() returns size_t, which does not match the "%d" conversion in the message;
    // cast it to int so the argument matches the format.
    ASSERT(read_lockers_.empty(),
           "frame unpin to 0 failed while someone hold the read locks. reader num=%d, pageNum=%d, fd=%d, xid=%lx",
           static_cast<int>(read_lockers_.size()), page_.page_num, file_desc_, xid);
  }
  return pin_count;
}
/**
 * Current value of the monotonic clock in nanoseconds.
 *
 * The timespec is zero-initialized so that a failing clock_gettime() yields 0 instead of
 * reading an uninitialized struct. The arithmetic is done entirely in unsigned long so no
 * signed intermediate can overflow.
 */
unsigned long current_time()
{
  struct timespec tp = {};
  clock_gettime(CLOCK_MONOTONIC, &tp);

  const unsigned long nanos_per_second = 1000UL * 1000UL * 1000UL;
  return static_cast<unsigned long>(tp.tv_sec) * nanos_per_second + static_cast<unsigned long>(tp.tv_nsec);
}
void Frame::access()
{
acc_time_ = current_time();
}
/**
 * Render a one-line description of a frame: its id, dirty flag, pin count,
 * file descriptor, page number and lsn.
 */
std::string to_string(const Frame &frame)
{
  std::ostringstream out;
  out << "frame id:" << to_string(frame.frame_id());
  out << ", dirty=" << frame.dirty();
  out << ", pin=" << frame.pin_count();
  out << ", fd=" << frame.file_desc();
  out << ", page num=" << frame.page_num();
  out << ", lsn=" << frame.lsn();
  return out.str();
}
/* Copyright (c) 2021 Xie Meiyi(xiemeiyi@hust.edu.cn) and OceanBase and/or its affiliates. All rights reserved.
miniob is licensed under Mulan PSL v2.
You can use this software according to the terms and conditions of the Mulan PSL v2.
You may obtain a copy of Mulan PSL v2 at:
http://license.coscl.org.cn/MulanPSL2
THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
See the Mulan PSL v2 for more details. */
//
// Created by lianyu on 2022/10/29.
//
#pragma once
#include <pthread.h>
#include <string.h>
#include <string>
#include <mutex>
#include <set>
#include <atomic>
#include "storage/buffer/page.h"
#include "common/log/log.h"
#include "common/lang/mutex.h"
// Identifies a frame by the pair (file descriptor, page number).
// Provides equality and a hash so it can be used as a key in hashed containers.
class FrameId
{
public:
FrameId(int file_desc, PageNum page_num);
// True when both the file descriptor and the page number are equal.
bool equal_to(const FrameId &other) const;
bool operator==(const FrameId &other) const;
// Hash value derived from both fields.
size_t hash() const;
int file_desc() const;
PageNum page_num() const;
// Human-readable form of the identifier.
friend std::string to_string(const FrameId &frame_id);
private:
int file_desc_;
PageNum page_num_;
};
// An in-memory frame: one Page plus its bookkeeping (dirty flag, pin count, access time,
// owning file descriptor) and the latch used to protect the page contents.
class Frame
{
public:
// Zero the whole page, including its page_num and lsn fields.
void clear_page()
{
memset(&page_, 0, sizeof(page_));
}
int file_desc() const { return file_desc_; }
void set_file_desc(int fd) { file_desc_ = fd; }
Page & page() { return page_; }
PageNum page_num() const { return page_.page_num; }
void set_page_num(PageNum page_num) { page_.page_num = page_num; }
FrameId frame_id() const { return FrameId(file_desc_, page_.page_num); }
LSN lsn() const { return page_.lsn; }
void set_lsn(LSN lsn) { page_.lsn = lsn; }
/// Refresh the access time. TODO touch is better?
void access();
/**
 * Mark this page as "dirty". Call this after modifying the page contents so that
 * the new page data is written to the disk file when the page is evicted from the buffer.
 */
void mark_dirty() { dirty_ = true; }
void clear_dirty() { dirty_ = false; }
bool dirty() const { return dirty_; }
char *data() { return page_.data; }
// A frame can be purged only when nobody has it pinned.
bool can_purge() { return pin_count_.load() == 0; }
/**
 * Increase the reference count of this frame.
 * pin is usually called while holding the frame manager lock.
 */
void pin();
/**
 * Release one reference on this frame.
 * Counterpart of pin, but usually called without holding the frame manager lock.
 */
int unpin();
int pin_count() const { return pin_count_.load(); }
// Latch operations. The overloads without arguments use get_default_debug_xid()
// as the holder id; `xid` is used for the debug bookkeeping and logging.
void write_latch();
void write_latch(intptr_t xid);
void write_unlatch();
void write_unlatch(intptr_t xid);
void read_latch();
void read_latch(intptr_t xid);
bool try_read_latch();
void read_unlatch();
void read_unlatch(intptr_t xid);
friend std::string to_string(const Frame &frame);
private:
friend class BufferPool;
bool dirty_ = false;
std::atomic<int> pin_count_{0};
unsigned long acc_time_ = 0;
int file_desc_ = -1;
Page page_;
// Read-write lock.
// NOTE(review): the latch functions in frame.cpp only reference this in commented-out calls — confirm it is still needed.
pthread_rwlock_t rwlock_ = PTHREAD_RWLOCK_INITIALIZER;
/// When built without concurrency support, the lock/unlock operations do nothing.
common::Mutex lock_;
/// Used for testing, to detect troublesome deadlock problems early.
/// If the debug option is not enabled at compile time, this code does nothing.
common::DebugMutex debug_lock_;
// Debug bookkeeping: id of the current write latch holder (0 when none) and ids of read latch holders.
intptr_t write_locker_ = 0;
std::set<intptr_t> read_lockers_;
};
/* Copyright (c) 2021 Xie Meiyi(xiemeiyi@hust.edu.cn) and OceanBase and/or its affiliates. All rights reserved.
miniob is licensed under Mulan PSL v2.
You can use this software according to the terms and conditions of the Mulan PSL v2.
You may obtain a copy of Mulan PSL v2 at:
http://license.coscl.org.cn/MulanPSL2
THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
See the Mulan PSL v2 for more details. */
//
// Created by Wangyunlai on 2023/03/07.
//
#pragma once
#include <stdint.h>
// Basic identifier types and constants shared by the buffer pool code.
using TrxID = int32_t;
using SpaceID = int32_t;
/// Disk files, including the files that store data and the index (B+-Tree) files, are organized in pages.
/// Every page has a number, called PageNum.
using PageNum = int32_t;
/// Data files are organized in pages; each page stores some rows, also called records.
/// Every row/record occupies one slot; the slots are numbered, and that number is called SlotNum.
using SlotNum = int32_t;
/// LSN for log sequence number
using LSN = int32_t;
static constexpr int BP_INVALID_PAGE_NUM = -1;
static constexpr int INVALID_TRX_ID = -1;
static constexpr int INVALID_LSN = -1;
static constexpr PageNum BP_HEADER_PAGE = 0;
static constexpr int LOG_BUFFER_SIZE = 1<<10; // TODO move to log record
// Page size is 8 KiB (1 << 13); the data area is what remains after the page_num and lsn fields.
static constexpr const int BP_PAGE_SIZE = (1 << 13);
static constexpr const int BP_PAGE_DATA_SIZE = (BP_PAGE_SIZE - sizeof(PageNum) - sizeof(LSN));
// Representation of one page: a page number, an lsn and the payload.
// The sizes of the three members add up to BP_PAGE_SIZE.
struct Page
{
PageNum page_num;
LSN lsn;
char data[BP_PAGE_DATA_SIZE];
};
......@@ -23,28 +23,22 @@ See the Mulan PSL v2 for more details. */
#include <string>
#include <mutex>
#include <unordered_map>
#include <functional>
#include "rc.h"
#include "defs.h"
#include "common/lang/mutex.h"
#include "common/mm/mem_pool.h"
#include "common/lang/lru_cache.h"
#include "common/lang/bitmap.h"
#include "storage/buffer/page.h"
#include "storage/buffer/frame.h"
class BufferPoolManager;
class DiskBufferPool;
//
#define BP_INVALID_PAGE_NUM (-1)
#define BP_PAGE_SIZE (1 << 14)
#define BP_PAGE_DATA_SIZE (BP_PAGE_SIZE - sizeof(PageNum))
#define BP_FILE_SUB_HDR_SIZE (sizeof(BPFileSubHeader))
struct Page {
PageNum page_num;
char data[BP_PAGE_DATA_SIZE];
};
// sizeof(Page) should be equal to BP_PAGE_SIZE
/**
* BufferPool的文件第一个页面,存放一些元数据信息,包括了后面每页的分配信息。
* TODO 1. 当前的做法,只能分配比较少的页面,你可以扩展一下,支持更多的页面或无限多的页面吗?
......@@ -52,7 +46,8 @@ struct Page {
* 2. 当前使用bitmap存放页面分配情况,但是这种方法在页面非常多的时候,查找空闲页面的
* 效率非常低,你有办法优化吗?
*/
struct BPFileHeader {
struct BPFileHeader
{
int32_t page_count; //! 当前文件一共有多少个页面
int32_t allocated_pages; //! 已经分配了多少个页面
char bitmap[0]; //! 页面分配位图, 第0个页面(就是当前页面),总是1
......@@ -63,96 +58,8 @@ struct BPFileHeader {
static const int MAX_PAGE_NUM = (BP_PAGE_DATA_SIZE - sizeof(page_count) - sizeof(allocated_pages)) * 8;
};
// In-memory frame holding one Page plus its dirty flag, pin count, access time and file descriptor.
class Frame {
public:
// Zero the whole page, including its page_num field.
void clear_page()
{
memset(&page_, 0, sizeof(page_));
}
PageNum page_num() const
{
return page_.page_num;
}
void set_page_num(PageNum page_num)
{
page_.page_num = page_num;
}
/**
 * Mark this page as "dirty". Call this after modifying the page contents so that
 * the new page data is written to the disk file when the page is evicted from the buffer.
 */
void mark_dirty()
{
dirty_ = true;
}
char *data()
{
return page_.data;
}
int file_desc() const
{
return file_desc_;
}
void set_file_desc(int fd)
{
file_desc_ = fd;
}
// pin_count_ is unsigned, so this comparison is true only when the count is 0.
bool can_purge()
{
return pin_count_ <= 0;
}
private:
friend class DiskBufferPool;
bool dirty_ = false;
unsigned int pin_count_ = 0;
unsigned long acc_time_ = 0;
int file_desc_ = -1;
Page page_;
};
// Identifies a frame by the pair (file descriptor, page number); usable as a hash-map key.
class BPFrameId {
public:
BPFrameId(int file_desc, PageNum page_num) : file_desc_(file_desc), page_num_(page_num)
{}
// True when both the file descriptor and the page number are equal.
bool equal_to(const BPFrameId &other) const
{
return file_desc_ == other.file_desc_ && page_num_ == other.page_num_;
}
bool operator==(const BPFrameId &other) const
{
return this->equal_to(other);
}
// File descriptor shifted into the upper bits, OR-ed with the page number.
// NOTE(review): page_num_ is OR-ed in as a signed value, so a negative page number would
// also set the upper bits — confirm negative values never reach here.
size_t hash() const
{
return static_cast<size_t>(file_desc_) << 32L | page_num_;
}
int file_desc() const
{
return file_desc_;
}
PageNum page_num() const
{
return page_num_;
}
private:
int file_desc_;
PageNum page_num_;
};
class BPFrameManager {
class BPFrameManager
{
public:
BPFrameManager(const char *tag);
......@@ -173,9 +80,12 @@ public:
/**
* 如果不能从空闲链表中分配新的页面,就使用这个接口,
* 尝试从pin count=0的页面中淘汰一个
* 尝试从pin count=0的页面中淘汰一些
* @param count 想要purge多少个页面
* @param purger 需要在释放frame之前,对页面做些什么操作。当前是刷新脏数据到磁盘
* @return 返回本次清理了多少个页面
*/
Frame *begin_purge();
int purge_frames(int count, std::function<RC(Frame *frame)> purger);
size_t frame_num() const
{
......@@ -190,23 +100,29 @@ public:
return allocator_.get_size();
}
private:
Frame *get_internal(const FrameId &frame_id);
RC free_internal(const FrameId &frame_id, Frame *frame);
private:
class BPFrameIdHasher {
public:
size_t operator()(const BPFrameId &frame_id) const
size_t operator()(const FrameId &frame_id) const
{
return frame_id.hash();
}
};
using FrameLruCache = common::LruCache<BPFrameId, Frame *, BPFrameIdHasher>;
using FrameLruCache = common::LruCache<FrameId, Frame *, BPFrameIdHasher>;
using FrameAllocator = common::MemPoolSimple<Frame>;
std::mutex lock_;
FrameLruCache frames_;
FrameLruCache frames_;
FrameAllocator allocator_;
};
class BufferPoolIterator {
class BufferPoolIterator
{
public:
BufferPoolIterator();
~BufferPoolIterator();
......@@ -221,7 +137,8 @@ private:
PageNum current_page_num_ = -1;
};
class DiskBufferPool {
class DiskBufferPool
{
public:
DiskBufferPool(BufferPoolManager &bp_manager, BPFrameManager &frame_manager);
~DiskBufferPool();
......@@ -253,9 +170,6 @@ public:
*/
RC allocate_page(Frame **frame);
/**
* 比purge_page多一个动作, 在磁盘上将对应的页数据删掉。
*/
RC dispose_page(PageNum page_num);
/**
......@@ -300,7 +214,6 @@ public:
*/
RC recover_page(PageNum page_num);
protected:
protected:
RC allocate_frame(PageNum page_num, Frame **buf);
......@@ -315,22 +228,30 @@ protected:
*/
RC load_page(PageNum page_num, Frame *frame);
/**
* 如果页面是脏的,就将数据刷新到磁盘
*/
RC flush_page_internal(Frame &frame);
private:
BufferPoolManager &bp_manager_;
BPFrameManager &frame_manager_;
std::string file_name_;
int file_desc_ = -1;
Frame *hdr_frame_ = nullptr;
BPFileHeader *file_header_ = nullptr;
std::set<PageNum> disposed_pages;
BufferPoolManager & bp_manager_;
BPFrameManager & frame_manager_;
std::string file_name_;
int file_desc_ = -1;
Frame * hdr_frame_ = nullptr;
BPFileHeader * file_header_ = nullptr;
std::set<PageNum> disposed_pages_;
common::Mutex lock_;
private:
friend class BufferPoolIterator;
};
class BufferPoolManager {
class BufferPoolManager
{
public:
BufferPoolManager();
BufferPoolManager(int page_num = 0);
~BufferPoolManager();
RC create_file(const char *file_name);
......@@ -340,11 +261,13 @@ public:
RC flush_page(Frame &frame);
public:
static void set_instance(BufferPoolManager *bpm);
static void set_instance(BufferPoolManager *bpm); // TODO 优化全局变量的表示方法
static BufferPoolManager &instance();
private:
BPFrameManager frame_manager_{"BufPool"};
common::Mutex lock_;
std::unordered_map<std::string, DiskBufferPool *> buffer_pools_;
std::unordered_map<int, DiskBufferPool *> fd_buffer_pools_;
};
......@@ -14,22 +14,33 @@ See the Mulan PSL v2 for more details. */
// Rewritten by Longda & Wangyunlai
//
//
#ifndef __OBSERVER_STORAGE_COMMON_INDEX_MANAGER_H_
#define __OBSERVER_STORAGE_COMMON_INDEX_MANAGER_H_
#pragma once
#include <string.h>
#include <sstream>
#include <functional>
#include <memory>
#include "storage/record/record_manager.h"
#include "storage/default/disk_buffer_pool.h"
#include "storage/trx/latch_memo.h"
#include "sql/parser/parse_defs.h"
#include "common/lang/comparator.h"
#include "common/log/log.h"
#define EMPTY_RID_PAGE_NUM -1
#define EMPTY_RID_PAGE_NUM -1 // TODO remove me
#define EMPTY_RID_SLOT_NUM -1
class AttrComparator {
// Kind of operation being performed on the B+ tree.
// It is passed as the `op` argument to functions such as find_leaf() and
// IndexNodeHandler::is_safe() declared in this header.
// NOTE(review): presumably it selects read vs. write latching during tree descent — confirm in the .cpp.
enum class BplusTreeOperationType
{
READ,
INSERT,
DELETE,
};
class AttrComparator
{
public:
void init(AttrType type, int length)
{
......@@ -55,8 +66,7 @@ public:
return common::compare_string((void *)v1, attr_length_, (void *)v2, attr_length_);
}
default: {
LOG_ERROR("unknown attr type. %d", attr_type_);
abort();
ASSERT(false, "unknown attr type. %d", attr_type_);
}
}
}
......@@ -66,7 +76,8 @@ private:
int attr_length_;
};
class KeyComparator {
class KeyComparator
{
public:
void init(AttrType type, int length)
{
......@@ -94,7 +105,8 @@ private:
AttrComparator attr_comparator_;
};
class AttrPrinter {
class AttrPrinter
{
public:
void init(AttrType type, int length)
{
......@@ -127,8 +139,7 @@ public:
return str;
}
default: {
LOG_ERROR("unknown attr type. %d", attr_type_);
abort();
ASSERT(false, "unknown attr type. %d", attr_type_);
}
}
}
......@@ -138,7 +149,8 @@ private:
int attr_length_;
};
class KeyPrinter {
class KeyPrinter
{
public:
void init(AttrType type, int length)
{
......@@ -169,7 +181,8 @@ private:
* this is the first page of bplus tree.
* only one field can be supported, can you extend it to multi-fields?
*/
struct IndexFileHeader {
struct IndexFileHeader
{
IndexFileHeader()
{
memset(this, 0, sizeof(IndexFileHeader));
......@@ -197,17 +210,17 @@ struct IndexFileHeader {
}
};
#define RECORD_RESERVER_PAIR_NUM 2
/**
* the common part of page description of bplus tree
* storage format:
* | page type | item number | parent page id |
*/
struct IndexNode {
struct IndexNode
{
static constexpr int HEADER_SIZE = 12;
bool is_leaf;
int key_num;
bool is_leaf;
int key_num;
PageNum parent;
};
......@@ -222,10 +235,10 @@ struct IndexNode {
* the value is rid.
* can you implement a cluster index?
*/
struct LeafIndexNode : public IndexNode {
static constexpr int HEADER_SIZE = IndexNode::HEADER_SIZE + 8;
struct LeafIndexNode : public IndexNode
{
static constexpr int HEADER_SIZE = IndexNode::HEADER_SIZE + 4;
PageNum prev_brother;
PageNum next_brother;
/**
* leaf can store order keys and rids at most
......@@ -242,7 +255,8 @@ struct LeafIndexNode : public IndexNode {
* the first key is ignored(key0).
* so it will waste space, can you fix this?
*/
struct InternalIndexNode : public IndexNode {
struct InternalIndexNode : public IndexNode
{
static constexpr int HEADER_SIZE = IndexNode::HEADER_SIZE;
/**
......@@ -251,24 +265,34 @@ struct InternalIndexNode : public IndexNode {
char array[0];
};
class IndexNodeHandler {
/**
* IndexNode 仅作为数据在内存或磁盘中的表示
* IndexNodeHandler 负责对IndexNode做各种操作。
* 作为一个类来说,虚函数会影响“结构体”真实的内存布局,所以将数据存储与操作分开
*/
class IndexNodeHandler
{
public:
IndexNodeHandler(const IndexFileHeader &header, Frame *frame);
virtual ~IndexNodeHandler() = default;
void init_empty(bool leaf);
bool is_leaf() const;
int key_size() const;
int value_size() const;
int item_size() const;
int key_size() const;
int value_size() const;
int item_size() const;
void increase_size(int n);
int size() const;
int size() const;
int max_size() const;
int min_size() const;
void set_parent_page_num(PageNum page_num);
PageNum parent_page_num() const;
PageNum page_num() const;
bool is_safe(BplusTreeOperationType op, bool is_root_node);
bool validate() const;
friend std::string to_string(const IndexNodeHandler &handler);
......@@ -279,15 +303,15 @@ protected:
IndexNode *node_;
};
class LeafIndexNodeHandler : public IndexNodeHandler {
class LeafIndexNodeHandler : public IndexNodeHandler
{
public:
LeafIndexNodeHandler(const IndexFileHeader &header, Frame *frame);
virtual ~LeafIndexNodeHandler() = default;
void init_empty();
void set_next_page(PageNum page_num);
void set_prev_page(PageNum page_num);
PageNum next_page() const;
PageNum prev_page() const;
char *key_at(int index);
char *value_at(int index);
......@@ -301,7 +325,7 @@ public:
void insert(int index, const char *key, const char *value);
void remove(int index);
int remove(const char *key, const KeyComparator &comparator);
int remove(const char *key, const KeyComparator &comparator);
RC move_half_to(LeafIndexNodeHandler &other, DiskBufferPool *bp);
RC move_first_to_end(LeafIndexNodeHandler &other, DiskBufferPool *disk_buffer_pool);
RC move_last_to_front(LeafIndexNodeHandler &other, DiskBufferPool *bp);
......@@ -310,9 +334,6 @@ public:
*/
RC move_to(LeafIndexNodeHandler &other, DiskBufferPool *bp);
int max_size() const;
int min_size() const;
bool validate(const KeyComparator &comparator, DiskBufferPool *bp) const;
friend std::string to_string(const LeafIndexNodeHandler &handler, const KeyPrinter &printer);
......@@ -329,9 +350,11 @@ private:
LeafIndexNode *leaf_node_;
};
class InternalIndexNodeHandler : public IndexNodeHandler {
class InternalIndexNodeHandler : public IndexNodeHandler
{
public:
InternalIndexNodeHandler(const IndexFileHeader &header, Frame *frame);
virtual ~InternalIndexNodeHandler() = default;
void init_empty();
void create_new_root(PageNum first_page_num, const char *key, PageNum page_num);
......@@ -351,13 +374,16 @@ public:
/**
* 与Leaf节点不同,lookup返回指定key应该属于哪个子节点,返回这个子节点在当前节点中的索引
* 如果想要返回插入位置,就提供 `insert_position` 参数
* @param comparator 用于键值比较的函数
* @param key 查找的键值
* @param found 如果是有效指针,将会返回当前是否存在指定的键值
* @param insert_position 如果是有效指针,将会返回可以插入指定键值的位置
* NOTE: 查找效率不高,你可以优化它吗?
*/
int lookup(
const KeyComparator &comparator, const char *key, bool *found = nullptr, int *insert_position = nullptr) const;
int max_size() const;
int min_size() const;
int lookup(const KeyComparator &comparator,
const char *key,
bool *found = nullptr,
int *insert_position = nullptr) const;
RC move_to(InternalIndexNodeHandler &other, DiskBufferPool *disk_buffer_pool);
RC move_first_to_end(InternalIndexNodeHandler &other, DiskBufferPool *disk_buffer_pool);
......@@ -382,17 +408,21 @@ private:
int item_size() const;
private:
InternalIndexNode *internal_node_;
InternalIndexNode *internal_node_ = nullptr;
};
class BplusTreeHandler {
class BplusTreeHandler
{
public:
/**
* 此函数创建一个名为fileName的索引。
* attrType描述被索引属性的类型,attrLength描述被索引属性的长度
*/
RC create(
const char *file_name, AttrType attr_type, int attr_length, int internal_max_size = -1, int leaf_max_size = -1);
RC create(const char *file_name,
AttrType attr_type,
int attr_length,
int internal_max_size = -1,
int leaf_max_size = -1);
/**
* 打开名为fileName的索引文件。
......@@ -434,71 +464,85 @@ public:
/**
* Check whether current B+ tree is invalid or not.
* return true means current tree is valid, return false means current tree is invalid.
* @return
* @return true means current tree is valid, return false means current tree is invalid.
* @note thread unsafe
*/
bool validate_tree();
public:
/**
* 这些函数都是线程不安全的,不要在多线程的环境下调用
*/
RC print_tree();
RC print_leafs();
private:
/**
* 这些函数都是线程不安全的,不要在多线程的环境下调用
*/
RC print_leaf(Frame *frame);
RC print_internal_node_recursive(Frame *frame);
bool validate_node(IndexNode *node);
bool validate_leaf_link();
bool validate_node_recursive(Frame *frame);
bool validate_leaf_link(LatchMemo &latch_memo);
bool validate_node_recursive(LatchMemo &latch_memo, Frame *frame);
protected:
RC find_leaf(const char *key, Frame *&frame);
RC left_most_page(Frame *&frame);
RC right_most_page(Frame *&frame);
RC find_leaf_internal(const std::function<PageNum(InternalIndexNodeHandler &)> &child_page_getter, Frame *&frame);
RC insert_into_parent(PageNum parent_page, Frame *left_frame, const char *pkey, Frame &right_frame);
RC find_leaf(LatchMemo &latch_memo, BplusTreeOperationType op, const char *key, Frame *&frame);
RC left_most_page(LatchMemo &latch_memo, Frame *&frame);
RC find_leaf_internal(LatchMemo &latch_memo, BplusTreeOperationType op,
const std::function<PageNum(InternalIndexNodeHandler &)> &child_page_getter,
Frame *&frame);
RC crabing_protocal_fetch_page(LatchMemo &latch_memo, BplusTreeOperationType op, PageNum page_num, bool is_root_page,
Frame *&frame);
RC delete_entry_internal(Frame *leaf_frame, const char *key);
RC insert_into_parent(LatchMemo &latch_memo, PageNum parent_page, Frame *left_frame, const char *pkey,
Frame &right_frame);
RC insert_into_new_root(Frame *left_frame, const char *pkey, Frame &right_frame);
RC delete_entry_internal(LatchMemo &latch_memo, Frame *leaf_frame, const char *key);
template <typename IndexNodeHandlerType>
RC split(Frame *frame, Frame *&new_frame);
RC split(LatchMemo &latch_memo, Frame *frame, Frame *&new_frame);
template <typename IndexNodeHandlerType>
RC coalesce_or_redistribute(Frame *frame);
RC coalesce_or_redistribute(LatchMemo &latch_memo, Frame *frame);
template <typename IndexNodeHandlerType>
RC coalesce(Frame *neighbor_frame, Frame *frame, Frame *parent_frame, int index);
RC coalesce(LatchMemo &latch_memo, Frame *neighbor_frame, Frame *frame, Frame *parent_frame, int index);
template <typename IndexNodeHandlerType>
RC redistribute(Frame *neighbor_frame, Frame *frame, Frame *parent_frame, int index);
RC insert_entry_into_parent(Frame *frame, Frame *new_frame, const char *key);
RC insert_entry_into_leaf_node(Frame *frame, const char *pkey, const RID *rid);
RC update_root_page_num();
RC insert_entry_into_parent(LatchMemo &latch_memo, Frame *frame, Frame *new_frame, const char *key);
RC insert_entry_into_leaf_node(LatchMemo &latch_memo, Frame *frame, const char *pkey, const RID *rid);
RC create_new_tree(const char *key, const RID *rid);
RC adjust_root(Frame *root_frame);
void update_root_page_num(PageNum root_page_num);
void update_root_page_num_locked(PageNum root_page_num);
RC adjust_root(LatchMemo &latch_memo, Frame *root_frame);
private:
char *make_key(const char *user_key, const RID &rid);
common::MemPoolItem::unique_ptr make_key(const char *user_key, const RID &rid);
void free_key(char *key);
protected:
DiskBufferPool *disk_buffer_pool_ = nullptr;
bool header_dirty_ = false;
bool header_dirty_ = false; //
IndexFileHeader file_header_;
KeyComparator key_comparator_;
KeyPrinter key_printer_;
// 在调整根节点时,需要加上这个锁。
// 这个锁可以使用递归读写锁,但是这里偷懒先不改
common::SharedMutex root_lock_;
common::MemPoolItem *mem_pool_item_ = nullptr;
KeyComparator key_comparator_;
KeyPrinter key_printer_;
std::unique_ptr<common::MemPoolItem> mem_pool_item_;
private:
friend class BplusTreeScanner;
friend class BplusTreeTester;
};
class BplusTreeScanner {
class BplusTreeScanner
{
public:
BplusTreeScanner(BplusTreeHandler &tree_handler);
~BplusTreeScanner();
......@@ -512,10 +556,10 @@ public:
* @param right_len right_user_key 的内存大小(只有在变长字段中才会关注)
* @param right_inclusive 右边界的值是否包含在内
*/
RC open(const char *left_user_key, int left_len, bool left_inclusive, const char *right_user_key, int right_len,
bool right_inclusive);
RC open(const char *left_user_key, int left_len, bool left_inclusive,
const char *right_user_key, int right_len, bool right_inclusive);
RC next_entry(RID *rid);
RC next_entry(RID &rid);
RC close();
......@@ -525,16 +569,20 @@ private:
*/
RC fix_user_key(const char *user_key, int key_len, bool want_greater, char **fixed_key, bool *should_inclusive);
void fetch_item(RID &rid);
bool touch_end();
private:
bool inited_ = false;
BplusTreeHandler &tree_handler_;
LatchMemo latch_memo_;
/// 使用左右叶子节点和位置来表示扫描的起始位置和终止位置
/// 起始位置和终止位置都是有效的数据
Frame *left_frame_ = nullptr;
Frame *right_frame_ = nullptr;
Frame *current_frame_ = nullptr;
common::MemPoolItem::unique_ptr right_key_;
int iter_index_ = -1;
int end_index_ = -1; // use -1 for end of scan
bool first_emitted_ = false;
};
#endif //__OBSERVER_STORAGE_COMMON_INDEX_MANAGER_H_
......@@ -132,7 +132,7 @@ RC BplusTreeIndexScanner::open(
RC BplusTreeIndexScanner::next_entry(RID *rid)
{
return tree_scanner_.next_entry(rid);
return tree_scanner_.next_entry(*rid);
}
RC BplusTreeIndexScanner::destroy()
......
......@@ -222,28 +222,6 @@ RC RecordPageHandler::recover_insert_record(const char *data, RID *rid)
return RC::SUCCESS;
}
/**
 * Overwrite the record stored in the slot named by rec->rid() with rec's data.
 *
 * @param rec record carrying the target RID and the new payload
 * @return RC::INVALID_ARGUMENT when the slot number is not below the page's
 *         record capacity, RC::RECORD_RECORD_NOT_EXIST when the slot's bit is
 *         not set in the page bitmap, RC::SUCCESS otherwise.
 */
RC RecordPageHandler::update_record(const Record *rec)
{
  const auto slot = rec->rid().slot_num;

  // Reject slot numbers that fall outside this page.
  if (slot >= page_header_->record_capacity) {
    LOG_ERROR(
        "Invalid slot_num %d, exceed page's record capacity, page_num %d.", slot, frame_->page_num());
    return RC::INVALID_ARGUMENT;
  }

  // The bitmap tells which slots currently hold a record.
  Bitmap bitmap(bitmap_, page_header_->record_capacity);
  if (!bitmap.get_bit(slot)) {
    LOG_ERROR("Invalid slot_num %d, slot is empty, page_num %d.", slot, frame_->page_num());
    return RC::RECORD_RECORD_NOT_EXIST;
  }

  // Copy the new payload over the old one and flag the frame as modified.
  char *dest = get_record_data(slot);
  memcpy(dest, rec->data(), page_header_->record_real_size);
  bitmap.set_bit(slot);
  frame_->mark_dirty();
  return RC::SUCCESS;
}
RC RecordPageHandler::delete_record(const RID *rid)
{
if (rid->slot_num >= page_header_->record_capacity) {
......@@ -258,10 +236,9 @@ RC RecordPageHandler::delete_record(const RID *rid)
frame_->mark_dirty();
if (page_header_->record_num == 0) {
DiskBufferPool *disk_buffer_pool = disk_buffer_pool_;
PageNum page_num = get_page_num();
// PageNum page_num = get_page_num();
cleanup();
disk_buffer_pool->dispose_page(page_num);
// disk_buffer_pool->dispose_page(page_num); // TODO 确认是否可以不删除页面
}
return RC::SUCCESS;
} else {
......@@ -415,18 +392,6 @@ RC RecordFileHandler::recover_insert_record(const char *data, int record_size, R
return record_page_handler.recover_insert_record(data, rid);
}
/**
 * Update a record in the file by delegating to a page handler opened on the
 * page named by rec->rid().page_num.
 *
 * @param rec record carrying the target RID and the new payload
 * @return the page handler's init error if the page cannot be opened,
 *         otherwise whatever RecordPageHandler::update_record returns.
 */
RC RecordFileHandler::update_record(const Record *rec)
{
  RecordPageHandler page_handler;

  const RC init_rc = page_handler.init(*disk_buffer_pool_, rec->rid().page_num);
  if (init_rc != RC::SUCCESS) {
    LOG_ERROR("Failed to init record page handler.page number=%d", rec->rid().page_num);
    return init_rc;
  }

  return page_handler.update_record(rec);
}
RC RecordFileHandler::delete_record(const RID *rid)
{
RC rc = RC::SUCCESS;
......
......@@ -20,8 +20,16 @@ See the Mulan PSL v2 for more details. */
#include "common/lang/bitmap.h"
class ConditionFilter;
class RecordPageHandler;
struct PageHeader {
/**
* 数据文件,按照页面来组织,每一页都存放一些记录/数据行
* 每一页都有一个这样的页头,虽然看起来浪费,但是现在就简单的这么做
* 从这个页头描述的信息来看,当前仅支持定长行/记录。如果要支持变长记录,
* 或者超长(超出一页)的记录,这么做是不合适的。
*/
struct PageHeader
{
int32_t record_num; // 当前页面记录的个数
int32_t record_capacity; // 最大记录个数
int32_t record_real_size; // 每条记录的实际大小
......@@ -29,8 +37,11 @@ struct PageHeader {
int32_t first_record_offset; // 第一条记录的偏移量
};
class RecordPageHandler;
class RecordPageIterator {
/**
* 遍历一个页面中每条记录的iterator
*/
class RecordPageIterator
{
public:
RecordPageIterator();
~RecordPageIterator();
......@@ -52,7 +63,11 @@ private:
SlotNum next_slot_num_ = 0;
};
class RecordPageHandler {
/**
* 负责处理一个页面中各种操作,比如插入记录、删除记录或者查找记录
*/
class RecordPageHandler
{
public:
RecordPageHandler() = default;
~RecordPageHandler();
......@@ -63,20 +78,6 @@ public:
RC insert_record(const char *data, RID *rid);
RC recover_insert_record(const char *data, RID *rid);
RC update_record(const Record *rec);
/**
 * Fetch the record identified by rid, hand it to updater, and mark the frame
 * dirty.
 *
 * @param rid     identifies the record to update
 * @param updater callable invoked as updater(record); its result is returned
 * @return the get_record error if the lookup fails (updater is not called and
 *         the frame is not marked dirty), otherwise updater's result. The
 *         frame is marked dirty whatever updater returns.
 */
template <class RecordUpdater>
RC update_record_in_place(const RID *rid, RecordUpdater updater)
{
  Record target;
  const RC fetch_rc = get_record(rid, &target);
  if (RC::SUCCESS != fetch_rc) {
    return fetch_rc;
  }

  const RC update_rc = updater(target);
  frame_->mark_dirty();
  return update_rc;
}
RC delete_record(const RID *rid);
......@@ -102,7 +103,8 @@ private:
friend class RecordPageIterator;
};
class RecordFileHandler {
class RecordFileHandler
{
public:
RecordFileHandler() = default;
RC init(DiskBufferPool *buffer_pool);
......@@ -130,19 +132,6 @@ public:
*/
RC get_record(const RID *rid, Record *rec);
/**
 * Update a record in place: open a page handler on the page named by
 * rid->page_num and forward to RecordPageHandler::update_record_in_place.
 *
 * @param rid     identifies the record to update
 * @param updater callable forwarded to the page handler
 * @return the page handler's init error if the page cannot be opened,
 *         otherwise the result of the page-level update.
 */
template <class RecordUpdater>  // TODO: make this a regular (non-template) function
RC update_record_in_place(const RID *rid, RecordUpdater updater)
{
  RecordPageHandler page_handler;

  // Store the init result. The previous code compared (`rc != init(...)`)
  // instead of assigning, so a failed init returned the stale RC::SUCCESS.
  RC rc = page_handler.init(*disk_buffer_pool_, rid->page_num);
  if (rc != RC::SUCCESS) {
    return rc;
  }

  return page_handler.update_record_in_place(rid, updater);
}
private:
RC init_free_pages();
......@@ -151,7 +140,8 @@ private:
std::unordered_set<PageNum> free_pages_; // 没有填充满的页面集合
};
class RecordFileScanner {
class RecordFileScanner
{
public:
RecordFileScanner() = default;
......
/* Copyright (c) 2021 Xie Meiyi(xiemeiyi@hust.edu.cn) and OceanBase and/or its affiliates. All rights reserved.
miniob is licensed under Mulan PSL v2.
You can use this software according to the terms and conditions of the Mulan PSL v2.
You may obtain a copy of Mulan PSL v2 at:
http://license.coscl.org.cn/MulanPSL2
THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
See the Mulan PSL v2 for more details. */
//
// Created by Wangyunlai on 2023/03/08.
//
#include "storage/trx/latch_memo.h"
#include "storage/buffer/frame.h"
#include "storage/default/disk_buffer_pool.h"
#include "common/lang/mutex.h"
/**
 * Build a memo entry that refers to a buffer-pool frame.
 *
 * @param type  what was done to the frame (pin, shared latch, exclusive latch)
 * @param frame the frame the entry refers to
 *
 * NOTE(review): `lock` is not assigned here. LatchMemo::release_item tells
 * frame entries from lock entries by testing `frame != nullptr`, so both
 * pointers presumably default to nullptr in the declaration; confirm in
 * latch_memo.h.
 */
LatchMemoItem::LatchMemoItem(LatchMemoType type, Frame *frame)
{
this->type = type;
this->frame = frame;
}
/**
 * Build a memo entry that refers to a standalone shared mutex rather than a
 * frame.
 *
 * @param type how the mutex was locked (shared or exclusive)
 * @param lock the mutex the entry refers to
 *
 * NOTE(review): `frame` is not assigned here. LatchMemo::release_item treats
 * an entry as a lock entry only when `frame == nullptr`, so `frame`
 * presumably defaults to nullptr in the declaration; confirm in latch_memo.h.
 */
LatchMemoItem::LatchMemoItem(LatchMemoType type, common::SharedMutex *lock)
{
this->type = type;
this->lock = lock;
}
////////////////////////////////////////////////////////////////////////////////
/**
 * Create a memo bound to one buffer pool. Every page fetch, allocation, unpin
 * and dispose done through this memo goes to that pool.
 *
 * @param buffer_pool pool used by this memo; the pointer is stored, not owned
 *                    here (this file never deletes it)
 */
LatchMemo::LatchMemo(DiskBufferPool *buffer_pool) : buffer_pool_(buffer_pool)
{}
/**
 * Releases everything still recorded in the memo (latches, locks, pins) and
 * disposes any pages queued through dispose_page().
 */
LatchMemo::~LatchMemo()
{
  release();
}
/**
 * Fetch a page from the buffer pool and record it as a PIN entry so that
 * release()/release_to() will unpin it later.
 *
 * @param page_num page to fetch
 * @param frame    out: reset to nullptr first, then filled in by the pool
 * @return the buffer pool's error code on failure (nothing is recorded),
 *         RC::SUCCESS otherwise.
 */
RC LatchMemo::get_page(PageNum page_num, Frame *&frame)
{
  frame = nullptr;

  const RC fetch_rc = buffer_pool_->get_this_page(page_num, &frame);
  if (fetch_rc == RC::SUCCESS) {
    items_.emplace_back(LatchMemoType::PIN, frame);
    return RC::SUCCESS;
  }
  return fetch_rc;
}
/**
 * Allocate a new page from the buffer pool and record it as a PIN entry so
 * that release()/release_to() will unpin it later.
 *
 * @param frame out: reset to nullptr first, then filled in by the pool
 * @return the buffer pool's result; nothing is recorded on failure.
 */
RC LatchMemo::allocate_page(Frame *&frame)
{
  frame = nullptr;

  const RC alloc_rc = buffer_pool_->allocate_page(&frame);
  if (alloc_rc != RC::SUCCESS) {
    return alloc_rc;
  }

  items_.emplace_back(LatchMemoType::PIN, frame);
  // A freshly allocated frame is expected to be pinned exactly once.
  ASSERT(frame->pin_count() == 1, "allocate a new frame. frame=%s", to_string(*frame).c_str());
  return alloc_rc;
}
/**
 * Queue a page for disposal. The page is not disposed here; release() hands
 * every queued page number to the buffer pool after the recorded items have
 * been released.
 *
 * @param page_num page to dispose later
 */
void LatchMemo::dispose_page(PageNum page_num)
{
  disposed_pages_.push_back(page_num);
}
/**
 * Latch a frame in the requested mode and record the latch so that
 * release()/release_to() can undo it.
 *
 * @param frame frame to latch
 * @param type  LatchMemoType::EXCLUSIVE takes the write latch,
 *              LatchMemoType::SHARED takes the read latch. Any other value is
 *              a programming error: it trips the ASSERT and records nothing.
 */
void LatchMemo::latch(Frame *frame, LatchMemoType type)
{
  switch (type) {
    case LatchMemoType::EXCLUSIVE: {
      frame->write_latch();
    } break;
    case LatchMemoType::SHARED: {
      frame->read_latch();
    } break;
    default: {
      ASSERT(false, "invalid latch type: %d", static_cast<int>(type));
      // No latch was acquired on this path, so no memo item may be recorded.
      // If ASSERT does not abort (e.g. it is compiled out), falling through
      // would make a later release unlatch or unpin something this memo
      // never took.
      return;
    }
  }

  items_.emplace_back(type, frame);
}
/**
 * Take the exclusive (write) latch on a frame and record it in the memo.
 *
 * @param frame frame to latch
 */
void LatchMemo::xlatch(Frame *frame)
{
  latch(frame, LatchMemoType::EXCLUSIVE);
}
/**
 * Take the shared (read) latch on a frame and record it in the memo.
 *
 * @param frame frame to latch
 */
void LatchMemo::slatch(Frame *frame)
{
  latch(frame, LatchMemoType::SHARED);
}
/**
 * Try to take the shared (read) latch on a frame via Frame::try_read_latch().
 * The latch is recorded in the memo only when the attempt succeeds.
 *
 * @param frame frame to latch
 * @return true if the latch was acquired and recorded, false otherwise.
 */
bool LatchMemo::try_slatch(Frame *frame)
{
  if (!frame->try_read_latch()) {
    return false;
  }

  items_.emplace_back(LatchMemoType::SHARED, frame);
  return true;
}
/**
 * Lock a standalone shared mutex exclusively and record it in the memo so
 * that release()/release_to() will unlock it.
 *
 * @param lock mutex to lock
 */
void LatchMemo::xlatch(common::SharedMutex *lock)
{
  lock->lock();
  items_.push_back(LatchMemoItem(LatchMemoType::EXCLUSIVE, lock));
  LOG_DEBUG("lock root success");
}
/**
 * Lock a standalone shared mutex in shared mode and record it in the memo so
 * that release()/release_to() will unlock it.
 *
 * @param lock mutex to lock
 */
void LatchMemo::slatch(common::SharedMutex *lock)
{
  lock->lock_shared();
  items_.push_back(LatchMemoItem(LatchMemoType::SHARED, lock));
}
/**
 * Undo the single action described by a memo item.
 *
 * EXCLUSIVE and SHARED items refer either to a frame latch or to a standalone
 * SharedMutex; a non-null `frame` selects the frame latch. PIN items unpin
 * the frame through the buffer pool. Any other type trips the ASSERT.
 *
 * @param item the memo entry to release
 */
void LatchMemo::release_item(LatchMemoItem &item)
{
  const bool refers_to_frame = (item.frame != nullptr);

  if (item.type == LatchMemoType::EXCLUSIVE) {
    if (refers_to_frame) {
      item.frame->write_unlatch();
      return;
    }
    LOG_DEBUG("release root lock");
    item.lock->unlock();
    return;
  }

  if (item.type == LatchMemoType::SHARED) {
    if (refers_to_frame) {
      item.frame->read_unlatch();
      return;
    }
    item.lock->unlock_shared();
    return;
  }

  if (item.type == LatchMemoType::PIN) {
    buffer_pool_->unpin_page(item.frame);
    return;
  }

  ASSERT(false, "invalid latch type: %d", static_cast<int>(item.type));
}
void LatchMemo::release()
{
int point = static_cast<int>(items_.size());
release_to(point);
for (PageNum page_num : disposed_pages_) {
buffer_pool_->dispose_page(page_num);
}
disposed_pages_.clear();
}
/**
 * Release the first `point` recorded items and remove them from the memo.
 * Items are released from index point-1 down to index 0; items at index
 * `point` and beyond are left untouched.
 *
 * @param point number of leading items to release; must be within
 *              [0, items_.size()]
 */
void LatchMemo::release_to(int point)
{
  ASSERT(point >= 0 && point <= static_cast<int>(items_.size()),
         "invalid memo point. point=%d, items size=%d",
         point, static_cast<int>(items_.size()));

  // Count what is actually released so the erase below covers exactly that
  // prefix.
  int released = 0;
  for (int idx = point; idx > 0; --idx) {
    release_item(items_[idx - 1]);
    ++released;
  }

  items_.erase(items_.begin(), items_.begin() + released);
}
此差异已折叠。
此差异已折叠。
......@@ -38,5 +38,4 @@ FOREACH (F ${ALL_SRC})
ADD_EXECUTABLE(${prjName} ${F})
TARGET_LINK_LIBRARIES(${prjName} common pthread dl observer_static)
ENDFOREACH (F)
ENDFOREACH (F)
\ No newline at end of file
PROJECT(unitest)
PROJECT(unittest)
MESSAGE("Begin to build " ${PROJECT_NAME})
MESSAGE(STATUS "This is PROJECT_BINARY_DIR dir " ${PROJECT_BINARY_DIR})
MESSAGE(STATUS "This is PROJECT_SOURCE_DIR dir " ${PROJECT_SOURCE_DIR})
......@@ -28,7 +28,6 @@ ENDIF ()
find_package(GTest CONFIG REQUIRED)
enable_testing()
include(GoogleTest)
#get_filename_component(<VAR> FileName
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册