Update the Source to BackupPlan component and shared.

dev
KKlochko 4 months ago
parent c67dc14003
commit 594d9dea3f

@ -1,16 +0,0 @@
Feature: Creating the source
Scenario Outline: Adding an new unique source
Given the label "<label>"
And the path "<source_path>"
And the destinations <destinations>
And the arguments "<arguments>"
When I add the source
Then the source should be added successfully
Examples:
| label | source_path | destinations | arguments |
| usb | /mnt/usb | [] | <empty> |
| db | /db | ["/backup/db"] | -avuP |
| temp | /tmp | ["/backup/tmp1", "/backup/tmp2"] | -avuP --delete |

@ -1,12 +0,0 @@
Feature: Adding an unique path
Scenario: Adding an new unique path
Given a path "/media"
When I add the path to the database
Then the path should be added successfully
Scenario: Adding an new path with not uniq path
Given a path "/media"
When I add the path to the database
Then the exception should occur

@ -1,58 +0,0 @@
from behave import given, when, then
from peewee import IntegrityError
from tui_rsync.models.models import create_tables
from tui_rsync.models.models import Source
import json
@given('the label "{label}"')
def given_source_label(context, label):
create_tables()
context.label = label
@given('the path "{source_path}"')
def given_source_path(context, source_path):
context.source_path = source_path
@given('the destinations {destinations_json}')
def given_source_destinations(context, destinations_json):
context.destinations = json.loads(destinations_json)
@given('the arguments "{arguments}"')
def given_source_arguments(context, arguments):
context.args = arguments
if arguments == "<empty>":
context.args = ""
@when('I add the source')
def add_source(context):
try:
context.source = Source.create_save(
context.label,
context.source_path,
context.destinations,
context.args,
)
context.source = Source.get_source(context.label)
except IntegrityError:
context.exception_raised = True
else:
context.exception_raised = False
def compare_destinations(actual:list, expected: list[str]) -> bool:
actual_path_set = {destionation.path for destionation in actual}
return actual_path_set == set(expected)
@then('the source should be added successfully')
def path_has_added(context):
assert context.exception_raised == False
assert context.source.label == context.label
assert context.source.source.path == context.source_path
assert compare_destinations(
context.source.get_destinations(),
context.destinations
)
assert context.source.args.command == context.args

@ -1,28 +0,0 @@
from behave import given, when, then
from peewee import IntegrityError
from tui_rsync.models.models import create_tables
from tui_rsync.models.models import Path
@given('a path "{path_str}"')
def given_path(context, path_str):
create_tables()
context.path_str = path_str
@when('I add the path to the database')
def add_path(context):
try:
context.path = Path.create(path=context.path_str)
except IntegrityError:
context.exception_raised = True
else:
context.exception_raised = False
@then('the path should be added successfully')
def path_has_added(context):
assert context.path.path == context.path_str
assert context.exception_raised == False
@then('the exception should occur')
def path_has_not_added(context):
assert context.exception_raised == True

@ -1,4 +0,0 @@
from tui_rsync.cli.cli import cli_app
from tui_rsync.cli.sync import sync
from tui_rsync.cli.rsync import Rsync
from tui_rsync.cli.label_prompt import LabelPrompt

@ -1,31 +0,0 @@
################################################################################
# Copyright (C) 2023 Kostiantyn Klochko <kostya_klochko@ukr.net> #
# #
# This file is part of tui-rsync. #
# #
# tui-rsync is free software: you can redistribute it and/or modify it under #
# uthe terms of the GNU General Public License as published by the Free #
# Software Foundation, either version 3 of the License, or (at your option) #
# any later version. #
# #
# tui-rsync is distributed in the hope that it will be useful, but WITHOUT ANY #
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS #
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more #
# details. #
# #
# You should have received a copy of the GNU General Public License along with #
# tui-rsync. If not, see <https://www.gnu.org/licenses/>. #
################################################################################
from rich.console import Console
import typer
from tui_rsync.cli.source.source import source
from tui_rsync.cli.sync import sync
from tui_rsync.cli.groups.groups import groups
console = Console()
cli_app = typer.Typer(rich_markup_mode="rich")
cli_app.add_typer(source, name="source", help="Manage sources")
cli_app.add_typer(groups, name="groups", help="Manage groups")
cli_app.add_typer(sync, name="sync", help="Sync sources")

@ -1 +0,0 @@
from tui_rsync.cli.groups.groups import groups

@ -1,31 +0,0 @@
################################################################################
# Copyright (C) 2023 Kostiantyn Klochko <kostya_klochko@ukr.net> #
# #
# This file is part of tui-rsync. #
# #
# tui-rsync is free software: you can redistribute it and/or modify it under #
# uthe terms of the GNU General Public License as published by the Free #
# Software Foundation, either version 3 of the License, or (at your option) #
# any later version. #
# #
# tui-rsync is distributed in the hope that it will be useful, but WITHOUT ANY #
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS #
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more #
# details. #
# #
# You should have received a copy of the GNU General Public License along with #
# tui-rsync. If not, see <https://www.gnu.org/licenses/>. #
################################################################################
from rich.console import Console
from pyfzf import FzfPrompt
from tui_rsync.models.models import all_group_labels
from tui_rsync.cli.label_prompt import LabelPrompt
console = Console()
class GroupPrompt(LabelPrompt):
@staticmethod
def get_label_fzf() -> str:
fzf = FzfPrompt()
return fzf.prompt(all_group_labels().iterator())[0]

@ -1,56 +0,0 @@
################################################################################
# Copyright (C) 2023 Kostiantyn Klochko <kostya_klochko@ukr.net> #
# #
# This file is part of tui-rsync. #
# #
# tui-rsync is free software: you can redistribute it and/or modify it under #
# uthe terms of the GNU General Public License as published by the Free #
# Software Foundation, either version 3 of the License, or (at your option) #
# any later version. #
# #
# tui-rsync is distributed in the hope that it will be useful, but WITHOUT ANY #
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS #
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more #
# details. #
# #
# You should have received a copy of the GNU General Public License along with #
# tui-rsync. If not, see <https://www.gnu.org/licenses/>. #
################################################################################
from rich.console import Console
from rich.prompt import Prompt
from typing import List, Optional
import typer
from tui_rsync.models.models import Group
from tui_rsync.models.models import all_group_labels
from tui_rsync.cli.groups.group_prompt import GroupPrompt
console = Console()
group_remove = typer.Typer()
@group_remove.command()
def one(
group_label: str = typer.Option(
None, "--group-label", "-g",
help="[b]The label[/] is a uniq identification of a [b]group[/].",
show_default=False
),
):
"""
[red b]Remove[/] an [yellow]existing group[/].
"""
if group_label is None:
group_label = GroupPrompt.get_label_fzf()
if Group.is_exist(group_label):
group = Group.get_group(group_label)
group.delete_instance()
@group_remove.command()
def all():
"""
[red b]Remove[/] [yellow] all existing groups[/].
"""
for label in all_group_labels().iterator():
one(label.label)

@ -1,63 +0,0 @@
################################################################################
# Copyright (C) 2023 Kostiantyn Klochko <kostya_klochko@ukr.net> #
# #
# This file is part of tui-rsync. #
# #
# tui-rsync is free software: you can redistribute it and/or modify it under #
# uthe terms of the GNU General Public License as published by the Free #
# Software Foundation, either version 3 of the License, or (at your option) #
# any later version. #
# #
# tui-rsync is distributed in the hope that it will be useful, but WITHOUT ANY #
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS #
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more #
# details. #
# #
# You should have received a copy of the GNU General Public License along with #
# tui-rsync. If not, see <https://www.gnu.org/licenses/>. #
################################################################################
from rich.console import Console
from rich.prompt import Prompt
from typing import List, Optional
import typer
from tui_rsync.models.models import Group, GroupSource
from tui_rsync.cli.groups.group_prompt import GroupPrompt
from tui_rsync.models.models import all_group_labels
console = Console()
group_show = typer.Typer()
@group_show.command()
def one(
group_label: str = typer.Option(
None, "--group-label", "-l",
help="[b]The label[/] is a uniq identification of a [b]group[/].",
show_default=False
),
):
"""
[green b]Show[/] an [yellow]existing group[/].
"""
if group_label is None:
console.print("What is the [yellow b]label of the group[/]? ")
group_label = GroupPrompt.get_label_fzf()
if not Group.is_exist(group_label):
console.print("[red b][ERROR][/] Source does not exists!!!")
return
group = Group.get_group(group_label)
console.print(group.show_format())
@group_show.command()
def all():
"""
[green b]Show[/] [yellow]all existing groups[/].
"""
for label in all_group_labels().iterator():
group = Group.get_group(label.label)
console.print(group.show_format())

