Elasticsearch: Java: Bulk API: To perform bulk operations
The Bulk API allows us to perform multiple operations in a single request. One way to create a bulk request is to use the BulkRequestBuilder class.
// Coordinates of the documents to index. The ids are declared as String,
// so they must be string literals (assigning a bare int does not compile).
String _index = "organization";
String _type = "employee";
String _id1 = "1";
String _id2 = "2";

// Builder that accumulates the individual requests of one bulk call.
BulkRequestBuilder bulkRequest = client.prepareBulk();

// either use client#prepare, or use Requests# to directly build index/delete requests
bulkRequest.add(client.prepareIndex(_index, _type, _id1)
        .setSource(jsonBuilder()
                .startObject()
                    .field("user", "Krishna")
                    .field("postDate", new Date())
                    .field("message", "trying out Elasticsearch")
                .endObject()
        )
);

bulkRequest.add(client.prepareIndex(_index, _type, _id2)
        .setSource(jsonBuilder()
                .startObject()
                    .field("user", "PYR")
                    .field("postDate", new Date())
                    .field("message", "another post")
                .endObject()
        )
);

// Send everything that was added and wait for the combined response.
BulkResponse bulkResponse = bulkRequest.execute().actionGet();
if (bulkResponse.hasFailures()) {
    // process failures by iterating through each bulk response item
}
Using BulkProcessor
The BulkProcessor class offers a simple interface to flush bulk operations automatically based on the number or size of requests, or after a given period.
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
// Coordinates of the documents used below. The ids are declared as String,
// so they must be string literals (assigning a bare int does not compile).
String _index = "organization";
String _type = "employee";
String _id1 = "1";
String _id2 = "2";

BulkProcessor bulkProcessor = BulkProcessor.builder(
        client,
        new BulkProcessor.Listener() {
            @Override
            public void beforeBulk(long executionId,
                                   BulkRequest request) { /* called just before the bulk is executed */ }

            @Override
            public void afterBulk(long executionId,
                                  BulkRequest request,
                                  BulkResponse response) { /* called after bulk execution; check response.hasFailures() */ }

            @Override
            public void afterBulk(long executionId,
                                  BulkRequest request,
                                  Throwable failure) { /* called when the bulk failed and raised a Throwable */ }
        })
        // execute the bulk every 10000 requests
        .setBulkActions(10000)
        // flush the bulk every 1gb
        .setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB))
        // flush the bulk every 5 seconds whatever the number of requests
        .setFlushInterval(TimeValue.timeValueSeconds(5))
        // allow 1 concurrent request to execute while new bulk requests accumulate
        .setConcurrentRequests(1)
        .build();

// Requests are handed to the processor, which flushes them according to the settings above.
bulkProcessor.add(new IndexRequest(_index, _type, _id1).source(/* your doc here */));
bulkProcessor.add(new DeleteRequest(_index, _type, _id2));
@Override
public void beforeBulk(long executionId,BulkRequest request) { ... }
This method is called just before bulk is executed.
@Override
public void afterBulk(long executionId, BulkRequest request,BulkResponse response) { ... }
This method is called after bulk execution. You can check for some failing requests with response.hasFailures().
@Override
public void afterBulk(long executionId,BulkRequest request,Throwable failure) { ... }
This method is called when the bulk failed and raised a Throwable
setBulkActions(10000)
We want to execute the bulk every 10000 requests
setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB))
We want to flush the bulk every 1gb
setFlushInterval(TimeValue.timeValueSeconds(5))
We want to flush the bulk every 5 seconds whatever the number of requests
.setConcurrentRequests(1)
Set the number of concurrent requests. A value of 0 means that only a single request will be allowed to be executed. A value of 1 means 1 concurrent request is allowed to be executed while accumulating new bulk requests.
By default, BulkProcessor:
sets bulkActions to 1000
sets bulkSize to 5mb
does not set flushInterval
sets concurrentRequests to 1
The following application demonstrates a simple bulk utility.
Step 1: Define a simple model class, Employee.
package com.self_learn.model;
import java.util.ArrayList;
import java.util.List;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
/**
 * Employee document model. Accessors, equals/hashCode and toString are
 * generated by Lombok for every field.
 */
@Getter
@Setter
@ToString
@EqualsAndHashCode
public class Employee {
    /** Age, held as text. */
    private String age;

    private String firstName;

    private String lastName;

    /** Hobbies of the employee; starts out as an empty list. */
    private List<String> hobbies = new ArrayList<>();
}
Step 2: Define Utility class to get Client instance.
package com.self_learn.util;
import static com.self_learn.util.IPUtil.isValidHosts;
import static com.self_learn.util.IPUtil.isValidPorts;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import com.google.common.base.Preconditions;
/**
 * Factory methods for Elasticsearch transport clients.
 */
public class TransportClientUtil {
    /**
     * Clients already created by {@link #getTransportClient}, keyed by the
     * host/port map they were built from.
     *
     * NOTE(review): the cluster name is not part of the key, so the same map
     * with a different cluster name returns the previously created client.
     */
    private static Map<Map<String, Integer>, Client> localMap = new HashMap<>();

    /**
     * Take machine name and port addresses as map and return transport client.
     * Key is host name, value is port number. A client created for a given map
     * is cached and returned again on later calls with an equal map.
     *
     * @param clusterName
     *            value used for the "cluster.name" setting, must not be null
     * @param map
     *            host to port mapping, must not be null
     * @return transport client connected to every address in the map
     * @throws UnknownHostException
     *             if a host in the map cannot be resolved
     * @throws IllegalStateException
     *             if the map contains an invalid host or port
     */
    public static Client getTransportClient(String clusterName,
            Map<String, Integer> map) throws UnknownHostException {
        Preconditions.checkNotNull(clusterName,
                "clusterName shouldn't be empty");
        Preconditions.checkNotNull(map, "Map shouldn't be empty");

        if (localMap.containsKey(map)) {
            return localMap.get(map);
        }

        Preconditions.checkState(isValidHostPorts(map),
                "Map contains invalid host (or) port");

        Settings settings = ImmutableSettings.settingsBuilder()
                .put("cluster.name", clusterName)
                .put("client.transport.sniff", true).build();

        TransportClient client = new TransportClient(settings);
        InetSocketTransportAddress addresses[] = getInetSocketTransportAddresses(map);
        client.addTransportAddresses(addresses);

        localMap.put(map, client);
        return client;
    }

    /**
     * @param map
     *            host to port mapping
     * @return true, if all the entries in map are valid host, ports. Else
     *         false.
     */
    private static boolean isValidHostPorts(Map<String, Integer> map) {
        Set<String> hostNames = map.keySet();
        Set<Integer> ports = new HashSet<>(map.values());

        if (!isValidHosts(hostNames.toArray(new String[hostNames.size()]))) {
            return false;
        }
        if (!isValidPorts(ports.toArray(new Integer[ports.size()]))) {
            return false;
        }
        return true;
    }

    /**
     * Converts every host/port entry of the map into a transport address.
     *
     * @param map
     *            host to port mapping
     * @return one address per map entry, with no null slots
     * @throws UnknownHostException
     *             if a host cannot be resolved
     */
    private static InetSocketTransportAddress[] getInetSocketTransportAddresses(
            Map<String, Integer> map) throws UnknownHostException {
        InetSocketTransportAddress addresses[] = new InetSocketTransportAddress[map
                .size()];

        int count = 0;
        for (Map.Entry<String, Integer> entry : map.entrySet()) {
            InetAddress addr = InetAddress.getByName(entry.getKey());
            // Advance the index for every entry; without the increment each
            // address overwrote slot 0 and the remaining slots stayed null.
            addresses[count++] = new InetSocketTransportAddress(addr,
                    entry.getValue());
        }
        return addresses;
    }

    /**
     * Get transport client for localhost (127.0.0.1). The client is not cached.
     *
     * @param clusterName
     *            value used for the "cluster.name" setting
     * @param port
     *            transport port of the local node
     * @return a new transport client connected to 127.0.0.1 on the given port
     * @throws UnknownHostException
     *             if the loopback address cannot be resolved
     */
    public static Client getLocalTransportClient(String clusterName, int port)
            throws UnknownHostException {
        Settings settings = ImmutableSettings.settingsBuilder()
                .put("cluster.name", clusterName)
                .put("client.transport.sniff", true).build();

        TransportClient client = new TransportClient(settings);
        InetAddress addr = InetAddress.getByName("127.0.0.1");
        InetSocketTransportAddress address = new InetSocketTransportAddress(
                addr, port);
        client.addTransportAddress(address);
        return client;
    }
}
The following IPUtil class validates host IP addresses and port numbers.
package com.self_learn.util;
import org.apache.commons.validator.routines.InetAddressValidator;
import com.google.common.base.Preconditions;
/**
 * Helpers that check IP addresses and port numbers.
 *
 * @author harikrishna_gurram
 */
public class IPUtil {
    private static InetAddressValidator inetAddressValidator = InetAddressValidator
            .getInstance();

    /**
     * Checks a single address with the shared {@link InetAddressValidator}.
     *
     * @param ipAddress
     *            address to check, must not be null
     * @return true if ip address is valid, else false
     */
    public static boolean isValidIPAddress(String ipAddress) {
        Preconditions.checkNotNull(ipAddress, "IP address should not be null");
        boolean valid = inetAddressValidator.isValid(ipAddress);
        return valid;
    }

    /**
     * @param port
     *            port number
     * @return true if the port lies in the range 1..65535, else false
     */
    public static boolean isValidPort(int port) {
        return port > 0 && port < 65536;
    }

    /**
     * @param hostNames
     *            hosts to check, must not be null; each entry is validated as
     *            an IP address
     * @return true if every element passes {@link #isValidIPAddress}, else
     *         false
     */
    public static boolean isValidHosts(String[] hostNames) {
        Preconditions.checkNotNull(hostNames, "Host names shouldn't be empty");
        for (int i = 0; i < hostNames.length; i++) {
            boolean valid = isValidIPAddress(hostNames[i]);
            if (!valid) {
                return false;
            }
        }
        return true;
    }

    /**
     * @param ports
     *            port numbers to check, must not be null
     * @return true if every element passes {@link #isValidPort}, else false
     */
    public static boolean isValidPorts(Integer[] ports) {
        Preconditions.checkNotNull(ports, "ports shouldn't be empty");
        for (int i = 0; i < ports.length; i++) {
            int port = ports[i];
            if (!isValidPort(port)) {
                return false;
            }
        }
        return true;
    }
}
Step 3: BulkRequestType class defines constants to identify type of the request like Index, update, upsert update, and delete.
package com.self_learn.constants;
/**
 * Kinds of request that can be part of a bulk call.
 *
 * @author harikrishna_gurram
 */
public enum BulkRequestType {
    /** Index request. */
    INDEX,
    /** Update request. */
    UPDATE,
    /** Update request that also carries an upsert document. */
    UPSERT_UPDATE,
    /** Delete request. */
    DELETE;
}
Step 4: Define the BulkObject model class. Each BulkObject instance represents one request of type INDEX, UPDATE, UPSERT_UPDATE, or DELETE.
package com.self_learn.model;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import com.self_learn.constants.BulkRequestType;
/**
 * One entry of a bulk call: the document coordinates, the request type and,
 * depending on the type, the document source and the upsert source.
 * Accessors, equals/hashCode and toString are generated by Lombok.
 */
@Getter
@Setter
@ToString
@EqualsAndHashCode
public class BulkObject {
    private String _index;

    private String _type;

    private String _id;

    /** Document source; left null by the four-argument constructor. */
    private String _source;

    /** Upsert document; only set by the six-argument constructor. */
    private String _upsertSource;

    private BulkRequestType type;

    /** Creates an entry without any source. */
    public BulkObject(String _index, String _type, String _id,
            BulkRequestType type) {
        this._index = _index;
        this._type = _type;
        this._id = _id;
        this.type = type;
    }

    /** Creates an entry with a document source. */
    public BulkObject(String _index, String _type, String _id,
            BulkRequestType type, String _source) {
        this._index = _index;
        this._type = _type;
        this._id = _id;
        this.type = type;
        this._source = _source;
    }

