defmodule PolicyService.Consumers.PolicyIssuedConsumer do use GenServer require Logger alias PolicyService.CommandedApp alias PolicyService.Commands.CarPolicy alias PolicyService.Aggregates.PolicyId @exchange "carrier_inbox.events" @queue "policy_service.policy_issued" @routing_key "policy.issued" def start_link(_opts), do: GenServer.start_link(__MODULE__, [], name: __MODULE__) def init(_) do {:ok, conn} = AMQP.Connection.open(amqp_url()) {:ok, channel} = AMQP.Channel.open(conn) AMQP.Exchange.topic(channel, @exchange, durable: true) AMQP.Queue.declare(channel, @queue, durable: true) AMQP.Queue.bind(channel, @queue, @exchange, routing_key: @routing_key) AMQP.Basic.qos(channel, prefetch_count: 10) {:ok, _tag} = AMQP.Basic.consume(channel, @queue) {:ok, %{channel: channel}} end def handle_info({:basic_consume_ok, _}, state), do: {:noreply, state} def handle_info({:basic_cancel, _}, state), do: {:stop, :normal, state} def handle_info({:basic_cancel_ok, _}, state), do: {:noreply, state} def handle_info({:basic_deliver, payload, meta}, state) do case Jason.decode(payload) do {:ok, event} -> process(event, meta, state) {:error, _} -> Logger.error("PolicyIssuedConsumer: failed to decode payload") AMQP.Basic.reject(state.channel, meta.delivery_tag, requeue: false) end {:noreply, state} end defp process(event, meta, state) do %{policy_type: policy_type} = PolicyId.parse!(event["id"]) command = case policy_type do "car" -> %CarPolicy.RecordPolicyIssued{ id: event["id"], policy_number: event["policy_number"], effective_date: event["effective_date"], expiry_date: event["expiry_date"], issued_at: DateTime.utc_now() } end case CommandedApp.dispatch(command) do :ok -> AMQP.Basic.ack(state.channel, meta.delivery_tag) {:error, reason} -> Logger.error("PolicyIssuedConsumer: dispatch failed: #{inspect(reason)}") AMQP.Basic.reject(state.channel, meta.delivery_tag, requeue: true) end end defp amqp_url do Application.get_env(:policy_service, :amqp_url, "amqp://guest:guest@localhost:5672") end end