Skip to content

Commit

Permalink
add http service, plan parsing, catalog serialization
Browse files Browse the repository at this point in the history
Signed-off-by: Alex Chi <[email protected]>
  • Loading branch information
skyzh committed Jul 3, 2024
1 parent c5dfacc commit 11379e5
Show file tree
Hide file tree
Showing 7 changed files with 264 additions and 0 deletions.
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ set(BUSTUB_THIRD_PARTY_INCLUDE_DIR
${PROJECT_SOURCE_DIR}/third_party/argparse/include
${PROJECT_SOURCE_DIR}/third_party/cpp_random_distributions
${PROJECT_SOURCE_DIR}/third_party/backward-cpp/include
${PROJECT_SOURCE_DIR}/third_party/json/include
)

include_directories(${BUSTUB_SRC_INCLUDE_DIR} ${BUSTUB_TEST_INCLUDE_DIR} ${BUSTUB_THIRD_PARTY_INCLUDE_DIR})
Expand Down
44 changes: 44 additions & 0 deletions src/common/bustub_instance.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,50 @@ auto BustubInstance::ExecuteSql(const std::string &sql, ResultWriter &writer,
}
}

auto BustubInstance::ExecutePlan(const AbstractPlanNodeRef &plan, ResultWriter &writer) -> bool {
auto txn = txn_manager_->Begin();
try {
auto result = ExecutePlanTxn(plan, txn, writer);
txn_manager_->Commit(txn);
delete txn;
return result;
} catch (bustub::Exception &ex) {
txn_manager_->Abort(txn);
delete txn;
throw ex;
}
}

auto BustubInstance::ExecutePlanTxn(const AbstractPlanNodeRef &plan, Transaction *txn, ResultWriter &writer) -> bool {
// Execute the query.
auto exec_ctx = MakeExecutorContext(txn, false);
std::vector<Tuple> result_set{};
auto is_successful = execution_engine_->Execute(plan, &result_set, txn, exec_ctx.get());

// Return the result set as a vector of string.
auto schema = plan->OutputSchema();

// Generate header for the result set.
writer.BeginTable(false);
writer.BeginHeader();
for (const auto &column : schema.GetColumns()) {
writer.WriteHeaderCell(column.GetName());
}
writer.EndHeader();

// Transforming result set into strings.
for (const auto &tuple : result_set) {
writer.BeginRow();
for (uint32_t i = 0; i < schema.GetColumnCount(); i++) {
writer.WriteCell(tuple.GetValue(&schema, i).ToString());
}
writer.EndRow();
}
writer.EndTable();

return is_successful;
}

