FILTERS!!! AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA

This commit is contained in:
Casey 2022-08-29 10:28:51 +03:00
parent bba6168f2b
commit f048cf07a9
Signed by: hkc
GPG Key ID: F0F6CFE11CDB0960
16 changed files with 288 additions and 80 deletions

View File

@ -1,66 +1,134 @@
[main]
; This is a list of output modules. Each module should be defined in section,
; named "module/MODULENAME". Space-separated list of strings.
# This is a list of output modules. Each module should be defined in section,
# named "module/MODULENAME". Space-separated list of strings.
modules = telegram
; Mastodon instance to grab posts from
# Mastodon instance to grab posts from
instance = mastodon.example.org
; Mastodon user token.
; Required permissions: read:statuses read:lists
; You can get your token by creating application in
; ${instance}/settings/applications
# Mastodon user token.
# Required permissions: read:statuses read:lists
# You can get your token by creating application in
# ${instance}/settings/applications
token = blahblah
; Mastodon user ID. Used to filter out posts. Unfortunately, I can't find a way
; to get it using token itself. GARGROOOOOOON!!!!!
; Anyways, you could navigate to your profile ${instance}/@${username} and
; look for your profile picture link. For example, for me it's
; https://mastodon.astrr.ru/system/accounts/avatars/107/914/495/779/447/227/original/9651ac2f47cb2993.jpg
; that part between "avatars" and "original" is the user ID. Grab it, remove
; all of the slashes and you should be left with, for example, this:
# Mastodon user ID. Used to filter out posts. Unfortunately, I can't find a way
# to get it using token itself. GARGROOOOOOON!!!!!
# Anyways, you could navigate to your profile ${instance}/@${username} and
# look for your profile picture link. For example, for me it's
# https://mastodon.astrr.ru/system/accounts/avatars/107/914/495/779/447/227/original/9651ac2f47cb2993.jpg
# that part between "avatars" and "original" is the user ID. Grab it, remove
# all of the slashes and you should be left with, for example, this:
user = 107914495779447227
; Mastodon user list ID. AGAIN, UNFORTUNATELY, there is no way to reliably use
; streaming API to get all of your posts. Using home timeline is unreliable and
; does not always include boosts, same with public:local
; So, create a list, add yourself to it, and put its ID here (it should be in
; address bar while you have that list open)
# Mastodon user list ID. AGAIN, UNFORTUNATELY, there is no way to reliably use
# streaming API to get all of your posts. Using home timeline is unreliable and
# does not always include boosts, same with public:local
# So, create a list, add yourself to it, and put its ID here (it should be in
# address bar while you have that list open)
list = 1
; Should we automatically reconnect to the streaming socket?
; That option exists because it's not really a big deal when crossposter runs
; as a service and restarts automatically by the service manager.
# Should we automatically reconnect to the streaming socket?
# That option exists because it's not really a big deal when crossposter runs
# as a service and restarts automatically by the service manager.
auto-reconnect = yes
; Example Telegram integration. You can use it as a template
# Example Telegram integration. You can use it as a template
[module/telegram]
; For Telegram it should be "telegram". Obviously
type = telegram
; Telegram Bot API token. There are plenty of guides on how to obtain one.
; https://core.telegram.org/bots#3-how-do-i-create-a-bot
# Telegram Bot API token. There are plenty of guides on how to obtain one.
# https://core.telegram.org/bots#3-how-do-i-create-a-bot
token = 12345:blahblah
; Telegram channel/chat ID or name. Also can be just a regular user.
; You can use @showjsonbot to obtain your channel ID, or just use its
; username, if it is public
# Telegram channel/chat ID or name. Also can be just a regular user.
# You can use @showjsonbot to obtain your channel ID, or just use its
# username, if it is public
chat = @username
; Should we show link to post as a link after post content?
# Should we show link to post as a link after post content?
show-post-link = yes
; Should we show link to original author before post content?
# Should we show link to original author before post content?
show-boost-from = yes
; Should we make posts silent?
; https://core.telegram.org/bots/api#sendmessage `disable_notification`
# Should we make posts silent?
# https://core.telegram.org/bots/api#sendmessage `disable_notification`
silent = true
; Discord integration
# Discord integration
[module/discord]
type = discord
; Webhook URL with the `?wait=true`
# Webhook URL with the `?wait=true`
webhook = url
;# Boost filter. Only boosts will be matched by that one
;[filter/boost]
;type = boost
;# List of sources. If empty, boost from any account will be allowed
;list = @MaidsBot@*
;# Mention filter. If anyone from that list is mentioned in the post,
;# it will be triggered. Useful in negation mode to ignore some people
;[filter/mention]
;type = mention
;# Space-separated list of mentions.
;# @[name] means specific local user
;# @[name]@[instance] means specific remote user
;# @[name]@* means specific user on any remote instance
;# @*@[instance] means any remote user on specific instance
;# @*@* means any remote user
;# @* __should__ mean any local user, but we're using `glob` to test for it and
;# it just means "any user" for now. This will be changed to more consistent
;# behavior
;list = @name @name@instance @*@instance @name@* @*@*
;# Media filter. Only posts with some specific media content are triggered
;[filter/media]
;type = media
;# space-separated list of media types to be checked
;valid-media = image video gifv audio unknown
;# mode of the filter itself
;# "include" means "there should be at least one media of any type listed"
;# "exclude" means "there shouldn't be anything from that list"
;# "only" allows only media from the list to be sent
;mode = include
;# Text content filter
;[filter/content]
;type = content
;# Mode of the filter.
;# "regexp" requires "regexp" property and should contain... A RegExp
;# "hashtag" should contain space-separated list of tags
;mode = regexp
;# Regular expression pattern to be matched
;regexp = ^x-no-repost
;# List of tags
; tags = maids artspam
;# Spoiler text filter
;# Will be matched if spoiler matches some regexp
;# (use ^.+$ to check for any spoiler)
;[filter/spoiler]
;type = spoiler
;regexp = ^CW:
;# Visibility filter.
;# Only posts with specific visibility will be matched
;[filter/visibility]
;type = visibility
;# Space-separated list of visibilities
;# NOTE: `direct` visibility is always ignored, even before filters are run
;options = public
;# Combined filter
;# Basically a way to combine multiple filters using some operation
;[filter/combined]
;type = combined
;# List of filters inside of itself
;filters = spoiler boost
;# Operator to be used here
;# Options: "and", "or", "xor"
;operator = or

