From 881b1e15323cbce0835fdcb84992d5ff77bb9a3c Mon Sep 17 00:00:00 2001 From: hkc Date: Wed, 31 Aug 2022 16:19:39 +0300 Subject: [PATCH] Custom formatting? --- mastoposter/integrations/discord/__init__.py | 37 +------ mastoposter/integrations/telegram.py | 100 +++++++------------ mastoposter/types.py | 26 +++++ mastoposter/utils.py | 60 +++++++++++ requirements.txt | 3 + 5 files changed, 128 insertions(+), 98 deletions(-) create mode 100644 mastoposter/utils.py diff --git a/mastoposter/integrations/discord/__init__.py b/mastoposter/integrations/discord/__init__.py index 03b990a..38b96bb 100644 --- a/mastoposter/integrations/discord/__init__.py +++ b/mastoposter/integrations/discord/__init__.py @@ -1,6 +1,5 @@ from configparser import SectionProxy from typing import List, Optional -from bs4 import BeautifulSoup, PageElement, Tag from httpx import AsyncClient from zlib import crc32 from mastoposter.integrations.base import BaseIntegration @@ -16,38 +15,6 @@ class DiscordIntegration(BaseIntegration): def __init__(self, section: SectionProxy): self.webhook = section.get("webhook", "") - @staticmethod - def md_escape(text: str) -> str: - return ( - text.replace("\\", "\\\\") - .replace("*", "\\*") - .replace("[", "\\[") - .replace("]", "\\]") - .replace("_", "\\_") - .replace("~", "\\~") - .replace("|", "\\|") - .replace("`", "\\`") - ) - - @classmethod - def node_to_text(cls, el: PageElement) -> str: - if isinstance(el, Tag): - if el.name == "a": - return "[%s](%s)" % ( - cls.md_escape( - str.join("", map(cls.node_to_text, el.children)) - ), - el.attrs["href"], - ) - elif el.name == "p": - return ( - str.join("", map(cls.node_to_text, el.children)) + "\n\n" - ) - elif el.name == "br": - return "\n" - return str.join("", map(cls.node_to_text, el.children)) - return cls.md_escape(str(el)) - async def execute_webhook( self, content: Optional[str] = None, @@ -75,9 +42,7 @@ class DiscordIntegration(BaseIntegration): source = status.reblog or status embeds: List[DiscordEmbed] = [] - text = self.node_to_text( - BeautifulSoup(source.content, features="lxml") - ) + text = source.content_markdown if source.spoiler_text: text = f"{source.spoiler_text}\n||{text}||" diff --git a/mastoposter/integrations/telegram.py b/mastoposter/integrations/telegram.py index cd952d4..7e7b879 100644 --- a/mastoposter/integrations/telegram.py +++ b/mastoposter/integrations/telegram.py @@ -1,11 +1,11 @@ from configparser import SectionProxy from dataclasses import dataclass -from html import escape from typing import Any, List, Mapping, Optional -from bs4 import BeautifulSoup, Tag, PageElement from httpx import AsyncClient +from jinja2 import Template from mastoposter.integrations.base import BaseIntegration from mastoposter.types import Attachment, Poll, Status +from emoji import emojize @dataclass @@ -25,32 +25,44 @@ class TGResponse: ) -class TelegramIntegration(BaseIntegration): - API_URL: str = "https://api.telegram.org/bot{}/{}" - MEDIA_COMPATIBILITY: Mapping[str, set] = { - "image": {"image", "video"}, - "video": {"image", "video"}, - "gifv": {"gifv"}, - "audio": {"audio"}, - "unknown": {"unknown"}, - } - MEDIA_MAPPING: Mapping[str, str] = { - "image": "photo", - "video": "video", - "gifv": "animation", - "audio": "audio", - "unknown": "document", - } +API_URL: str = "https://api.telegram.org/bot{}/{}" +MEDIA_COMPATIBILITY: Mapping[str, set] = { + "image": {"image", "video"}, + "video": {"image", "video"}, + "gifv": {"gifv"}, + "audio": {"audio"}, + "unknown": {"unknown"}, +} +MEDIA_MAPPING: Mapping[str, str] = { + "image": "photo", + "video": "video", + "gifv": "animation", + "audio": "audio", + "unknown": "document", +} +DEFAULT_TEMPLATE: str = """\ +{% if status.reblog %}\ +Boost from \ +{{status.reblog.account.name}}\ +{% endif %}\ +{% if status.spoiler_text %}{{status.spoiler_text}} +{% endif %}{{ status.content_flathtml }}\ +{% if status.spoiler_text %}{% endif %} +Link to post""" + + +class TelegramIntegration(BaseIntegration): def __init__(self, sect: SectionProxy): self.token = sect.get("token", "") self.chat_id = sect.get("chat", "") self.show_post_link = sect.getboolean("show_post_link", True) self.show_boost_from = sect.getboolean("show_boost_from", True) self.silent = sect.getboolean("silent", True) + self.template = Template(sect.get("template", DEFAULT_TEMPLATE)) async def _tg_request(self, method: str, **kwargs) -> TGResponse: - url = self.API_URL.format(self.token, method) + url = API_URL.format(self.token, method) async with AsyncClient() as client: return TGResponse.from_dict( (await client.post(url, json=kwargs)).json(), kwargs @@ -68,17 +80,17 @@ class TelegramIntegration(BaseIntegration): async def _post_media(self, text: str, media: Attachment) -> TGResponse: # Just to be safe - if media.type not in self.MEDIA_MAPPING: + if media.type not in MEDIA_MAPPING: return await self._post_plaintext(text) return await self._tg_request( - "send%s" % self.MEDIA_MAPPING[media.type].title(), + "send%s" % MEDIA_MAPPING[media.type].title(), parse_mode="HTML", disable_notification=self.silent, disable_web_page_preview=True, chat_id=self.chat_id, caption=text, - **{self.MEDIA_MAPPING[media.type]: media.url}, + **{MEDIA_MAPPING[media.type]: media.url}, ) async def _post_mediagroup( @@ -89,12 +101,12 @@ class TelegramIntegration(BaseIntegration): for attachment in media: if attachment.type not in allowed_medias: continue - if attachment.type not in self.MEDIA_COMPATIBILITY: + if attachment.type not in MEDIA_COMPATIBILITY: continue - allowed_medias &= self.MEDIA_COMPATIBILITY[attachment.type] + allowed_medias &= MEDIA_COMPATIBILITY[attachment.type] media_list.append( { - "type": self.MEDIA_MAPPING[attachment.type], + "type": MEDIA_MAPPING[attachment.type], "media": attachment.url, } ) @@ -128,46 +140,10 @@ class TelegramIntegration(BaseIntegration): options=[opt.title for opt in poll.options], ) - @classmethod - def node_to_text(cls, el: PageElement) -> str: - if isinstance(el, Tag): - if el.name == "a": - return '{}'.format( - escape(el.attrs["href"]), - str.join("", map(cls.node_to_text, el.children)), - ) - elif el.name == "p": - return ( - str.join("", map(cls.node_to_text, el.children)) + "\n\n" - ) - elif el.name == "br": - return "\n" - return str.join("", map(cls.node_to_text, el.children)) - return escape(str(el)) - async def __call__(self, status: Status) -> Optional[str]: source = status.reblog or status - text = self.node_to_text( - BeautifulSoup(source.content, features="lxml") - ) - text = text.rstrip() - if source.spoiler_text: - text = "Spoiler: {cw}\n{text}".format( - cw=source.spoiler_text, text=text - ) - - if self.show_post_link: - text += '\n\nLink to post' % status.link - - if status.reblog and self.show_boost_from: - text = ( - 'Boosted post from {}\n'.format( - source.account.url, - source.account.display_name or source.account.username, - ) - + text - ) + text = emojize(self.template.render({"status": status})) ids = [] diff --git a/mastoposter/types.py b/mastoposter/types.py index ecffd2f..8010285 100644 --- a/mastoposter/types.py +++ b/mastoposter/types.py @@ -2,6 +2,10 @@ from dataclasses import dataclass, field from datetime import datetime from typing import Any, Callable, Optional, List, Literal, TypeVar +from bs4 import BeautifulSoup + +from mastoposter.utils import node_to_html, node_to_markdown, node_to_plaintext + def _date(val: str) -> datetime: return datetime.fromisoformat(val.rstrip("Z")) @@ -100,6 +104,10 @@ class Account: bot=bool(data.get("bot")), ) + @property + def name(self) -> str: + return self.display_name or self.username + @dataclass class AttachmentMetaImage: @@ -307,3 +315,21 @@ class Status: @property def link(self) -> str: return self.account.url + "/" + str(self.id) + + @property + def content_flathtml(self) -> str: + return node_to_html( + BeautifulSoup(self.content, features="lxml") + ).rstrip() + + @property + def content_markdown(self) -> str: + return node_to_markdown( + BeautifulSoup(self.content, features="lxml") + ).rstrip() + + @property + def content_plaintext(self) -> str: + return node_to_plaintext( + BeautifulSoup(self.content, features="lxml") + ).rstrip() diff --git a/mastoposter/utils.py b/mastoposter/utils.py new file mode 100644 index 0000000..7e234a2 --- /dev/null +++ b/mastoposter/utils.py @@ -0,0 +1,60 @@ +from html import escape +from bs4.element import Tag, PageElement + + +def md_escape(text: str) -> str: + return ( + text.replace("\\", "\\\\") + .replace("*", "\\*") + .replace("[", "\\[") + .replace("]", "\\]") + .replace("_", "\\_") + .replace("~", "\\~") + .replace("|", "\\|") + .replace("`", "\\`") + ) + + +def node_to_html(el: PageElement) -> str: + if isinstance(el, Tag): + if el.name == "a": + return '{}'.format( + escape(el.attrs["href"]), + str.join("", map(node_to_html, el.children)), + ) + elif el.name == "p": + return str.join("", map(node_to_html, el.children)) + "\n\n" + elif el.name == "br": + return "\n" + return str.join("", map(node_to_html, el.children)) + return escape(str(el)) + + +def node_to_markdown(el: PageElement) -> str: + if isinstance(el, Tag): + if el.name == "a": + return "[%s](%s)" % ( + md_escape(str.join("", map(node_to_markdown, el.children))), + el.attrs["href"], + ) + elif el.name == "p": + return str.join("", map(node_to_markdown, el.children)) + "\n\n" + elif el.name == "br": + return "\n" + return str.join("", map(node_to_markdown, el.children)) + return md_escape(str(el)) + + +def node_to_plaintext(el: PageElement) -> str: + if isinstance(el, Tag): + if el.name == "a": + return "%s (%s)" % ( + str.join("", map(node_to_plaintext, el.children)), + el.attrs["href"], + ) + elif el.name == "p": + return str.join("", map(node_to_plaintext, el.children)) + "\n\n" + elif el.name == "br": + return "\n" + return str.join("", map(node_to_plaintext, el.children)) + return str(el) diff --git a/requirements.txt b/requirements.txt index d9d2f61..985a2f1 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,11 +2,14 @@ anyio==3.6.1 beautifulsoup4==4.11.1 bs4==0.0.1 certifi==2022.6.15 +emoji==2.0.0 h11==0.12.0 httpcore==0.15.0 httpx==0.23.0 idna==3.3 +Jinja2==3.1.2 lxml==4.9.1 +MarkupSafe==2.1.1 rfc3986==1.5.0 sniffio==1.2.0 soupsieve==2.3.2.post1