Streaming Chat Tutorial
Build a chat interface with streaming responses, simulating an LLM conversation using Akka.NET.
What You'll Build
A streaming chat application with:
- Real-time streaming text responses
- Text input with prompt history
- "Thinking" indicator during processing
- Cancellation support
- Akka.NET actor integration
Prerequisites
This is an advanced tutorial. You should understand:
- Reactive properties
- Input handling
- Basic Akka.NET concepts
Project Setup
dotnet new console -n StreamingChatDemo
cd StreamingChatDemo
dotnet add package Termina
dotnet add package Microsoft.Extensions.Hosting
dotnet add package Akka.Hosting
Step 1: Create the ViewModel
The ViewModel manages chat state and coordinates with Akka actors.
View complete StreamingChatViewModel.cs
// Copyright (c) Petabridge, LLC. All rights reserved.
// Licensed under the Apache 2.0 license. See LICENSE file in the project root for full license information.
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using Akka.Actor;
using Akka.Hosting;
using Termina.Components.Streaming;
using Termina.Demo.Streaming.Actors;
using Termina.Layout;
using Termina.Reactive;
using Termina.Terminal;
namespace Termina.Demo.Streaming.Pages;
/// <summary>
/// Base interface for chat messages flowing from the ViewModel to the Page.
/// </summary>
public interface IChatMessage;
/// <summary>
/// Append regular text to the chat history.
/// </summary>
/// <param name="Text">The text to append.</param>
/// <param name="Foreground">Optional foreground color; null uses the default.</param>
/// <param name="Decoration">Text decoration (bold, italic, ...).</param>
/// <param name="IsNewLine">When true, the text is appended as its own line.</param>
public sealed record AppendText(
string Text,
Color? Foreground = null,
TextDecoration Decoration = TextDecoration.None,
bool IsNewLine = false) : IChatMessage;
/// <summary>
/// Append a tracked text segment that can be manipulated later.
/// </summary>
public sealed record AppendTrackedSegment(SegmentId Id, ITextSegment Segment) : IChatMessage;
/// <summary>
/// Remove a tracked segment by ID.
/// </summary>
public sealed record RemoveTrackedSegment(SegmentId Id) : IChatMessage;
/// <summary>
/// Replace a tracked segment with new content.
/// </summary>
/// <param name="KeepTracked">When true, the new segment stays tracked under the same
/// ID so it can be replaced again later (used for in-place thinking-block updates);
/// when false, the ID is released.</param>
public sealed record ReplaceTrackedSegment(SegmentId Id, ITextSegment NewSegment, bool KeepTracked = false) : IChatMessage;
/// <summary>
/// Show a decision point with choices for the user.
/// </summary>
public sealed record ShowDecisionPoint(string Question, IReadOnlyList<LlmMessages.DecisionChoice> Choices) : IChatMessage;
/// <summary>
/// Hide the decision list (after selection or cancellation).
/// </summary>
public sealed record HideDecisionPoint : IChatMessage;
/// <summary>
/// ViewModel for the streaming chat demo.
/// Contains only state and business logic - no UI concerns.
/// Exposes observables for chat content that the Page subscribes to.
/// </summary>
public partial class StreamingChatViewModel : ReactiveViewModel
{
    // Predefined segment IDs for the tracked "thinking" indicators.
    private static readonly SegmentId ThinkingSpinnerId = new(1);
    private static readonly SegmentId ThinkingBlockId = new(2);

    private readonly IRequiredActor<LlmSimulatorActor> _llmActorProvider;
    private readonly List<string> _promptHistory = new();
    private int _historyIndex = -1; // -1 = not currently navigating history
    private IActorRef? _llmActor;
    private CancellationTokenSource? _generationCts;
    private SpinnerSegment? _currentSpinner;
    private bool _thinkingBlockShown;

    // Subjects for chat output - Page subscribes to these
    private readonly Subject<IChatMessage> _chatOutput = new();
    private readonly Subject<string> _promptTextChanged = new();

    /// <summary>
    /// Observable for chat messages.
    /// Includes text appends and tracked segment operations.
    /// </summary>
    public IObservable<IChatMessage> ChatOutput => _chatOutput.AsObservable();

    /// <summary>
    /// Observable for prompt text changes (for history navigation).
    /// </summary>
    public IObservable<string> PromptTextChanged => _promptTextChanged.AsObservable();

    // Reactive properties for UI state
    [Reactive] private bool _isGenerating = false;
    [Reactive] private bool _hasReceivedText = false; // Tracks if any text has arrived yet
    [Reactive] private string _statusMessage = "Ready. Enter a question to begin.";
    [Reactive] private bool _showDecisionList = false; // Whether to show the decision list

    // Track current decision context for follow-up
    // NOTE(review): appears unused in this file; possibly referenced from
    // another part of this partial class — confirm before removing.
    private string? _pendingDecisionContext;

    public StreamingChatViewModel(IRequiredActor<LlmSimulatorActor> llmActorProvider)
    {
        _llmActorProvider = llmActorProvider;
    }

    /// <summary>
    /// Emits the initial welcome message.
    /// </summary>
    public void EmitWelcomeMessage()
    {
        _chatOutput.OnNext(new AppendText("🤖 ", Color.Yellow));
        _chatOutput.OnNext(new AppendText("Assistant: ", Color.Green, TextDecoration.Bold));
        _chatOutput.OnNext(new AppendText("Hello! I'm a simulated LLM demo.", Color.White, IsNewLine: true));
        _chatOutput.OnNext(new AppendText(" Ask me anything and watch the streaming response!", Color.BrightBlack, IsNewLine: true));
        _chatOutput.OnNext(new AppendText("", IsNewLine: true));
    }

    public override void OnActivated()
    {
        // Fire-and-forget: actor resolution completes in the background.
        _ = InitializeAsync();
    }

    private async Task InitializeAsync()
    {
        _llmActor = await _llmActorProvider.GetAsync();
    }

    /// <summary>
    /// Navigate up through prompt history (older entries).
    /// </summary>
    public void NavigateHistoryUp()
    {
        if (_promptHistory.Count == 0)
            return;
        if (_historyIndex < 0)
        {
            // Start navigating from the most recent entry.
            _historyIndex = _promptHistory.Count - 1;
        }
        else if (_historyIndex > 0)
        {
            _historyIndex--;
        }
        _promptTextChanged.OnNext(_promptHistory[_historyIndex]);
    }

    /// <summary>
    /// Navigate down through prompt history (newer entries); past the newest
    /// entry the prompt is cleared and navigation resets.
    /// </summary>
    public void NavigateHistoryDown()
    {
        if (_historyIndex < 0)
            return;
        if (_historyIndex < _promptHistory.Count - 1)
        {
            _historyIndex++;
            _promptTextChanged.OnNext(_promptHistory[_historyIndex]);
        }
        else
        {
            _historyIndex = -1;
            _promptTextChanged.OnNext("");
        }
    }

    /// <summary>
    /// Cancel the current generation.
    /// </summary>
    public void CancelGeneration()
    {
        _generationCts?.Cancel();
        _chatOutput.OnNext(new AppendText(" [cancelled]", Color.Yellow, TextDecoration.Italic, IsNewLine: true));
        CleanupGeneration();
        StatusMessage = "Generation cancelled.";
    }

    /// <summary>
    /// Handle prompt submission.
    /// </summary>
    public void HandleSubmit(string prompt)
    {
        prompt = prompt.Trim();
        if (string.IsNullOrEmpty(prompt))
            return;
        // Add to history and reset index
        _promptHistory.Add(prompt);
        _historyIndex = -1;
        // Add user message to chat
        _chatOutput.OnNext(new AppendText("👤 ", Color.Cyan));
        _chatOutput.OnNext(new AppendText("You: ", Color.Cyan, TextDecoration.Bold));
        _chatOutput.OnNext(new AppendText(prompt, Color.White, IsNewLine: true));
        _chatOutput.OnNext(new AppendText("", IsNewLine: true));
        StartGeneration(prompt, decisionContext: null);
    }

    /// <summary>
    /// Handle decision selection from the SelectionListNode.
    /// </summary>
    public void HandleDecisionSelection(string choiceTitle)
    {
        // Hide the decision list
        ShowDecisionList = false;
        _chatOutput.OnNext(new HideDecisionPoint());
        // Show the user's choice in the chat
        _chatOutput.OnNext(new AppendText("", IsNewLine: true));
        _chatOutput.OnNext(new AppendText(" → ", Color.BrightBlack));
        _chatOutput.OnNext(new AppendText(choiceTitle, Color.Cyan, TextDecoration.Bold, IsNewLine: true));
        _chatOutput.OnNext(new AppendText("", IsNewLine: true));
        // Start follow-up generation with the decision context
        StartGeneration("continue", decisionContext: choiceTitle);
    }

    /// <summary>
    /// Handle decision cancellation.
    /// </summary>
    public void HandleDecisionCancelled()
    {
        ShowDecisionList = false;
        _chatOutput.OnNext(new HideDecisionPoint());
        _chatOutput.OnNext(new AppendText(" [decision skipped]", Color.Yellow, TextDecoration.Italic, IsNewLine: true));
        CleanupGeneration();
        StatusMessage = "Ready. Enter another question.";
    }

    /// <summary>
    /// Handle custom prompt from "Something else..." option.
    /// </summary>
    public void HandleCustomPrompt(string customPrompt)
    {
        customPrompt = customPrompt.Trim();
        if (string.IsNullOrEmpty(customPrompt))
        {
            HandleDecisionCancelled();
            return;
        }
        // Hide the decision list
        ShowDecisionList = false;
        _chatOutput.OnNext(new HideDecisionPoint());
        // Show the custom prompt as user input
        _chatOutput.OnNext(new AppendText("", IsNewLine: true));
        _chatOutput.OnNext(new AppendText("👤 ", Color.Cyan));
        _chatOutput.OnNext(new AppendText("You: ", Color.Cyan, TextDecoration.Bold));
        _chatOutput.OnNext(new AppendText(customPrompt, Color.White, IsNewLine: true));
        _chatOutput.OnNext(new AppendText("", IsNewLine: true));
        // Add to history
        _promptHistory.Add(customPrompt);
        _historyIndex = -1;
        // Start generation with the custom prompt (not as decision context)
        StartGeneration(customPrompt, decisionContext: null);
    }

    private void StartGeneration(string prompt, string? decisionContext)
    {
        // Add Assistant prefix and append animated spinner
        _chatOutput.OnNext(new AppendText("🤖 ", Color.Yellow));
        _chatOutput.OnNext(new AppendText("Assistant: ", Color.Green, TextDecoration.Bold));
        // Append tracked spinner - will be replaced when first content arrives
        _currentSpinner = new SpinnerSegment(Components.Streaming.SpinnerStyle.Dots, Color.Red);
        _chatOutput.OnNext(new AppendTrackedSegment(ThinkingSpinnerId, _currentSpinner));
        // Start generation
        IsGenerating = true;
        HasReceivedText = false;
        _thinkingBlockShown = false;
        StatusMessage = "Generating response...";
        _ = ConsumeResponseStreamAsync(prompt, decisionContext);
    }

