Add main.py

This commit is contained in:
lemoer 2024-12-19 10:36:56 +01:00
parent 32734c73e0
commit da2352642e
3 changed files with 159 additions and 1 deletions

3
.gitignore vendored
View File

@ -1,2 +1,3 @@
venv venv
__pycache__ __pycache__
*.json

4
config.json.example Normal file
View File

@ -0,0 +1,4 @@
{
"apiurl": "http://localhost:8080",
"number": "+4900000000001"
}

153
main.py Normal file
View File

@ -0,0 +1,153 @@
#!/usr/bin/env python3
import requests
import json
from result import Result, Ok, Err
from typing import List
from websockets.asyncio.client import connect
import websockets
import asyncio
from apitypes import *
from pydantic import BaseModel
class Config(BaseModel):
apiurl: str
number: str
class SignalAPI:
def __init__(self, apiurl, number):
self.apiurl = apiurl
self.number = number
def set_username(self, username) -> Result[UsernameSetResponse, str]:
# post request to set username
# /v1/accounts/{number}/username
# {
# "username": "test"
# }
r = requests.post(f"{self.apiurl}/v1/accounts/{self.number}/username", json={"username": username})
if r.status_code == 201:
return parse_response(UsernameSetResponse, r.text)
else:
print("Failed to set username")
print(r.json())
def get_groups(self) -> Result[List[GroupEntry], str]:
# get request to get groups
# /v1/groups
r = requests.get(f"{self.apiurl}/v1/groups/{self.number}")
if r.status_code == 200:
return parse_response(List[GroupEntry], r.text)
else:
print("Failed to get groups")
print(r.json())
def get_group(self, group_id: str) -> Result[GroupEntry, str]:
# get request to get group
# /v1/groups/{group_id}
r = requests.get(f"{self.apiurl}/v1/groups/{self.number}/{group_id}")
if r.status_code == 200:
return parse_response(GroupEntry, r.text)
else:
print("Failed to get group")
print(r.json())
def get_identities(self) -> Result[List[IdentityEntry], str]:
r = requests.get(f"{self.apiurl}/v1/identities/{self.number}")
if r.status_code == 200:
return parse_response(List[IdentityEntry], r.text)
else:
print("Failed to get identities")
print(r.json())
# /v1/identities/{number}/trust/{numberToTrust}
def trust_identity(self, number_to_trust: str, trust_identity_request: TrustIdentityRequest) -> Result[None, str]:
r = requests.put(f"{self.apiurl}/v1/identities/{self.number}/trust/{number_to_trust}", trust_identity_request.model_dump_json())
if r.status_code == 204:
return Ok(None)
else:
print("Failed to trust identity")
print(r.text)
async def receive_messages(self):
ws_apiurl = self.apiurl.replace("http", "ws")
async for websocket in connect(f"{ws_apiurl}/v1/receive/{config.number}"):
try:
print(parse_response(Message, await websocket.recv()))
#print(await websocket.recv())
except websockets.exceptions.ConnectionClosed:
print("Websockets connection closed. Reestablishing connection.")
class User(BaseModel):
number: str
name: str
pause_until: int | None = None
class UserList:
members: List[str]
class DataStore(BaseModel):
version: int = 1
user_list: UserList
#def test_user_list():
class LabCleaningBot:
def __init__(self, api, base_group):
self.api = api
self.base_group = base_group
def get_other_members(self) -> Result[List[str], str]:
group = self.api.get_group(self.base_group)
if group.is_err():
return Err(group.unwrap_err())
members = group.unwrap().members
other_members = []
for member in members:
if member != self.api.number:
other_members.append(member)
return Ok(other_members)
if __name__ == "__main__":
with open("config.json", "r") as f:
config = Config.model_validate(json.load(f))
api = SignalAPI(config.apiurl, config.number)
#print(api.set_username("leinelabbot"))
#groups = api.get_groups()
#print(groups)
#group = api.get_group("group.bm5KT3NJUW5FdkpRNnR2ZGRFa01oOVZBeUYrVkdnd3NNTzFpNWdsR2pwUT0=")
#print(group)
#identities = api.get_identities()
#print(identities)
#trust = api.trust_identity("+4915773232355", TrustAllKnownKeys())
#print(trust)
#receive_messages = api.receive_messages(1)
#print(receive_messages)
bot = LabCleaningBot(api, "group.bm5KT3NJUW5FdkpRNnR2ZGRFa01oOVZBeUYrVkdnd3NNTzFpNWdsR2pwUT0=")
print(bot.get_other_members())
#asyncio.run(api.receive_test())