########################################################################## # Copyright (C) 2023 Kostiantyn Klochko # # # # This file is part of notification-producer. # # # # notification-producer is free software: you can redistribute it and/or # # modify it under the terms of the GNU Affero General Public License as # # published by the Free Software Foundation, either version 3 of the # # License, or (at your option) any later version. # # # # notification-producer is distributed in the hope that it will be # # useful, but WITHOUT ANY WARRANTY; without even the implied warranty # # of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU # # Affero General Public License for more details. # # # # You should have received a copy of the GNU Affero General Public # # License along with notification-producer. If not, see # # . # ########################################################################## import asyncio import struct import sys from rich.console import Console from rich.prompt import Prompt class ProducerClient: BUFFER_SIZE = 1024 console = Console() def __init__(self, ip: str, port: int): self.__ip = ip self.__port = port self.reader = None self.writer = None async def connect(self): try: self.reader, self.writer = await asyncio.open_connection(self.__ip, self.__port) self.console.print(f'[yellow b][INFO][/] Connected to the server ({self.__ip}:{self.__port}).') except ConnectionRefusedError: sys.stderr.write("Could not connect to the server. Please, check the ip and the port.\n") sys.exit(1) async def send(self, message: str): self.writer.write(struct.pack('