Elasticsearch: Java: Bulk API: To perform bulk operations
The Bulk API allows us to perform multiple operations in a single request. One way to create a bulk request is to use the BulkRequestBuilder class.
// Target index, mapping type and document ids for the two documents below.
String _index = "organization";
String _type = "employee";
// Ids are declared as String, so they must be string literals (an int literal does not compile).
String _id1 = "1";
String _id2 = "2";

// Collects several index requests so they are sent to the cluster together.
BulkRequestBuilder bulkRequest = client.prepareBulk();

// either use client#prepare, or use Requests# to directly build index/delete requests
bulkRequest.add(client.prepareIndex(_index, _type, _id1)
        .setSource(jsonBuilder()
                .startObject()
                    .field("user", "Krishna")
                    .field("postDate", new Date())
                    .field("message", "trying out Elasticsearch")
                .endObject()
        )
);

bulkRequest.add(client.prepareIndex(_index, _type, _id2)
        .setSource(jsonBuilder()
                .startObject()
                    .field("user", "PYR")
                    .field("postDate", new Date())
                    .field("message", "another post")
                .endObject()
        )
);

// Send both requests and wait for the combined response.
BulkResponse bulkResponse = bulkRequest.execute().actionGet();
if (bulkResponse.hasFailures()) {
    // process failures by iterating through each bulk response item
}
Using BulkProcessor
The BulkProcessor class offers a simple interface to flush bulk operations automatically based on the number or size of requests, or after a given period.
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
// Target index, mapping type and document ids used by the requests below.
String _index = "organization";
String _type = "employee";
// Ids are declared as String, so they must be string literals (an int literal does not compile).
String _id1 = "1";
String _id2 = "2";

// Build a processor with a listener and explicit flush thresholds.
// The "{ ... }" bodies are placeholders for your own handling code.
BulkProcessor bulkProcessor = BulkProcessor.builder(
        client,
        new BulkProcessor.Listener() {
            @Override
            public void beforeBulk(long executionId,
                                   BulkRequest request) { ... }

            @Override
            public void afterBulk(long executionId,
                                  BulkRequest request,
                                  BulkResponse response) { ... }

            @Override
            public void afterBulk(long executionId,
                                  BulkRequest request,
                                  Throwable failure) { ... }
        })
        .setBulkActions(10000)
        .setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB))
        .setFlushInterval(TimeValue.timeValueSeconds(5))
        .setConcurrentRequests(1)
        .build();

// Queue one index request and one delete request on the processor.
bulkProcessor.add(new IndexRequest(_index, _type, _id1).source(/* your doc here */));
bulkProcessor.add(new DeleteRequest(_index, _type, _id2));
@Override
public void beforeBulk(long executionId,BulkRequest request) { ... }
This method is called just before bulk is executed.
@Override
public void afterBulk(long executionId, BulkRequest request,BulkResponse response) { ... }
This method is called after bulk execution. You can check for some failing requests with response.hasFailures().
@Override
public void afterBulk(long executionId,BulkRequest request,Throwable failure) { ... }
This method is called when the bulk failed and raised a Throwable
setBulkActions(10000)
We want to execute the bulk every 10000 requests
setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB))
We want to flush the bulk every 1gb
setFlushInterval(TimeValue.timeValueSeconds(5))
We want to flush the bulk every 5 seconds whatever the number of requests
setConcurrentRequests(1)
Set the number of concurrent requests. A value of 0 means that only a single request will be allowed to be executed. A value of 1 means 1 concurrent request is allowed to be executed while accumulating new bulk requests.
By default, BulkProcessor:
sets bulkActions to 1000
sets bulkSize to 5mb
does not set flushInterval
sets concurrentRequests to 1
The following application demonstrates a simple bulk utility.
Step 1: Define a simple model class, Employee.
package com.self_learn.model;
import java.util.ArrayList;
import java.util.List;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
/**
 * Simple employee model. Lombok generates the getters, setters,
 * equals/hashCode and toString for every field.
 */
@Getter
@Setter
@ToString
@EqualsAndHashCode()
public class Employee {
    /** Age, kept as text. */
    private String age;

