[Bug]: Tracing With Langfuse #14591

Open
anantgupta129 opened this issue Jul 5, 2024 · 6 comments
Labels: bug (Something isn't working), triage (Issue needs to be triaged/prioritized)

Comments

@anantgupta129

Bug Description

When making API calls, the Langfuse tracing callback only works when the engine/index are re-initialized inside the API call.

Working in this case:

import json
from typing import Any, Generator

import qdrant_client
from fastapi import APIRouter
from fastapi.responses import StreamingResponse
from llama_index.core import VectorStoreIndex
from llama_index.vector_stores.qdrant import QdrantVectorStore

# router, langfuse_callback_handler, SYSTEM_PROMPT, tags, and ChatRequestBody
# are defined elsewhere in the application.

def stream_generator(generator: Generator) -> Generator[str, Any, None]:
    yield from (json.dumps({"type": "content_block", "text": text}) for text in generator)


@router.post("/chat")
async def chat(request: ChatRequestBody):
    langfuse_callback_handler.set_trace_params(tags=tags)

    client = qdrant_client.QdrantClient(...)
    vector_store = QdrantVectorStore(client=client, ...)
    index = VectorStoreIndex.from_vector_store(vector_store=vector_store)

    engine = index.as_chat_engine(
        chat_mode="condense_plus_context",
        streaming=True,
        system_prompt=SYSTEM_PROMPT,
    )
    response_stream = engine.stream_chat(request.message, chat_history=request.history)
    return StreamingResponse(stream_generator(response_stream), media_type="application/x-ndjson")

Version

0.10.47

Steps to Reproduce

Not working in this case:

# Same imports as above. Here the index and engine are built once at module
# import time, so the callback attached to them never sees the trace params
# set later inside the request handler.
client = qdrant_client.QdrantClient(...)
vector_store = QdrantVectorStore(client=client, ...)
index = VectorStoreIndex.from_vector_store(vector_store=vector_store)

engine = index.as_chat_engine(
    chat_mode="condense_plus_context",
    streaming=True,
    system_prompt=SYSTEM_PROMPT,
)

def stream_generator(generator: Generator) -> Generator[str, Any, None]:
    yield from (json.dumps({"type": "content_block", "text": text}) for text in generator)


@router.post("/chat")
async def chat(request: ChatRequestBody):
    langfuse_callback_handler.set_trace_params(tags=tags)

    response_stream = engine.stream_chat(request.message, chat_history=request.history)
    return StreamingResponse(stream_generator(response_stream), media_type="application/x-ndjson")

Relevant Logs/Tracebacks

No response


dosubot bot commented Jul 5, 2024

The Langfuse tracing callback only works when the engine and index are re-initialized with each API call because the Langfuse SDKs queue and batch events in the background to reduce the number of network requests and improve overall performance. To ensure that all queued events are captured and sent to Langfuse servers, the callback handler needs to be flushed. Re-initializing the engine and index with each API call ensures that the callback handler is properly set up and flushed, capturing all necessary traces and metrics.

To address this issue without re-initializing the engine and index with each API call, you can manually flush the callback handler at the end of each request. Here is an example of how you can modify your code to include the flush operation:

def stream_generator(generator: Generator) -> Generator[str, Any, None]:
    yield from (json.dumps({"type": "content_block", "text": text}) for text in generator)

client = qdrant_client.QdrantClient(...)
vector_store = QdrantVectorStore(client=client, ...)
index = VectorStoreIndex.from_vector_store(vector_store=vector_store)

engine = index.as_chat_engine(
    chat_mode="condense_plus_context",
    streaming=True,
    system_prompt=SYSTEM_PROMPT,
)

@router.post("/chat")
async def chat(request: ChatRequestBody):
    langfuse_callback_handler.set_trace_params(tags=tags)

    response_stream = engine.stream_chat(request.message, chat_history=request.history)
    
    # Ensure all events are flushed to Langfuse
    langfuse_callback_handler.flush()
    
    return StreamingResponse(stream_generator(response_stream), media_type="application/x-ndjson")

By adding langfuse_callback_handler.flush() at the end of the request, you ensure that all queued events are sent to Langfuse servers without needing to re-initialize the engine and index with each API call [1].

To continue talking to Dosu, mention @dosu.

@anantgupta129
Copy link
Author

@dosu I have tried langfuse_callback_handler.flush(); it's not working.


dosubot bot commented Jul 5, 2024

To resolve the issue with the Langfuse tracing callback not functioning as expected, even after using langfuse_callback_handler.flush(), ensure that you have correctly set up and registered the LangfuseCallbackHandler. Here is a checklist based on the provided documentation:

  1. Install Required Packages:

    %pip install llama-index llama-index-callbacks-langfuse
  2. Configure Environment Variables:

    import os
    
    # Langfuse
    os.environ["LANGFUSE_SECRET_KEY"] = "sk-lf-..."
    os.environ["LANGFUSE_PUBLIC_KEY"] = "pk-lf-..."
    os.environ["LANGFUSE_HOST"] = "https://cloud.langfuse.com"  # 🇪🇺 EU region, 🇺🇸 US region: "https://us.cloud.langfuse.com"
    
    # OpenAI
    os.environ["OPENAI_API_KEY"] = "sk-..."
  3. Register the Langfuse Callback Handler:

    • Option 1: Set global LlamaIndex handler
      from llama_index.core import global_handler, set_global_handler
      
      set_global_handler("langfuse")
      langfuse_callback_handler = global_handler
    • Option 2: Use Langfuse callback directly
      from llama_index.core import Settings
      from llama_index.core.callbacks import CallbackManager
      from langfuse.llama_index import LlamaIndexCallbackHandler
      
      langfuse_callback_handler = LlamaIndexCallbackHandler()
      Settings.callback_manager = CallbackManager([langfuse_callback_handler])
  4. Flush Events to Langfuse:

    # ... your LlamaIndex calls here ...
    
    langfuse_callback_handler.flush()

Ensure that all these steps are correctly followed. If the issue persists, double-check your API keys and network connectivity to Langfuse servers [1].
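
Putting the checklist together for a streaming endpoint like the one in this issue, a minimal sketch follows. The flush placement is an assumption: flushing only after the stream has been fully consumed, since calling flush() before the StreamingResponse body has run may miss trailing spans. stream_and_flush is a hypothetical helper; engine and ChatRequestBody are as in the issue's snippets.

import json
from typing import Any, Generator

from fastapi import APIRouter
from fastapi.responses import StreamingResponse
from langfuse.llama_index import LlamaIndexCallbackHandler
from llama_index.core import Settings
from llama_index.core.callbacks import CallbackManager

router = APIRouter()

# Register the handler globally (Option 2 above).
langfuse_callback_handler = LlamaIndexCallbackHandler()
Settings.callback_manager = CallbackManager([langfuse_callback_handler])


def stream_and_flush(generator: Generator) -> Generator[str, Any, None]:
    # Hypothetical helper: stream the chunks, then flush once the client has
    # consumed everything, so spans emitted during streaming are included.
    yield from (json.dumps({"type": "content_block", "text": text}) for text in generator)
    langfuse_callback_handler.flush()


@router.post("/chat")
async def chat(request: "ChatRequestBody"):  # ChatRequestBody as in the issue
    langfuse_callback_handler.set_trace_params(tags=["chat"])
    # engine as constructed in the issue's snippets
    response_stream = engine.stream_chat(request.message, chat_history=request.history)
    return StreamingResponse(
        stream_and_flush(response_stream.response_gen),
        media_type="application/x-ndjson",
    )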

To continue talking to Dosu, mention @dosu.

@logan-markewich
Collaborator

langfuse_callback_handler.set_trace_params(tags=tags) -- this cannot change the callback that is already attached to existing objects in memory

The reason the second flow doesn't work is that the objects are already in memory, and we cannot change the callback attached to them with updated params.
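
In practical terms, the callback manager is bound when the objects are constructed, so the trace params have to be set before construction. A minimal sketch of the per-request pattern this implies, reusing the issue's own names (build_engine is a hypothetical helper; router, tags, SYSTEM_PROMPT, and the imports are as in the snippets above):

def build_engine():
    # Hypothetical helper: construct the chain inside the request, after
    # set_trace_params, so the callback attached to these fresh objects
    # carries the updated params.
    client = qdrant_client.QdrantClient(...)
    vector_store = QdrantVectorStore(client=client, ...)
    index = VectorStoreIndex.from_vector_store(vector_store=vector_store)
    return index.as_chat_engine(
        chat_mode="condense_plus_context",
        streaming=True,
        system_prompt=SYSTEM_PROMPT,
    )


@router.post("/chat")
async def chat(request: ChatRequestBody):
    langfuse_callback_handler.set_trace_params(tags=tags)  # first: update params
    engine = build_engine()  # then: build the objects that capture the callback
    response_stream = engine.stream_chat(request.message, chat_history=request.history)
    return StreamingResponse(stream_generator(response_stream), media_type="application/x-ndjson")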

@anantgupta129
Author

@logan-markewich so for use cases like serving a chatbot to multiple users, and different APIs utilizing the LLM within one project, how do we manage Langfuse tracing so that traces can be filtered, say, by user ID? What would be the best way?

Secondly, in another project I am using Langfuse with an OpenAIAgent, where I can see the traces broken down by user ID:

# Context assumed from the project: agent, llm, mongodb, load_history, Message,
# LANGFUSE_ENV_TAG, ChatRequest, ErrorResponse, and the FastAPI/bson imports
# are defined elsewhere.
def stream_generator(
    response: StreamingAgentChatResponse,
    profile_id: str,
    current_history: list[dict],
    chat_id: str | ObjectId,
    title: str | None = None,
    chat_history: list[dict] | None = None,
    exists: bool | None = False,
) -> Generator[str, None, None]:

    yield from (
        json.dumps({"type": "content_block", "text": text}) for text in response.response_gen
    )

    current_history.append({"role": "ASSISTANT", "content": response.response})
    chat_id = mongodb.persist_chat_store(
        profile_id, chat_id=chat_id, history=current_history, title=title, exists=exists
    )

    chat_history = chat_history + current_history if chat_history else current_history
    chat_history = [d.model_dump() if isinstance(d, Message) else d for d in chat_history]

    yield json.dumps({"type": "message_stop", "id": str(chat_id), "history": chat_history})


@router.post(
    "/chat",
    responses={
        "200": {
            "description": "Successful response will return a stream of JSON objects if stream true, else json response"
        },
        "500": {"model": ErrorResponse},
    },
)
async def chat(body: ChatRequest):
    try:
        exists = bool(body.chatId)
        title = ""
        if not body.chatId:
            response = llm.complete(
                f"For the following query: {body.query}, Generate a title for the chat heading, without any additional text"
            )
            title = response.text

        body.chatId = body.chatId or str(ObjectId())
        langfuse_callback_handler.set_trace_params(
            user_id=body.profileId, tags=[LANGFUSE_ENV_TAG], session_id=body.chatId
        )

        chat_history = load_history(body.history.copy()) if body.history else None
        current_history = [{"role": "USER", "content": body.query}]

        if body.stream:
            response = agent.stream_chat(body.query, chat_history=chat_history)
            return StreamingResponse(
                stream_generator(
                    response,
                    body.profileId,
                    chat_id=body.chatId,
                    current_history=current_history,
                    chat_history=body.history,
                    title=title,
                    exists=exists,
                ),
                media_type="application/x-ndjson",
            )
        else:
            response = agent.chat(body.query, chat_history=chat_history)

            current_history.append({"role": "ASSISTANT", "content": response.response})
            chat_id = mongodb.persist_chat_store(
                body.profileId,
                chat_id=body.chatId,
                history=current_history,
                title=title,
                exists=exists,
            )
            chat_history = chat_history + current_history if chat_history else current_history
            return {"response": response.response, "id": str(chat_id), "history": chat_history}

    except Exception as e:
        traceback.print_exc()
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e)
        ) from e

Although here too I cannot see the function-calling logs, which I now suspect is because I am not redefining the agents with each API call.

@logan-markewich
Collaborator

We are working on the UX, but setting this up for multiple users isn't really possible unless you set everything up each time.

Not sure about the second thing.
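
Until that UX lands, the pattern that matches the working flow in this issue is per-request setup with per-user params. A sketch follows; build_agent is a hypothetical factory, and whether a cached index/vector store can be reused (rather than rebuilt) depends on where the callback is captured, so verify the traces in the Langfuse UI before relying on it:

@router.post("/chat")
async def chat(body: ChatRequest):
    # Set the per-user params first so the objects built below capture them.
    langfuse_callback_handler.set_trace_params(
        user_id=body.profileId,
        session_id=body.chatId or str(ObjectId()),
        tags=[LANGFUSE_ENV_TAG],
    )
    # Rebuild per request, as in the issue's working flow. Reusing a cached
    # vector_store here is an assumption; the safest version rebuilds it too.
    index = VectorStoreIndex.from_vector_store(vector_store=vector_store)
    agent = build_agent(index)  # hypothetical factory for the OpenAIAgent
    response = agent.stream_chat(body.query, chat_history=load_history(body.history))
    ...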
