SolarEdge SE3000H monitoring


Locally monitor your SolarEdge solar panel installation, for example with Telegraf, InfluxDB and Grafana running on a Raspberry Pi.

There are, at the time of writing, two ways of getting data from your solar panel installation:

  1. Use the SolarEdge cloud API. SolarEdge logs metrics in its cloud, which you can access through its mobile app or website; your inverter must be connected to the internet. The data is rich (per panel if you have optimizers) and through the API all of it can be fetched locally (max 300 requests a day). The update interval is 15 minutes, so it is not suitable for a live dashboard. It is not possible to read this data out locally over LAN or Wi-Fi (although many people would like to): a web server does run on the inverter, but access is blocked [1], according to SolarEdge's support for security reasons.

  2. The SunSpec Modbus protocol (over TCP or an external RS485 meter) is also supported by the inverter. Data can be requested at intervals as short as 1 second. Modbus is disabled by default. Per-panel data, if you have optimizers, cannot be queried this way; numerous people are hoping this will be supported one day. The Modbus interface is an open standard, so it is often supported by logging software out of the box. All data is read-only [2].

Both methods can be used at the same time and do not interfere with one another. Both are officially supported and documented.

SolarEdge has a somewhat backwards set-up where the level of detail available in the cloud platform is determined by the access given to you by your installer, hence some people see more than others. However, anyone can create an installer account on SolarEdge's website, so if you want to see all data and your installer does not want to co-operate you can circumvent them. This is not required for this post's use-case.

This post is written for the SE3000H inverter (no LCD). My set-up is very straightforward: just a single inverter without battery or meters.

While the method itself should work for all models, some configuration steps on your inverter might differ; consult SolarEdge's documentation.

Monitoring set-up

This post will show both Modbus and Cloud logging. They are independent of each other.

This post assumes you have Telegraf, InfluxDB (2) and Grafana installed on your machine, for example a Raspberry Pi. You can substitute InfluxDB with another time series database (e.g. Prometheus) and Grafana with another visualization tool (e.g. Chronograf) as desired. To install and set up these applications see this and this post.

Telegraf can be replaced by custom scripts; there are quite a few open source options available, of which the Python ones seem the most popular at the time of writing. However, Telegraf has built-in Modbus support, which simplifies the set-up (especially if you already have Telegraf running anyway).

Configure your inverter

This information can also be found in SolarEdge’s official documentation.

Configuration can be done through SolarEdge's SetApp mobile application, however not every installer enables this for its customers (you).

Everything can also be configured via the inverter's direct Wi-Fi access point. To enable it, move the switch on the inverter to P and release it within 2 seconds. Connect to its Wi-Fi network (the inverter name will be in the SSID); the password is printed on the right side of the inverter. Open your browser and go to http://172.16.0.1. From this page everything can be configured without SetApp permission or an installer account.

Make sure you are in reach of the inverter's Wi-Fi, and be quick, since the Wi-Fi will turn off after a couple of minutes if no request is received.

SolarEdge also has a mobile app, mySolarEdge, through which this portal can be accessed; however, at the time of writing it failed to connect for me.

Enable MODBUS over TCP: Site Communication > Modbus TCP > Enable. Default port is 1502, default device ID is 1.

The TCP server idle time is 2 minutes. In order to leave the connection open, at least 1 request should be made within 2 minutes. The connection can remain open without any MODBUS requests.

Hence, if your inverter is restarted and your logging infrastructure does not send a Modbus request within 2 minutes, all subsequent requests will fail.

For easier configuration of the logging infrastructure it is advised to assign your inverter a static IP: Site Communication > Ethernet > Static, and fill in the network details.

Note: from experience, if the inverter is reset (P switch held to the right), the IP setting might revert to dynamic (the default).
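
Once Modbus over TCP is enabled, it is worth verifying from the logging machine that the port is reachable before configuring Telegraf. A minimal check, assuming netcat is installed and using the example inverter address from the Telegraf configuration later in this post:

nc -vz 192.168.1.12 1502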

Configure InfluxDB

InfluxDB can be configured through its web UI or API.

If desired, or if not already done, generate a dedicated access token per bucket for Telegraf.

Modbus over TCP

Create a new bucket, e.g. called solaredge with retention policy 1d.

If the data is logged at a relatively high frequency, e.g. to provide a live dashboard, and you want to retain it for longer periods of time, downsampling the data to save storage space is strongly advised. It also improves query performance, since there is less data to scan. Create another bucket, canonically called [bucket name]_[downsample interval], for example solaredge_15m. The raw measurements are logged in solaredge, periodically downsampled (aggregated) and saved to solaredge_15m. You may create as many downsample levels as desired, for example keep 15m downsampled data for 1 month and have a 1d downsampled bucket retained forever. Make sure the retention policy for each bucket is properly set. I will stick to 1 downsample level, 15 minutes, keeping the data forever.
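
The buckets can also be created with the influx CLI instead of the web UI. A minimal sketch, assuming the organization name piserver1 used in the task below (a retention of 0 means keep forever):

influx bucket create --name solaredge --retention 24h --org piserver1
influx bucket create --name solaredge_15m --retention 0 --org piserver1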

Note: due to the nature of the set-up the downsampled bucket(s) will never contain real time data.

Note: for backups the raw bucket, solaredge, may be ignored as the data is only kept for 1d anyway.

In InfluxDB create a new task, running every hour for example:

{
 "meta": {
  "version": "1",
  "type": "task",
  "name": "Downsample Modbus SolarEdge-Template",
  "description": "template created from task: Downsample Modbus SolarEdge"
 },
 "content": {
  "data": {
   "type": "task",
   "attributes": {
    "status": "active",
    "name": "Downsample Modbus SolarEdge",
    "flux": "option task = {name: \"Downsample Modbus SolarEdge\", cron: \"0 * * * *\", offset: 1m}\n\nsource_bucket = \"solaredge\"\ndestination_bucket = \"solaredge_15m\"\ndestination_org = \"piserver1\"\naggregateField = (data=<-, name) => {\n\tfilteredData = data\n\t\t|> filter(fn: (r) =>\n\t\t\t(r[\"_field\"] == name))\n\tmedian = filteredData\n\t\t|> median()\n\t\t|> drop(columns: [\"_start\"])\n\t\t|> rename(columns: {_stop: \"_time\"})\n\t\t|> set(key: \"aggType\", value: \"median\")\n\tmax = filteredData\n\t\t|> max()\n\t\t|> drop(columns: [\"_start\", \"_time\"])\n\t\t|> rename(columns: {_stop: \"_time\"})\n\t\t|> set(key: \"aggType\", value: \"max\")\n\tmin = filteredData\n\t\t|> min()\n\t\t|> drop(columns: [\"_start\", \"_time\"])\n\t\t|> rename(columns: {_stop: \"_time\"})\n\t\t|> set(key: \"aggType\", value: \"min\")\n\n\tunion(tables: [min, max, median])\n\t\t|> to(bucket: destination_bucket, org: destination_org)\n\n\treturn data\n}\ndata = from(bucket: source_bucket)\n\t|> range(start: -1h)\n\t|> filter(fn: (r) =>\n\t\t(r[\"_measurement\"] == \"inverter\"))\n\t|> window(every: 15m)\n\t|> aggregateField(name: \"I_AC_Current\")\n\t|> aggregateField(name: \"I_AC_CurrentA\")\n\t|> aggregateField(name: \"I_AC_CurrentB\")\n\t|> aggregateField(name: \"I_AC_CurrentC\")\n\t|> aggregateField(name: \"I_AC_VoltageAB\")\n\t|> aggregateField(name: \"I_AC_VoltageBC\")\n\t|> aggregateField(name: \"I_AC_VoltageCA\")\n\t|> aggregateField(name: \"I_AC_VoltageAN\")\n\t|> aggregateField(name: \"I_AC_VoltageBN\")\n\t|> aggregateField(name: \"I_AC_VoltageCN\")\n\t|> aggregateField(name: \"I_AC_Power\")\n\t|> aggregateField(name: \"I_AC_Frequency\")\n\t|> aggregateField(name: \"I_AC_VA\")\n\t|> aggregateField(name: \"I_AC_VAR\")\n\t|> aggregateField(name: \"I_AC_PF\")\n\t|> aggregateField(name: \"I_DC_Current\")\n\t|> aggregateField(name: \"I_DC_Voltage\")\n\t|> aggregateField(name: \"I_DC_Power\")\n\t|> aggregateField(name: \"I_Temp\")\n\ndata\n\t|> filter(fn: (r) =>\n\t\t(r[\"_field\"] == \"I_AC_Energy_WH\"))\n\t|> last()\n\t|> drop(columns: [\"_start\", \"_time\"])\n\t|> rename(columns: {_stop: \"_time\"})\n\t|> to(bucket: destination_bucket, org: destination_org)",
    "cron": "0 * * * *",
    "offset": "1m"
   },
   "relationships": {
    "label": {
     "data": []
    }
   }
  },
  "included": []
 },
 "labels": []
}

Note: while a reduce() function (min, max and median at once) might seem more efficient, empirically it is more than 5x slower.

Note: the status field (I_Status) is not included in the downsampled data.

See also this InfluxDB blog post.

SolarEdge Cloud API

Create a new bucket, e.g. called solaredge_cloud, keep the data forever.
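
As before, the bucket can also be created with the influx CLI (again assuming the piserver1 organization):

influx bucket create --name solaredge_cloud --retention 0 --org piserver1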

Configure Telegraf

Remember to restart the Telegraf service when a config file has been changed.
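
For example, assuming Telegraf runs as a systemd service:

sudo systemctl restart telegraf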

Modbus over TCP

Info based on the latest technical note, version 2.3, Feb 2021. For the latest version consult the “SolarEdge SunSpec Implementation Technical Note” on their website.

Only a single host/application/script can interact with the inverter's Modbus interface at the same time. See SolarEdge’s documentation for more info.

Add the Telegraf config file in /etc/telegraf/telegraf.d/solaredge_modbus.conf and fill in your outputs.influxdb_v2 details.

[[outputs.influxdb_v2]]
  urls = ["http://127.0.0.1:8086"]
  token = ""
  organization = ""
  bucket = "solaredge"
  namepass = ["inverter"]
  
# ------------------------------------------------ Inputs -------------------------------------------- 

[[inputs.modbus]]
 interval = "15s"
 name_override="inverter"
 name = "NOT USED" # Not used
 tagexclude = ["type", "name", "host"]

 slave_id = 1
 timeout = "5s"
 controller = "tcp://192.168.1.12:1502"

 # Note: most static data is omitted (pointless to monitor)
 # Note: battery and meter data is omitted (as I have none)
 #
 # Always 0/not used: 
 #  { name = "I_Status_Vendor", byte_order = "AB", data_type = "UINT16", scale=1.0,  address = [108]} 

 # Disabled for now since Telegraf does not support reading strings
 # { name = "c_serialnumber", address = [52, 67]},
 holding_registers = [
    { name = "C_SunSpec_DID", byte_order = "AB", data_type = "UINT16", scale=1.0, address = [69]},
    { name = "I_AC_Current", byte_order = "AB", data_type = "UINT16", scale=1.0,  address = [71]},
    { name = "I_AC_CurrentA", byte_order = "AB", data_type = "UINT16", scale=1.0,  address = [72]},
    { name = "I_AC_CurrentB", byte_order = "AB", data_type = "UINT16", scale=1.0,  address = [73]},
    { name = "I_AC_CurrentC", byte_order = "AB", data_type = "UINT16", scale=1.0,  address = [74]},
    { name = "I_AC_Current_SF", byte_order = "AB", data_type = "INT16", scale=1.0,  address = [75]},
    { name = "I_AC_VoltageAB", byte_order = "AB", data_type = "UINT16", scale=1.0,  address = [76]},
    { name = "I_AC_VoltageBC", byte_order = "AB", data_type = "UINT16", scale=1.0,  address = [77]},
    { name = "I_AC_VoltageCA", byte_order = "AB", data_type = "UINT16", scale=1.0,  address = [78]},
    { name = "I_AC_VoltageAN", byte_order = "AB", data_type = "UINT16", scale=1.0,  address = [79]},
    { name = "I_AC_VoltageBN", byte_order = "AB", data_type = "UINT16", scale=1.0,  address = [80]},
    { name = "I_AC_VoltageCN", byte_order = "AB", data_type = "UINT16", scale=1.0,  address = [81]},
    { name = "I_AC_Voltage_SF", byte_order = "AB", data_type = "INT16", scale=1.0,  address = [82]},
    { name = "I_AC_Power", byte_order = "AB", data_type = "INT16", scale=1.0,  address = [83]},
    { name = "I_AC_Power_SF", byte_order = "AB", data_type = "INT16", scale=1.0,  address = [84]},
    { name = "I_AC_Frequency", byte_order = "AB", data_type = "UINT16", scale=1.0,  address = [85]},
    { name = "I_AC_Frequency_SF", byte_order = "AB", data_type = "INT16", scale=1.0,  address = [86]},
    { name = "I_AC_VA", byte_order = "AB", data_type = "INT16", scale=1.0,  address = [87]},
    { name = "I_AC_VA_SF", byte_order = "AB", data_type = "INT16", scale=1.0,  address = [88]},
    { name = "I_AC_VAR", byte_order = "AB", data_type = "INT16", scale=1.0,  address = [89]},
    { name = "I_AC_VAR_SF", byte_order = "AB", data_type = "INT16", scale=1.0,  address = [90]},
    { name = "I_AC_PF", byte_order = "AB", data_type = "INT16", scale=1.0,  address = [91]},
    { name = "I_AC_PF_SF", byte_order = "AB", data_type = "INT16", scale=1.0,  address = [92]},
    { name = "I_AC_Energy_WH", byte_order = "ABCD", data_type = "INT32", scale=1.0,  address = [93, 94]},
    { name = "I_AC_Energy_WH_SF", byte_order = "AB", data_type = "UINT16", scale=1.0,  address = [95]},
    { name = "I_DC_Current", byte_order = "AB", data_type = "UINT16", scale=1.0,  address = [96]},
    { name = "I_DC_Current_SF", byte_order = "AB", data_type = "INT16", scale=1.0,  address = [97]},
    { name = "I_DC_Voltage", byte_order = "AB", data_type = "UINT16", scale=1.0,  address = [98]},
    { name = "I_DC_Voltage_SF", byte_order = "AB", data_type = "INT16", scale=1.0,  address = [99]},
    { name = "I_DC_Power", byte_order = "AB", data_type = "UINT16", scale=1.0,  address = [100]},
    { name = "I_DC_Power_SF", byte_order = "AB", data_type = "INT16", scale=1.0,  address = [101]},
    { name = "I_Temp", byte_order = "AB", data_type = "UINT16", scale=1.0,  address = [103]},
    { name = "I_Temp_SF", byte_order = "AB", data_type = "INT16", scale=1.0,  address = [106]},
    { name = "I_Status", byte_order = "AB", data_type = "UINT16", scale=1.0,  address = [107]}
  ]

  # Must come last: in TOML, keys placed after this sub-table would be parsed as tags
  [inputs.modbus.tags]
   site = "" # Site ID can be found on SolarEdge's website
   sn = "" # Inverter serial

# Apply the scaling and drop the scaling fields.
[[processors.starlark]]
  namepass = ["inverter"]
  source = '''
def scale(metric, name):
    # Multiply the raw register value by its scale factor and drop the scale factor field.
    metric.fields[name] *= pow(metric.fields[name + "_SF"])
    metric.fields.pop(name + "_SF")

def pow(exp):
    # Convert a SunSpec scale factor (a power-of-ten exponent) into a multiplier.
    return float("1e{}".format(exp))

def drop(metric, name):
    metric.fields.pop(name)
    metric.fields.pop(name + "_SF")

def apply(metric):
    type = metric.fields["C_SunSpec_DID"]
    metric.fields.pop("C_SunSpec_DID")

    I_AC_Voltage_Scale = pow(metric.fields["I_AC_Voltage_SF"])
    metric.fields["I_AC_VoltageAB"] *= I_AC_Voltage_Scale
    metric.fields["I_AC_VoltageBC"] *= I_AC_Voltage_Scale
    metric.fields["I_AC_VoltageCA"] *= I_AC_Voltage_Scale
    metric.fields["I_AC_VoltageAN"] *= I_AC_Voltage_Scale
    metric.fields["I_AC_VoltageBN"] *= I_AC_Voltage_Scale
    metric.fields["I_AC_VoltageCN"] *= I_AC_Voltage_Scale
    metric.fields.pop("I_AC_Voltage_SF")

    # Drop meaningless measurements
    if type == 101: # Single Phase
        metric.fields.pop("I_AC_VoltageBC")
        metric.fields.pop("I_AC_VoltageCA")
        metric.fields.pop("I_AC_VoltageAN")
        metric.fields.pop("I_AC_VoltageBN")
        metric.fields.pop("I_AC_VoltageCN")
    elif type == 102: # Split Phase
        metric.fields.pop("I_AC_VoltageCA")
        metric.fields.pop("I_AC_VoltageCN")

    scale(metric, "I_AC_Frequency")
    scale(metric, "I_DC_Voltage")
    scale(metric, "I_Temp")

    # Drop obsolete measurements at night/sleep mode to reduce stored data size.
    if metric.fields["I_Status"] == 2:
        drop(metric, "I_AC_Current")
        metric.fields.pop("I_AC_CurrentA")
        metric.fields.pop("I_AC_CurrentB")
        metric.fields.pop("I_AC_CurrentC")

        drop(metric, "I_AC_Power")
        drop(metric, "I_AC_VA")
        drop(metric, "I_AC_VAR")
        drop(metric, "I_AC_PF")
        drop(metric, "I_AC_Energy_WH")
        drop(metric, "I_DC_Current")
        drop(metric, "I_DC_Power")
    else:
        I_AC_Current_Scale = pow(metric.fields["I_AC_Current_SF"])
        metric.fields["I_AC_Current"] *= I_AC_Current_Scale
        metric.fields["I_AC_CurrentA"] *= I_AC_Current_Scale
        metric.fields["I_AC_CurrentB"] *= I_AC_Current_Scale
        metric.fields["I_AC_CurrentC"] *= I_AC_Current_Scale
        metric.fields.pop("I_AC_Current_SF")

        # Drop obsolete measurements
        if type == 101: # Single Phase
            metric.fields.pop("I_AC_CurrentB")
            metric.fields.pop("I_AC_CurrentC")
        elif type == 102: # Split Phase
            metric.fields.pop("I_AC_CurrentC")

        scale(metric, "I_AC_Power")
        scale(metric, "I_AC_VA")
        scale(metric, "I_AC_VAR")
        scale(metric, "I_AC_PF")
        scale(metric, "I_AC_Energy_WH")
        scale(metric, "I_DC_Current")
        scale(metric, "I_DC_Power")

    # Convert the serial to a tag
    #metric.tags["sn"] = metric.fields["C_SerialNumber"]
    #metric.fields.pop("C_SerialNumber")

    # Correct the type: we multiply by a float but some fields are still reported as int.
    for k, v in metric.fields.items():
        if k != "I_Status":
            metric.fields[k] = float(v)

    return metric
'''

To test if it works run: telegraf -config /etc/telegraf/telegraf.d/solaredge_modbus.conf -test.

SolarEdge Cloud API

Telegraf supports reading data from HTTP(S) endpoints, however SolarEdge’s API requires, for most calls, that the desired time range (e.g. last hour) is specified. This requires knowledge of the update interval and dynamic URLs; neither is supported by Telegraf at the moment. Alas.

As an alternative there are a couple of options: create a script and call it with an execd input, or create a service (systemd) which writes to a file/TCP (unix domain) socket/UDP which is read by Telegraf. One could also take Telegraf completely out of the equation, since logging to InfluxDB is easy to implement and some scripting is required anyway. All have pros and cons.

I went for a Python script running as a service (execd), as it is only a single file (compiled languages typically require some build/project files as well).

The Python daemon is listed below. Copy the file to /var/lib/telegraf/ and mark it as executable. Do not forget to fill in the token (found on the SolarEdge website) and login details at the top of the script.
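
For example, assuming the file is named solarEdgeCloudScraper.py (as referenced in the execd input below) and that Telegraf runs as the telegraf user:

sudo cp solarEdgeCloudScraper.py /var/lib/telegraf/
sudo chown telegraf:telegraf /var/lib/telegraf/solarEdgeCloudScraper.py
sudo chmod +x /var/lib/telegraf/solarEdgeCloudScraper.py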

Install the Python packages: python-requests and python-pytz.
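
These are the Arch Linux package names, as used on my set-up. On other distributions the equivalent pip install would be, for example:

pip3 install requests pytz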

Some (generated) files are stored in the telegraf user’s home folder, /var/lib/telegraf/.

The script is only tested on Linux, YMMV.

To get all data, multiple requests are required; at the time of writing the rate limit is 300 requests per day (see SolarEdge’s documentation for more info), which limits the update rate. The Modbus data will be used for a live dashboard and the cloud data will only be scraped once a day.

By default the panel data will contain 2 aggregate values, each with its own id. They are typically not of interest and can be excluded in the script (exercise for the reader) or when visualizing the data (e.g. in Grafana).

#!/usr/bin/env python3

import json
import datetime
import codecs
import sys
import os
import time
import ast
import getpass
# 3rd party dependencies:
import requests
import pytz

# Stand-alone daemon managed by Telegraf.
# Up to date with: Monitoring server API version January 2019
# Note: document does not correspond to reality
#
# Automatically fetches all sites connected to the account/API key
# Does not support the whole API, just what is required.
#
SETTING_API_KEY = ''
SETTING_SITE_USERNAME = ''
SETTING_SITE_PW = ''
# Args:
# - 'history' to scrape the past history from the cloud
# - 'debug' to run the update loop once
# - No args to run the daily loop

# -----------------------------------------------------------------------

# SolarEdge does not expose all information they have through their API,
# hence some of it has to be scraped from their website. As a bonus
# their API is rate limited but their website is not (or a much higher limit?).

BASE_SITE_PANELS_URL = 'https://monitoring.solaredge.com/solaredge-web/p/playbackData'
SITE_LOGIN_URL = 'https://monitoring.solaredge.com/solaredge-apigw/api/login'
BASE_API_URL = 'https://monitoringapi.solaredge.com'
REQUEST_TIMEOUT = 60
SITE_COOKIE_FILE = 'solaredge.com.cookies'
LAST_SUCCESSFUL_UPDATE_FILE = 'lastupdated'
INSTALLATION_INFO_FILE = 'installinfo'
SITE_IDS = []
SITES = ''  # Same as SITE_IDS but as string
SERIALS = {}
SITE_TIMEZONES = {}
HAS_OPTIMIZERS = {}
LAST_UPDATES = {}
HOME_DIR = ''
HISTORY_SCRAPER_MAX_API_CALLS = 280  # Limit is 300/day, take some margin
# Data is updated once a day at this interval. Assumed to be at the ~end of the day.
UPDATE_INTERVAL_HOUR = 23
UPDATE_INTERVAL_MIN = 50

# ------------------------------ Utils -----------------------------------------


# Write to Telegraf
def flush():
    sys.stdout.flush()
    sys.stderr.flush()


def flush_and_exit(code: int):
    flush()
    exit(code)


def format_datetime_url(date: datetime.datetime):
    return date.strftime('%Y-%m-%d %H:%M:%S')


def format_date_url(date: datetime.datetime):
    return date.strftime('%Y-%m-%d')


def wh_unit_to_multiplier(unit: str):
    first = unit[:1]
    if first == 'G':
        return 1000000000.0
    if first == 'M':
        return 1000000.0
    if first == 'k':
        return 1000.0
    return 1.0


def print_err(*args, **kwargs):
    print(*args, file=sys.stderr, **kwargs)


# In ns
def to_unix_timestamp(date: str):
    return f"{int(datetime.datetime.strptime(date, '%Y-%m-%d %H:%M:%S').timestamp())}000000000"


def safe_str_to_float(num: str):
    # A better way is to know the used locale and convert, alas it is not exposed by SolarEdge cloud.
    if num.find(",") != -1 and num.find(".") != -1:  # Locales that use 1.000,00 for 1000.00
        num = num.replace(".", "")
    return float(num.replace(",", "."))


def get_date_intervals(start: datetime.datetime, end: datetime.datetime,
                       maxDays: int):
    intervals = []

    days = (end - start).days
    prev = start
    while True:
        if days <= (maxDays + 1):
            intervals.append((prev, prev + datetime.timedelta(days=days)))
            break

        days -= maxDays + 1  # +1 because we want no overlaps between the intervals

        next = prev + datetime.timedelta(days=maxDays)
        intervals.append((prev, next))
        prev = next + datetime.timedelta(days=1)

    return intervals


# Because Python, datetime is not supported by literal_eval(...)
# Source: https://stackoverflow.com/questions/4235606/python-ast-literal-eval-and-datetime
def parse_datetime_dict(astr, debug=False):
    try:
        tree = ast.parse(astr)
    except SyntaxError:
        raise ValueError(astr)
    for node in ast.walk(tree):
        if isinstance(node,
                      (ast.Module, ast.Expr, ast.Dict, ast.Str, ast.Attribute,
                       ast.Num, ast.Name, ast.Load, ast.Tuple)):
            continue
        if (isinstance(node, ast.Call) and isinstance(node.func, ast.Attribute)
                and node.func.attr == 'datetime'):
            continue
        if debug:
            attrs = [attr for attr in dir(node) if not attr.startswith('__')]
            print(node)
            for attrname in attrs:
                print('    {k} ==> {v}'.format(k=attrname,
                                               v=getattr(node, attrname)))
        raise ValueError(astr)
    return eval(astr)


def format_L_data(data, label: str):
    return f",I_{label}_AC_Voltage={data['acVoltage']},I_{label}_AC_Current={data['acCurrent']},I_{label}_AC_PF={data['cosPhi']},I_{label}_AC_Freq={data['acFrequency']},I_{label}_AC_VAR={data['reactivePower']},I_{label}_AC_VA={data['apparentPower']},I_{label}_AC_Power={data['activePower']}"


# --------------------------- Main() helpers ------------------------------


# Should only be called once
def initialize_home_dir():
    global HOME_DIR

    # By default the env will be set to the session user (alarm's home) which the telegraf user
    # has no access too. Hence when writing files a full path must be used.
    HOME_DIR = os.path.expanduser(
        '~' + getpass.getuser()
    )  # get the home of the telegraf user (same as the location of the script)


# Should only be called once
def initialize_installation_info():
    global SITES, SERIALS, SITE_TIMEZONES, SITE_IDS, HAS_OPTIMIZERS

    # Check if the info is already cached
    if os.path.exists(os.path.join(HOME_DIR, INSTALLATION_INFO_FILE)):
        with open(os.path.join(HOME_DIR, INSTALLATION_INFO_FILE), "r") as f:
            data = ast.literal_eval(f.read())
            SITE_IDS = data['SITE_IDS']
            SITES = ','.join(SITE_IDS)
            SERIALS = data['SERIALS']
            SITE_TIMEZONES = data['SITE_TIMEZONES']
            HAS_OPTIMIZERS = data['HAS_OPTIMIZERS']
            # TODO TBD should check for equipment updates (there is an API available)
            return True

    # Get sites
    r = requests.get(f"{BASE_API_URL}/sites/list.json",
                     {'api_key': SETTING_API_KEY},
                     timeout=REQUEST_TIMEOUT)
    if r.status_code != 200:
        print_err(f"SolarEdge Cloud: Sites: HTTP {r.status_code} : {r.url}")
        return False

    # Parse response
    for site in r.json()['sites']['site']:
        site_id = str(site['id'])
        SITE_IDS.append(site_id)
        SITE_TIMEZONES[site_id] = site['location']['timeZone']
        HAS_OPTIMIZERS[site_id] = site['type'].find("Optimizers") != -1
    SITES = ','.join(SITE_IDS)

    # Get serials
    # Note: 1 call per site
    for site in SITE_IDS:
        r = requests.get(f"{BASE_API_URL}/site/{site}/inventory",
                         {'api_key': SETTING_API_KEY},
                         timeout=REQUEST_TIMEOUT)
        if r.status_code != 200:
            print_err(
                f"SolarEdge Cloud: Inventory: HTTP {r.status_code} : {r.url}")
            return False

        # Parse response
        serials = []
        for inverter in r.json()['Inventory']['inverters']:
            serials.append(inverter['SN'])
        SERIALS[site] = serials

    # Cache data
    with open(os.path.join(HOME_DIR, INSTALLATION_INFO_FILE), "w") as f:
        f.write(
            repr({
                'SITE_IDS': SITE_IDS,
                'SERIALS': SERIALS,
                'SITE_TIMEZONES': SITE_TIMEZONES,
                'HAS_OPTIMIZERS': HAS_OPTIMIZERS
            }))

    return True


# Should only be called once
def initialize_last_updated():
    global LAST_UPDATES

    if os.path.exists(os.path.join(HOME_DIR, LAST_SUCCESSFUL_UPDATE_FILE)):
        with open(os.path.join(HOME_DIR, LAST_SUCCESSFUL_UPDATE_FILE),
                  "r") as f:
            LAST_UPDATES = parse_datetime_dict(f.read())
    else:
        # Well, it must be initialized to something
        # Note: will not auto scrape full history
        resetDate = datetime.datetime.now().replace(
            hour=UPDATE_INTERVAL_HOUR, minute=UPDATE_INTERVAL_MIN,
            second=0) - datetime.timedelta(days=1)
        site_dict = {}
        for site in SITE_IDS:
            site_dict[site] = resetDate
        LAST_UPDATES['power'] = site_dict.copy()
        LAST_UPDATES['energy'] = site_dict.copy()
        LAST_UPDATES['data'] = site_dict.copy()
        LAST_UPDATES['playback'] = site_dict.copy()


def ensure_logged_in(session: requests.Session, function):
    if os.path.exists(os.path.join(HOME_DIR, SITE_COOKIE_FILE)):
        with open(os.path.join(HOME_DIR, SITE_COOKIE_FILE), 'r') as f:
            session.cookies.update(
                requests.utils.cookiejar_from_dict(json.load(f)))
            response = function()
            if response.status_code == 200:
                return response

    # Log in again
    session.post(SITE_LOGIN_URL,
                 headers={"Content-Type": "application/x-www-form-urlencoded"},
                 data={
                     "j_username": SETTING_SITE_USERNAME,
                     "j_password": SETTING_SITE_PW
                 })
    with open(os.path.join(HOME_DIR, SITE_COOKIE_FILE), 'w') as f:
        json.dump(requests.utils.dict_from_cookiejar(session.cookies), f)

    return function()


def update_all_data(endTime: datetime.datetime):
    playbackTimeStamps = LAST_UPDATES['playback']
    for site in SITE_IDS:
        if HAS_OPTIMIZERS[site]:
            nr_days = max((endTime - playbackTimeStamps[site]).days,
                          7)  # API only supports up to 1 week history
            days = [0]
            if nr_days != 1:
                days = list(range(-nr_days, 0, 1))
            if get_playback_data_site(days, site):
                playbackTimeStamps[site] = endTime
    powerTimeStamps = LAST_UPDATES['power']
    for site in SITE_IDS:
        if get_power_api(site, powerTimeStamps[site], endTime):
            powerTimeStamps[site] = endTime
    energyTimeStamps = LAST_UPDATES['energy']
    for site in SITE_IDS:
        if get_energy_api(site, energyTimeStamps[site], endTime):
            energyTimeStamps[site] = endTime
    dataTimeStamps = LAST_UPDATES['data']
    for site in SITE_IDS:
        if get_data_api(site, dataTimeStamps[site], endTime):
            dataTimeStamps[site] = endTime

    flush()

    # Persist last successful update
    with open(os.path.join(HOME_DIR, LAST_SUCCESSFUL_UPDATE_FILE), "w") as f:
        f.write(repr(LAST_UPDATES))


# --------------------------- Data gathering ----------------------------

# API


def get_power_api(site: str, startTime: datetime, endTime: datetime):
    r = requests.get(f"{BASE_API_URL}/site/{site}/powerDetails.json", {
        'startTime': format_datetime_url(startTime),
        'endTime': format_datetime_url(endTime),
        'api_key': SETTING_API_KEY
    },
        timeout=REQUEST_TIMEOUT)
    if r.status_code != 200:
        print_err(f"SolarEdge Cloud: Power: HTTP {r.status_code} : {r.url}")
        return False

    # Parse request
    json = r.json()
    multiplier = wh_unit_to_multiplier(json['powerDetails']['unit'])
    for meter in json['powerDetails']['meters']:
        type = meter['type'].lower()
        for point in meter['values']:
            if 'value' in point:
                print(
                    f'power,site={site},type={type} w={float(point["value"]) * multiplier} {to_unix_timestamp(point["date"])}',
                    flush=False)
    return True


def get_energy_api(site: str, startTime: datetime, endTime: datetime):
    r = requests.get(f"{BASE_API_URL}/site/{site}/energyDetails.json", {
        'timeUnit': 'QUARTER_OF_AN_HOUR',
        'startTime': format_datetime_url(startTime),
        'endTime': format_datetime_url(endTime),
        'api_key': SETTING_API_KEY
    },
        timeout=REQUEST_TIMEOUT)
    if r.status_code != 200:
        print_err(f"SolarEdge Cloud: Energy: HTTP {r.status_code} : {r.url}")
        return False

    # Parse request
    json = r.json()
    multiplier = wh_unit_to_multiplier(json['energyDetails']['unit'])
    for meter in json['energyDetails']['meters']:
        type = meter['type'].lower()
        for point in meter['values']:
            if 'value' in point:
                print(
                    f'energy,site={site},type={type} wh={float(point["value"]) * multiplier} {to_unix_timestamp(point["date"])}',
                    flush=False)
    return True


# This data is similar as what can be read from modbus
def get_data_api(site: str, startTime: datetime, endTime: datetime):
    for serial in SERIALS[site]:
        r = requests.get(f"{BASE_API_URL}/equipment/{site}/{serial}/data", {
            'startTime': format_datetime_url(startTime),
            'endTime': format_datetime_url(endTime),
            'api_key': SETTING_API_KEY
        },
            timeout=REQUEST_TIMEOUT)
        if r.status_code != 200:
            print_err(f"SolarEdge Cloud: Data: HTTP {r.status_code} : {r.url}")
            # Note: might cause duplicate data if an earlier serial's call succeeded (those serials will be re-fetched on the next attempt)
            return False

        # Parse request
        for value in r.json()['data']['telemetries']:
            date = value['date']
            # Note: not all data is logged; see Json/API for all available options
            conditionalData = ''
            dcVoltage = value['dcVoltage']
            if dcVoltage is not None:
                conditionalData += f",I_DC_Voltage={dcVoltage}"
            if 'L1Data' in value:
                conditionalData += format_L_data(value['L1Data'], 'L1')
            if 'L2Data' in value:
                conditionalData += format_L_data(value['L2Data'], 'L2')
            if 'L3Data' in value:
                conditionalData += format_L_data(value['L3Data'], 'L3')
            print(
                f'data,site={site},sn={serial} I_Temp={value["temperature"]},I_AC_Energy_WH={value["totalEnergy"]},I_AC_Power={value["totalActivePower"]}{conditionalData} {to_unix_timestamp(date)}',
                flush=False)
    return True


# Scrape website


# Based on: https://gist.github.com/dragoshenron/0920411a2f3e53c214be0a26f51c53e2
# Note: only available if you have optimizers
def get_playback_data_site(days, site: str):
    PANELS_DAILY_DATA = '4'
    PANELS_WEEKLY_DATA = '5'
    timeUnit = PANELS_WEEKLY_DATA if len(
        days) > 1 or days[0] != 0 else PANELS_DAILY_DATA

    session = requests.session()
    panels = ensure_logged_in(
        session, lambda: session.post(
            BASE_SITE_PANELS_URL,
            headers={
                "Content-Type": "application/x-www-form-urlencoded",
                "X-CSRF-TOKEN": session.cookies["CSRF-TOKEN"]
            },
            data={
                "fieldId": site,
                "timeUnit": timeUnit
            },
            timeout=REQUEST_TIMEOUT))
    if panels.status_code != 200:
        print_err(
            f"SolarEdge Cloud: Playback: HTTP {panels.status_code} : {panels.url}"
        )
        return

    # Correct their JSON
    response = panels.content.decode("utf-8").replace('\'', '"').replace(
        'Array', '').replace('key', '"key"').replace('value', '"value"')
    response = response.replace('timeUnit', '"timeUnit"').replace(
        'fieldData', '"fieldData"').replace('reportersData', '"reportersData"')
    response = json.loads(response)
    for date, sids in response["reportersData"].items():
        timestamp = str(int((pytz.timezone(SITE_TIMEZONES[site]).localize(
            datetime.datetime.strptime(date,
                                       '%a %b %d %H:%M:%S GMT %Y')).astimezone(
                                           pytz.utc)).timestamp())) + "000000000"
        for values in sids.values():  # SID's (key) are meaningless
            for panel in values:
                if panel['value'] != "0":  # No measurement
                    print(
                        f'panel,site={site},id={panel["key"]} w={float(safe_str_to_float(panel["value"]))} {timestamp}',
                        flush=False)
    return


# TODO
# Get the logical layout: 'https://monitoring.solaredge.com/solaredge-apigw/api/sites/{site}/layout/logical'
# Can be used to fetch more info per panel: optimizer: V, general V, Current and Power.
# However this has to be queried manually every 15 minutes (cloud update interval) and not all panels update at the same time so some bookkeeping is required.
# As the current script only updates once a day this has been omitted. Unfortunately this data is not included in the playback data.

# ------------------------- History Scraper ------------------------------


def reduce_and_check(nr_calls: int):
    nr_calls -= 1
    if nr_calls == 0:
        nr_calls = HISTORY_SCRAPER_MAX_API_CALLS
        # Wait till the next day, when the API rate limit resets
        now = datetime.datetime.now()
        midnight = (now + datetime.timedelta(days=1)).replace(hour=0,
                                                              minute=0,
                                                              second=0,
                                                              microsecond=0)
        time.sleep((midnight - now).total_seconds())
    return nr_calls


def get_production_duration():
    r = requests.get(f"{BASE_API_URL}/sites/{SITES}/dataPeriod.json",
                     {'api_key': SETTING_API_KEY},
                     timeout=REQUEST_TIMEOUT)
    if r.status_code != 200:
        print_err(
            f"SolarEdge Cloud: DataPeriod: HTTP {r.status_code} : {r.url}")
        return

    # Parse request
    json = r.json()
    ranges = {}
    for site in json['datePeriodList']['siteEnergyList']:
        startDate = site['dataPeriod']['startDate']
        endDate = site['dataPeriod']['endDate']
        if startDate is not None and endDate is not None:
            ranges[str(site['siteId'])] = (datetime.datetime.strptime(
                startDate,
                '%Y-%m-%d'), datetime.datetime.strptime(endDate, '%Y-%m-%d'))
    return ranges


def scrape_full_history():
    RETRY_SLEEP = 60.0
    INTERVAL_SLEEP = 1.0

    last_updates = LAST_UPDATES.copy()
    ranges = get_production_duration()
    remaining_API_calls = reduce_and_check(HISTORY_SCRAPER_MAX_API_CALLS)

    for site in SITE_IDS:
        site_range = ranges[site]  # Do not overwrite the dict; it is needed for the next site

        # API limited to 1 month time range (apparently 1 month == 28 days)
        # Assumption: not called between midnight and UPDATE_INTERVAL
        powerLastUpdates = last_updates['power']
        for month in get_date_intervals(
                site_range[0], min(powerLastUpdates[site], site_range[1]), 28):
            remaining_API_calls = reduce_and_check(remaining_API_calls)
            while not get_power_api(site, month[0], month[1]):
                remaining_API_calls = reduce_and_check(remaining_API_calls)
                time.sleep(RETRY_SLEEP)
            time.sleep(INTERVAL_SLEEP)
        flush()

        # API limited to 1 month time range
        energyLastUpdates = last_updates['energy']
        for month in get_date_intervals(
                site_range[0], min(energyLastUpdates[site], site_range[1]), 28):
            remaining_API_calls = reduce_and_check(remaining_API_calls)
            while not get_energy_api(site, month[0], month[1]):
                remaining_API_calls = reduce_and_check(remaining_API_calls)
                time.sleep(RETRY_SLEEP)
            time.sleep(INTERVAL_SLEEP)
        flush()

        # API limited to 1 week time range
        dataLastUpdates = last_updates['data']
        for week in get_date_intervals(
                site_range[0], min(dataLastUpdates[site], site_range[1]), 7):
            remaining_API_calls = reduce_and_check(remaining_API_calls)
            while not get_data_api(site, week[0], week[1]):
                remaining_API_calls = reduce_and_check(remaining_API_calls)
                time.sleep(RETRY_SLEEP)
            time.sleep(INTERVAL_SLEEP)
        flush()


# -----------------------------------------------------------------------
# Main()
# -----------------------------------------------------------------------

initialize_home_dir()

if not initialize_installation_info():
    print_err('Failed to initialize installation info, exiting.')
    flush_and_exit(1)

initialize_last_updated()

if len(sys.argv) > 2:
    print_err(f'Unknown CLI arguments {str(sys.argv)}, exiting.')
    flush_and_exit(1)

if len(sys.argv) == 2:
    # History scrape loop
    if sys.argv[1] == 'history':
        scrape_full_history()
        flush_and_exit(0)
    # Debug loop
    elif sys.argv[1] == 'debug':
        update_all_data(datetime.datetime.now().replace(
            hour=UPDATE_INTERVAL_HOUR, minute=UPDATE_INTERVAL_MIN))
        flush_and_exit(0)

    print_err(f'Unknown CLI argument {sys.argv[1]}, exiting.')
    flush_and_exit(1)

# Daily update loop
while True:
    # Always run at the end of the day ~midnight to get the most accurate daily data.
    # Assumption: it will be dark by midnight
    now = datetime.datetime.now()
    nextUpdate = now.replace(hour=UPDATE_INTERVAL_HOUR,
                             minute=UPDATE_INTERVAL_MIN,
                             second=0)
    if now.hour >= UPDATE_INTERVAL_HOUR and now.minute >= UPDATE_INTERVAL_MIN:
        nextUpdate += datetime.timedelta(days=1)
    time.sleep((nextUpdate - datetime.datetime.now()).total_seconds())

    update_all_data(datetime.datetime.now().replace(hour=UPDATE_INTERVAL_HOUR,
                                                    minute=UPDATE_INTERVAL_MIN,
                                                    second=0))
flush_and_exit(0)

Add the Telegraf config file in /etc/telegraf/telegraf.d/solaredge_cloud.conf and fill in the InfluxDB details.

[[outputs.influxdb_v2]]
  urls = ["http://127.0.0.1:8086"]
  token = ""
  organization = ""
  bucket = "solaredge_cloud"
  namepass = ["power","energy","data","panel"]

# ------------------------------------------------ Inputs --------------------------------------------

[[inputs.execd]]
  tagexclude = ["host"]
  command = ["/var/lib/telegraf/solarEdgeCloudScraper.py"]
  signal = "none"
  restart_delay = "10m"
  data_format = "influx"

To test if it works run: telegraf -config /etc/telegraf/telegraf.d/solaredge_cloud.conf -test. If you want to print something to debug in Python use print_err(...).

Configure Grafana

Given that the data logged from the cloud and from Modbus differs, two different dashboards are required.

If you also have a battery and meter, more stats can be displayed; for inspiration see this German forum or this Dutch forum. However, some tinkering will be required as they do not use InfluxDB 2.

This post is written for Grafana 7.5.

Make sure to fill in your site and inverter serial in the dashboards, and/or the bucket names if you have used different ones.

Because the data is spread across multiple buckets (raw and downsampled), the bucket has to be selected dynamically based on the query interval. This is not a feature available in Grafana by default, albeit being requested for quite a while now. Fortunately there is a solution; Flux is very capable, as sketched below.
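
A minimal sketch (assuming the bucket names used earlier in this post and the 15 minute downsample level) which picks the bucket based on Grafana's calculated window period:

bucket = if int(v: v.windowPeriod) >= int(v: 15m) then "solaredge_15m" else "solaredge"

from(bucket: bucket)
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r._measurement == "inverter" and r._field == "I_AC_Power")

Note that in the downsampled bucket the fields carry an aggType tag (min/max/median), so a dashboard query will typically also filter on that.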

A single panel, e.g. Graph, cannot use data from multiple sources; therefore the Power and Sun Altitude graphs cannot be overlaid in a single graph, which is unfortunate.

Graph panels cannot influence the x-axis time range ticks. For example, for the daily production graph we only care about full days, hence we do not want the hour information on the x-axis. However, this cannot be adjusted.

Graph panels do not support a value min/max area range, therefore in the Power graph we have to plot 3 lines instead of an area around the median value.

Graph series cannot change color based on the value, for example if max > 5: color = red is not possible. This would be nice for visual alerts and for quickly interpreting the data without looking at the scale (Y-axis).

All these features seem rather basic and have been requested numerous times for quite a while now, although there seems to be no progress on them whatsoever.

Modbus dashboard

Add the solaredge InfluxDb bucket as “SolarEdge Modbus” Flux (query language) data source.

To get the Sun Altitude, install the “Sun and Moon” data source (grafana-cli plugins install fetzerch-sunandmoon-datasource), restart the Grafana service and add the data source. This information, together with the power production, can be used to eyeball how good a day it was for power production (i.e. optimal vs actual produced electricity).

Import the Grafana SolarEdge modbus dashboard.

The dashboard is also available on Grafana with id: 14168.

Result

Grafana 7d dashboard overview


Overview of the full dashboard

SolarEdge Cloud dashboard

Add the solaredge_cloud InfluxDb bucket as “SolarEdge Cloud” Flux (query language) data source.

Import the Grafana SolarEdge cloud dashboard.

The dashboard is also available on Grafana with id: 14169.

Result

Grafana dashboard overview


Overview of the full dashboard

Import historical data from SolarEdge Cloud

The Python script that fetches data from SolarEdge’s cloud also supports scraping the full history. Store the following config somewhere outside the Telegraf folder (e.g. /home/alarm/solaredge_cloud_history.conf), as it only has to be run once, and fill in your outputs.influxdb_v2 details.

[agent]
debug = true
quiet = false
metric_buffer_limit = 1000000 # Enlarge as required if you have a lot of history
omit_hostname = true

[[outputs.influxdb_v2]]
  urls = ["http://127.0.0.1:8086"]
  token = ""
  organization = ""
  bucket = "solaredge_cloud"

# ------------------------------------------------ Inputs --------------------------------------------

[[inputs.exec]]
  command = "/var/lib/telegraf/solarEdgeCloudScraper.py history"
  timeout = "604800s" # 7d
  data_format = "influx"

Run the following command, once: telegraf --once --config /home/alarm/solaredge_cloud_history.conf. Afterwards remove the config and the generated files from the user’s home folder (e.g. /home/alarm/).


  1. With exploits it was possible in the past to open access to the internal web server, allowing one to log all data available in the cloud locally and at a higher update rate. However, at the time of writing all known exploits are patched. Some people opted to disable automatic firmware updates to remain on an exploitable firmware version; this is however not possible for new customers. Some man-in-the-middle (MITM) attacks are still possible and there are also hardware exploits, neither of which I recommend, so I will not discuss them further. ↩︎

  2. Technically not true, but not relevant for this use-case and not well documented. ↩︎

Noticed an error in this post? Corrections are appreciated.

© Nelis Oostens