In the previous post I outlined some of the high-level requirements for a system that indexed log data, and makes that data available for search, all in near-real-time. Satisfying these requirements involves making trade-offs, and sometimes there are no easy answers.
To better understand the design of these systems, this post examines in detail a program I wrote to receive logs over the network, index the data, and allow the logs to be searched. The program is called Ekanite.
The source code for Ekanite can be found on GitHub. It’s written in Go, since that is what I do most of my development in these days. It is built on bleve, which provides the core indexing and search functionality. The following discussion is accurate as of Ekanite v0.1.
A note on terminology: log lines are often referred to as log events in the following discussions. This makes it clear that we’re dealing with specific log line with an associated time. In addition sometimes the phrase document is used when discussing an event, since search engines often talk of indexing documents.
A high-level view of the architecture of Ekanite is shown below, showing some indexes as they might exist at an instant in time.
As shown above Ekanite receives logs over TCP and UDP, stores the log events in indexes, and makes that data available for search. There is actually a lot going on under the covers to support all this, so let’s dig into it.
The layer marked ekanite is the engine layer. Within the engine are the following components:
Ekanite supports both UDP and TCP inputs. This code is fairly standard Go networking code and borrows heavily from an older project of mine called syslog-gollector. The core networking functionality is in collector.go. It relies on special delimiter code, located in delimiter.go, which allows the system to correctly detect multi-line log lines. This is important since stack traces are common in log lines, and usually span multiple lines.
Once a log line has been detected, it is parsed for RFC5424 headers. Restricting Ekanite to RFC5424-formatted lines reduces the problem scope, at the cost of flexibility. The parsing code is also built on regular expressions, which is computationally expensive. A state-machine approach would be much more efficient, but using regular expressions meant fast development. In fact the parsing code already existed, also in syslog-gollector, so this code was ported to Ekanite. You can see the fields that are parsed out in parser.go.
One of the fundamental functions of parsing is determining the event time of the log line. This event time, along with a sequence number, uniquely orders an event with respect to all other events in the system. Ordering log lines involves some subtlety, so let’s examine it further.
How are events ordered?
Imagine 2 events arrive at Ekanite with exactly the same event time — perhaps the timestamps only have second-level granularity. How should the system then order these events when both are presented in search results? Is there an objective answer to the question which arrived first?
The answer is yes — but only if the two events arrived within the same UDP datagram, or on the same TCP connection. In this situation the event which arrived first must always be presented first in search results. The sequence number of the first event is always lower than the sequence number assigned to the second. However if the two events arrive on different TCP connections, or in two different UDP datagrams, with the same event time, either event may have the lower sequence number.
Generating event IDs
Once an event has been parsed and assigned a sequence number, its document ID can be generated. An ID is required before an event can be indexed. In addition these IDs have the important property that events are sorted by increasing ID to determine display order at search time. An event’s ID is determined by this function.
The engine is responsible for selecting the right index — and creating an index if necessary — for all log data, and ensuring that data is indexed correctly. More on this later.
Within the engine is a simple TCP service, which services query requests. In v0.1 it is a simple telnet-like server which accepts requests in bleve query format. While this query language is rudimentary, it allowed fast development of an end-to-end system. And it could be replaced with a more sophisticated query support, allowing actual time-ranged queries and specific searches on fields parsed from the log data.
Configuration and management
The engine layer also performs other tasks such as retention enforcement (runRetentionEnforcement()), configuring statistics and diagnostics, and ensuring clean shut down.
Let’s look in detail at how log data is indexed, starting with bleve and sharding.
bleve and sharding
At the lowest level of the above architecture diagram are bleve instances. bleve is a fully-featured text search library. It provides the actual analysis, tokenizing, indexing and search functionality. It stores its data in a small number of files, which can be considered as a unit within Ekanite. bleve is analogous to Apache Lucene, but instead of being written in Java, it is written in Go.
One level up is the logical concept of an index. Each Ekanite index comprises 1 or more bleve instances — in the diagram above each index is shown composed of 3 bleve instances. bleve calls these instances “indexes” but, to avoid confusion, within Ekanite these are known as “shards”. Why does Ekanite support more than 1 bleve shard per index? The answer is simple: indexing throughput requirements. My previous work with bleve shows that indexing data in parallel is very effective at increasing throughput.
Each index is assigned a start and end time. These times determine within which index a given log message is actually stored. The diagram above shows two indexes — the first spanning 1pm to 2pm, and the second from 2pm to 3pm, with the end time being exclusive. Note that bleve has no concept of time — only Ekanite indexes have times. This is one of the principle functions of Ekanite — it associates specific bleve shards with specific time ranges. It is important to note that time in this context is the time parsed from the log line — defined as event time in the first post of this series. It is not the time the log line was received.
Using indexes like this achieves two goals:
- Quicker search if a time range is specified in the query since only indexes that match the time range need be searched.
- Chunks of data — entire indexes — can be easily deleted as retention enforcement takes place.
Those familiar with bleve will understand the importance of getting the Mapping correct. Mappings are the instructions that configure bleve such that the data is analyzed and tokenized correctly. While the default mapping works for many data sources, log data is not ordinary text data. For example, should term vectors be stored? Which part of the log event are timestamps? Should all text be lower-cased before indexing? The importance of getting the mapping right cannot be underestimated, as it is critical to producing high-quality search results. You can examine the Ekanite mapping configuration in the indexing code.
Indexing a log line
Within Ekanite, it is actually batches of log lines that are indexed. Batching helps significantly with throughput, and is a common pattern with many similar systems. For example, batching amortizes file-sync operations over many events.
Batches are processed by the function Index, in the engine layer. The target index for each event is determined with reference to its event time, and the code builds sub-batches for each index. If the event time of each event is very close to the reception time of each event, most likely each event will be destined for the same index (the index for the most recent window of time), and there will be only 1 sub-batch. However, some events may be destined for indexes in the past, while a few may be written into the future. This can happen if log data was delayed by the sender, of if the sender’s clock is skewed.
Each sub-batch is indexed in parallel via a goroutine per sub-batch.
Next, recall that each index is only a logical grouping of bleve “shards”. Therefore each sub-batch indexing request is further broken down into parallel bleve indexing requests into these shards. The shard for each event is determined by log event document ID (see this function), which ensures that the indexing load for a batch of events is load-balanced across all shards for that index.
When all this processing completes, the indexing operation is considered finished.
Of course, the only reason to index log data is to search it!
In some ways once the data has been indexed successfully, most of the work is done. Searching starts with the reception of a query, via the simple network service. The service reads a query from the network and passes it to the search function on the engine. From there the engine sequentially passes the search query to each index, and streams the results back to the user through the search server.
bleve has a nice feature called aliasing, which allows multiple bleve shards to be searched in a single operation. So when each Ekanite index is searched, bleve is actually searching each shard in parallel.
Because the indexes are searched sequentially from earliest to latest, and results from each index are sorted by Ekanite, the user receives the results in time-ascending order.
Ekanite search is definitely simple. Since it employs bleve’s simple search query support, it doesn’t support time-ranged queries. Full support for time-ranged queries would mean that only certain indexes would be searched — only indexes that could have log data overlapping with the time-range — and could speed up search significantly. It also doesn’t support search on specific fields — for example find messages from a certain host — but the log data is indexed in such a way to support it in the future.
Let’s see it in action!
In the last post I’ll show an example of Ekanite in action, and discuss the many ways it could be improved.