[Segment Replication] [BUG] Primary to primary recovery (relocation) breaks with segment conflicts. #5242

Closed
Tracked by #5147
mch2 opened this issue Nov 14, 2022 · 5 comments

mch2 commented Nov 14, 2022

From #4665 & related issues. Primary-to-primary recovery (relocation) operates the same under segment replication as it does today, using peer recovery.

This process works by copying segments out to the new primary, then replaying any operations received during the copy, followed by the relocation handoff. With segment replication, the old primary shard continues to copy segments out to the other replicas during the relocation process. Once the new primary is recovered, it reindexes the replayed operations, which have already been shipped to the replication group inside segments from the old primary. Reindexing them produces segments that conflict with the ones the replicas already hold, so the replicas fail and have to recover again from the new primary.
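
To make the conflict concrete: Lucene derives new segment names from a per-writer counter rendered in base 36, so two writers that flush the same replayed operations starting from the same counter value produce files with identical names but different contents and checksums. A minimal standalone sketch of that naming convention (plain Java, not OpenSearch code; the counter value is illustrative):

    public class SegmentNameSketch {
        // Lucene-style naming: "_" + counter rendered in base 36 (e.g. "_0", "_4e3").
        static String segmentName(long counter) {
            return "_" + Long.toString(counter, Character.MAX_RADIX);
        }

        public static void main(String[] args) {
            long counter = 5692; // assume both the old and the new primary resume from here
            // The old primary flushes replayed ops into "_4e4" and ships it to the
            // replicas; the relocated primary reindexes the same ops and also
            // writes "_4e4", but with different bytes -- a segment conflict.
            System.out.println(segmentName(counter) + " vs " + segmentName(counter));
        }
    }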

Ideas to fix:

  1. Bump the SegmentInfos counter on the new primary so that it writes segments with different names. This means segments containing the same ops will be copied twice. The conflict and the fix are similar to the problem encountered during failover; however, with failover we do not have the luxury of a graceful handoff.
  2. Recover the soon-to-be primary as a replica. Once that completes, block the old primary from copying new segments out to the group and force a segrep event to the new primary. Then recover from the translog any ops received during the segment copy and perform the handoff (see the sketch after this list).
  3. Block the old primary from syncing to the rest of the replication group while the relocation is in progress. This could cause replicas to fall far out of sync with the primary, and after the relocation completes it would trigger expensive syncs.
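
A toy, runnable simulation (plain Java, no OpenSearch or Lucene types; all values are illustrative) of why the ordering in idea 2 avoids the conflict: once the target adopts the source's latest segment counter through a final forced segrep round performed after publication to the group is blocked, any segment it writes from replayed translog ops gets a fresh name instead of colliding with one already shipped out:

    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    public class RelocationConflictSim {
        // Lucene-style segment naming: "_" + counter rendered in base 36.
        static String name(long counter) {
            return "_" + Long.toString(counter, Character.MAX_RADIX);
        }

        public static void main(String[] args) {
            long oldPrimaryCounter = 5691;                    // old primary has written up to "_4e3"
            Set<String> onReplicas = new HashSet<>(List.of(name(oldPrimaryCounter)));

            // Without coordination: the old primary keeps publishing while the
            // target replays the same ops against its stale copy of the counter.
            onReplicas.add(name(++oldPrimaryCounter));        // "_4e4" shipped to the replicas
            long targetCounter = 5691;                        // target's stale view
            String writtenByTarget = name(++targetCounter);   // also "_4e4"
            System.out.println("conflict without sync: " + onReplicas.contains(writtenByTarget));      // true

            // Idea 2: block publication, force one segrep round so the target
            // adopts the source's latest counter, then replay remaining ops.
            targetCounter = oldPrimaryCounter;                // state after the forced sync
            String afterForcedSync = name(++targetCounter);   // "_4e5", no collision
            System.out.println("conflict after forced sync: " + onReplicas.contains(afterForcedSync)); // false
        }
    }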
@dreamer-89

Looking into it

dreamer-89 commented Nov 16, 2022

Of the three available fixes, the third (blocking the old primary from replicating to the group) would cause delays on the replicas and is not being considered. The first (bumping the SegmentInfos counter) is hacky, and identifying an ideal bump value is tricky.

Based on the above, I will follow the second approach for this fix. Below is a rough plan:

  • Write an integration test that mimics this scenario and fails.
  • Understand the existing primary-primary relocation mechanism
    • Understand the different phases of peer recovery
    • Translog recovery and handoff in the primary-primary relocation case
  • Implementation
    • Recover the target primary as a replica initially
    • Inject the SegmentReplicationTargetService into IndexShard to allow forcing a SegRep refresh
    • Track whether SegRep from a given primary is allowed on replicas. This is used to prevent SegRep on replicas from an older primary (see the sketch after this list)
    • Update the peer recovery logic to work with SegRep
  • Add unit tests and ensure the integration test passes
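
A minimal sketch of the "prevent SegRep from an older primary" check in the plan above, using hypothetical names rather than the actual OpenSearch classes: a replica only acts on a published checkpoint whose primary term is at least the newest term it has observed and which is ahead of what it has already processed:

    public class CheckpointGuardSketch {
        // Simplified stand-in for a replication checkpoint: primary term + segments generation.
        record Checkpoint(long primaryTerm, long segmentsGen) {
            boolean isAheadOf(Checkpoint other) {
                return primaryTerm > other.primaryTerm
                    || (primaryTerm == other.primaryTerm && segmentsGen > other.segmentsGen);
            }
        }

        static boolean shouldProcess(Checkpoint received, Checkpoint lastProcessed, long highestSeenPrimaryTerm) {
            // Reject anything published by a primary older than the newest term
            // seen; this keeps the old primary from driving segrep on replicas
            // once the relocation handoff has completed.
            if (received.primaryTerm() < highestSeenPrimaryTerm) {
                return false;
            }
            return received.isAheadOf(lastProcessed);
        }

        public static void main(String[] args) {
            Checkpoint lastProcessed = new Checkpoint(3, 15);
            System.out.println(shouldProcess(new Checkpoint(3, 16), lastProcessed, 4)); // false: stale primary
            System.out.println(shouldProcess(new Checkpoint(4, 16), lastProcessed, 4)); // true: new primary
        }
    }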

dreamer-89 commented Nov 18, 2022

With the integration test below (thanks @Rishikesh1159 for sharing it), the segment conflict occurs post relocation, when the new primary indexes a doc and a segment replication event is triggered on the older replica. The diagram below shows the different steps of the relocation journey. No indexing operation is performed during the relocation in the test case below. We need to identify how solution 2 handles each of these cases:

  1. No doc indexed during relocation (covered by the IT below).
  2. New doc indexed during phase 1.
  3. New doc indexed during phase 2.
  4. New doc indexed during handoff (this is currently blocked to avoid inconsistency).

    public void testSimplePrimaryRelocationWithoutFlushBeforeRelocation() throws Exception {
        logger.info("--> starting [node1] ...");
        final String node_1 = internalCluster().startNode();

        logger.info("--> creating test index ...");
        prepareCreate(INDEX_NAME, Settings.builder().put("index.number_of_shards", 1)
            .put(IndexModule.INDEX_QUERY_CACHE_ENABLED_SETTING.getKey(), false)
            .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
            .put("index.number_of_replicas", 1)).get();

        final String node_2 = internalCluster().startNode();

        ensureGreen(INDEX_NAME);

        logger.info("--> index 1 doc");
        client().prepareIndex(INDEX_NAME).setId("1").setSource("foo", "bar").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get();
        waitForReplicaUpdate();
        logger.info("--> verifying count");
        assertThat(client(node_1).prepareSearch(INDEX_NAME).setSize(0).setPreference("_local").execute().actionGet().getHits().getTotalHits().value, equalTo(1L));
        assertThat(client(node_2).prepareSearch(INDEX_NAME).setSize(0).setPreference("_local").execute().actionGet().getHits().getTotalHits().value, equalTo(1L));


        // If we do a flush before relocation, this test will pass.
//        flush(INDEX_NAME);


        logger.info("--> start another node");
        final String node_3 = internalCluster().startNode();
        ClusterHealthResponse clusterHealthResponse = client().admin()
            .cluster()
            .prepareHealth()
            .setWaitForEvents(Priority.LANGUID)
            .setWaitForNodes("3")
            .execute()
            .actionGet();
        assertThat(clusterHealthResponse.isTimedOut(), equalTo(false));

        logger.info("--> relocate the shard from node1 to node3");

        client().admin().cluster().prepareReroute().add(new MoveAllocationCommand(INDEX_NAME, 0, node_1, node_3)).execute().actionGet();
        clusterHealthResponse = client().admin()
            .cluster()
            .prepareHealth()
            .setWaitForEvents(Priority.LANGUID)
            .setWaitForNoRelocatingShards(true)
            .setTimeout(ACCEPTABLE_RELOCATION_TIME)
            .execute()
            .actionGet();
        assertThat(clusterHealthResponse.isTimedOut(), equalTo(false));

        logger.info("--> Done relocating the shard from node1 to node3");

        client().prepareIndex(INDEX_NAME).setId("2").setSource("bar", "baz").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get(); --> causes segment file conflicts
        waitForReplicaUpdate();
        logger.info("--> verifying count again...");
        client().admin().indices().prepareRefresh().execute().actionGet();
        assertThat(client().prepareSearch(INDEX_NAME).setSize(0).execute().actionGet().getHits().getTotalHits().value, equalTo(2L));
    }

[Attachment: 5242.drawio (diagram of the relocation steps)]

dreamer-89 commented Dec 31, 2022

Tried out the changes on a 3gb data set with 3 data nodes. Force-merged the segments down to 1 and saw the segrep round take 3472ms during the relocation handoff, which made finalizing the recovery take about 3.5s (see logs below). Unfortunately, I am getting persistent OOM heap errors even on a beefy new EC2 host (c5.24xlarge), so I couldn't run with different segment counts.

[2022-12-30T18:21:47,946][INFO ][o.o.i.r.PeerRecoverySourceService] [runTask-0] [geonames][0] starting recovery to {runTask-1}{J8UB_CreR3eLksYcTYTarg}{TaFmmo3lQayop7BDCQpNmw}{127.0.0.1}{127.0.0.1:9301}{dimr}{testattr=test, shard_indexing_pressure_enabled=true}
[2022-12-30T18:21:47,931][INFO ][o.o.i.s.IndexShard       ] [runTask-1] [geonames][0] [geonames][0] writing shard state, reason [initial state with allocation id [[id=Ky7cfzVzQZ6u4-Lk_01-bQ, rId=UYXqn-CvTQ6yxYd8S02iog]]]
[2022-12-30T18:21:47,951][INFO ][o.o.i.r.RecoverySourceHandler] [runTask-0] [geonames][0][recover to runTask-1] recovery [phase1]: recovering [_4e3.nvd], does not exist in remote
[2022-12-30T18:21:47,942][DEBUG][o.o.c.c.C.CoordinatorPublication] [runTask-1] publication ended successfully: Publication{term=4, version=37}
[2022-12-30T18:21:47,951][INFO ][o.o.i.r.RecoverySourceHandler] [runTask-0] [geonames][0][recover to runTask-1] recovery [phase1]: recovering [_4e3_Lucene90_0.tip], does not exist in remote
[2022-12-30T18:21:47,942][INFO ][o.o.i.r.PeerRecoveryTargetService] [runTask-1] [geonames][0] preparing shard for peer recovery
[2022-12-30T18:21:47,951][INFO ][o.o.i.r.RecoverySourceHandler] [runTask-0] [geonames][0][recover to runTask-1] recovery [phase1]: recovering [_4e3_Lucene90_0.dvm], does not exist in remote
[2022-12-30T18:21:47,943][INFO ][o.o.i.s.IndexShard       ] [runTask-1] [geonames][0] skip local recovery as no index commit found
[2022-12-30T18:21:47,951][INFO ][o.o.i.r.RecoverySourceHandler] [runTask-0] [geonames][0][recover to runTask-1] recovery [phase1]: recovering [_4e3_Lucene90_0.doc], does not exist in remote
[2022-12-30T18:21:47,943][INFO ][o.o.i.r.PeerRecoveryTargetService] [runTask-1] [geonames][0] collecting local files for [{runTask-0}{_66hjBZaQaGdr4eM81BbUg}{b2yeMvYhRvmcxjeNVUseBg}{127.0.0.1}{127.0.0.1:9300}{dimr}{testattr=test, shard_indexing_pressure_enabled=true}]
[2022-12-30T18:21:47,952][INFO ][o.o.i.r.RecoverySourceHandler] [runTask-0] [geonames][0][recover to runTask-1] recovery [phase1]: recovering [_4e3_Lucene90_0.tim], does not exist in remote
[2022-12-30T18:21:47,944][INFO ][o.o.i.r.PeerRecoveryTargetService] [runTask-1] [geonames][0] [1] shard folder empty, recovering all files
[2022-12-30T18:21:47,952][INFO ][o.o.i.r.RecoverySourceHandler] [runTask-0] [geonames][0][recover to runTask-1] recovery [phase1]: recovering [_4e3.nvm], does not exist in remote
[2022-12-30T18:21:47,944][INFO ][o.o.i.r.PeerRecoveryTargetService] [runTask-1] [geonames][0] local file count [0]
[2022-12-30T18:21:47,952][INFO ][o.o.i.r.RecoverySourceHandler] [runTask-0] [geonames][0][recover to runTask-1] recovery [phase1]: recovering [_4e3.fnm], does not exist in remote
[2022-12-30T18:21:47,945][INFO ][o.o.i.r.PeerRecoveryTargetService] [runTask-1] [geonames][0] starting recovery from {runTask-0}{_66hjBZaQaGdr4eM81BbUg}{b2yeMvYhRvmcxjeNVUseBg}{127.0.0.1}{127.0.0.1:9300}{dimr}{testattr=test, shard_indexing_pressure_enabled=true}
[2022-12-30T18:21:47,952][INFO ][o.o.i.r.RecoverySourceHandler] [runTask-0] [geonames][0][recover to runTask-1] recovery [phase1]: recovering [_4e3.kdd], does not exist in remote
[2022-12-30T18:21:47,952][INFO ][o.o.i.r.RecoverySourceHandler] [runTask-0] [geonames][0][recover to runTask-1] recovery [phase1]: recovering [_4e3.si], does not exist in remote
[2022-12-30T18:21:47,952][INFO ][o.o.i.r.RecoverySourceHandler] [runTask-0] [geonames][0][recover to runTask-1] recovery [phase1]: recovering [_4e3_Lucene90_0.tmd], does not exist in remote
[2022-12-30T18:21:47,952][INFO ][o.o.i.r.RecoverySourceHandler] [runTask-0] [geonames][0][recover to runTask-1] recovery [phase1]: recovering [_4e3.fdm], does not exist in remote
[2022-12-30T18:21:47,952][INFO ][o.o.i.r.RecoverySourceHandler] [runTask-0] [geonames][0][recover to runTask-1] recovery [phase1]: recovering [_4e3.kdi], does not exist in remote
[2022-12-30T18:21:47,952][INFO ][o.o.i.r.RecoverySourceHandler] [runTask-0] [geonames][0][recover to runTask-1] recovery [phase1]: recovering [_4e3.fdt], does not exist in remote
[2022-12-30T18:21:47,952][INFO ][o.o.i.r.RecoverySourceHandler] [runTask-0] [geonames][0][recover to runTask-1] recovery [phase1]: recovering [_4e3_Lucene90_0.pos], does not exist in remote
[2022-12-30T18:21:47,952][INFO ][o.o.i.r.RecoverySourceHandler] [runTask-0] [geonames][0][recover to runTask-1] recovery [phase1]: recovering [_4e3.kdm], does not exist in remote
[2022-12-30T18:21:47,952][INFO ][o.o.i.r.RecoverySourceHandler] [runTask-0] [geonames][0][recover to runTask-1] recovery [phase1]: recovering [_4e3.fdx], does not exist in remote
[2022-12-30T18:21:47,952][INFO ][o.o.i.r.RecoverySourceHandler] [runTask-0] [geonames][0][recover to runTask-1] recovery [phase1]: recovering [_4e3_Lucene90_0.dvd], does not exist in remote
[2022-12-30T18:21:47,953][INFO ][o.o.i.r.RecoverySourceHandler] [runTask-0] [geonames][0][recover to runTask-1] recovery [phase1]: recovering [segments_f], does not exist in remote
[2022-12-30T18:21:47,953][INFO ][o.o.i.r.RecoverySourceHandler] [runTask-0] [geonames][0][recover to runTask-1] recovery [phase1]: recovering_files [18] with total_size [2.7gb], reusing_files [0] with total_size [0b]
[2022-12-30T18:22:06,245][INFO ][o.o.i.s.IndexShard       ] [runTask-0] [geonames][0] background syncing retention leases [RetentionLeases{primaryTerm=3, version=82, leases={peer_recovery/_66hjBZaQaGdr4eM81BbUg=RetentionLease{id='peer_recovery/_66hjBZaQaGdr4eM81BbUg', retainingSequenceNumber=11396503, timestamp=1672421548475, source='peer recovery'}, peer_recovery/ZFdsWwXnQPaYw9zQCck9Eg=RetentionLease{id='peer_recovery/ZFdsWwXnQPaYw9zQCck9Eg', retainingSequenceNumber=11396503, timestamp=1672423345358, source='peer recovery'}}}] after expiration check
[2022-12-30T18:22:36,247][INFO ][o.o.i.s.IndexShard       ] [runTask-0] [geonames][0] background syncing retention leases [RetentionLeases{primaryTerm=3, version=82, leases={peer_recovery/_66hjBZaQaGdr4eM81BbUg=RetentionLease{id='peer_recovery/_66hjBZaQaGdr4eM81BbUg', retainingSequenceNumber=11396503, timestamp=1672421548475, source='peer recovery'}, peer_recovery/ZFdsWwXnQPaYw9zQCck9Eg=RetentionLease{id='peer_recovery/ZFdsWwXnQPaYw9zQCck9Eg', retainingSequenceNumber=11396503, timestamp=1672423345358, source='peer recovery'}}}] after expiration check
[2022-12-30T18:22:58,061][INFO ][o.o.i.r.RecoverySourceHandler] [runTask-0] [geonames][0][recover to runTask-1] cloning primary's retention lease
[2022-12-30T18:22:58,067][INFO ][o.o.i.r.RecoverySourceHandler] [runTask-0] [geonames][0][recover to runTask-1] cloned primary's retention lease as [RetentionLease{id='peer_recovery/J8UB_CreR3eLksYcTYTarg', retainingSequenceNumber=11396503, timestamp=1672424577973, source='peer recovery'}]
[2022-12-30T18:22:58,142][INFO ][o.o.i.r.RecoverySourceHandler] [runTask-0] [geonames][0][recover to runTask-1] recovery [phase1]: took [0s]
[2022-12-30T18:22:58,145][INFO ][o.o.i.r.RecoverySourceHandler] [runTask-0] [geonames][0][recover to runTask-1] recovery [phase1]: prepare remote engine for translog
[2022-12-30T18:22:58,204][INFO ][o.o.i.r.SegmentReplicationTarget] [runTask-1] [geonames][0] [shardId 0] Replica starting replication [id 2]
[2022-12-30T18:22:58,181][INFO ][o.o.i.r.RecoverySourceHandler] [runTask-0] [geonames][0][recover to runTask-1] recovery [phase1]: remote engine start took [36.1ms]
[2022-12-30T18:22:58,213][INFO ][o.o.i.r.SegmentReplicationTarget] [runTask-1] [geonames][0] Replication diff RecoveryDiff{identical=[name [_4e3.nvd], length [71085134], checksum [30wnt9], writtenBy [9.5.0], name [_4e3_Lucene90_0.tip], length [12739182], checksum [a5f4c], writtenBy [9.5.0], name [_4e3_Lucene90_0.dvm], length [3312], checksum [1hw81ay], writtenBy [9.5.0], name [_4e3_Lucene90_0.doc], length [333209690], checksum [c5znp9], writtenBy [9.5.0], name [_4e3_Lucene90_0.tim], length [501129690], checksum [1akj3ua], writtenBy [9.5.0], name [_4e3.nvm], length [535], checksum [11ui58n], writtenBy [9.5.0], name [_4e3.fnm], length [4448], checksum [tac9zs], writtenBy [9.5.0], name [_4e3.kdd], length [202634252], checksum [est16h], writtenBy [9.5.0], name [_4e3.si], length [603], checksum [bdefyh], writtenBy [9.5.0], name [_4e3_Lucene90_0.tmd], length [2759], checksum [792l51], writtenBy [9.5.0], name [_4e3.fdm], length [2008], checksum [167u17q], writtenBy [9.5.0], name [_4e3.kdi], length [447867], checksum [5mqwzi], writtenBy [9.5.0], name [_4e3.fdt], length [1165257249], checksum [w6c6q7], writtenBy [9.5.0], name [_4e3_Lucene90_0.pos], length [52197628], checksum [1vwx6db], writtenBy [9.5.0], name [_4e3.kdm], length [412], checksum [1aqd8pn], writtenBy [9.5.0], name [_4e3.fdx], length [184379], checksum [ekorv3], writtenBy [9.5.0], name [_4e3_Lucene90_0.dvd], length [599322206], checksum [13axavd], writtenBy [9.5.0]], different=[], missing=[]}
[2022-12-30T18:22:58,183][INFO ][o.o.i.r.RecoverySourceHandler] [runTask-0] [geonames][0][recover to runTask-1] recovery [phase2]: sending transaction log operations (from [11396503] to [11396502]
[2022-12-30T18:22:58,188][INFO ][o.o.i.r.RecoverySourceHandler] [runTask-0] [geonames][0][recover to runTask-1] recovery [phase2]: took [5ms]
[2022-12-30T18:22:58,188][INFO ][o.o.i.r.RecoverySourceHandler] [runTask-0] [geonames][0][recover to runTask-1] finalizing recovery
[2022-12-30T18:22:58,195][INFO ][o.o.i.r.RecoverySourceHandler] [runTask-0] [geonames][0][recover to runTask-1] performing relocation hand-off
[2022-12-30T18:22:58,208][INFO ][o.o.i.r.SegmentReplicationSourceService] [runTask-0] [replication id 2] Source node sent checkpoint info [ReplicationCheckpoint{shardId=[geonames][0], primaryTerm=4, segmentsGen=15, seqNo=11396502, version=11414}] to target node [J8UB_CreR3eLksYcTYTarg], timing: 1
[2022-12-30T18:23:01,742][INFO ][o.o.i.r.RecoverySourceHandler] [runTask-0] [geonames][0][recover to runTask-1] finalizing recovery took [3.5s]
[2022-12-30T18:23:01,678][INFO ][o.o.i.r.RecoveryTarget   ] [runTask-1] [geonames][0] [shardId 0] [replication id 2] Replication complete, timing data: [Tuple [v1=INIT, v2=1], Tuple [v1=REPLICATING, v2=0], Tuple [v1=GET_CHECKPOINT_INFO, v2=4], Tuple [v1=FILE_DIFF, v2=4], Tuple [v1=GET_FILES, v2=1], Tuple [v1=FINALIZE_REPLICATION, v2=3461], Tuple [v1=OVERALL, v2=3472]]
ubuntu@ip-172-31-41-165:~$ curl localhost:9200/_cat/shards?v
index    shard prirep state       docs store ip        node
geonames 0     r      STARTED 11396503 2.7gb 127.0.0.1 runTask-2
geonames 0     p      STARTED 11396503 2.7gb 127.0.0.1 runTask-0
ubuntu@ip-172-31-41-165:~$ curl localhost:9200/_cat/segments?v
index    shard prirep ip        segment generation docs.count docs.deleted  size size.memory committed searchable version compound
geonames 0     r      127.0.0.1 _4e3          5691   11396503            0 2.7gb           0 true      true       9.5.0   false
geonames 0     p      127.0.0.1 _4e3          5691   11396503            0 2.7gb           0 true      true       9.5.0   false

@dreamer-89

Closing as #5344 is merged.
