Added a bunch (almost all) types into manager
This commit is contained in:
parent
6cce15b2ca
commit
40c50328d0
20
async-bot.py
20
async-bot.py
|
@ -353,7 +353,7 @@ async def amain(conf_path: str = "settings.json", *_) -> None:
|
||||||
print("AVOID image", avoid["path"])
|
print("AVOID image", avoid["path"])
|
||||||
|
|
||||||
for elem in settings["elements"]:
|
for elem in settings["elements"]:
|
||||||
if elem["type"] == "text":
|
if elem["type"] == "text": # XXX migrated
|
||||||
mgr.put_text(
|
mgr.put_text(
|
||||||
elem["x"],
|
elem["x"],
|
||||||
elem["y"],
|
elem["y"],
|
||||||
|
@ -363,7 +363,7 @@ async def amain(conf_path: str = "settings.json", *_) -> None:
|
||||||
elem.get("negative", False),
|
elem.get("negative", False),
|
||||||
)
|
)
|
||||||
print("ADD text", elem)
|
print("ADD text", elem)
|
||||||
elif elem["type"] == "text_anim":
|
elif elem["type"] == "text_anim": # XXX migrated
|
||||||
frames: list[Image.Image] = []
|
frames: list[Image.Image] = []
|
||||||
for text in elem["lines"]:
|
for text in elem["lines"]:
|
||||||
frames.append(
|
frames.append(
|
||||||
|
@ -379,12 +379,12 @@ async def amain(conf_path: str = "settings.json", *_) -> None:
|
||||||
for frame in frames:
|
for frame in frames:
|
||||||
frame.close()
|
frame.close()
|
||||||
print("ADD text animation", elem)
|
print("ADD text animation", elem)
|
||||||
elif elem["type"] == "image":
|
elif elem["type"] == "image": # XXX migrated
|
||||||
with Image.open(elem["path"]).convert("LA") as im:
|
with Image.open(elem["path"]).convert("LA") as im:
|
||||||
mgr.put_image(elem["x"], elem["y"], im)
|
mgr.put_image(elem["x"], elem["y"], im)
|
||||||
print("ADD image", elem)
|
print("ADD image", elem)
|
||||||
|
|
||||||
elif elem["type"] == "time":
|
elif elem["type"] == "time": # XXX migrated
|
||||||
time_format = elem["format"]
|
time_format = elem["format"]
|
||||||
pos_x, pos_y = elem["x"], elem["y"]
|
pos_x, pos_y = elem["x"], elem["y"]
|
||||||
font = mgr.get_font(
|
font = mgr.get_font(
|
||||||
|
@ -400,7 +400,7 @@ async def amain(conf_path: str = "settings.json", *_) -> None:
|
||||||
return pixmap
|
return pixmap
|
||||||
|
|
||||||
mgr.animation_functions.append(update_time)
|
mgr.animation_functions.append(update_time)
|
||||||
elif elem["type"] == "tile":
|
elif elem["type"] == "tile": # XXX unused
|
||||||
with Image.open(elem["path"]).convert("LA") as im:
|
with Image.open(elem["path"]).convert("LA") as im:
|
||||||
for oy in range(elem.get("h", im.height)):
|
for oy in range(elem.get("h", im.height)):
|
||||||
for ox in range(elem.get("w", im.width)):
|
for ox in range(elem.get("w", im.width)):
|
||||||
|
@ -410,7 +410,7 @@ async def amain(conf_path: str = "settings.json", *_) -> None:
|
||||||
index = x + y * 1000
|
index = x + y * 1000
|
||||||
mgr.put_bit(index, l > 0)
|
mgr.put_bit(index, l > 0)
|
||||||
print("ADD tile", elem)
|
print("ADD tile", elem)
|
||||||
elif elem["type"] == "animation":
|
elif elem["type"] == "animation": # XXX migrated
|
||||||
with Image.open(elem["path"]) as anim:
|
with Image.open(elem["path"]) as anim:
|
||||||
frames: list[Image.Image] = []
|
frames: list[Image.Image] = []
|
||||||
for frame in ImageSequence.Iterator(anim):
|
for frame in ImageSequence.Iterator(anim):
|
||||||
|
@ -421,7 +421,7 @@ async def amain(conf_path: str = "settings.json", *_) -> None:
|
||||||
for frame in frames:
|
for frame in frames:
|
||||||
frame.close()
|
frame.close()
|
||||||
print("ADD animation", elem)
|
print("ADD animation", elem)
|
||||||
elif elem["type"] == "rgb111":
|
elif elem["type"] == "rgb111": # XXX unused
|
||||||
ox, oy = elem["x"], elem["y"]
|
ox, oy = elem["x"], elem["y"]
|
||||||
with Image.open(elem["path"]).convert("RGBA") as im:
|
with Image.open(elem["path"]).convert("RGBA") as im:
|
||||||
for y in range(im.height):
|
for y in range(im.height):
|
||||||
|
@ -434,7 +434,7 @@ async def amain(conf_path: str = "settings.json", *_) -> None:
|
||||||
mgr.put_bit(start_ndx, r > 128)
|
mgr.put_bit(start_ndx, r > 128)
|
||||||
mgr.put_bit(start_ndx + 1, g > 128)
|
mgr.put_bit(start_ndx + 1, g > 128)
|
||||||
mgr.put_bit(start_ndx + 2, b > 128)
|
mgr.put_bit(start_ndx + 2, b > 128)
|
||||||
elif elem["type"] == "rgb565":
|
elif elem["type"] == "rgb565": # XXX unused
|
||||||
ox, oy = elem["x"], elem["y"]
|
ox, oy = elem["x"], elem["y"]
|
||||||
with Image.open(elem["path"]).convert("RGBA") as im:
|
with Image.open(elem["path"]).convert("RGBA") as im:
|
||||||
for y in range(im.height):
|
for y in range(im.height):
|
||||||
|
@ -459,7 +459,7 @@ async def amain(conf_path: str = "settings.json", *_) -> None:
|
||||||
for y in range(elem["y"], elem["y"] + elem["h"]):
|
for y in range(elem["y"], elem["y"] + elem["h"]):
|
||||||
for x in range(elem["x"], elem["x"] + elem["w"]):
|
for x in range(elem["x"], elem["x"] + elem["w"]):
|
||||||
mgr.put_pixel(x, y, elem["fill"])
|
mgr.put_pixel(x, y, elem["fill"])
|
||||||
elif elem["type"] == "blob":
|
elif elem["type"] == "blob": # XXX unused
|
||||||
with open(elem["path"], "rb") as fp:
|
with open(elem["path"], "rb") as fp:
|
||||||
offset = elem["offset"]
|
offset = elem["offset"]
|
||||||
length = elem.get("length", 1000000)
|
length = elem.get("length", 1000000)
|
||||||
|
@ -474,7 +474,7 @@ async def amain(conf_path: str = "settings.json", *_) -> None:
|
||||||
mgr.put_bit(offset, bool((byte >> (7 - i)) & 1))
|
mgr.put_bit(offset, bool((byte >> (7 - i)) & 1))
|
||||||
written += 1
|
written += 1
|
||||||
offset += 1
|
offset += 1
|
||||||
elif elem["type"] == "shrek":
|
elif elem["type"] == "shrek": # XXX: migrated
|
||||||
with open(elem["path"], "r") as fp:
|
with open(elem["path"], "r") as fp:
|
||||||
lyrics = list(map(str.strip, fp))
|
lyrics = list(map(str.strip, fp))
|
||||||
|
|
||||||
|
|
365
swarm/manager.py
365
swarm/manager.py
|
@ -1,5 +1,5 @@
|
||||||
from multiprocessing.shared_memory import SharedMemory
|
from multiprocessing.shared_memory import SharedMemory
|
||||||
from typing import Optional
|
from typing import Callable, NamedTuple, NewType, Optional
|
||||||
import asyncio
|
import asyncio
|
||||||
import socketio
|
import socketio
|
||||||
import aiohttp
|
import aiohttp
|
||||||
|
@ -12,24 +12,33 @@ from PIL import (
|
||||||
ImageSequence,
|
ImageSequence,
|
||||||
ImageChops,
|
ImageChops,
|
||||||
)
|
)
|
||||||
from enum import IntFlag
|
|
||||||
from base64 import b64decode
|
from base64 import b64decode
|
||||||
import signal
|
import signal
|
||||||
import os
|
import os
|
||||||
import time
|
import time
|
||||||
|
import json
|
||||||
|
import datetime
|
||||||
|
|
||||||
|
|
||||||
class PixelMask(IntFlag):
|
class Animation(NamedTuple):
|
||||||
AVOID = 16
|
x: int
|
||||||
MASK = 32
|
y: int
|
||||||
FILL = 64
|
frames: list[Image.Image]
|
||||||
CHECKED = 128
|
spf: float
|
||||||
|
|
||||||
|
|
||||||
|
OFFSET_STATE = 0
|
||||||
|
OFFSET_AVOID = 125000
|
||||||
|
OFFSET_CANVAS = 250000
|
||||||
|
OFFSET_MASK = 375000
|
||||||
|
|
||||||
class Manager:
|
class Manager:
|
||||||
def __init__(self):
|
def __init__(self, settings_path: str):
|
||||||
self.shmem: Optional[SharedMemory] = None
|
self.shmem: Optional[SharedMemory] = None
|
||||||
self.shmem_name = "omcb-bot"
|
self.shmem_name = "omcb-bot"
|
||||||
|
|
||||||
|
self.settings_path = settings_path
|
||||||
|
|
||||||
self.base = "https://onemillioncheckboxes.com"
|
self.base = "https://onemillioncheckboxes.com"
|
||||||
self.last_update = 0
|
self.last_update = 0
|
||||||
|
|
||||||
|
@ -37,6 +46,143 @@ class Manager:
|
||||||
self.bits_toggled_off = 0
|
self.bits_toggled_off = 0
|
||||||
self.last_printout = 0
|
self.last_printout = 0
|
||||||
|
|
||||||
|
self.fonts: dict[tuple[str, int], ImageFont.FreeTypeFont] = {}
|
||||||
|
self.default_font_size = 8
|
||||||
|
self.animations: list[Animation] = []
|
||||||
|
self.animation_functions: list[Callable] = []
|
||||||
|
|
||||||
|
def reload_config(self):
|
||||||
|
with open(self.settings_path, "r") as fp:
|
||||||
|
settings = json.load(fp)
|
||||||
|
|
||||||
|
assert self.shmem is not None
|
||||||
|
|
||||||
|
print("Resetting shmem...")
|
||||||
|
|
||||||
|
if fontconfig := settings.get("default_font"):
|
||||||
|
self.default_font_size = int(fontconfig.get("size", 8))
|
||||||
|
self.fonts["default", self.default_font_size] = self.get_font(
|
||||||
|
fontconfig["path"],
|
||||||
|
self.default_font_size
|
||||||
|
)
|
||||||
|
|
||||||
|
for avoid in settings.get("avoid", []):
|
||||||
|
if avoid["type"] == "rect":
|
||||||
|
self.add_avoid_rect(
|
||||||
|
avoid["x"], avoid["y"], avoid["w"], avoid["h"]
|
||||||
|
)
|
||||||
|
elif avoid["type"] == "range":
|
||||||
|
self.add_avoid_range(
|
||||||
|
range(avoid["start"], avoid["stop"], avoid.get("step", 1))
|
||||||
|
)
|
||||||
|
elif avoid["type"] == "image":
|
||||||
|
with Image.open(avoid["path"]).convert("LA") as im:
|
||||||
|
assert im.width == 1000 and im.height == 1000
|
||||||
|
for y in range(im.height):
|
||||||
|
for x in range(im.width):
|
||||||
|
_, a = im.getpixel((x, y)) # type: ignore
|
||||||
|
if a > 128:
|
||||||
|
self.add_avoid_index(x + y * 1000)
|
||||||
|
else:
|
||||||
|
raise ValueError(f"invalid avoid: {avoid}")
|
||||||
|
print("AVOID", avoid)
|
||||||
|
|
||||||
|
for elem in settings.get("elements", []):
|
||||||
|
if elem["type"].startswith("~"):
|
||||||
|
continue
|
||||||
|
elif elem["type"] == "text":
|
||||||
|
self.put_text(
|
||||||
|
elem["x"],
|
||||||
|
elem["y"],
|
||||||
|
elem["text"],
|
||||||
|
elem.get("font", "default"),
|
||||||
|
elem.get("size", self.default_font_size),
|
||||||
|
elem.get("negative", False),
|
||||||
|
elem.get("padding", 2),
|
||||||
|
)
|
||||||
|
elif elem["type"] in ("text_anim", "text_animation"):
|
||||||
|
frames: list[Image.Image] = []
|
||||||
|
for line in elem["lines"]:
|
||||||
|
if isinstance(line, str):
|
||||||
|
frames.append(
|
||||||
|
self.make_text_image(
|
||||||
|
line,
|
||||||
|
elem.get("font", "default"),
|
||||||
|
elem.get("size", 8),
|
||||||
|
elem.get("negative", False),
|
||||||
|
elem.get("padding", 2),
|
||||||
|
)
|
||||||
|
)
|
||||||
|
elif isinstance(line, dict):
|
||||||
|
frames.append(
|
||||||
|
self.make_text_image(
|
||||||
|
line["text"],
|
||||||
|
line.get("font", elem.get("font", "default")),
|
||||||
|
line.get("size", elem.get("size", self.default_font_size)),
|
||||||
|
line.get(
|
||||||
|
"negative", elem.get("negative", False)
|
||||||
|
),
|
||||||
|
line.get("padding", elem.get("padding", 2)),
|
||||||
|
)
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
raise TypeError(f"invalid line: {line}")
|
||||||
|
self.put_animation(
|
||||||
|
elem["x"], elem["y"], frames, elem.get("spf", 10)
|
||||||
|
)
|
||||||
|
elif elem["type"] in ("anim", "animation"):
|
||||||
|
with Image.open(elem["path"]) as anim:
|
||||||
|
self.put_animation(elem["x"], elem["y"], [
|
||||||
|
frame.convert("LA")
|
||||||
|
for frame in ImageSequence.Iterator(anim)
|
||||||
|
], elem.get("spf", 10))
|
||||||
|
elif elem["type"] == "image":
|
||||||
|
with Image.open(elem["path"]).convert("LA") as im:
|
||||||
|
self.put_image(elem["x"], elem["y"], im)
|
||||||
|
elif elem["type"] == "time":
|
||||||
|
pos_x, pos_y = elem["x"], elem["y"]
|
||||||
|
|
||||||
|
time_format = elem.get("format", "%Y%m%dT%H%M%S UTC")
|
||||||
|
time_font = elem.get("font", "default")
|
||||||
|
time_font_size = elem.get("size", self.default_font_size)
|
||||||
|
time_negative = elem.get("negative", False)
|
||||||
|
time_outline = elem.get("outline", 2)
|
||||||
|
|
||||||
|
def update_time():
|
||||||
|
now = datetime.datetime.now(datetime.timezone.utc)
|
||||||
|
self.put_image(pos_x, pos_y, self.make_text_image(
|
||||||
|
now.strftime(time_format),
|
||||||
|
time_font,
|
||||||
|
time_font_size,
|
||||||
|
time_negative,
|
||||||
|
time_outline
|
||||||
|
))
|
||||||
|
|
||||||
|
self.animation_functions.append(update_time)
|
||||||
|
elif elem["type"] == "shrek":
|
||||||
|
shrek_x, shrek_y = elem["x"], elem["y"]
|
||||||
|
|
||||||
|
shrek_font = self.get_font(elem.get("font", "default"), elem.get("size", self.default_font_size))
|
||||||
|
|
||||||
|
with open(elem["path"], "r") as fp:
|
||||||
|
lyrics = list(map(str.strip, fp))
|
||||||
|
|
||||||
|
def update_shrek():
|
||||||
|
with Image.new("LA", (325, 10), (0, 255)) as im:
|
||||||
|
draw = ImageDraw.Draw(im)
|
||||||
|
draw.rectangle((0, 0, 325, 10), fill=(0, 255))
|
||||||
|
now = datetime.datetime.now(datetime.timezone.utc)
|
||||||
|
line = lyrics[
|
||||||
|
int(now.timestamp() / elem["spf"]) % len(lyrics)
|
||||||
|
]
|
||||||
|
draw.text((2, -1), line, font=shrek_font, fill=(255, 255))
|
||||||
|
self.put_image(shrek_x, shrek_y, im)
|
||||||
|
|
||||||
|
self.animation_functions.append(update_shrek)
|
||||||
|
else:
|
||||||
|
raise TypeError(f"invalid element: {elem}")
|
||||||
|
print("ADD", elem)
|
||||||
|
|
||||||
async def listener(self):
|
async def listener(self):
|
||||||
sio = socketio.AsyncClient()
|
sio = socketio.AsyncClient()
|
||||||
sio.on("connect", self.on_connect)
|
sio.on("connect", self.on_connect)
|
||||||
|
@ -47,17 +193,8 @@ class Manager:
|
||||||
await sio.wait()
|
await sio.wait()
|
||||||
|
|
||||||
def update_shmem(self, state: bytes):
|
def update_shmem(self, state: bytes):
|
||||||
if not self.shmem:
|
assert self.shmem is not None
|
||||||
raise ValueError("shared memory is not initialized yet")
|
self.shmem.buf[OFFSET_STATE:OFFSET_STATE + 125000] = state
|
||||||
|
|
||||||
buffer = bytearray(bytes(1000000))
|
|
||||||
for i in range(1000000):
|
|
||||||
byte, bit = divmod(i, 8)
|
|
||||||
if state[byte] & (0x80 >> bit):
|
|
||||||
buffer[i] |= PixelMask.CHECKED
|
|
||||||
else:
|
|
||||||
buffer[i] &= ~PixelMask.CHECKED
|
|
||||||
self.shmem.buf[:] = buffer
|
|
||||||
|
|
||||||
async def on_connect(self):
|
async def on_connect(self):
|
||||||
async with aiohttp.ClientSession() as http:
|
async with aiohttp.ClientSession() as http:
|
||||||
|
@ -68,15 +205,13 @@ class Manager:
|
||||||
self.last_update = data["timestamp"]
|
self.last_update = data["timestamp"]
|
||||||
|
|
||||||
async def on_full_state(self, data):
|
async def on_full_state(self, data):
|
||||||
if not self.shmem:
|
assert self.shmem is not None
|
||||||
raise ValueError("shared memory is not initialized yet")
|
|
||||||
buffer = b64decode(data["full_state"].encode() + b"=")
|
buffer = b64decode(data["full_state"].encode() + b"=")
|
||||||
self.update_shmem(buffer)
|
self.update_shmem(buffer)
|
||||||
self.last_update = data["timestamp"]
|
self.last_update = data["timestamp"]
|
||||||
|
|
||||||
async def on_batched_bit_toggles(self, data):
|
async def on_batched_bit_toggles(self, data):
|
||||||
if not self.shmem:
|
assert self.shmem is not None
|
||||||
raise ValueError("shared memory is not initialized yet")
|
|
||||||
bits_on, bits_off, timestamp = data
|
bits_on, bits_off, timestamp = data
|
||||||
if timestamp < self.last_update:
|
if timestamp < self.last_update:
|
||||||
print("old update, ignoring")
|
print("old update, ignoring")
|
||||||
|
@ -86,43 +221,76 @@ class Manager:
|
||||||
self.bits_toggled_on += len(bits_on)
|
self.bits_toggled_on += len(bits_on)
|
||||||
self.bits_toggled_off = len(bits_off)
|
self.bits_toggled_off = len(bits_off)
|
||||||
|
|
||||||
for bit in bits_on:
|
for ndx in bits_on:
|
||||||
self.shmem.buf[bit] |= PixelMask.CHECKED
|
byte, bit = divmod(ndx, 8)
|
||||||
for bit in bits_off:
|
self.shmem.buf[OFFSET_STATE + byte] |= (0x80 >> bit)
|
||||||
self.shmem.buf[bit] &= ~PixelMask.CHECKED
|
for ndx in bits_off:
|
||||||
|
byte, bit = divmod(ndx, 8)
|
||||||
|
self.shmem.buf[OFFSET_STATE + byte] &= 0xFF ^ (0x80 >> bit)
|
||||||
|
|
||||||
since_last_printout = time.time() - self.last_printout
|
since_last_printout = time.time() - self.last_printout
|
||||||
if since_last_printout >= 5:
|
if since_last_printout >= 5:
|
||||||
self.last_printout = time.time()
|
self.last_printout = time.time()
|
||||||
print()
|
print()
|
||||||
print(f"Toggled on: {self.bits_toggled_on / since_last_printout}/s")
|
print(
|
||||||
print(f"Toggled off: {self.bits_toggled_off / since_last_printout}/s")
|
f"Toggled on: {self.bits_toggled_on / since_last_printout}/s"
|
||||||
|
)
|
||||||
|
print(
|
||||||
|
f"Toggled off: {self.bits_toggled_off / since_last_printout}/s"
|
||||||
|
)
|
||||||
self.bits_toggled_on = self.bits_toggled_off = 0
|
self.bits_toggled_on = self.bits_toggled_off = 0
|
||||||
|
|
||||||
def on_sigusr1(self, signum, frame):
|
def on_sigusr1(self, signum, frame):
|
||||||
if not self.shmem:
|
assert self.shmem is not None
|
||||||
raise ValueError("shared memory is not initialized yet")
|
|
||||||
print("Caught SIGUSR1, dumping state")
|
print("Caught SIGUSR1, dumping state")
|
||||||
buf = bytes(self.shmem.buf[:])
|
buf = bytes(self.shmem.buf[:])
|
||||||
with Image.new("RGB", (1000, 1000), 0) as im:
|
with Image.new("RGB", (1000, 1000), 0) as im:
|
||||||
for i in range(1000000):
|
for i in range(1000000):
|
||||||
y, x = divmod(i, 1000)
|
y, x = divmod(i, 1000)
|
||||||
im.putpixel((x, y), (
|
byte, bit = divmod(i, 8)
|
||||||
255 if buf[i] & PixelMask.FILL else 0,
|
im.putpixel(
|
||||||
255 if buf[i] & PixelMask.MASK else 0,
|
(x, y),
|
||||||
255 if buf[i] & PixelMask.CHECKED else 0
|
(
|
||||||
))
|
255 if (buf[OFFSET_MASK + byte] << bit) & 0x80 else 0,
|
||||||
|
255 if (buf[OFFSET_CANVAS + byte] << bit) & 0x80 else 0,
|
||||||
|
255 if (buf[OFFSET_STATE + byte] << bit) & 0x80 else 0,
|
||||||
|
),
|
||||||
|
)
|
||||||
im.save("state.png")
|
im.save("state.png")
|
||||||
|
with Image.new("L", (1000, 1000), 0) as im:
|
||||||
|
for i in range(1000000):
|
||||||
|
y, x = divmod(i, 1000)
|
||||||
|
byte, bit = divmod(i, 8)
|
||||||
|
im.putpixel(
|
||||||
|
(x, y),
|
||||||
|
255 if (buf[OFFSET_AVOID + byte] << bit) & 0x80 else 0,
|
||||||
|
)
|
||||||
|
im.save("avoid.png")
|
||||||
|
print("Dump done")
|
||||||
|
|
||||||
|
|
||||||
|
def on_sigusr2(self, signum, frame):
|
||||||
|
print("Reloading config")
|
||||||
|
self.reload_config()
|
||||||
|
|
||||||
async def animator(self):
|
async def animator(self):
|
||||||
while True:
|
while True:
|
||||||
|
for animation in self.animations:
|
||||||
|
frame = int(time.time() / animation.spf) % len(
|
||||||
|
animation.frames
|
||||||
|
)
|
||||||
|
self.put_image(
|
||||||
|
animation.x, animation.y, animation.frames[frame]
|
||||||
|
)
|
||||||
|
for func in self.animation_functions:
|
||||||
|
func()
|
||||||
await asyncio.sleep(0.1)
|
await asyncio.sleep(0.1)
|
||||||
|
|
||||||
def add_avoid_range(self, rng: range):
|
def add_avoid_range(self, rng: range):
|
||||||
if not self.shmem:
|
assert self.shmem is not None
|
||||||
raise ValueError("shared memory is not initialized yet")
|
|
||||||
for ndx in rng:
|
for ndx in rng:
|
||||||
self.shmem.buf[ndx] |= PixelMask.AVOID
|
byte, bit = divmod(ndx, 8)
|
||||||
|
self.shmem.buf[OFFSET_AVOID + byte] |= (0x80 >> bit)
|
||||||
|
|
||||||
def add_avoid_rect(self, sx: int, sy: int, w: int, h: int):
|
def add_avoid_rect(self, sx: int, sy: int, w: int, h: int):
|
||||||
for y in range(sy, sy + h):
|
for y in range(sy, sy + h):
|
||||||
|
@ -130,10 +298,10 @@ class Manager:
|
||||||
self.add_avoid_range(range(sx + ox, sx + w + ox))
|
self.add_avoid_range(range(sx + ox, sx + w + ox))
|
||||||
|
|
||||||
def add_avoid_index(self, index: int):
|
def add_avoid_index(self, index: int):
|
||||||
if not self.shmem:
|
assert self.shmem is not None
|
||||||
raise ValueError("shared memory is not initialized yet")
|
|
||||||
assert 0 <= index < 1000000
|
assert 0 <= index < 1000000
|
||||||
self.shmem.buf[index] |= PixelMask.AVOID
|
byte, bit = divmod(index, 8)
|
||||||
|
self.shmem.buf[OFFSET_AVOID + byte] |= (0x80 >> bit)
|
||||||
|
|
||||||
def add_avoid_image(self, im: Image.Image):
|
def add_avoid_image(self, im: Image.Image):
|
||||||
assert im.width == 1000
|
assert im.width == 1000
|
||||||
|
@ -142,11 +310,110 @@ class Manager:
|
||||||
y, x = divmod(i, 1000)
|
y, x = divmod(i, 1000)
|
||||||
la = im.getpixel((x, y))
|
la = im.getpixel((x, y))
|
||||||
assert isinstance(la, (tuple, list)) and len(la) == 2
|
assert isinstance(la, (tuple, list)) and len(la) == 2
|
||||||
if la[1]:
|
if la[1] > 128:
|
||||||
self.add_avoid_index(i)
|
self.add_avoid_index(i)
|
||||||
|
|
||||||
|
def set_index(self, index: int, value: bool):
|
||||||
|
assert 0 <= index <= 1000000
|
||||||
|
assert self.shmem is not None
|
||||||
|
byte, bit = divmod(index, 8)
|
||||||
|
self.shmem.buf[OFFSET_MASK + byte] |= (0x80 >> bit)
|
||||||
|
if value:
|
||||||
|
self.shmem.buf[OFFSET_CANVAS + byte] |= (0x80 >> bit)
|
||||||
|
else:
|
||||||
|
self.shmem.buf[OFFSET_CANVAS + byte] &= 0xFF ^ (0x80 >> bit)
|
||||||
|
|
||||||
|
def clear_index(self, index: int):
|
||||||
|
assert 0 <= index <= 1000000
|
||||||
|
assert self.shmem is not None
|
||||||
|
byte, bit = divmod(index, 8)
|
||||||
|
self.shmem.buf[OFFSET_MASK + byte] &= 0xFF ^ (0x80 >> bit)
|
||||||
|
self.shmem.buf[OFFSET_CANVAS + byte] &= 0xFF ^ (0x80 >> bit)
|
||||||
|
|
||||||
|
def put_image(self, ox: int, oy: int, im: Image.Image):
|
||||||
|
for y in range(im.height):
|
||||||
|
for x in range(im.width):
|
||||||
|
la: tuple[int, int] = im.getpixel((x, y)) # type: ignore
|
||||||
|
if la[1]:
|
||||||
|
self.set_index(x + ox + (y + oy) * 1000, la[0] > 0)
|
||||||
|
|
||||||
|
def put_text(
|
||||||
|
self,
|
||||||
|
x: int,
|
||||||
|
y: int,
|
||||||
|
text: str,
|
||||||
|
font_name: str = "default",
|
||||||
|
font_size: int = 8,
|
||||||
|
negative: bool = False,
|
||||||
|
outline: int = 2,
|
||||||
|
):
|
||||||
|
self.put_image(
|
||||||
|
x,
|
||||||
|
y,
|
||||||
|
self.make_text_image(
|
||||||
|
text, font_name, font_size, negative, outline
|
||||||
|
),
|
||||||
|
)
|
||||||
|
|
||||||
|
def make_text_image(
|
||||||
|
self,
|
||||||
|
text: str,
|
||||||
|
font_name: str = "default",
|
||||||
|
font_size: int = 8,
|
||||||
|
negative: bool = False,
|
||||||
|
outline: int = 2,
|
||||||
|
):
|
||||||
|
font = self.get_font(font_name, font_size)
|
||||||
|
left, top, right, bottom = font.getbbox(text, anchor="lt")
|
||||||
|
with Image.new(
|
||||||
|
"RGBA",
|
||||||
|
(int(right - left) + outline * 2, int(bottom - top) + outline * 2),
|
||||||
|
0,
|
||||||
|
) as im:
|
||||||
|
draw = ImageDraw.Draw(im)
|
||||||
|
draw.rectangle((0, 0, im.width, im.height), (0, 0, 0, 0))
|
||||||
|
draw.text(
|
||||||
|
(left + outline, top + outline),
|
||||||
|
text,
|
||||||
|
font=font,
|
||||||
|
fill=(255, 255, 255, 255),
|
||||||
|
anchor="lt",
|
||||||
|
)
|
||||||
|
|
||||||
|
alpha = im.convert("L").filter(
|
||||||
|
ImageFilter.MaxFilter(outline * 2 + 1)
|
||||||
|
)
|
||||||
|
im.putalpha(alpha)
|
||||||
|
if negative:
|
||||||
|
im = ImageChops.invert(im)
|
||||||
|
im.putalpha(alpha) # ty PIL
|
||||||
|
return im.convert("LA")
|
||||||
|
|
||||||
|
def put_animation(
|
||||||
|
self, x: int, y: int, frames: list[Image.Image], spf: float = 10
|
||||||
|
):
|
||||||
|
self.animations.append(Animation(x, y, frames, spf))
|
||||||
|
|
||||||
|
def get_font(
|
||||||
|
self, font_name: str, font_size: int
|
||||||
|
) -> ImageFont.FreeTypeFont:
|
||||||
|
if font := self.fonts.get((font_name, font_size)):
|
||||||
|
return font
|
||||||
|
print("FONT", font_name, font_size)
|
||||||
|
font = ImageFont.truetype(font_name, font_size)
|
||||||
|
self.fonts[font_name, font_size] = font
|
||||||
|
return font
|
||||||
|
|
||||||
async def __aenter__(self):
|
async def __aenter__(self):
|
||||||
self.shmem = SharedMemory(self.shmem_name, True, 1000000)
|
print("Acquiring shared memory")
|
||||||
|
self.shmem = SharedMemory(self.shmem_name, True, 500000)
|
||||||
|
print("Loading config...")
|
||||||
|
try:
|
||||||
|
self.reload_config()
|
||||||
|
except Exception:
|
||||||
|
self.shmem.close()
|
||||||
|
self.shmem.unlink()
|
||||||
|
raise
|
||||||
return self
|
return self
|
||||||
|
|
||||||
async def __aexit__(self, a, b, c):
|
async def __aexit__(self, a, b, c):
|
||||||
|
@ -158,9 +425,15 @@ class Manager:
|
||||||
|
|
||||||
async def main():
|
async def main():
|
||||||
print(f"PID: {os.getpid()}")
|
print(f"PID: {os.getpid()}")
|
||||||
async with Manager() as mgr:
|
async with Manager("./settings.json") as mgr:
|
||||||
signal.signal(signal.SIGUSR1, mgr.on_sigusr1)
|
signal.signal(signal.SIGUSR1, mgr.on_sigusr1)
|
||||||
await mgr.listener()
|
signal.signal(signal.SIGUSR2, mgr.on_sigusr2)
|
||||||
|
|
||||||
|
print("Listening...")
|
||||||
|
await asyncio.gather(
|
||||||
|
mgr.listener(),
|
||||||
|
mgr.animator()
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
|
|
|
@ -0,0 +1,111 @@
|
||||||
|
{
|
||||||
|
"default_font": {
|
||||||
|
"path": "../ic8x8u.ttf",
|
||||||
|
"size": 8
|
||||||
|
},
|
||||||
|
"avoid": [
|
||||||
|
{
|
||||||
|
"type": "rect",
|
||||||
|
"x": 0,
|
||||||
|
"y": 0,
|
||||||
|
"w": 1000,
|
||||||
|
"h": 10,
|
||||||
|
"description": "Not ruining fun for normal users"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"type": "rect",
|
||||||
|
"x": 0,
|
||||||
|
"y": 20,
|
||||||
|
"w": 1000,
|
||||||
|
"h": 80,
|
||||||
|
"description": "The VOID"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"type": "rect",
|
||||||
|
"x": 0,
|
||||||
|
"y": 750,
|
||||||
|
"w": 123,
|
||||||
|
"h": 123,
|
||||||
|
"description": "catgirls.win QR code"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"type": "range",
|
||||||
|
"start": 900000,
|
||||||
|
"stop": 1000000,
|
||||||
|
"description": "catgirls.win text (both b64 and plain)"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"type": "image",
|
||||||
|
"path": "../avoid_masks/noita.png",
|
||||||
|
"description": "Noita logo by Cr4xy"
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"elements": [
|
||||||
|
{
|
||||||
|
"type": "time",
|
||||||
|
"x": 75,
|
||||||
|
"y": 100,
|
||||||
|
"format": "And time is: %Y-%m-%d %H:%M:%S UTC",
|
||||||
|
"spf": 20,
|
||||||
|
"font": "/usr/share/fonts/TTF/TerminusTTF.ttf",
|
||||||
|
"size": 12
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"type": "image",
|
||||||
|
"path": "../pictures/casey.png",
|
||||||
|
"x": 0,
|
||||||
|
"y": 128
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"type": "animation",
|
||||||
|
"path": "../pictures/neko.gif",
|
||||||
|
"spf": 30,
|
||||||
|
"x": 625,
|
||||||
|
"y": 496
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"type": "image",
|
||||||
|
"path": "../pictures/casey_qr.png",
|
||||||
|
"x": 10,
|
||||||
|
"y": 240
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"type": "image",
|
||||||
|
"path": "../pictures/hueh.png",
|
||||||
|
"x": 490,
|
||||||
|
"y": 810
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"type": "text_anim",
|
||||||
|
"font": "/usr/share/fonts/TTF/TerminusTTF.ttf",
|
||||||
|
"size": 18,
|
||||||
|
"lines": [
|
||||||
|
"крипер2004",
|
||||||
|
"Крипер2004",
|
||||||
|
"КРипер2004",
|
||||||
|
"КРИпер2004",
|
||||||
|
"КРИПер2004",
|
||||||
|
"КРИПЕр2004",
|
||||||
|
"КРИПЕР2004",
|
||||||
|
"кРИПЕР2004",
|
||||||
|
"крИПЕР2004",
|
||||||
|
"криПЕР2004",
|
||||||
|
"крипЕР2004",
|
||||||
|
"крипеР2004",
|
||||||
|
"крипер2004"
|
||||||
|
],
|
||||||
|
"spf": 30,
|
||||||
|
"x": 3,
|
||||||
|
"y": 872
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"type": "shrek",
|
||||||
|
"font": "../creep2.ttf",
|
||||||
|
"size": 11,
|
||||||
|
"path": "../funnies/shrek.txt",
|
||||||
|
"x": 490,
|
||||||
|
"y": 700,
|
||||||
|
"spf": 120
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
Loading…
Reference in New Issue