Skip to content
Projects
Groups
Snippets
Help
This project
Loading...
Sign in / Register
Toggle navigation
N
news
Project
Overview
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
Sartika Aritonang
news
Commits
71d3ce9a
Commit
71d3ce9a
authored
May 29, 2020
by
Sartika Aritonang
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
Upload New File
parent
4de27d7e
Hide whitespace changes
Inline
Side-by-side
Showing
1 changed file
with
421 additions
and
0 deletions
+421
-0
session.py
stbi/Lib/site-packages/pip/_internal/network/session.py
+421
-0
No files found.
stbi/Lib/site-packages/pip/_internal/network/session.py
0 → 100644
View file @
71d3ce9a
"""PipSession and supporting code, containing all pip-specific
network request configuration and behavior.
"""
# The following comment should be removed at some point in the future.
# mypy: disallow-untyped-defs=False
import
email.utils
import
json
import
logging
import
mimetypes
import
os
import
platform
import
sys
import
warnings
from
pip._vendor
import
requests
,
six
,
urllib3
from
pip._vendor.cachecontrol
import
CacheControlAdapter
from
pip._vendor.requests.adapters
import
BaseAdapter
,
HTTPAdapter
from
pip._vendor.requests.models
import
Response
from
pip._vendor.requests.structures
import
CaseInsensitiveDict
from
pip._vendor.six.moves.urllib
import
parse
as
urllib_parse
from
pip._vendor.urllib3.exceptions
import
InsecureRequestWarning
from
pip
import
__version__
from
pip._internal.network.auth
import
MultiDomainBasicAuth
from
pip._internal.network.cache
import
SafeFileCache
# Import ssl from compat so the initial import occurs in only one place.
from
pip._internal.utils.compat
import
has_tls
,
ipaddress
from
pip._internal.utils.glibc
import
libc_ver
from
pip._internal.utils.misc
import
(
build_url_from_netloc
,
get_installed_version
,
parse_netloc
,
)
from
pip._internal.utils.typing
import
MYPY_CHECK_RUNNING
from
pip._internal.utils.urls
import
url_to_path
if
MYPY_CHECK_RUNNING
:
from
typing
import
(
Iterator
,
List
,
Optional
,
Tuple
,
Union
,
)
from
pip._internal.models.link
import
Link
SecureOrigin
=
Tuple
[
str
,
str
,
Optional
[
Union
[
int
,
str
]]]
logger
=
logging
.
getLogger
(
__name__
)
# Ignore warning raised when using --trusted-host.
warnings
.
filterwarnings
(
"ignore"
,
category
=
InsecureRequestWarning
)
SECURE_ORIGINS
=
[
# protocol, hostname, port
# Taken from Chrome's list of secure origins (See: http://bit.ly/1qrySKC)
(
"https"
,
"*"
,
"*"
),
(
"*"
,
"localhost"
,
"*"
),
(
"*"
,
"127.0.0.0/8"
,
"*"
),
(
"*"
,
"::1/128"
,
"*"
),
(
"file"
,
"*"
,
None
),
# ssh is always secure.
(
"ssh"
,
"*"
,
"*"
),
]
# type: List[SecureOrigin]
# These are environment variables present when running under various
# CI systems. For each variable, some CI systems that use the variable
# are indicated. The collection was chosen so that for each of a number
# of popular systems, at least one of the environment variables is used.
# This list is used to provide some indication of and lower bound for
# CI traffic to PyPI. Thus, it is okay if the list is not comprehensive.
# For more background, see: https://github.com/pypa/pip/issues/5499
CI_ENVIRONMENT_VARIABLES
=
(
# Azure Pipelines
'BUILD_BUILDID'
,
# Jenkins
'BUILD_ID'
,
# AppVeyor, CircleCI, Codeship, Gitlab CI, Shippable, Travis CI
'CI'
,
# Explicit environment variable.
'PIP_IS_CI'
,
)
def
looks_like_ci
():
# type: () -> bool
"""
Return whether it looks like pip is running under CI.
"""
# We don't use the method of checking for a tty (e.g. using isatty())
# because some CI systems mimic a tty (e.g. Travis CI). Thus that
# method doesn't provide definitive information in either direction.
return
any
(
name
in
os
.
environ
for
name
in
CI_ENVIRONMENT_VARIABLES
)
def
user_agent
():
"""
Return a string representing the user agent.
"""
data
=
{
"installer"
:
{
"name"
:
"pip"
,
"version"
:
__version__
},
"python"
:
platform
.
python_version
(),
"implementation"
:
{
"name"
:
platform
.
python_implementation
(),
},
}
if
data
[
"implementation"
][
"name"
]
==
'CPython'
:
data
[
"implementation"
][
"version"
]
=
platform
.
python_version
()
elif
data
[
"implementation"
][
"name"
]
==
'PyPy'
:
if
sys
.
pypy_version_info
.
releaselevel
==
'final'
:
pypy_version_info
=
sys
.
pypy_version_info
[:
3
]
else
:
pypy_version_info
=
sys
.
pypy_version_info
data
[
"implementation"
][
"version"
]
=
"."
.
join
(
[
str
(
x
)
for
x
in
pypy_version_info
]
)
elif
data
[
"implementation"
][
"name"
]
==
'Jython'
:
# Complete Guess
data
[
"implementation"
][
"version"
]
=
platform
.
python_version
()
elif
data
[
"implementation"
][
"name"
]
==
'IronPython'
:
# Complete Guess
data
[
"implementation"
][
"version"
]
=
platform
.
python_version
()
if
sys
.
platform
.
startswith
(
"linux"
):
from
pip._vendor
import
distro
distro_infos
=
dict
(
filter
(
lambda
x
:
x
[
1
],
zip
([
"name"
,
"version"
,
"id"
],
distro
.
linux_distribution
()),
))
libc
=
dict
(
filter
(
lambda
x
:
x
[
1
],
zip
([
"lib"
,
"version"
],
libc_ver
()),
))
if
libc
:
distro_infos
[
"libc"
]
=
libc
if
distro_infos
:
data
[
"distro"
]
=
distro_infos
if
sys
.
platform
.
startswith
(
"darwin"
)
and
platform
.
mac_ver
()[
0
]:
data
[
"distro"
]
=
{
"name"
:
"macOS"
,
"version"
:
platform
.
mac_ver
()[
0
]}
if
platform
.
system
():
data
.
setdefault
(
"system"
,
{})[
"name"
]
=
platform
.
system
()
if
platform
.
release
():
data
.
setdefault
(
"system"
,
{})[
"release"
]
=
platform
.
release
()
if
platform
.
machine
():
data
[
"cpu"
]
=
platform
.
machine
()
if
has_tls
():
import
_ssl
as
ssl
data
[
"openssl_version"
]
=
ssl
.
OPENSSL_VERSION
setuptools_version
=
get_installed_version
(
"setuptools"
)
if
setuptools_version
is
not
None
:
data
[
"setuptools_version"
]
=
setuptools_version
# Use None rather than False so as not to give the impression that
# pip knows it is not being run under CI. Rather, it is a null or
# inconclusive result. Also, we include some value rather than no
# value to make it easier to know that the check has been run.
data
[
"ci"
]
=
True
if
looks_like_ci
()
else
None
user_data
=
os
.
environ
.
get
(
"PIP_USER_AGENT_USER_DATA"
)
if
user_data
is
not
None
:
data
[
"user_data"
]
=
user_data
return
"{data[installer][name]}/{data[installer][version]} {json}"
.
format
(
data
=
data
,
json
=
json
.
dumps
(
data
,
separators
=
(
","
,
":"
),
sort_keys
=
True
),
)
class
LocalFSAdapter
(
BaseAdapter
):
def
send
(
self
,
request
,
stream
=
None
,
timeout
=
None
,
verify
=
None
,
cert
=
None
,
proxies
=
None
):
pathname
=
url_to_path
(
request
.
url
)
resp
=
Response
()
resp
.
status_code
=
200
resp
.
url
=
request
.
url
try
:
stats
=
os
.
stat
(
pathname
)
except
OSError
as
exc
:
resp
.
status_code
=
404
resp
.
raw
=
exc
else
:
modified
=
email
.
utils
.
formatdate
(
stats
.
st_mtime
,
usegmt
=
True
)
content_type
=
mimetypes
.
guess_type
(
pathname
)[
0
]
or
"text/plain"
resp
.
headers
=
CaseInsensitiveDict
({
"Content-Type"
:
content_type
,
"Content-Length"
:
stats
.
st_size
,
"Last-Modified"
:
modified
,
})
resp
.
raw
=
open
(
pathname
,
"rb"
)
resp
.
close
=
resp
.
raw
.
close
return
resp
def
close
(
self
):
pass
class
InsecureHTTPAdapter
(
HTTPAdapter
):
def
cert_verify
(
self
,
conn
,
url
,
verify
,
cert
):
super
(
InsecureHTTPAdapter
,
self
)
.
cert_verify
(
conn
=
conn
,
url
=
url
,
verify
=
False
,
cert
=
cert
)
class
InsecureCacheControlAdapter
(
CacheControlAdapter
):
def
cert_verify
(
self
,
conn
,
url
,
verify
,
cert
):
super
(
InsecureCacheControlAdapter
,
self
)
.
cert_verify
(
conn
=
conn
,
url
=
url
,
verify
=
False
,
cert
=
cert
)
class
PipSession
(
requests
.
Session
):
timeout
=
None
# type: Optional[int]
def
__init__
(
self
,
*
args
,
**
kwargs
):
"""
:param trusted_hosts: Domains not to emit warnings for when not using
HTTPS.
"""
retries
=
kwargs
.
pop
(
"retries"
,
0
)
cache
=
kwargs
.
pop
(
"cache"
,
None
)
trusted_hosts
=
kwargs
.
pop
(
"trusted_hosts"
,
[])
# type: List[str]
index_urls
=
kwargs
.
pop
(
"index_urls"
,
None
)
super
(
PipSession
,
self
)
.
__init__
(
*
args
,
**
kwargs
)
# Namespace the attribute with "pip_" just in case to prevent
# possible conflicts with the base class.
self
.
pip_trusted_origins
=
[]
# type: List[Tuple[str, Optional[int]]]
# Attach our User Agent to the request
self
.
headers
[
"User-Agent"
]
=
user_agent
()
# Attach our Authentication handler to the session
self
.
auth
=
MultiDomainBasicAuth
(
index_urls
=
index_urls
)
# Create our urllib3.Retry instance which will allow us to customize
# how we handle retries.
retries
=
urllib3
.
Retry
(
# Set the total number of retries that a particular request can
# have.
total
=
retries
,
# A 503 error from PyPI typically means that the Fastly -> Origin
# connection got interrupted in some way. A 503 error in general
# is typically considered a transient error so we'll go ahead and
# retry it.
# A 500 may indicate transient error in Amazon S3
# A 520 or 527 - may indicate transient error in CloudFlare
status_forcelist
=
[
500
,
503
,
520
,
527
],
# Add a small amount of back off between failed requests in
# order to prevent hammering the service.
backoff_factor
=
0.25
,
)
# Our Insecure HTTPAdapter disables HTTPS validation. It does not
# support caching so we'll use it for all http:// URLs.
# If caching is disabled, we will also use it for
# https:// hosts that we've marked as ignoring
# TLS errors for (trusted-hosts).
insecure_adapter
=
InsecureHTTPAdapter
(
max_retries
=
retries
)
# We want to _only_ cache responses on securely fetched origins or when
# the host is specified as trusted. We do this because
# we can't validate the response of an insecurely/untrusted fetched
# origin, and we don't want someone to be able to poison the cache and
# require manual eviction from the cache to fix it.
if
cache
:
secure_adapter
=
CacheControlAdapter
(
cache
=
SafeFileCache
(
cache
),
max_retries
=
retries
,
)
self
.
_trusted_host_adapter
=
InsecureCacheControlAdapter
(
cache
=
SafeFileCache
(
cache
),
max_retries
=
retries
,
)
else
:
secure_adapter
=
HTTPAdapter
(
max_retries
=
retries
)
self
.
_trusted_host_adapter
=
insecure_adapter
self
.
mount
(
"https://"
,
secure_adapter
)
self
.
mount
(
"http://"
,
insecure_adapter
)
# Enable file:// urls
self
.
mount
(
"file://"
,
LocalFSAdapter
())
for
host
in
trusted_hosts
:
self
.
add_trusted_host
(
host
,
suppress_logging
=
True
)
def
add_trusted_host
(
self
,
host
,
source
=
None
,
suppress_logging
=
False
):
# type: (str, Optional[str], bool) -> None
"""
:param host: It is okay to provide a host that has previously been
added.
:param source: An optional source string, for logging where the host
string came from.
"""
if
not
suppress_logging
:
msg
=
'adding trusted host: {!r}'
.
format
(
host
)
if
source
is
not
None
:
msg
+=
' (from {})'
.
format
(
source
)
logger
.
info
(
msg
)
host_port
=
parse_netloc
(
host
)
if
host_port
not
in
self
.
pip_trusted_origins
:
self
.
pip_trusted_origins
.
append
(
host_port
)
self
.
mount
(
build_url_from_netloc
(
host
)
+
'/'
,
self
.
_trusted_host_adapter
)
if
not
host_port
[
1
]:
# Mount wildcard ports for the same host.
self
.
mount
(
build_url_from_netloc
(
host
)
+
':'
,
self
.
_trusted_host_adapter
)
def
iter_secure_origins
(
self
):
# type: () -> Iterator[SecureOrigin]
for
secure_origin
in
SECURE_ORIGINS
:
yield
secure_origin
for
host
,
port
in
self
.
pip_trusted_origins
:
yield
(
'*'
,
host
,
'*'
if
port
is
None
else
port
)
def
is_secure_origin
(
self
,
location
):
# type: (Link) -> bool
# Determine if this url used a secure transport mechanism
parsed
=
urllib_parse
.
urlparse
(
str
(
location
))
origin_protocol
,
origin_host
,
origin_port
=
(
parsed
.
scheme
,
parsed
.
hostname
,
parsed
.
port
,
)
# The protocol to use to see if the protocol matches.
# Don't count the repository type as part of the protocol: in
# cases such as "git+ssh", only use "ssh". (I.e., Only verify against
# the last scheme.)
origin_protocol
=
origin_protocol
.
rsplit
(
'+'
,
1
)[
-
1
]
# Determine if our origin is a secure origin by looking through our
# hardcoded list of secure origins, as well as any additional ones
# configured on this PackageFinder instance.
for
secure_origin
in
self
.
iter_secure_origins
():
secure_protocol
,
secure_host
,
secure_port
=
secure_origin
if
origin_protocol
!=
secure_protocol
and
secure_protocol
!=
"*"
:
continue
try
:
addr
=
ipaddress
.
ip_address
(
None
if
origin_host
is
None
else
six
.
ensure_text
(
origin_host
)
)
network
=
ipaddress
.
ip_network
(
six
.
ensure_text
(
secure_host
)
)
except
ValueError
:
# We don't have both a valid address or a valid network, so
# we'll check this origin against hostnames.
if
(
origin_host
and
origin_host
.
lower
()
!=
secure_host
.
lower
()
and
secure_host
!=
"*"
):
continue
else
:
# We have a valid address and network, so see if the address
# is contained within the network.
if
addr
not
in
network
:
continue
# Check to see if the port matches.
if
(
origin_port
!=
secure_port
and
secure_port
!=
"*"
and
secure_port
is
not
None
):
continue
# If we've gotten here, then this origin matches the current
# secure origin and we should return True
return
True
# If we've gotten to this point, then the origin isn't secure and we
# will not accept it as a valid location to search. We will however
# log a warning that we are ignoring it.
logger
.
warning
(
"The repository located at
%
s is not a trusted or secure host and "
"is being ignored. If this repository is available via HTTPS we "
"recommend you use HTTPS instead, otherwise you may silence "
"this warning and allow it anyway with '--trusted-host
%
s'."
,
origin_host
,
origin_host
,
)
return
False
def
request
(
self
,
method
,
url
,
*
args
,
**
kwargs
):
# Allow setting a default timeout on a session
kwargs
.
setdefault
(
"timeout"
,
self
.
timeout
)
# Dispatch the actual request
return
super
(
PipSession
,
self
)
.
request
(
method
,
url
,
*
args
,
**
kwargs
)
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment