-
Notifications
You must be signed in to change notification settings - Fork 13
Expand file tree
/
Copy pathhttp.ex
More file actions
126 lines (102 loc) · 3.19 KB
/
http.ex
File metadata and controls
126 lines (102 loc) · 3.19 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
defmodule Hexdocs.HTTP do
@max_retry_times 5
@base_sleep_time 200
require Logger
def head(url, headers) do
case Req.head(url, headers: headers, retry: false, decode_body: false) do
{:ok, response} ->
{:ok, response.status, normalize_headers(response.headers)}
{:error, reason} ->
{:error, reason}
end
end
def get(url, headers, _opts \\ []) do
case Req.get(url, headers: headers, retry: false, decode_body: false) do
{:ok, response} ->
{:ok, response.status, normalize_headers(response.headers), response.body}
{:error, reason} ->
{:error, reason}
end
end
def get_stream(url, headers) do
case Req.get(url, headers: headers, retry: false, decode_body: false, into: :self) do
{:ok, response} ->
stream = stream_body(response.body)
{:ok, response.status, normalize_headers(response.headers), stream}
{:error, reason} ->
{:error, reason}
end
end
def put(url, headers, body) do
case Req.put(url,
headers: headers,
body: body,
retry: false,
decode_body: false,
receive_timeout: 10_000
) do
{:ok, response} ->
{:ok, response.status, normalize_headers(response.headers), response.body}
{:error, reason} ->
{:error, reason}
end
end
def post(url, headers, body, _opts \\ []) do
case Req.post(url, headers: headers, body: body, retry: false, decode_body: false) do
{:ok, response} ->
{:ok, response.status, normalize_headers(response.headers), response.body}
{:error, reason} ->
{:error, reason}
end
end
def delete(url, headers, _opts \\ []) do
case Req.delete(url, headers: headers, retry: false, decode_body: false) do
{:ok, response} ->
{:ok, response.status, normalize_headers(response.headers), response.body}
{:error, reason} ->
{:error, reason}
end
end
defp normalize_headers(headers) do
Enum.map(headers, fn {name, values} -> {name, Enum.join(values, ", ")} end)
end
defp stream_body(ref) do
start_fun = fn -> :cont end
after_fun = fn _ -> :ok end
next_fun = fn
:cont ->
receive do
{^ref, {:data, data}} -> {[{:ok, data}], :cont}
{^ref, :done} -> {:halt, :ok}
after
30_000 -> {[{:error, :timeout}], :stop}
end
:stop ->
{:halt, :ok}
end
Stream.resource(start_fun, next_fun, after_fun)
end
def retry(service, url, fun) do
retry(fun, service, url, 0)
end
defp retry(fun, service, url, times) do
case fun.() do
{:ok, status, _headers, _body} when status in 500..599 or status == 429 ->
do_retry(fun, service, url, times, "status #{status}")
{:error, reason} ->
do_retry(fun, service, url, times, reason)
result ->
result
end
end
defp do_retry(fun, service, url, times, reason) do
Logger.warning("#{service} API ERROR #{url}: #{inspect(reason)}")
if times + 1 < @max_retry_times do
sleep = trunc(:math.pow(3, times) * @base_sleep_time)
:timer.sleep(sleep)
retry(fun, service, url, times + 1)
else
{:error, reason}
end
end
end