Codelab: SQL↔KV hacking

This Codelab touches on the sql layer Insert node and how it communicates its writes to the KV layer. It also touches on lower-level (storage-level) concepts such as transactions and intents. As an excuse to touch interesting code points, we’ll hack together a nifty feature: a circumvention of a current CRDB limitation pertaining to large transactions. 

This codelab is out of date: the feature it describes has since been implemented. Nevertheless, it might be useful to see the process of editing this code, so the page is kept.

There is currently a limit on the number of rows that a SQL transaction can write before becoming “too large to commit”. The reasons for this limitation will be explained below. It is surprisingly easy for an unsuspecting SQL user to find herself in this situation: statements like INSERT INTO <table> SELECT … and CREATE TABLE <table> AS SELECT … can easily exceed the current limit of 100k rows. Even if no transaction has been explicitly used by the client, CRDB internally runs all SQL statements in implicit transactions. This codelab will modify the execution of CREATE TABLE … AS … such that it doesn’t run into this limitation any more. The point of the exercise is to familiarize the reader with parts of the CRDB codebase, not to arrive at a polished design for the solution.

Let’s start by observing the problem. First, let’s build and start a cockroach server:

user@bash: make build
user@bash: ./cockroach start --insecure --logtostderr

In another terminal, let’s open a SQL shell and attempt to create a large table:

user@bash: ./cockroach sql --insecure
# Welcome to the cockroach SQL interface.
# All statements must be terminated by a semicolon.
# To exit: CTRL + D.
root@:26257/> create database codelab;
CREATE DATABASE
root@:26257/> CREATE TABLE codelab.series AS SELECT generate_series FROM generate_series(1, 100100);
pq: kv/txn_coord_sender.go:423: transaction is too large to commit: 100108 intents

Intermezzo 1: Transactions and Intents

crdb is a transactional database, and this is reflected at all levels of our tiered architecture - sql, kv, storage. A kv-level transaction is logically identified by a UUID. KV requests carry this ID around so that the storage level can use it when evaluating reads and writes. A transactional write (a write that is part of txn T) of a key k writing value v places an intent on k. An intent is stored in our on-disk storage engine (RocksDB) like a committed value, but it has extra information: the intent contains v and the txn’s ID. The point of the intent is to block other conflicting transactions that try to access the key until T commits or rolls back. When T finally commits or rolls back, all its intents need to be resolved. The KV request causing a transaction to finish is EndTransaction{commit: true/false, intent_spans:...}. This request carries with it information about all the key spans that contain intents laid down by the finishing txn; all these intents will be cleaned up during execution of the EndTransaction.
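
To make the intent lifecycle concrete, here is a minimal Go sketch of the idea, not the actual storage code. Every name in it (store, intent, put, endTransaction) is made up for illustration, and a conflicting write simply gets an error, whereas the real system has richer ways of handling conflicts.

```go
package main

import (
	"errors"
	"fmt"
)

// intent is a provisional value tagged with the ID of the txn that wrote it.
type intent struct {
	txnID string
	value string
}

// store is a toy, in-memory stand-in for the storage engine (hypothetical).
type store struct {
	committed map[string]string
	intents   map[string]intent
}

func newStore() *store {
	return &store{committed: map[string]string{}, intents: map[string]intent{}}
}

var errConflict = errors.New("key has an intent from another txn")

// put places an intent on key k on behalf of txn txnID. A different txn's
// intent on the same key blocks the write (here: returns an error).
func (s *store) put(txnID, k, v string) error {
	if in, ok := s.intents[k]; ok && in.txnID != txnID {
		return errConflict
	}
	s.intents[k] = intent{txnID: txnID, value: v}
	return nil
}

// endTransaction resolves the intents of txnID on the given keys: on commit
// they become committed values, on rollback they are simply discarded.
func (s *store) endTransaction(txnID string, commit bool, keys []string) {
	for _, k := range keys {
		in, ok := s.intents[k]
		if !ok || in.txnID != txnID {
			continue
		}
		if commit {
			s.committed[k] = in.value
		}
		delete(s.intents, k)
	}
}

func main() {
	s := newStore()
	_ = s.put("T1", "k", "v")
	fmt.Println(s.put("T2", "k", "w")) // T2 is blocked by T1's intent
	s.endTransaction("T1", true, []string{"k"})
	fmt.Println(s.committed["k"], s.put("T2", "k", "w"))
}
```

Note how endTransaction needs to be told which keys to visit; that list is the toy equivalent of the intent spans carried by EndTransaction.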

With some idea about what intents are, we can now try to explain the “transaction is too large to commit” limitation. We generally don’t like very large KV requests; a request needs to be loaded into memory by the node evaluating it, and the resulting Raft command must be loaded into memory by each replica that’s applying it. All these points where large memory allocations are made are problematic, and the ones related to Raft commands even more so - once the command has been written to the Raft log, all the replicas are forced to apply it, and their execution cannot diverge. There’s no way for an individual replica to say “this is too big for me” and bail. So, if a command is indeed too large for a particular replica, that replica is forever going to crash trying to apply it and the cluster will be generally unhappy. So, in the particular case of EndTransaction requests, we limit their size by putting a limit on the number of intent key spans that they can carry. Generally, higher levels of crdb code control the sizes of the Raft commands that are going to be proposed; however, in the case of EndTransaction, that’s not as true: the EndTransaction request is automatically populated with the intent key spans. So, it makes sense for the number of these key spans to have a limit.

To understand where this limit is enforced and what we’ll need to do to work around it, it’ll be helpful to have a primer on how Cockroach write requests are executed.

Intermezzo 2: Cockroach KV requests

The SQL layer interacts with the data by means of KV requests. These requests are batched and sent through a hierarchy of layers until they make their way to the range holding the data in question. This sending happens through each layer calling into a lower-level implementation of the client.Sender interface. Each Sender does some processing on the batch and then sends it along to the next layer. Notable Senders are the DistSender, which splits the requests in a batch into sub-requests for different ranges, and then reassembles their results, and the TxnCoordSender, which does bookkeeping for transactional writes. Relevant to our quest, the TxnCoordSender keeps track of the keys being written by all the batches that are part of a transaction, as these batches go through it, and attaches these intents to the EndTransaction finishing that transaction.
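
The layering can be pictured with a small Go sketch. It only borrows the idea of the Sender chain: the sender interface below is a simplified, hypothetical signature (not the real client.Sender), and the "split by range" rule is faked by comparing keys against "m".

```go
package main

import "fmt"

// batch is a toy stand-in for a BatchRequest: just the keys being written.
type batch struct {
	txnID string
	keys  []string
}

// sender is a simplified, hypothetical version of the Sender idea:
// process a batch, then hand it to the next layer.
type sender interface {
	send(b batch) error
}

// rangeSender pretends to be the bottom of the stack (the ranges themselves).
type rangeSender struct {
	received []batch
}

func (r *rangeSender) send(b batch) error {
	r.received = append(r.received, b)
	return nil
}

// distSender splits a batch into per-range sub-batches. Here we pretend
// there are two ranges: keys below "m" and everything else.
type distSender struct {
	wrapped sender
}

func (d *distSender) send(b batch) error {
	var lo, hi []string
	for _, k := range b.keys {
		if k < "m" {
			lo = append(lo, k)
		} else {
			hi = append(hi, k)
		}
	}
	for _, part := range [][]string{lo, hi} {
		if len(part) == 0 {
			continue
		}
		if err := d.wrapped.send(batch{txnID: b.txnID, keys: part}); err != nil {
			return err
		}
	}
	return nil
}

// txnCoordSender remembers the keys written by each txn passing through it.
type txnCoordSender struct {
	wrapped sender
	keys    map[string][]string
}

func (t *txnCoordSender) send(b batch) error {
	if err := t.wrapped.send(b); err != nil {
		return err
	}
	t.keys[b.txnID] = append(t.keys[b.txnID], b.keys...)
	return nil
}

func main() {
	bottom := &rangeSender{}
	tcs := &txnCoordSender{wrapped: &distSender{wrapped: bottom}, keys: map[string][]string{}}
	if err := tcs.send(batch{txnID: "T1", keys: []string{"a", "z"}}); err != nil {
		fmt.Println("send failed:", err)
		return
	}
	fmt.Println("sub-batches:", len(bottom.received), "tracked keys:", tcs.keys["T1"])
}
```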

Each request has a Header (e.g. check out one random request). This header indicates either a key or a range of keys that the request needs to operate on. According to this span, the request will be routed to the right range.


Intermezzo 3: Terminology

We’ve used a couple of terms in this document without offering any explanation; to be successful in the crdb codebase, these terms have to be made more precise.
  • Request: a request refers to a KV request, namely a BatchRequest (even single requests are wrapped in batches before being sent to KV). These are protobuf messages that will be routed to the data range on which they’re supposed to operate and eventually return a response. They’re split into read and write requests; besides the types of results these categories produce, an important difference is that only writes need to go through Raft.
  • Evaluation: when a request arrives at its destination range, it is evaluated. For reads, this evaluation means actually reading data from the storage layer and packaging it into a response. For writes, evaluation means figuring out exactly what low-level changes are to be made to the storage-level data (e.g. a DeleteRange request will cause a bunch of tombstone records to be written to storage) and serializing all these changes into a command. Evaluation happens on the current lease holder replica of the range in question.
  • Command: a command is the result of evaluating a write request. It contains a binary representation of changes to be made to storage (RocksDB). It is proposed to Raft and, if accepted, it will subsequently be applied on all the replicas of the range.
  • Proposing: a command is proposed to the Raft group formed by all the replicas of a range. If a quorum of the replicas accept it (according to the Raft consensus protocol rules), the command is written to the Raft log - at this point the command has occupied one position in the log. This log is constantly applied by all the members of the Raft group (i.e. all the range’s replicas).
  • Application: Commands are taken off the log by the Raft machinery and applied to the local RocksDB instance. This involves deserializing the changes marshalled inside the command and… applying them to the storage engine. After application, the RocksDB instance on the specific replica has been changed according to the initial write request that triggered the whole saga.
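
The terms above can be tied together in a toy Go sketch of a write's journey. This is only a mental model under heavy assumptions: there is no real consensus here (every proposal is assumed to be accepted), and all type and function names are invented for the illustration.

```go
package main

import "fmt"

// command is the result of evaluating a write request: the low-level
// change to be made to storage.
type command struct {
	key, value string
}

// replica holds a local storage engine and tracks how much of the log
// it has applied so far.
type replica struct {
	engine  map[string]string
	applied int
}

// raftGroup is a toy stand-in for the Raft group of one range.
type raftGroup struct {
	log      []command
	replicas []*replica
}

func newRaftGroup(n int) *raftGroup {
	g := &raftGroup{}
	for i := 0; i < n; i++ {
		g.replicas = append(g.replicas, &replica{engine: map[string]string{}})
	}
	return g
}

// evaluate turns a write request into a command (done on the lease holder).
func evaluate(key, value string) command {
	return command{key: key, value: value}
}

// propose appends the command to the log; quorum acceptance is assumed.
func (g *raftGroup) propose(c command) {
	g.log = append(g.log, c)
}

// applyAll makes every replica apply the log entries it has not applied yet.
func (g *raftGroup) applyAll() {
	for _, r := range g.replicas {
		for ; r.applied < len(g.log); r.applied++ {
			c := g.log[r.applied]
			r.engine[c.key] = c.value
		}
	}
}

func main() {
	g := newRaftGroup(3)
	g.propose(evaluate("k", "v"))
	g.applyAll()
	for i, r := range g.replicas {
		fmt.Println("replica", i, "k =", r.engine["k"])
	}
}
```

Once a command is in the log, every replica in applyAll has to apply it; there is no branch where a replica declines, which is the property that makes oversized commands dangerous.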
With this information about crdb requests in mind, let’s get to our objective and the code. So, we know that CREATE TABLE <table> AS SELECT … lays down too many intents and, as a result, its transaction fails to commit. Who exactly is preventing it from committing? With the crdb codebase checked out, let’s grep for the error:

user@bash: grep -r "transaction is too large to commit" .
./pkg/kv/txn_coord_sender.go:                                return roachpb.NewErrorf("transaction is too large to commit: %d intents", len(et.IntentSpans))

Ah, txn_coord_sender.go. That makes sense: we know that the TxnCoordSender is in charge of keeping track of all the intents that a transaction created. Looking at the code, we see that the number of intents being compared to maxIntents is coming from txnMeta := tc.txnMu.txns[txnID]. tc.txnMu.txns is a map in which the TxnCoordSender accumulates data for each txn, as txns send requests through the TxnCoordSender. Here’s where this map is updated with intents generated by each batch. How does the TxnCoordSender know what intents have been laid down exactly? It’s not like requests have a notion of intents, exactly. Different types of write requests have information about what keys they might write to (e.g. conditional requests such as a CPut might or might not write something). What the code currently does is it looks at the Header of each request and it assumes that intents might have been laid down on any key in the key span indicated by the request’s header. As with anything in crdb, the actual code is more complicated, but it’s all around here.
OK, so the TxnCoordSender peeks at all the requests going through it on behalf of a transaction, accumulates the key spans from each write request from the requests’ headers, and, for our CREATE TABLE statement, we somehow end up with too many of these key spans. What spans exactly is the TxnCoordSender ending up with? Let’s theorize first, and then we’ll convince ourselves that the theory is correct. It’d be a good guess to think that the TxnCoordSender ends up with a key span for each row being inserted in the new table.
Before we delve into the sql-layer code that generates the KV requests and verify that this is indeed the case, let’s jump ahead and also propose a solution: it’d be correct to just attach a single range of intents to the EndTransaction that’s committing the table creation; that range would be the whole key space of the table. One big key range would be a lot better than a gazillion small ranges: for one, it’d make the TxnCoordSender happy and it wouldn’t block us from committing any more. More importantly, using a single range for cleaning up the intents is a smart idea - remember that we’re afraid of large requests, not so much of requests having to ultimately perform a lot of work. See Appendix A for a discussion of how this single EndTransaction request is executed and what it turns into.
Before we talk about how we’d actually code the solution proposed here, let’s make a note that this idea of replacing individual intent keys with one big range is not clearly a general win - you don’t want to be too pessimistic about what range needs to be scanned for intents to be cleaned up. If a txn writes keys “a” and “z”, you don’t want it to need to scan the potentially billions of keys in between. However, in the case of a brand new table, we know that all keys pertaining to this table will have intents, and that those intents belong to the current transaction. So, a key span encompassing the whole table key space is exactly what we need to scan for intents.
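
A back-of-the-envelope Go sketch of the two options: one single-key span per row versus one span covering the table. The maxIntents value mirrors the 100k limit mentioned earlier; the key strings and the helper names (perRowSpans, wholeTableSpan, checkCommit) are purely illustrative and are not real encodings or real functions.

```go
package main

import "fmt"

// span is a key span; an empty endKey denotes a single key.
type span struct {
	key, endKey string
}

// maxIntents mirrors the 100k limit discussed in the text (assumed value).
const maxIntents = 100000

// checkCommit imitates the size check applied to EndTransaction.
func checkCommit(intentSpans []span) error {
	if len(intentSpans) > maxIntents {
		return fmt.Errorf("transaction is too large to commit: %d intents", len(intentSpans))
	}
	return nil
}

// perRowSpans builds one single-key span per inserted row. The key format
// is only for readability.
func perRowSpans(tableID, rows int) []span {
	spans := make([]span, 0, rows)
	for i := 1; i <= rows; i++ {
		spans = append(spans, span{key: fmt.Sprintf("/Table/%d/1/%d", tableID, i)})
	}
	return spans
}

// wholeTableSpan builds a single span covering the table's whole key space.
func wholeTableSpan(tableID int) span {
	return span{
		key:    fmt.Sprintf("/Table/%d", tableID),
		endKey: fmt.Sprintf("/Table/%d", tableID+1),
	}
}

func main() {
	fmt.Println(checkCommit(perRowSpans(51, 100100)))
	fmt.Println(checkCommit([]span{wholeTableSpan(51)}))
}
```

The request size stays constant with the single span, regardless of how many rows were written underneath it.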

One more thing still before we start coding: we owe ourselves more convincing that the intents accumulated by the TxnCoordSender are what we think they are (i.e. one per SQL row being inserted in the new table). For that, we need to understand a bit about how the SQL layer works.

Intermezzo 4: SQL execution

CRDB contains a SQL compiler with a front-end (parsing and semantic analysis), middle-end (logical plan construction) and back-end (plan execution). We’ll focus on the execution here. The logical plan of a SQL statement is represented as a tree of planNodes. These nodes double as executable structures implementing the Volcano model: they have Start()/Next() methods that run through all the rows that each node wants to pass up to its parent. To get an intuition about these planNodes, let’s look at the plan created for a SELECT statement:

SELECT * FROM customers WHERE State LIKE 'C%' AND strpos(address, 'Infinite') != 0 ORDER BY Name;

This slightly contrived example is supposed to return customers from states starting with "C" and whose address contains the string "Infinite". To get excited, let's see the query plan for this statement:

root@:26257> EXPLAIN(EXPRS) SELECT * FROM customers WHERE State LIKE 'C%' and strpos(address, 'Infinite') != 0 order by name;
+-------+------------+--------+----------------------------------+
| Level |    Type    | Field  |           Description            |
+-------+------------+--------+----------------------------------+
|     0 | sort       |        |                                  |
|     0 |            | order  | +name                            |
|     1 | index-join |        |                                  |
|     2 | scan       |        |                                  |
|     2 |            | table  | customers@SI                     |
|     2 |            | spans  | /"C"-/"D"                        |
|     2 |            | filter | state LIKE 'C%'                  |
|     2 | scan       |        |                                  |
|     2 |            | table  | customers@primary                |
|     2 |            | filter | strpos(address, 'Infinite') != 0 |
+-------+------------+--------+----------------------------------+

So the plan produced for this query, from top (highest-level) to bottom, looks like:
sortNode -> indexJoinNode -> scanNode (index)
                          -> scanNode (PK)
When executing this query, the runner will iterate through the rows produced by the root node (the sortNode) which, in turn, will read all the rows from indexJoinNode, which will read from the scanNodes that actually interface with the KV layer for reading data.
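
To get a feel for this pull-based iteration, here is a small Go sketch of the Volcano model. The planNode interface below is a simplification invented for the example (the real one has more methods and different signatures), and a plain filterNode stands in for the index join.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

type row []string

// planNode is a simplified, hypothetical iterator interface: Next advances
// to the next row, Values returns the current one.
type planNode interface {
	Next() bool
	Values() row
}

// scanNode produces rows from an in-memory "table".
type scanNode struct {
	rows []row
	idx  int
}

func (s *scanNode) Next() bool {
	if s.idx >= len(s.rows) {
		return false
	}
	s.idx++
	return true
}

func (s *scanNode) Values() row { return s.rows[s.idx-1] }

// filterNode pulls rows from its source and passes up only matching ones.
type filterNode struct {
	source planNode
	pred   func(row) bool
}

func (f *filterNode) Next() bool {
	for f.source.Next() {
		if f.pred(f.source.Values()) {
			return true
		}
	}
	return false
}

func (f *filterNode) Values() row { return f.source.Values() }

// sortNode drains its source on the first Next call, sorts the buffered
// rows by one column, and then hands them out one by one.
type sortNode struct {
	source planNode
	col    int
	buf    []row
	idx    int
	sorted bool
}

func (s *sortNode) Next() bool {
	if !s.sorted {
		for s.source.Next() {
			s.buf = append(s.buf, s.source.Values())
		}
		sort.Slice(s.buf, func(i, j int) bool { return s.buf[i][s.col] < s.buf[j][s.col] })
		s.sorted = true
	}
	if s.idx >= len(s.buf) {
		return false
	}
	s.idx++
	return true
}

func (s *sortNode) Values() row { return s.buf[s.idx-1] }

func main() {
	scan := &scanNode{rows: []row{{"Zoe", "CA"}, {"Bob", "NY"}, {"Al", "CO"}}}
	filter := &filterNode{source: scan, pred: func(r row) bool { return strings.HasPrefix(r[1], "C") }}
	root := &sortNode{source: filter, col: 0}
	for root.Next() {
		fmt.Println(root.Values())
	}
}
```

The loop in main plays the role of the runner: it only ever talks to the root node, and each node pulls from its child as needed.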

Now, with some mental picture of how a SQL query is executed, let’s see how a CREATE TABLE… AS… is executed. CREATE TABLE is compiled into a createTableNode; if an AS  clause is present, this node does something quite odd internally: at runtime, in its Start() method, it creates and executes an insertNode which actually populates the new table. So, to understand what intents are created by CREATE TABLE AS, we will dig into what intents are created by an insertNode (which is also what INSERT statements get compiled to).
The insertNode is a bit complicated; we’ll have to dig through a couple of layers. But it’ll all be worth it, as we’ll see how data is created in CRDB. Let’s start in insertNode.Next(), which is called in a loop by the parent until it says that its job is done. It starts with this line:
if next, err := n.run.rows.Next(ctx); !next {
...
}

So, we’ll be iterating through all the rows coming from n.run.rows, which is the insertNode's source. In our case, the source had been set by the createTableNode as the execution plan for the AS … clause - the root of this plan is a selectNode. The insertNode.Next() code proceeds to massage a row from the source into a row suitable for the new table, and eventually calls:

_, err = n.tw.row(ctx, rowVals)

This n.tw is a tableWriter, which is where the journey to producing KV values begins. As we’ll see, the n.tw.row(…) call batches a bunch of KV pairs to be written in the database later. tableWriter is an interface; the implementation we care about here is the tableInserter; tableInserter is a pretty thin layer on top of a RowInserter, but it does two important things: it creates one of those KV batch requests that we’ve mentioned before, and it later (in tableInserter.finalize()) sends that batch for execution. This finalize() is called by the insertNode only after the source has been exhausted; this means that everything that the insertNode wants to insert is batched in one big request. This is not great (issues #15713, #16180), but we’ll ignore that for now. As we’ve talked about in our discussion of the TxnCoordSender, write requests in this BatchRequest will result in intent key spans that get tracked. Let’s dig deeper to see what these write requests are: onto the RowInserter that the tableInserter uses for batching each row.
The magic happens in RowInserter.InsertRow(): this fella is aware of the schema of the table in which we’re inserting, the primary key, the secondary indexes, etc., and so it is in charge of encoding the correct raw bytes for each of these. A discussion of these encodings is beyond our scope; see Structured data encoding in CockroachDB SQL. What we’re interested in are the KV requests generated. We see that, for each row, there’s a call to putFn for each column family, and one for each secondary index (btw, tables created with CREATE TABLE AS … do not have secondary indexes when they’re created, so we don’t need to worry about that). This putFn sounds promising; let’s pierce the veil. We can see that putFn is initialized to either insertPutFn or insertCPutFn; these functions are just calls to b.Put()/CPut(), where b  is the client.Batch that was initialized by the tableInserter and passed to RowInserter.InsertRow(). The client.Batch just accumulates these requests into a buffer; the accumulated requests are already in protocol buffer form. The client.Batch will eventually be transformed into a roachpb.BatchRequest containing all the Put requests, before being sent to KV for execution. It is these Put requests that the TxnCoordSender will iterate through, and each one’s key will constitute a (degenerate, single-key) intent key-span.
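
The batching behaviour described above can be sketched in Go as follows. This is a toy model: the types only borrow the names tableInserter and Put from the text, their signatures are invented, and the key strings are not real encodings.

```go
package main

import "fmt"

// putRequest is a toy write request: its key doubles as the (single-key)
// span that would later be tracked as an intent span.
type putRequest struct {
	key, value string
}

// batch accumulates requests until it is sent.
type batch struct {
	reqs []putRequest
}

func (b *batch) Put(key, value string) {
	b.reqs = append(b.reqs, putRequest{key, value})
}

// tableInserter is a toy version of the component described above: it
// buffers writes row by row and sends them all in finalize.
type tableInserter struct {
	tableID int
	b       *batch
	sent    []*batch // batches "sent to KV", recorded for inspection
}

func (ti *tableInserter) init() { ti.b = &batch{} }

// row buffers one Put per column family of the row; nothing is sent yet.
func (ti *tableInserter) row(pk int, families []string) {
	for famID, v := range families {
		ti.b.Put(fmt.Sprintf("/Table/%d/1/%d/%d", ti.tableID, pk, famID), v)
	}
}

// finalize sends the single accumulated batch.
func (ti *tableInserter) finalize() {
	ti.sent = append(ti.sent, ti.b)
}

func main() {
	ti := &tableInserter{tableID: 51}
	ti.init()
	for pk := 1; pk <= 3; pk++ {
		ti.row(pk, []string{"v"})
	}
	ti.finalize()
	fmt.Println("batches sent:", len(ti.sent), "puts in batch:", len(ti.sent[0].reqs))
}
```

The number of Put requests, and hence of single-key intent spans, grows linearly with the number of rows, while the number of batches stays at one.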

The patch

OK, now we understand the problem - the insertNode generates one Put request per row, and each Put gets turned into an intent key span that the TxnCoordSender needs to keep track of and attach to the final EndTransaction request that will ultimately commit the data in the new table. Thus, the EndTransaction request’s size is proportional to the number of rows.

As we suggested earlier, one thing we could do about it is to take advantage of the nature of the writes generated by CREATE TABLE AS …; we know that all these writes will target the key space of the new table, and we also know that the key space is empty before the transaction inserting the new data (because the table didn’t exist before and because the new table is not interleaved in any other table). So, instead of keeping track of individual intents, we could simply assume, at EndTransaction  time, that the whole of the table’s key space needs to be scanned for intents - the EndTransaction could thus contain a single intent key span.

Let’s write a patch doing that; we’ll touch several of the code layers that we talked about and it’ll be fun. The goal is more to get our hands dirty than to come up with a clean design for fixing this problem. Therefore, the suggested changes are as follows:
  1. We’ll add a flag to the BatchRequest informing the TxnCoordSender that it shouldn’t keep track of intents pertaining to write requests in this batch. This flag will be set on the batch produced by the insertNode that’s created by a createTableNode. Setting the flag implies that the client takes responsibility for informing the TxnCoordSender in some other way about the intents. It is imperative that the TxnCoordSender is informed about the intents before the EndTransaction request committing the transaction is sent - otherwise the data will be silently rolled back.
  2. We’ll create a new type of KV request - RecordIntentsRequest - which a client can send to inform the TxnCoordSender of a range of intents that needs to be scanned on EndTransaction. This request is quite hacky; it’s unlike any other request in that it is terminated by the TxnCoordSender and not actually sent further (i.e. to a data range). It is pretty unfortunate that we need to create a new type of dummy request for that as opposed to more directly talking to the TxnCoordSender. The problem is that the TxnCoordSender is hidden behind the narrow client.Sender interface that we’ve described. This is generally a problem that will get addressed soon (issue #10511).
  3. The tableInserter will send such a request before committing the transaction. As a result, the TxnCoordSender will attach the new table’s whole key space to the EndTransaction request.

The whole proposed patch can be seen here, but we’ll walk through it. All patch chunks pasted below are relative to git revision dbd54977889302942c2e98ae9195aa61cc032678.
Let’s start by adding the “ignore intents” flag to the BatchRequest proto. All the KV requests are defined, as protobuf messages, in pkg/roachpb/api.proto. The BatchRequest has a Header, and it is in this Header that we’re going to put our new flag.

@@ -1018,6 +1024,15 @@ message Header {
   // gateway_node_id is the ID of the gateway node where the request originated.
   optional int32 gateway_node_id = 11 [(gogoproto.nullable) = false,
       (gogoproto.customname) = "GatewayNodeID", (gogoproto.casttype) = "NodeID"];
+  // If set, the TxnCoordSender will not do its regular job of keeping track of
+  // intents laid down by write requests in this batch. This can be used when
+  // the client wants to take upon itself the job of tracking intents -
+  // presumably because it knows how to group them into a few key spans.
+  //
+  // This should only be used for transactional requests. The client should
+  // either fill in the IntentSpans on the EndTransaction or should send a
+  // RecordIntentsRequest.
+  optional bool txn_coord_sender_ignore_intents = 12 [(gogoproto.nullable) = false];
 }

After editing a protobuf definition, we need to run the protoc compiler to get generated code. We generate Go code for all our protos, and also C++ code for some of them needed for the interaction with the storage layer (RocksDB). We do that by running:

user@bash: make protobuf

Running git status now should show us a modified generated file - pkg/roachpb/api.pb.go.

Now that we have our new flag, let’s use it. We remember that the tableInserter is responsible for creating the batch that all the writes will go in, so that sounds like the guy to set txn_coord_sender_ignore_intents. The decision on whether the tableInserter wants to set this flag or not, though, should come from a higher level: the flag should only be used when the insertion is done for a CREATE TABLE AS …; let’s make the tableInserter configurable by giving it an insertType configuration field. And, in fact, the insertNode needs the same kind of configuration, so let’s define an enum to be used by everybody.
+// InsertType differentiates between different behaviours of an Insert planNode.
+type InsertType bool
+
+const (
+       // RegularInsert activates the default behaviour.
+       RegularInsert InsertType = false
+       // NoIntentTracking will cause the Insert node to ask the TxnCoordSender to
+       // not track the intents produced by its write batches. Instead, the node will
+       // manually inform the TxnCoordSender of a single intent range, equal to the
+       // whole key span of the table's PK. This is useful for going around
+       // limitations on the maximum number of intents that a txn can accumulate and
+       // works well when the table did not have any data previous to the insert.
+       // This is used for CREATE TABLE AS SELECT... statements.
+       //
+       // ATTENTION: Do not use this on tables with secondary indexes; intents on
+       // those will not be cleaned up properly, resulting in writes to those indexes
+       // being dropped.
+       NoIntentTracking InsertType = true
+)
+

Let’s use the field in the tableInserter to conditionally set the txn_coord_sender_ignore_intents field on the batch - pkg/sql/tablewriter.go:

@@ -91,6 +92,8 @@ type tableInserter struct {
        // Set by init.
        txn *client.Txn
        b   *client.Batch
+
+       insertType InsertType
 }

 func (ti *tableInserter) walkExprs(_ func(desc string, index int, expr parser.TypedExpr)) {}
@@ -98,6 +101,9 @@ func (ti *tableInserter) walkExprs(_ func(desc string, index int, expr parser.Ty
 func (ti *tableInserter) init(txn *client.Txn) error {
        ti.txn = txn
        ti.b = txn.NewBatch()
+       if ti.insertType == NoIntentTracking {
+               ti.b.Header.TxnCoordSenderIgnoreIntents = true
+       }
        return nil
 }

Let’s pass the configuration to the insertNode “constructor”, and plumb it along the embedded tableInserter:
 // Insert inserts rows into the database.
 // Privileges: INSERT on table. Also requires UPDATE on "ON DUPLICATE KEY UPDATE".
 //   Notes: postgres requires INSERT. No "on duplicate key update" option.
 //          mysql requires INSERT. Also requires UPDATE on "ON DUPLICATE KEY UPDATE".
 func (p *planner) Insert(
-       ctx context.Context, n *parser.Insert, desiredTypes []parser.Type,
+       ctx context.Context, n *parser.Insert, desiredTypes []parser.Type, insertType InsertType,
 ) (planNode, error) {
        tn, err := p.getAliasedTableName(n.Table)
        if err != nil {
@@ -139,7 +159,7 @@ func (p *planner) Insert(

        var tw tableWriter
        if n.OnConflict == nil {
-               tw = &tableInserter{ri: ri, autoCommit: p.autoCommit}
+               tw = &tableInserter{ri: ri, autoCommit: p.autoCommit, insertType: insertType}
        } else {
                updateExprs, conflictIndex, err := upsertExprsAndIndex(en.tableDesc, *n.OnConflict, ri.InsertCols)
                if err != nil {

We can let ourselves be a bit compiler-driven and see where insertNodes are being created by calls to planner.Insert(). Hopefully, one of the call-sites will be the CREATE TABLE AS … case. Run a make build and check the compilation errors. Let’s fix the uninteresting ones first; insertNodes are being created during the regular logical planning process by planner.newPlan() for INSERT statements. And also by the companion planner.prepare() method, used for “preparing” SQL statements. These calls have nothing to do with our change, so let’s leave their behaviour unchanged by using the RegularInsert option.

@@ -323,7 +323,7 @@ func (p *planner) newPlan(
        case *parser.Help:
                return p.Help(ctx, n)
        case *parser.Insert:
-               return p.Insert(ctx, n, desiredTypes)
+               return p.Insert(ctx, n, desiredTypes, RegularInsert)
        case *parser.ParenSelect:
                return p.newPlan(ctx, n.Select, desiredTypes)
        case *parser.Relocate:
@@ -410,7 +410,7 @@ func (p *planner) prepare(ctx context.Context, stmt parser.Statement) (planNode,
        case *parser.Help:
                return p.Help(ctx, n)
        case *parser.Insert:
-               return p.Insert(ctx, n, nil)
+               return p.Insert(ctx, n, nil, RegularInsert)
        case *parser.Select:
                return p.Select(ctx, n, nil)
        case *parser.SelectClause:

Now let’s also fix the call of interest, which was in the expected place: createTableNode creating an insertNode when it has an AS clause. This time, we’ll pass the hip new NoIntentTracking option.

@@ -727,7 +727,8 @@ func (n *createTableNode) Start(ctx context.Context) error {
                        Rows:      n.n.AsSource,
                        Returning: parser.AbsentReturningClause,
                }
-               insertPlan, err := n.p.Insert(ctx, insert, nil /* desiredTypes */)
+               // We'll use the NoIntentTracking optimization since this is a new table.
+               insertPlan, err := n.p.Insert(ctx, insert, nil /* desiredTypes */, NoIntentTracking)
                if err != nil {
                        return err
                }

All right, we now have correctly configured tableInserters creating batches with the txn_coord_sender_ignore_intents flag set. But there’s nobody at the other end of that string; we need to teach the TxnCoordSender to do something with the new flag. Namely, we need to inhibit its usual “intent tracking” behaviour. Looking at the TxnCoordSender code, we can see that it uses the IntentSpanIterate() call to append intent key spans that it needs to track to its internal state, based on the batches that are flowing through it. It does this in two places: the first one is in TxnCoordSender.updateState(), which is called on every request’s return path - so, after a request has been executed, the TxnCoordSender keeps track of what intents it has laid down. The second one handles the special case of the batch containing an EndTransaction request; it’ll be too late to take any action after the EndTransaction has already been executed, so we need to handle the other possible write requests in its batch earlier, in the TxnCoordSender.Send() call. We’ll conditionally inhibit both of these places, so that no intents are tracked for batches marked accordingly, by simply wrapping them in an if !ba.TxnCoordSenderIgnoreIntents {…}

@@ -392,23 +409,25 @@ func (tc *TxnCoordSender) Send(
                        txnMeta := tc.txnMu.txns[txnID]
                        distinctSpans := true
                        if txnMeta != nil {
                                et.IntentSpans = txnMeta.keys
                                // Defensively set distinctSpans to false if we had any previous
                                // requests in this transaction. This effectively limits the distinct
                                // spans optimization to 1pc transactions.
                                distinctSpans = len(txnMeta.keys) == 0
                        }
-                       // We can't pass in a batch response here to better limit the key
-                       // spans as we don't know what is going to be affected. This will
-                       // affect queries such as `DELETE FROM my.table LIMIT 10` when
-                       // executed as a 1PC transaction. e.g.: a (BeginTransaction,
-                       // DeleteRange, EndTransaction) batch.
-                       ba.IntentSpanIterate(nil, func(key, endKey roachpb.Key) {
-                               et.IntentSpans = append(et.IntentSpans, roachpb.Span{
-                                       Key:    key,
-                                       EndKey: endKey,
+                       if !ba.TxnCoordSenderIgnoreIntents {
+                               // We can't pass in a batch response here to better limit the key
+                               // spans as we don't know what is going to be affected. This will
+                               // affect queries such as `DELETE FROM my.table LIMIT 10` when
+                               // executed as a 1PC transaction. e.g.: a (BeginTransaction,
+                               // DeleteRange, EndTransaction) batch.
+                               ba.IntentSpanIterate(nil, func(key, endKey roachpb.Key) {
+                                       et.IntentSpans = append(et.IntentSpans, roachpb.Span{
+                                               Key:    key,
+                                               EndKey: endKey,
+                                       })
                                })
-                       })
+                       }

and the other one:

@@ -942,12 +961,14 @@ func (tc *TxnCoordSender) updateState(
                if txnMeta != nil {
                        keys = txnMeta.keys
                }
-               ba.IntentSpanIterate(br, func(key, endKey roachpb.Key) {
-                       keys = append(keys, roachpb.Span{
-                               Key:    key,
-                               EndKey: endKey,
+               if !ba.TxnCoordSenderIgnoreIntents {
+                       ba.IntentSpanIterate(br, func(key, endKey roachpb.Key) {
+                               keys = append(keys, roachpb.Span{
+                                       Key:    key,
+                                       EndKey: endKey,
+                               })
                        })
-               })
+               }
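
To make the effect of the two hunks above concrete, here is a minimal, self-contained sketch of the tracking logic. It is not the real TxnCoordSender code: Span and Batch are simplified stand-ins for the roachpb types, and trackIntents is a hypothetical helper that only mirrors the new `if !ba.TxnCoordSenderIgnoreIntents` guard.

```go
package main

import "fmt"

// Span is a simplified stand-in for roachpb.Span (assumption: string keys).
type Span struct {
	Key, EndKey string
}

// Batch is a toy stand-in for roachpb.BatchRequest. It carries only the
// spans its writes touch and the opt-out flag added in this codelab.
type Batch struct {
	WriteSpans                  []Span
	TxnCoordSenderIgnoreIntents bool
}

// trackIntents returns the tracked spans after processing ba. Batches that
// set TxnCoordSenderIgnoreIntents contribute nothing to the tracked set.
func trackIntents(keys []Span, ba Batch) []Span {
	if ba.TxnCoordSenderIgnoreIntents {
		return keys
	}
	return append(keys, ba.WriteSpans...)
}

func main() {
	var keys []Span
	// A regular batch: its write is tracked.
	keys = trackIntents(keys, Batch{WriteSpans: []Span{{Key: "a"}}})
	// A flagged batch: its two writes are not tracked.
	keys = trackIntents(keys, Batch{
		WriteSpans:                  []Span{{Key: "b"}, {Key: "c"}},
		TxnCoordSenderIgnoreIntents: true,
	})
	fmt.Println(len(keys)) // prints 1
}
```

The flagged batch still writes its intents in the KV layer; only the coordinator's bookkeeping is skipped, which is why something else has to account for those intents later.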

To keep things working, we need another slight tweak: the TxnCoordSender is responsible for heartbeating a transaction record. We don’t have to explain that here, beyond saying that, in order to prevent transactions started by nodes that subsequently died from having intents that indefinitely block other transactions, we save the transaction’s state in the database and periodically ping it to let it know that it’s still alive. This heartbeating is started the first time the transaction tries to write something. Our change above broke the way the code was checking for this “first write” condition, because it appears that batches marked as TxnCoordSenderIgnoreIntents no longer write anything. We need a little adjustment:

@@ -959,7 +980,7 @@ func (tc *TxnCoordSender) updateState(

                if txnMeta != nil {
                        txnMeta.keys = keys
-               } else if len(keys) > 0 {
+               } else if len(keys) > 0 || ba.TxnCoordSenderIgnoreIntents {
                        // If the transaction is already over, there's no point in
                        // launching a one-off coordinator which will shut down right
                        // away. If we ended up here with an error, we'll always start
@@ -997,7 +1018,6 @@ func (tc *TxnCoordSender) updateState(
                        }
                }
        }
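
The adjusted "first write" condition can be sketched in isolation. This is a simplified model, not the actual updateState() code: startsCoordinator and its parameters are hypothetical names, chosen only to mirror the `txnMeta != nil` and `len(keys) > 0 || ba.TxnCoordSenderIgnoreIntents` checks in the hunk above.

```go
package main

import "fmt"

// startsCoordinator reports whether processing a batch should create the
// per-transaction state (and with it the heartbeat loop).
//   - hasTxnMeta: the transaction is already tracked, nothing to start.
//   - trackedKeys: number of intent spans collected from this batch.
//   - ignoreIntents: the batch opted out of intent tracking, so trackedKeys
//     is zero even though the batch did write.
func startsCoordinator(hasTxnMeta bool, trackedKeys int, ignoreIntents bool) bool {
	if hasTxnMeta {
		return false
	}
	return trackedKeys > 0 || ignoreIntents
}

func main() {
	fmt.Println(startsCoordinator(false, 3, false)) // prints true: ordinary first write
	fmt.Println(startsCoordinator(false, 0, true))  // prints true: untracked first write
	fmt.Println(startsCoordinator(false, 0, false)) // prints false: read-only so far
}
```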

OK, fantastic. Now the TxnCoordSender is no longer tracking intents for our batches. Let’s move on to step 2 - creating the RecordIntentsRequest that we talked about.

RecordIntentsRequest

As we were saying, we’ll create a dummy request that will be sent by the tableInserter to tell the TxnCoordSender about the intent key span that it needs to track to compensate for all the intents that it didn’t track.

Let’s go back to our KV api file and create a proto message for this new request. It will contain a list of key spans (our code will only ever use one, but let’s be flexible).

@@ -283,6 +283,11 @@ message EndTransactionRequest {
   optional bool require_1pc = 6 [(gogoproto.nullable) = false, (gogoproto.customname) = "Require1PC"];
 }

+message RecordIntentsRequest {
+  optional Span header = 1 [(gogoproto.nullable) = false, (gogoproto.embed) = true];
+  repeated Span intent_spans = 2 [(gogoproto.nullable) = false];
+}
+

We’ll need to add the RecordIntentsRequest to the RequestUnion that’s used in several places for containing any request.

@@ -908,6 +913,7 @@ message RequestUnion {
   optional ImportRequest import = 34;
   optional QueryTxnRequest query_txn = 33;
   optional AdminScatterRequest admin_scatter = 36;
+  optional RecordIntentsRequest record_intents = 37;
 }

We need to add some boilerplate for this new request. It’s all mechanical and compiler-driven, so we’ll just list it here without commentary.

@@ -232,6 +232,9 @@ func (b *Batch) fillResults() error {
                                        result.Keys = reply.(*roachpb.DeleteRangeResponse).Keys
                                }

+                       case *roachpb.RecordIntentsRequest:
+                               // nothing to do. This request has an empty result.
+
                        default:
                                if result.Err == nil {
                                        result.Err = errors.Errorf("unsupported reply: %T for %T",

@@ -429,6 +429,9 @@ func (*BeginTransactionRequest) Method() Method { return BeginTransaction }
 func (*EndTransactionRequest) Method() Method { return EndTransaction }

 // Method implements the Request interface.
+func (*RecordIntentsRequest) Method() Method { return RecordIntents }
+
+// Method implements the Request interface.
 func (*AdminSplitRequest) Method() Method { return AdminSplit }

 // Method implements the Request interface.
@@ -570,6 +573,12 @@ func (etr *EndTransactionRequest) ShallowCopy() Request {
 }

 // ShallowCopy implements the Request interface.
+func (rir *RecordIntentsRequest) ShallowCopy() Request {
+       shallowCopy := *rir
+       return &shallowCopy
+}
+
+// ShallowCopy implements the Request interface.
 func (asr *AdminSplitRequest) ShallowCopy() Request {
        shallowCopy := *asr
        return &shallowCopy
@@ -883,6 +892,7 @@ func (*BeginTransactionRequest) flags() int { return isWrite | isTxn | consultsT
 // replays. Replays for the same transaction key and timestamp will
 // have Txn.WriteTooOld=true and must retry on EndTransaction.
 func (*EndTransactionRequest) flags() int      { return isWrite | isTxn | isAlone | updatesTSCache }
+func (*RecordIntentsRequest) flags() int       { return isWrite | isTxn | isAlone }
 func (*AdminSplitRequest) flags() int          { return isAdmin | isAlone }
 func (*AdminMergeRequest) flags() int          { return isAdmin | isAlone }
 func (*AdminTransferLeaseRequest) flags() int  { return isAdmin | isAlone }

@@ -59,6 +59,7 @@ const (
        BeginTransaction
        // EndTransaction either commits or aborts an ongoing transaction.
        EndTransaction
+       RecordIntents
        // AdminSplit is called to coordinate a split of a range.
        AdminSplit
        // AdminMerge is called to coordinate a merge of two adjacent ranges.

With the boilerplate out of the way, we need to deal with two things: sending this new request, and handling it in the TxnCoordSender. Let’s start with the latter.

All requests go through TxnCoordSender.Send(), so that sounds like a good place to handle and terminate our RecordIntents request. Handling the request entails taking the key spans out of it and recording them in the state that the TxnCoordSender is maintaining for the transaction in question.

@@ -325,6 +325,29 @@ func (tc *TxnCoordSender) Send(
 ) (*roachpb.BatchResponse, *roachpb.Error) {
        ctx = tc.AnnotateCtx(ctx)

+       // Handle the RecordIntents request. That's a special request not meant to be
+       // passed further than the TxnCoordSender.
+       if len(ba.Requests) == 1 &&
+               ba.Requests[0].GetInner().Method() == roachpb.RecordIntents {
+               union := roachpb.ResponseUnion{}
+               union.MustSetInner(&roachpb.NoopResponse{})
+               br := &roachpb.BatchResponse{
+                       Responses: []roachpb.ResponseUnion{union},
+               }
+
+               // The txns map is guarded by txnMu, so take the lock before reading it.
+               tc.txnMu.Lock()
+               defer tc.txnMu.Unlock()
+               txnMeta, ok := tc.txnMu.txns[*ba.Txn.ID]
+               if !ok {
+                       // This transaction didn't previously perform any writes, so it's
+                       // pointless to record any intents for it.
+                       return br, nil
+               }
+               ri := ba.Requests[0].GetInner().(*roachpb.RecordIntentsRequest)
+               txnMeta.keys = append(txnMeta.keys, ri.IntentSpans...)
+               return br, nil
+       }
+
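
The shape of this handler can be shown as a standalone sketch: look up the per-transaction state under a mutex, return early for unknown transactions, otherwise append the spans. The coordinator and txnMetadata types and the recordIntents method below are toy stand-ins invented for illustration, not the real TxnCoordSender types.

```go
package main

import (
	"fmt"
	"sync"
)

// Span is a simplified stand-in for roachpb.Span.
type Span struct{ Key, EndKey string }

// txnMetadata is a toy stand-in for the per-transaction state.
type txnMetadata struct{ keys []Span }

// coordinator is a toy stand-in for TxnCoordSender. The map is guarded
// by mu, so both the lookup and the append happen with the lock held.
type coordinator struct {
	mu   sync.Mutex
	txns map[string]*txnMetadata
}

// recordIntents appends spans to the state of a known transaction. It
// returns false when the transaction has no state (it never wrote).
func (c *coordinator) recordIntents(txnID string, spans []Span) bool {
	c.mu.Lock()
	defer c.mu.Unlock()
	meta, ok := c.txns[txnID]
	if !ok {
		return false
	}
	meta.keys = append(meta.keys, spans...)
	return true
}

func main() {
	c := &coordinator{txns: map[string]*txnMetadata{"txn-1": {}}}
	fmt.Println(c.recordIntents("txn-1", []Span{{Key: "/t/51", EndKey: "/t/52"}})) // prints true
	fmt.Println(c.recordIntents("txn-2", nil))                                      // prints false
	fmt.Println(len(c.txns["txn-1"].keys))                                          // prints 1
}
```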

Now that RecordIntents is handled, let’s have the tableInserter send it. As we’ve been saying, we want to record a key span covering the whole new table. Each table has a contiguous key space that can be deduced from the table descriptor ID. A descriptor is a structure describing a table’s schema, and the tableInserter is (indirectly) configured with one. With a descriptor in hand, we can get its ID and use keys.MakeTablePrefix() to get the start of its key space; the next ID will give us the start of the next table.
The RecordIntents request only has to be sent once (not once per row), so the tableInserter.finalize() sounds like a good place to do it.

@@ -107,13 +113,38 @@ func (ti *tableInserter) row(ctx context.Context, values parser.Datums) (parser.

 func (ti *tableInserter) finalize(ctx context.Context) error {
        var err error
+       desc := ti.ri.Helper.TableDesc
+       tableStartKey := roachpb.Key(keys.MakeTablePrefix(uint32(desc.ID)))
+       tableEndKey := roachpb.Key(keys.MakeTablePrefix(uint32(desc.ID + 1)))
+       intentSpan := roachpb.Span{Key: tableStartKey, EndKey: tableEndKey}
        if ti.autoCommit {
                // An auto-txn can commit the transaction with the batch. This is an
                // optimization to avoid an extra round-trip to the transaction
                // coordinator.
               err = ti.txn.CommitInBatch(ctx, ti.b)
        } else {
                err = ti.txn.Run(ctx, ti.b)
+               if ti.insertType == NoIntentTracking {
+                       // In the NoIntentTracking case, the batch we just ran didn't record any
+                       // intents in the TxnCoordSender. We'll send a RecordIntentsRequest to
+                       // compensate for that.
+                       recordIntentsBatch := ti.txn.NewBatch()
+                       et := &roachpb.RecordIntentsRequest{
+                               IntentSpans: []roachpb.Span{intentSpan},
+                       }
+                       recordIntentsBatch.AddRawRequest(et)
+                       errRecIntents := ti.txn.Run(ctx, recordIntentsBatch)
+                       if errRecIntents != nil {
+                               log.Errorf(ctx, "error recording intents: %s", errRecIntents)
+                               if err == nil {
+                                       err = errRecIntents
+                               }
+                       }
+               }
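
The "prefix of this ID to prefix of the next ID" trick is easy to check on a toy encoding. The sketch below does not use the real keys.MakeTablePrefix() encoding: tablePrefix is a hypothetical 4-byte big-endian stand-in, picked only because it preserves ID order under byte-wise comparison, which is the property the span computation relies on.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
)

// tablePrefix is a hypothetical, simplified key prefix: the table ID as a
// 4-byte big-endian integer. The real encoding differs.
func tablePrefix(id uint32) []byte {
	b := make([]byte, 4)
	binary.BigEndian.PutUint32(b, id)
	return b
}

// tableSpan returns the half-open span [start, end) covering every key
// that begins with the prefix of table id.
func tableSpan(id uint32) (start, end []byte) {
	return tablePrefix(id), tablePrefix(id + 1)
}

// contains reports whether start <= key < end in byte-wise order.
func contains(start, end, key []byte) bool {
	return bytes.Compare(key, start) >= 0 && bytes.Compare(key, end) < 0
}

func main() {
	start, end := tableSpan(51)
	rowKey := append(tablePrefix(51), []byte("/row/42")...)
	otherKey := append(tablePrefix(52), []byte("/row/1")...)
	fmt.Println(contains(start, end, rowKey), contains(start, end, otherKey)) // prints true false
}
```

Because the end key is exclusive, the first key of the next table falls outside the span, so a single span is enough to cover the new table and nothing else.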

This is pretty good, but there’s something in the patch above that should raise alarms: we’ve modified the code branch where the tableInserter was doing ti.txn.Run(ctx, ti.b), but there’s another branch above it that does ti.txn.CommitInBatch(ctx, ti.b). So, the tableInserter can run its batch in two modes: one where it just runs it like any other batch and otherwise leaves its transaction open, and another where it runs the batch and also commits the transaction. We haven’t dealt with the “also commit” one yet. Choosing between these branches happens based on whether the INSERT statement was part of a larger SQL transaction or whether it was a standalone statement. For standalone statements, we perform various optimizations.
So, let’s deal with the “also commit” case. Let’s look at what Txn.CommitInBatch() does:

b.AddRawRequest(endTxnReq(true /* commit */, txn.deadline, txn.systemConfigTrigger))
return txn.Run(ctx, b)

So, it simply appends an EndTransaction request to the batch before running it. The code as we’ve written it doesn’t allow us to also (conditionally) append a RecordIntents request as one might like to do (our handling of RecordIntents only works when that request is the only one in a batch; otherwise it would have been a bit more complicated). But since we’ve been hacking things, let’s hack some more to solve our case: we know that EndTransaction requests carry intent spans in them. Normally, these intent spans are not set by the client, but filled in by the TxnCoordSender. But nothing says that the client can’t also directly specify some intent spans (well, something does say that, but we’ll silence it shortly). So, the suggestion here is to not use a RecordIntents request at all in this case; instead, we can just put the intent span manually in the EndTransaction. Let’s do it by creating a version of Txn.CommitInBatch that also takes one of these spans:

+// CommitInBatchWithIntentSpan is like CommitInBatch, except that an IntentSpan
+// is set in the EndTransactionRequest. This is used when at least some of the
+// batches in the transaction had TxnCoordSenderIgnoreIntents set.
+func (txn *Txn) CommitInBatchWithIntentSpan(
+       ctx context.Context, b *Batch, intentSpan roachpb.Span,
+) error {
+       if txn != b.txn {
+               return errors.Errorf("a batch b can only be committed by b.txn")
+       }
+       et := endTxnReq(true /* commit */, txn.deadline, txn.systemConfigTrigger)
+       et.(*roachpb.EndTransactionRequest).IntentSpans = []roachpb.Span{intentSpan}
+       b.AddRawRequest(et)
+       return txn.Run(ctx, b)
+}
+

The interface of Txn is kept under some control: it was originally supposed to be similar to that of other objects, and a test (TestCommonMethods) enforces this. This is an antiquated concept, but it forces us to add this new method to the test’s list of exceptions.

@@ -379,6 +379,7 @@ func TestCommonMethods(t *testing.T) {
                {txnType, "AcceptUnhandledRetryableErrors"}:  {},
                {txnType, "Commit"}:                          {},
                {txnType, "CommitInBatch"}:                   {},
+               {txnType, "CommitInBatchWithIntentSpan"}:     {},
                {txnType, "CommitOrCleanup"}:                 {},
                {txnType, "Rollback"}:                        {},
                {txnType, "CleanupOnError"}:                  {},

Let’s now fix the tableInserter.finalize() branch by using this new method:

@@ -107,13 +113,38 @@ func (ti *tableInserter) row(ctx context.Context, values parser.Datums) (parser.

 func (ti *tableInserter) finalize(ctx context.Context) error {
        var err error
+       desc := ti.ri.Helper.TableDesc
+       tableStartKey := roachpb.Key(keys.MakeTablePrefix(uint32(desc.ID)))
+       tableEndKey := roachpb.Key(keys.MakeTablePrefix(uint32(desc.ID + 1)))
+       intentSpan := roachpb.Span{Key: tableStartKey, EndKey: tableEndKey}
        if ti.autoCommit {
                // An auto-txn can commit the transaction with the batch. This is an
                // optimization to avoid an extra round-trip to the transaction
                // coordinator.
-               err = ti.txn.CommitInBatch(ctx, ti.b)
+               if ti.insertType == NoIntentTracking {
+                       err = ti.txn.CommitInBatchWithIntentSpan(ctx, ti.b, intentSpan)
+               } else {
+                       err = ti.txn.CommitInBatch(ctx, ti.b)
+               }
        } else {

We’ll need a slight adjustment in TxnCoordSender.Send(), as it wasn’t prepared to handle EndTransaction requests with intent spans in them; it was blatantly overwriting that field of the request and now it needs to be more accommodating:

@@ -392,23 +409,25 @@ func (tc *TxnCoordSender) Send(
                        txnMeta := tc.txnMu.txns[txnID]
                        distinctSpans := true
                        if txnMeta != nil {
-                               et.IntentSpans = txnMeta.keys
+                               et.IntentSpans = append(et.IntentSpans, txnMeta.keys...)
                                // Defensively set distinctSpans to false if we had any previous
                                // requests in this transaction. This effectively limits the distinct
                                // spans optimization to 1pc transactions.
                                distinctSpans = len(txnMeta.keys) == 0
                        }

Fantastic! Let’s try it! Rebuild cockroach, start it, go back to your sql shell and try CREATE TABLE t.x AS SELECT generate_series FROM generate_series(1, 100100);.
We get an error: client must not pass intents to EndTransaction. Hmmm… let’s see where that comes from. We can find the error string in the TxnCoordSender, which is sanity-checking the EndTransaction request and asserting that the client hasn’t set any intent spans. It has this though: TODO(tschottdorf): it may be useful to allow this later. - I guess the time has come. Let’s wipe that check out.

@@ -366,12 +389,6 @@ func (tc *TxnCoordSender) Send(
                                        return nil, roachpb.NewErrorf("EndTransaction must not have a Key set")
                                }
                                et.Key = ba.Txn.Key
-                               if len(et.IntentSpans) > 0 {
-                                       // TODO(tschottdorf): it may be useful to allow this later.
-                                       // That would be part of a possible plan to allow txns which
-                                       // write on multiple coordinators.
-                                       return nil, roachpb.NewErrorf("client must not pass intents to EndTransaction")
-                               }
                        }
                }

AND NOW:

root@:26257/dd> CREATE TABLE codelab.series AS SELECT generate_series FROM generate_series(1, 100100);
SELECT 100100
root@:26257/dd> select count(1) from codelab.series;
+----------+
| count(1) |
+----------+
|   100100 |
+----------+
(1 row)

Let’s also do it from an explicit transaction to verify that all is well:

root@:26257/dd> begin;
Now adding input for a multi-line SQL transaction client-side.
Press Enter two times to send the SQL text collected so far to the server, or Ctrl+C to cancel.
             ->
BEGIN
root@:26257/dd  OPEN> CREATE TABLE codelab.series2 AS SELECT generate_series FROM generate_series(1, 100100);
SELECT 100100
root@:26257/dd  OPEN> commit;
COMMIT
root@:26257/dd> SELECT count(1) FROM codelab.series2;
+----------+
| count(1) |
+----------+
|   100100 |
+----------+
(1 row)

Appendix 1: EndTransaction execution and intent resolving

In this codelab, we’ve modified an EndTransaction request to carry a single intent span for a whole table instead of one intent key per table row. This obviously reduces the size of the EndTransaction request itself, which seems like a good thing, but the inquisitive reader might wonder what the downstream costs of evaluating and applying the EndTransaction requests are, and whether we actually accomplished much. In this appendix, we’ll explain what the execution of the EndTransaction entails, and we’ll let readers make up their own mind about the efficacy of the patch we developed.

So, you’re sending an EndTransaction because you want to commit a transaction. What needs to happen for this commit? Each transaction has a transaction record - a record, stored in the database, containing the current status of a transaction (pending/committed/aborted). As we now know, each piece of data written by a transaction is actually an intent; intents point to the transaction record, allowing a reader running into an intent to determine the disposition of the transaction. This is crdb’s design for implementing atomic transaction commit, in the face of transactions touching potentially multiple data ranges. Each data range corresponds to a Raft group, so doing things “atomically” at the level of a range is easy (courtesy of the Raft log), but operations acting across ranges need to agree on a single central data location for coordination.
So, when committing a transaction, the first thing we need to do is mark its transaction record as committed. Now, functionally, a reader running into one of the transaction’s intents will be able to resolve the intent by figuring out that the transaction has been committed, but that’s not quite good enough. This process of resolving an intent involves making a round-trip to the transaction record, which is expensive. What we want is to clean up all of the transaction’s intents (turn them from intents into regular data). And additionally, we want to delete the transaction record itself, as it shouldn’t continue taking up space for no reason. We can’t delete the transaction record until we clean up the intents (otherwise, attempting to resolve an intent whose txn record has been deleted results in erroneously considering the transaction aborted).
So this dictates the operations that need to happen as a result of an EndTransaction:
  1. mark the txn record as committed
  2. asynchronously clean up all the intents
  3. once all intents have been successfully cleaned up, delete the transaction record.
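
The ordering constraint in these three steps can be illustrated with a toy in-memory model. Everything below is a simplification invented for illustration (the store, value and txnStatus types, and the commit and read methods are hypothetical); in particular, intent cleanup runs synchronously here, whereas the text describes it as asynchronous.

```go
package main

import "fmt"

type txnStatus int

const (
	pending txnStatus = iota
	committed
)

// value is a stored value. intentOf names the writing transaction while
// the value is still an intent, and is empty once the intent is resolved.
type value struct {
	data     string
	intentOf string
}

// store holds transaction records and data in two toy maps.
type store struct {
	records map[string]txnStatus
	data    map[string]*value
}

// read returns the value at key and whether a reader may see it. An intent
// is visible only if its txn record says committed; a missing record is
// treated as an aborted transaction.
func (s *store) read(key string) (string, bool) {
	v, ok := s.data[key]
	if !ok {
		return "", false
	}
	if v.intentOf == "" {
		return v.data, true
	}
	status, found := s.records[v.intentOf]
	return v.data, found && status == committed
}

// commit performs the three steps in order.
func (s *store) commit(txnID string) {
	s.records[txnID] = committed // 1. mark the txn record as committed
	for _, v := range s.data {   // 2. clean up the intents
		if v.intentOf == txnID {
			v.intentOf = ""
		}
	}
	delete(s.records, txnID) // 3. only now delete the txn record
}

func main() {
	s := &store{
		records: map[string]txnStatus{"t1": pending},
		data:    map[string]*value{"k1": {data: "v", intentOf: "t1"}},
	}
	_, before := s.read("k1")
	s.commit("t1")
	_, after := s.read("k1")
	fmt.Println(before, after, len(s.records)) // prints false true 0
}
```

In this model, deleting the record before step 2 finishes would make read() treat any remaining intent as belonging to an aborted transaction, which is the hazard the ordering avoids.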

Let’s see the mechanics of how this all happens.

Evaluating the EndTransaction (like the evaluation of all KV requests) happens in replica_command.go. evalEndTransaction() generates the Raft command that, upon application, will mark the txn record as committed. Besides updating the status field of the txn record, this Raft command needs to do something else too: it needs to persist the intent spans inside the txn record itself. Remember that, up until this point, the intent spans were first held in memory, in the TxnCoordSender, and more recently they were part of the EndTransaction request. But moving forward, since the process of cleaning up the intents is async, we need the intent spans persisted in the database. All these updates of the txn record happen in the timidly named updateTxnWithInternalIntents(). It is at this point that we observe an improvement that our patch has made: the Raft command that will carry these updates will only hold one intent span, not one per row. This will make the Raft command stay under the 64MB limit on each Raft proposal that we currently enforce.
Now that we’ve updated the txn record, let’s see what intent cleanup entails. We won’t track the code so much, as this cleanup can be triggered from multiple places: as a direct consequence of evaluating the EndTransaction, but, if that fails, the cleanup is also done completely async through a background processor. Much of the code involved is in the IntentResolver struct, which the EndTransaction evaluation calls into. The cleanup is done through the sending of ResolveIntentRangeRequests; one such request is sent per intent span. As with all range requests, there’s a layer in the code splitting such a request into one per data range - this is the job of the DistSender. So, in the case of our intents laid down by the CREATE TABLE AS … statement, we’re going to be sending one single ResolveIntentRangeRequest that is going to get split by the DistSender into however many ranges the range splitting process has managed to create during the time we’ve been building this new table.
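
The per-range splitting described above can be sketched as a small function. This is not the DistSender implementation: splitByRanges is a hypothetical helper that cuts one span at every range boundary falling strictly inside it, using plain string keys and a sorted list of boundaries as stand-ins for real range descriptors.

```go
package main

import "fmt"

// Span is a simplified stand-in for roachpb.Span: the half-open key
// interval [Key, EndKey).
type Span struct{ Key, EndKey string }

// splitByRanges cuts span at each boundary that falls strictly inside it.
// boundaries must be sorted in ascending order; each one is the start key
// of a range.
func splitByRanges(span Span, boundaries []string) []Span {
	var out []Span
	start := span.Key
	for _, b := range boundaries {
		if b <= start {
			continue // boundary at or before the current start
		}
		if b >= span.EndKey {
			break // boundary at or past the end of the span
		}
		out = append(out, Span{Key: start, EndKey: b})
		start = b
	}
	return append(out, Span{Key: start, EndKey: span.EndKey})
}

func main() {
	// One span for the whole (toy) table 51, which has been split twice.
	table := Span{Key: "t51", EndKey: "t52"}
	parts := splitByRanges(table, []string{"t50", "t51/3", "t51/7", "t53"})
	fmt.Println(len(parts)) // prints 3
	for _, p := range parts {
		fmt.Printf("[%s, %s)\n", p.Key, p.EndKey)
	}
}
```

Each resulting piece corresponds to one per-range request, so the single table-wide span fans out into as many requests as the table has ranges.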

Each ResolveIntentRangeRequest is evaluated by scanning the range in question and constructing a Raft command that deletes the intent metadata from each intent encountered (thus transforming intents into regular data). In our case, there’s going to be one intent per table row, so the size of these Raft commands is proportional to the number of rows in each range. If you believe that the split process manages to keep ranges for our new table around the desired size of each range (64MB), then you can believe that these Raft commands are going to be reasonably sized.

Finally, after all this cleanup happens, the txn record is deleted.

Copyright (C) Cockroach Labs.
Attention: This documentation is provided on an "as is" basis, without warranties or conditions of any kind, either express or implied, including, without limitation, any warranties or conditions of title, non-infringement, merchantability, or fitness for a particular purpose.