Skip to content

Commit

Permalink
feat: 示例更新
Browse files Browse the repository at this point in the history
  • Loading branch information
dunwu committed Oct 8, 2024
1 parent bd6b17c commit 47d7d15
Show file tree
Hide file tree
Showing 7 changed files with 175 additions and 79 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ public <T extends BaseEsEntity> T save(String index, String type, T entity) thro
|| response.getResult() == DocWriteResponse.Result.UPDATED) {
return entity;
} else {
log.warn("【ES】save 响应结果无效!result: {}", response.getResult());
log.warn("【ES】save 失败,result: {}", response.getResult());
return null;
}
}
Expand All @@ -292,45 +292,25 @@ public <T extends BaseEsEntity> boolean saveBatch(String index, String type, Col
return true;
}

BulkRequest bulkRequest = new BulkRequest();
bulkRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
for (T entity : list) {
Map<String, Object> map = toMap(entity);
if (MapUtil.isEmpty(map)) {
continue;
}
IndexRequest request = new IndexRequest(index, type).source(map);
if (entity.getDocId() != null) {
request.id(entity.getDocId());
}
bulkRequest.add(request);
}

BulkRequest bulkRequest = toBulkIndexRequest(index, type, list);
BulkResponse response = client.bulk(bulkRequest, RequestOptions.DEFAULT);
return response != null && !response.hasFailures();
if (response == null) {
log.warn("【ES】saveBatch 失败,result 为空!list: {}", JsonUtil.toString(list));
return false;
}
if (response.hasFailures()) {
log.warn("【ES】saveBatch 失败,result: {}!", response.buildFailureMessage());
return false;
}
return true;
}

public <T extends BaseEsEntity> void asyncSaveBatch(String index, String type, Collection<T> list,
ActionListener<BulkResponse> listener) {

if (CollectionUtil.isEmpty(list)) {
return;
}

BulkRequest bulkRequest = new BulkRequest();
bulkRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
for (T entity : list) {
Map<String, Object> map = toMap(entity);
if (MapUtil.isEmpty(map)) {
continue;
}
IndexRequest request = new IndexRequest(index, type).source(map);
if (entity.getDocId() != null) {
request.id(entity.getDocId());
}
bulkRequest.add(request);
}

BulkRequest bulkRequest = toBulkIndexRequest(index, type, list);
client.bulkAsync(bulkRequest, RequestOptions.DEFAULT, listener);
}

Expand Down Expand Up @@ -362,7 +342,7 @@ public <T extends BaseEsEntity> T updateById(String index, String type, T entity
if (response.getResult() == DocWriteResponse.Result.UPDATED) {
return entity;
} else {
log.warn("【ES】updateById 响应结果无效result: {}", response.getResult());
log.warn("【ES】updateById 响应结果无效result: {}", response.getResult());
return null;
}
}
Expand All @@ -374,23 +354,49 @@ public <T extends BaseEsEntity> boolean updateBatchIds(String index, String type
return true;
}

BulkRequest bulkRequest = toUpdateBulkRequest(index, type, list);
BulkRequest bulkRequest = toBulkUpdateRequest(index, type, list);
BulkResponse response = client.bulk(bulkRequest, RequestOptions.DEFAULT);
return response != null && !response.hasFailures();
if (response == null) {
log.warn("【ES】updateBatchIds 失败,result 为空!list: {}", JsonUtil.toString(list));
return false;
}
if (response.hasFailures()) {
log.warn("【ES】updateBatchIds 失败,result: {}!", response.buildFailureMessage());
return false;
}
return true;
}

public <T extends BaseEsEntity> void asyncUpdateBatchIds(String index, String type, Collection<T> list,
ActionListener<BulkResponse> listener) {

if (CollectionUtil.isEmpty(list)) {
return;
}

BulkRequest bulkRequest = toUpdateBulkRequest(index, type, list);
BulkRequest bulkRequest = toBulkUpdateRequest(index, type, list);
client.bulkAsync(bulkRequest, RequestOptions.DEFAULT, listener);
}

