Internet Of Things - publishing messages and inserting the data into database

www.Vasilev.link DevOps consultant
Jul 1, 2016

As I described in the previous post, The Internet of things with RabbitMQ, Python, MQTT and AMQP,
we created a publishing service. Now we are going to add more information to the messages and insert the data into our database.

For the publishing part, I am using a Python script:

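import os
import time
from time import gmtime, strftime

import paho.mqtt.client as paho
import psutil

# --configuration from the previous post--
# (it defines client, thisserver and mqtttopic)

while True:
    # Disk usage per filesystem as "use%:mount:used:size", one entry per line
    disks = os.popen("df -H | grep -vE '^Filesystem|tmpfs|cdrom' | awk '{ print $5\":\"$6\":\"$3\":\"$2 }'").read()
    # Top 5 processes by CPU usage as "cpu:mem:pid:user:" followed by the command and its last field
    memstat = os.popen("ps -eo pcpu,pmem,pid,user,args --no-headers | sort -t. -nk1,2 -k4,4 -r | head -n 5 | awk '{ print $1\":\"$2\":\"$3\":\"$4\":\"$5$NF }'").read()
    uptime = os.popen("uptime -p").read()
    # Element names match the fields read by the subscriber; the root element name is arbitrary
    data = "<data><server>" + thisserver + "</server><date>" + strftime("%Y-%m-%d %H:%M:%S", gmtime()) + "</date>"
    data = data + "<disks>" + disks.replace('\n', ';') + "</disks>"
    data = data + "<processes>" + memstat.replace('\n', ';') + "</processes>"
    data = data + "<cpu>" + str(psutil.cpu_percent()) + "</cpu>"
    data = data + "<mem>" + str(psutil.virtual_memory().percent) + "</mem>"
    data = data + "<uptime>" + uptime.replace('\n', ';') + "</uptime></data>"
    # qos=1: the broker acknowledges each message (at-least-once delivery)
    (rc, mid) = client.publish(mqtttopic, str(data), qos=1)
    time.sleep(120)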

Here we add a sleep of 120 seconds, which means the loop publishes a message every 2 minutes. You can set any interval you like. In this example I am building an XML message, because XML is easy to read from many different languages.
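One thing to watch with hand-built XML: process command lines can contain characters such as & or <, which would make the message invalid. Below is a minimal sketch of building the same kind of message with Python's standard xml.etree.ElementTree module, which escapes such characters. The function name build_message, the root element name data and the sample values are assumptions made for illustration; the child element names are the ones the subscriber reads.

```python
import xml.etree.ElementTree as ET
from time import gmtime, strftime


def build_message(server, disks, processes, cpu, mem, uptime):
    """Build the XML payload; ElementTree escapes &, < and > in the text."""
    root = ET.Element("data")  # root element name is an assumption
    fields = {
        "server": server,
        "date": strftime("%Y-%m-%d %H:%M:%S", gmtime()),
        "disks": disks,
        "processes": processes,
        "cpu": str(cpu),
        "mem": str(mem),
        "uptime": uptime,
    }
    for name, value in fields.items():
        ET.SubElement(root, name).text = value
    return ET.tostring(root, encoding="unicode")


# Hypothetical sample values, shaped like the output of the commands above
message = build_message(
    "web01",
    "45%:/:9.0G:20G;",
    "1.5:0.3:812:root:sh -c a && b;",
    12.5,
    48.1,
    "up 3 days;",
)
print(message)
```

The returned string can be passed to client.publish() in place of the concatenated data variable.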

For the subscriber part, I am using a PHP script. In my case we have a MySQL database, but of course you can use your own :)
First you have to download the PhpAmqpLib library.
Here is the PHP code:


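<?php
// Minimal autoloader: maps a namespaced class name to a file path,
// e.g. PhpAmqpLib\Connection\AMQPStreamConnection -> PhpAmqpLib/Connection/AMQPStreamConnection.php
// (__autoload() is deprecated since PHP 7.2 and removed in PHP 8)
spl_autoload_register(function ($class) {
    require str_replace("\\", "/", $class) . '.php';
});

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Exception\AMQPTimeoutException;

function receiveQueue($queueName)
{
    $connection = new AMQPStreamConnection('AMQP server', 5672, 'user', 'pass', 'virtual host');
    $channel = $connection->channel();
    // passive=true: only check that the queue exists, do not create it
    $channel->queue_declare($queueName, true, false, false, false);

    // Called once per message: parse the XML body and print each field
    $callback = function ($msg) {
        $xml = simplexml_load_string($msg->body);
        echo "<br><br>server:" . $xml->server;
        echo "<br>date:" . $xml->date;
        echo "<br>disks:" . $xml->disks;
        echo "<br>uptime:" . $xml->uptime;
        echo "<br>mem usage:" . $xml->mem;
        echo "<br>cpu usage:" . $xml->cpu;
        echo "<br>processes:" . $xml->processes;
    };

    // no_ack=true (4th argument): messages are removed from the queue on delivery
    $channel->basic_consume($queueName, '', false, true, false, false, $callback);

    $timeout = 1; // seconds to wait for the next message
    while (count($channel->callbacks)) {
        try {
            $channel->wait(null, false, $timeout);
        } catch (AMQPTimeoutException $e) {
            // No message arrived within the timeout: the queue is empty, so stop
            $channel->close();
            $connection->close();
            exit;
        }
    }
}

receiveQueue('queue name');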
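The callback above only prints the fields; the insert into the database is not shown. As a rough sketch of that step, here is the same idea in Python, using the built-in sqlite3 module as a stand-in for MySQL so that the example runs without a server. The table name server_stats, the column layout, the function name store_message and the sample message are all assumptions, not the schema used in this post.

```python
import sqlite3
import xml.etree.ElementTree as ET

# Fields carried in each message (same names the subscriber reads)
FIELDS = ("server", "date", "disks", "processes", "cpu", "mem", "uptime")


def store_message(conn, body):
    """Parse one XML message body and insert it as a row."""
    xml = ET.fromstring(body)
    values = [xml.findtext(name, default="") for name in FIELDS]
    placeholders = ", ".join("?" for _ in FIELDS)
    # Parameterized query: values are never concatenated into the SQL text
    conn.execute(
        f"INSERT INTO server_stats ({', '.join(FIELDS)}) VALUES ({placeholders})",
        values,
    )
    conn.commit()


# Hypothetical table; in-memory SQLite stands in for the MySQL database
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE server_stats (server TEXT, date TEXT, disks TEXT, "
    "processes TEXT, cpu REAL, mem REAL, uptime TEXT)"
)

# Hypothetical sample message
sample = (
    "<data><server>web01</server><date>2016-07-01 10:00:00</date>"
    "<disks>45%:/:9.0G:20G;</disks>"
    "<processes>1.5:0.3:812:root:python;</processes>"
    "<cpu>12.5</cpu><mem>48.1</mem><uptime>up 3 days;</uptime></data>"
)
store_message(conn, sample)
rows = conn.execute("SELECT server, cpu, mem FROM server_stats").fetchall()
print(rows)
```

In the PHP subscriber the equivalent would be a prepared INSERT statement executed inside the callback, with the values bound as parameters rather than concatenated into the query.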

The publishing script sends a message every 2 minutes; the script that reads the messages runs every 10 minutes.
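The disks field packs several values into one string: each filesystem is written as use%:mount:used:size (the order produced by the awk command in the publisher), and entries are separated by semicolons. If you want one row per filesystem instead of one text column, the field can be split back apart. A small sketch, assuming mount points do not contain a colon; the function name parse_disks and the sample string are made up for illustration.

```python
def parse_disks(field):
    """Split 'use%:mount:used:size;...' into a list of dicts."""
    disks = []
    for entry in field.split(";"):
        if not entry.strip():
            continue  # skip the empty piece after the trailing ';'
        use, mount, used, size = entry.split(":", 3)
        disks.append(
            {
                "mount": mount,
                "use_percent": int(use.rstrip("%")),
                "used": used,
                "size": size,
            }
        )
    return disks


# Hypothetical sample with two filesystems
parsed = parse_disks("45%:/:9.0G:20G;81%:/home:400G:500G;")
for disk in parsed:
    print(disk["mount"], disk["use_percent"])
```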