View File

@ -1,27 +1,58 @@
from asyncio import gather
from configparser import ConfigParser
from typing import List, Optional
from typing import Dict, List, Optional
from mastoposter.filters import run_filters
from mastoposter.filters.base import BaseFilter, FilterInstance
from mastoposter.integrations.base import BaseIntegration
from mastoposter.integrations import DiscordIntegration, TelegramIntegration
from mastoposter.integrations import (
DiscordIntegration,
FilteredIntegration,
TelegramIntegration,
)
from mastoposter.types import Status
def load_integrations_from(config: ConfigParser) -> List[FilteredIntegration]:
    """Build every output module listed in ``[main] modules``.

    For each name the ``module/<name>`` section is read. Its optional,
    space-separated ``filters`` option names ``filter/<name>`` sections; a
    leading ``~`` or ``!`` on a name marks that filter as inverted.

    Args:
        config: Parsed configuration.

    Returns:
        One ``FilteredIntegration`` (sink plus its filters) per module, in the
        order the modules are listed.

    Raises:
        KeyError: A referenced ``module/...`` or ``filter/...`` section, or a
            module's ``type`` option, is missing.
        ValueError: A module's ``type`` is neither "telegram" nor "discord".
    """
    modules: List[FilteredIntegration] = []
    for module_name in config.get("main", "modules").split():
        mod = config[f"module/{module_name}"]

        # Keyed by the bare filter name (prefix stripped) so filters can look
        # each other up in post_init.
        filters: Dict[str, FilterInstance] = {}
        for filter_name in mod.get("filters", "").split():
            filter_basename = filter_name.lstrip("~!")
            filters[filter_basename] = BaseFilter.new_instance(
                filter_name, config[f"filter/{filter_basename}"]
            )

        # Second pass: runs only after all of this module's filters exist.
        # Iterate over a snapshot in case a hook touches the mapping.
        for finst in list(filters.values()):
            finst.filter.post_init(filters, config)

        if mod["type"] == "telegram":
            sink = TelegramIntegration(mod)
        elif mod["type"] == "discord":
            sink = DiscordIntegration(mod)
        else:
            raise ValueError("Invalid module type %r" % mod["type"])

        modules.append(FilteredIntegration(sink, list(filters.values())))
    return modules
