Bug Description

self.docstore.get_all_document_hashes() in the _handle_upserts method is very expensive, but its return value is only used in the UPSERTS_AND_DELETE case.

llama_index/llama-index-core/llama_index/core/ingestion/pipeline.py (line 429 in 08989d4)

Version

0.10.50.post1

Steps to Reproduce

Sorry, I cannot share the code here, but basically follow these steps (see the sketch after the list):
1. Create an IngestionPipeline with a doc_store, cache, and vector_store, using the UPSERTS strategy.
2. Prepare a lot of documents; in my case, 1.5k+.
3. Use pipeline.run to load all of the documents.
4. Get a new document, and use pipeline.run to load only that new document.
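A minimal sketch of such a pipeline, assuming in-memory SimpleDocumentStore/SimpleVectorStore stand-ins and a MockEmbedding in place of the DynamoDB/S3-backed stores and real embedding model from this report (document contents and IDs are placeholders):

from llama_index.core import Document, MockEmbedding
from llama_index.core.ingestion import DocstoreStrategy, IngestionPipeline
from llama_index.core.node_parser import SentenceSplitter
from llama_index.core.storage.docstore import SimpleDocumentStore
from llama_index.core.vector_stores import SimpleVectorStore

# In-memory stand-ins keep the sketch self-contained; the report used a
# DynamoDB doc_store and an S3KVStore-backed KVDocStore.
pipeline = IngestionPipeline(
    transformations=[SentenceSplitter(), MockEmbedding(embed_dim=8)],
    docstore=SimpleDocumentStore(),
    vector_store=SimpleVectorStore(),
    docstore_strategy=DocstoreStrategy.UPSERTS,
)

# Steps 2-3: the first run ingests the whole corpus (1.5k+ documents here).
docs = [Document(text=f"document {i}", doc_id=f"doc-{i}") for i in range(1500)]
pipeline.run(documents=docs)

# Step 4: a later run with a single new document still pays for the
# get_all_document_hashes() call over the already-ingested corpus.
pipeline.run(documents=[Document(text="a new document", doc_id="doc-new")])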
Because there are already 1.5k+ documents in the doc_store, the get_all_document_hashes method will be either very slow or very expensive, depending on the doc store backend. In my case I tested with a DynamoDB doc_store and an S3KVStore-backed KVDocStore:

In the case of DynamoDB, it needs to do a full table scan.
In the case of the S3 backend, it takes 7 minutes to get all of the hashes.

But this is totally unnecessary: during an UPSERTS run, we don't need to fetch all of the documents. To work around this issue, I implemented my own doc_store class whose get_all_document_hashes method returns an empty dict (sketched below), and the entire pipeline works correctly and much faster.
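A minimal sketch of that workaround, assuming a SimpleDocumentStore subclass for self-containedness (the class name is hypothetical; the actual store in this report was DynamoDB/S3-backed). It is only safe with the plain UPSERTS strategy, where the return value of get_all_document_hashes is never used:

from typing import Dict

from llama_index.core.storage.docstore import SimpleDocumentStore

class NoScanDocumentStore(SimpleDocumentStore):
    """Doc store that skips the full hash scan.

    Safe only with DocstoreStrategy.UPSERTS, where _handle_upserts ignores
    this return value; UPSERTS_AND_DELETE relies on it to find stale docs.
    """

    def get_all_document_hashes(self) -> Dict[str, str]:
        # Report an empty mapping instead of scanning the whole backend.
        return {}

    async def aget_all_document_hashes(self) -> Dict[str, str]:
        return {}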
Relevant Logs/Tracebacks

No response
Hey @headyin! I'm here to help you with any bugs, questions, or contributions you have. Let's tackle this issue together!
To address the issue of the expensive self.docstore.get_all_document_hashes() call during the UPSERTS strategy in the _handle_upserts method, you can optimize the function to only fetch the necessary document hashes on a per-node basis. This avoids fetching all document hashes upfront, which can be particularly slow with backends like DynamoDB and S3KVStore.
Here is an optimized version of the async _ahandle_upserts method:
async def _ahandle_upserts(
    self,
    nodes: List[BaseNode],
    store_doc_text: bool = True,
) -> List[BaseNode]:
    """Handle docstore upserts by checking hashes and ids."""
    assert self.docstore is not None

    doc_ids_from_nodes = set()
    deduped_nodes_to_run = {}
    for node in nodes:
        ref_doc_id = node.ref_doc_id if node.ref_doc_id else node.id_
        doc_ids_from_nodes.add(ref_doc_id)
        # Fetch the stored hash for this document only, not all hashes upfront.
        existing_hash = await self.docstore.aget_document_hash(ref_doc_id)
        if not existing_hash:
            # document doesn't exist, so add it
            await self.docstore.aset_document_hash(ref_doc_id, node.hash)
            deduped_nodes_to_run[ref_doc_id] = node
        elif existing_hash and existing_hash != node.hash:
            # document exists but has changed, so replace it
            await self.docstore.adelete_ref_doc(ref_doc_id, raise_error=False)
            if self.vector_store is not None:
                await self.vector_store.adelete(ref_doc_id)
            await self.docstore.aset_document_hash(ref_doc_id, node.hash)
            deduped_nodes_to_run[ref_doc_id] = node
        else:
            continue  # document exists and is unchanged, so skip it

    if self.docstore_strategy == DocstoreStrategy.UPSERTS_AND_DELETE:
        # Identify missing docs and delete them from docstore and vector store;
        # only this branch still needs the full hash scan.
        existing_doc_ids_before = set(
            (await self.docstore.aget_all_document_hashes()).values()
        )
        doc_ids_to_delete = existing_doc_ids_before - doc_ids_from_nodes
        for ref_doc_id in doc_ids_to_delete:
            await self.docstore.adelete_document(ref_doc_id)
            if self.vector_store is not None:
                await self.vector_store.adelete(ref_doc_id)

    nodes_to_run = list(deduped_nodes_to_run.values())
    await self.docstore.async_add_documents(nodes_to_run, store_text=store_doc_text)

    return nodes_to_run
This approach fetches the stored hash for each node individually (one point lookup per document, rather than a full scan of the store), so the expensive self.docstore.get_all_document_hashes() call is only made in the UPSERTS_AND_DELETE branch, where its result is actually needed [1][2].