Only add local posts to index in activity_pub
[akkoma] / lib / pleroma / activity.ex
1 # Pleroma: A lightweight social networking server
2 # Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
3 # SPDX-License-Identifier: AGPL-3.0-only
4
5 defmodule Pleroma.Activity do
6 use Ecto.Schema
7
8 alias Pleroma.Activity
9 alias Pleroma.Activity.Queries
10 alias Pleroma.Bookmark
11 alias Pleroma.Notification
12 alias Pleroma.Object
13 alias Pleroma.Repo
14 alias Pleroma.ReportNote
15 alias Pleroma.ThreadMute
16 alias Pleroma.User
17 alias Pleroma.Web.ActivityPub.ActivityPub
18
19 import Ecto.Changeset
20 import Ecto.Query
21
# Public types for this schema struct and for actor identifiers (plain strings).
@type t :: %__MODULE__{}
@type actor :: String.t()

# Primary keys use the FlakeId compat type and are autogenerated.
@primary_key {:id, FlakeId.Ecto.CompatType, autogenerate: true}

# Cache provider module; a module attribute, so it is read once at compile time.
@cachex Pleroma.Config.get([:cachex, :provider], Cachex)

schema "activities" do
  field :data, :map
  field :local, :boolean, default: true
  field :actor, :string
  field :recipients, {:array, :string}, default: []
  field :thread_muted?, :boolean, virtual: true

  # Virtual slot for an id taken from a joined table, so results can be
  # ordered / paginated by that id instead of the activity's own.
  field :pagination_id, :string, virtual: true

  # Fake relation: only meaningful through
  # with_preloaded_user_actor/with_joined_user_actor.
  has_one :user_actor, User, on_delete: :nothing, foreign_key: :id
  # Fake relation: only meaningful through with_preloaded_bookmark/get_bookmark.
  has_one :bookmark, Bookmark
  # Fake relation: only meaningful through with_preloaded_report_notes.
  has_many :report_notes, ReportNote
  has_many :notifications, Notification, on_delete: :delete_all

  # Warning: `:object` is a fake relation too. Its foreign key lives inside a
  # jsonb field, so a blind preload will not work.
  #
  # The usual way to populate it is an inner join followed by a preload:
  #
  # ```
  # |> join(:inner, [activity], o in Object,
  #      on: fragment("(?->>'id') = COALESCE((?)->'object'->> 'id', (?)->>'object')",
  #        o.data, activity.data, activity.data))
  # |> preload([activity, object], [object: object])
  # ```
  #
  # Activity.with_preloaded_object() wraps that join + preload for the
  # common case.
  has_one :object, Object, on_delete: :nothing, foreign_key: :id

  timestamps()
end
67
@doc """
Joins each activity with the object it refers to.

The object row is matched by comparing its `data->>'id'` with the id of the
activity's embedded object or, failing that, with the plain `object` reference.
The join is exposed as the named binding `:object`; `join_type` defaults to `:inner`.
"""
def with_joined_object(query, join_type \\ :inner) do
  join(query, join_type, [act], obj in Object,
    as: :object,
    on:
      fragment(
        "(?->>'id') = COALESCE(?->'object'->>'id', ?->>'object')",
        obj.data,
        act.data,
        act.data
      )
  )
end
80
@doc """
Preloads the `:object` association from the `:object` named binding.

The join is added through `with_joined_object/2` only when the query does not
already carry an `:object` binding.
"""
def with_preloaded_object(query, join_type \\ :inner) do
  joined =
    if has_named_binding?(query, :object) do
      query
    else
      with_joined_object(query, join_type)
    end

  preload(joined, [act, object: obj], object: obj)
