It will create a new folder called priv/repo/migrations, which is where our migrations will live. It will also create our migration file. Let's open it. In my case the file is priv/repo/migrations/20240415200026_add_oban_jobs_table.exs, but yours will have a different prefix, depending on when it was created.
First, let's rename change/0 to up/0. We do this because we want both up/0 and down/0: up/0 runs the migration and down/0 rolls it back if necessary. In up/0 we set up Oban's schema through a function call, and in down/0 we downgrade to version 1, undoing everything the migration created.
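As a reference for the step above, here is a minimal sketch of what the migration could look like. It assumes the `Oban.Migration.up/1` and `Oban.Migration.down/1` helpers that ship with Oban, and it assumes version 12 as the target, which is the version stamped on the table in the migration log shown in this section. Adjust the version to whatever your Oban release recommends.

```elixir
defmodule CoffeeShop.Repo.Migrations.AddObanJobsTable do
  use Ecto.Migration

  # Applies every Oban schema migration up to the given version.
  # Version 12 is an assumption taken from the log output; check your Oban release.
  def up do
    Oban.Migration.up(version: 12)
  end

  # Rolls all the way back to version 1, dropping everything Oban created,
  # regardless of which version `up/0` migrated to.
  def down do
    Oban.Migration.down(version: 1)
  end
end
```

Using an explicit up/0 and down/0 pair instead of change/0 lets the rollback target a different version than the one applied.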
Now we need to run the pending migrations using the Ecto command:
mix ecto.migrate
> mix ecto.migrate
17:07:47.213 [info] == Running 20240415200026 CoffeeShop.Repo.Migrations.AddObanJobsTable.up/0 forward
17:07:47.243 [info] execute "DO $$\nBEGIN\nIF NOT EXISTS (SELECT 1 FROM pg_type\n WHERE typname = 'oban_job_state'\n AND typnamespace = 'public'::regnamespace::oid) THEN\n CREATE TYPE \"public\".oban_job_state AS ENUM (\n 'available',\n 'scheduled',\n 'executing',\n 'retryable',\n 'completed',\n 'discarded'\n );\n END IF;\nEND$$;\n"
17:07:47.249 [info] create table if not exists public.oban_jobs
17:07:47.259 [info] create index if not exists public.oban_jobs_queue_index
17:07:47.263 [info] create index if not exists public.oban_jobs_state_index
17:07:47.267 [info] create index if not exists public.oban_jobs_scheduled_at_index
17:07:47.272 [info] execute "CREATE OR REPLACE FUNCTION \"public\".oban_jobs_notify() RETURNS trigger AS $$\nDECLARE\n channel text;\n notice json;\nBEGIN\n IF (TG_OP = 'INSERT') THEN\n channel = 'public.oban_insert';\n notice = json_build_object('queue', NEW.queue, 'state', NEW.state);\n\n -- No point triggering for a job that isn't scheduled to run now\n IF NEW.scheduled_at IS NOT NULL AND NEW.scheduled_at > now() AT TIME ZONE 'utc' THEN\n RETURN null;\n END IF;\n ELSE\n channel = 'public.oban_update';\n notice = json_build_object('queue', NEW.queue, 'new_state', NEW.state, 'old_state', OLD.state);\n END IF;\n\n PERFORM pg_notify(channel, notice::text);\n\n RETURN NULL;\nEND;\n$$ LANGUAGE plpgsql;\n"
17:07:47.274 [info] execute "DROP TRIGGER IF EXISTS oban_notify ON \"public\".oban_jobs"
17:07:47.276 [info] trigger "oban_notify" for relation "public.oban_jobs" does not exist, skipping
17:07:47.276 [info] execute "CREATE TRIGGER oban_notify\nAFTER INSERT OR UPDATE OF state ON \"public\".oban_jobs\nFOR EACH ROW EXECUTE PROCEDURE \"public\".oban_jobs_notify();\n"
17:07:47.279 [info] drop index if exists public.oban_jobs_scheduled_at_index
17:07:47.281 [info] create index public.oban_jobs_scheduled_at_index
17:07:47.285 [info] create check constraint worker_length on table public.oban_jobs
17:07:47.287 [info] create check constraint queue_length on table public.oban_jobs
17:07:47.289 [info] execute "CREATE OR REPLACE FUNCTION \"public\".oban_wrap_id(value bigint) RETURNS int AS $$\nBEGIN\n RETURN (CASE WHEN value > 2147483647 THEN mod(value, 2147483647) ELSE value END)::int;\nEND;\n$$ LANGUAGE plpgsql IMMUTABLE;\n"
17:07:47.291 [info] alter table public.oban_jobs
17:07:47.293 [info] execute "DROP FUNCTION IF EXISTS \"public\".oban_wrap_id(value bigint)"
17:07:47.294 [info] drop index if exists public.oban_jobs_scheduled_at_index
17:07:47.296 [info] drop index if exists public.oban_jobs_queue_index
17:07:47.297 [info] drop index if exists public.oban_jobs_state_index
17:07:47.299 [info] create index if not exists public.oban_jobs_queue_state_scheduled_at_id_index
17:07:47.303 [info] create index if not exists public.oban_jobs_attempted_at_id_index
17:07:47.307 [info] alter table public.oban_jobs
17:07:47.309 [info] alter table public.oban_jobs
17:07:47.312 [info] drop index if exists public.oban_jobs_queue_state_scheduled_at_id_index
17:07:47.314 [info] create index if not exists public.oban_jobs_state_queue_priority_scheduled_at_id_index
17:07:47.318 [info] execute "CREATE OR REPLACE FUNCTION \"public\".oban_jobs_notify() RETURNS trigger AS $$\nDECLARE\n channel text;\n notice json;\nBEGIN\n IF NEW.state = 'available' THEN\n channel = 'public.oban_insert';\n notice = json_build_object('queue', NEW.queue);\n\n PERFORM pg_notify(channel, notice::text);\n END IF;\n\n RETURN NULL;\nEND;\n$$ LANGUAGE plpgsql;\n"
17:07:47.320 [info] execute "DROP TRIGGER IF EXISTS oban_notify ON \"public\".oban_jobs"
17:07:47.322 [info] execute "CREATE TRIGGER oban_notify\nAFTER INSERT ON \"public\".oban_jobs\nFOR EACH ROW EXECUTE PROCEDURE \"public\".oban_jobs_notify();\n"
17:07:47.323 [info] alter table public.oban_jobs
17:07:47.331 [info] execute "DO $$\nDECLARE\n version int;\n already bool;\nBEGIN\n SELECT current_setting('server_version_num')::int INTO version;\n SELECT '{cancelled}' <@ enum_range(NULL::\"public\".oban_job_state)::text[] INTO already;\n\n IF already THEN\n RETURN;\n ELSIF version >= 120000 THEN\n ALTER TYPE \"public\".oban_job_state ADD VALUE IF NOT EXISTS 'cancelled';\n ELSE\n ALTER TYPE \"public\".oban_job_state RENAME TO old_oban_job_state;\n\n CREATE TYPE \"public\".oban_job_state AS ENUM (\n 'available',\n 'scheduled',\n 'executing',\n 'retryable',\n 'completed',\n 'discarded',\n 'cancelled'\n );\n\n ALTER TABLE \"public\".oban_jobs RENAME column state TO _state;\n ALTER TABLE \"public\".oban_jobs ADD state \"public\".oban_job_state NOT NULL default 'available';\n\n UPDATE \"public\".oban_jobs SET state = _state::text::\"public\".oban_job_state;\n\n ALTER TABLE \"public\".oban_jobs DROP column _state;\n DROP TYPE \"public\".old_oban_job_state;\n END IF;\nEND$$;\n"
17:07:47.333 [info] create index if not exists public.oban_jobs_state_queue_priority_scheduled_at_id_index
17:07:47.335 [info] relation "oban_jobs_state_queue_priority_scheduled_at_id_index" already exists, skipping
17:07:47.335 [info] alter table public.oban_jobs
17:07:47.340 [info] create check constraint priority_range on table public.oban_jobs
17:07:47.341 [info] create check constraint positive_max_attempts on table public.oban_jobs
17:07:47.343 [info] create check constraint attempt_range on table public.oban_jobs
17:07:47.345 [info] drop index if exists public.oban_jobs_args_vector
17:07:47.346 [info] index "oban_jobs_args_vector" does not exist, skipping
17:07:47.346 [info] drop index if exists public.oban_jobs_worker_gist
17:07:47.348 [info] index "oban_jobs_worker_gist" does not exist, skipping
17:07:47.348 [info] drop index if exists public.oban_jobs_attempted_at_id_index
17:07:47.349 [info] create index if not exists public.oban_jobs_args_index
17:07:47.351 [info] create index if not exists public.oban_jobs_meta_index
17:07:47.353 [info] create table if not exists public.oban_peers
17:07:47.360 [info] execute "ALTER TABLE \"public\".oban_peers SET UNLOGGED"
17:07:47.371 [info] drop constraint priority_range from table public.oban_jobs
17:07:47.373 [info] create check constraint non_negative_priority on table public.oban_jobs
17:07:47.374 [info] execute "DROP TRIGGER IF EXISTS oban_notify ON \"public\".oban_jobs"
17:07:47.376 [info] execute "DROP FUNCTION IF EXISTS \"public\".oban_jobs_notify()"
17:07:47.377 [info] execute "COMMENT ON TABLE \"public\".oban_jobs IS '12'"
17:07:47.381 [info] == Migrated 20240415200026 in 0.1s
With that, we have created all the tables needed to run Oban. Now let's configure it in the project. Open the file config/config.exs.
It holds some basic settings: the engine, the number of jobs each queue can run concurrently, and the repository where our jobs will be stored. Once that is done, we are ready for production, but not for tests.
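A sketch of what that block in config/config.exs might look like is shown below. The `Oban.Engines.Basic` engine and the `CoffeeShop.Repo` repository follow the setup described here; the queue name `default` and its limit of 10 are illustrative assumptions, not values taken from this project.

```elixir
# config/config.exs
import Config

config :coffee_shop, Oban,
  # Engine used to store and fetch jobs (the basic Postgres engine).
  engine: Oban.Engines.Basic,
  # Queue name => maximum number of jobs running concurrently (assumed values).
  queues: [default: 10],
  # Ecto repository where the jobs are persisted.
  repo: CoffeeShop.Repo
```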
Imagine running Oban normally and having a test hang for hours because of a long-running job. We need some kind of mock for that. Oban itself gives us this superpower; we only need to configure it for the test environment. To do that, let's create the file config/test.exs.
config/test.exs
import Config

config :coffee_shop, Oban, testing: :inline
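To illustrate what `testing: :inline` changes, here is a small sketch. The worker module `CoffeeShop.Workers.BrewCoffee` and its arguments are hypothetical, introduced only for this example. With inline testing, `Oban.insert/1` runs the job immediately in the calling process instead of waiting for a queue to pick it up.

```elixir
defmodule CoffeeShop.Workers.BrewCoffee do
  # Hypothetical worker, used only to illustrate inline testing.
  use Oban.Worker, queue: :default

  @impl Oban.Worker
  def perform(%Oban.Job{args: args}) do
    # Real work would happen here; we only inspect the arguments.
    IO.inspect(args, label: "brewing")
    :ok
  end
end

# In a test (with `testing: :inline`), inserting the job executes it right away:
%{order_id: 1}
|> CoffeeShop.Workers.BrewCoffee.new()
|> Oban.insert()
```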
The configuration hierarchy will be:
Load the settings from config/config.exs.
If we are in the test environment, load the settings from config/test.exs as well; whenever the same key appears in both files, the value from config/test.exs wins, overriding the one from config/config.exs.
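For this hierarchy to work, config/config.exs has to import the environment-specific file. A minimal sketch, assuming the standard Mix project layout:

```elixir
# config/config.exs
import Config

# ... the shared settings, including the Oban block, go above this line ...

# Keep this as the last line so the environment file can override what was set above.
# It loads config/dev.exs, config/test.exs or config/prod.exs depending on the Mix env.
import_config "#{config_env()}.exs"
```

Note that `import_config/1` raises if the file is missing, so this assumes a config file exists for every environment you run, even if it contains only `import Config`.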
Finally, we need to add Oban to our supervision tree, the same way we added our CoffeeShop.Repo. Open lib/coffee_shop/application.ex and add a new child to the supervisor.
lib/coffee_shop/application.ex
defmodule CoffeeShop.Application do
  use Application

  def start(_type, _args) do
    children = [
      CoffeeShop.Repo,
      {Oban, Application.fetch_env!(:coffee_shop, Oban)}
    ]

    opts = [strategy: :one_for_one, name: CoffeeShop.Supervisor]
    Supervisor.start_link(children, opts)
  end
end
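We use Application.fetch_env!/2 to read the Oban settings defined in the configuration files. It raises if the key is missing, so a forgotten configuration fails at startup instead of silently passing nil to Oban.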
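The difference between the lookup functions can be seen in a short, self-contained sketch. The application name `:demo_app` and its keys are hypothetical, used here so the snippet runs without the CoffeeShop project.

```elixir
# Store a value in the application environment of a made-up app.
Application.put_env(:demo_app, :greeting, "hello")

# fetch_env!/2 returns the value when the key is present...
"hello" = Application.fetch_env!(:demo_app, :greeting)

# ...while get_env/2 and fetch_env/2 signal a missing key without raising.
nil = Application.get_env(:demo_app, :missing)
:error = Application.fetch_env(:demo_app, :missing)

# fetch_env!/2 raises ArgumentError for a missing key.
result =
  try do
    Application.fetch_env!(:demo_app, :missing)
  rescue
    ArgumentError -> :raised
  end

:raised = result
```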
Done. To check that everything worked, open the interactive terminal (iex -S mix) and run Oban.config: