From 44213069c97b116e79634a14327e08c6ae619759 Mon Sep 17 00:00:00 2001 From: KKlochko Date: Sun, 27 Apr 2025 20:19:06 +0300 Subject: [PATCH] Add the SyncWorkerManual to run sync manually and notify when done. --- .../sync/sync_workers/sync_worker_manual.ex | 31 +++++++++++++++++++ .../live/dbi_server_live/index.ex | 30 +++++++++++------- 2 files changed, 49 insertions(+), 12 deletions(-) create mode 100644 lib/decentralised_book_index/sync/sync_workers/sync_worker_manual.ex diff --git a/lib/decentralised_book_index/sync/sync_workers/sync_worker_manual.ex b/lib/decentralised_book_index/sync/sync_workers/sync_worker_manual.ex new file mode 100644 index 0000000..cd927da --- /dev/null +++ b/lib/decentralised_book_index/sync/sync_workers/sync_worker_manual.ex @@ -0,0 +1,31 @@ +defmodule DecentralisedBookIndex.SyncWorkerManual do + use Oban.Worker, + queue: :default, + max_attempts: 2, + tags: ["sync", "manual"] + + require Logger + + alias DecentralisedBookIndex.SyncTasks.SyncServerTask + + @topic "task:events" + + @impl Oban.Worker + def perform(%Oban.Job{args: args} = job) do + Logger.info("The manual job (#{job.id}) run SyncServerTask at #{DateTime.utc_now()} with args: #{inspect(args)}") + + result = SyncServerTask.sync_all() + + Phoenix.PubSub.broadcast!( + DecentralisedBookIndex.PubSub, + @topic, + {:manual_sync_completed, result} + ) + + Logger.info("The manual job (#{job.id}) is done.") + + result + end + + def topic, do: @topic +end diff --git a/lib/decentralised_book_index_web/live/dbi_server_live/index.ex b/lib/decentralised_book_index_web/live/dbi_server_live/index.ex index 41e5ea9..de70e13 100644 --- a/lib/decentralised_book_index_web/live/dbi_server_live/index.ex +++ b/lib/decentralised_book_index_web/live/dbi_server_live/index.ex @@ -89,6 +89,10 @@ defmodule DecentralisedBookIndexWeb.DbiServerLive.Index do @impl true def mount(_params, _session, socket) do + if connected?(socket) do + Phoenix.PubSub.subscribe(DecentralisedBookIndex.PubSub, DecentralisedBookIndex.SyncWorkerManual.topic()) + end + {:ok, socket |> assign_new(:current_user, fn -> nil end)} @@ -198,23 +202,25 @@ defmodule DecentralisedBookIndexWeb.DbiServerLive.Index do @impl true def handle_event("sync", _params, socket) do - Task.async(fn -> - SyncServerTask.sync_all() - send(self(), :sync_completed) - end) - - socket = - socket - |> put_flash(:info, "The sync is started") + case DecentralisedBookIndex.SyncWorkerManual.new(%{}) |> Oban.insert() do + {:ok, _job} -> + {:noreply, put_flash(socket, :info, "Task started")} - {:noreply, socket} + {:error, reason} -> + {:noreply, put_flash(socket, :error, "Failed to start task: #{inspect(reason)}")} + end end @impl true - def handle_info({_pid, :sync_completed}, socket) do + def handle_info({:manual_sync_completed, result}, socket) do socket = - socket - |> put_flash(:info, "The sync is done") + if result == :ok do + socket + |> put_flash(:info, "Task completed") + else + socket + |> put_flash(:error, "Task failed") + end {:noreply, socket} end