# Akka.NET Integration
Termina integrates seamlessly with Akka.NET for streaming data, background processing, and distributed systems.
## Overview
Akka.NET provides:
- **Actor Model** - Concurrent, isolated state management (see the sketch below)
- **Akka.Streams** - Backpressured streaming
- **Akka.Hosting** - Dependency injection (DI) integration
- **Clustering** - Distributed applications
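
A minimal sketch of the actor model itself (illustrative only; `CounterActor` is not part of Termina or the demos): an actor owns private state and mutates it one message at a time, so no locks are needed.

```csharp
using Akka.Actor;

public record Increment;
public record GetCount;

public class CounterActor : ReceiveActor
{
    private int _count; // private state, never touched by two threads at once

    public CounterActor()
    {
        Receive<Increment>(_ => _count++);
        Receive<GetCount>(_ => Sender.Tell(_count));
    }
}

// Usage:
//   var counter = system.ActorOf(Props.Create<CounterActor>(), "counter");
//   counter.Tell(new Increment());
//   var count = await counter.Ask<int>(new GetCount());
```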
## Setup
```bash
dotnet add package Akka.Hosting
```

```csharp
builder.Services.AddAkka("my-app", configurationBuilder =>
{
configurationBuilder.WithActors((system, registry) =>
{
var myActor = system.ActorOf(MyActor.Props(), "my-actor");
registry.Register<MyActor>(myActor);
});
});
builder.Services.AddTermina("/", termina =>
{
termina.RegisterRoute<MainPage, MainViewModel>("/");
});
```

## Injecting Actors
Use `IRequiredActor<T>` in ViewModels:
```csharp
public class MyViewModel : ReactiveViewModel
{
private readonly IRequiredActor<DataActor> _dataActorProvider;
private IActorRef? _dataActor;
public MyViewModel(IRequiredActor<DataActor> dataActorProvider)
{
_dataActorProvider = dataActorProvider;
}
public override void OnActivated()
{
_ = InitializeAsync();
}
private async Task InitializeAsync()
{
_dataActor = await _dataActorProvider.GetAsync();
// Now use _dataActor
}
}
```

## Streaming Pattern
The streaming chat demo shows the recommended pattern:
### 1. Define Messages
```csharp
public static class StreamMessages
{
public record StartStream(string Query);
public record StreamResponse(
IAsyncEnumerable<IStreamToken> Tokens,
CancellationTokenSource Cancellation);
public interface IStreamToken { }
public record TextChunk(string Text) : IStreamToken;
public record StreamComplete : IStreamToken;
}
```

### 2. Create Streaming Actor
```csharp
public class StreamingActor : ReceiveActor
{
public StreamingActor()
{
Receive<StartStream>(HandleStartStream);
}
private void HandleStartStream(StartStream request)
{
    var cts = new CancellationTokenSource();
    // The iterator is lazily evaluated: tokens are produced as the
    // consumer enumerates, not on the actor's dispatcher.
    var stream = GenerateTokens(request.Query, cts.Token);
    Sender.Tell(new StreamResponse(stream, cts));
}
private async IAsyncEnumerable<IStreamToken> GenerateTokens(
string query,
[EnumeratorCancellation] CancellationToken ct) // needs using System.Runtime.CompilerServices;
{
// Simulate processing
foreach (var word in GetResponse(query).Split(' '))
{
ct.ThrowIfCancellationRequested();
yield return new TextChunk(word + " ");
await Task.Delay(50, ct);
}
yield return new StreamComplete();
}
private string GetResponse(string query) => "This is a simulated response.";
public static Props Props() => Akka.Actor.Props.Create<StreamingActor>();
}
```

### 3. Consume in ViewModel
```csharp
public partial class StreamViewModel : ReactiveViewModel
{
public StreamingTextNode Output { get; } = StreamingTextNode.Create();
[Reactive] private bool _isStreaming;
private CancellationTokenSource? _streamCts;
private IActorRef? _actor; // resolved via IRequiredActor<StreamingActor>, as shown earlier
private async Task StartStreaming(string query)
{
IsStreaming = true;
try
{
var response = await _actor!.Ask<StreamResponse>(
new StartStream(query),
TimeSpan.FromSeconds(30));
_streamCts = response.Cancellation;
await foreach (var token in response.Tokens.WithCancellation(_streamCts.Token))
{
switch (token)
{
case TextChunk chunk:
Output.Append(chunk.Text);
break;
case StreamComplete:
Output.AppendLine("\n[Complete]");
break;
}
}
}
catch (OperationCanceledException)
{
Output.AppendLine("\n[Cancelled]");
}
finally
{
IsStreaming = false;
_streamCts?.Dispose();
_streamCts = null;
}
}
private void CancelStream()
{
_streamCts?.Cancel();
}
}
```

### 4. Wire Up Redraw
```csharp
public override void OnActivated()
{
Output.ContentChanged
.Subscribe(_ => RequestRedraw())
.DisposeWith(Subscriptions);
}
```
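
If tokens arrive faster than the terminal can repaint, redraws can be coalesced. A sketch using Rx's `Sample` operator (assuming `Output.ContentChanged` is an ordinary `IObservable<T>`, as the subscription above implies; the 33 ms interval is arbitrary):

```csharp
public override void OnActivated()
{
    // Emit at most one redraw request per ~33 ms (about 30 fps),
    // regardless of how many ContentChanged notifications arrive.
    Output.ContentChanged
        .Sample(TimeSpan.FromMilliseconds(33))
        .Subscribe(_ => RequestRedraw())
        .DisposeWith(Subscriptions);
}
```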
## Background Processing

For long-running operations:
```csharp
public class ProcessorActor : ReceiveActor
{
    public ProcessorActor()
    {
        Receive<ProcessRequest>(req =>
        {
            // Capture the original sender now: when the scheduled DoWork
            // message is processed later, Sender will be Self, not the
            // original requester.
            var replyTo = Sender;

            // Process in background
            Context.System.Scheduler.ScheduleTellOnce(
                TimeSpan.Zero,
                Self,
                new DoWork(req, replyTo),
                Self);

            replyTo.Tell(new Acknowledged());
        });

        ReceiveAsync<DoWork>(async work =>
        {
            var result = await DoExpensiveWork(work);
            // Notify completion via pub-sub or a direct tell to the
            // captured requester (the WorkCompleted wrapper is illustrative)
            work.ReplyTo.Tell(new WorkCompleted(result));
        });
    }
}
```
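
As an alternative sketch, Akka.NET's built-in `PipeTo` runs a `Task` on the thread pool and delivers its result back to an `IActorRef` as a message, avoiding the manual scheduler round-trip (here `WorkCompleted` is again illustrative, and we assume `DoExpensiveWork` can take the request directly):

```csharp
Receive<ProcessRequest>(req =>
{
    var replyTo = Sender; // capture before the task completes asynchronously
    DoExpensiveWork(req)
        .PipeTo(replyTo, success: result => new WorkCompleted(result));
});
```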
## Pub-Sub for Updates

Broadcast updates to ViewModels:
```csharp
// In an actor: broadcast on the system-wide event stream
Context.System.EventStream.Publish(new DataUpdated(newData));

// EventStream subscribers must be IActorRefs, so a ViewModel (which has
// no actor Context or Self) cannot subscribe itself directly. Instead,
// subscribe a small forwarder actor that relays events to a callback:
public class DataUpdateForwarder : ReceiveActor // illustrative forwarder
{
    public DataUpdateForwarder(Action<DataUpdated> onUpdate)
    {
        Context.System.EventStream.Subscribe(Self, typeof(DataUpdated));
        Receive<DataUpdated>(onUpdate);
    }
}
```

## Error Handling
Handle actor failures gracefully:
```csharp
private async Task SafeActorCall()
{
try
{
var result = await _actor.Ask<Response>(
new Request(),
TimeSpan.FromSeconds(5));
// Handle result
}
catch (AskTimeoutException)
{
StatusMessage = "Request timed out";
}
catch (Exception ex)
{
StatusMessage = $"Error: {ex.Message}";
}
}
```
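
For transient failures, the `Ask` can also be wrapped in a simple retry loop. A sketch reusing the `Request`/`Response` messages above (the attempt count and backoff are arbitrary):

```csharp
private async Task<Response?> AskWithRetryAsync(int maxAttempts = 3)
{
    for (var attempt = 1; attempt <= maxAttempts; attempt++)
    {
        try
        {
            return await _actor.Ask<Response>(
                new Request(),
                TimeSpan.FromSeconds(5));
        }
        catch (AskTimeoutException) when (attempt < maxAttempts)
        {
            // Back off briefly before the next attempt
            await Task.Delay(TimeSpan.FromMilliseconds(200 * attempt));
        }
    }
    return null; // all attempts timed out
}
```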
## Complete Example

See the streaming chat demo for a full implementation. First, the ViewModel:
```cs
// Copyright (c) Petabridge, LLC. All rights reserved.
// Licensed under the Apache 2.0 license. See LICENSE file in the project root for full license information.
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using Akka.Actor;
using Akka.Hosting;
using Termina.Components.Streaming;
using Termina.Demo.Streaming.Actors;
using Termina.Layout;
using Termina.Reactive;
using Termina.Terminal;
namespace Termina.Demo.Streaming.Pages;
/// <summary>
/// Base interface for chat messages.
/// </summary>
public interface IChatMessage;
/// <summary>
/// Append regular text to the chat history.
/// </summary>
public sealed record AppendText(
string Text,
Color? Foreground = null,
TextDecoration Decoration = TextDecoration.None,
bool IsNewLine = false) : IChatMessage;
/// <summary>
/// Append a tracked text segment that can be manipulated later.
/// </summary>
public sealed record AppendTrackedSegment(SegmentId Id, ITextSegment Segment) : IChatMessage;
/// <summary>
/// Remove a tracked segment by ID.
/// </summary>
public sealed record RemoveTrackedSegment(SegmentId Id) : IChatMessage;
/// <summary>
/// Replace a tracked segment with new content.
/// </summary>
public sealed record ReplaceTrackedSegment(SegmentId Id, ITextSegment NewSegment, bool KeepTracked = false) : IChatMessage;
/// <summary>
/// Show a decision point with choices for the user.
/// </summary>
public sealed record ShowDecisionPoint(string Question, IReadOnlyList<LlmMessages.DecisionChoice> Choices) : IChatMessage;
/// <summary>
/// Hide the decision list (after selection or cancellation).
/// </summary>
public sealed record HideDecisionPoint : IChatMessage;
/// <summary>
/// ViewModel for the streaming chat demo.
/// Contains only state and business logic - no UI concerns.
/// Exposes observables for chat content that the Page subscribes to.
/// </summary>
public partial class StreamingChatViewModel : ReactiveViewModel
{
// Predefined segment IDs for tracked elements
private static readonly SegmentId ThinkingSpinnerId = new(1);
private static readonly SegmentId ThinkingBlockId = new(2);
private readonly IRequiredActor<LlmSimulatorActor> _llmActorProvider;
private readonly List<string> _promptHistory = new();
private int _historyIndex = -1;
private IActorRef? _llmActor;
private CancellationTokenSource? _generationCts;
private SpinnerSegment? _currentSpinner;
private bool _thinkingBlockShown;
// Subjects for chat output - Page subscribes to these
private readonly Subject<IChatMessage> _chatOutput = new();
private readonly Subject<string> _promptTextChanged = new();
/// <summary>
/// Observable for chat messages.
/// Includes text appends and tracked segment operations.
/// </summary>
public IObservable<IChatMessage> ChatOutput => _chatOutput.AsObservable();
/// <summary>
/// Observable for prompt text changes (for history navigation).
/// </summary>
public IObservable<string> PromptTextChanged => _promptTextChanged.AsObservable();
// Reactive properties for UI state
[Reactive] private bool _isGenerating = false;
[Reactive] private bool _hasReceivedText = false; // Tracks if any text has arrived yet
[Reactive] private string _statusMessage = "Ready. Enter a question to begin.";
[Reactive] private bool _showDecisionList = false; // Whether to show the decision list
// Track current decision context for follow-up
private string? _pendingDecisionContext;
public StreamingChatViewModel(IRequiredActor<LlmSimulatorActor> llmActorProvider)
{
_llmActorProvider = llmActorProvider;
}
/// <summary>
/// Emits the initial welcome message.
/// </summary>
public void EmitWelcomeMessage()
{
_chatOutput.OnNext(new AppendText("🤖 ", Color.Yellow));
_chatOutput.OnNext(new AppendText("Assistant: ", Color.Green, TextDecoration.Bold));
_chatOutput.OnNext(new AppendText("Hello! I'm a simulated LLM demo.", Color.White, IsNewLine: true));
_chatOutput.OnNext(new AppendText(" Ask me anything and watch the streaming response!", Color.BrightBlack, IsNewLine: true));
_chatOutput.OnNext(new AppendText("", IsNewLine: true));
}
public override void OnActivated()
{
_ = InitializeAsync();
}
private async Task InitializeAsync()
{
_llmActor = await _llmActorProvider.GetAsync();
}
/// <summary>
/// Navigate up through prompt history.
/// </summary>
public void NavigateHistoryUp()
{
if (_promptHistory.Count == 0)
return;
if (_historyIndex < 0)
{
_historyIndex = _promptHistory.Count - 1;
}
else if (_historyIndex > 0)
{
_historyIndex--;
}
_promptTextChanged.OnNext(_promptHistory[_historyIndex]);
}
/// <summary>
/// Navigate down through prompt history.
/// </summary>
public void NavigateHistoryDown()
{
if (_historyIndex < 0)
return;
if (_historyIndex < _promptHistory.Count - 1)
{
_historyIndex++;
_promptTextChanged.OnNext(_promptHistory[_historyIndex]);
}
else
{
_historyIndex = -1;
_promptTextChanged.OnNext("");
}
}
/// <summary>
/// Cancel the current generation.
/// </summary>
public void CancelGeneration()
{
_generationCts?.Cancel();
_chatOutput.OnNext(new AppendText(" [cancelled]", Color.Yellow, TextDecoration.Italic, IsNewLine: true));
CleanupGeneration();
StatusMessage = "Generation cancelled.";
}
/// <summary>
/// Handle prompt submission.
/// </summary>
public void HandleSubmit(string prompt)
{
prompt = prompt.Trim();
if (string.IsNullOrEmpty(prompt))
return;
// Add to history and reset index
_promptHistory.Add(prompt);
_historyIndex = -1;
// Add user message to chat
_chatOutput.OnNext(new AppendText("👤 ", Color.Cyan));
_chatOutput.OnNext(new AppendText("You: ", Color.Cyan, TextDecoration.Bold));
_chatOutput.OnNext(new AppendText(prompt, Color.White, IsNewLine: true));
_chatOutput.OnNext(new AppendText("", IsNewLine: true));
StartGeneration(prompt, decisionContext: null);
}
/// <summary>
/// Handle decision selection from the SelectionListNode.
/// </summary>
public void HandleDecisionSelection(string choiceTitle)
{
// Hide the decision list
ShowDecisionList = false;
_chatOutput.OnNext(new HideDecisionPoint());
// Show the user's choice in the chat
_chatOutput.OnNext(new AppendText("", IsNewLine: true));
_chatOutput.OnNext(new AppendText(" → ", Color.BrightBlack));
_chatOutput.OnNext(new AppendText(choiceTitle, Color.Cyan, TextDecoration.Bold, IsNewLine: true));
_chatOutput.OnNext(new AppendText("", IsNewLine: true));
// Start follow-up generation with the decision context
StartGeneration("continue", decisionContext: choiceTitle);
}
/// <summary>
/// Handle decision cancellation.
/// </summary>
public void HandleDecisionCancelled()
{
ShowDecisionList = false;
_chatOutput.OnNext(new HideDecisionPoint());
_chatOutput.OnNext(new AppendText(" [decision skipped]", Color.Yellow, TextDecoration.Italic, IsNewLine: true));
CleanupGeneration();
StatusMessage = "Ready. Enter another question.";
}
/// <summary>
/// Handle custom prompt from "Something else..." option.
/// </summary>
public void HandleCustomPrompt(string customPrompt)
{
customPrompt = customPrompt.Trim();
if (string.IsNullOrEmpty(customPrompt))
{
HandleDecisionCancelled();
return;
}
// Hide the decision list
ShowDecisionList = false;
_chatOutput.OnNext(new HideDecisionPoint());
// Show the custom prompt as user input
_chatOutput.OnNext(new AppendText("", IsNewLine: true));
_chatOutput.OnNext(new AppendText("👤 ", Color.Cyan));
_chatOutput.OnNext(new AppendText("You: ", Color.Cyan, TextDecoration.Bold));
_chatOutput.OnNext(new AppendText(customPrompt, Color.White, IsNewLine: true));
_chatOutput.OnNext(new AppendText("", IsNewLine: true));
// Add to history
_promptHistory.Add(customPrompt);
_historyIndex = -1;
// Start generation with the custom prompt (not as decision context)
StartGeneration(customPrompt, decisionContext: null);
}
private void StartGeneration(string prompt, string? decisionContext)
{
// Add Assistant prefix and append animated spinner
_chatOutput.OnNext(new AppendText("🤖 ", Color.Yellow));
_chatOutput.OnNext(new AppendText("Assistant: ", Color.Green, TextDecoration.Bold));
// Append tracked spinner - will be replaced when first text arrives
_currentSpinner = new SpinnerSegment(Components.Streaming.SpinnerStyle.Dots, Color.Red);
_chatOutput.OnNext(new AppendTrackedSegment(ThinkingSpinnerId, _currentSpinner));
// Start generation
IsGenerating = true;
HasReceivedText = false;
_thinkingBlockShown = false;
StatusMessage = "Generating response...";
_ = ConsumeResponseStreamAsync(prompt, decisionContext);
}
private async Task ConsumeResponseStreamAsync(string prompt, string? decisionContext = null)
{
if (_llmActor is null)
{
StatusMessage = "Error: Actor not initialized";
IsGenerating = false;
return;
}
var completedNormally = false;
try
{
var response = await _llmActor.Ask<LlmMessages.GenerateResponse>(
new LlmMessages.GenerateRequest(prompt, decisionContext),
TimeSpan.FromSeconds(30));
_generationCts = response.Cancellation;
await foreach (var token in response.TokenStream.WithCancellation(_generationCts.Token))
{
switch (token)
{
case LlmMessages.ThinkingToken thinking:
// On first thinking token, replace spinner with thinking block
if (!_thinkingBlockShown)
{
_chatOutput.OnNext(new ReplaceTrackedSegment(
ThinkingSpinnerId,
new StaticTextSegment("", TextStyle.Default),
KeepTracked: false));
_currentSpinner?.Dispose();
_currentSpinner = null;
// Add thinking block that will update in place
var thinkingSegment = new StaticTextSegment(
$"💭 {thinking.Text}",
new TextStyle { Foreground = Color.BrightBlack, Decoration = TextDecoration.Italic }).AsBlock();
_chatOutput.OnNext(new AppendTrackedSegment(ThinkingBlockId, thinkingSegment));
_thinkingBlockShown = true;
}
else
{
// Update existing thinking block
var thinkingSegment = new StaticTextSegment(
$"💭 {thinking.Text}",
new TextStyle { Foreground = Color.BrightBlack, Decoration = TextDecoration.Italic }).AsBlock();
_chatOutput.OnNext(new ReplaceTrackedSegment(ThinkingBlockId, thinkingSegment, KeepTracked: true));
}
break;
case LlmMessages.TextChunk chunk:
// On first text chunk, remove thinking block
if (!HasReceivedText)
{
_chatOutput.OnNext(new RemoveTrackedSegment(ThinkingBlockId));
HasReceivedText = true;
}
_chatOutput.OnNext(new AppendText(chunk.Text));
break;
case LlmMessages.GenerationComplete:
CleanupGeneration();
StatusMessage = "Ready. Enter another question.";
completedNormally = true;
break;
case LlmMessages.DecisionPointToken decision:
// On first content (decision point), remove thinking block
if (!HasReceivedText)
{
_chatOutput.OnNext(new RemoveTrackedSegment(ThinkingBlockId));
HasReceivedText = true;
}
// Show the decision list
_chatOutput.OnNext(new ShowDecisionPoint(decision.Question, decision.Choices));
ShowDecisionList = true;
StatusMessage = "Make a selection below...";
// We don't mark this as complete - we wait for user to make a decision
return;
}
}
}
catch (OperationCanceledException)
{
// Normal cancellation
}
catch (Exception ex)
{
_chatOutput.OnNext(new AppendText(" [error: ", Color.Red));
_chatOutput.OnNext(new AppendText(ex.Message, Color.Red, TextDecoration.Bold));
_chatOutput.OnNext(new AppendText("]", Color.Red, IsNewLine: true));
CleanupGeneration();
StatusMessage = $"Error: {ex.Message}";
}
finally
{
if (!completedNormally && IsGenerating)
{
CleanupGeneration();
StatusMessage = "Ready. Enter another question.";
}
}
}
private void CleanupGeneration()
{
_chatOutput.OnNext(new AppendText("", IsNewLine: true));
_chatOutput.OnNext(new AppendText("", IsNewLine: true));
IsGenerating = false;
_generationCts?.Dispose();
_generationCts = null;
}
public override void Dispose()
{
_currentSpinner?.Dispose();
_chatOutput.Dispose();
_promptTextChanged.Dispose();
DisposeReactiveFields();
base.Dispose();
}
}
```

Next, the actor that simulates LLM streaming using Akka.Streams:

```cs
using System.Runtime.CompilerServices;
using System.Threading.Channels;
using Akka;
using Akka.Actor;
using Akka.Streams;
using Akka.Streams.Dsl;
namespace Termina.Demo.Streaming.Actors;
/// <summary>
/// Messages for the LLM simulator actor.
/// </summary>
public static class LlmMessages
{
/// <summary>
/// Request to generate a response for a prompt.
/// Returns an IAsyncEnumerable of StreamToken via a channel.
/// </summary>
public record GenerateRequest(string Prompt, string? DecisionContext = null);
/// <summary>
/// Response containing the async stream of tokens.
/// </summary>
public record GenerateResponse(IAsyncEnumerable<StreamToken> TokenStream, CancellationTokenSource Cancellation);
/// <summary>
/// A token from the stream (either thinking or text).
/// </summary>
public abstract record StreamToken;
/// <summary>
/// A thinking token (intermediate progress).
/// </summary>
public record ThinkingToken(string Text) : StreamToken;
/// <summary>
/// A chunk of generated text.
/// </summary>
public record TextChunk(string Text) : StreamToken;
/// <summary>
/// A decision point where the user must choose from options.
/// </summary>
public record DecisionPointToken(string Question, IReadOnlyList<DecisionChoice> Choices) : StreamToken;
/// <summary>
/// A choice for a decision point with title and description.
/// </summary>
public record DecisionChoice(string Title, string Description, string Category);
/// <summary>
/// Signal that generation is complete.
/// </summary>
public record GenerationComplete : StreamToken;
}
/// <summary>
/// Actor that simulates LLM streaming behavior using Akka Streams.
/// Returns an IAsyncEnumerable via Channel for consumption by StreamingText components.
/// </summary>
public class LlmSimulatorActor : ReceiveActor
{
private readonly ActorMaterializer _materializer;
private readonly Random _random = new();
// Sample responses for simulation
private static readonly string[] SampleResponses =
[
"Artificial intelligence (AI) is rapidly transforming how we interact with technology. " +
"From virtual assistants to autonomous vehicles, AI systems are becoming increasingly " +
"sophisticated and capable of handling complex tasks that were once thought to be " +
"exclusively human domains.\n\nMachine learning, a subset of AI, enables systems to " +
"learn and improve from experience without being explicitly programmed. Deep learning, " +
"using neural networks with many layers, has achieved remarkable breakthroughs in " +
"image recognition, natural language processing, and game playing.",
"The history of computing is a fascinating journey from mechanical calculators to " +
"quantum computers. Charles Babbage's Analytical Engine in the 1830s laid the " +
"conceptual groundwork for modern computers.\n\nThe ENIAC, completed in 1945, was " +
"one of the first general-purpose electronic computers. It weighed 30 tons and " +
"consumed 150 kilowatts of power. Today, a smartphone in your pocket has millions " +
"of times more computing power than that room-sized machine.",
"Software development best practices have evolved significantly over the decades. " +
"The waterfall model gave way to agile methodologies, emphasizing iterative " +
"development and customer collaboration.\n\nTest-driven development (TDD) and " +
"continuous integration have become standard practices in modern software teams. " +
"DevOps culture bridges the gap between development and operations, enabling " +
"faster and more reliable software delivery.",
"Terminal user interfaces (TUIs) offer a unique blend of efficiency and nostalgia. " +
"While graphical interfaces dominate consumer computing, TUIs remain popular among " +
"developers and system administrators.\n\nModern TUI frameworks like Spectre.Console " +
"bring rich formatting, animations, and interactive components to the command line. " +
"These tools prove that text-based interfaces can be both powerful and beautiful.",
"The actor model, pioneered by Carl Hewitt in 1973, provides a robust foundation " +
"for building concurrent and distributed systems. Each actor is an independent " +
"unit of computation that processes messages sequentially.\n\nAkka.NET brings " +
"the actor model to the .NET ecosystem, enabling developers to build highly " +
"scalable and fault-tolerant applications. Actors can supervise child actors, " +
"creating hierarchies that handle failures gracefully."
];
private static readonly string[] ThinkingPhrases =
[
"Analyzing the question...",
"Considering multiple approaches...",
"Searching knowledge base...",
"Formulating response structure...",
"Evaluating relevant context...",
"Processing semantic meaning...",
"Generating coherent narrative...",
"Refining output quality...",
"Checking factual accuracy...",
"Optimizing response clarity..."
];
// Decision point scenarios with follow-up responses
private static readonly DecisionScenario[] DecisionScenarios =
[
new DecisionScenario(
IntroText: "I'd be happy to explain different approaches to building concurrent systems. " +
"There are several paradigms to choose from, each with unique strengths.",
Question: "Which concurrency model would you like to explore?",
Choices:
[
new LlmMessages.DecisionChoice("Actor Model", "Message-passing between isolated actors", "Distributed"),
new LlmMessages.DecisionChoice("Task Parallel Library", "Task-based async/await patterns", "Threading"),
new LlmMessages.DecisionChoice("Reactive Extensions", "Observable streams and LINQ operators", "Reactive"),
],
FollowUps: new Dictionary<string, string>
{
["Actor Model"] = "The Actor Model is a powerful paradigm where computation is performed by " +
"independent actors that communicate exclusively through messages. Each actor has its own " +
"private state and processes messages sequentially, eliminating the need for locks.\n\n" +
"Akka.NET implements this model beautifully, providing location transparency, supervision " +
"hierarchies for fault tolerance, and clustering for distributed systems. Actors can " +
"supervise child actors, automatically restarting them when failures occur.",
["Task Parallel Library"] = "The Task Parallel Library (TPL) in .NET provides a robust foundation " +
"for parallel and asynchronous programming. Tasks represent units of work that can run " +
"concurrently, and async/await syntax makes asynchronous code read like synchronous code.\n\n" +
"TPL includes powerful constructs like Parallel.ForEach, Task.WhenAll, and dataflow blocks " +
"for building producer-consumer pipelines. It integrates seamlessly with the .NET runtime's " +
"thread pool for efficient resource utilization.",
["Reactive Extensions"] = "Reactive Extensions (Rx) treats events as streams of data that can be " +
"queried using LINQ-style operators. This declarative approach makes complex event processing " +
"surprisingly elegant and composable.\n\n" +
"With operators like Throttle, Buffer, Merge, and CombineLatest, you can express sophisticated " +
"event handling logic concisely. Rx is particularly powerful for UI programming, real-time " +
"data processing, and handling multiple asynchronous data sources."
}),
new DecisionScenario(
IntroText: "Great question about software architecture! There are several popular patterns " +
"for structuring applications, each suited to different scenarios.",
Question: "Which architectural pattern interests you most?",
Choices:
[
new LlmMessages.DecisionChoice("Microservices", "Independent deployable services", "Distributed"),
new LlmMessages.DecisionChoice("Clean Architecture", "Dependency inversion layers", "Monolithic"),
new LlmMessages.DecisionChoice("Event Sourcing", "Append-only event logs", "Data"),
],
FollowUps: new Dictionary<string, string>
{
["Microservices"] = "Microservices architecture decomposes applications into small, independent " +
"services that communicate via APIs. Each service owns its data and can be deployed, " +
"scaled, and updated independently.\n\n" +
"This approach enables teams to work autonomously, choose appropriate technologies per " +
"service, and scale specific components based on demand. However, it introduces complexity " +
"in service discovery, distributed transactions, and observability.",
["Clean Architecture"] = "Clean Architecture, popularized by Robert C. Martin, organizes code in " +
"concentric layers with dependencies pointing inward. The core business logic remains " +
"independent of frameworks, databases, and UI concerns.\n\n" +
"This separation makes the codebase highly testable and adaptable to change. You can swap " +
"out databases, web frameworks, or UI technologies without touching the business rules. " +
"The trade-off is additional abstraction layers and boilerplate.",
["Event Sourcing"] = "Event Sourcing stores the state of an application as a sequence of events " +
"rather than current state snapshots. Every change is captured as an immutable event, " +
"providing a complete audit trail and enabling temporal queries.\n\n" +
"Combined with CQRS (Command Query Responsibility Segregation), event sourcing enables " +
"powerful patterns like event replay, debugging production issues by replaying events, " +
"and building multiple read models from the same event stream."
}),
new DecisionScenario(
IntroText: "Terminal UI development has seen a renaissance lately! There are several " +
"approaches to building rich console applications.",
Question: "What aspect of TUI development would you like to learn about?",
Choices:
[
new LlmMessages.DecisionChoice("Layout Systems", "Constraint-based positioning", "Structure"),
new LlmMessages.DecisionChoice("Input Handling", "Keyboard and mouse events", "Interaction"),
new LlmMessages.DecisionChoice("Rendering Pipeline", "Efficient screen updates", "Performance"),
],
FollowUps: new Dictionary<string, string>
{
["Layout Systems"] = "Modern TUI frameworks use constraint-based layout systems similar to CSS " +
"Flexbox or native mobile layouts. Elements specify size constraints (fixed, fill, auto, " +
"percentage) and the layout engine calculates final positions.\n\n" +
"This declarative approach handles terminal resizing gracefully and supports nested layouts " +
"like vertical/horizontal stacks, grids, and overlapping layers for modals. The key is " +
"separating layout logic from rendering for cleaner, more maintainable code.",
["Input Handling"] = "Console input handling goes beyond simple ReadLine calls. Modern TUI apps " +
"need to handle arrow keys, function keys, mouse events, and key combinations while " +
"maintaining responsive UI updates.\n\n" +
"Focus management determines which component receives input. A focus stack enables modal " +
"dialogs to capture input temporarily, then restore focus when dismissed. Input routing " +
"typically flows from focused component up through parent containers.",
["Rendering Pipeline"] = "Efficient TUI rendering uses a diff-based approach similar to React's " +
"virtual DOM. Instead of clearing and redrawing the entire screen, the renderer compares " +
"the new frame against the previous one and only updates changed cells.\n\n" +
"This minimizes flickering and terminal escape sequence overhead. Double-buffering with " +
"a frame buffer allows compositing complex layouts before flushing to the terminal in a " +
"single write operation."
})
];
private record DecisionScenario(
string IntroText,
string Question,
LlmMessages.DecisionChoice[] Choices,
Dictionary<string, string> FollowUps);
// Track pending decision for follow-up responses
private DecisionScenario? _pendingDecision;
public LlmSimulatorActor()
{
_materializer = Context.Materializer();
Receive<LlmMessages.GenerateRequest>(HandleGenerateRequest);
}
private void HandleGenerateRequest(LlmMessages.GenerateRequest request)
{
var cts = new CancellationTokenSource();
var channel = Channel.CreateUnbounded<LlmMessages.StreamToken>();
// Create Akka Stream source that generates tokens
var source = CreateTokenSource(request.Prompt, request.DecisionContext, cts.Token);
// Run the stream, writing to the channel
source
.RunForeach(token => channel.Writer.TryWrite(token), _materializer)
.ContinueWith(_ =>
{
channel.Writer.Complete();
}, TaskContinuationOptions.ExecuteSynchronously);
// Return the async enumerable wrapping the channel
var asyncEnumerable = ReadFromChannelAsync(channel.Reader, cts.Token);
Sender.Tell(new LlmMessages.GenerateResponse(asyncEnumerable, cts));
}
private Source<LlmMessages.StreamToken, NotUsed> CreateTokenSource(string prompt, string? decisionContext, CancellationToken ct)
{
// Generate 5-10 thinking tokens over 1.25-2.5 seconds (250ms each)
var thinkingCount = _random.Next(5, 11);
// Create thinking tokens
var thinkingSource = Source.From(Enumerable.Range(0, thinkingCount))
.Select(_ => ThinkingPhrases[_random.Next(ThinkingPhrases.Length)])
.Select(text => (LlmMessages.StreamToken)new LlmMessages.ThinkingToken(text))
.Throttle(1, TimeSpan.FromMilliseconds(250), 1, ThrottleMode.Shaping);
// If we have a decision context, use the follow-up response
if (decisionContext != null && _pendingDecision != null)
{
var followUp = _pendingDecision.FollowUps.GetValueOrDefault(decisionContext)
?? $"You selected '{decisionContext}'. That's an interesting choice!";
_pendingDecision = null;
return CreateTextStreamSource(thinkingSource, followUp);
}
// Check for test trigger: "decision" prompt forces a decision point for headless testing
var useDecision = prompt.Equals("decision", StringComparison.OrdinalIgnoreCase)
|| _random.Next(100) < 40;
if (useDecision)
{
_pendingDecision = DecisionScenarios[_random.Next(DecisionScenarios.Length)];
return CreateDecisionSource(thinkingSource, _pendingDecision);
}
else
{
var response = SampleResponses[_random.Next(SampleResponses.Length)];
_pendingDecision = null;
return CreateTextStreamSource(thinkingSource, response);
}
}
private Source<LlmMessages.StreamToken, NotUsed> CreateTextStreamSource(
Source<LlmMessages.StreamToken, NotUsed> thinkingSource,
string response)
{
// Create text chunks source
var chunkSize = _random.Next(1, 4);
var chunks = new List<string>();
for (var i = 0; i < response.Length; i += chunkSize)
{
chunks.Add(response.Substring(i, Math.Min(chunkSize, response.Length - i)));
}
var textSource = Source.From(chunks)
.Select(chunk => (LlmMessages.StreamToken)new LlmMessages.TextChunk(chunk))
.Throttle(10, TimeSpan.FromMilliseconds(50), 10, ThrottleMode.Shaping);
var completionSource = Source.Single((LlmMessages.StreamToken)new LlmMessages.GenerationComplete());
return thinkingSource
.Concat(textSource)
.Concat(completionSource);
}
private Source<LlmMessages.StreamToken, NotUsed> CreateDecisionSource(
Source<LlmMessages.StreamToken, NotUsed> thinkingSource,
DecisionScenario scenario)
{
// Stream the intro text first
var chunkSize = _random.Next(1, 4);
var chunks = new List<string>();
for (var i = 0; i < scenario.IntroText.Length; i += chunkSize)
{
chunks.Add(scenario.IntroText.Substring(i, Math.Min(chunkSize, scenario.IntroText.Length - i)));
}
var introSource = Source.From(chunks)
.Select(chunk => (LlmMessages.StreamToken)new LlmMessages.TextChunk(chunk))
.Throttle(10, TimeSpan.FromMilliseconds(50), 10, ThrottleMode.Shaping);
// Then emit the decision point
var decisionSource = Source.Single(
(LlmMessages.StreamToken)new LlmMessages.DecisionPointToken(scenario.Question, scenario.Choices));
// No completion - wait for user decision
return thinkingSource
.Concat(introSource)
.Concat(decisionSource);
}
private static async IAsyncEnumerable<LlmMessages.StreamToken> ReadFromChannelAsync(
ChannelReader<LlmMessages.StreamToken> reader,
[EnumeratorCancellation] CancellationToken ct = default)
{
await foreach (var token in reader.ReadAllAsync(ct).ConfigureAwait(false))
{
yield return token;
}
}
protected override void PostStop()
{
_materializer.Dispose();
base.PostStop();
}
public static Props Props() => Akka.Actor.Props.Create<LlmSimulatorActor>();
}
```

Finally, the host entry point that wires up the actor system, Termina, and the scripted test input:

```cs
using Akka.Hosting;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Termina.Demo.Streaming.Actors;
using Termina.Demo.Streaming.Pages;
using Termina.Hosting;
using Termina.Input;
// Check for --test flag (used in CI/CD to run scripted test and exit)
var testMode = args.Contains("--test");
var builder = Host.CreateApplicationBuilder(args);
// Configure logging to only show warnings and errors (avoid cluttering TUI output)
builder.Logging.SetMinimumLevel(LogLevel.Warning);
// Set up input source based on mode
VirtualInputSource? scriptedInput = null;
if (testMode)
{
scriptedInput = new VirtualInputSource();
builder.Services.AddTerminaVirtualInput(scriptedInput);
}
// Register Akka.NET actor system
builder.Services.AddAkka("termina-streaming-demo", configurationBuilder =>
{
configurationBuilder.WithActors((system, registry) =>
{
var llmActor = system.ActorOf(LlmSimulatorActor.Props(), "llm-simulator");
registry.Register<LlmSimulatorActor>(llmActor);
});
});
// Register Termina with reactive pages
builder.Services.AddTermina("/chat", termina =>
{
termina.RegisterRoute<StreamingChatPage, StreamingChatViewModel>("/chat");
});
var host = builder.Build();
if (testMode && scriptedInput != null)
{
// Queue up scripted input to test streaming functionality then quit
_ = Task.Run(async () =>
{
await Task.Delay(500); // Wait for initial render
// Type a prompt: "Hello"
scriptedInput.EnqueueString("Hello");
// Submit the prompt
scriptedInput.EnqueueKey(ConsoleKey.Enter);
// Wait for streaming to complete (actor produces tokens over ~3-5 seconds)
await Task.Delay(6000);
// Test decision point: "decision" prompt forces a decision list to appear
scriptedInput.EnqueueString("decision");
scriptedInput.EnqueueKey(ConsoleKey.Enter);
// Wait for intro text streaming and decision list to appear
await Task.Delay(4000);
// Navigate down in the selection list
scriptedInput.EnqueueKey(ConsoleKey.DownArrow);
await Task.Delay(200);
// Select the second option (Enter confirms)
scriptedInput.EnqueueKey(ConsoleKey.Enter);
// Wait for follow-up response to stream
await Task.Delay(5000);
// Test "Something else..." option - trigger another decision
scriptedInput.EnqueueString("decision");
scriptedInput.EnqueueKey(ConsoleKey.Enter);
await Task.Delay(4000);
// Navigate to "Something else..." (4th option)
scriptedInput.EnqueueKey(ConsoleKey.D4); // Quick select option 4
await Task.Delay(200);
// Type custom prompt and submit
scriptedInput.EnqueueString("custom question");
scriptedInput.EnqueueKey(ConsoleKey.Enter);
await Task.Delay(5000);
// Quit with Ctrl+Q
scriptedInput.EnqueueKey(ConsoleKey.Q, control: true);
scriptedInput.Complete();
});
}
await host.RunAsync();
```