diff --git a/flake.nix b/flake.nix
index 200417c3e7b..78cb248d731 100644
--- a/flake.nix
+++ b/flake.nix
@@ -88,7 +88,8 @@
             (aws-sdk-cpp.override {
               apis = ["s3" "transfer"];
               customMemoryManagement = false;
-            });
+            })
+            ++ lib.optional (stdenv.isLinux || stdenv.isDarwin) ipfs;
 
         propagatedDeps =
           [ (boehmgc.override { enableLargeConfig = true; })
diff --git a/src/libstore/content-address.cc b/src/libstore/content-address.cc
index 90a3ad1f570..6f55e69f71c 100644
--- a/src/libstore/content-address.cc
+++ b/src/libstore/content-address.cc
@@ -1,3 +1,5 @@
+#include <nlohmann/json.hpp>
+
 #include "args.hh"
 #include "content-address.hh"
 #include "split.hh"
@@ -134,6 +136,64 @@ std::string renderContentAddress(std::optional<ContentAddress> ca)
     return ca ? renderContentAddress(*ca) : "";
 }
 
+void to_json(nlohmann::json& j, const ContentAddress & ca) {
+    j = std::visit(overloaded {
+        [](TextHash th) {
+            return nlohmann::json {
+                { "type", "text" },
+                { "hash", th.hash.to_string(Base32, false) },
+            };
+        },
+        [](FixedOutputHash foh) {
+            return nlohmann::json {
+                { "type", "fixed" },
+                { "method", foh.method == FileIngestionMethod::Flat ? "flat" : "recursive" },
+                { "algo", printHashType(foh.hash.type) },
+                { "hash", foh.hash.to_string(Base32, false) },
+            };
+        }
+    }, ca);
+}
+
+void from_json(const nlohmann::json& j, ContentAddress & ca) {
+    std::string_view type = j.at("type").get<std::string_view>();
+    if (type == "text") {
+        ca = TextHash {
+            .hash = Hash::parseNonSRIUnprefixed(j.at("hash").get<std::string_view>(), htSHA256),
+        };
+    } else if (type == "fixed") {
+        std::string_view methodRaw = j.at("method").get<std::string_view>();
+        auto method = methodRaw == "flat" ? FileIngestionMethod::Flat
+            : methodRaw == "recursive" ? FileIngestionMethod::Recursive
+            : throw Error("invalid file ingestion method: %s", methodRaw);
+        auto hashAlgo = parseHashType(j.at("algo").get<std::string>());
+        ca = FixedOutputHash {
+            .method = method,
+            .hash = Hash::parseNonSRIUnprefixed(j.at("hash").get<std::string_view>(), hashAlgo),
+        };
+    } else
+        throw Error("invalid type: %s", type);
+}
+
+// Needed until https://github.com/nlohmann/json/pull/2117
+
+void to_json(nlohmann::json& j, const std::optional<ContentAddress> & c) {
+    if (!c)
+        j = nullptr;
+    else
+        to_json(j, *c);
+}
+
+void from_json(const nlohmann::json& j, std::optional<ContentAddress> & c) {
+    if (j.is_null()) {
+        c = std::nullopt;
+    } else {
+        // Dummy value to set the tag bit.
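+        // (from_json assigns through *c, and dereferencing a disengaged
+        // std::optional is undefined, so the optional must be engaged first)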
+        c = TextHash { .hash = Hash { htSHA256 } };
+        from_json(j, *c);
+    }
+}
+
 Hash getContentAddressHash(const ContentAddress & ca)
 {
     return std::visit(overloaded {
diff --git a/src/libstore/content-address.hh b/src/libstore/content-address.hh
index f6a6f514004..6337d8d9e98 100644
--- a/src/libstore/content-address.hh
+++ b/src/libstore/content-address.hh
@@ -1,6 +1,8 @@
 #pragma once
 
+#include <nlohmann/json.hpp>
 #include <variant>
+
 #include "hash.hh"
 
 namespace nix {
@@ -53,6 +55,13 @@ ContentAddress parseContentAddress(std::string_view rawCa);
 
 std::optional<ContentAddress> parseContentAddressOpt(std::string_view rawCaOpt);
 
+void to_json(nlohmann::json& j, const ContentAddress & c);
+void from_json(const nlohmann::json& j, ContentAddress & c);
+
+// Needed until https://github.com/nlohmann/json/pull/2117
+
+void to_json(nlohmann::json& j, const std::optional<ContentAddress> & c);
+void from_json(const nlohmann::json& j, std::optional<ContentAddress> & c);
+
 Hash getContentAddressHash(const ContentAddress & ca);
 
 /*
diff --git a/src/libstore/daemon.cc b/src/libstore/daemon.cc
index 99d8add9233..0e74e9ae434 100644
--- a/src/libstore/daemon.cc
+++ b/src/libstore/daemon.cc
@@ -859,6 +859,14 @@ static void performOp(TunnelLogger * logger, ref<Store> store,
         break;
     }
 
+    case wopSync: {
+        logger->startWork();
+        store->sync();
+        logger->stopWork();
+        to << 1;
+        break;
+    }
+
     default:
         throw Error("invalid operation %1%", op);
     }
diff --git a/src/libstore/filetransfer.cc b/src/libstore/filetransfer.cc
index c2c65af05a4..17b223fad9e 100644
--- a/src/libstore/filetransfer.cc
+++ b/src/libstore/filetransfer.cc
@@ -33,6 +33,8 @@ FileTransferSettings fileTransferSettings;
 
 static GlobalConfig::Register rFileTransferSettings(&fileTransferSettings);
 
+MakeError(URLEncodeError, Error);
+
 std::string resolveUri(const std::string & uri)
 {
     if (uri.compare(0, 8, "channel:") == 0)
@@ -310,12 +312,22 @@ struct curlFileTransfer : public FileTransfer
 
         if (request.head)
             curl_easy_setopt(req, CURLOPT_NOBODY, 1);
+        else if (request.post)
+            curl_easy_setopt(req, CURLOPT_POST, 1);
 
         if (request.data) {
-            curl_easy_setopt(req, CURLOPT_UPLOAD, 1L);
-            curl_easy_setopt(req, CURLOPT_READFUNCTION, readCallbackWrapper);
-            curl_easy_setopt(req, CURLOPT_READDATA, this);
-            curl_easy_setopt(req, CURLOPT_INFILESIZE_LARGE, (curl_off_t) request.data->length());
+            if (request.post) {
+                // based off of https://curl.haxx.se/libcurl/c/postit2.html
+                curl_mime *form = curl_mime_init(req);
+                curl_mimepart *field = curl_mime_addpart(form);
+                curl_mime_data(field, request.data->data(), request.data->length());
+                curl_easy_setopt(req, CURLOPT_MIMEPOST, form);
+            } else {
+                curl_easy_setopt(req, CURLOPT_UPLOAD, 1L);
+                curl_easy_setopt(req, CURLOPT_READFUNCTION, readCallbackWrapper);
+                curl_easy_setopt(req, CURLOPT_READDATA, this);
+                curl_easy_setopt(req, CURLOPT_INFILESIZE_LARGE, (curl_off_t) request.data->length());
+            }
         }
 
         if (request.verifyTLS) {
@@ -332,6 +344,14 @@ struct curlFileTransfer : public FileTransfer
         curl_easy_setopt(req, CURLOPT_LOW_SPEED_LIMIT, 1L);
         curl_easy_setopt(req, CURLOPT_LOW_SPEED_TIME, fileTransferSettings.stalledDownloadTimeout.get());
 
+        /* FIXME: We hit a weird issue when 1 second goes by
+           without Expect: 100-continue. curl_multi_perform
+           appears to block indefinitely. To work around this, we
+           just set the timeout to a really big value unlikely to
+           be hit in any server without Expect: 100-continue. This
+           may specifically be a bug in the IPFS API. */
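+        // 300000 ms = 5 minutes; libcurl's default for this timeout is 1 second.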
+        curl_easy_setopt(req, CURLOPT_EXPECT_100_TIMEOUT_MS, 300000);
+
         /* If no file exist in the specified path, curl continues to work
            anyway as if netrc support was disabled. */
         curl_easy_setopt(req, CURLOPT_NETRC_FILE, settings.netrcFile.get().c_str());
@@ -711,6 +731,22 @@ struct curlFileTransfer : public FileTransfer
 
         enqueueItem(std::make_shared<TransferItem>(*this, request, std::move(callback)));
     }
+
+    std::string urlEncode(const std::string & param) override {
+        //TODO reuse curl handle or move function to another class/file
+        CURL *curl = curl_easy_init();
+        char *encoded = NULL;
+        if (curl) {
+            encoded = curl_easy_escape(curl, param.c_str(), 0);
+        }
+        if ((curl == NULL) || (encoded == NULL)) {
+            throw URLEncodeError("Could not encode param");
+        }
+        std::string ret(encoded);
+        curl_free(encoded);
+        curl_easy_cleanup(curl);
+        return ret;
+    }
 };
 
 ref<FileTransfer> getFileTransfer()
@@ -844,6 +880,10 @@ void FileTransfer::download(FileTransferRequest && request, Sink & sink)
     }
 }
 
+std::string FileTransfer::urlEncode(const std::string & param) {
+    throw URLEncodeError("not implemented");
+}
+
 template<typename... Args>
 FileTransferError::FileTransferError(FileTransfer::Error error, std::shared_ptr<string> response, const Args & ... args)
     : Error(args...), error(error), response(response)
diff --git a/src/libstore/filetransfer.hh b/src/libstore/filetransfer.hh
index c89c51a21ed..48f1b8fe9ea 100644
--- a/src/libstore/filetransfer.hh
+++ b/src/libstore/filetransfer.hh
@@ -55,6 +55,7 @@ struct FileTransferRequest
     std::string expectedETag;
     bool verifyTLS = true;
     bool head = false;
+    bool post = false;
     size_t tries = fileTransferSettings.tries;
     unsigned int baseRetryTimeMs = 250;
     ActivityId parentAct;
@@ -105,6 +106,8 @@ struct FileTransfer
        invoked on the thread of the caller. */
     void download(FileTransferRequest && request, Sink & sink);
 
+    virtual std::string urlEncode(const std::string & param);
+
     enum Error { NotFound, Forbidden, Misc, Transient, Interrupted };
 };
diff --git a/src/libstore/ipfs-binary-cache-store.cc b/src/libstore/ipfs-binary-cache-store.cc
new file mode 100644
index 00000000000..12048a9ea4c
--- /dev/null
+++ b/src/libstore/ipfs-binary-cache-store.cc
@@ -0,0 +1,655 @@
+#include <cstring>
+#include <nlohmann/json.hpp>
+
+#include "ipfs-binary-cache-store.hh"
+#include "filetransfer.hh"
+#include "nar-info-disk-cache.hh"
+#include "archive.hh"
+#include "compression.hh"
+#include "names.hh"
+#include "callback.hh"
+
+namespace nix {
+
+IPFSBinaryCacheStore::IPFSBinaryCacheStore(
+    const std::string & scheme, const std::string & uri, const Params & params)
+    : StoreConfig(params)
+    , Store(params)
+    , cacheScheme(scheme)
+    , cacheUri(uri)
+{
+    auto state(_state.lock());
+
+    if (secretKeyFile != "")
+        secretKey = std::unique_ptr<SecretKey>(new SecretKey(readFile(secretKeyFile)));
+
+    StringSink sink;
+    sink << narVersionMagic1;
+    narMagic = *sink.s;
+
+    if (cacheUri.back() == '/')
+        cacheUri.pop_back();
+
+    if (cacheScheme == "ipfs") {
+        initialIpfsPath = "/ipfs/" + cacheUri;
+        state->ipfsPath = initialIpfsPath;
+        allowModify = get(params, "allow-modify").value_or("") == "true";
+    } else if (cacheScheme == "ipns") {
+        ipnsPath = "/ipns/" + cacheUri;
+
+        // TODO: we should try to determine if we are able to modify
+        // this ipns
+        allowModify = true;
+    } else
+        throw Error("unknown IPNS URI '%s'", getUri());
+
+    std::string ipfsAPIHost(get(params, "host").value_or("127.0.0.1"));
+    std::string ipfsAPIPort(get(params, "port").value_or("5001"));
+    daemonUri = "http://" + ipfsAPIHost + ":" + ipfsAPIPort;
+
+    // Check that the IPFS daemon is running
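+    // (since go-ipfs 0.5 the HTTP API only accepts POST requests,
+    // hence request.post = true below)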
+    FileTransferRequest request(daemonUri + "/api/v0/version");
+    request.post = true;
+    request.tries = 1;
+    auto res = getFileTransfer()->download(request);
+    auto versionInfo = nlohmann::json::parse(*res.data);
+    if (versionInfo.find("Version") == versionInfo.end())
+        throw Error("daemon for IPFS is not running properly");
+
+    if (compareVersions(versionInfo["Version"], "0.4.0") < 0)
+        throw Error("daemon for IPFS is %s, when a minimum of 0.4.0 is required", versionInfo["Version"]);
+
+    // Resolve the IPNS name to an IPFS object
+    if (ipnsPath) {
+        initialIpfsPath = resolveIPNSName(*ipnsPath);
+        state->ipfsPath = initialIpfsPath;
+    }
+
+    auto json = getIpfsDag(state->ipfsPath);
+
+    // Verify that the StoreDir is correct
+    if (json.find("StoreDir") == json.end()) {
+        json["StoreDir"] = storeDir;
+        state->ipfsPath = putIpfsDag(json);
+    } else if (json["StoreDir"] != storeDir)
+        throw Error("binary cache '%s' is for Nix stores with prefix '%s', not '%s'",
+            getUri(), json["StoreDir"], storeDir);
+
+    if (json.find("WantMassQuery") != json.end())
+        wantMassQuery.setDefault(json["WantMassQuery"] ? "true" : "false");
+
+    if (json.find("Priority") != json.end())
+        priority.setDefault(fmt("%d", json["Priority"]));
+}
+
+std::string IPFSBinaryCacheStore::putIpfsDag(nlohmann::json data)
+{
+    auto req = FileTransferRequest(daemonUri + "/api/v0/dag/put");
+    req.data = std::make_shared<std::string>(data.dump());
+    req.post = true;
+    req.tries = 1;
+    auto res = getFileTransfer()->upload(req);
+    auto json = nlohmann::json::parse(*res.data);
+    return "/ipfs/" + (std::string) json["Cid"]["/"];
+}
+
+nlohmann::json IPFSBinaryCacheStore::getIpfsDag(std::string objectPath)
+{
+    auto req = FileTransferRequest(daemonUri + "/api/v0/dag/get?arg=" + objectPath);
+    req.post = true;
+    req.tries = 1;
+    auto res = getFileTransfer()->download(req);
+    auto json = nlohmann::json::parse(*res.data);
+    return json;
+}
+
+// Given an ipns path, checks if it corresponds to a DNSLink path, and
+// if so returns the domain
+std::optional<std::string> IPFSBinaryCacheStore::isDNSLinkPath(std::string path)
+{
+    if (path.find("/ipns/") != 0)
+        throw Error("path '%s' is not an ipns path", path);
+    auto subpath = std::string(path, 6);
+    if (subpath.find(".") != std::string::npos) {
+        return subpath;
+    }
+    return std::nullopt;
+}
+
+bool IPFSBinaryCacheStore::ipfsObjectExists(const std::string ipfsPath)
+{
+    auto uri = daemonUri + "/api/v0/object/stat?arg=" + getFileTransfer()->urlEncode(ipfsPath);
+
+    FileTransferRequest request(uri);
+    request.post = true;
+    request.tries = 1;
+    try {
+        auto res = getFileTransfer()->download(request);
+        auto json = nlohmann::json::parse(*res.data);
+
+        return json.find("Hash") != json.end();
+    } catch (FileTransferError & e) {
+        // probably should verify this is a not found error but
+        // ipfs gives us a 500
+        return false;
+    }
+}
+
+// Resolve the IPNS name to an IPFS object
+std::string IPFSBinaryCacheStore::resolveIPNSName(std::string ipnsPath) {
+    debug("Resolving IPFS object of '%s', this could take a while.", ipnsPath);
+    auto uri = daemonUri + "/api/v0/name/resolve?arg=" + getFileTransfer()->urlEncode(ipnsPath);
+    FileTransferRequest request(uri);
+    request.post = true;
+    request.tries = 1;
+    auto res = getFileTransfer()->download(request);
+    auto json = nlohmann::json::parse(*res.data);
+    if (json.find("Path") == json.end())
+        throw Error("daemon for IPFS is not running properly");
+    return json["Path"];
+}
+
+Path IPFSBinaryCacheStore::formatPathAsProtocol(Path path) {
+    if (hasPrefix(path, "/ipfs/"))
+        return "ipfs://" + path.substr(strlen("/ipfs/"), string::npos);
+    else if (hasPrefix(path, "/ipns/"))
+        return "ipns://" + path.substr(strlen("/ipns/"), string::npos);
+    else return path;
+}
+
+// IPNS publish can be slow, we try to do it rarely.
+void IPFSBinaryCacheStore::sync()
+{
+    auto state(_state.lock());
+
+    if (state->ipfsPath == initialIpfsPath)
+        return;
+
+    // If we aren't in trustless mode (handled above) and we don't allow
+    // modifications, state->ipfsPath should never have been changed from
+    // the initial one.
+    assert(allowModify);
+
+    if (!ipnsPath) {
+        warn("created new store at '%s'. The old store at %s is immutable, so we can't update it",
+            "ipfs://" + std::string(state->ipfsPath, 6), getUri());
+        return;
+    }
+
+    auto resolvedIpfsPath = resolveIPNSName(*ipnsPath);
+    if (resolvedIpfsPath != initialIpfsPath) {
+        throw Error(
+            "The IPNS hash or DNS link %s resolves to something different from the value it had when Nix was started:\n"
+            "  expected: %s\n"
+            "  got: %s\n"
+            "\n"
+            "Perhaps something else updated it in the meantime?",
+            *ipnsPath, initialIpfsPath, resolvedIpfsPath);
+    }
+
+    if (resolvedIpfsPath == state->ipfsPath) {
+        printMsg(lvlInfo, "The hash is already up to date, nothing to do");
+        return;
+    }
+
+    // Now we know that the paths are not up to date, but also not changed
+    // due to updates in DNS or the IPNS hash.
+    auto optDomain = isDNSLinkPath(*ipnsPath);
+    if (optDomain) {
+        auto domain = *optDomain;
+        throw Error("The provided ipns path is a DNSLink, and syncing those is not supported.\n  Current DNSLink: %s\nYou should update your DNS settings.",
+            domain);
+    }
+
+    debug("Publishing '%s' to '%s', this could take a while.", state->ipfsPath, *ipnsPath);
+
+    auto uri = daemonUri + "/api/v0/name/publish?allow-offline=true";
+    uri += "&arg=" + getFileTransfer()->urlEncode(state->ipfsPath);
+
+    // Given the hash, we want to discover the corresponding name in the
+    // `ipfs key list` command, so that we publish to the right address in
+    // case the user has multiple ones available.
+
+    // NOTE: this is needed for ipfs < 0.5.0 because key must be a
+    // name, not an address.
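+    // A typical /api/v0/key/list response looks like:
+    //   {"Keys":[{"Name":"self","Id":"QmExamplePeerId"}]}
+    // (the Id value here is made up)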
+ + auto ipnsPathHash = std::string(*ipnsPath, 6); + debug("Getting the name corresponding to hash %s", ipnsPathHash); + + auto keyListRequest = FileTransferRequest(daemonUri + "/api/v0/key/list/"); + keyListRequest.post = true; + keyListRequest.tries = 1; + + auto keyListResponse = nlohmann::json::parse(*(getFileTransfer()->download(keyListRequest)).data); + + std::string keyName {""}; + for (auto & key : keyListResponse["Keys"]) + if (key["Id"] == ipnsPathHash) + keyName = key["Name"]; + if (keyName == "") { + throw Error("We couldn't find a name corresponding to the provided ipns hash:\n hash: %s", ipnsPathHash); + } + + // Now we can append the keyname to our original request + uri += "&key=" + keyName; + + auto req = FileTransferRequest(uri); + req.post = true; + req.tries = 1; + getFileTransfer()->download(req); +} + +void IPFSBinaryCacheStore::addLink(std::string name, std::string ipfsObject) +{ + auto state(_state.lock()); + + auto uri = daemonUri + "/api/v0/object/patch/add-link?create=true"; + uri += "&arg=" + getFileTransfer()->urlEncode(state->ipfsPath); + uri += "&arg=" + getFileTransfer()->urlEncode(name); + uri += "&arg=" + getFileTransfer()->urlEncode(ipfsObject); + + auto req = FileTransferRequest(uri); + req.post = true; + req.tries = 1; + auto res = getFileTransfer()->download(req); + auto json = nlohmann::json::parse(*res.data); + + state->ipfsPath = "/ipfs/" + (std::string) json["Hash"]; +} + +std::string IPFSBinaryCacheStore::addFile(const std::string & data) +{ + // TODO: use callbacks + + auto req = FileTransferRequest(daemonUri + "/api/v0/add"); + req.data = std::make_shared(data); + req.post = true; + req.tries = 1; + auto res = getFileTransfer()->upload(req); + auto json = nlohmann::json::parse(*res.data); + return (std::string) json["Hash"]; +} + +void IPFSBinaryCacheStore::upsertFile(const std::string & path, const std::string & data, const std::string & mimeType) +{ + try { + addLink(path, "/ipfs/" + addFile(data)); + } catch (FileTransferError & e) { + // TODO: may wrap the inner error in a better way. + throw UploadToIPFS("while uploading to IPFS binary cache at '%s': %s", getUri(), e.msg()); + } +} + +void IPFSBinaryCacheStore::getFile(const std::string & path, + Callback> callback) noexcept +{ + std::string path_(path); + if (hasPrefix(path, "ipfs://")) + path_ = "/ipfs/" + std::string(path, 7); + getIpfsObject(path_, std::move(callback)); +} + +void IPFSBinaryCacheStore::getFile(const std::string & path, Sink & sink) +{ + std::promise> promise; + getFile(path, + {[&](std::future> result) { + try { + promise.set_value(result.get()); + } catch (...) 
+                promise.set_exception(std::current_exception());
+            }
+        }});
+    auto data = promise.get_future().get();
+    sink((unsigned char *) data->data(), data->size());
+}
+
+std::shared_ptr<std::string> IPFSBinaryCacheStore::getFile(const std::string & path)
+{
+    StringSink sink;
+    try {
+        getFile(path, sink);
+    } catch (NoSuchBinaryCacheFile &) {
+        return nullptr;
+    }
+    return sink.s;
+}
+
+void IPFSBinaryCacheStore::getIpfsObject(const std::string & ipfsPath,
+    Callback<std::shared_ptr<std::string>> callback) noexcept
+{
+    auto uri = daemonUri + "/api/v0/cat?arg=" + getFileTransfer()->urlEncode(ipfsPath);
+
+    FileTransferRequest request(uri);
+    request.post = true;
+    request.tries = 1;
+
+    auto callbackPtr = std::make_shared<decltype(callback)>(std::move(callback));
+
+    getFileTransfer()->enqueueFileTransfer(request,
+        {[callbackPtr](std::future<FileTransferResult> result){
+            try {
+                (*callbackPtr)(result.get().data);
+            } catch (FileTransferError & e) {
+                return (*callbackPtr)(std::shared_ptr<std::string>());
+            } catch (...) {
+                callbackPtr->rethrow();
+            }
+        }}
+    );
+}
+
+void IPFSBinaryCacheStore::writeNarInfo(ref<NarInfo> narInfo)
+{
+    auto json = nlohmann::json::object();
+    json["narHash"] = narInfo->narHash.to_string(Base32, true);
+    json["narSize"] = narInfo->narSize;
+
+    auto narMap = getIpfsDag(getIpfsPath())["nar"];
+
+    json["references"] = nlohmann::json::object();
+    json["hasSelfReference"] = false;
+    for (auto & ref : narInfo->references) {
+        if (ref == narInfo->path)
+            json["hasSelfReference"] = true;
+        else
+            json["references"].emplace(ref.to_string(), narMap[(std::string) ref.to_string()]);
+    }
+
+    json["ca"] = narInfo->ca;
+
+    if (narInfo->deriver)
+        json["deriver"] = printStorePath(*narInfo->deriver);
+
+    json["registrationTime"] = narInfo->registrationTime;
+    json["ultimate"] = narInfo->ultimate;
+
+    json["sigs"] = nlohmann::json::array();
+    for (auto & sig : narInfo->sigs)
+        json["sigs"].push_back(sig);
+
+    if (!narInfo->url.empty()) {
+        json["ipfsCid"] = nlohmann::json::object();
+        json["ipfsCid"]["/"] = std::string(narInfo->url, 7);
+    }
+
+    if (narInfo->fileHash)
+        json["downloadHash"] = narInfo->fileHash->to_string(Base32, true);
+
+    json["downloadSize"] = narInfo->fileSize;
+    json["compression"] = narInfo->compression;
+    json["system"] = narInfo->system;
+
+    auto narObjectPath = putIpfsDag(json);
+
+    auto state(_state.lock());
+    json = getIpfsDag(state->ipfsPath);
+
+    if (json.find("nar") == json.end())
+        json["nar"] = nlohmann::json::object();
+
+    auto hashObject = nlohmann::json::object();
+    hashObject.emplace("/", std::string(narObjectPath, 6));
+
+    json["nar"].emplace(narInfo->path.to_string(), hashObject);
+
+    state->ipfsPath = putIpfsDag(json);
+
+    {
+        auto hashPart = narInfo->path.hashPart();
+        auto state_(this->state.lock());
+        state_->pathInfoCache.upsert(
+            std::string { hashPart },
+            PathInfoCacheValue { .value = std::shared_ptr<const ValidPathInfo>(narInfo) });
+    }
+}
+
+void IPFSBinaryCacheStore::addToStore(const ValidPathInfo & info, Source & narSource,
+    RepairFlag repair, CheckSigsFlag checkSigs)
+{
+    // FIXME: See if we can use the original source to reduce memory usage.
+    auto nar = make_ref<std::string>(narSource.drain());
+
+    if (!repair && isValidPath(info.path)) return;
+
+    if (!allowModify)
+        throw Error("can't update '%s'", getUri());
+
+    /* Verify that all references are valid. This may do some .narinfo
+       reads, but typically they'll already be cached. */
+    for (auto & ref : info.references)
+        try {
+            if (ref != info.path)
+                queryPathInfo(ref);
+        } catch (InvalidPath &) {
+            throw Error("cannot add '%s' to the binary cache because the reference '%s' is not valid",
+                printStorePath(info.path), printStorePath(ref));
+        }
+
+    assert(nar->compare(0, narMagic.size(), narMagic) == 0);
+
+    auto narInfo = make_ref<NarInfo>(info);
+
+    narInfo->narSize = nar->size();
+    narInfo->narHash = hashString(htSHA256, *nar);
+
+    if (info.narHash != narInfo->narHash)
+        throw Error("refusing to copy corrupted path '%1%' to binary cache", printStorePath(info.path));
+
+    /* Compress the NAR. */
+    narInfo->compression = compression;
+    auto now1 = std::chrono::steady_clock::now();
+    auto narCompressed = compress(compression, *nar, parallelCompression);
+    auto now2 = std::chrono::steady_clock::now();
+    narInfo->fileHash = hashString(htSHA256, *narCompressed);
+    narInfo->fileSize = narCompressed->size();
+
+    auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(now2 - now1).count();
+    printMsg(lvlTalkative, "copying path '%1%' (%2% bytes, compressed %3$.1f%% in %4% ms) to binary cache",
+        printStorePath(narInfo->path), narInfo->narSize,
+        ((1.0 - (double) narCompressed->size() / nar->size()) * 100.0),
+        duration);
+
+    /* Atomically write the NAR file. */
+    stats.narWrite++;
+    narInfo->url = "ipfs://" + addFile(*narCompressed);
+
+    stats.narWriteBytes += nar->size();
+    stats.narWriteCompressedBytes += narCompressed->size();
+    stats.narWriteCompressionTimeMs += duration;
+
+    /* Atomically write the NAR info file. */
+    if (secretKey) narInfo->sign(*this, *secretKey);
+
+    writeNarInfo(narInfo);
+
+    stats.narInfoWrite++;
+}
+
+bool IPFSBinaryCacheStore::isValidPathUncached(const StorePath & storePath)
+{
+    auto json = getIpfsDag(getIpfsPath());
+    if (!json.contains("nar"))
+        return false;
+    return json["nar"].contains(storePath.to_string());
+}
+
+void IPFSBinaryCacheStore::narFromPath(const StorePath & storePath, Sink & sink)
+{
+    auto info = queryPathInfo(storePath).cast<const NarInfo>();
+
+    uint64_t narSize = 0;
+
+    LambdaSink wrapperSink([&](const unsigned char * data, size_t len) {
+        sink(data, len);
+        narSize += len;
+    });
+
+    auto decompressor = makeDecompressionSink(info->compression, wrapperSink);
+
+    try {
+        getFile(info->url, *decompressor);
+    } catch (NoSuchBinaryCacheFile & e) {
+        throw SubstituteGone(e.what());
+    }
+
+    decompressor->finish();
+
+    stats.narRead++;
+    //stats.narReadCompressedBytes += nar->size(); // FIXME
+    stats.narReadBytes += narSize;
+}
+
+void IPFSBinaryCacheStore::queryPathInfoUncached(const StorePath & storePath,
+    Callback<std::shared_ptr<const ValidPathInfo>> callback) noexcept
+{
+    // TODO: properly use callbacks
+
+    auto callbackPtr = std::make_shared<decltype(callback)>(std::move(callback));
+
+    auto uri = getUri();
+    auto storePathS = printStorePath(storePath);
+    auto act = std::make_shared<Activity>(*logger, lvlTalkative, actQueryPathInfo,
+        fmt("querying info about '%s' on '%s'", storePathS, uri), Logger::Fields{storePathS, uri});
+    PushActivity pact(act->id);
+
+    auto json = getIpfsDag(getIpfsPath());
+
+    if (!json.contains("nar") || !json["nar"].contains(storePath.to_string()))
+        return (*callbackPtr)(nullptr);
+
+    auto narObjectHash = (std::string) json["nar"][(std::string) storePath.to_string()]["/"];
+    json = getIpfsDag("/ipfs/" + narObjectHash);
+
+    NarInfo narInfo {
+        StorePath { storePath },
+        Hash::parseAnyPrefixed(json.at("narHash").get<std::string>()),
+    };
+    narInfo.narSize = json["narSize"];
+
+    for (auto & ref : json["references"].items())
+        narInfo.references.insert(StorePath(ref.key()));
+
+    if (json["hasSelfReference"])
+        narInfo.references.insert(storePath);
+
+    if (json.find("ca") != json.end())
+        json["ca"].get_to(narInfo.ca);
+
+    if (json.find("deriver") != json.end())
+        narInfo.deriver = parseStorePath((std::string) json["deriver"]);
+
+    if (json.find("registrationTime") != json.end())
+        narInfo.registrationTime = json["registrationTime"];
+
+    if (json.find("ultimate") != json.end())
+        narInfo.ultimate = json["ultimate"];
+
+    if (json.find("sigs") != json.end())
+        for (auto & sig : json["sigs"])
+            narInfo.sigs.insert((std::string) sig);
+
+    if (json.find("ipfsCid") != json.end())
+        narInfo.url = "ipfs://" + json["ipfsCid"]["/"].get<std::string>();
+
+    if (json.find("downloadHash") != json.end())
+        narInfo.fileHash = Hash::parseAnyPrefixed((std::string) json["downloadHash"]);
+
+    if (json.find("downloadSize") != json.end())
+        narInfo.fileSize = json["downloadSize"];
+
+    if (json.find("compression") != json.end())
+        narInfo.compression = json["compression"];
+
+    if (json.find("system") != json.end())
+        narInfo.system = json["system"];
+
+    (*callbackPtr)((std::shared_ptr<ValidPathInfo>)
+        std::make_shared<NarInfo>(narInfo));
+}
+
+StorePath IPFSBinaryCacheStore::addToStore(const string & name, const Path & srcPath,
+    FileIngestionMethod method, HashType hashAlgo, PathFilter & filter, RepairFlag repair)
+{
+    // FIXME: some cut&paste from LocalStore::addToStore().
+
+    /* Read the whole path into memory. This is not a very scalable
+       method for very large paths, but `copyPath' is mainly used for
+       small files. */
+    StringSink sink;
+    Hash h { htSHA256 }; // dummy initial value
+    if (method == FileIngestionMethod::Recursive) {
+        dumpPath(srcPath, sink, filter);
+        h = hashString(hashAlgo, *sink.s);
+    } else {
+        auto s = readFile(srcPath);
+        dumpString(s, sink);
+        h = hashString(hashAlgo, s);
+    }
+
+    ValidPathInfo info {
+        makeFixedOutputPath(method, h, name),
+        Hash::dummy, // FIXME
+    };
+
+    auto source = StringSource { *sink.s };
+    addToStore(info, source, repair, CheckSigs);
+
+    return std::move(info.path);
+}
+
+StorePath IPFSBinaryCacheStore::addTextToStore(const string & name, const string & s,
+    const StorePathSet & references, RepairFlag repair)
+{
+    StringSink sink;
+    dumpString(s, sink);
+    auto narHash = hashString(htSHA256, *sink.s);
+
+    ValidPathInfo info {
+        computeStorePathForText(name, s, references),
+        std::move(narHash),
+    };
+    info.references = references;
+
+    if (repair || !isValidPath(info.path)) {
+        StringSink sink;
+        dumpString(s, sink);
+        auto source = StringSource { *sink.s };
+        addToStore(info, source, repair, CheckSigs);
+    }
+
+    return std::move(info.path);
+}
+
+void IPFSBinaryCacheStore::addSignatures(const StorePath & storePath, const StringSet & sigs)
+{
+    if (!allowModify)
+        throw Error("can't update '%s'", getUri());
+
+    /* Note: this is inherently racy since there is no locking on
+       binary caches. In particular, with S3 this is unreliable, even
+       when addSignatures() is called sequentially on a path, because
+       S3 might return an outdated cached version. */
+
+    auto narInfo = make_ref<NarInfo>((NarInfo &) *queryPathInfo(storePath));
+
+    narInfo->sigs.insert(sigs.begin(), sigs.end());
+
+    writeNarInfo(narInfo);
+}
+
+void IPFSBinaryCacheStore::addTempRoot(const StorePath & path)
+{
+    // TODO make temporary pin/addToStore, see
+    // https://github.com/ipfs/go-ipfs/issues/4559 and
+    // https://github.com/ipfs/go-ipfs/issues/4328 for some ideas.
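+    // Until then we approximate a temporary root with a regular pin,
+    // which stays in place until explicitly unpinned.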
+    auto uri = daemonUri + "/api/v0/pin/add?arg=" + getIpfsPath() + "/" "nar" "/" + string { path.to_string() };
+
+    FileTransferRequest request(uri);
+    request.post = true;
+    request.tries = 1;
+    getFileTransfer()->upload(request);
+}
+
+static RegisterStoreImplementation<IPFSBinaryCacheStore, IPFSBinaryCacheStoreConfig> regStore;
+
+}
diff --git a/src/libstore/ipfs-binary-cache-store.hh b/src/libstore/ipfs-binary-cache-store.hh
new file mode 100644
index 00000000000..ec589e8e998
--- /dev/null
+++ b/src/libstore/ipfs-binary-cache-store.hh
@@ -0,0 +1,147 @@
+#pragma once
+
+#include "binary-cache-store.hh"
+
+namespace nix {
+
+MakeError(UploadToIPFS, Error);
+
+struct IPFSBinaryCacheStoreConfig : virtual StoreConfig
+{
+    using StoreConfig::StoreConfig;
+
+    const Setting<std::string> compression{(StoreConfig *)this, "xz", "compression", "NAR compression method ('xz', 'bzip2', or 'none')"};
+    const Setting<Path> secretKeyFile{(StoreConfig *)this, "", "secret-key", "path to secret key used to sign the binary cache"};
+    const Setting<bool> parallelCompression{(StoreConfig *)this, false, "parallel-compression",
+        "enable multi-threading compression, available for xz only currently"};
+
+    // FIXME: merge with allowModify bool
+    const Setting<bool> _allowModify{(StoreConfig *)this, false, "allow-modify",
+        "allow Nix to update IPFS/IPNS address when appropriate"};
+
+    const std::string name() override { return "IPFS Store"; }
+};
+
+class IPFSBinaryCacheStore : public virtual Store, public virtual IPFSBinaryCacheStoreConfig
+{
+
+public:
+
+    bool allowModify;
+
+    std::unique_ptr<SecretKey> secretKey;
+    std::string narMagic;
+
+    std::string cacheScheme;
+    std::string cacheUri;
+    std::string daemonUri;
+
+    std::string getIpfsPath() {
+        auto state(_state.lock());
+        return state->ipfsPath;
+    }
+    std::string initialIpfsPath;
+    std::optional<std::string> ipnsPath;
+
+    struct State
+    {
+        std::string ipfsPath;
+    };
+    Sync<State> _state;
+
+public:
+
+    IPFSBinaryCacheStore(const std::string & scheme, const std::string & uri, const Params & params);
+
+    std::string getUri() override
+    {
+        return cacheScheme + "://" + cacheUri;
+    }
+
+    static std::set<std::string> uriSchemes()
+    { return {"ipfs", "ipns"}; }
+
+private:
+
+    std::string putIpfsDag(nlohmann::json data);
+
+    nlohmann::json getIpfsDag(std::string objectPath);
+
+    // Given an ipns path, checks if it corresponds to a DNSLink path, and
+    // if so returns the domain
+    static std::optional<std::string> isDNSLinkPath(std::string path);
+
+    bool ipfsObjectExists(const std::string ipfsPath);
+
+    bool fileExists(const std::string & path)
+    {
+        return ipfsObjectExists(getIpfsPath() + "/" + path);
+    }
+
+    // Resolve the IPNS name to an IPFS object
+    std::string resolveIPNSName(std::string ipnsPath);
+
+public:
+
+    Path formatPathAsProtocol(Path path);
+
+    // IPNS publish can be slow, we try to do it rarely.
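+    // Writes only mutate the in-memory ipfsPath; sync() publishes the final
+    // root to IPNS once, at the end of the operation.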
+    void sync() override;
+
+private:
+
+    void addLink(std::string name, std::string ipfsObject);
+
+    std::string addFile(const std::string & data);
+
+    void upsertFile(const std::string & path, const std::string & data, const std::string & mimeType);
+
+    void getFile(const std::string & path,
+        Callback<std::shared_ptr<std::string>> callback) noexcept;
+
+    void getFile(const std::string & path, Sink & sink);
+
+    std::shared_ptr<std::string> getFile(const std::string & path);
+
+    void getIpfsObject(const std::string & ipfsPath,
+        Callback<std::shared_ptr<std::string>> callback) noexcept;
+
+    void writeNarInfo(ref<NarInfo> narInfo);
+
+public:
+
+    void addToStore(const ValidPathInfo & info, Source & narSource,
+        RepairFlag repair, CheckSigsFlag checkSigs) override;
+
+    bool isValidPathUncached(const StorePath & storePath) override;
+
+    void narFromPath(const StorePath & storePath, Sink & sink) override;
+
+    void queryPathInfoUncached(const StorePath & storePath,
+        Callback<std::shared_ptr<const ValidPathInfo>> callback) noexcept override;
+
+    StorePath addToStore(const string & name, const Path & srcPath,
+        FileIngestionMethod method, HashType hashAlgo, PathFilter & filter, RepairFlag repair) override;
+
+    StorePath addTextToStore(const string & name, const string & s,
+        const StorePathSet & references, RepairFlag repair) override;
+
+    void addSignatures(const StorePath & storePath, const StringSet & sigs) override;
+
+    virtual void addTempRoot(const StorePath & path) override;
+
+    std::shared_ptr<std::string> getBuildLog(const StorePath & path) override
+    { unsupported("getBuildLog"); }
+
+    BuildResult buildDerivation(const StorePath & drvPath, const BasicDerivation & drv,
+        BuildMode buildMode) override
+    { unsupported("buildDerivation"); }
+
+    void ensurePath(const StorePath & path) override
+    { unsupported("ensurePath"); }
+
+    std::optional<StorePath> queryPathFromHashPart(const std::string & hashPart) override
+    { unsupported("queryPathFromHashPart"); }
+
+};
+
+}
diff --git a/src/libstore/remote-store.cc b/src/libstore/remote-store.cc
index 488270f4880..bf3d90d1273 100644
--- a/src/libstore/remote-store.cc
+++ b/src/libstore/remote-store.cc
@@ -770,6 +770,15 @@ void RemoteStore::queryMissing(const std::vector<StorePathWithOutputs> & targets
 }
 
 
+void RemoteStore::sync()
+{
+    auto conn(getConnection());
+    conn->to << wopSync;
+    conn.processStderr();
+    readInt(conn->from);
+}
+
+
 void RemoteStore::connect()
 {
     auto conn(getConnection());
diff --git a/src/libstore/remote-store.hh b/src/libstore/remote-store.hh
index 9f78fcb02d2..65a5f6dc051 100644
--- a/src/libstore/remote-store.hh
+++ b/src/libstore/remote-store.hh
@@ -112,6 +112,8 @@ public:
 
     unsigned int getProtocol() override;
 
+    void sync() override;
+
     void flushBadConnections();
 
     struct Connection
diff --git a/src/libstore/store-api.hh b/src/libstore/store-api.hh
index f77bc21d1fc..46d26c88641 100644
--- a/src/libstore/store-api.hh
+++ b/src/libstore/store-api.hh
@@ -703,6 +703,10 @@ public:
     virtual void createUser(const std::string & userName, uid_t userId)
     { }
 
+    /* Sync writes to commit written data, usually a no-op. */
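+    /* Overridden by stores that defer writes, e.g. the IPFS binary cache
+       store, and by RemoteStore to forward the call to the daemon. */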
+    virtual void sync()
+    { };
+
 protected:
 
     Stats stats;
diff --git a/src/libstore/worker-protocol.hh b/src/libstore/worker-protocol.hh
index b3705578ea2..c6942c4184a 100644
--- a/src/libstore/worker-protocol.hh
+++ b/src/libstore/worker-protocol.hh
@@ -50,6 +50,7 @@ typedef enum {
     wopAddToStoreNar = 39,
     wopQueryMissing = 40,
     wopQueryDerivationOutputMap = 41,
+    wopSync = 42,
 } WorkerOp;
diff --git a/src/nix-build/nix-build.cc b/src/nix-build/nix-build.cc
index f60e0706c6c..d40e3764610 100755
--- a/src/nix-build/nix-build.cc
+++ b/src/nix-build/nix-build.cc
@@ -534,6 +534,8 @@ static void main_nix_build(int argc, char * * argv)
         }
     }
 
+    store->sync();
+
     logger->stop();
 
     for (auto & path : outPaths)
diff --git a/src/nix-channel/nix-channel.cc b/src/nix-channel/nix-channel.cc
index 309970df6e8..0b86288dc0f 100755
--- a/src/nix-channel/nix-channel.cc
+++ b/src/nix-channel/nix-channel.cc
@@ -127,6 +127,8 @@ static void update(const StringSet & channelNames)
 
         // Regardless of where it came from, add the expression representing this channel to accumulated expression
         exprs.push_back("f: f { name = \"" + cname + "\"; channelName = \"" + name + "\"; src = builtins.storePath \"" + filename + "\"; " + extraAttrs + " }");
+
+        store->sync();
     }
 
     // Unpack the channel tarballs into the Nix store and install them
diff --git a/src/nix-collect-garbage/nix-collect-garbage.cc b/src/nix-collect-garbage/nix-collect-garbage.cc
index 57092b887c6..c7774a916dc 100644
--- a/src/nix-collect-garbage/nix-collect-garbage.cc
+++ b/src/nix-collect-garbage/nix-collect-garbage.cc
@@ -86,6 +86,7 @@ static int main_nix_collect_garbage(int argc, char * * argv)
         GCResults results;
         PrintFreed freed(true, results);
         store->collectGarbage(options, results);
+        store->sync();
     }
 
     return 0;
diff --git a/src/nix-copy-closure/nix-copy-closure.cc b/src/nix-copy-closure/nix-copy-closure.cc
index 10990f7b548..0b67f5b23bb 100755
--- a/src/nix-copy-closure/nix-copy-closure.cc
+++ b/src/nix-copy-closure/nix-copy-closure.cc
@@ -61,6 +61,9 @@ static int main_nix_copy_closure(int argc, char ** argv)
 
         copyPaths(from, to, closure, NoRepair, NoCheckSigs, useSubstitutes);
 
+        from->sync();
+        to->sync();
+
         return 0;
     }
 }
diff --git a/src/nix-env/nix-env.cc b/src/nix-env/nix-env.cc
index a4b5c9e2c7d..97062b33508 100644
--- a/src/nix-env/nix-env.cc
+++ b/src/nix-env/nix-env.cc
@@ -1455,6 +1455,8 @@ static int main_nix_env(int argc, char * * argv)
 
         globals.state->printStats();
 
+        store->sync();
+
         logger->stop();
 
         return 0;
diff --git a/src/nix-instantiate/nix-instantiate.cc b/src/nix-instantiate/nix-instantiate.cc
index 3956fef6dc1..b0c6b9ee383 100644
--- a/src/nix-instantiate/nix-instantiate.cc
+++ b/src/nix-instantiate/nix-instantiate.cc
@@ -169,6 +169,7 @@ static int main_nix_instantiate(int argc, char * * argv)
                 if (p == "") throw Error("unable to find '%1%'", i);
                 std::cout << p << std::endl;
             }
+            store->sync();
             return 0;
         }
 
@@ -189,6 +190,8 @@ static int main_nix_instantiate(int argc, char * * argv)
 
         state->printStats();
 
+        store->sync();
+
         return 0;
     }
 }
diff --git a/src/nix-prefetch-url/nix-prefetch-url.cc b/src/nix-prefetch-url/nix-prefetch-url.cc
index 3bdee55a767..85c8584b7e9 100644
--- a/src/nix-prefetch-url/nix-prefetch-url.cc
+++ b/src/nix-prefetch-url/nix-prefetch-url.cc
@@ -225,6 +225,8 @@ static int main_nix_prefetch_url(int argc, char * * argv)
         if (printPath)
             std::cout << store->printStorePath(*storePath) << std::endl;
 
+        store->sync();
+
         return 0;
     }
 }
diff --git a/src/nix-store/nix-store.cc b/src/nix-store/nix-store.cc
index 14baabc36fa..590d640806e 100644
--- a/src/nix-store/nix-store.cc
+++ b/src/nix-store/nix-store.cc
@@ -1118,6 +1118,9 @@ static int main_nix_store(int argc, char * * argv)
 
         op(opFlags, opArgs);
 
+        if (store)
+            store->sync();
+
         logger->stop();
 
         return 0;
diff --git a/src/nix/command.cc b/src/nix/command.cc
index 9a38c77f193..9807afb2080 100644
--- a/src/nix/command.cc
+++ b/src/nix/command.cc
@@ -42,7 +42,9 @@ ref<Store> StoreCommand::createStore()
 
 void StoreCommand::run()
 {
-    run(getStore());
+    auto store = getStore();
+    run(store);
+    store->sync();
 }
 
 StorePathsCommand::StorePathsCommand(bool recursive)
diff --git a/src/nix/copy.cc b/src/nix/copy.cc
index cb31aac8fbe..5b54c00ce8c 100644
--- a/src/nix/copy.cc
+++ b/src/nix/copy.cc
@@ -103,6 +103,8 @@ struct CmdCopy : StorePathsCommand
 
         copyPaths(srcStore, dstStore, StorePathSet(storePaths.begin(), storePaths.end()),
             NoRepair, checkSigs, substitute);
+
+        dstStore->sync();
     }
 };
diff --git a/tests/init.sh b/tests/init.sh
index f9ced6b0d08..adf6a553423 100644
--- a/tests/init.sh
+++ b/tests/init.sh
@@ -2,7 +2,7 @@ source common.sh
 
 test -n "$TEST_ROOT"
 if test -d "$TEST_ROOT"; then
-    chmod -R u+w "$TEST_ROOT"
+    chmod -R u+rw "$TEST_ROOT"
     rm -rf "$TEST_ROOT"
 fi
 mkdir "$TEST_ROOT"
diff --git a/tests/ipfs.sh b/tests/ipfs.sh
new file mode 100644
index 00000000000..9526bf28aff
--- /dev/null
+++ b/tests/ipfs.sh
@@ -0,0 +1,171 @@
+source common.sh
+
+set -o pipefail
+
+# These are for ./fixed.nix
+export IMPURE_VAR1=foo
+export IMPURE_VAR2=bar
+
+################################################################################
+## Check that the ipfs daemon is present and enabled in your environment
+################################################################################
+
+if [[ -z $(type -p ipfs) ]]; then
+    echo "Ipfs not installed; skipping ipfs tests"
+    exit 99
+fi
+
+# To see if the ipfs daemon is running, we check whether its log API is
+# reachable.
+if (! (ipfs log ls));
+then
+    echo "Ipfs daemon not detected; initializing.."
+    ipfs init
+    ipfs daemon --offline &
+    pidIpfsDaemon=$!
+    trap "kill -9 $pidIpfsDaemon" EXIT
+fi
+
+clearStore
+
+################################################################################
+## Create the folders for the source and destination stores
+################################################################################
+
+IPFS_TESTS=$TEST_ROOT/ipfs_tests
+mkdir $IPFS_TESTS
+
+# Here we define some store locations, one for the initial store we upload, and
+# the other three for the destination stores to which we'll copy (one for each
+# method)
+IPFS_SRC_STORE=$IPFS_TESTS/ipfs_source_store
+
+IPFS_DST_HTTP_STORE=$IPFS_TESTS/ipfs_dest_http_store
+IPFS_DST_HTTP_LOCAL_STORE=$IPFS_TESTS/ipfs_dest_http_local_store
+IPFS_DST_IPFS_STORE=$IPFS_TESTS/ipfs_dest_ipfs_store
+IPFS_DST_IPNS_STORE=$IPFS_TESTS/ipfs_dest_ipns_store
+
+EMPTY_HASH=$(echo {} | ipfs dag put)
+
+################################################################################
+## Check that fetchurl works directly with the ipfs store
+################################################################################
+
+TEST_FILE=test-file.txt
+touch $TEST_FILE
+
+# We try to do the evaluation with a known wrong hash to get the suggestion for
+# the correct one
+CORRECT_ADDRESS=$( \
+    nix eval --raw --impure --expr "builtins.fetchurl \"file://$PWD/$TEST_FILE\"" --store ipfs://$EMPTY_HASH?allow-modify=true \
+    |& grep '^warning: created new store' \
+    | sed "s/^warning: created new store at '\(.*\)'\. .*$/\1/")
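+# CORRECT_ADDRESS should now look like ipfs://Qm... (the exact hash depends on
+# the uploaded contents)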
.*$/\1/") + +# Then we eval and get back the hash-name part of the store path +RESULT=$(nix eval --json --impure --expr '(builtins.fetchurl 'file://$PWD/$TEST_FILE')' --store "$CORRECT_ADDRESS" \ + | jq -r | awk -F/ '{print $NF}') + +# Finally, we ask back the info from IPFS (formatting the address the right way +# beforehand) +ADDRESS_IPFS_FORMATTED=$(echo $CORRECT_ADDRESS | awk -F/ '{print $3}') +ipfs dag get /ipfs/$ADDRESS_IPFS_FORMATTED/nar/$RESULT + +################################################################################ +## Generate the keys to sign the store +################################################################################ + +SIGNING_KEY_NAME=nixcache.for.ipfs-1 +SIGNING_KEY_PRI_FILE=$IPFS_TESTS/nix-cache-key.sec +SIGNING_KEY_PUB_FILE=$IPFS_TESTS/nix-cache-key.pub + +nix-store --generate-binary-cache-key $SIGNING_KEY_NAME $SIGNING_KEY_PRI_FILE $SIGNING_KEY_PUB_FILE + +################################################################################ +## Create and sign the source store +################################################################################ + +mkdir -p $IPFS_SRC_STORE +storePaths=$(nix-build ./fixed.nix -A good --no-out-link) + +nix sign-paths -k $SIGNING_KEY_PRI_FILE $storePaths + +################################################################################ +## Manually upload the source store +################################################################################ + +# Hack around https://github.com/NixOS/nix/issues/3695 +for path in $storePaths; do + nix copy --to file://$IPFS_SRC_STORE $path +done +unset path + +MANUAL_IPFS_HASH=$(ipfs add -r $IPFS_SRC_STORE 2>/dev/null | tail -n 1 | awk '{print $2}') + +################################################################################ +## Create the local http store and download the derivation there +################################################################################ + +mkdir $IPFS_DST_HTTP_LOCAL_STORE + +IPFS_HTTP_LOCAL_PREFIX='http://localhost:8080/ipfs' + +nix-build ./fixed.nix -A good \ + --option substituters $IPFS_HTTP_LOCAL_PREFIX/$MANUAL_IPFS_HASH \ + --store $IPFS_DST_HTTP_LOCAL_STORE \ + --no-out-link \ + -j0 \ + --option trusted-public-keys $(cat $SIGNING_KEY_PUB_FILE) + +################################################################################ +## Create the ipfs store and download the derivation there +################################################################################ + +# Try to upload the content to the empty directory, fail but grab the right hash +# HERE do the same thing but expect failure +IPFS_ADDRESS=$(nix copy --to ipfs://$EMPTY_HASH?allow-modify=true $(nix-build ./fixed.nix -A good --no-out-link) --experimental-features nix-command |& \ + grep '^warning: created new store' | sed "s/^warning: created new store at '\(.*\)'\. .*$/\1/") + +# We want to check that the `allow-modify` flag is required for the command to +# succeed. This is an invocation of the same command without that flag that we +# expect to fail +(! nix copy --to ipfs://$EMPTY_HASH $(nix-build ./fixed.nix -A good --no-out-link) --experimental-features nix-command) + +# Verify that new path is valid. 
+nix copy --to $IPFS_ADDRESS $(nix-build ./fixed.nix -A good --no-out-link) --experimental-features nix-command
+
+mkdir $IPFS_DST_IPFS_STORE
+
+nix-build ./fixed.nix -A good \
+    --option substituters $IPFS_ADDRESS \
+    --store $IPFS_DST_IPFS_STORE \
+    --no-out-link \
+    -j0 \
+    --option trusted-public-keys $(cat $SIGNING_KEY_PUB_FILE)
+
+
+################################################################################
+## Create the ipns store and download the derivation there
+################################################################################
+
+# First we have to publish:
+IPNS_ID=$(ipfs name publish $EMPTY_HASH --allow-offline | awk '{print substr($3,1,length($3)-1)}')
+
+# Check that we can upload to the ipns store directly
+nix copy --to ipns://$IPNS_ID $(nix-build ./fixed.nix -A good --no-out-link) --experimental-features nix-command
+
+mkdir $IPFS_DST_IPNS_STORE
+
+nix-build ./fixed.nix -A good \
+    --option substituters 'ipns://'$IPNS_ID \
+    --store $IPFS_DST_IPNS_STORE \
+    --no-out-link \
+    -j0 \
+    --option trusted-public-keys $(cat $SIGNING_KEY_PUB_FILE)
+
+# Verify that we can copy something with dependencies
+outPath=$(nix-build dependencies.nix --no-out-link)
+
+nix copy $outPath --to ipns://$IPNS_ID --experimental-features nix-command
+
+# ...and copy it back
+nix copy $outPath --store file://$IPFS_DST_IPNS_STORE --from ipns://$IPNS_ID --experimental-features nix-command
diff --git a/tests/local.mk b/tests/local.mk
index a1929f96d29..413ef2610aa 100644
--- a/tests/local.mk
+++ b/tests/local.mk
@@ -33,6 +33,7 @@ nix_tests = \
   post-hook.sh \
   function-trace.sh \
   recursive.sh \
+  ipfs.sh \
   describe-stores.sh \
   flakes.sh \
   content-addressed.sh