Files
policy-service/lib/policy_service_web/plugs/authorization_plug.ex
HaimKortovich 07a232c131
All checks were successful
Build and Publish / build-release (push) Successful in 2m4s
add rbacs
2026-05-07 14:01:18 -05:00

189 lines
4.8 KiB
Elixir

defmodule PolicyServiceWeb.Plugs.AuthorizationPlug do
  @moduledoc """
  Authorization plug for enforcing role-based access control.

  The plug reads `:roles`, `:scopes`, `:user_id` and `:org_id` from
  `conn.assigns` and runs up to three checks, in this order:

    1. role check - the user holds at least one of the required roles
       (a user holding the `"admin"` role always passes this check);
    2. scope check - the user holds every required scope;
    3. resource ownership check - an optional caller-supplied function.

  The first failing check halts the connection with a JSON `403` response.
  Checks whose option was not configured are skipped.
  """

  alias Plug.Conn

  require Logger

  # Role that passes the role check and `can_access_resource?/3` unconditionally.
  @admin_role "admin"

  @doc """
  Initializes the authorization plug.

  ## Options

    * `:required_roles` - role, or list of roles, allowed to access the
      resource (optional)
    * `:required_permission` - role/permission, or list of them, allowed to
      access the resource (optional). It is merged with `:required_roles`;
      a user matching any one entry of the merged list passes the role check.
    * `:required_scopes` - scope, or list of scopes, all of which are required
      to access the resource (optional)
    * `:resource_owner_check` - function called as
      `fun.(conn, user_id, org_id)`; a truthy result grants access (optional)

  Returns the configuration map consumed by `call/2`. The map keeps the raw
  `:required_permission` value and additionally carries the normalized
  `:required_roles` list that `call/2` actually checks.
  """
  @spec init(keyword()) :: map()
  def init(opts) do
    required_permission = Keyword.get(opts, :required_permission)

    # `List.wrap/1` turns `nil` into `[]` and a bare string into a one-element
    # list, so the checks below can always rely on receiving a list.
    required_roles =
      List.wrap(Keyword.get(opts, :required_roles)) ++ List.wrap(required_permission)

    %{
      required_permission: required_permission,
      required_roles: required_roles,
      required_scopes: List.wrap(Keyword.get(opts, :required_scopes)),
      resource_owner_check: Keyword.get(opts, :resource_owner_check)
    }
  end

  @doc """
  Authorizes the request by checking roles, scopes and resource ownership.

  Returns `conn` untouched when every configured check passes. Otherwise the
  connection is answered with a JSON `403` body and halted.
  """
  @spec call(Plug.Conn.t(), map()) :: Plug.Conn.t()
  def call(conn, config) do
    user_roles = conn.assigns[:roles] || []
    user_scopes = conn.assigns[:scopes] || []
    user_id = conn.assigns[:user_id]
    org_id = conn.assigns[:org_id]

    with :ok <- check_roles(user_roles, config.required_roles),
         :ok <- check_scopes(user_scopes, config.required_scopes),
         :ok <- check_resource_ownership(conn, config.resource_owner_check, user_id, org_id) do
      conn
    else
      {:error, :insufficient_role} ->
        forbidden(conn, %{
          error: "Insufficient permissions",
          required_roles: config.required_roles
        })

      {:error, :insufficient_scope} ->
        forbidden(conn, %{
          error: "Insufficient permissions",
          required_scopes: config.required_scopes
        })

      {:error, :not_owner} ->
        forbidden(conn, %{error: "You do not have permission to access this resource"})
    end
  end

  # Role check: no requirement configured means nothing to enforce.
  defp check_roles(_user_roles, []), do: :ok

  defp check_roles(user_roles, required_roles) do
    if has_any_role?(user_roles, required_roles) do
      :ok
    else
      Logger.warning(
        "User with roles #{inspect(user_roles)} lacks required permission: #{inspect(required_roles)}"
      )

      {:error, :insufficient_role}
    end
  end

  # Scope check: no requirement configured means nothing to enforce.
  defp check_scopes(_user_scopes, []), do: :ok

  defp check_scopes(user_scopes, required_scopes) do
    if has_all_scopes?(user_scopes, required_scopes) do
      :ok
    else
      Logger.warning(
        "User with scopes #{inspect(user_scopes)} lacks required scopes: #{inspect(required_scopes)}"
      )

      {:error, :insufficient_scope}
    end
  end

  # Ownership check: skipped when no check function was configured.
  defp check_resource_ownership(_conn, nil, _user_id, _org_id), do: :ok

  defp check_resource_ownership(conn, check_function, user_id, org_id) do
    if check_function.(conn, user_id, org_id), do: :ok, else: {:error, :not_owner}
  rescue
    # Fail closed on purpose: an exception raised by the caller-supplied check
    # must deny access rather than let the request through.
    error ->
      Logger.error("Resource ownership check failed: #{inspect(error)}")
      {:error, :not_owner}
  end

  @doc """
  Checks if the user has any of the required roles.

  A user holding the `"admin"` role is treated as having every role.
  `required_roles` may be a list, a single role, or `nil` (no roles).
  """
  @spec has_any_role?(Enumerable.t(), term()) :: boolean()
  def has_any_role?(user_roles, required_roles) do
    @admin_role in user_roles or
      Enum.any?(List.wrap(required_roles), fn role -> role in user_roles end)
  end

  @doc """
  Checks if the user has all of the required scopes.

  `required_scopes` may be a list, a single scope, or `nil` (no scopes, which
  is trivially satisfied).
  """
  @spec has_all_scopes?(Enumerable.t(), term()) :: boolean()
  def has_all_scopes?(user_scopes, required_scopes) do
    Enum.all?(List.wrap(required_scopes), fn scope -> scope in user_scopes end)
  end

  @doc """
  Checks if the user can access a specific resource based on ownership.

  Admins can access any resource. Other users can only access resources whose
  owner id equals their own id. A `nil` `current_user_id` never matches, so an
  unidentified caller is not considered the owner of an owner-less resource.
  """
  @spec can_access_resource?(Enumerable.t(), term(), term()) :: boolean()
  def can_access_resource?(user_roles, resource_user_id, current_user_id) do
    @admin_role in user_roles or
      (not is_nil(current_user_id) and resource_user_id == current_user_id)
  end

  # Sends `body` as a JSON 403 response and halts the plug pipeline.
  defp forbidden(conn, body) do
    conn
    |> Conn.put_resp_content_type("application/json")
    |> Conn.send_resp(403, Jason.encode!(body))
    |> Conn.halt()
  end
end