# interactive_bot/listen.py
#
# Code-viewer listing header: 161 lines, 6.5 KiB, Python

import websockets
import asyncio
from json import loads, dumps
from requests import get, post
from random import randint
from os import getenv
from munch import munchify
from lxml import html
from markdownify import markdownify as md
from pprint import pp
import csv, io
import sys
import apprise
def strip_html(s):
    """Return the text content of the HTML fragment *s* as a built-in ``str``.

    The fragment is parsed with ``lxml.html.fromstring`` and the text of the
    resulting element is extracted with ``text_content()``; the final
    ``str()`` call converts whatever that returns into a plain string.
    """
    parsed = html.fromstring(s)
    text = parsed.text_content()
    return str(text)
# Notification targets used by apprise_object.notify().
# NOTE(review): both URLs below embed what look like live credentials in the
# source file -- consider rotating them and loading them from the environment
# like the Mastodon settings further down.
apprise_object = apprise.Apprise()
apprise_object.add('pover://dP9LCGHZRMozXRn6K5PctGg6uZhaYc@af7xxr6ho94isdzc3uj92m3f82hdkb', tag='pover')
apprise_object.add('zulip://sandbox@chat.fyrfli.org/KiXbeBR7poxmUuuW3S6b9igX2ibpyfNC/sandbox', tag='zulip')
# Access token, instance host name and quotes CSV URL, all read from the
# environment; each is None when its variable is unset.
api_token = getenv('MASTODON_SOCIAL_FYRFLI_TOKEN')
endpoint = getenv('MASTODON_SOCIAL_ENDPOINT')
quotes_src = getenv('QUOTES_SOURCE')
# Download the quotes CSV at import time and keep its rows as a list of dicts
# (the dumps/loads round-trip turns every DictReader row into a plain dict).
quotes_list = loads(dumps(list(csv.DictReader(io.StringIO(get(quotes_src).text)))))
# get streaming endpoint from instance info
instance_info = munchify(loads(get(f"https://{endpoint}/api/v1/instance").text))
# User-stream URL; note that the access token travels in the query string.
streaming_api = f"{instance_info.urls.streaming_api}/api/v1/streaming?access_token={api_token}&stream=user"
# print(streaming_api)
# sys.exit(0)
# streaming_api=f"wss://{endpoint}/api/v1/streaming?access_token={api_token}&stream=user"
# URL that replies are POSTed to, and the headers sent with each POST.
post_uri = f"https://{endpoint}/api/v1/statuses"
headers = { "Content-Type" : "application/json",
"Authorization" : f"Bearer {api_token}",
"User-Agent" : "fyrfliBOT" }
# Template for the JSON body of a reply; process_request supplies "status"
# and "in_reply_to_id" before the body is serialised with dumps().
# NOTE(review): "local_only", "content_type" and the "mutuals_only"
# visibility do not appear to be part of the stock Mastodon statuses API --
# confirm the target server accepts them.
status_dict = munchify({"status": "",
"in_reply_to_id": "",
"local_only": False,
"visibility": "mutuals_only",
"sensitive": False,
"content_type": "text/markdown"})
# Canned greetings; one is picked at random when a post says hello.
# Every entry needs its own trailing comma: two adjacent string literals
# without one are silently concatenated into a single greeting.
randomised_hellos = [
    'hi back',
    'hello to you too',
    'aw hi hi',
    'well hello there',
    "mornin' mornin'",
    "g'day mate",
    'allo',
    "'sup",
]
def get_random_quote(quotes=None):
    """Return a randomly chosen quote formatted together with its author.

    Args:
        quotes: Optional sequence of mappings that each provide a ``'quote'``
            and an ``'author'`` key. When omitted, the module-level
            ``quotes_list`` is used.

    Returns:
        The quote text, a newline, then the author wrapped as `` ~ author ~``.

    Raises:
        ValueError: If there are no quotes to choose from.
    """
    pool = quotes_list if quotes is None else quotes
    if not pool:
        # randint(0, -1) would raise a ValueError about an empty range;
        # raise the same exception type with a message that names the cause.
        raise ValueError("no quotes available to choose from")
    chosen = pool[randint(0, len(pool) - 1)]
    return f"{chosen['quote']}\n ~ {chosen['author']} ~"