    private String firstName;

    private String lastName;

    /** Hobbies of the employee; starts out as an empty list. */
    private List<String> hobbies = new ArrayList<>();
}
Step 2: Define a utility class to get a Client instance.
package com.self_learn.util;
import static com.self_learn.util.IPUtil.isValidHosts;
import static com.self_learn.util.IPUtil.isValidPorts;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import com.google.common.base.Preconditions;
/**
 * Factory methods that build Elasticsearch transport clients.
 */
public class TransportClientUtil {

    /** Clients already built by {@link #getTransportClient}, keyed by their host/port map. */
    private static Map<Map<String, Integer>, Client> localMap = new HashMap<>();

    /**
     * Takes host addresses and ports as a map and returns a transport client.
     * Key is the host address, value is the port number. A client that was
     * already created for an equal map is returned from the cache.
     *
     * @param clusterName
     *            name of the cluster, must not be null
     * @param map
     *            host address to port mapping, must not be null
     * @return transport client connected to all the given addresses
     * @throws UnknownHostException
     *             if one of the hosts can't be resolved
     * @throws IllegalStateException
     *             if the map contains an invalid host or port
     */
    public static Client getTransportClient(String clusterName,
            Map<String, Integer> map) throws UnknownHostException {
        Preconditions.checkNotNull(clusterName,
                "clusterName shouldn't be empty");
        Preconditions.checkNotNull(map, "Map shouldn't be empty");

        if (localMap.containsKey(map)) {
            return localMap.get(map);
        }

        Preconditions.checkState(isValidHostPorts(map),
                "Map contains invalid host (or) port");

        // Resolve the addresses before creating the client, so that a failed
        // lookup doesn't leave an unclosed client behind.
        InetSocketTransportAddress addresses[] = getInetSocketTransportAddresses(map);

        Settings settings = ImmutableSettings.settingsBuilder()
                .put("cluster.name", clusterName)
                .put("client.transport.sniff", true).build();

        TransportClient client = new TransportClient(settings);
        client.addTransportAddresses(addresses);

        // Store a copy as the key: the caller may modify its own map later,
        // and a modified key can no longer be found in a HashMap.
        localMap.put(new HashMap<>(map), client);
        return client;
    }

    /**
     * @param map
     *            host address to port mapping
     * @return true, if all the entries in map are valid host, ports. Else
     *         false.
     */
    private static boolean isValidHostPorts(Map<String, Integer> map) {
        Set<String> hostNames = map.keySet();
        Set<Integer> ports = new HashSet<>(map.values());

        if (!isValidHosts(hostNames.toArray(new String[hostNames.size()]))) {
            return false;
        }

        if (!isValidPorts(ports.toArray(new Integer[ports.size()]))) {
            return false;
        }

        return true;
    }

    /**
     * Converts every entry of the map into a transport address.
     *
     * @param map
     *            host address to port mapping
     * @return one address per map entry
     * @throws UnknownHostException
     *             if one of the hosts can't be resolved
     */
    private static InetSocketTransportAddress[] getInetSocketTransportAddresses(
            Map<String, Integer> map) throws UnknownHostException {
        InetSocketTransportAddress addresses[] = new InetSocketTransportAddress[map
                .size()];

        int count = 0;
        for (Map.Entry<String, Integer> entry : map.entrySet()) {
            InetAddress addr = InetAddress.getByName(entry.getKey());
            // Advance the index for every entry; otherwise each address
            // overwrites slot 0 and the remaining slots stay null.
            addresses[count++] = new InetSocketTransportAddress(addr,
                    entry.getValue());
        }

        return addresses;
    }

