import time
import json
import shlex
import subprocess
import urllib.request
from unittest import mock
from sanic.worker import GunicornWorker
from sanic.app import Sanic
import asyncio
import logging
import pytest


@pytest.fixture(scope='module')
def gunicorn_worker():
    command = 'gunicorn --bind 127.0.0.1:1337 --worker-class sanic.worker.GunicornWorker examples.simple_server:app'
    worker = subprocess.Popen(shlex.split(command))
    time.sleep(3)
    yield
    worker.kill()


def test_gunicorn_worker(gunicorn_worker):
    with urllib.request.urlopen('http://localhost:1337/') as f:
        res = json.loads(f.read(100).decode())
    assert res['test']


class GunicornTestWorker(GunicornWorker):

    def __init__(self):
        self.app = mock.Mock()
        self.app.callable = Sanic("test_gunicorn_worker")
        self.servers = {}
        self.exit_code = 0
        self.cfg = mock.Mock()
        self.notify = mock.Mock()


@pytest.fixture
def worker():
    return GunicornTestWorker()


def test_worker_init_process(worker):
    with mock.patch('sanic.worker.asyncio') as mock_asyncio:
        try:
            worker.init_process()
        except TypeError:
            pass

        assert mock_asyncio.get_event_loop.return_value.close.called
        assert mock_asyncio.new_event_loop.called
        assert mock_asyncio.set_event_loop.called


def test_worker_init_signals(worker):
    worker.loop = mock.Mock()
    worker.init_signals()
    assert worker.loop.add_signal_handler.called


def test_handle_abort(worker):
    with mock.patch('sanic.worker.sys') as mock_sys:
        worker.handle_abort(object(), object())
        assert not worker.alive
        assert worker.exit_code == 1
        mock_sys.exit.assert_called_with(1)


def test_handle_quit(worker):
    worker.handle_quit(object(), object())
    assert not worker.alive
    assert worker.exit_code == 0


def test_run_max_requests_exceeded(worker):
    loop = asyncio.new_event_loop()
    worker.ppid = 1
    worker.alive = True
    sock = mock.Mock()
    sock.cfg_addr = ('localhost', 8080)
    worker.sockets = [sock]
    worker.wsgi = mock.Mock()
    worker.connections = set()
    worker.log = mock.Mock()
    worker.loop = loop
    worker.servers = {
        "server1": {"requests_count": 14},
        "server2": {"requests_count": 15},
    }
    worker.max_requests = 10
    worker._run = mock.Mock(wraps=asyncio.coroutine(lambda *a, **kw: None))

    # exceeding request count
    _runner = asyncio.ensure_future(worker._check_alive(), loop=loop)
    loop.run_until_complete(_runner)

    assert worker.alive == False
    worker.notify.assert_called_with()
    worker.log.info.assert_called_with("Max requests exceeded, shutting down: %s",
                                       worker)

def test_worker_close(worker):
    loop = asyncio.new_event_loop()
    asyncio.sleep = mock.Mock(wraps=asyncio.coroutine(lambda *a, **kw: None))
    worker.ppid = 1
    worker.pid = 2
    worker.cfg.graceful_timeout = 1.0
    worker.signal = mock.Mock()
    worker.signal.stopped = False
    worker.wsgi = mock.Mock()
    conn = mock.Mock()
    conn.websocket = mock.Mock()
    conn.websocket.close_connection = mock.Mock(
            wraps=asyncio.coroutine(lambda *a, **kw: None)
        )
    worker.connections = set([conn])
    worker.log = mock.Mock()
    worker.loop = loop
    server = mock.Mock()
    server.close = mock.Mock(wraps=lambda *a, **kw: None)
    server.wait_closed = mock.Mock(wraps=asyncio.coroutine(lambda *a, **kw: None))
    worker.servers = {
        server: {"requests_count": 14},
    }
    worker.max_requests = 10

    # close worker
    _close = asyncio.ensure_future(worker.close(), loop=loop)
    loop.run_until_complete(_close)

    assert worker.signal.stopped == True
    conn.websocket.close_connection.assert_called_with(after_handshake=False)
    assert len(worker.servers) == 0