@ -1,89 +0,0 @@
################################################################################
# Copyright (C) 2023 Kostiantyn Klochko <kostya_klochko@ukr.net> #
# #
# This file is part of tui-rsync. #
# #
# tui-rsync is free software: you can redistribute it and/or modify it under #
# uthe terms of the GNU General Public License as published by the Free #
# Software Foundation, either version 3 of the License, or (at your option) #
# any later version. #
# #
# tui-rsync is distributed in the hope that it will be useful, but WITHOUT ANY #
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS #
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more #
# details. #
# #
# You should have received a copy of the GNU General Public License along with #
# tui-rsync. If not, see <https://www.gnu.org/licenses/>. #
################################################################################
from rich.console import Console
from rich.prompt import Prompt
from typing import List, Optional
import typer
from tui_rsync.models.models import Group, GroupSource
from tui_rsync.cli.label_prompt import LabelPrompt
from tui_rsync.cli.groups.group_prompt import GroupPrompt
from typer.main import get_group
console = Console()
group_update = typer.Typer()
@group_update.command()
def label(
group_label: str = typer.Option(
None, "--group-label", "-l",
help="[b]The label[/] is a uniq identification of a [b]group[/].",
show_default=False
),
new_group_label: str = typer.Option(
None, "--new-group-label", "-nl",
help="[b]The new label[/] will replace the [b]old group label[/].",
show_default=False
),
):
"""
[green b]Update[/] an [yellow]existing group label[/].
"""
if group_label is None:
console.print("What is the [yellow b]old label of group[/]? ")
group_label = GroupPrompt.get_label_fzf()
if new_group_label is None:
question = "What is the [yellow b]new label of the group[/]? "
new_group_label = GroupPrompt.ask_uuid(question)
if Group.is_exist(group_label):
group = Group.get_group(group_label)
group.update_label(new_group_label)
@group_update.command()
def labels(
group_label: str = typer.Option(
None, "--group-label", "-g",
help="[b]The label[/] is a uniq identification of a [b]group[/].",
show_default=False
),
new_labels: str = typer.Option(
None, "--new-labels", "-nl",
help="[b]The new label[/] will replace the [b]old source label[/].",
show_default=False
),
):
"""
[green b]Update[/] [yellow]the group[/] with a [bold]the label[/].
[b]The chosen sources[/] will be updated for [b]the group[/].
"""
if group_label is None:
group_label = GroupPrompt.get_label_fzf()
if new_labels is None:
new_labels = LabelPrompt.get_labels()
group = Group.get_group(group_label)
group.remove_sources()
GroupSource.create_group_sources(group, new_labels)
group.save()

@ -1,56 +0,0 @@
################################################################################
# Copyright (C) 2023 Kostiantyn Klochko <kostya_klochko@ukr.net> #
# #
# This file is part of tui-rsync. #
# #
# tui-rsync is free software: you can redistribute it and/or modify it under #
# uthe terms of the GNU General Public License as published by the Free #
# Software Foundation, either version 3 of the License, or (at your option) #
# any later version. #
# #
# tui-rsync is distributed in the hope that it will be useful, but WITHOUT ANY #
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS #
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more #
# details. #
# #
# You should have received a copy of the GNU General Public License along with #
# tui-rsync. If not, see <https://www.gnu.org/licenses/>. #
################################################################################
from rich.console import Console
from rich.prompt import Confirm, Prompt
from typing import List, Optional
import typer
from tui_rsync.cli.label_prompt import LabelPrompt
from tui_rsync.cli.rsync import Rsync
from tui_rsync.models.models import Group, count_all_labels_except
from tui_rsync.cli.groups.group_show import group_show
from tui_rsync.cli.groups.group_update import group_update
from tui_rsync.cli.groups.group_remove import group_remove
console = Console()
groups = typer.Typer()
groups.add_typer(group_show, name="show", help="Show groups")
groups.add_typer(group_update, name="update", help="Update groups")
groups.add_typer(group_remove, name="remove", help="Remove groups")
@groups.command()
def add(
group_label: str = typer.Option(
None, "--group-label", "-g",
help="[b]The label[/] is a uniq identification of a [b]group[/].",
show_default=False
),
):
"""
[green b]Create[/] a [yellow]new group[/] with a [bold]uniq[/] label.
[b]The chosen sources[/] will be united into [b]the group[/].
"""
if group_label is None:
question = "Would you like to change [yellow b]the group label[/]?"
group_label = LabelPrompt.ask_uuid(question)
labels = LabelPrompt.get_labels()
Group.create_save(group_label, labels)

@ -1,70 +0,0 @@
################################################################################
# Copyright (C) 2023 Kostiantyn Klochko <kostya_klochko@ukr.net> #
# #
# This file is part of tui-rsync. #
# #
# tui-rsync is free software: you can redistribute it and/or modify it under #
# uthe terms of the GNU General Public License as published by the Free #
# Software Foundation, either version 3 of the License, or (at your option) #
# any later version. #
# #
# tui-rsync is distributed in the hope that it will be useful, but WITHOUT ANY #
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS #
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more #
# details. #
# #
# You should have received a copy of the GNU General Public License along with #
# tui-rsync. If not, see <https://www.gnu.org/licenses/>. #
################################################################################
from rich.console import Console
from rich.prompt import Confirm, Prompt, IntPrompt
from pyfzf import FzfPrompt
import uuid
from tui_rsync.models.models import all_labels, all_labels_except
from tui_rsync.models.models import count_all_labels_except
console = Console()
class LabelPrompt:
@staticmethod
def ask_uuid(
question: str = "Would you like to change [yellow b]the label[/]?"
) -> str:
"""
Return the label or the default uuid value.
"""
uid = uuid.uuid4().hex
label = Prompt.ask(question, default=uid)
return label
@staticmethod
def get_label_fzf() -> str:
fzf = FzfPrompt()
return fzf.prompt(all_labels().iterator())[0]
@staticmethod
def get_label_except_fzf(labels = None) -> str:
fzf = FzfPrompt()
return fzf.prompt(all_labels_except(labels).iterator())[0]
@staticmethod
def get_labels(labels = None) -> list:
confirm_question = "Would you like to add a source/sources to the group?"
is_fzf = Confirm.ask(confirm_question, default=True)
if not is_fzf:
return []
count_question = "How much would you like to add sources to the group?"
count = IntPrompt.ask(count_question, default=1)
count_max = count_all_labels_except(labels)
count = count_max if count > count_max else count
labels = []
for i in range(count):
option = LabelPrompt.get_label_except_fzf(labels)
labels.append(option)
return labels

