Compare commits

..

No commits in common. "2d23297cb5fc8dd28b2f1a414b2a5a79193a7701" and "6f9eea48b0857982ef349f9230e3f180503ccef1" have entirely different histories.

View file

@ -3,69 +3,26 @@ import websockets
import asyncio import asyncio
import json import json
import requests import requests
import random
from os import getenv from os import getenv
from munch import DefaultMunch as dm, Munch from munch import DefaultMunch as dm
from lxml import html from lxml import html
import pprint
import csv, io
def strip_html(s): def strip_html(s):
return str(html.fromstring(s).text_content()) return str(html.fromstring(s).text_content())
api_token = getenv('LG_BOTSY_TOKEN') api_token = getenv('LG_BOTSY_TOKEN')
fyrfli_token = getenv('LG_FYRFLI_TOKEN')
endpoint = getenv('LG_ENDPOINT') endpoint = getenv('LG_ENDPOINT')
quotes_src = getenv('QUOTES_SOURCE')
quotes_list = json.loads(json.dumps(list(csv.DictReader(io.StringIO(requests.get(quotes_src).text)))))
ws_uri=f"wss://{endpoint}/api/v1/streaming?access_token={api_token}&stream=user" ws_uri=f"wss://{endpoint}/api/v1/streaming?access_token={api_token}&stream=user"
message = ""
post_uri = f"https://{endpoint}/api/v1/statuses" post_uri = f"https://{endpoint}/api/v1/statuses"
note_to_post = ""
headers = { "Content-Type" : "application/json", headers = { "Content-Type" : "application/json",
"Authorization" : f"Bearer {api_token}", "Authorization" : f"Bearer {fyrfli_token}",
"User-Agent" : "fyrfliBOT" } "User-Agent" : "fyrfliBOT" }
mention_id = ""
status_dict = Munch({"status": "", created_note = ""
"in_reply_to_id": "",
"local_only": False,
"visibility": "mutuals_only",
"sensitive": False,
"content_type": "text/markdown"})
randomised_hellos = [
'hi back',
'hello to you too'
'aw hi hi',
'well hello there',
'mornin\' mornin\'',
'g\'day mate',
'allo'
]
def get_random_quote():
choise = quotes_list[random.randint(0, len(quotes_list) - 1)]
return choise['quote'] + '\n ~ ' + choise['author'] + ' ~'
def process_request(the_post):
print('processing ', the_post.url)
pprint.pprint(json.dumps(the_post), compact=True, indent=2)
hellos = ["hey" in the_post.content.lower(),"hey you" in the_post.content.lower(), "hi" in the_post.content.lower(), "hello" in the_post.content.lower()]
request_quote = ["random quote" in the_post.content.lower(), "quote please" in the_post.content.lower()]
if any(hellos):
respond_with = randomised_hellos[random.randint(0, len(randomised_hellos) - 1)]
status_dict.status = f"[@{the_post.account.acct}]({the_post.account.url}) {respond_with}!"
if any(request_quote):
status_dict.status = f"[@{the_post.account.acct}]({the_post.account.url}) here's a quote for you:\n\n" + get_random_quote()
if status_dict.status:
status_dict.in_reply_to_id = the_post.id
pprint.pprint(status_dict, compact=False, indent=2)
r = requests.post(
post_uri,
headers=headers,
data=json.dumps(status_dict)
)
print(r.reason, r.status_code, json.loads(r.text))
async def hello(uri): async def hello(uri):
# async with websockets.connect(uri) as ws: # async with websockets.connect(uri) as ws:
@ -73,35 +30,73 @@ async def hello(uri):
try: try:
message = await ws.recv() message = await ws.recv()
decoded = dm.fromDict(json.loads(message)) decoded = dm.fromDict(json.loads(message))
print(decoded.event, decoded.payload[1:150]) print(decoded.event, decoded.payload[1:49])
if decoded.event != "delete":
the_post = dm.fromDict(json.loads(decoded.payload))
match decoded.event: match decoded.event:
case "delete": case "delete":
print("a delete event for id ", decoded.payload, "\n===>\n") print("a delete event for id ", decoded.payload, "\n===>\n")
case "status.update": case "update"|"status.update":
print("an update to an existing post: ", the_post.url, "\n===>\n") the_update = dm.fromDict(json.loads(decoded.payload))
print(the_update)
case "update": if the_update.reblog:
if the_post.reblog: print(
print( "a reblog of ", the_post.reblog.url, "\n===>\n" ) "a reblog of ",
the_update.reblog.url,
"\n id ",
the_update.id,
"\n created at ",
the_update.created_at,
"\n===>\n"
)
else: else:
plain_text_content = strip_html(the_post.content) plain_text_content = strip_html(the_update.content)
print( "new status: ", the_post.url, "\n===>\n" ) print(
if the_post.mentions: "a status update (new or edit) ",
for mention in the_post.mentions: the_update.url,
if mention.username == "botsy": "\n id ",
process_request(the_post) the_update.id,
break "\n by ",
the_update.account.acct,
"\n created at ",
the_update.created_at,
"\n the content \n",
plain_text_content,
"\n===>\n"
)
strings_to_listen_for = ["folks" in plain_text_content.lower(),
"love" in plain_text_content.lower(),
"you" in plain_text_content.lower()]
if any(strings_to_listen_for):
note_to_post = {
"status": f"[@{the_update.account.acct}](the_update.account.url) We come to love not by finding a perfect person, but by learning to see an imperfect person perfectly.\n ~ Sam Keen ~ ",
"in_reply_to_id": the_update.id,
"visibility": "mutuals_only",
"content_type": "text/markdown"
}
r = requests.post(
post_uri,
headers=headers,
json=note_to_post
)
print(r.reason, r.status_code)
case "notification": case "notification":
process_request(the_post) the_notification = dm.fromDict(json.loads(decoded.payload))
print(
"a ",
the_notification.type,
" notification\n",
" from ",
the_notification.account.acct,
" id ", the_notification.id, "\n",
" created at ", the_notification.created_at,
"\n===>\n"
)
case _: case _:
print('unknown event: ', decoded.event, "\n", print('unknown event: ', decoded.event, "\n",
" with payload beginning ", " with payload beginning ",
decoded.payload[1:150]) decoded.payload[1:100])
except asyncio.exceptions.CancelledError: except asyncio.exceptions.CancelledError:
print('keyboard interrupt received ... exiting') print('keyboard interrupt received ... exiting')
@ -121,5 +116,3 @@ async def main():
if __name__ == "__main__": if __name__ == "__main__":
asyncio.run(hello(ws_uri)) asyncio.run(hello(ws_uri))