1クール続けるブログ

とりあえず1クール続けるソフトウェアエンジニアの備忘録

MongoDBのReplicaSetでInitial Syncを実行するか判断している処理を追う

記事一覧はこちら

背景・モチベーション

MongoDBサポートの方から、MongoDB ReplicaSetのSecondaryをリストアする方法は2つあると聞きました。
1つはInitial Syncを利用してSecondayをseedする方法、もう1つはファイルを直接コピーしてSecondaryをseedする方法です。
後者はネットワークを介さずに直接データをコピーしてくるので非常に高速です。AWSであれば、EBSスナップショットからボリュームを作りattachするだけで良いので楽ですしね。
後者はInitial Syncが走らないわけですが、どのようにInitial Syncを行わない判定をしているのか非常に気になりました。
雰囲気でしかC++を読めないですが、処理を追っていこうと思います。

docs.mongodb.com

ちなみにMongoDB Universityを利用してMongoDBに入門した記事は下記です。

44smkn.hatenadiary.com

Initial Syncとは

docs.mongodb.com

データセットを最新の状態に維持するために、ReplicaSetのSecondaryメンバは他のメンバからデータを同期・複製する必要があります。
MongoDBでのデータの同期は2つの形式があります。1つが今回取り上げるInitial Syncで、もう一つがReplicationです。
前者は、新しいメンバに対してすべてのデータセットを与えます。そしてデータセットに対して継続的に変更を適用するのが後者です。

Initial Syncの実装方法は、Logical Initial SyncFile Copy Based Initial Syncの2つです。
後者は、SERVER-57803 を見る限り、v5.1.0からEnterprise Serverのみに実装されたようです。feature flagがtrueになるのもSERVER-52337から察するにv5.2.0からみたいなので、かなり新しい機能のようです。

Initial Sync自体の処理に関しては、かなりドキュメントが整備されています。

github.com

今回のInitial Syncを行うかという判断に関しては、実装方法に関わらない共通の処理でしたので処理が非常に追いやすいはずでした。
C++をあまりに雰囲気で読みすぎて時間がかかってしまった…。

データをコピーしてSecondaryのメンバをseedする処理

Restore a Replica Set from MongoDB Backups — MongoDB Manual

  1. ファイルシステムのスナップショットからデータベースのファイル群を取得する
  2. Standaloneでmongodを起動する
  3. Local DBを削除して一度シャットダウンする
  4. シングルノードの ReplicaSetとして起動する
  5. PrimaryのdbPath配下のファイルをSecondaryにコピーする、つまりLocalDBを一度削除して新しく作成されたReplicaSetのメタデータを保持している状態
  6. Secondaryを ReplicaSetに追加する

Initial Syncを実行するか判断する処理を追う

コードは読みやすいようにいくつか改変を加えています。

Initial Syncを実行するかどうかを明示的に判断している処理

Initial Syncを実行するかを判断しているのはreplication_coordinator_impl.cpp#L836-L852の中にある const auto needsInitialSync = lastOpTime.isNull() || _externalState->isInitialSyncFlagSet(opCtx); という条件ですが後者は実行時の設定次第で変わるようなので、lastOpTime.isNull()の結果が肝要ではと推測します。
ではどのようにこの値を取得しているのかが気になってきます。

// replication_coordinator_impl.cpp#L836-L852
void ReplicationCoordinatorImpl::_startDataReplication(OperationContext* opCtx) {
    // Check to see if we need to do an initial sync.
    const auto lastOpTime = getMyLastAppliedOpTime();
    const auto needsInitialSync =
        lastOpTime.isNull() || _externalState->isInitialSyncFlagSet(opCtx);
    if (!needsInitialSync) {
        LOGV2(4280512, "No initial sync required. Attempting to begin steady replication");
        // Start steady replication, since we already have data.
        // (omitted by author...)
        return;
    }
}

エントリポイントから追っていき、どのようにlastOpTimeが設定されているか探る