    /// <summary>
    /// Consumes the token stream returned by the LLM actor, translating each
    /// token into chat output messages until completion, cancellation, error,
    /// or a decision point (which pauses generation for user input).
    /// </summary>
    private async Task ConsumeResponseStreamAsync(string prompt, string? decisionContext = null)
    {
        if (_llmActor is null)
        {
            StatusMessage = "Error: Actor not initialized";
            IsGenerating = false;
            return;
        }
        var completedNormally = false;
        var awaitingDecision = false;
        try
        {
            var response = await _llmActor.Ask<LlmMessages.GenerateResponse>(
                new LlmMessages.GenerateRequest(prompt, decisionContext),
                TimeSpan.FromSeconds(30));
            _generationCts = response.Cancellation;
            await foreach (var token in response.TokenStream.WithCancellation(_generationCts.Token))
            {
                switch (token)
                {
                    case LlmMessages.ThinkingToken thinking:
                        EmitThinking(thinking.Text);
                        break;
                    case LlmMessages.TextChunk chunk:
                        // On first real text, clear any thinking indicators
                        // (spinner and/or thinking block).
                        if (!HasReceivedText)
                        {
                            RemoveThinkingIndicators();
                            HasReceivedText = true;
                        }
                        _chatOutput.OnNext(new AppendText(chunk.Text));
                        break;
                    case LlmMessages.GenerationComplete:
                        CleanupGeneration();
                        StatusMessage = "Ready. Enter another question.";
                        completedNormally = true;
                        break;
                    case LlmMessages.DecisionPointToken decision:
                        // On first content (decision point), clear indicators.
                        if (!HasReceivedText)
                        {
                            RemoveThinkingIndicators();
                            HasReceivedText = true;
                        }
                        // Show the decision list and pause here for the user.
                        _chatOutput.OnNext(new ShowDecisionPoint(decision.Question, decision.Choices));
                        ShowDecisionList = true;
                        StatusMessage = "Make a selection below...";
                        // Fix: this early return runs the finally block below;
                        // flag it so the "abnormal exit" cleanup doesn't clobber
                        // the decision-wait status message.
                        awaitingDecision = true;
                        return;
                }
            }
        }
        catch (OperationCanceledException)
        {
            // Normal cancellation - CancelGeneration already cleaned up.
        }
        catch (Exception ex)
        {
            _chatOutput.OnNext(new AppendText(" [error: ", Color.Red));
            _chatOutput.OnNext(new AppendText(ex.Message, Color.Red, TextDecoration.Bold));
            _chatOutput.OnNext(new AppendText("]", Color.Red, IsNewLine: true));
            CleanupGeneration();
            StatusMessage = $"Error: {ex.Message}";
        }
        finally
        {
            if (awaitingDecision)
            {
                // Generation is paused for a decision: release the token source
                // but keep the decision UI state ("Make a selection...") intact.
                IsGenerating = false;
                _generationCts?.Dispose();
                _generationCts = null;
            }
            else if (!completedNormally && IsGenerating)
            {
                CleanupGeneration();
                StatusMessage = "Ready. Enter another question.";
            }
        }
    }

    /// <summary>
    /// Renders a thinking-progress update: swaps the spinner for a thinking
    /// block on the first update, then updates the block in place.
    /// </summary>
    private void EmitThinking(string text)
    {
        var thinkingSegment = new StaticTextSegment(
            $"💭 {text}",
            new TextStyle { Foreground = Color.BrightBlack, Decoration = TextDecoration.Italic }).AsBlock();
        if (!_thinkingBlockShown)
        {
            // Replace the animated spinner (releasing its tracking ID), then
            // append the tracked thinking block that subsequent updates replace.
            _chatOutput.OnNext(new ReplaceTrackedSegment(
                ThinkingSpinnerId,
                new StaticTextSegment("", TextStyle.Default),
                KeepTracked: false));
            _currentSpinner?.Dispose();
            _currentSpinner = null;
            _chatOutput.OnNext(new AppendTrackedSegment(ThinkingBlockId, thinkingSegment));
            _thinkingBlockShown = true;
        }
        else
        {
            _chatOutput.OnNext(new ReplaceTrackedSegment(ThinkingBlockId, thinkingSegment, KeepTracked: true));
        }
    }

    /// <summary>
    /// Removes any still-tracked thinking indicators from the chat history.
    /// Safe to call when neither is present. Fixes the spinner being left
    /// animating forever when generation ends (or text arrives) before any
    /// thinking token replaced it.
    /// </summary>
    private void RemoveThinkingIndicators()
    {
        if (_currentSpinner != null)
        {
            // Spinner was never replaced by a thinking token, so its ID is
            // still tracked - remove and dispose it.
            _chatOutput.OnNext(new RemoveTrackedSegment(ThinkingSpinnerId));
            _currentSpinner.Dispose();
            _currentSpinner = null;
        }
        if (_thinkingBlockShown && !HasReceivedText)
        {
            _chatOutput.OnNext(new RemoveTrackedSegment(ThinkingBlockId));
            _thinkingBlockShown = false;
        }
    }

    /// <summary>
    /// Ends the current generation turn: clears leftover indicators, appends
    /// spacing, resets the generating flag, and releases the token source.
    /// </summary>
    private void CleanupGeneration()
    {
        RemoveThinkingIndicators();
        _chatOutput.OnNext(new AppendText("", IsNewLine: true));
        _chatOutput.OnNext(new AppendText("", IsNewLine: true));
        IsGenerating = false;
        _generationCts?.Dispose();
        _generationCts = null;
    }

    public override void Dispose()
    {
        // Fix: stop and release any in-flight generation so it doesn't keep
        // pushing into the subjects we are about to dispose.
        _generationCts?.Cancel();
        _generationCts?.Dispose();
        _generationCts = null;
        _currentSpinner?.Dispose();
        _chatOutput.Dispose();
        _promptTextChanged.Dispose();
        DisposeReactiveFields();
        base.Dispose();
    }
}
Key Points
Streaming Output via Observables
The ViewModel exposes observables for chat content that the Page subscribes to:
// ViewModel exposes output streams
public IObservable<IChatMessage> ChatOutput => _chatOutput.AsObservable();
public IObservable<string> PromptTextChanged => _promptTextChanged.AsObservable();
// Emit chat content
_chatOutput.OnNext(new AppendText("Hello!", Color.White, IsNewLine: true));
Text Input Coordination
The ViewModel exposes an observable for prompt text changes (for history navigation):
public IObservable<string> PromptTextChanged => _promptTextChanged.AsObservable();
// Navigate history
_promptTextChanged.OnNext(_promptHistory[_historyIndex]);
Async Stream Consumption
await foreach (var token in response.TokenStream.WithCancellation(cts.Token))
{
switch (token)
{
case TextChunk chunk:
ChatHistory.Append(chunk.Text);
break;
}
}
Use IAsyncEnumerable to process streaming data.
Step 2: Create the Page
The Page renders the chat interface.
View complete StreamingChatPage.cs
// Copyright (c) Petabridge, LLC. All rights reserved.
// Licensed under the Apache 2.0 license. See LICENSE file in the project root for full license information.
using System.Reactive;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using Termina.Components.Streaming;
using Termina.Demo.Streaming.Actors;
using Termina.Extensions;
using Termina.Input;
using Termina.Layout;
using Termina.Reactive;
using Termina.Rendering;
using Termina.Terminal;
namespace Termina.Demo.Streaming.Pages;
/// <summary>
/// Demo page showing streaming text components with LLM simulation.
/// Handles all UI concerns including layout nodes, focus, and input routing.
/// </summary>
public class StreamingChatPage : ReactivePage<StreamingChatViewModel>
{
    // Layout nodes owned by the Page
    private StreamingTextNode _chatHistory = null!;
    private TextInputNode _promptInput = null!;

    // Decision list for interactive choices
    private SelectionListNode<LlmMessages.DecisionChoice>? _decisionList;
    private readonly Subject<ILayoutNode?> _decisionListChanged = new();

    // Fix: holds the subscriptions for the *current* decision list only.
    // Assigning a new inner disposable disposes the previous one, so showing
    // repeated decision points no longer accumulates dead subscriptions in
    // the page-lifetime Subscriptions container.
    private readonly SerialDisposable _decisionListSubscriptions = new();

    protected override void OnBound()
    {
        // Create layout nodes
        _chatHistory = StreamingTextNode.Create()
            .WithPrefix(" ", Color.Gray);
        _promptInput = new TextInputNode()
            .WithPlaceholder("Enter your question...")
            .WithForeground(Color.Cyan);

        // Tear down any active decision-list subscriptions with the page.
        _decisionListSubscriptions.DisposeWith(Subscriptions);

        // Subscribe to ViewModel chat output and update nodes
        ViewModel.ChatOutput
            .Subscribe(ApplyChatMessage)
            .DisposeWith(Subscriptions);

        // History navigation pushes text back into the input box.
        ViewModel.PromptTextChanged
            .Subscribe(text => _promptInput.Text = text)
            .DisposeWith(Subscriptions);

        // Subscribe to prompt input submission
        _promptInput.Submitted
            .Subscribe(text =>
            {
                ViewModel.HandleSubmit(text);
                _promptInput.Clear();
            })
            .DisposeWith(Subscriptions);

        // Handle keyboard input - Page routes to interactive layout nodes
        ViewModel.Input.OfType<KeyPressed>()
            .Subscribe(HandleKeyPress)
            .DisposeWith(Subscriptions);

        // Emit welcome message
        ViewModel.EmitWelcomeMessage();
    }

    /// <summary>
    /// Translates a ViewModel chat message into the corresponding
    /// StreamingTextNode operation.
    /// </summary>
    private void ApplyChatMessage(IChatMessage message)
    {
        switch (message)
        {
            case AppendText text:
                if (text.IsNewLine)
                    _chatHistory.AppendLine(text.Text, text.Foreground, null, text.Decoration);
                else
                    _chatHistory.Append(text.Text, text.Foreground, null, text.Decoration);
                break;
            case AppendTrackedSegment tracked:
                _chatHistory.AppendTracked(tracked.Id, tracked.Segment);
                break;
            case RemoveTrackedSegment remove:
                _chatHistory.Remove(remove.Id);
                break;
            case ReplaceTrackedSegment replace:
                _chatHistory.Replace(replace.Id, replace.NewSegment, replace.KeepTracked);
                break;
            case ShowDecisionPoint decision:
                ShowDecisionList(decision);
                break;
            case HideDecisionPoint:
                HideDecisionList();
                break;
            default:
                throw new ArgumentException($"Unknown message type: {message.GetType()}");
        }
    }

    /// <summary>
    /// Routes key presses by priority: global quit, active decision list,
    /// escape (cancel/quit), chat-history scrolling, then history navigation
    /// and the text input.
    /// </summary>
    private void HandleKeyPress(KeyPressed key)
    {
        var keyInfo = key.KeyInfo;
        // Ctrl+Q always quits
        if (keyInfo.Key == ConsoleKey.Q && keyInfo.Modifiers.HasFlag(ConsoleModifiers.Control))
        {
            ViewModel.RequestShutdown();
            return;
        }
        // When decision list is visible, route input to it
        if (ViewModel.ShowDecisionList && _decisionList != null)
        {
            // Escape cancels the decision
            if (keyInfo.Key == ConsoleKey.Escape)
            {
                ViewModel.HandleDecisionCancelled();
                return;
            }
            // Let the selection list handle the input
            _decisionList.HandleInput(keyInfo);
            return;
        }
        // Escape cancels generation when active, otherwise quits
        if (keyInfo.Key == ConsoleKey.Escape)
        {
            if (ViewModel.IsGenerating)
            {
                ViewModel.CancelGeneration();
            }
            else
            {
                ViewModel.RequestShutdown();
            }
            return;
        }
        // Page Up/Down scroll chat history
        if (_chatHistory.HandleInput(keyInfo, viewportHeight: 10, viewportWidth: 80))
        {
            return;
        }
        // When not generating, handle other input
        if (!ViewModel.IsGenerating)
        {
            if (keyInfo.Key == ConsoleKey.UpArrow)
            {
                ViewModel.NavigateHistoryUp();
                return;
            }
            if (keyInfo.Key == ConsoleKey.DownArrow)
            {
                ViewModel.NavigateHistoryDown();
                return;
            }
            // Let the text input handle other keys
            _promptInput.HandleInput(keyInfo);
        }
    }

    private void ShowDecisionList(ShowDecisionPoint decision)
    {
        // Dispose any existing decision list
        _decisionList?.Dispose();
        // Create rich content for each choice - inline style, no panel
        _decisionList = new SelectionListNode<LlmMessages.DecisionChoice>(
                decision.Choices,
                choice => new SelectionItemContent()
                    .AddLine(
                        new StaticTextSegment(choice.Title, Color.White, decoration: TextDecoration.Bold),
                        new StaticTextSegment(" ", Color.Default),
                        new StaticTextSegment($"[{choice.Category}]", Color.BrightBlack))
                    .AddLine(
                        new StaticTextSegment(" " + choice.Description, Color.Gray)))
            .WithHighlightColors(Color.Black, Color.Cyan)
            .WithForeground(Color.White)
            .WithVisibleRows(8)
            .WithShowNumbers(true)
            .WithOtherOption("Something else...");
        // Wire up the list's events; assigning the SerialDisposable disposes
        // the previous list's subscriptions (fixes per-decision leak).
        _decisionListSubscriptions.Disposable = new CompositeDisposable(
            // Selection confirmation
            _decisionList.SelectionConfirmed
                .Subscribe(selected =>
                {
                    if (selected.Count > 0)
                    {
                        ViewModel.HandleDecisionSelection(selected[0].Title);
                    }
                }),
            // "Something else..." custom input
            _decisionList.OtherSelected
                .Subscribe(customPrompt => ViewModel.HandleCustomPrompt(customPrompt)),
            // Cancellation
            _decisionList.Cancelled
                .Subscribe(_ => ViewModel.HandleDecisionCancelled()));
        // Focus the decision list
        _decisionList.OnFocused();
        // Signal layout change - just the list node itself, no panel wrapper
        _decisionListChanged.OnNext(_decisionList);
    }