end
87
@doc """
Returns the user behind an activity's actor.

Yields `nil` when the activity has no actor, the already loaded `user_actor`
when it is a `%User{}`, and otherwise looks the user up by the actor's AP id.
"""
# Note: the nil-actor clause covers fake activities
# (ActivityPub.Utils.get_notified_from_object/1 etc.)
def user_actor(%Activity{actor: nil}), do: nil
def user_actor(%Activity{user_actor: %User{} = preloaded}), do: preloaded
def user_actor(%Activity{actor: ap_id}), do: User.get_cached_by_ap_id(ap_id)
98
@doc """
Joins the user whose `ap_id` equals the activity's `actor`, exposing the join
as the named binding `:user_actor`. `join_type` defaults to `:inner`.
"""
def with_joined_user_actor(query, join_type \\ :inner) do
  join(query, join_type, [act], usr in User,
    as: :user_actor,
    on: usr.ap_id == act.actor
  )
end
105
@doc """
Preloads the `:user_actor` association from the `:user_actor` named binding.

The join is added through `with_joined_user_actor/2` only when the query does
not already carry a `:user_actor` binding, mirroring `with_preloaded_object/2`.
`join_type` (default `:inner`) is therefore only used when the join is added here.
"""
def with_preloaded_user_actor(query, join_type \\ :inner) do
  # Re-adding a named binding that already exists makes Ecto raise, so reuse
  # an existing :user_actor join instead of joining a second time.
  joined =
    if has_named_binding?(query, :user_actor) do
      query
    else
      with_joined_user_actor(query, join_type)
    end

  preload(joined, [act, user_actor: usr], user_actor: usr)
end
111
@doc """
Left-joins the given user's bookmark for each activity (named binding
`:bookmark`) and preloads it into the `:bookmark` association.

When the second argument is not a `%User{}` the query is returned unchanged.
"""
def with_preloaded_bookmark(query, %User{} = user) do
  query
  |> join(:left, [act], bm in Bookmark,
    on: bm.user_id == ^user.id and bm.activity_id == act.id,
    as: :bookmark
  )
  |> preload([bookmark: bm], bookmark: bm)
end

def with_preloaded_bookmark(query, _), do: query
122
@doc """
Left-joins the report notes attached to each activity (named binding
`:report_note`) and preloads them into the `:report_notes` association.
"""
def with_preloaded_report_notes(query) do
  query
  |> join(:left, [act], note in ReportNote,
    on: act.id == note.activity_id,
    as: :report_note
  )
  |> preload([report_note: note], report_notes: note)
end

# Two-argument variant: hands the query back untouched, whatever the second argument is.
def with_preloaded_report_notes(query, _), do: query
133
@doc """
Selects each activity with its virtual `thread_muted?` field filled in.

A thread mute of the given user is left-joined on the activity's `context`
(named binding `:thread_mute`); the field is true when such a row exists.
When the second argument is not a `%User{}` the query is returned unchanged.
"""
def with_set_thread_muted_field(query, %User{} = user) do
  query
  |> join(:left, [act], mute in ThreadMute,
    on: mute.user_id == ^user.id and mute.context == fragment("?->>'context'", act.data),
    as: :thread_mute
  )
  |> select([act, thread_mute: mute], %Activity{act | thread_muted?: not is_nil(mute.id)})
end

def with_set_thread_muted_field(query, _), do: query
144
@doc """
Runs the `Queries.by_ap_id/1` query for `ap_id` and returns its single result.
"""
def get_by_ap_id(ap_id) do
  Repo.one(Queries.by_ap_id(ap_id))
end
150
@doc """
Returns the bookmark a user holds on an activity.

Uses the activity's `bookmark` association when it is loaded, otherwise falls
back to `Bookmark.get/2`. Any other argument combination yields `nil`.
"""
def get_bookmark(%Activity{id: activity_id, bookmark: bookmark}, %User{id: user_id}) do
  if Ecto.assoc_loaded?(bookmark), do: bookmark, else: Bookmark.get(user_id, activity_id)
end

def get_bookmark(_, _), do: nil
160
@doc """
Fetches the "Flag" activity with the given id.

The query is built by `ActivityPub.fetch_activities_query/2` with
`skip_preload` and `preload_report_notes` both enabled.
"""
def get_report(activity_id) do
  []
  |> ActivityPub.fetch_activities_query(%{
    type: "Flag",
    skip_preload: true,
    preload_report_notes: true
  })
  |> where(id: ^activity_id)
  |> Repo.one()
