Instalando Oban

A instação do Oban é bem simples, basta seguir o guia. Primeiro, vamos adicionar a dependência.

mix.exs
defmodule CoffeeShop.MixProject do
  use Mix.Project

  # ...

  defp deps do
    [
      {:tesla, "~> 1.4"},
      {:bypass, "~> 2.1"},
      {:ecto_sql, "~> 3.0"},
      {:postgrex, ">= 0.0.0"},
      {:oban, "~> 2.17"}
    ]
  end
end
mix deps.get
> mix deps.get                                                                          
Resolving Hex dependencies...
Resolution completed in 0.06s
New:
  jason 1.4.1
  oban 2.17.8
Unchanged:
  bypass 2.1.0
  cowboy 2.12.0
  cowboy_telemetry 0.4.0
  cowlib 2.13.0
  db_connection 2.6.0
  decimal 2.1.1
  ecto 3.11.2
  ecto_sql 3.11.1
  mime 2.0.5
  plug 1.15.3
  plug_cowboy 2.7.1
  plug_crypto 2.0.0
  postgrex 0.17.5
  ranch 1.8.0
  telemetry 1.2.1
  tesla 1.8.0
* Getting oban (Hex package)
* Getting jason (Hex package)
You have added/upgraded packages you could sponsor, run `mix hex.sponsor` to learn more

Oban utiliza a estratégia de migrações utilizando Ecto. Para isso, precisamos criar um nova migração utilizando Ecto.

mix ecto.gen.migration add_oban_jobs_table
* creating priv/repo/migrations
* creating priv/repo/migrations/20240415200026_add_oban_jobs_table.exs

Ele criará uma nova pasta chamada priv/repo/migrations que será onde nossas migrações irão viver. Também criará o arquivo de nosso migração. Vamos abrir ela. No meu caso é o arquivo priv/repo/migrations/20240415200026_add_oban_jobs_table.exs, mas para você terá um prefixo diferente, dependendo da data de criação.

priv/repo/migrations/20240415200026_add_oban_jobs_table.exs
defmodule CoffeeShop.Repo.Migrations.AddObanJobsTable do
  use Ecto.Migration

  def change do

  end
end

Primeiro vamos mudar de change/0 para up/0, fazemos isso porque queremos que tenha o up/0 e o down/0. up/0 para rodar a migração e down/0 para fazer o rollback caso necessário. Na função up/0 iremos adicionar a configuração por meio de uma função e no down/0, fazermos o downgrade para a versão 1 zerando as dependência.

priv/repo/migrations/20240415200026_add_oban_jobs_table.exs
defmodule CoffeeShop.Repo.Migrations.AddObanJobsTable do
  use Ecto.Migration

  def up do
    Oban.Migration.up(version: 12)
  end

  def down do
    Oban.Migration.down(version: 1)
  end
end

Precisamos agora rodar as migrações pendentes usando o comando do Ecto

mix ecto.migrate
> mix ecto.migrate                          

17:07:47.213 [info] == Running 20240415200026 CoffeeShop.Repo.Migrations.AddObanJobsTable.up/0 forward

17:07:47.243 [info] execute "DO $$\nBEGIN\nIF NOT EXISTS (SELECT 1 FROM pg_type\n               WHERE typname = 'oban_job_state'\n                 AND typnamespace = 'public'::regnamespace::oid) THEN\n    CREATE TYPE \"public\".oban_job_state AS ENUM (\n      'available',\n      'scheduled',\n      'executing',\n      'retryable',\n      'completed',\n      'discarded'\n    );\n  END IF;\nEND$$;\n"

17:07:47.249 [info] create table if not exists public.oban_jobs

17:07:47.259 [info] create index if not exists public.oban_jobs_queue_index

17:07:47.263 [info] create index if not exists public.oban_jobs_state_index

17:07:47.267 [info] create index if not exists public.oban_jobs_scheduled_at_index

17:07:47.272 [info] execute "CREATE OR REPLACE FUNCTION \"public\".oban_jobs_notify() RETURNS trigger AS $$\nDECLARE\n  channel text;\n  notice json;\nBEGIN\n  IF (TG_OP = 'INSERT') THEN\n    channel = 'public.oban_insert';\n    notice = json_build_object('queue', NEW.queue, 'state', NEW.state);\n\n    -- No point triggering for a job that isn't scheduled to run now\n    IF NEW.scheduled_at IS NOT NULL AND NEW.scheduled_at > now() AT TIME ZONE 'utc' THEN\n      RETURN null;\n    END IF;\n  ELSE\n    channel = 'public.oban_update';\n    notice = json_build_object('queue', NEW.queue, 'new_state', NEW.state, 'old_state', OLD.state);\n  END IF;\n\n  PERFORM pg_notify(channel, notice::text);\n\n  RETURN NULL;\nEND;\n$$ LANGUAGE plpgsql;\n"

