Scylla Summit 2018 write-up

It’s been almost one month since I had the chance to attend and speak at Scylla Summit 2018 so I’m relieved to finally publish a short write-up on the key things I wanted to share about this wonderful event!

Make Scylla boring

This statement by Glauber Costa sums up what looked to me like the main driver of the engineering effort recently put into Scylla: making it work so consistently well on any kind of workload that it’s boring to operate 🙂

I will follow up on this statement to highlight the things I heard and (hopefully) understood during the summit. I hope you’ll find it insightful.

Reduced operational efforts

The thread-per-core and queues design still has a lot of possibilities to be leveraged.

The recent addition of RPC streaming capabilities to seastar allows a drastic reduction in the time it takes the cluster to grow or shrink (data rebalancing / resynchronization).

Incremental compaction is also very promising as this background process is one of the most expensive there is in the database’s design.

I was happy to hear that scylla-manager will soon be made available and free to use with basic features, while more advanced ones (like backup/restore) will be retained for the enterprise version.
I also noticed that the current version did not support storing its configuration on SSL enabled clusters. So I asked Michał about it directly and I’m glad that this will be released in version 1.3.1.

Performant multi-tenancy

Why choose between real-time OLTP & analytics OLAP workloads?

The goal here is to be able to run both on the same cluster by giving users the ability to assign “SLA” shares to ROLES. That’s basically like pools on Hadoop at a much finer grain since it will create dedicated queues that will be weighted by their share.

Having one queue per usage, with full accounting, will make it possible to limit resources efficiently and let users have their say on their latency SLAs.
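To make the share idea concrete, here is a minimal sketch of how weighted shares translate into a fraction of contended resources. The role names and share values are made up for illustration, and this is only the proportional arithmetic, not Scylla’s actual scheduler:

```python
def resource_fractions(shares):
    """Return the fraction of contended resources each role would get.

    `shares` maps a (hypothetical) role name to its share weight.
    """
    total = sum(shares.values())
    if total <= 0:
        raise ValueError("at least one role needs a positive share")
    return {role: weight / total for role, weight in shares.items()}


# Hypothetical roles: the real-time application gets 4x the weight of analytics.
fractions = resource_fractions({"oltp_app": 800, "analytics": 200})
print(fractions)
```

Under contention the analytics role would be limited to roughly a fifth of the resources, which is the kind of guarantee that lets both workloads share one cluster.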

But Scylla also has a lot to do in the background to run smoothly. So while this design pattern was already applied to temper compactions, a lot of work has also been done on automatic flow control and back pressure.

For instance, Materialized Views are updated asynchronously, which means that while we can interact with and put a lot of pressure on the table they are based on (called the Main Table), we could overwhelm the background work that’s needed to keep the MVs’ View Tables in sync. To mitigate this, a smart back pressure approach was developed that throttles the clients to make sure that Scylla can manage to do everything at the best performance the hardware allows!

I was happy to hear that work on tiered storage is also planned to better optimize disk space costs for certain workloads.

Last but not least, columnar storage optimized for time series and analytics workloads is also something the developers are looking at.

Latency is expensive

If you care for latency, you might be happy to hear that a new polling API (named IOCB_CMD_POLL) has been contributed by Christoph Hellwig and Avi Kivity to the 4.19 Linux kernel which avoids context switching I/O by using a shared ring between kernel and userspace. Scylla will be using it by default if the kernel supports it.

The iotune utility has been upgraded since 2.3 to generate an enhanced I/O configuration.

Also, persistent (disk backed) in-memory tables are getting ready and are very promising for latency sensitive workloads!

A word on drivers

ScyllaDB has been relying on the Datastax drivers since the start. While it’s a good thing for the whole community, it’s important to note that the shard-per-CPU approach on data that Scylla is using is not known and leveraged by the current drivers.

Discussions took place and it seems that Datastax will not allow the protocol to evolve so that drivers could discover if the connected cluster is shard aware or not and then use this information to be more clever in which write/read path to use.

So for now ScyllaDB has been forking and developing their shard aware drivers for Java and Go (no Python yet… I was disappointed).

Kubernetes & containers

The ScyllaDB guys of course couldn’t avoid the Kubernetes frenzy so Moreno Garcia gave a lot of feedback and tips on how to operate Scylla on docker with minimal performance degradation.

Kubernetes has been designed for stateless applications, not stateful ones, and Docker does some automatic magic that has a rather big performance impact on Scylla. You will basically have to play with affinities to dedicate one Scylla instance to run on one server with a “retain” reclaim policy.

Remember that the official Scylla docker image runs with dev-mode enabled by default which turns off all performance checks on start. So start by disabling that and look at all the tips and literature that Moreno has put online!

Scylla 3.0

A lot has been written on it already so I will just be brief on the things that are important to understand from my point of view.

  • Materialized Views do back fill the whole data set
    • this job is done by the view building process
    • you can watch its progress in the system_distributed.view_build_status table
  • Secondary Indexes are Materialized Views under the hood
    • it’s like a reverse pointer to the primary key of the Main Table
    • so if you read the whole row by selecting on the indexed column, two reads will be issued under the hood: one on the indexed MV view table to get the primary key and one on the main table to get the rest of the columns
    • so if your workload is mostly interested in the whole row, you’re better off creating a complete MV to read from than using an SI
    • this is even more true if you plan to do range scans as this double query could lead you to read from multiple nodes instead of one
  • Range scan is way more performant
    • ALLOW FILTERING finally allows a great flexibility by providing server-side filtering!
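The difference between reading through a Secondary Index and reading from a complete Materialized View can be pictured with a toy in-memory model. The dictionaries, column names and data below are hypothetical stand-ins for tables, not Scylla internals:

```python
# Main table: primary key -> full row (hypothetical data).
main_table = {
    1: {"id": 1, "email": "a@example.com", "city": "Paris"},
    2: {"id": 2, "email": "b@example.com", "city": "Lyon"},
}
# Secondary index: indexed column value -> primary keys only.
index_on_city = {"Paris": [1], "Lyon": [2]}
# Complete materialized view: indexed column value -> full rows.
view_by_city = {"Paris": [main_table[1]], "Lyon": [main_table[2]]}


def read_via_index(city):
    """Return (rows, number of reads) when going through the index."""
    keys = index_on_city.get(city, [])        # read 1: the index view table
    rows = [main_table[key] for key in keys]  # read 2: the main table
    return rows, 2


def read_via_view(city):
    """Return (rows, number of reads) when reading the complete view."""
    return view_by_city.get(city, []), 1      # single read, rows are already whole


print(read_via_index("Paris"))
print(read_via_view("Paris"))
```

Both paths return the same rows, but the indexed path pays for a second lookup, and in a real cluster that second lookup may land on a different node.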

Random notes

Support for LWT (lightweight transactions) will rely on a future implementation of the Raft consensus algorithm inside Scylla. This work will also benefit Materialized Views consistency. Duarte Nunes will be the one working on this and I envy him very much!

Support for search workloads is high on the ScyllaDB devs’ priorities so we should definitely hear about it in the coming months.

Support for “mc” sstables (new generation format) is done and will reduce storage requirements thanks to metadata / data compression. Migration will be transparent because Scylla can read previous formats as well so it will upgrade your sstables as it compacts them.

ScyllaDB developers have not settled on how to best implement CDC. I hope they do rather soon because it is crucial in their ability to integrate well with Kafka!

Materialized Views, Secondary Indexes and filtering will benefit from the work on partition key and indexes intersections to avoid server side filtering on the coordinator. That’s an important optimization to come!

Last but not least, I’ve had the pleasure to discuss with Takuya Asada who is the packager of Scylla for RedHat/CentOS & Debian/Ubuntu. We discussed Gentoo Linux packaging requirements as well as the recent and promising work on a relocatable package. We will collaborate more closely in the future!

py3status v3.14

I’m happy to announce this release as it contains some very interesting developments in the project. This release was focused on core changes.

IMPORTANT notice

There are now two optional dependencies to py3status:

  • gevent
    • will monkey patch the code to make it concurrent
    • the main benefit is to use an asynchronous loop instead of threads
  • pyudev
    • will enable a udev monitor if a module asks for it (only xrandr so far)
    • the benefit is described below

To install them all using pip, simply do:

pip install py3status[all]

Modules can now react/refresh on udev events

When pyudev is available, py3status will allow modules to subscribe and react to udev events!

The xrandr module uses this feature by default, which allows the module to refresh instantly when you plug in or unplug a secondary monitor. It also makes it possible to stop running the xrandr command in the background, which saves a lot of CPU!

Highlights

  • py3status core uses black formatter
  • fix default i3status.conf detection
    • add ~/.config/i3 as a default config directory, closes #1548
    • add .config/i3/py3status in default user modules include directories
  • add markup (pango) support for modules (#1408), by @MikaYuoadas
  • py3: notify_user module name in the title (#1556), by @lasers
  • print module information to stdout instead of stderr (#1565), by @robertnf
  • battery_level module: default to using sys instead of acpi (#1562), by @eddie-dunn
  • imap module: fix output formatting issue (#1559), by @girst

Thank you contributors!

  • eddie-dunn
  • girst
  • MikaYuoadas
  • robertnf
  • lasers
  • maximbaz
  • tobes

py3status v3.13

I am once again lagging behind the release blog posts but this one is an important one.

I’m proud to announce that our long time contributor @lasers has become an official collaborator of the py3status project!

Dear @lasers, your amazing energy and overwhelming ideas have served our little community for a while. I’m sure we’ll have a great way forward as we learn to work together with @tobes 🙂 Thank you again very much for everything you do!

This release is as much dedicated to you as it is yours 🙂

IMPORTANT notice

After this release, py3status coding style CI will enforce the ‘black’ formatter style.

Highlights

Needless to say, the changelog is huge as usual, so here is a very condensed view:

  • documentation updates, especially on the formatter (thanks @L0ric0)
  • py3 storage: use $XDG_CACHE_HOME or ~/.cache
  • formatter: multiple variable and feature fixes and enhancements
  • better config parser
  • new modules: lm_sensors, loadavg, mail, nvidia_smi, sql, timewarrior, wanda_the_fish

Thank you contributors!

  • lasers
  • tobes
  • maximbaz
  • cyrinux
  • Lorenz Steinert @L0ric0
  • wojtex
  • horgix
  • su8
  • Maikel Punie

Authenticating and connecting to a SSL enabled Scylla cluster using Spark 2

This quick article is a wrap up for reference on how to connect to ScyllaDB using Spark 2 when authentication and SSL are enforced for the clients on the Scylla cluster.

We encountered multiple problems, even more since we distribute our workload using a YARN cluster so that our worker nodes should have everything they need to connect properly to Scylla.

We found very little help online so I hope it will serve anyone facing similar issues (that’s also why I copy/pasted them here).

The authentication part is easy going by itself and was not the source of our problems; SSL on the client side was.

Environment

  • (py)spark: 2.1.0.cloudera2
  • spark-cassandra-connector: datastax:spark-cassandra-connector: 2.0.1-s_2.11
  • python: 3.5.5
  • java: 1.8.0_144
  • scylladb: 2.1.5

SSL cipher setup

The Datastax spark cassandra driver uses the TLS_RSA_WITH_AES_256_CBC_SHA cipher by default, which the JVM does not support out of the box. This raises the following error when connecting to Scylla:

18/07/18 13:13:41 WARN channel.ChannelInitializer: Failed to initialize a channel. Closing: [id: 0x8d6f78a7]
java.lang.IllegalArgumentException: Cannot support TLS_RSA_WITH_AES_256_CBC_SHA with currently installed providers

According to the ssl documentation we have two ciphers available:

  1. TLS_RSA_WITH_AES_256_CBC_SHA
  2. TLS_RSA_WITH_AES_128_CBC_SHA

We can get rid of the error by lowering the cipher to TLS_RSA_WITH_AES_128_CBC_SHA using the following configuration:

.config("spark.cassandra.connection.ssl.enabledAlgorithms", "TLS_RSA_WITH_AES_128_CBC_SHA")\

However, this is not really a good solution and instead we’d be inclined to use the TLS_RSA_WITH_AES_256_CBC_SHA version. For this we need to follow Datastax’s procedure.

Then we need to deploy the JCE security jars on all our client nodes. If you are using YARN like us, this means that you have to deploy these jars to all your NodeManager nodes.

For example by hand:

# unzip jce_policy-8.zip
# cp UnlimitedJCEPolicyJDK8/*.jar /opt/oracle-jdk-bin-1.8.0.144/jre/lib/security/

Java trust store

When connecting, the clients need to be able to validate the Scylla cluster’s self-signed CA. This is done by setting up a trustStore JKS file and providing it to the spark connector configuration (note that you protect this file with a password).

keyStore vs trustStore

In an SSL handshake, the purpose of the trustStore is to verify credentials and the purpose of the keyStore is to provide credentials. A keyStore in Java stores private keys and the certificates corresponding to their public keys; it is required if you are an SSL server or if SSL requires client authentication. A trustStore stores certificates from third parties or your own self-signed certificates; your application identifies and validates them using this trustStore.

The spark-cassandra-connector documentation has two options to handle keyStore and trustStore.

When we did not use the trustStore option, we would get some obscure error when connecting to Scylla:

com.datastax.driver.core.exceptions.TransportException: [node/1.1.1.1:9042] Channel has been closed

When enabling DEBUG logging, we get a clearer error which indicated a failure in validating the SSL certificate provided by the Scylla server node:

Caused by: sun.security.validator.ValidatorException: PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target

setting up the trustStore JKS

You need to have the self-signed CA public certificate file, then issue the following command:

# keytool -importcert -file /usr/local/share/ca-certificates/MY_SELF_SIGNED_CA.crt -keystore COMPANY_TRUSTSTORE.jks -noprompt
Enter keystore password:  
Re-enter new password: 
Certificate was added to keystore

using the trustStore

Now you need to configure spark to use the trustStore like this:

.config("spark.cassandra.connection.ssl.trustStore.password", "PASSWORD")\
.config("spark.cassandra.connection.ssl.trustStore.path", "COMPANY_TRUSTSTORE.jks")\

Spark SSL configuration example

This wraps up the SSL connection configuration used for spark.

This example uses pyspark2 and reads a table in Scylla from a YARN cluster:

$ pyspark2 --packages datastax:spark-cassandra-connector:2.0.1-s_2.11 --files COMPANY_TRUSTSTORE.jks

>>> spark = SparkSession.builder.appName("scylla_app")\
.config("spark.cassandra.auth.password", "test")\
.config("spark.cassandra.auth.username", "test")\
.config("spark.cassandra.connection.host", "node1,node2,node3")\
.config("spark.cassandra.connection.ssl.clientAuth.enabled", True)\
.config("spark.cassandra.connection.ssl.enabled", True)\
.config("spark.cassandra.connection.ssl.trustStore.password", "PASSWORD")\
.config("spark.cassandra.connection.ssl.trustStore.path", "COMPANY_TRUSTSTORE.jks")\
.config("spark.cassandra.input.split.size_in_mb", 1)\
.config("spark.yarn.queue", "scylla_queue").getOrCreate()

>>> df = spark.read.format("org.apache.spark.sql.cassandra").options(table="my_table", keyspace="test").load()
>>> df.show()

A botspot story

I felt like sharing a recent story that allowed us to identify a bot in a haystack thanks to Scylla.

 

The scenario

While working on loading 2B+ rows into Scylla from Hive (using Spark), we noticed a strange behaviour in the performance of one of our nodes:

 

So we started wondering why that server in blue was having those peaks of load and was clearly diverging from the two others… As we obviously expect the three nodes to behave the same, there were two options on the table:

  1. hardware problem on the node
  2. bad data distribution (bad schema design? consistent hash problem?)

We shared this with our pals from ScyllaDB and started working on finding out what was going on.

The investigation

Hardware?

A hardware problem was pretty quickly ruled out: nothing showed up in the monitoring or in the kernel logs. I/O queues and throughput were good:

Data distribution?

Avi Kivity (ScyllaDB’s CTO) quickly got the feeling that something was wrong with the data distribution and that we could be facing a hotspot situation. He quickly nailed it down to shard 44 thanks to the scylla-grafana-monitoring platform.

Data is distributed between shards that are stored on nodes (consistent hash ring). This distribution is done by hashing the primary key of your data which dictates the shard it belongs to (and thus the node(s) where the shard is stored).

If one of your keys is over represented in your original data set, then the shard it belongs to can be overly populated and the related node overloaded. This is called a hotspot situation.
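The effect is easy to reproduce with a small simulation. This is a minimal sketch under stated assumptions: the shard count is arbitrary, the keys are made up, and SHA-256 is used as a stand-in hash rather than Scylla’s real partitioner:

```python
import hashlib
from collections import Counter

NUM_SHARDS = 8  # hypothetical shard count, for illustration only


def shard_for(key):
    """Map a key to a shard by hashing it (stand-in for the real partitioner)."""
    digest = hashlib.sha256(key.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") % NUM_SHARDS


def rows_per_shard(keys):
    """Count how many rows land on each shard."""
    return Counter(shard_for(key) for key in keys)


# 8,000 rows with distinct keys, then the same rows plus one key repeated 8,000 times.
balanced = [f"user-{i}" for i in range(8000)]
skewed = balanced + ["bot"] * 8000

hot_shard, hot_rows = rows_per_shard(skewed).most_common(1)[0]
print("busiest shard with distinct keys:", max(rows_per_shard(balanced).values()))
print("busiest shard with one hot key:  ", hot_rows, "rows on shard", hot_shard)
```

With distinct keys the rows spread roughly evenly, while the repeated key piles all of its rows onto a single shard no matter how good the hash function is.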

tracing queries

The first step was to trace queries in Scylla to try to get deeper into the hotspot analysis. So we enabled tracing using the following formula to get about 1 trace per second in the system_traces keyspace.

tracing probability = 1 / expected requests per second throughput

In our case, we were doing between 90K req/s and 150K req/s so we settled for 100K req/s to be safe and enabled tracing on our nodes like this:

# nodetool settraceprobability 0.00001
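The value passed to nodetool comes straight from the formula above. A small helper (the function name and its `traces_per_second` parameter are mine, for illustration) makes the arithmetic explicit:

```python
def trace_probability(requests_per_second, traces_per_second=1.0):
    """Probability to set so that about `traces_per_second` requests get traced."""
    if requests_per_second <= 0:
        raise ValueError("requests_per_second must be positive")
    # A probability can never exceed 1, even on a nearly idle cluster.
    return min(1.0, traces_per_second / requests_per_second)


probability = trace_probability(100_000)
print(f"nodetool settraceprobability {probability:.5f}")
```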

Turns out tracing didn’t help very much in our case because the traces do not include the query parameters in Scylla 2.1; this is becoming available in the soon-to-be-released 2.2 version.

NOTE: traces expire on the tables, so make sure you TRUNCATE the events and sessions tables while iterating. Otherwise you will have to wait for the next gc_grace_seconds period (10 days by default) before they are actually removed. If you do not do that and generate millions of traces like we did, querying the mentioned tables will likely time out because of the “tombstoned” rows even if there is no trace inside any more.

looking at cfhistograms

Glauber Costa was also helping on the case and got us looking at the cfhistograms of the tables we were pushing data to. That proved to be clearly highlighting a hotspot problem:

histograms
Percentile  SSTables     Write Latency      Read Latency    Partition Size        Cell Count
                             (micros)          (micros)           (bytes)                  
50%             0,00              6,00              0,00               258                 2
75%             0,00              6,00              0,00               535                 5
95%             0,00              8,00              0,00              1916                24
98%             0,00             11,72              0,00              3311                50
99%             0,00             28,46              0,00              5722                72
Min             0,00              2,00              0,00               104                 0
Max             0,00          45359,00              0,00          14530764            182785

What this basically means is that 99% of our partitions are small (5.7KB at most) while the biggest is 14MB! That’s a huge difference and clearly shows that we have a hotspot on a partition somewhere.
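To put a number on that gap, the two partition sizes from the histogram can simply be divided. The figures below are copied from the cfhistograms output; nothing else is assumed:

```python
# Partition sizes in bytes, taken from the 99% and Max rows of the histogram.
p99_bytes = 5722
max_bytes = 14530764

ratio = max_bytes / p99_bytes
print(f"p99 = {p99_bytes / 1024:.1f} KiB")
print(f"max = {max_bytes / 1024 ** 2:.1f} MiB")
print(f"the biggest partition is about {ratio:.0f} times the 99th percentile")
```

A ratio of three orders of magnitude between the largest partition and almost all the others is a strong hint that a single key is misbehaving.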

So now we know for sure that we have an over represented key in our data set, but what key is it and why?

The culprit

So we looked at the cardinality of our data set keys which are SHA256 hashes and found out that indeed we had one with more than 1M occurrences while the second highest one was around 100K!…
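On a small sample, this kind of cardinality check is only a few lines of Python. The keys below are placeholders, and on a data set of our size you would run the equivalent group-by-and-count in Spark or Hive rather than in memory:

```python
from collections import Counter


def top_keys(keys, n=2):
    """Return the n most frequent keys with their number of occurrences."""
    return Counter(keys).most_common(n)


# Hypothetical sample where one hash is heavily over-represented.
sample = ["hash-a"] * 1000 + ["hash-b"] * 100 + ["hash-c"] * 10
print(top_keys(sample))
```

Comparing the first and second entries is what gives the culprit away: a tenfold gap between them is not what a healthy key distribution looks like.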

Now that we had the main culprit hash, we turned to our data streaming pipeline to figure out what kind of event was generating the data associated to the given SHA256 hash… and surprise! It was a client’s quality assurance bot that was constantly browsing their own website with legitimate behaviour and identity credentials associated to it.

So we modified our pipeline to detect this bot and discard its events so that it stops polluting our databases with fake data. Then we cleaned up the million events’ worth of mess and traces we had stored about the bot.

The aftermath

Finally, we cleared out the data in Scylla and tried again from scratch. Needless to say that the curves got way better and are exactly what we should expect from a well balanced cluster:

Thanks a lot to the ScyllaDB team for their thorough help and high spirited support!

I’ll quote them to conclude this quick blog post:

py3status v3.8

Another long awaited release has come true thanks to our community!

The changelog is so huge that I had to open an issue and cry for help to make it happen… thanks again @lasers for stepping up once again 🙂

Highlights

  • gevent support (-g option) to switch from threads scheduling to greenlets and reduce resources consumption
  • environment variables support in i3status.conf to remove sensitive information from your config
  • modules can now leverage a persistent data store
  • hundreds of improvements for various modules
  • we now have an official debian package
  • we reached 500 stars on github #vanity

Milestone 3.9

  • try to release a version faster than every 4 months (j/k) 😉

The next release will focus on bugs and modules improvements / standardization.

Thanks contributors!

This release is their work, thanks a lot guys!

  • alex o’neill
  • anubiann00b
  • cypher1
  • daniel foerster
  • daniel schaefer
  • girst
  • igor grebenkov
  • james curtis
  • lasers
  • maxim baz
  • nollain
  • raspbeguy
  • regnat
  • robert ricci
  • sébastien delafond
  • themistokle benetatos
  • tobes
  • woland

Evaluating ScyllaDB for production 2/2

In my previous blog post, I shared 7 lessons on our experience in evaluating Scylla for production.

Those lessons were focused on the setup and execution of the POC and I promised a more technical blog post with technical details and lessons learned from the POC, here it is!

Before you read on, be mindful that our POC was set up to test workloads and workflows, not to benchmark technologies. So even if the Scylla figures are great, they have not been the main drivers of the actual conclusion of the POC.

Business context

As a data driven company working in the Marketing and Advertising industry, we help our clients make sense of multiple sources of data to build and improve their relationship with their customers and prospects.

Dealing with multiple sources of data is nothing new but their volume has dramatically changed during the past decade. I will spare you the Big-Data-means-nothing term and the technical challenges that come with it, as you have already heard enough about it.

Still, it is clear that our line of business is tied to our capacity at mixing and correlating a massive amount of different types of events (data sources/types) coming from various sources which all have their own identifiers (think primary keys):

  • Web navigation tracking: identifier is a cookie that’s tied to the tracking domain (we have our own)
  • CRM databases: usually the email address or an internal account ID serve as an identifier
  • Partners’ digital platform: identifier is also a cookie tied to their tracking domain

To try to make things simple, let’s take a concrete example:

You work for UNICEF and want to optimize their banner ads budget by targeting the donors of their last fundraising campaign.

  • Your reference user database is composed of the donors who registered with their email address on the last campaign: main identifier is the email address.
  • To buy web display ads, you use an Ad Exchange partner such as AppNexus or DoubleClick (Google). From their point of view, users are seen as cookie IDs which are tied to their own domain.

So you basically need to be able to translate an email address to a cookie ID for every partner you work with.

Use case: ID matching tables

We operate and maintain huge ID matching tables for every partner and a great deal of our time is spent translating those IDs from one to another. In SQL terms, we are basically doing JOINs between a dataset and those ID matching tables.

  • You select your reference population
  • You JOIN it with the corresponding ID matching table
  • You get a matched population that your partner can recognize and interact with

Those ID matching tables have a pretty high read AND write throughput because they’re updated and queried all the time.

Usual figures are JOINs between a dataset of 10+ million rows and ID matching tables of 1.5+ billion rows.

The reference query basically looks like this:

SELECT count(m.partnerid)
FROM population_10M_rows AS p JOIN partner_id_match_400M_rows AS m
ON p.id = m.id
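For readers more at ease with Python than SQL, the same matching logic can be sketched with a dictionary lookup. The identifiers are invented, and the sketch assumes at most one partner ID per reference ID, which the real tables do not guarantee:

```python
def count_matches(population_ids, id_match):
    """Count reference IDs that have a partner ID (an inner JOIN followed by a count)."""
    return sum(1 for ref_id in population_ids if id_match.get(ref_id) is not None)


# Hypothetical reference population and ID matching table.
population = ["alice@example.com", "bob@example.com", "carol@example.com"]
id_match = {
    "alice@example.com": "cookie-123",
    "carol@example.com": "cookie-456",
    "dave@example.com": "cookie-789",
}

print(count_matches(population, id_match))
```

Each reference ID costs one lookup, so the workload is dominated by a very large number of small key-based reads, which is exactly what we want a datastore to sustain.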

Current implementations

We operate a lambda architecture where we handle real time ID matching using MongoDB and batch ones using Hive (Apache Hadoop).

The first downside to note is that it requires us to maintain two copies of every ID matching table. We also couldn’t choose one over the other because neither MongoDB nor Hive can sustain both the read/write lookup/update ratio while performing within the low latencies that we need.

This is an operational burden and requires quite a bunch of engineering to ensure data consistency between different data stores.

Production hardware overview:

  • MongoDB is running on a 15 nodes (5 shards) cluster
    • 64GB RAM, 2 sockets, RAID10 SAS spinning disks, 10Gbps dual NIC
  • Hive is running on 50+ YARN NodeManager instances
    • 128GB RAM, 2 sockets, JBOD SAS spinning disks, 10Gbps dual NIC

Target implementation

The key question is simple: is there a technology out there that can sustain our ID matching tables workloads while maintaining consistently low upsert/write and lookup/read latencies?

Having one technology to handle both use cases would allow:

  • Simpler data consistency
  • Operational simplicity and efficiency
  • Reduced costs

POC hardware overview:

So we decided to find out if Scylla could be that technology. For this, we used three decommissioned machines that we had in the basement of our Paris office.

  • 2 DELL R510
    • 19GB RAM, 2 socket 8 cores, RAID0 SAS spinning disks, 1Gbps NIC
  • 1 DELL R710
    • 19GB RAM, 2 socket 4 cores, RAID0 SAS spinning disks, 1Gbps NIC

I know, these are not glamorous machines and they are even inconsistent in specs, but we still set up a 3 node Scylla cluster running Gentoo Linux with them.

Our take? If those three lousy machines can challenge or beat the production machines on our current workloads, then Scylla can seriously be considered for production.

Step 1: Validate a schema model

Once the POC document was complete and the ScyllaDB team understood what we were trying to do, we started iterating on the schema model using a query based modeling strategy.

So we wrote down and rated the questions that our model(s) should answer. They included things like:

  • What are all our cookie IDs associated to the given partner ID ?
  • What are all the cookie IDs associated to the given partner ID over the last N months ?
  • What is the last cookie ID/date for the given partner ID ?
  • What is the last date we have seen the given cookie ID / partner ID couple ?

As you can imagine, the reverse questions are also to be answered so ID translations can be done both ways (ouch!).

Prototyping

It is no news that I’m a Python addict so I did all my prototyping using Python and the cassandra-driver.

I ended up using a test-driven data modelling strategy using pytest. I wrote tests on my dataset so I could concentrate on the model while making sure that all my questions were being answered correctly and consistently.
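To give an idea of the shape of such tests, here is a simplified sketch. My real tests ran queries against Scylla through the cassandra-driver; here an in-memory list stands in for the table, and the data and helper names are hypothetical:

```python
from datetime import datetime

# In-memory stand-in for the rows of an ID matching table: (partnerid, date, id).
ROWS = [
    ("partner-1", datetime(2018, 1, 5), "cookie-a"),
    ("partner-1", datetime(2018, 3, 9), "cookie-b"),
    ("partner-2", datetime(2018, 2, 1), "cookie-c"),
]


def ids_for_partner(rows, partnerid):
    """Question: what are all our cookie IDs associated to the given partner ID?"""
    return sorted(cid for pid, _, cid in rows if pid == partnerid)


def last_id_for_partner(rows, partnerid):
    """Question: what is the last cookie ID/date for the given partner ID?"""
    matches = [(date, cid) for pid, date, cid in rows if pid == partnerid]
    return max(matches) if matches else None


# pytest collects functions whose names start with "test_".
def test_all_ids_for_partner():
    assert ids_for_partner(ROWS, "partner-1") == ["cookie-a", "cookie-b"]


def test_last_id_for_partner():
    assert last_id_for_partner(ROWS, "partner-1") == (datetime(2018, 3, 9), "cookie-b")
    assert last_id_for_partner(ROWS, "unknown") is None
```

Keeping one test per question means the schema can change freely underneath as long as every question still gets the right answer.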

Schema

In our case, we ended up with three denormalized tables to answer all the questions we had. To answer the first three questions above, you could use the schema below:

CREATE TABLE IF NOT EXISTS ids_by_partnerid(
 partnerid text,
 id text,
 date timestamp,
 PRIMARY KEY ((partnerid), date, id)
 )
 WITH CLUSTERING ORDER BY (date DESC)

Note on clustering key ordering

One important learning I got in the process of validating the model is about the internals of Cassandra’s file format that resulted in the choice of using a descending order DESC on the date clustering key as you can see above.

If your main query use case is to look for the latest value in a history-like table design like ours, then make sure to change the default ASC order of your clustering key to DESC. This will ensure that the latest values (rows) are stored at the beginning of the sstable file, effectively reducing the read latency when the row is not in cache!

Let me quote Glauber Costa’s detailed explanation on this:

Basically in Cassandra’s file format, the index points to an entire partition (for very large partitions there is a hack to avoid that, but the logic is mostly the same). So if you want to read the first row, that’s easy you get the index to the partition and read the first row. If you want to read the last row, then you get the index to the partition and do a linear scan to the next.

This is the kind of learning you can only get from experts like Glauber and that can justify the whole POC on its own!
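The effect Glauber describes can be modelled very roughly by counting how many rows have to be read from the start of a partition before reaching the latest one. This is a deliberately simplified picture that ignores caches and the large-partition index he mentions:

```python
def rows_scanned_for_latest(dates, descending):
    """Count rows read from the start of the partition until the latest one is reached."""
    ordered = sorted(dates, reverse=descending)  # on-disk order of the clustering key
    latest = max(dates)
    return ordered.index(latest) + 1


# A partition of 1000 rows where a larger number means a more recent date.
dates = list(range(1, 1001))
print("ASC :", rows_scanned_for_latest(dates, descending=False))
print("DESC:", rows_scanned_for_latest(dates, descending=True))
```

With the default ascending order the whole partition is walked to find the newest row, while the descending order finds it on the first read.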

Step 2: Set up scylla-grafana-monitoring

As I said before, make sure to set up and run the scylla-grafana-monitoring project before running your test workloads. This easy to run solution will be of great help to understand the performance of your cluster and to tune your workload for optimal performance.

If you can, also discuss with the ScyllaDB team to allow them to access the Grafana dashboard. This will be very valuable since they know where to look better than we usually do… I gained a lot of understanding thanks to this!

Note on scrape interval

I advise you to lower the Prometheus scrape interval to have a shorter and finer sampling of your metrics. This will allow your dashboard to be more reactive when you start your test workloads.

For this, change the prometheus/prometheus.yml file like this:

scrape_interval: 2s # Scrape targets every 2 seconds (5s default)
scrape_timeout: 1s # Timeout before trying to scrape a target again (4s default)

Test your monitoring

Before going any further, I strongly advise you to run a stress test on your POC cluster using the cassandra-stress tool and share the results and their monitoring graphs with the ScyllaDB team.

This will give you a common understanding of the general performances of your cluster as well as help in outlining any obvious misconfiguration or hardware problem.

Key graphs to look at

There are a lot of interesting graphs so I’d like to share the ones that I have been mainly looking at. Remember that depending on your test workloads, some other graphs may be more relevant for you.

  • number of open connections

You’ll want to see a steady and high enough number of open connections which will prove that your clients are pushed at their maximum (at the time of testing this graph was not on Grafana and you had to add it yourself)

  • cache hits / misses

Depending on your reference dataset, you’ll obviously see that cache hits and misses will have a direct correlation with disk I/O and overall performances. Running your test workloads multiple times should trigger higher cache hits if your RAM is big enough.

  • per shard/node distribution

The Requests Served per shard graph should display a nicely distributed load between your shards and nodes so that you’re sure that you’re getting the best out of your cluster.

The same is true for almost every other “per shard/node” graph: you’re looking for evenly distributed load.

  • sstable reads

Directly linked with your disk performances, you’ll be trying to make sure that you have almost no queued sstable reads.

Step 3: Get your reference data and metrics

We obviously need to have some reference metrics on our current production stack so we can compare them with the results on our POC Scylla cluster.

Whether you choose to use your current production machines or set up a similar stack on the side to run your test workloads is up to you. We chose to run the vast majority of our tests on our current production machines to be as close to our real workloads as possible.

Prepare a reference dataset

During your work on the POC document, you should have detailed the usual data cardinality and volume you work with. Use this information to set up a reference dataset that you can use on all of the platforms that you plan to compare.

In our case, we chose a 10 Million reference dataset that we JOINed with a 400+ Million extract of an ID matching table. Those volumes seemed easy enough to work with and allowed some nice ratio for memory bound workloads.

Measure on your current stack

Then it’s time to load this reference dataset on your current platforms.

  • If you run a MongoDB cluster like we do, make sure to shard and index the dataset just like you do on the production collections.
  • On Hive, make sure to respect the storage file format of your current implementations as well as their partitioning.

If you chose to run your test workloads on your production machines, make sure to run them multiple times and at different hours of the day and night so you can correlate the measures with the load on the cluster at the time of the tests.

Reference metrics

For the sake of simplicity I’ll focus on the Hive-only batch workloads. I performed a count on the JOIN of the dataset and the ID matching table using Spark 2 and then I also ran the JOIN using a simple Hive query through Beeline.

I gave the following definitions on the reference load:

  • IDLE: YARN available containers and free resources are optimal, parallelism is very limited
  • NORMAL: YARN sustains some casual load, parallelism exists but we are not bound by anything still
  • HIGH: YARN has pending containers, parallelism is high and applications have to wait for containers before executing

There’s always an error margin on the results you get and I found that there were no significant differences between the results using Spark 2 and Beeline, so I stuck with a simple set of results:

  • IDLE: 2 minutes, 15 seconds
  • NORMAL: 4 minutes
  • HIGH: 15 minutes

Step 4: Get Scylla in the mix

It’s finally time to do your best to break Scylla or at least to push it to its limits on your hardware… But most importantly, you’ll be looking to understand what those limits are depending on your test workloads, as well as outlining all the tuning that you will be required to do on the client side to reach those limits.

Speaking about the results, we will have to differentiate two cases:

  1. The Scylla cluster is fresh and its cache is empty (cold start): performance is mostly Disk I/O bound
  2. The Scylla cluster has been running some test workload already and its cache is hot: performance is mostly Memory bound with some Disk I/O depending on the size of your RAM

Spark 2 / Scala test workload

Here I used Scala (yes, I did) and DataStax’s spark-cassandra-connector so I could use the magic joinWithCassandraTable function.

  • spark-cassandra-connector-2.0.1-s_2.11.jar
  • Java 7

I had to stick with the 2.0.1 version of the spark-cassandra-connector because newer versions (2.0.5 at the time of testing) were performing badly for no apparent reason. The ScyllaDB team couldn’t help on this.

You can interact with your test workload using the spark2-shell:

spark2-shell --jars jars/commons-beanutils_commons-beanutils-1.9.3.jar,jars/com.twitter_jsr166e-1.1.0.jar,jars/io.netty_netty-all-4.0.33.Final.jar,jars/org.joda_joda-convert-1.2.jar,jars/commons-collections_commons-collections-3.2.2.jar,jars/joda-time_joda-time-2.3.jar,jars/org.scala-lang_scala-reflect-2.11.8.jar,jars/spark-cassandra-connector-2.0.1-s_2.11.jar

Then use the following Scala imports:

// main connector import
import com.datastax.spark.connector._

// the joinWithCassandraTable failed without this (dunno why, I'm no Scala guy)
import com.datastax.spark.connector.writer._
implicit val rowWriter = SqlRowWriter.Factory

Finally I could run my test workload to select the data from Hive and JOIN it with Scylla easily:

val df_population = spark.sql("SELECT id FROM population_10M_rows")
val join_rdd = df_population.rdd.repartitionByCassandraReplica("test_keyspace", "partner_id_match_400M_rows").joinWithCassandraTable("test_keyspace", "partner_id_match_400M_rows")
val joined_count = join_rdd.count()

Notes on tuning spark-cassandra-connector

I experienced pretty crappy performance at first. Thanks to the easy Grafana monitoring, I could see that Scylla was not the bottleneck at all and that I instead had trouble getting some real load on it.

So I engaged in a thorough tuning of the spark-cassandra-connector with the help of Glauber… and it was pretty painful but we finally made it and got the best parameters to get the load on the Scylla cluster close to 100% when running the test workloads.

This tuning was done in the spark-defaults.conf file:

  • have a fixed set of executors and boost their overhead memory

This will increase test results reliability by making sure you always have a reserved number of available workers at your disposal.

spark.dynamicAllocation.enabled=false
spark.executor.instances=30
spark.yarn.executor.memoryOverhead=1024

  • set the split size to 1MB

Default is 8MB but Scylla uses a split size of 1MB, so you’ll see a great boost in performance and stability by setting this to the right number.

spark.cassandra.input.split.size_in_mb=1

  • align driver timeouts with server timeouts

It is advised to make sure that your read request timeouts are the same on the driver and the server so you do not end up in stalled states where one side is waiting for a timeout to happen on the other. You can do the same with write timeouts if your test workloads are write intensive.

/etc/scylla/scylla.yaml

read_request_timeout_in_ms: 150000

spark-defaults.conf

spark.cassandra.connection.timeout_ms=150000
spark.cassandra.read.timeout_ms=150000

# optional if you want to fail / retry faster for HA scenarios
spark.cassandra.connection.reconnection_delay_ms.max=5000
spark.cassandra.connection.reconnection_delay_ms.min=1000
spark.cassandra.query.retry.count=100

  • adjust your reads per second rate

Last but surely not least, you will need to experiment to find the best value for this setting yourself, since it has a direct impact on the load on your Scylla cluster. You will be looking at pushing your POC cluster to almost 100% load.

spark.cassandra.input.reads_per_sec=6666

As I said before, I could only get this to work perfectly using the 2.0.1 version of the spark-cassandra-connector driver. But then it worked very well and with great speed.
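
Since misaligned timeouts are easy to reintroduce when editing either file, here is a minimal sketch of a sanity check comparing the read timeout from spark-defaults.conf with the one from scylla.yaml. The helper names are hypothetical and the scylla.yaml parsing is a deliberately naive regular expression that only assumes the flat `key: value` line shown above, not a real YAML parser.

```python
import re


def parse_spark_defaults(text):
    """Parse spark-defaults.conf style lines ("key=value" or "key value")."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        parts = re.split(r"\s*=\s*|\s+", line, maxsplit=1)
        if len(parts) == 2:
            props[parts[0]] = parts[1]
    return props


def read_timeout_from_scylla_yaml(text):
    """Extract read_request_timeout_in_ms from a flat, uncommented scylla.yaml line."""
    match = re.search(r"^\s*read_request_timeout_in_ms\s*:\s*(\d+)", text, re.MULTILINE)
    return int(match.group(1)) if match else None


def read_timeouts_aligned(spark_defaults_text, scylla_yaml_text):
    """True only when both sides define the read timeout and the values are equal."""
    props = parse_spark_defaults(spark_defaults_text)
    driver = props.get("spark.cassandra.read.timeout_ms")
    server = read_timeout_from_scylla_yaml(scylla_yaml_text)
    return driver is not None and server is not None and int(driver) == server


spark_defaults = """
spark.cassandra.input.split.size_in_mb=1
spark.cassandra.read.timeout_ms=150000
"""
scylla_yaml = "read_request_timeout_in_ms: 150000\n"

print(read_timeouts_aligned(spark_defaults, scylla_yaml))
```

The check returns False when either side leaves the timeout unset, so a silently applied default on one side is reported as a mismatch instead of being guessed.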

Spark 2 results

Once tuned, the best results I was able to reach on this hardware are listed below. It’s interesting to see that with spinning disks, the cold start result can compete with the results of a heavily loaded Hadoop cluster where pending containers and parallelism are knocking down its performances.

  • hot cache: 2min
  • cold cache: 12min

Wow! Those three refurbished machines can compete with our current production machines and implementations, they can even match an idle Hive cluster of a medium size!

Python test workload

I couldn’t conclude on a Scala/Spark 2 only test workload. So I obviously went back to my language of choice, Python, only to discover to my disappointment that there is no joinWithCassandraTable equivalent available in pyspark.

I tried with some projects claiming otherwise with no success until I changed my mind and decided that I probably didn’t need Spark 2 at all. So I went into the crazy quest of beating Spark 2 performances using a pure Python implementation.

This basically means that instead of having a JOIN like helper, I had to do a massive amount of single “id -> partnerid” lookups. Simple but greatly inefficient you say? Really?

When I broke down the pieces, I was left with the following steps to implement and optimize:

  • Load the 10M rows worth of population data from Hive
  • For every row, lookup the corresponding partnerid in the ID matching table from Scylla
  • Count the resulting number of matches

The main problem in competing with Spark 2 is that it is a distributed framework and Python by itself is not. So you can’t possibly imagine outperforming Spark 2 with a single machine.

However, let’s remember that Spark 2 is shipped and run on executors using YARN so we are firing up JVMs and dispatching containers all the time. This is a quite expensive process that we have a chance to avoid using Python!

So what I needed was a distributed computation framework that would allow me to load data in a partitioned way and run the lookups on all the partitions in parallel before merging the results. In Python, this framework exists and is named Dask!

You will obviously need to deploy a dask topology (that’s easy and well documented) to have a number of dask workers comparable to the number of Spark 2 executors (30 in my case).

The corresponding Python code samples are here.

Hive + Scylla results

Reading the population id’s from Hive, the workload can be split and executed concurrently on multiple dask workers.

  • read the 10M population rows from Hive in a partitioned manner
  • for each partition (slice of 10M), query Scylla to lookup the possibly matching partnerid
  • create a dataframe from the resulting matches
  • gather back all the dataframes and merge them
  • count the number of matches
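
The shape of those steps can be sketched with the standard library alone. This is not the code I ran: a thread pool stands in for the dask workers and a plain dictionary stands in for the Scylla table, and every name below (`split`, `count_matches`, `fake_lookup`, `ID_MATCH`) is hypothetical.

```python
from concurrent.futures import ThreadPoolExecutor


def split(ids, n_partitions):
    """Cut the list of ids into at most n_partitions contiguous slices."""
    if not ids:
        return []
    size = -(-len(ids) // n_partitions)  # ceiling division
    return [ids[i:i + size] for i in range(0, len(ids), size)]


def count_matches(ids, lookup, n_partitions=30):
    """Run lookup() on every partition in parallel, merge the results and count them."""
    partitions = split(ids, n_partitions)
    with ThreadPoolExecutor(max_workers=n_partitions) as pool:
        partial_results = list(pool.map(lookup, partitions))
    merged = {}
    for matches in partial_results:
        merged.update(matches)
    return len(merged)


# hypothetical stand-in for the ID matching table stored in Scylla
ID_MATCH = {i: "partner-%d" % i for i in range(0, 100, 2)}


def fake_lookup(partition):
    """Return the id -> partnerid matches found for one partition."""
    return {i: ID_MATCH[i] for i in partition if i in ID_MATCH}


print(count_matches(list(range(100)), fake_lookup))
```

With a real dask topology, the `pool.map` call would typically become a `client.map(lookup, partitions)` followed by `client.gather(...)` on a `dask.distributed` client, and `lookup` would query Scylla instead of a dictionary.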

The results showed that it is possible to compete with Spark 2 with Dask:

  • hot cache: 2min (rounded up)
  • cold cache: 6min

Interestingly, those almost two minutes can be broken down like this:

  • distributed read data from Hive: 50s
  • distributed lookup from Scylla: 60s
  • merge + count: 10s

This meant that if I could cut down the reading of data from Hive I could go even faster!

Parquet + Scylla results

Going further on my previous remark I decided to get rid of Hive and put the 10M rows population data in a parquet file instead. I ended up trying to find out the most efficient way to read and load a parquet file from HDFS.

My conclusion so far is that you can’t beat the amazing libhdfs3 + pyarrow combo. It is faster to load everything on a single machine than to load from Hive on multiple ones!

The results showed that I could almost get rid of a whole minute in the total process, effectively and easily beating Spark 2!

  • hot cache: 1min 5s
  • cold cache: 5min

Notes on the Python cassandra-driver

Tests using Python showed robust queries experiencing far fewer failures than with the spark-cassandra-connector, even more so during the cold start scenario.

  • The usage of execute_concurrent() provides a clean and linear interface to submit a large number of queries while providing some level of concurrency control
  • Increasing the concurrency parameter from 100 to 512 provided additional throughput, but increasing it more looked useless
  • Protocol version 4 does not allow hand tuning the number of connections / requests per connection and relies on some sort of auto configuration instead. All attempts to hand tune it (by lowering the protocol version to 2) failed to achieve higher throughput
  • Installation of libev on the system allows the cassandra-driver to use it to handle concurrency instead of asyncore with a somewhat lower load footprint on the worker node but no noticeable change on the throughput
  • When reading a parquet file stored on HDFS, the hdfs3 + pyarrow combo provides an insane speed (less than 10s to fully load 10M rows of a single column)
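
To illustrate the first two points, here is a minimal sketch of a per partition lookup built on the driver's `execute_concurrent_with_args()` helper. The keyspace and table names are the ones used in the Scala workload above, while the `id` and `partnerid` column names and the `lookup_partner_ids` function are assumptions made for the example. It expects an already connected `Session` and the default named tuple row factory.

```python
def lookup_partner_ids(session, ids, concurrency=512):
    """Look up the partnerid of every id, returning (matches, failed_ids)."""
    # imported here so that the module can be loaded on machines without the driver
    from cassandra.concurrent import execute_concurrent_with_args

    # assumed schema: one row per id holding its partnerid
    prepared = session.prepare(
        "SELECT id, partnerid FROM test_keyspace.partner_id_match_400M_rows WHERE id = ?"
    )
    results = execute_concurrent_with_args(
        session,
        prepared,
        [(id_,) for id_ in ids],
        concurrency=concurrency,
        raise_on_first_error=False,  # collect failures instead of aborting the batch
    )

    matches, failed = {}, []
    # results come back in the same order as the parameters
    for id_, (success, rows_or_exc) in zip(ids, results):
        if not success:
            failed.append(id_)
            continue
        for row in rows_or_exc:
            matches[row.id] = row.partnerid
    return matches, failed
```

Returning the failed ids separately keeps the happy path linear while leaving the retry decision to the caller, which matters most during cold starts.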

Step 5: Play with High Availability

I was quite disappointed and surprised by the lack of maturity of the Cassandra community on this critical topic. Maybe the main reason is that the cassandra-driver allows for too many levels of configuration and strategies.

I wrote this simple bash script to allow me to simulate node failures. Then I could play with handling those failures and retries on the Python client code.

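#!/bin/bash
# Simulate a node failure by dropping all Scylla traffic on this node.
# WARNING: this flushes every existing rule of the filter table.

# start from a clean filter table (flush rules first, then delete user chains)
iptables -t filter -F
iptables -t filter -X

# drop incoming and outgoing traffic on the CQL, Thrift, Prometheus,
# REST API and inter-node ports
ip="0.0.0.0/0"
for port in 9042 9160 9180 10000 7000; do
	iptables -t filter -A INPUT -p tcp --dport "${port}" -s "${ip}" -j DROP
	iptables -t filter -A OUTPUT -p tcp --sport "${port}" -d "${ip}" -j DROP
done

# display the packet counters every second until interrupted with Ctrl+C
while true; do
	trap break INT
	clear
	iptables -t filter -vnL
	sleep 1
done

# restore connectivity and show the now empty filter table
iptables -t filter -F
iptables -t filter -X
iptables -t filter -vnL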

This topic is worth covering in more detail in a dedicated blog post that I shall write later on, with code samples.
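
In the meantime, here is a minimal sketch of the kind of client side retry loop you can wrap around a query while a node is being blocked. It is a generic exponential backoff, not the strategy I ended up with, and the `call_with_retries` name and its defaults are hypothetical. With the cassandra-driver you would probably restrict `retry_on` to exceptions such as `NoHostAvailable` or `OperationTimedOut`.

```python
import time


def call_with_retries(operation, retries=5, base_delay=0.5, max_delay=5.0,
                      retry_on=(Exception,), sleep=time.sleep):
    """Call operation() and retry it with exponential backoff when it fails."""
    for attempt in range(retries + 1):
        try:
            return operation()
        except retry_on:
            if attempt == retries:
                raise  # out of retries: let the caller decide what to do
            # wait 0.5s, 1s, 2s... capped at max_delay
            sleep(min(max_delay, base_delay * 2 ** attempt))
```

A typical call would look like `call_with_retries(lambda: session.execute(query))`. The `sleep` argument only exists so that the delays can be replaced in tests.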

Concluding the evaluation

I’m happy to say that Scylla passed our production evaluation and will soon go live on our infrastructure!

As I said at the beginning of this post, the conclusion of the evaluation has not been driven by the good figures we got out of our test workloads. Those are no benchmarks and never pretended to be but we could still prove that performances were solid enough to not be a blocker in the adoption of Scylla.

Instead we decided on the following points of interest (in no particular order):

  • data consistency
  • production reliability
  • datacenter awareness
  • ease of operation
  • infrastructure rationalisation
  • developer friendliness
  • costs

On the side, I tried Scylla on two other different use cases which proved interesting to follow later on to displace MongoDB again…

Moving to production

Since our relationship was great we also decided to partner with ScyllaDB and support them by subscribing to their enterprise offerings. They also agreed to support us using Gentoo Linux!

We are starting with a three-node heavy-duty cluster:

  • DELL R640
    • dual socket 2.6GHz 14C, 512GB RAM, Samsung 17xxx NVMe 3.2 TB

I’m eager to see ScyllaDB building up and will continue to help with my modest contributions. Thanks again to the ScyllaDB team for their patience and support during the POC!

Evaluating ScyllaDB for production 1/2

I have recently been conducting a quite deep evaluation of ScyllaDB to find out if we could benefit from this database in some of our intensive and latency critical data streams and jobs.

I’ll try to share this great experience within two posts:

  1. The first one (you’re reading) will walk through how to prepare yourself for a successful Proof Of Concept based evaluation with the help of the ScyllaDB team.
  2. The second post will cover the technical aspects and details of the POC I’ve conducted with the various approaches I’ve followed to find the most optimal solution.

But let’s start with how I got into this in the first place…


Selecting ScyllaDB

I got interested in ScyllaDB because of its philosophy and engagement and I quickly got into it by being a modest contributor and its Gentoo Linux packager (not in portage yet).

Of course, I didn’t take an interest in that technology by chance:

We’ve been using MongoDB in (mass) production at work for a very very long time now. I can easily say we were early MongoDB adopters. But there’s no wisdom in saying that MongoDB is not suited for every use case and the Hadoop stack has come very strong in our data centers since then, with a predominance of Hive for the heavy duty and data hungry workflows.

One thing I was never satisfied with in MongoDB was its primary/secondary architecture, which makes you lose write throughput and is even more horrible when you want to set up what they call a “cluster”, which is in fact some mediocre abstraction they add on top of replica-sets. To say the least, it is inefficient and cumbersome to operate and maintain.

So I obviously had Cassandra on my radar for a long time, but I was pushed back by its Java stack, heap size and silly tuning… Also, coming from the versatile MongoDB world, Cassandra’s CQL limitations looked dreadful at that time…

The day I found myself on ScyllaDB’s webpage and read their promises, I was sure to be challenging our current use cases with this interesting sea monster.


Setting up a POC with the people at ScyllaDB

Through my contributions around my packaging of ScyllaDB for Gentoo Linux, I got to know a bit about the people behind the technology. They got interested in why I was packaging this in the first place and when I explained my not-so-secret goal of challenging our production data workflows using Scylla, they told me that they would love to help!

I was a bit surprised at first because this was the first time I ever saw a real engagement of the people behind a technology into someone else’s POC.

Their pitch is simple: they will help (for free) anyone conducting a serious POC to make sure that the outcome and the comprehension behind it are the best possible. It is a very mature reasoning to me because it is easy to make false assumptions and conclude badly when testing a technology you don’t know, even more so when your use cases are complex and your expectations are very high like ours.

Still, to my current knowledge, they’re the only ones in the data industry to have this kind of logic in place since the start. So I wanted to take this chance to thank them again for this!

The POC includes:

  • no bullshit, simple tech-to-tech relationship
  • a private slack channel with multiple ScyllaDB engineers
  • video calls to introduce ourselves and discuss our progress later on
  • help in schema design and logic
  • fast answers to every question you have
  • detailed explanations on the internals of the technology
  • hardware sizing help and validation
  • funny comments and French jokes (ok, not suitable for everyone)


Lessons for a successful POC

As I said before, you’ve got to be serious in your approach to make sure your POC will be efficient and will lead to an unbiased and fair conclusion.

This is a list of the main things I consider important to have prepared before you start.

Have some background

Make sure to read some literature to have the key concepts and words in mind before you go. It is even more important if like me you do not come from the Cassandra world.

I found that the Cassandra: The Definitive Guide book at O’Reilly is a great read. Also, make sure to go around ScyllaDB’s documentation.

Work with a shared reference document

Make sure you share with the ScyllaDB guys a clear and detailed document explaining exactly what you’re trying to achieve and how you are doing it today (if you plan on migrating like we did).

I made a google document for this because it felt the easiest. This document will be updated as you go and will serve as a reference for everyone participating in the POC.

This shared reference document is very important, so if you don’t know how to construct it or what to put in it, here is how I structured it:

  1. Who’s participating at <your company>
    • photo + name + speciality
  2. Who’s participating at ScyllaDB
  3. POC hardware
    • if you have your own bare metal machines you want to run your POC on, give every detail about their number and specs
    • if not, explain how you plan to set up and run your Scylla cluster
  4. Reference infrastructure
    • give every detail on the technologies and on the hardware of the servers that are currently responsible for running your workflows
    • explain your clusters and their speciality
  5. Use case #1 : <name>
    • Context
      • give context about your use case by explaining it without tech words, think from the business / user point of view
    • Current implementations
      • that’s where you get technical
      • technology names and where they come into play in your current stack
      • insightful data volumes and cardinality
      • current schema models
    • Workload related to this use case
      • queries per second per data source / type
      • peak hours or no peak hours?
      • criticality
    • Questions we want to answer
      • remember, the NoSQL world is led by query-based-modeling schema design logic, cassandra/scylla is no exception
      • write down the real questions you want your data model(s) to be able to answer
      • group them and rate them by importance
    • Validated models
      • this one comes during the POC when you have settled on the data models
      • write them down, explain them or relate them to the questions they answer
      • copy/paste some code showcasing how to work with them
    • Code examples
      • depending on the complexity of your use case, you may have multiple constraints or ways to compare your current implementation with your POC
      • try to explain what you test and copy/paste the best code you came up with to validate each point

Have monitoring in place

ScyllaDB provides a monitoring platform based on Docker, Prometheus and Grafana that is efficient and simple to set up. I strongly recommend that you set it up, as it provides valuable insights almost immediately, and on an ongoing basis.

Also you should strive to give access to your monitoring to the ScyllaDB guys, if that’s possible for you. They will provide you with a fixed IP which you can authorize to access your Grafana dashboards so they can have a look at the performance of your POC cluster as you go. You’ll learn a great deal about ScyllaDB’s internals by sharing with them.

Know when to stop

The main trap in a POC is to work without boundaries. Since you’re looking for the best of what you can get out of a technology, you’ll get tempted to refine indefinitely.

So it is good to have at least an idea of the minimal figures you’d like to reach to be satisfied with your tests. You can always push a bit further but not for too long!

Plan some high availability tests

Even if you first came to ScyllaDB for its speed, make sure to test its high availability capabilities based on your experience.

Most importantly, make sure you test it within your code base and guidelines. How will your code react and handle a failure, partial and total? I was very surprised and saddened to discover so little literature on the subject in the Cassandra community.

POC != production

Remember that even when everything is right on paper, production load will have its share of surprises and unexpected behaviours. So keep a good deal of flexibility in your design and your capacity planning to absorb them.

Make time

Our POC lasted almost 5 months instead of the estimated 3, mostly because of my agenda’s unwillingness to cooperate…

As you can imagine this interruption was not always optimal, for either me or the ScyllaDB guys, but they were kind not to complain about it. So depending on how thorough you plan to be, make sure you make time matching your degree of demands. The reference document is also helpful to get back up to speed.


Feedback for the ScyllaDB guys

Here are the main points I noted during the POC that the guys from ScyllaDB could improve on.

They are subjective of course but it’s important to give feedback so here it goes. I’m fully aware that everyone is trying to improve, so I’m not pointing any fingers at all.

I shared those comments already with them and they acknowledged them very well.

More video meetings on start

When starting the POC, try to have some pre-scheduled video meetings to set it right in motion. This will provide a good pace as well as making sure that everyone is on the same page.

Make a POC kick starter questionnaire

Having a minimal plan to follow with some key points to set up just like the ones I explained before would help. Maybe also a minimal questionnaire to make sure that the key aspects and figures have been given some thought since the start. This will raise awareness on the real questions the POC aims to answer.

To put it more simply: some minimal formalism helps to check out the key aspects and questions.

Develop a higher client driver expertise

This one was the most painful to me, and is likely to be painful for anyone who, like me, is not coming from the Cassandra world.

Finding good and strong code examples and guidelines on the client side was hard and that’s where I felt the most alone. This was not pleasant because a technology is definitely validated through its usage which means on the client side.

Most of my tests were using python and the python-cassandra driver so I had tons of questions about it with no sticking answers. Same thing went with the spark-cassandra-connector when using scala where some key configuration options (not documented) can change the shape of your results drastically (more details on the next post).

High Availability guidelines and examples

This one still strikes me as the most awkward in the Cassandra community. I literally struggled with finding clear and detailed explanations about how to handle failure more or less gracefully with the python driver (or any other driver).

This is kind of a disappointment to me for a technology that positions itself as highly available… I’ll get into more details about it on the next post.

A clearer sizing documentation

Even if there will never be a magic formula, there are some rules of thumb that exist for sizing your hardware for ScyllaDB. They should be written down more clearly in a maybe dedicated documentation (sizing guide is labeled as admin guide at time of writing).

Some examples:

  • RAM per core? What is a core? Relation to shard?
  • Disk / RAM maximal ratio?
  • Multiple SSDs vs one NVMe?
  • Hardware RAID vs software RAID? Need a RAID controller at all?

Maybe even provide a bare metal complete example from two different vendors such as DELL and HP.

What’s next?

In the next post, I’ll get into more details on the POC itself and the technical learnings we found along the way. This will lead to the final conclusion and the next move we engaged ourselves with.

py3status v3.7

This important release has been long awaited as it focused on improving overall performance of py3status as well as dramatically decreasing its memory footprint!

I want once again to salute the impressive work of @lasers, our amazing contributor from the USA who has become the top contributor of 2017 in terms of commits and PRs.

Thanks to him, this release brings a whole batch of improvements and QA clean ups on various modules. I encourage you to go through the changelog to see everything.

Highlights

Deep rework of the usage and scheduling of threads to run modules has been done by @tobes.

  • now py3status does not keep one thread per module running permanently but instead uses a queue to spawn a thread to execute the module only when its cache expires
  • this new scheduling and usage of threads allows py3status to run under asynchronous event loops and gevent will be supported on the upcoming 3.8
  • memory footprint of py3status got largely reduced thanks to the threads modifications and thanks to a nice hunt on ever growing and useless variables
  • modules error reporting is now more detailed
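
To give an idea of the scheduling principle, here is a minimal sketch of a queue that only spawns a thread for a module once its cache has expired. This is not py3status's actual implementation: the `Scheduler` class, its methods and the explicit `now` argument are hypothetical and only meant to illustrate the idea.

```python
import heapq
import threading


class Scheduler:
    """Run each module in a short-lived thread, only when its cache has expired."""

    def __init__(self):
        self._queue = []    # heap of (due_time, module_name)
        self._modules = {}  # module_name -> (function, cache_timeout)
        self._lock = threading.Lock()
        self.outputs = {}   # last output of every module

    def register(self, name, func, cache_timeout):
        if cache_timeout <= 0:
            raise ValueError("cache_timeout must be positive")
        self._modules[name] = (func, cache_timeout)
        heapq.heappush(self._queue, (0.0, name))  # due immediately

    def _run(self, name, func):
        result = func()
        with self._lock:
            self.outputs[name] = result

    def tick(self, now):
        """Spawn a thread for every module due at `now` and return those threads."""
        threads = []
        while self._queue and self._queue[0][0] <= now:
            _, name = heapq.heappop(self._queue)
            func, cache_timeout = self._modules[name]
            thread = threading.Thread(target=self._run, args=(name, func))
            thread.start()
            threads.append(thread)
            # the module will not run again before its cache expires
            heapq.heappush(self._queue, (now + cache_timeout, name))
        return threads


scheduler = Scheduler()
scheduler.register("clock", lambda: "12:00", cache_timeout=1)
scheduler.register("weather", lambda: "sunny", cache_timeout=600)
for thread in scheduler.tick(now=0):
    thread.join()
print(scheduler.outputs)
```

Between two expirations no thread exists for a module at all, which is where the memory savings of such a design would come from.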

Milestone 3.8

The next release will bring some awesome new features such as gevent support, environment variable support in config file and per module persistent data storage as well as new modules!

Thanks contributors!

This release is their work, thanks a lot guys!

  • JohnAZoidberg
  • lasers
  • maximbaz
  • pcewing
  • tobes