end
172
@doc """
Builds a changeset for an activity.

Only `:data` and `:recipients` are cast, `:data` is required, and the
`activities_unique_apid_index` constraint is reported under `:ap_id`.
"""
def change(struct, params \\ %{}) do
  casted = cast(struct, params, [:data, :recipients])
  checked = validate_required(casted, [:data])

  unique_constraint(checked, :ap_id, name: :activities_unique_apid_index)
end
179
@doc """
Like `get_by_ap_id/1`, but with the object preloaded through a left join.
"""
def get_by_ap_id_with_object(ap_id) do
  lookup = Queries.by_ap_id(ap_id)

  Repo.one(with_preloaded_object(lookup, :left))
end
186
@doc """
Looks an activity up by its id.

With the default options, activities whose actor is deactivated are filtered out.
"""
@spec get_by_id(String.t(), keyword()) :: t() | nil
def get_by_id(id, opts \\ [filter: [:restrict_deactivated]]) do
  get_by_id_with_opts(id, opts)
end
192
@doc """
Looks an activity up by its id with the `:user_actor` association preloaded.
"""
@spec get_by_id_with_user_actor(String.t()) :: t() | nil
def get_by_id_with_user_actor(id) do
  get_by_id_with_opts(id, preload: [:user_actor])
end
195
@doc """
Looks an activity up by its id with the `:object` association preloaded.
"""
@spec get_by_id_with_object(String.t()) :: t() | nil
def get_by_id_with_object(id) do
  get_by_id_with_opts(id, preload: [:object])
end
198
# Shared lookup behind the get_by_id* helpers.
#
# Returns nil when `id` is not a flake id. Otherwise the by-id query is narrowed
# by the entries of `opts[:filter]` and extended by the entries of
# `opts[:preload]` (each only when it is a list) before being run.
defp get_by_id_with_opts(id, opts) do
  if FlakeId.flake_id?(id) do
    id
    |> Queries.by_id()
    |> apply_lookup_filters(opts[:filter])
    |> apply_lookup_preloads(opts[:preload])
    |> Repo.one()
  end
end

# Applies the recognised filters in order; unknown entries are ignored.
defp apply_lookup_filters(query, filters) when is_list(filters) do
  Enum.reduce(filters, query, fn
    {:type, type}, acc -> Queries.by_type(acc, type)
    :restrict_deactivated, acc -> restrict_deactivated_users(acc)
    _, acc -> acc
  end)
end

defp apply_lookup_filters(query, _not_a_list), do: query

# Applies the recognised preloads in order; unknown entries are ignored.
defp apply_lookup_preloads(query, preloads) when is_list(preloads) do
  Enum.reduce(preloads, query, fn
    :user_actor, acc -> with_preloaded_user_actor(acc)
    :object, acc -> with_preloaded_object(acc)
    _, acc -> acc
  end)
end

defp apply_lookup_preloads(query, _not_a_list), do: query
228
@doc """
Returns all activities whose id is in `ids`, each with its object preloaded.
"""
def all_by_ids_with_object(ids) do
  from(act in Activity, where: act.id in ^ids)
  |> with_preloaded_object()
  |> Repo.all()
end
235
@doc """
Builds the query for "Create" activities that refer to the given object.

`ap_id` may be a single AP id or a list of them. The query is returned, not run.
"""
@spec create_by_object_ap_id(String.t() | [String.t()]) :: Ecto.Queryable.t()
def create_by_object_ap_id(ap_id) do
  by_object = Queries.by_object_id(ap_id)

  Queries.by_type(by_object, "Create")
end
246
@doc """
Runs `create_by_object_ap_id/1` and returns every matching activity.
"""
def get_all_create_by_object_ap_id(ap_id) do
  Repo.all(create_by_object_ap_id(ap_id))
