Refactor ES on top of search behaviour
authorFloatingGhost <hannah@coffee-and-dreams.uk>
Thu, 30 Jun 2022 15:28:31 +0000 (16:28 +0100)
committerFloatingGhost <hannah@coffee-and-dreams.uk>
Thu, 30 Jun 2022 15:28:31 +0000 (16:28 +0100)
25 files changed:
config/config.exs
lib/mix/tasks/pleroma/search.ex [deleted file]
lib/mix/tasks/pleroma/search/meilisearch.ex [deleted file]
lib/pleroma/application.ex
lib/pleroma/elasticsearch/document_mappings/activity.ex [deleted file]
lib/pleroma/elasticsearch/document_mappings/hashtag.ex [deleted file]
lib/pleroma/elasticsearch/document_mappings/user.ex [deleted file]
lib/pleroma/elasticsearch/store.ex [deleted file]
lib/pleroma/hashtag.ex
lib/pleroma/search/builtin.ex [deleted file]
lib/pleroma/search/elasticsearch.ex
lib/pleroma/search/elasticsearch/cluster.ex [new file with mode: 0644]
lib/pleroma/search/elasticsearch/document_mappings/activity.ex [new file with mode: 0644]
lib/pleroma/search/elasticsearch/hashtag_parser.ex [deleted file]
lib/pleroma/search/elasticsearch/store.ex [new file with mode: 0644]
lib/pleroma/search/elasticsearch/user_paser.ex [deleted file]
lib/pleroma/user.ex
lib/pleroma/web/activity_pub/pipeline.ex
lib/pleroma/web/activity_pub/side_effects.ex
lib/pleroma/web/activity_pub/side_effects/handling.ex
lib/pleroma/web/common_api.ex
lib/pleroma/web/mastodon_api/controllers/search_controller.ex
mix.exs
priv/es-mappings/activity.json
test/pleroma/web/activity_pub/pipeline_test.exs

index cf5f9cf276abf761adf4466ba8e2ecb3231596e9..727a2b0cb5b6b886d3e045bbd3bd93275ce47a63 100644 (file)
@@ -856,8 +856,6 @@ config :pleroma, ConcurrentLimiter, [
   {Pleroma.Search, [max_running: 30, max_waiting: 50]}
 ]
 
-config :pleroma, :search, provider: Pleroma.Search.Builtin
-
 config :pleroma, Pleroma.Search, module: Pleroma.Search.DatabaseSearch
 
 config :pleroma, Pleroma.Search.Meilisearch,
@@ -865,6 +863,22 @@ config :pleroma, Pleroma.Search.Meilisearch,
   private_key: nil,
   initial_indexing_chunk_size: 100_000
 
+config :pleroma, Pleroma.Search.Elasticsearch.Cluster,
+  url: "http://localhost:9200",
+  username: "elastic",
+  password: "changeme",
+  api: Elasticsearch.API.HTTP,
+  json_library: Jason,
+  indexes: %{
+    activities: %{
+      settings: "priv/es-mappings/activity.json",
+      store: Pleroma.Search.Elasticsearch.Store,
+      sources: [Pleroma.Activity],
+      bulk_page_size: 5000,
+      bulk_wait_interval: 15_000
+    }
+  }
+
 # Import environment specific config. This must remain at the bottom
 # of this file so it overrides the configuration defined above.
 import_config "#{Mix.env()}.exs"
