# Command-line interface for the notification producer.
#
# Exposes two entry points on ``cli_app``:
#   * ``up``             - run the producer client in interactive mode.
#   * ``notify message`` - send a single notification and exit.
#
# NOTE: the command docstrings below are rendered by Typer as user-facing
# help text (with Rich markup), so they are runtime strings, not comments.

import asyncio

import typer
from rich.console import Console

from notification_producer.producer_client import ProducerClient

# Module-level console; not used by the commands in this file but kept
# because other modules may import it from here.
console = Console()

# Root application. ``rich_markup_mode="rich"`` enables tags such as
# ``[b]...[/]`` inside help strings and docstrings.
cli_app = typer.Typer(rich_markup_mode="rich")

# Sub-application mounted as the ``notify`` command group.
notify = typer.Typer()
cli_app.add_typer(notify, name="notify", help="To send notification.")


# ``up`` builds a ProducerClient for the given address and drives its
# ``notify_interactive`` coroutine to completion on a fresh event loop.
@cli_app.command()
def up(
    ip: str = typer.Option(
        "127.0.0.1",
        "--ip",
        "-i",
        help="The server's ip.",
    ),
    port: int = typer.Option(
        5554,
        "--port",
        "-p",
        help="The server's port for [b]producers[/].",
    ),
) -> None:
    """
    This command [b]start[/] the [yellow b]producer[/] to send messages in [b]the interactive mode[/].
    """
    client = ProducerClient(ip, port)
    asyncio.run(client.notify_interactive())


# ``notify message`` builds a ProducerClient for the given address and runs
# its ``notify`` coroutine once with the supplied title and message.
@notify.command()
def message(
    notification_title: str = typer.Option(
        "notification",
        "--title",
        "-t",
        # ``[yellow b]`` rather than ``[y b]``: a bare ``y`` is not a valid
        # Rich color/attribute name, so that tag would not style the text.
        help="[b yellow]The title[/] for the [yellow b]notification[/].",
        rich_help_panel="Notification",
    ),
    notification_message: str = typer.Option(
        '',
        "--message",
        "-m",
        # Same markup correction as for ``--title`` above.
        help="[b yellow]The message[/] for the [yellow b]notification[/].",
        rich_help_panel="Notification",
    ),
    ip: str = typer.Option(
        "127.0.0.1",
        "--ip",
        "-i",
        help="The server's ip.",
        rich_help_panel="Connection",
    ),
    port: int = typer.Option(
        5554,
        "--port",
        "-p",
        help="The server's port for [b]producers[/].",
        rich_help_panel="Connection",
    ),
) -> None:
    """
    This command [b]send[/] [yellow b]one notification[/].
    """
    client = ProducerClient(ip, port)
    asyncio.run(client.notify(notification_title, notification_message))