add first-in-first-out queue for image prompts, and add pagination

This commit is contained in:
Silas 2022-08-31 15:45:06 -04:00
parent 559f6f959f
commit 7990246675
Signed by: silentsilas
GPG Key ID: 4199EFB7DAA34349
27 changed files with 236 additions and 143 deletions

View File

@ -12,9 +12,10 @@ config :diffuser,
# Configures the endpoint
config :diffuser, DiffuserWeb.Endpoint,
url: [host: "localhost"],
url: [host: "https://ai.silentsilas.com"],
render_errors: [view: DiffuserWeb.ErrorView, accepts: ~w(html json), layout: false],
pubsub_server: Diffuser.PubSub,
check_origin: false,
live_view: [signing_salt: "mxn2AV/s"]
# Configures the mailer
@ -51,7 +52,7 @@ config :phoenix, :json_library, Jason
config :waffle,
storage: Waffle.Storage.Local,
# or {:system, "ASSET_HOST"}
asset_host: "http://localhost:4000"
asset_host: "https://ai.silentsilas.com"
# Import environment specific config. This must remain at the bottom
# of this file so it overrides the configuration defined above.

View File

@ -19,7 +19,7 @@ config :diffuser, Diffuser.Repo,
config :diffuser, DiffuserWeb.Endpoint,
# Binding to loopback ipv4 address prevents access from other machines.
# Change to `ip: {0, 0, 0, 0}` to allow access from other machines.
http: [ip: {127, 0, 0, 1}, port: 4000],
http: [ip: {0, 0, 0, 0}, port: 4000],
check_origin: false,
code_reloader: true,
debug_errors: true,

View File

@ -18,7 +18,7 @@ defmodule Diffuser.Application do
DiffuserWeb.Endpoint,
# Start a worker by calling: Diffuser.Worker.start_link(arg)
# {Diffuser.Worker, arg}
{Task.Supervisor, name: Diffuser.Generator.PromptRequestSupervisor}
Diffuser.Generator.PromptRequestQueue
]
# See https://hexdocs.pm/elixir/Supervisor.html

View File

@ -36,7 +36,7 @@ defmodule Diffuser.Generator do
"""
def get_prompt_request!(id),
do: Repo.one!(from pr in PromptRequest, where: pr.id == ^id, preload: [:images])
do: Repo.one!(from pr in PromptRequest, where: pr.id == ^id) |> Repo.preload(:images)
@doc """
Creates a prompt_request.
@ -122,4 +122,10 @@ defmodule Diffuser.Generator do
})
end)
end
def paginate_prompt_requests(params) do
PromptRequest
|> preload(:images)
|> Repo.paginate(params)
end
end

View File

@ -8,6 +8,8 @@ defmodule Diffuser.Generator.PromptRequest do
schema "prompt_requests" do
field :prompt, :string
field :status, :string, default: "queued"
field :steps, :integer
field :guidance_scale, :float
has_many :images, PromptRequestResult, on_delete: :delete_all
@ -17,7 +19,7 @@ defmodule Diffuser.Generator.PromptRequest do
@doc false
def changeset(prompt_request, attrs) do
prompt_request
|> cast(attrs, [:prompt, :status])
|> cast(attrs, [:prompt, :status, :steps, :guidance_scale])
|> validate_required([:prompt])
end
end

View File

