lab-signal-bot/main.py
2024-12-28 02:19:45 +01:00

514 lines
20 KiB
Python

#!/usr/bin/env python3
import time
import requests
import logging
import json
from result import Result, Ok, Err, is_err
from typing import List
from websockets.asyncio.client import connect
import websockets
import asyncio
from apitypes import *
from models import *
from pydantic import BaseModel
from sqlmodel import Session, SQLModel, create_engine, select
class Config(BaseModel):
    """Runtime settings of the bot, validated from the JSON config file."""

    # Base URL of the Signal REST API (used as prefix for every request URL).
    apiurl: str
    # Account number the bot acts as; it is embedded in the endpoint paths.
    number: str
def signal_timestamp_to_datetime(timestamp: str) -> datetime.datetime:
    """Convert a millisecond epoch timestamp into a naive local datetime.

    ``timestamp`` is passed through ``int()``, so both numeric strings and
    integers are accepted.
    """
    epoch_seconds = int(timestamp) / 1000
    return datetime.datetime.fromtimestamp(epoch_seconds)
def now():
    """Return the current time as a naive datetime in local time."""
    current_time = datetime.datetime.now()
    return current_time
# Module-level type variable used in the annotations of necessary_changes().
# (The PEP 695 class syntax `class Changes[T]` declares its own, separate T.)
T = TypeVar("T")
class Changes[T](BaseModel):
to_add: List[T] = []
to_remove: List[T] = []
def necessary_changes(current: List[T], desired: List[T]) -> Changes[T]:
    """Compute what must be added to / removed from ``current`` to get ``desired``.

    ``to_add`` holds the entries of ``desired`` that are not in ``current``
    (in ``desired`` order); ``to_remove`` holds the entries of ``current``
    that are not in ``desired`` (in ``current`` order). Membership is tested
    with ``in``, i.e. by equality.
    """
    diff = Changes[T]()
    diff.to_add.extend(entry for entry in desired if entry not in current)
    diff.to_remove.extend(entry for entry in current if entry not in desired)
    return diff
def format_seconds(seconds: float) -> str:
    """Render a duration in seconds as a short human-readable string.

    The duration is truncated to whole seconds and expressed in the largest
    unit (days, hours, minutes, seconds) that fits at least once; the count
    is floored, e.g. 119 seconds -> "1 minute". The unit is singular for a
    count of exactly 1 ("1 minute", not "1 minutes").

    Negative durations are formatted by magnitude and prefixed with "-",
    so -7200 yields "-2 hours" rather than "-7200 seconds".
    """
    total = int(seconds)
    magnitude = abs(total)
    sign = "-" if total < 0 else ""

    # Largest unit first; fall back to plain seconds when none fits.
    units = (("day", 86400), ("hour", 3600), ("minute", 60))
    count, unit = magnitude, "second"
    for name, size in units:
        if magnitude >= size:
            count, unit = magnitude // size, name
            break

    plural = "" if count == 1 else "s"
    return f"{sign}{count} {unit}{plural}"
