{Pleroma.Search, [max_running: 30, max_waiting: 50]}
-config :pleroma, :search, provider: Pleroma.Search.Builtin
config :pleroma, Pleroma.Search, module: Pleroma.Search.DatabaseSearch
config :pleroma, Pleroma.Search.Meilisearch,
private_key: nil,
initial_indexing_chunk_size: 100_000
+config :pleroma, Pleroma.Search.Elasticsearch.Cluster,
+ url: "http://localhost:9200",
+ username: "elastic",
+ password: "changeme",
+ api: Elasticsearch.API.HTTP,
+ json_library: Jason,
+ indexes: %{
+ activities: %{
+ settings: "priv/es-mappings/activity.json",
+ store: Pleroma.Search.Elasticsearch.Store,
+ sources: [Pleroma.Activity],
+ bulk_page_size: 5000,
+ bulk_wait_interval: 15_000
+ }
+ }
# Import environment specific config. This must remain at the bottom
# of this file so it overrides the configuration defined above.
import_config "#{Mix.env()}.exs"
+++ /dev/null
-# Pleroma: A lightweight social networking server
-# Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
-# SPDX-License-Identifier: AGPL-3.0-only
-defmodule Mix.Tasks.Pleroma.Search do
- use Mix.Task
- import Mix.Pleroma
- import Ecto.Query
- alias Pleroma.Activity
- alias Pleroma.Pagination
- alias Pleroma.User
- alias Pleroma.Hashtag
- @shortdoc "Manages elasticsearch"
- def run(["import", "activities" | _rest]) do
- start_pleroma()
- from(a in Activity, where: not ilike(a.actor, "%/relay"))
- |> where([a], fragment("(? ->> 'type'::text) = 'Create'", a.data))
- |> Activity.with_preloaded_object()
- |> Activity.with_preloaded_user_actor()
- |> get_all(:activities)
- end
- def run(["import", "users" | _rest]) do
- start_pleroma()
- from(u in User, where: u.nickname not in ["internal.fetch", "relay"])
- |> get_all(:users)
- end
- def run(["import", "hashtags" | _rest]) do
- start_pleroma()
- from(h in Hashtag)
- |> Pleroma.Repo.all()
- |> Pleroma.Elasticsearch.bulk_post(:hashtags)
- end
- defp get_all(query, index, max_id \\ nil) do
- params = %{limit: 1000}
- params =
- if max_id == nil do
- params
- else
- Map.put(params, :max_id, max_id)
- end
- res =
- query
- |> Pagination.fetch_paginated(params)
- if res == [] do
- :ok
- else
- res
- |> Pleroma.Elasticsearch.bulk_post(index)
- get_all(query, index, List.last(res).id)
- end
- end
+++ /dev/null
-# Pleroma: A lightweight social networking server
-# Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
-# SPDX-License-Identifier: AGPL-3.0-only
-defmodule Mix.Tasks.Pleroma.Search.Meilisearch do
- require Pleroma.Constants
- import Mix.Pleroma
- import Ecto.Query
- import Pleroma.Search.Meilisearch,
- only: [meili_post: 2, meili_put: 2, meili_get: 1, meili_delete!: 1]
- def run(["index"]) do
- start_pleroma()
- meili_version =
- (
- {:ok, result} = meili_get("/version")
- result["pkgVersion"]
- )
- # The ranking rule syntax was changed but nothing about that is mentioned in the changelog
- if not Version.match?(meili_version, ">= 0.25.0") do
- raise "Meilisearch <0.24.0 not supported"
- end
- {:ok, _} =
- meili_post(
- "/indexes/objects/settings/ranking-rules",
- [
- "published:desc",
- "words",
- "exactness",
- "proximity",
- "typo",
- "attribute",
- "sort"
- ]
- )
- {:ok, _} =
- meili_post(
- "/indexes/objects/settings/searchable-attributes",
- [
- "content"
- ]
- )
- IO.puts("Created indices. Starting to insert posts.")
- chunk_size = Pleroma.Config.get([Pleroma.Search.Meilisearch, :initial_indexing_chunk_size])
- Pleroma.Repo.transaction(
- fn ->
- query =
- from(Pleroma.Object,
- # Only index public and unlisted posts which are notes and have some text
- where:
- fragment("data->>'type' = 'Note'") and
- (fragment("data->'to' \\? ?", ^Pleroma.Constants.as_public()) or
- fragment("data->'cc' \\? ?", ^Pleroma.Constants.as_public())),
- order_by: [desc: fragment("data->'published'")]
- )
- count = query |> Pleroma.Repo.aggregate(:count, :data)
- IO.puts("Entries to index: #{count}")
- Pleroma.Repo.stream(
- query,
- timeout: :infinity
- )
- |> Stream.map(&Pleroma.Search.Meilisearch.object_to_search_data/1)
- |> Stream.filter(fn o -> not is_nil(o) end)
- |> Stream.chunk_every(chunk_size)
- |> Stream.transform(0, fn objects, acc ->
- new_acc = acc + Enum.count(objects)
- # Reset to the beginning of the line and rewrite it
- IO.write("\r")
- IO.write("Indexed #{new_acc} entries")
- {[objects], new_acc}
- end)
- |> Stream.each(fn objects ->
- result =
- meili_put(
- "/indexes/objects/documents",
- objects
- )
- with {:ok, res} <- result do
- if not Map.has_key?(res, "uid") do
- IO.puts("\nFailed to index: #{inspect(result)}")
- end
- else
- e -> IO.puts("\nFailed to index due to network error: #{inspect(e)}")
- end
- end)
- |> Stream.run()
- end,
- timeout: :infinity
- )
- IO.write("\n")
- end
- def run(["clear"]) do
- start_pleroma()
- meili_delete!("/indexes/objects/documents")
- end
- def run(["show-keys", master_key]) do
- start_pleroma()
- endpoint = Pleroma.Config.get([Pleroma.Search.Meilisearch, :url])
- {:ok, result} =
- Pleroma.HTTP.get(
- Path.join(endpoint, "/keys"),
- [{"Authorization", "Bearer #{master_key}"}]
- )
- decoded = Jason.decode!(result.body)
- if decoded["results"] do
- Enum.each(decoded["results"], fn %{"description" => desc, "key" => key} ->
- IO.puts("#{desc}: #{key}")
- end)
- else
- IO.puts("Error fetching the keys, check the master key is correct: #{inspect(decoded)}")
- end
- end
- def run(["stats"]) do
- start_pleroma()
- {:ok, result} = meili_get("/indexes/objects/stats")
- IO.puts("Number of entries: #{result["numberOfDocuments"]}")
- IO.puts("Indexing? #{result["isIndexing"]}")
- end
{Oban, Config.get(Oban)},
] ++
+ elasticsearch_children() ++
task_children(@mix_env) ++
dont_run_in_test(@mix_env) ++
defp http_children(_, _), do: []
+ def elasticsearch_children do
+ config = Config.get([Pleroma.Search, :module])
+ if config == Pleroma.Search.Elasticsearch do
+ [Pleroma.Search.Elasticsearch.Cluster]
+ else
+ []
+ end
+ end
@spec limiters_setup() :: :ok
def limiters_setup do
config = Config.get(ConcurrentLimiter, [])
+++ /dev/null
-# Akkoma: A lightweight social networking server
-# Copyright © 2022-2022 Akkoma Authors <https://git.ihatebeinga.live/IHBAGang/akkoma/>
-# SPDX-License-Identifier: AGPL-3.0-only
-defmodule Pleroma.Elasticsearch.DocumentMappings.Activity do
- alias Pleroma.Object
- def id(obj), do: obj.id
- def encode(%{object: %{data: %{"type" => "Note"}}} = activity) do
- %{
- _timestamp: activity.inserted_at,
- user: activity.user_actor.nickname,
- content: activity.object.data["content"],
- instance: URI.parse(activity.user_actor.ap_id).host,
- hashtags: Object.hashtags(activity.object)
- }
- end
+++ /dev/null
-# Akkoma: A lightweight social networking server
-# Copyright © 2022-2022 Akkoma Authors <https://git.ihatebeinga.live/IHBAGang/akkoma/>
-# SPDX-License-Identifier: AGPL-3.0-only
-defmodule Pleroma.Elasticsearch.DocumentMappings.Hashtag do
- def id(obj), do: obj.id
- def encode(%{timestamp: _} = hashtag) do
- %{
- hashtag: hashtag.name,
- timestamp: hashtag.timestamp
- }
- end
- def encode(hashtag) do
- %{
- hashtag: hashtag.name,
- timestamp: hashtag.inserted_at
- }
- end
+++ /dev/null
-# Akkoma: A lightweight social networking server
-# Copyright © 2022-2022 Akkoma Authors <https://git.ihatebeinga.live/IHBAGang/akkoma/>
-# SPDX-License-Identifier: AGPL-3.0-only
-defmodule Pleroma.Elasticsearch.DocumentMappings.User do
- def id(obj), do: obj.id
- def encode(%{actor_type: "Person"} = user) do
- %{
- timestamp: user.inserted_at,
- instance: URI.parse(user.ap_id).host,
- nickname: user.nickname,
- bio: user.bio,
- display_name: user.name
- }
- end
+++ /dev/null
-# Akkoma: A lightweight social networking server
-# Copyright © 2022-2022 Akkoma Authors <https://git.ihatebeinga.live/IHBAGang/akkoma/>
-# SPDX-License-Identifier: AGPL-3.0-only
-defmodule Pleroma.Elasticsearch do
- alias Pleroma.Activity
- alias Pleroma.User
- alias Pleroma.Object
- alias Pleroma.Elasticsearch.DocumentMappings
- alias Pleroma.Config
- require Logger
- defp url do
- Config.get([:elasticsearch, :url])
- end
- defp enabled? do
- Config.get([:search, :provider]) == Pleroma.Search.Elasticsearch
- end
- def delete_by_id(:activity, id) do
- if enabled?() do
- Elastix.Document.delete(url(), "activities", "activity", id)
- end
- end
- def put_by_id(:activity, id) do
- id
- |> Activity.get_by_id_with_object()
- |> maybe_put_into_elasticsearch()
- end
- def maybe_put_into_elasticsearch({:ok, item}) do
- maybe_put_into_elasticsearch(item)
- end
- def maybe_put_into_elasticsearch(
- %{data: %{"type" => "Create"}, object: %{data: %{"type" => "Note"}}} = activity
- ) do
- if enabled?() do
- actor = Pleroma.Activity.user_actor(activity)
- activity
- |> Map.put(:user_actor, actor)
- |> put()
- end
- end
- def maybe_put_into_elasticsearch(%User{actor_type: "Person"} = user) do
- if enabled?() do
- put(user)
- end
- end
- def maybe_put_into_elasticsearch(_) do
- {:ok, :skipped}
- end
- def maybe_bulk_post(data, type) do
- if enabled?() do
- bulk_post(data, type)
- end
- end
- def put(%Activity{} = activity) do
- with {:ok, _} <-
- Elastix.Document.index(
- url(),
- "activities",
- "activity",
- DocumentMappings.Activity.id(activity),
- DocumentMappings.Activity.encode(activity)
- ) do
- activity
- |> Map.get(:object)
- |> Object.hashtags()
- |> Enum.map(fn x ->
- %{id: x, name: x, timestamp: DateTime.to_iso8601(DateTime.utc_now())}
- end)
- |> bulk_post(:hashtags)
- else
- {:error, %{reason: err}} ->
- Logger.error("Could not put activity: #{err}")
- :skipped
- end
- end
- def put(%User{} = user) do
- with {:ok, _} <-
- Elastix.Document.index(
- url(),
- "users",
- "user",
- DocumentMappings.User.id(user),
- DocumentMappings.User.encode(user)
- ) do
- :ok
- else
- {:error, %{reason: err}} ->
- Logger.error("Could not put user: #{err}")
- :skipped
- end
- end
- def bulk_post(data, :activities) do
- d =
- data
- |> Enum.filter(fn x ->
- t =
- x.object
- |> Map.get(:data, %{})
- |> Map.get("type", "")
- t == "Note"
- end)
- |> Enum.map(fn d ->
- [
- %{index: %{_id: DocumentMappings.Activity.id(d)}},
- DocumentMappings.Activity.encode(d)
- ]
- end)
- |> List.flatten()
- with {:ok, %{body: %{"errors" => false}}} <-
- Elastix.Bulk.post(
- url(),
- d,
- index: "activities",
- type: "activity"
- ) do
- :ok
- else
- {:error, %{reason: err}} ->
- Logger.error("Could not bulk put activity: #{err}")
- :skipped
- {:ok, %{body: _}} ->
- :skipped
- end
- end
- def bulk_post(data, :users) do
- d =
- data
- |> Enum.filter(fn x -> x.actor_type == "Person" end)
- |> Enum.map(fn d ->
- [
- %{index: %{_id: DocumentMappings.User.id(d)}},
- DocumentMappings.User.encode(d)
- ]
- end)
- |> List.flatten()
- with {:ok, %{body: %{"errors" => false}}} <-
- Elastix.Bulk.post(
- url(),
- d,
- index: "users",
- type: "user"
- ) do
- :ok
- else
- {:error, %{reason: err}} ->
- Logger.error("Could not bulk put users: #{err}")
- :skipped
- {:ok, %{body: _}} ->
- :skipped
- end
- end
- def bulk_post(data, :hashtags) when is_list(data) do
- d =
- data
- |> Enum.map(fn d ->
- [
- %{index: %{_id: DocumentMappings.Hashtag.id(d)}},
- DocumentMappings.Hashtag.encode(d)
- ]
- end)
- |> List.flatten()
- with {:ok, %{body: %{"errors" => false}}} <-
- Elastix.Bulk.post(
- url(),
- d,
- index: "hashtags",
- type: "hashtag"
- ) do
- :ok
- else
- {:error, %{reason: err}} ->
- Logger.error("Could not bulk put hashtags: #{err}")
- :skipped
- {:ok, %{body: _}} ->
- :skipped
- end
- end
- def bulk_post(_, :hashtags), do: {:ok, nil}
- def search(_, _, _, :skip), do: []
- def search(:raw, index, type, q) do
- with {:ok, raw_results} <- Elastix.Search.search(url(), index, [type], q) do
- results =
- raw_results
- |> Map.get(:body, %{})
- |> Map.get("hits", %{})
- |> Map.get("hits", [])
- {:ok, results}
- else
- {:error, e} ->
- Logger.error(e)
- {:error, e}
- end
- end
- def search(:activities, q) do
- with {:ok, results} <- search(:raw, "activities", "activity", q) do
- results
- |> Enum.map(fn result -> result["_id"] end)
- |> Pleroma.Activity.all_by_ids_with_object()
- |> Enum.sort(&(&1.inserted_at >= &2.inserted_at))
- else
- e ->
- Logger.error(e)
- []
- end
- end
- def search(:users, q) do
- with {:ok, results} <- search(:raw, "users", "user", q) do
- results
- |> Enum.map(fn result -> result["_id"] end)
- |> Pleroma.User.get_all_by_ids()
- else
- e ->
- Logger.error(e)
- []
- end
- end
- def search(:hashtags, q) do
- with {:ok, results} <- search(:raw, "hashtags", "hashtag", q) do
- results
- |> Enum.map(fn result -> result["_source"]["hashtag"] end)
- else
- e ->
- Logger.error(e)
- []
- end
- end
{:ok, Repo.all(from(ht in Hashtag, where: ht.name in ^names))}
|> Repo.transaction() do
- Pleroma.Elasticsearch.maybe_bulk_post(hashtags, :hashtags)
{:ok, hashtags}
{:error, _name, value, _changes_so_far} -> {:error, value}
+++ /dev/null
-defmodule Pleroma.Search.Builtin do
- @behaviour Pleroma.Search
- alias Pleroma.Repo
- alias Pleroma.User
- alias Pleroma.Activity
- alias Pleroma.Web.MastodonAPI.AccountView
- alias Pleroma.Web.MastodonAPI.StatusView
- alias Pleroma.Web.Endpoint
- require Logger
- @impl Pleroma.Search
- def search(_conn, %{q: query} = params, options) do
- version = Keyword.get(options, :version)
- timeout = Keyword.get(Repo.config(), :timeout, 15_000)
- query = String.trim(query)
- default_values = %{"statuses" => [], "accounts" => [], "hashtags" => []}
- default_values
- |> Enum.map(fn {resource, default_value} ->
- if params[:type] in [nil, resource] do
- {resource, fn -> resource_search(version, resource, query, options) end}
- else
- {resource, fn -> default_value end}
- end
- end)
- |> Task.async_stream(fn {resource, f} -> {resource, with_fallback(f)} end,
- timeout: timeout,
- on_timeout: :kill_task
- )
- |> Enum.reduce(default_values, fn
- {:ok, {resource, result}}, acc ->
- Map.put(acc, resource, result)
- _error, acc ->
- acc
- end)
- end
- defp resource_search(_, "accounts", query, options) do
- accounts = with_fallback(fn -> User.search(query, options) end)
- AccountView.render("index.json",
- users: accounts,
- for: options[:for_user],
- embed_relationships: options[:embed_relationships]
- )
- end
- defp resource_search(_, "statuses", query, options) do
- statuses = with_fallback(fn -> Activity.search(options[:for_user], query, options) end)
- StatusView.render("index.json",
- activities: statuses,
- for: options[:for_user],
- as: :activity
- )
- end
- defp resource_search(:v2, "hashtags", query, options) do
- tags_path = Endpoint.url() <> "/tag/"
- query
- |> prepare_tags(options)
- |> Enum.map(fn tag ->
- %{name: tag, url: tags_path <> tag}
- end)
- end
- defp resource_search(:v1, "hashtags", query, options) do
- prepare_tags(query, options)
- end
- defp prepare_tags(query, options) do
- tags =
- query
- |> preprocess_uri_query()
- |> String.split(~r/[^#\w]+/u, trim: true)
- |> Enum.uniq_by(&String.downcase/1)
- explicit_tags = Enum.filter(tags, fn tag -> String.starts_with?(tag, "#") end)
- tags =
- if Enum.any?(explicit_tags) do
- explicit_tags
- else
- tags
- end
- tags = Enum.map(tags, fn tag -> String.trim_leading(tag, "#") end)
- tags =
- if Enum.empty?(explicit_tags) && !options[:skip_joined_tag] do
- add_joined_tag(tags)
- else
- tags
- end
- Pleroma.Pagination.paginate(tags, options)
- end
- # If `query` is a URI, returns last component of its path, otherwise returns `query`
- defp preprocess_uri_query(query) do
- if query =~ ~r/https?:\/\// do
- query
- |> String.trim_trailing("/")
- |> URI.parse()
- |> Map.get(:path)
- |> String.split("/")
- |> Enum.at(-1)
- else
- query
- end
- end
- defp add_joined_tag(tags) do
- tags
- |> Kernel.++([joined_tag(tags)])
- |> Enum.uniq_by(&String.downcase/1)
- end
- defp joined_tag(tags) do
- tags
- |> Enum.map(fn tag -> String.capitalize(tag) end)
- |> Enum.join()
- end
- defp with_fallback(f, fallback \\ []) do
- try do
- f.()
- rescue
- error ->
- Logger.error("#{__MODULE__} search error: #{inspect(error)}")
- fallback
- end
- end
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Search.Elasticsearch do
- @behaviour Pleroma.Search
+ @behaviour Pleroma.Search.SearchBackend
alias Pleroma.Activity
alias Pleroma.Object.Fetcher
- alias Pleroma.Web.MastodonAPI.StatusView
- alias Pleroma.Web.MastodonAPI.AccountView
alias Pleroma.Web.ActivityPub.Visibility
alias Pleroma.Search.Elasticsearch.Parsers
- alias Pleroma.Web.Endpoint
- def es_query(:activity, query) do
+ def es_query(:activity, query, offset, limit) do
must = Parsers.Activity.parse(query)
if must == [] do
- size: 50,
+ size: limit,
+ from: offset,
terminate_after: 50,
timeout: "5s",
sort: [
- def es_query(:user, query) do
- must = Parsers.User.parse(query)
- if must == [] do
- :skip
- else
- %{
- size: 50,
- terminate_after: 50,
- timeout: "5s",
- sort: [
- "_score"
- ],
- query: %{
- bool: %{
- must: must
- }
- }
- }
- end
- end
- def es_query(:hashtag, query) do
- must = Parsers.Hashtag.parse(query)
- if must == [] do
- :skip
- else
- %{
- size: 50,
- terminate_after: 50,
- timeout: "5s",
- sort: [
- "_score"
- ],
- query: %{
- bool: %{
- must: Parsers.Hashtag.parse(query)
- }
- }
- }
- end
- end
defp maybe_fetch(:activity, search_query) do
with true <- Regex.match?(~r/https?:/, search_query),
{:ok, object} <- Fetcher.fetch_object_from_id(search_query),
- @impl Pleroma.Search
- def search(%{assigns: %{user: user}} = _conn, %{q: query} = _params, _options) do
+ def search(user, query, options) do
+ limit = Enum.min([Keyword.get(options, :limit), 40])
+ offset = Keyword.get(options, :offset, 0)
parsed_query =
|> String.trim()
activity_task =
Task.async(fn ->
- q = es_query(:activity, parsed_query)
+ q = es_query(:activity, parsed_query, offset, limit)
- Pleroma.Elasticsearch.search(:activities, q)
+ Pleroma.Search.Elasticsearch.Store.search(:activities, q)
|> Enum.filter(fn x -> Visibility.visible_for_user?(x, user) end)
- user_task =
- Task.async(fn ->
- q = es_query(:user, parsed_query)
- Pleroma.Elasticsearch.search(:users, q)
- |> Enum.filter(fn x -> Pleroma.User.visible_for(x, user) == :visible end)
- end)
- hashtag_task =
- Task.async(fn ->
- q = es_query(:hashtag, parsed_query)
- Pleroma.Elasticsearch.search(:hashtags, q)
- end)
activity_results = Task.await(activity_task)
- user_results = Task.await(user_task)
- hashtag_results = Task.await(hashtag_task)
direct_activity = Task.await(activity_fetch_task)
activity_results =
[direct_activity | activity_results]
- %{
- "accounts" =>
- AccountView.render("index.json",
- users: user_results,
- for: user
- ),
- "hashtags" =>
- Enum.map(hashtag_results, fn x ->
- %{
- url: Endpoint.url() <> "/tag/" <> x,
- name: x
- }
- end),
- "statuses" =>
- StatusView.render("index.json",
- activities: activity_results,
- for: user,
- as: :activity
- )
- }
+ activity_results
+ end
+ @impl true
+ def add_to_index(activity) do
+ Elasticsearch.put_document(Pleroma.Search.Elasticsearch.Cluster, activity, "activities")
+ end
+ @impl true
+ def remove_from_index(object) do
+ Elasticsearch.delete_document(Pleroma.Search.Elasticsearch.Cluster, object, "activities")
--- /dev/null
+defmodule Pleroma.Search.Elasticsearch.Cluster do
+ @moduledoc false
+ use Elasticsearch.Cluster, otp_app: :pleroma
--- /dev/null
+# Akkoma: A lightweight social networking server
+# Copyright © 2022-2022 Akkoma Authors <https://git.ihatebeinga.live/IHBAGang/akkoma/>
+# SPDX-License-Identifier: AGPL-3.0-only
+defimpl Elasticsearch.Document, for: Pleroma.Activity do
+ alias Pleroma.Object
+ require Pleroma.Constants
+ def id(obj), do: obj.id
+ def routing(_), do: false
+ def object_to_search_data(object) do
+ # Only index public or unlisted Notes
+ if not is_nil(object) and object.data["type"] == "Note" and
+ not is_nil(object.data["content"]) and
+ (Pleroma.Constants.as_public() in object.data["to"] or
+ Pleroma.Constants.as_public() in object.data["cc"]) and
+ String.length(object.data["content"]) > 1 do
+ data = object.data
+ content_str =
+ case data["content"] do
+ [nil | rest] -> to_string(rest)
+ str -> str
+ end
+ content =
+ with {:ok, scrubbed} <- FastSanitize.strip_tags(content_str),
+ trimmed <- String.trim(scrubbed) do
+ trimmed
+ end
+ if String.length(content) > 1 do
+ {:ok, published, _} = DateTime.from_iso8601(data["published"])
+ %{
+ _timestamp: published,
+ content: content,
+ instance: URI.parse(object.data["actor"]).host,
+ hashtags: Object.hashtags(object),
+ user: Pleroma.User.get_cached_by_ap_id(object.data["actor"]).nickname
+ }
+ else
+ %{}
+ end
+ else
+ %{}
+ end
+ end
+ def encode(activity) do
+ object = Pleroma.Object.normalize(activity)
+ object_to_search_data(object)
+ end
+++ /dev/null
-# Akkoma: A lightweight social networking server
-# Copyright © 2022-2022 Akkoma Authors <https://git.ihatebeinga.live/IHBAGang/akkoma/>
-# SPDX-License-Identifier: AGPL-3.0-only
-defmodule Pleroma.Search.Elasticsearch.Parsers.Hashtag do
- defp to_es(term) when is_binary(term) do
- %{
- term: %{
- hashtag: %{
- value: String.downcase(term)
- }
- }
- }
- end
- defp to_es({:quoted, term}), do: to_es(term)
- defp to_es({:filter, ["hashtag", query]}) do
- %{
- term: %{
- hashtag: %{
- value: String.downcase(query)
- }
- }
- }
- end
- defp to_es({:filter, _}), do: nil
- def parse(q) do
- Enum.map(q, &to_es/1)
- |> Enum.filter(fn x -> x != nil end)
- end
--- /dev/null
+# Akkoma: A lightweight social networking server
+# Copyright © 2022-2022 Akkoma Authors <https://git.ihatebeinga.live/IHBAGang/akkoma/>
+# SPDX-License-Identifier: AGPL-3.0-only
+defmodule Pleroma.Search.Elasticsearch.Store do
+ @behaviour Elasticsearch.Store
+ alias Pleroma.Search.Elasticsearch.Cluster
+ require Logger
+ alias Pleroma.Repo
+ @impl true
+ def stream(schema) do
+ Repo.stream(schema)
+ end
+ @impl true
+ def transaction(fun) do
+ {:ok, result} = Repo.transaction(fun, timeout: :infinity)
+ result
+ end
+ def search(_, _, _, :skip), do: []
+ def search(:raw, index, q) do
+ with {:ok, raw_results} <- Elasticsearch.post(Cluster, "/#{index}/_search", q) do
+ results =
+ raw_results
+ |> Map.get("hits", %{})
+ |> Map.get("hits", [])
+ {:ok, results}
+ else
+ {:error, e} ->
+ Logger.error(e)
+ {:error, e}
+ end
+ end
+ def search(:activities, q) do
+ with {:ok, results} <- search(:raw, "activities", q) do
+ results
+ |> Enum.map(fn result -> result["_id"] end)
+ |> Pleroma.Activity.all_by_ids_with_object()
+ |> Enum.sort(&(&1.inserted_at >= &2.inserted_at))
+ else
+ e ->
+ Logger.error(e)
+ []
+ end
+ end
+++ /dev/null
-# Akkoma: A lightweight social networking server
-# Copyright © 2022-2022 Akkoma Authors <https://git.ihatebeinga.live/IHBAGang/akkoma/>
-# SPDX-License-Identifier: AGPL-3.0-only
-defmodule Pleroma.Search.Elasticsearch.Parsers.User do
- defp to_es(term) when is_binary(term) do
- %{
- bool: %{
- minimum_should_match: 1,
- should: [
- %{
- match: %{
- bio: %{
- query: term,
- operator: "AND"
- }
- }
- },
- %{
- term: %{
- nickname: %{
- value: term
- }
- }
- },
- %{
- match: %{
- display_name: %{
- query: term,
- operator: "AND"
- }
- }
- }
- ]
- }
- }
- end
- defp to_es({:quoted, term}), do: to_es(term)
- defp to_es({:filter, ["user", query]}) do
- %{
- term: %{
- nickname: %{
- value: query
- }
- }
- }
- end
- defp to_es({:filter, _}), do: nil
- def parse(q) do
- Enum.map(q, &to_es/1)
- |> Enum.filter(fn x -> x != nil end)
- end
was_superuser_before_update = User.superuser?(user)
with {:ok, user} <- Repo.update(changeset, stale_error_field: :id) do
- Pleroma.Elasticsearch.maybe_put_into_elasticsearch(user)
|> maybe_remove_report_notifications(was_superuser_before_update)
case Repo.transaction(fn -> do_common_pipeline(object, meta) end, Utils.query_timeout()) do
{:ok, {:ok, activity, meta}} ->
- side_effects().handle_after_transaction(activity)
{:ok, activity, meta}
{:ok, value} ->
# Pleroma: A lightweight social networking server
-# Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
+# Copyright © 2017-2022 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Web.ActivityPub.SideEffects do
def handle(%{data: %{"type" => "EmojiReact"}} = object, meta) do
reacted_object = Object.get_by_ap_id(object.data["object"])
Utils.add_emoji_reaction_to_object(object, reacted_object)
{:ok, object, meta}
@impl true
- def handle_after_transaction(%Pleroma.Activity{data: %{"type" => "Create"}} = activity) do
- Pleroma.Elasticsearch.put_by_id(:activity, activity.id)
- end
- def handle_after_transaction(%Pleroma.Activity{
- data: %{"type" => "Delete", "deleted_activity_id" => id}
- }) do
- Pleroma.Elasticsearch.delete_by_id(:activity, id)
- end
- def handle_after_transaction(%Pleroma.Activity{}) do
- :ok
- end
- def handle_after_transaction(%Pleroma.Object{}) do
- :ok
- end
def handle_after_transaction(meta) do
|> send_notifications()
# Pleroma: A lightweight social networking server
-# Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
+# Copyright © 2017-2022 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Web.ActivityPub.SideEffects.Handling do
def post(user, %{status: _} = data) do
with {:ok, draft} <- ActivityDraft.create(user, data) do
- activity = ActivityPub.create(draft.changes, draft.preview?)
- unless draft.preview? do
- Pleroma.Elasticsearch.maybe_put_into_elasticsearch(activity)
- end
- activity
+ ActivityPub.create(draft.changes, draft.preview?)
# Pleroma: A lightweight social networking server
-# Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
+# Copyright © 2017-2022 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Web.MastodonAPI.SearchController do
alias Pleroma.Repo
alias Pleroma.User
alias Pleroma.Web.ControllerHelper
+ alias Pleroma.Web.Endpoint
alias Pleroma.Web.MastodonAPI.AccountView
+ alias Pleroma.Web.MastodonAPI.StatusView
alias Pleroma.Web.Plugs.OAuthScopesPlug
alias Pleroma.Web.Plugs.RateLimiter
def search2(conn, params), do: do_search(:v2, conn, params)
def search(conn, params), do: do_search(:v1, conn, params)
- defp do_search(version, %{assigns: %{user: user}} = conn, params) do
- options =
- search_options(params, user)
- |> Keyword.put(:version, version)
- search_provider = Pleroma.Config.get([:search, :provider])
- json(conn, search_provider.search(conn, params, options))
+ defp do_search(version, %{assigns: %{user: user}} = conn, %{q: query} = params) do
+ query = String.trim(query)
+ options = search_options(params, user)
+ timeout = Keyword.get(Repo.config(), :timeout, 15_000)
+ default_values = %{"statuses" => [], "accounts" => [], "hashtags" => []}
+ result =
+ default_values
+ |> Enum.map(fn {resource, default_value} ->
+ if params[:type] in [nil, resource] do
+ {resource, fn -> resource_search(version, resource, query, options) end}
+ else
+ {resource, fn -> default_value end}
+ end
+ end)
+ |> Task.async_stream(fn {resource, f} -> {resource, with_fallback(f)} end,
+ timeout: timeout,
+ on_timeout: :kill_task
+ )
+ |> Enum.reduce(default_values, fn
+ {:ok, {resource, result}}, acc ->
+ Map.put(acc, resource, result)
+ _error, acc ->
+ acc
+ end)
+ json(conn, result)
defp search_options(params, user) do
{:nimble_parsec, "~> 1.0", override: true},
{:phoenix_live_dashboard, "~> 0.6.2"},
{:ecto_psql_extras, "~> 0.6"},
+ {:elasticsearch, "~> 1.0.0"},
# indirect dependency version override
{:plug, "~> 1.10.4", override: true},
- "properties": {
- "_timestamp": {
- "type": "date",
- "index": true
- },
- "instance": {
- "type": "keyword"
- },
- "content": {
- "type": "text"
- },
- "hashtags": {
- "type": "keyword"
- },
- "user": {
- "type": "text"
+ "mappings": {
+ "properties": {
+ "_timestamp": {
+ "type": "date",
+ "index": true
+ },
+ "instance": {
+ "type": "keyword"
+ },
+ "content": {
+ "type": "text"
+ },
+ "hashtags": {
+ "type": "keyword"
+ },
+ "user": {
+ "type": "text"
+ }
|> expect(:handle, fn o, m -> {:ok, o, m} end)
|> expect(:handle_after_transaction, fn m -> m end)
- |> expect(:handle_after_transaction, fn m -> m end)