まず mongodのエントリポイントを確認します。
mongod.cppで呼ばれている関数を追っていくと、mongod_main.cpp#L707にて、replCoord->startup(startupOpCtx.get(), lastShutdownState); という処理が見つかります。
どうやらここで ReplicaSet関連の処理が呼ばれているようです。

// mongod.cpp
int main(int argc, char* argv[]) {
    mongo::quickExit(mongo::mongod_main(argc, argv));
}
---
// mongod_main.cpp#L707
ExitCode _initAndListen(ServiceContext* serviceContext, int listenPort) {
   if (!storageGlobalParams.readOnly) {
        auto replCoord = repl::ReplicationCoordinator::get(startupOpCtx.get());
        replCoord->startup(startupOpCtx.get(), lastShutdownState);
    }
}

呼ばれているstartup関数の中ではreplication_coordinator_impl.cpp#L929にてローカルストレージからレプリケーション設定の読み込みが宣言されているようです。
もし有効な設定であれば、replication_coordinator_impl.cpp#L683のようにコールバックでスケジュールされる _finishLoadLocalConfig 関数の中で、_setMyLastAppliedOpTimeAndWallTime()が呼ばれてoptimeを設定しています。
ただし、これが設定されるのはoptimeのエントリがLocalDBにある場合のみです。
replication_coordinator_external_state_impl.cpp#L786-L813 の関数が呼ばれていて、それを見るとLocalDBのoplog.rsの最新のエントリを取得していることが分かります。

// replication_coordinator_impl.cpp#L866-L867
void ReplicationCoordinatorImpl::startup(OperationContext* opCtx,
                                         StorageEngine::LastShutdownState lastShutdownState) {
    bool doneLoadingConfig = _startLoadLocalConfig(opCtx, lastShutdownState);
}
---
// replication_coordinator_impl.cpp#L683
void ReplicationCoordinatorImpl::_finishLoadLocalConfig(
    const executor::TaskExecutor::CallbackArgs& cbData,
    const ReplSetConfig& localConfig,
    const StatusWith<OpTimeAndWallTime>& lastOpTimeAndWallTimeStatus,
    const StatusWith<LastVote>& lastVoteStatus) {

    OpTimeAndWallTime lastOpTimeAndWallTime = OpTimeAndWallTime();
    if (!isArbiter) {
        if (lastOpTimeAndWallTimeStatus.isOK()) {
            lastOpTimeAndWallTime = lastOpTimeAndWallTimeStatus.getValue();
        }
    }

    const auto lastOpTime = lastOpTimeAndWallTime.opTime;
    // Set our last applied and durable optimes to the top of the oplog, if we have one.
    if (!lastOpTime.isNull()) {
        _setMyLastAppliedOpTimeAndWallTime(lock, lastOpTimeAndWallTime, isRollbackAllowed);
    } 
}
---
// replication_coordinator_external_state_impl.cpp#L786-L813
StatusWith<OpTimeAndWallTime> ReplicationCoordinatorExternalStateImpl::loadLastOpTimeAndWallTime(
    OperationContext* opCtx) {
    try {
if (!writeConflictRetry(
                opCtx, "Load last opTime", NamespaceString::kRsOplogNamespace.ns().c_str(), [&] {
                    return Helpers::getLast(
                        opCtx, NamespaceString::kRsOplogNamespace.ns().c_str(), oplogEntry);
                })) { /* ... */ }
    }
}
---
// namespace_string.cpp
const NamespaceString NamespaceString::kRsOplogNamespace(NamespaceString::kLocalDb, "oplog.rs");
---
// namespace_string.h
class NamespaceString {
    // Namespace for the local database
    static constexpr StringData kLocalDb = "local"_sd;
}

まとめ

実際にコードを追っていくと、ドキュメントに書いてある手順も非常に腑に落ちていいですね。
ファイルの読み込みをしてオブジェクトにmapして、そのメンバの値によって処理を行うという一連の流れを追えたのも良かったです。