From a009830d4953e7b0775117e95d4c7f8676390902 Mon Sep 17 00:00:00 2001 From: Camille Frantz Date: Mon, 27 Oct 2025 08:32:07 -0500 Subject: [PATCH] change name to less specific name --- listen.py | 126 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 126 insertions(+) create mode 100644 listen.py diff --git a/listen.py b/listen.py new file mode 100644 index 0000000..feedf90 --- /dev/null +++ b/listen.py @@ -0,0 +1,126 @@ +# Write your code here :-) +import websockets +import asyncio +import json +import requests +import random +from os import getenv +from munch import munchify +from lxml import html +import pprint +import csv, io + +def strip_html(s): + return str(html.fromstring(s).text_content()) + + +api_token = getenv('LG_BOTSY_TOKEN') +endpoint = getenv('LG_ENDPOINT') +quotes_src = getenv('QUOTES_SOURCE') +quotes_list = json.loads(json.dumps(list(csv.DictReader(io.StringIO(requests.get(quotes_src).text))))) + +ws_uri=f"wss://{endpoint}/api/v1/streaming?access_token={api_token}&stream=user" +post_uri = f"https://{endpoint}/api/v1/statuses" +headers = { "Content-Type" : "application/json", + "Authorization" : f"Bearer {api_token}", + "User-Agent" : "fyrfliBOT" } + +status_dict = munchify({"status": "", + "in_reply_to_id": "", + "local_only": False, + "visibility": "mutuals_only", + "sensitive": False, + "content_type": "text/markdown"}) + +randomised_hellos = [ + 'hi back', + 'hello to you too' + 'aw hi hi', + 'well hello there', + 'mornin\' mornin\'', + 'g\'day mate', + 'allo', + '\'sup' + ] + +def get_random_quote(): + choise = quotes_list[random.randint(0, len(quotes_list) - 1)] + return choise['quote'] + '\n ~ ' + choise['author'] + ' ~' + +def process_request(the_post): + print('processing ', the_post.url) + # pprint.pprint(json.dumps(the_post), compact=True, indent=2) + hellos = ["hey" in the_post.content.lower(),"hey you" in the_post.content.lower(), "hi" in the_post.content.lower(), "hello" in the_post.content.lower()] + request_quote = ["random quote" in the_post.content.lower(), "quote please" in the_post.content.lower()] + if any(hellos): + respond_with = randomised_hellos[random.randint(0, len(randomised_hellos) - 1)] + status_dict.status = f"[@{the_post.account.acct}]({the_post.account.url}) {respond_with}!" + if any(request_quote): + status_dict.status = f"[@{the_post.account.acct}]({the_post.account.url}) here's a quote for you:\n\n" + get_random_quote() + if status_dict.status: + status_dict.in_reply_to_id = the_post.id + pprint.pprint(status_dict, compact=False, indent=2) + r = requests.post( + post_uri, + headers=headers, + data=json.dumps(status_dict) + ) + print(r.reason, r.status_code, json.loads(r.text)) + + +async def hello(uri): + # async with websockets.connect(uri) as ws: + async for ws in websockets.connect(uri): + try: + message = await ws.recv() + decoded = munchify(json.loads(message)) + print(decoded.event, decoded.payload[1:150]) + if decoded.event != "delete": + the_post = munchify(json.loads(decoded.payload)) + match decoded.event: + case "delete": + print("a delete event for id ", decoded.payload, "\n===>\n") + + case "status.update": + print("an update to an existing post: ", the_post.url, "\n===>\n") + + case "update": + if the_post.reblog: + print( "a reblog of ", the_post.reblog.url, "\n===>\n" ) + else: + plain_text_content = strip_html(the_post.content) + print( "new status: ", the_post.url, "\n===>\n" ) + if the_post.mentions: + for mention in the_post.mentions: + if mention.username == "botsy": + process_request(the_post) + break + + case "notification": + process_request(the_post) + + case _: + print('unknown event: ', decoded.event, "\n", + " with payload beginning ", + decoded.payload[1:150]) + + except asyncio.exceptions.CancelledError: + print('keyboard interrupt received ... exiting') + break + + except KeyboardInterrupt: + print('keyboard interrupt received ... exiting') + break + + except websockets.ConnectionClosed: + break + + +async def main(): + async with websockets.serve(hello, ws_uri): + await asyncio.Future() + +if __name__ == "__main__": + asyncio.run(hello(ws_uri)) + +