Skip to content

Commit

Permalink
fix: add support and automated processing of categorical variables in…
Browse files Browse the repository at this point in the history
… timeseries data
  • Loading branch information
beniz authored and sileht committed Sep 28, 2020
1 parent 69ff0fb commit 1a9af3e
Show file tree
Hide file tree
Showing 6 changed files with 269 additions and 125 deletions.
115 changes: 76 additions & 39 deletions src/backends/caffe/caffeinputconns.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1669,6 +1669,7 @@ namespace dd
{
if (_cifc)
{
_cifc->_columns.clear();
std::string test_file = _cifc->_csv_test_fname;
_cifc->_csv_test_fname = "";
_cifc->read_csv(fname);
Expand Down Expand Up @@ -1697,63 +1698,100 @@ namespace dd
return 0;
}

int DDCCsvTS::read_dir(const std::string &dir, bool is_test_data,
bool update_bounds)
int DDCCsvTS::read_dir(const std::string &dir)
{
// first recursive list csv files
std::unordered_set<std::string> allfiles;
int ret = fileops::list_directory(dir, true, false, true, allfiles);
//- list all CSV files in directory
std::unordered_set<std::string> trainfiles;
int ret = fileops::list_directory(dir, true, false, true, trainfiles);
if (ret != 0)
return ret;
// then simply read them
if (!_cifc)
return -1;

if (update_bounds && _cifc->_scale
&& (_cifc->_min_vals.empty() || _cifc->_max_vals.empty()))
//- pick one file up and read header once
std::string fname = (*trainfiles.begin());
std::ifstream csv_file(fname, std::ios::binary);
if (!csv_file.is_open())
throw InputConnectorBadParamException("cannot open file " + fname);
std::string hline;
std::getline(csv_file, hline);
_cifc->read_header(hline);

//- read all test files
std::unordered_set<std::string> testfiles;
if (!_cifc->_csv_test_fname.empty())
fileops::list_directory(_cifc->_csv_test_fname, true, false, true,
testfiles);

std::unordered_set<std::string> allfiles = trainfiles;

//- aggregate all files = train + test
allfiles.insert(testfiles.begin(), testfiles.end());

//- read categoricals first if any as it affects the number of columns (and
// thus bounds)
if (!_cifc->_categoricals.empty())
{
std::unordered_set<std::string> reallyallfiles;
ret = fileops::list_directory(_cifc->_csv_test_fname, true, false,
true, reallyallfiles);
reallyallfiles.insert(allfiles.begin(), allfiles.end());
std::unordered_map<std::string, CCategorical> categoricals;
for (auto fname : allfiles)
{
csv_file = std::ifstream(fname, std::ios::binary);
if (!csv_file.is_open())
throw InputConnectorBadParamException("cannot open file "
+ fname);
std::string hline;
std::getline(csv_file, hline); // skip header

// read on categoricals
_cifc->fillup_categoricals(csv_file);
_cifc->merge_categoricals(categoricals);
}
}

std::vector<double> min_vals = _cifc->_min_vals;
std::vector<double> max_vals = _cifc->_max_vals;
for (auto fname : reallyallfiles)
//- read bounds across all TS CSV files
if (_cifc->_scale
&& (_cifc->_min_vals.empty() || _cifc->_max_vals.empty()))
{
std::vector<double> min_vals(_cifc->_min_vals);
std::vector<double> max_vals(_cifc->_max_vals);
for (auto fname : allfiles)
{
std::pair<std::vector<double>, std::vector<double>> mm
= _cifc->get_min_max_vals(fname);
if (min_vals.empty())
min_vals = mm.first;
else
for (size_t j = 0; j < mm.first.size(); j++)
min_vals.at(j) = std::min(mm.first.at(j), min_vals.at(j));
if (max_vals.empty())
max_vals = mm.second;
else
for (size_t j = 0; j < mm.first.size(); j++)
max_vals.at(j) = std::max(mm.second.at(j), max_vals.at(j));
csv_file = std::ifstream(fname, std::ios::binary);
if (!csv_file.is_open())
throw InputConnectorBadParamException("cannot open file "
+ fname);
std::string hline;
std::getline(csv_file, hline); // skip header

//- read bounds min/max
_cifc->_min_vals.clear();
_cifc->_max_vals.clear();
_cifc->find_min_max(csv_file);
_cifc->merge_min_max(min_vals, max_vals);
}

//- update global bounds
_cifc->_min_vals = min_vals;
_cifc->_max_vals = max_vals;
_cifc->serialize_bounds();
}

if (!is_test_data && _cifc->_shuffle)
// shuffle training data as needed
std::vector<std::string> trainfiles_v;
for (auto fname : trainfiles)
trainfiles_v.push_back(fname);
if (_cifc->_shuffle)
{
std::vector<std::string> allfiles_v;
for (auto fname : allfiles)
allfiles_v.push_back(fname);
auto rng = std::default_random_engine();
std::shuffle(allfiles_v.begin(), allfiles_v.end(), rng);
for (auto fname : allfiles_v)
read_file(fname, is_test_data);
std::shuffle(trainfiles_v.begin(), trainfiles_v.end(), rng);
}
else

for (auto fname : allfiles)
read_file(fname, is_test_data);

for (auto fname : trainfiles_v)
read_file(fname, false);
for (auto fname : testfiles)
read_file(fname, true);
_cifc->update_columns();
return 0;
}

Expand Down Expand Up @@ -2034,8 +2072,7 @@ namespace dd
DDCCsvTS ddccsvts;
ddccsvts._cifc = this;
ddccsvts._adconf = ad_input;
ddccsvts.read_dir(_csv_fname, false, true);
ddccsvts.read_dir(_csv_test_fname, true, false);
ddccsvts.read_dir(_csv_fname);

_txn->Commit();
_ttxn->Commit();
Expand Down
3 changes: 1 addition & 2 deletions src/backends/caffe/caffeinputconns.h
Original file line number Diff line number Diff line change
Expand Up @@ -894,8 +894,7 @@ namespace dd
int read_file(const std::string &fname, bool is_test_data = false);
int read_db(const std::string &fname);
int read_mem(const std::string &content);
int read_dir(const std::string &dir, bool is_test_data = false,
bool update_bounds = true);
int read_dir(const std::string &dir);

DDCsvTS _ddcsvts;
CSVTSCaffeInputFileConn *_cifc = nullptr;
Expand Down
85 changes: 46 additions & 39 deletions src/csvinputfileconn.cc
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,49 @@ namespace dd
throw InputConnectorBadParamException("cannot find id column " + _id);
}

