Solarregler mit MQTT

Vorgeschichte

Dieser Artikel ist die Fortsetzung zu dem Artikel Solarregler Datalogger. Dort wird beschrieben, wie man HW-mäßig den DeltaSol BS Plus Regler ausliest und die Daten in einer Round Robin Datenbank (RRDTools) speichert.

Das Ziel

Um die Daten des Solarreglers auch anderen Systemen zur Verfügung zu stellen (z.B. einem Hausbussystem) sollen die Daten auch über einen MQTT-Broker zur Verfügung gestellt werden. Außerdem bietet es sich dann auch gleich an, die Speicherung und Visualisierung der Daten auf eine Influx-Datenbank mit Grafana-Frontend umzustellen.

Umsetzung

MQTT ist ein Protokoll Standard, der für die Kommunikation zwischen Maschinen entwickelt wurde. Einzelheiten zum MQTT-Standard dazu findet man z.B. auf Wikipedia.

Für die Kommunikation über MQTT braucht man einen Broker, der die Verteilung der Nachrichten an die MQTT-Clients übernimmt. Als Broker wird dabei der Mosquitto-Broker verwendet, der auch auf dem Beaglebone läuft.

Das Python-Script zur Dekodierung der vBus-Nachrichten wird jetzt so erweitert, dass es gleichzeitig die Daten als MQTT-Client an den MQTT-Broker abliefert.

Mit einem zweiten Python-Script werden die Daten dann vom MQTT-Broker abgeholt und in eine Influx-Datenbank geschrieben, die auf einem RaspberryPi 3 läuft. Dieses zweite Script läuft ebenfalls auf dem BeagleBone. Nur die Influx-DB habe ich ausgelagert aus Performance-Gründen ausgelagert.

Installation Mosquitto-Broker

Installation von mosquitto als MQTT-Broker:
sudo apt-get install mosquitto

MQTT-Admin Passwort festlegen:
sudo mosquitto_passwd -c pass.txt admin

Passwort für MQTT-Client anlegen:
sudo mosquitto_passwd -b pass.txt user passwort

In der mosquitto.conf mit nano /etc/mosquitto/mosquitto.conf die folgenden Zeilen einfügen:
allow_anonymous false
password_file /etc/mosquitto/pass.txt

Neustart des Mosquitto-Brokers mit:
sudo systctl restart mosquitto

Solarregler Datalogger mit MQTT-Client

Für die Kommunikation mit dem MQTT-Broker wird die Paho-MQTT Python-Library benutzt. Die Installation der Lib erfolgt durch
pip install paho-mqtt.

Das Pythonscript selbst wird wieder unter /home/debian/solarregler mit
nano /home/debian/solarregler/resol_logger.py angelegt bzw. bearbeitet. Das Script ist eine erweitere Version des Script aus dem ersten Artikel, d.h. auch dieses Script legt auch noch eine RRD-Datenbank an und schreibt die Daten dort hin.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
#!/usr/bin/python
# Script for logging RESOL DeltaBS Plus to a RRDTOOL-Databank
# with integrated MQTT-Client
# Thomas Wedemeyer 2018
# www.thomas-wedemeyer.de
# Remark: 	My system is using the System schemata 9 -> WMZ is not testet

import serial
import sys
import rrdtool
import os
import time
import paho.mqtt.client as mqtt

  
temperatur_list = [0.0 for x in range(4)]
drehzahl1 = 0
drehzahl2 = 0
relmask = 0
errormask = 0
relTime1 = 0 
relTime2 = 0
#wmz = 0	# remove comment tag for WMZ-Support

mqtt_connected = False


