During the seven-week Insight Data Engineering Fellows Program, recent grads and experienced software engineers learn the latest open source technologies by building a data platform to handle large, real-time datasets. Ryan Walker (now a Data Engineer at Casetext) discusses his project of building a streaming search platform.
On average, Twitter users worldwide generate about 6,000 tweets per second. Obviously, there is much interest in extracting real-time signal from this rich but noisy stream of data. More generally, there are many open and interesting problems in using high-velocity streaming text sources to track real-time events. In this post, I describe the key components of a platform that will allow for near real-time search of a streaming text data source such as the Twitter firehose.
Such a platform can have many applications far beyond monitoring Twitter. For example, a network of speech-to-text monitors could transcribe radio and television feeds and pass the transcriptions to the platform. When key phrases or features are found in the feeds, the platform could be configured to trigger real-time event management. This application is potentially relevant to finance, marketing, and other domains that depend on real-time information processing.
All code for the platform I describe here can be found in my GitHub repository, Straw. The code base includes:
Scripts to automate AWS deployment using boto3
A local run mode enabling testing on a single machine using dockerized components
A demo of a multiuser web interface where users register queries and receive streaming matches from a simulated Twitter firehose
I completed this project as a Fellow in the Insight Data Engineering Program. The original inspiration for this project came from two excellent blog posts on streaming search.
Streaming Search
The key data structure for solving a traditional text search problem is an inverted index built from the collection of documents you want to be able to query. In its simplest form, an inverted index is just a map whose keys are the set of all unique terms in the documents. The value associated with a particular term in the map is a list of all the documents which use that term.
After the index has been built, users can submit queries to run against the index. For example, we can have a query that should return all the documents that contain both words in the phrase "llama pajamas". The query engine will split the input phrase into the tokens "llama" and "pajamas", then it will check the inverted index to get the list of all documents that contain the word "llama" and the list of all documents that contain the word "pajamas". The engine will then return the intersection of these two lists, i.e. the list of the documents that are present in both lists.
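The lookup described above can be sketched in a few lines of Python. This is a minimal illustration, not code from Straw: the function names, the sample documents, and the naive whitespace tokenizer are all assumptions made for the example.

```python
from collections import defaultdict


def tokenize(text):
    """Lowercase and split on whitespace; a deliberately naive tokenizer."""
    return text.lower().split()


def build_index(documents):
    """Map each term to the set of IDs of the documents that contain it."""
    index = defaultdict(set)
    for doc_id, text in documents.items():
        for term in tokenize(text):
            index[term].add(doc_id)
    return index


def search(index, phrase):
    """Return the IDs of the documents that contain every term in the phrase."""
    postings = [index.get(term, set()) for term in tokenize(phrase)]
    if not postings:
        return set()
    return set.intersection(*postings)


# Hypothetical sample corpus
documents = {
    "d1": "llama pajamas are on sale",
    "d2": "my llama is asleep",
    "d3": "red pajamas for a llama",
}
index = build_index(documents)
matches = search(index, "llama pajamas")  # documents d1 and d3
```

A production engine such as Lucene adds stemming, scoring, and compressed postings lists on top of this basic structure.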
In the streaming case, documents arrive at a very fast rate (e.g. average of 6000 per second in the case of Twitter) and with this kind of velocity and volume it is impractical to build the inverted document index in real-time. Moreover, the goal is not to create a static index of tweets--rather it is to scan the tweets as they arrive in real-time and determine if they match a registered query. Here's where we can play a clever trick. Instead of building our inverted index from the documents, we can instead build the index from the queries themselves.
As a simple example, suppose a user wants to see all the tweets that contain both of the words "llama" and "pajamas". To add this query to the inverted index we would:
Create an identifier for the query, say "q1".
If "llama" is in the inverted index, add "q1" to the list of query IDs stored at "llama". Otherwise, initialize "llama" in the index with a list containing "q1".
If "pajamas" is in the inverted index, add "q1" to the list of query IDs stored at "pajamas". Otherwise, initialize "pajamas" in the index with a list containing "q1".
As tweets arrive in the stream, the query engine breaks the text into tokens and looks up each token in the inverted index. The query IDs found this way are the candidate matches. A query such as "q1" matches the tweet only if every one of its terms appears among the tweet's tokens.
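Here is a minimal Python sketch of an inverted query index, assuming whitespace tokenization and queries that require all of their terms to be present. The class and method names are illustrative and do not come from Straw, Luwak, or Elasticsearch.

```python
from collections import defaultdict


class QueryIndex:
    """Inverted index built from queries instead of documents."""

    def __init__(self):
        self.postings = defaultdict(set)  # term -> IDs of queries using it
        self.terms = {}                   # query ID -> set of required terms

    def register(self, query_id, phrase):
        """Index a query under each of its terms."""
        terms = set(phrase.lower().split())
        self.terms[query_id] = terms
        for term in terms:
            self.postings[term].add(query_id)

    def match(self, document):
        """Return the IDs of all registered queries that the document satisfies."""
        tokens = set(document.lower().split())
        # Candidates: every query that shares at least one term with the document
        candidates = set()
        for token in tokens:
            candidates |= self.postings.get(token, set())
        # Keep only the queries whose terms all appear in the document
        return {qid for qid in candidates if self.terms[qid] <= tokens}


queries = QueryIndex()
queries.register("q1", "llama pajamas")
queries.register("q2", "llama")
hits = queries.match("My llama wears pajamas")  # both q1 and q2
```

The candidate step keeps the work proportional to the number of tokens in the tweet rather than the number of registered queries, which is the point of indexing the queries.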
Fortunately, several existing tools can be used to build an inverted query index:
The Elasticsearch percolator is a standard feature of Elasticsearch that allows us to index queries and "percolate" documents against them
Luwak is an open source library built on Lucene that matches documents against a set of stored queries
Architecture
Now that we've got the basic tools for streaming search (Elasticsearch-Percolators or Lucene-Luwak), let's describe the architecture for the platform. The Straw platform is made up of the following components:
A streaming text source, such as the Twitter firehose, which emits a continuous stream of JSON documents
An Apache Kafka cluster, which handles ingestion from the text stream
An Apache Storm cluster, which distributes computation across multiple search engine workers
A Redis server, which provides a PUBSUB framework to collect and distribute matches to subscribed users
One or more clients, who submit queries and listen for matches on behalf of users
Streaming Sources
The Twitter streaming API does not offer access to the firehose without special permission. To see how Straw would perform under firehose-level load, we can instead use the sample endpoint to collect a large corpus of tweets. We can either store these tweets in a file or send them directly to the Kafka cluster's documents topic.
Alternatively, we can load tweets from a file into Kafka with a simple producer script.
To maintain a high load we can run multiple instances of this script and restart each one as soon as it finishes reading the file, using, for example, a supervisor.
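A minimal sketch of such a file-to-Kafka producer follows. It assumes the kafka-python client, a file with one JSON tweet per line, and a broker at localhost:9092. The function names, the default broker address, and the choice to skip malformed lines are assumptions for illustration and may differ from the script in the Straw repository.

```python
import json


def read_lines(path):
    """Yield each non-empty line of the file with whitespace stripped."""
    with open(path, encoding="utf-8") as handle:
        for line in handle:
            line = line.strip()
            if line:
                yield line


def publish_file(producer, path, topic="documents"):
    """Send every valid JSON line to the topic and return the number sent."""
    sent = 0
    for line in read_lines(path):
        try:
            json.loads(line)  # validate only; the raw line is what gets sent
        except ValueError:
            continue  # skip lines that are not valid JSON
        producer.send(topic, line.encode("utf-8"))
        sent += 1
    producer.flush()  # block until buffered messages are delivered
    return sent


def main(path, brokers="localhost:9092"):
    # Imported here so the helpers above work without kafka-python installed
    from kafka import KafkaProducer

    producer = KafkaProducer(bootstrap_servers=brokers)
    print(publish_file(producer, path), "tweets sent")
```

Calling `main("tweets.json")` (a hypothetical file name) would push the whole file into the documents topic. Passing the producer in as an argument keeps the file handling testable without a running broker.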
Though the Straw project was designed for handling discrete JSON documents, changing the internal parsers would make it easy to support other formats like XML. A more interesting challenge is handling continuous stream data, such as audio transcriptions. In this case, several strategies could be tried. For example, we could detect sentence breaks and treat each completed sentence as a separate document in the stream.
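As a rough illustration of the sentence-break strategy, the sketch below buffers incoming text and emits a sentence whenever it sees terminal punctuation followed by whitespace. The class name and the punctuation rule are assumptions. Real transcripts would need a more robust segmenter, and nothing like this exists in Straw.

```python
import re

# A break is any whitespace that directly follows '.', '!' or '?'
SENTENCE_BREAK = re.compile(r"(?<=[.!?])\s+")


class SentenceSplitter:
    """Turns a continuous text stream into discrete sentence documents."""

    def __init__(self):
        self.buffer = ""

    def feed(self, chunk):
        """Add transcribed text and return the sentences completed so far."""
        self.buffer += chunk
        parts = SENTENCE_BREAK.split(self.buffer)
        # The last part may be an unfinished sentence, so keep it buffered
        self.buffer = parts.pop()
        return [part for part in parts if part]


splitter = SentenceSplitter()
first = splitter.feed("Llamas love pajamas. They also ")
second = splitter.feed("love hay! And")
```

Each returned sentence could then be wrapped in a JSON document and sent to the documents topic like any tweet.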
Kafka
The Kafka cluster has two topics: documents and queries. The producer script above can be used to populate the documents topic. The frontend client populates the queries topic with user subscriptions. In production, I found a 5-node Kafka cluster could easily accommodate Twitter-level volume. For the documents topic, I used a partition factor of 5 and a replication factor of 2. While high availability is very important to accommodate the volume of the stream, document loss may not be a big concern. For queries, I used only 2 partitions with a replication factor of 3. Queries are infrequent so availability may not be important, but query loss is not acceptable. Note that the partition factor should be less than or equal to the number of KafkaSpouts in our Storm topology, since each spout will consume from exactly one partition.
One other important Kafka configuration is in kafka.server.properties:
# The minimum age of a log file to be eligible for deletion
log.retention.hours=1
The Kafka default is 168 hours--far too big since you can easily fill a modestly sized disk under load. As messages should ideally be consumed in real-time, I recommend using the minimum value which is 1 hour. Note, however, that you may still need to ensure that you have a sufficiently large volume for the Kafka log. In production, I gave each Kafka node a 64GB volume with a 1 hour retention.
Storm
The Storm topology implements KafkaSpouts for the documents and queries topics. In production, I used 5 document spouts and 3 query spouts (consistent with the Kafka partitioning). The bolts in the topology search the document stream and publish any matches to Redis. In production, I allocated a total of 6 workers. Sizing the cluster correctly proved to be somewhat challenging. I highly recommend this post which explains the key concepts of Storm parallelism. Storm's built-in UI can also be helpful for monitoring and understanding how the cluster is performing.
In the most basic scenario, we assume that the number of queries is small and can fit easily into memory on a single machine. Then scaling to the volume of the stream is quite easy. The idea is to give each bolt a complete copy of the in-memory Lucene-Luwak index (remember that the queries are what's being indexed here). So each time a user registers a new query, we must broadcast it to all of the bolts in the topology to maintain the local query index. When a document arrives from the stream, we can then randomly assign it to any bolt since each bolt has a full copy of the query index. To handle failover, we can also keep a global copy of all the queries, so that if a bolt dies we can replace it with a new one and populate its index from the global store. This Java snippet defines such a topology:
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("query-spout", new KafkaSpout(query_spout_config), 3);
builder.setSpout("document-spout", new KafkaSpout(document_spout_config), 5);
builder.setBolt("search-bolt", new LuwakSearchBolt(), 5)
    // allGrouping: every search bolt receives every query
    .allGrouping("query-spout")
    // shuffleGrouping: each document goes to one randomly chosen search bolt
    .shuffleGrouping("document-spout");
Since this platform is intended to be multiuser and multitenant, we can easily imagine a situation where the number of queries can't practically fit in memory on a single bolt. In this case, we can add another layer of bolts to the Storm topology:
Here the complete query index is partitioned across a small cluster of bolts. Incoming queries are broadcast to the fan bolts. Each fan bolt will then randomly choose one Lucene worker to index that query. Documents from the stream can be shuffled among the fan bolts. Each fan bolt must then broadcast the document so that each Lucene bolt can check the document against its partition of the index.
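The routing rules for the partitioned index can be simulated outside Storm. The Python sketch below models a single fan bolt in front of several search workers. The class names are invented for the example, and the linear scan inside each worker is a stand-in for a Luwak index rather than a description of how Luwak matches.

```python
import random


class SearchWorker:
    """Stands in for a Lucene bolt holding one partition of the query index."""

    def __init__(self):
        self.queries = {}  # query ID -> set of required terms

    def index_query(self, query_id, phrase):
        self.queries[query_id] = set(phrase.lower().split())

    def match(self, document):
        tokens = set(document.lower().split())
        return {qid for qid, terms in self.queries.items() if terms <= tokens}


class FanBolt:
    """Routes queries to one worker and documents to all workers."""

    def __init__(self, workers, rng=None):
        self.workers = workers
        self.rng = rng or random.Random()

    def on_query(self, query_id, phrase):
        # Each query is indexed on exactly one randomly chosen worker
        self.rng.choice(self.workers).index_query(query_id, phrase)

    def on_document(self, document):
        # Every worker checks the document against its own partition,
        # so the union of the results covers the complete query index
        matches = set()
        for worker in self.workers:
            matches |= worker.match(document)
        return matches


workers = [SearchWorker() for _ in range(3)]
fan = FanBolt(workers, random.Random(0))
fan.on_query("q1", "llama pajamas")
fan.on_query("q2", "alpaca socks")
fan.on_query("q3", "llama")
found = fan.on_document("my llama wears pajamas")
```

The trade-off is visible in `on_document`: memory per worker shrinks with the number of partitions, but every document now costs one message per worker.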
If we use Percolators instead of Luwak then each bolt contains an Elasticsearch client. In this case, it is a good idea to collocate the Elasticsearch cluster with the search bolts and to use high replication so as to minimize network overhead. Note that Percolator queries are also stored in-memory, so we still face difficulties as the number of queries becomes large.
Redis
Redis is most commonly used as an in-memory application cache, but it also has a simple and elegant publish-subscribe framework. Here's an example of pubsub using redis-cli:
In a terminal A, listeners subscribe to a topic:
127.0.0.1:6379> SUBSCRIBE "llama-topic"
In a separate terminal B, the publisher publishes to the topic:
127.0.0.1:6379> PUBLISH "llama-topic" "llamas love to wear pajamas"
Back in terminal A, the subscriber receives the message:
1) "message"
2) "llama-topic"
3) "llamas love to wear pajamas"
That's all there is to it. All standard Redis clients expose an API to interact with the PUBSUB framework.
When a user registers a query in the Straw platform, here's what happens:
The client passes the query to the Kafka queries topic
The client computes the MD5 hash of the query which will be the ID for the query
The client subscribes the user to the computed ID in Redis PUBSUB
The Storm cluster receives the query from the Kafka spout and broadcasts it to the Lucene bolts
Each bolt computes the MD5 hash of the query and registers the query with Luwak using the hash as the query ID
When a bolt receives a document, it uses Luwak to check if the document matches any query in the index. If Luwak finds a match, it will return one or more matching IDs. For each ID returned by Luwak, the bolt will use Redis PUBSUB to publish the original document using the ID as the topic.
Subscribed clients will receive documents as they are published to Redis
Using the hash as the query ID allows two or more users to subscribe to the same query while only needing to actually index a single query.
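The registration flow above can be traced end to end with in-memory stand-ins. In this sketch `InMemoryPubSub` replaces Redis PUBSUB and a plain dictionary replaces the Luwak index. Both are assumptions made so the example runs on its own, and only the MD5-based ID scheme comes from the description above.

```python
import hashlib
from collections import defaultdict


def query_id(query):
    """MD5 hex digest of the query text, used as query ID and topic name."""
    return hashlib.md5(query.encode("utf-8")).hexdigest()


class InMemoryPubSub:
    """Stand-in for Redis PUBSUB: topic -> list of subscriber callbacks."""

    def __init__(self):
        self.subscribers = defaultdict(list)

    def subscribe(self, topic, callback):
        self.subscribers[topic].append(callback)

    def publish(self, topic, message):
        for callback in self.subscribers[topic]:
            callback(message)


pubsub = InMemoryPubSub()
index = {}  # query ID -> required terms; stands in for the Luwak index
inboxes = {"alice": [], "bob": []}  # hypothetical users


def register(user, query):
    qid = query_id(query)
    pubsub.subscribe(qid, inboxes[user].append)  # client side
    index[qid] = set(query.lower().split())      # bolt side: same ID, one entry


def on_document(document):
    tokens = set(document.lower().split())
    for qid, terms in index.items():
        if terms <= tokens:
            pubsub.publish(qid, document)  # the topic is the query ID


register("alice", "llama pajamas")
register("bob", "llama pajamas")
on_document("this llama wears pajamas")
```

Because the client and the bolt derive the ID independently from the query text, they agree on the topic name without exchanging it, and the second registration reuses the existing index entry.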
Clients
A client for Straw has the following duties:
Manage users. In particular, it must keep track of which users have subscribed to which queries.
Push user queries to Kafka and subscribe to queries in Redis
Listen for responses from queries
The Straw platform comes packaged with a default client, which is a simple Python Flask webserver. The webserver is sessionized so that users can follow particular queries. The server implements a basic Kafka producer to publish queries to Kafka, and Redis keeps track of the list of subscribed query IDs for each user. The listening is handled by a single background thread that holds a Redis client subscribed to all unique queries across the entire set of active users. When a query ID and document pair arrives, the background thread queries Redis to find which users are subscribed to that query ID. It then copies the document text to a result pool for each subscribed user. The user interface checks the user's pool for updates every half-second so that results stream into the console. Here is a video of the UI in action:
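The fan-out performed by the background thread can be sketched as follows. This is an illustrative model only: the class name, the bounded pool size, and the in-process dictionaries (Straw keeps the subscription lists in Redis) are assumptions, not the Flask client's actual code.

```python
from collections import defaultdict, deque


class ResultDispatcher:
    """Copies each matched document into the pool of every subscribed user."""

    def __init__(self, pool_size=100):
        self.followers = defaultdict(set)  # query ID -> subscribed users
        # Bounded pools so an idle user cannot grow memory without limit
        self.pools = defaultdict(lambda: deque(maxlen=pool_size))

    def follow(self, user, query_id):
        self.followers[query_id].add(user)

    def on_match(self, query_id, document):
        """Called by the listener for each (query ID, document) pair."""
        for user in self.followers.get(query_id, ()):
            self.pools[user].append(document)

    def drain(self, user):
        """Called by the UI poll: return and clear the user's pending results."""
        results = list(self.pools[user])
        self.pools[user].clear()
        return results


dispatcher = ResultDispatcher()
dispatcher.follow("alice", "q1")
dispatcher.follow("bob", "q1")
dispatcher.follow("bob", "q2")
dispatcher.on_match("q1", "llamas love pajamas")
dispatcher.on_match("q2", "alpacas love socks")
```

A single listener thread keeps the number of Redis connections constant no matter how many users are active, at the cost of doing the per-user copy in the web server.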
Benchmarks and Conclusions
One goal of the Straw project was to compare and measure performance of Elasticsearch-Percolators vs. Lucene-Luwak. Measuring this performance is not completely straightforward. I used the following very basic approach to measuring throughput:
Fill Kafka's documents topic with a very large number of documents
Add a fixed number of reasonably complex queries to the Kafka query topic
Start the Storm topology
Each worker Bolt has a counter and a stopwatch running in a background thread
Each time a document is passed to Lucene and a response (empty or non-empty) is received, increment the counter
When the stopwatch reaches 10 seconds, publish the value of the counter to a special Redis topic e.g. "benchmark". Set the counter to 0 and restart the stopwatch
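The counting steps above can be sketched in Python. This version is a simplification: instead of a stopwatch in a background thread, it checks the elapsed time on each recorded document. The class name and the injectable `publish` and `clock` parameters are assumptions added to keep the example self-contained.

```python
import time


class ThroughputCounter:
    """Counts searched documents and reports the total once per window."""

    def __init__(self, publish, window=10.0, clock=time.monotonic):
        self.publish = publish  # callable taking (topic, value)
        self.window = window    # window length in seconds
        self.clock = clock
        self.count = 0
        self.started = clock()

    def record(self):
        """Call once per document searched, whether or not it matched."""
        self.count += 1
        if self.clock() - self.started >= self.window:
            self.publish("benchmark", self.count)
            self.count = 0
            self.started = self.clock()


# Example wiring with a list standing in for the Redis "benchmark" topic
published = []
counter = ThroughputCounter(lambda topic, value: published.append((topic, value)))
counter.record()
```

With the redis-py client, `publish` could be the `publish` method of a `redis.Redis` instance, which takes the same two arguments. Dividing each reported count by the window length gives documents per second for that bolt.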
By monitoring the benchmark channel in Redis, we can then track the search throughput of the system. Pictured below are density plots for estimated total throughput per second obtained by running this procedure for several hours:
Some comments and conclusions about these preliminary estimates are in order:
In both cases, Lucene-Luwak strongly outperforms Elasticsearch-Percolators. However, the Elasticsearch cluster I used was not optimized for this experiment. I suspect that a portion of the differential would disappear if more effort were made to localize the Elasticsearch index to the search bolts.
As the number of queries increases we see significant reduction in the throughput. It would be very interesting to see if the fan bolt solution described above would improve this performance.
The variance of throughput is very high, particularly for Luwak.
In the small query case, we are easily accommodating average Twitter-level volume; for large queries we are close and could likely scale horizontally to obtain a solution.
The queries used here are available in the Straw repository. I generated these by computing bigram frequency across a sample of 40M English language tweets and keeping the most frequent bigrams. It would be interesting to evaluate performance with more complex queries.
The documents here are tweets, which are limited to 140 characters. It would be interesting to evaluate performance with longer text sources.