1
0
Fork 0

Custom formatting?

This commit is contained in:
Casey 2022-08-31 16:19:39 +03:00
parent 9d672dbbba
commit 881b1e1532
Signed by: hkc
GPG Key ID: F0F6CFE11CDB0960
5 changed files with 128 additions and 98 deletions

View File

@ -1,6 +1,5 @@
from configparser import SectionProxy from configparser import SectionProxy
from typing import List, Optional from typing import List, Optional
from bs4 import BeautifulSoup, PageElement, Tag
from httpx import AsyncClient from httpx import AsyncClient
from zlib import crc32 from zlib import crc32
from mastoposter.integrations.base import BaseIntegration from mastoposter.integrations.base import BaseIntegration
@ -16,38 +15,6 @@ class DiscordIntegration(BaseIntegration):
def __init__(self, section: SectionProxy): def __init__(self, section: SectionProxy):
self.webhook = section.get("webhook", "") self.webhook = section.get("webhook", "")
@staticmethod
def md_escape(text: str) -> str:
    """Backslash-escape the Markdown control characters found in ``text``.

    The backslash is processed first so that the backslashes inserted for
    the remaining characters are not themselves escaped again.
    """
    escaped = text
    for char in ("\\", "*", "[", "]", "_", "~", "|", "`"):
        escaped = escaped.replace(char, "\\" + char)
    return escaped
@classmethod
def node_to_text(cls, el: PageElement) -> str:
    """Convert a parsed HTML node into Markdown text.

    Text leaves are escaped with ``md_escape``; ``<a>`` becomes
    ``[text](href)``, ``<p>`` is followed by a blank line, ``<br>`` becomes
    a newline, and every other tag contributes only its children.

    Args:
        el: Node of the parsed document (a tag or a text leaf).

    Returns:
        The Markdown rendering of ``el`` and everything below it.
    """
    if not isinstance(el, Tag):
        # Leaves are the only place escaping happens, so text is escaped
        # exactly once no matter how deeply it is nested.
        return cls.md_escape(str(el))
    if el.name == "br":
        return "\n"

    inner = "".join(map(cls.node_to_text, el.children))
    if el.name == "a":
        href = el.attrs.get("href")
        if not href:
            # An anchor without a target cannot form a link; keep its text.
            return inner
        # ``inner`` is already escaped by the recursion above; escaping it
        # again here would leave literal backslashes in the link text.
        return "[%s](%s)" % (inner, href)
    if el.name == "p":
        return inner + "\n\n"
    return inner
async def execute_webhook( async def execute_webhook(
self, self,
content: Optional[str] = None, content: Optional[str] = None,
@ -75,9 +42,7 @@ class DiscordIntegration(BaseIntegration):
source = status.reblog or status source = status.reblog or status
embeds: List[DiscordEmbed] = [] embeds: List[DiscordEmbed] = []
text = self.node_to_text( text = source.content_markdown
BeautifulSoup(source.content, features="lxml")
)
if source.spoiler_text: if source.spoiler_text:
text = f"{source.spoiler_text}\n||{text}||" text = f"{source.spoiler_text}\n||{text}||"

View File

@ -1,11 +1,11 @@
from configparser import SectionProxy from configparser import SectionProxy
from dataclasses import dataclass from dataclasses import dataclass
from html import escape
from typing import Any, List, Mapping, Optional from typing import Any, List, Mapping, Optional
from bs4 import BeautifulSoup, Tag, PageElement
from httpx import AsyncClient from httpx import AsyncClient
from jinja2 import Template
from mastoposter.integrations.base import BaseIntegration from mastoposter.integrations.base import BaseIntegration
from mastoposter.types import Attachment, Poll, Status from mastoposter.types import Attachment, Poll, Status
from emoji import emojize
@dataclass @dataclass
@ -25,32 +25,44 @@ class TGResponse:
) )
class TelegramIntegration(BaseIntegration): API_URL: str = "https://api.telegram.org/bot{}/{}"
API_URL: str = "https://api.telegram.org/bot{}/{}" MEDIA_COMPATIBILITY: Mapping[str, set] = {
MEDIA_COMPATIBILITY: Mapping[str, set] = {
"image": {"image", "video"}, "image": {"image", "video"},
"video": {"image", "video"}, "video": {"image", "video"},
"gifv": {"gifv"}, "gifv": {"gifv"},
"audio": {"audio"}, "audio": {"audio"},
"unknown": {"unknown"}, "unknown": {"unknown"},
} }
MEDIA_MAPPING: Mapping[str, str] = { MEDIA_MAPPING: Mapping[str, str] = {
"image": "photo", "image": "photo",
"video": "video", "video": "video",
"gifv": "animation", "gifv": "animation",
"audio": "audio", "audio": "audio",
"unknown": "document", "unknown": "document",
} }
# Fallback message template, used when the config section has no "template"
# key. It is compiled with jinja2.Template and rendered with one variable,
# `status`.
# A backslash at the end of a line joins it to the next one inside the string
# literal, so only the lines WITHOUT a trailing backslash emit a newline.
# NOTE(review): as written, the "Boost from ..." header ends with a
# continuation backslash and therefore runs straight into the text that
# follows it - confirm that this is intended.
DEFAULT_TEMPLATE: str = """\
{% if status.reblog %}\
Boost from <a href="{{status.reblog.account.url}}">\
{{status.reblog.account.name}}</a>\
{% endif %}\
{% if status.spoiler_text %}{{status.spoiler_text}}
<tg-spoiler>{% endif %}{{ status.content_flathtml }}\
{% if status.spoiler_text %}</tg-spoiler>{% endif %}
<a href="{{status.link}}">Link to post</a>"""
class TelegramIntegration(BaseIntegration):
def __init__(self, sect: SectionProxy): def __init__(self, sect: SectionProxy):
self.token = sect.get("token", "") self.token = sect.get("token", "")
self.chat_id = sect.get("chat", "") self.chat_id = sect.get("chat", "")
self.show_post_link = sect.getboolean("show_post_link", True) self.show_post_link = sect.getboolean("show_post_link", True)
self.show_boost_from = sect.getboolean("show_boost_from", True) self.show_boost_from = sect.getboolean("show_boost_from", True)
self.silent = sect.getboolean("silent", True) self.silent = sect.getboolean("silent", True)
self.template = Template(sect.get("template", DEFAULT_TEMPLATE))
async def _tg_request(self, method: str, **kwargs) -> TGResponse: async def _tg_request(self, method: str, **kwargs) -> TGResponse:
url = self.API_URL.format(self.token, method) url = API_URL.format(self.token, method)
async with AsyncClient() as client: async with AsyncClient() as client:
return TGResponse.from_dict( return TGResponse.from_dict(
(await client.post(url, json=kwargs)).json(), kwargs (await client.post(url, json=kwargs)).json(), kwargs
@ -68,17 +80,17 @@ class TelegramIntegration(BaseIntegration):
async def _post_media(self, text: str, media: Attachment) -> TGResponse: async def _post_media(self, text: str, media: Attachment) -> TGResponse:
# Just to be safe # Just to be safe
if media.type not in self.MEDIA_MAPPING: if media.type not in MEDIA_MAPPING:
return await self._post_plaintext(text) return await self._post_plaintext(text)
return await self._tg_request( return await self._tg_request(
"send%s" % self.MEDIA_MAPPING[media.type].title(), "send%s" % MEDIA_MAPPING[media.type].title(),
parse_mode="HTML", parse_mode="HTML",
disable_notification=self.silent, disable_notification=self.silent,
disable_web_page_preview=True, disable_web_page_preview=True,
chat_id=self.chat_id, chat_id=self.chat_id,
caption=text, caption=text,
**{self.MEDIA_MAPPING[media.type]: media.url}, **{MEDIA_MAPPING[media.type]: media.url},
) )
async def _post_mediagroup( async def _post_mediagroup(
@ -89,12 +101,12 @@ class TelegramIntegration(BaseIntegration):
for attachment in media: for attachment in media:
if attachment.type not in allowed_medias: if attachment.type not in allowed_medias:
continue continue
if attachment.type not in self.MEDIA_COMPATIBILITY: if attachment.type not in MEDIA_COMPATIBILITY:
continue continue
allowed_medias &= self.MEDIA_COMPATIBILITY[attachment.type] allowed_medias &= MEDIA_COMPATIBILITY[attachment.type]
media_list.append( media_list.append(
{ {
"type": self.MEDIA_MAPPING[attachment.type], "type": MEDIA_MAPPING[attachment.type],
"media": attachment.url, "media": attachment.url,
} }
) )
@ -128,46 +140,10 @@ class TelegramIntegration(BaseIntegration):
options=[opt.title for opt in poll.options], options=[opt.title for opt in poll.options],
) )
@classmethod
def node_to_text(cls, el: PageElement) -> str:
    """Flatten a parsed HTML node into a reduced HTML string.

    Only ``<a href=...>`` survives as markup. ``<p>`` is followed by a
    blank line, ``<br>`` becomes a newline, every other tag contributes
    only its children, and text leaves are HTML-escaped.

    Args:
        el: Node of the parsed document (a tag or a text leaf).

    Returns:
        The flattened HTML for ``el`` and everything below it.
    """
    if not isinstance(el, Tag):
        return escape(str(el))
    if el.name == "br":
        return "\n"

    inner = "".join(map(cls.node_to_text, el.children))
    if el.name == "a":
        href = el.attrs.get("href")
        if not href:
            # An anchor without a target used to raise KeyError; keep only
            # its text instead of failing the whole conversion.
            return inner
        return '<a href="{}">{}</a>'.format(escape(href), inner)
    if el.name == "p":
        return inner + "\n\n"
    return inner
async def __call__(self, status: Status) -> Optional[str]: async def __call__(self, status: Status) -> Optional[str]:
source = status.reblog or status source = status.reblog or status
text = self.node_to_text(
BeautifulSoup(source.content, features="lxml")
)
text = text.rstrip()
if source.spoiler_text: text = emojize(self.template.render({"status": status}))
text = "Spoiler: {cw}\n<tg-spoiler>{text}</tg-spoiler>".format(
cw=source.spoiler_text, text=text
)
if self.show_post_link:
text += '\n\n<a href="%s">Link to post</a>' % status.link
if status.reblog and self.show_boost_from:
text = (
'Boosted post from <a href="{}">{}</a>\n'.format(
source.account.url,
source.account.display_name or source.account.username,
)
+ text
)
ids = [] ids = []

View File

@ -2,6 +2,10 @@ from dataclasses import dataclass, field
from datetime import datetime from datetime import datetime
from typing import Any, Callable, Optional, List, Literal, TypeVar from typing import Any, Callable, Optional, List, Literal, TypeVar
from bs4 import BeautifulSoup
from mastoposter.utils import node_to_html, node_to_markdown, node_to_plaintext
def _date(val: str) -> datetime: def _date(val: str) -> datetime:
return datetime.fromisoformat(val.rstrip("Z")) return datetime.fromisoformat(val.rstrip("Z"))
@ -100,6 +104,10 @@ class Account:
bot=bool(data.get("bot")), bot=bool(data.get("bot")),
) )
@property
def name(self) -> str:
    """Label for the account: the display name, or the username if it is empty."""
    label = self.display_name
    return label if label else self.username
