491 lines
18 KiB
Python
491 lines
18 KiB
Python
#!/usr/bin/env python3
|
|
|
|
import time
|
|
import requests
|
|
import logging
|
|
import json
|
|
from result import Result, Ok, Err, is_err
|
|
from typing import List
|
|
from websockets.asyncio.client import connect
|
|
import websockets
|
|
import asyncio
|
|
from apitypes import *
|
|
from models import *
|
|
from pydantic import BaseModel
|
|
from sqlmodel import Session, SQLModel, create_engine, select
|
|
|
|
class Config(BaseModel):
    """Settings the bot is started with (validated from ``config.json``)."""

    # Base URL of the HTTP API; SignalAPI appends paths such as "/v1/groups/..." to it.
    apiurl: str

    # Number of the account the bot acts as; used as a path segment in API calls.
    number: str
|
|
|
|
def signal_timestamp_to_datetime(timestamp: str) -> datetime.datetime:
    """Convert a millisecond Unix timestamp into a ``datetime.datetime``.

    The value is parsed with ``int()``, scaled from milliseconds to seconds
    and handed to ``datetime.datetime.fromtimestamp``, so the result is a
    naive datetime expressed in local time.
    """
    epoch_seconds = int(timestamp) / 1000
    return datetime.datetime.fromtimestamp(epoch_seconds)
|
|
|
|
def now():
    """Return the current local time as a naive ``datetime.datetime``."""
    current = datetime.datetime.now()
    return current
|
|
|
|
# Type variable for the element type in the signature of necessary_changes().
T = TypeVar("T")
|
|
|
|
class Changes[T](BaseModel):
|
|
to_add: List[T] = []
|
|
to_remove: List[T] = []
|
|
|
|
def necessary_changes(current: List[T], desired: List[T]) -> Changes[T]:
    """Work out how to turn ``current`` into ``desired``.

    Returns a ``Changes`` whose ``to_add`` holds every entry of ``desired``
    that is not in ``current`` and whose ``to_remove`` holds every entry of
    ``current`` that is not in ``desired``. Order follows the input lists and
    duplicates are kept.
    """
    diff = Changes[T]()
    diff.to_add.extend(entry for entry in desired if entry not in current)
    diff.to_remove.extend(entry for entry in current if entry not in desired)
    return diff
|
|
|
|
def format_seconds(seconds: float) -> str:
    """Render a duration as a short human-readable string.

    The duration is truncated to whole seconds and expressed in the largest
    unit (days, hours, minutes, seconds) that fits at least once; the count
    is floored, so 119 seconds becomes "1 minute".

    Args:
        seconds: Duration in seconds.

    Returns:
        Text such as "45 seconds", "1 minute" or "3 days". The unit is
        singular when the count is exactly 1.
    """
    whole = int(seconds)

    # Largest unit first; the for/else falls back to plain seconds.
    for unit_length, unit in ((86400, "day"), (3600, "hour"), (60, "minute")):
        if whole >= unit_length:
            count = whole // unit_length
            break
    else:
        count, unit = whole, "second"

    plural = "" if count == 1 else "s"
    return f"{count} {unit}{plural}"
|
|
|
|
class SignalAPI:
    """Client for the Signal HTTP/WebSocket API, bound to one account number.

    HTTP methods use ``requests`` synchronously and report their outcome as a
    ``Result``: ``Ok`` when the response carries the expected status code,
    ``Err`` with a message otherwise. Network-level exceptions raised by
    ``requests`` are not caught here.
    """

    def __init__(self, apiurl, number):
        """Store the API base URL and the account number used in request paths."""
        self.apiurl = apiurl
        self.number = number

    @staticmethod
    def _error_from(response, fallback: str) -> str:
        """Return the ``error`` field of a failed response, or ``fallback``.

        Guards against bodies that are not JSON or lack the ``error`` key, so
        that building an ``Err`` never raises.
        """
        try:
            payload = response.json()
        except ValueError:
            return fallback

        if isinstance(payload, dict) and "error" in payload:
            return payload["error"]
        return fallback

    def set_username(self, username) -> Result[UsernameSetResponse, str]:
        """Set the account's username.

        POST /v1/accounts/{number}/username with body {"username": ...};
        a 201 response is parsed into ``UsernameSetResponse``.
        """
        r = requests.post(
            f"{self.apiurl}/v1/accounts/{self.number}/username",
            json={"username": username},
        )

        if r.status_code != 201:
            return Err("Failed to set username.")
        return parse_response(UsernameSetResponse, r.text)

    def get_groups(self) -> Result[List[GroupEntry], str]:
        """List the account's groups (GET /v1/groups/{number})."""
        r = requests.get(f"{self.apiurl}/v1/groups/{self.number}")

        if r.status_code != 200:
            return Err("Failed to get groups.")
        return parse_response(List[GroupEntry], r.text)

    def get_group(self, group_id: str) -> Result[GroupEntry, str]:
        """Fetch one group (GET /v1/groups/{number}/{group_id})."""
        r = requests.get(f"{self.apiurl}/v1/groups/{self.number}/{group_id}")

        if r.status_code != 200:
            return Err("Failed to get group.")
        return parse_response(GroupEntry, r.text)

    def create_group(self, group_request: CreateGroupRequest) -> Result[CreateGroupResponse, str]:
        """Create a group (POST /v1/groups/{number}) from the dumped request model."""
        r = requests.post(
            f"{self.apiurl}/v1/groups/{self.number}",
            json=group_request.model_dump(),
        )

        if r.status_code != 201:
            return Err("Failed to create group.")
        return parse_response(CreateGroupResponse, r.text)

    def add_group_members(self, group_id: str, numbers_to_add: List[str]) -> Result[None, str]:
        """Add members to a group.

        POST /v1/groups/{number}/{group_id}/members with body
        {"members": [...]}. An empty list is a no-op and no request is sent.
        """
        if not numbers_to_add:
            return Ok(None)

        r = requests.post(
            f"{self.apiurl}/v1/groups/{self.number}/{group_id}/members",
            json={"members": numbers_to_add},
        )

        if r.status_code != 204:
            return Err("Failed to add group members.")
        return Ok(None)

    def remove_group_members(self, group_id: str, numbers_to_remove: List[str]) -> Result[None, str]:
        """Remove members from a group.

        DELETE /v1/groups/{number}/{group_id}/members with body
        {"members": [...]}. An empty list is a no-op and no request is sent.
        """
        if not numbers_to_remove:
            return Ok(None)

        r = requests.delete(
            f"{self.apiurl}/v1/groups/{self.number}/{group_id}/members",
            json={"members": numbers_to_remove},
        )

        if r.status_code != 204:
            return Err("Failed to remove group members.")
        return Ok(None)

    def update_group_members(self, group_id: str, other_members: List[str], remove: bool = True) -> Result[Changes[str], str]:
        """Make the group consist of this account plus ``other_members``.

        Missing members are always added; surplus members are only removed
        when ``remove`` is true. Returns the computed ``Changes`` (including
        ``to_remove`` even when ``remove`` is false), or the first error hit.
        """
        group = self.get_group(group_id)
        if is_err(group):
            return Err(group.unwrap_err())

        current_members = group.unwrap().members
        members_should = [self.number] + other_members
        member_changes = necessary_changes(current_members, members_should)

        add_result = self.add_group_members(group_id, member_changes.to_add)
        if add_result.is_err():
            return Err(add_result.unwrap_err())

        if remove:
            remove_result = self.remove_group_members(group_id, member_changes.to_remove)
            if remove_result.is_err():
                return Err(remove_result.unwrap_err())

        return Ok(member_changes)

    def add_group_admins(self, group_id: str, admins: List[str]) -> Result[None, str]:
        """Promote members to admins.

        POST /v1/groups/{number}/{group_id}/admins with body {"admins": [...]}.
        An empty list is a no-op and no request is sent.
        """
        if not admins:
            return Ok(None)

        r = requests.post(
            f"{self.apiurl}/v1/groups/{self.number}/{group_id}/admins",
            json={"admins": admins},
        )

        if r.status_code != 204:
            return Err(self._error_from(r, "Failed to add group admins."))
        return Ok(None)

    def remove_group_admins(self, group_id: str, admins: List[str]) -> Result[None, str]:
        """Demote admins.

        DELETE /v1/groups/{number}/{group_id}/admins with body {"admins": [...]}.
        An empty list is a no-op, matching the other add/remove helpers, so
        update_group_admins does not issue a request when nothing changes.
        """
        if not admins:
            return Ok(None)

        r = requests.delete(
            f"{self.apiurl}/v1/groups/{self.number}/{group_id}/admins",
            json={"admins": admins},
        )

        if r.status_code != 204:
            return Err(self._error_from(r, "Failed to remove group admins."))
        return Ok(None)

    def update_group_admins(self, group_id: str, other_admins: List[str], remove: bool = True) -> Result[Changes[str], str]:
        """Make the group's admins this account plus ``other_admins``.

        Missing admins are always added; surplus admins are only removed when
        ``remove`` is true. Returns the computed ``Changes`` or the first
        error hit.
        """
        group = self.get_group(group_id)
        if is_err(group):
            return Err(group.unwrap_err())

        admins_should = [self.number] + other_admins
        current_admins = group.unwrap().admins
        admin_changes = necessary_changes(current_admins, admins_should)

        add_result = self.add_group_admins(group_id, admin_changes.to_add)
        if add_result.is_err():
            return Err(add_result.unwrap_err())

        if remove:
            remove_result = self.remove_group_admins(group_id, admin_changes.to_remove)
            if remove_result.is_err():
                return Err(remove_result.unwrap_err())

        return Ok(admin_changes)

    def get_identities(self) -> Result[List[IdentityEntry], str]:
        """List known identities (GET /v1/identities/{number})."""
        r = requests.get(f"{self.apiurl}/v1/identities/{self.number}")

        if r.status_code != 200:
            return Err("Failed to get identities.")
        return parse_response(List[IdentityEntry], r.text)

    def trust_identity(self, number_to_trust: str, trust_identity_request: TrustIdentityRequest) -> Result[None, str]:
        """Trust an identity (PUT /v1/identities/{number}/trust/{numberToTrust}).

        The request model is sent as its JSON dump; the Content-Type header
        is set explicitly because ``data=`` does not add one for strings.
        """
        r = requests.put(
            f"{self.apiurl}/v1/identities/{self.number}/trust/{number_to_trust}",
            data=trust_identity_request.model_dump_json(),
            headers={"Content-Type": "application/json"},
        )

        if r.status_code != 204:
            return Err("Failed to trust identity.")
        return Ok(None)

    async def websocket_connect_receive(self):
        """Yield websocket connections to /v1/receive/{number}.

        Iterates over ``websockets.connect(...)``, yielding each connection it
        produces. Only the first "http" of the base URL (the scheme) is
        swapped for "ws", so http:// becomes ws:// and https:// becomes wss://
        without touching the host or path.
        """
        ws_url = self.apiurl.replace("http", "ws", 1)
        async for websocket in websockets.connect(f"{ws_url}/v1/receive/{self.number}"):
            yield websocket

    async def receive_message(self, websocket) -> Result[Message, str]:
        """Wait for the next websocket frame and parse it into a ``Message``."""
        raw = await websocket.recv()
        return parse_response(Message, raw)

    def send_message(self, message: SendMessageSimple) -> Result[SendMessageResponse, str]:
        """Send a message (POST /v2/send) with this account as the sender.

        The dumped model is extended with ``number`` set to the bound account;
        a 201 response is parsed into ``SendMessageResponse``.
        """
        data = message.model_dump()
        data['number'] = self.number
        r = requests.post(f"{self.apiurl}/v2/send", json=data)

        if r.status_code != 201:
            return Err(self._error_from(r, "Failed to send message."))
        return parse_response(SendMessageResponse, r.text)
|
|
|
|
|
|
class LabCleaningBot:
|
|
api: SignalAPI
|
|
base_group: str
|
|
|
|
def __init__(self, api, base_group):
|
|
self.api = api
|
|
self.base_group = base_group
|
|
|
|
def get_other_members(self) -> Result[List[str], str]:
|
|
# Get all members of the base group except the bot
|
|
group = self.api.get_group(self.base_group)
|
|
if group.is_err():
|
|
return Err(group.unwrap_err())
|
|
|
|
members = group.unwrap().members
|
|
other_members = []
|
|
|
|
for member in members:
|
|
if member != self.api.number:
|
|
other_members.append(member)
|
|
|
|
return Ok(other_members)
|
|
|
|
def sync_members_as_active_users(self, session) -> Result[None, str]:
|
|
# Get all members of the base group and set them as active users
|
|
# if they are not already in the database
|
|
maybe_members = self.get_other_members()
|
|
|
|
if is_err(maybe_members):
|
|
return Err(maybe_members.unwrap_err())
|
|
|
|
members = maybe_members.unwrap()
|
|
|
|
for member in members:
|
|
maybe_user = get_user_by_name(session, member, only_active=False)
|
|
|
|
if is_err(maybe_user):
|
|
user = User(name=member)
|
|
session.add(user)
|
|
else:
|
|
user = maybe_user.unwrap()
|
|
user.active = True
|
|
|
|
for user in session.exec(select(User).where(User.active)):
|
|
if user.name not in members:
|
|
user.active = False
|
|
|
|
session.commit()
|
|
return Ok(None)
|
|
|
|
def send_to_base_group(self, message: str) -> Result[None, str]:
|
|
# Send a message to the base group
|
|
message = SendMessageSimple(message=message, recipients=[self.base_group])
|
|
return self.api.send_message(message)
|
|
|
|
async def receiver(self, session: Session):
|
|
# Async routine that receives messages and calles message_received() for
|
|
# each message
|
|
async for websocket in self.api.websocket_connect_receive():
|
|
try:
|
|
while True:
|
|
message_result = await self.api.receive_message(websocket)
|
|
|
|
if is_err(message_result):
|
|
logging.debug(message_result.unwrap_err())
|
|
continue
|
|
else:
|
|
message = message_result.unwrap()
|
|
|
|
self.message_received(message, session)
|
|
|
|
except websockets.exceptions.ConnectionClosed:
|
|
print("Websockets connection closed. Reestablishing connection.")
|
|
|
|
def message_received(self, message: Message, session: Session):
|
|
# This method is called by the async receiver for each message
|
|
envelope = message.envelope
|
|
|
|
match envelope:
|
|
# Normal direct message (no edits, no reactions)
|
|
case EnvelopeData(dataMessage=DataMessage(message=message, groupInfo=None), sourceNumber=sourceNumber):
|
|
print(message, "direct", sourceNumber)
|
|
# Normal group message (no edits, no reactions)
|
|
case EnvelopeData(dataMessage=DataMessage(message=message, groupInfo=GroupInfo(groupId=group_id)), sourceNumber=sourceNumber):
|
|
print(message, group_id)
|
|
# Reaction in direct messages
|
|
case EnvelopeData(
|
|
dataMessage=ReactionMessage(
|
|
reaction=Reaction(
|
|
emoji=emoji,
|
|
isRemove=isRemove,
|
|
targetSentTimestamp=targetSentTimestamp
|
|
),
|
|
groupInfo=None,
|
|
timestamp=timestamp
|
|
),
|
|
sourceNumber=sourceNumber):
|
|
|
|
reactionTimestamp = signal_timestamp_to_datetime(timestamp)
|
|
requestTimestamp = signal_timestamp_to_datetime(targetSentTimestamp)
|
|
|
|
maybe_request = get_participation_request_by_timestamp(session, requestTimestamp)
|
|
|
|
if is_err(maybe_request):
|
|
print("No participation request found for timestamp %s." % requestTimestamp)
|
|
return
|
|
|
|
request = maybe_request.unwrap()
|
|
|
|
response_msg = ""
|
|
|
|
if emoji == "👍" and not isRemove:
|
|
accept_result = request.try_accept(now=reactionTimestamp)
|
|
|
|
match accept_result:
|
|
case Ok(AcceptInTime()):
|
|
response_msg = "You accepted the request."
|
|
case Ok(AcceptAfterRejectAllowed()):
|
|
response_msg = "You accepted the request after rejecting it."
|
|
case Err(AcceptAfterRejectExpired()):
|
|
response_msg = "You cannot accept the request after rejecting it after 5 minutes."
|
|
case Err(AlreadyAccepted()):
|
|
response_msg = "You already accepted the request."
|
|
case Err(AcceptAfterTimeout()):
|
|
response_msg = "You cannot accept the request after the timeout."
|
|
else:
|
|
reject_result = request.try_reject(now=reactionTimestamp)
|
|
|
|
match reject_result:
|
|
case Ok(RejectInTime()):
|
|
response_msg = "You rejected the request."
|
|
case Ok(RejectAfterAccept()):
|
|
response_msg = "You rejected the request after accepting it."
|
|
case Err(AlreadyRejected()):
|
|
response_msg = "You already rejected the request."
|
|
case Err(RejectAfterTimeout()):
|
|
response_msg = "You cannot reject the request after the timeout."
|
|
|
|
self.api.send_message(
|
|
SendMessageSimple(message=response_msg, recipients=[sourceNumber]))
|
|
|
|
print(emoji, "direct", sourceNumber, isRemove)
|
|
|
|
def create_or_update_task_group(self, task: Task) -> Result[str, str]:
|
|
chatgroup = task.chatgroup
|
|
|
|
# Check if the group still exists
|
|
if chatgroup is not None:
|
|
info = self.api.get_group(chatgroup)
|
|
|
|
if is_err(info):
|
|
# group does not exist (or so...)
|
|
print(info.unwrap_err())
|
|
chatgroup = None
|
|
|
|
# Create a group for the task if it does not exist
|
|
if chatgroup is None:
|
|
create_result = self.api.create_group(CreateGroupRequest(name=task.name, members=[self.api.number]))
|
|
|
|
if is_err(create_result):
|
|
return Err(create_result.unwrap_err())
|
|
else:
|
|
chatgroup = create_result.unwrap().id
|
|
|
|
task.chatgroup = chatgroup
|
|
self.api.update_group_members(chatgroup, [r.user.name for r in task.accepted_requests()])
|
|
|
|
async def sync_members_and_tasks(self, session: Session):
|
|
# Async routine that syncs active members using sync_members_as_active_users(),
|
|
# sends out new requests for tasks to active users and handles timeouts for the
|
|
# requests.
|
|
unfulfillable_tasks = []
|
|
|
|
while True:
|
|
sync_result = self.sync_members_as_active_users(session)
|
|
|
|
self.api.update_group_members(self.base_group, ["+4915773232355"])
|
|
self.api.update_group_admins(self.base_group, ["+4915773232355"], remove=False)
|
|
|
|
if is_err(sync_result):
|
|
print(sync_result.unwrap_err())
|
|
|
|
for task in get_active_tasks(session, now()):
|
|
reqs = task.create_additional_requests(now(), session)
|
|
|
|
if is_err(reqs):
|
|
if task not in unfulfillable_tasks:
|
|
print("Could not fulfill task: " + task.name)
|
|
|
|
unfulfillable_tasks.append(task)
|
|
|
|
reqs = reqs.unwrap_err()
|
|
else:
|
|
reqs = reqs.unwrap()
|
|
|
|
for request in reqs:
|
|
seconds_to_due = (task.due - request.requested_at).total_seconds()
|
|
text = f"""Hi!
|
|
|
|
You have been requested to participate in the task: {task.name} (starts in {format_seconds(seconds_to_due)}).
|
|
|
|
To accept the request, react with 👍. To reject, react with any other emoji.
|
|
|
|
You have time to answer for {format_seconds(task.timeout)}."""
|
|
|
|
message = SendMessageSimple(message=text, recipients=[request.user.name])
|
|
res = self.api.send_message(message)
|
|
|
|
if is_ok(res):
|
|
timestamp = signal_timestamp_to_datetime(res.unwrap().timestamp)
|
|
request.requested_at = timestamp
|
|
else:
|
|
print(res.unwrap_err())
|
|
|
|
# Check for timeouts (for all tasks, also those that are timeouted)
|
|
for task in get_tasks(session):
|
|
for request in task.freshly_expired_requests(now()):
|
|
print("Request expired:", repr(request))
|
|
message = SendMessageSimple(
|
|
message="You did not respond to the task request in time.",
|
|
recipients=[request.user.name])
|
|
res = self.api.send_message(message)
|
|
|
|
self.create_or_update_task_group(task)
|
|
|
|
session.commit()
|
|
|
|
await asyncio.sleep(1)
|
|
|
|
|
|
async def main(config: Config, session: Session):
    """Build the API client and the bot, then run both bot loops concurrently.

    The message receiver and the member/task sync loop are awaited together
    with ``asyncio.gather``.
    """
    signal_api = SignalAPI(config.apiurl, config.number)
    cleaning_bot = LabCleaningBot(
        signal_api,
        "group.bm5KT3NJUW5FdkpRNnR2ZGRFa01oOVZBeUYrVkdnd3NNTzFpNWdsR2pwUT0=",
    )

    receive_loop = cleaning_bot.receiver(session)
    sync_loop = cleaning_bot.sync_members_and_tasks(session)
    await asyncio.gather(receive_loop, sync_loop)
|
|
|
|
if __name__ == "__main__":
    # Script entry point: load the configuration, make sure the SQLite schema
    # exists, then run the bot with a single database session.

    # JSON files are UTF-8; do not depend on the platform's default encoding.
    with open("config.json", "r", encoding="utf-8") as f:
        config = Config.model_validate(json.load(f))

    engine = create_engine("sqlite:///data.db")
    SQLModel.metadata.create_all(engine)

    with Session(engine) as session:
        asyncio.run(main(config, session))

    # The builtin exit() is an interactive helper installed by the site
    # module; raising SystemExit works in every interpreter configuration.
    raise SystemExit(0)
|