Code refactoring

This commit is contained in:
mirai 2022-12-05 18:31:40 +03:00
parent d1dc5b56e6
commit f592e440fb
10 changed files with 211 additions and 89 deletions

10
.idea/cmd-chat.iml Normal file
View File

@ -0,0 +1,10 @@
<?xml version="1.0" encoding="UTF-8"?>
<module type="PYTHON_MODULE" version="4">
<component name="NewModuleRootManager">
<content url="file://$MODULE_DIR$">
<excludeFolder url="file://$MODULE_DIR$/venv" />
</content>
<orderEntry type="inheritedJdk" />
<orderEntry type="sourceFolder" forTests="false" />
</component>
</module>

View File

@ -0,0 +1,6 @@
<component name="InspectionProjectProfileManager">
<settings>
<option name="USE_PROJECT_PROFILE" value="false" />
<version value="1.0" />
</settings>
</component>

4
.idea/misc.xml Normal file
View File

@ -0,0 +1,4 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectRootManager" version="2" project-jdk-name="Python 3.10 (cmd-chat)" project-jdk-type="Python SDK" />
</project>

8
.idea/modules.xml Normal file
View File

@ -0,0 +1,8 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectModuleManager">
<modules>
<module fileurl="file://$PROJECT_DIR$/.idea/cmd-chat.iml" filepath="$PROJECT_DIR$/.idea/cmd-chat.iml" />
</modules>
</component>
</project>

6
.idea/vcs.xml Normal file
View File

@ -0,0 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="VcsDirectoryMappings">
<mapping directory="" vcs="Git" />
</component>
</project>

View File