async def execute_integrations(
    status: Status, sinks: List[FilteredIntegration]
) -> List[Optional[str]]:
    """Send *status* to every sink whose filters accept it.

    Sinks are invoked concurrently. Because ``return_exceptions=True`` is
    passed to ``gather``, an exception raised by a sink is returned as an
    item of the result list instead of being propagated.

    Args:
        status: The status to cross-post.
        sinks: Integrations paired with their filters.

    Returns:
        One result per sink that was actually invoked. Sinks rejected by
        their filters contribute no entry, so the result is not positionally
        aligned with *sinks*.
    """
    return await gather(
        *[
            # Integrations are awaitable callables; use the named tuple
            # fields rather than positional indexes.
            entry.sink(status)
            for entry in sinks
            if run_filters(entry.filters, status)
        ],
        return_exceptions=True,
    )

View File

@ -2,15 +2,15 @@
from asyncio import run
from configparser import ConfigParser
from mastoposter import execute_integrations, load_integrations_from
from mastoposter.integrations import FilteredIntegration
from mastoposter.sources import websocket_source
from typing import AsyncGenerator, Callable, List
from mastoposter.integrations.base import BaseIntegration
from mastoposter.types import Status
async def listen(
source: Callable[..., AsyncGenerator[Status, None]],
drains: List[BaseIntegration],
drains: List[FilteredIntegration],
user: str,
/,
**kwargs,
@ -48,7 +48,7 @@ def main(config_path: str):
for k in _remove:
del conf[section][k]
modules = load_integrations_from(conf)
modules: List[FilteredIntegration] = load_integrations_from(conf)
url = "wss://{}/api/v1/streaming".format(conf["main"]["instance"])
run(

View File

@ -1,14 +1,17 @@
from typing import List
from mastoposter.types import Status
from .base import BaseFilter # NOQA
from .base import FilterInstance # NOQA
from mastoposter.filters.boost import BoostFilter # NOQA
from mastoposter.filters.combined import CombinedFilter # NOQA
from mastoposter.filters.mention import MentionFilter # NOQA
from mastoposter.filters.media import MediaFilter # NOQA
from mastoposter.filters.text import TextFilter # NOQA
from mastoposter.filters.spoiler import SpoilerFilter # NOQA
from mastoposter.filters.visibility import VisibilityFilter # NOQA
def run_filters(filters: List[FilterInstance], status: Status) -> bool:
    """Return True when *status* passes every filter in *filters*.

    Each filter's verdict is negated when its ``inverse`` flag is set.
    An empty list accepts every status.

    Args:
        filters: Filter instances to evaluate.
        status: The status being checked.
    """
    # bool(...) != inverse is XOR on booleans, but stays well-defined when a
    # filter returns a non-bool truthy/falsy value.
    return all(bool(fil.filter(status)) != fil.inverse for fil in filters)

View File

@ -1,15 +1,27 @@
from abc import ABC, abstractmethod
from configparser import SectionProxy
from typing import ClassVar, Dict, Type
from configparser import ConfigParser, SectionProxy
from typing import ClassVar, Dict, NamedTuple, Type
from mastoposter.types import Status
from re import Pattern, compile as regexp
# No-op that accepts any positional arguments and returns None; called to
# mark parameters as deliberately unused.
UNUSED = lambda *_: None # NOQA
class FilterInstance(NamedTuple):
    """A filter paired with a flag saying whether its verdict is negated."""

    # True when the filter's result must be inverted before use.
    inverse: bool
    # The wrapped filter object.
    filter: "BaseFilter"

    def __repr__(self):
        """Render the wrapped filter, prefixed with ``~`` when inverted."""
        prefix = "~" if self.inverse else ""
        return prefix + repr(self.filter)
class BaseFilter(ABC):
FILTER_REGISTRY: ClassVar[Dict[str, Type["BaseFilter"]]] = {}
FILTER_NAME_REGEX: Pattern = regexp(r"^([a-z_]+)$")
FILTER_NAME_REGEX: ClassVar[Pattern] = regexp(r"^([a-z_]+)$")
filter_name: ClassVar[str] = "_base"
def __init__(self, section: SectionProxy):
UNUSED(section)
@ -21,10 +33,29 @@ class BaseFilter(ABC):
if filter_name in cls.FILTER_REGISTRY:
raise KeyError(f"{filter_name=!r} is already registered")
cls.FILTER_REGISTRY[filter_name] = cls
setattr(cls, "filter_name", filter_name)
@abstractmethod
def __call__(self, status: Status) -> bool:
    """Tell whether *status* matches this filter.

    Abstract: every concrete filter must provide its own implementation.
    """
    raise NotImplementedError
def post_init(
    self, filters: Dict[str, FilterInstance], config: ConfigParser
):
    """Hook invoked after all of a module's filters have been constructed.

    The base implementation does nothing; subclasses that need to resolve
    other filters or read further configuration override it.

    Args:
        filters: The module's filter instances keyed by bare filter name.
        config: The full parsed configuration.
    """
    UNUSED(filters, config)
def __repr__(self):
    """Return a short ``Filter:<name>()`` tag for debug output."""
    return "Filter:{}()".format(self.filter_name)
@classmethod
def load_filter(cls, name: str, section: SectionProxy) -> "BaseFilter":
    """Instantiate the filter class registered under *name*.

    Args:
        name: Registered filter name (without any inversion prefix).
        section: Configuration section handed to the filter's constructor.

    Raises:
        KeyError: No filter class is registered under *name*.
    """
    factory = cls.FILTER_REGISTRY.get(name)
    if factory is None:
        raise KeyError(f"no filter with name {name!r} was found")
    return factory(section)
@classmethod
def new_instance(cls, name: str, section: SectionProxy) -> FilterInstance:
    """Build a ``FilterInstance`` from a possibly prefixed filter name.

    A leading ``~`` or ``!`` marks the filter as inverted; the prefix is
    stripped before the filter class is looked up.

    Args:
        name: Filter name, optionally prefixed with ``~`` or ``!``.
        section: Configuration section for the filter.

    Raises:
        KeyError: The bare name is not a registered filter.
    """
    return FilterInstance(
        # startswith() with a tuple is an explicit prefix test; the previous
        # substring check (name[:1] in "~!") was also true for an empty name.
        inverse=name.startswith(("~", "!")),
        filter=cls.load_filter(name.lstrip("~!"), section),
    )

View File

@ -1,7 +1,31 @@
from configparser import SectionProxy
from fnmatch import fnmatch
from mastoposter.filters.base import BaseFilter
from mastoposter.types import Status
class BoostFilter(BaseFilter, filter_name="boost"):
    """Match boosts (reblogs), optionally only boosts of listed accounts."""

    def __init__(self, section: SectionProxy):
        """Read the space-separated ``list`` option of account masks."""
        super().__init__(section)
        # An empty list means "a boost of any account".
        self.list = section.get("list", "").split()

    @classmethod
    def check_account(cls, acct: str, mask: str):
        """Return whether *acct* matches the glob-style *mask*."""
        return fnmatch(acct, mask)

    def __call__(self, status: Status) -> bool:
        """Return True when *status* is a boost allowed by the mask list.

        Non-boosts never match. With an empty mask list every boost matches;
        otherwise the boosted post's author must match at least one mask.
        """
        if status.reblog is None:
            return False
        if not self.list:
            return True
        acct = status.reblog.account.acct
        # Generator (not a list) so any() can stop at the first match.
        return any(self.check_account(acct, mask) for mask in self.list)

    def __repr__(self):
        """Describe the filter and, when set, its account masks."""
        if not self.list:
            return "Filter:boost(any)"
        return f"Filter:boost(from={self.list!r})"

View File

@ -1,15 +1,10 @@
from configparser import SectionProxy
from typing import Callable, ClassVar, Dict, List, NamedTuple
from configparser import ConfigParser, SectionProxy
from typing import Callable, ClassVar, Dict, List
from functools import reduce
from mastoposter.filters.base import BaseFilter
from mastoposter.filters.base import BaseFilter, FilterInstance
from mastoposter.types import Status
class FilterType(NamedTuple):
inverse: bool
filter: BaseFilter
class CombinedFilter(BaseFilter, filter_name="combined"):
OPERATORS: ClassVar[Dict[str, Callable]] = {
"and": lambda a, b: a and b,
@ -20,18 +15,26 @@ class CombinedFilter(BaseFilter, filter_name="combined"):
def __init__(self, section: SectionProxy):
    """Read the child filter names and the combining operator.

    Args:
        section: The filter's configuration section. ``filters`` is a
            space-separated list of child filter names; ``operator``
            defaults to "and".

    Raises:
        KeyError: ``operator`` is not a key of ``OPERATORS``.
    """
    super().__init__(section)
    self.filter_names = section.get("filters", "").split()
    # Read the option once so the stored name and callable cannot disagree.
    self._operator_name = section.get("operator", "and")
    self.operator = self.OPERATORS[self._operator_name]
    # Filled in by post_init.
    self.filters: List[FilterInstance] = []
def post_init(
    self, filters: Dict[str, FilterInstance], config: ConfigParser
):
    """Instantiate the child filters named in the ``filters`` option.

    Each child is built from its own ``filter/<name>`` section of *config*;
    a leading ``~`` or ``!`` on a name marks that child as inverted.

    Args:
        filters: The owning module's filter instances (passed to the base
            hook only; children are built fresh from *config*).
        config: The full parsed configuration.

    Raises:
        KeyError: A child's section is missing or its name is unregistered.
    """
    super().post_init(filters, config)
    # NOTE(review): post_init is not invoked on the children built here, so a
    # nested combined filter would keep an empty child list — confirm whether
    # nesting is meant to be supported.
    self.filters = [
        self.new_instance(name, config["filter/" + name.lstrip("~!")])
        for name in self.filter_names
    ]
def __call__(self, status: Status) -> bool:
    """Combine the child filters' verdicts with the configured operator.

    Each child's verdict is negated when its ``inverse`` flag is set, then
    the verdicts are folded left to right with ``self.operator``.

    Returns:
        The folded verdict, or True when there are no child filters.
    """
    results = [fil.filter(status) ^ fil.inverse for fil in self.filters]
    # If the operator callable was replaced after construction, the stored
    # name no longer describes it. .get() keeps this check working on later
    # calls, when the name is already "N/A" and not a key of OPERATORS.
    if self.OPERATORS.get(self._operator_name) is not self.operator:
        self._operator_name = "N/A"
    if not results:
        # reduce() raises TypeError on an empty sequence without an initial
        # value; accept the status when there is nothing to evaluate.
        return True
    return reduce(self.operator, results)
def __repr__(self):
    """Show the operator name and the child filters."""
    return "Filter:combined(op={}, filters={!r})".format(
        self._operator_name, self.filters
    )

View File

@ -25,3 +25,11 @@ class MediaFilter(BaseFilter, filter_name="media"):
elif self.mode == "only":
return len((types ^ self.valid_media) & types) == 0
raise ValueError(f"{self.mode=} is not valid")
def __repr__(self):
    """Show the filter name, its mode and the configured media types."""
    return (
        f"Filter:{self.filter_name}"
        f"(mode={self.mode}, media={self.valid_media})"
    )

View File

@ -18,6 +18,8 @@ class MentionFilter(BaseFilter, filter_name="mention"):
return fnmatch(acct, mask)
def __call__(self, status: Status) -> bool:
if not self.list and status.mentions:
return True
return any(
(
any(
@ -27,3 +29,8 @@ class MentionFilter(BaseFilter, filter_name="mention"):
for mention in status.mentions
)
)
def __repr__(self):
    """Show the filter name and its list of mention masks."""
    return f"Filter:{self.filter_name}({self.list!r})"

View File

@ -11,3 +11,10 @@ class SpoilerFilter(BaseFilter, filter_name="spoiler"):
def __call__(self, status: Status) -> bool:
    """Return True when the status' spoiler text matches the pattern."""
    found = self.regexp.match(status.spoiler_text)
    return found is not None
def __repr__(self):
    """Show the filter name and the source of its regular expression."""
    return f"Filter:{self.filter_name}({self.regexp.pattern!r})"

View File

@ -49,3 +49,17 @@ class TextFilter(BaseFilter, filter_name="content"):
return len(self.tags & {t.name for t in source.tags}) > 0
else:
raise ValueError("Neither regexp or tags were set. Why?")
def __repr__(self):
    """Describe the filter by whichever of regexp or tags is configured.

    Always returns a string: a filter with neither a regexp nor tags is
    rendered as "unconfigured" instead of falling through and returning
    None, which would make ``repr()`` raise ``TypeError``.
    """
    if self.regexp is not None:
        return str.format(
            "Filter:{name}(regexp={regex!r})",
            name=self.filter_name,
            regex=self.regexp.pattern,
        )
    if self.tags:
        return str.format(
            "Filter:{name}(tags={tags!r})",
            name=self.filter_name,
            tags=self.tags,
        )
    return str.format("Filter:{name}(unconfigured)", name=self.filter_name)

View File

@ -6,7 +6,10 @@ from mastoposter.types import Status
class VisibilityFilter(BaseFilter, filter_name="visibility"):
    """Match statuses whose visibility is one of the configured values."""

    def __init__(self, section: SectionProxy):
        """Read the space-separated ``options`` list of visibilities.

        Raises:
            KeyError: The section has no ``options`` entry.
        """
        super().__init__(section)
        # Stored once, as a set, for membership tests.
        self.options = set(section["options"].split())

    def __call__(self, status: Status) -> bool:
        """Return True when the status' visibility is in the option set."""
        return status.visibility in self.options

    def __repr__(self):
        """Show the filter name and the accepted visibilities."""
        return str.format("Filter:{}({})", self.filter_name, self.options)

View File

@ -1,2 +1,11 @@
from typing import List, NamedTuple
from mastoposter.filters.base import FilterInstance
from mastoposter.integrations.base import BaseIntegration
from .telegram import TelegramIntegration # NOQA
from .discord import DiscordIntegration # NOQA
class FilteredIntegration(NamedTuple):
    """An output integration together with the filters guarding it."""

    # Index 0: the integration that receives accepted statuses.
    sink: BaseIntegration
    # Index 1: filters a status must pass before the sink is invoked.
    filters: List[FilterInstance]

View File

@ -10,5 +10,5 @@ class BaseIntegration(ABC):
pass
@abstractmethod
async def __call__(self, status: Status) -> Optional[str]:
    """Deliver *status* through this integration.

    Abstract: every concrete integration must provide its own
    implementation. Integrations are invoked by calling the instance.
    """
    raise NotImplementedError

View File

@ -71,7 +71,7 @@ class DiscordIntegration(BaseIntegration):
)
).json()
async def post(self, status: Status) -> Optional[str]:
async def __call__(self, status: Status) -> Optional[str]:
source = status.reblog or status
embeds: List[DiscordEmbed] = []

View File

@ -145,7 +145,7 @@ class TelegramIntegration(BaseIntegration):
return str.join("", map(cls.node_to_text, el.children))
return escape(str(el))
async def post(self, status: Status) -> Optional[str]:
async def __call__(self, status: Status) -> Optional[str]:
source = status.reblog or status
text = self.node_to_text(
BeautifulSoup(source.content, features="lxml")