Criando uma requisição assíncrona

Primeiro iremos criar uma nova requisição. Utilizaremos o mesmo recurso para fins de estudo, mas a estratégia de execução será diferente.

  • Precisamos da atualização dos cafés quentes;

  • Temos a limitação de 50 requisições por dia no Rate Limit;

  • Não precisamos ter uma resposta imediata, isso pode ser atualizado quando der.

Tendo isso em mente, criaremos uma requisição que bate no mesmo endpoint dos cafés quentes, porém, não iremos esperar uma resposta imediata e sim, um :ok, nos avisando que o agendamento esta pronto.

lib/coffee_shop/integrations/coffee/client.ex
defmodule CoffeeShop.Integrations.Coffee.Client do
  alias CoffeeShop.Integrations.Coffee.Response

  # ...

  def all_hot_coffees(opts \\ []) do
    base_url = Keyword.get(opts, :base_url, base_url())

    opts
    |> new_client()
    |> Tesla.get("#{base_url}/coffee/hot")
    |> Response.build()
  end

  def all_hot_coffees_async(opts \\ []) do
    
  end

  defp base_url(), do: "https://api.sampleapis.com"
end

usei o mesmo nome da função, com adição do sufixo _assync, indicando assincronicidade. Isso quer dizer, não esperamos que isso seja feito agora e sim, de forma assincrona.

Diferente da primeira função, na segunda precisamos agendar um Job para ser executado. Para isso, precisamos primeiro do Worker que irá processar nosso Job. Vamos criar o worker com o nome HotCoffeesWorker que fara a sincronia de forma assíncrona para nos.

A estrutura do nosso modulo é bem simples, utilizaremos a macro use Oban.Worker e precisaremos implementar a função perform/1 .

lib/coffee_shop/integrations/coffee/hot_coffees_worker.ex
defmodule CoffeeShop.Integrations.Coffee.HotCoffeesWorker do
  use Oban.Worker

  @impl Oban.Worker
  def perform(%Oban.Job{args: _args}) do
    :ok
  end
end

A função perform/1 será executada quando um job rodar. Precisamos que ela realize a chamada para o serviço externo. Para isso, vamos reaproveitar a função sincrona criada no cliente, Client.all_hot_coffees/1.

lib/coffee_shop/integrations/coffee/hot_coffees_worker.ex
defmodule CoffeeShop.Integrations.Coffee.HotCoffeesWorker do
  use Oban.Worker

  alias CoffeeShop.Integrations.Coffee.Response
  alias CoffeeShop.Integrations.Coffee.Client

  @impl Oban.Worker
  def perform(%Oban.Job{args: %{"opts" => opts}}) do
    opts = to_list(opts)
    
    case Client.all_hot_coffees(opts) do
      {:ok, %Response{status: 200, body: _body}} -> :ok
      {:ok, %Response{status: status, body: _body}} -> {:error, "status: #{status}"}
      error -> error
    end
  end
  
  defp to_list(map) do
    Enum.map(map, fn {key, value} -> {String.to_existing_atom(key), value} end)
  end
end

O Oban salva os parâmetros como map, devido a não suportar uma lista. Por isso que ao receber o opts convertemos ele novamente para lista e o processamento continua igual o original utilizanod o Keyword.

O case/1 controla o que esperamos. Nesse caso, sera apenas um sucesso quando recebermos um status 200. Fora isso, queremos que seja um erro.

Nosso worker está pronto, agora precisamos agendar a execução. Para isso utilizaremos a função Oban.insert/1 que espera um Job. Para conseguirmos o Job de forma fácil, utilizaremos a macro em nosso modulo worker e executaremos a função HotCoffeesWorker.new/1 que retorna um Job já configurado para nosso Worker. Vamos replicar isso em nosso cliente.

lib/coffee_shop/integrations/coffee/client.ex
defmodule CoffeeShop.Integrations.Coffee.Client do
  alias CoffeeShop.Integrations.Coffee.Response
  alias CoffeeShop.Integrations.Coffee.HotCoffeesWorker
  
  # ...

  def all_hot_coffees(opts \\ []) do
    base_url = Keyword.get(opts, :base_url, base_url())

    opts
    |> new_client()
    |> Tesla.get("#{base_url}/coffee/hot")
    |> Response.build()
  end

  def all_hot_coffees_async(opts \\ []) do
    opts
    |> Map.new(fn option -> option end)
    |> HotCoffeesWorker.new()
    |> Oban.insert()
  end

  defp base_url(), do: "https://api.sampleapis.com"
end

Na linha 18, convertemos a lista em Map. Como informamos que aconteceria na criação do worker.

Ao rodar a função all_hot_coffees_async/1 receberemos um {:ok, _job} de resposta e não mais a estrutura %Response{}, isso acontece porque estamos criando um novo Job e não mais realizando uma requisição para o serviço. Vamos criar um teste comprovando nossa ideia.

test/coffee_shop/integrations/coffee/client_test.exs
defmodule CoffeeShop.Integrations.Coffee.ClientTest do
  use ExUnit.Case
  use Oban.Testing, repo: CoffeeShop.Repo

  alias CoffeeShop.Integrations.Coffee.Client
  alias CoffeeShop.Integrations.Coffee.HotCoffeesWorker
  alias CoffeeShop.Integrations.Coffee.Response
  alias CoffeeShop.Integrations.Counter

  # ...
  
  describe "all_hot_coffees_async/0" do
    test "schedule a job" do
      opts = [
        base_url: "http://localhost:3000",
        retry_delay: 1
      ]

      assert {:ok, _job} = Client.all_hot_coffees_async(opts)

      # Verificando se foi agendado com sucesso
      assert_enqueued(
        worker: HotCoffeesWorker,
        args: %{"base_url" => "http://localhost:3000", "retry_delay" => 1}
      )
    end
  end
end

Você já pode rodar os testes do nosso cliente

mix test test/coffee_shop/integrations/coffee/client_test.exs
> mix test test/coffee_shop/integrations/coffee/client_test.exs
....
Finished in 1.0 seconds (0.00s async, 1.0s sync)
4 tests, 0 failures

Agendamento realizado com sucesso. Mas precisamos de mais garantias. Nosso Rate Limit é de 24h. Isso quer dizer que precisamos garantir que essa execução só irá ser executada em 24h certo? No teste, nao temos essa garantia ainda, temos que adiciona-la.

test/coffee_shop/integrations/coffee/client_test.exs
defmodule CoffeeShop.Integrations.Coffee.ClientTest do
  use ExUnit.Case
  use Oban.Testing, repo: CoffeeShop.Repo

  alias CoffeeShop.Integrations.Coffee.Client
  alias CoffeeShop.Integrations.Coffee.HotCoffeesWorker
  alias CoffeeShop.Integrations.Coffee.Response
  alias CoffeeShop.Integrations.Counter

  # ...

  describe "all_hot_coffees_async/0" do
    test "schedule a job" do
      opts = [
        base_url: "http://localhost:3000",
        retry_delay: 1
      ]

      assert {:ok, _job} = Client.all_hot_coffees_async(opts)

      in_a_day = DateTime.add(DateTime.utc_now(), 3600 * 24, :second)

      assert_enqueued(
        worker: HotCoffeesWorker,
        args: %{"base_url" => "http://localhost:3000", "retry_delay" => 1},
        scheduled_at: in_a_day
      )
    end
  end
end

O próprio Oban nos disponibiliza na função assert_enqueued o scheduled_at, para confirmarmos para quando foi agendado. Com isso, criamos a variavel in_a_day com a utilização do DateTime para adicionarmos o tempo a partir de agora + 24h e adicionamos na assertion. Vamos rodar e ver o que acontece.

mix test test/coffee_shop/integrations/coffee/client_test.exs
mix test test/coffee_shop/integrations/coffee/client_test.exs
.

  1) test all_hot_coffees_async/0 schedule a job (CoffeeShop.Integrations.Coffee.ClientTest)
     test/coffee_shop/integrations/coffee/client_test.exs:73
     Expected a job matching:
     
     %{
       args: %{"base_url" => "http://localhost:3000", "retry_delay" => 1},
       worker: CoffeeShop.Integrations.Coffee.HotCoffeesWorker,
       scheduled_at: ~U[2024-04-17 13:44:40.309948Z]
     }
     
     to be enqueued. Instead found:
     
     [
       %{
         args: %{"base_url" => "http://localhost:3000", "retry_delay" => 1},
         worker: "CoffeeShop.Integrations.Coffee.HotCoffeesWorker",
         scheduled_at: ~U[2024-04-16 13:44:40.294598Z]
       }
     ]
     
     code: assert_enqueued(
     stacktrace:
       test/coffee_shop/integrations/coffee/client_test.exs:83: (test)

..
Finished in 1.2 seconds (0.00s async, 1.2s sync)
4 tests, 1 failure

A há, é um ótimo teste para se ter, não é? Esperamos é rodar o job em 2024-04-17, mas está sendo executado um dia antes 2024-04-16. Isso quer dizer, esta rodando logo quando é agendado.

Não é o comportamento que esperamos. Isso está acontecendo porque realmente não configuramos essa etapa. Devemos configurar isso quando o job. Em nosso cliente é inserido o job e lá vamos adicionar essa opção:

lib/coffee_shop/integrations/coffee/hot_coffees_worker.ex
defmodule CoffeeShop.Integrations.Coffee.Client do
  alias CoffeeShop.Integrations.Coffee.Response
  alias CoffeeShop.Integrations.Coffee.HotCoffeesWorker

  # ...
  
  def all_hot_coffees_async(opts \\ []) do
   in_a_day = DateTime.add(DateTime.utc_now(), 3600 * 24, :second)

    opts
    |> Map.new(fn option -> option end)
    |> HotCoffeesWorker.new(scheduled_at: in_a_day)
    |> Oban.insert()
  end

  # ...
end

Criado a regra na linha 9 e adicionado a opção na linha 12. Agora rodaremos o teste novamente.

mix test test/coffee_shop/integrations/coffee/client_test.exs
mix test test/coffee_shop/integrations/coffee/client_test.exs
Compiling 1 file (.ex)
....
Finished in 1.1 seconds (0.00s async, 1.1s sync)
4 tests, 0 failures

Estamos atendendo a regra que precisamos seguir com o rate limit de longa duração.

Em nosso teste, não temos garantia de que a execução do perform funciona, apenas garantimos o agendamento. Precisamos saber se conseguimos rodar o que foi agendado. Para isso usaremos a função perform_job/2 que executará o job agendado.

test/coffee_shop/integrations/coffee/client_test.exs
defmodule CoffeeShop.Integrations.Coffee.ClientTest do
  use ExUnit.Case
  use Oban.Testing, repo: CoffeeShop.Repo

  alias CoffeeShop.Integrations.Coffee.Client
  alias CoffeeShop.Integrations.Coffee.HotCoffeesWorker
  alias CoffeeShop.Integrations.Coffee.Response
  alias CoffeeShop.Integrations.Counter

  setup do
    bypass = Bypass.open(port: 3000)
    {:ok, bypass: bypass}
  end

  describe "all_hot_coffees/0" do
    test "respond a list of hot coffees", %{bypass: bypass} do
      # ...
    end

    test "service is crashed", %{bypass: bypass} do
      # ...
    end

    test "too much requests", %{bypass: bypass} do
      # ...
    end
  end

  describe "all_hot_coffees_async/0" do
    test "schedule a job" do
      Bypass.expect_once(bypass, "GET", "/coffee/hot", fn conn ->
        Plug.Conn.resp(conn, 200, "")
      end)
      
      opts = [
        base_url: "http://localhost:3000",
        retry_delay: 1
      ]

      assert {:ok, _job} = Client.all_hot_coffees_async(opts)

      in_a_day = DateTime.add(DateTime.utc_now(), 3600 * 24, :second)

      assert_enqueued(
        worker: HotCoffeesWorker,
        args: %{"base_url" => "http://localhost:3000", "retry_delay" => 1},
        scheduled_at: in_a_day
      )
      
      assert %{success: 1} =
               Oban.drain_queue(queue: :default, with_scheduled: true, with_safety: false)
    end
  end
end

Adicionamos uma nova etapa na linha 50 que executa o job agendado. Sua resposta vem com a estrutura de map dos seguintes dados:

%{failure: _, snoozed: _, success: _}

Para nosso teste passar, a execução deve ser um sucesso. por isso nossa resposta está com o pattern matching %{success: 1}.

Rode seu teste e veja o resultado.

mix test test/coffee_shop/integrations/coffee/client_test.exs
> mix test test/coffee_shop/integrations/coffee/client_test.exs   
....
Finished in 1.1 seconds (0.00s async, 1.1s sync)
4 tests, 0 failures

Mais bonito que uma geladeira inox side-by-side com dispenser de gelo.

Atualizado