    /** Creates an entry with a document source and an upsert source. */
    public BulkObject(String _index, String _type, String _id,
            BulkRequestType type, String _source, String _upsertSource) {
        this._index = _index;
        this._type = _type;
        this._id = _id;
        this.type = type;
        this._source = _source;
        this._upsertSource = _upsertSource;
    }
}
Step 5: Define the BulkUtil class, which takes a set of BulkObjects and processes them.
package com.self_learn.util;
import java.util.Set;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.base.Preconditions;
import com.self_learn.constants.BulkRequestType;
import com.self_learn.model.BulkObject;
/**
 * Utility class to build {@link BulkObject}s and execute them as one bulk
 * request.
 *
 * @author harikrishna_gurram
 */
public class BulkUtil {

    /** Creates an INDEX entry from a JSON source string. */
    public static BulkObject getIndexObject(String _index, String _type,
            String _id, String _source) {
        return new BulkObject(_index, _type, _id, BulkRequestType.INDEX,
                _source);
    }

    /** Creates a DELETE entry; no source is needed. */
    public static BulkObject getDeleteObject(String _index, String _type,
            String _id) {
        return new BulkObject(_index, _type, _id, BulkRequestType.DELETE);
    }

    /** Creates an UPDATE entry whose partial document is the given JSON string. */
    public static BulkObject getUpdateObject(String _index, String _type,
            String _id, String src) {
        return new BulkObject(_index, _type, _id, BulkRequestType.UPDATE, src);
    }

    /** Creates an UPSERT_UPDATE entry from a partial document and an upsert document. */
    public static BulkObject getUpsertUpdateObject(String _index, String _type,
            String _id, String _source, String _upsertSource) {
        return new BulkObject(_index, _type, _id,
                BulkRequestType.UPSERT_UPDATE, _source, _upsertSource);
    }

    /** Same as the String variant; the object is first converted with JSONUtil.getJson. */
    public static BulkObject getIndexObject(String _index, String _type,
            String _id, Object _source) {
        return getIndexObject(_index, _type, _id, JSONUtil.getJson(_source));
    }

    /** Same as the String variant; the object is first converted with JSONUtil.getJson. */
    public static BulkObject getUpdateObject(String _index, String _type,
            String _id, Object _source) {
        return getUpdateObject(_index, _type, _id, JSONUtil.getJson(_source));
    }

    /** Same as the String variant; both objects are first converted with JSONUtil.getJson. */
    public static BulkObject getUpsertUpdateObject(String _index, String _type,
            String _id, Object _source, Object _upsertSource) {
        return getUpsertUpdateObject(_index, _type, _id,
                JSONUtil.getJson(_source), JSONUtil.getJson(_upsertSource));
    }

    private static boolean isNotNull(Object obj) {
        return obj != null;
    }

    /**
     * Validate BulkObject based on its request type.
     *
     * If the request type
     *
     * a. is {@link BulkRequestType#INDEX} then BulkObject must have _index,
     * _type, _id, _source.
     *
     * b. is {@link BulkRequestType#UPDATE} then BulkObject must have _index,
     * _type, _id, _source.
     *
     * c. is {@link BulkRequestType#UPSERT_UPDATE} then BulkObject must have
     * _index, _type, _id, _source, _upsertSource.
     *
     * d. is {@link BulkRequestType#DELETE} then BulkObject must have _index,
     * _type, _id.
     *
     * @param obj
     *            entry to check, may be null
     * @return true if the entry carries everything its request type needs;
     *         false for a null entry, a null request type or a missing field
     */
    private static boolean isValidBulkObject(BulkObject obj) {
        // A missing request type makes the entry invalid; switching on null
        // would throw instead of letting the caller skip the entry.
        if (obj == null || obj.getType() == null) {
            return false;
        }

        boolean hasCoordinates = isNotNull(obj.get_index())
                && isNotNull(obj.get_type()) && isNotNull(obj.get_id());
        if (!hasCoordinates) {
            return false;
        }

        switch (obj.getType()) {
        case INDEX:
        case UPDATE:
            return isNotNull(obj.get_source());
        case UPSERT_UPDATE:
            return isNotNull(obj.get_source())
                    && isNotNull(obj.get_upsertSource());
        case DELETE:
            return true;
        default:
            return false;
        }
    }

    /**
     * Takes a set of requests and executes them as one bulk request. The type
     * of each entry decides which request is built:
     *
     * {@link BulkRequestType#INDEX} represents Index request,
     * {@link BulkRequestType#UPDATE} represents update request,
     * {@link BulkRequestType#UPSERT_UPDATE} represents UPSERT_UPDATE request,
     * {@link BulkRequestType#DELETE} represents delete request.
     *
     * If a BulkObject is invalid, then the request is simply ignored.
     *
     * NOTE(review): the bulk is executed even when no entry was added (empty
     * set or only invalid entries) - confirm how the client reacts to that.
     *
     * @param client
     *            client used to build and send the bulk request, must not be
     *            null
     * @param bulkReq
     *            entries to execute, must not be null
     * @return response of the bulk request
     */
    public static BulkResponse execute(Client client, Set<BulkObject> bulkReq) {
        Preconditions.checkNotNull(client, "client shouldn't be null");
        Preconditions.checkNotNull(bulkReq, "bulkReq shouldn't be null");

        BulkRequestBuilder bulkRequest = client.prepareBulk();

        for (BulkObject obj : bulkReq) {
            if (!isValidBulkObject(obj)) {
                continue;
            }

            String _index = obj.get_index();
            String _type = obj.get_type();
            String _id = obj.get_id();
            String _source = obj.get_source();
            String _upsertSource = obj.get_upsertSource();

            switch (obj.getType()) {
            case INDEX:
                bulkRequest.add(new IndexRequest(_index, _type, _id)
                        .source(_source));
                break;
            case UPDATE:
                bulkRequest.add(new UpdateRequest(_index, _type, _id)
                        .doc(_source));
                break;
            case UPSERT_UPDATE:
                bulkRequest.add(new UpdateRequest(_index, _type, _id).doc(
                        _source).upsert(_upsertSource));
                break;
            case DELETE:
                bulkRequest.add(new DeleteRequest(_index, _type, _id));
                break;
            default:
                System.out.println("Invalid Request");
            }
        }
        return bulkRequest.execute().actionGet();
    }
}
Step 6: Define the ResponseUtil class, which returns the response of a request in JSON format.
package com.self_learn.util;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.get.GetResponse;
import com.google.common.base.Preconditions;
/**
 * Helpers that turn responses into strings.
 *
 * @author harikrishna_gurram
 */
