# defmodule WorkloadService.Consumers.SolicitationRequestedConsumer do # use GenServer # require Logger # alias WorkloadService.CommandedApp # alias WorkloadService.Commands.SolicitationTask # @exchange "workload_service.events.solicitation_requested" # @queue "workload_service.solicitation_requested" # @routing_key "solicitation.requested" # @provider_service_url "http://localhost:4002" # @solicitation_service_url "http://localhost:8081" # def start_link(opts \\ []) do # GenServer.start_link(__MODULE__, opts, name: __MODULE__) # end # def init(_opts) do # {:ok, conn} = AMQP.Connection.open(amqp_url()) # {:ok, channel} = AMQP.Channel.open(conn) # AMQP.Queue.declare(channel, @queue, durable: true) # AMQP.Queue.bind(channel, @queue, @exchange, routing_key: @routing_key) # {:ok, _tag} = AMQP.Basic.consume(channel, @queue) # Logger.info("SolicitationRequestedConsumer started, listening on #{@queue}") # {:ok, %{channel: channel}} # end # def handle_info({:basic_consume_ok, _}, state), do: {:noreply, state} # def handle_info({:basic_cancel, _}, state), do: {:stop, :normal, state} # def handle_info({:basic_cancel_ok, _}, state), do: {:noreply, state} # def handle_info({:basic_deliver, payload, meta}, state) do # case process(payload) do # :ok -> # AMQP.Basic.ack(state.channel, meta.delivery_tag) # {:error, reason} -> # Logger.error("SolicitationRequestedConsumer: failed to process: #{inspect(reason)}") # AMQP.Basic.reject(state.channel, meta.delivery_tag, requeue: false) # end # {:noreply, state} # end # defp process(payload) do # with {:ok, event} <- Jason.decode(payload), # {:ok, provider} <- get_provider(event["provider_id"]), # {:ok, template} <- get_active_template(provider, event["policy_type"]), # {:ok, result} <- generate_solicitation(event, template), # :ok <- create_task(event, result) do # :ok # end # end # defp get_provider(provider_id) do # url = "#{@provider_service_url}/api/v1/providers/#{provider_id}" # case Req.get(url) do # {:ok, %{status: 200, body: %{"data" => provider}}} -> {:ok, provider} # {:ok, %{status: 404}} -> {:error, :provider_not_found} # {:error, reason} -> {:error, reason} # end # end # defp get_active_template(provider, policy_type) do # templates = get_in(provider, ["templates", policy_type]) || [] # default_id = get_in(provider, ["default_templates", policy_type]) # template = # if default_id do # Enum.find(templates, &(&1["template_id"] == default_id)) # else # Enum.find(templates, &(&1["active"] == true)) # end # case template do # nil -> {:error, :no_active_template} # t -> {:ok, t} # end # end # defp generate_solicitation(event, template) do # url = "#{@solicitation_service_url}/api/solicitations/generate" # body = %{ # org_id: event["org_id"], # id: event["id"], # template_document_url: template["document_url"], # fields: event["fields"] || %{} # } # case Req.post(url, json: body) do # {:ok, %{status: 200, body: result}} -> # {:ok, result} # {:ok, %{status: status, body: body}} -> # {:error, "solicitation_service returned #{status}: #{inspect(body)}"} # {:error, reason} -> # {:error, reason} # end # end # defp create_task(event, result) do # org_id = event["org_id"] # task_id = WorkloadService.Aggregates.TaskId.new(org_id, "solicitation", Ecto.UUID.generate()) # command = %SolicitationTask.CreateTask{ # id: task_id, # application_id: event["id"], # provider_id: event["provider_id"], # provider_name: event["provider_name"] || "", # task_info: %{ # "quote" => event["quote"], # "plan" => event["plan"] # }, # attachments: [result["document_url"]] |> Enum.filter(& &1) # } # case CommandedApp.dispatch(command) do # :ok -> # Logger.info("SolicitationRequestedConsumer: created task #{task_id}") # :ok # {:error, reason} -> # {:error, reason} # end # end # defp amqp_url do # Application.get_env(:workload_service, :amqp_url, "amqp://guest:guest@localhost:5672") # end # end