
[Segment Replication] [BUG] Missing documents post ingestion during primary failover and relocation #5946

Closed
dreamer-89 opened this issue Jan 19, 2023 · 5 comments

@dreamer-89

dreamer-89 commented Jan 19, 2023

Describe the bug
While testing #5898 on an actual 3-node cluster, I found that the searchable doc count does not match the ingested doc count. I observed the same with primary relocation (I backported #5344 & #5898 to test relocation behavior). Cutting a single issue as I believe the underlying cause is the same for both.

Failover Case

To Reproduce

  1. Create an index with segrep enabled
curl -X PUT "http://localhost:9200/test-index" -H 'Content-Type: application/json' -d '{
  "settings": {
    "index": {
      "number_of_shards": 1,
      "number_of_replicas": 1,
      "replication.type": "SEGMENT"
    }
  }
}'
  2. Ingest documents via the script below. Run it in multiple terminal tabs; I used 3.
for i in {1..20000}
do
   curl --location --request POST "localhost:9200/test-index/_doc" \
    --header 'Content-Type: application/json' \
    --data-raw "{
      \"name\":\"abc${i}\"
    }"
    echo "\n"
done
  3. Verify that 1 primary and 1 replica were created
[root@ip-10-0-4-169 opensearch]# curl -X GET "localhost:9200/_cat/shards?v"
index      shard prirep state   docs store ip         node
test-index 0     p      STARTED    0  208b 10.0.5.122 ip-10-0-5-122.us-west-2.compute.internal
test-index 0     r      STARTED    0  208b 10.0.3.198 ip-10-0-3-198.us-west-2.compute.internal
  4. Kill the node containing the primary, which instantly promotes the replica to primary
[root@ip-10-0-5-122 ~]# ps -aux | grep opensearch
[root@ip-10-0-5-122 ~]# kill 2945

[root@ip-10-0-4-169 opensearch]# curl -X GET "localhost:9200/_cat/shards?v"
index      shard prirep state      docs   store ip         node
test-index 0     p      STARTED    2980 174.7kb 10.0.3.198 ip-10-0-3-198.us-west-2.compute.internal
test-index 0     r      UNASSIGNED
  5. Wait for ingestion to complete. The searchable doc count is less than the ingested count; index refresh and flush have no impact on the count.
[root@ip-10-0-4-169 opensearch]# curl -X GET "localhost:9200/_cat/shards?v"
index      shard prirep state    docs store ip         node
test-index 0     p      STARTED 59997 2.3mb 10.0.3.198 ip-10-0-3-198.us-west-2.compute.internal
test-index 0     r      STARTED 59997 3.1mb 10.0.4.82  ip-10-0-4-82.us-west-2.compute.internal

[root@ip-10-0-4-169 opensearch]# curl -X POST localhost:9200/test-index/_refresh
{"_shards":{"total":2,"successful":2,"failed":0}}[root@ip-10-0-4-169 opensearch]#
[root@ip-10-0-4-169 opensearch]# curl -X GET "localhost:9200/_cat/shards?v"
index      shard prirep state    docs store ip         node
test-index 0     p      STARTED 59997 2.2mb 10.0.3.198 ip-10-0-3-198.us-west-2.compute.internal
test-index 0     r      STARTED 59997 2.2mb 10.0.4.82  ip-10-0-4-82.us-west-2.compute.internal

Expected behavior
All ingested documents should be searchable

Host/Environment (please complete the following information):

Additional context

  • Setup: 3 data nodes (c5.xlarge), 1 master node (c5.xlarge)
dreamer-89 added the bug and untriaged labels on Jan 19, 2023
dreamer-89 changed the title from "[Segment Replication] [BUG] Missing documents post ingestion with primary failover and relocation" to "[Segment Replication] [BUG] Missing documents post ingestion during primary failover and relocation" on Jan 19, 2023
@dreamer-89

Tried this with the latest changes on main and it is reproducible.

@dreamer-89

Had an internal discussion with @mch2 and others where we decided to first rule out a data loss issue. Performed another round of testing and indexed one more document after the repro steps above, but that did not resolve the mismatch. Investigating further.

@dreamer-89

dreamer-89 commented Jan 22, 2023

This issue reproduces fairly consistently with the integration test below.

    public void testConcurrentIngestion() throws Exception {
        final String primary = internalCluster().startNode();
        createIndex(INDEX_NAME);
        final String replica = internalCluster().startNode();
        ensureGreen(INDEX_NAME);

        final int ingestionThreadCount = 3;
        final int docCount = 2000;
        final ConcurrentLinkedDeque<ActionFuture<IndexResponse>> pendingIndexResponses = new ConcurrentLinkedDeque<>();
        AtomicInteger integer = new AtomicInteger();
        Thread[] ingestionThreads = new Thread[ingestionThreadCount];
        for(int i=0;i<ingestionThreadCount;i++) {
            ingestionThreads[i] = new Thread(() -> {
                for (int j = 0; j < docCount; j++) {
                    pendingIndexResponses.add(
                        client().prepareIndex(INDEX_NAME)
                            .setId(Integer.toString(integer.incrementAndGet()))
                            .setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL)
                            .setSource("field", "value" + j)
                            .execute()
                    );
                }
            });
        }
        for(int i=0;i<ingestionThreadCount;i++)  {
            ingestionThreads[i].start();
        }
        internalCluster().restartNode(primary);
        ensureGreen(INDEX_NAME);
        assertEquals(getNodeContainingPrimaryShard().getName(), replica);

        for(int i=0;i<ingestionThreadCount;i++) {
            ingestionThreads[i].join();
        }
        assertBusy(() -> {
            client().admin().indices().prepareRefresh().execute().actionGet();
            assertTrue(pendingIndexResponses.stream().allMatch(ActionFuture::isDone));
        }, 1, TimeUnit.MINUTES);

        flushAndRefresh(INDEX_NAME);
        waitForReplicaUpdate();
        assertDocCounts(ingestionThreadCount * docCount, primary, replica);
    }
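
assertDocCounts and getNodeContainingPrimaryShard are helpers from the segment replication integration test suite. As a rough sketch of what the doc-count assertion amounts to (assumed, not copied from the suite; the real helper may differ), it queries each named node locally and checks the hit count:

    // Assumed sketch only -- the actual helper in SegmentReplicationIT may differ.
    // Relies on OpenSearchIntegTestCase.client(String nodeName) and a static import of
    // org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount.
    private void assertDocCounts(int expectedDocCount, String... nodeNames) {
        for (String node : nodeNames) {
            // "_only_local" restricts the search to shard copies held by that node.
            assertHitCount(
                client(node).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(),
                expectedDocCount
            );
        }
    }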

@dreamer-89

dreamer-89 commented Feb 8, 2023

Changing the assertion on Future::isDone to a Future::get call resolves the doc count mismatch in the integration test above. The exceptions from the failover (primary change) during ingestion might be completing the futures exceptionally, so isDone reports them as done even though the indexing requests never actually succeeded.
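
For illustration, here is a minimal JDK-only sketch (plain CompletableFuture rather than OpenSearch's ActionFuture) of why asserting isDone() is weaker than calling get(): a future that completed exceptionally still reports isDone() == true, so failed indexing requests pass an isDone assertion.

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutionException;

    public class FutureDoneVsGet {
        public static void main(String[] args) throws InterruptedException {
            CompletableFuture<String> indexResponse = new CompletableFuture<>();
            // Simulate an indexing request that failed while the primary was failing over.
            indexResponse.completeExceptionally(new RuntimeException("primary shard failed over"));

            // "Done" includes exceptional completion, so this prints true.
            System.out.println("isDone = " + indexResponse.isDone());
            try {
                // get() surfaces the failure instead of letting it pass silently.
                indexResponse.get();
            } catch (ExecutionException e) {
                System.out.println("get() threw: " + e.getCause());
            }
        }
    }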

However, there are occasional failures due to NodeClosedException, as shown below, which can be retried on the client side (a hypothetical retry sketch follows the log below). That issue can be tracked separately.

[2023-02-09T09:10:30,549][INFO ][o.o.i.r.SegmentReplicationIT] [testConcurrentIngestion] [seed=[A13AFB44AA8794E0:6A4A27234FBA3344]] after test
REPRODUCE WITH: ./gradlew ':server:internalClusterTest' --tests "org.opensearch.indices.replication.SegmentReplicationIT.testConcurrentIngestion {seed=[A13AFB44AA8794E0:6A4A27234FBA3344]}" -Dtests.seed=A13AFB44AA8794E0 -Dtests.opensearch.logger.level=INFO -Dtests.security.manager=true -Dtests.jvm.argline="-XX:TieredStopAtLevel=1 -XX:ReservedCodeCacheSize=64m" -Dtests.locale=fr -Dtests.timezone=Pacific/Kiritimati -Druntime.java=19

NodeClosedException[node closed {node_t1}{otNskrEbSMaTb9p3UH0SrA}{4Ugw13EpRPi0lEcKN59mmA}{127.0.0.1}{127.0.0.1:49840}{dimr}{shard_indexing_pressure_enabled=true}]
java.util.concurrent.ExecutionException: NodeClosedException[node closed {node_t1}{otNskrEbSMaTb9p3UH0SrA}{4Ugw13EpRPi0lEcKN59mmA}{127.0.0.1}{127.0.0.1:49840}{dimr}{shard_indexing_pressure_enabled=true}]
	at __randomizedtesting.SeedInfo.seed([A13AFB44AA8794E0:6A4A27234FBA3344]:0)
	at org.opensearch.common.util.concurrent.BaseFuture$Sync.getValue(BaseFuture.java:286)
	at org.opensearch.common.util.concurrent.BaseFuture$Sync.get(BaseFuture.java:273)
	at org.opensearch.common.util.concurrent.BaseFuture.get(BaseFuture.java:104)
	at org.opensearch.indices.replication.SegmentReplicationIT.testConcurrentIngestion(SegmentReplicationIT.java:223)
	at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:104)
	at java.base/java.lang.reflect.Method.invoke(Method.java:578)
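
As a purely hypothetical illustration of the client-side retry mentioned above (the class, method name, and retry policy are made up and not part of any fix), a caller could catch NodeClosedException around a blocking index call and simply re-issue it:

    import org.opensearch.action.index.IndexResponse;
    import org.opensearch.node.NodeClosedException;

    import java.util.function.Supplier;

    public final class IndexRetryHelper {
        // Hypothetical helper: retry a blocking index call (e.g.
        // client().prepareIndex(...).execute().actionGet()) when the node handling it
        // closed mid-request. Assumes maxAttempts >= 1.
        public static IndexResponse indexWithRetry(Supplier<IndexResponse> indexCall, int maxAttempts) {
            NodeClosedException last = null;
            for (int attempt = 0; attempt < maxAttempts; attempt++) {
                try {
                    return indexCall.get();
                } catch (NodeClosedException e) {
                    last = e; // the node went away mid-request; retry, now routed to the new primary
                }
            }
            throw last;
        }
    }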

Changed integration test

   public void testConcurrentIngestion() throws Exception {
        internalCluster().startClusterManagerOnlyNode();
        final String primary = internalCluster().startNode();
        createIndex(INDEX_NAME);
        final String replica = internalCluster().startNode();
        ensureGreen(INDEX_NAME);

        logger.info("--> Wait for green completed with state {}", client().admin().cluster().prepareState().execute().actionGet().getState());

        final int ingestionThreadCount = 2;
        final int docCount = 2;
        final ConcurrentLinkedDeque<ActionFuture<IndexResponse>> pendingIndexResponses = new ConcurrentLinkedDeque<>();
        AtomicInteger integer = new AtomicInteger();
        AtomicInteger valueInte = new AtomicInteger();
        Thread[] ingestionThreads = new Thread[ingestionThreadCount];
        for(int i=0;i<ingestionThreadCount;i++) {
            logger.info("--> Started ingestion");
            ingestionThreads[i] = new Thread(() -> {
                for (int j = 1; j <= docCount; j++) {
                    synchronized (this) {
                        pendingIndexResponses.add(
                            client().prepareIndex(INDEX_NAME)
                                .setSource("field", "value" + valueInte.getAndIncrement())
                                .setId(Integer.toString(integer.incrementAndGet()))
                                .setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL)
                                .execute()
                        );
                    }
                }
            });
        }
        for(int i=0;i<ingestionThreadCount;i++)  {
            ingestionThreads[i].start();
        }
        logger.info("--> stop the primary");
        internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primary));
        ensureYellow(INDEX_NAME);
        assertEquals(getNodeContainingPrimaryShard().getName(), replica);

        for(int i=0;i<ingestionThreadCount;i++) {
            ingestionThreads[i].join();
        }
        client().admin().indices().prepareRefresh().execute().actionGet();
        for(ActionFuture<IndexResponse> response: pendingIndexResponses) {
            response.actionGet();
        }
        flushAndRefresh(INDEX_NAME);
        assertDocCounts(ingestionThreadCount * docCount, replica);
    }

@dreamer-89

dreamer-89 commented Feb 8, 2023

Generated a new min distribution from main and retried the repro steps from the issue description above; the issue no longer reproduces. This may have been resolved by #6122, which builds the seqNo from the SegmentInfos snapshot (accurate) rather than the live version (more recent) on the primary.
Closing this issue.