def vbus_decode(rxd_list,byte_counter):
    global temperatur_list, drehzahl1,drehzahl2,relmask,errormask,relTime1,relTime2#,wmz
	
	#Check command
    if rxd_list[5] == 0x10:
	if ((rxd_list[1]!=0x10) | (rxd_list[2]!=0x0)):
		return False
        system = ((rxd_list[4]<<8) | rxd_list[3])
        print "Systemcode: ", hex(system)
        datasets = rxd_list[8]	#get the number of datasets
        print "Dataset=",datasets
        #copy the sepetts back to the msb of the bytes 
        for x in range(0,datasets-1,1):
            for bcnt in range(0,4,1):
                if(rxd_list[14+x*6] & (1<<bcnt)) > 0:
                    rxd_list[10+bcnt+x*6] = rxd_list[10+bcnt+x*6] | 0x80
        #crc is only for whiny people... so no  crc checking here

        #stip protocol information of the sepetts & crc
        protocolOffset = 0
        for i in range(0,datasets-1,1):
            for bytecnt in range(0,4,1):
                rxd_list[bytecnt+4*i] = rxd_list[10+bytecnt+4*i+protocolOffset]
            protocolOffset = protocolOffset + 2

        if system == 0x4221: #Delta BS Plus V1
            if datasets != 7:
                return False
            #decoding of the message block according to VBusSpecificationResol.xml for Resol Delta BS Plus
            temperatur_list[0] = ((rxd_list[1]<<8) | rxd_list[0]) / 10.0
            print "Temperatur 0 =", temperatur_list[0]

            temperatur_list[1] = ((rxd_list[3]<<8) | rxd_list[2]) / 10.0
            print "Temperatur 1 =", temperatur_list[1]		

            temperatur_list[2] = ((rxd_list[5]<<8) | rxd_list[4]) / 10.0
            print "Temperatur 2 =", temperatur_list[2]

            temperatur_list[3] = ((rxd_list[7]<<8) | rxd_list[6]) / 10.0
            print "Temperatur 3 =", temperatur_list[3]			
            
            for x in range(0,4,1): # convert to signed temperatur
                if temperatur_list[x] > 3276.0:
                    temperatur_list[x] = temperatur_list[x] - 6553.4

            drehzahl1 = rxd_list[8]
            print "Drehzahl 1=", drehzahl1			
            
            drehzahl2 = rxd_list[9]
            print "Drehzahl 2=", drehzahl2	
             
            relmask = rxd_list[10]
            print "Relmask=", hex(relmask)
            
            errormask = rxd_list[11]
            print "Errormask=", hex(errormask)
            
            time = ((rxd_list[13]<<8) | rxd_list[12])
            print "Time =", time/60,":",time%60			
            
            schema = rxd_list[14]
            print "Schema=", schema	
            
            relTime1 = ((rxd_list[17]<<8) | rxd_list[16]) 
            print "RelTime1 =", relTime1			
            
            relTime2 = ((rxd_list[19]<<8) | rxd_list[18]) 
            print "RelTime2 =", relTime2
            
            #wmz= ((rxd_list[21]<<8) | rxd_list[20]) + (((rxd_list[23]<<8) | rxd_list[22])*1000) + (((rxd_list[25]<<8) | rxd_list[24])*1000000)
            #print "WMZ = ",wmz
            return True

        elif system == 0x427b:  # DeltaBSPlus V2
            if datasets != 9:
                return False
            
            #decoding of the message block according to VBusSpecificationResol.xml for Resol Delta BSPlus
            temperatur_list[0] = ((rxd_list[1]<<8) | rxd_list[0]) / 10.0
            print "Temperatur 0 =", temperatur_list[0]
    
            temperatur_list[1] = ((rxd_list[3]<<8) | rxd_list[2]) / 10.0
            print "Temperatur 1 =", temperatur_list[1]        
    
            temperatur_list[2] = ((rxd_list[5]<<8) | rxd_list[4]) / 10.0
            print "Temperatur 2 =", temperatur_list[2]
    
            temperatur_list[3] = ((rxd_list[7]<<8) | rxd_list[6]) / 10.0
            print "Temperatur 3 =", temperatur_list[3]            
    
            for x in range(0,4,1): # convert to signed temperatur
                if temperatur_list[x] > 3276.0:
                    temperatur_list[x] = temperatur_list[x] - 6553.4

            drehzahl1 = rxd_list[8]
            print "Drehzahl 1=", drehzahl1            
    
            drehzahl2 = rxd_list[12]
            print "Drehzahl 2=", drehzahl2    
             
            relmask = rxd_list[10]
            print "Relmask=", hex(relmask)
            
            errormask = ((rxd_list[21]<<8) | rxd_list[20])
            print "Errormask=", hex(errormask)
            
            time = ((rxd_list[23]<<8) | rxd_list[22])
            print "Time =", time/60,":",time%60            
            
            schema = rxd_list[17]
            print "Schema=", schema    
    
            relTime1 = ((rxd_list[11]<<8) | rxd_list[10]) 
            print "RelTime1 =", relTime1            
    
            relTime2 = ((rxd_list[15]<<8) | rxd_list[14]) 
            print "RelTime2 =", relTime2
            
            #wmz= ((rxd_list[31]<<24) | (rxd_list[30]<<16) |(rxd_list[29]<<8) | rxd_list[28])*1000) + (((rxd_list[25]<<8) | rxd_list[24])*1000000)
            #print "WMZ = ",wmz
            return True
        else:
            print "System not supported"
            return True
    else:
        return False				