    private void HideDecisionList()
    {
        // Release the current list's subscriptions before disposing it.
        _decisionListSubscriptions.Disposable = Disposable.Empty;
        _decisionList?.Dispose();
        _decisionList = null;
        _decisionListChanged.OnNext(null);
    }

    public override ILayoutNode BuildLayout()
    {
        return Layouts.Vertical()
            // Header
            .WithChild(
                new TextNode("🤖 Streaming Chat Demo - Simulated LLM with Akka.NET")
                    .WithForeground(Color.Cyan)
                    .Bold()
                    .Height(1))
            .WithChild(new EmptyNode().Height(1))
            // Chat history panel - fills available space
            .WithChild(
                new PanelNode()
                    .WithTitle("Chat History")
                    .WithTitleColor(Color.Yellow)
                    .WithBorder(BorderStyle.Rounded)
                    .WithBorderColor(Color.Gray)
                    .WithContent(_chatHistory.Fill())
                    .Fill())
            .WithChild(new EmptyNode().Height(1))
            // Decision list - appears between chat history and input when active
            .WithChild(
                _decisionListChanged
                    .StartWith((ILayoutNode?)null)
                    .Select(decisionList => decisionList == null
                        ? (ILayoutNode)new EmptyNode().Height(0)
                        : Layouts.Vertical()
                            .WithChild(new TextNode(" Choose an option:").WithForeground(Color.Yellow).Height(1))
                            .WithChild(decisionList)
                            .HeightAuto()) // Auto-size to content, don't compete with Fill() elements
                    .AsLayout())
            // Input panel - always visible
            .WithChild(
                new PanelNode()
                    .WithTitle("Your Prompt")
                    .WithTitleColor(Color.Cyan)
                    .WithBorder(BorderStyle.Rounded)
                    .WithBorderColor(Color.Cyan)
                    .WithContent(_promptInput)
                    .Height(3))
            // Status bar - key hints depend on current mode
            .WithChild(
                Observable.CombineLatest(
                        ViewModel.IsGeneratingChanged,
                        ViewModel.ShowDecisionListChanged.StartWith(false),
                        (isGenerating, showDecision) => showDecision
                            ? "[↑/↓] Navigate [Enter] Select [1-4] Quick Select [Esc] Skip"
                            : isGenerating
                                ? "[Esc] Cancel [PgUp/PgDn] Scroll [Ctrl+Q] Quit"
                                : "[Enter] Send [↑/↓] History [PgUp/PgDn] Scroll [Esc] Clear/Quit [Ctrl+Q] Quit")
                    .Select(text => new TextNode(text).WithForeground(Color.BrightBlack).NoWrap())
                    .AsLayout()
                    .Height(1))
            .WithChild(
                ViewModel.StatusMessageChanged
                    .Select(msg => new TextNode(msg).WithForeground(Color.White))
                    .AsLayout()
                    .Height(1));
    }
}
Key Points
Page Owns Layout Nodes
The Page creates and owns interactive layout nodes:
private StreamingTextNode _chatHistory = null!;
private TextInputNode _promptInput = null!;
protected override void OnBound()
{
_chatHistory = StreamingTextNode.Create().WithPrefix(" ", Color.Gray);
_promptInput = new TextInputNode().WithPlaceholder("Enter your question...");
// Subscribe to ViewModel's output streams
ViewModel.ChatOutput.Subscribe(segment => _chatHistory.Append(segment.Text));
}
Input Routing via ViewModel.Input
The Page accesses ViewModel.Input to route input to interactive layout nodes:
// Page subscribes to ViewModel's input for routing to layout nodes
ViewModel.Input.OfType<KeyPressed>()
.Subscribe(HandleKeyPress)
.DisposeWith(Subscriptions);
private void HandleKeyPress(KeyPressed key)
{
// Route to scrollable chat history
if (_chatHistory.HandleInput(key.KeyInfo, viewportHeight: 10, viewportWidth: 80))
return;
// Route to text input
_promptInput.HandleInput(key.KeyInfo);
}
Conditional Panels
ViewModel.IsGeneratingChanged
.Select(isGenerating => isGenerating && _thinkingIndicator.Buffer.HasContent
? BuildThinkingPanel()
: new EmptyNode())
.AsLayout()
Show/hide panels based on state.
Step 3: Create the Actor
The LLM simulator actor produces streaming tokens.
View complete LlmSimulatorActor.cs
using System.Runtime.CompilerServices;
using System.Threading.Channels;
using Akka;
using Akka.Actor;
using Akka.Streams;
using Akka.Streams.Dsl;
namespace Termina.Demo.Streaming.Actors;
/// <summary>
/// Messages for the LLM simulator actor.
/// </summary>
public static class LlmMessages
{
/// <summary>
/// Request to generate a response for a prompt.
/// Returns an IAsyncEnumerable of StreamToken via a channel.
/// </summary>
/// <param name="DecisionContext">When set, the prompt continues a previous
/// decision point and the actor streams the matching follow-up response.</param>
public record GenerateRequest(string Prompt, string? DecisionContext = null);
/// <summary>
/// Response containing the async stream of tokens.
/// </summary>
/// <param name="Cancellation">Token source the consumer cancels to stop the
/// stream; in this demo the consuming ViewModel also disposes it.</param>
public record GenerateResponse(IAsyncEnumerable<StreamToken> TokenStream, CancellationTokenSource Cancellation);
/// <summary>
/// A token from the stream (either thinking or text).
/// </summary>
public abstract record StreamToken;
/// <summary>
/// A thinking token (intermediate progress).
/// </summary>
public record ThinkingToken(string Text) : StreamToken;
/// <summary>
/// A chunk of generated text.
/// </summary>
public record TextChunk(string Text) : StreamToken;
/// <summary>
/// A decision point where the user must choose from options.
/// </summary>
public record DecisionPointToken(string Question, IReadOnlyList<DecisionChoice> Choices) : StreamToken;
/// <summary>
/// A choice for a decision point with title and description.
/// </summary>
public record DecisionChoice(string Title, string Description, string Category);
/// <summary>
/// Signal that generation is complete.
/// </summary>
public record GenerationComplete : StreamToken;
}
/// <summary>
/// Actor that simulates LLM streaming behavior using Akka Streams.
/// Returns an IAsyncEnumerable via Channel for consumption by StreamingText components.
/// </summary>
public class LlmSimulatorActor : ReceiveActor
{
private readonly ActorMaterializer _materializer;
private readonly Random _random = new();
// Sample responses for simulation
private static readonly string[] SampleResponses =
[
"Artificial intelligence (AI) is rapidly transforming how we interact with technology. " +
"From virtual assistants to autonomous vehicles, AI systems are becoming increasingly " +
"sophisticated and capable of handling complex tasks that were once thought to be " +
"exclusively human domains.\n\nMachine learning, a subset of AI, enables systems to " +
"learn and improve from experience without being explicitly programmed. Deep learning, " +
"using neural networks with many layers, has achieved remarkable breakthroughs in " +
"image recognition, natural language processing, and game playing.",
"The history of computing is a fascinating journey from mechanical calculators to " +
"quantum computers. Charles Babbage's Analytical Engine in the 1830s laid the " +
"conceptual groundwork for modern computers.\n\nThe ENIAC, completed in 1945, was " +
"one of the first general-purpose electronic computers. It weighed 30 tons and " +
"consumed 150 kilowatts of power. Today, a smartphone in your pocket has millions " +
"of times more computing power than that room-sized machine.",
"Software development best practices have evolved significantly over the decades. " +
"The waterfall model gave way to agile methodologies, emphasizing iterative " +
"development and customer collaboration.\n\nTest-driven development (TDD) and " +
"continuous integration have become standard practices in modern software teams. " +
"DevOps culture bridges the gap between development and operations, enabling " +
"faster and more reliable software delivery.",
"Terminal user interfaces (TUIs) offer a unique blend of efficiency and nostalgia. " +
"While graphical interfaces dominate consumer computing, TUIs remain popular among " +
"developers and system administrators.\n\nModern TUI frameworks like Spectre.Console " +
"bring rich formatting, animations, and interactive components to the command line. " +
"These tools prove that text-based interfaces can be both powerful and beautiful.",
"The actor model, pioneered by Carl Hewitt in 1973, provides a robust foundation " +
"for building concurrent and distributed systems. Each actor is an independent " +
"unit of computation that processes messages sequentially.\n\nAkka.NET brings " +
"the actor model to the .NET ecosystem, enabling developers to build highly " +
"scalable and fault-tolerant applications. Actors can supervise child actors, " +
"creating hierarchies that handle failures gracefully."
];
private static readonly string[] ThinkingPhrases =
[
"Analyzing the question...",
"Considering multiple approaches...",
"Searching knowledge base...",
"Formulating response structure...",
"Evaluating relevant context...",
"Processing semantic meaning...",
"Generating coherent narrative...",
"Refining output quality...",
"Checking factual accuracy...",
"Optimizing response clarity..."
];
// Decision point scenarios with follow-up responses
private static readonly DecisionScenario[] DecisionScenarios =
[
new DecisionScenario(
IntroText: "I'd be happy to explain different approaches to building concurrent systems. " +
"There are several paradigms to choose from, each with unique strengths.",
Question: "Which concurrency model would you like to explore?",
Choices:
[
new LlmMessages.DecisionChoice("Actor Model", "Message-passing between isolated actors", "Distributed"),
new LlmMessages.DecisionChoice("Task Parallel Library", "Task-based async/await patterns", "Threading"),
new LlmMessages.DecisionChoice("Reactive Extensions", "Observable streams and LINQ operators", "Reactive"),
],
FollowUps: new Dictionary<string, string>
{
["Actor Model"] = "The Actor Model is a powerful paradigm where computation is performed by " +
"independent actors that communicate exclusively through messages. Each actor has its own " +
"private state and processes messages sequentially, eliminating the need for locks.\n\n" +
"Akka.NET implements this model beautifully, providing location transparency, supervision " +
"hierarchies for fault tolerance, and clustering for distributed systems. Actors can " +
"supervise child actors, automatically restarting them when failures occur.",
["Task Parallel Library"] = "The Task Parallel Library (TPL) in .NET provides a robust foundation " +
"for parallel and asynchronous programming. Tasks represent units of work that can run " +
"concurrently, and async/await syntax makes asynchronous code read like synchronous code.\n\n" +
"TPL includes powerful constructs like Parallel.ForEach, Task.WhenAll, and dataflow blocks " +
"for building producer-consumer pipelines. It integrates seamlessly with the .NET runtime's " +
"thread pool for efficient resource utilization.",
["Reactive Extensions"] = "Reactive Extensions (Rx) treats events as streams of data that can be " +
"queried using LINQ-style operators. This declarative approach makes complex event processing " +
"surprisingly elegant and composable.\n\n" +
"With operators like Throttle, Buffer, Merge, and CombineLatest, you can express sophisticated " +
"event handling logic concisely. Rx is particularly powerful for UI programming, real-time " +
"data processing, and handling multiple asynchronous data sources."
}),
new DecisionScenario(
IntroText: "Great question about software architecture! There are several popular patterns " +
"for structuring applications, each suited to different scenarios.",
Question: "Which architectural pattern interests you most?",
Choices:
[
new LlmMessages.DecisionChoice("Microservices", "Independent deployable services", "Distributed"),
new LlmMessages.DecisionChoice("Clean Architecture", "Dependency inversion layers", "Monolithic"),
new LlmMessages.DecisionChoice("Event Sourcing", "Append-only event logs", "Data"),
],
FollowUps: new Dictionary<string, string>
{
["Microservices"] = "Microservices architecture decomposes applications into small, independent " +
"services that communicate via APIs. Each service owns its data and can be deployed, " +
"scaled, and updated independently.\n\n" +
"This approach enables teams to work autonomously, choose appropriate technologies per " +
"service, and scale specific components based on demand. However, it introduces complexity " +
"in service discovery, distributed transactions, and observability.",
["Clean Architecture"] = "Clean Architecture, popularized by Robert C. Martin, organizes code in " +
"concentric layers with dependencies pointing inward. The core business logic remains " +
"independent of frameworks, databases, and UI concerns.\n\n" +
"This separation makes the codebase highly testable and adaptable to change. You can swap " +
"out databases, web frameworks, or UI technologies without touching the business rules. " +
"The trade-off is additional abstraction layers and boilerplate.",
["Event Sourcing"] = "Event Sourcing stores the state of an application as a sequence of events " +
"rather than current state snapshots. Every change is captured as an immutable event, " +
"providing a complete audit trail and enabling temporal queries.\n\n" +
"Combined with CQRS (Command Query Responsibility Segregation), event sourcing enables " +
"powerful patterns like event replay, debugging production issues by replaying events, " +
"and building multiple read models from the same event stream."
}),
new DecisionScenario(
IntroText: "Terminal UI development has seen a renaissance lately! There are several " +
"approaches to building rich console applications.",
Question: "What aspect of TUI development would you like to learn about?",
Choices:
[
new LlmMessages.DecisionChoice("Layout Systems", "Constraint-based positioning", "Structure"),
new LlmMessages.DecisionChoice("Input Handling", "Keyboard and mouse events", "Interaction"),
new LlmMessages.DecisionChoice("Rendering Pipeline", "Efficient screen updates", "Performance"),
],
FollowUps: new Dictionary<string, string>
{
["Layout Systems"] = "Modern TUI frameworks use constraint-based layout systems similar to CSS " +
"Flexbox or native mobile layouts. Elements specify size constraints (fixed, fill, auto, " +
"percentage) and the layout engine calculates final positions.\n\n" +
"This declarative approach handles terminal resizing gracefully and supports nested layouts " +
"like vertical/horizontal stacks, grids, and overlapping layers for modals. The key is " +
"separating layout logic from rendering for cleaner, more maintainable code.",
["Input Handling"] = "Console input handling goes beyond simple ReadLine calls. Modern TUI apps " +
"need to handle arrow keys, function keys, mouse events, and key combinations while " +
"maintaining responsive UI updates.\n\n" +
"Focus management determines which component receives input. A focus stack enables modal " +
"dialogs to capture input temporarily, then restore focus when dismissed. Input routing " +
"typically flows from focused component up through parent containers.",
["Rendering Pipeline"] = "Efficient TUI rendering uses a diff-based approach similar to React's " +
"virtual DOM. Instead of clearing and redrawing the entire screen, the renderer compares " +
"the new frame against the previous one and only updates changed cells.\n\n" +
"This minimizes flickering and terminal escape sequence overhead. Double-buffering with " +
"a frame buffer allows compositing complex layouts before flushing to the terminal in a " +
"single write operation."
})
];
/// <summary>
/// A scripted decision scenario: intro text streamed before the question,
/// the choices offered, and canned follow-up responses keyed by choice title.
/// </summary>
private record DecisionScenario(
    string IntroText,
    string Question,
    LlmMessages.DecisionChoice[] Choices,
    Dictionary<string, string> FollowUps);

// Track pending decision for follow-up responses.
// Set when a decision point is emitted; consumed (and cleared) by the next
// GenerateRequest that carries a DecisionContext.
private DecisionScenario? _pendingDecision;
public LlmSimulatorActor()
{
    // The materializer is bound to this actor's context so streams it runs
    // are torn down with the actor (it is disposed in PostStop).
    _materializer = Context.Materializer();
    Receive<LlmMessages.GenerateRequest>(HandleGenerateRequest);
}
/// <summary>
/// Handles a generation request: builds the token source, pumps it into an
/// unbounded channel, and replies to the sender with an async-enumerable view
/// of that channel plus a CancellationTokenSource the consumer can use.
/// </summary>
private void HandleGenerateRequest(LlmMessages.GenerateRequest request)
{
    var cts = new CancellationTokenSource();
    var channel = Channel.CreateUnbounded<LlmMessages.StreamToken>();

    // Create Akka Stream source that generates tokens.
    var source = CreateTokenSource(request.Prompt, request.DecisionContext, cts.Token);

    // Run the stream, writing to the channel. Propagate a stream failure to
    // the channel so the consumer observes the exception instead of seeing a
    // silently truncated token stream (TryComplete(null) == normal completion,
    // and unlike Complete() it cannot throw if the channel is already closed).
    source
        .RunForeach(token => channel.Writer.TryWrite(token), _materializer)
        .ContinueWith(
            t => channel.Writer.TryComplete(t.IsFaulted ? t.Exception : null),
            TaskContinuationOptions.ExecuteSynchronously);

    // Return the async enumerable wrapping the channel.
    var asyncEnumerable = ReadFromChannelAsync(channel.Reader, cts.Token);
    Sender.Tell(new LlmMessages.GenerateResponse(asyncEnumerable, cts));
}
/// <summary>
/// Builds the full token source for one generation: a run of throttled
/// "thinking" tokens followed by either a streamed text response or a
/// decision point, depending on the prompt and any pending decision.
/// </summary>
private Source<LlmMessages.StreamToken, NotUsed> CreateTokenSource(string prompt, string? decisionContext, CancellationToken ct)
{
    // Generate 5-10 thinking tokens over 1.25-2.5 seconds (250ms each).
    var phraseCount = _random.Next(5, 11);

    var thinking = Source.From(Enumerable.Range(0, phraseCount))
        .Select(_ => ThinkingPhrases[_random.Next(ThinkingPhrases.Length)])
        .Select(phrase => (LlmMessages.StreamToken)new LlmMessages.ThinkingToken(phrase))
        .Throttle(1, TimeSpan.FromMilliseconds(250), 1, ThrottleMode.Shaping);

    // A decision context means the user just picked a choice: answer with the
    // scenario's canned follow-up (or a generic acknowledgement), then clear it.
    if (decisionContext != null && _pendingDecision != null)
    {
        var followUp = _pendingDecision.FollowUps.GetValueOrDefault(decisionContext)
            ?? $"You selected '{decisionContext}'. That's an interesting choice!";
        _pendingDecision = null;
        return CreateTextStreamSource(thinking, followUp);
    }

    // "decision" prompt forces a decision point (headless-test hook);
    // otherwise roll a 40% chance.
    var wantsDecision = prompt.Equals("decision", StringComparison.OrdinalIgnoreCase)
        || _random.Next(100) < 40;

    if (wantsDecision)
    {
        _pendingDecision = DecisionScenarios[_random.Next(DecisionScenarios.Length)];
        return CreateDecisionSource(thinking, _pendingDecision);
    }

    var response = SampleResponses[_random.Next(SampleResponses.Length)];
    _pendingDecision = null;
    return CreateTextStreamSource(thinking, response);
}
/// <summary>
/// Wraps a canned text response as a token stream: thinking tokens, then the
/// response broken into small throttled chunks, then a completion marker.
/// </summary>
private Source<LlmMessages.StreamToken, NotUsed> CreateTextStreamSource(
    Source<LlmMessages.StreamToken, NotUsed> thinkingSource,
    string response)
{
    // Break the response into 1-3 character chunks to mimic token streaming.
    var chunkSize = _random.Next(1, 4);
    var pieces = response
        .Chunk(chunkSize)
        .Select(chars => new string(chars))
        .ToList();

    var textSource = Source.From(pieces)
        .Select(piece => (LlmMessages.StreamToken)new LlmMessages.TextChunk(piece))
        .Throttle(10, TimeSpan.FromMilliseconds(50), 10, ThrottleMode.Shaping);

    var done = Source.Single((LlmMessages.StreamToken)new LlmMessages.GenerationComplete());

    return thinkingSource.Concat(textSource).Concat(done);
}
/// <summary>
/// Wraps a decision scenario as a token stream: thinking tokens, the streamed
/// intro text, then a decision-point token. No completion token is emitted -
/// the conversation resumes once the user makes a choice.
/// </summary>
private Source<LlmMessages.StreamToken, NotUsed> CreateDecisionSource(
    Source<LlmMessages.StreamToken, NotUsed> thinkingSource,
    DecisionScenario scenario)
{
    // Stream the intro text in 1-3 character chunks, like a normal response.
    var chunkSize = _random.Next(1, 4);
    var pieces = scenario.IntroText
        .Chunk(chunkSize)
        .Select(chars => new string(chars))
        .ToList();

    var introSource = Source.From(pieces)
        .Select(piece => (LlmMessages.StreamToken)new LlmMessages.TextChunk(piece))
        .Throttle(10, TimeSpan.FromMilliseconds(50), 10, ThrottleMode.Shaping);

    var decision = Source.Single(
        (LlmMessages.StreamToken)new LlmMessages.DecisionPointToken(scenario.Question, scenario.Choices));

    return thinkingSource.Concat(introSource).Concat(decision);
}
/// <summary>
/// Exposes a channel reader as an <see cref="IAsyncEnumerable{T}"/>.
/// Generalized to any element type; the call site infers
/// <typeparamref name="T"/>, so existing callers are unchanged.
/// </summary>
/// <param name="reader">Channel to drain; enumeration ends when the writer completes.</param>
/// <param name="ct">Cancels a pending read and stops enumeration.</param>
private static async IAsyncEnumerable<T> ReadFromChannelAsync<T>(
    ChannelReader<T> reader,
    [EnumeratorCancellation] CancellationToken ct = default)
{
    await foreach (var item in reader.ReadAllAsync(ct).ConfigureAwait(false))
    {
        yield return item;
    }
}
/// <summary>
/// Disposes the materializer so any in-flight token streams are torn down
/// when the actor stops.
/// </summary>
protected override void PostStop()
{
    _materializer.Dispose();
    base.PostStop();
}

/// <summary>Props factory for creating this actor.</summary>
public static Props Props() => Akka.Actor.Props.Create<LlmSimulatorActor>();
}

