Merge remote-tracking branch 'origin/feature/elasticsearch' into develop
[akkoma] / lib / pleroma / elasticsearch / store.ex
index 74c9330387e517603e636e89abe95726e78a9e88..7f6ef37a927ad46dcbbbe4e36761c5b66e1b0e54 100644 (file)
@@ -1,24 +1,33 @@
 defmodule Pleroma.Elasticsearch do
   alias Pleroma.Activity
+  alias Pleroma.User
+  alias Pleroma.Object
   alias Pleroma.Elasticsearch.DocumentMappings
   alias Pleroma.Config
+  require Logger
 
   defp url do
     Config.get([:elasticsearch, :url])
   end
 
-  def put_by_id(id) do
+  defp enabled? do
+    Config.get([:search, :provider]) == Pleroma.Search.Elasticsearch
+  end
+
+  def put_by_id(:activity, id) do
     id
     |> Activity.get_by_id_with_object()
     |> maybe_put_into_elasticsearch()
   end
 
-  def maybe_put_into_elasticsearch({:ok, activity}) do
-    maybe_put_into_elasticsearch(activity)
+  def maybe_put_into_elasticsearch({:ok, item}) do
+    maybe_put_into_elasticsearch(item)
   end
 
-  def maybe_put_into_elasticsearch(%{data: %{"type" => "Create"}, object: %{data: %{"type" => "Note"}}} = activity) do
-    if Config.get([:search, :provider]) == Pleroma.Search.Elasticsearch do
+  def maybe_put_into_elasticsearch(
+        %{data: %{"type" => "Create"}, object: %{data: %{"type" => "Note"}}} = activity
+      ) do
+    if enabled?() do
       actor = Pleroma.Activity.user_actor(activity)
 
       activity
@@ -27,27 +36,59 @@ defmodule Pleroma.Elasticsearch do
     end
   end
 
+  def maybe_put_into_elasticsearch(%User{actor_type: "Person"} = user) do
+    if enabled?() do
+      put(user)
+    end
+  end
+
   def maybe_put_into_elasticsearch(_) do
     {:ok, :skipped}
   end
 
+  def maybe_bulk_post(data, type) do
+    if enabled?() do
+      bulk_post(data, type)
+    end
+  end
+
   def put(%Activity{} = activity) do
-    Elastix.Document.index(
-      url(),
-      "activities",
-      "activity",
-      DocumentMappings.Activity.id(activity),
-      DocumentMappings.Activity.encode(activity)
-    )
+    {:ok, _} =
+      Elastix.Document.index(
+        url(),
+        "activities",
+        "activity",
+        DocumentMappings.Activity.id(activity),
+        DocumentMappings.Activity.encode(activity)
+      )
+
+    activity
+    |> Map.get(:object)
+    |> Object.hashtags()
+    |> Enum.map(fn x -> %{id: x, name: x, timestamp: DateTime.to_iso8601(DateTime.utc_now())} end)
+    |> bulk_post(:hashtags)
+  end
+
+  def put(%User{} = user) do
+    {:ok, _} =
+      Elastix.Document.index(
+        url(),
+        "users",
+        "user",
+        DocumentMappings.User.id(user),
+        DocumentMappings.User.encode(user)
+      )
   end
 
   def bulk_post(data, :activities) do
     d =
       data
       |> Enum.filter(fn x ->
-        t = x.object
-        |> Map.get(:data, %{})
-        |> Map.get("type", "")
+        t =
+          x.object
+          |> Map.get(:data, %{})
+          |> Map.get("type", "")
+
         t == "Note"
       end)
       |> Enum.map(fn d ->
@@ -58,17 +99,20 @@ defmodule Pleroma.Elasticsearch do
       end)
       |> List.flatten()
 
-    Elastix.Bulk.post(
-      url(),
-      d,
-      index: "activities",
-      type: "activity"
-    )
+    {:ok, %{body: %{"errors" => false}}} =
+      Elastix.Bulk.post(
+        url(),
+        d,
+        index: "activities",
+        type: "activity"
+      )
   end
 
+
   def bulk_post(data, :users) do
     d =
       data
+      |> Enum.filter(fn x -> x.actor_type == "Person" end)
       |> Enum.map(fn d ->
         [
           %{index: %{_id: DocumentMappings.User.id(d)}},
@@ -85,7 +129,7 @@ defmodule Pleroma.Elasticsearch do
     )
   end
 
-  def bulk_post(data, :hashtags) do
+  def bulk_post(data, :hashtags) when is_list(data) do
     d =
       data
       |> Enum.map(fn d ->
@@ -104,12 +148,59 @@ defmodule Pleroma.Elasticsearch do
     )
   end
 
-  def search_activities(q) do
-    Elastix.Search.search(
-      url(),
-      "activities",
-      ["activity"],
-      q
-    )
+  def bulk_post(_, :hashtags), do: {:ok, nil}
+
+  def search(_, _, _, :skip), do: []
+
+  def search(:raw, index, type, q) do
+    with {:ok, raw_results} <- Elastix.Search.search(url(), index, [type], q) do
+      results =
+        raw_results
+        |> Map.get(:body, %{})
+        |> Map.get("hits", %{})
+        |> Map.get("hits", [])
+
+      {:ok, results}
+    else
+      {:error, e} ->
+        Logger.error(e)
+        {:error, e}
+    end
+  end
+
+  def search(:activities, q) do
+    with {:ok, results} <- search(:raw, "activities", "activity", q) do
+      results
+      |> Enum.map(fn result -> result["_id"] end)
+      |> Pleroma.Activity.all_by_ids_with_object()
+      |> Enum.sort(&(&1.inserted_at >= &2.inserted_at))
+    else
+      e ->
+        Logger.error(e)
+        []
+    end
+  end
+
+  def search(:users, q) do
+    with {:ok, results} <- search(:raw, "users", "user", q) do
+      results
+      |> Enum.map(fn result -> result["_id"] end)
+      |> Pleroma.User.get_all_by_ids()
+    else
+      e ->
+        Logger.error(e)
+        []
+    end
+  end
+
+  def search(:hashtags, q) do
+    with {:ok, results} <- search(:raw, "hashtags", "hashtag", q) do
+      results
+      |> Enum.map(fn result -> result["_source"]["hashtag"] end)
+    else
+      e ->
+        Logger.error(e)
+        []
+    end
   end
 end