Revert "Merge branch 'streamer-refactoring' into 'develop'"
[akkoma] / lib / pleroma / web / streamer.ex
index d233d2a417e8c6ab2ba4781c7e1aecabfe9316fa..587c43f401aa8597efa08b45cd01fa09a9a10478 100644 (file)
@@ -18,7 +18,7 @@ defmodule Pleroma.Web.Streamer do
 
   @keepalive_interval :timer.seconds(30)
 
-  def start_link do
+  def start_link(_) do
     GenServer.start_link(__MODULE__, %{}, name: __MODULE__)
   end
 
@@ -35,28 +35,21 @@ defmodule Pleroma.Web.Streamer do
   end
 
   def init(args) do
-    spawn(fn ->
-      # 30 seconds
-      Process.sleep(@keepalive_interval)
-      GenServer.cast(__MODULE__, %{action: :ping})
-    end)
+    Process.send_after(self(), %{action: :ping}, @keepalive_interval)
 
     {:ok, args}
   end
 
-  def handle_cast(%{action: :ping}, topics) do
-    Map.values(topics)
+  def handle_info(%{action: :ping}, topics) do
+    topics
+    |> Map.values()
     |> List.flatten()
     |> Enum.each(fn socket ->
       Logger.debug("Sending keepalive ping")
       send(socket.transport_pid, {:text, ""})
     end)
 
-    spawn(fn ->
-      # 30 seconds
-      Process.sleep(@keepalive_interval)
-      GenServer.cast(__MODULE__, %{action: :ping})
-    end)
+    Process.send_after(self(), %{action: :ping}, @keepalive_interval)
 
     {:noreply, topics}
   end
@@ -120,8 +113,7 @@ defmodule Pleroma.Web.Streamer do
     |> Map.get("#{topic}:#{item.user_id}", [])
     |> Enum.each(fn socket ->
       with %User{} = user <- User.get_cached_by_ap_id(socket.assigns[:user].ap_id),
-           true <- should_send?(user, item),
-           false <- CommonAPI.thread_muted?(user, item.activity) do
+           true <- should_send?(user, item) do
         send(
           socket.transport_pid,
           {:text, represent_notification(socket.assigns[:user], item)}
@@ -209,7 +201,7 @@ defmodule Pleroma.Web.Streamer do
       payload:
         Pleroma.Web.MastodonAPI.ConversationView.render("participation.json", %{
           participation: participation,
-          user: participation.user
+          for: participation.user
         })
         |> Jason.encode!()
     }
@@ -235,13 +227,16 @@ defmodule Pleroma.Web.Streamer do
     mutes = user.info.mutes || []
     reblog_mutes = user.info.muted_reblogs || []
     domain_blocks = Pleroma.Web.ActivityPub.MRF.subdomains_regex(user.info.domain_blocks)
-    %{host: host} = URI.parse(parent.data["actor"])
 
     with parent when not is_nil(parent) <- Object.normalize(item),
          true <- Enum.all?([blocks, mutes, reblog_mutes], &(item.actor not in &1)),
          true <- Enum.all?([blocks, mutes], &(parent.data["actor"] not in &1)),
-         false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, host),
-         true <- thread_containment(item, user) do
+         %{host: item_host} <- URI.parse(item.actor),
+         %{host: parent_host} <- URI.parse(parent.data["actor"]),
+         false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, item_host),
+         false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, parent_host),
+         true <- thread_containment(item, user),
+         false <- CommonAPI.thread_muted?(user, item) do
       true
     else
       _ -> false