# How I Built It: Captains Log Pipeline for OpenWebUI

This is a technical companion to my main post about the Captains Log Pipeline. Here I'll share the actual code and implementation details for those who want to build something similar.

## Pipeline Architecture

The Captains Log Pipeline has three main components:

  1. Pipeline Class - Implements the file operations (read, write, append, etc.)
  2. FastAPI Server - Exposes the pipeline API for OpenWebUI to connect to
  3. Docker Container - Packages everything with the necessary dependencies
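
Before diving into the full code, it helps to see the minimal shape OpenWebUI expects from a pipeline class. This is just a stripped-down sketch of the contract the real implementation below fills in:

```python
from typing import Generator, Iterator, List, Union

class Pipeline:
    """Bare-bones pipeline contract: lifecycle hooks plus a pipe() entry point."""

    async def on_startup(self):
        ...  # called when the server starts

    async def on_shutdown(self):
        ...  # called when the server stops

    def pipe(
        self, user_message: str, model_id: str, messages: List[dict], body: dict
    ) -> Union[str, Generator, Iterator]:
        # Inspect the message and return a response (a string, or a generator to stream).
        return f"echo: {user_message}"
```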

## The Core Pipeline Code

Here's the core pipeline implementation from `captains_log_pipeline.py`:

````python
"""
title: Captains Log Pipeline
author: nickfixit
date: 2025-03-13
version: 1.0
license: MIT
description: A pipeline for reading from and writing to the captains-log Obsidian vault
requirements: pydantic
"""

import os
import glob
import datetime
from typing import List, Union, Generator, Iterator, Dict, Any, Optional
from pathlib import Path
from pydantic import BaseModel

class Pipeline:
    class Valves(BaseModel):
        # Configuration options for the pipeline
        CAPTAINS_LOG_PATH: str = os.getenv("CAPTAINS_LOG_PATH", "/mnt/abyss/captains-log")
        DAILY_LOG_PATH: str = os.getenv("DAILY_LOG_PATH", "/mnt/abyss/captains-log/abyss-log")
        MAX_FILES: int = int(os.getenv("MAX_FILES", "100"))
        DEBUG: bool = os.getenv("CAPTAINS_LOG_DEBUG", "false").lower() == "true"

    def __init__(self):
        self.name = "Captains Log Pipeline"
        self.valves = self.Valves()
        
    async def on_startup(self):
        # This hook is called when the server starts
        if self.valves.DEBUG:
            print(f"[DEBUG] on_startup: {__name__}")
            print(f"[DEBUG] Valves: {self.valves.dict()}")
            
        # Verify that the captains log directory exists
        if not os.path.exists(self.valves.CAPTAINS_LOG_PATH):
            print(f"[ERROR] Captains Log path does not exist: {self.valves.CAPTAINS_LOG_PATH}")
            
        if not os.path.exists(self.valves.DAILY_LOG_PATH):
            print(f"[ERROR] Daily Log path does not exist: {self.valves.DAILY_LOG_PATH}")
    
    async def on_shutdown(self):
        # This hook is called when the server is stopped
        if self.valves.DEBUG:
            print(f"[DEBUG] on_shutdown: {__name__}")

    def _read_log_file(self, file_path: str) -> str:
        """Read the content of a log file"""
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                return f.read()
        except Exception as e:
            return f"Error reading file {file_path}: {str(e)}"
            
    def _write_log_file(self, file_path: str, content: str) -> str:
        """Write content to a log file"""
        try:
            # Create directory if it doesn't exist
            os.makedirs(os.path.dirname(file_path), exist_ok=True)
            
            # Write the content
            with open(file_path, 'w', encoding='utf-8') as f:
                f.write(content)
            return f"Successfully wrote to {file_path}"
        except Exception as e:
            return f"Error writing to file {file_path}: {str(e)}"
            
    def _append_log_file(self, file_path: str, content: str) -> str:
        """Append content to a log file"""
        try:
            # Create directory if it doesn't exist
            os.makedirs(os.path.dirname(file_path), exist_ok=True)
            
            # Append the content
            with open(file_path, 'a', encoding='utf-8') as f:
                f.write("\n\n" + content)
            return f"Successfully appended to {file_path}"
        except Exception as e:
            return f"Error appending to file {file_path}: {str(e)}"
    
    def _list_log_files(self, directory: str, pattern: str = "*.md") -> List[str]:
        """List log files in a directory"""
        try:
            files = glob.glob(os.path.join(directory, pattern))
            files.sort(key=os.path.getmtime, reverse=True)
            return files[:self.valves.MAX_FILES]
        except Exception as e:
            return [f"Error listing files: {str(e)}"]
    
    def _get_todays_log_path(self) -> str:
        """Get the path to today's log file"""
        today = datetime.datetime.now().strftime("%Y-%m-%d")
        return os.path.join(self.valves.DAILY_LOG_PATH, f"{today}.md")
    
    def _create_new_daily_log(self) -> str:
        """Create a new daily log file for today"""
        today_path = self._get_todays_log_path()
        
        # Check if file already exists
        if os.path.exists(today_path):
            return f"Today's log already exists at {today_path}"
        
        # Create the file with template content
        today = datetime.datetime.now().strftime("%Y-%m-%d")
        today_formatted = datetime.datetime.now().strftime("%B %d, %Y")
        
        content = f"""---
created: {datetime.datetime.now().strftime("%Y-%m-%dT%H:%M")}
updated: {datetime.datetime.now().strftime("%Y-%m-%dT%H:%M:%S%z")}
icon: 📔
tags:
  - captainslog
  - qlog
  - dailylog
---


---
## 📔 Captain's Log: {today_formatted}

[◀️ {(datetime.datetime.now() - datetime.timedelta(days=1)).strftime('%Y-%m-%d')} ]({(datetime.datetime.now() - datetime.timedelta(days=1)).strftime('%Y-%m-%d')}) {today} [{(datetime.datetime.now() + datetime.timedelta(days=1)).strftime('%Y-%m-%d')}▶️]({(datetime.datetime.now() + datetime.timedelta(days=1)).strftime('%Y-%m-%d')})

## Tasks

[KANBAN](../KANBAN.md)


## Data view

### New Notes Created

```dataview
List FROM "" WHERE file.cday = date("{today}") SORT file.ctime asc
```

### Notes Last Touched

```dataview
List FROM "" WHERE file.mday = date("{today}") SORT file.mtime asc
```

{today}

"""
        return self._write_log_file(today_path, content)

    def pipe(
        self, user_message: str, model_id: str, messages: List[dict], body: dict
    ) -> Union[str, Generator, Iterator]:
        """
        The main pipeline method that processes the input and returns a result
        """
        if self.valves.DEBUG:
            print(f"[DEBUG] pipe called in {__name__}")
            print(f"[DEBUG] Received user_message: {user_message}")

        # Parse the command from the user message
        command_parts = user_message.strip().split(" ", 1)
        command = command_parts[0].lower()

        # Process based on the command
        if command == "read":
            # Read a log file
            if len(command_parts) > 1:
                file_name = command_parts[1]
                # Check if it's a full path or just a filename
                if not file_name.startswith("/"):
                    # Try to find in daily logs first
                    file_path = os.path.join(self.valves.DAILY_LOG_PATH, file_name)
                    if not os.path.exists(file_path) and not file_name.endswith(".md"):
                        file_path = os.path.join(self.valves.DAILY_LOG_PATH, f"{file_name}.md")
                else:
                    file_path = file_name

                if os.path.exists(file_path):
                    return self._read_log_file(file_path)
                else:
                    return f"File not found: {file_path}"
            else:
                # If no file specified, read today's log
                today_path = self._get_todays_log_path()
                if os.path.exists(today_path):
                    return self._read_log_file(today_path)
                else:
                    return f"Today's log not found at {today_path}. Use 'create' to create it."

        elif command == "write":
            ...  # write implementation

        elif command == "append":
            ...  # append implementation

        elif command == "list":
            ...  # list implementation

        elif command == "create":
            ...  # create implementation

        elif command == "today":
            ...  # today implementation

        elif command == "help":
            ...  # help implementation

        else:
            # Unknown command
            return f"Unknown command: {command}. Use 'help' to see available commands."
````

## The FastAPI Server

The FastAPI server in `main.py` loads and exposes the pipeline:

```python
"""
Main API for the Captains Log Pipeline
"""

import os
import importlib.util
import inspect
import asyncio
import json
from typing import Dict, List, Any, Optional
from pathlib import Path
from fastapi import FastAPI, HTTPException, Request
from fastapi.middleware.cors import CORSMiddleware
import uvicorn

app = FastAPI(
    title="Captains Log Pipeline API",
    description="API for the Captains Log Pipeline",
    version="1.0.0"
)

# Enable CORS
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Global variable to store loaded pipelines
pipelines = {}

def load_pipeline(pipeline_path: str):
    """Load a pipeline from a Python file."""
    try:
        # Extract the pipeline name from the file path
        pipeline_name = os.path.splitext(os.path.basename(pipeline_path))[0]
        
        # Load the module
        spec = importlib.util.spec_from_file_location(pipeline_name, pipeline_path)
        module = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(module)
        
        # Find the Pipeline class in the module
        pipeline_class = None
        for name, obj in inspect.getmembers(module):
            if inspect.isclass(obj) and name == "Pipeline":
                pipeline_class = obj
                break
                
        if pipeline_class is None:
            print(f"Warning: No Pipeline class found in {pipeline_path}")
            return None
            
        # Instantiate the pipeline
        pipeline_instance = pipeline_class()
        
        # Get pipeline ID
        pipeline_id = getattr(pipeline_instance, "id", pipeline_name)
        
        # Add it to the global pipelines dictionary
        pipelines[pipeline_id] = pipeline_instance
        
        print(f"Loaded pipeline: {pipeline_id} from {pipeline_path}")
        return pipeline_instance
    except Exception as e:
        print(f"Error loading pipeline {pipeline_path}: {str(e)}")
        return None

# Load pipelines from the pipelines directory
pipelines_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "pipelines")
print(f"Loading pipelines from {pipelines_dir}")

if os.path.exists(pipelines_dir):
    for file in os.listdir(pipelines_dir):
        if file.endswith(".py"):
            pipeline_path = os.path.join(pipelines_dir, file)
            load_pipeline(pipeline_path)
else:
    os.makedirs(pipelines_dir, exist_ok=True)
    print(f"Created pipelines directory: {pipelines_dir}")
    
# Check if the captains_log_pipeline.py exists in the pipelines directory
pipeline_path = os.path.join(pipelines_dir, "captains_log_pipeline.py")
if not os.path.exists(pipeline_path):
    # Copy the file from the current directory if it exists
    source_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "captains_log_pipeline.py")
    if os.path.exists(source_path):
        import shutil
        shutil.copy(source_path, pipeline_path)
        print(f"Copied captains_log_pipeline.py to {pipeline_path}")
        # Load the pipeline
        load_pipeline(pipeline_path)
    else:
        print(f"Warning: captains_log_pipeline.py not found in {os.path.dirname(os.path.abspath(__file__))}")

# API Routes
@app.get("/health")
async def health_check():
    """Health check endpoint."""
    return {
        "status": "ok",
        "pipelines": list(pipelines.keys()),
        "version": "1.0.0"
    }

@app.get("/pipelines")
async def list_pipelines():
    """List all available pipelines."""
    result = []
    for pipeline_id, pipeline in pipelines.items():
        name = getattr(pipeline, "name", pipeline_id)
        description = getattr(pipeline, "__doc__", "No description provided.")
        result.append({
            "id": pipeline_id,
            "name": name,
            "description": description
        })
    return {"pipelines": result}

@app.post("/pipe/{pipeline_id}")
async def pipe(pipeline_id: str, request: Request):
    """Process a message through a pipeline."""
    if pipeline_id not in pipelines:
        raise HTTPException(status_code=404, detail=f"Pipeline {pipeline_id} not found.")
        
    pipeline = pipelines[pipeline_id]
    
    # Get the request data
    try:
        data = await request.json()
    except Exception:
        raise HTTPException(status_code=400, detail="Invalid JSON data.")
        
    # Extract the parameters
    user_message = data.get("user_message", "")
    model_id = data.get("model_id", "")
    messages = data.get("messages", [])
    body = data.get("body", {})
    
    # Process the message through the pipeline
    try:
        # Check if the pipeline has an async pipe method
        if asyncio.iscoroutinefunction(pipeline.pipe):
            result = await pipeline.pipe(user_message, model_id, messages, body)
        else:
            result = pipeline.pipe(user_message, model_id, messages, body)
        
        # Convert generators/iterators to lists
        if hasattr(result, "__iter__") and not isinstance(result, (str, dict, list)):
            result = list(result)
            
        return {"result": result}
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error processing request: {str(e)}")
        
@app.on_event("startup")
async def startup_event():
    """Run startup tasks for each pipeline."""
    for pipeline_id, pipeline in pipelines.items():
        if hasattr(pipeline, "on_startup"):
            try:
                if asyncio.iscoroutinefunction(pipeline.on_startup):
                    await pipeline.on_startup()
                else:
                    pipeline.on_startup()
            except Exception as e:
                print(f"Error in on_startup for pipeline {pipeline_id}: {str(e)}")
                
@app.on_event("shutdown")
async def shutdown_event():
    """Run shutdown tasks for each pipeline."""
    for pipeline_id, pipeline in pipelines.items():
        if hasattr(pipeline, "on_shutdown"):
            try:
                if asyncio.iscoroutinefunction(pipeline.on_shutdown):
                    await pipeline.on_shutdown()
                else:
                    pipeline.on_shutdown()
            except Exception as e:
                print(f"Error in on_shutdown for pipeline {pipeline_id}: {str(e)}")
                
if __name__ == "__main__":
    uvicorn.run("main:app", host="0.0.0.0", port=9099, reload=True)
```
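
Once the server is up, the `/health` route doubles as a quick check that the pipeline actually loaded. For example, with `httpx` (not a project dependency, just handy here):

```python
import httpx

# Use port 9099 when running main.py directly; 9098 when going through the compose mapping below.
resp = httpx.get("http://localhost:9099/health")
print(resp.json())  # expect: {"status": "ok", "pipelines": ["captains_log_pipeline"], ...}
```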

## Docker Configuration

The Dockerfile contains the configuration for the container:

```dockerfile
FROM python:3.9-slim

WORKDIR /app

# Install dependencies
RUN apt-get update && apt-get install -y curl && \
    pip install --no-cache-dir pydantic fastapi uvicorn && \
    apt-get clean && rm -rf /var/lib/apt/lists/*

# Create pipelines directory
RUN mkdir -p /app/pipelines

# Copy the pipeline code and main.py
COPY captains_log_pipeline.py /app/pipelines/captains_log_pipeline.py
COPY main.py /app/main.py

# Set environment variables
ENV CAPTAINS_LOG_PATH=/mnt/abyss/captains-log
ENV DAILY_LOG_PATH=/mnt/abyss/captains-log/abyss-log
ENV MAX_FILES=100
ENV CAPTAINS_LOG_DEBUG=false

# Expose port for the pipeline API
EXPOSE 9099

# Command to run the pipeline server
CMD ["python", "-m", "uvicorn", "main:app", "--host", "0.0.0.0", "--port", "9099"]
```

And the docker-compose.yml for local development:

```yaml
version: '3'

services:
  captains-log-pipeline:
    build:
      context: .
      dockerfile: Dockerfile
    container_name: captains-log-pipeline
    restart: unless-stopped
    ports:
      - "9098:9099"
    volumes:
      - /mnt/abyss/captains-log:/mnt/abyss/captains-log:rw
    environment:
      - CAPTAINS_LOG_PATH=/mnt/abyss/captains-log
      - DAILY_LOG_PATH=/mnt/abyss/captains-log/abyss-log
      - MAX_FILES=100
      - CAPTAINS_LOG_DEBUG=true
```
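
With both files in place, `docker compose up -d --build` (or `docker-compose up -d --build` on older installs) should build the image and start the API on host port 9098.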

## Integration with OpenWebUI

To integrate with OpenWebUI, you need to add the pipeline URL to the OpenWebUI configuration. This can be done by modifying the augment.yml file to add our pipeline service:

```yaml
# Captains Log Pipeline for OpenWebUI
captains-log-pipeline:
  build:
    context: ./captains-log-pipeline
    dockerfile: Dockerfile
  container_name: captains-log-pipeline
  restart: unless-stopped
  ports:
    - "9098:9099"
  volumes:
    - /mnt/abyss/captains-log:/mnt/abyss/captains-log:rw
  environment:
    - CAPTAINS_LOG_PATH=/mnt/abyss/captains-log
    - DAILY_LOG_PATH=/mnt/abyss/captains-log/abyss-log
    - MAX_FILES=100
    - CAPTAINS_LOG_DEBUG=false
```

Then update the OpenWebUI service to use this pipeline:

```yaml
environment:
  - PIPELINES_URL=http://captains-log-pipeline:9099
```

If you already have other pipelines configured, you can list them with commas:

```yaml
environment:
  - PIPELINES_URL=http://existing-pipeline:8000,http://captains-log-pipeline:9099
```

## Testing the Pipeline

To test if the pipeline is working, you can use curl to send a request directly to the API:

```bash
curl -X POST "http://localhost:9098/pipe/captains_log_pipeline" \
  -H "Content-Type: application/json" \
  -d '{"user_message": "help", "model_id": "test", "messages": []}'
```

You should get a response with the help text:

```json
{
  "result": "\nCaptains Log Pipeline Commands:\n\nread [file]           - Read a log file (defaults to today's log)\nwrite file: content   - Write content to a file (overwrite)\nappend file: content  - Append content to a file\nlist [directory]      - List log files (defaults to daily logs)\ncreate                - Create a new daily log\ntoday                 - Check if today's log exists\nhelp                  - Display this help message\n\nExamples:\n  read 2025-03-13     - Read the log for March 13, 2025\n  write today: # New entry\\n\\nThis is a new entry.  - Write to today's log\n  append 2025-03-13: ## Meeting Notes\\n\\nDiscussed project status.  - Append to a log\n  list abyss-log      - List files in the abyss-log directory\n"
}
```
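
The same endpoint works from Python too. Here's a sketch (again using `httpx`) that asks the pipeline to create today's log:

```python
import httpx

# POST the "create" command; port 9098 per the compose mapping above.
resp = httpx.post(
    "http://localhost:9098/pipe/captains_log_pipeline",
    json={"user_message": "create", "model_id": "test", "messages": [], "body": {}},
)
print(resp.json()["result"])
```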

## Advanced Customization

You can customize the pipeline to fit your own Obsidian workflow by modifying the captains_log_pipeline.py file. Some ideas for customization:

  1. Add search functionality using regex patterns across files (see the sketch after this list)
  2. Implement tag management to add/remove tags from notes
  3. Add template support for different note types
  4. Implement linking between notes to create and manage connections
  5. Add authentication with API keys
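
As a taste of the first idea, here's a hedged sketch of a search helper. `_search_logs` is hypothetical: it isn't part of the pipeline above, and it would still need wiring into `pipe()` as a `search` command:

```python
import re
from typing import List

def _search_logs(self, pattern: str) -> List[str]:
    """Hypothetical Pipeline method: return daily-log files whose content matches a regex."""
    matches = []
    for path in self._list_log_files(self.valves.DAILY_LOG_PATH):
        content = self._read_log_file(path)
        if re.search(pattern, content, re.IGNORECASE | re.MULTILINE):
            matches.append(path)
    return matches
```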

## Conclusion

This OpenWebUI pipeline provides a powerful bridge between AI assistants and Obsidian notes. The implementation is relatively simple but can be extended in many ways to support more complex workflows.

If you have any questions or suggestions for improving this pipeline, feel free to reach out or contribute to the project!


This technical post is part of my ongoing exploration of AI-enhanced knowledge management. For a more general overview of why this matters, see my main post about the Captains Log Pipeline.