class SignalAPI:
    """Client for the Signal REST service reachable at ``apiurl``.

    ``number`` is the account the client acts as; it is embedded in most
    endpoint paths. Every HTTP helper returns a ``Result``: ``Ok`` carrying
    the parsed response (or ``None`` for calls without a response body) and
    ``Err`` carrying a human-readable message when the service answers with
    an unexpected status code.

    NOTE(review): the endpoint layout looks like signal-cli-rest-api -- confirm.
    """

    def __init__(self, apiurl, number):
        self.apiurl = apiurl
        self.number = number

    def _url(self, path: str) -> str:
        """Return the absolute URL for ``path`` (given without leading slash)."""
        return f"{self.apiurl}/{path}"

    @staticmethod
    def _error_message(response, fallback: str) -> str:
        """Extract the ``error`` field of a failed response, or ``fallback``.

        The error body is not guaranteed to be JSON (or to contain an
        ``error`` key), so a malformed body must not raise here.
        """
        try:
            payload = response.json()
        except ValueError:
            # requests raises a ValueError subclass for non-JSON bodies.
            return fallback
        if isinstance(payload, dict) and "error" in payload:
            return payload["error"]
        return fallback

    def set_username(self, username) -> Result[UsernameSetResponse, str]:
        """Set the account username: ``POST /v1/accounts/{number}/username``."""
        r = requests.post(self._url(f"v1/accounts/{self.number}/username"), json={"username": username})
        if r.status_code != 201:
            return Err("Failed to set username.")
        return parse_response(UsernameSetResponse, r.text)

    def get_groups(self) -> Result[List[GroupEntry], str]:
        """List the account's groups: ``GET /v1/groups/{number}``."""
        r = requests.get(self._url(f"v1/groups/{self.number}"))
        if r.status_code != 200:
            return Err("Failed to get groups.")
        return parse_response(List[GroupEntry], r.text)

    def get_group(self, group_id: str) -> Result[GroupEntry, str]:
        """Fetch one group: ``GET /v1/groups/{number}/{group_id}``."""
        r = requests.get(self._url(f"v1/groups/{self.number}/{group_id}"))
        if r.status_code != 200:
            return Err("Failed to get group.")
        return parse_response(GroupEntry, r.text)

    def create_group(self, group_request: CreateGroupRequest) -> Result[CreateGroupResponse, str]:
        """Create a group: ``POST /v1/groups/{number}`` with the dumped request."""
        r = requests.post(self._url(f"v1/groups/{self.number}"), json=group_request.model_dump())
        if r.status_code != 201:
            return Err("Failed to create group.")
        return parse_response(CreateGroupResponse, r.text)

    def add_group_members(self, group_id: str, numbers_to_add: List[str]) -> Result[None, str]:
        """Add members: ``POST /v1/groups/{number}/{group_id}/members``.

        The request body is ``{"members": [...]}``. An empty list is a no-op
        and returns ``Ok(None)`` without contacting the service.
        """
        if not numbers_to_add:
            return Ok(None)
        r = requests.post(
            self._url(f"v1/groups/{self.number}/{group_id}/members"),
            json={"members": numbers_to_add},
        )
        if r.status_code != 204:
            return Err("Failed to add group members.")
        return Ok(None)

    def remove_group_members(self, group_id: str, numbers_to_remove: List[str]) -> Result[None, str]:
        """Remove members: ``DELETE /v1/groups/{number}/{group_id}/members``.

        The request body is ``{"members": [...]}``. An empty list is a no-op
        and returns ``Ok(None)`` without contacting the service.
        """
        if not numbers_to_remove:
            return Ok(None)
        r = requests.delete(
            self._url(f"v1/groups/{self.number}/{group_id}/members"),
            json={"members": numbers_to_remove},
        )
        if r.status_code != 204:
            return Err("Failed to remove group members.")
        return Ok(None)

    def update_group_members(self, group_id: str, other_members: List[str], remove: bool = True) -> Result[Changes[str], str]:
        """Make the group's members be this account plus ``other_members``.

        Missing members are always added. Surplus members are only removed
        when ``remove`` is true. Returns the computed ``Changes`` (including
        ``to_remove`` even when nothing was removed) or the first error.
        """
        group = self.get_group(group_id)
        if is_err(group):
            return Err(group.unwrap_err())
        desired_members = [self.number] + other_members
        member_changes = necessary_changes(group.unwrap().members, desired_members)

        add_result = self.add_group_members(group_id, member_changes.to_add)
        if is_err(add_result):
            return Err(add_result.unwrap_err())
        if remove:
            remove_result = self.remove_group_members(group_id, member_changes.to_remove)
            if is_err(remove_result):
                return Err(remove_result.unwrap_err())
        return Ok(member_changes)

    def add_group_admins(self, group_id: str, admins: List[str]) -> Result[None, str]:
        """Grant admin rights: ``POST /v1/groups/{number}/{group_id}/admins``.

        An empty list is a no-op. On failure the service's ``error`` field is
        returned when available.
        """
        if not admins:
            return Ok(None)
        r = requests.post(
            self._url(f"v1/groups/{self.number}/{group_id}/admins"),
            json={"admins": admins},
        )
        if r.status_code != 204:
            return Err(self._error_message(r, "Failed to add group admins."))
        return Ok(None)

    def remove_group_admins(self, group_id: str, admins: List[str]) -> Result[None, str]:
        """Revoke admin rights: ``DELETE /v1/groups/{number}/{group_id}/admins``.

        An empty list is a no-op (consistent with the other add/remove
        helpers). On failure the service's ``error`` field is returned when
        available.
        """
        if not admins:
            return Ok(None)
        r = requests.delete(
            self._url(f"v1/groups/{self.number}/{group_id}/admins"),
            json={"admins": admins},
        )
        if r.status_code != 204:
            return Err(self._error_message(r, "Failed to remove group admins."))
        return Ok(None)

    def update_group_admins(self, group_id: str, other_admins: List[str], remove: bool = True) -> Result[Changes[str], str]:
        """Make the group's admins be this account plus ``other_admins``.

        Missing admins are always added. Surplus admins are only demoted when
        ``remove`` is true. Returns the computed ``Changes`` or the first
        error.
        """
        group = self.get_group(group_id)
        if is_err(group):
            return Err(group.unwrap_err())
        desired_admins = [self.number] + other_admins
        admin_changes = necessary_changes(group.unwrap().admins, desired_admins)

        add_result = self.add_group_admins(group_id, admin_changes.to_add)
        if is_err(add_result):
            return Err(add_result.unwrap_err())
        if remove:
            remove_result = self.remove_group_admins(group_id, admin_changes.to_remove)
            if is_err(remove_result):
                return Err(remove_result.unwrap_err())
        return Ok(admin_changes)

    def get_identities(self) -> Result[List[IdentityEntry], str]:
        """List known identities: ``GET /v1/identities/{number}``."""
        r = requests.get(self._url(f"v1/identities/{self.number}"))
        if r.status_code != 200:
            return Err("Failed to get identities.")
        return parse_response(List[IdentityEntry], r.text)

    def trust_identity(self, number_to_trust: str, trust_identity_request: TrustIdentityRequest) -> Result[None, str]:
        """Trust an identity: ``PUT /v1/identities/{number}/trust/{number_to_trust}``.

        The request model is sent as a pre-serialized JSON string in the
        request body.
        """
        r = requests.put(
            self._url(f"v1/identities/{self.number}/trust/{number_to_trust}"),
            data=trust_identity_request.model_dump_json(),
        )
        if r.status_code != 204:
            return Err("Failed to trust identity.")
        return Ok(None)

    async def websocket_connect_receive(self):
        """Yield websocket connections to ``/v1/receive/{number}``.

        Iterating ``websockets.connect`` yields a connection again after the
        previous one was closed, so consumers can simply keep iterating.
        """
        # Only the URL scheme may be rewritten (http -> ws, https -> wss); a
        # plain str.replace would also mangle "http" inside host or path.
        if self.apiurl.startswith("http"):
            ws_url = "ws" + self.apiurl[len("http"):]
        else:
            ws_url = self.apiurl
        async for websocket in websockets.connect(f"{ws_url}/v1/receive/{self.number}"):
            yield websocket

    async def receive_message(self, websocket) -> Result[Message, str]:
        """Wait for the next frame on ``websocket`` and parse it as ``Message``."""
        raw = await websocket.recv()
        return parse_response(Message, raw)

    def send_message(self, message: SendMessageSimple) -> Result[SendMessageResponse, str]:
        """Send ``message`` from this account: ``POST /v2/send``.

        The sender ``number`` is injected into the dumped message. On failure
        the service's ``error`` field is returned when available.
        """
        data = message.model_dump()
        data['number'] = self.number
        r = requests.post(self._url("v2/send"), json=data)
        if r.status_code != 201:
            return Err(self._error_message(r, "Failed to send message."))
        return parse_response(SendMessageResponse, r.text)
