September 22, 2021 09:16 am GMT

Injecting and Discovering Dependencies in OTP Supervisors

Our task

We need sibling processes under a supervisor to find out about each other. Preferably, we also want our supervision tree to be "hermetic", meaning that we can start multiple instances of the tree with different options.

Example: a pipeline for processing network data that consists of several intercommunicating processes which consume and transform data. We would like to start different pipelines for different data sources in an ad-hoc fashion, so they should be "hermetic" and not interfere with each other.

There are several ways to do it.

Our options

Hardcoded process name registration

The easiest and most common way is to simply register your processes under hardcoded, globally known names.

    defmodule PipelineSupervisor do
      use Supervisor

      @impl Supervisor
      def init([]) do
        children = [
          # SomeConsumer.start_link/1 expects a map, so pass one here
          {SomeConsumer, %{name: ConsumerGlobalName}},
          {SomeWorker, %{consumer: ConsumerGlobalName}}
        ]

        Supervisor.init(children, strategy: :rest_for_one)
      end
    end

    defmodule SomeWorker do
      use GenServer

      def start_link(options) do
        GenServer.start_link(__MODULE__, options)
      end

      @impl GenServer
      def init(%{consumer: _consumer} = options) do
        {:ok, options}
      end

      @impl GenServer
      def handle_call({:process_data_chunk, data_chunk}, _from, state) do
        {:reply, :ok, do_process_data_chunk(data_chunk, state)}
      end

      defp do_process_data_chunk(data_chunk, state) do
        # parse_data_somehow/2 stands for the actual parsing logic
        case parse_data_somehow(data_chunk, state) do
          {:ok, parsed_message, new_state} ->
            send(state.consumer, {:message_received, parsed_message})
            new_state

          {:more_data_necessary, new_state} ->
            new_state
        end
      end
    end

    defmodule SomeConsumer do
      use GenServer

      def start_link(%{name: name}) do
        GenServer.start_link(__MODULE__, [], name: name)
      end

      @impl GenServer
      def init([]) do
        {:ok, :undefined_state}
      end

      @impl GenServer
      def handle_info({:message_received, _parsed_message}, state) do
        # ...
        {:noreply, state}
      end
    end

Instead of the implicit name registration provided by Erlang, you can use a process registry that basically serves as a DI container: a way to register and look up dependencies (process pids) by some arbitrary id.

The options include:

  • Using a well-known and reliable registry, for instance, Elixir's Registry
  • Using a self-written registry that stores its state (process pid mappings) in GenServer state or in ETS

When a process starts, it registers itself. When other processes need its pid, they ask this DI registry for the dependency.

But the problem is still there. It is a chicken-and-egg dilemma: to call the registry, you in turn need to know its pid or name.
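A minimal sketch of the registry approach may help here. It uses Elixir's Registry with a `:via` tuple. The registry name `PipelineRegistry`, the `:consumer` key, and the `RegistryDemo.Consumer` module are assumptions made up for this illustration and are not part of the original pipeline.

```elixir
# Assumption: PipelineRegistry, :consumer and RegistryDemo.Consumer are
# illustrative names, not taken from the article.
defmodule RegistryDemo.Consumer do
  use GenServer

  def start_link(registry) do
    # The :via tuple makes the process register itself in the registry on start.
    GenServer.start_link(__MODULE__, [], name: {:via, Registry, {registry, :consumer}})
  end

  @impl GenServer
  def init([]), do: {:ok, []}
end

{:ok, _registry} = Registry.start_link(keys: :unique, name: PipelineRegistry)
{:ok, consumer_pid} = RegistryDemo.Consumer.start_link(PipelineRegistry)

# Any process can now look the dependency up by key,
# but only because it already knows the name of the registry itself.
[{found_pid, _value}] = Registry.lookup(PipelineRegistry, :consumer)
```

The lookup works, yet `PipelineRegistry` is itself a hardcoded name, which is exactly the chicken-and-egg dilemma described above.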

Hardcoded names are easy and common, but they prevent us from running the supervision tree multiple times: a second instance would fail to start because the Consumer name is already taken.

Prefixed process name registration

To make our supervision tree hermetic, we can prefix all process names with a common key and make it an init parameter of our supervisor.

    defmodule PipelineSupervisor do
      use Supervisor

      @impl Supervisor
      def init(%{name_prefix: prefix}) do
        # e.g. PipelineSupervisor.SourceA.Consumer for the prefix SourceA
        consumer_name = Module.concat([__MODULE__, prefix, Consumer])

        children = [
          {SomeConsumer, %{name: consumer_name}},
          {SomeWorker, %{consumer: consumer_name}}
        ]

        Supervisor.init(children, strategy: :rest_for_one)
      end
    end

Although it gets the job done, it clutters the process namespace and creates an API that might seem awkward ("why do you need me to provide some prefix?").
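To see the prefix doing its job, here is a small self-contained sketch that starts two pipelines side by side. The `PrefixDemo.*` modules, the `consumer_name/1` helper, and the prefixes `A` and `B` are hypothetical names chosen for this example only.

```elixir
# Assumption: PrefixDemo.* modules and the prefixes A and B are illustrative.
defmodule PrefixDemo.Consumer do
  use GenServer

  def start_link(opts) do
    GenServer.start_link(__MODULE__, [], name: Keyword.fetch!(opts, :name))
  end

  @impl GenServer
  def init([]), do: {:ok, []}
end

defmodule PrefixDemo.Pipeline do
  use Supervisor

  def start_link(%{name_prefix: _prefix} = options) do
    Supervisor.start_link(__MODULE__, options)
  end

  # Builds the prefixed name, e.g. PrefixDemo.Pipeline.A.Consumer
  def consumer_name(prefix), do: Module.concat([__MODULE__, prefix, Consumer])

  @impl Supervisor
  def init(%{name_prefix: prefix}) do
    children = [{PrefixDemo.Consumer, name: consumer_name(prefix)}]
    Supervisor.init(children, strategy: :rest_for_one)
  end
end

# Two instances of the same tree run next to each other without name clashes.
{:ok, _sup_a} = PrefixDemo.Pipeline.start_link(%{name_prefix: A})
{:ok, _sup_b} = PrefixDemo.Pipeline.start_link(%{name_prefix: B})

pid_a = Process.whereis(PrefixDemo.Pipeline.consumer_name(A))
pid_b = Process.whereis(PrefixDemo.Pipeline.consumer_name(B))
```

Each consumer is still registered in the node-wide name table, which is the namespace clutter mentioned above.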

Implicit registry using Supervisor.which_children function

If we pay closer attention to Supervisor, we notice that any supervisor is basically a registry itself. It naturally knows the pids of all its children, and every child has an id that is unique within the supervisor.

Unfortunately (or luckily), supervisors are single-purpose building blocks and therefore lack a convenient API to look up their children. However, we can still use supervisors as registries through the Supervisor.which_children function.

    defmodule PipelineSupervisor do
      use Supervisor

      @impl Supervisor
      def init([]) do
        supervisor_pid = self()

        children = [
          SomeConsumer,
          {SomeWorker, %{supervisor_pid: supervisor_pid}}
        ]

        Supervisor.init(children, strategy: :rest_for_one)
      end

      def child_pid!(supervisor_pid, child_id) do
        spec = Supervisor.which_children(supervisor_pid)

        case List.keyfind(spec, child_id, 0) do
          {_id, pid, _type, _modules} when is_pid(pid) ->
            pid

          # child is missing, not started, or currently restarting
          _ ->
            raise "no started child with id:#{inspect(child_id)} in supervisor spec:#{inspect(spec)}"
        end
      end
    end

    defmodule SomeWorker do
      use GenServer

      # start_link/1 and handle_call/3 are the same as in the first example

      @impl GenServer
      def init(%{supervisor_pid: _supervisor_pid} = options) do
        {:ok, options}
      end

      defp do_process_data_chunk(data_chunk, state) do
        # ...
        consumer_pid = PipelineSupervisor.child_pid!(state.supervisor_pid, SomeConsumer)
        send(consumer_pid, {:message_received, parsed_message})
        # ...
      end
    end

    defmodule SomeConsumer do
      use GenServer

      def start_link([]) do
        GenServer.start_link(__MODULE__, [])
      end

      # init/1 and handle_info/2 are the same as in the first example
    end

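In some cases, calling the supervisor on each message might create a performance bottleneck. You can fix this by calling child_pid! once when Worker starts and saving the Consumer pid in the GenServer state. The lookup has to happen in handle_continue rather than directly in init: while init is running, the supervisor is still blocked waiting for it to return and cannot answer the which_children call.

    defmodule SomeWorker do
      use GenServer

      @impl GenServer
      def init(%{supervisor_pid: _supervisor_pid} = options) do
        # Defer the lookup: the supervisor cannot reply until init/1 returns.
        {:ok, options, {:continue, :discover_consumer}}
      end

      @impl GenServer
      def handle_continue(:discover_consumer, state) do
        consumer_pid = PipelineSupervisor.child_pid!(state.supervisor_pid, SomeConsumer)
        {:noreply, Map.put(state, :consumer_pid, consumer_pid)}
      end

      defp do_process_data_chunk(data_chunk, state) do
        # ...
        send(state.consumer_pid, {:message_received, parsed_message})
        # ...
      end
    end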

But when we save process pids, we should always be careful and, in our case, think about what happens if Consumer crashes. Its pid saved in Worker becomes invalid. Everything will still be fine, though, because our supervisor is configured with the rest_for_one strategy and Consumer comes before Worker in its children list. Therefore, when Consumer crashes, Worker is restarted as well and discovers the pid of the new Consumer.
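The restart behaviour can be checked with a small self-contained sketch. The `RestartDemo.*` modules and the helpers `child_pid/2`, `await_new_child/3`, and `consumer_pid/1` are hypothetical names introduced only for this demonstration. The worker looks up the consumer in `handle_continue`, because a call to the supervisor from `init` would block while the supervisor is still starting that very child.

```elixir
# Assumption: all RestartDemo.* modules and helper functions are illustrative.
defmodule RestartDemo.Consumer do
  use GenServer

  def start_link(_), do: GenServer.start_link(__MODULE__, [])

  @impl GenServer
  def init([]), do: {:ok, []}
end

defmodule RestartDemo.Worker do
  use GenServer

  def start_link(options), do: GenServer.start_link(__MODULE__, options)

  # Exposes the cached consumer pid so the demo can inspect it.
  def consumer_pid(worker), do: GenServer.call(worker, :consumer_pid)

  @impl GenServer
  def init(options), do: {:ok, options, {:continue, :discover_consumer}}

  @impl GenServer
  def handle_continue(:discover_consumer, state) do
    pid = RestartDemo.Pipeline.child_pid(state.supervisor_pid, RestartDemo.Consumer)
    {:noreply, Map.put(state, :consumer_pid, pid)}
  end

  @impl GenServer
  def handle_call(:consumer_pid, _from, state), do: {:reply, state.consumer_pid, state}
end

defmodule RestartDemo.Pipeline do
  use Supervisor

  def start_link(_), do: Supervisor.start_link(__MODULE__, [])

  @impl Supervisor
  def init([]) do
    children = [
      RestartDemo.Consumer,
      {RestartDemo.Worker, %{supervisor_pid: self()}}
    ]

    Supervisor.init(children, strategy: :rest_for_one)
  end

  # Returns the pid (or :restarting / :undefined) of the child with the given id.
  def child_pid(supervisor_pid, child_id) do
    {^child_id, pid_or_status, _type, _modules} =
      supervisor_pid |> Supervisor.which_children() |> List.keyfind(child_id, 0)

    pid_or_status
  end

  # Polls until the child runs under a pid different from old_pid.
  def await_new_child(supervisor_pid, child_id, old_pid) do
    case child_pid(supervisor_pid, child_id) do
      pid when is_pid(pid) and pid != old_pid ->
        pid

      _ ->
        Process.sleep(10)
        await_new_child(supervisor_pid, child_id, old_pid)
    end
  end
end

{:ok, sup} = RestartDemo.Pipeline.start_link([])
old_worker = RestartDemo.Pipeline.child_pid(sup, RestartDemo.Worker)
old_consumer = RestartDemo.Worker.consumer_pid(old_worker)

# Simulate a Consumer crash; rest_for_one restarts Consumer and then Worker.
Process.exit(old_consumer, :kill)

new_worker = RestartDemo.Pipeline.await_new_child(sup, RestartDemo.Worker, old_worker)
new_consumer = RestartDemo.Worker.consumer_pid(new_worker)
```

After the crash, both pids have changed, and the restarted worker holds the pid of the restarted consumer rather than the stale one.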

Starting dependencies ad-hoc

Also, we can exploit the fact that the Supervisor.start_child function returns the pid of the started process.

    defmodule PipelineSupervisor do
      use Supervisor

      @impl Supervisor
      def init([]) do
        supervisor_pid = self()

        children = [
          {SomeWorker, %{supervisor_pid: supervisor_pid}}
        ]

        Supervisor.init(children, strategy: :rest_for_one)
      end

      def start_consumer(supervisor_pid) do
        Supervisor.start_child(supervisor_pid, SomeConsumer)
      end
    end

    defmodule SomeWorker do
      use GenServer

      @impl GenServer
      def init(%{supervisor_pid: _supervisor_pid} = options) do
        {:ok, options, {:continue, :start_consumer}}
      end

      @impl GenServer
      def handle_continue(:start_consumer, state) do
        consumer_pid =
          case PipelineSupervisor.start_consumer(state.supervisor_pid) do
            {:ok, pid} -> pid
            # after a Worker restart the supervisor has already restarted Consumer
            {:error, {:already_started, pid}} -> pid
          end

        {:noreply, Map.put(state, :consumer_pid, consumer_pid)}
      end

      defp do_process_data_chunk(data_chunk, state) do
        # ...
        send(state.consumer_pid, {:message_received, parsed_message})
        # ...
      end
    end

    defmodule SomeConsumer do
      use GenServer

      def start_link([]) do
        GenServer.start_link(__MODULE__, [])
      end
    end

Note that we have to use the handle_continue callback to avoid deadlocking the supervisor. Otherwise, it would not be able to reply to our start_child call because it would be waiting for the init function to return.
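The following self-contained sketch shows the pattern end to end. The `AdHocDemo.*` modules and the `consumer_pid/1` helper are hypothetical names used only here. The worker asks its supervisor to start the consumer from `handle_continue`, after `init` has returned and the supervisor is free to answer.

```elixir
# Assumption: all AdHocDemo.* modules and consumer_pid/1 are illustrative.
defmodule AdHocDemo.Consumer do
  use GenServer

  def start_link(_), do: GenServer.start_link(__MODULE__, [])

  @impl GenServer
  def init([]), do: {:ok, []}
end

defmodule AdHocDemo.Worker do
  use GenServer

  def start_link(options), do: GenServer.start_link(__MODULE__, options)

  # Exposes the stored consumer pid so the demo can inspect it.
  def consumer_pid(worker), do: GenServer.call(worker, :consumer_pid)

  @impl GenServer
  def init(options) do
    # Calling Supervisor.start_child/2 right here would block: the supervisor
    # is still waiting for this init/1 to return and cannot answer the call.
    {:ok, options, {:continue, :start_consumer}}
  end

  @impl GenServer
  def handle_continue(:start_consumer, state) do
    consumer_pid =
      case Supervisor.start_child(state.supervisor_pid, AdHocDemo.Consumer) do
        {:ok, pid} -> pid
        # Returned when a child with this id is already running.
        {:error, {:already_started, pid}} -> pid
      end

    {:noreply, Map.put(state, :consumer_pid, consumer_pid)}
  end

  @impl GenServer
  def handle_call(:consumer_pid, _from, state), do: {:reply, state.consumer_pid, state}
end

defmodule AdHocDemo.Pipeline do
  use Supervisor

  def start_link(_), do: Supervisor.start_link(__MODULE__, [])

  @impl Supervisor
  def init([]) do
    children = [{AdHocDemo.Worker, %{supervisor_pid: self()}}]
    Supervisor.init(children, strategy: :rest_for_one)
  end
end

{:ok, sup} = AdHocDemo.Pipeline.start_link([])

{AdHocDemo.Worker, worker, _type, _modules} =
  sup |> Supervisor.which_children() |> List.keyfind(AdHocDemo.Worker, 0)

# This call is answered only after handle_continue/2 has finished.
consumer = AdHocDemo.Worker.consumer_pid(worker)

{AdHocDemo.Consumer, supervised_consumer, _, _} =
  sup |> Supervisor.which_children() |> List.keyfind(AdHocDemo.Consumer, 0)
```

The consumer was started on the worker's request, yet it is an ordinary child of the same supervisor, so the supervisor still manages its restarts.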

This method of DI is convoluted and probably not worth it for its own sake. But it can make sense in more complicated scenarios where Worker, instead of just sending messages to Consumer, also needs to manage its lifetime (kill or restart it).

Conclusion

Some of the options that I showed may seem complex and unnecessary. And in many cases that is entirely true. But my point is that using OTP is much more than just writing GenServers and occasionally some custom Supervisor or two. OTP gives us a pretty basic but very smart combination of base elements that we can use to form some higher-level usage patterns. Our task as a community is to discover those patterns and make them simple to use.

To find out more about supervisors and their usage patterns, look at this great blog post: Using Supervisors to Organize Your Elixir Application.

About the author

I am Miroslav Malkin, Erlang/Elixir expert at FunBox. I am also co-founder of Cogito, where we investigate code and process quality and measure their impact on product success.


Original Link: https://dev.to/miros/injecting-and-discovering-dependencies-in-otp-supervisors-352m