void CSVInputFileConn::fillup_categoricals(std::ifstream &csv_file)
{
int l = 0;
std::string hline;
while (std::getline(csv_file, hline))
{
hline.erase(std::remove(hline.begin(), hline.end(), '\r'),
hline.end());
std::vector<double> vals;
std::string cid;
std::string col;
auto hit = _columns.begin();
std::unordered_set<int>::const_iterator igit;
std::stringstream sh(hline);
int cu = 0;
while (std::getline(sh, col, _delim[0]))
{
if (cu >= _detect_cols)
{
_logger->error("line {} has more columns than headers / this "
"line: {} / header: {}",
l, cu, _detect_cols);
_logger->error(hline);
throw InputConnectorBadParamException(
"line has more columns than headers");
}
if ((igit = _ignored_columns_pos.find(cu))
!= _ignored_columns_pos.end())
{
++cu;
continue;
}
update_category((*hit), col);
++hit;
++cu;
}
++l;
}
csv_file.clear();
csv_file.seekg(0, std::ios::beg);
std::getline(csv_file, hline); // skip header line
}

void CSVInputFileConn::find_min_max(std::ifstream &csv_file)
{
int nlines = 0;
Expand Down Expand Up @@ -330,42 +373,7 @@ namespace dd
// categorical variables
if (_train && !_categoricals.empty())
{
int l = 0;
while (std::getline(csv_file, hline))
{
hline.erase(std::remove(hline.begin(), hline.end(), '\r'),
hline.end());
std::vector<double> vals;
std::string cid;
std::string col;
auto hit = _columns.begin();
std::unordered_set<int>::const_iterator igit;
std::stringstream sh(hline);
int cu = 0;
while (std::getline(sh, col, _delim[0]))
{
if (cu >= _detect_cols)
{
_logger->error("line {} has more columns than headers", l);
_logger->error(hline);
throw InputConnectorBadParamException(
"line has more columns than headers");
}
if ((igit = _ignored_columns_pos.find(cu))
!= _ignored_columns_pos.end())
{
++cu;
continue;
}
update_category((*hit), col);
++hit;
++cu;
}
++l;
}
csv_file.clear();
csv_file.seekg(0, std::ios::beg);
std::getline(csv_file, hline); // skip header line
fillup_categoricals(csv_file);
}

// scaling to [0,1]
Expand Down Expand Up @@ -397,8 +405,8 @@ namespace dd
// debug
/*std::cout << "csv data line #" << nlines << "= " << vals.size() <<
std::endl;
std::copy(vals.begin(),vals.end(),std::ostream_iterator<double>(std::cout,"
")); std::cout << std::endl;*/
std::copy(vals.begin(),vals.end(),std::ostream_iterator<double>(std::cout,""));
std::cout << std::endl;*/
// debug
}
_logger->info("read {} lines from {}", nlines, fname);
Expand Down Expand Up @@ -454,5 +462,4 @@ namespace dd
if (!_ignored_columns.empty() || !_categoricals.empty())
update_columns();
}

}
2 changes: 2 additions & 0 deletions src/csvinputfileconn.h
Original file line number Diff line number Diff line change
Expand Up @@ -482,6 +482,8 @@ namespace dd

void read_header(std::string &hline);

void fillup_categoricals(std::ifstream &csv_file);

void read_csv_line(const std::string &hline, const std::string &delim,
std::vector<double> &vals, std::string &column_id,
int &nlines);
Expand Down
Loading

0 comments on commit 1a9af3e

Please sign in to comment.