[Bug]: multi-worker IngestionPipeline does not work with caches. #14572
Comments
This is why I was hesitant to add multithreading to the ingestion pipeline -- a ton of objects are not pickle-friendly 😅 I'm not sure what the fix would look like here. Maybe it would work if the transformations and cache were accessed outside of the partial. Something like (feels hacky, but could work):
I don't think this will work, because local functions and objects defined within functions cannot be pickled 😕 A potential workaround could be to pass the parameters needed to initialize the cache to each thread and have each thread re-initialize its own cache object. This would require rewriting the IngestionPipeline to take a cache_factory rather than the cache instance itself. I'll try this out and see how it goes. I think this would help with the cache situation, but the transformations would be a whole other problem.
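The cache_factory idea can be sketched in plain Python. Everything here is hypothetical naming, not llama-index API: the point is only that a `functools.partial` over a module-level function pickles by reference, so a factory can cross the process boundary even when the live cache object it builds could not.

```python
import pickle
from functools import partial


def make_redis_cache(host, port):
    # Stand-in for constructing e.g. a Redis-backed IngestionCache;
    # a plain dict is used so the sketch runs without a Redis server.
    return {"host": host, "port": port, "entries": {}}


# A partial over a module-level function pickles by reference, so it
# can be shipped to worker processes:
cache_factory = partial(make_redis_cache, "localhost", 6379)
payload = pickle.dumps(cache_factory)

# Inside each worker, the factory would be unpickled and called once,
# giving every process its own independent cache instance:
worker_cache = pickle.loads(payload)()
print(worker_cache["host"])  # localhost
```

Note this only pickles cleanly because `make_redis_cache` lives at module level; a factory defined inside another function would hit the same "Can't pickle local object" wall.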
I feel like it will work actually... let me try the method above :)
[2024-07-04T17:02:56.074Z] AttributeError: Can't pickle local object 'get_arun_transformations_wrapper.<locals>.arun_transformations_wrapper'
[2024-07-04T17:02:56.075Z] An unexpected error occurred: Can't pickle local object 'get_arun_transformations_wrapper.<locals>.arun_transformations_wrapper'
[2024-07-04T17:02:56.075Z] concurrent.futures.process._RemoteTraceback:
[2024-07-04T17:02:56.075Z] """
[2024-07-04T17:02:56.075Z] Traceback (most recent call last):
[2024-07-04T17:02:56.075Z] File "/Users/falven/.pyenv/versions/3.11.9/lib/python3.11/multiprocessing/queues.py", line 244, in _feed
[2024-07-04T17:02:56.075Z] obj = _ForkingPickler.dumps(obj)
[2024-07-04T17:02:56.075Z] ^^^^^^^^^^^^^^^^^^^^^^^^^^
[2024-07-04T17:02:56.075Z] File "/Users/falven/.pyenv/versions/3.11.9/lib/python3.11/multiprocessing/reduction.py", line 51, in dumps
[2024-07-04T17:02:56.075Z] cls(buf, protocol).dump(obj)
[2024-07-04T17:02:56.075Z] AttributeError: Can't pickle local object 'get_arun_transformations_wrapper.<locals>.arun_transformations_wrapper'
[2024-07-04T17:02:56.075Z] """
[2024-07-04T17:02:56.075Z]
[2024-07-04T17:02:56.075Z] The above exception was the direct cause of the following exception:
[2024-07-04T17:02:56.075Z]
[2024-07-04T17:02:56.075Z] Traceback (most recent call last):
[2024-07-04T17:02:56.075Z] File "/Users/falven/Source/AJG/src/ingestion/ingest/__init__.py", line 116, in ingest
[2024-07-04T17:02:56.075Z] await asyncio.gather(*ingestion_tasks)
[2024-07-04T17:02:56.075Z] File "/Users/falven/Source/AJG/src/ingestion/.venv/lib/python3.11/site-packages/llama_index/core/instrumentation/dispatcher.py", line 255, in async_wrapper
[2024-07-04T17:02:56.075Z] result = await func(*args, **kwargs)
[2024-07-04T17:02:56.075Z] ^^^^^^^^^^^^^^^^^^^^^^^^^^^
[2024-07-04T17:02:56.075Z] File "/Users/falven/Source/AJG/src/ingestion/.venv/lib/python3.11/site-packages/llama_index/core/ingestion/pipeline.py", line 752, in arun
[2024-07-04T17:02:56.075Z] result: List[List[BaseNode]] = await asyncio.gather(*tasks)
[2024-07-04T17:02:56.075Z] ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
[2024-07-04T17:02:56.075Z] File "/Users/falven/.pyenv/versions/3.11.9/lib/python3.11/multiprocessing/queues.py", line 244, in _feed
[2024-07-04T17:02:56.075Z] obj = _ForkingPickler.dumps(obj)
[2024-07-04T17:02:56.075Z] ^^^^^^^^^^^^^^^^^^^^^^^^^^
[2024-07-04T17:02:56.075Z] File "/Users/falven/.pyenv/versions/3.11.9/lib/python3.11/multiprocessing/reduction.py", line 51, in dumps
[2024-07-04T17:02:56.075Z] cls(buf, protocol).dump(obj)
[2024-07-04T17:02:56.075Z] AttributeError: Can't pickle local object 'get_arun_transformations_wrapper.<locals>.arun_transformations_wrapper'

Can't pickle local object 😕
lol just hit that too. Hmm
But even the way I proposed probably wouldn't work because of the variety of caches. We would essentially need to pass not just the type of your
I agree. I'm pretty sure there's a hacky way to do this and get around the pickling... messing around with a few things
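The AttributeError in the log above can be reproduced in isolation: pickle serializes functions by qualified name, so a function defined inside another function has no importable top-level name and cannot be pickled at all. The names below are illustrative, mirroring the shape of the wrapper in the traceback:

```python
import pickle


def get_run_transformations_wrapper():
    # A function defined inside another function -- the same shape as
    # get_arun_transformations_wrapper.<locals>.arun_transformations_wrapper
    def run_transformations_wrapper():
        return "done"

    return run_transformations_wrapper


try:
    pickle.dumps(get_run_transformations_wrapper())
    error = None
except (AttributeError, pickle.PicklingError) as exc:
    error = str(exc)

print(error)
```

This prints an error analogous to the one in the log; any closure handed to a `ProcessPoolExecutor` hits the same wall, regardless of what it captures.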
Bug Description
When creating an IngestionPipeline with `num_workers`, the pipeline divvies up the work as follows:

The problem here is that the partial function it builds needs to be serializable so it can be shipped to another worker. Some transformations may not be serializable either, but for my simple parsing and embedding case they work fine. The problem comes in because I am using a Redis IngestionCache: the cache itself is not serializable, and therefore this throws an exception.
I think it's specifically the `_redis_client` that is not serializable. A potential workaround could be to pass the parameters needed to initialize the cache to each thread and have each thread re-initialize its own cache object. This would require rewriting the IngestionPipeline to take a cache_factory rather than the cache object itself.
Version
0.10.52
Steps to Reproduce
Just create an IngestionPipeline that uses a Redis IngestionCache:
Relevant Logs/Tracebacks
No response