#!/usr/bin/env python3 import time import requests import logging import json from result import Result, Ok, Err, is_err from typing import List from websockets.asyncio.client import connect import websockets import asyncio from apitypes import * from models import * from pydantic import BaseModel from sqlmodel import Session, SQLModel, create_engine, select class Config(BaseModel): apiurl: str number: str lab_cleaning_signal_base_group: str def signal_timestamp_to_datetime(timestamp: str) -> datetime.datetime: return datetime.datetime.fromtimestamp(int(timestamp)/1000) def now(): return datetime.datetime.now() T = TypeVar("T") class Changes[T](BaseModel): to_add: List[T] = [] to_remove: List[T] = [] def necessary_changes(current: List[T], desired: List[T]) -> Changes[T]: changes = Changes[T]() for item in desired: if item not in current: changes.to_add.append(item) for item in current: if item not in desired: changes.to_remove.append(item) return changes def format_seconds(seconds: float) -> str: seconds = int(seconds) if seconds < 60: return f"{seconds} seconds" elif seconds < 3600: return f"{seconds//60} minutes" elif seconds < 86400: return f"{seconds//3600} hours" else: return f"{seconds//86400} days" class SignalAPI: def __init__(self, apiurl, number): self.apiurl = apiurl self.number = number def set_username(self, username) -> Result[UsernameSetResponse, str]: # post request to set username # /v1/accounts/{number}/username # { # "username": "test" # } r = requests.post(f"{self.apiurl}/v1/accounts/{self.number}/username", json={"username": username}) if r.status_code == 201: return parse_response(UsernameSetResponse, r.text) else: return Err("Failed to set username.") def get_groups(self) -> Result[List[GroupEntry], str]: # get request to get groups # /v1/groups r = requests.get(f"{self.apiurl}/v1/groups/{self.number}") if r.status_code == 200: return parse_response(List[GroupEntry], r.text) else: return Err("Failed to get groups.") def get_group(self, group_id: str) -> Result[GroupEntry, str]: # get request to get group # /v1/groups/{group_id} r = requests.get(f"{self.apiurl}/v1/groups/{self.number}/{group_id}") if r.status_code == 200: return parse_response(GroupEntry, r.text) else: return Err("Failed to get group.") def create_group(self, group_request: CreateGroupRequest) -> Result[CreateGroupResponse, str]: r = requests.post(f"{self.apiurl}/v1/groups/{self.number}", json=group_request.model_dump()) if r.status_code == 201: return parse_response(CreateGroupResponse, r.text) else: return Err("Failed to create group.") def add_group_members(self, group_id: str, numbers_to_add: List[str]) -> Result[None, str]: # put request to add group members # /v1/groups/{group_id}/members # { # "members": ["+49123456789", "+49123456780"] # } if len(numbers_to_add) == 0: return Ok(None) r = requests.post(f"{self.apiurl}/v1/groups/{self.number}/{group_id}/members", json={"members": numbers_to_add}) if r.status_code == 204: return Ok(None) else: return Err("Failed to add group members.") def remove_group_members(self, group_id: str, numbers_to_remove: List[str]) -> Result[None, str]: # delete request to remove group members # /v1/groups/{group_id}/members # { # "members": ["+49123456789", "+49123456780"] # } if len(numbers_to_remove) == 0: return Ok(None) r = requests.delete(f"{self.apiurl}/v1/groups/{self.number}/{group_id}/members", json={"members": numbers_to_remove}) if r.status_code == 204: return Ok(None) else: return Err("Failed to remove group members.") def update_group_members(self, group_id: str, other_members: List[str], remove: bool = True) -> Result[UpdateGroupResult, str]: # Add and remove members from the group, such that the only remaining # members of the group are this bot and users specified by other_members. group = self.get_group(group_id) if is_err(group): return Err(group.unwrap_err()) current_members = group.unwrap().members members_should = [self.number] + other_members member_changes = necessary_changes(current_members, members_should) add_result = self.add_group_members(group_id, member_changes.to_add) if add_result.is_err(): return Err(add_result.unwrap_err()) if remove: remove_result = self.remove_group_members(group_id, member_changes.to_remove) if remove_result.is_err(): return Err(remove_result.unwrap_err()) return Ok(member_changes) def add_group_admins(self, group_id: str, admins: List[str]) -> Result[None, str]: if len(admins) == 0: return Ok(None) r = requests.post(f"{self.apiurl}/v1/groups/{self.number}/{group_id}/admins", json={"admins": admins}) if r.status_code == 204: return Ok(None) else: return Err(r.json()['error']) def remove_group_admins(self, group_id: str, admins: List[str]) -> Result[None, str]: r = requests.delete(f"{self.apiurl}/v1/groups/{self.number}/{group_id}/admins", json={"admins": admins}) if r.status_code == 204: return Ok(None) else: return Err(r.json()['error']) def update_group_admins(self, group_id: str, other_admins: List[str], remove: bool = True) -> Result[Changes[str], str]: # Add and remove admins from the group, such that the only remaining # admins of the group are this bot and users specified by other_admins. group = self.get_group(group_id) if is_err(group): return Err(group.unwrap_err()) admins_should = [self.number] + other_admins current_admins = group.unwrap().admins admin_changes = necessary_changes(current_admins, admins_should) add_result = self.add_group_admins(group_id, admin_changes.to_add) if add_result.is_err(): return Err(add_result.unwrap_err()) if remove: remove_result = self.remove_group_admins(group_id, admin_changes.to_remove) if remove_result.is_err(): return Err(remove_result.unwrap_err()) return Ok(admin_changes) def get_identities(self) -> Result[List[IdentityEntry], str]: r = requests.get(f"{self.apiurl}/v1/identities/{self.number}") if r.status_code == 200: return parse_response(List[IdentityEntry], r.text) else: return Err("Failed to get identities.") # /v1/identities/{number}/trust/{numberToTrust} def trust_identity(self, number_to_trust: str, trust_identity_request: TrustIdentityRequest) -> Result[None, str]: r = requests.put(f"{self.apiurl}/v1/identities/{self.number}/trust/{number_to_trust}", trust_identity_request.model_dump_json()) if r.status_code == 204: return Ok(None) else: return Err("Failed to trust identity.") async def websocket_connect_receive(self): ws_url = self.apiurl.replace("http", "ws") async for websocket in websockets.connect(f"{ws_url}/v1/receive/{self.number}"): yield websocket async def receive_message(self, websocket) -> Result[Message, str]: res = await websocket.recv() return parse_response(Message, res) def send_message(self, message: SendMessageSimple) -> Result[SendMessageResponse, str]: data = message.model_dump() data['number'] = self.number r = requests.post(f"{self.apiurl}/v2/send", json=data) if r.status_code == 201: return parse_response(SendMessageResponse, r.text) else: return Err(r.json()['error']) class LabCleaningBot: api: SignalAPI base_group: str def __init__(self, api, base_group): self.api = api self.base_group = base_group def get_other_members(self) -> Result[List[str], str]: # Get all members of the base group except the bot group = self.api.get_group(self.base_group) if group.is_err(): return Err(group.unwrap_err()) members = group.unwrap().members other_members = [] for member in members: if member != self.api.number: other_members.append(member) return Ok(other_members) def sync_members_as_active_users(self, session) -> Result[None, str]: # Get all members of the base group and set them as active users # if they are not already in the database maybe_members = self.get_other_members() if is_err(maybe_members): return Err(maybe_members.unwrap_err()) members = maybe_members.unwrap() for member in members: maybe_user = get_user_by_name(session, member, only_active=False) if is_err(maybe_user): user = User(name=member) session.add(user) else: user = maybe_user.unwrap() user.active = True for user in session.exec(select(User).where(User.active)): if user.name not in members: user.active = False session.commit() return Ok(None) def send_to_base_group(self, message: str) -> Result[None, str]: # Send a message to the base group message = SendMessageSimple(message=message, recipients=[self.base_group]) return self.api.send_message(message) async def receiver(self, session: Session): # Async routine that receives messages and calles message_received() for # each message async for websocket in self.api.websocket_connect_receive(): try: while True: message_result = await self.api.receive_message(websocket) if is_err(message_result): logging.debug(message_result.unwrap_err()) continue else: message = message_result.unwrap() self.message_received(message, session) except websockets.exceptions.ConnectionClosed: print("Websockets connection closed. Reestablishing connection.") def pad_hint(self, task: Task) -> str: if task.pad_url is not None: return "\n\nA list of the tasks is linked the description of the group or can be found here: " + task.pad_url + "." else: return "" def message_received(self, message: Message, session: Session): # This method is called by the async receiver for each message envelope = message.envelope match envelope: # Normal direct message (no edits, no reactions) case EnvelopeData(dataMessage=DataMessage(message=message, groupInfo=None), source=source): print(message, "direct", source) # Normal group message (no edits, no reactions) case EnvelopeData(dataMessage=DataMessage(message=message, groupInfo=GroupInfo(groupId=group_id)), source=source): print(message, group_id) # Reaction in direct messages case EnvelopeData( dataMessage=ReactionMessage( reaction=Reaction( emoji=emoji, isRemove=isRemove, targetSentTimestamp=targetSentTimestamp ), groupInfo=None, timestamp=timestamp ), source=source): reactionTimestamp = signal_timestamp_to_datetime(timestamp) requestTimestamp = signal_timestamp_to_datetime(targetSentTimestamp) maybe_request = get_participation_request_by_timestamp(session, requestTimestamp) if is_err(maybe_request): print("No participation request found for timestamp %s." % requestTimestamp) return request = maybe_request.unwrap() response_msg = "" if emoji == "👍" and not isRemove: accept_result = request.try_accept(now=reactionTimestamp) match accept_result: case Ok(AcceptInTime()) | Ok(AcceptAfterRejectAllowed()): response_msg = f"""Thank you! You accepted the request. I will add you to a signal group dedicated for this task. You can not directly leave the signal group until the task has started. If due to any reason you can not participate, please just change your "👍" reaction above to something else or remove the reaction. I will let the others know that you can not participate, remove you from the group and ask another person to overtake your task. As soon as {request.task.required_number_of_participants} people joined the task, I will let you know. Then you can start coordinate with your group.""" self.api.add_group_members(request.task.chatgroup, [source]) number_of_additional_people_needed = request.task.required_number_of_participants - len(request.task.accepted_requests()) pad_hint = self.pad_hint(request.task) if number_of_additional_people_needed < 0: self.api.send_message(SendMessageSimple( message="This task now has more participants than necessary. This can happen if a person has first rejected a request and then accepted it. You can handle this situation as you like." + pad_hint, recipients=[request.task.chatgroup])) elif number_of_additional_people_needed < 1: self.api.send_message(SendMessageSimple( message="Enough participants have accepted the task. You can now start coordinating your task." + pad_hint, recipients=[request.task.chatgroup])) case Err(AcceptAfterRejectExpired()): response_msg = "You cannot accept the request after rejecting it after 5 minutes." case Err(AlreadyAccepted()): response_msg = "You already accepted the request." case Err(AcceptAfterTimeout()): response_msg = "You cannot accept the request after the timeout." else: reject_result = request.try_reject(now=reactionTimestamp) match reject_result: case Ok(RejectInTime()): response_msg = "You rejected the request. I will ask another person to overtake the task." case Ok(RejectAfterAccept()): response_msg = "You rejected the request after accepting it. I will remove you from the group now and ask another person to overtake the task." self.api.send_message(SendMessageSimple( message="Person will be removed now, since they rejected the request after initially accepting the task. This is most likely because the person does not have time anymore, something unexpected came up for the person or something else. I will ask another person to overtake the task. If this is not possible, I will let you know in a separate message below.", recipients=[request.task.chatgroup])) case Err(AlreadyRejected()): response_msg = "You already rejected the request." case Err(RejectAfterTimeout()): response_msg = "You cannot reject the request after the timeout." self.api.send_message( SendMessageSimple(message=response_msg, recipients=[source])) print(emoji, "direct", source, isRemove) def create_or_update_task_group(self, task: Task) -> Result[None, str]: chatgroup = task.chatgroup # Check if the group still exists if chatgroup is not None: info = self.api.get_group(chatgroup) if is_err(info): # group does not exist (or so...) print(info.unwrap_err()) chatgroup = None # Create a group for the task if it does not exist if chatgroup is None: group_creation_request = CreateGroupRequest(name=task.name, members=[self.api.number]) if task.pad_url is not None: group_creation_request.description = task.pad_url create_result = self.api.create_group(group_creation_request) if is_err(create_result): return Err(create_result.unwrap_err()) else: chatgroup = create_result.unwrap().id task.chatgroup = chatgroup update_members_result = self.api.update_group_members(chatgroup, [r.user.name for r in task.accepted_requests()]) if is_err(update_members_result): return Err(update_members_result.unwrap_err()) update_group_admins = self.api.update_group_admins(chatgroup, [r.user.name for r in task.accepted_requests()]) if is_err(update_group_admins): return Err(update_group_admins.unwrap_err()) return Ok(None) async def sync_members_and_tasks(self, session: Session): # Async routine that syncs active members using sync_members_as_active_users(), # sends out new requests for tasks to active users and handles timeouts for the # requests. unfulfillable_tasks = [] while True: self.assert_is_base_group_admin() sync_result = self.sync_members_as_active_users(session) if is_err(sync_result): print(sync_result.unwrap_err()) for task in get_active_tasks(session, now()): create_or_update_result = self.create_or_update_task_group(task) if is_err(create_or_update_result): print(create_or_update_result.unwrap_err()) continue reqs = task.create_additional_requests(now(), session) if is_err(reqs): if task not in unfulfillable_tasks and len(task.accepted_requests()) > 0: print("Could not fulfill task: " + task.name) additional_requested_users = [r.user.name for r in task.requested_requests()] self.api.send_message(SendMessageSimple(message=f"It was planned to do this task with {task.required_number_of_participants} participants. There are currently {len(additional_requested_users)} unanswered requests. However, currently, no additional users are left to request for this task. Please try to fulfill the tasks as good as you are able to within your group or ask other people directly if they can join your group." + self.pad_hint(task), recipients=[task.chatgroup])) unfulfillable_tasks.append(task) reqs = reqs.unwrap_err() else: reqs = reqs.unwrap() for request in reqs: seconds_to_due = (task.due - request.requested_at).total_seconds() text = f"""Hi! You have been requested to participate in the task: {task.name} ({task.required_number_of_participants} participants desired, starts in {format_seconds(seconds_to_due)}). To accept the request, react with 👍. To reject, react with any other emoji. You have time to answer for {format_seconds(task.timeout)}.""" message = SendMessageSimple(message=text, recipients=[request.user.name]) res = self.api.send_message(message) if is_ok(res): timestamp = signal_timestamp_to_datetime(res.unwrap().timestamp) request.requested_at = timestamp else: print(res.unwrap_err()) # Check for timeouts (for all tasks, also those that are timeouted) for task in get_tasks(session): for request in task.freshly_expired_requests(now()): print("Request expired:", repr(request)) message = SendMessageSimple( message="You did not respond to the task request in time.", recipients=[request.user.name]) res = self.api.send_message(message) session.commit() await asyncio.sleep(1) def assert_is_base_group_admin(self): group_info_result = self.api.get_group(self.base_group) if is_err(group_info_result): print("Error, could not get info about base_group " + self.base_group + ": " + group_info_result.unwrap_err()) exit(1) group_info = group_info_result.unwrap() if config.number not in group_info.admins: print("Error: Bot is not an admin of the base group.") exit(1) async def main(config: Config, session: Session): api = SignalAPI(config.apiurl, config.number) bot = LabCleaningBot(api, config.lab_cleaning_signal_base_group) bot.assert_is_base_group_admin() print("Bot started.") await asyncio.gather( bot.receiver(session), bot.sync_members_and_tasks(session) ) with open("config.json", "r") as f: config = Config.model_validate(json.load(f)) engine = create_engine("sqlite:///data.db") SQLModel.metadata.create_all(engine) if __name__ == "__main__": with Session(engine) as session: asyncio.run(main(config, session)) exit(0)