@ -1,128 +1,106 @@
import os
import time
import rsa
import platform
import requests
import threading
import threading
from colorama import init
from colorama import Fore
from cryptography.fernet import Fernet
from websocket import create_connection
from core.crypto import RSAService
init()
OS = "Windows"
if "Linux" in str(platform.platform()):
OS = "Linux"
class Client:
class Client(RSAService):
def _key_gen(self) -> None:
(pubkey, privkey) = rsa.newkeys(512)
with open("private.pem", "wb") as f:
f.write(privkey.save_pkcs1())
with open("public.pem", "wb") as f:
f.write(pubkey.save_pkcs1())
def __init__(self):
def __init__(self, server: str, port: int, username: str):
super().__init__()
# Server info
self.server = input("server ip: \n")
self.port = input("server port: \n")
self.username = input("your username: \n")
self.server = server
self.port = port
self.username = username
# Urls
self.base_url = f"http://{self.server}:{self.port}"
self.talk_url = f"{self.base_url}/talk"
self.info_url = f"{self.base_url}/update"
self.key_url = f"{self.base_url}/get_key"
self.ws_url = f"ws://{self.server}:{self.port}"
# Keys
self.pubkey = None
self.privkey = None
self.symetric_key = None
self.fernet = None
def __get_os(self) -> str:
if "Linux" in str(platform.platform()):
return "Linux"
return "Windows"
def send_info(self):
ws = create_connection(f"{self.ws_url}/talk")
while True:
user_input = input("You're message: ")
message = f'{self.username}: {user_input}'
socket_message = str({
"text": self.fernet.encrypt(message.encode()),
"username": self.username
})
ws.send(payload=socket_message.encode())
try:
user_input = input("You're message: ")
message = f'{self.username}: {user_input}'
socket_message = str({
"text": self._encrypt(message),
"username": self.username
})
ws.send(payload=socket_message.encode())
except KeyboardInterrupt:
ws.close()
quit()
except Exception as exc:
ws.close()
print("Something went wrong! ", exc)
quit()
def print_message(self, message: str) -> str:
message = message.split(":")
if message[0] == self.username:
return Fore.MAGENTA + message[0] + ": " + message[1] + Fore.WHITE
return Fore.MAGENTA + message[0] + ": " + message[1] + Fore.WHITE
return message[0] + ": " + message[1] + Fore.WHITE
def __clear_console(self):
# For windows clear command its cls
# For linux clear command its clear
if self.__get_os() == "Linux":
os.system("clear")
else:
return message[0] + ": " + message[1] + Fore.WHITE
os.system("cls")
def update_info(self):
ws = create_connection(f"{self.ws_url}/update")
last_try = None
while True:
time.sleep(0.05)
r = ws.recv()
if last_try == eval(r):
continue
last_try = eval(r)
# For windows clear command its cls
# For linux clear command its clear
if OS == "Linux":
os.system("clear")
else:
os.system("cls")
if len(last_try['status']) > 0:
for i, msg in enumerate(last_try["status"]):
actual_message = self.fernet.decrypt(
msg.encode()
).decode("utf-8")
if i == 0:
users = last_try["users_in_chat"]
for user in users:
ip = user.split(",")[0]
username = user.split(",")[1]
print("IP:", Fore.MAGENTA + ip + Fore.WHITE)
print("USERNAME: ", Fore.GREEN + username + Fore.WHITE)
print()
print(f"{self.print_message(actual_message)}")
else:
print(f"{self.print_message(actual_message)}")
def _key_request(self) -> None:
with open('private.pem', 'rb') as f:
self.privkey = rsa.PrivateKey.load_pkcs1(f.read())
with open("public.pem", 'rb') as f:
r = requests.get(
self.key_url,
data={
"pubkey": f.read(),
"username": self.username
},
stream=True
)
message = r.raw.read(999)
self.symetric_key = rsa.decrypt(message, self.privkey)
self.fernet = Fernet(self.symetric_key)
def _remove_keys(self) -> None:
os.remove("private.pem")
os.remove("public.pem")
try:
time.sleep(0.05)
r = eval(ws.recv())
if last_try == r:
continue
last_try = r
self.__clear_console()
if len(last_try['status']) > 0:
for i, msg in enumerate(last_try["status"]):
actual_message = self._decrypt(msg)
if i == 0:
for user in last_try["users_in_chat"]:
print("IP:", Fore.MAGENTA + user.split(",")[0] + Fore.WHITE)
print("USERNAME: ", Fore.GREEN + user.split(",")[1] + Fore.WHITE)
print(f"\n{self.print_message(actual_message)}")
else:
print(f"{self.print_message(actual_message)}")
except KeyboardInterrupt:
ws.close()
quit()
except Exception as exc:
ws.close()
print("Something went wrong! ", exc)
quit()
def _validate_keys(self) -> None:
self._key_gen()
self._key_request()
with open('public.pem', "rb") as f:
first_key = f.read()
self.pubkey = rsa.PublicKey.load_pkcs1(first_key)
self._request_key(self.key_url, self.username)
self._remove_keys()
def __call__(self):
def run(self):
# Running two threads,
# One for sending info
# Second one for updating info
# Second one for updating info
self._validate_keys()
threads = [
threading.Thread(target=self.send_info),
@ -133,4 +111,8 @@ class Client:
if __name__ == '__main__':
Client()()
Client(
server=input("server ip:\n"),
port=int(input("server port: \n")),
username=input("username:\n").replace(" ", "").lower()
).run()

View File

@ -0,0 +1,28 @@
from abc import ABC, abstractmethod
class CryptoService(ABC):
@abstractmethod
def _encrypt(self, message: str) -> str:
raise NotImplementedError("Need to implement encrypt method")
@abstractmethod
def _decrypt(self, message: str) -> str:
raise NotImplementedError("Need to implement decrypt method")
@abstractmethod
def _request_key(self, url: str, username: str):
raise NotImplementedError("Need to implement request key method")
@abstractmethod
def _generate_keys(self):
raise NotImplementedError("Need to implement generate keys method")
@abstractmethod
def _get_generated_keys(self) -> list[str]:
raise NotImplementedError("Need to implement get generated keys method")
@abstractmethod
def _remove_keys(self):
raise NotImplementedError("Need to implement remove keys method")

65
client/core/crypto.py Normal file
View File

@ -0,0 +1,65 @@
import os
import rsa
import requests
from cryptography.fernet import Fernet
from core.abs.abs_crypto import CryptoService
class RSAService(CryptoService):
def __init__(self):
self.public_key = None
self.private_key = None
self.symmetric_key = None
self.fernet = None
self.private_key_name = "private.pem"
self.public_key_name = "public.pem"
self.keys_path: list[str] = []
self._generate_keys()
def _encrypt(self, message: str) -> str:
return self.fernet.encrypt(message.encode())
def _decrypt(self, message: str) -> str:
return self.fernet.decrypt(message.encode()).decode("utf-8")
def _request_key(self, url: str, username: str):
data = {
"pubkey": self._open_generated_file(self.public_key_name),
"username": username
}
r = requests.get(url, data=data, stream=True)
message = r.raw.read(999)
self.symmetric_key = rsa.decrypt(message, self.private_key)
self.fernet = Fernet(self.symmetric_key)
def __update_keys_path(self, path_list: list[str]) -> None:
for file in path_list:
self.keys_path.append(file)
def __write_generated_key(self, name: str, key) -> None:
with open(name, "wb") as f:
f.write(key.save_pkcs1())
def _open_generated_file(self, name: str) -> bytes:
with open(name, "rb") as f:
return f.read()
def _generate_keys(self):
(public_key, private_key) = rsa.newkeys(512)
self.__write_generated_key(self.private_key_name, private_key)
self.__write_generated_key(self.public_key_name, public_key)
self.private_key = rsa.PrivateKey.load_pkcs1(
self._open_generated_file(self.private_key_name)
)
self.public_key = rsa.PublicKey.load_pkcs1(
self._open_generated_file(self.public_key_name)
)
self.__update_keys_path(["public.pem", "private.pem"])
def _get_generated_keys(self):
return self.private_key, self.public_key
def _remove_keys(self):
for key in self.keys_path:
os.remove(key)

9
private.pem Normal file
View File

@ -0,0 +1,9 @@
-----BEGIN RSA PRIVATE KEY-----
MIIBPAIBAAJBAJYbSpDoRC8IBswfwPT2hu8m6w9lOAXCY3Oq8srtsvLhrknrdXsb
FWeWwikTJGkuWff5zNZXC8LN72+OdYXJGEkCAwEAAQJACUYcYEGJXOKBEQFxOXE1
uvbLlQLq6CgvXskUAQeYeQXkU+yAuFo72ZqIP4TDs+xn1RcjzLjRCBshsS+Yru3d
xQIjAK1PezX4YtRWdP2PkTpNtTpbIFgua88MKFYlW+jro4pDonMCHwDduaOcFhZL
Gszx3inSzEal9NvBIrfSwqwMGVSyn1MCIgE9gV84gNSOLdYmsd5d8f8R6eBXrLPV
nXBIYij/jrMNmtsCHwCLb3kCaalvZdVIrYvjsu8i4o9oL+smMaJ8oVlwU10CImFC
uFlA2nG0Zgocwy6WA7laZKd6clskEdTxRwoOQRdpfSM=
-----END RSA PRIVATE KEY-----

4
public.pem Normal file
View File

@ -0,0 +1,4 @@
-----BEGIN RSA PUBLIC KEY-----
MEgCQQCWG0qQ6EQvCAbMH8D09obvJusPZTgFwmNzqvLK7bLy4a5J63V7GxVnlsIp
EyRpLln3+czWVwvCze9vjnWFyRhJAgMBAAE=
-----END RSA PUBLIC KEY-----