From c8a58c3f584a819399a3c1d8f69241ba23828ca2 Mon Sep 17 00:00:00 2001 From: HaimKortovich Date: Thu, 23 Apr 2026 10:32:20 -0500 Subject: [PATCH] add solictation consumer --- .../aggregates/policy_application.ex | 3 +- lib/policy_service/application.ex | 2 +- .../consumers/solicitation_task_consumer.ex | 87 +++++++++++++++++++ lib/policy_service/events/policy.ex | 2 +- ops/chart/values.yaml | 19 +++- 5 files changed, 108 insertions(+), 5 deletions(-) create mode 100644 lib/policy_service/consumers/solicitation_task_consumer.ex diff --git a/lib/policy_service/aggregates/policy_application.ex b/lib/policy_service/aggregates/policy_application.ex index 0ee8c43..95685e1 100644 --- a/lib/policy_service/aggregates/policy_application.ex +++ b/lib/policy_service/aggregates/policy_application.ex @@ -158,7 +158,8 @@ defmodule PolicyService.Aggregates.PolicyApplication do }, %SolicitationRequestSent{ id: agg.id, - plan: result.plan + plan: result.plan, + provider_id: result.provider } ] end diff --git a/lib/policy_service/application.ex b/lib/policy_service/application.ex index c9a2a41..4b78d7f 100644 --- a/lib/policy_service/application.ex +++ b/lib/policy_service/application.ex @@ -12,7 +12,7 @@ defmodule PolicyService.Application do PolicyService.Handlers.QuoteRequestHandler, PolicyService.Handlers.SolicitationRequestHandler, PolicyService.Consumers.QuoteTaskConsumer, - # PolicyService.Consumers.SolicitationTaskConsumer, + PolicyService.Consumers.SolicitationTaskConsumer, # PolicyService.Consumers.PolicyIssuedConsumer, PolicyService.Projectors.PolicyProjector, PolicyServiceWeb.Telemetry, diff --git a/lib/policy_service/consumers/solicitation_task_consumer.ex b/lib/policy_service/consumers/solicitation_task_consumer.ex new file mode 100644 index 0000000..fc22565 --- /dev/null +++ b/lib/policy_service/consumers/solicitation_task_consumer.ex @@ -0,0 +1,87 @@ +defmodule PolicyService.Consumers.SolicitationTaskConsumer do + use GenServer + require Logger + + alias PolicyService.CommandedApp + alias PolicyService.Commands.CarPolicy + alias PolicyService.Aggregates.PolicyId + + @exchange "workload_service.events.solicitation_task_completed" + @queue "policy_service.solicitation_task_completed" + @routing_key "solicitation_task.completed" + + def start_link(opts \\ []) do + GenServer.start_link(__MODULE__, opts, name: __MODULE__) + end + + def init(_opts) do + {:ok, conn} = AMQP.Connection.open(Application.fetch_env!(:policy_service, :amqp_url)) + {:ok, channel} = AMQP.Channel.open(conn) + + AMQP.Queue.declare(channel, @queue, durable: true) + AMQP.Queue.bind(channel, @queue, @exchange, routing_key: @routing_key) + {:ok, _tag} = AMQP.Basic.consume(channel, @queue) + + Logger.info("SolicitationTaskConsumer started, listening on #{@queue}") + + {:ok, %{channel: channel}} + end + + def handle_info({:basic_deliver, payload, meta}, state) do + :ok = + case process(payload) do + :ok -> + AMQP.Basic.ack(state.channel, meta.delivery_tag) + + {:error, reason} -> + Logger.error("SolicitationTaskConsumer: failed to process: #{inspect(reason)}") + AMQP.Basic.reject(state.channel, meta.delivery_tag, requeue: false) + end + + {:noreply, state} + end + + defp process(payload) do + with {:ok, event} <- Jason.decode(payload), + :ok <- dispatch(event) do + :ok + end + end + + def handle_info({:basic_consume_ok, _}, state), do: {:noreply, state} + def handle_info({:basic_cancel, _}, state), do: {:stop, :normal, state} + def handle_info({:basic_cancel_ok, _}, state), do: {:noreply, state} + + defp dispatch(%{ + "application_id" => %{ + "org_id" => org_id, + "policy_type" => policy_type, + "application_id" => app_id + }, + "task_info" => task_info, + "submission" => submission + }) do + cmd = + case policy_type do + "car" -> + %CarPolicy.RecordPolicyIssued{ + id: PolicyId.new(org_id, policy_type, app_id), + provider_policy_number: Map.get(submission, "provider_policy_number"), + effective_date: Map.get(submission, "effective_date"), + expiry_date: Map.get(submission, "expiry_date"), + issued_at: + Map.get(submission, "issued_at") || DateTime.utc_now() |> DateTime.to_iso8601() + } + end + + case CommandedApp.dispatch(cmd, consistency: :strong) do + :ok -> + Logger.info("SolicitationTaskConsumer: issued policy for #{app_id}") + :ok + + {:error, reason} -> + Logger.error("SolicitationTaskConsumer: failed to issue policy - #{inspect(reason)}") + {:error, reason} + end + end +end diff --git a/lib/policy_service/events/policy.ex b/lib/policy_service/events/policy.ex index 1da7cdd..9446a79 100644 --- a/lib/policy_service/events/policy.ex +++ b/lib/policy_service/events/policy.ex @@ -56,7 +56,7 @@ defmodule PolicyService.Events.Policy do defmodule SolicitationRequestSent do use PolicyService.Events @derive Jason.Encoder - defstruct [:id, :plan] + defstruct [:id, :plan, :provider_id] end defmodule PolicyIssued do diff --git a/ops/chart/values.yaml b/ops/chart/values.yaml index 7045361..7de6efb 100644 --- a/ops/chart/values.yaml +++ b/ops/chart/values.yaml @@ -155,14 +155,29 @@ rawResources: name: rabbitmq namespace: rabbitmq - exchange-solictation-requested: + exchange-solicitation-requested: enabled: true apiVersion: rabbitmq.com/v1beta1 kind: Exchange suffix: exchange-solicitation-requested spec: spec: - name: policy_service.events.solictation_requested + name: policy_service.events.solicitation_requested + type: topic + durable: true + vhost: "application" + rabbitmqClusterReference: + name: rabbitmq + namespace: rabbitmq + + exchange-solicitation-task-completed: + enabled: true + apiVersion: rabbitmq.com/v1beta1 + kind: Exchange + suffix: exchange-solicitation-task-completed + spec: + spec: + name: workload_service.events.solicitation_task_completed type: topic durable: true vhost: "application"