diff --git a/implementations/elixir/ockam/ockam_cloud_node/config/runtime.exs b/implementations/elixir/ockam/ockam_cloud_node/config/runtime.exs index 40d404074fc..0723b65c86a 100644 --- a/implementations/elixir/ockam/ockam_cloud_node/config/runtime.exs +++ b/implementations/elixir/ockam/ockam_cloud_node/config/runtime.exs @@ -115,9 +115,7 @@ config :ockam_services, # discovery service Ockam.Services.Provider.Discovery, # proxies for remote services - Ockam.Services.Provider.Proxy, - # proxies to services in other nodes - Ockam.Services.Provider.Sidecar + Ockam.Services.Provider.Proxy ], services: services @@ -185,3 +183,44 @@ config :logger, :console, metadata: [:module, :line, :pid], format_string: "$dateT$time $metadata[$level] $message\n", format: {Ockam.CloudNode.LogFormatter, :format} + +if Mix.env() == :test do + config :logger, level: :debug + + opentelemetry_proxy_service = if System.get_env("OCKAM_OPENTELEMETRY_ENDPOINT", nil) != nil do + grpc_endpoint = System.get_env("OCKAM_OPENTELEMETRY_ENDPOINT") + [ + {:echo, + [ + address: "echo", + log_level: :debug, + authorization: [] + ]}, + {:grpc_forwarder, + [ + address: "grpc_forwarder", + log_level: :debug, + grpc_endpoint: grpc_endpoint, + authorization: [] + ]} + ] + else + [] + end + + config :ockam_services, + service_providers: [ + Ockam.Services.Provider.Routing, + Ockam.Services.Provider.SecureChannel, + Ockam.Services.Provider.Proxy + ], + services: opentelemetry_proxy_service + + config :ockam_metrics, + start_telemetry_poller: false + + config :ockam, identity_module: Ockam.Identity.Stub + + config :ockam_cloud_node, + storage_path: "./test" +end diff --git a/implementations/elixir/ockam/ockam_cloud_node/lib/cloud_node.ex b/implementations/elixir/ockam/ockam_cloud_node/lib/cloud_node.ex index 4b2dd457639..225c6c6793b 100644 --- a/implementations/elixir/ockam/ockam_cloud_node/lib/cloud_node.ex +++ b/implementations/elixir/ockam/ockam_cloud_node/lib/cloud_node.ex @@ -41,11 +41,12 @@ defmodule Ockam.CloudNode do [] end + {nil, nil} -> + [] _other -> Logger.info( "Invalid cleanup config: #{inspect(crontab)} : #{inspect(idle_timeout)}. Ignoring" ) - [] end end diff --git a/implementations/elixir/ockam/ockam_cloud_node/mix.lock b/implementations/elixir/ockam/ockam_cloud_node/mix.lock index 47135202fde..7b62cf9203f 100644 --- a/implementations/elixir/ockam/ockam_cloud_node/mix.lock +++ b/implementations/elixir/ockam/ockam_cloud_node/mix.lock @@ -17,11 +17,15 @@ "erlex": {:hex, :erlex, "0.2.6", "c7987d15e899c7a2f34f5420d2a2ea0d659682c06ac607572df55a43753aa12e", [:mix], [], "hexpm", "2ed2e25711feb44d52b17d2780eabf998452f6efda104877a3881c2f8c0c0c75"}, "ex_doc": {:hex, :ex_doc, "0.31.0", "06eb1dfd787445d9cab9a45088405593dd3bb7fe99e097eaa71f37ba80c7a676", [:mix], [{:earmark_parser, "~> 1.4.39", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_c, ">= 0.1.1", [hex: :makeup_c, repo: "hexpm", optional: true]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1", [hex: :makeup_erlang, repo: "hexpm", optional: false]}], "hexpm", "5350cafa6b7f77bdd107aa2199fe277acf29d739aba5aee7e865fc680c62a110"}, "file_system": {:hex, :file_system, "0.2.10", "fb082005a9cd1711c05b5248710f8826b02d7d1784e7c3451f9c1231d4fc162d", [:mix], [], "hexpm", "41195edbfb562a593726eda3b3e8b103a309b733ad25f3d642ba49696bf715dc"}, + "finch": {:hex, :finch, "0.19.0", "c644641491ea854fc5c1bbaef36bfc764e3f08e7185e1f084e35e0672241b76d", [:mix], [{:mime, "~> 1.0 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:mint, "~> 1.6.2 or ~> 1.7", [hex: :mint, repo: "hexpm", optional: false]}, {:nimble_options, "~> 0.4 or ~> 1.0", [hex: :nimble_options, repo: "hexpm", optional: false]}, {:nimble_pool, "~> 1.1", [hex: :nimble_pool, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "fc5324ce209125d1e2fa0fcd2634601c52a787aff1cd33ee833664a5af4ea2b6"}, "gen_state_machine": {:hex, :gen_state_machine, "3.0.0", "1e57f86a494e5c6b14137ebef26a7eb342b3b0070c7135f2d6768ed3f6b6cdff", [:mix], [], "hexpm", "0a59652574bebceb7309f6b749d2a41b45fdeda8dbb4da0791e355dd19f0ed15"}, "gettext": {:hex, :gettext, "0.19.1", "564953fd21f29358e68b91634799d9d26989f8d039d7512622efb3c3b1c97892", [:mix], [], "hexpm", "10c656c0912b8299adba9b061c06947511e3f109ab0d18b44a866a4498e77222"}, + "grpc": {:hex, :grpc, "0.9.0", "1b930a57272d4356ea65969b984c2eb04f3dab81420e1e28f0e6ec04b8f88515", [:mix], [{:castore, "~> 0.1 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:cowboy, "~> 2.10", [hex: :cowboy, repo: "hexpm", optional: false]}, {:cowlib, "~> 2.12", [hex: :cowlib, repo: "hexpm", optional: false]}, {:gun, "~> 2.0", [hex: :gun, repo: "hexpm", optional: false]}, {:jason, ">= 0.0.0", [hex: :jason, repo: "hexpm", optional: true]}, {:mint, "~> 1.5", [hex: :mint, repo: "hexpm", optional: false]}, {:protobuf, "~> 0.11", [hex: :protobuf, repo: "hexpm", optional: false]}, {:telemetry, "~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "7c059698248738fcf7ad551f1d78f4a3d2e0642a72a5834f2a0b0db4b9f3d2b5"}, + "gun": {:hex, :gun, "2.0.1", "160a9a5394800fcba41bc7e6d421295cf9a7894c2252c0678244948e3336ad73", [:make, :rebar3], [{:cowlib, "2.12.1", [hex: :cowlib, repo: "hexpm", optional: false]}], "hexpm", "a10bc8d6096b9502205022334f719cc9a08d9adcfbfc0dbee9ef31b56274a20b"}, "hackney": {:hex, :hackney, "1.18.1", "f48bf88f521f2a229fc7bae88cf4f85adc9cd9bcf23b5dc8eb6a1788c662c4f6", [:rebar3], [{:certifi, "~>2.9.0", [hex: :certifi, repo: "hexpm", optional: false]}, {:idna, "~>6.1.0", [hex: :idna, repo: "hexpm", optional: false]}, {:metrics, "~>1.0.0", [hex: :metrics, repo: "hexpm", optional: false]}, {:mimerl, "~>1.1", [hex: :mimerl, repo: "hexpm", optional: false]}, {:parse_trans, "3.3.1", [hex: :parse_trans, repo: "hexpm", optional: false]}, {:ssl_verify_fun, "~>1.1.0", [hex: :ssl_verify_fun, repo: "hexpm", optional: false]}, {:unicode_util_compat, "~>0.7.0", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm", "a4ecdaff44297e9b5894ae499e9a070ea1888c84afdd1fd9b7b2bc384950128e"}, "hex2bin": {:hex, :hex2bin, "1.0.0", "aac26eab998ae80eacee1c7607c629ab503ebf77a62b9242bae2b94d47dcb71e", [:rebar3], [], "hexpm", "e7012d1d9aadd26e680f0983d26fb8923707f05fac9688f19f530fa3795e716f"}, "hkdf_erlang": {:hex, :hkdf_erlang, "1.0.0", "0b681b428805eacf1286e02ece8617e843cdceae7adb5fad43c283c98088fbc2", [:rebar3], [], "hexpm", "d2091d9e0df97613fd149074929a94e3675047f28cca6b1626fe7be60f5d76ac"}, + "hpax": {:hex, :hpax, "1.0.2", "762df951b0c399ff67cc57c3995ec3cf46d696e41f0bba17da0518d94acd4aac", [:mix], [], "hexpm", "2f09b4c1074e0abd846747329eaa26d535be0eb3d189fa69d812bfb8bfefd32f"}, "httpoison": {:hex, :httpoison, "2.1.0", "655fd9a7b0b95ee3e9a3b535cf7ac8e08ef5229bab187fa86ac4208b122d934b", [:mix], [{:hackney, "~> 1.17", [hex: :hackney, repo: "hexpm", optional: false]}], "hexpm", "fc455cb4306b43827def4f57299b2d5ac8ac331cb23f517e734a4b78210a160c"}, "idna": {:hex, :idna, "6.1.1", "8a63070e9f7d0c62eb9d9fcb360a7de382448200fbbd1b106cc96d3d8099df8d", [:rebar3], [{:unicode_util_compat, "~>0.7.0", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm", "92376eb7894412ed19ac475e4a86f7b413c1b9fbb5bd16dccd57934157944cea"}, "jason": {:hex, :jason, "1.4.1", "af1504e35f629ddcdd6addb3513c3853991f694921b1b9368b0bd32beb9f1b63", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "fbb01ecdfd565b56261302f7e1fcc27c4fb8f32d56eab74db621fc154604a7a1"}, @@ -32,14 +36,19 @@ "metrics": {:hex, :metrics, "1.0.1", "25f094dea2cda98213cecc3aeff09e940299d950904393b2a29d191c346a8486", [:rebar3], [], "hexpm", "69b09adddc4f74a40716ae54d140f93beb0fb8978d8636eaded0c31b6f099f16"}, "mime": {:hex, :mime, "2.0.2", "0b9e1a4c840eafb68d820b0e2158ef5c49385d17fb36855ac6e7e087d4b1dcc5", [:mix], [], "hexpm", "e6a3f76b4c277739e36c2e21a2c640778ba4c3846189d5ab19f97f126df5f9b7"}, "mimerl": {:hex, :mimerl, "1.2.0", "67e2d3f571088d5cfd3e550c383094b47159f3eee8ffa08e64106cdf5e981be3", [:rebar3], [], "hexpm", "f278585650aa581986264638ebf698f8bb19df297f66ad91b18910dfc6e19323"}, + "mint": {:hex, :mint, "1.7.1", "113fdb2b2f3b59e47c7955971854641c61f378549d73e829e1768de90fc1abf1", [:mix], [{:castore, "~> 0.1.0 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:hpax, "~> 0.1.1 or ~> 0.2.0 or ~> 1.0", [hex: :hpax, repo: "hexpm", optional: false]}], "hexpm", "fceba0a4d0f24301ddee3024ae116df1c3f4bb7a563a731f45fdfeb9d39a231b"}, "neotoma": {:git, "https://github.com/seancribbs/neotoma.git", "9e57d8ebd4ebb02c3e2428b08f3a01e2ff834ce2", []}, + "nimble_options": {:hex, :nimble_options, "1.1.1", "e3a492d54d85fc3fd7c5baf411d9d2852922f66e69476317787a7b2bb000a61b", [:mix], [], "hexpm", "821b2470ca9442c4b6984882fe9bb0389371b8ddec4d45a9504f00a66f650b44"}, "nimble_parsec": {:hex, :nimble_parsec, "1.4.0", "51f9b613ea62cfa97b25ccc2c1b4216e81df970acd8e16e8d1bdc58fef21370d", [:mix], [], "hexpm", "9c565862810fb383e9838c1dd2d7d2c437b3d13b267414ba6af33e50d2d1cf28"}, + "nimble_pool": {:hex, :nimble_pool, "1.1.0", "bf9c29fbdcba3564a8b800d1eeb5a3c58f36e1e11d7b7fb2e084a643f645f06b", [:mix], [], "hexpm", "af2e4e6b34197db81f7aad230c1118eac993acc0dae6bc83bac0126d4ae0813a"}, "parse_trans": {:hex, :parse_trans, "3.3.1", "16328ab840cc09919bd10dab29e431da3af9e9e7e7e6f0089dd5a2d2820011d8", [:rebar3], [], "hexpm", "07cd9577885f56362d414e8c4c4e6bdf10d43a8767abb92d24cbe8b24c54888b"}, "plug": {:hex, :plug, "1.14.0", "ba4f558468f69cbd9f6b356d25443d0b796fbdc887e03fa89001384a9cac638f", [:mix], [{:mime, "~> 1.0 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:plug_crypto, "~> 1.1.1 or ~> 1.2", [hex: :plug_crypto, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4.3 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "bf020432c7d4feb7b3af16a0c2701455cbbbb95e5b6866132cb09eb0c29adc14"}, "plug_cowboy": {:hex, :plug_cowboy, "2.5.2", "62894ccd601cf9597e2c23911ff12798a8a18d237e9739f58a6b04e4988899fe", [:mix], [{:cowboy, "~> 2.7", [hex: :cowboy, repo: "hexpm", optional: false]}, {:cowboy_telemetry, "~> 0.3", [hex: :cowboy_telemetry, repo: "hexpm", optional: false]}, {:plug, "~> 1.7", [hex: :plug, repo: "hexpm", optional: false]}], "hexpm", "ea6e87f774c8608d60c8d34022a7d073bd7680a0a013f049fc62bf35efea1044"}, "plug_crypto": {:hex, :plug_crypto, "1.2.2", "05654514ac717ff3a1843204b424477d9e60c143406aa94daf2274fdd280794d", [:mix], [], "hexpm", "87631c7ad914a5a445f0a3809f99b079113ae4ed4b867348dd9eec288cecb6db"}, "poison": {:hex, :poison, "5.0.0", "d2b54589ab4157bbb82ec2050757779bfed724463a544b6e20d79855a9e43b24", [:mix], [{:decimal, "~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "11dc6117c501b80c62a7594f941d043982a1bd05a1184280c0d9166eb4d8d3fc"}, + "protobuf": {:hex, :protobuf, "0.14.0", "75b64b8c1f0c833b5e76cd841d3448f077f655c2a2eed53358651fbfe4a6b70e", [:mix], [{:jason, "~> 1.2", [hex: :jason, repo: "hexpm", optional: true]}], "hexpm", "0f747eaa54ace9617536f1cb4b2a4962bc7e43f1aa475c6fa9c60079955c4cb0"}, "ranch": {:hex, :ranch, "2.1.0", "2261f9ed9574dcfcc444106b9f6da155e6e540b2f82ba3d42b339b93673b72a3", [:make, :rebar3], [], "hexpm", "244ee3fa2a6175270d8e1fc59024fd9dbc76294a321057de8f803b1479e76916"}, + "req": {:hex, :req, "0.5.1", "90584216d064389a4ff2d4279fe2c11ff6c812ab00fa01a9fb9d15457f65ba70", [:mix], [{:brotli, "~> 0.3.1", [hex: :brotli, repo: "hexpm", optional: true]}, {:ezstd, "~> 1.0", [hex: :ezstd, repo: "hexpm", optional: true]}, {:finch, "~> 0.17", [hex: :finch, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}, {:mime, "~> 1.6 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:nimble_csv, "~> 1.0", [hex: :nimble_csv, repo: "hexpm", optional: true]}, {:plug, "~> 1.0", [hex: :plug, repo: "hexpm", optional: true]}], "hexpm", "7ea96a1a95388eb0fefa92d89466cdfedba24032794e5c1147d78ec90db7edca"}, "rustler": {:git, "https://github.com/polvorin/rustler.git", "f6ead5802b3b8bf38b3c5fcaf0a0dbdf0e66a221", [branch: "fix_local_crate", sparse: "rustler_mix"]}, "rustler_precompiled": {:hex, :rustler_precompiled, "0.7.1", "ecadf02cc59a0eccbaed6c1937303a5827fbcf60010c541595e6d3747d3d0f9f", [:mix], [{:castore, "~> 0.1 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: false]}, {:rustler, "~> 0.23", [hex: :rustler, repo: "hexpm", optional: true]}], "hexpm", "b9e4657b99a1483ea31502e1d58c464bedebe9028808eda45c3a429af4550c66"}, "sched_ex": {:hex, :sched_ex, "1.1.4", "893de8ffcb1590ae6089d9862d49447fbb948535ba5777c231e55c8404bc3e6e", [:mix], [{:crontab, "~> 1.1.2", [hex: :crontab, repo: "hexpm", optional: false]}, {:timex, "~> 3.1", [hex: :timex, repo: "hexpm", optional: false]}], "hexpm", "f32336c40a62aba581c57d6d6009e40e5c52011bb8305fc3a33ef9e5815b861c"}, diff --git a/implementations/elixir/ockam/ockam_cloud_node/test/identity b/implementations/elixir/ockam/ockam_cloud_node/test/identity new file mode 100644 index 00000000000..b9589c23af4 Binary files /dev/null and b/implementations/elixir/ockam/ockam_cloud_node/test/identity differ diff --git a/implementations/elixir/ockam/ockam_cloud_node/test/identity.id b/implementations/elixir/ockam/ockam_cloud_node/test/identity.id new file mode 100644 index 00000000000..38f9e336d8f --- /dev/null +++ b/implementations/elixir/ockam/ockam_cloud_node/test/identity.id @@ -0,0 +1 @@ +I12dea6479e2ececa711851c371f7bc9c16a162cd34cfe096386a76222c736f69 \ No newline at end of file diff --git a/implementations/elixir/ockam/ockam_cloud_node/test/identity.secret b/implementations/elixir/ockam/ockam_cloud_node/test/identity.secret new file mode 100644 index 00000000000..5343f9ee97b --- /dev/null +++ b/implementations/elixir/ockam/ockam_cloud_node/test/identity.secret @@ -0,0 +1,2 @@ +˜±#!RCD_Ouõ¢­¨l„€‚<5ç;ɧ. +ªÚÅ \ No newline at end of file diff --git a/implementations/elixir/ockam/ockam_cloud_node/test/start_node.exs b/implementations/elixir/ockam/ockam_cloud_node/test/start_node.exs new file mode 100644 index 00000000000..dad29cb4a5d --- /dev/null +++ b/implementations/elixir/ockam/ockam_cloud_node/test/start_node.exs @@ -0,0 +1,56 @@ + +defmodule Start do + + def get_api_identity() do + case File.read(identity_path()) do + {:ok, bytes} -> + with {:ok, secret_bytes} <- File.read(secret_signing_key_path()), + {:ok, identity, _identifier} <- Ockam.Identity.import(bytes, secret_bytes) do + {:ok, identity} + end + + {:error, :enoent} -> + with {_pub, secret} <- :crypto.generate_key(:eddsa, :ed25519), + {:ok, identity} <- Ockam.Identity.create(secret), + :ok <- File.mkdir_p(Path.dirname(identity_id_path())), + :ok <- + File.write( + identity_id_path(), + Ockam.Identity.Identifier.to_str(Ockam.Identity.get_identifier(identity)) + ), + :ok <- File.write(identity_path(), Ockam.Identity.get_data(identity)), + :ok <- File.write(secret_signing_key_path(), secret) do + {:ok, identity} + end + end + end + + defp identity_id_path() do + Path.join(Application.fetch_env!(:ockam_cloud_node, :storage_path), "identity.id") + end + + defp identity_path() do + Path.join(Application.fetch_env!(:ockam_cloud_node, :storage_path), "identity") + end + + defp secret_signing_key_path() do + Path.join(Application.fetch_env!(:ockam_cloud_node, :storage_path), "identity.secret") + end + + def start_node() do + with {:ok, own_identity} <- get_api_identity(), + {:ok, keypair} <- Ockam.SecureChannel.Crypto.generate_dh_keypair(), + {:ok, attestation} <- Ockam.Identity.attest_purpose_key(own_identity, keypair) do + Ockam.Services.start_service( + :secure_channel, + [ + identity: own_identity, + address: "api", + encryption_options: [static_keypair: keypair, static_key_attestation: attestation] + ] + ) + end + end +end + +Start.start_node() diff --git a/implementations/elixir/ockam/ockam_metrics/lib/application.ex b/implementations/elixir/ockam/ockam_metrics/lib/application.ex index 6925c5b936c..b17d63a7eaf 100644 --- a/implementations/elixir/ockam/ockam_metrics/lib/application.ex +++ b/implementations/elixir/ockam/ockam_metrics/lib/application.ex @@ -17,7 +17,11 @@ defmodule Ockam.Metrics.Application do else [] end ++ + if Application.get_env(:ockam_metrics, :start_telemetry_poller, true) do telemetry_poller() + else + [] + end Supervisor.start_link(children, strategy: :one_for_one, name: __MODULE__) end diff --git a/implementations/elixir/ockam/ockam_services/lib/services/grpc_forwarder.ex b/implementations/elixir/ockam/ockam_services/lib/services/grpc_forwarder.ex new file mode 100644 index 00000000000..076be9e2370 --- /dev/null +++ b/implementations/elixir/ockam/ockam_services/lib/services/grpc_forwarder.ex @@ -0,0 +1,67 @@ +defmodule Ockam.Services.GrpcForwarder do + @moduledoc false + + use Ockam.Worker + + alias Ockam.Message + alias Ockam.Worker + + require Logger + + + defmodule GrpcRequest do + @moduledoc """ + gRPC request, minicbor schema definition + """ + use TypedStruct + + typedstruct do + plugin(Ockam.TypedCBOR.Plugin) + field(:method, String.t(), minicbor: [key: 1]) + field(:path, String.t(), minicbor: [key: 2]) + field(:version, :http09 | :http10 | :http11 | :http2 | :http3, minicbor: [key: 3, schema: {:enum, [http09: 0, http10: 1, http11: 2, http2: 3, http3: 4]}]) + field(:headers, list(list(String.t())), minicbor: [key: 4]) + field(:body, :binary, minicbor: [key: 5]) + end + end + + @impl true + def setup(options, state) do + log_level = Keyword.get(options, :log_level, :debug) + grpc_endpoint = Keyword.get(options, :grpc_endpoint, "http://localhost:4317") + {:ok, channel} = GRPC.Stub.connect(grpc_endpoint) + Logger.debug("Starting a grpc forwarder to: #{grpc_endpoint}") + + state = state + |> Map.put(:log_level, log_level) + |> Map.put(:grpc_endpoint, grpc_endpoint) + |> Map.put(:channel, channel) + {:ok, state} + end + + @impl true + def handle_message(message, state) do + {:ok, decoded_request} = Ockam.API.Request.decode(message.payload) + {:ok, grpc_request, _} = GrpcRequest.decode(decoded_request.body) + + channel = Map.get(state, :channel) + headers = GRPC.Transport.HTTP2.client_headers_without_reserved(%{ + channel: channel, + codec: channel.codec, + compressor: GRPC.Compressor.Gzip, + accepted_compressors: nil, + headers: grpc_request.headers |> Enum.map(fn [a, b] -> {a, b} end) + }) + :gun.post(channel.adapter_payload.conn_pid, grpc_request.path, headers, grpc_request.body) + + reply = Message.reply(message, state.address, <<>>) + Worker.route(reply, state) + + {:ok, state} + end + + @impl true + def handle_info(_, state) do + {:noreply, state} + end +end diff --git a/implementations/elixir/ockam/ockam_services/lib/services/provider/routing.ex b/implementations/elixir/ockam/ockam_services/lib/services/provider/routing.ex index d30c25cc985..fff331ca14e 100644 --- a/implementations/elixir/ockam/ockam_services/lib/services/provider/routing.ex +++ b/implementations/elixir/ockam/ockam_services/lib/services/provider/routing.ex @@ -8,6 +8,7 @@ defmodule Ockam.Services.Provider.Routing do alias Ockam.Services.Echo, as: EchoService alias Ockam.Services.Forwarding, as: ForwardingService + alias Ockam.Services.GrpcForwarder alias Ockam.Services.PubSub, as: PubSubService alias Ockam.Services.Relay.StaticForwarding alias Ockam.Services.Relay.StaticForwardingAPI @@ -20,7 +21,8 @@ defmodule Ockam.Services.Provider.Routing do :static_forwarding, :static_forwarding_api, :pub_sub, - :tracing + :tracing, + :grpc_forwarder ] @impl true @@ -65,4 +67,10 @@ defmodule Ockam.Services.Provider.Routing do def child_spec(:tracing, args) do {TracingService, Keyword.merge([address: "tracing"], args)} end + + @impl true + def child_spec(:grpc_forwarder, args) do + {GrpcForwarder, Keyword.merge([address: "grpc_forwarder"], args)} + end + end diff --git a/implementations/elixir/ockam/ockam_services/lib/services/proxy.ex b/implementations/elixir/ockam/ockam_services/lib/services/proxy.ex index d5f1bb53b12..7a508aeb06a 100644 --- a/implementations/elixir/ockam/ockam_services/lib/services/proxy.ex +++ b/implementations/elixir/ockam/ockam_services/lib/services/proxy.ex @@ -31,6 +31,7 @@ defmodule Ockam.Services.Proxy do @impl true def inner_setup(options, state) do forward_route_option = Keyword.fetch!(options, :forward_route) + supports_ockam_protocol = Keyword.get(options, :supports_ockam_protocol, true) with {:ok, [first_address | route_tail]} <- forward_route_config(forward_route_option) do case TCPAddress.is_tcp_address(first_address) do @@ -44,7 +45,8 @@ defmodule Ockam.Services.Proxy do RecoverableClient.start_link( destination: first_address, address: client_address, - authorization: client_auth + authorization: client_auth, + supports_ockam_protocol: supports_ockam_protocol ) do forward_route = [client_address | route_tail] diff --git a/implementations/elixir/ockam/ockam_services/mix.exs b/implementations/elixir/ockam/ockam_services/mix.exs index d5d5c2084a9..a35867eea59 100644 --- a/implementations/elixir/ockam/ockam_services/mix.exs +++ b/implementations/elixir/ockam/ockam_services/mix.exs @@ -50,6 +50,8 @@ defmodule Ockam.Services.MixProject do {:credo, "~> 1.6", only: [:dev, :test], runtime: false}, {:dialyxir, "~> 1.1", only: [:dev], runtime: false}, {:ex_doc, "~> 0.25", only: :dev, runtime: false}, + {:grpc, "~> 0.9.0"}, + {:gun, "~> 2.0"}, {:ockam, path: "../ockam"}, {:ockam_metrics, path: "../ockam_metrics"}, {:ockam_abac, path: "../ockam_abac"}, diff --git a/implementations/rust/ockam/ockam_api/src/authority_node/authority.rs b/implementations/rust/ockam/ockam_api/src/authority_node/authority.rs index fd13d13dd84..57615efc080 100644 --- a/implementations/rust/ockam/ockam_api/src/authority_node/authority.rs +++ b/implementations/rust/ockam/ockam_api/src/authority_node/authority.rs @@ -13,7 +13,7 @@ use crate::authenticator::{ }; use crate::authority_node::Configuration; use crate::echoer::Echoer; -use crate::logs::HttpForwarder; +use crate::logs::GrpcForwarder; use crate::nodes::service::default_address::DefaultAddress; use crate::ApiError; use ockam::identity::utils::now; @@ -311,15 +311,15 @@ impl Authority { ctx.start_worker(address, Echoer) } - /// Start an http forwarder service - pub async fn start_http_forwarder( + /// Start a gRPC forwarder service + pub async fn start_grpc_forwarder( &self, ctx: &Context, secure_channel_flow_control_id: &FlowControlId, configuration: &Configuration, ) -> Result<()> { if let Some(telemetry_endpoint_url) = &configuration.telemetry_endpoint_url { - let address = DefaultAddress::HTTP_FORWARDER; + let address = DefaultAddress::GRPC_FORWARDER; ctx.flow_controls() .add_consumer(&address.into(), secure_channel_flow_control_id); @@ -328,10 +328,10 @@ impl Authority { let uri = url .parse::() .map_err(|e| Error::new(Origin::Ockam, Kind::Invalid, e))?; - debug!("start an http forwarder at '{uri}'"); + debug!("start a grpc forwarder at '{uri}'"); ctx.start_worker( address, - HttpForwarder::new(uri).await.map_err(ApiError::core)?, + GrpcForwarder::new(uri).await.map_err(ApiError::core)?, )? }; Ok(()) diff --git a/implementations/rust/ockam/ockam_api/src/authority_node/node.rs b/implementations/rust/ockam/ockam_api/src/authority_node/node.rs index 8baca7115b7..a7c0b66a2fe 100644 --- a/implementations/rust/ockam/ockam_api/src/authority_node/node.rs +++ b/implementations/rust/ockam/ockam_api/src/authority_node/node.rs @@ -36,11 +36,11 @@ pub async fn start_node( authority.start_echo_service(ctx, &secure_channel_flow_control_id)?; debug!("echo service started"); - // start the http forwarder for telemetry traces + // start the grpc forwarder for telemetry traces authority - .start_http_forwarder(ctx, &secure_channel_flow_control_id, configuration) + .start_grpc_forwarder(ctx, &secure_channel_flow_control_id, configuration) .await?; - debug!("http forwarder started"); + debug!("grpc forwarder started"); info!("authority node started"); diff --git a/implementations/rust/ockam/ockam_api/src/logs/env_variables.rs b/implementations/rust/ockam/ockam_api/src/logs/env_variables.rs index b02067b75d5..74a716af2ad 100644 --- a/implementations/rust/ockam/ockam_api/src/logs/env_variables.rs +++ b/implementations/rust/ockam/ockam_api/src/logs/env_variables.rs @@ -34,9 +34,24 @@ pub(crate) const OCKAM_TELEMETRY_EXPORT: &str = "OCKAM_TELEMETRY_EXPORT"; /// Deprecated, use OCKAM_TELEMETRY_EXPORT instead pub(crate) const OCKAM_OPENTELEMETRY_EXPORT: &str = "OCKAM_OPENTELEMETRY_EXPORT"; -/// Decides if spans and log records should be exported via the project exporter portal. Accepted values, see BooleanVar. For example; true, false, 1, 0 +/// Decides if spans and log records should be exported via a secure channel to the project node. Accepted values, see BooleanVar. For example; true, false, 1, 0 pub(crate) const OCKAM_TELEMETRY_EXPORT_VIA_PROJECT: &str = "OCKAM_TELEMETRY_EXPORT_VIA_PROJECT"; +/// Decides if spans and log records should be exported via a secure channel to the authority node. Accepted values, see BooleanVar. For example; true, false, 1, 0 +pub(crate) const OCKAM_TELEMETRY_EXPORT_VIA_AUTHORITY: &str = + "OCKAM_TELEMETRY_EXPORT_VIA_AUTHORITY"; + +/// Route to a node accepting telemetry data, over a secure channel. Accepted values, see MultiAddr. For example: /dnsaddr/localhost/tcp/30002/service/a4da84b7-af6f-4ba6-bf58-709f2b5f0153/service/api +pub(crate) const OCKAM_TELEMETRY_EXPORT_NODE_ROUTE: &str = "OCKAM_TELEMETRY_EXPORT_NODE_ROUTE"; + +/// Identifier of a node accepting telemetry data, over a secure channel. Accepted values, see Identifier. For example: Ief435842446fe86b7880c08d4187073711ec810136880d61cd04a9aa08e74eef +pub(crate) const OCKAM_TELEMETRY_EXPORT_NODE_IDENTIFIER: &str = + "OCKAM_TELEMETRY_EXPORT_NODE_IDENTIFIER"; + +/// Name of the service forwarding telemetry data on a node accepting telemetry data, over a secure channel. For example: grpc_forwarder +pub(crate) const OCKAM_TELEMETRY_EXPORT_NODE_FORWARDER_SERVICE: &str = + "OCKAM_TELEMETRY_EXPORT_NODE_FORWARDER_SERVICE"; + /// Boolean set to true if the current user is an Ockam developer pub const OCKAM_DEVELOPER: &str = "OCKAM_DEVELOPER"; diff --git a/implementations/rust/ockam/ockam_api/src/logs/exporting_configuration.rs b/implementations/rust/ockam/ockam_api/src/logs/exporting_configuration.rs index 437a6386593..4d622d12e48 100644 --- a/implementations/rust/ockam/ockam_api/src/logs/exporting_configuration.rs +++ b/implementations/rust/ockam/ockam_api/src/logs/exporting_configuration.rs @@ -3,11 +3,11 @@ use crate::config::UrlVar; use crate::logs::default_values::*; use crate::logs::env_variables::*; use crate::logs::ExportingEnabled; -use crate::orchestrator::project::Project; -use crate::Result; use crate::{ApiError, CliState, TransportRouteResolver}; -use ockam::identity::{get_default_timeout, SecureClient, TrustIdentifierPolicy}; +use crate::{DefaultAddress, Result}; +use ockam::identity::{get_default_timeout, Identifier, SecureClient, TrustIdentifierPolicy}; use ockam_core::env::{get_env_with_default, FromString}; +use ockam_multiaddr::MultiAddr; use ockam_node::Context; use ockam_transport_tcp::TcpTransport; use std::fmt::{Debug, Display, Formatter}; @@ -219,20 +219,23 @@ impl Display for ExportingConfiguration { /// This enum represents the 2 possible endpoints for exporting traces. Either via: /// /// - An HTTPS collector -/// - An HTTP forwarder on the project node +/// - An gRPC forwarder on a remote node, accessed via a secure channel /// #[derive(Clone)] pub enum TelemetryEndpoint { - ProjectEndpoint(SecureClient), + SecureChannelEndpoint(SecureClient, String), HttpsEndpoint(Url), } impl Display for TelemetryEndpoint { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { match self { - TelemetryEndpoint::ProjectEndpoint(client) => { - f.write_str(&client.secure_route().to_string()) - } + TelemetryEndpoint::SecureChannelEndpoint(client, forwarder_service_name) => f + .write_fmt(format_args!( + "{} => 0#{}", + &client.secure_route().to_string(), + forwarder_service_name + )), TelemetryEndpoint::HttpsEndpoint(url) => f.write_str(url.as_str()), } } @@ -263,6 +266,42 @@ pub fn is_exporting_via_project_set() -> Result { )?) } +/// Return true if traces and log records can be exported via a portal (when a project exists), +/// as decided by the OCKAM_TELEMETRY_EXPORT_VIA_PROJECT environment variable. +pub fn is_exporting_via_authority_set() -> Result { + Ok(get_env_with_default( + OCKAM_TELEMETRY_EXPORT_VIA_AUTHORITY, + false, + )?) +} + +/// Return the route to the node accepting telemetry data +/// as decided by the OCKAM_TELEMETRY_EXPORT_NODE_ROUTE environment variable. +pub fn telemetry_export_node_route() -> Result> { + Ok(get_env_with_default( + OCKAM_TELEMETRY_EXPORT_NODE_ROUTE, + None, + )?) +} + +/// Return the identifier to the node accepting telemetry data +/// as decided by the OCKAM_TELEMETRY_EXPORT_NODE_IDENTIFIER environment variable. +pub fn telemetry_export_node_identifier() -> Result> { + Ok(get_env_with_default( + OCKAM_TELEMETRY_EXPORT_NODE_IDENTIFIER, + None, + )?) +} + +/// Return the name of the service collecting telemetry data on a remote node +/// as decided by the OCKAM_TELEMETRY_EXPORT_NODE_FORWARDER_SERVICE environment variable. +pub fn telemetry_export_node_forwarder_service() -> Result { + Ok(get_env_with_default( + OCKAM_TELEMETRY_EXPORT_NODE_FORWARDER_SERVICE, + DefaultAddress::GRPC_FORWARDER.to_string(), + )?) +} + /// Return true to display messages during the setup of the export pub fn is_export_debug_set() -> Result { Ok(get_env_with_default( @@ -294,7 +333,7 @@ async fn exporting_enabled( } else { let endpoint_kind = match endpoint { TelemetryEndpoint::HttpsEndpoint(_) => "HTTPs telemetry collector endpoint", - TelemetryEndpoint::ProjectEndpoint(_) => "Project telemetry collector inlet", + TelemetryEndpoint::SecureChannelEndpoint(_, _) => "Node telemetry collector endpoint", }; print_debug(format!("Exporting telemetry events is disabled because the {} at {} cannot be reached after {}ms", endpoint_kind, endpoint, connection_check_timeout.as_millis())); print_debug("You can disable the export of telemetry events with: `export OCKAM_TELEMETRY_EXPORT=false` to avoid this connection check."); @@ -311,8 +350,8 @@ async fn is_endpoint_accessible( connection_check_timeout: Duration, ) -> bool { match endpoint { - TelemetryEndpoint::ProjectEndpoint(client) => { - is_project_accessible(client, ctx, &connection_check_timeout).await + TelemetryEndpoint::SecureChannelEndpoint(client, _) => { + is_node_accessible(client, ctx, &connection_check_timeout).await } TelemetryEndpoint::HttpsEndpoint(url) => { print_debug("check if the endpoint is accessible"); @@ -322,7 +361,7 @@ async fn is_endpoint_accessible( } /// Return true if the project node can be accessed with a secure channel -async fn is_project_accessible( +async fn is_node_accessible( secure_client: &SecureClient, ctx: &Context, connection_check_timeout: &Duration, @@ -405,46 +444,71 @@ async fn opentelemetry_endpoint( cli_state: &CliState, ctx: &Context, ) -> Result> { - if is_exporting_via_project_set()? { - if let Ok(project) = cli_state.projects().get_default_project().await { - if project.is_ready() { - let project_route = project.authority_multiaddr()?; - print_debug(format!( - "A default project exists. Telemetry data will be sent to {}", - project_route - )); - Ok(Some(TelemetryEndpoint::ProjectEndpoint( - make_authority_node_client(cli_state, ctx, project).await?, - ))) + let route_and_identifier = match ( + telemetry_export_node_route()?, + telemetry_export_node_identifier()?, + ) { + (Some(route), Some(identifier)) => Some((route, identifier)), + _ => { + if let Ok(project) = cli_state.projects().get_default_project().await { + if project.is_ready() { + let via_project = is_exporting_via_project_set()?; + let via_authority = is_exporting_via_authority_set()? && !via_project; + + if via_project { + print_debug("The project node is used as a telemetry endpoint"); + Some(( + project.project_multiaddr()?.clone(), + project.project_identifier().ok_or(ApiError::message( + "The default project must have an identifier", + ))?, + )) + } else if via_authority { + print_debug("The authority node is used as a telemetry endpoint"); + Some(( + project.authority_multiaddr()?.clone(), + project.authority_identifier().ok_or(ApiError::message( + "The default project authority must have an identifier", + ))?, + )) + } else { + print_debug( + "The default project is ready but export via the project node or the authority node is disabled. Getting the default HTTPs endpoint", + ); + None + } + } else { + print_debug( + "The default project is not ready. Getting the default HTTPs endpoint", + ); + None + } } else { - print_debug( - "The default project is not yet ready. Getting the default HTTPs endpoint instead", - ); - Ok(Some(get_https_endpoint()?)) + print_debug("There is no default project. Getting the default HTTPs endpoint"); + None } - } else { - print_debug( - "A default project does not exist. Getting the default HTTPs endpoint instead", - ); - Ok(Some(get_https_endpoint()?)) } + }; + + let endpoint = if let Some((route, identifier)) = route_and_identifier { + let client = make_secure_client(cli_state, ctx, route, identifier).await?; + TelemetryEndpoint::SecureChannelEndpoint(client, telemetry_export_node_forwarder_service()?) } else { - print_debug("A default project does not exist. Getting the default HTTPs endpoint"); - Ok(Some(get_https_endpoint()?)) - } + get_https_endpoint()? + }; + print_debug(format!("Exporting telemetry data to: {endpoint}")); + Ok(Some(endpoint)) } -async fn make_authority_node_client( +async fn make_secure_client( cli_state: &CliState, ctx: &Context, - project: Project, + route: MultiAddr, + identifier: Identifier, ) -> Result { - let authority_route = TransportRouteResolver::default() + let project_route = TransportRouteResolver::default() .allow_tcp() - .resolve(project.authority_multiaddr()?)?; - let authority_identifier = project.authority_identifier().ok_or(ApiError::message( - "The default project must have an authority identifier", - ))?; + .resolve(&route)?; let default_node = if let Ok(node) = cli_state.get_default_node().await { node } else { @@ -458,8 +522,8 @@ async fn make_authority_node_client( secure_channels, None, TcpTransport::create(ctx)?, - authority_route, - Arc::new(TrustIdentifierPolicy::new(authority_identifier)), + project_route, + Arc::new(TrustIdentifierPolicy::new(identifier)), &default_node.identifier(), get_default_timeout(), get_default_timeout(), diff --git a/implementations/rust/ockam/ockam_api/src/logs/http_forwarder.rs b/implementations/rust/ockam/ockam_api/src/logs/grpc_forwarder.rs similarity index 78% rename from implementations/rust/ockam/ockam_api/src/logs/http_forwarder.rs rename to implementations/rust/ockam/ockam_api/src/logs/grpc_forwarder.rs index cd6d84932bd..cb0be2a056f 100644 --- a/implementations/rust/ockam/ockam_api/src/logs/http_forwarder.rs +++ b/implementations/rust/ockam/ockam_api/src/logs/grpc_forwarder.rs @@ -1,8 +1,7 @@ -use crate::logs::secure_client_service::OckamRequest; +use crate::logs::secure_client_service::OckamGrpcRequest; use crate::{ApiError, Result}; use hyper::{http, Uri}; -use minicbor::Decoder; -use ockam_core::api::{Method, RequestHeader}; +use ockam_core::api::{Method, Request}; use ockam_core::errcode::{Kind, Origin}; use ockam_core::{async_trait, Routed, Worker}; use ockam_node::Context; @@ -11,15 +10,15 @@ use tonic::body::BoxBody; use tonic::client::GrpcService; use tonic::transport::Channel; -/// The HttpForwarder worker accepts http requests serialized as Ockam messages +/// The GrpcForwarder worker accepts gRPC requests serialized as Ockam messages /// and forwards them to an HTTP endpoint. /// /// Note that we don't wait for a response from the endpoint. -pub struct HttpForwarder { +pub struct GrpcForwarder { channel: Channel, } -impl HttpForwarder { +impl GrpcForwarder { /// Create a Channel for the given URI pub async fn new(uri: Uri) -> Result { let channel = Channel::builder(uri.clone()) @@ -58,20 +57,20 @@ impl HttpForwarder { } #[async_trait] -impl Worker for HttpForwarder { - type Message = Vec; +impl Worker for GrpcForwarder { + type Message = Request; type Context = Context; async fn handle_message( &mut self, _ctx: &mut Context, - message: Routed>, + message: Routed>, ) -> ockam_core::Result<()> { - let body = message.into_body()?; - let mut dec = Decoder::new(&body); - let header: RequestHeader = dec.decode()?; - if let (Some(Method::Post), "/") = (header.method(), header.path()) { - let ockam_request: OckamRequest = dec.decode()?; + let request = message.into_body()?; + let (header, body) = request.into_parts(); + if let (Some(Method::Post), "/", Some(ockam_request)) = + (header.method(), header.path(), body) + { // Every posted message must be forwarded let http_request = ockam_request.make_http_request().map_err(|e| { ockam_core::Error::new(Origin::Api, Kind::Serialization, format!("{e:?}")) diff --git a/implementations/rust/ockam/ockam_api/src/logs/mod.rs b/implementations/rust/ockam/ockam_api/src/logs/mod.rs index edd55ab3c28..c45f89dd7b0 100644 --- a/implementations/rust/ockam/ockam_api/src/logs/mod.rs +++ b/implementations/rust/ockam/ockam_api/src/logs/mod.rs @@ -13,7 +13,7 @@ mod current_span; mod default_values; pub mod env_variables; pub mod exporting_configuration; -mod http_forwarder; +mod grpc_forwarder; mod log_exporters; pub mod logging_configuration; mod logging_options; @@ -27,7 +27,7 @@ mod tracing_options; pub use current_span::*; pub use exporting_configuration::*; -pub use http_forwarder::*; +pub use grpc_forwarder::*; pub use log_exporters::*; pub use logging_configuration::*; pub use logging_options::*; diff --git a/implementations/rust/ockam/ockam_api/src/logs/ockam_tonic_logs_client.rs b/implementations/rust/ockam/ockam_api/src/logs/ockam_tonic_logs_client.rs index de79d986dca..9bd50c9f9c4 100644 --- a/implementations/rust/ockam/ockam_api/src/logs/ockam_tonic_logs_client.rs +++ b/implementations/rust/ockam/ockam_api/src/logs/ockam_tonic_logs_client.rs @@ -10,8 +10,8 @@ use tonic::metadata::{KeyAndValueRef, MetadataMap}; use tonic::{async_trait, codegen::CompressionEncoding, Request}; /// This struct does what most of the TonicLogsClient does as a LogExporter, except that -/// it uses a SecureClientService to send the gRCP requests serialized as http Request to -/// an HTTP forwarder service located in another Ockam node, via a secure channel. +/// it uses a SecureClientService to send the gRPC requests serialized as http Request to +/// a gRPC forwarder service located in another Ockam node, via a secure channel. /// /// Note that the original TonicLogsClient can also be parameterized with an Interceptor to: /// - Potentially drop some requests @@ -62,7 +62,7 @@ impl OckamTonicLogsClient { /// Implement the LogsExporter trait for OckamTonicLogsClient /// If the inner client is available, use it to export the logs as an ExportLogsServiceRequest -/// to the remote collector, via the secure channel and an HTTP forwarder service on the other node. +/// to the remote collector, via the secure channel and a gRPC forwarder service on the other node. #[async_trait] impl LogExporter for OckamTonicLogsClient { async fn export(&mut self, batch: LogBatch<'_>) -> LogResult<()> { @@ -124,7 +124,7 @@ mod tests { fn test_export_logs() { let runtime = Arc::new(Runtime::new().unwrap()); let port = random_port(); - start_node_with_http_forwarder_service(runtime.clone(), port); + start_node_with_grpc_forwarder_service(runtime.clone(), port); let (ctx, mut executor) = NodeBuilder::new() .with_logging(LOGGING) @@ -137,7 +137,7 @@ mod tests { let tcp_transport = TcpTransport::create(&ctx)?; let secure_client = make_secure_client(port, secure_channels, tcp_transport).await?; let project_service = - SecureClientService::new(secure_client, &ctx, DefaultAddress::HTTP_FORWARDER); + SecureClientService::new(secure_client, &ctx, DefaultAddress::GRPC_FORWARDER); let mut exporter = OckamTonicLogsClient::new(project_service, Default::default(), None); let record = LogRecord::default(); let scope = InstrumentationScope::default(); diff --git a/implementations/rust/ockam/ockam_api/src/logs/ockam_tonic_traces_client.rs b/implementations/rust/ockam/ockam_api/src/logs/ockam_tonic_traces_client.rs index cc1994cb384..80ace283fe3 100644 --- a/implementations/rust/ockam/ockam_api/src/logs/ockam_tonic_traces_client.rs +++ b/implementations/rust/ockam/ockam_api/src/logs/ockam_tonic_traces_client.rs @@ -12,8 +12,8 @@ use opentelemetry_proto::transform::trace::tonic::group_spans_by_resource_and_sc use tonic::metadata::{KeyAndValueRef, MetadataMap}; /// This struct does what most of the TonicTracesClient does as a SpanExporter, except that -/// it uses a SecureClientService to send the gRCP requests serialized as http Request to -/// an HTTP forwarder service located in another Ockam node, via a secure channel. +/// it uses a SecureClientService to send the gRPC requests serialized as http Request to +/// a gRPC forwarder service located in another Ockam node, via a secure channel. /// /// Note that the original TonicTracesClient can also be parameterized with an Interceptor to: /// - Potentially drop some requests @@ -64,7 +64,7 @@ impl OckamTonicTracesClient { /// Implement the SpanExporter trait for OckamTonicTracesClient /// If the inner client is available, use it to export the traces as an ExportTraceServiceRequest -/// to the remote collector, via the secure channel and an HTTP forwarder service on the other node. +/// to the remote collector, via the secure channel and a gRPC forwarder service on the other node. impl SpanExporter for OckamTonicTracesClient { fn export(&mut self, batch: Vec) -> BoxFuture<'static, ExportResult> { if let Some(ref mut client) = &mut self.inner { @@ -112,7 +112,7 @@ pub(crate) mod tests { use super::*; use crate::authority_node::tests::random_port; use crate::cli_state::{CliStateMode, UseAwsKms}; - use crate::logs::http_forwarder::HttpForwarder; + use crate::logs::grpc_forwarder::GrpcForwarder; use crate::{ApiError, CliState, DefaultAddress, Result}; use hyper::Uri; use ockam::identity::{ @@ -134,7 +134,7 @@ pub(crate) mod tests { fn test_export_spans() { let runtime = Arc::new(Runtime::new().unwrap()); let port = random_port(); - start_node_with_http_forwarder_service(runtime.clone(), port); + start_node_with_grpc_forwarder_service(runtime.clone(), port); let (ctx, mut executor) = NodeBuilder::new() .with_logging(LOGGING) @@ -147,7 +147,7 @@ pub(crate) mod tests { let tcp_transport = TcpTransport::create(&ctx)?; let secure_client = make_secure_client(port, secure_channels, tcp_transport).await?; let project_service = - SecureClientService::new(secure_client, &ctx, DefaultAddress::HTTP_FORWARDER); + SecureClientService::new(secure_client, &ctx, DefaultAddress::GRPC_FORWARDER); let mut exporter = OckamTonicTracesClient::new(project_service, Default::default(), None); exporter @@ -223,9 +223,9 @@ pub(crate) mod tests { )) } - /// Start a node with a HTTP forwarder service. + /// Start a node with a GRPC forwarder service. /// That service connects to a local OpenTelemetry collector. - pub(crate) fn start_node_with_http_forwarder_service(runtime: Arc, port: u16) { + pub(crate) fn start_node_with_grpc_forwarder_service(runtime: Arc, port: u16) { let runtime_clone = runtime.clone(); runtime.spawn(async move { let (ctx, _executor) = NodeBuilder::new() @@ -233,7 +233,7 @@ pub(crate) mod tests { .with_runtime(runtime_clone) .build(); - // start a TCP listener, and a secure channel listener and the http forwarder service. + // start a TCP listener, and a secure channel listener and the grpc forwarder service. let tcp_listener_options = start_tcp_listener(&ctx, port).await?; let secure_channel_listener = start_secure_channel_listener(&ctx, tcp_listener_options).await?; @@ -286,11 +286,11 @@ pub(crate) mod tests { ) -> Result<()> { let uri = Uri::from_static(endpoint); ctx.start_worker( - DefaultAddress::HTTP_FORWARDER, - HttpForwarder::new(uri).await?, + DefaultAddress::GRPC_FORWARDER, + GrpcForwarder::new(uri).await?, )?; ctx.flow_controls().add_consumer( - &Address::from_string(DefaultAddress::HTTP_FORWARDER), + &Address::from_string(DefaultAddress::GRPC_FORWARDER), secure_channel_listener.flow_control_id(), ); Ok(()) diff --git a/implementations/rust/ockam/ockam_api/src/logs/secure_client_service.rs b/implementations/rust/ockam/ockam_api/src/logs/secure_client_service.rs index 0e415c2edee..66d0fae7069 100644 --- a/implementations/rust/ockam/ockam_api/src/logs/secure_client_service.rs +++ b/implementations/rust/ockam/ockam_api/src/logs/secure_client_service.rs @@ -3,10 +3,10 @@ use crate::Result; use futures_core::future::BoxFuture; use http_body_util::{BodyExt, Full}; use hyper::{http, Method, Version}; -use minicbor::{Decode, Decoder, Encode}; +use minicbor::{CborLen, Decode, Encode}; use ockam::identity::SecureClient; use ockam_core::api::Request; -use ockam_core::{Decodable, Encodable, Encoded, TryClone}; +use ockam_core::{cbor_encode_preallocate, Decodable, Encodable, Encoded, Message, TryClone}; use ockam_node::Context; use std::str::FromStr; use std::task::Poll; @@ -90,7 +90,13 @@ impl SecureClientService { ) -> Result> { if let Some(ctx) = &self.ctx { let ockam_request_body = Self::make_ockam_request_body(request).await?; - let _ = self + + trace!( + "Sending a request to {} => 0#{}", + self.secure_client.secure_route(), + self.service_address + ); + let r = self .secure_client .tell( ctx, @@ -98,6 +104,9 @@ impl SecureClientService { Request::post("/").body(ockam_request_body), ) .await?; + if let Some(e) = r.error()? { + trace!("Sending a request - received an error {e}"); + } }; http::Response::builder() .body(BoxBody::default()) @@ -106,7 +115,7 @@ impl SecureClientService { /// In order to make an Ockam request we collect all the bytes from the http request body. /// We could improve this by chunking the original body into several Ockam messages if the original body is too big. - async fn make_ockam_request_body(request: http::Request) -> Result { + async fn make_ockam_request_body(request: http::Request) -> Result { let mut bytes: Vec = Vec::new(); let (head, mut body) = request.into_parts(); while let Some(frame) = body.frame().await { @@ -116,29 +125,34 @@ impl SecureClientService { } } } - Ok(OckamRequest::from(http::Request::from_parts(head, bytes))) + Ok(OckamGrpcRequest::from(http::Request::from_parts( + head, bytes, + ))) } } -/// This struct represent an http Request sent as an Encodable Ockam message +/// This struct represent an gRPC Request sent as an Encodable Ockam message /// /// The from and to_http_request methods are used to convert between the two. /// -#[derive(Debug, Clone, Encode, Decode, PartialEq, Eq)] -pub struct OckamRequest { - #[n(0)] - method: String, +#[derive(Debug, Clone, Encode, Decode, CborLen, Message, PartialEq, Eq)] +#[cbor(map)] +pub struct OckamGrpcRequest { #[n(1)] - uri: String, + method: String, #[n(2)] - version: HttpVersion, + uri: String, #[n(3)] - headers: Vec<(String, String)>, + version: HttpVersion, #[n(4)] + headers: Vec<(String, String)>, + #[cbor(with = "minicbor::bytes")] + #[n(5)] body: Vec, } -#[derive(Debug, Clone, PartialEq, Eq, Encode, Decode)] +#[derive(Debug, Clone, PartialEq, Eq, Encode, Decode, CborLen)] +#[cbor(index_only)] enum HttpVersion { #[n(0)] Http09, @@ -152,7 +166,7 @@ enum HttpVersion { Http3, } -impl From>> for OckamRequest { +impl From>> for OckamGrpcRequest { fn from(req: http::Request>) -> Self { Self { method: req.method().to_string(), @@ -175,7 +189,7 @@ impl From>> for OckamRequest { } } -impl OckamRequest { +impl OckamGrpcRequest { pub fn make_http_request(self) -> Result> { let mut req = http::Request::builder(); req = req.method(Method::from_str(&self.method).map_err(ApiError::message)?); @@ -198,16 +212,15 @@ impl OckamRequest { } } -impl Decodable for OckamRequest { - fn decode(bytes: &[u8]) -> ockam_core::Result { - let mut decoder = Decoder::new(bytes); - Ok(decoder.decode()?) +impl Encodable for OckamGrpcRequest { + fn encode(self) -> ockam_core::Result { + cbor_encode_preallocate(self) } } -impl Encodable for OckamRequest { - fn encode(self) -> ockam_core::Result { - Ok(minicbor::to_vec(self)?) +impl Decodable for OckamGrpcRequest { + fn decode(e: &[u8]) -> ockam_core::Result { + Ok(minicbor::decode(e)?) } } @@ -219,9 +232,9 @@ mod tests { #[test] fn test_make_ockam_request_body() { let request = make_http_request(); - let ockam_request_body = OckamRequest::from(request); + let ockam_request_body = OckamGrpcRequest::from(request); assert_eq!( - ::decode( + ::decode( minicbor::to_vec(ockam_request_body.clone()) .unwrap() .as_slice() diff --git a/implementations/rust/ockam/ockam_api/src/logs/setup.rs b/implementations/rust/ockam/ockam_api/src/logs/setup.rs index 4288953486a..1bbb855e6ed 100644 --- a/implementations/rust/ockam/ockam_api/src/logs/setup.rs +++ b/implementations/rust/ockam/ockam_api/src/logs/setup.rs @@ -8,7 +8,6 @@ use crate::logs::{ TelemetryEndpoint, }; use crate::logs::{LogFormat, OckamSpanExporter}; -use crate::DefaultAddress; use gethostname::gethostname; use ockam_node::Context; use opentelemetry::trace::TracerProvider; @@ -202,9 +201,9 @@ fn create_log_exporter( let log_export_timeout = exporting_configuration.log_export_timeout(); match exporting_configuration.opentelemetry_endpoint() { - TelemetryEndpoint::ProjectEndpoint(client) => { + TelemetryEndpoint::SecureChannelEndpoint(client, forwarder_service_name) => { opentelemetry_otlp::LogExporter::new(OckamTonicLogsClient::new( - SecureClientService::new(client, ctx, DefaultAddress::HTTP_FORWARDER), + SecureClientService::new(client, ctx, &forwarder_service_name), get_otlp_headers(), Some(CompressionEncoding::Gzip), )) @@ -230,9 +229,9 @@ fn create_span_exporter( ) -> opentelemetry_otlp::SpanExporter { let trace_export_timeout = exporting_configuration.span_export_timeout(); match exporting_configuration.opentelemetry_endpoint() { - TelemetryEndpoint::ProjectEndpoint(client) => { + TelemetryEndpoint::SecureChannelEndpoint(client, forwarder_service_name) => { opentelemetry_otlp::SpanExporter::new(OckamTonicTracesClient::new( - SecureClientService::new(client, ctx, DefaultAddress::HTTP_FORWARDER), + SecureClientService::new(client, ctx, &forwarder_service_name), get_otlp_headers(), Some(CompressionEncoding::Gzip), )) diff --git a/implementations/rust/ockam/ockam_api/src/logs/span_exporters.rs b/implementations/rust/ockam/ockam_api/src/logs/span_exporters.rs index b8bbfa9182d..48e720a7f7f 100644 --- a/implementations/rust/ockam/ockam_api/src/logs/span_exporters.rs +++ b/implementations/rust/ockam/ockam_api/src/logs/span_exporters.rs @@ -14,6 +14,7 @@ pub struct DecoratedSpanExporter { #[async_trait] impl SpanExporter for DecoratedSpanExporter { fn export(&mut self, batch: Vec) -> BoxFuture<'static, ExportResult> { + debug!("exporting {} spans", batch.len()); self.exporter.export(batch) } diff --git a/implementations/rust/ockam/ockam_api/src/nodes/service/default_address.rs b/implementations/rust/ockam/ockam_api/src/nodes/service/default_address.rs index b88a7b65113..330007ec24e 100644 --- a/implementations/rust/ockam/ockam_api/src/nodes/service/default_address.rs +++ b/implementations/rust/ockam/ockam_api/src/nodes/service/default_address.rs @@ -23,7 +23,7 @@ impl DefaultAddress { pub const KAFKA_INLET: &'static str = "kafka_inlet"; pub const LEASE_MANAGER: &'static str = "lease_manager"; pub const CONTROL_API: &'static str = "control_api"; - pub const HTTP_FORWARDER: &'static str = "http_forwarder"; + pub const GRPC_FORWARDER: &'static str = "grpc_forwarder"; pub fn get_rendezvous_server_address() -> Address { let server_address = std::env::var("OCKAM_RENDEZVOUS_SERVER") @@ -48,7 +48,7 @@ impl DefaultAddress { | Self::KAFKA_OUTLET | Self::LEASE_MANAGER | Self::CONTROL_API - | Self::HTTP_FORWARDER) + | Self::GRPC_FORWARDER) } pub fn iter() -> impl Iterator { @@ -70,7 +70,7 @@ impl DefaultAddress { Self::KAFKA_OUTLET, Self::LEASE_MANAGER, Self::CONTROL_API, - Self::HTTP_FORWARDER, + Self::GRPC_FORWARDER, ] .iter() .copied() diff --git a/implementations/rust/ockam/ockam_command/src/environment/static/env_info.txt b/implementations/rust/ockam/ockam_command/src/environment/static/env_info.txt index 38b2703e0a5..bd57a952818 100644 --- a/implementations/rust/ockam/ockam_command/src/environment/static/env_info.txt +++ b/implementations/rust/ockam/ockam_command/src/environment/static/env_info.txt @@ -66,6 +66,7 @@ Devs Usage - OCKAM_DEVELOPER: a `boolean` specifying if the current user is an Ockam developer (for more accurate metrics). - OCKAM_OPENTELEMETRY_EXPORT_DEBUG: a `boolean` specifying if debug messages must be printed to the console when the OpenTelemetry export is configured. - OCKAM_TELEMETRY_EXPORT_VIA_PROJECT: a `boolean` specifying if traces must be exported via a secure channel to the project node (when it exists) +- OCKAM_TELEMETRY_EXPORT_VIA_AUTHORITY: a `boolean` specifying if traces must be exported via a secure channel to the authority node (when it exists). The `OCKAM_TELEMETRY_EXPORT_VIA_PROJECT` value takes precedence. - OCKAM_DEFAULT_TIMEOUT: a `Duration` used to timeout secure channels creation and API requests. Default value: `120s`. Internal (to enable some special behavior in the logic) diff --git a/implementations/rust/ockam/ockam_command/src/node/create.rs b/implementations/rust/ockam/ockam_command/src/node/create.rs index 4c5f5af83c0..bfc89fee2d3 100644 --- a/implementations/rust/ockam/ockam_command/src/node/create.rs +++ b/implementations/rust/ockam/ockam_command/src/node/create.rs @@ -11,6 +11,7 @@ use async_trait::async_trait; use clap::Args; use colorful::Colorful; use miette::{miette, IntoDiagnostic, WrapErr}; +use ockam_api::cli_state::DEFAULT_NODE_NAME; use ockam_api::colors::{color_error, color_primary}; use ockam_api::nodes::models::transport::Port; use ockam_api::terminal::notification::NotificationHandler; diff --git a/implementations/rust/ockam/ockam_command/src/node/create/foreground.rs b/implementations/rust/ockam/ockam_command/src/node/create/foreground.rs index e16be8d3364..d7436b82e61 100644 --- a/implementations/rust/ockam/ockam_command/src/node/create/foreground.rs +++ b/implementations/rust/ockam/ockam_command/src/node/create/foreground.rs @@ -1,4 +1,3 @@ -use crate::node::create::DEFAULT_NODE_NAME; use crate::node::node_callback::NodeCallback; use crate::node::CreateCommand; use crate::service::config::ControlApiNodeResolution; @@ -9,7 +8,7 @@ use miette::IntoDiagnostic; use ockam::tcp::{TcpListenerOptions, TcpTransport}; use ockam::udp::{UdpBindArguments, UdpBindOptions, UdpTransport}; use ockam::{Address, Context}; -use ockam_api::cli_state::random_name; +use ockam_api::cli_state::{random_name, DEFAULT_NODE_NAME}; use ockam_api::colors::color_primary; use ockam_api::control_api::frontend::NodeResolution; use ockam_api::fmt_log; diff --git a/implementations/rust/ockam/ockam_core/src/api.rs b/implementations/rust/ockam/ockam_core/src/api.rs index f6cf96b9b23..8d8b9031844 100644 --- a/implementations/rust/ockam/ockam_core/src/api.rs +++ b/implementations/rust/ockam/ockam_core/src/api.rs @@ -142,6 +142,19 @@ impl Reply { } } + /// Return an error message if any. + #[track_caller] + pub fn error(&self) -> Result> { + match self { + Reply::Successful(_) => Ok(None), + Reply::Failed(e, _) => Ok(Some( + e.message() + .unwrap_or("no message defined for this error") + .to_string(), + )), + } + } + #[cfg(feature = "std")] #[track_caller] pub fn miette_success(self, request_kind: &str) -> Result { diff --git a/implementations/rust/ockam/ockam_identity/src/secure_channels/secure_client.rs b/implementations/rust/ockam/ockam_identity/src/secure_channels/secure_client.rs index 0c8161338c7..8ff20eda5fa 100644 --- a/implementations/rust/ockam/ockam_identity/src/secure_channels/secure_client.rs +++ b/implementations/rust/ockam/ockam_identity/src/secure_channels/secure_client.rs @@ -169,7 +169,7 @@ impl SecureClient { response.to_reply() } - /// Send a request of type T and don't expect a reply + /// Send a request of type T and don't expect a reply body /// See `ask` for more information pub async fn tell( &self, diff --git a/implementations/rust/telemetry.md b/implementations/rust/telemetry.md index 7b715b38af1..56199cea7b5 100644 --- a/implementations/rust/telemetry.md +++ b/implementations/rust/telemetry.md @@ -9,9 +9,11 @@ collector serves as a central point of collection and can forward this data to a # Configuration -There are two ways to send telemetry data to the collector: +There are various ways to send telemetry data to the collector: 1. Via a secure channel to your project and then to the collector (the default). +1. Via a secure channel to your project's authority node and then to the collector. +1. Via a secure channel to an arbitrary Ockam node and then to the collector (the default). 2. Directly via HTTP. ## Sending telemetry data directly via a secure channel to the project @@ -25,6 +27,17 @@ Then, the telemetry data is sent as Ockam messages to the authority node of the In order for the forwarding to work, the authority node be configured with the `OCKAM_OPENTELEMETRY_ENDPOINT` set to the collector endpoint. +## Sending telemetry data directly via a secure channel to the authority node + +This behaviour can be disabled by setting the `OCKAM_TELEMETRY_EXPORT_VIA_PROJECT` environment variable to `false`. +In that case, if `OCKAM_OPENTELEMETRY_EXPORT` is set to true, the telemetry data will be sent directly via HTTP. + +This mode is only active if a default project can be detected locally and is accessible via a secure channel. +Then, the telemetry data is sent as Ockam messages to the authority node of the project and forwarded to the collector. + +In order for the forwarding to work, the authority node be configured with the `OCKAM_OPENTELEMETRY_ENDPOINT` set to +the collector endpoint. + ## Sending telemetry data directly via HTTP In order to do this you need to set the following environment variables: