Skip to content

Commit

Permalink
Fix a bug in the optimization rule that adds FilterJoins. (#5)
Browse files Browse the repository at this point in the history
Also updated execution query tests to support testing LIP.
  • Loading branch information
jianqiao authored and hbdeshmukh committed Jun 30, 2019
1 parent 9ce6f75 commit e7e0ab9
Show file tree
Hide file tree
Showing 14 changed files with 404 additions and 139 deletions.
2 changes: 1 addition & 1 deletion cli/LineReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ std::string LineReader::getNextCommand() {
// Skip all the whitespaces before the command.
const std::size_t start_position =
multiline_buffer.find_first_not_of(" \t\r\n");
DCHECK_LT(start_position, special_char_location + 1);
DCHECK_LE(start_position, special_char_location + 1);
return multiline_buffer.substr(start_position,
special_char_location + 1 - start_position);
}
Expand Down
1 change: 1 addition & 0 deletions query_optimizer/rules/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,7 @@ target_link_libraries(quickstep_queryoptimizer_rules_InjectJoinFilters
quickstep_queryoptimizer_physical_PhysicalType
quickstep_queryoptimizer_physical_Selection
quickstep_queryoptimizer_physical_TopLevelPlan
quickstep_queryoptimizer_rules_CollapseSelection
quickstep_queryoptimizer_rules_PruneColumns
quickstep_queryoptimizer_rules_Rule
quickstep_types_TypeID
Expand Down
55 changes: 18 additions & 37 deletions query_optimizer/rules/InjectJoinFilters.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
#include "query_optimizer/physical/PhysicalType.hpp"
#include "query_optimizer/physical/Selection.hpp"
#include "query_optimizer/physical/TopLevelPlan.hpp"
#include "query_optimizer/rules/CollapseSelection.hpp"
#include "query_optimizer/rules/PruneColumns.hpp"
#include "types/TypeID.hpp"
#include "types/TypedValue.hpp"
Expand All @@ -49,28 +50,6 @@ namespace optimizer {
namespace E = ::quickstep::optimizer::expressions;
namespace P = ::quickstep::optimizer::physical;

namespace {

P::PhysicalPtr wrapSelection(const P::PhysicalPtr &input) {
DCHECK(P::SomeTopLevelPlan::Matches(input));
const P::TopLevelPlanPtr &top_level_plan =
std::static_pointer_cast<const P::TopLevelPlan>(input);
const P::PhysicalPtr &plan = top_level_plan->plan();

if (P::SomeFilterJoin::Matches(plan)) {
return input;
}

const P::SelectionPtr selection =
P::Selection::Create(
plan,
E::ToNamedExpressions(top_level_plan->plan()->getOutputAttributes()),
nullptr /* filter_predicate */);
return input->copyWithNewChildren({ selection });
}

} // namespace

P::PhysicalPtr InjectJoinFilters::apply(const P::PhysicalPtr &input) {
DCHECK(input->getPhysicalType() == P::PhysicalType::kTopLevelPlan);

Expand All @@ -88,16 +67,15 @@ P::PhysicalPtr InjectJoinFilters::apply(const P::PhysicalPtr &input) {
return input;
}

// Step 2. If the top level plan is a filter join, wrap it with a Selection
// to stabilize output columns.
output = wrapSelection(output);

// Step 3. Push down FilterJoin nodes to be evaluated early.
// Step 2. Push down FilterJoin nodes to be evaluated early.
output = pushDownFilters(output);

// Step 4. Add Selection nodes for attaching the LIPFilters, if necessary.
// Step 3. Add Selection nodes for attaching the LIPFilters, if necessary.
output = addFilterAnchors(output, false);

// Step 4. Collapse redundant Selection nodes.
output = CollapseSelection().apply(output);

// Step 5. Because of the pushdown of FilterJoin nodes, there are optimization
// opportunities for projecting columns early.
output = PruneColumns().apply(output);
Expand Down Expand Up @@ -201,15 +179,18 @@ P::PhysicalPtr InjectJoinFilters::transformHashJoinToFilters(
build_child = hash_join->right();
build_side_filter_predicate = hash_join->build_predicate();
}
return P::FilterJoin::Create(new_children[0],
build_child,
hash_join->left_join_attributes(),
hash_join->right_join_attributes(),
hash_join->project_expressions(),
build_side_filter_predicate,
is_anti_join,
hash_join->hasRepartition(),
hash_join->cloneOutputPartitionSchemeHeader());
return P::Selection::Create(
P::FilterJoin::Create(new_children[0],
build_child,
hash_join->left_join_attributes(),
hash_join->right_join_attributes(),
hash_join->project_expressions(),
build_side_filter_predicate,
is_anti_join,
hash_join->hasRepartition(),
hash_join->cloneOutputPartitionSchemeHeader()),
E::ToNamedExpressions(input->getOutputAttributes()),
/*filter_predicate=*/nullptr);
}

if (input->children() != new_children) {
Expand Down
2 changes: 1 addition & 1 deletion query_optimizer/rules/InjectJoinFilters.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ class InjectJoinFilters : public Rule<physical::Physical> {
~InjectJoinFilters() override {}

std::string getName() const override {
return "TransformFilterJoins";
return "InjectJoinFilters";
}

physical::PhysicalPtr apply(const physical::PhysicalPtr &input) override;
Expand Down
9 changes: 9 additions & 0 deletions query_optimizer/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ target_link_libraries(quickstep_queryoptimizer_tests_TestDatabaseLoader
quickstep_types_TypedValue
quickstep_types_containers_Tuple
quickstep_utility_Macros
quickstep_utility_ScopedDeleter
tmb)
if (ENABLE_DISTRIBUTED)
target_link_libraries(quickstep_queryoptimizer_tests_TestDatabaseLoader
Expand Down Expand Up @@ -139,6 +140,7 @@ target_link_libraries(quickstep_queryoptimizer_tests_ExecutionGeneratorTest
glog
gtest
quickstep_catalog_CatalogDatabase
quickstep_cli_CommandExecutor
quickstep_cli_DropRelation
quickstep_cli_PrintToScreen
quickstep_parser_ParseStatement
Expand All @@ -148,10 +150,17 @@ target_link_libraries(quickstep_queryoptimizer_tests_ExecutionGeneratorTest
quickstep_queryexecution_QueryExecutionUtil
quickstep_queryexecution_Worker
quickstep_queryexecution_WorkerDirectory
quickstep_queryoptimizer_LogicalGenerator
quickstep_queryoptimizer_Optimizer
quickstep_queryoptimizer_OptimizerContext
quickstep_queryoptimizer_PhysicalGenerator
quickstep_queryoptimizer_QueryHandle
quickstep_queryoptimizer_QueryProcessor
quickstep_queryoptimizer_logical_Logical
quickstep_queryoptimizer_physical_Physical
quickstep_queryoptimizer_tests_TestDatabaseLoader
quickstep_storage_StorageConstants
quickstep_storage_StorageManager
quickstep_threading_ThreadIDBasedMap
quickstep_utility_Macros
quickstep_utility_MemStream
Expand Down
4 changes: 2 additions & 2 deletions query_optimizer/tests/ExecutionGeneratorTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@ int main(int argc, char** argv) {
new quickstep::optimizer::ExecutionGeneratorTestRunner(argv[3]));
test_driver.reset(
new quickstep::TextBasedTestDriver(&input_file, test_runner.get()));
test_driver->registerOption(
quickstep::optimizer::ExecutionGeneratorTestRunner::kResetOption);
test_driver->registerOptions(
quickstep::optimizer::ExecutionGeneratorTestRunner::kTestOptions);

::testing::InitGoogleTest(&argc, argv);
int success = RUN_ALL_TESTS();
Expand Down
180 changes: 140 additions & 40 deletions query_optimizer/tests/ExecutionGeneratorTestRunner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,30 @@
#include "query_optimizer/tests/ExecutionGeneratorTestRunner.hpp"

#include <cstdio>
#include <fstream>
#include <memory>
#include <set>
#include <string>
#include <vector>

#include "catalog/CatalogDatabase.hpp"
#include "cli/CommandExecutor.hpp"
#include "cli/DropRelation.hpp"
#include "cli/PrintToScreen.hpp"
#include "parser/ParseStatement.hpp"
#include "query_execution/ForemanSingleNode.hpp"
#include "query_execution/QueryExecutionUtil.hpp"
#include "query_optimizer/LogicalGenerator.hpp"
#include "query_optimizer/Optimizer.hpp"
#include "query_optimizer/OptimizerContext.hpp"
#include "query_optimizer/PhysicalGenerator.hpp"
#include "query_optimizer/QueryHandle.hpp"
#include "query_optimizer/QueryProcessor.hpp"
#include "query_optimizer/logical/Logical.hpp"
#include "query_optimizer/physical/Physical.hpp"
#include "query_optimizer/tests/TestDatabaseLoader.hpp"
#include "storage/StorageConstants.hpp"
#include "storage/StorageManager.hpp"
#include "utility/MemStream.hpp"
#include "utility/SqlError.hpp"

Expand All @@ -42,9 +54,81 @@ namespace quickstep {
class CatalogRelation;

namespace optimizer {
namespace {

void GenerateAndPrintPhysicalPlan(const ParseStatement &parse_statement,
CatalogDatabase *catalog_database,
std::string *output) {
OptimizerContext optimizer_context;
LogicalGenerator logical_generator(&optimizer_context);
PhysicalGenerator physical_generator(&optimizer_context);

const logical::LogicalPtr logical_plan =
logical_generator.generatePlan(*catalog_database, parse_statement);
const physical::PhysicalPtr physical_plan =
physical_generator.generatePlan(logical_plan, catalog_database);
output->append(physical_plan->toString());
output->append("--\n");
}

const char ExecutionGeneratorTestRunner::kResetOption[] =
"reset_before_execution";
} // namespace

const std::vector<std::string> ExecutionGeneratorTestRunner::kTestOptions = {
"reset_before_execution", "print_physical_plan",
};

ExecutionGeneratorTestRunner::ExecutionGeneratorTestRunner(
const std::string &storage_path)
: storage_manager_(storage_path),
thread_id_map_(ClientIDMap::Instance()) {
// Create the default catalog file.
const std::string catalog_path = storage_path + kCatalogFilename;
std::ofstream catalog_file(catalog_path);
CHECK(catalog_file.good())
<< "ERROR: Unable to open " << catalog_path << " for writing.";

Catalog catalog;
catalog.addDatabase(new CatalogDatabase(nullptr, "default"));
CHECK(catalog.getProto().SerializeToOstream(&catalog_file))
<< "ERROR: Unable to serialize catalog proto to file " << storage_path;
catalog_file.close();

// Create query processor and initialize the test database.
query_processor_ = std::make_unique<QueryProcessor>(std::string(catalog_path));

test_database_loader_ = std::make_unique<TestDatabaseLoader>(
query_processor_->getDefaultDatabase(), &storage_manager_),
test_database_loader_->createTestRelation(false /* allow_vchar */);
test_database_loader_->loadTestRelation();

bus_.Initialize();

main_thread_client_id_ = bus_.Connect();
bus_.RegisterClientAsSender(main_thread_client_id_, kAdmitRequestMessage);
bus_.RegisterClientAsSender(main_thread_client_id_, kPoisonMessage);
bus_.RegisterClientAsReceiver(main_thread_client_id_, kWorkloadCompletionMessage);

worker_.reset(new Worker(0, &bus_));

std::vector<client_id> worker_client_ids;
worker_client_ids.push_back(worker_->getBusClientID());

// We don't use the NUMA aware version of foreman code.
std::vector<int> numa_nodes;
numa_nodes.push_back(-1);

workers_.reset(new WorkerDirectory(1 /* number of workers */,
worker_client_ids, numa_nodes));
foreman_.reset(
new ForemanSingleNode(main_thread_client_id_,
workers_.get(),
&bus_,
query_processor_->getDefaultDatabase(),
&storage_manager_));

foreman_->start();
worker_->start();
}

void ExecutionGeneratorTestRunner::runTestCase(
const std::string &input, const std::set<std::string> &options,
Expand All @@ -53,10 +137,10 @@ void ExecutionGeneratorTestRunner::runTestCase(

VLOG(4) << "Test SQL(s): " << input;

if (options.find(kResetOption) != options.end()) {
test_database_loader_.clear();
test_database_loader_.createTestRelation(false /* allow_vchar */);
test_database_loader_.loadTestRelation();
if (options.find(kTestOptions[0]) != options.end()) {
test_database_loader_->clear();
test_database_loader_->createTestRelation(false /* allow_vchar */);
test_database_loader_->loadTestRelation();
}

MemStream output_stream;
Expand All @@ -71,48 +155,64 @@ void ExecutionGeneratorTestRunner::runTestCase(
ParseResult result = sql_parser_.getNextStatement();
if (result.condition != ParseResult::kSuccess) {
if (result.condition == ParseResult::kError) {
*output = result.error_message;
output->append(result.error_message);
}
break;
} else {
const ParseStatement &parse_statement = *result.parsed_statement;
const CatalogRelation *query_result_relation = nullptr;
}

const ParseStatement &statement = *result.parsed_statement;
if (statement.getStatementType() == ParseStatement::kCommand) {
try {
OptimizerContext optimizer_context;
auto query_handle = std::make_unique<QueryHandle>(0 /* query_id */, main_thread_client_id_);

optimizer_.generateQueryHandle(parse_statement,
test_database_loader_.catalog_database(),
&optimizer_context,
query_handle.get());
query_result_relation = query_handle->getQueryResultRelation();

QueryExecutionUtil::ConstructAndSendAdmitRequestMessage(
main_thread_client_id_,
foreman_->getBusClientID(),
query_handle.release(),
&bus_);
} catch (const SqlError &error) {
*output = error.formatMessage(input);
break;
cli::executeCommand(statement,
*(query_processor_->getDefaultDatabase()),
main_thread_client_id_,
foreman_->getBusClientID(),
&bus_,
&storage_manager_,
query_processor_.get(),
output_stream.file());
} catch (const quickstep::SqlError &sql_error) {
fprintf(stderr, "%s", sql_error.formatMessage(input).c_str());
}
output->append(output_stream.str());
continue;
}

QueryExecutionUtil::ReceiveQueryCompletionMessage(
main_thread_client_id_, &bus_);

if (query_result_relation) {
PrintToScreen::PrintRelation(*query_result_relation,
test_database_loader_.storage_manager(),
output_stream.file());
DropRelation::Drop(*query_result_relation,
test_database_loader_.catalog_database(),
test_database_loader_.storage_manager());
const CatalogRelation *query_result_relation = nullptr;
try {
if (options.find(kTestOptions[1]) != options.end()) {
GenerateAndPrintPhysicalPlan(statement,
query_processor_->getDefaultDatabase(),
output);
}
auto query_handle = std::make_unique<QueryHandle>(
query_processor_->query_id(), main_thread_client_id_);

query_processor_->generateQueryHandle(statement, query_handle.get());
DCHECK(query_handle->getQueryPlanMutable() != nullptr);

query_result_relation = query_handle->getQueryResultRelation();

QueryExecutionUtil::ConstructAndSendAdmitRequestMessage(
main_thread_client_id_, foreman_->getBusClientID(),
query_handle.release(), &bus_);
} catch (const SqlError &error) {
output->append(error.formatMessage(input));
break;
}
}

if (output->empty()) {
*output = output_stream.str();
QueryExecutionUtil::ReceiveQueryCompletionMessage(main_thread_client_id_,
&bus_);

if (query_result_relation) {
PrintToScreen::PrintRelation(*query_result_relation,
&storage_manager_,
output_stream.file());
DropRelation::Drop(*query_result_relation,
test_database_loader_->catalog_database(),
test_database_loader_->storage_manager());
}
output->append(output_stream.str());
}
}

Expand Down
Loading

0 comments on commit e7e0ab9

Please sign in to comment.