Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for tenant offloading #286

Merged
merged 1 commit into from
Jul 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,12 @@
<version>${testcontainers.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>minio</artifactId>
<version>${testcontainers.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,16 @@
package io.weaviate.client.v1.schema.model;

public interface ActivityStatus {
String HOT = "HOT";
@Deprecated
antas-marcin marked this conversation as resolved.
Show resolved Hide resolved
String WARM = "WARM";
String HOT = "HOT";
String COLD = "COLD";
String FROZEN = "FROZEN";
String FREEZING = "FREEZING";
String UNFREEZING = "UNFREEZING";
String ACTIVE = "ACTIVE";
String INACTIVE = "INACTIVE";
String OFFLOADED = "OFFLOADED";
String OFFLOADING = "OFFLOADING";
String ONLOADING = "ONLOADING";
}
Original file line number Diff line number Diff line change
@@ -1,43 +1,64 @@
package io.weaviate.integration.client;

import java.util.ArrayList;
import java.util.List;
import org.junit.rules.TestRule;
import org.junit.runner.Description;
import org.junit.runners.model.Statement;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.MinIOContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.weaviate.WeaviateContainer;

public class WeaviateDockerCompose implements TestRule {

private final String weaviateVersion;
private final boolean withOffloadS3;

public WeaviateDockerCompose() {
this.weaviateVersion = WeaviateDockerImage.WEAVIATE_DOCKER_IMAGE;
this.withOffloadS3 = false;
}

public WeaviateDockerCompose(String version) {
this.weaviateVersion = String.format("semitechnologies/weaviate:%s", version);
this.withOffloadS3 = false;
}

public class Weaviate extends WeaviateContainer {
public Weaviate(String dockerImageName) {
public WeaviateDockerCompose(String version, boolean withOffloadS3) {
this.weaviateVersion = String.format("semitechnologies/weaviate:%s", version);
this.withOffloadS3 = withOffloadS3;
}

public static class Weaviate extends WeaviateContainer {
public Weaviate(String dockerImageName, boolean withOffloadS3) {
super(dockerImageName);
waitingFor(Wait.forHttp("/v1/.well-known/ready").forPort(8080).forStatusCode(200));
withNetwork(Network.SHARED);
List<String> enableModules = new ArrayList<>();
enableModules.add("text2vec-contextionary");
enableModules.add("backup-filesystem");
enableModules.add("generative-openai");
if (withOffloadS3) {
enableModules.add("offload-s3");
withEnv("OFFLOAD_S3_ENDPOINT", "http://minio:9000");
withEnv("AWS_ACCESS_KEY_ID", MinIO.USER);
withEnv("AWS_SECRET_KEY", MinIO.PASSWORD);
}
withEnv("LOG_LEVEL", "debug");
withEnv("CONTEXTIONARY_URL", "contextionary:9999");
withEnv("QUERY_DEFAULTS_LIMIT", "25");
withEnv("DEFAULT_VECTORIZER_MODULE", "text2vec-contextionary");
withEnv("ENABLE_MODULES", "text2vec-contextionary,backup-filesystem,generative-openai");
withEnv("BACKUP_FILESYSTEM_PATH", "/tmp/backups");
withEnv("DISABLE_TELEMETRY", "true");
withEnv("PERSISTENCE_FLUSH_IDLE_MEMTABLES_AFTER", "1");
withEnv("ENABLE_MODULES", String.join(",", enableModules));
withCreateContainerCmdModifier(cmd -> cmd.withHostName("weaviate"));
}
}

public class Contextionary extends GenericContainer<Contextionary> {
public static class Contextionary extends GenericContainer<Contextionary> {
public Contextionary() {
super("semitechnologies/contextionary:en0.16.0-v1.2.1");
withNetwork(Network.SHARED);
Expand All @@ -52,13 +73,31 @@ public Contextionary() {
}
}

public static class MinIO extends MinIOContainer {
private static final String USER = "minioadmin";
private static final String PASSWORD = "minioadmin";

public MinIO() {
super("minio/minio");
withNetwork(Network.SHARED);
withUserName(USER);
withPassword(PASSWORD);
withCreateContainerCmdModifier(cmd -> cmd.withHostName("minio"));
}
}

private static Contextionary contextionary;
private static Weaviate weaviate;
private static MinIO minio;

public void start() {
if (withOffloadS3) {
minio = new MinIO();
minio.start();
}
contextionary = new Contextionary();
contextionary.start();
weaviate = new Weaviate(this.weaviateVersion);
weaviate = new Weaviate(this.weaviateVersion, withOffloadS3);
weaviate.start();
}

Expand All @@ -73,6 +112,9 @@ public String getGrpcHostAddress() {
public void stop() {
weaviate.stop();
contextionary.stop();
if (withOffloadS3) {
minio.stop();
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

public class WeaviateDockerComposeCluster implements TestRule {

public class Weaviate extends WeaviateContainer {
public static class Weaviate extends WeaviateContainer {
public Weaviate(String dockerImageName, String hostname, Boolean isJoining) {
super(dockerImageName);
withNetwork(Network.SHARED);
Expand Down Expand Up @@ -41,7 +41,7 @@ public Weaviate(String dockerImageName, String hostname, Boolean isJoining) {
}
}

public class Contextionary extends GenericContainer<WeaviateDockerCompose.Contextionary> {
public static class Contextionary extends GenericContainer<WeaviateDockerCompose.Contextionary> {
public Contextionary() {
super("semitechnologies/contextionary:en0.16.0-v1.2.1");
withNetwork(Network.SHARED);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@

public class WeaviateVersion {

public static final String WEAVIATE_IMAGE = "preview-range-roaring-set-index-rename-indexrangeble-to-indexrangefilters-1083432";
// docker image version
public static final String WEAVIATE_IMAGE = "preview-fix-license-compliance-issues-97ed59e";

// to be set according to weaviate docker image
public static final String EXPECTED_WEAVIATE_VERSION = "1.26.0-rc.0";
// to be set according to weaviate docker image
public static final String EXPECTED_WEAVIATE_GIT_HASH = "1083432";
public static final String EXPECTED_WEAVIATE_GIT_HASH = "97ed59e";

private WeaviateVersion() {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
package io.weaviate.integration.client.schema;

import io.weaviate.client.Config;
import io.weaviate.client.WeaviateClient;
import io.weaviate.client.base.Result;
import io.weaviate.client.v1.data.model.WeaviateObject;
import io.weaviate.client.v1.schema.model.ActivityStatus;
import io.weaviate.client.v1.schema.model.Tenant;
import io.weaviate.integration.client.WeaviateDockerCompose;
import io.weaviate.integration.client.WeaviateTestGenerics;
import io.weaviate.integration.client.WeaviateVersion;
import java.util.Arrays;
import java.util.List;
import static org.assertj.core.api.Assertions.assertThat;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;

public class ClientSchemaTenantOffloadingTest {

private WeaviateClient client;
private WeaviateTestGenerics testGenerics;

@ClassRule
public static WeaviateDockerCompose compose = new WeaviateDockerCompose(WeaviateVersion.WEAVIATE_IMAGE, true);

@Before
public void before() {
String httpHost = compose.getHttpHostAddress();
String grpcHost = compose.getGrpcHostAddress();
Config config = new Config("http", httpHost);
config.setGRPCSecured(false);
config.setGRPCHost(grpcHost);

client = new WeaviateClient(config);
testGenerics = new WeaviateTestGenerics();
}

@After
public void after() {
testGenerics.cleanupWeaviate(client);
}

@Test
public void shouldOffloadTenants() throws InterruptedException {
// create tenants and class
String className = "Pizza";
String[] tenants = new String[]{"Tenant1", "Tenant2", "Tenant3"};
Tenant[] tenantObjs = Arrays.stream(tenants)
.map(tenant -> Tenant.builder().name(tenant).build())
.toArray(Tenant[]::new);
testGenerics.createSchemaPizzaForTenants(client);
testGenerics.createTenantsPizza(client, tenantObjs);
// verify tenants existence
Result<List<Tenant>> getResult = client.schema().tenantsGetter()
.withClassName(className)
.run();
assertThat(getResult).isNotNull()
.returns(false, Result::hasErrors)
.extracting(Result::getResult).asList()
.hasSize(tenants.length);
// insert data
testGenerics.createDataPizzaForTenants(client, tenants);
// verify data existence
for (String tenant : tenants) {
Result<List<WeaviateObject>> result = client.data().objectsGetter().withClassName(className).withTenant(tenant).run();
assertThat(result).isNotNull()
.returns(false, Result::hasErrors)
.extracting(Result::getResult).asList().hasSize(4);
}
// verify tenant status HOT
verifyEventuallyTenantStatus(className, ActivityStatus.HOT);
// update tenants to FROZEN
updateTenantStatus(className, tenants, ActivityStatus.FROZEN);
// verify tenant status FREEZING
verifyEventuallyTenantStatus(className, ActivityStatus.FREEZING);
antas-marcin marked this conversation as resolved.
Show resolved Hide resolved
// verify tenants does not exist
for (String tenant : tenants) {
Result<List<WeaviateObject>> result = client.data().objectsGetter().withClassName(className).withTenant(tenant).run();
assertThat(result).isNotNull()
.returns(true, Result::hasErrors)
.extracting(Result::getResult).isNull();
}
// verify tenant status FROZEN
verifyEventuallyTenantStatus(className, ActivityStatus.FROZEN);
// updating tenant status to HOT
updateTenantStatus(className, tenants, ActivityStatus.HOT);
// verify tenant status HOT
verifyEventuallyTenantStatus(className, ActivityStatus.HOT);
// verify object creation
for (String tenant : tenants) {
Result<List<WeaviateObject>> result = client.data().objectsGetter().withClassName(className).withTenant(tenant).run();
assertThat(result).isNotNull()
.returns(false, Result::hasErrors)
.extracting(Result::getResult).asList().hasSize(4);
}
}

private void updateTenantStatus(String className, String[] tenants, String activityStatus) {
Tenant[] tenantsWithStatus = Arrays.stream(tenants)
.map(tenant -> Tenant.builder().name(tenant).activityStatus(activityStatus).build())
.toArray(Tenant[]::new);
Result<Boolean> exists = client.schema().tenantsUpdater()
.withClassName(className)
.withTenants(tenantsWithStatus)
.run();
assertThat(exists).isNotNull()
.returns(false, Result::hasErrors)
.returns(true, Result::getResult);
}

private void verifyEventuallyTenantStatus(String className, String activityStatus) throws InterruptedException {
boolean statusOK = false;
int hardBreak = 5*60;
while(hardBreak > 0) {
if (verifyTenantStatus(className, activityStatus)) {
statusOK = true;
break;
}
Thread.sleep(1000);
hardBreak--;
}
assertThat(statusOK).isTrue();
}

private boolean verifyTenantStatus(String className, String activityStatus) {
Result<List<Tenant>> getResult = client.schema().tenantsGetter()
.withClassName(className)
.run();
assertThat(getResult).isNotNull()
.returns(false, Result::hasErrors)
.extracting(Result::getResult).isNotNull();
for (Tenant tenant : getResult.getResult()) {
if (!tenant.getActivityStatus().equals(activityStatus)) {
return false;
}
}
return true;
}
}
Loading