public class ResponseUtil {
    /**
     * Returns the source of a get response as a string.
     *
     * @param response
     *            get response, must not be null
     * @param pretty
     *            currently not consulted by this method
     * @return the value of {@code response.getSourceAsString()}
     */
    public static String getSource(GetResponse response, boolean pretty) {
        Preconditions.checkNotNull(response, "response shouldn't be null");
        String source = response.getSourceAsString();
        return source;
    }

    /**
     * Converts a response object (for example a {@link BulkResponse}) to JSON.
     *
     * @param response
     *            object to convert
     * @param pretty
     *            true to use JSONUtil.getPrettyJson, false to use
     *            JSONUtil.getJson
     * @return JSON string produced by JSONUtil
     */
    public static String getResponseInfo(Object response, boolean pretty) {
        return pretty ? JSONUtil.getPrettyJson(response)
                : JSONUtil.getJson(response);
    }
}
Step 8: Main.java demonstrates how to use the BulkUtil class.
package com.self_learn.test;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.client.Client;
import com.self_learn.model.BulkObject;
import com.self_learn.model.Employee;
import com.self_learn.util.BulkUtil;
import com.self_learn.util.ResponseUtil;
import com.self_learn.util.TransportClientUtil;
/**
 * Demo: indexes five employees, deletes two of them, then updates one
 * document and indexes another, printing each bulk response.
 */
public class Main {
    private static String clusterName = "my_cluster_1";
    private static String _index = "organization";
    private static String _type = "employee";

    /** Builds the sample employee whose field values are derived from the given number. */
    private static Employee newEmployee(int i) {
        Employee emp = new Employee();
        emp.setAge("" + i);
        emp.setFirstName("firstName " + i);
        emp.setLastName("lastName " + i);
        emp.setHobbies(Arrays.asList("hobbies " + i));
        return emp;
    }

    public static void main(String args[]) throws IOException,
            InterruptedException, ExecutionException {
        /* Get client instance for cluster */
        Client client = TransportClientUtil.getLocalTransportClient(
                clusterName, 9300);

        // Close the client even when one of the bulk calls throws.
        try {
            Set<BulkObject> objects = new HashSet<>();

            /* First 5 index requests */
            for (int i = 1; i <= 5; i++) {
                objects.add(BulkUtil.getIndexObject(_index, _type, "" + i,
                        newEmployee(i)));
            }
            BulkResponse response = BulkUtil.execute(client, objects);
            System.out.println(ResponseUtil.getResponseInfo(response, true));

            /* Delete documents 1 and 4 */
            objects = new HashSet<>();
            objects.add(BulkUtil.getDeleteObject(_index, _type, "1"));
            objects.add(BulkUtil.getDeleteObject(_index, _type, "4"));
            response = BulkUtil.execute(client, objects);
            System.out.println(ResponseUtil.getResponseInfo(response, true));

            /* Update document 2 and index document 6 */
            objects = new HashSet<>();
            Map<String, Object> doc = new HashMap<>();
            doc.put("firstName", "Krishna");
            doc.put("age", "26");
            objects.add(BulkUtil.getUpdateObject(_index, _type, "2", doc));
            objects.add(BulkUtil.getIndexObject(_index, _type, "6",
                    newEmployee(6)));
            response = BulkUtil.execute(client, objects);
            System.out.println(ResponseUtil.getResponseInfo(response, true));
        } finally {
            client.close();
        }
    }
}
Once you run Main.java, you will get the following output.
Sep 10, 2015 2:02:14 PM org.elasticsearch.plugins.PluginsService <init>
INFO: [Turner Century] loaded [], sites []
{
"responses": [
{
"id": 0,
"opType": "index",
"response": {
"index": "organization",
"id": "2",
"type": "employee",
"version": 1,
"created": true
}
},
{
"id": 1,
"opType": "index",
"response": {
"index": "organization",
"id": "4",
"type": "employee",
"version": 1,
"created": true
}
},
{
"id": 2,
"opType": "index",
"response": {
"index": "organization",
"id": "1",
"type": "employee",
"version": 1,
"created": true
}
},
{
"id": 3,
"opType": "index",
"response": {
"index": "organization",
"id": "3",
"type": "employee",
"version": 1,
"created": true
}
},
{
"id": 4,
"opType": "index",
"response": {
"index": "organization",
"id": "5",
"type": "employee",
"version": 1,
"created": true
}
}
],
"tookInMillis": 2,
"remoteAddress": {
"address": {}
}
}
{
"responses": [
{
"id": 0,
"opType": "delete",
"response": {
"index": "organization",
"id": "4",
"type": "employee",
"version": 2,
"found": true
}
},
{
"id": 1,
"opType": "delete",
"response": {
"index": "organization",
"id": "1",
"type": "employee",
"version": 2,
"found": true
}
}
],
"tookInMillis": 1,
"remoteAddress": {
"address": {}
}
}
{
"responses": [
{
"id": 0,
"opType": "update",
"response": {
"index": "organization",
"id": "2",
"type": "employee",
"version": 2,
"created": false
}
},
{
"id": 1,
"opType": "index",
"response": {
"index": "organization",
"id": "6",
"type": "employee",
"version": 1,
"created": true
}
}
],
"tookInMillis": 1,
"remoteAddress": {
"address": {}
}
}
The Bulk API allows us to perform multiple operations in a single request. One way to create a bulk request is to use the BulkRequestBuilder class.
// Coordinates of the documents to index. The ids are declared as String,
// so they must be string literals (assigning a bare int does not compile).
String _index = "organization";
String _type = "employee";
String _id1 = "1";
String _id2 = "2";

// Builder that accumulates the individual requests of one bulk call.
BulkRequestBuilder bulkRequest = client.prepareBulk();

// either use client#prepare, or use Requests# to directly build index/delete requests
bulkRequest.add(client.prepareIndex(_index, _type, _id1)
        .setSource(jsonBuilder()
                .startObject()
                    .field("user", "Krishna")
                    .field("postDate", new Date())
                    .field("message", "trying out Elasticsearch")
                .endObject()
        )
);