private <T extends BaseEsEntity> BulkRequest toUpdateBulkRequest(String index, String type, Collection<T> list) {
private <T extends BaseEsEntity> BulkRequest toBulkIndexRequest(String index, String type, Collection<T> list) {
BulkRequest bulkRequest = new BulkRequest();
bulkRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
for (T entity : list) {
if (entity == null) {
continue;
}
Map<String, Object> map = toMap(entity);
if (MapUtil.isEmpty(map)) {
continue;
}
IndexRequest request = new IndexRequest(index, type).source(map);
if (entity.getDocId() != null) {
request.id(entity.getDocId());
}
bulkRequest.add(request);
}
return bulkRequest;
}

private <T extends BaseEsEntity> BulkRequest toBulkUpdateRequest(String index, String type, Collection<T> list) {
BulkRequest bulkRequest = new BulkRequest();
bulkRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
for (T entity : list) {
Expand Down Expand Up @@ -426,11 +432,14 @@ public boolean deleteBatchIds(String index, String type, Collection<String> ids)

BulkResponse response = client.bulk(bulkRequest, RequestOptions.DEFAULT);
if (response == null) {
log.warn("【ES】batchDeleteById 响应结果为空!");
log.warn("【ES】deleteBatchIds 失败,result 为空!ids: {}", JsonUtil.toString(ids));
return false;
}

return !response.hasFailures();
if (response.hasFailures()) {
log.warn("【ES】deleteBatchIds 失败,result: {}!", response.buildFailureMessage());
return false;
}
return true;
}

public void asyncDeleteBatchIds(String index, String type, Collection<String> ids,
Expand Down Expand Up @@ -568,7 +577,8 @@ public <T> PageData<T> pojoPage(String index, String type, int from, int size, Q
/**
* search after 分页
*/
public <T extends BaseEsEntity> ScrollData<T> pojoPageByScrollId(String index, String type, String scrollId, int size,
public <T extends BaseEsEntity> ScrollData<T> pojoPageByScrollId(String index, String type, String scrollId,
int size,
QueryBuilder queryBuilder, Class<T> clazz) throws IOException {

SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import io.github.dunwu.javadb.elasticsearch.util.JsonUtil;
import lombok.extern.slf4j.Slf4j;
import org.assertj.core.api.Assertions;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.get.GetResponse;
Expand Down Expand Up @@ -59,12 +60,18 @@ public abstract class BaseElasticsearchTemplateTest<T extends BaseEsEntity> {
protected abstract List<T> getMockList(int num);

protected void deleteIndex() throws IOException {
boolean exists = TEMPLATE.isIndexExists(getIndex());
if (!exists) {
return;
try {
Set<String> set = TEMPLATE.getIndexSet(getAlias());
if (CollectionUtil.isNotEmpty(set)) {
for (String index : set) {
log.info("删除 alias: {}, index: {}", getAlias(), index);
TEMPLATE.deleteIndex(index);
}
}
} catch (IOException | ElasticsearchException e) {
log.error("删除索引失败!", e);
}
TEMPLATE.deleteIndex(getIndex());
exists = TEMPLATE.isIndexExists(getIndex());
boolean exists = TEMPLATE.isIndexExists(getIndex());
Assertions.assertThat(exists).isFalse();
}

Expand Down Expand Up @@ -98,7 +105,7 @@ protected void saveBatch() throws IOException {
int total = 5000;
List<List<T>> listGroup = CollectionUtil.split(getMockList(total), 1000);
for (List<T> list : listGroup) {
TEMPLATE.saveBatch(getIndex(), getType(), list);
Assertions.assertThat(TEMPLATE.saveBatch(getIndex(), getType(), list)).isTrue();
}
long count = TEMPLATE.count(getIndex(), getType(), new SearchSourceBuilder());
log.info("批量更新记录数: {}", count);
Expand Down
22 changes: 3 additions & 19 deletions codes/javadb/redis/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.6.3</version>
<version>2.7.18</version>
</parent>

<groupId>io.github.dunwu</groupId>
Expand Down Expand Up @@ -36,7 +36,7 @@
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.5.9</version>
<version>5.8.27</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
Expand All @@ -51,27 +51,11 @@
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.16.8</version>
<version>3.29.0</version>
</dependency>
<!-- database end -->

<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>${redisson.version}</version>
</dependency>
</dependencies>
</dependencyManagement>

<build>
<plugins>
<plugin>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,24 @@
package io.github.dunwu.javadb.redis.springboot;

import cn.hutool.core.util.StrUtil;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.*;
import org.springframework.data.redis.core.HashOperations;
import org.springframework.data.redis.core.ListOperations;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.SetOperations;
import org.springframework.data.redis.core.ValueOperations;
import org.springframework.data.redis.core.ZSetOperations;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;

Expand All @@ -22,6 +32,19 @@ public class RedisAutoConfiguration {
@Autowired
private ObjectMapper objectMapper;

@Value("${spring.redis.host:localhost}")
private String host;

@Value("${spring.redis.port:6379}")
private String port;

@Bean
public RedissonClient redissonClient() {
Config config = new Config();
config.useSingleServer().setAddress(StrUtil.format("redis://{}:{}", host, port));
return Redisson.create(config);
}

@Bean
public HashOperations<String, String, Object> hashOperations(RedisTemplate<String, Object> redisTemplate) {
return redisTemplate.opsForHash();
Expand All @@ -44,7 +67,6 @@ public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factor
// 使用Jackson2JsonRedisSerializer来序列化和反序列化redis的value值(默认使用JDK的序列化方式)
Jackson2JsonRedisSerializer serializer = new Jackson2JsonRedisSerializer(Object.class);
serializer.setObjectMapper(objectMapper);

RedisTemplate<String, Object> template = new RedisTemplate<>();
// 配置连接工厂
template.setConnectionFactory(factory);
Expand Down
Loading

0 comments on commit 47d7d15

Please sign in to comment.