whitelist: []
+config :pleroma, :media_preview_proxy,
+ enabled: false,
+ limit_dimensions: "400x200",
+ max_body_length: 25 * 1_048_576
config :pleroma, :chat, enabled: true
config :phoenix, :format_encoders, json: Jason
--- /dev/null
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+defmodule Pleroma.Helpers.MogrifyHelper do
+ @moduledoc """
+ Handles common Mogrify operations.
+ """
+ @spec store_as_temporary_file(String.t(), binary()) :: {:ok, String.t()} | {:error, atom()}
+ @doc "Stores binary content fetched from specified URL as a temporary file."
+ def store_as_temporary_file(url, body) do
+ path = Mogrify.temporary_path_for(%{path: url})
+ with :ok <- File.write(path, body), do: {:ok, path}
+ end
+ @spec store_as_temporary_file(String.t(), String.t()) :: Mogrify.Image.t() | any()
+ @doc "Modifies file at specified path by resizing to specified limit dimensions."
+ def in_place_resize_to_limit(path, resize_dimensions) do
+ path
+ |> Mogrify.open()
+ |> Mogrify.resize_to_limit(resize_dimensions)
+ |> Mogrify.save(in_place: true)
+ end
[attachment_url | _] = attachment["url"]
media_type = attachment_url["mediaType"] || attachment_url["mimeType"] || "image"
href = attachment_url["href"] |> MediaProxy.url()
+ href_preview = attachment_url["href"] |> MediaProxy.preview_url()
type =
cond do
id: to_string(attachment["id"] || hash_id),
url: href,
remote_url: href,
- preview_url: href,
+ preview_url: href_preview,
text_url: href,
type: type,
description: attachment["name"],
+ def preview_url(url) do
+ if disabled?() or whitelisted?(url) do
+ url
+ else
+ encode_preview_url(url)
+ end
+ end
defp disabled?, do: !Config.get([:media_proxy, :enabled], false)
defp local?(url), do: String.starts_with?(url, Pleroma.Web.base_url())
- def encode_url(url) do
+ defp base64_sig64(url) do
base64 = Base.url_encode64(url, @base64_opts)
sig64 =
- |> signed_url
+ |> signed_url()
|> Base.url_encode64(@base64_opts)
+ {base64, sig64}
+ end
+ def encode_url(url) do
+ {base64, sig64} = base64_sig64(url)
build_url(sig64, base64, filename(url))
+ def encode_preview_url(url) do
+ {base64, sig64} = base64_sig64(url)
+ build_preview_url(sig64, base64, filename(url))
+ end
def decode_url(sig, url) do
with {:ok, sig} <- Base.url_decode64(sig, @base64_opts),
signature when signature == sig <- signed_url(url) do
if path = URI.parse(url_or_path).path, do: Path.basename(path)
- def build_url(sig_base64, url_base64, filename \\ nil) do
+ defp proxy_url(path, sig_base64, url_base64, filename) do
Pleroma.Config.get([:media_proxy, :base_url], Web.base_url()),
- "proxy",
+ path,
|> Enum.filter(& &1)
|> Path.join()
+ def build_url(sig_base64, url_base64, filename \\ nil) do
+ proxy_url("proxy", sig_base64, url_base64, filename)
+ end
+ def build_preview_url(sig_base64, url_base64, filename \\ nil) do
+ proxy_url("proxy/preview", sig_base64, url_base64, filename)
+ end
+ def filename_matches(%{"filename" => _} = _, path, url) do
+ filename = filename(url)
+ if filename && not basename_matches?(path, filename) do
+ {:wrong_filename, filename}
+ else
+ :ok
+ end
+ end
+ def filename_matches(_, _, _), do: :ok
+ defp basename_matches?(path, filename) do
+ basename = Path.basename(path)
+ basename == filename or URI.decode(basename) == filename or URI.encode(basename) == filename
+ end
defmodule Pleroma.Web.MediaProxy.MediaProxyController do
use Pleroma.Web, :controller
+ alias Pleroma.Config
+ alias Pleroma.Helpers.MogrifyHelper
alias Pleroma.ReverseProxy
alias Pleroma.Web.MediaProxy
@default_proxy_opts [max_body_length: 25 * 1_048_576, http: [follow_redirect: true]]
def remote(conn, %{"sig" => sig64, "url" => url64} = params) do
- with config <- Pleroma.Config.get([:media_proxy], []),
- true <- Keyword.get(config, :enabled, false),
+ with config <- Config.get([:media_proxy], []),
+ {_, true} <- {:enabled, Keyword.get(config, :enabled, false)},
{:ok, url} <- MediaProxy.decode_url(sig64, url64),
- :ok <- filename_matches(params, conn.request_path, url) do
+ :ok <- MediaProxy.filename_matches(params, conn.request_path, url) do
ReverseProxy.call(conn, url, Keyword.get(config, :proxy_opts, @default_proxy_opts))
- false ->
+ {:enabled, false} ->
send_resp(conn, 404, Plug.Conn.Status.reason_phrase(404))
{:error, :invalid_signature} ->
- def filename_matches(%{"filename" => _} = _, path, url) do
- filename = MediaProxy.filename(url)
+ def preview(conn, %{"sig" => sig64, "url" => url64} = params) do
+ with {_, true} <- {:enabled, Config.get([:media_preview_proxy, :enabled], false)},
+ {:ok, url} <- MediaProxy.decode_url(sig64, url64),
+ :ok <- MediaProxy.filename_matches(params, conn.request_path, url) do
+ handle_preview(conn, url)
+ else
+ {:enabled, false} ->
+ send_resp(conn, 404, Plug.Conn.Status.reason_phrase(404))
+ {:error, :invalid_signature} ->
+ send_resp(conn, 403, Plug.Conn.Status.reason_phrase(403))
+ {:wrong_filename, filename} ->
+ redirect(conn, external: MediaProxy.build_preview_url(sig64, url64, filename))
+ end
+ end
- if filename && does_not_match(path, filename) do
- {:wrong_filename, filename}
+ defp handle_preview(conn, url) do
+ with {:ok, %{status: status} = head_response} when status in 200..299 <- Tesla.head(url),
+ {_, true} <- {:acceptable_content_length, acceptable_body_length?(head_response)} do
+ content_type = Tesla.get_header(head_response, "content-type")
+ handle_preview(content_type, conn, url)
- :ok
+ {_, %{status: status}} ->
+ send_resp(conn, :failed_dependency, "Can't fetch HTTP headers (HTTP #{status}).")
+ {:acceptable_content_length, false} ->
+ send_resp(conn, :unprocessable_entity, "Source file size exceeds limit.")
- def filename_matches(_, _, _), do: :ok
+ defp handle_preview("image/" <> _, %{params: params} = conn, url) do
+ with {:ok, %{status: status, body: body}} when status in 200..299 <- Tesla.get(url),
+ {:ok, path} <- MogrifyHelper.store_as_temporary_file(url, body),
+ resize_dimensions <-
+ Map.get(
+ params,
+ "limit_dimensions",
+ Config.get([:media_preview_proxy, :limit_dimensions])
+ ),
+ %Mogrify.Image{} <- MogrifyHelper.in_place_resize_to_limit(path, resize_dimensions) do
+ send_file(conn, 200, path)
+ else
+ {_, %{status: _}} ->
+ send_resp(conn, :failed_dependency, "Can't fetch the image.")
+ _ ->
+ send_resp(conn, :failed_dependency, "Can't handle image preview.")
+ end
+ end
+ defp handle_preview(content_type, conn, _url) do
+ send_resp(conn, :unprocessable_entity, "Unsupported content type: #{content_type}.")
+ end
+ defp acceptable_body_length?(head_response) do
+ max_body_length = Config.get([:media_preview_proxy, :max_body_length], nil)
+ content_length = Tesla.get_header(head_response, "content-length")
+ content_length = with {int, _} <- Integer.parse(content_length), do: int
- defp does_not_match(path, filename) do
- basename = Path.basename(path)
- basename != filename and URI.decode(basename) != filename and URI.encode(basename) != filename
+ content_length == :error or
+ max_body_length in [nil, :infinity] or
+ content_length <= max_body_length
scope "/proxy/", Pleroma.Web.MediaProxy do
+ get("/preview/:sig/:url", MediaProxyController, :preview)
+ get("/preview/:sig/:url/:filename", MediaProxyController, :preview)
get("/:sig/:url", MediaProxyController, :remote)
get("/:sig/:url/:filename", MediaProxyController, :remote)
defmodule Pleroma.Web.MediaProxyTest do
use ExUnit.Case
use Pleroma.Tests.Helpers
- import Pleroma.Web.MediaProxy
- alias Pleroma.Web.MediaProxy.MediaProxyController
- setup do: clear_config([:media_proxy, :enabled])
- setup do: clear_config(Pleroma.Upload)
+ alias Pleroma.Config
+ alias Pleroma.Web.Endpoint
+ alias Pleroma.Web.MediaProxy
+ defp decode_result(encoded) do
+ [_, "proxy", sig, base64 | _] = URI.parse(encoded).path |> String.split("/")
+ {:ok, decoded} = MediaProxy.decode_url(sig, base64)
+ decoded
+ end
describe "when enabled" do
- setup do
- Pleroma.Config.put([:media_proxy, :enabled], true)
- :ok
- end
+ setup do: clear_config([:media_proxy, :enabled], true)
test "ignores invalid url" do
- assert url(nil) == nil
- assert url("") == nil
+ assert MediaProxy.url(nil) == nil
+ assert MediaProxy.url("") == nil
test "ignores relative url" do
- assert url("/local") == "/local"
- assert url("/") == "/"
+ assert MediaProxy.url("/local") == "/local"
+ assert MediaProxy.url("/") == "/"
test "ignores local url" do
- local_url = Pleroma.Web.Endpoint.url() <> "/hello"
- local_root = Pleroma.Web.Endpoint.url()
- assert url(local_url) == local_url
- assert url(local_root) == local_root
+ local_url = Endpoint.url() <> "/hello"
+ local_root = Endpoint.url()
+ assert MediaProxy.url(local_url) == local_url
+ assert MediaProxy.url(local_root) == local_root
test "encodes and decodes URL" do
url = "https://pleroma.soykaf.com/static/logo.png"
- encoded = url(url)
+ encoded = MediaProxy.url(url)
assert String.starts_with?(
- Pleroma.Config.get([:media_proxy, :base_url], Pleroma.Web.base_url())
+ Config.get([:media_proxy, :base_url], Pleroma.Web.base_url())
assert String.ends_with?(encoded, "/logo.png")
test "encodes and decodes URL without a path" do
url = "https://pleroma.soykaf.com"
- encoded = url(url)
+ encoded = MediaProxy.url(url)
assert decode_result(encoded) == url
test "encodes and decodes URL without an extension" do
url = "https://pleroma.soykaf.com/path/"
- encoded = url(url)
+ encoded = MediaProxy.url(url)
assert String.ends_with?(encoded, "/path")
assert decode_result(encoded) == url
test "encodes and decodes URL and ignores query params for the path" do
url = "https://pleroma.soykaf.com/static/logo.png?93939393939&bunny=true"
- encoded = url(url)
+ encoded = MediaProxy.url(url)
assert String.ends_with?(encoded, "/logo.png")
assert decode_result(encoded) == url
test "validates signature" do
- secret_key_base = Pleroma.Config.get([Pleroma.Web.Endpoint, :secret_key_base])
- on_exit(fn ->
- Pleroma.Config.put([Pleroma.Web.Endpoint, :secret_key_base], secret_key_base)
- end)
+ secret_key_base = Config.get([Endpoint, :secret_key_base])
+ clear_config([Endpoint, :secret_key_base], secret_key_base)
- encoded = url("https://pleroma.social")
+ encoded = MediaProxy.url("https://pleroma.social")
- Pleroma.Config.put(
- [Pleroma.Web.Endpoint, :secret_key_base],
+ Config.put(
+ [Endpoint, :secret_key_base],
[_, "proxy", sig, base64 | _] = URI.parse(encoded).path |> String.split("/")
- assert decode_url(sig, base64) == {:error, :invalid_signature}
+ assert MediaProxy.decode_url(sig, base64) == {:error, :invalid_signature}
- test "filename_matches preserves the encoded or decoded path" do
- assert MediaProxyController.filename_matches(
+ test "`filename_matches/_` preserves the encoded or decoded path" do
+ assert MediaProxy.filename_matches(
%{"filename" => "/Hello world.jpg"},
"/Hello world.jpg",
"http://pleroma.social/Hello world.jpg"
) == :ok
- assert MediaProxyController.filename_matches(
+ assert MediaProxy.filename_matches(
%{"filename" => "/Hello%20world.jpg"},
) == :ok
- assert MediaProxyController.filename_matches(
+ assert MediaProxy.filename_matches(
%{"filename" => "/my%2Flong%2Furl%2F2019%2F07%2FS.jpg"},
) == :ok
- assert MediaProxyController.filename_matches(
+ assert MediaProxy.filename_matches(
%{"filename" => "/my%2Flong%2Furl%2F2019%2F07%2FS.jp"},
# conn.request_path will return encoded url
request_path = "/ANALYSE-DAI-_-LE-STABLECOIN-100-D%C3%89CENTRALIS%C3%89-BQ.jpg"
- assert MediaProxyController.filename_matches(
+ assert MediaProxy.filename_matches(
test "uses the configured base_url" do
- base_url = Pleroma.Config.get([:media_proxy, :base_url])
- if base_url do
- on_exit(fn ->
- Pleroma.Config.put([:media_proxy, :base_url], base_url)
- end)
- end
- Pleroma.Config.put([:media_proxy, :base_url], "https://cache.pleroma.social")
+ clear_config([:media_proxy, :base_url], "https://cache.pleroma.social")
url = "https://pleroma.soykaf.com/static/logo.png"
- encoded = url(url)
+ encoded = MediaProxy.url(url)
- assert String.starts_with?(encoded, Pleroma.Config.get([:media_proxy, :base_url]))
+ assert String.starts_with?(encoded, Config.get([:media_proxy, :base_url]))
# Some sites expect ASCII encoded characters in the URL to be preserved even if
url =
- encoded = url(url)
+ encoded = MediaProxy.url(url)
assert decode_result(encoded) == url
url =
- encoded = url(url)
+ encoded = MediaProxy.url(url)
assert decode_result(encoded) == url
test "preserve unicode characters" do
url = "https://ko.wikipedia.org/wiki/위키백과:대문"
- encoded = url(url)
+ encoded = MediaProxy.url(url)
assert decode_result(encoded) == url
describe "when disabled" do
- setup do
- enabled = Pleroma.Config.get([:media_proxy, :enabled])
- if enabled do
- Pleroma.Config.put([:media_proxy, :enabled], false)
- on_exit(fn ->
- Pleroma.Config.put([:media_proxy, :enabled], enabled)
- :ok
- end)
- end
- :ok
- end
+ setup do: clear_config([:media_proxy, :enabled], false)
test "does not encode remote urls" do
- assert url("https://google.fr") == "https://google.fr"
+ assert MediaProxy.url("https://google.fr") == "https://google.fr"
- defp decode_result(encoded) do
- [_, "proxy", sig, base64 | _] = URI.parse(encoded).path |> String.split("/")
- {:ok, decoded} = decode_url(sig, base64)
- decoded
- end
describe "whitelist" do
- setup do
- Pleroma.Config.put([:media_proxy, :enabled], true)
- :ok
- end
+ setup do: clear_config([:media_proxy, :enabled], true)
test "mediaproxy whitelist" do
- Pleroma.Config.put([:media_proxy, :whitelist], ["google.com", "feld.me"])
+ clear_config([:media_proxy, :whitelist], ["google.com", "feld.me"])
url = "https://feld.me/foo.png"
- unencoded = url(url)
+ unencoded = MediaProxy.url(url)
assert unencoded == url
test "does not change whitelisted urls" do
- Pleroma.Config.put([:media_proxy, :whitelist], ["mycdn.akamai.com"])
- Pleroma.Config.put([:media_proxy, :base_url], "https://cache.pleroma.social")
+ clear_config([:media_proxy, :whitelist], ["mycdn.akamai.com"])
+ clear_config([:media_proxy, :base_url], "https://cache.pleroma.social")
media_url = "https://mycdn.akamai.com"
url = "#{media_url}/static/logo.png"
- encoded = url(url)
+ encoded = MediaProxy.url(url)
assert String.starts_with?(encoded, media_url)
test "ensure Pleroma.Upload base_url is always whitelisted" do
media_url = "https://media.pleroma.social"
- Pleroma.Config.put([Pleroma.Upload, :base_url], media_url)
+ clear_config([Pleroma.Upload, :base_url], media_url)
url = "#{media_url}/static/logo.png"
- encoded = url(url)
+ encoded = MediaProxy.url(url)
assert String.starts_with?(encoded, media_url)