Skip to content

Commit

Permalink
Add disk scheduler to P1 (#602)
Browse files Browse the repository at this point in the history
* Add disk scheduler to P1.

* Fix comment.

* Fixed formatting.

* Fixed more formatting and linting errors.
  • Loading branch information
fernandolis10 committed Sep 3, 2023
1 parent 606ef85 commit 4a04c19
Show file tree
Hide file tree
Showing 6 changed files with 194 additions and 8 deletions.
2 changes: 1 addition & 1 deletion src/buffer/buffer_pool_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ namespace bustub {

BufferPoolManager::BufferPoolManager(size_t pool_size, DiskManager *disk_manager, size_t replacer_k,
LogManager *log_manager)
: pool_size_(pool_size), disk_manager_(disk_manager), log_manager_(log_manager) {
: pool_size_(pool_size), disk_scheduler_(std::make_unique<DiskScheduler>(disk_manager)), log_manager_(log_manager) {
// TODO(students): remove this line after you have implemented the buffer pool manager
throw NotImplementedException(
"BufferPoolManager is not implemented yet. If you have finished implementing BPM, please remove the throw "
Expand Down
12 changes: 6 additions & 6 deletions src/include/buffer/buffer_pool_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
#include "buffer/lru_k_replacer.h"
#include "common/config.h"
#include "recovery/log_manager.h"
#include "storage/disk/disk_manager.h"
#include "storage/disk/disk_scheduler.h"
#include "storage/page/page.h"
#include "storage/page/page_guard.h"

Expand Down Expand Up @@ -92,9 +92,9 @@ class BufferPoolManager {
* but all frames are currently in use and not evictable (in another word, pinned).
*
* First search for page_id in the buffer pool. If not found, pick a replacement frame from either the free list or
* the replacer (always find from the free list first), read the page from disk by calling disk_manager_->ReadPage(),
* and replace the old page in the frame. Similar to NewPage(), if the old page is dirty, you need to write it back
* to disk and update the metadata of the new page
* the replacer (always find from the free list first), read the page from disk by scheduling a read DiskRequest with
* disk_scheduler_->Schedule(), and replace the old page in the frame. Similar to NewPage(), if the old page is dirty,
* you need to write it back to disk and update the metadata of the new page
*
* In addition, remember to disable eviction and record the access history of the frame like you did for NewPage().
*
Expand Down Expand Up @@ -180,8 +180,8 @@ class BufferPoolManager {

/** Array of buffer pool pages. */
Page *pages_;
/** Pointer to the disk manager. */
DiskManager *disk_manager_ __attribute__((__unused__));
/** Pointer to the disk sheduler. */
std::unique_ptr<DiskScheduler> disk_scheduler_ __attribute__((__unused__));
/** Pointer to the log manager. Please ignore this for P1. */
LogManager *log_manager_ __attribute__((__unused__));
/** Page table for keeping track of buffer pool pages. */
Expand Down
59 changes: 59 additions & 0 deletions src/include/common/channel.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
//===----------------------------------------------------------------------===//
//
// BusTub
//
// channel.h
//
// Identification: src/include/common/channel.h
//
// Copyright (c) 2015-2023, Carnegie Mellon University Database Group
//
//===----------------------------------------------------------------------===//

#pragma once

#include <condition_variable> // NOLINT
#include <mutex> // NOLINT
#include <queue>
#include <utility>

namespace bustub {

/**
* Channels allow for safe sharing of data between threads.
*/
template <class T>
class Channel {
public:
Channel() = default;
~Channel() = default;

/**
* @brief Inserts an element into a shared queue.
*
* @param element The element to be inserted.
*/
void Put(T element) {
std::unique_lock<std::mutex> lk(m_);
q_.push(std::move(element));
lk.unlock();
cv_.notify_all();
}

/**
* @brief Gets an element from the shared queue. If the queue is empty, blocks until an element is available.
*/
auto Get() -> T {
std::unique_lock<std::mutex> lk(m_);
cv_.wait(lk, [&]() { return !q_.empty(); });
T element = std::move(q_.front());
q_.pop();
return element;
}

private:
std::mutex m_;
std::condition_variable cv_;
std::queue<T> q_;
};
} // namespace bustub
85 changes: 85 additions & 0 deletions src/include/storage/disk/disk_scheduler.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
//===----------------------------------------------------------------------===//
//
// BusTub
//
// disk_scheduler.h
//
// Identification: src/include/storage/disk/disk_scheduler.h
//
// Copyright (c) 2015-2023, Carnegie Mellon University Database Group
//
//===----------------------------------------------------------------------===//

#pragma once

#include <future> // NOLINT
#include <optional>
#include <thread> // NOLINT

#include "common/channel.h"
#include "storage/disk/disk_manager.h"

namespace bustub {

/**
* @brief Represents a Write or Read request for the DiskManager to execute.
*/
struct DiskRequest {
/** Flag indicating whether the request is a write or a read. */
bool is_write_;

/**
* Pointer to the start of the memory location where a page is either:
* 1. being read into from disk (on a read).
* 2. being written out to disk (on a write).
*/
char *data_;

/** ID of the page being read from / written to disk. */
page_id_t page_id_;

/** Callback used to signal to the request issuer when the request has been completed. */
std::promise<bool> &callback_;
};

/**
* @brief The DiskScheduler schedules disk read and write operations.
*
* A request is scheduled by calling DiskScheduler::Schedule() with an appropriate DiskRequest object. The scheduler
* maintains a background worker thread that processes the scheduled requests using the disk manager. The background
* thread is created in the DiskScheduler constructor and joined in its destructor.
*/
class DiskScheduler {
public:
explicit DiskScheduler(DiskManager *disk_manager);
~DiskScheduler();

/**
* TODO(P1): Add implementation
*
* @brief Schedules a request for the DiskManager to execute.
*
* @param r The request to be scheduled.
*/
void Schedule(DiskRequest r);

/**
* TODO(P1): Add implementation
*
* @brief Background worker thread function that processes scheduled requests.
*
* The background thread needs to process requests while the DiskScheduler exists, i.e., this function should not
* return until ~DiskScheduler() is called. At that point you need to make sure that the function does return.
*/
void StartWorkerThread();

private:
/** Pointer to the disk manager. */
DiskManager *disk_manager_ __attribute__((__unused__));
/** A shared queue to concurrently schedule and process requests. When the DiskScheduler's destructor is called,
* `std::nullopt` is put into the queue to signal to the background thread to stop execution. */
Channel<std::optional<DiskRequest>> request_queue_;
/** The background thread responsible for issuing scheduled requests to the disk manager. */
std::optional<std::thread> background_thread_;
};
} // namespace bustub
3 changes: 2 additions & 1 deletion src/storage/disk/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ add_library(
bustub_storage_disk
OBJECT
disk_manager.cpp
disk_manager_memory.cpp)
disk_manager_memory.cpp
disk_scheduler.cpp)

set(ALL_OBJECT_FILES
${ALL_OBJECT_FILES} $<TARGET_OBJECTS:bustub_storage_disk>
Expand Down
41 changes: 41 additions & 0 deletions src/storage/disk/disk_scheduler.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
//===----------------------------------------------------------------------===//
//
// BusTub
//
// disk_scheduler.cpp
//
// Identification: src/storage/disk/disk_scheduler.cpp
//
// Copyright (c) 2015-2023, Carnegie Mellon University Database Group
//
//===----------------------------------------------------------------------===//

#include "storage/disk/disk_scheduler.h"
#include "common/exception.h"
#include "storage/disk/disk_manager.h"

namespace bustub {

DiskScheduler::DiskScheduler(DiskManager *disk_manager) : disk_manager_(disk_manager) {
// TODO(P1): remove this line after you have implemented the disk scheduler API
throw NotImplementedException(
"DiskScheduler is not implemented yet. If you have finished implementing the disk scheduler, please remove the "
"throw exception line in `disk_scheduler.cpp`.");

// Spawn the background thread
background_thread_.emplace([&] { StartWorkerThread(); });
}

DiskScheduler::~DiskScheduler() {
// Put a `std::nullopt` in the queue to signal to exit the loop
request_queue_.Put(std::nullopt);
if (background_thread_.has_value()) {
background_thread_->join();
}
}

void DiskScheduler::Schedule(DiskRequest r) {}

void DiskScheduler::StartWorkerThread() {}

} // namespace bustub

0 comments on commit 4a04c19

Please sign in to comment.