MastoAPI: Implement all streaming functions.
authorRoger Braun <rbraun@Bobble.local>
Thu, 16 Nov 2017 15:49:51 +0000 (16:49 +0100)
committerRoger Braun <rbraun@Bobble.local>
Thu, 16 Nov 2017 15:49:51 +0000 (16:49 +0100)
lib/pleroma/notification.ex
lib/pleroma/user.ex
lib/pleroma/web/activity_pub/activity_pub.ex
lib/pleroma/web/mastodon_api/mastodon_api_controller.ex
lib/pleroma/web/mastodon_api/mastodon_socket.ex
lib/pleroma/web/streamer.ex
test/user_test.exs

index 039cc731214d3872243cffcbf1f22ecdb7299d38..65e3265d4a175b2926e8bb27e247d95d4256d9c7 100644 (file)
@@ -78,8 +78,9 @@ defmodule Pleroma.Notification do
   # TODO move to sql, too.
   def create_notification(%Activity{} = activity, %User{} = user) do
     unless User.blocks?(user, %{ap_id: activity.data["actor"]}) do
-      notification = %Notification{user_id: user.id, activity_id: activity.id}
+      notification = %Notification{user_id: user.id, activity: activity}
       {:ok, notification} = Repo.insert(notification)
+      Pleroma.Web.Streamer.stream("user", notification)
       notification
     end
   end
index 771c54e813e51f73bce54d0acde84d379698393a..56502e8973210d3d23f358e43bcdf491655e4251 100644 (file)
@@ -284,6 +284,17 @@ defmodule Pleroma.User do
     Repo.all(query)
   end
 
+  def get_recipients_from_activity(%Activity{data: %{"to" => to}} = activity) do
+    query = from u in User,
+      where: u.local == true
+
+    query = from u in query,
+      where: u.ap_id in ^to,
+      or_where: fragment("? \\\?| ?", u.following, ^to)
+
+    Repo.all(query)
+  end
+
   def search(query, resolve) do
     if resolve do
       User.get_or_fetch_by_nickname(query)
index 5cbf14868e2c39f71b386fc9fe35a8656f43753a..b4e59050b480d1a774906558765cb01a14196051 100644 (file)
@@ -24,6 +24,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
          :ok <- maybe_federate(activity) do
       if activity.data["type"] == "Create" and Enum.member?(activity.data["to"], "https://www.w3.org/ns/activitystreams#Public") do
         Pleroma.Web.Streamer.stream("public", activity)
+        Pleroma.Web.Streamer.stream("user", activity)
         if local do
           Pleroma.Web.Streamer.stream("public:local", activity)
         end
index 8b5714555bc054b81d81a9686a16c2704f65f491..bbd003b06420d42a2fef5554774f8e4eb811d151 100644 (file)
@@ -595,7 +595,7 @@ defmodule Pleroma.Web.MastodonAPI.MastodonAPIController do
     json(conn, [])
   end
 
-  defp render_notification(user, %{id: id, activity: activity, inserted_at: created_at} = _params) do
+  def render_notification(user, %{id: id, activity: activity, inserted_at: created_at} = _params) do
     actor = User.get_cached_by_ap_id(activity.data["actor"])
     created_at = NaiveDateTime.to_iso8601(created_at)
     |> String.replace(~r/(\.\d+)?$/, ".000Z", global: false)
index af76c87015cb687b5d5e8fc0684c60d9ca1e3c1b..1d276e64abfa2409cf2a0911d888179c5d9ed7c1 100644 (file)
@@ -11,7 +11,7 @@ defmodule Pleroma.Web.MastodonAPI.MastodonSocket do
     with token when not is_nil(token) <- params["access_token"],
          %Token{user_id: user_id} <- Repo.get_by(Token, token: token),
          %User{} = user <- Repo.get(User, user_id),
-         stream when stream in ["public", "public:local"] <- params["stream"] do
+         stream when stream in ["public", "public:local", "user"] <- params["stream"] do
       socket = socket
       |> assign(:topic, params["stream"])
       |> assign(:user, user)
index 3b2938676721b9aba90e2b87df0d1e9762b91e62..9f10800154cd24b46c9f5ebc74889f12e79b06a1 100644 (file)
@@ -2,6 +2,7 @@ defmodule Pleroma.Web.Streamer do
   use GenServer
   require Logger
   import Plug.Conn