@dataclass @dataclass
class AttachmentMetaImage: class AttachmentMetaImage:
@ -307,3 +315,21 @@ class Status:
@property @property
def link(self) -> str: def link(self) -> str:
return self.account.url + "/" + str(self.id) return self.account.url + "/" + str(self.id)
@property
def content_flathtml(self) -> str:
    """Status content converted by ``node_to_html``, trailing whitespace removed."""
    # The content is parsed with the lxml backend on every access.
    document = BeautifulSoup(self.content, features="lxml")
    flattened = node_to_html(document)
    return flattened.rstrip()
@property
def content_markdown(self) -> str:
    """Status content converted by ``node_to_markdown``, trailing whitespace removed."""
    # The content is parsed with the lxml backend on every access.
    document = BeautifulSoup(self.content, features="lxml")
    markdown = node_to_markdown(document)
    return markdown.rstrip()
@property
def content_plaintext(self) -> str:
    """Status content converted by ``node_to_plaintext``, trailing whitespace removed."""
    # The content is parsed with the lxml backend on every access.
    document = BeautifulSoup(self.content, features="lxml")
    plain = node_to_plaintext(document)
    return plain.rstrip()

60
mastoposter/utils.py Normal file
View File

@ -0,0 +1,60 @@
from html import escape
from bs4.element import Tag, PageElement
# Maps each Markdown control character to its backslash-escaped form.
# Built once at import time so escaping is a single pass over the text.
_MD_ESCAPES = str.maketrans({char: "\\" + char for char in "\\*[]_~|`"})