bulkRequest.add(client.prepareIndex(_index, _type, _id2)
        .setSource(jsonBuilder()
                .startObject()
                    .field("user", "PYR")
                    .field("postDate", new Date())
                    .field("message", "another post")
                .endObject()
        )
);

// Send everything that was added and wait for the combined response.
BulkResponse bulkResponse = bulkRequest.execute().actionGet();
if (bulkResponse.hasFailures()) {
    // process failures by iterating through each bulk response item
}
Using BulkProcessor
The BulkProcessor class offers a simple interface to flush bulk operations automatically based on the number or size of requests, or after a given period.
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
// Coordinates of the documents used below. The ids are declared as String,
// so they must be string literals (assigning a bare int does not compile).
String _index = "organization";
String _type = "employee";
String _id1 = "1";
String _id2 = "2";

BulkProcessor bulkProcessor = BulkProcessor.builder(
        client,
        new BulkProcessor.Listener() {
            @Override
            public void beforeBulk(long executionId,
                                   BulkRequest request) { /* called just before the bulk is executed */ }

            @Override
            public void afterBulk(long executionId,
                                  BulkRequest request,
                                  BulkResponse response) { /* called after bulk execution; check response.hasFailures() */ }

            @Override
            public void afterBulk(long executionId,
                                  BulkRequest request,
                                  Throwable failure) { /* called when the bulk failed and raised a Throwable */ }
        })
        // execute the bulk every 10000 requests
        .setBulkActions(10000)
        // flush the bulk every 1gb
        .setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB))
        // flush the bulk every 5 seconds whatever the number of requests
        .setFlushInterval(TimeValue.timeValueSeconds(5))
        // allow 1 concurrent request to execute while new bulk requests accumulate
        .setConcurrentRequests(1)
        .build();

// Requests are handed to the processor, which flushes them according to the settings above.
bulkProcessor.add(new IndexRequest(_index, _type, _id1).source(/* your doc here */));
bulkProcessor.add(new DeleteRequest(_index, _type, _id2));
@Override
public void beforeBulk(long executionId,BulkRequest request) { ... }
This method is called just before bulk is executed.
@Override
public void afterBulk(long executionId, BulkRequest request,BulkResponse response) { ... }
This method is called after bulk execution. You can check for some failing requests with response.hasFailures().
@Override
public void afterBulk(long executionId,BulkRequest request,Throwable failure) { ... }
This method is called when the bulk failed and raised a Throwable
setBulkActions(10000)
We want to execute the bulk every 10000 requests
setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB))
We want to flush the bulk every 1gb
setFlushInterval(TimeValue.timeValueSeconds(5))
We want to flush the bulk every 5 seconds whatever the number of requests
.setConcurrentRequests(1)
Set the number of concurrent requests. A value of 0 means that only a single request will be allowed to be executed. A value of 1 means 1 concurrent request is allowed to be executed while accumulating new bulk requests.
By default, BulkProcessor:
sets bulkActions to 1000
sets bulkSize to 5mb
does not set flushInterval
sets concurrentRequests to 1
The following application demonstrates a simple bulk utility.
Step 1: Define a simple model class, Employee.
package com.self_learn.model;
import java.util.ArrayList;
import java.util.List;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
/**
 * Employee document model. Accessors, equals/hashCode and toString are
 * generated by Lombok for every field.
 */
@Getter
@Setter
@ToString
@EqualsAndHashCode
public class Employee {
    /** Age, held as text. */
    private String age;

    private String firstName;

    private String lastName;

    /** Hobbies of the employee; starts out as an empty list. */
    private List<String> hobbies = new ArrayList<>();
}
Step 2: Define Utility class to get Client instance.
package com.self_learn.util;
import static com.self_learn.util.IPUtil.isValidHosts;
import static com.self_learn.util.IPUtil.isValidPorts;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import com.google.common.base.Preconditions;
/**
 * Factory methods for Elasticsearch transport clients.
 */
public class TransportClientUtil {
    /**
     * Clients already created by {@link #getTransportClient}, keyed by the
     * host/port map they were built from.
     *
     * NOTE(review): the cluster name is not part of the key, so the same map
     * with a different cluster name returns the previously created client.
     */
    private static Map<Map<String, Integer>, Client> localMap = new HashMap<>();

    /**
     * Take machine name and port addresses as map and return transport client.
     * Key is host name, value is port number. A client created for a given map
     * is cached and returned again on later calls with an equal map.
     *
     * @param clusterName
     *            value used for the "cluster.name" setting, must not be null
     * @param map
     *            host to port mapping, must not be null
     * @return transport client connected to every address in the map
     * @throws UnknownHostException
     *             if a host in the map cannot be resolved
     * @throws IllegalStateException
     *             if the map contains an invalid host or port
     */
    public static Client getTransportClient(String clusterName,
            Map<String, Integer> map) throws UnknownHostException {
        Preconditions.checkNotNull(clusterName,
                "clusterName shouldn't be empty");
        Preconditions.checkNotNull(map, "Map shouldn't be empty");

        if (localMap.containsKey(map)) {
            return localMap.get(map);
        }

        Preconditions.checkState(isValidHostPorts(map),
                "Map contains invalid host (or) port");

        Settings settings = ImmutableSettings.settingsBuilder()
                .put("cluster.name", clusterName)
                .put("client.transport.sniff", true).build();

        TransportClient client = new TransportClient(settings);
        InetSocketTransportAddress addresses[] = getInetSocketTransportAddresses(map);
        client.addTransportAddresses(addresses);

        localMap.put(map, client);
        return client;
    }

    /**
     * @param map
     *            host to port mapping
     * @return true, if all the entries in map are valid host, ports. Else
     *         false.
     */
    private static boolean isValidHostPorts(Map<String, Integer> map) {
        Set<String> hostNames = map.keySet();
        Set<Integer> ports = new HashSet<>(map.values());

        if (!isValidHosts(hostNames.toArray(new String[hostNames.size()]))) {
            return false;
        }
        if (!isValidPorts(ports.toArray(new Integer[ports.size()]))) {
            return false;
        }
        return true;
    }