diff --git a/lib/mix/tasks/pleroma/search.ex b/lib/mix/tasks/pleroma/search.ex
deleted file mode 100644 (file)
index 1fd880e..0000000
+++ /dev/null
@@ -1,64 +0,0 @@
-# Pleroma: A lightweight social networking server
-# Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
-# SPDX-License-Identifier: AGPL-3.0-only
-
-defmodule Mix.Tasks.Pleroma.Search do
-  use Mix.Task
-  import Mix.Pleroma
-  import Ecto.Query
-  alias Pleroma.Activity
-  alias Pleroma.Pagination
-  alias Pleroma.User
-  alias Pleroma.Hashtag
-
-  @shortdoc "Manages elasticsearch"
-
-  def run(["import", "activities" | _rest]) do
-    start_pleroma()
-
-    from(a in Activity, where: not ilike(a.actor, "%/relay"))
-    |> where([a], fragment("(? ->> 'type'::text) = 'Create'", a.data))
-    |> Activity.with_preloaded_object()
-    |> Activity.with_preloaded_user_actor()
-    |> get_all(:activities)
-  end
-
-  def run(["import", "users" | _rest]) do
-    start_pleroma()
-
-    from(u in User, where: u.nickname not in ["internal.fetch", "relay"])
-    |> get_all(:users)
-  end
-
-  def run(["import", "hashtags" | _rest]) do
-    start_pleroma()
-
-    from(h in Hashtag)
-    |> Pleroma.Repo.all()
-    |> Pleroma.Elasticsearch.bulk_post(:hashtags)
-  end
-
-  defp get_all(query, index, max_id \\ nil) do
-    params = %{limit: 1000}
-
-    params =
-      if max_id == nil do
-        params
-      else
-        Map.put(params, :max_id, max_id)
-      end
-
-    res =
-      query
-      |> Pagination.fetch_paginated(params)
-
-    if res == [] do
-      :ok
-    else
-      res
-      |> Pleroma.Elasticsearch.bulk_post(index)
-
-      get_all(query, index, List.last(res).id)
-    end
-  end
-end
diff --git a/lib/mix/tasks/pleroma/search/meilisearch.ex b/lib/mix/tasks/pleroma/search/meilisearch.ex
deleted file mode 100644 (file)
index d4a83c3..0000000
+++ /dev/null
@@ -1,144 +0,0 @@
-# Pleroma: A lightweight social networking server
-# Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
-# SPDX-License-Identifier: AGPL-3.0-only
-
-defmodule Mix.Tasks.Pleroma.Search.Meilisearch do
-  require Pleroma.Constants
-
-  import Mix.Pleroma
-  import Ecto.Query
-
-  import Pleroma.Search.Meilisearch,
-    only: [meili_post: 2, meili_put: 2, meili_get: 1, meili_delete!: 1]
-
-  def run(["index"]) do
-    start_pleroma()
-
-    meili_version =
-      (
-        {:ok, result} = meili_get("/version")
-
-        result["pkgVersion"]
-      )
-
-    # The ranking rule syntax was changed but nothing about that is mentioned in the changelog
-    if not Version.match?(meili_version, ">= 0.25.0") do
-      raise "Meilisearch <0.24.0 not supported"
-    end
-
-    {:ok, _} =
-      meili_post(
-        "/indexes/objects/settings/ranking-rules",
-        [
-          "published:desc",
-          "words",
-          "exactness",
-          "proximity",
-          "typo",
-          "attribute",
-          "sort"
-        ]
-      )
-
-    {:ok, _} =
-      meili_post(
-        "/indexes/objects/settings/searchable-attributes",
-        [
-          "content"
-        ]
-      )
-
-    IO.puts("Created indices. Starting to insert posts.")
-
-    chunk_size = Pleroma.Config.get([Pleroma.Search.Meilisearch, :initial_indexing_chunk_size])
-
-    Pleroma.Repo.transaction(
-      fn ->
-        query =
-          from(Pleroma.Object,
-            # Only index public and unlisted posts which are notes and have some text
-            where:
-              fragment("data->>'type' = 'Note'") and
-                (fragment("data->'to' \\? ?", ^Pleroma.Constants.as_public()) or
-                   fragment("data->'cc' \\? ?", ^Pleroma.Constants.as_public())),
-            order_by: [desc: fragment("data->'published'")]
-          )
-
-        count = query |> Pleroma.Repo.aggregate(:count, :data)
-        IO.puts("Entries to index: #{count}")
-
-        Pleroma.Repo.stream(
-          query,
-          timeout: :infinity
-        )
-        |> Stream.map(&Pleroma.Search.Meilisearch.object_to_search_data/1)
-        |> Stream.filter(fn o -> not is_nil(o) end)
-        |> Stream.chunk_every(chunk_size)
-        |> Stream.transform(0, fn objects, acc ->
-          new_acc = acc + Enum.count(objects)
-
-          # Reset to the beginning of the line and rewrite it
-          IO.write("\r")
-          IO.write("Indexed #{new_acc} entries")
-
-          {[objects], new_acc}
-        end)
-        |> Stream.each(fn objects ->
-          result =
-            meili_put(
-              "/indexes/objects/documents",
-              objects
-            )
-
-          with {:ok, res} <- result do
-            if not Map.has_key?(res, "uid") do
-              IO.puts("\nFailed to index: #{inspect(result)}")
-            end
-          else
-            e -> IO.puts("\nFailed to index due to network error: #{inspect(e)}")
-          end
-        end)
-        |> Stream.run()
-      end,
-      timeout: :infinity
-    )
-
-    IO.write("\n")
-  end
-
-  def run(["clear"]) do
-    start_pleroma()
-
-    meili_delete!("/indexes/objects/documents")
-  end
-
-  def run(["show-keys", master_key]) do
-    start_pleroma()
-
-    endpoint = Pleroma.Config.get([Pleroma.Search.Meilisearch, :url])
-
-    {:ok, result} =
-      Pleroma.HTTP.get(
-        Path.join(endpoint, "/keys"),
-        [{"Authorization", "Bearer #{master_key}"}]
-      )
-
-    decoded = Jason.decode!(result.body)
-
-    if decoded["results"] do
-      Enum.each(decoded["results"], fn %{"description" => desc, "key" => key} ->
-        IO.puts("#{desc}: #{key}")
-      end)
-    else
-      IO.puts("Error fetching the keys, check the master key is correct: #{inspect(decoded)}")
-    end
-  end
-
-  def run(["stats"]) do
-    start_pleroma()
-
-    {:ok, result} = meili_get("/indexes/objects/stats")
-    IO.puts("Number of entries: #{result["numberOfDocuments"]}")
-    IO.puts("Indexing? #{result["isIndexing"]}")
-  end
-end
index be03cdffbd45af9ede606fdf755e3d513225c9a1..b709e737bf2345dd1b032967274230cda8caeb22 100644 (file)
@@ -105,6 +105,7 @@ defmodule Pleroma.Application do
           {Oban, Config.get(Oban)},
           Pleroma.Web.Endpoint
         ] ++
+        elasticsearch_children() ++
         task_children(@mix_env) ++
         dont_run_in_test(@mix_env) ++
         shout_child(shout_enabled?())
@@ -303,6 +304,16 @@ defmodule Pleroma.Application do
 
   defp http_children(_, _), do: []
 
+  # Extra supervision-tree children for the search backend: the Elasticsearch
+  # cluster module when `[Pleroma.Search, :module]` is configured as
+  # `Pleroma.Search.Elasticsearch`, and nothing for any other module.
+  def elasticsearch_children do
+    case Config.get([Pleroma.Search, :module]) do
+      Pleroma.Search.Elasticsearch -> [Pleroma.Search.Elasticsearch.Cluster]
+      _other -> []
+    end
+  end
+
   @spec limiters_setup() :: :ok
   def limiters_setup do
     config = Config.get(ConcurrentLimiter, [])
