defmodule PolicyService.Consumers.QuoteReceivedConsumer do
  @moduledoc """
  AMQP consumer that turns `quote.received` events into policy commands.

  On start the process opens its own connection and channel, declares the
  durable topic exchange and the durable queue named in the module attributes,
  binds them with the `quote.received` routing key and subscribes itself as the
  consumer with manual acknowledgements.

  Each delivery is JSON-decoded, translated into a
  `PolicyService.Commands.CarPolicy.RecordProviderQuote` command and dispatched
  through `PolicyService.CommandedApp` with `consistency: :strong`. Deliveries
  that are processed successfully are acked; deliveries that cannot be
  processed are logged and nacked with `requeue: false`.
  """

  use GenServer

  require Logger

  alias PolicyService.Aggregates.PolicyId
  alias PolicyService.CommandedApp
  alias PolicyService.Commands.CarPolicy

  @exchange "carrier_inbox.events"
  @queue "policy_service.quote_received"
  @routing_key "quote.received"

  @doc """
  Starts the consumer and registers it under the module name.

  `opts` is handed to `init/1`, which currently ignores it.
  """
  @spec start_link(term()) :: GenServer.on_start()
  def start_link(opts \\ []) do
    GenServer.start_link(__MODULE__, opts, name: __MODULE__)
  end

  @impl true
  def init(_opts) do
    amqp_url = Application.fetch_env!(:policy_service, :amqp_url)
    {:ok, conn} = AMQP.Connection.open(amqp_url)

    case subscribe(conn) do
      {:ok, channel} ->
        Logger.info("QuoteReceivedConsumer started, listening on #{@queue}")
        {:ok, %{conn: conn, channel: channel}}

      {:error, reason} ->
        # terminate/2 is not invoked when init/1 fails, so release the
        # connection here instead of leaking one per restart attempt.
        close_connection(conn)
        {:stop, {:amqp_setup_failed, reason}}
    end
  end

  # Closes the AMQP connection whenever the consumer stops (normal stop or a
  # crash inside a callback) so restarts do not accumulate open connections.
  @impl true
  def terminate(_reason, %{conn: conn}) do
    close_connection(conn)
  end

  # ---------------------------------------------------------------------------
  # AMQP callbacks
  # ---------------------------------------------------------------------------

  @impl true
  def handle_info({:basic_consume_ok, _}, state), do: {:noreply, state}

  def handle_info({:basic_cancel, _}, state), do: {:stop, :normal, state}

  def handle_info({:basic_cancel_ok, _}, state), do: {:noreply, state}

  def handle_info({:basic_deliver, payload, %{delivery_tag: tag}}, state) do
    case process(payload) do
      :ok ->
        AMQP.Basic.ack(state.channel, tag)

      {:error, reason} ->
        Logger.error("Failed to process quote.received: #{inspect(reason)}")
        AMQP.Basic.nack(state.channel, tag, requeue: false)
    end

    {:noreply, state}
  end

  # Any other message is logged and dropped rather than crashing the consumer
  # with a FunctionClauseError.
  def handle_info(message, state) do
    Logger.warning("QuoteReceivedConsumer ignoring unexpected message: #{inspect(message)}")
    {:noreply, state}
  end

  # ---------------------------------------------------------------------------
  # Setup helpers
  # ---------------------------------------------------------------------------

  # Opens a channel, declares the exchange/queue/binding and registers the
  # calling process as consumer. Returns `{:ok, channel}` or `{:error, reason}`
  # for the first step that fails, instead of silently ignoring the failure.
  defp subscribe(conn) do
    with {:ok, channel} <- AMQP.Channel.open(conn),
         :ok <- AMQP.Exchange.declare(channel, @exchange, :topic, durable: true),
         {:ok, _queue_info} <- AMQP.Queue.declare(channel, @queue, durable: true),
         :ok <- AMQP.Queue.bind(channel, @queue, @exchange, routing_key: @routing_key),
         {:ok, _consumer_tag} <- AMQP.Basic.consume(channel, @queue, nil, no_ack: false) do
      {:ok, channel}
    else
      {:error, reason} -> {:error, reason}
      other -> {:error, other}
    end
  end

  # Best-effort close: the connection process may already be gone by the time
  # we get here, in which case the call exits and there is nothing left to do.
  defp close_connection(conn) do
    AMQP.Connection.close(conn)
  catch
    :exit, _reason -> :ok
  end

  # ---------------------------------------------------------------------------
  # Processing
  # ---------------------------------------------------------------------------

  # Decodes the raw payload, builds the command and dispatches it. Returns `:ok`
  # or the first non-matching result (normally `{:error, reason}`).
  defp process(payload) do
    with {:ok, event} <- Jason.decode(payload),
         {:ok, cmd} <- build_command(event),
         :ok <- CommandedApp.dispatch(cmd, consistency: :strong) do
      :ok
    end
  end

  # Selects the command builder from the event's "policy_type" field.
  defp build_command(%{"policy_type" => "car"} = event), do: build_car_command(event)

  defp build_command(%{} = event) do
    {:error, {:unsupported_policy_type, event["policy_type"]}}
  end

  # Valid JSON that is not an object (list, number, string, ...) cannot be
  # indexed by key; reject it so the delivery is nacked instead of crashing
  # the consumer.
  defp build_command(_not_a_map), do: {:error, :payload_not_an_object}

  # Builds the RecordProviderQuote command for a car policy event. The policy
  # id is parsed once; an id whose policy type is not "car" is rejected with an
  # explicit error. Any exception raised while parsing untrusted event data is
  # converted into `{:error, exception}`.
  defp build_car_command(event) do
    policy_id = PolicyId.parse!(event["id"])

    case policy_id do
      %{policy_type: "car"} ->
        cmd = %CarPolicy.RecordProviderQuote{
          id: policy_id,
          recorded_by: event["entered_by"],
          provider_id: event["provider_id"],
          quote_id: event["quote_id"],
          valid_until: parse_date(event["valid_until"]),
          plans: parse_plans(event["plans"])
        }

        {:ok, cmd}

      %{policy_type: other} ->
        {:error, {:policy_type_mismatch, other}}
    end
  rescue
    e -> {:error, e}
  end

  # Converts the decoded "plans" list into maps with atom keys; a missing list
  # becomes an empty one.
  defp parse_plans(nil), do: []

  defp parse_plans(plans) when is_list(plans) do
    Enum.map(plans, fn p ->
      %{
        plan_id: p["plan_id"],
        name: p["name"],
        premium: p["premium"],
        coverage_details: p["coverage_details"],
        deductible: p["deductible"],
        coverage_limit: p["coverage_limit"]
      }
    end)
  end

  # Accepts nil, a Date, or an ISO 8601 date string. Strings that do not parse
  # as a date yield nil rather than an error.
  defp parse_date(nil), do: nil
  defp parse_date(%Date{} = d), do: d

  defp parse_date(s) when is_binary(s) do
    case Date.from_iso8601(s) do
      {:ok, d} -> d
      _ -> nil
    end
  end
end