-
Notifications
You must be signed in to change notification settings - Fork 5
/
SortMergeJoin.java
307 lines (270 loc) · 10.2 KB
/
SortMergeJoin.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
package qp.operators;
import java.util.Date;
import java.util.Vector;
import qp.utils.Attribute;
import qp.utils.Batch;
import qp.utils.Tuple;
/**
 * Sort-merge equi-join operator.
 *
 * <p>Merges the left and right inputs in join-attribute order. The right input is consumed one
 * partition at a time, where a partition is a maximal run of consecutive right tuples sharing the
 * same join value. Every left tuple whose join value equals that partition's value is joined with
 * each tuple in the partition.
 *
 * <p>NOTE(review): this class does not sort its inputs. It assumes both child operators return
 * tuples in ascending order of their join attribute, using the ordering of {@code compareTuples}.
 * If that does not hold, matches are silently missed. Confirm that the planner places a sort below
 * each side.
 */
public class SortMergeJoin extends Join {
    // The number of tuples per output batch.
    private int batchSize;
    // Index of the join attribute in left table.
    private int leftIndex;
    // Index of the join attribute in right table.
    private int rightIndex;
    // Type of the join attribute (one of the Attribute type constants).
    private int attrType;
    // The current page of the left input; null before the first read and after end-of-stream.
    private Batch leftBatch;
    // The current page of the right input; null before the first read and after end-of-stream.
    private Batch rightBatch;
    // The left tuple currently being matched; null only before the first call to next().
    private Tuple leftTuple = null;
    // The right tuple currently being matched (an element of rightPartition); null only before the first call to next().
    private Tuple rightTuple = null;
    // Cursor for left side buffer.
    private int leftCursor = 0;
    // Cursor for right side buffer.
    private int rightCursor = 0;
    // The right partition that is currently being joined in.
    private Vector<Tuple> rightPartition = new Vector<>();
    // The index of the tuple that is currently being processed in the current right partition (0-based).
    private int rightPartitionIndex = 0;
    // The next right tuple (i.e., the first element of the next right partition).
    private Tuple nextRightTuple = null;
    // Whether end of stream is reached for the left table.
    private boolean eosLeft = false;
    // Whether all right partitions have been consumed, so no further matches are possible.
    private boolean eosRight = false;
    // Whether the right child has returned null (end of stream). This is separate from eosRight
    // because the last right partition may still be being joined after the child is exhausted.
    private boolean rightInputExhausted = false;

    /**
     * Instantiates a new join operator using the sort-merge join algorithm.
     *
     * @param jn is the base join operator; its children, condition, operator type, schema, join
     *           type and buffer count are copied.
     */
    public SortMergeJoin(Join jn) {
        super(jn.getLeft(), jn.getRight(), jn.getCondition(), jn.getOpType());
        schema = jn.getSchema();
        joinType = jn.getJoinType();
        numOfBuffer = jn.getNumOfBuffer();
    }

    /**
     * Opens this operator by performing the following operations:
     * 1. Opens the left and right child operators;
     * 2. Resets all merge state, so that reopening starts from scratch;
     * 3. Computes the number of tuples per output page and resolves the join attribute indices.
     *
     * <p>No sorting is performed here. The inputs are assumed to already be ordered on the join
     * attribute.
     *
     * @return false if either child fails to open; otherwise the result of {@code super.open()}.
     */
    @Override
    public boolean open() {
        // Fails fast rather than later reading from a child that could not be opened.
        if (!left.open() || !right.open()) {
            return false;
        }
        resetMergeState();
        // Selects the number of tuples per page based tuple size.
        int tupleSize = schema.getTupleSize();
        batchSize = Batch.getPageSize() / tupleSize;
        // Gets the join attribute from left & right table.
        Attribute leftAttr = con.getLeft();
        Attribute rightAttr = (Attribute) con.getRight();
        leftIndex = left.getSchema().indexOf(leftAttr);
        rightIndex = right.getSchema().indexOf(rightAttr);
        // Gets the type of the join attribute; both sides are compared using the left side's type.
        attrType = left.getSchema().typeOf(leftAttr);
        return super.open();
    }

    /**
     * Clears every cursor, buffer and end-of-stream flag used by the merge.
     */
    private void resetMergeState() {
        leftBatch = null;
        rightBatch = null;
        leftTuple = null;
        rightTuple = null;
        leftCursor = 0;
        rightCursor = 0;
        rightPartition = new Vector<>();
        rightPartitionIndex = 0;
        nextRightTuple = null;
        eosLeft = false;
        eosRight = false;
        rightInputExhausted = false;
    }