@ -1,40 +0,0 @@
################################################################################
# Copyright (C) 2023 Kostiantyn Klochko <kostya_klochko@ukr.net> #
# #
# This file is part of tui-rsync. #
# #
# tui-rsync is free software: you can redistribute it and/or modify it #
# under the terms of the GNU General Public License as published by the Free #
# Software Foundation, either version 3 of the License, or (at your option) #
# any later version. #
# #
# tui-rsync is distributed in the hope that it will be useful, but #
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY #
# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for #
# more details. #
# #
# You should have received a copy of the GNU General Public License along with #
# tui-rsync. If not, see <https://www.gnu.org/licenses/>. #
################################################################################
from sys import stderr
from rich.console import Console
from rich.prompt import Prompt
from pyfzf import FzfPrompt
import os
from tui_rsync.models.models import Destination, Path
console = Console()
err_console = Console(stderr=True)
class PathPrompt:
@staticmethod
def get_backup_fzf(label:str) -> str:
dests_count = len(Destination.get_all(label))
if dests_count == 0:
err_console.print("[red b]No backups!!![/]")
return ""
if dests_count == 1:
return Destination.get_all(label).get()
fzf = FzfPrompt()
return fzf.prompt(Destination.get_all(label).iterator())[0]

@ -1,36 +0,0 @@
################################################################################
# Copyright (C) 2023 Kostiantyn Klochko <kostya_klochko@ukr.net> #
# #
# This file is part of tui-rsync. #
# #
# tui-rsync is free software: you can redistribute it and/or modify it under #
# uthe terms of the GNU General Public License as published by the Free #
# Software Foundation, either version 3 of the License, or (at your option) #
# any later version. #
# #
# tui-rsync is distributed in the hope that it will be useful, but WITHOUT ANY #
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS #
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more #
# details. #
# #
# You should have received a copy of the GNU General Public License along with #
# tui-rsync. If not, see <https://www.gnu.org/licenses/>. #
################################################################################
import shlex
from subprocess import Popen, PIPE
class Rsync:
def __init__(self, args:str):
self.__args = ["rsync"] + shlex.split(args)
def run_one(self, source, destination):
args = self.__args + [source, destination]
output = Popen(args, stdout=PIPE)
response = output.communicate()
def dry_one(self, source, destination):
args = self.__args + ['--dry-run', source, destination]
output = Popen(args, stdout=PIPE)
response = output.communicate()
return response

@ -1 +0,0 @@
from tui_rsync.cli.source.source import source

@ -1,71 +0,0 @@
################################################################################
# Copyright (C) 2023 Kostiantyn Klochko <kostya_klochko@ukr.net> #
# #
# This file is part of tui-rsync. #
# #
# tui-rsync is free software: you can redistribute it and/or modify it under #
# uthe terms of the GNU General Public License as published by the Free #
# Software Foundation, either version 3 of the License, or (at your option) #
# any later version. #
# #
# tui-rsync is distributed in the hope that it will be useful, but WITHOUT ANY #
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS #
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more #
# details. #
# #
# You should have received a copy of the GNU General Public License along with #
# tui-rsync. If not, see <https://www.gnu.org/licenses/>. #
################################################################################
from rich.console import Console
from rich.prompt import Prompt
from typing import List, Optional
import typer
from tui_rsync.models.models import Source, Destination, SyncCommand, Path
from tui_rsync.cli.label_prompt import LabelPrompt
from tui_rsync.cli.source.source_show import source_show
from tui_rsync.cli.source.source_update import source_update
from tui_rsync.cli.source.source_remove import source_remove
console = Console()
source = typer.Typer()
source.add_typer(source_show, name="show", help="Show sources")
source.add_typer(source_update, name="update", help="Update sources")
source.add_typer(source_remove, name="remove", help="Remove sources")
@source.command()
def add(
label: str = typer.Option(
None, "--label", "-l",
help="[b]The label[/] is a uniq identification of a [b]source[/].",
show_default=False
),
source: str = typer.Option(
None, "--source", "-s",
help="[b]A source[/] of the data.",
show_default=False
),
destinations: Optional[List[str]] = typer.Option(
None, "--destination", "-d", help="[b]The backup[/] destinations.",
show_default=False
),
args: str = typer.Option(
None, "--args", "-a",
help="[b i yellow]Additional[/] rsync [b]arguments[/].",
show_default=False
)
):
"""
[green b]Create[/] a [yellow]new source[/] with a [bold]uniq[/] label.
[b]The source[/] will be connected to [b]backup destinations[/].
[yellow i]Optionally, additional[/] arguments for rsync can be added.
"""
if label is None:
label = LabelPrompt.ask_uuid()
if source is None:
source = console.input("What is the [yellow b]path to the source[/]? ")
if args is None:
args = console.input("What is the [yellow b]rsync args of source[/]? ")
Source.create_save(label, source, destinations, args)

