Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@
## Unreleased

- Use new cypher names
- Adds clustering strategy for the Nomad orchestrator (see: github.com/hashicorp/nomad)

### 3.3.0

### Changed

- Default multicast address is now 233.252.1.32, was 230.1.1.251, [commit](https://github.com/bitwalker/libcluster/commit/449a65e14f152a83a0f8ee371f05743610cd292f)


### 2.3.0

### Added
Expand Down
6 changes: 5 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ You can find supporting documentation [here](https://hexdocs.pm/libcluster).
- Kubernetes via its metadata API using via a configurable label selector and
node basename; or alternatively, using DNS.
- Rancher, via its [metadata API][rancher-api]
- Nomad, via its [services API][nomad-api]
- Easy to provide your own custom clustering strategies for your specific environment.
- Easy to provide your own distribution plumbing (i.e. something other than
Distributed Erlang), by implementing a small set of callbacks. This allows
Expand Down Expand Up @@ -120,7 +121,9 @@ You have a handful of choices with regards to cluster management out of the box:
nodes based on a label selector and basename.
- `Cluster.Strategy.Kubernetes.DNS`, which uses DNS to join nodes under a shared
headless service in a given namespace.
- `Cluster.Strategy.Rancher`, which like the Kubernetes strategy, uses a
- `Cluster.Strategy.Rancher`, which like the Kubernetes and Nomad strategies, uses a
metadata API to query nodes to cluster with.
- `Cluster.Strategy.Nomad`, which like the Kubernetes and Rancher strategies, uses a
metadata API to query nodes to cluster with.

You can also define your own strategy implementation, by implementing the
Expand Down Expand Up @@ -153,3 +156,4 @@ This library is MIT licensed. See the
[LICENSE.md](https://github.com/bitwalker/libcluster/blob/master/LICENSE.md) for details.

[rancher-api]: http://rancher.com/docs/rancher/latest/en/rancher-services/metadata-service/
[nomad-api]: https://www.nomadproject.io/api-docs/services
218 changes: 218 additions & 0 deletions lib/strategy/nomad.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,218 @@
defmodule Cluster.Strategy.Nomad do
  @moduledoc """
  This clustering strategy works by querying Nomad for a service specified
  by name. It will poll for new service addresses based on the polling
  interval specified (in milliseconds).

  ## Options

  * `service_name` - The name of the Nomad service you wish to get the
    addresses for (required; e.g. "my-elixir-app")
  * `nomad_server_url` - The base URL of the Nomad server to query
    (required; e.g. "https://127.0.0.1:4646")
  * `node_basename` - The Erlang node basename (required; e.g. "app")
  * `namespace` - The Nomad namespace to query (optional; default: "default")
  * `token` - A Nomad ACL token with read access to the service
    (optional; default: "")
  * `polling_interval` - How often to poll in milliseconds (optional; default: 5_000)

  ## Usage

      config :libcluster,
        topologies: [
          nomad_example: [
            strategy: #{__MODULE__},
            config: [
              service_name: "my-elixir-app",
              nomad_server_url: "https://my-nomad-url:4646",
              namespace: "engineering",
              node_basename: "app",
              polling_interval: 5_000]]]
  """

  use GenServer
  import Cluster.Logger

  alias Cluster.Strategy
  alias Cluster.Strategy.State

  @default_polling_interval 5_000
  @default_namespace "default"
  @default_token ""

  def start_link(args), do: GenServer.start_link(__MODULE__, args)

  # First init: seed the node set, then fall through to the normal clause.
  @impl true
  def init([%State{meta: nil} = state]) do
    init([%State{state | :meta => MapSet.new()}])
  end

  def init([%State{} = state]) do
    {:ok, do_poll(state)}
  end

  @impl true
  def handle_info(:timeout, state), do: handle_info(:poll, state)
  def handle_info(:poll, state), do: {:noreply, do_poll(state)}
  def handle_info(_, state), do: {:noreply, state}

  # Queries Nomad, reconciles the cluster membership against the previous
  # poll (stored in `state.meta`), and schedules the next poll.
  defp do_poll(
         %State{
           topology: topology,
           connect: connect,
           disconnect: disconnect,
           list_nodes: list_nodes
         } = state
       ) do
    new_nodelist = state |> get_nodes() |> MapSet.new()
    removed = MapSet.difference(state.meta, new_nodelist)

    new_nodelist =
      case Strategy.disconnect_nodes(
             topology,
             disconnect,
             list_nodes,
             MapSet.to_list(removed)
           ) do
        :ok ->
          new_nodelist

        {:error, bad_nodes} ->
          # Add back the nodes which should have been removed, but which
          # couldn't be for some reason
          Enum.reduce(bad_nodes, new_nodelist, fn {n, _}, acc ->
            MapSet.put(acc, n)
          end)
      end

    new_nodelist =
      case Strategy.connect_nodes(
             topology,
             connect,
             list_nodes,
             MapSet.to_list(new_nodelist)
           ) do
        :ok ->
          new_nodelist

        {:error, bad_nodes} ->
          # Remove the nodes which should have been added, but couldn't be
          # for some reason
          Enum.reduce(bad_nodes, new_nodelist, fn {n, _}, acc ->
            MapSet.delete(acc, n)
          end)
      end

    Process.send_after(self(), :poll, polling_interval(state))

    %{state | :meta => new_nodelist}
  end

  defp polling_interval(%{config: config}) do
    Keyword.get(config, :polling_interval, @default_polling_interval)
  end

  defp get_namespace(config) do
    Keyword.get(config, :namespace, @default_namespace)
  end

  defp get_token(config) do
    Keyword.get(config, :token, @default_token)
  end

  # Resolves the configured node names from Nomad. Required options are
  # fetched as `{:ok, value} | :error` tuples so that `fetch_nodes/6` can
  # report exactly which one is missing.
  @spec get_nodes(State.t()) :: [atom()]
  defp get_nodes(%State{config: config} = state) do
    server_url = Keyword.fetch(config, :nomad_server_url)
    service_name = Keyword.fetch(config, :service_name)
    node_basename = Keyword.fetch(config, :node_basename)
    namespace = get_namespace(config)
    token = get_token(config)

    fetch_nodes(server_url, service_name, node_basename, namespace, token, state)
  end

  defp fetch_nodes(
         {:ok, server_url},
         {:ok, service_name},
         {:ok, node_basename},
         namespace,
         token,
         %State{
           topology: topology
         }
       )
       when server_url != "" and service_name != "" and node_basename != "" do
    debug(topology, "polling nomad for '#{service_name}' in namespace '#{namespace}'")

    headers = [{'X-Nomad-Token', '#{token}'}]
    http_options = []
    # Scope the lookup to the configured namespace; without the query param
    # Nomad only searches the "default" namespace.
    url = '#{server_url}/v1/service/#{service_name}?namespace=#{namespace}'

    # `body_format: :binary` is required: :httpc returns the body as a
    # charlist by default, which Jason.decode!/1 cannot parse.
    case :httpc.request(:get, {url, headers}, http_options, [{:body_format, :binary}]) do
      {:ok, {{_version, 200, _status}, _headers, body}} ->
        body
        |> Jason.decode!()
        |> Enum.map(fn %{"Address" => addr} -> :"#{node_basename}@#{addr}" end)

      {:ok, {{_version, 403, _status}, _headers, _body}} ->
        warn(topology, "cannot query nomad (unauthorized)")
        []

      {:ok, {{_version, code, status}, _headers, body}} ->
        warn(topology, "cannot query nomad (#{code} #{status}): #{inspect(body)}")
        []

      {:error, reason} ->
        error(topology, "request to nomad failed!: #{inspect(reason)}")
        []

      _ ->
        error(topology, "unknown error fetching nomad service info")
        []
    end
  end

  # All required options are present but at least one is an empty/invalid
  # value (failed the guard on the clause above).
  defp fetch_nodes(
         {:ok, invalid_server_url},
         {:ok, invalid_service_name},
         {:ok, invalid_node_base_name},
         _namespace,
         _token,
         %State{
           topology: topology
         }
       ) do
    warn(
      topology,
      "nomad strategy is selected, but server_url, service_name, or node_base_name param is invalid: #{inspect(%{nomad_server_url: invalid_server_url, service_name: invalid_service_name, node_basename: invalid_node_base_name})}"
    )

    []
  end

  # One clause per missing required option so the log message is specific.
  defp fetch_nodes(:error, _service_name, _node_base_name, _namespace, _token, %State{
         topology: topology
       }) do
    warn(
      topology,
      "nomad polling strategy is selected, but nomad_server_url param missed"
    )

    []
  end

  defp fetch_nodes(_server_url, :error, _node_base_name, _namespace, _token, %State{
         topology: topology
       }) do
    warn(
      topology,
      "nomad polling strategy is selected, but service_name param missed"
    )

    []
  end

  defp fetch_nodes(_server_url, _service_name, :error, _namespace, _token, %State{
         topology: topology
       }) do
    warn(
      topology,
      "nomad polling strategy is selected, but node_base_name param missed"
    )

    []
  end
end
Empty file added test/nomat_test.exs
Empty file.