diff --git a/universa_core/src/main/java/com/icodici/universa/node/Elections.java b/universa_core/src/main/java/com/icodici/universa/node/Elections.java index f171dfb07..eb49fd885 100644 --- a/universa_core/src/main/java/com/icodici/universa/node/Elections.java +++ b/universa_core/src/main/java/com/icodici/universa/node/Elections.java @@ -58,16 +58,26 @@ public ItemState getState() { private HashSet lockedToRevoke = new HashSet<>(); private HashSet lockedToCreate = new HashSet<>(); private AsyncEvent doneEvent = new AsyncEvent<>(); + private boolean started = false; public Elections(LocalNode localNode, HashId itemId) throws Error { this.itemId = itemId; - initWithNode(localNode); + this.localNode = localNode; } public Elections(LocalNode localNode, Approvable item) throws Error { this.itemId = item.getId(); this.item = item; itemDownloaded.fire(null); + this.localNode = localNode; + } + + public void ensureStarted() throws Error { + synchronized (this) { + if( started ) + return; + started = true; + } initWithNode(localNode); } @@ -462,7 +472,7 @@ public void run() { log.d(localNode.toString() + " stop polling, interrupted, pollers size: " + pollers.size()); pollers.remove(this); } catch (Exception e) { - log.e("failed to check item " + itemId + " from node " + node + ": " + e.getMessage() + ", retrying"); +// log.e("failed to check item " + itemId + " from node " + node + ": " + e.getMessage() + ", retrying"); // e.printStackTrace(); reschedule(); } @@ -475,7 +485,8 @@ public void run() { } private void reschedule() { - future = pool.schedule(this, network.getRequeryPause().toMillis(), TimeUnit.MILLISECONDS); + if( !stop ) + future = pool.schedule(this, network.getRequeryPause().toMillis(), TimeUnit.MILLISECONDS); } } diff --git a/universa_core/src/main/java/com/icodici/universa/node/LocalNode.java b/universa_core/src/main/java/com/icodici/universa/node/LocalNode.java index ab8747fcb..6627fa51d 100644 --- a/universa_core/src/main/java/com/icodici/universa/node/LocalNode.java +++ b/universa_core/src/main/java/com/icodici/universa/node/LocalNode.java @@ -56,7 +56,7 @@ public Network getNetwork() { public ItemResult checkItem(Node caller, HashId itemId, ItemState state, boolean haveCopy) throws IOException { // First, we can have it in the ledger ItemResult itemResult = processCheckItem(caller, itemId, state, haveCopy, null, null); - log.d(""+this+" checkItem( from: " + caller + ":" + itemId + ":" + haveCopy + " << " + itemResult); + log.d("" + this + " checkItem( from: " + caller + ":" + itemId + ":" + haveCopy + " << " + itemResult); return itemResult; } @@ -85,7 +85,7 @@ public Approvable getItem(HashId itemId) throws IOException { @Override public void shutdown() { - allElections.forEach((id, e)-> e.close()); + allElections.forEach((id, e) -> e.close()); // ledger.close(); } @@ -160,57 +160,70 @@ public ItemResult checkItem(HashId itemId) { */ @NonNull private ItemResult processCheckItem(Node caller, @NonNull HashId itemId, ItemState state, boolean haveCopy, Approvable item, Consumer onDone) throws Elections.Error { - synchronized (checkLock) { - // Check the election first, it is faster than checking teh ledger - Elections elections = allElections.get(itemId); - if (elections == null) { - // It is not being elected, it could be in the ledger: - StateRecord record = ledger.getRecord(itemId); - if (record != null) { - // We have state in ledger but already discarded the item itself - ItemResult result = new ItemResult(record, false); - if (onDone != null) { - onDone.accept(result); - } - return result; + // Check the election first, it is faster than checking teh ledger + Elections elections = allElections.get(itemId); + if (elections == null) { + // It is not being elected, it could be in the ledger: + StateRecord record = ledger.getRecord(itemId); + if (record != null) { + // We have state in ledger but already discarded the item itself + ItemResult result = new ItemResult(record, false); + if (onDone != null) { + onDone.accept(result); } - // it is not in the ledger, it is not being elected, creeate new elections. - // If it wil throw an exception, it would be processed by the caller - if (item != null) { - assert (item.getId().equals(itemId)); - elections = new Elections(this, item); - } else - elections = new Elections(this, itemId); - allElections.put(itemId, elections); - // purge finished elections - elections.onDone(itemResult -> { - Elections.pool.schedule(() -> { - allElections.remove(itemId); -// log.i("elections+item purged: "+itemId); - }, network.getMaxElectionsTime().toMillis(), TimeUnit.MILLISECONDS); - }); + return result; } - if (caller != null && haveCopy) - elections.addSourceNode(caller); - if (caller != null && state != null) { - switch (state) { - case PENDING_POSITIVE: - case APPROVED: - elections.registerVote(caller, true); - break; - case PENDING_NEGATIVE: - case REVOKED: - case DECLINED: - elections.registerVote(caller, false); - break; - default: + // it is not in the ledger, it is not being elected, creeate new elections. + // If it wil throw an exception, it would be processed by the caller + + // Race condition preventing - we should only create one elections for one itemId + // but is should stay in allElections long enough so we will find it there anyway - if it even + // was processed entirely while we were crawling to the point: + synchronized (checkLock) { + // we can go there by the time someone else just created elections for us: + elections = allElections.get(itemId); + if (elections == null) { + if (item != null) { + assert (item.getId().equals(itemId)); + elections = new Elections(this, item); + } else + elections = new Elections(this, itemId); + allElections.put(itemId, elections); } } - if (onDone != null) { - elections.onDone(onDone); + // from now we can and e should do everything in parallel again. + // as starting elections includes long initial item checking procedure, + // we do it outside of the checkLock mutex: + elections.ensureStarted(); + + // purge finished elections + elections.onDone(itemResult -> { + Elections.pool.schedule(() -> { + allElections.remove(itemId); +// log.i("elections+item purged: "+itemId); + }, network.getMaxElectionsTime().toMillis(), TimeUnit.MILLISECONDS); + }); + } + if (caller != null && haveCopy) + elections.addSourceNode(caller); + if (caller != null && state != null) { + switch (state) { + case PENDING_POSITIVE: + case APPROVED: + elections.registerVote(caller, true); + break; + case PENDING_NEGATIVE: + case REVOKED: + case DECLINED: + elections.registerVote(caller, false); + break; + default: } - return new ItemResult(elections.getRecord(), elections.getItem() != null); } + if (onDone != null) { + elections.onDone(onDone); + } + return new ItemResult(elections.getRecord(), elections.getItem() != null); } public Ledger getLedger() { @@ -227,8 +240,7 @@ public String toString() { } /** - * Testing only. Imitate situation when the item can't be downloaded prior to consensus - * found. + * Testing only. Imitate situation when the item can't be downloaded prior to consensus found. */ public void emulateLateDownload() { this.lateDownload = true; diff --git a/universa_core/src/main/java/com/icodici/universa/node/StateRecord.java b/universa_core/src/main/java/com/icodici/universa/node/StateRecord.java index 492dd594b..8492cb266 100644 --- a/universa_core/src/main/java/com/icodici/universa/node/StateRecord.java +++ b/universa_core/src/main/java/com/icodici/universa/node/StateRecord.java @@ -39,6 +39,9 @@ public StateRecord(Ledger ledger, ResultSet rs) throws SQLException, IOException } public void initFrom(ResultSet rs) throws SQLException { + // the processing mught be already fininshed by now: + if( rs == null || rs.isClosed() ) + throw new SQLException("resultset or connection is closed"); recordId = rs.getLong("id"); try { id = HashId.withDigest(Do.read(rs.getBinaryStream("hash")));