end
252
@doc """
Returns the "Create" activity for the object with the given AP id, leaving out
activities whose actor is deactivated. Non-binary input yields `nil`.
"""
def get_create_by_object_ap_id(ap_id) when is_binary(ap_id) do
  visible_creates = restrict_deactivated_users(create_by_object_ap_id(ap_id))

  Repo.one(visible_creates)
end

def get_create_by_object_ap_id(_), do: nil
260
@doc """
Same query as `create_by_object_ap_id/1`, with the object joined and preloaded.

`ap_id` may be a single AP id or a list of them. The query is returned, not run.
"""
@spec create_by_object_ap_id_with_object(String.t() | [String.t()]) :: Ecto.Queryable.t()
def create_by_object_ap_id_with_object(ap_id) do
  with_preloaded_object(create_by_object_ap_id(ap_id))
end
271
@doc """
Returns the "Create" activity for the object with the given AP id, with the
object preloaded. Non-binary input yields `nil`.
"""
def get_create_by_object_ap_id_with_object(ap_id) when is_binary(ap_id) do
  Repo.one(create_by_object_ap_id_with_object(ap_id))
end

def get_create_by_object_ap_id_with_object(_), do: nil
279
@doc """
Looks up the activity with the given id, restricted to type "Create", with its
object preloaded.
"""
@spec create_by_id_with_object(String.t()) :: t() | nil
def create_by_id_with_object(id) do
  lookup_opts = [preload: [:object], filter: [type: "Create"]]

  get_by_id_with_opts(id, lookup_opts)
end
284
# Resolves the "Create" activity of the object named in `inReplyTo`;
# anything without that key resolves to nil.
defp get_in_reply_to_activity_from_object(%Object{data: %{"inReplyTo" => parent_ap_id}}),
  do: get_create_by_object_ap_id_with_object(parent_ap_id)

defp get_in_reply_to_activity_from_object(_), do: nil
290
@doc """
Returns the activity the given activity's object replies to, or `nil`.

The object is obtained with `Object.normalize/2` using `fetch: false`.
"""
def get_in_reply_to_activity(%Activity{} = activity) do
  activity
  |> Object.normalize(fetch: false)
  |> get_in_reply_to_activity_from_object()
end
294
@doc """
Loads the stored activity (with its object) for an activity struct, a map
carrying an `"id"` key, or an AP id string. Anything else yields `nil`.
"""
def normalize(term) do
  case term do
    %Activity{data: %{"id" => ap_id}} -> get_by_ap_id_with_object(ap_id)
    %{"id" => ap_id} -> get_by_ap_id_with_object(ap_id)
    ap_id when is_binary(ap_id) -> get_by_ap_id_with_object(ap_id)
    _ -> nil
  end
end
299
@doc """
Deletes the activities selected by `Queries.by_object_id/1` for `id`, after
`Queries.exclude_type/2` has been applied with "Delete".

Among the deleted rows, the first "Create" activity whose object is `id` is
passed through `purge_web_resp_cache/1` and returned (`nil` when there is none).
Non-binary input yields `nil`.
"""
def delete_all_by_object_ap_id(id) when is_binary(id) do
  {_count, deleted} =
    id
    |> Queries.by_object_id()
    |> Queries.exclude_type("Delete")
    |> select([act], act)
    |> Repo.delete_all(timeout: :infinity)

  # The object may be stored as a bare AP id or as an embedded map with an "id".
  create_activity =
    Enum.find(deleted, fn
      %{data: %{"type" => "Create", "object" => ^id}} -> true
      %{data: %{"type" => "Create", "object" => %{"id" => ^id}}} -> true
      _ -> false
    end)

  purge_web_resp_cache(create_activity)
end

def delete_all_by_object_ap_id(_), do: nil
316
# Deletes the :web_resp_cache entry keyed by the path component of the
# activity's id, then returns the activity. Any other input is returned as is.
defp purge_web_resp_cache(%Activity{data: %{"id" => id}} = activity) when is_binary(id) do
  %{path: path} = URI.parse(id)
  @cachex.del(:web_resp_cache, path)

  activity
end