    /**
     * Get transport client for localhost (127.0.0.1). The client is not
     * cached; a new one is created on every call.
     *
     * @param clusterName
     *            name of the cluster
     * @param port
     *            transport port of the local node
     * @return transport client connected to 127.0.0.1 on the given port
     * @throws UnknownHostException
     *             if the address can't be resolved
     */
    public static Client getLocalTransportClient(String clusterName, int port)
            throws UnknownHostException {
        InetAddress addr = InetAddress.getByName("127.0.0.1");
        InetSocketTransportAddress address = new InetSocketTransportAddress(
                addr, port);

        Settings settings = ImmutableSettings.settingsBuilder()
                .put("cluster.name", clusterName)
                .put("client.transport.sniff", true).build();

        TransportClient client = new TransportClient(settings);
        client.addTransportAddress(address);
        return client;
    }
}
The following IPUtil class validates IP addresses and ports.
package com.self_learn.util;
import org.apache.commons.validator.routines.InetAddressValidator;
import com.google.common.base.Preconditions;
/**
 * Checks for IP addresses and port numbers.
 *
 * @author harikrishna_gurram
 */
public class IPUtil {

    private static InetAddressValidator inetAddressValidator = InetAddressValidator
            .getInstance();

    /**
     * Checks a single address with the shared {@link InetAddressValidator}.
     *
     * @param ipAddress
     *            address to check, must not be null
     * @return true if ip address is valid, else false
     */
    public static boolean isValidIPAddress(String ipAddress) {
        Preconditions.checkNotNull(ipAddress, "IP address should not be null");
        boolean valid = inetAddressValidator.isValid(ipAddress);
        return valid;
    }

    /**
     * @param port
     *            port number
     * @return true if the port lies between 1 and 65535 (both inclusive),
     *         else false
     */
    public static boolean isValidPort(int port) {
        return port > 0 && port < 65536;
    }

    /**
     * @param hostNames
     *            addresses to check, must not be null
     * @return true if every element passes {@link #isValidIPAddress(String)},
     *         else false.
     */
    public static boolean isValidHosts(String[] hostNames) {
        Preconditions.checkNotNull(hostNames, "Host names shouldn't be empty");

        for (int i = 0; i < hostNames.length; i++) {
            boolean valid = isValidIPAddress(hostNames[i]);
            if (!valid) {
                return false;
            }
        }
        return true;
    }

    /**
     * @param ports
     *            port numbers to check, must not be null
     * @return true if every element passes {@link #isValidPort(int)}, else
     *         false.
     */
    public static boolean isValidPorts(Integer[] ports) {
        Preconditions.checkNotNull(ports, "ports shouldn't be empty");

        for (int i = 0; i < ports.length; i++) {
            // Each element is unboxed to int before it is checked.
            int candidate = ports[i];
            if (!isValidPort(candidate)) {
                return false;
            }
        }
        return true;
    }
}
Step 3: The BulkRequestType enum defines constants that identify the type of a request: index, update, upsert update, or delete.
package com.self_learn.constants;
/**
 * Kinds of request that can be part of a bulk call.
 *
 * @author harikrishna_gurram
 */
public enum BulkRequestType {
    /** Index request. */
    INDEX,

    /** Update request. */
    UPDATE,

    /** Update request that also carries an upsert source. */
    UPSERT_UPDATE,

    /** Delete request. */
    DELETE
}
Step 4: Define the BulkObject model class. Each BulkObject instance represents one request: INDEX, UPDATE, UPSERT_UPDATE, or DELETE.
package com.self_learn.model;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import com.self_learn.constants.BulkRequestType;
/**
 * One entry of a bulk call: the target document coordinates, the request
 * type and, depending on the type, the document sources. Lombok generates
 * getters and setters for every field as well as equals/hashCode/toString.
 */
@Getter
@Setter
@ToString
@EqualsAndHashCode()
public class BulkObject {

    private String _index;

    private String _type;

    private String _id;

    /** Document source; left null by the four-argument constructor. */
    private String _source;

    /** Upsert source; only set by the six-argument constructor. */
    private String _upsertSource;

    private BulkRequestType type;

    /**
     * Creates an entry without any source.
     */
    public BulkObject(String _index, String _type, String _id,
            BulkRequestType type) {
        this._index = _index;
        this._type = _type;
        this._id = _id;
        this.type = type;
    }

    /**
     * Creates an entry with a document source.
     */
    public BulkObject(String _index, String _type, String _id,
            BulkRequestType type, String _source) {
        this._index = _index;
        this._type = _type;
        this._id = _id;
        this.type = type;
        this._source = _source;
    }

