Skip to content

Commit

Permalink
found potential root cause and fix for deadlocks and random crashes i…
Browse files Browse the repository at this point in the history
…n benchmarks (ycsb, gavel, tpcc, tpch, etc.),
  • Loading branch information
datomo committed Aug 26, 2024
1 parent ce226fd commit 7e78573
Show file tree
Hide file tree
Showing 9 changed files with 31 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2137,7 +2137,7 @@ public static PolyBoolean isNotFalse( PolyBoolean b ) {
* NULL → NULL, FALSE → TRUE, TRUE → FALSE.
*/
public static PolyBoolean not( PolyBoolean b ) {
return PolyBoolean.of( !b.value );
return b == null ? PolyBoolean.of( null ): PolyBoolean.of( !b.value );
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,10 @@ public List<ImplementationContext> anyPrepareQuery( QueryContext context, Statem
if ( transaction.isAnalyze() ) {
statement.getOverviewDuration().stop( "Translation" );
}

if ( !statement.getTransaction().isActive() ) {
log.warn( "Transaction is not active" );
}
implementation = statement.getQueryProcessor().prepareQuery( root, true );
}
// queries are able to switch the context of the following queries
Expand Down
18 changes: 9 additions & 9 deletions dbms/src/main/java/org/polypheny/db/transaction/LockManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public class LockManager {
public static final LockManager INSTANCE = new LockManager();

private boolean isExclusive = false;
private final Set<Xid> owners = new HashSet<>();
private final Set<Long> owners = new HashSet<>();
private final ConcurrentLinkedQueue<Thread> waiters = new ConcurrentLinkedQueue<>();
private final ReentrantLock lock = new ReentrantLock();
private final Condition condition = lock.newCondition();
Expand All @@ -54,7 +54,7 @@ public void lock( LockMode mode, @NonNull Transaction transaction ) throws Deadl
if ( owners.isEmpty() ) {
handleLockOrThrow( mode, transaction );
return;
} else if ( owners.contains( transaction.getXid() ) && (mode == LockMode.SHARED || isExclusive) ) {
} else if ( owners.contains( transaction.getId() ) && (mode == LockMode.SHARED || isExclusive) ) {
log.debug( "already locked {}", transaction.getXid() );
// already have the required lock
return;
Expand All @@ -69,7 +69,7 @@ public void lock( LockMode mode, @NonNull Transaction transaction ) throws Deadl
lock.lock();
try {
while ( waiters.peek() != thread ) {
log.debug( "wait {} ", transaction.getXid() );
log.debug( "wait {} ", transaction.getId() );
boolean successful = condition.await( RuntimeConfig.LOCKING_MAX_TIMEOUT_SECONDS.getInteger(), TimeUnit.SECONDS );
if ( !successful ) {
cleanup( thread );
Expand Down Expand Up @@ -120,23 +120,23 @@ private void handleLockOrThrow( LockMode mode, @NotNull Transaction transaction
private synchronized boolean handleSimpleLock( @NonNull LockMode mode, Transaction transaction ) {
if ( mode == LockMode.EXCLUSIVE ) {
// get w
if ( owners.isEmpty() || (owners.size() == 1 && owners.contains( transaction.getXid() )) ) {
if ( owners.isEmpty() || (owners.size() == 1 && owners.contains( transaction.getId() )) ) {
if ( isExclusive ) {
log.debug( "lock already exclusive" );
return true;
}

log.debug( "x lock {}", transaction.getXid() );
isExclusive = true;
owners.add( transaction.getXid() );
owners.add( transaction.getId() );
return true;
}

} else {
// get r
if ( !isExclusive || owners.contains( transaction.getXid() ) ) {
if ( !isExclusive || owners.contains( transaction.getId() ) ) {
log.debug( "r lock {}", transaction.getXid() );
owners.add( transaction.getXid() );
owners.add( transaction.getId() );
return true;
}

Expand All @@ -158,7 +158,7 @@ private void signalAll() {


public synchronized void unlock( @NonNull Transaction transaction ) {
if ( !owners.contains( transaction.getXid() ) ) {
if ( !owners.contains( transaction.getId() ) ) {
log.debug( "Transaction is no owner" );
return;
}
Expand All @@ -167,7 +167,7 @@ public synchronized void unlock( @NonNull Transaction transaction ) {
isExclusive = false;
}
log.debug( "release {}", transaction.getXid() );
owners.remove( transaction.getXid() );
owners.remove( transaction.getId() );

// wake up waiters
signalAll();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,9 @@ public void commit() throws TransactionException {
for ( Adapter<?> adapter : involvedAdapters ) {
adapter.commit( xid );
}
if ( involvedAdapters.isEmpty() ) {
log.debug( "No adapter used." );
}

this.statements.forEach( statement -> {
if ( statement.getMonitoringEvent() != null ) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -411,7 +411,7 @@ private Enumerator<PolyValue[]> enumeratorBasedOnStatement() {
int updateCount = statement.getUpdateCount();
return Linq4j.singletonEnumerator( new PolyValue[]{ PolyLong.of( updateCount ) } );
}
} catch ( SQLException e ) {
} catch ( Throwable e ) {
throw Static.RESOURCE.exceptionWhilePerformingQueryOnJdbcSubSchema( sql ).ex( e );
} finally {
closeIfPossible( statement );
Expand Down Expand Up @@ -439,7 +439,7 @@ private Enumerator<PolyValue[]> enumeratorBasedOnPreparedStatement() {
return Linq4j.singletonEnumerator( new PolyValue[]{ PolyLong.of( updateCount ) } );
}
}
} catch ( SQLException e ) {
} catch ( Throwable e ) {
throw Static.RESOURCE.exceptionWhilePerformingQueryOnJdbcSubSchema( sql ).ex( e );
} finally {
closeIfPossible( preparedStatement );
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -466,7 +466,7 @@ public void commit( PolyXid xid ) {
if ( connectionFactory.hasConnectionHandler( xid ) ) {
try {
connectionFactory.getConnectionHandler( xid ).commit();
} catch ( ConnectionHandlerException e ) {
} catch ( Throwable e ) {
throw new GenericRuntimeException( e );
}
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ public class PIClient {
@Getter
private final String clientUUID;
private final LogicalUser catalogUser;
@Getter
@Setter
private Transaction currentTransaction;
private final TransactionManager transactionManager;
@Getter
Expand Down Expand Up @@ -94,7 +96,7 @@ private void commitCurrentTransactionUnsynchronized() throws PIServiceException
}
try {
currentTransaction.commit();
} catch ( TransactionException e ) {
} catch ( Throwable e ) {
throw new PIServiceException( "Committing current transaction failed: " + e.getMessage() );
} finally {
clearCurrentTransaction();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -429,6 +429,12 @@ private Response prepareIndexedStatement( PrepareStatementRequest request, Respo
private Response executeIndexedStatement( ExecuteIndexedStatementRequest request, ResponseMaker<StatementResult> responseObserver ) {
PIClient client = getClient();
PIPreparedIndexedStatement statement = client.getStatementManager().getIndexedPreparedStatement( request.getStatementId() );
if ( statement != null && statement.getTransaction() != null && client.getCurrentTransaction() != null && statement.getTransaction().getId() != client.getCurrentTransaction().getId() ) {
if ( !statement.getTransaction().isActive() ){ // todo @gartens @tobiashafner fix
log.debug( "This definitely should not happen! {}", statement.getTransaction().getId() );
statement.setStatement( client.getCurrentTransaction().createStatement() );
}
}
int fetchSize = request.hasFetchSize()
? request.getFetchSize()
: PropertyUtils.DEFAULT_FETCH_SIZE;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
public class PIPreparedIndexedStatement extends PIPreparedStatement {

private final String query;
@Setter
private Statement statement;
@Setter
private PolyImplementation implementation;
Expand Down Expand Up @@ -173,7 +174,7 @@ public void close() {

@Override
public Transaction getTransaction() {
return statement.getTransaction();
return statement != null ? statement.getTransaction() : null;
}

}

0 comments on commit 7e78573

Please sign in to comment.