+ def filtered_by_user?(%User{} = user, %Notification{activity: activity}) do
+ filtered_by_user?(user, activity)
+ end
+
+ defp do_stream("direct", item) do
+ recipient_topics =
+ User.get_recipients_from_activity(item)
+ |> Enum.map(fn %{id: id} -> "direct:#{id}" end)
+
+ Enum.each(recipient_topics, fn user_topic ->
+ Logger.debug("Trying to push direct message to #{user_topic}\n\n")
+ push_to_socket(user_topic, item)
+ end)
+ end
+
+ defp do_stream("participation", participation) do
+ user_topic = "direct:#{participation.user_id}"
+ Logger.debug("Trying to push a conversation participation to #{user_topic}\n\n")
+
+ push_to_socket(user_topic, participation)
+ end
+
+ defp do_stream("list", item) do
+ # filter the recipient list if the activity is not public, see #270.
+ recipient_lists =
+ case Visibility.is_public?(item) do
+ true ->
+ Pleroma.List.get_lists_from_activity(item)
+
+ _ ->
+ Pleroma.List.get_lists_from_activity(item)
+ |> Enum.filter(fn list ->
+ owner = User.get_cached_by_id(list.user_id)
+
+ Visibility.visible_for_user?(item, owner)
+ end)
+ end
+
+ recipient_topics =
+ recipient_lists
+ |> Enum.map(fn %{id: id} -> "list:#{id}" end)
+
+ Enum.each(recipient_topics, fn list_topic ->
+ Logger.debug("Trying to push message to #{list_topic}\n\n")
+ push_to_socket(list_topic, item)
+ end)
+ end
+
+ defp do_stream(topic, %Notification{} = item)
+ when topic in ["user", "user:notification"] do
+ Registry.dispatch(@registry, "#{topic}:#{item.user_id}", fn list ->
+ Enum.each(list, fn {pid, _auth} ->
+ send(pid, {:render_with_user, StreamerView, "notification.json", item})
+ end)
+ end)
+ end
+
+ defp do_stream("user", item) do
+ Logger.debug("Trying to push to users")
+
+ recipient_topics =
+ User.get_recipients_from_activity(item)
+ |> Enum.map(fn %{id: id} -> "user:#{id}" end)
+
+ Enum.each(recipient_topics, fn topic ->
+ push_to_socket(topic, item)
+ end)
+ end
+
+ defp do_stream(topic, item) do
+ Logger.debug("Trying to push to #{topic}")
+ Logger.debug("Pushing item to #{topic}")
+ push_to_socket(topic, item)
+ end
+
+ defp push_to_socket(topic, %Participation{} = participation) do
+ rendered = StreamerView.render("conversation.json", participation)
+
+ Registry.dispatch(@registry, topic, fn list ->
+ Enum.each(list, fn {pid, _} ->
+ send(pid, {:text, rendered})
+ end)
+ end)
+ end
+
+ defp push_to_socket(topic, %Activity{
+ data: %{"type" => "Delete", "deleted_activity_id" => deleted_activity_id}
+ }) do
+ rendered = Jason.encode!(%{event: "delete", payload: to_string(deleted_activity_id)})
+
+ Registry.dispatch(@registry, topic, fn list ->
+ Enum.each(list, fn {pid, _} ->
+ send(pid, {:text, rendered})
+ end)
+ end)
+ end
+
+ defp push_to_socket(_topic, %Activity{data: %{"type" => "Delete"}}), do: :noop
+
+ defp push_to_socket(topic, item) do
+ anon_render = StreamerView.render("update.json", item)
+
+ Registry.dispatch(@registry, topic, fn list ->
+ Enum.each(list, fn {pid, auth?} ->
+ if auth? do
+ send(pid, {:render_with_user, StreamerView, "update.json", item})
+ else
+ send(pid, {:text, anon_render})
+ end
+ end)
+ end)
+ end
+
+ defp thread_containment(_activity, %User{skip_thread_containment: true}), do: true
+
+ defp thread_containment(activity, user) do
+ if Config.get([:instance, :skip_thread_containment]) do
+ true
+ else
+ ActivityPub.contain_activity(activity, user)
+ end
+ end
+
+ # In test environement, only return true if the registry is started.
+ # In benchmark environment, returns false.
+ # In any other environment, always returns true.
+ cond do
+ @mix_env == :test ->
+ def should_env_send? do
+ case Process.whereis(@registry) do
+ nil ->
+ false
+
+ pid ->
+ Process.alive?(pid)
+ end
+ end
+
+ @mix_env == :benchmark ->
+ def should_env_send?, do: false
+
+ true ->
+ def should_env_send?, do: true