bta-proxy/bta_proxy/datainputstream.py

152 lines
4.5 KiB
Python
Raw Normal View History

2023-08-25 21:49:10 +03:00
from asyncio.queues import Queue
2023-08-24 15:29:57 +03:00
import struct
2023-08-25 21:49:10 +03:00
class AsyncDataInputStream:
    """Big-endian binary reader fed by an asyncio queue of ``bytes`` chunks.

    Chunks are pulled from the queue on demand and appended to an internal
    buffer; every ``read_*`` coroutine consumes bytes from the front of that
    buffer.
    """

    def __init__(self, queue: Queue):
        """Create a reader over *queue*, which must yield ``bytes`` objects."""
        self._queue = queue
        # Bytes received from the queue but not yet consumed.
        self._buffer = b''
        # Most recent chunk taken from the queue (used in error messages).
        self._last = b''

    def read_rest(self) -> bytes:
        """Return everything currently buffered and clear the buffer.

        Does not touch the queue, so the result may be empty.
        """
        out = self._buffer
        self._buffer = b''
        return out

    async def read_bytes(self, n: int) -> bytes:
        """Return exactly *n* bytes, waiting on the queue until available.

        Raises:
            ValueError: if *n* is negative.
        """
        if n < 0:
            # A negative count would slice from the end of the buffer and
            # silently corrupt the stream position.
            raise ValueError(f'cannot read a negative number of bytes: {n}')
        # Keep pulling chunks: a single chunk is not guaranteed to complete
        # the request, and returning short data breaks every fixed-width
        # reader built on top of this method.
        # NOTE(review): an empty chunk makes no progress and is simply
        # skipped; confirm the producer never uses b'' as an EOF marker.
        while len(self._buffer) < n:
            self._last = await self._queue.get()
            self._buffer += self._last
        out, self._buffer = self._buffer[:n], self._buffer[n:]
        return out

    async def read(self) -> int:
        """Read one byte and return it as an unsigned integer (0-255)."""
        return (await self.read_bytes(1))[0]

    read_ubyte = read

    async def read_byte(self) -> int:
        """Read one byte and return it as a signed integer (-128..127)."""
        return struct.unpack('b', await self.read_bytes(1))[0]

    async def read_boolean(self) -> bool:
        """Read one byte; any non-zero value is ``True``."""
        return (await self.read()) != 0

    async def read_short(self) -> int:
        """Read a big-endian signed 16-bit integer."""
        return struct.unpack('>h', await self.read_bytes(2))[0]

    async def read_ushort(self) -> int:
        """Read a big-endian unsigned 16-bit integer."""
        return struct.unpack('>H', await self.read_bytes(2))[0]

    async def read_int(self) -> int:
        """Read a big-endian signed 32-bit integer."""
        return struct.unpack('>i', await self.read_bytes(4))[0]

    async def read_uint(self) -> int:
        """Read a big-endian unsigned 32-bit integer."""
        return struct.unpack('>I', await self.read_bytes(4))[0]

    async def read_long(self) -> int:
        """Read a big-endian signed 64-bit integer."""
        return struct.unpack('>q', await self.read_bytes(8))[0]

    async def read_ulong(self) -> int:
        """Read a big-endian unsigned 64-bit integer."""
        return struct.unpack('>Q', await self.read_bytes(8))[0]

    async def read_float(self) -> float:
        """Read a big-endian 32-bit float."""
        return struct.unpack('>f', await self.read_bytes(4))[0]

    async def read_double(self) -> float:
        """Read a big-endian 64-bit float."""
        return struct.unpack('>d', await self.read_bytes(8))[0]

    async def read_char(self) -> str:
        """Read a big-endian unsigned 16-bit value and return ``chr()`` of it."""
        return chr(await self.read_ushort())

    async def read_varint(self, bits: int = 32) -> int:
        """Read a base-128 varint (7 data bits per byte, low group first).

        The high bit of each byte marks that another byte follows. The result
        is returned as a non-negative integer; no sign conversion is applied.

        Raises:
            ValueError: if a continuation bit is still set once the shift
                has reached *bits*.
        """
        value: int = 0
        position: int = 0
        while True:
            byte = await self.read()
            value |= (byte & 0x7F) << position
            if (byte & 0x80) == 0:
                break
            position += 7
            if position >= bits:
                raise ValueError('varint too big')
        return value

    async def read_string(self) -> str:
        """Read a signed 16-bit byte length followed by that many UTF-8 bytes.

        Raises:
            ValueError: if the payload cannot be read or decoded (including a
                negative length); the original exception is chained.
        """
        # Captured before reading so the error message shows the chunk that
        # was current when the string started.
        last = self._last
        size = await self.read_short()
        try:
            return (await self.read_bytes(size)).decode('utf-8')
        except Exception as e:
            raise ValueError(f'failed reading string of size {size} in {last}') from e
