Skip to content

Latest commit

 

History

History
77 lines (57 loc) · 2.83 KB

README.md

File metadata and controls

77 lines (57 loc) · 2.83 KB

Sensor Streaming Lakehouse

Instruction

The entire stack runs on containerized Docker services.

  1. Run start.sh
  2. (Optional) Use ngrok tcp 27017 to expose mongoDB to external dashboards

Pipeline

  • Data Generator (generator)

    • Generate fake data for a sensor and publish them on a Kafka topic
  • Kafka Broker (broker)

    • Listen to specific Kafka topic and forward the messages to other services
  • MongoDB-Sink (connect)

    • Listen to specific Kafka topic and store the messages in MongoDB
  • MongoDB (mongo)

    • Store the messages received and the aggregated values from the scheduled task
  • Apache Drill (drill)

    • Query the database via SQL queries
    • Expose REST APIs to execute queries
  • Scheduled Aggregator (sched_aggregator)

    • Runs scheduled aggregation tasks through Drill, store the output in MongoDB
    • Evaluate temporal mean on each minute
    • The task is scheduled to run every 5 minutes
    • Only the last 5 minutes are evaluated

Examples

  • Drill logs shows that the scheduled aggregation query is performed successfully every 5 minutes

drill_1

  • Some dashboards realized via Qlik, which connects to MongoDB to fetch up-to-date values

qlik_1 qlik_2

Snippets

  • Access to MongoDB via MongoShell
mongosh "mongodb://localhost:27017/m_db" --username admin1 --password admin1
  • Select all measurements stored in the db
SELECT * FROM mongo.m_db.measure;
  • Evaluate 1-minute mean for the measurement
SELECT nearestDate(TO_TIMESTAMP(CAST(macro.macro_val.`timestamp` AS int)), 'MINUTE') AS `minute_timestamp`, 
       AVG(CAST(`right`(macro.macro_val.`value`,6) AS float) ) AS `measure_value`
    FROM (SELECT value as macro_val FROM mongo.m_db.measure) macro
    GROUP BY minute_timestamp;
  • Evaluate 1-minute mean for the measurement for the last 5 minutes
SELECT nearestDate(TO_TIMESTAMP(CAST(macro.macro_val.`timestamp` AS int)), 'MINUTE') AS `minute_timestamp`, 
        AVG(CAST(`right`(macro.macro_val.`value`,6) AS float) ) AS `measure_value` 
        FROM (SELECT value as macro_val FROM mongo.m_db.measure) macro 
        WHERE nearestDate(TO_TIMESTAMP(CAST(macro.macro_val.`timestamp` AS int)), 'MINUTE') >= DATE_SUB(CURRENT_TIMESTAMP, interval '"+str(batch_time)+"' minute) 
        GROUP BY minute_timestamp

References