Developer's guide - Observability
The observability section of the Temporal Developer's guide covers the many ways to view the current state of your Temporal Application; that is, ways to view which Workflow Executions are tracked by the Temporal Platform and the state of any specified Workflow Execution, either currently or at points of an execution.
This guide is a work in progress. Some sections may be incomplete or missing for some languages. Information may change at any time.
If you can't find what you are looking for in the Developer's guide, it could be in older docs for SDKs.
This section covers features related to viewing the state of the application, including metrics, tracing, logging, and Visibility.
Metrics
Each Temporal SDK is capable of emitting an optional set of metrics from either the Client or the Worker process. For a complete list of metrics capable of being emitted, see the SDK metrics reference.
Metrics can be scraped and stored in time series databases, such as Prometheus.
Temporal also provides a dashboard you can integrate with graphing services like Grafana. For more information, see:
- Temporal's implementation of the Grafana dashboard
- How to export metrics in Grafana
- Go
- Java
- PHP
- Python
- TypeScript
To emit metrics from the Temporal Client in Go, create a metrics handler from the Client Options and specify a listener address to be used by Prometheus.
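// newPrometheusScope is a helper that builds the metrics scope; see the Go sample for metrics.
client.Options{
    MetricsHandler: sdktally.NewMetricsHandler(newPrometheusScope(prometheus.Configuration{
        ListenAddress: "0.0.0.0:9090",
        TimerType:     "histogram",
    })),
}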
The Go SDK currently supports the Tally library; however, Tally offers extensible custom metrics reporting, which is exposed through the WithCustomMetricsReporter API.
For more information, see the Go sample for metrics.
To emit metrics with the Java SDK, use the MicrometerClientStatsReporter class to integrate with a Micrometer MeterRegistry configured for your metrics backend.
Micrometer is a popular Java framework that provides integration with Prometheus and other backends.
The following example shows how to use MicrometerClientStatsReporter to define the metrics scope and set it with the WorkflowServiceStubsOptions.
//...
// See the Micrometer documentation for configuration details on other supported monitoring systems.
// This example shows how to set up a Prometheus registry and stats reporter.
PrometheusMeterRegistry registry = new PrometheusMeterRegistry(PrometheusConfig.DEFAULT);
StatsReporter reporter = new MicrometerClientStatsReporter(registry);
// Set up a new scope that reports every 10 seconds.
Scope scope = new RootScopeBuilder()
    .reporter(reporter)
    .reportEvery(com.uber.m3.util.Duration.ofSeconds(10));
// For Prometheus collection, expose a scrape endpoint.
//...
// Add the metrics scope to the WorkflowServiceStubs options.
WorkflowServiceStubsOptions stubOptions =
    WorkflowServiceStubsOptions.newBuilder().setMetricsScope(scope).build();
//...
For more details, see the Java SDK Samples. For details on configuring a Prometheus scrape endpoint with Micrometer, see the Micrometer Prometheus Configuring documentation.
Content is planned but not yet available.
The information you are looking for may be found in the legacy docs.
Metrics in Python are configured globally; therefore, you should set a Prometheus endpoint before any other Temporal code.
The following example exposes a Prometheus endpoint on port 9000.
from temporalio.client import Client
from temporalio.runtime import Runtime, TelemetryConfig, PrometheusConfig
# Create a new runtime that has telemetry enabled. Create this first to avoid
# the default Runtime from being lazily created.
new_runtime = Runtime(telemetry=TelemetryConfig(metrics=PrometheusConfig(bind_address="0.0.0.0:9000")))
# Run the following line inside an async function.
my_client = await Client.connect("my.temporal.host:7233", runtime=new_runtime)
Workers can emit metrics and traces. There are a few telemetry options that can be provided to Runtime.install. The common options are:
- metrics: { otel: { url } }: The URL of a gRPC OpenTelemetry collector.
- metrics: { prometheus: { bindAddress } }: Address on the Worker host that will have metrics for Prometheus to scrape.
To set up tracing of Workflows and Activities, use our opentelemetry-interceptors package.
telemetryOptions: {
  metrics: {
    prometheus: { bindAddress: '0.0.0.0:9464' },
  },
  logging: { forward: { level: 'DEBUG' } },
},
Tracing
Tracing allows you to view the call graph of a Workflow along with its Activities and any Child Workflows.
Temporal Web's tracing capabilities mainly track Activity Execution within a Temporal context. If you need custom tracing specific to your use case, use context propagation to add tracing logic accordingly.
For information about Workflow tracing, see Tracing Temporal Workflows with DataDog.
For information about how to configure exporters and instrument your code, see Tracing Temporal Services with OTEL.
- Go
- Java
- PHP
- Python
- TypeScript
The Go SDK provides support for distributed tracing through OpenTracing. Tracing allows you to view the call graph of a Workflow along with its Activities and any Child Workflows.
Tracing can be configured by providing an opentracing.Tracer implementation in ClientOptions during client instantiation.
For more details on how to configure and leverage tracing, see the OpenTracing documentation.
The OpenTracing support has been validated using Jaeger, but other implementations mentioned here should also work.
Tracing functionality utilizes generic context propagation provided by the Client.
To configure tracing in Java, register the OpenTracingClientInterceptor() interceptor.
You can register the interceptors on both the Temporal Client side and the Worker side.
The following code examples demonstrate the OpenTracingClientInterceptor() on the Temporal Client.
WorkflowClientOptions.newBuilder()
    //...
    .setInterceptors(new OpenTracingClientInterceptor())
    .build();
WorkflowClientOptions clientOptions =
    WorkflowClientOptions.newBuilder()
        .setInterceptors(new OpenTracingClientInterceptor(JaegerUtils.getJaegerOptions(type)))
        .build();
WorkflowClient client = WorkflowClient.newInstance(service, clientOptions);
The following code examples demonstrate the OpenTracingWorkerInterceptor() on the Worker.
WorkerFactoryOptions.newBuilder()
    //...
    .setWorkerInterceptors(new OpenTracingWorkerInterceptor())
    .build();
WorkerFactoryOptions factoryOptions =
    WorkerFactoryOptions.newBuilder()
        .setWorkerInterceptors(
            new OpenTracingWorkerInterceptor(JaegerUtils.getJaegerOptions(type)))
        .build();
WorkerFactory factory = WorkerFactory.newInstance(client, factoryOptions);
For more information, see the Temporal OpenTracing module.
Content is planned but not yet available.
The information you are looking for may be found in the legacy docs.
To configure tracing in Python, install the opentelemetry dependencies.
# This command installs the `opentelemetry` dependencies.
pip install temporalio[opentelemetry]
Then pass an instance of the temporalio.contrib.opentelemetry.TracingInterceptor class in the interceptors argument of Client.connect().
When your Client is connected, spans are created for all Client calls, Activities, and Workflow invocations on the Worker. Spans are created and serialized through the server to give one trace for a Workflow Execution.
The interceptors-opentelemetry sample shows how to use the SDK's built-in OpenTelemetry tracing to trace everything from starting a Workflow to Workflow Execution to running an Activity from that Workflow.
The built-in tracing uses protobuf message headers (like this one when starting a Workflow) to propagate the tracing information from the client to the Workflow and from the Workflow to its successors (when Continued As New), children, and Activities.
All of these executions are linked with a single trace identifier and have the proper parent -> child span relation.
Tracing is compatible between different Temporal SDKs as long as compatible context propagators are used.
Context propagation
The TypeScript SDK uses the global OpenTelemetry propagator.
To extend the default (Trace Context and Baggage propagators) to also include the Jaeger propagator, follow these steps:
npm i @opentelemetry/propagator-jaeger
At the top level of your Workflow code, add the following lines:
import { propagation } from '@opentelemetry/api';
import {
  CompositePropagator,
  W3CBaggagePropagator,
  W3CTraceContextPropagator,
} from '@opentelemetry/core';
import { JaegerPropagator } from '@opentelemetry/propagator-jaeger';
propagation.setGlobalPropagator(
  new CompositePropagator({
    propagators: [
      new W3CTraceContextPropagator(),
      new W3CBaggagePropagator(),
      new JaegerPropagator(),
    ],
  }),
);
Similarly, you can customize the OpenTelemetry NodeSDK propagators by following the instructions in the Initialize the SDK section of the README.md file.
Logging
Send logs and errors to a logging service, so that when things go wrong, you can see what happened.
The SDK core uses WARN for its default logging level.
- Go
- Java
- PHP
- Python
- TypeScript
In Workflow Definitions you can use workflow.GetLogger(ctx) to write logs.
import (
    "go.temporal.io/sdk/workflow"
)
// Workflow is a standard workflow definition.
// Note that the Workflow and Activity don't need to care that
// their inputs/results are being compressed.
func Workflow(ctx workflow.Context, name string) (string, error) {
    // ...
    // Get the logger from the Workflow context.
    logger := workflow.GetLogger(ctx)
    // Log a message with the key-value pair "name" and the value of name.
    logger.Info("Compressed Payloads workflow started", "name", name)
    var result string
    // ... execute Activities and assign their output to result ...
    logger.Info("Compressed Payloads workflow completed.", "result", result)
    return result, nil
}
To get a standard slf4j logger in your Workflow code, use the Workflow.getLogger method.
private static final Logger logger = Workflow.getLogger(DynamicDslWorkflow.class);
Logs in replay mode are omitted unless WorkerFactoryOptions.Builder.setEnableLoggingInReplay(boolean) is called with true.
Content is planned but not yet available.
The information you are looking for may be found in the legacy docs.
You can log from a Workflow using Python's standard library by importing the logging module (import logging).
Set your logging configuration to the minimum level of logs you want to expose.
The following example sets the logging level to INFO.
logging.basicConfig(level=logging.INFO)
Then, in your Workflow, use workflow.logger to write log messages. The following example logs a message from the Workflow.
from datetime import timedelta
from temporalio import workflow
@workflow.defn
class SayHelloWorkflow:
    @workflow.run
    async def run(self, name: str) -> str:
        workflow.logger.info(f"Running workflow with parameter {name}")
        return await workflow.execute_activity(
            your_activity, name, start_to_close_timeout=timedelta(seconds=10)
        )
The following is an example output:
INFO:temporalio.workflow:Running workflow with parameter Temporal ({'attempt': 1, 'your-custom-namespace': 'default', 'run_id': 'your-run-id', 'task_queue': 'your-task-queue', 'workflow_id': 'your-workflow-id', 'workflow_type': 'SayHelloWorkflow'})
Logs are skipped during replay by default.
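To illustrate the idea of skipping logs during replay, here is a minimal sketch built only on Python's standard logging module. It does not show how the SDK implements this behavior; the ReplayFilter class, the ListHandler class, and the replaying flag are hypothetical stand-ins for the SDK's internal replay state.

```python
import logging


class ReplayFilter(logging.Filter):
    """Drop log records while a (hypothetical) replay flag is set."""

    def __init__(self, is_replaying):
        super().__init__()
        # is_replaying: zero-argument callable that returns True during replay.
        self._is_replaying = is_replaying

    def filter(self, record):
        # Returning False tells the logging framework to discard the record.
        return not self._is_replaying()


class ListHandler(logging.Handler):
    """Collect messages in memory so the effect is easy to inspect."""

    def __init__(self):
        super().__init__()
        self.messages = []

    def emit(self, record):
        self.messages.append(record.getMessage())


# Hypothetical stand-in for the replay state tracked by the SDK.
replaying = {"value": False}

demo_logger = logging.getLogger("replay_demo")
demo_logger.setLevel(logging.INFO)
demo_logger.propagate = False
demo_handler = ListHandler()
demo_logger.addHandler(demo_handler)
demo_logger.addFilter(ReplayFilter(lambda: replaying["value"]))

demo_logger.info("first execution")   # kept: not replaying
replaying["value"] = True
demo_logger.info("replayed message")  # dropped: replaying
```

The point of the design is that Workflow code logs unconditionally, and the decision to suppress duplicates lives in one place.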
Logging from Workflows is tricky for two reasons:
- Workflows run in a sandboxed environment and cannot do any I/O.
- Workflow code might get replayed at any time, generating duplicate log messages.
To work around these limitations, we recommend using the Sinks feature in the TypeScript SDK. Sinks enable one-way export of logs, metrics, and traces from the Workflow isolate to the Node.js environment.
Sinks are written as objects with methods. Similar to Activities, they are declared in the Worker and then proxied in Workflow code, and it helps to share types between both.
Comparing Sinks, Activities and Interceptors
Sinks are similar to Activities in that they are both registered on the Worker and proxied into the Workflow. However, they differ from Activities in important ways:
- Sink functions don't return any value back to the Workflow and cannot be awaited.
- Sink calls are not recorded in Workflow histories (no timeouts or retries).
- Sink functions are always run on the same Worker that runs the Workflow they are called from.
Declaring the Sink Interface
Explicitly declaring a Sink's interface is optional, but is useful for ensuring type safety in subsequent steps:
import { LoggerSinks, proxySinks, Sinks } from '@temporalio/workflow';
export interface AlertSinks extends Sinks {
  alerter: {
    alert(message: string): void;
  };
}
export type MySinks = AlertSinks & LoggerSinks;
Implementing Sinks
Implementing Sinks is a two-step process.
Implement and inject the Sink function into a Worker
import { defaultSinks, InjectedSinks, Worker } from '@temporalio/worker';
import { MySinks } from './workflows';
async function main() {
  const sinks: InjectedSinks<MySinks> = {
    ...defaultSinks(),
    alerter: {
      alert: {
        fn(workflowInfo, message) {
          console.log(`sending SMS alert!
workflow: ${workflowInfo.runId}
message: ${message}`);
        },
        callDuringReplay: false, // The default
      },
    },
  };
  const worker = await Worker.create({
    workflowsPath: require.resolve('./workflows'),
    taskQueue: 'sinks',
    sinks,
  });
  await worker.run();
  console.log('Worker gracefully shutdown');
}
main().catch((err) => {
  console.error(err);
  process.exit(1);
});
- Sink function implementations are passed as an object into WorkerOptions.
- You can specify whether you want the injected function to be called during Workflow replay by setting the callDuringReplay boolean option.
Proxy and call a Sink function from a Workflow
const { alerter, defaultWorkerLogger } = proxySinks<MySinks>();
export async function sinkWorkflow(): Promise<string> {
  defaultWorkerLogger.info('default logger: Workflow Execution started', {});
  alerter.alert('alerter: Workflow Execution started');
  return 'Hello, Temporal!';
}
Some important features of the InjectedSinkFunction interface:
- Injected WorkflowInfo argument: The first argument of a Sink function implementation is a workflowInfo object that contains useful metadata.
- Limited argument types: The remaining Sink function arguments are copied between the sandbox and the Node.js environment using the structured clone algorithm.
- No return value: To prevent breaking determinism, Sink functions cannot return values to the Workflow.
Advanced: Performance considerations and non-blocking Sinks
The injected sink function contributes to the overall Workflow Task processing duration.
- If you have a long-running sink function, such as one that tries to communicate with external services, you might start seeing Workflow Task timeouts.
- The effect is multiplied when using callDuringReplay: true and replaying long Workflow histories because the Workflow Task timer starts when the first history page is delivered to the Worker.
Custom logger
Use a custom logger for logging.
- Go
- Java
- PHP
- Python
- TypeScript
The Logger field of client.Options sets a custom Logger that is used for all logging actions of the instance of the Temporal Client.
Although the Go SDK does not support most third-party logging solutions natively, our friends at Banzai Cloud built the adapter package logur, which makes it possible to use third-party loggers with minimal overhead. Most of the popular logging solutions have existing adapters in Logur; you can find a full list in the Logur GitHub project.
Here is an example of using Logur to support Logrus:
package main
import (
    "go.temporal.io/sdk/client"
    "github.com/sirupsen/logrus"
    logrusadapter "logur.dev/adapter/logrus"
    "logur.dev/logur"
)
func main() {
    // ...
    logger := logur.LoggerToKV(logrusadapter.New(logrus.New()))
    clientOptions := client.Options{
        Logger: logger,
    }
    temporalClient, err := client.Dial(clientOptions)
    // ...
}
To set a custom logger, supply your own logging implementation and configuration details the same way you would in any other Java application.
Content is planned but not yet available.
The information you are looking for may be found in the legacy docs.
Use the built-in Logging facility for Python to set a custom logger.
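As a minimal sketch of what that could look like, the following configures the temporalio logger namespace (the example output in the Logging section shows the logger name temporalio.workflow) with its own handler, format, and level. Only the standard library is used; the in-memory stream and the format string are assumptions chosen for illustration, and a real setup would more likely write to a file or a log collector.

```python
import io
import logging

# In-memory stream used here as a stand-in for a file or log collector.
log_stream = io.StringIO()

handler = logging.StreamHandler(log_stream)
handler.setFormatter(logging.Formatter("%(levelname)s|%(name)s|%(message)s"))

# Configure the parent logger; child loggers such as "temporalio.workflow"
# propagate their records up to it.
temporal_logger = logging.getLogger("temporalio")
temporal_logger.setLevel(logging.WARNING)
temporal_logger.propagate = False  # keep records out of the root logger
temporal_logger.addHandler(handler)

# Simulate records arriving on a child logger.
child_logger = logging.getLogger("temporalio.workflow")
child_logger.info("not written: below WARNING")
child_logger.warning("written: at WARNING")
```

Because the configuration is attached to the parent logger, every logger under that namespace picks up the same handler and level without further changes.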
Logging in Workers and Clients
The Worker comes with a default logger that, by default, logs any messages with level INFO and higher to STDERR using console.error.
The following log levels are listed in increasing order of severity: TRACE, DEBUG, INFO, WARN, and ERROR.
Customizing the default logger
Temporal uses a DefaultLogger that implements the basic interface:
import { DefaultLogger, Runtime } from '@temporalio/worker';
const logger = new DefaultLogger('WARN', ({ level, message }) => {
console.log(`Custom logger: ${level} — ${message}`);
});
Runtime.install({ logger });
The previous code example sets the default logger to only log messages with level WARN and higher.
Accumulate logs for testing and reporting
import { DefaultLogger, LogEntry } from '@temporalio/worker';
const logs: LogEntry[] = [];
const logger = new DefaultLogger('TRACE', (entry) => logs.push(entry));
logger.debug('hey', { a: 1 });
logger.info('ho');
logger.warn('lets', { a: 1 });
logger.error('go');
A common logging use case is logging to a file to be picked up by a collector like the Datadog Agent.
import { Runtime } from '@temporalio/worker';
import winston from 'winston';
const logger = winston.createLogger({
  level: 'info',
  format: winston.format.json(),
  transports: [new winston.transports.File({ filename: '/path/to/worker.log' })],
});
Runtime.install({ logger });
Visibility
The term Visibility, within the Temporal Platform, refers to the subsystems and APIs that enable an operator to view Workflow Executions that currently exist within a Cluster.
Search Attributes
The typical method of retrieving a Workflow Execution is by its Workflow Id.
However, sometimes you'll want to retrieve one or more Workflow Executions based on another property. For example, imagine you want to get all Workflow Executions of a certain type that have failed within a time range, so that you can start new ones with the same arguments.
You can do this with Search Attributes.
- Default Search Attributes like WorkflowType, StartTime and ExecutionStatus are automatically added to Workflow Executions.
- Custom Search Attributes can contain their own domain-specific data (like customerId or numItems).
  - A few generic Custom Search Attributes like CustomKeywordField and CustomIntField are created by default in Temporal's Docker Compose.
The steps to using custom Search Attributes are:
- Create a new Search Attribute in your Cluster using tctl search-attribute create or the Cloud UI.
- Set the value of the Search Attribute for a Workflow Execution:
  - On the Client by including it as an option when starting the Execution.
  - In the Workflow by calling UpsertSearchAttributes.
- Read the value of the Search Attribute:
  - On the Client by calling DescribeWorkflow.
  - In the Workflow by looking at WorkflowInfo.
- Query Workflow Executions by the Search Attribute using a List Filter:
  - In tctl by running tctl workflow list.
  - In code by calling ListWorkflowExecutions.
Here is how to query Workflow Executions:
- Go
- Java
- PHP
- Python
- TypeScript
Use Client.ListWorkflow.
Content is planned but not yet available.
The information you are looking for may be found in the legacy docs.
Content is planned but not yet available.
The information you are looking for may be found in the legacy docs.
Use the list_workflows() method on the Client handle and pass a List Filter as an argument to filter the listed Workflows.
async for workflow in client.list_workflows('WorkflowType="MyWorkflowClass"'):
    print(f"Workflow: {workflow.id}")
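The scenario described earlier in this section (all Workflow Executions of a certain type that failed within a time range) could be expressed as a List Filter string. The helper below is a hypothetical sketch, not part of any SDK: the function name failed_in_range_filter is made up, and it assumes the default Search Attributes WorkflowType, ExecutionStatus, and CloseTime along with the BETWEEN ... AND operator in the List Filter syntax.

```python
from datetime import datetime, timezone


def failed_in_range_filter(workflow_type: str, start: datetime, end: datetime) -> str:
    """Build a List Filter for failed Workflow Executions closed between start and end."""
    # Keep the sketch simple: refuse values that would need escaping.
    if '"' in workflow_type:
        raise ValueError("workflow_type must not contain double quotes")
    # Timezone-aware datetimes produce unambiguous ISO 8601 timestamps.
    if start.tzinfo is None or end.tzinfo is None:
        raise ValueError("start and end must be timezone-aware")
    return (
        f'WorkflowType = "{workflow_type}" '
        f'AND ExecutionStatus = "Failed" '
        f'AND CloseTime BETWEEN "{start.isoformat()}" AND "{end.isoformat()}"'
    )


example_filter = failed_in_range_filter(
    "MyWorkflowClass",
    datetime(2023, 1, 1, tzinfo=timezone.utc),
    datetime(2023, 1, 2, tzinfo=timezone.utc),
)
```

The resulting string could then be passed to list_workflows() in place of the literal filter.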
Use WorkflowService.listWorkflowExecutions:
import { Connection } from '@temporalio/client';
const connection = await Connection.connect();
const response = await connection.workflowService.listWorkflowExecutions({
  query: `ExecutionStatus = "Running"`,
});
where query is a List Filter.
Custom Search Attributes
After you've created custom Search Attributes in your Cluster (using tctl search-attribute create or the Cloud UI), you can set the values of the custom Search Attributes when starting a Workflow.
- Go
- Java
- PHP
- Python
- TypeScript
Provide key-value pairs in StartWorkflowOptions.SearchAttributes.
Search Attributes are represented as map[string]interface{}.
The values in the map must correspond to the Search Attribute's value type:
- Bool = bool
- Datetime = time.Time
- Double = float64
- Int = int64
- Keyword = string
- Text = string
If you had custom Search Attributes CustomerId of type Keyword and MiscData of type Text, you would provide string values:
func (c *Client) CallYourWorkflow(ctx context.Context, workflowID string, payload map[string]interface{}) error {
    // ...
    searchAttributes := map[string]interface{}{
        "CustomerId": payload["customer"],
        "MiscData":   payload["miscData"],
    }
    options := client.StartWorkflowOptions{
        SearchAttributes: searchAttributes,
        // ...
    }
    we, err := c.Client.ExecuteWorkflow(ctx, options, app.YourWorkflow, payload)
    // ...
}
To set a custom Search Attribute, call the setSearchAttributes() method.
WorkflowOptions workflowOptions =
    WorkflowOptions.newBuilder()
        .setSearchAttributes(generateSearchAttributes())
        .build();
generateSearchAttributes() returns a Map<String, ?> that maps each Search Attribute name to a value of one of the following types:
- String
- Long
- Integer
- Boolean
- Double
- OffsetDateTime
Use the WorkflowOptions::withSearchAttributes() method to provide Search Attributes when you start a Workflow.
$workflow = $this->workflowClient->newWorkflowStub(
    GreetingWorkflowInterface::class,
    WorkflowOptions::new()
        ->withWorkflowExecutionTimeout(CarbonInterval::minute())
        ->withSearchAttributes(
            [
                'CustomKeywordField' => 'value',
                'CustomIntField' => 123,
            ]
        )
);
To set custom Search Attributes, use the search_attributes parameter of the start_workflow() method.
handle = await client.start_workflow(
    "your-workflow-name",
    id="your-workflow-id",
    task_queue="your-task-queue",
    search_attributes={"Your-Custom-Keyword-Field": ["value"]},
)
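Note that each value in the search_attributes mapping above is a list, even when it holds a single item. A small pre-flight check can catch a bare value before the Workflow is started. The check_search_attributes helper below is hypothetical and not part of the SDK; the set of allowed Python types is an assumption derived from the Search Attribute value types (Bool, Datetime, Double, Int, Keyword, Text).

```python
from datetime import datetime

# Assumed Python counterparts of the Search Attribute value types.
ALLOWED_VALUE_TYPES = (bool, datetime, float, int, str)


def check_search_attributes(attrs):
    """Raise TypeError unless attrs maps str names to lists of allowed values."""
    for name, values in attrs.items():
        if not isinstance(name, str):
            raise TypeError(f"Search Attribute name must be str, got {type(name).__name__}")
        # A bare string is iterable, so test for list explicitly.
        if not isinstance(values, list):
            raise TypeError(f"{name}: value must be a list, got {type(values).__name__}")
        for value in values:
            if not isinstance(value, ALLOWED_VALUE_TYPES):
                raise TypeError(f"{name}: unsupported value type {type(value).__name__}")
    return attrs


checked = check_search_attributes({"Your-Custom-Keyword-Field": ["value"]})
```

Passing a bare string such as "value" instead of ["value"] raises a TypeError in this sketch.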
Use WorkflowOptions.searchAttributes.
search-attributes/src/client.ts
const handle = await client.workflow.start(example, {
  taskQueue: 'search-attributes',
  workflowId: 'search-attributes-example-0',
  searchAttributes: {
    CustomIntField: [2],
    CustomKeywordField: ['keywordA', 'keywordB'],
    CustomBoolField: [true],
    CustomDatetimeField: [new Date()],
    CustomStringField: [
      'String field is for text. When queried, it will be tokenized for partial match. StringTypeField cannot be used in Order By',
    ],
  },
});
const { searchAttributes } = await handle.describe();
The type of searchAttributes is Record<string, string[] | number[] | boolean[] | Date[]>.
Upsert Search Attributes
You can upsert Search Attributes to add or update Search Attributes from within Workflow code.
- Go
- Java
- PHP
- Python
- TypeScript
In advanced cases, you may want to update these attributes dynamically as the Workflow progresses. UpsertSearchAttributes adds or updates Search Attributes from within Workflow code by merging the given attributes into the Workflow's existing map.
Consider this example Workflow code:
func YourWorkflow(ctx workflow.Context, input string) error {
	attr1 := map[string]interface{}{
		"CustomIntField":  1,
		"CustomBoolField": true,
	}
	// UpsertSearchAttributes returns an error, so check it.
	if err := workflow.UpsertSearchAttributes(ctx, attr1); err != nil {
		return err
	}
	attr2 := map[string]interface{}{
		"CustomIntField":     2,
		"CustomKeywordField": "seattle",
	}
	if err := workflow.UpsertSearchAttributes(ctx, attr2); err != nil {
		return err
	}
	return nil
}
After the second call to UpsertSearchAttributes, the map will contain:
map[string]interface{}{
	"CustomIntField":     2, // last update wins
	"CustomBoolField":    true,
	"CustomKeywordField": "seattle",
}
In your Workflow code, call the upsertSearchAttributes(Map<String, ?> searchAttributes) method.
Map<String, Object> attr1 = new HashMap<>();
attr1.put("CustomIntField", 1);
attr1.put("CustomBoolField", true);
Workflow.upsertSearchAttributes(attr1);
Map<String, Object> attr2 = new HashMap<>();
attr2.put("CustomIntField", Lists.newArrayList(1, 2));
attr2.put("CustomKeywordField", "Seattle");
Workflow.upsertSearchAttributes(attr2);
After both calls to upsertSearchAttributes(), the Workflow has the following Search Attributes:
{
  "CustomIntField": [1, 2],
  "CustomBoolField": true,
  "CustomKeywordField": "Seattle"
}
To upsert Search Attributes within a Workflow, use Workflow::upsertSearchAttributes().
class GreetingWorkflow implements GreetingWorkflowInterface
{
    public function getGreeting(string $name)
    {
        Workflow::upsertSearchAttributes(
            [
                'CustomKeywordField' => 'attr1-value',
                'CustomIntField' => 123,
            ]
        );
        // ...
    }
}
To upsert custom Search Attributes, use the upsert_search_attributes() method.
The keys are added to or replace the existing Search Attributes, similar to dict.update().
workflow.upsert_search_attributes({"Your-Custom-Keyword-Field": ["new-value"]})
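The dict.update()-style merge can be explored without a running Temporal Cluster. The following is a minimal sketch in plain Python, not SDK code: apply_upsert is a hypothetical helper, and Your-Custom-Int-Field is an assumed attribute name used only for illustration.

```python
from typing import Dict, List, Union

# Assumed shape: each Search Attribute maps to a list of values.
SearchAttributeValues = Union[List[str], List[int], List[float], List[bool]]


def apply_upsert(
    current: Dict[str, SearchAttributeValues],
    updates: Dict[str, SearchAttributeValues],
) -> Dict[str, SearchAttributeValues]:
    """Hypothetical model of an upsert: return a new mapping with updates merged in."""
    merged = dict(current)  # copy so the caller's mapping is left untouched
    merged.update(updates)  # new keys are added, existing keys are replaced
    return merged


attrs = {"Your-Custom-Keyword-Field": ["value"]}
# Replace the value of an existing key.
attrs = apply_upsert(attrs, {"Your-Custom-Keyword-Field": ["new-value"]})
# Add a key that was not present before (hypothetical attribute name).
attrs = apply_upsert(attrs, {"Your-Custom-Int-Field": [7]})
print(attrs)
```

As in the Go example earlier in this section, the last update to a given key wins, and keys that an upsert does not mention keep their previous values.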
Inside a Workflow, we can read from WorkflowInfo.searchAttributes and call upsertSearchAttributes:
search-attributes/src/workflows.ts
export async function example(): Promise<SearchAttributes> {
  const customInt =
    (workflowInfo().searchAttributes.CustomIntField?.[0] as number) || 0;
  upsertSearchAttributes({
    // overwrite the existing CustomIntField: [2]
    CustomIntField: [customInt + 1],
    // delete the existing CustomBoolField: [true]
    CustomBoolField: [],
    // add a new value
    CustomDoubleField: [3.14],
  });
  return workflowInfo().searchAttributes;
}
Remove Search Attribute
To remove a Search Attribute that was previously set, set it to an empty array: []. Support for removal varies by SDK.
- Go
- Java
- PHP
- Python
- TypeScript
There is no support for removing a field.
However, to achieve a similar effect, set the field to some placeholder value.
For example, you could set CustomKeywordField to impossibleVal.
Then searching CustomKeywordField != 'impossibleVal' will match Workflows with CustomKeywordField not equal to impossibleVal, which includes Workflows without the CustomKeywordField set.
To remove a Search Attribute, call the upsertSearchAttributes() method and set it to an empty map.
To remove a Search Attribute that was previously set, set it to an empty array [].
To remove a Search Attribute, use the upsert_search_attributes() function with an empty list as its value.
workflow.upsert_search_attributes({"Your-Custom-Keyword-Field": []})
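To see how an empty list interacts with the rest of the attributes, the removal rule can be modeled in plain Python. This is a sketch under stated assumptions, not SDK code: upsert_with_removal is a hypothetical helper, and Your-Custom-Int-Field is an assumed attribute name.

```python
from typing import Dict, List


def upsert_with_removal(
    current: Dict[str, List], updates: Dict[str, List]
) -> Dict[str, List]:
    """Hypothetical model: merge updates, treating an empty list as 'remove this key'."""
    merged = dict(current)  # copy so the caller's mapping is left untouched
    for key, values in updates.items():
        if values:
            merged[key] = list(values)  # non-empty list: add or replace
        else:
            merged.pop(key, None)  # empty list: drop the attribute if present
    return merged


before = {"Your-Custom-Keyword-Field": ["new-value"], "Your-Custom-Int-Field": [7]}
after = upsert_with_removal(before, {"Your-Custom-Keyword-Field": []})
print(after)  # {'Your-Custom-Int-Field': [7]}
```

Only the key passed with an empty list is dropped in this model; attributes that the call does not mention are unaffected.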
import { upsertSearchAttributes } from '@temporalio/workflow';
async function yourWorkflow() {
  upsertSearchAttributes({ CustomIntField: [1, 2, 3] });
  // ... later, to remove:
  upsertSearchAttributes({ CustomIntField: [] });
}