OSIsoft PI System
The OSIsoft PI System is a real-time data management platform widely used in various industries, including oil and gas, manufacturing, utilities, and more. It is designed to collect, store, analyze, and visualize large volumes of time-series data from sensors, devices, and other sources.
How to Access PI System Data
To access data from the PI System you will need to use the PI Web API. The PI Web API provides a RESTful interface to interact with the PI System, allowing you to retrieve and manipulate data using standard HTTP methods.
PI Web API Example
This example should be run on Databricks on an interactive cluster. It requires that the cluster is in our corporate network or connected via VPN.
PI Webapi:
https://piwebapi.arcresources.ca/piwebapi
alternatively
https://10.65.31.72/piwebapi
IP: 10.65.31.72
We will probably need to use the IP-based address, because the DNS name will not resolve from Databricks.
Here is a simple example of how to use the PI Web API to retrieve data from the ARC PI System using Python:
PI is based on tags, a tag is a single data point, for example a temperature sensor or a pressure sensor. Each tag has a unique name and can be used to retrieve data.
Tags
These are the tags we will use for our example, they are all related to the 05-20 well. You can find more tags by using the PI System Explorer or by using the PI Web API to search for tags.
| PiTag | Description |
|---|---|
| 0520084246.fqit_160401.ydy_vol | 05-20 YDay Cond Sales Volume (m3) |
| 0520084246.fqit_154701.ydy_vol | 05-20 YDay NGL Sales Volume (m3) |
| 0520084246.fqit_013010.ydy_vol | 05-20 YDay Gas Sales Volume (e3m3) |
| 0520084246.fqit_137201.ydy_vol | 05-20 YDay Water Volume (m3) |
import requests
import json
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, DoubleType
from pyspark.sql.functions import lit
# Target schema for the combined time-series data; Value is DoubleType, so the
# per-tag frames below must cast Value to double before union().
schema = StructType([
    StructField("Tag", StringType(), True),
    StructField("Timestamp", TimestampType(), True),
    StructField("Value", DoubleType(), True)
])

days_back = 180  # look-back window in days; change to widen/narrow the query range

tag_df = spark.createDataFrame([], schema)

tags = ["0520084246.fqit_160401.ydy_vol",
        "0520084246.fqit_154701.ydy_vol",
        "0520084246.fqit_013010.ydy_vol",
        "0520084246.fqit_137201.ydy_vol"]

tag_details = []
for tag in tags:
    # Resolve the PI point by name to obtain its data links.
    # verify=False: the PI server's certificate is not trusted by the cluster —
    # acceptable only inside the corporate network.
    tag_detail = requests.get(
        f"https://10.65.31.72/piwebapi/points/search?dataServerWebId=F1DS8eMZxjcJaE6phkLX1nxCaQUTktUEktREE&query=name:=*{tag}*",
        timeout=30,
        verify=False)
    tag_detail.raise_for_status()  # fail fast instead of a confusing KeyError below
    detail_json = tag_detail.json()
    items = detail_json.get("Items") or []
    if not items:
        raise ValueError(f"No PI point found for tag '{tag}'")
    # Daily interpolated values for the past `days_back` days, anchored to a
    # fixed syncTime so samples land on consistent day boundaries.
    tag_data = requests.get(
        items[0]["Links"]["InterpolatedData"]
        + f"?interval=1d&startTime=*-{days_back}d&syncTime=2024-12-01T10:00:00Z",
        timeout=30,
        verify=False)
    tag_data.raise_for_status()
    tag_details.append(tag_detail)
    df = spark.read.json(sc.parallelize([json.dumps(tag_data.json()["Items"])]))
    # Cast to "double" — the previous "numeric" cast produced DECIMAL(10,0),
    # which truncates fractional volumes and does not match the DoubleType
    # column declared in `schema`, breaking the union below.
    df = df.withColumn("Value", df["Value"].cast("double"))
    df = df.withColumn("Timestamp", df["Timestamp"].cast("timestamp"))
    df = df.withColumn("Tag", lit(items[0]["Name"]))
    df = df.select("Tag", "Timestamp", "Value").where("Value is not null")
    tag_df = tag_df.union(df)

