40 lines
1.5 KiB
Python
40 lines
1.5 KiB
Python
|
from asyncio.protocols import Protocol
|
||
|
from asyncio.queues import Queue
|
||
|
from asyncio import AbstractEventLoop, get_event_loop
|
||
|
from asyncio.streams import StreamReader, StreamWriter, open_connection
|
||
|
from typing import Optional
|
||
|
|
||
|
from bta_proxy.dpi import inspect_client, inspect_server
|
||
|
|
||
|
|
||
|
class BTAProxy:
|
||
|
def __init__(self, host: str, port: int, loop: Optional[AbstractEventLoop] = None):
|
||
|
self.host = host
|
||
|
self.port = port
|
||
|
self.loop = loop or get_event_loop()
|
||
|
|
||
|
@staticmethod
|
||
|
async def pipe(reader: StreamReader, writer: StreamWriter, queue: Queue):
|
||
|
try:
|
||
|
while not reader.at_eof():
|
||
|
packet = await reader.read(0x400000)
|
||
|
queue.put_nowait(packet)
|
||
|
writer.write(packet)
|
||
|
finally:
|
||
|
writer.close()
|
||
|
|
||
|
async def handle_client(self, cli_reader: StreamReader, cli_writer: StreamWriter):
|
||
|
try:
|
||
|
peername = cli_writer.get_extra_info("peername")
|
||
|
srv_reader, srv_writer = await open_connection(self.host, self.port)
|
||
|
|
||
|
queue_srv: Queue = Queue()
|
||
|
queue_cli: Queue = Queue()
|
||
|
|
||
|
self.loop.create_task(inspect_client(queue_cli, peername))
|
||
|
self.loop.create_task(inspect_server(queue_srv, peername))
|
||
|
self.loop.create_task(self.pipe(cli_reader, srv_writer, queue_cli))
|
||
|
self.loop.create_task(self.pipe(srv_reader, cli_writer, queue_srv))
|
||
|
except Exception as e:
|
||
|
print(f"oopsie whoopsie {e}")
|