nixpkgs/nixos/tests/temporal.nix
2025-08-24 16:36:56 +00:00

312 lines
11 KiB
Nix

(
{ lib, pkgs, ... }:
{
name = "temporal";
meta.maintainers = [ pkgs.lib.maintainers.jpds ];
nodes = {
temporal =
{ config, pkgs, ... }:
{
networking.firewall.allowedTCPPorts = [ 7233 ];
environment.systemPackages = [
(pkgs.writers.writePython3Bin "temporal-hello-workflow.py"
{
libraries = [ pkgs.python3Packages.temporalio ];
}
# Graciously taken from https://github.com/temporalio/samples-python/blob/main/hello/hello_activity.py
''
import asyncio
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass
from datetime import timedelta
from temporalio import activity, workflow
from temporalio.client import Client
from temporalio.worker import Worker
# While we could use multiple parameters in the activity, Temporal strongly
# encourages using a single dataclass instead which can have fields added to it
# in a backwards-compatible way.
@dataclass
class ComposeGreetingInput:
greeting: str
name: str
# Basic activity that logs and does string concatenation
@activity.defn
def compose_greeting(input: ComposeGreetingInput) -> str:
activity.logger.info("Running activity with parameter %s" % input)
return f"{input.greeting}, {input.name}!"
# Basic workflow that logs and invokes an activity
@workflow.defn
class GreetingWorkflow:
@workflow.run
async def run(self, name: str) -> str:
workflow.logger.info("Running workflow with parameter %s" % name)
return await workflow.execute_activity(
compose_greeting,
ComposeGreetingInput("Hello", name),
start_to_close_timeout=timedelta(seconds=10),
)
async def main():
# Uncomment the lines below to see logging output
# import logging
# logging.basicConfig(level=logging.INFO)
# Start client
client = await Client.connect("localhost:7233")
# Run a worker for the workflow
async with Worker(
client,
task_queue="hello-activity-task-queue",
workflows=[GreetingWorkflow],
activities=[compose_greeting],
# Non-async activities require an executor;
# a thread pool executor is recommended.
# This same thread pool could be passed to multiple workers if desired.
activity_executor=ThreadPoolExecutor(5),
):
# While the worker is running, use the client to run the workflow and
# print out its result. Note, in many production setups, the client
# would be in a completely separate process from the worker.
result = await client.execute_workflow(
GreetingWorkflow.run,
"World",
id="hello-activity-workflow-id",
task_queue="hello-activity-task-queue",
)
print(f"Result: {result}")
if __name__ == "__main__":
asyncio.run(main())
''
)
pkgs.temporal-cli
];
services.temporal = {
enable = true;
settings = {
# Based on https://github.com/temporalio/temporal/blob/main/config/development-sqlite.yaml
log = {
stdout = true;
level = "info";
};
services = {
frontend = {
rpc = {
grpcPort = 7233;
membershipPort = 6933;
bindOnLocalHost = true;
httpPort = 7243;
};
};
matching = {
rpc = {
grpcPort = 7235;
membershipPort = 6935;
bindOnLocalHost = true;
};
};
history = {
rpc = {
grpcPort = 7234;
membershipPort = 6934;
bindOnLocalHost = true;
};
};
worker = {
rpc = {
grpcPort = 7239;
membershipPort = 6939;
bindOnLocalHost = true;
};
};
};
persistence = {
defaultStore = "sqlite-default";
visibilityStore = "sqlite-visibility";
numHistoryShards = 1;
datastores = {
sqlite-default = {
sql = {
user = "";
password = "";
pluginName = "sqlite";
databaseName = "default";
connectAddr = "localhost";
connectProtocol = "tcp";
connectAttributes = {
mode = "memory";
cache = "private";
};
maxConns = 1;
maxIdleConns = 1;
maxConnLifetime = "1h";
tls = {
enabled = false;
caFile = "";
certFile = "";
keyFile = "";
enableHostVerification = false;
serverName = "";
};
};
};
sqlite-visibility = {
sql = {
user = "";
password = "";
pluginName = "sqlite";
databaseName = "default";
connectAddr = "localhost";
connectProtocol = "tcp";
connectAttributes = {
mode = "memory";
cache = "private";
};
maxConns = 1;
maxIdleConns = 1;
maxConnLifetime = "1h";
tls = {
enabled = false;
caFile = "";
certFile = "";
keyFile = "";
enableHostVerification = false;
serverName = "";
};
};
};
};
};
clusterMetadata = {
enableGlobalNamespace = false;
failoverVersionIncrement = 10;
masterClusterName = "active";
currentClusterName = "active";
clusterInformation = {
active = {
enabled = true;
initialFailoverVersion = 1;
rpcName = "frontend";
rpcAddress = "localhost:7233";
httpAddress = "localhost:7243";
};
};
};
dcRedirectionPolicy = {
policy = "noop";
};
archival = {
history = {
state = "enabled";
enableRead = true;
provider = {
filestore = {
fileMode = "0666";
dirMode = "0766";
};
gstorage = {
credentialsPath = "/tmp/gcloud/keyfile.json";
};
};
};
visibility = {
state = "enabled";
enableRead = true;
provider = {
filestore = {
fileMode = "0666";
dirMode = "0766";
};
};
};
};
namespaceDefaults = {
archival = {
history = {
state = "disabled";
URI = "file:///tmp/temporal_archival/development";
};
visibility = {
state = "disabled";
URI = "file:///tmp/temporal_vis_archival/development";
};
};
};
};
};
};
};
testScript = ''
temporal.wait_for_unit("temporal")
temporal.wait_for_open_port(6933)
temporal.wait_for_open_port(6934)
temporal.wait_for_open_port(6935)
temporal.wait_for_open_port(7233)
temporal.wait_for_open_port(7234)
temporal.wait_for_open_port(7235)
temporal.wait_until_succeeds(
"journalctl -o cat -u temporal.service | grep 'server-version' | grep '${pkgs.temporal.version}'"
)
temporal.wait_until_succeeds(
"journalctl -o cat -u temporal.service | grep 'Frontend is now healthy'"
)
import json
cluster_list_json = json.loads(temporal.wait_until_succeeds("temporal operator cluster list --output json"))
assert cluster_list_json[0]['clusterName'] == "active"
cluster_describe_json = json.loads(temporal.wait_until_succeeds("temporal operator cluster describe --output json"))
assert cluster_describe_json['serverVersion'] in "${pkgs.temporal.version}"
temporal.log(temporal.wait_until_succeeds("temporal operator namespace create --namespace default"))
temporal.wait_until_succeeds(
"journalctl -o cat -u temporal.service | grep 'Register namespace succeeded'"
)
namespace_list_json = json.loads(temporal.wait_until_succeeds("temporal operator namespace list --output json"))
assert len(namespace_list_json) == 2
namespace_describe_json = json.loads(temporal.wait_until_succeeds("temporal operator namespace describe --output json --namespace default"))
assert namespace_describe_json['namespaceInfo']['name'] == "default"
assert namespace_describe_json['namespaceInfo']['state'] == "NAMESPACE_STATE_REGISTERED"
workflow_json = json.loads(temporal.wait_until_succeeds("temporal workflow list --output json"))
assert len(workflow_json) == 0
out = temporal.wait_until_succeeds("temporal-hello-workflow.py")
assert "Result: Hello, World!" in out
workflow_json = json.loads(temporal.wait_until_succeeds("temporal workflow list --output json"))
assert workflow_json[0]['execution']['workflowId'] == "hello-activity-workflow-id"
assert workflow_json[0]['status'] == "WORKFLOW_EXECUTION_STATUS_COMPLETED"
temporal.log(temporal.succeed(
"systemd-analyze security temporal.service | grep -v ''"
))
'';
}
)