    /**
     * Converts every host/port entry of the map into a transport address.
     *
     * @param map
     *            host to port mapping
     * @return one address per map entry, with no null slots
     * @throws UnknownHostException
     *             if a host cannot be resolved
     */
    private static InetSocketTransportAddress[] getInetSocketTransportAddresses(
            Map<String, Integer> map) throws UnknownHostException {
        InetSocketTransportAddress addresses[] = new InetSocketTransportAddress[map
                .size()];

        int count = 0;
        for (Map.Entry<String, Integer> entry : map.entrySet()) {
            InetAddress addr = InetAddress.getByName(entry.getKey());
            // Advance the index for every entry; without the increment each
            // address overwrote slot 0 and the remaining slots stayed null.
            addresses[count++] = new InetSocketTransportAddress(addr,
                    entry.getValue());
        }
        return addresses;
    }

    /**
     * Get transport client for localhost (127.0.0.1). The client is not cached.
     *
     * @param clusterName
     *            value used for the "cluster.name" setting
     * @param port
     *            transport port of the local node
     * @return a new transport client connected to 127.0.0.1 on the given port
     * @throws UnknownHostException
     *             if the loopback address cannot be resolved
     */
    public static Client getLocalTransportClient(String clusterName, int port)
            throws UnknownHostException {
        Settings settings = ImmutableSettings.settingsBuilder()
                .put("cluster.name", clusterName)
                .put("client.transport.sniff", true).build();

        TransportClient client = new TransportClient(settings);
        InetAddress addr = InetAddress.getByName("127.0.0.1");
        InetSocketTransportAddress address = new InetSocketTransportAddress(
                addr, port);
        client.addTransportAddress(address);
        return client;
    }
}
The following IPUtil class validates host IP addresses and port numbers.
package com.self_learn.util;
import org.apache.commons.validator.routines.InetAddressValidator;
import com.google.common.base.Preconditions;
/**
 * Helpers that check IP addresses and port numbers.
 *
 * @author harikrishna_gurram
 */
public class IPUtil {
    private static InetAddressValidator inetAddressValidator = InetAddressValidator
            .getInstance();

    /**
     * Checks a single address with the shared {@link InetAddressValidator}.
     *
     * @param ipAddress
     *            address to check, must not be null
     * @return true if ip address is valid, else false
     */
    public static boolean isValidIPAddress(String ipAddress) {
        Preconditions.checkNotNull(ipAddress, "IP address should not be null");
        boolean valid = inetAddressValidator.isValid(ipAddress);
        return valid;
    }

    /**
     * @param port
     *            port number
     * @return true if the port lies in the range 1..65535, else false
     */
    public static boolean isValidPort(int port) {
        return port > 0 && port < 65536;
    }

    /**
     * @param hostNames
     *            hosts to check, must not be null; each entry is validated as
     *            an IP address
     * @return true if every element passes {@link #isValidIPAddress}, else
     *         false
     */
    public static boolean isValidHosts(String[] hostNames) {
        Preconditions.checkNotNull(hostNames, "Host names shouldn't be empty");
        for (int i = 0; i < hostNames.length; i++) {
            boolean valid = isValidIPAddress(hostNames[i]);
            if (!valid) {
                return false;
            }
        }
        return true;
    }

    /**
     * @param ports
     *            port numbers to check, must not be null
     * @return true if every element passes {@link #isValidPort}, else false
     */
    public static boolean isValidPorts(Integer[] ports) {
        Preconditions.checkNotNull(ports, "ports shouldn't be empty");
        for (int i = 0; i < ports.length; i++) {
            int port = ports[i];
            if (!isValidPort(port)) {
                return false;
            }
        }
        return true;
    }
}
Step 3: BulkRequestType class defines constants to identify type of the request like Index, update, upsert update, and delete.
package com.self_learn.constants;
/**
 * Kinds of request that can be part of a bulk call.
 *
 * @author harikrishna_gurram
 */
public enum BulkRequestType {
    /** Index request. */
    INDEX,
    /** Update request. */
    UPDATE,
    /** Update request that also carries an upsert document. */
    UPSERT_UPDATE,
    /** Delete request. */
    DELETE;
}
Step 4: Define the BulkObject model class. Each BulkObject instance represents one request of type INDEX, UPDATE, UPSERT_UPDATE, or DELETE.
package com.self_learn.model;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import com.self_learn.constants.BulkRequestType;
/**
 * One entry of a bulk call: the document coordinates, the request type and,
 * depending on the type, the document source and the upsert source.
 * Accessors, equals/hashCode and toString are generated by Lombok.
 */
@Getter
@Setter
@ToString
@EqualsAndHashCode
public class BulkObject {
    private String _index;

    private String _type;

    private String _id;

    /** Document source; left null by the four-argument constructor. */
    private String _source;

    /** Upsert document; only set by the six-argument constructor. */
    private String _upsertSource;

    private BulkRequestType type;

    /** Creates an entry without any source. */
    public BulkObject(String _index, String _type, String _id,
            BulkRequestType type) {
        this._index = _index;
        this._type = _type;
        this._id = _id;
        this.type = type;
    }

    /** Creates an entry with a document source. */
    public BulkObject(String _index, String _type, String _id,
            BulkRequestType type, String _source) {
        this._index = _index;
        this._type = _type;
        this._id = _id;
        this.type = type;
        this._source = _source;
    }

