check_docker

This commit is contained in:
Cyberes 2023-05-03 12:20:18 -06:00
parent cb9dd021ff
commit 5d1aa8e642
24 changed files with 4181 additions and 0 deletions

View File

@ -0,0 +1,13 @@
languages:
Ruby: true
JavaScript: true
PHP: true
Python: true
exclude_paths:
- "check_docker/tests/*"
- "tests/*"
plugins:
radon:
enabled: true
sonar-python:
enabled: true

6
check_docker/.coveragerc Normal file
View File

@ -0,0 +1,6 @@
[run]
include =
check_docker/check_*.py
omit =
tests/*
*/__init__.py

View File

@ -0,0 +1,4 @@
# Created by .ignore support plugin (hsz.mobi)
*
!testing_tools
!dev_requirements.txt

14
check_docker/.travis.yml Normal file
View File

@ -0,0 +1,14 @@
language: python
python:
- "3.5"
- "3.6"
- "3.7"
- "3.8"
install:
- pip install pipenv
- pipenv install
- pipenv install codeclimate-test-reporter
# command to run tests
script:
- py.test --cov=check_docker
- codeclimate-test-reporter || echo "Ignoring Code Climate reporter upload failure"

View File

@ -0,0 +1,48 @@
# Development environment setup
You should have the following installed
- docker
- python (version >= 3.0)
- pipenv
- vagrant
Initialize your pipenv
pipenv install --skip-lock
# Running the tests
## Normal tests
tox and Pytest is used for testing. You can can run test by running the following from
the root of the project
tox
## Isolated tests
Sometimes test cases can interact with Docker on the development machine making
it hard to determine the cause of a test success or failure. To address this
you can use the `run_isolated_tests.sh` script to run pytest inside a
environment isolated from any network. Additionally this isolated test will
run the unit tests on multiple versions of python so you can validate your
changes are not python version specific.
./run_isolated_tests.sh
## Package tests
These test verify that, after created, the package can be installed and
runs successfully(not just passes unit tests). To do this a test environment is set up in vagrant.
./run_package_tests.sh
# Coverage report
The aim is to keep coverage above 90% on the actual checks
(check_docker.py and check_swarm.py). To generate a coverage report.
pipenv run py.test --cov=check_docker/
# Tips
When jumping back and forth between normal and isolated tests the `__pycache__`
directories can fall out fo sync with your execution environment. When this
happens you see errors like `ImportError: No module named 'check_docker'. The
fix is simple, just remove all the `__pycache__` directories in the project.

619
check_docker/LICENSE Normal file
View File

@ -0,0 +1,619 @@
GNU GENERAL PUBLIC LICENSE
Version 3, 29 June 2007
Copyright (C) 2007 Free Software Foundation, Inc. <http://fsf.org/>
Everyone is permitted to copy and distribute verbatim copies
of this license document, but changing it is not allowed.
Preamble
The GNU General Public License is a free, copyleft license for
software and other kinds of works.
The licenses for most software and other practical works are designed
to take away your freedom to share and change the works. By contrast,
the GNU General Public License is intended to guarantee your freedom to
share and change all versions of a program--to make sure it remains free
software for all its users. We, the Free Software Foundation, use the
GNU General Public License for most of our software; it applies also to
any other work released this way by its authors. You can apply it to
your programs, too.
When we speak of free software, we are referring to freedom, not
price. Our General Public Licenses are designed to make sure that you
have the freedom to distribute copies of free software (and charge for
them if you wish), that you receive source code or can get it if you
want it, that you can change the software or use pieces of it in new
free programs, and that you know you can do these things.
To protect your rights, we need to prevent others from denying you
these rights or asking you to surrender the rights. Therefore, you have
certain responsibilities if you distribute copies of the software, or if
you modify it: responsibilities to respect the freedom of others.
For example, if you distribute copies of such a program, whether
gratis or for a fee, you must pass on to the recipients the same
freedoms that you received. You must make sure that they, too, receive
or can get the source code. And you must show them these terms so they
know their rights.
Developers that use the GNU GPL protect your rights with two steps:
(1) assert copyright on the software, and (2) offer you this License
giving you legal permission to copy, distribute and/or modify it.
For the developers' and authors' protection, the GPL clearly explains
that there is no warranty for this free software. For both users' and
authors' sake, the GPL requires that modified versions be marked as
changed, so that their problems will not be attributed erroneously to
authors of previous versions.
Some devices are designed to deny users access to install or run
modified versions of the software inside them, although the manufacturer
can do so. This is fundamentally incompatible with the aim of
protecting users' freedom to change the software. The systematic
pattern of such abuse occurs in the area of products for individuals to
use, which is precisely where it is most unacceptable. Therefore, we
have designed this version of the GPL to prohibit the practice for those
products. If such problems arise substantially in other domains, we
stand ready to extend this provision to those domains in future versions
of the GPL, as needed to protect the freedom of users.
Finally, every program is threatened constantly by software patents.
States should not allow patents to restrict development and use of
software on general-purpose computers, but in those that do, we wish to
avoid the special danger that patents applied to a free program could
make it effectively proprietary. To prevent this, the GPL assures that
patents cannot be used to render the program non-free.
The precise terms and conditions for copying, distribution and
modification follow.
TERMS AND CONDITIONS
0. Definitions.
"This License" refers to version 3 of the GNU General Public License.
"Copyright" also means copyright-like laws that apply to other kinds of
works, such as semiconductor masks.
"The Program" refers to any copyrightable work licensed under this
License. Each licensee is addressed as "you". "Licensees" and
"recipients" may be individuals or organizations.
To "modify" a work means to copy from or adapt all or part of the work
in a fashion requiring copyright permission, other than the making of an
exact copy. The resulting work is called a "modified version" of the
earlier work or a work "based on" the earlier work.
A "covered work" means either the unmodified Program or a work based
on the Program.
To "propagate" a work means to do anything with it that, without
permission, would make you directly or secondarily liable for
infringement under applicable copyright law, except executing it on a
computer or modifying a private copy. Propagation includes copying,
distribution (with or without modification), making available to the
public, and in some countries other activities as well.
To "convey" a work means any kind of propagation that enables other
parties to make or receive copies. Mere interaction with a user through
a computer network, with no transfer of a copy, is not conveying.
An interactive user interface displays "Appropriate Legal Notices"
to the extent that it includes a convenient and prominently visible
feature that (1) displays an appropriate copyright notice, and (2)
tells the user that there is no warranty for the work (except to the
extent that warranties are provided), that licensees may convey the
work under this License, and how to view a copy of this License. If
the interface presents a list of user commands or options, such as a
menu, a prominent item in the list meets this criterion.
1. Source Code.
The "source code" for a work means the preferred form of the work
for making modifications to it. "Object code" means any non-source
form of a work.
A "Standard Interface" means an interface that either is an official
standard defined by a recognized standards body, or, in the case of
interfaces specified for a particular programming language, one that
is widely used among developers working in that language.
The "System Libraries" of an executable work include anything, other
than the work as a whole, that (a) is included in the normal form of
packaging a Major Component, but which is not part of that Major
Component, and (b) serves only to enable use of the work with that
Major Component, or to implement a Standard Interface for which an
implementation is available to the public in source code form. A
"Major Component", in this context, means a major essential component
(kernel, window system, and so on) of the specific operating system
(if any) on which the executable work runs, or a compiler used to
produce the work, or an object code interpreter used to run it.
The "Corresponding Source" for a work in object code form means all
the source code needed to generate, install, and (for an executable
work) run the object code and to modify the work, including scripts to
control those activities. However, it does not include the work's
System Libraries, or general-purpose tools or generally available free
programs which are used unmodified in performing those activities but
which are not part of the work. For example, Corresponding Source
includes interface definition files associated with source files for
the work, and the source code for shared libraries and dynamically
linked subprograms that the work is specifically designed to require,
such as by intimate data communication or control flow between those
subprograms and other parts of the work.
The Corresponding Source need not include anything that users
can regenerate automatically from other parts of the Corresponding
Source.
The Corresponding Source for a work in source code form is that
same work.
2. Basic Permissions.
All rights granted under this License are granted for the term of
copyright on the Program, and are irrevocable provided the stated
conditions are met. This License explicitly affirms your unlimited
permission to run the unmodified Program. The output from running a
covered work is covered by this License only if the output, given its
content, constitutes a covered work. This License acknowledges your
rights of fair use or other equivalent, as provided by copyright law.
You may make, run and propagate covered works that you do not
convey, without conditions so long as your license otherwise remains
in force. You may convey covered works to others for the sole purpose
of having them make modifications exclusively for you, or provide you
with facilities for running those works, provided that you comply with
the terms of this License in conveying all material for which you do
not control copyright. Those thus making or running the covered works
for you must do so exclusively on your behalf, under your direction
and control, on terms that prohibit them from making any copies of
your copyrighted material outside their relationship with you.
Conveying under any other circumstances is permitted solely under
the conditions stated below. Sublicensing is not allowed; section 10
makes it unnecessary.
3. Protecting Users' Legal Rights From Anti-Circumvention Law.
No covered work shall be deemed part of an effective technological
measure under any applicable law fulfilling obligations under article
11 of the WIPO copyright treaty adopted on 20 December 1996, or
similar laws prohibiting or restricting circumvention of such
measures.
When you convey a covered work, you waive any legal power to forbid
circumvention of technological measures to the extent such circumvention
is effected by exercising rights under this License with respect to
the covered work, and you disclaim any intention to limit operation or
modification of the work as a means of enforcing, against the work's
users, your or third parties' legal rights to forbid circumvention of
technological measures.
4. Conveying Verbatim Copies.
You may convey verbatim copies of the Program's source code as you
receive it, in any medium, provided that you conspicuously and
appropriately publish on each copy an appropriate copyright notice;
keep intact all notices stating that this License and any
non-permissive terms added in accord with section 7 apply to the code;
keep intact all notices of the absence of any warranty; and give all
recipients a copy of this License along with the Program.
You may charge any price or no price for each copy that you convey,
and you may offer support or warranty protection for a fee.
5. Conveying Modified Source Versions.
You may convey a work based on the Program, or the modifications to
produce it from the Program, in the form of source code under the
terms of section 4, provided that you also meet all of these conditions:
a) The work must carry prominent notices stating that you modified
it, and giving a relevant date.
b) The work must carry prominent notices stating that it is
released under this License and any conditions added under section
7. This requirement modifies the requirement in section 4 to
"keep intact all notices".
c) You must license the entire work, as a whole, under this
License to anyone who comes into possession of a copy. This
License will therefore apply, along with any applicable section 7
additional terms, to the whole of the work, and all its parts,
regardless of how they are packaged. This License gives no
permission to license the work in any other way, but it does not
invalidate such permission if you have separately received it.
d) If the work has interactive user interfaces, each must display
Appropriate Legal Notices; however, if the Program has interactive
interfaces that do not display Appropriate Legal Notices, your
work need not make them do so.
A compilation of a covered work with other separate and independent
works, which are not by their nature extensions of the covered work,
and which are not combined with it such as to form a larger program,
in or on a volume of a storage or distribution medium, is called an
"aggregate" if the compilation and its resulting copyright are not
used to limit the access or legal rights of the compilation's users
beyond what the individual works permit. Inclusion of a covered work
in an aggregate does not cause this License to apply to the other
parts of the aggregate.
6. Conveying Non-Source Forms.
You may convey a covered work in object code form under the terms
of sections 4 and 5, provided that you also convey the
machine-readable Corresponding Source under the terms of this License,
in one of these ways:
a) Convey the object code in, or embodied in, a physical product
(including a physical distribution medium), accompanied by the
Corresponding Source fixed on a durable physical medium
customarily used for software interchange.
b) Convey the object code in, or embodied in, a physical product
(including a physical distribution medium), accompanied by a
written offer, valid for at least three years and valid for as
long as you offer spare parts or customer support for that product
model, to give anyone who possesses the object code either (1) a
copy of the Corresponding Source for all the software in the
product that is covered by this License, on a durable physical
medium customarily used for software interchange, for a price no
more than your reasonable cost of physically performing this
conveying of source, or (2) access to copy the
Corresponding Source from a network server at no charge.
c) Convey individual copies of the object code with a copy of the
written offer to provide the Corresponding Source. This
alternative is allowed only occasionally and noncommercially, and
only if you received the object code with such an offer, in accord
with subsection 6b.
d) Convey the object code by offering access from a designated
place (gratis or for a charge), and offer equivalent access to the
Corresponding Source in the same way through the same place at no
further charge. You need not require recipients to copy the
Corresponding Source along with the object code. If the place to
copy the object code is a network server, the Corresponding Source
may be on a different server (operated by you or a third party)
that supports equivalent copying facilities, provided you maintain
clear directions next to the object code saying where to find the
Corresponding Source. Regardless of what server hosts the
Corresponding Source, you remain obligated to ensure that it is
available for as long as needed to satisfy these requirements.
e) Convey the object code using peer-to-peer transmission, provided
you inform other peers where the object code and Corresponding
Source of the work are being offered to the general public at no
charge under subsection 6d.
A separable portion of the object code, whose source code is excluded
from the Corresponding Source as a System Library, need not be
included in conveying the object code work.
A "User Product" is either (1) a "consumer product", which means any
tangible personal property which is normally used for personal, family,
or household purposes, or (2) anything designed or sold for incorporation
into a dwelling. In determining whether a product is a consumer product,
doubtful cases shall be resolved in favor of coverage. For a particular
product received by a particular user, "normally used" refers to a
typical or common use of that class of product, regardless of the status
of the particular user or of the way in which the particular user
actually uses, or expects or is expected to use, the product. A product
is a consumer product regardless of whether the product has substantial
commercial, industrial or non-consumer uses, unless such uses represent
the only significant mode of use of the product.
"Installation Information" for a User Product means any methods,
procedures, authorization keys, or other information required to install
and execute modified versions of a covered work in that User Product from
a modified version of its Corresponding Source. The information must
suffice to ensure that the continued functioning of the modified object
code is in no case prevented or interfered with solely because
modification has been made.
If you convey an object code work under this section in, or with, or
specifically for use in, a User Product, and the conveying occurs as
part of a transaction in which the right of possession and use of the
User Product is transferred to the recipient in perpetuity or for a
fixed term (regardless of how the transaction is characterized), the
Corresponding Source conveyed under this section must be accompanied
by the Installation Information. But this requirement does not apply
if neither you nor any third party retains the ability to install
modified object code on the User Product (for example, the work has
been installed in ROM).
The requirement to provide Installation Information does not include a
requirement to continue to provide support service, warranty, or updates
for a work that has been modified or installed by the recipient, or for
the User Product in which it has been modified or installed. Access to a
network may be denied when the modification itself materially and
adversely affects the operation of the network or violates the rules and
protocols for communication across the network.
Corresponding Source conveyed, and Installation Information provided,
in accord with this section must be in a format that is publicly
documented (and with an implementation available to the public in
source code form), and must require no special password or key for
unpacking, reading or copying.
7. Additional Terms.
"Additional permissions" are terms that supplement the terms of this
License by making exceptions from one or more of its conditions.
Additional permissions that are applicable to the entire Program shall
be treated as though they were included in this License, to the extent
that they are valid under applicable law. If additional permissions
apply only to part of the Program, that part may be used separately
under those permissions, but the entire Program remains governed by
this License without regard to the additional permissions.
When you convey a copy of a covered work, you may at your option
remove any additional permissions from that copy, or from any part of
it. (Additional permissions may be written to require their own
removal in certain cases when you modify the work.) You may place
additional permissions on material, added by you to a covered work,
for which you have or can give appropriate copyright permission.
Notwithstanding any other provision of this License, for material you
add to a covered work, you may (if authorized by the copyright holders of
that material) supplement the terms of this License with terms:
a) Disclaiming warranty or limiting liability differently from the
terms of sections 15 and 16 of this License; or
b) Requiring preservation of specified reasonable legal notices or
author attributions in that material or in the Appropriate Legal
Notices displayed by works containing it; or
c) Prohibiting misrepresentation of the origin of that material, or
requiring that modified versions of such material be marked in
reasonable ways as different from the original version; or
d) Limiting the use for publicity purposes of names of licensors or
authors of the material; or
e) Declining to grant rights under trademark law for use of some
trade names, trademarks, or service marks; or
f) Requiring indemnification of licensors and authors of that
material by anyone who conveys the material (or modified versions of
it) with contractual assumptions of liability to the recipient, for
any liability that these contractual assumptions directly impose on
those licensors and authors.
All other non-permissive additional terms are considered "further
restrictions" within the meaning of section 10. If the Program as you
received it, or any part of it, contains a notice stating that it is
governed by this License along with a term that is a further
restriction, you may remove that term. If a license document contains
a further restriction but permits relicensing or conveying under this
License, you may add to a covered work material governed by the terms
of that license document, provided that the further restriction does
not survive such relicensing or conveying.
If you add terms to a covered work in accord with this section, you
must place, in the relevant source files, a statement of the
additional terms that apply to those files, or a notice indicating
where to find the applicable terms.
Additional terms, permissive or non-permissive, may be stated in the
form of a separately written license, or stated as exceptions;
the above requirements apply either way.
8. Termination.
You may not propagate or modify a covered work except as expressly
provided under this License. Any attempt otherwise to propagate or
modify it is void, and will automatically terminate your rights under
this License (including any patent licenses granted under the third
paragraph of section 11).
However, if you cease all violation of this License, then your
license from a particular copyright holder is reinstated (a)
provisionally, unless and until the copyright holder explicitly and
finally terminates your license, and (b) permanently, if the copyright
holder fails to notify you of the violation by some reasonable means
prior to 60 days after the cessation.
Moreover, your license from a particular copyright holder is
reinstated permanently if the copyright holder notifies you of the
violation by some reasonable means, this is the first time you have
received notice of violation of this License (for any work) from that
copyright holder, and you cure the violation prior to 30 days after
your receipt of the notice.
Termination of your rights under this section does not terminate the
licenses of parties who have received copies or rights from you under
this License. If your rights have been terminated and not permanently
reinstated, you do not qualify to receive new licenses for the same
material under section 10.
9. Acceptance Not Required for Having Copies.
You are not required to accept this License in order to receive or
run a copy of the Program. Ancillary propagation of a covered work
occurring solely as a consequence of using peer-to-peer transmission
to receive a copy likewise does not require acceptance. However,
nothing other than this License grants you permission to propagate or
modify any covered work. These actions infringe copyright if you do
not accept this License. Therefore, by modifying or propagating a
covered work, you indicate your acceptance of this License to do so.
10. Automatic Licensing of Downstream Recipients.
Each time you convey a covered work, the recipient automatically
receives a license from the original licensors, to run, modify and
propagate that work, subject to this License. You are not responsible
for enforcing compliance by third parties with this License.
An "entity transaction" is a transaction transferring control of an
organization, or substantially all assets of one, or subdividing an
organization, or merging organizations. If propagation of a covered
work results from an entity transaction, each party to that
transaction who receives a copy of the work also receives whatever
licenses to the work the party's predecessor in interest had or could
give under the previous paragraph, plus a right to possession of the
Corresponding Source of the work from the predecessor in interest, if
the predecessor has it or can get it with reasonable efforts.
You may not impose any further restrictions on the exercise of the
rights granted or affirmed under this License. For example, you may
not impose a license fee, royalty, or other charge for exercise of
rights granted under this License, and you may not initiate litigation
(including a cross-claim or counterclaim in a lawsuit) alleging that
any patent claim is infringed by making, using, selling, offering for
sale, or importing the Program or any portion of it.
11. Patents.
A "contributor" is a copyright holder who authorizes use under this
License of the Program or a work on which the Program is based. The
work thus licensed is called the contributor's "contributor version".
A contributor's "essential patent claims" are all patent claims
owned or controlled by the contributor, whether already acquired or
hereafter acquired, that would be infringed by some manner, permitted
by this License, of making, using, or selling its contributor version,
but do not include claims that would be infringed only as a
consequence of further modification of the contributor version. For
purposes of this definition, "control" includes the right to grant
patent sublicenses in a manner consistent with the requirements of
this License.
Each contributor grants you a non-exclusive, worldwide, royalty-free
patent license under the contributor's essential patent claims, to
make, use, sell, offer for sale, import and otherwise run, modify and
propagate the contents of its contributor version.
In the following three paragraphs, a "patent license" is any express
agreement or commitment, however denominated, not to enforce a patent
(such as an express permission to practice a patent or covenant not to
sue for patent infringement). To "grant" such a patent license to a
party means to make such an agreement or commitment not to enforce a
patent against the party.
If you convey a covered work, knowingly relying on a patent license,
and the Corresponding Source of the work is not available for anyone
to copy, free of charge and under the terms of this License, through a
publicly available network server or other readily accessible means,
then you must either (1) cause the Corresponding Source to be so
available, or (2) arrange to deprive yourself of the benefit of the
patent license for this particular work, or (3) arrange, in a manner
consistent with the requirements of this License, to extend the patent
license to downstream recipients. "Knowingly relying" means you have
actual knowledge that, but for the patent license, your conveying the
covered work in a country, or your recipient's use of the covered work
in a country, would infringe one or more identifiable patents in that
country that you have reason to believe are valid.
If, pursuant to or in connection with a single transaction or
arrangement, you convey, or propagate by procuring conveyance of, a
covered work, and grant a patent license to some of the parties
receiving the covered work authorizing them to use, propagate, modify
or convey a specific copy of the covered work, then the patent license
you grant is automatically extended to all recipients of the covered
work and works based on it.
A patent license is "discriminatory" if it does not include within
the scope of its coverage, prohibits the exercise of, or is
conditioned on the non-exercise of one or more of the rights that are
specifically granted under this License. You may not convey a covered
work if you are a party to an arrangement with a third party that is
in the business of distributing software, under which you make payment
to the third party based on the extent of your activity of conveying
the work, and under which the third party grants, to any of the
parties who would receive the covered work from you, a discriminatory
patent license (a) in connection with copies of the covered work
conveyed by you (or copies made from those copies), or (b) primarily
for and in connection with specific products or compilations that
contain the covered work, unless you entered into that arrangement,
or that patent license was granted, prior to 28 March 2007.
Nothing in this License shall be construed as excluding or limiting
any implied license or other defenses to infringement that may
otherwise be available to you under applicable patent law.
12. No Surrender of Others' Freedom.
If conditions are imposed on you (whether by court order, agreement or
otherwise) that contradict the conditions of this License, they do not
excuse you from the conditions of this License. If you cannot convey a
covered work so as to satisfy simultaneously your obligations under this
License and any other pertinent obligations, then as a consequence you may
not convey it at all. For example, if you agree to terms that obligate you
to collect a royalty for further conveying from those to whom you convey
the Program, the only way you could satisfy both those terms and this
License would be to refrain entirely from conveying the Program.
13. Use with the GNU Affero General Public License.
Notwithstanding any other provision of this License, you have
permission to link or combine any covered work with a work licensed
under version 3 of the GNU Affero General Public License into a single
combined work, and to convey the resulting work. The terms of this
License will continue to apply to the part which is the covered work,
but the special requirements of the GNU Affero General Public License,
section 13, concerning interaction through a network will apply to the
combination as such.
14. Revised Versions of this License.
The Free Software Foundation may publish revised and/or new versions of
the GNU General Public License from time to time. Such new versions will
be similar in spirit to the present version, but may differ in detail to
address new problems or concerns.
Each version is given a distinguishing version number. If the
Program specifies that a certain numbered version of the GNU General
Public License "or any later version" applies to it, you have the
option of following the terms and conditions either of that numbered
version or of any later version published by the Free Software
Foundation. If the Program does not specify a version number of the
GNU General Public License, you may choose any version ever published
by the Free Software Foundation.
If the Program specifies that a proxy can decide which future
versions of the GNU General Public License can be used, that proxy's
public statement of acceptance of a version permanently authorizes you
to choose that version for the Program.
Later license versions may give you additional or different
permissions. However, no additional obligations are imposed on any
author or copyright holder as a result of your choosing to follow a
later version.
15. Disclaimer of Warranty.
THERE IS NO WARRANTY FOR THE PROGRAM, TO THE EXTENT PERMITTED BY
APPLICABLE LAW. EXCEPT WHEN OTHERWISE STATED IN WRITING THE COPYRIGHT
HOLDERS AND/OR OTHER PARTIES PROVIDE THE PROGRAM "AS IS" WITHOUT WARRANTY
OF ANY KIND, EITHER EXPRESSED OR IMPLIED, INCLUDING, BUT NOT LIMITED TO,
THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
PURPOSE. THE ENTIRE RISK AS TO THE QUALITY AND PERFORMANCE OF THE PROGRAM
IS WITH YOU. SHOULD THE PROGRAM PROVE DEFECTIVE, YOU ASSUME THE COST OF
ALL NECESSARY SERVICING, REPAIR OR CORRECTION.
16. Limitation of Liability.
IN NO EVENT UNLESS REQUIRED BY APPLICABLE LAW OR AGREED TO IN WRITING
WILL ANY COPYRIGHT HOLDER, OR ANY OTHER PARTY WHO MODIFIES AND/OR CONVEYS
THE PROGRAM AS PERMITTED ABOVE, BE LIABLE TO YOU FOR DAMAGES, INCLUDING ANY
GENERAL, SPECIAL, INCIDENTAL OR CONSEQUENTIAL DAMAGES ARISING OUT OF THE
USE OR INABILITY TO USE THE PROGRAM (INCLUDING BUT NOT LIMITED TO LOSS OF
DATA OR DATA BEING RENDERED INACCURATE OR LOSSES SUSTAINED BY YOU OR THIRD
PARTIES OR A FAILURE OF THE PROGRAM TO OPERATE WITH ANY OTHER PROGRAMS),
EVEN IF SUCH HOLDER OR OTHER PARTY HAS BEEN ADVISED OF THE POSSIBILITY OF
SUCH DAMAGES.
17. Interpretation of Sections 15 and 16.
If the disclaimer of warranty and limitation of liability provided
above cannot be given local legal effect according to their terms,
reviewing courts shall apply local law that most closely approximates
an absolute waiver of all civil liability in connection with the
Program, unless a warranty or assumption of liability accompanies a
copy of the Program in return for a fee.

17
check_docker/Pipfile Normal file
View File

@ -0,0 +1,17 @@
[[source]]
name = "pypi"
url = "https://pypi.org/simple"
verify_ssl = true
[packages]
tox = '*'
tox-pyenv = '*'
pytest = '*'
pytest-random-order = '*'
coverage = '>4.0,<4.4'
pyfakefs = '*'
pytest-cov = '<2.6'
poetry = "*"
[requires]
python_version = "3.8"

168
check_docker/README.rst Normal file
View File

@ -0,0 +1,168 @@
|Build Status| |Code Climate| |Test Coverage| |Downloads|
============
check_docker
============
Nagios/NRPE compatible plugins for checking docker based services. Currently there are two nagios checks
- **check_docker** which checks docker container health
- **check_swarm** which checks health of swarm nodes and services
With **check_docker** can use it to check and alert on
- memory consumption in absolute units (bytes, kb, mb, gb) and as a percentage (0-100%)
of the container limit.
- CPU usages as a percentage (0-100%) of container limit.
- automatic restarts performed by the docker daemon
- container status, i.e. is it running?
- container health checks are passing?
- uptime, i.e. is it able to stay running for a long enough time?
- the presence of a container or containers matching specified names
- image version, does the running image match that in the remote registry?
- image age, when was the image built the last time?
With **check_swarm** you can alert
- if a node is not joined to a docker swarm
- if a service is running in a swarm
These checks can communicate with a local docker daemon socket file (default) or with local
or remote docker daemons using secure and non-secure TCP connections.
These plugins require python 3. It is tested on 3.5 and greater but may work on older
versions of 3.
Installation
-----------------
With pip
::
pip3 install check_docker
--or--
pip install check_docker
With curl
::
curl -o /usr/local/bin/check_docker https://raw.githubusercontent.com/timdaman/check_docker/master/check_docker/check_docker.py
curl -o /usr/local/bin/check_swarm https://raw.githubusercontent.com/timdaman/check_docker/master/check_docker/check_swarm.py
chmod a+rx /usr/local/bin/check_docker /usr/local/bin/check_swarm
With wget
::
wget -O /usr/local/bin/check_docker https://raw.githubusercontent.com/timdaman/check_docker/master/check_docker/check_docker.py
wget -O /usr/local/bin/check_swarm https://raw.githubusercontent.com/timdaman/check_docker/master/check_docker/check_swarm.py
chmod a+rx /usr/local/bin/check_docker /usr/local/bin/check_swarm
check_docker Usage
------------------
::
usage: check_docker.py [-h]
[--connection [/<path to>/docker.socket|<ip/host address>:<port>]
| --secure-connection [<ip/host address>:<port>]]
[--binary_units | --decimal_units] [--timeout TIMEOUT]
[--containers CONTAINERS [CONTAINERS ...]] [--present]
[--threads THREADS] [--cpu WARN:CRIT]
[--memory WARN:CRIT:UNITS] [--status STATUS] [--health]
[--uptime WARN:CRIT] [--image-age WARN:CRIT] [--version]
[--insecure-registries INSECURE_REGISTRIES [INSECURE_REGISTRIES ...]]
[--restarts WARN:CRIT] [--no-ok] [--no-performance] [-V]
Check docker containers.
optional arguments:
-h, --help show this help message and exit
--connection [/<path to>/docker.socket|<ip/host address>:<port>]
Where to find docker daemon socket. (default:
/var/run/docker.sock)
--secure-connection [<ip/host address>:<port>]
Where to find TLS protected docker daemon socket.
--binary_units Use a base of 1024 when doing calculations of KB, MB,
GB, & TB (This is default)
--decimal_units Use a base of 1000 when doing calculations of KB, MB,
GB, & TB
--timeout TIMEOUT Connection timeout in seconds. (default: 10.0)
--containers CONTAINERS [CONTAINERS ...]
One or more RegEx that match the names of the
container(s) to check. If omitted all containers are
checked. (default: ['all'])
--present Modifies --containers so that each RegEx must match at
least one container.
--threads THREADS This + 1 is the maximum number of concurent
threads/network connections. (default: 10)
--cpu WARN:CRIT Check cpu usage percentage taking into account any
limits. Valid values are 0 - 100.
--memory WARN:CRIT:UNITS
Check memory usage taking into account any limits.
Valid values for units are %,B,KB,MB,GB.
--status STATUS Desired container status (running, exited, etc).
--health Check container's health check status
--uptime WARN:CRIT Minimum container uptime in seconds. Use when
infrequent crashes are tolerated.
--image-age WARN:CRIT Maximum image age in days.
--version Check if the running images are the same version as
those in the registry. Useful for finding stale
images. Does not support login.
--insecure-registries INSECURE_REGISTRIES [INSECURE_REGISTRIES ...]
List of registries to connect to with http(no TLS).
Useful when using "--version" with images from
insecure registries.
--restarts WARN:CRIT Container restart thresholds.
--no-ok Make output terse suppressing OK messages. If all
checks are OK return a single OK.
--no-performance Suppress performance data. Reduces output when
performance data is not being used.
-V show program's version number and exit
check_swarm Usage
-----------------
::
usage: check_swarm.py [-h]
[--connection [/<path to>/docker.socket|<ip/host address>:<port>]
| --secure-connection [<ip/host address>:<port>]]
[--timeout TIMEOUT]
(--swarm | --service SERVICE [SERVICE ...] | --ignore_paused)
[-V]
Check docker swarm.
optional arguments:
-h, --help show this help message and exit
--connection [/<path to>/docker.socket|<ip/host address>:<port>]
Where to find docker daemon socket. (default:
/var/run/docker.sock)
--secure-connection [<ip/host address>:<port>]
Where to find TLS protected docker daemon socket.
--timeout TIMEOUT Connection timeout in seconds. (default: 10.0)
--swarm Check swarm status
--service SERVICE [SERVICE ...]
One or more RegEx that match the names of the
services(s) to check.
--ignore_paused Don't require global services to be running on paused nodes
-V show program's version number and exit
Gotchas
-------
- When using check_docker with older versions of docker (I have seen 1.4 and 1.5) status only supports running, restarting, and paused.
- When using check_docker, if no container is specified, all containers are checked. Some containers may return critcal status if the selected check(s) require a running container.
- When using check_docker, --present cannot be used without --containers to indicate what to check the presence of.
.. |Build Status| image:: https://travis-ci.org/timdaman/check_docker.svg?branch=master
:target: https://travis-ci.org/timdaman/check_docker
.. |Code Climate| image:: https://codeclimate.com/github/timdaman/check_docker/badges/gpa.svg
:target: https://codeclimate.com/github/timdaman/check_docker
.. |Test Coverage| image:: https://codeclimate.com/github/timdaman/check_docker/badges/coverage.svg
:target: https://codeclimate.com/github/timdaman/check_docker/coverage
.. |Downloads| image:: http://pepy.tech/badge/check-docker
:target: http://pepy.tech/count/check-docker

View File

@ -0,0 +1,2 @@
"""Nagios/NRPE compatible plugins for checking docker based services"""
__version__ = "2.2.2"

View File

@ -0,0 +1,999 @@
#!/usr/bin/env python3
# logging.basicConfig(level=logging.DEBUG)
import argparse
import json
import logging
import math
import os
import re
import socket
import stat
import traceback
from collections import deque, namedtuple, UserDict, defaultdict
from concurrent import futures
from datetime import datetime, timezone
from functools import lru_cache
from http.client import HTTPConnection
from sys import argv
from urllib import request
from urllib.error import HTTPError, URLError
from urllib.request import AbstractHTTPHandler, HTTPHandler, HTTPSHandler, OpenerDirector, HTTPRedirectHandler, \
Request, HTTPBasicAuthHandler
logger = logging.getLogger()
__author__ = 'Tim Laurence'
__copyright__ = "Copyright 2019"
__credits__ = ['Tim Laurence']
__license__ = "GPL"
__version__ = "2.2.2"
'''
nrpe compatible check for docker containers.
Requires Python 3
Note: I really would have preferred to have used requests for all the network connections but that would have added a
dependency.
'''
DEFAULT_SOCKET = '/var/run/docker.sock'
DEFAULT_TIMEOUT = 10.0
DEFAULT_PORT = 2375
DEFAULT_MEMORY_UNITS = 'B'
DEFAULT_HEADERS = [('Accept', 'application/vnd.docker.distribution.manifest.v2+json')]
DEFAULT_PUBLIC_REGISTRY = 'registry-1.docker.io'
# The second value is the power to raise the base to.
UNIT_ADJUSTMENTS_TEMPLATE = {
'%': 0,
'B': 0,
'KB': 1,
'MB': 2,
'GB': 3,
'TB': 4
}
unit_adjustments = None
# Reduce message to a single OK unless a checks fail.
no_ok = False
# Suppress performance data reporting
no_performance = False
OK_RC = 0
WARNING_RC = 1
CRITICAL_RC = 2
UNKNOWN_RC = 3
# These hold the final results
rc = -1
messages = []
performance_data = []
ImageName = namedtuple('ImageName', "registry name tag full_name")
class ThresholdSpec(UserDict):
def __init__(self, warn, crit, units=''):
super().__init__(warn=warn, crit=crit, units=units)
def __getattr__(self, item):
if item in ('warn', 'crit', 'units'):
return self.data[item]
return super().__getattr__(item)
# How much threading can we do? We are generally not CPU bound so I am using this a worse case cap
DEFAULT_PARALLELISM = 10
# Holds list of all threads
threads = []
# This is used during testing
DISABLE_THREADING = False
# Hacked up urllib to handle sockets
#############################################################################################
# Docker runs a http connection over a socket. http.client is knows how to deal with these
# but lacks some niceties. Urllib wraps that and makes up for some of the deficiencies but
# cannot fix the fact http.client can't read from socket files. In order to take advantage of
# urllib and http.client's capabilities the class below tweaks HttpConnection and passes it
# to urllib registering for socket:// connections
# This is all side effect so excluding coverage
class SocketFileHandler(AbstractHTTPHandler):
class SocketFileToHttpConnectionAdaptor(HTTPConnection): # pragma: no cover
def __init__(self, socket_file, timeout=DEFAULT_TIMEOUT):
super().__init__(host='', port=0, timeout=timeout)
self.socket_file = socket_file
def connect(self):
self.sock = socket.socket(family=socket.AF_UNIX, type=socket.SOCK_STREAM, proto=0, fileno=None)
self.sock.settimeout(self.timeout)
self.sock.connect(self.socket_file)
def socket_open(self, req):
socket_file, path = req.selector.split(':', 1)
req.host = socket_file
req.selector = path
return self.do_open(self.SocketFileToHttpConnectionAdaptor, req)
# Tokens are not cached because I expect the callers to cache the responses
class Oauth2TokenAuthHandler(HTTPBasicAuthHandler):
auth_failure_tracker = defaultdict(int)
def http_response(self, request, response):
code, hdrs = response.code, response.headers
www_authenticate_header = response.headers.get('www-authenticate', None)
if code == 401 and www_authenticate_header:
scheme = www_authenticate_header.split()[0]
if scheme.lower() == 'bearer':
return self.process_oauth2(request, response, www_authenticate_header)
return response
https_response = http_response
@staticmethod
def _get_outh2_token(www_authenticate_header):
auth_fields = dict(re.findall(r"""(?:(?P<key>[^ ,=]+)="([^"]+)")""", www_authenticate_header))
auth_url = "{realm}?scope={scope}&service={service}".format(
realm=auth_fields['realm'],
scope=auth_fields['scope'],
service=auth_fields['service'],
)
token_request = Request(auth_url)
token_request.add_header("Content-Type", "application/x-www-form-urlencoded; charset=utf-8")
token_response = request.urlopen(token_request)
return process_urllib_response(token_response)['token']
def process_oauth2(self, request, response, www_authenticate_header):
# This keeps infinite auth loops from happening
full_url = request.full_url
self.auth_failure_tracker[full_url] += 1
if self.auth_failure_tracker[full_url] > 1:
raise HTTPError(full_url, 401, "Stopping Oauth2 failure loop for {}".format(full_url),
response.headers, response)
auth_token = self._get_outh2_token(www_authenticate_header)
request.add_unredirected_header('Authorization', 'Bearer ' + auth_token)
return self.parent.open(request, timeout=request.timeout)
better_urllib_get = OpenerDirector()
better_urllib_get.addheaders = DEFAULT_HEADERS.copy()
better_urllib_get.add_handler(HTTPHandler())
better_urllib_get.add_handler(HTTPSHandler())
better_urllib_get.add_handler(HTTPRedirectHandler())
better_urllib_get.add_handler(SocketFileHandler())
better_urllib_get.add_handler(Oauth2TokenAuthHandler())
class RegistryError(Exception):
def __init__(self, response):
self.response_obj = response
# Util functions
#############################################################################################
def parse_thresholds(spec, include_units=True, units_required=True):
"""
Given a spec string break it up into ':' separated chunks. Convert strings to ints as it makes sense
:param spec: The threshold specification being parsed
:param include_units: Specifies that units should be processed and returned if present
:param units_required: Mark spec as invalid if the units are missing.
:return: A list containing the thresholds in order of warn, crit, and units(if included and present)
"""
parts = deque(spec.split(':'))
if not all(parts):
raise ValueError("Blanks are not allowed in a threshold specification: {}".format(spec))
# Warn
warn = int(parts.popleft())
# Crit
crit = int(parts.popleft())
units = ''
if include_units:
if len(parts):
# units
units = parts.popleft()
elif units_required:
raise ValueError("Missing units in {}".format(spec))
if len(parts) != 0:
raise ValueError("Too many threshold specifiers in {}".format(spec))
return ThresholdSpec(warn=warn, crit=crit, units=units)
def pretty_time(seconds):
remainder = seconds
result = []
if remainder > 24 * 60 * 60:
days, remainder = divmod(remainder, 24 * 60 * 60)
result.append("{}d".format(int(days)))
if remainder > 60 * 60:
hours, remainder = divmod(remainder, 60 * 60)
result.append("{}h".format(int(hours)))
if remainder > 60:
minutes, remainder = divmod(remainder, 60)
result.append("{}min".format(int(minutes)))
result.append("{}s".format(int(remainder)))
return result
def evaluate_numeric_thresholds(container, value, thresholds, name, short_name,
min=None, max=None, greater_than=True):
rounder = lambda x: round(x, 2)
INTEGER_UNITS = ['B', '%', '']
# Some units don't have decimal places
rounded_value = int(value) if thresholds.units in INTEGER_UNITS else rounder(value)
perf_string = "{container}_{short_name}={value}{units};{warn};{crit}".format(
container=container,
short_name=short_name,
value=rounded_value,
**thresholds)
if min is not None:
rounded_min = math.floor(min) if thresholds.units in INTEGER_UNITS else rounder(min)
perf_string += ';{}'.format(rounded_min)
if max is not None:
rounded_max = math.ceil(max) if thresholds.units in INTEGER_UNITS else rounder(max)
perf_string += ';{}'.format(rounded_max)
global performance_data
performance_data.append(perf_string)
if thresholds.units == 's':
nice_time = ' '.join(pretty_time(rounded_value)[:2])
results_str = "{} {} is {}".format(container, name, nice_time)
else:
results_str = "{} {} is {}{}".format(container, name, rounded_value, thresholds.units)
if greater_than:
comparator = lambda value, threshold: value >= threshold
else:
comparator = lambda value, threshold: value <= threshold
if comparator(value, thresholds.crit):
critical(results_str)
elif comparator(value, thresholds.warn):
warning(results_str)
else:
ok(results_str)
@lru_cache(maxsize=None)
def get_url(url):
logger.debug("get_url: {}".format(url))
response = better_urllib_get.open(url, timeout=timeout)
logger.debug("get_url: {} {}".format(url, response.status))
return process_urllib_response(response), response.status
def process_urllib_response(response):
response_bytes = response.read()
body = response_bytes.decode('utf-8')
# logger.debug("BODY: {}".format(body))
return json.loads(body)
def get_container_info(name):
content, _ = get_url(daemon + '/containers/{container}/json'.format(container=name))
return content
def get_image_info(name):
content, _ = get_url(daemon + '/images/{image}/json'.format(image=name))
return content
def get_state(container):
return get_container_info(container)['State']
def get_stats(container):
content, _ = get_url(daemon + '/containers/{container}/stats?stream=0'.format(container=container))
return content
def get_ps_name(name_list):
# Pick the name that starts with a '/' but doesn't contain a '/' and return that value
for name in name_list:
if '/' not in name[1:] and name[0] == '/':
return name[1:]
else:
raise NameError("Error when trying to identify 'ps' name in {}".format(name_list))
def get_containers(names, require_present):
containers_list, _ = get_url(daemon + '/containers/json?all=1')
all_container_names = set(get_ps_name(x['Names']) for x in containers_list)
if 'all' in names:
return all_container_names
filtered = set()
for matcher in names:
found = False
for candidate in all_container_names:
if re.match("^{}$".format(matcher), candidate):
filtered.add(candidate)
found = True
# If we don't find a container that matches out regex
if require_present and not found:
critical("No containers match {}".format(matcher))
return filtered
def get_container_image_id(container):
# find registry and tag
inspection = get_container_info(container)
return inspection['Image']
def get_container_image_urls(container):
inspection = get_container_info(container)
image_id = inspection['Image']
image_info = get_image_info(image_id)
return image_info['RepoTags']
def normalize_image_name_to_manifest_url(image_name, insecure_registries):
parsed_url = parse_image_name(image_name)
lower_insecure = [reg.lower() for reg in insecure_registries]
# Registry query url
scheme = 'http' if parsed_url.registry.lower() in lower_insecure else 'https'
url = '{scheme}://{registry}/v2/{image_name}/manifests/{image_tag}'.format(scheme=scheme,
registry=parsed_url.registry,
image_name=parsed_url.name,
image_tag=parsed_url.tag)
return url, parsed_url.registry
# Auth servers seem picky about being hit too hard. Can't figure out why. ;)
# As result it is best to single thread this check
# This is based on https://docs.docker.com/registry/spec/auth/token/#requesting-a-token
def get_digest_from_registry(url):
logger.debug("get_digest_from_registry")
# query registry
# TODO: Handle logging in if needed
registry_info, status_code = get_url(url=url)
if status_code != 200:
raise RegistryError(response=registry_info)
return registry_info['config'].get('digest', None)
def set_rc(new_rc):
global rc
rc = new_rc if new_rc > rc else rc
def ok(message):
set_rc(OK_RC)
messages.append('OK: ' + message)
def warning(message):
set_rc(WARNING_RC)
messages.append('WARNING: ' + message)
def critical(message):
set_rc(CRITICAL_RC)
messages.append('CRITICAL: ' + message)
def unknown(message):
set_rc(UNKNOWN_RC)
messages.append('UNKNOWN: ' + message)
def require_running(name):
def inner_decorator(func):
def wrapper(container, *args, **kwargs):
container_state = get_state(container)
state = normalize_state(container_state)
if state.lower() == "running":
func(container, *args, **kwargs)
else:
# container is not running, can't perform check
critical('{container} is not "running", cannot check {check}"'.format(container=container,
check=name))
return wrapper
return inner_decorator
def multithread_execution(disable_threading=DISABLE_THREADING):
def inner_decorator(func):
def wrapper(container, *args, **kwargs):
if DISABLE_THREADING:
func(container, *args, **kwargs)
else:
threads.append(parallel_executor.submit(func, container, *args, **kwargs))
return wrapper
return inner_decorator
def singlethread_execution(disable_threading=DISABLE_THREADING):
def inner_decorator(func):
def wrapper(container, *args, **kwargs):
if DISABLE_THREADING:
func(container, *args, **kwargs)
else:
threads.append(serial_executor.submit(func, container, *args, **kwargs))
return wrapper
return inner_decorator
def parse_image_name(image_name):
"""
Parses image names into their constituent parts.
:param image_name:
:return: ImageName
"""
# These are based on information found here
# https://docs.docker.com/engine/reference/commandline/tag/#extended-description
# https://github.com/docker/distribution/blob/master/reference/regexp.go
host_segment_re = '[a-zA-Z0-9]([a-zA-Z0-9-]*[a-zA-Z0-9])?'
hostname_re = r'({host_segment}\.)+{host_segment}'.format(host_segment=host_segment_re)
registry_re = r'((?P<registry>({hostname_re}(:\d+)?|{host_segment_re}:\d+))/)'.format(
host_segment_re=host_segment_re, hostname_re=hostname_re)
name_component_ends_re = '[a-z0-9]'
name_component_middle_re = '[a-z0-9._-]' # Ignoring spec limit of two _
name_component_re = '({end}{middle}*{end}|{end})'.format(end=name_component_ends_re,
middle=name_component_middle_re)
image_name_re = "(?P<image_name>({name_component}/)*{name_component})".format(name_component=name_component_re)
image_tag_re = '(?P<image_tag>[a-zA-Z0-9_][a-zA-Z0-9_.-]*)'
full_re = '^{registry}?{image_name}(:{image_tag})?$'.format(registry=registry_re, image_name=image_name_re,
image_tag=image_tag_re)
parsed = re.match(full_re, image_name)
registry = parsed.group('registry') if parsed.group('registry') else DEFAULT_PUBLIC_REGISTRY
image_name = parsed.group('image_name')
image_name = image_name if '/' in image_name or registry != DEFAULT_PUBLIC_REGISTRY else 'library/' + image_name
image_tag = parsed.group('image_tag')
image_tag = image_tag if image_tag else 'latest'
full_image_name = "{registry}/{image_name}:{image_tag}".format(
registry=registry,
image_name=image_name,
image_tag=image_tag)
return ImageName(registry=registry, name=image_name, tag=image_tag, full_name=full_image_name)
def normalize_state(status_info):
# Ugh, docker used to report state in as silly way then they figured out how to do it better.
# This tries the simpler new way and if that doesn't work fails back to the old way
# On new docker engines the status holds whatever the current state is, running, stopped, paused, etc.
if "Status" in status_info:
return status_info['Status']
status = 'Exited'
if status_info["Restarting"]:
status = 'Restarting'
elif status_info["Paused"]:
status = 'Paused'
elif status_info["Dead"]:
status = 'Dead'
elif status_info["Running"]:
return "Running"
return status
# Checks
#############################################################################################
@multithread_execution()
@require_running(name='memory')
def check_memory(container, thresholds):
if not thresholds.units in unit_adjustments:
unknown("Memory units must be one of {}".format(list(unit_adjustments.keys())))
return
inspection = get_stats(container)
# Subtracting cache to match what `docker stats` does.
adjusted_usage = inspection['memory_stats']['usage'] - inspection['memory_stats']['stats']['total_cache']
if thresholds.units == '%':
max = 100
usage = int(100 * adjusted_usage / inspection['memory_stats']['limit'])
else:
max = inspection['memory_stats']['limit'] / unit_adjustments[thresholds.units]
usage = adjusted_usage / unit_adjustments[thresholds.units]
evaluate_numeric_thresholds(container=container, value=usage, thresholds=thresholds, name='memory',
short_name='mem', min=0, max=max)
@multithread_execution()
def check_status(container, desired_state):
normized_desired_state = desired_state.lower()
normalized_state = normalize_state(get_state(container)).lower()
if normized_desired_state != normalized_state:
critical("{} state is not {}".format(container, desired_state))
return
ok("{} status is {}".format(container, desired_state))
@multithread_execution()
@require_running('health')
def check_health(container):
state = get_state(container)
if "Health" in state and "Status" in state["Health"]:
health = state["Health"]["Status"]
message = "{} is {}".format(container, health)
if health == 'healthy':
ok(message)
elif health == 'unhealthy':
critical(message)
else:
unknown(message)
else:
unknown('{} has no health check data'.format(container))
@multithread_execution()
@require_running('uptime')
def check_uptime(container, thresholds):
inspection = get_container_info(container)['State']['StartedAt']
only_secs = inspection[0:19]
start = datetime.strptime(only_secs, "%Y-%m-%dT%H:%M:%S")
start = start.replace(tzinfo=timezone.utc)
now = datetime.now(timezone.utc)
uptime = (now - start).total_seconds()
graph_padding = 2
thresholds.units = 's'
evaluate_numeric_thresholds(container=container, value=uptime, thresholds=thresholds, name='uptime',
short_name='up', min=0, max=graph_padding, greater_than=False)
@multithread_execution()
def check_image_age(container, thresholds):
container_image = get_container_info(container)['Image']
image_created = get_image_info(container_image)['Created']
only_secs = image_created[0:19]
start = datetime.strptime(only_secs, "%Y-%m-%dT%H:%M:%S")
start = start.replace(tzinfo=timezone.utc)
now = datetime.now(timezone.utc)
image_age = (now - start).days
graph_padding = 2
thresholds.units = 'd'
evaluate_numeric_thresholds(container=container, value=image_age, thresholds=thresholds, name='image_age',
short_name='age', min=0, max=graph_padding, greater_than=True)
@multithread_execution()
@require_running('restarts')
def check_restarts(container, thresholds):
inspection = get_container_info(container)
restarts = int(inspection['RestartCount'])
graph_padding = 2
evaluate_numeric_thresholds(container=container, value=restarts, thresholds=thresholds, name='restarts',
short_name='re', min=0, max=graph_padding)
@singlethread_execution()
def check_version(container, insecure_registries):
image_id = get_container_image_id(container)
logger.debug("Local container image ID: {}".format(image_id))
if image_id is None:
unknown('Checksum missing for "{}", try doing a pull'.format(container))
return
image_urls = get_container_image_urls(container=container)
if len(image_urls) > 1:
unknown('"{}" has multiple tags/names. Unsure which one to use to check the version.'.format(container))
return
elif len(image_urls) == 0:
unknown('"{}" has last no repository tag. Is this anywhere else?'.format(container))
return
url, registry = normalize_image_name_to_manifest_url(image_urls[0], insecure_registries)
logger.debug("Looking up image digest here {}".format(url))
try:
registry_hash = get_digest_from_registry(url)
except URLError as e:
if hasattr(e.reason, 'reason') and e.reason.reason == 'UNKNOWN_PROTOCOL':
unknown(
"TLS error connecting to registry {} for {}, should you use the '--insecure-registry' flag?" \
.format(registry, container))
return
elif hasattr(e.reason, 'strerror') and e.reason.strerror == 'nodename nor servname provided, or not known':
unknown(
"Cannot reach registry for {} at {}".format(container, url))
return
else:
raise e
except RegistryError as e:
unknown("Cannot check version, couldn't retrieve digest for {} while checking {}.".format(container, url))
return
logger.debug("Image digests, local={} remote={}".format(image_id, registry_hash))
if registry_hash == image_id:
ok("{}'s version matches registry".format(container))
return
critical("{}'s version does not match registry".format(container))
def calculate_cpu_capacity_precentage(info, stats):
host_config = info['HostConfig']
if 'online_cpus' in stats['cpu_stats']:
num_cpus = stats['cpu_stats']['online_cpus']
else:
num_cpus = len(stats['cpu_stats']['cpu_usage']['percpu_usage'])
# Identify limit system being used
# --cpus
if 'NanoCpus' in host_config and host_config['NanoCpus'] != 0:
period = 1000000000
quota = host_config['NanoCpus']
# --cpu-quota
elif 'CpuQuota' in host_config and host_config['CpuQuota'] != 0:
period = 100000 if host_config['CpuPeriod'] == 0 else host_config['CpuPeriod']
quota = host_config['CpuQuota']
# unlimited
else:
period = 1
quota = num_cpus
if period * num_cpus < quota:
# This handles the case where the quota is actually bigger than amount available by all the cpus.
available_limit_ratio = 1
else:
available_limit_ratio = (period * num_cpus) / quota
cpu_delta = stats['cpu_stats']['cpu_usage']['total_usage'] - stats['precpu_stats']['cpu_usage']['total_usage']
system_delta = stats['cpu_stats']['system_cpu_usage'] - stats['precpu_stats']['system_cpu_usage']
usage = (cpu_delta / system_delta) * available_limit_ratio
usage = round(usage * 100, 0)
return usage
@multithread_execution()
@require_running('cpu')
def check_cpu(container, thresholds):
info = get_container_info(container)
stats = get_stats(container=container)
usage = calculate_cpu_capacity_precentage(info=info, stats=stats)
max = 100
thresholds.units = '%'
evaluate_numeric_thresholds(container=container, value=usage, thresholds=thresholds, name='cpu', short_name='cpu',
min=0, max=max)
def process_args(args):
parser = argparse.ArgumentParser(description='Check docker containers.')
# Connect to local socket or ip address
connection_group = parser.add_mutually_exclusive_group()
connection_group.add_argument('--connection',
dest='connection',
action='store',
default=DEFAULT_SOCKET,
type=str,
metavar='[/<path to>/docker.socket|<ip/host address>:<port>]',
help='Where to find docker daemon socket. (default: %(default)s)')
connection_group.add_argument('--secure-connection',
dest='secure_connection',
action='store',
type=str,
metavar='[<ip/host address>:<port>]',
help='Where to find TLS protected docker daemon socket.')
base_group = parser.add_mutually_exclusive_group()
base_group.add_argument('--binary_units',
dest='units_base',
action='store_const',
const=1024,
help='Use a base of 1024 when doing calculations of KB, MB, GB, & TB (This is default)')
base_group.add_argument('--decimal_units',
dest='units_base',
action='store_const',
const=1000,
help='Use a base of 1000 when doing calculations of KB, MB, GB, & TB')
parser.set_defaults(units_base=1024)
# Connection timeout
parser.add_argument('--timeout',
dest='timeout',
action='store',
type=float,
default=DEFAULT_TIMEOUT,
help='Connection timeout in seconds. (default: %(default)s)')
# Container name
parser.add_argument('--containers',
dest='containers',
action='store',
nargs='+',
type=str,
default=['all'],
help='One or more RegEx that match the names of the container(s) to check. If omitted all containers are checked. (default: %(default)s)')
# Container name
parser.add_argument('--present',
dest='present',
default=False,
action='store_true',
help='Modifies --containers so that each RegEx must match at least one container.')
# Threads
parser.add_argument('--threads',
dest='threads',
default=DEFAULT_PARALLELISM,
action='store',
type=int,
help='This + 1 is the maximum number of concurent threads/network connections. (default: %(default)s)')
# CPU
parser.add_argument('--cpu',
dest='cpu',
action='store',
type=str,
metavar='WARN:CRIT',
help='Check cpu usage percentage taking into account any limits.')
# Memory
parser.add_argument('--memory',
dest='memory',
action='store',
type=str,
metavar='WARN:CRIT:UNITS',
help='Check memory usage taking into account any limits. Valid values for units are %%,B,KB,MB,GB.')
# State
parser.add_argument('--status',
dest='status',
action='store',
type=str,
help='Desired container status (running, exited, etc).')
# Health
parser.add_argument('--health',
dest='health',
default=None,
action='store_true',
help="Check container's health check status")
# Age
parser.add_argument('--uptime',
dest='uptime',
action='store',
type=str,
metavar='WARN:CRIT',
help='Minimum container uptime in seconds. Use when infrequent crashes are tolerated.')
# Image Age
parser.add_argument('--image-age',
dest='image_age',
action='store',
type=str,
metavar='WARN:CRIT',
help='Maximum image age in days.')
# Version
parser.add_argument('--version',
dest='version',
default=None,
action='store_true',
help='Check if the running images are the same version as those in the registry. Useful for finding stale images. Does not support login.')
# Version
parser.add_argument('--insecure-registries',
dest='insecure_registries',
action='store',
nargs='+',
type=str,
default=[],
help='List of registries to connect to with http(no TLS). Useful when using "--version" with images from insecure registries.')
# Restart
parser.add_argument('--restarts',
dest='restarts',
action='store',
type=str,
metavar='WARN:CRIT',
help='Container restart thresholds.')
# no-ok
parser.add_argument('--no-ok',
dest='no_ok',
action='store_true',
help='Make output terse suppressing OK messages. If all checks are OK return a single OK.')
# no-performance
parser.add_argument('--no-performance',
dest='no_performance',
action='store_true',
help='Suppress performance data. Reduces output when performance data is not being used.')
parser.add_argument('-V', action='version', version='%(prog)s {}'.format(__version__))
if len(args) == 0:
parser.print_help()
parsed_args = parser.parse_args(args=args)
global timeout
timeout = parsed_args.timeout
global daemon
global connection_type
if parsed_args.secure_connection:
daemon = 'https://' + parsed_args.secure_connection
connection_type = 'https'
elif parsed_args.connection:
if parsed_args.connection[0] == '/':
daemon = 'socket://' + parsed_args.connection + ':'
connection_type = 'socket'
else:
daemon = 'http://' + parsed_args.connection
connection_type = 'http'
return parsed_args
def no_checks_present(parsed_args):
# Look for all functions whose name starts with 'check_'
checks = [key[6:] for key in globals().keys() if key.startswith('check_')]
# Act like --present is a check though it is not implemented like one
return all(getattr(parsed_args, check) is None for check in checks) and not parsed_args.present
def socketfile_permissions_failure(parsed_args):
if connection_type == 'socket':
return not (os.path.exists(parsed_args.connection)
and stat.S_ISSOCK(os.stat(parsed_args.connection).st_mode)
and os.access(parsed_args.connection, os.R_OK)
and os.access(parsed_args.connection, os.W_OK))
else:
return False
def print_results():
if no_ok:
# Remove all the "OK"s
filtered_messages = [message for message in messages if not message.startswith('OK: ')]
if len(filtered_messages) == 0:
messages_concat = 'OK'
else:
messages_concat = '; '.join(filtered_messages)
else:
messages_concat = '; '.join(messages)
if no_performance or len(performance_data) == 0:
print(messages_concat)
else:
perfdata_concat = ' '.join(performance_data)
print(messages_concat + '|' + perfdata_concat)
def perform_checks(raw_args):
args = process_args(raw_args)
global parallel_executor
parallel_executor = futures.ThreadPoolExecutor(max_workers=args.threads)
global serial_executor
serial_executor = futures.ThreadPoolExecutor(max_workers=1)
global unit_adjustments
unit_adjustments = {key: args.units_base ** value for key, value in UNIT_ADJUSTMENTS_TEMPLATE.items()}
global no_ok
no_ok = args.no_ok
global no_performance
no_performance = args.no_ok
if socketfile_permissions_failure(args):
unknown("Cannot access docker socket file. User ID={}, socket file={}".format(os.getuid(), args.connection))
return
if args.containers == ["all"] and args.present:
unknown("You can not use --present without --containers")
return
if no_checks_present(args):
unknown("No checks specified.")
return
# Here is where all the work happens
#############################################################################################
try:
containers = get_containers(args.containers, args.present)
except ConnectionRefusedError:
critical('Failed to connect to daemon: connection refused.')
if len(containers) == 0 and not args.present:
unknown("No containers names found matching criteria")
return
for container in containers:
# Check status
if args.status:
check_status(container, args.status)
# Check version
if args.version:
check_version(container, args.insecure_registries)
# below are checks that require a 'running' status
# Check status
if args.health:
check_health(container)
# Check cpu usage
if args.cpu:
check_cpu(container, parse_thresholds(args.cpu, units_required=False))
# Check memory usage
if args.memory:
check_memory(container, parse_thresholds(args.memory, units_required=False))
# Check uptime
if args.uptime:
check_uptime(container, parse_thresholds(args.uptime, include_units=False))
# Check image age
if args.image_age:
check_image_age(container, parse_thresholds(args.image_age, include_units=False))
# Check restart count
if args.restarts:
check_restarts(container, parse_thresholds(args.restarts, include_units=False))
def main():
try:
perform_checks(argv[1:])
# get results to let exceptions in threads bubble out
[x.result() for x in futures.as_completed(threads)]
except Exception as e:
traceback.print_exc()
unknown("Exception raised during check': {}".format(repr(e)))
print_results()
exit(rc)
if __name__ == '__main__':
main()

View File

@ -0,0 +1,362 @@
#!/usr/bin/env python3
import argparse
import json
import logging
import os
import re
import socket
import stat
import traceback
from functools import lru_cache
from http.client import HTTPConnection
from sys import argv
from urllib.request import AbstractHTTPHandler, HTTPHandler, HTTPSHandler, OpenerDirector
logger = logging.getLogger()
__author__ = 'Tim Laurence'
__copyright__ = "Copyright 2019"
__credits__ = ['Tim Laurence']
__license__ = "GPL"
__version__ = "2.2.2"
'''
nrpe compatible check for docker swarm
Requires Python 3
Note: I really would have preferred to have used requests for all the network connections but that would have added a
dependency.
'''
DEFAULT_SOCKET = '/var/run/docker.sock'
DEFAULT_TIMEOUT = 10.0
DEFAULT_PORT = 2375
DEFAULT_HEADERS = [('Accept', 'application/vnd.docker.distribution.manifest.v2+json')]
OK_RC = 0
WARNING_RC = 1
CRITICAL_RC = 2
UNKNOWN_RC = 3
HTTP_GOOD_CODES = range(200, 299)
# These hold the final results
rc = -1
messages = []
# Hacked up urllib to handle sockets
#############################################################################################
# Docker runs a http connection over a socket. http.client is knows how to deal with these
# but lacks some niceties. Urllib wraps that and makes up for some of the deficiencies but
# cannot fix the fact http.client can't read from socket files. In order to take advantage of
# urllib and http.client's capabilities the class below tweaks HttpConnection and passes it
# to urllib registering for socket:// connections
# This is all side effect so excluding coverage
class SocketFileHandler(AbstractHTTPHandler): # pragma: no cover
class SocketFileToHttpConnectionAdaptor(HTTPConnection):
def __init__(self, socket_file, timeout=DEFAULT_TIMEOUT):
super().__init__(host='', port=0, timeout=timeout)
self.socket_file = socket_file
def connect(self):
self.sock = socket.socket(family=socket.AF_UNIX, type=socket.SOCK_STREAM, proto=0, fileno=None)
self.sock.settimeout(self.timeout)
self.sock.connect(self.socket_file)
def socket_open(self, req):
socket_file, path = req.selector.split(':', 1)
req.host = socket_file
req.selector = path
return self.do_open(self.SocketFileToHttpConnectionAdaptor, req)
better_urllib_get = OpenerDirector()
better_urllib_get.addheaders = DEFAULT_HEADERS.copy()
better_urllib_get.add_handler(HTTPHandler())
better_urllib_get.add_handler(HTTPSHandler())
better_urllib_get.add_handler(SocketFileHandler())
# Util functions
#############################################################################################
@lru_cache()
def get_url(url):
response = better_urllib_get.open(url, timeout=timeout)
return process_urllib_response(response), response.status
def process_urllib_response(response):
response_bytes = response.read()
body = response_bytes.decode('utf-8')
logger.debug(body)
return json.loads(body)
def get_swarm_status():
content, status = get_url(daemon + '/swarm')
return status
def get_service_info(name):
return get_url(daemon + '/services/{service}'.format(service=name))
def get_service_tasks(name):
tasks, status = get_url(daemon + '/tasks?filters={{"name":{{"{service}":true}}}}'.format(service=name))
return tasks
def get_nodes():
return get_url(daemon + '/nodes')
def get_services(names):
services_list, status = get_url(daemon + '/services')
if status == 406:
critical("Error checking service status, node is not in swarm mode")
return []
elif status not in HTTP_GOOD_CODES:
unknown("Could not retrieve service info")
return []
all_services_names = set(x['Spec']['Name'] for x in services_list)
if 'all' in names:
return all_services_names
filtered = set()
not_found = []
for matcher in names:
found = False
for candidate in all_services_names:
if re.match("^{}$".format(matcher), candidate):
filtered.add(candidate)
found = True
# If we don't find a service that matches out regex
if not found:
not_found.append(matcher)
if len(not_found) > 0:
critical("No services match {}".format(','.join(not_found)))
return filtered
def set_rc(new_rc):
global rc
rc = new_rc if new_rc > rc else rc
def ok(message):
set_rc(OK_RC)
messages.append('OK: ' + message)
def warning(message):
set_rc(WARNING_RC)
messages.append('WARNING: ' + message)
def critical(message):
set_rc(CRITICAL_RC)
messages.append('CRITICAL: ' + message)
def unknown(message):
set_rc(UNKNOWN_RC)
messages.append('UNKNOWN: ' + message)
# Checks
#############################################################################################
def check_swarm():
status = get_swarm_status()
process_url_status(status, ok_msg='Node is in a swarm',
critical_msg='Node is not in a swarm', unknown_msg='Error accessing swarm info')
def process_global_service(name, ignore_paused=False):
bad_node_states = {'drain'}
if ignore_paused:
bad_node_states.add('paused')
# Get all the nodes we care about based on their state
node_list, status = get_nodes()
node_index = set()
for node in node_list:
if node['Spec']['Availability'] in bad_node_states:
continue
node_index.add(node['ID'])
# If a task is on a targeted node confirm it is running
# Services that are not running are considered bad. This is to prevent services in crash loops from being ignored
# Also note, this ignores conditions where services state they are running on a node not in the index.
service_tasks = get_service_tasks(name)
for task in service_tasks:
if task['Status']['State'] != 'running':
critical('Global service {service} has one or more tasks not running'.format(service=name))
return
node_index.discard(task['NodeID'])
if len(node_index) > 0:
critical('Global service {service} has {count} tasks not running'.format(service=name, count=len(node_list)))
ok('Global service {service} OK'.format(service=name))
def process_replicated_service(name, replicas_desired):
# Services that are not running are considered bad. This is to prevent services in crash loops from being ignored
all_tasks = get_service_tasks(name)
running_tasks = [task for task in all_tasks if task['Status']['State'] == 'running']
num_tasks = len(running_tasks)
if num_tasks != replicas_desired:
critical('Replicated service {service} has {num_tasks} tasks, {replicas_desired} desired'.
format(service=name, num_tasks=num_tasks, replicas_desired=replicas_desired))
else:
ok('Replicated service {service} OK'.format(service=name))
def check_service(name, ignore_paused=False):
# get service mode
service_info, status = get_service_info(name)
mode_info = service_info['Spec']['Mode']
# if global ensure one per node
if 'Global' in mode_info:
process_global_service(name=name, ignore_paused=ignore_paused)
# if replicated ensure sufficient number of replicas
elif 'Replicated' in mode_info:
process_replicated_service(name=name, replicas_desired=mode_info['Replicated']['Replicas'])
def process_url_status(status, ok_msg=None, critical_msg=None, unknown_msg=None):
if status in HTTP_GOOD_CODES:
return ok(ok_msg)
elif status in [503, 404, 406]:
return critical(critical_msg)
else:
return unknown(unknown_msg)
def process_args(args):
parser = argparse.ArgumentParser(description='Check docker swarm.')
# Connect to local socket or ip address
connection_group = parser.add_mutually_exclusive_group()
connection_group.add_argument('--connection',
dest='connection',
action='store',
default=DEFAULT_SOCKET,
type=str,
metavar='[/<path to>/docker.socket|<ip/host address>:<port>]',
help='Where to find docker daemon socket. (default: %(default)s)')
connection_group.add_argument('--secure-connection',
dest='secure_connection',
action='store',
type=str,
metavar='[<ip/host address>:<port>]',
help='Where to find TLS protected docker daemon socket.')
# Connection timeout
parser.add_argument('--timeout',
dest='timeout',
action='store',
type=float,
default=DEFAULT_TIMEOUT,
help='Connection timeout in seconds. (default: %(default)s)')
swarm_group = parser.add_mutually_exclusive_group(required=True)
# Swarm
swarm_group.add_argument('--swarm',
dest='swarm',
default=None,
action='store_true',
help='Check swarm status')
# Service
swarm_group.add_argument('--service',
dest='service',
action='store',
type=str,
nargs='+',
default=[],
help='One or more RegEx that match the names of the services(s) to check.')
swarm_group.add_argument('--ignore_paused',
dest='ignore_paused',
action='store_true',
help="Don't require global services to be running on paused nodes")
parser.add_argument('-V', action='version', version='%(prog)s {}'.format(__version__))
if len(args) == 0:
parser.print_help()
parsed_args = parser.parse_args(args=args)
global timeout
timeout = parsed_args.timeout
global daemon
global connection_type
if parsed_args.secure_connection:
daemon = 'https://' + parsed_args.secure_connection
connection_type = 'https'
elif parsed_args.connection:
if parsed_args.connection[0] == '/':
daemon = 'socket://' + parsed_args.connection + ':'
connection_type = 'socket'
else:
daemon = 'http://' + parsed_args.connection
connection_type = 'http'
return parsed_args
def socketfile_permissions_failure(parsed_args):
if connection_type == 'socket':
return not (os.path.exists(parsed_args.connection)
and stat.S_ISSOCK(os.stat(parsed_args.connection).st_mode)
and os.access(parsed_args.connection, os.R_OK)
and os.access(parsed_args.connection, os.W_OK))
else:
return False
def print_results():
print('; '.join(messages))
def perform_checks(raw_args):
args = process_args(raw_args)
if socketfile_permissions_failure(args):
unknown("Cannot access docker socket file. User ID={}, socket file={}".format(os.getuid(), args.connection))
else:
# Here is where all the work happens
#############################################################################################
try:
if args.swarm:
check_swarm()
elif args.service:
services = get_services(args.service)
# Status is set to critical by get_services() if nothing is found for a name
for service in services:
check_service(name=service, ignore_paused=args.ignore_paused)
except Exception as e:
traceback.print_exc()
unknown("Exception raised during check: {}".format(repr(e)))
print_results()
def main():
perform_checks(argv[1:])
exit(rc)
if __name__ == '__main__':
main()

View File

@ -0,0 +1,33 @@
[build-system]
requires = ["poetry"]
build-backend = "poetry.masonry.api"
[tool.poetry]
name = "check_docker"
version = "2.2.2"
description = "Nagios/NRPE compatible plugins for checking Docker based services"
license = "GPL-3.0"
authors = ["Tim Laurence <timdaman@gmail.com>"]
readme = "README.rst"
homepage = "https://github.com/timdaman/check_docker"
repository = "https://github.com/timdaman/check_docker"
classifiers=[
"Programming Language :: Python",
"Programming Language :: Python :: 3",
"Intended Audience :: System Administrators",
"Environment :: Other Environment",
"License :: OSI Approved :: GNU General Public License v3 (GPLv3)",
"Operating System :: OS Independent",
"Topic :: System :: Networking",
]
packages = [
{ include = "check_docker" },
]
[tool.poetry.scripts]
check_docker = "check_docker.check_docker:main"
check_swarm = "check_docker.check_swarm:main"

View File

@ -0,0 +1,42 @@
1. Confirm documentation is updated
- README
- DEV doc
1. Unit tests pass
1. Isolated tests pass
./run_isolated_tests.sh
1. make package
pipenv run poetry build
1. Uninstall check_docker and install package
pipenv uninstall check_docker && pipenv run pip install dist/check_docker-X.X.X-py2.py3-none-any.whl
1. Bats smoke tests pass
./run_package_tests.sh
1. Push to branch
1. Confirm doc looks good on github
1. Travis tests pass
1. Create and merge PR
1. Confirm Travis still passes
1. CodeClimate does not show scary issues (need to modify analyzed branch)
1. Upload package to test repo
poetry publish -r pypi -u timdaman -p xxxx
1. Check test project page for formatting
https://test.pypi.org/project/check_docker/
1. Upload package to prod repo
poetry publish -r prodpypi -u timdaman -p xxxx
1. Check project page for formatting
https://pypi.org/project/check_docker/

View File

@ -0,0 +1,6 @@
#!/usr/bin/env bash
set -eu
(cd testing_tools && docker build -t check_docker_tests .)
docker run --rm -v $PWD:$PWD -w $PWD -ti check_docker_tests tox

View File

@ -0,0 +1,8 @@
#!/usr/bin/env bash
set -eux
cd testing_tools/vagrant
vagrant up
vagrant ssh -c "bats -p /check_docker/testing_tools/vagrant"
vagrant suspend

View File

@ -0,0 +1,33 @@
FROM ubuntu:20.04 AS build
ENV DEBIAN_FRONTEND=noninteractive
ENV PYENV_ROOT="/pyenv"
ENV PATH="$PYENV_ROOT/bin:$PATH"
WORKDIR /
RUN apt update
RUN apt install --no-install-recommends --fix-missing -y build-essential make locales libssl1.1 libssl-dev \
libffi-dev libbz2-dev libreadline-dev libsqlite3-dev libjpeg-dev zlib1g-dev libxml2-dev libxslt1-dev \
curl ca-certificates
RUN curl -kL https://github.com/pyenv/pyenv/archive/master.tar.gz | tar -xz \
&& mv pyenv-master /pyenv
RUN echo 3.5.6 3.6.7 3.7.1 | xargs -n 1 -P $(nproc) pyenv install
RUN /pyenv/versions/3.7.1/bin/pip3.7 install setuptools wheel flit tox
FROM ubuntu:20.04
ENV DEBIAN_FRONTEND=noninteractive
SHELL ["/bin/bash", "-lc"]
ENTRYPOINT ["/bin/bash", "-lc"]
RUN apt update \
&& apt install --no-install-recommends --fix-missing -y git libssl1.1 ca-certificates netbase \
&& apt-get autoremove -y \
&& apt-get clean all \
&& rm -rf /var/lib/apt/lists/*
COPY --from=build /pyenv /pyenv
ENV PYENV_ROOT="/pyenv"
RUN echo 'PATH="/pyenv/bin:$PATH"' >> /etc/profile.d/02-pyenv.sh
RUN echo 'eval "$(pyenv init -)"' >> /etc/profile.d/02-pyenv.sh
RUN echo 'pyenv global 3.5.6 3.6.7 3.7.1' >> /etc/profile.d/02-pyenv.sh
# These are needed for some tests
ENV LC_ALL=C.UTF-8
ENV LANG=C.UTF-8
ENV isolated=true

View File

@ -0,0 +1,83 @@
# -*- mode: ruby -*-
# vi: set ft=ruby :
# All Vagrant configuration is done below. The "2" in Vagrant.configure
# configures the configuration version (we support older styles for
# backwards compatibility). Please don't change it unless you know what
# you're doing.
Vagrant.configure("2") do |config|
# The most common configuration options are documented and commented below.
# For a complete reference, please see the online documentation at
# https://docs.vagrantup.com.
# Every Vagrant development environment requires a box. You can search for
# boxes at https://atlas.hashicorp.com/search.
config.vm.box = "geerlingguy/ubuntu1604"
config.vm.box_version = "1.2.5"
# Disable automatic box update checking. If you disable this, then
# boxes will only be checked for updates when the user runs
# `vagrant box outdated`. This is not recommended.
# config.vm.box_check_update = false
# Create a forwarded port mapping which allows access to a specific port
# within the machine from a port on the host machine. In the example below,
# accessing "localhost:8080" will access port 80 on the guest machine.
# config.vm.network "forwarded_port", guest: 80, host: 8080
# Create a private network, which allows host-only access to the machine
# using a specific IP.
# config.vm.network "private_network", ip: "192.168.33.10"
# Create a public network, which generally matched to bridged network.
# Bridged networks make the machine appear as another physical device on
# your network.
# config.vm.network "public_network"
# Share an additional folder to the guest VM. The first argument is
# the path on the host to the actual folder. The second argument is
# the path on the guest to mount the folder. And the optional third
# argument is a set of non-required options.
# config.vm.synced_folder "../data", "/vagrant_data"
# Provider-specific configuration so you can fine-tune various
# backing providers for Vagrant. These expose provider-specific options.
# Example for VirtualBox:
#
# config.vm.provider "virtualbox" do |vb|
# # Display the VirtualBox GUI when booting the machine
# vb.gui = true
#
# # Customize the amount of memory on the VM:
# vb.memory = "1024"
# end
#
# View the documentation for the provider you are using for more
# information on available options.
# Define a Vagrant Push strategy for pushing to Atlas. Other push strategies
# such as FTP and Heroku are also available. See the documentation at
# https://docs.vagrantup.com/v2/push/atlas.html for more information.
# config.push.define "atlas" do |push|
# push.app = "YOUR_ATLAS_USERNAME/YOUR_APPLICATION_NAME"
# end
# Enable provisioning with a shell script. Additional provisioners such as
# Puppet, Chef, Ansible, Salt, and Docker are also available. Please see the
# documentation for more information about their specific syntax and use.
config.vm.provision "shell", inline: <<-SHELL
add-apt-repository -y ppa:deadsnakes/ppa
apt update
apt install -y python3.8 3.8-distutils curl
curl -s https://bootstrap.pypa.io/get-pip.py | python3.8
curl -fsSL https://get.docker.com | sh
usermod -a -G docker vagrant
curl -s -L https://github.com/bats-core/bats-core/archive/master.tar.gz | tar -xzf -
bash bats-core-master/install.sh /usr/local
rm -rf ./bats-core-master
docker swarm init
SHELL
# No FS share to allow any depds to the host
config.vm.synced_folder "../../", "/check_docker", disabled: false, mount_options: ["ro"]
end

View File

@ -0,0 +1,29 @@
good_container() {
docker run -d --name good_sleep busybox sleep 1d
}
bad_container() {
docker run -d --name bad_sleep busybox false
}
current_container() {
docker pull busybox:latest
docker run -d --name current_container busybox:latest sleep 1d
}
old_container() {
docker pull busybox:1.28.1
docker tag busybox:1.28.1 busybox:latest
docker rmi busybox:1.28.1
docker run -d --name old_container busybox:latest sleep 1d
}
crashing_container() {
docker run -d --name crashes --restart always busybox false
}
get_check_docker_version() {
pip3 show check_docker 2>/dev/null | sed -n '/^Version: /s/^Version: //p'
}

View File

@ -0,0 +1,279 @@
if ! id vagrant
then
echo "This is only intended to be run inside a vagrant box!" >&2
echo "Running it outside may result in data loss" >&2
fi
NEWEST_SDIST="$(ls -t /check_docker/dist/check_docker-*.tar.gz | head -1)"
NEWEST_WHEEL="$(ls -t /check_docker/dist/check_docker-*.whl | head -1)"
teardown()
{
docker ps -aq
COUNT=$(docker ps -aq | wc -l)
if [ $COUNT -ne 0 ]
then
docker stop -t 0 $(docker ps -aq)
docker rm -f $(docker ps -aq)
fi
STACKS=$(docker stack ls)
if grep -q TEST_STACK <<<"$STACKS"
then
docker stack rm TEST_STACK
TEST_CONTAINERS_COUNT=$(docker ps | grep TEST_STACK | wc -l)
while [ $TEST_CONTAINERS_COUNT -ne 0 ]
do
sleep 1
TEST_CONTAINERS_COUNT=$(docker ps | grep TEST_STACK | wc -l)
done
TEST_NETWORK_COUNT=$(docker network ls | grep TEST_STACK | wc -l)
while [ $TEST_NETWORK_COUNT -ne 0 ]
do
sleep 1
TEST_NETWORK_COUNT=$(docker network ls | grep TEST_STACK | wc -l)
done
fi
}
load bats_fixtures
@test "Confirm check_docker is not in path" {
# Before we start make sure check_docker is not present
sudo -H pip3.8 uninstall -y check-docker || true
run which check_docker
[ "$status" -eq 1 ]
}
@test "Confirm 'check-docker' is not installed" {
# Before we start make sure check_docker is not present
pip3.8 list 2>&1 | grep -ve check-docker
}
@test "Confirm source package, $NEWEST_SDIST, is installable" {
echo pip3.8 install "$NEWEST_SDIST"
run sudo -H pip3.8 install "$NEWEST_SDIST"
[ "$status" -eq 0 ]
}
@test "Re-Confirm 'check-docker' is not installed" {
# This should never error since the previous step ensures package is already present
sudo -H pip3.8 uninstall -y check-docker
# Before we start make sure check_docker is not present
pip3.8 list 2>&1 | grep -ve check-docker
}
@test "Confirm wheel package, $NEWEST_WHEEL, is installable" {
run sudo -H pip3.8 install "$NEWEST_WHEEL"
[ "$status" -eq 0 ]
}
@test "Confirm check_docker appears in path" {
run which check_docker
[ "$status" -eq 0 ]
}
@test "Confirm package is installed" {
pip3.8 list | grep 'check-docker'
}
# It is normal for this to fail when preparing for a PR.
@test "Confirm package version is not already in PyPi" {
VERSION=$(get_check_docker_version)
REMOTE_HTTP_STATUS=$(curl -LI https://pypi.org/project/check_docker/${VERSION}/ -w "%{http_code}" -o /dev/null -s)
[ "$REMOTE_HTTP_STATUS" == 404 ]
}
@test "Confirm check_docker version matches package" {
PACKAGE_VERSION=$(get_check_docker_version)
CHECK_VERSION=$(python3.8 -c 'from check_docker import check_docker; print(check_docker.__version__)')
[ "$PACKAGE_VERSION" == "$CHECK_VERSION" ]
}
@test "Confirm check_swarm version matches package" {
PACKAGE_VERSION=$(get_check_docker_version)
CHECK_VERSION=$(python3.8 -c 'from check_docker import check_swarm; print(check_swarm.__version__)')
[ "$PACKAGE_VERSION" == "$CHECK_VERSION" ]
}
@test "Good status" {
good_container
sleep 1
run check_docker --container good_sleep --status running
echo "$status"
echo $output
[ "$status" -eq 0 ]
}
@test "Bad status" {
bad_container
run check_docker --container bad_sleep --status running
echo "$status"
echo $output
[ "$status" -eq 2 ]
}
@test "Current version" {
docker pull busybox
current_container
run check_docker --container current_container --version
echo "$status"
echo $output
[ "$status" -eq 0 ]
}
@test "Old version" {
old_container
run check_docker --container old_container --version
echo "$status"
echo $output
[ "$status" -eq 2 ]
}
@test "Doesn't crash" {
good_container
sleep 5
run check_docker --container good_sleep --restarts 1:2
echo "$status"
echo $output
[ "$status" -eq 0 ]
}
@test "Does crash" {
crashing_container
sleep 5
run check_docker --container crashes --restarts 1:2
echo "$status"
echo $output
[ "$status" -eq 2 ]
}
@test "Checks multiple containers" {
good_container
current_container
run check_docker --container good_sleep current_container --status running
echo "$status"
echo $output
[ "$status" -eq 0 ]
}
@test "Checks multiple containers regex" {
good_container
current_container
run check_docker --container '.*' --status running
echo "$status"
echo $output
[ "$status" -eq 0 ]
}
@test "Checks get all containers" {
good_container
current_container
run check_docker --container '.*' --status running
echo "$status"
echo $output
[ "$status" -eq 0 ]
CONTIANERS_IN_CHECK=$(echo $output | tr ';' '\n' | wc -l)
[ "$CONTIANERS_IN_CHECK" -eq 2 ]
}
SITE_PACKAGES_DIR=/$(pip3.8 show check_docker | grep '^Location' | cut -d ' ' -f 2)/check_docker
@test "Can check_docker be run when called directly" {
run python3.8 $SITE_PACKAGES_DIR/check_docker.py --help
[ "$status" -eq 0 ]
}
@test "Can check_swarm be run when called directly" {
run python3.8 $SITE_PACKAGES_DIR/check_swarm.py --help
[ "$status" -eq 0 ]
}
@test "Confirm replicated service failures are noticed" {
cat <<END | docker stack deploy -c - TEST_STACK
version: "3"
services:
test:
image: busybox
command: "false"
deploy:
mode: replicated
replicas: 2
END
sleep 1
run check_swarm --service TEST_STACK
[ "$status" -eq 2 ]
}
@test "Confirm global service failures are noticed" {
cat <<END | docker stack deploy -c - TEST_STACK
version: "3"
services:
test:
image: busybox
command: "false"
deploy:
mode: global
END
sleep 1
run check_swarm --service TEST_STACK
[ "$status" -eq 2 ]
}
@test "Confirm global service succeed" {
cat <<END | docker stack deploy -c - TEST_STACK
version: "3"
services:
test:
image: busybox
command: sleep 100
deploy:
mode: replicated
replicas: 2
END
sleep 5
run check_swarm --service TEST_STACK_test
echo $OUTPUT
[ "$status" -eq 0 ]
}
@test "Confirm replicated service succeed" {
echo BEFORE
docker ps
docker network ls
cat <<END | docker stack deploy -c - TEST_STACK
version: "3"
services:
test:
image: busybox
command: sleep 100
deploy:
mode: replicated
replicas: 2
END
sleep 5
echo AFTER
docker ps -a
docker network ls
docker service ls
run check_swarm --service TEST_STACK_test
echo $output
[ "$status" -eq 0 ]
}

View File

View File

@ -0,0 +1,956 @@
import json
import stat
from collections import defaultdict
from datetime import datetime, timezone, timedelta
try:
from importlib import reload
except ImportError:
from imp import reload
from io import BytesIO
from unittest.mock import patch
from urllib.error import HTTPError, URLError
import pytest
from check_docker import check_docker as cd
__author__ = 'Tim Laurence'
class FakeHttpResponse(BytesIO):
def __init__(self, content=b'', http_code=200, headers=None, method='GET'):
self.status = http_code
self.code = http_code
self.headers = headers if headers else {}
self.method = method
super(FakeHttpResponse, self).__init__(content)
def getheader(self, header, default):
return self.headers.get(header, default)
@pytest.fixture()
def check_docker_fresh():
"""
This is used for tests that have issues with cross test interaction
:return:
"""
reload(cd)
return cd
@pytest.fixture()
def check_docker():
cd.rc = -1
check_docker.no_ok = False
check_docker.no_performance = False
cd.timeout = 1
cd.messages = []
cd.performance_data = []
cd.daemon = 'socket:///notreal'
cd.get_url.cache_clear()
cd.DISABLE_THREADING = True
cd.Oauth2TokenAuthHandler.auth_failure_tracker = defaultdict(int)
def fake_exit(_=None):
pass
cd.exit = fake_exit
return cd
@pytest.fixture
def check_docker_with_units(check_docker):
check_docker.unit_adjustments = {key: 1024 ** value for key, value in
check_docker.UNIT_ADJUSTMENTS_TEMPLATE.items()}
return check_docker
def test_get_url(check_docker):
obj = {'foo': 'bar'}
encoded = json.dumps(obj=obj).encode('utf-8')
expected_response = FakeHttpResponse(content=encoded, http_code=200)
def mock_open(*args, **kwargs):
return expected_response
with patch('check_docker.check_docker.better_urllib_get.open', side_effect=mock_open):
response, _ = check_docker.get_url(url='/test')
assert response == obj
def test_get_url_with_oauth2(check_docker):
headers1 = {
'www-authenticate': 'Bearer realm="https://docker-auth.example.com/auth",service="token-service",scope="repository:something/something_else:pull"'}
mock_response1 = FakeHttpResponse(method='GET', content=b'', http_code=401,
headers=headers1)
mock_response2 = FakeHttpResponse(method='GET', content=b'{"test_key":"test_value"}', http_code=200,
headers={'test': 'test'})
with patch('check_docker.check_docker.HTTPSHandler.https_open', side_effect=[mock_response1, mock_response2]), \
patch('check_docker.check_docker.Oauth2TokenAuthHandler._get_outh2_token',
return_value='test_token') as get_token:
response = check_docker.get_url(url='https://example.com/test')
assert response == ({"test_key": "test_value"}, 200)
assert get_token.call_count == 1
def test_get_url_with_oauth2_loop(check_docker):
headers = {
'www-authenticate': 'Bearer realm="https://docker-auth.example.com/auth",service="token-service",scope="repository:something/something_else:pull"'}
mock_response = FakeHttpResponse(method='GET', http_code=401, headers=headers)
def mock_open(*args, **kwargs):
return mock_response
with patch('check_docker.check_docker.HTTPSHandler.https_open', side_effect=mock_open), \
patch('check_docker.check_docker.Oauth2TokenAuthHandler._get_outh2_token',
return_value='test_token') as get_token:
with pytest.raises(HTTPError):
check_docker.get_url(url='https://example.com/test')
def test_get_url_500(check_docker):
expected_exception = HTTPError(code=500, fp=None, url='url', msg='msg', hdrs=[])
with patch('check_docker.check_docker.HTTPSHandler.https_open', side_effect=expected_exception), \
pytest.raises(HTTPError):
check_docker.get_url(url='https://example.com/test')
@pytest.mark.parametrize("func", [
'get_stats',
'get_state',
'get_image_info'
])
def test_get_url_calls(check_docker, func):
# TODO
with patch('check_docker.check_docker.get_url', return_value=({'State': 'State'}, 200)) as patched:
getattr(check_docker, func)('container')
assert patched.call_count == 1
@pytest.mark.parametrize("value, expected", [
(1, ["1s"]),
(61, ["1min", "1s"]),
(3661, ["1h", "1min", "1s"]),
(86401, ["1d", "1s"])
])
def test_pretty_time(check_docker, value, expected):
assert check_docker.pretty_time(value) == expected
@pytest.mark.parametrize("value, rc, messages, perf_data", [
(1, cd.OK_RC, ['OK: container metric is 1B'], ['container_met=1B;2;3;0;10']),
(2, cd.WARNING_RC, ['WARNING: container metric is 2B'], ['container_met=2B;2;3;0;10']),
(3, cd.CRITICAL_RC, ['CRITICAL: container metric is 3B'], ['container_met=3B;2;3;0;10'])
])
def test_evaluate_numeric_thresholds(check_docker, value, rc, messages, perf_data):
thresholds = cd.ThresholdSpec(warn=2, crit=3, units='B')
check_docker.evaluate_numeric_thresholds(container='container', value=value, name='metric', short_name='met',
min=0, max=10, thresholds=thresholds
)
assert check_docker.rc == rc
assert check_docker.messages == messages
assert check_docker.performance_data == perf_data
@pytest.mark.parametrize('func,arg,rc,messages',
(
('ok', "OK test", cd.OK_RC, ['OK: OK test']),
('warning', "WARN test", cd.WARNING_RC, ['WARNING: WARN test']),
('critical', "CRIT test", cd.CRITICAL_RC, ['CRITICAL: CRIT test']),
('unknown', "UNKNOWN test", cd.UNKNOWN_RC, ['UNKNOWN: UNKNOWN test']),
))
def test_status_update(check_docker, func, arg, rc, messages):
getattr(check_docker, func)(arg)
assert check_docker.rc == rc
assert check_docker.messages == messages
@pytest.mark.parametrize('input, units_required, expected', (
('1:2:3', True, cd.ThresholdSpec(warn=1, crit=2, units='3')),
('1:2', False, cd.ThresholdSpec(warn=1, crit=2, units='')),
('1:2:3', False, cd.ThresholdSpec(warn=1, crit=2, units='3')),
))
def test_parse_thresholds(check_docker, input, units_required, expected):
result = check_docker.parse_thresholds(input, units_required=units_required)
assert expected == result
@pytest.mark.parametrize('spec, kwargs, exception', (
('1:2', {}, ValueError),
('1:2:b', {'include_units': False}, ValueError),
('1:2', {'include_units': True}, ValueError),
("1", {}, IndexError),
(":1", {}, ValueError),
(":1:c", {}, ValueError),
("1:", {}, ValueError),
("1::c", {}, ValueError),
('1:2:', {'units_required': True}, ValueError),
("a:1:c", {}, ValueError),
("1:b:c", {}, ValueError),
)
)
def test_parse_thresholds_exceptions(check_docker, spec, kwargs, exception):
with pytest.raises(exception):
check_docker.parse_thresholds(spec, **kwargs)
def test_set_rc(check_docker):
# Can I do a basic set
check_docker.set_rc(check_docker.OK_RC)
assert check_docker.rc == check_docker.OK_RC
# Does it prevent downgrades of rc
check_docker.set_rc(check_docker.WARNING_RC)
assert check_docker.rc == check_docker.WARNING_RC
check_docker.set_rc(check_docker.OK_RC)
assert check_docker.rc == check_docker.WARNING_RC
@pytest.mark.parametrize('response, expected_status', (
({'State': {'Running': True, "Restarting": False, "Paused": False, "Dead": False}}, cd.OK_RC),
({'State': {'Running': True, "Restarting": True, "Paused": False, "Dead": False}}, cd.CRITICAL_RC),
({'State': {'Status': 'stopped'}}, cd.CRITICAL_RC),
({'State': {'Running': False, "Restarting": False, "Paused": False, "Dead": False}}, cd.CRITICAL_RC),
))
def test_check_status(check_docker, response, expected_status):
def mock_response(*args, **kwargs):
encoded = json.dumps(obj=response).encode('utf-8')
return FakeHttpResponse(encoded, 200)
with patch('check_docker.check_docker.better_urllib_get.open', side_effect=mock_response):
check_docker.check_status(container='container', desired_state='running')
assert check_docker.rc == expected_status
@pytest.mark.parametrize('response, expected_status', (
({'State': {'Health': {'Status': 'healthy'}, 'Running': True, "Restarting": False, "Paused": False,
"Dead": False}}, cd.OK_RC),
({'State': {'Health': {'Status': 'unhealthy'}, 'Running': True, "Restarting": False, "Paused": False,
"Dead": False}}, cd.CRITICAL_RC),
({'State': {'Running': True, "Restarting": False, "Paused": False, "Dead": False}}, cd.UNKNOWN_RC),
(
{'State': {'Health': {}, 'Running': True, "Restarting": False, "Paused": False, "Dead": False}},
cd.UNKNOWN_RC),
({'State': {'Health': {'Status': 'starting'}, 'Running': True, "Restarting": False, "Paused": False,
"Dead": False}}, cd.UNKNOWN_RC)
))
def test_check_health(check_docker, response, expected_status):
def mock_response(*args, **kwargs):
encoded = json.dumps(obj=response).encode('utf-8')
return FakeHttpResponse(encoded, 200)
with patch('check_docker.check_docker.better_urllib_get.open', side_effect=mock_response):
check_docker.check_health(container='container')
assert check_docker.rc == expected_status
@pytest.mark.parametrize('memory_stats, warn, crit, units, expected_status', (
({'limit': 10, 'usage': 1, 'stats': {'total_cache': 1}}, 1, 2, 'B', cd.OK_RC),
({'limit': 10, 'usage': 2, 'stats': {'total_cache': 1}}, 1, 2, 'B', cd.WARNING_RC),
({'limit': 10, 'usage': 3, 'stats': {'total_cache': 1}}, 1, 2, 'B', cd.CRITICAL_RC),
({'limit': 10, 'usage': 1, 'stats': {'total_cache': 1}}, 20, 30, '%', cd.OK_RC),
({'limit': 10, 'usage': 3, 'stats': {'total_cache': 1}}, 20, 30, '%', cd.WARNING_RC),
({'limit': 10, 'usage': 4, 'stats': {'total_cache': 1}}, 20, 30, '%', cd.CRITICAL_RC),
({'limit': 10, 'usage': 4, 'stats': {'total_cache': 1}}, 20, 30, 'BAD_UNITS', cd.UNKNOWN_RC),
))
def test_check_memory(check_docker_with_units, memory_stats, warn, crit, units, expected_status):
response = {
'memory_stats': memory_stats,
'State': {'Running': True, "Restarting": False, "Paused": False, "Dead": False}
}
def mock_response(*args, **kwargs):
encoded = json.dumps(obj=response).encode('utf-8')
return FakeHttpResponse(encoded, 200)
with patch('check_docker.check_docker.better_urllib_get.open', side_effect=mock_response):
thresholds = cd.ThresholdSpec(warn=warn, crit=crit, units=units)
check_docker_with_units.check_memory(container='container', thresholds=thresholds)
assert check_docker_with_units.rc == expected_status
cpu_param_fields = 'host_config, cpu_stats, precpu_stats, warn, crit, expected_status, expected_percent'
cpu_parm_tests = (({"NanoCpus": 1000000000, "CpuPeriod": 0, "CpuQuota": 0},
{'cpu_usage': {'percpu_usage': [15], 'total_usage': 15}, 'online_cpus': 1, 'system_cpu_usage': 100},
{'cpu_usage': {'percpu_usage': [10], 'total_usage': 10}, 'online_cpus': 1, 'system_cpu_usage': 0},
10, 20, cd.OK_RC, 5),
({"NanoCpus": 1000000000, "CpuPeriod": 0, "CpuQuota": 0},
{'cpu_usage': {'percpu_usage': [25], 'total_usage': 25}, 'online_cpus': 1, 'system_cpu_usage': 100},
{'cpu_usage': {'percpu_usage': [10], 'total_usage': 10}, 'online_cpus': 1, 'system_cpu_usage': 0},
10, 20, cd.WARNING_RC, 15),
({"NanoCpus": 1000000000, "CpuPeriod": 0, "CpuQuota": 0},
{'cpu_usage': {'percpu_usage': [35], 'total_usage': 35}, 'online_cpus': 1, 'system_cpu_usage': 100},
{'cpu_usage': {'percpu_usage': [10], 'total_usage': 10}, 'online_cpus': 1, 'system_cpu_usage': 0},
10, 20, cd.CRITICAL_RC, 25),
({"NanoCpus": 0, "CpuPeriod": 0, "CpuQuota": 10000},
{'cpu_usage': {'percpu_usage': [15], 'total_usage': 15}, 'online_cpus': 1, 'system_cpu_usage': 100},
{'cpu_usage': {'percpu_usage': [10], 'total_usage': 10}, 'online_cpus': 1, 'system_cpu_usage': 0},
10, 20, cd.CRITICAL_RC, 50),
({"NanoCpus": 0, "CpuPeriod": 0, "CpuQuota": 0},
{'cpu_usage': {'percpu_usage': [35], 'total_usage': 35}, 'online_cpus': 1, 'system_cpu_usage': 100},
{'cpu_usage': {'percpu_usage': [10], 'total_usage': 10}, 'online_cpus': 1, 'system_cpu_usage': 0},
10, 20, cd.CRITICAL_RC, 25),
({"NanoCpus": 0, "CpuPeriod": 1, "CpuQuota": 2},
{'cpu_usage': {'percpu_usage': [35], 'total_usage': 35}, 'online_cpus': 1, 'system_cpu_usage': 100},
{'cpu_usage': {'percpu_usage': [10], 'total_usage': 10}, 'system_cpu_usage': 0},
10, 20, cd.CRITICAL_RC, 25),
({"NanoCpus": 0, "CpuPeriod": 0, "CpuQuota": 0},
{'cpu_usage': {'total_usage': 36}, 'online_cpus': 2, 'system_cpu_usage': 200},
{'cpu_usage': {'total_usage': 10}, 'system_cpu_usage': 0},
10, 20, cd.WARNING_RC, 13),
({"NanoCpus": 0, "CpuPeriod": 0, "CpuQuota": 0},
{'cpu_usage': {'percpu_usage': [35, 1], 'total_usage': 36}, 'system_cpu_usage': 200},
{'cpu_usage': {'total_usage': 10}, 'system_cpu_usage': 0},
10, 20, cd.WARNING_RC, 13
)
)
@pytest.mark.parametrize(cpu_param_fields, cpu_parm_tests)
def test_check_cpu(check_docker, host_config, cpu_stats, precpu_stats, warn, crit, expected_status, expected_percent):
container_stats = {
'cpu_stats': cpu_stats,
'precpu_stats': precpu_stats
}
container_info = {
'State': {'Running': True, "Restarting": False, "Paused": False, "Dead": False},
"HostConfig": host_config
}
def mock_stats_response(*args, **kwargs):
return container_stats
def mock_info_response(*args, **kwargs):
return container_info
with patch('check_docker.check_docker.get_stats', side_effect=mock_stats_response), \
patch('check_docker.check_docker.get_container_info', side_effect=mock_info_response):
thresholds = cd.ThresholdSpec(warn=warn, crit=crit, units=None)
check_docker.check_cpu(container='container', thresholds=thresholds)
assert check_docker.rc == expected_status
@pytest.mark.parametrize(cpu_param_fields, cpu_parm_tests)
def test_calculate_cpu(check_docker, host_config, cpu_stats, precpu_stats, warn, crit, expected_status,
expected_percent):
container_stats = {
'cpu_stats': cpu_stats,
'precpu_stats': precpu_stats
}
container_info = {
'State': {'Running': True, "Restarting": False, "Paused": False, "Dead": False},
"HostConfig": host_config
}
pecentage = check_docker.calculate_cpu_capacity_precentage(info=container_info, stats=container_stats)
assert pecentage == expected_percent
def test_require_running(check_docker):
""" This confirms the 'require_running decorator is working properly with a stopped container"""
container_info = {'RestartCount': 0, 'State': {'Running': False, "Restarting": True}}
def mock_info_response(*args, **kwargs):
return container_info
with patch('check_docker.check_docker.get_container_info', side_effect=mock_info_response):
thresholds = cd.ThresholdSpec(warn=1, crit=2, units='')
check_docker.check_restarts(container='container', thresholds=thresholds)
assert check_docker.rc == check_docker.CRITICAL_RC
@pytest.mark.parametrize("restarts, expected_status", (
(0, cd.OK_RC),
(1, cd.WARNING_RC),
(3, cd.CRITICAL_RC),
))
def test_restarts(check_docker, restarts, expected_status):
container_info = {'RestartCount': restarts,
'State': {'Running': True, "Restarting": False, "Paused": False, "Dead": False}}
def mock_info_response(*args, **kwargs):
return container_info
with patch('check_docker.check_docker.get_container_info', side_effect=mock_info_response):
thresholds = cd.ThresholdSpec(warn=1, crit=2, units='')
check_docker.check_restarts(container='container', thresholds=thresholds)
assert check_docker.rc == expected_status
@pytest.mark.parametrize("uptime, warn, crit, expected_status", (
(timedelta(seconds=0), 10, 5, cd.CRITICAL_RC),
(timedelta(seconds=9), 10, 1, cd.WARNING_RC),
(timedelta(seconds=10), 2, 1, cd.OK_RC),
(timedelta(days=1, seconds=0), 2, 1, cd.OK_RC)
))
def test_check_uptime1(check_docker, uptime, warn, crit, expected_status):
time = datetime.now(tz=timezone.utc) - uptime
time_str = time.strftime("%Y-%m-%dT%H:%M:%S.0000000000Z")
json_results = {
'State': {'StartedAt': time_str,
'Running': True,
"Restarting": False,
"Paused": False,
"Dead": False},
}
def mock_response(*args, **kwargs):
encoded = json.dumps(obj=json_results).encode('utf-8')
return FakeHttpResponse(encoded, 200)
with patch('check_docker.check_docker.better_urllib_get.open', side_effect=mock_response):
thresholds = cd.ThresholdSpec(warn=warn, crit=crit, units='')
check_docker.check_uptime(container='container', thresholds=thresholds)
assert check_docker.rc == expected_status
@pytest.mark.parametrize("image_age, warn, crit, expected_status", (
(timedelta(days=20), 10, 20, cd.CRITICAL_RC),
(timedelta(days=15), 10, 20, cd.WARNING_RC),
(timedelta(days=5), 10, 20, cd.OK_RC),
))
def test_check_image_age(check_docker, image_age, warn, crit, expected_status):
time = datetime.now(tz=timezone.utc) - image_age
time_str = time.strftime("%Y-%m-%dT%H:%M:%S.0000000000Z")
container_response = {'Image': 'test'}
image_response = {'Created': time_str}
def mock_response(*args, **kwargs):
encoded = json.dumps(obj=image_response).encode('utf-8')
return FakeHttpResponse(encoded, 200)
with patch('check_docker.check_docker.get_container_info', return_value=container_response), \
patch('check_docker.check_docker.get_image_info', return_value=image_response):
thresholds = cd.ThresholdSpec(warn=warn, crit=crit, units='')
check_docker.check_image_age(container='container', thresholds=thresholds)
assert check_docker.rc == expected_status
sample_containers = [
{'Names': ['/name1']},
{'Names': ['/name2']}]
@pytest.fixture
def sample_containers_json():
return sample_containers
@pytest.fixture
def mock_get_container_info():
def mock(id):
return {'Name': sample_containers[id]}
return mock
def test_args_help(check_docker, capsys):
args = tuple()
check_docker.process_args(args=args)
out, err = capsys.readouterr()
assert 'usage: ' in out
@pytest.mark.parametrize("args, expected_value, default_value", (
(('--timeout', '9999'), 9999, cd.DEFAULT_TIMEOUT),
(('--containers', 'foo', 'bar'), ['foo', 'bar'], ['all']),
(('--present',), True, False),
(('--threads', '23'), 23, cd.DISABLE_THREADING),
(('--cpu', 'non-default'), 'non-default', None),
(('--memory', 'non-default'), 'non-default', None),
(('--status', 'non-default'), 'non-default', None),
(('--health',), True, None),
(('--uptime', 'non-default'), 'non-default', None),
(('--version',), True, None),
(('--insecure-registries', 'non-default'), ['non-default'], None),
(('--restarts', 'non-default'), 'non-default', None),
(('--no-ok',), True, False),
(('--no-performance',), True, False),
))
def test_args(check_docker, args, expected_value, default_value):
attrib_name = args[0][2:].replace('-', '_') # Strip the -- off the first arg
if default_value:
default_result = check_docker.process_args(args=[])
assert getattr(default_result, attrib_name) == default_value
result = check_docker.process_args(args=args)
assert getattr(result, attrib_name) == expected_value
def test_args_containers_blank(check_docker):
args = ('--containers',)
with pytest.raises(SystemExit):
check_docker.process_args(args=args)
def test_args_connection(check_docker):
args = ('--connection', '/foo')
result = check_docker.process_args(args=args)
assert result.connection == '/foo'
assert check_docker.daemon == 'socket:///foo:'
args = ('--connection', 'foo.com/bar')
result = check_docker.process_args(args=args)
assert result.connection == 'foo.com/bar'
assert check_docker.daemon == 'http://foo.com/bar'
def test_args_secure_connection(check_docker):
check_docker.rc = -1
args = ('--secure-connection', 'non-default')
result = check_docker.process_args(args=args)
assert result.secure_connection == 'non-default'
args = ('--secure-connection', 'foo.com/bar')
result = check_docker.process_args(args=args)
assert result.secure_connection == 'foo.com/bar'
assert check_docker.daemon == 'https://foo.com/bar'
@pytest.mark.parametrize('args', (
('--connection', 'non-default', '--secure-connection', 'non-default'),
('--binary_units', '--decimal_units')
))
def test_exclusive_args(check_docker, args):
with pytest.raises(SystemExit):
check_docker.process_args(args)
def test_units_base_uninitialized(check_docker_fresh):
# Assert value is driven by argprase results, i.e. there is no default value
assert check_docker_fresh.unit_adjustments is None, "unit_adjustments has no sensible default without knowing the base"
def test_units_base_initialized(check_docker_fresh):
# Confirm default value is set
parsed_args = check_docker_fresh.process_args([])
assert parsed_args.units_base == 1024, "units_base should default to 1024"
@pytest.mark.parametrize('arg, one_kb', (
('--binary_units', 1024),
('--decimal_units', 1000)
))
def test_units_base(check_docker, fs, arg, one_kb):
# Confirm value is updated by argparse flags
parsed_args = check_docker.process_args([arg])
assert parsed_args.units_base == one_kb, "units_base should be influenced by units flags"
fs.create_file(check_docker.DEFAULT_SOCKET, contents='', st_mode=(stat.S_IFSOCK | 0o666))
with patch('check_docker.check_docker.get_containers', return_value=['test']), \
patch('check_docker.check_docker.get_stats',
return_value={'memory_stats': {'limit': one_kb, 'usage': one_kb, 'stats': {'total_cache': 0}}}), \
patch('check_docker.check_docker.get_state',
return_value={'Running': True, "Restarting": False, "Paused": False, "Dead": False}):
check_docker.perform_checks(['--memory', '0:0:KB', arg])
# Confirm unit adjustment table was updated by argument
assert check_docker.unit_adjustments['KB'] == one_kb
# Confirm output shows unit conversion specified by arg
assert check_docker.performance_data == ['test_mem=1.0KB;0;0;0;1.0']
def test_missing_check(check_docker):
check_docker.rc = -1
args = tuple()
result = check_docker.process_args(args=args)
assert check_docker.no_checks_present(result)
def test_present_check(check_docker):
check_docker.rc = -1
args = ('--status', 'running')
result = check_docker.process_args(args=args)
assert not check_docker.no_checks_present(result)
def test_disallow_present_without_containers(check_docker):
args = ('--cpu', '0:0', '--present')
with patch('check_docker.check_docker.get_containers') as patched_get_containers:
with patch('check_docker.check_docker.unknown') as patched_unknown:
check_docker.perform_checks(args)
assert patched_unknown.call_count == 1
assert patched_get_containers.call_count == 0
def test_get_containers_1(check_docker, sample_containers_json, mock_get_container_info):
with patch('check_docker.check_docker.get_url', return_value=(sample_containers_json, 200)), \
patch('check_docker.check_docker.get_container_info', side_effect=mock_get_container_info):
container_list = check_docker.get_containers('all', False)
assert container_list == {'name1', 'name2'}
def test_get_containers_2(check_docker, sample_containers_json, mock_get_container_info):
with patch('check_docker.check_docker.get_url', return_value=(sample_containers_json, 200)):
with patch('check_docker.check_docker.get_container_info', side_effect=mock_get_container_info):
container_list = check_docker.get_containers(['name.*'], False)
assert container_list == {'name1', 'name2'}
def test_get_containers_3(check_docker, sample_containers_json, mock_get_container_info):
check_docker.rc = -1
with patch('check_docker.check_docker.get_url', return_value=(sample_containers_json, 200)), \
patch('check_docker.check_docker.unknown') as patched, \
patch('check_docker.check_docker.get_container_info', side_effect=mock_get_container_info):
container_list = check_docker.get_containers({'foo'}, False)
assert container_list == set()
assert patched.call_count == 0
def test_get_containers_4(check_docker, sample_containers_json, mock_get_container_info):
check_docker.rc = -1
with patch('check_docker.check_docker.get_url', return_value=(sample_containers_json, 200)):
with patch('check_docker.check_docker.critical') as patched, \
patch('check_docker.check_docker.get_container_info', side_effect=mock_get_container_info):
container_list = check_docker.get_containers({'foo'}, True)
assert container_list == set()
assert patched.call_count == 1
def test_socketfile_failure_false(check_docker, fs):
fs.create_file('/tmp/socket', contents='', st_mode=(stat.S_IFSOCK | 0o666))
args = ('--status', 'running', '--connection', '/tmp/socket')
result = check_docker.process_args(args=args)
assert not check_docker.socketfile_permissions_failure(parsed_args=result)
def test_socketfile_failure_result(check_docker):
# Confirm bad socket results in uknown status
args = ('--cpu', '0:0', '--connection', '/tmp/missing')
with patch('check_docker.check_docker.get_url', return_value=(['thing1'], 200)):
with patch('check_docker.check_docker.unknown') as patched:
check_docker.perform_checks(args)
assert patched.call_count == 1
def test_socketfile_failure_filetype(check_docker, fs):
fs.create_file('/tmp/not_socket', contents='testing')
args = ('--status', 'running', '--connection', '/tmp/not_socket')
result = check_docker.process_args(args=args)
assert check_docker.socketfile_permissions_failure(parsed_args=result)
def test_socketfile_failure_missing(check_docker, fs):
args = ('--status', 'running', '--connection', '/tmp/missing')
result = check_docker.process_args(args=args)
assert check_docker.socketfile_permissions_failure(parsed_args=result)
def test_socketfile_failure_unwriteable(check_docker, fs):
fs.create_file('/tmp/unwritable', contents='', st_mode=(stat.S_IFSOCK | 0o000))
args = ('--status', 'running', '--connection', '/tmp/unwritable')
result = check_docker.process_args(args=args)
assert check_docker.socketfile_permissions_failure(parsed_args=result)
def test_socketfile_failure_unreadable(check_docker, fs):
fs.create_file('/tmp/unreadable', contents='', st_mode=(stat.S_IFSOCK | 0o000))
args = ('--status', 'running', '--connection', '/tmp/unreadable')
result = check_docker.process_args(args=args)
assert check_docker.socketfile_permissions_failure(parsed_args=result)
def test_socketfile_failure_http(check_docker, fs):
fs.create_file('/tmp/http', contents='', st_mode=(stat.S_IFSOCK | 0o000))
args = ('--status', 'running', '--connection', 'http://127.0.0.1')
result = check_docker.process_args(args=args)
assert not check_docker.socketfile_permissions_failure(parsed_args=result)
def test_perform_with_no_containers(check_docker, fs):
fs.create_file(check_docker.DEFAULT_SOCKET, contents='', st_mode=(stat.S_IFSOCK | 0o666))
args = ['--cpu', '0:0']
with patch('check_docker.check_docker.get_url', return_value=([], 200)):
with patch('check_docker.check_docker.unknown') as patched:
check_docker.perform_checks(args)
assert patched.call_count == 1
def test_perform_with_uncaught_exception(check_docker, fs):
fs.create_file(check_docker.DEFAULT_SOCKET, contents='', st_mode=(stat.S_IFSOCK | 0o666))
with patch('check_docker.check_docker.get_url', return_value=([{'Names': ('/thing1',)}], 200)), \
patch('check_docker.check_docker.check_cpu', side_effect=Exception("Oh no!")), \
patch('check_docker.check_docker.argv', side_effect=['', '--cpu', '0:0']), \
patch('check_docker.check_docker.unknown') as patched:
check_docker.main()
assert patched.call_count == 1
@pytest.mark.parametrize("args, called", (
(['--cpu', '0:0'], 'check_cpu'),
(['--memory', '0:0'], 'check_memory'),
(['--health'], 'check_health'),
(['--restarts', '1:1'], 'check_restarts'),
(['--status', 'running'], 'check_status'),
(['--uptime', '0:0'], 'check_uptime'),
(['--version'], 'check_version'),
([], 'unknown')
))
def test_perform(check_docker, fs, args, called):
fs.create_file(check_docker.DEFAULT_SOCKET, contents='', st_mode=(stat.S_IFSOCK | 0o666))
with patch('check_docker.check_docker.get_containers', return_value=['thing1']):
with patch('check_docker.check_docker.' + called) as patched:
check_docker.perform_checks(args)
assert patched.call_count == 1
@pytest.mark.parametrize("messages, perf_data, expected", (
(['TEST'], [], 'TEST'),
(['FOO', 'BAR'], [], 'FOO; BAR'),
(['FOO', 'BAR'], ['1;2;3;4;'], 'FOO; BAR|1;2;3;4;')
))
def test_print_results(check_docker, capsys, messages, perf_data, expected):
# These sometimes get set to true when using random-order plugin, for example --random-order-seed=620808
check_docker.no_ok = False
check_docker.no_performance = False
check_docker.messages = messages
check_docker.performance_data = perf_data
check_docker.print_results()
out, err = capsys.readouterr()
assert out.strip() == expected
@pytest.mark.parametrize("messages, perf_data, no_ok, no_performance, expected", (
([], [], False, False, ''),
(['TEST'], [], False, False, 'TEST'),
(['FOO', 'BAR'], [], False, False, 'FOO; BAR'),
(['FOO', 'BAR'], ['1;2;3;4;'], False, False, 'FOO; BAR|1;2;3;4;'),
([], [], True, False, 'OK'),
(['OK: TEST'], [], True, False, 'OK'),
(['OK: FOO', 'OK: BAR'], [], True, False, 'OK'),
(['OK: FOO', 'BAR'], ['1;2;3;4;'], True, False, 'BAR|1;2;3;4;'),
([], [], False, True, ''),
(['OK: TEST'], [], False, True, 'OK: TEST'),
(['OK: TEST'], ['1;2;3;4;'], False, True, 'OK: TEST'),
(['OK: FOO', 'OK: BAR'], ['1;2;3;4;'], True, True, 'OK'),
))
def test_print_results_no_ok(check_docker, capsys, messages, perf_data, no_ok, no_performance, expected):
check_docker.messages = messages
check_docker.performance_data = perf_data
check_docker.no_ok = no_ok
check_docker.no_performance = no_performance
check_docker.print_results()
out, err = capsys.readouterr()
assert out.strip() == expected
@pytest.mark.parametrize('url, expected', (
("short", cd.ImageName(registry=cd.DEFAULT_PUBLIC_REGISTRY, name="library/short", tag="latest",
full_name=cd.DEFAULT_PUBLIC_REGISTRY + "/library/short:latest")),
("simple/name", cd.ImageName(registry=cd.DEFAULT_PUBLIC_REGISTRY, name="simple/name", tag="latest",
full_name=cd.DEFAULT_PUBLIC_REGISTRY + "/simple/name:latest")),
("library/ubuntu", cd.ImageName(registry=cd.DEFAULT_PUBLIC_REGISTRY, name="library/ubuntu", tag="latest",
full_name=cd.DEFAULT_PUBLIC_REGISTRY + "/library/ubuntu:latest")),
("docker/stevvooe/app",
cd.ImageName(registry=cd.DEFAULT_PUBLIC_REGISTRY, name="docker/stevvooe/app", tag="latest",
full_name=cd.DEFAULT_PUBLIC_REGISTRY + "/docker/stevvooe/app:latest")),
("aa/aa/aa/aa/aa/aa/aa/aa/aa/bb/bb/bb/bb/bb/bb",
cd.ImageName(registry=cd.DEFAULT_PUBLIC_REGISTRY, name="aa/aa/aa/aa/aa/aa/aa/aa/aa/bb/bb/bb/bb/bb/bb",
tag="latest",
full_name=cd.DEFAULT_PUBLIC_REGISTRY + "/aa/aa/aa/aa/aa/aa/aa/aa/aa/bb/bb/bb/bb/bb/bb:latest")),
("aa/aa/bb/bb/bb", cd.ImageName(registry=cd.DEFAULT_PUBLIC_REGISTRY, name="aa/aa/bb/bb/bb", tag="latest",
full_name=cd.DEFAULT_PUBLIC_REGISTRY + "/aa/aa/bb/bb/bb:latest")),
("a/a/a/a", cd.ImageName(registry=cd.DEFAULT_PUBLIC_REGISTRY, name="a/a/a/a", tag="latest",
full_name=cd.DEFAULT_PUBLIC_REGISTRY + "/a/a/a/a:latest")),
("a", cd.ImageName(registry=cd.DEFAULT_PUBLIC_REGISTRY, name="library/a", tag="latest",
full_name=cd.DEFAULT_PUBLIC_REGISTRY + "/library/a:latest")),
("a/aa", cd.ImageName(registry=cd.DEFAULT_PUBLIC_REGISTRY, name="a/aa", tag="latest",
full_name=cd.DEFAULT_PUBLIC_REGISTRY + "/a/aa:latest")),
("a/aa/a", cd.ImageName(registry=cd.DEFAULT_PUBLIC_REGISTRY, name="a/aa/a", tag="latest",
full_name=cd.DEFAULT_PUBLIC_REGISTRY + "/a/aa/a:latest")),
("foo.com", cd.ImageName(registry=cd.DEFAULT_PUBLIC_REGISTRY, name="library/foo.com", tag="latest",
full_name=cd.DEFAULT_PUBLIC_REGISTRY + "/library/foo.com:latest")),
("foo.com:8080/bar",
cd.ImageName(registry="foo.com:8080", name="bar", tag="latest", full_name="foo.com:8080/bar:latest")),
("foo.com/bar", cd.ImageName(registry="foo.com", name="bar", tag="latest", full_name="foo.com/bar:latest")),
("foo.com/bar/baz",
cd.ImageName(registry="foo.com", name="bar/baz", tag="latest", full_name="foo.com/bar/baz:latest")),
("localhost:8080/bar",
cd.ImageName(registry="localhost:8080", name="bar", tag="latest", full_name="localhost:8080/bar:latest")),
("sub-dom1.foo.com/bar/baz/quux", cd.ImageName(registry="sub-dom1.foo.com", name="bar/baz/quux", tag="latest",
full_name="sub-dom1.foo.com/bar/baz/quux:latest")),
("blog.foo.com/bar/baz",
cd.ImageName(registry="blog.foo.com", name="bar/baz", tag="latest", full_name="blog.foo.com/bar/baz:latest")),
("aa-a/a", cd.ImageName(registry=cd.DEFAULT_PUBLIC_REGISTRY, name="aa-a/a", tag="latest",
full_name=cd.DEFAULT_PUBLIC_REGISTRY + "/aa-a/a:latest")),
("foo_bar", cd.ImageName(registry=cd.DEFAULT_PUBLIC_REGISTRY, name="library/foo_bar", tag="latest",
full_name=cd.DEFAULT_PUBLIC_REGISTRY + "/library/foo_bar:latest")),
("foo_bar.com", cd.ImageName(registry=cd.DEFAULT_PUBLIC_REGISTRY, name="library/foo_bar.com", tag="latest",
full_name=cd.DEFAULT_PUBLIC_REGISTRY + "/library/foo_bar.com:latest")),
("foo.com/foo_bar",
cd.ImageName(registry="foo.com", name="foo_bar", tag="latest", full_name="foo.com/foo_bar:latest")),
("b.gcr.io/test.example.com/my-app",
cd.ImageName(registry="b.gcr.io", name="test.example.com/my-app", tag="latest",
full_name="b.gcr.io/test.example.com/my-app:latest")),
("xn--n3h.com/myimage",
cd.ImageName(registry="xn--n3h.com", name="myimage", tag="latest", full_name="xn--n3h.com/myimage:latest")),
("xn--7o8h.com/myimage",
cd.ImageName(registry="xn--7o8h.com", name="myimage", tag="latest", full_name="xn--7o8h.com/myimage:latest")),
("example.com/xn--7o8h.com/myimage",
cd.ImageName(registry="example.com", name="xn--7o8h.com/myimage", tag="latest",
full_name="example.com/xn--7o8h.com/myimage:latest")),
("example.com/some_separator__underscore/myimage",
cd.ImageName(registry="example.com", name="some_separator__underscore/myimage", tag="latest",
full_name="example.com/some_separator__underscore/myimage:latest")),
("do__cker/docker", cd.ImageName(registry=cd.DEFAULT_PUBLIC_REGISTRY, name="do__cker/docker", tag="latest",
full_name=cd.DEFAULT_PUBLIC_REGISTRY + "/do__cker/docker:latest")),
("b.gcr.io/test.example.com/my-app",
cd.ImageName(registry="b.gcr.io", name="test.example.com/my-app", tag="latest",
full_name="b.gcr.io/test.example.com/my-app:latest")),
("registry.io/foo/project--id.module--name.ver---sion--name",
cd.ImageName(registry="registry.io", name="foo/project--id.module--name.ver---sion--name", tag="latest",
full_name="registry.io/foo/project--id.module--name.ver---sion--name:latest")),
("Asdf.com/foo/bar",
cd.ImageName(registry="Asdf.com", name="foo/bar", tag="latest", full_name="Asdf.com/foo/bar:latest")),
("host.tld:12/name:tag",
cd.ImageName(registry="host.tld:12", name="name", tag="tag", full_name="host.tld:12/name:tag")),
("host.tld/name:tag", cd.ImageName(registry="host.tld", name="name", tag="tag", full_name="host.tld/name:tag")),
("name/name:tag", cd.ImageName(registry=cd.DEFAULT_PUBLIC_REGISTRY, name="name/name", tag="tag",
full_name=cd.DEFAULT_PUBLIC_REGISTRY + "/name/name:tag")),
("name:tag", cd.ImageName(registry=cd.DEFAULT_PUBLIC_REGISTRY, name="library/name", tag="tag",
full_name=cd.DEFAULT_PUBLIC_REGISTRY + "/library/name:tag")),
("host:21/name:tag", cd.ImageName(registry='host:21', name="name", tag="tag",
full_name="host:21/name:tag")),
))
def test_parse_image_name(check_docker, url, expected):
parsed_name = check_docker.parse_image_name(url)
assert parsed_name == expected
def test_get_manifest_auth_token(check_docker):
obj = {'token': 'test'}
encoded = json.dumps(obj=obj).encode('utf-8')
expected_response = FakeHttpResponse(content=encoded, http_code=200)
with patch('check_docker.check_docker.request.urlopen', return_value=expected_response):
www_authenticate_header = 'Bearer realm="https://example.com/token",service="example.com",scope="repository:test:pull"'
token = check_docker.Oauth2TokenAuthHandler._get_outh2_token(www_authenticate_header)
assert token == 'test'
def test_get_container_image_urls(check_docker):
container_response = {'Image': 'test'}
image_response = {'RepoTags': ['test']}
with patch('check_docker.check_docker.get_container_info', return_value=container_response), \
patch('check_docker.check_docker.get_image_info', return_value=image_response):
urls = check_docker.get_container_image_urls('container')
assert urls == ['test']
@pytest.mark.parametrize('image_url, expected_normal_url', (
('foo', 'https://' + cd.DEFAULT_PUBLIC_REGISTRY + '/v2/library/foo/manifests/latest'),
('insecure.com/foo', 'http://insecure.com/v2/foo/manifests/latest'),
))
def test_normalize_image_name_to_manifest_url(check_docker, image_url, expected_normal_url):
insecure_registries = ('insecure.com',)
normal_url, _ = check_docker.normalize_image_name_to_manifest_url(image_url, insecure_registries)
assert normal_url == expected_normal_url
def test_get_container_image_id(check_docker):
container_response = {'Image': 'test'}
with patch('check_docker.check_docker.get_container_info', return_value=container_response):
digest = check_docker.get_container_image_id('container')
assert digest == 'test'
def test_get_digest_from_registry_no_auth(check_docker):
fake_data = {'config': {'digest': 'test_token'}}
with patch('check_docker.check_docker.get_url', return_value=(fake_data, 200)):
digest = check_docker.get_digest_from_registry('https://example.com/v2/test/manifests/lastest')
assert digest == "test_token"
def test_get_digest_from_registry_missing_digest(check_docker):
with patch('check_docker.check_docker.get_url', return_value=({},404)):
with pytest.raises(check_docker.RegistryError):
check_docker.get_digest_from_registry('https://example.com/v2/test/manifests/lastest')
@pytest.mark.parametrize('local_container_container_image_id,registry_container_digest, image_urls, expected_rc', (
('AAAA', 'AAAA', ('example.com/foo',), cd.OK_RC),
('AAAA', 'BBBB', ('example.com/foo',), cd.CRITICAL_RC),
(None, '', ('example.com/foo',), cd.UNKNOWN_RC),
('AAAA', 'AAAA', ('example.com/foo', 'example.com/bar'), cd.UNKNOWN_RC),
('AAAA', 'AAAA', tuple(), cd.UNKNOWN_RC),
))
def test_check_version(check_docker, local_container_container_image_id, registry_container_digest, image_urls,
expected_rc):
with patch('check_docker.check_docker.get_container_image_id', return_value=local_container_container_image_id), \
patch('check_docker.check_docker.get_container_image_urls', return_value=image_urls), \
patch('check_docker.check_docker.get_digest_from_registry', return_value=registry_container_digest):
check_docker.check_version('container', tuple())
assert check_docker.rc == expected_rc
def test_check_version_missing_digest(check_docker):
with patch('check_docker.check_docker.get_container_image_id', return_value='AAA'), \
patch('check_docker.check_docker.get_container_image_urls', return_value=('example.com/foo',)), \
patch('check_docker.check_docker.get_digest_from_registry',
side_effect=check_docker.RegistryError(response=None)):
check_docker.check_version('container', tuple())
assert check_docker.rc == cd.UNKNOWN_RC
def test_check_version_not_tls(check_docker):
class Reason():
reason = 'UNKNOWN_PROTOCOL'
exception = URLError(reason=Reason)
with patch('check_docker.check_docker.get_container_image_id', return_value='AAA'), \
patch('check_docker.check_docker.get_container_image_urls', return_value=('example.com/foo',)), \
patch('check_docker.check_docker.get_digest_from_registry', side_effect=exception):
check_docker.check_version('container', tuple())
assert check_docker.rc == cd.UNKNOWN_RC
assert 'TLS error' in check_docker.messages[0]
def test_check_version_no_such_host(check_docker):
class Reason():
strerror = 'nodename nor servname provided, or not known'
exception = URLError(reason=Reason)
with patch('check_docker.check_docker.get_container_image_id', return_value='AAA'), \
patch('check_docker.check_docker.get_container_image_urls', return_value=('example.com/foo',)), \
patch('check_docker.check_docker.get_digest_from_registry', side_effect=exception):
check_docker.check_version('container', tuple())
assert check_docker.rc == cd.UNKNOWN_RC
assert 'Cannot reach registry' in check_docker.messages[0]
def test_check_version_exception(check_docker):
# Unhandled exceptions should be passed on
exception = URLError(reason=None)
with patch('check_docker.check_docker.get_container_image_id', return_value='AAA'), \
patch('check_docker.check_docker.get_container_image_urls', return_value=('example.com/foo',)), \
patch('check_docker.check_docker.get_digest_from_registry', side_effect=exception), \
pytest.raises(URLError):
check_docker.check_version('container', tuple())
@pytest.mark.parametrize('names', (
(('\\a', 'a\\b'),),
(('\\a'),),
(('a\\b', '\\a'),)
))
def test_get_ps_name_ok(check_docker, names):
assert check_docker.ps_name(names) == 'a'
@pytest.mark.parametrize('names', (
('a\\b'),
set(),
('a\\b', 'b\\a'),
('\\b', '\\a'),
))
def test_get_ps_name_ok(check_docker, names):
with pytest.raises(NameError):
check_docker.get_ps_name(names)

View File

@ -0,0 +1,385 @@
import argparse
import json
import stat
from io import BytesIO
from unittest.mock import patch, call
import pytest
from check_docker import check_swarm as cs
__author__ = 'tim'
@pytest.fixture
def check_swarm():
# This is needed because `check_docker` does not end a a .py so it won't be found by default
from check_docker import check_swarm
check_swarm.rc = -1
check_swarm.timeout = 1
check_swarm.messages = []
check_swarm.performance_data = []
check_swarm.daemon = 'socket:///notreal'
check_swarm.get_url.cache_clear()
return check_swarm
@pytest.fixture
def active_node():
return {"ID": 44, 'Spec': {'Availability': 'active'}}
@pytest.fixture
def paused_node():
return {"ID": 43, 'Spec': {'Availability': 'paused'}}
@pytest.fixture
def drain_node():
return {"ID": 42, 'Spec': {'Availability': 'drain'}}
@pytest.fixture
def node_list(active_node, paused_node, drain_node):
return active_node, paused_node, drain_node
active_node_task = {"NodeID": 44, 'Status': {'State': 'running'}}
paused_node_task = {"NodeID": 43, 'Status': {'State': 'running'}}
drain_node_task = {"NodeID": 42, 'Status': {'State': 'running'}}
class FakeHttpResponse(BytesIO):
def __init__(self, content, http_code):
self.status = http_code
super(FakeHttpResponse, self).__init__(content)
def test_get_url(check_swarm, monkeypatch):
obj = {'foo': 'bar'}
encoded = json.dumps(obj=obj).encode('utf-8')
expected_response = FakeHttpResponse(content=encoded, http_code=200)
def mock_open(*args, **kwargs):
return expected_response
monkeypatch.setattr(check_swarm.better_urllib_get, 'open', value=mock_open)
response, _ = check_swarm.get_url(url='/test')
assert response == obj
def test_get_swarm_status(check_swarm):
with patch('check_docker.check_swarm.get_url', return_value=('', 999)):
response = check_swarm.get_swarm_status()
assert response == 999
def test_get_service_info(check_swarm):
sample_response = ([{'Status': {'State': 'running', 'DesiredState': 'running'}},
{'Status': {'State': 'failed', 'DesiredState': 'running'}}], 999)
with patch('check_docker.check_swarm.get_url', return_value=sample_response):
response_data = check_swarm.get_service_tasks('FOO')
assert len(response_data) == 2
def test_get_services_not_swarm(check_swarm):
with patch('check_docker.check_swarm.get_url', return_value=('', 406)):
check_swarm.get_services('FOO')
assert check_swarm.rc == check_swarm.CRITICAL_RC
def test_get_services_error(check_swarm):
with patch('check_docker.check_swarm.get_url', return_value=('', 500)):
check_swarm.get_services('FOO')
assert check_swarm.rc == check_swarm.UNKNOWN_RC
def test_get_services_all(check_swarm):
services = [{'Spec': {"Name": 'FOO'}},
{'Spec': {"Name": 'BAR'}}]
with patch('check_docker.check_swarm.get_url', return_value=(services, 200)):
result = check_swarm.get_services('all')
assert len(result) == len(services)
@pytest.mark.parametrize('func,arg,rc,messages',
(
('ok', "OK test", cs.OK_RC, ['OK: OK test']),
('warning', "WARN test", cs.WARNING_RC, ['WARNING: WARN test']),
('critical', "CRIT test", cs.CRITICAL_RC, ['CRITICAL: CRIT test']),
('unknown', "UNKNOWN test", cs.UNKNOWN_RC, ['UNKNOWN: UNKNOWN test']),
))
def test_status_update(check_swarm, func, arg, rc, messages):
getattr(check_swarm, func)(arg)
assert check_swarm.rc == rc
assert check_swarm.messages == messages
def test_set_rc(check_swarm):
# Can I do a basic set
check_swarm.set_rc(check_swarm.OK_RC)
assert check_swarm.rc == check_swarm.OK_RC
# Does it prevent downgrades of rc
check_swarm.set_rc(check_swarm.WARNING_RC)
assert check_swarm.rc == check_swarm.WARNING_RC
check_swarm.set_rc(check_swarm.OK_RC)
assert check_swarm.rc == check_swarm.WARNING_RC
@pytest.mark.parametrize('code, expected_rc, expected_messages', (
(200, cs.OK_RC, ['OK: ok_msg']),
(404, cs.CRITICAL_RC, ['CRITICAL: critical_msg']),
(418, cs.UNKNOWN_RC, ['UNKNOWN: unknown_msg']),
))
def test_process_url_status_ok(check_swarm, code, expected_rc, expected_messages):
check_swarm.process_url_status(code, ok_msg='ok_msg', critical_msg='critical_msg', unknown_msg='unknown_msg')
assert check_swarm.rc == expected_rc
assert check_swarm.messages == expected_messages
def test_args_timeout(check_swarm):
args = ('--timeout', '9999', '--swarm')
result = check_swarm.process_args(args=args)
assert result.timeout == 9999.0
def test_args_connection(check_swarm):
args = ('--connection', '/foo', '--swarm')
result = check_swarm.process_args(args=args)
assert result.connection == '/foo'
assert check_swarm.daemon == 'socket:///foo:'
args = ('--connection', 'foo.com/bar', '--swarm')
result = check_swarm.process_args(args=args)
assert result.connection == 'foo.com/bar'
assert check_swarm.daemon == 'http://foo.com/bar'
def test_args_secure_connection(check_swarm):
args = ('--secure-connection', 'non-default', '--swarm')
result = check_swarm.process_args(args=args)
assert result.secure_connection == 'non-default'
args = ('--secure-connection', 'foo.com/bar', '--swarm')
result = check_swarm.process_args(args=args)
assert result.secure_connection == 'foo.com/bar'
assert check_swarm.daemon == 'https://foo.com/bar'
def test_args_mixed_connection(check_swarm):
args = ('--connection', 'non-default', '--secure-connection', 'non-default', '--swarm')
with pytest.raises(SystemExit):
check_swarm.process_args(args)
def test_missing_check(check_swarm):
try:
with pytest.raises(argparse.ArgumentError):
check_swarm.process_args(tuple())
except SystemExit: # Argument failures exit as well
pass
def test_args_mixed_checks(check_swarm):
try:
with pytest.raises(argparse.ArgumentError):
check_swarm.process_args(['--swarm', "--service", "FOO"])
except SystemExit: # Argument failures exit as well
pass
def test_socketfile_failure_false(check_swarm, fs):
fs.create_file('/tmp/socket', contents='', st_mode=(stat.S_IFSOCK | 0o666))
args = ('--swarm', '--connection', '/tmp/socket')
result = check_swarm.process_args(args=args)
assert not check_swarm.socketfile_permissions_failure(parsed_args=result)
def test_socketfile_failure_filetype(check_swarm, fs):
fs.create_file('/tmp/not_socket', contents='testing')
args = ('--swarm', '--connection', '/tmp/not_socket')
result = check_swarm.process_args(args=args)
assert check_swarm.socketfile_permissions_failure(parsed_args=result)
def test_socketfile_failure_missing(check_swarm, fs):
args = ('--swarm', '--connection', '/tmp/missing')
result = check_swarm.process_args(args=args)
check_swarm.socketfile_permissions_failure(parsed_args=result)
def test_socketfile_failure_unwriteable(check_swarm, fs):
fs.create_file('/tmp/unwritable', contents='', st_mode=(stat.S_IFSOCK | 0o000))
args = ('--swarm', '--connection', '/tmp/unwritable')
result = check_swarm.process_args(args=args)
assert check_swarm.socketfile_permissions_failure(parsed_args=result)
def test_socketfile_failure_unreadable(check_swarm, fs):
fs.create_file('/tmp/unreadable', contents='', st_mode=(stat.S_IFSOCK | 0o000))
args = ('--swarm', '--connection', '/tmp/unreadable')
result = check_swarm.process_args(args=args)
assert check_swarm.socketfile_permissions_failure(parsed_args=result)
def test_socketfile_failure_http(check_swarm, fs):
fs.create_file('/tmp/http', contents='', st_mode=(stat.S_IFSOCK | 0o000))
args = ('--swarm', '--connection', 'http://127.0.0.1')
result = check_swarm.process_args(args=args)
assert not check_swarm.socketfile_permissions_failure(parsed_args=result)
def test_check_swarm_called(check_swarm, fs):
fs.create_file(check_swarm.DEFAULT_SOCKET, contents='', st_mode=(stat.S_IFSOCK | 0o666))
args = ['--swarm']
with patch('check_docker.check_swarm.check_swarm') as patched:
check_swarm.perform_checks(args)
assert patched.call_count == 1
def test_check_swarm_results_OK(check_swarm, fs):
fs.create_file(check_swarm.DEFAULT_SOCKET, contents='', st_mode=(stat.S_IFSOCK | 0o666))
args = ['--swarm']
with patch('check_docker.check_swarm.get_swarm_status', return_value=200):
check_swarm.perform_checks(args)
assert check_swarm.rc == cs.OK_RC
def test_check_swarm_results_CRITICAL(check_swarm, fs):
fs.create_file(check_swarm.DEFAULT_SOCKET, contents='', st_mode=(stat.S_IFSOCK | 0o666))
args = ['--swarm']
with patch('check_docker.check_swarm.get_swarm_status', return_value=406):
check_swarm.perform_checks(args)
assert check_swarm.rc == cs.CRITICAL_RC
def test_check_service_called(check_swarm, fs):
service_info = {'Spec': {'Mode': {'Replicated': {'Replicas': 1}}}}
fs.create_file(check_swarm.DEFAULT_SOCKET, contents='', st_mode=(stat.S_IFSOCK | 0o666))
args = ['--service', 'FOO']
with patch('check_docker.check_swarm.get_services', return_value=[service_info]):
with patch('check_docker.check_swarm.check_service') as patched:
check_swarm.perform_checks(args)
assert patched.call_count == 1
@pytest.mark.parametrize("service_info, expected_func, expected_args", (
({'Spec': {'Mode': {'Global': {}}}}, 'process_global_service', {'name': 'FOO', 'ignore_paused': False}),
({'Spec': {'Mode': {'Replicated': {'Replicas': 1}}}}, 'process_replicated_service',
{'name': 'FOO', 'replicas_desired': 1}),
({'Spec': {'Mode': {'Replicated': {'Replicas': 3}}}}, 'process_replicated_service',
{'name': 'FOO', 'replicas_desired': 3}),
))
def test_check_services_routing_global(check_swarm, service_info, expected_func, expected_args, fs):
fs.create_file(check_swarm.DEFAULT_SOCKET, contents='', st_mode=(stat.S_IFSOCK | 0o666))
with patch('check_docker.check_swarm.get_service_info', return_value=(service_info, 999)), \
patch('check_docker.check_swarm.{}'.format(expected_func)) as patched:
check_swarm.check_service('FOO')
assert patched.call_count == 1
assert patched.call_args == call(**expected_args)
def test_check_services_global_ignore_paused(check_swarm, fs):
fs.create_file(check_swarm.DEFAULT_SOCKET, contents='', st_mode=(stat.S_IFSOCK | 0o666))
service_info = {'Spec': {'Mode': {'Global': {}}}}
with patch('check_docker.check_swarm.get_service_info', return_value=(service_info, 999)), \
patch('check_docker.check_swarm.process_global_service') as patched:
check_swarm.check_service('FOO', True)
assert patched.call_count == 1
assert patched.call_args == call(name='FOO', ignore_paused=True)
@pytest.mark.parametrize("service_list, ignore_paused, expected_rc", (
([active_node_task, paused_node_task, drain_node_task], False, cs.OK_RC),
([active_node_task, drain_node_task], False, cs.CRITICAL_RC),
([active_node_task, paused_node_task], False, cs.OK_RC),
([active_node_task], False, cs.CRITICAL_RC),
([paused_node_task], False, cs.CRITICAL_RC),
([], False, cs.CRITICAL_RC),
([active_node_task], True, cs.OK_RC),
([paused_node_task], True, cs.CRITICAL_RC),
))
def test_process_global_service(check_swarm, fs, node_list, service_list, ignore_paused, expected_rc):
fs.create_file(check_swarm.DEFAULT_SOCKET, contents='', st_mode=(stat.S_IFSOCK | 0o666))
with patch('check_docker.check_swarm.get_nodes', return_value=(node_list, 999)) as patched_get_nodes, \
patch('check_docker.check_swarm.get_service_tasks', return_value=service_list) as patched_get_service_tasks:
check_swarm.process_global_service('FOO', ignore_paused)
assert patched_get_nodes.call_count == 1
assert patched_get_service_tasks.call_count == 1
assert check_swarm.rc == expected_rc
@pytest.mark.parametrize("service_list, expected_rc", (
([active_node_task, paused_node_task, drain_node_task], cs.CRITICAL_RC),
([active_node_task, paused_node_task], cs.OK_RC),
([active_node_task], cs.CRITICAL_RC),
([], cs.CRITICAL_RC),
))
def test_process_replicated_service(check_swarm, fs, service_list, expected_rc):
fs.create_file(check_swarm.DEFAULT_SOCKET, contents='', st_mode=(stat.S_IFSOCK | 0o666))
with patch('check_docker.check_swarm.get_service_tasks',
return_value=service_list) as patched_get_service_running_tasks:
check_swarm.process_replicated_service('FOO', 2)
assert patched_get_service_running_tasks.call_count == 1
assert check_swarm.rc == expected_rc
def test_check_service_results_FAIL_missing(check_swarm, fs):
service_info = {'Spec': {'Name': 'FOO', 'Mode': {'Global': {}}}}
fs.create_file(check_swarm.DEFAULT_SOCKET, contents='', st_mode=(stat.S_IFSOCK | 0o666))
args = ['--service', 'missing1']
with patch('check_docker.check_swarm.get_url', return_value=([service_info], 200)):
check_swarm.perform_checks(args)
assert check_swarm.rc == cs.CRITICAL_RC
def test_check_service_results_FAIL_unknown(check_swarm, fs):
fs.create_file(check_swarm.DEFAULT_SOCKET, contents='', st_mode=(stat.S_IFSOCK | 0o666))
args = ['--service', 'FOO']
with patch('check_docker.check_swarm.get_services', return_value=['FOO', 'BAR']):
with patch('check_docker.check_swarm.get_service_info', return_value=('', 500)):
check_swarm.perform_checks(args)
assert check_swarm.rc == cs.UNKNOWN_RC
def test_check_no_services(check_swarm, fs):
fs.create_file(check_swarm.DEFAULT_SOCKET, contents='', st_mode=(stat.S_IFSOCK | 0o666))
args = ['--service', 'missing2']
with patch('check_docker.check_swarm.get_url', return_value=([], 200)):
check_swarm.perform_checks(args)
assert check_swarm.rc == cs.CRITICAL_RC
def test_check_missing_service(check_swarm, fs):
service_info = {'Spec': {'Name': 'FOO', 'Mode': {'Global': {}}}}
fs.create_file(check_swarm.DEFAULT_SOCKET, contents='', st_mode=(stat.S_IFSOCK | 0o666))
args = ['--service', 'missing3']
with patch('check_docker.check_swarm.get_url', return_value=([service_info], 200)):
check_swarm.perform_checks(args)
assert check_swarm.rc == cs.CRITICAL_RC
def test_check_not_swarm_service(check_swarm, fs):
fs.create_file(check_swarm.DEFAULT_SOCKET, contents='', st_mode=(stat.S_IFSOCK | 0o666))
args = ['--service', 'missing4']
with patch('check_docker.check_swarm.get_url', return_value=('', 406)):
check_swarm.perform_checks(args)
assert check_swarm.rc == cs.CRITICAL_RC
@pytest.mark.parametrize("messages, perf_data, expected", (
([], [], ''),
(['TEST'], [], 'TEST'),
(['FOO', 'BAR'], [], 'FOO; BAR'),
))
def test_print_results(check_swarm, capsys, messages, perf_data, expected):
check_swarm.messages = messages
check_swarm.performance_data = perf_data
check_swarm.print_results()
out, err = capsys.readouterr()
assert out.strip() == expected

View File

@ -0,0 +1,55 @@
import os
import sys
from urllib import request
from urllib.error import HTTPError
import pytest
import toml
import check_docker as module
from check_docker import check_swarm, check_docker
def test_versions_match():
assert check_docker.__version__ == check_swarm.__version__
def test_module_version_matches():
assert module.__version__ == check_docker.__version__
def test_project_version_matches():
project_config = toml.load("pyproject.toml")
project_version = project_config['tool']['poetry']['version']
assert project_version == check_docker.__version__
@pytest.mark.skipif('isolated' in os.environ and os.environ['isolated'].lower != 'false',
reason="Can not reach Python packge index when isolated")
@pytest.mark.skipif(sys.version_info[0:2] != (3, 8), reason="Only check on python 3.8, not {}".
format(sys.version_info[0:2]))
def test_package_present():
req = request.Request("https://pypi.org/project/check_docker/", method="HEAD")
with request.urlopen(req) as resp:
assert resp.getcode() == 200
@pytest.mark.xfail('TRAVIS_BRANCH' in os.environ and os.environ['TRAVIS_BRANCH'].lower != 'master',
reason="Ignore version check outside of master")
@pytest.mark.xfail('GITHUB_HEAD_REF' in os.environ and os.environ['GITHUB_HEAD_REF'].lower != 'master',
reason="Ignore version check outside of master")
@pytest.mark.skipif('isolated' in os.environ and os.environ['isolated'].lower != 'false',
reason="Can not reach Python package index when isolated")
@pytest.mark.skipif(sys.version_info[0:2] != (3, 8), reason="Only check on python 3.8")
def test_ensure_new_version():
version = check_docker.__version__
req = request.Request("https://pypi.org/project/check_docker/{version}/".
format(version=version), method="HEAD")
try:
with request.urlopen(req) as resp:
http_code = resp.getcode()
except HTTPError as e:
http_code = e.code
assert http_code == 404, "Version already exists. Ignore this if you are working on a PR"

20
check_docker/tox.ini Normal file
View File

@ -0,0 +1,20 @@
# tox (https://tox.readthedocs.io/) is a tool for running tests
# in multiple virtualenvs. This configuration file will run the
# test suite on all supported python versions. To use it, "pip install tox"
# and then run "tox" from this directory.
[tox]
envlist = py35,py36,py37,py38
tox_pyenv_fallback = False
isolated_build = True
skip_missing_interpreters = True
[testenv]
deps = pipenv
commands =
pipenv install --skip-lock
pytest {posargs: -v --random-order-bucket module}
setenv =
LC_ALL = {env:LC_ALL:en_US.UTF-8}
LANG = {env:LC_ALL:en_US.UTF-8}