def check_database():
	if os.path.isfile("%s/resol_databank.rrd" % (os.path.dirname(os.path.abspath(__file__)))) == False:
		print "Database not found -> Create Database" 
		rrdtool.create(
	    	"%s/resol_databank.rrd" % (os.path.dirname(os.path.abspath(__file__))),
	     	"--start", "now",
	      	"--step", "300",			# 300sec -> 5Min
		  	"--no-overwrite",			# don't overwrite old database
		  	"DS:Temp_Collector:GAUGE:600:-25:150", # 600 sek -> 10Min Haertbeat, Min:-25, Max 150
		  	"DS:Temp_TankBottom:GAUGE:600:-25:150",
		  	"DS:Temp_TankTop:GAUGE:600:-25:150",
		  	"DS:Temp_Recirculation:GAUGE:600:-25:150",
		  	"DS:PumpSpeed:GAUGE:600:0:100",	# 600 Sec Heartbeat, Min:0, Max: 100%
		  	"DS:Valve:GAUGE:600:0:100",
		  	"DS:Error:GAUGE:600:0:255",
		  	"DS:PumpActiv:COUNTER:600:0:99999",
		  	"DS:ValveActiv:COUNTER:600:0:99999",
		  	#DS:Wmz:COUNTER:600:0:99999999
		  	"RRA:AVERAGE:0.5:1:2016", # Average 1 point, 2016 points = 7 days / 5min interval
		  	"RRA:AVERAGE:0.5:12:8760",# Average 12 points (1hour), 8760 Points = 1 Year (24*365)
		  	"RRA:MIN:0.5:288:3650", #Min 288 points (1Day), 3650 Points = 10 Years
		  	"RRA:MAX:0.5:288:3650",
		  	"RRA:LAST:0.5:288:3600") 
	else:
		print "Database allready exits"
		
		
def update_database():
	global temperatur_list, drehzahl1,drehzahl2,relmask,errormask,relTime1,relTime2

	# build data point
	data = 'N:'
	for i in range(4):
		data += str(temperatur_list[i])
		data +=':'
	
	data += str(drehzahl1)
	data +=':'
	data += str(drehzahl2)
	data +=':'
	data += str(errormask)
	data +=':'
	data += str(relTime1)
	data +=':'
	data += str(relTime2)
	#data +=':'
	#data += str(wmz)
 
 	print data
	# insert data into round-robin-database
	rrdtool.update(
  		"%s/resol_databank.rrd" % (os.path.dirname(os.path.abspath(__file__))),data)
					
 
def on_connect(client, userdata, flags, rc):
    global mqtt_connected
    print("Connected to MQTT-Broker with result code " + str(rc))
    mqtt_connected = True



def main():
    #MQTT Connection...
    user="your_userid"
    passw="your_passwd"
    client = mqtt.Client()
    client.on_connect = on_connect
    client.username_pw_set(user,passw)
    client.connect("127.0.0.1", 1883, 60)	# localhost or external ip adr.
    client.loop_start()
    
    check_database()
    ser = serial.Serial(port = "/dev/ttyO2", baudrate=9600)
    ser.close()
    ser.open()
    rxd_list = [0 for x in range(70)]	#define a buffer for the receiver
    counter = 0
    Starttime = time.time()
    if ser.isOpen():
        print "Serial is open!"
        try:
            while True:
                value = 0
                data = ser.read()
                value = ord(data)
                #print "Value: " , hex(value) 
                if value == 0xaa:
                    #print "New Frame"
                    if(counter > 0):
                        if vbus_decode(rxd_list,counter) == True:
                            update_database()
                            ser.close()
                            if mqtt_connected == True:
                                client.publish("solaranlage/collector", temperatur_list[0])
                                client.publish("solaranlage/tank_top", temperatur_list[2])
                                client.publish("solaranlage/tank_bottom", temperatur_list[1])
                                client.publish("solaranlage/recirculation", temperatur_list[3])
                                client.publish("solaranlage/pumpspeed", drehzahl1)
                                client.publish("solaranlage/valve", drehzahl2)
                                client.publish("solaranlage/error", errormask)
                                client.publish("solaranlage/pumpactiv", relTime1)
                                client.publish("solaranlage/relactiv", relTime2)
                                client.loop_stop()    #Stop loop 
                                client.disconnect() # disconnect
                            exit()	
                    counter = 0
                
                rxd_list[counter] = value
                if(counter < 59):
                    counter += 1

                    
                if (time.time() > (Starttime +60)):
                    print "Timeout!"
                    ser.close()
                    exit()
                     			
        except  KeyboardInterrupt:
            ser.close()

if __name__ == "__main__":
        main()

Diese Script wird jetzt alle 5 Minuten über einen Cronjob aufgerufen und liefert die Daten beim MQTT-Broker ab.

MQTT-Client zum Befüllen der Influx-Datenbank

Für die Kommunikation mit der Influx-Datenbank gibt es natürlich wieder eine Python-Library, die vor der Benutzung installiert werden muss:
pip install influxdb

Das Script zum Befüllen der Influx-Datenbank wird jetzt im Verzeichnis “/home/debian/MQTT-Influx” angelegt:
mkdir /home/debian/MQTT-Influx
nano /home/debian/MQTT-Influx/mqtt2influx.py

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
#!/usr/bin/env python3
import paho.mqtt.client as mqtt
import datetime
import time
from influxdb import InfluxDBClient

def on_connect(client, userdata, flags, rc):
    print("Connected with result code "+str(rc))
    client.subscribe("#")
    
def on_message(client, userdata, msg):
    print("Received a message on topic: " + msg.topic)
    # Use utc as timestamp
    receiveTime=datetime.datetime.utcnow()
    message=msg.payload.decode("utf-8")
    isfloatValue=False
    try:
        # Convert the string to a float so that it is stored as a number and not a string in the database
        val = float(message)
        isfloatValue=True
    except:
        print("Could not convert " + message + " to a float value")
        isfloatValue=False

    if isfloatValue:
        print(str(receiveTime) + ": " + msg.topic + " " + str(val))

        json_body = [
            {
                "measurement": msg.topic,
                "time": receiveTime,
                "fields": {
                    "value": val
                }
            }
        ]

        dbclient.write_points(json_body)
        print("Finished writing to InfluxDB")
        
# Set up a client for InfluxDB
dbclient = InfluxDBClient('IP_of_InfluxDB', 8086, 'user_id', 'pass_id', 'database_name')

# Initialize the MQTT client that should connect to the Mosquitto broker
client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message
connOK=False
while(connOK == False):
    try:
        #MQTT Connection...
        user="your_mqtt_userid"
        passw="your_mqtt_pw"
        client.username_pw_set(user,passw)
        client.connect("l27.0.0.1", 1883, 60)
        connOK = True
    except:
        connOK = False
    time.sleep(2)

# Blocking loop to the Mosquitto broker
client.loop_forever()

Diese Script wird jetzt wieder mit chmod a+x mqtt2influx.py ausführbar gemacht.

Normalerweise läuft dieses Script die ganze Zeit, wenn es einmal gestartet wurde und überträgt jetzt jede Zustandsänderung auf dem MQTT-Broker mit einem Zeitstempel an die Influx-Datenbank. Da die Welt nicht perfekt ist, kann es aber immer mal passieren, das z.B. ein Timeout bei einer Connection auftritt. In diesem Fall wird das Script mit einem Fehler beendet. Um dieses Problem zu umgehen, wird die Ausführung des Python-Scripts alle 5 Minuten durch ein Bash-Script überwacht. Wenn das Python-Script nicht in der Taskliste gefunden wird, wird das Script automatisch neu gestartet.

nano /home/debian/MQTT-Influx/mqtt2influx_check.sh

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
  
#!/bin/bash

cnt=`ps -ef | grep "mqtt2influx.py" | grep -v grep | wc -l`
if [ "$cnt" -eq 0 ]
then
    echo "MQTT2Influx ist abgestuerzt!"
    python /home/debian/MQTT-Influx/mqtt2influx.py &
