defmodule WorkloadService.MessageBus do @moduledoc false use GenServer require Logger alias AMQP.{Connection, Channel, Exchange} @reconnect_interval 5_000 def start_link(opts \\ []) do GenServer.start_link(__MODULE__, opts, name: __MODULE__) end def publish(routing_key, payload) when is_binary(payload) do GenServer.cast(__MODULE__, {:publish, routing_key, payload}) end def publish(routing_key, payload) do GenServer.cast(__MODULE__, {:publish, routing_key, Jason.encode!(payload)}) end @impl true def init(_opts) do send(self(), :connect) {:ok, %{channel: nil, connection: nil}} end @impl true def handle_info(:connect, _state) do case connect() do {:ok, connection, channel} -> {:noreply, %{connection: connection, channel: channel}} {:error, reason} -> Logger.error("Failed to connect to RabbitMQ: #{inspect(reason)}") schedule_reconnect() {:noreply, %{channel: nil, connection: nil}} end end @impl true def handle_info({:DOWN, _, :process, _pid, reason}, _state) do Logger.error("RabbitMQ connection lost: #{inspect(reason)}") schedule_reconnect() {:noreply, %{channel: nil, connection: nil}} end @impl true def handle_cast({:publish, _routing_key, _payload}, %{channel: nil} = state) do Logger.warning("RabbitMQ not connected, message dropped") {:noreply, state} end @impl true def handle_cast({:publish, routing_key, payload}, %{channel: channel} = state) do Exchange.direct(channel, exchange_name(), durable: true) AMQP.Basic.publish(channel, exchange_name(), routing_key, payload, content_type: "application/json", persistent: true ) {:noreply, state} end defp connect do config = Application.get_env(:workload_service, WorkloadService.MessageBus, []) with {:ok, connection} <- Connection.open(config), {:ok, channel} <- Channel.open(connection) do Exchange.declare(channel, exchange_name(), :direct, durable: true) Process.monitor(connection.pid) {:ok, connection, channel} end end defp exchange_name do Application.get_env(:workload_service, WorkloadService.MessageBus, []) |> Keyword.get(:exchange, "workload_service.events") end defp schedule_reconnect do Process.send_after(self(), :connect, @reconnect_interval) end end