-
-
Notifications
You must be signed in to change notification settings - Fork 162
Expand file tree
/
Copy pathlogstream_utils.rs
More file actions
90 lines (86 loc) · 3.39 KB
/
logstream_utils.rs
File metadata and controls
90 lines (86 loc) · 3.39 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
/*
* Parseable Server (C) 2022 - 2025 Parseable, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/
use actix_web::http::header::HeaderMap;
use crate::{
event::format::LogSource,
handlers::{
CUSTOM_PARTITION_KEY, DATASET_LABELS_KEY, DATASET_TAG_KEY, DATASET_TAGS_KEY, DatasetTag,
LOG_SOURCE_KEY, STATIC_SCHEMA_FLAG, STREAM_TYPE_KEY, TELEMETRY_TYPE_KEY,
TIME_PARTITION_KEY, TIME_PARTITION_LIMIT_KEY, TelemetryType, UPDATE_STREAM_KEY,
parse_dataset_labels, parse_dataset_tags,
},
storage::StreamType,
};
#[derive(Debug, Default)]
pub struct PutStreamHeaders {
pub time_partition: String,
pub time_partition_limit: String,
pub custom_partition: Option<String>,
pub static_schema_flag: bool,
pub update_stream_flag: bool,
pub stream_type: StreamType,
pub log_source: LogSource,
pub telemetry_type: TelemetryType,
pub dataset_tags: Vec<DatasetTag>,
pub dataset_labels: Vec<String>,
}
impl From<&HeaderMap> for PutStreamHeaders {
fn from(headers: &HeaderMap) -> Self {
PutStreamHeaders {
time_partition: headers
.get(TIME_PARTITION_KEY)
.map_or("", |v| v.to_str().unwrap())
.to_string(),
time_partition_limit: headers
.get(TIME_PARTITION_LIMIT_KEY)
.map_or("", |v| v.to_str().unwrap())
.to_string(),
custom_partition: headers
.get(CUSTOM_PARTITION_KEY)
.map(|v| v.to_str().unwrap().to_string()),
static_schema_flag: headers
.get(STATIC_SCHEMA_FLAG)
.is_some_and(|v| v.to_str().unwrap() == "true"),
update_stream_flag: headers
.get(UPDATE_STREAM_KEY)
.is_some_and(|v| v.to_str().unwrap() == "true"),
stream_type: headers
.get(STREAM_TYPE_KEY)
.map(|v| StreamType::from(v.to_str().unwrap()))
.unwrap_or_default(),
log_source: headers
.get(LOG_SOURCE_KEY)
.map_or(LogSource::default(), |v| v.to_str().unwrap().into()),
telemetry_type: headers
.get(TELEMETRY_TYPE_KEY)
.and_then(|v| v.to_str().ok())
.map_or(TelemetryType::Logs, TelemetryType::from),
dataset_tags: headers
.get(DATASET_TAGS_KEY)
.or_else(|| headers.get(DATASET_TAG_KEY))
.and_then(|v| v.to_str().ok())
.map(parse_dataset_tags)
.unwrap_or_default(),
dataset_labels: headers
.get(DATASET_LABELS_KEY)
.and_then(|v| v.to_str().ok())
.map(parse_dataset_labels)
.unwrap_or_default(),
}
}
}