defmodule Pleroma.Conversation do
alias Pleroma.Repo
alias Pleroma.Conversation.Participation
+ alias Pleroma.User
use Ecto.Schema
import Ecto.Changeset
schema "conversations" do
+ # This is the context ap id.
field(:ap_id, :string)
has_many(:participations, Participation)
def create_for_ap_id(ap_id) do
%__MODULE__{}
|> creation_cng(%{ap_id: ap_id})
- |> Repo.insert()
+ |> Repo.insert(
+ on_conflict: [set: [updated_at: NaiveDateTime.utc_now()]],
+ returning: true,
+ conflict_target: :ap_id
+ )
+ end
+
+ def get_for_ap_id(ap_id) do
+ Repo.get_by(__MODULE__, ap_id: ap_id)
+ end
+
+ @doc """
+ This will
+ 1. Create a conversation if there isn't one already
+ 2. Create a participation for all the people involved who don't have one already
+ 3. Bump all relevant participations to 'unread'
+ """
+ def create_or_bump_for(activity) do
+ with true <- Pleroma.Web.ActivityPub.Visibility.is_direct?(activity),
+ "Create" <- activity.data["type"],
+ "Note" <- activity.data["object"]["type"],
+ ap_id when is_binary(ap_id) <- activity.data["object"]["context"] do
+ {:ok, conversation} = create_for_ap_id(ap_id)
+
+ local_users = User.get_users_from_set(activity.recipients, true)
+
+ participations =
+ Enum.map(local_users, fn user ->
+ {:ok, participation} =
+ Participation.create_for_user_and_conversation(user, conversation)
+
+ participation
+ end)
+
+ %{
+ conversation
+ | participations: participations
+ }
+ end
end
end
def create_for_user_and_conversation(user, conversation) do
%__MODULE__{}
|> creation_cng(%{user_id: user.id, conversation_id: conversation.id})
- |> Repo.insert()
+ |> Repo.insert(
+ on_conflict: [set: [read: false, updated_at: NaiveDateTime.utc_now()]],
+ returning: true,
+ conflict_target: [:user_id, :conversation_id]
+ )
end
def read_cng(struct, params) do
defmodule Pleroma.Web.ActivityPub.ActivityPub do
alias Pleroma.Activity
+ alias Pleroma.Conversation
alias Pleroma.Instances
alias Pleroma.Notification
alias Pleroma.Object
end)
Notification.create_notifications(activity)
+ Conversation.create_or_bump_for(activity)
stream_out(activity)
{:ok, activity}
else
timestamps()
end
- create index(:conversation_participations, [:user_id])
create index(:conversation_participations, [:conversation_id])
+ create unique_index(:conversation_participations, [:user_id, :conversation_id])
create unique_index(:conversations, [:ap_id])
end
end
defmodule Pleroma.Conversation.ParticipationTest do
use Pleroma.DataCase
-
import Pleroma.Factory
-
alias Pleroma.Conversation.Participation
test "it creates a participation for a conversation and a user" do
assert participation.user_id == user.id
assert participation.conversation_id == conversation.id
+
+ :timer.sleep(1000)
+ # Creating again returns the same participation
+ {:ok, %Participation{} = participation_two} =
+ Participation.create_for_user_and_conversation(user, conversation)
+
+ assert participation.id == participation_two.id
+ refute participation.updated_at == participation_two.updated_at
+ end
+
+ test "recreating an existing participations sets it to unread" do
+ participation = insert(:participation, %{read: true})
+
+ {:ok, participation} =
+ Participation.create_for_user_and_conversation(
+ participation.user,
+ participation.conversation
+ )
+
+ refute participation.read
end
test "it marks a participation as read" do
defmodule Pleroma.ConversationTest do
use Pleroma.DataCase
alias Pleroma.Conversation
+ alias Pleroma.Web.CommonAPI
+
+ import Pleroma.Factory
test "it creates a conversation for given ap_id" do
- assert {:ok, %Conversation{}} = Conversation.create_for_ap_id("https://some_ap_id")
+ assert {:ok, %Conversation{} = conversation} =
+ Conversation.create_for_ap_id("https://some_ap_id")
+
+ # Inserting again returns the same
+ assert {:ok, conversation_two} = Conversation.create_for_ap_id("https://some_ap_id")
+ assert conversation_two.id == conversation.id
+ end
+
+ test "public posts don't create conversations" do
+ user = insert(:user)
+ {:ok, activity} = CommonAPI.post(user, %{"status" => "Hey"})
+
+ context = activity.data["object"]["context"]
+
+ conversation = Conversation.get_for_ap_id(context)
+
+ refute conversation
+ end
+
+ test "it creates or updates a conversation and participations for a given DM" do
+ har = insert(:user)
+ jafnhar = insert(:user)
+ tridi = insert(:user)
+
+ {:ok, activity} =
+ CommonAPI.post(har, %{"status" => "Hey @#{jafnhar.nickname}", "visibility" => "direct"})
+
+ context = activity.data["object"]["context"]
+
+ conversation =
+ Conversation.get_for_ap_id(context)
+ |> Repo.preload(:participations)
+
+ assert conversation
+ [har_participation, jafnhar_participation] = conversation.participations
+
+ assert har_participation.user_id == har.id
+ assert jafnhar_participation.user_id == jafnhar.id
+
+ {:ok, activity} =
+ CommonAPI.post(jafnhar, %{
+ "status" => "Hey @#{har.nickname}",
+ "visibility" => "direct",
+ "in_reply_to_status_id" => activity.id
+ })
+
+ context = activity.data["object"]["context"]
+
+ conversation_two =
+ Conversation.get_for_ap_id(context)
+ |> Repo.preload(:participations)
+
+ assert conversation_two.id == conversation.id
+
+ [har_participation_two, jafnhar_participation_two] = conversation_two.participations
+
+ assert har_participation_two.user_id == har.id
+ assert jafnhar_participation_two.user_id == jafnhar.id
+
+ {:ok, activity} =
+ CommonAPI.post(tridi, %{
+ "status" => "Hey @#{har.nickname}",
+ "visibility" => "direct",
+ "in_reply_to_status_id" => activity.id
+ })
+
+ context = activity.data["object"]["context"]
+
+ conversation_three =
+ Conversation.get_for_ap_id(context)
+ |> Repo.preload(:participations)
+
+ assert conversation_three.id == conversation.id
+
+ [har_participation_three, jafnhar_participation_three, tridi_participation] =
+ conversation_three.participations
+
+ assert har_participation_three.user_id == har.id
+ assert jafnhar_participation_three.user_id == jafnhar.id
+ assert tridi_participation.user_id == tridi.id
end
end