auto BustubInstance::ExecuteSqlTxn(const std::string &sql, ResultWriter &writer, Transaction *txn,
std::shared_ptr<CheckOptions> check_options) -> bool {
if (!sql.empty() && sql[0] == '\\') {
Expand Down
4 changes: 4 additions & 0 deletions src/include/common/bustub_instance.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include "common/config.h"
#include "common/util/string_util.h"
#include "execution/check_options.h"
#include "execution/plans/abstract_plan.h"
#include "libfort/lib/fort.hpp"
#include "type/value.h"

Expand Down Expand Up @@ -265,6 +266,9 @@ class BustubInstance {
/** Get the current transaction. */
auto CurrentManagedTxn() -> Transaction *;

auto ExecutePlan(const AbstractPlanNodeRef &plan, ResultWriter &writer) -> bool;
auto ExecutePlanTxn(const AbstractPlanNodeRef &plan, Transaction *txn, ResultWriter &writer) -> bool;

/**
* FOR TEST ONLY. Generate test tables in this BusTub instance.
* It's used in the shell to predefine some tables, as we don't support
Expand Down
4 changes: 4 additions & 0 deletions third_party/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,7 @@ add_subdirectory(utf8proc)
add_subdirectory(backward-cpp)

add_subdirectory(readerwriterqueue)

add_subdirectory(json)

add_subdirectory(cpp-httplib)
1 change: 1 addition & 0 deletions tools/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ add_subdirectory(terrier_bench)
add_subdirectory(bpm_bench)
add_subdirectory(btree_bench)
add_subdirectory(htable_bench)
add_subdirectory(http-server)

add_backward(shell)
add_backward(nc-shell)
Expand Down
5 changes: 5 additions & 0 deletions tools/http-server/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
set(HTTP_SERVER_SOURCES http-server.cpp)
add_executable(http-server ${HTTP_SERVER_SOURCES})

target_link_libraries(http-server bustub)
set_target_properties(http-server PROPERTIES OUTPUT_NAME bustub-http-server)
205 changes: 205 additions & 0 deletions tools/http-server/http-server.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,205 @@
#include <iostream>
#include <memory>
#include <nlohmann/json.hpp>
#include <sstream>
#include <string>
#include <vector>
#include "binder/table_ref/bound_join_ref.h"
#include "catalog/column.h"
#include "catalog/schema.h"
#include "common/bustub_instance.h"
#include "common/exception.h"
#include "cpp-httplib/httplib.h"
#include "execution/expressions/abstract_expression.h"
#include "execution/expressions/column_value_expression.h"
#include "execution/expressions/comparison_expression.h"
#include "execution/plans/abstract_plan.h"
#include "execution/plans/nested_loop_join_plan.h"
#include "execution/plans/projection_plan.h"
#include "execution/plans/seq_scan_plan.h"
#include "type/type.h"
#include "type/type_id.h"

using json = nlohmann::json;

auto TransformType(const std::string &ft) -> bustub::TypeId {
if (ft == "INTEGER") {
return bustub::TypeId::INTEGER;
}
if (ft == "BOOLEAN") {
return bustub::TypeId::BOOLEAN;
}
throw bustub::Exception("unsupported field type");
}

auto TransformExpr(json expr, const std::vector<bustub::AbstractPlanNodeRef> &children)
-> bustub::AbstractExpressionRef {
bustub::TypeId out_type = TransformType(expr["outType"]);
if (expr.contains("op")) {
json op = expr["op"];
std::string name = op["name"];
std::vector<json> operands_json = expr["operands"];
std::vector<bustub::AbstractExpressionRef> operands;
operands.reserve(operands_json.size());
for (const auto &operand_json : operands_json) {
operands.emplace_back(TransformExpr(operand_json, children));
}
if (name == "=") {
return std::make_shared<bustub::ComparisonExpression>(operands[0], operands[1], bustub::ComparisonType::Equal);
}
throw bustub::Exception("unsupported op");
}

// column value expression
std::string name = expr["name"];
size_t input = expr["input"];
size_t i = 0;
for (; i < children.size(); i++) {
size_t cnt = children[i]->OutputSchema().GetColumnCount();
if (input < cnt) {
break;
}
input -= cnt;
}
return std::make_shared<bustub::ColumnValueExpression>(i, input, out_type);
}

auto TransformRootRel(bustub::BustubInstance &bustub, const std::map<std::string, json> &rels, json root_rel)
-> bustub::AbstractPlanNodeRef {
std::string rel_op = root_rel["relOp"];
std::vector<std::string> inputs = root_rel["inputs"];
std::vector<bustub::AbstractPlanNodeRef> input_plan_nodes;
for (const auto &input : inputs) {
auto input_rel = rels.find(input)->second;
auto input_plan_node = TransformRootRel(bustub, rels, input_rel);
input_plan_nodes.emplace_back(std::move(input_plan_node));
}
std::vector<std::string> fields = root_rel["fields"];
std::vector<std::string> field_types = root_rel["fieldTypes"];
std::vector<bustub::Column> columns;
for (size_t i = 0; i < fields.size(); i++) {
auto ft = field_types[i];
columns.emplace_back(fields[i], TransformType(ft));
}
bustub::SchemaRef schema = std::make_shared<bustub::Schema>(columns);
if (rel_op == "org.apache.calcite.interpreter.Bindables$BindableJoin") {
std::string join_type = root_rel["joinType"];
bustub::JoinType jt;
if (join_type == "inner") {
jt = bustub::JoinType::INNER;
} else {
throw bustub::Exception("unsupported join type");
}
auto predicate = TransformExpr(root_rel["condition"], input_plan_nodes);
return std::make_shared<bustub::NestedLoopJoinPlanNode>(std::move(schema), input_plan_nodes[0], input_plan_nodes[1],
predicate, jt);
}
if (rel_op == "org.apache.calcite.interpreter.Bindables$BindableTableScan") {
std::string table_name = root_rel["table"][0];
auto table_info = bustub.catalog_->GetTable(table_name);
return std::make_shared<bustub::SeqScanPlanNode>(std::move(schema), table_info->oid_, table_name);
}
if (rel_op == "org.apache.calcite.interpreter.Bindables$BindableProject") {
std::vector<bustub::AbstractExpressionRef> exprs;
std::vector<json> exprs_json = root_rel["exprs"];
exprs.reserve(exprs_json.size());
for (const auto &expr_json : exprs_json) {
exprs.emplace_back(TransformExpr(expr_json, input_plan_nodes));
}
return std::make_shared<bustub::ProjectionPlanNode>(std::move(schema), exprs, input_plan_nodes[0]);
}
throw bustub::Exception("unsupported rel type");
}

auto BuildPlanNodeFromJson(bustub::BustubInstance &bustub, json plan) -> bustub::AbstractPlanNodeRef {
std::map<std::string, json> rels;
std::vector<json> json_rels = plan["rels"];
for (const auto &rel : json_rels) {
rels[rel["id"]] = rel;
}
json root_rel = *json_rels.rbegin();
return TransformRootRel(bustub, rels, root_rel);
}

// NOLINTNEXTLINE
auto main(int argc, char **argv) -> int {
auto bustub = std::make_unique<bustub::BustubInstance>();

// HTTP
httplib::Server svr;

svr.set_exception_handler([](const auto &req, auto &res, std::exception_ptr ep) {
std::string exception;
try {
std::rethrow_exception(ep);
} catch (bustub::Exception &e) {
exception = e.what();
} catch (std::exception &e) {
exception = e.what();
} catch (...) { // See the following NOTE
exception = "unknown exception";
}
res.set_content(exception, "text/plain");
res.status = 500;
});

svr.Post("/sql", [&bustub](const httplib::Request &req, httplib::Response &res) {
std::stringstream ss;
bustub::SimpleStreamWriter writer(ss, false);
json data = json::parse(req.body);
std::cerr << "SQL request: " << data["sql"] << std::endl;
bustub->ExecuteSql(data["sql"], writer);
res.set_content(ss.str(), "text/plain");
});

svr.Post("/plan", [&bustub](const httplib::Request &req, httplib::Response &res) {
std::stringstream ss;
bustub::SimpleStreamWriter writer(ss, false);
std::cerr << "Plan request:";
json json_plan = json::parse(req.body);
std::cerr << json_plan << std::endl;
auto plan = BuildPlanNodeFromJson(*bustub, json_plan);
std::cerr << "BusTub plan:" << std::endl << plan->ToString(true) << std::endl;
bustub->ExecutePlan(plan, writer);
res.set_content(ss.str(), "text/plain");
});

svr.Get("/catalog", [&bustub](const httplib::Request &req, httplib::Response &res) {
std::cerr << "Catalog request" << std::endl;
auto tables = bustub->catalog_->GetTableNames();
std::vector<json> catalog;
for (const auto &tbl_name : tables) {
auto table = bustub->catalog_->GetTable(tbl_name);
std::vector<json> schema;
for (size_t c = 0; c < table->schema_.GetColumnCount(); c++) {
auto col = table->schema_.GetColumn(c);
switch (col.GetType()) {
case bustub::TypeId::BIGINT: {
schema.emplace_back(std::map<std::string, std::string>{{"name", col.GetName()}, {"type", "bigint"}});
break;
}
case bustub::TypeId::INTEGER: {
schema.emplace_back(std::map<std::string, std::string>{{"name", col.GetName()}, {"type", "integer"}});
break;
}
case bustub::TypeId::VARCHAR: {
schema.emplace_back(std::map<std::string, std::string>{{"name", col.GetName()}, {"type", "varchar"}});
break;
}
default:
throw bustub::Exception("unsupported column type");
}
}
catalog.emplace_back(std::map<std::string, json>{{std::string("name"), json(table->name_)},
{std::string("oid"), json(table->oid_)},
{std::string("schema"), json(schema)}});
}
res.set_content(json(std::map<std::string, json>{{"catalog", catalog}}).dump(), "text/plain");
});

std::cerr << "BusTub HTTP server listening" << std::endl;

svr.listen("127.0.0.1", 23333);

return 0;
}

0 comments on commit 11379e5

Please sign in to comment.