def md_escape(text: str) -> str:
    """Backslash-escape the Markdown control characters found in ``text``.

    The escaped characters are the backslash, ``*``, ``[``, ``]``, ``_``,
    ``~``, ``|`` and the backtick. Each input character is translated
    independently, so inserted backslashes are never escaped a second time.

    Args:
        text: Raw text to protect from Markdown interpretation.

    Returns:
        ``text`` with every control character preceded by a backslash.
    """
    return text.translate(_MD_ESCAPES)
def node_to_html(el: PageElement) -> str:
    """Flatten a parsed HTML node into a reduced HTML string.

    Only ``<a href=...>`` survives as markup. ``<p>`` is followed by a blank
    line, ``<br>`` becomes a newline, every other tag contributes only its
    children, and text leaves are HTML-escaped.

    Args:
        el: Node of the parsed document (a tag or a text leaf).

    Returns:
        The flattened HTML for ``el`` and everything below it.
    """
    if not isinstance(el, Tag):
        return escape(str(el))
    if el.name == "br":
        return "\n"

    inner = "".join(map(node_to_html, el.children))
    if el.name == "a":
        href = el.attrs.get("href")
        if not href:
            # An anchor without a target used to raise KeyError; keep only
            # its text instead of failing the whole conversion.
            return inner
        return '<a href="{}">{}</a>'.format(escape(href), inner)
    if el.name == "p":
        return inner + "\n\n"
    return inner
def node_to_markdown(el: PageElement) -> str:
    """Convert a parsed HTML node into Markdown text.

    Text leaves are escaped with ``md_escape``; ``<a>`` becomes
    ``[text](href)``, ``<p>`` is followed by a blank line, ``<br>`` becomes a
    newline, and every other tag contributes only its children.

    Args:
        el: Node of the parsed document (a tag or a text leaf).

    Returns:
        The Markdown rendering of ``el`` and everything below it.
    """
    if not isinstance(el, Tag):
        # Leaves are the only place escaping happens, so text is escaped
        # exactly once no matter how deeply it is nested.
        return md_escape(str(el))
    if el.name == "br":
        return "\n"

    inner = "".join(map(node_to_markdown, el.children))
    if el.name == "a":
        href = el.attrs.get("href")
        if not href:
            # An anchor without a target cannot form a link; keep its text.
            return inner
        # ``inner`` is already escaped by the recursion above; escaping it
        # again here would leave literal backslashes in the link text.
        return "[%s](%s)" % (inner, href)
    if el.name == "p":
        return inner + "\n\n"
    return inner
def node_to_plaintext(el: PageElement) -> str:
    """Convert a parsed HTML node into plain text.

    ``<a>`` becomes ``text (href)``, ``<p>`` is followed by a blank line,
    ``<br>`` becomes a newline, every other tag contributes only its
    children, and text leaves are returned unchanged.

    Args:
        el: Node of the parsed document (a tag or a text leaf).

    Returns:
        The plain-text rendering of ``el`` and everything below it.
    """
    if not isinstance(el, Tag):
        return str(el)
    if el.name == "br":
        return "\n"

    inner = "".join(map(node_to_plaintext, el.children))
    if el.name == "a":
        href = el.attrs.get("href")
        if not href:
            # An anchor without a target used to raise KeyError; keep only
            # its text instead of failing the whole conversion.
            return inner
        return "%s (%s)" % (inner, href)
    if el.name == "p":
        return inner + "\n\n"
    return inner

View File

@ -2,11 +2,14 @@ anyio==3.6.1
beautifulsoup4==4.11.1 beautifulsoup4==4.11.1
bs4==0.0.1 bs4==0.0.1
certifi==2022.6.15 certifi==2022.6.15
emoji==2.0.0
h11==0.12.0 h11==0.12.0
httpcore==0.15.0 httpcore==0.15.0
httpx==0.23.0 httpx==0.23.0
idna==3.3 idna==3.3
Jinja2==3.1.2
lxml==4.9.1 lxml==4.9.1
MarkupSafe==2.1.1
rfc3986==1.5.0 rfc3986==1.5.0
sniffio==1.2.0 sniffio==1.2.0
soupsieve==2.3.2.post1 soupsieve==2.3.2.post1