When a CockroachDB node receives a SQL query, this is approximately what happens:
The pgwire module handles communication with the client application and receives the query from the client. The SQL text is parsed into an Abstract Syntax Tree (AST). The AST is then further analyzed and transformed into a logical query plan, which is a tree of relational operators such as filter, render (project) and join. Incidentally, the logical plan tree is the data reported by the EXPLAIN statement.
The logical plan is then handed up to a back-end layer in charge of executing the query and producing result rows to be sent back to the client.
There are two such back-ends in CockroachDB: a local execution engine and a distributed execution engine.
Local query processing
The local execution engine executes SQL statements directly on the node that a client app is connected to. It processes queries mostly locally, on one node: any data it requires that lives on other nodes in the cluster is read there and copied onto the processing node, which builds the query results.
The architecture of CockroachDB’s local engine approximately follows the Volcano model, described by Goetz Graefe in 1993. From a software architect’s perspective, each node in the logical query plan acts like a stateful iterator (e.g. what Python generators do): iterating over the root of the tree produces all the result rows, and at each node of the plan tree, one iteration consumes zero or more iterations of the nodes further down the tree. The leaves of the tree are table or index reader nodes, which issue KV lookup operations towards CockroachDB’s distributed storage layer.
Example logical plan for:
SELECT cust_id, address FROM customer WHERE name LIKE 'Comp%' AND state = 'CA'
Assuming the primary key is cust_id and there is an index on customer(name).
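The iterator structure described above can be sketched with Python generators. This is a minimal illustration and not CockroachDB's actual code: the `CUSTOMER` list, the function names and the sample rows are all hypothetical, and the sketch scans the whole table instead of using the index on customer(name).

```python
from typing import Callable, Dict, Iterable, Iterator, List

Row = Dict[str, str]

# Hypothetical in-memory stand-in for the customer table.
CUSTOMER: List[Row] = [
    {"cust_id": "1", "name": "Compuware", "state": "CA", "address": "1 Main St"},
    {"cust_id": "2", "name": "Acme", "state": "CA", "address": "2 Oak Ave"},
    {"cust_id": "3", "name": "CompTech", "state": "NY", "address": "3 Pine Rd"},
    {"cust_id": "4", "name": "Compass", "state": "CA", "address": "4 Elm St"},
]


def scan(table: Iterable[Row]) -> Iterator[Row]:
    """Leaf node: yields rows one at a time (stands in for KV lookups)."""
    for row in table:
        yield row


def filter_rows(source: Iterator[Row], predicate: Callable[[Row], bool]) -> Iterator[Row]:
    """Filter node: one output row may consume several source rows."""
    for row in source:
        if predicate(row):
            yield row


def render(source: Iterator[Row], columns: List[str]) -> Iterator[Row]:
    """Render (project) node: keeps only the requested columns."""
    for row in source:
        yield {c: row[c] for c in columns}


def build_plan() -> Iterator[Row]:
    """Builds the tree: render <- filter <- scan."""
    leaf = scan(CUSTOMER)
    filtered = filter_rows(
        leaf, lambda r: r["name"].startswith("Comp") and r["state"] == "CA"
    )
    return render(filtered, ["cust_id", "address"])


# Iterating over the root pulls rows through the whole tree, one at a time.
results = list(build_plan())
```

Nothing is computed until the root is iterated, which mirrors how each call to the root's "next" method pulls rows up from the leaves.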
From a code perspective, each relational operator is implemented as an iterator’s “next” method. While a query runs, the tree of iterators is processed sequentially: each node’s “next” method waits until its source nodes have completed their own “next” method calls. Seen from the outside, the query execution logic processes data and makes decisions (e.g. keep/remove a row, compute derived results) row by row. The processing is essentially sequential.
The main characteristic of this engine is that it is relatively simple.
The code for this engine can be reviewed and validated for correctness using only local reasoning; we (the CockroachDB developers) have come to trust it the most.
Also, because the processing is performed locally, it can deliver results very fast if all the data it needs is available locally (on the same node), and/or when there are only a few rows to process from the source tables/indices.
Parallelized local processing for updates
A common pattern in client apps is to issue multiple INSERT or UPDATE (or UPSERT, or DELETE) statements in a row inside a single session/transaction. Meanwhile, data updates in CockroachDB necessarily take longer than with most other SQL engines, because of the mandatory network traffic needed for consensus.
We found ourselves wondering: could we accelerate the processing of data writes by executing them in parallel? This way, despite the higher latency of single data-modifying statements, the overall latency of multiple such statements could be reduced.
This is, however, not trivial.
The standard SQL language, when viewed as an API between a client app and a database server, has an inconvenient property: it does not permit processing multiple queries in parallel.
The designers of the SQL language, especially of the dialect implemented by PostgreSQL that CockroachDB has adopted, have specified that each SQL statement should operate “as if the previous statement has entirely completed.” A SELECT statement following an INSERT, for example, must observe the data that has just been inserted. Furthermore, the SQL “API” or “protocol” is conversational: each statement may have a result, and the client app can observe that result before it decides which statement to run next. For example, UPDATE has a result too: the number of rows affected. A client can run UPDATE to update a row, and decide to issue INSERT if UPDATE reports 0 rows affected (the row doesn’t exist).
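The conversational UPDATE-then-INSERT pattern can be sketched as follows. This is only an illustration: it uses Python's built-in sqlite3 module as a stand-in for a SQL server (not CockroachDB), and the `customer` table and the `update_or_insert` helper are hypothetical.

```python
import sqlite3


def update_or_insert(conn: sqlite3.Connection, cust_id: int, address: str) -> str:
    """Runs UPDATE first, then decides on INSERT based on the UPDATE's result."""
    cur = conn.execute(
        "UPDATE customer SET address = ? WHERE cust_id = ?", (address, cust_id)
    )
    # The client observes the result (rows affected) before choosing
    # the next statement: this is the "conversational" property.
    if cur.rowcount == 0:
        conn.execute(
            "INSERT INTO customer (cust_id, address) VALUES (?, ?)",
            (cust_id, address),
        )
        return "inserted"
    return "updated"


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE customer (cust_id INTEGER PRIMARY KEY, address TEXT)")

first = update_or_insert(conn, 1, "1 Main St")   # row does not exist yet
second = update_or_insert(conn, 1, "9 New Rd")   # row exists now
rows = conn.execute("SELECT cust_id, address FROM customer").fetchall()
```

Because the second statement depends on the row count reported by the first, the server cannot start the INSERT before the UPDATE has completed.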
These semantic properties of the SQL language are incredibly useful; they give a lot of control to client applications. However, the choice that was made to include these features in SQL has also, inadvertently, made automatic parallelization of SQL execution impossible.
What would automatic parallelization look like? This is a classic problem in computer science! At a sufficiently high level, every solution looks the same: the processing engine that receives instructions/operations/queries from an app must find which operations are functionally independent from the operations before and after them. If a client app/program says to the processing engine “Do A, then do B”, and the processing engine can ascertain that B does not need any result produced by A, and that A would not be influenced if B were to complete before it does, then it can start B before A completes (presumably, at the same time), so that A and B execute in parallel. And of course, the results of the operations reported back to the app/program must appear as if the operations had executed sequentially.
With standard SQL, this is extremely hard to determine as soon as data-modifying statements are interleaved with SELECTs on the same table.
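The independence test described above can be sketched as a check on read and write sets. This is a simplified model under a strong assumption: that each operation's read and write sets are known upfront, which is precisely what is hard to establish for SQL statements. The `Operation` type and the row labels are hypothetical.

```python
from dataclasses import dataclass
from typing import FrozenSet


@dataclass(frozen=True)
class Operation:
    name: str
    reads: FrozenSet[str] = frozenset()
    writes: FrozenSet[str] = frozenset()


def can_run_in_parallel(a: Operation, b: Operation) -> bool:
    """Returns True if B ("do A, then do B") may start before A completes."""
    # B must not need anything produced by A, and both must not write the same data.
    if a.writes & (b.reads | b.writes):
        return False
    # A must not be influenced if B completes first.
    if b.writes & a.reads:
        return False
    return True


insert_x = Operation("INSERT row 1", writes=frozenset({"row:1"}))
insert_y = Operation("INSERT row 2", writes=frozenset({"row:2"}))
select_x = Operation("SELECT row 1", reads=frozenset({"row:1"}))

independent = can_run_in_parallel(insert_x, insert_y)  # disjoint rows
dependent = can_run_in_parallel(insert_x, select_x)    # SELECT must see the INSERT
```

In this model the check is cheap; for SQL, computing the read and write sets of a statement generally requires running it.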
In particular, it is hard to parallelize SELECT with anything else: to determine which rows are touched by a specific INSERT/UPDATE, and whether these rows are involved in a nearby SELECT, the analysis required would amount to running these statements, and would thus defeat parallelism upfront.
It is furthermore impossible to parallelize multiple standard INSERT/DELETE/UPSERT/UPDATE statements. Each of these statements returns the number of rows affected, or even data from these rows when a standard RETURNING clause is used. Parallel execution could change these results, which would break the semantic requirement that the results must appear as if the statements had executed in sequence.
This is why there is not much CockroachDB can do to parallelize data updates using standard SQL syntax.
However, when discussing this with some of our users who have a particular interest in write latencies, we reached an agreement: we could extend our SQL dialect to provide CockroachDB-specific syntax extensions that enable parallel processing. This was acceptable to them because the one-time upfront cost of updating client code to add the necessary annotations is small compared to the ongoing business costs caused by higher-latency transactions.
The detailed design can be found in our repository. To exploit this new feature, a client app can add the special clause RETURNING NOTHING to INSERT/DELETE/UPSERT/UPDATE statements. When two or more data-modifying statements are issued with RETURNING NOTHING, the local execution engine starts them concurrently and they can progress in parallel. Only when the transaction is committed does the engine wait for every in-flight data update to complete.
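The behavior described above can be imitated with a toy model. This is not CockroachDB's implementation: the `Transaction` class, its methods and the `write_latency` parameter are hypothetical, and a sleep stands in for the consensus round trip. The sketch also assumes that a statement which does return a result first waits for the in-flight ones, so that it still behaves as if the previous statements had completed.

```python
import time
from concurrent.futures import Future, ThreadPoolExecutor
from typing import List


class Transaction:
    """Toy model of a session that parallelizes RETURNING NOTHING statements."""

    def __init__(self, write_latency: float) -> None:
        self._pool = ThreadPoolExecutor(max_workers=8)
        self._in_flight: List[Future] = []
        self._latency = write_latency
        self.applied: List[str] = []

    def _apply(self, stmt: str) -> None:
        time.sleep(self._latency)  # stands in for the consensus round trip
        self.applied.append(stmt)  # list.append is atomic in CPython

    def _wait_for_in_flight(self) -> None:
        for future in self._in_flight:
            future.result()
        self._in_flight.clear()

    def execute(self, stmt: str, returning_nothing: bool = False) -> None:
        if returning_nothing:
            # Start the write and return to the client immediately.
            self._in_flight.append(self._pool.submit(self._apply, stmt))
        else:
            # Assumption: a statement with a result waits for earlier writes.
            self._wait_for_in_flight()
            self._apply(stmt)

    def commit(self) -> None:
        # Commit waits until every in-flight data update has completed.
        self._wait_for_in_flight()
        self._pool.shutdown()


txn = Transaction(write_latency=0.05)
start = time.monotonic()
for i in range(4):
    txn.execute(f"INSERT INTO t VALUES ({i}) RETURNING NOTHING", returning_nothing=True)
txn.commit()
elapsed = time.monotonic() - start  # expected to be closer to one latency than four
```

The four writes overlap, so the total time is dominated by the slowest write instead of the sum of all of them.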
Distributed query processing in CockroachDB
Alongside the local engine, CockroachDB also provides a distributed execution engine. It delegates parts of the processing required for a single SQL statement to multiple nodes in the cluster, so that the processing can occur in parallel on multiple nodes and hopefully finish faster. We can also expect this to consume less network traffic, for example when filters can be applied at the source.
Why, and how?
This blog post details the why and outlines the how. We will dedicate a couple of separate articles to further explain how it works.
Dispelling the false idols
The usual motivation for a distributed processing engine is the observation that the data for a query is often spread over multiple nodes in the cluster. Instead of bringing the data onto a single processing node, intuition suggests we could ship the computation to where the data is stored instead, and save processing time (= make queries faster).
However, at a high level, this motivation is weak: if it were our only motivation, there is a large range of other possible solutions that we would have explored besides query distribution.
For example, one could argue that there are already good non-distributed solutions to improve performance.
A strong piece of wisdom crystallized over the past 30 years can be summarized thus: data workloads in production code have been historically observed to consist of either small bursty transactions that need up-to-date and consistent views of the data, but only touch very few rows (OLTP workloads), or long wide-spanning read-only transactions that touch a lot of rows, but don’t usually need a very up-to-date and consistent view of the data (analytical workloads, online or not).
From this observation, one can argue that OLTP workloads only need to talk to very few nodes in a distributed storage system (because primary and secondary indexes will narrow down the work to a few rows in storage), and analytical workloads can be run on materialized views that are maintained asynchronously in a separate system, optimized for fast throughput at the expense of consistency (as they do not need to update anything). In either case, distributed processing is not an obvious value-add.
Another conventional motivation for distributed processing is a challenge to the aforementioned wisdom, acknowledging the rise of new workloads in internet services: it is now common to find OLTP workloads that need to read many rows before they update a few, and analytical workloads that benefit from reading from very up-to-date data. In both cases, distributed processing would seem to provide an effective technical solution to make these workloads faster.
However, this motivation is again weak at a high level: since these workloads have become commonplace, we have already seen seemingly simpler and effective technology and standards emerge to address precisely these use cases.
For transactional workloads containing large reads before an update decision is taken, the common approach is to use suitable caching. Memcached and related technology are an instance of this. For analytical processing in want of up-to-date data, an extra replication stack that maintains consistent materialized views ensures that the analytics input is both up-to-date and fast to access. Good caches and transaction/event logging to maintain materialized views externally are well-known and effective technical means to achieve this, and the corresponding technology is relatively easier for vendors to provide than general-purpose distributed processing engines.
This back and forth between the expression of new computing needs and the design of specialized solutions that accelerate them is the staple diet of computer and software architects and, let’s face it, the most recurring plot device in the history of computing. After all, it “just works”, right?
It works, but it is so complicated!
“Complicated”, here, being a euphemism for expensive to use.
Our motivation for distributed query processing in CockroachDB
This is the point in the story where we reveal the second most recurring plot device in the history of computing: rejection of complexity. This is how it goes:
“As a programmer, I really do not want to learn about all these specialized things. I just want to get my app out!”
“As a company owner, I do not want to have to deal with ten different technology providers to reach my performance numbers. Where’s my swiss army knife?”
Without a general-purpose distributed computing engine, a developer or CTO working with data must memorize a gigantic decision tree: what kind of workload is my app throwing at the database? What secondary indices do I need to make my queries fast? Which 3rd party technology do I need to cache? Which stack to use to keep track of update events in my data warehouse? Which client libraries do I use to keep a consistent view of the data, to avoid costly long-latency queries on the server?
The cognitive overhead required to design a working internet-scale application or service has become staggering, and the corresponding operational costs (software + human resources) have become unacceptable.
This is where we would like to aim distributed processing in CockroachDB: a multi-tool to perform arbitrary computations close to your data. Our goal in the longer term is to remove the burden of having to think about the atomicity, consistency, isolation and durability of your arbitrarily complex operations, or about the integration between tools from separate vendors.
In short, we aim above all to enable correctness, operational simplicity and higher developer productivity at scalable performance, with competitive performance in common cases as a supplemental benefit.
This is a natural extension of our reasons for building CockroachDB in the first place. After all, CockroachDB rides the NewSQL wave, fuelled by the renewed interest in SQL after a period of NoSQL craze: the software community has tried, and failed, to manage transactions and schemas client-side, and has come to acknowledge that delegating this responsibility to a database engine has both direct and indirect benefits in terms of correctness, simplicity and productivity.
Our distributed processing engine extends this principle, by proposing to take over some of your more complex computing needs.
This includes, to start, supporting the execution of SQL queries when maintaining a good combination of secondary indices and materialized views is either impractical or too detrimental to update performance.
It also includes supporting some analytical workloads that regularly perform large aggregations where the results need up-to-date input data.
Eventually, however, we wish to also cater to a larger class of workloads. We are very respectful of the vision that has produced Apache Samza, for example, and we encourage you to watch the presentation “Turning the Database Inside Out” by Martin Kleppmann to get an idea of the general direction in which we are aiming.
So we are set on implementing a distributed processing engine in CockroachDB.
It is inspired by Sawzall and, at a high level, works as follows:
The request coming from the client app is translated to a distributed processing plan, akin to the blueprint of a dataflow processing network.
The node that received the query then deploys this query plan onto one or more other nodes in the cluster. This deployment consists of creating “virtual processors” (like little compute engines) on every node remotely, as well as the data flows (like little dedicated network connections) between them.
The distributed network of processors is launched to start the computation.
Concurrently, the node handling the query collects results from the distributed network and forwards them to the client. The processing is considered complete when all the processors stop.
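The steps above can be sketched as a tiny dataflow network in a single process. This is only an analogy and not CockroachDB's engine: threads stand in for virtual processors, a queue stands in for the data flows, and the `SHARDS` data, node names and function names are hypothetical.

```python
import queue
import threading
from typing import Dict, List

DONE = object()  # sentinel a processor sends when it stops

# Hypothetical data placement: each "node" holds part of the table.
SHARDS: Dict[str, List[dict]] = {
    "node1": [{"cust_id": 1, "state": "CA"}, {"cust_id": 2, "state": "NY"}],
    "node2": [{"cust_id": 3, "state": "CA"}],
    "node3": [{"cust_id": 4, "state": "TX"}],
}


def filter_processor(rows: List[dict], out: queue.Queue) -> None:
    """Virtual processor: filters rows where they live, then signals completion."""
    for row in rows:
        if row["state"] == "CA":
            out.put(row)
    out.put(DONE)


def run_distributed() -> List[dict]:
    flow: queue.Queue = queue.Queue()  # data flow towards the gateway node
    # Deploy one processor per node holding data.
    processors = [
        threading.Thread(target=filter_processor, args=(rows, flow))
        for rows in SHARDS.values()
    ]
    # Launch the network.
    for p in processors:
        p.start()
    # The gateway collects results concurrently until all processors stop.
    results: List[dict] = []
    remaining = len(processors)
    while remaining:
        item = flow.get()
        if item is DONE:
            remaining -= 1
        else:
            results.append(item)
    for p in processors:
        p.join()
    return results


ids = sorted(row["cust_id"] for row in run_distributed())
```

Only the rows that pass the filter travel over the queue, which illustrates why applying filters at the source can reduce network traffic. The order in which rows arrive is not deterministic, hence the sort.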
We highlight again that this is a general-purpose approach: dataflow processing networks are a powerful model from theoretical computer science known to handle pretty much any type of computation. Eventually, we will want our users to become able to leverage this general-purpose tool in arbitrary ways! However, in an initial phase we will restrict its exploitation to a few common SQL patterns, so that we can focus on robustness and stability.
In CockroachDB 1.0, for example, the distributed engine is leveraged automatically to handle SQL sorting, filtering, simple aggregations and some joins. In CockroachDB v1.1, it will take over more SQL aggregations and joins automatically. We will evaluate the reaction of our community to this first approach to decide where to extend the functionality further.
Plans for the future in CockroachDB
Lots remain to be done
Truth be told, there are some complex theoretical questions we need to learn to answer before we can recommend distributed processing as a general tool. Some example questions we are working on:
While data extraction processors can be intuitively launched on the nodes where the data lives, other processors like those that sort or aggregate data can be placed anywhere. How many of these should be launched? On which cluster nodes?
How to ensure that computations stay close to the data while CockroachDB rebalances data automatically across nodes? Should virtual processors migrate together with the data ranges they are working on?
What should users expect when a node fails while a distributed query is ongoing? Should the processing resume elsewhere and try to recover? Is partial data loss acceptable in some queries?
How does distributed processing impact the performance of the cluster? When a node is running virtual processors on behalf of another node, how much throughput can it still provide to its own clients?
How to ensure that a large query does not exhaust network or memory resources on many nodes? What to do if a client closes its connection during a distributed computation?
We promise to share our progress on these aspects with you in subsequent blog posts.
Summary: SQL processing in CockroachDB
You now know that CockroachDB supports two modes of execution for SQL queries: local and distributed.
In the local execution engine, data is pulled from where it is towards the one node that does the processing. This engine contains an optimization to accelerate multiple data updates, given some annotations in the SQL statements (RETURNING NOTHING), by parallelizing the updates locally, using multiple cores.
In the distributed execution engine, the processing is shipped to run close to where the data is stored, usually on multiple nodes simultaneously. Under the hood, we are building a general-purpose distributed computing engine using dataflow networks as the fundamental abstraction. We plan to expose this functionality later to all our users to cater to various distributed processing workloads, but for the time being we just use it to accelerate some SQL queries that use filtering, joins, sorts and aggregations, in particular those that you may not be able to, or do not want to, optimize manually using classical techniques (e.g. indexes or asynchronous materialized views).
Both engines can be active simultaneously. However, because we are working hard on distributed execution, we want users to experiment with it: we thus decided to make distributed execution the default, for those queries that can be distributed. You can override this default with SET, or you can use EXPLAIN(DISTSQL) to check whether a given query can be distributed. Subsequent blog posts will detail further how exactly this is achieved.
And there is so, so, so much more we want to share about this technology. We’ll write more. Stay tuned.