#!/usr/bin/env python3 import time import requests import json from result import Result, Ok, Err, is_err from typing import List from websockets.asyncio.client import connect import websockets import asyncio from apitypes import * from models import * from pydantic import BaseModel from sqlmodel import Session, SQLModel, create_engine, select class Config(BaseModel): apiurl: str number: str class SignalAPI: def __init__(self, apiurl, number): self.apiurl = apiurl self.number = number def set_username(self, username) -> Result[UsernameSetResponse, str]: # post request to set username # /v1/accounts/{number}/username # { # "username": "test" # } r = requests.post(f"{self.apiurl}/v1/accounts/{self.number}/username", json={"username": username}) if r.status_code == 201: return parse_response(UsernameSetResponse, r.text) else: print("Failed to set username") print(r.json()) def get_groups(self) -> Result[List[GroupEntry], str]: # get request to get groups # /v1/groups r = requests.get(f"{self.apiurl}/v1/groups/{self.number}") if r.status_code == 200: return parse_response(List[GroupEntry], r.text) else: print("Failed to get groups") print(r.json()) def get_group(self, group_id: str) -> Result[GroupEntry, str]: # get request to get group # /v1/groups/{group_id} r = requests.get(f"{self.apiurl}/v1/groups/{self.number}/{group_id}") if r.status_code == 200: return parse_response(GroupEntry, r.text) else: print("Failed to get group") print(r.json()) def create_group(self, group_request: CreateGroupRequest) -> Result[CreateGroupResponse, str]: r = requests.post(f"{self.apiurl}/v1/groups/{self.number}", json=group_request.model_dump()) if r.status_code == 201: return parse_response(CreateGroupResponse, r.text) else: print("Failed to create group") print(r.json()) def add_group_members(self, group_id: str, numbers_to_add: List[str]) -> Result[None, str]: # put request to add group members # /v1/groups/{group_id}/members # { # "members": ["+49123456789", "+49123456780"] # } r = requests.post(f"{self.apiurl}/v1/groups/{self.number}/{group_id}/members", json={"members": numbers_to_add}) if r.status_code == 204: return Ok(None) else: print("Failed to add group members") print(r.json()) def remove_group_members(self, group_id: str, numbers_to_remove: List[str]) -> Result[None, str]: # delete request to remove group members # /v1/groups/{group_id}/members # { # "members": ["+49123456789", "+49123456780"] # } r = requests.delete(f"{self.apiurl}/v1/groups/{self.number}/{group_id}/members", json={"members": numbers_to_remove}) if r.status_code == 204: return Ok(None) else: print("Failed to remove group members") print(r.json()) def update_group_members(self, group_id: str, other_members: List[str]) -> Result[UpdateGroupResult, str]: group = self.get_group(group_id) if is_err(group): return Err(group.unwrap_err()) current_members = group.unwrap().members members_to_add = [] members_to_remove = [] for member in other_members: if member not in current_members: members_to_add.append(member) for member in current_members: if member not in other_members: members_to_remove.append(member) if len(members_to_add) > 0: add_result = self.add_group_members(group_id, members_to_add) if add_result.is_err(): return Err(add_result.unwrap_err()) if len(members_to_remove) > 0: remove_result = self.remove_group_members(group_id, members_to_remove) if remove_result.is_err(): return Err(remove_result.unwrap_err()) return Ok(UpdateGroupResult(members_added=members_to_add, members_removed=members_to_remove)) def get_identities(self) -> Result[List[IdentityEntry], str]: r = requests.get(f"{self.apiurl}/v1/identities/{self.number}") if r.status_code == 200: return parse_response(List[IdentityEntry], r.text) else: print("Failed to get identities") print(r.json()) # /v1/identities/{number}/trust/{numberToTrust} def trust_identity(self, number_to_trust: str, trust_identity_request: TrustIdentityRequest) -> Result[None, str]: r = requests.put(f"{self.apiurl}/v1/identities/{self.number}/trust/{number_to_trust}", trust_identity_request.model_dump_json()) if r.status_code == 204: return Ok(None) else: print("Failed to trust identity") print(r.text) async def receive_messages(self): ws_apiurl = self.apiurl.replace("http", "ws") async for websocket in connect(f"{ws_apiurl}/v1/receive/{config.number}"): try: print(parse_response(Message, await websocket.recv())) #print(await websocket.recv()) except websockets.exceptions.ConnectionClosed: print("Websockets connection closed. Reestablishing connection.") class LabCleaningBot: def __init__(self, api, base_group): self.api = api self.base_group = base_group def get_other_members(self) -> Result[List[str], str]: group = self.api.get_group(self.base_group) if group.is_err(): return Err(group.unwrap_err()) members = group.unwrap().members other_members = [] for member in members: if member != self.api.number: other_members.append(member) return Ok(other_members) def sync_members_as_active_users(self, session) -> Result[None, str]: group = self.api.get_group(self.base_group) if group.is_err(): return Err(group.unwrap_err()) members = group.unwrap().members for member in members: maybe_user = get_user_by_name(session, member, only_active=False) if is_err(maybe_user): user = User(name=member) session.add(user) else: user = maybe_user.unwrap() user.active = True for user in session.exec(select(User).where(User.active)): if user.name not in members: user.active = False session.commit() if __name__ == "__main__": with open("config.json", "r") as f: config = Config.model_validate(json.load(f)) api = SignalAPI(config.apiurl, config.number) bot = LabCleaningBot(api, "group.bm5KT3NJUW5FdkpRNnR2ZGRFa01oOVZBeUYrVkdnd3NNTzFpNWdsR2pwUT0=") engine = create_engine("sqlite:///data.db") SQLModel.metadata.create_all(engine) with Session(engine) as session: while True: sync_result = bot.sync_members_as_active_users(session) if is_err(sync_result): print(sync_result.unwrap_err()) exit(0) res = bot.api.update_group_members("group.TTlKelhpUW1sUVJSU2Z2NDJpdjVWcllMTW93MTBNN2tseEtGaFkzQ1VsZz0=", groupinfo.members) if is_err(res): print(res.unwrap_err()) continue print(res.unwrap())