else
    echo "mqtt2influx is running!"
fi

Das Script wieder ausführbar machen mit:
chmod a+x /home/debian/MQTT-Influx/mqtt2influx_check.sh

Die Ausführung des Scripts alle 5 Minuten starten, indem das Script in die Crontab eingetragen wird mit crontab -e :

*/5 * * * * /home/debian/MQTT-Influx/mqtt2influx_check.sh

So, auf der Beaglebone Seite hat ist jetzt alles fertig. Jetzt nur noch kurz Influx und Grafana auf dem Raspberry Pi installieren…

Influx Installation auf dem Raspberry

InfluxDB installieren mit:

wget -qO- https://repos.influxdata.com/influxdb.key | sudo apt-key add -
source /etc/lsb-release
echo "deb https://repos.influxdata.com/${DISTRIB_ID,,} ${DISTRIB_CODENAME} stable" | sudo tee /etc/apt/sources.list.d/influxdb.list

sudo apt update
sudo apt install influxdb

InfluxDB für den Start beim Booten einrichten:
sudo systemctl enable influxdb
sudo systemctl start influxdb

Admin für InfluxDB anlegen:
Im Terminal influx mit influxaufrufen.

Admin anlegen mit
CREATE USER admin WITH PASSWORD 'admin_password' WITH ALL PRIVILEGES

“einfachen Benutzer” anlegen mit:
CREATE USER "user_name" WITH PASSWORD 'your_password

Datenbank anlegen mit
CREATE DATABASE database_name

Rechte an der Datenbank setzen:
GRANT ALL ON "database_name" TO "user_name"

Influx mit exit verlassen.

Influx.config anpassen mit
sudo nano /etc/influxdb/influxdb.conf

Im Abschnitt http folgende Eintragungen überprüfen:

enabled = true
bind-address = „:8086“
auth-enabled = true

So jetzt noch kurz die Datenbank neu starten mit
sudo systemctl restart influxdb

Installation Grafana auf dem RaspberryPi

APT key & Grafana Repository einbinden:
wget -q -O - https://packages.grafana.com/gpg.key | sudo apt-key add -
echo "deb https://packages.grafana.com/oss/deb stable main" | sudo tee -a /etc/apt/sources.list.d/grafana.list

Grafana installieren:
sudo apt-get update
sudo apt-get install -y grafana

Grafana bei Systemstart hochfahren:
sudo /bin/systemctl enable grafana-server

Manueller Start von Grafana mit:
sudo /bin/systemctl start grafana-server

Jetzt kann man die Grafana-Oberfläche im Webbrowser mit “http:\your_rasppi_ip:3000” öffnen. Das Default Login ist admin mit dem Passwort admin .

Grafana mit Influx verbinden

  • Unter Configuration->Datasource-> Add data source anklicken
  • InfluxDB wählen
  • Name z.B. MyInfluxDB
  • Bei HTTP: http://localhost:8086 eintragen
  • Bei InfluxDB Details “your database”, user, password und HTTP Method “GET” eintragen
  • Speichern nicht vergessen…

Erstes Dasboard anlegen

  • Bei dem “+"-Zeichen -> Create Dashboard
  • Add Query
  • Query = MyInfluxDB
  • From: default
  • select measurement anklicken und Topics aus der DB wählen z.B. /solaranlage/collector
  • Visualisierung anpassen usw.
  • Und fertig….

Bei mir sieht das Dashboard dann am Ende so aus:

Grafana Dashboard mit Temperaturkurven

Ergebnis

So, wie man mal wieder sieht ist es relativ einfach aus einem kleinen Problem ein viel größeres zu machen ohne, dass man wirklich viel erreicht hat. 😏 Aber schön anzusehen ist die neue Darstellung der Temperaturverläufe auf jeden Fall!

Aber im Ernst: Die Anbindung der Solaranlage über MQTT ist nicht nur für die Influx Anbindung interessant. Es ergeben sich dadurch noch viel mehr Möglichkeiten z.B. aus die dem Bereich der Hausautomatisierung. Ich habe inzwischen die Solaranlage über MQTT mit in den Homeassistant mit eingebunden, wo sich wiederum weitere Möglichkeiten ergeben… Dazu aber später mehr!

Auch die Möglichkeiten, die die Kombination aus Influx und Grafana bieten sind wirklich spannend!