FloES is a generic wrapper for common Elasticsearch operations, such as writing, finding, searching, listing and paginating documents. Uses Nest & Elasticsearch.Net, supports AWS, and includes ILogger support.
Breaking changes: recent versions (1.4.x) include a large refactoring and cleanup of the generics implementation.
https://www.nuget.org/packages/FloES
// Instantiate a new Floe for our 'Order' documents
_ordersFloe = new Floe<Order>(
client: elasticClient,
defaultIndex: "idx-orders",
logger: _logger, // optionally pass in your ILogger to get automatic logs
numberOfBulkDocumentsToWriteAtOnce: 3, // pick a higher number if you're writing lots of documents very rapidly
rollingDate: true); // documents will be written to indices with rolling dates (e.g.: idx-orders-2020-04-20)// Your AWSOptions
AWSOptions awsOptions = new AWSOptions
{
Credentials = new BasicAWSCredentials(_config.AccessKey, _config.SecretAccessKey),
Region = Amazon.RegionEndpoint.CACentral1
};
// Instantiate a new Floe for our 'Order' documents
_ordersFloe = new Floe<Order>(
awsOptions: awsOptions,
esClusterUri: new Uri(_config.AwsElasticsearchEndpoint),
defaultIndex: "idx-orders",
logger: _logger, // optionally pass in your ILogger to get automatic logs
numberOfBulkDocumentsToWriteAtOnce: 3, // pick a higher number if you're writing lots of documents very rapidly
rollingDate: true); // documents will be written to indices with rolling dates (e.g.: idx-orders-2020-04-20)// Write an order document to the default index with a rolling date (e.g.: idx-orders-2020-04-20)
// You can write many asynchronously by calling this in a loop (safe due to BulkAsync usage, with a smart numberOfBulkDocumentsToWriteAtOnce choice)
await _ordersFloe.Write(order);
// Choosing a good numberOfBulkDocumentsToWriteAtOnce:
// Writing 10,000 documents/hour -> numberOfBulkDocumentsToWriteAtOnce: ~50
// Writing 3 documents a day -> numberOfBulkDocumentsToWriteAtOnce: ~1
// tl;dr: use your head!
// Write any remaining unwritten documents from the buffer (e.g.: call this once after a very long loop to finish up)
await _ordersFloe.WriteUnwritten();
// Get an order
Order order = await _ordersFloe.Find(id: "1");// List all orders
IEnumerable<Order> orders = await _ordersFloe.List();
// List all orders for the last 24 hours
IEnumerable<Order> orders = await _ordersFloe.List(listLastXHours: 24);
// List all orders for the last 7.5 days
IEnumerable<Order> orders = await _ordersFloe.List(listLastXDays: 7.5);
// Search for orders of SKU 100
IEnumerable<Order> orders = await _ordersFloe.Search("sku", 100);
// Search for orders of SKU 100 for the last 4.5 hours
IEnumerable<Order> orders = await _ordersFloe.Search(
fieldToSearch: "sku",
valueToSearch: 100,
listLastXHours: 4.5);(e.g.: Telerik Blazor DataGrid)
async Task ReadItems(GridReadEventArgs args)
{
(string, string)? sort = null;
if (args.Request.Sorts?.Any() == true)
{
string memberNameToUse = args.Request.Sorts.FirstOrDefault()?.Member;
// Convert Telerik 'Sorts' argument to tuple
sort =
args.Request.Sorts?
.Select(s =>
s.SortDirection == ListSortDirection.Ascending
? (memberNameToUse, "asc")
: (memberNameToUse, "des"))
.FirstOrDefault();
}
// Paginate orders while sorting
_ordersGridData =
(await _ordersFloe.Page(
page: args.Request.Page,
recordsOnPage: _pageSize,
sort: sort)
.ToList();
StateHasChanged();
}(i.e.: use this if you want to do some operation during the scroll. Otherwise just use Search or List)
// Begin a scroll for all orders in Canada for the last year, getting 1000 orders at a time
ISearchResponse<Order> scrollCanada = await _ordersFloe.BeginScroll(
fieldToSearch: "region",
valueToSearch: "Canada",
scrollForXDocuments: 1000,
listLastXDays: 365.25);
bool continueScrolling = true;
while (continueScrolling && scrollCanada != null)
{
if (scrollCanada.Documents != null && !scrollCanada.IsValid)
{
break;
}
if (scrollCanada.Documents != null && !scrollCanada.Documents.Any())
{
continueScrolling = false;
}
else
{
// Do something with your 1000 orders before continuing the scroll
_yourResults.AddRange(scrollCanada.Documents);
_yourProgressIndicator.IndicateScrollProgress(_yourResults.Count);
_yourLogger.LogInformation($"We got another 1000 orders from Elasticsearch!");
// Continue the scroll for the next set of orders
scrollCanada = await _ordersFloe.ContinueScroll(scrollCanada);
}
}
// End the scroll
await _ordersFloe.EndScroll(scrollCanada);// DANGER: delete all indices and then dispose of the Floe capable of doing so
{
await using Floe<ExampleAdminService> temporaryDeleteAllIndicesFloe = new Floe(
awsOptions: _awsOptions,
esClusterUri: new Uri(_config.AwsElasticsearchEndpoint));
await temporaryDeleteIndexFloe.DeleteAllIndices();
}Make sure the document object you're writing has a unique "Id" parameter. Because of the asynchronous nature of .Write, and Elasticsearch clustering, by allowing Elasticsearch to automatically generate an "Id" parameter you run the risk of creating duplicate documents with their own unique IDs. An example is below:
// Class definition
public partial class Log
{
[PropertyName("id")]
public string Id { get; set; }
// ...
}
// Document we want to write (e.g.: some Log)
Log log = new Log
{
Id = $"log-{task}-{DateTime.UtcNow.ToString("yyyy-MM-dd-HH-mm-ss)}",
TaskName = task,
Description = description,
};
// No duplicates will be created since we are specifying the ID ourselves
await _logsFloe.Write(log);