87 lines
2.3 KiB
Elixir
87 lines
2.3 KiB
Elixir
defmodule WorkloadService.MessageBus do
  @moduledoc false

  # GenServer that holds one RabbitMQ connection/channel pair and publishes
  # messages to a durable direct exchange.
  #
  # Configuration is read at runtime from
  # `Application.get_env(:workload_service, WorkloadService.MessageBus, [])`:
  # the whole keyword list is handed to `AMQP.Connection.open/1`, and its
  # `:exchange` entry (default "workload_service.events") names the exchange.
  #
  # State is a map `%{connection: conn | nil, channel: chan | nil}`. While the
  # channel is `nil`, published messages are logged and dropped. A failed
  # connection attempt or a lost connection schedules another attempt after
  # `@reconnect_interval` milliseconds.

  use GenServer

  require Logger

  alias AMQP.{Connection, Channel, Exchange}

  @reconnect_interval 5_000
  @default_exchange "workload_service.events"

  # ---------------------------------------------------------------------------
  # Client API
  # ---------------------------------------------------------------------------

  @doc """
  Starts the message bus, registered under the module name.

  `opts` is passed to `init/1`, which currently ignores it.
  """
  @spec start_link(keyword()) :: GenServer.on_start()
  def start_link(opts \\ []) do
    GenServer.start_link(__MODULE__, opts, name: __MODULE__)
  end

  @doc """
  Asynchronously publishes `payload` under `routing_key`.

  Binary payloads are sent as-is; any other term is first encoded with
  `Jason.encode!/1` in the calling process, so encoding errors raise in the
  caller. The call is a cast and always returns `:ok`; the message is dropped
  (with a warning) if the server is not connected.
  """
  @spec publish(String.t(), term()) :: :ok
  def publish(routing_key, payload) when is_binary(payload) do
    GenServer.cast(__MODULE__, {:publish, routing_key, payload})
  end

  def publish(routing_key, payload) do
    GenServer.cast(__MODULE__, {:publish, routing_key, Jason.encode!(payload)})
  end

  # ---------------------------------------------------------------------------
  # Server callbacks
  # ---------------------------------------------------------------------------

  @impl true
  def init(_opts) do
    # Connect asynchronously so start_link/1 returns without waiting on the
    # broker; the first attempt runs as soon as init/1 returns.
    send(self(), :connect)
    {:ok, disconnected_state()}
  end

  @impl true
  def handle_info(:connect, _state) do
    case connect() do
      {:ok, connection, channel} ->
        {:noreply, %{connection: connection, channel: channel}}

      {:error, reason} ->
        Logger.error("Failed to connect to RabbitMQ: #{inspect(reason)}")
        schedule_reconnect()
        {:noreply, disconnected_state()}
    end
  end

  # Only react to the death of the connection currently held in state: `pid`
  # appears in both patterns, so the two must be equal for this clause to match.
  def handle_info({:DOWN, _ref, :process, pid, reason}, %{connection: %{pid: pid}}) do
    Logger.error("RabbitMQ connection lost: #{inspect(reason)}")
    schedule_reconnect()
    {:noreply, disconnected_state()}
  end

  # Defining handle_info/2 clauses replaces the GenServer default, so without
  # this clause any unexpected message would crash the server.
  def handle_info(message, state) do
    Logger.debug("Ignoring unexpected message: #{inspect(message)}")
    {:noreply, state}
  end

  @impl true
  def handle_cast({:publish, _routing_key, _payload}, %{channel: nil} = state) do
    Logger.warning("RabbitMQ not connected, message dropped")
    {:noreply, state}
  end

  def handle_cast({:publish, routing_key, payload}, %{channel: channel} = state) do
    # The exchange is declared once in connect/0, so it is not re-declared
    # for every message.
    options = [content_type: "application/json", persistent: true]

    case AMQP.Basic.publish(channel, exchange_name(), routing_key, payload, options) do
      :ok ->
        :ok

      error ->
        Logger.error("Failed to publish to RabbitMQ: #{inspect(error)}")
    end

    {:noreply, state}
  end

  # Closes the held connection (if any) when the server terminates.
  # NOTE(review): GenServer only invokes terminate/2 on a supervisor shutdown
  # when the process traps exits; this module does not set that flag.
  @impl true
  def terminate(_reason, %{connection: connection}) do
    close_connection(connection)
  end

  # ---------------------------------------------------------------------------
  # Internals
  # ---------------------------------------------------------------------------

  # Opens a connection, then a channel with the exchange declared on it.
  # Returns `{:ok, connection, channel}` or `{:error, reason}`. If anything
  # after `Connection.open/1` fails, the connection is closed again so that
  # repeated retries do not accumulate open connections.
  defp connect do
    with {:ok, connection} <- Connection.open(config()) do
      case open_channel(connection) do
        {:ok, channel} ->
          # Monitor last, so a failed setup never leaves a stray monitor.
          Process.monitor(connection.pid)
          {:ok, connection, channel}

        {:error, _reason} = error ->
          close_connection(connection)
          error
      end
    end
  end

  # Opens a channel on `connection` and declares the durable direct exchange.
  defp open_channel(connection) do
    with {:ok, channel} <- Channel.open(connection),
         :ok <- Exchange.declare(channel, exchange_name(), :direct, durable: true) do
      {:ok, channel}
    end
  end

  # Best-effort close: always returns `:ok`, including when the connection
  # process has already exited and the close call itself exits.
  defp close_connection(nil), do: :ok

  defp close_connection(connection) do
    _ = Connection.close(connection)
    :ok
  catch
    :exit, _reason -> :ok
  end

  # State used whenever no broker connection is available.
  defp disconnected_state, do: %{channel: nil, connection: nil}

  # Runtime configuration for this module (empty list when unset).
  defp config do
    Application.get_env(:workload_service, __MODULE__, [])
  end

  # Name of the exchange to declare and publish to.
  defp exchange_name do
    Keyword.get(config(), :exchange, @default_exchange)
  end

  # Queues another `:connect` attempt after the reconnect interval.
  defp schedule_reconnect do
    Process.send_after(self(), :connect, @reconnect_interval)
  end
end
|