Key Points
IAsyncEnumerable Response
public record GenerateResponse(
    IAsyncEnumerable<StreamToken> TokenStream,
    CancellationTokenSource Cancellation);

Return an async enumerable that the ViewModel can consume.
Simulated Delays
async IAsyncEnumerable<StreamToken> GenerateTokens(string prompt, CancellationToken ct)
{
// Thinking phase
yield return new ThinkingToken("Analyzing...");
await Task.Delay(500, ct);
// Token generation
foreach (var word in response.Split(' '))
{
yield return new TextChunk(word + " ");
await Task.Delay(50, ct);
}
}

Step 4: Wire Up the Host
View complete Program.cs
using Akka.Hosting;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Termina.Demo.Streaming.Actors;
using Termina.Demo.Streaming.Pages;
using Termina.Hosting;
using Termina.Input;
// Check for --test flag (used in CI/CD to run scripted test and exit)
var testMode = args.Contains("--test");
var builder = Host.CreateApplicationBuilder(args);
// Configure logging to only show warnings and errors (avoid cluttering TUI output)
builder.Logging.SetMinimumLevel(LogLevel.Warning);
// Set up input source based on mode
VirtualInputSource? scriptedInput = null;
if (testMode)
{
scriptedInput = new VirtualInputSource();
builder.Services.AddTerminaVirtualInput(scriptedInput);
}
// Register Akka.NET actor system
builder.Services.AddAkka("termina-streaming-demo", configurationBuilder =>
{
configurationBuilder.WithActors((system, registry) =>
{
var llmActor = system.ActorOf(LlmSimulatorActor.Props(), "llm-simulator");
registry.Register<LlmSimulatorActor>(llmActor);
});
});
// Register Termina with reactive pages
builder.Services.AddTermina("/chat", termina =>
{
termina.RegisterRoute<StreamingChatPage, StreamingChatViewModel>("/chat");
});
var host = builder.Build();
if (testMode && scriptedInput != null)
{
// Queue up scripted input to test streaming functionality then quit
_ = Task.Run(async () =>
{
await Task.Delay(500); // Wait for initial render
// Type a prompt: "Hello"
scriptedInput.EnqueueString("Hello");
// Submit the prompt
scriptedInput.EnqueueKey(ConsoleKey.Enter);
// Wait for streaming to complete (actor produces tokens over ~3-5 seconds)
await Task.Delay(6000);
// Test decision point: "decision" prompt forces a decision list to appear
scriptedInput.EnqueueString("decision");
scriptedInput.EnqueueKey(ConsoleKey.Enter);
// Wait for intro text streaming and decision list to appear
await Task.Delay(4000);
// Navigate down in the selection list
scriptedInput.EnqueueKey(ConsoleKey.DownArrow);
await Task.Delay(200);
// Select the second option (Enter confirms)
scriptedInput.EnqueueKey(ConsoleKey.Enter);
// Wait for follow-up response to stream
await Task.Delay(5000);
// Test "Something else..." option - trigger another decision
scriptedInput.EnqueueString("decision");
scriptedInput.EnqueueKey(ConsoleKey.Enter);
await Task.Delay(4000);
// Navigate to "Something else..." (4th option)
scriptedInput.EnqueueKey(ConsoleKey.D4); // Quick select option 4
await Task.Delay(200);
// Type custom prompt and submit
scriptedInput.EnqueueString("custom question");
scriptedInput.EnqueueKey(ConsoleKey.Enter);
await Task.Delay(5000);
// Quit with Ctrl+Q
scriptedInput.EnqueueKey(ConsoleKey.Q, control: true);
scriptedInput.Complete();
});
}
await host.RunAsync();

