import websockets import asyncio from json import loads, dumps from requests import get, post from random import randint from os import getenv from munch import munchify from markdownify import markdownify as md from pprint import pp import csv, io import sys import apprise apprise_object = apprise.Apprise() apprise_object.add('pover://dP9LCGHZRMozXRn6K5PctGg6uZhaYc@af7xxr6ho94isdzc3uj92m3f82hdkb', tag='pover') apprise_object.add('zulip://sandbox@chat.fyrfli.org/KiXbeBR7poxmUuuW3S6b9igX2ibpyfNC/sandbox', tag='zulip') # apprise_object.notify(body='is this even working from inside this program?', title='fyrfli at mastodon.social timeline', tag='zulip') api_token = getenv('MASTODON_SOCIAL_FYRFLI_TOKEN') endpoint = getenv('MASTODON_SOCIAL_ENDPOINT') quotes_src = getenv('QUOTES_SOURCE') quotes_list = loads(dumps(list(csv.DictReader(io.StringIO(get(quotes_src).text))))) # get streaming endpoint from instance info instance_info = munchify(loads(get(f"https://{endpoint}/api/v1/instance").text)) streaming_api = f"{instance_info.urls.streaming_api}/api/v1/streaming?access_token={api_token}&stream=user" post_uri = f"https://{endpoint}/api/v1/statuses" headers = { "Content-Type" : "application/json", "Authorization" : f"Bearer {api_token}", "User-Agent" : "fyrfliBOT" } status_dict = munchify({"status": "", "in_reply_to_id": "", "local_only": False, "visibility": "mutuals_only", "sensitive": False, "content_type": "text/markdown"}) randomised_hellos = [ 'hi back', 'hello to you too' 'aw hi hi', 'well hello there', 'mornin\' mornin\'', 'g\'day mate', 'allo', '\'sup' ] def get_random_quote(): choise = quotes_list[randint(0, len(quotes_list) - 1)] return choise['quote'] + '\n ~ ' + choise['author'] + ' ~' def process_request(the_post): print('processing ', the_post.url) hellos = ["hey" in the_post.content.lower(),"hey you" in the_post.content.lower(), "hi" in the_post.content.lower(), "hello" in the_post.content.lower()] request_quote = ["random quote" in the_post.content.lower(), "quote please" in the_post.content.lower()] if any(hellos): respond_with = randomised_hellos[randint(0, len(randomised_hellos) - 1)] status_dict.status = f"[@{the_post.account.acct}]({the_post.account.url}) {respond_with}!" if any(request_quote): status_dict.status = f"[@{the_post.account.acct}]({the_post.account.url}) here's a quote for you:\n\n" + get_random_quote() if status_dict.status: status_dict.in_reply_to_id = the_post.id pp(status_dict, compact=False, indent=2) r = post( post_uri, headers=headers, data=dumps(status_dict) ) print(r.reason, r.status_code, loads(r.text)) async def hello(uri): async for ws in websockets.connect(streaming_api): try: message = await ws.recv() decoded = munchify(loads(message)) if decoded.event != "delete": the_post = munchify(loads(decoded.payload)) match decoded.event: case "delete": message = f"a delete event for id: {decoded.payload}\n===>\n" case "status.update": message = f"an update to an existing post: {the_post.url}\n===>\n" case "update": if the_post.reblog: reblog_post_id = the_post.reblog.url.split('/')[4] reblogged_post = munchify(loads(get(f'{post_uri}/{reblog_post_id}').text)) message = f"a reblog of {the_post.reblog.url}" if reblogged_post.content: message += "\n{(pp(md(reblogged_post.content)) or \"unknown content\")}\n===>\n" else: post_content = md(the_post.content) message = f"new status: {the_post.url}\n{post_content}\n===>\n" if the_post.mentions: for mention in the_post.mentions: if mention.username == "botsy": process_request(the_post) case "notification": process_request(the_post) case _: message = f"unknown event: {decoded.event}\n{decoded.payload[1:200]}" print(message) apprise_object.notify(body=message, title='fyrfli at mastodon.social timeline', tag='zulip') except asyncio.exceptions.CancelledError: print('keyboard interrupt received ... exiting') break except KeyboardInterrupt: print('keyboard interrupt received ... exiting') break except websockets.ConnectionClosed: print("connection closed ... reconnecting ...") continue except websockets.exceptions.ConnectionClosed: print("connection closed ... reconnecting ...") continue async def main(): async with websockets.connect(hello, streaming_api): await asyncio.Future() if __name__ == "__main__": asyncio.run(hello(streaming_api))