diff --git a/src/braft/fsm_caller.cpp b/src/braft/fsm_caller.cpp index 89defa50..43a29fd4 100644 --- a/src/braft/fsm_caller.cpp +++ b/src/braft/fsm_caller.cpp @@ -34,6 +34,10 @@ namespace braft { static bvar::CounterRecorder g_commit_tasks_batch_counter( "raft_commit_tasks_batch_counter"); +DEFINE_int32(raft_fsm_caller_commit_batch, 512, + "Max numbers of logs for the state machine to commit in a single batch"); +BRPC_VALIDATE_GFLAG(raft_fsm_caller_commit_batch, brpc::PositiveInteger); + FSMCaller::FSMCaller() : _log_manager(NULL) , _fsm(NULL) @@ -60,8 +64,9 @@ int FSMCaller::run(void* meta, bthread::TaskIterator& iter) { } int64_t max_committed_index = -1; int64_t counter = 0; + size_t batch_size = FLAGS_raft_fsm_caller_commit_batch; for (; iter; ++iter) { - if (iter->type == COMMITTED) { + if (iter->type == COMMITTED && counter < batch_size) { if (iter->committed_index > max_committed_index) { max_committed_index = iter->committed_index; counter++; @@ -73,10 +78,10 @@ int FSMCaller::run(void* meta, bthread::TaskIterator& iter) { max_committed_index = -1; g_commit_tasks_batch_counter << counter; counter = 0; + batch_size = FLAGS_raft_fsm_caller_commit_batch; } switch (iter->type) { case COMMITTED: - CHECK(false) << "Impossible"; break; case SNAPSHOT_SAVE: caller->_cur_task = SNAPSHOT_SAVE;