diff --git a/Sources/ElasticsearchNIOClient/ElasticsearchClient+Requests.swift b/Sources/ElasticsearchNIOClient/ElasticsearchClient+Requests.swift index 2e6ee87..5782b2c 100644 --- a/Sources/ElasticsearchNIOClient/ElasticsearchClient+Requests.swift +++ b/Sources/ElasticsearchNIOClient/ElasticsearchClient+Requests.swift @@ -267,7 +267,7 @@ extension ElasticsearchClient { let url = try buildURL(path: "/\(name)") return requester.executeRequest(url: url, method: .HEAD, headers: .init(), body: nil).flatMapThrowing { response in guard response.status == .ok || response.status == .notFound else { - throw ElasticSearchClientError(message: "Invalid response from index exists API - \(response)", status: response.status.code) + throw ElasticSearchClientError(message: "Invalid response from index exists API - \(response)", status: response.status) } return response.status == .ok } @@ -275,4 +275,16 @@ extension ElasticsearchClient { return self.eventLoop.makeFailedFuture(error) } } + + public func custom(_ path: String, method: HTTPMethod, body: Data) -> EventLoopFuture { + do { + let url = try buildURL(path: path) + let body = ByteBuffer(data: body) + var headers = HTTPHeaders() + headers.add(name: "content-type", value: "application/json") + return sendRequest(url: url, method: method, headers: headers, body: body).flatMapThrowing { return Data(buffer: $0) } + } catch { + return self.eventLoop.makeFailedFuture(error) + } + } } diff --git a/Sources/ElasticsearchNIOClient/ElasticsearchClient.swift b/Sources/ElasticsearchNIOClient/ElasticsearchClient.swift index 991a100..dbae1c5 100644 --- a/Sources/ElasticsearchNIOClient/ElasticsearchClient.swift +++ b/Sources/ElasticsearchNIOClient/ElasticsearchClient.swift @@ -90,7 +90,7 @@ public struct ElasticsearchClient { self.jsonDecoder = jsonDecoder } - func sendRequest(url: String, method: HTTPMethod, headers: HTTPHeaders, body: ByteBuffer?) -> EventLoopFuture { + func sendRequest(url: String, method: HTTPMethod, headers: HTTPHeaders, body: ByteBuffer?) -> EventLoopFuture { requester.executeRequest(url: url, method: method, headers: headers, body: body).flatMapThrowing { clientResponse in self.logger.trace("Response: \(clientResponse)") if let responseBody = clientResponse.body { @@ -98,15 +98,11 @@ public struct ElasticsearchClient { } switch clientResponse.status.code { case 200...299: - guard var body = clientResponse.body else { + guard let body = clientResponse.body else { self.logger.debug("No body from ElasticSearch response") - throw ElasticSearchClientError(message: "No body from ElasticSearch response", status: clientResponse.status.code) - } - guard let response = try body.readJSONDecodable(D.self, decoder: jsonDecoder, length: body.readableBytes) else { - self.logger.debug("Failed to convert \(D.self)") - throw ElasticSearchClientError(message: "Failed to convert \(D.self)", status: clientResponse.status.code) + throw ElasticSearchClientError(message: "No body from ElasticSearch response", status: clientResponse.status) } - return response + return body default: let requestBody: String if let body = body { @@ -121,8 +117,19 @@ public struct ElasticsearchClient { responseBody = "Empty" } self.logger.trace("Got response status \(clientResponse.status) from ElasticSearch with response \(clientResponse) when trying \(method) request to \(url). Request body was \(requestBody) and response body was \(responseBody)") - throw ElasticSearchClientError(message: "Bad status code from ElasticSearch", status: clientResponse.status.code) + throw ElasticSearchClientError(message: "Bad status code from ElasticSearch", status: clientResponse.status) + } + } + } + + func sendRequest(url: String, method: HTTPMethod, headers: HTTPHeaders, body: ByteBuffer?) -> EventLoopFuture { + sendRequest(url: url, method: method, headers: headers, body: body).flatMapThrowing { body in + var body = body + guard let response = try body.readJSONDecodable(D.self, decoder: jsonDecoder, length: body.readableBytes) else { + self.logger.debug("Failed to convert \(D.self)") + throw ElasticSearchClientError(message: "Failed to convert \(D.self)", status: nil) } + return response } } } diff --git a/Sources/ElasticsearchNIOClient/Models/ElasticsearchClientError.swift b/Sources/ElasticsearchNIOClient/Models/ElasticsearchClientError.swift index 7fd5f51..8d472ff 100644 --- a/Sources/ElasticsearchNIOClient/Models/ElasticsearchClientError.swift +++ b/Sources/ElasticsearchNIOClient/Models/ElasticsearchClientError.swift @@ -1,8 +1,10 @@ +import NIOHTTP1 + public struct ElasticSearchClientError: Error { public let message: String - public let status: UInt? + public let status: HTTPResponseStatus? - public init(message: String, status: UInt?) { + public init(message: String, status: HTTPResponseStatus?) { self.message = message self.status = status } diff --git a/Tests/ElasticsearchNIOClientTests/ElasticsearchNIOClientTests.swift b/Tests/ElasticsearchNIOClientTests/ElasticsearchNIOClientTests.swift index 5d8ce38..4bccb44 100644 --- a/Tests/ElasticsearchNIOClientTests/ElasticsearchNIOClientTests.swift +++ b/Tests/ElasticsearchNIOClientTests/ElasticsearchNIOClientTests.swift @@ -5,13 +5,13 @@ import AsyncHTTPClient import Logging class ElasticSearchIntegrationTests: XCTestCase { - + // MARK: - Properties var eventLoopGroup: MultiThreadedEventLoopGroup! var client: ElasticsearchClient! var httpClient: HTTPClient! let indexName = "some-index" - + // MARK: - Overrides override func setUpWithError() throws { eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1) @@ -22,12 +22,12 @@ class ElasticSearchIntegrationTests: XCTestCase { _ = try client.deleteIndex(indexName).wait() } } - + override func tearDownWithError() throws { try httpClient.syncShutdown() try eventLoopGroup.syncShutdownGracefully() } - + // MARK: - Tests func testURLSetup() throws { let logger = Logger(label: "io.brokenhands.swift-soto-elasticsearch.test") @@ -36,7 +36,7 @@ class ElasticSearchIntegrationTests: XCTestCase { XCTAssertThrowsError(try ElasticsearchClient(httpClient: httpClient, eventLoop: eventLoopGroup.next(), logger: logger, url: invalidURLString)) { error in XCTAssertEqual(error as! ElasticsearchClient.ValidationError, .invalidURLString) } - + let urlWithoutScheme = URL(string: "://localhost:9200")! XCTAssertThrowsError(try ElasticsearchClient(httpClient: httpClient, eventLoop: eventLoopGroup.next(), logger: logger, url: urlWithoutScheme)) { error in XCTAssertEqual(error as! ElasticsearchClient.ValidationError, .missingURLScheme) @@ -66,40 +66,40 @@ class ElasticSearchIntegrationTests: XCTestCase { func testSearchingItems() throws { try setupItems() - + let results: ESGetMultipleDocumentsResponse = try client.searchDocuments(from: indexName, searchTerm: "Apples").wait() XCTAssertEqual(results.hits.hits.count, 5) } - + func testSearchingItemsWithTypeProvided() throws { try setupItems() - + let results = try client.searchDocuments(from: indexName, searchTerm: "Apples", type: SomeItem.self).wait() XCTAssertEqual(results.hits.hits.count, 5) } - + func testSearchItemsCount() throws { try setupItems() - + let results = try client.searchDocumentsCount(from: indexName, searchTerm: "Apples").wait() XCTAssertEqual(results.count, 5) } - + func testSearchDocumentsTotal() throws { for index in 1...100 { let name = "Some \(index) Apples" let item = SomeItem(id: UUID(), name: name) _ = try client.createDocument(item, in: self.indexName).wait() } - + // This is required for ES to settle and load the indexes to return the right results Thread.sleep(forTimeInterval: 1.0) - + let results = try client.searchDocuments(from: indexName, searchTerm: "Apples", type: SomeItem.self).wait() XCTAssertEqual(results.hits.total.value, 100) XCTAssertEqual(results.hits.total.relation, .eq) } - + func testCreateDocument() throws { let item = SomeItem(id: UUID(), name: "Banana") let response = try client.createDocument(item, in: self.indexName).wait() @@ -107,7 +107,7 @@ class ElasticSearchIntegrationTests: XCTestCase { XCTAssertEqual(response.index, self.indexName) XCTAssertEqual(response.result, "created") } - + func testCreateDocumentWithID() throws { let item = SomeItem(id: UUID(), name: "Banana") let response = try client.createDocumentWithID(item, in: self.indexName).wait() @@ -115,7 +115,7 @@ class ElasticSearchIntegrationTests: XCTestCase { XCTAssertEqual(response.index, self.indexName) XCTAssertEqual(response.result, "created") } - + func testUpdateDocumentWithCustomId() throws { let item = SomeItem(id: UUID(), name: "Banana") _ = try client.createDocumentWithID(item, in: self.indexName).wait() @@ -124,7 +124,7 @@ class ElasticSearchIntegrationTests: XCTestCase { let response = try client.updateDocument(updatedItem, id: item.id, in: self.indexName).wait() XCTAssertEqual(response.result, "updated") } - + func testUpdateDocumentWithID() throws { let item = SomeItem(id: UUID(), name: "Banana") _ = try client.createDocumentWithID(item, in: self.indexName).wait() @@ -133,25 +133,25 @@ class ElasticSearchIntegrationTests: XCTestCase { let response = try client.updateDocument(updatedItem, in: self.indexName).wait() XCTAssertEqual(response.result, "updated") } - + func testDeletingDocument() throws { try setupItems() let item = SomeItem(id: UUID(), name: "Banana") _ = try client.createDocumentWithID(item, in: self.indexName).wait() Thread.sleep(forTimeInterval: 1.0) - + let results = try client.searchDocumentsCount(from: indexName, searchTerm: "Banana").wait() XCTAssertEqual(results.count, 1) Thread.sleep(forTimeInterval: 0.5) - + let response = try client.deleteDocument(id: item.id, from: self.indexName).wait() XCTAssertEqual(response.result, "deleted") Thread.sleep(forTimeInterval: 0.5) - + let updatedResults = try client.searchDocumentsCount(from: indexName, searchTerm: "Banana").wait() XCTAssertEqual(updatedResults.count, 0) } - + func testCreateIndex() throws { let mappings: [String: Any] = [ "properties": [ @@ -166,43 +166,43 @@ class ElasticSearchIntegrationTests: XCTestCase { ] ] let settings: [String: Any] = ["number_of_shards": 3] - + let response = try client.createIndex(indexName, mappings: mappings, settings: settings).wait() XCTAssertEqual(response.acknowledged, true) - + let exists = try client.checkIndexExists(self.indexName).wait() XCTAssertTrue(exists) } - + func testIndexExists() throws { let item = SomeItem(id: UUID(), name: "Banana") let response = try client.createDocument(item, in: self.indexName).wait() XCTAssertEqual(response.index, self.indexName) XCTAssertEqual(response.result, "created") Thread.sleep(forTimeInterval: 0.5) - + let exists = try client.checkIndexExists(self.indexName).wait() XCTAssertTrue(exists) - + let notExists = try client.checkIndexExists("some-random-index").wait() XCTAssertFalse(notExists) } - + func testDeleteIndex() throws { let item = SomeItem(id: UUID(), name: "Banana") _ = try client.createDocument(item, in: self.indexName).wait() Thread.sleep(forTimeInterval: 0.5) - + let exists = try client.checkIndexExists(self.indexName).wait() XCTAssertTrue(exists) - + let response = try client.deleteIndex(self.indexName).wait() XCTAssertEqual(response.acknowledged, true) - + let notExists = try client.checkIndexExists(self.indexName).wait() XCTAssertFalse(notExists) } - + func testBulkCreate() throws { var items = [SomeItem]() for index in 1...10 { @@ -215,18 +215,18 @@ class ElasticSearchIntegrationTests: XCTestCase { let item = SomeItem(id: UUID(), name: name) items.append(item) } - + let itemsWithIndex = items.map { ESBulkOperation(operationType: .create, index: self.indexName, id: $0.id, document: $0) } let response = try client.bulk(itemsWithIndex).wait() XCTAssertEqual(response.errors, false) XCTAssertEqual(response.items.count, 10) XCTAssertEqual(response.items.first?.create?.result, "created") Thread.sleep(forTimeInterval: 1.0) - + let results = try client.searchDocumentsCount(from: indexName, searchTerm: nil).wait() XCTAssertEqual(results.count, 10) } - + func testBulkCreateUpdateDeleteIndex() throws { let item1 = SomeItem(id: UUID(), name: "Item 1") let item2 = SomeItem(id: UUID(), name: "Item 2") @@ -238,7 +238,7 @@ class ElasticSearchIntegrationTests: XCTestCase { ESBulkOperation(operationType: .update, index: self.indexName, id: item3.id, document: item3), ESBulkOperation(operationType: .delete, index: self.indexName, id: item4.id, document: item4), ] - + let response = try client.bulk(bulkOperation).wait() XCTAssertEqual(response.items.count, 4) XCTAssertNotNil(response.items[0].create) @@ -246,49 +246,49 @@ class ElasticSearchIntegrationTests: XCTestCase { XCTAssertNotNil(response.items[2].update) XCTAssertNotNil(response.items[3].delete) } - + func testSearchingItemsPaginated() throws { for index in 1...100 { let name = "Some \(index) Apples" let item = SomeItem(id: UUID(), name: name) _ = try client.createDocument(item, in: self.indexName).wait() } - + // This is required for ES to settle and load the indexes to return the right results - Thread.sleep(forTimeInterval: 1.0) - + Thread.sleep(forTimeInterval: 2.0) + let results: ESGetMultipleDocumentsResponse = try client.searchDocumentsPaginated(from: indexName, searchTerm: "Apples", size: 20, offset: 10).wait() XCTAssertEqual(results.hits.hits.count, 20) XCTAssertTrue(results.hits.hits.contains(where: { $0.source.name == "Some 11 Apples" })) XCTAssertTrue(results.hits.hits.contains(where: { $0.source.name == "Some 29 Apples" })) } - + func testSearchingItemsWithTypeProvidedPaginated() throws { for index in 1...100 { let name = "Some \(index) Apples" let item = SomeItem(id: UUID(), name: name) _ = try client.createDocument(item, in: self.indexName).wait() } - + // This is required for ES to settle and load the indexes to return the right results Thread.sleep(forTimeInterval: 1.0) - + let results = try client.searchDocumentsPaginated(from: indexName, searchTerm: "Apples", size: 20, offset: 10, type: SomeItem.self).wait() XCTAssertEqual(results.hits.hits.count, 20) XCTAssertTrue(results.hits.hits.contains(where: { $0.source.name == "Some 11 Apples" })) XCTAssertTrue(results.hits.hits.contains(where: { $0.source.name == "Some 29 Apples" })) } - + func testGetItem() throws { let item = SomeItem(id: UUID(), name: "Some item") _ = try client.createDocumentWithID(item, in: self.indexName).wait() - + Thread.sleep(forTimeInterval: 1.0) - + let retrievedItem: ESGetSingleDocumentResponse = try client.get(id: item.id, from: self.indexName).wait() XCTAssertEqual(retrievedItem.source.name, item.name) } - + func testBulkUpdateWithScript() throws { var items = [SomeItem]() for index in 1...10 { @@ -302,79 +302,79 @@ class ElasticSearchIntegrationTests: XCTestCase { _ = try client.createDocumentWithID(item, in: self.indexName).wait() items.append(item) } - + // This is required for ES to settle and load the indexes to return the right results Thread.sleep(forTimeInterval: 1.0) - + struct ScriptBody: Codable { let inline: String } - + let scriptBody = ScriptBody(inline: "ctx._source.count = ctx._source.count += 1") - + let bulkOperation = [ ESBulkOperation(operationType: .updateScript, index: self.indexName, id: items[0].id, document: scriptBody), ] - + let response = try client.bulk(bulkOperation).wait() XCTAssertEqual(response.items.count, 1) XCTAssertNotNil(response.items.first?.update) XCTAssertFalse(response.errors) - + let retrievedItem: ESGetSingleDocumentResponse = try client.get(id: items[0].id, from: self.indexName).wait() XCTAssertEqual(retrievedItem.source.count, 1) } - + func testUpdateWithScript() throws { let item = SomeItem(id: UUID(), name: "Some Item", count: 0) _ = try client.createDocumentWithID(item, in: self.indexName).wait() - + // This is required for ES to settle and load the indexes to return the right results Thread.sleep(forTimeInterval: 1.0) - + struct ScriptRequest: Codable { let script: ScriptBody } - + struct ScriptBody: Codable { let inline: String } - + let scriptBody = ScriptBody(inline: "ctx._source.count = ctx._source.count += 1") let request = ScriptRequest(script: scriptBody) - + let response = try client.updateDocumentWithScript(request, id: item.id, in: self.indexName).wait() XCTAssertEqual(response.result, "updated") - + let retrievedItem: ESGetSingleDocumentResponse = try client.get(id: item.id, from: self.indexName).wait() XCTAssertEqual(retrievedItem.source.count, 1) } - + func testUpdateWithNonExistentFieldScript() throws { let item = SomeItem(id: UUID(), name: "Some Item") _ = try client.createDocumentWithID(item, in: self.indexName).wait() - + // This is required for ES to settle and load the indexes to return the right results Thread.sleep(forTimeInterval: 1.0) - + struct ScriptRequest: Codable { let script: ScriptBody } - + struct ScriptBody: Codable { let inline: String } - + let scriptBody = ScriptBody(inline: "if(ctx._source.containsKey('count')) { ctx._source.count += 1 } else { ctx._source.count = 1 }") let request = ScriptRequest(script: scriptBody) - + let response = try client.updateDocumentWithScript(request, id: item.id, in: self.indexName).wait() XCTAssertEqual(response.result, "updated") - + let retrievedItem: ESGetSingleDocumentResponse = try client.get(id: item.id, from: self.indexName).wait() XCTAssertEqual(retrievedItem.source.count, 1) } - + func testBulkUpdateWithNonExistentFieldScript() throws { var items = [SomeItem]() for index in 1...10 { @@ -388,131 +388,174 @@ class ElasticSearchIntegrationTests: XCTestCase { _ = try client.createDocumentWithID(item, in: self.indexName).wait() items.append(item) } - + // This is required for ES to settle and load the indexes to return the right results Thread.sleep(forTimeInterval: 1.0) - + struct ScriptBody: Codable { let inline: String } - + let scriptBody = ScriptBody(inline: "if(ctx._source.containsKey('count')) { ctx._source.count += 1 } else { ctx._source.count = 1 }") - + let bulkOperation = [ ESBulkOperation(operationType: .updateScript, index: self.indexName, id: items[0].id, document: scriptBody), ] - + let response = try client.bulk(bulkOperation).wait() XCTAssertEqual(response.items.count, 1) XCTAssertNotNil(response.items.first?.update) XCTAssertFalse(response.errors) - + let retrievedItem: ESGetSingleDocumentResponse = try client.get(id: items[0].id, from: self.indexName).wait() XCTAssertEqual(retrievedItem.source.count, 1) } - + func testCountWithQueryBody() throws { try setupItems() - + struct SearchQuery: Encodable { let query: QueryBody } - + struct QueryBody: Encodable { let queryString: QueryString - + enum CodingKeys: String, CodingKey { case queryString = "query_string" } } - + struct QueryString: Encodable { let query: String } - + let queryString = QueryString(query: "Apples") let queryBody = QueryBody(queryString: queryString) let searchQuery = SearchQuery(query: queryBody) let results = try client.searchDocumentsCount(from: indexName, query: searchQuery).wait() XCTAssertEqual(results.count, 5) } - + func testPaginationQueryWithQueryBody() throws { for index in 1...100 { let name = "Some \(index) Apples" let item = SomeItem(id: UUID(), name: name) _ = try client.createDocument(item, in: self.indexName).wait() } - + // This is required for ES to settle and load the indexes to return the right results Thread.sleep(forTimeInterval: 1.0) - + struct QueryBody: Encodable { let queryString: QueryString - + enum CodingKeys: String, CodingKey { case queryString = "query_string" } } - + struct QueryString: Encodable { let query: String } - + let queryString = QueryString(query: "Apples") let queryBody = QueryBody(queryString: queryString) - + let results: ESGetMultipleDocumentsResponse = try client.searchDocumentsPaginated(from: indexName, queryBody: queryBody, size: 20, offset: 10).wait() XCTAssertEqual(results.hits.hits.count, 20) XCTAssertTrue(results.hits.hits.contains(where: { $0.source.name == "Some 11 Apples" })) XCTAssertTrue(results.hits.hits.contains(where: { $0.source.name == "Some 29 Apples" })) } - + func testCustomSearch() throws { for index in 1...100 { let name = "Some \(index) Apples" let item = SomeItem(id: UUID(), name: name) _ = try client.createDocument(item, in: self.indexName).wait() } - + // This is required for ES to settle and load the indexes to return the right results Thread.sleep(forTimeInterval: 2.0) - + struct Query: Encodable { let query: QueryBody let from: Int let size: Int } - + struct QueryBody: Encodable { let queryString: QueryString - + enum CodingKeys: String, CodingKey { case queryString = "query_string" } } - + struct QueryString: Encodable { let query: String } - + let queryString = QueryString(query: "Apples") let queryBody = QueryBody(queryString: queryString) let query = Query(query: queryBody, from: 10, size: 20) - + let results: ESGetMultipleDocumentsResponse = try client.customSearch(from: indexName, query: query).wait() XCTAssertEqual(results.hits.hits.count, 20) XCTAssertTrue(results.hits.hits.contains(where: { $0.source.name == "Some 11 Apples" })) XCTAssertTrue(results.hits.hits.contains(where: { $0.source.name == "Some 29 Apples" })) } + func testCustomRequest() throws { + for index in 1...100 { + let name = "Some \(index) Apples" + let item = SomeItem(id: UUID(), name: name, count: index) + _ = try client.createDocument(item, in: self.indexName).wait() + } + + // This is required for ES to settle and load the indexes to return the right results + Thread.sleep(forTimeInterval: 2.0) + + let query: [String: Any] = [ + "from": 0, + "size": 10, + "collapse": [ + "field": "id.keyword" + ], + "aggs": [ + "count-objects": [ + "cardinality": [ + "field": "id.keyword" + ] + ], + "count": [ + "avg": [ + "field": "count" + ] + ] + ] + ] + let queryData = try JSONSerialization.data(withJSONObject: query) + + let resultData = try client.custom("/\(indexName)/_search", method: .GET, body: queryData).wait() + + let results = try JSONSerialization.jsonObject(with: resultData) as! [String: Any] + + let aggregations = results["aggregations"] as! [String: Any] + let countObjects = aggregations["count-objects"] as! [String: Any] + XCTAssertEqual(countObjects["value"] as! Double, 100) + let count = aggregations["count"] as! [String: Any] + XCTAssertEqual(count["value"] as! Double, 50.5) + } + + func testCustomSearchWithDataQuery() throws { for index in 1...100 { let name = "Some \(index) Apples" let item = SomeItem(id: UUID(), name: name) _ = try client.createDocument(item, in: self.indexName).wait() } - + // This is required for ES to settle and load the indexes to return the right results Thread.sleep(forTimeInterval: 1.0) @@ -533,7 +576,7 @@ class ElasticSearchIntegrationTests: XCTestCase { XCTAssertTrue(results.hits.hits.contains(where: { $0.source.name == "Some 11 Apples" })) XCTAssertTrue(results.hits.hits.contains(where: { $0.source.name == "Some 29 Apples" })) } - + // MARK: - Private private func setupItems() throws { for index in 1...10 { @@ -546,7 +589,7 @@ class ElasticSearchIntegrationTests: XCTestCase { let item = SomeItem(id: UUID(), name: name) _ = try client.createDocument(item, in: self.indexName).wait() } - + // This is required for ES to settle and load the indexes to return the right results Thread.sleep(forTimeInterval: 1.0) }