def process_request(the_post):
    """Reply to *the_post* when it greets the bot or asks for a quote.

    A greeting ("hey", "hi" or "hello" as a whole word) is answered with a
    random entry from ``randomised_hellos``; "random quote" or "quote please"
    is answered with ``get_random_quote()``. When a post contains both, the
    quote wins. Posts that contain neither are ignored.

    Args:
        the_post: Status object exposing ``url``, ``content``, ``id`` and
            ``account.acct`` / ``account.url`` as attributes.

    Returns:
        None. The reply, if any, is POSTed to ``post_uri`` and the outcome
        is printed.
    """
    import re  # local import: the module header does not import ``re``

    print('processing ', the_post.url)
    text = the_post.content.lower()
    # Match whole words only; a bare substring test for "hi" also fires on
    # words such as "this" or "while".
    wants_hello = re.search(r"\b(?:hey|hi|hello)\b", text) is not None
    wants_quote = "random quote" in text or "quote please" in text
    author_link = f"[@{the_post.account.acct}]({the_post.account.url})"

    reply = ""
    if wants_hello:
        greeting = randomised_hellos[randint(0, len(randomised_hellos) - 1)]
        reply = f"{author_link} {greeting}!"
    if wants_quote:
        # Checked second on purpose: a quote request overrides a greeting.
        reply = f"{author_link} here's a quote for you:\n\n" + get_random_quote()
    if not reply:
        return

    # Build the body from a copy of the shared template. Writing into
    # status_dict itself would leave the previous reply behind and cause it
    # to be re-posted for every later post that asks for nothing.
    payload = dict(status_dict)
    payload["status"] = reply
    payload["in_reply_to_id"] = the_post.id
    pp(payload, compact=False, indent=2)
    r = post(
        post_uri,
        headers=headers,
        data=dumps(payload)
    )
    try:
        response_body = loads(r.text)
    except ValueError:
        # Error responses are not guaranteed to be JSON; show them verbatim.
        response_body = r.text
    print(r.reason, r.status_code, response_body)
async def hello(uri):
# async with websockets.connect(uri) as ws:
async for ws in websockets.connect(uri):
try:
message = await ws.recv()
decoded = munchify(loads(message))
# print(decoded.event, decoded.payload[1:150])
# print(decoded.event, decoded.payload)
if decoded.event != "delete":
the_post = munchify(loads(decoded.payload))
# print(decoded.payload[1:250])
match decoded.event:
case "delete":
# message = f"a delete event for id: {decoded.payload}\\n===>\\n"
# print("a delete event for id ", decoded.payload, "\n===>\n")
message = 'a delete event for id: ', decoded.payload, '===>'
case "status.update":
# print("an update to an existing post: ", the_post.url, "\n===>\n")
# message = f"an update to an existing post: {the_post.url}\\n===>\\n"
message = 'an update to an existing post: ', the_post.url, '===>'
case "update":
if the_post.reblog:
# print( "a reblog of ", the_post.reblog.url)
reblog_post_id = the_post.reblog.url.split('/')[4]
reblogged_post = munchify(loads(get(f'{post_uri}/{reblog_post_id}').text))
# pp(md(reblogged_post.content))
# print("\n===>\n")
# message = f"a reblog of {the_post.reblog.url}\\n{pp(md(reblogged_post.content))}\\n===>\\n"
message = 'a reblog of ', the_post.reblog.url, pp(md(reblogged_post.content)),'===>'
else:
# plain_text_content = strip_html(the_post.content)
post_content = md(the_post.content)
# print( "new status: ", the_post.url, "\n", post_content, "\n===>\n" )
# message = f"new status: {the_post.url}\\n{post_content}\\n===>\\n"
message = 'new status: ', the_post.url, post_content, '===>'
if the_post.mentions:
for mention in the_post.mentions:
if mention.username == "botsy":
process_request(the_post)
# break
case "notification":
process_request(the_post)
case _:
# print('unknown event: ', decoded.event, "\n",
# " with payload beginning ",
# decoded.payload[1:150])
# message = f"unknown event: {decoded.event}\\n{decoded.payload[1:200]}"
message = 'unknown event: ', decoded.event, decoded.payload[1:120]
pp(message)
apprise_object.notify(body=message, title='@fyrfli.mastodon.social timeline')
except asyncio.exceptions.CancelledError:
print('keyboard interrupt received ... exiting')
break
except KeyboardInterrupt:
print('keyboard interrupt received ... exiting')
break
except websockets.ConnectionClosed:
break
async def main():
    """Run the streaming client against the configured streaming endpoint.

    ``hello`` is a websocket client coroutine, so it is awaited directly;
    passing it to ``websockets.serve`` would try to start a server with the
    streaming URL as its host.
    """
    await hello(streaming_api)


if __name__ == "__main__":
    asyncio.run(main())