Key Points
Akka.NET Registration
builder.Services.AddAkka("streaming-demo", configurationBuilder =>
{
configurationBuilder.WithActors((system, registry) =>
{
var llmActor = system.ActorOf(LlmSimulatorActor.Props(), "llm-simulator");
registry.Register<LlmSimulatorActor>(llmActor);
});
});Dependency Injection
The ViewModel receives the actor via DI:
public StreamingChatViewModel(IRequiredActor<LlmSimulatorActor> llmActorProvider)
{
_llmActorProvider = llmActorProvider;
}

Run the App
dotnet run

Controls:
- Type your question and press Enter to send
- ↑/↓ - Navigate prompt history
- PgUp/PgDn - Scroll chat history
- Escape - Cancel generation or quit
- Ctrl+Q - Force quit
Patterns Demonstrated
Streaming Text Updates
// Append character by character
ChatHistory.Append(chunk.Text);
// Append complete lines
ChatHistory.AppendLine("Complete message");
// Clear content
ThinkingIndicator.Clear();Cancellation
private CancellationTokenSource? _generationCts;
private void CancelGeneration()
{
_generationCts?.Cancel();
CleanupGeneration(" [cancelled]");
}Page-ViewModel Communication
// Page subscribes to TextInputNode events and calls ViewModel methods
_promptInput.Submitted
.Subscribe(text => ViewModel.HandleSubmit(text))
.DisposeWith(Subscriptions);
// Page routes input to layout nodes via ViewModel.Input
ViewModel.Input.OfType<KeyPressed>()
.Subscribe(key => _chatHistory.HandleInput(key.KeyInfo, 10, 80))
.DisposeWith(Subscriptions);State-Based UI Updates
IsGenerating = true;
StatusMessage = "Generating...";
// ... async work ...
IsGenerating = false;
StatusMessage = "Ready";Complete Code
// Copyright (c) Petabridge, LLC. All rights reserved.
// Licensed under the Apache 2.0 license. See LICENSE file in the project root for full license information.
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using Akka.Actor;
using Akka.Hosting;
using Termina.Components.Streaming;
using Termina.Demo.Streaming.Actors;
using Termina.Layout;
using Termina.Reactive;
using Termina.Terminal;
namespace Termina.Demo.Streaming.Pages;
/// <summary>
/// Base interface for chat messages.
/// </summary>
public interface IChatMessage;
/// <summary>
/// Append regular text to the chat history.
/// </summary>
public sealed record AppendText(
string Text,
Color? Foreground = null,
TextDecoration Decoration = TextDecoration.None,
bool IsNewLine = false) : IChatMessage;
/// <summary>
/// Append a tracked text segment that can be manipulated later.
/// </summary>
public sealed record AppendTrackedSegment(SegmentId Id, ITextSegment Segment) : IChatMessage;
/// <summary>
/// Remove a tracked segment by ID.
/// </summary>
public sealed record RemoveTrackedSegment(SegmentId Id) : IChatMessage;
/// <summary>
/// Replace a tracked segment with new content.
/// </summary>
public sealed record ReplaceTrackedSegment(SegmentId Id, ITextSegment NewSegment, bool KeepTracked = false) : IChatMessage;
/// <summary>
/// Show a decision point with choices for the user.
/// </summary>
public sealed record ShowDecisionPoint(string Question, IReadOnlyList<LlmMessages.DecisionChoice> Choices) : IChatMessage;
/// <summary>
/// Hide the decision list (after selection or cancellation).
/// </summary>
public sealed record HideDecisionPoint : IChatMessage;
/// <summary>
/// ViewModel for the streaming chat demo.
/// Contains only state and business logic - no UI concerns.
/// Exposes observables for chat content that the Page subscribes to.
/// </summary>
public partial class StreamingChatViewModel : ReactiveViewModel
{
// Predefined segment IDs for tracked elements
private static readonly SegmentId ThinkingSpinnerId = new(1);
private static readonly SegmentId ThinkingBlockId = new(2);
private readonly IRequiredActor<LlmSimulatorActor> _llmActorProvider;
private readonly List<string> _promptHistory = new();
private int _historyIndex = -1;
private IActorRef? _llmActor;
private CancellationTokenSource? _generationCts;
private SpinnerSegment? _currentSpinner;
private bool _thinkingBlockShown;
// Subjects for chat output - Page subscribes to these
private readonly Subject<IChatMessage> _chatOutput = new();
private readonly Subject<string> _promptTextChanged = new();
/// <summary>
/// Observable for chat messages.
/// Includes text appends and tracked segment operations.
/// </summary>
public IObservable<IChatMessage> ChatOutput => _chatOutput.AsObservable();
/// <summary>
/// Observable for prompt text changes (for history navigation).
/// </summary>
public IObservable<string> PromptTextChanged => _promptTextChanged.AsObservable();
// Reactive properties for UI state
[Reactive] private bool _isGenerating = false;
[Reactive] private bool _hasReceivedText = false; // Tracks if any text has arrived yet
[Reactive] private string _statusMessage = "Ready. Enter a question to begin.";
[Reactive] private bool _showDecisionList = false; // Whether to show the decision list
// Track current decision context for follow-up
private string? _pendingDecisionContext;
public StreamingChatViewModel(IRequiredActor<LlmSimulatorActor> llmActorProvider)
{
    // Actor ref is resolved lazily in InitializeAsync - the actor system may
    // not be fully started when the ViewModel is constructed.
    _llmActorProvider = llmActorProvider;
}
/// <summary>
/// Emits the initial welcome message.
/// </summary>
public void EmitWelcomeMessage()
{
    // Emoji prefix and speaker label are separate segments so each carries
    // its own color/decoration.
    _chatOutput.OnNext(new AppendText("🤖 ", Color.Yellow));
    _chatOutput.OnNext(new AppendText("Assistant: ", Color.Green, TextDecoration.Bold));
    _chatOutput.OnNext(new AppendText("Hello! I'm a simulated LLM demo.", Color.White, IsNewLine: true));
    _chatOutput.OnNext(new AppendText(" Ask me anything and watch the streaming response!", Color.BrightBlack, IsNewLine: true));
    // Trailing blank line separates the welcome from the first user prompt.
    _chatOutput.OnNext(new AppendText("", IsNewLine: true));
}
public override void OnActivated()
{
    // Fire-and-forget. NOTE(review): exceptions from InitializeAsync are
    // unobserved here - confirm GetAsync cannot fault after host startup.
    _ = InitializeAsync();
}

/// <summary>Resolves the LLM simulator actor from the Akka registry.</summary>
private async Task InitializeAsync()
{
    _llmActor = await _llmActorProvider.GetAsync();
}
/// <summary>
/// Navigate up (older) through prompt history and emit the recalled prompt.
/// </summary>
public void NavigateHistoryUp()
{
    if (_promptHistory.Count == 0)
        return;

    // First press recalls the newest entry; later presses walk toward index 0
    // and stop there.
    _historyIndex = _historyIndex < 0
        ? _promptHistory.Count - 1
        : Math.Max(0, _historyIndex - 1);

    _promptTextChanged.OnNext(_promptHistory[_historyIndex]);
}
/// <summary>
/// Navigate down (newer) through prompt history; past the newest entry the
/// prompt is cleared and history navigation is reset.
/// </summary>
public void NavigateHistoryDown()
{
    // Not navigating history at all - nothing to do.
    if (_historyIndex < 0)
        return;

    var atNewest = _historyIndex >= _promptHistory.Count - 1;
    if (atNewest)
    {
        _historyIndex = -1;
        _promptTextChanged.OnNext("");
        return;
    }

    _historyIndex++;
    _promptTextChanged.OnNext(_promptHistory[_historyIndex]);
}
/// <summary>
/// Cancel the current generation.
/// </summary>
public void CancelGeneration()
{
    _generationCts?.Cancel();
    _chatOutput.OnNext(new AppendText(" [cancelled]", Color.Yellow, TextDecoration.Italic, IsNewLine: true));
    // NOTE(review): CleanupGeneration disposes _generationCts while
    // ConsumeResponseStreamAsync may still reference its Token; a resulting
    // ObjectDisposedException would surface via the generic catch there -
    // confirm whether disposal should be deferred to the consumer.
    CleanupGeneration();
    StatusMessage = "Generation cancelled.";
}
/// <summary>
/// Handle prompt submission.
/// </summary>
public void HandleSubmit(string prompt)
{
    prompt = prompt.Trim();
    if (string.IsNullOrEmpty(prompt))
        return;

    // Add to history and reset index so Up recalls this prompt first.
    _promptHistory.Add(prompt);
    _historyIndex = -1;

    // Echo the user message into the chat transcript.
    _chatOutput.OnNext(new AppendText("👤 ", Color.Cyan));
    _chatOutput.OnNext(new AppendText("You: ", Color.Cyan, TextDecoration.Bold));
    _chatOutput.OnNext(new AppendText(prompt, Color.White, IsNewLine: true));
    _chatOutput.OnNext(new AppendText("", IsNewLine: true));

    StartGeneration(prompt, decisionContext: null);
}
/// <summary>
/// Handle decision selection from the SelectionListNode.
/// </summary>
public void HandleDecisionSelection(string choiceTitle)
{
    // Hide the decision list.
    ShowDecisionList = false;
    _chatOutput.OnNext(new HideDecisionPoint());

    // Show the user's choice in the chat.
    _chatOutput.OnNext(new AppendText("", IsNewLine: true));
    _chatOutput.OnNext(new AppendText(" → ", Color.BrightBlack));
    _chatOutput.OnNext(new AppendText(choiceTitle, Color.Cyan, TextDecoration.Bold, IsNewLine: true));
    _chatOutput.OnNext(new AppendText("", IsNewLine: true));

    // Start follow-up generation with the decision context. The prompt text
    // is a placeholder - the actor keys the follow-up off the context title.
    StartGeneration("continue", decisionContext: choiceTitle);
}
/// <summary>
/// Handle decision cancellation.
/// </summary>
public void HandleDecisionCancelled()
{
    ShowDecisionList = false;
    _chatOutput.OnNext(new HideDecisionPoint());
    _chatOutput.OnNext(new AppendText(" [decision skipped]", Color.Yellow, TextDecoration.Italic, IsNewLine: true));
    // No follow-up generation - just reset the UI state.
    CleanupGeneration();
    StatusMessage = "Ready. Enter another question.";
}
/// <summary>
/// Handle custom prompt from "Something else..." option.
/// </summary>
public void HandleCustomPrompt(string customPrompt)
{
    customPrompt = customPrompt.Trim();
    // An empty custom prompt is treated the same as cancelling the decision.
    if (string.IsNullOrEmpty(customPrompt))
    {
        HandleDecisionCancelled();
        return;
    }

    // Hide the decision list.
    ShowDecisionList = false;
    _chatOutput.OnNext(new HideDecisionPoint());

    // Show the custom prompt as user input.
    _chatOutput.OnNext(new AppendText("", IsNewLine: true));
    _chatOutput.OnNext(new AppendText("👤 ", Color.Cyan));
    _chatOutput.OnNext(new AppendText("You: ", Color.Cyan, TextDecoration.Bold));
    _chatOutput.OnNext(new AppendText(customPrompt, Color.White, IsNewLine: true));
    _chatOutput.OnNext(new AppendText("", IsNewLine: true));

    // Add to history so it can be recalled like a normal prompt.
    _promptHistory.Add(customPrompt);
    _historyIndex = -1;

    // Start generation with the custom prompt (not as decision context).
    StartGeneration(customPrompt, decisionContext: null);
}
/// <summary>
/// Emits the assistant prefix plus a tracked spinner, flips the generating
/// state flags, and kicks off async consumption of the token stream.
/// </summary>
private void StartGeneration(string prompt, string? decisionContext)
{
    // Add Assistant prefix and append animated spinner.
    _chatOutput.OnNext(new AppendText("🤖 ", Color.Yellow));
    _chatOutput.OnNext(new AppendText("Assistant: ", Color.Green, TextDecoration.Bold));

    // Append tracked spinner - replaced when the first token arrives.
    _currentSpinner = new SpinnerSegment(Components.Streaming.SpinnerStyle.Dots, Color.Red);
    _chatOutput.OnNext(new AppendTrackedSegment(ThinkingSpinnerId, _currentSpinner));

    // Start generation.
    IsGenerating = true;
    HasReceivedText = false;
    _thinkingBlockShown = false;
    StatusMessage = "Generating response...";
    // Fire-and-forget; ConsumeResponseStreamAsync handles its own errors.
    _ = ConsumeResponseStreamAsync(prompt, decisionContext);
}
/// <summary>
/// Asks the actor for a token stream and consumes it, translating each token
/// into chat-output messages: thinking indicators, text chunks, decision
/// points, and completion. Handles cancellation and errors.
/// </summary>
private async Task ConsumeResponseStreamAsync(string prompt, string? decisionContext = null)
{
    if (_llmActor is null)
    {
        StatusMessage = "Error: Actor not initialized";
        IsGenerating = false;
        return;
    }

    var completedNormally = false;
    try
    {
        var response = await _llmActor.Ask<LlmMessages.GenerateResponse>(
            new LlmMessages.GenerateRequest(prompt, decisionContext),
            TimeSpan.FromSeconds(30));
        _generationCts = response.Cancellation;

        await foreach (var token in response.TokenStream.WithCancellation(_generationCts.Token))
        {
            switch (token)
            {
                case LlmMessages.ThinkingToken thinking:
                    // On first thinking token, replace spinner with thinking block.
                    if (!_thinkingBlockShown)
                    {
                        _chatOutput.OnNext(new ReplaceTrackedSegment(
                            ThinkingSpinnerId,
                            new StaticTextSegment("", TextStyle.Default),
                            KeepTracked: false));
                        _currentSpinner?.Dispose();
                        _currentSpinner = null;

                        // Add thinking block that will update in place.
                        var thinkingSegment = new StaticTextSegment(
                            $"💭 {thinking.Text}",
                            new TextStyle { Foreground = Color.BrightBlack, Decoration = TextDecoration.Italic }).AsBlock();
                        _chatOutput.OnNext(new AppendTrackedSegment(ThinkingBlockId, thinkingSegment));
                        _thinkingBlockShown = true;
                    }
                    else
                    {
                        // Update existing thinking block in place.
                        var thinkingSegment = new StaticTextSegment(
                            $"💭 {thinking.Text}",
                            new TextStyle { Foreground = Color.BrightBlack, Decoration = TextDecoration.Italic }).AsBlock();
                        _chatOutput.OnNext(new ReplaceTrackedSegment(ThinkingBlockId, thinkingSegment, KeepTracked: true));
                    }
                    break;

                case LlmMessages.TextChunk chunk:
                    // On first text chunk, remove thinking block.
                    if (!HasReceivedText)
                    {
                        _chatOutput.OnNext(new RemoveTrackedSegment(ThinkingBlockId));
                        HasReceivedText = true;
                    }
                    _chatOutput.OnNext(new AppendText(chunk.Text));
                    break;

                case LlmMessages.GenerationComplete:
                    CleanupGeneration();
                    StatusMessage = "Ready. Enter another question.";
                    completedNormally = true;
                    break;

                case LlmMessages.DecisionPointToken decision:
                    // On first content (decision point), remove thinking block.
                    if (!HasReceivedText)
                    {
                        _chatOutput.OnNext(new RemoveTrackedSegment(ThinkingBlockId));
                        HasReceivedText = true;
                    }
                    // Show the decision list.
                    _chatOutput.OnNext(new ShowDecisionPoint(decision.Question, decision.Choices));
                    ShowDecisionList = true;

                    // Release stream resources now, then set the decision
                    // status. Previously the early return fell into the
                    // finally-block safety net, which overwrote this status
                    // with "Ready. Enter another question." while the decision
                    // list was still showing.
                    CleanupGeneration();
                    StatusMessage = "Make a selection below...";
                    completedNormally = true;
                    return;
            }
        }
    }
    catch (OperationCanceledException)
    {
        // Normal cancellation - user pressed Escape.
    }
    catch (Exception ex)
    {
        _chatOutput.OnNext(new AppendText(" [error: ", Color.Red));
        _chatOutput.OnNext(new AppendText(ex.Message, Color.Red, TextDecoration.Bold));
        _chatOutput.OnNext(new AppendText("]", Color.Red, IsNewLine: true));
        CleanupGeneration();
        StatusMessage = $"Error: {ex.Message}";
    }
    finally
    {
        // Safety net: if the stream ended without explicit completion,
        // cancellation, or error cleanup, reset the generating state.
        if (!completedNormally && IsGenerating)
        {
            CleanupGeneration();
            StatusMessage = "Ready. Enter another question.";
        }
    }
}
/// <summary>
/// Ends a generation: appends a blank-line separator to the transcript,
/// clears the generating flag, and releases the cancellation source that was
/// obtained from the actor.
/// </summary>
private void CleanupGeneration()
{
    _chatOutput.OnNext(new AppendText("", IsNewLine: true));
    _chatOutput.OnNext(new AppendText("", IsNewLine: true));
    IsGenerating = false;
    _generationCts?.Dispose();
    _generationCts = null;
}
/// <summary>
/// Disposes ViewModel-owned resources: an in-flight generation's
/// CancellationTokenSource, any live spinner segment, the output subjects,
/// and the source-generated reactive fields.
/// </summary>
public override void Dispose()
{
    // Stop and release an in-flight generation; otherwise the CTS handed over
    // by the actor leaks when the page is closed mid-stream.
    _generationCts?.Cancel();
    _generationCts?.Dispose();
    _generationCts = null;

    _currentSpinner?.Dispose();
    _chatOutput.Dispose();
    _promptTextChanged.Dispose();
    DisposeReactiveFields();
    base.Dispose();
}
}// Copyright (c) Petabridge, LLC. All rights reserved.
// Licensed under the Apache 2.0 license. See LICENSE file in the project root for full license information.
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using Termina.Components.Streaming;
using Termina.Demo.Streaming.Actors;
using Termina.Extensions;
using Termina.Input;
using Termina.Layout;
using Termina.Reactive;
using Termina.Rendering;
using Termina.Terminal;
namespace Termina.Demo.Streaming.Pages;
/// <summary>
/// Demo page showing streaming text components with LLM simulation.
/// Handles all UI concerns including layout nodes, focus, and input routing.
/// </summary>
public class StreamingChatPage : ReactivePage<StreamingChatViewModel>
{
// Layout nodes owned by the Page
private StreamingTextNode _chatHistory = null!;
private TextInputNode _promptInput = null!;
// Decision list for interactive choices
private SelectionListNode<LlmMessages.DecisionChoice>? _decisionList;
private readonly Subject<ILayoutNode?> _decisionListChanged = new();
/// <summary>
/// Creates the page-owned layout nodes and wires ViewModel observables to
/// node operations (and node events back to ViewModel methods).
/// </summary>
protected override void OnBound()
{
    // Create layout nodes owned by the page.
    _chatHistory = StreamingTextNode.Create()
        .WithPrefix(" ", Color.Gray);
    _promptInput = new TextInputNode()
        .WithPlaceholder("Enter your question...")
        .WithForeground(Color.Cyan);

    // Translate ViewModel chat-output messages into chat-history node calls.
    ViewModel.ChatOutput
        .Subscribe(message =>
        {
            switch (message)
            {
                case AppendText text:
                    if (text.IsNewLine)
                        _chatHistory.AppendLine(text.Text, text.Foreground, null, text.Decoration);
                    else
                        _chatHistory.Append(text.Text, text.Foreground, null, text.Decoration);
                    break;
                case AppendTrackedSegment tracked:
                    _chatHistory.AppendTracked(tracked.Id, tracked.Segment);
                    break;
                case RemoveTrackedSegment remove:
                    _chatHistory.Remove(remove.Id);
                    break;
                case ReplaceTrackedSegment replace:
                    _chatHistory.Replace(replace.Id, replace.NewSegment, replace.KeepTracked);
                    break;
                case ShowDecisionPoint decision:
                    ShowDecisionList(decision);
                    break;
                case HideDecisionPoint:
                    HideDecisionList();
                    break;
                default:
                    // Fail fast if a new IChatMessage variant is added without
                    // a corresponding handler here.
                    throw new ArgumentException($"Unknown message type: {message.GetType()}");
            }
        })
        .DisposeWith(Subscriptions);

    // History navigation pushes recalled text back into the input node.
    ViewModel.PromptTextChanged
        .Subscribe(text => _promptInput.Text = text)
        .DisposeWith(Subscriptions);

    // Subscribe to prompt input submission; clear the input afterwards.
    _promptInput.Submitted
        .Subscribe(text =>
        {
            ViewModel.HandleSubmit(text);
            _promptInput.Clear();
        })
        .DisposeWith(Subscriptions);

    // Handle keyboard input - Page routes to interactive layout nodes.
    ViewModel.Input.OfType<KeyPressed>()
        .Subscribe(HandleKeyPress)
        .DisposeWith(Subscriptions);

    // Emit welcome message.
    ViewModel.EmitWelcomeMessage();
}
/// <summary>
/// Routes keyboard input by priority: Ctrl+Q quit, then the decision list
/// (when visible), then Escape, then chat-history scrolling, then history
/// navigation / the prompt input. Order matters - earlier handlers consume
/// the key.
/// </summary>
private void HandleKeyPress(KeyPressed key)
{
    var keyInfo = key.KeyInfo;

    // Ctrl+Q always quits.
    if (keyInfo.Key == ConsoleKey.Q && keyInfo.Modifiers.HasFlag(ConsoleModifiers.Control))
    {
        ViewModel.RequestShutdown();
        return;
    }

    // When decision list is visible, route input to it.
    if (ViewModel.ShowDecisionList && _decisionList != null)
    {
        // Escape cancels the decision.
        if (keyInfo.Key == ConsoleKey.Escape)
        {
            ViewModel.HandleDecisionCancelled();
            return;
        }
        // Let the selection list handle the input.
        _decisionList.HandleInput(keyInfo);
        return;
    }

    // Escape: cancel an active generation, otherwise quit.
    if (keyInfo.Key == ConsoleKey.Escape)
    {
        if (ViewModel.IsGenerating)
        {
            ViewModel.CancelGeneration();
        }
        else
        {
            ViewModel.RequestShutdown();
        }
        return;
    }

    // Page Up/Down scroll chat history; returns true when it consumed the key.
    if (_chatHistory.HandleInput(keyInfo, viewportHeight: 10, viewportWidth: 80))
    {
        return;
    }

    // When not generating, handle history navigation and text entry.
    if (!ViewModel.IsGenerating)
    {
        if (keyInfo.Key == ConsoleKey.UpArrow)
        {
            ViewModel.NavigateHistoryUp();
            return;
        }
        if (keyInfo.Key == ConsoleKey.DownArrow)
        {
            ViewModel.NavigateHistoryDown();
            return;
        }
        // Let the text input handle other keys.
        _promptInput.HandleInput(keyInfo);
    }
}
/// <summary>
/// Builds and focuses a selection list for the given decision point and
/// signals the layout to show it.
/// </summary>
private void ShowDecisionList(ShowDecisionPoint decision)
{
    // Dispose any existing decision list.
    _decisionList?.Dispose();

    // Create rich content for each choice - inline style, no panel.
    _decisionList = new SelectionListNode<LlmMessages.DecisionChoice>(
        decision.Choices,
        choice => new SelectionItemContent()
            .AddLine(
                new StaticTextSegment(choice.Title, Color.White, decoration: TextDecoration.Bold),
                new StaticTextSegment(" ", Color.Default),
                new StaticTextSegment($"[{choice.Category}]", Color.BrightBlack))
            .AddLine(
                new StaticTextSegment(" " + choice.Description, Color.Gray)))
        .WithHighlightColors(Color.Black, Color.Cyan)
        .WithForeground(Color.White)
        .WithVisibleRows(8)
        .WithShowNumbers(true)
        .WithOtherOption("Something else...");

    // NOTE(review): each call adds these subscriptions to the page-level
    // Subscriptions and they are not removed when the list node is disposed,
    // so repeated decisions accumulate dead subscriptions until the page is
    // disposed - confirm whether per-list disposal is needed.

    // Subscribe to selection confirmation.
    _decisionList.SelectionConfirmed
        .Subscribe(selected =>
        {
            if (selected.Count > 0)
            {
                ViewModel.HandleDecisionSelection(selected[0].Title);
            }
        })
        .DisposeWith(Subscriptions);

    // Subscribe to "Something else..." custom input.
    _decisionList.OtherSelected
        .Subscribe(customPrompt =>
        {
            ViewModel.HandleCustomPrompt(customPrompt);
        })
        .DisposeWith(Subscriptions);

    // Subscribe to cancellation.
    _decisionList.Cancelled
        .Subscribe(_ => ViewModel.HandleDecisionCancelled())
        .DisposeWith(Subscriptions);

    // Focus the decision list so it receives routed input.
    _decisionList.OnFocused();

    // Signal layout change - just the list node itself, no panel wrapper.
    _decisionListChanged.OnNext(_decisionList);
}
/// <summary>
/// Disposes the current decision list (if any) and signals the layout to
/// collapse its slot.
/// </summary>
private void HideDecisionList()
{
    _decisionList?.Dispose();
    _decisionList = null;
    _decisionListChanged.OnNext(null);
}
/// <summary>
/// Builds the page layout: header, chat-history panel, optional decision
/// list, prompt input, a context-sensitive keybinding hint line, and a
/// status-message line.
/// </summary>
public override ILayoutNode BuildLayout()
{
    return Layouts.Vertical()
        // Header.
        .WithChild(
            new TextNode("🤖 Streaming Chat Demo - Simulated LLM with Akka.NET")
                .WithForeground(Color.Cyan)
                .Bold()
                .Height(1))
        .WithChild(new EmptyNode().Height(1))
        // Chat history panel - fills available space.
        .WithChild(
            new PanelNode()
                .WithTitle("Chat History")
                .WithTitleColor(Color.Yellow)
                .WithBorder(BorderStyle.Rounded)
                .WithBorderColor(Color.Gray)
                .WithContent(_chatHistory.Fill())
                .Fill())
        .WithChild(new EmptyNode().Height(1))
        // Decision list - appears between chat history and input when active;
        // a zero-height EmptyNode collapses the slot when no list is shown.
        .WithChild(
            _decisionListChanged
                .StartWith((ILayoutNode?)null)
                .Select(decisionList => decisionList == null
                    ? (ILayoutNode)new EmptyNode().Height(0)
                    : Layouts.Vertical()
                        .WithChild(new TextNode(" Choose an option:").WithForeground(Color.Yellow).Height(1))
                        .WithChild(decisionList)
                        .HeightAuto()) // Auto-size to content, don't compete with Fill() elements
                .AsLayout())
        // Input panel - always visible.
        .WithChild(
            new PanelNode()
                .WithTitle("Your Prompt")
                .WithTitleColor(Color.Cyan)
                .WithBorder(BorderStyle.Rounded)
                .WithBorderColor(Color.Cyan)
                .WithContent(_promptInput)
                .Height(3))
        // Status bar: keybinding hints depend on generation/decision state.
        .WithChild(
            Observable.CombineLatest(
                ViewModel.IsGeneratingChanged,
                ViewModel.ShowDecisionListChanged.StartWith(false),
                (isGenerating, showDecision) => showDecision
                    ? "[↑/↓] Navigate [Enter] Select [1-4] Quick Select [Esc] Skip"
                    : isGenerating
                        ? "[Esc] Cancel [PgUp/PgDn] Scroll [Ctrl+Q] Quit"
                        : "[Enter] Send [↑/↓] History [PgUp/PgDn] Scroll [Esc] Clear/Quit [Ctrl+Q] Quit")
                .Select(text => new TextNode(text).WithForeground(Color.BrightBlack).NoWrap())
                .AsLayout()
                .Height(1))
        // Status message line driven by the ViewModel.
        .WithChild(
            ViewModel.StatusMessageChanged
                .Select(msg => new TextNode(msg).WithForeground(Color.White))
                .AsLayout()
                .Height(1));
}
}using Akka.Hosting;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Termina.Demo.Streaming.Actors;
using Termina.Demo.Streaming.Pages;
using Termina.Hosting;
using Termina.Input;
// Top-level entry point: configures hosting, Akka.NET, and Termina, and in
// --test mode drives the UI with scripted input so CI can exercise the demo.

