Files
workload-service/lib/workload_service/message_bus.ex
2026-04-16 14:20:58 -05:00

87 lines
2.3 KiB
Elixir

defmodule WorkloadService.MessageBus do
  @moduledoc false

  # Owns a single RabbitMQ connection/channel pair and publishes messages to a
  # durable direct exchange.
  #
  # Configuration is read at runtime from
  # `Application.get_env(:workload_service, WorkloadService.MessageBus)`.
  # The whole keyword list is handed to `AMQP.Connection.open/1`; the
  # `:exchange` key (default "workload_service.events") names the exchange.
  #
  # Publishing is fire-and-forget: while the broker is unreachable messages
  # are dropped with a warning, and a reconnect is attempted every
  # `@reconnect_interval` milliseconds.

  use GenServer

  require Logger

  alias AMQP.{Channel, Connection, Exchange}

  @reconnect_interval 5_000

  # State used whenever there is no live broker connection.
  @disconnected %{connection: nil, channel: nil, monitor_ref: nil}

  ## Client API

  @doc """
  Starts the message bus, registered under the module name.

  `opts` is passed to `init/1`, which currently ignores it.
  """
  @spec start_link(keyword()) :: GenServer.on_start()
  def start_link(opts \\ []) do
    GenServer.start_link(__MODULE__, opts, name: __MODULE__)
  end

  @doc """
  Asynchronously publishes `payload` under `routing_key`.

  Binary payloads are sent unchanged. Any other term is encoded with
  `Jason.encode!/1` in the calling process, so encoding failures raise in the
  caller. Always returns `:ok`; the message is dropped (and a warning logged)
  if the bus is not connected when the cast is handled.
  """
  @spec publish(String.t(), term()) :: :ok
  def publish(routing_key, payload) when is_binary(payload) do
    GenServer.cast(__MODULE__, {:publish, routing_key, payload})
  end

  def publish(routing_key, payload) do
    publish(routing_key, Jason.encode!(payload))
  end

  ## Server callbacks

  @impl true
  def init(_opts) do
    # Connect in handle_continue/2 so start_link/1 returns immediately and the
    # first connection attempt still runs before any queued message.
    {:ok, @disconnected, {:continue, :connect}}
  end

  @impl true
  def handle_continue(:connect, _state), do: {:noreply, try_connect()}

  @impl true
  def handle_info(:connect, _state), do: {:noreply, try_connect()}

  # Only react to the monitor of the connection we currently hold; the repeated
  # `ref` variable makes this clause match on equality with the stored ref.
  def handle_info({:DOWN, ref, :process, _pid, reason}, %{monitor_ref: ref}) do
    Logger.error("RabbitMQ connection lost: #{inspect(reason)}")
    schedule_reconnect()
    {:noreply, @disconnected}
  end

  # Defining handle_info/2 clauses replaces the default from `use GenServer`,
  # so without this catch-all any stray message would crash the process.
  def handle_info(message, state) do
    Logger.warning("Unexpected message received by message bus: #{inspect(message)}")
    {:noreply, state}
  end

  @impl true
  def handle_cast({:publish, _routing_key, _payload}, %{channel: nil} = state) do
    Logger.warning("RabbitMQ not connected, message dropped")
    {:noreply, state}
  end

  def handle_cast({:publish, routing_key, payload}, %{channel: channel} = state) do
    # The exchange is declared once in connect/0; re-declaring it here would
    # cost a synchronous broker round-trip for every message.
    result =
      AMQP.Basic.publish(channel, exchange_name(), routing_key, payload,
        content_type: "application/json",
        persistent: true
      )

    case result do
      :ok ->
        :ok

      {:error, reason} ->
        Logger.error("Failed to publish message to RabbitMQ: #{inspect(reason)}")
    end

    {:noreply, state}
  end

  @impl true
  def terminate(_reason, %{connection: connection}) do
    # The AMQP connection is only monitored, never linked, so close it here
    # rather than leave it open when this process stops or crashes.
    close_connection(connection)
  end

  ## Internals

  # Attempts to connect and returns the new state. On failure the error is
  # logged, a retry is scheduled and the disconnected state is returned.
  defp try_connect do
    case connect() do
      {:ok, connection, channel, monitor_ref} ->
        %{connection: connection, channel: channel, monitor_ref: monitor_ref}

      {:error, reason} ->
        Logger.error("Failed to connect to RabbitMQ: #{inspect(reason)}")
        schedule_reconnect()
        @disconnected
    end
  end

  # Opens the connection and channel, declares the exchange and monitors the
  # connection process. Returns `{:ok, connection, channel, monitor_ref}` or
  # `{:error, reason}`. If a step after Connection.open/1 fails, the
  # connection is closed again so it is not leaked.
  defp connect do
    with {:ok, connection} <- Connection.open(bus_config()) do
      case open_channel(connection) do
        {:ok, channel} ->
          {:ok, connection, channel, Process.monitor(connection.pid)}

        {:error, _reason} = error ->
          close_connection(connection)
          error
      end
    end
  end

  # Opens a channel on `connection` and declares the durable direct exchange.
  defp open_channel(connection) do
    with {:ok, channel} <- Channel.open(connection),
         :ok <- Exchange.declare(channel, exchange_name(), :direct, durable: true) do
      {:ok, channel}
    end
  end

  # Best-effort close: the connection process may already be gone, in which
  # case the underlying call exits; that exit is swallowed deliberately.
  defp close_connection(nil), do: :ok

  defp close_connection(%Connection{} = connection) do
    Connection.close(connection)
    :ok
  catch
    :exit, _reason -> :ok
  end

  defp bus_config do
    Application.get_env(:workload_service, __MODULE__, [])
  end

  defp exchange_name do
    Keyword.get(bus_config(), :exchange, "workload_service.events")
  end

  defp schedule_reconnect do
    Process.send_after(self(), :connect, @reconnect_interval)
  end
end