- Admin API: Return users' tags when querying reports
- Admin API: Return avatar and display name when querying users
- Mastodon API, streaming: Add support for passing the token in the `Sec-WebSocket-Protocol` header
+- Added synchronization of following/followers counters for external users
### Fixed
- Not being able to pin unlisted posts
remote_post_retention_days: 90,
skip_thread_containment: true,
limit_to_local_content: :unauthenticated,
- dynamic_configuration: false
+ dynamic_configuration: false,
+ external_user_synchronization: [
+ enabled: false,
+ # every 2 hours
+ interval: 60 * 60 * 2,
+ max_retries: 3,
+ limit: 500
+ ]
config :pleroma, :markup,
# XXX - unfortunately, inline images must be enabled by default right now, because
* `skip_thread_containment`: Skip filter out broken threads. The default is `false`.
* `limit_to_local_content`: Limit unauthenticated users to search for local statutes and users only. Possible values: `:unauthenticated`, `:all` and `false`. The default is `:unauthenticated`.
* `dynamic_configuration`: Allow transferring configuration to DB with the subsequent customization from Admin api.
+* `external_user_synchronization`: Following/followers counters synchronization settings.
+ * `enabled`: Enables synchronization
+ * `interval`: Interval between synchronization.
+ * `max_retries`: Max rettries for host. After exceeding the limit, the check will not be carried out for users from this host.
+ * `limit`: Users batch size for processing in one time.
+
## :logger
start: {Pleroma.Web.Endpoint, :start_link, []},
type: :supervisor
},
- %{id: Pleroma.Gopher.Server, start: {Pleroma.Gopher.Server, :start_link, []}}
+ %{id: Pleroma.Gopher.Server, start: {Pleroma.Gopher.Server, :start_link, []}},
+ %{
+ id: Pleroma.User.SynchronizationWorker,
+ start: {Pleroma.User.SynchronizationWorker, :start_link, []}
+ }
]
# See http://elixir-lang.org/docs/stable/elixir/Supervisor.html
def ap_followers(%User{follower_address: fa}) when is_binary(fa), do: fa
def ap_followers(%User{} = user), do: "#{ap_id(user)}/followers"
- def user_info(%User{} = user) do
+ def user_info(%User{} = user, args \\ %{}) do
+ following_count =
+ if args[:following_count], do: args[:following_count], else: following_count(user)
+
+ follower_count =
+ if args[:follower_count], do: args[:follower_count], else: user.info.follower_count
+
%{
- following_count: following_count(user),
note_count: user.info.note_count,
- follower_count: user.info.follower_count,
locked: user.info.locked,
confirmation_pending: user.info.confirmation_pending,
default_scope: user.info.default_scope
}
+ |> Map.put(:following_count, following_count)
+ |> Map.put(:follower_count, follower_count)
+ end
+
+ def set_info_cache(user, args) do
+ Cachex.put(:user_cache, "user_info:#{user.id}", user_info(user, args))
end
def restrict_deactivated(query) do
)
end
+ @spec sync_follow_counter() :: :ok
+ def sync_follow_counter,
+ do: PleromaJobQueue.enqueue(:background, __MODULE__, [:sync_follow_counters])
+
+ @spec perform(:sync_follow_counters) :: :ok
+ def perform(:sync_follow_counters) do
+ {:ok, _pid} = Agent.start_link(fn -> %{} end, name: :domain_errors)
+ config = Pleroma.Config.get([:instance, :external_user_synchronization])
+
+ :ok = sync_follow_counters(config)
+ Agent.stop(:domain_errors)
+ end
+
+ @spec sync_follow_counters(keyword()) :: :ok
+ def sync_follow_counters(opts \\ []) do
+ users = external_users(opts)
+
+ if length(users) > 0 do
+ errors = Agent.get(:domain_errors, fn state -> state end)
+ {last, updated_errors} = User.Synchronization.call(users, errors, opts)
+ Agent.update(:domain_errors, fn _state -> updated_errors end)
+ sync_follow_counters(max_id: last.id, limit: opts[:limit])
+ else
+ :ok
+ end
+ end
+
+ @spec external_users(keyword()) :: [User.t()]
+ def external_users(opts \\ []) do
+ query =
+ User.Query.build(%{
+ external: true,
+ active: true,
+ order_by: :id,
+ select: [:id, :ap_id, :info]
+ })
+
+ query =
+ if opts[:max_id],
+ do: where(query, [u], u.id > ^opts[:max_id]),
+ else: query
+
+ query =
+ if opts[:limit],
+ do: limit(query, ^opts[:limit]),
+ else: query
+
+ Repo.all(query)
+ end
+
def blocks_import(%User{} = blocker, blocked_identifiers) when is_list(blocked_identifiers),
do:
PleromaJobQueue.enqueue(:background, __MODULE__, [
User query builder module. Builds query from new query or another user query.
## Example:
- query = Pleroma.User.Query(%{nickname: "nickname"})
+ query = Pleroma.User.Query.build(%{nickname: "nickname"})
another_query = Pleroma.User.Query.build(query, %{email: "email@example.com"})
Pleroma.Repo.all(query)
Pleroma.Repo.all(another_query)
friends: User.t(),
recipients_from_activity: [String.t()],
nickname: [String.t()],
- ap_id: [String.t()]
+ ap_id: [String.t()],
+ order_by: term(),
+ select: term(),
+ limit: pos_integer()
}
| %{}
where(query, [u], u.ap_id in ^to or fragment("? && ?", u.following, ^to))
end
+ defp compose_query({:order_by, key}, query) do
+ order_by(query, [u], field(u, ^key))
+ end
+
+ defp compose_query({:select, keys}, query) do
+ select(query, [u], ^keys)
+ end
+
+ defp compose_query({:limit, limit}, query) do
+ limit(query, ^limit)
+ end
+
defp compose_query(_unsupported_param, query), do: query
defp prepare_tag_criteria(tag, query) do
--- /dev/null
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2018 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.User.Synchronization do
+ alias Pleroma.HTTP
+ alias Pleroma.User
+
+ @spec call([User.t()], map(), keyword()) :: {User.t(), map()}
+ def call(users, errors, opts \\ []) do
+ do_call(users, errors, opts)
+ end
+
+ defp do_call([user | []], errors, opts) do
+ updated = fetch_counters(user, errors, opts)
+ {user, updated}
+ end
+
+ defp do_call([user | others], errors, opts) do
+ updated = fetch_counters(user, errors, opts)
+ do_call(others, updated, opts)
+ end
+
+ defp fetch_counters(user, errors, opts) do
+ %{host: host} = URI.parse(user.ap_id)
+
+ info = %{}
+ {following, errors} = fetch_counter(user.ap_id <> "/following", host, errors, opts)
+ info = if following, do: Map.put(info, :following_count, following), else: info
+
+ {followers, errors} = fetch_counter(user.ap_id <> "/followers", host, errors, opts)
+ info = if followers, do: Map.put(info, :follower_count, followers), else: info
+
+ User.set_info_cache(user, info)
+ errors
+ end
+
+ defp available_domain?(domain, errors, opts) do
+ max_retries = Keyword.get(opts, :max_retries, 3)
+ not (Map.has_key?(errors, domain) && errors[domain] >= max_retries)
+ end
+
+ defp fetch_counter(url, host, errors, opts) do
+ with true <- available_domain?(host, errors, opts),
+ {:ok, %{body: body, status: code}} when code in 200..299 <-
+ HTTP.get(
+ url,
+ [{:Accept, "application/activity+json"}]
+ ),
+ {:ok, data} <- Jason.decode(body) do
+ {data["totalItems"], errors}
+ else
+ false ->
+ {nil, errors}
+
+ _ ->
+ {nil, Map.update(errors, host, 1, &(&1 + 1))}
+ end
+ end
+end
--- /dev/null
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2018 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-onl
+
+defmodule Pleroma.User.SynchronizationWorker do
+ use GenServer
+
+ def start_link do
+ config = Pleroma.Config.get([:instance, :external_user_synchronization])
+
+ if config[:enabled] do
+ GenServer.start_link(__MODULE__, interval: config[:interval])
+ else
+ :ignore
+ end
+ end
+
+ def init(opts) do
+ schedule_next(opts)
+ {:ok, opts}
+ end
+
+ def handle_info(:sync_follow_counters, opts) do
+ Pleroma.User.sync_follow_counter()
+ schedule_next(opts)
+ {:noreply, opts}
+ end
+
+ defp schedule_next(opts) do
+ Process.send_after(self(), :sync_follow_counters, opts[:interval])
+ end
+end
--- /dev/null
+{
+ "@context": "https://www.w3.org/ns/activitystreams",
+ "id": "http://localhost:4001/users/masto_closed/followers",
+ "type": "OrderedCollection",
+ "totalItems": 437,
+ "first": "http://localhost:4001/users/masto_closed/followers?page=1"
+}
--- /dev/null
+{
+ "@context": "https://www.w3.org/ns/activitystreams",
+ "id": "http://localhost:4001/users/masto_closed/following",
+ "type": "OrderedCollection",
+ "totalItems": 152,
+ "first": "http://localhost:4001/users/masto_closed/following?page=1"
+}
--- /dev/null
+{
+ "type": "OrderedCollection",
+ "totalItems": 527,
+ "id": "http://localhost:4001/users/fuser2/followers",
+ "first": {
+ "type": "OrderedCollectionPage",
+ "totalItems": 527,
+ "partOf": "http://localhost:4001/users/fuser2/followers",
+ "orderedItems": [],
+ "next": "http://localhost:4001/users/fuser2/followers?page=2",
+ "id": "http://localhost:4001/users/fuser2/followers?page=1"
+ },
+ "@context": [
+ "https://www.w3.org/ns/activitystreams",
+ "http://localhost:4001/schemas/litepub-0.1.jsonld",
+ {
+ "@language": "und"
+ }
+ ]
+}
--- /dev/null
+{
+ "type": "OrderedCollection",
+ "totalItems": 267,
+ "id": "http://localhost:4001/users/fuser2/following",
+ "first": {
+ "type": "OrderedCollectionPage",
+ "totalItems": 267,
+ "partOf": "http://localhost:4001/users/fuser2/following",
+ "orderedItems": [],
+ "next": "http://localhost:4001/users/fuser2/following?page=2",
+ "id": "http://localhost:4001/users/fuser2/following?page=1"
+ },
+ "@context": [
+ "https://www.w3.org/ns/activitystreams",
+ "http://localhost:4001/schemas/litepub-0.1.jsonld",
+ {
+ "@language": "und"
+ }
+ ]
+}
{:ok, %Tesla.Env{status: 200, body: File.read!("test/fixtures/rich_media/ogp.html")}}
end
+ def get("http://localhost:4001/users/masto_closed/followers", _, _, _) do
+ {:ok,
+ %Tesla.Env{
+ status: 200,
+ body: File.read!("test/fixtures/users_mock/masto_closed_followers.json")
+ }}
+ end
+
+ def get("http://localhost:4001/users/masto_closed/following", _, _, _) do
+ {:ok,
+ %Tesla.Env{
+ status: 200,
+ body: File.read!("test/fixtures/users_mock/masto_closed_following.json")
+ }}
+ end
+
+ def get("http://localhost:4001/users/fuser2/followers", _, _, _) do
+ {:ok,
+ %Tesla.Env{
+ status: 200,
+ body: File.read!("test/fixtures/users_mock/pleroma_followers.json")
+ }}
+ end
+
+ def get("http://localhost:4001/users/fuser2/following", _, _, _) do
+ {:ok,
+ %Tesla.Env{
+ status: 200,
+ body: File.read!("test/fixtures/users_mock/pleroma_following.json")
+ }}
+ end
+
+ def get("http://domain-with-errors:4001/users/fuser1/followers", _, _, _) do
+ {:ok,
+ %Tesla.Env{
+ status: 504,
+ body: ""
+ }}
+ end
+
+ def get("http://domain-with-errors:4001/users/fuser1/following", _, _, _) do
+ {:ok,
+ %Tesla.Env{
+ status: 504,
+ body: ""
+ }}
+ end
+
def get("http://example.com/ogp-missing-data", _, _, _) do
{:ok,
%Tesla.Env{
--- /dev/null
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2018 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.User.SynchronizationTest do
+ use Pleroma.DataCase
+ import Pleroma.Factory
+ alias Pleroma.User
+ alias Pleroma.User.Synchronization
+
+ setup do
+ Tesla.Mock.mock(fn env -> apply(HttpRequestMock, :request, [env]) end)
+ :ok
+ end
+
+ test "update following/followers counters" do
+ user1 =
+ insert(:user,
+ local: false,
+ ap_id: "http://localhost:4001/users/masto_closed"
+ )
+
+ user2 = insert(:user, local: false, ap_id: "http://localhost:4001/users/fuser2")
+
+ users = User.external_users()
+ assert length(users) == 2
+ {user, %{}} = Synchronization.call(users, %{})
+ assert user == List.last(users)
+
+ %{follower_count: followers, following_count: following} = User.get_cached_user_info(user1)
+ assert followers == 437
+ assert following == 152
+
+ %{follower_count: followers, following_count: following} = User.get_cached_user_info(user2)
+
+ assert followers == 527
+ assert following == 267
+ end
+
+ test "don't check host if errors exist" do
+ user1 = insert(:user, local: false, ap_id: "http://domain-with-errors:4001/users/fuser1")
+
+ user2 = insert(:user, local: false, ap_id: "http://domain-with-errors:4001/users/fuser2")
+
+ users = User.external_users()
+ assert length(users) == 2
+
+ {user, %{"domain-with-errors" => 2}} =
+ Synchronization.call(users, %{"domain-with-errors" => 2}, max_retries: 2)
+
+ assert user == List.last(users)
+
+ %{follower_count: followers, following_count: following} = User.get_cached_user_info(user1)
+ assert followers == 0
+ assert following == 0
+
+ %{follower_count: followers, following_count: following} = User.get_cached_user_info(user2)
+
+ assert followers == 0
+ assert following == 0
+ end
+
+ test "don't check host if errors appeared" do
+ user1 = insert(:user, local: false, ap_id: "http://domain-with-errors:4001/users/fuser1")
+
+ user2 = insert(:user, local: false, ap_id: "http://domain-with-errors:4001/users/fuser2")
+
+ users = User.external_users()
+ assert length(users) == 2
+
+ {user, %{"domain-with-errors" => 2}} = Synchronization.call(users, %{}, max_retries: 2)
+
+ assert user == List.last(users)
+
+ %{follower_count: followers, following_count: following} = User.get_cached_user_info(user1)
+ assert followers == 0
+ assert following == 0
+
+ %{follower_count: followers, following_count: following} = User.get_cached_user_info(user2)
+
+ assert followers == 0
+ assert following == 0
+ end
+
+ test "other users after error appeared" do
+ user1 = insert(:user, local: false, ap_id: "http://domain-with-errors:4001/users/fuser1")
+ user2 = insert(:user, local: false, ap_id: "http://localhost:4001/users/fuser2")
+
+ users = User.external_users()
+ assert length(users) == 2
+
+ {user, %{"domain-with-errors" => 2}} = Synchronization.call(users, %{}, max_retries: 2)
+ assert user == List.last(users)
+
+ %{follower_count: followers, following_count: following} = User.get_cached_user_info(user1)
+ assert followers == 0
+ assert following == 0
+
+ %{follower_count: followers, following_count: following} = User.get_cached_user_info(user2)
+
+ assert followers == 527
+ assert following == 267
+ end
+end
--- /dev/null
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2018 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.User.SynchronizationWorkerTest do
+ use Pleroma.DataCase
+ import Pleroma.Factory
+
+ setup do
+ Tesla.Mock.mock_global(fn env -> apply(HttpRequestMock, :request, [env]) end)
+
+ config = Pleroma.Config.get([:instance, :external_user_synchronization])
+
+ for_update = [enabled: true, interval: 1000]
+
+ Pleroma.Config.put([:instance, :external_user_synchronization], for_update)
+
+ on_exit(fn ->
+ Pleroma.Config.put([:instance, :external_user_synchronization], config)
+ end)
+
+ :ok
+ end
+
+ test "sync follow counters" do
+ user1 =
+ insert(:user,
+ local: false,
+ ap_id: "http://localhost:4001/users/masto_closed"
+ )
+
+ user2 = insert(:user, local: false, ap_id: "http://localhost:4001/users/fuser2")
+
+ {:ok, _} = Pleroma.User.SynchronizationWorker.start_link()
+ :timer.sleep(1500)
+
+ %{follower_count: followers, following_count: following} =
+ Pleroma.User.get_cached_user_info(user1)
+
+ assert followers == 437
+ assert following == 152
+
+ %{follower_count: followers, following_count: following} =
+ Pleroma.User.get_cached_user_info(user2)
+
+ assert followers == 527
+ assert following == 267
+ end
+end
assert user_two.ap_id in ap_ids
end
end
+
+ describe "sync followers count" do
+ setup do
+ user1 = insert(:user, local: false, ap_id: "http://localhost:4001/users/masto_closed")
+ user2 = insert(:user, local: false, ap_id: "http://localhost:4001/users/fuser2")
+ insert(:user, local: true)
+ insert(:user, local: false, info: %{deactivated: true})
+ {:ok, user1: user1, user2: user2}
+ end
+
+ test "external_users/1 external active users with limit", %{user1: user1, user2: user2} do
+ [fdb_user1] = User.external_users(limit: 1)
+
+ assert fdb_user1.ap_id
+ assert fdb_user1.ap_id == user1.ap_id
+ assert fdb_user1.id == user1.id
+
+ [fdb_user2] = User.external_users(max_id: fdb_user1.id, limit: 1)
+
+ assert fdb_user2.ap_id
+ assert fdb_user2.ap_id == user2.ap_id
+ assert fdb_user2.id == user2.id
+
+ assert User.external_users(max_id: fdb_user2.id, limit: 1) == []
+ end
+
+ test "sync_follow_counters/1", %{user1: user1, user2: user2} do
+ {:ok, _pid} = Agent.start_link(fn -> %{} end, name: :domain_errors)
+
+ :ok = User.sync_follow_counters()
+
+ %{follower_count: followers, following_count: following} = User.get_cached_user_info(user1)
+ assert followers == 437
+ assert following == 152
+
+ %{follower_count: followers, following_count: following} = User.get_cached_user_info(user2)
+
+ assert followers == 527
+ assert following == 267
+
+ Agent.stop(:domain_errors)
+ end
+
+ test "sync_follow_counters/1 in separate batches", %{user1: user1, user2: user2} do
+ {:ok, _pid} = Agent.start_link(fn -> %{} end, name: :domain_errors)
+
+ :ok = User.sync_follow_counters(limit: 1)
+
+ %{follower_count: followers, following_count: following} = User.get_cached_user_info(user1)
+ assert followers == 437
+ assert following == 152
+
+ %{follower_count: followers, following_count: following} = User.get_cached_user_info(user2)
+
+ assert followers == 527
+ assert following == 267
+
+ Agent.stop(:domain_errors)
+ end
+
+ test "perform/1 with :sync_follow_counters", %{user1: user1, user2: user2} do
+ :ok = User.perform(:sync_follow_counters)
+ %{follower_count: followers, following_count: following} = User.get_cached_user_info(user1)
+ assert followers == 437
+ assert following == 152
+
+ %{follower_count: followers, following_count: following} = User.get_cached_user_info(user2)
+
+ assert followers == 527
+ assert following == 267
+ end
+ end
+
+ describe "set_info_cache/2" do
+ setup do
+ user = insert(:user)
+ {:ok, user: user}
+ end
+
+ test "update from args", %{user: user} do
+ User.set_info_cache(user, %{following_count: 15, follower_count: 18})
+
+ %{follower_count: followers, following_count: following} = User.get_cached_user_info(user)
+ assert followers == 18
+ assert following == 15
+ end
+
+ test "without args", %{user: user} do
+ User.set_info_cache(user, %{})
+
+ %{follower_count: followers, following_count: following} = User.get_cached_user_info(user)
+ assert followers == 0
+ assert following == 0
+ end
+ end
+
+ describe "user_info/2" do
+ setup do
+ user = insert(:user)
+ {:ok, user: user}
+ end
+
+ test "update from args", %{user: user} do
+ %{follower_count: followers, following_count: following} =
+ User.user_info(user, %{following_count: 15, follower_count: 18})
+
+ assert followers == 18
+ assert following == 15
+ end
+
+ test "without args", %{user: user} do
+ %{follower_count: followers, following_count: following} = User.user_info(user)
+
+ assert followers == 0
+ assert following == 0
+ end
+ end
end