Skip to content

Commit

Permalink
better parallelism: node will make use of many cores now when chack
Browse files Browse the repository at this point in the history
is called from different threads
  • Loading branch information
sergeych committed Aug 24, 2017
1 parent 97a6c1a commit ca375a6
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -58,16 +58,26 @@ public ItemState getState() {
private HashSet<StateRecord> lockedToRevoke = new HashSet<>();
private HashSet<StateRecord> lockedToCreate = new HashSet<>();
private AsyncEvent<ItemResult> doneEvent = new AsyncEvent<>();
private boolean started = false;

public Elections(LocalNode localNode, HashId itemId) throws Error {
this.itemId = itemId;
initWithNode(localNode);
this.localNode = localNode;
}

public Elections(LocalNode localNode, Approvable item) throws Error {
this.itemId = item.getId();
this.item = item;
itemDownloaded.fire(null);
this.localNode = localNode;
}

public void ensureStarted() throws Error {
synchronized (this) {
if( started )
return;
started = true;
}
initWithNode(localNode);
}

Expand Down Expand Up @@ -462,7 +472,7 @@ public void run() {
log.d(localNode.toString() + " stop polling, interrupted, pollers size: " + pollers.size());
pollers.remove(this);
} catch (Exception e) {
log.e("failed to check item " + itemId + " from node " + node + ": " + e.getMessage() + ", retrying");
// log.e("failed to check item " + itemId + " from node " + node + ": " + e.getMessage() + ", retrying");
// e.printStackTrace();
reschedule();
}
Expand All @@ -475,7 +485,8 @@ public void run() {
}

private void reschedule() {
future = pool.schedule(this, network.getRequeryPause().toMillis(), TimeUnit.MILLISECONDS);
if( !stop )
future = pool.schedule(this, network.getRequeryPause().toMillis(), TimeUnit.MILLISECONDS);

}
}
Expand Down
110 changes: 61 additions & 49 deletions universa_core/src/main/java/com/icodici/universa/node/LocalNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public Network getNetwork() {
public ItemResult checkItem(Node caller, HashId itemId, ItemState state, boolean haveCopy) throws IOException {
// First, we can have it in the ledger
ItemResult itemResult = processCheckItem(caller, itemId, state, haveCopy, null, null);
log.d(""+this+" checkItem( from: " + caller + ":" + itemId + ":" + haveCopy + " << " + itemResult);
log.d("" + this + " checkItem( from: " + caller + ":" + itemId + ":" + haveCopy + " << " + itemResult);
return itemResult;
}

Expand Down Expand Up @@ -85,7 +85,7 @@ public Approvable getItem(HashId itemId) throws IOException {

@Override
public void shutdown() {
allElections.forEach((id, e)-> e.close());
allElections.forEach((id, e) -> e.close());
// ledger.close();
}

Expand Down Expand Up @@ -160,57 +160,70 @@ public ItemResult checkItem(HashId itemId) {
*/
@NonNull
private ItemResult processCheckItem(Node caller, @NonNull HashId itemId, ItemState state, boolean haveCopy, Approvable item, Consumer<ItemResult> onDone) throws Elections.Error {
synchronized (checkLock) {
// Check the election first, it is faster than checking teh ledger
Elections elections = allElections.get(itemId);
if (elections == null) {
// It is not being elected, it could be in the ledger:
StateRecord record = ledger.getRecord(itemId);
if (record != null) {
// We have state in ledger but already discarded the item itself
ItemResult result = new ItemResult(record, false);
if (onDone != null) {
onDone.accept(result);
}
return result;
// Check the election first, it is faster than checking teh ledger
Elections elections = allElections.get(itemId);
if (elections == null) {
// It is not being elected, it could be in the ledger:
StateRecord record = ledger.getRecord(itemId);
if (record != null) {
// We have state in ledger but already discarded the item itself
ItemResult result = new ItemResult(record, false);
if (onDone != null) {
onDone.accept(result);
}
// it is not in the ledger, it is not being elected, creeate new elections.
// If it wil throw an exception, it would be processed by the caller
if (item != null) {
assert (item.getId().equals(itemId));
elections = new Elections(this, item);
} else
elections = new Elections(this, itemId);
allElections.put(itemId, elections);
// purge finished elections
elections.onDone(itemResult -> {
Elections.pool.schedule(() -> {
allElections.remove(itemId);
// log.i("elections+item purged: "+itemId);
}, network.getMaxElectionsTime().toMillis(), TimeUnit.MILLISECONDS);
});
return result;
}
if (caller != null && haveCopy)
elections.addSourceNode(caller);
if (caller != null && state != null) {
switch (state) {
case PENDING_POSITIVE:
case APPROVED:
elections.registerVote(caller, true);
break;
case PENDING_NEGATIVE:
case REVOKED:
case DECLINED:
elections.registerVote(caller, false);
break;
default:
// it is not in the ledger, it is not being elected, creeate new elections.
// If it wil throw an exception, it would be processed by the caller

// Race condition preventing - we should only create one elections for one itemId
// but is should stay in allElections long enough so we will find it there anyway - if it even
// was processed entirely while we were crawling to the point:
synchronized (checkLock) {
// we can go there by the time someone else just created elections for us:
elections = allElections.get(itemId);
if (elections == null) {
if (item != null) {
assert (item.getId().equals(itemId));
elections = new Elections(this, item);
} else
elections = new Elections(this, itemId);
allElections.put(itemId, elections);
}
}
if (onDone != null) {
elections.onDone(onDone);
// from now we can and e should do everything in parallel again.
// as starting elections includes long initial item checking procedure,
// we do it outside of the checkLock mutex:
elections.ensureStarted();

// purge finished elections
elections.onDone(itemResult -> {
Elections.pool.schedule(() -> {
allElections.remove(itemId);
// log.i("elections+item purged: "+itemId);
}, network.getMaxElectionsTime().toMillis(), TimeUnit.MILLISECONDS);
});
}
if (caller != null && haveCopy)
elections.addSourceNode(caller);
if (caller != null && state != null) {
switch (state) {
case PENDING_POSITIVE:
case APPROVED:
elections.registerVote(caller, true);
break;
case PENDING_NEGATIVE:
case REVOKED:
case DECLINED:
elections.registerVote(caller, false);
break;
default:
}
return new ItemResult(elections.getRecord(), elections.getItem() != null);
}
if (onDone != null) {
elections.onDone(onDone);
}
return new ItemResult(elections.getRecord(), elections.getItem() != null);
}

public Ledger getLedger() {
Expand All @@ -227,8 +240,7 @@ public String toString() {
}

/**
* Testing only. Imitate situation when the item can't be downloaded prior to consensus
* found.
* Testing only. Imitate situation when the item can't be downloaded prior to consensus found.
*/
public void emulateLateDownload() {
this.lateDownload = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ public StateRecord(Ledger ledger, ResultSet rs) throws SQLException, IOException
}

public void initFrom(ResultSet rs) throws SQLException {
// the processing mught be already fininshed by now:
if( rs == null || rs.isClosed() )
throw new SQLException("resultset or connection is closed");
recordId = rs.getLong("id");
try {
id = HashId.withDigest(Do.read(rs.getBinaryStream("hash")));
Expand Down

0 comments on commit ca375a6

Please sign in to comment.