Skip to content

Commit

Permalink
Adapt RocksDB 9.1.1 (#151)
Browse files Browse the repository at this point in the history
  • Loading branch information
linxGnu committed May 21, 2024
1 parent 205af2d commit 231588e
Show file tree
Hide file tree
Showing 10 changed files with 138 additions and 30 deletions.
2 changes: 1 addition & 1 deletion build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ cd $BUILD_PATH && wget https://github.com/facebook/zstd/archive/v${zstd_version}

# Note: if you don't have a good reason, please do not set -DPORTABLE=ON
# This one is set here on purpose of compatibility with github action runtime processor
rocksdb_version="8.11.4"
rocksdb_version="9.1.1"
cd $BUILD_PATH && wget https://github.com/facebook/rocksdb/archive/v${rocksdb_version}.tar.gz && tar xzf v${rocksdb_version}.tar.gz && cd rocksdb-${rocksdb_version}/ && \
mkdir -p build_place && cd build_place && cmake -DCMAKE_BUILD_TYPE=Release $CMAKE_REQUIRED_PARAMS -DCMAKE_PREFIX_PATH=$INSTALL_PREFIX -DWITH_TESTS=OFF -DWITH_GFLAGS=OFF \
-DWITH_BENCHMARK_TOOLS=OFF -DWITH_TOOLS=OFF -DWITH_MD_LIBRARY=OFF -DWITH_RUNTIME_DEBUG=OFF -DROCKSDB_BUILD_SHARED=OFF -DWITH_SNAPPY=ON -DWITH_LZ4=ON -DWITH_ZLIB=ON -DWITH_LIBURING=OFF \
Expand Down
21 changes: 15 additions & 6 deletions c.h
Original file line number Diff line number Diff line change
Expand Up @@ -608,6 +608,9 @@ extern ROCKSDB_LIBRARY_API const rocksdb_snapshot_t* rocksdb_create_snapshot(
extern ROCKSDB_LIBRARY_API void rocksdb_release_snapshot(
rocksdb_t* db, const rocksdb_snapshot_t* snapshot);

extern ROCKSDB_LIBRARY_API uint64_t
rocksdb_snapshot_get_sequence_number(const rocksdb_snapshot_t* snapshot);

/* Returns NULL if property name is unknown.
Else returns a pointer to a malloc()-ed null-terminated value. */
extern ROCKSDB_LIBRARY_API char* rocksdb_property_value(rocksdb_t* db,
Expand Down Expand Up @@ -691,8 +694,8 @@ extern ROCKSDB_LIBRARY_API void rocksdb_flush_wal(rocksdb_t* db,
extern ROCKSDB_LIBRARY_API void rocksdb_disable_file_deletions(rocksdb_t* db,
char** errptr);

extern ROCKSDB_LIBRARY_API void rocksdb_enable_file_deletions(
rocksdb_t* db, unsigned char force, char** errptr);
extern ROCKSDB_LIBRARY_API void rocksdb_enable_file_deletions(rocksdb_t* db,
char** errptr);

/* Management operations */

Expand Down Expand Up @@ -1152,10 +1155,16 @@ extern ROCKSDB_LIBRARY_API void rocksdb_options_set_env(rocksdb_options_t*,
rocksdb_env_t*);
extern ROCKSDB_LIBRARY_API void rocksdb_options_set_info_log(rocksdb_options_t*,
rocksdb_logger_t*);
extern ROCKSDB_LIBRARY_API rocksdb_logger_t* rocksdb_options_get_info_log(
rocksdb_options_t* opt);
extern ROCKSDB_LIBRARY_API void rocksdb_options_set_info_log_level(
rocksdb_options_t*, int);
extern ROCKSDB_LIBRARY_API int rocksdb_options_get_info_log_level(
rocksdb_options_t*);
extern ROCKSDB_LIBRARY_API rocksdb_logger_t*
rocksdb_logger_create_stderr_logger(int log_level, const char* prefix);
extern ROCKSDB_LIBRARY_API void rocksdb_logger_destroy(
rocksdb_logger_t* logger);
extern ROCKSDB_LIBRARY_API void rocksdb_options_set_write_buffer_size(
rocksdb_options_t*, size_t);
extern ROCKSDB_LIBRARY_API size_t
Expand Down Expand Up @@ -1499,10 +1508,6 @@ extern ROCKSDB_LIBRARY_API void rocksdb_options_set_advise_random_on_open(
rocksdb_options_t*, unsigned char);
extern ROCKSDB_LIBRARY_API unsigned char
rocksdb_options_get_advise_random_on_open(rocksdb_options_t*);
extern ROCKSDB_LIBRARY_API void
rocksdb_options_set_access_hint_on_compaction_start(rocksdb_options_t*, int);
extern ROCKSDB_LIBRARY_API int
rocksdb_options_get_access_hint_on_compaction_start(rocksdb_options_t*);
extern ROCKSDB_LIBRARY_API void rocksdb_options_set_use_adaptive_mutex(
rocksdb_options_t*, unsigned char);
extern ROCKSDB_LIBRARY_API unsigned char rocksdb_options_get_use_adaptive_mutex(
Expand Down Expand Up @@ -1684,6 +1689,10 @@ extern ROCKSDB_LIBRARY_API rocksdb_ratelimiter_t*
rocksdb_ratelimiter_create_auto_tuned(int64_t rate_bytes_per_sec,
int64_t refill_period_us,
int32_t fairness);
extern ROCKSDB_LIBRARY_API rocksdb_ratelimiter_t*
rocksdb_ratelimiter_create_with_mode(int64_t rate_bytes_per_sec,
int64_t refill_period_us, int32_t fairness,
int mode, bool auto_tuned);
extern ROCKSDB_LIBRARY_API void rocksdb_ratelimiter_destroy(
rocksdb_ratelimiter_t*);

Expand Down
4 changes: 2 additions & 2 deletions cf_ts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func TestColumnFamilyMultiGetWithTS(t *testing.T) {
givenVal1 = []byte("world1")
givenVal2 = []byte("world2")
givenVal3 = []byte("world3")
givenTs1 = marshalTimestamp(1)
givenTs1 = marshalTimestamp(0)
givenTs2 = marshalTimestamp(2)
givenTs3 = marshalTimestamp(3)
)
Expand Down Expand Up @@ -177,7 +177,7 @@ func TestColumnFamilyMultiGetWithTS(t *testing.T) {
require.EqualValues(t, values[1].Data(), givenVal2)
require.EqualValues(t, values[2].Data(), givenVal3)

require.EqualValues(t, times[0].Data(), givenTs1)
require.EqualValues(t, times[0].Data(), []byte{})
require.EqualValues(t, times[1].Data(), givenTs2)
require.EqualValues(t, times[2].Data(), givenTs3)
}
10 changes: 5 additions & 5 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -1017,7 +1017,7 @@ func (db *DB) SingleDeleteCFWithTS(opts *WriteOptions, cf *ColumnFamilyHandle, k
}

// DeleteRangeCF deletes keys that are between [startKey, endKey)
func (db *DB) DeleteRangeCF(opts *WriteOptions, cf *ColumnFamilyHandle, startKey []byte, endKey []byte) (err error) {
func (db *DB) DeleteRangeCF(opts *WriteOptions, cf *ColumnFamilyHandle, startKey, endKey []byte) (err error) {
var (
cErr *C.char
cStartKey = refGoBytes(startKey)
Expand Down Expand Up @@ -1087,7 +1087,7 @@ func (db *DB) SingleDeleteCF(opts *WriteOptions, cf *ColumnFamilyHandle, key []b
}

// Merge merges the data associated with the key with the actual data in the database.
func (db *DB) Merge(opts *WriteOptions, key []byte, value []byte) (err error) {
func (db *DB) Merge(opts *WriteOptions, key, value []byte) (err error) {
var (
cErr *C.char
cKey = refGoBytes(key)
Expand All @@ -1102,7 +1102,7 @@ func (db *DB) Merge(opts *WriteOptions, key []byte, value []byte) (err error) {

// MergeCF merges the data associated with the key with the actual data in the
// database and column family.
func (db *DB) MergeCF(opts *WriteOptions, cf *ColumnFamilyHandle, key []byte, value []byte) (err error) {
func (db *DB) MergeCF(opts *WriteOptions, cf *ColumnFamilyHandle, key, value []byte) (err error) {
var (
cErr *C.char
cKey = refGoBytes(key)
Expand Down Expand Up @@ -1662,10 +1662,10 @@ func (db *DB) DisableFileDeletions() (err error) {
}

// EnableFileDeletions enables file deletions for the database.
func (db *DB) EnableFileDeletions(force bool) (err error) {
func (db *DB) EnableFileDeletions() (err error) {
var cErr *C.char

C.rocksdb_enable_file_deletions(db.c, boolToChar(force), &cErr)
C.rocksdb_enable_file_deletions(db.c, &cErr)
err = fromCError(cErr)

return
Expand Down
2 changes: 2 additions & 0 deletions db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,8 @@ func newTestDBMultiCF(t *testing.T, columns []string, applyOpts func(opts *Optio
dir := t.TempDir()

opts := NewDefaultOptions()
rateLimiter := NewGenericRateLimiter(1024, 100*1000, 10, RateLimiterModeAllIo, true)
opts.SetRateLimiter(rateLimiter)
opts.SetCreateIfMissingColumnFamilies(true)
opts.SetCreateIfMissing(true)
opts.SetCompression(ZSTDCompression)
Expand Down
26 changes: 26 additions & 0 deletions logger.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package grocksdb

// #include "rocksdb/c.h"
// #include "grocksdb.h"
import "C"
import "unsafe"

// Logger struct.
type Logger struct {
c *C.rocksdb_logger_t
}

func NewStderrLogger(level InfoLogLevel, prefix string) *Logger {
prefix_ := C.CString(prefix)
defer C.free(unsafe.Pointer(prefix_))

return &Logger{
c: C.rocksdb_logger_create_stderr_logger(C.int(level), prefix_),
}
}

// Destroy Logger.
func (l *Logger) Destroy() {
C.rocksdb_logger_destroy(l.c)
l.c = nil
}
41 changes: 27 additions & 14 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,18 @@ func (opts *Options) SetEnv(env *Env) {
env.c = nil
}

// SetInfoLog sets info logger.
func (opts *Options) SetInfoLog(logger *Logger) {
C.rocksdb_options_set_info_log(opts.c, logger.c)
}

// GetInfoLog gets info logger.
func (opts *Options) GetInfoLog() *Logger {
return &Logger{
c: C.rocksdb_options_get_info_log(opts.c),
}
}

// SetInfoLogLevel sets the info log level.
//
// Default: InfoInfoLogLevel
Expand Down Expand Up @@ -1451,20 +1463,21 @@ func (opts *Options) GetDbWriteBufferSize() uint64 {
return uint64(C.rocksdb_options_get_db_write_buffer_size(opts.c))
}

// SetAccessHintOnCompactionStart specifies the file access pattern
// once a compaction is started.
//
// It will be applied to all input files of a compaction.
// Default: NormalCompactionAccessPattern
func (opts *Options) SetAccessHintOnCompactionStart(value CompactionAccessPattern) {
C.rocksdb_options_set_access_hint_on_compaction_start(opts.c, C.int(value))
}

// GetAccessHintOnCompactionStart returns the file access pattern
// once a compaction is started.
func (opts *Options) GetAccessHintOnCompactionStart() CompactionAccessPattern {
return CompactionAccessPattern(C.rocksdb_options_get_access_hint_on_compaction_start(opts.c))
}
// Deprecation in rocksdb v9.x
// // SetAccessHintOnCompactionStart specifies the file access pattern
// // once a compaction is started.
// //
// // It will be applied to all input files of a compaction.
// // Default: NormalCompactionAccessPattern
// func (opts *Options) SetAccessHintOnCompactionStart(value CompactionAccessPattern) {
// C.rocksdb_options_set_access_hint_on_compaction_start(opts.c, C.int(value))
// }
//
// // GetAccessHintOnCompactionStart returns the file access pattern
// // once a compaction is started.
// func (opts *Options) GetAccessHintOnCompactionStart() CompactionAccessPattern {
// return CompactionAccessPattern(C.rocksdb_options_get_access_hint_on_compaction_start(opts.c))
// }

// SetUseAdaptiveMutex enable/disable adaptive mutex, which spins
// in the user space before resorting to kernel.
Expand Down
8 changes: 6 additions & 2 deletions options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,8 @@ func TestOptions(t *testing.T) {
opts.SetAdviseRandomOnOpen(true)
require.EqualValues(t, true, opts.AdviseRandomOnOpen())

opts.SetAccessHintOnCompactionStart(SequentialCompactionAccessPattern)
require.EqualValues(t, SequentialCompactionAccessPattern, opts.GetAccessHintOnCompactionStart())
// opts.SetAccessHintOnCompactionStart(SequentialCompactionAccessPattern)
// require.EqualValues(t, SequentialCompactionAccessPattern, opts.GetAccessHintOnCompactionStart())

opts.SetDbWriteBufferSize(1 << 30)
require.EqualValues(t, 1<<30, opts.GetDbWriteBufferSize())
Expand Down Expand Up @@ -401,6 +401,10 @@ func TestOptions(t *testing.T) {

opts.SetWriteBufferManager(wbm)

lg := NewStderrLogger(InfoInfoLogLevel, "prefix")
opts.SetInfoLog(lg)
require.NotNil(t, opts.GetInfoLog())

// cloning
cl := opts.Clone()
require.EqualValues(t, 5, cl.GetTableCacheNumshardbits())
Expand Down
49 changes: 49 additions & 0 deletions ratelimiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,14 @@ package grocksdb
// #include "rocksdb/c.h"
import "C"

type RateLimiterMode int

const (
RateLimiterModeReadsOnly RateLimiterMode = iota
RateLimiterModeWritesOnly
RateLimiterModeAllIo
)

// RateLimiter is used to control write rate of flush and
// compaction.
type RateLimiter struct {
Expand Down Expand Up @@ -52,6 +60,47 @@ func NewAutoTunedRateLimiter(rateBytesPerSec, refillPeriodMicros int64, fairness
return newNativeRateLimiter(cR)
}

// NewGenericRateLimiter creates a RateLimiter object, which can be shared among RocksDB instances to
// control write rate of flush and compaction.
//
// @rate_bytes_per_sec: this is the only parameter you want to set most of the
// time. It controls the total write rate of compaction and flush in bytes per
// second. Currently, RocksDB does not enforce rate limit for anything other
// than flush and compaction, e.g. write to WAL.
//
// @refill_period_us: this controls how often tokens are refilled. For example,
// when rate_bytes_per_sec is set to 10MB/s and refill_period_us is set to
// 100ms, then 1MB is refilled every 100ms internally. Larger value can lead to
// burstier writes while smaller value introduces more CPU overhead.
// The default should work for most cases.
//
// @fairness: RateLimiter accepts high-pri requests and low-pri requests.
// A low-pri request is usually blocked in favor of hi-pri request. Currently,
// RocksDB assigns low-pri to request from compaction and high-pri to request
// from flush. Low-pri requests can get blocked if flush requests come in
// continuously. This fairness parameter grants low-pri requests permission by
// 1/fairness chance even though high-pri requests exist to avoid starvation.
// You should be good by leaving it at default 10.
//
// @mode: Mode indicates which types of operations count against the limit.
//
// @auto_tuned: Enables dynamic adjustment of rate limit within the range
// `[rate_bytes_per_sec / 20, rate_bytes_per_sec]`, according to
// the recent demand for background I/O.
func NewGenericRateLimiter(
rateBytesPerSec, refillPeriodMicros int64, fairness int32,
mode RateLimiterMode, autoTuned bool,
) *RateLimiter {
cR := C.rocksdb_ratelimiter_create_with_mode(
C.int64_t(rateBytesPerSec),
C.int64_t(refillPeriodMicros),
C.int32_t(fairness),
C.int(mode),
C.bool(autoTuned),
)
return newNativeRateLimiter(cR)
}

// NewNativeRateLimiter creates a native RateLimiter object.
func newNativeRateLimiter(c *C.rocksdb_ratelimiter_t) *RateLimiter {
return &RateLimiter{c: c}
Expand Down
5 changes: 5 additions & 0 deletions snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,11 @@ func newNativeSnapshot(c *C.rocksdb_snapshot_t) *Snapshot {
return &Snapshot{c: c}
}

// GetSequenceNumber gets sequence number of the Snapshot.
func (snapshot *Snapshot) GetSequenceNumber() uint64 {
return uint64(C.rocksdb_snapshot_get_sequence_number(snapshot.c))
}

// Destroy deallocates the Snapshot object.
func (snapshot *Snapshot) Destroy() {
C.rocksdb_free(unsafe.Pointer(snapshot.c))
Expand Down

0 comments on commit 231588e

Please sign in to comment.