# Workflows (coming soon)
Workflows automate intelligence on your onchain data. Define multi-step tasks that trigger on blockchain events, run on a schedule, or fire on demand; secondlayer handles execution, retries, and observability. Each step runs independently and retries on failure without re-running earlier steps.

Install with `bun add @secondlayer/workflows`.

Six step methods are available: `step.ai()` for LLM analysis, `step.query()` to read subgraph data, `step.mcp()` to call MCP server tools, `step.deliver()` to send results to webhooks, Slack, Discord, Telegram, or email, `step.invoke()` to chain workflows, and `step.sleep()` for delays.
Ensure setup (skip any step already done; use the project's package manager):
- Skill: install via the `skills` npm package, e.g. `npx skills add ryanwaits/secondlayer --skill secondlayer -y` (or `bunx` / `pnpm dlx`)
- CLI: install `@secondlayer/cli` globally
- Auth: run `secondlayer auth login`
```
/secondlayer Help me create a workflow. Ask me:
1. What should trigger this workflow? (blockchain event, schedule, or manual)
2. What data do I need to read or analyze?
3. What action should it take? (webhook, Slack, email)
Create the workflow and deploy it.
```

## Getting started
A workflow has two parts: a trigger (when to run) and a handler (what to do). Handlers are built from steps — isolated units of work that retry independently and memoize results across failures.
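Before the full example, here is a conceptual sketch of how step isolation and memoization could work. This is illustrative only, not secondlayer's actual runtime: completed step results are cached by step name, so a retried handler skips work that already finished.

```ts
// Conceptual sketch: completed step results are memoized by step name,
// so a retry re-runs only the steps that haven't finished yet.
// NOT the real secondlayer runtime — an in-memory illustration.
type StepFn<T> = () => Promise<T>

class StepRunner {
  private completed = new Map<string, unknown>()
  executions = 0 // how many step bodies actually ran

  async run<T>(name: string, fn: StepFn<T>): Promise<T> {
    if (this.completed.has(name)) {
      return this.completed.get(name) as T // memoized from a previous attempt
    }
    this.executions++
    const result = await fn()
    this.completed.set(name, result) // only successful steps are recorded
    return result
  }
}

async function demo() {
  const step = new StepRunner()
  // First attempt: "fetch" succeeds, "analyze" fails transiently.
  await step.run("fetch", async () => 42)
  try {
    await step.run("analyze", async () => {
      throw new Error("transient failure")
    })
  } catch {}
  // Retry: "fetch" is memoized and does not re-run; only "analyze" executes.
  await step.run("fetch", async () => 42)
  const analysis = await step.run("analyze", async () => "ok")
  return { executions: step.executions, analysis }
}
```

Here the step bodies run three times total (fetch once, analyze twice), even though the handler logic executed twice end to end.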
```ts
import { defineWorkflow } from "@secondlayer/workflows"

export default defineWorkflow({
  name: "whale-alert",
  trigger: {
    type: "event",
    filter: { type: "stx_transfer", minAmount: 100_000_000_000 },
  },
  handler: async ({ event, step }) => {
    const context = await step.run("enrich", async () => {
      const sender = await step.query("accounts", "balances", {
        where: { address: { eq: event.sender } },
      })
      return { ...event, senderBalance: sender[0]?.balance }
    })

    const analysis = await step.ai("assess-risk", {
      prompt: `Whale transfer of ${event.amount} microSTX from ${event.sender}. Sender balance: ${context.senderBalance}. Is this unusual?`,
    })

    if (analysis.riskScore > 0.7) {
      await step.deliver("alert", {
        type: "webhook",
        url: "https://api.example.com/alerts",
        body: { transfer: context, analysis },
      })
    }
  },
})
```

## Triggers
Four trigger types. Event and stream triggers use the same filter types as streams and subgraphs. Stream triggers fire the workflow directly when a block matches — no external webhook needed. Schedule triggers use cron expressions. Manual triggers accept typed input via the API or dashboard.
```ts
// Trigger on blockchain events — same filters as streams
trigger: {
  type: "event",
  filter: { type: "stx_transfer", minAmount: 50_000_000_000 },
}

// Stream trigger — fires directly when a block matches, no webhook
trigger: {
  type: "stream",
  filter: { type: "contract_call", contractId: "SP1234...::dex", functionName: "swap" },
}

// Trigger on a schedule
trigger: {
  type: "schedule",
  cron: "0 8 * * *", // 8:00 AM daily, in the timezone below
  timezone: "America/Chicago",
}

// Trigger manually via API or dashboard
trigger: {
  type: "manual",
  input: {
    contractId: { type: "string", required: true },
    depth: { type: "string", default: "shallow" },
  },
}
```

## Steps
Steps are the building blocks of a workflow. Each step.run() call is isolated — it retries on failure without re-running completed steps. Use Promise.all() for parallel execution.
```ts
handler: async ({ event, step }) => {
  // Sequential steps
  const data = await step.run("fetch-data", async () => {
    return await fetchFromAPI(event.contractId)
  })

  const enriched = await step.run("enrich", async () => {
    return await enrichWithMetadata(data)
  })

  // Parallel steps
  const [analysis, history] = await Promise.all([
    step.run("analyze", async () => analyzePatterns(enriched)),
    step.run("get-history", async () => getHistoricalData(enriched)),
  ])

  // Sleep between steps
  await step.sleep("wait-for-settlement", 60_000) // 60 seconds

  // Invoke another workflow
  const result = await step.invoke("deep-analysis", {
    workflow: "contract-analyzer",
    input: { contractId: event.contractId },
  })
}
```

A complete pipeline — detect large swaps, enrich with subgraph data, run AI analysis, and alert via Slack:
```ts
export default defineWorkflow({
  name: "large-swap-monitor",
  trigger: {
    type: "stream",
    filter: { type: "contract_call", contractId: "SP1234...::amm-pool", functionName: "swap-exact-*" },
  },
  handler: async ({ event, step }) => {
    // 1. Enrich — query subgraph for context
    const context = await step.run("enrich", async () => {
      const [recentSwaps, pool] = await Promise.all([
        step.query("dex-swaps", "swaps", {
          where: { sender: { eq: event.sender }, _blockHeight: { gte: event.block.height - 500 } },
          orderBy: { _blockHeight: "desc" },
          limit: 20,
        }),
        step.query("dex-pools", "pools", {
          where: { contractId: { eq: event.contractId } },
          limit: 1,
        }),
      ])
      return { recentSwaps, pool: pool[0], swapAmount: event.args.amount }
    })

    // 2. Analyze — AI evaluates the pattern
    const analysis = await step.ai("assess-pattern", {
      prompt: `Large swap of ${context.swapAmount} on pool ${context.pool?.name}. Sender has ${context.recentSwaps.length} swaps in last 500 blocks. Is this unusual activity?`,
      model: "haiku",
      schema: {
        riskScore: { type: "number", description: "0-1 risk score" },
        pattern: { type: "string", description: "detected pattern name" },
        summary: { type: "string", description: "one-line summary" },
      },
    })

    // 3. Alert — deliver if risk is elevated
    if (analysis.riskScore > 0.5) {
      await step.deliver("notify-team", {
        type: "slack",
        channel: "#dex-alerts",
        text: `[${analysis.pattern}] ${analysis.summary} (risk: ${analysis.riskScore})`,
      })
    }
  },
})
```

## AI analysis
step.ai() runs an LLM analysis as a discrete step. It retries independently, tracks token usage, and returns structured output. AI evaluations count toward your tier's daily throughput — when the cap is reached, AI steps are skipped and the workflow continues with condition-only logic.
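The `schema` option is a flat field-type map describing the structured result. As an illustration of the concept only (the platform's actual validation is internal, and this is not secondlayer code), checking an LLM's output against such a schema can be sketched like this:

```ts
// Illustrative validator for the flat field-type schema format used by step.ai().
// NOT secondlayer's implementation — just the concept: each schema key names a
// field and its expected runtime type.
type FieldSpec = { type: "number" | "string" | "array"; items?: "string"; description?: string }
type Schema = Record<string, FieldSpec>

function matchesSchema(value: Record<string, unknown>, schema: Schema): boolean {
  return Object.entries(schema).every(([key, spec]) => {
    const v = value[key]
    if (spec.type === "array") {
      // every array element must have the declared item type
      return Array.isArray(v) && v.every((item) => typeof item === spec.items)
    }
    return typeof v === spec.type
  })
}

// The same schema shape shown in the examples below
const schema: Schema = {
  riskScore: { type: "number", description: "0-1 risk score" },
  flags: { type: "array", items: "string" },
  recommendation: { type: "string" },
}

const output = {
  riskScore: 0.82,
  flags: ["unusual_volume", "new_recipient"],
  recommendation: "review manually",
}
```

A conforming object like `output` passes; an object with, say, a string `riskScore` would not.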
```ts
// Basic analysis — returns unstructured text
const insight = await step.ai("summarize-activity", {
  prompt: `Summarize the trading activity for ${contractId} over the last 24 hours: ${JSON.stringify(trades)}`,
})
// insight.text → "Trading volume increased 340% driven by..."

// Structured output — returns typed object
const assessment = await step.ai("risk-assessment", {
  prompt: `Analyze this transfer pattern for anomalies: ${JSON.stringify(transfers)}`,
  schema: {
    riskScore: { type: "number", description: "0-1 risk score" },
    flags: { type: "array", items: "string" },
    recommendation: { type: "string" },
  },
})
// assessment.riskScore → 0.82
// assessment.flags → ["unusual_volume", "new_recipient"]

// Model selection — defaults to haiku, use sonnet for complex analysis
const deepAnalysis = await step.ai("deep-analysis", {
  prompt: "...",
  model: "sonnet",
  schema: { ... },
})
```

## MCP tools
step.mcp() calls tools on external MCP servers — GitHub, Slack, Notion, or any server in the MCP ecosystem. Configure servers via environment variables and call any tool from your workflow.
```ts
// Configure MCP servers via environment variables:
// MCP_SERVER_GITHUB=npx @modelcontextprotocol/server-github
// MCP_SERVER_FILESYSTEM=npx @modelcontextprotocol/server-filesystem /data

// Call any tool on a configured MCP server
const files = await step.mcp("list-files", {
  server: "filesystem",
  tool: "list_directory",
  args: { path: "/data/reports" },
})

// Create a GitHub issue from workflow analysis
await step.mcp("file-issue", {
  server: "github",
  tool: "create_issue",
  args: {
    repo: "myorg/myrepo",
    title: "Anomaly detected in swap volume",
    body: analysis.summary,
  },
})

// MCP results include a content array and an error flag
// result.content → [{ type: "text", text: "..." }]
// result.isError → false
```

## Querying subgraphs
step.query() reads from your deployed subgraph tables directly. No API overhead — workflows run co-located with your data and query Postgres directly.
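The `where` clause is a nested map of field conditions using operators like `eq` and `gte`. The semantics can be sketched with a plain in-memory filter (conceptual only; real queries run against Postgres, and field names here are hypothetical):

```ts
// Illustrative in-memory evaluation of the where-filter shape used by step.query().
// Conceptual only — secondlayer executes these against Postgres.
type Condition = { eq?: unknown; gte?: number | string }
type Where = Record<string, Condition>

function matches(row: Record<string, any>, where: Where): boolean {
  return Object.entries(where).every(([field, cond]) => {
    const v = row[field]
    if (cond.eq !== undefined && v !== cond.eq) return false
    if (cond.gte !== undefined && !(v >= cond.gte)) return false
    return true
  })
}

// Hypothetical rows, mimicking a "swaps" table
const swaps = [
  { sender: "SP1AAA", amount: 2_000_000_000 },
  { sender: "SP1AAA", amount: 500 },
  { sender: "SP1BBB", amount: 3_000_000_000 },
]

// Equivalent of: where: { sender: { eq: "SP1AAA" }, amount: { gte: 1_000_000_000 } }
const large = swaps.filter((row) =>
  matches(row, { sender: { eq: "SP1AAA" }, amount: { gte: 1_000_000_000 } })
)
```

All conditions in a `where` object are ANDed together, which is why only the first row survives this filter.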
```ts
// Query a subgraph table
const largeSwaps = await step.query("dex-swaps", "swaps", {
  where: {
    amount: { gte: "1000000000" },
    _blockHeight: { gte: event.block.height - 100 },
  },
  orderBy: { amount: "desc" },
  limit: 50,
})

// Aggregate queries
const volume = await step.count("dex-swaps", "swaps", {
  timestamp: { gte: oneDayAgo },
})

// Cross-subgraph correlation
const positions = await step.query("lending-positions", "borrows", {
  where: { borrower: { eq: event.sender } },
})
const prices = await step.query("price-feeds", "prices", {
  where: { token: { eq: positions[0]?.token } },
  orderBy: { _blockHeight: "desc" },
  limit: 1,
})
```

## Delivering results
step.deliver() sends results to external systems. Supports webhook, Slack, Discord, Telegram, and email. Deliveries are retried on failure and tracked in the run log.
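The retry behavior can be sketched generically. This is an illustration of the idea, not secondlayer's delivery code (the platform handles retries and run-log tracking for you):

```ts
// Generic retry helper, illustrating delivery-with-retries.
// NOT secondlayer's implementation — the platform does this internally.
async function withRetries<T>(fn: () => Promise<T>, attempts = 3): Promise<T> {
  let lastError: unknown
  for (let i = 0; i < attempts; i++) {
    try {
      return await fn()
    } catch (err) {
      lastError = err // remember the failure and try again
    }
  }
  throw lastError // all attempts exhausted
}

// Hypothetical flaky delivery target: fails twice, then succeeds
let calls = 0
async function flakyDeliver(): Promise<string> {
  calls++
  if (calls < 3) throw new Error("transient network error")
  return "delivered"
}
```

With three attempts, a target that fails twice still ends up delivered on the final try.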
```ts
// Webhook delivery
await step.deliver("notify-backend", {
  type: "webhook",
  url: "https://api.example.com/events",
  body: { event: "whale_alert", data: analysis },
  headers: { "X-API-Key": process.env.API_KEY },
})

// Slack notification
await step.deliver("alert-team", {
  type: "slack",
  channel: "#alerts",
  text: `Whale transfer detected: ${event.amount} microSTX from ${event.sender}`,
})

// Email summary
await step.deliver("daily-report", {
  type: "email",
  to: "team@example.com",
  subject: "Daily DEX Volume Report",
  body: reportHtml,
})

// Discord notification
await step.deliver("notify-discord", {
  type: "discord",
  webhookUrl: "https://discord.com/api/webhooks/YOUR/WEBHOOK",
  content: "Whale transfer detected!",
  username: "Secondlayer Bot",
})

// Telegram message
await step.deliver("alert-telegram", {
  type: "telegram",
  botToken: process.env.TELEGRAM_BOT_TOKEN,
  chatId: "-1001234567890",
  text: "⚠️ Large swap detected on DEX",
  parseMode: "HTML",
})
```

## Deploy
Deploy workflows via the CLI. The CLI bundles your handler code and registers triggers with the platform. Workflows start running immediately after deploy.
```bash
# Deploy a workflow
sl workflows deploy workflows/whale-alert.ts

# Dev mode — watches for changes, auto-redeploys
sl workflows dev workflows/whale-alert.ts

# Deploy all workflows in a directory
sl workflows deploy workflows/
```

## Management
Manage workflows via the SDK or CLI. Each run is tracked with full step-level logs, timing, and AI token usage.
```ts
import { SecondLayer } from "@secondlayer/sdk"

const client = new SecondLayer({ apiKey: "sk-sl_..." })

// Deploy a workflow
const result = await client.workflows.deploy({
  name: "whale-alert",
  trigger: { type: "stream", filter: { type: "stx_transfer", minAmount: 100_000_000_000 } },
  handlerCode: bundledCode,
})

// List workflows
const { workflows } = await client.workflows.list()

// Get workflow detail
const workflow = await client.workflows.get("whale-alert")

// List runs
const { runs } = await client.workflows.listRuns("whale-alert", {
  status: "completed",
  limit: 25,
})

// Get run detail — includes step-level logs and timing
const run = await client.workflows.getRun(runId)

// Trigger a manual workflow
const { runId } = await client.workflows.trigger("contract-analyzer", {
  contractId: "SP1234...::my-contract",
  depth: "deep",
})

// Pause / resume
await client.workflows.pause("whale-alert")
await client.workflows.resume("whale-alert")

// Delete
await client.workflows.delete("whale-alert")
```

```bash
# CLI equivalents
sl workflows ls
sl workflows get whale-alert
sl workflows runs whale-alert --status completed
sl workflows trigger contract-analyzer --input '{"contractId": "SP1234..."}'
sl workflows pause whale-alert
sl workflows delete whale-alert
```