As I described in the previous post, "The Internet of Things with RabbitMQ, Python, MQTT and AMQP",
we created a publishing service. Now we are going to add more information to the messages and insert the data into our database.
For the publishing part, I am using a Python script:
import os
import time
from time import gmtime, strftime
from xml.sax.saxutils import escape

import paho.mqtt.client as paho
import psutil
# -- configuration from the previous post goes here (it must define client, mqtttopic and thisserver) --
# Shell pipelines used to sample the host. They are defined once, outside the
# loop, so the nested shell/awk quoting only has to be written (and escaped)
# in one place. Each awk program joins its columns with ":".
DISK_USAGE_CMD = (
    "df -H | grep -vE '^Filesystem|tmpfs|cdrom' "
    "| awk '{ print $5\":\"$6\":\"$3\":\"$2 }'"
)
TOP_PROCESSES_CMD = (
    "ps -eo pcpu,pmem,pid,user,args --no-headers "
    "| sort -t. -nk1,2 -k4,4 -r | head -n 5 "
    "| awk '{print $1\":\"$2\":\"$3\":\"$4\":\"$5$NF}'"
)
UPTIME_CMD = "uptime -p"

# Seconds to wait between two published reports.
PUBLISH_INTERVAL_SECONDS = 120


def _shell_field(command):
    """Run *command* in a shell and return its output as one XML-safe line.

    Newlines are replaced with ";" so multi-line output fits in a single
    element, and the XML special characters (&, <, >) are escaped so the
    consumer can still parse the message.
    """
    # The context manager closes the pipe instead of leaving it to the GC.
    with os.popen(command) as pipe:
        output = pipe.read()
    return escape(output.replace("\n", ";"))


# Publish one XML status report per interval, forever.
while True:
    # Element names match the fields the PHP consumer reads from the parsed
    # message (server, date, disks, uptime, mem, cpu, processes). The root
    # element name is arbitrary: the consumer only accesses its children.
    fields = [
        ("server", escape(str(thisserver))),
        ("date", strftime("%Y-%m-%d %H:%M:%S", gmtime())),  # UTC timestamp
        ("disks", _shell_field(DISK_USAGE_CMD)),
        ("processes", _shell_field(TOP_PROCESSES_CMD)),
        ("cpu", str(psutil.cpu_percent())),
        ("mem", str(psutil.virtual_memory().percent)),
        ("uptime", _shell_field(UPTIME_CMD)),
    ]
    body = "".join(
        "<{0}>{1}</{0}>".format(tag, value) for tag, value in fields
    )
    data = "<report>" + body + "</report>"

    # qos=1 is passed through to the MQTT client's publish() call.
    (rc, mid) = client.publish(mqtttopic, data, qos=1)
    time.sleep(PUBLISH_INTERVAL_SECONDS)
// Register a class autoloader. spl_autoload_register() is used instead of the
// magic __autoload() function, which was deprecated in PHP 7.2 and removed in
// PHP 8.0.
spl_autoload_register(function ($class) {
    // Turn the namespace separator into a directory separator, so that e.g.
    // PhpAmqpLib\Connection\AMQPStreamConnection is loaded from
    // PhpAmqpLib/Connection/AMQPStreamConnection.php.
    require str_replace("\\", "/", $class) . '.php';
});
use PhpAmqpLib\Connection\AMQPStreamConnection;
/**
 * Consume the messages waiting in a queue and print their fields as HTML.
 *
 * Every message body is parsed as XML and the child elements server, date,
 * disks, uptime, mem, cpu and processes are echoed. Bodies that are not
 * well-formed XML are logged and skipped. When no message arrives within the
 * timeout, the channel and connection are closed and the script terminates.
 *
 * @param string $queueName Name of the queue to consume from.
 * @return void Note that the timeout path calls exit and never returns.
 */
function recieveQueue($queueName){
    $connection = new AMQPStreamConnection('AMQP server', 5672, 'user', 'pass','virtual host');
    $channel = $connection->channel();
    // Per php-amqplib's parameter order: passive=true, durable=false,
    // exclusive=false, auto_delete=false.
    $channel->queue_declare($queueName, true, false, false, false);

    $callback = function($msg) {
        // Collect libxml errors internally so a malformed body does not emit
        // PHP warnings; the failure is reported explicitly below.
        $previousSetting = libxml_use_internal_errors(true);
        $xml = simplexml_load_string($msg->body);
        libxml_clear_errors();
        libxml_use_internal_errors($previousSetting);

        if ($xml === false) {
            error_log('recieveQueue: skipping message with malformed XML body');
            return;
        }

        // Label => value, in the order the fields are printed.
        $rows = array(
            "<br><br>server:" => $xml->server,
            "<br>date:" => $xml->date,
            "<br>disks:" => $xml->disks,
            "<br>uptime:" => $xml->uptime,
            "<br>mem usage:" => $xml->mem,
            "<br>cpu usage:" => $xml->cpu,
            "<br>processes:" => $xml->processes,
        );
        foreach ($rows as $label => $value) {
            // Values come from the message, so escape them before they are
            // written into the HTML page.
            echo $label . htmlspecialchars((string) $value, ENT_QUOTES, 'UTF-8');
        }
    };

    // Per php-amqplib's parameter order: empty consumer tag, no_local=false,
    // no_ack=true, exclusive=false, nowait=false.
    $channel->basic_consume($queueName, '', false, true, false, false, $callback);

    // Seconds to wait for the next message before treating the queue as empty.
    $timeout = 1;
    while (count($channel->callbacks)) {
        try {
            $channel->wait(null, false, $timeout);
        } catch (\PhpAmqpLib\Exception\AMQPTimeoutException $e) {
            // No message arrived within the timeout: clean up and stop.
            $channel->close();
            $connection->close();
            exit;
        }
    }
}
// Run the consumer. NOTE(review): 'queue name' looks like a placeholder —
// replace it with the name of the queue the publisher's messages are routed to.
recieveQueue('queue name');