Streamer: Stream out Conversations/Participations.
authorlain <lain@soykaf.club>
Fri, 3 May 2019 11:39:14 +0000 (13:39 +0200)
committerlain <lain@soykaf.club>
Fri, 3 May 2019 11:39:14 +0000 (13:39 +0200)
lib/pleroma/conversation.ex
lib/pleroma/web/activity_pub/activity_pub.ex
lib/pleroma/web/streamer.ex
mix.exs
mix.lock
test/conversation_test.exs
test/web/activity_pub/activity_pub_test.exs

index e6a4ccc85c4989fa22117e831dd5a6860118d2e3..6e26c5fd4b8bf3e5dda2d1b0918ef9087cf0da58 100644 (file)
@@ -63,10 +63,13 @@ defmodule Pleroma.Conversation do
           participation
         end)
 
-      %{
-        conversation
-        | participations: participations
-      }
+      {:ok,
+       %{
+         conversation
+         | participations: participations
+       }}
+    else
+      e -> {:error, e}
     end
   end
 end
index 28754e864e75f37a111285a4c6f80c052eefa1fb..6c737d0a47302ac9903fd18d5c60dc92649e993d 100644 (file)
@@ -142,8 +142,14 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
       end)
 
       Notification.create_notifications(activity)
-      Conversation.create_or_bump_for(activity)
+
+      participations =
+        activity
+        |> Conversation.create_or_bump_for()
+        |> get_participations()
+
       stream_out(activity)
+      stream_out_participations(participations)
       {:ok, activity}
     else
       %Activity{} = activity ->
@@ -166,6 +172,19 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
     end
   end
 
+  defp get_participations({:ok, %{participations: participations}}), do: participations
+  defp get_participations(_), do: []
+
+  def stream_out_participations(participations) do
+    participations =
+      participations
+      |> Repo.preload(:user)
+
+    Enum.each(participations, fn participation ->
+      Pleroma.Web.Streamer.stream("participation", participation)
+    end)
+  end
+
   def stream_out(activity) do
     public = "https://www.w3.org/ns/activitystreams#Public"
 
@@ -197,6 +216,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
           end
         end
       else
