You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
66 lines
2.5 KiB
66 lines
2.5 KiB
##########################################################################
|
|
# Copyright (C) 2023 Kostiantyn Klochko <kklochko@protonmail.com> #
|
|
# #
|
|
# This file is part of notification-producer. #
|
|
# #
|
|
# notification-producer is free software: you can redistribute it and/or #
|
|
# modify it under the terms of the GNU Affero General Public License as #
|
|
# published by the Free Software Foundation, either version 3 of the #
|
|
# License, or (at your option) any later version. #
|
|
# #
|
|
# notification-producer is distributed in the hope that it will be #
|
|
# useful, but WITHOUT ANY WARRANTY; without even the implied warranty #
|
|
# of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU #
|
|
# Affero General Public License for more details. #
|
|
# #
|
|
# You should have received a copy of the GNU Affero General Public #
|
|
# License along with notification-producer. If not, see #
|
|
# <https://www.gnu.org/licenses/>. #
|
|
##########################################################################
|
|
|
|
import asyncio
|
|
import struct
|
|
import sys
|
|
|
|
|
|
class ProducerClient:
    """Asynchronous TCP client that sends notifications to a server.

    Every string is written as one frame: a 4-byte little-endian unsigned
    length (``struct`` format ``'<L'``) followed by the UTF-8 encoded text.
    A notification is two consecutive frames: the title, then the message.
    """

    # Not used by any method of this class.
    BUFFER_SIZE = 1024

    def __init__(self, ip: str, port: int):
        """Remember the server address; no connection is opened yet.

        Args:
            ip: Host passed to ``asyncio.open_connection``.
            port: TCP port passed to ``asyncio.open_connection``.
        """
        self.__ip = ip
        self.__port = port
        # Both stay None until connect() has completed.
        self.reader = None
        self.writer = None

    async def connect(self) -> None:
        """Open the stream connection and store its reader and writer."""
        self.reader, self.writer = await asyncio.open_connection(
            self.__ip, self.__port
        )

    async def send(self, message: str) -> None:
        """Send *message* as a single length-prefixed frame.

        ``connect()`` must have been awaited first, otherwise ``self.writer``
        is still ``None``.

        Args:
            message: Text to send; it is encoded as UTF-8.

        Raises:
            struct.error: If the encoded payload does not fit in 32 bits.
        """
        payload = message.encode('utf-8')
        # The prefix must be the number of BYTES on the wire, not the number
        # of characters: they differ as soon as the text is not pure ASCII.
        # NOTE(review): assumes the receiver reads exactly this many bytes
        # per frame - confirm against the server implementation.
        self.writer.write(struct.pack('<L', len(payload)) + payload)
        # Wait until the transport buffer has room again (backpressure).
        await self.writer.drain()

    async def send_notification(self, title: str, message: str) -> None:
        """Send a notification as two frames: *title* followed by *message*."""
        await self.send(title)
        await self.send(message)

    async def notify_interactive(self) -> None:
        """Connect, then read notifications from stdin and send them forever.

        The loop ends on Ctrl+C or when stdin reaches end-of-file; in both
        cases ``bye`` is printed and the process exits with status 0.
        Cancellation of the surrounding task is swallowed. The writer is
        closed on every exit path after a successful connect.
        """
        await self.connect()

        try:
            while True:
                print('Your notification: ')
                # input() blocks the event loop while waiting for the user;
                # that is acceptable here because this coroutine is the only
                # work this client performs.
                title = input('title: ')
                message = input('message: ')
                await self.send_notification(title, message)
        except asyncio.CancelledError:
            pass
        except (KeyboardInterrupt, EOFError):
            # EOFError: stdin was closed (Ctrl+D or exhausted pipe); treat it
            # like Ctrl+C instead of crashing with a traceback.
            print("\nbye")
            sys.exit(0)
        finally:
            self.writer.close()
            await self.writer.wait_closed()
|
|
|