summaryrefslogtreecommitdiffstats
path: root/websockets.scm
blob: 11c6355bb425b401316cf8a96259e6a8e61e1a98 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
(module websockets
  (
   ; parameters
   ping-interval close-timeout
   connection-timeout accept-connection
   drop-incoming-pings propagate-common-errors
   max-frame-size max-message-size

   ; high level API
   with-websocket with-concurrent-websocket
   send-message receive-message

   ; low level API
   ;; send-frame read-frame read-frame-payload
   ;; receive-fragments valid-utf8?
   ;; control-frame? upgrade-to-websocket
   ;; current-websocket unmask close-websocket
   ;; process-fragments

   ;; ; fragment
   ;; make-fragment fragment? fragment-payload fragment-length
   ;; fragment-masked? fragment-masking-key fragment-last?
   ;; fragment-optype
   )

(import chicken scheme data-structures extras ports posix foreign
        srfi-13 srfi-14 srfi-18)
(use srfi-1 srfi-4 spiffy intarweb uri-common base64 simple-sha1
     mailbox comparse)

(define-inline (neq? obj1 obj2) (not (eq? obj1 obj2)))

(define current-websocket (make-parameter #f))
(define ping-interval (make-parameter 15))
(define close-timeout (make-parameter 5))
(define connection-timeout (make-parameter 58)) ; a little grace period from 60s
(define accept-connection (make-parameter (lambda (origin) #t)))
(define drop-incoming-pings (make-parameter #t))
(define propagate-common-errors (make-parameter #f))
(define access-denied ; TODO test
  (make-parameter (lambda () (send-status 'forbidden "<h1>Access denied</h1>"))))

(define max-frame-size (make-parameter 1048576)) ; 1MiB
(define max-message-size
  (make-parameter 1048576 ; 1MiB
                  (lambda (v)
                    (if (> v 1073741823) ; max int size for unmask/utf8 check
                        (signal (make-property-condition 'out-of-range))
                        v))))

(define (make-websocket-exception . conditions)
  (apply make-composite-condition (append `(,(make-property-condition 'websocket))
                                          conditions)))

(define (make-protocol-violation-exception msg)
  (make-composite-condition (make-property-condition 'websocket)
                            (make-property-condition 'protocol-error 'msg msg)))

(define (opcode->optype op)
  (case op
    ((0) 'continuation)
    ((1) 'text)
    ((2) 'binary)
    ((8) 'connection-close)
    ((9) 'ping)
    ((10) 'pong)
    (else (signal (make-protocol-violation-exception "bad opcode")))))

(define (optype->opcode t)
  (case t
    ('continuation 0)
    ('text 1)
    ('binary 2)
    ('connection-close 8)
    ('ping 9)
    ('pong 10)
    (else (signal (make-websocket-exception
                   (make-property-condition 'invalid-optype))))))

(define (control-frame? optype)
  (or (eq? optype 'ping) (eq? optype 'pong) (eq? optype 'connection-close)))

(define-record-type websocket
  (make-websocket inbound-port outbound-port user-thread
                  send-mutex read-mutex last-message-timestamp
                  state send-mailbox read-mailbox concurrent)
  websocket?
  (inbound-port websocket-inbound-port)
  (outbound-port websocket-outbound-port)
  (user-thread websocket-user-thread)
  (send-mutex websocket-send-mutex)
  (read-mutex websocket-read-mutex)
  (last-message-timestamp websocket-last-message-timestamp
                          set-websocket-last-message-timestamp!)
  (state websocket-state set-websocket-state!)
  (send-mailbox websocket-send-mailbox)
  (read-mailbox websocket-read-mailbox)
  (concurrent websocket-concurrent?))

(define-record-type websocket-fragment
  (make-fragment payload length masked masking-key
                           fin optype)
  fragment?
  (payload fragment-payload)
  (length fragment-length)
  (masked fragment-masked? set-fragment-masked!)
  (masking-key fragment-masking-key)
  (fin fragment-last?)
  (optype fragment-optype))

(define (hex-string->string hexstr)
  ;; convert a string like "a745ff12" to a string
  (let ((result (make-string (/ (string-length hexstr) 2))))
    (let loop ((hexs (string->list hexstr))
               (i 0))
      (if (< (length hexs) 2)
          result
          (let ((ascii (string->number (string (car hexs) (cadr hexs)) 16)))
            (string-set! result i (integer->char ascii))
            (loop (cddr hexs)
                  (+ i 1)))))))


(define (send-frame ws optype data last-frame)
  ; TODO this sucks
  (when (u8vector? data) (set! data (blob->string (u8vector->blob/shared data))))
  (let* ((len (if (string? data) (string-length data) (u8vector-length data)))
         (frame-fin (if last-frame 1 0))
         (frame-rsv1 0)
         (frame-rsv2 0)
         (frame-rsv3 0)
         (frame-opcode (optype->opcode optype))
         (octet0 (bitwise-ior (arithmetic-shift frame-fin 7)
                              (arithmetic-shift frame-rsv1 6)
                              (arithmetic-shift frame-rsv2 5)
                              (arithmetic-shift frame-rsv3 4)
                              frame-opcode))

         (frame-masked 0)
         (frame-payload-length (cond ((< len 126) len)
                                     ((< len 65536) 126)
                                     (else 127)))
         (octet1 (bitwise-ior (arithmetic-shift frame-masked 7)
                              frame-payload-length))
         (outbound-port (websocket-outbound-port ws)))

    (write-u8vector (u8vector octet0 octet1) outbound-port)

    (write-u8vector
     (cond
      ((= frame-payload-length 126)
       (u8vector
        (arithmetic-shift (bitwise-and len 65280) -8)
        (bitwise-and len 255)))
      ((= frame-payload-length 127)
       (u8vector
        0 0 0 0
        (arithmetic-shift
         (bitwise-and len 4278190080) -24)
        (arithmetic-shift
         (bitwise-and len 16711680) -16)
        (arithmetic-shift
         (bitwise-and len 65280) -8)
        (bitwise-and len 255)))
      (else (u8vector)))
     outbound-port)

    (write-string data len outbound-port)
   (flush-output (response-port (current-response)))
    #t))

(define (send-message data #!optional (optype 'text) (ws (current-websocket)))
  ;; TODO break up large data into multiple frames?
  (optype->opcode optype) ; triggers error if invalid
  (dynamic-wind
      (lambda () (mutex-lock! (websocket-send-mutex ws)))
      (lambda () (send-frame ws optype data #t))
      (lambda () (mutex-unlock! (websocket-send-mutex ws)))))

(define (websocket-unmask-frame-payload payload len frame-masking-key)
  (define tmaskkey (make-u8vector 4 #f #t #t))
  (u8vector-set! tmaskkey 0 (vector-ref frame-masking-key 0))
  (u8vector-set! tmaskkey 1 (vector-ref frame-masking-key 1))
  (u8vector-set! tmaskkey 2 (vector-ref frame-masking-key 2))
  (u8vector-set! tmaskkey 3 (vector-ref frame-masking-key 3))
  (define-external wsmaskkey blob (u8vector->blob/shared tmaskkey))

  (define-external wslen int len)

  ; TODO handle -1

  (define-external wsv scheme-pointer payload)
  ((foreign-lambda* void ()
"
    const unsigned char* maskkey2 = wsmaskkey;
    const unsigned int kd = *(unsigned int*)maskkey2;
    const unsigned char* __restrict kb = maskkey2;


    int i;
    for (i = wslen >> 2; i != 0; --i)
    {
        *((unsigned int*)wsv) ^= kd;
        wsv += 4;
    }

    const int rem = wslen & 3;
    for (i = 0; i < rem; ++i)
    {
        *((unsigned int*)wsv++) ^= kb[i];
    }
"
))
  payload)

(define (unmask fragment)
  (if (fragment-masked? fragment)
      (let ((r (websocket-unmask-frame-payload
                (fragment-payload fragment)
                (fragment-length fragment)
                (fragment-masking-key fragment))))
             (set-fragment-masked! fragment #f)
             r)
      (fragment-payload fragment)))

(define (read-frame-payload inbound-port frame-payload-length)
  (let ((masked-data (make-string frame-payload-length)))
    (read-string! frame-payload-length masked-data inbound-port)
    masked-data)
;;   (let* ((masked-data (make-string frame-payload-length)))
;;     (read-string! frame-payload-length masked-data inbound-port)

;;     (define tmaskkey (make-u8vector 4 #f #t #t))
;;     (u8vector-set! tmaskkey 0 (vector-ref frame-masking-key 0))
;;     (u8vector-set! tmaskkey 1 (vector-ref frame-masking-key 1))
;;     (u8vector-set! tmaskkey 2 (vector-ref frame-masking-key 2))
;;     (u8vector-set! tmaskkey 3 (vector-ref frame-masking-key 3))
;;     (define-external wsmaskkey blob (u8vector->blob/shared tmaskkey))

;;     (define-external wslen int frame-payload-length)

;;     (define-external wsv scheme-pointer masked-data)

;;     (if frame-masked
;;         (begin
;;           ((foreign-lambda* void ()
;; "
;;     const unsigned char* maskkey2 = wsmaskkey;
;;     const unsigned int kd = *(unsigned int*)maskkey2;
;;     const unsigned char* __restrict kb = maskkey2;


;;     for (int i = wslen >> 2; i != 0; --i)
;;     {
;;         *((unsigned int*)wsv) ^= kd;
;;         wsv += 4;
;;     }

;;     const int rem = wslen & 3;
;;     for (int i = 0; i < rem; ++i)
;;     {
;;         *((unsigned int*)wsv++) ^= kb[i];
;;     }
;; "
;;           ))
;;           masked-data)
;;     masked-data))
  )

(define (read-frame total-size ws)
  (let* ((inbound-port (websocket-inbound-port ws))
         (b0 (read-byte inbound-port)))
    ; we don't support reserved bits yet
    (when (or (> (bitwise-and b0 64) 0)
              (> (bitwise-and b0 32) 0)
              (> (bitwise-and b0 16) 0))
          (signal (make-websocket-exception
                   (make-property-condition 'reserved-bits-not-supported)
                   (make-property-condition 'protocol-error))))
    (cond
     ((eof-object? b0) b0)
     (else
      (let* ((frame-fin (> (bitwise-and b0 128) 0))
             (frame-opcode (bitwise-and b0 15))
             (frame-optype (opcode->optype frame-opcode))
             ;; second byte
             (b1 (read-byte inbound-port))
             ; TODO die on unmasked frame?
             (frame-masked (> (bitwise-and b1 128) 0))
             (frame-payload-length (bitwise-and b1 127)))
        (cond ((= frame-payload-length 126)
               (let ((bl0 (read-byte inbound-port))
                     (bl1 (read-byte inbound-port)))
                 (set! frame-payload-length (+ (arithmetic-shift bl0 8) bl1))))
              ((= frame-payload-length 127)
               (define (shift i r)
                 (if (< i 0)
                     r
                     (shift (- i 1) (+ (arithmetic-shift (read-byte inbound-port) (* 8 i))
                                       r))))
               (set! frame-payload-length (shift 7 0))))
        (when (or (> frame-payload-length (max-frame-size))
                  (> (+ frame-payload-length total-size) (max-message-size)))
              (signal (make-websocket-exception
                       (make-property-condition 'message-too-large))))
        (let* ((frame-masking-key
                (if frame-masked
                    (let* ((fm0 (read-byte inbound-port))
                           (fm1 (read-byte inbound-port))
                           (fm2 (read-byte inbound-port))
                           (fm3 (read-byte inbound-port)))
                      (vector fm0 fm1 fm2 fm3))
                    #f)))
          (cond
           ((or (eq? frame-optype 'text) (eq? frame-optype 'binary)
                (eq? frame-optype 'continuation) (eq? frame-optype 'ping)
                (eq? frame-optype 'pong))
            (make-fragment
             (read-frame-payload inbound-port frame-payload-length)
             frame-payload-length frame-masked
             frame-masking-key frame-fin frame-optype))
           ((eq? frame-optype 'connection-close) ; TODO, same as above?
            (make-fragment
             (read-frame-payload inbound-port frame-payload-length)
             frame-payload-length frame-masked frame-masking-key
             frame-fin frame-optype))
           (else
            (signal (make-websocket-exception
                     (make-property-condition 'unhandled-optype
                                              'optype frame-optype)))))))))))

(include "utf8-grammar.scm")

(define (valid-utf8? s)
  (or (let ((len (string-length s)))
         ; Try to validate as an ascii string first. Its essentially
         ; free, doesn't generate garbage and is many, many times
         ; faster than the general purpose validator.
         (define-external ws_utlen int len)
         (define-external ws_uts scheme-pointer s)
         (= 1
            ((foreign-lambda* int ()
"
    if (ws_utlen > UINT_MAX) { return -1; }

    int i;
    for (i = ws_utlen; i != 0; --i)
    {
        if (*((unsigned char*)ws_uts++) > 127)
        {
            C_return(0);
        }
    }

    C_return(1);
"))))
      (parse utf8-string (->parser-input s))))

(define (close-code->integer s)
  (if (string-null? s)
      1000
      (+ (arithmetic-shift (char->integer (string-ref s 0)) 8)
         (char->integer (string-ref s 1)))))

(define (close-code-string->close-reason s)
  (let ((c (close-code->integer s)))
    (case c
      ((1000) 'normal)
      ((1001) 'going-away)
      ((1002) 'protocol-error)
      ((1003) 'unknown-data-type)
      ((1007) 'invalid-data)
      ((1008) 'violated-policy)
      ((1009) 'message-too-large)
      ((1010) 'extension-negotiation-failed)
      ((1011) 'unexpected-error)
      (else
       (if (and (>= c 3000) (< c 5000))
           'unknown
           'invalid-close-code)))))

(define (valid-close-code? s)
  (neq? 'invalid-close-code (close-code-string->close-reason s)))

(define (receive-fragments #!optional (ws (current-websocket)))
  (dynamic-wind
      (lambda () (mutex-lock! (websocket-read-mutex ws)))
      (lambda ()
        (if (or (eq? (websocket-state ws) 'closing)
                (eq? (websocket-state ws) 'closed)
                (eq? (websocket-state ws) 'error))
            (values #!eof #!eof)
            (let loop ((fragments '())
                       (first #t)
                       (type 'text)
                       (total-size 0))
              (let* ((fragment (read-frame total-size ws))
                     (optype (fragment-optype fragment))
                     (len (fragment-length fragment))
                     (last-frame (fragment-last? fragment)))
                (set-websocket-last-message-timestamp! ws (current-time))
                (cond
                 ((and (control-frame? optype) (> len 125))
                  (set-websocket-state! ws 'error)
                  (signal (make-protocol-violation-exception
                           "control frame bodies must be less than 126 octets")))

                 ; connection close
                 ((and (eq? optype 'connection-close) (= len 1))
                  (set-websocket-state! ws 'error)
                  (signal (make-protocol-violation-exception
                           "close frames must not have a length of 1")))
                 ((and (eq? optype 'connection-close)
                       (not (valid-close-code? (unmask fragment))))
                  (set-websocket-state! ws 'error)
                  (signal (make-protocol-violation-exception
                           (string-append
                            "invalid close code "
                            (number->string (close-code->integer (unmask fragment)))))))
                 ((eq? optype 'connection-close)
                  (set-websocket-state! ws 'closing)
                  (values `(,fragment) optype))

                 ; immediate response
                 ((and (eq? optype 'ping) last-frame (<= len 125))
                  (unless (drop-incoming-pings)
                          (send-message (unmask fragment) 'pong))
                  (loop fragments first type total-size))

                 ; protocol violation checks
                 ((or (and first (eq? optype 'continuation))
                      (and (not first) (neq? optype 'continuation)))
                  (set-websocket-state! ws 'error)
                  (signal (make-protocol-violation-exception
                           "continuation frame out-of-order")))
                 ((and (not last-frame) (control-frame? optype))
                  (set-websocket-state! ws 'error)
                  (signal (make-protocol-violation-exception
                           "control frames can't be fragmented")))

                 ((eq? optype 'pong)
                  (loop fragments first type total-size))

                 (else
                  (if last-frame
                      (values (cons fragment fragments) (if (null? fragments) optype type))
                      (loop (cons fragment fragments) #f
                            (if first optype type)
                            (+ total-size len)))))))))
      (lambda () (mutex-unlock! (websocket-read-mutex ws)))))

(define (process-fragments fragments optype #!optional (ws (current-websocket)))
  (let ((message-body (string-concatenate/shared
                       (reverse (map unmask fragments)))))
    (when (and (or (eq? optype 'text) (eq? optype 'connection-close))
               (not (valid-utf8?
                     (if (eq? optype 'text)
                         message-body
                         (if (> (string-length message-body) 2)
                             (substring message-body 2)
                             "")))))
          (set-websocket-state! ws 'error)
          (signal (make-websocket-exception
                   (make-property-condition
                    'invalid-data 'msg "invalid UTF-8"))))
    (values message-body optype)))

(define (receive-message #!optional (ws (current-websocket)))
  (if (websocket-concurrent? ws)
      (let ((msg (mailbox-receive! (websocket-read-mailbox ws))))
        (values (car msg) (cdr msg)))
      (receive (fragments optype) (receive-fragments ws)
               (if (eof-object? fragments)
                   (values #!eof optype)
                   (process-fragments fragments optype)))))

; TODO does #!optional and #!key work together?
(define (close-websocket #!optional (ws (current-websocket))
                         #!key (close-reason 'normal) (data (make-u8vector 0)))
  (define invalid-close-reason #f)
  (define (close-reason->close-code reason)
    (case reason
      ('normal 1000)
      ('going-away 1001)
      ('protocol-error 1002)
      ('unknown-data-type 1003)
      ('invalid-data 1007)
      ('violated-policy 1008)
      ('message-too-large 1009)
      ('unexpected-error 1011)
      (else (set! invalid-close-reason reason)
            (close-reason->close-code 'unexpected-error))))

  ; Use thread timeout to handle the close-timeout
  (let ((close-thread
         (make-thread
          (lambda ()
            (if (eq? (websocket-state ws) 'open)
                (begin
                  (set-websocket-state! ws 'closed)
                  (send-frame ws 'connection-close
                                        (u8vector 3 (close-reason->close-code close-reason))
                                        #t)
                  (let loop ()
                    (receive (data type) (receive-message ws)
                             (if (eq? type 'connection-close)
                                 (unless (valid-utf8? data)
                                         (set! close-reason 'invalid-data))
                                 (loop)))))
                (begin
                  (send-frame ws 'connection-close
                                        (u8vector 3 (close-reason->close-code close-reason))
                                        #t))))
          "close timeout thread")))
    (thread-start! close-thread)
    (if (> (close-timeout) 0)
        (unless (thread-join! close-thread (close-timeout) #f)
                ; TODO actually signal error?
                ;; (thread-signal! (websocket-user-thread (current-websocket))
                ;;                 (make-websocket-exception
                ;;                  (make-property-condition 'close-timeout)))
                )
        (thread-join! close-thread))))


(define (sha1-sum in-bv)
  (hex-string->string (string->sha1sum in-bv)))

(define (websocket-compute-handshake client-key)
  (let* ((key-and-magic
          (string-append client-key "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"))
         (key-and-magic-sha1 (sha1-sum key-and-magic)))
    (base64-encode key-and-magic-sha1)))

(define (sec-websocket-accept-unparser header-contents)
  (map (lambda (header-content)
         (car (vector-ref header-content 0)))
       header-contents))

(header-unparsers
 (alist-update! 'sec-websocket-accept
                sec-websocket-accept-unparser
                (header-unparsers)))

(define (websocket-accept #!optional (concurrent #f))
  (let* ((user-thread (current-thread))
         (headers (request-headers (current-request)))
         (client-key (header-value 'sec-websocket-key headers))
         (ws-handshake (websocket-compute-handshake client-key))
         (ws (make-websocket
              (request-port (current-request))
              (response-port (current-response))
              user-thread
              (make-mutex "send")
              (make-mutex "read")
              (current-time)
              'open               ; websocket state
              (make-mailbox "send")
              (make-mailbox "read")
              concurrent))
         (ping-thread
          (make-thread
           (lambda ()
             (let loop ()
               (thread-sleep! (ping-interval))
               (when (eq? (websocket-state ws) 'open)
                     (send-message "" 'ping ws)
                     (loop))))
           "ping thread")))

    ; make sure the request meets the spec for websockets
    (cond ((not (and (member 'upgrade (header-values 'connection headers))
                     (string-ci= (car (header-value 'upgrade headers '(""))) "websocket")))
           (signal (make-websocket-exception
                    (make-property-condition 'missing-upgrade-header))))
          ((not (string= (header-value 'sec-websocket-version headers "") "13"))
           (with-headers ; TODO test
            `((sec-websocket-version "13"))
            (lambda () (send-status 'upgrade-required))))
          ((not ((accept-connection) (header-value 'origin headers "")))
           ((access-denied))))

    (with-headers
     `((upgrade ("WebSocket" . #f))
       (connection (upgrade . #t))
       (sec-websocket-accept (,ws-handshake . #t)))
     (lambda ()
       (send-response status: 'switching-protocols)))
    (flush-output (response-port (current-response)))

    ; connection timeout thread
    (when (> (connection-timeout) 0)
          (thread-start!
           (lambda ()
             (let loop ()
               (let ((t (websocket-last-message-timestamp ws)))
                  ; Add one to attempt to alleviate checking the timestamp
                  ; right before when the timeout should happen.
                 (thread-sleep! (+ 1 (connection-timeout)))
                 (when (eq? (websocket-state ws) 'open)
                       (if (< (- (time->seconds (current-time))
                                 (time->seconds (websocket-last-message-timestamp ws)))
                              (connection-timeout))
                           (loop)
                           (begin (thread-signal!
                                   (websocket-user-thread ws)
                                   (make-websocket-exception
                                    (make-property-condition 'connection-timeout)))
                                  (close-websocket ws close-reason: 'going-away)))))))))

    (when (> (ping-interval) 0)
          (thread-start! ping-thread))

    ws))

(define (with-websocket proc #!optional (concurrent #f))
  (define (handle-error close-reason exn)
    (set-websocket-state! (current-websocket) 'closing)
    (close-websocket (current-websocket) close-reason: close-reason)
    (unless (port-closed? (request-port (current-request)))
            (close-input-port (request-port (current-request))))
    (unless (port-closed? (response-port (current-response)))
            (close-output-port (response-port (current-response))))
    (when (propagate-common-errors)
          (signal exn)))
  (parameterize
   ((current-websocket (websocket-accept concurrent)))
   (condition-case
    (begin (proc)
           (close-websocket)
           (close-input-port (request-port (current-request)))
           (close-output-port (response-port (current-response))))
    (exn (websocket protocol-error) (handle-error 'protocol-error exn))
    (exn (websocket invalid-data) (handle-error 'invalid-data exn))
    (exn (websocket connection-timeout) (handle-error 'going-away exn))
    (exn (websocket message-too-large) (handle-error 'message-too-large exn))
    (exn () (handle-error 'unexpected-error exn)))))

(define (with-concurrent-websocket proc)
  (let ((parent-thread (current-thread)))
    (with-websocket
     (lambda ()
       (thread-start!
        (lambda ()
          (handle-exceptions
           exn
           (thread-signal! parent-thread exn)
           (let loop ()
             (receive (fragments optype) (receive-fragments)
                      (unless (eof-object? fragments)
                              (thread-start!
                               (lambda ()
                                 (handle-exceptions
                                  exn
                                  (thread-signal! parent-thread exn)
                                  (mailbox-send!
                                   (websocket-read-mailbox (current-websocket))
                                   (receive (msg-body optype)
                                            (process-fragments fragments optype)
                                            `(,msg-body . ,optype))))))
                              (loop)))))))
       (proc))
     #t)))

(define (upgrade-to-websocket #!optional (concurrent #f))
  (websocket-accept concurrent))

)