diff --git a/CHANGELOG.md b/CHANGELOG.md index d37308b..f0fb89d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,32 @@ # Changelog +## v2.5.5 + +#### Enhancements + +- Support for new RiskIQ Illuminate Vulnerability Intelligence API endpoints in core API library. +- New `cves` property of AttackSurface objects finds vulnerabilities impacting assets within that + attack surface. Works identically for the primary (your own) attack surface and third-party + attack surfaces. +- New `AttackSurfaceCVEs` record list to contain a list of `AttackSurfaceCVE` objects, with properties + to access the vulnerability report, RiskIQ priority score, and list of impacted assets. +- New `VulnArticle` object to provide details on a CVE and discover the list of third-party vendors + with assets impacted by the vuln. Custom views in the article's `to_dataframe()` method render + dataframes focused on article references, component detections, and third-party impacts. +- New helper method `analyzer.AttackSurface()` to directly load an attack surface. Works without params to load + the main attack surface, with an ID to load a third-party vendor attack surface by ID, or with a string + to find an attack surface by vendor name. +- Re-organized Illuminate-specific code in the `analyzer` module into distinct files located under a + subpackage. Existing imports in client code should not be impacted. + + +#### Pull Requests + +- Publishes pull request #38 "Remove ez_setup dependancy." + + + + ## v2.5.4 #### Enhancements diff --git a/examples/notebooks/Attack Surface & Vulnerability Intelligence - RiskIQ API.ipynb b/examples/notebooks/Attack Surface & Vulnerability Intelligence - RiskIQ API.ipynb new file mode 100644 index 0000000..17b7570 --- /dev/null +++ b/examples/notebooks/Attack Surface & Vulnerability Intelligence - RiskIQ API.ipynb @@ -0,0 +1,898 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "source": [ + "# RiskIQ PassiveTotal Python Library" + ], + "metadata": {} + }, + { + "cell_type": "markdown", + "source": [ + "#### *Illuminate Attack Surface Intelligence (ASI)* including *Vulnerability Intel*" + ], + "metadata": {} + }, + { + "cell_type": "markdown", + "source": [ + "## Getting Started" + ], + "metadata": {} + }, + { + "cell_type": "markdown", + "source": [ + "This notebook leverages the RiskIQ Illuminate / PassiveTotal API through the `passivetotal` Python library. \n", + "\n", + "Documentation for the library, including how to install it and configure API keys, are available here:\n", + "https://passivetotal.readthedocs.io/en/latest/getting-started.html\n", + "\n", + "You will need API credentials to authenticate with the API server that provide access to the datasets queried in this notebook. Ask your RiskIQ contact for details or visit https://info.riskiq.net/ to contact the support team." + ], + "metadata": {} + }, + { + "cell_type": "markdown", + "source": [ + "### Optional Dependencies\n", + "\n", + "This notebook uses the `pandas` Python library primarily to improve the visual output of data tables retrieved from the API. You will need to install that library in your Python (virtual) environment (`pip install pandas`) or change the code examples to return a Python dictionary instead of a dataframe. Simply change `.as_df` to `.as_dict`.\n", + "\n", + "Note that some examples may use special features in `pandas` to filter or aggregate data, but these can also be implemented in pure Python." + ], + "metadata": {} + }, + { + "cell_type": "markdown", + "source": [ + "### Product Context\n", + "\n", + "https://www.riskiq.com/solutions/attack-surface-intelligence/" + ], + "metadata": {} + }, + { + "cell_type": "markdown", + "source": [ + "### Setup Notebook\n", + "*If this returns errors, ensure you have followed the Getting Started document linked above to install necessary dependencies and configure your API keys.*" + ], + "metadata": {} + }, + { + "cell_type": "code", + "execution_count": null, + "source": [ + "from passivetotal import analyzer\n", + "analyzer.init()" + ], + "outputs": [], + "metadata": {} + }, + { + "cell_type": "markdown", + "source": [ + "## Attack Surface Intelligence" + ], + "metadata": {} + }, + { + "cell_type": "markdown", + "source": [ + "### Your Attack Surface" + ], + "metadata": {} + }, + { + "cell_type": "markdown", + "source": [ + "Define a variable to store your organization's attack surface" + ], + "metadata": {} + }, + { + "cell_type": "code", + "execution_count": null, + "source": [ + "my_asi = analyzer.AttackSurface()\n", + "my_asi" + ], + "outputs": [], + "metadata": {} + }, + { + "cell_type": "markdown", + "source": [ + "The `my_asi` variable here now stores an instance of `AttackSurface` object. To learn what you can do with this object, place your cursor after the variable name, add a dot (.), and press the (tab) key. You'll see a menu of options. \n", + "\n", + "The complete list of properties is available in the [reference docs](\n", + "https://passivetotal.readthedocs.io/en/latest/illuminate.html#passivetotal.analyzer.illuminate.AttackSurface)." + ], + "metadata": {} + }, + { + "cell_type": "markdown", + "source": [ + "---\n", + "RiskIQ assesses your Attack Surface by analyzing a set of insights and testing whether the discovered assets in your Attack Surface are impacted by each insight. These impacted assets are listed as observations, and are grouped into three levels: high, medium, and low.\n", + "\n", + "To obtain the list of impacted assets, first enumerate the insights, either by a specific priority or across all priority levels. The most direct route is the `all_active_insights` property." + ], + "metadata": {} + }, + { + "cell_type": "code", + "execution_count": null, + "source": [ + "my_asi.all_active_insights.as_dict" + ], + "outputs": [], + "metadata": { + "tags": [] + } + }, + { + "cell_type": "markdown", + "source": [ + "> This property is filtered to only the insights with observations, but the API provides all insights, even those without observations. To see them, use the `all_insights` property instead." + ], + "metadata": {} + }, + { + "cell_type": "markdown", + "source": [ + "The `all_active_insights` property returns an object of type `AttackSurfaceInsights`. Complete details on the capability of this object are available [in the reference docs](https://passivetotal.readthedocs.io/en/latest/illuminate.html#passivetotal.analyzer.illuminate.AttackSurfaceInsights) and follow the same list of options available for most list-like Analyzer objects. \n", + "\n", + "To get started, loop through the `all_active_insights` property as if it was Python list. " + ], + "metadata": {} + }, + { + "cell_type": "code", + "execution_count": null, + "source": [ + "for insight in my_asi.all_active_insights:\n", + " print(insight)" + ], + "outputs": [], + "metadata": {} + }, + { + "cell_type": "markdown", + "source": [ + "The `all_active_insights` property returns an object of type `AttackSurfaceInsight` which can be printed like a string, but also offers additional properties. Use tab-completion here in Jupyter on one insight or consult [the docs](https://passivetotal.readthedocs.io/en/latest/illuminate.html#passivetotal.analyzer.illuminate.AttackSurfaceInsight).\n", + "\n", + "For example, we can sort the high-priority insights by reverse order of observations, select the first insight in the list, and look at the observations for that insight." + ], + "metadata": {} + }, + { + "cell_type": "code", + "execution_count": null, + "source": [ + "my_asi.high_priority_insights.sorted_by('observation_count', True)[0].observations" + ], + "outputs": [], + "metadata": {} + }, + { + "cell_type": "markdown", + "source": [ + "Observations are of type `AttackSurfaceObservations` which is also list-like in it's behavior. Complete details are in the [reference docs](https://passivetotal.readthedocs.io/en/latest/illuminate.html#passivetotal.analyzer.illuminate.AttackSurfaceObservations) but again, the easiest way to start is to simply iterate the list." + ], + "metadata": {} + }, + { + "cell_type": "code", + "execution_count": null, + "source": [ + "for obs in my_asi.high_priority_insights.sorted_by('observation_count', True)[0].observations:\n", + " print(obs)" + ], + "outputs": [], + "metadata": {} + }, + { + "cell_type": "markdown", + "source": [ + "Each observation is of type `AttackSurfaceObservation` and when printed simply shows the asset name, although many more details are available in [other properties](https://passivetotal.readthedocs.io/en/latest/illuminate.html#passivetotal.analyzer.illuminate.AttackSurfaceObservation) including the dates when the observation was last seen." + ], + "metadata": {} + }, + { + "cell_type": "markdown", + "source": [ + "---\n", + "Consider using pandas DataFrames if you are working with ASI interactively in a notebook. Virtually every object offers an `as_df` property which is especially useful for lists." + ], + "metadata": {} + }, + { + "cell_type": "code", + "execution_count": null, + "source": [ + "my_asi.high_priority_insights.as_df" + ], + "outputs": [], + "metadata": { + "tags": [] + } + }, + { + "cell_type": "code", + "execution_count": null, + "source": [ + "my_asi.high_priority_insights.only_active_insights[0].observations.as_df" + ], + "outputs": [], + "metadata": { + "tags": [] + } + }, + { + "cell_type": "markdown", + "source": [ + "> Notice the use of `only_active_insights` here to filter the list of insights to only those with observations. If you skip this step you may get an API error when you query for observations if none are available for that insight." + ], + "metadata": {} + }, + { + "cell_type": "markdown", + "source": [ + "### Third-Party (Vendor) Attack Surfaces" + ], + "metadata": {} + }, + { + "cell_type": "markdown", + "source": [ + "#### Load all third-party ASIs" + ], + "metadata": {} + }, + { + "cell_type": "markdown", + "source": [ + "Define a variable to store all third-party attack surfaces and load them from the API." + ], + "metadata": {} + }, + { + "cell_type": "code", + "execution_count": null, + "source": [ + "vendor_asi = analyzer.illuminate.AttackSurfaces.load()\n", + "vendor_asi" + ], + "outputs": [], + "metadata": { + "tags": [] + } + }, + { + "cell_type": "markdown", + "source": [ + "> The list of third-party vendors is defined in your account settings in consultation with your RiskIQ account team. There are no options to change the composition of the list in the API." + ], + "metadata": {} + }, + { + "cell_type": "markdown", + "source": [ + "The object returned is of type `AttackSurfaces` - this can be treated as a list, filtered, or displayed in several ways. Full details are in the [reference docs](\n", + "https://passivetotal.readthedocs.io/en/latest/illuminate.html#passivetotal.analyzer.illuminate.AttackSurfaces).\n", + "\n", + "If you have a very large list of third-party vendors, the API will return the data one page at a time, but that will be handled automatically by the Python library.\n", + "\n", + "This will return a list of third-party vendors (associated with Third-Party Intelligence module) and other third-party metadata (attack surface id, name of the vendor, if the name of the organization is your own, if the attack surface is a third-party vendor, number of active high priority, medium priority, and low priority assets linked to insight detected observations. " + ], + "metadata": {} + }, + { + "cell_type": "markdown", + "source": [ + "You can iterate through the list as with any Python list, or if you have `pandas` installed, use the `as_df` property to see a dataframe in your notebook." + ], + "metadata": {} + }, + { + "cell_type": "code", + "execution_count": null, + "source": [ + "vendor_asi.as_df" + ], + "outputs": [], + "metadata": { + "tags": [] + } + }, + { + "cell_type": "markdown", + "source": [ + "___\n", + "#### Load a specific vendor\n", + "\n", + "To easily load an attack surface for a specific vendor, use the same shortcut method on the `analyzer` module you used to load your own attack surface, but supply a RiskIQ-assigned vendor ID instead." + ], + "metadata": {} + }, + { + "cell_type": "code", + "execution_count": null, + "source": [ + "vendor_asi = analyzer.AttackSurface(51620)\n", + "vendor_asi" + ], + "outputs": [], + "metadata": {} + }, + { + "cell_type": "markdown", + "source": [ + "Alternatively, pass a string to the `analyzer.AttackSurface()` method. This will load all the third-party Attack Surfaces for your account and perform a case-insensitive substring search on their names. You'll get an error message if no vendors match or if more than one vendor matched." + ], + "metadata": {} + }, + { + "cell_type": "code", + "execution_count": null, + "source": [ + "analyzer.AttackSurface('tencent')" + ], + "outputs": [], + "metadata": {} + }, + { + "cell_type": "markdown", + "source": [ + "> Although this is a handy way to find an attack surface interactively, it is not recommended for use in automated scripts because of the extra round-trip to the API server to get the list of attack surfaces, the extra memory to contain them, and the extra CPU cycles to filter them. Instead, consider storing vendor ID's in your own system and iterating through them explicitly." + ], + "metadata": {} + }, + { + "cell_type": "markdown", + "source": [ + "The unique identifier for a third-party vendor is available in the `id` property. Use that ID to load the attack surface directly." + ], + "metadata": {} + }, + { + "cell_type": "code", + "execution_count": null, + "source": [ + "analyzer.AttackSurface('tencent').id" + ], + "outputs": [], + "metadata": { + "tags": [] + } + }, + { + "cell_type": "code", + "execution_count": null, + "source": [ + "focus_asi = analyzer.AttackSurface(71172)\n", + "focus_asi" + ], + "outputs": [], + "metadata": {} + }, + { + "cell_type": "markdown", + "source": [ + "Once you load the third-party attack surface, you can interact with it exactly as you did for your own attack surface earlier in this notebook. Here, we display a vendor's attack surface as a dataframe to see a quick snapshot of meaningful insights." + ], + "metadata": {} + }, + { + "cell_type": "code", + "execution_count": null, + "source": [ + "focus_asi.as_df.T" + ], + "outputs": [], + "metadata": {} + }, + { + "cell_type": "markdown", + "source": [ + "> The `T` property of pandas dataframes rotates the table 90 degrees which improves formatting when you only have one row of data." + ], + "metadata": {} + }, + { + "cell_type": "markdown", + "source": [ + "We can return all active insights with the `all_active_insights` property." + ], + "metadata": {} + }, + { + "cell_type": "code", + "execution_count": null, + "source": [ + "focus_asi.all_active_insights.as_df" + ], + "outputs": [], + "metadata": {} + }, + { + "cell_type": "markdown", + "source": [ + "> Remember, we are using the `as_df` property to improve usability in a notebook context, but you can easily access the underlying objects, either by iterating through the `focus_asi.all_active_insights` property, or using the `as_dict` property instead of `as_df` to get the data as a regular Python dictionary." + ], + "metadata": {} + }, + { + "cell_type": "markdown", + "source": [ + "Insights can be treated like strings to make printing them easier, but remember there are more fields available on each insight." + ], + "metadata": {} + }, + { + "cell_type": "code", + "execution_count": null, + "source": [ + "for insight in focus_asi.all_active_insights:\n", + " print(insight)" + ], + "outputs": [], + "metadata": {} + }, + { + "cell_type": "markdown", + "source": [ + "---\n", + "Using simple string matching, we can search a vendor's attack surface for a specific insight." + ], + "metadata": {} + }, + { + "cell_type": "code", + "execution_count": null, + "source": [ + "for insight in focus_asi.all_active_insights:\n", + " if insight.name == 'ASI: REvil Ransomware Actors Exploit Kaseya VSA Software in Broad Supply Chain Attack':\n", + " for obs in insight.observations:\n", + " print (obs)" + ], + "outputs": [], + "metadata": {} + }, + { + "cell_type": "markdown", + "source": [ + "The `all_active_insights` property of an `AttackSurface` object offers a number of filtering options, including `filter_substring` that performs a case-insensitive match on any string field in the objects in that list. This is a property available on most `RecordList` type objects in the Analyzer." + ], + "metadata": {} + }, + { + "cell_type": "code", + "execution_count": null, + "source": [ + "for insight in focus_asi.all_active_insights.filter_substring(name='kaseya'):\n", + " for obs in insight.observations:\n", + " print(obs)" + ], + "outputs": [], + "metadata": {} + }, + { + "cell_type": "markdown", + "source": [ + "We can apply the same technique to search across all vendor attack surfaces. Here, we iterate (loop through) the `vendor_asi` variable we stored earlier that contains all third-party attack surfces, and then store the length of the insight list that matches our keyword. " + ], + "metadata": {} + }, + { + "cell_type": "code", + "execution_count": null, + "source": [ + "for vendor in vendor_asi:\n", + " kaseya_insights = len(vendor.all_active_insights.filter_substring(name='kaseya'))\n", + " print(vendor.name, kaseya_insights) " + ], + "outputs": [], + "metadata": { + "tags": [] + } + }, + { + "cell_type": "markdown", + "source": [ + "## Vulnerability Intelligence\n", + "\n", + "RiskIQ's Vulnerability Intelligence (Vuln Intel) provides a practical picture of vulnerability risk, focused on a specific Attack Surface (your own or a third-party vendor). " + ], + "metadata": {} + }, + { + "cell_type": "markdown", + "source": [ + "---\n", + "### CVEs for your Attack Surface\n", + "\n", + "In the `analyzer` module, Vuln Intel is offered primarily through the `cves` property of an Attack Surface. " + ], + "metadata": {} + }, + { + "cell_type": "code", + "execution_count": null, + "source": [ + "analyzer.AttackSurface().cves" + ], + "outputs": [], + "metadata": {} + }, + { + "cell_type": "markdown", + "source": [ + "The returned object is of type `AttackSurfaceCVEs` that can be iterated just like any other `analyzer` record list." + ], + "metadata": {} + }, + { + "cell_type": "code", + "execution_count": null, + "source": [ + "for cve in analyzer.AttackSurface().cves:\n", + " print(cve)" + ], + "outputs": [], + "metadata": {} + }, + { + "cell_type": "markdown", + "source": [ + "Consider using `pandas` dataframes for a friendlier view." + ], + "metadata": {} + }, + { + "cell_type": "code", + "execution_count": null, + "source": [ + "analyzer.AttackSurface().cves.as_df" + ], + "outputs": [], + "metadata": {} + }, + { + "cell_type": "markdown", + "source": [ + "---\n", + "### CVEs for third-party vendors\n", + "\n", + "The `cves` property of AttackSurface objects also works with third-party (vendor) attack surfaces to discover vulnerabilites and impacted assets within other attack surfaces." + ], + "metadata": {} + }, + { + "cell_type": "code", + "execution_count": null, + "source": [ + "analyzer.AttackSurface('rhythm').cves.as_df" + ], + "outputs": [], + "metadata": {} + }, + { + "cell_type": "markdown", + "source": [ + "---\n", + "### CVE Observations\n", + "\n", + "An Observation is a discovered asset (i.e. hostname or IP address) with in a specific Attack Surface that is impacted by a vulnerability. Access the `observations` property of a specific CVE to get the complete list." + ], + "metadata": {} + }, + { + "cell_type": "code", + "execution_count": null, + "source": [ + "focus_cve = (\n", + " analyzer\n", + " .AttackSurface()\n", + " .cves\n", + " .filter_fn(lambda c: c.score > 80 and c.observation_count > 50)\n", + " .sorted_by('score',reverse=True)[0]\n", + ")\n", + "focus_cve.as_df.T" + ], + "outputs": [], + "metadata": {} + }, + { + "cell_type": "markdown", + "source": [ + "> Here, we used the `filter_fn` method available on all analyzer RecordLists to apply a custom test to each CVE that filters only those with a score greater than 80 and more observations than 50. The `[0]` syntax gives us the first item on the list." + ], + "metadata": {} + }, + { + "cell_type": "markdown", + "source": [ + "Obtain the list of impacted assets with the `observations` property of the CVE." + ], + "metadata": {} + }, + { + "cell_type": "code", + "execution_count": null, + "source": [ + "focus_cve.observations" + ], + "outputs": [], + "metadata": {} + }, + { + "cell_type": "code", + "execution_count": null, + "source": [ + "for obs in focus_cve.observations:\n", + " print(obs)" + ], + "outputs": [], + "metadata": {} + }, + { + "cell_type": "markdown", + "source": [ + "Each observation can be printed as a string, which directly accesses the identifier for the impacted asset (the IP address or the hostname, for example), but the `obs` object has more properties available. " + ], + "metadata": {} + }, + { + "cell_type": "code", + "execution_count": null, + "source": [ + "focus_cve.observations[0].firstseen, focus_cve.observations[0].lastseen, focus_cve.observations[0].type" + ], + "outputs": [], + "metadata": {} + }, + { + "cell_type": "markdown", + "source": [ + "Or, view the entire list of observations as a dataframe." + ], + "metadata": {} + }, + { + "cell_type": "code", + "execution_count": null, + "source": [ + "focus_cve.observations.as_df" + ], + "outputs": [], + "metadata": {} + }, + { + "cell_type": "markdown", + "source": [ + "---\n", + "### Vulnerability Intelligence Articles\n", + "\n", + "Complete details on a vulnerability is available through the top-level `VulnArticle` objects. You can access them from the `article` property of a CVE or instantiate them directly if you already know the CVE identifier." + ], + "metadata": {} + }, + { + "cell_type": "markdown", + "source": [ + "#### Load a vuln article by ID" + ], + "metadata": {} + }, + { + "cell_type": "code", + "execution_count": null, + "source": [ + "article = analyzer.illuminate.VulnArticle.load('CVE-2021-23017')\n", + "print(article.description)" + ], + "outputs": [], + "metadata": {} + }, + { + "cell_type": "code", + "execution_count": null, + "source": [ + "article.as_df.T" + ], + "outputs": [], + "metadata": {} + }, + { + "cell_type": "markdown", + "source": [ + "#### Access a vuln article from a CVE" + ], + "metadata": {} + }, + { + "cell_type": "code", + "execution_count": null, + "source": [ + "focus_article = focus_cve.article\n", + "focus_article.to_dataframe()" + ], + "outputs": [], + "metadata": {} + }, + { + "cell_type": "markdown", + "source": [ + "> The `to_dataframe()` method usually operates in the background when you access the `as_df` property on `analyzer` objects, but for some objects it provides additional functionality. We can use the `view` param to access other lists in the article." + ], + "metadata": {} + }, + { + "cell_type": "code", + "execution_count": null, + "source": [ + "focus_article.to_dataframe(view='references')" + ], + "outputs": [], + "metadata": {} + }, + { + "cell_type": "code", + "execution_count": null, + "source": [ + "focus_article.to_dataframe(view='components')" + ], + "outputs": [], + "metadata": {} + }, + { + "cell_type": "code", + "execution_count": null, + "source": [ + "focus_article.to_dataframe(view='impacts')" + ], + "outputs": [], + "metadata": {} + }, + { + "cell_type": "markdown", + "source": [ + "#### Impacted Assets\n", + "\n", + "One of the most powerful features of the Vulnerability Intelligence module is to quickly assess whether your attack surface is impacted by a vulnerability, and also whether any third-party (vendor) attack surfaces may be impacted. You can obtain this information from the `cves` property of a given attack surface, as described above, but you can also access it directly from an article. \n", + "\n", + "The article provides an `observation_count` and `observations` properties that are focused on your own attack surface." + ], + "metadata": {} + }, + { + "cell_type": "code", + "execution_count": null, + "source": [ + "focus_article.observation_count" + ], + "outputs": [], + "metadata": {} + }, + { + "cell_type": "code", + "execution_count": null, + "source": [ + "len(focus_article.observations)" + ], + "outputs": [], + "metadata": {} + }, + { + "cell_type": "markdown", + "source": [ + "> We are showing the length of the observations list, but of course you can also access the list directly." + ], + "metadata": {} + }, + { + "cell_type": "markdown", + "source": [ + "To view which third-party vendors have assets impacted by the vulnerability, access the `attack_surfaces` property. These are returned as a list of `VulnArticleImpacts` that provide the vendor's name and the count of impacted assets." + ], + "metadata": {} + }, + { + "cell_type": "code", + "execution_count": null, + "source": [ + "focus_article.attack_surfaces" + ], + "outputs": [], + "metadata": {} + }, + { + "cell_type": "code", + "execution_count": null, + "source": [ + "for vendor in focus_article.attack_surfaces:\n", + " print(vendor)" + ], + "outputs": [], + "metadata": {} + }, + { + "cell_type": "markdown", + "source": [ + "The list of impacted assets (observations) is available on a given vendor's `VulnArticleImpact` object in the `observation_count` and `observations` properties, just like with our own attack surface." + ], + "metadata": {} + }, + { + "cell_type": "code", + "execution_count": null, + "source": [ + "impacted_vendor = focus_article.attack_surfaces.filter_substring(vendor_name='union')[0]\n", + "impacted_vendor.observation_count" + ], + "outputs": [], + "metadata": {} + }, + { + "cell_type": "code", + "execution_count": null, + "source": [ + "impacted_vendor.observations.as_df" + ], + "outputs": [], + "metadata": {} + }, + { + "cell_type": "markdown", + "source": [ + "#### Access related OSINT Articles\n", + "\n", + "Many CVEs are related to open-source threat intelligence articles collected and published by RiskIQ analysts. These articles are available through the `analyzer.AllArticles` object, but you can also obtain related articles directly from a Vulnerability Article." + ], + "metadata": {} + }, + { + "cell_type": "code", + "execution_count": null, + "source": [ + "focus_article.osint_articles.as_df" + ], + "outputs": [], + "metadata": {} + }, + { + "cell_type": "code", + "execution_count": null, + "source": [], + "outputs": [], + "metadata": {} + } + ], + "metadata": { + "kernelspec": { + "name": "python3", + "display_name": "Python 3.8.2 64-bit ('venv': venv)" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.8.2" + }, + "interpreter": { + "hash": "48a94fd9c3faed58921e8eeafd6133f2080fb9f3fec1af84aa81ec152ca66bb0" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} \ No newline at end of file diff --git a/examples/notebooks/Attack Surface Intelligence (ASI) - RiskIQ API.ipynb b/examples/notebooks/Attack Surface Intelligence (ASI) - RiskIQ API.ipynb deleted file mode 100644 index 2b2d9df..0000000 --- a/examples/notebooks/Attack Surface Intelligence (ASI) - RiskIQ API.ipynb +++ /dev/null @@ -1,518 +0,0 @@ -{ - "cells": [ - { - "cell_type": "markdown", - "id": "d19be504-6baf-4e12-9726-7386b8906c5b", - "metadata": {}, - "source": [ - "# RiskIQ PassiveTotal Python Library" - ] - }, - { - "cell_type": "markdown", - "id": "875d1455-8044-48f8-882a-c0846e23711a", - "metadata": {}, - "source": [ - "#### *Illuminate Attack Surface Intelligence (ASI)*" - ] - }, - { - "cell_type": "markdown", - "id": "5f58e227-2051-45b7-9739-43de8b2aa2b5", - "metadata": {}, - "source": [ - "## Getting Started" - ] - }, - { - "cell_type": "markdown", - "id": "f347eda2-d21e-4371-ac96-b59a211b9cd4", - "metadata": {}, - "source": [ - "This notebook leverages the RiskIQ Illuminate / PassiveTotal API through the `passivetotal` Python library. \n", - "\n", - "Documentation for the library, including how to install it and configure API keys, are available here:\n", - "https://passivetotal.readthedocs.io/en/latest/getting-started.html\n", - "\n", - "You will need API credentials to authenticate with the API server that provide access to the datasets queried in this notebook. Ask your RiskIQ contact for details or visit https://info.riskiq.net/ to contact the support team." - ] - }, - { - "cell_type": "markdown", - "id": "f9640d09-3008-4328-9ab8-5320c9e3abeb", - "metadata": {}, - "source": [ - "### Optional Dependencies\n", - "\n", - "This notebook uses the `pandas` Python library primarily to improve the visual output of data tables retrieved from the API. You will need to install that library in your Python (virtual) environment (`pip install pandas`) or change the code examples to return a Python dictionary instead of a dataframe. Simply change `.as_df` to `.as_dict`.\n", - "\n", - "Note that some examples may use special features in `pandas` to filter or aggregate data, but these can also be implemented in pure Python." - ] - }, - { - "cell_type": "markdown", - "id": "a8abf502-701e-4bba-98fd-b874662e5a55", - "metadata": {}, - "source": [ - "### Product Context\n", - "\n", - "https://www.riskiq.com/solutions/attack-surface-intelligence/" - ] - }, - { - "cell_type": "markdown", - "id": "615231b1-6a0c-4e27-9c02-2e6d44b0ea2d", - "metadata": {}, - "source": [ - "### Setup Notebook\n", - "*If this returns errors, ensure you have followed the Getting Started document linked above to install necessary dependencies and configure your API keys.*" - ] - }, - { - "cell_type": "code", - "execution_count": 15, - "id": "9ac4f5fa-d2e1-4f7a-b212-ade74eaa2c04", - "metadata": {}, - "outputs": [], - "source": [ - "from passivetotal import analyzer\n", - "analyzer.init()" - ] - }, - { - "cell_type": "markdown", - "id": "9dcadca5-9476-41c2-882a-ddf4ceba7b77", - "metadata": {}, - "source": [ - "## Attack Surface Intelligence" - ] - }, - { - "cell_type": "markdown", - "id": "ea95a33f-b343-42a9-97d3-08715f5eb9c2", - "metadata": {}, - "source": [ - "### Your Attack Surface" - ] - }, - { - "cell_type": "markdown", - "id": "answering-pipeline", - "metadata": {}, - "source": [ - "Define a variable to store your organization's attack surface" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "483b4055-2172-45fe-a225-6234fd2b1c12", - "metadata": {}, - "outputs": [], - "source": [ - "my_asi = analyzer.illuminate.AttackSurface.load()\n", - "my_asi" - ] - }, - { - "cell_type": "markdown", - "id": "e672583e-0d9e-4201-b7f0-cdd3bc49ca05", - "metadata": {}, - "source": [ - "The `my_asi` variable here now stores an instance of `AttackSurface` object. To learn what you can do with this object, place your cursor after the variable name, add a dot (.), and press the (tab) key. You'll see a menu of options. \n", - "\n", - "The complete list of properties is available in the [reference docs](\n", - "https://passivetotal.readthedocs.io/en/latest/illuminate.html#passivetotal.analyzer.illuminate.AttackSurface)." - ] - }, - { - "cell_type": "markdown", - "id": "controlling-tyler", - "metadata": {}, - "source": [ - "---\n", - "RiskIQ assesses your Attack Surface by analyzing a set of insights and testing whether the discovered assets in your Attack Surface are impacted by each insight. These impacted assets are listed as observations, and are grouped into three levels: high, medium, and low.\n", - "\n", - "To obtain the list of impacted assets, first enumerate the insights, either by a specific priority or across all priority levels. The most direct route is the `all_active_insights` property." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "8578db6f-d959-4e80-b7ec-65ca41bca162", - "metadata": { - "tags": [] - }, - "outputs": [], - "source": [ - "my_asi.all_active_insights.as_dict" - ] - }, - { - "cell_type": "markdown", - "id": "c9222dc9-f4ed-4aef-8d24-0ca93643e421", - "metadata": {}, - "source": [ - "> This property is filtered to only the insights with observations, but the API provides all insights, even those without observations. To see them, use the `all_insights` property instead." - ] - }, - { - "cell_type": "markdown", - "id": "3f95e252-c309-432d-b552-9f2fb95f1260", - "metadata": {}, - "source": [ - "The `all_active_insights` property returns an object of type `AttackSurfaceInsights`. Complete details on the capability of this object are available [in the reference docs](https://passivetotal.readthedocs.io/en/latest/illuminate.html#passivetotal.analyzer.illuminate.AttackSurfaceInsights) and follow the same list of options available for most list-like Analyzer objects. \n", - "\n", - "To get started, loop through the `all_active_insights` property as if it was Python list. " - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "compound-boating", - "metadata": {}, - "outputs": [], - "source": [ - "for insight in my_asi.all_active_insights:\n", - " print(insight)" - ] - }, - { - "cell_type": "markdown", - "id": "unknown-collaboration", - "metadata": {}, - "source": [ - "The `all_active_insights` property returns an object of type `AttackSurfaceInsight` which can be printed like a string, but also offers additional properties. Use tab-completion here in Jupyter on one insight or consult [the docs](https://passivetotal.readthedocs.io/en/latest/illuminate.html#passivetotal.analyzer.illuminate.AttackSurfaceInsight).\n", - "\n", - "For example, we can sort the high-priority insights by reverse order of observations, select the first insight in the list, and look at the observations for that insight." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "e733df5c-42e8-4c73-9844-0cbb418694a3", - "metadata": {}, - "outputs": [], - "source": [ - "my_asi.high_priority_insights.sorted_by('observation_count', True)[0].observations" - ] - }, - { - "cell_type": "markdown", - "id": "4d728531-0674-48df-9a4e-dfad9af0f4cf", - "metadata": {}, - "source": [ - "Observations are of type `AttackSurfaceObservations` which is also list-like in it's behavior. Complete details are in the [reference docs](https://passivetotal.readthedocs.io/en/latest/illuminate.html#passivetotal.analyzer.illuminate.AttackSurfaceObservations) but again, the easiest way to start is to simply iterate the list." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "f52775e9-b4d2-4bba-ad15-b02a175e7808", - "metadata": {}, - "outputs": [], - "source": [ - "for obs in my_asi.high_priority_insights.sorted_by('observation_count', True)[0].observations:\n", - " print(obs)" - ] - }, - { - "cell_type": "markdown", - "id": "92d7de53-80f0-4755-bed6-070c0edf5681", - "metadata": {}, - "source": [ - "Each observation is of type `AttackSurfaceObservation` and when printed simply shows the asset name, although many more details are available in [other properties](https://passivetotal.readthedocs.io/en/latest/illuminate.html#passivetotal.analyzer.illuminate.AttackSurfaceObservation) including the dates when the observation was last seen." - ] - }, - { - "cell_type": "markdown", - "id": "7ebabc93-3e2c-4c28-a4df-16f11c2bff51", - "metadata": {}, - "source": [ - "---\n", - "Consider using pandas DataFrames if you are working with ASI interactively in a notebook. Virtually every object offers an `as_df` property which is especially useful for lists." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "32bc23fd-0b2c-46e2-af96-76b2827ab172", - "metadata": { - "tags": [] - }, - "outputs": [], - "source": [ - "my_asi.high_priority_insights.as_df" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "4290fb9e-d75f-48ea-ac49-2c8a310bf3b3", - "metadata": { - "tags": [] - }, - "outputs": [], - "source": [ - "my_asi.high_priority_insights.only_active_insights[0].observations.as_df" - ] - }, - { - "cell_type": "markdown", - "id": "ead2a49a-4e5b-46b0-9c7b-a8eceaf0ffae", - "metadata": {}, - "source": [ - "> Notice the use of `only_active_insights` here to filter the list of insights to only those with observations. If you skip this step you may get an API error when you query for observations if none are available for that insight." - ] - }, - { - "cell_type": "markdown", - "id": "6d654480-7dce-402d-b07c-675736718052", - "metadata": {}, - "source": [ - "### Third-Party (Vendor) Attack Surfaces" - ] - }, - { - "cell_type": "markdown", - "id": "light-microphone", - "metadata": {}, - "source": [ - "Define a variable to store all third-party attack surfaces and load them from the API." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "4829d40f-26b7-4341-9ec7-cdc1d5075fa4", - "metadata": { - "tags": [] - }, - "outputs": [], - "source": [ - "vendor_asi = analyzer.illuminate.AttackSurfaces.load()\n", - "vendor_asi" - ] - }, - { - "cell_type": "markdown", - "id": "0db89ca3-b0ef-4c3f-bdd9-75dff91b8ad2", - "metadata": {}, - "source": [ - "> The list of third-party vendors is defined in your account settings in consultation with your RiskIQ account team. There are no options to change the composition of the list in the API." - ] - }, - { - "cell_type": "markdown", - "id": "perceived-mumbai", - "metadata": {}, - "source": [ - "The object returned is of type `AttackSurfaces` - this can be treated as a list, filtered, or displayed in several ways. Full details are in the [reference docs](\n", - "https://passivetotal.readthedocs.io/en/latest/illuminate.html#passivetotal.analyzer.illuminate.AttackSurfaces).\n", - "\n", - "If you have a very large list of third-party vendors, the API will return the data one page at a time, but that will be handled automatically by the Python library.\n", - "\n", - "This will return a list of third-party vendors (associated with Third-Party Intelligence module) and other third-party metadata (attack surface id, name of the vendor, if the name of the organization is your own, if the attack surface is a third-party vendor, number of active high priority, medium priority, and low priority assets linked to insight detected observations. " - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "250b0676-8c24-4091-8adc-c3f529afd6f8", - "metadata": { - "tags": [] - }, - "outputs": [], - "source": [ - "vendor_asi.as_df" - ] - }, - { - "cell_type": "markdown", - "id": "pressed-tablet", - "metadata": {}, - "source": [ - "---\n", - "There are several ways to filter this list to focus on a specific vendor, especially once you determine the asi_id that RiskIQ applies to it. Here, we use features from the `pandas` data library to filter the pandas DataFrame to include only those records with a name that matches a specific vendor. Note this search is case-sensistive." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "4c325c9b-4515-432d-8c6c-bc91f2404f40", - "metadata": {}, - "outputs": [], - "source": [ - "vendor_asi.as_df[vendor_asi.as_df['name'].str.contains('Rhythmic')]" - ] - }, - { - "cell_type": "markdown", - "id": "stuck-training", - "metadata": {}, - "source": [ - "Once we know the vendor's Attack Surfce ID we can load it by number." - ] - }, - { - "cell_type": "code", - "execution_count": 12, - "id": "16dd8b4c-03f0-4ca7-a155-062a9b801d78", - "metadata": { - "tags": [] - }, - "outputs": [], - "source": [ - "rhythmic_asi = analyzer.illuminate.AttackSurface.load(553865)" - ] - }, - { - "cell_type": "markdown", - "id": "stylish-foster", - "metadata": {}, - "source": [ - "This object behaves the same as the attack surface we retrieved for our own attack surface earlier in this notebook." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "91cc74e0-614e-48e7-bd28-1ff9d69bf797", - "metadata": {}, - "outputs": [], - "source": [ - "rhythmic_asi.as_df.T" - ] - }, - { - "cell_type": "markdown", - "id": "b8f98b6c-a6ce-467d-b4b3-ded75a8b412a", - "metadata": {}, - "source": [ - "> The `T` property of pandas dataframes rotates the table 90 degrees which improves formatting when you only have one row of data." - ] - }, - { - "cell_type": "markdown", - "id": "political-liabilities", - "metadata": {}, - "source": [ - "We can return all active insights with the `all_active_insights` property." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "510aaf20-3c1d-4f28-b1bc-065d452db1ba", - "metadata": {}, - "outputs": [], - "source": [ - "rhythmic_asi.all_active_insights.as_df" - ] - }, - { - "cell_type": "markdown", - "id": "presidential-problem", - "metadata": {}, - "source": [ - "Insights can be treated like strings to make printing them easier, but remember there are more fields available on each insight." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "middle-cigarette", - "metadata": {}, - "outputs": [], - "source": [ - "for insight in rhythmic_asi.all_active_insights:\n", - " print(insight)" - ] - }, - { - "cell_type": "markdown", - "id": "addressed-lodging", - "metadata": {}, - "source": [ - "---\n", - "Using simple string matching, we can search a vendor's attack surface for a specific insight." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "clinical-calcium", - "metadata": {}, - "outputs": [], - "source": [ - "for insight in rhythmic_asi.all_active_insights:\n", - " if insight.name == 'ASI: REvil Ransomware Actors Exploit Kaseya VSA Software in Broad Supply Chain Attack':\n", - " for obs in insight.observations:\n", - " print (obs)" - ] - }, - { - "cell_type": "markdown", - "id": "corporate-matthew", - "metadata": {}, - "source": [ - "The `all_active_insights` property of an `AttackSurface` object offers a number of filtering options, including `filter_substring` that performs a case-insensitive match on any string field in the objects in that list. This is a property available on most `RecordList` type objects in the Analyzer." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "92fdb11e-4c73-43b7-88a4-5272bbccbadb", - "metadata": {}, - "outputs": [], - "source": [ - "for insight in rhythmic_asi.all_active_insights.filter_substring(name='kaseya'):\n", - " for obs in insight.observations:\n", - " print(obs)" - ] - }, - { - "cell_type": "markdown", - "id": "standard-lingerie", - "metadata": {}, - "source": [ - "We can apply the same technique to search across all vendor attack surfaces. Here, we iterate (loop through) the `vendor_asi` variable we stored earlier that contains all third-party attack surfces, and then store the length of the insight list that matches our keyword. " - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "partial-measurement", - "metadata": { - "tags": [] - }, - "outputs": [], - "source": [ - "for vendor in vendor_asi:\n", - " kaseya_insights = len(vendor.all_active_insights.filter_substring(name='kaseya'))\n", - " print(vendor.name, kaseya_insights) " - ] - } - ], - "metadata": { - "kernelspec": { - "display_name": "ptlib_dev", - "language": "python", - "name": "passivetotal_dev" - }, - "language_info": { - "codemirror_mode": { - "name": "ipython", - "version": 3 - }, - "file_extension": ".py", - "mimetype": "text/x-python", - "name": "python", - "nbconvert_exporter": "python", - "pygments_lexer": "ipython3", - "version": "3.8.2" - } - }, - "nbformat": 4, - "nbformat_minor": 5 -} diff --git a/passivetotal/analyzer/__init__.py b/passivetotal/analyzer/__init__.py index 8439f50..843ff1d 100644 --- a/passivetotal/analyzer/__init__.py +++ b/passivetotal/analyzer/__init__.py @@ -231,6 +231,22 @@ def set_project(name_or_guid, visibility='analyst', description='', tags=None, c else: raise AnalyzerError('More than one project found; narrow the search criteria or use a unique name') +def AttackSurface(id_or_name=None): + """Find one attack surface. + + Call with no parameters to find your Attack Surface. + Pass a number to load a specific Attack Surface by ID, or pass a string to load the entire + list of Attack Surfaces and search them by case-insensitive substring. + + Raises `AnalyzerError` if no attack surfaces are found or if more than one Attack Surface name + matches the parameter. + + :returns: :class:`passivetotal.illuminate.AttackSurface` + """ + from passivetotal.analyzer.illuminate import AttackSurface as ASI + return ASI.find(id_or_name) + + from passivetotal.analyzer.hostname import Hostname from passivetotal.analyzer.ip import IPAddress diff --git a/passivetotal/analyzer/articles.py b/passivetotal/analyzer/articles.py index d8d3eb6..20db5e9 100644 --- a/passivetotal/analyzer/articles.py +++ b/passivetotal/analyzer/articles.py @@ -34,6 +34,18 @@ def parse(self, api_response): for article in api_response.get('articles', []): self._records.append(Article(article, self._query)) + @staticmethod + def find(query): + """Query the Articles API endpoint and find articles that match the search term. + + :rtype: :class:`passivetotal.analyzer.articles.ArticlesList` + """ + articles = ArticlesList() + articles._query = query + response = get_api('Articles').get_articles(query=query) + articles.parse(response) + return articles + def filter_tags(self, tags): """Filtered article list that includes articles with an exact match to one or more tags. diff --git a/passivetotal/analyzer/illuminate/__init__.py b/passivetotal/analyzer/illuminate/__init__.py new file mode 100644 index 0000000..62527cb --- /dev/null +++ b/passivetotal/analyzer/illuminate/__init__.py @@ -0,0 +1,15 @@ +from collections import OrderedDict, namedtuple +from functools import lru_cache, partial +from passivetotal.analyzer import get_api, get_object +from passivetotal.analyzer._common import ( + AsDictionary, ForPandas, RecordList, Record, FirstLastSeen, + PagedRecordList, AnalyzerAPIError, AnalyzerError +) + +from .reputation import ReputationScore, HasReputation +from .cti import IntelProfile, IntelProfiles, HasIntelProfiles +from .asi import AttackSurface, AttackSurfaces +from .vuln import AttackSurfaceCVEs, AttackSurfaceComponents, VulnArticle + + + \ No newline at end of file diff --git a/passivetotal/analyzer/illuminate.py b/passivetotal/analyzer/illuminate/asi.py similarity index 50% rename from passivetotal/analyzer/illuminate.py rename to passivetotal/analyzer/illuminate/asi.py index 1d9383e..30328fc 100644 --- a/passivetotal/analyzer/illuminate.py +++ b/passivetotal/analyzer/illuminate/asi.py @@ -1,515 +1,18 @@ -from collections import OrderedDict, namedtuple -from functools import lru_cache, total_ordering, partial +from collections import OrderedDict +from functools import lru_cache, partial + from urllib.parse import parse_qsl + from passivetotal.analyzer import get_api, get_object from passivetotal.analyzer._common import ( - AsDictionary, ForPandas, RecordList, Record, FirstLastSeen, - PagedRecordList, AnalyzerAPIError, AnalyzerError + Record, RecordList, PagedRecordList, FirstLastSeen, + ForPandas, AnalyzerError ) -INDICATOR_PAGE_SIZE = 200 - - -@total_ordering -class ReputationScore(AsDictionary, ForPandas): - - """RiskIQ Illuminate Reputation profile for a hostname or an IP.""" - - def __init__(self, api_response, query=None): - self._response = api_response - self._query = query - - def __str__(self): - return '{0.score} ({0.classification})'.format(self) - - def __repr__(self): - return ''.format(self) - - def __int__(self): - return self.score - - def __gt__(self, other): - return self.score > other - - def __eq__(self, other): - return self.score == other - - def to_dataframe(self, explode_rules=False, drop_links=False): - """Render this object as a Pandas DataFrame. - - :param explode_rules: Whether to create a row for each rule using `pandas.DataFrame.explode` (optional, defaults to False) - :param drop_links: Whether to include links when present in exploded rules (optional, defaults to False) - :rtype: :class:`pandas.DataFrame` - """ - pd = self._get_pandas() - as_d = OrderedDict( - query = self._query, - score = self.score, - classification = self.classification, - rules = self.rules - ) - df = pd.DataFrame([as_d]) - if not explode_rules: - return df - df_rules = df.explode('rules', ignore_index=True) - df_wide = pd.concat([df_rules.drop('rules', axis='columns'), df_rules['rules'].apply(pd.Series)], axis='columns') - if drop_links: - return df_wide.drop('link', axis='columns') - return df_wide - - - @property - def as_dict(self): - """Representation as a dictionary object.""" - return { - 'score': self.score, - 'classification': self.classification, - 'rules': self.rules, - } - - @property - def score(self): - """Reputation score as an integer ranging from 0-100. - - Higher values indicate a greater likelihood of maliciousness. - """ - return self._response.get('score') - - @property - def classification(self): - """Reputation classification as a string. - - Typical values include GOOD, SUSPICIOUS, MALICIOUS, or UNKNOWN. - """ - return self._response.get('classification') - - @property - def rules(self): - """List of rules that informed the reputation score. - - Returns a list of dictionaries. - """ - return self._response.get('rules') - - -class HasReputation: +INDICATOR_PAGE_SIZE = 400 - """An object with a RiskIQ Illuminate Reputation score.""" - - def _api_get_reputation(self): - """Query the reputation endpoint.""" - query=self.get_host_identifier() - response = get_api('Illuminate').get_reputation(query=query) - self._reputation = ReputationScore(response, query) - return self._reputation - - @property - def reputation(self): - """RiskIQ Illuminate Reputation profile for a hostname or IP. - - :rtype: :class:`passivetotal.analyzer.illuminate.ReputationScore` - """ - if getattr(self, '_reputation', None) is not None: - return self._reputation - return self._api_get_reputation() - - - -class IntelProfiles(RecordList, ForPandas): - - """List of RiskIQ Intel Profiles from the Illuminate CTI module.""" - - def __getitem__(self, key): - if isinstance(key, str): - filtered = self.filter(id=key) - if len(filtered) != 1: - raise KeyError('No profile found for id {}'.format(key)) - return filtered[0] - return super().__getitem__(key) - - def _get_shallow_copy_fields(self): - return ['_totalrecords'] - - def _get_sortable_fields(self): - return ['id','title'] - - def _get_dict_fields(self): - return ['totalrecords'] - - @staticmethod - def load(query=None, profile_type=None): - """Get a list of all RiskIQ Intel Profiles. - - :param query: Submit a query param to the API to limit results to only matching providers (optional) - :param profile_type: Submit a type param to the API to limit results to only certain profile types (optional) - """ - response = get_api('Illuminate').get_intel_profiles(query=query, type=profile_type) - return IntelProfiles(response) - - @staticmethod - def find_by_indicator(query, **kwargs): - """Search profiles by indicator. - - :param query: Indicator value as a string - :param types: Types of indicators (optional) - :param categories: Categories of indicators (optional) - :param sources: Sources of indicators [riskiq, osint] (optional) - """ - try: - response = get_api('Illuminate').get_intel_profiles_for_indicator(query, **kwargs) - except AnalyzerAPIError as e: - if e.status_code == 404: - return IntelProfiles() - else: - raise e - return IntelProfiles(response) - - def parse(self, api_response): - """Parse an API response.""" - self._totalrecords = api_response.get('totalCount', 0) - self._records = [] - for result in api_response.get('results', []): - self._records.append(IntelProfile(id=result.get('id'),api_response=result)) - - def to_dataframe(self, ignore_index=False, **kwargs): - """Render this object as a Pandas dataframe.""" - pd = self._get_pandas() - return pd.concat([ r.to_dataframe(**kwargs) for r in self], ignore_index=ignore_index) - - @property - def totalrecords(self): - """Total number of profiles available in this record list.""" - return self._totalrecords - - - -class IntelProfile(Record, ForPandas): - - """RiskIQ Intel Profile on a specific actor group.""" - - _instances = {} - - ProfileTag = namedtuple('ProfileTag','label,country') - - def __new__(cls, id=None, api_response=None): - if id is not None: - self = cls._instances.get(id) - if self is not None: - return self - self = cls._instances[id] = object.__new__(cls) - self._id = id - self._has_details = False - if api_response is not None: - self._parse(api_response) - return self - - def __str__(self): - return self.title - - def __repr__(self): - return ''.format(self.id) - - def _ensure_details(self): - """Ensure details are loaded from the API for this profile.""" - if not getattr(self, '_has_details', False): - response = get_api('Illuminate').get_intel_profile_details(self._id) - self._parse(response) - - def _get_dict_fields(self): - return ['id','title','indicatorcount_osint','indicatorcount_riskiq', - 'tags_raw','title'] - - def _parse(self, api_response): - """Parse an API response into object properties.""" - self._id = api_response.get('id') - self._title = api_response.get('title') - self._link = api_response.get('link') - self._ioccount_osint = api_response.get('osintIndicatorsCount') - self._ioccount_riskiq = api_response.get('riskIqIndicatorsCount') - self._api_link_indicators = api_response.get('indicators') - self._aliases = api_response.get('aliases') - self._tags = api_response.get('tags') - self._has_details = True - - @lru_cache(maxsize=None) - def get_indicators(self, all_pages=True, types=None, categories=None, sources=None, pagesize=INDICATOR_PAGE_SIZE): - """Get a list of indicators associated with this intel profile. - - Loads all pages of indicators by default. Results with identical params are cached. - - :param all_pages: Whether to retrieve all pages (optional, defaults to True) - :param types: Types of indicators to search for (optional). - :param categories: Categories of indicators to filter on (optional). - :param sources: Sources of indicators [osint, riskiq] (optional). - :param pagesize: Size of pages to return from the API (defaults to `INDICATOR_PAGE_SIZE`). - - :rypte: :class:`passivetotal.analyzer.illuminate.IntelProfileIndicatorList` - """ - iocs = IntelProfileIndicatorList( - profile_id=self._id, - types=types, - categories=categories, - sources=sources, - pagesize=pagesize - ) - iocs.load_all_pages() - return iocs - - def to_dataframe(self): - """Render this profile as a Pandas DataFrame. - - :rtype: :class:`pandas.DataFrame` - """ - pd = self._get_pandas() - cols = ['id','title','indicatorcount_osint','indicatorcount_riskiq','aliases','tags'] - as_d = { - f: getattr(self, f) for f in cols - } - return pd.DataFrame.from_records([as_d], index='id', columns=cols) - - @property - def aliases(self): - """List of alternative names for this actor group.""" - self._ensure_details() - return self._aliases - - @property - def id(self): - """RiskIQ identifier for this actor group.""" - return self._id - - @property - def indicatorcount_osint(self): - """Count of available indicators from open source intelligence reports.""" - self._ensure_details() - return self._ioccount_osint - - @property - def indicatorcount_riskiq(self): - """Count of available indicators sourced from RiskIQ primary research.""" - self._ensure_details() - return self._ioccount_riskiq - - @property - def indicators(self): - """Unfiltered indicator list associated with this intel profile. - - Calls `passivetotal.analyzer.illuminate.IntelProfile.get_indicators()' - with default parameters. Use that method directly for more granular control. - - :rtype: :class:`passivetotal.analyzer.illuminate.IntelProfileIndicatorList` - """ - return self.get_indicators() - - @property - def tags(self): - """List of profile tags associated with this actor group. - - :rtype: :class:`passivetotal.analyzer.illuminate.IntelProfile.ProfileTag` - """ - self._ensure_details() - return [ self.ProfileTag(t['label'], t['countryCode']) for t in self._tags ] - - @property - def tags_raw(self): - """List of profile tags as returned by the API.""" - self._ensure_details() - return self._tags - - @property - def title(self): - """RiskIQ title for this actor profile.""" - self._ensure_details() - return self._title - - - -class IntelProfileIndicatorList(RecordList, PagedRecordList, ForPandas): - - def __init__(self, profile_id=None, query=None, types=None, categories=None, sources=None, pagesize=INDICATOR_PAGE_SIZE): - """List of indicators associated with a RiskIQ Intel Profile. - - :param profile_id: Threat intel profile ID to search for. - :param query: Indicator value to query for. - :param types: Types of indicators to search for (optional). - :param categories: Categories of indicators to filter on (optional). - :param sources: Sources of indicators [osint, riskiq] (optiona). - """ - self._totalrecords = None - self._types = [] - self._pagination_current_page = 0 - self._pagination_page_size = pagesize - self._pagination_has_more = True - self._records = [] - self._profile_id = profile_id - self._pagination_callable = partial( - get_api('Illuminate').get_intel_profile_indicators, - self._profile_id, - query=query, - types=types, - categories=categories, - sources=sources, - size=pagesize - ) - - def _get_shallow_copy_fields(self): - return ['_totalrecords','_pagination_current_page','_pagination_page_size', - '_types', '_pagination_callable', '_pagination_has_more', '_profile_id'] - - def _get_sortable_fields(self): - return ['id','firstseen','lastseen','type','category','value','is_osint','profile_id'] - - def _pagination_parse_page(self, api_response): - """Parse a page of API response data.""" - self._totalrecords = api_response.get('totalCount') - self._types = api_response.get('types') - if self._pagination_current_page == 0: - self._records = [] - for result in api_response.get('results',[]): - self._records.append(IntelProfileIndicator(result)) - - def to_dataframe(self, ignore_index=False, **kwargs): - """Render this object as a Pandas dataframe.""" - pd = self._get_pandas() - return pd.concat([ r.to_dataframe(**kwargs) for r in self], ignore_index=ignore_index) - - @property - def only_osint(self): - """Filtered list with only indicators from open sources. - - :rtype: :class:`passivetotal.analyzer.illuminate.IntelProfileIndicatorList` - """ - return self.filter(is_osint=True) - - @property - def only_riskiq(self): - """Filtered list with only indicators sourced by RiskIQ. - - :rtype: :class:`passivetotal.analyzer.illuminate.IntelProfileIndicatorList` - """ - return self.filter(is_osint=False) - - @property - def types(self): - """List of indicator types in the list.""" - return self._types - - @property - def values(self): - """List of all values in the indicator list.""" - return [ i.value for i in self ] - - - -class IntelProfileIndicator(Record, FirstLastSeen, ForPandas): - - """An indicator associated with an intel profile.""" - - def __init__(self, api_response): - self._id = api_response.get('id') - self._firstseen = api_response.get('firstSeen') - self._lastseen = api_response.get('lastSeen') - self._profile_id = api_response.get('profileId') - self._type = api_response.get('type') - self._value = api_response.get('value') - self._category = api_response.get('category') - self._osint = api_response.get('osint') - self._link = api_response.get('osintUrl') - self._articleguids = api_response.get('articleGuids') - - def __repr__(self): - return ''.format(self._id) - - def __str__(self): - return self._value - - def _get_dict_fields(self): - return ['id','str:firstseen','str:lastseen','profile_id','type', - 'value','category','is_osint','osint_link','articleguids'] - - def to_dataframe(self): - """Render this object as a Pandas DataFrame. - - :rtype: :class:`pandas.DataFrame` - """ - pd = self._get_pandas() - cols = ['id','value','type','category','firstseen','lastseen', - 'profile_id','is_osint','osint_link','articleguids'] - as_d = { - f: getattr(self, f) for f in cols - } - df = pd.DataFrame.from_records([as_d], index='id', columns=cols) - return df - - @property - def id(self): - """RiskIQ identifier for this indicator.""" - return self._id - - @property - def profile_id(self): - """RiskIQ identifier for the intel profile associated with this indicator.""" - return self._profile_id - - @property - def intel_profile(self): - """RiskIQ threat intel profile associated with this indicator. - - :rtype: :class:`passivetotal.analyzer.illuminate.IntelProfile` - """ - return IntelProfile(id=self._profile_id) - - @property - def type(self): - """Type of indicator.""" - return self._type - - @property - def value(self): - """Value of the indicator.""" - return self._value - - @property - def category(self): - """Indicator category.""" - return self._category - - @property - def is_osint(self): - """Whether this indicator was published in open source intelligence articles.""" - return self._osint - - @property - def osint_link(self): - """URL for the OSINT source of the indicator, or none if this is not an OSINT indicator.""" - return self._link - - @property - def articleguids(self): - """List of RiskIQ OSINT article GUIDs associated with this indicator.""" - return self._articleguids - - - -class HasIntelProfiles: - - """An object that may be listed in threat intel profiles.""" - - @property - def intel_profiles(self): - """List of RiskIQ Threat Intel Profiles that reference this host. - - For more granular searches, call the - `passivetotal.analyzer.illuminate.IntelProfiles.find_by_indicators()` method directly. - - :rtype: :class:`passivetotal.analyzer.illuminate.IntelProfiles` - """ - if getattr(self, '_intel_profiles', None) is not None: - return self._intel_profiles - self._intel_profiles = IntelProfiles.find_by_indicator(self.get_host_identifier()) - return self._intel_profiles class AttackSurfaces(RecordList, PagedRecordList, ForPandas): @@ -559,6 +62,7 @@ def _pagination_parse_page(self, api_response): self._records.append(AttackSurface(api_response=result)) @staticmethod + @lru_cache def load(pagesize=INDICATOR_PAGE_SIZE): """Get a list of all third-party (vendor) attack surfaces authorized for this API account. @@ -622,6 +126,32 @@ def load(id=None): response = get_api('Illuminate').get_asi_3p_vendor_summary(id) response['own'] = False return AttackSurface(api_response=response) + + @staticmethod + @lru_cache(typed=True, maxsize=10) + def find(id_or_name=None): + """Find one attack surface. + + Call with no parameters to find your Attack Surface. + Pass a number to load a specific Attack Surface by ID, or pass a string to load the entire + list of Attack Surfaces and search them by case-insensitive substring. + + Raises `AnalyzerError` if no attack surfaces are found or if more than one Attack Surface name + matches the parameter. + + :returns: :class:`passivetotal.illuminate.AttackSurface` + """ + if id_or_name is None: + return AttackSurface.load() + if isinstance(id_or_name, int): + return AttackSurface.load(id_or_name) + all_asi = AttackSurfaces.load() + filtered_asi = all_asi.filter_substring(name=id_or_name) + if len(filtered_asi) == 0: + raise AnalyzerError('No attack surfaces found that match that name.') + if len(filtered_asi) > 1: + raise AnalyzerError('More than one attack surface was found - try a more specific name') + return filtered_asi[0] def get_insights(self, level): """Get insights at a level (high, medium or low). @@ -644,6 +174,26 @@ def get_observation_count(self, level): self._ensure_valid_level(level) return self._priorities_raw[level]['observationCount'] + def get_cves(self, pagesize=INDICATOR_PAGE_SIZE): + """Get a list of CVEs impacting assets in this attack surface. + + :param pagesize: Size of pages to retrieve from the API. + """ + from passivetotal.analyzer.illuminate import AttackSurfaceCVEs + self._cves = AttackSurfaceCVEs(self, pagesize) + self._cves.load_all_pages() + return self._cves + + def get_components(self, pagesize=INDICATOR_PAGE_SIZE): + """Get a list of vulnerable components (detections) in this attack surface. + + :param pagesize: Size of pages to retrieve from the API. + """ + from passivetotal.analyzer.illuminate import AttackSurfaceComponents + self._components = AttackSurfaceComponents(self, pagesize) + self._components.load_all_pages() + return self._components + def to_dataframe(self): """Render this object as a Pandas DataFrame. @@ -735,7 +285,23 @@ def low_priority_insights(self): :rtype: List of :class:`passivetotal.analyzer.illuminate.AttackSurfaceInsights` """ return self.get_insights('low') + + @property + def cves(self): + """Get a list of CVEs associated with this attack surface. + + :rtype: List of :class:`passivetotal.analyzer.illuminate.AttackSurfaceCVE` + """ + if getattr(self, '_cves', None) is not None: + return self._cves + return self.get_cves() + @property + def components(self): + """List of components (detections) vulnerable to this CVE in this attack surface.""" + if getattr(self, '_components', None) is not None: + return self._components + return self.get_components() class AttackSurfaceInsights(RecordList, ForPandas): @@ -1039,7 +605,4 @@ def ip(self): try: return get_object(self._name, 'IPAddress') except AnalyzerError: - return None - - - + return None \ No newline at end of file diff --git a/passivetotal/analyzer/illuminate/cti.py b/passivetotal/analyzer/illuminate/cti.py new file mode 100644 index 0000000..de469f5 --- /dev/null +++ b/passivetotal/analyzer/illuminate/cti.py @@ -0,0 +1,428 @@ +from collections import namedtuple +from functools import lru_cache, partial + + +from passivetotal.analyzer import get_api +from passivetotal.analyzer._common import ( + Record, RecordList, PagedRecordList, FirstLastSeen, + ForPandas, AnalyzerError, AnalyzerAPIError +) + + + +INDICATOR_PAGE_SIZE = 400 + + + +class IntelProfiles(RecordList, ForPandas): + + """List of RiskIQ Intel Profiles from the Illuminate CTI module.""" + + def __getitem__(self, key): + if isinstance(key, str): + filtered = self.filter(id=key) + if len(filtered) != 1: + raise KeyError('No profile found for id {}'.format(key)) + return filtered[0] + return super().__getitem__(key) + + def _get_shallow_copy_fields(self): + return ['_totalrecords'] + + def _get_sortable_fields(self): + return ['id','title'] + + def _get_dict_fields(self): + return ['totalrecords'] + + @staticmethod + def load(query=None, profile_type=None): + """Get a list of all RiskIQ Intel Profiles. + + :param query: Submit a query param to the API to limit results to only matching providers (optional) + :param profile_type: Submit a type param to the API to limit results to only certain profile types (optional) + """ + response = get_api('Illuminate').get_intel_profiles(query=query, type=profile_type) + return IntelProfiles(response) + + @staticmethod + def find_by_indicator(query, **kwargs): + """Search profiles by indicator. + + :param query: Indicator value as a string + :param types: Types of indicators (optional) + :param categories: Categories of indicators (optional) + :param sources: Sources of indicators [riskiq, osint] (optional) + """ + try: + response = get_api('Illuminate').get_intel_profiles_for_indicator(query, **kwargs) + except AnalyzerAPIError as e: + if e.status_code == 404: + return IntelProfiles() + else: + raise e + return IntelProfiles(response) + + def parse(self, api_response): + """Parse an API response.""" + self._totalrecords = api_response.get('totalCount', 0) + self._records = [] + for result in api_response.get('results', []): + self._records.append(IntelProfile(id=result.get('id'),api_response=result)) + + def to_dataframe(self, ignore_index=False, **kwargs): + """Render this object as a Pandas dataframe.""" + pd = self._get_pandas() + return pd.concat([ r.to_dataframe(**kwargs) for r in self], ignore_index=ignore_index) + + @property + def totalrecords(self): + """Total number of profiles available in this record list.""" + return self._totalrecords + + + +class IntelProfile(Record, ForPandas): + + """RiskIQ Intel Profile on a specific actor group.""" + + _instances = {} + + ProfileTag = namedtuple('ProfileTag','label,country') + + def __new__(cls, id=None, api_response=None): + if id is not None: + self = cls._instances.get(id) + if self is not None: + return self + self = cls._instances[id] = object.__new__(cls) + self._id = id + self._has_details = False + if api_response is not None: + self._parse(api_response) + return self + + def __str__(self): + return self.title + + def __repr__(self): + return ''.format(self.id) + + def _ensure_details(self): + """Ensure details are loaded from the API for this profile.""" + if not getattr(self, '_has_details', False): + response = get_api('Illuminate').get_intel_profile_details(self._id) + self._parse(response) + + def _get_dict_fields(self): + return ['id','title','indicatorcount_osint','indicatorcount_riskiq', + 'tags_raw','title'] + + def _parse(self, api_response): + """Parse an API response into object properties.""" + self._id = api_response.get('id') + self._title = api_response.get('title') + self._link = api_response.get('link') + self._ioccount_osint = api_response.get('osintIndicatorsCount') + self._ioccount_riskiq = api_response.get('riskIqIndicatorsCount') + self._api_link_indicators = api_response.get('indicators') + self._aliases = api_response.get('aliases') + self._tags = api_response.get('tags') + self._has_details = True + + @staticmethod + def load(profile_id): + """Load an intel profile by the RiskIQ-assigned identifier string. + + :rtype: :class:`passivetotal.analyzer.illuminate.IntelProfile` + """ + profile = IntelProfile(profile_id) + if profile._has_details: + return profile + try: + response = get_api('Illuminate').get_intel_profile_details(profile_id) + except AnalyzerAPIError as e: + if e.status_code==404: + raise AnalyzerError(f'Cannot locate intel profile with id "{profile_id}"') + else: + raise e + profile = IntelProfile(profile_id, response) + return profile + + @lru_cache(maxsize=None) + def get_indicators(self, all_pages=True, types=None, categories=None, sources=None, pagesize=INDICATOR_PAGE_SIZE): + """Get a list of indicators associated with this intel profile. + + Loads all pages of indicators by default. Results with identical params are cached. + + :param all_pages: Whether to retrieve all pages (optional, defaults to True) + :param types: Types of indicators to search for (optional). + :param categories: Categories of indicators to filter on (optional). + :param sources: Sources of indicators [osint, riskiq] (optional). + :param pagesize: Size of pages to return from the API (defaults to `INDICATOR_PAGE_SIZE`). + + :rtype: :class:`passivetotal.analyzer.illuminate.IntelProfileIndicatorList` + """ + iocs = IntelProfileIndicatorList( + profile_id=self._id, + types=types, + categories=categories, + sources=sources, + pagesize=pagesize + ) + iocs.load_all_pages() + return iocs + + def to_dataframe(self): + """Render this profile as a Pandas DataFrame. + + :rtype: :class:`pandas.DataFrame` + """ + pd = self._get_pandas() + cols = ['id','title','indicatorcount_osint','indicatorcount_riskiq','aliases','tags'] + as_d = { + f: getattr(self, f) for f in cols + } + return pd.DataFrame.from_records([as_d], index='id', columns=cols) + + @property + def aliases(self): + """List of alternative names for this actor group.""" + self._ensure_details() + return self._aliases + + @property + def id(self): + """RiskIQ identifier for this actor group.""" + return self._id + + @property + def indicatorcount_osint(self): + """Count of available indicators from open source intelligence reports.""" + self._ensure_details() + return self._ioccount_osint + + @property + def indicatorcount_riskiq(self): + """Count of available indicators sourced from RiskIQ primary research.""" + self._ensure_details() + return self._ioccount_riskiq + + @property + def indicators(self): + """Unfiltered indicator list associated with this intel profile. + + Calls `passivetotal.analyzer.illuminate.IntelProfile.get_indicators()' + with default parameters. Use that method directly for more granular control. + + :rtype: :class:`passivetotal.analyzer.illuminate.IntelProfileIndicatorList` + """ + return self.get_indicators() + + @property + def tags(self): + """List of profile tags associated with this actor group. + + :rtype: :class:`passivetotal.analyzer.illuminate.IntelProfile.ProfileTag` + """ + self._ensure_details() + return [ self.ProfileTag(t['label'], t['countryCode']) for t in self._tags ] + + @property + def tags_raw(self): + """List of profile tags as returned by the API.""" + self._ensure_details() + return self._tags + + @property + def title(self): + """RiskIQ title for this actor profile.""" + self._ensure_details() + return self._title + + + +class IntelProfileIndicatorList(RecordList, PagedRecordList, ForPandas): + + def __init__(self, profile_id=None, query=None, types=None, categories=None, sources=None, pagesize=INDICATOR_PAGE_SIZE): + """List of indicators associated with a RiskIQ Intel Profile. + + :param profile_id: Threat intel profile ID to search for. + :param query: Indicator value to query for. + :param types: Types of indicators to search for (optional). + :param categories: Categories of indicators to filter on (optional). + :param sources: Sources of indicators [osint, riskiq] (optiona). + """ + self._totalrecords = None + self._types = [] + self._pagination_current_page = 0 + self._pagination_page_size = pagesize + self._pagination_has_more = True + self._records = [] + self._profile_id = profile_id + self._pagination_callable = partial( + get_api('Illuminate').get_intel_profile_indicators, + self._profile_id, + query=query, + types=types, + categories=categories, + sources=sources, + size=pagesize + ) + + def _get_shallow_copy_fields(self): + return ['_totalrecords','_pagination_current_page','_pagination_page_size', + '_types', '_pagination_callable', '_pagination_has_more', '_profile_id'] + + def _get_sortable_fields(self): + return ['id','firstseen','lastseen','type','category','value','is_osint','profile_id'] + + def _pagination_parse_page(self, api_response): + """Parse a page of API response data.""" + self._totalrecords = api_response.get('totalCount') + self._types = api_response.get('types') + if self._pagination_current_page == 0: + self._records = [] + for result in api_response.get('results',[]): + self._records.append(IntelProfileIndicator(result)) + + def to_dataframe(self, ignore_index=False, **kwargs): + """Render this object as a Pandas dataframe.""" + pd = self._get_pandas() + return pd.concat([ r.to_dataframe(**kwargs) for r in self], ignore_index=ignore_index) + + @property + def only_osint(self): + """Filtered list with only indicators from open sources. + + :rtype: :class:`passivetotal.analyzer.illuminate.IntelProfileIndicatorList` + """ + return self.filter(is_osint=True) + + @property + def only_riskiq(self): + """Filtered list with only indicators sourced by RiskIQ. + + :rtype: :class:`passivetotal.analyzer.illuminate.IntelProfileIndicatorList` + """ + return self.filter(is_osint=False) + + @property + def types(self): + """List of indicator types in the list.""" + return self._types + + @property + def values(self): + """List of all values in the indicator list.""" + return [ i.value for i in self ] + + + +class IntelProfileIndicator(Record, FirstLastSeen, ForPandas): + + """An indicator associated with an intel profile.""" + + def __init__(self, api_response): + self._id = api_response.get('id') + self._firstseen = api_response.get('firstSeen') + self._lastseen = api_response.get('lastSeen') + self._profile_id = api_response.get('profileId') + self._type = api_response.get('type') + self._value = api_response.get('value') + self._category = api_response.get('category') + self._osint = api_response.get('osint') + self._link = api_response.get('osintUrl') + self._articleguids = api_response.get('articleGuids') + + def __repr__(self): + return ''.format(self._id) + + def __str__(self): + return self._value + + def _get_dict_fields(self): + return ['id','str:firstseen','str:lastseen','profile_id','type', + 'value','category','is_osint','osint_link','articleguids'] + + def to_dataframe(self): + """Render this object as a Pandas DataFrame. + + :rtype: :class:`pandas.DataFrame` + """ + pd = self._get_pandas() + cols = ['id','value','type','category','firstseen','lastseen', + 'profile_id','is_osint','osint_link','articleguids'] + as_d = { + f: getattr(self, f) for f in cols + } + df = pd.DataFrame.from_records([as_d], index='id', columns=cols) + return df + + @property + def id(self): + """RiskIQ identifier for this indicator.""" + return self._id + + @property + def profile_id(self): + """RiskIQ identifier for the intel profile associated with this indicator.""" + return self._profile_id + + @property + def intel_profile(self): + """RiskIQ threat intel profile associated with this indicator. + + :rtype: :class:`passivetotal.analyzer.illuminate.IntelProfile` + """ + return IntelProfile(id=self._profile_id) + + @property + def type(self): + """Type of indicator.""" + return self._type + + @property + def value(self): + """Value of the indicator.""" + return self._value + + @property + def category(self): + """Indicator category.""" + return self._category + + @property + def is_osint(self): + """Whether this indicator was published in open source intelligence articles.""" + return self._osint + + @property + def osint_link(self): + """URL for the OSINT source of the indicator, or none if this is not an OSINT indicator.""" + return self._link + + @property + def articleguids(self): + """List of RiskIQ OSINT article GUIDs associated with this indicator.""" + return self._articleguids + + + +class HasIntelProfiles: + + """An object that may be listed in threat intel profiles.""" + + @property + def intel_profiles(self): + """List of RiskIQ Threat Intel Profiles that reference this host. + + For more granular searches, call the + `passivetotal.analyzer.illuminate.IntelProfiles.find_by_indicators()` method directly. + + :rtype: :class:`passivetotal.analyzer.illuminate.IntelProfiles` + """ + if getattr(self, '_intel_profiles', None) is not None: + return self._intel_profiles + self._intel_profiles = IntelProfiles.find_by_indicator(self.get_host_identifier()) + return self._intel_profiles \ No newline at end of file diff --git a/passivetotal/analyzer/illuminate/reputation.py b/passivetotal/analyzer/illuminate/reputation.py new file mode 100644 index 0000000..96731ca --- /dev/null +++ b/passivetotal/analyzer/illuminate/reputation.py @@ -0,0 +1,111 @@ +from collections import OrderedDict +from functools import total_ordering + +from passivetotal.analyzer import get_api +from passivetotal.analyzer._common import AsDictionary, ForPandas + + + +@total_ordering +class ReputationScore(AsDictionary, ForPandas): + + """RiskIQ Illuminate Reputation profile for a hostname or an IP.""" + + def __init__(self, api_response, query=None): + self._response = api_response + self._query = query + + def __str__(self): + return '{0.score} ({0.classification})'.format(self) + + def __repr__(self): + return ''.format(self) + + def __int__(self): + return self.score + + def __gt__(self, other): + return self.score > other + + def __eq__(self, other): + return self.score == other + + def to_dataframe(self, explode_rules=False, drop_links=False): + """Render this object as a Pandas DataFrame. + + :param explode_rules: Whether to create a row for each rule using `pandas.DataFrame.explode` (optional, defaults to False) + :param drop_links: Whether to include links when present in exploded rules (optional, defaults to False) + :rtype: :class:`pandas.DataFrame` + """ + pd = self._get_pandas() + as_d = OrderedDict( + query = self._query, + score = self.score, + classification = self.classification, + rules = self.rules + ) + df = pd.DataFrame([as_d]) + if not explode_rules: + return df + df_rules = df.explode('rules', ignore_index=True) + df_wide = pd.concat([df_rules.drop('rules', axis='columns'), df_rules['rules'].apply(pd.Series)], axis='columns') + if drop_links: + return df_wide.drop('link', axis='columns') + return df_wide + + + @property + def as_dict(self): + """Representation as a dictionary object.""" + return { + 'score': self.score, + 'classification': self.classification, + 'rules': self.rules, + } + + @property + def score(self): + """Reputation score as an integer ranging from 0-100. + + Higher values indicate a greater likelihood of maliciousness. + """ + return self._response.get('score') + + @property + def classification(self): + """Reputation classification as a string. + + Typical values include GOOD, SUSPICIOUS, MALICIOUS, or UNKNOWN. + """ + return self._response.get('classification') + + @property + def rules(self): + """List of rules that informed the reputation score. + + Returns a list of dictionaries. + """ + return self._response.get('rules') + + + +class HasReputation: + + """An object with a RiskIQ Illuminate Reputation score.""" + + def _api_get_reputation(self): + """Query the reputation endpoint.""" + query=self.get_host_identifier() + response = get_api('Illuminate').get_reputation(query=query) + self._reputation = ReputationScore(response, query) + return self._reputation + + @property + def reputation(self): + """RiskIQ Illuminate Reputation profile for a hostname or IP. + + :rtype: :class:`passivetotal.analyzer.illuminate.ReputationScore` + """ + if getattr(self, '_reputation', None) is not None: + return self._reputation + return self._api_get_reputation() \ No newline at end of file diff --git a/passivetotal/analyzer/illuminate/vuln.py b/passivetotal/analyzer/illuminate/vuln.py new file mode 100644 index 0000000..87dfcd9 --- /dev/null +++ b/passivetotal/analyzer/illuminate/vuln.py @@ -0,0 +1,776 @@ +from datetime import datetime +from functools import partial, lru_cache + + +from passivetotal.analyzer import get_api +from passivetotal.analyzer._common import ( + Record, RecordList, PagedRecordList, FirstLastSeen, ForPandas, AnalyzerError +) + + + +INDICATOR_PAGE_SIZE = 400 + + + +class AttackSurfaceCVEs(RecordList, PagedRecordList, ForPandas): + + """List of CVEs associated with an attack surface.""" + + def __init__(self, attack_surface=None, pagesize=400): + self._totalrecords = None + self._attack_surface = attack_surface + self._records = [] + self._pagination_current_page = 0 + self._pagination_page_size = pagesize + self._pagination_has_more = True + if attack_surface is not None: + self.attack_surface = attack_surface + + def _get_shallow_copy_fields(self): + return ['_totalrecords','_pagination_current_page','_pagination_page_size', + 'attack_surface', '_pagination_callable', '_pagination_has_more'] + + def _get_sortable_fields(self): + return ['id','score','observation_count'] + + def _pagination_parse_page(self, api_response): + """Parse a page of API response data.""" + self._totalrecords = api_response.get('totalCount') + if self._pagination_current_page == 0: + self._records = [] + for result in api_response.get('cves',[]): + self._records.append(AttackSurfaceCVE(self._attack_surface, result)) + + @property + def attack_surface(self): + """Get the Illuminate Attack Surface associated with this list of CVEs. + + :rtype: :class:`passivetotal.analyzer.illuminate.AttackSurface` + """ + return self._attack_surface + + @attack_surface.setter # internal API - necessary for shallow copy operations + def attack_surface(self, attack_surface): + self._attack_surface = attack_surface + if attack_surface.is_own: + self._pagination_callable = partial( + get_api('Illuminate').get_asi_vuln_cves + ) + else: + self._pagination_callable = partial( + get_api('Illuminate').get_asi_3p_vuln_cves, + attack_surface.id + ) + + + +class AttackSurfaceCVE(Record, ForPandas): + + """A record of a CVE associated with an asset in an attack surface.""" + + def __init__(self, attack_surface, api_response={}): + self._attack_surface = attack_surface + self._cve_id = api_response.get('cveId') + self._score = api_response.get('priorityScore') + self._count_observations = api_response.get('observationCount') + self._link = api_response.get('cveLink') + self._cwes = api_response.get('cwes',[]) + + def __str__(self): + return '{}'.format(self._cve_id) + + def __repr__(self): + return ''.format(self) + + def _get_dict_fields(self): + return ['id','score','observation_count','cwes'] + + def to_dataframe(self): + """Render this object as a Pandas dataframe. + + :rtype: :class:`pandas.DataFrame` + """ + pd = self._get_pandas() + cols = ['attack_surface', 'cve_id','score','observations','cwes','first_cwe'] + as_d = { + 'attack_surface': self.attack_surface.name, + 'cve_id': self.id, + 'score': self.score, + 'observations': self.observation_count, + 'cwes': len(self.cwes), + 'first_cwe': self.cwes[0]['cweId'], + } + return pd.DataFrame.from_records([as_d], columns=cols) + + def get_observations(self, pagesize=INDICATOR_PAGE_SIZE): + """Get a list of observations(assets) vulnerable to this CVE in this attack surface. + + :param pagesize: Size of pages to retrieve from the API. + """ + self._observations = AttackSurfaceCVEObservations(self, pagesize) + self._observations.load_all_pages() + return self._observations + + @property + def article(self): + """CVE article with complete details on this vulnerability. + + :rtype: :class:`passivetotal.analyzer.illuminate.vuln.VulnArticle` + """ + return VulnArticle.load(self._cve_id) + + @property + def attack_surface(self): + """Attack surface this CVE is associated with. + + :rtype: :class:`passivetotal.analyzer.illuminate.AttackSurface` + """ + return self._attack_surface + + @property + def description(self): + """Description of the CVE, retrieved from the vulnerability article associated with this CVE.""" + return self.article.description + + @property + def publish_date(self): + """Publication date of the vulnerability article associated with this CVE.""" + return self.article.date_published + + @property + def cwes(self): + """List of CWE IDs for this CVE.""" + return self._cwes + + @property + def id(self): + """CVE identifier (alias for `cve_id`).""" + return self._cve_id + + @property + def cve_id(self): + """CVE identifier.""" + return self._cve_id + + @property + def link(self): + """API link to get CVE article. + + Consider using the `article` property to access the article directly.""" + return self._link + + @property + def score(self): + """RiskIQ priority score for this CVE.""" + return self._score + + @property + def observation_count(self): + """Number of observations (assets) in this attack surface impacted by this vulnerabilty.""" + return self._count_observations + + @property + def observations(self): + """List of observations (assets) in this attack surface vulnerable to this CVE.""" + if getattr(self, '_observations', None) is not None: + return self._observations + return self.get_observations() + + + + +class AttackSurfaceCVEObservations(RecordList, PagedRecordList, ForPandas): + + """List of observations (assets) associated with a CVE in a specific attack surface.""" + + def __init__(self, cve=None, pagesize=400): + self._totalrecords = None + self._cve = cve + self._records = [] + self._pagination_current_page = 0 + self._pagination_page_size = pagesize + self._pagination_has_more = True + if cve is not None: + self.cve = cve + + def _get_shallow_copy_fields(self): + return ['_totalrecords','_pagination_current_page','_pagination_page_size', + 'cve', '_pagination_callable', '_pagination_has_more'] + + def _get_sortable_fields(self): + return ['type','name','firstseen','lastseen'] + + def _pagination_parse_page(self, api_response): + """Parse a page of API response data.""" + self._totalrecords = api_response.get('totalCount') + if self._pagination_current_page == 0: + self._records = [] + for result in api_response.get('assets',[]): + self._records.append(AttackSurfaceCVEObservation(self._cve, result)) + + @property + def cve(self): + """Get the CVE associated with this list of observations. + + :rtype: :class:`passivetotal.analyzer.illuminate.AttackSurfaceCVE` + """ + return self._cve + + @cve.setter # internal API - necessary for shallow copy operations + def cve(self, cve): + self._cve = cve + if cve.attack_surface.is_own: + self._pagination_callable = partial( + get_api('Illuminate').get_asi_vuln_cve_observations, + cve.id + ) + else: + self._pagination_callable = partial( + get_api('Illuminate').get_asi_3p_vuln_cve_observations, + cve.attack_surface.id, + cve.id + ) + + + +class AttackSurfaceCVEObservation(Record, FirstLastSeen, ForPandas): + + """An observation (asset) vulnerable to a specific CVE in a given attack surface.""" + + def __init__(self, cve, api_response={}): + self._cve = cve + self._type = api_response.get('type') + self._name = api_response.get('name') + self._firstseen = api_response.get('firstSeen') + self._lastseen = api_response.get('lastSeen') + + def __str__(self): + return '{}'.format(self._name) + + def __repr__(self): + return ''.format(self) + + def _get_dict_fields(self): + return ['cve_id','type','name','str:firstseen','str:lastseen'] + + def to_dataframe(self): + """Render this object as a Pandas dataframe. + + :rtype: :class:`pandas.DataFrame` + """ + pd = self._get_pandas() + cols = ['attack_surface','cve_id','type','name','firstseen','lastseen'] + as_d = { + 'attack_surface': self.attack_surface.name, + 'cve_id': self.cve.id, + 'type': self.type, + 'name': self.name, + 'firstseen': self.firstseen, + 'lastseen': self.lastseen, + } + return pd.DataFrame.from_records([as_d], columns=cols) + + @property + def attack_surface(self): + """Attack surface this observation is associated with. + + :rtype: :class:`passivetotal.analyzer.illuminate.AttackSurface` + """ + return self.cve.attack_surface + + @property + def cve(self): + """CVE this observation is vulnerable to, in the context of a specific attack surface. + + :rtype: :class:`passivetotal.analyzer.illuminate.AttackSurfaceCVE` + """ + return self._cve + + @property + def cve_id(self): + """RiskIQ identifier for the CVE this observation is vulnerable to.""" + return self.cve.id + + @property + def type(self): + """Type of this observation (asset).""" + return self._type + + @property + def name(self): + """Name of this observation (asset).""" + return self._name + + + +class AttackSurfaceComponents(RecordList, PagedRecordList, ForPandas): + + """List of vulnerable components (detections) associated with an attack surface.""" + + def __init__(self, attack_surface=None, pagesize=400): + self._totalrecords = None + self._attack_surface = attack_surface + self._records = [] + self._pagination_current_page = 0 + self._pagination_page_size = pagesize + self._pagination_has_more = True + if attack_surface is not None: + self.attack_surface = attack_surface + + def _get_shallow_copy_fields(self): + return ['_totalrecords','_pagination_current_page','_pagination_page_size', + 'attack_surface', '_pagination_callable', '_pagination_has_more'] + + def _get_sortable_fields(self): + return ['type','name','severity','count'] + + def _pagination_parse_page(self, api_response): + """Parse a page of API response data.""" + self._totalrecords = api_response.get('totalCount') + if self._pagination_current_page == 0: + self._records = [] + for result in api_response.get('vulnerableComponents',[]): + self._records.append(AttackSurfaceComponent(self._attack_surface, result)) + + @property + def attack_surface(self): + """Get the CVE associated with this list of observations. + + :rtype: :class:`passivetotal.analyzer.illuminate.AttackSurface` + """ + return self._attack_surface + + @attack_surface.setter # internal API - necessary for shallow copy operations + def attack_surface(self, attack_surface): + self._attack_surface = attack_surface + if attack_surface.is_own: + self._pagination_callable = partial( + get_api('Illuminate').get_asi_vuln_components + ) + else: + self._pagination_callable = partial( + get_api('Illuminate').get_asi_3p_vuln_components, + attack_surface.id + ) + + + +class AttackSurfaceComponent(Record, FirstLastSeen, ForPandas): + + """A vulnerable component (detection) observed in an attack surface.""" + + def __init__(self, attack_surface, api_response={}): + self._attack_surface = attack_surface + self._type = api_response.get('type') + self._name = api_response.get('name') + self._severity = api_response.get('severity') + self._count = api_response.get('count') + + def __str__(self): + return '{}'.format(self._name) + + def __repr__(self): + return ''.format(self) + + def _get_dict_fields(self): + return ['vendor_id','type','name','severity','count'] + + def to_dataframe(self): + """Render this object as a Pandas dataframe. + + :rtype: :class:`pandas.DataFrame` + """ + pd = self._get_pandas() + cols = ['attack_surface','type','name','severity','count'] + as_d = { + 'attack_surface': self.attack_surface.name, + 'type': self.type, + 'name': self.name, + 'severity': self.severity, + 'count': self.count, + } + return pd.DataFrame.from_records([as_d], columns=cols) + + @property + def attack_surface(self): + """Attack surface this component is associated with. + + :rtype: :class:`passivetotal.analyzer.illuminate.AttackSurface` + """ + return self._attack_surface + + @property + def type(self): + """Type of this component (detection).""" + return self._type + + @property + def name(self): + """Name of this component (detection).""" + return self._name + + @property + def severity(self): + """Severity of this detection.""" + return self._severity + + @property + def count(self): + """Count.""" + return self._count + + + +class VulnArticle(Record, ForPandas): + + """Vulnerabilty report providing details on impacted assets and third-party vendors.""" + + _instances = {} + + def __new__(cls, id=None, api_response=None): + if id is None and api_response is not None and 'cveInfo' in api_response and 'cveId' in api_response['cveInfo']: + id = api_response['cveInfo'].get('cveId') + if id is not None: + self = cls._instances.get(id) + if self is not None: + return self + self = cls._instances[id] = object.__new__(cls) + self._is_loaded = False + if api_response is not None: + self._parse(api_response) + return self + + def __repr__(self): + return ''.format(self._id) + + def __str__(self): + try: + cwe_list = '({})'.format(','.join(self.cwes)) + except Exception: + cwe_list = '' + return '[{0.id}] {0.description}{1}'.format(self, cwe_list) + + def _parse(self, api_response): + self._id = api_response['cveInfo']['cveId'] + self._description = api_response['cveInfo']['description'] + self._cwes = api_response['cveInfo']['cwes'] + self._priority_score = api_response['cveInfo']['priorityScore'] + self._cvss2_score = api_response['cveInfo']['cvss2Score'] + self._cvss3_score = api_response['cveInfo']['cvss3Score'] + self._date_published = api_response['cveInfo']['datePublished'] + self._date_created = api_response['cveInfo']['dateCreated'] + self._date_publisher_updated = api_response['cveInfo']['datePublisherUpdate'] + self._references = api_response['cveInfo']['references'] + self._components = api_response['components'] + self._impacted3p = api_response['impactedThirdParties'] + self._observation_count = api_response['observationCount'] + self._link = api_response['articlesLink'] + self._is_loaded = True + + def _api_get_article(self, id): + response = get_api('Illuminate').get_vuln_article(id) + self._parse(response) + + def _get_dict_fields(self): + return ['id','description','cwes','score','cvss2score','cvss3score','str:date_published', + 'str:date_updated','str:date_publisher_updated','references','components', + 'observation_count'] + + def to_dataframe(self, view='info'): + """Render this object as a Pandas dataframe. + + :param view: View to generate (info, references, components, or impacts) + + :rtype: :class:`pandas.DataFrame` + """ + views = ['info','references','components','impacts'] + if view not in views: + raise AnalyzerError('view must be one of {}'.format(views)) + pd = self._get_pandas() + cols = { + 'info': ['cve_id','description','score','cvss2score','cvss3score','date_published','date_created', + 'date_pubupdate','observations','references','components','impacts'], + 'references': ['cve_id','reference_url','reference_name'], + 'components': ['cve_id','component'], + 'impacts': ['cve_id', 'vendor_name','vendor_id','observation_count'] + } + records = { + 'info': [{ + 'cve_id': self.id, + 'description': self.description, + 'score': self.score, + 'cvss2score': self.cvss2score, + 'cvss3score': self.cvss3score, + 'date_published': self.date_published, + 'date_created': self.date_created, + 'date_pubupdate': self.date_publisher_updated, + 'observations': self.observation_count, + 'references': len(self.references), + 'components': len(self.components), + 'impacts': len(self._impacted3p) + }], + 'references': [{ + 'cve_id': self.id, + 'reference_url': r['url'], + 'reference_name': r['name'] + } for r in self.references ], + 'components': [{ + 'cve_id': self.id, + 'component': c['name'] + } for c in self.components ], + 'impacts': [{ + 'cve_id': self.id, + 'vendor_name': i.vendor_name, + 'vendor_id': i.vendor_id, + 'observation_count': i.observation_count + } for i in self.attack_surfaces ] + } + return pd.DataFrame.from_records(records[view], index='cve_id', columns=cols[view]) + + @staticmethod + def load(id): + """Load a Vulnerability Article by ID. + + :rtype: :class:`VulnArticle` + """ + article = VulnArticle(id) + if not article._is_loaded: + article._api_get_article(id) + return article + + @property + def id(self): + """CVE identifier string (alias for cve_id).""" + return self._id + + @property + def cve_id(self): + """CVE identifier string.""" + return self._id + + @property + def osint_articles(self): + """Get a list of RiskIQ open-source intelligence articles that reference this vulnerability. + + :rtype: :class:`passivetotal.analyzer.articles.ArticlesList` + """ + from passivetotal.analyzer.articles import ArticlesList + return ArticlesList.find(self.id) + + @property + def description(self): + """Narrative description of the CVE.""" + return self._description + + @property + def cwes(self): + """List of CWE IDs.""" + return self._cwes + + @property + def score(self): + """RiskIQ-assigned priority score for this vulnerability, ranging between 0 and 100.""" + return self._priority_score + + @property + def cvss2score(self): + """The CVSS2 score for this vulnerability.""" + return self._cvss2_score + + @property + def cvss3score(self): + """The CVSSS3 score for this vulnerability.""" + return self._cvss3_score + + @property + def date_published(self): + """The date the article was published.""" + return datetime.fromisoformat(self._date_published) + + @property + def date_published_raw(self): + """The raw (string) value returned from the API with the date the article was published.""" + return self._date_published + + @property + def date_created(self): + """The date the article was created.""" + return datetime.fromisoformat(self._date_created) + + @property + def date_created_raw(self): + """The raw (string) value returned from the API with the date the article was created.""" + return self._date_created + + @property + def date_publisher_updated(self): + """The date the article was updated by the publisher.""" + return datetime.fromisoformat(self._date_publisher_updated) + + @property + def date_publisherupdate_raw(self): + """The raw (string) value returned from the API with the date the article was updated by the publisher.""" + return self._date_publisher_updated + + @property + def references(self): + """List of references for this article.""" + return self._references + + @property + def components(self): + """List of components (detections) RiskIQ will search for to determine if assets are impacted by this vulnerability.""" + return self._components + + @property + @lru_cache + def attack_surfaces(self): + """List of Illuminate Attack Surfaces (aka third-party vendors) with assets impacted by this vulnerability. + + :rtype: :class:`VulnArticleImpacts` + """ + return VulnArticleImpacts(self, self._impacted3p) + + @property + def observation_count(self): + """Number of observations (assets) within the primary attack surface that are impacted by this vulnerability.""" + return self._observation_count + + @property + def observations(self): + """List of observations (assets) within the primary attack surface that are impacted by this vulnerability.""" + from . import AttackSurface + attack_surface = AttackSurface.load() + article = { + 'cveId': self.id, + 'cwes': self.cwes, + 'priorityScore': self.score, + 'observationCount': self.observation_count, + 'cveLink': '' + } + cve = AttackSurfaceCVE(attack_surface, article) + return cve.observations + + + +class VulnArticleImpacts(RecordList, ForPandas): + + """List of Illuminate Attack Surfaces impacted by a vulnerability.""" + + def __init__(self, article=None, impacts=[]): + self._records = [] + if article is not None: + self._article = article + if len(impacts): + self._records = [ VulnArticleImpact(self._article, i) for i in impacts ] + + def __repr__(self): + return ''.format(self) + + def __str__(self): + return '{0.article.id} impacts {0.impact_count:,} attack surfaces(s)'.format(self) + + def _get_dict_fields(self): + return ['cve_id', 'impact_count'] + + def _get_shallow_copy_fields(self): + return ['_article'] + + def _get_sortable_fields(self): + return ['vendor_name','vendor_id','observation_count'] + + @property + def article(self): + """Article that describes the vulnerability impacting these attack surfaces.""" + return self._article + + @property + def attack_surfaces(self): + """List of impacted attack surfaces. + + :rtypte: :class:`VulnArticleImpact` + """ + return self._records + + @property + def cve_id(self): + """CVE identifier for the vulnerability this article applies to.""" + return self.article.id + + @property + def impact_count(self): + """Number of attack surfaces impacted by this vulnerability.""" + return len(self._records) + + + +class VulnArticleImpact(Record, ForPandas): + + """An impacted third-party attack surface with observations (assets) affected by a given vulnerabilty.""" + + def __init__(self, article, api_response=None): + self._article = article + if api_response is not None: + self._parse(api_response) + + def __repr__(self): + return "".format(self._vendorname) + + def __str__(self): + return '{0.vendor_name}: {0.observation_count:,} observations'.format(self) + + def _get_dict_fields(self): + return ['vendor_name', 'vendor_id', 'observation_count'] + + def _parse(self, api_response): + self._vendorid = api_response['vendorID'] + self._vendorname = api_response['name'] + self._assetcount = api_response['assetCount'] + + @property + def article(self): + """Article that describes the vulnerability this observation (asset) is impacted by. + + :rtype: :class:`VulnArticle` + """ + return self._article + + @property + def vendor_name(self): + """Name of the vendor with observations (assets) impacted by this vulnerability.""" + return self._vendorname + + @property + def vendor_id(self): + """The RiskIQ-assigned identifier for this vendor.""" + return self._vendorid + + @property + def attack_surface(self): + """Illuminate Attack Surface for the third-party vendor impacted by this vulnerability.""" + from . import AttackSurface + return AttackSurface.load(self._vendorid) + + @property + def observation_count(self): + """Number of observations (assets) within a vendor's attack surface that are impacted by this vulnerability.""" + return self._assetcount + + @property + def observations(self): + """List of observations (assets) within this vendor's attack surface that are impacted by this vulnerability.""" + article = { + 'cveId': self.article.id, + 'cwes': self.article.cwes, + 'priorityScore': self.article.score, + 'observationCount': self.article.observation_count, + 'cveLink': '' + } + cve = AttackSurfaceCVE(self.attack_surface, article) + return cve.observations + + + + + diff --git a/passivetotal/libs/illuminate.py b/passivetotal/libs/illuminate.py index e618a1b..6c0492f 100644 --- a/passivetotal/libs/illuminate.py +++ b/passivetotal/libs/illuminate.py @@ -130,6 +130,75 @@ def get_asi_3p_vendor_insights(self, vendor_id, insight_id, **kwargs): """ path = 'third-party/{0}/insight/{1}'.format(vendor_id, insight_id) return self._get('attack-surface', path, **kwargs) + + def get_asi_vuln_components(self, **kwargs): + """Get attack surface vulnerable components. + + Reference: https://api.riskiq.net/api/asi_thirdparty/#!/default/get_pt_v2_attack_surface_vuln_intel_components + + :return: Dict of results + """ + return self._get('attack-surface', 'vuln-intel/components', **kwargs) + + def get_asi_vuln_cves(self, **kwargs): + """Get attack surface vulnerabilities. + + Reference: https://api.riskiq.net/api/asi_thirdparty/#!/default/get_pt_v2_attack_surface_vuln_intel_cves + + :return: Dict of results + """ + return self._get('attack-surface', 'vuln-intel/cves', **kwargs) + + def get_asi_vuln_cve_observations(self, cve_id, **kwargs): + """Get attack surface observations for a given CVE. + + Reference: https://api.riskiq.net/api/asi_thirdparty/#!/default/get_pt_v2_attack_surface_vuln_intel_cves_cveId_observations + + :return: Dict of results + """ + path = 'vuln-intel/cves/{0}/observations'.format(cve_id) + return self._get('attack-surface', path, **kwargs) + + def get_asi_3p_vuln_components(self, vendor_id, **kwargs): + """Get attack surface vulnerable components for a third-party vendor. + + Reference: https://api.riskiq.net/api/asi_thirdparty/#!/default/get_pt_v2_attack_surface_vuln_intel_third_party_id_components + + :return: Dict of results + """ + path = 'vuln-intel/third-party/{0}/components'.format(vendor_id) + return self._get('attack-surface', path, **kwargs) + + def get_asi_3p_vuln_cves(self, vendor_id, **kwargs): + """Get attack surface vulnerabilities for a third-party vendor. + + Reference: https://api.riskiq.net/api/asi_thirdparty/#!/default/get_pt_v2_attack_surface_vuln_intel_third_party_id_cves + + :return: Dict of results + """ + path = 'vuln-intel/third-party/{0}/cves'.format(vendor_id) + return self._get('attack-surface', path, **kwargs) + + def get_asi_3p_vuln_cve_observations(self, vendor_id, cve_id, **kwargs): + """Get attack surface observations for a given CVE and third-party vendor ID. + + Reference: https://api.riskiq.net/api/asi_thirdparty/#!/default/get_pt_v2_attack_surface_vuln_intel_third_party_id_cves_cveId_observations + + :return: Dict of results + """ + path = 'vuln-intel/third-party/{0}/cves/{1}/observations'.format(vendor_id, cve_id) + return self._get('attack-surface', path, **kwargs) + + def get_vuln_article(self, cve_id, **kwargs): + """Get details on a CVE vulnerability article. + + Reference: https://api.riskiq.net/api/vulnerability/#!/default/get_pt_v2_vuln_intel_article_cveId + + :return: Dict of results + """ + path = 'article/{0}'.format(cve_id) + return self._get('vuln-intel', path, **kwargs) +