Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Strict type Dependabot::Terraform::RegistryClient #10643

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 36 additions & 12 deletions terraform/lib/dependabot/terraform/registry_client.rb
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# typed: true
# typed: strict
# frozen_string_literal: true

require "dependabot/dependency"
Expand All @@ -12,29 +12,36 @@ module Terraform
# Terraform::RegistryClient is a basic API client to interact with a
# terraform registry: https://www.terraform.io/docs/registry/api.html
class RegistryClient
ARCHIVE_EXTENSIONS = %w(.zip .tbz2 .tgz .txz).freeze
extend T::Sig

ARCHIVE_EXTENSIONS = T.let(%w(.zip .tbz2 .tgz .txz).freeze, T::Array[String])
PUBLIC_HOSTNAME = "registry.terraform.io"

sig { params(hostname: String, credentials: T::Array[Dependabot::Credential]).void }
def initialize(hostname: PUBLIC_HOSTNAME, credentials: [])
@hostname = hostname
@tokens = credentials.each_with_object({}) do |item, memo|
memo[item["host"]] = item["token"] if item["type"] == "terraform_registry"
end
@tokens = T.let(
credentials.each_with_object({}) do |item, memo|
memo[item["host"]] = item["token"] if item["type"] == "terraform_registry"
end,
T::Hash[String, String]
)
end

# rubocop:disable Metrics/PerceivedComplexity
# See https://www.terraform.io/docs/modules/sources.html#http-urls for
# details of how Terraform handle HTTP(S) sources for modules
# rubocop:disable Metrics/AbcSize
# rubocop:disable Metrics/CyclomaticComplexity
# See https://www.terraform.io/docs/modules/sources.html#http-urls for
# details of how Terraform handle HTTP(S) sources for modules
sig { params(raw_source: String).returns(T.nilable(String)) }
def self.get_proxied_source(raw_source)
return raw_source unless raw_source.start_with?("http")

uri = URI.parse(raw_source.split(%r{(?<!:)//}).first)
uri = URI.parse(T.must(raw_source.split(%r{(?<!:)//}).first))
return raw_source if ARCHIVE_EXTENSIONS.any? { |ext| uri.path&.end_with?(ext) }
return raw_source if URI.parse(raw_source).query&.include?("archive=")

url = raw_source.split(%r{(?<!:)//}).first + "?terraform-get=1"
url = T.must(raw_source.split(%r{(?<!:)//}).first) + "?terraform-get=1"
host = URI.parse(raw_source).host

response = Dependabot::RegistryClient.get(url: url)
Expand Down Expand Up @@ -62,6 +69,7 @@ def self.get_proxied_source(raw_source)
# "hashicorp/aws"
# @return [Array<Dependabot::Terraform::Version>]
# @raise [Dependabot::DependabotError] when the versions cannot be retrieved
sig { params(identifier: String).returns(T::Array[Dependabot::Terraform::Version]) }
def all_provider_versions(identifier:)
base_url = service_url_for("providers.v1")
response = http_get!(URI.join(base_url, "#{identifier}/versions"))
Expand All @@ -80,6 +88,7 @@ def all_provider_versions(identifier:)
# "hashicorp/consul/aws"
# @return [Array<Dependabot::Terraform::Version>]
# @raise [Dependabot::DependabotError] when the versions cannot be retrieved
sig { params(identifier: String).returns(T::Array[Dependabot::Terraform::Version]) }
def all_module_versions(identifier:)
base_url = service_url_for("modules.v1")
response = http_get!(URI.join(base_url, "#{identifier}/versions"))
Expand All @@ -97,8 +106,9 @@ def all_module_versions(identifier:)
# @param dependency [Dependabot::Dependency] the dependency who's source
# we're attempting to find
# @return [nil, Dependabot::Source]
sig { params(dependency: Dependabot::Dependency).returns(T.nilable(Dependabot::Source)) }
def source(dependency:)
type = dependency.requirements.first[:source][:type]
type = T.must(dependency.requirements.first)[:source][:type]
base_url = service_url_for(service_key_for(type))
case type
# https://www.terraform.io/internals/module-registry-protocol#download-source-code-for-a-specific-module-version
Expand Down Expand Up @@ -130,6 +140,7 @@ def source(dependency:)
# @param service_key [String] the service type described in https://www.terraform.io/docs/internals/remote-service-discovery.html#supported-services
# @param return String
# @raise [Dependabot::PrivateSourceAuthenticationFailure] when the service is not available
sig { params(service_key: String).returns(String) }
def service_url_for(service_key)
url_for(services.fetch(service_key))
rescue KeyError
Expand All @@ -138,26 +149,35 @@ def service_url_for(service_key)

private

sig { returns(String) }
attr_reader :hostname

sig { returns(T::Hash[String, String]) }
attr_reader :tokens

sig { returns(T.class_of(Dependabot::Terraform::Version)) }
def version_class
Version
end

sig { params(hostname: String).returns(T::Hash[String, String]) }
def headers_for(hostname)
token = tokens[hostname]
token ? { "Authorization" => "Bearer #{token}" } : {}
end

sig { returns(T::Hash[String, String]) }
def services
@services ||=
@services ||= T.let(
begin
response = http_get(url_for("/.well-known/terraform.json"))
response.status == 200 ? JSON.parse(response.body) : {}
end
end,
T.nilable(T::Hash[String, String])
)
end

sig { params(type: String).returns(String) }
def service_key_for(type)
case type
when "module", "modules", "registry"
Expand All @@ -169,13 +189,15 @@ def service_key_for(type)
end
end

sig { params(url: T.any(String, URI::Generic)).returns(Excon::Response) }
def http_get(url)
Dependabot::RegistryClient.get(
url: url.to_s,
headers: headers_for(hostname)
)
end

sig { params(url: URI::Generic).returns(Excon::Response) }
def http_get!(url)
response = http_get(url)

Expand All @@ -185,6 +207,7 @@ def http_get!(url)
response
end

sig { params(path: String).returns(String) }
def url_for(path)
uri = URI.parse(path)
return uri.to_s if uri.scheme == "https"
Expand All @@ -195,6 +218,7 @@ def url_for(path)
uri.to_s
end

sig { params(message: String).returns(Dependabot::DependabotError) }
def error(message)
Dependabot::DependabotError.new(message)
end
Expand Down
Loading