// Check for --test flag (used in CI/CD to run scripted test and exit)
var testMode = args.Contains("--test");
var builder = Host.CreateApplicationBuilder(args);

// Configure logging to only show warnings and errors (avoid cluttering TUI output)
builder.Logging.SetMinimumLevel(LogLevel.Warning);

// Set up input source based on mode
VirtualInputSource? scriptedInput = null;
if (testMode)
{
    scriptedInput = new VirtualInputSource();
    builder.Services.AddTerminaVirtualInput(scriptedInput);
}

// Register Akka.NET actor system
builder.Services.AddAkka("termina-streaming-demo", configurationBuilder =>
{
    configurationBuilder.WithActors((system, registry) =>
    {
        var llmActor = system.ActorOf(LlmSimulatorActor.Props(), "llm-simulator");
        registry.Register<LlmSimulatorActor>(llmActor);
    });
});

// Register Termina with reactive pages
builder.Services.AddTermina("/chat", termina =>
{
    termina.RegisterRoute<StreamingChatPage, StreamingChatViewModel>("/chat");
});

var host = builder.Build();

if (testMode && scriptedInput != null)
{
    // Queue up scripted input to test streaming functionality then quit.
    // Delays are generous fixed waits sized to the actor's simulated timings.
    _ = Task.Run(async () =>
    {
        await Task.Delay(500); // Wait for initial render
        // Type a prompt: "Hello"
        scriptedInput.EnqueueString("Hello");
        // Submit the prompt
        scriptedInput.EnqueueKey(ConsoleKey.Enter);
        // Wait for streaming to complete (actor produces tokens over ~3-5 seconds)
        await Task.Delay(6000);
        // Test decision point: "decision" prompt forces a decision list to appear
        scriptedInput.EnqueueString("decision");
        scriptedInput.EnqueueKey(ConsoleKey.Enter);
        // Wait for intro text streaming and decision list to appear
        await Task.Delay(4000);
        // Navigate down in the selection list
        scriptedInput.EnqueueKey(ConsoleKey.DownArrow);
        await Task.Delay(200);
        // Select the second option (Enter confirms)
        scriptedInput.EnqueueKey(ConsoleKey.Enter);
        // Wait for follow-up response to stream
        await Task.Delay(5000);
        // Test "Something else..." option - trigger another decision
        scriptedInput.EnqueueString("decision");
        scriptedInput.EnqueueKey(ConsoleKey.Enter);
        await Task.Delay(4000);
        // Navigate to "Something else..." (4th option)
        scriptedInput.EnqueueKey(ConsoleKey.D4); // Quick select option 4
        await Task.Delay(200);
        // Type custom prompt and submit
        scriptedInput.EnqueueString("custom question");
        scriptedInput.EnqueueKey(ConsoleKey.Enter);
        await Task.Delay(5000);
        // Quit with Ctrl+Q
        scriptedInput.EnqueueKey(ConsoleKey.Q, control: true);
        scriptedInput.Complete();
    });
}
await host.RunAsync();using System.Runtime.CompilerServices;
using System.Threading.Channels;
using Akka;
using Akka.Actor;
using Akka.Streams;
using Akka.Streams.Dsl;
namespace Termina.Demo.Streaming.Actors;
/// <summary>
/// Messages for the LLM simulator actor.
/// </summary>
/// <summary>
/// Messages for the LLM simulator actor.
/// </summary>
/// <remarks>
/// Token order in a stream: ThinkingToken* then either TextChunk* followed by
/// GenerationComplete, or TextChunk* followed by DecisionPointToken (no
/// completion token - the follow-up arrives as a new generation).
/// </remarks>
public static class LlmMessages
{
    /// <summary>
    /// Request to generate a response for a prompt.
    /// Returns an IAsyncEnumerable of StreamToken via a channel.
    /// </summary>
    public record GenerateRequest(string Prompt, string? DecisionContext = null);