+  alias Pleroma.{User, Notification}
 
   def start_link do
     spawn(fn ->
@@ -37,9 +38,7 @@ defmodule Pleroma.Web.Streamer do
     {:noreply, topics}
   end
 
-  def handle_cast(%{action: :stream, topic: topic, item: item}, topics) do
-    Logger.debug("Trying to push to #{topic}")
-    Logger.debug("Pushing item to #{topic}")
+  def push_to_socket(topics, topic, item) do
     Enum.each(topics[topic] || [], fn (socket) ->
       json = %{
         event: "update",
@@ -48,10 +47,46 @@ defmodule Pleroma.Web.Streamer do
 
       send socket.transport_pid, {:text, json}
     end)
+  end
+
+  def handle_cast(%{action: :stream, topic: "user", item: %Notification{} = item}, topics) do
+    topic = "user:#{item.user_id}"
+    Enum.each(topics[topic] || [], fn (socket) ->
+      json = %{
+        event: "notification",
+        payload: Pleroma.Web.MastodonAPI.MastodonAPIController.render_notification(socket.assigns["user"], item) |> Poison.encode!
+      } |> Poison.encode!
+
+      send socket.transport_pid, {:text, json}
+    end)
+    {:noreply, topics}
+  end
+
+  def handle_cast(%{action: :stream, topic: "user", item: item}, topics) do
+    Logger.debug("Trying to push to users")
+    recipient_topics = User.get_recipients_from_activity(item)
+    |> Enum.map(fn (%{id: id}) -> "user:#{id}" end)
+
+    Enum.each(recipient_topics, fn (topic) ->
+      push_to_socket(topics, topic, item)
+    end)
+    {:noreply, topics}
+  end
+
+  def handle_cast(%{action: :stream, topic: topic, item: item}, topics) do
+    Logger.debug("Trying to push to #{topic}")
+    Logger.debug("Pushing item to #{topic}")
+    push_to_socket(topics, topic, item)
     {:noreply, topics}
   end
 
+  defp internal_topic("user", socket) do
+    "user:#{socket.assigns[:user].id}"
+  end
+  defp internal_topic(topic, socket), do: topic
+
   def handle_cast(%{action: :add, topic: topic, socket: socket}, sockets) do
+    topic = internal_topic(topic, socket)
     sockets_for_topic = sockets[topic] || []
     sockets_for_topic = Enum.uniq([socket | sockets_for_topic])
     sockets = Map.put(sockets, topic, sockets_for_topic)
@@ -61,6 +96,7 @@ defmodule Pleroma.Web.Streamer do
   end
 
   def handle_cast(%{action: :remove, topic: topic, socket: socket}, sockets) do
+    topic = internal_topic(topic, socket)
     sockets_for_topic = sockets[topic] || []
     sockets_for_topic = List.delete(sockets_for_topic, socket)
     sockets = Map.put(sockets, topic, sockets_for_topic)
index 151b9afc04d482c732ef208b7be37e49db5e8e8c..430f56846c920b8de84f7c37a185cebc60ee4c60 100644 (file)
@@ -3,6 +3,7 @@ defmodule Pleroma.UserTest do
   alias Pleroma.{User, Repo}
   alias Pleroma.Web.OStatus
   alias Pleroma.Web.Websub.WebsubClientSubscription
+  alias Pleroma.Web.CommonAPI
   use Pleroma.DataCase
 
   import Pleroma.Factory
@@ -296,5 +297,22 @@ defmodule Pleroma.UserTest do
       refute User.blocks?(user, blocked_user)
     end
   end
+
+  test "get recipients from activity" do
+    actor = insert(:user)
+    user = insert(:user, local: true)
+    user_two = insert(:user, local: false)
+    addressed = insert(:user, local: true)
+    addressed_remote = insert(:user, local: false)
+    {:ok, activity} = CommonAPI.post(actor, %{"status" => "hey @#{addressed.nickname} @#{addressed_remote.nickname}"})
+
+    assert [addressed] == User.get_recipients_from_activity(activity)
+
+    {:ok, user} = User.follow(user, actor)
+    recipients = User.get_recipients_from_activity(activity)
+    assert length(recipients) == 2
+    assert user in recipients
+    assert addressed in recipients
+  end
 end