
[Bug]: Unnecessary Expensive Operation during Pipeline UPSERTS Ingestion #14597

Open
headyin opened this issue Jul 5, 2024 · 1 comment
Labels
bug Something isn't working P1

Comments

headyin commented Jul 5, 2024

Bug Description

self.docstore.get_all_document_hashes() in the _handle_upserts method is very expensive,
but its return value is only used in the UPSERTS_AND_DELETE case:

existing_doc_ids_before = set(self.docstore.get_all_document_hashes().values())
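For context, the surrounding flow looks roughly like the sketch below (paraphrased from the 0.10.x pipeline code, not an exact copy): the full scan runs up front, before the strategy branch that is its only consumer.

# Rough sketch of the current _handle_upserts flow (paraphrased)
existing_doc_ids_before = set(
    self.docstore.get_all_document_hashes().values()  # full scan, always executed
)
# ... per-node hash checks / dedup happen here ...
if self.docstore_strategy == DocstoreStrategy.UPSERTS_AND_DELETE:
    # only place the set is actually used
    doc_ids_to_delete = existing_doc_ids_before - doc_ids_from_nodes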

Version

0.10.50.post1

Steps to Reproduce

Sorry, I cannot share the code here.

But it basically follows these steps:

  1. Create an Ingestion Pipeline with a doc_store, cache, and vector_store, using the UPSERTS strategy
  2. Prepare a large number of documents (in my case, 1.5k+)
  3. Run the pipeline to load all of the documents
  4. Get a new document, and run the pipeline again to load only that new document (a minimal sketch of this setup is shown after the list)
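A minimal sketch of that setup, assuming the standard IngestionPipeline API and using an in-memory SimpleDocumentStore in place of the DynamoDB/S3-backed stores from the report (names, transformations, and document contents are illustrative only):

from llama_index.core import Document
from llama_index.core.ingestion import DocstoreStrategy, IngestionPipeline
from llama_index.core.node_parser import SentenceSplitter
from llama_index.core.storage.docstore import SimpleDocumentStore

# Illustrative stand-in for the DynamoDB doc_store / S3KVStore-backed KVDocStore in the report.
docstore = SimpleDocumentStore()
pipeline = IngestionPipeline(
    transformations=[SentenceSplitter()],
    docstore=docstore,
    docstore_strategy=DocstoreStrategy.UPSERTS,
)

# Steps 2-3: the first run ingests the full corpus (1.5k+ documents in the report).
docs = [Document(text=f"document {i}", id_=f"doc-{i}") for i in range(1500)]
pipeline.run(documents=docs)

# Step 4: a second run with a single new document still triggers a full
# docstore.get_all_document_hashes() scan over everything already stored.
pipeline.run(documents=[Document(text="a new document", id_="doc-new")])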

Because there are already 1.5k+ documents in the doc_store, the get_all_document_hashes method is either very slow or very expensive, depending on the doc store backend.
In my case I tested with a DynamoDB doc_store and an S3KVStore-backed KVDocStore:
in the case of DynamoDB, it needs to do a full table scan;
in the case of the S3 backend, it takes 7 minutes to get all of the hashes.

But this is totally unnecessary, as during an UPSERT operation we don't need to fetch all of the documents.

To work around this issue, I implemented my own doc_store class that returns an empty dict from get_all_document_hashes, and the entire pipeline works correctly and much faster (a sketch of the idea is below).
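A minimal sketch of that workaround, shown here as a subclass of SimpleDocumentStore for illustration (the report used DynamoDB/S3-backed stores, and the class name is hypothetical). Note this is only safe with the plain UPSERTS strategy, since UPSERTS_AND_DELETE genuinely needs the full set of hashes:

from typing import Dict

from llama_index.core.storage.docstore import SimpleDocumentStore


class NoScanDocumentStore(SimpleDocumentStore):
    """Docstore that skips the full scan of document hashes.

    Only safe with DocstoreStrategy.UPSERTS, where the return value of
    get_all_document_hashes() is never consumed.
    """

    def get_all_document_hashes(self) -> Dict[str, str]:
        # Return an empty mapping instead of scanning every stored document.
        return {}

    async def aget_all_document_hashes(self) -> Dict[str, str]:
        # Async variant; same no-op behavior.
        return {}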

Relevant Logs/Tracebacks

No response

@headyin headyin added bug Something isn't working triage Issue needs to be triaged/prioritized labels Jul 5, 2024

dosubot bot commented Jul 5, 2024

Hey @headyin! I'm here to help you with any bugs, questions, or contributions you have. Let's tackle this issue together!

To address the issue of the expensive self.docstore.get_all_document_hashes() call during the UPSERTS strategy in the _handle_upserts method, you can optimize the function to only fetch the necessary document hashes on a per-node basis. This avoids fetching all document hashes upfront, which can be particularly slow with backends like DynamoDB and S3KVStore.

Here is an optimized version of the upsert handling (shown as the async _ahandle_upserts variant):

async def _ahandle_upserts(
    self,
    nodes: List[BaseNode],
    store_doc_text: bool = True,
) -> List[BaseNode]:
    """Handle docstore upserts by checking hashes and ids."""
    assert self.docstore is not None

    doc_ids_from_nodes = set()
    deduped_nodes_to_run = {}
    for node in nodes:
        ref_doc_id = node.ref_doc_id if node.ref_doc_id else node.id_
        doc_ids_from_nodes.add(ref_doc_id)
        existing_hash = await self.docstore.aget_document_hash(ref_doc_id)
        if not existing_hash:
            # document doesn't exist, so add it
            await self.docstore.aset_document_hash(ref_doc_id, node.hash)
            deduped_nodes_to_run[ref_doc_id] = node
        elif existing_hash and existing_hash != node.hash:
            await self.docstore.adelete_ref_doc(ref_doc_id, raise_error=False)

            if self.vector_store is not None:
                await self.vector_store.adelete(ref_doc_id)

            await self.docstore.aset_document_hash(ref_doc_id, node.hash)

            deduped_nodes_to_run[ref_doc_id] = node
        else:
            continue  # document exists and is unchanged, so skip it

    if self.docstore_strategy == DocstoreStrategy.UPSERTS_AND_DELETE:
        # Identify missing docs and delete them from docstore and vector store
        existing_doc_ids_before = set(
            (await self.docstore.aget_all_document_hashes()).values()
        )
        doc_ids_to_delete = existing_doc_ids_before - doc_ids_from_nodes
        for ref_doc_id in doc_ids_to_delete:
            await self.docstore.adelete_document(ref_doc_id)

            if self.vector_store is not None:
                await self.vector_store.adelete(ref_doc_id)

    nodes_to_run = list(deduped_nodes_to_run.values())
    await self.docstore.async_add_documents(nodes_to_run, store_text=store_doc_text)

    return nodes_to_run

This approach fetches only the document hashes needed for each node, so the expensive call to self.docstore.get_all_document_hashes() runs only when the UPSERTS_AND_DELETE strategy actually requires it [1][2].


@logan-markewich logan-markewich added P1 and removed triage Issue needs to be triaged/prioritized labels Jul 5, 2024