diff --git a/lib/pleroma/elasticsearch/document_mappings/activity.ex b/lib/pleroma/elasticsearch/document_mappings/activity.ex
deleted file mode 100644 (file)
index a028c6f..0000000
+++ /dev/null
@@ -1,19 +0,0 @@
-# Akkoma: A lightweight social networking server
-# Copyright © 2022-2022 Akkoma Authors <https://git.ihatebeinga.live/IHBAGang/akkoma/>
-# SPDX-License-Identifier: AGPL-3.0-only
-
-defmodule Pleroma.Elasticsearch.DocumentMappings.Activity do
-  alias Pleroma.Object
-
-  def id(obj), do: obj.id
-
-  def encode(%{object: %{data: %{"type" => "Note"}}} = activity) do
-    %{
-      _timestamp: activity.inserted_at,
-      user: activity.user_actor.nickname,
-      content: activity.object.data["content"],
-      instance: URI.parse(activity.user_actor.ap_id).host,
-      hashtags: Object.hashtags(activity.object)
-    }
-  end
-end
diff --git a/lib/pleroma/elasticsearch/document_mappings/hashtag.ex b/lib/pleroma/elasticsearch/document_mappings/hashtag.ex
deleted file mode 100644 (file)
index 7391983..0000000
+++ /dev/null
@@ -1,21 +0,0 @@
-# Akkoma: A lightweight social networking server
-# Copyright © 2022-2022 Akkoma Authors <https://git.ihatebeinga.live/IHBAGang/akkoma/>
-# SPDX-License-Identifier: AGPL-3.0-only
-
-defmodule Pleroma.Elasticsearch.DocumentMappings.Hashtag do
-  def id(obj), do: obj.id
-
-  def encode(%{timestamp: _} = hashtag) do
-    %{
-      hashtag: hashtag.name,
-      timestamp: hashtag.timestamp
-    }
-  end
-
-  def encode(hashtag) do
-    %{
-      hashtag: hashtag.name,
-      timestamp: hashtag.inserted_at
-    }
-  end
-end
diff --git a/lib/pleroma/elasticsearch/document_mappings/user.ex b/lib/pleroma/elasticsearch/document_mappings/user.ex
deleted file mode 100644 (file)
index d5cfca6..0000000
+++ /dev/null
@@ -1,17 +0,0 @@
-# Akkoma: A lightweight social networking server
-# Copyright © 2022-2022 Akkoma Authors <https://git.ihatebeinga.live/IHBAGang/akkoma/>
-# SPDX-License-Identifier: AGPL-3.0-only
-
-defmodule Pleroma.Elasticsearch.DocumentMappings.User do
-  def id(obj), do: obj.id
-
-  def encode(%{actor_type: "Person"} = user) do
-    %{
-      timestamp: user.inserted_at,
-      instance: URI.parse(user.ap_id).host,
-      nickname: user.nickname,
-      bio: user.bio,
-      display_name: user.name
-    }
-  end
-end
diff --git a/lib/pleroma/elasticsearch/store.ex b/lib/pleroma/elasticsearch/store.ex
deleted file mode 100644 (file)
index 98c88a7..0000000
+++ /dev/null
@@ -1,256 +0,0 @@
-# Akkoma: A lightweight social networking server
-# Copyright © 2022-2022 Akkoma Authors <https://git.ihatebeinga.live/IHBAGang/akkoma/>
-# SPDX-License-Identifier: AGPL-3.0-only
-
-defmodule Pleroma.Elasticsearch do
-  alias Pleroma.Activity
-  alias Pleroma.User
-  alias Pleroma.Object
-  alias Pleroma.Elasticsearch.DocumentMappings
-  alias Pleroma.Config
-  require Logger
-
-  defp url do
-    Config.get([:elasticsearch, :url])
-  end
-
-  defp enabled? do
-    Config.get([:search, :provider]) == Pleroma.Search.Elasticsearch
-  end
-
-  def delete_by_id(:activity, id) do
-    if enabled?() do
-      Elastix.Document.delete(url(), "activities", "activity", id)
-    end
-  end
-
-  def put_by_id(:activity, id) do
-    id
-    |> Activity.get_by_id_with_object()
-    |> maybe_put_into_elasticsearch()
-  end
-
-  def maybe_put_into_elasticsearch({:ok, item}) do
-    maybe_put_into_elasticsearch(item)
-  end
-
-  def maybe_put_into_elasticsearch(
-        %{data: %{"type" => "Create"}, object: %{data: %{"type" => "Note"}}} = activity
-      ) do
-    if enabled?() do
-      actor = Pleroma.Activity.user_actor(activity)
-
-      activity
-      |> Map.put(:user_actor, actor)
-      |> put()
-    end
-  end
-
-  def maybe_put_into_elasticsearch(%User{actor_type: "Person"} = user) do
-    if enabled?() do
-      put(user)
-    end
-  end
-
-  def maybe_put_into_elasticsearch(_) do
-    {:ok, :skipped}
-  end
-
-  def maybe_bulk_post(data, type) do
-    if enabled?() do
-      bulk_post(data, type)
-    end
-  end
-
-  def put(%Activity{} = activity) do
-    with {:ok, _} <-
-           Elastix.Document.index(
-             url(),
-             "activities",
-             "activity",
-             DocumentMappings.Activity.id(activity),
-             DocumentMappings.Activity.encode(activity)
-           ) do
-      activity
-      |> Map.get(:object)
-      |> Object.hashtags()
-      |> Enum.map(fn x ->
-        %{id: x, name: x, timestamp: DateTime.to_iso8601(DateTime.utc_now())}
-      end)
-      |> bulk_post(:hashtags)
-    else
-      {:error, %{reason: err}} ->
-        Logger.error("Could not put activity: #{err}")
-        :skipped
-    end
-  end
-
-  def put(%User{} = user) do
-    with {:ok, _} <-
-           Elastix.Document.index(
-             url(),
-             "users",
-             "user",
-             DocumentMappings.User.id(user),
-             DocumentMappings.User.encode(user)
-           ) do
-      :ok
-    else
-      {:error, %{reason: err}} ->
-        Logger.error("Could not put user: #{err}")
-        :skipped
-    end
-  end
-
-  def bulk_post(data, :activities) do
-    d =
-      data
-      |> Enum.filter(fn x ->
-        t =
-          x.object
-          |> Map.get(:data, %{})
-          |> Map.get("type", "")
-
-        t == "Note"
-      end)
-      |> Enum.map(fn d ->
-        [
-          %{index: %{_id: DocumentMappings.Activity.id(d)}},
-          DocumentMappings.Activity.encode(d)
-        ]
-      end)
-      |> List.flatten()
-
-    with {:ok, %{body: %{"errors" => false}}} <-
-           Elastix.Bulk.post(
-             url(),
-             d,
-             index: "activities",
-             type: "activity"
-           ) do
-      :ok
-    else
-      {:error, %{reason: err}} ->
-        Logger.error("Could not bulk put activity: #{err}")
-        :skipped
-
-      {:ok, %{body: _}} ->
-        :skipped
-    end
-  end
-
-  def bulk_post(data, :users) do
-    d =
-      data
-      |> Enum.filter(fn x -> x.actor_type == "Person" end)
-      |> Enum.map(fn d ->
-        [
-          %{index: %{_id: DocumentMappings.User.id(d)}},
-          DocumentMappings.User.encode(d)
-        ]
-      end)
-      |> List.flatten()
-
-    with {:ok, %{body: %{"errors" => false}}} <-
-           Elastix.Bulk.post(
-             url(),
-             d,
-             index: "users",
-             type: "user"
-           ) do
-      :ok
-    else
-      {:error, %{reason: err}} ->
-        Logger.error("Could not bulk put users: #{err}")
-        :skipped
-
-      {:ok, %{body: _}} ->
-        :skipped
-    end
-  end
-
-  def bulk_post(data, :hashtags) when is_list(data) do
-    d =
-      data
-      |> Enum.map(fn d ->
-        [
-          %{index: %{_id: DocumentMappings.Hashtag.id(d)}},
-          DocumentMappings.Hashtag.encode(d)
-        ]
-      end)
-      |> List.flatten()
-
-    with {:ok, %{body: %{"errors" => false}}} <-
-           Elastix.Bulk.post(
-             url(),
-             d,
-             index: "hashtags",
-             type: "hashtag"
-           ) do
-      :ok
-    else
-      {:error, %{reason: err}} ->
-        Logger.error("Could not bulk put hashtags: #{err}")
-        :skipped
-
-      {:ok, %{body: _}} ->
-        :skipped
-    end
-  end
-
-  def bulk_post(_, :hashtags), do: {:ok, nil}
-
-  def search(_, _, _, :skip), do: []
-
-  def search(:raw, index, type, q) do
-    with {:ok, raw_results} <- Elastix.Search.search(url(), index, [type], q) do
-      results =
-        raw_results
-        |> Map.get(:body, %{})
-        |> Map.get("hits", %{})
-        |> Map.get("hits", [])
-
-      {:ok, results}
-    else
-      {:error, e} ->
-        Logger.error(e)
-        {:error, e}
-    end
-  end
-
-  def search(:activities, q) do
-    with {:ok, results} <- search(:raw, "activities", "activity", q) do
-      results
-      |> Enum.map(fn result -> result["_id"] end)
-      |> Pleroma.Activity.all_by_ids_with_object()
-      |> Enum.sort(&(&1.inserted_at >= &2.inserted_at))
-    else
-      e ->
-        Logger.error(e)
-        []
-    end
-  end
-
-  def search(:users, q) do
-    with {:ok, results} <- search(:raw, "users", "user", q) do
-      results
-      |> Enum.map(fn result -> result["_id"] end)
-      |> Pleroma.User.get_all_by_ids()
-    else
-      e ->
-        Logger.error(e)
-        []
-    end
-  end
-
-  def search(:hashtags, q) do
-    with {:ok, results} <- search(:raw, "hashtags", "hashtag", q) do
-      results
-      |> Enum.map(fn result -> result["_source"]["hashtag"] end)
-    else
-      e ->
-        Logger.error(e)
-        []
-    end
-  end
-end
index cdbfeab02b4b2d974248c23c9f5d3da9f2b39429..53e2e9c897d564dd788306a72fa640ff75cbdce8 100644 (file)
@@ -61,7 +61,6 @@ defmodule Pleroma.Hashtag do
                {:ok, Repo.all(from(ht in Hashtag, where: ht.name in ^names))}
              end)
              |> Repo.transaction() do
