Apache Flink connector to Elasticsearch v8
This project aims to fill the gap between Apache Flink and Elasticsearch version 8. Officially, the Flink project provides connectors for Elasticsearch versions 6 and 7, which share the same code base. However, Elasticsearch 8 relies on a brand-new Java API Client instead of the now-deprecated RestHighLevelClient, which is the main reason this connector was written from scratch.
As in the previous versions, each parallel sink instance internally buffers requests and sends them to the Elasticsearch cluster in bulk. The buffer is flushed when it reaches the configured threshold or, when checkpointing is enabled, whenever a checkpoint is taken.
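The buffering behaviour described above can be sketched in plain Java. `BulkBuffer` is a hypothetical class written for illustration, not the connector's internal implementation: requests accumulate until the threshold is reached, then the whole batch is handed to a bulk sender in one call, and `flush()` can also be called explicitly, as a checkpoint hook would.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical per-instance bulk buffer (illustration only, not the connector's code). */
public class BulkBuffer<R> {
    private final long threshold;
    private final Consumer<List<R>> bulkSender;
    private final List<R> pending = new ArrayList<>();

    public BulkBuffer(long threshold, Consumer<List<R>> bulkSender) {
        if (threshold <= 0) throw new IllegalArgumentException("threshold must be positive");
        this.threshold = threshold;
        this.bulkSender = bulkSender;
    }

    /** Buffers one request and sends a bulk batch once the threshold is reached. */
    public void add(R request) {
        pending.add(request);
        if (pending.size() >= threshold) {
            flush();
        }
    }

    /** Sends everything buffered so far; does nothing when the buffer is empty. */
    public void flush() {
        if (pending.isEmpty()) return;
        bulkSender.accept(new ArrayList<>(pending));
        pending.clear();
    }

    public int pendingCount() {
        return pending.size();
    }

    public static void main(String[] args) {
        List<List<String>> sentBatches = new ArrayList<>();
        BulkBuffer<String> buffer = new BulkBuffer<>(3, sentBatches::add);
        for (int i = 0; i < 7; i++) {
            buffer.add("doc-" + i);
        }
        // Two full batches of 3 were sent; one request is still buffered.
        System.out.println(sentBatches.size() + " batches, " + buffer.pendingCount() + " pending");
        buffer.flush(); // e.g. triggered by a checkpoint
        System.out.println(sentBatches.size() + " batches, " + buffer.pendingCount() + " pending");
    }
}
```

With a threshold of 3 and seven requests, the first run prints `2 batches, 1 pending` and the explicit flush leaves `3 batches, 0 pending`, mirroring how a checkpoint drains whatever the threshold has not yet sent.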
Maven:

```xml
<dependency>
    <groupId>com.mtfelisb</groupId>
    <artifactId>flink-connector-elasticsearch</artifactId>
    <version>1.0.0</version>
</dependency>
```

```shell
mvn install
```

Gradle:

```groovy
implementation 'com.mtfelisb:flink-connector-elasticsearch:1.0.0'
```
```java
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<T> stream = ...

final ElasticsearchSink<T> esSink = ElasticsearchSinkBuilder.<T>builder()
    .setThreshold(100L)   // size of the internal buffer before a bulk request is sent
    .setHost("localhost")
    .setPort(9200)
    .setEmitter(
        // Converts each element into a bulk update operation.
        // getId()/getValue() stand for accessors on your element type T,
        // and ELASTICSEARCH_INDEX is the name of the target index.
        (element, operation, context) ->
            (BulkOperation.Builder) operation
                .update(up -> up
                    .id(element.getId())
                    .index(ELASTICSEARCH_INDEX)
                    .action(ac -> ac.doc(element.getValue()))
                )
    )
    .build();

stream.addSink(esSink);
env.execute();
```

The flink-connector-elasticsearch sink is integrated with Flink's checkpointing mechanism: whenever a checkpoint is triggered, it flushes all buffered requests to the Elasticsearch cluster. Hence, the connector provides an AT_LEAST_ONCE guarantee when checkpointing is enabled.
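Why flushing on checkpoints yields AT_LEAST_ONCE (and not exactly-once) can be shown with a small simulation. This is a simplified, hypothetical model, not connector or Flink code: it assumes a replayable source that rewinds to the offset stored in the last completed checkpoint after a failure. Requests flushed by the threshold after that checkpoint are therefore sent again, so every record arrives at least once, some more than once.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical simulation of checkpoint-driven flushing and recovery (illustration only). */
public class AtLeastOnceDemo {

    /**
     * Processes the input with a threshold-flushed buffer, takes one checkpoint after
     * `checkpointAt` records, fails once when reaching index `failAt`, and replays
     * from the checkpointed offset. Returns every document the "cluster" received.
     */
    public static List<String> simulate(List<String> input, int threshold,
                                        int checkpointAt, int failAt) {
        List<String> cluster = new ArrayList<>(); // stand-in for Elasticsearch
        List<String> buffer = new ArrayList<>();
        int restoredOffset = 0;                   // source offset in the last checkpoint
        boolean failed = false;

        int i = 0;
        while (i < input.size()) {
            if (!failed && i == failAt) {
                // Crash: buffered requests are lost and the source rewinds.
                buffer.clear();
                failed = true;
                i = restoredOffset;
                continue;
            }
            buffer.add(input.get(i));
            if (buffer.size() >= threshold) {     // threshold flush
                cluster.addAll(buffer);
                buffer.clear();
            }
            i++;
            if (!failed && i == checkpointAt) {   // checkpoint: flush, then store offset
                cluster.addAll(buffer);
                buffer.clear();
                restoredOffset = i;
            }
        }
        cluster.addAll(buffer);                   // final flush at end of input
        return cluster;
    }

    public static void main(String[] args) {
        List<String> input = List.of("r0", "r1", "r2", "r3", "r4", "r5");
        System.out.println(simulate(input, 2, 3, 5));
        // [r0, r1, r2, r3, r4, r3, r4, r5]
    }
}
```

Records `r3` and `r4` were flushed by the threshold after the checkpoint at offset 3, so the replay writes them a second time: nothing is lost, but duplicates are possible.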
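Important: An effectively EXACTLY_ONCE guarantee can also be achieved if the update operations use deterministic ids and upsert is enabled (`docAsUpsert(true)` on the update action), because a replayed request then overwrites the same document instead of creating a duplicate.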
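The effect of deterministic ids can be illustrated without Elasticsearch by modelling the index as a map from document id to source. `IdempotentUpsertDemo` and its `Event` record are hypothetical and only assume that an upsert creates the document when the id is new and overwrites it otherwise. A duplicated delivery then collapses into one document, whereas assigning a fresh random id per request turns every replay into an extra document.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

/** Hypothetical model of an index as id -> document (illustration only). */
public class IdempotentUpsertDemo {

    public record Event(String id, String value) {}

    /** Upsert keyed by the element's own id: replays overwrite the same document. */
    public static Map<String, String> applyWithDeterministicIds(List<Event> deliveries) {
        Map<String, String> index = new LinkedHashMap<>();
        for (Event e : deliveries) {
            index.put(e.id(), e.value()); // create if missing, otherwise overwrite
        }
        return index;
    }

    /** A fresh random id per request: every replay creates a new document. */
    public static Map<String, String> applyWithRandomIds(List<Event> deliveries) {
        Map<String, String> index = new LinkedHashMap<>();
        for (Event e : deliveries) {
            index.put(UUID.randomUUID().toString(), e.value());
        }
        return index;
    }

    public static void main(String[] args) {
        // "b" is delivered twice, as can happen after recovery under AT_LEAST_ONCE.
        List<Event> deliveries = List.of(
            new Event("a", "1"), new Event("b", "2"), new Event("b", "2"), new Event("c", "3"));
        System.out.println(applyWithDeterministicIds(deliveries).size()); // 3
        System.out.println(applyWithRandomIds(deliveries).size());        // 4
    }
}
```

This only holds if a replayed update carries the same content as the original one; updates whose result depends on the previous state of the document (for example counters incremented by a script) are not idempotent.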
| Name | Type | Required | Description |
|---|---|---|---|
| threshold | Long | Yes | Maximum number of requests held in the internal buffer before they are sent in bulk |
| host | String | Yes | Host name of the Elasticsearch cluster |
| port | int | Yes | Port of the Elasticsearch cluster |
| emitter | Emitter<T> | Yes | Emitter implementation that converts each element into a bulk operation before it is added to the buffer |
| username | String | No | Username used to authenticate against the Elasticsearch cluster |
| password | String | No | Password used to authenticate against the Elasticsearch cluster |
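The required/optional split in the table can be mirrored in a small validation sketch. `SinkOptions` is a hypothetical record, not the connector's builder, and it assumes that the optional credentials are sent as standard HTTP Basic authentication (which Elasticsearch supports); the connector may handle them differently.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.Objects;
import java.util.Optional;

/** Hypothetical mirror of the option table (illustration only, not the connector's API). */
public record SinkOptions(Long threshold, String host, int port,
                          String username, String password) {

    public SinkOptions {
        // Required options fail fast; port is a primitive int, so it is always set.
        Objects.requireNonNull(threshold, "threshold is required");
        Objects.requireNonNull(host, "host is required");
    }

    /** Basic auth header value, present only when both username and password are given. */
    public Optional<String> basicAuthHeader() {
        if (username == null || password == null) return Optional.empty();
        String token = Base64.getEncoder().encodeToString(
            (username + ":" + password).getBytes(StandardCharsets.UTF_8));
        return Optional.of("Basic " + token);
    }

    public static void main(String[] args) {
        SinkOptions anonymous = new SinkOptions(100L, "localhost", 9200, null, null);
        System.out.println(anonymous.basicAuthHeader().isPresent()); // false

        SinkOptions secured = new SinkOptions(100L, "localhost", 9200, "elastic", "changeme");
        System.out.println(secured.basicAuthHeader().orElseThrow());
    }
}
```

Checking required options in the constructor surfaces a misconfigured sink when the job graph is built rather than on the first bulk request.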
- Initial release
See the LICENSE file for more details.