Skip to content

Commit

Permalink
cleaned up code
Browse files Browse the repository at this point in the history
  • Loading branch information
NehaSelvan1512 committed Oct 16, 2023
1 parent 002b814 commit 5c3723e
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -445,22 +445,6 @@ private void subscribe( String topic ) {
.send()
.whenComplete( ( subAck, throwable ) -> {
if ( throwable != null ) {
//TODO: change settings correctly: Test this
List<String> topicsList = toList( this.getCurrentSettings().get( "topics" ) );
StringBuilder stringBuilder = new StringBuilder();
for ( String t : topicsList ) {
if ( !t.equals( topic ) ) {
stringBuilder.append( t ).append( "," );
}
}
String topicsString = stringBuilder.toString();
if ( topicsString != null && !topicsString.isBlank() ) {
topicsString = topicsString.substring( 0, topicsString.lastIndexOf( ',' ) );
}
synchronized ( settingsLock ) {
this.settings.put( "topics", topicsString );
}
log.info( "not successful: {}", topic );
throw new RuntimeException( String.format( "Subscription was not successful for topic \"%s\" . Please try again.", topic ), throwable );
} else {
this.topicsMap.put( topic, new AtomicLong( 0 ) );
Expand Down Expand Up @@ -598,12 +582,12 @@ protected static String extractPayload( Mqtt5Publish subMsg ) {

protected String getWildcardTopic( String topic ) {
for ( String t : topicsMap.keySet() ) {
//multilevel wildcard
//check for multilevel wildcard
if ( t.contains( "#" ) && topic.startsWith( t.substring( 0, t.indexOf( "#" ) ) ) ) {
return t;

}
// single level wildcard
// check for single level wildcard
if ( t.contains( "+" ) && topic.startsWith( t.substring( 0, t.indexOf( "+" ) ) ) && topic.endsWith( t.substring( t.indexOf( "+" ) + 1 ) ) ) {
return t;
}
Expand Down Expand Up @@ -677,7 +661,7 @@ private void createEntity( String entityName ) {
DdlManager.getInstance().createCollection(
namespaceID,
collectionName,
true, //only creates collection if it does not already exist.
true,
dataStores.size() == 0 ? null : dataStores,
PlacementType.MANUAL,
statement );
Expand Down Expand Up @@ -712,6 +696,7 @@ private void createAllEntities() {
}
} else {
// handle other namespace types
throw new RuntimeException("Other namespace types are not implemented yet");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import lombok.Getter;
import org.polypheny.db.catalog.Catalog.NamespaceType;

//TODO: javadoc

public class StoringMqttMessage {

private final MqttMessage msg;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
public class StreamCapture {

Transaction transaction;
PolyphenyHomeDirManager homeDirManager;
StoringMqttMessage storingMqttMessage;


Expand All @@ -60,36 +59,8 @@ private void insertMessage() {
if ( this.storingMqttMessage.getNamespaceType() == NamespaceType.DOCUMENT ) {
String sqlCollectionName = this.storingMqttMessage.getNamespaceName() + "." + this.storingMqttMessage.getEntityName();
Statement statement = transaction.createStatement();

// Builder which allows to construct the algebra tree which is equivalent to query and is executed
AlgBuilder builder = AlgBuilder.createDocumentBuilder( statement );

BsonDocument document = new BsonDocument();
document.put( "source", new BsonString( this.storingMqttMessage.getUniqueNameOfInterface() ) );
document.put( "topic", new BsonString( this.storingMqttMessage.getTopic() ) );
String msg = this.storingMqttMessage.getMessage();
BsonValue value;
if ( msg.contains( "{" ) && msg.contains( "}" ) ) {
value = BsonDocument.parse( msg );
} else if ( msg.contains( "[" ) && msg.contains( "]" ) ) {
BsonArray bsonArray = new BsonArray();
msg = msg.replace( "[", "" ).replace( "]", "" );
String[] msglist = msg.split( "," );
for ( String stringValue : msglist ) {
stringValue = stringValue.trim();
bsonArray.add( getBsonValue( stringValue ) );
}
value = bsonArray;
} else {
// msg is a single value
value = getBsonValue( msg );
}
document.put( "payload", value );

AlgNode algNode = builder.docInsert( statement, sqlCollectionName, document ).build();

AlgNode algNode = createDocument( statement, sqlCollectionName );
AlgRoot root = AlgRoot.of( algNode, Kind.INSERT );
// for inserts and all DML queries only a number is returned
List<List<Object>> res = executeAndTransformPolyAlg( root, statement, statement.getPrepareContext() );
try {
transaction.commit();
Expand All @@ -100,11 +71,39 @@ private void insertMessage() {
}


private AlgNode createDocument( Statement statement, String sqlCollectionName) {
AlgBuilder builder = AlgBuilder.createDocumentBuilder( statement );

BsonDocument document = new BsonDocument();
document.put( "source", new BsonString( this.storingMqttMessage.getUniqueNameOfInterface() ) );
document.put( "topic", new BsonString( this.storingMqttMessage.getTopic() ) );
String msg = this.storingMqttMessage.getMessage();
BsonValue value;
if ( msg.contains( "{" ) && msg.contains( "}" ) ) {
value = BsonDocument.parse( msg );
} else if ( msg.contains( "[" ) && msg.contains( "]" ) ) {
BsonArray bsonArray = new BsonArray();
msg = msg.replace( "[", "" ).replace( "]", "" );
String[] msglist = msg.split( "," );
for ( String stringValue : msglist ) {
stringValue = stringValue.trim();
bsonArray.add( getBsonValue( stringValue ) );
}
value = bsonArray;
} else {
// msg is a single value
value = getBsonValue( msg );
}
document.put( "payload", value );

AlgNode algNode = builder.docInsert( statement, sqlCollectionName, document ).build();
return algNode;
}


/**
* turns one single value into the corresponding BsonValue
*
* @param value value that has to be casted as String
* @return
*/
protected BsonValue getBsonValue( String value ) {
if ( isInteger( value ) ) {
Expand Down

0 comments on commit 5c3723e

Please sign in to comment.