-
Notifications
You must be signed in to change notification settings - Fork 6
/
Copy pathmqtt_lcd_display.py
560 lines (447 loc) · 17.2 KB
/
mqtt_lcd_display.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
#!/usr/bin/env python3
# Refer to README.md for pre-reqs, and customize config.yaml
# to run:
# python3 mqtt_lcd_display.py
# TODO: implement track info scrolling settings and controls
import datetime
from datetime import timedelta
import os
from pathlib import Path
import signal
import shutil
import ssl
import tempfile
import time
import board
import busio
import adafruit_character_lcd.character_lcd_rgb_i2c as character_lcd
import paho.mqtt.client as mqtt
import tomli
try:
from colorthief import ColorThief # pip install colorthief
except ImportError:
print("For backlight colors from cover art:")
print(" pip install colorthief")
print(" sudo apt install libtiff5")
print(" sudo apt install libopenjp2-7")
# determine path to this script
mypath = Path().absolute()
# App will die here if config file is missing.
# Read only on startup. If edited, app must be relaunched to see changes
config_file = mypath / "config.toml"
print("Using config file {}".format(config_file))
with config_file.open(mode='rb') as f:
config = tomli.load(f)
# subtrees of the config file
MQTT_CONF = config["mqtt"] # required section
DISPLAYUI_CONF = config.get("displayui") # required section
REMOTECONTROL_CONF = config.get("remotecontrol", {})
# "base" topic - should match shairport-sync.conf {mqtt.topic}
TOPIC_ROOT = MQTT_CONF["topic"]
print(TOPIC_ROOT)
# this variable will keep the most recent playing track info
SAVED_INFO = {}
# Global variable for text update
UPDATE_DISPLAY = False
# Global variable for backlight update
UPDATE_DISPLAY_COLOR = False
known_play_metadata_types = {
"play_end": "play_end",
"play_start": "play_start",
"play_flush": "play_flush",
"play_resume": "play_resume",
}
known_track_metadata_types = {
"artist": "showArtist",
"album": "showAlbum",
"title": "showTitle",
"genre": "showGenre",
}
def resolveConfigData(config):
"""Use values from config file-style to resolve settings.
Set default value if the key is not found in config (second arg in dict.get())"""
templateData = {}
print("backlight coloring", config.get("update_backlight_color", False))
if config.get("update_backlight_color", False):
templateData["showBacklightColor"] = True
if config.get("show_track_metadata", True):
metadata_types = config.get(
"track_metadata", ["artist", "album", "title"]
) # defaults to these three
for metadata_type in metadata_types:
if metadata_type in known_track_metadata_types:
templateData[known_track_metadata_types[metadata_type]] = True
return templateData
def _form_subtopic_topic(subtopic):
"""Return full topic path given subtopic."""
topic = TOPIC_ROOT + "/" + subtopic
return topic
# Available commands listed in shairport-sync.conf
known_remote_commands = [
"command",
"beginff",
"beginrew",
"mutetoggle",
"nextitem",
"previtem",
"pause",
"playpause",
"play",
"stop",
"playresume",
"shuffle_songs",
"volumedown",
"volumeup",
]
def _generate_remote_command(command):
"""Return MQTT topic and message for a given remote command."""
if command in known_remote_commands:
print(command)
topic = TOPIC_ROOT + "/remote"
msg = command
return topic, msg
else:
raise ValueError("Unknown remote command: {}".format(command))
def on_connect(client, userdata, flags, rc):
"""For when MQTT client receives a CONNACK response from the server.
Adding subscriptions in on_connect() means that they'll be re-subscribed
for lost/re-connections to MQTT server.
"""
print("Connected with result code {}".format(rc))
subtopic_list = list(known_track_metadata_types.keys())
subtopic_list.extend(list(known_play_metadata_types.keys()))
# only subscribe to cover art if we are going to use it
if (resolveConfigData(DISPLAYUI_CONF)).get("showBacklightColor"):
subtopic_list.append("cover")
for subtopic in subtopic_list:
topic = _form_subtopic_topic(subtopic)
print("topic", topic, end=" ")
(result, msg_id) = client.subscribe(topic, 0) # QoS==0 should be fine
print(msg_id)
def _guessImageMime(magic):
"""Peeks at leading bytes in binary object to identify image format."""
if magic.startswith(b"\xff\xd8"):
return "image/jpeg"
elif magic.startswith(b"\x89PNG\r\n\x1a\r"):
return "image/png"
else:
return "image/jpg"
def _normalizeRGB8bToBacklightRGB(rgb):
"""Takes an (R,G,B) 8-bit (0-255) ordered tuple and converts it to backlight-compatible tuple.
CircuitPython adafruit_character_lcd library expects (R,G,B) fields in range of 0-100.
"""
rgb_sum = rgb[0] + rgb[1] + rgb[2]
scale_factor = 100.0 * 3
# scaled (100, 100, 100) if they're all equal (sum should add up to 300)
r_scaled = int((rgb[0] / rgb_sum) * scale_factor)
g_scaled = int((rgb[1] / rgb_sum) * scale_factor)
b_scaled = int((rgb[2] / rgb_sum) * scale_factor)
#
r_norm = 100 if r_scaled >= 99 else (0 if r_scaled > 95 else 0)
g_norm = 100 if g_scaled >= 99 else (0 if g_scaled > 95 else 0)
b_norm = 100 if b_scaled >= 99 else (0 if b_scaled > 95 else 0)
if (r_norm + g_norm + b_norm) != 0:
backlight_rgb = (r_norm, g_norm, b_norm)
else:
backlight_rgb = [0, 0, 0]
# set value for color that has highest value
print(max(rgb))
print(rgb.index(max(rgb)))
backlight_rgb[rgb.index(max(rgb))] = 100
if False:
print("rgb", rgb)
print("rgb scaled", (r_scaled, g_scaled, b_scaled))
print("rgb norm", (r_norm, g_norm, b_norm))
if False:
print("backlight_rgb = {}".format(backlight_rgb))
return backlight_rgb
def _send_and_store_playing_metadata(metadata_name, message):
"""Saves currently playing metadata info.
Applies a naming convention of prepending string 'playing_' to metadata
name in saving data structure.
"""
global UPDATE_DISPLAY
# print("{} update".format(metadata_name))
saved_metadata_name = "playing_{}".format(metadata_name)
if metadata_name == "dominant_color":
SAVED_INFO[saved_metadata_name] = message
else:
SAVED_INFO[saved_metadata_name] = message.payload.decode("utf8")
UPDATE_DISPLAY = True
def _send_play_event(metadata_name):
"""Forms play event message and sends to browser client using socket.io."""
print("{}".format(metadata_name))
# socketio.emit(metadata_name, metadata_name)
def on_message(client, userdata, message):
"""Callback for when a subscribed-to MQTT message is received."""
if message.topic != _form_subtopic_topic("cover"):
print(message.topic, message.payload)
# Playing track info fields
if message.topic == _form_subtopic_topic("artist"):
_send_and_store_playing_metadata("artist", message)
if message.topic == _form_subtopic_topic("album"):
_send_and_store_playing_metadata("album", message)
if message.topic == _form_subtopic_topic("genre"):
_send_and_store_playing_metadata("genre", message)
if message.topic == _form_subtopic_topic("title"):
_send_and_store_playing_metadata("title", message)
# Player state
if message.topic == _form_subtopic_topic("play_start"):
_send_play_event("play_start")
if message.topic == _form_subtopic_topic("play_end"):
_send_play_event("play_end")
if message.topic == _form_subtopic_topic("play_flush"):
_send_play_event("play_flush")
if message.topic == _form_subtopic_topic("play_resume"):
_send_play_event("play_resume")
# cover art
if message.topic == _form_subtopic_topic("cover"):
print("cover update")
if message.payload:
mime_type = _guessImageMime(message.payload)
print(len(message.payload), mime_type)
with tempfile.NamedTemporaryFile() as fp:
fp.write(message.payload)
if False: # for debugging
fname = fp.name
fname_copy = fname + ".bin"
print(fname, fname_copy)
shutil.copy(fname, fname_copy)
# get dominant color of cover art image
image_to_analyze = ColorThief(fp)
dominant_color = image_to_analyze.get_color(quality=20)
print(dominant_color)
_send_and_store_playing_metadata("dominant_color", dominant_color)
else:
pass
# Configure MQTT broker connection
mqttc = mqtt.Client()
# register callbacks
mqttc.on_connect = on_connect
mqttc.on_message = on_message
if MQTT_CONF.get("use_tls"):
tls_conf = MQTT_CONF.get("tls")
print("Using TLS config", tls_conf)
# assumes full valid TLS configuration for paho lib
if tls_conf:
mqttc.tls_set(
ca_certs=tls_conf["ca_certs_path"],
certfile=tls_conf["certfile_path"],
keyfile=tls_conf["keyfile_path"],
cert_reqs=ssl.CERT_REQUIRED,
tls_version=ssl.PROTOCOL_TLSv1_2,
ciphers=None,
)
if tls_conf.get("allow_insecure_server_certificate", False):
# from docs: Do not use this function in a real system. Setting value
# to True means there is no point using encryption.
mqttc.tls_insecure_set(True)
if MQTT_CONF.get("username"):
username = MQTT_CONF.get("username")
print("MQTT username:", username)
pw = MQTT_CONF.get("password")
if pw:
mqttc.username_pw_set(username, password=pw)
else:
mqttc.username_pw_set(username)
if MQTT_CONF.get("logger"):
print("Enabling MQTT logging")
mqttc.enable_logger()
# Launch MQTT broker connection
mqtt_host = MQTT_CONF["host"]
mqtt_port = MQTT_CONF["port"]
print("Connecting to broker", mqtt_host, "port", mqtt_port)
mqttc.connect(mqtt_host, port=mqtt_port)
# loop_start run a thread in the background
mqttc.loop_start()
def lcd_startup_splash(lcd):
print(lcd, "Startup splash screen")
# Set LCD color to red
lcd.color = [100, 0, 0]
lcd.message = "shairport-sync\nmqtt_lcd_display"
time.sleep(0.5)
# Set LCD color to green
lcd.color = [0, 100, 0]
time.sleep(0.5)
# Set LCD color to blue
lcd.color = [0, 0, 100]
time.sleep(0.5)
# Set LCD color to purple
lcd.color = [50, 0, 50]
time.sleep(0.5)
# Set LCD color to yellow
lcd.color = [50, 50, 0]
# print a scrolling message
start_msg = "Starting up...\n"
lcd.message = start_msg
for i in range(len(start_msg)):
time.sleep(0.1)
lcd.move_left()
lcd.clear()
# via https://stackoverflow.com/questions/18499497/how-to-process-sigterm-signal-gracefully
class GracefulKiller:
kill_now = False
def __init__(self):
signal.signal(signal.SIGINT, self.exit_gracefully)
signal.signal(signal.SIGTERM, self.exit_gracefully)
def exit_gracefully(self, signum, frame):
print(
"Received signal "
+ str(signum)
+ " on line "
+ str(frame.f_lineno)
+ " in "
+ frame.f_code.co_filename
)
self.kill_now = True
# Initialize display and launch the main loop
if __name__ == "__main__":
lcd_columns = 16
lcd_row_max_columns = 31
lcd_rows = 2
i2c = busio.I2C(board.SCL, board.SDA)
lcd = character_lcd.Character_LCD_RGB_I2C(i2c, lcd_columns, lcd_rows)
if DISPLAYUI_CONF.get("show_lcd_splash", False):
lcd_startup_splash(lcd)
else:
# Set LCD color to "yellow"
lcd.color = [100, 100, 0]
lcd.clear()
def graceful_exit():
# f-strings require python3.6
msg = "Exiting..."
fmt_msg = f"{msg:{lcd_columns}s}\n{' ':{lcd_columns}s}"
lcd.message = fmt_msg
# time.sleep(3)
# Turn off LCD backlights
lcd.color = [0, 0, 0]
lcd.clear()
def _get_formatted_msg_and_props():
artist = SAVED_INFO.get("playing_artist", "Artist")
title = SAVED_INFO.get("playing_title", "Title")
formatted_msg = f"{artist:{lcd_row_max_columns}.{lcd_row_max_columns}s}\n{title:{lcd_row_max_columns}.{lcd_row_max_columns}s}"
# for longer strings
artist_len = len(artist)
title_len = len(title)
max_len = max(list([artist_len, title_len]))
return (formatted_msg, max_len)
# read default backlight color
default_rgb_backlight_color = DISPLAYUI_CONF.get(
"default_rgb_backlight_color", (0, 255, 0)
)
def _get_backlight_color():
dominant_color = SAVED_INFO.get(
"playing_dominant_color", default_rgb_backlight_color
)
backlight_color = _normalizeRGB8bToBacklightRGB(dominant_color)
return backlight_color
def _handle_button_pressed(lcd, button_pressed=None):
print(button_pressed)
command = REMOTECONTROL_CONF["buttons"].get(button_pressed, None)
if command:
# print command onto LCD
lcd.message = "<<" + command + ">>"
(topic, msg) = _generate_remote_command(command)
mqttc.publish(topic, msg)
else:
print(f"-E- Could not find command for button = {button_pressed}")
def _scan_for_button_press():
button_press = 0
button_pressed = None
# scan for button presses
if lcd.down_button:
# lcd.message = "Down! "
button_press = 1
button_pressed = "button_down"
elif lcd.left_button:
# lcd.message = "Left! "
button_press = 1
button_pressed = "button_left"
elif lcd.right_button:
# lcd.message = "Right! "
button_press = 1
button_pressed = "button_right"
elif lcd.select_button:
# lcd.message = "Select! "
button_press = 1
button_pressed = "button_select"
elif lcd.up_button:
# lcd.message = "Up! "
button_press = 1
button_pressed = "button_up"
if button_press:
UPDATE_DISPLAY = True
_handle_button_pressed(lcd, button_pressed=button_pressed)
return button_press
# initialize SAVED_INFO
SAVED_INFO["playing_artist"] = "Unknown Artist"
SAVED_INFO["playing_album"] = "Unknown Album"
SAVED_INFO["playing_genre"] = "Unknown Genre"
SAVED_INFO["playing_title"] = "Unknown Title"
SAVED_INFO["playing_dominant_color"] = default_rgb_backlight_color
UPDATE_DISPLAY = True
# output current process id
print("My PID is:", os.getpid())
killer = GracefulKiller()
print("Starting main loop")
scroll_sleep_length = 0.45
refresh_interval = 25 # in seconds
time_to_refresh = datetime.datetime.now()
button_press_delay = 0.7
while not killer.kill_now:
button_press = _scan_for_button_press()
if UPDATE_DISPLAY and button_press == 0:
# reset global variable
UPDATE_DISPLAY = False
if False:
print(SAVED_INFO)
fmt_msg1, max_len = _get_formatted_msg_and_props()
backlight_color = _get_backlight_color()
lcd.color = backlight_color
if button_press:
time.sleep(button_press_delay)
lcd.message = fmt_msg1
button_press = _scan_for_button_press()
if max_len > lcd_columns:
extra_chars = min(max_len, (2 * lcd_columns) - 1)
fmt_msg, junk = _get_formatted_msg_and_props()
lcd.color = _get_backlight_color()
if button_press:
time.sleep(button_press_delay)
lcd.message = fmt_msg
if killer.kill_now:
break
for i in range(extra_chars - lcd_columns):
if killer.kill_now:
break
# handle any button presses while scrolling
button_press = _scan_for_button_press()
if button_press:
time.sleep(button_press_delay)
break
# if MQTT message comes in, skip scrolling
if UPDATE_DISPLAY:
break
time.sleep(scroll_sleep_length)
button_press = _scan_for_button_press()
if button_press:
time.sleep(button_press_delay)
lcd.move_left()
lcd.home()
fmt_msg, junk = _get_formatted_msg_and_props()
lcd.color = _get_backlight_color()
button_press = _scan_for_button_press()
if button_press:
time.sleep(button_press_delay)
lcd.message = fmt_msg
current_time = datetime.datetime.now()
if False:
print(current_time, time_to_refresh) # duration of event loop
if current_time > time_to_refresh:
# schedule a display refresh
time_to_refresh = current_time + timedelta(seconds=refresh_interval)
if False:
print("scheduled refresh")
print(current_time, time_to_refresh)
UPDATE_DISPLAY = True
graceful_exit()