#!/usr/bin/env python3 import time import requests import logging import json from result import Result, Ok, Err, is_err from typing import List from websockets.asyncio.client import connect import websockets import asyncio from apitypes import * from models import * from pydantic import BaseModel from sqlmodel import Session, SQLModel, create_engine, select class Config(BaseModel): apiurl: str number: str def signal_timestamp_to_datetime(timestamp: str) -> datetime.datetime: return datetime.datetime.fromtimestamp(int(timestamp)/1000) def now(): return datetime.datetime.now() T = TypeVar("T") class Changes[T](BaseModel): to_add: List[T] = [] to_remove: List[T] = [] def necessary_changes(current: List[T], desired: List[T]) -> Changes[T]: changes = Changes[T]() for item in desired: if item not in current: changes.to_add.append(item) for item in current: if item not in desired: changes.to_remove.append(item) return changes def format_seconds(seconds: float) -> str: seconds = int(seconds) if seconds < 60: return f"{seconds} seconds" elif seconds < 3600: return f"{seconds//60} minutes" elif seconds < 86400: return f"{seconds//3600} hours" else: return f"{seconds//86400} days" class SignalAPI: def __init__(self, apiurl, number): self.apiurl = apiurl self.number = number def set_username(self, username) -> Result[UsernameSetResponse, str]: # post request to set username # /v1/accounts/{number}/username # { # "username": "test" # } r = requests.post(f"{self.apiurl}/v1/accounts/{self.number}/username", json={"username": username}) if r.status_code == 201: return parse_response(UsernameSetResponse, r.text) else: return Err("Failed to set username.") def get_groups(self) -> Result[List[GroupEntry], str]: # get request to get groups # /v1/groups r = requests.get(f"{self.apiurl}/v1/groups/{self.number}") if r.status_code == 200: return parse_response(List[GroupEntry], r.text) else: return Err("Failed to get groups.") def get_group(self, group_id: str) -> Result[GroupEntry, str]: # get request to get group # /v1/groups/{group_id} r = requests.get(f"{self.apiurl}/v1/groups/{self.number}/{group_id}") if r.status_code == 200: return parse_response(GroupEntry, r.text) else: return Err("Failed to get group.") def create_group(self, group_request: CreateGroupRequest) -> Result[CreateGroupResponse, str]: r = requests.post(f"{self.apiurl}/v1/groups/{self.number}", json=group_request.model_dump()) if r.status_code == 201: return parse_response(CreateGroupResponse, r.text) else: return Err("Failed to create group.") def add_group_members(self, group_id: str, numbers_to_add: List[str]) -> Result[None, str]: # put request to add group members # /v1/groups/{group_id}/members # { # "members": ["+49123456789", "+49123456780"] # } if len(numbers_to_add) == 0: return Ok(None) r = requests.post(f"{self.apiurl}/v1/groups/{self.number}/{group_id}/members", json={"members": numbers_to_add}) if r.status_code == 204: return Ok(None) else: return Err("Failed to add group members.") def remove_group_members(self, group_id: str, numbers_to_remove: List[str]) -> Result[None, str]: # delete request to remove group members # /v1/groups/{group_id}/members # { # "members": ["+49123456789", "+49123456780"] # } if len(numbers_to_remove) == 0: return Ok(None) r = requests.delete(f"{self.apiurl}/v1/groups/{self.number}/{group_id}/members", json={"members": numbers_to_remove}) if r.status_code == 204: return Ok(None) else: return Err("Failed to remove group members.") def update_group_members(self, group_id: str, other_members: List[str], remove: bool = True) -> Result[UpdateGroupResult, str]: # Add and remove members from the group, such that the only remaining # members of the group are this bot and users specified by other_members. group = self.get_group(group_id) if is_err(group): return Err(group.unwrap_err()) current_members = group.unwrap().members members_should = [self.number] + other_members member_changes = necessary_changes(current_members, members_should) add_result = self.add_group_members(group_id, member_changes.to_add) if add_result.is_err(): return Err(add_result.unwrap_err()) if remove: remove_result = self.remove_group_members(group_id, member_changes.to_remove) if remove_result.is_err(): return Err(remove_result.unwrap_err()) return Ok(member_changes) def add_group_admins(self, group_id: str, admins: List[str]) -> Result[None, str]: if len(admins) == 0: return Ok(None) r = requests.post(f"{self.apiurl}/v1/groups/{self.number}/{group_id}/admins", json={"admins": admins}) if r.status_code == 204: return Ok(None) else: return Err(r.json().error) def remove_group_admins(self, group_id: str, admins: List[str]) -> Result[None, str]: r = requests.delete(f"{self.apiurl}/v1/groups/{self.number}/{group_id}/admins", json={"admins": admins}) if r.status_code == 204: return Ok(None) else: return Err(r.json().error) def update_group_admins(self, group_id: str, other_admins: List[str], remove: bool = True) -> Result[Changes[str], str]: # Add and remove admins from the group, such that the only remaining # admins of the group are this bot and users specified by other_admins. group = self.get_group(group_id) if is_err(group): return Err(group.unwrap_err()) admins_should = [self.number] + other_admins current_admins = group.unwrap().admins admin_changes = necessary_changes(current_admins, admins_should) add_result = self.add_group_admins(group_id, admin_changes.to_add) if add_result.is_err(): return Err(add_result.unwrap_err()) if remove: remove_result = self.remove_group_admins(group_id, admin_changes.to_remove) if remove_result.is_err(): return Err(remove_result.unwrap_err()) return Ok(admin_changes) def get_identities(self) -> Result[List[IdentityEntry], str]: r = requests.get(f"{self.apiurl}/v1/identities/{self.number}") if r.status_code == 200: return parse_response(List[IdentityEntry], r.text) else: return Err("Failed to get identities.") # /v1/identities/{number}/trust/{numberToTrust} def trust_identity(self, number_to_trust: str, trust_identity_request: TrustIdentityRequest) -> Result[None, str]: r = requests.put(f"{self.apiurl}/v1/identities/{self.number}/trust/{number_to_trust}", trust_identity_request.model_dump_json()) if r.status_code == 204: return Ok(None) else: return Err("Failed to trust identity.") async def websocket_connect_receive(self): ws_url = self.apiurl.replace("http", "ws") async for websocket in websockets.connect(f"{ws_url}/v1/receive/{self.number}"): yield websocket async def receive_message(self, websocket) -> Result[Message, str]: return parse_response(Message, await websocket.recv()) def send_message(self, message: SendMessageSimple) -> Result[SendMessageResponse, str]: data = message.model_dump() data['number'] = self.number r = requests.post(f"{self.apiurl}/v2/send", json=data) if r.status_code == 201: return parse_response(SendMessageResponse, r.text) else: return Err("Failed to send message") class LabCleaningBot: api: SignalAPI base_group: str def __init__(self, api, base_group): self.api = api self.base_group = base_group def get_other_members(self) -> Result[List[str], str]: # Get all members of the base group except the bot group = self.api.get_group(self.base_group) if group.is_err(): return Err(group.unwrap_err()) members = group.unwrap().members other_members = [] for member in members: if member != self.api.number: other_members.append(member) return Ok(other_members) def sync_members_as_active_users(self, session) -> Result[None, str]: # Get all members of the base group and set them as active users # if they are not already in the database maybe_members = self.get_other_members() if is_err(maybe_members): return Err(maybe_members.unwrap_err()) members = maybe_members.unwrap() for member in members: maybe_user = get_user_by_name(session, member, only_active=False) if is_err(maybe_user): user = User(name=member) session.add(user) else: user = maybe_user.unwrap() user.active = True for user in session.exec(select(User).where(User.active)): if user.name not in members: user.active = False session.commit() return Ok(None) def send_to_base_group(self, message: str) -> Result[None, str]: # Send a message to the base group message = SendMessageSimple(message=message, recipients=[self.base_group]) return self.api.send_message(message) async def receiver(self, session: Session): # Async routine that receives messages and calles message_received() for # each message async for websocket in self.api.websocket_connect_receive(): try: while True: message_result = await self.api.receive_message(websocket) if is_err(message_result): logging.debug(message_result.unwrap_err()) continue else: message = message_result.unwrap() self.message_received(message, session) except websockets.exceptions.ConnectionClosed: print("Websockets connection closed. Reestablishing connection.") def message_received(self, message: Message, session: Session): # This method is called by the async receiver for each message envelope = message.envelope match envelope: # Normal direct message (no edits, no reactions) case EnvelopeData(dataMessage=DataMessage(message=message, groupInfo=None), sourceNumber=sourceNumber): print(message, "direct", sourceNumber) # Normal group message (no edits, no reactions) case EnvelopeData(dataMessage=DataMessage(message=message, groupInfo=GroupInfo(groupId=group_id)), sourceNumber=sourceNumber): print(message, group_id) # Reaction in direct messages case EnvelopeData( dataMessage=ReactionMessage( reaction=Reaction( emoji=emoji, isRemove=isRemove, targetSentTimestamp=targetSentTimestamp ), groupInfo=None, timestamp=timestamp ), sourceNumber=sourceNumber): reactionTimestamp = signal_timestamp_to_datetime(timestamp) requestTimestamp = signal_timestamp_to_datetime(targetSentTimestamp) maybe_request = get_participation_request_by_timestamp(session, requestTimestamp) if is_err(maybe_request): print("No participation request found for timestamp %s." % requestTimestamp) return request = maybe_request.unwrap() response_msg = "" if emoji == "👍" and not isRemove: accept_result = request.try_accept(now=reactionTimestamp) match accept_result: case Ok(AcceptInTime()): response_msg = "You accepted the request." case Ok(AcceptAfterRejectAllowed()): response_msg = "You accepted the request after rejecting it." case Err(AcceptAfterRejectExpired()): response_msg = "You cannot accept the request after rejecting it after 5 minutes." case Err(AlreadyAccepted()): response_msg = "You already accepted the request." case Err(AcceptAfterTimeout()): response_msg = "You cannot accept the request after the timeout." else: reject_result = request.try_reject(now=reactionTimestamp) match reject_result: case Ok(RejectInTime()): response_msg = "You rejected the request." case Ok(RejectAfterAccept()): response_msg = "You rejected the request after accepting it." case Err(AlreadyRejected()): response_msg = "You already rejected the request." case Err(RejectAfterTimeout()): response_msg = "You cannot reject the request after the timeout." self.api.send_message( SendMessageSimple(message=response_msg, recipients=[sourceNumber])) print(emoji, "direct", sourceNumber, isRemove) async def sync_members_and_tasks(self, session: Session): # Async routine that syncs active members using sync_members_as_active_users(), # sends out new requests for tasks to active users and handles timeouts for the # requests. unfulfillable_tasks = [] while True: sync_result = self.sync_members_as_active_users(session) self.api.update_group_members(self.base_group, ["+4915773232355"]) self.api.update_group_admins(self.base_group, ["+4915773232355"], remove=False) if is_err(sync_result): print(sync_result.unwrap_err()) for task in get_active_tasks(session, now()): reqs = task.create_additional_requests(now(), session) if is_err(reqs): if task not in unfulfillable_tasks: print("Could not fulfill task: " + task.name) unfulfillable_tasks.append(task) reqs = reqs.unwrap_err() else: reqs = reqs.unwrap() for request in reqs: seconds_to_due = (task.due - request.requested_at).total_seconds() text = f"""Hi! You have been requested to participate in the task: {task.name} (starts in {format_seconds(seconds_to_due)}). To accept the request, react with 👍. To reject, react with any other emoji. You have time to answer for {format_seconds(task.timeout)}.""" message = SendMessageSimple(message=text, recipients=[request.user.name]) res = self.api.send_message(message) if is_ok(res): timestamp = signal_timestamp_to_datetime(res.unwrap().timestamp) request.requested_at = timestamp else: print(res.unwrap_err()) # Check for timeouts (for all tasks, also those that are timeouted) for task in get_tasks(session): for request in task.freshly_expired_requests(now()): print("Request expired:", repr(request)) message = SendMessageSimple( message="You did not respond to the task request in time.", recipients=[request.user.name]) res = self.api.send_message(message) session.commit() await asyncio.sleep(1) async def main(config: Config, session: Session): api = SignalAPI(config.apiurl, config.number) bot = LabCleaningBot(api, "group.bm5KT3NJUW5FdkpRNnR2ZGRFa01oOVZBeUYrVkdnd3NNTzFpNWdsR2pwUT0=") await asyncio.gather( bot.receiver(session), bot.sync_members_and_tasks(session) ) if __name__ == "__main__": with open("config.json", "r") as f: config = Config.model_validate(json.load(f)) engine = create_engine("sqlite:///data.db") SQLModel.metadata.create_all(engine) with Session(engine) as session: asyncio.run(main(config, session)) exit(0)