You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

98 lines
3.9 KiB

##########################################################################
# Copyright (C) 2023 Kostiantyn Klochko <kklochko@protonmail.com> #
# #
# This file is part of notification-producer. #
# #
# notification-producer is free software: you can redistribute it and/or #
# modify it under the terms of the GNU Affero General Public License as #
# published by the Free Software Foundation, either version 3 of the #
# License, or (at your option) any later version. #
# #
# notification-producer is distributed in the hope that it will be #
# useful, but WITHOUT ANY WARRANTY; without even the implied warranty #
# of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU #
# Affero General Public License for more details. #
# #
# You should have received a copy of the GNU Affero General Public #
# License along with notification-producer. If not, see #
# <https://www.gnu.org/licenses/>. #
##########################################################################
import asyncio
import struct
import sys
from rich.console import Console
from rich.prompt import Prompt
from notification_producer.timestamp import Timestamp
class ProducerClient:
    """Asyncio TCP client that sends notifications to a server.

    A notification is a (title, message) pair. Each string is sent as one
    frame: a 4-byte little-endian unsigned length followed by the UTF-8
    encoded text. A notification whose title and message are both empty is
    sent as the last one of every session (presumably the server treats it
    as an end-of-session marker -- confirm against the server side).
    """

    BUFFER_SIZE = 1024
    console = Console()

    def __init__(self, ip: str, port: int):
        """Remember the server address; no connection is opened here.

        Args:
            ip: Host or IP address of the server.
            port: TCP port of the server.
        """
        self.__ip = ip
        self.__port = port
        # Both streams are set by connect(); they stay None until then.
        self.reader = None
        self.writer = None

    async def connect(self):
        """Open the connection to the server and report it on the console.

        If the server refuses the connection, an error line is written to
        stderr and the process exits with status 1. Other connection errors
        propagate to the caller.
        """
        try:
            self.reader, self.writer = await asyncio.open_connection(self.__ip, self.__port)
            now = Timestamp.now()
            self.console.print(f'[yellow b][INFO][/][#8abeb7 b][{now}][/] Connected to the server ({self.__ip}:{self.__port}).')
        except ConnectionRefusedError:
            now = Timestamp.now()
            sys.stderr.write(f"[ERROR][{now}] Could not connect to the server. Please, check the ip and the port.\n")
            sys.exit(1)

    async def send(self, message: str):
        """Send one length-prefixed UTF-8 frame.

        Args:
            message: Text to send; may be empty (a zero-length frame).
        """
        payload = message.encode('utf-8')
        # The prefix must count encoded BYTES, not characters: any non-ASCII
        # character takes more than one byte in UTF-8, and a character count
        # would make the receiver cut the frame short and lose sync.
        self.writer.write(struct.pack('<L', len(payload)))
        self.writer.write(payload)
        # Let the transport flush and apply back-pressure before returning.
        await self.writer.drain()

    async def send_notification(self, title: str, message: str):
        """Send a notification as two consecutive frames: title, then message."""
        await self.send(title)
        await self.send(message)

    async def notify_interactive(self):
        """Connect, then prompt for notifications until an empty one is entered.

        Every entered pair is sent, including the final empty pair that ends
        the loop. On Ctrl-C an empty notification is sent and the process
        exits with status 0. The connection is always closed on the way out.
        """
        await self.connect()
        try:
            while True:
                self.console.print('\n[cyan b]Your notification[/]: ')
                title = Prompt.ask('[cyan]title[/]')
                message = Prompt.ask('[cyan]message[/]')
                # Send first: the empty pair itself must reach the server.
                await self.send_notification(title, message)
                if title == '' and message == '':
                    now = Timestamp.now()
                    self.console.print(f"[yellow b][INFO][/][#8abeb7 b][{now}][/] Finished.")
                    self.console.print("[green]bye[/]")
                    break
        except asyncio.CancelledError:
            pass
        except KeyboardInterrupt:
            await self.send_notification('', '')
            now = Timestamp.now()
            self.console.print(f"[yellow b][INFO][/][#8abeb7 b][{now}][/] Finished.")
            self.console.print("\n[green]bye[/]")
            sys.exit(0)
        finally:
            await self._close()

    async def notify(self, title, message):
        """Connect, send a single notification followed by the empty one, and close.

        Args:
            title: Title of the notification.
            message: Body of the notification.
        """
        await self.connect()
        try:
            await self.send_notification(title, message)
            await self.send_notification('', '')
        except asyncio.CancelledError:
            pass
        finally:
            await self._close()

    async def _close(self):
        """Close the writer and wait until it is closed; no-op if never connected."""
        if self.writer is None:
            return
        self.writer.close()
        await self.writer.wait_closed()