    /// <summary>
    /// Response containing the async stream of tokens.
    /// The caller owns the Cancellation source and should dispose it.
    /// </summary>
    public record GenerateResponse(IAsyncEnumerable<StreamToken> TokenStream, CancellationTokenSource Cancellation);

    /// <summary>
    /// A token from the stream (either thinking or text).
    /// </summary>
    public abstract record StreamToken;

    /// <summary>
    /// A thinking token (intermediate progress).
    /// </summary>
    public record ThinkingToken(string Text) : StreamToken;

    /// <summary>
    /// A chunk of generated text.
    /// </summary>
    public record TextChunk(string Text) : StreamToken;

    /// <summary>
    /// A decision point where the user must choose from options.
    /// </summary>
    public record DecisionPointToken(string Question, IReadOnlyList<DecisionChoice> Choices) : StreamToken;

    /// <summary>
    /// A choice for a decision point with title and description.
    /// </summary>
    public record DecisionChoice(string Title, string Description, string Category);

    /// <summary>
    /// Signal that generation is complete.
    /// </summary>
    public record GenerationComplete : StreamToken;
}
/// <summary>
/// Actor that simulates LLM streaming behavior using Akka Streams.
/// Returns an IAsyncEnumerable via Channel for consumption by StreamingText components.
/// </summary>
public class LlmSimulatorActor : ReceiveActor
{
    // Runs the Akka Streams this actor builds; created from the actor's
    // context and disposed in PostStop.
    private readonly ActorMaterializer _materializer;

