Simple streaming example

Created on 7 February 2025, about 2 months ago

Problem/Motivation

My goal is to stream the output of Ai-assistant to a block. I've got the basic non-streamed version working like this:

$message = new UserMessage($keywords);
    $this->aiAssistantRunner->streamedOutput(FALSE);
    $this->aiAssistantRunner->setUserMessage($message);
    $response = $this->aiAssistantRunner->process();
    $text = $response->getNormalized()->getText();
    $converter = new \League\CommonMark\CommonMarkConverter();
    $formatted = $converter->convert($text)->__toString();
    $build['content'] = [
      '#markup' => $formatted,
    ];

But how do I stream it? I'm peeking through the `ai_chatbot` module, but not quite getting it yet...

Many thanks for any suggestions or ideas!

๐Ÿ’ฌ Support request
Status

Active

Version

1.0

Component

AI Assistants API

Created by

๐Ÿ‡ฉ๐Ÿ‡ชGermany dotist

Live updates comments and jobs are added and updated live.
Sign in to follow issues

Comments & Activities

  • Issue created by @dotist
  • ๐Ÿ‡ฉ๐Ÿ‡ชGermany dotist
  • ๐Ÿ‡ฉ๐Ÿ‡ชGermany dotist
  • ๐Ÿ‡ฉ๐Ÿ‡ชGermany dotist
  • ๐Ÿ‡ง๐Ÿ‡ชBelgium wouters_f Leuven

    I've added an example below.
    You can also find working code
    - here (backend)
    - and here (javascript)

    The (simplified a bit) example:
    Backend:

          $output = $provider->chat($input, $ai_model_to_use, ['my_module_tag']);
          $response = $output->getNormalized();
    return new StreamedResponse(function () use ($response) {
          $log_output = '';
          foreach ($responseas $part) {
            $item = [];
            $item['answer_piece'] = $part->getText();
            $out = Json::encode($item);
            echo $out . '|ยง|';
            ob_flush();
            flush();
          }
        }, 200, [
          'Cache-Control' => 'no-cache, must-revalidate',
          'Content-Type' => 'text/event-stream',
          'X-Accel-Buffering' => 'no',
        ]);
    

    And in the frontend you want to catch it like so:

    try {
                  var xhr = new XMLHttpRequest();
                  xhr.open('POST', drupalSettings.ai_search_block.submit_url, true);
                  xhr.setRequestHeader('Content-Type', 'application/json');
                  xhr.setRequestHeader('Accept', 'application/json');
    
                  // Cache variables to hold the full output.
                  var lastResponseLength = 0;
                  var joined = '';
    
                  xhr.onprogress = function () {
                    var responseText = xhr.responseText || '';
                    // Get only the new part of the response.
                    var newData = responseText.substring(lastResponseLength);
                    lastResponseLength = responseText.length;
    
                    // Split new data using the delimiter.
                    var chunks = newData.trim().split('|ยง|').filter(Boolean);
    
                    // Parse each chunk and accumulate the answer pieces.
                    chunks.forEach(function (chunk) {
                      try {
                        var parsed = JSON.parse(chunk);
                        joined += parsed.answer_piece || '';
                      } catch (e) {
                        console.error('Error parsing chunk:', e, chunk);
                      }
                    });
    
                    // Overwrite the full output (letting browsers fix broken HTML)
                    // and re-append the loader.
                    $resultsBlock.html(joined).append($loader);
                  };
    
                  xhr.onreadystatechange = function () {
                    if (xhr.readyState === 4) {
                      if (xhr.status === 200) {
                        // Remove the loader upon successful completion.
                        $loader.remove();
                        if ($suffixText.length) {
                          $suffixText.html(drupalSettings.ai_search_block.suffix_text);
                          Drupal.attachBehaviors($suffixText[0]);
                          $suffixText.show();
                        }
                        // (Optional) If needed, update log Id from final response here.
                      } else if (xhr.status === 500) {
                        $resultsBlock.html('An error happened.');
                        console.error('Error response:', xhr.responseText);
                        try {
                          var parsedError = JSON.parse(xhr.responseText);
                          if (parsedError.response && parsedError.response.answer_piece) {
                            $resultsBlock.html(parsedError.response.answer_piece);
                          }
                          Drupal.attachBehaviors($resultsBlock[0]);
                        } catch (e) {
                          console.error('Error parsing 500 response:', e);
                        }
                      }
                    }
                  };
    
                  // Send the streaming request.
                  xhr.send(
                      JSON.stringify({
                        query: queryVal,
                        stream: streamVal,
                        block_id: blockIdVal
                      })
                  );
                } catch (e) {
                  console.error('XHR error:', e);
                }
    
  • ๐Ÿ‡ง๐Ÿ‡ชBelgium wouters_f Leuven

    @dotist if this is sufficient you may close this issue.

  • ๐Ÿ‡ฉ๐Ÿ‡ชGermany dotist

    Thanks, this worked perfectly!

  • ๐Ÿ‡ฉ๐Ÿ‡ชGermany dotist
Production build 0.71.5 2024