wip
Some checks are pending
Build and Publish / build-release (push) Waiting to run

This commit is contained in:
2026-04-13 15:30:31 -05:00
parent a52f049a29
commit 5037bc3632
44 changed files with 2210 additions and 676 deletions

View File

@@ -0,0 +1,73 @@
defmodule PolicyService.Consumers.PolicyIssuedConsumer do
use GenServer
require Logger
alias PolicyService.CommandedApp
alias PolicyService.Commands.CarPolicy
alias PolicyService.Aggregates.PolicyId
@exchange "carrier_inbox.events"
@queue "policy_service.policy_issued"
@routing_key "policy.issued"
def start_link(_opts), do: GenServer.start_link(__MODULE__, [], name: __MODULE__)
def init(_) do
{:ok, conn} = AMQP.Connection.open(amqp_url())
{:ok, channel} = AMQP.Channel.open(conn)
AMQP.Exchange.topic(channel, @exchange, durable: true)
AMQP.Queue.declare(channel, @queue, durable: true)
AMQP.Queue.bind(channel, @queue, @exchange, routing_key: @routing_key)
AMQP.Basic.qos(channel, prefetch_count: 10)
{:ok, _tag} = AMQP.Basic.consume(channel, @queue)
{:ok, %{channel: channel}}
end
def handle_info({:basic_consume_ok, _}, state), do: {:noreply, state}
def handle_info({:basic_cancel, _}, state), do: {:stop, :normal, state}
def handle_info({:basic_cancel_ok, _}, state), do: {:noreply, state}
def handle_info({:basic_deliver, payload, meta}, state) do
case Jason.decode(payload) do
{:ok, event} ->
process(event, meta, state)
{:error, _} ->
Logger.error("PolicyIssuedConsumer: failed to decode payload")
AMQP.Basic.reject(state.channel, meta.delivery_tag, requeue: false)
end
{:noreply, state}
end
defp process(event, meta, state) do
%{policy_type: policy_type} = PolicyId.parse!(event["id"])
command =
case policy_type do
"car" ->
%CarPolicy.RecordPolicyIssued{
id: event["id"],
policy_number: event["policy_number"],
effective_date: event["effective_date"],
expiry_date: event["expiry_date"],
issued_at: DateTime.utc_now()
}
end
case CommandedApp.dispatch(command) do
:ok ->
AMQP.Basic.ack(state.channel, meta.delivery_tag)
{:error, reason} ->
Logger.error("PolicyIssuedConsumer: dispatch failed: #{inspect(reason)}")
AMQP.Basic.reject(state.channel, meta.delivery_tag, requeue: true)
end
end
defp amqp_url do
Application.get_env(:policy_service, :amqp_url, "amqp://guest:guest@localhost:5672")
end
end

View File

@@ -0,0 +1,120 @@
defmodule PolicyService.Consumers.QuoteReceivedConsumer do
use GenServer
require Logger
alias PolicyService.CommandedApp
alias PolicyService.Commands.CarPolicy
alias PolicyService.Aggregates.PolicyId
@exchange "carrier_inbox.events"
@queue "policy_service.quote_received"
@routing_key "quote.received"
def start_link(opts \\ []) do
GenServer.start_link(__MODULE__, opts, name: __MODULE__)
end
def init(_opts) do
amqp_url = Application.fetch_env!(:policy_service, :amqp_url)
{:ok, conn} = AMQP.Connection.open(amqp_url)
{:ok, channel} = AMQP.Channel.open(conn)
AMQP.Exchange.declare(channel, @exchange, :topic, durable: true)
AMQP.Queue.declare(channel, @queue, durable: true)
AMQP.Queue.bind(channel, @queue, @exchange, routing_key: @routing_key)
AMQP.Basic.consume(channel, @queue, nil, no_ack: false)
Logger.info("QuoteReceivedConsumer started, listening on #{@queue}")
{:ok, %{conn: conn, channel: channel}}
end
# ---------------------------------------------------------------------------
# AMQP callbacks
# ---------------------------------------------------------------------------
def handle_info({:basic_consume_ok, _}, state), do: {:noreply, state}
def handle_info({:basic_cancel, _}, state), do: {:stop, :normal, state}
def handle_info({:basic_cancel_ok, _}, state), do: {:noreply, state}
def handle_info({:basic_deliver, payload, %{delivery_tag: tag}}, state) do
case process(payload) do
:ok ->
AMQP.Basic.ack(state.channel, tag)
{:error, reason} ->
Logger.error("Failed to process quote.received: #{inspect(reason)}")
AMQP.Basic.nack(state.channel, tag, requeue: false)
end
{:noreply, state}
end
# ---------------------------------------------------------------------------
# Processing
# ---------------------------------------------------------------------------
defp process(payload) do
with {:ok, event} <- Jason.decode(payload),
{:ok, cmd} <- build_command(event),
:ok <- CommandedApp.dispatch(cmd, consistency: :strong) do
:ok
end
end
defp build_command(event) do
case event["policy_type"] do
"car" -> build_car_command(event)
type -> {:error, {:unsupported_policy_type, type}}
end
end
defp build_car_command(event) do
%{policy_type: policy_type} = PolicyId.parse!(event["id"])
case policy_type do
"car" ->
cmd = %CarPolicy.RecordProviderQuote{
id: PolicyId.parse!(event["id"]),
recorded_by: event["entered_by"],
provider_id: event["provider_id"],
quote_id: event["quote_id"],
valid_until: parse_date(event["valid_until"]),
plans: parse_plans(event["plans"])
}
{:ok, cmd}
end
rescue
e -> {:error, e}
end
defp parse_plans(nil), do: []
defp parse_plans(plans) when is_list(plans) do
Enum.map(plans, fn p ->
%{
plan_id: p["plan_id"],
name: p["name"],
premium: p["premium"],
coverage_details: p["coverage_details"],
deductible: p["deductible"],
coverage_limit: p["coverage_limit"]
}
end)
end
defp parse_date(nil), do: nil
defp parse_date(%Date{} = d), do: d
defp parse_date(s) when is_binary(s) do
case Date.from_iso8601(s) do
{:ok, d} -> d
_ -> nil
end
end
end