Merge branch 'master' into unsafe

This commit is contained in:
Casey 2022-09-14 20:35:35 +03:00
commit 03cfa75bc7
Signed by: hkc
GPG Key ID: F0F6CFE11CDB0960
8 changed files with 152 additions and 119 deletions

View File

@ -43,16 +43,20 @@ token = 12345:blahblah
# username, if it is public
chat = @username
# Should we show link to post as a link after post content?
show-post-link = yes
# Should we show link to original author before post content?
show-boost-from = yes
# Should we make posts silent?
# https://core.telegram.org/bots/api#sendmessage `disable_notification`
silent = true
# Jinja2 template string for the post. Works only in Telegram.
# This is the default template, not specifying that property at all will result
# in this string (probably)
# Pay attention to 4 spaces in the empty line, I think it's required
template = {% if status.reblog %}Boost from <a href="{{status.reblog.account.url}}">{{status.reblog.account.name}}</a>
{% endif %}{% if status.reblog_or_status.spoiler_text %}{{status.reblog_or_status.spoiler_text}}
<tg-spoiler>{% endif %}{{ status.reblog_or_status.content_flathtml }}{% if status.reblog_or_status.spoiler_text %}</tg-spoiler>{% endif %}
<a href="{{status.link}}">Link to post</a>
# Discord integration
[module/discord]
type = discord
@ -60,7 +64,6 @@ type = discord
# Webhook URL with the `?wait=true`
webhook = url
;# Boost filter. Only boosts will be matched by that one
;[filter/boost]
;type = boost

View File