17:07:47.274 [info] execute "DROP TRIGGER IF EXISTS oban_notify ON \"public\".oban_jobs"

17:07:47.276 [info] trigger "oban_notify" for relation "public.oban_jobs" does not exist, skipping

17:07:47.276 [info] execute "CREATE TRIGGER oban_notify\nAFTER INSERT OR UPDATE OF state ON \"public\".oban_jobs\nFOR EACH ROW EXECUTE PROCEDURE \"public\".oban_jobs_notify();\n"

17:07:47.279 [info] drop index if exists public.oban_jobs_scheduled_at_index

17:07:47.281 [info] create index public.oban_jobs_scheduled_at_index

17:07:47.285 [info] create check constraint worker_length on table public.oban_jobs

17:07:47.287 [info] create check constraint queue_length on table public.oban_jobs

17:07:47.289 [info] execute "CREATE OR REPLACE FUNCTION \"public\".oban_wrap_id(value bigint) RETURNS int AS $$\nBEGIN\n  RETURN (CASE WHEN value > 2147483647 THEN mod(value, 2147483647) ELSE value END)::int;\nEND;\n$$ LANGUAGE plpgsql IMMUTABLE;\n"

17:07:47.291 [info] alter table public.oban_jobs

17:07:47.293 [info] execute "DROP FUNCTION IF EXISTS \"public\".oban_wrap_id(value bigint)"

17:07:47.294 [info] drop index if exists public.oban_jobs_scheduled_at_index

17:07:47.296 [info] drop index if exists public.oban_jobs_queue_index

17:07:47.297 [info] drop index if exists public.oban_jobs_state_index

17:07:47.299 [info] create index if not exists public.oban_jobs_queue_state_scheduled_at_id_index

17:07:47.303 [info] create index if not exists public.oban_jobs_attempted_at_id_index

17:07:47.307 [info] alter table public.oban_jobs

17:07:47.309 [info] alter table public.oban_jobs

17:07:47.312 [info] drop index if exists public.oban_jobs_queue_state_scheduled_at_id_index

17:07:47.314 [info] create index if not exists public.oban_jobs_state_queue_priority_scheduled_at_id_index

17:07:47.318 [info] execute "CREATE OR REPLACE FUNCTION \"public\".oban_jobs_notify() RETURNS trigger AS $$\nDECLARE\n  channel text;\n  notice json;\nBEGIN\n  IF NEW.state = 'available' THEN\n    channel = 'public.oban_insert';\n    notice = json_build_object('queue', NEW.queue);\n\n    PERFORM pg_notify(channel, notice::text);\n  END IF;\n\n  RETURN NULL;\nEND;\n$$ LANGUAGE plpgsql;\n"

17:07:47.320 [info] execute "DROP TRIGGER IF EXISTS oban_notify ON \"public\".oban_jobs"

17:07:47.322 [info] execute "CREATE TRIGGER oban_notify\nAFTER INSERT ON \"public\".oban_jobs\nFOR EACH ROW EXECUTE PROCEDURE \"public\".oban_jobs_notify();\n"

17:07:47.323 [info] alter table public.oban_jobs

17:07:47.331 [info] execute "DO $$\nDECLARE\n  version int;\n  already bool;\nBEGIN\n  SELECT current_setting('server_version_num')::int INTO version;\n  SELECT '{cancelled}' <@ enum_range(NULL::\"public\".oban_job_state)::text[] INTO already;\n\n  IF already THEN\n    RETURN;\n  ELSIF version >= 120000 THEN\n    ALTER TYPE \"public\".oban_job_state ADD VALUE IF NOT EXISTS 'cancelled';\n  ELSE\n    ALTER TYPE \"public\".oban_job_state RENAME TO old_oban_job_state;\n\n    CREATE TYPE \"public\".oban_job_state AS ENUM (\n      'available',\n      'scheduled',\n      'executing',\n      'retryable',\n      'completed',\n      'discarded',\n      'cancelled'\n    );\n\n    ALTER TABLE \"public\".oban_jobs RENAME column state TO _state;\n    ALTER TABLE \"public\".oban_jobs ADD state \"public\".oban_job_state NOT NULL default 'available';\n\n    UPDATE \"public\".oban_jobs SET state = _state::text::\"public\".oban_job_state;\n\n    ALTER TABLE \"public\".oban_jobs DROP column _state;\n    DROP TYPE \"public\".old_oban_job_state;\n  END IF;\nEND$$;\n"