    /**
     * Selects tuples satisfying the join condition from input buffers and returns.
     *
     * <p>All merge state lives in fields. When the output page fills up in the middle of a
     * partition, the next call resumes exactly where this one stopped.
     *
     * @return the next page of output tuples (possibly empty when the join finishes on this call),
     *         or null once either input has been exhausted.
     */
    @Override
    public Batch next() {
        // Returns empty if either left or right table reaches end-of-stream.
        if (eosLeft || eosRight) {
            close();
            return null;
        }
        // First call: primes the first left tuple. Afterwards leftTuple is never null unless
        // eosLeft has been set, which is handled above.
        if (leftTuple == null) {
            leftTuple = readNextLeftTuple();
            if (leftTuple == null) {
                eosLeft = true;
                return null;
            }
        }
        // First call: primes the first right partition. This checks rightTuple rather than
        // rightBatch because rightBatch becomes null once the right child is exhausted. A
        // partially joined last partition must be resumed, not treated as a fresh start.
        if (rightTuple == null && !advanceRightPartition()) {
            return null;
        }
        // The output buffer.
        Batch outBatch = new Batch(batchSize);
        while (!outBatch.isFull()) {
            int comparisonResult = compareTuples(leftTuple, rightTuple, leftIndex, rightIndex);
            if (comparisonResult == 0) {
                outBatch.add(leftTuple.joinWith(rightTuple));
                // Left tuple remains unchanged until it has been matched with every tuple in the current right partition.
                if (rightPartitionIndex < rightPartition.size() - 1) {
                    rightPartitionIndex++;
                    rightTuple = rightPartition.elementAt(rightPartitionIndex);
                    continue;
                }
                Tuple nextLeftTuple = readNextLeftTuple();
                if (nextLeftTuple == null) {
                    eosLeft = true;
                    break;
                }
                boolean sameJoinValue = compareTuples(leftTuple, nextLeftTuple, leftIndex, leftIndex) == 0;
                leftTuple = nextLeftTuple;
                if (sameJoinValue) {
                    // Duplicate left value: rewinds to the beginning of the same right partition.
                    rightPartitionIndex = 0;
                    rightTuple = rightPartition.elementAt(0);
                } else if (!advanceRightPartition()) {
                    // The left value moved on and no right partitions remain.
                    break;
                }
            } else if (comparisonResult < 0) {
                // Left value is smaller: it cannot match this or any later partition.
                leftTuple = readNextLeftTuple();
                if (leftTuple == null) {
                    eosLeft = true;
                    break;
                }
            } else if (!advanceRightPartition()) {
                // Right value is smaller: skips the whole partition; stops if none remain.
                break;
            }
        }
        return outBatch;
    }

    /**
     * Replaces the current right partition with the next one and positions rightTuple at its
     * first element. Sets eosRight when the right input has no further partitions.
     *
     * @return true if a new non-empty partition is available; false otherwise.
     */
    private boolean advanceRightPartition() {
        rightPartition = createNextRightPartition();
        if (rightPartition.isEmpty()) {
            eosRight = true;
            return false;
        }
        rightPartitionIndex = 0;
        rightTuple = rightPartition.elementAt(rightPartitionIndex);
        return true;
    }

    /**
     * Creates the next partition from the right input: a maximal run of consecutive right tuples
     * whose join attribute compares equal to that of the first tuple in the run.
     *
     * @return a vector containing all tuples in the next right partition; empty if the right input is exhausted.
     */
    private Vector<Tuple> createNextRightPartition() {
        Vector<Tuple> partition = new Vector<>();
        int comparisonResult = 0;
        if (nextRightTuple == null) {
            nextRightTuple = readNextRightTuple();
            if (nextRightTuple == null) {
                return partition;
            }
        }
        // Continues until the next tuple carries a different value (or the input ends).
        while (comparisonResult == 0) {
            partition.add(nextRightTuple);
            nextRightTuple = readNextRightTuple();
            if (nextRightTuple == null) {
                break;
            }
            comparisonResult = compareTuples(partition.elementAt(0), nextRightTuple, rightIndex, rightIndex);
        }
        return partition;
    }

    /**
     * Reads the next tuple from the left input. Fetches new pages as needed, skipping empty
     * ones. Only a null page from the child marks end-of-stream.
     *
     * @return the next tuple if available; null otherwise (and eosLeft is set).
     */
    private Tuple readNextLeftTuple() {
        while (leftBatch == null || leftCursor >= leftBatch.size()) {
            // Never polls the child again once it has reported end-of-stream.
            if (eosLeft) {
                return null;
            }
            leftBatch = left.next();
            leftCursor = 0;
            if (leftBatch == null) {
                eosLeft = true;
                return null;
            }
        }
        Tuple nextLeftTuple = leftBatch.elementAt(leftCursor);
        leftCursor++;
        return nextLeftTuple;
    }

    /**
     * Reads the next tuple from the right input. Fetches new pages as needed, skipping empty
     * ones. Only a null page from the child marks end-of-stream.
     *
     * @return the next tuple if available; null otherwise (and rightInputExhausted is set).
     */
    private Tuple readNextRightTuple() {
        while (rightBatch == null || rightCursor >= rightBatch.size()) {
            // Never polls the child again once it has reported end-of-stream.
            if (rightInputExhausted) {
                return null;
            }
            rightBatch = right.next();
            rightCursor = 0;
            if (rightBatch == null) {
                rightInputExhausted = true;
                return null;
            }
        }
        Tuple next = rightBatch.elementAt(rightCursor);
        rightCursor++;
        return next;
    }

    /**
     * Compares two tuples based on the join attribute.
     *
     * @param tuple1 is the first tuple.
     * @param tuple2 is the second tuple.
     * @param index1 is the index of the join attribute within tuple1.
     * @param index2 is the index of the join attribute within tuple2.
     * @return an integer indicating the comparison result, compatible with the {@link java.util.Comparator} interface.
     * @throws IllegalStateException if the join attribute type is not supported.
     */
    private int compareTuples(Tuple tuple1, Tuple tuple2, int index1, int index2) {
        Object value1 = tuple1.dataAt(index1);
        Object value2 = tuple2.dataAt(index2);
        switch (attrType) {
            case Attribute.INT:
                return Integer.compare((int) value1, (int) value2);
            case Attribute.STRING:
                return ((String) value1).compareTo((String) value2);
            case Attribute.REAL:
                return Float.compare((float) value1, (float) value2);
            case Attribute.TIME:
                return ((Date) value1).compareTo((Date) value2);
            default:
                // Treating unknown types as "equal" would silently turn the join into a cross product.
                throw new IllegalStateException("Unsupported join attribute type: " + attrType);
        }
    }

    /**
     * Closes this operator by closing both child operators.
     *
     * @return the result of {@code super.close()}.
     */
    @Override
    public boolean close() {
        left.close();
        right.close();
        return super.close();
    }
}