    /**
     * Creates an entry with a document source and an upsert source.
     */
    public BulkObject(String _index, String _type, String _id,
            BulkRequestType type, String _source, String _upsertSource) {
        this._index = _index;
        this._type = _type;
        this._id = _id;
        this.type = type;
        this._source = _source;
        this._upsertSource = _upsertSource;
    }
}
Step 5: Define the BulkUtil class, which takes a set of BulkObjects and processes them.
package com.self_learn.util;
import java.util.Set;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.base.Preconditions;
import com.self_learn.constants.BulkRequestType;
import com.self_learn.model.BulkObject;
/**
 * Utility class to build {@link BulkObject}s and execute them as one bulk
 * request.
 *
 * @author harikrishna_gurram
 */
public class BulkUtil {

    /**
     * @return BulkObject of type {@link BulkRequestType#INDEX} with the given
     *         json source
     */
    public static BulkObject getIndexObject(String _index, String _type,
            String _id, String _source) {
        return new BulkObject(_index, _type, _id, BulkRequestType.INDEX,
                _source);
    }

    /**
     * @return BulkObject of type {@link BulkRequestType#DELETE}
     */
    public static BulkObject getDeleteObject(String _index, String _type,
            String _id) {
        return new BulkObject(_index, _type, _id, BulkRequestType.DELETE);
    }

    /**
     * @return BulkObject of type {@link BulkRequestType#UPDATE} with the
     *         given json document
     */
    public static BulkObject getUpdateObject(String _index, String _type,
            String _id, String src) {
        return new BulkObject(_index, _type, _id, BulkRequestType.UPDATE, src);
    }

    /**
     * @return BulkObject of type {@link BulkRequestType#UPSERT_UPDATE} with
     *         the given json document and upsert source
     */
    public static BulkObject getUpsertUpdateObject(String _index, String _type,
            String _id, String _source, String _upsertSource) {
        return new BulkObject(_index, _type, _id,
                BulkRequestType.UPSERT_UPDATE, _source, _upsertSource);
    }

    /**
     * Same as {@link #getIndexObject(String, String, String, String)}, the
     * source object is converted with JSONUtil.getJson first.
     */
    public static BulkObject getIndexObject(String _index, String _type,
            String _id, Object _source) {
        return getIndexObject(_index, _type, _id, JSONUtil.getJson(_source));
    }

    /**
     * Same as {@link #getUpdateObject(String, String, String, String)}, the
     * source object is converted with JSONUtil.getJson first.
     */
    public static BulkObject getUpdateObject(String _index, String _type,
            String _id, Object _source) {
        return getUpdateObject(_index, _type, _id, JSONUtil.getJson(_source));
    }

    /**
     * Same as
     * {@link #getUpsertUpdateObject(String, String, String, String, String)},
     * both objects are converted with JSONUtil.getJson first.
     */
    public static BulkObject getUpsertUpdateObject(String _index, String _type,
            String _id, Object _source, Object _upsertSource) {
        return getUpsertUpdateObject(_index, _type, _id,
                JSONUtil.getJson(_source), JSONUtil.getJson(_upsertSource));
    }

    /**
     * Validate BulkObject based on Request type.
     *
     * If Request type
     *
     * a. is {@link BulkRequestType#INDEX} then BulkObject must have _index,
     * _type, _id, _source.
     *
     * b. is {@link BulkRequestType#UPDATE} then BulkObject must have _index,
     * _type, _id, _source.
     *
     * c. is {@link BulkRequestType#UPSERT_UPDATE} then BulkObject must have
     * _index, _type, _id, _source, _upsertSource.
     *
     * d. is {@link BulkRequestType#DELETE} then BulkObject must have _index,
     * _type, _id.
     *
     * @param obj
     *            object to validate, may be null
     * @return true if obj carries everything its request type needs; false
     *         otherwise, including when obj or its request type is null
     */
    private static boolean isValidBulkObject(BulkObject obj) {
        if (obj == null) {
            return false;
        }

        BulkRequestType type = obj.getType();
        // Switching on a null enum throws NullPointerException; an object
        // without a request type is invalid and must just be rejected.
        if (type == null) {
            return false;
        }

        // Every request type needs the document coordinates.
        if (obj.get_index() == null || obj.get_type() == null
                || obj.get_id() == null) {
            return false;
        }

        switch (type) {
        case INDEX:
        case UPDATE:
            return obj.get_source() != null;
        case UPSERT_UPDATE:
            return obj.get_source() != null
                    && obj.get_upsertSource() != null;
        case DELETE:
            return true;
        default:
            return false;
        }
    }

