Skip to content

Commit

Permalink
Merge branch 'v24.2.x' into manual-backport-24059-v24.2.x-793
Browse files Browse the repository at this point in the history
  • Loading branch information
piyushredpanda authored Nov 9, 2024
2 parents 15c0343 + 74404e7 commit 52f30a9
Show file tree
Hide file tree
Showing 29 changed files with 483 additions and 85 deletions.
1 change: 1 addition & 0 deletions src/go/rpk/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ require (
github.com/hamba/avro/v2 v2.25.2
github.com/hashicorp/go-multierror v1.1.1
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51
github.com/kr/text v0.2.0
github.com/lestrrat-go/jwx v1.2.30
github.com/linkedin/goavro/v2 v2.13.0
github.com/lorenzosaino/go-sysctl v0.3.1
Expand Down
1 change: 1 addition & 0 deletions src/go/rpk/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ github.com/containerd/log v0.1.0/go.mod h1:VRRf09a7mHDIRezVKTRCrOq78v577GXq3bSa3
github.com/coreos/go-systemd/v22 v22.5.0 h1:RrqgGjYQKalulkV8NGVIfkXQf6YYmOyiJKk8iXXhfZs=
github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc=
github.com/cpuguy83/go-md2man/v2 v2.0.4/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/creack/pty v1.1.17/go.mod h1:MOBLtS5ELjhRRrroQr9kyvTxUAFNvYEK993ew/Vr4O4=
github.com/creack/pty v1.1.18 h1:n56/Zwd5o6whRC5PMGretI4IdRLlmBXYNjScPaBgsbY=
github.com/creack/pty v1.1.18/go.mod h1:MOBLtS5ELjhRRrroQr9kyvTxUAFNvYEK993ew/Vr4O4=
Expand Down
45 changes: 37 additions & 8 deletions src/go/rpk/pkg/adminapi/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,12 @@ import (
"fmt"
"os"
"strconv"
"strings"
"time"

"github.com/kr/text"
mTerm "github.com/moby/term"

"go.uber.org/zap"

"github.com/redpanda-data/common-go/rpadmin"
Expand Down Expand Up @@ -137,22 +141,31 @@ func licenseFeatureChecks(ctx context.Context, fs afero.Fs, cl *rpadmin.AdminAPI
// violation. (we only save successful responses).
// 2. LicenseStatus was last checked more than 1 hour ago.
if p.LicenseCheck == nil || p.LicenseCheck != nil && time.Unix(p.LicenseCheck.LastUpdate, 0).Add(1*time.Hour).Before(time.Now()) {
resp, err := cl.GetEnterpriseFeatures(ctx)
featResp, err := cl.GetEnterpriseFeatures(ctx)
if err != nil {
zap.L().Sugar().Warnf("unable to check licensed enterprise features in the cluster: %v", err)
return ""
}
info, err := cl.GetLicenseInfo(ctx)
if err != nil {
zap.L().Sugar().Warnf("unable to check license information: %v", err)
return ""
}
// We don't write a profile if the config doesn't exist.
y, exists := p.ActualConfig()
var licenseCheck *config.LicenseStatusCache
if resp.Violation {
var features []string
for _, f := range resp.Features {
if f.Enabled {
features = append(features, f.Name)
}
var enabledFeatures []string
for _, f := range featResp.Features {
if f.Enabled {
enabledFeatures = append(enabledFeatures, f.Name)
}
msg = fmt.Sprintf("\nWARNING: The following Enterprise features are being used in your Redpanda cluster: %v. These features require a license. To get a license, contact us at https://www.redpanda.com/contact. For more information, see https://docs.redpanda.com/current/get-started/licenses/#redpanda-enterprise-edition\n", features)
}
isTrialCheck := isTrialAboutToExpire(info, enabledFeatures)

if featResp.Violation {
msg = fmt.Sprintf("\nWARNING: The following Enterprise features are being used in your Redpanda cluster: %v. These features require a license. To get a license, contact us at https://www.redpanda.com/contact. For more information, see https://docs.redpanda.com/current/get-started/licenses/#redpanda-enterprise-edition\n", enabledFeatures)
} else if isTrialCheck {
msg = fmt.Sprintf("\nWARNING: your TRIAL license is about to expire. The following Enterprise features are being used in your Redpanda cluster: %v. These features require a license. To get a license, contact us at https://www.redpanda.com/contact. For more information, see https://docs.redpanda.com/current/get-started/licenses/#redpanda-enterprise-edition\n", enabledFeatures)
} else {
licenseCheck = &config.LicenseStatusCache{
LastUpdate: time.Now().Unix(),
Expand All @@ -168,5 +181,21 @@ func licenseFeatureChecks(ctx context.Context, fs afero.Fs, cl *rpadmin.AdminAPI
}
}
}
if ws, err := mTerm.GetWinsize(0); err == nil && msg != "" {
// text.Wrap removes the newline from the text. We add it back.
msg = "\n" + text.Wrap(msg, int(ws.Width)) + "\n"
}
return msg
}

// isTrialAboutToExpire returns true if we have a loaded free_trial license that
// expires in less than 15 days, and we have enterprise features enabled.
func isTrialAboutToExpire(info rpadmin.License, enabledFeatures []string) bool {
if len(enabledFeatures) > 0 && info.Loaded && strings.EqualFold(info.Properties.Type, "free_trial") {
ut := time.Unix(info.Properties.Expires, 0)
daysLeft := int(time.Until(ut).Hours() / 24)

return daysLeft < 15 && !ut.Before(time.Now())
}
return false
}
42 changes: 36 additions & 6 deletions src/go/rpk/pkg/adminapi/admin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package adminapi

import (
"context"
"fmt"
"net/http"
"net/http/httptest"
"testing"
Expand All @@ -16,7 +17,7 @@ func Test_licenseFeatureChecks(t *testing.T) {
tests := []struct {
name string
prof *config.RpkProfile
responseCase string // See the mapLicenseResponses below.
responseCase string // See the mapLicenseFeatureResponses below.
expContain string
withErr bool
checkCache func(t *testing.T, before int64, after int64)
Expand All @@ -27,6 +28,18 @@ func Test_licenseFeatureChecks(t *testing.T) {
responseCase: "ok",
expContain: "",
},
{
name: "free_trial about to expire, no features",
prof: &config.RpkProfile{},
responseCase: "ok-free",
expContain: "",
},
{
name: "free_trial about to expire, with features",
prof: &config.RpkProfile{},
responseCase: "ok-features",
expContain: "WARNING: your TRIAL license is about to expire",
},
{
name: "license ok, cache valid",
prof: &config.RpkProfile{LicenseCheck: &config.LicenseStatusCache{LastUpdate: time.Now().Add(20 * time.Minute).Unix()}},
Expand Down Expand Up @@ -144,16 +157,33 @@ type response struct {
body string
}

var mapLicenseResponses = map[string]response{
"ok": {http.StatusOK, `{"license_status": "valid", "violation": false}`},
var mapLicenseFeatureResponses = map[string]response{
"ok": {http.StatusOK, `{"license_status": "valid", "violation": false, "features": [{"name": "fips", "enabled": true},{"name": "partition_auto_balancing_continuous", "enabled": false}]}`},
"inViolation": {http.StatusOK, `{"license_status": "expired", "violation": true, "features": [{"name": "partition_auto_balancing_continuous", "enabled": true}]}`},
"failedRequest": {http.StatusBadRequest, ""},
"ok-free": {http.StatusOK, `{"license_status": "valid", "violation": false}`},
"ok-features": {http.StatusOK, `{"license_status": "valid", "violation": false, "features": [{"name": "partition_auto_balancing_continuous", "enabled": true}]}`},
}

var mapLicenseInfoResponses = map[string]response{
"ok": {http.StatusOK, fmt.Sprintf(`{"loaded": true, "license": {"type": "enterprise", "expires": %d}}`, time.Now().Add(60*24*time.Hour).Unix())},
"inViolation": {http.StatusOK, fmt.Sprintf(`{"loaded": true, "license": {"type": "enterprise", "expires": %d}}`, time.Now().Add(60*24*time.Hour).Unix())},
"failedRequest": {http.StatusBadRequest, ""},
"ok-free": {http.StatusOK, fmt.Sprintf(`{"loaded": true, "license": {"type": "free_trial", "expires": %d}}`, time.Now().Add(24*time.Hour).Unix())}, // expires in 1 day.
"ok-features": {http.StatusOK, fmt.Sprintf(`{"loaded": true, "license": {"type": "free_trial", "expires": %d}}`, time.Now().Add(24*time.Hour).Unix())},
}

func licenseHandler(respCase string) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
resp := mapLicenseResponses[respCase]
w.WriteHeader(resp.status)
w.Write([]byte(resp.body))
fmt.Println(r.URL.Path)
if r.URL.Path == "/v1/features/enterprise" {
resp := mapLicenseFeatureResponses[respCase]
w.WriteHeader(resp.status)
w.Write([]byte(resp.body))
} else if r.URL.Path == "/v1/features/license" {
resp := mapLicenseInfoResponses[respCase]
w.WriteHeader(resp.status)
w.Write([]byte(resp.body))
}
}
}
11 changes: 8 additions & 3 deletions src/go/rpk/pkg/cli/cluster/license/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package license
import (
"fmt"
"os"
"strings"
"time"

"github.com/redpanda-data/common-go/rpadmin"
Expand Down Expand Up @@ -133,17 +134,21 @@ func printTextLicenseInfo(resp infoResponse) {
if *resp.Expired {
tw.Print("License expired:", *resp.Expired)
}
checkLicenseExpiry(resp.ExpiresUnix)
checkLicenseExpiry(resp.ExpiresUnix, resp.Type)
}
out.Section("LICENSE INFORMATION")
tw.Flush()
}

func checkLicenseExpiry(expiresUnix int64) {
func checkLicenseExpiry(expiresUnix int64, licenseType string) {
ut := time.Unix(expiresUnix, 0)
daysLeft := int(time.Until(ut).Hours() / 24)

if daysLeft < 30 && !ut.Before(time.Now()) {
dayThreshold := 30
if strings.EqualFold(licenseType, "free_trial") {
dayThreshold = 15
}
if daysLeft < dayThreshold && !ut.Before(time.Now()) {
fmt.Fprintf(os.Stderr, "WARNING: your license will expire soon.\n\n")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ find_package(Python3 REQUIRED COMPONENTS Interpreter)
function(add_wasm_transform NAME)
find_program(TINYGO_BIN "tinygo")
set(wasm_output "${CMAKE_CURRENT_BINARY_DIR}/${NAME}.wasm")
set(tinygo_cmd ${TINYGO_BIN} build -o ${wasm_output} -quiet -target wasi -scheduler none "${NAME}/transform.go")
set(tinygo_cmd ${TINYGO_BIN} build -o ${wasm_output} -target wasi -scheduler none "${NAME}/transform.go")
set(gopath ${CMAKE_CURRENT_BINARY_DIR}/${NAME})
add_custom_command(OUTPUT ${wasm_output}
COMMAND Python3::Interpreter ${CMAKE_CURRENT_SOURCE_DIR}/retry.py
Expand Down
65 changes: 42 additions & 23 deletions src/v/cloud_storage/cache_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include <seastar/core/shard_id.hh>
#include <seastar/core/smp.hh>
#include <seastar/core/sstring.hh>
#include <seastar/coroutine/as_future.hh>
#include <seastar/util/defer.hh>

#include <cloud_storage/cache_service.h>
Expand Down Expand Up @@ -1233,6 +1234,7 @@ ss::future<> cache::put(
ss::this_shard_id(),
(++_cnt),
cache_tmp_file_extension));
auto tmp_filepath = dir_path / tmp_filename;

ss::file tmp_cache_file;
while (true) {
Expand All @@ -1247,14 +1249,14 @@ ss::future<> cache::put(
| ss::open_flags::exclusive;

tmp_cache_file = co_await ss::open_file_dma(
(dir_path / tmp_filename).native(), flags);
tmp_filepath.native(), flags);
break;
} catch (std::filesystem::filesystem_error& e) {
if (e.code() == std::errc::no_such_file_or_directory) {
vlog(
cst_log.debug,
"Couldn't open {}, gonna retry",
(dir_path / tmp_filename).native());
tmp_filepath.native());
} else {
throw;
}
Expand All @@ -1267,42 +1269,59 @@ ss::future<> cache::put(
options.io_priority_class = io_priority;
auto out = co_await ss::make_file_output_stream(tmp_cache_file, options);

std::exception_ptr disk_full_error;
std::exception_ptr eptr;
bool no_space_on_device = false;
try {
co_await ss::copy(data, out)
.then([&out]() { return out.flush(); })
.finally([&out]() { return out.close(); });
} catch (std::filesystem::filesystem_error& e) {
// For ENOSPC errors, delay handling so that we can do a trim
if (e.code() == std::errc::no_space_on_device) {
disk_full_error = std::current_exception();
} else {
throw;
no_space_on_device = e.code() == std::errc::no_space_on_device;
eptr = std::current_exception();
} catch (...) {
// For other errors, delay handling so that we can clean up the tmp file
eptr = std::current_exception();
}

// If we failed to write to the tmp file, we should delete it, maybe do an
// eager trim, and rethrow the exception.
if (eptr) {
if (!_gate.is_closed()) {
auto delete_tmp_fut = co_await ss::coroutine::as_future(
delete_file_and_empty_parents(tmp_filepath.native()));
if (
delete_tmp_fut.failed()
&& !ssx::is_shutdown_exception(delete_tmp_fut.get_exception())) {
vlog(
cst_log.error,
"Failed to delete tmp file {}: {}",
tmp_filepath.native(),
delete_tmp_fut.get_exception());
}
}
}

if (disk_full_error) {
vlog(cst_log.error, "Out of space while writing to cache");
if (no_space_on_device) {
vlog(cst_log.error, "Out of space while writing to cache");

// Block further puts from being attempted until notify_disk_status
// reports that there is space available.
set_block_puts(true);
// Block further puts from being attempted until notify_disk_status
// reports that there is space available.
set_block_puts(true);

// Trim proactively: if many fibers hit this concurrently,
// they'll contend for cleanup_sm and the losers will skip
// trim due to throttling.
co_await trim_throttled();
// Trim proactively: if many fibers hit this concurrently,
// they'll contend for cleanup_sm and the losers will skip
// trim due to throttling.
co_await trim_throttled();
}

throw disk_full_error;
std::rethrow_exception(eptr);
}

// commit write transaction
auto src = (dir_path / tmp_filename).native();
auto dest = (dir_path / filename).native();
auto put_size = co_await ss::file_size(tmp_filepath.native());

auto put_size = co_await ss::file_size(src);

co_await ss::rename_file(src, dest);
auto dest = (dir_path / filename).native();
co_await ss::rename_file(tmp_filepath.native(), dest);

// We will now update
reservation.wrote_data(put_size, 1);
Expand Down
37 changes: 37 additions & 0 deletions src/v/cloud_storage/tests/cache_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
#include "random/generators.h"
#include "ssx/sformat.h"
#include "test_utils/fixture.h"
#include "test_utils/iostream.h"
#include "test_utils/scoped_config.h"
#include "utils/directory_walker.h"
#include "utils/file_io.h"
#include "utils/human.h"

Expand Down Expand Up @@ -451,6 +453,41 @@ SEASTAR_THREAD_TEST_CASE(test_access_time_tracker_read_skipped_on_old_version) {
BOOST_REQUIRE_EQUAL(out.size(), 0);
}

ss::future<size_t> count_files(ss::sstring dirname) {
directory_walker walker;
size_t count = 0;
co_await walker.walk(
dirname,
[dirname, &count](const ss::directory_entry& entry) -> ss::future<> {
if (entry.type == ss::directory_entry_type::directory) {
return count_files(fmt::format("{}/{}", dirname, entry.name))
.then([&count](size_t sub_count) { count += sub_count; });
} else {
++count;
return ss::now();
}
});

co_return count;
}

FIXTURE_TEST(test_clean_up_on_stream_exception, cache_test_fixture) {
auto s = tests::make_throwing_stream(ss::abort_requested_exception());
auto reservation = sharded_cache.local().reserve_space(1, 1).get();
BOOST_CHECK_THROW(
sharded_cache.local().put(KEY, s, reservation).get(),
ss::abort_requested_exception);
vlog(test_log.info, "Put failed as expected");

BOOST_CHECK_EQUAL(sharded_cache.local().get_usage_bytes(), 0);
BOOST_CHECK_EQUAL(sharded_cache.local().get_usage_objects(), 0);

vlog(test_log.info, "Counting files in cache directory");
BOOST_CHECK_EQUAL(count_files(CACHE_DIR.native()).get(), 0);

vlog(test_log.info, "Test passed");
}

/**
* Validate that .part files and empty directories are deleted if found during
* the startup walk of the cache.
Expand Down
Loading

0 comments on commit 52f30a9

Please sign in to comment.