MQTT Ingress Processor

For the Luebeck setup, a dedicated MQTT Ingress Processor has been implemented using NodeRed. The incoming MQTT data stream is a sequence of JSON objects. Although this mechanism was implemented for the Luebeck setup, it can be used as a common ingress path for the MainsCnt project.

  • There is a publicly accessible broker, broker.mainscnt.eu.
  • Access to this broker is secured using certificates: a client certificate signed by a CA known to the broker is required. You will get the certificates if you want to contribute.
  • The topic of incoming messages must start with MainsCnt.
  • The time in these entries must be synchronized against an NTP server or another authoritative time source (DCF77, GPS receiver …), and it must be given as a UTC time (without time zone).
  • Currently only one measurement value can be given per message.
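
The rules above can be sketched as a small message builder. The field names Time, Freq and Valid are the ones the processor reads from the incoming message; the topic suffix and the timestamp format (ISO 8601 without the trailing Z) are assumptions made for illustration and not a specification of the project.

```javascript
// Hypothetical topic; only the "MainsCnt" prefix is taken from the rules above.
const TOPIC = "MainsCnt/example/site1";

// Format a Date as UTC without a time zone suffix (the exact format is an assumption).
function utcTimestamp(date) {
    return date.toISOString().replace("Z", "");
}

// Build one message carrying exactly one measurement value.
function buildMessage(freq, valid, date) {
    return {
        topic: TOPIC,
        payload: JSON.stringify({
            Time: utcTimestamp(date),
            Freq: freq,
            Valid: valid
        })
    };
}

const example = buildMessage(50.012, true, new Date(Date.UTC(2021, 8, 29, 9, 48, 0)));
console.log(example.topic, example.payload);
```

The resulting topic and payload string could be handed to any MQTT client that connects with the client certificate.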

The actual processor in NodeRed is a simple flow.

The most interesting parts (if these pieces of code can be called interesting at all) are the three function blocks in the flow:

The individual device (contributor or measurement location) is identified by the MQTT topic, which is stored in the database:

FindDevice

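// Keep the original measurement; the payload is replaced by the SQL query below.
msg.incoming = msg.payload
// The full MQTT topic serves as the device id.
msg.queryParameters = {
    "deviceid": msg.topic
}
msg.payload = `
    SELECT location
      FROM device_t
      WHERE active = 't' AND
            deviceid = $deviceid
`
return msg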

ExtractLocation

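// Continue only if exactly one active device matches the topic.
if (msg.payload.length === 1) {
    msg.location = msg.payload[0].location
    return msg
} else {
    // Unknown or inactive device: drop the message.
    return null
}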

  • The full topic of the incoming message is used as the key to look up the location information, which is then used in the entries in the time series database.
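
To try the lookup logic outside NodeRed, the two blocks can be imitated with an in-memory array standing in for the device_t table. This is only a sketch: the rows, topics and location names are invented, and the real flow queries the database instead.

```javascript
// Stand-in for the device_t table (all rows are invented for illustration).
const deviceTable = [
    { deviceid: "MainsCnt/example/site1", location: "Example Site 1", active: true },
    { deviceid: "MainsCnt/example/site2", location: "Example Site 2", active: false }
];

// Mimics the SELECT in FindDevice: active rows whose deviceid equals the full topic.
function findDevice(topic) {
    return deviceTable
        .filter(row => row.active && row.deviceid === topic)
        .map(row => ({ location: row.location }));
}

// Mimics ExtractLocation: exactly one row yields a location, anything else drops the message.
function extractLocation(msg) {
    const rows = findDevice(msg.topic);
    if (rows.length !== 1) {
        return null;
    }
    return { ...msg, location: rows[0].location };
}

console.log(extractLocation({ topic: "MainsCnt/example/site1", payload: {} }));
console.log(extractLocation({ topic: "MainsCnt/unknown", payload: {} }));
```

Messages from unknown or deactivated devices never reach the insert step, so a contributor has to be registered in the device table before data is accepted.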

The valid flag from the incoming message is taken into account. However, if a frequency value is outside a defined range, the flag is overridden and set to false. The value is nevertheless stored in the database, but it is marked as invalid. The name of the measurement location is not taken from the incoming message but from the device entry in the database, retrieved by the two earlier function blocks.

Check and Prepare

const LOWER_BOUND = 48.5
const UPPER_BOUND = 51.5
 
let valid = msg.incoming.Valid
let freq = msg.incoming.Freq
 
if (freq < LOWER_BOUND || freq > UPPER_BOUND) {
    valid = false
}
 
msg.queryParameters = {
    "time": msg.incoming.Time,
    "freq": freq,
    "location": msg.location,
    "valid": valid
}
// Assumption: the table has a "valid" column; without it the flag is never stored.
msg.payload = `
  INSERT INTO mainsfrequency
    (time, freq, location, valid)
    VALUES($time, $freq, $location, $valid)
`
return msg;
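
The validity rule of Check and Prepare can be exercised on its own as a small function. The bounds are the ones from the block above; the function name is made up, and the extra check for non-numeric values is an addition of this sketch that the original block does not perform (there, a missing Freq passes the range comparison and keeps the incoming flag).

```javascript
const LOWER_BOUND = 48.5;
const UPPER_BOUND = 51.5;

// Returns the flag that would be stored for a given frequency and incoming flag.
function effectiveValid(freq, valid) {
    // Addition of this sketch: anything that is not a finite number is invalid.
    if (typeof freq !== "number" || !Number.isFinite(freq)) {
        return false;
    }
    // Out-of-range values override the incoming flag.
    if (freq < LOWER_BOUND || freq > UPPER_BOUND) {
        return false;
    }
    // In-range values keep whatever the sender reported.
    return valid;
}

console.log(effectiveValid(50.0, true));
console.log(effectiveValid(47.9, true));
```

The bounds themselves are inclusive, so a reading of exactly 48.5 or 51.5 keeps the flag sent by the device.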

Both the database and the broker are protected by TLS client authentication.

Contributions of data to the MainsCnt project using this ingress path are very welcome. Contact me if you are interested.

mqttingressproc.txt · Last modified: 2021/09/29 09:48 by admin