@ -1,6 +1,6 @@
#!/usr/bin/env python3
from asyncio import run
from configparser import ConfigParser
from configparser import ConfigParser, ExtendedInterpolation
from mastoposter import execute_integrations, load_integrations_from
from mastoposter.integrations import FilteredIntegration
from mastoposter.sources import websocket_source
@ -25,7 +25,7 @@ async def listen(
continue
# TODO: add option/filter to handle that
if status.visibility in ("direct", "private"):
if status.visibility in ("direct",):
continue
# TODO: find a better way to handle threads
@ -39,7 +39,7 @@ async def listen(
def main(config_path: str):
conf = ConfigParser()
conf = ConfigParser(interpolation=ExtendedInterpolation())
conf.read(config_path)
for section in conf.sections():

View File

@ -1,6 +1,5 @@
from configparser import SectionProxy
from typing import List, Optional
from bs4 import BeautifulSoup, PageElement, Tag
from httpx import AsyncClient
from zlib import crc32
from mastoposter.integrations.base import BaseIntegration
@ -16,38 +15,6 @@ class DiscordIntegration(BaseIntegration):
def __init__(self, section: SectionProxy):
self.webhook = section.get("webhook", "")
@staticmethod
def md_escape(text: str) -> str:
return (
text.replace("\\", "\\\\")
.replace("*", "\\*")
.replace("[", "\\[")
.replace("]", "\\]")
.replace("_", "\\_")
.replace("~", "\\~")
.replace("|", "\\|")
.replace("`", "\\`")
)
@classmethod
def node_to_text(cls, el: PageElement) -> str:
if isinstance(el, Tag):
if el.name == "a":
return "[%s](%s)" % (
cls.md_escape(
str.join("", map(cls.node_to_text, el.children))
),
el.attrs["href"],
)
elif el.name == "p":
return (
str.join("", map(cls.node_to_text, el.children)) + "\n\n"
)
elif el.name == "br":
return "\n"
return str.join("", map(cls.node_to_text, el.children))
return cls.md_escape(str(el))
async def execute_webhook(
self,
content: Optional[str] = None,
@ -75,9 +42,7 @@ class DiscordIntegration(BaseIntegration):
source = status.reblog or status
embeds: List[DiscordEmbed] = []
text = self.node_to_text(
BeautifulSoup(source.content, features="lxml")
)
text = source.content_markdown
if source.spoiler_text:
text = f"{source.spoiler_text}\n||{text}||"

View File

@ -1,11 +1,11 @@
from configparser import SectionProxy
from dataclasses import dataclass
from html import escape
from typing import Any, List, Mapping, Optional
from bs4 import BeautifulSoup, Tag, PageElement
from httpx import AsyncClient
from jinja2 import Template
from mastoposter.integrations.base import BaseIntegration
from mastoposter.types import Attachment, Poll, Status
from emoji import emojize
@dataclass
@ -25,32 +25,45 @@ class TGResponse:
)
class TelegramIntegration(BaseIntegration):
API_URL: str = "https://api.telegram.org/bot{}/{}"
MEDIA_COMPATIBILITY: Mapping[str, set] = {
API_URL: str = "https://api.telegram.org/bot{}/{}"
MEDIA_COMPATIBILITY: Mapping[str, set] = {
"image": {"image", "video"},
"video": {"image", "video"},
"gifv": {"gifv"},
"audio": {"audio"},
"unknown": {"unknown"},
}
MEDIA_MAPPING: Mapping[str, str] = {
}
MEDIA_MAPPING: Mapping[str, str] = {
"image": "photo",
"video": "video",
"gifv": "animation",
"audio": "audio",
"unknown": "document",
}
}
DEFAULT_TEMPLATE: str = """\
{% if status.reblog %}\
Boost from <a href="{{status.reblog.account.url}}">\
{{status.reblog.account.name}}</a>\
{% endif %}\
{% if status.reblog_or_status.spoiler_text %}\
{{status.reblog_or_status.spoiler_text}}
<tg-spoiler>{% endif %}{{ status.reblog_or_status.content_flathtml }}\
{% if status.reblog_or_status.spoiler_text %}</tg-spoiler>{% endif %}
<a href="{{status.link}}">Link to post</a>"""
class TelegramIntegration(BaseIntegration):
def __init__(self, sect: SectionProxy):
self.token = sect.get("token", "")
self.chat_id = sect.get("chat", "")
self.show_post_link = sect.getboolean("show_post_link", True)
self.show_boost_from = sect.getboolean("show_boost_from", True)
self.silent = sect.getboolean("silent", True)
self.template: Template = Template(
emojize(sect.get("template", DEFAULT_TEMPLATE))
)
async def _tg_request(self, method: str, **kwargs) -> TGResponse:
url = self.API_URL.format(self.token, method)
url = API_URL.format(self.token, method)
async with AsyncClient() as client:
return TGResponse.from_dict(
(await client.post(url, json=kwargs)).json(), kwargs
@ -68,17 +81,17 @@ class TelegramIntegration(BaseIntegration):
async def _post_media(self, text: str, media: Attachment) -> TGResponse:
# Just to be safe
if media.type not in self.MEDIA_MAPPING:
if media.type not in MEDIA_MAPPING:
return await self._post_plaintext(text)
return await self._tg_request(
"send%s" % self.MEDIA_MAPPING[media.type].title(),
"send%s" % MEDIA_MAPPING[media.type].title(),
parse_mode="HTML",
disable_notification=self.silent,
disable_web_page_preview=True,
chat_id=self.chat_id,
caption=text,
**{self.MEDIA_MAPPING[media.type]: media.url},
**{MEDIA_MAPPING[media.type]: media.url},
)
async def _post_mediagroup(
@ -89,12 +102,12 @@ class TelegramIntegration(BaseIntegration):
for attachment in media:
if attachment.type not in allowed_medias:
continue
if attachment.type not in self.MEDIA_COMPATIBILITY:
if attachment.type not in MEDIA_COMPATIBILITY:
continue
allowed_medias &= self.MEDIA_COMPATIBILITY[attachment.type]
allowed_medias &= MEDIA_COMPATIBILITY[attachment.type]
media_list.append(
{
"type": self.MEDIA_MAPPING[attachment.type],
"type": MEDIA_MAPPING[attachment.type],
"media": attachment.url,
}
)
@ -128,46 +141,10 @@ class TelegramIntegration(BaseIntegration):
options=[opt.title for opt in poll.options],
)
@classmethod
def node_to_text(cls, el: PageElement) -> str:
if isinstance(el, Tag):
if el.name == "a":
return '<a href="{}">{}</a>'.format(
escape(el.attrs["href"]),
str.join("", map(cls.node_to_text, el.children)),
)
elif el.name == "p":
return (
str.join("", map(cls.node_to_text, el.children)) + "\n\n"
)
elif el.name == "br":
return "\n"
return str.join("", map(cls.node_to_text, el.children))
return escape(str(el))
async def __call__(self, status: Status) -> Optional[str]:
source = status.reblog or status
text = self.node_to_text(
BeautifulSoup(source.content, features="lxml")
)
text = text.rstrip()
if source.spoiler_text:
text = "Spoiler: {cw}\n<tg-spoiler>{text}</tg-spoiler>".format(
cw=source.spoiler_text, text=text
)
if self.show_post_link:
text += '\n\n<a href="%s">Link to post</a>' % status.link
if status.reblog and self.show_boost_from:
text = (
'Boosted post from <a href="{}">{}</a>\n'.format(
source.account.url,
source.account.display_name or source.account.username,
)
+ text
)
text = self.template.render({"status": status})
ids = []
@ -205,12 +182,6 @@ class TelegramIntegration(BaseIntegration):
return (
"<TelegramIntegration "
"chat_id={chat!r} "
"show_post_link={show_post_link!r} "
"show_boost_from={show_boost_from!r} "
"template={template!r} "
"silent={silent!r}>"
).format(
chat=self.chat_id,
show_post_link=self.show_post_link,
show_boost_from=self.show_boost_from,
silent=self.silent,
)
).format(chat=self.chat_id, silent=self.silent, template=self.template)

View File

@ -1,3 +1,4 @@
from asyncio import exceptions
from json import loads
from typing import AsyncGenerator
from urllib.parse import urlencode
@ -21,6 +22,6 @@ async def websocket_source(
raise Exception(event["error"])
if event["event"] == "update":
yield Status.from_dict(loads(event["payload"]))
except WebSocketException:
except (WebSocketException, TimeoutError, exceptions.TimeoutError):
if not reconnect:
raise

View File

@ -2,6 +2,10 @@ from dataclasses import dataclass, field, fields
from datetime import datetime
from typing import Any, Callable, Optional, List, Literal, TypeVar
from bs4 import BeautifulSoup
from mastoposter.utils import node_to_html, node_to_markdown, node_to_plaintext
def _date(val: str) -> datetime:
return datetime.fromisoformat(val.rstrip("Z"))
@ -100,6 +104,10 @@ class Account:
bot=bool(data.get("bot")),
)
@property
def name(self) -> str:
return self.display_name or self.username
@dataclass
class AttachmentMetaImage:
@ -304,6 +312,28 @@ class Status:
tags=[Tag.from_dict(m) for m in data.get("tags", [])],
)
@property
def reblog_or_status(self) -> "Status":
return self.reblog or self
@property
def link(self) -> str:
return self.account.url + "/" + str(self.id)
@property
def content_flathtml(self) -> str:
return node_to_html(
BeautifulSoup(self.content, features="lxml")
).rstrip()
@property
def content_markdown(self) -> str:
return node_to_markdown(
BeautifulSoup(self.content, features="lxml")
).rstrip()
@property
def content_plaintext(self) -> str:
return node_to_plaintext(
BeautifulSoup(self.content, features="lxml")
).rstrip()

60
mastoposter/utils.py Normal file
View File

@ -0,0 +1,60 @@
from html import escape
from bs4.element import Tag, PageElement
def md_escape(text: str) -> str:
return (
text.replace("\\", "\\\\")
.replace("*", "\\*")
.replace("[", "\\[")
.replace("]", "\\]")
.replace("_", "\\_")
.replace("~", "\\~")
.replace("|", "\\|")
.replace("`", "\\`")
)
def node_to_html(el: PageElement) -> str:
if isinstance(el, Tag):
if el.name == "a":
return '<a href="{}">{}</a>'.format(
escape(el.attrs["href"]),
str.join("", map(node_to_html, el.children)),
)
elif el.name == "p":
return str.join("", map(node_to_html, el.children)) + "\n\n"
elif el.name == "br":
return "\n"
return str.join("", map(node_to_html, el.children))
return escape(str(el))
def node_to_markdown(el: PageElement) -> str:
if isinstance(el, Tag):
if el.name == "a":
return "[%s](%s)" % (
md_escape(str.join("", map(node_to_markdown, el.children))),
el.attrs["href"],
)
elif el.name == "p":
return str.join("", map(node_to_markdown, el.children)) + "\n\n"
elif el.name == "br":
return "\n"
return str.join("", map(node_to_markdown, el.children))
return md_escape(str(el))
def node_to_plaintext(el: PageElement) -> str:
if isinstance(el, Tag):
if el.name == "a":
return "%s (%s)" % (
str.join("", map(node_to_plaintext, el.children)),
el.attrs["href"],
)
elif el.name == "p":
return str.join("", map(node_to_plaintext, el.children)) + "\n\n"
elif el.name == "br":
return "\n"
return str.join("", map(node_to_plaintext, el.children))
return str(el)

View File

@ -2,11 +2,14 @@ anyio==3.6.1
beautifulsoup4==4.11.1
bs4==0.0.1
certifi==2022.6.15
emoji==2.0.0
h11==0.12.0
httpcore==0.15.0
httpx==0.23.0
idna==3.3
Jinja2==3.1.2
lxml==4.9.1
MarkupSafe==2.1.1
rfc3986==1.5.0
sniffio==1.2.0
soupsieve==2.3.2.post1