@ -1,56 +0,0 @@
################################################################################
# Copyright (C) 2023 Kostiantyn Klochko <kostya_klochko@ukr.net> #
# #
# This file is part of tui-rsync. #
# #
# tui-rsync is free software: you can redistribute it and/or modify it under #
# uthe terms of the GNU General Public License as published by the Free #
# Software Foundation, either version 3 of the License, or (at your option) #
# any later version. #
# #
# tui-rsync is distributed in the hope that it will be useful, but WITHOUT ANY #
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS #
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more #
# details. #
# #
# You should have received a copy of the GNU General Public License along with #
# tui-rsync. If not, see <https://www.gnu.org/licenses/>. #
################################################################################
from rich.console import Console
from rich.prompt import Prompt
from typing import List, Optional
import typer
from tui_rsync.models.models import Source, Destination, SyncCommand, Path
from tui_rsync.cli.label_prompt import LabelPrompt
from tui_rsync.models.models import all_labels
console = Console()
source_remove = typer.Typer()
@source_remove.command()
def one(
label: str = typer.Option(
None, "--label", "-l",
help="[b]The label[/] is a uniq identification of a [b]source[/].",
show_default=False
)
):
"""
[red b]Remove[/] an [yellow]existing source[/].
"""
if label is None:
label = LabelPrompt.get_label_fzf()
if Source.is_exist(label):
src = Source.get_source(label)
src.delete_instance()
@source_remove.command()
def all():
"""
[red b]Remove[/] [yellow] all existing sources[/].
"""
for label in all_labels().iterator():
one(label.label)

@ -1,64 +0,0 @@
################################################################################
# Copyright (C) 2023 Kostiantyn Klochko <kostya_klochko@ukr.net> #
# #
# This file is part of tui-rsync. #
# #
# tui-rsync is free software: you can redistribute it and/or modify it under #
# uthe terms of the GNU General Public License as published by the Free #
# Software Foundation, either version 3 of the License, or (at your option) #
# any later version. #
# #
# tui-rsync is distributed in the hope that it will be useful, but WITHOUT ANY #
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS #
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more #
# details. #
# #
# You should have received a copy of the GNU General Public License along with #
# tui-rsync. If not, see <https://www.gnu.org/licenses/>. #
################################################################################
from rich.console import Console
from rich.prompt import Prompt
from typing import List, Optional
import typer
from tui_rsync.models.models import Source, Destination, SyncCommand, Path
from tui_rsync.cli.label_prompt import LabelPrompt
from tui_rsync.models.models import all_labels
console = Console()
source_show = typer.Typer()
@source_show.command()
def one(
label: str = typer.Option(
None, "--label", "-l",
help="[b]The label[/] is a uniq identification of a [b]source[/].",
show_default=False
),
):
"""
[green b]Show[/] an [yellow]existing source[/].
"""
if label is None:
console.print("What is the [yellow b]label of source[/]? ")
label = LabelPrompt.get_label_fzf()
if not Source.is_exist(label):
console.print("[red b][ERROR][/] Source does not exists!!!")
return
source = Source.get_source(label)
console.print(source.show_format())
@source_show.command()
def all():
"""
[green b]Show[/] [yellow]all existing sources[/].
"""
for label in all_labels().iterator():
source = Source.get_source(label.label)
console.print(source.show_format())