display(tag_df.orderBy("Tag", "Timestamp"))
This code retrieves daily interpolated data for the specified tags over the past 180 days and loads it into a Spark DataFrame. You can modify the days_back variable to change the time range of the data retrieved.
Here are some additional functions you can use with the PI Web API:
import urllib.parse
import requests
# Connection settings shared by every helper below.
BASE = "https://10.65.31.72/piwebapi"  # PI Web API root (IP-based; DNS may not resolve from Databricks)
AUTH = None  # e.g., ("DOMAIN\\user", "password") if using Basic; or leave None if Negotiate is handled outside
VERIFY_SSL = False  # you currently use verify=False; disables TLS certificate checks — only acceptable inside the trusted network
def _encode_path(path: str) -> str:
# Encode backslashes, %, /, spaces, parentheses, etc.
return urllib.parse.quote(path, safe="")
def get_piwebapi_id(path: str) -> str:
    """
    Resolve an AF attribute by path and return its WebId.

    Expects an AF attribute path like: \\AFSERVER\\DB\\Element\\SubElement|Attribute
    Raises KeyError (with the server's response body) when no WebId is returned —
    some servers answer 404 with a minimal JSON body.
    """
    resp = requests.get(f"{BASE}/attributes?path={_encode_path(path)}",
                        verify=VERIFY_SSL, timeout=30, auth=AUTH)
    resp.raise_for_status()
    body = resp.json()
    # Sentinel lookup so a present-but-None WebId is still returned unchanged.
    missing = object()
    web_id = body.get("WebId", missing)
    if web_id is not missing:
        return web_id
    raise KeyError(f"Attribute not found for path: {path}. Response: {body}")
def get_piwebapi_value(path: str):
    """
    Return the current (snapshot) value for the AF attribute at *path*,
    or None when the response carries no "Value" field.
    """
    stream_url = f"{BASE}/streams/{get_piwebapi_id(path)}/value"
    resp = requests.get(stream_url, verify=VERIFY_SSL, timeout=30, auth=AUTH)
    resp.raise_for_status()
    return resp.json().get("Value")
def get_piwebapi_recorded(
    path: str,
    start_time: str = "*-1d",
    end_time: str = "*",
    boundary_type: str = "Inside",  # Inside|Outside|Both
    max_count: int = 10000,  # page size
    include_bad: bool = False,
    time_zone: str = "UTC"
):
    """
    Recorded (historical) values for an AF Attribute path, auto-paginated.

    start_time/end_time accept PI time (e.g., '*-8h', '2025-09-01T00:00:00Z', 'y', 't', etc.)
    boundary_type: how to handle boundary events when Outside/Inside window.
    Returns a list of events: [{Timestamp, Value, Good, Questionable, ...}, ...]
    """
    web_id = get_piwebapi_id(path)
    params = {
        "startTime": start_time,
        "endTime": end_time,
        "boundaryType": boundary_type,
        "maxCount": max_count,
        # NOTE(review): the PI Web API 'recorded' endpoint documents
        # includeFilteredValues rather than includeBad — confirm the server
        # actually honors this parameter rather than silently ignoring it.
        "includeBad": str(include_bad).lower(),
        "timeZone": time_zone,
    }
    url = f"{BASE}/streams/{web_id}/recorded"
    items = []
    # Server-driven pagination: each page may expose a Links.Next URL that
    # already embeds the full query string, so params are only sent for the
    # first request (guarded both by the "?" check and by params=None below).
    while url:
        resp = requests.get(url, params=params if "?" not in url else None,
                            verify=VERIFY_SSL, timeout=60, auth=AUTH)
        resp.raise_for_status()
        j = resp.json()
        items.extend(j.get("Items", []))
        # follow pagination
        next_link = (j.get("Links") or {}).get("Next")
        url = next_link
        params = None  # subsequent pages already have query params in Next link
    return items
