Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix a bug in the optimization rule that adds FilterJoins. #5

Merged
merged 1 commit into from
Jun 30, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cli/LineReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ std::string LineReader::getNextCommand() {
// Skip all the whitespaces before the command.
const std::size_t start_position =
multiline_buffer.find_first_not_of(" \t\r\n");
DCHECK_LT(start_position, special_char_location + 1);
DCHECK_LE(start_position, special_char_location + 1);
return multiline_buffer.substr(start_position,
special_char_location + 1 - start_position);
}
Expand Down
1 change: 1 addition & 0 deletions query_optimizer/rules/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,7 @@ target_link_libraries(quickstep_queryoptimizer_rules_InjectJoinFilters
quickstep_queryoptimizer_physical_PhysicalType
quickstep_queryoptimizer_physical_Selection
quickstep_queryoptimizer_physical_TopLevelPlan
quickstep_queryoptimizer_rules_CollapseSelection
quickstep_queryoptimizer_rules_PruneColumns
quickstep_queryoptimizer_rules_Rule
quickstep_types_TypeID
Expand Down
55 changes: 18 additions & 37 deletions query_optimizer/rules/InjectJoinFilters.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
#include "query_optimizer/physical/PhysicalType.hpp"
#include "query_optimizer/physical/Selection.hpp"
#include "query_optimizer/physical/TopLevelPlan.hpp"
#include "query_optimizer/rules/CollapseSelection.hpp"
#include "query_optimizer/rules/PruneColumns.hpp"
#include "types/TypeID.hpp"
#include "types/TypedValue.hpp"
Expand All @@ -49,28 +50,6 @@ namespace optimizer {
namespace E = ::quickstep::optimizer::expressions;
namespace P = ::quickstep::optimizer::physical;

namespace {

P::PhysicalPtr wrapSelection(const P::PhysicalPtr &input) {
DCHECK(P::SomeTopLevelPlan::Matches(input));
const P::TopLevelPlanPtr &top_level_plan =
std::static_pointer_cast<const P::TopLevelPlan>(input);
const P::PhysicalPtr &plan = top_level_plan->plan();

if (P::SomeFilterJoin::Matches(plan)) {
return input;
}

const P::SelectionPtr selection =
P::Selection::Create(
plan,
E::ToNamedExpressions(top_level_plan->plan()->getOutputAttributes()),
nullptr /* filter_predicate */);
return input->copyWithNewChildren({ selection });
}

} // namespace

P::PhysicalPtr InjectJoinFilters::apply(const P::PhysicalPtr &input) {
DCHECK(input->getPhysicalType() == P::PhysicalType::kTopLevelPlan);

Expand All @@ -88,16 +67,15 @@ P::PhysicalPtr InjectJoinFilters::apply(const P::PhysicalPtr &input) {
return input;
}

// Step 2. If the top level plan is a filter join, wrap it with a Selection
// to stabilize output columns.
output = wrapSelection(output);

// Step 3. Push down FilterJoin nodes to be evaluated early.
// Step 2. Push down FilterJoin nodes to be evaluated early.
output = pushDownFilters(output);

// Step 4. Add Selection nodes for attaching the LIPFilters, if necessary.
// Step 3. Add Selection nodes for attaching the LIPFilters, if necessary.
output = addFilterAnchors(output, false);

// Step 4. Collapse redundant Selection nodes.
output = CollapseSelection().apply(output);

// Step 5. Because of the pushdown of FilterJoin nodes, there are optimization
// opportunities for projecting columns early.
output = PruneColumns().apply(output);
Expand Down Expand Up @@ -201,15 +179,18 @@ P::PhysicalPtr InjectJoinFilters::transformHashJoinToFilters(
build_child = hash_join->right();
build_side_filter_predicate = hash_join->build_predicate();
}
return P::FilterJoin::Create(new_children[0],
build_child,
hash_join->left_join_attributes(),
hash_join->right_join_attributes(),
hash_join->project_expressions(),
build_side_filter_predicate,
is_anti_join,
hash_join->hasRepartition(),
hash_join->cloneOutputPartitionSchemeHeader());
return P::Selection::Create(
P::FilterJoin::Create(new_children[0],
build_child,
hash_join->left_join_attributes(),
hash_join->right_join_attributes(),
hash_join->project_expressions(),
build_side_filter_predicate,
is_anti_join,
hash_join->hasRepartition(),
hash_join->cloneOutputPartitionSchemeHeader()),
E::ToNamedExpressions(input->getOutputAttributes()),
/*filter_predicate=*/nullptr);
}

if (input->children() != new_children) {
Expand Down
2 changes: 1 addition & 1 deletion query_optimizer/rules/InjectJoinFilters.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ class InjectJoinFilters : public Rule<physical::Physical> {
~InjectJoinFilters() override {}

std::string getName() const override {
return "TransformFilterJoins";
return "InjectJoinFilters";
}

physical::PhysicalPtr apply(const physical::PhysicalPtr &input) override;
Expand Down
9 changes: 9 additions & 0 deletions query_optimizer/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ target_link_libraries(quickstep_queryoptimizer_tests_TestDatabaseLoader
quickstep_types_TypedValue
quickstep_types_containers_Tuple
quickstep_utility_Macros
quickstep_utility_ScopedDeleter
tmb)
if (ENABLE_DISTRIBUTED)
target_link_libraries(quickstep_queryoptimizer_tests_TestDatabaseLoader
Expand Down Expand Up @@ -139,6 +140,7 @@ target_link_libraries(quickstep_queryoptimizer_tests_ExecutionGeneratorTest
glog
gtest
quickstep_catalog_CatalogDatabase
quickstep_cli_CommandExecutor
quickstep_cli_DropRelation
quickstep_cli_PrintToScreen
quickstep_parser_ParseStatement
Expand All @@ -148,10 +150,17 @@ target_link_libraries(quickstep_queryoptimizer_tests_ExecutionGeneratorTest
quickstep_queryexecution_QueryExecutionUtil
quickstep_queryexecution_Worker
quickstep_queryexecution_WorkerDirectory
quickstep_queryoptimizer_LogicalGenerator
quickstep_queryoptimizer_Optimizer
quickstep_queryoptimizer_OptimizerContext
quickstep_queryoptimizer_PhysicalGenerator
quickstep_queryoptimizer_QueryHandle
quickstep_queryoptimizer_QueryProcessor
quickstep_queryoptimizer_logical_Logical
quickstep_queryoptimizer_physical_Physical
quickstep_queryoptimizer_tests_TestDatabaseLoader
quickstep_storage_StorageConstants
quickstep_storage_StorageManager
quickstep_threading_ThreadIDBasedMap
quickstep_utility_Macros
quickstep_utility_MemStream
Expand Down
4 changes: 2 additions & 2 deletions query_optimizer/tests/ExecutionGeneratorTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@ int main(int argc, char** argv) {
new quickstep::optimizer::ExecutionGeneratorTestRunner(argv[3]));
test_driver.reset(
new quickstep::TextBasedTestDriver(&input_file, test_runner.get()));
test_driver->registerOption(
quickstep::optimizer::ExecutionGeneratorTestRunner::kResetOption);
test_driver->registerOptions(
quickstep::optimizer::ExecutionGeneratorTestRunner::kTestOptions);

::testing::InitGoogleTest(&argc, argv);
int success = RUN_ALL_TESTS();
Expand Down
180 changes: 140 additions & 40 deletions query_optimizer/tests/ExecutionGeneratorTestRunner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,30 @@
#include "query_optimizer/tests/ExecutionGeneratorTestRunner.hpp"

#include <cstdio>
#include <fstream>
#include <memory>
#include <set>
#include <string>
#include <vector>

#include "catalog/CatalogDatabase.hpp"
#include "cli/CommandExecutor.hpp"
#include "cli/DropRelation.hpp"
#include "cli/PrintToScreen.hpp"
#include "parser/ParseStatement.hpp"
#include "query_execution/ForemanSingleNode.hpp"
#include "query_execution/QueryExecutionUtil.hpp"
#include "query_optimizer/LogicalGenerator.hpp"
#include "query_optimizer/Optimizer.hpp"
#include "query_optimizer/OptimizerContext.hpp"
#include "query_optimizer/PhysicalGenerator.hpp"
#include "query_optimizer/QueryHandle.hpp"
#include "query_optimizer/QueryProcessor.hpp"
#include "query_optimizer/logical/Logical.hpp"
#include "query_optimizer/physical/Physical.hpp"
#include "query_optimizer/tests/TestDatabaseLoader.hpp"
#include "storage/StorageConstants.hpp"
#include "storage/StorageManager.hpp"
#include "utility/MemStream.hpp"
#include "utility/SqlError.hpp"

Expand All @@ -42,9 +54,81 @@ namespace quickstep {
class CatalogRelation;

namespace optimizer {
namespace {

void GenerateAndPrintPhysicalPlan(const ParseStatement &parse_statement,
CatalogDatabase *catalog_database,
std::string *output) {
OptimizerContext optimizer_context;
LogicalGenerator logical_generator(&optimizer_context);
PhysicalGenerator physical_generator(&optimizer_context);

const logical::LogicalPtr logical_plan =
logical_generator.generatePlan(*catalog_database, parse_statement);
const physical::PhysicalPtr physical_plan =
physical_generator.generatePlan(logical_plan, catalog_database);
output->append(physical_plan->toString());
output->append("--\n");
}

const char ExecutionGeneratorTestRunner::kResetOption[] =
"reset_before_execution";
} // namespace

const std::vector<std::string> ExecutionGeneratorTestRunner::kTestOptions = {
"reset_before_execution", "print_physical_plan",
};

ExecutionGeneratorTestRunner::ExecutionGeneratorTestRunner(
const std::string &storage_path)
: storage_manager_(storage_path),
thread_id_map_(ClientIDMap::Instance()) {
// Create the default catalog file.
const std::string catalog_path = storage_path + kCatalogFilename;
std::ofstream catalog_file(catalog_path);
CHECK(catalog_file.good())
<< "ERROR: Unable to open " << catalog_path << " for writing.";

Catalog catalog;
catalog.addDatabase(new CatalogDatabase(nullptr, "default"));
CHECK(catalog.getProto().SerializeToOstream(&catalog_file))
<< "ERROR: Unable to serialize catalog proto to file " << storage_path;
catalog_file.close();

// Create query processor and initialize the test database.
query_processor_ = std::make_unique<QueryProcessor>(std::string(catalog_path));

test_database_loader_ = std::make_unique<TestDatabaseLoader>(
query_processor_->getDefaultDatabase(), &storage_manager_),
test_database_loader_->createTestRelation(false /* allow_vchar */);
test_database_loader_->loadTestRelation();

bus_.Initialize();

main_thread_client_id_ = bus_.Connect();
bus_.RegisterClientAsSender(main_thread_client_id_, kAdmitRequestMessage);
bus_.RegisterClientAsSender(main_thread_client_id_, kPoisonMessage);
bus_.RegisterClientAsReceiver(main_thread_client_id_, kWorkloadCompletionMessage);

worker_.reset(new Worker(0, &bus_));

std::vector<client_id> worker_client_ids;
worker_client_ids.push_back(worker_->getBusClientID());

// We don't use the NUMA aware version of foreman code.
std::vector<int> numa_nodes;
numa_nodes.push_back(-1);

workers_.reset(new WorkerDirectory(1 /* number of workers */,
worker_client_ids, numa_nodes));
foreman_.reset(
new ForemanSingleNode(main_thread_client_id_,
workers_.get(),
&bus_,
query_processor_->getDefaultDatabase(),
&storage_manager_));

foreman_->start();
worker_->start();
}

void ExecutionGeneratorTestRunner::runTestCase(
const std::string &input, const std::set<std::string> &options,
Expand All @@ -53,10 +137,10 @@ void ExecutionGeneratorTestRunner::runTestCase(

VLOG(4) << "Test SQL(s): " << input;

if (options.find(kResetOption) != options.end()) {
test_database_loader_.clear();
test_database_loader_.createTestRelation(false /* allow_vchar */);
test_database_loader_.loadTestRelation();
if (options.find(kTestOptions[0]) != options.end()) {
test_database_loader_->clear();
test_database_loader_->createTestRelation(false /* allow_vchar */);
test_database_loader_->loadTestRelation();
}

MemStream output_stream;
Expand All @@ -71,48 +155,64 @@ void ExecutionGeneratorTestRunner::runTestCase(
ParseResult result = sql_parser_.getNextStatement();
if (result.condition != ParseResult::kSuccess) {
if (result.condition == ParseResult::kError) {
*output = result.error_message;
output->append(result.error_message);
}
break;
} else {
const ParseStatement &parse_statement = *result.parsed_statement;
const CatalogRelation *query_result_relation = nullptr;
}

const ParseStatement &statement = *result.parsed_statement;
if (statement.getStatementType() == ParseStatement::kCommand) {
try {
OptimizerContext optimizer_context;
auto query_handle = std::make_unique<QueryHandle>(0 /* query_id */, main_thread_client_id_);

optimizer_.generateQueryHandle(parse_statement,
test_database_loader_.catalog_database(),
&optimizer_context,
query_handle.get());
query_result_relation = query_handle->getQueryResultRelation();

QueryExecutionUtil::ConstructAndSendAdmitRequestMessage(
main_thread_client_id_,
foreman_->getBusClientID(),
query_handle.release(),
&bus_);
} catch (const SqlError &error) {
*output = error.formatMessage(input);
break;
cli::executeCommand(statement,
*(query_processor_->getDefaultDatabase()),
main_thread_client_id_,
foreman_->getBusClientID(),
&bus_,
&storage_manager_,
query_processor_.get(),
output_stream.file());
} catch (const quickstep::SqlError &sql_error) {
fprintf(stderr, "%s", sql_error.formatMessage(input).c_str());
}
output->append(output_stream.str());
continue;
}

QueryExecutionUtil::ReceiveQueryCompletionMessage(
main_thread_client_id_, &bus_);

if (query_result_relation) {
PrintToScreen::PrintRelation(*query_result_relation,
test_database_loader_.storage_manager(),
output_stream.file());
DropRelation::Drop(*query_result_relation,
test_database_loader_.catalog_database(),
test_database_loader_.storage_manager());
const CatalogRelation *query_result_relation = nullptr;
try {
if (options.find(kTestOptions[1]) != options.end()) {
GenerateAndPrintPhysicalPlan(statement,
query_processor_->getDefaultDatabase(),
output);
}
auto query_handle = std::make_unique<QueryHandle>(
query_processor_->query_id(), main_thread_client_id_);

query_processor_->generateQueryHandle(statement, query_handle.get());
DCHECK(query_handle->getQueryPlanMutable() != nullptr);

query_result_relation = query_handle->getQueryResultRelation();

QueryExecutionUtil::ConstructAndSendAdmitRequestMessage(
main_thread_client_id_, foreman_->getBusClientID(),
query_handle.release(), &bus_);
} catch (const SqlError &error) {
output->append(error.formatMessage(input));
break;
}
}

if (output->empty()) {
*output = output_stream.str();
QueryExecutionUtil::ReceiveQueryCompletionMessage(main_thread_client_id_,
&bus_);

if (query_result_relation) {
PrintToScreen::PrintRelation(*query_result_relation,
&storage_manager_,
output_stream.file());
DropRelation::Drop(*query_result_relation,
test_database_loader_->catalog_database(),
test_database_loader_->storage_manager());
}
output->append(output_stream.str());
}
}

Expand Down
Loading