Salmon: Take both versions of public keys.
[akkoma] / lib / pleroma / web / streamer.ex
index 3a7b9174322a72c8b68d720dd69830ac852e69ab..d64e6c393b0ddb71428a5027541d6b5808e32227 100644 (file)
@@ -1,7 +1,7 @@
 defmodule Pleroma.Web.Streamer do
   use GenServer
   require Logger
-  import Plug.Conn
+  alias Pleroma.{User, Notification}
 
   def start_link do
     spawn(fn ->
@@ -37,13 +37,12 @@ defmodule Pleroma.Web.Streamer do
     {:noreply, topics}
   end
 
-  def handle_cast(%{action: :stream, topic: topic, item: item}, topics) do
-    Logger.debug("Trying to push to #{topic}")
-    Logger.debug("Pushing item to #{topic}")
+  def handle_cast(%{action: :stream, topic: "user", item: %Notification{} = item}, topics) do
+    topic = "user:#{item.user_id}"
     Enum.each(topics[topic] || [], fn (socket) ->
       json = %{
-        event: "update",
-        payload: Pleroma.Web.MastodonAPI.StatusView.render("status.json", activity: item) |> Poison.encode!
+        event: "notification",
+        payload: Pleroma.Web.MastodonAPI.MastodonAPIController.render_notification(socket.assigns["user"], item) |> Poison.encode!
       } |> Poison.encode!
 
       send socket.transport_pid, {:text, json}
@@ -51,7 +50,26 @@ defmodule Pleroma.Web.Streamer do
     {:noreply, topics}
   end
 
+  def handle_cast(%{action: :stream, topic: "user", item: item}, topics) do
+    Logger.debug("Trying to push to users")
+    recipient_topics = User.get_recipients_from_activity(item)
+    |> Enum.map(fn (%{id: id}) -> "user:#{id}" end)
+
+    Enum.each(recipient_topics, fn (topic) ->
+      push_to_socket(topics, topic, item)
+    end)
+    {:noreply, topics}
+  end
+
+  def handle_cast(%{action: :stream, topic: topic, item: item}, topics) do
+    Logger.debug("Trying to push to #{topic}")
+    Logger.debug("Pushing item to #{topic}")
+    push_to_socket(topics, topic, item)
+    {:noreply, topics}
+  end
+
   def handle_cast(%{action: :add, topic: topic, socket: socket}, sockets) do
+    topic = internal_topic(topic, socket)
     sockets_for_topic = sockets[topic] || []
     sockets_for_topic = Enum.uniq([socket | sockets_for_topic])
     sockets = Map.put(sockets, topic, sockets_for_topic)
@@ -61,6 +79,7 @@ defmodule Pleroma.Web.Streamer do
   end
 
   def handle_cast(%{action: :remove, topic: topic, socket: socket}, sockets) do
+    topic = internal_topic(topic, socket)
     sockets_for_topic = sockets[topic] || []
     sockets_for_topic = List.delete(sockets_for_topic, socket)
     sockets = Map.put(sockets, topic, sockets_for_topic)
@@ -73,4 +92,21 @@ defmodule Pleroma.Web.Streamer do
     IO.inspect("Unknown: #{inspect(m)}, #{inspect(state)}")
     {:noreply, state}
   end
+
+  def push_to_socket(topics, topic, item) do
+    Enum.each(topics[topic] || [], fn (socket) ->
+      json = %{
+        event: "update",
+        payload: Pleroma.Web.MastodonAPI.StatusView.render("status.json", activity: item, for: socket.assigns[:user]) |> Poison.encode!
+      } |> Poison.encode!
+
+      send socket.transport_pid, {:text, json}
+    end)
+  end
+
+  defp internal_topic("user", socket) do
+    "user:#{socket.assigns[:user].id}"
+  end
+
+  defp internal_topic(topic, _), do: topic
 end