+        # TODO: Write test, replace with visibility test
         if !Enum.member?(activity.data["cc"] || [], public) &&
              !Enum.member?(
                activity.data["to"],
index 72eaf208477444d7d33639af0c1bb3b6027263ed..b8f6663a103b1cc3d7ef5f6988d69222a139f434 100644 (file)
@@ -5,6 +5,7 @@
 defmodule Pleroma.Web.Streamer do
   use GenServer
   require Logger
+  alias Pleroma.Conversation.Participation
   alias Pleroma.Activity
   alias Pleroma.Notification
   alias Pleroma.Object
@@ -71,6 +72,15 @@ defmodule Pleroma.Web.Streamer do
     {:noreply, topics}
   end
 
+  def handle_cast(%{action: :stream, topic: "participation", item: participation}, topics) do
+    user_topic = "direct:#{participation.user_id}"
+    Logger.debug("Trying to push a conversation participation to #{user_topic}\n\n")
+
+    push_to_socket(topics, user_topic, participation)
+
+    {:noreply, topics}
+  end
+
   def handle_cast(%{action: :stream, topic: "list", item: item}, topics) do
     # filter the recipient list if the activity is not public, see #270.
     recipient_lists =
@@ -192,6 +202,19 @@ defmodule Pleroma.Web.Streamer do
     |> Jason.encode!()
   end
 
+  def represent_conversation(%Participation{} = participation) do
+    %{
+      event: "conversation",
+      payload:
+        Pleroma.Web.MastodonAPI.ConversationView.render("participation.json", %{
+          participation: participation,
+          user: participation.user
+        })
+        |> Jason.encode!()
+    }
+    |> Jason.encode!()
+  end
+
   def push_to_socket(topics, topic, %Activity{data: %{"type" => "Announce"}} = item) do
     Enum.each(topics[topic] || [], fn socket ->
       # Get the current user so we have up-to-date blocks etc.
@@ -214,6 +237,12 @@ defmodule Pleroma.Web.Streamer do
     end)
   end
 
+  def push_to_socket(topics, topic, %Participation{} = participation) do
+    Enum.each(topics[topic] || [], fn socket ->
+      send(socket.transport_pid, {:text, represent_conversation(participation)})
+    end)
+  end
+
   def push_to_socket(topics, topic, %Activity{
         data: %{"type" => "Delete", "deleted_activity_id" => deleted_activity_id}
       }) do
diff --git a/mix.exs b/mix.exs
index 9ded9931cb4ac2309048a4d54e66eb93230d1cb7..ad8f1123afefbf8b3c8e16502994e15eb44b7565 100644 (file)
--- a/mix.exs
+++ b/mix.exs
@@ -87,7 +87,7 @@ defmodule Pleroma.Mixfile do
       {:bbcode, "~> 0.1"},
       {:ex_machina, "~> 2.3", only: :test},
       {:credo, "~> 0.9.3", only: [:dev, :test]},
-      {:mock, "~> 0.3.1", only: :test},
+      {:mock, "~> 0.3.3", only: :test},
       {:crypt,
        git: "https://github.com/msantos/crypt", ref: "1f2b58927ab57e72910191a7ebaeff984382a1d3"},
       {:cors_plug, "~> 1.5"},
index 08221eadcf6f7b38276d3b9fc170355194d19cf1..bb298a68ba55ddd528ebbc97b44f57778f72f929 100644 (file)
--- a/mix.lock
+++ b/mix.lock
@@ -43,7 +43,7 @@
   "mime": {:hex, :mime, "1.3.1", "30ce04ab3175b6ad0bdce0035cba77bba68b813d523d1aac73d9781b4d193cf8", [:mix], [], "hexpm"},
   "mimerl": {:hex, :mimerl, "1.2.0", "67e2d3f571088d5cfd3e550c383094b47159f3eee8ffa08e64106cdf5e981be3", [:rebar3], [], "hexpm"},
   "mochiweb": {:hex, :mochiweb, "2.15.0", "e1daac474df07651e5d17cc1e642c4069c7850dc4508d3db7263a0651330aacc", [:rebar3], [], "hexpm"},
-  "mock": {:hex, :mock, "0.3.1", "994f00150f79a0ea50dc9d86134cd9ebd0d177ad60bd04d1e46336cdfdb98ff9", [:mix], [{:meck, "~> 0.8.8", [hex: :meck, repo: "hexpm", optional: false]}], "hexpm"},
+  "mock": {:hex, :mock, "0.3.3", "42a433794b1291a9cf1525c6d26b38e039e0d3a360732b5e467bfc77ef26c914", [:mix], [{:meck, "~> 0.8.13", [hex: :meck, repo: "hexpm", optional: false]}], "hexpm"},
   "mogrify": {:hex, :mogrify, "0.6.1", "de1b527514f2d95a7bbe9642eb556061afb337e220cf97adbf3a4e6438ed70af", [:mix], [], "hexpm"},
   "nimble_parsec": {:hex, :nimble_parsec, "0.5.0", "90e2eca3d0266e5c53f8fbe0079694740b9c91b6747f2b7e3c5d21966bba8300", [:mix], [], "hexpm"},
   "parse_trans": {:hex, :parse_trans, "3.3.0", "09765507a3c7590a784615cfd421d101aec25098d50b89d7aa1d66646bc571c1", [:rebar3], [], "hexpm"},
index 763183d6b484806adb0f577e08183e6982482f6b..f3300e7d18a535db0deb78df160197369821ab0d 100644 (file)
@@ -117,4 +117,21 @@ defmodule Pleroma.ConversationTest do
              tridi.id == user_id
            end)
   end
+
+  test "create_or_bump_for returns the conversation with participations" do
+    har = insert(:user)
+    jafnhar = insert(:user, local: false)
+
+    {:ok, activity} =
+      CommonAPI.post(har, %{"status" => "Hey @#{jafnhar.nickname}", "visibility" => "direct"})
+
+    {:ok, conversation} = Conversation.create_or_bump_for(activity)
+
+    assert length(conversation.participations) == 2
+
+    {:ok, activity} =
+      CommonAPI.post(har, %{"status" => "Hey @#{jafnhar.nickname}", "visibility" => "public"})
+
+    assert {:error, _} = Conversation.create_or_bump_for(activity)
+  end
 end
index 047270a2a5adca2d30ad6aa88b05c2ef8cb44260..1e056b7eedac222eef7c9cc3d8ada54381aace3e 100644 (file)
@@ -22,6 +22,28 @@ defmodule Pleroma.Web.ActivityPub.ActivityPubTest do
     :ok
   end
 
+  describe "streaming out participations" do
+    test "it streams them out" do
+      user = insert(:user)
+      {:ok, activity} = CommonAPI.post(user, %{"status" => ".", "visibility" => "direct"})
+
+      {:ok, conversation} = Pleroma.Conversation.create_or_bump_for(activity)
+
+      participations =
+        conversation.participations
+        |> Repo.preload(:user)
+
+      with_mock Pleroma.Web.Streamer,
+        stream: fn _, _ -> nil end do
+        ActivityPub.stream_out_participations(conversation.participations)
+
+        Enum.each(participations, fn participation ->
+          assert called(Pleroma.Web.Streamer.stream("participation", participation))
+        end)
+      end
+    end
+  end
+
   describe "fetching restricted by visibility" do
     test "it restricts by the appropriate visibility" do
       user = insert(:user)