    /** Creates an entry with a document source and an upsert source. */
    public BulkObject(String _index, String _type, String _id,
            BulkRequestType type, String _source, String _upsertSource) {
        this._index = _index;
        this._type = _type;
        this._id = _id;
        this.type = type;
        this._source = _source;
        this._upsertSource = _upsertSource;
    }
}
Step 5: Define the BulkUtil class, which takes a set of BulkObjects and processes them.
package com.self_learn.util;
import java.util.Set;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.base.Preconditions;
import com.self_learn.constants.BulkRequestType;
import com.self_learn.model.BulkObject;
/**
* Utility class to execute bulk requests.
*
* @author harikrishna_gurram
*/
public class BulkUtil {
/** Wraps the coordinates and request type in a {@link BulkObject} without any source. */
private static BulkObject getBulkObject(String _index, String _type,
        String _id, BulkRequestType type) {
    BulkObject entry = new BulkObject(_index, _type, _id, type);
    return entry;
}
/** Wraps the coordinates, request type and document source in a {@link BulkObject}. */
private static BulkObject getBulkObject(String _index, String _type,
        String _id, BulkRequestType type, String _source) {
    BulkObject entry = new BulkObject(_index, _type, _id, type, _source);
    return entry;
}
/** Wraps the coordinates, request type, document source and upsert source in a {@link BulkObject}. */
private static BulkObject getBulkObject(String _index, String _type,
        String _id, BulkRequestType type, String _source,
        String _upsertSource) {
    BulkObject entry = new BulkObject(_index, _type, _id, type, _source,
            _upsertSource);
    return entry;
}
/** Creates an INDEX entry from a JSON source string. */
public static BulkObject getIndexObject(String _index, String _type,
        String _id, String _source) {
    BulkRequestType kind = BulkRequestType.INDEX;
    return getBulkObject(_index, _type, _id, kind, _source);
}
/** Creates a DELETE entry; no source is attached. */
public static BulkObject getDeleteObject(String _index, String _type,
        String _id) {
    BulkRequestType kind = BulkRequestType.DELETE;
    return getBulkObject(_index, _type, _id, kind);
}
/** Creates an UPDATE entry whose source is the given string. */
public static BulkObject getUpdateObject(String _index, String _type,
        String _id, String src) {
    BulkRequestType kind = BulkRequestType.UPDATE;
    return getBulkObject(_index, _type, _id, kind, src);
}
public static BulkObject getUpsertUpdateObject(String _index, String _type,
String _id, String _source, String _upsertSource) {
return getBulkObject(_index, _type, _id, BulkRequestType.UPSERT_UPDATE,
_source, _upsertSource);
}
public static BulkObject getIndexObject(String _index, String _type,
String _id, Object _source) {
return getIndexObject(_index, _type, _id, JSONUtil.getJson(_source));
}
public static BulkObject getUpdateObject(String _index, String _type,
String _id, Object _source) {
return getUpdateObject(_index, _type, _id, JSONUtil.getJson(_source));
}
public static BulkObject getUpsertUpdateObject(String _index, String _type,
String _id, Object _source, Object _upsertSource) {
return getUpsertUpdateObject(_index, _type, _id,
JSONUtil.getJson(_source), JSONUtil.getJson(_upsertSource));
}
private static boolean isNull(Object obj) {
return (obj == null);
}
private static boolean isNotNull(Object obj) {
return !isNull(obj);
}
/**
* Validate BulkObject based on Request type.
*
* If Request type
*
* a. is @{link BulkRequestType.INDEX} then BulkObject must has _index,
* _type, _id, _source.
*
* b. is @{link BulkRequestType.UPDATE} then BulkObject must has _index,
* _type, _id, _source.
*
* c. is @{link BulkRequestType.UPSERT_UPDATE} then BulkObject must has
* _index, _type, _id, _source, _upsertSource.
*
* d. is @{link BulkRequestType.DELETE} then BulkObject must has _index,
* _type, _id.
*
* @param type
* Represents Request type, it can be INDEX, UPDATE,
* UPSERT_UPDATE, DELETE.
*
* @param obj
* @return
*/
private static boolean isValidBulkObject(BulkObject obj) {
if (obj == null)
return false;
String _index = obj.get_index();
String _type = obj.get_type();
String _id = obj.get_id();
String _source = obj.get_source();
String _upsertSource = obj.get_upsertSource();
BulkRequestType type = obj.getType();
switch (type) {
case INDEX:
if (isNotNull(_index) && isNotNull(_type) && isNotNull(_id)
&& isNotNull(_source))
return true;
break;
case UPDATE:
if (isNotNull(_index) && isNotNull(_type) && isNotNull(_id)
&& isNotNull(_source))
return true;
break;
case UPSERT_UPDATE:
if (isNotNull(_index) && isNotNull(_type) && isNotNull(_id)
&& isNotNull(_source) && isNotNull(_upsertSource))
return true;
break;
case DELETE:
if (isNotNull(_index) && isNotNull(_type) && isNotNull(_id))
return true;
break;
}
return false;
}
/**
* method takes Map of requests and execute bulk requests. Key of map
* represents type of request.
*
* @{link BulkRequestType.INDEX} represents Index request
* @{link BulkRequestType.UPDATE} represents update request
* @{link BulkRequestType.UPSERT_UPDATE} represents UPSERT_UPDATE request
* @{link BulkRequestType.DELETE} represents delete request
*
* If BulkObject is invalid, then the request is simply ignored.
*
* @param bulkRequest
* @return
*/
public static BulkResponse execute(Client client, Set<BulkObject> bulkReq) {
Preconditions.checkNotNull(client, "client shouldn't be null");
Preconditions.checkNotNull(bulkReq, "bulkReq shouldn't be null");
BulkRequestBuilder bulkRequest = client.prepareBulk();
for (BulkObject obj : bulkReq) {
if (!isValidBulkObject(obj))
continue;
String _index = obj.get_index();
String _type = obj.get_type();
String _id = obj.get_id();
String _source = obj.get_source();
String _upsertSource = obj.get_upsertSource();
BulkRequestType type = obj.getType();
switch (type) {
case INDEX:
bulkRequest.add(new IndexRequest(_index, _type, _id)
.source(_source));
break;
case UPDATE:
bulkRequest.add(new UpdateRequest(_index, _type, _id)
.doc(_source));
break;
case UPSERT_UPDATE:
bulkRequest.add(new UpdateRequest(_index, _type, _id).doc(
_source).upsert(_upsertSource));
break;
case DELETE:
bulkRequest.add(new DeleteRequest(_index, _type, _id));
break;
default:
System.out.println("Invalid Request");
}
}
return bulkRequest.execute().actionGet();
}
}
Step 6: Define the ResponseUtil class, which returns the response of a query in JSON format.
package com.self_learn.util;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.get.GetResponse;
import com.google.common.base.Preconditions;
/**
 * Utility class to return response in string format.
 *
 * @author harikrishna_gurram
 */
public class ResponseUtil {