def get_piwebapi_interpolated(
    path: str,
    start_time: str,
    end_time: str,
    interval: str = "1h",
    time_zone: str = "UTC",
):
    """
    Fetch evenly-spaced interpolated values for an AF attribute path.

    interval examples: '1m', '5m', '1h', '8h', '1d'
    Returns the raw list of event dicts (empty list when none).
    """
    query = {
        "startTime": start_time,
        "endTime": end_time,
        "interval": interval,
        "timeZone": time_zone,
    }
    stream_url = f"{BASE}/streams/{get_piwebapi_id(path)}/interpolated"
    resp = requests.get(stream_url, params=query,
                        verify=VERIFY_SSL, timeout=60, auth=AUTH)
    resp.raise_for_status()
    payload = resp.json()
    return payload.get("Items", [])
# Example usage:
# Smoke-test the helpers above against a few known AF attribute paths.
paths = [
    r"\\osi-af\AFPRD\NE BC\Attachie\Attachie 07-17 Terminal\Sales\Attachie 07-17 Condy Lact to Pembina FE-210|Condensate Production Today",
    r"\\osi-af\AFPRD\NE BC\Attachie\Attachie 07-17 Terminal\Sales\Attachie 07-17 NGL Lact to Pembina FE-110|NGL Production Today",
    r"\\osi-af\AFPRD\NE BC\Attachie\Attachie 07-17 Terminal\Storage\Attachie 07-17 Cond Tank TK-04000|Level (%)",
    r"\\osi-af\AFPRD\NE BC\Attachie\Attachie 07-17 Terminal\Storage\Attachie 07-17 Cond Tank TK-04010|Level (%)",
    r"\\osi-af\AFPRD\NE BC\Attachie\Attachie 07-17 Terminal\Sales\Attachie 07-17 Condy Lact to Pembina FE-210|Condensate Flow Rate"
]
# Index of the path to exercise; 2 = the TK-04000 condensate tank level.
t = 2
# 1) Current value
val = get_piwebapi_value(paths[t])
display(val)
# 2) Recorded (last 8 hours)
hist = get_piwebapi_recorded(paths[t],
                             start_time="*-8h",
                             end_time="*")
display(hist)
# 3) Interpolated hourly for yesterday (00:00–24:00 local-ish; in PI time
#    'y' = start of yesterday, 't' = start of today)
interp = get_piwebapi_interpolated(paths[t],
                                   start_time="y",
                                   end_time="t",
                                   interval="1h")
display(interp)
How to use this in a loop and load to a Spark DataFrame.
# Loop through all paths and get interpolated values for the last two days by the hour
from datetime import datetime, timedelta
from pyspark.sql.functions import col, to_timestamp, regexp_extract
from pyspark.sql.types import DoubleType
from pyspark.sql import Row

# Query window: the last two days as ISO-8601 strings.
# NOTE(review): datetime.now() is cluster-local time but the string is
# suffixed 'Z' (UTC) — confirm the intended zone, or switch to
# datetime.now(timezone.utc).
end_time = datetime.now().strftime("%Y-%m-%dT%H:%M:%SZ")
start_time = (datetime.now() - timedelta(days=2)).strftime("%Y-%m-%dT%H:%M:%SZ")

interpolated_data = []
for path in paths:
    interp = get_piwebapi_interpolated(
        path,
        start_time=start_time,
        end_time=end_time,
        interval="1h"
    )
    for item in interp:
        value = item.get('Value')
        # Bad/system states come back as a dict (e.g. {"Name": "No Data", ...})
        # instead of a number; normalize to None so every Row has a uniform
        # numeric-or-null Value and createDataFrame can infer a single type.
        if isinstance(value, dict):
            value = None
        interpolated_data.append(Row(Path=path, Timestamp=item.get('Timestamp'), Value=value))

interp_df = spark.createDataFrame(interpolated_data)
interp_df = interp_df.withColumn("Timestamp", to_timestamp(col("Timestamp")))
interp_df = interp_df.withColumn("Value", col("Value").cast(DoubleType()))

# Parse the AF path: Description = last element segment before '|',
# Variable = attribute name after the final '|'.
interp_df = interp_df.withColumn("Description", regexp_extract(col("Path"), r"(?<=\\)[^\\|]+(?=\|)", 0))
interp_df = interp_df.withColumn("Variable", regexp_extract(col("Path"), r"(?<=\|)[^|]+$", 0))
display(interp_df)