# Write your code here :-) import websockets import asyncio import json import requests import random from os import getenv from munch import munchify from lxml import html import pprint import csv, io def strip_html(s): return str(html.fromstring(s).text_content()) api_token = getenv('LG_BOTSY_TOKEN') endpoint = getenv('LG_ENDPOINT') quotes_src = getenv('QUOTES_SOURCE') quotes_list = json.loads(json.dumps(list(csv.DictReader(io.StringIO(requests.get(quotes_src).text))))) ws_uri=f"wss://{endpoint}/api/v1/streaming?access_token={api_token}&stream=user" post_uri = f"https://{endpoint}/api/v1/statuses" headers = { "Content-Type" : "application/json", "Authorization" : f"Bearer {api_token}", "User-Agent" : "fyrfliBOT" } status_dict = munchify({"status": "", "in_reply_to_id": "", "local_only": False, "visibility": "mutuals_only", "sensitive": False, "content_type": "text/markdown"}) randomised_hellos = [ 'hi back', 'hello to you too' 'aw hi hi', 'well hello there', 'mornin\' mornin\'', 'g\'day mate', 'allo', '\'sup' ] def get_random_quote(): choise = quotes_list[random.randint(0, len(quotes_list) - 1)] return choise['quote'] + '\n ~ ' + choise['author'] + ' ~' def process_request(the_post): print('processing ', the_post.url) # pprint.pprint(json.dumps(the_post), compact=True, indent=2) hellos = ["hey" in the_post.content.lower(),"hey you" in the_post.content.lower(), "hi" in the_post.content.lower(), "hello" in the_post.content.lower()] request_quote = ["random quote" in the_post.content.lower(), "quote please" in the_post.content.lower()] if any(hellos): respond_with = randomised_hellos[random.randint(0, len(randomised_hellos) - 1)] status_dict.status = f"[@{the_post.account.acct}]({the_post.account.url}) {respond_with}!" if any(request_quote): status_dict.status = f"[@{the_post.account.acct}]({the_post.account.url}) here's a quote for you:\n\n" + get_random_quote() if status_dict.status: status_dict.in_reply_to_id = the_post.id pprint.pprint(status_dict, compact=False, indent=2) r = requests.post( post_uri, headers=headers, data=json.dumps(status_dict) ) print(r.reason, r.status_code, json.loads(r.text)) async def hello(uri): # async with websockets.connect(uri) as ws: async for ws in websockets.connect(uri): try: message = await ws.recv() decoded = munchify(json.loads(message)) print(decoded.event, decoded.payload[1:150]) if decoded.event != "delete": the_post = munchify(json.loads(decoded.payload)) match decoded.event: case "delete": print("a delete event for id ", decoded.payload, "\n===>\n") case "status.update": print("an update to an existing post: ", the_post.url, "\n===>\n") case "update": if the_post.reblog: print( "a reblog of ", the_post.reblog.url, "\n===>\n" ) else: plain_text_content = strip_html(the_post.content) print( "new status: ", the_post.url, "\n===>\n" ) if the_post.mentions: for mention in the_post.mentions: if mention.username == "botsy": process_request(the_post) break case "notification": process_request(the_post) case _: print('unknown event: ', decoded.event, "\n", " with payload beginning ", decoded.payload[1:150]) except asyncio.exceptions.CancelledError: print('keyboard interrupt received ... exiting') break except KeyboardInterrupt: print('keyboard interrupt received ... exiting') break except websockets.ConnectionClosed: break async def main(): async with websockets.serve(hello, ws_uri): await asyncio.Future() if __name__ == "__main__": asyncio.run(hello(ws_uri))