Compare commits

..

No commits in common. "2d23297cb5fc8dd28b2f1a414b2a5a79193a7701" and "6f9eea48b0857982ef349f9230e3f180503ccef1" have entirely different histories.

View file

@ -3,69 +3,26 @@ import websockets
import asyncio
import json
import requests
import random
from os import getenv
from munch import DefaultMunch as dm, Munch
from munch import DefaultMunch as dm
from lxml import html
import pprint
import csv, io
def strip_html(s):
return str(html.fromstring(s).text_content())
api_token = getenv('LG_BOTSY_TOKEN')
fyrfli_token = getenv('LG_FYRFLI_TOKEN')
endpoint = getenv('LG_ENDPOINT')
quotes_src = getenv('QUOTES_SOURCE')
quotes_list = json.loads(json.dumps(list(csv.DictReader(io.StringIO(requests.get(quotes_src).text)))))
ws_uri=f"wss://{endpoint}/api/v1/streaming?access_token={api_token}&stream=user"
message = ""
post_uri = f"https://{endpoint}/api/v1/statuses"
note_to_post = ""
headers = { "Content-Type" : "application/json",
"Authorization" : f"Bearer {api_token}",
"Authorization" : f"Bearer {fyrfli_token}",
"User-Agent" : "fyrfliBOT" }
status_dict = Munch({"status": "",
"in_reply_to_id": "",
"local_only": False,
"visibility": "mutuals_only",
"sensitive": False,
"content_type": "text/markdown"})
randomised_hellos = [
'hi back',
'hello to you too'
'aw hi hi',
'well hello there',
'mornin\' mornin\'',
'g\'day mate',
'allo'
]
def get_random_quote():
choise = quotes_list[random.randint(0, len(quotes_list) - 1)]
return choise['quote'] + '\n ~ ' + choise['author'] + ' ~'
def process_request(the_post):
print('processing ', the_post.url)
pprint.pprint(json.dumps(the_post), compact=True, indent=2)
hellos = ["hey" in the_post.content.lower(),"hey you" in the_post.content.lower(), "hi" in the_post.content.lower(), "hello" in the_post.content.lower()]
request_quote = ["random quote" in the_post.content.lower(), "quote please" in the_post.content.lower()]
if any(hellos):
respond_with = randomised_hellos[random.randint(0, len(randomised_hellos) - 1)]
status_dict.status = f"[@{the_post.account.acct}]({the_post.account.url}) {respond_with}!"
if any(request_quote):
status_dict.status = f"[@{the_post.account.acct}]({the_post.account.url}) here's a quote for you:\n\n" + get_random_quote()
if status_dict.status:
status_dict.in_reply_to_id = the_post.id
pprint.pprint(status_dict, compact=False, indent=2)
r = requests.post(
post_uri,
headers=headers,
data=json.dumps(status_dict)
)
print(r.reason, r.status_code, json.loads(r.text))
mention_id = ""
created_note = ""
async def hello(uri):
# async with websockets.connect(uri) as ws:
@ -73,35 +30,73 @@ async def hello(uri):
try:
message = await ws.recv()
decoded = dm.fromDict(json.loads(message))
print(decoded.event, decoded.payload[1:150])
if decoded.event != "delete":
the_post = dm.fromDict(json.loads(decoded.payload))
print(decoded.event, decoded.payload[1:49])
match decoded.event:
case "delete":
print("a delete event for id ", decoded.payload, "\n===>\n")
case "status.update":
print("an update to an existing post: ", the_post.url, "\n===>\n")
case "update":
if the_post.reblog:
print( "a reblog of ", the_post.reblog.url, "\n===>\n" )
case "update"|"status.update":
the_update = dm.fromDict(json.loads(decoded.payload))
print(the_update)
if the_update.reblog:
print(
"a reblog of ",
the_update.reblog.url,
"\n id ",
the_update.id,
"\n created at ",
the_update.created_at,
"\n===>\n"
)
else:
plain_text_content = strip_html(the_post.content)
print( "new status: ", the_post.url, "\n===>\n" )
if the_post.mentions:
for mention in the_post.mentions:
if mention.username == "botsy":
process_request(the_post)
break
plain_text_content = strip_html(the_update.content)
print(
"a status update (new or edit) ",
the_update.url,
"\n id ",
the_update.id,
"\n by ",
the_update.account.acct,
"\n created at ",
the_update.created_at,
"\n the content \n",
plain_text_content,
"\n===>\n"
)
strings_to_listen_for = ["folks" in plain_text_content.lower(),
"love" in plain_text_content.lower(),
"you" in plain_text_content.lower()]
if any(strings_to_listen_for):
note_to_post = {
"status": f"[@{the_update.account.acct}](the_update.account.url) We come to love not by finding a perfect person, but by learning to see an imperfect person perfectly.\n ~ Sam Keen ~ ",
"in_reply_to_id": the_update.id,
"visibility": "mutuals_only",
"content_type": "text/markdown"
}
r = requests.post(
post_uri,
headers=headers,
json=note_to_post
)
print(r.reason, r.status_code)
case "notification":
process_request(the_post)
the_notification = dm.fromDict(json.loads(decoded.payload))
print(
"a ",
the_notification.type,
" notification\n",
" from ",
the_notification.account.acct,
" id ", the_notification.id, "\n",
" created at ", the_notification.created_at,
"\n===>\n"
)
case _:
print('unknown event: ', decoded.event, "\n",
" with payload beginning ",
decoded.payload[1:150])
decoded.payload[1:100])
except asyncio.exceptions.CancelledError:
print('keyboard interrupt received ... exiting')
@ -121,5 +116,3 @@ async def main():
if __name__ == "__main__":
asyncio.run(hello(ws_uri))