From 4cee07d8a97bb0663e7bfbc3f2e1fbf539140adf Mon Sep 17 00:00:00 2001
From: Judah Schvimer <judah@mongodb.com>
Date: Mon, 21 May 2018 18:52:55 -0400
Subject: [PATCH] SERVER-35113 Allow single voting primaries to advance stable
 timestamp even when last applied does not advance

---
 .../replsets/storage_commit_out_of_order.js   | 74 +++++++++++++++++++
 src/mongo/db/repl/oplog.cpp                   | 11 +++
 .../db/repl/replication_coordinator_impl.cpp  |  5 ++
 3 files changed, 90 insertions(+)
 create mode 100644 jstests/replsets/storage_commit_out_of_order.js

diff --git a/jstests/replsets/storage_commit_out_of_order.js b/jstests/replsets/storage_commit_out_of_order.js
new file mode 100644
index 00000000000..d856d47cf7d
--- /dev/null
+++ b/jstests/replsets/storage_commit_out_of_order.js
@@ -0,0 +1,74 @@
+/**
+ * Tests that single voting primaries can commit majority writes when they storage-commit out of
+ * order. The test first inserts a document to set the last applied optime, the all committed
+ * timestamp, and the stable timestamp. It then spawns 'n' threads and holds them behind a
+ * barrier. Once all of the threads are waiting at the barrier, each thread does a w:majority
+ * insert. A fail point makes the first thread to receive an optime from the optime generator
+ * sleep for a few seconds while the other threads get later optimes and storage-commit their
+ * inserts. The hung thread is then released and asserts that its write concern can still be
+ * satisfied.
+ */
+(function() {
+    'use strict';
+
+    load('jstests/libs/parallelTester.js');
+
+    const rst = new ReplSetTest({nodes: 1});
+    rst.startSet();
+    rst.initiate();
+    const dbName = 'storage_commit_out_of_order';
+    const collName = 'foo';
+    const numThreads = 2;
+    const primary = rst.getPrimary();
+    const coll = primary.getDB(dbName).getCollection(collName);
+
+    /**
+     * Counts down the provided latch, busy-waits for it to reach zero, and then does a single
+     * w:majority insert.
+     */
+    const majorityInsert = function(num, host, dbName, collName, latch) {
+        const m = new Mongo(host);
+        latch.countDown();
+        while (latch.getCount() > 0) {
+            // Busy-wait until the main thread performs the final countDown().
+        }
+        return m.getDB(dbName).runCommand({
+            insert: collName,
+            documents: [{b: num}],
+            writeConcern: {w: 'majority', wtimeout: ReplSetTest.kDefaultTimeoutMS}
+        });
+    };
+
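+    // Increase replication log verbosity to aid debugging if the test fails.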
+    assert.commandWorked(primary.setLogLevel(2, 'replication'));
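+    // Do an initial majority write to set the last applied optime, the all committed timestamp,
+    // and the stable timestamp.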
+    assert.commandWorked(coll.insert(
+        {a: 1}, {writeConcern: {w: 'majority', wtimeout: ReplSetTest.kDefaultTimeoutMS}}));
+
+    // Turn on a fail point so that the first thread to receive an optime from the optime
+    // generator waits a few seconds before storage-committing its insert.
+    assert.commandWorked(primary.adminCommand({
+        configureFailPoint: 'sleepBetweenInsertOpTimeGenerationAndLogOp',
+        mode: {times: 1},
+        data: {waitForMillis: 3000}
+    }));
+
+    // Start a bunch of threads. They will block waiting on the latch to hit 0.
+    const t = [];
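+    // The latch is sized at numThreads + 1 so that the main thread performs the final
+    // countDown() after it has seen every worker thread reach the barrier.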
+    const counter = new CountDownLatch(numThreads + 1);
+    for (let i = 0; i < numThreads; ++i) {
+        t[i] = new ScopedThread(majorityInsert, i, coll.getMongo().host, dbName, collName, counter);
+        t[i].start();
+    }
+
+    // Release the threads with the latch once they are all blocked on it.
+    jsTestLog('All threads started.');
+    assert.soon(() => counter.getCount() === 1);
+    jsTestLog('All threads at barrier.');
+    counter.countDown();
+    jsTestLog('All threads finishing.');
+
+    // Wait for all threads to complete and ensure they succeeded.
+    for (let i = 0; i < numThreads; ++i) {
+        t[i].join();
+        assert.commandWorked(t[i].returnData());
+    }
+
+    rst.stopSet();
+}());
\ No newline at end of file
diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp
index bbeebdc85f9..d50da9506d1 100644
--- a/src/mongo/db/repl/oplog.cpp
+++ b/src/mongo/db/repl/oplog.cpp
@@ -111,6 +111,9 @@ using IndexVersion = IndexDescriptor::IndexVersion;
 
 namespace repl {
 namespace {
+
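+// Fail point used by jstests/replsets/storage_commit_out_of_order.js to force out-of-order
+// storage commits of concurrent inserts.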
+MONGO_FP_DECLARE(sleepBetweenInsertOpTimeGenerationAndLogOp);
+
 /**
  * The `_localOplogCollection` pointer is always valid (or null) because an
  * operation must take the global exclusive lock to set the pointer to null when
@@ -526,6 +529,14 @@ std::vector<OpTime> logInsertOps(OperationContext* opCtx,
         opTimes.push_back(insertStatementOplogSlot.opTime);
     }
 
+    MONGO_FAIL_POINT_BLOCK(sleepBetweenInsertOpTimeGenerationAndLogOp, customWait) {
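+        // Sleep after this batch's optimes have been generated but before its oplog entries are
+        // written, so that concurrent batches assigned later optimes can commit first.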
+        const BSONObj& data = customWait.getData();
+        auto numMillis = data["waitForMillis"].numberInt();
+        log() << "Sleeping for " << numMillis << "ms after receiving " << count << " optimes from "
+              << opTimes.front() << " to " << opTimes.back();
+        sleepmillis(numMillis);
+    }
+
     std::unique_ptr<DocWriter const* []> basePtrs(new DocWriter const*[count]);
     for (size_t i = 0; i < count; i++) {
         basePtrs[i] = &writers[i];
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp
index 88d3e717419..d16dfcaa613 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl.cpp
@@ -1046,6 +1046,11 @@ void ReplicationCoordinatorImpl::setMyLastAppliedOpTimeForward(const OpTime& opT
     if (opTime > _getMyLastAppliedOpTime_inlock()) {
         _setMyLastAppliedOpTime_inlock(opTime, false, consistency);
         _reportUpstream_inlock(std::move(lock));
+    } else if (consistency == DataConsistency::Consistent && _canAcceptNonLocalWrites &&
+               _rsConfig.getWriteMajority() == 1) {
+        // Single vote primaries may have a lagged stable timestamp due to paring back the stable
+        // timestamp to the all committed timestamp. When storage commits happen out of order, the
+        // all committed timestamp can advance without the last applied optime advancing, so
+        // recalculate the stable timestamp here so that waiting w:majority writes can be
+        // satisfied.
+        _setStableTimestampForStorage_inlock();
     }
 }
 
-- 
GitLab