local: local,
recipients: recipients,
actor: object["actor"]
- }) do
+ }),
+ # TODO: add tests for expired activities, when Note type will be supported in new pipeline
+ {:ok, _} <- maybe_create_activity_expiration(activity) do
{:ok, activity, meta}
end
end
end
defp insert_activity_with_expiration(data, local, recipients) do
- %Activity{
+ struct = %Activity{
data: data,
local: local,
actor: data["actor"],
recipients: recipients
}
- |> Repo.insert()
- |> maybe_create_activity_expiration()
+
+ with {:ok, activity} <- Repo.insert(struct) do
+ maybe_create_activity_expiration(activity)
+ end
end
def notify_and_stream(activity) do
stream_out_participations(participations)
end
- defp maybe_create_activity_expiration({:ok, %{data: %{"expires_at" => expires_at}} = activity}) do
+ defp maybe_create_activity_expiration(
+ %{data: %{"expires_at" => %DateTime{} = expires_at}} = activity
+ ) do
with {:ok, _job} <-
Pleroma.Workers.PurgeExpiredActivity.enqueue(%{
activity_id: activity.id,
end
end
- defp maybe_create_activity_expiration(result), do: result
+ defp maybe_create_activity_expiration(activity), do: {:ok, activity}
defp create_or_bump_conversation(activity, actor) do
with {:ok, conversation} <- Conversation.create_or_bump_for(activity),
Object.increase_replies_count(in_reply_to)
end
- if expires_at = activity.data["expires_at"] do
- Pleroma.Workers.PurgeExpiredActivity.enqueue(%{
- activity_id: activity.id,
- expires_at: expires_at
- })
- end
-
BackgroundWorker.enqueue("fetch_data_for_activity", %{"activity_id" => activity.id})
meta =