@ -1,111 +0,0 @@
################################################################################
# Copyright (C) 2023 Kostiantyn Klochko <kostya_klochko@ukr.net> #
# #
# This file is part of tui-rsync. #
# #
# tui-rsync is free software: you can redistribute it and/or modify it under #
# uthe terms of the GNU General Public License as published by the Free #
# Software Foundation, either version 3 of the License, or (at your option) #
# any later version. #
# #
# tui-rsync is distributed in the hope that it will be useful, but WITHOUT ANY #
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS #
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more #
# details. #
# #
# You should have received a copy of the GNU General Public License along with #
# tui-rsync. If not, see <https://www.gnu.org/licenses/>. #
################################################################################
from rich.console import Console
from rich.prompt import Prompt
from typing import List, Optional
import typer
from tui_rsync.models.models import Source, Destination, SyncCommand, Path
from tui_rsync.cli.label_prompt import LabelPrompt
console = Console()
source_update = typer.Typer()
@source_update.command()
def label(
label: str = typer.Option(
None, "--label", "-l",
help="[b]The label[/] is a uniq identification of a [b]source[/].",
show_default=False
),
new_label: str = typer.Option(
None, "--new-label", "-nl",
help="[b]The new label[/] will replace the [b]old source label[/].",
show_default=False
),
):
"""
[green b]Update[/] an [yellow]existing source label[/].
"""
if label is None:
console.print("What is the [yellow b]old label of source[/]? ")
label = LabelPrompt.get_label_fzf()
if new_label is None:
question = "What is the [yellow b]new label of the source[/]? "
new_label = LabelPrompt.ask_uuid(question)
if Source.is_exist(label):
src = Source.get_source(label)
src.update_label(new_label)
@source_update.command()
def source(
label: str = typer.Option(
None, "--label", "-l",
help="[b]The label[/] is a uniq identification of a [b]source[/].",
show_default=False
),
new_source_path: str = typer.Option(
None, "--new-label", "-nl",
help="[b]The new source[/] will replace the [b]old source[/].",
show_default=False
),
):
"""
[green b]Update[/] a source path of an [yellow]existing source[/].
"""
if label is None:
console.print("What is the [yellow b]label of source[/]? ")
label = LabelPrompt.get_label_fzf()
if new_source_path is None:
question = "What is the [yellow b]new source path of the source[/]? "
new_source_path = console.input(question)
if Source.is_exist(label):
src = Source.get_source(label)
src.update_source_path(new_source_path)
@source_update.command()
def args(
label: str = typer.Option(
None, "--label", "-l",
help="[b]The label[/] is a uniq identification of a [b]source[/].",
show_default=False
),
args: str = typer.Option(
None, "--args", "-a",
help="[b yellow]rsync[/] [b]arguments[/].",
show_default=False
)
):
"""
[green b]Update[/] an [yellow]existing source args[/].
"""
if args is None:
args = console.input("What is the [yellow b]rsync args of source[/]? ")
if Source.is_exist(label):
src = Source.get_source(label)
src.update_args(args)

@ -1,115 +0,0 @@
################################################################################
# Copyright (C) 2023 Kostiantyn Klochko <kostya_klochko@ukr.net> #
# #
# This file is part of tui-rsync. #
# #
# tui-rsync is free software: you can redistribute it and/or modify it under #
# uthe terms of the GNU General Public License as published by the Free #
# Software Foundation, either version 3 of the License, or (at your option) #
# any later version. #
# #
# tui-rsync is distributed in the hope that it will be useful, but WITHOUT ANY #
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS #
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more #
# details. #
# #
# You should have received a copy of the GNU General Public License along with #
# tui-rsync. If not, see <https://www.gnu.org/licenses/>. #
################################################################################
from rich.console import Console
from rich.prompt import Prompt
from typing import List, Optional
import typer
from tui_rsync.models.models import Source, Group, Destination, SyncCommand
from tui_rsync.models.models import Path
from tui_rsync.models.models import all_labels
from tui_rsync.cli.label_prompt import LabelPrompt
from tui_rsync.cli.groups.group_prompt import GroupPrompt
from tui_rsync.cli.path_prompt import PathPrompt
from tui_rsync.cli.rsync import Rsync
console = Console()
sync = typer.Typer()
skip_error = "[yellow b]Skippped[/] because the [red b]path was unavailable[/]."
@sync.command()
def one(
label: str = typer.Option(
None, "--label", "-l",
help="[b]The label[/] is a uniq identification of a [b]source[/].",
show_default=False
),
dry: bool = typer.Option(
False, "-d", "--dry-run",
help="The command will [b]show[/] information about what will be changed.",
)
):
"""
[green b]Sync[/] a [yellow]source[/] with the [yellow b]label[/] and its backups.
[yellow b]Skips[/] if not available.
"""
if label is None:
label = LabelPrompt.get_label_fzf()
src = Source.get_source(label)
rsync = Rsync(str(src.args))
for dest in src.destinations:
if not dest.path.is_exists():
console.print(skip_error)
continue
if dry:
response = rsync.dry_one(str(src.source), str(dest))
out, err = response
console.print(f"{bstr_nonan(out)} {bstr_nonan(err)}")
else:
rsync.run_one(str(src.source), str(dest))
@sync.command()
def group(
group_label: str = typer.Option(
None, "--group-label", "-g",
help="[b]The label[/] is a uniq identification of a [b]group[/].",
show_default=False
),
):
"""
[green b]Sync[/] a [yellow]group[/] with the [yellow b] label[/].
"""
if group_label is None:
group_label = GroupPrompt.get_label_fzf()
group = Group.get_group(group_label)
for src in group.get_sources():
one(src.label)
@sync.command()
def all():
"""
[green b]Sync[/] [yellow]all sources[/] with theirs backups.
"""
for label in all_labels().iterator():
one(label.label)
@sync.command()
def recover(
label: str = typer.Option(
None, "--label", "-l",
help="[b]The label[/] is a uniq identification of a [b]source[/].",
show_default=False
),
):
"""
[green b]Sync[/] the [yellow]chosen backup[/] with its source.
"""
if label is None:
label = LabelPrompt.get_label_fzf()
src = Source.get(Source.label == label)
dest = PathPrompt.get_backup_fzf(label)
rsync = Rsync(str(src.args))
rsync.run_one(str(dest), str(src.source))
def bstr_nonan(obj):
return "" if obj is None else obj.decode()