class LabCleaningBot:
api: SignalAPI
base_group: str
def __init__(self, api, base_group):
self.api = api
self.base_group = base_group
def get_other_members(self) -> Result[List[str], str]:
# Get all members of the base group except the bot
group = self.api.get_group(self.base_group)
if group.is_err():
return Err(group.unwrap_err())
members = group.unwrap().members
other_members = []
for member in members:
if member != self.api.number:
other_members.append(member)
return Ok(other_members)
def sync_members_as_active_users(self, session) -> Result[None, str]:
# Get all members of the base group and set them as active users
# if they are not already in the database
maybe_members = self.get_other_members()
if is_err(maybe_members):
return Err(maybe_members.unwrap_err())
members = maybe_members.unwrap()
for member in members:
maybe_user = get_user_by_name(session, member, only_active=False)
if is_err(maybe_user):
user = User(name=member)
session.add(user)
else:
user = maybe_user.unwrap()
user.active = True
for user in session.exec(select(User).where(User.active)):
if user.name not in members:
user.active = False
session.commit()
return Ok(None)
def send_to_base_group(self, message: str) -> Result[None, str]:
# Send a message to the base group
message = SendMessageSimple(message=message, recipients=[self.base_group])
return self.api.send_message(message)
async def receiver(self, session: Session):
# Async routine that receives messages and calles message_received() for
# each message
async for websocket in self.api.websocket_connect_receive():
try:
while True:
message_result = await self.api.receive_message(websocket)
if is_err(message_result):
logging.debug(message_result.unwrap_err())
continue
else:
message = message_result.unwrap()
self.message_received(message, session)
except websockets.exceptions.ConnectionClosed:
print("Websockets connection closed. Reestablishing connection.")
def message_received(self, message: Message, session: Session):
# This method is called by the async receiver for each message
envelope = message.envelope
match envelope:
# Normal direct message (no edits, no reactions)
case EnvelopeData(dataMessage=DataMessage(message=message, groupInfo=None), source=source):
print(message, "direct", source)
# Normal group message (no edits, no reactions)
case EnvelopeData(dataMessage=DataMessage(message=message, groupInfo=GroupInfo(groupId=group_id)), source=source):
print(message, group_id)
# Reaction in direct messages
case EnvelopeData(
dataMessage=ReactionMessage(
reaction=Reaction(
emoji=emoji,
isRemove=isRemove,
targetSentTimestamp=targetSentTimestamp
),
groupInfo=None,
timestamp=timestamp
),
source=source):
reactionTimestamp = signal_timestamp_to_datetime(timestamp)
requestTimestamp = signal_timestamp_to_datetime(targetSentTimestamp)
maybe_request = get_participation_request_by_timestamp(session, requestTimestamp)
if is_err(maybe_request):
print("No participation request found for timestamp %s." % requestTimestamp)
return
request = maybe_request.unwrap()
response_msg = ""
if emoji == "👍" and not isRemove:
accept_result = request.try_accept(now=reactionTimestamp)
match accept_result:
case Ok(AcceptInTime()) | Ok(AcceptAfterRejectAllowed()):
response_msg = """Thank you! You accepted the request.
I will add you to a signal group dedicated for this task. You can not directly leave the signal group until the task has started.
If due to any reason you can not participate, please just change your "👍" reaction above to something else or remove the reaction. I will let the others know that you can not participate, remove you from the group and ask another person to overtake your task."""
self.api.add_group_members(request.task.chatgroup, [source])
number_of_additional_requests_to_be_sent = request.task.additional_requests_to_be_sent()
if number_of_additional_requests_to_be_sent < 0:
self.api.send_message(SendMessageSimple(
message="This task now has more participants than necessary. This can happen if a person has first rejected a request and then accepted it. You can handle this situation as you like.",
recipients=[request.task.chatgroup]))
elif number_of_additional_requests_to_be_sent < 1:
self.api.send_message(SendMessageSimple(
message="Enough participants have accepted the task. You can now start coordinating your task.",
recipients=[request.task.chatgroup]))
case Err(AcceptAfterRejectExpired()):
response_msg = "You cannot accept the request after rejecting it after 5 minutes."
case Err(AlreadyAccepted()):
response_msg = "You already accepted the request."
case Err(AcceptAfterTimeout()):
response_msg = "You cannot accept the request after the timeout."
else:
reject_result = request.try_reject(now=reactionTimestamp)
match reject_result:
case Ok(RejectInTime()):
response_msg = "You rejected the request. I will ask another person to overtake the task."
case Ok(RejectAfterAccept()):
response_msg = "You rejected the request after accepting it. I will remove you from the group now and ask another person to overtake the task."
self.api.send_message(SendMessageSimple(
message="Person will be removed now, since they rejected the request after initially accepting the task. This is most likely because the person does not have time anymore, something unexpected came up for the person or something else. I will ask another person to overtake the task. If this is not possible, I will let you know in a separate message below.",
recipients=[request.task.chatgroup]))
case Err(AlreadyRejected()):
response_msg = "You already rejected the request."
case Err(RejectAfterTimeout()):
response_msg = "You cannot reject the request after the timeout."
self.api.send_message(
SendMessageSimple(message=response_msg, recipients=[source]))
print(emoji, "direct", source, isRemove)
def create_or_update_task_group(self, task: Task) -> Result[str, str]:
chatgroup = task.chatgroup
# Check if the group still exists
if chatgroup is not None:
info = self.api.get_group(chatgroup)
if is_err(info):
# group does not exist (or so...)
print(info.unwrap_err())
chatgroup = None
# Create a group for the task if it does not exist
if chatgroup is None:
create_result = self.api.create_group(CreateGroupRequest(name=task.name, members=[self.api.number]))
if is_err(create_result):
return Err(create_result.unwrap_err())
else:
chatgroup = create_result.unwrap().id
task.chatgroup = chatgroup
self.api.update_group_members(chatgroup, [r.user.name for r in task.accepted_requests()])
self.api.update_group_admins(chatgroup, [r.user.name for r in task.accepted_requests()])
async def sync_members_and_tasks(self, session: Session):
# Async routine that syncs active members using sync_members_as_active_users(),
# sends out new requests for tasks to active users and handles timeouts for the
# requests.
unfulfillable_tasks = []
while True:
sync_result = self.sync_members_as_active_users(session)
self.api.update_group_members(self.base_group, ["+4915773232355"], remove=False)
self.api.update_group_admins(self.base_group, ["+4915773232355"], remove=False)
if is_err(sync_result):
print(sync_result.unwrap_err())
for task in get_active_tasks(session, now()):
reqs = task.create_additional_requests(now(), session)
if is_err(reqs):
if task not in unfulfillable_tasks:
print("Could not fulfill task: " + task.name)
unfulfillable_tasks.append(task)
reqs = reqs.unwrap_err()
else:
reqs = reqs.unwrap()
for request in reqs:
seconds_to_due = (task.due - request.requested_at).total_seconds()
text = f"""Hi!
You have been requested to participate in the task: {task.name} (starts in {format_seconds(seconds_to_due)}).
To accept the request, react with 👍. To reject, react with any other emoji.
You have time to answer for {format_seconds(task.timeout)}."""
message = SendMessageSimple(message=text, recipients=[request.user.name])
res = self.api.send_message(message)
if is_ok(res):
timestamp = signal_timestamp_to_datetime(res.unwrap().timestamp)
request.requested_at = timestamp
else:
print(res.unwrap_err())
# Check for timeouts (for all tasks, also those that are timeouted)
for task in get_tasks(session):
for request in task.freshly_expired_requests(now()):
print("Request expired:", repr(request))
message = SendMessageSimple(
message="You did not respond to the task request in time.",
recipients=[request.user.name])
res = self.api.send_message(message)
self.create_or_update_task_group(task)
session.commit()
await asyncio.sleep(1)
async def main(
    config: Config,
    session: Session,
    base_group: str = "group.bm5KT3NJUW5FdkpRNnR2ZGRFa01oOVZBeUYrVkdnd3NNTzFpNWdsR2pwUT0=",
):
    """Run the bot until cancelled.

    Builds the API client from ``config`` and runs the message receiver and
    the periodic member/task sync concurrently on the shared ``session``.
    ``base_group`` is the id of the Signal group whose members take part in
    the tasks; it defaults to the previously hard-coded group.
    """
    api = SignalAPI(config.apiurl, config.number)
    bot = LabCleaningBot(api, base_group)
    await asyncio.gather(
        bot.receiver(session),
        bot.sync_members_and_tasks(session),
    )
if __name__ == "__main__":
    # JSON is UTF-8 by specification; do not depend on the locale's default
    # text encoding when reading the configuration.
    with open("config.json", "r", encoding="utf-8") as f:
        config = Config.model_validate(json.load(f))

    # Create the SQLite database and all tables on first start.
    engine = create_engine("sqlite:///data.db")
    SQLModel.metadata.create_all(engine)

    with Session(engine) as session:
        asyncio.run(main(config, session))

    # Equivalent to exit(0) but independent of the `site` module, which is
    # what provides the `exit` helper.
    raise SystemExit(0)