Compare commits

6 Commits

3 changed files with 99 additions and 17 deletions

View File

@@ -42,6 +42,10 @@ list = 1
 auto-reconnect = yes
 reconnect-delay = 1.0

+# Change websocket connection opening timeout.
+# It may be useful when initial server connection may take a long time.
+connect-timeout = 60.0
+
 # Number of retries in case request fails. Applies globally
 # Can be changed on per-module basis
 http-retries = 5
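In main() the new option is read via conf["main"].getfloat("connect_timeout", 60.0) and forwarded to the websocket connection as open_timeout (see the Python hunks below). A minimal sketch of the resulting call, with a placeholder instance URL; the real connection also carries list and access_token query parameters:

# Sketch only: mastodon.example is a placeholder; open_timeout is in seconds.
from websockets.client import connect

async def open_stream(connect_timeout: float = 60.0):
    async with connect(
        "wss://mastodon.example/api/v1/streaming?stream=list",
        open_timeout=connect_timeout,
    ) as ws:
        print(await ws.recv())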

View File

@@ -37,7 +37,7 @@ from mastoposter import (
     __description__,
 )
 from mastoposter.integrations import FilteredIntegration
-from mastoposter.sources import websocket_source
+from mastoposter.sources import websocket_source, single_status_source
 from mastoposter.types import Account, Status
 from mastoposter.utils import normalize_config
@@ -110,6 +110,10 @@ def main():
         "config", nargs="?", default=getenv("MASTOPOSTER_CONFIG_FILE")
     )
     parser.add_argument("-v", action="version", version=__version__)
+    parser.add_argument(
+        "--single-status", nargs="?", type=str,
+        help="process single status and exit"
+    )
     args = parser.parse_args()

     if not args.config:
@@ -142,21 +146,32 @@ def main():
         "wss://{}/api/v1/streaming".format(conf["main"]["instance"]),
     )

+    source = websocket_source
+    source_params = dict(
+        url=url,
+        replies_to_other_accounts_should_not_be_skipped=conf[
+            "main"
+        ].getboolean(
+            "replies_to_other_accounts_should_not_be_skipped", False
+        ),
+        reconnect=conf["main"].getboolean("auto_reconnect", False),
+        reconnect_delay=conf["main"].getfloat("reconnect_delay", 1.0),
+        connect_timeout=conf["main"].getfloat("connect_timeout", 60.0),
+        list=conf["main"]["list"],
+        access_token=conf["main"]["token"],
+    )
+
+    if args.single_status:
+        source = single_status_source
+        source_params["status_url"] = args.single_status
+        source_params["retries"] = retries
+
     run(
         listen(
-            websocket_source,
+            source,
             modules,
             user_id,
-            url=url,
-            replies_to_other_accounts_should_not_be_skipped=conf[
-                "main"
-            ].getboolean(
-                "replies_to_other_accounts_should_not_be_skipped", False
-            ),
-            reconnect=conf["main"].getboolean("auto_reconnect", False),
-            reconnect_delay=conf["main"].getfloat("reconnect_delay", 1.0),
-            list=conf["main"]["list"],
-            access_token=conf["main"]["token"],
+            **source_params
         )
     )
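With the new flag, a single status is processed and the program exits, per the argparse help text. Illustrative invocations, assuming the module is run as python -m mastoposter and that config.ini is the config file (neither name is part of this diff):

# Fetch by numeric ID on the configured instance:
python -m mastoposter config.ini --single-status 123456789
# Or by URL; non-local URLs are resolved through /api/v2/search (see sources.py below):
python -m mastoposter config.ini --single-status https://mastodon.example/@someone/123456789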

View File

@@ -16,23 +16,30 @@ GNU General Public License for more details.
 from asyncio import exceptions, sleep
 from json import loads
 from logging import getLogger
-from typing import AsyncGenerator
-from urllib.parse import urlencode
+from typing import AsyncGenerator, List
+from urllib.parse import urlencode, urlparse
 from mastoposter.types import Status

 logger = getLogger("sources")


 async def websocket_source(
-    url: str, reconnect: bool = False, reconnect_delay: float = 1.0, **params
+    url: str, reconnect: bool = False, reconnect_delay: float = 1.0,
+    connect_timeout: float = 60.0, **params
 ) -> AsyncGenerator[Status, None]:
     from websockets.client import connect
     from websockets.exceptions import WebSocketException

-    url = f"{url}?" + urlencode({"stream": "list", **params})
+    param_dict = {"stream": "list", **params}
+    public_param_dict = param_dict.copy()
+    public_param_dict["access_token"] = 'SCRUBBED'
+    public_url = f"{url}?" + urlencode(public_param_dict)
+    url = f"{url}?" + urlencode(param_dict)
     while True:
         try:
-            async with connect(url) as ws:
+            logger.info("attempting to connect to %s", public_url)
+            async with connect(url, open_timeout=connect_timeout) as ws:
+                logger.info("Connected to WebSocket")
                 while (msg := await ws.recv()) is not None:
                     event = loads(msg)
                     logger.debug("data: %r", event)
@@ -59,3 +66,59 @@ async def websocket_source(
                 "but we're not done yet"
             )
             await sleep(reconnect_delay)
+
+
+async def single_status_source(
+    status_url: str, url: str = None, access_token: str = None, retries: int = 5, **kwargs
+) -> AsyncGenerator[Status, None]:
+    # TODO: catch exceptions
+    from httpx import Client, HTTPTransport
+
+    user_authority = urlparse(url).netloc if url is not None else None
+
+    try:
+        status_url = f"https://{user_authority}/api/v1/statuses/{int(status_url)}"
+    except ValueError:
+        pass
+
+    parsed_status_url = urlparse(status_url)
+
+    with Client(transport=HTTPTransport(retries=retries)) as c:
+        status: Status
+        if parsed_status_url.path.startswith("/api/v1/statuses/"):
+            if parsed_status_url.netloc != user_authority:
+                access_token = None
+            # headers = {}
+            # if access_token is not None:
+            #     headers['Authorization'] = 'Bearer ' + access_token
+            params = {}
+            if access_token is not None:
+                params['access_token'] = access_token
+            rq = c.get(
+                status_url,
+                params=params,
+            )
+            status = Status.from_dict(rq.json())
+        else:
+            search_instance = user_authority if user_authority is not None else parsed_status_url.netloc
+            if search_instance != user_authority:
+                access_token = None
+            params = {}
+            if access_token is not None:
+                params["access_token"] = access_token
+            params["q"] = status_url
+            rq = c.get(
+                f"https://{search_instance}/api/v2/search",
+                params=params,
+            )
+            statuses: List[Status] = rq.json().get("statuses", [])
+            if len(statuses) < 1:
+                logger.error("Instance %s hasn't found status %r",
+                             search_instance, status_url)
+                return
+            status = Status.from_dict(statuses[0])
+        yield status
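Both websocket_source and single_status_source expose the same shape, an async generator of Status objects, which is what lets main() pick a source and pass **source_params unchanged. A minimal sketch of a consumer under that assumption (the real listen() also takes the integration modules and user_id and is not part of this diff):

# Sketch only: a stand-in for how listen() iterates a source.
from typing import AsyncGenerator, Callable

from mastoposter.types import Status

async def consume(
    source: Callable[..., AsyncGenerator[Status, None]], **source_params
) -> None:
    # The same loop works for the streaming source and the one-shot source;
    # the latter simply yields a single Status and stops.
    async for status in source(**source_params):
        print(status)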