defmodule WorkloadService.Consumers.QuoteRequestedConsumer do use GenServer require Logger alias WorkloadService.CommandedApp alias WorkloadService.Commands.QuoteTask @exchange "workload_service.events.quote_requested" @queue "workload_service.quote_requested" @routing_key "quote.requested" def start_link(opts \\ []) do GenServer.start_link(__MODULE__, opts, name: __MODULE__) end def init(_opts) do {:ok, conn} = AMQP.Connection.open(amqp_url()) {:ok, channel} = AMQP.Channel.open(conn) AMQP.Queue.declare(channel, @queue, durable: true) AMQP.Queue.bind(channel, @queue, @exchange, routing_key: @routing_key) {:ok, _tag} = AMQP.Basic.consume(channel, @queue) Logger.info("QuoteRequestedConsumer started, listening on #{@queue}") {:ok, %{channel: channel}} end def handle_info({:basic_consume_ok, _}, state), do: {:noreply, state} def handle_info({:basic_cancel, _}, state), do: {:stop, :normal, state} def handle_info({:basic_cancel_ok, _}, state), do: {:noreply, state} def handle_info({:basic_deliver, payload, meta}, state) do case process(payload) do :ok -> AMQP.Basic.ack(state.channel, meta.delivery_tag) {:error, reason} -> Logger.error("QuoteRequestedConsumer: failed to process: #{inspect(reason)}") AMQP.Basic.reject(state.channel, meta.delivery_tag, requeue: false) end {:noreply, state} end defp process(payload) do with {:ok, event} <- Jason.decode(payload), :ok <- handle_event(event) do :ok end end defp handle_event(%{ "application_id" => application_id, "org_id" => org_id, "provider_id" => provider_id, "policy_type" => policy_type } = event) do task_id = WorkloadService.Aggregates.TaskId.new(org_id, "quote", Ecto.UUID.generate()) command = %QuoteTask.CreateTask{ id: task_id, application_id: application_id, provider_id: provider_id, provider_name: Map.get(event, "provider_name", ""), task_info: %{ "policy_type" => policy_type, "provider_email" => Map.get(event, "provider_email"), "applicant_info" => Map.get(event, "applicant_info", %{}), "car_details" => Map.get(event, "car_details", %{}), "building_details" => Map.get(event, "building_details", %{}), "life_details" => Map.get(event, "life_details", %{}) } } case CommandedApp.dispatch(command) do :ok -> Logger.info("QuoteRequestedConsumer: created task #{task_id}") :ok {:error, reason} -> {:error, reason} end end defp handle_event(event) do {:error, {:invalid_event, event}} end defp amqp_url do Application.get_env(:workload_service, :amqp_url, "amqp://guest:guest@localhost:5672") end end