Skip to content

Commit 12ae8ee

Browse files
Add system-defined tags and free-form labels to datasets
- PUT /api/v1/logstream/{name} - accepts the X-P-Dataset-Tags and X-P-Dataset-Labels headers (comma-separated) on stream creation.
- PUT /api/prism/v1/datasets/{name}/tags - replace the dataset's tags (an empty list clears all).
- PUT /api/prism/v1/datasets/{name}/labels - replace the dataset's labels (an empty list clears all).
- GET /api/prism/v1/datasets/{name}/correlated - find datasets sharing tags or labels.
- GET /api/prism/v1/datasets/tags/{tag} - find all datasets with a specific tag.
- Tags and labels are also included in the home API response.
1 parent 5c8bf86 commit 12ae8ee

14 files changed

Lines changed: 212 additions & 64 deletions

File tree

src/connectors/kafka/processor.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,8 @@ impl ParseableSinkProcessor {
5858
None,
5959
vec![log_source_entry],
6060
TelemetryType::default(),
61-
None,
61+
vec![],
62+
vec![],
6263
)
6364
.await?;
6465

src/handlers/http/ingest.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,8 @@ pub async fn ingest(
118118
None,
119119
vec![log_source_entry.clone()],
120120
telemetry_type,
121-
None,
121+
vec![],
122+
vec![],
122123
)
123124
.await
124125
.map_err(|e| {
@@ -227,7 +228,8 @@ pub async fn setup_otel_stream(
227228
None,
228229
vec![log_source_entry.clone()],
229230
telemetry_type,
230-
None,
231+
vec![],
232+
vec![],
231233
)
232234
.await?;
233235
let mut time_partition = None;

src/handlers/http/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ pub mod about;
3232
pub mod alerts;
3333
pub mod cluster;
3434
pub mod correlation;
35+
pub mod datasets;
3536
pub mod demo_data;
3637
pub mod health_check;
3738
pub mod ingest;

src/handlers/http/modal/server.rs

Lines changed: 33 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -199,14 +199,39 @@ impl Server {
199199
}
200200

201201
pub fn get_prism_datasets() -> Scope {
202-
web::scope("/datasets").route(
203-
"",
204-
web::post()
205-
.to(http::prism_logstream::post_datasets)
206-
.authorize_for_resource(Action::GetStreamInfo)
207-
.authorize_for_resource(Action::GetStats)
208-
.authorize_for_resource(Action::GetRetention),
209-
)
202+
web::scope("/datasets")
203+
.route(
204+
"",
205+
web::post()
206+
.to(http::prism_logstream::post_datasets)
207+
.authorize_for_resource(Action::GetStreamInfo)
208+
.authorize_for_resource(Action::GetStats)
209+
.authorize_for_resource(Action::GetRetention),
210+
)
211+
.route(
212+
"/tags/{tag}",
213+
web::get()
214+
.to(http::datasets::get_datasets_by_tag)
215+
.authorize_for_resource(Action::GetStreamInfo),
216+
)
217+
.route(
218+
"/{name}/correlated",
219+
web::get()
220+
.to(http::datasets::get_correlated_datasets)
221+
.authorize_for_resource(Action::GetStreamInfo),
222+
)
223+
.route(
224+
"/{name}/tags",
225+
web::put()
226+
.to(http::datasets::put_dataset_tags)
227+
.authorize_for_resource(Action::CreateStream),
228+
)
229+
.route(
230+
"/{name}/labels",
231+
web::put()
232+
.to(http::datasets::put_dataset_labels)
233+
.authorize_for_resource(Action::CreateStream),
234+
)
210235
}
211236

212237
pub fn get_demo_data_webscope() -> Scope {

src/handlers/http/modal/utils/logstream_utils.rs

Lines changed: 16 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,14 @@
1717
*/
1818

1919
use actix_web::http::header::HeaderMap;
20-
use tracing::warn;
2120

2221
use crate::{
2322
event::format::LogSource,
2423
handlers::{
25-
CUSTOM_PARTITION_KEY, DATASET_TAG_KEY, DatasetTag, LOG_SOURCE_KEY, STATIC_SCHEMA_FLAG,
26-
STREAM_TYPE_KEY, TELEMETRY_TYPE_KEY, TIME_PARTITION_KEY, TIME_PARTITION_LIMIT_KEY,
27-
TelemetryType, UPDATE_STREAM_KEY,
24+
CUSTOM_PARTITION_KEY, DATASET_LABELS_KEY, DATASET_TAG_KEY, DATASET_TAGS_KEY, DatasetTag,
25+
LOG_SOURCE_KEY, STATIC_SCHEMA_FLAG, STREAM_TYPE_KEY, TELEMETRY_TYPE_KEY,
26+
TIME_PARTITION_KEY, TIME_PARTITION_LIMIT_KEY, TelemetryType, UPDATE_STREAM_KEY,
27+
parse_dataset_labels, parse_dataset_tags,
2828
},
2929
storage::StreamType,
3030
};
@@ -39,7 +39,8 @@ pub struct PutStreamHeaders {
3939
pub stream_type: StreamType,
4040
pub log_source: LogSource,
4141
pub telemetry_type: TelemetryType,
42-
pub dataset_tag: Option<DatasetTag>,
42+
pub dataset_tags: Vec<DatasetTag>,
43+
pub dataset_labels: Vec<String>,
4344
}
4445

4546
impl From<&HeaderMap> for PutStreamHeaders {
@@ -73,16 +74,17 @@ impl From<&HeaderMap> for PutStreamHeaders {
7374
.get(TELEMETRY_TYPE_KEY)
7475
.and_then(|v| v.to_str().ok())
7576
.map_or(TelemetryType::Logs, TelemetryType::from),
76-
dataset_tag: headers
77-
.get(DATASET_TAG_KEY)
77+
dataset_tags: headers
78+
.get(DATASET_TAGS_KEY)
79+
.or_else(|| headers.get(DATASET_TAG_KEY))
7880
.and_then(|v| v.to_str().ok())
79-
.and_then(|v| match DatasetTag::try_from(v) {
80-
Ok(tag) => Some(tag),
81-
Err(err) => {
82-
warn!("Invalid dataset tag '{v}': {err}");
83-
None
84-
}
85-
}),
81+
.map(parse_dataset_tags)
82+
.unwrap_or_default(),
83+
dataset_labels: headers
84+
.get(DATASET_LABELS_KEY)
85+
.and_then(|v| v.to_str().ok())
86+
.map(parse_dataset_labels)
87+
.unwrap_or_default(),
8688
}
8789
}
8890
}

src/handlers/mod.rs

Lines changed: 50 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,11 @@
1616
*
1717
*/
1818

19+
use std::collections::HashSet;
1920
use std::fmt::Display;
2021

2122
use serde::{Deserialize, Serialize};
23+
use tracing::warn;
2224

2325
pub mod airplane;
2426
pub mod http;
@@ -36,6 +38,8 @@ pub const UPDATE_STREAM_KEY: &str = "x-p-update-stream";
3638
pub const STREAM_TYPE_KEY: &str = "x-p-stream-type";
3739
pub const TELEMETRY_TYPE_KEY: &str = "x-p-telemetry-type";
3840
pub const DATASET_TAG_KEY: &str = "x-p-dataset-tag";
41+
pub const DATASET_TAGS_KEY: &str = "x-p-dataset-tags";
42+
pub const DATASET_LABELS_KEY: &str = "x-p-dataset-labels";
3943
const COOKIE_AGE_DAYS: usize = 7;
4044
const SESSION_COOKIE_NAME: &str = "session";
4145
const USER_COOKIE_NAME: &str = "username";
@@ -84,24 +88,28 @@ impl Display for TelemetryType {
8488
}
8589

8690
/// Tag for categorizing datasets/streams by observability domain
87-
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
91+
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Hash)]
8892
#[serde(rename_all = "kebab-case")]
8993
pub enum DatasetTag {
90-
AgentObservability,
91-
K8sObservability,
94+
AgentMonitoring,
95+
K8sMonitoring,
9296
DatabaseObservability,
97+
ApplicationMonitoring,
98+
ServiceMap,
9399
}
94100

