summaryrefslogtreecommitdiff
path: root/lang/python/tests/t-encrypt-large.py
blob: cdb4a32a0f7c73e69330849880085ae7b509c4ac (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
#!/usr/bin/env python

# Copyright (C) 2016 g10 Code GmbH
#
# This file is part of GPGME.
#
# GPGME is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# GPGME is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
# or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Lesser General
# Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this program; if not, see <http://www.gnu.org/licenses/>.

from __future__ import absolute_import, print_function, unicode_literals
del absolute_import, print_function, unicode_literals

import sys
import random
import gpg
import support

if len(sys.argv) == 2:
    nbytes = int(sys.argv[1])
else:
    nbytes = 100000

support.init_gpgme(gpg.constants.protocol.OpenPGP)
c = gpg.Context()

ntoread = nbytes
def read_cb(amount):
    global ntoread
    chunk = ntoread if ntoread < amount else amount
    ntoread -= chunk
    assert ntoread >= 0
    assert chunk >= 0
    return bytes(bytearray(random.randrange(256) for i in range(chunk)))

nwritten = 0
def write_cb(data):
    global nwritten
    nwritten += len(data)
    return len(data)

source = gpg.Data(cbs=(read_cb, None, None, lambda: None))
sink = gpg.Data(cbs=(None, write_cb, None, lambda: None))

keys = []
keys.append(c.get_key("A0FF4590BB6122EDEF6E3C542D727CC768697734", False))
keys.append(c.get_key("D695676BDCEDCC2CDD6152BCFE180B1DA9E3B0B2", False))

c.op_encrypt(keys, gpg.constants.ENCRYPT_ALWAYS_TRUST, source, sink)
result = c.op_encrypt_result()
assert not result.invalid_recipients, \
    "Invalid recipient encountered: {}".format(result.invalid_recipients.fpr)
assert ntoread == 0

if support.verbose:
    sys.stderr.write(
        "plaintext={} bytes, ciphertext={} bytes\n".format(nbytes, nwritten))