Import Modules

is_jupyter[source]

is_jupyter()

# For test cases (imports might be duplicated because they will live in a dedicated file)
import requests
from datetime import datetime, timedelta
from pathlib import Path
from shutil import copy, rmtree
from typing import List

import graphviz
import numpy as np
import pandas as pd
import pytest
from PIL import Image
from test_common.utils_4_tests import DATA_DIR, IMG_DIR, check_no_log, check_only_warning

Cleaning up error images

In some cases, we have error images that will interrupt model training. We can use [`clean_error_img`](/unpackai/utils.html#clean_error_img) to clean the image folder.

check_img[source]

check_img(img:Path, formats:List[str]=['.jpg', '.jpeg', '.png', '.bmp'])

Check a single image. If its quality is troublesome, we unlink/ditch the image
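
A minimal usage sketch (the path is hypothetical; import path per the links above): the file is removed if it cannot be opened as an image.

from pathlib import Path
from unpackai.utils import check_img

check_img(Path("downloads/img001.jpg"))  # hypothetical file; deleted if unreadable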

clean_error_img[source]

clean_error_img(path:Path, progress:bool=True)

  • path: an image directory
  • progress: whether to display a progress bar (default: True)
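
A minimal usage sketch, assuming a folder of freshly downloaded images (the folder name is hypothetical):

from pathlib import Path
from unpackai.utils import clean_error_img

clean_error_img(Path("downloads/bears"), progress=True)
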
images_rob = list((IMG_DIR / "robustness").glob("*.*"))
IMG_RGB = Image.new("RGB", (60, 30), color=(73, 109, 137))
IMG_RGBA = Image.new("RGBA", (60, 30), color=(73, 109, 137, 100))

@pytest.mark.parametrize("img", images_rob, ids=[i.name for i in images_rob])
def test_check_img_error(img: Path, tmpdir, caplog):
    """Test check_img with incorrect images"""
    img_copy = Path(tmpdir) / img.name
    copy(img, img_copy)
    check_img(img_copy)
    check_only_warning(caplog, img.name)
    assert not img_copy.is_file(), f"File {img_copy} should have been deleted"


def test_check_img_empty_wrong_suffix(tmpdir, caplog):
    """Test check_img with wrong image that does not have a correct extension"""
    img_path = Path(tmpdir) / "empty.txt"
    img_path.write_bytes(b"")
    check_img(img_path)
    check_no_log(caplog)
    assert img_path.is_file(), f"Image {img_path} not found"


@pytest.mark.parametrize("img", [IMG_RGB, IMG_RGBA])
@pytest.mark.parametrize("ext", ["png", "bmp", "jpg", "jpeg"])
def test_check_img_correct(img: Image.Image, ext: str, tmpdir, caplog):
    """Check that a correct image is not removed"""
    alpha_suffix = "_alpha" if len(img.getcolors()[0][1]) == 4 else ""
    if alpha_suffix and ext.startswith("jp"):
        pytest.skip("JPG does not support RGBA")

    img_path = Path(tmpdir) / f"correct_image_blue{alpha_suffix}.{ext}"
    img.save(img_path)
    check_img(img_path)
    check_no_log(caplog)
    assert img_path.is_file(), f"Image {img_path} not found"
def test_clean_error_img(tmpdir, monkeypatch) -> None:
    """Test clean_error_img"""
    for img in images_rob:
        copy(img, Path(tmpdir) / img.name)

    monkeypatch.chdir(tmpdir)
    root = Path(".")

    sub1 = root / "sub1"
    sub2 = root / "sub2"
    sub1.mkdir()
    sub2.mkdir()
    sub3 = sub2 / "sub3"
    sub3.mkdir()


    (sub1 / "file11.BMP").write_text("fake image")
    (sub1/"file12 haha.jpg").write_text("fake image")
    (sub1 / "file13 haha.txt").write_text("not image")
    IMG_RGB.save(sub1 / "img14_good.jpg")
    IMG_RGBA.save(sub1 / "img15_good.png")

    (sub2 / "file21.jpg").write_text("fake image")
    (sub2 / "file22.jpeg").write_text("fake image")

    (sub3 / "file31.jpeg").write_text("fake image")
    IMG_RGB.save(sub3 / "img32_good.jpeg")
    IMG_RGBA.save(sub3 / "img33_good.bmp")


    good: List[Path] = list()
    bad: List[Path] = list()

    print("Existing files:")
    for file in root.rglob("*.*"):
        print(f" * {file}")
        if file.suffix.lower() in [".jpg", ".jpeg", ".png", ".bmp"]:
            (good if "good" in file.name else bad).append(file)

    print(" => CLEANING")
    clean_error_img(root, progress=False)

    good_removed = [f for f in good if not f.is_file()]
    assert not good_removed, f"Good pictures deleted: {good_removed}"

    bad_still_here = [f for f in bad if f.is_file()]
    assert not bad_still_here, f"Bad pictures not deleted: {bad_still_here}"

Magic function [`hush`](/unpackai/utils.html#hush)

Run things quietly...

This is a magic cell function, so remember to use the %% prefix

We now have two versions of hush; the first is the backup one, which uses ipywidgets as the event trigger.

But this version's toggle gets blocked by the ongoing interaction:

if the process is working on something, you can't toggle until the execution ends.

The further improvement is to move the toggle entirely to JavaScript, hence the second version.

Hush event in JS

Since it does not go through the Python backend at all after the cell runs, the toggle stays responsive.

hush[source]

hush(line, cell)

A magic cell function to collapse the printout:

%%hush
how_loud = 100
how_verbose = "very"
do_some_python_thing(how_loud, how_verbose)

Try hushing various kinds of info

  • html display
  • print
  • logging
%%hush
import pandas as pd
import logging
logging.getLogger().setLevel('DEBUG')
a = 1
for i in range(1000):
    print(i, end="\t")
    
logging.info("a lots of text, VERBOSITY!!"*100)

# a big dataframe
display(pd.DataFrame({"a":[1,2]*50}))

# a big output
pd.DataFrame({"b":range(100)})

The output can be toggled open/closed even while the main process is still running.

%%hush
from time import sleep
from tqdm import tqdm

for i in tqdm(range(10)):
    sleep(1)

But we still want errors to be loud

As such information should interrupt the user

%%hush
def _test_hush():
    for i in range(20):
        print(f"some logging:\t{i}!")
    raise ValueError("but some error, because life!")

# Testing hush when exception is raised
_test_hush()

Fetching static files

static_root[source]

static_root()

Return static path

Test the validity of such a static path

STATIC = static_root()  # root of the static files (see static_root above)


def test_find_static():
    error_report_html = STATIC / "html" / "bug" / "error_report.html"
    assert error_report_html.is_file(), f"'{STATIC}' is not a valid static path"

Listing files in a DataFrame

friendly_size[source]

friendly_size(size:float)

Convert a size in bytes (as float) to a size with unit (as a string)
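
For instance (values match the test expectations below):

friendly_size(1024)       # '1.00 KB'
friendly_size(1024 ** 2)  # '1.00 MB'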

ls[source]

ls(root:Union[Path, str], exclude:List[str]=None, hide_info=False)

Return a DataFrame with list of files & directories in a given path.

Args:

  • root: root path to look for files and directories recursively
  • exclude: optional list of names to exclude in the search (e.g. [".git"])
  • hide_info: if True, omit the Size and Friendly_Size columns
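
A quick sketch of a typical call (the path and exclusion are illustrative; "FileDir" and "Size" are columns of the returned DataFrame):

df = ls("my_project", exclude=[".git"])
df[df["FileDir"] == "File"].sort_values("Size", ascending=False).head()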

@pytest.mark.parametrize(
    "size,exp",
    [
        (0, "0.00 B"),
        (1, "1.00 B"),
        (1024, "1.00 KB"),
        (1024 ** 2, "1.00 MB"),
        (1024 ** 3 * 3.14, "3.14 GB"),
    ],
)
def test_friendly_size(size, exp):
    """Test computation of friendly size (human readable)"""
    assert friendly_size(size) == exp
@pytest.fixture
def populated_tmp_dir(tmpdir):
    """Create files to test `ls` function"""
    tmpdir = Path(tmpdir)
    for dir_ in ["dir1/subdir1", "dir1/subdir2", "dir2"]:
        (tmpdir / dir_).mkdir(parents=True)
    for file in ["at_root.txt", "dir1/subdir1/at_subdir.txt", "dir2/at_dir.txt"]:
        (tmpdir / file).write_text("unpackai")
    (tmpdir / "dir1" / "subdir2" / "some_pickle.pkl").write_bytes(b"3141590000")

    return tmpdir


import numpy as np

exp_columns = [
    "Name",
    "Parent",
    "Path",
    "Level",
    "Last_Modif",
    "FileDir",
    "Extension",
    "Type",
    "Size",
    "Friendly_Size",
]


def test_ls(populated_tmp_dir):
    """Test DataFrame generated by `ls` function"""
    df = ls(populated_tmp_dir)
    now = datetime.now()
    assert list(df.columns) == exp_columns, "Incorrect columns in DF"

    # We want to copy the list of expected columns to keep it intact
    columns = exp_columns[:]

    assert all(now - timedelta(minutes=5) < date < now for date in df["Last_Modif"])
    df.drop("Last_Modif", axis=1, inplace=True)
    columns.remove("Last_Modif")

    # We check the file "at_root.txt" and do some cleanup
    at_root = df[df["Name"] == "at_root.txt"].iloc[0]
    assert Path(at_root["Path"]) == populated_tmp_dir.absolute() / "at_root.txt"
    assert at_root["Friendly_Size"] == friendly_size(at_root["Size"])
    df.drop(["Path", "Friendly_Size"], axis=1, inplace=True)
    columns.remove("Path")
    columns.remove("Friendly_Size")

    print("===TRUNCATED DF FOR LIST OF FILES/DIR===")
    print(df)
    print("=" * 20)

    exp_root_dir = populated_tmp_dir.name
    df_exp = pd.DataFrame(
        [
            ("at_root.txt", exp_root_dir, 0, "File", ".txt", "text/plain", 8.0),
            ("dir1", exp_root_dir, 0, "Dir", np.NaN, np.NaN, np.NaN),
            ("subdir1", "dir1", 1, "Dir", np.NaN, np.NaN, np.NaN),
            ("at_subdir.txt", "subdir1", 2, "File", ".txt", "text/plain", 8.0),
            ("subdir2", "dir1", 1, "Dir", np.NaN, np.NaN, np.NaN),
            ("some_pickle.pkl", "subdir2", 2, "File", ".pkl", None, 10.0),
            ("dir2", exp_root_dir, 0, "Dir", np.NaN, np.NaN, np.NaN),
            ("at_dir.txt", "dir2", 1, "File", ".txt", "text/plain", 8.0),
        ],
        columns=columns,
    )
    print("===EXPECTED DF FOR LIST OF FILES/DIR===")
    print(df_exp)
    print("=" * 20)

    compare_df = df.reset_index(drop=True).compare(df_exp.reset_index(drop=True))
    assert compare_df.empty, f"Differences found when checking DF:\n{compare_df}"


exp_files = [
    "at_root.txt",
    "dir1",
    "subdir1",
    "at_subdir.txt",
    "subdir2",
    "some_pickle.pkl",
    "dir2",
    "at_dir.txt",
]
exp_dir1 = ["at_root.txt", "dir2", "at_dir.txt"]
exp_dir2 = [
    "at_root.txt",
    "dir1",
    "subdir1",
    "subdir2",
    "at_subdir.txt",
    "some_pickle.pkl",
]
exp_subdir = [
    "at_root.txt",
    "dir1",
    "dir2",
    "subdir2",
    "some_pickle.pkl",
    "at_dir.txt",
]
exp_dir2_subdir = ["at_root.txt", "dir1", "subdir2", "some_pickle.pkl"]


def test_ls_path_as_str(populated_tmp_dir):
    """Test `ls` function called with a string for path"""
    df = ls(str(populated_tmp_dir))
    assert sorted(df["Name"]) == sorted(exp_files)

@pytest.mark.parametrize(
    "exclude, exp",
    [
        ([], exp_files),
        (["I am not a Dir"], exp_files),
        (["dir1"], exp_dir1),
        (["dir2"], exp_dir2),
        (["subdir1"], exp_subdir),
        (["subdir1", "dir2"], exp_dir2_subdir),
    ],
)
def test_ls_exclude(exclude, exp, populated_tmp_dir):
    """Test `ls` function with an exclusion of some directories"""
    df = ls(populated_tmp_dir, exclude=exclude)
    assert sorted(df["Name"]) == sorted(exp)


def test_ls_no_info(populated_tmp_dir):
    """Test `ls` function with `hide_info` set to True"""
    df = ls(populated_tmp_dir, hide_info=True)
    assert set(exp_columns) - set(df.columns) == {"Size", "Friendly_Size"}

def test_ls_not_exist():
    """Test `ls` when path does not exist"""
    with pytest.raises(FileNotFoundError):
        ls("this_path_does_not_exist")

def test_not_dir(tmpdir):
    """Test `ls` if root is a file and not a dir"""
    file_path = Path(tmpdir) / "toto.txt"
    file_path.touch(exist_ok=True)
    with pytest.raises(FileNotFoundError):
        ls(file_path)

GraphViz

Drawing a Graphviz Graph in Jupyter

gv[source]

gv(source:str, rankdir='LR')

Generate a GraphViz diagram, with a settable orientation (i.e. rankdir)

gv("program[shape=box3d width=1 height=0.7] inputs->program->results").__dict__
@pytest.mark.parametrize("rankdir", [None, "LR", "TD"])
def test_gv(rankdir):
    """Test Graphviz Generation via `gv`"""
    src = "inputs->program->results"
    if rankdir is None:
        graph = gv(src)
        rankdir = "LR"  # default orientation
    else:
        graph = gv(src, rankdir=rankdir)

    assert isinstance(
        graph, graphviz.Source
    ), f"Output shall be a GraphViz Source (but is {type(graph)})"

    gv_src = graph.source
    assert gv_src.startswith("digraph"), f"Source is not a digraph: {gv_src}"
    assert src in gv_src, f"Source input not in Graph Source: {gv_src}"
    assert f'rankdir="{rankdir}"' in gv_src, f"Rankdir {rankdir} not found: {gv_src}"

Download and unzip utilities

Download & url_2_text

get_url_size[source]

get_url_size(url:str)

Returns the size of the resource at the given URL, or -1 if it cannot be determined

download[source]

download(url:str, dest:Union[Path, str]=None)

Download file and return the Path of the downloaded file

If the destination is not specified, the file is downloaded in the current directory with the name taken from the URL
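
A minimal sketch, using the test file from the unpackai repo (also used by the tests below):

downloaded = download("https://raw.githubusercontent.com/unpackAI/unpackai/main/test/test_data/to_download.txt")
downloaded  # => Path("to_download.txt") in the current directory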

url_2_text[source]

url_2_text(url:str)

Extract text content from a URL (for an HTML page, only the textual content)
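
A minimal sketch (same test file as above):

text = url_2_text("https://raw.githubusercontent.com/unpackAI/unpackai/main/test/test_data/to_download.txt")
print(text)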

GITHUB_TEST_DATA_URL = "https://raw.githubusercontent.com/unpackAI/unpackai/main/test/test_data"
url_raw_txt = f"{GITHUB_TEST_DATA_URL}/to_download.txt"
test_data_txt = (DATA_DIR / "to_download.txt").read_text()


@pytest.fixture(scope="session")
def check_connection_github():
    try:
        with requests.request("HEAD", url_raw_txt, timeout=1) as resp:
            resp.raise_for_status()
    except (requests.exceptions.ConnectionError, requests.exceptions.ReadTimeout) as e:
        pytest.xfail(f"Cannot connect to Github: {e}")
@pytest.mark.github
def test_get_url_size(check_connection_github):
    assert get_url_size(url_raw_txt) == 264, f"Wrong size for {url_raw_txt}"


@pytest.mark.github
def test_download_dest(check_connection_github, tmpdir):
    """Test download of file to a destination"""
    dest = Path(tmpdir / "to_download.txt")
    download(url_raw_txt, dest)
    assert dest.is_file()
    assert dest.read_text() == test_data_txt


@pytest.mark.github
def test_download_empty(check_connection_github, tmpdir):
    """Test download of file without destination"""
    dest = Path("to_download.txt")
    download(url_raw_txt)
    try:
        assert dest.is_file()
        assert dest.read_text() == test_data_txt
    finally:
        dest.unlink()


@pytest.mark.github
def test_url_2_text(check_connection_github):
    """Test extraction of text from URL"""
    assert url_2_text(url_raw_txt) == test_data_txt

Unpack & Download-unpack

download_and_unpack[source]

download_and_unpack(url:str, extract_dir:Union[Path, str]=None, fmt:str=None)

Download a file and unzip it. Returns the path of the unzipped directory

Args:

  • url: URL of the archive to download
  • extract_dir: name of the target directory, where the archive is unpacked. If not provided, a directory named after the archive (its stem) is used.
  • fmt: archive format (one of "zip", "tar", "gztar", "bztar", or "xztar"), or any other registered format. If not provided, it is guessed from the extension.
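
A minimal sketch, reusing the USGS archive from the test below:

extract_dir = download_and_unpack(
    "https://alaska.usgs.gov/data/landBirds/sewardPeninsula/2012/"
    "avianHabitat_sewardPeninsula_McNew_2012.zip"
)
ls(extract_dir)  # lists the CSV/HTML/XML files from the archive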

url_ar = (
    r"https://alaska.usgs.gov/data/landBirds/sewardPeninsula/2012/"
    "avianHabitat_sewardPeninsula_McNew_2012.zip"
)


@pytest.mark.parametrize("url", [url_ar, url_ar + "?x=123"], ids=["url", "url?x=y"])
@pytest.mark.parametrize("dest", [None, "unzip_dir"], ids=["no dest", "dest"])
def test_download_and_unpack(url, dest, tmpdir):
    """Test download and unzip with `download_and_unpack`"""
    if dest is None:
        dest = Path(url.split("?")[0].rpartition("/")[-1]).stem

    extract_dir = Path(tmpdir / dest)
    download_and_unpack(url, extract_dir=extract_dir)

    df_files = ls(extract_dir)
    obt_files = {k: v for k, v in zip(df_files.Name, df_files.Size)}

    exp_files = {
        "avianHabitat_sewardPeninsula_McNew_2012.csv": 60617,
        "avianHabitat_sewardPeninsula_McNew_2012.html": 22883,
        "avianHabitat_sewardPeninsula_McNew_2012.xml": 14408,
    }
    assert (
        obt_files == exp_files
    ), f"Differences found in list of files:\n{obt_files}\nvs\n{exp_files}"

CSV in archive to DataFrame

read_csv_from_zip[source]

read_csv_from_zip(archive:Union[Path, str], csv_path:Union[Path, str])

Load a CSV file inside a zip/tar and return the equivalent pandas DataFrame

Args:

  • archive: path or URL of the zip (or tar/tar.gz) file
  • csv_path: path of the CSV relative to the root of the archive

Note: if the root is a folder, csv_path shall include this path (e.g. "archive/my_table.csv")
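
A minimal sketch, using one of the local test archives defined below:

df = read_csv_from_zip(DATA_DIR / "archive.zip", "100_rows.csv")
len(df)  # => 100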

LOCAL_ZIP_FLAT = DATA_DIR / "archive.zip"
LOCAL_TAR_FLAT = DATA_DIR / "archive.tar"
LOCAL_GZ_FLAT = DATA_DIR / "archive.tar.gz"
LOCAL_ZIP_FOLDER = DATA_DIR / "archived_folder.zip"
GITHUB_ZIP_FLAT = f"{GITHUB_TEST_DATA_URL}/archive.zip"
GITHUB_ZIP_FOLDER = f"{GITHUB_TEST_DATA_URL}/archived_folder.zip"


@pytest.mark.parametrize(
    "archive,csv",
    [
        (LOCAL_ZIP_FLAT, "100_rows.csv"),
        (LOCAL_TAR_FLAT, "100_rows.csv"),
        (LOCAL_GZ_FLAT, "100_rows.csv"),
        (LOCAL_ZIP_FOLDER, "archived_folder/100_rows (folder).csv"),
        (LOCAL_ZIP_FOLDER, "archived_folder/sub_folder/100_rows (subfolder).csv"),
    ],
    ids=["flat (zip)", "flat (tar)", "flat (tar.gz)", "folder", "subfolder"],
)
def test_read_csv_from_zip_local(archive, csv):
    """Test reading CSV from a local zip with read_csv_from_zip"""
    df = read_csv_from_zip(archive, csv)
    assert isinstance(df, pd.DataFrame), f"Result is not a DataFrame: {df}"
    assert len(df) == 100


@pytest.mark.github
@pytest.mark.parametrize(
    "archive,csv",
    [
        (GITHUB_ZIP_FLAT, "100_rows.csv"),
        (GITHUB_ZIP_FOLDER, "archived_folder/100_rows (folder).csv"),
        (GITHUB_ZIP_FOLDER, "archived_folder/sub_folder/100_rows (subfolder).csv"),
    ],
    ids=["flat", "folder", "subfolder"],
)
def test_read_csv_from_zip_github(archive, csv, check_connection_github):
    """Test reading CSV from a URL zip in GitHub with read_csv_from_zip"""
    df = read_csv_from_zip(archive, csv)
    assert isinstance(df, pd.DataFrame), f"Result is not a DataFrame: {df}"
    assert len(df) == 100


def test_read_csv_from_zip_url():
    """Test reading CSV from a URL zip with read_csv_from_zip"""
    url_ar = (
        r"https://alaska.usgs.gov/data/landBirds/sewardPeninsula/2012/"
        "avianHabitat_sewardPeninsula_McNew_2012.zip"
    )
    df = read_csv_from_zip(url_ar, "avianHabitat_sewardPeninsula_McNew_2012.csv")
    assert isinstance(df, pd.DataFrame), f"Result is not a DataFrame: {df}"
    assert len(df) == 1070


@pytest.mark.parametrize(
    "archive,csv,error",
    [
        ("does_not_exist.zip", "table.csv", FileNotFoundError),
        (LOCAL_ZIP_FLAT, "does_not_exist.csv", FileNotFoundError),
        (LOCAL_TAR_FLAT, "does_not_exist.csv", FileNotFoundError),
        (LOCAL_GZ_FLAT, "does_not_exist.csv", FileNotFoundError),
        (LOCAL_ZIP_FLAT, "not_a_csv.txt", AttributeError),
        (LOCAL_ZIP_FLAT, "not_a_csv", AttributeError),
        (DATA_DIR / "to_download.txt", "table.csv", AttributeError),
    ],
    ids=[
        "zip missing",
        "csv missing (zip)",
        "csv missing (tar)",
        "csv missing (tar.gz)",
        "not csv (extension)",
        "not csv (no extension)",
        "not archive",
    ],
)
def test_read_csv_from_zip_robustness(archive, csv, error):
    """Test robustness of read_csv_from_zip"""
    with pytest.raises(error):
        read_csv_from_zip(archive, csv)

Running all Test Cases

import gdown

model_url = "https://drive.google.com/file/d/1-DUsVYgY-oMTcNhM3-znP_nlu8yFtQAr/view?usp=sharing"
output = 'model_bear.pkl'
# gdown.download(model_url, output, quiet=False)
gdown.download(model_url, output, quiet=False, fuzzy=True)
DEBUG:urllib3.connectionpool:Starting new HTTPS connection (1): drive.google.com:443

SSLError: HTTPSConnectionPool(host='drive.google.com', port=443): Max retries exceeded with url: /uc?id=1-DUsVYgY-oMTcNhM3-znP_nlu8yFtQAr (Caused by SSLError(SSLError(1, '[SSL: WRONG_VERSION_NUMBER] wrong version number (_ssl.c:1129)')))