    /**
     * Returns source of the result as string.
     *
     * @param response get response whose source is returned; must not be null
     * @param pretty   if true, the source is re-serialized with
     *                 {@code JSONUtil.getPrettyJson}; if false, the source
     *                 string is returned exactly as the response provides it
     * @return source of the document as a string; whatever
     *         {@code getSourceAsString()} yields (possibly null) when there
     *         is nothing to pretty print
     */
    public static String getSource(GetResponse response, boolean pretty) {
        Preconditions.checkNotNull(response, "response shouldn't be null");

        String rawSource = response.getSourceAsString();
        if (!pretty || rawSource == null) {
            // No formatting requested, or no source to format.
            return rawSource;
        }

        // The flag used to be ignored; format through the same helper that
        // getResponseInfo uses so both methods treat "pretty" alike.
        return JSONUtil.getPrettyJson(response.getSourceAsMap());
    }

    /**
     * Returns a JSON representation of the given response object.
     *
     * @param response object to serialize, for example a {@link BulkResponse}
     * @param pretty   if true, {@code JSONUtil.getPrettyJson} is used,
     *                 otherwise {@code JSONUtil.getJson}
     * @return string representation of the response
     */
    public static String getResponseInfo(Object response, boolean pretty) {
        if (pretty) {
            return JSONUtil.getPrettyJson(response);
        }
        return JSONUtil.getJson(response);
    }
}
Step 7: Main.java demonstrates how to use the BulkUtil class.
package com.self_learn.test;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.client.Client;
import com.self_learn.model.BulkObject;
import com.self_learn.model.Employee;
import com.self_learn.util.BulkUtil;
import com.self_learn.util.ResponseUtil;
import com.self_learn.util.TransportClientUtil;
/**
 * Demonstrates BulkUtil: indexes five employees, deletes two of them, then
 * updates one document and indexes a sixth, printing each bulk response.
 */
public class Main {
    private static final String clusterName = "my_cluster_1";
    private static final String _index = "organization";
    private static final String _type = "employee";

    /**
     * Builds the sample employee whose fields are all derived from the given
     * number.
     *
     * @param number value used for the age and as suffix of the other fields
     * @return populated employee
     */
    private static Employee newEmployee(int number) {
        Employee emp = new Employee();
        emp.setAge("" + number);
        emp.setFirstName("firstName " + number);
        emp.setLastName("lastName " + number);
        emp.setHobbies(Arrays.asList("hobbies " + number));
        return emp;
    }

    /**
     * Executes the given objects as one bulk request and prints the response
     * as pretty JSON.
     */
    private static void executeAndPrint(Client client, Set<BulkObject> objects) {
        BulkResponse response = BulkUtil.execute(client, objects);
        System.out.println(ResponseUtil.getResponseInfo(response, true));
    }

    public static void main(String[] args) throws IOException,
            InterruptedException, ExecutionException {
        /* Get client instance for cluster */
        Client client = TransportClientUtil.getLocalTransportClient(
                clusterName, 9300);

        // Close the client even when one of the bulk calls throws; otherwise
        // the client would be leaked on failure.
        try {
            /* First 5 index requests */
            Set<BulkObject> objects = new HashSet<>();
            for (int i = 1; i <= 5; i++) {
                objects.add(BulkUtil.getIndexObject(_index, _type, "" + i,
                        newEmployee(i)));
            }
            executeAndPrint(client, objects);

            /* Delete documents 1 and 4 */
            objects = new HashSet<>();
            objects.add(BulkUtil.getDeleteObject(_index, _type, "1"));
            objects.add(BulkUtil.getDeleteObject(_index, _type, "4"));
            executeAndPrint(client, objects);

            /* Update document 2 and index document 6 */
            objects = new HashSet<>();
            Map<String, Object> doc = new HashMap<>();
            doc.put("firstName", "Krishna");
            doc.put("age", "26");
            objects.add(BulkUtil.getUpdateObject(_index, _type, "2", doc));
            objects.add(BulkUtil.getIndexObject(_index, _type, "6",
                    newEmployee(6)));
            executeAndPrint(client, objects);
        } finally {
            client.close();
        }
    }
}
Once you run Main.java, you will get the following output.
Sep 10, 2015 2:02:14 PM org.elasticsearch.plugins.PluginsService <init>
INFO: [Turner Century] loaded [], sites []
{
"responses": [
{
"id": 0,
"opType": "index",
"response": {
"index": "organization",
"id": "2",
"type": "employee",
"version": 1,
"created": true
}
},
{
"id": 1,
"opType": "index",
"response": {
"index": "organization",
"id": "4",
"type": "employee",
"version": 1,
"created": true
}
},
{
"id": 2,
"opType": "index",
"response": {
"index": "organization",
"id": "1",
"type": "employee",
"version": 1,
"created": true
}
},
{
"id": 3,
"opType": "index",
"response": {
"index": "organization",
"id": "3",
"type": "employee",
"version": 1,
"created": true
}
},
{
"id": 4,
"opType": "index",
"response": {
"index": "organization",
"id": "5",
"type": "employee",
"version": 1,
"created": true
}
}
],
"tookInMillis": 2,
"remoteAddress": {
"address": {}
}
}
{
"responses": [
{
"id": 0,
"opType": "delete",
"response": {
"index": "organization",
"id": "4",
"type": "employee",
"version": 2,
"found": true
}
},
{
"id": 1,
"opType": "delete",
"response": {
"index": "organization",
"id": "1",
"type": "employee",
"version": 2,
"found": true
}
}
],
"tookInMillis": 1,
"remoteAddress": {
"address": {}
}
}
{
"responses": [
{
"id": 0,
"opType": "update",
"response": {
"index": "organization",
"id": "2",
"type": "employee",
"version": 2,
"created": false
}
},
{
"id": 1,
"opType": "index",
"response": {
"index": "organization",
"id": "6",
"type": "employee",
"version": 1,
"created": true
}
}
],
"tookInMillis": 1,
"remoteAddress": {
"address": {}
}
}
How to Get From Hollywood Casino Resort to Hard Rock Hotel
ReplyDeleteCasino resorts like Hard Rock Hotel and Hard Rock Hotel 고양 출장마사지 Casino 동해 출장안마 Hollywood 부산광역 출장마사지 are on a mission to help 보령 출장마사지 bring 출장샵 your luck to life, bringing you luck,