Formatting
This commit is contained in:
parent
adf16c9059
commit
5d17b37bea
|
@ -1,9 +1,8 @@
|
||||||
from multiprocessing.shared_memory import SharedMemory
|
from multiprocessing.shared_memory import SharedMemory
|
||||||
from typing import Callable, NamedTuple, NewType, Optional
|
from typing import Callable, NamedTuple, Optional
|
||||||
import asyncio
|
import asyncio
|
||||||
import socketio
|
import socketio
|
||||||
import aiohttp
|
import aiohttp
|
||||||
from aiohttp_socks import ProxyConnector
|
|
||||||
from PIL import (
|
from PIL import (
|
||||||
Image,
|
Image,
|
||||||
ImageFont,
|
ImageFont,
|
||||||
|
@ -32,6 +31,7 @@ OFFSET_AVOID = 125000
|
||||||
OFFSET_CANVAS = 250000
|
OFFSET_CANVAS = 250000
|
||||||
OFFSET_MASK = 375000
|
OFFSET_MASK = 375000
|
||||||
|
|
||||||
|
|
||||||
class Manager:
|
class Manager:
|
||||||
def __init__(self, settings_path: str):
|
def __init__(self, settings_path: str):
|
||||||
self.shmem: Optional[SharedMemory] = None
|
self.shmem: Optional[SharedMemory] = None
|
||||||
|
@ -62,8 +62,7 @@ class Manager:
|
||||||
if fontconfig := settings.get("default_font"):
|
if fontconfig := settings.get("default_font"):
|
||||||
self.default_font_size = int(fontconfig.get("size", 8))
|
self.default_font_size = int(fontconfig.get("size", 8))
|
||||||
self.fonts["default", self.default_font_size] = self.get_font(
|
self.fonts["default", self.default_font_size] = self.get_font(
|
||||||
fontconfig["path"],
|
fontconfig["path"], self.default_font_size
|
||||||
self.default_font_size
|
|
||||||
)
|
)
|
||||||
|
|
||||||
for avoid in settings.get("avoid", []):
|
for avoid in settings.get("avoid", []):
|
||||||
|
@ -118,7 +117,10 @@ class Manager:
|
||||||
self.make_text_image(
|
self.make_text_image(
|
||||||
line["text"],
|
line["text"],
|
||||||
line.get("font", elem.get("font", "default")),
|
line.get("font", elem.get("font", "default")),
|
||||||
line.get("size", elem.get("size", self.default_font_size)),
|
line.get(
|
||||||
|
"size",
|
||||||
|
elem.get("size", self.default_font_size),
|
||||||
|
),
|
||||||
line.get(
|
line.get(
|
||||||
"negative", elem.get("negative", False)
|
"negative", elem.get("negative", False)
|
||||||
),
|
),
|
||||||
|
@ -132,10 +134,15 @@ class Manager:
|
||||||
)
|
)
|
||||||
elif elem["type"] in ("anim", "animation"):
|
elif elem["type"] in ("anim", "animation"):
|
||||||
with Image.open(elem["path"]) as anim:
|
with Image.open(elem["path"]) as anim:
|
||||||
self.put_animation(elem["x"], elem["y"], [
|
self.put_animation(
|
||||||
frame.convert("LA")
|
elem["x"],
|
||||||
for frame in ImageSequence.Iterator(anim)
|
elem["y"],
|
||||||
], elem.get("spf", 10))
|
[
|
||||||
|
frame.convert("LA")
|
||||||
|
for frame in ImageSequence.Iterator(anim)
|
||||||
|
],
|
||||||
|
elem.get("spf", 10),
|
||||||
|
)
|
||||||
elif elem["type"] == "image":
|
elif elem["type"] == "image":
|
||||||
with Image.open(elem["path"]).convert("LA") as im:
|
with Image.open(elem["path"]).convert("LA") as im:
|
||||||
self.put_image(elem["x"], elem["y"], im)
|
self.put_image(elem["x"], elem["y"], im)
|
||||||
|
@ -150,19 +157,26 @@ class Manager:
|
||||||
|
|
||||||
def update_time():
|
def update_time():
|
||||||
now = datetime.datetime.now(datetime.timezone.utc)
|
now = datetime.datetime.now(datetime.timezone.utc)
|
||||||
self.put_image(pos_x, pos_y, self.make_text_image(
|
self.put_image(
|
||||||
now.strftime(time_format),
|
pos_x,
|
||||||
time_font,
|
pos_y,
|
||||||
time_font_size,
|
self.make_text_image(
|
||||||
time_negative,
|
now.strftime(time_format),
|
||||||
time_outline
|
time_font,
|
||||||
))
|
time_font_size,
|
||||||
|
time_negative,
|
||||||
|
time_outline,
|
||||||
|
),
|
||||||
|
)
|
||||||
|
|
||||||
self.animation_functions.append(update_time)
|
self.animation_functions.append(update_time)
|
||||||
elif elem["type"] == "shrek":
|
elif elem["type"] == "shrek":
|
||||||
shrek_x, shrek_y = elem["x"], elem["y"]
|
shrek_x, shrek_y = elem["x"], elem["y"]
|
||||||
|
|
||||||
shrek_font = self.get_font(elem.get("font", "default"), elem.get("size", self.default_font_size))
|
shrek_font = self.get_font(
|
||||||
|
elem.get("font", "default"),
|
||||||
|
elem.get("size", self.default_font_size),
|
||||||
|
)
|
||||||
|
|
||||||
with open(elem["path"], "r") as fp:
|
with open(elem["path"], "r") as fp:
|
||||||
lyrics = list(map(str.strip, fp))
|
lyrics = list(map(str.strip, fp))
|
||||||
|
@ -175,7 +189,9 @@ class Manager:
|
||||||
line = lyrics[
|
line = lyrics[
|
||||||
int(now.timestamp() / elem["spf"]) % len(lyrics)
|
int(now.timestamp() / elem["spf"]) % len(lyrics)
|
||||||
]
|
]
|
||||||
draw.text((2, -1), line, font=shrek_font, fill=(255, 255))
|
draw.text(
|
||||||
|
(2, -1), line, font=shrek_font, fill=(255, 255)
|
||||||
|
)
|
||||||
self.put_image(shrek_x, shrek_y, im)
|
self.put_image(shrek_x, shrek_y, im)
|
||||||
|
|
||||||
self.animation_functions.append(update_shrek)
|
self.animation_functions.append(update_shrek)
|
||||||
|
@ -194,7 +210,7 @@ class Manager:
|
||||||
|
|
||||||
def update_shmem(self, state: bytes):
|
def update_shmem(self, state: bytes):
|
||||||
assert self.shmem is not None
|
assert self.shmem is not None
|
||||||
self.shmem.buf[OFFSET_STATE:OFFSET_STATE + 125000] = state
|
self.shmem.buf[OFFSET_STATE : OFFSET_STATE + 125000] = state
|
||||||
|
|
||||||
async def on_connect(self):
|
async def on_connect(self):
|
||||||
async with aiohttp.ClientSession() as http:
|
async with aiohttp.ClientSession() as http:
|
||||||
|
@ -223,7 +239,7 @@ class Manager:
|
||||||
|
|
||||||
for ndx in bits_on:
|
for ndx in bits_on:
|
||||||
byte, bit = divmod(ndx, 8)
|
byte, bit = divmod(ndx, 8)
|
||||||
self.shmem.buf[OFFSET_STATE + byte] |= (0x80 >> bit)
|
self.shmem.buf[OFFSET_STATE + byte] |= 0x80 >> bit
|
||||||
for ndx in bits_off:
|
for ndx in bits_off:
|
||||||
byte, bit = divmod(ndx, 8)
|
byte, bit = divmod(ndx, 8)
|
||||||
self.shmem.buf[OFFSET_STATE + byte] &= 0xFF ^ (0x80 >> bit)
|
self.shmem.buf[OFFSET_STATE + byte] &= 0xFF ^ (0x80 >> bit)
|
||||||
|
@ -252,7 +268,11 @@ class Manager:
|
||||||
(x, y),
|
(x, y),
|
||||||
(
|
(
|
||||||
255 if (buf[OFFSET_MASK + byte] << bit) & 0x80 else 0,
|
255 if (buf[OFFSET_MASK + byte] << bit) & 0x80 else 0,
|
||||||
255 if (buf[OFFSET_CANVAS + byte] << bit) & 0x80 else 0,
|
(
|
||||||
|
255
|
||||||
|
if (buf[OFFSET_CANVAS + byte] << bit) & 0x80
|
||||||
|
else 0
|
||||||
|
),
|
||||||
255 if (buf[OFFSET_STATE + byte] << bit) & 0x80 else 0,
|
255 if (buf[OFFSET_STATE + byte] << bit) & 0x80 else 0,
|
||||||
),
|
),
|
||||||
)
|
)
|
||||||
|
@ -268,7 +288,6 @@ class Manager:
|
||||||
im.save("avoid.png")
|
im.save("avoid.png")
|
||||||
print("Dump done")
|
print("Dump done")
|
||||||
|
|
||||||
|
|
||||||
def on_sigusr2(self, signum, frame):
|
def on_sigusr2(self, signum, frame):
|
||||||
print("Reloading config")
|
print("Reloading config")
|
||||||
self.reload_config()
|
self.reload_config()
|
||||||
|
@ -290,7 +309,7 @@ class Manager:
|
||||||
assert self.shmem is not None
|
assert self.shmem is not None
|
||||||
for ndx in rng:
|
for ndx in rng:
|
||||||
byte, bit = divmod(ndx, 8)
|
byte, bit = divmod(ndx, 8)
|
||||||
self.shmem.buf[OFFSET_AVOID + byte] |= (0x80 >> bit)
|
self.shmem.buf[OFFSET_AVOID + byte] |= 0x80 >> bit
|
||||||
|
|
||||||
def add_avoid_rect(self, sx: int, sy: int, w: int, h: int):
|
def add_avoid_rect(self, sx: int, sy: int, w: int, h: int):
|
||||||
for y in range(sy, sy + h):
|
for y in range(sy, sy + h):
|
||||||
|
@ -301,7 +320,7 @@ class Manager:
|
||||||
assert self.shmem is not None
|
assert self.shmem is not None
|
||||||
assert 0 <= index < 1000000
|
assert 0 <= index < 1000000
|
||||||
byte, bit = divmod(index, 8)
|
byte, bit = divmod(index, 8)
|
||||||
self.shmem.buf[OFFSET_AVOID + byte] |= (0x80 >> bit)
|
self.shmem.buf[OFFSET_AVOID + byte] |= 0x80 >> bit
|
||||||
|
|
||||||
def add_avoid_image(self, im: Image.Image):
|
def add_avoid_image(self, im: Image.Image):
|
||||||
assert im.width == 1000
|
assert im.width == 1000
|
||||||
|
@ -317,9 +336,9 @@ class Manager:
|
||||||
assert 0 <= index <= 1000000
|
assert 0 <= index <= 1000000
|
||||||
assert self.shmem is not None
|
assert self.shmem is not None
|
||||||
byte, bit = divmod(index, 8)
|
byte, bit = divmod(index, 8)
|
||||||
self.shmem.buf[OFFSET_MASK + byte] |= (0x80 >> bit)
|
self.shmem.buf[OFFSET_MASK + byte] |= 0x80 >> bit
|
||||||
if value:
|
if value:
|
||||||
self.shmem.buf[OFFSET_CANVAS + byte] |= (0x80 >> bit)
|
self.shmem.buf[OFFSET_CANVAS + byte] |= 0x80 >> bit
|
||||||
else:
|
else:
|
||||||
self.shmem.buf[OFFSET_CANVAS + byte] &= 0xFF ^ (0x80 >> bit)
|
self.shmem.buf[OFFSET_CANVAS + byte] &= 0xFF ^ (0x80 >> bit)
|
||||||
|
|
||||||
|
@ -430,12 +449,10 @@ async def main(cfg_path: str = "./settings.json", *_):
|
||||||
signal.signal(signal.SIGUSR2, mgr.on_sigusr2)
|
signal.signal(signal.SIGUSR2, mgr.on_sigusr2)
|
||||||
|
|
||||||
print("Listening...")
|
print("Listening...")
|
||||||
await asyncio.gather(
|
await asyncio.gather(mgr.listener(), mgr.animator())
|
||||||
mgr.listener(),
|
|
||||||
mgr.animator()
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
from sys import argv
|
from sys import argv
|
||||||
|
|
||||||
asyncio.run(main(*argv[1:]))
|
asyncio.run(main(*argv[1:]))
|
||||||
|
|
Loading…
Reference in New Issue