onemillioncheckboxes/swarm/worker.py

101 lines
3.0 KiB
Python

import functools
from multiprocessing.shared_memory import SharedMemory
import asyncio
import random
from typing import NamedTuple, Optional
from aiohttp_socks import ProxyConnector
import socketio
import aiohttp
import signal
# Byte offsets of the four bit planes inside the shared-memory buffer.
# Consecutive planes start 125000 bytes apart; a cell index is split into
# (byte, bit) with divmod(index, 8) and added to the plane's offset.
_PLANE_STRIDE = 125_000
OFFSET_STATE = 0 * _PLANE_STRIDE
OFFSET_AVOID = 1 * _PLANE_STRIDE
OFFSET_CANVAS = 2 * _PLANE_STRIDE
OFFSET_MASK = 3 * _PLANE_STRIDE
class PixelState(NamedTuple):
    """The four per-cell flags read from the shared-memory bit planes.

    Each field is the truth value of the cell's bit in the plane of the
    same name (``OFFSET_STATE``, ``OFFSET_AVOID``, ``OFFSET_CANVAS``,
    ``OFFSET_MASK``).
    """

    # Bit from the state plane; this is the bit the writers flip locally.
    state: bool
    # Bit from the avoid plane; a set bit excludes the cell from toggling.
    avoid: bool
    # Bit from the canvas plane; compared against ``state`` to find work.
    canvas: bool
    # Bit from the mask plane; must be set for the cell to be toggled.
    mask: bool
class WorkerManager:
    """Coordinate checkbox-toggling writers around a shared-memory bitmap.

    The shared-memory block named ``shmem_name`` is read as four bit planes
    starting at ``OFFSET_STATE``, ``OFFSET_AVOID``, ``OFFSET_CANVAS`` and
    ``OFFSET_MASK``. Cell ``i`` lives in byte ``i // 8`` of each plane, most
    significant bit first. ``queue_manager`` samples random cells and queues
    the ones that need toggling; each ``writer`` pops queued cells and emits
    ``toggle_bit`` events over its own Socket.IO connection.

    Use as an async context manager: the shared memory is attached in
    ``__aenter__`` and closed in ``__aexit__``.
    """

    def __init__(self, shmem_name: str = "omcb-bot"):
        """Store configuration; the shared memory is not attached here.

        Args:
            shmem_name: Name of the shared-memory block to attach on entry.
        """
        self.shmem_name = shmem_name
        self.base = "https://onemillioncheckboxes.com"
        # Seconds a writer pauses after emitting a toggle.
        self.delay = 0.25
        # Bounded: the sampler suspends once 128 candidates are pending.
        self.queue: asyncio.Queue[int] = asyncio.Queue(128)

    @staticmethod
    def _needs_toggle(state: "PixelState") -> bool:
        """Return whether a cell should be toggled.

        True when the avoid bit is clear, the mask bit is set, and the
        state bit differs from the canvas bit.
        """
        return (
            not state.avoid
            and state.mask
            and (state.state != state.canvas)
        )

    async def queue_manager(self):
        """Sample random cell indices forever and queue those needing a toggle.

        Runs until cancelled. Suspends in ``queue.put`` while the queue is
        full and yields to the event loop after every rejected sample.
        """
        while True:
            index = random.randint(0, 999999)
            if self._needs_toggle(self.get_state(index)):
                await self.queue.put(index)
            else:
                # Queue.put only suspends when the queue is full, so without
                # this yield a run of rejected samples would never give the
                # writers or the socket I/O a chance to run.
                await asyncio.sleep(0)

    async def writer(self, bot_index: int, proxy: Optional[str] = None):
        """Run one Socket.IO client that toggles queued cells on the server.

        Args:
            bot_index: Identifier of this writer; currently unused.
            proxy: Optional proxy URL handed to ``ProxyConnector.from_url``.
                When falsy, the session is created with ``connector=None``.
        """
        connector = ProxyConnector.from_url(proxy) if proxy else None
        async with aiohttp.ClientSession(connector=connector) as http:
            sio = socketio.AsyncClient(http_session=http)

            async def writer_itself():
                """Drain the queue while the client reports being connected."""
                print("Writer running")
                while not sio.connected:
                    await asyncio.sleep(0.1)
                print("Connected and running")
                while sio.connected:
                    index = await self.queue.get()
                    # Re-check: the cell may have changed since it was queued.
                    if self._needs_toggle(self.get_state(index)):
                        byte, bit = divmod(index, 8)
                        print("toggle", index)
                        # Flip the local copy before the first await so a
                        # duplicate of this index popped by another writer
                        # fails the re-check above.
                        self.shmem.buf[OFFSET_STATE + byte] ^= 0x80 >> bit
                        await sio.emit("toggle_bit", {"index": index})
                        await asyncio.sleep(self.delay)
                    # NOTE(review): assumed to run once per loop pass (after
                    # the `if`), not once after the loop - confirm against
                    # the original layout.
                    await asyncio.sleep(0.1)
                print("Writer closed")

            sio.on("connect", writer_itself)
            await sio.connect(self.base.replace("http", "ws"))
            await sio.wait()

    def get_state(self, index: int) -> "PixelState":
        """Read the four flags of cell ``index`` from shared memory.

        Args:
            index: Cell index; the sampler uses the range 0..999999.

        Returns:
            A ``PixelState`` with one boolean per bit plane.
        """
        byte, bit = divmod(index, 8)
        # Bit 0 of a cell group is the most significant bit of its byte.
        mask = 0x80 >> bit
        return PixelState(
            self.shmem.buf[OFFSET_STATE + byte] & mask != 0,
            self.shmem.buf[OFFSET_AVOID + byte] & mask != 0,
            self.shmem.buf[OFFSET_CANVAS + byte] & mask != 0,
            self.shmem.buf[OFFSET_MASK + byte] & mask != 0,
        )

    async def __aenter__(self):
        """Attach to the shared-memory block and return this manager."""
        self.shmem = SharedMemory(self.shmem_name)
        return self

    async def __aexit__(self, a, b, c):
        """Close this process's handle on the shared memory.

        The three arguments are the usual exception type, value and
        traceback; they are ignored and exceptions are not suppressed.
        """
        self.shmem.close()
async def main():
    """Run one queue sampler and four writers against a default manager.

    The manager is entered as an async context manager, so its shared
    memory handle is closed when the gathered coroutines finish or raise.
    """
    async with WorkerManager() as manager:
        writers = [manager.writer(slot) for slot in range(4)]
        await asyncio.gather(manager.queue_manager(), *writers)


if __name__ == "__main__":
    asyncio.run(main())