diff --git a/gts_socket_listen.py b/gts_socket_listen.py deleted file mode 100644 index feedf90..0000000 --- a/gts_socket_listen.py +++ /dev/null @@ -1,126 +0,0 @@ -# Write your code here :-) -import websockets -import asyncio -import json -import requests -import random -from os import getenv -from munch import munchify -from lxml import html -import pprint -import csv, io - -def strip_html(s): - return str(html.fromstring(s).text_content()) - - -api_token = getenv('LG_BOTSY_TOKEN') -endpoint = getenv('LG_ENDPOINT') -quotes_src = getenv('QUOTES_SOURCE') -quotes_list = json.loads(json.dumps(list(csv.DictReader(io.StringIO(requests.get(quotes_src).text))))) - -ws_uri=f"wss://{endpoint}/api/v1/streaming?access_token={api_token}&stream=user" -post_uri = f"https://{endpoint}/api/v1/statuses" -headers = { "Content-Type" : "application/json", - "Authorization" : f"Bearer {api_token}", - "User-Agent" : "fyrfliBOT" } - -status_dict = munchify({"status": "", - "in_reply_to_id": "", - "local_only": False, - "visibility": "mutuals_only", - "sensitive": False, - "content_type": "text/markdown"}) - -randomised_hellos = [ - 'hi back', - 'hello to you too' - 'aw hi hi', - 'well hello there', - 'mornin\' mornin\'', - 'g\'day mate', - 'allo', - '\'sup' - ] - -def get_random_quote(): - choise = quotes_list[random.randint(0, len(quotes_list) - 1)] - return choise['quote'] + '\n ~ ' + choise['author'] + ' ~' - -def process_request(the_post): - print('processing ', the_post.url) - # pprint.pprint(json.dumps(the_post), compact=True, indent=2) - hellos = ["hey" in the_post.content.lower(),"hey you" in the_post.content.lower(), "hi" in the_post.content.lower(), "hello" in the_post.content.lower()] - request_quote = ["random quote" in the_post.content.lower(), "quote please" in the_post.content.lower()] - if any(hellos): - respond_with = randomised_hellos[random.randint(0, len(randomised_hellos) - 1)] - status_dict.status = f"[@{the_post.account.acct}]({the_post.account.url}) {respond_with}!" - if any(request_quote): - status_dict.status = f"[@{the_post.account.acct}]({the_post.account.url}) here's a quote for you:\n\n" + get_random_quote() - if status_dict.status: - status_dict.in_reply_to_id = the_post.id - pprint.pprint(status_dict, compact=False, indent=2) - r = requests.post( - post_uri, - headers=headers, - data=json.dumps(status_dict) - ) - print(r.reason, r.status_code, json.loads(r.text)) - - -async def hello(uri): - # async with websockets.connect(uri) as ws: - async for ws in websockets.connect(uri): - try: - message = await ws.recv() - decoded = munchify(json.loads(message)) - print(decoded.event, decoded.payload[1:150]) - if decoded.event != "delete": - the_post = munchify(json.loads(decoded.payload)) - match decoded.event: - case "delete": - print("a delete event for id ", decoded.payload, "\n===>\n") - - case "status.update": - print("an update to an existing post: ", the_post.url, "\n===>\n") - - case "update": - if the_post.reblog: - print( "a reblog of ", the_post.reblog.url, "\n===>\n" ) - else: - plain_text_content = strip_html(the_post.content) - print( "new status: ", the_post.url, "\n===>\n" ) - if the_post.mentions: - for mention in the_post.mentions: - if mention.username == "botsy": - process_request(the_post) - break - - case "notification": - process_request(the_post) - - case _: - print('unknown event: ', decoded.event, "\n", - " with payload beginning ", - decoded.payload[1:150]) - - except asyncio.exceptions.CancelledError: - print('keyboard interrupt received ... exiting') - break - - except KeyboardInterrupt: - print('keyboard interrupt received ... exiting') - break - - except websockets.ConnectionClosed: - break - - -async def main(): - async with websockets.serve(hello, ws_uri): - await asyncio.Future() - -if __name__ == "__main__": - asyncio.run(hello(ws_uri)) - -