You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
124 lines
5.3 KiB
124 lines
5.3 KiB
###############################################################################
|
|
# Copyright (C) 2023 Kostiantyn Klochko <kklochko@protonmail.com> #
|
|
# #
|
|
# This file is part of notification-server. #
|
|
# #
|
|
# notification-server is free software: you can redistribute it and/or modify #
|
|
# it under the terms of the GNU Affero General Public License as published by #
|
|
# the Free Software Foundation, either version 3 of the License, or (at your #
|
|
# option) any later version. #
|
|
# #
|
|
# notification-server is distributed in the hope that it will be useful, but #
|
|
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY #
|
|
# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public #
|
|
# License for more details. #
|
|
# #
|
|
# You should have received a copy of the GNU Affero General Public License #
|
|
# along with notification-server. If not, see <https://www.gnu.org/licenses/>.#
|
|
###############################################################################
|
|
|
|
import struct
|
|
import asyncio
|
|
from rich.console import Console
|
|
from notification_server.notification_saver import NotificationSaver
|
|
from notification_server.timestamp import Timestamp
|
|
|
|
|
|
class Server:
    """Relay notifications from producers to every connected client.

    ``up()`` starts two asyncio TCP servers on the same address: one on
    ``port`` that clients connect to, and one on ``producer_port`` that
    producers connect to. A notification is a title followed by a message,
    each transferred as a length-prefixed UTF-8 string (see
    ``send_message`` / ``receive_message``).
    """

    BUFFER_SIZE = 1024  # bytes requested per read from a client connection
    NOTIFICATION_DIR = None  # class-level default; set per instance in __init__
    console = Console()  # shared console used for all log output

    def __init__(self, ip: str, port: int, producer_port: int, notification_dir: str | None = None):
        """Store the configuration; no socket is opened until ``up()`` runs.

        Args:
            ip: Address both servers bind to.
            port: Port clients connect to.
            producer_port: Port producers connect to.
            notification_dir: Handed to ``NotificationSaver.save_notification``
                for each received notification; ``None`` disables saving.
        """
        self.NOTIFICATION_DIR = notification_dir
        self.__client_socket = None
        self.__producer_socket = None
        self.__ip = ip
        self.__port = port
        self.__producer_port = producer_port
        # Writers of the currently connected clients (broadcast targets).
        self.__client_writers = []

    async def up(self):
        """Start the client and producer servers and serve until cancelled."""
        self.__client_socket = await asyncio.start_server(
            self.handle_client, self.__ip, self.__port
        )

        # The lambda adapts the two-argument connection callback to
        # handle_producer's three-argument signature.
        self.__producer_socket = await asyncio.start_server(
            lambda reader, writer: self.handle_producer(reader, writer, self.NOTIFICATION_DIR),
            self.__ip, self.__producer_port
        )

        now = Timestamp.now()
        self.console.print(f'[yellow b][INFO][/][#8abeb7 b][{now}][/] Serves on {self.__ip}:{self.__port} for clients.')
        self.console.print(f'[yellow b][INFO][/][#8abeb7 b][{now}][/] Receives notifications from producers at {self.__ip}:{self.__producer_port}.')

        async with self.__producer_socket, self.__client_socket:
            await asyncio.gather(
                self.__producer_socket.serve_forever(),
                self.__client_socket.serve_forever()
            )

    @staticmethod
    async def send_message(writer, message: str):
        """Queue ``message`` on ``writer`` as one length-prefixed frame.

        The frame is a 4-byte little-endian unsigned length followed by the
        UTF-8 encoded text. The data is only handed to the writer; this
        method does not wait for it to be flushed.

        Args:
            writer: Object with a ``write(bytes)`` method (a stream writer).
            message: Text to send.
        """
        payload = message.encode('utf-8')
        # The prefix must count encoded bytes, not characters: the receiver
        # reads exactly that many bytes, so len(message) would cut off any
        # text containing non-ASCII characters and desynchronise the stream.
        writer.write(struct.pack('<L', len(payload)))
        writer.write(payload)

    @staticmethod
    async def receive_message(reader) -> str:
        """Read one length-prefixed frame from ``reader`` and decode it.

        Args:
            reader: Stream reader providing ``readexactly``.

        Returns:
            The decoded UTF-8 text of the frame.

        Raises:
            asyncio.IncompleteReadError: The stream ended before a whole
                frame was received.
        """
        size, = struct.unpack('<L', await reader.readexactly(4))
        message = await reader.readexactly(size)
        return message.decode('utf-8')

    async def broadcast_message(self, message):
        """Send ``message`` to every currently connected client."""
        # Iterate over a snapshot: the client handlers add and remove
        # writers from the underlying list as connections come and go.
        for writer in tuple(self.__client_writers):
            await self.send_message(writer, message)

    async def handle_producer(self, reader, writer, notification_dir):
        """Serve one producer connection until it finishes or disconnects.

        Title/message pairs are read in a loop; each pair is optionally
        saved and then broadcast to all clients. A pair of two empty
        strings is the producer's end-of-stream marker.

        Args:
            reader: Stream reader of the producer connection.
            writer: Stream writer of the producer connection.
            notification_dir: Directory handed to
                ``NotificationSaver.save_notification``; ``None`` disables
                saving.
        """
        addr = writer.get_extra_info('peername')
        now = Timestamp.now()
        self.console.print(f"[yellow b][INFO][/][#8abeb7 b][{now}][/] A producer ({addr[0]}:{addr[1]}) connected.")

        try:
            while True:
                title = await self.receive_message(reader)
                message = await self.receive_message(reader)

                if not title and not message:
                    now = Timestamp.now()
                    self.console.print(f"[yellow b][INFO][/][#8abeb7 b][{now}][/] The producer ({addr[0]}:{addr[1]}) finished.")
                    break

                # Honour the directory this handler was called with rather
                # than silently ignoring the parameter.
                if notification_dir is not None:
                    now = Timestamp.now()
                    NotificationSaver.save_notification(notification_dir, title, message, now)

                await self.broadcast_message(title)
                await self.broadcast_message(message)

        except asyncio.CancelledError:
            # Cancellation is absorbed so the handler ends quietly.
            pass
        except (asyncio.IncompleteReadError, ConnectionError):
            # The producer went away without sending the end marker; treat
            # it as an ordinary disconnect instead of an unhandled error.
            pass
        finally:
            now = Timestamp.now()
            self.console.print(f"[yellow b][INFO][/][#8abeb7 b][{now}][/] The producer ({addr[0]}:{addr[1]}) disconnected.")
            writer.close()

    async def handle_client(self, reader, writer):
        """Keep a client registered for broadcasts until it disconnects.

        Anything the client sends is read and discarded; an empty read
        means the peer closed the connection.

        Args:
            reader: Stream reader of the client connection.
            writer: Stream writer of the client connection.
        """
        addr = writer.get_extra_info('peername')
        now = Timestamp.now()
        self.console.print(f"[yellow b][INFO][/][#8abeb7 b][{now}][/] A client ({addr[0]}:{addr[1]}) connected.")
        self.__client_writers.append(writer)

        try:
            while True:
                data = await reader.read(self.BUFFER_SIZE)
                if not data:
                    break
        except asyncio.CancelledError:
            # Cancellation is absorbed so the handler ends quietly.
            pass
        except ConnectionError:
            # A reset connection is just another way of disconnecting.
            pass
        finally:
            now = Timestamp.now()
            self.console.print(f"[yellow b][INFO][/][#8abeb7 b][{now}][/] The client ({addr[0]}:{addr[1]}) disconnected.")
            self.__client_writers.remove(writer)
            writer.close()
|
|
|