@ -1,2 +0,0 @@
from tui_rsync.config.app import App

@ -1,52 +0,0 @@
################################################################################
# Copyright (C) 2023 Kostiantyn Klochko <kostya_klochko@ukr.net> #
# #
# This file is part of tui-rsync. #
# #
# tui-rsync is free software: you can redistribute it and/or modify it under #
# uthe terms of the GNU General Public License as published by the Free #
# Software Foundation, either version 3 of the License, or (at your option) #
# any later version. #
# #
# tui-rsync is distributed in the hope that it will be useful, but WITHOUT ANY #
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS #
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more #
# details. #
# #
# You should have received a copy of the GNU General Public License along with #
# tui-rsync. If not, see <https://www.gnu.org/licenses/>. #
################################################################################
import platformdirs
import os
class App:
"""
Configuration of the tui-rsync
"""
__APP_NAME = "tui-rsync"
__APP_AUTHOR = "KKlochko"
__DB_NAME = "sync.db"
def get_db_path(self):
path = platformdirs.user_data_path(
self.__APP_NAME,
self.__APP_AUTHOR,
self.__DB_NAME
)
App.safe_create_path(self.get_data_dir())
return path
def get_data_dir(self):
return platformdirs.user_data_dir(
self.__APP_NAME,
self.__APP_AUTHOR,
)
@staticmethod
def safe_create_path(path):
"""
Create path's folders if they do not exist
"""
if not os.path.exists(path):
os.makedirs(path)

@ -0,0 +1,4 @@
from .entities import BackupPlan
from .value_objects import BackupPlanId, Path, Source, Destination
__all__ = ['BackupPlan', 'BackupPlanId', 'Path', 'Source', 'Destination']

@ -0,0 +1,3 @@
from .backup_plan import BackupPlan
__all__ = ['BackupPlan']

@ -0,0 +1,14 @@
from dataclasses import dataclass, field
from typing import List
from ..value_objects import BackupPlanId, Source, Destination
from tui_rsync.core.shared_kernel.components.common.domain import Label
@dataclass
class BackupPlan:
source: Source
destinations: List[Destination]
id: BackupPlanId = field(default_factory=lambda: BackupPlanId.generate())
label: Label = field(default='')

@ -0,0 +1,6 @@
from .backup_plan_id import BackupPlanId
from .path import Path
from .source import Source
from .destionation import Destination
__all__ = ['BackupPlanId', 'Path', 'Source', 'Destination']

@ -0,0 +1,5 @@
from tui_rsync.core.shared_kernel.components.common.domain import UUID
class BackupPlanId(UUID):
pass

@ -0,0 +1,5 @@
from .path import Path
class Destination(Path):
pass

@ -0,0 +1,6 @@
from dataclasses import dataclass
@dataclass
class Path:
path: str

@ -0,0 +1,5 @@
from .path import Path
class Source(Path):
pass

@ -0,0 +1,3 @@
from .domain import UUID, Label
__all__ = ['UUID', 'Label']

@ -0,0 +1,3 @@
from .value_objects import UUID, Label
__all__ = ['UUID', 'Label']

@ -0,0 +1,4 @@
from .uuid import UUID
from .label import Label
__all__ = ['UUID', 'Label']

@ -0,0 +1,6 @@
from dataclasses import dataclass, field
@dataclass(frozen=True)
class Label:
label: str = field(default='')

@ -0,0 +1,14 @@
from dataclasses import dataclass, field
import uuid
@dataclass(frozen=True)
class UUID:
id: str = field(default_factory=lambda: UUID.generate().id)
@classmethod
def generate(cls) -> 'UUID':
return cls(str(uuid.uuid4()))
def __str__(self):
return self.id

@ -1,9 +1,7 @@
from .models.models import create_tables
from .cli.cli import cli_app
def main(): def main():
create_tables() pass
cli_app()
if __name__ == "__main__": if __name__ == "__main__":
main() main()

@ -1,5 +0,0 @@
from tui_rsync.models.models import Source
from tui_rsync.models.models import Path
from tui_rsync.models.models import Destination
from tui_rsync.models.models import SyncCommand
from tui_rsync.models.models import create_tables

