You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

125 lines
4.4 KiB

##########################################################################
# Copyright (C) 2023 Kostiantyn Klochko <kklochko@protonmail.com> #
# #
# This file is part of notification-producer. #
# #
# notification-producer is free software: you can redistribute it and/or #
# modify it under the terms of the GNU Affero General Public License as #
# published by the Free Software Foundation, either version 3 of the #
# License, or (at your option) any later version. #
# #
# notification-producer is distributed in the hope that it will be #
# useful, but WITHOUT ANY WARRANTY; without even the implied warranty #
# of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU #
# Affero General Public License for more details. #
# #
# You should have received a copy of the GNU Affero General Public #
# License along with notification-producer. If not, see #
# <https://www.gnu.org/licenses/>. #
##########################################################################
import asyncio
import typer
import os
import sys
from typing_extensions import Annotated
from rich.console import Console
from notification_producer.producer_client import ProducerClient
# Module-level Rich console instance.
console = Console()
# Root Typer application; "rich" markup mode lets help strings and command
# docstrings use Rich tags such as [b]...[/].
cli_app = typer.Typer(rich_markup_mode="rich")
# Sub-application for the one-shot notification commands; it is mounted on
# the root application under the "notify" command name.
notify = typer.Typer()
cli_app.add_typer(notify, name="notify", help="To send notification.")
# `up` command: builds a ProducerClient for the given ip/port and runs its
# interactive notification loop until the coroutine completes.
#
# Options are declared with `Annotated` so the Python-level defaults are the
# real values ("127.0.0.1", 5554) rather than `typer.Option` sentinel objects;
# the function therefore also works when called directly from Python.
@cli_app.command()
def up(
    ip: Annotated[str, typer.Option(
        "--ip", "-i",
        help="The server's ip.",
    )] = "127.0.0.1",
    port: Annotated[int, typer.Option(
        "--port", "-p",
        help="The server's port for [b]producers[/].",
    )] = 5554,
) -> None:
    """
    This command [b]start[/] the [yellow b]producer[/] to send messages in [b]the interactive mode[/].
    """
    # NOTE: the docstring above is rendered by Typer as the command's help
    # text, so it is kept verbatim.
    client = ProducerClient(ip, port)
    # notify_interactive() returns an awaitable; asyncio.run drives it on a
    # fresh event loop and blocks until it finishes.
    asyncio.run(client.notify_interactive())
# `notify message` command: sends a single notification whose title and body
# are given on the command line.
#
# Options are declared with `Annotated` so the Python-level defaults are the
# real values rather than `typer.Option` sentinel objects.
@notify.command()
def message(
    notification_title: Annotated[str, typer.Option(
        "--title", "-t",
        # "[y b]" is not a valid Rich style and was rendered unstyled;
        # "[yellow b]" matches the markup used by the other help strings.
        help="[b yellow]The title[/] for the [yellow b]notification[/].",
        rich_help_panel="Notification",
    )] = "notification",
    notification_message: Annotated[str, typer.Option(
        "--message", "-m",
        help="[b yellow]The message[/] for the [yellow b]notification[/].",
        rich_help_panel="Notification",
    )] = '',
    ip: Annotated[str, typer.Option(
        "--ip", "-i",
        help="The server's ip.",
        rich_help_panel="Connection",
    )] = "127.0.0.1",
    port: Annotated[int, typer.Option(
        "--port", "-p",
        help="The server's port for [b]producers[/].",
        rich_help_panel="Connection",
    )] = 5554,
) -> None:
    """
    This command [b]send[/] [yellow b]one notification[/].
    """
    # NOTE: the docstring above is rendered by Typer as the command's help
    # text, so it is kept verbatim.
    client = ProducerClient(ip, port)
    # notify() returns an awaitable; asyncio.run drives it to completion.
    asyncio.run(client.notify(notification_title, notification_message))
# `notify file` command: reads a text file and sends its whole content as the
# body of a single notification.
#
# Exits with status 2 (message on stderr) when the path is not an existing
# regular file or when the file cannot be read or decoded.
@notify.command()
def file(
    notification_message_path: Annotated[str, typer.Option(
        "--file", "-f",
        # "[y b]" is not a valid Rich style and was rendered unstyled;
        # "[yellow b]" matches the markup used by the other help strings.
        help="[b yellow]The message file[/] for the [yellow b]notification[/].",
        rich_help_panel="Notification",
        show_default=False,
    )],
    notification_title: Annotated[str, typer.Option(
        "--title", "-t",
        help="[b yellow]The title[/] for the [yellow b]notification[/].",
        rich_help_panel="Notification",
    )] = "notification",
    ip: Annotated[str, typer.Option(
        "--ip", "-i",
        help="The server's ip.",
        rich_help_panel="Connection",
    )] = "127.0.0.1",
    port: Annotated[int, typer.Option(
        "--port", "-p",
        help="The server's port for [b]producers[/].",
        rich_help_panel="Connection",
    )] = 5554,
) -> None:
    """
    This command [b]send[/] the file content as [yellow b]one notification[/].
    """
    # NOTE: the docstring above is rendered by Typer as the command's help
    # text, so it is kept verbatim.

    # Validate the input before doing anything else.
    if not os.path.isfile(notification_message_path):
        sys.stderr.write(f"The file ({notification_message_path}) does not exist or this is not a file. Please, check the path.\n")
        sys.exit(2)

    # The isfile() check does not guarantee a successful read (permissions,
    # undecodable bytes, file removed in between), so report those failures
    # the same way instead of letting a traceback escape.
    try:
        with open(notification_message_path) as message_file:
            notification_message = message_file.read()
    except (OSError, UnicodeDecodeError) as error:
        sys.stderr.write(f"The file ({notification_message_path}) could not be read: {error}\n")
        sys.exit(2)

    # Build the client only once there is something to send.
    client = ProducerClient(ip, port)
    # notify() returns an awaitable; asyncio.run drives it to completion.
    asyncio.run(client.notify(notification_title, notification_message))