diff --git a/lib/workload_service/consumers/quote_requested_consumer.ex b/lib/workload_service/consumers/quote_requested_consumer.ex index d017836..535e359 100644 --- a/lib/workload_service/consumers/quote_requested_consumer.ex +++ b/lib/workload_service/consumers/quote_requested_consumer.ex @@ -50,14 +50,14 @@ defmodule WorkloadService.Consumers.QuoteRequestedConsumer do end end -defp handle_event( - %{ - "id" => %{"org_id" => org_id, "application_id" => app_id, "policy_type" => policy_type} = application_id, - "provider_id" => provider_id, - "policy_details" => policy_details, - "applicant_info" => applicant_info - } = event - ) do + defp handle_event( + %{ + "id" => %{"org_id" => org_id, "application_id" => app_id, "policy_type" => policy_type}, + "provider_id" => provider_id, + "policy_details" => policy_details, + "applicant_info" => applicant_info + } = event + ) do task_id = WorkloadService.Aggregates.TaskId.new(org_id, "quote", Ecto.UUID.generate()) app_id_struct = WorkloadService.Aggregates.ApplicationId.new(org_id, app_id, policy_type) diff --git a/lib/workload_service/handlers/task_completed_handler.ex b/lib/workload_service/handlers/task_completed_handler.ex new file mode 100644 index 0000000..af1f501 --- /dev/null +++ b/lib/workload_service/handlers/task_completed_handler.ex @@ -0,0 +1,51 @@ +defmodule WorkloadService.Handlers.TaskCompletedHandler do + use Commanded.Event.Handler, + application: WorkloadService.CommandedApp, + name: "TaskCompletedHandler" + + require Logger + + alias WorkloadService.Events.TaskCompleted + alias Commanded.Aggregates.Aggregate + alias WorkloadService.MessageBus + + alias WorkloadService.Aggregates.{ + QuoteTask, + SolicitationTask + } + + def handle(%TaskCompleted{} = event, _metadata) do + aggregate_module = + case event.id.type do + "quote" -> QuoteTask + # "solicitation" -> SolicitationTask + _ -> nil + end + + if aggregate_module do + case Aggregate.aggregate_state( + WorkloadService.CommandedApp, + aggregate_module, + event.id + ) do + nil -> + Logger.warning("TaskCompletedHandler: aggregate not found for #{event.id}") + + state -> + MessageBus.publish( + "workload_service.events.quote_task_completed", + "quote_task_completed", + state + ) + + Logger.info("TaskCompletedHandler: published for #{event.id}") + end + end + + :ok + rescue + e -> + Logger.error("TaskCompletedHandler: failed to process - #{inspect(e)}") + :ok + end +end diff --git a/lib/workload_service/message_bus.ex b/lib/workload_service/message_bus.ex index 82772c0..382fa46 100644 --- a/lib/workload_service/message_bus.ex +++ b/lib/workload_service/message_bus.ex @@ -1,86 +1,21 @@ defmodule WorkloadService.MessageBus do - @moduledoc false + use AMQP - use GenServer - require Logger + def publish(exchange, routing_key, event) do + payload = Jason.encode!(event) - alias AMQP.{Connection, Channel, Exchange} - - @reconnect_interval 5_000 - - def start_link(opts \\ []) do - GenServer.start_link(__MODULE__, opts, name: __MODULE__) + :ok = + AMQP.Basic.publish(channel(), exchange, routing_key, payload, + content_type: "application/json", + persistent: true + ) end - def publish(routing_key, payload) when is_binary(payload) do - GenServer.cast(__MODULE__, {:publish, routing_key, payload}) + defp channel do + {:ok, conn} = AMQP.Connection.open(amqp_url()) + {:ok, chan} = AMQP.Channel.open(conn) + chan end - def publish(routing_key, payload) do - GenServer.cast(__MODULE__, {:publish, routing_key, Jason.encode!(payload)}) - end - - @impl true - def init(_opts) do - send(self(), :connect) - {:ok, %{channel: nil, connection: nil}} - end - - @impl true - def handle_info(:connect, _state) do - case connect() do - {:ok, connection, channel} -> - {:noreply, %{connection: connection, channel: channel}} - - {:error, reason} -> - Logger.error("Failed to connect to RabbitMQ: #{inspect(reason)}") - schedule_reconnect() - {:noreply, %{channel: nil, connection: nil}} - end - end - - @impl true - def handle_info({:DOWN, _, :process, _pid, reason}, _state) do - Logger.error("RabbitMQ connection lost: #{inspect(reason)}") - schedule_reconnect() - {:noreply, %{channel: nil, connection: nil}} - end - - @impl true - def handle_cast({:publish, _routing_key, _payload}, %{channel: nil} = state) do - Logger.warning("RabbitMQ not connected, message dropped") - {:noreply, state} - end - - @impl true - def handle_cast({:publish, routing_key, payload}, %{channel: channel} = state) do - Exchange.direct(channel, exchange_name(), durable: true) - - AMQP.Basic.publish(channel, exchange_name(), routing_key, payload, - content_type: "application/json", - persistent: true - ) - - {:noreply, state} - end - - defp connect do - config = Application.get_env(:workload_service, WorkloadService.MessageBus, []) - - with {:ok, connection} <- Connection.open(config), - {:ok, channel} <- Channel.open(connection) do - Exchange.declare(channel, exchange_name(), :direct, durable: true) - Process.monitor(connection.pid) - {:ok, connection, channel} - end - end - - defp exchange_name do - Application.get_env(:workload_service, WorkloadService.MessageBus, []) - |> Keyword.get(:exchange, "workload_service.events") - end - - defp schedule_reconnect do - Process.send_after(self(), :connect, @reconnect_interval) - end -end + defp amqp_url, do: Application.fetch_env!(:workload_service, :amqp_url) +end \ No newline at end of file diff --git a/lib/workload_service/projectors/task_projector.ex b/lib/workload_service/projectors/task_projector.ex index 12304fa..e0a925a 100644 --- a/lib/workload_service/projectors/task_projector.ex +++ b/lib/workload_service/projectors/task_projector.ex @@ -10,8 +10,6 @@ defmodule WorkloadService.Projectors.TaskProjector do import Ecto.Query project(%Events.TaskCreated{} = e, _meta, fn multi -> - application_id_str = to_string(e.application_id) - Ecto.Multi.insert(multi, :task, %Task{ id: to_string(e.id), org_id: e.id.org_id, diff --git a/ops/chart/values.yaml b/ops/chart/values.yaml index 0e446ef..f390793 100644 --- a/ops/chart/values.yaml +++ b/ops/chart/values.yaml @@ -140,14 +140,14 @@ rawResources: configure: ".*" read: ".*" - exchange-task-completed: + exchange-quote-task-completed: enabled: true apiVersion: rabbitmq.com/v1beta1 kind: Exchange - suffix: exchange-task-completed + suffix: exchange-quote-task-completed spec: spec: - name: workload_service.events.task_completed + name: workload_service.events.quote_task_completed type: topic durable: true vhost: "application"