-        Pleroma.Elasticsearch.maybe_bulk_post(hashtags, :hashtags)
         {:ok, hashtags}
       else
         {:error, _name, value, _changes_so_far} -> {:error, value}
diff --git a/lib/pleroma/search/builtin.ex b/lib/pleroma/search/builtin.ex
deleted file mode 100644 (file)
index 3cbe220..0000000
+++ /dev/null
@@ -1,138 +0,0 @@
-defmodule Pleroma.Search.Builtin do
-  @behaviour Pleroma.Search
-
-  alias Pleroma.Repo
-  alias Pleroma.User
-  alias Pleroma.Activity
-  alias Pleroma.Web.MastodonAPI.AccountView
-  alias Pleroma.Web.MastodonAPI.StatusView
-  alias Pleroma.Web.Endpoint
-
-  require Logger
-
-  @impl Pleroma.Search
-  def search(_conn, %{q: query} = params, options) do
-    version = Keyword.get(options, :version)
-    timeout = Keyword.get(Repo.config(), :timeout, 15_000)
-    query = String.trim(query)
-    default_values = %{"statuses" => [], "accounts" => [], "hashtags" => []}
-
-    default_values
-    |> Enum.map(fn {resource, default_value} ->
-      if params[:type] in [nil, resource] do
-        {resource, fn -> resource_search(version, resource, query, options) end}
-      else
-        {resource, fn -> default_value end}
-      end
-    end)
-    |> Task.async_stream(fn {resource, f} -> {resource, with_fallback(f)} end,
-      timeout: timeout,
-      on_timeout: :kill_task
-    )
-    |> Enum.reduce(default_values, fn
-      {:ok, {resource, result}}, acc ->
-        Map.put(acc, resource, result)
-
-      _error, acc ->
-        acc
-    end)
-  end
-
-  defp resource_search(_, "accounts", query, options) do
-    accounts = with_fallback(fn -> User.search(query, options) end)
-
-    AccountView.render("index.json",
-      users: accounts,
-      for: options[:for_user],
-      embed_relationships: options[:embed_relationships]
-    )
-  end
-
-  defp resource_search(_, "statuses", query, options) do
-    statuses = with_fallback(fn -> Activity.search(options[:for_user], query, options) end)
-
-    StatusView.render("index.json",
-      activities: statuses,
-      for: options[:for_user],
-      as: :activity
-    )
-  end
-
-  defp resource_search(:v2, "hashtags", query, options) do
-    tags_path = Endpoint.url() <> "/tag/"
-
-    query
-    |> prepare_tags(options)
-    |> Enum.map(fn tag ->
-      %{name: tag, url: tags_path <> tag}
-    end)
-  end
-
-  defp resource_search(:v1, "hashtags", query, options) do
-    prepare_tags(query, options)
-  end
-
-  defp prepare_tags(query, options) do
-    tags =
-      query
-      |> preprocess_uri_query()
-      |> String.split(~r/[^#\w]+/u, trim: true)
-      |> Enum.uniq_by(&String.downcase/1)
-
-    explicit_tags = Enum.filter(tags, fn tag -> String.starts_with?(tag, "#") end)
-
-    tags =
-      if Enum.any?(explicit_tags) do
-        explicit_tags
-      else
-        tags
-      end
-
-    tags = Enum.map(tags, fn tag -> String.trim_leading(tag, "#") end)
-
-    tags =
-      if Enum.empty?(explicit_tags) && !options[:skip_joined_tag] do
-        add_joined_tag(tags)
-      else
-        tags
-      end
-
-    Pleroma.Pagination.paginate(tags, options)
-  end
-
-  # If `query` is a URI, returns last component of its path, otherwise returns `query`
-  defp preprocess_uri_query(query) do
-    if query =~ ~r/https?:\/\// do
-      query
-      |> String.trim_trailing("/")
-      |> URI.parse()
-      |> Map.get(:path)
-      |> String.split("/")
-      |> Enum.at(-1)
-    else
-      query
-    end
-  end
-
-  defp add_joined_tag(tags) do
-    tags
-    |> Kernel.++([joined_tag(tags)])
-    |> Enum.uniq_by(&String.downcase/1)
-  end
-
-  defp joined_tag(tags) do
-    tags
-    |> Enum.map(fn tag -> String.capitalize(tag) end)
-    |> Enum.join()
-  end
-
-  defp with_fallback(f, fallback \\ []) do
-    try do
-      f.()
-    rescue
-      error ->
-        Logger.error("#{__MODULE__} search error: #{inspect(error)}")
-        fallback
-    end
-  end
-end
index 76d2c32771764a31b77809bb57c205aace355410..7c7ca82c8825e19a39b5807ec51afe9cb3d5b5e5 100644 (file)
@@ -3,24 +3,22 @@
 # SPDX-License-Identifier: AGPL-3.0-only
 
 defmodule Pleroma.Search.Elasticsearch do
-  @behaviour Pleroma.Search
+  @behaviour Pleroma.Search.SearchBackend
 
   alias Pleroma.Activity
   alias Pleroma.Object.Fetcher
-  alias Pleroma.Web.MastodonAPI.StatusView
-  alias Pleroma.Web.MastodonAPI.AccountView
   alias Pleroma.Web.ActivityPub.Visibility
   alias Pleroma.Search.Elasticsearch.Parsers
-  alias Pleroma.Web.Endpoint
 
-  def es_query(:activity, query) do
+  def es_query(:activity, query, offset, limit) do
     must = Parsers.Activity.parse(query)
 
     if must == [] do
       :skip
     else
       %{
-        size: 50,
+        size: limit,
+        from: offset,
         terminate_after: 50,
         timeout: "5s",
         sort: [
@@ -36,50 +34,6 @@ defmodule Pleroma.Search.Elasticsearch do
     end
   end
 
-  def es_query(:user, query) do
-    must = Parsers.User.parse(query)
-
-    if must == [] do
-      :skip
-    else
-      %{
-        size: 50,
-        terminate_after: 50,
-        timeout: "5s",
-        sort: [
-          "_score"
-        ],
-        query: %{
-          bool: %{
-            must: must
-          }
-        }
-      }
-    end
-  end
-
-  def es_query(:hashtag, query) do
-    must = Parsers.Hashtag.parse(query)
-
-    if must == [] do
-      :skip
-    else
-      %{
-        size: 50,
-        terminate_after: 50,
-        timeout: "5s",
-        sort: [
-          "_score"
-        ],
-        query: %{
-          bool: %{
-            must: Parsers.Hashtag.parse(query)
-          }
-        }
-      }
-    end
-  end
-
   defp maybe_fetch(:activity, search_query) do
     with true <- Regex.match?(~r/https?:/, search_query),
          {:ok, object} <- Fetcher.fetch_object_from_id(search_query),
@@ -90,8 +44,10 @@ defmodule Pleroma.Search.Elasticsearch do
     end
   end
 
-  @impl Pleroma.Search
-  def search(%{assigns: %{user: user}} = _conn, %{q: query} = _params, _options) do
+  def search(user, query, options) do
+    limit = Enum.min([Keyword.get(options, :limit), 40])
+    offset = Keyword.get(options, :offset, 0)
+
     parsed_query =
       query
       |> String.trim()
@@ -104,30 +60,13 @@ defmodule Pleroma.Search.Elasticsearch do
 
     activity_task =
       Task.async(fn ->
-        q = es_query(:activity, parsed_query)
+        q = es_query(:activity, parsed_query, offset, limit)
 
-        Pleroma.Elasticsearch.search(:activities, q)
+        Pleroma.Search.Elasticsearch.Store.search(:activities, q)
         |> Enum.filter(fn x -> Visibility.visible_for_user?(x, user) end)
       end)
 
-    user_task =
-      Task.async(fn ->
-        q = es_query(:user, parsed_query)
-
-        Pleroma.Elasticsearch.search(:users, q)
-        |> Enum.filter(fn x -> Pleroma.User.visible_for(x, user) == :visible end)
-      end)
-
-    hashtag_task =
-      Task.async(fn ->
-        q = es_query(:hashtag, parsed_query)
-
-        Pleroma.Elasticsearch.search(:hashtags, q)
-      end)
-
     activity_results = Task.await(activity_task)
-    user_results = Task.await(user_task)
-    hashtag_results = Task.await(hashtag_task)
     direct_activity = Task.await(activity_fetch_task)
 
     activity_results =
@@ -137,25 +76,16 @@ defmodule Pleroma.Search.Elasticsearch do
         [direct_activity | activity_results]
       end
 
-    %{
-      "accounts" =>
-        AccountView.render("index.json",
-          users: user_results,
-          for: user
-        ),
-      "hashtags" =>
-        Enum.map(hashtag_results, fn x ->
-          %{
-            url: Endpoint.url() <> "/tag/" <> x,
-            name: x
-          }
-        end),
-      "statuses" =>
-        StatusView.render("index.json",
-          activities: activity_results,
-          for: user,
-          as: :activity
-        )
-    }
+    activity_results
+  end
+
+  @impl true
+  def add_to_index(activity) do
+    Elasticsearch.put_document(Pleroma.Search.Elasticsearch.Cluster, activity, "activities")
+  end
+
+  @impl true
+  def remove_from_index(object) do
+    Elasticsearch.delete_document(Pleroma.Search.Elasticsearch.Cluster, object, "activities")
   end
 end
diff --git a/lib/pleroma/search/elasticsearch/cluster.ex b/lib/pleroma/search/elasticsearch/cluster.ex
new file mode 100644 (file)
index 0000000..4f76c4e
--- /dev/null
@@ -0,0 +1,4 @@
+defmodule Pleroma.Search.Elasticsearch.Cluster do
+  @moduledoc false
+  use Elasticsearch.Cluster, otp_app: :pleroma
+end
diff --git a/lib/pleroma/search/elasticsearch/document_mappings/activity.ex b/lib/pleroma/search/elasticsearch/document_mappings/activity.ex
new file mode 100644 (file)
index 0000000..edd8e03
--- /dev/null
@@ -0,0 +1,55 @@
+# Akkoma: A lightweight social networking server
+# Copyright © 2022-2022 Akkoma Authors <https://git.ihatebeinga.live/IHBAGang/akkoma/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defimpl Elasticsearch.Document, for: Pleroma.Activity do
+  alias Pleroma.Object
+  require Pleroma.Constants
+
+  def id(obj), do: obj.id
+  def routing(_), do: false
+
+  # Builds the search document for a public or unlisted Note that still has
+  # more than one character of text once its markup is stripped. Anything
+  # else, including content that cannot be scrubbed, yields `%{}`.
+  def object_to_search_data(%{data: %{"type" => "Note"} = data} = object) do
+    with true <- public?(data),
+         content when is_binary(content) <- plain_text(data["content"]),
+         true <- String.length(content) > 1 do
+      {:ok, published, _} = DateTime.from_iso8601(data["published"])
+
+      # NOTE(review): assumes the actor is already a known user — confirm.
+      %{
+        _timestamp: published,
+        content: content,
+        instance: URI.parse(data["actor"]).host,
+        hashtags: Object.hashtags(object),
+        user: Pleroma.User.get_cached_by_ap_id(data["actor"]).nickname
+      }
+    else
+      _ -> %{}
+    end
+  end
+
+  def object_to_search_data(_), do: %{}
+
+  def encode(activity), do: activity |> Object.normalize() |> object_to_search_data()
+
+  # `to`/`cc` may be absent or a bare string, so wrap before the membership test.
+  defp public?(data) do
+    public = Pleroma.Constants.as_public()
+    public in List.wrap(data["to"]) or public in List.wrap(data["cc"])
+  end
+
+  # Tag-stripped, trimmed text, or nil when there is nothing usable to index.
+  defp plain_text([nil | rest]), do: plain_text(to_string(rest))
+
+  defp plain_text(text) when is_binary(text) and byte_size(text) > 1 do
+    case FastSanitize.strip_tags(text) do
+      {:ok, scrubbed} -> String.trim(scrubbed)
+      _ -> nil
+    end
+  end
+
+  defp plain_text(_), do: nil
+end
diff --git a/lib/pleroma/search/elasticsearch/hashtag_parser.ex b/lib/pleroma/search/elasticsearch/hashtag_parser.ex
deleted file mode 100644 (file)
index 911dc65..0000000
+++ /dev/null
@@ -1,34 +0,0 @@
-# Akkoma: A lightweight social networking server
-# Copyright © 2022-2022 Akkoma Authors <https://git.ihatebeinga.live/IHBAGang/akkoma/>
-# SPDX-License-Identifier: AGPL-3.0-only
-
-defmodule Pleroma.Search.Elasticsearch.Parsers.Hashtag do
-  defp to_es(term) when is_binary(term) do
-    %{
-      term: %{
-        hashtag: %{
-          value: String.downcase(term)
-        }
-      }
-    }
-  end
-
-  defp to_es({:quoted, term}), do: to_es(term)
-
-  defp to_es({:filter, ["hashtag", query]}) do
-    %{
-      term: %{
-        hashtag: %{
-          value: String.downcase(query)
-        }
-      }
-    }
-  end
-
-  defp to_es({:filter, _}), do: nil
-
-  def parse(q) do
-    Enum.map(q, &to_es/1)
-    |> Enum.filter(fn x -> x != nil end)
-  end
-end
diff --git a/lib/pleroma/search/elasticsearch/store.ex b/lib/pleroma/search/elasticsearch/store.ex
new file mode 100644 (file)
index 0000000..895b76d
--- /dev/null
@@ -0,0 +1,52 @@
+# Akkoma: A lightweight social networking server
+# Copyright © 2022-2022 Akkoma Authors <https://git.ihatebeinga.live/IHBAGang/akkoma/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Search.Elasticsearch.Store do
+  @moduledoc "`Elasticsearch.Store` backed by `Pleroma.Repo`, plus the activity search helpers."
+  @behaviour Elasticsearch.Store
+
+  alias Pleroma.Repo
+  alias Pleroma.Search.Elasticsearch.Cluster
+  require Logger
+
+  @impl true
+  def stream(schema), do: Repo.stream(schema)
+
+  @impl true
+  def transaction(fun) do
+    {:ok, result} = Repo.transaction(fun, timeout: :infinity)
+    result
+  end
+
+  # Posts `q` to `/<index>/_search`; returns `{:ok, hits}` or `{:error, reason}`.
+  def search(:raw, index, q) do
+    case Elasticsearch.post(Cluster, "/#{index}/_search", q) do
+      {:ok, raw_results} ->
+        {:ok, raw_results |> Map.get("hits", %{}) |> Map.get("hits", [])}
+
+      {:error, e} ->
+        # `e` need not be chardata, so inspect it rather than logging it raw.
+        Logger.error("Elasticsearch search on #{index} failed: #{inspect(e)}")
+        {:error, e}
+    end
+  end
+
+  # The query builder yields `:skip` when there is nothing to search for, so
+  # answer with no results instead of sending `:skip` to the cluster.
+  def search(_, :skip), do: []
+
+  def search(:activities, q) do
+    case search(:raw, "activities", q) do
+      {:ok, results} ->
+        results
+        |> Enum.map(fn result -> result["_id"] end)
+        |> Pleroma.Activity.all_by_ids_with_object()
+        |> Enum.sort(&(&1.inserted_at >= &2.inserted_at))
+
+      {:error, _reason} ->
+        # The failure was already logged by `search(:raw, ...)`.
+        []
+    end
+  end
+end
diff --git a/lib/pleroma/search/elasticsearch/user_paser.ex b/lib/pleroma/search/elasticsearch/user_paser.ex
deleted file mode 100644 (file)
index 4176c61..0000000
+++ /dev/null
@@ -1,57 +0,0 @@
-# Akkoma: A lightweight social networking server
-# Copyright © 2022-2022 Akkoma Authors <https://git.ihatebeinga.live/IHBAGang/akkoma/>
-# SPDX-License-Identifier: AGPL-3.0-only
-
-defmodule Pleroma.Search.Elasticsearch.Parsers.User do
-  defp to_es(term) when is_binary(term) do
-    %{
-      bool: %{
-        minimum_should_match: 1,
-        should: [
-          %{
-            match: %{
-              bio: %{
-                query: term,
-                operator: "AND"
-              }
-            }
-          },
-          %{
-            term: %{
-              nickname: %{
-                value: term
-              }
-            }
-          },
-          %{
-            match: %{
-              display_name: %{
-                query: term,
-                operator: "AND"
-              }
-            }
-          }
-        ]
-      }
-    }
-  end
-
-  defp to_es({:quoted, term}), do: to_es(term)
-
-  defp to_es({:filter, ["user", query]}) do
-    %{
-      term: %{
-        nickname: %{
-          value: query
-        }
-      }
-    }
-  end
-
-  defp to_es({:filter, _}), do: nil
-
-  def parse(q) do
-    Enum.map(q, &to_es/1)
-    |> Enum.filter(fn x -> x != nil end)
-  end
-end
index 9a50ee3ec8b8589c3d1b927087170c24ef0d25d5..dc6c661eaf03486f79d99a1045f80b24e33b75d0 100644 (file)
@@ -1095,7 +1095,6 @@ defmodule Pleroma.User do
     was_superuser_before_update = User.superuser?(user)
 
     with {:ok, user} <- Repo.update(changeset, stale_error_field: :id) do
-      Pleroma.Elasticsearch.maybe_put_into_elasticsearch(user)
       set_cache(user)
     end
     |> maybe_remove_report_notifications(was_superuser_before_update)
index 214647dbffe8326be6d8f286bde86e0750ed3534..d4e5072874a24a8063b22f10c8d5a69a73512564 100644 (file)
@@ -28,7 +28,6 @@ defmodule Pleroma.Web.ActivityPub.Pipeline do
     case Repo.transaction(fn -> do_common_pipeline(object, meta) end, Utils.query_timeout()) do
       {:ok, {:ok, activity, meta}} ->
         side_effects().handle_after_transaction(meta)
-        side_effects().handle_after_transaction(activity)
         {:ok, activity, meta}
 
       {:ok, value} ->
index 517dd0a4fa5881996d043f4147fde237d91b608b..e2371b6939d4240e5f305d196afb25f9d657a929 100644 (file)
@@ -1,5 +1,5 @@
 # Pleroma: A lightweight social networking server
-# Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
+# Copyright © 2017-2022 Pleroma Authors <https://pleroma.social/>
 # SPDX-License-Identifier: AGPL-3.0-only
 
 defmodule Pleroma.Web.ActivityPub.SideEffects do
@@ -272,6 +272,7 @@ defmodule Pleroma.Web.ActivityPub.SideEffects do
   def handle(%{data: %{"type" => "EmojiReact"}} = object, meta) do
     reacted_object = Object.get_by_ap_id(object.data["object"])
     Utils.add_emoji_reaction_to_object(object, reacted_object)
+
     Notification.create_notifications(object)
 
     {:ok, object, meta}
@@ -547,24 +548,6 @@ defmodule Pleroma.Web.ActivityPub.SideEffects do
   end
 
   @impl true
-  def handle_after_transaction(%Pleroma.Activity{data: %{"type" => "Create"}} = activity) do
-    Pleroma.Elasticsearch.put_by_id(:activity, activity.id)
-  end
-
-  def handle_after_transaction(%Pleroma.Activity{
-        data: %{"type" => "Delete", "deleted_activity_id" => id}
-      }) do
-    Pleroma.Elasticsearch.delete_by_id(:activity, id)
-  end
-
-  def handle_after_transaction(%Pleroma.Activity{}) do
-    :ok
-  end
-
-  def handle_after_transaction(%Pleroma.Object{}) do
-    :ok
-  end
-
   def handle_after_transaction(meta) do
     meta
     |> send_notifications()
index a823051552386513702ce5f7023a82b2e08d90a5..eb012f57623a619753638081956e16d3507251c9 100644 (file)
@@ -1,5 +1,5 @@
 # Pleroma: A lightweight social networking server
-# Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
+# Copyright © 2017-2022 Pleroma Authors <https://pleroma.social/>
 # SPDX-License-Identifier: AGPL-3.0-only
 
 defmodule Pleroma.Web.ActivityPub.SideEffects.Handling do
index 92afd5cb61e6533ebfc0359742b8eee21bc1ed14..856fa95b9c6dd23369b2554b2ad4736cd4124a3d 100644 (file)
@@ -396,13 +396,7 @@ defmodule Pleroma.Web.CommonAPI do
 
   def post(user, %{status: _} = data) do
     with {:ok, draft} <- ActivityDraft.create(user, data) do
-      activity = ActivityPub.create(draft.changes, draft.preview?)
-
-      unless draft.preview? do
-        Pleroma.Elasticsearch.maybe_put_into_elasticsearch(activity)
-      end
-
-      activity
+      ActivityPub.create(draft.changes, draft.preview?)
     end
   end
 
index 751d46cdfe4323c8a1a9fd1dc2b7ed095b125269..e4acba2264b1b2086e6720dcdc04ecdba1ac3456 100644 (file)
@@ -1,5 +1,5 @@
 # Pleroma: A lightweight social networking server
-# Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
+# Copyright © 2017-2022 Pleroma Authors <https://pleroma.social/>
 # SPDX-License-Identifier: AGPL-3.0-only
 
 defmodule Pleroma.Web.MastodonAPI.SearchController do
@@ -8,7 +8,9 @@ defmodule Pleroma.Web.MastodonAPI.SearchController do
   alias Pleroma.Repo
   alias Pleroma.User
   alias Pleroma.Web.ControllerHelper
+  alias Pleroma.Web.Endpoint
   alias Pleroma.Web.MastodonAPI.AccountView
+  alias Pleroma.Web.MastodonAPI.StatusView
   alias Pleroma.Web.Plugs.OAuthScopesPlug
   alias Pleroma.Web.Plugs.RateLimiter
 
@@ -42,13 +44,34 @@ defmodule Pleroma.Web.MastodonAPI.SearchController do
   def search2(conn, params), do: do_search(:v2, conn, params)
   def search(conn, params), do: do_search(:v1, conn, params)
 
-  defp do_search(version, %{assigns: %{user: user}} = conn, params) do
-    options =
-      search_options(params, user)
-      |> Keyword.put(:version, version)
-
-    search_provider = Pleroma.Config.get([:search, :provider])
-    json(conn, search_provider.search(conn, params, options))
+  # Runs one search per result category ("statuses", "accounts", "hashtags")
+  # concurrently and replies with a JSON object keyed by category. Categories
+  # not selected by `params[:type]`, and any job that does not finish with an
+  # `{:ok, _}` stream entry, keep their empty-list default.
+  defp do_search(version, %{assigns: %{user: user}} = conn, %{q: query} = params) do
+    query = String.trim(query)
+    options = search_options(params, user)
+    timeout = Keyword.get(Repo.config(), :timeout, 15_000)
+    empty_results = %{"statuses" => [], "accounts" => [], "hashtags" => []}
+
+    # Pair each category with a zero-arity job: a real search when the category
+    # was requested (or no type filter was given), otherwise its empty default.
+    jobs =
+      for {resource, empty} <- empty_results do
+        if params[:type] in [nil, resource] do
+          {resource, fn -> resource_search(version, resource, query, options) end}
+        else
+          {resource, fn -> empty end}
+        end
+      end
+
+    stream =
+      Task.async_stream(jobs, fn {resource, job} -> {resource, with_fallback(job)} end,
+        timeout: timeout,
+        on_timeout: :kill_task
+      )
+
+    # Start from the defaults so a killed or failed task leaves its category empty.
+    payload =
+      Enum.reduce(stream, empty_results, fn
+        {:ok, {resource, found}}, acc -> Map.put(acc, resource, found)
+        _not_ok, acc -> acc
+      end)
+
+    json(conn, payload)
+  end
 
   defp search_options(params, user) do
diff --git a/mix.exs b/mix.exs
index 564db2d75bf88d339cc007acd029c7ebfdecd71f..558e71262fe67fa1a1ccd868399998ae83055055 100644 (file)
--- a/mix.exs
+++ b/mix.exs
@@ -203,6 +203,7 @@ defmodule Pleroma.Mixfile do
       {:nimble_parsec, "~> 1.0", override: true},
       {:phoenix_live_dashboard, "~> 0.6.2"},
       {:ecto_psql_extras, "~> 0.6"},
+      {:elasticsearch, "~> 1.0.0"},
 
       # indirect dependency version override
       {:plug, "~> 1.10.4", override: true},
index e476fd59f5ea10443c4f3ed80a06bff16ef67981..052633496a08f96ca086f09deed869c504bf2b94 100644 (file)
@@ -1,20 +1,22 @@
 {
-  "properties": {
-    "_timestamp": {
-      "type": "date",
-      "index": true
-    },
-    "instance": {
-      "type": "keyword"
-    },
-    "content": {
-      "type": "text"
-    },
-    "hashtags": {
-      "type": "keyword"
-    },
-    "user": {
-      "type": "text"
+  "mappings": {
+    "properties": {
+      "_timestamp": {
+        "type": "date",
+        "index": true
+      },
+      "instance": {
+        "type": "keyword"
+      },
+      "content": {
+        "type": "text"
+      },
+      "hashtags": {
+        "type": "keyword"
+      },
+      "user": {
+        "type": "text"
+      }
     }
   }
 }
index 30fd5651b43a0039a388d111ce7deec455a8e415..e606fa3d11fbc3353ab797b29203f2e41d50b0aa 100644 (file)
@@ -28,7 +28,6 @@ defmodule Pleroma.Web.ActivityPub.PipelineTest do
       SideEffectsMock
       |> expect(:handle, fn o, m -> {:ok, o, m} end)
       |> expect(:handle_after_transaction, fn m -> m end)
-      |> expect(:handle_after_transaction, fn m -> m end)
 
       :ok
     end