    /**
     * Takes a set of requests and executes them as one bulk request. The type
     * of each BulkObject decides which request is built for it.
     *
     * {@link BulkRequestType#INDEX} represents Index request
     * {@link BulkRequestType#UPDATE} represents update request
     * {@link BulkRequestType#UPSERT_UPDATE} represents UPSERT_UPDATE request
     * {@link BulkRequestType#DELETE} represents delete request
     *
     * If BulkObject is invalid, then the request is simply ignored.
     *
     * @param client
     *            client used to send the bulk request, must not be null
     * @param bulkReq
     *            requests to execute, must not be null
     * @return response of the bulk request
     */
    public static BulkResponse execute(Client client, Set<BulkObject> bulkReq) {
        Preconditions.checkNotNull(client, "client shouldn't be null");
        Preconditions.checkNotNull(bulkReq, "bulkReq shouldn't be null");

        BulkRequestBuilder bulkRequest = client.prepareBulk();

        for (BulkObject obj : bulkReq) {
            if (!isValidBulkObject(obj)) {
                continue;
            }

            String _index = obj.get_index();
            String _type = obj.get_type();
            String _id = obj.get_id();
            String _source = obj.get_source();
            String _upsertSource = obj.get_upsertSource();

            // Validation above guarantees a non-null type here.
            switch (obj.getType()) {
            case INDEX:
                bulkRequest.add(new IndexRequest(_index, _type, _id)
                        .source(_source));
                break;
            case UPDATE:
                bulkRequest.add(new UpdateRequest(_index, _type, _id)
                        .doc(_source));
                break;
            case UPSERT_UPDATE:
                bulkRequest.add(new UpdateRequest(_index, _type, _id).doc(
                        _source).upsert(_upsertSource));
                break;
            case DELETE:
                bulkRequest.add(new DeleteRequest(_index, _type, _id));
                break;
            default:
                System.out.println("Invalid Request");
            }
        }

        return bulkRequest.execute().actionGet();
    }
}
Step 6: Define the ResponseUtil class, used to return the response of a query in JSON format.
package com.self_learn.util;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.get.GetResponse;
import com.google.common.base.Preconditions;
/**
 * Helpers that turn Elasticsearch responses into strings.
 *
 * @author harikrishna_gurram
 */
public class ResponseUtil {

    /**
     * Returns the source of a get response as string.
     *
     * @param response
     *            response to read, must not be null
     * @param pretty
     *            currently not consulted by this method
     * @return value of {@link GetResponse#getSourceAsString()}
     */
    public static String getSource(GetResponse response, boolean pretty) {
        Preconditions.checkNotNull(response, "response shouldn't be null");
        String source = response.getSourceAsString();
        return source;
    }

    /**
     * Converts a response object, for example a {@link BulkResponse}, to
     * json.
     *
     * @param response
     *            object to convert
     * @param pretty
     *            true to use JSONUtil.getPrettyJson, false to use
     *            JSONUtil.getJson
     * @return json representation of the response
     */
    public static String getResponseInfo(Object response, boolean pretty) {
        return pretty ? JSONUtil.getPrettyJson(response) : JSONUtil
                .getJson(response);
    }
}
Step 8: Main.java demonstrates how to use the BulkUtil class.
package com.self_learn.test;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.client.Client;
import com.self_learn.model.BulkObject;
import com.self_learn.model.Employee;
import com.self_learn.util.BulkUtil;
import com.self_learn.util.ResponseUtil;
import com.self_learn.util.TransportClientUtil;
/**
 * Demo of BulkUtil: indexes five employees, deletes two of them, then
 * updates one document and indexes another one. The response of each bulk
 * call is printed as pretty json.
 */