2023-08-24 15:29:57 +03:00
class SyncDataInputStream:
    """Big-endian binary reader over an in-memory ``bytes`` buffer.

    A cursor tracks the read position; every ``read_*`` method advances it.
    """

    def __init__(self, buffer: bytes):
        """Create a reader positioned at the start of *buffer*."""
        self._buffer = buffer
        self._cursor = 0

    def read_bytes(self, n: int) -> bytes:
        """Return the next *n* bytes and advance the cursor.

        Raises:
            ValueError: if *n* is negative.
            EOFError: if fewer than *n* bytes remain (cursor is unchanged).
        """
        if n < 0:
            # A negative count would move the cursor backwards and re-read
            # already consumed data.
            raise ValueError(f'cannot read a negative number of bytes: {n}')
        if self._cursor + n > len(self._buffer):
            raise EOFError('stream overread')
        blob = self._buffer[self._cursor : self._cursor + n]
        self._cursor += n
        return blob

    def empty(self) -> bool:
        """Return ``True`` when no unread bytes remain."""
        # Compare against the full length: using ``len - 1`` reported the
        # stream as empty while one byte was still unread.
        return self._cursor >= len(self._buffer)

    def end(self) -> bytes:
        """Return all remaining bytes and move the cursor to the end."""
        return self.read_bytes(len(self._buffer) - self._cursor)

    def read(self) -> int:
        """Read one byte and return it as an unsigned integer (0-255).

        Raises:
            EOFError: if the buffer is exhausted.
        """
        if self._cursor >= len(self._buffer):
            raise EOFError(f'stream overread in {self._buffer} at {self._cursor}')
        self._cursor += 1
        return self._buffer[self._cursor - 1]

    read_ubyte = read

    def read_byte(self) -> int:
        """Read one byte and return it as a signed integer (-128..127)."""
        return struct.unpack('b', self.read_bytes(1))[0]

    def read_boolean(self) -> bool:
        """Read one byte; any non-zero value is ``True``."""
        return self.read() != 0

    def read_short(self) -> int:
        """Read a big-endian signed 16-bit integer."""
        return struct.unpack('>h', self.read_bytes(2))[0]

    def read_ushort(self) -> int:
        """Read a big-endian unsigned 16-bit integer."""
        return struct.unpack('>H', self.read_bytes(2))[0]

    def read_int(self) -> int:
        """Read a big-endian signed 32-bit integer."""
        return struct.unpack('>i', self.read_bytes(4))[0]

    def read_uint(self) -> int:
        """Read a big-endian unsigned 32-bit integer."""
        return struct.unpack('>I', self.read_bytes(4))[0]

    def read_long(self) -> int:
        """Read a big-endian signed 64-bit integer."""
        return struct.unpack('>q', self.read_bytes(8))[0]

    def read_ulong(self) -> int:
        """Read a big-endian unsigned 64-bit integer."""
        return struct.unpack('>Q', self.read_bytes(8))[0]

    def read_float(self) -> float:
        """Read a big-endian 32-bit float."""
        return struct.unpack('>f', self.read_bytes(4))[0]

    def read_double(self) -> float:
        """Read a big-endian 64-bit float."""
        return struct.unpack('>d', self.read_bytes(8))[0]

    def read_char(self) -> str:
        """Read a big-endian unsigned 16-bit value and return ``chr()`` of it."""
        return chr(self.read_ushort())

    def read_varint(self, bits: int = 32) -> int:
        """Read a base-128 varint (7 data bits per byte, low group first).

        The high bit of each byte marks that another byte follows. The result
        is returned as a non-negative integer; no sign conversion is applied.

        Raises:
            ValueError: if a continuation bit is still set once the shift
                has reached *bits*.
            EOFError: if the buffer ends mid-varint.
        """
        value: int = 0
        position: int = 0
        while True:
            byte = self.read()
            value |= (byte & 0x7F) << position
            if (byte & 0x80) == 0:
                break
            position += 7
            if position >= bits:
                raise ValueError('varint too big')
        return value

    def read_string(self) -> str:
        """Read a signed 16-bit byte length followed by that many UTF-8 bytes.

        Raises:
            ValueError: if the length prefix is negative.
            EOFError: if the buffer is shorter than the declared length.
            UnicodeDecodeError: if the payload is not valid UTF-8.
        """
        size = self.read_short()
        return self.read_bytes(size).decode('utf-8')