[Bug]: Tracing With Langfuse #14591
Comments
The Langfuse tracing callback only works when the engine and index are re-initialized with each API call because the Langfuse SDKs queue and batch events in the background to reduce the number of network requests and improve overall performance. To ensure that all queued events are captured and sent to Langfuse's servers, the callback handler needs to be flushed. Re-initializing the engine and index with each API call ensures that the callback handler is properly set up and flushed, capturing all necessary traces and metrics. To address this issue without re-initializing the engine and index with each API call, you can manually flush the callback handler at the end of each request. Here is an example of how you can modify your code to include the flush operation:

```python
def stream_generator(generator: Generator) -> Generator[str, Any, None]:
    yield from (json.dumps({"type": "content_block", "text": text}) for text in generator)


client = qdrant_client.QdrantClient(...)
vector_store = QdrantVectorStore(client=client, ...)
index = VectorStoreIndex.from_vector_store(vector_store=vector_store)
engine = index.as_chat_engine(
    chat_mode="condense_plus_context",
    streaming=True,
    system_prompt=SYSTEM_PROMPT,
)


@router.post("/chat")
async def chat(request: ChatRequestBody):
    langfuse_callback_handler.set_trace_params(tags=tags)
    response_stream = engine.stream_chat(request.message, chat_history=request.history)
    # Ensure all events are flushed to Langfuse
    langfuse_callback_handler.flush()
    return StreamingResponse(stream_generator(response_stream), media_type="application/x-ndjson")
```

By adding
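One caveat with the snippet above (my own observation, not from the thread): `flush()` is called before the `StreamingResponse` is returned, i.e. before the stream has actually been consumed, so late events may still be in the queue. A minimal sketch of the alternative — flushing inside the generator once the last chunk has been yielded — using a stdlib stand-in (`flush` here is a hypothetical placeholder for `langfuse_callback_handler.flush`, not a Langfuse API):

```python
import json
from typing import Any, Callable, Generator


def stream_then_flush(
    generator: Generator[str, None, None],
    flush: Callable[[], None],
) -> Generator[str, Any, None]:
    # Yield every chunk first; flush only runs after the stream is exhausted,
    # so events emitted during generation are included in the flush.
    yield from (json.dumps({"type": "content_block", "text": text}) for text in generator)
    flush()


# Demo with a stand-in flush callable:
flushed = []
chunks = list(stream_then_flush(iter(["a", "b"]), lambda: flushed.append(True)))
```

Passing `stream_then_flush(response_stream, langfuse_callback_handler.flush)` to `StreamingResponse` would then defer the flush until the client has received the whole stream.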
@dosu I have tried
To resolve the issue with the Langfuse tracing callback not functioning as expected, even after using
Ensure that all these steps are correctly followed. If the issue persists, double-check your API keys and network connectivity to Langfuse servers [1].
The reason the second flow doesn't work is that you have objects already in memory, and we cannot change the callback attached to them with updated params.
@logan-markewich So for use cases like serving a chatbot to multiple users, with different APIs in a project also utilizing the LLM, how do we manage Langfuse tracing so the traces can be filtered, say, by user ID? Secondly, in another project I am using Langfuse with OpenAI Agents, where I can see the traces broken down by user ID:

```python
def stream_generator(
    response: StreamingAgentChatResponse,
    profile_id: str,
    current_history: list[dict],
    chat_id: str | ObjectId,
    title: str | None = None,
    chat_history: list[dict] | None = None,
    exists: bool | None = False,
) -> Generator[str, None, None]:
    yield from (
        json.dumps({"type": "content_block", "text": text}) for text in response.response_gen
    )
    current_history.append({"role": "ASSISTANT", "content": response.response})
    chat_id = mongodb.persist_chat_store(
        profile_id, chat_id=chat_id, history=current_history, title=title, exists=exists
    )
    chat_history = chat_history + current_history if chat_history else current_history
    chat_history = [d.model_dump() if isinstance(d, Message) else d for d in chat_history]
    yield json.dumps({"type": "message_stop", "id": str(chat_id), "history": chat_history})


@router.post(
    "/chat",
    responses={
        "200": {
            "description": "Successful response will return a stream of JSON objects if stream true, else json response"
        },
        "500": {"model": ErrorResponse},
    },
)
async def chat(body: ChatRequest):
    try:
        exists = bool(body.chatId)
        title = ""
        if not body.chatId:
            response = llm.complete(
                f"For the following query: {body.query}, Generate a title for the chat heading, without any additional text"
            )
            title = response.text
        body.chatId = body.chatId or str(ObjectId())
        langfuse_callback_handler.set_trace_params(
            user_id=body.profileId, tags=[LANGFUSE_ENV_TAG], session_id=body.chatId
        )
        chat_history = load_history(body.history.copy()) if body.history else None
        current_history = [{"role": "USER", "content": body.query}]
        if body.stream:
            response = agent.stream_chat(body.query, chat_history=chat_history)
            return StreamingResponse(
                stream_generator(
                    response,
                    body.profileId,
                    chat_id=body.chatId,
                    current_history=current_history,
                    chat_history=body.history,
                    title=title,
                    exists=exists,
                ),
                media_type="application/x-ndjson",
            )
        else:
            response = agent.chat(body.query, chat_history=chat_history)
            current_history.append({"role": "ASSISTANT", "content": response.response})
            chat_id = mongodb.persist_chat_store(
                body.profileId,
                chat_id=body.chatId,
                history=current_history,
                title=title,
                exists=exists,
            )
            chat_history = chat_history + current_history if chat_history else current_history
            return {"response": response.response, "id": str(chat_id), "history": chat_history}
    except Exception as e:
        traceback.print_exc()
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e)
        ) from e
```

Although here, too, I cannot see the function-calling logs, which I now suspect is because I am not re-initializing the agents with each API call.
We are working on the UX, but setting this up for multiple users isn't really possible unless you set everything up each time. Not sure about the second thing.
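The "set everything up each time" pattern can be sketched with stdlib stand-ins (the `TraceHandler`, `Engine`, and `build_engine` names below are hypothetical illustrations, not Langfuse or LlamaIndex APIs): a fresh handler is constructed per request, so each trace carries that request's user/session params rather than whatever a long-lived shared object captured at startup.

```python
from dataclasses import dataclass, field


@dataclass
class TraceHandler:
    # Stand-in for a tracing callback handler: params are fixed at construction.
    user_id: str
    session_id: str
    events: list = field(default_factory=list)

    def record(self, event: str) -> None:
        self.events.append({"user_id": self.user_id, "event": event})


@dataclass
class Engine:
    # Stand-in for a chat engine: it holds a reference to the handler it was
    # built with, which is why updating params on a shared engine is unreliable.
    handler: TraceHandler

    def chat(self, query: str) -> str:
        self.handler.record(f"chat:{query}")
        return f"echo:{query}"


def build_engine(user_id: str, session_id: str) -> Engine:
    # Per-request factory: a new handler means no stale params from a
    # previous user's request leak into this trace.
    return Engine(TraceHandler(user_id=user_id, session_id=session_id))


engine_a = build_engine("user-a", "chat-1")
engine_a.chat("hello")
```

The cost is re-running the setup on every request, which is exactly the trade-off being discussed in this thread.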
Bug Description
When making API calls, the Langfuse tracing callback is only working when the engine/index are initialized again with each API call.
Working in this case:
Version
0.10.47
Steps to Reproduce
Not working:
Relevant Logs/Tracebacks
No response