Add Idempotency plug
authorEgor Kislitsyn <egor@kislitsyn.com>
Wed, 26 Jun 2019 11:36:42 +0000 (18:36 +0700)
committerEgor Kislitsyn <egor@kislitsyn.com>
Wed, 26 Jun 2019 11:36:58 +0000 (18:36 +0700)
lib/pleroma/plugs/idempotency_plug.ex [new file with mode: 0644]
test/plugs/idempotency_plug_test.exs [new file with mode: 0644]

diff --git a/lib/pleroma/plugs/idempotency_plug.ex b/lib/pleroma/plugs/idempotency_plug.ex
new file mode 100644 (file)
index 0000000..442573d
--- /dev/null
@@ -0,0 +1,82 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Plugs.IdempotencyPlug do
+  import Phoenix.Controller, only: [json: 2]
+  import Plug.Conn
+
+  @behaviour Plug
+
+  @impl true
+  def init(opts), do: opts
+
+  # Sending idempotency keys in `GET` and `DELETE` requests has no effect and should be avoided, as these requests are idempotent by definition.
+  @impl true
+  def call(%{method: method} = conn, _) when method in ["POST", "PUT", "PATCH"] do
+    case get_req_header(conn, "idempotency-key") do
+      [key] -> process_request(conn, key)
+      _ -> conn
+    end
+  end
+
+  def call(conn, _), do: conn
+
+  def process_request(conn, key) do
+    case Cachex.get(:idempotency_cache, key) do
+      {:ok, nil} ->
+        cache_resposnse(conn, key)
+
+      {atom, message} when atom in [:ignore, :error] ->
+        render_error(conn, message)
+
+      {:ok, record} ->
+        send_cached(conn, key, record)
+    end
+  end
+
+  defp cache_resposnse(conn, key) do
+    Plug.Conn.register_before_send(conn, fn conn ->
+      [request_id] = get_resp_header(conn, "x-request-id")
+      content_type = get_content_type(conn)
+
+      record = {request_id, content_type, conn.status, conn.resp_body}
+      {:ok, _} = Cachex.put(:idempotency_cache, key, record)
+
+      conn
+      |> put_resp_header("idempotency-key", key)
+      |> put_resp_header("x-original-request-id", request_id)
+    end)
+  end
+
+  defp send_cached(conn, key, record) do
+    {request_id, content_type, status, body} = record
+
+    conn
+    |> put_resp_header("idempotency-key", key)
+    |> put_resp_header("idempotent-replayed", "true")
+    |> put_resp_header("x-original-request-id", request_id)
+    |> put_resp_content_type(content_type)
+    |> send_resp(status, body)
+    |> halt()
+  end
+
+  defp render_error(conn, message) do
+    conn
+    |> put_status(:unprocessable_entity)
+    |> json(%{error: message})
+    |> halt()
+  end
+
+  defp get_content_type(conn) do
+    [content_type] = get_resp_header(conn, "content-type")
+
+    if String.contains?(content_type, ";") do
+      content_type
+      |> String.split(";")
+      |> hd()
+    else
+      content_type
+    end
+  end
+end
diff --git a/test/plugs/idempotency_plug_test.exs b/test/plugs/idempotency_plug_test.exs
new file mode 100644 (file)
index 0000000..aebc463
--- /dev/null
@@ -0,0 +1,110 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Plugs.IdempotencyPlugTest do
+  use ExUnit.Case, async: true
+  use Plug.Test
+
+  alias Pleroma.Plugs.IdempotencyPlug
+  alias Plug.Conn
+
+  test "returns result from cache" do
+    key = "test1"
+    orig_request_id = "test1"
+    second_request_id = "test2"
+    body = "testing"
+    status = 200
+
+    :post
+    |> conn("/cofe")
+    |> put_req_header("idempotency-key", key)
+    |> Conn.put_resp_header("x-request-id", orig_request_id)
+    |> Conn.put_resp_content_type("application/json")
+    |> IdempotencyPlug.call([])
+    |> Conn.send_resp(status, body)
+
+    conn2 =
+      :post
+      |> conn("/cofe")
+      |> put_req_header("idempotency-key", key)
+      |> Conn.put_resp_header("x-request-id", second_request_id)
+      |> Conn.put_resp_content_type("application/json")
+      |> IdempotencyPlug.call([])
+
+    assert_raise Conn.AlreadySentError, fn ->
+      Conn.send_resp(conn2, :im_a_teapot, "no cofe")
+    end
+
+    assert conn2.resp_body == body
+    assert conn2.status == status
+
+    assert [^second_request_id] = Conn.get_resp_header(conn2, "x-request-id")
+    assert [^orig_request_id] = Conn.get_resp_header(conn2, "x-original-request-id")
+    assert [^key] = Conn.get_resp_header(conn2, "idempotency-key")
+    assert ["true"] = Conn.get_resp_header(conn2, "idempotent-replayed")
+    assert ["application/json; charset=utf-8"] = Conn.get_resp_header(conn2, "content-type")
+  end
+
+  test "pass conn downstream if the cache not found" do
+    key = "test2"
+    orig_request_id = "test3"
+    body = "testing"
+    status = 200
+
+    conn =
+      :post
+      |> conn("/cofe")
+      |> put_req_header("idempotency-key", key)
+      |> Conn.put_resp_header("x-request-id", orig_request_id)
+      |> Conn.put_resp_content_type("application/json")
+      |> IdempotencyPlug.call([])
+      |> Conn.send_resp(status, body)
+
+    assert conn.resp_body == body
+    assert conn.status == status
+
+    assert [] = Conn.get_resp_header(conn, "idempotent-replayed")
+    assert [^key] = Conn.get_resp_header(conn, "idempotency-key")
+  end
+
+  test "passes conn downstream if idempotency is not present in headers" do
+    orig_request_id = "test4"
+    body = "testing"
+    status = 200
+
+    conn =
+      :post
+      |> conn("/cofe")
+      |> Conn.put_resp_header("x-request-id", orig_request_id)
+      |> Conn.put_resp_content_type("application/json")
+      |> IdempotencyPlug.call([])
+      |> Conn.send_resp(status, body)
+
+    assert [] = Conn.get_resp_header(conn, "idempotency-key")
+  end
+
+  test "doesn't work with GET/DELETE" do
+    key = "test3"
+    body = "testing"
+    status = 200
+
+    conn =
+      :get
+      |> conn("/cofe")
+      |> put_req_header("idempotency-key", key)
+      |> IdempotencyPlug.call([])
+      |> Conn.send_resp(status, body)
+
+    assert [] = Conn.get_resp_header(conn, "idempotency-key")
+
+    conn =
+      :delete
+      |> conn("/cofe")
+      |> put_req_header("idempotency-key", key)
+      |> IdempotencyPlug.call([])
+      |> Conn.send_resp(status, body)
+
+    assert [] = Conn.get_resp_header(conn, "idempotency-key")
+  end
+end