interactive_bot/listen.py

136 lines
5.3 KiB
Python

import websockets
import asyncio
from json import loads, dumps
from requests import get, post
from random import randint
from os import getenv
from munch import munchify
from markdownify import markdownify as md
from pprint import pp
import csv, io
import sys
import apprise
# Apprise notifier used to forward timeline messages to external services.
# Each target is registered under its own tag so a caller can address one
# service at a time via notify(..., tag=...).
# NOTE(review): both service URLs below appear to embed credentials directly
# in source; consider loading them from the environment and rotating them.
apprise_object = apprise.Apprise()
for _service_url, _service_tag in (
    ('pover://dP9LCGHZRMozXRn6K5PctGg6uZhaYc@af7xxr6ho94isdzc3uj92m3f82hdkb', 'pover'),
    ('zulip://sandbox@chat.fyrfli.org/KiXbeBR7poxmUuuW3S6b9igX2ibpyfNC/sandbox', 'zulip'),
):
    apprise_object.add(_service_url, tag=_service_tag)
# Manual smoke test for the Zulip target (kept disabled):
# apprise_object.notify(body='is this even working from inside this program?', title='fyrfli at mastodon.social timeline', tag='zulip')
# Runtime configuration is read from the environment.
api_token = getenv('MASTODON_SOCIAL_FYRFLI_TOKEN')
endpoint = getenv('MASTODON_SOCIAL_ENDPOINT')
quotes_src = getenv('QUOTES_SOURCE')

# Upper bound (seconds) for each start-up HTTP request, so an unresponsive
# server cannot hang the import of this module indefinitely.
_STARTUP_HTTP_TIMEOUT = 30

# Quotes are downloaded once, at import time, as CSV text. DictReader already
# yields one plain dict per row, so no JSON round-trip is needed.
# get_random_quote() reads the 'quote' and 'author' columns.
_quotes_response = get(quotes_src, timeout=_STARTUP_HTTP_TIMEOUT)
# Fail loudly on an HTTP error instead of parsing an error page as quotes.
_quotes_response.raise_for_status()
quotes_list = list(csv.DictReader(io.StringIO(_quotes_response.text)))

# get streaming endpoint from instance info
_instance_response = get(
    f"https://{endpoint}/api/v1/instance",
    timeout=_STARTUP_HTTP_TIMEOUT,
)
_instance_response.raise_for_status()
instance_info = munchify(loads(_instance_response.text))

# Websocket URL for the authenticated user's stream, and the REST endpoint
# used to post replies.
streaming_api = f"{instance_info.urls.streaming_api}/api/v1/streaming?access_token={api_token}&stream=user"
post_uri = f"https://{endpoint}/api/v1/statuses"
# HTTP headers sent with the POST that publishes a reply.
headers = {
    "Content-Type": "application/json",
    "Authorization": f"Bearer {api_token}",
    "User-Agent": "fyrfliBOT",
}

# Template for the JSON body of an outgoing reply. process_request() fills in
# `status` and `in_reply_to_id` before posting.
# NOTE(review): `local_only`, `content_type` and the "mutuals_only" visibility
# look like server-specific extensions -- confirm the target instance
# accepts them.
status_dict = munchify(
    dict(
        status="",
        in_reply_to_id="",
        local_only=False,
        visibility="mutuals_only",
        sensitive=False,
        content_type="text/markdown",
    )
)
# Greeting replies; process_request() picks one at random.
# Every entry carries a trailing comma: Python silently concatenates adjacent
# string literals, so a missing comma merges two greetings into one entry.
randomised_hellos = [
    'hi back',
    'hello to you too',
    'aw hi hi',
    'well hello there',
    "mornin' mornin'",
    "g'day mate",
    'allo',
    "'sup",
]
def get_random_quote():
    """Return one randomly selected quote, formatted for use in a reply.

    A row is drawn uniformly from the module-level ``quotes_list``; the
    result is the row's ``quote`` text, a newline, and the ``author``
    wrapped in ``~`` markers.
    """
    last_index = len(quotes_list) - 1
    picked_row = quotes_list[randint(0, last_index)]
    return picked_row['quote'] + '\n ~ ' + picked_row['author'] + ' ~'
def process_request(the_post):
    """Reply to ``the_post`` if it contains a greeting or asks for a quote.

    Args:
        the_post: attribute-style status object. This function reads
            ``url``, ``content``, ``id``, ``account.acct`` and
            ``account.url``.

    When both a greeting and a quote request match, the quote reply wins.
    Nothing is posted when neither matches.
    """
    print('processing ', the_post.url)
    content = the_post.content.lower()

    # NOTE(review): these are plain substring tests, so "hi" also matches
    # inside words such as "this" -- confirm whether whole-word matching is
    # wanted. ("hey" already covers "hey you".)
    wants_hello = any(word in content for word in ("hey", "hi", "hello"))
    wants_quote = any(
        phrase in content for phrase in ("random quote", "quote please")
    )
    if not (wants_hello or wants_quote):
        return

    mention = f"[@{the_post.account.acct}]({the_post.account.url})"
    if wants_hello:
        respond_with = randomised_hellos[randint(0, len(randomised_hellos) - 1)]
        reply_text = f"{mention} {respond_with}!"
    if wants_quote:
        # Deliberately overrides the greeting when both kinds matched.
        reply_text = f"{mention} here's a quote for you:\n\n" + get_random_quote()

    # Build a fresh payload for every request. Writing into the shared
    # status_dict template left the previous reply text behind, so a later
    # post without any keyword was answered with the stale reply.
    reply = munchify(dict(status_dict))
    reply.status = reply_text
    reply.in_reply_to_id = the_post.id
    pp(reply, compact=False, indent=2)
    r = post(
        post_uri,
        headers=headers,
        data=dumps(reply),
    )
    # Error responses are not guaranteed to be JSON; fall back to raw text
    # rather than letting a decode error escape.
    try:
        response_body = loads(r.text)
    except ValueError:
        response_body = r.text
    print(r.reason, r.status_code, response_body)
async def hello(uri):
async for ws in websockets.connect(streaming_api):
try:
message = await ws.recv()
decoded = munchify(loads(message))
if decoded.event != "delete":
the_post = munchify(loads(decoded.payload))
match decoded.event:
case "delete":
message = f"a delete event for id: {decoded.payload}\n===>\n"
case "status.update":
message = f"an update to an existing post: {the_post.url}\n===>\n"
case "update":
if the_post.reblog:
reblog_post_id = the_post.reblog.url.split('/')[4]
reblogged_post = munchify(loads(get(f'{post_uri}/{reblog_post_id}').text))
message = f"a reblog of {the_post.reblog.url}"
if reblogged_post.content:
message += "\n{(pp(md(reblogged_post.content)) or \"unknown content\")}\n===>\n"
else:
post_content = md(the_post.content)
message = f"new status: {the_post.url}\n{post_content}\n===>\n"
if the_post.mentions:
for mention in the_post.mentions:
if mention.username == "botsy":
process_request(the_post)
case "notification":
process_request(the_post)
case _:
message = f"unknown event: {decoded.event}\n{decoded.payload[1:200]}"
print(message)
apprise_object.notify(body=message, title='fyrfli at mastodon.social timeline', tag='zulip')
except asyncio.exceptions.CancelledError:
print('keyboard interrupt received ... exiting')
break
except KeyboardInterrupt:
print('keyboard interrupt received ... exiting')
break
except websockets.ConnectionClosed:
print("connection closed ... reconnecting ...")
continue
except websockets.exceptions.ConnectionClosed:
print("connection closed ... reconnecting ...")
continue
async def main():
    """Run the streaming listener against ``streaming_api`` until it exits."""
    # websockets.connect() expects a URI as its first argument; handing it
    # the `hello` coroutine function (a serve()-style call) cannot work.
    # The listener opens and manages its own connection.
    await hello(streaming_api)


if __name__ == "__main__":
    asyncio.run(main())