forked from aws/aws-lambda-runtime-interface-emulator
-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathutil.go
More file actions
84 lines (69 loc) · 2.4 KB
/
util.go
File metadata and controls
84 lines (69 loc) · 2.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package directinvoke
import (
"context"
"errors"
"go.amzn.com/lambda/core/bandwidthlimiter"
"io"
"net/http"
"time"
log "github.com/sirupsen/logrus"
)
const DefaultRefillIntervalMs = 125 // default refill interval in milliseconds
func NewStreamedResponseWriter(w http.ResponseWriter) (*bandwidthlimiter.BandwidthLimitingWriter, context.CancelFunc, error) {
flushingWriter, err := NewFlushingWriter(w) // after writing a chunk we have to flush it to avoid additional buffering by ResponseWriter
if err != nil {
return nil, nil, err
}
cancellableWriter, cancel := NewCancellableWriter(flushingWriter) // cancelling prevents next calls to Write() from happening
refillNumber := ResponseBandwidthRate * DefaultRefillIntervalMs / 1000 // refillNumber is calculated based on 'ResponseBandwidthRate' and bucket refill interval
refillInterval := DefaultRefillIntervalMs * time.Millisecond
// Initial bucket for token bucket algorithm allows for a burst of up to 6 MiB, and an average transmission rate of 2 MiB/s
bucket, err := bandwidthlimiter.NewBucket(ResponseBandwidthBurstSize, ResponseBandwidthBurstSize, refillNumber, refillInterval)
if err != nil {
cancel() // free resources
return nil, nil, err
}
bandwidthLimitingWriter, err := bandwidthlimiter.NewBandwidthLimitingWriter(cancellableWriter, bucket)
if err != nil {
cancel() // free resources
return nil, nil, err
}
return bandwidthLimitingWriter, cancel, nil
}
func NewFlushingWriter(w io.Writer) (*FlushingWriter, error) {
flusher, ok := w.(http.Flusher)
if !ok {
errorMsg := "expected http.ResponseWriter to be an http.Flusher"
log.Error(errorMsg)
return nil, errors.New(errorMsg)
}
return &FlushingWriter{
w: w,
flusher: flusher,
}, nil
}
type FlushingWriter struct {
w io.Writer
flusher http.Flusher
}
func (w *FlushingWriter) Write(p []byte) (n int, err error) {
n, err = w.w.Write(p)
w.flusher.Flush()
return
}
func NewCancellableWriter(w io.Writer) (*CancellableWriter, context.CancelFunc) {
ctx, cancel := context.WithCancel(context.Background())
return &CancellableWriter{w: w, ctx: ctx}, cancel
}
type CancellableWriter struct {
w io.Writer
ctx context.Context
}
func (w *CancellableWriter) Write(p []byte) (int, error) {
if err := w.ctx.Err(); err != nil {
return 0, err
}
return w.w.Write(p)
}