Tuesday, January 31, 2017

Cassandra Misc

nodetool clearsnapshot

To clarify why this behaviour occurs: by default, Cassandra will snapshot
a table when you perform any destructive action (TRUNCATE, DROP, etc.).


To free disk space after such an operation you will always need to clear
the snapshots (e.g. with nodetool clearsnapshot). Unfortunately this
can be a bit painful if you are rotating your tables, say by month, and
want to remove the oldest one from disk, as your client will need to speak
JMX as well.

You can disable this behaviour through the auto_snapshot setting in
cassandra.yaml, though I would strongly recommend leaving this feature
enabled in any sane production environment and cleaning up snapshots as an
independent task!

Time series data
A time series is a naturally sorted list, since things are happening over time. Sensor readings or live chat are good examples. In older versions of Cassandra, you'd use timestamp as your column name, and the value would be the actual data. This would give you your list of data, sorted in order. The benefit of this is your queries would likely be looking at slices of time, and with the data stored sequentially on disk you'll get very fast reads, since there only needs to be one seek (if the data isn't already in memory).

In particular, the Cassandra team has introduced two important items: one is the timeuuid type, and the other is specifying compound primary keys with compact storage. This causes the data to be stored sequentially by the timeuuid column, exactly like a really wide row.

create table sensor_entries (
    sensorid uuid,
    time_taken timeuuid,
    reading text,
    primary key (sensorid, time_taken)
) with compact storage;

UUID and TIMEUUID are stored the same way in Cassandra, and they only really represent two different sorting implementations.
TIMEUUID columns are sorted by their time components first, and then by their raw bytes, whereas UUID columns are sorted by their version first, then, if both are version 1, by their time component, and finally by their raw bytes. Curiously, the time component sorting implementations are duplicated between UUIDType and TimeUUIDType in the Cassandra code, except for different formatting.
I think of the UUID vs. TIMEUUID question primarily as documentation: if you choose TIMEUUID you're saying that you're storing things in chronological order, and that these things can occur at the same time, so a simple timestamp isn't enough. Using UUID says that you don't care about order (even if in practice the columns will be ordered by time if you put version 1 UUIDs in them), you just want to make sure that things have unique IDs.
Even if using NOW() to generate UUID values is convenient, it's also very surprising to other people reading your code.
It probably does not matter much in the grand scheme of things, but sorting non-version 1 UUIDs is a bit faster than version 1, so if you have a UUID column and generate the UUIDs yourself, go for another version.
The Cassandra docs explicitly advise using NTP to synchronize system time across all nodes.

Note that timestamps also suffer from clock drift, so they are no better than TIMEUUID in this regard for time series data.
To run a CQL file against a specific host:
cqlsh CassandraNEWhost -f mySchema.cdl

To export a single keyspace's schema:
cqlsh -e "DESC KEYSPACE user" > user_schema.cql
To export the entire database schema:
cqlsh -e "DESC SCHEMA" > db_schema.cql

To import a schema file on another machine:
cqlsh {destination_machine_ip} 9042 -u {username} -p {password} -f schema.cql
SELECT * FROM my_keyspace.users where id in (1,2,3,4)
The -e flag allows you to send a query to Cassandra from the command prompt, where you could redirect or even perform a grep/awk/whatever on your output.
$ bin/cqlsh -e'SELECT video_id,title FROM stackoverflow.videos' > output.txt
$ cat output.txt

 video_id                             | title
 2977b806-df76-4dd7-a57e-11d361e72ce1 |                 Star Wars
 ab696e1f-78c0-45e6-893f-430e88db7f46 | The Witches of Whitewater
 15e6bc0d-6195-4d8b-ad25-771966c780c8 |              Pulp Fiction

(3 rows)
Older versions of cqlsh don't have the -e flag. For older versions of cqlsh, you can put your command into a file, and use the -f flag.
$ echo "SELECT video_id,title FROM stackoverflow.videos;" > select.cql
$ bin/cqlsh -f select.cql > output.txt
  1. Use CAPTURE command to export the query result to a file.
cqlsh> CAPTURE
cqlsh> CAPTURE '/home/Desktop/user.csv';
cqlsh> select * from user;
Now capturing query output to '/home/Desktop/user.csv'.
Now, view the output of the query in /home/Desktop/user.csv
  2. Use DevCenter and execute a query. Right-click on the output and select "Copy All as CSV" to paste the output as CSV.
Drop table drops the table and all data. Truncate clears all data in the table, and by default creates a snapshot of the data (but not the schema). Efficiency-wise they're close, though truncate will create the snapshot. You can disable this by setting auto_snapshot to false in the cassandra.yaml config, but the setting is server-wide. If it's not too much trouble, I'd drop and recreate the table, but I've seen issues if you don't wait a while after the drop before recreating.
To remove all rows from a CQL Table, you can use the TRUNCATE command:
TRUNCATE keyspace_name.table_name;
Or if you are already using the keyspace that contains your target table:
TRUNCATE table_name;
Important to note: by default Cassandra creates a snapshot of the table just prior to TRUNCATE. Be sure to clean up old snapshots, or set auto_snapshot: false in your cassandra.yaml.
DROP MATERIALIZED VIEW cycling.cyclist_by_age;
nodetool cfstats mybeautifulkeyspace
select count(*) from mysimpletable;
select count(*) from mysimpletable limit 1000000;
The CQL count query doesn't count deleted rows.
Instead, the compaction process reconciles the data in multiple SSTables on disk. The row fragments from each SSTable are collated and columns with the same name reconciled using the process we've already seen. The result of the compaction is a single SSTable that contains the same "truth" as the input files, but may be considerably smaller due to reconciling overwrites and deletions.
So it is possible that a count based on SSTable contents includes these deleted rows until the compaction is done.
COPY airplanes
(name, mach, year, manufacturer)
 TO 'temp.csv'
cqlsh> COPY music.songs to 'songs-20140603.csv';
The COPY command, as I understand, is a good option. However, as it dumps all the data to .csv on disk and then loads it back, I can't help but wonder if there is a better way to do it in-engine.
cqlsh -k mykeyspace -e 'COPY fromTable(columnNames) TO STDOUT' | head -n -1 | 
cqlsh -k mykeyspace -e 'COPY toTable(columnNames) FROM STDIN'
It doesn't work with certain data types, though. I tried it on a table with a blob column and it crashed on a newline character.

On Cassandra 2.1.2 I had to modify the command to head -n -2, because the output of COPY includes two lines at the end that need to be trimmed.

nodetool info outputs slightly more detailed statistics for an individual node in the cluster, including uptime, load, key cache hit rate, and a total count of all exceptions. You can specify which node you'd like to inspect by using the --host argument with an IP address or hostname:
bin/nodetool --host <hostname> info

bin/nodetool cfstats demo
nodetool cfstats provides statistics on each keyspace and column family (akin to databases and database tables, respectively), including read latency, write latency, and total disk space used.

nodetool gcstats returns statistics on garbage collections, including the total number of collections and elapsed time (both the total and the max elapsed time). The counters are reset each time the command is issued.

nodetool tpstats provides usage statistics on Cassandra’s thread pool, including pending tasks as well as current and historical blocked tasks.
nodetool tablestats -H keyspace1.standard1
nodetool cfstats
nodetool status <keyspace>

Unrestricted clustering columns

The role of clustering columns is to cluster data within a partition. If you have the following table:
CREATE TABLE numberOfRequests (
    cluster text,
    date text,
    datacenter text,
    hour int,
    minute int,
    numberOfRequests int,
    PRIMARY KEY ((cluster, date), datacenter, hour, minute));
The data will be stored per partition in the following way:
{datacenter: US_WEST_COAST {hour: 0 {minute: 0 {numberOfRequests: 130}} {minute: 1 {numberOfRequests: 125}} … {minute: 59 {numberOfRequests: 97}}} {hour: 1 {minute: 0 …
You can see that in order to retrieve data in an efficient way without a secondary index, you need to know all the clustering key columns for your selection.
So, if you execute:
SELECT * FROM numberOfRequests
    WHERE cluster = 'cluster1'
    AND date = '2015-06-05'
    AND datacenter = 'US_WEST_COAST'
    AND hour = 14
    AND minute = 0;
Cassandra will find the data efficiently but if you execute:
SELECT * FROM numberOfRequests
    WHERE cluster = 'cluster1'
    AND date = '2015-06-05'
    AND hour = 14
    AND minute = 0;
Cassandra will reject the query as it has to scan the entire partition to find the requested data, which is inefficient.
In 2.2, the IN restriction can be used on any clustering column.

>, >=, <= and < restrictions

Single column slice restrictions are allowed only on the last clustering column being restricted.
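A rough way to see why restricting the full clustering prefix matters: rows within a partition are stored sorted by their clustering columns, so a fully restricted prefix can be located with a binary search, while an unrestricted prefix column forces a scan. A toy Python model (a simplified stand-in, not Cassandra's real storage format):

```python
import bisect

# Rows of one partition, kept sorted by the clustering columns
# (datacenter, hour, minute) -- a simplified stand-in for the storage.
rows = sorted(
    (dc, h, m)
    for dc in ("EU_WEST", "US_WEST_COAST")
    for h in range(24)
    for m in range(60)
)

# Full clustering prefix restricted: a binary search finds the row.
key = ("US_WEST_COAST", 14, 0)
i = bisect.bisect_left(rows, key)
assert rows[i] == key

# 'datacenter' unrestricted: matching rows are scattered through the
# sort order, so every row must be inspected -- a full partition scan.
matches = [r for r in rows if r[1] == 14 and r[2] == 0]
assert len(matches) == 2  # one per datacenter
```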



Minimize the Number of Writes

Writes in Cassandra aren’t free, but they’re awfully cheap. Cassandra is optimized for high write throughput, and almost all writes are equally efficient [1]. If you can perform extra writes to improve the efficiency of your read queries, it’s almost always a good tradeoff. Reads tend to be more expensive and are much more difficult to tune.

Minimize Data Duplication

Denormalization and duplication of data is a fact of life with Cassandra. Don’t be afraid of it. Disk space is generally the cheapest resource (compared to CPU, memory, disk IOPs, or network), and Cassandra is architected around that fact. In order to get the most efficient reads, you often need to duplicate data.

Rule 1: Spread Data Evenly Around the Cluster

You want every node in the cluster to have roughly the same amount of data. Cassandra makes this easy, but it’s not a given. Rows are spread around the cluster based on a hash of the partition key, which is the first element of the PRIMARY KEY. So, the key to spreading data evenly is this: pick a good primary key.

Rule 2: Minimize the Number of Partitions Read

Partitions are groups of rows that share the same partition key. When you issue a read query, you want to read rows from as few partitions as possible.
Why is this important? Each partition may reside on a different node. The coordinator will generally need to issue separate commands to separate nodes for each partition you request. This adds a lot of overhead and increases the variation in latency. Furthermore, even on a single node, it’s more expensive to read from multiple partitions than from a single one due to the way rows are stored.

Step 1: Determine What Queries to Support

Try to determine exactly what queries you need to support. This can include a lot of considerations that you may not think of at first. For example, you may need to think about:
  • Grouping by an attribute
  • Ordering by an attribute
  • Filtering based on some set of conditions
  • Enforcing uniqueness in the result set
  • etc …
Changes to just one of these query requirements will frequently warrant a data model change for maximum efficiency.

Step 2: Try to create a table where you can satisfy your query by reading (roughly) one partition

In practice, this generally means you will use roughly one table per query pattern. If you need to support multiple query patterns, you usually need more than one table.
To put this another way, each table should pre-build the “answer” to a high-level query that you need to support. If you need different types of answers, you usually need different tables. This is how you optimize for reads.
Spreads data evenly? Each user gets their own partition, so yes.
Minimal partitions read? We only have to read one partition, so yes.

CREATE TABLE groups (
    groupname text,
    username text,
    email text,
    age int,
    PRIMARY KEY (groupname, username)
);

 it doesn’t do so well with the first goal of evenly spreading data around the cluster. If we have thousands or millions of small groups with hundreds of users each, we’ll get a pretty even spread. But if there’s one group with millions of users in it, the entire burden will be shouldered by one node (or one set of replicas).
The basic technique is to add another column to the PRIMARY KEY to form a compound partition key. Here’s one example:
CREATE TABLE groups (
    groupname text,
    username text,
    email text,
    age int,
    hash_prefix int,
    PRIMARY KEY ((groupname, hash_prefix), username)
);
The new column, hash_prefix, holds a prefix of a hash of the username. For example, it could be the first byte of the hash modulo four. Together with groupname, these two columns form the compound partition key. Instead of a group residing on one partition, it’s now spread across four partitions. Our data is more evenly spread out, but we now have to read four times as many partitions. This is an example of the two goals conflicting. You need to find a good balance for your particular use case. If you do a lot of reads and groups don’t get too large, maybe changing the modulo value from four to two would be a good choice. On the other hand, if you do very few reads, but any given group can grow very large, changing from four to ten would be a better choice.
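The hash-prefix bucketing described above can be sketched in a few lines of Python. The md5-based derivation here is just one arbitrary choice; Cassandra only needs the prefix to be stable for a given username:

```python
import hashlib

def hash_prefix(username: str, modulo: int = 4) -> int:
    """First byte of a hash of the username, modulo `modulo`.

    A sketch of the bucketing described above; any stable derivation
    would work equally well.
    """
    return hashlib.md5(username.encode("utf-8")).digest()[0] % modulo

# The same username always lands in the same bucket...
assert hash_prefix("alice") == hash_prefix("alice")

# ...and reading the whole group now means querying each bucket:
buckets = {hash_prefix(u) for u in ("alice", "bob", "carol", "dave")}
# SELECT ... WHERE groupname = ? AND hash_prefix = ?   -- once per bucket
```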
We’re duplicating user info potentially many times, once for each group. You might be tempted to try a data model like this to reduce duplication.

CREATE TABLE users (
    id uuid PRIMARY KEY,
    username text,
    email text,
    age int
);

CREATE TABLE groups (
    groupname text,
    user_id uuid,
    PRIMARY KEY (groupname, user_id)
);
Obviously, this minimizes duplication. But how many partitions do we need to read? If a group has 1000 users, we need to read 1001 partitions. This is probably 100x more expensive to read than our first data model. If reads need to be efficient at all, this isn’t a good model. On the other hand, if reads are extremely infrequent, but updates to user info (say, the username) are extremely common, this data model might actually make sense. Make sure to take your read/update ratio into account when designing your schema.

CREATE TABLE group_join_dates (
    groupname text,
    joined timeuuid,
    username text,
    email text,
    age int,
    PRIMARY KEY (groupname, joined)
);

CREATE TABLE group_join_dates (
    groupname text,
    joined timeuuid,
    join_date text,
    username text,
    email text,
    age int,
    PRIMARY KEY ((groupname, join_date), joined)
);
We’re using a compound partition key again, but this time we’re using the join date. Each day, a new partition will start. When querying the X newest users, we will first query today’s partition, then yesterday’s, and so on, until we have X users. We may have to read multiple partitions before the limit is met.
To minimize the number of partitions you need to query, try to select a time range for splitting partitions that will typically let you query only one or two partitions. For example, if we usually need the ten newest users, and groups usually acquire three users per day, we should split by four-day ranges instead of a single day
I suggest using a timestamp truncated by some number of seconds. For example, to handle four-day ranges, you might use something like this:

# Truncate the current Unix timestamp to a four-day bucket (Python):
import time

now = int(time.time())
four_days = 4 * 24 * 60 * 60          # bucket width in seconds
shard_id = now - (now % four_days)    # start of the current four-day range

SASI is an index that enables full text search as well as faster multi-criteria search in Cassandra (introduced in Cassandra 3.4, but I recommend at least Cassandra 3.5 because of critical bugs being fixed).

It introduces a new idea: let the index file follow the life-cycle of the SSTable. This means that whenever an SSTable is created on disk, a corresponding SASI index file is also created. When are SSTables created?
  1. during normal flush
  2. during compaction
  3. during streaming operations (node joining or being decommissioned)
Delete-heavy workloads have a number of pretty serious issues when it comes to using a distributed database. Unfortunately, one of the most common delete-heavy workloads, and the most commonly desired use case for Cassandra, is to use it as a global queue.

In Cassandra, because it’s distributed we actually have to WRITE a marker called a ‘Tombstone’ that indicates the record is deleted.

The developers of the Cassandra project have you covered: there is a time period, defaulting to 10 days (the setting is called gc_grace_seconds), after which a tombstone and all records related to it are removed from the system, reclaiming disk space. But then you realize that based on your queue workflow, instead of 5 records you'll end up with millions and millions per day for your short-lived queue; your query times end up missing SLA, and you realize this won't work for your tiny cluster.

Let's say one machine loses its network card and writes start failing to it. This is no problem at all, as you've done your homework and set the replication factor to 3, so that 3 machines own the record. When you issue your delete, the 2 remaining machines happily accept it. In the meantime you find a network card, and you don't want to bother with decommissioning the node.

If you scan too many tombstones in a query (100k by default) in later Cassandra versions, it will cause your query to fail. This is a safeguard to prevent OOM and poor performance. It also implies you should be monitoring tombstone counts (either via the MBeans or Cassandra's warnings about tombstone thresholds).

Tombstones are treated specially by cassandra to guard against netsplits and prevent deleted data from resurrecting. Unfortunately it’s a leaky abstraction, and tombstones tend to rear their ugly head if you use Cassandra in particular ways.
The default grace period is 10 days for tombstones, so compaction won’t remove them until then. You can lower the grace period, but keep in mind repairs need to happen more frequently than the grace period to prevent data inconsistencies (as repairs ensure your deletes are replicated correctly).
Because Cassandra is a distributed system with immutable sstables, deletes are done differently compared to a relational database. Keep in mind that 1) writes can take time to reach all replicas (eventual consistency) and 2) cassandra has a sparse data model where a missing value for a column in a row in one sstable does not mean that the value is not present in another sstable.
Deletes are accomplished by writing tombstones or 'null's to the database. Like other writes, tombstones have timestamps which allow us to determine if the tombstone is current or stale by following standard last write wins semantics (LWW).
We also have the ability to establish data retention policies via TTL (Time to Live) expiration settings which can be configured at write time or at the table level.
Once a cell's TTL expires it is treated as a tombstone for all intents and purposes.
Cassandra does eventually clean up tombstones, but will not do so until the data fits certain criteria, giving the system ample time to ensure the tombstone makes its way to all replicas before it is removed. Otherwise we may have scenarios where deleted values become readable again because a tombstone only made it to a limited set of replicas and then got cleaned up. The lifetime of a tombstone is defined by gc_grace_seconds. After gc grace, the tombstone becomes available for deletion during compaction.
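The timing rule boils down to a one-line check. The sketch below is just that rule, not Cassandra's actual purge logic (which also considers which SSTables overlap the tombstone):

```python
# A tombstone becomes purgeable only once gc_grace_seconds have elapsed
# since the deletion, giving repair time to propagate the delete to all
# replicas before the marker disappears.
GC_GRACE_SECONDS = 10 * 24 * 60 * 60   # the 10-day default

def purgeable(deletion_ts: int, now: int,
              gc_grace: int = GC_GRACE_SECONDS) -> bool:
    return now >= deletion_ts + gc_grace

deleted_at = 1_000_000
assert not purgeable(deleted_at, deleted_at + 86_400)         # 1 day later: no
assert purgeable(deleted_at, deleted_at + GC_GRACE_SECONDS)   # 10 days: yes
```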

Imagine a workload that constantly inserts and deletes a set of partitions from Cassandra. Regular deletes would generate individual tombstones for each cell in the partition that is being deleted. Reading back these rows would be slow due to the number of sstables (usually disk operations) required to pull the latest value, and compaction would painstakingly clean up tombstones (after GC grace etc. etc.) One by one.
In the context of Cassandra, a tombstone is specific data stored alongside standard data. A delete does nothing more than insert a tombstone. When Cassandra reads the data it will merge all the shards of the requested rows from the memtable and the SSTables. It then applies a Last Write Wins (LWW) algorithm to choose what is the correct data, no matter if it is a standard value or a tombstone.
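The merge-then-LWW read path described here can be modelled in a few lines; this is a toy sketch, not Cassandra's real data structures:

```python
# Toy model of Cassandra's read path: merge the shards of a cell from
# the memtable and several SSTables, and let the last write win (LWW).
# A tombstone is just another write, marked here with TOMBSTONE.
TOMBSTONE = object()

def merge_cell(shards):
    """shards: iterable of (timestamp, value) pairs for one cell.
    Returns the live value, or None if the newest write is a tombstone."""
    ts, value = max(shards, key=lambda s: s[0])
    return None if value is TOMBSTONE else value

# An insert at t=1, an overwrite at t=5, a delete at t=9:
assert merge_cell([(1, "v1"), (5, "v2"), (9, TOMBSTONE)]) is None

# If the overwrite is newer than the delete, the value survives:
assert merge_cell([(1, "v1"), (9, "v2"), (5, TOMBSTONE)]) == "v2"
```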
To get a human-readable format of the SSTable we will transform it using the sstabledump tool.
Two partitions (3 rows, 2 sharing the same partition) are now stored on disk.
A column from a specific row is called a “cell” in the Cassandra storage engine.
 The partition, row and cell are still there except there is no liveness_info at the column level anymore. The deletion info has been updated accordingly too. This is it. This is a cell tombstone.
A row tombstone is a row with no liveness_info and no cells. Deletion time is present at the row level as expected
We now have a new special insert, which is not of type row but range_tombstone_bound instead, with a start and an end: from the clustering key 20160615 (excluded) to the end of the partition (no clustering specified). Those entries with the range_tombstone_bound type are nested in the apple partition as expected. So removing an entire range is quite efficient from a disk space perspective: we do not write one piece of information per cell, we just store the delete boundaries.
A tombstone for a partition is an inserted partition with a deletion_info and no rows
Multi-value data types (sets, lists and maps) are a powerful feature of Cassandra, aiding you in denormalisation while allowing you to still retrieve and set data at a very fine-grained level. 

For simple-type columns, Cassandra performs an update by simply writing a new value for the cell and the most recently written value wins when the data is read. However, when you overwrite a collection Cassandra can’t simply write the new elements because all the existing elements in the map have their own individual cells and would still be returned alongside the new elements whenever a read is performed on the map.

Using sstable2json to analyse the data, as expected we have one key, a; however, it has two locations entries, despite the fact we only did one write.

This is because in Cassandra, overwrites, updates, and inserts, are really all just the same thing. The insert against the map will do the same thing whether the key already exists or not.
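A toy model of that overwrite behaviour: the collection-level (range) tombstone is written just below the new write's timestamp, shadowing all older cells while leaving the new ones readable. This is a sketch of the idea, not the actual storage format:

```python
# Each map entry is its own cell. SET locations = {...} cannot just add
# new cells, or stale entries from older writes would still be readable.

def read_map(cells, deletion_ts):
    """cells: {key: (timestamp, value)}. Entries written at or before the
    collection-level tombstone are shadowed."""
    return {k: v for k, (ts, v) in cells.items() if ts > deletion_ts}

cells = {"home": (1, "Sydney"), "work": (1, "Melbourne")}

# Overwrite at t=5: a range tombstone just below the write's timestamp
# (t=4), plus the new cells at t=5.
deletion_ts = 4
cells["home"] = (5, "Brisbane")

# The old "work" entry is shadowed; only the new map contents survive.
assert read_map(cells, deletion_ts) == {"home": "Brisbane"}
```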


Cassandra Query Patterns: Not using the “in” query for multiple partitions.
In practical terms this means you're waiting on this single coordinator node to give you a response; it's keeping all those queries and their responses in the heap, and if one of those queries fails, or the coordinator fails, you have to retry the whole thing.
With separate queries you get no single point of failure, faster reads, less pressure on the coordinator node, and better performance semantics when you have nodes failing. It truly embraces the distributed nature of Cassandra.
The "in" keyword has its place, such as when querying INSIDE of a partition, but by and large it's something I wish weren't doable across partitions. I've fixed a good dozen performance problems with it so far, and I've yet to see it be faster than separate queries plus async.
Most things that don't work as well in a distributed database as people think they should (bulk loading via batch, "in" queries, and "rollbacks") are leftover vestiges of single-machine thinking.
Unlogged batches require the coordinator to do all the work of managing these inserts, and will make a single node do more work. Worse, if the partition keys are owned by other nodes then the coordinator node has an extra network hop to manage as well. The data is not delivered via the most efficient path.
Logged batches add a fair amount of work to the coordinator. However it has an important role in maintaining consistency between tables. When a batch is sent out to a coordinator node, two other nodes are sent batch logs, so that if that coordinator fails then the batch will be retried by both nodes.
This obviously puts a fair amount of work on the coordinator node and the cluster as a whole. Therefore the primary use case of a logged batch is when you need to keep tables in sync with one another, NOT performance.
A compound primary key consists of the partition key and one or more additional columns that determine clustering. The partition key determines which node stores the data. It is responsible for data distribution across the nodes. The additional columns determine per-partition clustering. Clustering is a storage engine process that sorts data within the partition.

Cassandra stores an entire row of data on a node by partition key. If you have too much data in a partition and want to spread the data over multiple nodes, use a composite partition key

The first column declared in the PRIMARY KEY definition becomes the partition key; in the case of a composite partition key, multiple columns enclosed in an extra set of parentheses together form it.
A composite partition key is a partition key consisting of multiple columns. You use an extra set of parentheses to enclose the columns that make up the composite partition key. The columns within the primary key definition but outside the nested parentheses are clustering columns. These columns form logical sets inside a partition to facilitate retrieval.
  block_id uuid,
  breed text,
  color text,
  short_hair boolean,
  PRIMARY KEY ((block_id, breed), color, short_hair)
For example, the composite partition key consists of block_id and breed. The clustering columns, color and short_hair, determine the clustering order of the data. Generally, Cassandra will store columns having the same block_id but a different breed on different nodes, and columns having the same block_id and breed on the same node
The first element in our PRIMARY KEY is what we call a partition key. The partition key has a special use in Apache Cassandra beyond showing the uniqueness of the record in the database. The other purpose, one that is very critical in distributed systems, is determining data locality.
When data is inserted into the cluster, the first step is to apply a hash function to the partition key. The output is used to determine what node (and replicas) will get the data. Apache Cassandra uses Murmur3, which takes an arbitrary input and creates a consistent token value. That token value falls inside the range of tokens owned by a single node.
In simpler terms, a partition key will always belong to one node and that partition’s data will always be found on that node.
Why is that important? If there wasn’t an absolute location of a partition’s data, then it would require searching every node in the cluster for your data.
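The token-to-node mapping can be sketched with a sorted ring and a binary search. Real Cassandra hashes the partition key with Murmur3; the stdlib md5 stands in for it here, and the node names and token values are invented for illustration:

```python
import bisect
import hashlib

# Stand-in hash: a signed 64-bit token derived from the partition key.
def token(partition_key: str) -> int:
    digest = hashlib.md5(partition_key.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big", signed=True)

# Toy token ring: each node owns the range up to (and including) its
# token, wrapping around past the largest token.
ring = [(-6_000_000_000_000_000_000, "node1"),
        (0, "node2"),
        (6_000_000_000_000_000_000, "node3")]
tokens = [t for t, _ in ring]

def owner(partition_key: str) -> str:
    i = bisect.bisect_left(tokens, token(partition_key)) % len(ring)
    return ring[i][1]

# The same key always hashes to the same token, hence the same node:
assert owner("user-42") == owner("user-42")
```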
CREATE TABLE user_videos (
   userid uuid,
   added_date timestamp,
   videoid uuid,
   name text,
   preview_image_location text,
   PRIMARY KEY (userid, added_date, videoid)
);
  • Item one is the partition key
  • Item two is the first clustering column. added_date is a timestamp, so the sort order is chronological, ascending.
  • Item three is the second clustering column. Since videoid is a UUID, we are including it simply to show that it is part of a unique record.
After inserting data, you should expect your SELECT to return data for a single partition in ascending order of added_date.
Since the clustering columns specify the order within a single partition, it is helpful to be able to control the direction of the sorting. We can accomplish this at run time by adding an ORDER BY clause to our SELECT.
What if we want to control the sort order as a default of the data model? We can specify that at table creation time using the CLUSTERING ORDER BY clause.
The secondary index for each indexed column is stored in a hidden, local (not distributed across the cluster) column family, whose partition key is the indexed column, and the value is the list of partition keys in the main index with this column value.
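A rough model of why a query on a secondary index must fan out: each node indexes only its own rows, so no single node can answer for the whole cluster. Node names and data here are invented for illustration:

```python
# Each node keeps, for its own data only, a mapping of
# indexed-column value -> partition keys (the hidden local index).
nodes = {
    "node1": {1: {"city": "Paris"}, 2: {"city": "Oslo"}},
    "node2": {3: {"city": "Paris"}},
}

def local_index(rows, column):
    index = {}
    for pk, row in rows.items():
        index.setdefault(row[column], []).append(pk)
    return index

# A query on the indexed value must consult EVERY node's local index:
hits = []
for rows in nodes.values():
    hits += local_index(rows, "city").get("Paris", [])
assert sorted(hits) == [1, 3]
```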

SASI (acronym of "SSTable-Attached Secondary Indexing") is a reimplementation of the classic Cassandra secondary indexing with one main goal in mind: efficiently support more sophisticated search queries such as: 1. AND or OR combinations of queries. 2. Wildcard search in string values. 3. Range queries. 4. Lucene-inspired word search in string values (including word breaking, capitalization normalization, stemming, etc., as determined by a user-given "Analyzer").

SASI implements these using a new on-disk format based on B+ trees, and does not reuse regular Cassandra column families or SSTables as the classic secondary indexing method did.
SASI attaches to each SSTable its own immutable index file (hence the name of this method), and also attaches an index to each memtable. During compaction, the indexes of the files being compacted together are also compacted to create one new index.

In local indexing, above, each node only holds an index of the data it stores. Conversely, in distributed indexing we distribute the whole index over the cluster.

Often a distributed index will only list partition keys matching a column value (although Lucene, for example, does support general "payloads" in the inverted index), but Cassandra's implementation, Materialized Views, basically builds a new table (a regular distributed Cassandra table) with the indexed column as a partition key, and the user's choice of data as values.
Introduced in Cassandra 0.7: "Indexes on column values are called "secondary indexes," to distinguish them from the index on the row key that all ColumnFamilies have. Secondary indexes allow querying by value and can be built in the background automatically without blocking reads or writes."

The implementation of secondary indexes is different from partition (main) keys: a secondary index is always located on the same node as the data. This means that any query on a secondary key will ALWAYS query ALL nodes! This makes secondary indexes useful only in very specific use cases.

    Known issues: Secondary indexes can go out of sync, and the only way to fix them is a rebuild [3][4]
    TODO http://www.wentnet.com/blog/?p=77

    SASIIndex, or "SASI" for short, is an implementation of Cassandra's Index interface that can be used as an alternative to the existing implementations. SASI's indexing and querying improves on existing implementations by tailoring it specifically to Cassandra's needs. SASI has superior performance in cases where queries would previously require filtering. In achieving this performance, SASI aims to be significantly less resource intensive than existing implementations, in memory, disk, and CPU usage. In addition, SASI supports prefix and contains queries on strings (similar to SQL's LIKE "foo*" or LIKE "*foo*").

    cqlsh:demo> CREATE CUSTOM INDEX ON sasi (first_name) USING 'org.apache.cassandra.index.sasi.SASIIndex'
            ... WITH OPTIONS = {
            ... 'analyzer_class':
            ...   'org.apache.cassandra.index.sasi.analyzer.NonTokenizingAnalyzer',
            ... 'case_sensitive': 'false'
            ... };
    cqlsh:demo> CREATE CUSTOM INDEX ON sasi (last_name) USING 'org.apache.cassandra.index.sasi.SASIIndex'
            ... WITH OPTIONS = {'mode': 'CONTAINS'};
    cqlsh:demo> CREATE CUSTOM INDEX ON sasi (age) USING 'org.apache.cassandra.index.sasi.SASIIndex';
    cqlsh:demo> CREATE CUSTOM INDEX ON sasi (created_at) USING 'org.apache.cassandra.index.sasi.SASIIndex'
            ...  WITH OPTIONS = {'mode': 'SPARSE'};
    The indexes created have some options specified that customize their behaviour and potentially performance. The index on first_name is case-insensitive. The analyzers are discussed more in a subsequent example. The NonTokenizingAnalyzer performs no analysis on the text. Each index has a mode: PREFIX, CONTAINS, or SPARSE, the first being the default. The last_name index is created with the mode CONTAINS, which matches terms on suffixes and substrings instead of prefixes only.
    The created_at column is created with its mode set to SPARSE, which is meant to improve performance of querying large, dense number ranges like timestamps for data inserted every millisecond. 
    SASI supports queries with multiple predicates, however, due to the nature of the default indexing implementation, CQL requires the user to specify ALLOW FILTERING to opt-in to the potential performance pitfalls of such a query. With SASI, while the requirement to include ALLOW FILTERING remains, to reduce modifications to the grammar, the performance pitfalls do not exist because filtering is not performed.
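A sketch of how the indexes above can be queried (column names follow the CREATE INDEX statements; the demo data is assumed):

```sql
-- prefix match served by the PREFIX-mode index on first_name
-- (case-insensitive, thanks to the analyzer options)
SELECT * FROM sasi WHERE first_name LIKE 'pav%';

-- substring match served by the CONTAINS-mode index on last_name
SELECT * FROM sasi WHERE last_name LIKE '%ath%';

-- numeric range served by the index on age
SELECT * FROM sasi WHERE age > 21 AND age <= 30;
```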

    SASI takes advantage of Cassandra's write-once, immutable, ordered data model to build indexes along with the flushing of the memtable to disk -- this is the origin of the name "SSTable Attached Secondary Index".
    The SASI index data structures are built in memory as the SSTable is being written and they are flushed to disk before the writing of the SSTable completes. The writing of each index file only requires sequential writes to disk. In some cases, partial flushes are performed, and later stitched back together, to reduce memory usage. These data structures are optimized for this use case.
    Taking advantage of Cassandra's ordered data model, at query time, candidate indexes are narrowed down for searching, minimizing the amount of data that has to be read.
    In Cassandra, data is divided into partitions, which can be found by a partition key. Sometimes, the application needs to find a partition - or partitions - by the value of another column. Doing this efficiently, without scanning all the partitions, requires indexing. Materialized Views, which we explain here, is one of the three indexing options supported.

    Cassandra's two other indexing options ("Secondary Indexing" and "SASI") are "local indexes", in the sense that each Cassandra node indexes the data it holds. The biggest problem with local indexes is scalability: When a search is expected to yield just one (or few) partitions, we need to send it to all the nodes because we cannot know which one might hold the data (the location of the data is determined by the partition key, which we don't know, not the column value we are searching).
    So, "Materialized Views" is not a local index, but rather a global index: There is one big index which is distributed to the different nodes using the normal Cassandra distribution scheme. 

    Materialized Views builds a new table (a regular distributed Cassandra table) with the indexed column as a partition key, and a user-chosen subset of columns as values.

    Each of the materialized views is a separate table. The view tables are distributed across the cluster in the normal Cassandra way. The view tables are created in the same keyspace as the base table, and in particular have the same replication factor

    For example, it is a known faux-pas to have a partition key with low cardinality - i.e., very few different keys, or where a significant percentage of the rows have the same value for the partition key. It is usually fine to have a non-key column with low cardinality, e.g., consider data where each row is a person, and there is a column for the person's gender - which only has two different values ('male' or 'female'). However, trying to use this low-cardinality column as a partition key in a materialized view is not fine: if one tries to create a materialized view with the gender column as the partition key, all the data will be put in just two partitions (one partition listing all the males, one partition listing all the females), which would be extremely inefficient, and the storage and load are unbalanced (all the data is on a small set of nodes and the rest have no data from this table).
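The gender anti-example above could be sketched like this (table and view names hypothetical):

```sql
-- one row per person; gender has only a handful of distinct values
CREATE TABLE people (
    id uuid PRIMARY KEY,
    name text,
    gender text
);

-- anti-pattern: every row lands in one of just two huge partitions,
-- concentrating all load on the few replicas that own them
CREATE MATERIALIZED VIEW people_by_gender AS
    SELECT * FROM people
    WHERE gender IS NOT NULL AND id IS NOT NULL
    PRIMARY KEY (gender, id);
```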

    • Two copies of the data using different partitioning, placed on different replicas
    • Automated, server-side denormalization of data
    • Native Cassandra read performance
    • Write penalty, but acceptable performance

    • Best practice: Denormalization
    • Start by understanding the queries you need
    • Create a table for each query

    Why is Denormalization Hard?
    • Updates to existing rows require cleanup, read-before-write

    • All Songs for a given playlist
    • Track Users who like the same Artist
    • Find most recently played song

    When a base table is repaired, the data will also be inserted into the view
    • TTL’d base data will remain TTL’d in view

    An update from a base table is asynchronously applied to the view, so it is possible there will be a delay
    • A MV on a low-cardinality table can cause hotspots in the ring, overloading some nodes
    • The write path is one of Cassandra’s key strengths: for each write request one sequential disk write plus one in-memory write occur, both of which are extremely fast.
    • During a write operation, Cassandra never reads before writing (with the exception of Counters and Materialized Views), never rewrites data, never deletes data and never performs random I/O.
    Secondary indexes are suited for low cardinality data. Queries of high cardinality columns on secondary indexes require Cassandra to access all nodes in a cluster, causing high read latency.
    Materialized views are suited for high cardinality data. The data in a materialized view is arranged serially based on the view's primary key. Materialized views cause hotspots when low cardinality data is inserted.
    In Cassandra 3.10 and later, a materialized view can be created using a filtering statement that includes a restriction on a non-primary key column.

    Materialized views allow fast lookup of the data using the normal Cassandra read path. However, materialized views do not have the same write performance as normal table writes. Cassandra performs an additional read-before-write to update each materialized view. To complete an update, Cassandra performs a data consistency check on each replica. A write to the source table therefore incurs additional latency. The performance of deletes on the source table also suffers. If a delete on the source table affects two or more contiguous rows, this delete is tagged with one tombstone. But these same rows may not be contiguous in materialized views derived from the source table. If they are not, Cassandra creates multiple tombstones in the materialized views.
    Cassandra can only write data directly to source tables, not to materialized views. Cassandra updates a materialized view asynchronously after inserting data into the source table, so the update of materialized view is delayed. Cassandra performs a read repair to a materialized view only after updating the source table.
    • All changes to the base table will be eventually reflected in the view tables unless there is a total data loss in the base table
    • All updates to the view happen asynchronously unless the corresponding view replica is on the same node.  We must do this to ensure availability is not compromised.  It’s easy to imagine a worst case scenario of 10 Materialized Views for which each update to the base table requires writing to 10 separate nodes. Under normal operation views will see the data quickly and there are new metrics to track it (ViewWriteMetrics).
    • There is no read repair between the views and the base table.  Meaning a read repair on the view will only correct that view’s data not the base table’s data.  If you are reading from the base table though, read repair will send updates to the base and the view.
    • Mutations on a base table partition must happen sequentially per replica if the mutation touches a column in a view (this will improve after ticket CASSANDRA-10307)
    One of the main principles that Cassandra was built on is that disk space is very cheap resource; minimizing disk seeks at the cost of higher space consumption is a good tradeoff. 


    A view must have a primary key and that primary key must conform to the following restrictions:
    • it must contain all the primary key columns of the base table. This ensures that every row of the view corresponds to exactly one row of the base table.
    • it can only contain a single column that is not a primary key column in the base table.

    At first glance, it is obvious that a materialized view needs a base table. A materialized view, conceptually, is just another way to present the data of the base table, with a different primary key for a different access pattern.
    The alert reader will notice the clause WHERE column1 IS NOT NULL AND column2 IS NOT NULL …. This clause guarantees that all columns that will be used as the primary key of the view are not null, of course.
    Some notes on the constraints that apply to materialized views creation:
    • The AS SELECT column1, column2, … clause lets you pick which columns of the base table you want to duplicate into the view. For now, you should pick at least all columns of the base table that are part of its primary key
    • The WHERE column1 IS NOT NULL AND column2 IS NOT NULL … clause guarantees that the primary key of the view has no null column
    • The PRIMARY KEY(column1, column2, …) clause should contain all primary key columns of the base table, plus at most one column that is NOT part of the base table’s primary key. The order of the columns in the primary key does not matter, which allows us to access data by different patterns
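The constraints above can be sketched with a hypothetical example (table and view names are made up for illustration):

```sql
-- base table with a compound primary key (user, game)
CREATE TABLE scores (
    user text,
    game text,
    score int,
    PRIMARY KEY (user, game)
);

-- valid view: contains all base PK columns (user, game) plus
-- exactly one non-PK column (score), with every view PK column
-- guarded by an IS NOT NULL clause
CREATE MATERIALIZED VIEW high_scores AS
    SELECT user, game, score FROM scores
    WHERE user IS NOT NULL AND game IS NOT NULL AND score IS NOT NULL
    PRIMARY KEY (score, user, game);
```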
    If the system property cassandra.mv_enable_coordinator_batchlog is set, the coordinator will create a batchlog for the operation
    the coordinator sends the mutation to all replicas and will wait for as many acknowledgement(s) as requested by Consistency Level
    each replica is acquiring a local lock on the partition to be inserted/updated/deleted in the base table
    each replica is performing a local read on the partition of the base table

    each replica creates a local batchlog with the following statements:
    • DELETE FROM user_by_country WHERE country = 'old_value'
    • INSERT INTO user_by_country(country, id, …) VALUES('FR', 1, …)

    Generally, remember one important thing: Cassandra has an eventual consistency model. That means: if it's really important to have perfect, or nearly perfect, consistency, don't use Cassandra. Use another solution, e.g. SQL with sharding. Cassandra is optimized for writes, and you will only be happy when you work with Cassandra's features rather than against them.
    Some performance tips: if you need better consistency, use QUORUM, never ALL. And, generally, write your queries standalone; only sometimes is a batch useful. Don't execute queries with ALLOW FILTERING. Don't use token ranges or the IN operator on partition keys :)
    • denormalize immutable data
    • for mutable data, either:
      • accept to normalize them and pay the price of extra-reads but don’t care about mutation
      • denormalize but pay the price for read-before-write and manual handling of updates
    • since denormalization is required most of the time for different read patterns, you can rely on a 3rd party indexing solution (like Datastax Enterprise Search or Stratio Lucene-based secondary index or more recently the SASI secondary index) for the job

    With Cassandra, an index is a poor choice because indexes are local to each node.  That means that if we created this index:
    CREATE INDEX users_by_name ON users (username);

    … a query that accessed it would need to fan out to each node in the cluster, and collect the results together.  Put another way, even though the username field is unique, the coordinator doesn’t know which node to find the requested user on, because the data is partitioned by id and not by name. Thus, each node contains a mixture of usernames across the entire value range (represented as a-z in the diagram)
    This causes index performance to scale poorly with cluster size: as the cluster grows, the overhead of coordinating the scatter/gather starts to dominate query performance.
    Thus, for performance-critical queries the recommended approach has been to denormalize into another table, as Tyler outlined:
    CREATE TABLE users_by_name (
        username text PRIMARY KEY,
        id uuid
    );
    Now we can look up users with a partitioned primary key lookup against a single node, giving us performance identical to primary key queries against the base table itself, but these tables must be kept in sync with the users table by application code.
    Materialized views give you the performance benefits of denormalization, but are automatically updated by Cassandra whenever the base table is:
    CREATE MATERIALIZED VIEW users_by_name AS
        SELECT * FROM users
        WHERE username IS NOT NULL
        PRIMARY KEY (username, id);
    Now the view will be repartitioned by username, and just as with manual denormalization, lookups by username can be served by a single node.
    Indexes can still be useful when pushing analytical predicates down to the data nodes, since analytical queries tend to touch all or most nodes in the cluster anyway, making the primary advantage of materialized views irrelevant.  
    Indexes are also useful for full text search–another query type that often needs to touch many nodes–now that the new SASI indexes have been released.

    Recall that Cassandra avoids reading existing values on UPDATE.  New values are appended to a commitlog and ultimately flushed to a new data file on disk, but old values are purged in bulk during compaction.

    Materialized views change this equation.  When an MV is added to a table, Cassandra is forced to read the existing value as part of the UPDATE.  Suppose user jbellis wants to change his username to jellis:

    UPDATE users
    SET username = 'jellis'
    WHERE id = 'fcc1c301-9117-49d8-88f8-9df0cdeb4130';

    Cassandra needs to fetch the existing row identified by fcc1c301-9117-49d8-88f8-9df0cdeb4130 to see that the current username is jbellis, and remove the jbellis materialized view entry.

    (Even for local indexes, Cassandra does not need to read-before-write.  The difference is that MV denormalizes the entire row and not just the primary key, which makes reads more performant at the expense of needing to pay the entire consistency price at write time.)

    Materialized views also introduce a per-replica overhead of tracking which MV updates have been applied.

    Added together, here’s the performance impact we see adding materialized views to a table.  As a rough rule of thumb, we lose about 10% performance per MV

    • Reading from a normal table or MV has identical performance.
    • Each MV will cost you about 10% performance at write time.
    • For simple primary keys (tables with one row per partition), MV will be about twice as fast as manually denormalizing the same data.
    • For compound primary keys, MV are still twice as fast for updates but manual denormalization can better optimize inserts.  The crossover point where manual becomes faster is a few hundred rows per partition.  CASSANDRA-9779 is open to address this limitation

    You can also get some estimates from nodetool cfhistograms if you don't need an exact count (these values are estimates).

    However, there is a default limit of 10,000 applied to this statement which will truncate the result for larger column families. The limit can be increased for larger column families:
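For example (the table name here is hypothetical), a higher limit can be given explicitly:

```sql
SELECT COUNT(*) FROM my_table LIMIT 1000000;
```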

    DESCRIBE keyspaces;

    1. use <keyspace_name>;
    2. describe tables;
    DROP MATERIALIZED VIEW cycling.cyclist_by_age;

     InvalidRequest: Error from server: code=2200 [Invalid query] message="Cannot drop table when materialized views still depend on it (vms.{review_by_target})"

    SELECT view_name FROM system_schema.views;

    In Cassandra there is no difference between update and insert, but in either case you need to provide the complete primary key for Cassandra to find the specific row.
    You need to use either of the below to update.
    update start_stop set end = 'z' where id = '123' and start='w';
    insert into start_stop (id, start, end) values ('123', 'w', 'z');
    There is a subtle difference. Inserted records via INSERT remain if you set all non-key fields to null. Records inserted via UPDATE go away if you set all non-key fields to null.
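A sketch of that subtle difference (hypothetical `kv` table; the row marker written by INSERT is what keeps the first row alive):

```sql
CREATE TABLE kv (id int PRIMARY KEY, val text);

-- row created via INSERT: survives val being nulled out,
-- because INSERT also writes a primary-key row marker
INSERT INTO kv (id, val) VALUES (1, 'a');
UPDATE kv SET val = null WHERE id = 1;
SELECT * FROM kv WHERE id = 1;   -- row still returned, val is null

-- row created via UPDATE: disappears once all non-key columns
-- are null, because no row marker was ever written
UPDATE kv SET val = 'b' WHERE id = 2;
UPDATE kv SET val = null WHERE id = 2;
SELECT * FROM kv WHERE id = 2;   -- (0 rows)
```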
    Yes, for Cassandra UPDATE is synonymous with INSERT, as explained in the CQL documentation where it says the following about UPDATE:
    Note that unlike in SQL, UPDATE does not check the prior existence of the row: the row is created if none existed before, and updated otherwise. Furthermore, there is no means to know which of creation or update happened. In fact, the semantics of INSERT and UPDATE are identical.
    For the semantics to be different, Cassandra would need to do a read to know if the row already exists. Cassandra is write optimized, so you can always assume it doesn't do a read before write on any write operation. The only exception is counters (unless replicate_on_write = false), in which case replication on increment involves a read.
    When a query that returns a range of rows (or columns) is issued to Cassandra, it has to scan the table to collect the result set (this is called a slice). Now, deleted data is stored in the same manner as regular data, except that it's marked as tombstoned until compacted away. But the table reader has to scan through it nevertheless. So if you have tons of tombstones lying around, you will have an arbitrarily large amount of work to do to satisfy your ostensibly limited slice.
    A concrete example: let's say you have two rows with clustering keys 1 and 3, and a hundred thousand dead rows with clustering key 2 that are located in between rows 1 and 3 in the table. Now when you issue a SELECT query where the key is to be >= 1 and < 3, you'll have to scan 100002 rows, instead of the expected two.
    To make it worse, Cassandra doesn't just scan through these rows, but also has to accumulate them in memory while it prepares the response. This can cause an out-of-memory error on the node if things go too far out, and if multiple nodes are servicing the request, it may even cause a multiple failure bringing down the whole cluster. To prevent this from happening, the service aborts the query if it detects a dangerous number of tombstones. You're free to crank this up, but it's risky, if your Cassandra heap is close to running out during these spikes.
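The concrete example above can be modeled in a few lines of Python (a toy model of slice scanning, not Cassandra's actual read path):

```python
# Toy model: rows stored in clustering order; tombstoned rows must
# still be scanned (and held in memory) while building a slice result.
def slice_scan(rows, lo, hi):
    """rows: list of (clustering_key, is_tombstone) pairs.
    Returns (live keys returned, total rows scanned) for lo <= key < hi."""
    returned, scanned = [], 0
    for key, dead in rows:
        if lo <= key < hi:
            scanned += 1
            if not dead:
                returned.append(key)
    return returned, scanned

# two live rows (keys 1 and 3) with 100,000 tombstones between them
rows = [(1, False)] + [(2, True)] * 100_000 + [(3, False)]
live, scanned = slice_scan(rows, 1, 4)
# live == [1, 3], but scanned == 100_002: the slice pays for every tombstone
```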
    No. Regardless of data type used, Cassandra stores all data on disk (including primary key values) as hex byte arrays. In terms of performance, the datatype of the primary key really doesn't matter.
    The only case where it would matter, is in token/node distribution. This is because the generated token for "12345" as text will be different from the token generated for 12345 as a bigint:
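A minimal Python sketch of why: text is serialized as UTF-8 bytes and bigint as an 8-byte big-endian integer, and it is these byte sequences that the partitioner hashes into tokens.

```python
import struct

# "12345" as text: five UTF-8 bytes
text_bytes = "12345".encode("utf-8")
# 12345 as bigint: eight big-endian bytes
bigint_bytes = struct.pack(">q", 12345)

# different byte sequences hash to different tokens,
# so the rows land at different positions on the ring
assert text_bytes != bigint_bytes
```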
    1. No. As you can see in the CQL data types doc, both text and varchar are UTF-8 strings.
    2. No. I answered a similar question here: Are there any performance penalties when using a TEXT as a Primary Key?
      Basically, all data is stored on-disk as a hex byte array, so it really doesn't matter.
    -e "cql_statement", --execute="cql_statement"
        Execute the CQL statement and exit. To direct the command output to a file, see saving CQL output.
    --connect-timeout="timeout"
        Connection timeout in seconds; default: 5.
    The name is misleading, but SocketOptions.getReadTimeoutMillis() applies to all requests from the driver to cassandra. You can think of it as a client-level timeout. If a response hasn't been returned by a cassandra node in that period of time an OperationTimeoutException will be raised and another node will be tried. Refer to the javadoc link above for more nuanced information about when the exception is raised to the client. Generally, you will want this timeout to be greater than your timeouts in cassandra.yaml, which is why 12 seconds is the default.
    If you want to effectively manage timeouts at the client level, you can control this on a per query basis by using executeAsync along with a timed get on the ResultSetFuture to give up on the request after a period of time, i.e.:
    ResultSet result = session.executeAsync("your query").get(300, TimeUnit.MILLISECONDS);
    This will throw a TimeoutException if the request hasn't been completed in 300 ms.
    I agree with Jonathan, I don't see how we could make that efficient without having to read all the secondary key tombstones each time you read the row, which doesn't sound fun.
    But as an aside, I'll note that another option for this is to use a secondary index. Now I know it's not read-free, but provided you do provide the partition key in the query, this will not be horribly inefficient either. And you'll exchange slightly slower writes for no hit whatsoever on reads, which I would suspect is a better trade-off more often than not for that kind of operation.
    when I do the select statement like the delete statement, it gives me (0 rows).
    Which (as LordKain indicated) means that if you have nothing to SELECT then you won't have anything to DELETE, either.
    I can use a select statement if I create index on a column and do select * from this column with allow filtering
    DO NOT DO THIS. In several query-related questions that I have answered here lately, there seems to be many out there who think that queries with ALLOW FILTERING are a good idea. These same users are typically back in a week or so, wondering why their ALLOW FILTERING queries suddenly are timing-out with more data. Asking for ALLOW FILTERING is Cassandra's way of telling you that you are trying something which you probably should not be.
    Additionally, secondary indexes are NOT your friend. They were created for convenience, not for performance. Some have even identified their use as an anti-pattern.
    Cassandra was designed to be queried by specific keys in a specific order. If your table does not suit your query, then you need to build an additional table that does. This will allow you to perform your queries without ALLOW FILTERING or secondary indexes.