public class Main {
    private static String clusterName = "my_cluster_1";
    private static String _index = "organization";
    private static String _type = "employee";

    public static void main(String args[]) throws IOException,
            InterruptedException, ExecutionException {

        /* Get client instance for cluster */
        Client client = TransportClientUtil.getLocalTransportClient(
                clusterName, 9300);

        // Close the client even when one of the bulk calls throws.
        try {
            Set<BulkObject> objects = new HashSet<>();

            /* First 5 index requests */
            for (int i = 1; i <= 5; i++) {
                objects.add(BulkUtil.getIndexObject(_index, _type, "" + i,
                        newEmployee(i)));
            }

            BulkResponse response = BulkUtil.execute(client, objects);
            System.out.println(ResponseUtil.getResponseInfo(response, true));

            /* Delete documents 1 and 4 */
            objects = new HashSet<>();
            objects.add(BulkUtil.getDeleteObject(_index, _type, "1"));
            objects.add(BulkUtil.getDeleteObject(_index, _type, "4"));

            response = BulkUtil.execute(client, objects);
            System.out.println(ResponseUtil.getResponseInfo(response, true));

            /* Update document2 and index document 6 */
            objects = new HashSet<>();

            Map<String, Object> doc = new HashMap<>();
            doc.put("firstName", "Krishna");
            doc.put("age", "26");

            objects.add(BulkUtil.getUpdateObject(_index, _type, "2", doc));
            objects.add(BulkUtil.getIndexObject(_index, _type, "6",
                    newEmployee(6)));

            response = BulkUtil.execute(client, objects);
            System.out.println(ResponseUtil.getResponseInfo(response, true));
        } finally {
            client.close();
        }
    }

    /**
     * Builds the sample employee whose field values all end with the given
     * number.
     */
    private static Employee newEmployee(int number) {
        Employee emp = new Employee();
        emp.setAge("" + number);
        emp.setFirstName("firstName " + number);
        emp.setLastName("lastName " + number);
        emp.setHobbies(Arrays.asList("hobbies " + number));
        return emp;
    }
}
Once you run Main.java, you will get the following output.
Sep 10, 2015 2:02:14 PM org.elasticsearch.plugins.PluginsService <init>
INFO: [Turner Century] loaded [], sites []
{
"responses": [
{
"id": 0,
"opType": "index",
"response": {
"index": "organization",
"id": "2",
"type": "employee",
"version": 1,
"created": true
}
},
{
"id": 1,
"opType": "index",
"response": {
"index": "organization",
"id": "4",
"type": "employee",
"version": 1,
"created": true
}
},
{
"id": 2,
"opType": "index",
"response": {
"index": "organization",
"id": "1",
"type": "employee",
"version": 1,
"created": true
}
},
{
"id": 3,
"opType": "index",
"response": {
"index": "organization",
"id": "3",
"type": "employee",
"version": 1,
"created": true
}
},
{
"id": 4,
"opType": "index",
"response": {
"index": "organization",
"id": "5",
"type": "employee",
"version": 1,
"created": true
}
}
],
"tookInMillis": 2,
"remoteAddress": {
"address": {}
}
}
{
"responses": [
{
"id": 0,
"opType": "delete",
"response": {
"index": "organization",
"id": "4",
"type": "employee",
"version": 2,
"found": true
}
},
{
"id": 1,
"opType": "delete",
"response": {
"index": "organization",
"id": "1",
"type": "employee",
"version": 2,
"found": true
}
}
],
"tookInMillis": 1,
"remoteAddress": {
"address": {}
}
}
{
"responses": [
{
"id": 0,
"opType": "update",
"response": {
"index": "organization",
"id": "2",
"type": "employee",
"version": 2,
"created": false
}
},
{
"id": 1,
"opType": "index",
"response": {
"index": "organization",
"id": "6",
"type": "employee",
"version": 1,
"created": true
}
}
],
"tookInMillis": 1,
"remoteAddress": {
"address": {}
}
}