95101
impl TryFrom<&str> for DatasetTag {
96102
type Error = &'static str;
97103

98104
fn try_from(s: &str) -> Result<Self, Self::Error> {
99105
match s.to_lowercase().as_str() {
100-
"agent-observability" => Ok(DatasetTag::AgentObservability),
101-
"k8s-observability" => Ok(DatasetTag::K8sObservability),
106+
"agent-monitoring" => Ok(DatasetTag::AgentMonitoring),
107+
"k8s-monitoring" => Ok(DatasetTag::K8sMonitoring),
102108
"database-observability" => Ok(DatasetTag::DatabaseObservability),
109+
"application-monitoring" => Ok(DatasetTag::ApplicationMonitoring),
110+
"service-map" => Ok(DatasetTag::ServiceMap),
103111
_ => Err(
104-
"Invalid dataset tag. Supported values: agent-observability, k8s-observability, database-observability",
112+
"Invalid dataset tag. Supported values: agent-monitoring, k8s-monitoring, database-observability, application-monitoring, service-map",
105113
),
106114
}
107115
}
@@ -110,9 +118,43 @@ impl TryFrom<&str> for DatasetTag {
110118
impl Display for DatasetTag {
111119
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
112120
f.write_str(match self {
113-
DatasetTag::AgentObservability => "agent-observability",
114-
DatasetTag::K8sObservability => "k8s-observability",
121+
DatasetTag::AgentMonitoring => "agent-monitoring",
122+
DatasetTag::K8sMonitoring => "k8s-monitoring",
115123
DatasetTag::DatabaseObservability => "database-observability",
124+
DatasetTag::ApplicationMonitoring => "application-monitoring",
125+
DatasetTag::ServiceMap => "service-map",
116126
})
117127
}
118128
}
129+
130+
pub fn parse_dataset_tags(header_value: &str) -> Vec<DatasetTag> {
131+
header_value
132+
.split(',')
133+
.filter_map(|s| {
134+
let trimmed = s.trim();
135+
if trimmed.is_empty() {
136+
None
137+
} else {
138+
match DatasetTag::try_from(trimmed) {
139+
Ok(tag) => Some(tag),
140+
Err(err) => {
141+
warn!("Invalid dataset tag '{trimmed}': {err}");
142+
None
143+
}
144+
}
145+
}
146+
})
147+
.collect::<HashSet<_>>()
148+
.into_iter()
149+
.collect()
150+
}
151+
152+
/// Parses a comma-separated header value into a list of free-form dataset labels.
///
/// Each entry is trimmed of surrounding whitespace, empty entries are dropped,
/// and duplicates are removed. Labels are returned in the order of their first
/// occurrence so the result is deterministic for a given header value.
pub fn parse_dataset_labels(header_value: &str) -> Vec<String> {
    // Tracks labels already emitted; borrows from `header_value`, so no
    // allocation happens for entries that end up being discarded.
    let mut seen = HashSet::new();
    header_value
        .split(',')
        .map(str::trim)
        // `insert` returns false for a label that was already seen.
        .filter(|label| !label.is_empty() && seen.insert(*label))
        .map(str::to_string)
        .collect()
}

src/metadata.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,8 @@ pub struct LogStreamMetadata {
9292
pub stream_type: StreamType,
9393
pub log_source: Vec<LogSourceEntry>,
9494
pub telemetry_type: TelemetryType,
95-
pub dataset_tag: Option<DatasetTag>,
95+
pub dataset_tags: Vec<DatasetTag>,
96+
pub dataset_labels: Vec<String>,
9697
}
9798

9899
impl LogStreamMetadata {
@@ -108,7 +109,8 @@ impl LogStreamMetadata {
108109
schema_version: SchemaVersion,
109110
log_source: Vec<LogSourceEntry>,
110111
telemetry_type: TelemetryType,
111-
dataset_tag: Option<DatasetTag>,
112+
dataset_tags: Vec<DatasetTag>,
113+
dataset_labels: Vec<String>,
112114
) -> Self {
113115
LogStreamMetadata {
114116
created_at: if created_at.is_empty() {
@@ -133,7 +135,8 @@ impl LogStreamMetadata {
133135
schema_version,
134136
log_source,
135137
telemetry_type,
136-
dataset_tag,
138+
dataset_tags,
139+
dataset_labels,
137140
..Default::default()
138141
}
139142
}

src/migration/mod.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -371,7 +371,8 @@ async fn setup_logstream_metadata(
371371
stream_type,
372372
log_source,
373373
telemetry_type,
374-
dataset_tag,
374+
dataset_tags,
375+
dataset_labels,
375376
..
376377
} = serde_json::from_value(stream_metadata_value).unwrap_or_default();
377378

@@ -408,7 +409,8 @@ async fn setup_logstream_metadata(
408409
stream_type,
409410
log_source,
410411
telemetry_type,
411-
dataset_tag,
412+
dataset_tags,
413+
dataset_labels,
412414
};
413415

414416
Ok(metadata)

0 commit comments

Comments
 (0)