Infrastructures.Org | ISconf.Org TerraLuna Projects

root/trunk/lib/python/isconf/GPG.py

Revision 29 (checked in by stevegt, 7 years ago)

bug #7: Config.py done

Line 
1 """ A wrapper for the 'gpg' command. 
2
3 We're wrapping gpg instead of using, for example, cryptlib, in order
4 to reduce dependencies and cut down on the amount of code which
5 institutional ISconf users would have to audit before installing.
6 This portability will cost slightly us in terms of performance due to
7 the fact that we have to fork 'gpg' one or more times for each message
8 processed.  I don't expect ISconf4 message rates to be high enough for
9 this to matter.  If and when we start chunking files and managing
10 large blobs like install images with ISconf, this will likely have to
11 change.
12
13 This is a generic wrapper and is not ISconf-specific -- feel free to
14 use it in your own applications, and see the pycrypto license below. 
15
16 Portions of this module are derived from A.M. Kuchling's well-designed
17 GPG.py, using Richard Jones' updated version 1.3, which can be found
18 in the pycrypto CVS repository on Sourceforge:
19
20     http://cvs.sourceforge.net/viewcvs.py/pycrypto/gpg/GPG.py
21
22 This module is *not* forward-compatible with amk's; some of the
23 old interface has changed.  For instance, since I've added decrypt
24 functionality, I elected to initialize with a 'gnupghome' argument
25 instead of 'keyring', so that gpg can find both the public and secret
26 keyrings.  I've also altered some of the returned objects in order for
27 the caller to not have to know as much about the internals of the
28 result classes.
29
30 While the rest of ISconf is released under the GPL, I am releasing
31 this single file under the same terms that A.M. Kuchling used for
32 pycrypto:
33
34 _____________________ pycrypto LICENSE file starts __________________________
35 ===================================================================
36 Distribute and use freely; there are no restrictions on further
37 dissemination and usage except those imposed by the laws of your
38 country of residence.  This software is provided "as is" without
39 warranty of fitness for use or suitability for any purpose, express
40 or implied. Use at your own risk or not at all.
41 ===================================================================
42
43 Incorporating the code into commercial products is permitted; you do
44 not have to make source available or contribute your changes back
45 (though that would be nice).
46
47 --amk                                                             (www.amk.ca)
48 _______________________ pycrypto LICENSE file ends __________________________
49
50
51
52 Steve Traugott, stevegt@terraluna.org
53 Thu Jun 23 21:27:20 PDT 2005
54
55     
56 """
57
58 import os
59 import StringIO
60 import popen2
61
62 class GPG:
63
64     def __init__(self, gpgbinary='gpg', gnupghome=None, verbose=False):
65         """Initialize a GPG process wrapper.  Options are:
66
67         gpgbinary -- full pathname for GPG binary. 
68
69         gnupghome -- full pathname to where we can find the public and
70         private keyrings.  Default is whatever gpg defaults to.
71     
72         >>> gpg = GPG(gnupghome="/tmp/pygpgtest")
73
74         """
75         self.gpgbinary = gpgbinary
76         self.gnupghome = gnupghome
77         self.verbose = verbose
78         if gnupghome and not os.path.isdir(self.gnupghome):
79             os.makedirs(self.gnupghome,0700)
80         # if not os.path.isfile(self.gnupghome + "/secring.gpg"):
81         #     self.gen_key()
82
83     def _open_subprocess(self, args, passphrase=None):
84         # Internal method: open a pipe to a GPG subprocess and return
85         # the file objects for communicating with it.
86         cmd = [self.gpgbinary, '--status-fd 2 --no-tty']
87         if self.gnupghome:
88             cmd.append('--homedir "%s" ' % self.gnupghome)
89         if passphrase:
90             cmd.append('--passphrase-fd 3')
91
92         cmd.extend(args)
93         cmd = ' '.join(cmd)
94         if self.verbose:
95             print cmd
96
97         child_stdout, child_stdin, child_stderr, child_pass = \
98             PopenHi.open(cmd, 3)
99         if passphrase:
100             child_pass.write(passphrase + "\n")
101             child_pass.close()
102         return child_stdout, child_stdin, child_stderr
103
104     def _read_response(self, child_stderr, response):
105         # Internal method: reads all the output from GPG, taking notice
106         # only of lines that begin with the magic [GNUPG:] prefix.
107         #
108         # Calls methods on the response object for each valid token found,
109         # with the arg being the remainder of the status line.
110         response.stderr = ''
111         while 1:
112             line = child_stderr.readline()
113             response.stderr += line
114             if self.verbose: print line
115             if line == "": break
116             line = line.rstrip()
117             if line[0:9] == '[GNUPG:] ':
118                 # Chop off the prefix
119                 line = line[9:]
120                 L = line.split(None, 1)
121                 keyword = L[0]
122                 if len(L) > 1:
123                     value = L[1]
124                 else:
125                     value = ""
126                 getattr(response, keyword)(value)
127
128     def _handle_gigo(self, args, file, result,passphrase=None):
129         # Handle a basic data call - pass data to GPG, handle the output
130         # including status information. Garbage In, Garbage Out :)
131         child_stdout, child_stdin, child_stderr = \
132             self._open_subprocess(args,passphrase)
133
134         # Copy the file to the GPG subprocess
135         while 1:
136             data = file.read(1024)
137             if data == "": break
138             child_stdin.write(data)
139         child_stdin.close()
140
141         # Get the response information
142         self._read_response(child_stderr, result)
143         self._read_data(child_stdout, result)
144
145         return result
146    
147     def _read_data(self,child_stdout,result):
148         # Read the contents of the file from GPG's stdout
149         result.data = ""
150         while 1:
151             data = child_stdout.read(1024)
152             if data == "": break
153             result.data = result.data + data
154    
155     #
156     # SIGNATURE METHODS
157     #
158     def sign(self,message,keyid=None,passphrase=None,clearsign=True):
159         """sign message"""
160        
161         args = ["-sa"]
162         if clearsign:
163             args.append("--clearsign")
164         if keyid:
165             args.append("--default-key %s" % keyid)
166         child_stdout, child_stdin, child_stderr = \
167             self._open_subprocess(args,passphrase=passphrase)
168         child_stdin.write(message)
169         child_stdin.close()
170         # Get the response information
171         result = Sign()
172         self._read_response(child_stderr, result)
173         self._read_data(child_stdout, result)
174         return result
175        
176     def verify(self, data):
177         """Verify the signature on the contents of the string 'data'
178
179         >>> gpg = GPG(gnupghome="/tmp/pygpgtest")
180         >>> input = gpg.gen_key_input(Passphrase='foo')
181         >>> key = gpg.gen_key(input)
182         >>> assert key
183         >>> sig = gpg.sign('hello',keyid=key.fingerprint,passphrase='bar')
184         >>> assert not sig
185         >>> sig = gpg.sign('hello',keyid=key.fingerprint,passphrase='foo')
186         >>> assert sig
187         >>> verify = gpg.verify(str(sig))
188         >>> assert verify
189
190         """
191
192         file = StringIO.StringIO(data)
193         return self.verify_file(file)
194    
195     def verify_file(self, file):
196         "Verify the signature on the contents of the file-like object 'file'"
197         sig = Verify()
198         self._handle_gigo([], file, sig)
199         return sig
200
201     #
202     # KEY MANAGEMENT
203     #
204
205     def import_key(self, key_data):
206         """ import the key_data into our keyring
207
208         >>> import shutil
209         >>> shutil.rmtree("/tmp/pygpgtest")
210         >>> gpg = GPG(gnupghome="/tmp/pygpgtest")
211         >>> input = gpg.gen_key_input()
212         >>> result = gpg.gen_key(input)
213         >>> print1 = result.fingerprint
214         >>> result = gpg.gen_key(input)
215         >>> print2 = result.fingerprint
216         >>> pubkey1 = gpg.export_key(print1)
217         >>> seckey1 = gpg.export_key(print1,secret=True)
218         >>> seckeys = gpg.list_keys(secret=True)
219         >>> pubkeys = gpg.list_keys()
220         >>> assert print1 in seckeys.fingerprints
221         >>> assert print1 in pubkeys.fingerprints
222         >>> gpg.delete_key(print1,secret=True)
223         ''
224         >>> gpg.delete_key(print1)
225         ''
226         >>> seckeys = gpg.list_keys(secret=True)
227         >>> pubkeys = gpg.list_keys()
228         >>> assert not print1 in seckeys.fingerprints
229         >>> assert not print1 in pubkeys.fingerprints
230         >>> result = gpg.import_key('foo')
231         >>> assert not result
232         >>> result = gpg.import_key(pubkey1)
233         >>> pubkeys = gpg.list_keys()
234         >>> seckeys = gpg.list_keys(secret=True)
235         >>> assert not print1 in seckeys.fingerprints
236         >>> assert print1 in pubkeys.fingerprints
237         >>> result = gpg.import_key(seckey1)
238         >>> assert result
239         >>> seckeys = gpg.list_keys(secret=True)
240         >>> pubkeys = gpg.list_keys()
241         >>> assert print1 in seckeys.fingerprints
242         >>> assert print1 in pubkeys.fingerprints
243         >>> assert print2 in pubkeys.fingerprints
244
245         """
246         child_stdout, child_stdin, child_stderr = \
247             self._open_subprocess(['--import'])
248
249         child_stdin.write(key_data)
250         child_stdin.close()
251
252         # Get the response information
253         result = ImportResult()
254         resp = self._read_response(child_stderr, result)
255
256         return result
257
258     def delete_key(self,fingerprint,secret=False):
259         which='key'
260         if secret:
261             which='secret-key'
262         args = ["--batch --delete-%s %s" % (which,fingerprint)]
263         child_stdout, child_stdin, child_stderr = \
264             self._open_subprocess(args)
265         child_stdin.close()
266         # XXX might want to check more status here
267         return child_stdout.read()
268
269     def export_key(self,keyid,secret=False):
270         """export the indicated key -- 'keyid' is anything gpg accepts"""
271         which=''
272         if secret:
273             which='-secret-key'
274         args = ["--armor --export%s %s" % (which,keyid)]
275         child_stdout, child_stdin, child_stderr = \
276             self._open_subprocess(args)
277         child_stdin.close()
278         # gpg --export produces no status-fd output; stdout will be
279         # empty in case of failure
280         return child_stdout.read()
281
282     def list_keys(self,secret=False):
283         """ list the keys currently in the keyring
284
285         >>> import shutil
286         >>> shutil.rmtree("/tmp/pygpgtest")
287         >>> gpg = GPG(gnupghome="/tmp/pygpgtest")
288         >>> input = gpg.gen_key_input()
289         >>> result = gpg.gen_key(input)
290         >>> print1 = result.fingerprint
291         >>> result = gpg.gen_key(input)
292         >>> print2 = result.fingerprint
293         >>> pubkeys = gpg.list_keys()
294         >>> assert print1 in pubkeys.fingerprints
295         >>> assert print2 in pubkeys.fingerprints
296         
297         """
298
299         which='keys'
300         if secret:
301             which='secret-keys'
302         args = "--list-%s --fixed-list-mode --fingerprint --with-colons" % (which)
303         args = [args]
304         child_stdout, child_stdin, child_stderr = \
305             self._open_subprocess(args)
306         child_stdin.close()
307
308         # there might be some status thingumy here I should handle... (amk)
309         # ...nope, unless you care about expired sigs or keys (stevegt)
310
311         # Get the response information
312         result = ListKeys()
313         valid_keywords = 'pub uid sec fpr'.split()
314         while 1:
315             line = child_stdout.readline()
316             if self.verbose: print line
317             if not line:
318                 break
319             L = line.strip().split(':')
320             if not L:
321                 continue
322             keyword = L[0]
323             if keyword in valid_keywords:
324                 getattr(result, keyword)(L)
325         return result
326
327     def gen_key(self,input):
328         """Generate a key; you might use gen_key_input() to create the
329         control input.
330         
331         >>> gpg = GPG(gnupghome="/tmp/pygpgtest")
332         >>> input = gpg.gen_key_input()
333         >>> result = gpg.gen_key(input)
334         >>> assert result
335         >>> result = gpg.gen_key('foo')
336         >>> assert not result
337         
338         """
339         args = ["--gen-key --batch"]
340         result = GenKey()
341         file = StringIO.StringIO(input)
342         self._handle_gigo(args, file, result)
343         return result
344
345     def gen_key_input(self,**kwargs):
346         """
347         Generate --gen-key input per gpg doc/DETAILS
348
349         """
350         parms = {}
351         for (key,val) in kwargs.items():
352             key = key.replace('_','-')
353             parms[key] = val
354         parms.setdefault('Key-Type','RSA')
355         parms.setdefault('Key-Length',1024)
356         parms.setdefault('Name-Real', "Autogenerated Key")
357         parms.setdefault('Name-Comment', "Generated by isconf.GPG")
358         logname = os.environ['LOGNAME']
359         import socket
360         hostname = socket.gethostname()
361         parms.setdefault('Name-Email', "%s@%s" % (logname,hostname))
362         out = "Key-Type: %s\n" % parms['Key-Type']
363         del parms['Key-Type']
364         for (key,val) in parms.items():
365             out += "%s: %s\n" % (key, str(val))
366         out += "%commit\n"
367         return out
368        
369         # Key-Type: RSA
370         # Key-Length: 1024
371         # Name-Real: ISdlink Server on %s
372         # Name-Comment: Created by %s
373         # Name-Email: isdlink@%s
374         # Expire-Date: 0
375         # %commit
376         #
377         #
378         # Key-Type: DSA
379         # Key-Length: 1024
380         # Subkey-Type: ELG-E
381         # Subkey-Length: 1024
382         # Name-Real: Joe Tester
383         # Name-Comment: with stupid passphrase
384         # Name-Email: joe@foo.bar
385         # Expire-Date: 0
386         # Passphrase: abc
387         # %pubring foo.pub
388         # %secring foo.sec
389         # %commit
390
391
392     def import_keys(self,keydata,filter=None):
393         """import a set of keys, but only if filter(fingerprint)
394         is true for all of them
395
396         XXX test filter
397
398         """
399         args = "--with-fingerprint --with-colons" % self.path
400         args = [args]
401         result = ListKeys()
402         file = StringIO.StringIO(keydata)
403         self._handle_gigo(args, file, result)
404         for key in result:
405             if filter and not filter(key.fingerprint):
406                 return None
407         return self.import_key(keydata)
408
409     def showpubkey(self,i=0):
410         """return the ascii armored public key for the first key on
411         the secret ring, or the 'i'th key if given
412
413         XXX test
414
415         """
416         keys = self.list_keys(secret=True)
417         primary = keys[i]
418         ascii = self.export_key(keyid=primary['keyid'])
419         return ascii
420
421     def fingerprints(self,keyid='',secret=False):
422         keys = self.list_keys(secret=True)
423         return keys.fingerprints
424
425     #
426     # ENCRYPTION
427     #
428     def encrypt_file(self, file, recipients, sign=None,
429             always_trust=False, passphrase=None):
430         "Encrypt the message read from the file-like object 'file'"
431         args = ['--encrypt --armor']
432         if not (isinstance(recipients,list) or isinstance(recipients,tuple)):
433             recipients = [recipients]
434         for recipient in recipients:
435             args.append('--recipient %s'%recipient)
436         if sign:
437             args.append("--sign --default-key %s" % sign)
438         if always_trust:
439             args.append("--always-trust")
440         result = Crypt()
441         self._handle_gigo(args, file, result, passphrase=passphrase)
442         return result
443
444     def encrypt(self, data, recipients, **kwargs):
445         """Encrypt the message contained in the string 'data'
446
447         >>> import shutil
448         >>> if os.path.exists("/tmp/pygpgtest"):
449         ...     shutil.rmtree("/tmp/pygpgtest")
450         >>> gpg = GPG(gnupghome="/tmp/pygpgtest")
451         >>> input = gpg.gen_key_input(passphrase='foo')
452         >>> result = gpg.gen_key(input)
453         >>> print1 = result.fingerprint
454         >>> input = gpg.gen_key_input()
455         >>> result = gpg.gen_key(input)
456         >>> print2 = result.fingerprint
457         >>> result = gpg.encrypt("hello",print2)
458         >>> message = str(result)
459         >>> assert message != 'hello'
460         >>> result = gpg.decrypt(message)
461         >>> assert result
462         >>> str(result)
463         'hello'
464         >>> result = gpg.encrypt("hello again",print1)
465         >>> message = str(result)
466         >>> result = gpg.decrypt(message)
467         >>> result.status
468         'need passphrase'
469         >>> result = gpg.decrypt(message,passphrase='bar')
470         >>> result.status
471         'bad passphrase'
472         >>> assert not result
473         >>> result = gpg.decrypt(message,passphrase='foo')
474         >>> result.status
475         'decryption ok'
476         >>> str(result)
477         'hello again'
478         >>> result = gpg.encrypt("signed hello",print2,sign=print1)
479         >>> result.status
480         'need passphrase'
481         >>> result = gpg.encrypt("signed hello",print2,sign=print1,passphrase='foo')
482         >>> result.status
483         'encryption ok'
484         >>> message = str(result)
485         >>> result = gpg.decrypt(message)
486         >>> result.status
487         'decryption ok'
488         >>> assert result.fingerprint == print1
489
490         """
491         file = StringIO.StringIO(data)
492         return self.encrypt_file(file, recipients, **kwargs)
493
494     def decrypt(self,message,always_trust=False,passphrase=None):
495         args = ["--decrypt"]
496         if always_trust:
497             args.append("--always-trust")
498         result = Crypt()
499         file = StringIO.StringIO(message)
500         self._handle_gigo(args, file, result, passphrase)
501         return result
502
503 class Verify:
504     "Used to hold output of --verify"
505
506     def __init__(self):
507         self.valid = 0
508         self.fingerprint = self.creation_date = self.timestamp = None
509         self.signature_id = self.key_id = None
510         self.username = None
511
512     def __nonzero__(self):
513         if self.is_valid(): return 1
514         return 0
515
516     def BADSIG(self, value):
517         self.valid = 0
518         self.key_id, self.username = value.split(None, 1)
519     def GOODSIG(self, value):
520         self.valid = 1
521         self.key_id, self.username = value.split(None, 1)
522     def VALIDSIG(self, value):
523         #         C54065C14467F344A9585C1B96D482BAE5F1EA31 2005-08-10
524         #         1123652038 0 3 0 1 2 01
525         #         C54065C14467F344A9585C1B96D482BAE5F1EA31
526         self.fingerprint, self.creation_date, self.sig_timestamp, \
527                 self.expire_timestamp = value.split()[:4]
528     def SIG_ID(self, value):
529         self.signature_id, self.creation_date, self.timestamp = value.split()
530
531     # XXX do something with these; start using trust db
532     def TRUST_UNDEFINED (self,value): pass
533     def TRUST_NEVER(self,value): pass
534     def TRUST_MARGINAL(self,value): pass
535     def TRUST_FULLY(self,value): pass
536     def TRUST_ULTIMATE(self,value): pass
537
538     # these showed up in gpg 1.4.1
539     def PLAINTEXT(self,value): pass
540     def PLAINTEXT_LENGTH(self,value): pass
541
542     def is_valid(self):
543         return self.valid
544  
545 class ImportResult:
546     "Used to hold information about a key import result"
547
548     counts = '''count no_user_id imported imported_rsa unchanged
549             n_uids n_subk n_sigs n_revoc sec_read sec_imported
550             sec_dups not_imported'''.split()
551     def __init__(self):
552         self.imported = []
553         self.results = []
554         self.fingerprints = []
555         for result in self.counts:
556             setattr(self, result, None)
557    
558     def __nonzero__(self):
559         if self.not_imported: return 0
560         if not self.fingerprints: return 0
561         return 1
562
563     def NODATA(self, value):
564         self.results.append({'fingerprint': None,
565             'problem': '0', 'text': 'No valid data found'})
566     def IMPORTED(self, value):
567         # this duplicates info we already see in import_ok and import_problem
568         pass
569
570     ok_reason = {
571         '0': 'Not actually changed',
572         '1': 'Entirely new key',
573         '2': 'New user IDs',
574         '4': 'New signatures',
575         '8': 'New subkeys',
576         '16': 'Contains private key',
577     }
578     def IMPORT_OK(self, value):
579         reason, fingerprint = value.split()
580         reasons = []
581         for (code,text) in self.ok_reason.items():
582             if int(reason) | int(code) == int(reason):
583                 reasons.append(text)
584         reasontext = '\n'.join(reasons) + "\n"
585         self.results.append({'fingerprint': fingerprint,
586             'ok': reason, 'text': reasontext})
587         self.fingerprints.append(fingerprint)
588
589     problem_reason = {
590         '0': 'No specific reason given',
591         '1': 'Invalid Certificate',
592         '2': 'Issuer Certificate missing',
593         '3': 'Certificate Chain too long',
594         '4': 'Error storing certificate',
595     }
596     def IMPORT_PROBLEM(self, value):
597         try:
598             reason, fingerprint = value.split()
599         except:
600             reason = value
601             fingerprint = '<unknown>'
602         self.results.append({'fingerprint': fingerprint,
603             'problem': reason, 'text': self.problem_reason[reason]})
604     def IMPORT_RES(self, value):
605         import_res = value.split()
606         for i in range(len(self.counts)):
607             setattr(self, self.counts[i], int(import_res[i]))
608
609     def summary(self):
610         l = []
611         l.append('%d imported'%self.imported)
612         if self.not_imported:
613             l.append('%d not imported'%self.not_imported)
614         return ', '.join(l)
615
616 class ListKeys(list):
617     ''' Parse a --list-keys output
618
619         Handle pub and uid (relating the latter to the former).
620
621         Don't care about (info from src/DETAILS):
622
623         crt = X.509 certificate
624         crs = X.509 certificate and private key available
625         sub = subkey (secondary key)
626         ssb = secret subkey (secondary key)
627         uat = user attribute (same as user id except for field 10).
628         sig = signature
629         rev = revocation signature
630         pkd = public key data (special field format, see below)
631         grp = reserved for gpgsm
632         rvk = revocation key
633     '''
634     def __init__(self):
635         self.curkey = None
636         self.fingerprints = []
637
638     def key(self, args):
639         vars = ("""
640             type trust length algo keyid date expires dummy ownertrust uid
641         """).split()
642         self.curkey = {}
643         for i in range(len(vars)):
644             self.curkey[vars[i]] = args[i]
645         self.curkey['uids'] = [self.curkey['uid']]
646         del self.curkey['uid']
647         self.append(self.curkey)
648
649     pub = sec = key
650
651     def fpr(self, args):
652         # fpr:::::::::3324C8D0D1196A6CB497ABD6D694CE9742ABDCE6:
653         self.curkey['fingerprint'] = args[9]
654         self.fingerprints.append(args[9])
655        
656     def uid(self, args):
657         self.curkey['uids'].append(args[9])
658
659 class Crypt(Verify):
660     """Handle --encrypt or --decrypt status """
661
662     def __init__(self):
663         Verify.__init__(self)
664         self.data = ''
665         self.ok = False
666         self.status = ''
667     def __nonzero__(self):
668         if self.ok: return 1
669         return 0
670     def __str__(self):
671         return self.data
672     def ENC_TO(self, value): pass
673     def USERID_HINT(self, value): pass
674     def NEED_PASSPHRASE(self, value):
675         self.status = 'need passphrase'
676     def BAD_PASSPHRASE(self, value):
677         self.status = 'bad passphrase'
678     def GOOD_PASSPHRASE(self, value):
679         self.status = 'good passphrase'
680     def BEGIN_DECRYPTION(self, value):
681         self.status = self.status or 'decryption incomplete'
682     def DECRYPTION_FAILED(self, value):
683         self.status = self.status or 'decryption failed'
684     def DECRYPTION_OKAY(self, value):
685         self.status = 'decryption ok'
686         self.ok = True
687     def GOODMDC(self, value): pass
688     def END_DECRYPTION(self, value): pass
689
690     def BEGIN_ENCRYPTION(self, value):
691         self.status = self.status or 'encryption incomplete'
692     def END_ENCRYPTION(self, value):
693         self.status = 'encryption ok'
694         self.ok = True
695     def INV_RECP(self, value):
696         self.status = 'invalid recipient'
697     def KEYEXPIRED(self, value):
698         self.status = 'key expired'
699     def SIG_CREATED(self, value):
700         self.status = 'sig expired'
701     def SIGEXPIRED(self, value):
702         self.status = 'sig expired'
703
704
705 class GenKey:
706     """Handle --gen-key status """
707     def __init__(self):
708         self.type = None
709         self.fingerprint = None
710     def __nonzero__(self):
711         if self.fingerprint: return 1
712         return 0
713     def __str__(self):
714         return self.fingerprint or ''
715     def PROGRESS(self, value): pass
716     def GOOD_PASSPHRASE(self, value): pass
717     def NODATA(self, value): pass
718     def KEY_CREATED(self, value):
719         # P 95C91606D36AB8CACEB762DCD8DA31F0EE77B3A6
720         (self.type,self.fingerprint) = value.split()
721
722 class Sign:
723     """Handle --sign status """
724
725     def __init__(self):
726         self.type = None
727         self.fingerprint = None
728     def __nonzero__(self):
729         if self.fingerprint: return 1
730         return 0
731     def __str__(self):
732         return self.data or ''
733     def USERID_HINT(self, value): pass
734     def NEED_PASSPHRASE(self, value): pass
735     def BAD_PASSPHRASE(self, value): pass
736     def GOOD_PASSPHRASE(self, value): pass
737
738     # SIG_CREATED <type> <pubkey algo> <hash algo> <class> <timestamp> <key fpr>
739     def SIG_CREATED(self, value):
740         # P 95C91606D36AB8CACEB762DCD8DA31F0EE77B3A6
741         (self.type,algo,hashalgo,cls,self.timestamp,self.fingerprint
742             ) = value.split()
743
744 import types
745 class PopenHi:
746     """derived from open2.Popen3, but opens a fourth, high-numbered
747     fd for input to the child
748     
749     """
750
751     try:
752         MAXFD = os.sysconf('SC_OPEN_MAX')
753     except (AttributeError, ValueError):
754         MAXFD = 256
755
756     _active = []
757
758     def _cleanup(cls):
759         for obj in cls._active[:]:
760             obj.poll()
761     _cleanup = classmethod(_cleanup)
762
763     def __init__(self, cmd, fd, bufsize = -1):
764         PopenHi._cleanup()
765         self.sts = -1
766         p2cread, p2cwrite = os.pipe()
767         c2pread, c2pwrite = os.pipe()
768         hiread, hiwrite = os.pipe()
769         errout, errin = os.pipe()
770         self.pid = os.fork()
771         if self.pid == 0:
772             # Child
773             # os.setsid() # disconnect from tty
774             os.dup2(p2cread, 0)
775             os.dup2(c2pwrite, 1)
776             os.dup2(errin, 2)
777             os.dup2(hiread, fd)
778             if isinstance(cmd, types.StringTypes):
779                 cmd = ['/bin/sh', '-c', cmd]
780             for i in range(3, PopenHi.MAXFD):
781                 if i == fd:
782                     continue
783                 try:
784                     os.close(i)
785                 except OSError:
786                     pass
787             # debug = "%d child running %s\n" % (os.getpid(), cmd)
788             # open("/tmp/debug", 'w').write(debug)
789             try:
790                 os.execvp(cmd[0], cmd)
791             finally:
792                 os._exit(1)
793         os.close(p2cread)
794         self.tochild = os.fdopen(p2cwrite, 'w', bufsize)
795         os.close(c2pwrite)
796         self.fromchild = os.fdopen(c2pread, 'r', bufsize)
797         os.close(errin)
798         self.childerr = os.fdopen(errout, 'r', bufsize)
799         os.close(hiread)
800         self.childhi = os.fdopen(hiwrite, 'w', bufsize)
801         PopenHi._active.append(self)
802
803     def poll(self):
804         """Return the exit status of the child process if it has finished,
805         or -1 if it hasn't finished yet."""
806         if self.sts < 0:
807             try:
808                 pid, sts = os.waitpid(self.pid, os.WNOHANG)
809                 if pid == self.pid:
810                     self.sts = sts
811                     PopenHi._active.remove(self)
812             except os.error:
813                 pass
814         return self.sts
815
816     def open(cls, cmd, fd, bufsize = -1):
817         obj = PopenHi(cmd, fd, bufsize = -1)
818         return obj.fromchild, obj.tochild, obj.childerr, obj.childhi
819     open = classmethod(open)
820
821 if __name__ == '__main__':
822     import sys
823     if len(sys.argv) == 1:
824         print 'Usage: GPG.py <signed file>'
825         sys.exit()
826
827     obj = GPGSubprocess()
828     file = open(sys.argv[1], 'rb')
829     sig = obj.verify_file( file )
830     print sig.__dict__
Note: See TracBrowser for help on using the browser.