diff --git a/.gitignore b/.gitignore index a928f06..49269b2 100644 --- a/.gitignore +++ b/.gitignore @@ -13,3 +13,5 @@ tmp dist stackimpact.egg-info __pycache__ +venv* +env/ diff --git a/README.md b/README.md index f94f790..6a519af 100644 --- a/README.md +++ b/README.md @@ -1,23 +1,27 @@ -# StackImpact Python Agent +# StackImpact Python Profiler ## Overview -StackImpact is a performance profiler for production applications. It gives developers continuous and historical view of application performance with line-of-code precision, which includes CPU, memory allocation and blocking call hot spots as well as execution bottlenecks, errors and runtime metrics. Learn more at [stackimpact.com](https://stackimpact.com/). +StackImpact is a production-grade performance profiler built for both production and development environments. It gives developers continuous and historical code-level view of application performance that is essential for locating CPU, memory allocation and I/O hot spots as well as latency bottlenecks. Included runtime metrics and error monitoring complement profiles for extensive performance analysis. Learn more at [stackimpact.com](https://stackimpact.com/). -![dashboard](https://stackimpact.com/wp-content/uploads/2017/06/hotspots-cpu-1.4-python.png) +![dashboard](https://stackimpact.com/img/readme/hotspots-cpu-1.4-python.png) #### Features -* Automatic hot spot profiling for CPU, memory allocations, blocking calls -* Automatic bottleneck tracing for HTTP handlers and other libraries -* Exception monitoring -* Health monitoring including CPU, memory, garbage collection and other runtime metrics -* Anomaly alerts on most important metrics -* Multiple account users for team collaboration +* Continuous hot spot profiling of CPU usage, memory allocation and blocking calls. +* Error and exception monitoring. +* Health monitoring including CPU, memory, garbage collection and other runtime metrics. +* Alerts on profile anomalies. +* Team access. 
Learn more on the [features](https://stackimpact.com/features/) page (with screenshots). +#### How it works + +The StackImpact profiler agent is imported into a program and used as a normal package. When the program runs, various sampling profilers are started and stopped automatically by the agent and/or programmatically using the agent methods. The agent periodically reports recorded profiles and metrics to the StackImpact Dashboard. The agent can also operate in manual mode, which should be used in development only. + + #### Documentation See full [documentation](https://stackimpact.com/docs/) for reference. @@ -28,8 +32,9 @@ See full [documentation](https://stackimpact.com/docs/) for reference. * Linux, OS X or Windows. Python version 2.7, 3.4 or higher. * Memory allocation profiler and some GC metrics are only available for Python 3. -* CPU and Time profilers only support Linux and OS X. +* Profilers only support Linux and OS X. * Time (blocking call) profiler supports threads and gevent. +* On unix systems the profilers use the following signals: SIGPROF, SIGALRM, SIGUSR2. Only SIGUSR2 is handled transparently, i.e. it should not conflict with previously registered handlers. ## Getting started @@ -37,7 +42,7 @@ See full [documentation](https://stackimpact.com/docs/) for reference. #### Create StackImpact account -Sign up for a free account at [stackimpact.com](https://stackimpact.com/). +Sign up for a free trial account at [stackimpact.com](https://stackimpact.com) (also with GitHub login). #### Installing the agent @@ -67,14 +72,75 @@ agent = stackimpact.start( Add the agent initialization to the worker code, e.g. wsgi.py, if applicable. -Other initialization options: +All initialization options: +* `agent_key` (Required) The access key for communication with the StackImpact servers. +* `app_name` (Required) A name to identify and group application data. Typically, a single codebase, deployable unit or executable module corresponds to one application.
* `app_version` (Optional) Sets application version, which can be used to associate profiling information with the source code release. * `app_environment` (Optional) Used to differentiate applications in different environments. * `host_name` (Optional) By default, host name will be the OS hostname. +* `auto_profiling` (Optional) If set to `False`, disables automatic profiling and reporting. Focused or manual profiling should be used instead. Useful for environments without support for timers or background tasks. * `debug` (Optional) Enables debug logging. +* `cpu_profiler_disabled`, `allocation_profiler_disabled`, `block_profiler_disabled`, `error_profiler_disabled` (Optional) Disables respective profiler when `True`. +* `include_agent_frames` (Optional) Set to `True` to not exclude agent stack frames from profile call graphs. +* `auto_destroy` (Optional) Set to `False` to disable agent's exit handlers. If necessary, call `destroy()` to gracefully shutdown the agent. + + +#### Focused profiling + +Use `agent.profile(name)` to instruct the agent when to start and stop profiling. The agent decides if and which profiler is activated. Normally, this method should be used in repeating code, such as request or event handlers. In addition to more precise profiling, timing information will also be reported for the profiled spans. Usage example: + +```python +span = agent.profile('span1'); + +# your code here + +span.stop(); +``` + +Alternatively, a `with` statement can be used: + +```python +with agent.profile('span1'): + # your code here +``` + + +#### Manual profiling + +*Manual profiling should not be used in production!* +By default, the agent starts and stops profiling automatically. Manual profiling allows to start and stop profilers directly. It is suitable for profiling short-lived programs and should not be used for long-running production applications. Automatic profiling should be disabled with `auto_profiling: False`. +```python +# Start CPU profiler.
+agent.start_cpu_profiler(); +``` + +```python +# Stop CPU profiler and report the recorded profile to the Dashboard. +agent.stop_cpu_profiler(); +``` + +```python +# Start blocking call profiler. +agent.start_block_profiler(); +``` + +```python +# Stop blocking call profiler and report the recorded profile to the Dashboard. +agent.stop_block_profiler(); +``` + +```python +# Start heap allocation profiler. +agent.start_allocation_profiler(); +``` + +```python +# Stop heap allocation profiler and report the recorded profile to the Dashboard. +agent.stop_allocation_profiler(); +``` #### Analyzing performance data in the Dashboard diff --git a/README.rst b/README.rst index b3bb759..a98a63b 100644 --- a/README.rst +++ b/README.rst @@ -1,17 +1,18 @@ -StackImpact Python Agent -======================== +StackImpact Python Profiler +=========================== Overview -------- -StackImpact is a performance profiler for production applications. It -gives developers continuous and historical view of application -performance with line-of-code precision, which includes CPU, memory -allocation and blocking call hot spots as well as execution bottlenecks, -errors and runtime metrics. Learn more at +StackImpact is a production-grade performance profiler built for both +production and development environments. It gives developers continuous +and historical code-level view of application performance that is +essential for locating CPU, memory allocation and I/O hot spots as well +as latency bottlenecks. Included runtime metrics and error monitoring +complement profiles for extensive performance analysis. Learn more at `stackimpact.com `__. -.. figure:: https://stackimpact.com/wp-content/uploads/2017/06/hotspots-cpu-1.4-python.png +.. figure:: https://stackimpact.com/img/readme/hotspots-cpu-1.4-python.png :alt: dashboard dashboard @@ -19,32 +20,44 @@ errors and runtime metrics. 
Learn more at Features ^^^^^^^^ -- Automatic hot spot profiling for CPU, memory allocations, blocking - calls -- Automatic bottleneck tracing for HTTP handlers and other libraries -- Exception monitoring +- Continuous hot spot profiling of CPU usage, memory allocation and + blocking calls. +- Error and exception monitoring. - Health monitoring including CPU, memory, garbage collection and other - runtime metrics -- Anomaly alerts on most important metrics -- Multiple account users for team collaboration + runtime metrics. +- Alerts on profile anomalies. +- Team access. Learn more on the `features `__ page (with screenshots). +How it works +^^^^^^^^^^^^ + +The StackImpact profiler agent is imported into a program and used as a +normal package. When the program runs, various sampling profilers are +started and stopped automatically by the agent and/or programmatically +using the agent methods. The agent periodically reports recorded +profiles and metrics to the StackImpact Dashboard. The agent can also +operate in manual mode, which should be used in development only. + Documentation ^^^^^^^^^^^^^ See full `documentation `__ for reference. -Requirements ------------- +Supported environment +--------------------- - Linux, OS X or Windows. Python version 2.7, 3.4 or higher. -- Memorly allocation profiler and some GC metrics are only available - for Python 3. -- CPU and Time profilers only supports Linux and OS X. +- Memory allocation profiler and some GC metrics are only available for + Python 3. +- Profilers only support Linux and OS X. - Time (blocking call) profiler supports threads and gevent. +- On unix systems the profilers use the following signals: SIGPROF, + SIGALRM, SIGUSR2. Only SIGUSR2 is handled transparently, i.e. it + should not conflict with previously registered handlers. Getting started --------------- @@ -52,13 +65,13 @@ Getting started Create StackImpact account ^^^^^^^^^^^^^^^^^^^^^^^^^^ -Sign up for a free account at -`stackimpact.com `__.
+Sign up for a free trial account at +`stackimpact.com `__ (also with GitHub login). Installing the agent ^^^^^^^^^^^^^^^^^^^^ -Install the Go agent by running +Install the Python agent by running :: @@ -81,24 +94,109 @@ Configuration section. agent = stackimpact.start( agent_key = 'agent key here', - app_name = 'MyPythonApp', + app_name = 'MyPythonApp') -Other initialization options: +Add the agent initialization to the worker code, e.g. wsgi.py, if +applicable. +All initialization options: + +- ``agent_key`` (Required) The access key for communication with the + StackImpact servers. +- ``app_name`` (Required) A name to identify and group application + data. Typically, a single codebase, deployable unit or executable + module corresponds to one application. - ``app_version`` (Optional) Sets application version, which can be used to associate profiling information with the source code release. - ``app_environment`` (Optional) Used to differentiate applications in different environments. - ``host_name`` (Optional) By default, host name will be the OS hostname. +- ``auto_profiling`` (Optional) If set to ``False``, disables automatic + profiling and reporting. Focused or manual profiling should be used + instead. Useful for environments without support for timers or + background tasks. - ``debug`` (Optional) Enables debug logging. +- ``cpu_profiler_disabled``, ``allocation_profiler_disabled``, + ``block_profiler_disabled``, ``error_profiler_disabled`` (Optional) + Disables respective profiler when ``True``. +- ``include_agent_frames`` (Optional) Set to ``True`` to not exclude + agent stack frames from profile call graphs. +- ``auto_destroy`` (Optional) Set to ``False`` to disable agent's exit + handlers. If necessary, call ``destroy()`` to gracefully shutdown the + agent. + +Focused profiling +^^^^^^^^^^^^^^^^^ + +Use ``agent.profile(name)`` to instruct the agent when to start and stop +profiling. The agent decides if and which profiler is activated. 
+Normally, this method should be used in repeating code, such as request +or event handlers. In addition to more precise profiling, timing +information will also be reported for the profiled spans. Usage example: + +.. code:: python + + span = agent.profile('span1'); + + # your code here + + span.stop(); + +Alternatively, a ``with`` statement can be used: + +.. code:: python + + with agent.profile('span1'): + # your code here + +Manual profiling +^^^^^^^^^^^^^^^^ + +*Manual profiling should not be used in production!* + +By default, the agent starts and stops profiling automatically. Manual +profiling allows to start and stop profilers directly. It is suitable +for profiling short-lived programs and should not be used for +long-running production applications. Automatic profiling should be +disabled with ``auto_profiling: False``. + +.. code:: python + + # Start CPU profiler. + agent.start_cpu_profiler(); + +.. code:: python + + # Stop CPU profiler and report the recorded profile to the Dashboard. + agent.stop_cpu_profiler(); + +.. code:: python + + # Start blocking call profiler. + agent.start_block_profiler(); + +.. code:: python + + # Stop blocking call profiler and report the recorded profile to the Dashboard. + agent.stop_block_profiler(); + +.. code:: python + + # Start heap allocation profiler. + agent.start_allocation_profiler(); + +.. code:: python + + # Stop heap allocation profiler and report the recorded profile to the Dashboard. + agent.stop_allocation_profiler(); Analyzing performance data in the Dashboard ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ -Once your application is restarted, you can start observing regular and -anomaly-triggered CPU, memory, I/O, and other hot spot profiles, -execution bottlenecks as well as process metrics in the +Once your application is restarted, you can start observing continuous +CPU, memory, I/O, and other hot spot profiles, execution bottlenecks as +well as process metrics in the `Dashboard `__.
Troubleshooting diff --git a/examples/aws-lambda/aws_lambda.py b/examples/aws-lambda/aws_lambda.py new file mode 100644 index 0000000..4fda51b --- /dev/null +++ b/examples/aws-lambda/aws_lambda.py @@ -0,0 +1,40 @@ +from __future__ import print_function +import stackimpact +import random +import threading +import time +import signal + +agent = stackimpact.start( + agent_key = 'agent key here', + app_name = 'LambdaDemoPython', + app_environment = 'prod', + block_profiler_disabled = True) + + +def simulate_cpu_work(): + for j in range(0, 100000): + random.randint(1, 1000000) + +mem = [] +def simulate_mem_leak(): + for i in range(0, 1000): + obj = {'v': random.randint(0, 1000000)} + mem.append(obj) + +def handler(event, context): + span = agent.profile() + + simulate_cpu_work() + simulate_mem_leak() + + span.stop() + + response = { + "statusCode": 200, + "body": 'Done' + } + + return response + + diff --git a/examples/flask_app.py b/examples/demo/flask_app.py similarity index 91% rename from examples/flask_app.py rename to examples/demo/flask_app.py index 48ef704..30030a4 100644 --- a/examples/flask_app.py +++ b/examples/demo/flask_app.py @@ -1,6 +1,4 @@ -# export AGENT_KEY=agent_key_here -# gunicorn --workers 2 --bind 127.0.0.1:5010 flask_app:app -# gunicorn --workers 2 -k gevent --bind 127.0.0.1:5010 flask_app:app +#env AGENT_KEY=agnetkeyhere FLASK_APP=examples/demo/flask_app.py flask run -p 5010 from __future__ import print_function @@ -22,7 +20,7 @@ # python 3 from urllib.request import urlopen - +sys.path.append(".") import stackimpact @@ -30,6 +28,7 @@ # StackImpact agent initialization agent = stackimpact.start( agent_key = os.environ['AGENT_KEY'], + dashboard_address = os.environ['DASHBOARD_ADDRESS'], app_name = 'ExamplePythonFlaskApp', app_version = '1.0.0', debug = True) @@ -39,11 +38,11 @@ # Simulate CPU intensive work def simulate_cpu(): duration = 10 * 60 * 60 - usage = 20 + usage = 10 while True: for j in range(0, duration): - for i in range(0, usage * 
20000): + for i in range(0, usage * 15000): text = "text1" + str(i) text = text + "text2" diff --git a/examples/focused/app.py b/examples/focused/app.py new file mode 100644 index 0000000..b8566e9 --- /dev/null +++ b/examples/focused/app.py @@ -0,0 +1,38 @@ +from __future__ import print_function +import random +import time +import sys +import threading +sys.path.append(".") +import stackimpact + +agent = stackimpact.start( + agent_key = 'agent key here', + app_name = 'MyPythonApp') + + +def simulate_cpu_work(): + for j in range(0, 100000): + random.randint(1, 1000000) + + +def handle_some_event(): + span = agent.profile('some event') + + simulate_cpu_work() + + span.stop() + + response = { + "statusCode": 200, + "body": 'Done' + } + + return response + + +# Simulate events +while True: + handle_some_event() + time.sleep(2) + diff --git a/examples/manual/app.py b/examples/manual/app.py new file mode 100644 index 0000000..af34096 --- /dev/null +++ b/examples/manual/app.py @@ -0,0 +1,45 @@ +from __future__ import print_function +import random +import time +import sys +import threading +sys.path.append(".") +import stackimpact + +agent = stackimpact.start( + agent_key = 'agent key here', + app_name = 'MyPythonApp', + auto_profiling = False) + + +agent.start_cpu_profiler() + +for j in range(0, 1000000): + random.randint(1, 1000000) + +agent.stop_cpu_profiler() + + +''' +agent.start_allocation_profiler() + +mem1 = [] +for i in range(0, 1000): + obj1 = {'v': random.randint(0, 1000000)} + mem1.append(obj1) + +agent.stop_allocation_profiler() +''' + + +''' +agent.start_block_profiler() + +def blocking_call(): + time.sleep(0.1) + +for i in range(5): + blocking_call() + +agent.stop_block_profiler() +''' \ No newline at end of file diff --git a/publish.sh b/publish.sh new file mode 100755 index 0000000..e502767 --- /dev/null +++ b/publish.sh @@ -0,0 +1,15 @@ +#!/bin/bash + +set -e + +pandoc --from=markdown --to=rst --output=README.rst 'README.md' + +python3 -m unittest 
discover -v -s tests -p *_test.py + +rm -f dist/*.tar.gz +python setup.py sdist + +for bundle in dist/*.tar.gz; do + echo "Publishing $bundle..." + twine upload $bundle +done diff --git a/setup.py b/setup.py index c7c964e..e5a2368 100644 --- a/setup.py +++ b/setup.py @@ -1,9 +1,14 @@ +import os from setuptools import setup, find_packages +def read(fname): + return open(os.path.join(os.path.dirname(__file__), fname)).read() + setup( name = 'stackimpact', - version = '1.0.0', - description = 'StackImpact Python Agent', + version = '1.2.6', + description = 'StackImpact Python Profiler', + long_description = read('README.rst'), author = 'StackImpact', author_email = 'devops@stackimpact.com', url = 'https://stackimpact.com', @@ -29,6 +34,8 @@ 'Programming Language :: Python :: 3.4', 'Programming Language :: Python :: 3.5', 'Programming Language :: Python :: 3.6', + 'Programming Language :: Python :: 3.7', + 'Programming Language :: Python :: Implementation :: CPython', 'Topic :: Software Development', 'Topic :: System :: Monitoring', ], diff --git a/stackimpact/agent.py b/stackimpact/agent.py index 8fd1cce..a30fcef 100644 --- a/stackimpact/agent.py +++ b/stackimpact/agent.py @@ -9,30 +9,58 @@ import os import signal import atexit +import platform +import random +import math -from .runtime import min_version, runtime_info +from .runtime import min_version, runtime_info, register_signal from .utils import timestamp, generate_uuid from .config import Config from .config_loader import ConfigLoader from .message_queue import MessageQueue -from .frame_selector import FrameSelector +from .frame_cache import FrameCache from .reporters.process_reporter import ProcessReporter -from .reporters.cpu_reporter import CPUReporter -from .reporters.allocation_reporter import AllocationReporter -from .reporters.block_reporter import BlockReporter +from .reporters.profile_reporter import ProfileReporter, ProfilerConfig from .reporters.error_reporter import ErrorReporter +from 
.reporters.span_reporter import SpanReporter +from .profilers.cpu_profiler import CPUProfiler +from .profilers.allocation_profiler import AllocationProfiler +from .profilers.block_profiler import BlockProfiler -class Agent: +class Span(object): - AGENT_VERSION = "1.0.0" + def __init__(self, stop_func = None): + if stop_func: + self.stop_func = stop_func + else: + self.stop_func = None + + + def stop(self): + if self.stop_func: + self.stop_func() + + + def __enter__(self): + pass + + + def __exit__(self, exc_type, exc_value, traceback): + self.stop() + + +class Agent(object): + + AGENT_VERSION = "1.2.6" SAAS_DASHBOARD_ADDRESS = "https://agent-api.stackimpact.com" def __init__(self, **kwargs): self.agent_started = False self.agent_destroyed = False - self.profiler_lock = threading.Lock() + self.profiler_active = False + self.span_active = False self.main_thread_func = None @@ -41,17 +69,42 @@ def __init__(self, **kwargs): self.config = Config(self) self.config_loader = ConfigLoader(self) self.message_queue = MessageQueue(self) - self.frame_selector = FrameSelector(self) + self.frame_cache = FrameCache(self) self.process_reporter = ProcessReporter(self) - self.cpu_reporter = CPUReporter(self) - self.allocation_reporter = AllocationReporter(self) - self.block_reporter = BlockReporter(self) self.error_reporter = ErrorReporter(self) + self.span_reporter = SpanReporter(self) + + config = ProfilerConfig() + config.log_prefix = 'CPU profiler' + config.max_profile_duration = 20 + config.max_span_duration = 5 + config.max_span_count = 30 + config.span_interval = 20 + config.report_interval = 120 + self.cpu_reporter = ProfileReporter(self, CPUProfiler(self), config) + + config = ProfilerConfig() + config.log_prefix = 'Allocation profiler' + config.max_profile_duration = 20 + config.max_span_duration = 5 + config.max_span_count = 30 + config.span_interval = 20 + config.report_interval = 120 + self.allocation_reporter = ProfileReporter(self, AllocationProfiler(self), config) + + 
config = ProfilerConfig() + config.log_prefix = 'Block profiler' + config.max_profile_duration = 20 + config.max_span_duration = 5 + config.max_span_count = 30 + config.span_interval = 20 + config.report_interval = 120 + self.block_reporter = ProfileReporter(self, BlockProfiler(self), config) self.options = None - def get_option(self, name, default_val = None): + def get_option(self, name, default_val=None): if name not in self.options: return default_val else: @@ -60,7 +113,10 @@ def get_option(self, name, default_val = None): def start(self, **kwargs): if not min_version(2, 7) and not min_version(3, 4): - raise Exception('Supported Python versions 2.6 or highter and 3.4 or higher') + raise Exception('Supported Python versions 2.6 or higher and 3.4 or higher') + + if platform.python_implementation() != 'CPython': + raise Exception('Supported Python interpreter is CPython') if self.agent_destroyed: self.log('Destroyed agent cannot be started') @@ -71,6 +127,9 @@ def start(self, **kwargs): self.options = kwargs + if 'auto_profiling' not in self.options: + self.options['auto_profiling'] = True + if 'dashboard_address' not in self.options: self.options['dashboard_address'] = self.SAAS_DASHBOARD_ADDRESS @@ -83,18 +142,19 @@ def start(self, **kwargs): if 'host_name' not in self.options: self.options['host_name'] = socket.gethostname() - self.run_id = generate_uuid() self.run_ts = timestamp() self.config_loader.start() self.message_queue.start() - self.frame_selector.start() - self.process_reporter.start() - self.cpu_reporter.start() - self.allocation_reporter.start() - self.block_reporter.start() - self.error_reporter.start() + self.frame_cache.start() + + self.cpu_reporter.setup() + self.allocation_reporter.setup() + self.block_reporter.setup() + self.span_reporter.setup() + self.error_reporter.setup() + self.process_reporter.setup() # execute main_thread_func in main thread on signal def _signal_handler(signum, frame): @@ -106,26 +166,145 @@ def 
_signal_handler(signum, frame): except Exception: self.exception() - signal.signal(signal.SIGUSR2, _signal_handler) + return True - # destroy agent on exit - def _exit(): - if not self.agent_started or self.agent_destroyed: - return + if not runtime_info.OS_WIN: + register_signal(signal.SIGUSR2, _signal_handler) + + if self.get_option('auto_destroy') is None or self.get_option('auto_destroy') is True: + # destroy agent on exit + def _exit_handler(*arg): + if not self.agent_started or self.agent_destroyed: + return + + try: + self.message_queue.flush() + self.destroy() + except Exception: + self.exception() - try: - self.message_queue.flush() - self.destroy() - except Exception: - self.exception() + atexit.register(_exit_handler) + + if not runtime_info.OS_WIN: + register_signal(signal.SIGQUIT, _exit_handler, once = True) + register_signal(signal.SIGINT, _exit_handler, once = True) + register_signal(signal.SIGTERM, _exit_handler, once = True) + register_signal(signal.SIGHUP, _exit_handler, once = True) - atexit.register(_exit) self.agent_started = True self.log('Agent started') + def enable(self): + if not self.config.is_agent_enabled(): + self.cpu_reporter.start() + self.allocation_reporter.start() + self.block_reporter.start() + self.span_reporter.start() + self.error_reporter.start() + self.process_reporter.start() + self.config.set_agent_enabled(True) + + + def disable(self): + if self.config.is_agent_enabled(): + self.cpu_reporter.stop() + self.allocation_reporter.stop() + self.block_reporter.stop() + self.span_reporter.stop() + self.error_reporter.stop() + self.process_reporter.stop() + self.config.set_agent_enabled(False) + + + def profile(self, name='Default'): + if not self.agent_started or self.span_active: + return Span(None) + + self.span_active = True + + selected_reporter = None + active_reporters = [] + if self.cpu_reporter.started: + active_reporters.append(self.cpu_reporter) + if self.allocation_reporter.started: + 
active_reporters.append(self.allocation_reporter) + if self.block_reporter.started: + active_reporters.append(self.block_reporter) + + if len(active_reporters) > 0: + selected_reporter = active_reporters[int(math.floor(random.random() * len(active_reporters)))] + if not selected_reporter.start_profiling(True, True): + selected_reporter = None + + start_timestamp = time.time() + + def stop_func(): + if selected_reporter: + selected_reporter.stop_profiling() + + duration = time.time() - start_timestamp + self.span_reporter.record_span(name, duration) + + if not self.get_option('auto_profiling'): + self.config_loader.load(True) + if selected_reporter: + selected_reporter.report(True); + self.message_queue.flush(True) + + self.span_active = False + + return Span(stop_func) + + + def _start_profiler(self, reporter): + if not self.agent_started or self.get_option('auto_profiling'): + return + + self.span_active = True + + reporter.start() + reporter.start_profiling(True, False) + + + def _stop_profiler(self, reporter): + if not self.agent_started or self.get_option('auto_profiling'): + return + + reporter.stop_profiling() + reporter.report(False) + reporter.stop() + self.message_queue.flush(False) + + self.span_active = False + + + def start_cpu_profiler(self): + self._start_profiler(self.cpu_reporter) + + + def stop_cpu_profiler(self): + self._stop_profiler(self.cpu_reporter) + + + def start_allocation_profiler(self): + self._start_profiler(self.allocation_reporter) + + + def stop_allocation_profiler(self): + self._stop_profiler(self.allocation_reporter) + + + def start_block_profiler(self): + self._start_profiler(self.block_reporter) + + + def stop_block_profiler(self): + self._stop_profiler(self.block_reporter) + + def destroy(self): if not self.agent_started: self.log('Agent has not been started') @@ -134,14 +313,22 @@ def destroy(self): if self.agent_destroyed: return - self.config_loader.destroy() - self.message_queue.destroy() - self.frame_selector.destroy() - 
self.process_reporter.destroy() + self.config_loader.stop() + self.message_queue.stop() + self.frame_cache.stop() + self.cpu_reporter.stop() + self.allocation_reporter.stop() + self.block_reporter.stop() + self.error_reporter.stop() + self.span_reporter.stop() + self.process_reporter.stop() + self.cpu_reporter.destroy() self.allocation_reporter.destroy() self.block_reporter.destroy() self.error_reporter.destroy() + self.span_reporter.destroy() + self.process_reporter.destroy() self.agent_destroyed = True self.log('Agent destroyed') @@ -167,14 +354,13 @@ def error(self, message): def exception(self): if self.get_option('debug'): - self.print_err(sys.exc_info()[0]) traceback.print_exc() - def delay(self, timeout, func): + def delay(self, timeout, func, *args): def func_wrapper(): try: - func() + func(*args) except Exception: self.exception() @@ -184,14 +370,14 @@ def func_wrapper(): return t - def schedule(self, timeout, interval, func): + def schedule(self, timeout, interval, func, *args): tw = TimerWraper() def func_wrapper(): start = time.time() try: - func() + func(*args) except Exception: self.exception() @@ -229,7 +415,7 @@ def run_in_main_thread(self, func): -class TimerWraper(): +class TimerWraper(object): def __init__(self): self.timer = None self.cancel_lock = threading.Lock() diff --git a/stackimpact/api_request.py b/stackimpact/api_request.py index 2fbf8de..c2dd016 100644 --- a/stackimpact/api_request.py +++ b/stackimpact/api_request.py @@ -20,7 +20,7 @@ from urllib.parse import urlencode -class APIRequest: +class APIRequest(object): def __init__(self, agent): self.agent = agent @@ -33,20 +33,21 @@ def post(self, endpoint, payload): 'Content-Encoding': 'gzip' } - host_name = 'host' + host_name = 'undefined' try: host_name = socket.gethostname() except Exception: - pass + self.agent.exception() req_body = { 'runtime_type': 'python', 'runtime_version': '{0.major}.{0.minor}.{0.micro}'.format(sys.version_info), + 'runtime_path': sys.prefix, 'agent_version': 
self.agent.AGENT_VERSION, 'app_name': self.agent.get_option('app_name'), 'app_version': self.agent.get_option('app_version'), 'app_environment': self.agent.get_option('app_environment'), - 'host_name': self.agent.get_option('host_name', socket.gethostname()), + 'host_name': self.agent.get_option('host_name', host_name), 'process_id': os.getpid(), 'run_id': self.agent.run_id, 'run_ts': self.agent.run_ts, @@ -55,9 +56,9 @@ def post(self, endpoint, payload): } gzip_out = BytesIO() - with gzip.GzipFile(fileobj=gzip_out, mode="w") as f: - f.write(json.dumps(req_body).encode('utf-8')) - f.close() + with gzip.GzipFile(fileobj=gzip_out, mode="w") as out_file: + out_file.write(json.dumps(req_body).encode('utf-8')) + out_file.close() gzip_out_val = gzip_out.getvalue() if isinstance(gzip_out_val, str): @@ -71,6 +72,7 @@ def post(self, endpoint, payload): headers = headers) response = urlopen(request, timeout = 20) + result_data = response.read() if response.info(): diff --git a/stackimpact/config.py b/stackimpact/config.py index 50d6426..17f8c5c 100644 --- a/stackimpact/config.py +++ b/stackimpact/config.py @@ -1,11 +1,22 @@ import threading -class Config: +class Config(object): def __init__(self, agent): self.agent = agent + self.agent_enabled = False self.profiling_disabled = False self.config_lock = threading.Lock() + + def set_agent_enabled(self, val): + with self.config_lock: + self.agent_enabled = val + + def is_agent_enabled(self): + with self.config_lock: + val = self.agent_enabled + return val + def set_profiling_disabled(self, val): with self.config_lock: self.profiling_disabled = val diff --git a/stackimpact/config_loader.py b/stackimpact/config_loader.py index 0467435..d122e8e 100644 --- a/stackimpact/config_loader.py +++ b/stackimpact/config_loader.py @@ -1,33 +1,77 @@ from .api_request import APIRequest +from .utils import timestamp + + +class ConfigLoader(object): + LOAD_DELAY = 2 + LOAD_INTERVAL = 120 -class ConfigLoader: def __init__(self, agent): self.agent 
= agent self.load_timer = None + self.last_load_ts = 0 def start(self): - self.load_timer = self.agent.schedule(2, 120, self.load) + if self.agent.get_option('auto_profiling'): + self.load_timer = self.agent.schedule(self.LOAD_DELAY, self.LOAD_INTERVAL, self.load) - def destroy(self): + def stop(self): if self.load_timer: self.load_timer.cancel() self.load_timer = None - def load(self): + def load(self, with_interval=False): + now = timestamp() + if with_interval and self.last_load_ts > now - self.LOAD_INTERVAL: + return + + self.last_load_ts = now; + + try: api_request = APIRequest(self.agent) config = api_request.post('config', {}) - # profiling_enabled yes|no + # agent_enabled yes|no + if 'agent_enabled' in config: + self.agent.config.set_agent_enabled(config['agent_enabled'] == 'yes') + else: + self.agent.config.set_agent_enabled(False) + + # profiling_disabled yes|no if 'profiling_disabled' in config: self.agent.config.set_profiling_disabled(config['profiling_disabled'] == 'yes') else: self.agent.config.set_profiling_disabled(False) + + if self.agent.config.is_agent_enabled() and not self.agent.config.is_profiling_disabled(): + self.agent.cpu_reporter.start() + self.agent.allocation_reporter.start() + self.agent.block_reporter.start() + self.agent.tf_reporter.start() + else: + self.agent.cpu_reporter.stop() + self.agent.allocation_reporter.stop() + self.agent.block_reporter.stop() + self.agent.tf_reporter.stop() + + if self.agent.config.is_agent_enabled(): + self.agent.error_reporter.start() + self.agent.span_reporter.start() + self.agent.process_reporter.start() + self.agent.log('Agent activated') + else: + self.agent.error_reporter.stop() + self.agent.span_reporter.stop() + self.agent.process_reporter.stop() + self.agent.log('Agent deactivated') + + except Exception: self.agent.log('Error loading config') self.agent.exception() diff --git a/stackimpact/frame.py b/stackimpact/frame.py index 5f4a984..f89204f 100644 --- a/stackimpact/frame.py +++ 
b/stackimpact/frame.py @@ -1,7 +1,7 @@ import re -class Frame: +class Frame(object): def __init__(self, func_name, filename, lineno): self.func_name = func_name @@ -14,9 +14,9 @@ def __init__(self, func_name, filename, lineno): def match(self, other): - return ((other.func_name == None or other.func_name == self.func_name) and - (other.filename == None or other.filename == self.filename) and - (other.lineno == None or other.lineno == self.lineno)) + return ((other.func_name is None or other.func_name == self.func_name) and + (other.filename is None or other.filename == self.filename) and + (other.lineno is None or other.lineno == self.lineno)) def __eq__(self, other): @@ -26,7 +26,7 @@ def __eq__(self, other): def __str__(self): if not self.cached_str: - if not self.lineno == None and self.lineno > 0: + if self.lineno is not None and self.lineno > 0: self.cached_str = '{0} ({1}:{2})'.format(self.func_name, self.filename, self.lineno) else: self.cached_str = '{0} ({1})'.format(self.func_name, self.filename) diff --git a/stackimpact/frame_selector.py b/stackimpact/frame_cache.py similarity index 54% rename from stackimpact/frame_selector.py rename to stackimpact/frame_cache.py index 521365e..f6f1eb7 100644 --- a/stackimpact/frame_selector.py +++ b/stackimpact/frame_cache.py @@ -9,66 +9,32 @@ if runtime_info.GEVENT: import gevent -class FrameSelector: +class FrameCache(object): MAX_CACHE_SIZE = 2500 def __init__(self, agent): self.agent = agent self.agent_frame_cache = None self.system_frame_cache = None - self.http_frame_cache = None self.include_agent_frames = None self.include_system_frames = None - self.http_frame_regexp = None - self.agent_dir = os.path.dirname(os.path.realpath(__file__)) self.system_dir = os.path.dirname(os.path.realpath(threading.__file__)) if runtime_info.GEVENT: self.gevent_dir = os.path.dirname(os.path.realpath(gevent.__file__)) - def add_http_package(self, name): - try: - m = importlib.import_module(name) - if m and m.__file__: - 
self.add_http_frame_regexp(os.path.dirname(os.path.realpath(m.__file__))) - except Exception: - pass - - - def add_http_frame_regexp(self, regexp_str): - self.http_frame_regexp.append(re.compile(regexp_str, re.IGNORECASE)) - - def start(self): self.agent_frame_cache = dict() self.system_frame_cache = dict() - self.http_frame_cache = dict() self.include_agent_frames = self.agent.get_option('include_agent_frames') self.include_system_frames = self.agent.get_option('include_system_frames') - self.http_frame_regexp = [] - self.add_http_package('gunicorn') - self.add_http_package('waitress') - self.add_http_package('werkzeug') - self.add_http_package('flask') - self.add_http_package('django') - self.add_http_package('pyramid') - self.add_http_package('cherrypy') - self.add_http_package('tornado') - self.add_http_package('web2py') - self.add_http_package('bottle') - - - def destroy(self): - self.agent_frame_cache = None - self.system_frame_cache = None - self.http_frame_cache = None - - self.http_frame_regexp = None + def stop(self): + pass def is_agent_frame(self, filename): @@ -102,21 +68,3 @@ def is_system_frame(self, filename): self.system_frame_cache[filename] = system_frame return system_frame - - - def is_http_frame(self, filename): - if filename in self.http_frame_cache: - return self.http_frame_cache[filename] - - http_frame = False - - for r in self.http_frame_regexp: - if r.search(filename): - http_frame = True - break - - if len(self.http_frame_cache) < self.MAX_CACHE_SIZE: - self.http_frame_cache[filename] = http_frame - - return http_frame - diff --git a/stackimpact/message_queue.py b/stackimpact/message_queue.py index 2cd6698..2b96704 100644 --- a/stackimpact/message_queue.py +++ b/stackimpact/message_queue.py @@ -6,71 +6,61 @@ from .utils import timestamp, base64_encode -class MessageQueue: +class MessageQueue(object): + FLUSH_INTERVAL = 5; MESSAGE_TTL = 10 * 60 + def __init__(self, agent): self.agent = agent self.queue = [] self.queue_lock = 
threading.Lock() self.flush_timer = None - self.expire_timer = None self.backoff_seconds = 0 self.last_flush_ts = 0 def start(self): - self.expire_timer = self.agent.schedule(60, 60, self.expire) - self.flush_timer = self.agent.schedule(5, 5, self.flush) - + if self.agent.get_option('auto_profiling'): + self.flush_timer = self.agent.schedule(self.FLUSH_INTERVAL, self.FLUSH_INTERVAL, self.flush) - def destroy(self): - self.queue = [] + def stop(self): if self.flush_timer: self.flush_timer.cancel() self.flush_timer = None - - if self.expire_timer: - self.expire_timer.cancel() - self.expire_timer = None - + def add(self, topic, message): - m = { + entry = { 'topic': topic, 'content': message, 'added_at': timestamp() } with self.queue_lock: - self.queue.append(m) + self.queue.append(entry) self.agent.log('Added message to the queue for topic: ' + topic) self.agent.log(message) - def expire(self): + def flush(self, with_interval=False): if len(self.queue) == 0: return now = timestamp() - - # expire and copy messages - with self.queue_lock: - self.queue = [m for m in self.queue if m['added_at'] >= now - self.MESSAGE_TTL] - - - def flush(self): - if len(self.queue) == 0: + if with_interval and self.last_flush_ts > now - self.FLUSH_INTERVAL: return - now = timestamp() - # flush only if backoff time is elapsed if self.last_flush_ts + self.backoff_seconds > now: return + # expire old messages + with self.queue_lock: + self.queue = [m for m in self.queue if m['added_at'] >= now - self.MESSAGE_TTL] + # read queue outgoing = None with self.queue_lock: @@ -86,7 +76,7 @@ def flush(self): 'messages': outgoing_copy } - self.last_flush_ts = timestamp() + self.last_flush_ts = now try: api_request = APIRequest(self.agent) diff --git a/stackimpact/metric.py b/stackimpact/metric.py index c5bd9ca..d3abc0c 100644 --- a/stackimpact/metric.py +++ b/stackimpact/metric.py @@ -6,7 +6,7 @@ from .utils import timestamp, generate_uuid, generate_sha1 -class Metric: +class Metric(object): 
TYPE_STATE = 'state' TYPE_COUNTER = 'counter' @@ -17,10 +17,10 @@ class Metric: CATEGORY_MEMORY = 'memory' CATEGORY_GC = 'gc' CATEGORY_RUNTIME = 'runtime' + CATEGORY_SPAN = 'span' CATEGORY_CPU_PROFILE = 'cpu-profile' CATEGORY_MEMORY_PROFILE = 'memory-profile' CATEGORY_BLOCK_PROFILE = 'block-profile' - CATEGORY_HTTP_TRACE = 'http-trace' CATEGORY_ERROR_PROFILE = 'error-profile' NAME_CPU_TIME = 'CPU time' @@ -36,9 +36,10 @@ class Metric: NAME_THREAD_COUNT = 'Active threads' NAME_UNCOLLECTED_ALLOCATIONS = 'Uncollected allocations' NAME_BLOCKING_CALL_TIMES = 'Blocking call times' - NAME_HTTP_TRANSACTION_BREAKDOWN = 'HTTP transaction breakdown' NAME_HANDLED_EXCEPTIONS = 'Handled exceptions' - + NAME_TF_OPERATION_TIMES = 'TensorFlow operation times' + NAME_TF_OPERATION_ALLOCATION_RATE = 'TensorFlow operation allocation rate' + UNIT_NONE = '' UNIT_MILLISECOND = 'millisecond' UNIT_MICROSECOND = 'microsecond' @@ -48,7 +49,7 @@ class Metric: UNIT_PERCENT = 'percent' TRIGGER_TIMER = 'timer' - TRIGGER_ANOMALY = 'anomaly' + TRIGGER_API = 'api' def __init__(self, agent, typ, category, name, unit): @@ -142,15 +143,37 @@ def to_dict(self): class Breakdown: + TYPE_CALLGRAPH = 'callgraph' + TYPE_DEVICE = 'device' + TYPE_CALLSITE = 'callsite' + TYPE_OPERATION = 'operation' + TYPE_ERROR = 'error' + RESERVOIR_SIZE = 1000 - def __init__(self, name): + def __init__(self, name, typ = None): self.name = name + self.type = typ + self.metadata = dict() + self.children = dict() self.measurement = 0 self.num_samples = 0 self.reservoir = [] - self.children = {} - self._overhead = 0 + + + def set_type(self, typ): + self.type = typ + + + def add_metadata(self, key, value): + self.metadata[key] = value + + + def get_metadata(self, key): + if key in self.metadata: + return self.metadata[key] + else: + return None def find_child(self, name): @@ -161,21 +184,21 @@ def find_child(self, name): def max_child(self): - max = None + max_ch = None for name, child in self.children.items(): - if max == None or 
child.measurement > max.measurement: - max = child + if max_ch is None or child.measurement > max_ch.measurement: + max_ch = child - return max + return max_ch def min_child(self): - min = None + min_ch = None for name, child in self.children.items(): - if min == None or child.measurement < min.measurement: - min = child + if min_ch == None or child.measurement < min_ch.measurement: + min_ch = child - return min + return min_ch def add_child(self, child): @@ -195,28 +218,28 @@ def find_or_add_child(self, name): return child - def filter(self, from_level, min, max): - self.filter_level(1, from_level, min, max) + def filter(self, from_level, min_measurement, max_measurement): + self.filter_level(1, from_level, min_measurement, max_measurement) - def filter_level(self, current_level, from_level, min, max): + def filter_level(self, current_level, from_level, min_measurement, max_measurement): for name in list(self.children.keys()): child = self.children[name] - if current_level >= from_level and (child.measurement < min or child.measurement > max): + if current_level >= from_level and (child.measurement < min_measurement or child.measurement > max_measurement): del self.children[name] else: - child.filter_level(current_level + 1, from_level, min, max) + child.filter_level(current_level + 1, from_level, min_measurement, max_measurement) def depth(self): - max = 0 + max_depth = 0 for name, child in self.children.items(): - d = child.depth() - if d > max: - max = d + child_depth = child.depth() + if child_depth > max_depth: + max_depth = child_depth - return max + 1 + return max_depth + 1 def propagate(self): @@ -285,6 +308,28 @@ def normalize(self, factor): child.normalize(factor) + def scale(self, factor): + self.measurement = self.measurement * factor + self.num_samples = int(math.ceil(self.num_samples * factor)) + + for name, child in self.children.items(): + child.scale(factor) + + + def round(self): + self.measurement = round(self.measurement) + + for name, child in 
self.children.items(): + child.round() + + + def floor(self): + self.measurement = int(self.measurement) + + for name, child in self.children.items(): + child.floor() + + def to_dict(self): children_map = [] for name, child in self.children.items(): @@ -292,6 +337,7 @@ def to_dict(self): node_map = { "name": self.name, + "metadata": self.metadata, "measurement": self.measurement, "num_samples": self.num_samples, "children": children_map, @@ -305,13 +351,13 @@ def __str__(self): def dump_level(self, level): - s = '' + dump_str = '' for i in range(0, level): - s += ' ' + dump_str += ' ' - s += '{0} - {1} ({2})\n'.format(self.name, self.measurement, self.num_samples) + dump_str += '{0} - {1} ({2})\n'.format(self.name, self.measurement, self.num_samples) for name, child in self.children.items(): - s += child.dump_level(level + 1) + dump_str += child.dump_level(level + 1) - return s + return dump_str diff --git a/stackimpact/profiler_scheduler.py b/stackimpact/profiler_scheduler.py deleted file mode 100644 index 0606f65..0000000 --- a/stackimpact/profiler_scheduler.py +++ /dev/null @@ -1,62 +0,0 @@ -from __future__ import division - -import random -import math - -from .utils import timestamp, base64_encode -from .metric import Metric - -class ProfilerScheduler: - - def __init__(self, agent, record_interval, record_duration, report_interval, record_func, report_func): - self.agent = agent - self.record_interval = record_interval - self.record_duration = record_duration - self.report_interval = report_interval - self.record_func = record_func - self.report_func = report_func - self.random_timer = None - self.record_timer = None - self.report_timer = None - - - def start(self): - if self.record_func: - def random_delay(): - timeout = random.randint(0, round(self.record_interval - self.record_duration)) - self.random_timer = self.agent.delay(timeout, self.execute_record) - - self.record_timer = self.agent.schedule(self.record_interval, self.record_interval, random_delay) - 
- self.report_timer = self.agent.schedule(self.report_interval, self.report_interval, self.execute_report) - - - def destroy(self): - if self.random_timer: - self.random_timer.cancel() - self.random_timer = None - - if self.record_timer: - self.record_timer.cancel() - self.record_timer = None - - if self.report_timer: - self.report_timer.cancel() - self.report_timer = None - - - def execute_record(self): - with self.agent.profiler_lock: - try: - self.record_func(self.record_duration) - except Exception: - self.agent.exception() - - - def execute_report(self): - with self.agent.profiler_lock: - try: - self.report_func() - except Exception: - self.agent.exception() - diff --git a/stackimpact/profilers/__init__.py b/stackimpact/profilers/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/stackimpact/profilers/allocation_profiler.py b/stackimpact/profilers/allocation_profiler.py new file mode 100644 index 0000000..a852593 --- /dev/null +++ b/stackimpact/profilers/allocation_profiler.py @@ -0,0 +1,124 @@ +from __future__ import division + +import sys +import time +import re +import threading + +from ..runtime import min_version, runtime_info, read_vm_size +from ..utils import timestamp +from ..metric import Metric +from ..metric import Breakdown + +if min_version(3, 4): + import tracemalloc + + +class AllocationProfiler(object): + MAX_TRACEBACK_SIZE = 25 # number of frames + MAX_MEMORY_OVERHEAD = 10 * 1e6 # 10MB + MAX_PROFILED_ALLOCATIONS = 25 + + + def __init__(self, agent): + self.agent = agent + self.ready = False + self.profile = None + self.profile_lock = threading.Lock() + self.overhead_monitor = None + self.start_ts = None + + + def setup(self): + if self.agent.get_option('allocation_profiler_disabled'): + return + + if not runtime_info.OS_LINUX and not runtime_info.OS_DARWIN: + self.agent.log('CPU profiler is only supported on Linux and OS X.') + return + + if not min_version(3, 4): + self.agent.log('Memory allocation profiling is available for 
Python 3.4 or higher') + return + + self.ready = True + + + def reset(self): + self.profile = Breakdown('Allocation call graph', Breakdown.TYPE_CALLGRAPH) + + + def start_profiler(self): + self.agent.log('Activating memory allocation profiler.') + + def start(): + tracemalloc.start(self.MAX_TRACEBACK_SIZE) + self.agent.run_in_main_thread(start) + + self.start_ts = time.time() + + def monitor_overhead(): + if tracemalloc.is_tracing() and tracemalloc.get_tracemalloc_memory() > self.MAX_MEMORY_OVERHEAD: + self.agent.log('Allocation profiler memory overhead limit exceeded: {0} bytes'.format(tracemalloc.get_tracemalloc_memory())) + self.stop_profiler() + + self.overhead_monitor = self.agent.schedule(0.5, 0.5, monitor_overhead) + + + def stop_profiler(self): + self.agent.log('Deactivating memory allocation profiler.') + + with self.profile_lock: + if self.overhead_monitor: + self.overhead_monitor.cancel() + self.overhead_monitor = None + + if tracemalloc.is_tracing(): + snapshot = tracemalloc.take_snapshot() + self.agent.log('Allocation profiler memory overhead {0} bytes'.format(tracemalloc.get_tracemalloc_memory())) + tracemalloc.stop() + self.process_snapshot(snapshot, time.time() - self.start_ts) + + + def build_profile(self, duration): + with self.profile_lock: + self.profile.normalize(duration) + self.profile.propagate() + self.profile.floor() + self.profile.filter(2, 1000, float("inf")) + + return [{ + 'category': Metric.CATEGORY_MEMORY_PROFILE, + 'name': Metric.NAME_UNCOLLECTED_ALLOCATIONS, + 'unit': Metric.UNIT_BYTE, + 'unit_interval': 1, + 'profile': self.profile + }] + + + def destroy(self): + pass + + + def process_snapshot(self, snapshot, duration): + stats = snapshot.statistics('traceback') + + for stat in stats[:self.MAX_PROFILED_ALLOCATIONS]: + if stat.traceback: + skip_stack = False + for frame in stat.traceback: + if self.agent.frame_cache.is_agent_frame(frame.filename): + skip_stack = True + break + if skip_stack: + continue + + current_node = 
self.profile + for frame in reversed(stat.traceback): + if frame.filename == '': + continue + + frame_name = '{0}:{1}'.format(frame.filename, frame.lineno) + current_node = current_node.find_or_add_child(frame_name) + current_node.set_type(Breakdown.TYPE_CALLSITE) + current_node.increment(stat.size, stat.count) diff --git a/stackimpact/profilers/block_profiler.py b/stackimpact/profilers/block_profiler.py new file mode 100644 index 0000000..81782a0 --- /dev/null +++ b/stackimpact/profilers/block_profiler.py @@ -0,0 +1,161 @@ +from __future__ import division + +import os +import sys +import time +import threading +import re +import signal + +from ..runtime import min_version, runtime_info +from ..metric import Metric +from ..metric import Breakdown +from ..frame import Frame + +if runtime_info.GEVENT: + import gevent + + +class BlockProfiler(object): + SAMPLING_RATE = 0.05 + MAX_TRACEBACK_SIZE = 25 # number of frames + + + def __init__(self, agent): + self.agent = agent + self.ready = False + self.profile = None + self.profile_lock = threading.Lock() + self.prev_signal_handler = None + self.sampler_active = False + + + def setup(self): + if self.agent.get_option('block_profiler_disabled'): + return + + if not runtime_info.OS_LINUX and not runtime_info.OS_DARWIN: + self.agent.log('CPU profiler is only supported on Linux and OS X.') + return + + sample_time = self.SAMPLING_RATE * 1000 + + main_thread_id = None + if runtime_info.GEVENT: + main_thread_id = gevent._threading.get_ident() + else: + main_thread_id = threading.current_thread().ident + + def _sample(signum, signal_frame): + if self.sampler_active: + return + self.sampler_active = True + + with self.profile_lock: + try: + self.process_sample(signal_frame, sample_time, main_thread_id) + signal_frame = None + except Exception: + self.agent.exception() + + self.sampler_active = False + + self.prev_signal_handler = signal.signal(signal.SIGALRM, _sample) + + self.ready = True + + + def destroy(self): + if not 
self.ready: + return + + signal.signal(signal.SIGALRM, self.prev_signal_handler) + + + def reset(self): + self.profile = Breakdown('Execution call graph', Breakdown.TYPE_CALLGRAPH) + + + def start_profiler(self): + self.agent.log('Activating block profiler.') + + signal.setitimer(signal.ITIMER_REAL, self.SAMPLING_RATE, self.SAMPLING_RATE) + + + def stop_profiler(self): + signal.setitimer(signal.ITIMER_REAL, 0) + + self.agent.log('Deactivating block profiler.') + + + def build_profile(self, duration): + with self.profile_lock: + self.profile.normalize(duration) + self.profile.propagate() + self.profile.floor() + self.profile.filter(2, 1, float("inf")) + + return [{ + 'category': Metric.CATEGORY_BLOCK_PROFILE, + 'name': Metric.NAME_BLOCKING_CALL_TIMES, + 'unit': Metric.UNIT_MILLISECOND, + 'unit_interval': 1, + 'profile': self.profile + }] + + + def process_sample(self, signal_frame, sample_time, main_thread_id): + if self.profile: + start = time.clock() + + current_frames = sys._current_frames() + items = current_frames.items() + for thread_id, thread_frame in items: + if thread_id == main_thread_id: + thread_frame = signal_frame + + stack = self.recover_stack(thread_frame) + if stack: + current_node = self.profile + for frame in reversed(stack): + current_node = current_node.find_or_add_child(str(frame)) + current_node.set_type(Breakdown.TYPE_CALLSITE) + current_node.increment(sample_time, 1) + + thread_id, thread_frame, stack = None, None, None + + items = None + current_frames = None + + + def recover_stack(self, thread_frame): + stack = [] + + system_only = True + depth = 0 + while thread_frame is not None and depth <= self.MAX_TRACEBACK_SIZE: + if thread_frame.f_code and thread_frame.f_code.co_name and thread_frame.f_code.co_filename: + func_name = thread_frame.f_code.co_name + filename = thread_frame.f_code.co_filename + lineno = thread_frame.f_lineno + + if self.agent.frame_cache.is_agent_frame(filename): + return None + + if not 
self.agent.frame_cache.is_system_frame(filename): + system_only = False + + frame = Frame(func_name, filename, lineno) + stack.append(frame) + + thread_frame = thread_frame.f_back + + depth += 1 + + if system_only: + return None + + if len(stack) == 0: + return None + else: + return stack diff --git a/stackimpact/profilers/cpu_profiler.py b/stackimpact/profilers/cpu_profiler.py new file mode 100644 index 0000000..dacce79 --- /dev/null +++ b/stackimpact/profilers/cpu_profiler.py @@ -0,0 +1,138 @@ +from __future__ import division + +import os +import sys +import time +import threading +import re +import signal + +from ..runtime import min_version, runtime_info +from ..metric import Metric +from ..metric import Breakdown +from ..frame import Frame + + + +class CPUProfiler(object): + SAMPLING_RATE = 0.01 + MAX_TRACEBACK_SIZE = 25 # number of frames + + + def __init__(self, agent): + self.agent = agent + self.ready = False + self.profile = None + self.profile_lock = threading.Lock() + self.prev_signal_handler = None + self.sampler_active = False + + + def setup(self): + if self.agent.get_option('cpu_profiler_disabled'): + return + + if not runtime_info.OS_LINUX and not runtime_info.OS_DARWIN: + self.agent.log('CPU profiler is only supported on Linux and OS X.') + return + + def _sample(signum, signal_frame): + if self.sampler_active: + return + self.sampler_active = True + + with self.profile_lock: + try: + self.process_sample(signal_frame) + signal_frame = None + except Exception: + self.agent.exception() + + self.sampler_active = False + + self.prev_signal_handler = signal.signal(signal.SIGPROF, _sample) + + self.ready = True + + + def reset(self): + self.profile = Breakdown('Execution call graph', Breakdown.TYPE_CALLGRAPH) + + + def start_profiler(self): + self.agent.log('Activating CPU profiler.') + + signal.setitimer(signal.ITIMER_PROF, self.SAMPLING_RATE, self.SAMPLING_RATE) + + + def stop_profiler(self): + signal.setitimer(signal.ITIMER_PROF, 0) + + + def 
destroy(self): + if not self.ready: + return + + signal.signal(signal.SIGPROF, self.prev_signal_handler) + + + def build_profile(self, duration): + with self.profile_lock: + self.profile.propagate() + self.profile.evaluate_percent(duration / self.SAMPLING_RATE) + self.profile.filter(2, 1, 100) + + return [{ + 'category': Metric.CATEGORY_CPU_PROFILE, + 'name': Metric.NAME_MAIN_THREAD_CPU_USAGE, + 'unit': Metric.UNIT_PERCENT, + 'unit_interval': None, + 'profile': self.profile + }] + + + def process_sample(self, signal_frame): + if self.profile: + start = time.clock() + if signal_frame: + stack = self.recover_stack(signal_frame) + if stack: + self.update_profile(self.profile, stack) + + stack = None + + + def recover_stack(self, signal_frame): + stack = [] + + depth = 0 + while signal_frame is not None and depth <= self.MAX_TRACEBACK_SIZE: + if signal_frame.f_code and signal_frame.f_code.co_name and signal_frame.f_code.co_filename: + func_name = signal_frame.f_code.co_name + filename = signal_frame.f_code.co_filename + lineno = signal_frame.f_lineno + + if self.agent.frame_cache.is_agent_frame(filename): + return None + + frame = Frame(func_name, filename, lineno) + stack.append(frame) + + signal_frame = signal_frame.f_back + + depth += 1 + + if len(stack) == 0: + return None + else: + return stack + + + def update_profile(self, profile, stack): + current_node = profile + + for frame in reversed(stack): + current_node = current_node.find_or_add_child(str(frame)) + current_node.set_type(Breakdown.TYPE_CALLSITE) + + current_node.increment(0, 1) diff --git a/stackimpact/reporters/allocation_reporter.py b/stackimpact/reporters/allocation_reporter.py deleted file mode 100644 index ea4a2e3..0000000 --- a/stackimpact/reporters/allocation_reporter.py +++ /dev/null @@ -1,132 +0,0 @@ -from __future__ import division - -import sys -import time -import re - -from ..runtime import min_version, runtime_info, read_vm_size -from ..profiler_scheduler import ProfilerScheduler -from 
..metric import Metric -from ..metric import Breakdown - -if min_version(3, 4): - import tracemalloc - - -class AllocationReporter: - MAX_TRACEBACK_SIZE = 25 # number of frames - MAX_MEMORY_OVERHEAD = 10 * 1e6 # 10MB - MAX_PROFILED_ALLOCATIONS = 25 - - - def __init__(self, agent): - self.agent = agent - self.profiler_scheduler = None - self.profile = None - self.profile_duration = 0 - - - def start(self): - if self.agent.get_option('allocation_profiler_disabled'): - return - - if not min_version(3, 4): - self.agent.log('Memory allocation profiling is available for Python 3.4 or higher') - return - - self.reset() - - self.profiler_scheduler = ProfilerScheduler(self.agent, 20, 5, 120, self.record, self.report) - self.profiler_scheduler.start() - - - def destroy(self): - if self.agent.get_option('allocation_profiler_disabled'): - return - - if self.profiler_scheduler: - self.profiler_scheduler.destroy() - - - def metrics(self): - if runtime_info.OS_LINUX: - return { - 'vm-size': read_vm_size() - } - - return None - - - def reset(self): - self.profile = Breakdown('root') - self.profile_duration = 0 - - - def record(self, max_duration): - def start(): - tracemalloc.start(self.MAX_TRACEBACK_SIZE) - self.agent.run_in_main_thread(start) - - duration = 0 - step = 1 - while duration < max_duration: - time.sleep(step) - duration += step - - if tracemalloc.get_tracemalloc_memory() > self.MAX_MEMORY_OVERHEAD: - break - - if tracemalloc.is_tracing(): - snapshot = tracemalloc.take_snapshot() - self.agent.log('Allocation profiler memory overhead {0} bytes'.format(tracemalloc.get_tracemalloc_memory())) - tracemalloc.stop() - self.process_snapshot(snapshot, duration) - - self.profile_duration += duration - - - - def process_snapshot(self, snapshot, duration): - stats = snapshot.statistics('traceback') - - for stat in stats[:self.MAX_PROFILED_ALLOCATIONS]: - if stat.traceback: - skip_stack = False - for frame in stat.traceback: - if 
self.agent.frame_selector.is_agent_frame(frame.filename): - skip_stack = True - break - if skip_stack: - continue - - current_node = self.profile - current_node.increment(stat.size, stat.count) - - for frame in reversed(stat.traceback): - if frame.filename == '': - continue - - if self.agent.frame_selector.is_system_frame(frame.filename): - continue - - frame_name = '{0}:{1}'.format(frame.filename, frame.lineno) - - current_node = current_node.find_or_add_child(frame_name) - current_node.increment(stat.size, stat.count) - - - def report(self): - if self.agent.config.is_profiling_disabled(): - return - - if self.profile_duration == 0: - return - - self.profile.normalize(self.profile_duration) - - metric = Metric(self.agent, Metric.TYPE_PROFILE, Metric.CATEGORY_MEMORY_PROFILE, Metric.NAME_UNCOLLECTED_ALLOCATIONS, Metric.UNIT_BYTE) - measurement = metric.create_measurement(Metric.TRIGGER_TIMER, self.profile.measurement, 1, self.profile) - self.agent.message_queue.add('metric', metric.to_dict()) - - self.reset() - diff --git a/stackimpact/reporters/block_reporter.py b/stackimpact/reporters/block_reporter.py deleted file mode 100644 index 917c1b1..0000000 --- a/stackimpact/reporters/block_reporter.py +++ /dev/null @@ -1,190 +0,0 @@ -from __future__ import division - -import os -import sys -import time -import threading -import re -import signal - -from ..runtime import min_version, runtime_info -from ..profiler_scheduler import ProfilerScheduler -from ..metric import Metric -from ..metric import Breakdown -from ..frame import Frame - -if runtime_info.GEVENT: - import gevent - - -class BlockReporter: - SAMPLING_RATE = 0.05 - MAX_TRACEBACK_SIZE = 25 # number of frames - - - def __init__(self, agent): - self.agent = agent - self.profiler_scheduler = None - self.profile_lock = threading.Lock() - self.block_profile = None - self.http_profile = None - self.profile_duration = 0 - self.prev_signal_handler = None - self.handler_active = False - - - def start(self): - if 
self.agent.get_option('block_profiler_disabled'): - return - - if not runtime_info.OS_LINUX and not runtime_info.OS_DARWIN: - self.agent.log('Time profiler is only supported on Linux and OS X.') - return - - sample_time = self.SAMPLING_RATE * 1000 - - main_thread_id = None - if runtime_info.GEVENT: - main_thread_id = gevent._threading.get_ident() - else: - main_thread_id = threading.current_thread().ident - - def _sample(signum, signal_frame): - if self.handler_active: - return - self.handler_active = True - - with self.profile_lock: - try: - if self.block_profile: - start = time.clock() - - current_frames = sys._current_frames() - for thread_id, thread_frame in current_frames.items(): - if thread_id == main_thread_id: - thread_frame = signal_frame - - stack = self.recover_stack(thread_frame) - if stack: - self.update_block_profile(stack, sample_time) - self.update_http_profile(stack, sample_time) - - self.block_profile._overhead += (time.clock() - start) - except Exception: - self.agent.exception() - - self.handler_active = False - - self.prev_signal_handler = signal.signal(signal.SIGALRM, _sample) - self.reset() - - self.profiler_scheduler = ProfilerScheduler(self.agent, 10, 2, 120, self.record, self.report) - self.profiler_scheduler.start() - - - def destroy(self): - if self.agent.get_option('block_profiler_disabled'): - return - - if self.prev_signal_handler: - signal.signal(signal.SIGALRM, self.prev_signal_handler) - - if self.profiler_scheduler: - self.profiler_scheduler.destroy() - - - def reset(self): - self.block_profile = Breakdown('root') - self.http_profile = Breakdown('root') - self.profile_duration = 0 - - - def record(self, duration): - if self.agent.config.is_profiling_disabled(): - return - - signal.setitimer(signal.ITIMER_REAL, self.SAMPLING_RATE, self.SAMPLING_RATE) - time.sleep(duration) - signal.setitimer(signal.ITIMER_REAL, 0) - - self.profile_duration += duration - - self.agent.log('Time profiler CPU overhead per activity second: {0} 
seconds'.format(self.block_profile._overhead / self.profile_duration)) - - - def recover_stack(self, thread_frame): - stack = [] - - depth = 0 - while thread_frame is not None and depth <= self.MAX_TRACEBACK_SIZE: - if thread_frame.f_code and thread_frame.f_code.co_name and thread_frame.f_code.co_filename: - func_name = thread_frame.f_code.co_name - filename = thread_frame.f_code.co_filename - lineno = thread_frame.f_lineno - - if self.agent.frame_selector.is_agent_frame(filename): - return None - - if not self.agent.frame_selector.is_system_frame(filename): - frame = Frame(func_name, filename, lineno) - stack.append(frame) - - thread_frame = thread_frame.f_back - - depth += 1 - - if len(stack) == 0: - return None - else: - return stack - - - def update_block_profile(self, stack, sample_time): - current_node = self.block_profile - current_node.increment(sample_time, 1) - - for frame in reversed(stack): - current_node = current_node.find_or_add_child(str(frame)) - current_node.increment(sample_time, 1) - - - def update_http_profile(self, stack, sample_time): - include = False - for frame in stack: - if self.agent.frame_selector.is_http_frame(frame.filename): - frame._skip = True - include = True - - if include: - current_node = self.http_profile - current_node.increment(sample_time, 1) - - for frame in reversed(stack): - if not frame._skip: - current_node = current_node.find_or_add_child(str(frame)) - current_node.increment(sample_time, 1) - - - def report(self): - if self.agent.config.is_profiling_disabled(): - return - - if self.profile_duration == 0: - return - - with self.profile_lock: - self.block_profile.normalize(self.profile_duration) - - metric = Metric(self.agent, Metric.TYPE_PROFILE, Metric.CATEGORY_BLOCK_PROFILE, Metric.NAME_BLOCKING_CALL_TIMES, Metric.UNIT_MILLISECOND) - measurement = metric.create_measurement(Metric.TRIGGER_TIMER, self.block_profile.measurement, 1, self.block_profile) - self.agent.message_queue.add('metric', metric.to_dict()) - - if 
self.block_profile.num_samples > 0 and self.http_profile.num_samples > 0: - self.http_profile.normalize(self.profile_duration) - self.http_profile.convert_to_percent(self.block_profile.measurement) - - metric = Metric(self.agent, Metric.TYPE_PROFILE, Metric.CATEGORY_HTTP_TRACE, Metric.NAME_HTTP_TRANSACTION_BREAKDOWN, Metric.UNIT_PERCENT) - measurement = metric.create_measurement(Metric.TRIGGER_TIMER, self.http_profile.measurement, None, self.http_profile) - self.agent.message_queue.add('metric', metric.to_dict()) - - self.reset() diff --git a/stackimpact/reporters/cpu_reporter.py b/stackimpact/reporters/cpu_reporter.py deleted file mode 100644 index 241269d..0000000 --- a/stackimpact/reporters/cpu_reporter.py +++ /dev/null @@ -1,148 +0,0 @@ -from __future__ import division - -import os -import sys -import time -import threading -import re -import signal - -from ..runtime import min_version, runtime_info -from ..profiler_scheduler import ProfilerScheduler -from ..metric import Metric -from ..metric import Breakdown -from ..frame import Frame - - - -class CPUReporter: - SAMPLING_RATE = 0.01 - MAX_TRACEBACK_SIZE = 25 # number of frames - - - def __init__(self, agent): - self.agent = agent - self.profiler_scheduler = None - self.profile = None - self.profile_lock = threading.Lock() - self.profile_duration = 0 - self.prev_signal_handler = None - self.handler_active = False - - - def start(self): - if self.agent.get_option('cpu_profiler_disabled'): - return - - if not runtime_info.OS_LINUX and not runtime_info.OS_DARWIN: - self.agent.log('CPU profiler is only supported on Linux and OS X.') - return - - def _sample(signum, signal_frame): - if self.handler_active: - return - self.handler_active = True - - with self.profile_lock: - try: - if self.profile: - start = time.clock() - if signal_frame: - stack = self.recover_stack(signal_frame) - if stack: - self.update_profile(self.profile, stack) - self.profile._overhead += (time.clock() - start) - except Exception: - 
self.agent.exception() - - self.handler_active = False - - self.prev_signal_handler = signal.signal(signal.SIGPROF, _sample) - - self.reset() - - self.profiler_scheduler = ProfilerScheduler(self.agent, 10, 2, 120, self.record, self.report) - self.profiler_scheduler.start() - - - def destroy(self): - if self.agent.get_option('cpu_profiler_disabled'): - return - - if self.prev_signal_handler: - signal.signal(signal.SIGPROF, self.prev_signal_handler) - - if self.profiler_scheduler: - self.profiler_scheduler.destroy() - - - def reset(self): - with self.profile_lock: - self.profile = Breakdown('root') - self.profile_duration = 0 - - - def record(self, duration): - if self.agent.config.is_profiling_disabled(): - return - - signal.setitimer(signal.ITIMER_PROF, self.SAMPLING_RATE, self.SAMPLING_RATE) - time.sleep(duration) - signal.setitimer(signal.ITIMER_PROF, 0) - - self.profile_duration += duration - - self.agent.log('CPU profiler CPU overhead per activity second: {0} seconds'.format(self.profile._overhead / self.profile_duration)) - - - def recover_stack(self, signal_frame): - stack = [] - - depth = 0 - while signal_frame is not None and depth <= self.MAX_TRACEBACK_SIZE: - if signal_frame.f_code and signal_frame.f_code.co_name and signal_frame.f_code.co_filename: - func_name = signal_frame.f_code.co_name - filename = signal_frame.f_code.co_filename - lineno = signal_frame.f_lineno - - if self.agent.frame_selector.is_agent_frame(filename): - return None - - if not self.agent.frame_selector.is_system_frame(filename): - frame = Frame(func_name, filename, lineno) - stack.append(frame) - - signal_frame = signal_frame.f_back - - depth += 1 - - if len(stack) == 0: - return None - else: - return stack - - - def update_profile(self, profile, stack): - current_node = profile - current_node.increment(0, 1) - - for frame in reversed(stack): - current_node = current_node.find_or_add_child(str(frame)) - current_node.increment(0, 1) - - - def report(self): - if 
self.agent.config.is_profiling_disabled(): - return - - if self.profile_duration == 0: - return - - with self.profile_lock: - self.profile.evaluate_percent(self.profile_duration / self.SAMPLING_RATE) - - metric = Metric(self.agent, Metric.TYPE_PROFILE, Metric.CATEGORY_CPU_PROFILE, Metric.NAME_MAIN_THREAD_CPU_USAGE, Metric.UNIT_PERCENT) - measurement = metric.create_measurement(Metric.TRIGGER_TIMER, self.profile.measurement, None, self.profile) - self.agent.message_queue.add('metric', metric.to_dict()) - - self.reset() diff --git a/stackimpact/reporters/error_reporter.py b/stackimpact/reporters/error_reporter.py index 39dfba3..e66de83 100644 --- a/stackimpact/reporters/error_reporter.py +++ b/stackimpact/reporters/error_reporter.py @@ -10,12 +10,14 @@ from ..frame import Frame -class ErrorReporter: +class ErrorReporter(object): MAX_QUEUED_EXC = 100 def __init__(self, agent): self.agent = agent + self.ready = False + self.started = False self.process_timer = None self.report_timer = None self.exc_queue = collections.deque() @@ -24,16 +26,11 @@ def __init__(self, agent): self.added_exceptions = None - def start(self): + def setup(self): if self.agent.get_option('error_profiler_disabled'): return - self.reset_profile() - - self.process_timer = self.agent.schedule(1, 1, self.process) - self.report_timer = self.agent.schedule(60, 60, self.report) - - def _exc_info(ret): + def _exc_info(args, kwargs, ret, data): try: if not self.agent.agent_started or self.agent.agent_destroyed: return @@ -46,37 +43,61 @@ def _exc_info(ret): patch(sys, 'exc_info', None, _exc_info) + self.ready = True + def destroy(self): - if self.agent.get_option('error_profiler_disabled'): + if not self.ready: return unpatch(sys, 'exc_info') - - self.reset_profile() - - if self.process_timer: - self.process_timer.cancel() - self.process_timer = None - if self.report_timer: - self.report_timer.cancel() - self.report_timer = None - - def reset_profile(self): + def reset(self): with self.profile_lock: - 
self.profile = Breakdown('root') + self.profile = Breakdown('Error call graph', Breakdown.TYPE_ERROR) self.added_exceptions = {} + def start(self): + if self.agent.get_option('error_profiler_disabled'): + return + + if not self.agent.get_option('auto_profiling'): + return + + if self.started: + return + self.started = True + + self.reset() + + + self.process_timer = self.agent.schedule(1, 1, self.process) + self.report_timer = self.agent.schedule(60, 60, self.report) + + + def stop(self): + if not self.started: + return + self.started = False + + self.process_timer.cancel() + self.process_timer = None + + self.report_timer.cancel() + self.report_timer = None + + def report(self): with self.profile_lock: + self.profile.propagate() + metric = Metric(self.agent, Metric.TYPE_PROFILE, Metric.CATEGORY_ERROR_PROFILE, Metric.NAME_HANDLED_EXCEPTIONS, Metric.UNIT_NONE) measurement = metric.create_measurement(Metric.TRIGGER_TIMER, self.profile.measurement, 60, self.profile) self.agent.message_queue.add('metric', metric.to_dict()) - self.reset_profile() + self.reset() def process(self): @@ -99,10 +120,10 @@ def recover_stack(self, exc): filename = tb_frame[0] lineno = tb_frame[1] - if self.agent.frame_selector.is_agent_frame(filename): + if self.agent.frame_cache.is_agent_frame(filename): return None - if not self.agent.frame_selector.is_system_frame(filename): + if not self.agent.frame_cache.is_system_frame(filename): frame = Frame(func_name, filename, lineno) stack.append(frame) @@ -127,12 +148,10 @@ def update_profile(self, exc): return current_node = self.profile - current_node.increment(1, 0) for frame in reversed(stack): current_node = current_node.find_or_add_child(str(frame)) - current_node.increment(1, 0) - + current_node.set_type(Breakdown.TYPE_CALLSITE) message = '' if exc_type: @@ -152,4 +171,5 @@ def update_profile(self, exc): else: message_node = current_node.find_or_add_child('Other') + message_node.set_type(Breakdown.TYPE_ERROR) message_node.increment(1, 0) 
diff --git a/stackimpact/reporters/process_reporter.py b/stackimpact/reporters/process_reporter.py index a44c03c..5a1f0c7 100644 --- a/stackimpact/reporters/process_reporter.py +++ b/stackimpact/reporters/process_reporter.py @@ -10,27 +10,56 @@ from ..runtime import runtime_info, min_version, read_cpu_time, read_max_rss, read_current_rss, read_vm_size from ..metric import Metric -class ProcessReporter: +class ProcessReporter(object): def __init__(self, agent): self.agent = agent - self.metrics = {} + self.started = False + self.metrics = None self.report_timer = None - def start(self): - self.report_timer = self.agent.schedule(5, 60, self.report) + def setup(self): + pass def destroy(self): - if self.report_timer: - self.report_timer.cancel() - self.report_timer = None + pass + + + def reset(self): + pass + + + def start(self): + if not self.agent.get_option('auto_profiling'): + return + + if self.started: + return + self.started = True + + self.reset() + + self.report_timer = self.agent.schedule(60, 60, self.report) + + + def stop(self): + if not self.started: + return + self.started = False + + self.report_timer.cancel() + self.report_timer = None + + + def reset(self): + self.metrics = {} def report(self): # CPU - if runtime_info.OS_LINUX or runtime_info.OS_DARWIN: + if not runtime_info.OS_WIN: cpu_time = read_cpu_time() if cpu_time != None: cpu_time_metric = self.report_metric(Metric.TYPE_COUNTER, Metric.CATEGORY_CPU, Metric.NAME_CPU_TIME, Metric.UNIT_NANOSECOND, cpu_time) @@ -45,7 +74,7 @@ def report(self): # Memory - if runtime_info.OS_LINUX or runtime_info.OS_DARWIN: + if not runtime_info.OS_WIN: max_rss = read_max_rss() if max_rss != None: self.report_metric(Metric.TYPE_STATE, Metric.CATEGORY_MEMORY, Metric.NAME_MAX_RSS, Metric.UNIT_KILOBYTE, max_rss) diff --git a/stackimpact/reporters/profile_reporter.py b/stackimpact/reporters/profile_reporter.py new file mode 100644 index 0000000..57a9e9d --- /dev/null +++ b/stackimpact/reporters/profile_reporter.py @@ 
-0,0 +1,187 @@ +from __future__ import division + +import os +import sys +import time +import threading +import re +import random + +from ..runtime import min_version, runtime_info +from ..utils import timestamp +from ..metric import Metric +from ..metric import Breakdown +from ..frame import Frame + + +class ProfilerConfig(object): + + def __init__(self): + self.log_prefix = None + self.max_profile_duration = None + self.max_span_duration = None + self.max_span_count = None + self.span_interval = None + self.report_interval = None + self.report_only = False + + +class ProfileReporter: + + def __init__(self, agent, profiler, config): + self.agent = agent + self.profiler = profiler + self.config = config + self.started = False + self.span_timer = None + self.span_timeout = None + self.random_timer = None + self.report_timer = None + self.profile_start_ts = None + self.profile_duration = None + self.span_count = None + self.span_active = False + self.span_start_ts = None + self.span_trigger = None + + + def setup(self): + self.profiler.setup() + + + def start(self): + if not self.profiler.ready: + return + + if self.started: + return + self.started = True + + self.reset() + + if self.agent.get_option('auto_profiling'): + if not self.config.report_only: + def random_delay(): + timeout = random.randint(0, round(self.config.span_interval - self.config.max_span_duration)) + self.random_timer = self.agent.delay(timeout, self.start_profiling, False, True) + + self.span_timer = self.agent.schedule(0, self.config.span_interval, random_delay) + + self.report_timer = self.agent.schedule(self.config.report_interval, self.config.report_interval, self.report) + + + def stop(self): + if not self.started: + return + + self.started = False + + if self.span_timer: + self.span_timer.cancel() + self.span_timer = None + + if self.random_timer: + self.random_timer.cancel() + self.random_timer = None + + if self.report_timer: + self.report_timer.cancel() + self.report_timer = None + + 
self.stop_profiling() + + + def destroy(self): + self.profiler.destroy() + + + def reset(self): + self.profiler.reset() + self.profile_start_ts = timestamp() + self.profile_duration = 0 + self.span_count = 0 + self.span_trigger = Metric.TRIGGER_TIMER + + + def start_profiling(self, api_call, with_timeout): + if not self.started: + return False + + if self.profile_duration > self.config.max_profile_duration: + self.agent.log(self.config.log_prefix + ': max profiling duration reached.') + return False + + if api_call and self.span_count > self.config.max_span_count: + self.agent.log(self.config.log_prefix + ': max recording count reached.') + return False + + if self.agent.profiler_active: + self.agent.log(self.config.log_prefix + ': profiler lock exists.') + return False + + self.agent.profiler_active = True + self.agent.log(self.config.log_prefix + ': started.') + + try: + self.profiler.start_profiler() + except Exception: + self.agent.profiler_active = False + self.exception() + return False + + if with_timeout: + self.span_timeout = self.agent.delay(self.config.max_span_duration, self.stop_profiling) + + self.span_count = self.span_count + 1 + self.span_active = True + self.span_start_ts = time.time() + + if api_call: + self.span_trigger = Metric.TRIGGER_API + + return True + + + def stop_profiling(self): + if not self.span_active: + return + self.span_active = False + + try: + self.profile_duration = self.profile_duration + time.time() - self.span_start_ts + self.profiler.stop_profiler() + except Exception: + self.exception() + + self.agent.profiler_active = False + + if self.span_timeout: + self.span_timeout.cancel() + + self.agent.log(self.config.log_prefix + ': stopped.') + + + def report(self, with_interval=False): + if not self.started: + return + + if with_interval: + if self.profile_start_ts > timestamp() - self.config.report_interval: + return + elif self.profile_start_ts < timestamp() - 2 * self.config.report_interval: + self.reset() + return + + if not 
self.config.report_only and self.profile_duration == 0: + return + + self.agent.log(self.config.log_prefix + ': reporting profile.') + + profile_data = self.profiler.build_profile(self.profile_duration) + + for data in profile_data: + metric = Metric(self.agent, Metric.TYPE_PROFILE, data['category'], data['name'], data['unit']) + metric.create_measurement(self.span_trigger, data['profile'].measurement, data['unit_interval'], data['profile']) + self.agent.message_queue.add('metric', metric.to_dict()) + + self.reset() diff --git a/stackimpact/reporters/span_reporter.py b/stackimpact/reporters/span_reporter.py new file mode 100644 index 0000000..0db0c32 --- /dev/null +++ b/stackimpact/reporters/span_reporter.py @@ -0,0 +1,83 @@ + +import sys +import threading +import traceback +import collections + +from ..runtime import runtime_info, patch, unpatch +from ..metric import Metric +from ..metric import Breakdown +from ..frame import Frame + + +class SpanReporter(object): + + MAX_QUEUED_EXC = 100 + + + def __init__(self, agent): + self.agent = agent + self.started = False + self.report_timer = None + self.span_counters = None + self.span_lock = threading.Lock() + + + def setup(self): + pass + + + def destroy(self): + pass + + + def reset(self): + self.span_counters = dict() + + + def start(self): + if not self.agent.get_option('auto_profiling'): + return + + if self.started: + return + self.started = True + + self.reset() + + self.report_timer = self.agent.schedule(60, 60, self.report) + + + def stop(self): + if not self.started: + return + self.started = False + + self.report_timer.cancel() + self.report_timer = None + + + def record_span(self, name, duration): + if not self.started: + return + + counter = None + if name in self.span_counters: + counter = self.span_counters[name] + else: + with self.span_lock: + counter = Breakdown(name) + self.span_counters[name] = counter + + counter.update_p95(duration * 1000) + + + def report(self): + for name, counter in 
self.span_counters.items(): + counter.evaluate_p95(); + + metric = Metric(self.agent, Metric.TYPE_STATE, Metric.CATEGORY_SPAN, counter.name, Metric.UNIT_MILLISECOND) + measurement = metric.create_measurement(Metric.TRIGGER_TIMER, counter.measurement, 60) + self.agent.message_queue.add('metric', metric.to_dict()) + + self.reset() diff --git a/stackimpact/runtime.py b/stackimpact/runtime.py index 0f040a9..0df5a41 100644 --- a/stackimpact/runtime.py +++ b/stackimpact/runtime.py @@ -1,14 +1,17 @@ import time import sys -import resource import re import os +import signal from functools import wraps +try: + import resource +except ImportError: + pass - -class runtime_info: +class runtime_info(object): OS_LINUX = (sys.platform.startswith('linux')) OS_DARWIN = (sys.platform == 'darwin') OS_WIN = (sys.platform == 'win32') @@ -24,11 +27,11 @@ class runtime_info: pass -VmRSSRe = re.compile('VmRSS:\s+(\d+)\s+kB') -VmSizeRe = re.compile('VmSize:\s+(\d+)\s+kB') +VM_RSS_REGEXP = re.compile('VmRSS:\s+(\d+)\s+kB') +VM_SIZE_REGEXP = re.compile('VmSize:\s+(\d+)\s+kB') -def min_version(major, minor = 0): +def min_version(major, minor=0): return (sys.version_info.major == major and sys.version_info.minor >= minor) @@ -57,9 +60,9 @@ def read_current_rss(): except Exception: return None - m = VmRSSRe.search(output) - if m: - return int(float(m.group(1))) + match = VM_RSS_REGEXP.search(output) + if match: + return int(float(match.group(1))) return None @@ -75,38 +78,42 @@ def read_vm_size(): except Exception: return None - m = VmSizeRe.search(output) - if m: - return int(float(m.group(1))) + match = VM_SIZE_REGEXP.search(output) + if match: + return int(float(match.group(1))) return None def patch(obj, func_name, before_func, after_func): if not hasattr(obj, func_name): - return + return False target_func = getattr(obj, func_name) # already patched if hasattr(target_func, '__stackimpact_orig__'): - return + return True @wraps(target_func) - def wrapper(*args, **kwds): + def wrapper(*args, 
**kwargs): + data = None + if before_func: - before_func(*args, **kwds) + args, kwargs, data = before_func(args, kwargs) - ret = target_func(*args, **kwds) + ret = target_func(*args, **kwargs) if after_func: - after_func(ret) + after_func(args, kwargs, ret, data) return ret wrapper.__orig__ = target_func setattr(obj, func_name, wrapper) + return True + def unpatch(obj, func_name): if not hasattr(obj, func_name): @@ -117,3 +124,22 @@ def unpatch(obj, func_name): return setattr(obj, func_name, getattr(wrapper, '__stackimpact_orig__')) + + +def register_signal(signal_number, handler_func, once=False): + prev_handler = None + + def _handler(signum, frame): + skip_prev = handler_func(signum, frame) + + if not skip_prev: + if callable(prev_handler): + if once: + signal.signal(signum, prev_handler) + prev_handler(signum, frame) + elif prev_handler == signal.SIG_DFL and once: + signal.signal(signum, signal.SIG_DFL) + os.kill(os.getpid(), signum) + + prev_handler = signal.signal(signal_number, _handler) + diff --git a/stackimpact/utils.py b/stackimpact/utils.py index 1ed5258..caf7109 100644 --- a/stackimpact/utils.py +++ b/stackimpact/utils.py @@ -27,7 +27,7 @@ def generate_uuid(): def generate_sha1(text): - h = hashlib.sha1() - h.update(text.encode('utf-8')) - return h.hexdigest() + sha1_hash = hashlib.sha1() + sha1_hash.update(text.encode('utf-8')) + return sha1_hash.hexdigest() diff --git a/tests/agent_test.py b/tests/agent_test.py index 4a28c2c..6f87141 100644 --- a/tests/agent_test.py +++ b/tests/agent_test.py @@ -1,15 +1,21 @@ import unittest import sys import threading +import random +import time import stackimpact +from stackimpact.runtime import runtime_info, min_version -# python -m unittest discover -s tests -p *_test.py +# python3 -m unittest discover -v -s tests -p *_test.py class AgentTestCase(unittest.TestCase): def test_run_in_main_thread(self): + if runtime_info.OS_WIN: + return + stackimpact._agent = None agent = stackimpact.start( dashboard_address = 
'http://localhost:5001', @@ -35,5 +41,148 @@ def _thread(): agent.destroy() + def test_profile(self): + if runtime_info.OS_WIN: + return + + stackimpact._agent = None + agent = stackimpact.start( + dashboard_address = 'http://localhost:5001', + agent_key = 'key1', + app_name = 'TestPythonApp', + debug = True + ) + + agent.cpu_reporter.start() + + span = agent.profile() + for i in range(0, 2000000): + random.randint(1, 1000000) + span.stop() + + agent.cpu_reporter.report() + + self.assertTrue('test_profile' in str(agent.message_queue.queue)) + + agent.destroy() + + + def test_with_profile(self): + if runtime_info.OS_WIN: + return + + stackimpact._agent = None + agent = stackimpact.start( + dashboard_address = 'http://localhost:5001', + agent_key = 'key1', + app_name = 'TestPythonApp', + debug = True + ) + + agent.cpu_reporter.start() + + with agent.profile(): + for i in range(0, 2000000): + random.randint(1, 1000000) + + agent.cpu_reporter.report() + + self.assertTrue('test_with_profile' in str(agent.message_queue.queue)) + + agent.destroy() + + + def test_cpu_profile(self): + stackimpact._agent = None + agent = stackimpact.start( + dashboard_address = 'http://localhost:5001', + agent_key = 'key1', + app_name = 'TestPythonApp', + auto_profiling = False, + debug = True + ) + + messages = [] + def add_mock(topic, message): + messages.append(message) + agent.message_queue.add = add_mock + + agent.start_cpu_profiler() + + for j in range(0, 2000000): + random.randint(1, 1000000) + + agent.stop_cpu_profiler() + + self.assertTrue('test_cpu_profile' in str(messages)) + + agent.destroy() + + + def test_allocation_profile(self): + if runtime_info.OS_WIN or not min_version(3, 4): + return + + stackimpact._agent = None + agent = stackimpact.start( + dashboard_address = 'http://localhost:5001', + agent_key = 'key1', + app_name = 'TestPythonApp', + auto_profiling = False, + debug = True + ) + + messages = [] + def add_mock(topic, message): + messages.append(message) + 
agent.message_queue.add = add_mock + + agent.start_allocation_profiler() + + mem1 = [] + for i in range(0, 1000): + obj1 = {'v': random.randint(0, 1000000)} + mem1.append(obj1) + + agent.stop_allocation_profiler() + + self.assertTrue('agent_test.py' in str(messages)) + + agent.destroy() + + + def test_block_profile(self): + if runtime_info.OS_WIN or not min_version(3, 4): + return + + stackimpact._agent = None + agent = stackimpact.start( + dashboard_address = 'http://localhost:5001', + agent_key = 'key1', + app_name = 'TestPythonApp', + auto_profiling = False, + debug = True + ) + + messages = [] + def add_mock(topic, message): + messages.append(message) + agent.message_queue.add = add_mock + + agent.start_block_profiler() + + def blocking_call(): + time.sleep(0.1) + + for i in range(5): + blocking_call() + + agent.stop_block_profiler() + + self.assertTrue('blocking_call' in str(messages)) + + agent.destroy() + + if __name__ == '__main__': unittest.main() diff --git a/tests/frame_selector_test.py b/tests/frame_cache_test.py similarity index 58% rename from tests/frame_selector_test.py rename to tests/frame_cache_test.py index 8d0ca04..92ab785 100644 --- a/tests/frame_selector_test.py +++ b/tests/frame_cache_test.py @@ -6,7 +6,7 @@ import stackimpact -class FrameSelectorTestCase(unittest.TestCase): +class FrameCacheTestCase(unittest.TestCase): def test_skip_stack(self): stackimpact._agent = None @@ -18,13 +18,10 @@ def test_skip_stack(self): ) test_agent_file = os.path.realpath(stackimpact.__file__) - self.assertTrue(agent.frame_selector.is_agent_frame(test_agent_file)) + self.assertTrue(agent.frame_cache.is_agent_frame(test_agent_file)) test_system_file = os.path.realpath(threading.__file__) - self.assertTrue(agent.frame_selector.is_system_frame(test_system_file)) - - agent.frame_selector.add_http_frame_regexp(os.path.join('a', 'b', 'c')) - self.assertTrue(agent.frame_selector.is_http_frame(os.path.join('a', 'b', 'c', 'd'))) + 
self.assertTrue(agent.frame_cache.is_system_frame(test_system_file)) agent.destroy() diff --git a/tests/message_queue_test.py b/tests/message_queue_test.py index 908dc53..6732755 100644 --- a/tests/message_queue_test.py +++ b/tests/message_queue_test.py @@ -11,42 +11,14 @@ class MessageQueueTest(unittest.TestCase): - def test_expire(self): - stackimpact._agent = None - agent = stackimpact.start( - dashboard_address = 'http://localhost:5003', - agent_key = 'key1', - app_name = 'TestPythonApp', - debug = True - ) - - m = { - 'm1': 1 - } - agent.message_queue.add('t1', m) - - m = { - 'm2': 2 - } - agent.message_queue.add('t1', m) - - agent.message_queue.queue[0]['added_at'] = timestamp() - 20 * 60 - - agent.message_queue.expire() - - self.assertEqual(len(agent.message_queue.queue), 1) - self.assertEqual(agent.message_queue.queue[0]['content']['m2'], 2) - - agent.destroy() - def test_flush(self): - server = TestServer(5004) + server = TestServer(5005) server.start() stackimpact._agent = None agent = stackimpact.start( - dashboard_address = 'http://localhost:5004', + dashboard_address = 'http://localhost:5005', agent_key = 'key1', app_name = 'TestPythonApp', debug = True @@ -62,24 +34,25 @@ def test_flush(self): } agent.message_queue.add('t1', m) + agent.message_queue.queue[0]['added_at'] = timestamp() - 20 * 60 + agent.message_queue.flush() data = json.loads(server.get_request_data()) - self.assertEqual(data['payload']['messages'][0]['content']['m1'], 1) - self.assertEqual(data['payload']['messages'][1]['content']['m2'], 2) + self.assertEqual(data['payload']['messages'][0]['content']['m2'], 2) agent.destroy() server.join() def test_flush_fail(self): - server = TestServer(5005) + server = TestServer(5006) server.set_response_data("unparsablejson") server.start() stackimpact._agent = None agent = stackimpact.start( - dashboard_address = 'http://localhost:5005', + dashboard_address = 'http://localhost:5006', agent_key = 'key1', app_name = 'TestPythonApp', debug = True 
diff --git a/tests/metric_test.py b/tests/metric_test.py index 53cc3e0..9d112ec 100644 --- a/tests/metric_test.py +++ b/tests/metric_test.py @@ -33,7 +33,7 @@ def test_counter_metric(self): - def test_breakdown_filter(self): + def test_profile_filter(self): root = Breakdown('root') root.measurement = 10 @@ -56,7 +56,7 @@ def test_breakdown_filter(self): self.assertFalse(child2.find_child('child2child1')) - def test_breakdown_depth(self): + def test_profile_depth(self): root = Breakdown("root") child1 = Breakdown("child1") @@ -73,7 +73,7 @@ def test_breakdown_depth(self): self.assertEqual(child2.depth(), 2) - def test_breakdown_p95(self): + def test_profile_p95(self): root = Breakdown("root") child1 = Breakdown("child1") @@ -94,7 +94,7 @@ def test_breakdown_p95(self): self.assertEqual(root.measurement, 6.5) - def test_breakdown_p95_big(self): + def test_profile_p95_big(self): root = Breakdown("root") for i in range(0, 10000): diff --git a/tests/profiler_scheduler_test.py b/tests/profiler_scheduler_test.py deleted file mode 100644 index 686cd37..0000000 --- a/tests/profiler_scheduler_test.py +++ /dev/null @@ -1,47 +0,0 @@ -import unittest -import sys -import random -import time - -import stackimpact -from stackimpact.profiler_scheduler import ProfilerScheduler - - -class ProfilerSchedulerTestCase(unittest.TestCase): - - - def test_start_profiler(self): - stackimpact._agent = None - agent = stackimpact.start( - dashboard_address = 'http://localhost:5001', - agent_key = 'key1', - app_name = 'TestPythonApp', - debug = True - ) - - stats = { - "records": 0, - "reports": 0, - } - - def record_func(duration): - stats["records"] += 1 - - def report_func(): - stats["reports"] += 1 - - ps = ProfilerScheduler(agent, 0.010, 0.002, 0.050, record_func, report_func) - ps.start() - - time.sleep(0.150) - - self.assertFalse(stats["records"] < 10) - self.assertFalse(stats["reports"] < 2) - - ps.destroy() - agent.destroy() - - - -if __name__ == '__main__': - unittest.main() diff --git 
a/tests/profilers/__init__.py b/tests/profilers/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/reporters/allocation_reporter_test.py b/tests/profilers/allocation_profiler_test.py similarity index 63% rename from tests/reporters/allocation_reporter_test.py rename to tests/profilers/allocation_profiler_test.py index 96f9e60..6f20754 100644 --- a/tests/reporters/allocation_reporter_test.py +++ b/tests/profilers/allocation_profiler_test.py @@ -5,13 +5,13 @@ import threading import stackimpact -from stackimpact.runtime import min_version +from stackimpact.runtime import min_version, runtime_info -class AllocationReporterTestCase(unittest.TestCase): +class AllocationProfilerTestCase(unittest.TestCase): def test_record_allocation_profile(self): - if not min_version(3, 4): + if runtime_info.OS_WIN or not min_version(3, 4): return stackimpact._agent = None @@ -19,9 +19,12 @@ def test_record_allocation_profile(self): dashboard_address = 'http://localhost:5001', agent_key = 'key1', app_name = 'TestPythonApp', + auto_profiling = False, debug = True ) + agent.allocation_reporter.profiler.reset() + mem1 = [] def mem_leak(n = 100000): mem2 = [] @@ -43,7 +46,9 @@ def mem_leak5(): result = {} def record(): - agent.allocation_reporter.record(2) + agent.allocation_reporter.profiler.start_profiler() + time.sleep(2) + agent.allocation_reporter.profiler.stop_profiler() t = threading.Thread(target=record) t.start() @@ -53,9 +58,10 @@ def record(): t.join() - #print(agent.allocation_reporter.profile) + profile = agent.allocation_reporter.profiler.build_profile(2)[0]['profile'].to_dict() + #print(str(profile)) - self.assertTrue('allocation_reporter_test.py' in str(agent.allocation_reporter.profile)) + self.assertTrue('allocation_profiler_test.py' in str(profile)) agent.destroy() diff --git a/tests/reporters/block_reporter_test.py b/tests/profilers/block_profiler_test.py similarity index 76% rename from tests/reporters/block_reporter_test.py rename to 
tests/profilers/block_profiler_test.py index e348a73..e1cd8de 100644 --- a/tests/reporters/block_reporter_test.py +++ b/tests/profilers/block_profiler_test.py @@ -19,17 +19,22 @@ from test_server import TestServer -class BlockReporterTestCase(unittest.TestCase): - +class BlockProfilerTestCase(unittest.TestCase): def test_record_block_profile(self): + if runtime_info.OS_WIN: + return + stackimpact._agent = None agent = stackimpact.start( dashboard_address = 'http://localhost:5001', agent_key = 'key1', app_name = 'TestPythonApp', + auto_profiling = False, debug = True ) + agent.block_reporter.profiler.reset() + lock = threading.Lock() event = threading.Event() @@ -64,8 +69,9 @@ def url_wait(): result = {} def record(): - agent.frame_selector.add_http_frame_regexp(os.path.join('tests', 'test_server.py')) - agent.block_reporter.record(2) + agent.block_reporter.profiler.start_profiler() + time.sleep(2) + agent.block_reporter.profiler.stop_profiler() record_t = threading.Thread(target=record) record_t.start() @@ -95,13 +101,12 @@ def record(): record_t.join() - #print(agent.block_reporter.block_profile) - #print(agent.block_reporter.http_profile) + profile = agent.block_reporter.profiler.build_profile(2)[0]['profile'].to_dict() + #print(profile) - self.assertTrue('lock_wait' in str(agent.block_reporter.block_profile)) - self.assertTrue('event_wait' in str(agent.block_reporter.block_profile)) - self.assertTrue('url_wait' in str(agent.block_reporter.block_profile)) - self.assertTrue('handler' in str(agent.block_reporter.http_profile)) + self.assertTrue('lock_wait' in str(profile)) + self.assertTrue('event_wait' in str(profile)) + self.assertTrue('url_wait' in str(profile)) agent.destroy() diff --git a/tests/reporters/cpu_reporter_test.py b/tests/profilers/cpu_profiler_test.py similarity index 63% rename from tests/reporters/cpu_reporter_test.py rename to tests/profilers/cpu_profiler_test.py index 67f411b..1ea25b6 100644 --- a/tests/reporters/cpu_reporter_test.py +++ 
b/tests/profilers/cpu_profiler_test.py @@ -10,10 +10,10 @@ from stackimpact.runtime import runtime_info -class CPUReporterTestCase(unittest.TestCase): +class CPUProfilerTestCase(unittest.TestCase): def test_record_profile(self): - if not runtime_info.OS_LINUX and not runtime_info.OS_DARWIN: + if runtime_info.OS_WIN: return stackimpact._agent = None @@ -21,11 +21,17 @@ def test_record_profile(self): dashboard_address = 'http://localhost:5001', agent_key = 'key1', app_name = 'TestPythonApp', + auto_profiling = False, debug = True ) + agent.cpu_reporter.profiler.reset() + def record(): - agent.cpu_reporter.record(2) + agent.cpu_reporter.profiler.start_profiler() + time.sleep(2) + agent.cpu_reporter.profiler.stop_profiler() + record_t = threading.Thread(target=record) record_t.start() @@ -38,9 +44,10 @@ def cpu_work_main_thread(): record_t.join() - #print(agent.cpu_reporter.profile) + profile = agent.cpu_reporter.profiler.build_profile(2)[0]['profile'].to_dict() + #print(profile) - self.assertTrue('cpu_work_main_thread' in str(agent.cpu_reporter.profile)) + self.assertTrue('cpu_work_main_thread' in str(profile)) agent.destroy() diff --git a/tests/reporters/error_reporter_test.py b/tests/reporters/error_reporter_test.py index 6818d10..f8ec719 100644 --- a/tests/reporters/error_reporter_test.py +++ b/tests/reporters/error_reporter_test.py @@ -20,6 +20,7 @@ def test_add_exception(self): app_name = 'TestPythonApp', debug = True ) + agent.error_reporter.start() try: raise ValueError('test_exc_1') @@ -29,7 +30,7 @@ def test_add_exception(self): time.sleep(1.1) profile_handled_exc = agent.error_reporter.profile - print(profile_handled_exc) + #print(profile_handled_exc) self.assertTrue('ValueError: test_exc_1' in str(profile_handled_exc)) self.assertTrue('test_add_exception' in str(profile_handled_exc)) diff --git a/tests/reporters/process_reporter_test.py b/tests/reporters/process_reporter_test.py index a15fc72..4c1280d 100644 --- a/tests/reporters/process_reporter_test.py 
+++ b/tests/reporters/process_reporter_test.py @@ -18,6 +18,7 @@ def test_report(self): app_name = 'TestPythonApp', debug = True ) + agent.process_reporter.start() agent.process_reporter.report() time.sleep(0.1) @@ -25,10 +26,13 @@ def test_report(self): metrics = agent.process_reporter.metrics - self.is_valid(metrics, Metric.TYPE_COUNTER, Metric.CATEGORY_CPU, Metric.NAME_CPU_TIME, 0, float("inf")) - self.is_valid(metrics, Metric.TYPE_STATE, Metric.CATEGORY_CPU, Metric.NAME_CPU_USAGE, 0, float("inf")) + if not runtime_info.OS_WIN: + self.is_valid(metrics, Metric.TYPE_COUNTER, Metric.CATEGORY_CPU, Metric.NAME_CPU_TIME, 0, float("inf")) + self.is_valid(metrics, Metric.TYPE_STATE, Metric.CATEGORY_CPU, Metric.NAME_CPU_USAGE, 0, float("inf")) + + if not runtime_info.OS_WIN: + self.is_valid(metrics, Metric.TYPE_STATE, Metric.CATEGORY_MEMORY, Metric.NAME_MAX_RSS, 0, float("inf")) - self.is_valid(metrics, Metric.TYPE_STATE, Metric.CATEGORY_MEMORY, Metric.NAME_MAX_RSS, 0, float("inf")) if runtime_info.OS_LINUX: self.is_valid(metrics, Metric.TYPE_STATE, Metric.CATEGORY_MEMORY, Metric.NAME_CURRENT_RSS, 0, float("inf")) self.is_valid(metrics, Metric.TYPE_STATE, Metric.CATEGORY_MEMORY, Metric.NAME_VM_SIZE, 0, float("inf")) diff --git a/tests/reporters/profile_reporter_test.py b/tests/reporters/profile_reporter_test.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/reporters/span_reporter_test.py b/tests/reporters/span_reporter_test.py new file mode 100644 index 0000000..30c7afa --- /dev/null +++ b/tests/reporters/span_reporter_test.py @@ -0,0 +1,41 @@ + +import time +import unittest +import random +import threading +import sys +import traceback + +import stackimpact +from stackimpact.runtime import min_version + + +class SpanReporterTestCase(unittest.TestCase): + + def test_record_span(self): + stackimpact._agent = None + agent = stackimpact.start( + dashboard_address = 'http://localhost:5001', + agent_key = 'key1', + app_name = 'TestPythonApp', + debug = True + 
) + agent.span_reporter.start() + + for i in range(10): + agent.span_reporter.record_span("span1", 10); + + span_counters = agent.span_reporter.span_counters; + agent.span_reporter.report(); + + counter = span_counters['span1'] + #print(counter) + + self.assertEqual(counter.name, 'span1') + self.assertEqual(counter.measurement, 10000) + + agent.destroy() + + +if __name__ == '__main__': + unittest.main() diff --git a/tests/runtime_test.py b/tests/runtime_test.py new file mode 100644 index 0000000..e97ee11 --- /dev/null +++ b/tests/runtime_test.py @@ -0,0 +1,47 @@ +import unittest +import signal +import os + +import stackimpact +from stackimpact.runtime import runtime_info, register_signal + + +class RuntimeTestCase(unittest.TestCase): + + def test_register_signal(self): + if runtime_info.OS_WIN: + return + + result = {'handler': 0} + + def _handler(signum, frame): + result['handler'] += 1 + + register_signal(signal.SIGUSR1, _handler) + + os.kill(os.getpid(), signal.SIGUSR1) + os.kill(os.getpid(), signal.SIGUSR1) + + signal.signal(signal.SIGUSR1, signal.SIG_DFL) + + self.assertEqual(result['handler'], 2) + + + '''def test_register_signal_default(self): + result = {'handler': 0} + + def _handler(signum, frame): + result['handler'] += 1 + + register_signal(signal.SIGUSR1, _handler, once = True) + + os.kill(os.getpid(), signal.SIGUSR1) + os.kill(os.getpid(), signal.SIGUSR1) + + self.assertEqual(result['handler'], 1)''' + + +if __name__ == '__main__': + unittest.main() + + diff --git a/tests/test.py b/tests/test.py deleted file mode 100644 index e071622..0000000 --- a/tests/test.py +++ /dev/null @@ -1,29 +0,0 @@ - -import threading -import collections -import signal - - - -class Sampler(object): - def __init__(self, interval=0.001): - self.stack_counts = collections.defaultdict(int) - self.interval = 0.001 - - def _sample(self, signum, frame): - stack = [] - while frame is not None: - formatted_frame = '{}({})'.format(frame.f_code.co_name, - 
frame.f_globals.get('__name__')) - stack.append(formatted_frame) - frame = frame.f_back - - formatted_stack = ';'.join(reversed(stack)) - self.stack_counts[formatted_stack] += 1 - signal.setitimer(signal.ITIMER_PROF, self.interval, 0) - - def start(self): - signal.signal(signal.SIGPROF, self._sample) - - - \ No newline at end of file diff --git a/tests/test_server.py b/tests/test_server.py index 5bd13dc..c9e260f 100644 --- a/tests/test_server.py +++ b/tests/test_server.py @@ -21,6 +21,8 @@ def __init__(self, port, delay = None, handler_func = None): self.port = port RequestHandler.delay = delay RequestHandler.handler_func = [handler_func] + RequestHandler.response_data = '{}' + RequestHandler.response_code = 200 threading.Thread.__init__(self) self.server = HTTPServer(('localhost', self.port), RequestHandler) @@ -30,6 +32,9 @@ def get_request_data(self): def set_response_data(self, response_data): RequestHandler.response_data = response_data + def set_response_code(self, response_code): + RequestHandler.response_code = response_code + def run(self): self.server.handle_request() @@ -39,6 +44,7 @@ class RequestHandler(BaseHTTPRequestHandler): handler_func = None request_data = None response_data = '{}' + response_code = 200 def do_GET(self): @@ -48,7 +54,7 @@ def do_GET(self): if RequestHandler.handler_func: RequestHandler.handler_func[0]() - self.send_response(200) + self.send_response(RequestHandler.response_code) self.send_header('Content-Type', 'application/json') self.end_headers() self.wfile.write(RequestHandler.response_data.encode('utf-8')) @@ -64,7 +70,7 @@ def do_POST(self): decompressed_data = gzip.GzipFile(fileobj=BytesIO(self.rfile.read(content_len))).read() RequestHandler.request_data = decompressed_data.decode('utf-8') - self.send_response(200) + self.send_response(RequestHandler.response_code) self.send_header('Content-Type', 'application/json') self.end_headers() self.wfile.write(RequestHandler.response_data.encode('utf-8'))