@ -1,257 +0,0 @@
################################################################################
# Copyright (C) 2023 Kostiantyn Klochko <kostya_klochko@ukr.net> #
# #
# This file is part of tui-rsync. #
# #
# tui-rsync is free software: you can redistribute it and/or modify it under #
# uthe terms of the GNU General Public License as published by the Free #
# Software Foundation, either version 3 of the License, or (at your option) #
# any later version. #
# #
# tui-rsync is distributed in the hope that it will be useful, but WITHOUT ANY #
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS #
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more #
# details. #
# #
# You should have received a copy of the GNU General Public License along with #
# tui-rsync. If not, see <https://www.gnu.org/licenses/>. #
################################################################################
from peewee import *
from tui_rsync.config.app import App
import os
app = App()
db = SqliteDatabase(app.get_db_path())
class BaseModel(Model):
class Meta:
database = db
class Path(BaseModel):
path = CharField(unique=True)
def __str__(self) -> str:
return f"{self.path}"
def __repr__(self) -> str:
return f"Path({self.path})"
def is_exists(self) -> bool:
return os.path.exists(self.path)
class SyncCommand(BaseModel):
command = CharField()
@staticmethod
def get_sync_command(args):
sync_cmd, _ = SyncCommand.get_or_create(command=args)
return sync_cmd
def __str__(self) -> str:
return self.command
class Source(BaseModel):
label = CharField(unique=True)
source = ForeignKeyField(Path)
args = ForeignKeyField(SyncCommand)
@staticmethod
def is_exist(label) -> bool:
return Source.select().where(Source.label == label).exists()
@staticmethod
def get_source(label):
if not Source.is_exist(label):
return None
return Source.select().where(Source.label == label).get()
@staticmethod
def create_save(label:str, source:str, destinations:list[str], args:str):
src_path, _ = Path.get_or_create(path=source)
src_sync_cmd, _ = SyncCommand.get_or_create(command=args)
src = Source.create(
label=label,
source=src_path,
destinations=[],
args=src_sync_cmd
)
for destination in destinations:
src.add_destionation(destination)
src.save()
return src
def add_destionation(self, destination):
destination_path, _ = Path.get_or_create(path=destination)
src_destination, _ = Destination.get_or_create(
source=self,
path=destination_path
)
def get_destinations(self):
destionations = []
for dest in self.destinations:
destionations.append(dest.path)
return destionations
def update_label(self, new_label):
self.label = new_label
self.save()
def update_source_path(self, new_path):
new_source_path, _ = Path.get_or_create(path=new_path)
self.source = new_source_path
self.save()
def update_args(self, args):
args_obj = SyncCommand.get_sync_command(args)
self.args = args_obj
self.save()
def __str__(self) -> str:
return f"{self.label}"
def __repr__(self) -> str:
return f"{self.label}"
def show_format(self, prefix='') -> str:
output = f"[b]label:[/] {self.label}\n" \
f"[b]source:[/] {self.source.path}\n" \
f"[b]args:[/] {self.args}\n" \
f"[b]destionations:[/] \n"
for destination in self.destinations:
output+=f"\t{destination.path}\n"
if prefix != '':
keepends = True
output = prefix + f'{prefix}'.join(output.splitlines(keepends))
return output
class Destination(BaseModel):
source = ForeignKeyField(Source, backref='destinations')
path = ForeignKeyField(Path)
def __str__(self) -> str:
return f"{self.path}"
@staticmethod
def get_all(label:str|None = None):
"""
Return all destiantions of the source.
"""
if label is None:
return []
src = Source.get_source(label)
if src is None:
return []
return src.destinations
class Group(BaseModel):
label = CharField(unique=True)
@staticmethod
def is_exist(label) -> bool:
return Group.select().where(Group.label == label).exists()
@staticmethod
def get_group(label):
if not Group.is_exist(label):
return None
return Group.select().where(Group.label == label).get()
@staticmethod
def create_save(label:str, source_labels:list[str]):
group = Group.create(
label=label,
sources=[],
)
GroupSource.create_group_sources(group, source_labels)
group.save()
return group
def update_label(self, new_label):
self.label = new_label
self.save()
def get_sources(self):
"""
Return iterator of the group sources.
"""
return (group_src.source for group_src in self.sources)
def remove_sources(self):
GroupSource.delete().where(GroupSource.group == self.label).execute()
def __str__(self) -> str:
return f"{self.label}"
def __repl__(self) -> str:
return f"{self.label}"
def show_format(self) -> str:
output = f"[b]label:[/] {self.label}\n" \
f"[b]sources:[/] \n"
for source in self.sources:
prefix = '\t'
output+=f"{source.source.show_format(prefix)}\n"
return output
class GroupSource(BaseModel):
group = ForeignKeyField(Group, backref='sources')
source = ForeignKeyField(Source)
@staticmethod
def create_group_source(group:Group, source_label:str) -> bool:
"""
Return group-source created status
"""
src = Source.get(label=source_label)
group_src, created = GroupSource.get_or_create(group=group, source=src)
group_src.save()
return created
@staticmethod
def create_group_sources(group:Group, source_labels:list[str]):
for source_label in source_labels:
GroupSource.create_group_source(group, source_label)
def create_tables():
with db:
tables = [
Source,
Path,
Destination,
SyncCommand,
Group,
GroupSource
]
db.create_tables(tables, safe=True)
def all_group_labels():
with db:
return Group.select(Group.label)
def all_labels():
with db:
return Source.select(Source.label)
def all_labels_except(labels):
if labels == None:
return all_labels()
if len(labels) == 0:
return all_labels()
with db:
return Source.select(Source.label).where(Source.label.not_in(labels))
def count_all_labels_except(labels):
return len(all_labels_except(labels))
Loading…
Cancel
Save