publish aggregate when task is complete
All checks were successful
Build and Publish / build-release (push) Successful in 1m26s
All checks were successful
Build and Publish / build-release (push) Successful in 1m26s
This commit is contained in:
@@ -1,86 +1,21 @@
|
||||
defmodule WorkloadService.MessageBus do
|
||||
@moduledoc false
|
||||
use AMQP
|
||||
|
||||
use GenServer
|
||||
require Logger
|
||||
def publish(exchange, routing_key, event) do
|
||||
payload = Jason.encode!(event)
|
||||
|
||||
alias AMQP.{Connection, Channel, Exchange}
|
||||
|
||||
@reconnect_interval 5_000
|
||||
|
||||
def start_link(opts \\ []) do
|
||||
GenServer.start_link(__MODULE__, opts, name: __MODULE__)
|
||||
:ok =
|
||||
AMQP.Basic.publish(channel(), exchange, routing_key, payload,
|
||||
content_type: "application/json",
|
||||
persistent: true
|
||||
)
|
||||
end
|
||||
|
||||
def publish(routing_key, payload) when is_binary(payload) do
|
||||
GenServer.cast(__MODULE__, {:publish, routing_key, payload})
|
||||
defp channel do
|
||||
{:ok, conn} = AMQP.Connection.open(amqp_url())
|
||||
{:ok, chan} = AMQP.Channel.open(conn)
|
||||
chan
|
||||
end
|
||||
|
||||
def publish(routing_key, payload) do
|
||||
GenServer.cast(__MODULE__, {:publish, routing_key, Jason.encode!(payload)})
|
||||
end
|
||||
|
||||
@impl true
|
||||
def init(_opts) do
|
||||
send(self(), :connect)
|
||||
{:ok, %{channel: nil, connection: nil}}
|
||||
end
|
||||
|
||||
@impl true
|
||||
def handle_info(:connect, _state) do
|
||||
case connect() do
|
||||
{:ok, connection, channel} ->
|
||||
{:noreply, %{connection: connection, channel: channel}}
|
||||
|
||||
{:error, reason} ->
|
||||
Logger.error("Failed to connect to RabbitMQ: #{inspect(reason)}")
|
||||
schedule_reconnect()
|
||||
{:noreply, %{channel: nil, connection: nil}}
|
||||
end
|
||||
end
|
||||
|
||||
@impl true
|
||||
def handle_info({:DOWN, _, :process, _pid, reason}, _state) do
|
||||
Logger.error("RabbitMQ connection lost: #{inspect(reason)}")
|
||||
schedule_reconnect()
|
||||
{:noreply, %{channel: nil, connection: nil}}
|
||||
end
|
||||
|
||||
@impl true
|
||||
def handle_cast({:publish, _routing_key, _payload}, %{channel: nil} = state) do
|
||||
Logger.warning("RabbitMQ not connected, message dropped")
|
||||
{:noreply, state}
|
||||
end
|
||||
|
||||
@impl true
|
||||
def handle_cast({:publish, routing_key, payload}, %{channel: channel} = state) do
|
||||
Exchange.direct(channel, exchange_name(), durable: true)
|
||||
|
||||
AMQP.Basic.publish(channel, exchange_name(), routing_key, payload,
|
||||
content_type: "application/json",
|
||||
persistent: true
|
||||
)
|
||||
|
||||
{:noreply, state}
|
||||
end
|
||||
|
||||
defp connect do
|
||||
config = Application.get_env(:workload_service, WorkloadService.MessageBus, [])
|
||||
|
||||
with {:ok, connection} <- Connection.open(config),
|
||||
{:ok, channel} <- Channel.open(connection) do
|
||||
Exchange.declare(channel, exchange_name(), :direct, durable: true)
|
||||
Process.monitor(connection.pid)
|
||||
{:ok, connection, channel}
|
||||
end
|
||||
end
|
||||
|
||||
defp exchange_name do
|
||||
Application.get_env(:workload_service, WorkloadService.MessageBus, [])
|
||||
|> Keyword.get(:exchange, "workload_service.events")
|
||||
end
|
||||
|
||||
defp schedule_reconnect do
|
||||
Process.send_after(self(), :connect, @reconnect_interval)
|
||||
end
|
||||
end
|
||||
defp amqp_url, do: Application.fetch_env!(:workload_service, :amqp_url)
|
||||
end
|
||||
Reference in New Issue
Block a user