17:07:47.333 [info] create index if not exists public.oban_jobs_state_queue_priority_scheduled_at_id_index

17:07:47.335 [info] relation "oban_jobs_state_queue_priority_scheduled_at_id_index" already exists, skipping

17:07:47.335 [info] alter table public.oban_jobs

17:07:47.340 [info] create check constraint priority_range on table public.oban_jobs

17:07:47.341 [info] create check constraint positive_max_attempts on table public.oban_jobs

17:07:47.343 [info] create check constraint attempt_range on table public.oban_jobs

17:07:47.345 [info] drop index if exists public.oban_jobs_args_vector

17:07:47.346 [info] index "oban_jobs_args_vector" does not exist, skipping

17:07:47.346 [info] drop index if exists public.oban_jobs_worker_gist

17:07:47.348 [info] index "oban_jobs_worker_gist" does not exist, skipping

17:07:47.348 [info] drop index if exists public.oban_jobs_attempted_at_id_index

17:07:47.349 [info] create index if not exists public.oban_jobs_args_index

17:07:47.351 [info] create index if not exists public.oban_jobs_meta_index

17:07:47.353 [info] create table if not exists public.oban_peers

17:07:47.360 [info] execute "ALTER TABLE \"public\".oban_peers SET UNLOGGED"

17:07:47.371 [info] drop constraint priority_range from table public.oban_jobs

17:07:47.373 [info] create check constraint non_negative_priority on table public.oban_jobs

17:07:47.374 [info] execute "DROP TRIGGER IF EXISTS oban_notify ON \"public\".oban_jobs"

17:07:47.376 [info] execute "DROP FUNCTION IF EXISTS \"public\".oban_jobs_notify()"

17:07:47.377 [info] execute "COMMENT ON TABLE \"public\".oban_jobs IS '12'"

17:07:47.381 [info] == Migrated 20240415200026 in 0.1s

Com isso criamos todas as tabelas necessárias para rodar o Oban. Agora iremos configurar sua utilização no projeto. Abra o arquivo config/config.exs

config/config.exs
import Config

config :coffee_shop,
  ecto_repos: [CoffeeShop.Repo]

config :coffee_shop, CoffeeShop.Repo,
  database: "coffee_shop_repo",
  username: "postgres",
  password: "postgres",
  hostname: "localhost"
  
config :coffee_shop, Oban,
  engine: Oban.Engines.Basic,
  queues: [default: 10],
  repo: CoffeeShop.Repo

Temos algumas configurações básicas da engine. Nela temos a quantidade por fila e o repositório que armazenaremos nossos jobs. Feito isso, pronto para produção, mas não para testes.

Imagine rodar o Oban normalmente e o test ficar preso por horas por causa de um job que tem duração longa. Precisamos criar um tipo de mock para isso. O próprio Oban nos da esse super poder, precisando apenas so configurar ele no ambiente de teste. Para isso, vamos criar um arquivo em config/test.exs

config/test.exs
config :coffee_shop, Oban, testing: :inline

A hierarquia de configurações ficará

  1. Obtem as configuraçĩoes de config/config.ex

  2. Estamos em ambietne de test, pegue as configurações de test e o que tiver de igual o de test mantem, sobrepondo o de config/config.exs.

Finalmente precisamos adicionar o Oban a nossa arvore de supervisão, da mesma forma que adicionamos nosso CoffeeShope.Repo. Vamos em lib/coffee_shop/application.ex e colocar um novo filho a supervisão.

lib/coffee_shop/application.ex
defmodule CoffeeShop.Application do
  use Application

  def start(_type, _args) do
    children = [
      CoffeeShop.Repo,
      {Oban, Application.fetch_env!(:coffee_shop, Oban)}
    ]

    opts = [strategy: :one_for_one, name: CoffeeShop.Supervisor]

    Supervisor.start_link(children, opts)
  end
end

Utilizamos Application.fetch_env!/2 para obter a configuração dos arquivos de configuração.

Pronto. Para conseguir ver se tudo deu certo basta acessar o terminal iterativo e rodar Oban.config:

iex -S mix
Oban.config()
iex(1)> Oban.config()
%Oban.Config{
  dispatch_cooldown: 5,
  engine: Oban.Engines.Basic,
  get_dynamic_repo: nil,
  insert_trigger: true,
  log: false,
  name: Oban,
  node: "iago-effting",
  notifier: {Oban.Notifiers.Postgres, []},
  peer: {Oban.Peers.Postgres, []},
  plugins: [],
  prefix: "public",
  queues: [default: [limit: 10]],
  repo: CoffeeShop.Repo,
  shutdown_grace_period: 15000,
  stage_interval: 1000,
  testing: :disabled
}

Ótimo, Oban a postos.

Atualizado