Files
hatchet/examples/go/streaming/main.go
Mohammed Nafees cf21550502 Update docs to use Go SDK v1 (#2313)
* SDK fixes

* go docs generate

* simple changes

* more docs changes

* bulk docs done

* cancellation example

* concurrency example

* conditional example

* cron example

* dag example

* durable event example

* durable sleep example

* on failure example

* priority example

* rate limit example

* retries example

* run-no-wait example

* on event example

* run with results example

* running your task example

* scheduled runs example

* streaming example

* workers example

* timeouts example

* sticky example

* docker go

* fix lint and build

* update migration doc

* fix lint

* fix some docs

* fix docker mdx

* remove local lint

* go stub example

* make changes

* child spawning

* migration code examples

* fix child workflow example
2025-09-23 19:19:27 +02:00

278 lines
7.9 KiB
Go

package main
import (
"context"
"fmt"
"log"
"net/http"
"strings"
"time"
"github.com/hatchet-dev/hatchet/pkg/cmdutils"
hatchet "github.com/hatchet-dev/hatchet/sdks/go"
)
type StreamingInput struct {
Content string `json:"content"`
ChunkSize int `json:"chunk_size"`
}
type StreamingOutput struct {
Message string `json:"message"`
TotalChunks int `json:"total_chunks"`
}
const sampleText = `
The Go programming language is an open source project to make programmers more productive.
Go is expressive, concise, clean, and efficient. Its concurrency mechanisms make it easy to
write programs that get the most out of multicore and networked machines, while its novel
type system enables flexible and modular program construction. Go compiles quickly to
machine code yet has the convenience of garbage collection and the power of run-time reflection.
It's a fast, statically typed, compiled language that feels like a dynamically typed, interpreted language.
`
func main() {
// Create a new Hatchet client
client, err := hatchet.NewClient()
if err != nil {
log.Fatalf("failed to create hatchet client: %v", err)
}
// Create a workflow for streaming
workflow := client.NewWorkflow("streaming-workflow")
// Define the streaming task
workflow.NewTask("stream-content", func(ctx hatchet.Context, input StreamingInput) (StreamingOutput, error) {
content := input.Content
if content == "" {
content = sampleText
}
chunkSize := input.ChunkSize
if chunkSize <= 0 {
chunkSize = 50
}
// Split content into chunks and stream them
chunks := createChunks(content, chunkSize)
log.Printf("Starting to stream %d chunks...", len(chunks))
// Add a small delay at the start to ensure subscription is ready
time.Sleep(200 * time.Millisecond)
// Send an initial message to establish the stream
ctx.PutStream("Stream initialized, starting chunks...")
time.Sleep(100 * time.Millisecond)
for i, chunk := range chunks {
// Stream each chunk
ctx.PutStream(fmt.Sprintf("Chunk %d: %s", i+1, strings.TrimSpace(chunk)))
// Small delay between chunks to simulate processing
time.Sleep(300 * time.Millisecond)
}
return StreamingOutput{
Message: "Content streaming finished",
TotalChunks: len(chunks),
}, nil
})
// Create a worker to run the workflow
worker, err := client.NewWorker("streaming-worker", hatchet.WithWorkflows(workflow))
if err != nil {
log.Fatalf("failed to create worker: %v", err)
}
interruptCtx, cancel := cmdutils.NewInterruptContext()
defer cancel()
// Start the worker in a goroutine
go func() {
log.Println("Starting streaming worker...")
if err := worker.StartBlocking(interruptCtx); err != nil {
log.Printf("worker failed: %v", err)
}
}()
// Wait a moment for the worker to start
time.Sleep(2 * time.Second)
// Start HTTP server to demonstrate streaming
http.HandleFunc("/stream", func(w http.ResponseWriter, r *http.Request) {
ctx := context.Background()
// Set headers for streaming response
w.Header().Set("Content-Type", "text/plain; charset=utf-8")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
w.Header().Set("Access-Control-Allow-Origin", "*")
// Run the streaming workflow
workflowRun, err := client.RunNoWait(ctx, "streaming-workflow", StreamingInput{
Content: sampleText,
ChunkSize: 80,
})
if err != nil {
http.Error(w, fmt.Sprintf("failed to run workflow: %v", err), http.StatusInternalServerError)
return
}
// Wait a moment for the workflow to start before subscribing
time.Sleep(100 * time.Millisecond)
// Subscribe to the stream
stream := client.Runs().SubscribeToStream(ctx, workflowRun.RunId)
// Stream the content to the HTTP response
flusher, ok := w.(http.Flusher)
if !ok {
http.Error(w, "streaming not supported", http.StatusInternalServerError)
return
}
for message := range stream {
fmt.Fprintf(w, "data: %s\n\n", message)
flusher.Flush()
}
})
http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
html := `
<!DOCTYPE html>
<html>
<head>
<title>Hatchet Streaming Example</title>
<style>
body { font-family: Arial, sans-serif; margin: 40px; }
.container { max-width: 800px; margin: 0 auto; }
.output {
border: 1px solid #ccc;
padding: 20px;
height: 400px;
overflow-y: auto;
background-color: #f5f5f5;
white-space: pre-wrap;
}
button {
padding: 10px 20px;
font-size: 16px;
margin: 10px 0;
background-color: #007cba;
color: white;
border: none;
cursor: pointer;
}
button:hover { background-color: #005a87; }
</style>
</head>
<body>
<div class="container">
<h1>Hatchet Streaming Example</h1>
<p>Click the button below to start streaming content from a Hatchet workflow:</p>
<button onclick="startStream()">Start Streaming</button>
<button onclick="clearOutput()">Clear Output</button>
<div id="output" class="output"></div>
</div>
<script>
function startStream() {
const output = document.getElementById('output');
output.innerHTML = 'Starting stream...\n';
fetch('/stream')
.then(response => {
const reader = response.body.getReader();
const decoder = new TextDecoder();
function readStream() {
reader.read().then(({ done, value }) => {
if (done) {
output.innerHTML += '\nStream completed.\n';
return;
}
const chunk = decoder.decode(value);
const lines = chunk.split('\n');
lines.forEach(line => {
if (line.startsWith('data: ')) {
output.innerHTML += line.substring(6) + '\n';
output.scrollTop = output.scrollHeight;
}
});
readStream();
});
}
readStream();
})
.catch(err => {
output.innerHTML += 'Error: ' + err.message + '\n';
});
}
function clearOutput() {
document.getElementById('output').innerHTML = '';
}
</script>
</body>
</html>`
w.Header().Set("Content-Type", "text/html")
fmt.Fprint(w, html)
})
server := &http.Server{ //nolint:gosec // This is a demo
Addr: ":8888",
}
// Start server in goroutine
go func() {
log.Println("Starting HTTP server on :8888...")
log.Println("Visit http://localhost:8888 to see the streaming example")
if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
log.Printf("HTTP server error: %v", err)
}
}()
// Wait for interrupt signal
<-interruptCtx.Done()
log.Println("Shutting down HTTP server...")
// Gracefully shutdown the server
shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 5*time.Second)
defer shutdownCancel()
if err := server.Shutdown(shutdownCtx); err != nil {
log.Printf("HTTP server shutdown error: %v", err)
} else {
log.Println("HTTP server stopped gracefully")
}
}
func createChunks(content string, chunkSize int) []string {
var chunks []string
words := strings.Fields(strings.TrimSpace(content))
currentChunk := ""
for _, word := range words {
if len(currentChunk)+len(word)+1 > chunkSize && currentChunk != "" {
chunks = append(chunks, currentChunk)
currentChunk = word
} else {
if currentChunk != "" {
currentChunk += " "
}
currentChunk += word
}
}
if currentChunk != "" {
chunks = append(chunks, currentChunk)
}
return chunks
}