    // Shared randomness for response selection, chunk sizing, and thinking
    // phrases. NOTE(review): this instance is also used inside stream Select
    // stages (see CreateTokenSource), which run on materializer threads, not
    // the actor's message loop — potential concurrent access; confirm whether
    // that matters for this demo.
    private readonly Random _random = new();
    // Sample responses for simulation. One entry is picked at random for any
    // prompt that does not trigger a decision point; paragraphs are separated
    // by "\n\n".
    private static readonly string[] SampleResponses =
    [
        "Artificial intelligence (AI) is rapidly transforming how we interact with technology. " +
        "From virtual assistants to autonomous vehicles, AI systems are becoming increasingly " +
        "sophisticated and capable of handling complex tasks that were once thought to be " +
        "exclusively human domains.\n\nMachine learning, a subset of AI, enables systems to " +
        "learn and improve from experience without being explicitly programmed. Deep learning, " +
        "using neural networks with many layers, has achieved remarkable breakthroughs in " +
        "image recognition, natural language processing, and game playing.",
        "The history of computing is a fascinating journey from mechanical calculators to " +
        "quantum computers. Charles Babbage's Analytical Engine in the 1830s laid the " +
        "conceptual groundwork for modern computers.\n\nThe ENIAC, completed in 1945, was " +
        "one of the first general-purpose electronic computers. It weighed 30 tons and " +
        "consumed 150 kilowatts of power. Today, a smartphone in your pocket has millions " +
        "of times more computing power than that room-sized machine.",
        "Software development best practices have evolved significantly over the decades. " +
        "The waterfall model gave way to agile methodologies, emphasizing iterative " +
        "development and customer collaboration.\n\nTest-driven development (TDD) and " +
        "continuous integration have become standard practices in modern software teams. " +
        "DevOps culture bridges the gap between development and operations, enabling " +
        "faster and more reliable software delivery.",
        "Terminal user interfaces (TUIs) offer a unique blend of efficiency and nostalgia. " +
        "While graphical interfaces dominate consumer computing, TUIs remain popular among " +
        "developers and system administrators.\n\nModern TUI frameworks like Spectre.Console " +
        "bring rich formatting, animations, and interactive components to the command line. " +
        "These tools prove that text-based interfaces can be both powerful and beautiful.",
        "The actor model, pioneered by Carl Hewitt in 1973, provides a robust foundation " +
        "for building concurrent and distributed systems. Each actor is an independent " +
        "unit of computation that processes messages sequentially.\n\nAkka.NET brings " +
        "the actor model to the .NET ecosystem, enabling developers to build highly " +
        "scalable and fault-tolerant applications. Actors can supervise child actors, " +
        "creating hierarchies that handle failures gracefully."
    ];
    // "Thinking" status lines emitted (one per ThinkingToken) before the
    // actual response text starts streaming.
    private static readonly string[] ThinkingPhrases =
    [
        "Analyzing the question...",
        "Considering multiple approaches...",
        "Searching knowledge base...",
        "Formulating response structure...",
        "Evaluating relevant context...",
        "Processing semantic meaning...",
        "Generating coherent narrative...",
        "Refining output quality...",
        "Checking factual accuracy...",
        "Optimizing response clarity..."
    ];
    // Decision point scenarios with follow-up responses. Each scenario streams
    // its IntroText, then emits a DecisionPointToken with its Choices; the
    // FollowUps dictionary maps a choice Title to the response streamed once
    // the user picks that choice.
    private static readonly DecisionScenario[] DecisionScenarios =
    [
        new DecisionScenario(
            IntroText: "I'd be happy to explain different approaches to building concurrent systems. " +
                "There are several paradigms to choose from, each with unique strengths.",
            Question: "Which concurrency model would you like to explore?",
            Choices:
            [
                new LlmMessages.DecisionChoice("Actor Model", "Message-passing between isolated actors", "Distributed"),
                new LlmMessages.DecisionChoice("Task Parallel Library", "Task-based async/await patterns", "Threading"),
                new LlmMessages.DecisionChoice("Reactive Extensions", "Observable streams and LINQ operators", "Reactive"),
            ],
            FollowUps: new Dictionary<string, string>
            {
                ["Actor Model"] = "The Actor Model is a powerful paradigm where computation is performed by " +
                    "independent actors that communicate exclusively through messages. Each actor has its own " +
                    "private state and processes messages sequentially, eliminating the need for locks.\n\n" +
                    "Akka.NET implements this model beautifully, providing location transparency, supervision " +
                    "hierarchies for fault tolerance, and clustering for distributed systems. Actors can " +
                    "supervise child actors, automatically restarting them when failures occur.",
                ["Task Parallel Library"] = "The Task Parallel Library (TPL) in .NET provides a robust foundation " +
                    "for parallel and asynchronous programming. Tasks represent units of work that can run " +
                    "concurrently, and async/await syntax makes asynchronous code read like synchronous code.\n\n" +
                    "TPL includes powerful constructs like Parallel.ForEach, Task.WhenAll, and dataflow blocks " +
                    "for building producer-consumer pipelines. It integrates seamlessly with the .NET runtime's " +
                    "thread pool for efficient resource utilization.",
                ["Reactive Extensions"] = "Reactive Extensions (Rx) treats events as streams of data that can be " +
                    "queried using LINQ-style operators. This declarative approach makes complex event processing " +
                    "surprisingly elegant and composable.\n\n" +
                    "With operators like Throttle, Buffer, Merge, and CombineLatest, you can express sophisticated " +
                    "event handling logic concisely. Rx is particularly powerful for UI programming, real-time " +
                    "data processing, and handling multiple asynchronous data sources."
            }),
        new DecisionScenario(
            IntroText: "Great question about software architecture! There are several popular patterns " +
                "for structuring applications, each suited to different scenarios.",
            Question: "Which architectural pattern interests you most?",
            Choices:
            [
                new LlmMessages.DecisionChoice("Microservices", "Independent deployable services", "Distributed"),
                new LlmMessages.DecisionChoice("Clean Architecture", "Dependency inversion layers", "Monolithic"),
                new LlmMessages.DecisionChoice("Event Sourcing", "Append-only event logs", "Data"),
            ],
            FollowUps: new Dictionary<string, string>
            {
                ["Microservices"] = "Microservices architecture decomposes applications into small, independent " +
                    "services that communicate via APIs. Each service owns its data and can be deployed, " +
                    "scaled, and updated independently.\n\n" +
                    "This approach enables teams to work autonomously, choose appropriate technologies per " +
                    "service, and scale specific components based on demand. However, it introduces complexity " +
                    "in service discovery, distributed transactions, and observability.",
                ["Clean Architecture"] = "Clean Architecture, popularized by Robert C. Martin, organizes code in " +
                    "concentric layers with dependencies pointing inward. The core business logic remains " +
                    "independent of frameworks, databases, and UI concerns.\n\n" +
                    "This separation makes the codebase highly testable and adaptable to change. You can swap " +
                    "out databases, web frameworks, or UI technologies without touching the business rules. " +
                    "The trade-off is additional abstraction layers and boilerplate.",
                ["Event Sourcing"] = "Event Sourcing stores the state of an application as a sequence of events " +
                    "rather than current state snapshots. Every change is captured as an immutable event, " +
                    "providing a complete audit trail and enabling temporal queries.\n\n" +
                    "Combined with CQRS (Command Query Responsibility Segregation), event sourcing enables " +
                    "powerful patterns like event replay, debugging production issues by replaying events, " +
                    "and building multiple read models from the same event stream."
            }),
        new DecisionScenario(
            IntroText: "Terminal UI development has seen a renaissance lately! There are several " +
                "approaches to building rich console applications.",
            Question: "What aspect of TUI development would you like to learn about?",
            Choices:
            [
                new LlmMessages.DecisionChoice("Layout Systems", "Constraint-based positioning", "Structure"),
                new LlmMessages.DecisionChoice("Input Handling", "Keyboard and mouse events", "Interaction"),
                new LlmMessages.DecisionChoice("Rendering Pipeline", "Efficient screen updates", "Performance"),
            ],
            FollowUps: new Dictionary<string, string>
            {
                ["Layout Systems"] = "Modern TUI frameworks use constraint-based layout systems similar to CSS " +
                    "Flexbox or native mobile layouts. Elements specify size constraints (fixed, fill, auto, " +
                    "percentage) and the layout engine calculates final positions.\n\n" +
                    "This declarative approach handles terminal resizing gracefully and supports nested layouts " +
                    "like vertical/horizontal stacks, grids, and overlapping layers for modals. The key is " +
                    "separating layout logic from rendering for cleaner, more maintainable code.",
                ["Input Handling"] = "Console input handling goes beyond simple ReadLine calls. Modern TUI apps " +
                    "need to handle arrow keys, function keys, mouse events, and key combinations while " +
                    "maintaining responsive UI updates.\n\n" +
                    "Focus management determines which component receives input. A focus stack enables modal " +
                    "dialogs to capture input temporarily, then restore focus when dismissed. Input routing " +
                    "typically flows from focused component up through parent containers.",
                ["Rendering Pipeline"] = "Efficient TUI rendering uses a diff-based approach similar to React's " +
                    "virtual DOM. Instead of clearing and redrawing the entire screen, the renderer compares " +
                    "the new frame against the previous one and only updates changed cells.\n\n" +
                    "This minimizes flickering and terminal escape sequence overhead. Double-buffering with " +
                    "a frame buffer allows compositing complex layouts before flushing to the terminal in a " +
                    "single write operation."
            })
    ];
    // Shape of one decision-point scenario: IntroText streams first, then a
    // DecisionPointToken carrying Question/Choices; FollowUps maps a choice
    // Title to the response streamed after the user selects it.
    private record DecisionScenario(
        string IntroText,
        string Question,
        LlmMessages.DecisionChoice[] Choices,
        Dictionary<string, string> FollowUps);

    // Track pending decision for follow-up responses: set when a decision
    // point is emitted, consumed (and cleared) by the next GenerateRequest
    // that carries a DecisionContext.
    private DecisionScenario? _pendingDecision;
    public LlmSimulatorActor()
    {
        // Materializer is tied to this actor's context, so streams it runs
        // are associated with the actor; it is disposed in PostStop.
        _materializer = Context.Materializer();
        // Single message handler: every GenerateRequest produces a streamed response.
        Receive<LlmMessages.GenerateRequest>(HandleGenerateRequest);
    }
private void HandleGenerateRequest(LlmMessages.GenerateRequest request)
{
var cts = new CancellationTokenSource();
var channel = Channel.CreateUnbounded<LlmMessages.StreamToken>();
// Create Akka Stream source that generates tokens
var source = CreateTokenSource(request.Prompt, request.DecisionContext, cts.Token);
// Run the stream, writing to the channel
source
.RunForeach(token => channel.Writer.TryWrite(token), _materializer)
.ContinueWith(_ =>
{
channel.Writer.Complete();
}, TaskContinuationOptions.ExecuteSynchronously);
// Return the async enumerable wrapping the channel
var asyncEnumerable = ReadFromChannelAsync(channel.Reader, cts.Token);
Sender.Tell(new LlmMessages.GenerateResponse(asyncEnumerable, cts));
}
private Source<LlmMessages.StreamToken, NotUsed> CreateTokenSource(string prompt, string? decisionContext, CancellationToken ct)
{
// Generate 5-10 thinking tokens over 1.25-2.5 seconds (250ms each)
var thinkingCount = _random.Next(5, 11);
// Create thinking tokens
var thinkingSource = Source.From(Enumerable.Range(0, thinkingCount))
.Select(_ => ThinkingPhrases[_random.Next(ThinkingPhrases.Length)])
.Select(text => (LlmMessages.StreamToken)new LlmMessages.ThinkingToken(text))
.Throttle(1, TimeSpan.FromMilliseconds(250), 1, ThrottleMode.Shaping);
// If we have a decision context, use the follow-up response
if (decisionContext != null && _pendingDecision != null)
{
var followUp = _pendingDecision.FollowUps.GetValueOrDefault(decisionContext)
?? $"You selected '{decisionContext}'. That's an interesting choice!";
_pendingDecision = null;
return CreateTextStreamSource(thinkingSource, followUp);
}
// Check for test trigger: "decision" prompt forces a decision point for headless testing
var useDecision = prompt.Equals("decision", StringComparison.OrdinalIgnoreCase)
|| _random.Next(100) < 40;
if (useDecision)
{
_pendingDecision = DecisionScenarios[_random.Next(DecisionScenarios.Length)];
return CreateDecisionSource(thinkingSource, _pendingDecision);
}
else
{
var response = SampleResponses[_random.Next(SampleResponses.Length)];
_pendingDecision = null;
return CreateTextStreamSource(thinkingSource, response);
}
}
private Source<LlmMessages.StreamToken, NotUsed> CreateTextStreamSource(
Source<LlmMessages.StreamToken, NotUsed> thinkingSource,
string response)
{
// Create text chunks source
var chunkSize = _random.Next(1, 4);
var chunks = new List<string>();
for (var i = 0; i < response.Length; i += chunkSize)
{
chunks.Add(response.Substring(i, Math.Min(chunkSize, response.Length - i)));
}
var textSource = Source.From(chunks)
.Select(chunk => (LlmMessages.StreamToken)new LlmMessages.TextChunk(chunk))
.Throttle(10, TimeSpan.FromMilliseconds(50), 10, ThrottleMode.Shaping);
var completionSource = Source.Single((LlmMessages.StreamToken)new LlmMessages.GenerationComplete());
return thinkingSource
.Concat(textSource)
.Concat(completionSource);
}
private Source<LlmMessages.StreamToken, NotUsed> CreateDecisionSource(
Source<LlmMessages.StreamToken, NotUsed> thinkingSource,
DecisionScenario scenario)
{
// Stream the intro text first
var chunkSize = _random.Next(1, 4);
var chunks = new List<string>();
for (var i = 0; i < scenario.IntroText.Length; i += chunkSize)
{
chunks.Add(scenario.IntroText.Substring(i, Math.Min(chunkSize, scenario.IntroText.Length - i)));
}
var introSource = Source.From(chunks)
.Select(chunk => (LlmMessages.StreamToken)new LlmMessages.TextChunk(chunk))
.Throttle(10, TimeSpan.FromMilliseconds(50), 10, ThrottleMode.Shaping);
// Then emit the decision point
var decisionSource = Source.Single(
(LlmMessages.StreamToken)new LlmMessages.DecisionPointToken(scenario.Question, scenario.Choices));
// No completion - wait for user decision
return thinkingSource
.Concat(introSource)
.Concat(decisionSource);
}
private static async IAsyncEnumerable<LlmMessages.StreamToken> ReadFromChannelAsync(
ChannelReader<LlmMessages.StreamToken> reader,
[EnumeratorCancellation] CancellationToken ct = default)
{
await foreach (var token in reader.ReadAllAsync(ct).ConfigureAwait(false))
{
yield return token;
}
}
    protected override void PostStop()
    {
        // Dispose the materializer so any token streams still running are
        // torn down when the actor stops.
        _materializer.Dispose();
        base.PostStop();
    }
    /// <summary>Props factory for spawning <see cref="LlmSimulatorActor"/> instances.</summary>
    public static Props Props() => Akka.Actor.Props.Create<LlmSimulatorActor>();
}

Next Steps

- Learn about testing with VirtualInputSource
- Explore custom components
- See Akka.NET integration patterns