@ -1,98 +0,0 @@
defmodule Diffuser.Generator.PromptRequestGenserver do
use GenServer
alias Diffuser.Generator
alias Diffuser.Generator.PromptRequest
alias DiffuserWeb.Endpoint
alias Diffuser.PythonHelper, as: Helper
@path 'lib/diffuser/python'
def new(%{prompt_request: %PromptRequest{} = prompt_request}) do
GenServer.start_link(
__MODULE__,
%{prompt_request: prompt_request},
name: name_for(prompt_request)
)
end
def name_for(%PromptRequest{id: prompt_request_id}),
do: {:global, "prompt_request:#{prompt_request_id}"}
def init(%{prompt_request: %PromptRequest{} = prompt_request}) do
send(self(), :start_prompt)
{:ok,
%{
prompt_request: prompt_request
}}
end
def handle_info(:start_prompt, %{prompt_request: prompt_request} = state) do
with {:ok, %{prompt: prompt} = active_prompt} <-
update_and_broadcast_progress(prompt_request, "in_progress"),
:ok <- call_python(:test_script, :test_func, prompt),
%PromptRequest{} = prompt_request_with_results <- write_and_save_images(active_prompt),
{:ok, completed_prompt} <-
update_and_broadcast_progress(prompt_request_with_results, "finished") do
IO.inspect(completed_prompt)
{:noreply, state}
else
nil ->
raise("prompt not found")
{:error, message} ->
raise(message)
end
end
defp update_and_broadcast_progress(%PromptRequest{id: id} = prompt_request, new_status) do
{:ok, new_prompt} = Generator.update_prompt_request(prompt_request, %{status: new_status})
:ok = Endpoint.broadcast("request:#{id}", "request", %{prompt_request: new_prompt})
{:ok, new_prompt}
end
defp call_python(_module, _func, prompt) do
Port.open(
{:spawn, "python #{@path}/stable_diffusion.py --prompt #{prompt}"},
[:binary, {:packet, 4}]
)
# TODO: We will want to flush, and get the image data from the script
# then write it to PromptResult
# pid = Helper.py_instance(Path.absname(@path))
# :python.call(pid, module, func, args)
# pid
# |> :python.stop()
:ok
end
defp write_and_save_images(%PromptRequest{id: id, prompt: prompt}) do
height = :rand.uniform(512)
width = :rand.uniform(512)
IO.inspect(height)
{:ok, resp} =
:httpc.request(
:get,
{'http://placekitten.com/#{height}/#{width}', []},
[],
body_format: :binary
)
{{_, 200, 'OK'}, _headers, body} = resp
Generator.create_prompt_request_results(id, [
%{
file_name: "#{prompt}.jpg",
filename: "#{prompt}.jpg",
binary: body
}
])
Generator.get_prompt_request!(id)
end
end

View File

@ -0,0 +1,50 @@
defmodule Diffuser.Generator.PromptRequestQueue do
use GenServer
alias Diffuser.Generator.PromptRequestWorker
### GenServer API
@doc """
GenServer.init/1 callback
"""
def init(state) do
{:ok, state}
end
@doc """
GenServer.handle_call/3 callback
"""
def handle_call(:dequeue, _from, [value | state]) do
{:reply, value, state}
end
def handle_call(:dequeue, _from, []), do: {:reply, nil, []}
def handle_call(:queue, _from, state), do: {:reply, state, state}
@doc """
GenServer.handle_cast/2 callback
"""
def handle_cast({:enqueue, value}, state) do
{:noreply, state, {:continue, {:enqueue, value}}}
end
def handle_continue({:enqueue, value}, state) when length(state) > 0 do
{:continue, {:enqueue, value}, state}
end
def handle_continue({:enqueue, value}, state) do
PromptRequestWorker.start(value)
{:noreply, state}
end
### Client API / Helper functions
def start_link(state \\ []) do
GenServer.start_link(__MODULE__, state, name: __MODULE__)
end
def queue, do: GenServer.call(__MODULE__, :queue)
def enqueue(value), do: GenServer.cast(__MODULE__, {:enqueue, value})
def dequeue, do: GenServer.call(__MODULE__, :dequeue)
end

View File

@ -1,27 +0,0 @@
defmodule Diffuser.Generator.PromptRequestSupervisor do
use DynamicSupervisor
alias Diffuser.Generator.PromptRequest
def start_link(init_arg) do
DynamicSupervisor.start_link(__MODULE__, init_arg, name: __MODULE__)
end
@impl true
def init(_init_arg) do
DynamicSupervisor.init(strategy: :one_for_one)
end
def start_prompt_request(%PromptRequest{} = prompt_request) do
Task.Supervisor.start_child(
__MODULE__,
Diffuser.Generator.PromptRequestGenserver,
:new,
[
%{
prompt_request: prompt_request
}
],
restart: :transient
)
end
end

View File