defp purge_web_resp_cache(activity), do: activity
326
@doc """
Tells whether a "Follow" activity currently corresponds to a following relationship.

Both the follower (the activity's actor) and the followed user (looked up by
the activity's `object`) must resolve to users; otherwise, and for any
non-Follow input, the answer is `false`.
"""
def follow_accepted?(
      %Activity{data: %{"type" => "Follow", "object" => followed_ap_id}} = activity
    ) do
  case Activity.user_actor(activity) do
    %User{} = follower ->
      case User.get_cached_by_ap_id(followed_ap_id) do
        %User{} = followed -> Pleroma.FollowingRelationship.following?(follower, followed)
        _ -> false
      end

    _ ->
      false
  end
end

def follow_accepted?(_), do: false
339
@doc """
Returns the activities with an id in `status_ids` whose actor is `actor`.

An empty id list short-circuits to `[]` without touching the database.
"""
def all_by_actor_and_id(actor, status_ids \\ [])
def all_by_actor_and_id(_actor, []), do: []

def all_by_actor_and_id(actor, status_ids) do
  matching =
    from(s in Activity,
      where: s.id in ^status_ids,
      where: s.actor == ^actor
    )

  Repo.all(matching)
end
349
@doc """
Builds the query for "Follow" activities in state `pending` whose object is the
given user's AP id. The query is returned, not run.
"""
def follow_requests_for_actor(%User{ap_id: ap_id}) do
  follows = Queries.by_type(Queries.by_object_id(ap_id), "Follow")

  where(follows, [act], fragment("? ->> 'state' = 'pending'", act.data))
end
356
@doc """
Returns the "Follow" activities in state `pending` whose actor is the given user.
"""
def following_requests_for_actor(%User{ap_id: ap_id}) do
  pending =
    from(act in Queries.by_type("Follow"),
      where: fragment("?->>'state' = 'pending'", act.data),
      where: act.actor == ^ap_id
    )

  Repo.all(pending)
end
363
@doc """
Narrows a query to activities whose actor is not among the AP ids selected by
`User.Query.build(%{deactivated: true})`.
"""
def restrict_deactivated_users(query) do
  deactivated_ap_ids =
    %{deactivated: true}
    |> User.Query.build()
    |> select([u], u.ap_id)

  where(query, [act], act.actor not in subquery(deactivated_ap_ids))
end
369
# Searching is implemented in the Search submodule; `options` defaults to [].
defdelegate search(user, query, options \\ []), to: Activity.Search
371
@doc """
Returns the id of `for_user`'s participation in the conversation identified by
the activity's `context`.

Yields `nil` when the context is missing or not a string, or when no
conversation or participation is found.
"""
def direct_conversation_id(activity, for_user) do
  with %{data: %{"context" => context}} when is_binary(context) <- activity,
       %Pleroma.Conversation{} = conversation <- Pleroma.Conversation.get_for_ap_id(context),
       %Pleroma.Conversation.Participation{id: participation_id} <-
         Pleroma.Conversation.Participation.for_user_and_conversation(for_user, conversation) do
    participation_id
  else
    _ -> nil
  end
end
384
@doc """
Returns the first activity referring to the object with the given AP id, with
the object preloaded. Non-binary input yields `nil`.
"""
@spec get_by_object_ap_id_with_object(String.t()) :: t() | nil
def get_by_object_ap_id_with_object(ap_id) when is_binary(ap_id) do
  with_object = with_preloaded_object(Queries.by_object_id(ap_id))

  Repo.one(first(with_object))
end

def get_by_object_ap_id_with_object(_), do: nil
395
@doc """
Builds the query for "Add" activities by `actor` that refer to `object_id` and
whose `target` equals the given target. The query is returned, not run.
"""
@spec add_by_params_query(String.t(), String.t(), String.t()) :: Ecto.Query.t()
def add_by_params_query(object_id, actor, target) do
  adds =
    object_id
    |> Queries.by_object_id()
    |> Queries.by_type("Add")
    |> Queries.by_actor(actor)

  from(act in adds, where: fragment("?->>'target' = ?", act.data, ^target))
end
404 end