Parallel tokenization and speedup during debugging. (#20)
* Add parallel tokenization in reader.

* Fix bug in indexing dicts of questions and answers.
Karan Desai committed Jul 25, 2019
1 parent a783a2d commit 9c1ee36
Showing 3 changed files with 157 additions and 59 deletions.
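
For orientation, a minimal sketch of how the reader's new arguments fit together after this commit. Names come from the diff below; the JSON path is hypothetical:

    from visdialch.data.readers import DialogsReader

    # Tokenize with 4 worker processes and keep only the first 5 dialogs
    # (the overfit fast path) to speed up debugging runs.
    reader = DialogsReader(
        "data/visdial_1.0_val.json",  # hypothetical path
        num_examples=5,
        num_workers=4,
    )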
2 changes: 2 additions & 0 deletions train.py
@@ -129,6 +129,7 @@
args.train_json,
overfit=args.overfit,
in_memory=args.in_memory,
num_workers=args.cpu_workers,
return_options=True if config["model"]["decoder"] == "disc" else False,
add_boundary_toks=False if config["model"]["decoder"] == "disc" else True,
)
@@ -145,6 +146,7 @@
args.val_dense_json,
overfit=args.overfit,
in_memory=args.in_memory,
num_workers=args.cpu_workers,
return_options=True,
add_boundary_toks=False if config["model"]["decoder"] == "disc" else True,
)
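The hunks above assume an ``args.cpu_workers`` attribute; its definition sits outside this diff. A plausible argparse declaration, assuming the flag is spelled ``--cpu-workers`` (not confirmed by this diff):

    parser.add_argument(
        "--cpu-workers",
        type=int,
        default=4,
        help="Number of CPU workers for tokenization and data loading.",
    )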
7 changes: 6 additions & 1 deletion visdialch/data/dataset.py
@@ -27,14 +27,19 @@ def __init__(
dense_annotations_jsonpath: Optional[str] = None,
overfit: bool = False,
in_memory: bool = False,
num_workers: int = 1,
return_options: bool = True,
add_boundary_toks: bool = False,
):
super().__init__()
self.config = config
self.return_options = return_options
self.add_boundary_toks = add_boundary_toks
self.dialogs_reader = DialogsReader(dialogs_jsonpath)
self.dialogs_reader = DialogsReader(
dialogs_jsonpath,
num_examples=(5 if overfit else None),
num_workers=num_workers
)

if "val" in self.split and dense_annotations_jsonpath is not None:
self.annotations_reader = DenseAnnotationsReader(
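A minimal sketch of the dataset-side call after this change, assuming the class in dataset.py is named ``VisDialDataset`` (the name is outside this hunk) and hypothetical config and path values:

    dataset = VisDialDataset(
        config["dataset"],
        "data/visdial_1.0_train.json",  # hypothetical path
        overfit=True,   # also trims the reader to 5 dialogs via num_examples
        num_workers=4,  # forwarded to DialogsReader for parallel tokenization
    )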
207 changes: 149 additions & 58 deletions visdialch/data/readers.py
@@ -15,7 +15,8 @@

import copy
import json
from typing import Dict, List, Union
import multiprocessing as mp
from typing import Any, Dict, List, Optional, Set, Union

import h5py

@@ -34,97 +35,187 @@ class DialogsReader(object):
----------
dialogs_jsonpath : str
Path to json file containing VisDial v1.0 train, val or test data.
num_examples : int, optional (default = None)
Process only the first ``num_examples`` dialogs from the split. Useful
for speeding up debugging runs.
num_workers : int, optional (default = 1)
Number of worker processes used to tokenize questions, answers and
captions in parallel.
"""

def __init__(self, dialogs_jsonpath: str):
def __init__(
self,
dialogs_jsonpath: str,
num_examples: Optional[int] = None,
num_workers: int = 1,
):
with open(dialogs_jsonpath, "r") as visdial_file:
visdial_data = json.load(visdial_file)
self._split = visdial_data["split"]

self.questions = visdial_data["data"]["questions"]
self.answers = visdial_data["data"]["answers"]
# Maintain questions and answers as dicts instead of lists, because
# dialogs reference them by index. We drop unused entries from these
# in "overfit" mode to save time (tokenization is slow).
self.questions = {
i: question for i, question in
enumerate(visdial_data["data"]["questions"])
}
self.answers = {
i: answer for i, answer in
enumerate(visdial_data["data"]["answers"])
}

# Add empty question, answer at the end, useful for padding dialog
# rounds for test.
self.questions.append("")
self.answers.append("")
# Add an empty question and answer at key -1, used for padding
# dialog rounds in the test split.
self.questions[-1] = ""
self.answers[-1] = ""

# Image_id serves as key for all three dicts here.
self.captions = {}
self.dialogs = {}
self.num_rounds = {}
# ``image_id`` serves as the key for all three dicts here.
self.captions: Dict[int, Any] = {}
self.dialogs: Dict[int, Any] = {}
self.num_rounds: Dict[int, Any] = {}

for dialog_for_image in visdial_data["data"]["dialogs"]:
self.captions[dialog_for_image["image_id"]] = dialog_for_image[
"caption"
]
all_dialogs = visdial_data["data"]["dialogs"]

# Retain only first ``num_examples`` dialogs if specified.
if num_examples is not None:
all_dialogs = all_dialogs[:num_examples]

for _dialog in all_dialogs:

self.captions[_dialog["image_id"]] = _dialog["caption"]

# Record original length of dialog, before padding.
# 10 for train and val splits, 10 or fewer for the test split.
self.num_rounds[dialog_for_image["image_id"]] = len(
dialog_for_image["dialog"]
)
self.num_rounds[_dialog["image_id"]] = len(_dialog["dialog"])

# Pad dialog at the end with empty question and answer pairs
# (for test split).
while len(dialog_for_image["dialog"]) < 10:
dialog_for_image["dialog"].append(
{"question": -1, "answer": -1}
)

# Add empty answer /answer options if not provided
# (for test split).
for i in range(len(dialog_for_image["dialog"])):
if "answer" not in dialog_for_image["dialog"][i]:
dialog_for_image["dialog"][i]["answer"] = -1
if "answer_options" not in dialog_for_image["dialog"][i]:
dialog_for_image["dialog"][i]["answer_options"] = [
-1
] * 100

self.dialogs[dialog_for_image["image_id"]] = dialog_for_image[
"dialog"
]

while len(_dialog["dialog"]) < 10:
_dialog["dialog"].append({"question": -1, "answer": -1})

# Add empty answer (and answer options) if not provided
# (for test split). We use "-1" as a key for empty questions
# and answers.
for i in range(len(_dialog["dialog"])):
if "answer" not in _dialog["dialog"][i]:
_dialog["dialog"][i]["answer"] = -1
if "answer_options" not in _dialog["dialog"][i]:
_dialog["dialog"][i]["answer_options"] = [-1] * 100

self.dialogs[_dialog["image_id"]] = _dialog["dialog"]

# If ``num_examples`` is specified, collect only the questions and
# answers that appear in those examples, and drop the rest to save
# tokenization time. Collecting these is fast because
# ``num_examples`` is generally small during debugging.
if num_examples is not None:
questions_included: Set[int] = set()
answers_included: Set[int] = set()

for _dialog in self.dialogs.values():
for _dialog_round in _dialog:
questions_included.add(_dialog_round["question"])
answers_included.add(_dialog_round["answer"])
for _answer_option in _dialog_round["answer_options"]:
answers_included.add(_answer_option)

self.questions = {
i: self.questions[i] for i in questions_included
}
self.answers = {
i: self.answers[i] for i in answers_included
}

self._multiprocess_tokenize(num_workers)

def _multiprocess_tokenize(self, num_workers: int):
"""
Tokenize captions, questions and answers in parallel processes. This
method uses the multiprocessing module internally.

Since questions, answers and captions are dicts, and multiprocessing
map utilities operate on lists, we convert them to lists first and
convert them back to dicts after tokenization.

Parameters
----------
num_workers : int
Number of workers (processes) to run in parallel.
"""

# While displaying the progress bar through tqdm, specify the total
# number of sequences to tokenize, because tqdm cannot infer the
# length of a ``pool.imap`` iterator.
with mp.Pool(num_workers) as pool:
print(f"[{self._split}] Tokenizing questions...")
for i in tqdm(range(len(self.questions))):
self.questions[i] = word_tokenize(self.questions[i] + "?")
_question_tuples = self.questions.items()
_question_indices = [t[0] for t in _question_tuples]
_questions = list(
tqdm(
pool.imap(word_tokenize, [t[1] for t in _question_tuples]),
total=len(self.questions)
)
)
self.questions = {
i: question + ["?"] for i, question in
zip(_question_indices, _questions)
}
# Delete variables to free memory.
del _question_tuples, _question_indices, _questions

print(f"[{self._split}] Tokenizing answers...")
for i in tqdm(range(len(self.answers))):
self.answers[i] = word_tokenize(self.answers[i])
_answer_tuples = self.answers.items()
_answer_indices = [t[0] for t in _answer_tuples]
_answers = list(
tqdm(
pool.imap(word_tokenize, [t[1] for t in _answer_tuples]),
total=len(self.answers)
)
)
self.answers = {
i: answer for i, answer in
zip(_answer_indices, _answers)
}
# Delete variables to free memory.
del _answer_tuples, _answer_indices, _answers

print(f"[{self._split}] Tokenizing captions...")
for image_id, caption in tqdm(self.captions.items()):
self.captions[image_id] = word_tokenize(caption)
# Convert dict to separate lists of image_ids and captions.
_caption_tuples = self.captions.items()
_image_ids = [t[0] for t in _caption_tuples]
_captions = list(
tqdm(
pool.imap(word_tokenize, [t[1] for t in _caption_tuples]),
total=len(_caption_tuples)
)
)
# Convert tokenized captions back to a dict.
self.captions = {i: c for i, c in zip(_image_ids, _captions)}

def __len__(self):
return len(self.dialogs)

def __getitem__(self, image_id: int) -> Dict[str, Union[int, str, List]]:
caption_for_image = self.captions[image_id]
dialog_for_image = copy.copy(self.dialogs[image_id])
dialog = copy.copy(self.dialogs[image_id])
num_rounds = self.num_rounds[image_id]

# Replace question and answer indices with actual word tokens.
for i in range(len(dialog_for_image)):
dialog_for_image[i]["question"] = self.questions[
dialog_for_image[i]["question"]
for i in range(len(dialog)):
dialog[i]["question"] = self.questions[
dialog[i]["question"]
]
dialog_for_image[i]["answer"] = self.answers[
dialog_for_image[i]["answer"]
dialog[i]["answer"] = self.answers[
dialog[i]["answer"]
]
for j, answer_option in enumerate(
dialog_for_image[i]["answer_options"]
dialog[i]["answer_options"]
):
dialog_for_image[i]["answer_options"][j] = self.answers[
dialog[i]["answer_options"][j] = self.answers[
answer_option
]

return {
"image_id": image_id,
"caption": caption_for_image,
"dialog": dialog_for_image,
"dialog": dialog,
"num_rounds": num_rounds,
}
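
For reference, a sketch of the structure this ``__getitem__`` returns after tokenization. The image_id and all values are illustrative, not taken from a real split:

    item = reader[185565]  # hypothetical image_id on a DialogsReader instance
    # {
    #     "image_id": 185565,
    #     "caption": ["a", "dog", "on", "a", "couch"],
    #     "dialog": [
    #         {
    #             "question": ["is", "it", "brown", "?"],
    #             "answer": ["yes"],
    #             "answer_options": [["yes"], ["no"], ...],  # 100 options
    #         },
    #         ...  # padded to 10 rounds
    #     ],
    #     "num_rounds": 10,
    # }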

@@ -201,16 +292,16 @@ def __init__(self, features_hdfpath: str, in_memory: bool = False):

with h5py.File(self.features_hdfpath, "r") as features_hdf:
self._split = features_hdf.attrs["split"]
self.image_id_list = list(features_hdf["image_id"])
self._image_id_list = list(features_hdf["image_id"])
# "features" is a List[np.ndarray] if the dataset is loaded in memory;
# otherwise it is a list of None placeholders.
self.features = [None] * len(self.image_id_list)
self.features = [None] * len(self._image_id_list)

def __len__(self):
return len(self.image_id_list)
return len(self._image_id_list)

def __getitem__(self, image_id: int):
index = self.image_id_list.index(image_id)
index = self._image_id_list.index(image_id)
if self._in_memory:
# Load features lazily during the first epoch, rather than all at
# once up front, which would make startup slow.
@@ -228,7 +319,7 @@ def __getitem__(self, image_id):
return image_id_features

def keys(self) -> List[int]:
return self.image_id_list
return self._image_id_list

@property
def split(self):
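The dict-to-list round trip in ``_multiprocess_tokenize`` is the heart of this commit. Below is a self-contained sketch of the same pattern, assuming NLTK's ``word_tokenize`` as the tokenizer (it must be a module-level function so worker processes can pickle it):

    import multiprocessing as mp
    from typing import Dict, List

    from nltk.tokenize import word_tokenize
    from tqdm import tqdm


    def tokenize_dict(
        sequences: Dict[int, str], num_workers: int
    ) -> Dict[int, List[str]]:
        # pool.imap consumes a flat iterable, so split the dict into
        # parallel lists of keys and values, then zip the results back.
        keys = list(sequences.keys())
        values = [sequences[k] for k in keys]
        with mp.Pool(num_workers) as pool:
            # tqdm cannot infer the length of an imap iterator, so pass total=.
            tokenized = list(
                tqdm(pool.imap(word_tokenize, values), total=len(values))
            )
        return dict(zip(keys, tokenized))


    if __name__ == "__main__":
        answers = {0: "a brown dog", 7: "yes it is", -1: ""}
        print(tokenize_dict(answers, num_workers=2))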