@ -0,0 +1,85 @@
defmodule Diffuser.Generator.PromptRequestWorker do
alias Diffuser.Generator
alias Diffuser.Generator.PromptRequest
alias DiffuserWeb.Endpoint
alias Diffuser.Repo
@path [:code.priv_dir(:diffuser), "python"] |> Path.join()
@steps 10
@guidance_scale 7.5
def start(%PromptRequest{} = prompt_request) do
with {:ok, active_prompt} <-
update_and_broadcast_progress(prompt_request, "in_progress"),
{:ok, _file_location} <- call_python(:test_script, :test_func, active_prompt),
%PromptRequest{} = prompt_request_with_results <-
write_and_save_images(active_prompt),
{:ok, completed_prompt} <-
update_and_broadcast_progress(prompt_request_with_results, "finished") do
{:ok, completed_prompt |> Repo.preload(:images)}
else
nil ->
raise("prompt not found")
{:error, message} ->
raise(message)
end
end
defp update_and_broadcast_progress(%PromptRequest{id: id} = prompt_request, new_status) do
{:ok, new_prompt} =
Generator.update_prompt_request(prompt_request, %{
status: new_status,
steps: @steps,
guidance_scale: @guidance_scale
})
:ok = Endpoint.broadcast("request:#{id}", "request", %{prompt_request: new_prompt})
{:ok, new_prompt}
end
defp call_python(_module, _func, %{id: prompt_id, prompt: prompt}) do
port =
Port.open(
{:spawn,
~s(python #{@path}/stable_diffusion.py --prompt "#{prompt}" --output "#{@path}/#{prompt_id}.png" --num-inference-steps #{@steps})},
[:binary]
)
python_loop(port, prompt_id)
end
defp python_loop(port, prompt_id) do
receive do
{^port, {:data, ":finished" <> msg}} ->
{:ok, msg}
{^port, {:data, ":step" <> step}} ->
Endpoint.broadcast("request:#{prompt_id}", "progress", step)
python_loop(port, prompt_id)
{^port, result} ->
IO.inspect(result, label: "RESULT")
python_loop(port, prompt_id)
end
end
defp write_and_save_images(%PromptRequest{id: id}) do
file_path = "#{@path}/#{id}.png"
with {:ok, body} <- File.read(file_path),
{:ok, _result} <-
Generator.create_prompt_request_result(
id,
%{
file_name: "#{id}.png",
filename: "#{id}.png",
binary: body
}
),
:ok <- File.rm(file_path) do
Generator.get_prompt_request!(id)
end
end
end

View File

@ -2,4 +2,6 @@ defmodule Diffuser.Repo do
use Ecto.Repo,
otp_app: :diffuser,
adapter: Ecto.Adapters.Postgres
use Scrivener, page_size: 1
end

View File

@ -42,9 +42,9 @@ defmodule DiffuserWeb.PromptRequestLive.FormComponent do
end
defp save_prompt_request(socket, :new, prompt_request_params) do
with {:ok, prompt_request} <- Generator.create_prompt_request(prompt_request_params),
{:ok, _pid} <-
Diffuser.Generator.PromptRequestSupervisor.start_prompt_request(prompt_request) do
with {:ok, prompt_request} <- Generator.create_prompt_request(prompt_request_params) do
Diffuser.Generator.PromptRequestQueue.enqueue(prompt_request)
{:noreply,
socket
|> put_flash(:info, "Prompt request created successfully")

View File

@ -6,12 +6,14 @@ defmodule DiffuserWeb.PromptRequestLive.Index do
@impl true
def mount(_params, _session, socket) do
{:ok, assign(socket, :prompt_requests, list_prompt_requests())}
{:ok, socket}
end
@impl true
def handle_params(params, _url, socket) do
{:noreply, apply_action(socket, socket.assigns.live_action, params)}
page = list_prompt_requests(params)
socket = socket |> apply_action(socket.assigns.live_action, params) |> assign(:page, page)
{:noreply, socket}
end
defp apply_action(socket, :edit, %{"id" => id}) do
@ -37,10 +39,10 @@ defmodule DiffuserWeb.PromptRequestLive.Index do
prompt_request = Generator.get_prompt_request!(id)
{:ok, _} = Generator.delete_prompt_request(prompt_request)
{:noreply, assign(socket, :prompt_requests, list_prompt_requests())}
{:noreply, assign(socket, :page, list_prompt_requests(%{"page" => "2"}))}
end
defp list_prompt_requests do
Generator.list_prompt_requests()
defp list_prompt_requests(params) do
Generator.paginate_prompt_requests(params)
end
end

View File

@ -15,14 +15,20 @@
<table>
<thead>
<tr>
<th>Image</th>
<th>Prompt</th>
<th></th>
</tr>
</thead>
<tbody id="prompt_requests">
<%= for prompt_request <- @prompt_requests do %>
<%= for prompt_request <- @page.entries do %>
<tr id={"prompt_request-#{prompt_request.id}"}>
<td>
<%= for result <- prompt_request.images do %>
<img src={"#{Diffuser.Uploaders.Image.url({result.image, result})}"} />
<% end %>
</td>
<td><%= prompt_request.prompt %></td>
<td>
@ -35,4 +41,18 @@
</tbody>
</table>
<div class="pagination">
<%= if @page.page_number > 1 do %>
<%= live_patch "<< Prev Page",
to: Routes.prompt_request_index_path(@socket, :index, page: @page.page_number - 1),
class: "pagination-link" %>
<% end %>
<%= if @page.page_number < @page.total_pages do %>
<%= live_patch "Next Page >>",
to: Routes.prompt_request_index_path(@socket, :index, page: @page.page_number + 1),
class: "pagination-link" %>
<% end %>
</div>
<span><%= live_patch "New Prompt request", to: Routes.prompt_request_index_path(@socket, :new) %></span>

View File

@ -7,12 +7,16 @@ defmodule DiffuserWeb.PromptRequestLive.Show do
alias Diffuser.Repo
alias Phoenix.Socket.Broadcast
alias DiffuserWeb.Endpoint
alias Ecto.Association.NotLoaded
@impl true
def mount(%{"id" => id}, _session, socket) do
Endpoint.subscribe("request:#{id}")
{:ok, socket |> assign(:prompt_request, Generator.get_prompt_request!(id))}
{:ok,
socket
|> assign(:prompt_request, Generator.get_prompt_request!(id))
|> assign(:prompt_request_progress, 0)}
end
@impl true
@ -38,6 +42,21 @@ defmodule DiffuserWeb.PromptRequestLive.Show do
do:
{:noreply,
assign(socket, %{
prompt_request: prompt_request
prompt_request: prompt_request |> Repo.preload(:images)
})}
@impl true
def handle_info(
%Broadcast{
topic: _,
event: "progress",
payload: progress
},
socket
),
do:
{:noreply,
assign(socket, %{
prompt_request_progress: progress
})}
end

View File

@ -23,6 +23,14 @@
<strong>Status:</strong>
<%= @prompt_request.status %>
</li>
<%= if @prompt_request_progress > 0 do %>
<li><strong>Progress:</strong>
<%= @prompt_request_progress %> / <%= @prompt_request.steps %> steps
</li>
<% end %>
<li><strong>Guidance Scale:</strong>
<%= @prompt_request.guidance_scale %>
</li>
<li>
<strong>Images:</strong>
<%= if @prompt_request.images do %>

View File

@ -52,7 +52,8 @@ defmodule Diffuser.MixProject do
# non phoenix default dependencies
{:erlport, "~> 0.10.1"},
{:waffle, "~> 1.1"},
{:waffle_ecto, "~> 0.0.11"}
{:waffle_ecto, "~> 0.0.11"},
{:scrivener_ecto, "~> 2.7"}
]
end

View File

@ -35,6 +35,8 @@
"plug_crypto": {:hex, :plug_crypto, "1.2.3", "8f77d13aeb32bfd9e654cb68f0af517b371fb34c56c9f2b58fe3df1235c1251a", [:mix], [], "hexpm", "b5672099c6ad5c202c45f5a403f21a3411247f164e4a8fab056e5cd8a290f4a2"},
"postgrex": {:hex, :postgrex, "0.16.4", "26d998467b4a22252285e728a29d341e08403d084e44674784975bb1cd00d2cb", [:mix], [{:connection, "~> 1.1", [hex: :connection, repo: "hexpm", optional: false]}, {:db_connection, "~> 2.1", [hex: :db_connection, repo: "hexpm", optional: false]}, {:decimal, "~> 1.5 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:table, "~> 0.1.0", [hex: :table, repo: "hexpm", optional: true]}], "hexpm", "3234d1a70cb7b1e0c95d2e242785ec2a7a94a092bbcef4472320b950cfd64c5f"},
"ranch": {:hex, :ranch, "1.8.0", "8c7a100a139fd57f17327b6413e4167ac559fbc04ca7448e9be9057311597a1d", [:make, :rebar3], [], "hexpm", "49fbcfd3682fab1f5d109351b61257676da1a2fdbe295904176d5e521a2ddfe5"},
"scrivener": {:hex, :scrivener, "2.7.2", "1d913c965ec352650a7f864ad7fd8d80462f76a32f33d57d1e48bc5e9d40aba2", [:mix], [], "hexpm", "7866a0ec4d40274efbee1db8bead13a995ea4926ecd8203345af8f90d2b620d9"},
"scrivener_ecto": {:hex, :scrivener_ecto, "2.7.0", "cf64b8cb8a96cd131cdbcecf64e7fd395e21aaa1cb0236c42a7c2e34b0dca580", [:mix], [{:ecto, "~> 3.3", [hex: :ecto, repo: "hexpm", optional: false]}, {:scrivener, "~> 2.4", [hex: :scrivener, repo: "hexpm", optional: false]}], "hexpm", "e809f171687806b0031129034352f5ae44849720c48dd839200adeaf0ac3e260"},
"ssl_verify_fun": {:hex, :ssl_verify_fun, "1.1.6", "cf344f5692c82d2cd7554f5ec8fd961548d4fd09e7d22f5b62482e5aeaebd4b0", [:make, :mix, :rebar3], [], "hexpm", "bdb0d2471f453c88ff3908e7686f86f9be327d065cc1ec16fa4540197ea04680"},
"swoosh": {:hex, :swoosh, "1.7.4", "f967d9b2659e81bab241b96267aae1001d35c2beea2df9c03dcf47b007bf566f", [:mix], [{:cowboy, "~> 1.1 or ~> 2.4", [hex: :cowboy, repo: "hexpm", optional: true]}, {:ex_aws, "~> 2.1", [hex: :ex_aws, repo: "hexpm", optional: true]}, {:finch, "~> 0.6", [hex: :finch, repo: "hexpm", optional: true]}, {:gen_smtp, "~> 0.13 or ~> 1.0", [hex: :gen_smtp, repo: "hexpm", optional: true]}, {:hackney, "~> 1.9", [hex: :hackney, repo: "hexpm", optional: true]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}, {:mail, "~> 0.2", [hex: :mail, repo: "hexpm", optional: true]}, {:mime, "~> 1.1 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:plug_cowboy, ">= 1.0.0", [hex: :plug_cowboy, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4.2 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "1553d994b4cf069162965e63de1e1c53d8236e127118d21e56ce2abeaa3f25b4"},
"telemetry": {:hex, :telemetry, "1.1.0", "a589817034a27eab11144ad24d5c0f9fab1f58173274b1e9bae7074af9cbee51", [:rebar3], [], "hexpm", "b727b2a1f75614774cff2d7565b64d0dfa5bd52ba517f16543e6fc7efcc0df48"},

View File

Before

Width:  |  Height:  |  Size: 484 KiB

After

Width:  |  Height:  |  Size: 484 KiB

View File

@ -2,6 +2,7 @@ import argparse
import os
import inspect
import numpy as np
import sys
# openvino
from openvino.runtime import Core
# tokenizer
@ -121,6 +122,9 @@ class StableDiffusion:
else:
latents = self.scheduler.step(noise_pred, t, latents, **extra_step_kwargs)["prev_sample"]
sys.stdout.write(":step" + str(i + 1))
sys.stdout.flush()
image = result(self.vae.infer_new_request({
"latents": np.expand_dims(latents, 0)
}))
@ -128,6 +132,7 @@ class StableDiffusion:
# convert tensor to opencv's image format
image = (image / 2 + 0.5).clip(0, 1)
image = (image[0].transpose(1, 2, 0)[:, :, ::-1] * 255).astype(np.uint8)
return image
def main(args):
@ -150,6 +155,11 @@ def main(args):
)
cv2.imwrite(args.output, image)
sys.stdout.write(":finished" + args.output)
sys.stdout.flush()
return image
if __name__ == "__main__":
parser = argparse.ArgumentParser()

View File

@ -0,0 +1,10 @@
defmodule Diffuser.Repo.Migrations.AddStepsAndGuidanceScaleToPromptRequest do
use Ecto.Migration
def change do
alter table(:prompt_requests) do
add :steps, :integer
add :guidance_scale, :float
end
end
end